Skip to main content

stormchaser_engine/
workflow_machine.rs

1use anyhow::Result;
2use chrono::Utc;
3use std::marker::PhantomData;
4use stormchaser_model::workflow::{RunStatus, WorkflowRun};
5
/// Zero-sized marker types used as the `S` parameter of `WorkflowMachine<S>`.
///
/// Each marker names one lifecycle state of a workflow run. The markers carry
/// no data; they derive the common traits so that code which is generic over
/// the state (or which derives `Debug` on a type containing one) keeps working.
#[allow(dead_code)]
pub mod state {
    /// State representing a workflow run that is queued and waiting for resolution.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub struct Queued;

    /// State representing a workflow run that is resolving its source repository
    /// and dependencies.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub struct Resolving;

    /// State representing a workflow run that has been resolved and is pending start.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub struct StartPending;

    /// State representing a workflow run that is currently executing.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub struct Running;

    /// State representing a workflow run that has successfully completed.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub struct Succeeded;

    /// State representing a workflow run that has failed.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub struct Failed;

    /// State representing a workflow run that has been aborted.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub struct Aborted;
}
24
25/// A state machine for managing the lifecycle of a `WorkflowRun`.
26#[allow(dead_code)]
27pub struct WorkflowMachine<S> {
28    /// The underlying workflow run data model.
29    pub run: WorkflowRun,
30    _state: PhantomData<S>,
31}
32
33#[allow(dead_code)]
34impl<S> WorkflowMachine<S> {
35    /// New from run.
36    pub fn new_from_run(run: WorkflowRun) -> Self {
37        Self {
38            run,
39            _state: PhantomData,
40        }
41    }
42
43    /// Into run.
44    pub fn into_run(self) -> WorkflowRun {
45        self.run
46    }
47}
48
49#[allow(dead_code)]
50impl WorkflowMachine<state::Queued> {
51    /// New.
52    pub fn new(run: WorkflowRun) -> Self {
53        let mut run = run;
54        run.status = RunStatus::Queued;
55
56        Self {
57            run,
58            _state: PhantomData,
59        }
60    }
61
62    #[tracing::instrument(skip(self, executor), fields(run_id = %self.run.id))]
63    /// Start resolving.
64    pub async fn start_resolving(
65        mut self,
66        executor: &mut sqlx::PgConnection,
67    ) -> Result<WorkflowMachine<state::Resolving>> {
68        self.run.status = RunStatus::Resolving;
69        self.run.started_resolving_at = Some(Utc::now());
70        self.run.updated_at = Utc::now();
71
72        crate::persistence::persist_run(&mut self.run, executor).await?;
73
74        Ok(WorkflowMachine {
75            run: self.run,
76            _state: PhantomData,
77        })
78    }
79
80    #[tracing::instrument(skip(self, executor), fields(run_id = %self.run.id))]
81    /// Abort.
82    pub async fn abort(
83        mut self,
84        executor: &mut sqlx::PgConnection,
85    ) -> Result<WorkflowMachine<state::Aborted>> {
86        self.run.status = RunStatus::Aborted;
87        self.run.finished_at = Some(Utc::now());
88        self.run.updated_at = Utc::now();
89
90        crate::persistence::persist_run(&mut self.run, executor).await?;
91
92        Ok(WorkflowMachine {
93            run: self.run,
94            _state: PhantomData,
95        })
96    }
97}
98
99#[allow(dead_code)]
100impl WorkflowMachine<state::Resolving> {
101    #[tracing::instrument(skip(self, executor), fields(run_id = %self.run.id))]
102    /// Start pending.
103    pub async fn start_pending(
104        mut self,
105        executor: &mut sqlx::PgConnection,
106    ) -> Result<WorkflowMachine<state::StartPending>> {
107        self.run.status = RunStatus::StartPending;
108        self.run.updated_at = Utc::now();
109
110        crate::persistence::persist_run(&mut self.run, executor).await?;
111
112        Ok(WorkflowMachine {
113            run: self.run,
114            _state: PhantomData,
115        })
116    }
117
118    #[tracing::instrument(skip(self, executor), fields(run_id = %self.run.id))]
119    /// Fail.
120    pub async fn fail(
121        mut self,
122        error: String,
123        executor: &mut sqlx::PgConnection,
124    ) -> Result<WorkflowMachine<state::Failed>> {
125        self.run.status = RunStatus::Failed;
126        self.run.finished_at = Some(Utc::now());
127        self.run.updated_at = Utc::now();
128        self.run.error = Some(error);
129
130        crate::persistence::persist_run(&mut self.run, executor).await?;
131
132        Ok(WorkflowMachine {
133            run: self.run,
134            _state: PhantomData,
135        })
136    }
137
138    #[tracing::instrument(skip(self, executor), fields(run_id = %self.run.id))]
139    /// Abort.
140    pub async fn abort(
141        mut self,
142        executor: &mut sqlx::PgConnection,
143    ) -> Result<WorkflowMachine<state::Aborted>> {
144        self.run.status = RunStatus::Aborted;
145        self.run.finished_at = Some(Utc::now());
146        self.run.updated_at = Utc::now();
147
148        crate::persistence::persist_run(&mut self.run, executor).await?;
149
150        Ok(WorkflowMachine {
151            run: self.run,
152            _state: PhantomData,
153        })
154    }
155}
156
157#[allow(dead_code)]
158impl WorkflowMachine<state::StartPending> {
159    #[tracing::instrument(skip(self, executor), fields(run_id = %self.run.id))]
160    /// Start.
161    pub async fn start(
162        mut self,
163        executor: &mut sqlx::PgConnection,
164    ) -> Result<WorkflowMachine<state::Running>> {
165        self.run.status = RunStatus::Running;
166        self.run.started_at = Some(Utc::now());
167        self.run.updated_at = Utc::now();
168
169        crate::persistence::persist_run(&mut self.run, executor).await?;
170
171        Ok(WorkflowMachine {
172            run: self.run,
173            _state: PhantomData,
174        })
175    }
176
177    #[tracing::instrument(skip(self, executor), fields(run_id = %self.run.id))]
178    /// Fail.
179    pub async fn fail(
180        mut self,
181        error: String,
182        executor: &mut sqlx::PgConnection,
183    ) -> Result<WorkflowMachine<state::Failed>> {
184        self.run.status = RunStatus::Failed;
185        self.run.finished_at = Some(Utc::now());
186        self.run.updated_at = Utc::now();
187        self.run.error = Some(error);
188
189        crate::persistence::persist_run(&mut self.run, executor).await?;
190
191        Ok(WorkflowMachine {
192            run: self.run,
193            _state: PhantomData,
194        })
195    }
196
197    #[tracing::instrument(skip(self, executor), fields(run_id = %self.run.id))]
198    /// Abort.
199    pub async fn abort(
200        mut self,
201        executor: &mut sqlx::PgConnection,
202    ) -> Result<WorkflowMachine<state::Aborted>> {
203        self.run.status = RunStatus::Aborted;
204        self.run.finished_at = Some(Utc::now());
205        self.run.updated_at = Utc::now();
206
207        crate::persistence::persist_run(&mut self.run, executor).await?;
208
209        Ok(WorkflowMachine {
210            run: self.run,
211            _state: PhantomData,
212        })
213    }
214}
215
216#[allow(dead_code)]
217impl WorkflowMachine<state::Running> {
218    #[tracing::instrument(skip(self, executor), fields(run_id = %self.run.id))]
219    /// Succeed.
220    pub async fn succeed(
221        mut self,
222        executor: &mut sqlx::PgConnection,
223    ) -> Result<WorkflowMachine<state::Succeeded>> {
224        self.run.status = RunStatus::Succeeded;
225        self.run.finished_at = Some(Utc::now());
226        self.run.updated_at = Utc::now();
227
228        crate::persistence::persist_run(&mut self.run, executor).await?;
229
230        Ok(WorkflowMachine {
231            run: self.run,
232            _state: PhantomData,
233        })
234    }
235
236    #[tracing::instrument(skip(self, executor), fields(run_id = %self.run.id))]
237    /// Fail.
238    pub async fn fail(
239        mut self,
240        error: String,
241        executor: &mut sqlx::PgConnection,
242    ) -> Result<WorkflowMachine<state::Failed>> {
243        self.run.status = RunStatus::Failed;
244        self.run.finished_at = Some(Utc::now());
245        self.run.updated_at = Utc::now();
246        self.run.error = Some(error);
247
248        crate::persistence::persist_run(&mut self.run, executor).await?;
249
250        Ok(WorkflowMachine {
251            run: self.run,
252            _state: PhantomData,
253        })
254    }
255
256    #[tracing::instrument(skip(self, executor), fields(run_id = %self.run.id))]
257    /// Abort.
258    pub async fn abort(
259        mut self,
260        executor: &mut sqlx::PgConnection,
261    ) -> Result<WorkflowMachine<state::Aborted>> {
262        self.run.status = RunStatus::Aborted;
263        self.run.finished_at = Some(Utc::now());
264        self.run.updated_at = Utc::now();
265
266        crate::persistence::persist_run(&mut self.run, executor).await?;
267
268        Ok(WorkflowMachine {
269            run: self.run,
270            _state: PhantomData,
271        })
272    }
273}