// stormchaser_engine/handler/workflow/start_pending.rs

1use crate::handler::{dispatch_pending_steps, fetch_run, fetch_run_context, schedule_step};
2use crate::workflow_machine::{state, WorkflowMachine};
3use anyhow::{Context, Result};
4use sqlx::PgPool;
5use std::collections::HashSet;
6use std::sync::Arc;
7use stormchaser_dsl::ast;
8use stormchaser_dsl::ast::Workflow;
9use stormchaser_model::events::WorkflowRunningEvent;
10use stormchaser_model::RunId;
11use stormchaser_tls::TlsReloader;
12use tracing::{debug, error, info};
13
14#[tracing::instrument(skip(pool, nats_client, tls_reloader), fields(run_id = %run_id))]
15/// Handle workflow start pending.
16pub async fn handle_workflow_start_pending(
17    run_id: RunId,
18    pool: PgPool,
19    nats_client: async_nats::Client,
20    tls_reloader: Arc<TlsReloader>,
21) -> Result<()> {
22    info!("Handling start_pending workflow run: {}", run_id);
23
24    let mut tx = pool.begin().await?;
25
26    // Lock the workflow run to serialize state evaluations
27    if crate::db::lock_workflow_run(&mut *tx, run_id)
28        .await?
29        .is_none()
30    {
31        debug!(
32            "Workflow run {} already archived or missing, skipping handler",
33            run_id
34        );
35        return Ok(());
36    }
37
38    // 1. Fetch WorkflowRun and RunContext
39    let run = fetch_run(run_id, &mut *tx).await?;
40    let context = fetch_run_context(run_id, &mut *tx).await?;
41
42    // 2. Parse AST from context
43    let workflow: Workflow = serde_json::from_value(context.workflow_definition)
44        .context("Failed to parse workflow definition from DB")?;
45
46    // 3. Identify starting steps (steps that are not in anyone's 'next' list)
47    let all_next_steps: HashSet<String> = workflow
48        .steps
49        .iter()
50        .flat_map(|s| s.next.iter().cloned())
51        .collect();
52
53    let initial_steps: Vec<&ast::Step> = workflow
54        .steps
55        .iter()
56        .filter(|s| !all_next_steps.contains(&s.name))
57        .collect();
58
59    if initial_steps.is_empty() && !workflow.steps.is_empty() {
60        error!("Workflow {} for run {} has steps but no initial steps found (cycle or misconfiguration)", workflow.name, run_id);
61        return Err(anyhow::anyhow!("No initial steps found in workflow"));
62    }
63
64    // 4. Create CelContext for resolving expressions
65    let hcl_ctx =
66        crate::hcl_eval::create_context(context.inputs.clone(), run_id, serde_json::json!({}));
67
68    // 5. Create StepInstances for initial steps and schedule them
69    for step_dsl in initial_steps {
70        #[allow(clippy::explicit_auto_deref)]
71        schedule_step(
72            run_id,
73            step_dsl,
74            &mut *tx,
75            nats_client.clone(),
76            &hcl_ctx,
77            pool.clone(),
78            &workflow,
79        )
80        .await?;
81    }
82
83    // 5. Transition Workflow to Running
84    let machine = WorkflowMachine::<state::StartPending>::new_from_run(run.clone());
85    let _ = machine.start(&mut *tx).await?;
86
87    let js = async_nats::jetstream::new(nats_client.clone());
88    if let Err(e) = stormchaser_model::nats::publish_cloudevent(
89        &js,
90        "stormchaser.v1.run.running",
91        "workflow_running",
92        "stormchaser-engine",
93        serde_json::to_value(WorkflowRunningEvent {
94            run_id,
95            event_type: "workflow_running".to_string(),
96            timestamp: chrono::Utc::now(),
97        })
98        .unwrap(),
99        None,
100        None,
101    )
102    .await
103    {
104        error!(
105            "Failed to publish workflow running event for {}: {:?}",
106            run_id, e
107        );
108    }
109
110    crate::RUNS_STARTED.add(
111        1,
112        &[
113            opentelemetry::KeyValue::new("workflow_name", run.workflow_name),
114            opentelemetry::KeyValue::new("initiating_user", run.initiating_user),
115        ],
116    );
117
118    tx.commit().await?;
119
120    if let Err(e) = dispatch_pending_steps(run_id, pool, nats_client, tls_reloader).await {
121        error!(
122            "Failed to dispatch pending steps for run {}: {:?}",
123            run_id, e
124        );
125    }
126
127    info!("Transitioned run {} to Running", run_id);
128
129    Ok(())
130}