Skip to main content

dag_executor/state/
validator.rs

//! Validation and auto-repair for persisted task state.
//!
//! When a run is recovered from disk, records may be in inconsistent states —
//! most commonly a task left in [`TaskState::Running`] because the process
//! crashed mid-execution. The validator detects these and repairs them so the
//! executor can safely resume.
7
8use crate::error::{Result, ValidationError};
9use crate::state::{TaskRecord, TaskState};
10use std::collections::HashMap;
11
/// Stateless checker for task-state transitions that can also repair records
/// recovered from disk.
#[derive(Clone, Debug, Default)]
pub struct StateValidator;
15
/// Summary of repairs applied during [`StateValidator::repair`].
#[derive(Debug, Default, PartialEq, Eq)]
pub struct RepairReport {
    /// Ids of tasks found in `Running`/`Retrying` that were reset: to
    /// `Pending` when retry budget remained, otherwise to `Failed`.
    pub reset_running: Vec<String>,
    /// Ids of tasks whose `attempts` counter exceeded the limit and was
    /// clamped down to it.
    pub clamped_attempts: Vec<String>,
}

impl RepairReport {
    /// Returns `true` when **no** repair was performed, i.e. both id lists
    /// are empty; returns `false` as soon as either list has an entry.
    #[must_use]
    pub fn is_clean(&self) -> bool {
        self.reset_running.is_empty() && self.clamped_attempts.is_empty()
    }
}
31
32impl StateValidator {
33    /// Construct a validator.
34    pub fn new() -> Self {
35        StateValidator
36    }
37
38    /// Validate a single proposed transition, returning an error if illegal.
39    pub fn validate_transition(&self, from: TaskState, to: TaskState) -> Result<()> {
40        if from.can_transition_to(to) {
41            Ok(())
42        } else {
43            Err(ValidationError::InvalidTransition {
44                from: from.to_string(),
45                to: to.to_string(),
46            }
47            .into())
48        }
49    }
50
51    /// Repair a set of recovered records in place.
52    ///
53    /// A task that was `Running` or `Retrying` when the process died is reset:
54    /// if it still has retry budget (`attempts < max_attempts`) it becomes
55    /// `Pending` (so dependency checks re-run), otherwise it is marked `Failed`.
56    pub fn repair(
57        &self,
58        records: &mut HashMap<String, TaskRecord>,
59        max_attempts: u32,
60    ) -> RepairReport {
61        let mut report = RepairReport::default();
62        for (id, record) in records.iter_mut() {
63            match record.state {
64                TaskState::Running | TaskState::Retrying => {
65                    // The worker that owned this is gone. Roll back to a state
66                    // the scheduler can reason about again.
67                    record.state = if record.attempts < max_attempts {
68                        TaskState::Pending
69                    } else {
70                        TaskState::Failed
71                    };
72                    record.started_at = None;
73                    record.updated_at = crate::state::now_millis();
74                    report.reset_running.push(id.clone());
75                }
76                _ => {}
77            }
78
79            if record.attempts > max_attempts {
80                record.attempts = max_attempts;
81                report.clamped_attempts.push(id.clone());
82            }
83        }
84        report
85    }
86}