Skip to main content

stormchaser_engine/handler/
workflow.rs

1#![allow(clippy::explicit_auto_deref)]
2use super::{
3    archive_workflow, dispatch_pending_steps, fetch_inputs, fetch_run, fetch_run_context,
4    schedule_step,
5};
6use crate::git_cache::GitCache;
7use crate::workflow_machine::{state, WorkflowMachine};
8use anyhow::{Context, Result};
9use chrono::Utc;
10use serde_json::Value;
11use sqlx::PgPool;
12use std::collections::HashSet;
13use std::fs;
14use std::sync::Arc;
15use stormchaser_dsl::ast::Workflow;
16use stormchaser_dsl::StormchaserParser;
17use stormchaser_model::auth::{EngineOpaContext, OpaClient};
18use stormchaser_model::workflow::{RunStatus, WorkflowRun};
19use stormchaser_tls::TlsReloader;
20use tracing::{debug, error, info};
21use uuid::Uuid;
22
23use stormchaser_dsl::ast;
24
25#[tracing::instrument(skip(pool, nats_client, _tls_reloader), fields(run_id = %run_id))]
26/// Handle workflow timeout.
27pub async fn handle_workflow_timeout(
28    run_id: Uuid,
29    pool: PgPool,
30    nats_client: async_nats::Client,
31    _tls_reloader: Arc<TlsReloader>,
32) -> Result<()> {
33    info!("Workflow {} timed out, aborting", run_id);
34
35    let run = fetch_run(run_id, &pool).await?;
36    if matches!(
37        run.status,
38        RunStatus::Succeeded | RunStatus::Failed | RunStatus::Aborted
39    ) {
40        return Ok(());
41    }
42
43    // 1. Mark Workflow as Aborted
44    let mut machine_run = run.clone();
45    machine_run.error = Some("Workflow timed out".to_string());
46
47    match run.status {
48        RunStatus::Queued => {
49            WorkflowMachine::<state::Queued>::new_from_run(machine_run)
50                .abort(&mut *pool.acquire().await?)
51                .await?;
52        }
53        RunStatus::Resolving => {
54            WorkflowMachine::<state::Resolving>::new_from_run(machine_run)
55                .abort(&mut *pool.acquire().await?)
56                .await?;
57        }
58        RunStatus::StartPending => {
59            WorkflowMachine::<state::StartPending>::new_from_run(machine_run)
60                .abort(&mut *pool.acquire().await?)
61                .await?;
62        }
63        RunStatus::Running => {
64            WorkflowMachine::<state::Running>::new_from_run(machine_run)
65                .abort(&mut *pool.acquire().await?)
66                .await?;
67        }
68        _ => {} // Should not happen due to check above
69    };
70
71    // 2. Mark all non-terminal steps as failed
72    crate::db::fail_pending_steps_for_run_on_timeout(&pool, run_id).await?;
73
74    // 3. Publish abort event
75    let event = serde_json::json!({
76        "run_id": run_id,
77        "event_type": "workflow_aborted",
78        "reason": "timeout",
79        "timestamp": Utc::now(),
80    });
81    let js = async_nats::jetstream::new(nats_client);
82    js.publish("stormchaser.run.aborted", event.to_string().into())
83        .await?;
84
85    // 4. Archive
86    archive_workflow(run_id, pool).await?;
87
88    Ok(())
89}
90
91#[tracing::instrument(skip(pool, nats_client, tls_reloader), fields(run_id = %run_id))]
92/// Handle workflow start pending.
93pub async fn handle_workflow_start_pending(
94    run_id: Uuid,
95    pool: PgPool,
96    nats_client: async_nats::Client,
97    tls_reloader: Arc<TlsReloader>,
98) -> Result<()> {
99    info!("Handling start_pending workflow run: {}", run_id);
100
101    let mut tx = pool.begin().await?;
102
103    // Lock the workflow run to serialize state evaluations
104    if crate::db::lock_workflow_run(&mut *tx, run_id)
105        .await?
106        .is_none()
107    {
108        debug!(
109            "Workflow run {} already archived or missing, skipping handler",
110            run_id
111        );
112        return Ok(());
113    }
114
115    // 1. Fetch WorkflowRun and RunContext
116    let run = fetch_run(run_id, &mut *tx).await?;
117    let context = fetch_run_context(run_id, &mut *tx).await?;
118
119    // 2. Parse AST from context
120    let workflow: Workflow = serde_json::from_value(context.workflow_definition)
121        .context("Failed to parse workflow definition from DB")?;
122
123    // 3. Identify starting steps (steps that are not in anyone's 'next' list)
124    let all_next_steps: HashSet<String> = workflow
125        .steps
126        .iter()
127        .flat_map(|s| s.next.iter().cloned())
128        .collect();
129
130    let initial_steps: Vec<&ast::Step> = workflow
131        .steps
132        .iter()
133        .filter(|s| !all_next_steps.contains(&s.name))
134        .collect();
135
136    if initial_steps.is_empty() && !workflow.steps.is_empty() {
137        error!("Workflow {} for run {} has steps but no initial steps found (cycle or misconfiguration)", workflow.name, run_id);
138        return Err(anyhow::anyhow!("No initial steps found in workflow"));
139    }
140
141    // 4. Create CelContext for resolving expressions
142    let hcl_ctx =
143        crate::hcl_eval::create_context(context.inputs.clone(), run_id, serde_json::json!({}));
144
145    // 5. Create StepInstances for initial steps and schedule them
146    for step_dsl in initial_steps {
147        #[allow(clippy::explicit_auto_deref)]
148        schedule_step(
149            run_id,
150            step_dsl,
151            &mut *tx,
152            nats_client.clone(),
153            &hcl_ctx,
154            pool.clone(),
155            &workflow,
156        )
157        .await?;
158    }
159
160    // 5. Transition Workflow to Running
161    let machine = WorkflowMachine::<state::StartPending>::new_from_run(run.clone());
162    let _ = machine.start(&mut *tx).await?;
163
164    crate::RUNS_STARTED.add(
165        1,
166        &[
167            opentelemetry::KeyValue::new("workflow_name", run.workflow_name),
168            opentelemetry::KeyValue::new("initiating_user", run.initiating_user),
169        ],
170    );
171
172    tx.commit().await?;
173
174    if let Err(e) = dispatch_pending_steps(run_id, pool, nats_client, tls_reloader).await {
175        error!(
176            "Failed to dispatch pending steps for run {}: {:?}",
177            run_id, e
178        );
179    }
180
181    info!("Transitioned run {} to Running", run_id);
182
183    Ok(())
184}
185
186#[tracing::instrument(skip(payload, pool, opa_client, nats_client), fields(run_id = tracing::field::Empty))]
187/// Handle workflow direct.
188pub async fn handle_workflow_direct(
189    payload: Value,
190    pool: PgPool,
191    opa_client: Arc<OpaClient>,
192    nats_client: async_nats::Client,
193) -> Result<()> {
194    let run_id_str = payload["run_id"].as_str().context("Missing run_id")?;
195    let run_id = Uuid::parse_str(run_id_str)?;
196    tracing::Span::current().record("run_id", tracing::field::display(run_id));
197    let workflow_content = payload["dsl"].as_str().context("Missing dsl content")?;
198    let initiating_user = payload["initiating_user"]
199        .as_str()
200        .unwrap_or("system")
201        .to_string();
202    let inputs = payload["inputs"].clone();
203
204    info!("Handling direct one-off workflow run: {}", run_id);
205
206    // 1. Parse the DSL content directly
207    let parser = StormchaserParser::new();
208    let parsed_workflow = match parser.parse(workflow_content) {
209        Ok(w) => w,
210        Err(e) => {
211            error!("Direct workflow parsing failed for {}: {}", run_id, e);
212            return Err(e);
213        }
214    };
215
216    // 2. OPA Policy Check
217    let opa_context = EngineOpaContext {
218        run_id,
219        initiating_user: initiating_user.clone(),
220        workflow_ast: serde_json::to_value(&parsed_workflow)?,
221        inputs: inputs.clone(),
222    };
223
224    match opa_client.check_context(opa_context).await {
225        Ok(true) => debug!("OPA allowed execution for direct run {}", run_id),
226        Ok(false) => {
227            let err_msg = "Execution denied by OPA policy".to_string();
228            info!("Direct run {}: {}", run_id, err_msg);
229            return Err(anyhow::anyhow!(err_msg));
230        }
231        Err(e) => {
232            let err_msg = format!("OPA check failed: {}", e);
233            error!("Direct run {}: {}", run_id, err_msg);
234            return Err(anyhow::anyhow!(err_msg));
235        }
236    }
237
238    // 3. Create WorkflowRun and RunContext in DB
239    // Direct runs have no repo_url or workflow_path in the traditional sense
240    let run = WorkflowRun {
241        id: run_id,
242        workflow_name: parsed_workflow.name.clone(),
243        initiating_user: initiating_user.clone(),
244        repo_url: "direct://".to_string(),
245        workflow_path: "inline.storm".to_string(),
246        git_ref: "HEAD".to_string(),
247        status: RunStatus::StartPending,
248        version: 1,
249        fencing_token: Utc::now().timestamp_nanos_opt().unwrap_or(0),
250        created_at: Utc::now(),
251        updated_at: Utc::now(),
252        started_resolving_at: Some(Utc::now()),
253        started_at: None,
254        finished_at: None,
255        error: None,
256    };
257
258    let mut tx = pool.begin().await?;
259
260    crate::db::insert_full_workflow_run(
261        &mut *tx,
262        &run,
263        &parsed_workflow.dsl_version,
264        serde_json::to_value(&parsed_workflow)?,
265        Some(workflow_content),
266        inputs,
267        10,
268        "1",
269        "4Gi",
270        "10Gi",
271        "1h",
272    )
273    .await?;
274
275    tx.commit().await?;
276
277    // 4. Emit event for transition to StartPending
278    let event = serde_json::json!({
279        "run_id": run_id,
280        "event_type": "workflow_start_pending",
281        "timestamp": Utc::now(),
282    });
283    let js = async_nats::jetstream::new(nats_client);
284    js.publish("stormchaser.run.start_pending", event.to_string().into())
285        .await
286        .with_context(|| {
287            format!(
288                "Failed to publish start_pending event for direct run {}",
289                run_id
290            )
291        })?;
292
293    info!(
294        "Successfully initialized direct one-off workflow run {}",
295        run_id
296    );
297
298    Ok(())
299}
300
301#[tracing::instrument(skip(pool, git_cache, opa_client, nats_client, _tls_reloader), fields(run_id = %run_id))]
302/// Handles the event when a workflow is queued and ready for resolution.
303pub async fn handle_workflow_queued(
304    run_id: Uuid,
305    pool: PgPool,
306    git_cache: Arc<GitCache>,
307    opa_client: Arc<OpaClient>,
308    nats_client: async_nats::Client,
309    _tls_reloader: Arc<TlsReloader>,
310) -> Result<()> {
311    info!("Handling queued workflow run: {}", run_id);
312
313    // 1. Fetch WorkflowRun from DB
314    let run = fetch_run(run_id, &pool).await?;
315    let machine = WorkflowMachine::<state::Queued>::new(run);
316
317    // 2. Transition to Resolving
318    let machine = machine.start_resolving(&mut *pool.acquire().await?).await?;
319
320    // 3. Resolve the workflow file into cache
321    let repo_path_res = git_cache.ensure_files(
322        &machine.run.repo_url,
323        &machine.run.git_ref,
324        std::slice::from_ref(&machine.run.workflow_path),
325    );
326
327    let repo_path = match repo_path_res {
328        Ok(path) => path,
329        Err(e) => {
330            let _ = machine
331                .fail(
332                    format!("Git resolution failed: {}", e),
333                    &mut *pool.acquire().await?,
334                )
335                .await?;
336            return Err(e);
337        }
338    };
339
340    let storm_file_path = repo_path.join(&machine.run.workflow_path);
341    if !storm_file_path.exists() {
342        let err_msg = format!(
343            "Workflow file {} not found in repo",
344            machine.run.workflow_path
345        );
346        let _ = machine
347            .fail(err_msg.clone(), &mut *pool.acquire().await?)
348            .await?;
349        return Err(anyhow::anyhow!(err_msg));
350    }
351
352    // 4. Read and Parse the workflow file
353    let workflow_content = fs::read_to_string(&storm_file_path)
354        .with_context(|| format!("Failed to read workflow file at {:?}", storm_file_path))?;
355
356    debug!(
357        "Workflow content loaded for {}: {} bytes",
358        run_id,
359        workflow_content.len()
360    );
361
362    let parser = StormchaserParser::new();
363    let mut parsed_workflow = match parser.parse(&workflow_content) {
364        Ok(w) => w,
365        Err(e) => {
366            let _ = machine
367                .fail(
368                    format!("Workflow parsing failed: {}", e),
369                    &mut *pool.acquire().await?,
370                )
371                .await?;
372            return Err(e);
373        }
374    };
375
376    // 4.5 Resolve includes inline
377    let mut resolved_includes = std::collections::HashSet::new();
378    let mut includes_to_process = parsed_workflow.includes.clone();
379    parsed_workflow.includes.clear(); // We will inline them, so remove from AST
380
381    while let Some(inc) = includes_to_process.pop() {
382        if !resolved_includes.insert(inc.workflow.clone()) {
383            continue; // Prevent infinite loops
384        }
385
386        let inc_path = repo_path.join(&inc.workflow);
387        if !inc_path.exists() {
388            let err_msg = format!("Included workflow file {} not found", inc.workflow);
389            let _ = machine
390                .fail(err_msg.clone(), &mut *pool.acquire().await?)
391                .await?;
392            return Err(anyhow::anyhow!(err_msg));
393        }
394
395        let inc_content = fs::read_to_string(&inc_path)?;
396        let inc_workflow = match parser.parse(&inc_content) {
397            Ok(w) => w,
398            Err(e) => {
399                let err_msg = format!("Included workflow {} parsing failed: {}", inc.workflow, e);
400                let _ = machine
401                    .fail(err_msg.clone(), &mut *pool.acquire().await?)
402                    .await?;
403                return Err(anyhow::anyhow!(err_msg));
404            }
405        };
406
407        // Inline step libraries
408        parsed_workflow
409            .step_libraries
410            .extend(inc_workflow.step_libraries);
411
412        // Inline steps, applying the include prefix to avoid name collisions and substituting inputs
413        let prefix = format!("{}.", inc.name);
414        for mut step in inc_workflow.steps {
415            step.name = format!("{}{}", prefix, step.name);
416
417            // Update next pointers
418            for next_ref in &mut step.next {
419                *next_ref = format!("{}{}", prefix, next_ref);
420            }
421
422            // Very naive input substitution using HCL templates/variables
423            // In a real engine, we'd use hcl_eval, but for AST merging we can inject param overrides
424            for (k, v) in &inc.inputs {
425                let var_pattern = format!("inputs.{}", k);
426                let val_str = v.to_string();
427
428                for param_val in step.params.values_mut() {
429                    *param_val = param_val.replace(&var_pattern, &val_str);
430                }
431            }
432
433            parsed_workflow.steps.push(step);
434        }
435
436        includes_to_process.extend(inc_workflow.includes);
437    }
438
439    // 5. OPA Policy Check after Parsing
440    // We send the full context: AST, user, and inputs
441    let inputs = fetch_inputs(run_id, &pool).await?;
442
443    let opa_context = EngineOpaContext {
444        run_id,
445        initiating_user: machine.run.initiating_user.clone(),
446        workflow_ast: serde_json::to_value(&parsed_workflow)?,
447        inputs,
448    };
449
450    match opa_client.check_context(opa_context).await {
451        Ok(true) => debug!("OPA allowed execution for run {}", run_id),
452        Ok(false) => {
453            let err_msg = "Execution denied by OPA policy".to_string();
454            info!("Run {}: {}", run_id, err_msg);
455            let _ = machine.fail(err_msg, &mut *pool.acquire().await?).await?;
456            return Ok(()); // Handled failure
457        }
458        Err(e) => {
459            let err_msg = format!("OPA check failed: {}", e);
460            error!("Run {}: {}", run_id, err_msg);
461            let _ = machine.fail(err_msg, &mut *pool.acquire().await?).await?;
462            return Err(e);
463        }
464    }
465
466    // 6. Update RunContext with the definition and source code
467    crate::db::update_run_context(
468        &pool,
469        serde_json::to_value(&parsed_workflow)?,
470        Some(&workflow_content).map(|s| s.as_str()),
471        &parsed_workflow.dsl_version,
472        run_id,
473    )
474    .await
475    .with_context(|| format!("Failed to update run context for {}", run_id))?;
476
477    // 7. Transition to StartPending
478    let machine = machine.start_pending(&mut *pool.acquire().await?).await?;
479
480    // Emit event for transition to StartPending
481    let event = serde_json::json!({
482        "run_id": run_id,
483        "event_type": "workflow_start_pending",
484        "timestamp": chrono::Utc::now(),
485    });
486    let js = async_nats::jetstream::new(nats_client);
487    js.publish("stormchaser.run.start_pending", event.to_string().into())
488        .await
489        .with_context(|| format!("Failed to publish start_pending event for {}", run_id))?;
490
491    // 8. Transition to Running
492    let _ = machine.start(&mut *pool.acquire().await?).await?;
493
494    info!(
495        "Successfully resolved and parsed workflow file, started run {}",
496        run_id
497    );
498
499    Ok(())
500}