Skip to main content

stormchaser_engine/
step_machine.rs

1use anyhow::Result;
2use chrono::Utc;
3use stormchaser_model::step::{StepInstance, StepStatus};
4
5/// State markers for the typestate pattern
6#[allow(dead_code)]
7pub mod state {
8    /// State representing a step that is waiting to be executed.
9    pub struct Pending;
10    /// State representing a step that is unpacking its state from the Stormchaser File System.
11    pub struct UnpackingSfs;
12    /// State representing a step that is currently executing.
13    pub struct Running;
14    /// State representing a step that is packing its state into the Stormchaser File System.
15    pub struct PackingSfs;
16    /// State representing a step that is waiting for an external event (e.g., Human-In-The-Loop approval).
17    pub struct WaitingForEvent;
18    /// State representing a step that has successfully completed.
19    pub struct Succeeded;
20    /// State representing a step that has failed.
21    pub struct Failed;
22    /// State representing a step that has been skipped (e.g., due to unmet conditions).
23    pub struct Skipped;
24    /// State representing a step that failed but the failure was configured to be ignored.
25    pub struct FailedIgnored;
26    /// State representing a step that has been aborted.
27    pub struct Aborted;
28}
29
30/// A state machine for managing the lifecycle of a `StepInstance`.
31pub struct StepMachine<S> {
32    /// The underlying step instance data model.
33    pub instance: StepInstance,
34    _state: std::marker::PhantomData<S>,
35}
36
37impl<S> StepMachine<S> {
38    /// From instance.
39    pub fn from_instance(instance: StepInstance) -> Self {
40        Self {
41            instance,
42            _state: std::marker::PhantomData,
43        }
44    }
45}
46
47#[allow(dead_code)]
48impl StepMachine<state::Pending> {
49    /// New.
50    pub fn new(instance: StepInstance) -> Self {
51        // Ensure the initial status is correct
52        let mut instance = instance;
53        instance.status = StepStatus::Pending;
54
55        StepMachine {
56            instance,
57            _state: std::marker::PhantomData,
58        }
59    }
60
61    #[tracing::instrument(skip(self, executor), fields(run_id = %self.instance.run_id, step_id = %self.instance.id))]
62    /// Start.
63    pub async fn start(
64        mut self,
65        runner_id: String,
66        executor: &mut sqlx::PgConnection,
67    ) -> Result<StepMachine<state::Running>> {
68        self.instance.status = StepStatus::Running;
69        self.instance.started_at = Some(Utc::now());
70        self.instance.runner_id = Some(runner_id);
71
72        crate::persistence::persist_step_instance(&self.instance, executor).await?;
73
74        Ok(StepMachine {
75            instance: self.instance,
76            _state: std::marker::PhantomData,
77        })
78    }
79
80    #[tracing::instrument(skip(self, executor), fields(run_id = %self.instance.run_id, step_id = %self.instance.id))]
81    /// Start unpacking.
82    pub async fn start_unpacking(
83        mut self,
84        runner_id: String,
85        executor: &mut sqlx::PgConnection,
86    ) -> Result<StepMachine<state::UnpackingSfs>> {
87        self.instance.status = StepStatus::UnpackingSfs;
88        self.instance.started_at = Some(Utc::now());
89        self.instance.runner_id = Some(runner_id);
90
91        crate::persistence::persist_step_instance(&self.instance, executor).await?;
92
93        Ok(StepMachine {
94            instance: self.instance,
95            _state: std::marker::PhantomData,
96        })
97    }
98}
99
100#[allow(dead_code)]
101impl StepMachine<state::UnpackingSfs> {
102    #[tracing::instrument(skip(self, executor), fields(run_id = %self.instance.run_id, step_id = %self.instance.id))]
103    /// Start running.
104    pub async fn start_running(
105        mut self,
106        executor: &mut sqlx::PgConnection,
107    ) -> Result<StepMachine<state::Running>> {
108        self.instance.status = StepStatus::Running;
109
110        crate::persistence::persist_step_instance(&self.instance, executor).await?;
111
112        Ok(StepMachine {
113            instance: self.instance,
114            _state: std::marker::PhantomData,
115        })
116    }
117
118    #[tracing::instrument(skip(self, executor), fields(run_id = %self.instance.run_id, step_id = %self.instance.id))]
119    /// Fail.
120    pub async fn fail(
121        mut self,
122        error: String,
123        exit_code: Option<i32>,
124        executor: &mut sqlx::PgConnection,
125    ) -> Result<StepMachine<state::Failed>> {
126        self.instance.status = StepStatus::Failed;
127        self.instance.finished_at = Some(Utc::now());
128        self.instance.error = Some(error);
129        self.instance.exit_code = exit_code;
130
131        crate::persistence::persist_step_instance(&self.instance, executor).await?;
132
133        Ok(StepMachine {
134            instance: self.instance,
135            _state: std::marker::PhantomData,
136        })
137    }
138
139    #[tracing::instrument(skip(self, executor), fields(run_id = %self.instance.run_id, step_id = %self.instance.id))]
140    /// Skip.
141    pub async fn skip(
142        mut self,
143        executor: &mut sqlx::PgConnection,
144    ) -> Result<StepMachine<state::Skipped>> {
145        self.instance.status = StepStatus::Skipped;
146        self.instance.finished_at = Some(Utc::now());
147
148        crate::persistence::persist_step_instance(&self.instance, executor).await?;
149
150        Ok(StepMachine {
151            instance: self.instance,
152            _state: std::marker::PhantomData,
153        })
154    }
155
156    /// Into instance.
157    pub fn into_instance(self) -> StepInstance {
158        self.instance
159    }
160}
161
162#[allow(dead_code)]
163impl StepMachine<state::Running> {
164    #[tracing::instrument(skip(self, executor), fields(run_id = %self.instance.run_id, step_id = %self.instance.id))]
165    /// Start packing.
166    pub async fn start_packing(
167        mut self,
168        executor: &mut sqlx::PgConnection,
169    ) -> Result<StepMachine<state::PackingSfs>> {
170        self.instance.status = StepStatus::PackingSfs;
171
172        crate::persistence::persist_step_instance(&self.instance, executor).await?;
173
174        Ok(StepMachine {
175            instance: self.instance,
176            _state: std::marker::PhantomData,
177        })
178    }
179
180    #[tracing::instrument(skip(self, executor), fields(run_id = %self.instance.run_id, step_id = %self.instance.id))]
181    /// Succeed.
182    pub async fn succeed(
183        mut self,
184        executor: &mut sqlx::PgConnection,
185    ) -> Result<StepMachine<state::Succeeded>> {
186        self.instance.status = StepStatus::Succeeded;
187        self.instance.finished_at = Some(Utc::now());
188        self.instance.exit_code = Some(0);
189
190        crate::persistence::persist_step_instance(&self.instance, executor).await?;
191
192        Ok(StepMachine {
193            instance: self.instance,
194            _state: std::marker::PhantomData,
195        })
196    }
197
198    #[tracing::instrument(skip(self, executor), fields(run_id = %self.instance.run_id, step_id = %self.instance.id))]
199    /// Fail.
200    pub async fn fail(
201        mut self,
202        error: String,
203        exit_code: Option<i32>,
204        executor: &mut sqlx::PgConnection,
205    ) -> Result<StepMachine<state::Failed>> {
206        self.instance.status = StepStatus::Failed;
207        self.instance.finished_at = Some(Utc::now());
208        self.instance.error = Some(error);
209        self.instance.exit_code = exit_code;
210
211        crate::persistence::persist_step_instance(&self.instance, executor).await?;
212
213        Ok(StepMachine {
214            instance: self.instance,
215            _state: std::marker::PhantomData,
216        })
217    }
218
219    #[tracing::instrument(skip(self, executor), fields(run_id = %self.instance.run_id, step_id = %self.instance.id))]
220    /// Wait for event.
221    pub async fn wait_for_event(
222        mut self,
223        executor: &mut sqlx::PgConnection,
224    ) -> Result<StepMachine<state::WaitingForEvent>> {
225        self.instance.status = StepStatus::WaitingForEvent;
226
227        crate::persistence::persist_step_instance(&self.instance, executor).await?;
228
229        Ok(StepMachine {
230            instance: self.instance,
231            _state: std::marker::PhantomData,
232        })
233    }
234
235    #[tracing::instrument(skip(self, executor), fields(run_id = %self.instance.run_id, step_id = %self.instance.id))]
236    /// Abort.
237    pub async fn abort(
238        mut self,
239        executor: &mut sqlx::PgConnection,
240    ) -> Result<StepMachine<state::Aborted>> {
241        self.instance.status = StepStatus::Aborted;
242        self.instance.finished_at = Some(Utc::now());
243
244        crate::persistence::persist_step_instance(&self.instance, executor).await?;
245
246        Ok(StepMachine {
247            instance: self.instance,
248            _state: std::marker::PhantomData,
249        })
250    }
251
252    /// Into instance.
253    pub fn into_instance(self) -> StepInstance {
254        self.instance
255    }
256}
257
258#[allow(dead_code)]
259impl StepMachine<state::PackingSfs> {
260    #[tracing::instrument(skip(self, executor), fields(run_id = %self.instance.run_id, step_id = %self.instance.id))]
261    /// Succeed.
262    pub async fn succeed(
263        mut self,
264        executor: &mut sqlx::PgConnection,
265    ) -> Result<StepMachine<state::Succeeded>> {
266        self.instance.status = StepStatus::Succeeded;
267        self.instance.finished_at = Some(Utc::now());
268        self.instance.exit_code = Some(0);
269
270        crate::persistence::persist_step_instance(&self.instance, executor).await?;
271
272        Ok(StepMachine {
273            instance: self.instance,
274            _state: std::marker::PhantomData,
275        })
276    }
277
278    #[tracing::instrument(skip(self, executor), fields(run_id = %self.instance.run_id, step_id = %self.instance.id))]
279    /// Fail.
280    pub async fn fail(
281        mut self,
282        error: String,
283        exit_code: Option<i32>,
284        executor: &mut sqlx::PgConnection,
285    ) -> Result<StepMachine<state::Failed>> {
286        self.instance.status = StepStatus::Failed;
287        self.instance.finished_at = Some(Utc::now());
288        self.instance.error = Some(error);
289        self.instance.exit_code = exit_code;
290
291        crate::persistence::persist_step_instance(&self.instance, executor).await?;
292
293        Ok(StepMachine {
294            instance: self.instance,
295            _state: std::marker::PhantomData,
296        })
297    }
298
299    /// Into instance.
300    pub fn into_instance(self) -> StepInstance {
301        self.instance
302    }
303}
304
305#[allow(dead_code)]
306impl StepMachine<state::WaitingForEvent> {
307    #[tracing::instrument(skip(self, executor), fields(run_id = %self.instance.run_id, step_id = %self.instance.id))]
308    /// Resume.
309    pub async fn resume(
310        mut self,
311        executor: &mut sqlx::PgConnection,
312    ) -> Result<StepMachine<state::Running>> {
313        self.instance.status = StepStatus::Running;
314
315        crate::persistence::persist_step_instance(&self.instance, executor).await?;
316
317        Ok(StepMachine {
318            instance: self.instance,
319            _state: std::marker::PhantomData,
320        })
321    }
322
323    #[tracing::instrument(skip(self, executor), fields(run_id = %self.instance.run_id, step_id = %self.instance.id))]
324    /// Reschedule.
325    pub async fn reschedule(
326        mut self,
327        executor: &mut sqlx::PgConnection,
328    ) -> Result<StepMachine<state::Pending>> {
329        self.instance.status = StepStatus::Pending;
330
331        crate::persistence::persist_step_instance(&self.instance, executor).await?;
332
333        Ok(StepMachine {
334            instance: self.instance,
335            _state: std::marker::PhantomData,
336        })
337    }
338
339    /// Into instance.
340    pub fn into_instance(self) -> StepInstance {
341        self.instance
342    }
343}
344
345#[allow(dead_code)]
346impl StepMachine<state::Failed> {
347    #[tracing::instrument(skip(self, executor), fields(run_id = %self.instance.run_id, step_id = %self.instance.id))]
348    /// Ignore failure.
349    pub async fn ignore_failure(
350        mut self,
351        executor: &mut sqlx::PgConnection,
352    ) -> Result<StepMachine<state::FailedIgnored>> {
353        self.instance.status = StepStatus::FailedIgnored;
354
355        crate::persistence::persist_step_instance(&self.instance, executor).await?;
356
357        Ok(StepMachine {
358            instance: self.instance,
359            _state: std::marker::PhantomData,
360        })
361    }
362
363    /// Into instance.
364    pub fn into_instance(self) -> StepInstance {
365        self.instance
366    }
367}
368
369#[allow(dead_code)]
370impl StepMachine<state::Succeeded> {
371    /// Into instance.
372    pub fn into_instance(self) -> StepInstance {
373        self.instance
374    }
375}
376
377#[allow(dead_code)]
378impl StepMachine<state::Skipped> {
379    /// Into instance.
380    pub fn into_instance(self) -> StepInstance {
381        self.instance
382    }
383}
384
385#[allow(dead_code)]
386impl StepMachine<state::FailedIgnored> {
387    /// Into instance.
388    pub fn into_instance(self) -> StepInstance {
389        self.instance
390    }
391}
392
393#[allow(dead_code)]
394impl StepMachine<state::Aborted> {
395    /// Into instance.
396    pub fn into_instance(self) -> StepInstance {
397        self.instance
398    }
399}