Skip to main content

oxirs_stream/
dead_letter_queue.rs

//! Dead Letter Queue (DLQ) for failed stream messages.
//!
//! Provides a bounded, time-aware queue for messages that have exceeded retry
//! limits or failed validation. Supports FIFO pop, topic-based lookup, replay,
//! and reason grouping.
6
7use std::collections::{HashMap, VecDeque};
8
// ──────────────────────────────────────────────────────────────────────────────
// Public types
// ──────────────────────────────────────────────────────────────────────────────
12
/// The cause that routed a message into the dead letter queue.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum FailureReason {
    MaxRetriesExceeded,
    Timeout,
    /// Carries a free-form message; the message becomes part of the grouping
    /// label, so different messages form different groups.
    ProcessingError(String),
    SchemaValidationFailed,
    PoisonMessage,
}

impl FailureReason {
    /// Grouping key for this reason: the variant name, with the carried
    /// message appended in parentheses for `ProcessingError`.
    fn label(&self) -> String {
        let variant_name = match self {
            // The only variant with a payload needs formatting; handle it first.
            Self::ProcessingError(detail) => return format!("ProcessingError({})", detail),
            Self::MaxRetriesExceeded => "MaxRetriesExceeded",
            Self::Timeout => "Timeout",
            Self::SchemaValidationFailed => "SchemaValidationFailed",
            Self::PoisonMessage => "PoisonMessage",
        };
        variant_name.to_owned()
    }
}
35
36/// A single dead-lettered message.
37#[derive(Debug, Clone)]
38pub struct DeadLetter {
39    pub message_id: String,
40    pub payload: Vec<u8>,
41    pub original_topic: String,
42    pub failure_reason: FailureReason,
43    pub retry_count: usize,
44    pub first_failed_at: u64,
45    pub last_failed_at: u64,
46    pub metadata: HashMap<String, String>,
47}
48
/// Configuration for the [`DeadLetterQueue`].
///
/// Implements `PartialEq`/`Eq` so configurations can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DlqConfig {
    /// Maximum number of entries the queue may hold.
    pub max_size: usize,
    /// Maximum age of an entry in milliseconds; older entries are purged.
    pub max_age_ms: u64,
    /// Whether the queue allows replaying (removing + returning) entries.
    pub enable_replay: bool,
}

impl Default for DlqConfig {
    /// Defaults: 10 000 entries, a seven-day retention window, replay enabled.
    fn default() -> Self {
        const MS_PER_DAY: u64 = 24 * 60 * 60 * 1_000;
        const DEFAULT_RETENTION_DAYS: u64 = 7;

        Self {
            max_size: 10_000,
            max_age_ms: DEFAULT_RETENTION_DAYS * MS_PER_DAY,
            enable_replay: true,
        }
    }
}
69
/// Errors returned by [`DeadLetterQueue`] operations.
///
/// Comparable and cloneable so callers can `assert_eq!` on results or keep a
/// copy of the failure.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DlqError {
    /// `push` was rejected because the queue already holds `max_size` entries.
    QueueFull,
    /// `replay` was called on a queue whose config has `enable_replay: false`.
    ReplayDisabled,
}

impl std::fmt::Display for DlqError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            DlqError::QueueFull => "dead letter queue is full",
            DlqError::ReplayDisabled => "replay is disabled for this queue",
        };
        f.write_str(message)
    }
}

impl std::error::Error for DlqError {}
87
88/// A bounded dead letter queue.
89pub struct DeadLetterQueue {
90    config: DlqConfig,
91    letters: VecDeque<DeadLetter>,
92    total_received: u64,
93}
94
95impl DeadLetterQueue {
96    /// Create a new queue with the given configuration.
97    pub fn new(config: DlqConfig) -> Self {
98        let capacity = config.max_size;
99        Self {
100            config,
101            letters: VecDeque::with_capacity(capacity),
102            total_received: 0,
103        }
104    }
105
106    /// Push a dead letter into the queue.
107    ///
108    /// Returns [`DlqError::QueueFull`] if the queue has reached `max_size`.
109    pub fn push(&mut self, letter: DeadLetter) -> Result<(), DlqError> {
110        if self.letters.len() >= self.config.max_size {
111            return Err(DlqError::QueueFull);
112        }
113        self.letters.push_back(letter);
114        self.total_received += 1;
115        Ok(())
116    }
117
118    /// Remove and return the oldest dead letter (FIFO).
119    pub fn pop(&mut self) -> Option<DeadLetter> {
120        self.letters.pop_front()
121    }
122
123    /// Peek at the oldest dead letter without removing it.
124    pub fn peek(&self) -> Option<&DeadLetter> {
125        self.letters.front()
126    }
127
128    /// Number of entries currently in the queue.
129    pub fn len(&self) -> usize {
130        self.letters.len()
131    }
132
133    /// Returns `true` when the queue is empty.
134    pub fn is_empty(&self) -> bool {
135        self.letters.is_empty()
136    }
137
138    /// Total number of dead letters ever pushed into this queue (including
139    /// ones that were later popped or purged).
140    pub fn total_received(&self) -> u64 {
141        self.total_received
142    }
143
144    /// Remove entries whose `last_failed_at` is older than `max_age_ms`
145    /// relative to `current_time_ms`.
146    ///
147    /// Returns the number of entries removed.
148    pub fn purge_expired(&mut self, current_time_ms: u64) -> usize {
149        let cutoff = current_time_ms.saturating_sub(self.config.max_age_ms);
150        let before = self.letters.len();
151        self.letters.retain(|l| l.last_failed_at >= cutoff);
152        before - self.letters.len()
153    }
154
155    /// Return references to all entries whose `original_topic` equals `topic`.
156    pub fn find_by_topic(&self, topic: &str) -> Vec<&DeadLetter> {
157        self.letters
158            .iter()
159            .filter(|l| l.original_topic == topic)
160            .collect()
161    }
162
163    /// If `enable_replay` is true, remove the entry with the given
164    /// `message_id` and return it for re-processing.
165    ///
166    /// Returns `None` if no matching entry exists.
167    /// Returns `Err(DlqError::ReplayDisabled)` if replay is turned off.
168    pub fn replay(&mut self, message_id: &str) -> Result<Option<DeadLetter>, DlqError> {
169        if !self.config.enable_replay {
170            return Err(DlqError::ReplayDisabled);
171        }
172        if let Some(pos) = self.letters.iter().position(|l| l.message_id == message_id) {
173            Ok(self.letters.remove(pos))
174        } else {
175            Ok(None)
176        }
177    }
178
179    /// Group entries by their failure reason label and return a map of
180    /// `label → count`.
181    pub fn group_by_reason(&self) -> HashMap<String, usize> {
182        let mut map: HashMap<String, usize> = HashMap::new();
183        for letter in &self.letters {
184            *map.entry(letter.failure_reason.label()).or_insert(0) += 1;
185        }
186        map
187    }
188}
189
// ──────────────────────────────────────────────────────────────────────────────
// Tests
// ──────────────────────────────────────────────────────────────────────────────
193
#[cfg(test)]
mod tests {
    use super::*;

    // ── fixtures ─────────────────────────────────────────────────────────────

    /// Build a letter whose payload mirrors its id and whose first failure
    /// precedes `last_ms` by 100 ms (saturating at zero).
    fn make_letter(id: &str, topic: &str, reason: FailureReason, last_ms: u64) -> DeadLetter {
        let first_failed_at = last_ms.saturating_sub(100);
        DeadLetter {
            message_id: id.to_owned(),
            payload: id.as_bytes().to_vec(),
            original_topic: topic.to_owned(),
            failure_reason: reason,
            retry_count: 1,
            first_failed_at,
            last_failed_at: last_ms,
            metadata: HashMap::new(),
        }
    }

    /// Queue with every configuration field spelled out.
    fn queue_with(max_size: usize, max_age_ms: u64, enable_replay: bool) -> DeadLetterQueue {
        DeadLetterQueue::new(DlqConfig {
            max_size,
            max_age_ms,
            enable_replay,
        })
    }

    /// Queue that overrides only `max_size` on top of `DlqConfig::default()`.
    fn sized_queue(max_size: usize) -> DeadLetterQueue {
        DeadLetterQueue::new(DlqConfig {
            max_size,
            ..DlqConfig::default()
        })
    }

    /// Small queue used by most tests: 5 entries, 1 s retention, replay on.
    fn default_queue() -> DeadLetterQueue {
        queue_with(5, 1_000, true)
    }

    /// Push a freshly built letter and require that the push succeeds.
    #[track_caller]
    fn push_ok(q: &mut DeadLetterQueue, id: &str, topic: &str, reason: FailureReason, last_ms: u64) {
        q.push(make_letter(id, topic, reason, last_ms)).unwrap();
    }

    /// `push_ok` specialised to the `Timeout` reason.
    #[track_caller]
    fn push_timeout(q: &mut DeadLetterQueue, id: &str, topic: &str, last_ms: u64) {
        push_ok(q, id, topic, FailureReason::Timeout, last_ms);
    }

    // ── push / pop ───────────────────────────────────────────────────────────

    #[test]
    fn test_push_and_pop_single() {
        let mut q = default_queue();
        q.push(make_letter("m1", "t1", FailureReason::Timeout, 1000))
            .expect("push should succeed");
        assert_eq!(q.len(), 1);
        let popped = q.pop().expect("should have an element");
        assert_eq!(popped.message_id, "m1");
        assert!(q.is_empty());
    }

    #[test]
    fn test_push_pop_fifo_order() {
        let mut q = default_queue();
        let ids: Vec<String> = (0..3u8).map(|i| format!("m{i}")).collect();
        for id in &ids {
            push_ok(&mut q, id, "t", FailureReason::MaxRetriesExceeded, 1000);
        }
        for id in &ids {
            let popped = q.pop().unwrap();
            assert_eq!(&popped.message_id, id);
        }
    }

    #[test]
    fn test_pop_empty_returns_none() {
        assert!(default_queue().pop().is_none());
    }

    // ── max_size enforcement ─────────────────────────────────────────────────

    #[test]
    fn test_max_size_enforced() {
        // default_queue() holds at most 5 entries.
        let mut q = default_queue();
        for i in 0..5u8 {
            push_timeout(&mut q, &format!("m{i}"), "t", 1000);
        }
        let overflow = q.push(make_letter("overflow", "t", FailureReason::Timeout, 1000));
        assert!(matches!(overflow, Err(DlqError::QueueFull)));
    }

    #[test]
    fn test_queue_full_error_display() {
        assert!(!DlqError::QueueFull.to_string().is_empty());
    }

    #[test]
    fn test_queue_full_after_fill() {
        let mut q = sized_queue(2);
        push_timeout(&mut q, "a", "t", 0);
        push_timeout(&mut q, "b", "t", 0);
        let third = q.push(make_letter("c", "t", FailureReason::Timeout, 0));
        assert!(matches!(third, Err(DlqError::QueueFull)));
    }

    // ── purge_expired ────────────────────────────────────────────────────────

    #[test]
    fn test_purge_expired_removes_old_entries() {
        // Retention is 1_000 ms and "now" is 2_000.
        let mut q = default_queue();
        // Age 2_000 > 1_000: expired.
        push_timeout(&mut q, "old", "t", 0);
        // Age 500 < 1_000: still fresh.
        push_timeout(&mut q, "new", "t", 1500);

        let removed = q.purge_expired(2000);
        assert_eq!(removed, 1);
        assert_eq!(q.len(), 1);
        assert_eq!(q.peek().unwrap().message_id, "new");
    }

    #[test]
    fn test_purge_expired_keeps_all_if_none_old() {
        let mut q = default_queue();
        push_timeout(&mut q, "m1", "t", 900);
        push_timeout(&mut q, "m2", "t", 950);
        // now = 1000 with a 1000 ms window puts the cutoff at 0.
        let removed = q.purge_expired(1000);
        assert_eq!(removed, 0);
        assert_eq!(q.len(), 2);
    }

    #[test]
    fn test_purge_expired_removes_all() {
        let mut q = default_queue();
        for i in 0..3u64 {
            push_timeout(&mut q, &format!("m{i}"), "t", i * 10);
        }
        // With "now" at the far end of the clock every entry is expired.
        let removed = q.purge_expired(u64::MAX);
        assert_eq!(removed, 3);
        assert!(q.is_empty());
    }

    // ── peek ─────────────────────────────────────────────────────────────────

    #[test]
    fn test_peek_does_not_remove() {
        let mut q = default_queue();
        push_timeout(&mut q, "m1", "t", 0);
        let front = q.peek().unwrap();
        assert_eq!(front.message_id, "m1");
        assert_eq!(q.len(), 1);
    }

    #[test]
    fn test_peek_empty_returns_none() {
        assert!(default_queue().peek().is_none());
    }

    // ── find_by_topic ────────────────────────────────────────────────────────

    #[test]
    fn test_find_by_topic_returns_matching() {
        let mut q = default_queue();
        for (id, topic) in [("a", "topic-A"), ("b", "topic-B"), ("c", "topic-A")] {
            push_timeout(&mut q, id, topic, 0);
        }

        let found = q.find_by_topic("topic-A");
        assert_eq!(found.len(), 2);
        assert!(found.iter().all(|l| l.original_topic == "topic-A"));
    }

    #[test]
    fn test_find_by_topic_none_matching() {
        let mut q = default_queue();
        push_timeout(&mut q, "a", "topic-A", 0);
        assert!(q.find_by_topic("topic-Z").is_empty());
    }

    #[test]
    fn test_find_by_topic_empty_queue() {
        assert!(default_queue().find_by_topic("anything").is_empty());
    }

    // ── replay ───────────────────────────────────────────────────────────────

    #[test]
    fn test_replay_removes_and_returns_entry() {
        let mut q = default_queue();
        push_timeout(&mut q, "m1", "t", 0);
        push_timeout(&mut q, "m2", "t", 0);

        let replayed = q.replay("m1").unwrap().unwrap();
        assert_eq!(replayed.message_id, "m1");
        assert_eq!(q.len(), 1);
        assert_eq!(q.pop().unwrap().message_id, "m2");
    }

    #[test]
    fn test_replay_missing_id_returns_none() {
        let mut q = default_queue();
        push_timeout(&mut q, "m1", "t", 0);
        let outcome = q.replay("missing").unwrap();
        assert!(outcome.is_none());
        assert_eq!(q.len(), 1);
    }

    #[test]
    fn test_replay_disabled_returns_error() {
        let mut q = queue_with(10, 1_000, false);
        push_timeout(&mut q, "m1", "t", 0);
        assert!(matches!(q.replay("m1"), Err(DlqError::ReplayDisabled)));
    }

    #[test]
    fn test_replay_disabled_error_display() {
        assert!(!DlqError::ReplayDisabled.to_string().is_empty());
    }

    // ── group_by_reason ──────────────────────────────────────────────────────

    #[test]
    fn test_group_by_reason_counts() {
        let mut q = default_queue();
        push_timeout(&mut q, "a", "t", 0);
        push_timeout(&mut q, "b", "t", 0);
        push_ok(&mut q, "c", "t", FailureReason::MaxRetriesExceeded, 0);

        let groups = q.group_by_reason();
        assert_eq!(groups["Timeout"], 2);
        assert_eq!(groups["MaxRetriesExceeded"], 1);
    }

    #[test]
    fn test_group_by_reason_empty_queue() {
        assert!(default_queue().group_by_reason().is_empty());
    }

    #[test]
    fn test_group_by_reason_processing_error() {
        let mut q = default_queue();
        let reason = FailureReason::ProcessingError("oops".to_string());
        push_ok(&mut q, "e", "t", reason, 0);
        let groups = q.group_by_reason();
        assert_eq!(groups.len(), 1);
        assert!(groups.keys().any(|k| k.contains("ProcessingError")));
    }

    // ── total_received ───────────────────────────────────────────────────────

    #[test]
    fn test_total_received_increments_on_push() {
        let mut q = default_queue();
        assert_eq!(q.total_received(), 0);
        push_timeout(&mut q, "m1", "t", 0);
        assert_eq!(q.total_received(), 1);
        push_timeout(&mut q, "m2", "t", 0);
        assert_eq!(q.total_received(), 2);
    }

    #[test]
    fn test_total_received_does_not_decrement_on_pop() {
        let mut q = default_queue();
        push_timeout(&mut q, "m1", "t", 0);
        q.pop();
        assert_eq!(q.total_received(), 1);
    }

    #[test]
    fn test_total_received_not_incremented_on_full_push() {
        let mut q = sized_queue(1);
        push_timeout(&mut q, "m1", "t", 0);
        // The rejected push must leave the counter alone.
        let _ = q.push(make_letter("m2", "t", FailureReason::Timeout, 0));
        assert_eq!(q.total_received(), 1);
    }

    // ── FailureReason variants ───────────────────────────────────────────────

    #[test]
    fn test_failure_reason_max_retries() {
        assert_eq!(
            FailureReason::MaxRetriesExceeded.label(),
            "MaxRetriesExceeded"
        );
    }

    #[test]
    fn test_failure_reason_timeout() {
        assert_eq!(FailureReason::Timeout.label(), "Timeout");
    }

    #[test]
    fn test_failure_reason_processing_error() {
        let label = FailureReason::ProcessingError("bad data".to_string()).label();
        assert!(label.contains("ProcessingError"));
        assert!(label.contains("bad data"));
    }

    #[test]
    fn test_failure_reason_schema_validation_failed() {
        assert_eq!(
            FailureReason::SchemaValidationFailed.label(),
            "SchemaValidationFailed"
        );
    }

    #[test]
    fn test_failure_reason_poison_message() {
        assert_eq!(FailureReason::PoisonMessage.label(), "PoisonMessage");
    }

    // ── misc / edge cases ────────────────────────────────────────────────────

    #[test]
    fn test_is_empty_initial() {
        let q = default_queue();
        assert!(q.is_empty());
        assert_eq!(q.len(), 0);
    }

    #[test]
    fn test_len_after_push_pop() {
        let mut q = default_queue();
        push_timeout(&mut q, "m1", "t", 0);
        assert_eq!(q.len(), 1);
        q.pop();
        assert_eq!(q.len(), 0);
    }

    #[test]
    fn test_metadata_preserved() {
        let mut q = default_queue();
        let mut meta = HashMap::new();
        meta.insert("env".to_string(), "prod".to_string());
        q.push(DeadLetter {
            message_id: "m".to_string(),
            payload: b"data".to_vec(),
            original_topic: "topic".to_string(),
            failure_reason: FailureReason::Timeout,
            retry_count: 3,
            first_failed_at: 100,
            last_failed_at: 200,
            metadata: meta.clone(),
        })
        .unwrap();
        let popped = q.pop().unwrap();
        assert_eq!(popped.metadata["env"], "prod");
    }

    #[test]
    fn test_payload_preserved() {
        let mut q = default_queue();
        let payload = b"hello world".to_vec();
        q.push(DeadLetter {
            message_id: "p".to_string(),
            payload: payload.clone(),
            original_topic: "t".to_string(),
            failure_reason: FailureReason::PoisonMessage,
            retry_count: 0,
            first_failed_at: 0,
            last_failed_at: 0,
            metadata: HashMap::new(),
        })
        .unwrap();
        assert_eq!(q.pop().unwrap().payload, payload);
    }

    #[test]
    fn test_retry_count_preserved() {
        let mut q = default_queue();
        let letter = DeadLetter {
            retry_count: 42,
            ..make_letter("m", "t", FailureReason::Timeout, 0)
        };
        q.push(letter).unwrap();
        assert_eq!(q.pop().unwrap().retry_count, 42);
    }

    #[test]
    fn test_purge_then_push_succeeds() {
        let mut q = queue_with(2, 100, true);
        push_timeout(&mut q, "old", "t", 0);
        push_timeout(&mut q, "old2", "t", 0);
        // The queue is full; purging the stale entries frees both slots.
        q.purge_expired(200);
        push_timeout(&mut q, "new", "t", 150);
        assert_eq!(q.len(), 1);
    }

    #[test]
    fn test_replay_middle_element() {
        let mut q = default_queue();
        for id in ["a", "b", "c"] {
            push_timeout(&mut q, id, "t", 0);
        }

        let replayed = q.replay("b").unwrap().unwrap();
        assert_eq!(replayed.message_id, "b");
        assert_eq!(q.len(), 2);
        // The neighbours keep their relative order.
        assert_eq!(q.pop().unwrap().message_id, "a");
        assert_eq!(q.pop().unwrap().message_id, "c");
    }

    #[test]
    fn test_group_by_reason_all_variants() {
        let mut q = sized_queue(10);
        push_ok(&mut q, "a", "t", FailureReason::MaxRetriesExceeded, 0);
        push_ok(&mut q, "b", "t", FailureReason::Timeout, 0);
        push_ok(&mut q, "c", "t", FailureReason::ProcessingError("e".into()), 0);
        push_ok(&mut q, "d", "t", FailureReason::SchemaValidationFailed, 0);
        push_ok(&mut q, "e", "t", FailureReason::PoisonMessage, 0);

        let groups = q.group_by_reason();
        assert_eq!(groups.len(), 5);
        assert_eq!(groups["MaxRetriesExceeded"], 1);
        assert_eq!(groups["Timeout"], 1);
        assert_eq!(groups["SchemaValidationFailed"], 1);
        assert_eq!(groups["PoisonMessage"], 1);
    }

    #[test]
    fn test_find_by_topic_all_same_topic() {
        let mut q = default_queue();
        for i in 0..4u8 {
            push_timeout(&mut q, &format!("m{i}"), "same-topic", 0);
        }
        assert_eq!(q.find_by_topic("same-topic").len(), 4);
    }

    #[test]
    fn test_failure_reason_equality() {
        let error = |msg: &str| FailureReason::ProcessingError(msg.into());
        assert_eq!(FailureReason::Timeout, FailureReason::Timeout);
        assert_ne!(FailureReason::Timeout, FailureReason::PoisonMessage);
        assert_eq!(error("x"), error("x"));
        assert_ne!(error("x"), error("y"));
    }

    #[test]
    fn test_dlq_default_config() {
        let cfg = DlqConfig::default();
        assert!(cfg.max_size > 0);
        assert!(cfg.max_age_ms > 0);
        assert!(cfg.enable_replay);
    }

    #[test]
    fn test_push_after_pop_succeeds_on_full_queue() {
        let mut q = sized_queue(1);
        push_timeout(&mut q, "m1", "t", 0);
        // Popping the only entry frees the single slot.
        q.pop();
        push_timeout(&mut q, "m2", "t", 0);
        assert_eq!(q.len(), 1);
    }

    #[test]
    fn test_total_received_after_replay() {
        let mut q = default_queue();
        push_timeout(&mut q, "m1", "t", 0);
        let _ = q.replay("m1").unwrap();
        // Replaying removes the entry but never rewinds the counter.
        assert_eq!(q.total_received(), 1);
    }

    #[test]
    fn test_group_by_reason_two_of_same() {
        let mut q = default_queue();
        push_ok(&mut q, "a", "t", FailureReason::PoisonMessage, 0);
        push_ok(&mut q, "b", "t", FailureReason::PoisonMessage, 0);
        let groups = q.group_by_reason();
        assert_eq!(groups.get("PoisonMessage").copied().unwrap_or(0), 2);
    }

    #[test]
    fn test_find_by_topic_does_not_consume() {
        let mut q = default_queue();
        push_timeout(&mut q, "m1", "topic-X", 0);
        let found = q.find_by_topic("topic-X");
        assert_eq!(found.len(), 1);
        // Lookup is read-only: the entry is still queued.
        assert_eq!(q.len(), 1);
    }

    #[test]
    fn test_first_failed_at_preserved() {
        let mut q = default_queue();
        let letter = DeadLetter {
            first_failed_at: 100,
            ..make_letter("m", "t", FailureReason::Timeout, 500)
        };
        q.push(letter).unwrap();
        let popped = q.pop().unwrap();
        assert_eq!(popped.first_failed_at, 100);
        assert_eq!(popped.last_failed_at, 500);
    }

    #[test]
    fn test_replay_on_empty_queue_returns_none() {
        let mut q = default_queue();
        assert!(q.replay("nonexistent").unwrap().is_none());
    }

    #[test]
    fn test_purge_returns_correct_count() {
        let mut q = queue_with(10, 100, true);
        // Five entries far outside the 100 ms window…
        for i in 0..5u64 {
            push_timeout(&mut q, &format!("old{i}"), "t", i);
        }
        // …and three inside it.
        for i in 0..3u64 {
            push_timeout(&mut q, &format!("new{i}"), "t", 950 + i);
        }
        let removed = q.purge_expired(1000);
        assert_eq!(removed, 5);
        assert_eq!(q.len(), 3);
    }
}