agentic_workflow/intelligence/
archaeology.rs1use std::collections::HashMap;
2
3use crate::types::{
4 ExecutionFingerprint, WorkflowError, WorkflowResult,
5};
6
7pub struct ArchaeologyEngine {
9 fingerprints: Vec<ExecutionFingerprint>,
10}
11
12impl ArchaeologyEngine {
13 pub fn new() -> Self {
14 Self {
15 fingerprints: Vec::new(),
16 }
17 }
18
19 pub fn record_fingerprint(&mut self, fp: ExecutionFingerprint) {
21 self.fingerprints.push(fp);
22 }
23
24 pub fn compare(
26 &self,
27 exec_a: &str,
28 exec_b: &str,
29 ) -> WorkflowResult<ExecutionComparison> {
30 let fp_a = self
31 .fingerprints
32 .iter()
33 .find(|f| f.execution_id == exec_a)
34 .ok_or_else(|| WorkflowError::ExecutionNotFound(exec_a.to_string()))?;
35
36 let fp_b = self
37 .fingerprints
38 .iter()
39 .find(|f| f.execution_id == exec_b)
40 .ok_or_else(|| WorkflowError::ExecutionNotFound(exec_b.to_string()))?;
41
42 let mut step_diffs = Vec::new();
43 for (step_id, dur_a) in &fp_a.step_durations {
44 if let Some(dur_b) = fp_b.step_durations.get(step_id) {
45 let ratio = *dur_b as f64 / *dur_a as f64;
46 if ratio > 1.5 || ratio < 0.5 {
47 step_diffs.push(StepDiff {
48 step_id: step_id.clone(),
49 duration_a_ms: *dur_a,
50 duration_b_ms: *dur_b,
51 ratio,
52 });
53 }
54 }
55 }
56
57 Ok(ExecutionComparison {
58 execution_a: exec_a.to_string(),
59 execution_b: exec_b.to_string(),
60 duration_a_ms: fp_a.total_duration_ms,
61 duration_b_ms: fp_b.total_duration_ms,
62 duration_ratio: fp_b.total_duration_ms as f64 / fp_a.total_duration_ms.max(1) as f64,
63 significant_step_diffs: step_diffs,
64 })
65 }
66
67 pub fn detect_anomalies(&self, workflow_id: &str) -> Vec<Anomaly> {
69 let wf_fps: Vec<&ExecutionFingerprint> = self
70 .fingerprints
71 .iter()
72 .filter(|f| f.workflow_id == workflow_id)
73 .collect();
74
75 if wf_fps.len() < 3 {
76 return Vec::new();
77 }
78
79 let avg_duration: f64 =
80 wf_fps.iter().map(|f| f.total_duration_ms as f64).sum::<f64>() / wf_fps.len() as f64;
81
82 let mut anomalies = Vec::new();
83 for fp in &wf_fps {
84 let ratio = fp.total_duration_ms as f64 / avg_duration;
85 if ratio > 3.0 || ratio < 0.1 {
86 anomalies.push(Anomaly {
87 execution_id: fp.execution_id.clone(),
88 metric: "duration".to_string(),
89 actual: fp.total_duration_ms as f64,
90 expected: avg_duration,
91 deviation_factor: ratio,
92 });
93 }
94 }
95
96 anomalies
97 }
98
99 pub fn bottlenecks(&self, workflow_id: &str) -> Vec<Bottleneck> {
101 let wf_fps: Vec<&ExecutionFingerprint> = self
102 .fingerprints
103 .iter()
104 .filter(|f| f.workflow_id == workflow_id)
105 .collect();
106
107 if wf_fps.is_empty() {
108 return Vec::new();
109 }
110
111 let mut step_totals: HashMap<&str, (u64, usize)> = HashMap::new();
113 let mut total_workflow_time: u64 = 0;
114
115 for fp in &wf_fps {
116 total_workflow_time += fp.total_duration_ms;
117 for (step_id, dur) in &fp.step_durations {
118 let entry = step_totals.entry(step_id.as_str()).or_insert((0, 0));
119 entry.0 += dur;
120 entry.1 += 1;
121 }
122 }
123
124 let mut bottlenecks: Vec<Bottleneck> = step_totals
125 .into_iter()
126 .map(|(step_id, (total, count))| {
127 let avg = total as f64 / count as f64;
128 let pct = if total_workflow_time > 0 {
129 (total as f64 / total_workflow_time as f64) * 100.0
130 } else {
131 0.0
132 };
133 Bottleneck {
134 step_id: step_id.to_string(),
135 avg_duration_ms: avg as u64,
136 percent_of_total: pct,
137 }
138 })
139 .collect();
140
141 bottlenecks.sort_by(|a, b| b.percent_of_total.partial_cmp(&a.percent_of_total).unwrap());
142 bottlenecks
143 }
144
145 pub fn get_fingerprints(&self, workflow_id: &str) -> Vec<&ExecutionFingerprint> {
147 self.fingerprints
148 .iter()
149 .filter(|f| f.workflow_id == workflow_id)
150 .collect()
151 }
152}
153
154impl Default for ArchaeologyEngine {
155 fn default() -> Self {
156 Self::new()
157 }
158}
159
160#[derive(Debug, Clone, serde::Serialize)]
162pub struct ExecutionComparison {
163 pub execution_a: String,
164 pub execution_b: String,
165 pub duration_a_ms: u64,
166 pub duration_b_ms: u64,
167 pub duration_ratio: f64,
168 pub significant_step_diffs: Vec<StepDiff>,
169}
170
171#[derive(Debug, Clone, serde::Serialize)]
172pub struct StepDiff {
173 pub step_id: String,
174 pub duration_a_ms: u64,
175 pub duration_b_ms: u64,
176 pub ratio: f64,
177}
178
179#[derive(Debug, Clone, serde::Serialize)]
181pub struct Anomaly {
182 pub execution_id: String,
183 pub metric: String,
184 pub actual: f64,
185 pub expected: f64,
186 pub deviation_factor: f64,
187}
188
189#[derive(Debug, Clone, serde::Serialize)]
191pub struct Bottleneck {
192 pub step_id: String,
193 pub avg_duration_ms: u64,
194 pub percent_of_total: f64,
195}
196
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Utc;

    /// Builds a step-less fingerprint of workflow `wf-1` with the given id
    /// and total duration.
    fn fingerprint(execution_id: &str, total_duration_ms: u64) -> ExecutionFingerprint {
        ExecutionFingerprint {
            execution_id: execution_id.to_string(),
            workflow_id: "wf-1".to_string(),
            total_duration_ms,
            step_durations: HashMap::new(),
            step_outcomes: HashMap::new(),
            retry_count: 0,
            completed_at: Utc::now(),
        }
    }

    #[test]
    fn test_anomaly_detection() {
        let mut engine = ArchaeologyEngine::new();

        for i in 0..5 {
            engine.record_fingerprint(fingerprint(&format!("exec-{}", i), 1000));
        }
        engine.record_fingerprint(fingerprint("exec-outlier", 100_000));

        let anomalies = engine.detect_anomalies("wf-1");
        assert!(!anomalies.is_empty());

        // The slow run itself must be among the reported anomalies, not
        // merely "something".
        let outlier = anomalies
            .iter()
            .find(|a| a.execution_id == "exec-outlier")
            .expect("the 100x slower execution should be flagged");
        assert_eq!(outlier.metric, "duration");
        assert_eq!(outlier.actual, 100_000.0);
        assert!(outlier.deviation_factor > 3.0);
    }

    #[test]
    fn test_too_little_history_yields_no_anomalies() {
        let mut engine = ArchaeologyEngine::new();
        engine.record_fingerprint(fingerprint("exec-0", 1000));
        engine.record_fingerprint(fingerprint("exec-1", 100_000));

        // Fewer than three executions of the workflow: nothing to compare against.
        assert!(engine.detect_anomalies("wf-1").is_empty());
        // Unknown workflow: no history at all.
        assert!(engine.detect_anomalies("wf-unknown").is_empty());
    }

    #[test]
    fn test_compare_unknown_execution_is_an_error() {
        let mut engine = ArchaeologyEngine::new();
        engine.record_fingerprint(fingerprint("exec-0", 1000));

        assert!(engine.compare("exec-0", "exec-missing").is_err());
        assert!(engine.compare("exec-missing", "exec-0").is_err());
    }
}