stormchaser_engine/handler/workflow/
queued.rs1use crate::git_cache::GitCache;
2use crate::handler::{fetch_inputs, fetch_run};
3use crate::workflow_machine::{state, WorkflowMachine};
4use anyhow::{Context, Result};
5use sqlx::PgPool;
6use std::fs;
7use std::sync::Arc;
8use stormchaser_dsl::StormchaserParser;
9use stormchaser_model::auth::{EngineOpaContext, OpaClient};
10use stormchaser_model::events::WorkflowStartPendingEvent;
11use stormchaser_model::events::{EventSource, EventType, SchemaVersion, WorkflowEventType};
12use stormchaser_model::RunId;
13use stormchaser_tls::TlsReloader;
14use tracing::{debug, error, info};
15
16#[tracing::instrument(skip(pool, git_cache, opa_client, nats_client, _tls_reloader), fields(run_id = %run_id))]
17pub async fn handle_workflow_queued(
19 run_id: RunId,
20 pool: PgPool,
21 git_cache: Arc<GitCache>,
22 opa_client: Arc<OpaClient>,
23 nats_client: async_nats::Client,
24 _tls_reloader: Arc<TlsReloader>,
25) -> Result<()> {
26 info!("Handling queued workflow run: {}", run_id);
27
28 let run = fetch_run(run_id, &pool).await?;
30 let machine = WorkflowMachine::<state::Queued>::new(run);
31
32 let machine = machine.start_resolving(&mut *pool.acquire().await?).await?;
34
35 let repo_path_res = git_cache.ensure_files(
37 &machine.run.repo_url,
38 &machine.run.git_ref,
39 std::slice::from_ref(&machine.run.workflow_path),
40 );
41
42 let repo_path = match repo_path_res {
43 Ok(path) => path,
44 Err(e) => {
45 let _ = machine
46 .fail(
47 format!("Git resolution failed: {}", e),
48 &mut *pool.acquire().await?,
49 )
50 .await?;
51 return Err(e);
52 }
53 };
54
55 let storm_file_path = repo_path.join(&machine.run.workflow_path);
56 if !storm_file_path.exists() {
57 let err_msg = format!(
58 "Workflow file {} not found in repo",
59 machine.run.workflow_path
60 );
61 let _ = machine
62 .fail(err_msg.clone(), &mut *pool.acquire().await?)
63 .await?;
64 return Err(anyhow::anyhow!(err_msg));
65 }
66
67 let workflow_content = fs::read_to_string(&storm_file_path)
69 .with_context(|| format!("Failed to read workflow file at {:?}", storm_file_path))?;
70
71 debug!(
72 "Workflow content loaded for {}: {} bytes",
73 run_id,
74 workflow_content.len()
75 );
76
77 let parser = StormchaserParser::new();
78 let mut parsed_workflow = match parser.parse(&workflow_content) {
79 Ok(w) => w,
80 Err(e) => {
81 let _ = machine
82 .fail(
83 format!("Workflow parsing failed: {}", e),
84 &mut *pool.acquire().await?,
85 )
86 .await?;
87 return Err(e);
88 }
89 };
90
91 let mut resolved_includes = std::collections::HashSet::new();
93 let mut includes_to_process = parsed_workflow.includes.clone();
94 parsed_workflow.includes.clear(); while let Some(inc) = includes_to_process.pop() {
97 if !resolved_includes.insert(inc.workflow.clone()) {
98 continue; }
100
101 let inc_path = repo_path.join(&inc.workflow);
102 if !inc_path.exists() {
103 let err_msg = format!("Included workflow file {} not found", inc.workflow);
104 let _ = machine
105 .fail(err_msg.clone(), &mut *pool.acquire().await?)
106 .await?;
107 return Err(anyhow::anyhow!(err_msg));
108 }
109
110 let inc_content = fs::read_to_string(&inc_path)?;
111 let inc_workflow = match parser.parse(&inc_content) {
112 Ok(w) => w,
113 Err(e) => {
114 let err_msg = format!("Included workflow {} parsing failed: {}", inc.workflow, e);
115 let _ = machine
116 .fail(err_msg.clone(), &mut *pool.acquire().await?)
117 .await?;
118 return Err(anyhow::anyhow!(err_msg));
119 }
120 };
121
122 parsed_workflow
124 .step_libraries
125 .extend(inc_workflow.step_libraries);
126
127 let prefix = format!("{}.", inc.name);
129 for mut step in inc_workflow.steps {
130 step.name = format!("{}{}", prefix, step.name);
131
132 for next_ref in &mut step.next {
134 *next_ref = format!("{}{}", prefix, next_ref);
135 }
136
137 for (k, v) in &inc.inputs {
140 let var_pattern = format!("inputs.{}", k);
141 let val_str = v.to_string();
142
143 for param_val in step.params.values_mut() {
144 *param_val = param_val.replace(&var_pattern, &val_str);
145 }
146 }
147
148 parsed_workflow.steps.push(step);
149 }
150
151 includes_to_process.extend(inc_workflow.includes);
152 }
153
154 let inputs = fetch_inputs(run_id, &pool).await?;
157
158 let opa_context = EngineOpaContext {
159 run_id,
160 initiating_user: machine.run.initiating_user.clone(),
161 workflow_ast: serde_json::to_value(&parsed_workflow)?,
162 inputs,
163 };
164
165 match opa_client.check_context(opa_context).await {
166 Ok(true) => debug!("OPA allowed execution for run {}", run_id),
167 Ok(false) => {
168 let err_msg = "Execution denied by OPA policy".to_string();
169 info!("Run {}: {}", run_id, err_msg);
170 let _ = machine.fail(err_msg, &mut *pool.acquire().await?).await?;
171 return Ok(()); }
173 Err(e) => {
174 let err_msg = format!("OPA check failed: {}", e);
175 error!("Run {}: {}", run_id, err_msg);
176 let _ = machine.fail(err_msg, &mut *pool.acquire().await?).await?;
177 return Err(e);
178 }
179 }
180
181 crate::db::update_run_context(
183 &pool,
184 serde_json::to_value(&parsed_workflow)?,
185 Some(&workflow_content).map(|s| s.as_str()),
186 &parsed_workflow.dsl_version,
187 run_id,
188 )
189 .await
190 .with_context(|| format!("Failed to update run context for {}", run_id))?;
191
192 let machine = machine.start_pending(&mut *pool.acquire().await?).await?;
194
195 let event = WorkflowStartPendingEvent {
197 run_id,
198 event_type: EventType::Workflow(WorkflowEventType::StartPending),
199 timestamp: chrono::Utc::now(),
200 };
201 let js = async_nats::jetstream::new(nats_client);
202 use stormchaser_model::nats::NatsSubject;
203 stormchaser_model::nats::publish_cloudevent(
204 &js,
205 NatsSubject::RunStartPending,
206 EventType::Workflow(WorkflowEventType::StartPending),
207 EventSource::System,
208 serde_json::to_value(event).unwrap(),
209 Some(SchemaVersion::new("1.0".to_string())),
210 None,
211 )
212 .await
213 .with_context(|| format!("Failed to publish start_pending event for {}", run_id))?;
214
215 let _ = machine.start(&mut *pool.acquire().await?).await?;
217
218 info!(
219 "Successfully resolved and parsed workflow file, started run {}",
220 run_id
221 );
222
223 Ok(())
224}