// foxtive_worker/middleware/tracing.rs

use std::time::{Duration, Instant};

use async_trait::async_trait;

use crate::message::ReceivedMessage;
use crate::middleware::{MessageHandler, Middleware, MiddlewareResult};

/// Middleware that logs the start and outcome of every message it handles.
///
/// For each message the middleware emits two records:
/// - on receipt: service name, message ID, source and attempt number
/// - on completion: service name, message ID and processing duration in
///   milliseconds, plus the error text when the downstream handler fails
///
/// When the `tracing` feature is enabled, the records are emitted as
/// structured `tracing` events (`info!` on success, `error!` on failure).
/// Otherwise, they are printed to stdout.
///
/// # Example
/// ```rust,no_run
/// use foxtive_worker::TracingMiddleware;
///
/// let middleware = TracingMiddleware::new("my-service");
/// ```
// `Debug` is derived rather than hand-written: the output is the same as a
// manual `debug_struct` over the single field, and it cannot drift out of
// sync when fields are added.
#[derive(Debug)]
pub struct TracingMiddleware {
    /// Service name attached to every record this middleware emits.
    service_name: String,
}

impl TracingMiddleware {
    /// Create a new tracing middleware.
    ///
    /// # Arguments
    /// * `service_name` - Name of the service for trace identification
    pub fn new(service_name: impl Into<String>) -> Self {
        Self {
            service_name: service_name.into(),
        }
    }
}

47#[async_trait]
48impl Middleware for TracingMiddleware {
49    fn name(&self) -> &str {
50        "tracing"
51    }
52
53    async fn handle(
54        &self,
55        message: ReceivedMessage<serde_json::Value>,
56        next: Box<dyn MessageHandler>,
57    ) -> Result<MiddlewareResult, crate::error::WorkerError> {
58        let message_id = message.message.id.clone();
59        let source = message.message.metadata.source.clone();
60
61        // Log message reception
62        #[cfg(feature = "tracing")]
63        {
64            tracing::info!(
65                service = self.service_name.as_str(),
66                message_id = message_id.as_str(),
67                source = source.as_str(),
68                attempt = message.message.metadata.attempt,
69                "Processing message"
70            );
71        }
72
73        #[cfg(not(feature = "tracing"))]
74        {
75            println!(
76                "[{}] Processing message {} from {} (attempt {})",
77                self.service_name, message_id, source, message.message.metadata.attempt
78            );
79        }
80
81        let start_time = std::time::Instant::now();
82
83        // Process the message
84        let result = next.handle(message).await;
85
86        let elapsed = start_time.elapsed();
87
88        // Log completion
89        match &result {
90            Ok(MiddlewareResult::Continue) | Ok(MiddlewareResult::Acknowledged) => {
91                #[cfg(feature = "tracing")]
92                {
93                    tracing::info!(
94                        service = self.service_name.as_str(),
95                        message_id = message_id.as_str(),
96                        duration_ms = elapsed.as_millis(),
97                        "Message processed successfully"
98                    );
99                }
100
101                #[cfg(not(feature = "tracing"))]
102                {
103                    println!(
104                        "[{}] ✓ Message {} processed in {}ms",
105                        self.service_name,
106                        message_id,
107                        elapsed.as_millis()
108                    );
109                }
110            }
111            Err(e) => {
112                #[cfg(feature = "tracing")]
113                {
114                    tracing::error!(
115                        service = self.service_name.as_str(),
116                        message_id = message_id.as_str(),
117                        duration_ms = elapsed.as_millis(),
118                        error = e.to_string().as_str(),
119                        "Message processing failed"
120                    );
121                }
122
123                #[cfg(not(feature = "tracing"))]
124                {
125                    println!(
126                        "[{}] ✗ Message {} failed after {}ms: {}",
127                        self.service_name,
128                        message_id,
129                        elapsed.as_millis(),
130                        e
131                    );
132                }
133            }
134        }
135
136        result
137    }
138}
139
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    use crate::error::WorkerError;

    /// Downstream handler that always succeeds with `Continue`.
    struct SuccessHandler;

    #[async_trait]
    impl MessageHandler for SuccessHandler {
        async fn handle(
            &self,
            _message: ReceivedMessage<serde_json::Value>,
        ) -> Result<MiddlewareResult, WorkerError> {
            Ok(MiddlewareResult::Continue)
        }
    }

    /// Downstream handler that always succeeds with `Acknowledged`.
    struct AcknowledgedHandler;

    #[async_trait]
    impl MessageHandler for AcknowledgedHandler {
        async fn handle(
            &self,
            _message: ReceivedMessage<serde_json::Value>,
        ) -> Result<MiddlewareResult, WorkerError> {
            Ok(MiddlewareResult::Acknowledged)
        }
    }

    /// Downstream handler that always fails with `ProcessingFailed("test error")`.
    struct FailureHandler;

    #[async_trait]
    impl MessageHandler for FailureHandler {
        async fn handle(
            &self,
            _message: ReceivedMessage<serde_json::Value>,
        ) -> Result<MiddlewareResult, WorkerError> {
            Err(WorkerError::ProcessingFailed("test error".to_string()))
        }
    }

    /// Build a message with a fixed ID and payload whose ack/nack are no-ops.
    fn create_test_message() -> ReceivedMessage<serde_json::Value> {
        use crate::message::{AckHandle, Message, MessageMetadata};

        #[derive(Debug)]
        struct MockAckHandle;

        #[async_trait]
        impl AckHandle for MockAckHandle {
            async fn ack(&self) -> crate::WorkerResult<()> {
                Ok(())
            }

            async fn nack(&self, _requeue: bool) -> crate::WorkerResult<()> {
                Ok(())
            }
        }

        let message = Message {
            id: "test-1".to_string(),
            payload: serde_json::json!({"test": "data"}),
            metadata: MessageMetadata::new("test-queue"),
        };
        ReceivedMessage::new(message, Arc::new(MockAckHandle))
    }

    #[tokio::test]
    async fn test_tracing_middleware_success() {
        let middleware = TracingMiddleware::new("test-service");
        let message = create_test_message();

        let result = middleware.handle(message, Box::new(SuccessHandler)).await;

        // The middleware must pass the handler's result through untouched.
        assert!(matches!(result, Ok(MiddlewareResult::Continue)));
    }

    #[tokio::test]
    async fn test_tracing_middleware_acknowledged() {
        let middleware = TracingMiddleware::new("test-service");
        let message = create_test_message();

        let result = middleware
            .handle(message, Box::new(AcknowledgedHandler))
            .await;

        assert!(matches!(result, Ok(MiddlewareResult::Acknowledged)));
    }

    #[tokio::test]
    async fn test_tracing_middleware_failure() {
        let middleware = TracingMiddleware::new("test-service");
        let message = create_test_message();

        let result = middleware.handle(message, Box::new(FailureHandler)).await;

        // The original error, including its message, must reach the caller.
        assert!(matches!(
            result,
            Err(WorkerError::ProcessingFailed(ref msg)) if msg == "test error"
        ));
    }

    // `name` is synchronous, so no async runtime is needed for this test.
    #[test]
    fn test_tracing_middleware_name() {
        let middleware = TracingMiddleware::new("my-service");
        assert_eq!(middleware.name(), "tracing");
    }
}