#![cfg(native)]
use super::error::SignalError;
use super::signal::Signal;
use parking_lot::Mutex;
use std::collections::VecDeque;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};
/// Policy that decides how long a failed message waits before its next retry.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum RetryStrategy {
    /// No waiting time: the computed delay is always zero.
    Immediate,
    /// The same `delay` is used for every attempt.
    FixedDelay {
        delay: Duration,
    },
    /// The delay starts at `initial_delay`, doubles with every attempt and never
    /// exceeds `max_delay`.
    ExponentialBackoff {
        initial_delay: Duration,
        max_delay: Duration,
    },
    /// The delay grows by `base_delay` with every attempt
    /// (`base_delay * (attempt + 1)`).
    LinearBackoff {
        base_delay: Duration,
    },
}

impl RetryStrategy {
    /// Returns the waiting time for the given zero-based `attempt`.
    ///
    /// All arithmetic is done with exact, checked integer `Duration` operations:
    /// the float-based `mul_f32`/`mul_f64` helpers round (100 ms * 1 became
    /// 100.000001 ms) and panic on overflow. This function never panics:
    ///
    /// * `ExponentialBackoff` falls back to `max_delay` when the doubled value
    ///   would overflow.
    /// * `LinearBackoff` saturates at `Duration::MAX`.
    fn calculate_delay(&self, attempt: u32) -> Duration {
        match *self {
            RetryStrategy::Immediate => Duration::ZERO,
            RetryStrategy::FixedDelay { delay } => delay,
            RetryStrategy::ExponentialBackoff {
                initial_delay,
                max_delay,
            } => {
                // The exponent is clamped to 30 so that 2^attempt always fits in a u32.
                let factor = 1u32 << attempt.min(30);
                initial_delay
                    .checked_mul(factor)
                    .map_or(max_delay, |scaled| scaled.min(max_delay))
            }
            RetryStrategy::LinearBackoff { base_delay } => base_delay
                // base * attempt + base == base * (attempt + 1), without ever
                // needing `attempt + 1` to fit in a u32.
                .checked_mul(attempt)
                .and_then(|scaled| scaled.checked_add(base_delay))
                .unwrap_or(Duration::MAX),
        }
    }
}
/// Settings for a dead letter queue: retry budget, retry timing, queue
/// capacity and a persistence flag.
///
/// Built fluently: start from [`DlqConfig::new`] and chain the `with_*` methods.
#[derive(Debug, Clone)]
pub struct DlqConfig {
    /// Retry budget that a message's retry count is compared against.
    max_retries: u32,
    /// Strategy used to compute the waiting time before each retry.
    retry_strategy: RetryStrategy,
    /// Queue length at which the oldest entry is evicted before a new one is added.
    max_queue_size: usize,
    /// Persistence flag; presumably tells a consumer to persist failed
    /// messages — TODO confirm where this is read.
    persist_failed: bool,
}

impl DlqConfig {
    /// Returns the default configuration (see the `Default` impl for the values).
    pub fn new() -> Self {
        Self::default()
    }

    /// Replaces the retry budget.
    pub fn with_max_retries(self, max: u32) -> Self {
        Self {
            max_retries: max,
            ..self
        }
    }

    /// Replaces the retry strategy.
    pub fn with_retry_strategy(self, strategy: RetryStrategy) -> Self {
        Self {
            retry_strategy: strategy,
            ..self
        }
    }

    /// Replaces the queue capacity.
    pub fn with_max_queue_size(self, size: usize) -> Self {
        Self {
            max_queue_size: size,
            ..self
        }
    }

    /// Replaces the persistence flag.
    pub fn with_persist_failed(self, persist: bool) -> Self {
        Self {
            persist_failed: persist,
            ..self
        }
    }

    /// Configured retry budget.
    pub fn max_retries(&self) -> u32 {
        self.max_retries
    }

    /// Configured retry strategy (returned by value; the type is `Copy`).
    pub fn retry_strategy(&self) -> RetryStrategy {
        self.retry_strategy
    }

    /// Configured queue capacity.
    pub fn max_queue_size(&self) -> usize {
        self.max_queue_size
    }

    /// Configured persistence flag.
    pub fn persist_failed(&self) -> bool {
        self.persist_failed
    }
}

impl Default for DlqConfig {
    /// Defaults: retry budget of 3, exponential backoff from 100 ms capped at
    /// 60 s, capacity of 10000 messages, persistence flag off.
    fn default() -> Self {
        let retry_strategy = RetryStrategy::ExponentialBackoff {
            initial_delay: Duration::from_millis(100),
            max_delay: Duration::from_secs(60),
        };
        Self {
            max_retries: 3,
            retry_strategy,
            max_queue_size: 10000,
            persist_failed: false,
        }
    }
}
/// A payload waiting in the dead letter queue together with its retry bookkeeping.
#[derive(Debug, Clone)]
pub struct DlqMessage<T> {
    /// The value that failed to be delivered.
    pub payload: T,
    /// Number of times `mark_retry` has been applied to this message.
    pub retry_count: u32,
    /// Wall-clock time at which the message was created.
    pub first_queued_at: SystemTime,
    /// Wall-clock time of creation or of the most recent `mark_retry`.
    pub last_attempt_at: SystemTime,
    /// Text of the most recently recorded error.
    pub last_error: String,
    /// Monotonic instant before which the message should not be retried.
    pub next_retry_at: Instant,
}

impl<T> DlqMessage<T> {
    /// Creates a message with a retry count of zero; both timestamps are set to
    /// the same "now" reading.
    fn new(payload: T, error: String, next_retry_at: Instant) -> Self {
        let created_at = SystemTime::now();
        DlqMessage {
            payload,
            retry_count: 0,
            first_queued_at: created_at,
            last_attempt_at: created_at,
            last_error: error,
            next_retry_at,
        }
    }

    /// Records another failed attempt: bumps the counter, stores the new error
    /// and schedule, and refreshes the last-attempt timestamp.
    fn mark_retry(&mut self, error: String, next_retry_at: Instant) {
        self.retry_count += 1;
        self.next_retry_at = next_retry_at;
        self.last_error = error;
        self.last_attempt_at = SystemTime::now();
    }
}
/// Counters describing the activity of a dead letter queue.
#[derive(Debug, Clone, Default)]
pub struct DlqStats {
    /// Queue length as of the last time it was recorded.
    queue_size: usize,
    /// Messages that were given up on.
    total_failed: u64,
    /// Messages that were eventually delivered by a retry.
    total_recovered: u64,
    /// Retry attempts that were started.
    total_retries: u64,
}

impl DlqStats {
    /// Creates a statistics record with every counter at zero.
    pub fn new() -> Self {
        Default::default()
    }

    /// Last recorded queue length.
    pub fn queue_size(&self) -> usize {
        self.queue_size
    }

    /// Number of messages that were given up on.
    pub fn total_failed(&self) -> u64 {
        self.total_failed
    }

    /// Number of messages delivered by a retry.
    pub fn total_recovered(&self) -> u64 {
        self.total_recovered
    }

    /// Number of retry attempts started.
    pub fn total_retries(&self) -> u64 {
        self.total_retries
    }

    /// Percentage (0.0 to 100.0) of finished messages that were recovered.
    ///
    /// Finished means recovered plus failed; with no finished messages the rate
    /// is reported as `100.0`.
    pub fn recovery_rate(&self) -> f64 {
        let recovered = self.total_recovered;
        match recovered + self.total_failed {
            0 => 100.0,
            finished => (recovered as f64 / finished as f64) * 100.0,
        }
    }
}
pub struct DeadLetterQueue<T: Send + Sync + 'static> {
signal: Signal<T>,
config: DlqConfig,
queue: Arc<Mutex<VecDeque<DlqMessage<T>>>>,
stats: Arc<Mutex<DlqStats>>,
}
impl<T: Send + Sync + Clone + 'static> DeadLetterQueue<T> {
pub fn new(signal: Signal<T>, config: DlqConfig) -> Self {
let dlq = Self {
signal,
config,
queue: Arc::new(Mutex::new(VecDeque::new())),
stats: Arc::new(Mutex::new(DlqStats::new())),
};
dlq.start_retry_processor();
dlq
}
pub async fn send(&self, instance: T) -> Result<(), SignalError> {
let result = self.signal.send(instance.clone()).await;
if let Err(e) = result {
self.enqueue(instance, e.message.clone());
Ok(()) } else {
Ok(())
}
}
fn enqueue(&self, payload: T, error: String) {
let queue_len = {
let mut queue = self.queue.lock();
if queue.len() >= self.config.max_queue_size {
queue.pop_front();
}
let delay = self.config.retry_strategy.calculate_delay(0);
let next_retry_at = Instant::now() + delay;
let message = DlqMessage::new(payload, error, next_retry_at);
queue.push_back(message);
queue.len()
};
self.stats.lock().queue_size = queue_len;
}
pub fn stats(&self) -> DlqStats {
self.stats.lock().clone()
}
pub fn get_messages(&self) -> Vec<DlqMessage<T>> {
self.queue.lock().iter().cloned().collect()
}
pub fn clear(&self) {
self.queue.lock().clear();
self.stats.lock().queue_size = 0;
}
fn start_retry_processor(&self) {
let queue = Arc::clone(&self.queue);
let signal = self.signal.clone();
let config = self.config.clone();
let stats = Arc::clone(&self.stats);
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_millis(100));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
interval.tick().await;
let now = Instant::now();
let mut message_to_retry = None;
{
let mut queue_guard = queue.lock();
if let Some(msg) = queue_guard.front()
&& msg.next_retry_at <= now
{
message_to_retry = queue_guard.pop_front();
}
}
if let Some(mut msg) = message_to_retry {
stats.lock().total_retries += 1;
let result = signal.send(msg.payload.clone()).await;
match result {
Ok(_) => {
let queue_len = queue.lock().len();
let mut stats_guard = stats.lock();
stats_guard.total_recovered += 1;
stats_guard.queue_size = queue_len;
}
Err(e) => {
if msg.retry_count >= config.max_retries {
let queue_len = queue.lock().len();
let mut stats_guard = stats.lock();
stats_guard.total_failed += 1;
stats_guard.queue_size = queue_len;
} else {
let delay =
config.retry_strategy.calculate_delay(msg.retry_count + 1);
let next_retry_at = Instant::now() + delay;
msg.mark_retry(e.message, next_retry_at);
queue.lock().push_back(msg);
}
}
}
}
}
});
}
pub fn signal(&self) -> &Signal<T> {
&self.signal
}
}
impl<T: Send + Sync + Clone + 'static> Clone for DeadLetterQueue<T> {
    /// Produces another handle to the same queue.
    ///
    /// The signal and the configuration are cloned; the message queue and the
    /// statistics are shared through their `Arc`s, so all handles observe the
    /// same state. No additional retry task is spawned.
    fn clone(&self) -> Self {
        let Self {
            signal,
            config,
            queue,
            stats,
        } = self;
        Self {
            signal: signal.clone(),
            config: config.clone(),
            queue: Arc::clone(queue),
            stats: Arc::clone(stats),
        }
    }
}
// Unit tests for the retry strategies, the configuration builder and the
// dead letter queue itself. The async tests drive a real `Signal` whose
// handler outcome is controlled by the test.
#[cfg(test)]
mod tests {
use super::*;
use crate::signals::SignalName;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
// Polls `condition` until it resolves to `true` or `timeout` has elapsed,
// sleeping `interval` between checks. Returns `Ok(())` as soon as the condition
// holds and an `Err` carrying a timeout message otherwise.
async fn poll_until<F, Fut>(
timeout: std::time::Duration,
interval: std::time::Duration,
mut condition: F,
) -> Result<(), String>
where
F: FnMut() -> Fut,
Fut: std::future::Future<Output = bool>,
{
let start = std::time::Instant::now();
while start.elapsed() < timeout {
if condition().await {
return Ok(());
}
tokio::time::sleep(interval).await;
}
Err(format!("Timeout after {:?} waiting for condition", timeout))
}
// Payload type used by the queue tests.
#[derive(Debug, Clone, PartialEq)]
struct TestEvent {
id: i32,
message: String,
}
// Every builder method must be reflected by the matching getter.
#[test]
fn test_dlq_config() {
let config = DlqConfig::new()
.with_max_retries(5)
.with_retry_strategy(RetryStrategy::FixedDelay {
delay: Duration::from_secs(1),
})
.with_max_queue_size(500)
.with_persist_failed(true);
assert_eq!(config.max_retries(), 5);
assert_eq!(
config.retry_strategy(),
RetryStrategy::FixedDelay {
delay: Duration::from_secs(1)
}
);
assert_eq!(config.max_queue_size(), 500);
assert!(config.persist_failed());
}
// Checks the delays produced by the immediate, fixed and exponential
// strategies, including the cap applied by `max_delay`.
#[test]
fn test_retry_strategy_delays() {
let immediate = RetryStrategy::Immediate;
assert_eq!(immediate.calculate_delay(0), Duration::from_millis(0));
assert_eq!(immediate.calculate_delay(5), Duration::from_millis(0));
let fixed = RetryStrategy::FixedDelay {
delay: Duration::from_secs(1),
};
assert_eq!(fixed.calculate_delay(0), Duration::from_secs(1));
assert_eq!(fixed.calculate_delay(10), Duration::from_secs(1));
let exponential = RetryStrategy::ExponentialBackoff {
initial_delay: Duration::from_millis(100),
max_delay: Duration::from_secs(10),
};
assert_eq!(exponential.calculate_delay(0), Duration::from_millis(100));
assert_eq!(exponential.calculate_delay(1), Duration::from_millis(200));
assert_eq!(exponential.calculate_delay(2), Duration::from_millis(400));
assert!(exponential.calculate_delay(10) <= Duration::from_secs(10));
}
// A message whose first delivery fails is retried by the background task and
// counted as recovered once the handler starts succeeding.
#[tokio::test]
async fn test_dlq_basic() {
let signal = Signal::<TestEvent>::new(SignalName::custom("test_dlq"));
let config =
DlqConfig::new()
.with_max_retries(2)
.with_retry_strategy(RetryStrategy::FixedDelay {
delay: Duration::from_millis(100),
});
let dlq = DeadLetterQueue::new(signal.clone(), config);
let should_fail = Arc::new(AtomicBool::new(true));
let attempt_count = Arc::new(AtomicUsize::new(0));
let fail_clone = Arc::clone(&should_fail);
let count_clone = Arc::clone(&attempt_count);
// The handler counts every invocation and fails while `should_fail` is set.
signal.connect(move |_event| {
let fail = Arc::clone(&fail_clone);
let count = Arc::clone(&count_clone);
async move {
count.fetch_add(1, Ordering::SeqCst);
if fail.load(Ordering::SeqCst) {
Err(SignalError::new("Test failure"))
} else {
Ok(())
}
}
});
dlq.send(TestEvent {
id: 1,
message: "test".to_string(),
})
.await
.unwrap();
poll_until(
Duration::from_millis(100),
Duration::from_millis(10),
|| async { attempt_count.load(Ordering::SeqCst) >= 1 },
)
.await
.expect("Initial attempt should complete");
// Let the next handler invocation (the retry) succeed.
should_fail.store(false, Ordering::SeqCst);
poll_until(
Duration::from_millis(300),
Duration::from_millis(20),
|| async { attempt_count.load(Ordering::SeqCst) >= 2 },
)
.await
.expect("Retry should complete within 300ms");
// NOTE(review): the handler bumps `attempt_count` before the retry task
// records the recovery, so this assertion appears to rely on task scheduling;
// polling on the stats (as test_dlq_max_retries does) may be more robust — confirm.
let stats = dlq.stats();
assert_eq!(stats.total_recovered(), 1);
}
// A handler that always fails must eventually lead to exactly one message
// counted as failed and none counted as recovered.
#[tokio::test]
async fn test_dlq_max_retries() {
let signal = Signal::<TestEvent>::new(SignalName::custom("test_max_retries"));
let config = DlqConfig::new()
.with_max_retries(2)
.with_retry_strategy(RetryStrategy::Immediate);
let dlq = DeadLetterQueue::new(signal.clone(), config);
let attempt_count = Arc::new(AtomicUsize::new(0));
let count_clone = Arc::clone(&attempt_count);
signal.connect(move |_event| {
let count = Arc::clone(&count_clone);
async move {
count.fetch_add(1, Ordering::SeqCst);
Err(SignalError::new("Always fails"))
}
});
dlq.send(TestEvent {
id: 1,
message: "test".to_string(),
})
.await
.unwrap();
// Wait for at least three handler invocations (initial send plus retries).
poll_until(
Duration::from_secs(1),
Duration::from_millis(50),
|| async { attempt_count.load(Ordering::SeqCst) >= 3 },
)
.await
.expect("All retries should complete within 1000ms");
// The failure counter is updated by the background task, so poll for it too.
poll_until(
Duration::from_secs(1),
Duration::from_millis(50),
|| async {
let stats = dlq.stats();
stats.total_failed() == 1
},
)
.await
.expect("Stats should be updated within 1000ms");
let stats = dlq.stats();
assert_eq!(stats.total_failed(), 1);
assert_eq!(stats.total_recovered(), 0);
}
// Sending more failing messages than `max_queue_size` must not let the
// recorded queue size exceed the limit.
#[tokio::test]
async fn test_dlq_queue_size_limit() {
let signal = Signal::<TestEvent>::new(SignalName::custom("test_queue_limit"));
// The 100 s fixed delay keeps queued messages from becoming due during the test.
let config = DlqConfig::new()
.with_max_queue_size(3)
.with_max_retries(10) .with_retry_strategy(RetryStrategy::FixedDelay {
delay: Duration::from_secs(100), });
let dlq = DeadLetterQueue::new(signal.clone(), config);
signal.connect(|_event| async move { Err(SignalError::new("Always fails")) });
for i in 0..5 {
dlq.send(TestEvent {
id: i,
message: "test".to_string(),
})
.await
.unwrap();
}
let stats = dlq.stats();
assert!(stats.queue_size() <= 3);
}
// `clear` must reset the recorded queue size to zero after messages were queued.
#[tokio::test]
async fn test_dlq_clear() {
let signal = Signal::<TestEvent>::new(SignalName::custom("test_clear"));
// The 100 s fixed delay keeps queued messages from becoming due during the test.
let config = DlqConfig::new().with_retry_strategy(RetryStrategy::FixedDelay {
delay: Duration::from_secs(100),
});
let dlq = DeadLetterQueue::new(signal.clone(), config);
signal.connect(|_event| async move { Err(SignalError::new("Fail")) });
for i in 0..3 {
dlq.send(TestEvent {
id: i,
message: "test".to_string(),
})
.await
.unwrap();
}
assert!(dlq.stats().queue_size() > 0);
dlq.clear();
assert_eq!(dlq.stats().queue_size(), 0);
}
}