use std::collections::VecDeque;
use std::time::{Duration, Instant};
use crate::message::MessageEnvelope;
use crate::runtime::KernelId;
/// Why a message ended up in the dead letter queue.
///
/// Each variant carries the detail that is rendered by the type's
/// `Display` implementation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DeadLetterReason {
    /// The retry budget was used up.
    MaxRetriesExceeded {
        /// Number of retries performed.
        retries: u32,
        /// Retry limit that applied.
        max: u32,
    },
    /// The addressed actor could not be found.
    ActorNotFound {
        /// Name that was looked up.
        actor_name: String,
    },
    /// The receiving queue had no room left.
    QueueFull {
        /// Capacity of the queue that was full.
        queue_capacity: usize,
    },
    /// The message exceeded its time-to-live.
    TtlExpired {
        /// Age reported for the message.
        age: Duration,
    },
    /// The addressed actor had already been destroyed.
    ActorDestroyed {
        /// Identifier of the destroyed actor.
        actor_id: String,
    },
    /// The message was explicitly rejected.
    Rejected {
        /// Free-form explanation of the rejection.
        reason: String,
    },
}
impl std::fmt::Display for DeadLetterReason {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::MaxRetriesExceeded { retries, max } => {
write!(f, "max retries exceeded ({}/{})", retries, max)
}
Self::ActorNotFound { actor_name } => write!(f, "actor not found: {}", actor_name),
Self::QueueFull { queue_capacity } => {
write!(f, "queue full (capacity {})", queue_capacity)
}
Self::TtlExpired { age } => write!(f, "TTL expired (age {:?})", age),
Self::ActorDestroyed { actor_id } => write!(f, "actor destroyed: {}", actor_id),
Self::Rejected { reason } => write!(f, "rejected: {}", reason),
}
}
}
/// One entry held by a [`DeadLetterQueue`]: the original message plus the
/// bookkeeping recorded when it was enqueued.
#[derive(Debug, Clone)]
pub struct DeadLetter {
    /// The message that was handed to the queue.
    pub envelope: MessageEnvelope,

    /// Why the message was dead-lettered.
    pub reason: DeadLetterReason,

    /// Kernel the entry is filed under; `replay_for` selects on this field.
    pub destination: KernelId,

    /// Monotonic timestamp taken when the entry was enqueued.
    pub arrived_at: Instant,

    /// Attempt count supplied by the caller of `enqueue`
    /// (presumably delivery attempts made before giving up).
    pub attempts: u32,

    /// Per-queue sequence number, assigned in enqueue order starting at 0.
    pub sequence: u64,
}
/// Tunables for a [`DeadLetterQueue`].
#[derive(Debug, Clone)]
pub struct DlqConfig {
    /// Maximum number of letters retained; `enqueue` evicts the oldest
    /// entries once the queue grows past this size.
    pub max_size: usize,

    /// Age at which `expire_old` removes a letter.
    pub max_age: Duration,

    /// Retry limit. Not read by the queue itself in this file —
    /// NOTE(review): presumably consumed by the delivery path; confirm.
    pub max_retries: u32,

    /// When `true`, `enqueue` emits a `tracing` warning for every letter.
    pub log_entries: bool,
}
impl Default for DlqConfig {
fn default() -> Self {
Self {
max_size: 10_000,
max_age: Duration::from_secs(3600), max_retries: 3,
log_entries: true,
}
}
}
/// Bounded, in-memory store for messages that could not be delivered.
///
/// Letters are kept in arrival order (oldest at the front) alongside
/// lifetime counters that are reported through `metrics`.
pub struct DeadLetterQueue {
    /// Stored letters, oldest first.
    letters: VecDeque<DeadLetter>,

    /// Limits and logging switch supplied at construction.
    config: DlqConfig,

    /// Sequence number that the next enqueued letter will receive.
    next_sequence: u64,

    /// Count of every letter ever passed to `enqueue`.
    total_received: u64,

    /// Count of letters handed back by `replay`.
    total_replayed: u64,

    /// Count of letters dropped by age expiry or capacity eviction.
    total_expired: u64,
}
impl DeadLetterQueue {
    /// Creates an empty queue governed by `config`, with every counter at zero.
    pub fn new(config: DlqConfig) -> Self {
        Self {
            letters: VecDeque::new(),
            config,
            next_sequence: 0,
            total_received: 0,
            total_replayed: 0,
            total_expired: 0,
        }
    }

    /// Stores an undeliverable message at the back of the queue.
    ///
    /// The letter receives the next sequence number and the current
    /// monotonic time as its arrival time. If `log_entries` is enabled a
    /// `tracing` warning is emitted. When the queue then exceeds
    /// `max_size`, the oldest letters are dropped and each eviction is
    /// counted in `total_expired`.
    pub fn enqueue(
        &mut self,
        envelope: MessageEnvelope,
        reason: DeadLetterReason,
        destination: KernelId,
        attempts: u32,
    ) {
        let seq = self.next_sequence;
        self.next_sequence += 1;
        self.total_received += 1;

        if self.config.log_entries {
            tracing::warn!(
                sequence = seq,
                destination = %destination,
                reason = %reason,
                attempts = attempts,
                "Message routed to dead letter queue"
            );
        }

        self.letters.push_back(DeadLetter {
            envelope,
            reason,
            destination,
            arrived_at: Instant::now(),
            attempts,
            sequence: seq,
        });

        // Enforce the size bound by discarding from the oldest end.
        while self.letters.len() > self.config.max_size {
            self.letters.pop_front();
            self.total_expired += 1;
        }
    }

    /// Removes and returns every letter for which `filter` returns `true`.
    ///
    /// Letters are visited oldest-first; both the returned vector and the
    /// letters left behind keep their original relative order. Each
    /// returned letter increments `total_replayed`.
    ///
    /// `filter` may be any `FnMut`, so it can carry state (for example a
    /// countdown that limits how many letters are taken). Plain `Fn`
    /// closures continue to work unchanged.
    pub fn replay<F>(&mut self, mut filter: F) -> Vec<DeadLetter>
    where
        F: FnMut(&DeadLetter) -> bool,
    {
        let mut replayed = Vec::new();
        let mut remaining = VecDeque::new();
        for letter in self.letters.drain(..) {
            if filter(&letter) {
                self.total_replayed += 1;
                replayed.push(letter);
            } else {
                remaining.push_back(letter);
            }
        }
        self.letters = remaining;
        replayed
    }

    /// Removes and returns every letter whose destination equals `destination`.
    pub fn replay_for(&mut self, destination: &KernelId) -> Vec<DeadLetter> {
        // Compare against the borrowed id directly; no clone is needed
        // because the closure does not outlive this call.
        self.replay(|letter| letter.destination == *destination)
    }

    /// Drops every letter whose age has reached `max_age` and returns how
    /// many were removed. The removed count is added to `total_expired`.
    pub fn expire_old(&mut self) -> u64 {
        let max_age = self.config.max_age;
        // Read the clock once so all letters are judged against the same instant.
        let now = Instant::now();
        let before = self.letters.len();
        self.letters
            .retain(|letter| now.saturating_duration_since(letter.arrived_at) < max_age);
        let expired = (before - self.letters.len()) as u64;
        self.total_expired += expired;
        expired
    }

    /// Returns references to at most `limit` letters, oldest first, without
    /// removing them.
    pub fn browse(&self, limit: usize) -> Vec<&DeadLetter> {
        self.letters.iter().take(limit).collect()
    }

    /// Number of letters currently stored.
    pub fn len(&self) -> usize {
        self.letters.len()
    }

    /// Returns `true` when no letters are stored.
    pub fn is_empty(&self) -> bool {
        self.letters.is_empty()
    }

    /// Discards every stored letter. Counters and the sequence number are
    /// left untouched.
    pub fn clear(&mut self) {
        self.letters.clear();
    }

    /// Takes a snapshot of the queue depth, the lifetime counters, and the
    /// age of the front (oldest) letter. `oldest_age` is zero when the
    /// queue is empty.
    pub fn metrics(&self) -> DlqMetrics {
        let oldest_age = self
            .letters
            .front()
            .map(|l| l.arrived_at.elapsed())
            .unwrap_or_default();
        DlqMetrics {
            depth: self.letters.len() as u64,
            total_received: self.total_received,
            total_replayed: self.total_replayed,
            total_expired: self.total_expired,
            oldest_age,
        }
    }

    /// Returns the configuration the queue was built with.
    pub fn config(&self) -> &DlqConfig {
        &self.config
    }
}
/// Point-in-time statistics produced by `DeadLetterQueue::metrics`.
#[derive(Debug, Clone)]
pub struct DlqMetrics {
    /// Number of letters stored when the snapshot was taken.
    pub depth: u64,

    /// Letters ever enqueued.
    pub total_received: u64,

    /// Letters handed back through replay.
    pub total_replayed: u64,

    /// Letters dropped by age expiry or capacity eviction.
    pub total_expired: u64,

    /// Age of the letter at the front of the queue; zero for an empty queue.
    pub oldest_age: Duration,
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::hlc::HlcTimestamp;
    use crate::message::MessageHeader;

    /// Builds a throwaway envelope with a 64-byte payload.
    fn test_envelope() -> MessageEnvelope {
        let header = MessageHeader::new(1, 0, 1, 64, HlcTimestamp::now(1));
        MessageEnvelope {
            header,
            payload: vec![42u8; 64],
            provenance: None,
            tenant_id: 0,
            audit_tag: crate::k2k::audit_tag::AuditTag::unspecified(),
        }
    }

    /// Default-configured queue with logging switched off.
    fn quiet_queue() -> DeadLetterQueue {
        DeadLetterQueue::new(DlqConfig {
            log_entries: false,
            ..Default::default()
        })
    }

    /// The `QueueFull` reason used by the replay tests.
    fn queue_full() -> DeadLetterReason {
        DeadLetterReason::QueueFull {
            queue_capacity: 256,
        }
    }

    /// The `Rejected` reason used by the metrics test.
    fn rejected() -> DeadLetterReason {
        DeadLetterReason::Rejected {
            reason: "test".into(),
        }
    }

    #[test]
    fn test_dlq_enqueue_and_browse() {
        let mut queue = quiet_queue();
        let reason = DeadLetterReason::MaxRetriesExceeded { retries: 3, max: 3 };
        queue.enqueue(test_envelope(), reason, KernelId::new("actor_a"), 3);

        assert_eq!(queue.len(), 1);

        let seen = queue.browse(10);
        assert_eq!(seen.len(), 1);
        assert_eq!(seen[0].destination, KernelId::new("actor_a"));
    }

    #[test]
    fn test_dlq_replay() {
        let mut queue = quiet_queue();
        queue.enqueue(test_envelope(), queue_full(), KernelId::new("a"), 1);
        queue.enqueue(test_envelope(), queue_full(), KernelId::new("b"), 1);
        queue.enqueue(test_envelope(), queue_full(), KernelId::new("a"), 1);

        let taken = queue.replay_for(&KernelId::new("a"));

        assert_eq!(taken.len(), 2);
        // Only the letter addressed to "b" stays behind.
        assert_eq!(queue.len(), 1);
    }

    #[test]
    fn test_dlq_max_size() {
        let mut queue = DeadLetterQueue::new(DlqConfig {
            max_size: 3,
            log_entries: false,
            ..Default::default()
        });

        for i in 0..5 {
            let reason = DeadLetterReason::ActorNotFound {
                actor_name: format!("a{}", i),
            };
            queue.enqueue(test_envelope(), reason, KernelId::new(&format!("k{}", i)), 1);
        }

        // Five arrivals against a bound of three: two evictions.
        assert_eq!(queue.len(), 3);
        let snapshot = queue.metrics();
        assert_eq!(snapshot.total_received, 5);
        assert_eq!(snapshot.total_expired, 2);
    }

    #[test]
    fn test_dlq_replay_with_filter() {
        let mut queue = quiet_queue();
        queue.enqueue(
            test_envelope(),
            DeadLetterReason::MaxRetriesExceeded { retries: 3, max: 3 },
            KernelId::new("a"),
            3,
        );
        queue.enqueue(test_envelope(), queue_full(), KernelId::new("a"), 1);

        let taken = queue.replay(|l| matches!(l.reason, DeadLetterReason::QueueFull { .. }));

        assert_eq!(taken.len(), 1);
        assert_eq!(queue.len(), 1);
    }

    #[test]
    fn test_dlq_metrics() {
        let mut queue = quiet_queue();
        queue.enqueue(test_envelope(), rejected(), KernelId::new("a"), 1);
        queue.enqueue(test_envelope(), rejected(), KernelId::new("b"), 1);

        queue.replay_for(&KernelId::new("a"));

        let snapshot = queue.metrics();
        assert_eq!(snapshot.total_received, 2);
        assert_eq!(snapshot.total_replayed, 1);
        assert_eq!(snapshot.depth, 1);
    }

    #[test]
    fn test_dead_letter_reason_display() {
        let retries = DeadLetterReason::MaxRetriesExceeded { retries: 3, max: 3 };
        assert!(retries.to_string().contains("3/3"));

        let missing = DeadLetterReason::ActorNotFound {
            actor_name: "test".into(),
        };
        assert!(missing.to_string().contains("test"));
    }
}