use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
/// Tunable thresholds for [`RunHealthWatchdog`].
///
/// Every field carries a serde `default`, so fields missing from serialized
/// input fall back to the same values `HealthCheckConfig::default()` uses
/// (`true`, 300 s, 30 min, 60 s).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthCheckConfig {
/// Master on/off switch (default `true`).
/// NOTE(review): `RunHealthWatchdog::check_health` does not read this flag;
/// confirm where, if anywhere, disabling the watchdog is enforced.
#[serde(default = "default_true")]
pub enabled: bool,
/// Seconds a step may run, or go without recorded activity, before the
/// watchdog reports it (default 300). Compared against whole elapsed seconds.
#[serde(default = "default_stuck_threshold")]
pub stuck_threshold_seconds: i64,
/// Minutes a run may stay active, or go without a state change, before the
/// watchdog reports it (default 30). Compared against whole elapsed minutes.
#[serde(default = "default_abandoned_threshold")]
pub abandoned_threshold_minutes: i64,
/// Intended period between health checks, in seconds (default 60).
/// NOTE(review): not read anywhere in this file; presumably consumed by
/// whatever schedules `HealthCheckTask::check` — confirm.
#[serde(default = "default_check_interval")]
pub check_interval_seconds: u64,
}
/// Serde default for `HealthCheckConfig::enabled`.
fn default_true() -> bool {
    true
}

/// Serde default for `HealthCheckConfig::stuck_threshold_seconds` (5 minutes).
fn default_stuck_threshold() -> i64 {
    300
}

/// Serde default for `HealthCheckConfig::abandoned_threshold_minutes`.
fn default_abandoned_threshold() -> i64 {
    30
}

/// Serde default for `HealthCheckConfig::check_interval_seconds`.
fn default_check_interval() -> u64 {
    60
}

impl Default for HealthCheckConfig {
    /// Returns the same values serde fills in for missing fields.
    ///
    /// Built from the `default_*` helpers rather than repeating the literals,
    /// so `HealthCheckConfig::default()` and deserializing an empty object can
    /// never disagree.
    fn default() -> Self {
        Self {
            enabled: default_true(),
            stuck_threshold_seconds: default_stuck_threshold(),
            abandoned_threshold_minutes: default_abandoned_threshold(),
            check_interval_seconds: default_check_interval(),
        }
    }
}
/// Lifecycle state of a run tracked by [`RunHealthWatchdog`].
///
/// Newly registered runs start as `Pending`. Serialized in `snake_case`,
/// e.g. `TimedOut` <-> `"timed_out"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum MonitoredRunState {
Pending,
Running,
Paused,
Completed,
Failed,
Cancelled,
TimedOut,
}
/// The step a monitored run is currently executing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StepExecution {
/// Caller-supplied step identifier.
pub step_id: String,
/// Display name; quoted in health-issue descriptions.
pub step_name: String,
/// When this step was set via `RunHealthWatchdog::update_current_step`.
pub started_at: DateTime<Utc>,
/// Most recent heartbeat: initialised to the same instant as `started_at`
/// and refreshed by `RunHealthWatchdog::record_step_activity`.
pub last_activity: DateTime<Utc>,
/// Caller-reported sub-state of the step.
pub state: StepState,
}
/// Caller-reported sub-state of the current step; serialized in `snake_case`.
///
/// NOTE(review): the health checks in this file store this value but never
/// inspect it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum StepState {
Executing,
Waiting,
Retrying,
Blocked,
}
/// Everything the watchdog tracks about one workflow run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MonitoredRun {
/// Run id; also the key under which the watchdog stores this run.
pub run_id: String,
/// Workflow name supplied at registration.
pub workflow_name: String,
/// Current lifecycle state (starts as `Pending`).
pub state: MonitoredRunState,
/// When the run was registered.
pub started_at: DateTime<Utc>,
/// Last time the run's state or its current step was updated.
pub last_state_change: DateTime<Utc>,
/// Step currently executing, if one has been reported.
pub current_step: Option<StepExecution>,
/// Number of `update_run_state` calls; step updates do not increment it.
pub state_change_count: usize,
/// NOTE(review): initialised to 0 and never incremented in this file.
pub retry_count: u32,
/// Set by `RunHealthWatchdog::mark_orphaned`; defaults to `false` when absent
/// from serialized input.
#[serde(default)]
pub is_orphaned: bool,
/// NOTE(review): initialised to `None` and never written in this file.
pub last_health_check: Option<DateTime<Utc>>,
}
/// A single problem found by `RunHealthWatchdog::check_health`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthIssue {
/// Which check produced the issue.
pub issue_type: IssueType,
/// Run the issue refers to.
pub run_id: String,
/// Human-readable explanation, including the measured duration where relevant.
pub description: String,
/// Clock reading of the health check that found the issue.
pub detected_at: DateTime<Utc>,
/// How urgent the issue is.
pub severity: IssueSeverity,
/// Suggested follow-up.
pub recommended_action: RecommendedAction,
}
/// Category of a [`HealthIssue`]; serialized in `snake_case`.
///
/// `DeadRun` is used for two distinct conditions: an idle current step and a
/// run with no recent state change. Use `severity`/`description` to tell them
/// apart. NOTE(review): `RetryExhaustion` is never produced in this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum IssueType {
StuckStep,
AbandonedRun,
DeadRun,
OrphanedRun,
RetryExhaustion,
}
/// Urgency of a [`HealthIssue`]; serialized lowercase (e.g. `"critical"`).
///
/// Declaration order defines the derived ordering:
/// `Info < Warning < Error < Critical`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, PartialOrd, Ord)]
#[serde(rename_all = "lowercase")]
pub enum IssueSeverity {
Info,
Warning,
Error,
Critical,
}
/// Follow-up the watchdog suggests for a [`HealthIssue`].
///
/// Serialized as an internally tagged object keyed by `type`, e.g.
/// `RecommendedAction::AutoRetry` <-> `{"type":"auto_retry"}`.
///
/// Derives `Copy`, `PartialEq` and `Eq` like the other fieldless enums in this
/// module, so callers can compare actions directly instead of via `matches!`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum RecommendedAction {
    Alert,
    AutoRetry,
    Terminate,
    Escalate,
    CollectDiagnostics,
    None,
}
/// Outcome of one `RunHealthWatchdog::check_health` pass.
#[derive(Debug, Clone)]
pub struct HealthCheckResult {
/// Issues found. Runs are visited in `HashMap` order, so the ordering of
/// issues across different runs is unspecified.
pub issues: Vec<HealthIssue>,
/// Number of registered runs, including runs skipped because of their state.
pub runs_checked: usize,
/// Single clock reading shared by every comparison in the pass.
pub checked_at: DateTime<Utc>,
}
/// In-memory registry of runs plus the thresholds used to judge their health.
pub struct RunHealthWatchdog {
/// Thresholds applied by `check_health`.
config: HealthCheckConfig,
/// Tracked runs, keyed by `run_id`.
runs: HashMap<String, MonitoredRun>,
}
impl RunHealthWatchdog {
pub fn new(config: HealthCheckConfig) -> Self {
Self {
config,
runs: HashMap::new(),
}
}
pub fn register_run(&mut self, run_id: String, workflow_name: String) {
let now = Utc::now();
self.runs.insert(
run_id.clone(),
MonitoredRun {
run_id,
workflow_name,
state: MonitoredRunState::Pending,
started_at: now,
last_state_change: now,
current_step: None,
state_change_count: 0,
retry_count: 0,
is_orphaned: false,
last_health_check: None,
},
);
}
pub fn update_run_state(&mut self, run_id: &str, new_state: MonitoredRunState) {
if let Some(run) = self.runs.get_mut(run_id) {
run.state = new_state;
run.last_state_change = Utc::now();
run.state_change_count += 1;
}
}
pub fn update_current_step(
&mut self,
run_id: &str,
step_id: &str,
step_name: &str,
step_state: StepState,
) {
let now = Utc::now();
if let Some(run) = self.runs.get_mut(run_id) {
run.current_step = Some(StepExecution {
step_id: step_id.to_string(),
step_name: step_name.to_string(),
started_at: now,
last_activity: now,
state: step_state,
});
run.last_state_change = now;
}
}
pub fn record_step_activity(&mut self, run_id: &str) {
if let Some(run) = self.runs.get_mut(run_id) {
if let Some(step) = &mut run.current_step {
step.last_activity = Utc::now();
}
}
}
pub fn mark_orphaned(&mut self, run_id: &str) {
if let Some(run) = self.runs.get_mut(run_id) {
run.is_orphaned = true;
}
}
pub fn check_health(&self) -> HealthCheckResult {
let mut issues = Vec::new();
let now = Utc::now();
for run in self.runs.values() {
match run.state {
MonitoredRunState::Completed
| MonitoredRunState::Failed
| MonitoredRunState::Cancelled => continue,
_ => {}
}
if let Some(step) = &run.current_step {
let step_duration = now.signed_duration_since(step.started_at);
let inactive_duration = now.signed_duration_since(step.last_activity);
if step_duration.num_seconds() > self.config.stuck_threshold_seconds {
issues.push(HealthIssue {
issue_type: IssueType::StuckStep,
run_id: run.run_id.clone(),
description: format!(
"Step '{}' has been executing for {} seconds (threshold: {}s)",
step.step_name,
step_duration.num_seconds(),
self.config.stuck_threshold_seconds
),
detected_at: now,
severity: IssueSeverity::Error,
recommended_action: RecommendedAction::Escalate,
});
}
if inactive_duration.num_seconds() > self.config.stuck_threshold_seconds {
issues.push(HealthIssue {
issue_type: IssueType::DeadRun,
run_id: run.run_id.clone(),
description: format!(
"Step '{}' has had no activity for {} seconds",
step.step_name,
inactive_duration.num_seconds()
),
detected_at: now,
severity: IssueSeverity::Warning,
recommended_action: RecommendedAction::CollectDiagnostics,
});
}
}
let run_duration = now.signed_duration_since(run.started_at);
if run_duration.num_minutes() > self.config.abandoned_threshold_minutes {
issues.push(HealthIssue {
issue_type: IssueType::AbandonedRun,
run_id: run.run_id.clone(),
description: format!(
"Run has been active for {} minutes (threshold: {}m)",
run_duration.num_minutes(),
self.config.abandoned_threshold_minutes
),
detected_at: now,
severity: IssueSeverity::Warning,
recommended_action: RecommendedAction::Alert,
});
}
let since_last_change = now.signed_duration_since(run.last_state_change);
if since_last_change.num_minutes() > self.config.abandoned_threshold_minutes {
issues.push(HealthIssue {
issue_type: IssueType::DeadRun,
run_id: run.run_id.clone(),
description: format!(
"No state changes for {} minutes",
since_last_change.num_minutes()
),
detected_at: now,
severity: IssueSeverity::Error,
recommended_action: RecommendedAction::Terminate,
});
}
if run.is_orphaned {
issues.push(HealthIssue {
issue_type: IssueType::OrphanedRun,
run_id: run.run_id.clone(),
description: "Run process no longer exists".to_string(),
detected_at: now,
severity: IssueSeverity::Critical,
recommended_action: RecommendedAction::Terminate,
});
}
}
HealthCheckResult {
issues,
runs_checked: self.runs.len(),
checked_at: now,
}
}
pub fn get_run(&self, run_id: &str) -> Option<&MonitoredRun> {
self.runs.get(run_id)
}
pub fn get_all_runs(&self) -> &HashMap<String, MonitoredRun> {
&self.runs
}
pub fn remove_run(&mut self, run_id: &str) {
self.runs.remove(run_id);
}
pub fn get_runs_by_state(&self, state: MonitoredRunState) -> Vec<&MonitoredRun> {
self.runs.values().filter(|r| r.state == state).collect()
}
pub fn get_runs_needing_attention(&self) -> Vec<&MonitoredRun> {
let health_result = self.check_health();
let run_ids: std::collections::HashSet<_> =
health_result.issues.iter().map(|i| &i.run_id).collect();
self.runs
.values()
.filter(|r| run_ids.contains(&r.run_id))
.collect()
}
pub fn generate_report(&self) -> String {
let result = self.check_health();
let mut report = String::new();
report.push_str(&format!(
"# Health Check Report\n\n**Checked at:** {}\n\n",
result.checked_at.format("%Y-%m-%d %H:%M:%S UTC")
));
report.push_str(&format!("**Runs monitored:** {}\n\n", result.runs_checked));
if result.issues.is_empty() {
report.push_str("✅ All runs healthy\n");
} else {
report.push_str(&format!(
"⚠️ **Issues found:** {}\n\n",
result.issues.len()
));
let mut critical = vec![];
let mut errors = vec![];
let mut warnings = vec![];
let mut infos = vec![];
for issue in &result.issues {
match issue.severity {
IssueSeverity::Critical => critical.push(issue),
IssueSeverity::Error => errors.push(issue),
IssueSeverity::Warning => warnings.push(issue),
IssueSeverity::Info => infos.push(issue),
}
}
for (severity, issues) in [
("🔴 Critical", critical),
("❌ Errors", errors),
("⚠️ Warnings", warnings),
("ℹ️ Info", infos),
] {
if !issues.is_empty() {
report.push_str(&format!("## {}\n\n", severity));
for issue in issues {
report.push_str(&format!("### {}\n", issue.run_id));
report.push_str(&format!("**Type:** {:?}\n\n", issue.issue_type));
report.push_str(&format!("{}\n\n", issue.description));
report.push_str(&format!(
"**Recommended action:** {:?}\n\n",
issue.recommended_action
));
}
}
}
}
report.push_str("\n## Run States\n\n");
for state in [
MonitoredRunState::Pending,
MonitoredRunState::Running,
MonitoredRunState::Paused,
MonitoredRunState::Completed,
MonitoredRunState::Failed,
MonitoredRunState::Cancelled,
MonitoredRunState::TimedOut,
] {
let count = self.get_runs_by_state(state).len();
report.push_str(&format!("- {:?}: {}\n", state, count));
}
report
}
}
/// Owns a [`RunHealthWatchdog`] and remembers when it was last checked.
pub struct HealthCheckTask {
watchdog: RunHealthWatchdog,
/// Time of the most recent `check` call; `None` until the first one.
/// NOTE(review): not exposed through any accessor in this file.
last_check: Option<DateTime<Utc>>,
}
impl HealthCheckTask {
pub fn new(watchdog: RunHealthWatchdog) -> Self {
Self {
watchdog,
last_check: None,
}
}
pub fn check(&mut self) -> HealthCheckResult {
let result = self.watchdog.check_health();
self.last_check = Some(Utc::now());
result
}
pub fn watchdog(&self) -> &RunHealthWatchdog {
&self.watchdog
}
pub fn watchdog_mut(&mut self) -> &mut RunHealthWatchdog {
&mut self.watchdog
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Duration;

    /// Default configuration with only the two thresholds overridden.
    fn config_with(
        stuck_threshold_seconds: i64,
        abandoned_threshold_minutes: i64,
    ) -> HealthCheckConfig {
        HealthCheckConfig {
            stuck_threshold_seconds,
            abandoned_threshold_minutes,
            ..HealthCheckConfig::default()
        }
    }

    /// True if `result` contains at least one issue of kind `kind`.
    fn has_issue(result: &HealthCheckResult, kind: IssueType) -> bool {
        result.issues.iter().any(|issue| issue.issue_type == kind)
    }

    #[test]
    fn test_watchdog_registration() {
        let mut watchdog = RunHealthWatchdog::new(HealthCheckConfig::default());
        watchdog.register_run("run-1".to_string(), "feature-dev".to_string());

        let run = watchdog.get_run("run-1").expect("run-1 was just registered");
        assert_eq!(run.run_id, "run-1");
        assert_eq!(run.workflow_name, "feature-dev");
        assert_eq!(run.state, MonitoredRunState::Pending);
        assert_eq!(run.state_change_count, 0);
        assert!(run.current_step.is_none());
        assert!(!run.is_orphaned);
    }

    #[test]
    fn test_state_updates_are_counted() {
        let mut watchdog = RunHealthWatchdog::new(HealthCheckConfig::default());
        watchdog.register_run("run-1".to_string(), "feature-dev".to_string());
        watchdog.update_run_state("run-1", MonitoredRunState::Running);

        let run = watchdog.get_run("run-1").expect("run-1 was just registered");
        assert_eq!(run.state, MonitoredRunState::Running);
        assert_eq!(run.state_change_count, 1);
        assert_eq!(watchdog.get_runs_by_state(MonitoredRunState::Running).len(), 1);
    }

    #[test]
    fn test_updates_for_unknown_runs_are_ignored() {
        let mut watchdog = RunHealthWatchdog::new(HealthCheckConfig::default());
        watchdog.update_run_state("missing", MonitoredRunState::Running);
        watchdog.update_current_step("missing", "step-1", "Step", StepState::Executing);
        watchdog.record_step_activity("missing");
        watchdog.mark_orphaned("missing");

        assert!(watchdog.get_run("missing").is_none());
        assert!(watchdog.get_all_runs().is_empty());
    }

    #[test]
    fn test_fresh_run_is_healthy() {
        let mut watchdog = RunHealthWatchdog::new(HealthCheckConfig::default());
        watchdog.register_run("run-1".to_string(), "feature-dev".to_string());

        let result = watchdog.check_health();
        assert!(result.issues.is_empty());
        assert_eq!(result.runs_checked, 1);
    }

    #[test]
    fn test_stuck_step_detection() {
        let mut watchdog = RunHealthWatchdog::new(config_with(10, 30));
        watchdog.register_run("run-1".to_string(), "feature-dev".to_string());

        let long_ago = Utc::now() - Duration::seconds(20);
        watchdog.runs.get_mut("run-1").unwrap().current_step = Some(StepExecution {
            step_id: "step-1".to_string(),
            step_name: "Long Step".to_string(),
            started_at: long_ago,
            last_activity: long_ago,
            state: StepState::Executing,
        });

        let result = watchdog.check_health();
        assert!(has_issue(&result, IssueType::StuckStep));
    }

    #[test]
    fn test_abandoned_run_detection() {
        let mut watchdog = RunHealthWatchdog::new(config_with(300, 1));
        watchdog.register_run("run-1".to_string(), "feature-dev".to_string());
        watchdog.runs.get_mut("run-1").unwrap().started_at = Utc::now() - Duration::minutes(5);

        let result = watchdog.check_health();
        assert!(has_issue(&result, IssueType::AbandonedRun));
    }

    #[test]
    fn test_orphaned_run_detection() {
        let mut watchdog = RunHealthWatchdog::new(HealthCheckConfig::default());
        watchdog.register_run("run-1".to_string(), "feature-dev".to_string());
        watchdog.mark_orphaned("run-1");

        let result = watchdog.check_health();
        assert!(result.issues.iter().any(|issue| {
            issue.issue_type == IssueType::OrphanedRun
                && issue.severity == IssueSeverity::Critical
        }));
    }

    #[test]
    fn test_terminal_runs_are_skipped() {
        for state in [
            MonitoredRunState::Completed,
            MonitoredRunState::Failed,
            MonitoredRunState::Cancelled,
        ] {
            let mut watchdog = RunHealthWatchdog::new(config_with(10, 1));
            watchdog.register_run("run-1".to_string(), "feature-dev".to_string());
            watchdog.runs.get_mut("run-1").unwrap().started_at =
                Utc::now() - Duration::minutes(5);
            watchdog.mark_orphaned("run-1");
            watchdog.update_run_state("run-1", state);

            let result = watchdog.check_health();
            assert!(result.issues.is_empty(), "{:?} run was not skipped", state);
            assert_eq!(result.runs_checked, 1);
        }
    }

    #[test]
    fn test_runs_needing_attention_only_returns_flagged_runs() {
        let mut watchdog = RunHealthWatchdog::new(HealthCheckConfig::default());
        watchdog.register_run("run-1".to_string(), "feature-dev".to_string());
        watchdog.register_run("run-2".to_string(), "bug-fix".to_string());
        watchdog.mark_orphaned("run-2");

        let flagged = watchdog.get_runs_needing_attention();
        assert_eq!(flagged.len(), 1);
        assert_eq!(flagged[0].run_id, "run-2");
    }

    #[test]
    fn test_health_report() {
        let mut watchdog = RunHealthWatchdog::new(HealthCheckConfig::default());
        watchdog.register_run("run-1".to_string(), "feature-dev".to_string());
        watchdog.register_run("run-2".to_string(), "bug-fix".to_string());

        let report = watchdog.generate_report();
        assert!(report.contains("Health Check Report"));
        assert!(report.contains("Runs monitored:** 2"));
        assert!(report.contains("All runs healthy"));
        assert!(report.contains("- Pending: 2"));
    }
}