// azoth/dlq_replayer.rs
1//! Automatic Dead Letter Queue Replay
2//!
3//! Provides automatic retry of failed events with configurable backoff strategies
4//! and replay priorities.
5//!
6//! # Example
7//!
8//! ```no_run
9//! use azoth::prelude::*;
10//! use azoth::dlq_replayer::{DlqReplayer, DlqReplayConfig, BackoffStrategy, ReplayPriority};
11//! use std::time::Duration;
12//! use std::sync::Arc;
13//!
14//! # async fn example() -> Result<()> {
15//! let db = Arc::new(AzothDb::open("./data")?);
16//! let conn = Arc::new(
17//!     rusqlite::Connection::open("./data/projection.db")
18//!         .map_err(|e| AzothError::Projection(e.to_string()))?
19//! );
20//! let dlq = Arc::new(DeadLetterQueue::new(conn.clone())?);
21//! let registry = Arc::new(EventHandlerRegistry::new());
22//!
23//! let config = DlqReplayConfig {
24//!     enabled: true,
25//!     check_interval: Duration::from_secs(60),
26//!     max_retries: 5,
27//!     backoff: BackoffStrategy::Exponential {
28//!         initial: Duration::from_secs(10),
29//!         max: Duration::from_secs(3600),
30//!     },
31//!     min_age: Duration::from_secs(5),
32//!     batch_size: 100,
33//!     priority: ReplayPriority::ByRetryCount,
34//!     stop_on_consecutive_failures: Some(10),
35//! };
36//!
37//! let replayer = Arc::new(DlqReplayer::new(dlq, registry, config));
38//!
39//! // Run replayer (in a real application, run this in a dedicated thread)
40//! // Note: replayer.run() must be called from a thread that owns the Connection
41//! // since rusqlite::Connection is not Send
42//! # Ok(())
43//! # }
44//! ```
45
46use crate::{AzothError, DeadLetterQueue, EventHandlerRegistry, FailedEvent, Result};
47use rusqlite::params;
48use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
49use std::sync::Arc;
50use std::time::{Duration, SystemTime};
51
52/// Automatic DLQ replay configuration
53#[derive(Clone, Debug)]
54pub struct DlqReplayConfig {
55    /// Enable automatic replay
56    pub enabled: bool,
57
58    /// Check interval - how often to poll for failed events
59    pub check_interval: Duration,
60
61    /// Max retry attempts before permanent failure
62    pub max_retries: usize,
63
64    /// Backoff strategy
65    pub backoff: BackoffStrategy,
66
67    /// Age threshold - only retry events older than this
68    /// This prevents immediate retry of events that just failed
69    pub min_age: Duration,
70
71    /// Max events to replay per batch
72    pub batch_size: usize,
73
74    /// Replay priority (FIFO, LIFO, by error type)
75    pub priority: ReplayPriority,
76
77    /// Stop conditions - stop after N consecutive failures
78    pub stop_on_consecutive_failures: Option<usize>,
79}
80
81impl Default for DlqReplayConfig {
82    fn default() -> Self {
83        Self {
84            enabled: true,
85            check_interval: Duration::from_secs(60),
86            max_retries: 5,
87            backoff: BackoffStrategy::Exponential {
88                initial: Duration::from_secs(10),
89                max: Duration::from_secs(3600),
90            },
91            min_age: Duration::from_secs(5),
92            batch_size: 100,
93            priority: ReplayPriority::ByRetryCount,
94            stop_on_consecutive_failures: Some(10),
95        }
96    }
97}
98
/// Backoff strategy for retry delays.
#[derive(Clone, Debug)]
pub enum BackoffStrategy {
    /// Fixed delay between retries.
    Fixed(Duration),

    /// Exponential: `initial * 2^attempt`, capped at `max`.
    Exponential { initial: Duration, max: Duration },

    /// Fibonacci: `initial * fib(attempt)`, capped at `max`.
    Fibonacci { initial: Duration, max: Duration },
}

impl BackoffStrategy {
    /// Calculate the delay for a given retry attempt (0-based).
    ///
    /// All arithmetic saturates: very large `attempt` values yield the
    /// configured `max` (or the fixed delay) rather than wrapping.
    pub fn calculate(&self, attempt: usize) -> Duration {
        match self {
            BackoffStrategy::Fixed(delay) => *delay,
            BackoffStrategy::Exponential { initial, max } => {
                let multiplier = 2u64.saturating_pow(attempt.min(u32::MAX as usize) as u32);
                // Clamp before narrowing: a plain `as u32` cast truncates
                // (e.g. 2^32 -> 0), which previously produced a ZERO delay
                // for attempt >= 32 instead of the capped maximum.
                let multiplier = u32::try_from(multiplier).unwrap_or(u32::MAX);
                initial.saturating_mul(multiplier).min(*max)
            }
            BackoffStrategy::Fibonacci { initial, max } => {
                let fib = Self::fibonacci(attempt);
                // Same clamp-don't-truncate rule: fib(n) exceeds u32::MAX
                // from n = 47 onward.
                let multiplier = u32::try_from(fib).unwrap_or(u32::MAX);
                initial.saturating_mul(multiplier).min(*max)
            }
        }
    }

    /// Iterative Fibonacci with fib(0) = fib(1) = 1, saturating at u64::MAX.
    fn fibonacci(n: usize) -> u64 {
        match n {
            0 | 1 => 1,
            _ => {
                let mut a = 1u64;
                let mut b = 1u64;
                for _ in 2..=n {
                    let c = a.saturating_add(b);
                    a = b;
                    b = c;
                }
                b
            }
        }
    }
}
147
/// Ordering applied to failed events when selecting them for replay.
#[derive(Clone, Debug)]
pub enum ReplayPriority {
    /// Oldest failures first (first in, first out).
    FIFO,

    /// Newest failures first (last in, first out).
    LIFO,

    /// Least-retried events first (gives newer failures priority),
    /// ties broken oldest-first.
    ByRetryCount,

    /// Events whose error message contains one of the given substrings
    /// first, in the order the substrings are listed.
    ByErrorType(Vec<String>),
}
163
164impl ReplayPriority {
165    /// Generate the ORDER BY clause for this priority.
166    ///
167    /// For `ByErrorType`, error type strings are validated against a strict
168    /// allowlist (`[a-zA-Z0-9_ -.]`) to prevent SQL injection via CASE/LIKE expressions.
169    fn order_by_clause(&self) -> Result<String> {
170        match self {
171            ReplayPriority::FIFO => Ok("failed_at ASC".to_string()),
172            ReplayPriority::LIFO => Ok("failed_at DESC".to_string()),
173            ReplayPriority::ByRetryCount => Ok("retry_count ASC, failed_at ASC".to_string()),
174            ReplayPriority::ByErrorType(types) => {
175                // Validate each error type to prevent SQL injection.
176                // Only alphanumeric, underscore, dash, dot, and space are allowed.
177                for t in types {
178                    if t.is_empty() || t.len() > 128 {
179                        return Err(AzothError::Config(format!(
180                            "ByErrorType string must be 1-128 characters, got length {}",
181                            t.len()
182                        )));
183                    }
184                    if !t.chars().all(|c| {
185                        c.is_alphanumeric() || c == '_' || c == '-' || c == '.' || c == ' '
186                    }) {
187                        return Err(AzothError::Config(format!(
188                            "ByErrorType string '{}' contains disallowed characters. \
189                             Only alphanumeric, underscore, dash, dot, and space are permitted.",
190                            t
191                        )));
192                    }
193                }
194
195                // Safe to interpolate after validation
196                let cases = types
197                    .iter()
198                    .enumerate()
199                    .map(|(i, t)| format!("WHEN error_message LIKE '%{}%' THEN {}", t, i))
200                    .collect::<Vec<_>>()
201                    .join(" ");
202                Ok(format!("CASE {} ELSE 999 END, failed_at ASC", cases))
203            }
204        }
205    }
206}
207
/// Counters for DLQ replay activity; all fields are atomics so the
/// struct can be shared and read from any thread without locking.
#[derive(Default)]
pub struct DlqMetrics {
    /// Total successful replays.
    pub successes: AtomicU64,

    /// Total failed replay attempts.
    pub failures: AtomicU64,

    /// Events moved to the permanent-failure table.
    pub permanent_failures: AtomicU64,

    /// Unix timestamp (seconds) of the most recent replay-cycle check.
    pub last_check: AtomicU64,
}

impl DlqMetrics {
    /// Record one successful replay. The retry count is currently unused
    /// but kept in the signature for future histogram-style metrics.
    fn record_success(&self, _retry_count: i32) {
        self.successes.fetch_add(1, Ordering::Relaxed);
    }

    /// Record one failed replay attempt.
    fn record_failure(&self, _retry_count: i32) {
        self.failures.fetch_add(1, Ordering::Relaxed);
    }

    /// Record one event moved to permanent failure.
    fn record_permanent_failure(&self) {
        self.permanent_failures.fetch_add(1, Ordering::Relaxed);
    }

    /// Stamp `last_check` with the current Unix time (seconds).
    fn update_last_check(&self) {
        // A system clock set before the Unix epoch would have panicked on
        // the previous `.unwrap()`; fall back to 0 instead.
        let now = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .map(|d| d.as_secs())
            .unwrap_or(0);
        self.last_check.store(now, Ordering::Relaxed);
    }

    /// Get a point-in-time snapshot of all counters.
    pub fn snapshot(&self) -> DlqMetricsSnapshot {
        DlqMetricsSnapshot {
            successes: self.successes.load(Ordering::Relaxed),
            failures: self.failures.load(Ordering::Relaxed),
            permanent_failures: self.permanent_failures.load(Ordering::Relaxed),
            last_check: self.last_check.load(Ordering::Relaxed),
        }
    }
}

/// Plain-data snapshot of [`DlqMetrics`] at a single point in time.
#[derive(Debug, Clone)]
pub struct DlqMetricsSnapshot {
    pub successes: u64,
    pub failures: u64,
    pub permanent_failures: u64,
    pub last_check: u64,
}
264
265/// Automatic DLQ replayer
266pub struct DlqReplayer {
267    dlq: Arc<DeadLetterQueue>,
268    registry: Arc<EventHandlerRegistry>,
269    config: DlqReplayConfig,
270    shutdown: Arc<AtomicBool>,
271    metrics: Arc<DlqMetrics>,
272}
273
impl DlqReplayer {
    /// Create a new DLQ replayer over the given queue and handler registry.
    ///
    /// Starts with a cleared shutdown flag and zeroed metrics; nothing runs
    /// until [`DlqReplayer::run`] is called.
    pub fn new(
        dlq: Arc<DeadLetterQueue>,
        registry: Arc<EventHandlerRegistry>,
        config: DlqReplayConfig,
    ) -> Self {
        Self {
            dlq,
            registry,
            config,
            shutdown: Arc::new(AtomicBool::new(false)),
            metrics: Arc::new(DlqMetrics::default()),
        }
    }

    /// Get the shared replay metrics (successes, failures, last check time).
    pub fn metrics(&self) -> &Arc<DlqMetrics> {
        &self.metrics
    }

    /// Signal shutdown. The loop observes the flag at the top of its next
    /// iteration, so shutdown may lag by up to `check_interval`.
    pub fn shutdown(&self) {
        self.shutdown.store(true, Ordering::Relaxed);
    }

    /// Check if replayer is enabled (mirrors `config.enabled`).
    pub fn is_enabled(&self) -> bool {
        self.config.enabled
    }

    /// Start automatic replay loop.
    ///
    /// Returns immediately when disabled. Otherwise runs one replay cycle
    /// per `check_interval` until [`DlqReplayer::shutdown`] is called;
    /// cycle errors are logged and do not stop the loop.
    pub async fn run(self: Arc<Self>) -> Result<()> {
        if !self.config.enabled {
            tracing::info!("DLQ replayer is disabled");
            return Ok(());
        }

        tracing::info!(
            "DLQ replayer started (check interval: {:?})",
            self.config.check_interval
        );

        while !self.shutdown.load(Ordering::Relaxed) {
            match self.run_replay_cycle().await {
                Ok(_) => {}
                Err(e) => {
                    // Keep looping on cycle errors; a transient DB problem
                    // should not kill the background replayer.
                    tracing::error!("DLQ replay cycle error: {}", e);
                }
            }

            // last_check is updated even for empty/failed cycles so
            // observers can see the loop is alive.
            self.metrics.update_last_check();
            tokio::time::sleep(self.config.check_interval).await;
        }

        tracing::info!("DLQ replayer shutdown");
        Ok(())
    }

    /// Run one replay cycle: fetch eligible events, attempt each one, and
    /// update DLQ state and metrics according to the outcome.
    async fn run_replay_cycle(&self) -> Result<()> {
        // Get eligible events (respecting min_age, max_retries)
        let failed_events = self.get_eligible_events()?;

        if failed_events.is_empty() {
            return Ok(());
        }

        tracing::debug!("Found {} eligible events for replay", failed_events.len());

        // Consecutive-failure counter for the circuit breaker; reset on
        // every success.
        let mut consecutive_failures = 0;

        for event in failed_events {
            // Calculate backoff delay for this event's current retry count.
            let delay = self.config.backoff.calculate(event.retry_count as usize);

            // Skip events whose backoff window has not elapsed yet; they
            // stay in the DLQ for a later cycle.
            if !self.should_retry_now(&event, delay)? {
                continue;
            }

            // Attempt replay
            tracing::debug!(
                "Replaying event {} (attempt {}/{})",
                event.event_id,
                event.retry_count + 1,
                self.config.max_retries
            );

            match self.replay_event(&event).await {
                Ok(_) => {
                    // Success! Remove from DLQ
                    self.dlq.remove(event.id)?;
                    self.metrics.record_success(event.retry_count);
                    consecutive_failures = 0;

                    tracing::info!(
                        "Successfully replayed event {} after {} retries",
                        event.event_id,
                        event.retry_count
                    );
                }
                Err(e) => {
                    // Failure - update retry count in the DLQ row (the DB
                    // increments it; the local `event` remains stale).
                    self.dlq.mark_retry(event.id)?;
                    self.metrics.record_failure(event.retry_count);
                    consecutive_failures += 1;

                    tracing::warn!(
                        "Failed to replay event {}: {} (retry {}/{})",
                        event.event_id,
                        e,
                        event.retry_count + 1,
                        self.config.max_retries
                    );

                    // Circuit breaker: stop this cycle if too many failures
                    // occurred back-to-back.
                    if let Some(max) = self.config.stop_on_consecutive_failures {
                        if consecutive_failures >= max {
                            tracing::warn!(
                                "Stopping DLQ replay after {} consecutive failures",
                                consecutive_failures
                            );
                            return Ok(());
                        }
                    }

                    // Check if event exceeded max retries (retry_count + 1
                    // accounts for the attempt just recorded by mark_retry).
                    if event.retry_count + 1 >= self.config.max_retries as i32 {
                        tracing::error!(
                            "Event {} exceeded max retries ({}), marking as permanently failed",
                            event.event_id,
                            self.config.max_retries
                        );
                        self.move_to_permanent_failure(&event)?;
                    }
                }
            }
        }

        Ok(())
    }

    /// Select up to `batch_size` DLQ rows eligible for replay: under the
    /// retry limit and at least `min_age` past their last attempt, ordered
    /// per the configured priority.
    ///
    /// `min_age` and the ORDER BY clause are interpolated into the SQL;
    /// the clause is safe because `order_by_clause()` validates
    /// `ByErrorType` strings against a strict allowlist.
    fn get_eligible_events(&self) -> Result<Vec<FailedEvent>> {
        let order_by = self.config.priority.order_by_clause()?;
        let min_age_secs = self.config.min_age.as_secs();

        let query = format!(
            "SELECT id, event_id, event_bytes, error_message, failed_at, retry_count
             FROM dead_letter_queue
             WHERE retry_count < ?
             AND datetime(COALESCE(last_retry_at, failed_at)) <= datetime('now', '-{} seconds')
             ORDER BY {}
             LIMIT ?",
            min_age_secs, order_by
        );

        // Get connection from DLQ
        let conn = self.dlq.connection();
        let mut stmt = conn
            .prepare(&query)
            .map_err(|e: rusqlite::Error| AzothError::Projection(e.to_string()))?;

        let events = stmt
            .query_map(
                params![self.config.max_retries as i32, self.config.batch_size],
                |row: &rusqlite::Row| {
                    Ok(FailedEvent {
                        id: row.get(0)?,
                        event_id: row.get(1)?,
                        event_bytes: row.get(2)?,
                        error_message: row.get(3)?,
                        failed_at: row.get(4)?,
                        retry_count: row.get(5)?,
                    })
                },
            )
            .map_err(|e: rusqlite::Error| AzothError::Projection(e.to_string()))?;

        events
            .collect::<std::result::Result<Vec<_>, _>>()
            .map_err(|e: rusqlite::Error| AzothError::Projection(e.to_string()))
    }

    /// Decide whether the backoff `delay` has elapsed since the event's
    /// last attempt (last_retry_at, falling back to failed_at for a first
    /// retry). Timestamps are parsed as RFC 3339 first, then SQLite's
    /// `YYYY-MM-DD HH:MM:SS` format (assumed UTC — NOTE(review): confirm
    /// the DLQ writes UTC wall-clock times).
    fn should_retry_now(&self, event: &FailedEvent, delay: Duration) -> Result<bool> {
        // If this is the first retry, check failed_at
        // Otherwise, check last_retry_at
        let conn = self.dlq.connection();

        let last_attempt: String = conn
            .query_row(
                "SELECT COALESCE(last_retry_at, failed_at) FROM dead_letter_queue WHERE id = ?",
                [event.id],
                |row| row.get(0),
            )
            .map_err(|e| AzothError::Projection(e.to_string()))?;

        // Parse timestamp
        use chrono::{DateTime, Utc};
        let last_time = DateTime::parse_from_rfc3339(&last_attempt)
            .or_else(|_| {
                // Try SQLite datetime format
                chrono::NaiveDateTime::parse_from_str(&last_attempt, "%Y-%m-%d %H:%M:%S")
                    .map(|dt| DateTime::<Utc>::from_naive_utc_and_offset(dt, Utc).into())
            })
            .map_err(|e| {
                AzothError::Projection(format!("Failed to parse timestamp {}: {}", last_attempt, e))
            })?;

        // Negative elapsed (clock skew / future timestamp) clamps to zero,
        // which simply defers the retry.
        let elapsed = Utc::now().signed_duration_since(last_time);
        let elapsed_duration = Duration::from_secs(elapsed.num_seconds().max(0) as u64);

        Ok(elapsed_duration >= delay)
    }

    /// Re-run one failed event through the handler registry using the
    /// DLQ's projection connection.
    async fn replay_event(&self, event: &FailedEvent) -> Result<()> {
        // Get connection from DLQ (it uses the projection connection)
        let conn = self.dlq.connection();

        // Process event through registry
        self.registry
            .process(conn.as_ref(), event.event_id, &event.event_bytes)
    }

    /// Copy an exhausted event into the `permanent_failures` table
    /// (created on demand), remove it from the DLQ, and bump the metric.
    ///
    /// NOTE(review): the INSERT and the DLQ delete are separate statements,
    /// not a transaction — a crash in between could leave the row in both
    /// tables. Confirm whether that duplication is acceptable.
    fn move_to_permanent_failure(&self, event: &FailedEvent) -> Result<()> {
        let conn = self.dlq.connection();

        // Create permanent failures table if it doesn't exist
        conn.execute(
            "CREATE TABLE IF NOT EXISTS permanent_failures (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                event_id INTEGER NOT NULL,
                event_bytes BLOB NOT NULL,
                error_message TEXT NOT NULL,
                failed_at TEXT NOT NULL,
                retry_count INTEGER NOT NULL,
                marked_permanent_at TEXT NOT NULL DEFAULT (datetime('now'))
            )",
            [],
        )
        .map_err(|e| AzothError::Projection(e.to_string()))?;

        // Move to permanent failures
        conn.execute(
            "INSERT INTO permanent_failures (event_id, event_bytes, error_message, failed_at, retry_count)
             SELECT event_id, event_bytes, error_message, failed_at, retry_count
             FROM dead_letter_queue
             WHERE id = ?",
            [event.id],
        )
        .map_err(|e| AzothError::Projection(e.to_string()))?;

        // Remove from DLQ
        self.dlq.remove(event.id)?;

        self.metrics.record_permanent_failure();

        Ok(())
    }

    /// Get count of permanent failures; returns 0 when the
    /// `permanent_failures` table has never been created.
    pub fn permanent_failure_count(&self) -> Result<usize> {
        let conn = self.dlq.connection();

        // Check if table exists (it is only created lazily on the first
        // permanent failure).
        let exists: bool = conn
            .query_row(
                "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='permanent_failures'",
                [],
                |row| row.get(0),
            )
            .unwrap_or(false);

        if !exists {
            return Ok(0);
        }

        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM permanent_failures", [], |row| {
                row.get(0)
            })
            .map_err(|e| AzothError::Projection(e.to_string()))?;

        Ok(count as usize)
    }

    /// Clear permanent failures. Errors if the table does not exist yet.
    pub fn clear_permanent_failures(&self) -> Result<()> {
        let conn = self.dlq.connection();
        conn.execute("DELETE FROM permanent_failures", [])
            .map_err(|e| AzothError::Projection(e.to_string()))?;
        Ok(())
    }
}
567
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_backoff_fixed() {
        // A fixed strategy ignores the attempt number entirely.
        let backoff = BackoffStrategy::Fixed(Duration::from_secs(10));
        for attempt in [0usize, 5, 100] {
            assert_eq!(backoff.calculate(attempt), Duration::from_secs(10));
        }
    }

    #[test]
    fn test_backoff_exponential() {
        let backoff = BackoffStrategy::Exponential {
            initial: Duration::from_secs(1),
            max: Duration::from_secs(60),
        };

        // (attempt, expected seconds): initial * 2^attempt, capped at 60.
        let table = [(0usize, 1u64), (1, 2), (2, 4), (3, 8), (10, 60)];
        for (attempt, secs) in table {
            assert_eq!(backoff.calculate(attempt), Duration::from_secs(secs));
        }
    }

    #[test]
    fn test_backoff_fibonacci() {
        let backoff = BackoffStrategy::Fibonacci {
            initial: Duration::from_secs(1),
            max: Duration::from_secs(100),
        };

        // (attempt, expected seconds): fib(0)=fib(1)=1, then 2, 3, 5, 8.
        let table = [(0usize, 1u64), (1, 1), (2, 2), (3, 3), (4, 5), (5, 8)];
        for (attempt, secs) in table {
            assert_eq!(backoff.calculate(attempt), Duration::from_secs(secs));
        }
    }

    #[test]
    fn test_replay_priority_order_by() {
        assert_eq!(
            ReplayPriority::FIFO.order_by_clause().unwrap(),
            "failed_at ASC"
        );
        assert_eq!(
            ReplayPriority::LIFO.order_by_clause().unwrap(),
            "failed_at DESC"
        );
        assert_eq!(
            ReplayPriority::ByRetryCount.order_by_clause().unwrap(),
            "retry_count ASC, failed_at ASC"
        );
    }

    #[test]
    fn test_replay_priority_by_error_type_validation() {
        // Well-formed error types produce a clause.
        let ok = ReplayPriority::ByErrorType(vec![
            "timeout".to_string(),
            "connection_error".to_string(),
        ]);
        assert!(ok.order_by_clause().is_ok());

        // An injection attempt is rejected by the allowlist.
        let inject =
            ReplayPriority::ByErrorType(vec!["'; DROP TABLE dead_letter_queue; --".to_string()]);
        assert!(inject.order_by_clause().is_err());

        // The empty string is rejected outright.
        let empty = ReplayPriority::ByErrorType(vec![String::new()]);
        assert!(empty.order_by_clause().is_err());
    }

    #[test]
    fn test_default_config() {
        let config = DlqReplayConfig::default();
        assert!(config.enabled);
        assert_eq!(config.max_retries, 5);
        assert_eq!(config.batch_size, 100);
    }
}