Skip to main content

agentic_workflow/resilience/
idempotency.rs

1use std::collections::HashMap;
2
3use chrono::Utc;
4
5use crate::types::{
6    ConflictResolution, IdempotencyConfig, IdempotencyEntry, IdempotencyReport,
7    IdempotencyWindow, KeyStrategy, StepIdempotencyStats,
8    WorkflowError, WorkflowResult,
9};
10
11/// Idempotency engine — deduplication for step executions.
12pub struct IdempotencyEngine {
13    configs: HashMap<String, IdempotencyConfig>,
14    cache: HashMap<String, IdempotencyEntry>,
15    hit_counts: HashMap<String, u64>,
16}
17
18impl IdempotencyEngine {
19    pub fn new() -> Self {
20        Self {
21            configs: HashMap::new(),
22            cache: HashMap::new(),
23            hit_counts: HashMap::new(),
24        }
25    }
26
27    /// Configure idempotency for a step.
28    pub fn configure(
29        &mut self,
30        step_id: &str,
31        key_strategy: KeyStrategy,
32        window: IdempotencyWindow,
33        conflict_resolution: ConflictResolution,
34    ) -> WorkflowResult<()> {
35        let config = IdempotencyConfig {
36            step_id: step_id.to_string(),
37            key_strategy,
38            window,
39            conflict_resolution,
40        };
41
42        self.configs.insert(step_id.to_string(), config);
43        Ok(())
44    }
45
46    /// Compute idempotency key for a step execution.
47    pub fn compute_key(
48        &self,
49        step_id: &str,
50        workflow_id: &str,
51        input: &serde_json::Value,
52    ) -> WorkflowResult<String> {
53        let config = self.configs.get(step_id);
54
55        match config.map(|c| &c.key_strategy) {
56            Some(KeyStrategy::InputHash) | None => {
57                let input_str = serde_json::to_string(input)
58                    .map_err(|e| WorkflowError::SerializationError(e.to_string()))?;
59                let hash = blake3::hash(input_str.as_bytes());
60                Ok(format!("{}:{}:{}", workflow_id, step_id, hash.to_hex()))
61            }
62            Some(KeyStrategy::Expression(expr)) => {
63                Ok(format!("{}:{}:{}", workflow_id, step_id, expr))
64            }
65            Some(KeyStrategy::FieldPath(path)) => {
66                let field_value = input
67                    .pointer(path)
68                    .map(|v| v.to_string())
69                    .unwrap_or_default();
70                Ok(format!("{}:{}:{}", workflow_id, step_id, field_value))
71            }
72        }
73    }
74
75    /// Check if a key was already processed.
76    pub fn check(&self, key: &str) -> Option<&IdempotencyEntry> {
77        let entry = self.cache.get(key)?;
78
79        // Check expiration
80        if let Some(expires_at) = entry.expires_at {
81            if Utc::now() > expires_at {
82                return None;
83            }
84        }
85
86        Some(entry)
87    }
88
89    /// Store execution result for deduplication.
90    pub fn store(
91        &mut self,
92        key: String,
93        step_id: &str,
94        execution_id: &str,
95        input_hash: &str,
96        output: serde_json::Value,
97    ) -> WorkflowResult<()> {
98        let config = self.configs.get(step_id);
99        let now = Utc::now();
100
101        let expires_at = match config.map(|c| &c.window) {
102            Some(IdempotencyWindow::Duration { ms }) => {
103                Some(now + chrono::Duration::milliseconds(*ms as i64))
104            }
105            Some(IdempotencyWindow::Forever) | None => None,
106            Some(IdempotencyWindow::UntilNextExecution) => None,
107        };
108
109        let entry = IdempotencyEntry {
110            key: key.clone(),
111            step_id: step_id.to_string(),
112            execution_id: execution_id.to_string(),
113            input_hash: input_hash.to_string(),
114            output,
115            created_at: now,
116            expires_at,
117        };
118
119        self.cache.insert(key, entry);
120        Ok(())
121    }
122
123    /// Record a cache hit.
124    pub fn record_hit(&mut self, step_id: &str) {
125        *self.hit_counts.entry(step_id.to_string()).or_insert(0) += 1;
126    }
127
128    /// Purge expired entries.
129    pub fn purge_expired(&mut self) -> usize {
130        let now = Utc::now();
131        let before = self.cache.len();
132        self.cache.retain(|_, entry| {
133            entry.expires_at.map_or(true, |exp| exp > now)
134        });
135        before - self.cache.len()
136    }
137
138    /// Get deduplication report.
139    pub fn report(&self) -> IdempotencyReport {
140        let mut by_step: HashMap<&str, (usize, u64)> = HashMap::new();
141
142        for entry in self.cache.values() {
143            by_step.entry(&entry.step_id).or_insert((0, 0)).0 += 1;
144        }
145
146        for (step_id, hits) in &self.hit_counts {
147            by_step.entry(step_id).or_insert((0, 0)).1 = *hits;
148        }
149
150        let total_hits: u64 = self.hit_counts.values().sum();
151        let total_checks = total_hits + self.cache.len() as u64;
152        let hit_rate = if total_checks > 0 {
153            total_hits as f64 / total_checks as f64
154        } else {
155            0.0
156        };
157
158        let stats: Vec<StepIdempotencyStats> = by_step
159            .into_iter()
160            .map(|(step_id, (entries, hits))| StepIdempotencyStats {
161                step_id: step_id.to_string(),
162                entries,
163                hits,
164                saved_executions: hits,
165            })
166            .collect();
167
168        IdempotencyReport {
169            total_entries: self.cache.len(),
170            deduplicated_count: total_hits,
171            cache_hit_rate: hit_rate,
172            oldest_entry: self.cache.values().map(|e| e.created_at).min(),
173            by_step: stats,
174        }
175    }
176
177    /// Clear all cached entries.
178    pub fn clear(&mut self) {
179        self.cache.clear();
180    }
181}
182
183impl Default for IdempotencyEngine {
184    fn default() -> Self {
185        Self::new()
186    }
187}
188
189#[cfg(test)]
190mod tests {
191    use super::*;
192
193    #[test]
194    fn test_idempotency_dedup() {
195        let mut engine = IdempotencyEngine::new();
196        let key = engine
197            .compute_key("step-1", "wf-1", &serde_json::json!({"x": 1}))
198            .unwrap();
199
200        assert!(engine.check(&key).is_none());
201
202        engine
203            .store(key.clone(), "step-1", "exec-1", "abc", serde_json::json!({"result": 42}))
204            .unwrap();
205
206        assert!(engine.check(&key).is_some());
207        assert_eq!(
208            engine.check(&key).unwrap().output,
209            serde_json::json!({"result": 42})
210        );
211    }
212}