// oxirs_stream/dlq.rs

//! Dead Letter Queue (DLQ)
//!
//! This module provides robust handling of failed events:
//! - Automatic retry with exponential backoff
//! - Dead letter queue for permanently failed events
//! - Failure analysis and categorization
//! - Replay capabilities
//! - Alerting on high failure rates
use crate::StreamEvent;
use anyhow::Result;
use chrono::{DateTime, Duration as ChronoDuration, Utc};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{error, info, warn};

19/// Failure reason categorization
20#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
21pub enum FailureReason {
22    /// Network connectivity issues
23    NetworkError,
24    /// Serialization/deserialization errors
25    SerializationError,
26    /// Validation errors
27    ValidationError,
28    /// Timeout errors
29    TimeoutError,
30    /// Backend-specific errors
31    BackendError(String),
32    /// Unknown errors
33    Unknown(String),
34}
35
36/// Failed event with metadata
37#[derive(Debug, Clone, Serialize, Deserialize)]
38pub struct FailedEvent {
39    pub event: StreamEvent,
40    pub failure_reason: FailureReason,
41    pub error_message: String,
42    pub first_attempt: DateTime<Utc>,
43    pub last_attempt: DateTime<Utc>,
44    pub retry_count: u32,
45    pub stack_trace: Option<String>,
46}
47
48/// DLQ configuration
49#[derive(Debug, Clone, Serialize, Deserialize)]
50pub struct DlqConfig {
51    /// Maximum retry attempts before moving to DLQ
52    pub max_retries: u32,
53    /// Initial retry delay
54    pub initial_retry_delay: ChronoDuration,
55    /// Maximum retry delay
56    pub max_retry_delay: ChronoDuration,
57    /// Retry backoff multiplier
58    pub backoff_multiplier: f64,
59    /// Maximum DLQ size
60    pub max_dlq_size: usize,
61    /// Enable automatic replay
62    pub enable_auto_replay: bool,
63    /// Replay interval
64    pub replay_interval: ChronoDuration,
65    /// Alert threshold (failure rate percentage)
66    pub alert_threshold: f64,
67}
68
69impl Default for DlqConfig {
70    fn default() -> Self {
71        Self {
72            max_retries: 3,
73            initial_retry_delay: ChronoDuration::milliseconds(100),
74            max_retry_delay: ChronoDuration::seconds(30),
75            backoff_multiplier: 2.0,
76            max_dlq_size: 100000,
77            enable_auto_replay: false,
78            replay_interval: ChronoDuration::hours(1),
79            alert_threshold: 0.05, // 5% failure rate
80        }
81    }
82}
83
84/// DLQ statistics
85#[derive(Debug, Clone, Default, Serialize, Deserialize)]
86pub struct DlqStats {
87    pub events_failed: u64,
88    pub events_retried: u64,
89    pub events_moved_to_dlq: u64,
90    pub events_replayed: u64,
91    pub current_dlq_size: usize,
92    pub failure_by_reason: HashMap<String, u64>,
93    pub failure_rate: f64,
94    pub last_replay: Option<DateTime<Utc>>,
95}
96
97/// Type alias for failure history
98type FailureHistory = Arc<RwLock<VecDeque<(DateTime<Utc>, FailureReason)>>>;
99
100/// Dead Letter Queue manager
101pub struct DeadLetterQueue {
102    config: DlqConfig,
103    retry_queue: Arc<RwLock<VecDeque<FailedEvent>>>,
104    dlq: Arc<RwLock<VecDeque<FailedEvent>>>,
105    stats: Arc<RwLock<DlqStats>>,
106    failure_history: FailureHistory,
107}
108
109impl DeadLetterQueue {
110    /// Create a new DLQ
111    pub fn new(config: DlqConfig) -> Self {
112        Self {
113            config,
114            retry_queue: Arc::new(RwLock::new(VecDeque::new())),
115            dlq: Arc::new(RwLock::new(VecDeque::new())),
116            stats: Arc::new(RwLock::new(DlqStats::default())),
117            failure_history: Arc::new(RwLock::new(VecDeque::new())),
118        }
119    }
120
121    /// Handle a failed event
122    pub async fn handle_failed_event(
123        &self,
124        event: StreamEvent,
125        failure_reason: FailureReason,
126        error_message: String,
127    ) -> Result<()> {
128        let now = Utc::now();
129
130        // Update statistics
131        let mut stats = self.stats.write().await;
132        stats.events_failed += 1;
133
134        let reason_key = format!("{:?}", failure_reason);
135        *stats.failure_by_reason.entry(reason_key).or_insert(0) += 1;
136
137        drop(stats);
138
139        // Record failure
140        let mut history = self.failure_history.write().await;
141        history.push_back((now, failure_reason.clone()));
142
143        // Keep only last 1000 failures
144        if history.len() > 1000 {
145            history.pop_front();
146        }
147
148        drop(history);
149
150        // Create failed event record
151        let failed_event = FailedEvent {
152            event,
153            failure_reason: failure_reason.clone(),
154            error_message: error_message.clone(),
155            first_attempt: now,
156            last_attempt: now,
157            retry_count: 0,
158            stack_trace: None,
159        };
160
161        // Add to retry queue
162        let mut retry_queue = self.retry_queue.write().await;
163        retry_queue.push_back(failed_event);
164
165        info!(
166            "Event failed, added to retry queue: {:?} - {}",
167            failure_reason, error_message
168        );
169
170        // Check alert threshold
171        self.check_failure_rate().await;
172
173        Ok(())
174    }
175
176    /// Process retry queue
177    pub async fn process_retries<F, Fut>(&self, retry_fn: F) -> Result<Vec<StreamEvent>>
178    where
179        F: Fn(StreamEvent) -> Fut + Send + Sync,
180        Fut: std::future::Future<Output = Result<()>> + Send,
181    {
182        let mut retry_queue = self.retry_queue.write().await;
183        let mut still_failing = Vec::new();
184        let mut successfully_retried = Vec::new();
185
186        while let Some(mut failed_event) = retry_queue.pop_front() {
187            let now = Utc::now();
188
189            // Calculate retry delay
190            let delay = self.calculate_retry_delay(failed_event.retry_count);
191            let time_since_last_attempt = now - failed_event.last_attempt;
192
193            if time_since_last_attempt < delay {
194                // Not ready to retry yet
195                still_failing.push(failed_event);
196                continue;
197            }
198
199            // Attempt retry
200            let result = retry_fn(failed_event.event.clone()).await;
201
202            match result {
203                Ok(_) => {
204                    // Success!
205                    successfully_retried.push(failed_event.event.clone());
206
207                    let mut stats = self.stats.write().await;
208                    stats.events_retried += 1;
209
210                    info!(
211                        "Event successfully retried after {} attempts",
212                        failed_event.retry_count + 1
213                    );
214                }
215                Err(e) => {
216                    // Still failing
217                    failed_event.retry_count += 1;
218                    failed_event.last_attempt = now;
219                    failed_event.error_message = e.to_string();
220
221                    if failed_event.retry_count >= self.config.max_retries {
222                        // Move to DLQ
223                        warn!(
224                            "Event failed after {} retries, moving to DLQ: {}",
225                            failed_event.retry_count, e
226                        );
227
228                        self.move_to_dlq(failed_event).await?;
229                    } else {
230                        // Keep retrying
231                        still_failing.push(failed_event);
232                    }
233                }
234            }
235        }
236
237        // Put still-failing events back in retry queue
238        *retry_queue = still_failing.into();
239
240        Ok(successfully_retried)
241    }
242
243    /// Move an event to DLQ
244    async fn move_to_dlq(&self, failed_event: FailedEvent) -> Result<()> {
245        let mut dlq = self.dlq.write().await;
246
247        // Check size limit
248        if dlq.len() >= self.config.max_dlq_size {
249            warn!("DLQ size limit reached, dropping oldest event");
250            dlq.pop_front();
251        }
252
253        dlq.push_back(failed_event);
254
255        let mut stats = self.stats.write().await;
256        stats.events_moved_to_dlq += 1;
257        stats.current_dlq_size = dlq.len();
258
259        Ok(())
260    }
261
262    /// Calculate retry delay with exponential backoff
263    fn calculate_retry_delay(&self, retry_count: u32) -> ChronoDuration {
264        let delay_ms = self.config.initial_retry_delay.num_milliseconds() as f64
265            * self.config.backoff_multiplier.powi(retry_count as i32);
266
267        let delay_ms = delay_ms.min(self.config.max_retry_delay.num_milliseconds() as f64);
268
269        ChronoDuration::milliseconds(delay_ms as i64)
270    }
271
272    /// Replay events from DLQ
273    pub async fn replay_dlq<F, Fut>(
274        &self,
275        replay_fn: F,
276        max_events: Option<usize>,
277    ) -> Result<Vec<StreamEvent>>
278    where
279        F: Fn(StreamEvent) -> Fut + Send + Sync,
280        Fut: std::future::Future<Output = Result<()>> + Send,
281    {
282        let mut dlq = self.dlq.write().await;
283        let mut successfully_replayed = Vec::new();
284        let mut still_failing = Vec::new();
285
286        let replay_count = max_events.unwrap_or(dlq.len()).min(dlq.len());
287
288        for _ in 0..replay_count {
289            if let Some(failed_event) = dlq.pop_front() {
290                let result = replay_fn(failed_event.event.clone()).await;
291
292                match result {
293                    Ok(_) => {
294                        successfully_replayed.push(failed_event.event.clone());
295
296                        let mut stats = self.stats.write().await;
297                        stats.events_replayed += 1;
298
299                        info!("Event successfully replayed from DLQ");
300                    }
301                    Err(e) => {
302                        error!("Event replay failed: {}", e);
303                        still_failing.push(failed_event);
304                    }
305                }
306            }
307        }
308
309        // Put still-failing events back in DLQ
310        for failed_event in still_failing {
311            dlq.push_back(failed_event);
312        }
313
314        let mut stats = self.stats.write().await;
315        stats.current_dlq_size = dlq.len();
316        stats.last_replay = Some(Utc::now());
317
318        info!("Replayed {} events from DLQ", successfully_replayed.len());
319
320        Ok(successfully_replayed)
321    }
322
323    /// Get events from DLQ by failure reason
324    pub async fn get_by_reason(&self, reason: &FailureReason) -> Vec<FailedEvent> {
325        let dlq = self.dlq.read().await;
326
327        dlq.iter()
328            .filter(|evt| &evt.failure_reason == reason)
329            .cloned()
330            .collect()
331    }
332
333    /// Remove specific event from DLQ
334    pub async fn remove_from_dlq(&self, predicate: impl Fn(&FailedEvent) -> bool) -> usize {
335        let mut dlq = self.dlq.write().await;
336        let initial_size = dlq.len();
337
338        dlq.retain(|evt| !predicate(evt));
339
340        let removed = initial_size - dlq.len();
341
342        let mut stats = self.stats.write().await;
343        stats.current_dlq_size = dlq.len();
344
345        removed
346    }
347
348    /// Clear DLQ
349    pub async fn clear_dlq(&self) {
350        let mut dlq = self.dlq.write().await;
351        let cleared = dlq.len();
352        dlq.clear();
353
354        let mut stats = self.stats.write().await;
355        stats.current_dlq_size = 0;
356
357        info!("Cleared {} events from DLQ", cleared);
358    }
359
360    /// Get DLQ statistics
361    pub async fn stats(&self) -> DlqStats {
362        let mut stats = self.stats.read().await.clone();
363
364        // Calculate failure rate
365        stats.failure_rate = self.calculate_failure_rate().await;
366
367        stats
368    }
369
370    /// Calculate current failure rate
371    async fn calculate_failure_rate(&self) -> f64 {
372        let history = self.failure_history.read().await;
373
374        if history.is_empty() {
375            return 0.0;
376        }
377
378        // Calculate failures in last minute
379        let now = Utc::now();
380        let one_minute_ago = now - ChronoDuration::minutes(1);
381
382        let recent_failures = history
383            .iter()
384            .filter(|(timestamp, _)| *timestamp >= one_minute_ago)
385            .count();
386
387        // Estimate total events (failures / assumed failure rate)
388        // This is a rough estimate - in production, you'd track successful events too
389        let estimated_total = (recent_failures as f64 / 0.01).max(recent_failures as f64);
390
391        recent_failures as f64 / estimated_total
392    }
393
394    /// Check if failure rate exceeds threshold
395    async fn check_failure_rate(&self) {
396        let failure_rate = self.calculate_failure_rate().await;
397
398        if failure_rate >= self.config.alert_threshold {
399            error!(
400                "ALERT: Failure rate ({:.2}%) exceeds threshold ({:.2}%)",
401                failure_rate * 100.0,
402                self.config.alert_threshold * 100.0
403            );
404
405            // In a production system, this would trigger alerts (PagerDuty, Slack, etc.)
406        }
407    }
408
409    /// Get retry queue size
410    pub async fn retry_queue_size(&self) -> usize {
411        self.retry_queue.read().await.len()
412    }
413
414    /// Get DLQ size
415    pub async fn dlq_size(&self) -> usize {
416        self.dlq.read().await.len()
417    }
418
419    /// Get all DLQ events
420    pub async fn get_all_dlq_events(&self) -> Vec<FailedEvent> {
421        self.dlq.read().await.iter().cloned().collect()
422    }
423}
424
425/// DLQ-aware event processor
426pub struct DlqEventProcessor<T> {
427    dlq: Arc<DeadLetterQueue>,
428    processor: Arc<dyn Fn(T) -> Result<()> + Send + Sync>,
429}
430
431impl<T: Clone + Into<StreamEvent>> DlqEventProcessor<T> {
432    pub fn new<F>(dlq: Arc<DeadLetterQueue>, processor: F) -> Self
433    where
434        F: Fn(T) -> Result<()> + Send + Sync + 'static,
435    {
436        Self {
437            dlq,
438            processor: Arc::new(processor),
439        }
440    }
441
442    /// Process event with DLQ handling
443    pub async fn process(&self, event: T) -> Result<()> {
444        let stream_event = event.clone().into();
445
446        match (self.processor)(event) {
447            Ok(_) => Ok(()),
448            Err(e) => {
449                // Categorize error
450                let failure_reason = self.categorize_error(&e);
451
452                // Handle with DLQ
453                self.dlq
454                    .handle_failed_event(stream_event, failure_reason, e.to_string())
455                    .await?;
456
457                Err(e)
458            }
459        }
460    }
461
462    /// Categorize error into failure reason
463    fn categorize_error(&self, error: &anyhow::Error) -> FailureReason {
464        let error_str = error.to_string().to_lowercase();
465
466        if error_str.contains("network") || error_str.contains("connection") {
467            FailureReason::NetworkError
468        } else if error_str.contains("serializ") || error_str.contains("deserializ") {
469            FailureReason::SerializationError
470        } else if error_str.contains("validation") || error_str.contains("invalid") {
471            FailureReason::ValidationError
472        } else if error_str.contains("timeout") {
473            FailureReason::TimeoutError
474        } else {
475            FailureReason::Unknown(error.to_string())
476        }
477    }
478}
479
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::EventMetadata;
    use anyhow::anyhow;

    /// Minimal triple-added event shared by the tests below.
    fn create_test_event() -> StreamEvent {
        StreamEvent::TripleAdded {
            subject: "test".to_string(),
            predicate: "test".to_string(),
            object: "test".to_string(),
            graph: None,
            metadata: EventMetadata::default(),
        }
    }

    #[tokio::test]
    async fn test_dlq_basic() {
        let dlq = DeadLetterQueue::new(DlqConfig::default());

        dlq.handle_failed_event(
            create_test_event(),
            FailureReason::NetworkError,
            "Connection failed".to_string(),
        )
        .await
        .unwrap();

        // The failure lands in the retry queue first, not the DLQ.
        assert_eq!(dlq.retry_queue_size().await, 1);
        assert_eq!(dlq.dlq_size().await, 0);
        assert_eq!(dlq.stats().await.events_failed, 1);
    }

    #[tokio::test]
    async fn test_retry_exhaustion() {
        let dlq = DeadLetterQueue::new(DlqConfig {
            max_retries: 2,
            initial_retry_delay: ChronoDuration::milliseconds(1),
            ..Default::default()
        });

        dlq.handle_failed_event(
            create_test_event(),
            FailureReason::NetworkError,
            "Connection failed".to_string(),
        )
        .await
        .unwrap();

        // A retry function that never succeeds.
        let retry_fn = |_: StreamEvent| async { Err(anyhow!("Still failing")) };

        for _ in 0..3 {
            // Give the backoff window time to elapse between passes.
            tokio::time::sleep(tokio::time::Duration::from_millis(2)).await;
            dlq.process_retries(retry_fn).await.unwrap();
        }

        // After exhausting max_retries the event must land in the DLQ.
        assert_eq!(dlq.dlq_size().await, 1);
        assert_eq!(dlq.retry_queue_size().await, 0);
    }

    #[tokio::test]
    async fn test_successful_retry() {
        let dlq = DeadLetterQueue::new(DlqConfig {
            max_retries: 3,
            initial_retry_delay: ChronoDuration::milliseconds(1),
            ..Default::default()
        });

        dlq.handle_failed_event(
            create_test_event(),
            FailureReason::NetworkError,
            "Connection failed".to_string(),
        )
        .await
        .unwrap();

        // Let the backoff window elapse so the retry is actually attempted.
        tokio::time::sleep(tokio::time::Duration::from_millis(2)).await;

        // A retry function that succeeds immediately.
        let retry_fn = |_: StreamEvent| async { Ok(()) };

        let recovered = dlq.process_retries(retry_fn).await.unwrap();

        assert_eq!(recovered.len(), 1);
        assert_eq!(dlq.retry_queue_size().await, 0);
        assert_eq!(dlq.dlq_size().await, 0);
    }

    #[tokio::test]
    async fn test_dlq_replay() {
        let dlq = DeadLetterQueue::new(DlqConfig::default());

        // Seed the DLQ directly with five distinct events, bypassing retries.
        for i in 0..5 {
            let mut event = create_test_event();
            if let StreamEvent::TripleAdded {
                ref mut subject, ..
            } = event
            {
                *subject = format!("test_{}", i);
            }

            dlq.dlq.write().await.push_back(FailedEvent {
                event,
                failure_reason: FailureReason::NetworkError,
                error_message: "Connection failed".to_string(),
                first_attempt: Utc::now(),
                last_attempt: Utc::now(),
                retry_count: 5,
                stack_trace: None,
            });
        }

        assert_eq!(dlq.dlq_size().await, 5);

        // Replay three of the five with an always-successful function.
        let replay_fn = |_: StreamEvent| async { Ok(()) };

        let replayed = dlq.replay_dlq(replay_fn, Some(3)).await.unwrap();

        assert_eq!(replayed.len(), 3);
        assert_eq!(dlq.dlq_size().await, 2);
    }
}