Skip to main content

stormchaser_engine/handler/step/
dispatch.rs

1use anyhow::{Context, Result};
2use chrono::Utc;
3use serde_json::Value;
4use sqlx::PgPool;
5use std::sync::Arc;
6use std::time::Duration;
7use stormchaser_model::dsl::Step;
8use stormchaser_model::events::{
9    EventSource, EventType, SchemaVersion, StepEventType, StepScheduledEvent,
10};
11use stormchaser_model::nats::publish_cloudevent;
12use stormchaser_model::storage::BackendType;
13use stormchaser_model::storage::StorageBackend;
14use stormchaser_model::RunId;
15use stormchaser_model::StepInstanceId;
16use stormchaser_tls::TlsReloader;
17
18use crate::handler::{fetch_run_context, RunContext};
19
20use stormchaser_model::dsl;
21
22/// Recursively searches for a step by name within a list of steps.
23pub fn find_step<'a>(steps: &'a [Step], name: &str) -> Option<&'a Step> {
24    for step in steps {
25        if step.name == name {
26            return Some(step);
27        }
28        if let Some(inner) = &step.steps {
29            if let Some(found) = find_step(inner, name) {
30                return Some(found);
31            }
32        }
33    }
34    None
35}
36
37async fn apply_intrinsic_mutations(
38    run_id: RunId,
39    step_type: &mut String,
40    resolved_spec: &mut Value,
41) -> Result<()> {
42    super::intrinsic::git_checkout::mutate(step_type, resolved_spec);
43    super::intrinsic::jq::mutate_if_has_files(step_type, resolved_spec);
44    super::intrinsic::terraform::mutate_if_terraform(run_id.into_inner(), step_type, resolved_spec)
45        .await?;
46    super::intrinsic::terraform::mutate_if_terraform_approval(step_type, resolved_spec);
47    Ok(())
48}
49
50async fn resolve_storage_provision(
51    run_id: RunId,
52    pool: &PgPool,
53    storage: &dsl::Storage,
54    run_context: &RunContext,
55) -> Result<Vec<dsl::Provision>> {
56    let mut provision_data = Vec::new();
57    for prov in &storage.provision {
58        let mut prov_clone = prov.clone();
59        if prov_clone.resource_type == "artifact" {
60            let (backend_id, remote_path) =
61                crate::db::get_artifact_by_name(pool, run_id.into_inner(), &prov_clone.name)
62                    .await?
63                    .with_context(|| {
64                        format!(
65                            "Artifact '{}' not found for run {}",
66                            prov_clone.name, run_id
67                        )
68                    })?;
69
70            let backend_info: StorageBackend =
71                crate::db::get_storage_backend_by_id(pool, backend_id)
72                    .await?
73                    .with_context(|| {
74                        format!(
75                            "Storage backend {} not found for artifact '{}'",
76                            backend_id, prov_clone.name
77                        )
78                    })?;
79
80            if backend_info.backend_type != BackendType::S3 {
81                anyhow::bail!(
82                    "Artifact '{}' requires an S3 backend for provisioning; backend '{}' is not S3",
83                    prov_clone.name,
84                    backend_info.name
85                );
86            }
87
88            let bucket = backend_info
89                .config
90                .get("bucket")
91                .and_then(|b| b.as_str())
92                .with_context(|| {
93                    format!(
94                        "Missing 'bucket' in config for backend '{}' (artifact '{}')",
95                        backend_info.name, prov_clone.name
96                    )
97                })?;
98
99            let client = crate::s3::get_s3_client(&backend_info).await?;
100            let expires = std::time::Duration::from_secs(3600);
101            prov_clone.url = Some(
102                crate::s3::generate_presigned_url(&client, bucket, &remote_path, false, expires)
103                    .await?,
104            );
105        } else if let Some(url) = &prov_clone.url {
106            let mut val = Value::String(url.clone());
107            let hcl_ctx = crate::hcl_eval::create_context(
108                run_context.inputs.clone(),
109                run_id,
110                run_context.secrets.clone(),
111            );
112            if crate::hcl_eval::resolve_expressions(&mut val, &hcl_ctx).is_ok() {
113                prov_clone.url = match val {
114                    Value::String(s) => Some(s),
115                    other => Some(other.to_string()),
116                };
117            }
118        }
119        provision_data.push(prov_clone);
120    }
121    Ok(provision_data)
122}
123
124async fn setup_storage_urls(
125    run_id: RunId,
126    pool: &PgPool,
127    workflow: &dsl::Workflow,
128    resolved_spec: &Value,
129    run_context: &RunContext,
130) -> Result<serde_json::Map<String, Value>> {
131    let mut storage_urls = serde_json::Map::new();
132
133    let mut mounted_storage_names = std::collections::HashSet::new();
134    if let Some(mounts) = resolved_spec
135        .get("storage_mounts")
136        .and_then(|m| m.as_array())
137    {
138        for mount in mounts {
139            if let Some(name) = mount.get("name").and_then(|n| n.as_str()) {
140                mounted_storage_names.insert(name.to_string());
141            }
142        }
143    }
144
145    if workflow.storage.is_empty() {
146        return Ok(storage_urls);
147    }
148
149    for storage in &workflow.storage {
150        if !mounted_storage_names.contains(&storage.name) {
151            continue;
152        }
153
154        let backend: Option<StorageBackend> = if let Some(ref backend_name) = storage.backend {
155            crate::db::get_storage_backend_by_name(pool, backend_name).await?
156        } else {
157            crate::db::get_default_sfs_backend(pool).await?
158        };
159
160        if let Some(backend) = backend {
161            let mut get_url = None;
162            let mut put_url = None;
163
164            if backend.backend_type == BackendType::S3 {
165                let client = crate::s3::get_s3_client(&backend).await?;
166                let bucket = backend.config["bucket"]
167                    .as_str()
168                    .context("Missing bucket in SFS backend config")?;
169
170                let key = format!("{}/{}.tar.gz", run_id, storage.name);
171                let expires = Duration::from_secs(3600);
172
173                get_url = Some(
174                    crate::s3::generate_presigned_url(&client, bucket, &key, false, expires)
175                        .await?,
176                );
177                put_url = Some(
178                    crate::s3::generate_presigned_url(&client, bucket, &key, true, expires).await?,
179                );
180            }
181
182            let last_hash: Option<(String,)> =
183                crate::db::get_run_storage_last_hash(pool, run_id.into_inner(), &storage.name)
184                    .await?;
185
186            let mut artifacts_data = serde_json::Map::new();
187            for artifact in &storage.artifacts {
188                let instructions = crate::artifact::generate_parking_instructions(
189                    &backend,
190                    run_id.into_inner(),
191                    &storage.name,
192                    artifact,
193                )
194                .await?;
195                artifacts_data.insert(artifact.name.clone(), instructions);
196            }
197
198            let provision_data =
199                resolve_storage_provision(run_id, pool, storage, run_context).await?;
200
201            let mut preserve = storage.preserve.clone();
202            if let Some(mounts) = resolved_spec
203                .get("storage_mounts")
204                .and_then(|m| m.as_array())
205            {
206                for mount in mounts {
207                    if let Some(name) = mount.get("name").and_then(|n| n.as_str()) {
208                        if name == storage.name {
209                            if let Some(p) = mount.get("preserve").and_then(|p| p.as_array()) {
210                                preserve = p
211                                    .iter()
212                                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
213                                    .collect();
214                            }
215                        }
216                    }
217                }
218            }
219
220            storage_urls.insert(
221                storage.name.clone(),
222                serde_json::json!({
223                    "get_url": get_url,
224                    "put_url": put_url,
225                    "expected_hash": last_hash.map(|h| h.0),
226                    "artifacts": artifacts_data,
227                    "provision": provision_data,
228                    "preserve": preserve,
229                }),
230            );
231        }
232    }
233
234    Ok(storage_urls)
235}
236
237#[allow(clippy::too_many_arguments)]
238async fn try_dispatch_intrinsic(
239    run_id: RunId,
240    step_instance_id: StepInstanceId,
241    step_type: &str,
242    resolved_spec: &Value,
243    resolved_params: &Value,
244    pool: PgPool,
245    nats_client: async_nats::Client,
246    tls_reloader: Arc<TlsReloader>,
247) -> Result<bool> {
248    if super::intrinsic::wasm::try_dispatch(
249        run_id,
250        step_instance_id,
251        step_type,
252        resolved_spec,
253        resolved_params,
254        pool.clone(),
255        nats_client.clone(),
256        tls_reloader.clone(),
257    )
258    .await?
259    {
260        return Ok(true);
261    }
262    if super::intrinsic::lambda::try_dispatch(
263        run_id,
264        step_instance_id,
265        step_type,
266        resolved_spec,
267        pool.clone(),
268        nats_client.clone(),
269        tls_reloader.clone(),
270    )
271    .await?
272    {
273        return Ok(true);
274    }
275    if super::intrinsic::webhook::try_dispatch(
276        run_id,
277        step_instance_id,
278        step_type,
279        resolved_spec,
280        pool.clone(),
281        nats_client.clone(),
282        tls_reloader.clone(),
283    )
284    .await?
285    {
286        return Ok(true);
287    }
288    if super::intrinsic::jinja::try_dispatch(
289        run_id,
290        step_instance_id,
291        step_type,
292        resolved_spec,
293        pool.clone(),
294        nats_client.clone(),
295        tls_reloader.clone(),
296    )
297    .await?
298    {
299        return Ok(true);
300    }
301    if super::intrinsic::email::try_dispatch(
302        run_id,
303        step_instance_id,
304        step_type,
305        resolved_spec,
306        pool.clone(),
307        nats_client.clone(),
308        tls_reloader.clone(),
309    )
310    .await?
311    {
312        return Ok(true);
313    }
314    if super::intrinsic::test_report_email::try_dispatch(
315        run_id,
316        step_instance_id,
317        step_type,
318        resolved_spec,
319        pool.clone(),
320        nats_client.clone(),
321        tls_reloader.clone(),
322    )
323    .await?
324    {
325        return Ok(true);
326    }
327    if super::intrinsic::jq::try_dispatch(
328        run_id,
329        step_instance_id,
330        step_type,
331        resolved_spec,
332        pool.clone(),
333        nats_client.clone(),
334        tls_reloader.clone(),
335    )
336    .await?
337    {
338        return Ok(true);
339    }
340    Ok(false)
341}
342
343async fn setup_test_report_urls(
344    run_id: RunId,
345    step_instance_id: StepInstanceId,
346    step_name: &str,
347    pool: &PgPool,
348    workflow: &dsl::Workflow,
349) -> Result<serde_json::Map<String, Value>> {
350    let mut test_report_urls = serde_json::Map::new();
351    if let Some(step) = find_step(&workflow.steps, step_name) {
352        if !step.reports.is_empty() {
353            let backend = crate::db::get_default_sfs_backend(pool)
354                .await?
355                .context("Default SFS backend required for test reports")?;
356            let client = crate::s3::get_s3_client(&backend).await?;
357            let bucket = backend.config["bucket"]
358                .as_str()
359                .context("Missing bucket")?;
360
361            for report in &step.reports {
362                let report_key = format!(
363                    "test-reports/{}/{}/{}.tar.gz",
364                    run_id, step_instance_id, report.name
365                );
366                let expires = Duration::from_secs(3600);
367                let put_url =
368                    crate::s3::generate_presigned_url(&client, bucket, &report_key, true, expires)
369                        .await?;
370
371                test_report_urls.insert(
372                    report.name.clone(),
373                    serde_json::json!({
374                        "put_url": put_url,
375                        "remote_path": report_key,
376                        "backend_id": backend.id,
377                    }),
378                );
379            }
380        }
381    }
382    Ok(test_report_urls)
383}
384
385/// Dispatches a step instance, handling intrinsic types or forwarding to runners via NATS.
386#[allow(clippy::too_many_arguments)]
387pub async fn dispatch_step_instance(
388    run_id: RunId,
389    step_instance_id: StepInstanceId,
390    step_name: &str,
391    step_type: &str,
392    resolved_spec: &Value,
393    resolved_params: &Value,
394    nats_client: async_nats::Client,
395    pool: PgPool,
396    tls_reloader: Arc<TlsReloader>,
397) -> Result<()> {
398    let mut step_type = step_type.to_string();
399    let mut resolved_spec = resolved_spec.clone();
400
401    apply_intrinsic_mutations(run_id, &mut step_type, &mut resolved_spec).await?;
402
403    let run_context = fetch_run_context(run_id, &pool).await?;
404    let workflow: dsl::Workflow = serde_json::from_value(run_context.workflow_definition.clone())
405        .context("Failed to parse workflow definition from context")?;
406
407    let storage_urls =
408        setup_storage_urls(run_id, &pool, &workflow, &resolved_spec, &run_context).await?;
409
410    if try_dispatch_intrinsic(
411        run_id,
412        step_instance_id,
413        &step_type,
414        &resolved_spec,
415        resolved_params,
416        pool.clone(),
417        nats_client.clone(),
418        tls_reloader.clone(),
419    )
420    .await?
421    {
422        return Ok(());
423    }
424
425    let mut dsl_step_val = Value::Null;
426    if let Some(found_step) = find_step(&workflow.steps, step_name) {
427        dsl_step_val = serde_json::to_value(found_step).unwrap_or(Value::Null);
428    }
429
430    let test_report_urls =
431        setup_test_report_urls(run_id, step_instance_id, step_name, &pool, &workflow).await?;
432
433    let payload = StepScheduledEvent {
434        run_id,
435        step_id: step_instance_id,
436        step_name: Some(step_name.to_string()),
437        step_type: Some(step_type.clone()),
438        spec: Some(resolved_spec),
439        params: Some(resolved_params.clone()),
440        storage: Some(storage_urls.into_iter().collect()),
441        test_report_urls: Some(test_report_urls.into_iter().collect()),
442        timestamp: Utc::now(),
443        event_type: EventType::Step(StepEventType::Scheduled),
444        step_dsl: dsl_step_val,
445    };
446
447    let js = async_nats::jetstream::new(nats_client);
448    let subject = format!("stormchaser.v1.step.scheduled.{}", step_type.to_lowercase());
449    use stormchaser_model::nats::NatsSubject;
450    publish_cloudevent(
451        &js,
452        NatsSubject::Custom(subject.clone()),
453        EventType::Step(StepEventType::Scheduled),
454        EventSource::System,
455        serde_json::to_value(payload).unwrap(),
456        Some(SchemaVersion::new("1.0".to_string())),
457        None,
458    )
459    .await?;
460
461    Ok(())
462}