dag_executor/advanced/
dead_letter.rs1use crate::error::Result;
4use crate::state::now_millis;
5use crate::storage::Storage;
6use serde::{Deserialize, Serialize};
7use std::sync::Arc;
8
9#[derive(Debug, Clone, Serialize, Deserialize)]
11pub struct DeadLetter {
12 pub task_id: String,
14 pub attempts: u32,
16 pub error: String,
18 pub timestamp: u64,
20}
21
22pub struct DeadLetterQueue {
27 storage: Arc<dyn Storage>,
28}
29
30impl DeadLetterQueue {
31 pub fn new(storage: Arc<dyn Storage>) -> Self {
33 DeadLetterQueue { storage }
34 }
35
36 fn key(task_id: &str) -> String {
37 format!("dlq:{task_id}")
38 }
39
40 pub async fn push(&self, task_id: &str, attempts: u32, error: impl Into<String>) -> Result<()> {
42 let entry = DeadLetter {
43 task_id: task_id.to_string(),
44 attempts,
45 error: error.into(),
46 timestamp: now_millis(),
47 };
48 let value = serde_json::to_value(&entry).map_err(crate::error::StorageError::from)?;
49 self.storage.save(&Self::key(task_id), &value).await?;
50 Ok(())
51 }
52
53 pub async fn get(&self, task_id: &str) -> Result<Option<DeadLetter>> {
55 match self.storage.load(&Self::key(task_id)).await? {
56 Some(value) => {
57 let entry =
58 serde_json::from_value(value).map_err(crate::error::StorageError::from)?;
59 Ok(Some(entry))
60 }
61 None => Ok(None),
62 }
63 }
64
65 pub async fn list(&self) -> Result<Vec<DeadLetter>> {
67 let mut out = Vec::new();
68 for key in self.storage.list().await? {
69 if let Some(task_id) = key.strip_prefix("dlq:") {
70 if let Some(entry) = self.get(task_id).await? {
71 out.push(entry);
72 }
73 }
74 }
75 Ok(out)
76 }
77
78 pub async fn remove(&self, task_id: &str) -> Result<()> {
80 self.storage.delete(&Self::key(task_id)).await?;
81 Ok(())
82 }
83}