rust_rabbit/
message.rs

1//! Message envelope with metadata for retry tracking and error history
2//!
3//! This module provides a standardized message format that includes:
4//! - Original payload data
5//! - Retry tracking (attempt count, max retries)
6//! - Error history for debugging failed messages
7//! - Timestamps for monitoring and debugging
8//! - MassTransit integration support
9
10use chrono::{DateTime, Utc};
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13
14/// Simple wire message format for basic publish/consume
15#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct WireMessage<T> {
17    pub data: T,
18    pub retry_attempt: u32,
19}
20
21/// MassTransit message envelope format (C# camelCase JSON)
22///
23/// This structure matches MassTransit's message envelope format for integration
24/// with C# services using MassTransit's IBus.
25///
26/// MassTransit wraps messages in this format when publishing via IBus.
27/// The actual message payload is in the `message` field.
28///
29/// According to MassTransit documentation, messageType should be an array of URNs
30/// in the format: "urn:message:Namespace:TypeName"
31#[derive(Debug, Clone, Serialize, Deserialize)]
32#[serde(rename_all = "camelCase")]
33pub struct MassTransitEnvelope {
34    /// Auto-generated message ID (Guid in C#)
35    #[serde(default)]
36    pub message_id: Option<String>,
37
38    /// Correlation ID for tracking operations (Guid? in C#)
39    #[serde(default)]
40    pub correlation_id: Option<String>,
41
42    /// Source address (Uri in C#)
43    #[serde(default, skip_serializing_if = "Option::is_none")]
44    pub source_address: Option<String>,
45
46    /// Destination address (Uri in C#)
47    #[serde(default, skip_serializing_if = "Option::is_none")]
48    pub destination_address: Option<String>,
49
50    /// Sent time (DateTime? in C#)
51    #[serde(default, skip_serializing_if = "Option::is_none")]
52    pub sent_time: Option<DateTime<Utc>>,
53
54    /// Message type array - array of URNs in format "urn:message:Namespace:TypeName"
55    /// This is required for MassTransit to properly route messages
56    #[serde(default, skip_serializing_if = "Option::is_none")]
57    pub message_type: Option<Vec<String>>,
58
59    /// Headers (Dictionary<string, object> in C#)
60    #[serde(default, skip_serializing_if = "Option::is_none")]
61    pub headers: Option<HashMap<String, serde_json::Value>>,
62
63    /// The actual message payload (this is what we care about)
64    /// MassTransit wraps the actual message in this field
65    pub message: serde_json::Value,
66}
67
68impl MassTransitEnvelope {
69    /// Create a new MassTransit envelope with a message payload
70    /// This automatically generates a message ID (Guid format) and sets sent_time
71    pub fn new<T>(message: &T) -> Result<Self, serde_json::Error>
72    where
73        T: Serialize,
74    {
75        let message_json = serde_json::to_value(message)?;
76
77        Ok(Self {
78            message_id: Some(uuid::Uuid::new_v4().to_string()),
79            correlation_id: None,
80            source_address: None,
81            destination_address: None,
82            sent_time: Some(Utc::now()),
83            message_type: None,
84            headers: None,
85            message: message_json,
86        })
87    }
88
89    /// Convert message type string to URN format if needed
90    /// Accepts "Namespace:TypeName" or "urn:message:Namespace:TypeName"
91    fn normalize_message_type(message_type: &str) -> String {
92        if message_type.starts_with("urn:message:") {
93            message_type.to_string()
94        } else {
95            format!("urn:message:{}", message_type)
96        }
97    }
98
99    /// Create a MassTransit envelope with message type for proper routing
100    /// MassTransit uses message type names for routing (e.g., "YourNamespace:YourMessageType")
101    /// The message type will be added as an array in the envelope body AND in headers for full compatibility
102    ///
103    /// Message type can be in format:
104    /// - "Namespace:TypeName" (will be converted to "urn:message:Namespace:TypeName")
105    /// - "urn:message:Namespace:TypeName" (used as-is)
106    pub fn with_message_type<T>(message: &T, message_type: &str) -> Result<Self, serde_json::Error>
107    where
108        T: Serialize,
109    {
110        let mut envelope = Self::new(message)?;
111
112        // Normalize message type to URN format
113        let urn_type = Self::normalize_message_type(message_type);
114
115        // Add message type to envelope body (required by MassTransit)
116        envelope.message_type = Some(vec![urn_type.clone()]);
117
118        // Also add to headers for full compatibility (MT-Host-MessageType header)
119        let mut headers = HashMap::new();
120        let message_type_array = vec![serde_json::Value::String(urn_type)];
121        headers.insert(
122            "MT-Host-MessageType".to_string(),
123            serde_json::Value::Array(message_type_array),
124        );
125        envelope.headers = Some(headers);
126
127        Ok(envelope)
128    }
129
130    /// Set correlation ID for tracking operations
131    pub fn with_correlation_id(mut self, correlation_id: impl Into<String>) -> Self {
132        self.correlation_id = Some(correlation_id.into());
133        self
134    }
135
136    /// Set source address (typically "rabbitmq://host/exchange")
137    pub fn with_source_address(mut self, source_address: impl Into<String>) -> Self {
138        self.source_address = Some(source_address.into());
139        self
140    }
141
142    /// Set destination address (typically "rabbitmq://host/queue")
143    pub fn with_destination_address(mut self, destination_address: impl Into<String>) -> Self {
144        self.destination_address = Some(destination_address.into());
145        self
146    }
147
148    /// Add custom header
149    pub fn with_header(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
150        if self.headers.is_none() {
151            self.headers = Some(HashMap::new());
152        }
153        if let Some(ref mut headers) = self.headers {
154            headers.insert(key.into(), value);
155        }
156        self
157    }
158
159    /// Set message type in envelope body and headers (required for MassTransit routing)
160    /// MassTransit expects message types as an array in both envelope body and MT-Host-MessageType header
161    /// Message type can be in format "Namespace:TypeName" or "urn:message:Namespace:TypeName"
162    pub fn with_message_type_header(mut self, message_type: &str) -> Self {
163        // Normalize to URN format
164        let urn_type = Self::normalize_message_type(message_type);
165
166        // Add to envelope body
167        self.message_type = Some(vec![urn_type.clone()]);
168
169        // Add to headers
170        if self.headers.is_none() {
171            self.headers = Some(HashMap::new());
172        }
173        if let Some(ref mut headers) = self.headers {
174            let message_type_array = vec![serde_json::Value::String(urn_type)];
175            headers.insert(
176                "MT-Host-MessageType".to_string(),
177                serde_json::Value::Array(message_type_array),
178            );
179        }
180        self
181    }
182
183    /// Extract the actual message payload as the specified type
184    pub fn extract_message<T>(&self) -> Result<T, serde_json::Error>
185    where
186        T: for<'de> Deserialize<'de>,
187    {
188        serde_json::from_value(self.message.clone())
189    }
190
191    /// Get correlation ID if present
192    pub fn correlation_id(&self) -> Option<&str> {
193        self.correlation_id.as_deref()
194    }
195
196    /// Get message ID if present
197    pub fn message_id(&self) -> Option<&str> {
198        self.message_id.as_deref()
199    }
200
201    /// Try to deserialize a MassTransit envelope from JSON bytes
202    pub fn from_slice(bytes: &[u8]) -> Result<Self, serde_json::Error> {
203        serde_json::from_slice(bytes)
204    }
205}
206
207/// Message envelope that wraps the actual payload with metadata
208#[derive(Debug, Clone, Serialize, Deserialize)]
209pub struct MessageEnvelope<T> {
210    /// The actual message payload
211    pub payload: T,
212
213    /// Message metadata
214    pub metadata: MessageMetadata,
215}
216
217/// Metadata associated with a message
218#[derive(Debug, Clone, Serialize, Deserialize)]
219pub struct MessageMetadata {
220    /// Unique message ID for tracking
221    pub message_id: String,
222
223    /// Current retry attempt (0 = first attempt, 1 = first retry, etc.)
224    pub retry_attempt: u32,
225
226    /// Maximum number of retry attempts allowed
227    pub max_retries: u32,
228
229    /// When the message was first created
230    pub created_at: DateTime<Utc>,
231
232    /// When the message was last processed (updated on each retry)
233    pub last_processed_at: DateTime<Utc>,
234
235    /// History of errors from previous attempts
236    pub error_history: Vec<ErrorRecord>,
237
238    /// Custom headers for additional metadata
239    pub headers: HashMap<String, String>,
240
241    /// Source information (queue, exchange, routing key where message originated)
242    pub source: MessageSource,
243}
244
245/// Record of an error that occurred during message processing
246#[derive(Debug, Clone, Serialize, Deserialize)]
247pub struct ErrorRecord {
248    /// Which attempt this error occurred on
249    pub attempt: u32,
250
251    /// Error message
252    pub error: String,
253
254    /// When the error occurred
255    pub occurred_at: DateTime<Utc>,
256
257    /// Error category for classification
258    pub error_type: ErrorType,
259
260    /// Additional context about the error
261    pub context: Option<String>,
262}
263
264/// Classification of error types for better handling
265#[derive(Debug, Clone, Serialize, Deserialize)]
266pub enum ErrorType {
267    /// Temporary errors that might succeed on retry (network, timeout, etc.)
268    Transient,
269
270    /// Permanent errors that won't succeed on retry (validation, auth, etc.)
271    Permanent,
272
273    /// Resource errors (rate limit, quota exceeded, etc.)
274    Resource,
275
276    /// Unknown error type
277    Unknown,
278}
279
280/// Information about where the message came from
281#[derive(Debug, Clone, Serialize, Deserialize)]
282pub struct MessageSource {
283    /// Original queue name
284    pub queue: String,
285
286    /// Exchange name (if any)
287    pub exchange: Option<String>,
288
289    /// Routing key used
290    pub routing_key: Option<String>,
291
292    /// Application or service that published the message
293    pub publisher: Option<String>,
294}
295
296impl<T> MessageEnvelope<T> {
297    /// Create a new message envelope with the given payload
298    pub fn new(payload: T, source_queue: &str) -> Self {
299        let now = Utc::now();
300
301        Self {
302            payload,
303            metadata: MessageMetadata {
304                message_id: uuid::Uuid::new_v4().to_string(),
305                retry_attempt: 0,
306                max_retries: 0, // Will be set by retry config
307                created_at: now,
308                last_processed_at: now,
309                error_history: Vec::new(),
310                headers: HashMap::new(),
311                source: MessageSource {
312                    queue: source_queue.to_string(),
313                    exchange: None,
314                    routing_key: None,
315                    publisher: None,
316                },
317            },
318        }
319    }
320
321    /// Create envelope with source details
322    pub fn with_source(
323        payload: T,
324        queue: &str,
325        exchange: Option<&str>,
326        routing_key: Option<&str>,
327        publisher: Option<&str>,
328    ) -> Self {
329        let mut envelope = Self::new(payload, queue);
330        envelope.metadata.source.exchange = exchange.map(|s| s.to_string());
331        envelope.metadata.source.routing_key = routing_key.map(|s| s.to_string());
332        envelope.metadata.source.publisher = publisher.map(|s| s.to_string());
333        envelope
334    }
335
336    /// Set the maximum number of retries
337    pub fn with_max_retries(mut self, max_retries: u32) -> Self {
338        self.metadata.max_retries = max_retries;
339        self
340    }
341
342    /// Add a custom header
343    pub fn with_header(mut self, key: &str, value: &str) -> Self {
344        self.metadata
345            .headers
346            .insert(key.to_string(), value.to_string());
347        self
348    }
349
350    /// Check if this message has exceeded its retry limit
351    pub fn is_retry_exhausted(&self) -> bool {
352        self.metadata.retry_attempt >= self.metadata.max_retries
353    }
354
355    /// Check if this is the first attempt (not a retry)
356    pub fn is_first_attempt(&self) -> bool {
357        self.metadata.retry_attempt == 0
358    }
359
360    /// Get the next retry attempt number
361    pub fn next_retry_attempt(&self) -> u32 {
362        self.metadata.retry_attempt + 1
363    }
364
365    /// Record an error and create a new envelope for retry
366    pub fn with_error(mut self, error: &str, error_type: ErrorType, context: Option<&str>) -> Self {
367        // Record the error
368        let error_record = ErrorRecord {
369            attempt: self.metadata.retry_attempt,
370            error: error.to_string(),
371            occurred_at: Utc::now(),
372            error_type,
373            context: context.map(|s| s.to_string()),
374        };
375
376        self.metadata.error_history.push(error_record);
377
378        // Increment retry attempt
379        self.metadata.retry_attempt += 1;
380        self.metadata.last_processed_at = Utc::now();
381
382        self
383    }
384
385    /// Get the last error if any
386    pub fn last_error(&self) -> Option<&ErrorRecord> {
387        self.metadata.error_history.last()
388    }
389
390    /// Get all errors of a specific type
391    pub fn errors_by_type(&self, error_type: &ErrorType) -> Vec<&ErrorRecord> {
392        self.metadata
393            .error_history
394            .iter()
395            .filter(|e| std::mem::discriminant(&e.error_type) == std::mem::discriminant(error_type))
396            .collect()
397    }
398
399    /// Get a summary string for dead letter analysis
400    pub fn get_failure_summary(&self) -> String {
401        let total_errors = self.metadata.error_history.len();
402        let last_error = self.last_error();
403
404        match last_error {
405            Some(error) => {
406                format!(
407                    "Message {} failed after {} attempts. Last error (attempt {}): {} [{}]",
408                    self.metadata.message_id,
409                    total_errors,
410                    error.attempt + 1,
411                    error.error,
412                    match error.error_type {
413                        ErrorType::Transient => "TRANSIENT",
414                        ErrorType::Permanent => "PERMANENT",
415                        ErrorType::Resource => "RESOURCE",
416                        ErrorType::Unknown => "UNKNOWN",
417                    }
418                )
419            }
420            None => format!("Message {} has no error history", self.metadata.message_id),
421        }
422    }
423
424    /// Convert to JSON for debugging
425    pub fn to_debug_json(&self) -> Result<String, serde_json::Error>
426    where
427        T: Serialize,
428    {
429        serde_json::to_string_pretty(self)
430    }
431}
432
433#[cfg(test)]
434mod tests {
435    use super::*;
436    use serde::{Deserialize, Serialize};
437
438    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
439    struct TestPayload {
440        id: u32,
441        name: String,
442    }
443
444    #[test]
445    fn test_message_envelope_creation() {
446        let payload = TestPayload {
447            id: 123,
448            name: "test".to_string(),
449        };
450
451        let envelope = MessageEnvelope::new(payload.clone(), "test_queue").with_max_retries(3); // Set max retries so it's not exhausted initially
452
453        assert_eq!(envelope.payload, payload);
454        assert_eq!(envelope.metadata.retry_attempt, 0);
455        assert_eq!(envelope.metadata.source.queue, "test_queue");
456        assert!(envelope.is_first_attempt());
457        assert!(!envelope.is_retry_exhausted());
458    }
459
460    #[test]
461    fn test_error_tracking() {
462        let payload = TestPayload {
463            id: 123,
464            name: "test".to_string(),
465        };
466
467        let envelope = MessageEnvelope::new(payload, "test_queue")
468            .with_max_retries(3)
469            .with_error("First error", ErrorType::Transient, Some("Network timeout"))
470            .with_error("Second error", ErrorType::Resource, Some("Rate limited"));
471
472        assert_eq!(envelope.metadata.retry_attempt, 2);
473        assert_eq!(envelope.metadata.error_history.len(), 2);
474        assert!(!envelope.is_retry_exhausted());
475
476        let last_error = envelope.last_error().unwrap();
477        assert_eq!(last_error.error, "Second error");
478        assert_eq!(last_error.attempt, 1);
479    }
480
481    #[test]
482    fn test_retry_exhaustion() {
483        let payload = TestPayload {
484            id: 123,
485            name: "test".to_string(),
486        };
487
488        let envelope = MessageEnvelope::new(payload, "test_queue")
489            .with_max_retries(2)
490            .with_error("Error 1", ErrorType::Transient, None)
491            .with_error("Error 2", ErrorType::Transient, None)
492            .with_error("Error 3", ErrorType::Permanent, None);
493
494        assert!(envelope.is_retry_exhausted());
495        assert_eq!(envelope.next_retry_attempt(), 4);
496    }
497
498    #[test]
499    fn test_failure_summary() {
500        let payload = TestPayload {
501            id: 123,
502            name: "test".to_string(),
503        };
504
505        let envelope = MessageEnvelope::new(payload, "test_queue")
506            .with_max_retries(2)
507            .with_error(
508                "Database connection failed",
509                ErrorType::Transient,
510                Some("Timeout after 5s"),
511            )
512            .with_error("Invalid data format", ErrorType::Permanent, None);
513
514        let summary = envelope.get_failure_summary();
515        assert!(summary.contains("failed after 2 attempts"));
516        assert!(summary.contains("Invalid data format"));
517        assert!(summary.contains("PERMANENT"));
518    }
519
520    #[test]
521    fn test_masstransit_envelope_deserialization() {
522        // Simulate MassTransit JSON format (camelCase)
523        let masstransit_json = r#"{
524            "messageId": "123e4567-e89b-12d3-a456-426614174000",
525            "correlationId": "987fcdeb-51a2-43d7-b890-123456789abc",
526            "sourceAddress": "rabbitmq://localhost/test",
527            "destinationAddress": "rabbitmq://localhost/queue",
528            "message": {
529                "id": 123,
530                "name": "test message"
531            }
532        }"#;
533
534        let envelope: MassTransitEnvelope = serde_json::from_str(masstransit_json).unwrap();
535
536        assert_eq!(
537            envelope.message_id,
538            Some("123e4567-e89b-12d3-a456-426614174000".to_string())
539        );
540        assert_eq!(
541            envelope.correlation_id,
542            Some("987fcdeb-51a2-43d7-b890-123456789abc".to_string())
543        );
544
545        // Extract the actual message
546        let payload: TestPayload = envelope.extract_message().unwrap();
547        assert_eq!(payload.id, 123);
548        assert_eq!(payload.name, "test message");
549    }
550
551    #[test]
552    fn test_masstransit_envelope_minimal() {
553        // Minimal MassTransit envelope (only message field)
554        let minimal_json = r#"{
555            "message": {
556                "id": 456,
557                "name": "minimal test"
558            }
559        }"#;
560
561        let envelope: MassTransitEnvelope = serde_json::from_str(minimal_json).unwrap();
562        assert_eq!(envelope.message_id, None);
563        assert_eq!(envelope.correlation_id, None);
564
565        let payload: TestPayload = envelope.extract_message().unwrap();
566        assert_eq!(payload.id, 456);
567        assert_eq!(payload.name, "minimal test");
568    }
569
570    #[test]
571    fn test_masstransit_correlation_id_extraction() {
572        let json = r#"{
573            "correlationId": "test-correlation-id",
574            "message": {"id": 1, "name": "test"}
575        }"#;
576
577        let envelope: MassTransitEnvelope = serde_json::from_str(json).unwrap();
578        assert_eq!(envelope.correlation_id(), Some("test-correlation-id"));
579        assert_eq!(envelope.message_id(), None);
580    }
581}