Skip to main content

stormchaser_engine/
persistence.rs

1use anyhow::Result;
2use stormchaser_model::step::{StepInstance, StepStatus};
3use stormchaser_model::workflow::WorkflowRun;
4
5/// Persist run.
6pub async fn persist_run(run: &mut WorkflowRun, executor: &mut sqlx::PgConnection) -> Result<()> {
7    let result = crate::db::update_workflow_run_status_full(
8        executor,
9        &run.status,
10        run.updated_at,
11        run.started_resolving_at,
12        run.started_at,
13        run.finished_at,
14        run.error.as_deref(),
15        run.id,
16        run.version,
17    )
18    .await;
19
20    let res = match result {
21        Ok(r) => r,
22        Err(e) => {
23            return Err(anyhow::anyhow!(
24                "Failed to persist workflow run state for {}: {:?}",
25                run.id,
26                e
27            ))
28        }
29    };
30
31    if res.rows_affected() == 0 {
32        anyhow::bail!(
33            "Optimistic concurrency control failure for run {}: version mismatch (expected {})",
34            run.id,
35            run.version
36        );
37    }
38
39    run.version += 1;
40
41    Ok(())
42}
43
44/// Persists a `StepInstance` state change to the database, along with logs and outputs.
45pub async fn persist_step_instance(
46    instance: &StepInstance,
47    executor: &mut sqlx::PgConnection,
48) -> Result<()> {
49    match instance.status {
50        StepStatus::Running | StepStatus::UnpackingSfs => {
51            crate::db::update_step_instance_running(
52                &mut *executor,
53                &instance.status,
54                instance.runner_id.as_deref(),
55                instance.id,
56            )
57            .await?;
58        }
59        StepStatus::Succeeded | StepStatus::Failed => {
60            crate::db::complete_step_instance(
61                &mut *executor,
62                &instance.status,
63                instance.exit_code,
64                instance.runner_id.as_deref(),
65                instance.id,
66            )
67            .await?;
68        }
69        StepStatus::Skipped | StepStatus::Aborted => {
70            crate::db::update_step_instance_terminal(&mut *executor, &instance.status, instance.id)
71                .await?;
72        }
73        _ => {
74            crate::db::update_step_instance_status(&mut *executor, &instance.status, instance.id)
75                .await?;
76        }
77    }
78
79    // Record status history
80    crate::db::record_step_status_history(executor, instance.id, &instance.status).await?;
81
82    Ok(())
83}