Skip to main content

runkon_flow/
status.rs

1use serde::{Deserialize, Serialize};
2
3/// Status of a workflow run.
4#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
5#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
6#[serde(rename_all = "snake_case")]
7pub enum WorkflowRunStatus {
8    Pending,
9    Running,
10    Completed,
11    Failed,
12    Cancelled,
13    Waiting,
14    /// Transient staging state: the classifier has determined this run is
15    /// eligible for automatic resume. The watchdog picks it up on the next
16    /// tick, CAS-flips it back to `failed`, and spawns a resume thread.
17    /// Neither active nor terminal — consumed within one background tick.
18    NeedsResume,
19    /// Transient state: a cancel signal has been sent; the engine is cleaning up.
20    /// Neither active nor terminal — the engine transitions to `Cancelled` once cleanup completes.
21    Cancelling,
22}
23
24impl std::fmt::Display for WorkflowRunStatus {
25    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
26        let s = match self {
27            Self::Pending => "pending",
28            Self::Running => "running",
29            Self::Completed => "completed",
30            Self::Failed => "failed",
31            Self::Cancelled => "cancelled",
32            Self::Waiting => "waiting",
33            Self::NeedsResume => "needs_resume",
34            Self::Cancelling => "cancelling",
35        };
36        write!(f, "{s}")
37    }
38}
39
40impl std::str::FromStr for WorkflowRunStatus {
41    type Err = String;
42    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
43        match s {
44            "pending" => Ok(Self::Pending),
45            "running" => Ok(Self::Running),
46            "completed" => Ok(Self::Completed),
47            "failed" => Ok(Self::Failed),
48            "cancelled" => Ok(Self::Cancelled),
49            "waiting" => Ok(Self::Waiting),
50            "needs_resume" => Ok(Self::NeedsResume),
51            "cancelling" => Ok(Self::Cancelling),
52            _ => Err(format!("unknown WorkflowRunStatus: {s}")),
53        }
54    }
55}
56
57impl WorkflowRunStatus {
58    /// Canonical set of statuses that constitute an "active" run.
59    pub const ACTIVE: [WorkflowRunStatus; 3] = [
60        WorkflowRunStatus::Pending,
61        WorkflowRunStatus::Running,
62        WorkflowRunStatus::Waiting,
63    ];
64
65    const ACTIVE_STRS: [&'static str; 3] = ["pending", "running", "waiting"];
66
67    /// Returns the SQL string representations of all active statuses.
68    pub fn active_strings() -> &'static [&'static str] {
69        &Self::ACTIVE_STRS
70    }
71
72    /// Whether this status is terminal (no further transitions expected).
73    pub fn is_terminal(&self) -> bool {
74        matches!(self, Self::Completed | Self::Failed | Self::Cancelled)
75    }
76
77    /// Whether this status is active (run is in progress or waiting).
78    pub fn is_active(&self) -> bool {
79        matches!(self, Self::Pending | Self::Running | Self::Waiting)
80    }
81}
82
83/// Status of a single workflow step execution.
84#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
85#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
86#[serde(rename_all = "snake_case")]
87pub enum WorkflowStepStatus {
88    #[default]
89    Pending,
90    Running,
91    Completed,
92    Failed,
93    Skipped,
94    Waiting,
95    TimedOut,
96}
97
98impl std::fmt::Display for WorkflowStepStatus {
99    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100        let s = match self {
101            Self::Pending => "pending",
102            Self::Running => "running",
103            Self::Completed => "completed",
104            Self::Failed => "failed",
105            Self::Skipped => "skipped",
106            Self::Waiting => "waiting",
107            Self::TimedOut => "timed_out",
108        };
109        write!(f, "{s}")
110    }
111}
112
113impl WorkflowStepStatus {
114    /// Short display label used in summaries and status columns.
115    pub fn short_label(&self) -> &'static str {
116        match self {
117            Self::Completed => "ok",
118            Self::Failed => "FAIL",
119            Self::Skipped => "skip",
120            Self::Running => "...",
121            Self::Pending => "-",
122            Self::Waiting => "wait",
123            Self::TimedOut => "tout",
124        }
125    }
126
127    /// Whether this status is terminal (no further transitions expected).
128    pub fn is_terminal(&self) -> bool {
129        matches!(
130            self,
131            Self::Completed | Self::Failed | Self::Skipped | Self::TimedOut
132        )
133    }
134
135    /// Whether this status represents a step that is starting (running or waiting).
136    pub fn is_starting(&self) -> bool {
137        matches!(self, Self::Running | Self::Waiting)
138    }
139}
140
141impl std::str::FromStr for WorkflowStepStatus {
142    type Err = String;
143    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
144        match s {
145            "pending" => Ok(Self::Pending),
146            "running" => Ok(Self::Running),
147            "completed" => Ok(Self::Completed),
148            "failed" => Ok(Self::Failed),
149            "skipped" => Ok(Self::Skipped),
150            "waiting" => Ok(Self::Waiting),
151            "timed_out" => Ok(Self::TimedOut),
152            _ => Err(format!("unknown WorkflowStepStatus: {s}")),
153        }
154    }
155}
156
157#[cfg(feature = "sqlite")]
158mod sql_impls {
159    use super::{WorkflowRunStatus, WorkflowStepStatus};
160    use rusqlite::types::{FromSql, FromSqlError, FromSqlResult, ToSql, ToSqlOutput, ValueRef};
161
162    fn status_to_sql(status: &impl std::fmt::Display) -> rusqlite::Result<ToSqlOutput<'_>> {
163        Ok(ToSqlOutput::from(status.to_string()))
164    }
165
166    fn status_from_sql<T>(value: ValueRef<'_>) -> FromSqlResult<T>
167    where
168        T: std::str::FromStr<Err = String>,
169    {
170        let s = String::column_result(value)?;
171        s.parse().map_err(|e: String| {
172            FromSqlError::Other(Box::new(std::io::Error::new(
173                std::io::ErrorKind::InvalidData,
174                e,
175            )))
176        })
177    }
178
179    impl ToSql for WorkflowRunStatus {
180        fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
181            status_to_sql(self)
182        }
183    }
184
185    impl FromSql for WorkflowRunStatus {
186        fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
187            status_from_sql(value)
188        }
189    }
190
191    impl ToSql for WorkflowStepStatus {
192        fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
193            status_to_sql(self)
194        }
195    }
196
197    impl FromSql for WorkflowStepStatus {
198        fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
199            status_from_sql(value)
200        }
201    }
202}
203
204#[cfg(test)]
205mod tests {
206    use super::*;
207
208    #[test]
209    fn active_strings_matches_display() {
210        let from_display: Vec<String> = WorkflowRunStatus::ACTIVE
211            .iter()
212            .map(|s| s.to_string())
213            .collect();
214        assert_eq!(
215            WorkflowRunStatus::active_strings(),
216            from_display
217                .iter()
218                .map(|s| s.as_str())
219                .collect::<Vec<_>>()
220                .as_slice(),
221            "ACTIVE_STRS must match Display impl for each ACTIVE variant"
222        );
223    }
224
225    #[test]
226    fn run_terminal_states() {
227        assert!(WorkflowRunStatus::Completed.is_terminal());
228        assert!(WorkflowRunStatus::Failed.is_terminal());
229        assert!(WorkflowRunStatus::Cancelled.is_terminal());
230        assert!(!WorkflowRunStatus::Pending.is_terminal());
231        assert!(!WorkflowRunStatus::Running.is_terminal());
232        assert!(!WorkflowRunStatus::Waiting.is_terminal());
233        // NeedsResume is a transient staging state — not terminal.
234        assert!(!WorkflowRunStatus::NeedsResume.is_terminal());
235    }
236
237    #[test]
238    fn run_active_states() {
239        assert!(WorkflowRunStatus::Pending.is_active());
240        assert!(WorkflowRunStatus::Running.is_active());
241        assert!(WorkflowRunStatus::Waiting.is_active());
242        assert!(!WorkflowRunStatus::Completed.is_active());
243        assert!(!WorkflowRunStatus::Failed.is_active());
244        assert!(!WorkflowRunStatus::Cancelled.is_active());
245        // NeedsResume is a transient staging state — not active.
246        assert!(!WorkflowRunStatus::NeedsResume.is_active());
247    }
248
249    #[test]
250    fn step_terminal_states() {
251        assert!(WorkflowStepStatus::Completed.is_terminal());
252        assert!(WorkflowStepStatus::Failed.is_terminal());
253        assert!(WorkflowStepStatus::Skipped.is_terminal());
254        assert!(WorkflowStepStatus::TimedOut.is_terminal());
255        assert!(!WorkflowStepStatus::Pending.is_terminal());
256        assert!(!WorkflowStepStatus::Running.is_terminal());
257        assert!(!WorkflowStepStatus::Waiting.is_terminal());
258    }
259
260    #[test]
261    fn step_starting_states() {
262        assert!(WorkflowStepStatus::Running.is_starting());
263        assert!(WorkflowStepStatus::Waiting.is_starting());
264        assert!(!WorkflowStepStatus::Pending.is_starting());
265        assert!(!WorkflowStepStatus::Completed.is_starting());
266        assert!(!WorkflowStepStatus::Failed.is_starting());
267        assert!(!WorkflowStepStatus::Skipped.is_starting());
268        assert!(!WorkflowStepStatus::TimedOut.is_starting());
269    }
270
271    #[test]
272    fn timed_out_is_not_a_valid_run_status() {
273        use std::str::FromStr;
274        // 'timed_out' is valid only for workflow_run_steps (WorkflowStepStatus::TimedOut).
275        // workflow_runs.status must never be 'timed_out'; the schema CHECK constraint
276        // (migration 080) enforces this at the DB level.
277        assert!(WorkflowRunStatus::from_str("timed_out").is_err());
278    }
279
280    #[test]
281    fn run_terminal_and_active_are_mutually_exclusive() {
282        // These statuses must be exactly one of terminal or active.
283        let exactly_one = [
284            WorkflowRunStatus::Pending,
285            WorkflowRunStatus::Running,
286            WorkflowRunStatus::Completed,
287            WorkflowRunStatus::Failed,
288            WorkflowRunStatus::Cancelled,
289            WorkflowRunStatus::Waiting,
290        ];
291        for s in exactly_one {
292            assert!(
293                s.is_terminal() != s.is_active(),
294                "{s} should be exactly one of terminal or active"
295            );
296        }
297        // NeedsResume is a transient staging state — neither terminal nor active.
298        // At most one of is_terminal / is_active may be true for any status.
299        assert!(
300            !(WorkflowRunStatus::NeedsResume.is_terminal()
301                && WorkflowRunStatus::NeedsResume.is_active()),
302            "NeedsResume must not be both terminal and active"
303        );
304    }
305
306    #[cfg(feature = "sqlite")]
307    mod rusqlite_roundtrip {
308        use super::*;
309        use rusqlite::types::{FromSql, ToSql};
310
311        fn roundtrip_all<T>(conn: &rusqlite::Connection, variants: &[T])
312        where
313            T: ToSql + FromSql + std::fmt::Display + PartialEq + std::fmt::Debug,
314        {
315            conn.execute("CREATE TABLE IF NOT EXISTS t (status TEXT)", [])
316                .unwrap();
317            for variant in variants {
318                conn.execute("INSERT INTO t (status) VALUES (?1)", [variant])
319                    .unwrap();
320                let recovered: T = conn
321                    .query_row("SELECT status FROM t", [], |row| row.get(0))
322                    .unwrap();
323                assert_eq!(*variant, recovered, "round-trip failed for {variant}");
324                conn.execute("DELETE FROM t", []).unwrap();
325            }
326        }
327
328        fn invalid_string_errors<T>(conn: &rusqlite::Connection)
329        where
330            T: FromSql,
331        {
332            conn.execute("CREATE TABLE IF NOT EXISTS t (status TEXT)", [])
333                .unwrap();
334            conn.execute("INSERT INTO t (status) VALUES (?1)", ["not_a_status"])
335                .unwrap();
336            let result = conn.query_row::<T, _, _>("SELECT status FROM t", [], |row| row.get(0));
337            assert!(result.is_err(), "expected error for invalid status string");
338        }
339
340        #[test]
341        fn workflow_run_status_roundtrip() {
342            let conn = rusqlite::Connection::open_in_memory().unwrap();
343            roundtrip_all(
344                &conn,
345                &[
346                    WorkflowRunStatus::Pending,
347                    WorkflowRunStatus::Running,
348                    WorkflowRunStatus::Completed,
349                    WorkflowRunStatus::Failed,
350                    WorkflowRunStatus::Cancelled,
351                    WorkflowRunStatus::Waiting,
352                    WorkflowRunStatus::NeedsResume,
353                    WorkflowRunStatus::Cancelling,
354                ],
355            );
356        }
357
358        #[test]
359        fn workflow_step_status_roundtrip() {
360            let conn = rusqlite::Connection::open_in_memory().unwrap();
361            roundtrip_all(
362                &conn,
363                &[
364                    WorkflowStepStatus::Pending,
365                    WorkflowStepStatus::Running,
366                    WorkflowStepStatus::Completed,
367                    WorkflowStepStatus::Failed,
368                    WorkflowStepStatus::Skipped,
369                    WorkflowStepStatus::Waiting,
370                    WorkflowStepStatus::TimedOut,
371                ],
372            );
373        }
374
375        #[test]
376        fn workflow_run_status_invalid_string_errors() {
377            let conn = rusqlite::Connection::open_in_memory().unwrap();
378            invalid_string_errors::<WorkflowRunStatus>(&conn);
379        }
380
381        #[test]
382        fn workflow_step_status_invalid_string_errors() {
383            let conn = rusqlite::Connection::open_in_memory().unwrap();
384            invalid_string_errors::<WorkflowStepStatus>(&conn);
385        }
386    }
387}