// agentic_workflow/intelligence/archaeology.rs

1use std::collections::HashMap;
2
3use crate::types::{
4    ExecutionFingerprint, WorkflowError, WorkflowResult,
5};
6
7/// Execution archaeology — compare, analyze, and diagnose executions.
8pub struct ArchaeologyEngine {
9    fingerprints: Vec<ExecutionFingerprint>,
10}
11
12impl ArchaeologyEngine {
13    pub fn new() -> Self {
14        Self {
15            fingerprints: Vec::new(),
16        }
17    }
18
19    /// Store an execution fingerprint.
20    pub fn record_fingerprint(&mut self, fp: ExecutionFingerprint) {
21        self.fingerprints.push(fp);
22    }
23
24    /// Compare two executions.
25    pub fn compare(
26        &self,
27        exec_a: &str,
28        exec_b: &str,
29    ) -> WorkflowResult<ExecutionComparison> {
30        let fp_a = self
31            .fingerprints
32            .iter()
33            .find(|f| f.execution_id == exec_a)
34            .ok_or_else(|| WorkflowError::ExecutionNotFound(exec_a.to_string()))?;
35
36        let fp_b = self
37            .fingerprints
38            .iter()
39            .find(|f| f.execution_id == exec_b)
40            .ok_or_else(|| WorkflowError::ExecutionNotFound(exec_b.to_string()))?;
41
42        let mut step_diffs = Vec::new();
43        for (step_id, dur_a) in &fp_a.step_durations {
44            if let Some(dur_b) = fp_b.step_durations.get(step_id) {
45                let ratio = *dur_b as f64 / *dur_a as f64;
46                if ratio > 1.5 || ratio < 0.5 {
47                    step_diffs.push(StepDiff {
48                        step_id: step_id.clone(),
49                        duration_a_ms: *dur_a,
50                        duration_b_ms: *dur_b,
51                        ratio,
52                    });
53                }
54            }
55        }
56
57        Ok(ExecutionComparison {
58            execution_a: exec_a.to_string(),
59            execution_b: exec_b.to_string(),
60            duration_a_ms: fp_a.total_duration_ms,
61            duration_b_ms: fp_b.total_duration_ms,
62            duration_ratio: fp_b.total_duration_ms as f64 / fp_a.total_duration_ms.max(1) as f64,
63            significant_step_diffs: step_diffs,
64        })
65    }
66
67    /// Detect anomalous executions for a workflow.
68    pub fn detect_anomalies(&self, workflow_id: &str) -> Vec<Anomaly> {
69        let wf_fps: Vec<&ExecutionFingerprint> = self
70            .fingerprints
71            .iter()
72            .filter(|f| f.workflow_id == workflow_id)
73            .collect();
74
75        if wf_fps.len() < 3 {
76            return Vec::new();
77        }
78
79        let avg_duration: f64 =
80            wf_fps.iter().map(|f| f.total_duration_ms as f64).sum::<f64>() / wf_fps.len() as f64;
81
82        let mut anomalies = Vec::new();
83        for fp in &wf_fps {
84            let ratio = fp.total_duration_ms as f64 / avg_duration;
85            if ratio > 3.0 || ratio < 0.1 {
86                anomalies.push(Anomaly {
87                    execution_id: fp.execution_id.clone(),
88                    metric: "duration".to_string(),
89                    actual: fp.total_duration_ms as f64,
90                    expected: avg_duration,
91                    deviation_factor: ratio,
92                });
93            }
94        }
95
96        anomalies
97    }
98
99    /// Identify bottleneck steps across executions.
100    pub fn bottlenecks(&self, workflow_id: &str) -> Vec<Bottleneck> {
101        let wf_fps: Vec<&ExecutionFingerprint> = self
102            .fingerprints
103            .iter()
104            .filter(|f| f.workflow_id == workflow_id)
105            .collect();
106
107        if wf_fps.is_empty() {
108            return Vec::new();
109        }
110
111        // Average duration per step
112        let mut step_totals: HashMap<&str, (u64, usize)> = HashMap::new();
113        let mut total_workflow_time: u64 = 0;
114
115        for fp in &wf_fps {
116            total_workflow_time += fp.total_duration_ms;
117            for (step_id, dur) in &fp.step_durations {
118                let entry = step_totals.entry(step_id.as_str()).or_insert((0, 0));
119                entry.0 += dur;
120                entry.1 += 1;
121            }
122        }
123
124        let mut bottlenecks: Vec<Bottleneck> = step_totals
125            .into_iter()
126            .map(|(step_id, (total, count))| {
127                let avg = total as f64 / count as f64;
128                let pct = if total_workflow_time > 0 {
129                    (total as f64 / total_workflow_time as f64) * 100.0
130                } else {
131                    0.0
132                };
133                Bottleneck {
134                    step_id: step_id.to_string(),
135                    avg_duration_ms: avg as u64,
136                    percent_of_total: pct,
137                }
138            })
139            .collect();
140
141        bottlenecks.sort_by(|a, b| b.percent_of_total.partial_cmp(&a.percent_of_total).unwrap());
142        bottlenecks
143    }
144
145    /// Get fingerprints for a workflow.
146    pub fn get_fingerprints(&self, workflow_id: &str) -> Vec<&ExecutionFingerprint> {
147        self.fingerprints
148            .iter()
149            .filter(|f| f.workflow_id == workflow_id)
150            .collect()
151    }
152}
153
154impl Default for ArchaeologyEngine {
155    fn default() -> Self {
156        Self::new()
157    }
158}
159
/// Comparison between two executions.
///
/// Returned by [`ArchaeologyEngine::compare`]. Every ratio is execution B
/// relative to execution A, so a value above 1.0 means B took longer.
#[derive(Debug, Clone, serde::Serialize)]
pub struct ExecutionComparison {
    /// Id of execution A (the denominator of every ratio).
    pub execution_a: String,
    /// Id of execution B (the numerator of every ratio).
    pub execution_b: String,
    /// Total duration of execution A, in milliseconds.
    pub duration_a_ms: u64,
    /// Total duration of execution B, in milliseconds.
    pub duration_b_ms: u64,
    /// `duration_b_ms` relative to `duration_a_ms`.
    pub duration_ratio: f64,
    /// Steps present in both executions whose duration ratio is above 1.5 or below 0.5.
    pub significant_step_diffs: Vec<StepDiff>,
}

/// Duration difference for one step that ran in both compared executions.
#[derive(Debug, Clone, serde::Serialize)]
pub struct StepDiff {
    /// Id of the step.
    pub step_id: String,
    /// Step duration in execution A, in milliseconds.
    pub duration_a_ms: u64,
    /// Step duration in execution B, in milliseconds.
    pub duration_b_ms: u64,
    /// `duration_b_ms` relative to `duration_a_ms` (above 1.0 means B was slower).
    pub ratio: f64,
}

/// An execution anomaly.
///
/// Returned by [`ArchaeologyEngine::detect_anomalies`].
#[derive(Debug, Clone, serde::Serialize)]
pub struct Anomaly {
    /// Id of the execution that deviated.
    pub execution_id: String,
    /// Name of the metric that deviated (the detector currently reports `"duration"`).
    pub metric: String,
    /// Observed value of the metric for this execution.
    pub actual: f64,
    /// Baseline value the observation was compared against.
    pub expected: f64,
    /// How far `actual` is from `expected`, expressed as a ratio of the two.
    pub deviation_factor: f64,
}

/// A bottleneck step.
///
/// Returned by [`ArchaeologyEngine::bottlenecks`], sorted by
/// `percent_of_total` with the largest share first.
#[derive(Debug, Clone, serde::Serialize)]
pub struct Bottleneck {
    /// Id of the step.
    pub step_id: String,
    /// Mean step duration over the executions that ran it, truncated to whole milliseconds.
    pub avg_duration_ms: u64,
    /// Summed step duration as a percentage of the workflow's summed total duration.
    pub percent_of_total: f64,
}

#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Utc;

    /// Builds a fingerprint with the given total and per-step durations (ms).
    fn fingerprint(
        execution_id: &str,
        workflow_id: &str,
        total_duration_ms: u64,
        steps: &[(&str, u64)],
    ) -> ExecutionFingerprint {
        ExecutionFingerprint {
            execution_id: execution_id.to_string(),
            workflow_id: workflow_id.to_string(),
            total_duration_ms,
            step_durations: steps
                .iter()
                .map(|&(step_id, ms)| (step_id.to_string(), ms))
                .collect(),
            step_outcomes: HashMap::new(),
            retry_count: 0,
            completed_at: Utc::now(),
        }
    }

    #[test]
    fn test_anomaly_detection() {
        let mut engine = ArchaeologyEngine::new();

        for i in 0..5 {
            engine.record_fingerprint(fingerprint(&format!("exec-{}", i), "wf-1", 1000, &[]));
        }

        // Add an anomalous execution
        engine.record_fingerprint(fingerprint("exec-outlier", "wf-1", 100_000, &[]));

        let anomalies = engine.detect_anomalies("wf-1");
        // The outlier itself must be among the reported anomalies, not just "something".
        assert!(anomalies.iter().any(|a| a.execution_id == "exec-outlier"));
    }

    #[test]
    fn anomaly_detection_needs_three_executions() {
        let mut engine = ArchaeologyEngine::new();
        engine.record_fingerprint(fingerprint("fast", "wf-1", 10, &[]));
        engine.record_fingerprint(fingerprint("slow", "wf-1", 100_000, &[]));
        assert!(engine.detect_anomalies("wf-1").is_empty());
    }

    #[test]
    fn compare_reports_missing_execution() {
        let mut engine = ArchaeologyEngine::new();
        engine.record_fingerprint(fingerprint("exec-a", "wf-1", 1000, &[]));
        assert!(matches!(
            engine.compare("exec-a", "exec-missing"),
            Err(WorkflowError::ExecutionNotFound(ref id)) if id == "exec-missing"
        ));
    }

    #[test]
    fn compare_flags_only_significant_step_changes() {
        let mut engine = ArchaeologyEngine::new();
        engine.record_fingerprint(fingerprint(
            "exec-a",
            "wf-1",
            1000,
            &[("slowed", 100), ("steady", 100), ("removed", 100)],
        ));
        engine.record_fingerprint(fingerprint(
            "exec-b",
            "wf-1",
            2000,
            &[("slowed", 400), ("steady", 120)],
        ));

        // Match instead of unwrap so the test does not require `WorkflowError: Debug`.
        let comparison = match engine.compare("exec-a", "exec-b") {
            Ok(comparison) => comparison,
            Err(_) => panic!("both executions were recorded"),
        };
        assert_eq!(comparison.duration_ratio, 2.0);
        assert_eq!(comparison.significant_step_diffs.len(), 1);
        let diff = &comparison.significant_step_diffs[0];
        assert_eq!(diff.step_id, "slowed");
        assert_eq!(diff.ratio, 4.0);
    }

    #[test]
    fn bottlenecks_sorted_by_share_of_total() {
        let mut engine = ArchaeologyEngine::new();
        engine.record_fingerprint(fingerprint(
            "exec-a",
            "wf-1",
            200,
            &[("big", 120), ("mid", 50), ("small", 30)],
        ));
        engine.record_fingerprint(fingerprint("exec-other", "wf-2", 5000, &[("elsewhere", 5000)]));

        let ranked: Vec<String> = engine
            .bottlenecks("wf-1")
            .into_iter()
            .map(|b| b.step_id)
            .collect();
        assert_eq!(ranked, ["big", "mid", "small"]);
        assert!(engine.bottlenecks("wf-unknown").is_empty());
    }
}