Skip to main content

ringkernel_core/
dlq.rs

1//! Dead Letter Queue — FR-004
2//!
3//! Persistent DLQ for failed/undeliverable messages:
4//! - Messages that fail processing N times → routed to DLQ
5//! - DLQ stored in memory (can be persisted via checkpoint)
6//! - Replay API: re-inject messages to specified destination
7//! - Per-actor DLQ configuration (max retries, TTL)
8//! - DLQ metrics: depth, ingress rate, age distribution
9
10use std::collections::VecDeque;
11use std::time::{Duration, Instant};
12
13use crate::message::MessageEnvelope;
14use crate::runtime::KernelId;
15
/// Why a message ended up in the dead letter queue.
///
/// Each variant carries the detail needed to render a diagnostic via the
/// type's `Display` implementation.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum DeadLetterReason {
    /// Processing kept failing until the retry budget was used up.
    MaxRetriesExceeded {
        /// How many retries were attempted.
        retries: u32,
        /// The retry limit that applied.
        max: u32,
    },
    /// No actor could be found for the destination.
    ActorNotFound {
        /// Name of the missing actor.
        actor_name: String,
    },
    /// The target queue was full and its overflow policy is DropToDlq.
    QueueFull {
        /// Capacity of the queue that overflowed.
        queue_capacity: usize,
    },
    /// The message's TTL ran out before it could be delivered.
    TtlExpired {
        /// How old the message was at expiry.
        age: Duration,
    },
    /// The actor was destroyed while the message was still in flight.
    ActorDestroyed {
        /// Identifier of the destroyed actor.
        actor_id: String,
    },
    /// The actor's message handler explicitly refused the message.
    Rejected {
        /// Explanation supplied for the rejection.
        reason: String,
    },
}
52
53impl std::fmt::Display for DeadLetterReason {
54    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55        match self {
56            Self::MaxRetriesExceeded { retries, max } => {
57                write!(f, "max retries exceeded ({}/{})", retries, max)
58            }
59            Self::ActorNotFound { actor_name } => write!(f, "actor not found: {}", actor_name),
60            Self::QueueFull { queue_capacity } => {
61                write!(f, "queue full (capacity {})", queue_capacity)
62            }
63            Self::TtlExpired { age } => write!(f, "TTL expired (age {:?})", age),
64            Self::ActorDestroyed { actor_id } => write!(f, "actor destroyed: {}", actor_id),
65            Self::Rejected { reason } => write!(f, "rejected: {}", reason),
66        }
67    }
68}
69
70/// A message in the dead letter queue with metadata.
71#[derive(Debug, Clone)]
72pub struct DeadLetter {
73    /// The original message.
74    pub envelope: MessageEnvelope,
75    /// Why it was sent to the DLQ.
76    pub reason: DeadLetterReason,
77    /// Original destination.
78    pub destination: KernelId,
79    /// When the message arrived in the DLQ.
80    pub arrived_at: Instant,
81    /// Number of delivery attempts before DLQ.
82    pub attempts: u32,
83    /// Unique DLQ sequence number.
84    pub sequence: u64,
85}
86
/// Tunable limits and switches for a dead letter queue.
#[derive(Clone, Debug)]
pub struct DlqConfig {
    /// Upper bound on the number of messages held in the DLQ.
    pub max_size: usize,
    /// How long a message may sit in the DLQ before it is eligible for expiry.
    pub max_age: Duration,
    /// Delivery retries allowed before a message is routed to the DLQ.
    pub max_retries: u32,
    /// When `true`, every message entering the DLQ is logged.
    pub log_entries: bool,
}
99
100impl Default for DlqConfig {
101    fn default() -> Self {
102        Self {
103            max_size: 10_000,
104            max_age: Duration::from_secs(3600), // 1 hour
105            max_retries: 3,
106            log_entries: true,
107        }
108    }
109}
110
111/// Dead letter queue.
112pub struct DeadLetterQueue {
113    /// Queued dead letters.
114    letters: VecDeque<DeadLetter>,
115    /// Configuration.
116    config: DlqConfig,
117    /// Sequence counter.
118    next_sequence: u64,
119    /// Metrics.
120    total_received: u64,
121    total_replayed: u64,
122    total_expired: u64,
123}
124
125impl DeadLetterQueue {
126    /// Create a new DLQ with the given configuration.
127    pub fn new(config: DlqConfig) -> Self {
128        Self {
129            letters: VecDeque::new(),
130            config,
131            next_sequence: 0,
132            total_received: 0,
133            total_replayed: 0,
134            total_expired: 0,
135        }
136    }
137
138    /// Route a message to the DLQ.
139    pub fn enqueue(
140        &mut self,
141        envelope: MessageEnvelope,
142        reason: DeadLetterReason,
143        destination: KernelId,
144        attempts: u32,
145    ) {
146        let seq = self.next_sequence;
147        self.next_sequence += 1;
148        self.total_received += 1;
149
150        if self.config.log_entries {
151            tracing::warn!(
152                sequence = seq,
153                destination = %destination,
154                reason = %reason,
155                attempts = attempts,
156                "Message routed to dead letter queue"
157            );
158        }
159
160        let letter = DeadLetter {
161            envelope,
162            reason,
163            destination,
164            arrived_at: Instant::now(),
165            attempts,
166            sequence: seq,
167        };
168
169        self.letters.push_back(letter);
170
171        // Evict oldest if over capacity
172        while self.letters.len() > self.config.max_size {
173            self.letters.pop_front();
174            self.total_expired += 1;
175        }
176    }
177
178    /// Replay messages from the DLQ matching a filter.
179    ///
180    /// Returns messages removed from the DLQ for re-delivery.
181    pub fn replay<F>(&mut self, filter: F) -> Vec<DeadLetter>
182    where
183        F: Fn(&DeadLetter) -> bool,
184    {
185        let mut replayed = Vec::new();
186        let mut remaining = VecDeque::new();
187
188        for letter in self.letters.drain(..) {
189            if filter(&letter) {
190                self.total_replayed += 1;
191                replayed.push(letter);
192            } else {
193                remaining.push_back(letter);
194            }
195        }
196
197        self.letters = remaining;
198        replayed
199    }
200
201    /// Replay all messages destined for a specific actor.
202    pub fn replay_for(&mut self, destination: &KernelId) -> Vec<DeadLetter> {
203        let dest = destination.clone();
204        self.replay(move |letter| letter.destination == dest)
205    }
206
207    /// Expire messages older than max_age.
208    pub fn expire_old(&mut self) -> u64 {
209        let max_age = self.config.max_age;
210        let before = self.letters.len();
211
212        self.letters
213            .retain(|letter| letter.arrived_at.elapsed() < max_age);
214
215        let expired = (before - self.letters.len()) as u64;
216        self.total_expired += expired;
217        expired
218    }
219
220    /// Browse the DLQ contents (non-destructive).
221    pub fn browse(&self, limit: usize) -> Vec<&DeadLetter> {
222        self.letters.iter().take(limit).collect()
223    }
224
225    /// Number of messages in the DLQ.
226    pub fn len(&self) -> usize {
227        self.letters.len()
228    }
229
230    /// Check if empty.
231    pub fn is_empty(&self) -> bool {
232        self.letters.is_empty()
233    }
234
235    /// Clear the DLQ.
236    pub fn clear(&mut self) {
237        self.letters.clear();
238    }
239
240    /// Get DLQ metrics.
241    pub fn metrics(&self) -> DlqMetrics {
242        let oldest_age = self
243            .letters
244            .front()
245            .map(|l| l.arrived_at.elapsed())
246            .unwrap_or_default();
247
248        DlqMetrics {
249            depth: self.letters.len() as u64,
250            total_received: self.total_received,
251            total_replayed: self.total_replayed,
252            total_expired: self.total_expired,
253            oldest_age,
254        }
255    }
256
257    /// Get the DLQ configuration.
258    pub fn config(&self) -> &DlqConfig {
259        &self.config
260    }
261}
262
/// Point-in-time snapshot of dead letter queue statistics.
#[derive(Clone, Debug)]
pub struct DlqMetrics {
    /// Number of messages currently queued.
    pub depth: u64,
    /// Lifetime count of messages received by the DLQ.
    pub total_received: u64,
    /// Lifetime count of messages replayed out of the DLQ.
    pub total_replayed: u64,
    /// Lifetime count of messages expired.
    pub total_expired: u64,
    /// How long the oldest queued message has been in the DLQ.
    pub oldest_age: Duration,
}
277
#[cfg(test)]
mod tests {
    use super::*;
    use crate::hlc::HlcTimestamp;
    use crate::message::MessageHeader;

    /// Builds a dummy envelope with a 64-byte payload.
    fn test_envelope() -> MessageEnvelope {
        let header = MessageHeader::new(1, 0, 1, 64, HlcTimestamp::now(1));
        MessageEnvelope {
            header,
            payload: vec![42u8; 64],
            provenance: None,
            tenant_id: 0,
            audit_tag: crate::k2k::audit_tag::AuditTag::unspecified(),
        }
    }

    /// A default-configured DLQ with per-entry logging switched off.
    fn quiet_dlq() -> DeadLetterQueue {
        DeadLetterQueue::new(DlqConfig {
            log_entries: false,
            ..Default::default()
        })
    }

    /// The `QueueFull` reason shared by several tests.
    fn queue_full() -> DeadLetterReason {
        DeadLetterReason::QueueFull {
            queue_capacity: 256,
        }
    }

    /// The `Rejected` reason used by the metrics test.
    fn rejected() -> DeadLetterReason {
        DeadLetterReason::Rejected {
            reason: "test".into(),
        }
    }

    #[test]
    fn test_dlq_enqueue_and_browse() {
        let mut dlq = quiet_dlq();

        let reason = DeadLetterReason::MaxRetriesExceeded { retries: 3, max: 3 };
        dlq.enqueue(test_envelope(), reason, KernelId::new("actor_a"), 3);

        assert_eq!(dlq.len(), 1);
        let letters = dlq.browse(10);
        assert_eq!(letters.len(), 1);
        assert_eq!(letters[0].destination, KernelId::new("actor_a"));
    }

    #[test]
    fn test_dlq_replay() {
        let mut dlq = quiet_dlq();

        // Two letters for "a" with one for "b" in between.
        for dest in ["a", "b", "a"].iter().copied() {
            dlq.enqueue(test_envelope(), queue_full(), KernelId::new(dest), 1);
        }

        // Replay only actor "a" messages
        let replayed = dlq.replay_for(&KernelId::new("a"));
        assert_eq!(replayed.len(), 2);
        assert_eq!(dlq.len(), 1); // Only "b" remains
    }

    #[test]
    fn test_dlq_max_size() {
        let mut dlq = DeadLetterQueue::new(DlqConfig {
            max_size: 3,
            log_entries: false,
            ..Default::default()
        });

        for i in 0..5 {
            let reason = DeadLetterReason::ActorNotFound {
                actor_name: format!("a{}", i),
            };
            dlq.enqueue(test_envelope(), reason, KernelId::new(&format!("k{}", i)), 1);
        }

        assert_eq!(dlq.len(), 3); // Oldest 2 evicted
        let metrics = dlq.metrics();
        assert_eq!(metrics.total_received, 5);
        assert_eq!(metrics.total_expired, 2);
    }

    #[test]
    fn test_dlq_replay_with_filter() {
        let mut dlq = quiet_dlq();

        dlq.enqueue(
            test_envelope(),
            DeadLetterReason::MaxRetriesExceeded { retries: 3, max: 3 },
            KernelId::new("a"),
            3,
        );
        dlq.enqueue(test_envelope(), queue_full(), KernelId::new("a"), 1);

        // Replay only queue-full messages
        let replayed = dlq.replay(|l| matches!(l.reason, DeadLetterReason::QueueFull { .. }));
        assert_eq!(replayed.len(), 1);
        assert_eq!(dlq.len(), 1);
    }

    #[test]
    fn test_dlq_metrics() {
        let mut dlq = quiet_dlq();

        dlq.enqueue(test_envelope(), rejected(), KernelId::new("a"), 1);
        dlq.enqueue(test_envelope(), rejected(), KernelId::new("b"), 1);
        dlq.replay_for(&KernelId::new("a"));

        let m = dlq.metrics();
        assert_eq!(m.total_received, 2);
        assert_eq!(m.total_replayed, 1);
        assert_eq!(m.depth, 1);
    }

    #[test]
    fn test_dead_letter_reason_display() {
        let retries = DeadLetterReason::MaxRetriesExceeded { retries: 3, max: 3 };
        assert!(retries.to_string().contains("3/3"));

        let missing = DeadLetterReason::ActorNotFound {
            actor_name: "test".into(),
        };
        assert!(missing.to_string().contains("test"));
    }
}