1use anyhow::{Context, Result};
2use chrono::Utc;
3use serde_json::Value;
4use sqlx::PgPool;
5use std::sync::Arc;
6use std::time::Duration;
7use stormchaser_model::dsl::Step;
8use stormchaser_model::events::StepScheduledEvent;
9use stormchaser_model::storage::BackendType;
10use stormchaser_model::storage::StorageBackend;
11use stormchaser_model::RunId;
12use stormchaser_model::StepInstanceId;
13use stormchaser_tls::TlsReloader;
14
15use crate::handler::{fetch_run_context, RunContext};
16
17use stormchaser_model::dsl;
18
19pub fn find_step<'a>(steps: &'a [Step], name: &str) -> Option<&'a Step> {
21 for step in steps {
22 if step.name == name {
23 return Some(step);
24 }
25 if let Some(inner) = &step.steps {
26 if let Some(found) = find_step(inner, name) {
27 return Some(found);
28 }
29 }
30 }
31 None
32}
33
34async fn apply_intrinsic_mutations(
35 run_id: RunId,
36 step_type: &mut String,
37 resolved_spec: &mut Value,
38) -> Result<()> {
39 super::intrinsic::git_checkout::mutate(step_type, resolved_spec);
40 super::intrinsic::jq::mutate_if_has_files(step_type, resolved_spec);
41 super::intrinsic::terraform::mutate_if_terraform(run_id.into_inner(), step_type, resolved_spec)
42 .await?;
43 super::intrinsic::terraform::mutate_if_terraform_approval(step_type, resolved_spec);
44 Ok(())
45}
46
47async fn resolve_storage_provision(
48 run_id: RunId,
49 pool: &PgPool,
50 storage: &dsl::Storage,
51 run_context: &RunContext,
52) -> Result<Vec<dsl::Provision>> {
53 let mut provision_data = Vec::new();
54 for prov in &storage.provision {
55 let mut prov_clone = prov.clone();
56 if prov_clone.resource_type == "artifact" {
57 let (backend_id, remote_path) =
58 crate::db::get_artifact_by_name(pool, run_id.into_inner(), &prov_clone.name)
59 .await?
60 .with_context(|| {
61 format!(
62 "Artifact '{}' not found for run {}",
63 prov_clone.name, run_id
64 )
65 })?;
66
67 let backend_info: StorageBackend =
68 crate::db::get_storage_backend_by_id(pool, backend_id)
69 .await?
70 .with_context(|| {
71 format!(
72 "Storage backend {} not found for artifact '{}'",
73 backend_id, prov_clone.name
74 )
75 })?;
76
77 if backend_info.backend_type != BackendType::S3 {
78 anyhow::bail!(
79 "Artifact '{}' requires an S3 backend for provisioning; backend '{}' is not S3",
80 prov_clone.name,
81 backend_info.name
82 );
83 }
84
85 let bucket = backend_info
86 .config
87 .get("bucket")
88 .and_then(|b| b.as_str())
89 .with_context(|| {
90 format!(
91 "Missing 'bucket' in config for backend '{}' (artifact '{}')",
92 backend_info.name, prov_clone.name
93 )
94 })?;
95
96 let client = crate::s3::get_s3_client(&backend_info).await?;
97 let expires = std::time::Duration::from_secs(3600);
98 prov_clone.url = Some(
99 crate::s3::generate_presigned_url(&client, bucket, &remote_path, false, expires)
100 .await?,
101 );
102 } else if let Some(url) = &prov_clone.url {
103 let mut val = Value::String(url.clone());
104 let hcl_ctx = crate::hcl_eval::create_context(
105 run_context.inputs.clone(),
106 run_id,
107 run_context.secrets.clone(),
108 );
109 if crate::hcl_eval::resolve_expressions(&mut val, &hcl_ctx).is_ok() {
110 prov_clone.url = match val {
111 Value::String(s) => Some(s),
112 other => Some(other.to_string()),
113 };
114 }
115 }
116 provision_data.push(prov_clone);
117 }
118 Ok(provision_data)
119}
120
121async fn setup_storage_urls(
122 run_id: RunId,
123 pool: &PgPool,
124 workflow: &dsl::Workflow,
125 resolved_spec: &Value,
126 run_context: &RunContext,
127) -> Result<serde_json::Map<String, Value>> {
128 let mut storage_urls = serde_json::Map::new();
129
130 let mut mounted_storage_names = std::collections::HashSet::new();
131 if let Some(mounts) = resolved_spec
132 .get("storage_mounts")
133 .and_then(|m| m.as_array())
134 {
135 for mount in mounts {
136 if let Some(name) = mount.get("name").and_then(|n| n.as_str()) {
137 mounted_storage_names.insert(name.to_string());
138 }
139 }
140 }
141
142 if workflow.storage.is_empty() {
143 return Ok(storage_urls);
144 }
145
146 for storage in &workflow.storage {
147 if !mounted_storage_names.contains(&storage.name) {
148 continue;
149 }
150
151 let backend: Option<StorageBackend> = if let Some(ref backend_name) = storage.backend {
152 crate::db::get_storage_backend_by_name(pool, backend_name).await?
153 } else {
154 crate::db::get_default_sfs_backend(pool).await?
155 };
156
157 if let Some(backend) = backend {
158 let mut get_url = None;
159 let mut put_url = None;
160
161 if backend.backend_type == BackendType::S3 {
162 let client = crate::s3::get_s3_client(&backend).await?;
163 let bucket = backend.config["bucket"]
164 .as_str()
165 .context("Missing bucket in SFS backend config")?;
166
167 let key = format!("{}/{}.tar.gz", run_id, storage.name);
168 let expires = Duration::from_secs(3600);
169
170 get_url = Some(
171 crate::s3::generate_presigned_url(&client, bucket, &key, false, expires)
172 .await?,
173 );
174 put_url = Some(
175 crate::s3::generate_presigned_url(&client, bucket, &key, true, expires).await?,
176 );
177 }
178
179 let last_hash: Option<(String,)> =
180 crate::db::get_run_storage_last_hash(pool, run_id.into_inner(), &storage.name)
181 .await?;
182
183 let mut artifacts_data = serde_json::Map::new();
184 for artifact in &storage.artifacts {
185 let instructions = crate::artifact::generate_parking_instructions(
186 &backend,
187 run_id.into_inner(),
188 &storage.name,
189 artifact,
190 )
191 .await?;
192 artifacts_data.insert(artifact.name.clone(), instructions);
193 }
194
195 let provision_data =
196 resolve_storage_provision(run_id, pool, storage, run_context).await?;
197
198 let mut preserve = storage.preserve.clone();
199 if let Some(mounts) = resolved_spec
200 .get("storage_mounts")
201 .and_then(|m| m.as_array())
202 {
203 for mount in mounts {
204 if let Some(name) = mount.get("name").and_then(|n| n.as_str()) {
205 if name == storage.name {
206 if let Some(p) = mount.get("preserve").and_then(|p| p.as_array()) {
207 preserve = p
208 .iter()
209 .filter_map(|v| v.as_str().map(|s| s.to_string()))
210 .collect();
211 }
212 }
213 }
214 }
215 }
216
217 storage_urls.insert(
218 storage.name.clone(),
219 serde_json::json!({
220 "get_url": get_url,
221 "put_url": put_url,
222 "expected_hash": last_hash.map(|h| h.0),
223 "artifacts": artifacts_data,
224 "provision": provision_data,
225 "preserve": preserve,
226 }),
227 );
228 }
229 }
230
231 Ok(storage_urls)
232}
233
234#[allow(clippy::too_many_arguments)]
235async fn try_dispatch_intrinsic(
236 run_id: RunId,
237 step_instance_id: StepInstanceId,
238 step_type: &str,
239 resolved_spec: &Value,
240 resolved_params: &Value,
241 pool: PgPool,
242 nats_client: async_nats::Client,
243 tls_reloader: Arc<TlsReloader>,
244) -> Result<bool> {
245 if super::intrinsic::wasm::try_dispatch(
246 run_id,
247 step_instance_id,
248 step_type,
249 resolved_spec,
250 resolved_params,
251 pool.clone(),
252 nats_client.clone(),
253 tls_reloader.clone(),
254 )
255 .await?
256 {
257 return Ok(true);
258 }
259 if super::intrinsic::lambda::try_dispatch(
260 run_id,
261 step_instance_id,
262 step_type,
263 resolved_spec,
264 pool.clone(),
265 nats_client.clone(),
266 tls_reloader.clone(),
267 )
268 .await?
269 {
270 return Ok(true);
271 }
272 if super::intrinsic::webhook::try_dispatch(
273 run_id,
274 step_instance_id,
275 step_type,
276 resolved_spec,
277 pool.clone(),
278 nats_client.clone(),
279 tls_reloader.clone(),
280 )
281 .await?
282 {
283 return Ok(true);
284 }
285 if super::intrinsic::jinja::try_dispatch(
286 run_id,
287 step_instance_id,
288 step_type,
289 resolved_spec,
290 pool.clone(),
291 nats_client.clone(),
292 tls_reloader.clone(),
293 )
294 .await?
295 {
296 return Ok(true);
297 }
298 if super::intrinsic::email::try_dispatch(
299 run_id,
300 step_instance_id,
301 step_type,
302 resolved_spec,
303 pool.clone(),
304 nats_client.clone(),
305 tls_reloader.clone(),
306 )
307 .await?
308 {
309 return Ok(true);
310 }
311 if super::intrinsic::test_report_email::try_dispatch(
312 run_id,
313 step_instance_id,
314 step_type,
315 resolved_spec,
316 pool.clone(),
317 nats_client.clone(),
318 tls_reloader.clone(),
319 )
320 .await?
321 {
322 return Ok(true);
323 }
324 if super::intrinsic::jq::try_dispatch(
325 run_id,
326 step_instance_id,
327 step_type,
328 resolved_spec,
329 pool.clone(),
330 nats_client.clone(),
331 tls_reloader.clone(),
332 )
333 .await?
334 {
335 return Ok(true);
336 }
337 Ok(false)
338}
339
340async fn setup_test_report_urls(
341 run_id: RunId,
342 step_instance_id: StepInstanceId,
343 step_name: &str,
344 pool: &PgPool,
345 workflow: &dsl::Workflow,
346) -> Result<serde_json::Map<String, Value>> {
347 let mut test_report_urls = serde_json::Map::new();
348 if let Some(step) = find_step(&workflow.steps, step_name) {
349 if !step.reports.is_empty() {
350 let backend = crate::db::get_default_sfs_backend(pool)
351 .await?
352 .context("Default SFS backend required for test reports")?;
353 let client = crate::s3::get_s3_client(&backend).await?;
354 let bucket = backend.config["bucket"]
355 .as_str()
356 .context("Missing bucket")?;
357
358 for report in &step.reports {
359 let report_key = format!(
360 "test-reports/{}/{}/{}.tar.gz",
361 run_id, step_instance_id, report.name
362 );
363 let expires = Duration::from_secs(3600);
364 let put_url =
365 crate::s3::generate_presigned_url(&client, bucket, &report_key, true, expires)
366 .await?;
367
368 test_report_urls.insert(
369 report.name.clone(),
370 serde_json::json!({
371 "put_url": put_url,
372 "remote_path": report_key,
373 "backend_id": backend.id,
374 }),
375 );
376 }
377 }
378 }
379 Ok(test_report_urls)
380}
381
382#[allow(clippy::too_many_arguments)]
384pub async fn dispatch_step_instance(
385 run_id: RunId,
386 step_instance_id: StepInstanceId,
387 step_name: &str,
388 step_type: &str,
389 resolved_spec: &Value,
390 resolved_params: &Value,
391 nats_client: async_nats::Client,
392 pool: PgPool,
393 tls_reloader: Arc<TlsReloader>,
394) -> Result<()> {
395 let mut step_type = step_type.to_string();
396 let mut resolved_spec = resolved_spec.clone();
397
398 apply_intrinsic_mutations(run_id, &mut step_type, &mut resolved_spec).await?;
399
400 let run_context = fetch_run_context(run_id, &pool).await?;
401 let workflow: dsl::Workflow = serde_json::from_value(run_context.workflow_definition.clone())
402 .context("Failed to parse workflow definition from context")?;
403
404 let storage_urls =
405 setup_storage_urls(run_id, &pool, &workflow, &resolved_spec, &run_context).await?;
406
407 if try_dispatch_intrinsic(
408 run_id,
409 step_instance_id,
410 &step_type,
411 &resolved_spec,
412 resolved_params,
413 pool.clone(),
414 nats_client.clone(),
415 tls_reloader.clone(),
416 )
417 .await?
418 {
419 return Ok(());
420 }
421
422 let mut dsl_step_val = Value::Null;
423 if let Some(found_step) = find_step(&workflow.steps, step_name) {
424 dsl_step_val = serde_json::to_value(found_step).unwrap_or(Value::Null);
425 }
426
427 let test_report_urls =
428 setup_test_report_urls(run_id, step_instance_id, step_name, &pool, &workflow).await?;
429
430 let payload = StepScheduledEvent {
431 run_id,
432 step_id: step_instance_id,
433 step_name: Some(step_name.to_string()),
434 step_type: Some(step_type.clone()),
435 spec: Some(resolved_spec),
436 params: Some(resolved_params.clone()),
437 storage: Some(storage_urls.into_iter().collect()),
438 test_report_urls: Some(test_report_urls.into_iter().collect()),
439 timestamp: Utc::now(),
440 event_type: "stormchaser.v1.step.scheduled".to_string(),
441 step_dsl: dsl_step_val,
442 };
443
444 let js = async_nats::jetstream::new(nats_client);
445 let subject = format!("stormchaser.v1.step.scheduled.{}", step_type.to_lowercase());
446 stormchaser_model::nats::publish_cloudevent(
447 &js,
448 &subject,
449 &subject,
450 "/stormchaser",
451 serde_json::to_value(payload).unwrap(),
452 Some("1.0"),
453 None,
454 )
455 .await?;
456
457 Ok(())
458}