Skip to main content

stormchaser_engine/handler/workflow/
start_pending.rs

1use crate::handler::{dispatch_pending_steps, fetch_run, fetch_run_context, schedule_step};
2use crate::workflow_machine::{state, WorkflowMachine};
3use anyhow::{Context, Result};
4use sqlx::PgPool;
5use std::collections::HashSet;
6use std::sync::Arc;
7use stormchaser_dsl::ast;
8use stormchaser_dsl::ast::Workflow;
9use stormchaser_model::events::WorkflowRunningEvent;
10use stormchaser_model::events::{EventSource, EventType, WorkflowEventType};
11use stormchaser_model::RunId;
12use stormchaser_tls::TlsReloader;
13use tracing::{debug, error, info};
14
15#[tracing::instrument(skip(pool, nats_client, tls_reloader), fields(run_id = %run_id))]
16/// Handle workflow start pending.
17pub async fn handle_workflow_start_pending(
18    run_id: RunId,
19    pool: PgPool,
20    nats_client: async_nats::Client,
21    tls_reloader: Arc<TlsReloader>,
22) -> Result<()> {
23    info!("Handling start_pending workflow run: {}", run_id);
24
25    let mut tx = pool.begin().await?;
26
27    // Lock the workflow run to serialize state evaluations
28    if crate::db::lock_workflow_run(&mut *tx, run_id)
29        .await?
30        .is_none()
31    {
32        debug!(
33            "Workflow run {} already archived or missing, skipping handler",
34            run_id
35        );
36        return Ok(());
37    }
38
39    // 1. Fetch WorkflowRun and RunContext
40    let run = fetch_run(run_id, &mut *tx).await?;
41    let context = fetch_run_context(run_id, &mut *tx).await?;
42
43    // 2. Parse AST from context
44    let workflow: Workflow = serde_json::from_value(context.workflow_definition)
45        .context("Failed to parse workflow definition from DB")?;
46
47    // 3. Identify starting steps (steps that are not in anyone's 'next' list)
48    let all_next_steps: HashSet<String> = workflow
49        .steps
50        .iter()
51        .flat_map(|s| s.next.iter().cloned())
52        .collect();
53
54    let initial_steps: Vec<&ast::Step> = workflow
55        .steps
56        .iter()
57        .filter(|s| !all_next_steps.contains(&s.name))
58        .collect();
59
60    if initial_steps.is_empty() && !workflow.steps.is_empty() {
61        error!("Workflow {} for run {} has steps but no initial steps found (cycle or misconfiguration)", workflow.name, run_id);
62        return Err(anyhow::anyhow!("No initial steps found in workflow"));
63    }
64
65    // 4. Create CelContext for resolving expressions
66    let hcl_ctx =
67        crate::hcl_eval::create_context(context.inputs.clone(), run_id, serde_json::json!({}));
68
69    // 5. Create StepInstances for initial steps and schedule them
70    for step_dsl in initial_steps {
71        #[allow(clippy::explicit_auto_deref)]
72        schedule_step(
73            run_id,
74            step_dsl,
75            &mut *tx,
76            nats_client.clone(),
77            &hcl_ctx,
78            pool.clone(),
79            &workflow,
80        )
81        .await?;
82    }
83
84    // 5. Transition Workflow to Running
85    let machine = WorkflowMachine::<state::StartPending>::new_from_run(run.clone());
86    let _ = machine.start(&mut *tx).await?;
87
88    let js = async_nats::jetstream::new(nats_client.clone());
89    use stormchaser_model::nats::NatsSubject;
90    if let Err(e) = stormchaser_model::nats::publish_cloudevent(
91        &js,
92        NatsSubject::RunRunning,
93        EventType::Workflow(WorkflowEventType::Running),
94        EventSource::Engine,
95        serde_json::to_value(WorkflowRunningEvent {
96            run_id,
97            event_type: EventType::Workflow(WorkflowEventType::Running),
98            timestamp: chrono::Utc::now(),
99        })
100        .unwrap(),
101        None,
102        None,
103    )
104    .await
105    {
106        error!(
107            "Failed to publish workflow running event for {}: {:?}",
108            run_id, e
109        );
110    }
111
112    crate::RUNS_STARTED.add(
113        1,
114        &[
115            opentelemetry::KeyValue::new("workflow_name", run.workflow_name),
116            opentelemetry::KeyValue::new("initiating_user", run.initiating_user),
117        ],
118    );
119
120    tx.commit().await?;
121
122    if let Err(e) = dispatch_pending_steps(run_id, pool, nats_client, tls_reloader).await {
123        error!(
124            "Failed to dispatch pending steps for run {}: {:?}",
125            run_id, e
126        );
127    }
128
129    info!("Transitioned run {} to Running", run_id);
130
131    Ok(())
132}