1use anyhow::Result;
2use chrono::Utc;
3use std::marker::PhantomData;
4use stormchaser_model::workflow::{RunStatus, WorkflowRun};
5
/// Zero-sized marker types, one per workflow lifecycle state.
///
/// The markers carry no data. They are meant to be used as the `S` type
/// parameter of `WorkflowMachine<S>`, so that the set of callable transition
/// methods depends on the state encoded in the type.
///
/// Every marker derives `Debug`, `Clone` and `Copy` so that generic code (and
/// any future `#[derive]` on a type parameterised by a marker) can rely on
/// those bounds.
// NOTE(review): `dead_code` is presumably allowed because not every state is
// referenced by callers yet — confirm before removing.
#[allow(dead_code)]
pub mod state {
    /// Initial state; can move to `Resolving` or `Aborted`.
    #[derive(Debug, Clone, Copy)]
    pub struct Queued;

    /// Entered from `Queued`; can move to `StartPending`, `Failed` or `Aborted`.
    #[derive(Debug, Clone, Copy)]
    pub struct Resolving;

    /// Entered from `Resolving`; can move to `Running`, `Failed` or `Aborted`.
    #[derive(Debug, Clone, Copy)]
    pub struct StartPending;

    /// Entered from `StartPending`; can move to `Succeeded`, `Failed` or `Aborted`.
    #[derive(Debug, Clone, Copy)]
    pub struct Running;

    /// Entered from `Running`; no transitions are defined out of this state.
    #[derive(Debug, Clone, Copy)]
    pub struct Succeeded;

    /// Entered via a `fail` transition; no transitions are defined out of this state.
    #[derive(Debug, Clone, Copy)]
    pub struct Failed;

    /// Entered via an `abort` transition; no transitions are defined out of this state.
    #[derive(Debug, Clone, Copy)]
    pub struct Aborted;
}
24
/// A [`WorkflowRun`] tagged at the type level with a lifecycle state `S`.
///
/// `S` is intended to be one of the marker types in [`state`]. It appears only
/// inside `PhantomData`, so no value of `S` is ever stored. Transition methods
/// are defined on specific `WorkflowMachine<state::…>` instantiations, which
/// consume the machine and return one typed with the next state.
#[allow(dead_code)]
pub struct WorkflowMachine<S> {
    /// The wrapped run record. The field is public, so callers can read or
    /// modify it directly without going through a transition.
    pub run: WorkflowRun,
    /// Zero-sized marker that ties this value to state `S`.
    _state: PhantomData<S>,
}
32
33#[allow(dead_code)]
34impl<S> WorkflowMachine<S> {
35 pub fn new_from_run(run: WorkflowRun) -> Self {
37 Self {
38 run,
39 _state: PhantomData,
40 }
41 }
42
43 pub fn into_run(self) -> WorkflowRun {
45 self.run
46 }
47}
48
49#[allow(dead_code)]
50impl WorkflowMachine<state::Queued> {
51 pub fn new(run: WorkflowRun) -> Self {
53 let mut run = run;
54 run.status = RunStatus::Queued;
55
56 Self {
57 run,
58 _state: PhantomData,
59 }
60 }
61
62 #[tracing::instrument(skip(self, executor), fields(run_id = %self.run.id))]
63 pub async fn start_resolving(
65 mut self,
66 executor: &mut sqlx::PgConnection,
67 ) -> Result<WorkflowMachine<state::Resolving>> {
68 self.run.status = RunStatus::Resolving;
69 self.run.started_resolving_at = Some(Utc::now());
70 self.run.updated_at = Utc::now();
71
72 crate::persistence::persist_run(&mut self.run, executor).await?;
73
74 Ok(WorkflowMachine {
75 run: self.run,
76 _state: PhantomData,
77 })
78 }
79
80 #[tracing::instrument(skip(self, executor), fields(run_id = %self.run.id))]
81 pub async fn abort(
83 mut self,
84 executor: &mut sqlx::PgConnection,
85 ) -> Result<WorkflowMachine<state::Aborted>> {
86 self.run.status = RunStatus::Aborted;
87 self.run.finished_at = Some(Utc::now());
88 self.run.updated_at = Utc::now();
89
90 crate::persistence::persist_run(&mut self.run, executor).await?;
91
92 Ok(WorkflowMachine {
93 run: self.run,
94 _state: PhantomData,
95 })
96 }
97}
98
99#[allow(dead_code)]
100impl WorkflowMachine<state::Resolving> {
101 #[tracing::instrument(skip(self, executor), fields(run_id = %self.run.id))]
102 pub async fn start_pending(
104 mut self,
105 executor: &mut sqlx::PgConnection,
106 ) -> Result<WorkflowMachine<state::StartPending>> {
107 self.run.status = RunStatus::StartPending;
108 self.run.updated_at = Utc::now();
109
110 crate::persistence::persist_run(&mut self.run, executor).await?;
111
112 Ok(WorkflowMachine {
113 run: self.run,
114 _state: PhantomData,
115 })
116 }
117
118 #[tracing::instrument(skip(self, executor), fields(run_id = %self.run.id))]
119 pub async fn fail(
121 mut self,
122 error: String,
123 executor: &mut sqlx::PgConnection,
124 ) -> Result<WorkflowMachine<state::Failed>> {
125 self.run.status = RunStatus::Failed;
126 self.run.finished_at = Some(Utc::now());
127 self.run.updated_at = Utc::now();
128 self.run.error = Some(error);
129
130 crate::persistence::persist_run(&mut self.run, executor).await?;
131
132 Ok(WorkflowMachine {
133 run: self.run,
134 _state: PhantomData,
135 })
136 }
137
138 #[tracing::instrument(skip(self, executor), fields(run_id = %self.run.id))]
139 pub async fn abort(
141 mut self,
142 executor: &mut sqlx::PgConnection,
143 ) -> Result<WorkflowMachine<state::Aborted>> {
144 self.run.status = RunStatus::Aborted;
145 self.run.finished_at = Some(Utc::now());
146 self.run.updated_at = Utc::now();
147
148 crate::persistence::persist_run(&mut self.run, executor).await?;
149
150 Ok(WorkflowMachine {
151 run: self.run,
152 _state: PhantomData,
153 })
154 }
155}
156
157#[allow(dead_code)]
158impl WorkflowMachine<state::StartPending> {
159 #[tracing::instrument(skip(self, executor), fields(run_id = %self.run.id))]
160 pub async fn start(
162 mut self,
163 executor: &mut sqlx::PgConnection,
164 ) -> Result<WorkflowMachine<state::Running>> {
165 self.run.status = RunStatus::Running;
166 self.run.started_at = Some(Utc::now());
167 self.run.updated_at = Utc::now();
168
169 crate::persistence::persist_run(&mut self.run, executor).await?;
170
171 Ok(WorkflowMachine {
172 run: self.run,
173 _state: PhantomData,
174 })
175 }
176
177 #[tracing::instrument(skip(self, executor), fields(run_id = %self.run.id))]
178 pub async fn fail(
180 mut self,
181 error: String,
182 executor: &mut sqlx::PgConnection,
183 ) -> Result<WorkflowMachine<state::Failed>> {
184 self.run.status = RunStatus::Failed;
185 self.run.finished_at = Some(Utc::now());
186 self.run.updated_at = Utc::now();
187 self.run.error = Some(error);
188
189 crate::persistence::persist_run(&mut self.run, executor).await?;
190
191 Ok(WorkflowMachine {
192 run: self.run,
193 _state: PhantomData,
194 })
195 }
196
197 #[tracing::instrument(skip(self, executor), fields(run_id = %self.run.id))]
198 pub async fn abort(
200 mut self,
201 executor: &mut sqlx::PgConnection,
202 ) -> Result<WorkflowMachine<state::Aborted>> {
203 self.run.status = RunStatus::Aborted;
204 self.run.finished_at = Some(Utc::now());
205 self.run.updated_at = Utc::now();
206
207 crate::persistence::persist_run(&mut self.run, executor).await?;
208
209 Ok(WorkflowMachine {
210 run: self.run,
211 _state: PhantomData,
212 })
213 }
214}
215
216#[allow(dead_code)]
217impl WorkflowMachine<state::Running> {
218 #[tracing::instrument(skip(self, executor), fields(run_id = %self.run.id))]
219 pub async fn succeed(
221 mut self,
222 executor: &mut sqlx::PgConnection,
223 ) -> Result<WorkflowMachine<state::Succeeded>> {
224 self.run.status = RunStatus::Succeeded;
225 self.run.finished_at = Some(Utc::now());
226 self.run.updated_at = Utc::now();
227
228 crate::persistence::persist_run(&mut self.run, executor).await?;
229
230 Ok(WorkflowMachine {
231 run: self.run,
232 _state: PhantomData,
233 })
234 }
235
236 #[tracing::instrument(skip(self, executor), fields(run_id = %self.run.id))]
237 pub async fn fail(
239 mut self,
240 error: String,
241 executor: &mut sqlx::PgConnection,
242 ) -> Result<WorkflowMachine<state::Failed>> {
243 self.run.status = RunStatus::Failed;
244 self.run.finished_at = Some(Utc::now());
245 self.run.updated_at = Utc::now();
246 self.run.error = Some(error);
247
248 crate::persistence::persist_run(&mut self.run, executor).await?;
249
250 Ok(WorkflowMachine {
251 run: self.run,
252 _state: PhantomData,
253 })
254 }
255
256 #[tracing::instrument(skip(self, executor), fields(run_id = %self.run.id))]
257 pub async fn abort(
259 mut self,
260 executor: &mut sqlx::PgConnection,
261 ) -> Result<WorkflowMachine<state::Aborted>> {
262 self.run.status = RunStatus::Aborted;
263 self.run.finished_at = Some(Utc::now());
264 self.run.updated_at = Utc::now();
265
266 crate::persistence::persist_run(&mut self.run, executor).await?;
267
268 Ok(WorkflowMachine {
269 run: self.run,
270 _state: PhantomData,
271 })
272 }
273}
274
#[cfg(test)]
mod tests {
    use super::*;
    use stormchaser_model::RunId;

    /// Builds a minimal run fixture with a fresh id.
    ///
    /// The fixture's status is `Succeeded`, which lets the `new` test observe
    /// the constructor overwriting it with `Queued`.
    fn dummy_run() -> WorkflowRun {
        WorkflowRun {
            id: RunId::new_v4(),
            workflow_name: "test".to_string(),
            initiating_user: "u".to_string(),
            status: RunStatus::Succeeded,
            created_at: Utc::now(),
            started_resolving_at: None,
            started_at: None,
            finished_at: None,
            updated_at: Utc::now(),
            repo_url: "https://example.com/repo".to_string(),
            workflow_path: "workflow.yaml".to_string(),
            version: 1,
            fencing_token: 1,
            git_ref: "main".to_string(),
            error: None,
        }
    }

    /// `new` forces the status to `Queued` and keeps the run's id.
    #[test]
    fn test_workflow_machine_new() {
        let fixture = dummy_run();
        let queued = WorkflowMachine::<state::Queued>::new(fixture.clone());
        assert_eq!(queued.run.status, RunStatus::Queued);
        assert_eq!(queued.run.id, fixture.id);
    }

    /// `new_from_run` leaves the run's status as it found it.
    #[test]
    fn test_workflow_machine_new_from_run() {
        let wrapped = WorkflowMachine::<state::Succeeded>::new_from_run(dummy_run());
        assert_eq!(wrapped.run.status, RunStatus::Succeeded);
    }

    /// `into_run` returns the same run that was wrapped.
    #[test]
    fn test_workflow_machine_into_run() {
        let fixture = dummy_run();
        let unwrapped =
            WorkflowMachine::<state::Succeeded>::new_from_run(fixture.clone()).into_run();
        assert_eq!(unwrapped.id, fixture.id);
        assert_eq!(unwrapped.status, RunStatus::Succeeded);
    }
}