dag_executor/state/
task_state.rs1use serde::{Deserialize, Serialize};
4use std::collections::HashMap;
5use std::time::{SystemTime, UNIX_EPOCH};
6
/// Returns the current wall-clock time as whole milliseconds since the Unix epoch.
///
/// The value comes from [`SystemTime::now`], so it follows the system clock and is
/// not guaranteed to be monotonic. If the clock reads earlier than the epoch the
/// function reports `0` instead of failing.
pub fn now_millis() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        // `duration_since` only errors when "now" precedes the epoch.
        Err(_) => 0,
    }
}
17
18#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
20#[serde(rename_all = "snake_case")]
21pub enum TaskState {
22 Pending,
24 Ready,
26 Running,
28 Completed,
30 Failed,
32 Retrying,
34 Cancelled,
36 Skipped,
38 DeadLettered,
40}
41
42impl TaskState {
43 pub fn is_terminal(&self) -> bool {
45 matches!(
46 self,
47 TaskState::Completed
48 | TaskState::Failed
49 | TaskState::Cancelled
50 | TaskState::Skipped
51 | TaskState::DeadLettered
52 )
53 }
54
55 pub fn is_success(&self) -> bool {
57 matches!(self, TaskState::Completed)
58 }
59
60 pub fn is_failure(&self) -> bool {
62 matches!(self, TaskState::Failed | TaskState::DeadLettered)
63 }
64
65 pub fn can_transition_to(&self, next: TaskState) -> bool {
70 use TaskState::*;
71 match (self, next) {
72 (a, b) if a == &b => true,
74 (Pending, Ready | Skipped | Cancelled) => true,
75 (Ready, Running | Skipped | Cancelled) => true,
76 (Running, Completed | Failed | Retrying | Cancelled) => true,
77 (Retrying, Running | Ready | Failed | DeadLettered | Cancelled) => true,
78 (Failed, Retrying | DeadLettered) => true,
79 _ => false,
80 }
81 }
82}
83
84impl std::fmt::Display for TaskState {
85 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86 let s = match self {
87 TaskState::Pending => "pending",
88 TaskState::Ready => "ready",
89 TaskState::Running => "running",
90 TaskState::Completed => "completed",
91 TaskState::Failed => "failed",
92 TaskState::Retrying => "retrying",
93 TaskState::Cancelled => "cancelled",
94 TaskState::Skipped => "skipped",
95 TaskState::DeadLettered => "dead_lettered",
96 };
97 f.write_str(s)
98 }
99}
100
101#[derive(Debug, Clone, Serialize, Deserialize)]
106pub struct TaskRecord {
107 pub id: String,
109 pub state: TaskState,
111 pub attempts: u32,
113 pub created_at: u64,
115 pub updated_at: u64,
117 pub started_at: Option<u64>,
119 pub finished_at: Option<u64>,
121 pub output: Option<serde_json::Value>,
123 pub error: Option<String>,
125 pub metadata: HashMap<String, String>,
127}
128
129impl TaskRecord {
130 pub fn new(id: impl Into<String>) -> Self {
132 let now = now_millis();
133 TaskRecord {
134 id: id.into(),
135 state: TaskState::Pending,
136 attempts: 0,
137 created_at: now,
138 updated_at: now,
139 started_at: None,
140 finished_at: None,
141 output: None,
142 error: None,
143 metadata: HashMap::new(),
144 }
145 }
146
147 pub fn transition(&mut self, state: TaskState) -> bool {
152 if !self.state.can_transition_to(state) {
153 return false;
154 }
155 self.state = state;
156 self.updated_at = now_millis();
157 match state {
158 TaskState::Running => self.started_at = Some(self.updated_at),
159 s if s.is_terminal() => self.finished_at = Some(self.updated_at),
160 _ => {}
161 }
162 true
163 }
164
165 pub fn duration_millis(&self) -> Option<u64> {
167 match (self.started_at, self.finished_at) {
168 (Some(s), Some(f)) if f >= s => Some(f - s),
169 _ => None,
170 }
171 }
172}