Skip to main content

stormchaser_engine/handler/workflow/
timeout.rs

1use crate::handler::{archive_workflow, fetch_run};
2use crate::workflow_machine::{state, WorkflowMachine};
3use anyhow::Result;
4use chrono::Utc;
5use sqlx::PgPool;
6use std::sync::Arc;
7use stormchaser_model::events::WorkflowAbortedEvent;
8use stormchaser_model::step::{StepInstance, StepStatus};
9use stormchaser_model::workflow::RunStatus;
10use stormchaser_model::RunId;
11use stormchaser_tls::TlsReloader;
12use tracing::info;
13
14#[tracing::instrument(skip(pool, nats_client, _tls_reloader), fields(run_id = %run_id))]
15/// Handle workflow timeout.
16pub async fn handle_workflow_timeout(
17    run_id: RunId,
18    pool: PgPool,
19    nats_client: async_nats::Client,
20    _tls_reloader: Arc<TlsReloader>,
21) -> Result<()> {
22    info!("Workflow {} timed out, aborting", run_id);
23
24    let run = fetch_run(run_id, &pool).await?;
25    if matches!(
26        run.status,
27        RunStatus::Succeeded | RunStatus::Failed | RunStatus::Aborted
28    ) {
29        return Ok(());
30    }
31
32    // 1. Mark Workflow as Aborted
33    let mut machine_run = run.clone();
34    machine_run.error = Some("Workflow timed out".to_string());
35
36    match run.status {
37        RunStatus::Queued => {
38            WorkflowMachine::<state::Queued>::new_from_run(machine_run)
39                .abort(&mut *pool.acquire().await?)
40                .await?;
41        }
42        RunStatus::Resolving => {
43            WorkflowMachine::<state::Resolving>::new_from_run(machine_run)
44                .abort(&mut *pool.acquire().await?)
45                .await?;
46        }
47        RunStatus::StartPending => {
48            WorkflowMachine::<state::StartPending>::new_from_run(machine_run)
49                .abort(&mut *pool.acquire().await?)
50                .await?;
51        }
52        RunStatus::Running => {
53            WorkflowMachine::<state::Running>::new_from_run(machine_run)
54                .abort(&mut *pool.acquire().await?)
55                .await?;
56        }
57        _ => {} // Should not happen due to check above
58    };
59
60    // 2. Mark all non-terminal steps as failed
61    let steps: Vec<StepInstance> = crate::db::get_step_instances_by_run_id(&pool, run_id).await?;
62
63    let mut tx = pool.begin().await?;
64    for step in steps {
65        match step.status {
66            StepStatus::Pending => {
67                crate::step_machine::StepMachine::<crate::step_machine::state::Pending>::from_instance(step)
68                    .fail("Workflow timed out".to_string(), None, &mut *tx)
69                    .await?;
70            }
71            StepStatus::UnpackingSfs => {
72                crate::step_machine::StepMachine::<crate::step_machine::state::UnpackingSfs>::from_instance(step)
73                    .fail("Workflow timed out".to_string(), None, &mut *tx)
74                    .await?;
75            }
76            StepStatus::Running => {
77                crate::step_machine::StepMachine::<crate::step_machine::state::Running>::from_instance(step)
78                    .fail("Workflow timed out".to_string(), None, &mut *tx)
79                    .await?;
80            }
81            StepStatus::PackingSfs => {
82                crate::step_machine::StepMachine::<crate::step_machine::state::PackingSfs>::from_instance(step)
83                    .fail("Workflow timed out".to_string(), None, &mut *tx)
84                    .await?;
85            }
86            StepStatus::WaitingForEvent => {
87                crate::step_machine::StepMachine::<crate::step_machine::state::WaitingForEvent>::from_instance(step)
88                    .fail("Workflow timed out".to_string(), None, &mut *tx)
89                    .await?;
90            }
91            _ => {}
92        }
93    }
94    tx.commit().await?;
95
96    // 3. Publish abort event
97    let event = WorkflowAbortedEvent {
98        run_id,
99        event_type: "workflow_aborted".to_string(),
100        timestamp: Utc::now(),
101    };
102    let js = async_nats::jetstream::new(nats_client);
103    stormchaser_model::nats::publish_cloudevent(
104        &js,
105        "stormchaser.v1.run.aborted",
106        "stormchaser.v1.run.aborted",
107        "/stormchaser",
108        serde_json::to_value(event).unwrap(),
109        Some("1.0"),
110        None,
111    )
112    .await?;
113
114    // 4. Archive
115    archive_workflow(run_id, pool).await?;
116
117    Ok(())
118}