1#![allow(clippy::explicit_auto_deref)]
2use super::{
3 archive_workflow, dispatch_pending_steps, fetch_inputs, fetch_run, fetch_run_context,
4 schedule_step,
5};
6use crate::git_cache::GitCache;
7use crate::workflow_machine::{state, WorkflowMachine};
8use anyhow::{Context, Result};
9use chrono::Utc;
10use serde_json::Value;
11use sqlx::PgPool;
12use std::collections::HashSet;
13use std::fs;
14use std::sync::Arc;
15use stormchaser_dsl::ast::Workflow;
16use stormchaser_dsl::StormchaserParser;
17use stormchaser_model::auth::{EngineOpaContext, OpaClient};
18use stormchaser_model::workflow::{RunStatus, WorkflowRun};
19use stormchaser_tls::TlsReloader;
20use tracing::{debug, error, info};
21use uuid::Uuid;
22
23use stormchaser_dsl::ast;
24
25#[tracing::instrument(skip(pool, nats_client, _tls_reloader), fields(run_id = %run_id))]
26pub async fn handle_workflow_timeout(
28 run_id: Uuid,
29 pool: PgPool,
30 nats_client: async_nats::Client,
31 _tls_reloader: Arc<TlsReloader>,
32) -> Result<()> {
33 info!("Workflow {} timed out, aborting", run_id);
34
35 let run = fetch_run(run_id, &pool).await?;
36 if matches!(
37 run.status,
38 RunStatus::Succeeded | RunStatus::Failed | RunStatus::Aborted
39 ) {
40 return Ok(());
41 }
42
43 let mut machine_run = run.clone();
45 machine_run.error = Some("Workflow timed out".to_string());
46
47 match run.status {
48 RunStatus::Queued => {
49 WorkflowMachine::<state::Queued>::new_from_run(machine_run)
50 .abort(&mut *pool.acquire().await?)
51 .await?;
52 }
53 RunStatus::Resolving => {
54 WorkflowMachine::<state::Resolving>::new_from_run(machine_run)
55 .abort(&mut *pool.acquire().await?)
56 .await?;
57 }
58 RunStatus::StartPending => {
59 WorkflowMachine::<state::StartPending>::new_from_run(machine_run)
60 .abort(&mut *pool.acquire().await?)
61 .await?;
62 }
63 RunStatus::Running => {
64 WorkflowMachine::<state::Running>::new_from_run(machine_run)
65 .abort(&mut *pool.acquire().await?)
66 .await?;
67 }
68 _ => {} };
70
71 crate::db::fail_pending_steps_for_run_on_timeout(&pool, run_id).await?;
73
74 let event = serde_json::json!({
76 "run_id": run_id,
77 "event_type": "workflow_aborted",
78 "reason": "timeout",
79 "timestamp": Utc::now(),
80 });
81 let js = async_nats::jetstream::new(nats_client);
82 js.publish("stormchaser.run.aborted", event.to_string().into())
83 .await?;
84
85 archive_workflow(run_id, pool).await?;
87
88 Ok(())
89}
90
91#[tracing::instrument(skip(pool, nats_client, tls_reloader), fields(run_id = %run_id))]
92pub async fn handle_workflow_start_pending(
94 run_id: Uuid,
95 pool: PgPool,
96 nats_client: async_nats::Client,
97 tls_reloader: Arc<TlsReloader>,
98) -> Result<()> {
99 info!("Handling start_pending workflow run: {}", run_id);
100
101 let mut tx = pool.begin().await?;
102
103 if crate::db::lock_workflow_run(&mut *tx, run_id)
105 .await?
106 .is_none()
107 {
108 debug!(
109 "Workflow run {} already archived or missing, skipping handler",
110 run_id
111 );
112 return Ok(());
113 }
114
115 let run = fetch_run(run_id, &mut *tx).await?;
117 let context = fetch_run_context(run_id, &mut *tx).await?;
118
119 let workflow: Workflow = serde_json::from_value(context.workflow_definition)
121 .context("Failed to parse workflow definition from DB")?;
122
123 let all_next_steps: HashSet<String> = workflow
125 .steps
126 .iter()
127 .flat_map(|s| s.next.iter().cloned())
128 .collect();
129
130 let initial_steps: Vec<&ast::Step> = workflow
131 .steps
132 .iter()
133 .filter(|s| !all_next_steps.contains(&s.name))
134 .collect();
135
136 if initial_steps.is_empty() && !workflow.steps.is_empty() {
137 error!("Workflow {} for run {} has steps but no initial steps found (cycle or misconfiguration)", workflow.name, run_id);
138 return Err(anyhow::anyhow!("No initial steps found in workflow"));
139 }
140
141 let hcl_ctx =
143 crate::hcl_eval::create_context(context.inputs.clone(), run_id, serde_json::json!({}));
144
145 for step_dsl in initial_steps {
147 #[allow(clippy::explicit_auto_deref)]
148 schedule_step(
149 run_id,
150 step_dsl,
151 &mut *tx,
152 nats_client.clone(),
153 &hcl_ctx,
154 pool.clone(),
155 &workflow,
156 )
157 .await?;
158 }
159
160 let machine = WorkflowMachine::<state::StartPending>::new_from_run(run.clone());
162 let _ = machine.start(&mut *tx).await?;
163
164 crate::RUNS_STARTED.add(
165 1,
166 &[
167 opentelemetry::KeyValue::new("workflow_name", run.workflow_name),
168 opentelemetry::KeyValue::new("initiating_user", run.initiating_user),
169 ],
170 );
171
172 tx.commit().await?;
173
174 if let Err(e) = dispatch_pending_steps(run_id, pool, nats_client, tls_reloader).await {
175 error!(
176 "Failed to dispatch pending steps for run {}: {:?}",
177 run_id, e
178 );
179 }
180
181 info!("Transitioned run {} to Running", run_id);
182
183 Ok(())
184}
185
186#[tracing::instrument(skip(payload, pool, opa_client, nats_client), fields(run_id = tracing::field::Empty))]
187pub async fn handle_workflow_direct(
189 payload: Value,
190 pool: PgPool,
191 opa_client: Arc<OpaClient>,
192 nats_client: async_nats::Client,
193) -> Result<()> {
194 let run_id_str = payload["run_id"].as_str().context("Missing run_id")?;
195 let run_id = Uuid::parse_str(run_id_str)?;
196 tracing::Span::current().record("run_id", tracing::field::display(run_id));
197 let workflow_content = payload["dsl"].as_str().context("Missing dsl content")?;
198 let initiating_user = payload["initiating_user"]
199 .as_str()
200 .unwrap_or("system")
201 .to_string();
202 let inputs = payload["inputs"].clone();
203
204 info!("Handling direct one-off workflow run: {}", run_id);
205
206 let parser = StormchaserParser::new();
208 let parsed_workflow = match parser.parse(workflow_content) {
209 Ok(w) => w,
210 Err(e) => {
211 error!("Direct workflow parsing failed for {}: {}", run_id, e);
212 return Err(e);
213 }
214 };
215
216 let opa_context = EngineOpaContext {
218 run_id,
219 initiating_user: initiating_user.clone(),
220 workflow_ast: serde_json::to_value(&parsed_workflow)?,
221 inputs: inputs.clone(),
222 };
223
224 match opa_client.check_context(opa_context).await {
225 Ok(true) => debug!("OPA allowed execution for direct run {}", run_id),
226 Ok(false) => {
227 let err_msg = "Execution denied by OPA policy".to_string();
228 info!("Direct run {}: {}", run_id, err_msg);
229 return Err(anyhow::anyhow!(err_msg));
230 }
231 Err(e) => {
232 let err_msg = format!("OPA check failed: {}", e);
233 error!("Direct run {}: {}", run_id, err_msg);
234 return Err(anyhow::anyhow!(err_msg));
235 }
236 }
237
238 let run = WorkflowRun {
241 id: run_id,
242 workflow_name: parsed_workflow.name.clone(),
243 initiating_user: initiating_user.clone(),
244 repo_url: "direct://".to_string(),
245 workflow_path: "inline.storm".to_string(),
246 git_ref: "HEAD".to_string(),
247 status: RunStatus::StartPending,
248 version: 1,
249 fencing_token: Utc::now().timestamp_nanos_opt().unwrap_or(0),
250 created_at: Utc::now(),
251 updated_at: Utc::now(),
252 started_resolving_at: Some(Utc::now()),
253 started_at: None,
254 finished_at: None,
255 error: None,
256 };
257
258 let mut tx = pool.begin().await?;
259
260 crate::db::insert_full_workflow_run(
261 &mut *tx,
262 &run,
263 &parsed_workflow.dsl_version,
264 serde_json::to_value(&parsed_workflow)?,
265 Some(workflow_content),
266 inputs,
267 10,
268 "1",
269 "4Gi",
270 "10Gi",
271 "1h",
272 )
273 .await?;
274
275 tx.commit().await?;
276
277 let event = serde_json::json!({
279 "run_id": run_id,
280 "event_type": "workflow_start_pending",
281 "timestamp": Utc::now(),
282 });
283 let js = async_nats::jetstream::new(nats_client);
284 js.publish("stormchaser.run.start_pending", event.to_string().into())
285 .await
286 .with_context(|| {
287 format!(
288 "Failed to publish start_pending event for direct run {}",
289 run_id
290 )
291 })?;
292
293 info!(
294 "Successfully initialized direct one-off workflow run {}",
295 run_id
296 );
297
298 Ok(())
299}
300
301#[tracing::instrument(skip(pool, git_cache, opa_client, nats_client, _tls_reloader), fields(run_id = %run_id))]
302pub async fn handle_workflow_queued(
304 run_id: Uuid,
305 pool: PgPool,
306 git_cache: Arc<GitCache>,
307 opa_client: Arc<OpaClient>,
308 nats_client: async_nats::Client,
309 _tls_reloader: Arc<TlsReloader>,
310) -> Result<()> {
311 info!("Handling queued workflow run: {}", run_id);
312
313 let run = fetch_run(run_id, &pool).await?;
315 let machine = WorkflowMachine::<state::Queued>::new(run);
316
317 let machine = machine.start_resolving(&mut *pool.acquire().await?).await?;
319
320 let repo_path_res = git_cache.ensure_files(
322 &machine.run.repo_url,
323 &machine.run.git_ref,
324 std::slice::from_ref(&machine.run.workflow_path),
325 );
326
327 let repo_path = match repo_path_res {
328 Ok(path) => path,
329 Err(e) => {
330 let _ = machine
331 .fail(
332 format!("Git resolution failed: {}", e),
333 &mut *pool.acquire().await?,
334 )
335 .await?;
336 return Err(e);
337 }
338 };
339
340 let storm_file_path = repo_path.join(&machine.run.workflow_path);
341 if !storm_file_path.exists() {
342 let err_msg = format!(
343 "Workflow file {} not found in repo",
344 machine.run.workflow_path
345 );
346 let _ = machine
347 .fail(err_msg.clone(), &mut *pool.acquire().await?)
348 .await?;
349 return Err(anyhow::anyhow!(err_msg));
350 }
351
352 let workflow_content = fs::read_to_string(&storm_file_path)
354 .with_context(|| format!("Failed to read workflow file at {:?}", storm_file_path))?;
355
356 debug!(
357 "Workflow content loaded for {}: {} bytes",
358 run_id,
359 workflow_content.len()
360 );
361
362 let parser = StormchaserParser::new();
363 let mut parsed_workflow = match parser.parse(&workflow_content) {
364 Ok(w) => w,
365 Err(e) => {
366 let _ = machine
367 .fail(
368 format!("Workflow parsing failed: {}", e),
369 &mut *pool.acquire().await?,
370 )
371 .await?;
372 return Err(e);
373 }
374 };
375
376 let mut resolved_includes = std::collections::HashSet::new();
378 let mut includes_to_process = parsed_workflow.includes.clone();
379 parsed_workflow.includes.clear(); while let Some(inc) = includes_to_process.pop() {
382 if !resolved_includes.insert(inc.workflow.clone()) {
383 continue; }
385
386 let inc_path = repo_path.join(&inc.workflow);
387 if !inc_path.exists() {
388 let err_msg = format!("Included workflow file {} not found", inc.workflow);
389 let _ = machine
390 .fail(err_msg.clone(), &mut *pool.acquire().await?)
391 .await?;
392 return Err(anyhow::anyhow!(err_msg));
393 }
394
395 let inc_content = fs::read_to_string(&inc_path)?;
396 let inc_workflow = match parser.parse(&inc_content) {
397 Ok(w) => w,
398 Err(e) => {
399 let err_msg = format!("Included workflow {} parsing failed: {}", inc.workflow, e);
400 let _ = machine
401 .fail(err_msg.clone(), &mut *pool.acquire().await?)
402 .await?;
403 return Err(anyhow::anyhow!(err_msg));
404 }
405 };
406
407 parsed_workflow
409 .step_libraries
410 .extend(inc_workflow.step_libraries);
411
412 let prefix = format!("{}.", inc.name);
414 for mut step in inc_workflow.steps {
415 step.name = format!("{}{}", prefix, step.name);
416
417 for next_ref in &mut step.next {
419 *next_ref = format!("{}{}", prefix, next_ref);
420 }
421
422 for (k, v) in &inc.inputs {
425 let var_pattern = format!("inputs.{}", k);
426 let val_str = v.to_string();
427
428 for param_val in step.params.values_mut() {
429 *param_val = param_val.replace(&var_pattern, &val_str);
430 }
431 }
432
433 parsed_workflow.steps.push(step);
434 }
435
436 includes_to_process.extend(inc_workflow.includes);
437 }
438
439 let inputs = fetch_inputs(run_id, &pool).await?;
442
443 let opa_context = EngineOpaContext {
444 run_id,
445 initiating_user: machine.run.initiating_user.clone(),
446 workflow_ast: serde_json::to_value(&parsed_workflow)?,
447 inputs,
448 };
449
450 match opa_client.check_context(opa_context).await {
451 Ok(true) => debug!("OPA allowed execution for run {}", run_id),
452 Ok(false) => {
453 let err_msg = "Execution denied by OPA policy".to_string();
454 info!("Run {}: {}", run_id, err_msg);
455 let _ = machine.fail(err_msg, &mut *pool.acquire().await?).await?;
456 return Ok(()); }
458 Err(e) => {
459 let err_msg = format!("OPA check failed: {}", e);
460 error!("Run {}: {}", run_id, err_msg);
461 let _ = machine.fail(err_msg, &mut *pool.acquire().await?).await?;
462 return Err(e);
463 }
464 }
465
466 crate::db::update_run_context(
468 &pool,
469 serde_json::to_value(&parsed_workflow)?,
470 Some(&workflow_content).map(|s| s.as_str()),
471 &parsed_workflow.dsl_version,
472 run_id,
473 )
474 .await
475 .with_context(|| format!("Failed to update run context for {}", run_id))?;
476
477 let machine = machine.start_pending(&mut *pool.acquire().await?).await?;
479
480 let event = serde_json::json!({
482 "run_id": run_id,
483 "event_type": "workflow_start_pending",
484 "timestamp": chrono::Utc::now(),
485 });
486 let js = async_nats::jetstream::new(nats_client);
487 js.publish("stormchaser.run.start_pending", event.to_string().into())
488 .await
489 .with_context(|| format!("Failed to publish start_pending event for {}", run_id))?;
490
491 let _ = machine.start(&mut *pool.acquire().await?).await?;
493
494 info!(
495 "Successfully resolved and parsed workflow file, started run {}",
496 run_id
497 );
498
499 Ok(())
500}