1use anyhow::Result;
2use chrono::Utc;
3use stormchaser_model::step::{StepInstance, StepStatus};
4
5#[allow(dead_code)]
7pub mod state {
8 pub struct Pending;
10 pub struct UnpackingSfs;
12 pub struct Running;
14 pub struct PackingSfs;
16 pub struct WaitingForEvent;
18 pub struct Succeeded;
20 pub struct Failed;
22 pub struct Skipped;
24 pub struct FailedIgnored;
26 pub struct Aborted;
28}
29
30pub struct StepMachine<S> {
32 pub instance: StepInstance,
34 _state: std::marker::PhantomData<S>,
35}
36
37impl<S> StepMachine<S> {
38 pub fn from_instance(instance: StepInstance) -> Self {
40 Self {
41 instance,
42 _state: std::marker::PhantomData,
43 }
44 }
45}
46
47#[allow(dead_code)]
48impl StepMachine<state::Pending> {
49 pub fn new(instance: StepInstance) -> Self {
51 let mut instance = instance;
53 instance.status = StepStatus::Pending;
54
55 StepMachine {
56 instance,
57 _state: std::marker::PhantomData,
58 }
59 }
60
61 #[tracing::instrument(skip(self, executor), fields(run_id = %self.instance.run_id, step_id = %self.instance.id))]
62 pub async fn start(
64 mut self,
65 runner_id: String,
66 executor: &mut sqlx::PgConnection,
67 ) -> Result<StepMachine<state::Running>> {
68 self.instance.status = StepStatus::Running;
69 self.instance.started_at = Some(Utc::now());
70 self.instance.runner_id = Some(runner_id);
71
72 crate::persistence::persist_step_instance(&self.instance, executor).await?;
73
74 Ok(StepMachine {
75 instance: self.instance,
76 _state: std::marker::PhantomData,
77 })
78 }
79
80 #[tracing::instrument(skip(self, executor), fields(run_id = %self.instance.run_id, step_id = %self.instance.id))]
81 pub async fn start_unpacking(
83 mut self,
84 runner_id: String,
85 executor: &mut sqlx::PgConnection,
86 ) -> Result<StepMachine<state::UnpackingSfs>> {
87 self.instance.status = StepStatus::UnpackingSfs;
88 self.instance.started_at = Some(Utc::now());
89 self.instance.runner_id = Some(runner_id);
90
91 crate::persistence::persist_step_instance(&self.instance, executor).await?;
92
93 Ok(StepMachine {
94 instance: self.instance,
95 _state: std::marker::PhantomData,
96 })
97 }
98
99 #[tracing::instrument(skip(self, executor), fields(run_id = %self.instance.run_id, step_id = %self.instance.id))]
100 pub async fn fail(
102 mut self,
103 error: String,
104 exit_code: Option<i32>,
105 executor: &mut sqlx::PgConnection,
106 ) -> Result<StepMachine<state::Failed>> {
107 self.instance.status = StepStatus::Failed;
108 self.instance.finished_at = Some(Utc::now());
109 self.instance.error = Some(error);
110 self.instance.exit_code = exit_code;
111
112 crate::persistence::persist_step_instance(&self.instance, executor).await?;
113
114 Ok(StepMachine {
115 instance: self.instance,
116 _state: std::marker::PhantomData,
117 })
118 }
119}
120
121#[allow(dead_code)]
122impl StepMachine<state::UnpackingSfs> {
123 #[tracing::instrument(skip(self, executor), fields(run_id = %self.instance.run_id, step_id = %self.instance.id))]
124 pub async fn start_running(
126 mut self,
127 executor: &mut sqlx::PgConnection,
128 ) -> Result<StepMachine<state::Running>> {
129 self.instance.status = StepStatus::Running;
130
131 crate::persistence::persist_step_instance(&self.instance, executor).await?;
132
133 Ok(StepMachine {
134 instance: self.instance,
135 _state: std::marker::PhantomData,
136 })
137 }
138
139 #[tracing::instrument(skip(self, executor), fields(run_id = %self.instance.run_id, step_id = %self.instance.id))]
140 pub async fn fail(
142 mut self,
143 error: String,
144 exit_code: Option<i32>,
145 executor: &mut sqlx::PgConnection,
146 ) -> Result<StepMachine<state::Failed>> {
147 self.instance.status = StepStatus::Failed;
148 self.instance.finished_at = Some(Utc::now());
149 self.instance.error = Some(error);
150 self.instance.exit_code = exit_code;
151
152 crate::persistence::persist_step_instance(&self.instance, executor).await?;
153
154 Ok(StepMachine {
155 instance: self.instance,
156 _state: std::marker::PhantomData,
157 })
158 }
159
160 #[tracing::instrument(skip(self, executor), fields(run_id = %self.instance.run_id, step_id = %self.instance.id))]
161 pub async fn skip(
163 mut self,
164 executor: &mut sqlx::PgConnection,
165 ) -> Result<StepMachine<state::Skipped>> {
166 self.instance.status = StepStatus::Skipped;
167 self.instance.finished_at = Some(Utc::now());
168
169 crate::persistence::persist_step_instance(&self.instance, executor).await?;
170
171 Ok(StepMachine {
172 instance: self.instance,
173 _state: std::marker::PhantomData,
174 })
175 }
176
177 pub fn into_instance(self) -> StepInstance {
179 self.instance
180 }
181}
182
183#[allow(dead_code)]
184impl StepMachine<state::Running> {
185 #[tracing::instrument(skip(self, executor), fields(run_id = %self.instance.run_id, step_id = %self.instance.id))]
186 pub async fn start_packing(
188 mut self,
189 executor: &mut sqlx::PgConnection,
190 ) -> Result<StepMachine<state::PackingSfs>> {
191 self.instance.status = StepStatus::PackingSfs;
192
193 crate::persistence::persist_step_instance(&self.instance, executor).await?;
194
195 Ok(StepMachine {
196 instance: self.instance,
197 _state: std::marker::PhantomData,
198 })
199 }
200
201 #[tracing::instrument(skip(self, executor), fields(run_id = %self.instance.run_id, step_id = %self.instance.id))]
202 pub async fn succeed(
204 mut self,
205 executor: &mut sqlx::PgConnection,
206 ) -> Result<StepMachine<state::Succeeded>> {
207 self.instance.status = StepStatus::Succeeded;
208 self.instance.finished_at = Some(Utc::now());
209 self.instance.exit_code = Some(0);
210
211 crate::persistence::persist_step_instance(&self.instance, executor).await?;
212
213 Ok(StepMachine {
214 instance: self.instance,
215 _state: std::marker::PhantomData,
216 })
217 }
218
219 #[tracing::instrument(skip(self, executor), fields(run_id = %self.instance.run_id, step_id = %self.instance.id))]
220 pub async fn fail(
222 mut self,
223 error: String,
224 exit_code: Option<i32>,
225 executor: &mut sqlx::PgConnection,
226 ) -> Result<StepMachine<state::Failed>> {
227 self.instance.status = StepStatus::Failed;
228 self.instance.finished_at = Some(Utc::now());
229 self.instance.error = Some(error);
230 self.instance.exit_code = exit_code;
231
232 crate::persistence::persist_step_instance(&self.instance, executor).await?;
233
234 Ok(StepMachine {
235 instance: self.instance,
236 _state: std::marker::PhantomData,
237 })
238 }
239
240 #[tracing::instrument(skip(self, executor), fields(run_id = %self.instance.run_id, step_id = %self.instance.id))]
241 pub async fn wait_for_event(
243 mut self,
244 executor: &mut sqlx::PgConnection,
245 ) -> Result<StepMachine<state::WaitingForEvent>> {
246 self.instance.status = StepStatus::WaitingForEvent;
247
248 crate::persistence::persist_step_instance(&self.instance, executor).await?;
249
250 Ok(StepMachine {
251 instance: self.instance,
252 _state: std::marker::PhantomData,
253 })
254 }
255
256 #[tracing::instrument(skip(self, executor), fields(run_id = %self.instance.run_id, step_id = %self.instance.id))]
257 pub async fn abort(
259 mut self,
260 executor: &mut sqlx::PgConnection,
261 ) -> Result<StepMachine<state::Aborted>> {
262 self.instance.status = StepStatus::Aborted;
263 self.instance.finished_at = Some(Utc::now());
264
265 crate::persistence::persist_step_instance(&self.instance, executor).await?;
266
267 Ok(StepMachine {
268 instance: self.instance,
269 _state: std::marker::PhantomData,
270 })
271 }
272
273 pub fn into_instance(self) -> StepInstance {
275 self.instance
276 }
277}
278
279#[allow(dead_code)]
280impl StepMachine<state::PackingSfs> {
281 #[tracing::instrument(skip(self, executor), fields(run_id = %self.instance.run_id, step_id = %self.instance.id))]
282 pub async fn succeed(
284 mut self,
285 executor: &mut sqlx::PgConnection,
286 ) -> Result<StepMachine<state::Succeeded>> {
287 self.instance.status = StepStatus::Succeeded;
288 self.instance.finished_at = Some(Utc::now());
289 self.instance.exit_code = Some(0);
290
291 crate::persistence::persist_step_instance(&self.instance, executor).await?;
292
293 Ok(StepMachine {
294 instance: self.instance,
295 _state: std::marker::PhantomData,
296 })
297 }
298
299 #[tracing::instrument(skip(self, executor), fields(run_id = %self.instance.run_id, step_id = %self.instance.id))]
300 pub async fn fail(
302 mut self,
303 error: String,
304 exit_code: Option<i32>,
305 executor: &mut sqlx::PgConnection,
306 ) -> Result<StepMachine<state::Failed>> {
307 self.instance.status = StepStatus::Failed;
308 self.instance.finished_at = Some(Utc::now());
309 self.instance.error = Some(error);
310 self.instance.exit_code = exit_code;
311
312 crate::persistence::persist_step_instance(&self.instance, executor).await?;
313
314 Ok(StepMachine {
315 instance: self.instance,
316 _state: std::marker::PhantomData,
317 })
318 }
319
320 pub fn into_instance(self) -> StepInstance {
322 self.instance
323 }
324}
325
326#[allow(dead_code)]
327impl StepMachine<state::WaitingForEvent> {
328 #[tracing::instrument(skip(self, executor), fields(run_id = %self.instance.run_id, step_id = %self.instance.id))]
329 pub async fn resume(
331 mut self,
332 executor: &mut sqlx::PgConnection,
333 ) -> Result<StepMachine<state::Running>> {
334 self.instance.status = StepStatus::Running;
335
336 crate::persistence::persist_step_instance(&self.instance, executor).await?;
337
338 Ok(StepMachine {
339 instance: self.instance,
340 _state: std::marker::PhantomData,
341 })
342 }
343
344 #[tracing::instrument(skip(self, executor), fields(run_id = %self.instance.run_id, step_id = %self.instance.id))]
345 pub async fn reschedule(
347 mut self,
348 executor: &mut sqlx::PgConnection,
349 ) -> Result<StepMachine<state::Pending>> {
350 self.instance.status = StepStatus::Pending;
351
352 crate::persistence::persist_step_instance(&self.instance, executor).await?;
353
354 Ok(StepMachine {
355 instance: self.instance,
356 _state: std::marker::PhantomData,
357 })
358 }
359
360 pub fn into_instance(self) -> StepInstance {
362 self.instance
363 }
364
365 #[tracing::instrument(skip(self, executor), fields(run_id = %self.instance.run_id, step_id = %self.instance.id))]
366 pub async fn fail(
368 mut self,
369 error: String,
370 exit_code: Option<i32>,
371 executor: &mut sqlx::PgConnection,
372 ) -> Result<StepMachine<state::Failed>> {
373 self.instance.status = StepStatus::Failed;
374 self.instance.finished_at = Some(Utc::now());
375 self.instance.error = Some(error);
376 self.instance.exit_code = exit_code;
377
378 crate::persistence::persist_step_instance(&self.instance, executor).await?;
379
380 Ok(StepMachine {
381 instance: self.instance,
382 _state: std::marker::PhantomData,
383 })
384 }
385}
386
387#[allow(dead_code)]
388impl StepMachine<state::Failed> {
389 #[tracing::instrument(skip(self, executor), fields(run_id = %self.instance.run_id, step_id = %self.instance.id))]
390 pub async fn ignore_failure(
392 mut self,
393 executor: &mut sqlx::PgConnection,
394 ) -> Result<StepMachine<state::FailedIgnored>> {
395 self.instance.status = StepStatus::FailedIgnored;
396
397 crate::persistence::persist_step_instance(&self.instance, executor).await?;
398
399 Ok(StepMachine {
400 instance: self.instance,
401 _state: std::marker::PhantomData,
402 })
403 }
404
405 pub fn into_instance(self) -> StepInstance {
407 self.instance
408 }
409}
410
411#[allow(dead_code)]
412impl StepMachine<state::Succeeded> {
413 pub fn into_instance(self) -> StepInstance {
415 self.instance
416 }
417}
418
419#[allow(dead_code)]
420impl StepMachine<state::Skipped> {
421 pub fn into_instance(self) -> StepInstance {
423 self.instance
424 }
425}
426
427#[allow(dead_code)]
428impl StepMachine<state::FailedIgnored> {
429 pub fn into_instance(self) -> StepInstance {
431 self.instance
432 }
433}
434
435#[allow(dead_code)]
436impl StepMachine<state::Aborted> {
437 pub fn into_instance(self) -> StepInstance {
439 self.instance
440 }
441}
442
443#[cfg(test)]
444mod tests {
445 use super::*;
446 use stormchaser_model::{RunId, StepInstanceId};
447 use uuid::Uuid;
448
449 fn dummy_instance() -> StepInstance {
450 StepInstance {
451 id: StepInstanceId::new(Uuid::new_v4()),
452 run_id: RunId::new(Uuid::new_v4()),
453 step_name: "test".to_string(),
454 step_type: "docker".to_string(),
455 status: StepStatus::Running, iteration_index: None,
457 runner_id: None,
458 affinity_context: None,
459 started_at: None,
460 finished_at: None,
461 exit_code: None,
462 error: None,
463 spec: serde_json::json!({}),
464 params: serde_json::json!({}),
465 created_at: Utc::now(),
466 }
467 }
468
469 #[test]
470 fn test_step_machine_new() {
471 let instance = dummy_instance();
472 let machine: StepMachine<state::Pending> = StepMachine::new(instance.clone());
473 assert_eq!(machine.instance.status, StepStatus::Pending);
474 assert_eq!(machine.instance.id, instance.id);
475 }
476
477 #[test]
478 fn test_step_machine_from_instance() {
479 let instance = dummy_instance();
480 let machine: StepMachine<state::Running> = StepMachine::from_instance(instance.clone());
481 assert_eq!(machine.instance.status, StepStatus::Running);
482 }
483
484 #[test]
485 fn test_step_machine_into_instance() {
486 let instance = dummy_instance();
487 let machine: StepMachine<state::Running> = StepMachine::from_instance(instance.clone());
488 let extracted = machine.into_instance();
489 assert_eq!(extracted.id, instance.id);
490 assert_eq!(extracted.status, StepStatus::Running);
491 }
492}