foxtive_worker/middleware/
tracing.rs1use async_trait::async_trait;
2
3use crate::message::ReceivedMessage;
4use crate::middleware::{MessageHandler, Middleware, MiddlewareResult};
5
6pub struct TracingMiddleware {
24 service_name: String,
25}
26
27impl std::fmt::Debug for TracingMiddleware {
28 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
29 f.debug_struct("TracingMiddleware")
30 .field("service_name", &self.service_name)
31 .finish()
32 }
33}
34
35impl TracingMiddleware {
36 pub fn new(service_name: impl Into<String>) -> Self {
41 Self {
42 service_name: service_name.into(),
43 }
44 }
45}
46
47#[async_trait]
48impl Middleware for TracingMiddleware {
49 fn name(&self) -> &str {
50 "tracing"
51 }
52
53 async fn handle(
54 &self,
55 message: ReceivedMessage<serde_json::Value>,
56 next: Box<dyn MessageHandler>,
57 ) -> Result<MiddlewareResult, crate::error::WorkerError> {
58 let message_id = message.message.id.clone();
59 let source = message.message.metadata.source.clone();
60
61 #[cfg(feature = "tracing")]
63 {
64 tracing::info!(
65 service = self.service_name.as_str(),
66 message_id = message_id.as_str(),
67 source = source.as_str(),
68 attempt = message.message.metadata.attempt,
69 "Processing message"
70 );
71 }
72
73 #[cfg(not(feature = "tracing"))]
74 {
75 println!(
76 "[{}] Processing message {} from {} (attempt {})",
77 self.service_name, message_id, source, message.message.metadata.attempt
78 );
79 }
80
81 let start_time = std::time::Instant::now();
82
83 let result = next.handle(message).await;
85
86 let elapsed = start_time.elapsed();
87
88 match &result {
90 Ok(MiddlewareResult::Continue) | Ok(MiddlewareResult::Acknowledged) => {
91 #[cfg(feature = "tracing")]
92 {
93 tracing::info!(
94 service = self.service_name.as_str(),
95 message_id = message_id.as_str(),
96 duration_ms = elapsed.as_millis(),
97 "Message processed successfully"
98 );
99 }
100
101 #[cfg(not(feature = "tracing"))]
102 {
103 println!(
104 "[{}] ✓ Message {} processed in {}ms",
105 self.service_name,
106 message_id,
107 elapsed.as_millis()
108 );
109 }
110 }
111 Err(e) => {
112 #[cfg(feature = "tracing")]
113 {
114 tracing::error!(
115 service = self.service_name.as_str(),
116 message_id = message_id.as_str(),
117 duration_ms = elapsed.as_millis(),
118 error = e.to_string().as_str(),
119 "Message processing failed"
120 );
121 }
122
123 #[cfg(not(feature = "tracing"))]
124 {
125 println!(
126 "[{}] ✗ Message {} failed after {}ms: {}",
127 self.service_name,
128 message_id,
129 elapsed.as_millis(),
130 e
131 );
132 }
133 }
134 }
135
136 result
137 }
138}
139
140#[cfg(test)]
141mod tests {
142 use super::*;
143 use std::sync::Arc;
144
145 struct SuccessHandler;
146
147 #[async_trait]
148 impl MessageHandler for SuccessHandler {
149 async fn handle(
150 &self,
151 _message: ReceivedMessage<serde_json::Value>,
152 ) -> Result<MiddlewareResult, crate::error::WorkerError> {
153 Ok(MiddlewareResult::Continue)
154 }
155 }
156
157 struct FailureHandler;
158
159 #[async_trait]
160 impl MessageHandler for FailureHandler {
161 async fn handle(
162 &self,
163 _message: ReceivedMessage<serde_json::Value>,
164 ) -> Result<MiddlewareResult, crate::error::WorkerError> {
165 Err(crate::error::WorkerError::ProcessingFailed(
166 "test error".to_string(),
167 ))
168 }
169 }
170
171 fn create_test_message() -> ReceivedMessage<serde_json::Value> {
172 use crate::message::{AckHandle, Message, MessageMetadata};
173
174 #[derive(Debug)]
175 struct MockAckHandle;
176
177 #[async_trait]
178 impl AckHandle for MockAckHandle {
179 async fn ack(&self) -> crate::WorkerResult<()> {
180 Ok(())
181 }
182
183 async fn nack(&self, _requeue: bool) -> crate::WorkerResult<()> {
184 Ok(())
185 }
186 }
187
188 let message = Message {
189 id: "test-1".to_string(),
190 payload: serde_json::json!({"test": "data"}),
191 metadata: MessageMetadata::new("test-queue"),
192 };
193 ReceivedMessage::new(message, Arc::new(MockAckHandle))
194 }
195
196 #[tokio::test]
197 async fn test_tracing_middleware_success() {
198 let middleware = TracingMiddleware::new("test-service");
199 let message = create_test_message();
200
201 let result = middleware.handle(message, Box::new(SuccessHandler)).await;
202 assert!(result.is_ok());
203 }
204
205 #[tokio::test]
206 async fn test_tracing_middleware_failure() {
207 let middleware = TracingMiddleware::new("test-service");
208 let message = create_test_message();
209
210 let result = middleware.handle(message, Box::new(FailureHandler)).await;
211 assert!(result.is_err());
212 }
213
214 #[tokio::test]
215 async fn test_tracing_middleware_name() {
216 let middleware = TracingMiddleware::new("my-service");
217 assert_eq!(middleware.name(), "tracing");
218 }
219}