// agentic_workflow/intelligence/evolution.rs

use std::collections::BTreeMap;

use crate::types::{
    ExecutionFingerprint, HealthIssue, IssueSeverity, WorkflowHealth,
    WorkflowResult,
};
5
6/// Workflow evolution engine — health monitoring and optimization.
7pub struct EvolutionEngine {
8    fingerprints: Vec<ExecutionFingerprint>,
9}
10
11impl EvolutionEngine {
12    pub fn new() -> Self {
13        Self {
14            fingerprints: Vec::new(),
15        }
16    }
17
18    /// Ingest execution data.
19    pub fn ingest(&mut self, fp: ExecutionFingerprint) {
20        self.fingerprints.push(fp);
21    }
22
23    /// Get workflow health score.
24    pub fn health(&self, workflow_id: &str) -> WorkflowResult<WorkflowHealth> {
25        let fps: Vec<&ExecutionFingerprint> = self
26            .fingerprints
27            .iter()
28            .filter(|f| f.workflow_id == workflow_id)
29            .collect();
30
31        if fps.is_empty() {
32            return Ok(WorkflowHealth {
33                workflow_id: workflow_id.to_string(),
34                score: 1.0,
35                success_rate: 1.0,
36                avg_duration_ms: 0,
37                drift_detected: false,
38                issues: Vec::new(),
39            });
40        }
41
42        let success_count = fps.iter().filter(|f| f.retry_count == 0).count();
43        let success_rate = success_count as f64 / fps.len() as f64;
44
45        let avg_duration = fps.iter().map(|f| f.total_duration_ms).sum::<u64>() / fps.len() as u64;
46
47        let drift_detected = self.detect_drift(workflow_id);
48        let mut issues = Vec::new();
49
50        if success_rate < 0.8 {
51            issues.push(HealthIssue {
52                severity: IssueSeverity::Critical,
53                step_id: None,
54                description: format!("Success rate is {:.0}%", success_rate * 100.0),
55                suggestion: "Review failing steps and retry policies".to_string(),
56            });
57        }
58
59        if drift_detected {
60            issues.push(HealthIssue {
61                severity: IssueSeverity::Warning,
62                step_id: None,
63                description: "Performance drift detected".to_string(),
64                suggestion: "Recent executions are taking longer than historical average"
65                    .to_string(),
66            });
67        }
68
69        let score = success_rate * if drift_detected { 0.8 } else { 1.0 };
70
71        Ok(WorkflowHealth {
72            workflow_id: workflow_id.to_string(),
73            score,
74            success_rate,
75            avg_duration_ms: avg_duration,
76            drift_detected,
77            issues,
78        })
79    }
80
81    /// Detect performance drift (recent executions significantly slower).
82    pub fn detect_drift(&self, workflow_id: &str) -> bool {
83        let fps: Vec<&ExecutionFingerprint> = self
84            .fingerprints
85            .iter()
86            .filter(|f| f.workflow_id == workflow_id)
87            .collect();
88
89        if fps.len() < 6 {
90            return false;
91        }
92
93        let split = fps.len() / 2;
94        let old_avg: f64 = fps[..split]
95            .iter()
96            .map(|f| f.total_duration_ms as f64)
97            .sum::<f64>()
98            / split as f64;
99
100        let new_avg: f64 = fps[split..]
101            .iter()
102            .map(|f| f.total_duration_ms as f64)
103            .sum::<f64>()
104            / (fps.len() - split) as f64;
105
106        // Drift if recent is >50% slower
107        new_avg > old_avg * 1.5
108    }
109
110    /// Suggest optimizations.
111    pub fn suggest_optimizations(&self, workflow_id: &str) -> Vec<String> {
112        let mut suggestions = Vec::new();
113
114        let fps: Vec<&ExecutionFingerprint> = self
115            .fingerprints
116            .iter()
117            .filter(|f| f.workflow_id == workflow_id)
118            .collect();
119
120        if fps.is_empty() {
121            return suggestions;
122        }
123
124        let avg_retries: f64 =
125            fps.iter().map(|f| f.retry_count as f64).sum::<f64>() / fps.len() as f64;
126
127        if avg_retries > 2.0 {
128            suggestions.push(format!(
129                "Average retry count is {:.1} — consider optimizing retry policies",
130                avg_retries
131            ));
132        }
133
134        if self.detect_drift(workflow_id) {
135            suggestions.push(
136                "Performance is drifting upward — investigate recent step duration increases"
137                    .to_string(),
138            );
139        }
140
141        suggestions
142    }
143
144    /// Identify potentially outdated steps.
145    pub fn outdated_steps(&self, workflow_id: &str) -> Vec<String> {
146        let fps: Vec<&ExecutionFingerprint> = self
147            .fingerprints
148            .iter()
149            .filter(|f| f.workflow_id == workflow_id)
150            .collect();
151
152        if fps.len() < 5 {
153            return Vec::new();
154        }
155
156        // Find steps with increasing failure rates
157        let recent = &fps[fps.len().saturating_sub(5)..];
158        let mut step_fail_rates: std::collections::HashMap<&str, usize> =
159            std::collections::HashMap::new();
160
161        for fp in recent {
162            for (sid, outcome) in &fp.step_outcomes {
163                if *outcome == crate::types::StepLifecycle::Failed {
164                    *step_fail_rates.entry(sid.as_str()).or_insert(0) += 1;
165                }
166            }
167        }
168
169        step_fail_rates
170            .into_iter()
171            .filter(|(_, fails)| *fails >= 3)
172            .map(|(sid, _)| sid.to_string())
173            .collect()
174    }
175}
176
177impl Default for EvolutionEngine {
178    fn default() -> Self {
179        Self::new()
180    }
181}