diidi_travel_common_queue/
decorator.rs1use std::sync::Arc;
2use std::time::{Duration, Instant};
3
4use async_trait::async_trait;
5
6use crate::queue::{NackAction, Queue, QueueDelivery, QueueMessage, QueueReceipt};
7use crate::QueueResult;
8
/// `tracing` target attached to every event emitted by the logging decorator in this module,
/// and the initial value of the decorator's configurable target.
pub const LOG_TARGET: &str = "diidi::queue";
10
/// Decorator around another [`Queue`] that times the operations it forwards and reports them
/// through `tracing`.
///
/// Cloning is cheap: the wrapped queue is shared through an `Arc`.
#[derive(Clone)]
pub struct LoggingQueue {
    /// Wrapped queue that every operation is delegated to.
    inner: Arc<dyn Queue>,
    /// Configured log target; set to [`LOG_TARGET`] by `new` and replaced by `with_target`.
    target: &'static str,
    /// Latency at or above which an operation is reported as slow; `None` disables the check.
    slow_threshold: Option<Duration>,
}
17
18impl std::fmt::Debug for LoggingQueue {
19 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
20 f.debug_struct("LoggingQueue")
21 .field("inner", &self.inner.name())
22 .field("target", &self.target)
23 .field("slow_threshold", &self.slow_threshold)
24 .finish()
25 }
26}
27
28impl LoggingQueue {
29 pub fn new(inner: Arc<dyn Queue>) -> Self {
30 Self { inner, target: LOG_TARGET, slow_threshold: None }
31 }
32
33 pub fn with_target(mut self, target: &'static str) -> Self {
34 self.target = target;
35 self
36 }
37
38 pub fn with_slow_threshold(mut self, threshold: Duration) -> Self {
39 self.slow_threshold = Some(threshold);
40 self
41 }
42
43 fn provider(&self) -> &'static str {
44 self.inner.name()
45 }
46
47 fn check_slow(&self, op: &'static str, topic: &str, elapsed: Duration) {
48 if let Some(threshold) = self.slow_threshold
49 && elapsed >= threshold
50 {
51 tracing::warn!(
52 target: LOG_TARGET,
53 provider = self.provider(),
54 op,
55 topic,
56 elapsed_us = elapsed.as_micros() as u64,
57 "queue op slow",
58 );
59 }
60 }
61}
62
63#[async_trait]
64impl Queue for LoggingQueue {
65 fn name(&self) -> &'static str {
66 self.inner.name()
67 }
68
69 fn features(&self) -> crate::feature::QueueFeatures {
70 self.inner.features()
71 }
72
73 async fn publish(&self, message: QueueMessage) -> QueueResult<()> {
74 let topic = message.topic.clone();
75 let bytes = message.payload.len();
76 let start = Instant::now();
77 let res = self.inner.publish(message).await;
78 let elapsed = start.elapsed();
79 self.check_slow("publish", &topic, elapsed);
80 let provider = self.provider();
81 match &res {
82 Ok(()) => tracing::debug!(
83 target: LOG_TARGET,
84 provider,
85 op = "publish",
86 topic,
87 bytes,
88 elapsed_us = elapsed.as_micros() as u64,
89 "queue publish",
90 ),
91 Err(e) => tracing::warn!(
92 target: LOG_TARGET,
93 provider,
94 op = "publish",
95 topic,
96 elapsed_us = elapsed.as_micros() as u64,
97 error = %e,
98 "queue error",
99 ),
100 }
101 res
102 }
103
104 async fn receive(&self) -> QueueResult<QueueDelivery> {
105 let start = Instant::now();
106 let res = self.inner.receive().await;
107 let elapsed = start.elapsed();
108 let provider = self.provider();
109 match &res {
110 Ok(delivery) => tracing::debug!(
111 target: LOG_TARGET,
112 provider,
113 op = "receive",
114 topic = delivery.message.topic,
115 bytes = delivery.message.payload.len(),
116 elapsed_us = elapsed.as_micros() as u64,
117 "queue receive",
118 ),
119 Err(e) => tracing::warn!(
120 target: LOG_TARGET,
121 provider,
122 op = "receive",
123 elapsed_us = elapsed.as_micros() as u64,
124 error = %e,
125 "queue error",
126 ),
127 }
128 res
129 }
130
131 async fn ack(&self, receipt: QueueReceipt) -> QueueResult<()> {
132 let start = Instant::now();
133 let res = self.inner.ack(receipt).await;
134 let elapsed = start.elapsed();
135 let provider = self.provider();
136 match &res {
137 Ok(()) => tracing::debug!(
138 target: LOG_TARGET,
139 provider,
140 op = "ack",
141 elapsed_us = elapsed.as_micros() as u64,
142 "queue ack",
143 ),
144 Err(e) => tracing::warn!(
145 target: LOG_TARGET,
146 provider,
147 op = "ack",
148 elapsed_us = elapsed.as_micros() as u64,
149 error = %e,
150 "queue error",
151 ),
152 }
153 res
154 }
155
156 async fn nack(&self, receipt: QueueReceipt, action: NackAction) -> QueueResult<()> {
157 let start = Instant::now();
158 let res = self.inner.nack(receipt, action).await;
159 let elapsed = start.elapsed();
160 let provider = self.provider();
161 match &res {
162 Ok(()) => tracing::debug!(
163 target: LOG_TARGET,
164 provider,
165 op = "nack",
166 action = action.as_str(),
167 elapsed_us = elapsed.as_micros() as u64,
168 "queue nack",
169 ),
170 Err(e) => tracing::warn!(
171 target: LOG_TARGET,
172 provider,
173 op = "nack",
174 action = action.as_str(),
175 elapsed_us = elapsed.as_micros() as u64,
176 error = %e,
177 "queue error",
178 ),
179 }
180 res
181 }
182
183 async fn ping(&self) -> QueueResult<()> {
184 let start = Instant::now();
185 let res = self.inner.ping().await;
186 let elapsed = start.elapsed();
187 let provider = self.provider();
188 match &res {
189 Ok(()) => tracing::trace!(
190 target: LOG_TARGET,
191 provider,
192 op = "ping",
193 elapsed_us = elapsed.as_micros() as u64,
194 "queue ping",
195 ),
196 Err(e) => tracing::warn!(
197 target: LOG_TARGET,
198 provider,
199 op = "ping",
200 error = %e,
201 "queue ping failed",
202 ),
203 }
204 res
205 }
206}
207
208pub fn with_logging(inner: Arc<dyn Queue>) -> Arc<dyn Queue> {
209 Arc::new(LoggingQueue::new(inner))
210}
211
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use bytes::Bytes;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Test double that records how many times `publish` was invoked.
    ///
    /// Only `name`, `publish`, `receive` and `ack` are implemented here; the remaining
    /// `Queue` methods presumably fall back to trait defaults (not visible in this file).
    #[derive(Debug, Default)]
    struct CountingQueue {
        publishes: AtomicUsize,
    }

    #[async_trait]
    impl Queue for CountingQueue {
        fn name(&self) -> &'static str {
            "counting"
        }

        async fn publish(&self, _message: QueueMessage) -> QueueResult<()> {
            self.publishes.fetch_add(1, Ordering::Relaxed);
            Ok(())
        }

        async fn receive(&self) -> QueueResult<QueueDelivery> {
            let message = QueueMessage::new("topic", Bytes::from_static(b"x"));
            let receipt = QueueReceipt::new("r");
            Ok(QueueDelivery::new(message, receipt))
        }

        async fn ack(&self, _receipt: QueueReceipt) -> QueueResult<()> {
            Ok(())
        }
    }

    /// A publish through the decorator must reach the wrapped queue exactly once.
    #[tokio::test]
    async fn decorator_forwards_to_inner() {
        let counter = Arc::new(CountingQueue::default());
        let decorated = with_logging(counter.clone());

        let message = QueueMessage::new("topic", Bytes::from_static(b"v"));
        decorated.publish(message).await.unwrap();

        let seen = counter.publishes.load(Ordering::Relaxed);
        assert_eq!(seen, 1);
    }
}