// dag-executor 0.1.0
//
// A production-ready DAG executor with state management and advanced patterns.
// Documentation
//! Dead-letter queue for terminally failed tasks.

use crate::error::Result;
use crate::state::now_millis;
use crate::storage::Storage;
use serde::{Deserialize, Serialize};
use std::sync::Arc;

/// A record describing why a task was dead-lettered.
///
/// Derives `Serialize`/`Deserialize`; [`DeadLetterQueue`] converts each record
/// to a JSON value before handing it to storage and parses it back on reads.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeadLetter {
    /// The task that failed. The queue also derives the storage key from it.
    pub task_id: String,
    /// Number of attempts made before giving up, as supplied by the caller.
    pub attempts: u32,
    /// The final error message.
    pub error: String,
    /// When the task was dead-lettered (ms since epoch); filled in from
    /// `now_millis()` at the moment the record is pushed.
    pub timestamp: u64,
}

/// Durable storage for tasks that exhausted their retries.
///
/// Entries are persisted under `dlq:{task_id}` so they survive restarts and can
/// be inspected or replayed by an operator.
///
/// Each task id maps to exactly one key.
/// NOTE(review): durability and what happens when the same key is saved twice
/// are properties of the [`Storage`] backend, not of this type — confirm
/// against the backend in use.
pub struct DeadLetterQueue {
    /// Shared handle to the backend that holds the entries.
    storage: Arc<dyn Storage>,
}

impl DeadLetterQueue {
    /// Wrap a storage backend as a dead-letter queue.
    pub fn new(storage: Arc<dyn Storage>) -> Self {
        DeadLetterQueue { storage }
    }

    fn key(task_id: &str) -> String {
        format!("dlq:{task_id}")
    }

    /// Add a failed task to the queue.
    pub async fn push(&self, task_id: &str, attempts: u32, error: impl Into<String>) -> Result<()> {
        let entry = DeadLetter {
            task_id: task_id.to_string(),
            attempts,
            error: error.into(),
            timestamp: now_millis(),
        };
        let value = serde_json::to_value(&entry).map_err(crate::error::StorageError::from)?;
        self.storage.save(&Self::key(task_id), &value).await?;
        Ok(())
    }

    /// Fetch the dead-letter entry for a task, if present.
    pub async fn get(&self, task_id: &str) -> Result<Option<DeadLetter>> {
        match self.storage.load(&Self::key(task_id)).await? {
            Some(value) => {
                let entry =
                    serde_json::from_value(value).map_err(crate::error::StorageError::from)?;
                Ok(Some(entry))
            }
            None => Ok(None),
        }
    }

    /// List all dead-lettered entries.
    pub async fn list(&self) -> Result<Vec<DeadLetter>> {
        let mut out = Vec::new();
        for key in self.storage.list().await? {
            if let Some(task_id) = key.strip_prefix("dlq:") {
                if let Some(entry) = self.get(task_id).await? {
                    out.push(entry);
                }
            }
        }
        Ok(out)
    }

    /// Remove a task from the queue (e.g. after a successful manual replay).
    pub async fn remove(&self, task_id: &str) -> Result<()> {
        self.storage.delete(&Self::key(task_id)).await?;
        Ok(())
    }
}