Skip to main content

enact_core/workflow/
health.rs

1//! Run Health Watchdog - Monitor and detect stuck/abandoned runs
2//!
3//! Implements Antfarm-style medic checks:
4//! - Stuck running steps
5//! - Abandoned claims
6//! - Dead runs with no state movement
7//! - Orphan scheduler jobs
8
9use chrono::{DateTime, Utc};
10use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12
/// Health check configuration for [`RunHealthWatchdog`].
///
/// Every field has a serde default, so a partially specified (or empty)
/// config object still deserializes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthCheckConfig {
    /// Whether health checks are enabled (serde default: `true`).
    ///
    /// NOTE(review): `RunHealthWatchdog::check_health` does not read this flag;
    /// presumably the code that schedules checks consults it — confirm.
    #[serde(default = "default_true")]
    pub enabled: bool,
    /// Threshold for considering a step stuck, in seconds (serde default: 300).
    ///
    /// Applied both to how long the current step has been running and to how
    /// long it has gone without reported activity.
    #[serde(default = "default_stuck_threshold")]
    pub stuck_threshold_seconds: i64,
    /// Threshold for considering a run abandoned, in minutes (serde default: 30).
    ///
    /// Applied both to the run's total age and to the time since its last
    /// state change.
    #[serde(default = "default_abandoned_threshold")]
    pub abandoned_threshold_minutes: i64,
    /// Check interval, in seconds (serde default: 60).
    ///
    /// NOTE(review): not read by the watchdog in this file; presumably used by
    /// an external scheduler — confirm.
    #[serde(default = "default_check_interval")]
    pub check_interval_seconds: u64,
}
29
/// Serde default for `HealthCheckConfig::enabled`: checks are on.
fn default_true() -> bool {
    true
}

/// Serde default for `HealthCheckConfig::stuck_threshold_seconds`: five minutes.
fn default_stuck_threshold() -> i64 {
    5 * 60
}

/// Serde default for `HealthCheckConfig::abandoned_threshold_minutes`: half an hour.
fn default_abandoned_threshold() -> i64 {
    30
}

/// Serde default for `HealthCheckConfig::check_interval_seconds`: one minute.
fn default_check_interval() -> u64 {
    60
}
45
46impl Default for HealthCheckConfig {
47    fn default() -> Self {
48        Self {
49            enabled: true,
50            stuck_threshold_seconds: 300,
51            abandoned_threshold_minutes: 30,
52            check_interval_seconds: 60,
53        }
54    }
55}
56
/// Lifecycle state of a monitored run.
///
/// Serialized in `snake_case` (e.g. `TimedOut` becomes `"timed_out"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum MonitoredRunState {
    /// Run is pending/queued (the state assigned on registration).
    Pending,
    /// Run is currently executing.
    Running,
    /// Run is paused/waiting.
    Paused,
    /// Run completed successfully.
    Completed,
    /// Run failed.
    Failed,
    /// Run was cancelled.
    Cancelled,
    /// Run timed out.
    TimedOut,
}
76
/// Information about the step a run is currently executing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StepExecution {
    /// Step ID
    pub step_id: String,
    /// Human-readable step name; quoted in health issue descriptions.
    pub step_name: String,
    /// When the watchdog was told this step started.
    pub started_at: DateTime<Utc>,
    /// Last activity timestamp; initially equal to `started_at` and refreshed
    /// by `RunHealthWatchdog::record_step_activity`.
    pub last_activity: DateTime<Utc>,
    /// Step state
    pub state: StepState,
}
91
/// Step execution state (serialized in `snake_case`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum StepState {
    /// Step is executing
    Executing,
    /// Step is waiting (e.g., for user input)
    Waiting,
    /// Step is in retry backoff
    Retrying,
    /// Step is blocked
    Blocked,
}
105
/// Everything the watchdog tracks about a single run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MonitoredRun {
    /// Run ID (also the key in the watchdog's run map).
    pub run_id: String,
    /// Workflow name
    pub workflow_name: String,
    /// Current state
    pub state: MonitoredRunState,
    /// When the run was registered with the watchdog.
    pub started_at: DateTime<Utc>,
    /// Last state change; refreshed by `update_run_state` and by
    /// `update_current_step`.
    pub last_state_change: DateTime<Utc>,
    /// Current step, if one has been reported.
    pub current_step: Option<StepExecution>,
    /// Number of `update_run_state` calls for this run (step updates are not counted).
    pub state_change_count: usize,
    /// Retry count.
    ///
    /// NOTE(review): initialized to 0 and never incremented in this file —
    /// confirm who maintains it.
    pub retry_count: u32,
    /// Whether the run is orphaned (no active process); set by `mark_orphaned`.
    /// Defaults to `false` when absent from serialized data.
    #[serde(default)]
    pub is_orphaned: bool,
    /// Last health check.
    ///
    /// NOTE(review): initialized to `None` and never updated in this file
    /// (`check_health` takes `&self`) — confirm whether anything sets it.
    pub last_health_check: Option<DateTime<Utc>>,
}
131
/// A single problem detected for a run during a health check.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthIssue {
    /// Issue type
    pub issue_type: IssueType,
    /// ID of the affected run.
    pub run_id: String,
    /// Human-readable description, including the measured duration.
    pub description: String,
    /// Timestamp of the health check that found the issue.
    pub detected_at: DateTime<Utc>,
    /// Severity
    pub severity: IssueSeverity,
    /// Recommended action
    pub recommended_action: RecommendedAction,
}
148
/// Types of health issues (serialized in `snake_case`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum IssueType {
    /// The current step has been running longer than the stuck threshold.
    StuckStep,
    /// The run has been active longer than the abandoned threshold.
    AbandonedRun,
    /// No movement: either the run's state or its current step's activity
    /// has not changed within the configured threshold.
    DeadRun,
    /// The run was marked orphaned (no active process).
    OrphanedRun,
    /// Step retry exhaustion.
    ///
    /// NOTE(review): not produced by the health checks in this file.
    RetryExhaustion,
}
164
/// Issue severity, serialized in lowercase.
///
/// Variants are declared from least to most severe. The derived `Ord`
/// follows declaration order, so `Info < Warning < Error < Critical`.
/// Do not reorder them.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, PartialOrd, Ord)]
#[serde(rename_all = "lowercase")]
pub enum IssueSeverity {
    /// Informational only.
    Info,
    /// Worth a look.
    Warning,
    /// Something is wrong with the run.
    Error,
    /// Most severe level.
    Critical,
}
174
175/// Recommended action for an issue
176#[derive(Debug, Clone, Serialize, Deserialize)]
177#[serde(tag = "type", rename_all = "snake_case")]
178pub enum RecommendedAction {
179    /// Alert operator
180    Alert,
181    /// Auto-retry
182    AutoRetry,
183    /// Terminate run
184    Terminate,
185    /// Escalate to human
186    Escalate,
187    /// Collect diagnostics
188    CollectDiagnostics,
189    /// No action needed
190    None,
191}
192
/// Outcome of one `RunHealthWatchdog::check_health` pass.
#[derive(Debug, Clone)]
pub struct HealthCheckResult {
    /// Issues found, possibly several per run.
    pub issues: Vec<HealthIssue>,
    /// Number of runs registered with the watchdog at check time. This
    /// includes runs whose state caused them to be skipped.
    pub runs_checked: usize,
    /// When the check ran; also used as `detected_at` for every issue.
    pub checked_at: DateTime<Utc>,
}
203
/// Run health watchdog: an in-memory registry of runs, keyed by run ID,
/// plus the health checks performed over them.
pub struct RunHealthWatchdog {
    /// Thresholds used by the health checks.
    config: HealthCheckConfig,
    /// Monitored runs, keyed by `MonitoredRun::run_id`.
    runs: HashMap<String, MonitoredRun>,
}
209
210impl RunHealthWatchdog {
211    /// Create a new watchdog
212    pub fn new(config: HealthCheckConfig) -> Self {
213        Self {
214            config,
215            runs: HashMap::new(),
216        }
217    }
218
219    /// Register a run for monitoring
220    pub fn register_run(&mut self, run_id: String, workflow_name: String) {
221        let now = Utc::now();
222        self.runs.insert(
223            run_id.clone(),
224            MonitoredRun {
225                run_id,
226                workflow_name,
227                state: MonitoredRunState::Pending,
228                started_at: now,
229                last_state_change: now,
230                current_step: None,
231                state_change_count: 0,
232                retry_count: 0,
233                is_orphaned: false,
234                last_health_check: None,
235            },
236        );
237    }
238
239    /// Update run state
240    pub fn update_run_state(&mut self, run_id: &str, new_state: MonitoredRunState) {
241        if let Some(run) = self.runs.get_mut(run_id) {
242            run.state = new_state;
243            run.last_state_change = Utc::now();
244            run.state_change_count += 1;
245        }
246    }
247
248    /// Update current step
249    pub fn update_current_step(
250        &mut self,
251        run_id: &str,
252        step_id: &str,
253        step_name: &str,
254        step_state: StepState,
255    ) {
256        let now = Utc::now();
257        if let Some(run) = self.runs.get_mut(run_id) {
258            run.current_step = Some(StepExecution {
259                step_id: step_id.to_string(),
260                step_name: step_name.to_string(),
261                started_at: now,
262                last_activity: now,
263                state: step_state,
264            });
265            run.last_state_change = now;
266        }
267    }
268
269    /// Record step activity
270    pub fn record_step_activity(&mut self, run_id: &str) {
271        if let Some(run) = self.runs.get_mut(run_id) {
272            if let Some(step) = &mut run.current_step {
273                step.last_activity = Utc::now();
274            }
275        }
276    }
277
278    /// Mark run as orphaned
279    pub fn mark_orphaned(&mut self, run_id: &str) {
280        if let Some(run) = self.runs.get_mut(run_id) {
281            run.is_orphaned = true;
282        }
283    }
284
285    /// Perform health checks on all runs
286    pub fn check_health(&self) -> HealthCheckResult {
287        let mut issues = Vec::new();
288        let now = Utc::now();
289
290        for run in self.runs.values() {
291            // Skip completed/failed/cancelled runs
292            match run.state {
293                MonitoredRunState::Completed
294                | MonitoredRunState::Failed
295                | MonitoredRunState::Cancelled => continue,
296                _ => {}
297            }
298
299            // Check for stuck steps
300            if let Some(step) = &run.current_step {
301                let step_duration = now.signed_duration_since(step.started_at);
302                let inactive_duration = now.signed_duration_since(step.last_activity);
303
304                // Check if step has been running too long
305                if step_duration.num_seconds() > self.config.stuck_threshold_seconds {
306                    issues.push(HealthIssue {
307                        issue_type: IssueType::StuckStep,
308                        run_id: run.run_id.clone(),
309                        description: format!(
310                            "Step '{}' has been executing for {} seconds (threshold: {}s)",
311                            step.step_name,
312                            step_duration.num_seconds(),
313                            self.config.stuck_threshold_seconds
314                        ),
315                        detected_at: now,
316                        severity: IssueSeverity::Error,
317                        recommended_action: RecommendedAction::Escalate,
318                    });
319                }
320
321                // Check for inactive steps (no activity)
322                if inactive_duration.num_seconds() > self.config.stuck_threshold_seconds {
323                    issues.push(HealthIssue {
324                        issue_type: IssueType::DeadRun,
325                        run_id: run.run_id.clone(),
326                        description: format!(
327                            "Step '{}' has had no activity for {} seconds",
328                            step.step_name,
329                            inactive_duration.num_seconds()
330                        ),
331                        detected_at: now,
332                        severity: IssueSeverity::Warning,
333                        recommended_action: RecommendedAction::CollectDiagnostics,
334                    });
335                }
336            }
337
338            // Check for abandoned runs
339            let run_duration = now.signed_duration_since(run.started_at);
340            if run_duration.num_minutes() > self.config.abandoned_threshold_minutes {
341                issues.push(HealthIssue {
342                    issue_type: IssueType::AbandonedRun,
343                    run_id: run.run_id.clone(),
344                    description: format!(
345                        "Run has been active for {} minutes (threshold: {}m)",
346                        run_duration.num_minutes(),
347                        self.config.abandoned_threshold_minutes
348                    ),
349                    detected_at: now,
350                    severity: IssueSeverity::Warning,
351                    recommended_action: RecommendedAction::Alert,
352                });
353            }
354
355            // Check for dead runs (no state changes)
356            let since_last_change = now.signed_duration_since(run.last_state_change);
357            if since_last_change.num_minutes() > self.config.abandoned_threshold_minutes {
358                issues.push(HealthIssue {
359                    issue_type: IssueType::DeadRun,
360                    run_id: run.run_id.clone(),
361                    description: format!(
362                        "No state changes for {} minutes",
363                        since_last_change.num_minutes()
364                    ),
365                    detected_at: now,
366                    severity: IssueSeverity::Error,
367                    recommended_action: RecommendedAction::Terminate,
368                });
369            }
370
371            // Check for orphaned runs
372            if run.is_orphaned {
373                issues.push(HealthIssue {
374                    issue_type: IssueType::OrphanedRun,
375                    run_id: run.run_id.clone(),
376                    description: "Run process no longer exists".to_string(),
377                    detected_at: now,
378                    severity: IssueSeverity::Critical,
379                    recommended_action: RecommendedAction::Terminate,
380                });
381            }
382        }
383
384        HealthCheckResult {
385            issues,
386            runs_checked: self.runs.len(),
387            checked_at: now,
388        }
389    }
390
391    /// Get a specific run
392    pub fn get_run(&self, run_id: &str) -> Option<&MonitoredRun> {
393        self.runs.get(run_id)
394    }
395
396    /// Get all monitored runs
397    pub fn get_all_runs(&self) -> &HashMap<String, MonitoredRun> {
398        &self.runs
399    }
400
401    /// Remove a run from monitoring
402    pub fn remove_run(&mut self, run_id: &str) {
403        self.runs.remove(run_id);
404    }
405
406    /// Get runs by state
407    pub fn get_runs_by_state(&self, state: MonitoredRunState) -> Vec<&MonitoredRun> {
408        self.runs.values().filter(|r| r.state == state).collect()
409    }
410
411    /// Get runs that need attention (have issues)
412    pub fn get_runs_needing_attention(&self) -> Vec<&MonitoredRun> {
413        let health_result = self.check_health();
414        let run_ids: std::collections::HashSet<_> =
415            health_result.issues.iter().map(|i| &i.run_id).collect();
416
417        self.runs
418            .values()
419            .filter(|r| run_ids.contains(&r.run_id))
420            .collect()
421    }
422
423    /// Generate health report
424    pub fn generate_report(&self) -> String {
425        let result = self.check_health();
426        let mut report = String::new();
427
428        report.push_str(&format!(
429            "# Health Check Report\n\n**Checked at:** {}\n\n",
430            result.checked_at.format("%Y-%m-%d %H:%M:%S UTC")
431        ));
432
433        report.push_str(&format!("**Runs monitored:** {}\n\n", result.runs_checked));
434
435        if result.issues.is_empty() {
436            report.push_str("✅ All runs healthy\n");
437        } else {
438            report.push_str(&format!(
439                "âš ī¸  **Issues found:** {}\n\n",
440                result.issues.len()
441            ));
442
443            // Group by severity
444            let mut critical = vec![];
445            let mut errors = vec![];
446            let mut warnings = vec![];
447            let mut infos = vec![];
448
449            for issue in &result.issues {
450                match issue.severity {
451                    IssueSeverity::Critical => critical.push(issue),
452                    IssueSeverity::Error => errors.push(issue),
453                    IssueSeverity::Warning => warnings.push(issue),
454                    IssueSeverity::Info => infos.push(issue),
455                }
456            }
457
458            for (severity, issues) in [
459                ("🔴 Critical", critical),
460                ("❌ Errors", errors),
461                ("âš ī¸  Warnings", warnings),
462                ("â„šī¸  Info", infos),
463            ] {
464                if !issues.is_empty() {
465                    report.push_str(&format!("## {}\n\n", severity));
466                    for issue in issues {
467                        report.push_str(&format!("### {}\n", issue.run_id));
468                        report.push_str(&format!("**Type:** {:?}\n\n", issue.issue_type));
469                        report.push_str(&format!("{}\n\n", issue.description));
470                        report.push_str(&format!(
471                            "**Recommended action:** {:?}\n\n",
472                            issue.recommended_action
473                        ));
474                    }
475                }
476            }
477        }
478
479        // Summary by state
480        report.push_str("\n## Run States\n\n");
481        for state in [
482            MonitoredRunState::Pending,
483            MonitoredRunState::Running,
484            MonitoredRunState::Paused,
485            MonitoredRunState::Completed,
486            MonitoredRunState::Failed,
487            MonitoredRunState::Cancelled,
488            MonitoredRunState::TimedOut,
489        ] {
490            let count = self.get_runs_by_state(state).len();
491            report.push_str(&format!("- {:?}: {}\n", state, count));
492        }
493
494        report
495    }
496}
497
498/// Background health check task
499pub struct HealthCheckTask {
500    watchdog: RunHealthWatchdog,
501    last_check: Option<DateTime<Utc>>,
502}
503
504impl HealthCheckTask {
505    /// Create a new health check task
506    pub fn new(watchdog: RunHealthWatchdog) -> Self {
507        Self {
508            watchdog,
509            last_check: None,
510        }
511    }
512
513    /// Run a single health check
514    pub fn check(&mut self) -> HealthCheckResult {
515        let result = self.watchdog.check_health();
516        self.last_check = Some(Utc::now());
517        result
518    }
519
520    /// Get the watchdog
521    pub fn watchdog(&self) -> &RunHealthWatchdog {
522        &self.watchdog
523    }
524
525    /// Get mutable watchdog
526    pub fn watchdog_mut(&mut self) -> &mut RunHealthWatchdog {
527        &mut self.watchdog
528    }
529}
530
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Duration;

    /// Watchdog built from `config` with a single pending run, `run-1`.
    fn watchdog_with_run(config: HealthCheckConfig) -> RunHealthWatchdog {
        let mut watchdog = RunHealthWatchdog::new(config);
        watchdog.register_run("run-1".to_string(), "feature-dev".to_string());
        watchdog
    }

    /// True when `result` contains at least one issue of type `kind`.
    fn has_issue(result: &HealthCheckResult, kind: IssueType) -> bool {
        result.issues.iter().any(|issue| issue.issue_type == kind)
    }

    #[test]
    fn test_watchdog_registration() {
        let watchdog = watchdog_with_run(HealthCheckConfig::default());

        let run = watchdog.get_run("run-1").expect("run-1 was just registered");
        assert_eq!(run.run_id, "run-1");
        assert_eq!(run.workflow_name, "feature-dev");
        assert_eq!(run.state, MonitoredRunState::Pending);
    }

    #[test]
    fn test_stuck_step_detection() {
        let mut watchdog = watchdog_with_run(HealthCheckConfig {
            enabled: true,
            stuck_threshold_seconds: 10,
            abandoned_threshold_minutes: 30,
            check_interval_seconds: 60,
        });

        // The step started (and last reported activity) well past the 10s threshold.
        let twenty_seconds_ago = Utc::now() - Duration::seconds(20);
        let run = watchdog.runs.get_mut("run-1").unwrap();
        run.current_step = Some(StepExecution {
            step_id: "step-1".to_string(),
            step_name: "Long Step".to_string(),
            started_at: twenty_seconds_ago,
            last_activity: twenty_seconds_ago,
            state: StepState::Executing,
        });

        let result = watchdog.check_health();
        assert!(!result.issues.is_empty());
        assert!(has_issue(&result, IssueType::StuckStep));
    }

    #[test]
    fn test_abandoned_run_detection() {
        let mut watchdog = watchdog_with_run(HealthCheckConfig {
            enabled: true,
            stuck_threshold_seconds: 300,
            abandoned_threshold_minutes: 1, // Very short for testing
            check_interval_seconds: 60,
        });

        // Backdate the run's start well past the one-minute threshold.
        watchdog.runs.get_mut("run-1").unwrap().started_at = Utc::now() - Duration::minutes(5);

        let result = watchdog.check_health();
        assert!(!result.issues.is_empty());
        assert!(has_issue(&result, IssueType::AbandonedRun));
    }

    #[test]
    fn test_orphaned_run_detection() {
        let mut watchdog = watchdog_with_run(HealthCheckConfig::default());
        watchdog.mark_orphaned("run-1");

        let result = watchdog.check_health();
        assert!(!result.issues.is_empty());
        assert!(has_issue(&result, IssueType::OrphanedRun));
    }

    #[test]
    fn test_health_report() {
        let mut watchdog = watchdog_with_run(HealthCheckConfig::default());
        watchdog.register_run("run-2".to_string(), "bug-fix".to_string());

        let report = watchdog.generate_report();
        assert!(report.contains("Health Check Report"));
        assert!(report.contains("Runs monitored:** 2"));
    }
}