1use chrono::{DateTime, Utc};
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13
14#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct WireMessage<T> {
17 pub data: T,
18 pub retry_attempt: u32,
19}
20
21#[derive(Debug, Clone, Serialize, Deserialize)]
32#[serde(rename_all = "camelCase")]
33pub struct MassTransitEnvelope {
34 #[serde(default)]
36 pub message_id: Option<String>,
37
38 #[serde(default)]
40 pub correlation_id: Option<String>,
41
42 #[serde(default, skip_serializing_if = "Option::is_none")]
44 pub source_address: Option<String>,
45
46 #[serde(default, skip_serializing_if = "Option::is_none")]
48 pub destination_address: Option<String>,
49
50 #[serde(default, skip_serializing_if = "Option::is_none")]
52 pub sent_time: Option<DateTime<Utc>>,
53
54 #[serde(default, skip_serializing_if = "Option::is_none")]
57 pub message_type: Option<Vec<String>>,
58
59 #[serde(default, skip_serializing_if = "Option::is_none")]
61 pub headers: Option<HashMap<String, serde_json::Value>>,
62
63 pub message: serde_json::Value,
66}
67
68impl MassTransitEnvelope {
69 pub fn new<T>(message: &T) -> Result<Self, serde_json::Error>
72 where
73 T: Serialize,
74 {
75 let message_json = serde_json::to_value(message)?;
76
77 Ok(Self {
78 message_id: Some(uuid::Uuid::new_v4().to_string()),
79 correlation_id: None,
80 source_address: None,
81 destination_address: None,
82 sent_time: Some(Utc::now()),
83 message_type: None,
84 headers: None,
85 message: message_json,
86 })
87 }
88
89 fn normalize_message_type(message_type: &str) -> String {
92 if message_type.starts_with("urn:message:") {
93 message_type.to_string()
94 } else {
95 format!("urn:message:{}", message_type)
96 }
97 }
98
99 pub fn with_message_type<T>(message: &T, message_type: &str) -> Result<Self, serde_json::Error>
107 where
108 T: Serialize,
109 {
110 let mut envelope = Self::new(message)?;
111
112 let urn_type = Self::normalize_message_type(message_type);
114
115 envelope.message_type = Some(vec![urn_type.clone()]);
117
118 let mut headers = HashMap::new();
120 let message_type_array = vec![serde_json::Value::String(urn_type)];
121 headers.insert(
122 "MT-Host-MessageType".to_string(),
123 serde_json::Value::Array(message_type_array),
124 );
125 envelope.headers = Some(headers);
126
127 Ok(envelope)
128 }
129
130 pub fn with_correlation_id(mut self, correlation_id: impl Into<String>) -> Self {
132 self.correlation_id = Some(correlation_id.into());
133 self
134 }
135
136 pub fn with_source_address(mut self, source_address: impl Into<String>) -> Self {
138 self.source_address = Some(source_address.into());
139 self
140 }
141
142 pub fn with_destination_address(mut self, destination_address: impl Into<String>) -> Self {
144 self.destination_address = Some(destination_address.into());
145 self
146 }
147
148 pub fn with_header(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
150 if self.headers.is_none() {
151 self.headers = Some(HashMap::new());
152 }
153 if let Some(ref mut headers) = self.headers {
154 headers.insert(key.into(), value);
155 }
156 self
157 }
158
159 pub fn with_message_type_header(mut self, message_type: &str) -> Self {
163 let urn_type = Self::normalize_message_type(message_type);
165
166 self.message_type = Some(vec![urn_type.clone()]);
168
169 if self.headers.is_none() {
171 self.headers = Some(HashMap::new());
172 }
173 if let Some(ref mut headers) = self.headers {
174 let message_type_array = vec![serde_json::Value::String(urn_type)];
175 headers.insert(
176 "MT-Host-MessageType".to_string(),
177 serde_json::Value::Array(message_type_array),
178 );
179 }
180 self
181 }
182
183 pub fn extract_message<T>(&self) -> Result<T, serde_json::Error>
185 where
186 T: for<'de> Deserialize<'de>,
187 {
188 serde_json::from_value(self.message.clone())
189 }
190
191 pub fn correlation_id(&self) -> Option<&str> {
193 self.correlation_id.as_deref()
194 }
195
196 pub fn message_id(&self) -> Option<&str> {
198 self.message_id.as_deref()
199 }
200
201 pub fn from_slice(bytes: &[u8]) -> Result<Self, serde_json::Error> {
203 serde_json::from_slice(bytes)
204 }
205}
206
207#[derive(Debug, Clone, Serialize, Deserialize)]
209pub struct MessageEnvelope<T> {
210 pub payload: T,
212
213 pub metadata: MessageMetadata,
215}
216
217#[derive(Debug, Clone, Serialize, Deserialize)]
219pub struct MessageMetadata {
220 pub message_id: String,
222
223 pub retry_attempt: u32,
225
226 pub max_retries: u32,
228
229 pub created_at: DateTime<Utc>,
231
232 pub last_processed_at: DateTime<Utc>,
234
235 pub error_history: Vec<ErrorRecord>,
237
238 pub headers: HashMap<String, String>,
240
241 pub source: MessageSource,
243}
244
245#[derive(Debug, Clone, Serialize, Deserialize)]
247pub struct ErrorRecord {
248 pub attempt: u32,
250
251 pub error: String,
253
254 pub occurred_at: DateTime<Utc>,
256
257 pub error_type: ErrorType,
259
260 pub context: Option<String>,
262}
263
264#[derive(Debug, Clone, Serialize, Deserialize)]
266pub enum ErrorType {
267 Transient,
269
270 Permanent,
272
273 Resource,
275
276 Unknown,
278}
279
280#[derive(Debug, Clone, Serialize, Deserialize)]
282pub struct MessageSource {
283 pub queue: String,
285
286 pub exchange: Option<String>,
288
289 pub routing_key: Option<String>,
291
292 pub publisher: Option<String>,
294}
295
296impl<T> MessageEnvelope<T> {
297 pub fn new(payload: T, source_queue: &str) -> Self {
299 let now = Utc::now();
300
301 Self {
302 payload,
303 metadata: MessageMetadata {
304 message_id: uuid::Uuid::new_v4().to_string(),
305 retry_attempt: 0,
306 max_retries: 0, created_at: now,
308 last_processed_at: now,
309 error_history: Vec::new(),
310 headers: HashMap::new(),
311 source: MessageSource {
312 queue: source_queue.to_string(),
313 exchange: None,
314 routing_key: None,
315 publisher: None,
316 },
317 },
318 }
319 }
320
321 pub fn with_source(
323 payload: T,
324 queue: &str,
325 exchange: Option<&str>,
326 routing_key: Option<&str>,
327 publisher: Option<&str>,
328 ) -> Self {
329 let mut envelope = Self::new(payload, queue);
330 envelope.metadata.source.exchange = exchange.map(|s| s.to_string());
331 envelope.metadata.source.routing_key = routing_key.map(|s| s.to_string());
332 envelope.metadata.source.publisher = publisher.map(|s| s.to_string());
333 envelope
334 }
335
336 pub fn with_max_retries(mut self, max_retries: u32) -> Self {
338 self.metadata.max_retries = max_retries;
339 self
340 }
341
342 pub fn with_header(mut self, key: &str, value: &str) -> Self {
344 self.metadata
345 .headers
346 .insert(key.to_string(), value.to_string());
347 self
348 }
349
350 pub fn is_retry_exhausted(&self) -> bool {
352 self.metadata.retry_attempt >= self.metadata.max_retries
353 }
354
355 pub fn is_first_attempt(&self) -> bool {
357 self.metadata.retry_attempt == 0
358 }
359
360 pub fn next_retry_attempt(&self) -> u32 {
362 self.metadata.retry_attempt + 1
363 }
364
365 pub fn with_error(mut self, error: &str, error_type: ErrorType, context: Option<&str>) -> Self {
367 let error_record = ErrorRecord {
369 attempt: self.metadata.retry_attempt,
370 error: error.to_string(),
371 occurred_at: Utc::now(),
372 error_type,
373 context: context.map(|s| s.to_string()),
374 };
375
376 self.metadata.error_history.push(error_record);
377
378 self.metadata.retry_attempt += 1;
380 self.metadata.last_processed_at = Utc::now();
381
382 self
383 }
384
385 pub fn last_error(&self) -> Option<&ErrorRecord> {
387 self.metadata.error_history.last()
388 }
389
390 pub fn errors_by_type(&self, error_type: &ErrorType) -> Vec<&ErrorRecord> {
392 self.metadata
393 .error_history
394 .iter()
395 .filter(|e| std::mem::discriminant(&e.error_type) == std::mem::discriminant(error_type))
396 .collect()
397 }
398
399 pub fn get_failure_summary(&self) -> String {
401 let total_errors = self.metadata.error_history.len();
402 let last_error = self.last_error();
403
404 match last_error {
405 Some(error) => {
406 format!(
407 "Message {} failed after {} attempts. Last error (attempt {}): {} [{}]",
408 self.metadata.message_id,
409 total_errors,
410 error.attempt + 1,
411 error.error,
412 match error.error_type {
413 ErrorType::Transient => "TRANSIENT",
414 ErrorType::Permanent => "PERMANENT",
415 ErrorType::Resource => "RESOURCE",
416 ErrorType::Unknown => "UNKNOWN",
417 }
418 )
419 }
420 None => format!("Message {} has no error history", self.metadata.message_id),
421 }
422 }
423
424 pub fn to_debug_json(&self) -> Result<String, serde_json::Error>
426 where
427 T: Serialize,
428 {
429 serde_json::to_string_pretty(self)
430 }
431}
432
433#[cfg(test)]
434mod tests {
435 use super::*;
436 use serde::{Deserialize, Serialize};
437
438 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
439 struct TestPayload {
440 id: u32,
441 name: String,
442 }
443
444 #[test]
445 fn test_message_envelope_creation() {
446 let payload = TestPayload {
447 id: 123,
448 name: "test".to_string(),
449 };
450
451 let envelope = MessageEnvelope::new(payload.clone(), "test_queue").with_max_retries(3); assert_eq!(envelope.payload, payload);
454 assert_eq!(envelope.metadata.retry_attempt, 0);
455 assert_eq!(envelope.metadata.source.queue, "test_queue");
456 assert!(envelope.is_first_attempt());
457 assert!(!envelope.is_retry_exhausted());
458 }
459
460 #[test]
461 fn test_error_tracking() {
462 let payload = TestPayload {
463 id: 123,
464 name: "test".to_string(),
465 };
466
467 let envelope = MessageEnvelope::new(payload, "test_queue")
468 .with_max_retries(3)
469 .with_error("First error", ErrorType::Transient, Some("Network timeout"))
470 .with_error("Second error", ErrorType::Resource, Some("Rate limited"));
471
472 assert_eq!(envelope.metadata.retry_attempt, 2);
473 assert_eq!(envelope.metadata.error_history.len(), 2);
474 assert!(!envelope.is_retry_exhausted());
475
476 let last_error = envelope.last_error().unwrap();
477 assert_eq!(last_error.error, "Second error");
478 assert_eq!(last_error.attempt, 1);
479 }
480
481 #[test]
482 fn test_retry_exhaustion() {
483 let payload = TestPayload {
484 id: 123,
485 name: "test".to_string(),
486 };
487
488 let envelope = MessageEnvelope::new(payload, "test_queue")
489 .with_max_retries(2)
490 .with_error("Error 1", ErrorType::Transient, None)
491 .with_error("Error 2", ErrorType::Transient, None)
492 .with_error("Error 3", ErrorType::Permanent, None);
493
494 assert!(envelope.is_retry_exhausted());
495 assert_eq!(envelope.next_retry_attempt(), 4);
496 }
497
498 #[test]
499 fn test_failure_summary() {
500 let payload = TestPayload {
501 id: 123,
502 name: "test".to_string(),
503 };
504
505 let envelope = MessageEnvelope::new(payload, "test_queue")
506 .with_max_retries(2)
507 .with_error(
508 "Database connection failed",
509 ErrorType::Transient,
510 Some("Timeout after 5s"),
511 )
512 .with_error("Invalid data format", ErrorType::Permanent, None);
513
514 let summary = envelope.get_failure_summary();
515 assert!(summary.contains("failed after 2 attempts"));
516 assert!(summary.contains("Invalid data format"));
517 assert!(summary.contains("PERMANENT"));
518 }
519
520 #[test]
521 fn test_masstransit_envelope_deserialization() {
522 let masstransit_json = r#"{
524 "messageId": "123e4567-e89b-12d3-a456-426614174000",
525 "correlationId": "987fcdeb-51a2-43d7-b890-123456789abc",
526 "sourceAddress": "rabbitmq://localhost/test",
527 "destinationAddress": "rabbitmq://localhost/queue",
528 "message": {
529 "id": 123,
530 "name": "test message"
531 }
532 }"#;
533
534 let envelope: MassTransitEnvelope = serde_json::from_str(masstransit_json).unwrap();
535
536 assert_eq!(
537 envelope.message_id,
538 Some("123e4567-e89b-12d3-a456-426614174000".to_string())
539 );
540 assert_eq!(
541 envelope.correlation_id,
542 Some("987fcdeb-51a2-43d7-b890-123456789abc".to_string())
543 );
544
545 let payload: TestPayload = envelope.extract_message().unwrap();
547 assert_eq!(payload.id, 123);
548 assert_eq!(payload.name, "test message");
549 }
550
551 #[test]
552 fn test_masstransit_envelope_minimal() {
553 let minimal_json = r#"{
555 "message": {
556 "id": 456,
557 "name": "minimal test"
558 }
559 }"#;
560
561 let envelope: MassTransitEnvelope = serde_json::from_str(minimal_json).unwrap();
562 assert_eq!(envelope.message_id, None);
563 assert_eq!(envelope.correlation_id, None);
564
565 let payload: TestPayload = envelope.extract_message().unwrap();
566 assert_eq!(payload.id, 456);
567 assert_eq!(payload.name, "minimal test");
568 }
569
570 #[test]
571 fn test_masstransit_correlation_id_extraction() {
572 let json = r#"{
573 "correlationId": "test-correlation-id",
574 "message": {"id": 1, "name": "test"}
575 }"#;
576
577 let envelope: MassTransitEnvelope = serde_json::from_str(json).unwrap();
578 assert_eq!(envelope.correlation_id(), Some("test-correlation-id"));
579 assert_eq!(envelope.message_id(), None);
580 }
581}