agentic_workflow/resilience/
dead_letter.rs1use std::collections::HashMap;
2
3use chrono::Utc;
4use uuid::Uuid;
5
6use crate::types::{
7 DeadLetterItem, DeadLetterPolicy, DeadLetterSummary, FailureGroup,
8 WorkflowError, WorkflowResult,
9};
10
11pub struct DeadLetterEngine {
13 items: HashMap<String, DeadLetterItem>,
14 policy: DeadLetterPolicy,
15}
16
17impl DeadLetterEngine {
18 pub fn new() -> Self {
19 Self {
20 items: HashMap::new(),
21 policy: DeadLetterPolicy {
22 retention_days: 30,
23 auto_resurrect_on_recovery: true,
24 max_items: Some(10_000),
25 alert_threshold: Some(100),
26 },
27 }
28 }
29
30 pub fn add_item(
32 &mut self,
33 execution_id: &str,
34 workflow_id: &str,
35 step_id: &str,
36 failure_class: &str,
37 error_message: &str,
38 input_data: serde_json::Value,
39 attempt_count: u32,
40 ) -> WorkflowResult<String> {
41 let id = Uuid::new_v4().to_string();
42 let now = Utc::now();
43
44 let expires_at = chrono::Duration::days(self.policy.retention_days as i64)
45 .checked_add(&chrono::Duration::zero())
46 .map(|d| now + d);
47
48 let resurrectable = failure_class != "permanent" && failure_class != "authentication";
49
50 let item = DeadLetterItem {
51 id: id.clone(),
52 execution_id: execution_id.to_string(),
53 workflow_id: workflow_id.to_string(),
54 step_id: step_id.to_string(),
55 failure_class: failure_class.to_string(),
56 error_message: error_message.to_string(),
57 input_data,
58 attempt_count,
59 failed_at: now,
60 expires_at,
61 resurrectable,
62 };
63
64 self.items.insert(id.clone(), item);
65
66 if let Some(threshold) = self.policy.alert_threshold {
68 if self.items.len() >= threshold {
69 eprintln!(
70 "Dead letter queue alert: {} items (threshold: {})",
71 self.items.len(),
72 threshold
73 );
74 }
75 }
76
77 Ok(id)
78 }
79
80 pub fn list_items(&self) -> Vec<&DeadLetterItem> {
82 self.items.values().collect()
83 }
84
85 pub fn summary(&self) -> DeadLetterSummary {
87 let mut groups: HashMap<&str, (usize, bool, String)> = HashMap::new();
88
89 for item in self.items.values() {
90 let entry = groups
91 .entry(&item.failure_class)
92 .or_insert((0, item.resurrectable, item.error_message.clone()));
93 entry.0 += 1;
94 }
95
96 let by_failure_class: Vec<FailureGroup> = groups
97 .into_iter()
98 .map(|(class, (count, auto_retryable, sample_error))| FailureGroup {
99 failure_class: class.to_string(),
100 count,
101 auto_retryable,
102 sample_error,
103 })
104 .collect();
105
106 let auto_retryable = self.items.values().filter(|i| i.resurrectable).count();
107 let needs_human = self.items.len() - auto_retryable;
108 let oldest = self.items.values().map(|i| i.failed_at).min();
109
110 DeadLetterSummary {
111 total_items: self.items.len(),
112 by_failure_class,
113 auto_retryable,
114 needs_human,
115 oldest_item: oldest,
116 }
117 }
118
119 pub fn remove_item(&mut self, item_id: &str) -> WorkflowResult<DeadLetterItem> {
121 self.items.remove(item_id).ok_or_else(|| {
122 WorkflowError::Internal(format!("Dead letter item not found: {}", item_id))
123 })
124 }
125
126 pub fn purge_expired(&mut self) -> usize {
128 let now = Utc::now();
129 let before = self.items.len();
130 self.items.retain(|_, item| {
131 item.expires_at.map_or(true, |exp| exp > now)
132 });
133 before - self.items.len()
134 }
135
136 pub fn retryable_items(&self, failure_class: &str) -> Vec<&DeadLetterItem> {
138 self.items
139 .values()
140 .filter(|i| i.resurrectable && i.failure_class == failure_class)
141 .collect()
142 }
143
144 pub fn set_policy(&mut self, policy: DeadLetterPolicy) {
146 self.policy = policy;
147 }
148
149 pub fn get_policy(&self) -> &DeadLetterPolicy {
151 &self.policy
152 }
153}
154
155impl Default for DeadLetterEngine {
156 fn default() -> Self {
157 Self::new()
158 }
159}
160
#[cfg(test)]
mod tests {
    use super::*;

    /// Two `rate_limit` failures and one `permanent` failure should be
    /// summarised as three items: two auto-retryable, one needing a human.
    #[test]
    fn test_dead_letter_summary() {
        // (execution id, step id, failure class, error message, attempts)
        let fixtures = [
            ("e1", "s1", "rate_limit", "429", 3),
            ("e2", "s2", "rate_limit", "429", 2),
            ("e3", "s3", "permanent", "invalid data", 1),
        ];

        let mut engine = DeadLetterEngine::new();
        for (execution_id, step_id, failure_class, error_message, attempts) in fixtures {
            engine
                .add_item(
                    execution_id,
                    "w1",
                    step_id,
                    failure_class,
                    error_message,
                    serde_json::json!({}),
                    attempts,
                )
                .unwrap();
        }

        let summary = engine.summary();
        assert_eq!(summary.total_items, 3);
        assert_eq!(summary.auto_retryable, 2);
        assert_eq!(summary.needs_human, 1);
    }
}