foxtive_worker/middleware/
tracing.rs1use async_trait::async_trait;
2
3use crate::message::ReceivedMessage;
4use crate::middleware::{MessageHandler, Middleware, MiddlewareResult};
5
/// Middleware that logs the start and the outcome of each message passing
/// through it.
///
/// `Debug` is derived instead of hand-written: for this single-field struct
/// the derive renders exactly what a manual `debug_struct` implementation
/// would (`TracingMiddleware { service_name: ".." }`).
#[derive(Debug)]
pub struct TracingMiddleware {
    /// Label included in every log line emitted by this middleware.
    service_name: String,
}
34
35impl TracingMiddleware {
36 pub fn new(service_name: impl Into<String>) -> Self {
41 Self {
42 service_name: service_name.into(),
43 }
44 }
45}
46
47#[async_trait]
48impl Middleware for TracingMiddleware {
49 fn name(&self) -> &str {
50 "tracing"
51 }
52
53 async fn handle(
54 &self,
55 message: ReceivedMessage<serde_json::Value>,
56 next: Box<dyn MessageHandler>,
57 ) -> Result<MiddlewareResult, crate::error::WorkerError> {
58 let message_id = message.message.id.clone();
59 let source = message.message.metadata.source.clone();
60
61 #[cfg(feature = "tracing")]
63 {
64 tracing::info!(
65 service = self.service_name.as_str(),
66 message_id = message_id.as_str(),
67 source = source.as_str(),
68 attempt = message.message.metadata.attempt,
69 "Processing message"
70 );
71 }
72
73 #[cfg(not(feature = "tracing"))]
74 {
75 println!(
76 "[{}] Processing message {} from {} (attempt {})",
77 self.service_name, message_id, source, message.message.metadata.attempt
78 );
79 }
80
81 let start_time = std::time::Instant::now();
82
83 let result = next.handle(message).await;
85
86 let elapsed = start_time.elapsed();
87
88 match &result {
90 Ok(MiddlewareResult::Continue) | Ok(MiddlewareResult::Acknowledged) => {
91 #[cfg(feature = "tracing")]
92 {
93 tracing::info!(
94 service = self.service_name.as_str(),
95 message_id = message_id.as_str(),
96 duration_ms = elapsed.as_millis(),
97 "Message processed successfully"
98 );
99 }
100
101 #[cfg(not(feature = "tracing"))]
102 {
103 println!(
104 "[{}] ✓ Message {} processed in {}ms",
105 self.service_name,
106 message_id,
107 elapsed.as_millis()
108 );
109 }
110 }
111 Err(e) => {
112 #[cfg(feature = "tracing")]
113 {
114 tracing::error!(
115 service = self.service_name.as_str(),
116 message_id = message_id.as_str(),
117 duration_ms = elapsed.as_millis(),
118 error = e.to_string().as_str(),
119 "Message processing failed"
120 );
121 }
122
123 #[cfg(not(feature = "tracing"))]
124 {
125 println!(
126 "[{}] ✗ Message {} failed after {}ms: {}",
127 self.service_name,
128 message_id,
129 elapsed.as_millis(),
130 e
131 );
132 }
133 }
134 }
135
136 result
137 }
138}
139
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    /// Downstream handler that always reports `MiddlewareResult::Continue`.
    struct SuccessHandler;

    #[async_trait]
    impl MessageHandler for SuccessHandler {
        async fn handle(
            &self,
            _message: ReceivedMessage<serde_json::Value>,
        ) -> Result<MiddlewareResult, crate::error::WorkerError> {
            Ok(MiddlewareResult::Continue)
        }
    }

    /// Downstream handler that always fails with `ProcessingFailed("test error")`.
    struct FailureHandler;

    #[async_trait]
    impl MessageHandler for FailureHandler {
        async fn handle(
            &self,
            _message: ReceivedMessage<serde_json::Value>,
        ) -> Result<MiddlewareResult, crate::error::WorkerError> {
            Err(crate::error::WorkerError::ProcessingFailed(
                "test error".to_string(),
            ))
        }
    }

    /// Builds a received message with id `test-1` and an ack handle whose
    /// `ack`/`nack` both succeed without doing anything.
    fn create_test_message() -> ReceivedMessage<serde_json::Value> {
        use crate::message::{AckHandle, Message, MessageMetadata};

        #[derive(Debug)]
        struct MockAckHandle;

        #[async_trait]
        impl AckHandle for MockAckHandle {
            async fn ack(&self) -> crate::WorkerResult<()> {
                Ok(())
            }

            async fn nack(&self, _requeue: bool) -> crate::WorkerResult<()> {
                Ok(())
            }
        }

        let message = Message {
            id: "test-1".to_string(),
            payload: serde_json::json!({"test": "data"}),
            metadata: MessageMetadata::new("test-queue"),
        };
        ReceivedMessage::new(message, Arc::new(MockAckHandle))
    }

    #[tokio::test]
    async fn test_tracing_middleware_success() {
        let middleware = TracingMiddleware::new("test-service");
        let message = create_test_message();

        let result = middleware.handle(message, Box::new(SuccessHandler)).await;

        // The middleware must hand back exactly what the downstream handler
        // produced, not merely "some Ok value".
        assert!(matches!(&result, Ok(MiddlewareResult::Continue)));
    }

    #[tokio::test]
    async fn test_tracing_middleware_failure() {
        let middleware = TracingMiddleware::new("test-service");
        let message = create_test_message();

        let result = middleware.handle(message, Box::new(FailureHandler)).await;

        // The downstream error, including its payload, must pass through
        // unchanged.
        assert!(matches!(
            &result,
            Err(crate::error::WorkerError::ProcessingFailed(reason)) if reason == "test error"
        ));
    }

    // `name` is synchronous, so a plain `#[test]` suffices; no async runtime
    // is needed.
    #[test]
    fn test_tracing_middleware_name() {
        let middleware = TracingMiddleware::new("my-service");
        assert_eq!(middleware.name(), "tracing");
    }
}