rust_rabbit/
message.rs

1//! Message envelope with metadata for retry tracking and error history
2//!
3//! This module provides a standardized message format that includes:
4//! - Original payload data
5//! - Retry tracking (attempt count, max retries)
6//! - Error history for debugging failed messages
7//! - Timestamps for monitoring and debugging
8
9use chrono::{DateTime, Utc};
10use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12
13/// Simple wire message format for basic publish/consume
14#[derive(Debug, Clone, Serialize, Deserialize)]
15pub struct WireMessage<T> {
16    pub data: T,
17    pub retry_attempt: u32,
18}
19
20/// Message envelope that wraps the actual payload with metadata
21#[derive(Debug, Clone, Serialize, Deserialize)]
22pub struct MessageEnvelope<T> {
23    /// The actual message payload
24    pub payload: T,
25
26    /// Message metadata
27    pub metadata: MessageMetadata,
28}
29
30/// Metadata associated with a message
31#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct MessageMetadata {
33    /// Unique message ID for tracking
34    pub message_id: String,
35
36    /// Current retry attempt (0 = first attempt, 1 = first retry, etc.)
37    pub retry_attempt: u32,
38
39    /// Maximum number of retry attempts allowed
40    pub max_retries: u32,
41
42    /// When the message was first created
43    pub created_at: DateTime<Utc>,
44
45    /// When the message was last processed (updated on each retry)
46    pub last_processed_at: DateTime<Utc>,
47
48    /// History of errors from previous attempts
49    pub error_history: Vec<ErrorRecord>,
50
51    /// Custom headers for additional metadata
52    pub headers: HashMap<String, String>,
53
54    /// Source information (queue, exchange, routing key where message originated)
55    pub source: MessageSource,
56}
57
58/// Record of an error that occurred during message processing
59#[derive(Debug, Clone, Serialize, Deserialize)]
60pub struct ErrorRecord {
61    /// Which attempt this error occurred on
62    pub attempt: u32,
63
64    /// Error message
65    pub error: String,
66
67    /// When the error occurred
68    pub occurred_at: DateTime<Utc>,
69
70    /// Error category for classification
71    pub error_type: ErrorType,
72
73    /// Additional context about the error
74    pub context: Option<String>,
75}
76
77/// Classification of error types for better handling
78#[derive(Debug, Clone, Serialize, Deserialize)]
79pub enum ErrorType {
80    /// Temporary errors that might succeed on retry (network, timeout, etc.)
81    Transient,
82
83    /// Permanent errors that won't succeed on retry (validation, auth, etc.)
84    Permanent,
85
86    /// Resource errors (rate limit, quota exceeded, etc.)
87    Resource,
88
89    /// Unknown error type
90    Unknown,
91}
92
93/// Information about where the message came from
94#[derive(Debug, Clone, Serialize, Deserialize)]
95pub struct MessageSource {
96    /// Original queue name
97    pub queue: String,
98
99    /// Exchange name (if any)
100    pub exchange: Option<String>,
101
102    /// Routing key used
103    pub routing_key: Option<String>,
104
105    /// Application or service that published the message
106    pub publisher: Option<String>,
107}
108
109impl<T> MessageEnvelope<T> {
110    /// Create a new message envelope with the given payload
111    pub fn new(payload: T, source_queue: &str) -> Self {
112        let now = Utc::now();
113
114        Self {
115            payload,
116            metadata: MessageMetadata {
117                message_id: uuid::Uuid::new_v4().to_string(),
118                retry_attempt: 0,
119                max_retries: 0, // Will be set by retry config
120                created_at: now,
121                last_processed_at: now,
122                error_history: Vec::new(),
123                headers: HashMap::new(),
124                source: MessageSource {
125                    queue: source_queue.to_string(),
126                    exchange: None,
127                    routing_key: None,
128                    publisher: None,
129                },
130            },
131        }
132    }
133
134    /// Create envelope with source details
135    pub fn with_source(
136        payload: T,
137        queue: &str,
138        exchange: Option<&str>,
139        routing_key: Option<&str>,
140        publisher: Option<&str>,
141    ) -> Self {
142        let mut envelope = Self::new(payload, queue);
143        envelope.metadata.source.exchange = exchange.map(|s| s.to_string());
144        envelope.metadata.source.routing_key = routing_key.map(|s| s.to_string());
145        envelope.metadata.source.publisher = publisher.map(|s| s.to_string());
146        envelope
147    }
148
149    /// Set the maximum number of retries
150    pub fn with_max_retries(mut self, max_retries: u32) -> Self {
151        self.metadata.max_retries = max_retries;
152        self
153    }
154
155    /// Add a custom header
156    pub fn with_header(mut self, key: &str, value: &str) -> Self {
157        self.metadata
158            .headers
159            .insert(key.to_string(), value.to_string());
160        self
161    }
162
163    /// Check if this message has exceeded its retry limit
164    pub fn is_retry_exhausted(&self) -> bool {
165        self.metadata.retry_attempt >= self.metadata.max_retries
166    }
167
168    /// Check if this is the first attempt (not a retry)
169    pub fn is_first_attempt(&self) -> bool {
170        self.metadata.retry_attempt == 0
171    }
172
173    /// Get the next retry attempt number
174    pub fn next_retry_attempt(&self) -> u32 {
175        self.metadata.retry_attempt + 1
176    }
177
178    /// Record an error and create a new envelope for retry
179    pub fn with_error(mut self, error: &str, error_type: ErrorType, context: Option<&str>) -> Self {
180        // Record the error
181        let error_record = ErrorRecord {
182            attempt: self.metadata.retry_attempt,
183            error: error.to_string(),
184            occurred_at: Utc::now(),
185            error_type,
186            context: context.map(|s| s.to_string()),
187        };
188
189        self.metadata.error_history.push(error_record);
190
191        // Increment retry attempt
192        self.metadata.retry_attempt += 1;
193        self.metadata.last_processed_at = Utc::now();
194
195        self
196    }
197
198    /// Get the last error if any
199    pub fn last_error(&self) -> Option<&ErrorRecord> {
200        self.metadata.error_history.last()
201    }
202
203    /// Get all errors of a specific type
204    pub fn errors_by_type(&self, error_type: &ErrorType) -> Vec<&ErrorRecord> {
205        self.metadata
206            .error_history
207            .iter()
208            .filter(|e| std::mem::discriminant(&e.error_type) == std::mem::discriminant(error_type))
209            .collect()
210    }
211
212    /// Get a summary string for dead letter analysis
213    pub fn get_failure_summary(&self) -> String {
214        let total_errors = self.metadata.error_history.len();
215        let last_error = self.last_error();
216
217        match last_error {
218            Some(error) => {
219                format!(
220                    "Message {} failed after {} attempts. Last error (attempt {}): {} [{}]",
221                    self.metadata.message_id,
222                    total_errors,
223                    error.attempt + 1,
224                    error.error,
225                    match error.error_type {
226                        ErrorType::Transient => "TRANSIENT",
227                        ErrorType::Permanent => "PERMANENT",
228                        ErrorType::Resource => "RESOURCE",
229                        ErrorType::Unknown => "UNKNOWN",
230                    }
231                )
232            }
233            None => format!("Message {} has no error history", self.metadata.message_id),
234        }
235    }
236
237    /// Convert to JSON for debugging
238    pub fn to_debug_json(&self) -> Result<String, serde_json::Error>
239    where
240        T: Serialize,
241    {
242        serde_json::to_string_pretty(self)
243    }
244}
245
#[cfg(test)]
mod tests {
    use super::*;
    use serde::{Deserialize, Serialize};

    /// Small payload type used by the tests in this module.
    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
    struct TestPayload {
        id: u32,
        name: String,
    }

    /// Build the payload shared by every test.
    fn sample_payload() -> TestPayload {
        TestPayload {
            id: 123,
            name: String::from("test"),
        }
    }

    /// A fresh envelope keeps its payload and source queue and starts at attempt 0.
    #[test]
    fn test_message_envelope_creation() {
        let expected = sample_payload();

        // A non-zero limit is needed here: with the default of 0 the envelope
        // would already count as exhausted.
        let envelope = MessageEnvelope::new(expected.clone(), "test_queue").with_max_retries(3);

        assert_eq!(envelope.payload, expected);
        assert_eq!(envelope.metadata.retry_attempt, 0);
        assert_eq!(envelope.metadata.source.queue, "test_queue");
        assert!(envelope.is_first_attempt());
        assert!(!envelope.is_retry_exhausted());
    }

    /// Each recorded error extends the history and bumps the retry counter.
    #[test]
    fn test_error_tracking() {
        let envelope = MessageEnvelope::new(sample_payload(), "test_queue")
            .with_max_retries(3)
            .with_error("First error", ErrorType::Transient, Some("Network timeout"))
            .with_error("Second error", ErrorType::Resource, Some("Rate limited"));

        assert_eq!(envelope.metadata.retry_attempt, 2);
        assert_eq!(envelope.metadata.error_history.len(), 2);
        assert!(!envelope.is_retry_exhausted());

        let most_recent = envelope.last_error().unwrap();
        assert_eq!(most_recent.error, "Second error");
        assert_eq!(most_recent.attempt, 1);
    }

    /// Recording more errors than the limit allows marks the envelope as exhausted.
    #[test]
    fn test_retry_exhaustion() {
        let envelope = MessageEnvelope::new(sample_payload(), "test_queue")
            .with_max_retries(2)
            .with_error("Error 1", ErrorType::Transient, None)
            .with_error("Error 2", ErrorType::Transient, None)
            .with_error("Error 3", ErrorType::Permanent, None);

        assert!(envelope.is_retry_exhausted());
        assert_eq!(envelope.next_retry_attempt(), 4);
    }

    /// The summary reports the error count plus the last error's text and type.
    #[test]
    fn test_failure_summary() {
        let envelope = MessageEnvelope::new(sample_payload(), "test_queue")
            .with_max_retries(2)
            .with_error(
                "Database connection failed",
                ErrorType::Transient,
                Some("Timeout after 5s"),
            )
            .with_error("Invalid data format", ErrorType::Permanent, None);

        let summary = envelope.get_failure_summary();

        for expected_fragment in [
            "failed after 2 attempts",
            "Invalid data format",
            "PERMANENT",
        ]
        .iter()
        {
            assert!(summary.contains(expected_fragment));
        }
    }
}