1use chrono::{DateTime, Utc};
10use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12
13#[derive(Debug, Clone, Serialize, Deserialize)]
15pub struct MessageEnvelope<T> {
16 pub payload: T,
18
19 pub metadata: MessageMetadata,
21}
22
23#[derive(Debug, Clone, Serialize, Deserialize)]
25pub struct MessageMetadata {
26 pub message_id: String,
28
29 pub retry_attempt: u32,
31
32 pub max_retries: u32,
34
35 pub created_at: DateTime<Utc>,
37
38 pub last_processed_at: DateTime<Utc>,
40
41 pub error_history: Vec<ErrorRecord>,
43
44 pub headers: HashMap<String, String>,
46
47 pub source: MessageSource,
49}
50
51#[derive(Debug, Clone, Serialize, Deserialize)]
53pub struct ErrorRecord {
54 pub attempt: u32,
56
57 pub error: String,
59
60 pub occurred_at: DateTime<Utc>,
62
63 pub error_type: ErrorType,
65
66 pub context: Option<String>,
68}
69
70#[derive(Debug, Clone, Serialize, Deserialize)]
72pub enum ErrorType {
73 Transient,
75
76 Permanent,
78
79 Resource,
81
82 Unknown,
84}
85
86#[derive(Debug, Clone, Serialize, Deserialize)]
88pub struct MessageSource {
89 pub queue: String,
91
92 pub exchange: Option<String>,
94
95 pub routing_key: Option<String>,
97
98 pub publisher: Option<String>,
100}
101
102impl<T> MessageEnvelope<T> {
103 pub fn new(payload: T, source_queue: &str) -> Self {
105 let now = Utc::now();
106
107 Self {
108 payload,
109 metadata: MessageMetadata {
110 message_id: uuid::Uuid::new_v4().to_string(),
111 retry_attempt: 0,
112 max_retries: 0, created_at: now,
114 last_processed_at: now,
115 error_history: Vec::new(),
116 headers: HashMap::new(),
117 source: MessageSource {
118 queue: source_queue.to_string(),
119 exchange: None,
120 routing_key: None,
121 publisher: None,
122 },
123 },
124 }
125 }
126
127 pub fn with_source(
129 payload: T,
130 queue: &str,
131 exchange: Option<&str>,
132 routing_key: Option<&str>,
133 publisher: Option<&str>,
134 ) -> Self {
135 let mut envelope = Self::new(payload, queue);
136 envelope.metadata.source.exchange = exchange.map(|s| s.to_string());
137 envelope.metadata.source.routing_key = routing_key.map(|s| s.to_string());
138 envelope.metadata.source.publisher = publisher.map(|s| s.to_string());
139 envelope
140 }
141
142 pub fn with_max_retries(mut self, max_retries: u32) -> Self {
144 self.metadata.max_retries = max_retries;
145 self
146 }
147
148 pub fn with_header(mut self, key: &str, value: &str) -> Self {
150 self.metadata.headers.insert(key.to_string(), value.to_string());
151 self
152 }
153
154 pub fn is_retry_exhausted(&self) -> bool {
156 self.metadata.retry_attempt >= self.metadata.max_retries
157 }
158
159 pub fn is_first_attempt(&self) -> bool {
161 self.metadata.retry_attempt == 0
162 }
163
164 pub fn next_retry_attempt(&self) -> u32 {
166 self.metadata.retry_attempt + 1
167 }
168
169 pub fn with_error(mut self, error: &str, error_type: ErrorType, context: Option<&str>) -> Self {
171 let error_record = ErrorRecord {
173 attempt: self.metadata.retry_attempt,
174 error: error.to_string(),
175 occurred_at: Utc::now(),
176 error_type,
177 context: context.map(|s| s.to_string()),
178 };
179
180 self.metadata.error_history.push(error_record);
181
182 self.metadata.retry_attempt += 1;
184 self.metadata.last_processed_at = Utc::now();
185
186 self
187 }
188
189 pub fn last_error(&self) -> Option<&ErrorRecord> {
191 self.metadata.error_history.last()
192 }
193
194 pub fn errors_by_type(&self, error_type: &ErrorType) -> Vec<&ErrorRecord> {
196 self.metadata.error_history
197 .iter()
198 .filter(|e| std::mem::discriminant(&e.error_type) == std::mem::discriminant(error_type))
199 .collect()
200 }
201
202 pub fn get_failure_summary(&self) -> String {
204 let total_errors = self.metadata.error_history.len();
205 let last_error = self.last_error();
206
207 match last_error {
208 Some(error) => {
209 format!(
210 "Message {} failed after {} attempts. Last error (attempt {}): {} [{}]",
211 self.metadata.message_id,
212 total_errors,
213 error.attempt + 1,
214 error.error,
215 match error.error_type {
216 ErrorType::Transient => "TRANSIENT",
217 ErrorType::Permanent => "PERMANENT",
218 ErrorType::Resource => "RESOURCE",
219 ErrorType::Unknown => "UNKNOWN",
220 }
221 )
222 }
223 None => format!("Message {} has no error history", self.metadata.message_id),
224 }
225 }
226
227 pub fn to_debug_json(&self) -> Result<String, serde_json::Error>
229 where
230 T: Serialize
231 {
232 serde_json::to_string_pretty(self)
233 }
234}
235
#[cfg(test)]
mod tests {
    use super::*;
    use serde::{Deserialize, Serialize};

    /// Minimal serializable payload used to exercise the generic envelope.
    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
    struct TestPayload {
        id: u32,
        name: String,
    }

    /// Builds the fixture payload shared by every test.
    fn sample_payload() -> TestPayload {
        TestPayload {
            id: 123,
            name: "test".to_string(),
        }
    }

    #[test]
    fn test_message_envelope_creation() {
        let payload = sample_payload();

        let envelope = MessageEnvelope::new(payload.clone(), "test_queue").with_max_retries(3);

        assert_eq!(envelope.payload, payload);
        assert_eq!(envelope.metadata.retry_attempt, 0);
        assert_eq!(envelope.metadata.source.queue, "test_queue");
        assert!(envelope.is_first_attempt());
        assert!(!envelope.is_retry_exhausted());
    }

    #[test]
    fn test_error_tracking() {
        let envelope = MessageEnvelope::new(sample_payload(), "test_queue")
            .with_max_retries(3)
            .with_error("First error", ErrorType::Transient, Some("Network timeout"))
            .with_error("Second error", ErrorType::Resource, Some("Rate limited"));

        assert_eq!(envelope.metadata.retry_attempt, 2);
        assert_eq!(envelope.metadata.error_history.len(), 2);
        assert!(!envelope.is_retry_exhausted());

        // Recorded attempts are zero-based, so the second error is attempt 1.
        let last_error = envelope.last_error().unwrap();
        assert_eq!(last_error.error, "Second error");
        assert_eq!(last_error.attempt, 1);
    }

    #[test]
    fn test_retry_exhaustion() {
        let envelope = MessageEnvelope::new(sample_payload(), "test_queue")
            .with_max_retries(2)
            .with_error("Error 1", ErrorType::Transient, None)
            .with_error("Error 2", ErrorType::Transient, None)
            .with_error("Error 3", ErrorType::Permanent, None);

        assert!(envelope.is_retry_exhausted());
        assert_eq!(envelope.next_retry_attempt(), 4);
    }

    #[test]
    fn test_failure_summary() {
        let envelope = MessageEnvelope::new(sample_payload(), "test_queue")
            .with_max_retries(2)
            .with_error("Database connection failed", ErrorType::Transient, Some("Timeout after 5s"))
            .with_error("Invalid data format", ErrorType::Permanent, None);

        let summary = envelope.get_failure_summary();
        assert!(summary.contains("failed after 2 attempts"));
        assert!(summary.contains("Invalid data format"));
        assert!(summary.contains("PERMANENT"));
    }

    #[test]
    fn test_errors_by_type() {
        let envelope = MessageEnvelope::new(sample_payload(), "test_queue")
            .with_max_retries(5)
            .with_error("Error 1", ErrorType::Transient, None)
            .with_error("Error 2", ErrorType::Permanent, None)
            .with_error("Error 3", ErrorType::Transient, None);

        let transient = envelope.errors_by_type(&ErrorType::Transient);
        assert_eq!(transient.len(), 2);
        assert_eq!(transient[0].error, "Error 1");
        assert_eq!(transient[1].error, "Error 3");
        assert!(envelope.errors_by_type(&ErrorType::Unknown).is_empty());
    }

    #[test]
    fn test_with_source_and_header() {
        let envelope = MessageEnvelope::with_source(
            sample_payload(),
            "test_queue",
            Some("test_exchange"),
            None,
            Some("test_publisher"),
        )
        .with_header("trace", "abc");

        let source = &envelope.metadata.source;
        assert_eq!(source.queue, "test_queue");
        assert_eq!(source.exchange.as_deref(), Some("test_exchange"));
        assert!(source.routing_key.is_none());
        assert_eq!(source.publisher.as_deref(), Some("test_publisher"));
        assert_eq!(
            envelope.metadata.headers.get("trace").map(String::as_str),
            Some("abc")
        );
    }

    #[test]
    fn test_debug_json_contains_source_queue() {
        let envelope = MessageEnvelope::new(sample_payload(), "test_queue");

        let json = envelope
            .to_debug_json()
            .expect("envelope with a serializable payload should serialize");
        assert!(json.contains("test_queue"));
    }
}