Skip to main content

atomr_core/event/
dead_letters.rs

1//! Dead-letter sink.
2
3use std::any::Any;
4use std::sync::Arc;
5
6use parking_lot::Mutex;
7
8use crate::actor::ActorPath;
9
/// Classifies the cause that routed a message to the dead-letter sink.
///
/// Reference files named by the original author: `Event/AllDeadLetters.cs`,
/// `Event/DroppedMessage.cs`, `Event/SuppressedDeadLetter.cs`.
/// NOTE(review): presumably the upstream sources this enum mirrors — confirm.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum DeadLetterReason {
    /// No recipient could be found: it terminated, is unknown, or never
    /// existed. This is the variant produced by `Default`.
    #[default]
    NoRecipient,
    /// The message was discarded because a mailbox overflowed.
    Dropped,
    /// An upstream policy suppressed the message (e.g. system messages
    /// arriving after stop) so that the dead-letter log stays readable.
    Suppressed,
}
24
/// Filter applied to incoming dead letters.
///
/// Each flag switches one `DeadLetterReason` category on or off. The
/// `Default` filter accepts `NoRecipient` and `Dropped` letters but rejects
/// `Suppressed` ones; tighter filters drop noisy categories (e.g. logging
/// dropped-message bursts only at `tracing::trace`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct DeadLetterFilter {
    /// Let through letters whose reason is `DeadLetterReason::NoRecipient`.
    pub accept_no_recipient: bool,
    /// Let through letters whose reason is `DeadLetterReason::Dropped`.
    pub accept_dropped: bool,
    /// Let through letters whose reason is `DeadLetterReason::Suppressed`.
    pub accept_suppressed: bool,
}

impl Default for DeadLetterFilter {
    /// Returns the standard filter: `NoRecipient` and `Dropped` are accepted,
    /// `Suppressed` is rejected.
    fn default() -> Self {
        Self { accept_no_recipient: true, accept_dropped: true, accept_suppressed: false }
    }
}
40
41impl DeadLetterFilter {
42    pub fn accepts(&self, reason: DeadLetterReason) -> bool {
43        match reason {
44            DeadLetterReason::NoRecipient => self.accept_no_recipient,
45            DeadLetterReason::Dropped => self.accept_dropped,
46            DeadLetterReason::Suppressed => self.accept_suppressed,
47        }
48    }
49}
50
51#[derive(Debug)]
52pub struct DeadLetter {
53    pub recipient: ActorPath,
54    pub sender: Option<ActorPath>,
55    pub message: Box<dyn Any + Send>,
56    pub reason: DeadLetterReason,
57}
58
59#[derive(Clone)]
60pub struct DeadLettersSink {
61    buf: Arc<Mutex<Vec<DeadLetter>>>,
62    filter: Arc<Mutex<DeadLetterFilter>>,
63}
64
65impl Default for DeadLettersSink {
66    fn default() -> Self {
67        Self {
68            buf: Arc::new(Mutex::new(Vec::new())),
69            filter: Arc::new(Mutex::new(DeadLetterFilter::default())),
70        }
71    }
72}
73
74impl DeadLettersSink {
75    pub fn new() -> Self {
76        Self::default()
77    }
78
79    /// Replace the active filter. Subsequent `push` calls consult it.
80    pub fn set_filter(&self, f: DeadLetterFilter) {
81        *self.filter.lock() = f;
82    }
83
84    pub fn filter(&self) -> DeadLetterFilter {
85        *self.filter.lock()
86    }
87
88    pub fn push(&self, dl: DeadLetter) {
89        let f = *self.filter.lock();
90        if !f.accepts(dl.reason) {
91            return;
92        }
93        tracing::warn!(recipient = %dl.recipient, reason = ?dl.reason, "dead letter");
94        self.buf.lock().push(dl);
95    }
96
97    pub fn drain(&self) -> Vec<DeadLetter> {
98        std::mem::take(&mut *self.buf.lock())
99    }
100
101    pub fn len(&self) -> usize {
102        self.buf.lock().len()
103    }
104
105    pub fn is_empty(&self) -> bool {
106        self.buf.lock().is_empty()
107    }
108}
109
#[cfg(test)]
mod tests {
    use super::*;
    use crate::actor::Address;

    /// Builds a sender-less dead letter carrying a `u32` payload, addressed
    /// to `ActorPath::root(Address::local("S")).child("x")` and tagged with
    /// `reason`.
    fn make(reason: DeadLetterReason) -> DeadLetter {
        let recipient = ActorPath::root(Address::local("S")).child("x");
        let message = Box::new(1u32);
        DeadLetter { recipient, sender: None, message, reason }
    }

    /// An accepted letter is counted by `len` and handed back by `drain`.
    #[test]
    fn stores_dead_letters() {
        let sink = DeadLettersSink::new();
        sink.push(make(DeadLetterReason::NoRecipient));
        assert_eq!(sink.len(), 1);

        let drained = sink.drain();
        assert_eq!(drained.len(), 1);
    }

    /// With the default filter, `Suppressed` letters never reach the buffer.
    #[test]
    fn default_filter_drops_suppressed() {
        let sink = DeadLettersSink::new();
        sink.push(make(DeadLetterReason::Suppressed));
        assert!(sink.is_empty(), "default filter should drop Suppressed");
    }

    /// A filter with `accept_dropped` off rejects `Dropped` letters while
    /// still storing `NoRecipient` ones.
    #[test]
    fn custom_filter_drops_dropped_category() {
        let no_dropped = DeadLetterFilter {
            accept_no_recipient: true,
            accept_dropped: false,
            accept_suppressed: false,
        };
        let sink = DeadLettersSink::new();
        sink.set_filter(no_dropped);

        sink.push(make(DeadLetterReason::Dropped));
        assert!(sink.is_empty());

        sink.push(make(DeadLetterReason::NoRecipient));
        assert_eq!(sink.len(), 1);
    }
}