use crate::handler::{archive_workflow, fetch_run};
use crate::workflow_machine::{state, WorkflowMachine};
use anyhow::Result;
use chrono::Utc;
use sqlx::PgPool;
use std::sync::Arc;
use stormchaser_model::events::WorkflowAbortedEvent;
use stormchaser_model::step::{StepInstance, StepStatus};
use stormchaser_model::workflow::RunStatus;
use stormchaser_model::RunId;
use stormchaser_tls::TlsReloader;
use tracing::info;
#[tracing::instrument(skip(pool, nats_client, _tls_reloader), fields(run_id = %run_id))]
pub async fn handle_workflow_timeout(
run_id: RunId,
pool: PgPool,
nats_client: async_nats::Client,
_tls_reloader: Arc<TlsReloader>,
) -> Result<()> {
info!("Workflow {} timed out, aborting", run_id);
let run = fetch_run(run_id, &pool).await?;
if matches!(
run.status,
RunStatus::Succeeded | RunStatus::Failed | RunStatus::Aborted
) {
return Ok(());
}
let mut machine_run = run.clone();
machine_run.error = Some("Workflow timed out".to_string());
match run.status {
RunStatus::Queued => {
WorkflowMachine::<state::Queued>::new_from_run(machine_run)
.abort(&mut *pool.acquire().await?)
.await?;
}
RunStatus::Resolving => {
WorkflowMachine::<state::Resolving>::new_from_run(machine_run)
.abort(&mut *pool.acquire().await?)
.await?;
}
RunStatus::StartPending => {
WorkflowMachine::<state::StartPending>::new_from_run(machine_run)
.abort(&mut *pool.acquire().await?)
.await?;
}
RunStatus::Running => {
WorkflowMachine::<state::Running>::new_from_run(machine_run)
.abort(&mut *pool.acquire().await?)
.await?;
}
_ => {} };
let steps: Vec<StepInstance> = crate::db::get_step_instances_by_run_id(&pool, run_id).await?;
let mut tx = pool.begin().await?;
for step in steps {
match step.status {
StepStatus::Pending => {
crate::step_machine::StepMachine::<crate::step_machine::state::Pending>::from_instance(step)
.fail("Workflow timed out".to_string(), None, &mut *tx)
.await?;
}
StepStatus::UnpackingSfs => {
crate::step_machine::StepMachine::<crate::step_machine::state::UnpackingSfs>::from_instance(step)
.fail("Workflow timed out".to_string(), None, &mut *tx)
.await?;
}
StepStatus::Running => {
crate::step_machine::StepMachine::<crate::step_machine::state::Running>::from_instance(step)
.fail("Workflow timed out".to_string(), None, &mut *tx)
.await?;
}
StepStatus::PackingSfs => {
crate::step_machine::StepMachine::<crate::step_machine::state::PackingSfs>::from_instance(step)
.fail("Workflow timed out".to_string(), None, &mut *tx)
.await?;
}
StepStatus::WaitingForEvent => {
crate::step_machine::StepMachine::<crate::step_machine::state::WaitingForEvent>::from_instance(step)
.fail("Workflow timed out".to_string(), None, &mut *tx)
.await?;
}
_ => {}
}
}
tx.commit().await?;
let event = WorkflowAbortedEvent {
run_id,
event_type: "workflow_aborted".to_string(),
timestamp: Utc::now(),
};
let js = async_nats::jetstream::new(nats_client);
stormchaser_model::nats::publish_cloudevent(
&js,
"stormchaser.v1.run.aborted",
"stormchaser.v1.run.aborted",
"/stormchaser",
serde_json::to_value(event).unwrap(),
Some("1.0"),
None,
)
.await?;
archive_workflow(run_id, pool).await?;
Ok(())
}