use crate::handler::{dispatch_pending_steps, fetch_run, fetch_run_context, schedule_step};
use crate::workflow_machine::{state, WorkflowMachine};
use anyhow::{Context, Result};
use sqlx::PgPool;
use std::collections::HashSet;
use std::sync::Arc;
use stormchaser_dsl::ast;
use stormchaser_dsl::ast::Workflow;
use stormchaser_model::events::WorkflowRunningEvent;
use stormchaser_model::RunId;
use stormchaser_tls::TlsReloader;
use tracing::{debug, error, info};
/// Handles a workflow run that is waiting to be started.
///
/// Inside a single database transaction this handler:
/// 1. tries to lock the run via `crate::db::lock_workflow_run`; if that yields
///    `None` the handler logs and returns `Ok(())` without doing anything else,
/// 2. loads the run and its context and parses the stored workflow definition,
/// 3. schedules every *initial* step (a step that no other step lists in its
///    `next`), and
/// 4. advances the run's state machine from `StartPending` via `start`.
///
/// Only after the transaction has committed does it publish the
/// `workflow_running` CloudEvent, record the `RUNS_STARTED` metric and ask
/// `dispatch_pending_steps` to dispatch the scheduled steps. Emitting those
/// side effects after the commit guarantees that observers never see a
/// "running" notification for a run whose state change was rolled back.
///
/// # Errors
///
/// Returns an error when the transaction cannot be opened or committed, when
/// locking/loading the run fails, when the workflow definition cannot be
/// deserialized, when the workflow has steps but none of them is an initial
/// step, or when scheduling a step or starting the state machine fails.
/// Failures to serialize/publish the running event or to dispatch pending
/// steps are logged and do **not** fail the handler.
#[tracing::instrument(skip(pool, nats_client, tls_reloader), fields(run_id = %run_id))]
pub async fn handle_workflow_start_pending(
    run_id: RunId,
    pool: PgPool,
    nats_client: async_nats::Client,
    tls_reloader: Arc<TlsReloader>,
) -> Result<()> {
    info!("Handling start_pending workflow run: {}", run_id);

    // Everything up to `commit` runs in one transaction; any early return
    // below drops `tx` without committing it.
    let mut tx = pool.begin().await?;

    if crate::db::lock_workflow_run(&mut *tx, run_id)
        .await?
        .is_none()
    {
        debug!(
            "Workflow run {} already archived or missing, skipping handler",
            run_id
        );
        return Ok(());
    }

    let run = fetch_run(run_id, &mut *tx).await?;
    let context = fetch_run_context(run_id, &mut *tx).await?;

    let workflow: Workflow = serde_json::from_value(context.workflow_definition)
        .context("Failed to parse workflow definition from DB")?;

    // Names of all steps that are the successor of some other step. Borrowed
    // from `workflow`, so no per-name allocation is needed for the lookup.
    let successor_names: HashSet<&str> = workflow
        .steps
        .iter()
        .flat_map(|s| s.next.iter().map(String::as_str))
        .collect();

    // Initial steps are the ones nobody points at.
    let initial_steps: Vec<&ast::Step> = workflow
        .steps
        .iter()
        .filter(|s| !successor_names.contains(s.name.as_str()))
        .collect();

    if initial_steps.is_empty() && !workflow.steps.is_empty() {
        error!("Workflow {} for run {} has steps but no initial steps found (cycle or misconfiguration)", workflow.name, run_id);
        return Err(anyhow::anyhow!("No initial steps found in workflow"));
    }

    let hcl_ctx =
        crate::hcl_eval::create_context(context.inputs.clone(), run_id, serde_json::json!({}));

    for step_dsl in initial_steps {
        // The explicit `&mut *tx` reborrow is intentional; silence clippy's
        // suggestion to rely on auto-deref here.
        #[allow(clippy::explicit_auto_deref)]
        schedule_step(
            run_id,
            step_dsl,
            &mut *tx,
            nats_client.clone(),
            &hcl_ctx,
            pool.clone(),
            &workflow,
        )
        .await?;
    }

    // `run` is cloned because its name/user fields are still needed for the
    // metric labels after the commit.
    let machine = WorkflowMachine::<state::StartPending>::new_from_run(run.clone());
    let _ = machine.start(&mut *tx).await?;

    // Persist the state change BEFORE announcing it: if the commit fails we
    // must not have told anyone that the run is running.
    tx.commit().await?;

    // Best-effort notification: neither a serialization nor a publish failure
    // should abort a run whose state is already committed.
    let js = async_nats::jetstream::new(nats_client.clone());
    match serde_json::to_value(WorkflowRunningEvent {
        run_id,
        event_type: "workflow_running".to_string(),
        timestamp: chrono::Utc::now(),
    }) {
        Ok(payload) => {
            if let Err(e) = stormchaser_model::nats::publish_cloudevent(
                &js,
                "stormchaser.v1.run.running",
                "workflow_running",
                "stormchaser-engine",
                payload,
                None,
                None,
            )
            .await
            {
                error!(
                    "Failed to publish workflow running event for {}: {:?}",
                    run_id, e
                );
            }
        }
        Err(e) => {
            error!(
                "Failed to serialize workflow running event for {}: {:?}",
                run_id, e
            );
        }
    }

    crate::RUNS_STARTED.add(
        1,
        &[
            opentelemetry::KeyValue::new("workflow_name", run.workflow_name),
            opentelemetry::KeyValue::new("initiating_user", run.initiating_user),
        ],
    );

    // Dispatch failures are logged only; the run itself has already been
    // transitioned and committed.
    if let Err(e) = dispatch_pending_steps(run_id, pool, nats_client, tls_reloader).await {
        error!(
            "Failed to dispatch pending steps for run {}: {:?}",
            run_id, e
        );
    }

    info!("Transitioned run {} to Running", run_id);
    Ok(())
}