1use std::collections::VecDeque;
11use std::time::{Duration, Instant};
12
13use crate::message::MessageEnvelope;
14use crate::runtime::KernelId;
15
/// Why a message ended up in the dead letter queue.
///
/// Each variant carries the detail that its `Display` output reports.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum DeadLetterReason {
    /// Retry budget used up: `retries` attempts against a limit of `max`.
    MaxRetriesExceeded { retries: u32, max: u32 },
    /// No actor with the given name could be found.
    ActorNotFound { actor_name: String },
    /// The target queue was full; `queue_capacity` is its capacity.
    QueueFull { queue_capacity: usize },
    /// The message's time-to-live ran out; `age` is how old it was.
    TtlExpired { age: Duration },
    /// The actor identified by `actor_id` had been destroyed.
    ActorDestroyed { actor_id: String },
    /// The message was rejected, with a free-form explanation.
    Rejected { reason: String },
}
52
53impl std::fmt::Display for DeadLetterReason {
54 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55 match self {
56 Self::MaxRetriesExceeded { retries, max } => {
57 write!(f, "max retries exceeded ({}/{})", retries, max)
58 }
59 Self::ActorNotFound { actor_name } => write!(f, "actor not found: {}", actor_name),
60 Self::QueueFull { queue_capacity } => {
61 write!(f, "queue full (capacity {})", queue_capacity)
62 }
63 Self::TtlExpired { age } => write!(f, "TTL expired (age {:?})", age),
64 Self::ActorDestroyed { actor_id } => write!(f, "actor destroyed: {}", actor_id),
65 Self::Rejected { reason } => write!(f, "rejected: {}", reason),
66 }
67 }
68}
69
70#[derive(Debug, Clone)]
72pub struct DeadLetter {
73 pub envelope: MessageEnvelope,
75 pub reason: DeadLetterReason,
77 pub destination: KernelId,
79 pub arrived_at: Instant,
81 pub attempts: u32,
83 pub sequence: u64,
85}
86
/// Tunable limits and switches for a dead letter queue.
#[derive(Clone, Debug)]
pub struct DlqConfig {
    /// Maximum number of entries kept; the oldest are evicted beyond this.
    pub max_size: usize,
    /// Entries at least this old are removed when the queue is expired.
    pub max_age: Duration,
    /// Retry limit. Not read by the queue code in this file; presumably
    /// consulted by callers (to be confirmed).
    pub max_retries: u32,
    /// Whether a warning is logged each time an entry is enqueued.
    pub log_entries: bool,
}
99
100impl Default for DlqConfig {
101 fn default() -> Self {
102 Self {
103 max_size: 10_000,
104 max_age: Duration::from_secs(3600), max_retries: 3,
106 log_entries: true,
107 }
108 }
109}
110
111pub struct DeadLetterQueue {
113 letters: VecDeque<DeadLetter>,
115 config: DlqConfig,
117 next_sequence: u64,
119 total_received: u64,
121 total_replayed: u64,
122 total_expired: u64,
123}
124
125impl DeadLetterQueue {
126 pub fn new(config: DlqConfig) -> Self {
128 Self {
129 letters: VecDeque::new(),
130 config,
131 next_sequence: 0,
132 total_received: 0,
133 total_replayed: 0,
134 total_expired: 0,
135 }
136 }
137
138 pub fn enqueue(
140 &mut self,
141 envelope: MessageEnvelope,
142 reason: DeadLetterReason,
143 destination: KernelId,
144 attempts: u32,
145 ) {
146 let seq = self.next_sequence;
147 self.next_sequence += 1;
148 self.total_received += 1;
149
150 if self.config.log_entries {
151 tracing::warn!(
152 sequence = seq,
153 destination = %destination,
154 reason = %reason,
155 attempts = attempts,
156 "Message routed to dead letter queue"
157 );
158 }
159
160 let letter = DeadLetter {
161 envelope,
162 reason,
163 destination,
164 arrived_at: Instant::now(),
165 attempts,
166 sequence: seq,
167 };
168
169 self.letters.push_back(letter);
170
171 while self.letters.len() > self.config.max_size {
173 self.letters.pop_front();
174 self.total_expired += 1;
175 }
176 }
177
178 pub fn replay<F>(&mut self, filter: F) -> Vec<DeadLetter>
182 where
183 F: Fn(&DeadLetter) -> bool,
184 {
185 let mut replayed = Vec::new();
186 let mut remaining = VecDeque::new();
187
188 for letter in self.letters.drain(..) {
189 if filter(&letter) {
190 self.total_replayed += 1;
191 replayed.push(letter);
192 } else {
193 remaining.push_back(letter);
194 }
195 }
196
197 self.letters = remaining;
198 replayed
199 }
200
201 pub fn replay_for(&mut self, destination: &KernelId) -> Vec<DeadLetter> {
203 let dest = destination.clone();
204 self.replay(move |letter| letter.destination == dest)
205 }
206
207 pub fn expire_old(&mut self) -> u64 {
209 let max_age = self.config.max_age;
210 let before = self.letters.len();
211
212 self.letters
213 .retain(|letter| letter.arrived_at.elapsed() < max_age);
214
215 let expired = (before - self.letters.len()) as u64;
216 self.total_expired += expired;
217 expired
218 }
219
220 pub fn browse(&self, limit: usize) -> Vec<&DeadLetter> {
222 self.letters.iter().take(limit).collect()
223 }
224
225 pub fn len(&self) -> usize {
227 self.letters.len()
228 }
229
230 pub fn is_empty(&self) -> bool {
232 self.letters.is_empty()
233 }
234
235 pub fn clear(&mut self) {
237 self.letters.clear();
238 }
239
240 pub fn metrics(&self) -> DlqMetrics {
242 let oldest_age = self
243 .letters
244 .front()
245 .map(|l| l.arrived_at.elapsed())
246 .unwrap_or_default();
247
248 DlqMetrics {
249 depth: self.letters.len() as u64,
250 total_received: self.total_received,
251 total_replayed: self.total_replayed,
252 total_expired: self.total_expired,
253 oldest_age,
254 }
255 }
256
257 pub fn config(&self) -> &DlqConfig {
259 &self.config
260 }
261}
262
/// Point-in-time snapshot of a dead letter queue's state and counters.
#[derive(Clone, Debug)]
pub struct DlqMetrics {
    /// Number of entries stored when the snapshot was taken.
    pub depth: u64,
    /// Entries ever enqueued.
    pub total_received: u64,
    /// Entries handed back through replay.
    pub total_replayed: u64,
    /// Entries dropped by size eviction or age expiry.
    pub total_expired: u64,
    /// Age of the oldest stored entry; zero when the queue was empty.
    pub oldest_age: Duration,
}
277
#[cfg(test)]
mod tests {
    use super::*;
    use crate::hlc::HlcTimestamp;
    use crate::message::MessageHeader;

    /// Builds a small envelope with a 64-byte payload for use as test data.
    fn test_envelope() -> MessageEnvelope {
        MessageEnvelope {
            header: MessageHeader::new(1, 0, 1, 64, HlcTimestamp::now(1)),
            payload: vec![42u8; 64],
            provenance: None,
            tenant_id: 0,
            audit_tag: crate::k2k::audit_tag::AuditTag::unspecified(),
        }
    }

    /// Queue with default limits and logging switched off.
    fn quiet_dlq() -> DeadLetterQueue {
        DeadLetterQueue::new(DlqConfig {
            log_entries: false,
            ..Default::default()
        })
    }

    /// Enqueues a test envelope addressed to `dest`.
    fn enqueue_to(dlq: &mut DeadLetterQueue, dest: &str, reason: DeadLetterReason, attempts: u32) {
        dlq.enqueue(test_envelope(), reason, KernelId::new(dest), attempts);
    }

    fn queue_full() -> DeadLetterReason {
        DeadLetterReason::QueueFull {
            queue_capacity: 256,
        }
    }

    fn rejected() -> DeadLetterReason {
        DeadLetterReason::Rejected {
            reason: "test".into(),
        }
    }

    #[test]
    fn test_dlq_enqueue_and_browse() {
        let mut dlq = quiet_dlq();

        enqueue_to(
            &mut dlq,
            "actor_a",
            DeadLetterReason::MaxRetriesExceeded { retries: 3, max: 3 },
            3,
        );

        assert_eq!(dlq.len(), 1);
        let letters = dlq.browse(10);
        assert_eq!(letters.len(), 1);
        assert_eq!(letters[0].destination, KernelId::new("actor_a"));
    }

    #[test]
    fn test_dlq_replay() {
        let mut dlq = quiet_dlq();

        enqueue_to(&mut dlq, "a", queue_full(), 1);
        enqueue_to(&mut dlq, "b", queue_full(), 1);
        enqueue_to(&mut dlq, "a", queue_full(), 1);

        let replayed = dlq.replay_for(&KernelId::new("a"));
        assert_eq!(replayed.len(), 2);
        assert_eq!(dlq.len(), 1);
    }

    #[test]
    fn test_dlq_replay_preserves_order() {
        let mut dlq = quiet_dlq();

        enqueue_to(&mut dlq, "a", queue_full(), 1);
        enqueue_to(&mut dlq, "b", queue_full(), 1);
        enqueue_to(&mut dlq, "a", queue_full(), 1);
        enqueue_to(&mut dlq, "b", queue_full(), 1);

        let replayed = dlq.replay_for(&KernelId::new("a"));
        let replayed_seqs: Vec<u64> = replayed.iter().map(|l| l.sequence).collect();
        assert_eq!(replayed_seqs, vec![0, 2]);

        let kept_seqs: Vec<u64> = dlq.browse(10).iter().map(|l| l.sequence).collect();
        assert_eq!(kept_seqs, vec![1, 3]);
    }

    #[test]
    fn test_dlq_max_size() {
        let mut dlq = DeadLetterQueue::new(DlqConfig {
            max_size: 3,
            log_entries: false,
            ..Default::default()
        });

        for i in 0..5 {
            dlq.enqueue(
                test_envelope(),
                DeadLetterReason::ActorNotFound {
                    actor_name: format!("a{}", i),
                },
                KernelId::new(&format!("k{}", i)),
                1,
            );
        }

        // The two oldest entries were evicted to honour `max_size`.
        assert_eq!(dlq.len(), 3);
        let metrics = dlq.metrics();
        assert_eq!(metrics.total_received, 5);
        assert_eq!(metrics.total_expired, 2);
    }

    #[test]
    fn test_dlq_replay_with_filter() {
        let mut dlq = quiet_dlq();

        enqueue_to(
            &mut dlq,
            "a",
            DeadLetterReason::MaxRetriesExceeded { retries: 3, max: 3 },
            3,
        );
        enqueue_to(&mut dlq, "a", queue_full(), 1);

        let replayed = dlq.replay(|l| matches!(l.reason, DeadLetterReason::QueueFull { .. }));
        assert_eq!(replayed.len(), 1);
        assert_eq!(dlq.len(), 1);
    }

    #[test]
    fn test_dlq_metrics() {
        let mut dlq = quiet_dlq();

        enqueue_to(&mut dlq, "a", rejected(), 1);
        enqueue_to(&mut dlq, "b", rejected(), 1);
        let replayed = dlq.replay_for(&KernelId::new("a"));
        assert_eq!(replayed.len(), 1);

        let m = dlq.metrics();
        assert_eq!(m.total_received, 2);
        assert_eq!(m.total_replayed, 1);
        assert_eq!(m.depth, 1);
    }

    #[test]
    fn test_dlq_expire_old() {
        // With the default one-hour limit nothing freshly enqueued expires.
        let mut fresh = quiet_dlq();
        enqueue_to(&mut fresh, "a", rejected(), 1);
        assert_eq!(fresh.expire_old(), 0);
        assert_eq!(fresh.len(), 1);

        // With a zero limit no age is ever below it, so everything expires.
        let mut dlq = DeadLetterQueue::new(DlqConfig {
            max_age: Duration::from_secs(0),
            log_entries: false,
            ..Default::default()
        });
        enqueue_to(&mut dlq, "a", rejected(), 1);
        enqueue_to(&mut dlq, "b", rejected(), 1);

        assert_eq!(dlq.expire_old(), 2);
        assert!(dlq.is_empty());
        assert_eq!(dlq.metrics().total_expired, 2);
    }

    #[test]
    fn test_dlq_clear_keeps_counters() {
        let mut dlq = quiet_dlq();
        assert!(dlq.is_empty());

        enqueue_to(&mut dlq, "a", rejected(), 1);
        assert!(!dlq.is_empty());

        dlq.clear();
        assert!(dlq.is_empty());
        assert_eq!(dlq.len(), 0);

        let m = dlq.metrics();
        assert_eq!(m.depth, 0);
        assert_eq!(m.total_received, 1);
        assert_eq!(m.oldest_age, Duration::default());
    }

    #[test]
    fn test_dlq_sequence_and_browse_limit() {
        let mut dlq = quiet_dlq();

        enqueue_to(&mut dlq, "a", rejected(), 1);
        enqueue_to(&mut dlq, "b", rejected(), 1);
        enqueue_to(&mut dlq, "c", rejected(), 1);

        let first_two = dlq.browse(2);
        assert_eq!(first_two.len(), 2);
        assert_eq!(first_two[0].sequence, 0);
        assert_eq!(first_two[1].sequence, 1);
    }

    #[test]
    fn test_dead_letter_reason_display() {
        let r = DeadLetterReason::MaxRetriesExceeded { retries: 3, max: 3 };
        assert!(r.to_string().contains("3/3"));
        assert_eq!(r.to_string(), "max retries exceeded (3/3)");

        let r = DeadLetterReason::ActorNotFound {
            actor_name: "test".into(),
        };
        assert!(r.to_string().contains("test"));
        assert_eq!(r.to_string(), "actor not found: test");

        assert_eq!(queue_full().to_string(), "queue full (capacity 256)");
        assert_eq!(
            DeadLetterReason::TtlExpired {
                age: Duration::from_secs(5),
            }
            .to_string(),
            "TTL expired (age 5s)"
        );
        assert_eq!(
            DeadLetterReason::ActorDestroyed {
                actor_id: "k1".into(),
            }
            .to_string(),
            "actor destroyed: k1"
        );
        assert_eq!(rejected().to_string(), "rejected: test");
    }
}