1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
use crate::interceptor::SendMode;
use crate::node::ActorId;
use std::any::Any;
/// Why a message ended up as a dead letter instead of being delivered.
///
/// Values compare by variant and payload (`PartialEq`/`Eq`), so callers and
/// tests can match a reason directly rather than comparing its rendered text.
/// The `Display` implementation produces a short human-readable description
/// of each variant.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DeadLetterReason {
    /// The target actor has stopped.
    ActorStopped,
    /// The target actor was not found.
    ActorNotFound,
    /// The mailbox was full and the overflow strategy rejected the message.
    MailboxFull,
    /// An interceptor dropped the message or stream item.
    DroppedByInterceptor {
        /// Name of the interceptor that dropped the message.
        interceptor: String,
    },
    /// A wire interceptor dropped the envelope before the message body was
    /// deserialized.
    WireInterceptorDrop {
        /// Name of the wire interceptor that dropped the envelope.
        interceptor: String,
    },
    /// A wire interceptor rejected the envelope before the message body was
    /// deserialized.
    WireInterceptorReject {
        /// Name of the wire interceptor that rejected the envelope.
        interceptor: String,
        /// Explanation supplied for the rejection.
        reason: String,
    },
}
impl std::fmt::Display for DeadLetterReason {
    /// Writes a short lowercase description of the reason.
    ///
    /// Payload-free variants render as fixed text; interceptor variants embed
    /// the interceptor name in single quotes, and a rejection additionally
    /// appends its reason after a colon.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            // Fixed-text variants: nothing to interpolate.
            Self::ActorStopped => f.write_str("actor stopped"),
            Self::ActorNotFound => f.write_str("actor not found"),
            Self::MailboxFull => f.write_str("mailbox full"),
            // Variants that name the interceptor responsible.
            Self::DroppedByInterceptor { interceptor: name } => {
                write!(f, "dropped by interceptor '{}'", name)
            }
            Self::WireInterceptorDrop { interceptor: name } => {
                write!(f, "dropped by wire interceptor '{}'", name)
            }
            Self::WireInterceptorReject {
                interceptor: name,
                reason: why,
            } => write!(f, "rejected by wire interceptor '{}': {}", name, why),
        }
    }
}
/// Description of a dead letter: a message that could not be delivered.
///
/// An event is handed by value to [`DeadLetterHandler::on_dead_letter`], so
/// the handler takes ownership of everything in it, including the optional
/// message body.
#[derive(Debug)]
pub struct DeadLetterEvent {
    /// Identifier of the actor the message was addressed to.
    pub target_id: ActorId,
    /// Name of the target actor, when one is known.
    pub target_name: Option<String>,
    /// Rust type name of the undelivered message.
    pub message_type: &'static str,
    /// The send mode that was used for the message.
    pub send_mode: SendMode,
    /// The cause of the delivery failure.
    pub reason: DeadLetterReason,
    /// The type-erased message body. May be `None` if the message was
    /// consumed.
    pub message: Option<Box<dyn Any + Send>>,
}
/// Receiver of dead letter events; registered on the runtime.
///
/// Implementors must be `Send + Sync + 'static`. The stock implementation,
/// [`LoggingDeadLetterHandler`], reports each dead letter through
/// `tracing::warn!`. Applications can supply their own handler for
/// monitoring, alerting, or forwarding dead letters to a retry queue.
pub trait DeadLetterHandler: Send + Sync + 'static {
    /// Invoked for a message that cannot be delivered.
    ///
    /// The event is passed by value, so the handler owns it (including any
    /// message body it carries).
    fn on_dead_letter(&self, event: DeadLetterEvent);
}
/// Default dead letter handler that logs via `tracing`.
///
/// This is a stateless unit struct, so it derives the common traits:
/// it can be printed with `Debug`, freely copied, and built through
/// `Default::default()` as well as by naming it directly.
#[derive(Debug, Clone, Copy, Default)]
pub struct LoggingDeadLetterHandler;
impl DeadLetterHandler for LoggingDeadLetterHandler {
fn on_dead_letter(&self, event: DeadLetterEvent) {
tracing::warn!(
target = %event.target_id,
message_type = event.message_type,
send_mode = ?event.send_mode,
reason = %event.reason,
"dead letter"
);
}
}
/// A dead letter handler that collects events for testing.
///
/// Every event passed to the handler is recorded as a [`DeadLetterInfo`] in
/// an internal list guarded by a `std::sync::Mutex`, so recording works
/// through a shared reference. Implements `Debug` so a handler can appear in
/// assertion messages and in `Debug` output of types that hold one.
#[derive(Debug)]
pub struct CollectingDeadLetterHandler {
    /// Recorded dead letters, oldest first.
    events: std::sync::Mutex<Vec<DeadLetterInfo>>,
}
/// Cloneable summary of a dead letter, intended for test assertions.
///
/// Carries the same descriptive fields as [`DeadLetterEvent`] but omits the
/// type-erased message body, and stores the message type name as an owned
/// `String`.
#[derive(Debug, Clone)]
pub struct DeadLetterInfo {
    /// Identifier of the actor the message was addressed to.
    pub target_id: ActorId,
    /// Name of the target actor, when one is known.
    pub target_name: Option<String>,
    /// Rust type name of the undelivered message.
    pub message_type: String,
    /// The send mode that was used for the message.
    pub send_mode: SendMode,
    /// The cause of the delivery failure.
    pub reason: DeadLetterReason,
}
impl CollectingDeadLetterHandler {
    /// Builds a handler with nothing recorded yet.
    pub fn new() -> Self {
        let events = std::sync::Mutex::new(Vec::new());
        Self { events }
    }

    /// Returns a cloned snapshot of every dead letter recorded so far.
    ///
    /// The snapshot is independent of the handler: later recordings or a
    /// call to [`clear`](Self::clear) do not affect it.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn events(&self) -> Vec<DeadLetterInfo> {
        let recorded = self.events.lock().unwrap();
        recorded.to_vec()
    }

    /// Returns how many dead letters are currently recorded.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn count(&self) -> usize {
        let recorded = self.events.lock().unwrap();
        recorded.len()
    }

    /// Discards every recorded dead letter.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn clear(&self) {
        let mut recorded = self.events.lock().unwrap();
        recorded.clear();
    }
}
impl Default for CollectingDeadLetterHandler {
fn default() -> Self {
Self::new()
}
}
impl DeadLetterHandler for CollectingDeadLetterHandler {
fn on_dead_letter(&self, event: DeadLetterEvent) {
self.events.lock().unwrap().push(DeadLetterInfo {
target_id: event.target_id,
target_name: event.target_name,
message_type: event.message_type.to_string(),
send_mode: event.send_mode,
reason: event.reason,
});
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::node::NodeId;

    /// Builds a dead letter event without a message body from the parts the
    /// tests vary.
    fn undelivered(
        target_id: ActorId,
        target_name: Option<&str>,
        message_type: &'static str,
        send_mode: SendMode,
        reason: DeadLetterReason,
    ) -> DeadLetterEvent {
        DeadLetterEvent {
            target_id,
            target_name: target_name.map(str::to_owned),
            message_type,
            send_mode,
            reason,
            message: None,
        }
    }

    /// Every variant renders its documented text.
    #[test]
    fn test_dead_letter_reason_display() {
        assert_eq!(DeadLetterReason::ActorStopped.to_string(), "actor stopped");
        assert_eq!(
            DeadLetterReason::ActorNotFound.to_string(),
            "actor not found"
        );
        assert_eq!(DeadLetterReason::MailboxFull.to_string(), "mailbox full");

        let dropped = DeadLetterReason::DroppedByInterceptor {
            interceptor: "auth".into(),
        };
        assert_eq!(dropped.to_string(), "dropped by interceptor 'auth'");

        let wire_dropped = DeadLetterReason::WireInterceptorDrop {
            interceptor: "acl".into(),
        };
        assert_eq!(
            wire_dropped.to_string(),
            "dropped by wire interceptor 'acl'"
        );

        let wire_rejected = DeadLetterReason::WireInterceptorReject {
            interceptor: "acl".into(),
            reason: "bad signature".into(),
        };
        assert_eq!(
            wire_rejected.to_string(),
            "rejected by wire interceptor 'acl': bad signature"
        );
    }

    /// Smoke test: logging a dead letter must not panic.
    #[test]
    fn test_logging_handler_does_not_panic() {
        let handler = LoggingDeadLetterHandler;
        handler.on_dead_letter(undelivered(
            ActorId {
                node: NodeId("n1".into()),
                local: 1,
            },
            Some("test-actor"),
            "MyMsg",
            SendMode::Tell,
            DeadLetterReason::ActorStopped,
        ));
    }

    /// Events are recorded in arrival order, with their fields preserved, and
    /// `clear` empties the collection.
    #[test]
    fn test_collecting_handler() {
        let handler = CollectingDeadLetterHandler::new();
        handler.on_dead_letter(undelivered(
            ActorId {
                node: NodeId("n1".into()),
                local: 1,
            },
            Some("actor-1"),
            "Increment",
            SendMode::Tell,
            DeadLetterReason::ActorStopped,
        ));
        handler.on_dead_letter(undelivered(
            ActorId {
                node: NodeId("n1".into()),
                local: 2,
            },
            Some("actor-2"),
            "GetCount",
            SendMode::Ask,
            DeadLetterReason::MailboxFull,
        ));

        assert_eq!(handler.count(), 2);
        let events = handler.events();
        assert_eq!(events.len(), 2);

        assert_eq!(events[0].message_type, "Increment");
        assert_eq!(events[0].target_name.as_deref(), Some("actor-1"));
        assert!(matches!(events[0].send_mode, SendMode::Tell));
        assert!(matches!(events[0].reason, DeadLetterReason::ActorStopped));

        assert_eq!(events[1].message_type, "GetCount");
        assert_eq!(events[1].target_name.as_deref(), Some("actor-2"));
        assert!(matches!(events[1].send_mode, SendMode::Ask));
        assert_eq!(events[1].reason.to_string(), "mailbox full");

        handler.clear();
        assert_eq!(handler.count(), 0);
        assert!(handler.events().is_empty());
        // The snapshot taken before `clear` is unaffected.
        assert_eq!(events.len(), 2);
    }

    /// An event carrying a message body is still recorded, and the recorded
    /// summary keeps the descriptive fields.
    #[test]
    fn test_dead_letter_with_message_body() {
        let handler = CollectingDeadLetterHandler::new();
        handler.on_dead_letter(DeadLetterEvent {
            message: Some(Box::new(42u64)),
            ..undelivered(
                ActorId {
                    node: NodeId("n1".into()),
                    local: 1,
                },
                None,
                "MyMsg",
                SendMode::Tell,
                DeadLetterReason::ActorNotFound,
            )
        });

        assert_eq!(handler.count(), 1);
        let events = handler.events();
        assert_eq!(events[0].message_type, "MyMsg");
        assert!(events[0].target_name.is_none());
        assert!(matches!(events[0].reason, DeadLetterReason::ActorNotFound));
    }

    /// `Default` yields the same empty handler as `new`.
    #[test]
    fn test_collecting_handler_default_is_empty() {
        let handler = CollectingDeadLetterHandler::default();
        assert_eq!(handler.count(), 0);
        assert!(handler.events().is_empty());
    }
}