1use std::collections::{HashMap, VecDeque};
8
/// Why a message was placed in the dead letter queue.
///
/// Implements `Eq` and `Hash`, so reasons can be compared directly and used as
/// keys in hashed collections. `ProcessingError` values compare by their
/// carried message as well as by variant.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub enum FailureReason {
    MaxRetriesExceeded,
    Timeout,
    /// Carries a free-form detail string; `label` embeds it in the label text.
    ProcessingError(String),
    SchemaValidationFailed,
    PoisonMessage,
}

impl FailureReason {
    /// Returns a textual label for this reason.
    ///
    /// Unit variants yield their own variant name. `ProcessingError(msg)` yields
    /// `"ProcessingError(<msg>)"`, so two processing errors with different
    /// messages produce different labels. `DeadLetterQueue::group_by_reason`
    /// uses this label as its map key.
    fn label(&self) -> String {
        let variant_name = match self {
            Self::MaxRetriesExceeded => "MaxRetriesExceeded",
            Self::Timeout => "Timeout",
            Self::SchemaValidationFailed => "SchemaValidationFailed",
            Self::PoisonMessage => "PoisonMessage",
            // The only variant whose label depends on data: build it directly.
            Self::ProcessingError(detail) => return format!("ProcessingError({detail})"),
        };
        variant_name.to_owned()
    }
}
35
36#[derive(Debug, Clone)]
38pub struct DeadLetter {
39 pub message_id: String,
40 pub payload: Vec<u8>,
41 pub original_topic: String,
42 pub failure_reason: FailureReason,
43 pub retry_count: usize,
44 pub first_failed_at: u64,
45 pub last_failed_at: u64,
46 pub metadata: HashMap<String, String>,
47}
48
/// Limits and feature switches for a dead letter queue.
#[derive(Clone, Debug)]
pub struct DlqConfig {
    /// Maximum number of letters stored at once; `push` reports `QueueFull`
    /// once the queue holds this many.
    pub max_size: usize,
    /// Retention window in milliseconds. `purge_expired` drops letters whose
    /// `last_failed_at` is more than this far behind the supplied time.
    pub max_age_ms: u64,
    /// When `false`, `replay` returns `ReplayDisabled`.
    pub enable_replay: bool,
}

impl Default for DlqConfig {
    /// Room for 10 000 letters, a seven-day retention window, replay enabled.
    fn default() -> Self {
        const ONE_WEEK_MS: u64 = 1_000 * 60 * 60 * 24 * 7;
        DlqConfig {
            enable_replay: true,
            max_age_ms: ONE_WEEK_MS,
            max_size: 10_000,
        }
    }
}
69
/// Errors reported by dead letter queue operations.
///
/// Both variants carry no data. The type is cheap to copy and can be compared
/// with `==` (e.g. `assert_eq!(err, DlqError::QueueFull)`) instead of needing
/// `matches!`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DlqError {
    /// A push was attempted while the queue already held `max_size` letters.
    QueueFull,
    /// A replay was attempted on a queue configured with `enable_replay: false`.
    ReplayDisabled,
}

impl std::fmt::Display for DlqError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            DlqError::QueueFull => "dead letter queue is full",
            DlqError::ReplayDisabled => "replay is disabled for this queue",
        };
        f.write_str(message)
    }
}

impl std::error::Error for DlqError {}
87
88pub struct DeadLetterQueue {
90 config: DlqConfig,
91 letters: VecDeque<DeadLetter>,
92 total_received: u64,
93}
94
95impl DeadLetterQueue {
96 pub fn new(config: DlqConfig) -> Self {
98 let capacity = config.max_size;
99 Self {
100 config,
101 letters: VecDeque::with_capacity(capacity),
102 total_received: 0,
103 }
104 }
105
106 pub fn push(&mut self, letter: DeadLetter) -> Result<(), DlqError> {
110 if self.letters.len() >= self.config.max_size {
111 return Err(DlqError::QueueFull);
112 }
113 self.letters.push_back(letter);
114 self.total_received += 1;
115 Ok(())
116 }
117
118 pub fn pop(&mut self) -> Option<DeadLetter> {
120 self.letters.pop_front()
121 }
122
123 pub fn peek(&self) -> Option<&DeadLetter> {
125 self.letters.front()
126 }
127
128 pub fn len(&self) -> usize {
130 self.letters.len()
131 }
132
133 pub fn is_empty(&self) -> bool {
135 self.letters.is_empty()
136 }
137
138 pub fn total_received(&self) -> u64 {
141 self.total_received
142 }
143
144 pub fn purge_expired(&mut self, current_time_ms: u64) -> usize {
149 let cutoff = current_time_ms.saturating_sub(self.config.max_age_ms);
150 let before = self.letters.len();
151 self.letters.retain(|l| l.last_failed_at >= cutoff);
152 before - self.letters.len()
153 }
154
155 pub fn find_by_topic(&self, topic: &str) -> Vec<&DeadLetter> {
157 self.letters
158 .iter()
159 .filter(|l| l.original_topic == topic)
160 .collect()
161 }
162
163 pub fn replay(&mut self, message_id: &str) -> Result<Option<DeadLetter>, DlqError> {
169 if !self.config.enable_replay {
170 return Err(DlqError::ReplayDisabled);
171 }
172 if let Some(pos) = self.letters.iter().position(|l| l.message_id == message_id) {
173 Ok(self.letters.remove(pos))
174 } else {
175 Ok(None)
176 }
177 }
178
179 pub fn group_by_reason(&self) -> HashMap<String, usize> {
182 let mut map: HashMap<String, usize> = HashMap::new();
183 for letter in &self.letters {
184 *map.entry(letter.failure_reason.label()).or_insert(0) += 1;
185 }
186 map
187 }
188}
189
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a letter whose payload is the id's bytes, with `retry_count` 1 and a
    /// first failure 100 ms before `last_ms` (clamped at 0).
    fn make_letter(id: &str, topic: &str, reason: FailureReason, last_ms: u64) -> DeadLetter {
        DeadLetter {
            message_id: id.to_string(),
            payload: id.as_bytes().to_vec(),
            original_topic: topic.to_string(),
            failure_reason: reason,
            retry_count: 1,
            first_failed_at: last_ms.saturating_sub(100),
            last_failed_at: last_ms,
            metadata: HashMap::new(),
        }
    }

    /// Queue holding at most 5 letters, with a 1 000 ms retention window and replay enabled.
    fn default_queue() -> DeadLetterQueue {
        DeadLetterQueue::new(DlqConfig {
            max_size: 5,
            max_age_ms: 1_000,
            enable_replay: true,
        })
    }

    /// Queue using the default config except for `max_size`.
    fn queue_with_max_size(max_size: usize) -> DeadLetterQueue {
        DeadLetterQueue::new(DlqConfig {
            max_size,
            ..DlqConfig::default()
        })
    }

    /// Pushes a `Timeout` letter and fails the test if the queue rejects it.
    fn push_timeout(q: &mut DeadLetterQueue, id: &str, topic: &str, last_ms: u64) {
        q.push(make_letter(id, topic, FailureReason::Timeout, last_ms))
            .expect("push should succeed");
    }

    // ----- push / pop / len -----

    #[test]
    fn test_push_and_pop_single() {
        let mut q = default_queue();
        push_timeout(&mut q, "m1", "t1", 1000);
        assert_eq!(q.len(), 1);
        let popped = q.pop().expect("should have an element");
        assert_eq!(popped.message_id, "m1");
        assert!(q.is_empty());
    }

    #[test]
    fn test_push_pop_fifo_order() {
        let mut q = default_queue();
        for i in 0..3u8 {
            q.push(make_letter(
                &format!("m{i}"),
                "t",
                FailureReason::MaxRetriesExceeded,
                1000,
            ))
            .unwrap();
        }
        for i in 0..3u8 {
            assert_eq!(q.pop().unwrap().message_id, format!("m{i}"));
        }
        assert!(q.pop().is_none());
    }

    #[test]
    fn test_pop_empty_returns_none() {
        let mut q = default_queue();
        assert!(q.pop().is_none());
    }

    #[test]
    fn test_is_empty_initial() {
        let q = default_queue();
        assert!(q.is_empty());
        assert_eq!(q.len(), 0);
    }

    #[test]
    fn test_len_after_push_pop() {
        let mut q = default_queue();
        push_timeout(&mut q, "m1", "t", 0);
        assert_eq!(q.len(), 1);
        assert!(q.pop().is_some());
        assert_eq!(q.len(), 0);
    }

    // ----- capacity -----

    #[test]
    fn test_max_size_enforced() {
        let mut q = default_queue();
        for i in 0..5u8 {
            push_timeout(&mut q, &format!("m{i}"), "t", 1000);
        }
        let result = q.push(make_letter("overflow", "t", FailureReason::Timeout, 1000));
        assert!(matches!(result, Err(DlqError::QueueFull)));
        // A rejected push must not disturb the stored letters.
        assert_eq!(q.len(), 5);
        assert_eq!(q.peek().unwrap().message_id, "m0");
    }

    #[test]
    fn test_queue_full_after_fill() {
        let mut q = queue_with_max_size(2);
        push_timeout(&mut q, "a", "t", 0);
        push_timeout(&mut q, "b", "t", 0);
        assert!(matches!(
            q.push(make_letter("c", "t", FailureReason::Timeout, 0)),
            Err(DlqError::QueueFull)
        ));
    }

    #[test]
    fn test_push_after_pop_succeeds_on_full_queue() {
        let mut q = queue_with_max_size(1);
        push_timeout(&mut q, "m1", "t", 0);
        assert!(q.pop().is_some());
        push_timeout(&mut q, "m2", "t", 0);
        assert_eq!(q.len(), 1);
    }

    #[test]
    fn test_queue_full_error_display() {
        assert_eq!(DlqError::QueueFull.to_string(), "dead letter queue is full");
    }

    // ----- purge_expired -----

    #[test]
    fn test_purge_expired_removes_old_entries() {
        let mut q = default_queue();
        push_timeout(&mut q, "old", "t", 0);
        push_timeout(&mut q, "new", "t", 1500);

        assert_eq!(q.purge_expired(2000), 1);
        assert_eq!(q.len(), 1);
        assert_eq!(q.peek().unwrap().message_id, "new");
    }

    #[test]
    fn test_purge_expired_keeps_all_if_none_old() {
        let mut q = default_queue();
        push_timeout(&mut q, "m1", "t", 900);
        push_timeout(&mut q, "m2", "t", 950);
        assert_eq!(q.purge_expired(1000), 0);
        assert_eq!(q.len(), 2);
    }

    #[test]
    fn test_purge_expired_removes_all() {
        let mut q = default_queue();
        for i in 0..3u64 {
            push_timeout(&mut q, &format!("m{i}"), "t", i * 10);
        }
        assert_eq!(q.purge_expired(u64::MAX), 3);
        assert!(q.is_empty());
    }

    #[test]
    fn test_purge_keeps_entry_exactly_at_max_age() {
        let mut q = default_queue(); // max_age_ms = 1_000
        push_timeout(&mut q, "edge", "t", 1000);
        // Age equals max_age_ms exactly: still retained.
        assert_eq!(q.purge_expired(2000), 0);
        assert_eq!(q.len(), 1);
        // One millisecond later it has expired.
        assert_eq!(q.purge_expired(2001), 1);
        assert!(q.is_empty());
    }

    #[test]
    fn test_purge_before_max_age_elapsed_removes_nothing() {
        let mut q = default_queue(); // max_age_ms = 1_000
        push_timeout(&mut q, "m", "t", 0);
        // current_time_ms < max_age_ms: the cutoff saturates at 0.
        assert_eq!(q.purge_expired(500), 0);
        assert_eq!(q.len(), 1);
    }

    #[test]
    fn test_purge_then_push_succeeds() {
        let mut q = DeadLetterQueue::new(DlqConfig {
            max_size: 2,
            max_age_ms: 100,
            enable_replay: true,
        });
        push_timeout(&mut q, "old", "t", 0);
        push_timeout(&mut q, "old2", "t", 0);
        assert_eq!(q.purge_expired(200), 2);
        push_timeout(&mut q, "new", "t", 150);
        assert_eq!(q.len(), 1);
    }

    #[test]
    fn test_purge_returns_correct_count() {
        let mut q = DeadLetterQueue::new(DlqConfig {
            max_size: 10,
            max_age_ms: 100,
            enable_replay: true,
        });
        for i in 0..5u64 {
            push_timeout(&mut q, &format!("old{i}"), "t", i);
        }
        for i in 0..3u64 {
            push_timeout(&mut q, &format!("new{i}"), "t", 950 + i);
        }
        assert_eq!(q.purge_expired(1000), 5);
        assert_eq!(q.len(), 3);
    }

    // ----- peek -----

    #[test]
    fn test_peek_does_not_remove() {
        let mut q = default_queue();
        push_timeout(&mut q, "m1", "t", 0);
        assert_eq!(q.peek().unwrap().message_id, "m1");
        assert_eq!(q.len(), 1);
    }

    #[test]
    fn test_peek_empty_returns_none() {
        let q = default_queue();
        assert!(q.peek().is_none());
    }

    // ----- find_by_topic -----

    #[test]
    fn test_find_by_topic_returns_matching() {
        let mut q = default_queue();
        push_timeout(&mut q, "a", "topic-A", 0);
        push_timeout(&mut q, "b", "topic-B", 0);
        push_timeout(&mut q, "c", "topic-A", 0);

        let found = q.find_by_topic("topic-A");
        assert_eq!(found.len(), 2);
        assert!(found.iter().all(|l| l.original_topic == "topic-A"));
    }

    #[test]
    fn test_find_by_topic_none_matching() {
        let mut q = default_queue();
        push_timeout(&mut q, "a", "topic-A", 0);
        assert!(q.find_by_topic("topic-Z").is_empty());
    }

    #[test]
    fn test_find_by_topic_empty_queue() {
        let q = default_queue();
        assert!(q.find_by_topic("anything").is_empty());
    }

    #[test]
    fn test_find_by_topic_all_same_topic() {
        let mut q = default_queue();
        for i in 0..4u8 {
            push_timeout(&mut q, &format!("m{i}"), "same-topic", 0);
        }
        assert_eq!(q.find_by_topic("same-topic").len(), 4);
    }

    #[test]
    fn test_find_by_topic_does_not_consume() {
        let mut q = default_queue();
        push_timeout(&mut q, "m1", "topic-X", 0);
        assert_eq!(q.find_by_topic("topic-X").len(), 1);
        assert_eq!(q.len(), 1);
    }

    // ----- replay -----

    #[test]
    fn test_replay_removes_and_returns_entry() {
        let mut q = default_queue();
        push_timeout(&mut q, "m1", "t", 0);
        push_timeout(&mut q, "m2", "t", 0);

        let replayed = q.replay("m1").unwrap().unwrap();
        assert_eq!(replayed.message_id, "m1");
        assert_eq!(q.len(), 1);
        assert_eq!(q.pop().unwrap().message_id, "m2");
    }

    #[test]
    fn test_replay_middle_element() {
        let mut q = default_queue();
        push_timeout(&mut q, "a", "t", 0);
        push_timeout(&mut q, "b", "t", 0);
        push_timeout(&mut q, "c", "t", 0);

        let replayed = q.replay("b").unwrap().unwrap();
        assert_eq!(replayed.message_id, "b");
        assert_eq!(q.len(), 2);
        assert_eq!(q.pop().unwrap().message_id, "a");
        assert_eq!(q.pop().unwrap().message_id, "c");
    }

    #[test]
    fn test_replay_missing_id_returns_none() {
        let mut q = default_queue();
        push_timeout(&mut q, "m1", "t", 0);
        assert!(q.replay("missing").unwrap().is_none());
        assert_eq!(q.len(), 1);
    }

    #[test]
    fn test_replay_on_empty_queue_returns_none() {
        let mut q = default_queue();
        assert!(q.replay("nonexistent").unwrap().is_none());
    }

    #[test]
    fn test_replay_disabled_returns_error() {
        let mut q = DeadLetterQueue::new(DlqConfig {
            max_size: 10,
            max_age_ms: 1_000,
            enable_replay: false,
        });
        push_timeout(&mut q, "m1", "t", 0);
        assert!(matches!(q.replay("m1"), Err(DlqError::ReplayDisabled)));
        // The refused replay must leave the letter in place.
        assert_eq!(q.len(), 1);
    }

    #[test]
    fn test_replay_disabled_error_display() {
        assert_eq!(
            DlqError::ReplayDisabled.to_string(),
            "replay is disabled for this queue"
        );
    }

    // ----- group_by_reason -----

    #[test]
    fn test_group_by_reason_counts() {
        let mut q = default_queue();
        push_timeout(&mut q, "a", "t", 0);
        push_timeout(&mut q, "b", "t", 0);
        q.push(make_letter("c", "t", FailureReason::MaxRetriesExceeded, 0))
            .unwrap();

        let groups = q.group_by_reason();
        assert_eq!(groups["Timeout"], 2);
        assert_eq!(groups["MaxRetriesExceeded"], 1);
    }

    #[test]
    fn test_group_by_reason_empty_queue() {
        let q = default_queue();
        assert!(q.group_by_reason().is_empty());
    }

    #[test]
    fn test_group_by_reason_processing_error() {
        let mut q = default_queue();
        q.push(make_letter(
            "e",
            "t",
            FailureReason::ProcessingError("oops".to_string()),
            0,
        ))
        .unwrap();
        let groups = q.group_by_reason();
        assert_eq!(groups.len(), 1);
        let (key, count) = groups.iter().next().unwrap();
        assert!(key.contains("ProcessingError"));
        assert!(key.contains("oops"));
        assert_eq!(*count, 1);
    }

    #[test]
    fn test_group_by_reason_all_variants() {
        let mut q = queue_with_max_size(10);
        let reasons = [
            FailureReason::MaxRetriesExceeded,
            FailureReason::Timeout,
            FailureReason::ProcessingError("e".into()),
            FailureReason::SchemaValidationFailed,
            FailureReason::PoisonMessage,
        ];
        for (i, reason) in reasons.into_iter().enumerate() {
            q.push(make_letter(&format!("m{i}"), "t", reason, 0)).unwrap();
        }

        let groups = q.group_by_reason();
        assert_eq!(groups.len(), 5);
        assert_eq!(groups["MaxRetriesExceeded"], 1);
        assert_eq!(groups["Timeout"], 1);
        assert_eq!(groups["SchemaValidationFailed"], 1);
        assert_eq!(groups["PoisonMessage"], 1);
        assert_eq!(
            groups
                .iter()
                .find(|(k, _)| k.starts_with("ProcessingError"))
                .map(|(_, &c)| c),
            Some(1)
        );
    }

    #[test]
    fn test_group_by_reason_two_of_same() {
        let mut q = default_queue();
        q.push(make_letter("a", "t", FailureReason::PoisonMessage, 0))
            .unwrap();
        q.push(make_letter("b", "t", FailureReason::PoisonMessage, 0))
            .unwrap();
        let groups = q.group_by_reason();
        assert_eq!(groups.get("PoisonMessage").copied().unwrap_or(0), 2);
    }

    // ----- total_received -----

    #[test]
    fn test_total_received_increments_on_push() {
        let mut q = default_queue();
        assert_eq!(q.total_received(), 0);
        push_timeout(&mut q, "m1", "t", 0);
        assert_eq!(q.total_received(), 1);
        push_timeout(&mut q, "m2", "t", 0);
        assert_eq!(q.total_received(), 2);
    }

    #[test]
    fn test_total_received_does_not_decrement_on_pop() {
        let mut q = default_queue();
        push_timeout(&mut q, "m1", "t", 0);
        assert!(q.pop().is_some());
        assert_eq!(q.total_received(), 1);
    }

    #[test]
    fn test_total_received_not_incremented_on_full_push() {
        let mut q = queue_with_max_size(1);
        push_timeout(&mut q, "m1", "t", 0);
        let rejected = q.push(make_letter("m2", "t", FailureReason::Timeout, 0));
        assert!(matches!(rejected, Err(DlqError::QueueFull)));
        assert_eq!(q.total_received(), 1);
    }

    #[test]
    fn test_total_received_after_replay() {
        let mut q = default_queue();
        push_timeout(&mut q, "m1", "t", 0);
        assert!(q.replay("m1").unwrap().is_some());
        assert_eq!(q.total_received(), 1);
    }

    // ----- FailureReason -----

    #[test]
    fn test_failure_reason_max_retries() {
        assert_eq!(FailureReason::MaxRetriesExceeded.label(), "MaxRetriesExceeded");
    }

    #[test]
    fn test_failure_reason_timeout() {
        assert_eq!(FailureReason::Timeout.label(), "Timeout");
    }

    #[test]
    fn test_failure_reason_processing_error() {
        let label = FailureReason::ProcessingError("bad data".to_string()).label();
        assert!(label.contains("ProcessingError"));
        assert!(label.contains("bad data"));
    }

    #[test]
    fn test_failure_reason_schema_validation_failed() {
        assert_eq!(
            FailureReason::SchemaValidationFailed.label(),
            "SchemaValidationFailed"
        );
    }

    #[test]
    fn test_failure_reason_poison_message() {
        assert_eq!(FailureReason::PoisonMessage.label(), "PoisonMessage");
    }

    #[test]
    fn test_failure_reason_equality() {
        assert_eq!(FailureReason::Timeout, FailureReason::Timeout);
        assert_ne!(FailureReason::Timeout, FailureReason::PoisonMessage);
        assert_eq!(
            FailureReason::ProcessingError("x".into()),
            FailureReason::ProcessingError("x".into())
        );
        assert_ne!(
            FailureReason::ProcessingError("x".into()),
            FailureReason::ProcessingError("y".into())
        );
    }

    // ----- config and field preservation -----

    #[test]
    fn test_dlq_default_config() {
        let cfg = DlqConfig::default();
        assert!(cfg.max_size > 0);
        assert!(cfg.max_age_ms > 0);
        assert!(cfg.enable_replay);
    }

    #[test]
    fn test_metadata_preserved() {
        let mut q = default_queue();
        let mut meta = HashMap::new();
        meta.insert("env".to_string(), "prod".to_string());
        q.push(DeadLetter {
            message_id: "m".to_string(),
            payload: b"data".to_vec(),
            original_topic: "topic".to_string(),
            failure_reason: FailureReason::Timeout,
            retry_count: 3,
            first_failed_at: 100,
            last_failed_at: 200,
            metadata: meta,
        })
        .unwrap();
        assert_eq!(q.pop().unwrap().metadata["env"], "prod");
    }

    #[test]
    fn test_payload_preserved() {
        let mut q = default_queue();
        let payload = b"hello world".to_vec();
        q.push(DeadLetter {
            message_id: "p".to_string(),
            payload: payload.clone(),
            original_topic: "t".to_string(),
            failure_reason: FailureReason::PoisonMessage,
            retry_count: 0,
            first_failed_at: 0,
            last_failed_at: 0,
            metadata: HashMap::new(),
        })
        .unwrap();
        assert_eq!(q.pop().unwrap().payload, payload);
    }

    #[test]
    fn test_retry_count_preserved() {
        let mut q = default_queue();
        let mut letter = make_letter("m", "t", FailureReason::Timeout, 0);
        letter.retry_count = 42;
        q.push(letter).unwrap();
        assert_eq!(q.pop().unwrap().retry_count, 42);
    }

    #[test]
    fn test_first_failed_at_preserved() {
        let mut q = default_queue();
        let mut letter = make_letter("m", "t", FailureReason::Timeout, 500);
        letter.first_failed_at = 100;
        q.push(letter).unwrap();
        let popped = q.pop().unwrap();
        assert_eq!(popped.first_failed_at, 100);
        assert_eq!(popped.last_failed_at, 500);
    }
}