// dag-executor 0.1.0
//
// A production-ready DAG executor with state management and advanced patterns.
// Documentation
//! Task lifecycle states and the persisted record that tracks them.

use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::time::{SystemTime, UNIX_EPOCH};

/// Current wall-clock time in whole milliseconds since the Unix epoch.
///
/// Kept as a plain `u64` so records serialize cleanly to JSON and survive
/// process restarts. If the system clock reports a time before the epoch,
/// this returns `0` instead of panicking.
pub fn now_millis() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        // `as_millis` yields a u128; the narrowing cast only loses bits
        // roughly 584 million years after 1970.
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}

/// The lifecycle state of a single task.
///
/// Serialized by serde in snake_case (e.g. `DeadLettered` <-> `"dead_lettered"`).
/// Legal moves between states are defined by [`TaskState::can_transition_to`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TaskState {
    /// Registered but dependencies not yet satisfied.
    Pending,
    /// All dependencies satisfied; eligible to be scheduled.
    Ready,
    /// Currently executing on a worker.
    Running,
    /// Finished successfully.
    Completed,
    /// Finished with an error (retries exhausted).
    ///
    /// Reported as terminal by [`TaskState::is_terminal`], yet
    /// [`TaskState::can_transition_to`] still permits moving on to
    /// `Retrying` or `DeadLettered`.
    Failed,
    /// Failed but scheduled for another attempt (entered from `Running` or
    /// `Failed`).
    Retrying,
    /// Cancelled (e.g. graceful shutdown).
    Cancelled,
    /// Skipped because an upstream dependency failed.
    Skipped,
    /// Failed terminally and routed to the dead-letter queue.
    DeadLettered,
}

impl TaskState {
    /// Whether the task has reached a terminal (finished) state.
    ///
    /// `Completed`, `Failed`, `Cancelled`, `Skipped` and `DeadLettered` are
    /// terminal. Note that `Failed` is reported as terminal even though
    /// [`TaskState::can_transition_to`] still permits `Failed -> Retrying`
    /// and `Failed -> DeadLettered`.
    pub fn is_terminal(&self) -> bool {
        match self {
            TaskState::Pending | TaskState::Ready | TaskState::Running | TaskState::Retrying => {
                false
            }
            TaskState::Completed
            | TaskState::Failed
            | TaskState::Cancelled
            | TaskState::Skipped
            | TaskState::DeadLettered => true,
        }
    }

    /// Whether this state counts as a successful completion (`Completed` only).
    pub fn is_success(&self) -> bool {
        *self == TaskState::Completed
    }

    /// Whether this state counts as a failure outcome (`Failed` or
    /// `DeadLettered`).
    pub fn is_failure(&self) -> bool {
        [TaskState::Failed, TaskState::DeadLettered].contains(self)
    }

    /// Whether a transition to `next` is permitted by the state machine.
    ///
    /// This is the single source of truth for legal lifecycle transitions and
    /// is enforced by [`crate::state::StateValidator`]. A transition to the
    /// current state is always allowed as an idempotent no-op.
    pub fn can_transition_to(&self, next: TaskState) -> bool {
        use TaskState::*;

        if *self == next {
            return true;
        }

        match *self {
            Pending => matches!(next, Ready | Skipped | Cancelled),
            Ready => matches!(next, Running | Skipped | Cancelled),
            Running => matches!(next, Completed | Failed | Retrying | Cancelled),
            Retrying => matches!(next, Running | Ready | Failed | DeadLettered | Cancelled),
            Failed => matches!(next, Retrying | DeadLettered),
            // No outgoing edges from these states.
            Completed | Cancelled | Skipped | DeadLettered => false,
        }
    }
}

impl std::fmt::Display for TaskState {
    /// Writes the lowercase snake_case name of the state, matching the serde
    /// spelling selected by `#[serde(rename_all = "snake_case")]`.
    ///
    /// Uses [`std::fmt::Formatter::pad`] rather than `write_str` so width,
    /// alignment and precision flags are honoured (e.g. `format!("{:<12}", s)`
    /// pads the label instead of silently ignoring the width).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            TaskState::Pending => "pending",
            TaskState::Ready => "ready",
            TaskState::Running => "running",
            TaskState::Completed => "completed",
            TaskState::Failed => "failed",
            TaskState::Retrying => "retrying",
            TaskState::Cancelled => "cancelled",
            TaskState::Skipped => "skipped",
            TaskState::DeadLettered => "dead_lettered",
        };
        f.pad(label)
    }
}

/// A persisted snapshot of a task's execution history.
///
/// This is the unit of durability: the executor saves one record per task so
/// that a crashed run can be recovered without re-executing completed work.
///
/// All timestamps are milliseconds since the Unix epoch as produced by
/// [`now_millis`]. No serde renaming is applied to this struct, so the field
/// names below are also the persisted field names.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskRecord {
    /// Stable task identifier.
    pub id: String,
    /// Current lifecycle state.
    pub state: TaskState,
    /// Number of execution attempts made so far.
    ///
    /// NOTE(review): `TaskRecord::transition` does not touch this counter;
    /// presumably the executor increments it when launching an attempt — confirm.
    pub attempts: u32,
    /// When the record was first created (ms since epoch).
    pub created_at: u64,
    /// When the record was last modified (ms since epoch).
    pub updated_at: u64,
    /// When execution most recently started, if ever.
    pub started_at: Option<u64>,
    /// When execution most recently finished, if ever.
    pub finished_at: Option<u64>,
    /// The successful output, if any, as an arbitrary JSON value.
    pub output: Option<serde_json::Value>,
    /// The last error message, if any.
    pub error: Option<String>,
    /// Arbitrary user/system metadata as string key/value pairs.
    pub metadata: HashMap<String, String>,
}

impl TaskRecord {
    /// Create a fresh record in the [`TaskState::Pending`] state.
    ///
    /// `created_at` and `updated_at` are both set to the current time. The
    /// record starts with zero attempts and no timestamps, output, error or
    /// metadata.
    pub fn new(id: impl Into<String>) -> Self {
        let now = now_millis();
        TaskRecord {
            id: id.into(),
            state: TaskState::Pending,
            attempts: 0,
            created_at: now,
            updated_at: now,
            started_at: None,
            finished_at: None,
            output: None,
            error: None,
            metadata: HashMap::new(),
        }
    }

    /// Transition the record to `state`.
    ///
    /// Returns `true` if the record is now in `state`. Returns `false`, leaving
    /// the record untouched, if [`TaskState::can_transition_to`] forbids the
    /// move.
    ///
    /// On an actual state change, `updated_at` is refreshed, and then:
    /// * entering [`TaskState::Running`] stamps `started_at` and clears any
    ///   `finished_at` left over from a previous attempt. A retried task is
    ///   therefore never reported as both running and finished.
    /// * entering a terminal state stamps `finished_at`.
    ///
    /// A self-transition (`state == self.state`) is an accepted no-op. It
    /// returns `true` without touching any timestamp, so re-applying e.g.
    /// `Completed` cannot move `finished_at` and skew [`Self::duration_millis`].
    ///
    /// `attempts` is not modified here.
    pub fn transition(&mut self, state: TaskState) -> bool {
        if state == self.state {
            // `can_transition_to` treats this as an idempotent no-op; honour
            // that instead of rewriting started/finished timestamps.
            return true;
        }
        if !self.state.can_transition_to(state) {
            return false;
        }

        let now = now_millis();
        self.state = state;
        self.updated_at = now;
        if state == TaskState::Running {
            self.started_at = Some(now);
            // A new attempt is in flight; any earlier finish time is stale.
            self.finished_at = None;
        } else if state.is_terminal() {
            self.finished_at = Some(now);
        }
        true
    }

    /// Wall-clock duration of the most recent execution, in milliseconds.
    ///
    /// Returns `None` unless both `started_at` and `finished_at` are set and
    /// `finished_at` is not earlier than `started_at`.
    pub fn duration_millis(&self) -> Option<u64> {
        self.finished_at?.checked_sub(self.started_at?)
    }
}