dag_executor/state/validator.rs
1//! Validation and auto-repair for persisted task state.
2//!
3//! When a run is recovered from disk, records may be in inconsistent states —
4//! most commonly a task left in [`TaskState::Running`] because the process
5//! crashed mid-execution. The validator detects these and repairs them so the
6//! executor can safely resume.
7
8use crate::error::{Result, ValidationError};
9use crate::state::{TaskRecord, TaskState};
10use std::collections::HashMap;
11
/// Stateless checker for task-state transitions that also repairs state
/// recovered from disk.
///
/// Holds no configuration; build one with [`StateValidator::new`] or
/// [`Default::default`].
#[derive(Clone, Debug, Default)]
pub struct StateValidator;
15
/// Summary of repairs applied during [`StateValidator::repair`].
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct RepairReport {
    /// Ids (record-map keys) of tasks found `Running`/`Retrying` and reset:
    /// to `Pending` when retry budget remained, otherwise to `Failed`.
    pub reset_running: Vec<String>,
    /// Ids (record-map keys) of tasks whose `attempts` counter exceeded the
    /// limit and was clamped down to it.
    pub clamped_attempts: Vec<String>,
}

impl RepairReport {
    /// Returns `true` when no repair was performed, i.e. both
    /// `reset_running` and `clamped_attempts` are empty.
    #[must_use]
    pub fn is_clean(&self) -> bool {
        self.reset_running.is_empty() && self.clamped_attempts.is_empty()
    }
}
31
32impl StateValidator {
33 /// Construct a validator.
34 pub fn new() -> Self {
35 StateValidator
36 }
37
38 /// Validate a single proposed transition, returning an error if illegal.
39 pub fn validate_transition(&self, from: TaskState, to: TaskState) -> Result<()> {
40 if from.can_transition_to(to) {
41 Ok(())
42 } else {
43 Err(ValidationError::InvalidTransition {
44 from: from.to_string(),
45 to: to.to_string(),
46 }
47 .into())
48 }
49 }
50
51 /// Repair a set of recovered records in place.
52 ///
53 /// A task that was `Running` or `Retrying` when the process died is reset:
54 /// if it still has retry budget (`attempts < max_attempts`) it becomes
55 /// `Pending` (so dependency checks re-run), otherwise it is marked `Failed`.
56 pub fn repair(
57 &self,
58 records: &mut HashMap<String, TaskRecord>,
59 max_attempts: u32,
60 ) -> RepairReport {
61 let mut report = RepairReport::default();
62 for (id, record) in records.iter_mut() {
63 match record.state {
64 TaskState::Running | TaskState::Retrying => {
65 // The worker that owned this is gone. Roll back to a state
66 // the scheduler can reason about again.
67 record.state = if record.attempts < max_attempts {
68 TaskState::Pending
69 } else {
70 TaskState::Failed
71 };
72 record.started_at = None;
73 record.updated_at = crate::state::now_millis();
74 report.reset_running.push(id.clone());
75 }
76 _ => {}
77 }
78
79 if record.attempts > max_attempts {
80 record.attempts = max_attempts;
81 report.clamped_attempts.push(id.clone());
82 }
83 }
84 report
85 }
86}