dag-executor 0.1.0

A production-ready DAG executor with state management and advanced patterns
Documentation
//! Validation and auto-repair for persisted task state.
//!
//! When a run is recovered from disk, records may be in inconsistent states —
//! most commonly a task left in [`TaskState::Running`] because the process
//! crashed mid-execution. The validator detects these and repairs them so the
//! executor can safely resume.

use crate::error::{Result, ValidationError};
use crate::state::{TaskRecord, TaskState};
use std::collections::HashMap;

/// Stateless checker for task-state transitions.
///
/// Also repairs task records that were recovered from disk in an
/// inconsistent state, so that execution can resume safely.
#[derive(Clone, Debug, Default)]
pub struct StateValidator;

/// Summary of repairs applied during [`StateValidator::repair`].
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct RepairReport {
    /// Ids of tasks found `Running`/`Retrying` and rolled back: to `Pending` when
    /// they still had retry budget, otherwise to `Failed`.
    pub reset_running: Vec<String>,
    /// Ids of tasks whose `attempts` counter exceeded the limit and was clamped.
    pub clamped_attempts: Vec<String>,
}

impl RepairReport {
    /// Returns `true` if no repair was performed, i.e. both id lists are empty.
    #[must_use]
    pub fn is_clean(&self) -> bool {
        self.reset_running.is_empty() && self.clamped_attempts.is_empty()
    }
}

impl StateValidator {
    /// Construct a validator.
    ///
    /// The validator carries no state; this is equivalent to `StateValidator::default()`.
    #[must_use]
    pub const fn new() -> Self {
        StateValidator
    }

    /// Validate a single proposed transition from `from` to `to`.
    ///
    /// # Errors
    ///
    /// Returns a [`ValidationError::InvalidTransition`] (converted into the crate
    /// error type) carrying both state names when `from.can_transition_to(to)`
    /// is `false`.
    pub fn validate_transition(&self, from: TaskState, to: TaskState) -> Result<()> {
        if from.can_transition_to(to) {
            Ok(())
        } else {
            Err(ValidationError::InvalidTransition {
                from: from.to_string(),
                to: to.to_string(),
            }
            .into())
        }
    }

    /// Repair a set of recovered records in place.
    ///
    /// Two repairs are applied to every record:
    ///
    /// * A task that was `Running` or `Retrying` when the process died is reset.
    ///   If it still has retry budget (`attempts < max_attempts`), it becomes
    ///   `Pending` so dependency checks re-run. Otherwise it is marked `Failed`.
    ///   Its `started_at` is cleared and its `updated_at` is set to the repair time.
    /// * An `attempts` counter above `max_attempts` is clamped to `max_attempts`.
    ///
    /// All records reset in one call share the same `updated_at` value, which is
    /// taken once at the start of the pass.
    ///
    /// The ids in the returned [`RepairReport`] are sorted. This makes the report
    /// deterministic even though `HashMap` iteration order is unspecified.
    pub fn repair(
        &self,
        records: &mut HashMap<String, TaskRecord>,
        max_attempts: u32,
    ) -> RepairReport {
        let mut report = RepairReport::default();
        // One timestamp for the whole pass so every repaired record agrees.
        let now = crate::state::now_millis();

        for (id, record) in records.iter_mut() {
            if matches!(record.state, TaskState::Running | TaskState::Retrying) {
                // The worker that owned this is gone. Roll back to a state the
                // scheduler can reason about again. The budget check runs on the
                // unclamped counter; an over-budget task ends up `Failed` either way.
                record.state = if record.attempts < max_attempts {
                    TaskState::Pending
                } else {
                    TaskState::Failed
                };
                record.started_at = None;
                record.updated_at = now;
                report.reset_running.push(id.clone());
            }

            if record.attempts > max_attempts {
                record.attempts = max_attempts;
                report.clamped_attempts.push(id.clone());
            }
        }

        // HashMap iteration order is unspecified. Sort so that reports are
        // reproducible and can be compared with `==`.
        report.reset_running.sort_unstable();
        report.clamped_attempts.sort_unstable();
        report
    }
}