1use chrono::{DateTime, Utc};
2use serde::{Deserialize, Serialize};
3
4#[derive(Debug, Clone, Serialize, Deserialize)]
7pub struct DeadLetterMessage {
8 pub original_id: String,
10
11 pub original_payload: serde_json::Value,
13
14 pub source_queue: String,
16
17 pub attempt_count: u32,
19
20 pub error_message: String,
22
23 pub dlq_timestamp: DateTime<Utc>,
25
26 pub last_worker_id: Option<String>,
28
29 pub failure_context: serde_json::Value,
31}
32
33impl DeadLetterMessage {
34 pub fn new(
36 original_id: String,
37 original_payload: serde_json::Value,
38 source_queue: String,
39 attempt_count: u32,
40 error_message: String,
41 ) -> Self {
42 Self {
43 original_id,
44 original_payload,
45 source_queue,
46 attempt_count,
47 error_message,
48 dlq_timestamp: Utc::now(),
49 last_worker_id: None,
50 failure_context: serde_json::json!({}),
51 }
52 }
53
54 pub fn with_worker_id(mut self, worker_id: String) -> Self {
56 self.last_worker_id = Some(worker_id);
57 self
58 }
59
60 pub fn with_context(mut self, key: &str, value: serde_json::Value) -> Self {
62 if let serde_json::Value::Object(ref mut map) = self.failure_context {
63 map.insert(key.to_string(), value);
64 }
65 self
66 }
67
68 pub fn to_json(&self) -> Result<String, serde_json::Error> {
70 serde_json::to_string(self)
71 }
72
73 pub fn from_json(json: &str) -> Result<Self, serde_json::Error> {
75 serde_json::from_str(json)
76 }
77}
78
79#[derive(Debug, Clone)]
81pub struct PoisonPillConfig {
82 pub max_failures: u32,
84
85 pub time_window: std::time::Duration,
87
88 pub immediate_dlq: bool,
90}
91
92impl Default for PoisonPillConfig {
93 fn default() -> Self {
94 Self {
95 max_failures: 10,
96 time_window: std::time::Duration::from_secs(3600), immediate_dlq: true,
98 }
99 }
100}
101
102#[derive(Debug)]
104pub struct PoisonPillTracker {
105 config: PoisonPillConfig,
106 failure_counts: std::sync::Mutex<std::collections::HashMap<String, Vec<DateTime<Utc>>>>,
109}
110
111impl PoisonPillTracker {
112 pub fn new(config: PoisonPillConfig) -> Self {
114 Self {
115 config,
116 failure_counts: std::sync::Mutex::new(std::collections::HashMap::new()),
117 }
118 }
119
120 pub fn record_failure(&self, message_id: &str) -> bool {
123 let mut counts = self.failure_counts.lock().unwrap();
124 let now = Utc::now();
125
126 let failures = counts.entry(message_id.to_string()).or_default();
128 failures.push(now);
129
130 let cutoff = now - chrono::Duration::from_std(self.config.time_window).unwrap_or_default();
132 failures.retain(|&t| t > cutoff);
133
134 let is_poison_pill = failures.len() >= self.config.max_failures as usize;
136
137 if is_poison_pill {
138 tracing::warn!(
139 "Poison pill detected for message {}: {} failures in {:?}",
140 message_id,
141 failures.len(),
142 self.config.time_window
143 );
144 }
145
146 is_poison_pill
147 }
148
149 pub fn get_failure_count(&self, message_id: &str) -> usize {
151 let counts = self.failure_counts.lock().unwrap();
152 counts.get(message_id).map(|v| v.len()).unwrap_or(0)
153 }
154
155 pub fn clear(&self, message_id: &str) {
157 let mut counts = self.failure_counts.lock().unwrap();
158 counts.remove(message_id);
159 }
160}
161
162#[cfg(test)]
163mod tests {
164 use super::*;
165 use std::time::Duration;
166
167 #[test]
168 fn test_dead_letter_message_creation() {
169 let dlq_msg = DeadLetterMessage::new(
170 "msg-123".to_string(),
171 serde_json::json!({"data": "test"}),
172 "my-queue".to_string(),
173 5,
174 "Processing failed".to_string(),
175 );
176
177 assert_eq!(dlq_msg.original_id, "msg-123");
178 assert_eq!(dlq_msg.attempt_count, 5);
179 assert_eq!(dlq_msg.source_queue, "my-queue");
180 }
181
182 #[test]
183 fn test_dead_letter_message_serialization() {
184 let dlq_msg = DeadLetterMessage::new(
185 "msg-123".to_string(),
186 serde_json::json!({"data": "test"}),
187 "my-queue".to_string(),
188 5,
189 "Processing failed".to_string(),
190 );
191
192 let json = dlq_msg.to_json().unwrap();
193 let parsed = DeadLetterMessage::from_json(&json).unwrap();
194
195 assert_eq!(parsed.original_id, dlq_msg.original_id);
196 assert_eq!(parsed.attempt_count, dlq_msg.attempt_count);
197 }
198
199 #[test]
200 fn test_poison_pill_detection() {
201 let config = PoisonPillConfig {
202 max_failures: 3,
203 time_window: Duration::from_secs(60),
204 immediate_dlq: true,
205 };
206
207 let tracker = PoisonPillTracker::new(config);
208
209 assert!(!tracker.record_failure("msg-1"));
211 assert!(!tracker.record_failure("msg-1"));
212
213 assert!(tracker.record_failure("msg-1"));
215 }
216}