Skip to main content

ironflow_engine/fsm/
run_fsm.rs

1//! [`RunFsm`] — Finite state machine for the run lifecycle.
2//!
3//! Events drive transitions; the FSM rejects invalid ones and keeps
4//! a full history of state changes.
5
6use chrono::Utc;
7use ironflow_store::entities::RunStatus;
8use serde::{Deserialize, Serialize};
9
10use super::{Transition, TransitionError};
11
12/// Events that drive [`RunFsm`] transitions.
13///
14/// Each event represents something that happened during execution.
15///
16/// # Examples
17///
18/// ```
19/// use ironflow_engine::fsm::RunEvent;
20///
21/// let event = RunEvent::PickedUp;
22/// assert_eq!(event.to_string(), "picked_up");
23/// ```
24#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
25#[serde(rename_all = "snake_case")]
26pub enum RunEvent {
27    /// Worker or inline executor picked up the run.
28    PickedUp,
29    /// All steps completed successfully.
30    AllStepsCompleted,
31    /// A step failed and the error is not retryable, or max retries exhausted.
32    StepFailed,
33    /// A step failed but a retry is possible.
34    StepFailedRetryable,
35    /// Retry attempt started.
36    RetryStarted,
37    /// Maximum retries exhausted after a retryable failure.
38    MaxRetriesExceeded,
39    /// User or API requested cancellation.
40    CancelRequested,
41}
42
43impl std::fmt::Display for RunEvent {
44    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45        match self {
46            RunEvent::PickedUp => f.write_str("picked_up"),
47            RunEvent::AllStepsCompleted => f.write_str("all_steps_completed"),
48            RunEvent::StepFailed => f.write_str("step_failed"),
49            RunEvent::StepFailedRetryable => f.write_str("step_failed_retryable"),
50            RunEvent::RetryStarted => f.write_str("retry_started"),
51            RunEvent::MaxRetriesExceeded => f.write_str("max_retries_exceeded"),
52            RunEvent::CancelRequested => f.write_str("cancel_requested"),
53        }
54    }
55}
56
57/// Finite state machine for a workflow run.
58///
59/// Wraps a [`RunStatus`] and enforces valid transitions via typed
60/// [`RunEvent`]s. Records every transition in a history log.
61///
62/// # Transition table
63///
64/// | From | Event | To |
65/// |------|-------|----|
66/// | Pending | PickedUp | Running |
67/// | Pending | CancelRequested | Cancelled |
68/// | Running | AllStepsCompleted | Completed |
69/// | Running | StepFailed | Failed |
70/// | Running | StepFailedRetryable | Retrying |
71/// | Running | CancelRequested | Cancelled |
72/// | Retrying | RetryStarted | Running |
73/// | Retrying | MaxRetriesExceeded | Failed |
74/// | Retrying | CancelRequested | Cancelled |
75///
76/// # Examples
77///
78/// ```
79/// use ironflow_engine::fsm::{RunFsm, RunEvent};
80/// use ironflow_store::entities::RunStatus;
81///
82/// let mut fsm = RunFsm::new();
83/// assert_eq!(fsm.state(), RunStatus::Pending);
84///
85/// fsm.apply(RunEvent::PickedUp).unwrap();
86/// assert_eq!(fsm.state(), RunStatus::Running);
87///
88/// fsm.apply(RunEvent::AllStepsCompleted).unwrap();
89/// assert_eq!(fsm.state(), RunStatus::Completed);
90/// assert_eq!(fsm.history().len(), 2);
91/// ```
92#[derive(Debug, Clone)]
93pub struct RunFsm {
94    state: RunStatus,
95    history: Vec<Transition<RunStatus, RunEvent>>,
96}
97
98impl RunFsm {
99    /// Create a new FSM in `Pending` state.
100    ///
101    /// # Examples
102    ///
103    /// ```
104    /// use ironflow_engine::fsm::RunFsm;
105    /// use ironflow_store::entities::RunStatus;
106    ///
107    /// let fsm = RunFsm::new();
108    /// assert_eq!(fsm.state(), RunStatus::Pending);
109    /// ```
110    pub fn new() -> Self {
111        Self {
112            state: RunStatus::Pending,
113            history: Vec::new(),
114        }
115    }
116
117    /// Create a FSM from an existing state (e.g. loaded from DB).
118    ///
119    /// # Examples
120    ///
121    /// ```
122    /// use ironflow_engine::fsm::RunFsm;
123    /// use ironflow_store::entities::RunStatus;
124    ///
125    /// let fsm = RunFsm::from_state(RunStatus::Running);
126    /// assert_eq!(fsm.state(), RunStatus::Running);
127    /// ```
128    pub fn from_state(state: RunStatus) -> Self {
129        Self {
130            state,
131            history: Vec::new(),
132        }
133    }
134
135    /// Returns the current state.
136    pub fn state(&self) -> RunStatus {
137        self.state
138    }
139
140    /// Returns the full transition history.
141    pub fn history(&self) -> &[Transition<RunStatus, RunEvent>] {
142        &self.history
143    }
144
145    /// Returns `true` if the FSM is in a terminal state.
146    pub fn is_terminal(&self) -> bool {
147        self.state.is_terminal()
148    }
149
150    /// Apply an event, transitioning to a new state if valid.
151    ///
152    /// # Errors
153    ///
154    /// Returns [`TransitionError`] if the event is not allowed in the current state.
155    ///
156    /// # Examples
157    ///
158    /// ```
159    /// use ironflow_engine::fsm::{RunFsm, RunEvent};
160    /// use ironflow_store::entities::RunStatus;
161    ///
162    /// let mut fsm = RunFsm::new();
163    ///
164    /// // Valid transition
165    /// assert!(fsm.apply(RunEvent::PickedUp).is_ok());
166    ///
167    /// // Invalid: can't pick up a running run
168    /// assert!(fsm.apply(RunEvent::PickedUp).is_err());
169    /// ```
170    pub fn apply(
171        &mut self,
172        event: RunEvent,
173    ) -> Result<RunStatus, TransitionError<RunStatus, RunEvent>> {
174        let next = next_state(self.state, event).ok_or(TransitionError {
175            from: self.state,
176            event,
177        })?;
178
179        let transition = Transition {
180            from: self.state,
181            to: next,
182            event,
183            at: Utc::now(),
184        };
185
186        self.history.push(transition);
187        self.state = next;
188        Ok(next)
189    }
190
191    /// Check if an event would be accepted without applying it.
192    ///
193    /// # Examples
194    ///
195    /// ```
196    /// use ironflow_engine::fsm::{RunFsm, RunEvent};
197    ///
198    /// let fsm = RunFsm::new();
199    /// assert!(fsm.can_apply(RunEvent::PickedUp));
200    /// assert!(!fsm.can_apply(RunEvent::AllStepsCompleted));
201    /// ```
202    pub fn can_apply(&self, event: RunEvent) -> bool {
203        next_state(self.state, event).is_some()
204    }
205}
206
207impl Default for RunFsm {
208    fn default() -> Self {
209        Self::new()
210    }
211}
212
213/// Pure transition function — returns the next state for a given (state, event)
214/// pair, or `None` if the transition is invalid.
215fn next_state(from: RunStatus, event: RunEvent) -> Option<RunStatus> {
216    match (from, event) {
217        // Pending
218        (RunStatus::Pending, RunEvent::PickedUp) => Some(RunStatus::Running),
219        (RunStatus::Pending, RunEvent::CancelRequested) => Some(RunStatus::Cancelled),
220
221        // Running
222        (RunStatus::Running, RunEvent::AllStepsCompleted) => Some(RunStatus::Completed),
223        (RunStatus::Running, RunEvent::StepFailed) => Some(RunStatus::Failed),
224        (RunStatus::Running, RunEvent::StepFailedRetryable) => Some(RunStatus::Retrying),
225        (RunStatus::Running, RunEvent::CancelRequested) => Some(RunStatus::Cancelled),
226
227        // Retrying
228        (RunStatus::Retrying, RunEvent::RetryStarted) => Some(RunStatus::Running),
229        (RunStatus::Retrying, RunEvent::MaxRetriesExceeded) => Some(RunStatus::Failed),
230        (RunStatus::Retrying, RunEvent::CancelRequested) => Some(RunStatus::Cancelled),
231
232        // Terminal states and all other combos → invalid
233        _ => None,
234    }
235}
236
#[cfg(test)]
mod tests {
    //! Unit tests for [`RunFsm`]: accepted paths, rejected events (which must
    //! leave the machine untouched), `can_apply`, resuming from a stored
    //! state, history bookkeeping and error/event formatting.

    use super::*;

    /// Every event the FSM understands, used for exhaustive negative checks.
    const ALL_EVENTS: [RunEvent; 7] = [
        RunEvent::PickedUp,
        RunEvent::AllStepsCompleted,
        RunEvent::StepFailed,
        RunEvent::StepFailedRetryable,
        RunEvent::RetryStarted,
        RunEvent::MaxRetriesExceeded,
        RunEvent::CancelRequested,
    ];

    // ---- Construction ----

    #[test]
    fn default_matches_new() {
        let fsm = RunFsm::default();
        assert_eq!(fsm.state(), RunStatus::Pending);
        assert!(fsm.history().is_empty());
        assert!(!fsm.is_terminal());
    }

    // ---- Happy paths ----

    #[test]
    fn pending_to_running() {
        let mut fsm = RunFsm::new();
        // `apply` must hand back the state it moved to.
        let next = fsm.apply(RunEvent::PickedUp).unwrap();
        assert_eq!(next, RunStatus::Running);
        assert_eq!(fsm.state(), RunStatus::Running);
        assert_eq!(fsm.history().len(), 1);
    }

    #[test]
    fn full_success_path() {
        let mut fsm = RunFsm::new();
        fsm.apply(RunEvent::PickedUp).unwrap();
        fsm.apply(RunEvent::AllStepsCompleted).unwrap();
        assert_eq!(fsm.state(), RunStatus::Completed);
        assert!(fsm.is_terminal());
        assert_eq!(fsm.history().len(), 2);
    }

    #[test]
    fn full_failure_path() {
        let mut fsm = RunFsm::new();
        fsm.apply(RunEvent::PickedUp).unwrap();
        fsm.apply(RunEvent::StepFailed).unwrap();
        assert_eq!(fsm.state(), RunStatus::Failed);
        assert!(fsm.is_terminal());
    }

    #[test]
    fn retry_then_success() {
        let mut fsm = RunFsm::new();
        fsm.apply(RunEvent::PickedUp).unwrap();
        fsm.apply(RunEvent::StepFailedRetryable).unwrap();
        assert_eq!(fsm.state(), RunStatus::Retrying);

        fsm.apply(RunEvent::RetryStarted).unwrap();
        assert_eq!(fsm.state(), RunStatus::Running);

        fsm.apply(RunEvent::AllStepsCompleted).unwrap();
        assert_eq!(fsm.state(), RunStatus::Completed);
        assert_eq!(fsm.history().len(), 4);
    }

    #[test]
    fn retry_then_max_retries_exceeded() {
        let mut fsm = RunFsm::new();
        fsm.apply(RunEvent::PickedUp).unwrap();
        fsm.apply(RunEvent::StepFailedRetryable).unwrap();
        fsm.apply(RunEvent::MaxRetriesExceeded).unwrap();
        assert_eq!(fsm.state(), RunStatus::Failed);
    }

    #[test]
    fn cancel_from_pending() {
        let mut fsm = RunFsm::new();
        fsm.apply(RunEvent::CancelRequested).unwrap();
        assert_eq!(fsm.state(), RunStatus::Cancelled);
        assert!(fsm.is_terminal());
    }

    #[test]
    fn cancel_from_running() {
        let mut fsm = RunFsm::new();
        fsm.apply(RunEvent::PickedUp).unwrap();
        fsm.apply(RunEvent::CancelRequested).unwrap();
        assert_eq!(fsm.state(), RunStatus::Cancelled);
    }

    #[test]
    fn cancel_from_retrying() {
        let mut fsm = RunFsm::new();
        fsm.apply(RunEvent::PickedUp).unwrap();
        fsm.apply(RunEvent::StepFailedRetryable).unwrap();
        fsm.apply(RunEvent::CancelRequested).unwrap();
        assert_eq!(fsm.state(), RunStatus::Cancelled);
    }

    // ---- Invalid transitions ----

    #[test]
    fn cannot_complete_from_pending() {
        let mut fsm = RunFsm::new();
        let result = fsm.apply(RunEvent::AllStepsCompleted);
        assert!(result.is_err());
        assert_eq!(fsm.state(), RunStatus::Pending);
        // A rejected event must not show up in the history.
        assert!(fsm.history().is_empty());
    }

    #[test]
    fn cannot_pick_up_running() {
        let mut fsm = RunFsm::new();
        fsm.apply(RunEvent::PickedUp).unwrap();
        let result = fsm.apply(RunEvent::PickedUp);
        assert!(result.is_err());
        // The rejected second pick-up leaves the machine exactly as it was.
        assert_eq!(fsm.state(), RunStatus::Running);
        assert_eq!(fsm.history().len(), 1);
    }

    #[test]
    fn cannot_transition_from_terminal() {
        let mut fsm = RunFsm::new();
        fsm.apply(RunEvent::PickedUp).unwrap();
        fsm.apply(RunEvent::AllStepsCompleted).unwrap();

        assert!(fsm.apply(RunEvent::PickedUp).is_err());
        assert!(fsm.apply(RunEvent::CancelRequested).is_err());
        assert!(fsm.apply(RunEvent::StepFailed).is_err());

        // None of the rejected events moved the state or grew the history.
        assert_eq!(fsm.state(), RunStatus::Completed);
        assert_eq!(fsm.history().len(), 2);
    }

    #[test]
    fn terminal_states_reject_every_event() {
        let terminal = [
            RunStatus::Completed,
            RunStatus::Failed,
            RunStatus::Cancelled,
        ];

        for &state in terminal.iter() {
            for &event in ALL_EVENTS.iter() {
                let mut fsm = RunFsm::from_state(state);
                assert!(fsm.is_terminal());
                assert!(!fsm.can_apply(event));
                assert!(fsm.apply(event).is_err());
                assert_eq!(fsm.state(), state);
                assert!(fsm.history().is_empty());
            }
        }
    }

    // ---- can_apply ----

    #[test]
    fn can_apply_checks_without_mutation() {
        let fsm = RunFsm::new();
        assert!(fsm.can_apply(RunEvent::PickedUp));
        assert!(fsm.can_apply(RunEvent::CancelRequested));
        assert!(!fsm.can_apply(RunEvent::AllStepsCompleted));
        assert!(!fsm.can_apply(RunEvent::StepFailed));
        assert_eq!(fsm.state(), RunStatus::Pending);
        assert!(fsm.history().is_empty());
    }

    // ---- from_state ----

    #[test]
    fn from_state_resumes_at_given_state() {
        let mut fsm = RunFsm::from_state(RunStatus::Running);
        assert_eq!(fsm.state(), RunStatus::Running);
        assert!(fsm.history().is_empty());

        fsm.apply(RunEvent::AllStepsCompleted).unwrap();
        assert_eq!(fsm.state(), RunStatus::Completed);
        // Only the transition applied after resuming is recorded.
        assert_eq!(fsm.history().len(), 1);
    }

    // ---- History ----

    #[test]
    fn history_records_transitions() {
        let mut fsm = RunFsm::new();
        fsm.apply(RunEvent::PickedUp).unwrap();
        fsm.apply(RunEvent::StepFailedRetryable).unwrap();
        fsm.apply(RunEvent::RetryStarted).unwrap();

        let history = fsm.history();
        assert_eq!(history.len(), 3);

        assert_eq!(history[0].from, RunStatus::Pending);
        assert_eq!(history[0].to, RunStatus::Running);
        assert_eq!(history[0].event, RunEvent::PickedUp);

        assert_eq!(history[1].from, RunStatus::Running);
        assert_eq!(history[1].to, RunStatus::Retrying);
        assert_eq!(history[1].event, RunEvent::StepFailedRetryable);

        assert_eq!(history[2].from, RunStatus::Retrying);
        assert_eq!(history[2].to, RunStatus::Running);
        assert_eq!(history[2].event, RunEvent::RetryStarted);
    }

    // ---- TransitionError ----

    #[test]
    fn transition_error_display() {
        let mut fsm = RunFsm::new();
        let err = fsm.apply(RunEvent::AllStepsCompleted).unwrap_err();

        // The error reports the state it was raised in and the rejected event.
        assert_eq!(err.from, RunStatus::Pending);
        assert_eq!(err.event, RunEvent::AllStepsCompleted);

        let msg = err.to_string();
        assert!(msg.contains("all_steps_completed"));
        assert!(msg.contains("Pending"));
    }

    // ---- RunEvent Display ----

    #[test]
    fn run_event_display_is_snake_case() {
        let expected = [
            "picked_up",
            "all_steps_completed",
            "step_failed",
            "step_failed_retryable",
            "retry_started",
            "max_retries_exceeded",
            "cancel_requested",
        ];

        for (event, name) in ALL_EVENTS.iter().zip(expected.iter()) {
            assert_eq!(event.to_string(), *name);
        }
    }
}