Skip to main content

ironflow_engine/fsm/
run_fsm.rs

1//! [`RunFsm`] — Finite state machine for the run lifecycle.
2//!
3//! Events drive transitions; the FSM rejects invalid ones and keeps
4//! a full history of state changes.
5
6use chrono::Utc;
7use ironflow_store::entities::RunStatus;
8use serde::{Deserialize, Serialize};
9use strum::Display;
10
11use super::{Transition, TransitionError};
12
13/// Events that drive [`RunFsm`] transitions.
14///
15/// Each event represents something that happened during execution.
16///
17/// # Examples
18///
19/// ```
20/// use ironflow_engine::fsm::RunEvent;
21///
22/// let event = RunEvent::PickedUp;
23/// assert_eq!(event.to_string(), "picked_up");
24/// ```
25#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Display)]
26#[serde(rename_all = "snake_case")]
27#[strum(serialize_all = "snake_case")]
28pub enum RunEvent {
29    /// Worker or inline executor picked up the run.
30    PickedUp,
31    /// All steps completed successfully.
32    AllStepsCompleted,
33    /// A step failed and the error is not retryable, or max retries exhausted.
34    StepFailed,
35    /// A step failed but a retry is possible.
36    StepFailedRetryable,
37    /// Retry attempt started.
38    RetryStarted,
39    /// Maximum retries exhausted after a retryable failure.
40    MaxRetriesExceeded,
41    /// User or API requested cancellation.
42    CancelRequested,
43    /// A step requires human approval before continuing.
44    ApprovalRequested,
45    /// Human approved the run to continue.
46    Approved,
47    /// Human rejected the run.
48    Rejected,
49}
50
51/// Finite state machine for a workflow run.
52///
53/// Wraps a [`RunStatus`] and enforces valid transitions via typed
54/// [`RunEvent`]s. Records every transition in a history log.
55///
56/// # Transition table
57///
58/// | From | Event | To |
59/// |------|-------|----|
60/// | Pending | PickedUp | Running |
61/// | Pending | CancelRequested | Cancelled |
62/// | Running | AllStepsCompleted | Completed |
63/// | Running | StepFailed | Failed |
64/// | Running | StepFailedRetryable | Retrying |
65/// | Running | CancelRequested | Cancelled |
66/// | Retrying | RetryStarted | Running |
67/// | Retrying | MaxRetriesExceeded | Failed |
68/// | Retrying | CancelRequested | Cancelled |
69/// | Running | ApprovalRequested | AwaitingApproval |
70/// | AwaitingApproval | Approved | Running |
71/// | AwaitingApproval | Rejected | Failed |
72/// | AwaitingApproval | CancelRequested | Cancelled |
73///
74/// # Examples
75///
76/// ```
77/// use ironflow_engine::fsm::{RunFsm, RunEvent};
78/// use ironflow_store::entities::RunStatus;
79///
80/// let mut fsm = RunFsm::new();
81/// assert_eq!(fsm.state(), RunStatus::Pending);
82///
83/// fsm.apply(RunEvent::PickedUp).unwrap();
84/// assert_eq!(fsm.state(), RunStatus::Running);
85///
86/// fsm.apply(RunEvent::AllStepsCompleted).unwrap();
87/// assert_eq!(fsm.state(), RunStatus::Completed);
88/// assert_eq!(fsm.history().len(), 2);
89/// ```
90#[derive(Debug, Clone)]
91pub struct RunFsm {
92    state: RunStatus,
93    history: Vec<Transition<RunStatus, RunEvent>>,
94}
95
96impl RunFsm {
97    /// Create a new FSM in `Pending` state.
98    ///
99    /// # Examples
100    ///
101    /// ```
102    /// use ironflow_engine::fsm::RunFsm;
103    /// use ironflow_store::entities::RunStatus;
104    ///
105    /// let fsm = RunFsm::new();
106    /// assert_eq!(fsm.state(), RunStatus::Pending);
107    /// ```
108    pub fn new() -> Self {
109        Self {
110            state: RunStatus::Pending,
111            history: Vec::new(),
112        }
113    }
114
115    /// Create a FSM from an existing state (e.g. loaded from DB).
116    ///
117    /// # Examples
118    ///
119    /// ```
120    /// use ironflow_engine::fsm::RunFsm;
121    /// use ironflow_store::entities::RunStatus;
122    ///
123    /// let fsm = RunFsm::from_state(RunStatus::Running);
124    /// assert_eq!(fsm.state(), RunStatus::Running);
125    /// ```
126    pub fn from_state(state: RunStatus) -> Self {
127        Self {
128            state,
129            history: Vec::new(),
130        }
131    }
132
133    /// Returns the current state.
134    pub fn state(&self) -> RunStatus {
135        self.state
136    }
137
138    /// Returns the full transition history.
139    pub fn history(&self) -> &[Transition<RunStatus, RunEvent>] {
140        &self.history
141    }
142
143    /// Returns `true` if the FSM is in a terminal state.
144    pub fn is_terminal(&self) -> bool {
145        self.state.is_terminal()
146    }
147
148    /// Apply an event, transitioning to a new state if valid.
149    ///
150    /// # Errors
151    ///
152    /// Returns [`TransitionError`] if the event is not allowed in the current state.
153    ///
154    /// # Examples
155    ///
156    /// ```
157    /// use ironflow_engine::fsm::{RunFsm, RunEvent};
158    /// use ironflow_store::entities::RunStatus;
159    ///
160    /// let mut fsm = RunFsm::new();
161    ///
162    /// // Valid transition
163    /// assert!(fsm.apply(RunEvent::PickedUp).is_ok());
164    ///
165    /// // Invalid: can't pick up a running run
166    /// assert!(fsm.apply(RunEvent::PickedUp).is_err());
167    /// ```
168    pub fn apply(
169        &mut self,
170        event: RunEvent,
171    ) -> Result<RunStatus, TransitionError<RunStatus, RunEvent>> {
172        let next = next_state(self.state, event).ok_or(TransitionError {
173            from: self.state,
174            event,
175        })?;
176
177        let transition = Transition {
178            from: self.state,
179            to: next,
180            event,
181            at: Utc::now(),
182        };
183
184        self.history.push(transition);
185        self.state = next;
186        Ok(next)
187    }
188
189    /// Check if an event would be accepted without applying it.
190    ///
191    /// # Examples
192    ///
193    /// ```
194    /// use ironflow_engine::fsm::{RunFsm, RunEvent};
195    ///
196    /// let fsm = RunFsm::new();
197    /// assert!(fsm.can_apply(RunEvent::PickedUp));
198    /// assert!(!fsm.can_apply(RunEvent::AllStepsCompleted));
199    /// ```
200    pub fn can_apply(&self, event: RunEvent) -> bool {
201        next_state(self.state, event).is_some()
202    }
203}
204
205impl Default for RunFsm {
206    fn default() -> Self {
207        Self::new()
208    }
209}
210
211/// Pure transition function — returns the next state for a given (state, event)
212/// pair, or `None` if the transition is invalid.
213fn next_state(from: RunStatus, event: RunEvent) -> Option<RunStatus> {
214    match (from, event) {
215        // Pending
216        (RunStatus::Pending, RunEvent::PickedUp) => Some(RunStatus::Running),
217        (RunStatus::Pending, RunEvent::CancelRequested) => Some(RunStatus::Cancelled),
218
219        // Running
220        (RunStatus::Running, RunEvent::AllStepsCompleted) => Some(RunStatus::Completed),
221        (RunStatus::Running, RunEvent::StepFailed) => Some(RunStatus::Failed),
222        (RunStatus::Running, RunEvent::StepFailedRetryable) => Some(RunStatus::Retrying),
223        (RunStatus::Running, RunEvent::CancelRequested) => Some(RunStatus::Cancelled),
224
225        // Retrying
226        (RunStatus::Retrying, RunEvent::RetryStarted) => Some(RunStatus::Running),
227        (RunStatus::Retrying, RunEvent::MaxRetriesExceeded) => Some(RunStatus::Failed),
228        (RunStatus::Retrying, RunEvent::CancelRequested) => Some(RunStatus::Cancelled),
229
230        // Approval
231        (RunStatus::Running, RunEvent::ApprovalRequested) => Some(RunStatus::AwaitingApproval),
232        (RunStatus::AwaitingApproval, RunEvent::Approved) => Some(RunStatus::Running),
233        (RunStatus::AwaitingApproval, RunEvent::Rejected) => Some(RunStatus::Failed),
234        (RunStatus::AwaitingApproval, RunEvent::CancelRequested) => Some(RunStatus::Cancelled),
235
236        // Terminal states and all other combos → invalid
237        _ => None,
238    }
239}
240
241#[cfg(test)]
242mod tests {
243    use super::*;
244
245    // ---- Happy paths ----
246
247    #[test]
248    fn pending_to_running() {
249        let mut fsm = RunFsm::new();
250        let result = fsm.apply(RunEvent::PickedUp);
251        assert!(result.is_ok());
252        assert_eq!(fsm.state(), RunStatus::Running);
253    }
254
255    #[test]
256    fn full_success_path() {
257        let mut fsm = RunFsm::new();
258        fsm.apply(RunEvent::PickedUp).unwrap();
259        fsm.apply(RunEvent::AllStepsCompleted).unwrap();
260        assert_eq!(fsm.state(), RunStatus::Completed);
261        assert!(fsm.is_terminal());
262        assert_eq!(fsm.history().len(), 2);
263    }
264
265    #[test]
266    fn full_failure_path() {
267        let mut fsm = RunFsm::new();
268        fsm.apply(RunEvent::PickedUp).unwrap();
269        fsm.apply(RunEvent::StepFailed).unwrap();
270        assert_eq!(fsm.state(), RunStatus::Failed);
271        assert!(fsm.is_terminal());
272    }
273
274    #[test]
275    fn retry_then_success() {
276        let mut fsm = RunFsm::new();
277        fsm.apply(RunEvent::PickedUp).unwrap();
278        fsm.apply(RunEvent::StepFailedRetryable).unwrap();
279        assert_eq!(fsm.state(), RunStatus::Retrying);
280
281        fsm.apply(RunEvent::RetryStarted).unwrap();
282        assert_eq!(fsm.state(), RunStatus::Running);
283
284        fsm.apply(RunEvent::AllStepsCompleted).unwrap();
285        assert_eq!(fsm.state(), RunStatus::Completed);
286        assert_eq!(fsm.history().len(), 4);
287    }
288
289    #[test]
290    fn retry_then_max_retries_exceeded() {
291        let mut fsm = RunFsm::new();
292        fsm.apply(RunEvent::PickedUp).unwrap();
293        fsm.apply(RunEvent::StepFailedRetryable).unwrap();
294        fsm.apply(RunEvent::MaxRetriesExceeded).unwrap();
295        assert_eq!(fsm.state(), RunStatus::Failed);
296    }
297
298    #[test]
299    fn cancel_from_pending() {
300        let mut fsm = RunFsm::new();
301        fsm.apply(RunEvent::CancelRequested).unwrap();
302        assert_eq!(fsm.state(), RunStatus::Cancelled);
303        assert!(fsm.is_terminal());
304    }
305
306    #[test]
307    fn cancel_from_running() {
308        let mut fsm = RunFsm::new();
309        fsm.apply(RunEvent::PickedUp).unwrap();
310        fsm.apply(RunEvent::CancelRequested).unwrap();
311        assert_eq!(fsm.state(), RunStatus::Cancelled);
312    }
313
314    #[test]
315    fn cancel_from_retrying() {
316        let mut fsm = RunFsm::new();
317        fsm.apply(RunEvent::PickedUp).unwrap();
318        fsm.apply(RunEvent::StepFailedRetryable).unwrap();
319        fsm.apply(RunEvent::CancelRequested).unwrap();
320        assert_eq!(fsm.state(), RunStatus::Cancelled);
321    }
322
323    // ---- Invalid transitions ----
324
325    #[test]
326    fn cannot_complete_from_pending() {
327        let mut fsm = RunFsm::new();
328        let result = fsm.apply(RunEvent::AllStepsCompleted);
329        assert!(result.is_err());
330        assert_eq!(fsm.state(), RunStatus::Pending);
331    }
332
333    #[test]
334    fn cannot_pick_up_running() {
335        let mut fsm = RunFsm::new();
336        fsm.apply(RunEvent::PickedUp).unwrap();
337        let result = fsm.apply(RunEvent::PickedUp);
338        assert!(result.is_err());
339    }
340
341    #[test]
342    fn cannot_transition_from_terminal() {
343        let mut fsm = RunFsm::new();
344        fsm.apply(RunEvent::PickedUp).unwrap();
345        fsm.apply(RunEvent::AllStepsCompleted).unwrap();
346
347        assert!(fsm.apply(RunEvent::PickedUp).is_err());
348        assert!(fsm.apply(RunEvent::CancelRequested).is_err());
349        assert!(fsm.apply(RunEvent::StepFailed).is_err());
350    }
351
352    // ---- can_apply ----
353
354    #[test]
355    fn can_apply_checks_without_mutation() {
356        let fsm = RunFsm::new();
357        assert!(fsm.can_apply(RunEvent::PickedUp));
358        assert!(fsm.can_apply(RunEvent::CancelRequested));
359        assert!(!fsm.can_apply(RunEvent::AllStepsCompleted));
360        assert!(!fsm.can_apply(RunEvent::StepFailed));
361        assert_eq!(fsm.state(), RunStatus::Pending);
362    }
363
364    // ---- from_state ----
365
366    #[test]
367    fn from_state_resumes_at_given_state() {
368        let mut fsm = RunFsm::from_state(RunStatus::Running);
369        assert_eq!(fsm.state(), RunStatus::Running);
370        assert!(fsm.history().is_empty());
371
372        fsm.apply(RunEvent::AllStepsCompleted).unwrap();
373        assert_eq!(fsm.state(), RunStatus::Completed);
374    }
375
376    // ---- History ----
377
378    #[test]
379    fn history_records_transitions() {
380        let mut fsm = RunFsm::new();
381        fsm.apply(RunEvent::PickedUp).unwrap();
382        fsm.apply(RunEvent::StepFailedRetryable).unwrap();
383        fsm.apply(RunEvent::RetryStarted).unwrap();
384
385        let history = fsm.history();
386        assert_eq!(history.len(), 3);
387
388        assert_eq!(history[0].from, RunStatus::Pending);
389        assert_eq!(history[0].to, RunStatus::Running);
390        assert_eq!(history[0].event, RunEvent::PickedUp);
391
392        assert_eq!(history[1].from, RunStatus::Running);
393        assert_eq!(history[1].to, RunStatus::Retrying);
394        assert_eq!(history[1].event, RunEvent::StepFailedRetryable);
395
396        assert_eq!(history[2].from, RunStatus::Retrying);
397        assert_eq!(history[2].to, RunStatus::Running);
398        assert_eq!(history[2].event, RunEvent::RetryStarted);
399    }
400
401    // ---- Approval transitions ----
402
403    #[test]
404    fn running_to_awaiting_approval() {
405        let mut fsm = RunFsm::new();
406        fsm.apply(RunEvent::PickedUp).unwrap();
407        fsm.apply(RunEvent::ApprovalRequested).unwrap();
408        assert_eq!(fsm.state(), RunStatus::AwaitingApproval);
409        assert!(!fsm.is_terminal());
410    }
411
412    #[test]
413    fn awaiting_approval_approved_resumes_running() {
414        let mut fsm = RunFsm::new();
415        fsm.apply(RunEvent::PickedUp).unwrap();
416        fsm.apply(RunEvent::ApprovalRequested).unwrap();
417        fsm.apply(RunEvent::Approved).unwrap();
418        assert_eq!(fsm.state(), RunStatus::Running);
419    }
420
421    #[test]
422    fn awaiting_approval_rejected_fails() {
423        let mut fsm = RunFsm::new();
424        fsm.apply(RunEvent::PickedUp).unwrap();
425        fsm.apply(RunEvent::ApprovalRequested).unwrap();
426        fsm.apply(RunEvent::Rejected).unwrap();
427        assert_eq!(fsm.state(), RunStatus::Failed);
428        assert!(fsm.is_terminal());
429    }
430
431    #[test]
432    fn awaiting_approval_cancel() {
433        let mut fsm = RunFsm::new();
434        fsm.apply(RunEvent::PickedUp).unwrap();
435        fsm.apply(RunEvent::ApprovalRequested).unwrap();
436        fsm.apply(RunEvent::CancelRequested).unwrap();
437        assert_eq!(fsm.state(), RunStatus::Cancelled);
438        assert!(fsm.is_terminal());
439    }
440
441    #[test]
442    fn cannot_approve_from_pending() {
443        let mut fsm = RunFsm::new();
444        assert!(fsm.apply(RunEvent::Approved).is_err());
445    }
446
447    #[test]
448    fn approval_then_complete() {
449        let mut fsm = RunFsm::new();
450        fsm.apply(RunEvent::PickedUp).unwrap();
451        fsm.apply(RunEvent::ApprovalRequested).unwrap();
452        fsm.apply(RunEvent::Approved).unwrap();
453        fsm.apply(RunEvent::AllStepsCompleted).unwrap();
454        assert_eq!(fsm.state(), RunStatus::Completed);
455        assert_eq!(fsm.history().len(), 4);
456    }
457
458    // ---- TransitionError Display ----
459
460    #[test]
461    fn transition_error_display() {
462        let mut fsm = RunFsm::new();
463        let err = fsm.apply(RunEvent::AllStepsCompleted).unwrap_err();
464        let msg = err.to_string();
465        assert!(msg.contains("all_steps_completed"));
466        assert!(msg.contains("Pending"));
467    }
468}