1use std::collections::VecDeque;
19use std::time::Duration;
20
21use serde::{Deserialize, Serialize};
22
23use crate::handler::DecodedEvent;
24
/// Lifecycle state of a single dead-letter entry.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum DlqStatus {
    /// Waiting for its next retry time; `DeadLetterQueue::pop_ready` picks it
    /// up once `next_retry_at` is due.
    Pending,
    /// Handed out by `DeadLetterQueue::pop_ready` and not yet resolved through
    /// `mark_success` or `mark_failed`.
    Retrying,
    /// The attempt count reached the entry's `max_attempts`; the entry stays in
    /// the queue until it is purged or re-queued by `retry_all_failed`.
    Failed,
}
37
/// A failed event held for retry, together with its retry bookkeeping.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DlqEntry {
    /// Identifier assigned when the entry is pushed; used to address the entry
    /// in `DeadLetterQueue::get`, `mark_success` and `mark_failed`.
    pub id: String,
    /// The event whose handling failed.
    pub event: DecodedEvent,
    /// Name of the handler that failed, as supplied to `DeadLetterQueue::push`.
    pub handler_name: String,
    /// Message of the most recent failure (overwritten by `mark_failed`).
    pub error_message: String,
    /// Number of failed attempts so far: 1 when pushed, incremented by
    /// `mark_failed`, reset to 0 by `retry_all_failed`.
    pub attempt_count: u32,
    /// Attempt budget, copied from `DlqConfig::max_retries` when pushed.
    pub max_attempts: u32,
    /// Timestamp (`chrono::Utc::now().timestamp()`) of the push.
    pub first_failed_at: i64,
    /// Timestamp of the most recent failure; compared by `purge_before`.
    pub last_failed_at: i64,
    /// Earliest timestamp at which `pop_ready` will hand the entry out.
    pub next_retry_at: i64,
    /// Current lifecycle state.
    pub status: DlqStatus,
}
64
/// Retry policy applied by `DeadLetterQueue`.
#[derive(Debug, Clone)]
pub struct DlqConfig {
    /// Copied into each entry's `max_attempts`; an entry becomes `Failed` once
    /// its `attempt_count` reaches this value.
    pub max_retries: u32,
    /// Delay before the first retry and the base of the exponential backoff.
    /// Only whole seconds are used when computing `next_retry_at`.
    pub initial_backoff: Duration,
    /// Upper bound for any computed backoff delay.
    pub max_backoff: Duration,
    /// Growth factor: the delay for attempt `n` is
    /// `initial_backoff * backoff_multiplier^(n - 1)`, capped at `max_backoff`.
    pub backoff_multiplier: f64,
}
79
80impl Default for DlqConfig {
81 fn default() -> Self {
82 Self {
83 max_retries: 5,
84 initial_backoff: Duration::from_secs(1),
85 max_backoff: Duration::from_secs(300),
86 backoff_multiplier: 2.0,
87 }
88 }
89}
90
/// Running counters maintained by `DeadLetterQueue`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct DlqStats {
    /// Total number of entries ever pushed.
    pub total_added: u64,
    /// Entries counted as still awaiting resolution. `pop_ready` does not
    /// change this counter, so entries in `Retrying` are included.
    pub pending: u64,
    /// Entries counted as having exhausted their attempts.
    pub failed: u64,
    /// Number of entries removed through `mark_success`.
    pub retried_success: u64,
}
105
/// In-memory dead-letter queue for events whose handler failed.
///
/// Entries and statistics each sit behind their own `std::sync::Mutex`, which
/// is why every method takes `&self`.
pub struct DeadLetterQueue {
    /// Retry policy used for scheduling and attempt limits.
    config: DlqConfig,
    /// Queued entries in insertion order.
    entries: std::sync::Mutex<VecDeque<DlqEntry>>,
    /// Counters returned (as a copy) by `stats()`.
    stats: std::sync::Mutex<DlqStats>,
}
116
117impl DeadLetterQueue {
118 pub fn new(config: DlqConfig) -> Self {
120 Self {
121 config,
122 entries: std::sync::Mutex::new(VecDeque::new()),
123 stats: std::sync::Mutex::new(DlqStats::default()),
124 }
125 }
126
127 pub fn push(
131 &self,
132 event: DecodedEvent,
133 handler_name: impl Into<String>,
134 error_message: impl Into<String>,
135 ) {
136 let now = chrono::Utc::now().timestamp();
137 let next_retry = now + self.config.initial_backoff.as_secs() as i64;
138
139 let entry = DlqEntry {
140 id: format!("dlq-{}-{}-{}", event.tx_hash, event.log_index, now),
141 event,
142 handler_name: handler_name.into(),
143 error_message: error_message.into(),
144 attempt_count: 1,
145 max_attempts: self.config.max_retries,
146 first_failed_at: now,
147 last_failed_at: now,
148 next_retry_at: next_retry,
149 status: DlqStatus::Pending,
150 };
151
152 let mut entries = self.entries.lock().unwrap();
153 entries.push_back(entry);
154
155 let mut stats = self.stats.lock().unwrap();
156 stats.total_added += 1;
157 stats.pending += 1;
158 }
159
160 pub fn pop_ready(&self, now: i64) -> Vec<DlqEntry> {
164 let mut entries = self.entries.lock().unwrap();
165 let mut ready = Vec::new();
166
167 for entry in entries.iter_mut() {
168 if entry.status == DlqStatus::Pending && entry.next_retry_at <= now {
169 entry.status = DlqStatus::Retrying;
170 ready.push(entry.clone());
171 }
172 }
173
174 ready
175 }
176
177 pub fn mark_success(&self, id: &str) {
179 let mut entries = self.entries.lock().unwrap();
180 let before = entries.len();
181 entries.retain(|e| e.id != id);
182 if entries.len() < before {
183 let mut stats = self.stats.lock().unwrap();
184 stats.pending = stats.pending.saturating_sub(1);
185 stats.retried_success += 1;
186 }
187 }
188
189 pub fn mark_failed(&self, id: &str, error_message: impl Into<String>) {
191 let mut entries = self.entries.lock().unwrap();
192 let now = chrono::Utc::now().timestamp();
193 let error_msg = error_message.into();
194
195 if let Some(entry) = entries.iter_mut().find(|e| e.id == id) {
196 entry.attempt_count += 1;
197 entry.last_failed_at = now;
198 entry.error_message = error_msg;
199
200 if entry.attempt_count >= entry.max_attempts {
201 entry.status = DlqStatus::Failed;
202 let mut stats = self.stats.lock().unwrap();
203 stats.pending = stats.pending.saturating_sub(1);
204 stats.failed += 1;
205 } else {
206 let backoff = self.compute_backoff(entry.attempt_count);
208 entry.next_retry_at = now + backoff.as_secs() as i64;
209 entry.status = DlqStatus::Pending;
210 }
211 }
212 }
213
214 pub fn get_by_status(&self, status: DlqStatus) -> Vec<DlqEntry> {
216 let entries = self.entries.lock().unwrap();
217 entries
218 .iter()
219 .filter(|e| e.status == status)
220 .cloned()
221 .collect()
222 }
223
224 pub fn get(&self, id: &str) -> Option<DlqEntry> {
226 let entries = self.entries.lock().unwrap();
227 entries.iter().find(|e| e.id == id).cloned()
228 }
229
230 pub fn len(&self) -> usize {
232 let entries = self.entries.lock().unwrap();
233 entries.len()
234 }
235
236 pub fn is_empty(&self) -> bool {
238 self.len() == 0
239 }
240
241 pub fn purge_before(&self, timestamp: i64) -> usize {
243 let mut entries = self.entries.lock().unwrap();
244 let before = entries.len();
245 entries.retain(|e| e.last_failed_at >= timestamp);
246 let removed = before - entries.len();
247
248 if removed > 0 {
249 let mut stats = self.stats.lock().unwrap();
250 stats.pending = stats.pending.saturating_sub(removed as u64);
252 }
253
254 removed
255 }
256
257 pub fn retry_all_failed(&self) -> usize {
259 let mut entries = self.entries.lock().unwrap();
260 let now = chrono::Utc::now().timestamp();
261 let mut count = 0;
262
263 for entry in entries.iter_mut() {
264 if entry.status == DlqStatus::Failed {
265 entry.status = DlqStatus::Pending;
266 entry.attempt_count = 0;
267 entry.next_retry_at = now;
268 count += 1;
269 }
270 }
271
272 if count > 0 {
273 let mut stats = self.stats.lock().unwrap();
274 stats.failed = stats.failed.saturating_sub(count as u64);
275 stats.pending += count as u64;
276 }
277
278 count
279 }
280
281 pub fn stats(&self) -> DlqStats {
283 let stats = self.stats.lock().unwrap();
284 stats.clone()
285 }
286
287 fn compute_backoff(&self, attempt: u32) -> Duration {
289 let base = self.config.initial_backoff.as_secs_f64();
290 let multiplier = self
291 .config
292 .backoff_multiplier
293 .powi(attempt.saturating_sub(1) as i32);
294 let backoff_secs = base * multiplier;
295 let max_secs = self.config.max_backoff.as_secs_f64();
296 Duration::from_secs_f64(backoff_secs.min(max_secs))
297 }
298}
299
300impl Default for DeadLetterQueue {
301 fn default() -> Self {
302 Self::new(DlqConfig::default())
303 }
304}
305
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `Transfer` event whose transaction hash is derived from `block`.
    fn make_event(block: u64) -> DecodedEvent {
        let tx_hash = format!("0xtx_{block}");
        let fields_json = serde_json::json!({"from": "0xA", "to": "0xB"});

        DecodedEvent {
            chain: "ethereum".into(),
            schema: "Transfer".into(),
            address: "0xtoken".into(),
            tx_hash,
            block_number: block,
            log_index: 0,
            fields_json,
        }
    }

    /// Policy used by most tests: 3 attempts, 1s base delay doubling up to 60s.
    fn test_config() -> DlqConfig {
        let initial_backoff = Duration::from_secs(1);
        let max_backoff = Duration::from_secs(60);

        DlqConfig {
            max_retries: 3,
            initial_backoff,
            max_backoff,
            backoff_multiplier: 2.0,
        }
    }

    /// Queue using `test_config()` with a different attempt budget.
    fn queue_with_max_retries(max_retries: u32) -> DeadLetterQueue {
        DeadLetterQueue::new(DlqConfig {
            max_retries,
            ..test_config()
        })
    }

    /// Current timestamp shifted by `offset` seconds.
    fn seconds_from_now(offset: i64) -> i64 {
        chrono::Utc::now().timestamp() + offset
    }

    #[test]
    fn push_entry() {
        let queue = DeadLetterQueue::new(test_config());
        queue.push(make_event(100), "handler1", "connection timeout");

        assert_eq!(queue.len(), 1);

        let DlqStats {
            total_added,
            pending,
            ..
        } = queue.stats();
        assert_eq!(total_added, 1);
        assert_eq!(pending, 1);
    }

    #[test]
    fn pop_ready_returns_due_entries() {
        let queue = DeadLetterQueue::new(test_config());
        queue.push(make_event(100), "handler1", "error");

        let reference = chrono::Utc::now().timestamp();

        // Before the scheduled retry time nothing is due.
        let too_early = queue.pop_ready(reference - 10);
        assert!(too_early.is_empty());

        // Past the retry time the entry is handed out and flagged as retrying.
        let due = queue.pop_ready(reference + 10);
        assert_eq!(due.len(), 1);
        assert_eq!(due[0].status, DlqStatus::Retrying);
    }

    #[test]
    fn mark_success_removes_entry() {
        let queue = DeadLetterQueue::new(test_config());
        queue.push(make_event(100), "handler1", "error");

        let due = queue.pop_ready(seconds_from_now(10));
        let entry_id = due[0].id.clone();

        queue.mark_success(&entry_id);
        assert_eq!(queue.len(), 0);

        let counters = queue.stats();
        assert_eq!(counters.retried_success, 1);
        assert_eq!(counters.pending, 0);
    }

    #[test]
    fn mark_failed_reschedules() {
        let queue = DeadLetterQueue::new(test_config());
        queue.push(make_event(100), "handler1", "error");

        let due = queue.pop_ready(seconds_from_now(10));
        let entry_id = due[0].id.clone();

        queue.mark_failed(&entry_id, "still broken");

        let rescheduled = queue.get(&entry_id).unwrap();
        assert_eq!(rescheduled.status, DlqStatus::Pending);
        assert_eq!(rescheduled.attempt_count, 2);
        assert!(rescheduled.next_retry_at >= rescheduled.last_failed_at);
    }

    #[test]
    fn max_retries_marks_failed() {
        let queue = queue_with_max_retries(2);
        queue.push(make_event(100), "handler1", "error");

        let due = queue.pop_ready(seconds_from_now(100));
        let entry_id = due[0].id.clone();

        // Second failure reaches the budget of two attempts.
        queue.mark_failed(&entry_id, "still broken");

        let exhausted = queue.get(&entry_id).unwrap();
        assert_eq!(exhausted.status, DlqStatus::Failed);
        assert_eq!(exhausted.attempt_count, 2);

        let counters = queue.stats();
        assert_eq!(counters.failed, 1);
        assert_eq!(counters.pending, 0);
    }

    #[test]
    fn get_by_status() {
        let queue = queue_with_max_retries(2);

        queue.push(make_event(100), "h1", "error1");
        queue.push(make_event(101), "h2", "error2");

        // Both entries are due, so both move to `Retrying`.
        let due = queue.pop_ready(seconds_from_now(100));
        assert_eq!(due.len(), 2);

        // Failing the first one exhausts its budget.
        queue.mark_failed(&due[0].id, "still broken");

        let failed_count = queue.get_by_status(DlqStatus::Failed).len();
        assert_eq!(failed_count, 1);

        // The second one is still in flight.
        let retrying_count = queue.get_by_status(DlqStatus::Retrying).len();
        assert_eq!(retrying_count, 1);
    }

    #[test]
    fn exponential_backoff() {
        let queue = DeadLetterQueue::new(test_config());

        // 1s base, doubling with each attempt.
        let expectations = [(1, 1), (2, 2), (3, 4)];
        for (attempt, expected_secs) in expectations {
            assert_eq!(
                queue.compute_backoff(attempt),
                Duration::from_secs(expected_secs)
            );
        }
    }

    #[test]
    fn backoff_capped_at_max() {
        let cap = Duration::from_secs(10);
        let queue = DeadLetterQueue::new(DlqConfig {
            max_backoff: cap,
            ..test_config()
        });

        // 2^9 seconds would exceed the cap.
        assert_eq!(queue.compute_backoff(10), Duration::from_secs(10));
    }

    #[test]
    fn purge_before_removes_old_entries() {
        let queue = DeadLetterQueue::new(test_config());
        queue.push(make_event(100), "h1", "error1");
        queue.push(make_event(101), "h2", "error2");

        // Every entry failed before this cutoff.
        let cutoff = seconds_from_now(10000);
        let purged = queue.purge_before(cutoff);
        assert_eq!(purged, 2);
        assert!(queue.is_empty());
    }

    #[test]
    fn retry_all_failed() {
        let queue = queue_with_max_retries(1);
        queue.push(make_event(100), "h1", "error");

        let due = queue.pop_ready(seconds_from_now(100));
        queue.mark_failed(&due[0].id, "still broken");

        assert_eq!(queue.get_by_status(DlqStatus::Failed).len(), 1);

        // Re-queueing moves the entry from `Failed` back to `Pending`.
        let requeued = queue.retry_all_failed();
        assert_eq!(requeued, 1);
        assert_eq!(queue.get_by_status(DlqStatus::Pending).len(), 1);
        assert_eq!(queue.get_by_status(DlqStatus::Failed).len(), 0);
    }

    #[test]
    fn stats_tracking() {
        let queue = DeadLetterQueue::new(test_config());

        queue.push(make_event(100), "h1", "error1");
        queue.push(make_event(101), "h2", "error2");

        let counters = queue.stats();
        assert_eq!(counters.total_added, 2);
        assert_eq!(counters.pending, 2);
        assert_eq!(counters.failed, 0);
        assert_eq!(counters.retried_success, 0);
    }
}