Skip to main content

stormchaser_engine/handler/step/
dispatch.rs

1use anyhow::{Context, Result};
2use chrono::Utc;
3use serde_json::Value;
4use sqlx::PgPool;
5use std::sync::Arc;
6use std::time::Duration;
7use stormchaser_model::dsl::Step;
8use stormchaser_model::events::StepScheduledEvent;
9use stormchaser_model::storage::BackendType;
10use stormchaser_model::storage::StorageBackend;
11use stormchaser_model::RunId;
12use stormchaser_model::StepInstanceId;
13use stormchaser_tls::TlsReloader;
14
15use crate::handler::{fetch_run_context, RunContext};
16
17use stormchaser_model::dsl;
18
19/// Recursively searches for a step by name within a list of steps.
20pub fn find_step<'a>(steps: &'a [Step], name: &str) -> Option<&'a Step> {
21    for step in steps {
22        if step.name == name {
23            return Some(step);
24        }
25        if let Some(inner) = &step.steps {
26            if let Some(found) = find_step(inner, name) {
27                return Some(found);
28            }
29        }
30    }
31    None
32}
33
34async fn apply_intrinsic_mutations(
35    run_id: RunId,
36    step_type: &mut String,
37    resolved_spec: &mut Value,
38) -> Result<()> {
39    super::intrinsic::git_checkout::mutate(step_type, resolved_spec);
40    super::intrinsic::jq::mutate_if_has_files(step_type, resolved_spec);
41    super::intrinsic::terraform::mutate_if_terraform(run_id.into_inner(), step_type, resolved_spec)
42        .await?;
43    super::intrinsic::terraform::mutate_if_terraform_approval(step_type, resolved_spec);
44    Ok(())
45}
46
47async fn resolve_storage_provision(
48    run_id: RunId,
49    pool: &PgPool,
50    storage: &dsl::Storage,
51    run_context: &RunContext,
52) -> Result<Vec<dsl::Provision>> {
53    let mut provision_data = Vec::new();
54    for prov in &storage.provision {
55        let mut prov_clone = prov.clone();
56        if prov_clone.resource_type == "artifact" {
57            let (backend_id, remote_path) =
58                crate::db::get_artifact_by_name(pool, run_id.into_inner(), &prov_clone.name)
59                    .await?
60                    .with_context(|| {
61                        format!(
62                            "Artifact '{}' not found for run {}",
63                            prov_clone.name, run_id
64                        )
65                    })?;
66
67            let backend_info: StorageBackend =
68                crate::db::get_storage_backend_by_id(pool, backend_id)
69                    .await?
70                    .with_context(|| {
71                        format!(
72                            "Storage backend {} not found for artifact '{}'",
73                            backend_id, prov_clone.name
74                        )
75                    })?;
76
77            if backend_info.backend_type != BackendType::S3 {
78                anyhow::bail!(
79                    "Artifact '{}' requires an S3 backend for provisioning; backend '{}' is not S3",
80                    prov_clone.name,
81                    backend_info.name
82                );
83            }
84
85            let bucket = backend_info
86                .config
87                .get("bucket")
88                .and_then(|b| b.as_str())
89                .with_context(|| {
90                    format!(
91                        "Missing 'bucket' in config for backend '{}' (artifact '{}')",
92                        backend_info.name, prov_clone.name
93                    )
94                })?;
95
96            let client = crate::s3::get_s3_client(&backend_info).await?;
97            let expires = std::time::Duration::from_secs(3600);
98            prov_clone.url = Some(
99                crate::s3::generate_presigned_url(&client, bucket, &remote_path, false, expires)
100                    .await?,
101            );
102        } else if let Some(url) = &prov_clone.url {
103            let mut val = Value::String(url.clone());
104            let hcl_ctx = crate::hcl_eval::create_context(
105                run_context.inputs.clone(),
106                run_id,
107                run_context.secrets.clone(),
108            );
109            if crate::hcl_eval::resolve_expressions(&mut val, &hcl_ctx).is_ok() {
110                prov_clone.url = match val {
111                    Value::String(s) => Some(s),
112                    other => Some(other.to_string()),
113                };
114            }
115        }
116        provision_data.push(prov_clone);
117    }
118    Ok(provision_data)
119}
120
121async fn setup_storage_urls(
122    run_id: RunId,
123    pool: &PgPool,
124    workflow: &dsl::Workflow,
125    resolved_spec: &Value,
126    run_context: &RunContext,
127) -> Result<serde_json::Map<String, Value>> {
128    let mut storage_urls = serde_json::Map::new();
129
130    let mut mounted_storage_names = std::collections::HashSet::new();
131    if let Some(mounts) = resolved_spec
132        .get("storage_mounts")
133        .and_then(|m| m.as_array())
134    {
135        for mount in mounts {
136            if let Some(name) = mount.get("name").and_then(|n| n.as_str()) {
137                mounted_storage_names.insert(name.to_string());
138            }
139        }
140    }
141
142    if workflow.storage.is_empty() {
143        return Ok(storage_urls);
144    }
145
146    for storage in &workflow.storage {
147        if !mounted_storage_names.contains(&storage.name) {
148            continue;
149        }
150
151        let backend: Option<StorageBackend> = if let Some(ref backend_name) = storage.backend {
152            crate::db::get_storage_backend_by_name(pool, backend_name).await?
153        } else {
154            crate::db::get_default_sfs_backend(pool).await?
155        };
156
157        if let Some(backend) = backend {
158            let mut get_url = None;
159            let mut put_url = None;
160
161            if backend.backend_type == BackendType::S3 {
162                let client = crate::s3::get_s3_client(&backend).await?;
163                let bucket = backend.config["bucket"]
164                    .as_str()
165                    .context("Missing bucket in SFS backend config")?;
166
167                let key = format!("{}/{}.tar.gz", run_id, storage.name);
168                let expires = Duration::from_secs(3600);
169
170                get_url = Some(
171                    crate::s3::generate_presigned_url(&client, bucket, &key, false, expires)
172                        .await?,
173                );
174                put_url = Some(
175                    crate::s3::generate_presigned_url(&client, bucket, &key, true, expires).await?,
176                );
177            }
178
179            let last_hash: Option<(String,)> =
180                crate::db::get_run_storage_last_hash(pool, run_id.into_inner(), &storage.name)
181                    .await?;
182
183            let mut artifacts_data = serde_json::Map::new();
184            for artifact in &storage.artifacts {
185                let instructions = crate::artifact::generate_parking_instructions(
186                    &backend,
187                    run_id.into_inner(),
188                    &storage.name,
189                    artifact,
190                )
191                .await?;
192                artifacts_data.insert(artifact.name.clone(), instructions);
193            }
194
195            let provision_data =
196                resolve_storage_provision(run_id, pool, storage, run_context).await?;
197
198            let mut preserve = storage.preserve.clone();
199            if let Some(mounts) = resolved_spec
200                .get("storage_mounts")
201                .and_then(|m| m.as_array())
202            {
203                for mount in mounts {
204                    if let Some(name) = mount.get("name").and_then(|n| n.as_str()) {
205                        if name == storage.name {
206                            if let Some(p) = mount.get("preserve").and_then(|p| p.as_array()) {
207                                preserve = p
208                                    .iter()
209                                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
210                                    .collect();
211                            }
212                        }
213                    }
214                }
215            }
216
217            storage_urls.insert(
218                storage.name.clone(),
219                serde_json::json!({
220                    "get_url": get_url,
221                    "put_url": put_url,
222                    "expected_hash": last_hash.map(|h| h.0),
223                    "artifacts": artifacts_data,
224                    "provision": provision_data,
225                    "preserve": preserve,
226                }),
227            );
228        }
229    }
230
231    Ok(storage_urls)
232}
233
234#[allow(clippy::too_many_arguments)]
235async fn try_dispatch_intrinsic(
236    run_id: RunId,
237    step_instance_id: StepInstanceId,
238    step_type: &str,
239    resolved_spec: &Value,
240    resolved_params: &Value,
241    pool: PgPool,
242    nats_client: async_nats::Client,
243    tls_reloader: Arc<TlsReloader>,
244) -> Result<bool> {
245    if super::intrinsic::wasm::try_dispatch(
246        run_id,
247        step_instance_id,
248        step_type,
249        resolved_spec,
250        resolved_params,
251        pool.clone(),
252        nats_client.clone(),
253        tls_reloader.clone(),
254    )
255    .await?
256    {
257        return Ok(true);
258    }
259    if super::intrinsic::lambda::try_dispatch(
260        run_id,
261        step_instance_id,
262        step_type,
263        resolved_spec,
264        pool.clone(),
265        nats_client.clone(),
266        tls_reloader.clone(),
267    )
268    .await?
269    {
270        return Ok(true);
271    }
272    if super::intrinsic::webhook::try_dispatch(
273        run_id,
274        step_instance_id,
275        step_type,
276        resolved_spec,
277        pool.clone(),
278        nats_client.clone(),
279        tls_reloader.clone(),
280    )
281    .await?
282    {
283        return Ok(true);
284    }
285    if super::intrinsic::jinja::try_dispatch(
286        run_id,
287        step_instance_id,
288        step_type,
289        resolved_spec,
290        pool.clone(),
291        nats_client.clone(),
292        tls_reloader.clone(),
293    )
294    .await?
295    {
296        return Ok(true);
297    }
298    if super::intrinsic::email::try_dispatch(
299        run_id,
300        step_instance_id,
301        step_type,
302        resolved_spec,
303        pool.clone(),
304        nats_client.clone(),
305        tls_reloader.clone(),
306    )
307    .await?
308    {
309        return Ok(true);
310    }
311    if super::intrinsic::test_report_email::try_dispatch(
312        run_id,
313        step_instance_id,
314        step_type,
315        resolved_spec,
316        pool.clone(),
317        nats_client.clone(),
318        tls_reloader.clone(),
319    )
320    .await?
321    {
322        return Ok(true);
323    }
324    if super::intrinsic::jq::try_dispatch(
325        run_id,
326        step_instance_id,
327        step_type,
328        resolved_spec,
329        pool.clone(),
330        nats_client.clone(),
331        tls_reloader.clone(),
332    )
333    .await?
334    {
335        return Ok(true);
336    }
337    Ok(false)
338}
339
340async fn setup_test_report_urls(
341    run_id: RunId,
342    step_instance_id: StepInstanceId,
343    step_name: &str,
344    pool: &PgPool,
345    workflow: &dsl::Workflow,
346) -> Result<serde_json::Map<String, Value>> {
347    let mut test_report_urls = serde_json::Map::new();
348    if let Some(step) = find_step(&workflow.steps, step_name) {
349        if !step.reports.is_empty() {
350            let backend = crate::db::get_default_sfs_backend(pool)
351                .await?
352                .context("Default SFS backend required for test reports")?;
353            let client = crate::s3::get_s3_client(&backend).await?;
354            let bucket = backend.config["bucket"]
355                .as_str()
356                .context("Missing bucket")?;
357
358            for report in &step.reports {
359                let report_key = format!(
360                    "test-reports/{}/{}/{}.tar.gz",
361                    run_id, step_instance_id, report.name
362                );
363                let expires = Duration::from_secs(3600);
364                let put_url =
365                    crate::s3::generate_presigned_url(&client, bucket, &report_key, true, expires)
366                        .await?;
367
368                test_report_urls.insert(
369                    report.name.clone(),
370                    serde_json::json!({
371                        "put_url": put_url,
372                        "remote_path": report_key,
373                        "backend_id": backend.id,
374                    }),
375                );
376            }
377        }
378    }
379    Ok(test_report_urls)
380}
381
382/// Dispatches a step instance, handling intrinsic types or forwarding to runners via NATS.
383#[allow(clippy::too_many_arguments)]
384pub async fn dispatch_step_instance(
385    run_id: RunId,
386    step_instance_id: StepInstanceId,
387    step_name: &str,
388    step_type: &str,
389    resolved_spec: &Value,
390    resolved_params: &Value,
391    nats_client: async_nats::Client,
392    pool: PgPool,
393    tls_reloader: Arc<TlsReloader>,
394) -> Result<()> {
395    let mut step_type = step_type.to_string();
396    let mut resolved_spec = resolved_spec.clone();
397
398    apply_intrinsic_mutations(run_id, &mut step_type, &mut resolved_spec).await?;
399
400    let run_context = fetch_run_context(run_id, &pool).await?;
401    let workflow: dsl::Workflow = serde_json::from_value(run_context.workflow_definition.clone())
402        .context("Failed to parse workflow definition from context")?;
403
404    let storage_urls =
405        setup_storage_urls(run_id, &pool, &workflow, &resolved_spec, &run_context).await?;
406
407    if try_dispatch_intrinsic(
408        run_id,
409        step_instance_id,
410        &step_type,
411        &resolved_spec,
412        resolved_params,
413        pool.clone(),
414        nats_client.clone(),
415        tls_reloader.clone(),
416    )
417    .await?
418    {
419        return Ok(());
420    }
421
422    let mut dsl_step_val = Value::Null;
423    if let Some(found_step) = find_step(&workflow.steps, step_name) {
424        dsl_step_val = serde_json::to_value(found_step).unwrap_or(Value::Null);
425    }
426
427    let test_report_urls =
428        setup_test_report_urls(run_id, step_instance_id, step_name, &pool, &workflow).await?;
429
430    let payload = StepScheduledEvent {
431        run_id,
432        step_id: step_instance_id,
433        step_name: Some(step_name.to_string()),
434        step_type: Some(step_type.clone()),
435        spec: Some(resolved_spec),
436        params: Some(resolved_params.clone()),
437        storage: Some(storage_urls.into_iter().collect()),
438        test_report_urls: Some(test_report_urls.into_iter().collect()),
439        timestamp: Utc::now(),
440        event_type: "stormchaser.v1.step.scheduled".to_string(),
441        step_dsl: dsl_step_val,
442    };
443
444    let js = async_nats::jetstream::new(nats_client);
445    let subject = format!("stormchaser.v1.step.scheduled.{}", step_type.to_lowercase());
446    stormchaser_model::nats::publish_cloudevent(
447        &js,
448        &subject,
449        &subject,
450        "/stormchaser",
451        serde_json::to_value(payload).unwrap(),
452        Some("1.0"),
453        None,
454    )
455    .await?;
456
457    Ok(())
458}