Skip to main content

dag_executor/state/
task_state.rs

1//! Task lifecycle states and the persisted record that tracks them.
2
3use serde::{Deserialize, Serialize};
4use std::collections::HashMap;
5use std::time::{SystemTime, UNIX_EPOCH};
6
/// Current wall-clock time, expressed as whole milliseconds since the Unix epoch.
///
/// A bare `u64` is used so that timestamps serialize cleanly to JSON and stay
/// meaningful across process restarts.
///
/// If the system clock reports an instant earlier than the epoch, `0` is
/// returned instead of an error.
pub fn now_millis() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        // `as_millis` yields a `u128`; the `as` cast keeps the low 64 bits,
        // which is lossless for any date within roughly 584 million years of
        // the epoch.
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
17
/// The lifecycle state of a single task.
///
/// Variants are serialized with `snake_case` names (for example
/// `DeadLettered` is written as `"dead_lettered"`), matching the strings
/// produced by the `Display` implementation. Which moves between states are
/// legal is decided by [`TaskState::can_transition_to`]; every state may also
/// "transition" to itself.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TaskState {
    /// Registered but dependencies not yet satisfied.
    ///
    /// May move to `Ready`, `Skipped`, or `Cancelled`.
    Pending,
    /// All dependencies satisfied; eligible to be scheduled.
    ///
    /// May move to `Running`, `Skipped`, or `Cancelled`.
    Ready,
    /// Currently executing on a worker.
    ///
    /// May move to `Completed`, `Failed`, `Retrying`, or `Cancelled`.
    Running,
    /// Finished successfully.
    ///
    /// Terminal; no outgoing transitions.
    Completed,
    /// Finished with an error (retries exhausted).
    ///
    /// Reported as terminal by [`TaskState::is_terminal`], yet
    /// [`TaskState::can_transition_to`] still permits a move to `Retrying`
    /// or `DeadLettered`.
    Failed,
    /// Failed but scheduled for another attempt.
    ///
    /// May move to `Running`, `Ready`, `Failed`, `DeadLettered`, or
    /// `Cancelled`.
    Retrying,
    /// Cancelled (e.g. graceful shutdown).
    ///
    /// Terminal; no outgoing transitions.
    Cancelled,
    /// Skipped because an upstream dependency failed.
    ///
    /// Terminal; no outgoing transitions.
    Skipped,
    /// Failed terminally and routed to the dead-letter queue.
    ///
    /// Terminal; no outgoing transitions.
    DeadLettered,
}
41
42impl TaskState {
43    /// Whether the task has reached a state from which it will not progress.
44    pub fn is_terminal(&self) -> bool {
45        matches!(
46            self,
47            TaskState::Completed
48                | TaskState::Failed
49                | TaskState::Cancelled
50                | TaskState::Skipped
51                | TaskState::DeadLettered
52        )
53    }
54
55    /// Whether this state counts as a successful completion.
56    pub fn is_success(&self) -> bool {
57        matches!(self, TaskState::Completed)
58    }
59
60    /// Whether this state counts as a failure outcome.
61    pub fn is_failure(&self) -> bool {
62        matches!(self, TaskState::Failed | TaskState::DeadLettered)
63    }
64
65    /// Whether a transition to `next` is permitted by the state machine.
66    ///
67    /// This is the single source of truth for legal lifecycle transitions and
68    /// is enforced by [`crate::state::StateValidator`].
69    pub fn can_transition_to(&self, next: TaskState) -> bool {
70        use TaskState::*;
71        match (self, next) {
72            // Idempotent no-ops are always allowed.
73            (a, b) if a == &b => true,
74            (Pending, Ready | Skipped | Cancelled) => true,
75            (Ready, Running | Skipped | Cancelled) => true,
76            (Running, Completed | Failed | Retrying | Cancelled) => true,
77            (Retrying, Running | Ready | Failed | DeadLettered | Cancelled) => true,
78            (Failed, Retrying | DeadLettered) => true,
79            _ => false,
80        }
81    }
82}
83
84impl std::fmt::Display for TaskState {
85    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86        let s = match self {
87            TaskState::Pending => "pending",
88            TaskState::Ready => "ready",
89            TaskState::Running => "running",
90            TaskState::Completed => "completed",
91            TaskState::Failed => "failed",
92            TaskState::Retrying => "retrying",
93            TaskState::Cancelled => "cancelled",
94            TaskState::Skipped => "skipped",
95            TaskState::DeadLettered => "dead_lettered",
96        };
97        f.write_str(s)
98    }
99}
100
/// A persisted snapshot of a task's execution history.
///
/// This is the unit of durability: the executor saves one record per task so
/// that a crashed run can be recovered without re-executing completed work.
///
/// All timestamps are milliseconds since the Unix epoch, as produced by
/// [`now_millis`]. State changes should go through
/// [`TaskRecord::transition`], which keeps `state` and the timestamps
/// consistent; the fields are public, so nothing prevents direct mutation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskRecord {
    /// Stable task identifier.
    pub id: String,
    /// Current lifecycle state.
    pub state: TaskState,
    /// Number of execution attempts made so far.
    ///
    /// Starts at `0` and is not modified by [`TaskRecord::transition`];
    /// presumably the executor increments it (not visible in this file).
    pub attempts: u32,
    /// When the record was first created (ms since epoch).
    pub created_at: u64,
    /// When the record was last modified (ms since epoch).
    ///
    /// Refreshed by [`TaskRecord::transition`] when it applies a transition.
    pub updated_at: u64,
    /// When execution most recently started, if ever.
    ///
    /// Set by [`TaskRecord::transition`] on entering [`TaskState::Running`].
    pub started_at: Option<u64>,
    /// When execution most recently finished, if ever.
    ///
    /// Set by [`TaskRecord::transition`] on entering a terminal state (see
    /// [`TaskState::is_terminal`]); it is not cleared when a later attempt
    /// starts.
    pub finished_at: Option<u64>,
    /// The successful output, if any.
    pub output: Option<serde_json::Value>,
    /// The last error message, if any.
    pub error: Option<String>,
    /// Arbitrary user/system metadata.
    pub metadata: HashMap<String, String>,
}
128
129impl TaskRecord {
130    /// Create a fresh record in the [`TaskState::Pending`] state.
131    pub fn new(id: impl Into<String>) -> Self {
132        let now = now_millis();
133        TaskRecord {
134            id: id.into(),
135            state: TaskState::Pending,
136            attempts: 0,
137            created_at: now,
138            updated_at: now,
139            started_at: None,
140            finished_at: None,
141            output: None,
142            error: None,
143            metadata: HashMap::new(),
144        }
145    }
146
147    /// Transition the record to `state`, updating `updated_at`.
148    ///
149    /// Returns `false` (and leaves the record untouched) if the transition is
150    /// not allowed by [`TaskState::can_transition_to`].
151    pub fn transition(&mut self, state: TaskState) -> bool {
152        if !self.state.can_transition_to(state) {
153            return false;
154        }
155        self.state = state;
156        self.updated_at = now_millis();
157        match state {
158            TaskState::Running => self.started_at = Some(self.updated_at),
159            s if s.is_terminal() => self.finished_at = Some(self.updated_at),
160            _ => {}
161        }
162        true
163    }
164
165    /// Wall-clock execution duration in milliseconds, if both endpoints exist.
166    pub fn duration_millis(&self) -> Option<u64> {
167        match (self.started_at, self.finished_at) {
168            (Some(s), Some(f)) if f >= s => Some(f - s),
169            _ => None,
170        }
171    }
172}