Skip to main content

chainindex_core/
dlq.rs

1//! Dead letter queue — captures and retries failed handler events.
2//!
3//! When an event handler fails (returns error), the event is stored in the DLQ
4//! rather than blocking the indexer. Events are retried with exponential backoff.
5//!
6//! # Architecture
7//!
8//! ```text
9//! Handler fails → DlqEntry created → pushed to DeadLetterQueue
10//!                                         ↓
11//!                                    pop_ready() returns entries due for retry
12//!                                         ↓
13//!                              retry handler → success: remove
14//!                                            → failure: bump attempt, reschedule
15//!                                            → max retries: mark Failed
16//! ```
17
18use std::collections::VecDeque;
19use std::time::Duration;
20
21use serde::{Deserialize, Serialize};
22
23use crate::handler::DecodedEvent;
24
25// ─── DlqStatus ──────────────────────────────────────────────────────────────
26
/// Status of a DLQ entry.
///
/// Lifecycle: `push` creates entries as `Pending`; `pop_ready` moves due entries to
/// `Retrying`; `mark_failed` moves them back to `Pending` or on to `Failed`;
/// `retry_all_failed` moves `Failed` entries back to `Pending`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum DlqStatus {
    /// Waiting for the next retry attempt; `pop_ready(now)` returns it once
    /// `next_retry_at <= now`.
    Pending,
    /// Handed out by `pop_ready` and awaiting `mark_success` or `mark_failed`.
    /// `pop_ready` does not return entries in this status again.
    Retrying,
    /// Permanently failed — `attempt_count` reached `max_attempts`. The entry stays in the
    /// queue (not returned by `pop_ready`) until `retry_all_failed` resets it or it is removed.
    Failed,
}
37
38// ─── DlqEntry ───────────────────────────────────────────────────────────────
39
/// A single entry in the dead letter queue.
///
/// Entries are created by `DeadLetterQueue::push`. All timestamps are Unix seconds, as
/// produced by `chrono::Utc::now().timestamp()`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DlqEntry {
    /// Identifier used to address this entry in `get`, `mark_success` and `mark_failed`.
    pub id: String,
    /// The event that failed processing.
    pub event: DecodedEvent,
    /// Name of the handler that failed.
    pub handler_name: String,
    /// Error message from the most recent failure (overwritten on every failed retry).
    pub error_message: String,
    /// Number of times this event has been attempted. Starts at 1 for the original failure,
    /// is incremented by each `mark_failed`, and is reset to 0 by `retry_all_failed`.
    pub attempt_count: u32,
    /// The entry is marked `Failed` once `attempt_count` reaches this value.
    /// Copied from `DlqConfig::max_retries` when the entry is created.
    pub max_attempts: u32,
    /// Unix timestamp (seconds) when this event first failed.
    pub first_failed_at: i64,
    /// Unix timestamp (seconds) of the most recent failure.
    pub last_failed_at: i64,
    /// Unix timestamp (seconds) of the next retry. While the status is `Pending`,
    /// `pop_ready(now)` returns this entry once `now >= next_retry_at`.
    pub next_retry_at: i64,
    /// Current status.
    pub status: DlqStatus,
}
64
65// ─── DlqConfig ──────────────────────────────────────────────────────────────
66
/// Configuration for the dead letter queue.
#[derive(Debug, Clone)]
pub struct DlqConfig {
    /// Maximum number of handler attempts per event, *counting the original failure*.
    /// Default: 5, which means up to 4 retries.
    ///
    /// An entry is marked `Failed` once its attempt count reaches this value. The first
    /// retry is always scheduled, so values 0, 1 and 2 all allow exactly one retry.
    pub max_retries: u32,
    /// Delay before the first retry; later delays grow from it (default: 1 second).
    pub initial_backoff: Duration,
    /// Upper bound applied to the exponential backoff (default: 5 minutes).
    pub max_backoff: Duration,
    /// Per-attempt growth factor (default: 2.0 — exponential).
    ///
    /// The delay after attempt `n` is `initial_backoff * backoff_multiplier^(n - 1)`,
    /// capped at `max_backoff`.
    pub backoff_multiplier: f64,
}
79
80impl Default for DlqConfig {
81    fn default() -> Self {
82        Self {
83            max_retries: 5,
84            initial_backoff: Duration::from_secs(1),
85            max_backoff: Duration::from_secs(300),
86            backoff_multiplier: 2.0,
87        }
88    }
89}
90
91// ─── DlqStats ───────────────────────────────────────────────────────────────
92
/// Statistics about the dead letter queue.
///
/// `total_added` and `retried_success` are cumulative counters that only grow.
/// `pending` and `failed` are gauges that track entries currently held in the queue.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct DlqStats {
    /// Total entries ever added to the DLQ.
    pub total_added: u64,
    /// Entries not yet permanently failed: status `Pending` *or* `Retrying`
    /// (`pop_ready` does not decrement this gauge).
    pub pending: u64,
    /// Entries currently in `Failed` status. Decreases when `retry_all_failed` resets them.
    pub failed: u64,
    /// Entries successfully retried and removed.
    pub retried_success: u64,
}
105
106// ─── DeadLetterQueue ────────────────────────────────────────────────────────
107
/// In-memory dead letter queue for failed handler events.
///
/// Thread-safe — uses internal `Mutex`. For production, back with a database table.
///
/// Entries are kept in insertion order. Only `mark_success` and `purge_before` remove them;
/// `pop_ready` leaves them in place and just marks them `Retrying`.
pub struct DeadLetterQueue {
    /// Retry and backoff settings applied to every entry.
    config: DlqConfig,
    /// All live entries, in insertion order, regardless of status.
    entries: std::sync::Mutex<VecDeque<DlqEntry>>,
    /// Running counters. Methods that need both locks take `entries` first, then `stats`.
    stats: std::sync::Mutex<DlqStats>,
}
116
117impl DeadLetterQueue {
118    /// Create a new DLQ with the given configuration.
119    pub fn new(config: DlqConfig) -> Self {
120        Self {
121            config,
122            entries: std::sync::Mutex::new(VecDeque::new()),
123            stats: std::sync::Mutex::new(DlqStats::default()),
124        }
125    }
126
127    /// Push a failed event into the DLQ.
128    ///
129    /// The entry is scheduled for retry based on the configured backoff.
130    pub fn push(
131        &self,
132        event: DecodedEvent,
133        handler_name: impl Into<String>,
134        error_message: impl Into<String>,
135    ) {
136        let now = chrono::Utc::now().timestamp();
137        let next_retry = now + self.config.initial_backoff.as_secs() as i64;
138
139        let entry = DlqEntry {
140            id: format!("dlq-{}-{}-{}", event.tx_hash, event.log_index, now),
141            event,
142            handler_name: handler_name.into(),
143            error_message: error_message.into(),
144            attempt_count: 1,
145            max_attempts: self.config.max_retries,
146            first_failed_at: now,
147            last_failed_at: now,
148            next_retry_at: next_retry,
149            status: DlqStatus::Pending,
150        };
151
152        let mut entries = self.entries.lock().unwrap();
153        entries.push_back(entry);
154
155        let mut stats = self.stats.lock().unwrap();
156        stats.total_added += 1;
157        stats.pending += 1;
158    }
159
160    /// Pop all entries that are due for retry (next_retry_at <= now).
161    ///
162    /// Returns entries with status set to `Retrying`.
163    pub fn pop_ready(&self, now: i64) -> Vec<DlqEntry> {
164        let mut entries = self.entries.lock().unwrap();
165        let mut ready = Vec::new();
166
167        for entry in entries.iter_mut() {
168            if entry.status == DlqStatus::Pending && entry.next_retry_at <= now {
169                entry.status = DlqStatus::Retrying;
170                ready.push(entry.clone());
171            }
172        }
173
174        ready
175    }
176
177    /// Mark an entry as successfully retried (removes it from the DLQ).
178    pub fn mark_success(&self, id: &str) {
179        let mut entries = self.entries.lock().unwrap();
180        let before = entries.len();
181        entries.retain(|e| e.id != id);
182        if entries.len() < before {
183            let mut stats = self.stats.lock().unwrap();
184            stats.pending = stats.pending.saturating_sub(1);
185            stats.retried_success += 1;
186        }
187    }
188
189    /// Mark an entry as failed again (reschedule or permanently fail).
190    pub fn mark_failed(&self, id: &str, error_message: impl Into<String>) {
191        let mut entries = self.entries.lock().unwrap();
192        let now = chrono::Utc::now().timestamp();
193        let error_msg = error_message.into();
194
195        if let Some(entry) = entries.iter_mut().find(|e| e.id == id) {
196            entry.attempt_count += 1;
197            entry.last_failed_at = now;
198            entry.error_message = error_msg;
199
200            if entry.attempt_count >= entry.max_attempts {
201                entry.status = DlqStatus::Failed;
202                let mut stats = self.stats.lock().unwrap();
203                stats.pending = stats.pending.saturating_sub(1);
204                stats.failed += 1;
205            } else {
206                // Exponential backoff
207                let backoff = self.compute_backoff(entry.attempt_count);
208                entry.next_retry_at = now + backoff.as_secs() as i64;
209                entry.status = DlqStatus::Pending;
210            }
211        }
212    }
213
214    /// Get all entries with a specific status.
215    pub fn get_by_status(&self, status: DlqStatus) -> Vec<DlqEntry> {
216        let entries = self.entries.lock().unwrap();
217        entries
218            .iter()
219            .filter(|e| e.status == status)
220            .cloned()
221            .collect()
222    }
223
224    /// Get a single entry by ID.
225    pub fn get(&self, id: &str) -> Option<DlqEntry> {
226        let entries = self.entries.lock().unwrap();
227        entries.iter().find(|e| e.id == id).cloned()
228    }
229
230    /// Total number of entries in the DLQ.
231    pub fn len(&self) -> usize {
232        let entries = self.entries.lock().unwrap();
233        entries.len()
234    }
235
236    /// Returns true if the DLQ is empty.
237    pub fn is_empty(&self) -> bool {
238        self.len() == 0
239    }
240
241    /// Purge (remove) all entries with `last_failed_at` before the given timestamp.
242    pub fn purge_before(&self, timestamp: i64) -> usize {
243        let mut entries = self.entries.lock().unwrap();
244        let before = entries.len();
245        entries.retain(|e| e.last_failed_at >= timestamp);
246        let removed = before - entries.len();
247
248        if removed > 0 {
249            let mut stats = self.stats.lock().unwrap();
250            // Approximate — some may have been pending, some failed
251            stats.pending = stats.pending.saturating_sub(removed as u64);
252        }
253
254        removed
255    }
256
257    /// Reset all `Failed` entries to `Pending` for another round of retries.
258    pub fn retry_all_failed(&self) -> usize {
259        let mut entries = self.entries.lock().unwrap();
260        let now = chrono::Utc::now().timestamp();
261        let mut count = 0;
262
263        for entry in entries.iter_mut() {
264            if entry.status == DlqStatus::Failed {
265                entry.status = DlqStatus::Pending;
266                entry.attempt_count = 0;
267                entry.next_retry_at = now;
268                count += 1;
269            }
270        }
271
272        if count > 0 {
273            let mut stats = self.stats.lock().unwrap();
274            stats.failed = stats.failed.saturating_sub(count as u64);
275            stats.pending += count as u64;
276        }
277
278        count
279    }
280
281    /// Get current DLQ statistics.
282    pub fn stats(&self) -> DlqStats {
283        let stats = self.stats.lock().unwrap();
284        stats.clone()
285    }
286
287    /// Compute the backoff duration for a given attempt number.
288    fn compute_backoff(&self, attempt: u32) -> Duration {
289        let base = self.config.initial_backoff.as_secs_f64();
290        let multiplier = self
291            .config
292            .backoff_multiplier
293            .powi(attempt.saturating_sub(1) as i32);
294        let backoff_secs = base * multiplier;
295        let max_secs = self.config.max_backoff.as_secs_f64();
296        Duration::from_secs_f64(backoff_secs.min(max_secs))
297    }
298}
299
300impl Default for DeadLetterQueue {
301    fn default() -> Self {
302        Self::new(DlqConfig::default())
303    }
304}
305
#[cfg(test)]
mod tests {
    // Unit tests for the in-memory dead letter queue: scheduling, status transitions,
    // backoff computation and stats bookkeeping.
    use super::*;

    fn make_event(block: u64) -> DecodedEvent {
        DecodedEvent {
            chain: "ethereum".into(),
            schema: "Transfer".into(),
            address: "0xtoken".into(),
            tx_hash: format!("0xtx_{block}"),
            block_number: block,
            log_index: 0,
            fields_json: serde_json::json!({"from": "0xA", "to": "0xB"}),
        }
    }

    fn test_config() -> DlqConfig {
        DlqConfig {
            max_retries: 3,
            initial_backoff: Duration::from_secs(1),
            max_backoff: Duration::from_secs(60),
            backoff_multiplier: 2.0,
        }
    }

    #[test]
    fn push_entry() {
        let dlq = DeadLetterQueue::new(test_config());
        dlq.push(make_event(100), "handler1", "connection timeout");

        assert_eq!(dlq.len(), 1);
        let stats = dlq.stats();
        assert_eq!(stats.total_added, 1);
        assert_eq!(stats.pending, 1);

        // A fresh entry records the original failure as attempt 1 and waits for a retry.
        let pending = dlq.get_by_status(DlqStatus::Pending);
        assert_eq!(pending.len(), 1);
        assert_eq!(pending[0].attempt_count, 1);
        assert_eq!(pending[0].max_attempts, 3);
        assert_eq!(pending[0].handler_name, "handler1");
        assert_eq!(pending[0].error_message, "connection timeout");
        assert!(pending[0].next_retry_at > pending[0].first_failed_at);
    }

    #[test]
    fn pop_ready_returns_due_entries() {
        let dlq = DeadLetterQueue::new(test_config());
        dlq.push(make_event(100), "handler1", "error");

        // Not ready yet (now < next_retry_at)
        let now = chrono::Utc::now().timestamp();
        let ready = dlq.pop_ready(now - 10);
        assert!(ready.is_empty());

        // Ready (now >= next_retry_at)
        let ready = dlq.pop_ready(now + 10);
        assert_eq!(ready.len(), 1);
        assert_eq!(ready[0].status, DlqStatus::Retrying);

        // Handed-out entries are `Retrying`, so a second poll does not return them again,
        // but they are still held by the queue.
        assert!(dlq.pop_ready(now + 10).is_empty());
        assert_eq!(dlq.len(), 1);
    }

    #[test]
    fn mark_success_removes_entry() {
        let dlq = DeadLetterQueue::new(test_config());
        dlq.push(make_event(100), "handler1", "error");

        let now = chrono::Utc::now().timestamp() + 10;
        let ready = dlq.pop_ready(now);
        let id = ready[0].id.clone();

        dlq.mark_success(&id);
        assert_eq!(dlq.len(), 0);
        assert!(dlq.get(&id).is_none());

        let stats = dlq.stats();
        assert_eq!(stats.retried_success, 1);
        assert_eq!(stats.pending, 0);
    }

    #[test]
    fn mark_failed_reschedules() {
        let dlq = DeadLetterQueue::new(test_config());
        dlq.push(make_event(100), "handler1", "error");

        let now = chrono::Utc::now().timestamp() + 10;
        let ready = dlq.pop_ready(now);
        let id = ready[0].id.clone();

        dlq.mark_failed(&id, "still broken");

        let entry = dlq.get(&id).unwrap();
        assert_eq!(entry.status, DlqStatus::Pending);
        assert_eq!(entry.attempt_count, 2);
        assert_eq!(entry.error_message, "still broken");
        // Attempt 2 backs off 1s * 2^1 = 2s from the moment of this failure.
        assert_eq!(entry.next_retry_at, entry.last_failed_at + 2);
    }

    #[test]
    fn max_retries_marks_failed() {
        let dlq = DeadLetterQueue::new(DlqConfig {
            max_retries: 2,
            ..test_config()
        });
        dlq.push(make_event(100), "handler1", "error");

        let now = chrono::Utc::now().timestamp() + 100;
        let ready = dlq.pop_ready(now);
        let id = ready[0].id.clone();

        // Attempt 2 → reaches max_retries (2)
        dlq.mark_failed(&id, "still broken");

        let entry = dlq.get(&id).unwrap();
        assert_eq!(entry.status, DlqStatus::Failed);
        assert_eq!(entry.attempt_count, 2);

        let stats = dlq.stats();
        assert_eq!(stats.failed, 1);
        assert_eq!(stats.pending, 0);
    }

    #[test]
    fn get_by_status() {
        let dlq = DeadLetterQueue::new(DlqConfig {
            max_retries: 2,
            ..test_config()
        });

        dlq.push(make_event(100), "h1", "error1");
        dlq.push(make_event(101), "h2", "error2");

        // Pop both entries, then resolve only the first one.
        let now = chrono::Utc::now().timestamp() + 100;
        let ready = dlq.pop_ready(now);
        assert_eq!(ready.len(), 2);

        // Mark the first as permanently failed
        dlq.mark_failed(&ready[0].id, "still broken");

        let failed = dlq.get_by_status(DlqStatus::Failed);
        assert_eq!(failed.len(), 1);

        // The second entry is still in Retrying status (was popped but not yet resolved)
        let retrying = dlq.get_by_status(DlqStatus::Retrying);
        assert_eq!(retrying.len(), 1);
    }

    #[test]
    fn exponential_backoff() {
        let dlq = DeadLetterQueue::new(test_config());

        // attempt 0 is treated like attempt 1 (the exponent saturates at 0)
        assert_eq!(dlq.compute_backoff(0), Duration::from_secs(1));

        // attempt 1: 1s * 2^0 = 1s
        let b1 = dlq.compute_backoff(1);
        assert_eq!(b1, Duration::from_secs(1));

        // attempt 2: 1s * 2^1 = 2s
        let b2 = dlq.compute_backoff(2);
        assert_eq!(b2, Duration::from_secs(2));

        // attempt 3: 1s * 2^2 = 4s
        let b3 = dlq.compute_backoff(3);
        assert_eq!(b3, Duration::from_secs(4));
    }

    #[test]
    fn backoff_capped_at_max() {
        let dlq = DeadLetterQueue::new(DlqConfig {
            max_backoff: Duration::from_secs(10),
            ..test_config()
        });

        // attempt 10: would be 1s * 2^9 = 512s, but capped at 10s
        let b = dlq.compute_backoff(10);
        assert_eq!(b, Duration::from_secs(10));
    }

    #[test]
    fn purge_before_removes_old_entries() {
        let dlq = DeadLetterQueue::new(test_config());
        dlq.push(make_event(100), "h1", "error1");
        dlq.push(make_event(101), "h2", "error2");

        // A cutoff in the distant past keeps everything.
        let long_ago = chrono::Utc::now().timestamp() - 10_000;
        assert_eq!(dlq.purge_before(long_ago), 0);
        assert_eq!(dlq.len(), 2);

        // Purge entries last failed before far future → removes all
        let far_future = chrono::Utc::now().timestamp() + 10000;
        let removed = dlq.purge_before(far_future);
        assert_eq!(removed, 2);
        assert!(dlq.is_empty());
        assert_eq!(dlq.stats().pending, 0);
    }

    #[test]
    fn retry_all_failed() {
        let dlq = DeadLetterQueue::new(DlqConfig {
            max_retries: 1,
            ..test_config()
        });
        dlq.push(make_event(100), "h1", "error");

        // Exhaust retries
        let now = chrono::Utc::now().timestamp() + 100;
        let ready = dlq.pop_ready(now);
        dlq.mark_failed(&ready[0].id, "still broken");

        assert_eq!(dlq.get_by_status(DlqStatus::Failed).len(), 1);

        // Retry all failed
        let count = dlq.retry_all_failed();
        assert_eq!(count, 1);
        let pending = dlq.get_by_status(DlqStatus::Pending);
        assert_eq!(pending.len(), 1);
        assert_eq!(pending[0].attempt_count, 0);
        assert_eq!(dlq.get_by_status(DlqStatus::Failed).len(), 0);

        // The reset entry moves from the `failed` gauge back to `pending`.
        let stats = dlq.stats();
        assert_eq!(stats.failed, 0);
        assert_eq!(stats.pending, 1);
    }

    #[test]
    fn stats_tracking() {
        let dlq = DeadLetterQueue::new(test_config());

        dlq.push(make_event(100), "h1", "error1");
        dlq.push(make_event(101), "h2", "error2");

        let stats = dlq.stats();
        assert_eq!(stats.total_added, 2);
        assert_eq!(stats.pending, 2);
        assert_eq!(stats.failed, 0);
        assert_eq!(stats.retried_success, 0);
    }
}