1use crate::{AzothError, DeadLetterQueue, EventHandlerRegistry, FailedEvent, Result};
47use rusqlite::params;
48use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
49use std::sync::Arc;
50use std::time::{Duration, SystemTime};
51
/// Configuration for the background dead-letter-queue replayer.
#[derive(Clone, Debug)]
pub struct DlqReplayConfig {
    /// Whether the replayer runs at all; when `false`, `run` logs and returns
    /// immediately.
    pub enabled: bool,

    /// How long the replay loop sleeps between cycles.
    pub check_interval: Duration,

    /// Maximum number of replay attempts before an event is moved to the
    /// `permanent_failures` table.
    pub max_retries: usize,

    /// Delay schedule between successive attempts for the same event.
    pub backoff: BackoffStrategy,

    /// Minimum time an event must have sat since its last retry (or initial
    /// failure) before it becomes eligible for replay.
    pub min_age: Duration,

    /// Maximum number of events fetched from the DLQ per replay cycle.
    pub batch_size: usize,

    /// Order in which eligible events are selected for replay.
    pub priority: ReplayPriority,

    /// If set, a replay cycle aborts early after this many consecutive
    /// failures (e.g. when a downstream dependency is down).
    pub stop_on_consecutive_failures: Option<usize>,
}
80
81impl Default for DlqReplayConfig {
82 fn default() -> Self {
83 Self {
84 enabled: true,
85 check_interval: Duration::from_secs(60),
86 max_retries: 5,
87 backoff: BackoffStrategy::Exponential {
88 initial: Duration::from_secs(10),
89 max: Duration::from_secs(3600),
90 },
91 min_age: Duration::from_secs(5),
92 batch_size: 100,
93 priority: ReplayPriority::ByRetryCount,
94 stop_on_consecutive_failures: Some(10),
95 }
96 }
97}
98
/// Strategy for computing the delay before the next retry attempt.
#[derive(Clone, Debug)]
pub enum BackoffStrategy {
    /// Same delay for every attempt.
    Fixed(Duration),

    /// `initial * 2^attempt`, capped at `max`.
    Exponential { initial: Duration, max: Duration },

    /// `initial * fib(attempt)` (fib(0) = fib(1) = 1), capped at `max`.
    Fibonacci { initial: Duration, max: Duration },
}

impl BackoffStrategy {
    /// Returns the delay to wait before retry number `attempt` (0-based).
    ///
    /// All arithmetic saturates/clamps rather than overflowing, so large
    /// attempt counts always yield the strategy's `max` (or the fixed delay).
    pub fn calculate(&self, attempt: usize) -> Duration {
        match self {
            BackoffStrategy::Fixed(delay) => *delay,
            BackoffStrategy::Exponential { initial, max } => {
                // Cap the exponent at 63 so 2^e fits in a u64 and an absurd
                // attempt count can't wrap through the `as u32` exponent cast.
                let multiplier = 2u64.saturating_pow(attempt.min(63) as u32);
                // Clamp instead of `as u32`: a plain cast truncates once the
                // multiplier exceeds u32::MAX (attempt >= 32 gave 2^32 as u32
                // == 0), which produced a ZERO delay instead of the maximum.
                let multiplier = u32::try_from(multiplier).unwrap_or(u32::MAX);
                initial.saturating_mul(multiplier).min(*max)
            }
            BackoffStrategy::Fibonacci { initial, max } => {
                let fib = Self::fibonacci(attempt);
                // Same clamping rationale as above: fib(48) already exceeds
                // u32::MAX, and truncation would shrink the delay.
                let fib = u32::try_from(fib).unwrap_or(u32::MAX);
                initial.saturating_mul(fib).min(*max)
            }
        }
    }

    /// Iterative Fibonacci with fib(0) = fib(1) = 1, saturating at u64::MAX.
    fn fibonacci(n: usize) -> u64 {
        match n {
            0 | 1 => 1,
            _ => {
                let mut a = 1u64;
                let mut b = 1u64;
                for _ in 2..=n {
                    let c = a.saturating_add(b);
                    a = b;
                    b = c;
                }
                b
            }
        }
    }
}
147
/// Ordering applied when selecting DLQ events for replay.
#[derive(Clone, Debug)]
pub enum ReplayPriority {
    /// Oldest failures first (`failed_at ASC`).
    FIFO,

    /// Newest failures first (`failed_at DESC`).
    LIFO,

    /// Fewest prior retries first, oldest failure breaking ties.
    ByRetryCount,

    /// Events whose error message contains an earlier-listed substring sort
    /// first; unmatched events sort last, ties broken by failure time.
    /// Strings are validated before being interpolated into SQL.
    ByErrorType(Vec<String>),
}
163
164impl ReplayPriority {
165 fn order_by_clause(&self) -> Result<String> {
170 match self {
171 ReplayPriority::FIFO => Ok("failed_at ASC".to_string()),
172 ReplayPriority::LIFO => Ok("failed_at DESC".to_string()),
173 ReplayPriority::ByRetryCount => Ok("retry_count ASC, failed_at ASC".to_string()),
174 ReplayPriority::ByErrorType(types) => {
175 for t in types {
178 if t.is_empty() || t.len() > 128 {
179 return Err(AzothError::Config(format!(
180 "ByErrorType string must be 1-128 characters, got length {}",
181 t.len()
182 )));
183 }
184 if !t.chars().all(|c| {
185 c.is_alphanumeric() || c == '_' || c == '-' || c == '.' || c == ' '
186 }) {
187 return Err(AzothError::Config(format!(
188 "ByErrorType string '{}' contains disallowed characters. \
189 Only alphanumeric, underscore, dash, dot, and space are permitted.",
190 t
191 )));
192 }
193 }
194
195 let cases = types
197 .iter()
198 .enumerate()
199 .map(|(i, t)| format!("WHEN error_message LIKE '%{}%' THEN {}", t, i))
200 .collect::<Vec<_>>()
201 .join(" ");
202 Ok(format!("CASE {} ELSE 999 END, failed_at ASC", cases))
203 }
204 }
205 }
206}
207
/// Lock-free counters tracking replay outcomes; shared across tasks via `Arc`.
#[derive(Default)]
pub struct DlqMetrics {
    /// Events that replayed successfully and were removed from the DLQ.
    pub successes: AtomicU64,

    /// Replay attempts that failed (the event stays queued for retry).
    pub failures: AtomicU64,

    /// Events that exhausted their retries and were moved aside permanently.
    pub permanent_failures: AtomicU64,

    /// Unix timestamp (seconds) of the most recent completed replay cycle;
    /// 0 until the first cycle runs.
    pub last_check: AtomicU64,
}

impl DlqMetrics {
    // The `_retry_count` parameters are currently unused but kept so callers
    // can later bucket outcomes by attempt number without changing call sites.
    fn record_success(&self, _retry_count: i32) {
        self.successes.fetch_add(1, Ordering::Relaxed);
    }

    fn record_failure(&self, _retry_count: i32) {
        self.failures.fetch_add(1, Ordering::Relaxed);
    }

    fn record_permanent_failure(&self) {
        self.permanent_failures.fetch_add(1, Ordering::Relaxed);
    }

    fn update_last_check(&self) {
        // `duration_since(UNIX_EPOCH)` only fails when the system clock is set
        // before 1970; record 0 instead of panicking in that pathological case
        // (the previous `.unwrap()` could take the whole replay task down).
        let now = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .map(|d| d.as_secs())
            .unwrap_or(0);
        self.last_check.store(now, Ordering::Relaxed);
    }

    /// Returns a point-in-time plain-value copy of all counters.
    pub fn snapshot(&self) -> DlqMetricsSnapshot {
        DlqMetricsSnapshot {
            successes: self.successes.load(Ordering::Relaxed),
            failures: self.failures.load(Ordering::Relaxed),
            permanent_failures: self.permanent_failures.load(Ordering::Relaxed),
            last_check: self.last_check.load(Ordering::Relaxed),
        }
    }
}

/// Plain-value copy of [`DlqMetrics`] taken at a moment in time.
#[derive(Debug, Clone)]
pub struct DlqMetricsSnapshot {
    pub successes: u64,
    pub failures: u64,
    pub permanent_failures: u64,
    pub last_check: u64,
}
264
/// Background worker that periodically replays events from the dead letter
/// queue through the registered event handlers.
pub struct DlqReplayer {
    // Queue holding failed events and their retry bookkeeping.
    dlq: Arc<DeadLetterQueue>,
    // Handlers used to re-process an event's bytes on replay.
    registry: Arc<EventHandlerRegistry>,
    // Tuning knobs: interval, backoff, limits, ordering.
    config: DlqReplayConfig,
    // Cooperative shutdown flag checked between cycles.
    shutdown: Arc<AtomicBool>,
    // Lock-free counters for replay outcomes.
    metrics: Arc<DlqMetrics>,
}
273
274impl DlqReplayer {
275 pub fn new(
277 dlq: Arc<DeadLetterQueue>,
278 registry: Arc<EventHandlerRegistry>,
279 config: DlqReplayConfig,
280 ) -> Self {
281 Self {
282 dlq,
283 registry,
284 config,
285 shutdown: Arc::new(AtomicBool::new(false)),
286 metrics: Arc::new(DlqMetrics::default()),
287 }
288 }
289
290 pub fn metrics(&self) -> &Arc<DlqMetrics> {
292 &self.metrics
293 }
294
295 pub fn shutdown(&self) {
297 self.shutdown.store(true, Ordering::Relaxed);
298 }
299
300 pub fn is_enabled(&self) -> bool {
302 self.config.enabled
303 }
304
305 pub async fn run(self: Arc<Self>) -> Result<()> {
307 if !self.config.enabled {
308 tracing::info!("DLQ replayer is disabled");
309 return Ok(());
310 }
311
312 tracing::info!(
313 "DLQ replayer started (check interval: {:?})",
314 self.config.check_interval
315 );
316
317 while !self.shutdown.load(Ordering::Relaxed) {
318 match self.run_replay_cycle().await {
319 Ok(_) => {}
320 Err(e) => {
321 tracing::error!("DLQ replay cycle error: {}", e);
322 }
323 }
324
325 self.metrics.update_last_check();
326 tokio::time::sleep(self.config.check_interval).await;
327 }
328
329 tracing::info!("DLQ replayer shutdown");
330 Ok(())
331 }
332
333 async fn run_replay_cycle(&self) -> Result<()> {
334 let failed_events = self.get_eligible_events()?;
336
337 if failed_events.is_empty() {
338 return Ok(());
339 }
340
341 tracing::debug!("Found {} eligible events for replay", failed_events.len());
342
343 let mut consecutive_failures = 0;
344
345 for event in failed_events {
346 let delay = self.config.backoff.calculate(event.retry_count as usize);
348
349 if !self.should_retry_now(&event, delay)? {
351 continue;
352 }
353
354 tracing::debug!(
356 "Replaying event {} (attempt {}/{})",
357 event.event_id,
358 event.retry_count + 1,
359 self.config.max_retries
360 );
361
362 match self.replay_event(&event).await {
363 Ok(_) => {
364 self.dlq.remove(event.id)?;
366 self.metrics.record_success(event.retry_count);
367 consecutive_failures = 0;
368
369 tracing::info!(
370 "Successfully replayed event {} after {} retries",
371 event.event_id,
372 event.retry_count
373 );
374 }
375 Err(e) => {
376 self.dlq.mark_retry(event.id)?;
378 self.metrics.record_failure(event.retry_count);
379 consecutive_failures += 1;
380
381 tracing::warn!(
382 "Failed to replay event {}: {} (retry {}/{})",
383 event.event_id,
384 e,
385 event.retry_count + 1,
386 self.config.max_retries
387 );
388
389 if let Some(max) = self.config.stop_on_consecutive_failures {
391 if consecutive_failures >= max {
392 tracing::warn!(
393 "Stopping DLQ replay after {} consecutive failures",
394 consecutive_failures
395 );
396 return Ok(());
397 }
398 }
399
400 if event.retry_count + 1 >= self.config.max_retries as i32 {
402 tracing::error!(
403 "Event {} exceeded max retries ({}), marking as permanently failed",
404 event.event_id,
405 self.config.max_retries
406 );
407 self.move_to_permanent_failure(&event)?;
408 }
409 }
410 }
411 }
412
413 Ok(())
414 }
415
416 fn get_eligible_events(&self) -> Result<Vec<FailedEvent>> {
417 let order_by = self.config.priority.order_by_clause()?;
418 let min_age_secs = self.config.min_age.as_secs();
419
420 let query = format!(
421 "SELECT id, event_id, event_bytes, error_message, failed_at, retry_count
422 FROM dead_letter_queue
423 WHERE retry_count < ?
424 AND datetime(COALESCE(last_retry_at, failed_at)) <= datetime('now', '-{} seconds')
425 ORDER BY {}
426 LIMIT ?",
427 min_age_secs, order_by
428 );
429
430 let conn = self.dlq.connection();
432 let mut stmt = conn
433 .prepare(&query)
434 .map_err(|e: rusqlite::Error| AzothError::Projection(e.to_string()))?;
435
436 let events = stmt
437 .query_map(
438 params![self.config.max_retries as i32, self.config.batch_size],
439 |row: &rusqlite::Row| {
440 Ok(FailedEvent {
441 id: row.get(0)?,
442 event_id: row.get(1)?,
443 event_bytes: row.get(2)?,
444 error_message: row.get(3)?,
445 failed_at: row.get(4)?,
446 retry_count: row.get(5)?,
447 })
448 },
449 )
450 .map_err(|e: rusqlite::Error| AzothError::Projection(e.to_string()))?;
451
452 events
453 .collect::<std::result::Result<Vec<_>, _>>()
454 .map_err(|e: rusqlite::Error| AzothError::Projection(e.to_string()))
455 }
456
457 fn should_retry_now(&self, event: &FailedEvent, delay: Duration) -> Result<bool> {
458 let conn = self.dlq.connection();
461
462 let last_attempt: String = conn
463 .query_row(
464 "SELECT COALESCE(last_retry_at, failed_at) FROM dead_letter_queue WHERE id = ?",
465 [event.id],
466 |row| row.get(0),
467 )
468 .map_err(|e| AzothError::Projection(e.to_string()))?;
469
470 use chrono::{DateTime, Utc};
472 let last_time = DateTime::parse_from_rfc3339(&last_attempt)
473 .or_else(|_| {
474 chrono::NaiveDateTime::parse_from_str(&last_attempt, "%Y-%m-%d %H:%M:%S")
476 .map(|dt| DateTime::<Utc>::from_naive_utc_and_offset(dt, Utc).into())
477 })
478 .map_err(|e| {
479 AzothError::Projection(format!("Failed to parse timestamp {}: {}", last_attempt, e))
480 })?;
481
482 let elapsed = Utc::now().signed_duration_since(last_time);
483 let elapsed_duration = Duration::from_secs(elapsed.num_seconds().max(0) as u64);
484
485 Ok(elapsed_duration >= delay)
486 }
487
488 async fn replay_event(&self, event: &FailedEvent) -> Result<()> {
489 let conn = self.dlq.connection();
491
492 self.registry
494 .process(conn.as_ref(), event.event_id, &event.event_bytes)
495 }
496
497 fn move_to_permanent_failure(&self, event: &FailedEvent) -> Result<()> {
498 let conn = self.dlq.connection();
499
500 conn.execute(
502 "CREATE TABLE IF NOT EXISTS permanent_failures (
503 id INTEGER PRIMARY KEY AUTOINCREMENT,
504 event_id INTEGER NOT NULL,
505 event_bytes BLOB NOT NULL,
506 error_message TEXT NOT NULL,
507 failed_at TEXT NOT NULL,
508 retry_count INTEGER NOT NULL,
509 marked_permanent_at TEXT NOT NULL DEFAULT (datetime('now'))
510 )",
511 [],
512 )
513 .map_err(|e| AzothError::Projection(e.to_string()))?;
514
515 conn.execute(
517 "INSERT INTO permanent_failures (event_id, event_bytes, error_message, failed_at, retry_count)
518 SELECT event_id, event_bytes, error_message, failed_at, retry_count
519 FROM dead_letter_queue
520 WHERE id = ?",
521 [event.id],
522 )
523 .map_err(|e| AzothError::Projection(e.to_string()))?;
524
525 self.dlq.remove(event.id)?;
527
528 self.metrics.record_permanent_failure();
529
530 Ok(())
531 }
532
533 pub fn permanent_failure_count(&self) -> Result<usize> {
535 let conn = self.dlq.connection();
536
537 let exists: bool = conn
539 .query_row(
540 "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='permanent_failures'",
541 [],
542 |row| row.get(0),
543 )
544 .unwrap_or(false);
545
546 if !exists {
547 return Ok(0);
548 }
549
550 let count: i64 = conn
551 .query_row("SELECT COUNT(*) FROM permanent_failures", [], |row| {
552 row.get(0)
553 })
554 .map_err(|e| AzothError::Projection(e.to_string()))?;
555
556 Ok(count as usize)
557 }
558
559 pub fn clear_permanent_failures(&self) -> Result<()> {
561 let conn = self.dlq.connection();
562 conn.execute("DELETE FROM permanent_failures", [])
563 .map_err(|e| AzothError::Projection(e.to_string()))?;
564 Ok(())
565 }
566}
567
#[cfg(test)]
mod tests {
    use super::*;

    /// A fixed strategy ignores the attempt number entirely.
    #[test]
    fn test_backoff_fixed() {
        let strategy = BackoffStrategy::Fixed(Duration::from_secs(10));
        for attempt in [0, 5, 100] {
            assert_eq!(strategy.calculate(attempt), Duration::from_secs(10));
        }
    }

    /// Exponential backoff doubles per attempt and saturates at `max`.
    #[test]
    fn test_backoff_exponential() {
        let strategy = BackoffStrategy::Exponential {
            initial: Duration::from_secs(1),
            max: Duration::from_secs(60),
        };

        let cases = [(0, 1), (1, 2), (2, 4), (3, 8), (10, 60)];
        for (attempt, expected_secs) in cases {
            assert_eq!(
                strategy.calculate(attempt),
                Duration::from_secs(expected_secs)
            );
        }
    }

    /// Fibonacci backoff follows fib(0) = fib(1) = 1, 2, 3, 5, 8, ...
    #[test]
    fn test_backoff_fibonacci() {
        let strategy = BackoffStrategy::Fibonacci {
            initial: Duration::from_secs(1),
            max: Duration::from_secs(100),
        };

        let cases = [(0, 1), (1, 1), (2, 2), (3, 3), (4, 5), (5, 8)];
        for (attempt, expected_secs) in cases {
            assert_eq!(
                strategy.calculate(attempt),
                Duration::from_secs(expected_secs)
            );
        }
    }

    /// Each simple priority maps to its fixed ORDER BY fragment.
    #[test]
    fn test_replay_priority_order_by() {
        let cases = [
            (ReplayPriority::FIFO, "failed_at ASC"),
            (ReplayPriority::LIFO, "failed_at DESC"),
            (ReplayPriority::ByRetryCount, "retry_count ASC, failed_at ASC"),
        ];
        for (priority, expected) in cases {
            assert_eq!(priority.order_by_clause().unwrap(), expected);
        }
    }

    /// ByErrorType accepts safe substrings and rejects injection attempts
    /// and empty strings.
    #[test]
    fn test_replay_priority_by_error_type_validation() {
        let ok = ReplayPriority::ByErrorType(vec![
            "timeout".to_string(),
            "connection_error".to_string(),
        ]);
        assert!(ok.order_by_clause().is_ok());

        let injection =
            ReplayPriority::ByErrorType(vec!["'; DROP TABLE dead_letter_queue; --".to_string()]);
        assert!(injection.order_by_clause().is_err());

        let empty = ReplayPriority::ByErrorType(vec!["".to_string()]);
        assert!(empty.order_by_clause().is_err());
    }

    /// Defaults: enabled, 5 retries, batches of 100.
    #[test]
    fn test_default_config() {
        let config = DlqReplayConfig::default();
        assert!(config.enabled);
        assert_eq!(config.max_retries, 5);
        assert_eq!(config.batch_size, 100);
    }
}