Skip to main content

diidi_travel_common_queue/
decorator.rs

1use std::sync::Arc;
2use std::time::{Duration, Instant};
3
4use async_trait::async_trait;
5
6use crate::queue::{NackAction, Queue, QueueDelivery, QueueMessage, QueueReceipt};
7use crate::QueueResult;
8
/// Default `tracing` target under which the queue logging decorator in this
/// file emits its events.
pub const LOG_TARGET: &str = "diidi::queue";
10
11#[derive(Clone)]
12pub struct LoggingQueue {
13  inner: Arc<dyn Queue>,
14  target: &'static str,
15  slow_threshold: Option<Duration>,
16}
17
18impl std::fmt::Debug for LoggingQueue {
19  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
20    f.debug_struct("LoggingQueue")
21      .field("inner", &self.inner.name())
22      .field("target", &self.target)
23      .field("slow_threshold", &self.slow_threshold)
24      .finish()
25  }
26}
27
28impl LoggingQueue {
29  pub fn new(inner: Arc<dyn Queue>) -> Self {
30    Self { inner, target: LOG_TARGET, slow_threshold: None }
31  }
32
33  pub fn with_target(mut self, target: &'static str) -> Self {
34    self.target = target;
35    self
36  }
37
38  pub fn with_slow_threshold(mut self, threshold: Duration) -> Self {
39    self.slow_threshold = Some(threshold);
40    self
41  }
42
43  fn provider(&self) -> &'static str {
44    self.inner.name()
45  }
46
47  fn check_slow(&self, op: &'static str, topic: &str, elapsed: Duration) {
48    if let Some(threshold) = self.slow_threshold
49      && elapsed >= threshold
50    {
51      tracing::warn!(
52        target: LOG_TARGET,
53        provider = self.provider(),
54        op,
55        topic,
56        elapsed_us = elapsed.as_micros() as u64,
57        "queue op slow",
58      );
59    }
60  }
61}
62
63#[async_trait]
64impl Queue for LoggingQueue {
65  fn name(&self) -> &'static str {
66    self.inner.name()
67  }
68
69  fn features(&self) -> crate::feature::QueueFeatures {
70    self.inner.features()
71  }
72
73  async fn publish(&self, message: QueueMessage) -> QueueResult<()> {
74    let topic = message.topic.clone();
75    let bytes = message.payload.len();
76    let start = Instant::now();
77    let res = self.inner.publish(message).await;
78    let elapsed = start.elapsed();
79    self.check_slow("publish", &topic, elapsed);
80    let provider = self.provider();
81    match &res {
82      Ok(()) => tracing::debug!(
83        target: LOG_TARGET,
84        provider,
85        op = "publish",
86        topic,
87        bytes,
88        elapsed_us = elapsed.as_micros() as u64,
89        "queue publish",
90      ),
91      Err(e) => tracing::warn!(
92        target: LOG_TARGET,
93        provider,
94        op = "publish",
95        topic,
96        elapsed_us = elapsed.as_micros() as u64,
97        error = %e,
98        "queue error",
99      ),
100    }
101    res
102  }
103
104  async fn receive(&self) -> QueueResult<QueueDelivery> {
105    let start = Instant::now();
106    let res = self.inner.receive().await;
107    let elapsed = start.elapsed();
108    let provider = self.provider();
109    match &res {
110      Ok(delivery) => tracing::debug!(
111        target: LOG_TARGET,
112        provider,
113        op = "receive",
114        topic = delivery.message.topic,
115        bytes = delivery.message.payload.len(),
116        elapsed_us = elapsed.as_micros() as u64,
117        "queue receive",
118      ),
119      Err(e) => tracing::warn!(
120        target: LOG_TARGET,
121        provider,
122        op = "receive",
123        elapsed_us = elapsed.as_micros() as u64,
124        error = %e,
125        "queue error",
126      ),
127    }
128    res
129  }
130
131  async fn ack(&self, receipt: QueueReceipt) -> QueueResult<()> {
132    let start = Instant::now();
133    let res = self.inner.ack(receipt).await;
134    let elapsed = start.elapsed();
135    let provider = self.provider();
136    match &res {
137      Ok(()) => tracing::debug!(
138        target: LOG_TARGET,
139        provider,
140        op = "ack",
141        elapsed_us = elapsed.as_micros() as u64,
142        "queue ack",
143      ),
144      Err(e) => tracing::warn!(
145        target: LOG_TARGET,
146        provider,
147        op = "ack",
148        elapsed_us = elapsed.as_micros() as u64,
149        error = %e,
150        "queue error",
151      ),
152    }
153    res
154  }
155
156  async fn nack(&self, receipt: QueueReceipt, action: NackAction) -> QueueResult<()> {
157    let start = Instant::now();
158    let res = self.inner.nack(receipt, action).await;
159    let elapsed = start.elapsed();
160    let provider = self.provider();
161    match &res {
162      Ok(()) => tracing::debug!(
163        target: LOG_TARGET,
164        provider,
165        op = "nack",
166        action = action.as_str(),
167        elapsed_us = elapsed.as_micros() as u64,
168        "queue nack",
169      ),
170      Err(e) => tracing::warn!(
171        target: LOG_TARGET,
172        provider,
173        op = "nack",
174        action = action.as_str(),
175        elapsed_us = elapsed.as_micros() as u64,
176        error = %e,
177        "queue error",
178      ),
179    }
180    res
181  }
182
183  async fn ping(&self) -> QueueResult<()> {
184    let start = Instant::now();
185    let res = self.inner.ping().await;
186    let elapsed = start.elapsed();
187    let provider = self.provider();
188    match &res {
189      Ok(()) => tracing::trace!(
190        target: LOG_TARGET,
191        provider,
192        op = "ping",
193        elapsed_us = elapsed.as_micros() as u64,
194        "queue ping",
195      ),
196      Err(e) => tracing::warn!(
197        target: LOG_TARGET,
198        provider,
199        op = "ping",
200        error = %e,
201        "queue ping failed",
202      ),
203    }
204    res
205  }
206}
207
208pub fn with_logging(inner: Arc<dyn Queue>) -> Arc<dyn Queue> {
209  Arc::new(LoggingQueue::new(inner))
210}
211
#[cfg(test)]
mod tests {
  use super::*;
  use async_trait::async_trait;
  use bytes::Bytes;
  use std::sync::atomic::{AtomicUsize, Ordering};

  /// Test double that counts `publish` calls and succeeds on everything it
  /// implements.
  #[derive(Debug, Default)]
  struct CountingQueue {
    publishes: AtomicUsize,
  }

  #[async_trait]
  impl Queue for CountingQueue {
    fn name(&self) -> &'static str {
      "counting"
    }

    async fn publish(&self, _: QueueMessage) -> QueueResult<()> {
      self.publishes.fetch_add(1, Ordering::Relaxed);
      Ok(())
    }

    async fn receive(&self) -> QueueResult<QueueDelivery> {
      let message = QueueMessage::new("topic", Bytes::from_static(b"x"));
      let receipt = QueueReceipt::new("r");
      Ok(QueueDelivery::new(message, receipt))
    }

    async fn ack(&self, _: QueueReceipt) -> QueueResult<()> {
      Ok(())
    }
  }

  /// A publish through the decorator must reach the wrapped queue exactly once.
  #[tokio::test]
  async fn decorator_forwards_to_inner() {
    let counter = Arc::new(CountingQueue::default());
    // Method-call `clone` keeps the concrete `Arc<CountingQueue>` type so it
    // can coerce to `Arc<dyn Queue>` at the call site.
    let decorated = with_logging(counter.clone());
    let message = QueueMessage::new("topic", Bytes::from_static(b"v"));
    decorated.publish(message).await.unwrap();
    assert_eq!(counter.publishes.load(Ordering::Relaxed), 1);
  }
}
251}