1use anyhow::{Context, Result};
2use chrono::Utc;
3use serde_json::Value;
4use sqlx::PgPool;
5use std::sync::Arc;
6use std::time::Duration;
7use stormchaser_model::dsl::Step;
8use stormchaser_model::events::{
9 EventSource, EventType, SchemaVersion, StepEventType, StepScheduledEvent,
10};
11use stormchaser_model::nats::publish_cloudevent;
12use stormchaser_model::storage::BackendType;
13use stormchaser_model::storage::StorageBackend;
14use stormchaser_model::RunId;
15use stormchaser_model::StepInstanceId;
16use stormchaser_tls::TlsReloader;
17
18use crate::handler::{fetch_run_context, RunContext};
19
20use stormchaser_model::dsl;
21
22pub fn find_step<'a>(steps: &'a [Step], name: &str) -> Option<&'a Step> {
24 for step in steps {
25 if step.name == name {
26 return Some(step);
27 }
28 if let Some(inner) = &step.steps {
29 if let Some(found) = find_step(inner, name) {
30 return Some(found);
31 }
32 }
33 }
34 None
35}
36
37async fn apply_intrinsic_mutations(
38 run_id: RunId,
39 step_type: &mut String,
40 resolved_spec: &mut Value,
41) -> Result<()> {
42 super::intrinsic::git_checkout::mutate(step_type, resolved_spec);
43 super::intrinsic::jq::mutate_if_has_files(step_type, resolved_spec);
44 super::intrinsic::terraform::mutate_if_terraform(run_id.into_inner(), step_type, resolved_spec)
45 .await?;
46 super::intrinsic::terraform::mutate_if_terraform_approval(step_type, resolved_spec);
47 Ok(())
48}
49
50async fn resolve_storage_provision(
51 run_id: RunId,
52 pool: &PgPool,
53 storage: &dsl::Storage,
54 run_context: &RunContext,
55) -> Result<Vec<dsl::Provision>> {
56 let mut provision_data = Vec::new();
57 for prov in &storage.provision {
58 let mut prov_clone = prov.clone();
59 if prov_clone.resource_type == "artifact" {
60 let (backend_id, remote_path) =
61 crate::db::get_artifact_by_name(pool, run_id.into_inner(), &prov_clone.name)
62 .await?
63 .with_context(|| {
64 format!(
65 "Artifact '{}' not found for run {}",
66 prov_clone.name, run_id
67 )
68 })?;
69
70 let backend_info: StorageBackend =
71 crate::db::get_storage_backend_by_id(pool, backend_id)
72 .await?
73 .with_context(|| {
74 format!(
75 "Storage backend {} not found for artifact '{}'",
76 backend_id, prov_clone.name
77 )
78 })?;
79
80 if backend_info.backend_type != BackendType::S3 {
81 anyhow::bail!(
82 "Artifact '{}' requires an S3 backend for provisioning; backend '{}' is not S3",
83 prov_clone.name,
84 backend_info.name
85 );
86 }
87
88 let bucket = backend_info
89 .config
90 .get("bucket")
91 .and_then(|b| b.as_str())
92 .with_context(|| {
93 format!(
94 "Missing 'bucket' in config for backend '{}' (artifact '{}')",
95 backend_info.name, prov_clone.name
96 )
97 })?;
98
99 let client = crate::s3::get_s3_client(&backend_info).await?;
100 let expires = std::time::Duration::from_secs(3600);
101 prov_clone.url = Some(
102 crate::s3::generate_presigned_url(&client, bucket, &remote_path, false, expires)
103 .await?,
104 );
105 } else if let Some(url) = &prov_clone.url {
106 let mut val = Value::String(url.clone());
107 let hcl_ctx = crate::hcl_eval::create_context(
108 run_context.inputs.clone(),
109 run_id,
110 run_context.secrets.clone(),
111 );
112 if crate::hcl_eval::resolve_expressions(&mut val, &hcl_ctx).is_ok() {
113 prov_clone.url = match val {
114 Value::String(s) => Some(s),
115 other => Some(other.to_string()),
116 };
117 }
118 }
119 provision_data.push(prov_clone);
120 }
121 Ok(provision_data)
122}
123
124async fn setup_storage_urls(
125 run_id: RunId,
126 pool: &PgPool,
127 workflow: &dsl::Workflow,
128 resolved_spec: &Value,
129 run_context: &RunContext,
130) -> Result<serde_json::Map<String, Value>> {
131 let mut storage_urls = serde_json::Map::new();
132
133 let mut mounted_storage_names = std::collections::HashSet::new();
134 if let Some(mounts) = resolved_spec
135 .get("storage_mounts")
136 .and_then(|m| m.as_array())
137 {
138 for mount in mounts {
139 if let Some(name) = mount.get("name").and_then(|n| n.as_str()) {
140 mounted_storage_names.insert(name.to_string());
141 }
142 }
143 }
144
145 if workflow.storage.is_empty() {
146 return Ok(storage_urls);
147 }
148
149 for storage in &workflow.storage {
150 if !mounted_storage_names.contains(&storage.name) {
151 continue;
152 }
153
154 let backend: Option<StorageBackend> = if let Some(ref backend_name) = storage.backend {
155 crate::db::get_storage_backend_by_name(pool, backend_name).await?
156 } else {
157 crate::db::get_default_sfs_backend(pool).await?
158 };
159
160 if let Some(backend) = backend {
161 let mut get_url = None;
162 let mut put_url = None;
163
164 if backend.backend_type == BackendType::S3 {
165 let client = crate::s3::get_s3_client(&backend).await?;
166 let bucket = backend.config["bucket"]
167 .as_str()
168 .context("Missing bucket in SFS backend config")?;
169
170 let key = format!("{}/{}.tar.gz", run_id, storage.name);
171 let expires = Duration::from_secs(3600);
172
173 get_url = Some(
174 crate::s3::generate_presigned_url(&client, bucket, &key, false, expires)
175 .await?,
176 );
177 put_url = Some(
178 crate::s3::generate_presigned_url(&client, bucket, &key, true, expires).await?,
179 );
180 }
181
182 let last_hash: Option<(String,)> =
183 crate::db::get_run_storage_last_hash(pool, run_id.into_inner(), &storage.name)
184 .await?;
185
186 let mut artifacts_data = serde_json::Map::new();
187 for artifact in &storage.artifacts {
188 let instructions = crate::artifact::generate_parking_instructions(
189 &backend,
190 run_id.into_inner(),
191 &storage.name,
192 artifact,
193 )
194 .await?;
195 artifacts_data.insert(artifact.name.clone(), instructions);
196 }
197
198 let provision_data =
199 resolve_storage_provision(run_id, pool, storage, run_context).await?;
200
201 let mut preserve = storage.preserve.clone();
202 if let Some(mounts) = resolved_spec
203 .get("storage_mounts")
204 .and_then(|m| m.as_array())
205 {
206 for mount in mounts {
207 if let Some(name) = mount.get("name").and_then(|n| n.as_str()) {
208 if name == storage.name {
209 if let Some(p) = mount.get("preserve").and_then(|p| p.as_array()) {
210 preserve = p
211 .iter()
212 .filter_map(|v| v.as_str().map(|s| s.to_string()))
213 .collect();
214 }
215 }
216 }
217 }
218 }
219
220 storage_urls.insert(
221 storage.name.clone(),
222 serde_json::json!({
223 "get_url": get_url,
224 "put_url": put_url,
225 "expected_hash": last_hash.map(|h| h.0),
226 "artifacts": artifacts_data,
227 "provision": provision_data,
228 "preserve": preserve,
229 }),
230 );
231 }
232 }
233
234 Ok(storage_urls)
235}
236
237#[allow(clippy::too_many_arguments)]
238async fn try_dispatch_intrinsic(
239 run_id: RunId,
240 step_instance_id: StepInstanceId,
241 step_type: &str,
242 resolved_spec: &Value,
243 resolved_params: &Value,
244 pool: PgPool,
245 nats_client: async_nats::Client,
246 tls_reloader: Arc<TlsReloader>,
247) -> Result<bool> {
248 if super::intrinsic::wasm::try_dispatch(
249 run_id,
250 step_instance_id,
251 step_type,
252 resolved_spec,
253 resolved_params,
254 pool.clone(),
255 nats_client.clone(),
256 tls_reloader.clone(),
257 )
258 .await?
259 {
260 return Ok(true);
261 }
262 if super::intrinsic::lambda::try_dispatch(
263 run_id,
264 step_instance_id,
265 step_type,
266 resolved_spec,
267 pool.clone(),
268 nats_client.clone(),
269 tls_reloader.clone(),
270 )
271 .await?
272 {
273 return Ok(true);
274 }
275 if super::intrinsic::webhook::try_dispatch(
276 run_id,
277 step_instance_id,
278 step_type,
279 resolved_spec,
280 pool.clone(),
281 nats_client.clone(),
282 tls_reloader.clone(),
283 )
284 .await?
285 {
286 return Ok(true);
287 }
288 if super::intrinsic::jinja::try_dispatch(
289 run_id,
290 step_instance_id,
291 step_type,
292 resolved_spec,
293 pool.clone(),
294 nats_client.clone(),
295 tls_reloader.clone(),
296 )
297 .await?
298 {
299 return Ok(true);
300 }
301 if super::intrinsic::email::try_dispatch(
302 run_id,
303 step_instance_id,
304 step_type,
305 resolved_spec,
306 pool.clone(),
307 nats_client.clone(),
308 tls_reloader.clone(),
309 )
310 .await?
311 {
312 return Ok(true);
313 }
314 if super::intrinsic::test_report_email::try_dispatch(
315 run_id,
316 step_instance_id,
317 step_type,
318 resolved_spec,
319 pool.clone(),
320 nats_client.clone(),
321 tls_reloader.clone(),
322 )
323 .await?
324 {
325 return Ok(true);
326 }
327 if super::intrinsic::jq::try_dispatch(
328 run_id,
329 step_instance_id,
330 step_type,
331 resolved_spec,
332 pool.clone(),
333 nats_client.clone(),
334 tls_reloader.clone(),
335 )
336 .await?
337 {
338 return Ok(true);
339 }
340 Ok(false)
341}
342
343async fn setup_test_report_urls(
344 run_id: RunId,
345 step_instance_id: StepInstanceId,
346 step_name: &str,
347 pool: &PgPool,
348 workflow: &dsl::Workflow,
349) -> Result<serde_json::Map<String, Value>> {
350 let mut test_report_urls = serde_json::Map::new();
351 if let Some(step) = find_step(&workflow.steps, step_name) {
352 if !step.reports.is_empty() {
353 let backend = crate::db::get_default_sfs_backend(pool)
354 .await?
355 .context("Default SFS backend required for test reports")?;
356 let client = crate::s3::get_s3_client(&backend).await?;
357 let bucket = backend.config["bucket"]
358 .as_str()
359 .context("Missing bucket")?;
360
361 for report in &step.reports {
362 let report_key = format!(
363 "test-reports/{}/{}/{}.tar.gz",
364 run_id, step_instance_id, report.name
365 );
366 let expires = Duration::from_secs(3600);
367 let put_url =
368 crate::s3::generate_presigned_url(&client, bucket, &report_key, true, expires)
369 .await?;
370
371 test_report_urls.insert(
372 report.name.clone(),
373 serde_json::json!({
374 "put_url": put_url,
375 "remote_path": report_key,
376 "backend_id": backend.id,
377 }),
378 );
379 }
380 }
381 }
382 Ok(test_report_urls)
383}
384
385#[allow(clippy::too_many_arguments)]
387pub async fn dispatch_step_instance(
388 run_id: RunId,
389 step_instance_id: StepInstanceId,
390 step_name: &str,
391 step_type: &str,
392 resolved_spec: &Value,
393 resolved_params: &Value,
394 nats_client: async_nats::Client,
395 pool: PgPool,
396 tls_reloader: Arc<TlsReloader>,
397) -> Result<()> {
398 let mut step_type = step_type.to_string();
399 let mut resolved_spec = resolved_spec.clone();
400
401 apply_intrinsic_mutations(run_id, &mut step_type, &mut resolved_spec).await?;
402
403 let run_context = fetch_run_context(run_id, &pool).await?;
404 let workflow: dsl::Workflow = serde_json::from_value(run_context.workflow_definition.clone())
405 .context("Failed to parse workflow definition from context")?;
406
407 let storage_urls =
408 setup_storage_urls(run_id, &pool, &workflow, &resolved_spec, &run_context).await?;
409
410 if try_dispatch_intrinsic(
411 run_id,
412 step_instance_id,
413 &step_type,
414 &resolved_spec,
415 resolved_params,
416 pool.clone(),
417 nats_client.clone(),
418 tls_reloader.clone(),
419 )
420 .await?
421 {
422 return Ok(());
423 }
424
425 let mut dsl_step_val = Value::Null;
426 if let Some(found_step) = find_step(&workflow.steps, step_name) {
427 dsl_step_val = serde_json::to_value(found_step).unwrap_or(Value::Null);
428 }
429
430 let test_report_urls =
431 setup_test_report_urls(run_id, step_instance_id, step_name, &pool, &workflow).await?;
432
433 let payload = StepScheduledEvent {
434 run_id,
435 step_id: step_instance_id,
436 step_name: Some(step_name.to_string()),
437 step_type: Some(step_type.clone()),
438 spec: Some(resolved_spec),
439 params: Some(resolved_params.clone()),
440 storage: Some(storage_urls.into_iter().collect()),
441 test_report_urls: Some(test_report_urls.into_iter().collect()),
442 timestamp: Utc::now(),
443 event_type: EventType::Step(StepEventType::Scheduled),
444 step_dsl: dsl_step_val,
445 };
446
447 let js = async_nats::jetstream::new(nats_client);
448 let subject = format!("stormchaser.v1.step.scheduled.{}", step_type.to_lowercase());
449 use stormchaser_model::nats::NatsSubject;
450 publish_cloudevent(
451 &js,
452 NatsSubject::Custom(subject.clone()),
453 EventType::Step(StepEventType::Scheduled),
454 EventSource::System,
455 serde_json::to_value(payload).unwrap(),
456 Some(SchemaVersion::new("1.0".to_string())),
457 None,
458 )
459 .await?;
460
461 Ok(())
462}