Skip to main content

stormchaser_engine/handler/workflow/
queued.rs

use crate::git_cache::GitCache;
use crate::handler::{fetch_inputs, fetch_run};
use crate::workflow_machine::{state, WorkflowMachine};
use anyhow::{Context, Result};
use sqlx::PgPool;
use std::fs;
use std::path::{Component, Path};
use std::sync::Arc;
use stormchaser_dsl::StormchaserParser;
use stormchaser_model::auth::{EngineOpaContext, OpaClient};
use stormchaser_model::events::WorkflowStartPendingEvent;
use stormchaser_model::RunId;
use stormchaser_tls::TlsReloader;
use tracing::{debug, error, info};
14
15#[tracing::instrument(skip(pool, git_cache, opa_client, nats_client, _tls_reloader), fields(run_id = %run_id))]
16/// Handles the event when a workflow is queued and ready for resolution.
17pub async fn handle_workflow_queued(
18    run_id: RunId,
19    pool: PgPool,
20    git_cache: Arc<GitCache>,
21    opa_client: Arc<OpaClient>,
22    nats_client: async_nats::Client,
23    _tls_reloader: Arc<TlsReloader>,
24) -> Result<()> {
25    info!("Handling queued workflow run: {}", run_id);
26
27    // 1. Fetch WorkflowRun from DB
28    let run = fetch_run(run_id, &pool).await?;
29    let machine = WorkflowMachine::<state::Queued>::new(run);
30
31    // 2. Transition to Resolving
32    let machine = machine.start_resolving(&mut *pool.acquire().await?).await?;
33
34    // 3. Resolve the workflow file into cache
35    let repo_path_res = git_cache.ensure_files(
36        &machine.run.repo_url,
37        &machine.run.git_ref,
38        std::slice::from_ref(&machine.run.workflow_path),
39    );
40
41    let repo_path = match repo_path_res {
42        Ok(path) => path,
43        Err(e) => {
44            let _ = machine
45                .fail(
46                    format!("Git resolution failed: {}", e),
47                    &mut *pool.acquire().await?,
48                )
49                .await?;
50            return Err(e);
51        }
52    };
53
54    let storm_file_path = repo_path.join(&machine.run.workflow_path);
55    if !storm_file_path.exists() {
56        let err_msg = format!(
57            "Workflow file {} not found in repo",
58            machine.run.workflow_path
59        );
60        let _ = machine
61            .fail(err_msg.clone(), &mut *pool.acquire().await?)
62            .await?;
63        return Err(anyhow::anyhow!(err_msg));
64    }
65
66    // 4. Read and Parse the workflow file
67    let workflow_content = fs::read_to_string(&storm_file_path)
68        .with_context(|| format!("Failed to read workflow file at {:?}", storm_file_path))?;
69
70    debug!(
71        "Workflow content loaded for {}: {} bytes",
72        run_id,
73        workflow_content.len()
74    );
75
76    let parser = StormchaserParser::new();
77    let mut parsed_workflow = match parser.parse(&workflow_content) {
78        Ok(w) => w,
79        Err(e) => {
80            let _ = machine
81                .fail(
82                    format!("Workflow parsing failed: {}", e),
83                    &mut *pool.acquire().await?,
84                )
85                .await?;
86            return Err(e);
87        }
88    };
89
90    // 4.5 Resolve includes inline
91    let mut resolved_includes = std::collections::HashSet::new();
92    let mut includes_to_process = parsed_workflow.includes.clone();
93    parsed_workflow.includes.clear(); // We will inline them, so remove from AST
94
95    while let Some(inc) = includes_to_process.pop() {
96        if !resolved_includes.insert(inc.workflow.clone()) {
97            continue; // Prevent infinite loops
98        }
99
100        let inc_path = repo_path.join(&inc.workflow);
101        if !inc_path.exists() {
102            let err_msg = format!("Included workflow file {} not found", inc.workflow);
103            let _ = machine
104                .fail(err_msg.clone(), &mut *pool.acquire().await?)
105                .await?;
106            return Err(anyhow::anyhow!(err_msg));
107        }
108
109        let inc_content = fs::read_to_string(&inc_path)?;
110        let inc_workflow = match parser.parse(&inc_content) {
111            Ok(w) => w,
112            Err(e) => {
113                let err_msg = format!("Included workflow {} parsing failed: {}", inc.workflow, e);
114                let _ = machine
115                    .fail(err_msg.clone(), &mut *pool.acquire().await?)
116                    .await?;
117                return Err(anyhow::anyhow!(err_msg));
118            }
119        };
120
121        // Inline step libraries
122        parsed_workflow
123            .step_libraries
124            .extend(inc_workflow.step_libraries);
125
126        // Inline steps, applying the include prefix to avoid name collisions and substituting inputs
127        let prefix = format!("{}.", inc.name);
128        for mut step in inc_workflow.steps {
129            step.name = format!("{}{}", prefix, step.name);
130
131            // Update next pointers
132            for next_ref in &mut step.next {
133                *next_ref = format!("{}{}", prefix, next_ref);
134            }
135
136            // Very naive input substitution using HCL templates/variables
137            // In a real engine, we'd use hcl_eval, but for AST merging we can inject param overrides
138            for (k, v) in &inc.inputs {
139                let var_pattern = format!("inputs.{}", k);
140                let val_str = v.to_string();
141
142                for param_val in step.params.values_mut() {
143                    *param_val = param_val.replace(&var_pattern, &val_str);
144                }
145            }
146
147            parsed_workflow.steps.push(step);
148        }
149
150        includes_to_process.extend(inc_workflow.includes);
151    }
152
153    // 5. OPA Policy Check after Parsing
154    // We send the full context: AST, user, and inputs
155    let inputs = fetch_inputs(run_id, &pool).await?;
156
157    let opa_context = EngineOpaContext {
158        run_id,
159        initiating_user: machine.run.initiating_user.clone(),
160        workflow_ast: serde_json::to_value(&parsed_workflow)?,
161        inputs,
162    };
163
164    match opa_client.check_context(opa_context).await {
165        Ok(true) => debug!("OPA allowed execution for run {}", run_id),
166        Ok(false) => {
167            let err_msg = "Execution denied by OPA policy".to_string();
168            info!("Run {}: {}", run_id, err_msg);
169            let _ = machine.fail(err_msg, &mut *pool.acquire().await?).await?;
170            return Ok(()); // Handled failure
171        }
172        Err(e) => {
173            let err_msg = format!("OPA check failed: {}", e);
174            error!("Run {}: {}", run_id, err_msg);
175            let _ = machine.fail(err_msg, &mut *pool.acquire().await?).await?;
176            return Err(e);
177        }
178    }
179
180    // 6. Update RunContext with the definition and source code
181    crate::db::update_run_context(
182        &pool,
183        serde_json::to_value(&parsed_workflow)?,
184        Some(&workflow_content).map(|s| s.as_str()),
185        &parsed_workflow.dsl_version,
186        run_id,
187    )
188    .await
189    .with_context(|| format!("Failed to update run context for {}", run_id))?;
190
191    // 7. Transition to StartPending
192    let machine = machine.start_pending(&mut *pool.acquire().await?).await?;
193
194    // Emit event for transition to StartPending
195    let event = WorkflowStartPendingEvent {
196        run_id,
197        event_type: "workflow_start_pending".to_string(),
198        timestamp: chrono::Utc::now(),
199    };
200    let js = async_nats::jetstream::new(nats_client);
201    stormchaser_model::nats::publish_cloudevent(
202        &js,
203        "stormchaser.v1.run.start_pending",
204        "stormchaser.v1.run.start_pending",
205        "/stormchaser",
206        serde_json::to_value(event).unwrap(),
207        Some("1.0"),
208        None,
209    )
210    .await
211    .with_context(|| format!("Failed to publish start_pending event for {}", run_id))?;
212
213    // 8. Transition to Running
214    let _ = machine.start(&mut *pool.acquire().await?).await?;
215
216    info!(
217        "Successfully resolved and parsed workflow file, started run {}",
218        run_id
219    );
220
221    Ok(())
222}