stormchaser_engine/handler/workflow/
queued.rs1use crate::git_cache::GitCache;
2use crate::handler::{fetch_inputs, fetch_run};
3use crate::workflow_machine::{state, WorkflowMachine};
4use anyhow::{Context, Result};
5use sqlx::PgPool;
6use std::fs;
7use std::sync::Arc;
8use stormchaser_dsl::StormchaserParser;
9use stormchaser_model::auth::{EngineOpaContext, OpaClient};
10use stormchaser_model::events::WorkflowStartPendingEvent;
11use stormchaser_model::RunId;
12use stormchaser_tls::TlsReloader;
13use tracing::{debug, error, info};
14
15#[tracing::instrument(skip(pool, git_cache, opa_client, nats_client, _tls_reloader), fields(run_id = %run_id))]
16pub async fn handle_workflow_queued(
18 run_id: RunId,
19 pool: PgPool,
20 git_cache: Arc<GitCache>,
21 opa_client: Arc<OpaClient>,
22 nats_client: async_nats::Client,
23 _tls_reloader: Arc<TlsReloader>,
24) -> Result<()> {
25 info!("Handling queued workflow run: {}", run_id);
26
27 let run = fetch_run(run_id, &pool).await?;
29 let machine = WorkflowMachine::<state::Queued>::new(run);
30
31 let machine = machine.start_resolving(&mut *pool.acquire().await?).await?;
33
34 let repo_path_res = git_cache.ensure_files(
36 &machine.run.repo_url,
37 &machine.run.git_ref,
38 std::slice::from_ref(&machine.run.workflow_path),
39 );
40
41 let repo_path = match repo_path_res {
42 Ok(path) => path,
43 Err(e) => {
44 let _ = machine
45 .fail(
46 format!("Git resolution failed: {}", e),
47 &mut *pool.acquire().await?,
48 )
49 .await?;
50 return Err(e);
51 }
52 };
53
54 let storm_file_path = repo_path.join(&machine.run.workflow_path);
55 if !storm_file_path.exists() {
56 let err_msg = format!(
57 "Workflow file {} not found in repo",
58 machine.run.workflow_path
59 );
60 let _ = machine
61 .fail(err_msg.clone(), &mut *pool.acquire().await?)
62 .await?;
63 return Err(anyhow::anyhow!(err_msg));
64 }
65
66 let workflow_content = fs::read_to_string(&storm_file_path)
68 .with_context(|| format!("Failed to read workflow file at {:?}", storm_file_path))?;
69
70 debug!(
71 "Workflow content loaded for {}: {} bytes",
72 run_id,
73 workflow_content.len()
74 );
75
76 let parser = StormchaserParser::new();
77 let mut parsed_workflow = match parser.parse(&workflow_content) {
78 Ok(w) => w,
79 Err(e) => {
80 let _ = machine
81 .fail(
82 format!("Workflow parsing failed: {}", e),
83 &mut *pool.acquire().await?,
84 )
85 .await?;
86 return Err(e);
87 }
88 };
89
90 let mut resolved_includes = std::collections::HashSet::new();
92 let mut includes_to_process = parsed_workflow.includes.clone();
93 parsed_workflow.includes.clear(); while let Some(inc) = includes_to_process.pop() {
96 if !resolved_includes.insert(inc.workflow.clone()) {
97 continue; }
99
100 let inc_path = repo_path.join(&inc.workflow);
101 if !inc_path.exists() {
102 let err_msg = format!("Included workflow file {} not found", inc.workflow);
103 let _ = machine
104 .fail(err_msg.clone(), &mut *pool.acquire().await?)
105 .await?;
106 return Err(anyhow::anyhow!(err_msg));
107 }
108
109 let inc_content = fs::read_to_string(&inc_path)?;
110 let inc_workflow = match parser.parse(&inc_content) {
111 Ok(w) => w,
112 Err(e) => {
113 let err_msg = format!("Included workflow {} parsing failed: {}", inc.workflow, e);
114 let _ = machine
115 .fail(err_msg.clone(), &mut *pool.acquire().await?)
116 .await?;
117 return Err(anyhow::anyhow!(err_msg));
118 }
119 };
120
121 parsed_workflow
123 .step_libraries
124 .extend(inc_workflow.step_libraries);
125
126 let prefix = format!("{}.", inc.name);
128 for mut step in inc_workflow.steps {
129 step.name = format!("{}{}", prefix, step.name);
130
131 for next_ref in &mut step.next {
133 *next_ref = format!("{}{}", prefix, next_ref);
134 }
135
136 for (k, v) in &inc.inputs {
139 let var_pattern = format!("inputs.{}", k);
140 let val_str = v.to_string();
141
142 for param_val in step.params.values_mut() {
143 *param_val = param_val.replace(&var_pattern, &val_str);
144 }
145 }
146
147 parsed_workflow.steps.push(step);
148 }
149
150 includes_to_process.extend(inc_workflow.includes);
151 }
152
153 let inputs = fetch_inputs(run_id, &pool).await?;
156
157 let opa_context = EngineOpaContext {
158 run_id,
159 initiating_user: machine.run.initiating_user.clone(),
160 workflow_ast: serde_json::to_value(&parsed_workflow)?,
161 inputs,
162 };
163
164 match opa_client.check_context(opa_context).await {
165 Ok(true) => debug!("OPA allowed execution for run {}", run_id),
166 Ok(false) => {
167 let err_msg = "Execution denied by OPA policy".to_string();
168 info!("Run {}: {}", run_id, err_msg);
169 let _ = machine.fail(err_msg, &mut *pool.acquire().await?).await?;
170 return Ok(()); }
172 Err(e) => {
173 let err_msg = format!("OPA check failed: {}", e);
174 error!("Run {}: {}", run_id, err_msg);
175 let _ = machine.fail(err_msg, &mut *pool.acquire().await?).await?;
176 return Err(e);
177 }
178 }
179
180 crate::db::update_run_context(
182 &pool,
183 serde_json::to_value(&parsed_workflow)?,
184 Some(&workflow_content).map(|s| s.as_str()),
185 &parsed_workflow.dsl_version,
186 run_id,
187 )
188 .await
189 .with_context(|| format!("Failed to update run context for {}", run_id))?;
190
191 let machine = machine.start_pending(&mut *pool.acquire().await?).await?;
193
194 let event = WorkflowStartPendingEvent {
196 run_id,
197 event_type: "workflow_start_pending".to_string(),
198 timestamp: chrono::Utc::now(),
199 };
200 let js = async_nats::jetstream::new(nats_client);
201 stormchaser_model::nats::publish_cloudevent(
202 &js,
203 "stormchaser.v1.run.start_pending",
204 "stormchaser.v1.run.start_pending",
205 "/stormchaser",
206 serde_json::to_value(event).unwrap(),
207 Some("1.0"),
208 None,
209 )
210 .await
211 .with_context(|| format!("Failed to publish start_pending event for {}", run_id))?;
212
213 let _ = machine.start(&mut *pool.acquire().await?).await?;
215
216 info!(
217 "Successfully resolved and parsed workflow file, started run {}",
218 run_id
219 );
220
221 Ok(())
222}