Skip to main content

foxtive_worker/
dlq.rs

1use chrono::{DateTime, Utc};
2use serde::{Deserialize, Serialize};
3
4/// Represents a message that has been sent to the Dead Letter Queue.
5/// Contains the original message payload along with failure context.
6#[derive(Debug, Clone, Serialize, Deserialize)]
7pub struct DeadLetterMessage {
8    /// Original message ID
9    pub original_id: String,
10    
11    /// Original message payload
12    pub original_payload: serde_json::Value,
13    
14    /// Queue where the message originated
15    pub source_queue: String,
16    
17    /// Number of processing attempts before DLQ
18    pub attempt_count: u32,
19    
20    /// Error that caused the final failure
21    pub error_message: String,
22    
23    /// Timestamp when message was sent to DLQ
24    pub dlq_timestamp: DateTime<Utc>,
25    
26    /// Worker ID that processed the message last
27    pub last_worker_id: Option<String>,
28    
29    /// Additional metadata about the failure
30    pub failure_context: serde_json::Value,
31}
32
33impl DeadLetterMessage {
34    /// Create a new dead letter message from processing context.
35    pub fn new(
36        original_id: String,
37        original_payload: serde_json::Value,
38        source_queue: String,
39        attempt_count: u32,
40        error_message: String,
41    ) -> Self {
42        Self {
43            original_id,
44            original_payload,
45            source_queue,
46            attempt_count,
47            error_message,
48            dlq_timestamp: Utc::now(),
49            last_worker_id: None,
50            failure_context: serde_json::json!({}),
51        }
52    }
53
54    /// Set the worker ID that last processed this message.
55    pub fn with_worker_id(mut self, worker_id: String) -> Self {
56        self.last_worker_id = Some(worker_id);
57        self
58    }
59
60    /// Add additional failure context metadata.
61    pub fn with_context(mut self, key: &str, value: serde_json::Value) -> Self {
62        if let serde_json::Value::Object(ref mut map) = self.failure_context {
63            map.insert(key.to_string(), value);
64        }
65        self
66    }
67
68    /// Convert to JSON string for storage/transmission.
69    pub fn to_json(&self) -> Result<String, serde_json::Error> {
70        serde_json::to_string(self)
71    }
72
73    /// Parse from JSON string.
74    pub fn from_json(json: &str) -> Result<Self, serde_json::Error> {
75        serde_json::from_str(json)
76    }
77}
78
/// Settings that decide when a repeatedly failing message counts as a poison pill.
#[derive(Debug, Clone)]
pub struct PoisonPillConfig {
    /// Number of failures inside `time_window` at which a message is
    /// considered a poison pill.
    pub max_failures: u32,

    /// How far back failures are counted (for example, the last hour).
    pub time_window: std::time::Duration,

    /// Whether a detected poison pill should be sent to the DLQ right away.
    pub immediate_dlq: bool,
}

impl Default for PoisonPillConfig {
    /// Ten failures within one hour, with immediate dead-lettering enabled.
    fn default() -> Self {
        const ONE_HOUR: std::time::Duration = std::time::Duration::from_secs(60 * 60);

        PoisonPillConfig {
            immediate_dlq: true,
            time_window: ONE_HOUR,
            max_failures: 10,
        }
    }
}
101
102/// Tracks message failures to detect poison pills.
103#[derive(Debug)]
104pub struct PoisonPillTracker {
105    config: PoisonPillConfig,
106    // In a production system, this would use Redis or another shared store
107    // For now, we'll use in-memory tracking (per-process)
108    failure_counts: std::sync::Mutex<std::collections::HashMap<String, Vec<DateTime<Utc>>>>,
109}
110
111impl PoisonPillTracker {
112    /// Create a new poison pill tracker with the given configuration.
113    pub fn new(config: PoisonPillConfig) -> Self {
114        Self {
115            config,
116            failure_counts: std::sync::Mutex::new(std::collections::HashMap::new()),
117        }
118    }
119
120    /// Record a failure for a message and check if it's a poison pill.
121    /// Returns true if the message is considered a poison pill.
122    pub fn record_failure(&self, message_id: &str) -> bool {
123        let mut counts = self.failure_counts.lock().unwrap();
124        let now = Utc::now();
125        
126        // Get or create failure timestamps for this message
127        let failures = counts.entry(message_id.to_string()).or_default();
128        failures.push(now);
129        
130        // Remove old failures outside the time window
131        let cutoff = now - chrono::Duration::from_std(self.config.time_window).unwrap_or_default();
132        failures.retain(|&t| t > cutoff);
133        
134        // Check if this exceeds the threshold
135        let is_poison_pill = failures.len() >= self.config.max_failures as usize;
136        
137        if is_poison_pill {
138            tracing::warn!(
139                "Poison pill detected for message {}: {} failures in {:?}",
140                message_id,
141                failures.len(),
142                self.config.time_window
143            );
144        }
145        
146        is_poison_pill
147    }
148
149    /// Get the current failure count for a message.
150    pub fn get_failure_count(&self, message_id: &str) -> usize {
151        let counts = self.failure_counts.lock().unwrap();
152        counts.get(message_id).map(|v| v.len()).unwrap_or(0)
153    }
154
155    /// Clear tracking data for a message (e.g., after successful processing).
156    pub fn clear(&self, message_id: &str) {
157        let mut counts = self.failure_counts.lock().unwrap();
158        counts.remove(message_id);
159    }
160}
161
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Dead letter message shared by the message-level tests.
    fn sample_message() -> DeadLetterMessage {
        DeadLetterMessage::new(
            "msg-123".to_string(),
            serde_json::json!({"data": "test"}),
            "my-queue".to_string(),
            5,
            "Processing failed".to_string(),
        )
    }

    /// Tracker with a 60s window and the given poison-pill threshold.
    fn tracker_with_threshold(max_failures: u32) -> PoisonPillTracker {
        PoisonPillTracker::new(PoisonPillConfig {
            max_failures,
            time_window: Duration::from_secs(60),
            immediate_dlq: true,
        })
    }

    #[test]
    fn test_dead_letter_message_creation() {
        let dlq_msg = sample_message();

        assert_eq!(dlq_msg.original_id, "msg-123");
        assert_eq!(dlq_msg.attempt_count, 5);
        assert_eq!(dlq_msg.source_queue, "my-queue");
        assert_eq!(dlq_msg.error_message, "Processing failed");
        assert!(dlq_msg.last_worker_id.is_none());
        assert_eq!(dlq_msg.failure_context, serde_json::json!({}));
    }

    #[test]
    fn test_dead_letter_message_builders() {
        let dlq_msg = sample_message()
            .with_worker_id("worker-7".to_string())
            .with_context("retryable", serde_json::json!(false))
            .with_context("http_status", serde_json::json!(503));

        assert_eq!(dlq_msg.last_worker_id.as_deref(), Some("worker-7"));
        assert_eq!(
            dlq_msg.failure_context,
            serde_json::json!({"retryable": false, "http_status": 503})
        );
    }

    #[test]
    fn test_dead_letter_message_serialization() {
        let dlq_msg = sample_message()
            .with_worker_id("worker-1".to_string())
            .with_context("reason", serde_json::json!("timeout"));

        let json = dlq_msg.to_json().unwrap();
        let parsed = DeadLetterMessage::from_json(&json).unwrap();

        assert_eq!(parsed.original_id, dlq_msg.original_id);
        assert_eq!(parsed.original_payload, dlq_msg.original_payload);
        assert_eq!(parsed.source_queue, dlq_msg.source_queue);
        assert_eq!(parsed.attempt_count, dlq_msg.attempt_count);
        assert_eq!(parsed.error_message, dlq_msg.error_message);
        assert_eq!(parsed.dlq_timestamp, dlq_msg.dlq_timestamp);
        assert_eq!(parsed.last_worker_id, dlq_msg.last_worker_id);
        assert_eq!(parsed.failure_context, dlq_msg.failure_context);
    }

    #[test]
    fn test_from_json_rejects_invalid_input() {
        assert!(DeadLetterMessage::from_json("not json").is_err());
        // Required fields are missing.
        assert!(DeadLetterMessage::from_json("{}").is_err());
    }

    #[test]
    fn test_default_poison_pill_config() {
        let config = PoisonPillConfig::default();

        assert_eq!(config.max_failures, 10);
        assert_eq!(config.time_window, Duration::from_secs(3600));
        assert!(config.immediate_dlq);
    }

    #[test]
    fn test_poison_pill_detection() {
        let tracker = tracker_with_threshold(3);

        // First two failures should not trigger
        assert!(!tracker.record_failure("msg-1"));
        assert!(!tracker.record_failure("msg-1"));

        // Third failure should trigger poison pill detection
        assert!(tracker.record_failure("msg-1"));
    }

    #[test]
    fn test_failures_are_tracked_per_message() {
        let tracker = tracker_with_threshold(2);

        assert!(!tracker.record_failure("msg-a"));
        assert!(!tracker.record_failure("msg-b"));
        assert_eq!(tracker.get_failure_count("msg-a"), 1);
        assert_eq!(tracker.get_failure_count("msg-b"), 1);
        assert_eq!(tracker.get_failure_count("unknown"), 0);

        // Only msg-a reaches the threshold.
        assert!(tracker.record_failure("msg-a"));
        assert_eq!(tracker.get_failure_count("msg-b"), 1);
    }

    #[test]
    fn test_clear_resets_tracking() {
        let tracker = tracker_with_threshold(2);

        assert!(!tracker.record_failure("msg-1"));
        tracker.clear("msg-1");
        assert_eq!(tracker.get_failure_count("msg-1"), 0);

        // Counting starts over after a clear.
        assert!(!tracker.record_failure("msg-1"));
        assert!(tracker.record_failure("msg-1"));
    }
}