stormchaser_engine/handler/workflow/
timeout.rs1use crate::handler::{archive_workflow, fetch_run};
2use crate::workflow_machine::{state, WorkflowMachine};
3use anyhow::Result;
4use chrono::Utc;
5use sqlx::PgPool;
6use std::sync::Arc;
7use stormchaser_model::events::WorkflowAbortedEvent;
8use stormchaser_model::step::{StepInstance, StepStatus};
9use stormchaser_model::workflow::RunStatus;
10use stormchaser_model::RunId;
11use stormchaser_tls::TlsReloader;
12use tracing::info;
13
14#[tracing::instrument(skip(pool, nats_client, _tls_reloader), fields(run_id = %run_id))]
15pub async fn handle_workflow_timeout(
17 run_id: RunId,
18 pool: PgPool,
19 nats_client: async_nats::Client,
20 _tls_reloader: Arc<TlsReloader>,
21) -> Result<()> {
22 info!("Workflow {} timed out, aborting", run_id);
23
24 let run = fetch_run(run_id, &pool).await?;
25 if matches!(
26 run.status,
27 RunStatus::Succeeded | RunStatus::Failed | RunStatus::Aborted
28 ) {
29 return Ok(());
30 }
31
32 let mut machine_run = run.clone();
34 machine_run.error = Some("Workflow timed out".to_string());
35
36 match run.status {
37 RunStatus::Queued => {
38 WorkflowMachine::<state::Queued>::new_from_run(machine_run)
39 .abort(&mut *pool.acquire().await?)
40 .await?;
41 }
42 RunStatus::Resolving => {
43 WorkflowMachine::<state::Resolving>::new_from_run(machine_run)
44 .abort(&mut *pool.acquire().await?)
45 .await?;
46 }
47 RunStatus::StartPending => {
48 WorkflowMachine::<state::StartPending>::new_from_run(machine_run)
49 .abort(&mut *pool.acquire().await?)
50 .await?;
51 }
52 RunStatus::Running => {
53 WorkflowMachine::<state::Running>::new_from_run(machine_run)
54 .abort(&mut *pool.acquire().await?)
55 .await?;
56 }
57 _ => {} };
59
60 let steps: Vec<StepInstance> = crate::db::get_step_instances_by_run_id(&pool, run_id).await?;
62
63 let mut tx = pool.begin().await?;
64 for step in steps {
65 match step.status {
66 StepStatus::Pending => {
67 crate::step_machine::StepMachine::<crate::step_machine::state::Pending>::from_instance(step)
68 .fail("Workflow timed out".to_string(), None, &mut *tx)
69 .await?;
70 }
71 StepStatus::UnpackingSfs => {
72 crate::step_machine::StepMachine::<crate::step_machine::state::UnpackingSfs>::from_instance(step)
73 .fail("Workflow timed out".to_string(), None, &mut *tx)
74 .await?;
75 }
76 StepStatus::Running => {
77 crate::step_machine::StepMachine::<crate::step_machine::state::Running>::from_instance(step)
78 .fail("Workflow timed out".to_string(), None, &mut *tx)
79 .await?;
80 }
81 StepStatus::PackingSfs => {
82 crate::step_machine::StepMachine::<crate::step_machine::state::PackingSfs>::from_instance(step)
83 .fail("Workflow timed out".to_string(), None, &mut *tx)
84 .await?;
85 }
86 StepStatus::WaitingForEvent => {
87 crate::step_machine::StepMachine::<crate::step_machine::state::WaitingForEvent>::from_instance(step)
88 .fail("Workflow timed out".to_string(), None, &mut *tx)
89 .await?;
90 }
91 _ => {}
92 }
93 }
94 tx.commit().await?;
95
96 let event = WorkflowAbortedEvent {
98 run_id,
99 event_type: "workflow_aborted".to_string(),
100 timestamp: Utc::now(),
101 };
102 let js = async_nats::jetstream::new(nats_client);
103 stormchaser_model::nats::publish_cloudevent(
104 &js,
105 "stormchaser.v1.run.aborted",
106 "stormchaser.v1.run.aborted",
107 "/stormchaser",
108 serde_json::to_value(event).unwrap(),
109 Some("1.0"),
110 None,
111 )
112 .await?;
113
114 archive_workflow(run_id, pool).await?;
116
117 Ok(())
118}