stormchaser_engine/handler/workflow/
timeout.rs1use crate::handler::{archive_workflow, fetch_run};
2use crate::workflow_machine::{state, WorkflowMachine};
3use anyhow::Result;
4use chrono::Utc;
5use sqlx::PgPool;
6use std::sync::Arc;
7use stormchaser_model::events::WorkflowAbortedEvent;
8use stormchaser_model::events::{EventSource, EventType, SchemaVersion, WorkflowEventType};
9use stormchaser_model::step::{StepInstance, StepStatus};
10use stormchaser_model::workflow::RunStatus;
11use stormchaser_model::RunId;
12use stormchaser_tls::TlsReloader;
13use tracing::info;
14
15#[tracing::instrument(skip(pool, nats_client, _tls_reloader), fields(run_id = %run_id))]
16pub async fn handle_workflow_timeout(
18 run_id: RunId,
19 pool: PgPool,
20 nats_client: async_nats::Client,
21 _tls_reloader: Arc<TlsReloader>,
22) -> Result<()> {
23 info!("Workflow {} timed out, aborting", run_id);
24
25 let run = fetch_run(run_id, &pool).await?;
26 if matches!(
27 run.status,
28 RunStatus::Succeeded | RunStatus::Failed | RunStatus::Aborted
29 ) {
30 return Ok(());
31 }
32
33 let mut machine_run = run.clone();
35 machine_run.error = Some("Workflow timed out".to_string());
36
37 match run.status {
38 RunStatus::Queued => {
39 WorkflowMachine::<state::Queued>::new_from_run(machine_run)
40 .abort(&mut *pool.acquire().await?)
41 .await?;
42 }
43 RunStatus::Resolving => {
44 WorkflowMachine::<state::Resolving>::new_from_run(machine_run)
45 .abort(&mut *pool.acquire().await?)
46 .await?;
47 }
48 RunStatus::StartPending => {
49 WorkflowMachine::<state::StartPending>::new_from_run(machine_run)
50 .abort(&mut *pool.acquire().await?)
51 .await?;
52 }
53 RunStatus::Running => {
54 WorkflowMachine::<state::Running>::new_from_run(machine_run)
55 .abort(&mut *pool.acquire().await?)
56 .await?;
57 }
58 _ => {} };
60
61 let steps: Vec<StepInstance> = crate::db::get_step_instances_by_run_id(&pool, run_id).await?;
63
64 let mut tx = pool.begin().await?;
65 for step in steps {
66 match step.status {
67 StepStatus::Pending => {
68 crate::step_machine::StepMachine::<crate::step_machine::state::Pending>::from_instance(step)
69 .fail("Workflow timed out".to_string(), None, &mut *tx)
70 .await?;
71 }
72 StepStatus::UnpackingSfs => {
73 crate::step_machine::StepMachine::<crate::step_machine::state::UnpackingSfs>::from_instance(step)
74 .fail("Workflow timed out".to_string(), None, &mut *tx)
75 .await?;
76 }
77 StepStatus::Running => {
78 crate::step_machine::StepMachine::<crate::step_machine::state::Running>::from_instance(step)
79 .fail("Workflow timed out".to_string(), None, &mut *tx)
80 .await?;
81 }
82 StepStatus::PackingSfs => {
83 crate::step_machine::StepMachine::<crate::step_machine::state::PackingSfs>::from_instance(step)
84 .fail("Workflow timed out".to_string(), None, &mut *tx)
85 .await?;
86 }
87 StepStatus::WaitingForEvent => {
88 crate::step_machine::StepMachine::<crate::step_machine::state::WaitingForEvent>::from_instance(step)
89 .fail("Workflow timed out".to_string(), None, &mut *tx)
90 .await?;
91 }
92 _ => {}
93 }
94 }
95 tx.commit().await?;
96
97 let event = WorkflowAbortedEvent {
99 run_id,
100 event_type: EventType::Workflow(WorkflowEventType::Aborted),
101 timestamp: Utc::now(),
102 };
103 let js = async_nats::jetstream::new(nats_client);
104 use stormchaser_model::nats::NatsSubject;
105 stormchaser_model::nats::publish_cloudevent(
106 &js,
107 NatsSubject::RunAborted,
108 EventType::Workflow(WorkflowEventType::Aborted),
109 EventSource::System,
110 serde_json::to_value(event).unwrap(),
111 Some(SchemaVersion::new("1.0".to_string())),
112 None,
113 )
114 .await?;
115
116 archive_workflow(run_id, pool).await?;
118
119 Ok(())
120}