stormchaser_engine/
persistence.rs1use anyhow::Result;
2use stormchaser_model::step::{StepInstance, StepStatus};
3use stormchaser_model::workflow::WorkflowRun;
4
5pub async fn persist_run(run: &mut WorkflowRun, executor: &mut sqlx::PgConnection) -> Result<()> {
7 let result = crate::db::update_workflow_run_status_full(
8 executor,
9 &run.status,
10 run.updated_at,
11 run.started_resolving_at,
12 run.started_at,
13 run.finished_at,
14 run.error.as_deref(),
15 run.id,
16 run.version,
17 )
18 .await;
19
20 let res = match result {
21 Ok(r) => r,
22 Err(e) => {
23 return Err(anyhow::anyhow!(
24 "Failed to persist workflow run state for {}: {:?}",
25 run.id,
26 e
27 ))
28 }
29 };
30
31 if res.rows_affected() == 0 {
32 anyhow::bail!(
33 "Optimistic concurrency control failure for run {}: version mismatch (expected {})",
34 run.id,
35 run.version
36 );
37 }
38
39 run.version += 1;
40
41 Ok(())
42}
43
44pub async fn persist_step_instance(
46 instance: &StepInstance,
47 executor: &mut sqlx::PgConnection,
48) -> Result<()> {
49 match instance.status {
50 StepStatus::Running | StepStatus::UnpackingSfs => {
51 crate::db::update_step_instance_running(
52 &mut *executor,
53 &instance.status,
54 instance.runner_id.as_deref(),
55 instance.id,
56 )
57 .await?;
58 }
59 StepStatus::Succeeded | StepStatus::Failed => {
60 crate::db::complete_step_instance(
61 &mut *executor,
62 &instance.status,
63 instance.exit_code,
64 instance.runner_id.as_deref(),
65 instance.id,
66 )
67 .await?;
68 }
69 StepStatus::Skipped | StepStatus::Aborted => {
70 crate::db::update_step_instance_terminal(&mut *executor, &instance.status, instance.id)
71 .await?;
72 }
73 _ => {
74 crate::db::update_step_instance_status(&mut *executor, &instance.status, instance.id)
75 .await?;
76 }
77 }
78
79 crate::db::record_step_status_history(executor, instance.id, &instance.status).await?;
81
82 Ok(())
83}