Skip to main content

stormchaser_engine/handler/workflow/
timeout.rs

1use crate::handler::{archive_workflow, fetch_run};
2use crate::workflow_machine::{state, WorkflowMachine};
3use anyhow::Result;
4use chrono::Utc;
5use sqlx::PgPool;
6use std::sync::Arc;
7use stormchaser_model::events::WorkflowAbortedEvent;
8use stormchaser_model::events::{EventSource, EventType, SchemaVersion, WorkflowEventType};
9use stormchaser_model::step::{StepInstance, StepStatus};
10use stormchaser_model::workflow::RunStatus;
11use stormchaser_model::RunId;
12use stormchaser_tls::TlsReloader;
13use tracing::info;
14
15#[tracing::instrument(skip(pool, nats_client, _tls_reloader), fields(run_id = %run_id))]
16/// Handle workflow timeout.
17pub async fn handle_workflow_timeout(
18    run_id: RunId,
19    pool: PgPool,
20    nats_client: async_nats::Client,
21    _tls_reloader: Arc<TlsReloader>,
22) -> Result<()> {
23    info!("Workflow {} timed out, aborting", run_id);
24
25    let run = fetch_run(run_id, &pool).await?;
26    if matches!(
27        run.status,
28        RunStatus::Succeeded | RunStatus::Failed | RunStatus::Aborted
29    ) {
30        return Ok(());
31    }
32
33    // 1. Mark Workflow as Aborted
34    let mut machine_run = run.clone();
35    machine_run.error = Some("Workflow timed out".to_string());
36
37    match run.status {
38        RunStatus::Queued => {
39            WorkflowMachine::<state::Queued>::new_from_run(machine_run)
40                .abort(&mut *pool.acquire().await?)
41                .await?;
42        }
43        RunStatus::Resolving => {
44            WorkflowMachine::<state::Resolving>::new_from_run(machine_run)
45                .abort(&mut *pool.acquire().await?)
46                .await?;
47        }
48        RunStatus::StartPending => {
49            WorkflowMachine::<state::StartPending>::new_from_run(machine_run)
50                .abort(&mut *pool.acquire().await?)
51                .await?;
52        }
53        RunStatus::Running => {
54            WorkflowMachine::<state::Running>::new_from_run(machine_run)
55                .abort(&mut *pool.acquire().await?)
56                .await?;
57        }
58        _ => {} // Should not happen due to check above
59    };
60
61    // 2. Mark all non-terminal steps as failed
62    let steps: Vec<StepInstance> = crate::db::get_step_instances_by_run_id(&pool, run_id).await?;
63
64    let mut tx = pool.begin().await?;
65    for step in steps {
66        match step.status {
67            StepStatus::Pending => {
68                crate::step_machine::StepMachine::<crate::step_machine::state::Pending>::from_instance(step)
69                    .fail("Workflow timed out".to_string(), None, &mut *tx)
70                    .await?;
71            }
72            StepStatus::UnpackingSfs => {
73                crate::step_machine::StepMachine::<crate::step_machine::state::UnpackingSfs>::from_instance(step)
74                    .fail("Workflow timed out".to_string(), None, &mut *tx)
75                    .await?;
76            }
77            StepStatus::Running => {
78                crate::step_machine::StepMachine::<crate::step_machine::state::Running>::from_instance(step)
79                    .fail("Workflow timed out".to_string(), None, &mut *tx)
80                    .await?;
81            }
82            StepStatus::PackingSfs => {
83                crate::step_machine::StepMachine::<crate::step_machine::state::PackingSfs>::from_instance(step)
84                    .fail("Workflow timed out".to_string(), None, &mut *tx)
85                    .await?;
86            }
87            StepStatus::WaitingForEvent => {
88                crate::step_machine::StepMachine::<crate::step_machine::state::WaitingForEvent>::from_instance(step)
89                    .fail("Workflow timed out".to_string(), None, &mut *tx)
90                    .await?;
91            }
92            _ => {}
93        }
94    }
95    tx.commit().await?;
96
97    // 3. Publish abort event
98    let event = WorkflowAbortedEvent {
99        run_id,
100        event_type: EventType::Workflow(WorkflowEventType::Aborted),
101        timestamp: Utc::now(),
102    };
103    let js = async_nats::jetstream::new(nats_client);
104    use stormchaser_model::nats::NatsSubject;
105    stormchaser_model::nats::publish_cloudevent(
106        &js,
107        NatsSubject::RunAborted,
108        EventType::Workflow(WorkflowEventType::Aborted),
109        EventSource::System,
110        serde_json::to_value(event).unwrap(),
111        Some(SchemaVersion::new("1.0".to_string())),
112        None,
113    )
114    .await?;
115
116    // 4. Archive
117    archive_workflow(run_id, pool).await?;
118
119    Ok(())
120}