atomr_core/event/
dead_letters.rs1use std::any::Any;
4use std::sync::Arc;
5
6use parking_lot::Mutex;
7
8use crate::actor::ActorPath;
9
10#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
14pub enum DeadLetterReason {
15 #[default]
17 NoRecipient,
18 Dropped,
20 Suppressed,
23}
24
25#[derive(Debug, Clone, Copy, PartialEq, Eq)]
29pub struct DeadLetterFilter {
30 pub accept_no_recipient: bool,
31 pub accept_dropped: bool,
32 pub accept_suppressed: bool,
33}
34
35impl Default for DeadLetterFilter {
36 fn default() -> Self {
37 Self { accept_no_recipient: true, accept_dropped: true, accept_suppressed: false }
38 }
39}
40
41impl DeadLetterFilter {
42 pub fn accepts(&self, reason: DeadLetterReason) -> bool {
43 match reason {
44 DeadLetterReason::NoRecipient => self.accept_no_recipient,
45 DeadLetterReason::Dropped => self.accept_dropped,
46 DeadLetterReason::Suppressed => self.accept_suppressed,
47 }
48 }
49}
50
51#[derive(Debug)]
52pub struct DeadLetter {
53 pub recipient: ActorPath,
54 pub sender: Option<ActorPath>,
55 pub message: Box<dyn Any + Send>,
56 pub reason: DeadLetterReason,
57}
58
59#[derive(Clone)]
60pub struct DeadLettersSink {
61 buf: Arc<Mutex<Vec<DeadLetter>>>,
62 filter: Arc<Mutex<DeadLetterFilter>>,
63}
64
65impl Default for DeadLettersSink {
66 fn default() -> Self {
67 Self {
68 buf: Arc::new(Mutex::new(Vec::new())),
69 filter: Arc::new(Mutex::new(DeadLetterFilter::default())),
70 }
71 }
72}
73
74impl DeadLettersSink {
75 pub fn new() -> Self {
76 Self::default()
77 }
78
79 pub fn set_filter(&self, f: DeadLetterFilter) {
81 *self.filter.lock() = f;
82 }
83
84 pub fn filter(&self) -> DeadLetterFilter {
85 *self.filter.lock()
86 }
87
88 pub fn push(&self, dl: DeadLetter) {
89 let f = *self.filter.lock();
90 if !f.accepts(dl.reason) {
91 return;
92 }
93 tracing::warn!(recipient = %dl.recipient, reason = ?dl.reason, "dead letter");
94 self.buf.lock().push(dl);
95 }
96
97 pub fn drain(&self) -> Vec<DeadLetter> {
98 std::mem::take(&mut *self.buf.lock())
99 }
100
101 pub fn len(&self) -> usize {
102 self.buf.lock().len()
103 }
104
105 pub fn is_empty(&self) -> bool {
106 self.buf.lock().is_empty()
107 }
108}
109
110#[cfg(test)]
111mod tests {
112 use super::*;
113 use crate::actor::Address;
114
115 fn make(reason: DeadLetterReason) -> DeadLetter {
116 DeadLetter {
117 recipient: ActorPath::root(Address::local("S")).child("x"),
118 sender: None,
119 message: Box::new(1u32),
120 reason,
121 }
122 }
123
124 #[test]
125 fn stores_dead_letters() {
126 let s = DeadLettersSink::new();
127 s.push(make(DeadLetterReason::NoRecipient));
128 assert_eq!(s.len(), 1);
129 let d = s.drain();
130 assert_eq!(d.len(), 1);
131 }
132
133 #[test]
134 fn default_filter_drops_suppressed() {
135 let s = DeadLettersSink::new();
136 s.push(make(DeadLetterReason::Suppressed));
137 assert!(s.is_empty(), "default filter should drop Suppressed");
138 }
139
140 #[test]
141 fn custom_filter_drops_dropped_category() {
142 let s = DeadLettersSink::new();
143 s.set_filter(DeadLetterFilter {
144 accept_no_recipient: true,
145 accept_dropped: false,
146 accept_suppressed: false,
147 });
148 s.push(make(DeadLetterReason::Dropped));
149 assert!(s.is_empty());
150 s.push(make(DeadLetterReason::NoRecipient));
151 assert_eq!(s.len(), 1);
152 }
153}