Skip to main content

stormchaser_engine/handler/workflow/
queued.rs

1use crate::git_cache::GitCache;
2use crate::handler::{fetch_inputs, fetch_run};
3use crate::workflow_machine::{state, WorkflowMachine};
4use anyhow::{Context, Result};
5use sqlx::PgPool;
6use std::fs;
7use std::sync::Arc;
8use stormchaser_dsl::StormchaserParser;
9use stormchaser_model::auth::{EngineOpaContext, OpaClient};
10use stormchaser_model::events::WorkflowStartPendingEvent;
11use stormchaser_model::events::{EventSource, EventType, SchemaVersion, WorkflowEventType};
12use stormchaser_model::RunId;
13use stormchaser_tls::TlsReloader;
14use tracing::{debug, error, info};
15
16#[tracing::instrument(skip(pool, git_cache, opa_client, nats_client, _tls_reloader), fields(run_id = %run_id))]
17/// Handles the event when a workflow is queued and ready for resolution.
18pub async fn handle_workflow_queued(
19    run_id: RunId,
20    pool: PgPool,
21    git_cache: Arc<GitCache>,
22    opa_client: Arc<OpaClient>,
23    nats_client: async_nats::Client,
24    _tls_reloader: Arc<TlsReloader>,
25) -> Result<()> {
26    info!("Handling queued workflow run: {}", run_id);
27
28    // 1. Fetch WorkflowRun from DB
29    let run = fetch_run(run_id, &pool).await?;
30    let machine = WorkflowMachine::<state::Queued>::new(run);
31
32    // 2. Transition to Resolving
33    let machine = machine.start_resolving(&mut *pool.acquire().await?).await?;
34
35    // 3. Resolve the workflow file into cache
36    let repo_path_res = git_cache.ensure_files(
37        &machine.run.repo_url,
38        &machine.run.git_ref,
39        std::slice::from_ref(&machine.run.workflow_path),
40    );
41
42    let repo_path = match repo_path_res {
43        Ok(path) => path,
44        Err(e) => {
45            let _ = machine
46                .fail(
47                    format!("Git resolution failed: {}", e),
48                    &mut *pool.acquire().await?,
49                )
50                .await?;
51            return Err(e);
52        }
53    };
54
55    let storm_file_path = repo_path.join(&machine.run.workflow_path);
56    if !storm_file_path.exists() {
57        let err_msg = format!(
58            "Workflow file {} not found in repo",
59            machine.run.workflow_path
60        );
61        let _ = machine
62            .fail(err_msg.clone(), &mut *pool.acquire().await?)
63            .await?;
64        return Err(anyhow::anyhow!(err_msg));
65    }
66
67    // 4. Read and Parse the workflow file
68    let workflow_content = fs::read_to_string(&storm_file_path)
69        .with_context(|| format!("Failed to read workflow file at {:?}", storm_file_path))?;
70
71    debug!(
72        "Workflow content loaded for {}: {} bytes",
73        run_id,
74        workflow_content.len()
75    );
76
77    let parser = StormchaserParser::new();
78    let mut parsed_workflow = match parser.parse(&workflow_content) {
79        Ok(w) => w,
80        Err(e) => {
81            let _ = machine
82                .fail(
83                    format!("Workflow parsing failed: {}", e),
84                    &mut *pool.acquire().await?,
85                )
86                .await?;
87            return Err(e);
88        }
89    };
90
91    // 4.5 Resolve includes inline
92    let mut resolved_includes = std::collections::HashSet::new();
93    let mut includes_to_process = parsed_workflow.includes.clone();
94    parsed_workflow.includes.clear(); // We will inline them, so remove from AST
95
96    while let Some(inc) = includes_to_process.pop() {
97        if !resolved_includes.insert(inc.workflow.clone()) {
98            continue; // Prevent infinite loops
99        }
100
101        let inc_path = repo_path.join(&inc.workflow);
102        if !inc_path.exists() {
103            let err_msg = format!("Included workflow file {} not found", inc.workflow);
104            let _ = machine
105                .fail(err_msg.clone(), &mut *pool.acquire().await?)
106                .await?;
107            return Err(anyhow::anyhow!(err_msg));
108        }
109
110        let inc_content = fs::read_to_string(&inc_path)?;
111        let inc_workflow = match parser.parse(&inc_content) {
112            Ok(w) => w,
113            Err(e) => {
114                let err_msg = format!("Included workflow {} parsing failed: {}", inc.workflow, e);
115                let _ = machine
116                    .fail(err_msg.clone(), &mut *pool.acquire().await?)
117                    .await?;
118                return Err(anyhow::anyhow!(err_msg));
119            }
120        };
121
122        // Inline step libraries
123        parsed_workflow
124            .step_libraries
125            .extend(inc_workflow.step_libraries);
126
127        // Inline steps, applying the include prefix to avoid name collisions and substituting inputs
128        let prefix = format!("{}.", inc.name);
129        for mut step in inc_workflow.steps {
130            step.name = format!("{}{}", prefix, step.name);
131
132            // Update next pointers
133            for next_ref in &mut step.next {
134                *next_ref = format!("{}{}", prefix, next_ref);
135            }
136
137            // Very naive input substitution using HCL templates/variables
138            // In a real engine, we'd use hcl_eval, but for AST merging we can inject param overrides
139            for (k, v) in &inc.inputs {
140                let var_pattern = format!("inputs.{}", k);
141                let val_str = v.to_string();
142
143                for param_val in step.params.values_mut() {
144                    *param_val = param_val.replace(&var_pattern, &val_str);
145                }
146            }
147
148            parsed_workflow.steps.push(step);
149        }
150
151        includes_to_process.extend(inc_workflow.includes);
152    }
153
154    // 5. OPA Policy Check after Parsing
155    // We send the full context: AST, user, and inputs
156    let inputs = fetch_inputs(run_id, &pool).await?;
157
158    let opa_context = EngineOpaContext {
159        run_id,
160        initiating_user: machine.run.initiating_user.clone(),
161        workflow_ast: serde_json::to_value(&parsed_workflow)?,
162        inputs,
163    };
164
165    match opa_client.check_context(opa_context).await {
166        Ok(true) => debug!("OPA allowed execution for run {}", run_id),
167        Ok(false) => {
168            let err_msg = "Execution denied by OPA policy".to_string();
169            info!("Run {}: {}", run_id, err_msg);
170            let _ = machine.fail(err_msg, &mut *pool.acquire().await?).await?;
171            return Ok(()); // Handled failure
172        }
173        Err(e) => {
174            let err_msg = format!("OPA check failed: {}", e);
175            error!("Run {}: {}", run_id, err_msg);
176            let _ = machine.fail(err_msg, &mut *pool.acquire().await?).await?;
177            return Err(e);
178        }
179    }
180
181    // 6. Update RunContext with the definition and source code
182    crate::db::update_run_context(
183        &pool,
184        serde_json::to_value(&parsed_workflow)?,
185        Some(&workflow_content).map(|s| s.as_str()),
186        &parsed_workflow.dsl_version,
187        run_id,
188    )
189    .await
190    .with_context(|| format!("Failed to update run context for {}", run_id))?;
191
192    // 7. Transition to StartPending
193    let machine = machine.start_pending(&mut *pool.acquire().await?).await?;
194
195    // Emit event for transition to StartPending
196    let event = WorkflowStartPendingEvent {
197        run_id,
198        event_type: EventType::Workflow(WorkflowEventType::StartPending),
199        timestamp: chrono::Utc::now(),
200    };
201    let js = async_nats::jetstream::new(nats_client);
202    use stormchaser_model::nats::NatsSubject;
203    stormchaser_model::nats::publish_cloudevent(
204        &js,
205        NatsSubject::RunStartPending,
206        EventType::Workflow(WorkflowEventType::StartPending),
207        EventSource::System,
208        serde_json::to_value(event).unwrap(),
209        Some(SchemaVersion::new("1.0".to_string())),
210        None,
211    )
212    .await
213    .with_context(|| format!("Failed to publish start_pending event for {}", run_id))?;
214
215    // 8. Transition to Running
216    let _ = machine.start(&mut *pool.acquire().await?).await?;
217
218    info!(
219        "Successfully resolved and parsed workflow file, started run {}",
220        run_id
221    );
222
223    Ok(())
224}