// agentic_workflow/resilience/dead_letter.rs

1use std::collections::HashMap;
2
3use chrono::Utc;
4use uuid::Uuid;
5
6use crate::types::{
7    DeadLetterItem, DeadLetterPolicy, DeadLetterSummary, FailureGroup,
8    WorkflowError, WorkflowResult,
9};
10
11/// Intelligent dead letter management engine.
12pub struct DeadLetterEngine {
13    items: HashMap<String, DeadLetterItem>,
14    policy: DeadLetterPolicy,
15}
16
17impl DeadLetterEngine {
18    pub fn new() -> Self {
19        Self {
20            items: HashMap::new(),
21            policy: DeadLetterPolicy {
22                retention_days: 30,
23                auto_resurrect_on_recovery: true,
24                max_items: Some(10_000),
25                alert_threshold: Some(100),
26            },
27        }
28    }
29
30    /// Add a failed item to the dead letter queue.
31    pub fn add_item(
32        &mut self,
33        execution_id: &str,
34        workflow_id: &str,
35        step_id: &str,
36        failure_class: &str,
37        error_message: &str,
38        input_data: serde_json::Value,
39        attempt_count: u32,
40    ) -> WorkflowResult<String> {
41        let id = Uuid::new_v4().to_string();
42        let now = Utc::now();
43
44        let expires_at = chrono::Duration::days(self.policy.retention_days as i64)
45            .checked_add(&chrono::Duration::zero())
46            .map(|d| now + d);
47
48        let resurrectable = failure_class != "permanent" && failure_class != "authentication";
49
50        let item = DeadLetterItem {
51            id: id.clone(),
52            execution_id: execution_id.to_string(),
53            workflow_id: workflow_id.to_string(),
54            step_id: step_id.to_string(),
55            failure_class: failure_class.to_string(),
56            error_message: error_message.to_string(),
57            input_data,
58            attempt_count,
59            failed_at: now,
60            expires_at,
61            resurrectable,
62        };
63
64        self.items.insert(id.clone(), item);
65
66        // Check alert threshold
67        if let Some(threshold) = self.policy.alert_threshold {
68            if self.items.len() >= threshold {
69                eprintln!(
70                    "Dead letter queue alert: {} items (threshold: {})",
71                    self.items.len(),
72                    threshold
73                );
74            }
75        }
76
77        Ok(id)
78    }
79
80    /// List all dead letter items.
81    pub fn list_items(&self) -> Vec<&DeadLetterItem> {
82        self.items.values().collect()
83    }
84
85    /// Get a summary grouped by failure class.
86    pub fn summary(&self) -> DeadLetterSummary {
87        let mut groups: HashMap<&str, (usize, bool, String)> = HashMap::new();
88
89        for item in self.items.values() {
90            let entry = groups
91                .entry(&item.failure_class)
92                .or_insert((0, item.resurrectable, item.error_message.clone()));
93            entry.0 += 1;
94        }
95
96        let by_failure_class: Vec<FailureGroup> = groups
97            .into_iter()
98            .map(|(class, (count, auto_retryable, sample_error))| FailureGroup {
99                failure_class: class.to_string(),
100                count,
101                auto_retryable,
102                sample_error,
103            })
104            .collect();
105
106        let auto_retryable = self.items.values().filter(|i| i.resurrectable).count();
107        let needs_human = self.items.len() - auto_retryable;
108        let oldest = self.items.values().map(|i| i.failed_at).min();
109
110        DeadLetterSummary {
111            total_items: self.items.len(),
112            by_failure_class,
113            auto_retryable,
114            needs_human,
115            oldest_item: oldest,
116        }
117    }
118
119    /// Remove an item (after successful retry or manual resolution).
120    pub fn remove_item(&mut self, item_id: &str) -> WorkflowResult<DeadLetterItem> {
121        self.items.remove(item_id).ok_or_else(|| {
122            WorkflowError::Internal(format!("Dead letter item not found: {}", item_id))
123        })
124    }
125
126    /// Purge expired items.
127    pub fn purge_expired(&mut self) -> usize {
128        let now = Utc::now();
129        let before = self.items.len();
130        self.items.retain(|_, item| {
131            item.expires_at.map_or(true, |exp| exp > now)
132        });
133        before - self.items.len()
134    }
135
136    /// Get items that can be auto-retried (for service recovery).
137    pub fn retryable_items(&self, failure_class: &str) -> Vec<&DeadLetterItem> {
138        self.items
139            .values()
140            .filter(|i| i.resurrectable && i.failure_class == failure_class)
141            .collect()
142    }
143
144    /// Update retention policy.
145    pub fn set_policy(&mut self, policy: DeadLetterPolicy) {
146        self.policy = policy;
147    }
148
149    /// Get current policy.
150    pub fn get_policy(&self) -> &DeadLetterPolicy {
151        &self.policy
152    }
153}
154
155impl Default for DeadLetterEngine {
156    fn default() -> Self {
157        Self::new()
158    }
159}
160
#[cfg(test)]
mod tests {
    use super::*;

    /// Engine pre-loaded with two resurrectable `rate_limit` failures and one
    /// non-resurrectable `permanent` failure.
    fn seeded_engine() -> DeadLetterEngine {
        let mut engine = DeadLetterEngine::new();
        engine
            .add_item("e1", "w1", "s1", "rate_limit", "429", serde_json::json!({}), 3)
            .unwrap();
        engine
            .add_item("e2", "w1", "s2", "rate_limit", "429", serde_json::json!({}), 2)
            .unwrap();
        engine
            .add_item("e3", "w1", "s3", "permanent", "invalid data", serde_json::json!({}), 1)
            .unwrap();
        engine
    }

    /// Find a summary group by class; group ordering is not asserted here.
    fn group<'a>(summary: &'a DeadLetterSummary, class: &str) -> &'a FailureGroup {
        summary
            .by_failure_class
            .iter()
            .find(|g| g.failure_class == class)
            .expect("failure class group should be present")
    }

    #[test]
    fn test_dead_letter_summary() {
        let summary = seeded_engine().summary();
        assert_eq!(summary.total_items, 3);
        assert_eq!(summary.auto_retryable, 2);
        assert_eq!(summary.needs_human, 1);
        assert!(summary.oldest_item.is_some());
        assert_eq!(summary.by_failure_class.len(), 2);

        let rate_limit = group(&summary, "rate_limit");
        assert_eq!(rate_limit.count, 2);
        assert!(rate_limit.auto_retryable);
        assert_eq!(rate_limit.sample_error, "429");

        let permanent = group(&summary, "permanent");
        assert_eq!(permanent.count, 1);
        assert!(!permanent.auto_retryable);
        assert_eq!(permanent.sample_error, "invalid data");
    }

    #[test]
    fn test_empty_summary() {
        let summary = DeadLetterEngine::new().summary();
        assert_eq!(summary.total_items, 0);
        assert_eq!(summary.auto_retryable, 0);
        assert_eq!(summary.needs_human, 0);
        assert!(summary.by_failure_class.is_empty());
        assert!(summary.oldest_item.is_none());
    }

    #[test]
    fn test_add_item_records_fields() {
        let mut engine = DeadLetterEngine::new();
        let id = engine
            .add_item("e9", "w2", "s9", "authentication", "401", serde_json::json!({"k": 1}), 4)
            .unwrap();

        let items = engine.list_items();
        assert_eq!(items.len(), 1);
        let item = items[0];
        assert_eq!(item.id, id);
        assert_eq!(item.execution_id, "e9");
        assert_eq!(item.workflow_id, "w2");
        assert_eq!(item.step_id, "s9");
        assert_eq!(item.error_message, "401");
        assert_eq!(item.attempt_count, 4);
        assert_eq!(item.input_data, serde_json::json!({"k": 1}));
        assert!(!item.resurrectable);
        assert!(matches!(item.expires_at, Some(exp) if exp > item.failed_at));
    }

    #[test]
    fn test_retryable_items() {
        let engine = seeded_engine();
        assert_eq!(engine.retryable_items("rate_limit").len(), 2);
        assert!(engine.retryable_items("permanent").is_empty());
        assert!(engine.retryable_items("unknown").is_empty());
    }

    #[test]
    fn test_remove_item() {
        let mut engine = DeadLetterEngine::new();
        let id = engine
            .add_item("e1", "w1", "s1", "timeout", "deadline exceeded", serde_json::json!(null), 1)
            .unwrap();

        let removed = engine.remove_item(&id).unwrap();
        assert_eq!(removed.execution_id, "e1");
        assert!(engine.list_items().is_empty());
        assert!(engine.remove_item(&id).is_err());
    }

    #[test]
    fn test_purge_keeps_unexpired_items() {
        let mut engine = seeded_engine();
        assert_eq!(engine.purge_expired(), 0);
        assert_eq!(engine.list_items().len(), 3);
    }
}
183}