Skip to main content

stormchaser_engine/
workflow_machine.rs

1use anyhow::Result;
2use chrono::Utc;
3use std::marker::PhantomData;
4use stormchaser_model::workflow::{RunStatus, WorkflowRun};
5
/// Zero-sized marker types for the workflow typestate pattern.
///
/// Each marker is used as the type parameter of `WorkflowMachine` to record,
/// at compile time, which lifecycle state a workflow run is in. The markers
/// carry no data; they derive the common value traits so they can be logged,
/// copied, compared and hashed like any other plain value.
// Not every marker is necessarily named elsewhere in the crate, hence the allow.
#[allow(dead_code)]
pub mod state {
    /// State representing a workflow run that is queued and waiting for resolution.
    #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
    pub struct Queued;

    /// State representing a workflow run that is resolving its source repository and dependencies.
    #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
    pub struct Resolving;

    /// State representing a workflow run that has been resolved and is pending start.
    #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
    pub struct StartPending;

    /// State representing a workflow run that is currently executing.
    #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
    pub struct Running;

    /// State representing a workflow run that has successfully completed.
    #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
    pub struct Succeeded;

    /// State representing a workflow run that has failed.
    #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
    pub struct Failed;

    /// State representing a workflow run that has been aborted.
    #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
    pub struct Aborted;
}
24
25/// A state machine for managing the lifecycle of a `WorkflowRun`.
26#[allow(dead_code)]
27pub struct WorkflowMachine<S> {
28    /// The underlying workflow run data model.
29    pub run: WorkflowRun,
30    _state: PhantomData<S>,
31}
32
33#[allow(dead_code)]
34impl<S> WorkflowMachine<S> {
35    /// New from run.
36    pub fn new_from_run(run: WorkflowRun) -> Self {
37        Self {
38            run,
39            _state: PhantomData,
40        }
41    }
42
43    /// Into run.
44    pub fn into_run(self) -> WorkflowRun {
45        self.run
46    }
47}
48
49#[allow(dead_code)]
50impl WorkflowMachine<state::Queued> {
51    /// New.
52    pub fn new(run: WorkflowRun) -> Self {
53        let mut run = run;
54        run.status = RunStatus::Queued;
55
56        Self {
57            run,
58            _state: PhantomData,
59        }
60    }
61
62    #[tracing::instrument(skip(self, executor), fields(run_id = %self.run.id))]
63    /// Start resolving.
64    pub async fn start_resolving(
65        mut self,
66        executor: &mut sqlx::PgConnection,
67    ) -> Result<WorkflowMachine<state::Resolving>> {
68        self.run.status = RunStatus::Resolving;
69        self.run.started_resolving_at = Some(Utc::now());
70        self.run.updated_at = Utc::now();
71
72        crate::persistence::persist_run(&mut self.run, executor).await?;
73
74        Ok(WorkflowMachine {
75            run: self.run,
76            _state: PhantomData,
77        })
78    }
79
80    #[tracing::instrument(skip(self, executor), fields(run_id = %self.run.id))]
81    /// Abort.
82    pub async fn abort(
83        mut self,
84        executor: &mut sqlx::PgConnection,
85    ) -> Result<WorkflowMachine<state::Aborted>> {
86        self.run.status = RunStatus::Aborted;
87        self.run.finished_at = Some(Utc::now());
88        self.run.updated_at = Utc::now();
89
90        crate::persistence::persist_run(&mut self.run, executor).await?;
91
92        Ok(WorkflowMachine {
93            run: self.run,
94            _state: PhantomData,
95        })
96    }
97}
98
99#[allow(dead_code)]
100impl WorkflowMachine<state::Resolving> {
101    #[tracing::instrument(skip(self, executor), fields(run_id = %self.run.id))]
102    /// Start pending.
103    pub async fn start_pending(
104        mut self,
105        executor: &mut sqlx::PgConnection,
106    ) -> Result<WorkflowMachine<state::StartPending>> {
107        self.run.status = RunStatus::StartPending;
108        self.run.updated_at = Utc::now();
109
110        crate::persistence::persist_run(&mut self.run, executor).await?;
111
112        Ok(WorkflowMachine {
113            run: self.run,
114            _state: PhantomData,
115        })
116    }
117
118    #[tracing::instrument(skip(self, executor), fields(run_id = %self.run.id))]
119    /// Fail.
120    pub async fn fail(
121        mut self,
122        error: String,
123        executor: &mut sqlx::PgConnection,
124    ) -> Result<WorkflowMachine<state::Failed>> {
125        self.run.status = RunStatus::Failed;
126        self.run.finished_at = Some(Utc::now());
127        self.run.updated_at = Utc::now();
128        self.run.error = Some(error);
129
130        crate::persistence::persist_run(&mut self.run, executor).await?;
131
132        Ok(WorkflowMachine {
133            run: self.run,
134            _state: PhantomData,
135        })
136    }
137
138    #[tracing::instrument(skip(self, executor), fields(run_id = %self.run.id))]
139    /// Abort.
140    pub async fn abort(
141        mut self,
142        executor: &mut sqlx::PgConnection,
143    ) -> Result<WorkflowMachine<state::Aborted>> {
144        self.run.status = RunStatus::Aborted;
145        self.run.finished_at = Some(Utc::now());
146        self.run.updated_at = Utc::now();
147
148        crate::persistence::persist_run(&mut self.run, executor).await?;
149
150        Ok(WorkflowMachine {
151            run: self.run,
152            _state: PhantomData,
153        })
154    }
155}
156
157#[allow(dead_code)]
158impl WorkflowMachine<state::StartPending> {
159    #[tracing::instrument(skip(self, executor), fields(run_id = %self.run.id))]
160    /// Start.
161    pub async fn start(
162        mut self,
163        executor: &mut sqlx::PgConnection,
164    ) -> Result<WorkflowMachine<state::Running>> {
165        self.run.status = RunStatus::Running;
166        self.run.started_at = Some(Utc::now());
167        self.run.updated_at = Utc::now();
168
169        crate::persistence::persist_run(&mut self.run, executor).await?;
170
171        Ok(WorkflowMachine {
172            run: self.run,
173            _state: PhantomData,
174        })
175    }
176
177    #[tracing::instrument(skip(self, executor), fields(run_id = %self.run.id))]
178    /// Fail.
179    pub async fn fail(
180        mut self,
181        error: String,
182        executor: &mut sqlx::PgConnection,
183    ) -> Result<WorkflowMachine<state::Failed>> {
184        self.run.status = RunStatus::Failed;
185        self.run.finished_at = Some(Utc::now());
186        self.run.updated_at = Utc::now();
187        self.run.error = Some(error);
188
189        crate::persistence::persist_run(&mut self.run, executor).await?;
190
191        Ok(WorkflowMachine {
192            run: self.run,
193            _state: PhantomData,
194        })
195    }
196
197    #[tracing::instrument(skip(self, executor), fields(run_id = %self.run.id))]
198    /// Abort.
199    pub async fn abort(
200        mut self,
201        executor: &mut sqlx::PgConnection,
202    ) -> Result<WorkflowMachine<state::Aborted>> {
203        self.run.status = RunStatus::Aborted;
204        self.run.finished_at = Some(Utc::now());
205        self.run.updated_at = Utc::now();
206
207        crate::persistence::persist_run(&mut self.run, executor).await?;
208
209        Ok(WorkflowMachine {
210            run: self.run,
211            _state: PhantomData,
212        })
213    }
214}
215
216#[allow(dead_code)]
217impl WorkflowMachine<state::Running> {
218    #[tracing::instrument(skip(self, executor), fields(run_id = %self.run.id))]
219    /// Succeed.
220    pub async fn succeed(
221        mut self,
222        executor: &mut sqlx::PgConnection,
223    ) -> Result<WorkflowMachine<state::Succeeded>> {
224        self.run.status = RunStatus::Succeeded;
225        self.run.finished_at = Some(Utc::now());
226        self.run.updated_at = Utc::now();
227
228        crate::persistence::persist_run(&mut self.run, executor).await?;
229
230        Ok(WorkflowMachine {
231            run: self.run,
232            _state: PhantomData,
233        })
234    }
235
236    #[tracing::instrument(skip(self, executor), fields(run_id = %self.run.id))]
237    /// Fail.
238    pub async fn fail(
239        mut self,
240        error: String,
241        executor: &mut sqlx::PgConnection,
242    ) -> Result<WorkflowMachine<state::Failed>> {
243        self.run.status = RunStatus::Failed;
244        self.run.finished_at = Some(Utc::now());
245        self.run.updated_at = Utc::now();
246        self.run.error = Some(error);
247
248        crate::persistence::persist_run(&mut self.run, executor).await?;
249
250        Ok(WorkflowMachine {
251            run: self.run,
252            _state: PhantomData,
253        })
254    }
255
256    #[tracing::instrument(skip(self, executor), fields(run_id = %self.run.id))]
257    /// Abort.
258    pub async fn abort(
259        mut self,
260        executor: &mut sqlx::PgConnection,
261    ) -> Result<WorkflowMachine<state::Aborted>> {
262        self.run.status = RunStatus::Aborted;
263        self.run.finished_at = Some(Utc::now());
264        self.run.updated_at = Utc::now();
265
266        crate::persistence::persist_run(&mut self.run, executor).await?;
267
268        Ok(WorkflowMachine {
269            run: self.run,
270            _state: PhantomData,
271        })
272    }
273}
274
#[cfg(test)]
mod tests {
    use super::*;
    use stormchaser_model::RunId;

    /// Builds a fully populated run fixture.
    ///
    /// The status is deliberately `Succeeded` rather than `Queued`, so a test
    /// can tell whether a constructor rewrote it or left it alone.
    fn dummy_run() -> WorkflowRun {
        WorkflowRun {
            id: RunId::new_v4(),
            workflow_name: String::from("test"),
            initiating_user: String::from("u"),
            status: RunStatus::Succeeded,
            created_at: Utc::now(),
            started_resolving_at: None,
            started_at: None,
            finished_at: None,
            updated_at: Utc::now(),
            repo_url: String::from("https://example.com/repo"),
            workflow_path: String::from("workflow.yaml"),
            version: 1,
            fencing_token: 1,
            git_ref: String::from("main"),
            error: None,
        }
    }

    /// `new` forces the status to `Queued` and keeps the run's identity.
    #[test]
    fn test_workflow_machine_new() {
        let original = dummy_run();
        let queued = WorkflowMachine::<state::Queued>::new(original.clone());
        assert_eq!(queued.run.status, RunStatus::Queued);
        assert_eq!(queued.run.id, original.id);
    }

    /// `new_from_run` leaves the status exactly as it found it.
    #[test]
    fn test_workflow_machine_new_from_run() {
        let original = dummy_run();
        let wrapped = WorkflowMachine::<state::Succeeded>::new_from_run(original.clone());
        assert_eq!(wrapped.run.status, RunStatus::Succeeded);
    }

    /// `into_run` returns the wrapped run unchanged.
    #[test]
    fn test_workflow_machine_into_run() {
        let original = dummy_run();
        let unwrapped =
            WorkflowMachine::<state::Succeeded>::new_from_run(original.clone()).into_run();
        assert_eq!(unwrapped.id, original.id);
        assert_eq!(unwrapped.status, RunStatus::Succeeded);
    }
}
319        let extracted = machine.into_run();
320        assert_eq!(extracted.id, run.id);
321        assert_eq!(extracted.status, RunStatus::Succeeded);
322    }
323}