Skip to main content

a3s_event/
dlq.rs

1//! Dead Letter Queue — handle events that exceed max delivery attempts
2//!
3//! Provides a `DlqHandler` trait for routing failed events. This is an
4//! application-level concern — providers handle retry/backoff natively,
5//! but DLQ routing lives above the provider layer.
6
7use crate::error::Result;
8#[cfg(feature = "routing")]
9use crate::sink::EventSink;
10use crate::types::ReceivedEvent;
11use crate::types::now_millis;
12use async_trait::async_trait;
13use std::sync::Arc;
14use tokio::sync::RwLock;
15
16// Event is used by SinkDlqHandler (routing) and tests
17#[cfg(any(feature = "routing", test))]
18use crate::types::Event;
19
20/// A failed event with context about why it ended up in the DLQ
21#[derive(Debug, Clone)]
22pub struct DeadLetterEvent {
23    /// The original received event
24    pub event: ReceivedEvent,
25
26    /// Reason the event was sent to DLQ
27    pub reason: String,
28
29    /// Unix timestamp in milliseconds when the event was dead-lettered
30    pub dead_lettered_at: u64,
31
32    /// Original subject the event was published to (for routing context)
33    pub original_subject: Option<String>,
34
35    /// Number of delivery attempts before dead-lettering
36    pub delivery_attempts: Option<u64>,
37
38    /// Unix timestamp in milliseconds of the first delivery failure
39    pub first_failure_at: Option<u64>,
40}
41
42/// Trait for dead letter queue handlers
43///
44/// Implementations decide what to do with events that exceed
45/// max delivery attempts or fail processing permanently.
46#[async_trait]
47pub trait DlqHandler: Send + Sync {
48    /// Handle a dead-lettered event
49    ///
50    /// Called when an event exceeds max delivery attempts or is
51    /// explicitly rejected. Implementations may log, store, forward,
52    /// or alert on the failed event.
53    async fn handle(&self, event: DeadLetterEvent) -> Result<()>;
54
55    /// Get the number of events currently in the DLQ
56    async fn count(&self) -> Result<usize>;
57
58    /// List recent dead-lettered events
59    async fn list(&self, limit: usize) -> Result<Vec<DeadLetterEvent>>;
60}
61
62/// In-memory DLQ handler for development and testing
63///
64/// Stores dead-lettered events in a `Vec` with configurable max capacity.
65pub struct MemoryDlqHandler {
66    events: Arc<RwLock<Vec<DeadLetterEvent>>>,
67    max_events: usize,
68}
69
70impl MemoryDlqHandler {
71    /// Create a new in-memory DLQ handler
72    pub fn new(max_events: usize) -> Self {
73        Self {
74            events: Arc::new(RwLock::new(Vec::new())),
75            max_events,
76        }
77    }
78}
79
80impl Default for MemoryDlqHandler {
81    fn default() -> Self {
82        Self::new(10_000)
83    }
84}
85
86#[async_trait]
87impl DlqHandler for MemoryDlqHandler {
88    async fn handle(&self, event: DeadLetterEvent) -> Result<()> {
89        tracing::warn!(
90            event_id = %event.event.event.id,
91            subject = %event.event.event.subject,
92            num_delivered = event.event.num_delivered,
93            reason = %event.reason,
94            "Event dead-lettered"
95        );
96
97        let mut events = self.events.write().await;
98        events.push(event);
99
100        // Enforce max capacity
101        if self.max_events > 0 && events.len() > self.max_events {
102            let drain_count = events.len() - self.max_events;
103            events.drain(..drain_count);
104        }
105
106        Ok(())
107    }
108
109    async fn count(&self) -> Result<usize> {
110        let events = self.events.read().await;
111        Ok(events.len())
112    }
113
114    async fn list(&self, limit: usize) -> Result<Vec<DeadLetterEvent>> {
115        let events = self.events.read().await;
116        let result: Vec<DeadLetterEvent> = events.iter().rev().take(limit).cloned().collect();
117        Ok(result)
118    }
119}
120
121/// Check if a received event should be dead-lettered based on max delivery count
122pub fn should_dead_letter(event: &ReceivedEvent, max_deliver: u64) -> bool {
123    max_deliver > 0 && event.num_delivered >= max_deliver
124}
125
126
127impl DeadLetterEvent {
128    /// Create a new dead letter event
129    pub fn new(event: ReceivedEvent, reason: impl Into<String>) -> Self {
130        Self {
131            event,
132            reason: reason.into(),
133            dead_lettered_at: now_millis(),
134            original_subject: None,
135            delivery_attempts: None,
136            first_failure_at: None,
137        }
138    }
139
140    /// Set the original subject
141    pub fn with_original_subject(mut self, subject: impl Into<String>) -> Self {
142        self.original_subject = Some(subject.into());
143        self
144    }
145
146    /// Set the number of delivery attempts
147    pub fn with_delivery_attempts(mut self, attempts: u64) -> Self {
148        self.delivery_attempts = Some(attempts);
149        self
150    }
151
152    /// Set the timestamp of the first failure
153    pub fn with_first_failure_at(mut self, timestamp: u64) -> Self {
154        self.first_failure_at = Some(timestamp);
155        self
156    }
157}
158
/// DLQ handler that forwards dead-lettered events to an EventSink
///
/// Wraps a dead-lettered event as a new `Event` with DLQ metadata
/// and delivers it through the configured sink. Useful for routing
/// failed events to external systems (logging, alerting, reprocessing).
///
/// Successfully forwarded events are also kept in a bounded in-memory
/// FIFO buffer so they can be counted and listed. A `max_events` of `0`
/// disables the bound.
#[cfg(feature = "routing")]
pub struct SinkDlqHandler {
    /// Destination that receives the wrapped DLQ events.
    sink: Arc<dyn EventSink>,
    /// Locally retained events, oldest at the front and newest at the back.
    ///
    /// A `VecDeque` is used so that evicting the oldest entry is O(1);
    /// removing from the front of a `Vec` would shift every remaining
    /// element on each insert once the buffer is full.
    events: Arc<RwLock<std::collections::VecDeque<DeadLetterEvent>>>,
    /// Maximum number of locally retained events; `0` means unlimited.
    max_events: usize,
}

#[cfg(feature = "routing")]
impl SinkDlqHandler {
    /// Create a new sink-based DLQ handler
    ///
    /// `sink` receives every dead-lettered event; `max_events` bounds the
    /// local buffer used by `count` and `list` (`0` = no limit).
    pub fn new(sink: Arc<dyn EventSink>, max_events: usize) -> Self {
        Self {
            sink,
            events: Arc::new(RwLock::new(std::collections::VecDeque::new())),
            max_events,
        }
    }

    /// Convert a dead-lettered event into a regular Event with DLQ metadata
    ///
    /// The new event is published under `events.dlq.<subject of the failed
    /// event>` and carries the failed event's payload. The reason, original
    /// event id and dead-letter timestamp are always attached as metadata;
    /// the optional context fields are attached only when present.
    fn to_dlq_event(dle: &DeadLetterEvent) -> Event {
        let mut event = Event::typed(
            format!("events.dlq.{}", dle.event.event.subject),
            "dlq",
            "a3s.dlq.dead_letter",
            1,
            format!("Dead letter: {}", dle.reason),
            "dlq-handler",
            dle.event.event.payload.clone(),
        )
        .with_metadata("dlq_reason", &dle.reason)
        .with_metadata("dlq_original_id", &dle.event.event.id)
        .with_metadata(
            "dlq_dead_lettered_at",
            dle.dead_lettered_at.to_string(),
        );

        if let Some(ref subj) = dle.original_subject {
            event = event.with_metadata("dlq_original_subject", subj);
        }
        if let Some(attempts) = dle.delivery_attempts {
            event = event.with_metadata("dlq_delivery_attempts", attempts.to_string());
        }
        if let Some(first_fail) = dle.first_failure_at {
            event = event.with_metadata("dlq_first_failure_at", first_fail.to_string());
        }

        event
    }
}

#[cfg(feature = "routing")]
#[async_trait]
impl DlqHandler for SinkDlqHandler {
    /// Forward the event to the sink, then retain it locally.
    ///
    /// If the sink delivery fails, the error is returned immediately and
    /// the event is NOT added to the local buffer.
    async fn handle(&self, event: DeadLetterEvent) -> Result<()> {
        // Forward to sink as a regular event
        let dlq_event = Self::to_dlq_event(&event);
        self.sink.deliver(&dlq_event).await?;

        // Also store locally for listing
        let mut events = self.events.write().await;
        events.push_back(event);

        // Enforce max capacity (0 = unlimited) by dropping from the oldest end.
        if self.max_events > 0 {
            while events.len() > self.max_events {
                events.pop_front();
            }
        }

        Ok(())
    }

    /// Number of events currently retained locally.
    async fn count(&self) -> Result<usize> {
        let events = self.events.read().await;
        Ok(events.len())
    }

    /// Up to `limit` locally retained events, most recently handled first.
    async fn list(&self, limit: usize) -> Result<Vec<DeadLetterEvent>> {
        let events = self.events.read().await;
        let result: Vec<DeadLetterEvent> = events.iter().rev().take(limit).cloned().collect();
        Ok(result)
    }
}
245
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a `ReceivedEvent` on subject `events.test.a` with the given delivery count.
    fn test_received_event(num_delivered: u64) -> ReceivedEvent {
        let event = Event::new(
            "events.test.a",
            "test",
            "Test event",
            "test",
            serde_json::json!({}),
        );
        ReceivedEvent {
            event,
            sequence: 1,
            num_delivered,
            stream: "test".to_string(),
        }
    }

    /// Dead-letter `count` events into `dlq`, with reasons `"<prefix> 0"`, `"<prefix> 1"`, ...
    async fn dead_letter_n<H: DlqHandler>(dlq: &H, count: usize, prefix: &str) {
        for i in 0..count {
            let dle = DeadLetterEvent::new(test_received_event(1), format!("{} {}", prefix, i));
            dlq.handle(dle).await.unwrap();
        }
    }

    #[test]
    fn test_should_dead_letter() {
        let cases = [(1, false), (4, false), (5, true), (10, true)];
        for (delivered, expected) in cases {
            assert_eq!(
                should_dead_letter(&test_received_event(delivered), 5),
                expected
            );
        }
    }

    #[test]
    fn test_should_dead_letter_zero_max() {
        // A max_deliver of 0 means there is no delivery limit
        let heavily_retried = test_received_event(100);
        assert!(!should_dead_letter(&heavily_retried, 0));
    }

    #[test]
    fn test_dead_letter_event_creation() {
        let original = test_received_event(5);
        let expected_id = original.event.id.clone();

        let dle = DeadLetterEvent::new(original, "Max retries exceeded");

        assert_eq!(dle.reason, "Max retries exceeded");
        assert_eq!(dle.event.event.id, expected_id);
        assert!(dle.dead_lettered_at > 0);
    }

    #[tokio::test]
    async fn test_memory_dlq_handle_and_count() {
        let handler = MemoryDlqHandler::default();
        assert_eq!(handler.count().await.unwrap(), 0);

        handler
            .handle(DeadLetterEvent::new(test_received_event(5), "failed"))
            .await
            .unwrap();

        assert_eq!(handler.count().await.unwrap(), 1);
    }

    #[tokio::test]
    async fn test_memory_dlq_list() {
        let handler = MemoryDlqHandler::default();

        for i in 0..5 {
            let received = ReceivedEvent {
                sequence: i,
                ..test_received_event(3)
            };
            handler
                .handle(DeadLetterEvent::new(received, format!("reason {}", i)))
                .await
                .unwrap();
        }

        let recent = handler.list(3).await.unwrap();
        assert_eq!(recent.len(), 3);
        // Newest entry comes first
        assert_eq!(recent[0].reason, "reason 4");
        assert_eq!(recent[2].reason, "reason 2");
    }

    #[tokio::test]
    async fn test_memory_dlq_max_capacity() {
        let handler = MemoryDlqHandler::new(3);
        dead_letter_n(&handler, 5, "reason").await;

        assert_eq!(handler.count().await.unwrap(), 3);

        // The two oldest events have been evicted
        let remaining = handler.list(10).await.unwrap();
        assert_eq!(remaining[0].reason, "reason 4");
        assert_eq!(remaining[2].reason, "reason 2");
    }

    #[test]
    fn test_dead_letter_event_builder_methods() {
        let dle = DeadLetterEvent::new(test_received_event(5), "timeout")
            .with_original_subject("events.payment.process")
            .with_delivery_attempts(5)
            .with_first_failure_at(1700000000000);

        assert_eq!(dle.original_subject.as_deref(), Some("events.payment.process"));
        assert_eq!(dle.delivery_attempts, Some(5));
        assert_eq!(dle.first_failure_at, Some(1700000000000));
    }

    #[test]
    fn test_dead_letter_event_optional_fields_default_none() {
        let dle = DeadLetterEvent::new(test_received_event(3), "failed");

        assert!(dle.original_subject.is_none());
        assert!(dle.delivery_attempts.is_none());
        assert!(dle.first_failure_at.is_none());
    }

    #[cfg(feature = "routing")]
    #[tokio::test]
    async fn test_sink_dlq_handler() {
        use crate::sink::CollectorSink;

        let collector = Arc::new(CollectorSink::new("dlq-collector"));
        let handler = SinkDlqHandler::new(collector.clone(), 100);

        let dle = DeadLetterEvent::new(test_received_event(5), "processing error")
            .with_original_subject("events.order.process")
            .with_delivery_attempts(5);
        handler.handle(dle).await.unwrap();

        assert_eq!(handler.count().await.unwrap(), 1);

        // The sink must have received exactly one wrapped DLQ event
        let forwarded = collector.events().await;
        assert_eq!(forwarded.len(), 1);

        let first = &forwarded[0];
        assert_eq!(first.event_type, "a3s.dlq.dead_letter");
        assert_eq!(first.category, "dlq");
        assert_eq!(first.metadata["dlq_reason"], "processing error");
        assert_eq!(
            first.metadata["dlq_original_subject"],
            "events.order.process"
        );
        assert_eq!(first.metadata["dlq_delivery_attempts"], "5");
    }

    #[cfg(feature = "routing")]
    #[tokio::test]
    async fn test_sink_dlq_handler_list() {
        use crate::sink::CollectorSink;

        let collector = Arc::new(CollectorSink::new("dlq-collector"));
        let handler = SinkDlqHandler::new(collector, 100);
        dead_letter_n(&handler, 3, "error").await;

        assert_eq!(handler.count().await.unwrap(), 3);

        let recent = handler.list(2).await.unwrap();
        assert_eq!(recent.len(), 2);
        assert_eq!(recent[0].reason, "error 2");
    }

    #[cfg(feature = "routing")]
    #[tokio::test]
    async fn test_sink_dlq_handler_max_capacity() {
        use crate::sink::CollectorSink;

        let collector = Arc::new(CollectorSink::new("dlq-collector"));
        let handler = SinkDlqHandler::new(collector, 2);
        dead_letter_n(&handler, 5, "error").await;

        assert_eq!(handler.count().await.unwrap(), 2);
    }
}