1use chrono::{DateTime, Utc};
10use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12
13#[derive(Debug, Clone, Serialize, Deserialize)]
15pub struct HealthCheckConfig {
16 #[serde(default = "default_true")]
18 pub enabled: bool,
19 #[serde(default = "default_stuck_threshold")]
21 pub stuck_threshold_seconds: i64,
22 #[serde(default = "default_abandoned_threshold")]
24 pub abandoned_threshold_minutes: i64,
25 #[serde(default = "default_check_interval")]
27 pub check_interval_seconds: u64,
28}
29
/// Serde default for `HealthCheckConfig::enabled` (`true`).
fn default_true() -> bool {
    true
}
33
/// Serde default for `HealthCheckConfig::stuck_threshold_seconds`:
/// five minutes, expressed in seconds.
fn default_stuck_threshold() -> i64 {
    const FIVE_MINUTES_IN_SECS: i64 = 5 * 60;
    FIVE_MINUTES_IN_SECS
}
37
/// Serde default for `HealthCheckConfig::abandoned_threshold_minutes`:
/// half an hour, expressed in minutes.
fn default_abandoned_threshold() -> i64 {
    const HALF_HOUR_IN_MINS: i64 = 30;
    HALF_HOUR_IN_MINS
}
41
/// Serde default for `HealthCheckConfig::check_interval_seconds`:
/// one minute, expressed in seconds.
fn default_check_interval() -> u64 {
    const ONE_MINUTE_IN_SECS: u64 = 60;
    ONE_MINUTE_IN_SECS
}
45
46impl Default for HealthCheckConfig {
47 fn default() -> Self {
48 Self {
49 enabled: true,
50 stuck_threshold_seconds: 300,
51 abandoned_threshold_minutes: 30,
52 check_interval_seconds: 60,
53 }
54 }
55}
56
57#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
59#[serde(rename_all = "snake_case")]
60pub enum MonitoredRunState {
61 Pending,
63 Running,
65 Paused,
67 Completed,
69 Failed,
71 Cancelled,
73 TimedOut,
75}
76
77#[derive(Debug, Clone, Serialize, Deserialize)]
79pub struct StepExecution {
80 pub step_id: String,
82 pub step_name: String,
84 pub started_at: DateTime<Utc>,
86 pub last_activity: DateTime<Utc>,
88 pub state: StepState,
90}
91
92#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
94#[serde(rename_all = "snake_case")]
95pub enum StepState {
96 Executing,
98 Waiting,
100 Retrying,
102 Blocked,
104}
105
106#[derive(Debug, Clone, Serialize, Deserialize)]
108pub struct MonitoredRun {
109 pub run_id: String,
111 pub workflow_name: String,
113 pub state: MonitoredRunState,
115 pub started_at: DateTime<Utc>,
117 pub last_state_change: DateTime<Utc>,
119 pub current_step: Option<StepExecution>,
121 pub state_change_count: usize,
123 pub retry_count: u32,
125 #[serde(default)]
127 pub is_orphaned: bool,
128 pub last_health_check: Option<DateTime<Utc>>,
130}
131
132#[derive(Debug, Clone, Serialize, Deserialize)]
134pub struct HealthIssue {
135 pub issue_type: IssueType,
137 pub run_id: String,
139 pub description: String,
141 pub detected_at: DateTime<Utc>,
143 pub severity: IssueSeverity,
145 pub recommended_action: RecommendedAction,
147}
148
149#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
151#[serde(rename_all = "snake_case")]
152pub enum IssueType {
153 StuckStep,
155 AbandonedRun,
157 DeadRun,
159 OrphanedRun,
161 RetryExhaustion,
163}
164
165#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, PartialOrd, Ord)]
167#[serde(rename_all = "lowercase")]
168pub enum IssueSeverity {
169 Info,
170 Warning,
171 Error,
172 Critical,
173}
174
175#[derive(Debug, Clone, Serialize, Deserialize)]
177#[serde(tag = "type", rename_all = "snake_case")]
178pub enum RecommendedAction {
179 Alert,
181 AutoRetry,
183 Terminate,
185 Escalate,
187 CollectDiagnostics,
189 None,
191}
192
193#[derive(Debug, Clone)]
195pub struct HealthCheckResult {
196 pub issues: Vec<HealthIssue>,
198 pub runs_checked: usize,
200 pub checked_at: DateTime<Utc>,
202}
203
204pub struct RunHealthWatchdog {
206 config: HealthCheckConfig,
207 runs: HashMap<String, MonitoredRun>,
208}
209
210impl RunHealthWatchdog {
211 pub fn new(config: HealthCheckConfig) -> Self {
213 Self {
214 config,
215 runs: HashMap::new(),
216 }
217 }
218
219 pub fn register_run(&mut self, run_id: String, workflow_name: String) {
221 let now = Utc::now();
222 self.runs.insert(
223 run_id.clone(),
224 MonitoredRun {
225 run_id,
226 workflow_name,
227 state: MonitoredRunState::Pending,
228 started_at: now,
229 last_state_change: now,
230 current_step: None,
231 state_change_count: 0,
232 retry_count: 0,
233 is_orphaned: false,
234 last_health_check: None,
235 },
236 );
237 }
238
239 pub fn update_run_state(&mut self, run_id: &str, new_state: MonitoredRunState) {
241 if let Some(run) = self.runs.get_mut(run_id) {
242 run.state = new_state;
243 run.last_state_change = Utc::now();
244 run.state_change_count += 1;
245 }
246 }
247
248 pub fn update_current_step(
250 &mut self,
251 run_id: &str,
252 step_id: &str,
253 step_name: &str,
254 step_state: StepState,
255 ) {
256 let now = Utc::now();
257 if let Some(run) = self.runs.get_mut(run_id) {
258 run.current_step = Some(StepExecution {
259 step_id: step_id.to_string(),
260 step_name: step_name.to_string(),
261 started_at: now,
262 last_activity: now,
263 state: step_state,
264 });
265 run.last_state_change = now;
266 }
267 }
268
269 pub fn record_step_activity(&mut self, run_id: &str) {
271 if let Some(run) = self.runs.get_mut(run_id) {
272 if let Some(step) = &mut run.current_step {
273 step.last_activity = Utc::now();
274 }
275 }
276 }
277
278 pub fn mark_orphaned(&mut self, run_id: &str) {
280 if let Some(run) = self.runs.get_mut(run_id) {
281 run.is_orphaned = true;
282 }
283 }
284
285 pub fn check_health(&self) -> HealthCheckResult {
287 let mut issues = Vec::new();
288 let now = Utc::now();
289
290 for run in self.runs.values() {
291 match run.state {
293 MonitoredRunState::Completed
294 | MonitoredRunState::Failed
295 | MonitoredRunState::Cancelled => continue,
296 _ => {}
297 }
298
299 if let Some(step) = &run.current_step {
301 let step_duration = now.signed_duration_since(step.started_at);
302 let inactive_duration = now.signed_duration_since(step.last_activity);
303
304 if step_duration.num_seconds() > self.config.stuck_threshold_seconds {
306 issues.push(HealthIssue {
307 issue_type: IssueType::StuckStep,
308 run_id: run.run_id.clone(),
309 description: format!(
310 "Step '{}' has been executing for {} seconds (threshold: {}s)",
311 step.step_name,
312 step_duration.num_seconds(),
313 self.config.stuck_threshold_seconds
314 ),
315 detected_at: now,
316 severity: IssueSeverity::Error,
317 recommended_action: RecommendedAction::Escalate,
318 });
319 }
320
321 if inactive_duration.num_seconds() > self.config.stuck_threshold_seconds {
323 issues.push(HealthIssue {
324 issue_type: IssueType::DeadRun,
325 run_id: run.run_id.clone(),
326 description: format!(
327 "Step '{}' has had no activity for {} seconds",
328 step.step_name,
329 inactive_duration.num_seconds()
330 ),
331 detected_at: now,
332 severity: IssueSeverity::Warning,
333 recommended_action: RecommendedAction::CollectDiagnostics,
334 });
335 }
336 }
337
338 let run_duration = now.signed_duration_since(run.started_at);
340 if run_duration.num_minutes() > self.config.abandoned_threshold_minutes {
341 issues.push(HealthIssue {
342 issue_type: IssueType::AbandonedRun,
343 run_id: run.run_id.clone(),
344 description: format!(
345 "Run has been active for {} minutes (threshold: {}m)",
346 run_duration.num_minutes(),
347 self.config.abandoned_threshold_minutes
348 ),
349 detected_at: now,
350 severity: IssueSeverity::Warning,
351 recommended_action: RecommendedAction::Alert,
352 });
353 }
354
355 let since_last_change = now.signed_duration_since(run.last_state_change);
357 if since_last_change.num_minutes() > self.config.abandoned_threshold_minutes {
358 issues.push(HealthIssue {
359 issue_type: IssueType::DeadRun,
360 run_id: run.run_id.clone(),
361 description: format!(
362 "No state changes for {} minutes",
363 since_last_change.num_minutes()
364 ),
365 detected_at: now,
366 severity: IssueSeverity::Error,
367 recommended_action: RecommendedAction::Terminate,
368 });
369 }
370
371 if run.is_orphaned {
373 issues.push(HealthIssue {
374 issue_type: IssueType::OrphanedRun,
375 run_id: run.run_id.clone(),
376 description: "Run process no longer exists".to_string(),
377 detected_at: now,
378 severity: IssueSeverity::Critical,
379 recommended_action: RecommendedAction::Terminate,
380 });
381 }
382 }
383
384 HealthCheckResult {
385 issues,
386 runs_checked: self.runs.len(),
387 checked_at: now,
388 }
389 }
390
391 pub fn get_run(&self, run_id: &str) -> Option<&MonitoredRun> {
393 self.runs.get(run_id)
394 }
395
396 pub fn get_all_runs(&self) -> &HashMap<String, MonitoredRun> {
398 &self.runs
399 }
400
401 pub fn remove_run(&mut self, run_id: &str) {
403 self.runs.remove(run_id);
404 }
405
406 pub fn get_runs_by_state(&self, state: MonitoredRunState) -> Vec<&MonitoredRun> {
408 self.runs.values().filter(|r| r.state == state).collect()
409 }
410
411 pub fn get_runs_needing_attention(&self) -> Vec<&MonitoredRun> {
413 let health_result = self.check_health();
414 let run_ids: std::collections::HashSet<_> =
415 health_result.issues.iter().map(|i| &i.run_id).collect();
416
417 self.runs
418 .values()
419 .filter(|r| run_ids.contains(&r.run_id))
420 .collect()
421 }
422
423 pub fn generate_report(&self) -> String {
425 let result = self.check_health();
426 let mut report = String::new();
427
428 report.push_str(&format!(
429 "# Health Check Report\n\n**Checked at:** {}\n\n",
430 result.checked_at.format("%Y-%m-%d %H:%M:%S UTC")
431 ));
432
433 report.push_str(&format!("**Runs monitored:** {}\n\n", result.runs_checked));
434
435 if result.issues.is_empty() {
436 report.push_str("â
All runs healthy\n");
437 } else {
438 report.push_str(&format!(
439 "â ī¸ **Issues found:** {}\n\n",
440 result.issues.len()
441 ));
442
443 let mut critical = vec![];
445 let mut errors = vec![];
446 let mut warnings = vec![];
447 let mut infos = vec![];
448
449 for issue in &result.issues {
450 match issue.severity {
451 IssueSeverity::Critical => critical.push(issue),
452 IssueSeverity::Error => errors.push(issue),
453 IssueSeverity::Warning => warnings.push(issue),
454 IssueSeverity::Info => infos.push(issue),
455 }
456 }
457
458 for (severity, issues) in [
459 ("đ´ Critical", critical),
460 ("â Errors", errors),
461 ("â ī¸ Warnings", warnings),
462 ("âšī¸ Info", infos),
463 ] {
464 if !issues.is_empty() {
465 report.push_str(&format!("## {}\n\n", severity));
466 for issue in issues {
467 report.push_str(&format!("### {}\n", issue.run_id));
468 report.push_str(&format!("**Type:** {:?}\n\n", issue.issue_type));
469 report.push_str(&format!("{}\n\n", issue.description));
470 report.push_str(&format!(
471 "**Recommended action:** {:?}\n\n",
472 issue.recommended_action
473 ));
474 }
475 }
476 }
477 }
478
479 report.push_str("\n## Run States\n\n");
481 for state in [
482 MonitoredRunState::Pending,
483 MonitoredRunState::Running,
484 MonitoredRunState::Paused,
485 MonitoredRunState::Completed,
486 MonitoredRunState::Failed,
487 MonitoredRunState::Cancelled,
488 MonitoredRunState::TimedOut,
489 ] {
490 let count = self.get_runs_by_state(state).len();
491 report.push_str(&format!("- {:?}: {}\n", state, count));
492 }
493
494 report
495 }
496}
497
498pub struct HealthCheckTask {
500 watchdog: RunHealthWatchdog,
501 last_check: Option<DateTime<Utc>>,
502}
503
504impl HealthCheckTask {
505 pub fn new(watchdog: RunHealthWatchdog) -> Self {
507 Self {
508 watchdog,
509 last_check: None,
510 }
511 }
512
513 pub fn check(&mut self) -> HealthCheckResult {
515 let result = self.watchdog.check_health();
516 self.last_check = Some(Utc::now());
517 result
518 }
519
520 pub fn watchdog(&self) -> &RunHealthWatchdog {
522 &self.watchdog
523 }
524
525 pub fn watchdog_mut(&mut self) -> &mut RunHealthWatchdog {
527 &mut self.watchdog
528 }
529}
530
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Duration;

    /// Builds a watchdog with a single `Pending` run registered as `run-1`.
    fn watchdog_with_run(config: HealthCheckConfig) -> RunHealthWatchdog {
        let mut watchdog = RunHealthWatchdog::new(config);
        watchdog.register_run("run-1".to_string(), "feature-dev".to_string());
        watchdog
    }

    #[test]
    fn test_watchdog_registration() {
        let watchdog = watchdog_with_run(HealthCheckConfig::default());

        let run = watchdog.get_run("run-1").unwrap();
        assert_eq!(run.run_id, "run-1");
        assert_eq!(run.workflow_name, "feature-dev");
        assert!(matches!(run.state, MonitoredRunState::Pending));
    }

    #[test]
    fn test_stuck_step_detection() {
        let mut watchdog = watchdog_with_run(HealthCheckConfig {
            enabled: true,
            stuck_threshold_seconds: 10,
            abandoned_threshold_minutes: 30,
            check_interval_seconds: 60,
        });

        // Backdate the step so it exceeds the 10 s threshold.
        let long_ago = Utc::now() - Duration::seconds(20);
        watchdog.runs.get_mut("run-1").unwrap().current_step = Some(StepExecution {
            step_id: "step-1".to_string(),
            step_name: "Long Step".to_string(),
            started_at: long_ago,
            last_activity: long_ago,
            state: StepState::Executing,
        });

        let result = watchdog.check_health();
        assert!(!result.issues.is_empty());
        assert!(result
            .issues
            .iter()
            .any(|i| matches!(i.issue_type, IssueType::StuckStep)));
    }

    #[test]
    fn test_abandoned_run_detection() {
        let mut watchdog = watchdog_with_run(HealthCheckConfig {
            enabled: true,
            stuck_threshold_seconds: 300,
            abandoned_threshold_minutes: 1,
            check_interval_seconds: 60,
        });

        // Backdate the run start so it exceeds the 1 min threshold.
        watchdog.runs.get_mut("run-1").unwrap().started_at = Utc::now() - Duration::minutes(5);

        let result = watchdog.check_health();
        assert!(!result.issues.is_empty());
        assert!(result
            .issues
            .iter()
            .any(|i| matches!(i.issue_type, IssueType::AbandonedRun)));
    }

    #[test]
    fn test_stale_state_reported_as_dead_run() {
        let mut watchdog = watchdog_with_run(HealthCheckConfig {
            abandoned_threshold_minutes: 1,
            ..HealthCheckConfig::default()
        });
        watchdog.runs.get_mut("run-1").unwrap().last_state_change =
            Utc::now() - Duration::minutes(5);

        let result = watchdog.check_health();
        assert!(result.issues.iter().any(|i| {
            i.issue_type == IssueType::DeadRun
                && i.severity == IssueSeverity::Error
                && matches!(i.recommended_action, RecommendedAction::Terminate)
        }));
    }

    #[test]
    fn test_orphaned_run_detection() {
        let mut watchdog = watchdog_with_run(HealthCheckConfig::default());
        watchdog.mark_orphaned("run-1");

        let result = watchdog.check_health();
        assert!(!result.issues.is_empty());
        assert!(result
            .issues
            .iter()
            .any(|i| matches!(i.issue_type, IssueType::OrphanedRun)));
    }

    #[test]
    fn test_terminal_runs_are_skipped() {
        let mut watchdog = watchdog_with_run(HealthCheckConfig {
            abandoned_threshold_minutes: 1,
            ..HealthCheckConfig::default()
        });
        // Would trigger AbandonedRun and OrphanedRun if the run were still live.
        watchdog.runs.get_mut("run-1").unwrap().started_at = Utc::now() - Duration::minutes(5);
        watchdog.mark_orphaned("run-1");
        watchdog.update_run_state("run-1", MonitoredRunState::Completed);

        let result = watchdog.check_health();
        assert!(result.issues.is_empty());
        // Skipped runs still count towards `runs_checked`.
        assert_eq!(result.runs_checked, 1);
    }

    #[test]
    fn test_update_run_state_counts_transitions() {
        let mut watchdog = watchdog_with_run(HealthCheckConfig::default());
        watchdog.update_run_state("run-1", MonitoredRunState::Running);
        watchdog.update_run_state("run-1", MonitoredRunState::Paused);
        // Unknown ids are ignored rather than inserted.
        watchdog.update_run_state("missing", MonitoredRunState::Failed);

        let run = watchdog.get_run("run-1").unwrap();
        assert_eq!(run.state, MonitoredRunState::Paused);
        assert_eq!(run.state_change_count, 2);
        assert!(watchdog.get_run("missing").is_none());
    }

    #[test]
    fn test_health_report() {
        let mut watchdog = watchdog_with_run(HealthCheckConfig::default());
        watchdog.register_run("run-2".to_string(), "bug-fix".to_string());

        let report = watchdog.generate_report();
        assert!(report.contains("Health Check Report"));
        assert!(report.contains("Runs monitored:** 2"));
    }

    #[test]
    fn test_health_report_lists_issue_details() {
        let mut watchdog = watchdog_with_run(HealthCheckConfig::default());
        watchdog.mark_orphaned("run-1");

        let report = watchdog.generate_report();
        assert!(report.contains("Issues found:** 1"));
        assert!(report.contains("### run-1\n"));
        assert!(report.contains("**Type:** OrphanedRun"));
        assert!(report.contains("**Recommended action:** Terminate"));
        assert!(report.contains("- Pending: 1\n"));
    }
}