1use crate::StreamEvent;
11use anyhow::Result;
12use chrono::{DateTime, Duration as ChronoDuration, Utc};
13use serde::{Deserialize, Serialize};
14use std::collections::{HashMap, VecDeque};
15use std::sync::Arc;
16use tokio::sync::RwLock;
17use tracing::{error, info, warn};
18
/// Coarse classification of why an event failed to process.
///
/// Used as a grouping key for per-reason statistics (via its `Debug`
/// representation) and for filtered DLQ lookups — hence the
/// `PartialEq, Eq, Hash` derives.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub enum FailureReason {
    /// Connectivity problems (error text mentioning "network"/"connection").
    NetworkError,
    /// (De)serialization failures.
    SerializationError,
    /// Input failed validation ("validation"/"invalid" in the error text).
    ValidationError,
    /// Operation timed out.
    TimeoutError,
    /// Backend-specific failure; payload carries the backend's message.
    BackendError(String),
    /// Anything that could not be categorized; payload is the raw error text.
    Unknown(String),
}
35
/// An event that failed processing, together with its retry bookkeeping.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FailedEvent {
    /// The original event payload.
    pub event: StreamEvent,
    /// Classified cause of the failure.
    pub failure_reason: FailureReason,
    /// Error text from the most recent failed attempt.
    pub error_message: String,
    /// When the failure was first recorded.
    pub first_attempt: DateTime<Utc>,
    /// When the most recent attempt happened; drives backoff scheduling.
    pub last_attempt: DateTime<Utc>,
    /// Retries performed so far (0 when first enqueued).
    pub retry_count: u32,
    /// Optional captured stack trace; this module always stores `None`.
    pub stack_trace: Option<String>,
}
47
/// Tuning knobs for the retry pipeline and the dead letter queue.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DlqConfig {
    /// Retry attempts before an event is moved to the DLQ.
    pub max_retries: u32,
    /// Delay before the first retry; grows by `backoff_multiplier` per attempt.
    pub initial_retry_delay: ChronoDuration,
    /// Upper bound on the exponential backoff delay.
    pub max_retry_delay: ChronoDuration,
    /// Exponential backoff factor applied per retry.
    pub backoff_multiplier: f64,
    /// Maximum events kept in the DLQ; the oldest is dropped beyond this.
    pub max_dlq_size: usize,
    /// Whether periodic automatic replay is desired.
    /// NOTE(review): not read anywhere in this module — presumably consumed
    /// by an external scheduler; confirm.
    pub enable_auto_replay: bool,
    /// Interval between automatic replays (see `enable_auto_replay`).
    pub replay_interval: ChronoDuration,
    /// Failure-rate fraction (0.0–1.0) above which an alert is logged.
    pub alert_threshold: f64,
}
68
69impl Default for DlqConfig {
70 fn default() -> Self {
71 Self {
72 max_retries: 3,
73 initial_retry_delay: ChronoDuration::milliseconds(100),
74 max_retry_delay: ChronoDuration::seconds(30),
75 backoff_multiplier: 2.0,
76 max_dlq_size: 100000,
77 enable_auto_replay: false,
78 replay_interval: ChronoDuration::hours(1),
79 alert_threshold: 0.05, }
81 }
82}
83
/// Aggregate counters exposed through [`DeadLetterQueue::stats`].
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct DlqStats {
    /// Total events ever reported as failed.
    pub events_failed: u64,
    /// Events that eventually succeeded on a retry attempt.
    pub events_retried: u64,
    /// Events moved to the DLQ after exhausting retries.
    pub events_moved_to_dlq: u64,
    /// Events successfully replayed out of the DLQ.
    pub events_replayed: u64,
    /// Current DLQ length at the time of the last DLQ mutation.
    pub current_dlq_size: usize,
    /// Failure counts keyed by the `Debug` string of the `FailureReason`.
    pub failure_by_reason: HashMap<String, u64>,
    /// Estimated recent failure rate; recomputed on each `stats()` call.
    pub failure_rate: f64,
    /// Timestamp of the most recent DLQ replay, if any.
    pub last_replay: Option<DateTime<Utc>>,
}
96
/// Rolling log of `(timestamp, reason)` pairs for recent failures,
/// capped at 1000 entries by `handle_failed_event`.
type FailureHistory = Arc<RwLock<VecDeque<(DateTime<Utc>, FailureReason)>>>;

/// Two-stage failure handler: events first enter `retry_queue` and are
/// retried with exponential backoff; after `config.max_retries` failed
/// attempts they are moved to `dlq` for manual or bulk replay.
pub struct DeadLetterQueue {
    /// Behavioral configuration (retry counts, backoff, size limits).
    config: DlqConfig,
    /// Events awaiting another retry attempt.
    retry_queue: Arc<RwLock<VecDeque<FailedEvent>>>,
    /// Events that exhausted their retries.
    dlq: Arc<RwLock<VecDeque<FailedEvent>>>,
    /// Aggregate counters exposed via `stats()`.
    stats: Arc<RwLock<DlqStats>>,
    /// Recent failures used for rate estimation.
    failure_history: FailureHistory,
}
108
impl DeadLetterQueue {
    /// Creates an empty retry queue / DLQ pair with the given configuration.
    pub fn new(config: DlqConfig) -> Self {
        Self {
            config,
            retry_queue: Arc::new(RwLock::new(VecDeque::new())),
            dlq: Arc::new(RwLock::new(VecDeque::new())),
            stats: Arc::new(RwLock::new(DlqStats::default())),
            failure_history: Arc::new(RwLock::new(VecDeque::new())),
        }
    }

    /// Records a freshly failed event: bumps the failure counters, appends to
    /// the bounded failure history, and enqueues the event for retry with
    /// `retry_count = 0`.
    ///
    /// Locking: `stats`, `failure_history`, and `retry_queue` are each taken
    /// and released in sequence (no nesting). The `retry_queue` write guard
    /// is still held while `check_failure_rate` awaits the history read
    /// lock below; that is safe (different locks) but briefly blocks
    /// concurrent `process_retries` callers.
    pub async fn handle_failed_event(
        &self,
        event: StreamEvent,
        failure_reason: FailureReason,
        error_message: String,
    ) -> Result<()> {
        let now = Utc::now();

        let mut stats = self.stats.write().await;
        stats.events_failed += 1;

        // Per-reason counter keyed by the Debug rendering of the reason.
        let reason_key = format!("{:?}", failure_reason);
        *stats.failure_by_reason.entry(reason_key).or_insert(0) += 1;

        drop(stats);

        let mut history = self.failure_history.write().await;
        history.push_back((now, failure_reason.clone()));

        // Keep the history bounded; oldest entries fall off first.
        if history.len() > 1000 {
            history.pop_front();
        }

        drop(history);

        let failed_event = FailedEvent {
            event,
            failure_reason: failure_reason.clone(),
            error_message: error_message.clone(),
            first_attempt: now,
            last_attempt: now,
            retry_count: 0,
            stack_trace: None,
        };

        let mut retry_queue = self.retry_queue.write().await;
        retry_queue.push_back(failed_event);

        info!(
            "Event failed, added to retry queue: {:?} - {}",
            failure_reason, error_message
        );

        self.check_failure_rate().await;

        Ok(())
    }

    /// Drains the retry queue once, invoking `retry_fn` for every event whose
    /// backoff delay has elapsed. Events that are not yet due, and events
    /// that fail again but still have retries left, are put back; events
    /// that exhaust `config.max_retries` are moved to the DLQ.
    ///
    /// Returns the events that succeeded on this pass.
    ///
    /// NOTE(review): the `retry_queue` write guard is held across every
    /// `retry_fn(...).await`, so new failures cannot be enqueued while a
    /// retry pass is running — confirm that serialization is intended.
    /// Lock acquisition order here is retry_queue → (dlq → stats), which
    /// matches `move_to_dlq`.
    pub async fn process_retries<F, Fut>(&self, retry_fn: F) -> Result<Vec<StreamEvent>>
    where
        F: Fn(StreamEvent) -> Fut + Send + Sync,
        Fut: std::future::Future<Output = Result<()>> + Send,
    {
        let mut retry_queue = self.retry_queue.write().await;
        let mut still_failing = Vec::new();
        let mut successfully_retried = Vec::new();

        while let Some(mut failed_event) = retry_queue.pop_front() {
            let now = Utc::now();

            // Exponential backoff gate: skip events whose delay hasn't elapsed.
            let delay = self.calculate_retry_delay(failed_event.retry_count);
            let time_since_last_attempt = now - failed_event.last_attempt;

            if time_since_last_attempt < delay {
                still_failing.push(failed_event);
                continue;
            }

            let result = retry_fn(failed_event.event.clone()).await;

            match result {
                Ok(_) => {
                    successfully_retried.push(failed_event.event.clone());

                    let mut stats = self.stats.write().await;
                    stats.events_retried += 1;

                    info!(
                        "Event successfully retried after {} attempts",
                        failed_event.retry_count + 1
                    );
                }
                Err(e) => {
                    failed_event.retry_count += 1;
                    failed_event.last_attempt = now;
                    failed_event.error_message = e.to_string();

                    if failed_event.retry_count >= self.config.max_retries {
                        warn!(
                            "Event failed after {} retries, moving to DLQ: {}",
                            failed_event.retry_count, e
                        );

                        self.move_to_dlq(failed_event).await?;
                    } else {
                        still_failing.push(failed_event);
                    }
                }
            }
        }

        // Reinstall the not-yet-due / still-retryable events as the new queue.
        *retry_queue = still_failing.into();

        Ok(successfully_retried)
    }

    /// Appends an event to the DLQ, evicting the oldest entry when the
    /// configured size cap is reached (drop-oldest policy).
    async fn move_to_dlq(&self, failed_event: FailedEvent) -> Result<()> {
        let mut dlq = self.dlq.write().await;

        if dlq.len() >= self.config.max_dlq_size {
            warn!("DLQ size limit reached, dropping oldest event");
            dlq.pop_front();
        }

        dlq.push_back(failed_event);

        let mut stats = self.stats.write().await;
        stats.events_moved_to_dlq += 1;
        stats.current_dlq_size = dlq.len();

        Ok(())
    }

    /// Exponential backoff: `initial_retry_delay * backoff_multiplier^retry_count`,
    /// clamped to `max_retry_delay`. Computed in f64 milliseconds, then
    /// truncated back to a whole-millisecond `ChronoDuration`.
    fn calculate_retry_delay(&self, retry_count: u32) -> ChronoDuration {
        let delay_ms = self.config.initial_retry_delay.num_milliseconds() as f64
            * self.config.backoff_multiplier.powi(retry_count as i32);

        let delay_ms = delay_ms.min(self.config.max_retry_delay.num_milliseconds() as f64);

        ChronoDuration::milliseconds(delay_ms as i64)
    }

    /// Replays up to `max_events` (default: all) events from the front of the
    /// DLQ through `replay_fn`. Successes are removed and returned; failures
    /// are re-appended at the BACK of the DLQ — note this reorders them
    /// behind any events that were not attempted this pass.
    pub async fn replay_dlq<F, Fut>(
        &self,
        replay_fn: F,
        max_events: Option<usize>,
    ) -> Result<Vec<StreamEvent>>
    where
        F: Fn(StreamEvent) -> Fut + Send + Sync,
        Fut: std::future::Future<Output = Result<()>> + Send,
    {
        let mut dlq = self.dlq.write().await;
        let mut successfully_replayed = Vec::new();
        let mut still_failing = Vec::new();

        // Clamp the requested count to what is actually queued.
        let replay_count = max_events.unwrap_or(dlq.len()).min(dlq.len());

        for _ in 0..replay_count {
            if let Some(failed_event) = dlq.pop_front() {
                let result = replay_fn(failed_event.event.clone()).await;

                match result {
                    Ok(_) => {
                        successfully_replayed.push(failed_event.event.clone());

                        let mut stats = self.stats.write().await;
                        stats.events_replayed += 1;

                        info!("Event successfully replayed from DLQ");
                    }
                    Err(e) => {
                        error!("Event replay failed: {}", e);
                        still_failing.push(failed_event);
                    }
                }
            }
        }

        // Requeue the events that failed replay.
        for failed_event in still_failing {
            dlq.push_back(failed_event);
        }

        let mut stats = self.stats.write().await;
        stats.current_dlq_size = dlq.len();
        stats.last_replay = Some(Utc::now());

        info!("Replayed {} events from DLQ", successfully_replayed.len());

        Ok(successfully_replayed)
    }

    /// Returns clones of all DLQ entries whose reason equals `reason`.
    pub async fn get_by_reason(&self, reason: &FailureReason) -> Vec<FailedEvent> {
        let dlq = self.dlq.read().await;

        dlq.iter()
            .filter(|evt| &evt.failure_reason == reason)
            .cloned()
            .collect()
    }

    /// Removes every DLQ entry matching `predicate` and returns how many
    /// were removed. Updates `current_dlq_size` in the stats.
    pub async fn remove_from_dlq(&self, predicate: impl Fn(&FailedEvent) -> bool) -> usize {
        let mut dlq = self.dlq.write().await;
        let initial_size = dlq.len();

        // `retain` keeps entries the predicate does NOT match.
        dlq.retain(|evt| !predicate(evt));

        let removed = initial_size - dlq.len();

        let mut stats = self.stats.write().await;
        stats.current_dlq_size = dlq.len();

        removed
    }

    /// Drops every event currently in the DLQ and logs the count.
    pub async fn clear_dlq(&self) {
        let mut dlq = self.dlq.write().await;
        let cleared = dlq.len();
        dlq.clear();

        let mut stats = self.stats.write().await;
        stats.current_dlq_size = 0;

        info!("Cleared {} events from DLQ", cleared);
    }

    /// Snapshot of the counters with `failure_rate` freshly recomputed.
    pub async fn stats(&self) -> DlqStats {
        let mut stats = self.stats.read().await.clone();

        stats.failure_rate = self.calculate_failure_rate().await;

        stats
    }

    /// Estimates the failure rate over the last minute of history.
    ///
    /// NOTE(review): `estimated_total` is `recent_failures / 0.01`, so for
    /// any `recent_failures > 0` the returned rate is exactly 0.01 (and 0.0
    /// otherwise). With the default `alert_threshold` of 0.05 the alert in
    /// `check_failure_rate` can therefore never fire. This module never
    /// observes successful events, so a true rate cannot be derived here —
    /// confirm whether a total-throughput counter should be fed in.
    async fn calculate_failure_rate(&self) -> f64 {
        let history = self.failure_history.read().await;

        if history.is_empty() {
            return 0.0;
        }

        let now = Utc::now();
        let one_minute_ago = now - ChronoDuration::minutes(1);

        let recent_failures = history
            .iter()
            .filter(|(timestamp, _)| *timestamp >= one_minute_ago)
            .count();

        // Assumes failures are ~1% of traffic to synthesize a denominator.
        let estimated_total = (recent_failures as f64 / 0.01).max(recent_failures as f64);

        recent_failures as f64 / estimated_total
    }

    /// Logs an alert when the estimated failure rate crosses the configured
    /// threshold (see the NOTE on `calculate_failure_rate` — with defaults
    /// this branch is currently unreachable).
    async fn check_failure_rate(&self) {
        let failure_rate = self.calculate_failure_rate().await;

        if failure_rate >= self.config.alert_threshold {
            error!(
                "ALERT: Failure rate ({:.2}%) exceeds threshold ({:.2}%)",
                failure_rate * 100.0,
                self.config.alert_threshold * 100.0
            );

        }
    }

    /// Number of events currently waiting for a retry attempt.
    pub async fn retry_queue_size(&self) -> usize {
        self.retry_queue.read().await.len()
    }

    /// Number of events currently parked in the DLQ.
    pub async fn dlq_size(&self) -> usize {
        self.dlq.read().await.len()
    }

    /// Clones and returns every event currently in the DLQ.
    pub async fn get_all_dlq_events(&self) -> Vec<FailedEvent> {
        self.dlq.read().await.iter().cloned().collect()
    }
}
424
/// Wraps a synchronous event processor so that any failure is automatically
/// reported to the shared [`DeadLetterQueue`] before the error propagates.
pub struct DlqEventProcessor<T> {
    /// Queue that receives failed events.
    dlq: Arc<DeadLetterQueue>,
    /// The wrapped processing callback; shared so the wrapper is cloneable-ish.
    processor: Arc<dyn Fn(T) -> Result<()> + Send + Sync>,
}
430
431impl<T: Clone + Into<StreamEvent>> DlqEventProcessor<T> {
432 pub fn new<F>(dlq: Arc<DeadLetterQueue>, processor: F) -> Self
433 where
434 F: Fn(T) -> Result<()> + Send + Sync + 'static,
435 {
436 Self {
437 dlq,
438 processor: Arc::new(processor),
439 }
440 }
441
442 pub async fn process(&self, event: T) -> Result<()> {
444 let stream_event = event.clone().into();
445
446 match (self.processor)(event) {
447 Ok(_) => Ok(()),
448 Err(e) => {
449 let failure_reason = self.categorize_error(&e);
451
452 self.dlq
454 .handle_failed_event(stream_event, failure_reason, e.to_string())
455 .await?;
456
457 Err(e)
458 }
459 }
460 }
461
462 fn categorize_error(&self, error: &anyhow::Error) -> FailureReason {
464 let error_str = error.to_string().to_lowercase();
465
466 if error_str.contains("network") || error_str.contains("connection") {
467 FailureReason::NetworkError
468 } else if error_str.contains("serializ") || error_str.contains("deserializ") {
469 FailureReason::SerializationError
470 } else if error_str.contains("validation") || error_str.contains("invalid") {
471 FailureReason::ValidationError
472 } else if error_str.contains("timeout") {
473 FailureReason::TimeoutError
474 } else {
475 FailureReason::Unknown(error.to_string())
476 }
477 }
478}
479
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::EventMetadata;
    use anyhow::anyhow;

    /// Builds a minimal `TripleAdded` event to use as a DLQ payload.
    fn create_test_event() -> StreamEvent {
        StreamEvent::TripleAdded {
            subject: "test".to_string(),
            predicate: "test".to_string(),
            object: "test".to_string(),
            graph: None,
            metadata: EventMetadata::default(),
        }
    }

    /// A freshly failed event lands in the retry queue (not the DLQ)
    /// and increments the failure counter.
    #[tokio::test]
    async fn test_dlq_basic() {
        let config = DlqConfig::default();
        let dlq = DeadLetterQueue::new(config);

        let event = create_test_event();

        dlq.handle_failed_event(
            event,
            FailureReason::NetworkError,
            "Connection failed".to_string(),
        )
        .await
        .unwrap();

        assert_eq!(dlq.retry_queue_size().await, 1);
        assert_eq!(dlq.dlq_size().await, 0);

        let stats = dlq.stats().await;
        assert_eq!(stats.events_failed, 1);
    }

    /// An event that keeps failing is moved to the DLQ once `max_retries`
    /// attempts have been exhausted.
    #[tokio::test]
    async fn test_retry_exhaustion() {
        let config = DlqConfig {
            max_retries: 2,
            // Tiny delay so the backoff gate passes after a 2 ms sleep.
            initial_retry_delay: ChronoDuration::milliseconds(1),
            ..Default::default()
        };

        let dlq = DeadLetterQueue::new(config);

        let event = create_test_event();

        dlq.handle_failed_event(
            event.clone(),
            FailureReason::NetworkError,
            "Connection failed".to_string(),
        )
        .await
        .unwrap();

        // Retry callback that always fails.
        let retry_fn = |_: StreamEvent| async { Err(anyhow!("Still failing")) };

        // Three passes: enough to burn through max_retries = 2.
        for _ in 0..3 {
            tokio::time::sleep(tokio::time::Duration::from_millis(2)).await;
            dlq.process_retries(retry_fn).await.unwrap();
        }

        assert_eq!(dlq.dlq_size().await, 1);
        assert_eq!(dlq.retry_queue_size().await, 0);
    }

    /// A retry that succeeds removes the event from the retry queue and
    /// never touches the DLQ.
    #[tokio::test]
    async fn test_successful_retry() {
        let config = DlqConfig {
            max_retries: 3,
            initial_retry_delay: ChronoDuration::milliseconds(1),
            ..Default::default()
        };

        let dlq = DeadLetterQueue::new(config);

        let event = create_test_event();

        dlq.handle_failed_event(
            event.clone(),
            FailureReason::NetworkError,
            "Connection failed".to_string(),
        )
        .await
        .unwrap();

        // Wait out the 1 ms backoff so the retry is attempted.
        tokio::time::sleep(tokio::time::Duration::from_millis(2)).await;

        let retry_fn = |_: StreamEvent| async { Ok(()) };

        let retried = dlq.process_retries(retry_fn).await.unwrap();

        assert_eq!(retried.len(), 1);
        assert_eq!(dlq.retry_queue_size().await, 0);
        assert_eq!(dlq.dlq_size().await, 0);
    }

    /// `replay_dlq` with a max_events cap replays exactly that many events
    /// and leaves the remainder in the DLQ.
    #[tokio::test]
    async fn test_dlq_replay() {
        let config = DlqConfig::default();
        let dlq = DeadLetterQueue::new(config);

        // Seed the DLQ directly (bypassing the retry pipeline) with five
        // distinct events.
        for i in 0..5 {
            let mut event = create_test_event();
            if let StreamEvent::TripleAdded {
                ref mut subject, ..
            } = event
            {
                *subject = format!("test_{}", i);
            }

            let failed_event = FailedEvent {
                event,
                failure_reason: FailureReason::NetworkError,
                error_message: "Connection failed".to_string(),
                first_attempt: Utc::now(),
                last_attempt: Utc::now(),
                retry_count: 5,
                stack_trace: None,
            };

            let mut dlq_queue = dlq.dlq.write().await;
            dlq_queue.push_back(failed_event);
        }

        assert_eq!(dlq.dlq_size().await, 5);

        let replay_fn = |_: StreamEvent| async { Ok(()) };

        let replayed = dlq.replay_dlq(replay_fn, Some(3)).await.unwrap();

        assert_eq!(replayed.len(), 3);
        assert_eq!(dlq.dlq_size().await, 2);
    }
}