rust_rabbit/
message.rs

1//! Message envelope with metadata for retry tracking and error history
2//!
3//! This module provides a standardized message format that includes:
4//! - Original payload data
5//! - Retry tracking (attempt count, max retries)
6//! - Error history for debugging failed messages
7//! - Timestamps for monitoring and debugging
8
9use chrono::{DateTime, Utc};
10use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12
/// Message envelope that wraps the actual payload with metadata.
///
/// `T` is the application-defined payload type. The envelope itself places no
/// bounds on `T`; the derived `Serialize`/`Deserialize` implementations apply
/// when `T` supports them. Both fields are public, so callers may read or
/// adjust the payload and metadata directly.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageEnvelope<T> {
    /// The actual message payload, stored as provided by the caller.
    pub payload: T,

    /// Message metadata: identity, retry counters, timestamps, error history,
    /// custom headers and origin of the message.
    pub metadata: MessageMetadata,
}
22
/// Metadata associated with a message.
///
/// Carries everything except the payload: identity, retry progress,
/// timestamps, accumulated errors, free-form headers and origin information.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageMetadata {
    /// Unique message ID for tracking.
    ///
    /// `MessageEnvelope::new` fills this with a freshly generated UUID v4,
    /// rendered as a string.
    pub message_id: String,

    /// Current retry attempt (0 = first attempt, 1 = first retry, etc.).
    ///
    /// Starts at 0 and is incremented by `MessageEnvelope::with_error` each
    /// time an error is recorded.
    pub retry_attempt: u32,

    /// Maximum number of retry attempts allowed.
    ///
    /// `MessageEnvelope::new` initialises this to 0; it is set through
    /// `MessageEnvelope::with_max_retries`. `is_retry_exhausted` reports
    /// `true` once `retry_attempt >= max_retries`.
    pub max_retries: u32,

    /// When the message was first created (UTC).
    pub created_at: DateTime<Utc>,

    /// When the message was last processed (updated on each retry).
    ///
    /// Equal to `created_at` on a fresh envelope; `MessageEnvelope::with_error`
    /// overwrites it with the current time.
    pub last_processed_at: DateTime<Utc>,

    /// History of errors from previous attempts, oldest first
    /// (`MessageEnvelope::with_error` appends to the end).
    pub error_history: Vec<ErrorRecord>,

    /// Custom headers for additional metadata, as string key/value pairs.
    /// Populated through `MessageEnvelope::with_header`.
    pub headers: HashMap<String, String>,

    /// Source information (queue, exchange, routing key where message originated).
    pub source: MessageSource,
}
50
/// Record of an error that occurred during message processing.
///
/// One record is appended to `MessageMetadata::error_history` for every call
/// to `MessageEnvelope::with_error`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ErrorRecord {
    /// Which attempt this error occurred on.
    ///
    /// Zero-based: it holds the envelope's `retry_attempt` value at the moment
    /// the error was recorded, before that counter is incremented.
    /// `get_failure_summary` prints it as `attempt + 1`.
    pub attempt: u32,

    /// Error message, copied from the text passed to `with_error`.
    pub error: String,

    /// When the error occurred (UTC), captured when the record is created.
    pub occurred_at: DateTime<Utc>,

    /// Error category for classification.
    pub error_type: ErrorType,

    /// Additional context about the error; `None` when the caller supplied none.
    pub context: Option<String>,
}
69
70/// Classification of error types for better handling
71#[derive(Debug, Clone, Serialize, Deserialize)]
72pub enum ErrorType {
73    /// Temporary errors that might succeed on retry (network, timeout, etc.)
74    Transient,
75    
76    /// Permanent errors that won't succeed on retry (validation, auth, etc.)
77    Permanent,
78    
79    /// Resource errors (rate limit, quota exceeded, etc.)
80    Resource,
81    
82    /// Unknown error type
83    Unknown,
84}
85
/// Information about where the message came from.
///
/// Only `queue` is mandatory. `MessageEnvelope::new` leaves the optional
/// fields as `None`; `MessageEnvelope::with_source` fills them from its
/// arguments.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageSource {
    /// Original queue name.
    pub queue: String,

    /// Exchange name (if any).
    pub exchange: Option<String>,

    /// Routing key used, if one was supplied.
    pub routing_key: Option<String>,

    /// Application or service that published the message, if known.
    pub publisher: Option<String>,
}
101
102impl<T> MessageEnvelope<T> {
103    /// Create a new message envelope with the given payload
104    pub fn new(payload: T, source_queue: &str) -> Self {
105        let now = Utc::now();
106        
107        Self {
108            payload,
109            metadata: MessageMetadata {
110                message_id: uuid::Uuid::new_v4().to_string(),
111                retry_attempt: 0,
112                max_retries: 0, // Will be set by retry config
113                created_at: now,
114                last_processed_at: now,
115                error_history: Vec::new(),
116                headers: HashMap::new(),
117                source: MessageSource {
118                    queue: source_queue.to_string(),
119                    exchange: None,
120                    routing_key: None,
121                    publisher: None,
122                },
123            },
124        }
125    }
126    
127    /// Create envelope with source details
128    pub fn with_source(
129        payload: T,
130        queue: &str,
131        exchange: Option<&str>,
132        routing_key: Option<&str>,
133        publisher: Option<&str>,
134    ) -> Self {
135        let mut envelope = Self::new(payload, queue);
136        envelope.metadata.source.exchange = exchange.map(|s| s.to_string());
137        envelope.metadata.source.routing_key = routing_key.map(|s| s.to_string());
138        envelope.metadata.source.publisher = publisher.map(|s| s.to_string());
139        envelope
140    }
141    
142    /// Set the maximum number of retries
143    pub fn with_max_retries(mut self, max_retries: u32) -> Self {
144        self.metadata.max_retries = max_retries;
145        self
146    }
147    
148    /// Add a custom header
149    pub fn with_header(mut self, key: &str, value: &str) -> Self {
150        self.metadata.headers.insert(key.to_string(), value.to_string());
151        self
152    }
153    
154    /// Check if this message has exceeded its retry limit
155    pub fn is_retry_exhausted(&self) -> bool {
156        self.metadata.retry_attempt >= self.metadata.max_retries
157    }
158    
159    /// Check if this is the first attempt (not a retry)
160    pub fn is_first_attempt(&self) -> bool {
161        self.metadata.retry_attempt == 0
162    }
163    
164    /// Get the next retry attempt number
165    pub fn next_retry_attempt(&self) -> u32 {
166        self.metadata.retry_attempt + 1
167    }
168    
169    /// Record an error and create a new envelope for retry
170    pub fn with_error(mut self, error: &str, error_type: ErrorType, context: Option<&str>) -> Self {
171        // Record the error
172        let error_record = ErrorRecord {
173            attempt: self.metadata.retry_attempt,
174            error: error.to_string(),
175            occurred_at: Utc::now(),
176            error_type,
177            context: context.map(|s| s.to_string()),
178        };
179        
180        self.metadata.error_history.push(error_record);
181        
182        // Increment retry attempt
183        self.metadata.retry_attempt += 1;
184        self.metadata.last_processed_at = Utc::now();
185        
186        self
187    }
188    
189    /// Get the last error if any
190    pub fn last_error(&self) -> Option<&ErrorRecord> {
191        self.metadata.error_history.last()
192    }
193    
194    /// Get all errors of a specific type
195    pub fn errors_by_type(&self, error_type: &ErrorType) -> Vec<&ErrorRecord> {
196        self.metadata.error_history
197            .iter()
198            .filter(|e| std::mem::discriminant(&e.error_type) == std::mem::discriminant(error_type))
199            .collect()
200    }
201    
202    /// Get a summary string for dead letter analysis
203    pub fn get_failure_summary(&self) -> String {
204        let total_errors = self.metadata.error_history.len();
205        let last_error = self.last_error();
206        
207        match last_error {
208            Some(error) => {
209                format!(
210                    "Message {} failed after {} attempts. Last error (attempt {}): {} [{}]",
211                    self.metadata.message_id,
212                    total_errors,
213                    error.attempt + 1,
214                    error.error,
215                    match error.error_type {
216                        ErrorType::Transient => "TRANSIENT",
217                        ErrorType::Permanent => "PERMANENT", 
218                        ErrorType::Resource => "RESOURCE",
219                        ErrorType::Unknown => "UNKNOWN",
220                    }
221                )
222            }
223            None => format!("Message {} has no error history", self.metadata.message_id),
224        }
225    }
226    
227    /// Convert to JSON for debugging
228    pub fn to_debug_json(&self) -> Result<String, serde_json::Error> 
229    where 
230        T: Serialize 
231    {
232        serde_json::to_string_pretty(self)
233    }
234}
235
#[cfg(test)]
mod tests {
    use super::*;
    use serde::{Deserialize, Serialize};

    /// Minimal serializable payload shared by every test in this module.
    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
    struct TestPayload {
        id: u32,
        name: String,
    }

    /// Builds the fixture payload used throughout the tests.
    fn sample_payload() -> TestPayload {
        TestPayload {
            id: 123,
            name: String::from("test"),
        }
    }

    /// Wraps the fixture payload in an envelope from `test_queue` with the
    /// given retry budget.
    fn sample_envelope(max_retries: u32) -> MessageEnvelope<TestPayload> {
        MessageEnvelope::new(sample_payload(), "test_queue").with_max_retries(max_retries)
    }

    #[test]
    fn test_message_envelope_creation() {
        // A non-zero retry budget keeps a fresh envelope from counting as exhausted.
        let fresh = sample_envelope(3);

        assert_eq!(fresh.payload, sample_payload());
        assert_eq!(fresh.metadata.retry_attempt, 0);
        assert_eq!(fresh.metadata.source.queue, "test_queue");
        assert!(fresh.is_first_attempt());
        assert!(!fresh.is_retry_exhausted());
    }

    #[test]
    fn test_error_tracking() {
        let failed_twice = sample_envelope(3)
            .with_error("First error", ErrorType::Transient, Some("Network timeout"))
            .with_error("Second error", ErrorType::Resource, Some("Rate limited"));

        let meta = &failed_twice.metadata;
        assert_eq!(meta.retry_attempt, 2);
        assert_eq!(meta.error_history.len(), 2);
        assert!(!failed_twice.is_retry_exhausted());

        // The most recent record carries the zero-based attempt it failed on.
        let newest = failed_twice.last_error().unwrap();
        assert_eq!(newest.error, "Second error");
        assert_eq!(newest.attempt, 1);
    }

    #[test]
    fn test_retry_exhaustion() {
        // Three failures against a budget of two retries.
        let worn_out = sample_envelope(2)
            .with_error("Error 1", ErrorType::Transient, None)
            .with_error("Error 2", ErrorType::Transient, None)
            .with_error("Error 3", ErrorType::Permanent, None);

        assert!(worn_out.is_retry_exhausted());
        assert_eq!(worn_out.next_retry_attempt(), 4);
    }

    #[test]
    fn test_failure_summary() {
        let report = sample_envelope(2)
            .with_error("Database connection failed", ErrorType::Transient, Some("Timeout after 5s"))
            .with_error("Invalid data format", ErrorType::Permanent, None)
            .get_failure_summary();

        for fragment in ["failed after 2 attempts", "Invalid data format", "PERMANENT"] {
            assert!(report.contains(fragment));
        }
    }
}