Skip to main content

dag_executor/advanced/
dead_letter.rs

//! Dead-letter queue for terminally failed tasks.
2
3use crate::error::Result;
4use crate::state::now_millis;
5use crate::storage::Storage;
6use serde::{Deserialize, Serialize};
7use std::sync::Arc;
8
9/// A record describing why a task was dead-lettered.
10#[derive(Debug, Clone, Serialize, Deserialize)]
11pub struct DeadLetter {
12    /// The task that failed.
13    pub task_id: String,
14    /// Number of attempts made before giving up.
15    pub attempts: u32,
16    /// The final error message.
17    pub error: String,
18    /// When the task was dead-lettered (ms since epoch).
19    pub timestamp: u64,
20}
21
22/// Durable storage for tasks that exhausted their retries.
23///
24/// Entries are persisted under `dlq:{task_id}` so they survive restarts and can
25/// be inspected or replayed by an operator.
26pub struct DeadLetterQueue {
27    storage: Arc<dyn Storage>,
28}
29
30impl DeadLetterQueue {
31    /// Wrap a storage backend as a dead-letter queue.
32    pub fn new(storage: Arc<dyn Storage>) -> Self {
33        DeadLetterQueue { storage }
34    }
35
36    fn key(task_id: &str) -> String {
37        format!("dlq:{task_id}")
38    }
39
40    /// Add a failed task to the queue.
41    pub async fn push(&self, task_id: &str, attempts: u32, error: impl Into<String>) -> Result<()> {
42        let entry = DeadLetter {
43            task_id: task_id.to_string(),
44            attempts,
45            error: error.into(),
46            timestamp: now_millis(),
47        };
48        let value = serde_json::to_value(&entry).map_err(crate::error::StorageError::from)?;
49        self.storage.save(&Self::key(task_id), &value).await?;
50        Ok(())
51    }
52
53    /// Fetch the dead-letter entry for a task, if present.
54    pub async fn get(&self, task_id: &str) -> Result<Option<DeadLetter>> {
55        match self.storage.load(&Self::key(task_id)).await? {
56            Some(value) => {
57                let entry =
58                    serde_json::from_value(value).map_err(crate::error::StorageError::from)?;
59                Ok(Some(entry))
60            }
61            None => Ok(None),
62        }
63    }
64
65    /// List all dead-lettered entries.
66    pub async fn list(&self) -> Result<Vec<DeadLetter>> {
67        let mut out = Vec::new();
68        for key in self.storage.list().await? {
69            if let Some(task_id) = key.strip_prefix("dlq:") {
70                if let Some(entry) = self.get(task_id).await? {
71                    out.push(entry);
72                }
73            }
74        }
75        Ok(out)
76    }
77
78    /// Remove a task from the queue (e.g. after a successful manual replay).
79    pub async fn remove(&self, task_id: &str) -> Result<()> {
80        self.storage.delete(&Self::key(task_id)).await?;
81        Ok(())
82    }
83}