1use chrono::{DateTime, Utc};
10use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12
13#[derive(Debug, Clone, Serialize, Deserialize)]
15pub struct WireMessage<T> {
16 pub data: T,
17 pub retry_attempt: u32,
18}
19
20#[derive(Debug, Clone, Serialize, Deserialize)]
22pub struct MessageEnvelope<T> {
23 pub payload: T,
25
26 pub metadata: MessageMetadata,
28}
29
30#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct MessageMetadata {
33 pub message_id: String,
35
36 pub retry_attempt: u32,
38
39 pub max_retries: u32,
41
42 pub created_at: DateTime<Utc>,
44
45 pub last_processed_at: DateTime<Utc>,
47
48 pub error_history: Vec<ErrorRecord>,
50
51 pub headers: HashMap<String, String>,
53
54 pub source: MessageSource,
56}
57
58#[derive(Debug, Clone, Serialize, Deserialize)]
60pub struct ErrorRecord {
61 pub attempt: u32,
63
64 pub error: String,
66
67 pub occurred_at: DateTime<Utc>,
69
70 pub error_type: ErrorType,
72
73 pub context: Option<String>,
75}
76
77#[derive(Debug, Clone, Serialize, Deserialize)]
79pub enum ErrorType {
80 Transient,
82
83 Permanent,
85
86 Resource,
88
89 Unknown,
91}
92
93#[derive(Debug, Clone, Serialize, Deserialize)]
95pub struct MessageSource {
96 pub queue: String,
98
99 pub exchange: Option<String>,
101
102 pub routing_key: Option<String>,
104
105 pub publisher: Option<String>,
107}
108
109impl<T> MessageEnvelope<T> {
110 pub fn new(payload: T, source_queue: &str) -> Self {
112 let now = Utc::now();
113
114 Self {
115 payload,
116 metadata: MessageMetadata {
117 message_id: uuid::Uuid::new_v4().to_string(),
118 retry_attempt: 0,
119 max_retries: 0, created_at: now,
121 last_processed_at: now,
122 error_history: Vec::new(),
123 headers: HashMap::new(),
124 source: MessageSource {
125 queue: source_queue.to_string(),
126 exchange: None,
127 routing_key: None,
128 publisher: None,
129 },
130 },
131 }
132 }
133
134 pub fn with_source(
136 payload: T,
137 queue: &str,
138 exchange: Option<&str>,
139 routing_key: Option<&str>,
140 publisher: Option<&str>,
141 ) -> Self {
142 let mut envelope = Self::new(payload, queue);
143 envelope.metadata.source.exchange = exchange.map(|s| s.to_string());
144 envelope.metadata.source.routing_key = routing_key.map(|s| s.to_string());
145 envelope.metadata.source.publisher = publisher.map(|s| s.to_string());
146 envelope
147 }
148
149 pub fn with_max_retries(mut self, max_retries: u32) -> Self {
151 self.metadata.max_retries = max_retries;
152 self
153 }
154
155 pub fn with_header(mut self, key: &str, value: &str) -> Self {
157 self.metadata
158 .headers
159 .insert(key.to_string(), value.to_string());
160 self
161 }
162
163 pub fn is_retry_exhausted(&self) -> bool {
165 self.metadata.retry_attempt >= self.metadata.max_retries
166 }
167
168 pub fn is_first_attempt(&self) -> bool {
170 self.metadata.retry_attempt == 0
171 }
172
173 pub fn next_retry_attempt(&self) -> u32 {
175 self.metadata.retry_attempt + 1
176 }
177
178 pub fn with_error(mut self, error: &str, error_type: ErrorType, context: Option<&str>) -> Self {
180 let error_record = ErrorRecord {
182 attempt: self.metadata.retry_attempt,
183 error: error.to_string(),
184 occurred_at: Utc::now(),
185 error_type,
186 context: context.map(|s| s.to_string()),
187 };
188
189 self.metadata.error_history.push(error_record);
190
191 self.metadata.retry_attempt += 1;
193 self.metadata.last_processed_at = Utc::now();
194
195 self
196 }
197
198 pub fn last_error(&self) -> Option<&ErrorRecord> {
200 self.metadata.error_history.last()
201 }
202
203 pub fn errors_by_type(&self, error_type: &ErrorType) -> Vec<&ErrorRecord> {
205 self.metadata
206 .error_history
207 .iter()
208 .filter(|e| std::mem::discriminant(&e.error_type) == std::mem::discriminant(error_type))
209 .collect()
210 }
211
212 pub fn get_failure_summary(&self) -> String {
214 let total_errors = self.metadata.error_history.len();
215 let last_error = self.last_error();
216
217 match last_error {
218 Some(error) => {
219 format!(
220 "Message {} failed after {} attempts. Last error (attempt {}): {} [{}]",
221 self.metadata.message_id,
222 total_errors,
223 error.attempt + 1,
224 error.error,
225 match error.error_type {
226 ErrorType::Transient => "TRANSIENT",
227 ErrorType::Permanent => "PERMANENT",
228 ErrorType::Resource => "RESOURCE",
229 ErrorType::Unknown => "UNKNOWN",
230 }
231 )
232 }
233 None => format!("Message {} has no error history", self.metadata.message_id),
234 }
235 }
236
237 pub fn to_debug_json(&self) -> Result<String, serde_json::Error>
239 where
240 T: Serialize,
241 {
242 serde_json::to_string_pretty(self)
243 }
244}
245
#[cfg(test)]
mod tests {
    use super::*;
    use serde::{Deserialize, Serialize};

    /// Minimal payload type used to exercise the generic envelope.
    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
    struct TestPayload {
        id: u32,
        name: String,
    }

    /// Builds the payload shared by every test in this module.
    fn sample_payload() -> TestPayload {
        TestPayload {
            id: 123,
            name: "test".to_string(),
        }
    }

    /// A freshly built envelope keeps its payload and starts in the
    /// first-attempt, not-exhausted state.
    #[test]
    fn test_message_envelope_creation() {
        let expected = sample_payload();
        let envelope = MessageEnvelope::new(expected.clone(), "test_queue").with_max_retries(3);

        assert_eq!(envelope.payload, expected);
        assert_eq!(envelope.metadata.retry_attempt, 0);
        assert_eq!(envelope.metadata.source.queue, "test_queue");
        assert!(envelope.is_first_attempt());
        assert!(!envelope.is_retry_exhausted());
    }

    /// Each recorded error extends the history and advances the retry counter.
    #[test]
    fn test_error_tracking() {
        let envelope = MessageEnvelope::new(sample_payload(), "test_queue")
            .with_max_retries(3)
            .with_error("First error", ErrorType::Transient, Some("Network timeout"))
            .with_error("Second error", ErrorType::Resource, Some("Rate limited"));

        let metadata = &envelope.metadata;
        assert_eq!(metadata.retry_attempt, 2);
        assert_eq!(metadata.error_history.len(), 2);
        assert!(!envelope.is_retry_exhausted());

        let newest = envelope.last_error().unwrap();
        assert_eq!(newest.error, "Second error");
        assert_eq!(newest.attempt, 1);
    }

    /// Recording more errors than `max_retries` exhausts the envelope.
    #[test]
    fn test_retry_exhaustion() {
        let envelope = MessageEnvelope::new(sample_payload(), "test_queue")
            .with_max_retries(2)
            .with_error("Error 1", ErrorType::Transient, None)
            .with_error("Error 2", ErrorType::Transient, None)
            .with_error("Error 3", ErrorType::Permanent, None);

        assert!(envelope.is_retry_exhausted());
        assert_eq!(envelope.next_retry_attempt(), 4);
    }

    /// The summary mentions the error count, the last message, and its type.
    #[test]
    fn test_failure_summary() {
        let envelope = MessageEnvelope::new(sample_payload(), "test_queue")
            .with_max_retries(2)
            .with_error(
                "Database connection failed",
                ErrorType::Transient,
                Some("Timeout after 5s"),
            )
            .with_error("Invalid data format", ErrorType::Permanent, None);

        let summary = envelope.get_failure_summary();
        for fragment in ["failed after 2 attempts", "Invalid data format", "PERMANENT"] {
            assert!(summary.contains(fragment));
        }
    }
}