stormchaser_engine/handler/workflow/
start_pending.rs1use crate::handler::{dispatch_pending_steps, fetch_run, fetch_run_context, schedule_step};
2use crate::workflow_machine::{state, WorkflowMachine};
3use anyhow::{Context, Result};
4use sqlx::PgPool;
5use std::collections::HashSet;
6use std::sync::Arc;
7use stormchaser_dsl::ast;
8use stormchaser_dsl::ast::Workflow;
9use stormchaser_model::events::WorkflowRunningEvent;
10use stormchaser_model::RunId;
11use stormchaser_tls::TlsReloader;
12use tracing::{debug, error, info};
13
14#[tracing::instrument(skip(pool, nats_client, tls_reloader), fields(run_id = %run_id))]
15pub async fn handle_workflow_start_pending(
17 run_id: RunId,
18 pool: PgPool,
19 nats_client: async_nats::Client,
20 tls_reloader: Arc<TlsReloader>,
21) -> Result<()> {
22 info!("Handling start_pending workflow run: {}", run_id);
23
24 let mut tx = pool.begin().await?;
25
26 if crate::db::lock_workflow_run(&mut *tx, run_id)
28 .await?
29 .is_none()
30 {
31 debug!(
32 "Workflow run {} already archived or missing, skipping handler",
33 run_id
34 );
35 return Ok(());
36 }
37
38 let run = fetch_run(run_id, &mut *tx).await?;
40 let context = fetch_run_context(run_id, &mut *tx).await?;
41
42 let workflow: Workflow = serde_json::from_value(context.workflow_definition)
44 .context("Failed to parse workflow definition from DB")?;
45
46 let all_next_steps: HashSet<String> = workflow
48 .steps
49 .iter()
50 .flat_map(|s| s.next.iter().cloned())
51 .collect();
52
53 let initial_steps: Vec<&ast::Step> = workflow
54 .steps
55 .iter()
56 .filter(|s| !all_next_steps.contains(&s.name))
57 .collect();
58
59 if initial_steps.is_empty() && !workflow.steps.is_empty() {
60 error!("Workflow {} for run {} has steps but no initial steps found (cycle or misconfiguration)", workflow.name, run_id);
61 return Err(anyhow::anyhow!("No initial steps found in workflow"));
62 }
63
64 let hcl_ctx =
66 crate::hcl_eval::create_context(context.inputs.clone(), run_id, serde_json::json!({}));
67
68 for step_dsl in initial_steps {
70 #[allow(clippy::explicit_auto_deref)]
71 schedule_step(
72 run_id,
73 step_dsl,
74 &mut *tx,
75 nats_client.clone(),
76 &hcl_ctx,
77 pool.clone(),
78 &workflow,
79 )
80 .await?;
81 }
82
83 let machine = WorkflowMachine::<state::StartPending>::new_from_run(run.clone());
85 let _ = machine.start(&mut *tx).await?;
86
87 let js = async_nats::jetstream::new(nats_client.clone());
88 if let Err(e) = stormchaser_model::nats::publish_cloudevent(
89 &js,
90 "stormchaser.v1.run.running",
91 "workflow_running",
92 "stormchaser-engine",
93 serde_json::to_value(WorkflowRunningEvent {
94 run_id,
95 event_type: "workflow_running".to_string(),
96 timestamp: chrono::Utc::now(),
97 })
98 .unwrap(),
99 None,
100 None,
101 )
102 .await
103 {
104 error!(
105 "Failed to publish workflow running event for {}: {:?}",
106 run_id, e
107 );
108 }
109
110 crate::RUNS_STARTED.add(
111 1,
112 &[
113 opentelemetry::KeyValue::new("workflow_name", run.workflow_name),
114 opentelemetry::KeyValue::new("initiating_user", run.initiating_user),
115 ],
116 );
117
118 tx.commit().await?;
119
120 if let Err(e) = dispatch_pending_steps(run_id, pool, nats_client, tls_reloader).await {
121 error!(
122 "Failed to dispatch pending steps for run {}: {:?}",
123 run_id, e
124 );
125 }
126
127 info!("Transitioned run {} to Running", run_id);
128
129 Ok(())
130}