agentic_workflow/resilience/
idempotency.rs1use std::collections::HashMap;
2
3use chrono::Utc;
4
5use crate::types::{
6 ConflictResolution, IdempotencyConfig, IdempotencyEntry, IdempotencyReport,
7 IdempotencyWindow, KeyStrategy, StepIdempotencyStats,
8 WorkflowError, WorkflowResult,
9};
10
11pub struct IdempotencyEngine {
13 configs: HashMap<String, IdempotencyConfig>,
14 cache: HashMap<String, IdempotencyEntry>,
15 hit_counts: HashMap<String, u64>,
16}
17
18impl IdempotencyEngine {
19 pub fn new() -> Self {
20 Self {
21 configs: HashMap::new(),
22 cache: HashMap::new(),
23 hit_counts: HashMap::new(),
24 }
25 }
26
27 pub fn configure(
29 &mut self,
30 step_id: &str,
31 key_strategy: KeyStrategy,
32 window: IdempotencyWindow,
33 conflict_resolution: ConflictResolution,
34 ) -> WorkflowResult<()> {
35 let config = IdempotencyConfig {
36 step_id: step_id.to_string(),
37 key_strategy,
38 window,
39 conflict_resolution,
40 };
41
42 self.configs.insert(step_id.to_string(), config);
43 Ok(())
44 }
45
46 pub fn compute_key(
48 &self,
49 step_id: &str,
50 workflow_id: &str,
51 input: &serde_json::Value,
52 ) -> WorkflowResult<String> {
53 let config = self.configs.get(step_id);
54
55 match config.map(|c| &c.key_strategy) {
56 Some(KeyStrategy::InputHash) | None => {
57 let input_str = serde_json::to_string(input)
58 .map_err(|e| WorkflowError::SerializationError(e.to_string()))?;
59 let hash = blake3::hash(input_str.as_bytes());
60 Ok(format!("{}:{}:{}", workflow_id, step_id, hash.to_hex()))
61 }
62 Some(KeyStrategy::Expression(expr)) => {
63 Ok(format!("{}:{}:{}", workflow_id, step_id, expr))
64 }
65 Some(KeyStrategy::FieldPath(path)) => {
66 let field_value = input
67 .pointer(path)
68 .map(|v| v.to_string())
69 .unwrap_or_default();
70 Ok(format!("{}:{}:{}", workflow_id, step_id, field_value))
71 }
72 }
73 }
74
75 pub fn check(&self, key: &str) -> Option<&IdempotencyEntry> {
77 let entry = self.cache.get(key)?;
78
79 if let Some(expires_at) = entry.expires_at {
81 if Utc::now() > expires_at {
82 return None;
83 }
84 }
85
86 Some(entry)
87 }
88
89 pub fn store(
91 &mut self,
92 key: String,
93 step_id: &str,
94 execution_id: &str,
95 input_hash: &str,
96 output: serde_json::Value,
97 ) -> WorkflowResult<()> {
98 let config = self.configs.get(step_id);
99 let now = Utc::now();
100
101 let expires_at = match config.map(|c| &c.window) {
102 Some(IdempotencyWindow::Duration { ms }) => {
103 Some(now + chrono::Duration::milliseconds(*ms as i64))
104 }
105 Some(IdempotencyWindow::Forever) | None => None,
106 Some(IdempotencyWindow::UntilNextExecution) => None,
107 };
108
109 let entry = IdempotencyEntry {
110 key: key.clone(),
111 step_id: step_id.to_string(),
112 execution_id: execution_id.to_string(),
113 input_hash: input_hash.to_string(),
114 output,
115 created_at: now,
116 expires_at,
117 };
118
119 self.cache.insert(key, entry);
120 Ok(())
121 }
122
123 pub fn record_hit(&mut self, step_id: &str) {
125 *self.hit_counts.entry(step_id.to_string()).or_insert(0) += 1;
126 }
127
128 pub fn purge_expired(&mut self) -> usize {
130 let now = Utc::now();
131 let before = self.cache.len();
132 self.cache.retain(|_, entry| {
133 entry.expires_at.map_or(true, |exp| exp > now)
134 });
135 before - self.cache.len()
136 }
137
138 pub fn report(&self) -> IdempotencyReport {
140 let mut by_step: HashMap<&str, (usize, u64)> = HashMap::new();
141
142 for entry in self.cache.values() {
143 by_step.entry(&entry.step_id).or_insert((0, 0)).0 += 1;
144 }
145
146 for (step_id, hits) in &self.hit_counts {
147 by_step.entry(step_id).or_insert((0, 0)).1 = *hits;
148 }
149
150 let total_hits: u64 = self.hit_counts.values().sum();
151 let total_checks = total_hits + self.cache.len() as u64;
152 let hit_rate = if total_checks > 0 {
153 total_hits as f64 / total_checks as f64
154 } else {
155 0.0
156 };
157
158 let stats: Vec<StepIdempotencyStats> = by_step
159 .into_iter()
160 .map(|(step_id, (entries, hits))| StepIdempotencyStats {
161 step_id: step_id.to_string(),
162 entries,
163 hits,
164 saved_executions: hits,
165 })
166 .collect();
167
168 IdempotencyReport {
169 total_entries: self.cache.len(),
170 deduplicated_count: total_hits,
171 cache_hit_rate: hit_rate,
172 oldest_entry: self.cache.values().map(|e| e.created_at).min(),
173 by_step: stats,
174 }
175 }
176
177 pub fn clear(&mut self) {
179 self.cache.clear();
180 }
181}
182
183impl Default for IdempotencyEngine {
184 fn default() -> Self {
185 Self::new()
186 }
187}
188
#[cfg(test)]
mod tests {
    use super::*;

    /// A key is unknown before `store`, and afterwards `check` yields the
    /// stored output.
    #[test]
    fn test_idempotency_dedup() {
        let mut engine = IdempotencyEngine::new();
        let input = serde_json::json!({"x": 1});
        let expected_output = serde_json::json!({"result": 42});

        let key = engine.compute_key("step-1", "wf-1", &input).unwrap();
        assert!(engine.check(&key).is_none());

        engine
            .store(key.clone(), "step-1", "exec-1", "abc", expected_output.clone())
            .unwrap();

        let cached = engine.check(&key);
        assert!(cached.is_some());
        assert_eq!(cached.unwrap().output, expected_output);
    }
}