use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
/// Generic wrapper that pairs a payload with a retry counter.
///
/// Derives serde `Serialize`/`Deserialize` without any renaming, so the
/// fields are (de)serialized under their Rust names `data` and
/// `retry_attempt`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WireMessage<T> {
/// The wrapped payload.
pub data: T,
/// Retry counter carried alongside the payload.
/// NOTE(review): nothing in this file reads or writes this field — confirm
/// with the producer/consumer whether it is zero-based.
pub retry_attempt: u32,
}
/// JSON envelope whose fields are (de)serialized with camelCase names
/// (`messageId`, `correlationId`, ...), as set by `rename_all` below.
///
/// Every field except `message` is optional and marked `#[serde(default)]`,
/// so a missing key deserializes to `None`. Fields that additionally carry
/// `skip_serializing_if = "Option::is_none"` are left out of the serialized
/// output when they are `None`; `message_id` and `correlation_id` do not carry
/// that attribute and are therefore still written when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct MassTransitEnvelope {
/// Identifier of this message; `MassTransitEnvelope::new` fills it with a
/// freshly generated UUID v4 string.
#[serde(default)]
pub message_id: Option<String>,
/// Optional correlation identifier, set via `with_correlation_id`.
#[serde(default)]
pub correlation_id: Option<String>,
/// Optional source address, set via `with_source_address`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub source_address: Option<String>,
/// Optional destination address, set via `with_destination_address`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub destination_address: Option<String>,
/// UTC timestamp; `MassTransitEnvelope::new` sets it to the current time.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub sent_time: Option<DateTime<Utc>>,
/// Message type identifiers. The helpers in this file store a single entry
/// that always starts with `urn:message:`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub message_type: Option<Vec<String>>,
/// Free-form headers keyed by name, with arbitrary JSON values.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub headers: Option<HashMap<String, serde_json::Value>>,
/// The payload as an untyped JSON value; decode it with `extract_message`.
/// It has no `#[serde(default)]`, unlike the other fields.
pub message: serde_json::Value,
}
impl MassTransitEnvelope {
    /// Wraps `message` in a new envelope.
    ///
    /// The payload is converted to a [`serde_json::Value`], `message_id` is
    /// set to a fresh UUID v4 string and `sent_time` to the current UTC time.
    /// All other optional fields start out as `None`.
    ///
    /// # Errors
    ///
    /// Returns the `serde_json` error if `message` cannot be serialized.
    pub fn new<T>(message: &T) -> Result<Self, serde_json::Error>
    where
        T: Serialize,
    {
        // Serialize first so a failing payload returns before an id is generated.
        let message = serde_json::to_value(message)?;
        Ok(Self {
            message_id: Some(uuid::Uuid::new_v4().to_string()),
            correlation_id: None,
            source_address: None,
            destination_address: None,
            sent_time: Some(Utc::now()),
            message_type: None,
            headers: None,
            message,
        })
    }

    /// Returns `message_type` with a `urn:message:` prefix, adding the prefix
    /// only when it is not already present.
    fn normalize_message_type(message_type: &str) -> String {
        if message_type.starts_with("urn:message:") {
            message_type.to_string()
        } else {
            format!("urn:message:{}", message_type)
        }
    }

    /// Creates an envelope like [`Self::new`] and tags it with `message_type`.
    ///
    /// Equivalent to `Self::new(message)?.with_message_type_header(message_type)`:
    /// the normalized URN is stored in `message_type` and mirrored into the
    /// `MT-Host-MessageType` header.
    ///
    /// # Errors
    ///
    /// Returns the `serde_json` error if `message` cannot be serialized.
    pub fn with_message_type<T>(message: &T, message_type: &str) -> Result<Self, serde_json::Error>
    where
        T: Serialize,
    {
        // Delegating keeps the tagging logic in one place.
        Ok(Self::new(message)?.with_message_type_header(message_type))
    }

    /// Sets the correlation id and returns the envelope.
    pub fn with_correlation_id(mut self, correlation_id: impl Into<String>) -> Self {
        self.correlation_id = Some(correlation_id.into());
        self
    }

    /// Sets the source address and returns the envelope.
    pub fn with_source_address(mut self, source_address: impl Into<String>) -> Self {
        self.source_address = Some(source_address.into());
        self
    }

    /// Sets the destination address and returns the envelope.
    pub fn with_destination_address(mut self, destination_address: impl Into<String>) -> Self {
        self.destination_address = Some(destination_address.into());
        self
    }

    /// Inserts (or replaces) a header, creating the header map on first use.
    pub fn with_header(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
        self.headers
            .get_or_insert_with(HashMap::new)
            .insert(key.into(), value);
        self
    }

    /// Tags the envelope with `message_type`.
    ///
    /// The type is normalized to a `urn:message:` URN, stored as the only
    /// entry of `message_type`, and written to the `MT-Host-MessageType`
    /// header as a one-element JSON array. Other existing headers are kept.
    pub fn with_message_type_header(mut self, message_type: &str) -> Self {
        let urn_type = Self::normalize_message_type(message_type);
        self.headers.get_or_insert_with(HashMap::new).insert(
            "MT-Host-MessageType".to_string(),
            serde_json::Value::Array(vec![serde_json::Value::String(urn_type.clone())]),
        );
        self.message_type = Some(vec![urn_type]);
        self
    }

    /// Decodes the JSON payload into `T`.
    ///
    /// # Errors
    ///
    /// Returns the `serde_json` error if the payload does not match `T`.
    pub fn extract_message<T>(&self) -> Result<T, serde_json::Error>
    where
        T: for<'de> Deserialize<'de>,
    {
        // `&serde_json::Value` is itself a `Deserializer`, so the payload can
        // be decoded in place without first deep-cloning the JSON tree.
        T::deserialize(&self.message)
    }

    /// Returns the correlation id, if any, as a borrowed string.
    pub fn correlation_id(&self) -> Option<&str> {
        self.correlation_id.as_deref()
    }

    /// Returns the message id, if any, as a borrowed string.
    pub fn message_id(&self) -> Option<&str> {
        self.message_id.as_deref()
    }

    /// Parses an envelope from JSON bytes.
    ///
    /// # Errors
    ///
    /// Returns the `serde_json` error if `bytes` is not a valid envelope.
    pub fn from_slice(bytes: &[u8]) -> Result<Self, serde_json::Error> {
        serde_json::from_slice(bytes)
    }
}
/// A payload of type `T` bundled with the processing metadata tracked for it
/// (identity, retry counters, error history, headers and origin).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageEnvelope<T> {
/// The wrapped message body.
pub payload: T,
/// Bookkeeping data that travels with the payload.
pub metadata: MessageMetadata,
}
/// Processing metadata attached to a `MessageEnvelope`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageMetadata {
/// Identifier of the message; `MessageEnvelope::new` generates a UUID v4 string.
pub message_id: String,
/// Number of failures recorded so far; starts at 0 and is bumped by
/// `MessageEnvelope::with_error`.
pub retry_attempt: u32,
/// Retry budget compared against `retry_attempt` by `is_retry_exhausted`.
/// `MessageEnvelope::new` initializes it to 0; change it with `with_max_retries`.
pub max_retries: u32,
/// UTC time at which the envelope was created.
pub created_at: DateTime<Utc>,
/// UTC time of the latest update; equals `created_at` on creation and is
/// refreshed by `MessageEnvelope::with_error`.
pub last_processed_at: DateTime<Utc>,
/// One record per `with_error` call, oldest first.
pub error_history: Vec<ErrorRecord>,
/// String key/value headers added through `MessageEnvelope::with_header`.
pub headers: HashMap<String, String>,
/// Where the message came from.
pub source: MessageSource,
}
/// A single failure recorded in `MessageMetadata::error_history`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ErrorRecord {
/// Value of `retry_attempt` when the error was recorded, i.e. zero-based
/// (`get_failure_summary` prints it as `attempt + 1`).
pub attempt: u32,
/// Error message text.
pub error: String,
/// UTC time at which the record was created.
pub occurred_at: DateTime<Utc>,
/// Classification of the failure.
pub error_type: ErrorType,
/// Optional free-form detail supplied by the caller.
pub context: Option<String>,
}
/// Classification attached to each `ErrorRecord`.
///
/// The code in this file only stores, filters on and prints these variants
/// (as `TRANSIENT`, `PERMANENT`, `RESOURCE`, `UNKNOWN` in the failure
/// summary); it does not attach any retry policy to them.
/// NOTE(review): the variant names suggest retryable vs. non-retryable
/// failures — confirm the intended handling with the consumers of this type.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ErrorType {
/// Labelled `TRANSIENT` in the failure summary.
Transient,
/// Labelled `PERMANENT` in the failure summary.
Permanent,
/// Labelled `RESOURCE` in the failure summary.
Resource,
/// Labelled `UNKNOWN` in the failure summary.
Unknown,
}
/// Describes where a message originated.
///
/// Only `queue` is mandatory; the other fields are `None` unless supplied
/// through `MessageEnvelope::with_source`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageSource {
/// Name of the queue the message was taken from.
pub queue: String,
/// Optional exchange name.
pub exchange: Option<String>,
/// Optional routing key.
pub routing_key: Option<String>,
/// Optional identifier of the publisher.
pub publisher: Option<String>,
}
impl<T> MessageEnvelope<T> {
    /// Wraps `payload` in a fresh envelope originating from `source_queue`.
    ///
    /// The envelope gets a new UUID v4 `message_id`, zeroed retry counters, an
    /// empty error history and header map, and identical `created_at` /
    /// `last_processed_at` timestamps.
    ///
    /// `max_retries` starts at 0, so `is_retry_exhausted` reports `true`
    /// until a budget is set with [`Self::with_max_retries`].
    pub fn new(payload: T, source_queue: &str) -> Self {
        let now = Utc::now();
        Self {
            payload,
            metadata: MessageMetadata {
                message_id: uuid::Uuid::new_v4().to_string(),
                retry_attempt: 0,
                max_retries: 0,
                created_at: now,
                last_processed_at: now,
                error_history: Vec::new(),
                headers: HashMap::new(),
                source: MessageSource {
                    queue: source_queue.to_string(),
                    exchange: None,
                    routing_key: None,
                    publisher: None,
                },
            },
        }
    }

    /// Like [`Self::new`], but also records the optional exchange, routing
    /// key and publisher of the message source.
    pub fn with_source(
        payload: T,
        queue: &str,
        exchange: Option<&str>,
        routing_key: Option<&str>,
        publisher: Option<&str>,
    ) -> Self {
        let mut envelope = Self::new(payload, queue);
        let source = &mut envelope.metadata.source;
        source.exchange = exchange.map(str::to_owned);
        source.routing_key = routing_key.map(str::to_owned);
        source.publisher = publisher.map(str::to_owned);
        envelope
    }

    /// Sets the retry budget and returns the envelope.
    pub fn with_max_retries(mut self, max_retries: u32) -> Self {
        self.metadata.max_retries = max_retries;
        self
    }

    /// Inserts (or replaces) a metadata header and returns the envelope.
    pub fn with_header(mut self, key: &str, value: &str) -> Self {
        self.metadata
            .headers
            .insert(key.to_string(), value.to_string());
        self
    }

    /// Returns `true` once `retry_attempt` has reached `max_retries`.
    pub fn is_retry_exhausted(&self) -> bool {
        self.metadata.retry_attempt >= self.metadata.max_retries
    }

    /// Returns `true` while no failure has been recorded (`retry_attempt == 0`).
    pub fn is_first_attempt(&self) -> bool {
        self.metadata.retry_attempt == 0
    }

    /// Returns `retry_attempt + 1`, saturating at `u32::MAX`.
    ///
    /// The counter is a public, deserializable field, so it is not guaranteed
    /// to be small; saturating avoids an overflow panic (debug builds) or a
    /// wrap-around to 0 (release builds).
    pub fn next_retry_attempt(&self) -> u32 {
        self.metadata.retry_attempt.saturating_add(1)
    }

    /// Records a failure and returns the envelope.
    ///
    /// Appends an [`ErrorRecord`] tagged with the current (zero-based)
    /// `retry_attempt`, then increments `retry_attempt` (saturating at
    /// `u32::MAX`) and refreshes `last_processed_at`. The record's
    /// `occurred_at` and `last_processed_at` share one timestamp.
    pub fn with_error(mut self, error: &str, error_type: ErrorType, context: Option<&str>) -> Self {
        // Read the clock once so both timestamps describe the same instant.
        let now = Utc::now();
        let record = ErrorRecord {
            attempt: self.metadata.retry_attempt,
            error: error.to_string(),
            occurred_at: now,
            error_type,
            context: context.map(str::to_owned),
        };
        self.metadata.error_history.push(record);
        self.metadata.retry_attempt = self.metadata.retry_attempt.saturating_add(1);
        self.metadata.last_processed_at = now;
        self
    }

    /// Returns the most recently recorded error, if any.
    pub fn last_error(&self) -> Option<&ErrorRecord> {
        self.metadata.error_history.last()
    }

    /// Returns every recorded error whose type is the same variant as
    /// `error_type`, in recording order.
    pub fn errors_by_type(&self, error_type: &ErrorType) -> Vec<&ErrorRecord> {
        // `ErrorType` does not implement `PartialEq`, so compare variants by
        // discriminant.
        let wanted = std::mem::discriminant(error_type);
        self.metadata
            .error_history
            .iter()
            .filter(|record| std::mem::discriminant(&record.error_type) == wanted)
            .collect()
    }

    /// Builds a one-line, human-readable summary of the failure history.
    ///
    /// Reports the message id, the number of recorded errors, and the last
    /// error with its one-based attempt number and type label. When no error
    /// has been recorded, the summary says so instead.
    pub fn get_failure_summary(&self) -> String {
        match self.last_error() {
            Some(error) => {
                let type_label = match error.error_type {
                    ErrorType::Transient => "TRANSIENT",
                    ErrorType::Permanent => "PERMANENT",
                    ErrorType::Resource => "RESOURCE",
                    ErrorType::Unknown => "UNKNOWN",
                };
                format!(
                    "Message {} failed after {} attempts. Last error (attempt {}): {} [{}]",
                    self.metadata.message_id,
                    self.metadata.error_history.len(),
                    // Stored attempts are zero-based; saturate because the
                    // field may come from deserialized input.
                    error.attempt.saturating_add(1),
                    error.error,
                    type_label
                )
            }
            None => format!("Message {} has no error history", self.metadata.message_id),
        }
    }

    /// Serializes the whole envelope as pretty-printed JSON.
    ///
    /// # Errors
    ///
    /// Returns the `serde_json` error if the payload cannot be serialized.
    pub fn to_debug_json(&self) -> Result<String, serde_json::Error>
    where
        T: Serialize,
    {
        serde_json::to_string_pretty(self)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde::{Deserialize, Serialize};

    /// Minimal payload type used by every test in this module.
    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
    struct TestPayload {
        id: u32,
        name: String,
    }

    /// Builds the payload fixture shared by the `MessageEnvelope` tests.
    fn sample_payload() -> TestPayload {
        TestPayload {
            id: 123,
            name: "test".to_string(),
        }
    }

    // A fresh envelope keeps its payload and starts on the first attempt.
    #[test]
    fn test_message_envelope_creation() {
        let expected = sample_payload();
        let fresh = MessageEnvelope::new(expected.clone(), "test_queue").with_max_retries(3);

        assert_eq!(fresh.payload, expected);
        assert_eq!(fresh.metadata.retry_attempt, 0);
        assert_eq!(fresh.metadata.source.queue, "test_queue");
        assert!(fresh.is_first_attempt());
        assert!(!fresh.is_retry_exhausted());
    }

    // Each recorded error bumps the retry counter and lands in the history.
    #[test]
    fn test_error_tracking() {
        let mut tracked = MessageEnvelope::new(sample_payload(), "test_queue").with_max_retries(3);
        tracked = tracked.with_error("First error", ErrorType::Transient, Some("Network timeout"));
        tracked = tracked.with_error("Second error", ErrorType::Resource, Some("Rate limited"));

        assert_eq!(tracked.metadata.retry_attempt, 2);
        assert_eq!(tracked.metadata.error_history.len(), 2);
        assert!(!tracked.is_retry_exhausted());

        let newest = tracked.last_error().unwrap();
        assert_eq!(newest.error, "Second error");
        assert_eq!(newest.attempt, 1);
    }

    // Recording more errors than the budget allows exhausts the retries.
    #[test]
    fn test_retry_exhaustion() {
        let failures = vec![
            ("Error 1", ErrorType::Transient),
            ("Error 2", ErrorType::Transient),
            ("Error 3", ErrorType::Permanent),
        ];
        let mut exhausted =
            MessageEnvelope::new(sample_payload(), "test_queue").with_max_retries(2);
        for (text, kind) in failures {
            exhausted = exhausted.with_error(text, kind, None);
        }

        assert!(exhausted.is_retry_exhausted());
        assert_eq!(exhausted.next_retry_attempt(), 4);
    }

    // The summary mentions the error count, the last message and its type.
    #[test]
    fn test_failure_summary() {
        let failed = MessageEnvelope::new(sample_payload(), "test_queue")
            .with_max_retries(2)
            .with_error(
                "Database connection failed",
                ErrorType::Transient,
                Some("Timeout after 5s"),
            )
            .with_error("Invalid data format", ErrorType::Permanent, None);

        let report = failed.get_failure_summary();
        for needle in vec!["failed after 2 attempts", "Invalid data format", "PERMANENT"] {
            assert!(report.contains(needle));
        }
    }

    // A fully populated envelope round-trips ids and payload.
    #[test]
    fn test_masstransit_envelope_deserialization() {
        let raw = r#"{
"messageId": "123e4567-e89b-12d3-a456-426614174000",
"correlationId": "987fcdeb-51a2-43d7-b890-123456789abc",
"sourceAddress": "rabbitmq://localhost/test",
"destinationAddress": "rabbitmq://localhost/queue",
"message": {
"id": 123,
"name": "test message"
}
}"#;
        let parsed: MassTransitEnvelope = serde_json::from_str(raw).unwrap();

        assert_eq!(
            parsed.message_id,
            Some("123e4567-e89b-12d3-a456-426614174000".to_string())
        );
        assert_eq!(
            parsed.correlation_id,
            Some("987fcdeb-51a2-43d7-b890-123456789abc".to_string())
        );

        let body: TestPayload = parsed.extract_message().unwrap();
        assert_eq!(body.id, 123);
        assert_eq!(body.name, "test message");
    }

    // Only `message` is present: every optional field falls back to `None`.
    #[test]
    fn test_masstransit_envelope_minimal() {
        let raw = r#"{
"message": {
"id": 456,
"name": "minimal test"
}
}"#;
        let parsed: MassTransitEnvelope = serde_json::from_str(raw).unwrap();

        assert_eq!(parsed.message_id, None);
        assert_eq!(parsed.correlation_id, None);

        let body: TestPayload = parsed.extract_message().unwrap();
        assert_eq!(body.id, 456);
        assert_eq!(body.name, "minimal test");
    }

    // The accessor methods expose the ids as borrowed strings.
    #[test]
    fn test_masstransit_correlation_id_extraction() {
        let raw = r#"{
"correlationId": "test-correlation-id",
"message": {"id": 1, "name": "test"}
}"#;
        let parsed: MassTransitEnvelope = serde_json::from_str(raw).unwrap();

        assert_eq!(parsed.correlation_id(), Some("test-correlation-id"));
        assert_eq!(parsed.message_id(), None);
    }
}