Skip to main content

stormchaser_engine/
step_machine.rs

1use anyhow::Result;
2use chrono::Utc;
3use stormchaser_model::step::{StepInstance, StepStatus};
4
/// Marker types used as the `S` parameter of `StepMachine<S>` (typestate pattern).
///
/// Every marker is a unit struct: it carries no data and only names a lifecycle state
/// at the type level.
#[allow(dead_code)]
pub mod state {
    /// The step is waiting to be executed.
    pub struct Pending;

    /// The step is unpacking its state from the Stormchaser File System.
    pub struct UnpackingSfs;

    /// The step is currently executing.
    pub struct Running;

    /// The step is packing its state into the Stormchaser File System.
    pub struct PackingSfs;

    /// The step is waiting for an external event (e.g., Human-In-The-Loop approval).
    pub struct WaitingForEvent;

    /// The step has completed successfully.
    pub struct Succeeded;

    /// The step has failed.
    pub struct Failed;

    /// The step has been skipped (e.g., due to unmet conditions).
    pub struct Skipped;

    /// The step failed, but the failure was configured to be ignored.
    pub struct FailedIgnored;

    /// The step has been aborted.
    pub struct Aborted;
}
29
30/// A state machine for managing the lifecycle of a `StepInstance`.
31pub struct StepMachine<S> {
32    /// The underlying step instance data model.
33    pub instance: StepInstance,
34    _state: std::marker::PhantomData<S>,
35}
36
37impl<S> StepMachine<S> {
38    /// From instance.
39    pub fn from_instance(instance: StepInstance) -> Self {
40        Self {
41            instance,
42            _state: std::marker::PhantomData,
43        }
44    }
45}
46
47#[allow(dead_code)]
48impl StepMachine<state::Pending> {
49    /// New.
50    pub fn new(instance: StepInstance) -> Self {
51        // Ensure the initial status is correct
52        let mut instance = instance;
53        instance.status = StepStatus::Pending;
54
55        StepMachine {
56            instance,
57            _state: std::marker::PhantomData,
58        }
59    }
60
61    #[tracing::instrument(skip(self, executor), fields(run_id = %self.instance.run_id, step_id = %self.instance.id))]
62    /// Start.
63    pub async fn start(
64        mut self,
65        runner_id: String,
66        executor: &mut sqlx::PgConnection,
67    ) -> Result<StepMachine<state::Running>> {
68        self.instance.status = StepStatus::Running;
69        self.instance.started_at = Some(Utc::now());
70        self.instance.runner_id = Some(runner_id);
71
72        crate::persistence::persist_step_instance(&self.instance, executor).await?;
73
74        Ok(StepMachine {
75            instance: self.instance,
76            _state: std::marker::PhantomData,
77        })
78    }
79
80    #[tracing::instrument(skip(self, executor), fields(run_id = %self.instance.run_id, step_id = %self.instance.id))]
81    /// Start unpacking.
82    pub async fn start_unpacking(
83        mut self,
84        runner_id: String,
85        executor: &mut sqlx::PgConnection,
86    ) -> Result<StepMachine<state::UnpackingSfs>> {
87        self.instance.status = StepStatus::UnpackingSfs;
88        self.instance.started_at = Some(Utc::now());
89        self.instance.runner_id = Some(runner_id);
90
91        crate::persistence::persist_step_instance(&self.instance, executor).await?;
92
93        Ok(StepMachine {
94            instance: self.instance,
95            _state: std::marker::PhantomData,
96        })
97    }
98
99    #[tracing::instrument(skip(self, executor), fields(run_id = %self.instance.run_id, step_id = %self.instance.id))]
100    /// Fail.
101    pub async fn fail(
102        mut self,
103        error: String,
104        exit_code: Option<i32>,
105        executor: &mut sqlx::PgConnection,
106    ) -> Result<StepMachine<state::Failed>> {
107        self.instance.status = StepStatus::Failed;
108        self.instance.finished_at = Some(Utc::now());
109        self.instance.error = Some(error);
110        self.instance.exit_code = exit_code;
111
112        crate::persistence::persist_step_instance(&self.instance, executor).await?;
113
114        Ok(StepMachine {
115            instance: self.instance,
116            _state: std::marker::PhantomData,
117        })
118    }
119}
120
121#[allow(dead_code)]
122impl StepMachine<state::UnpackingSfs> {
123    #[tracing::instrument(skip(self, executor), fields(run_id = %self.instance.run_id, step_id = %self.instance.id))]
124    /// Start running.
125    pub async fn start_running(
126        mut self,
127        executor: &mut sqlx::PgConnection,
128    ) -> Result<StepMachine<state::Running>> {
129        self.instance.status = StepStatus::Running;
130
131        crate::persistence::persist_step_instance(&self.instance, executor).await?;
132
133        Ok(StepMachine {
134            instance: self.instance,
135            _state: std::marker::PhantomData,
136        })
137    }
138
139    #[tracing::instrument(skip(self, executor), fields(run_id = %self.instance.run_id, step_id = %self.instance.id))]
140    /// Fail.
141    pub async fn fail(
142        mut self,
143        error: String,
144        exit_code: Option<i32>,
145        executor: &mut sqlx::PgConnection,
146    ) -> Result<StepMachine<state::Failed>> {
147        self.instance.status = StepStatus::Failed;
148        self.instance.finished_at = Some(Utc::now());
149        self.instance.error = Some(error);
150        self.instance.exit_code = exit_code;
151
152        crate::persistence::persist_step_instance(&self.instance, executor).await?;
153
154        Ok(StepMachine {
155            instance: self.instance,
156            _state: std::marker::PhantomData,
157        })
158    }
159
160    #[tracing::instrument(skip(self, executor), fields(run_id = %self.instance.run_id, step_id = %self.instance.id))]
161    /// Skip.
162    pub async fn skip(
163        mut self,
164        executor: &mut sqlx::PgConnection,
165    ) -> Result<StepMachine<state::Skipped>> {
166        self.instance.status = StepStatus::Skipped;
167        self.instance.finished_at = Some(Utc::now());
168
169        crate::persistence::persist_step_instance(&self.instance, executor).await?;
170
171        Ok(StepMachine {
172            instance: self.instance,
173            _state: std::marker::PhantomData,
174        })
175    }
176
177    /// Into instance.
178    pub fn into_instance(self) -> StepInstance {
179        self.instance
180    }
181}
182
183#[allow(dead_code)]
184impl StepMachine<state::Running> {
185    #[tracing::instrument(skip(self, executor), fields(run_id = %self.instance.run_id, step_id = %self.instance.id))]
186    /// Start packing.
187    pub async fn start_packing(
188        mut self,
189        executor: &mut sqlx::PgConnection,
190    ) -> Result<StepMachine<state::PackingSfs>> {
191        self.instance.status = StepStatus::PackingSfs;
192
193        crate::persistence::persist_step_instance(&self.instance, executor).await?;
194
195        Ok(StepMachine {
196            instance: self.instance,
197            _state: std::marker::PhantomData,
198        })
199    }
200
201    #[tracing::instrument(skip(self, executor), fields(run_id = %self.instance.run_id, step_id = %self.instance.id))]
202    /// Succeed.
203    pub async fn succeed(
204        mut self,
205        executor: &mut sqlx::PgConnection,
206    ) -> Result<StepMachine<state::Succeeded>> {
207        self.instance.status = StepStatus::Succeeded;
208        self.instance.finished_at = Some(Utc::now());
209        self.instance.exit_code = Some(0);
210
211        crate::persistence::persist_step_instance(&self.instance, executor).await?;
212
213        Ok(StepMachine {
214            instance: self.instance,
215            _state: std::marker::PhantomData,
216        })
217    }
218
219    #[tracing::instrument(skip(self, executor), fields(run_id = %self.instance.run_id, step_id = %self.instance.id))]
220    /// Fail.
221    pub async fn fail(
222        mut self,
223        error: String,
224        exit_code: Option<i32>,
225        executor: &mut sqlx::PgConnection,
226    ) -> Result<StepMachine<state::Failed>> {
227        self.instance.status = StepStatus::Failed;
228        self.instance.finished_at = Some(Utc::now());
229        self.instance.error = Some(error);
230        self.instance.exit_code = exit_code;
231
232        crate::persistence::persist_step_instance(&self.instance, executor).await?;
233
234        Ok(StepMachine {
235            instance: self.instance,
236            _state: std::marker::PhantomData,
237        })
238    }
239
240    #[tracing::instrument(skip(self, executor), fields(run_id = %self.instance.run_id, step_id = %self.instance.id))]
241    /// Wait for event.
242    pub async fn wait_for_event(
243        mut self,
244        executor: &mut sqlx::PgConnection,
245    ) -> Result<StepMachine<state::WaitingForEvent>> {
246        self.instance.status = StepStatus::WaitingForEvent;
247
248        crate::persistence::persist_step_instance(&self.instance, executor).await?;
249
250        Ok(StepMachine {
251            instance: self.instance,
252            _state: std::marker::PhantomData,
253        })
254    }
255
256    #[tracing::instrument(skip(self, executor), fields(run_id = %self.instance.run_id, step_id = %self.instance.id))]
257    /// Abort.
258    pub async fn abort(
259        mut self,
260        executor: &mut sqlx::PgConnection,
261    ) -> Result<StepMachine<state::Aborted>> {
262        self.instance.status = StepStatus::Aborted;
263        self.instance.finished_at = Some(Utc::now());
264
265        crate::persistence::persist_step_instance(&self.instance, executor).await?;
266
267        Ok(StepMachine {
268            instance: self.instance,
269            _state: std::marker::PhantomData,
270        })
271    }
272
273    /// Into instance.
274    pub fn into_instance(self) -> StepInstance {
275        self.instance
276    }
277}
278
279#[allow(dead_code)]
280impl StepMachine<state::PackingSfs> {
281    #[tracing::instrument(skip(self, executor), fields(run_id = %self.instance.run_id, step_id = %self.instance.id))]
282    /// Succeed.
283    pub async fn succeed(
284        mut self,
285        executor: &mut sqlx::PgConnection,
286    ) -> Result<StepMachine<state::Succeeded>> {
287        self.instance.status = StepStatus::Succeeded;
288        self.instance.finished_at = Some(Utc::now());
289        self.instance.exit_code = Some(0);
290
291        crate::persistence::persist_step_instance(&self.instance, executor).await?;
292
293        Ok(StepMachine {
294            instance: self.instance,
295            _state: std::marker::PhantomData,
296        })
297    }
298
299    #[tracing::instrument(skip(self, executor), fields(run_id = %self.instance.run_id, step_id = %self.instance.id))]
300    /// Fail.
301    pub async fn fail(
302        mut self,
303        error: String,
304        exit_code: Option<i32>,
305        executor: &mut sqlx::PgConnection,
306    ) -> Result<StepMachine<state::Failed>> {
307        self.instance.status = StepStatus::Failed;
308        self.instance.finished_at = Some(Utc::now());
309        self.instance.error = Some(error);
310        self.instance.exit_code = exit_code;
311
312        crate::persistence::persist_step_instance(&self.instance, executor).await?;
313
314        Ok(StepMachine {
315            instance: self.instance,
316            _state: std::marker::PhantomData,
317        })
318    }
319
320    /// Into instance.
321    pub fn into_instance(self) -> StepInstance {
322        self.instance
323    }
324}
325
326#[allow(dead_code)]
327impl StepMachine<state::WaitingForEvent> {
328    #[tracing::instrument(skip(self, executor), fields(run_id = %self.instance.run_id, step_id = %self.instance.id))]
329    /// Resume.
330    pub async fn resume(
331        mut self,
332        executor: &mut sqlx::PgConnection,
333    ) -> Result<StepMachine<state::Running>> {
334        self.instance.status = StepStatus::Running;
335
336        crate::persistence::persist_step_instance(&self.instance, executor).await?;
337
338        Ok(StepMachine {
339            instance: self.instance,
340            _state: std::marker::PhantomData,
341        })
342    }
343
344    #[tracing::instrument(skip(self, executor), fields(run_id = %self.instance.run_id, step_id = %self.instance.id))]
345    /// Reschedule.
346    pub async fn reschedule(
347        mut self,
348        executor: &mut sqlx::PgConnection,
349    ) -> Result<StepMachine<state::Pending>> {
350        self.instance.status = StepStatus::Pending;
351
352        crate::persistence::persist_step_instance(&self.instance, executor).await?;
353
354        Ok(StepMachine {
355            instance: self.instance,
356            _state: std::marker::PhantomData,
357        })
358    }
359
360    /// Into instance.
361    pub fn into_instance(self) -> StepInstance {
362        self.instance
363    }
364
365    #[tracing::instrument(skip(self, executor), fields(run_id = %self.instance.run_id, step_id = %self.instance.id))]
366    /// Fail.
367    pub async fn fail(
368        mut self,
369        error: String,
370        exit_code: Option<i32>,
371        executor: &mut sqlx::PgConnection,
372    ) -> Result<StepMachine<state::Failed>> {
373        self.instance.status = StepStatus::Failed;
374        self.instance.finished_at = Some(Utc::now());
375        self.instance.error = Some(error);
376        self.instance.exit_code = exit_code;
377
378        crate::persistence::persist_step_instance(&self.instance, executor).await?;
379
380        Ok(StepMachine {
381            instance: self.instance,
382            _state: std::marker::PhantomData,
383        })
384    }
385}
386
387#[allow(dead_code)]
388impl StepMachine<state::Failed> {
389    #[tracing::instrument(skip(self, executor), fields(run_id = %self.instance.run_id, step_id = %self.instance.id))]
390    /// Ignore failure.
391    pub async fn ignore_failure(
392        mut self,
393        executor: &mut sqlx::PgConnection,
394    ) -> Result<StepMachine<state::FailedIgnored>> {
395        self.instance.status = StepStatus::FailedIgnored;
396
397        crate::persistence::persist_step_instance(&self.instance, executor).await?;
398
399        Ok(StepMachine {
400            instance: self.instance,
401            _state: std::marker::PhantomData,
402        })
403    }
404
405    /// Into instance.
406    pub fn into_instance(self) -> StepInstance {
407        self.instance
408    }
409}
410
411#[allow(dead_code)]
412impl StepMachine<state::Succeeded> {
413    /// Into instance.
414    pub fn into_instance(self) -> StepInstance {
415        self.instance
416    }
417}
418
419#[allow(dead_code)]
420impl StepMachine<state::Skipped> {
421    /// Into instance.
422    pub fn into_instance(self) -> StepInstance {
423        self.instance
424    }
425}
426
427#[allow(dead_code)]
428impl StepMachine<state::FailedIgnored> {
429    /// Into instance.
430    pub fn into_instance(self) -> StepInstance {
431        self.instance
432    }
433}
434
435#[allow(dead_code)]
436impl StepMachine<state::Aborted> {
437    /// Into instance.
438    pub fn into_instance(self) -> StepInstance {
439        self.instance
440    }
441}
442
#[cfg(test)]
mod tests {
    use super::*;
    use stormchaser_model::{RunId, StepInstanceId};
    use uuid::Uuid;

    /// Builds a throwaway instance with fresh random ids.
    ///
    /// Its status is deliberately `Running` rather than `Pending`, so tests can tell
    /// whether a constructor rewrote the status or left it alone.
    fn dummy_instance() -> StepInstance {
        let id = StepInstanceId::new(Uuid::new_v4());
        let run_id = RunId::new(Uuid::new_v4());

        StepInstance {
            id,
            run_id,
            step_name: String::from("test"),
            step_type: String::from("docker"),
            status: StepStatus::Running,
            iteration_index: None,
            runner_id: None,
            affinity_context: None,
            started_at: None,
            finished_at: None,
            exit_code: None,
            error: None,
            spec: serde_json::json!({}),
            params: serde_json::json!({}),
            created_at: Utc::now(),
        }
    }

    /// `new` must force the status to `Pending` while keeping the instance identity.
    #[test]
    fn test_step_machine_new() {
        let original = dummy_instance();
        let pending = StepMachine::<state::Pending>::new(original.clone());
        assert_eq!(pending.instance.status, StepStatus::Pending);
        assert_eq!(pending.instance.id, original.id);
    }

    /// `from_instance` must leave the stored status untouched.
    #[test]
    fn test_step_machine_from_instance() {
        let original = dummy_instance();
        let running = StepMachine::<state::Running>::from_instance(original.clone());
        assert_eq!(running.instance.status, StepStatus::Running);
    }

    /// `into_instance` must hand back the same instance that was wrapped.
    #[test]
    fn test_step_machine_into_instance() {
        let original = dummy_instance();
        let running = StepMachine::<state::Running>::from_instance(original.clone());
        let unwrapped = running.into_instance();
        assert_eq!(unwrapped.id, original.id);
        assert_eq!(unwrapped.status, StepStatus::Running);
    }
}