Skip to main content

stormchaser_engine/handler/step/
dispatch.rs

1use anyhow::{Context, Result};
2use chrono::Utc;
3use serde_json::Value;
4use sqlx::PgPool;
5use std::sync::Arc;
6use std::time::Duration;
7use stormchaser_model::dsl::Step;
8use stormchaser_tls::TlsReloader;
9use uuid::Uuid;
10
11use crate::handler::fetch_run_context;
12
13use stormchaser_model::dsl;
14use stormchaser_model::storage;
15use stormchaser_model::storage::BackendType;
16
17/// Recursively searches for a step by name within a list of steps.
18pub fn find_step<'a>(steps: &'a [Step], name: &str) -> Option<&'a Step> {
19    for step in steps {
20        if step.name == name {
21            return Some(step);
22        }
23        if let Some(inner) = &step.steps {
24            if let Some(found) = find_step(inner, name) {
25                return Some(found);
26            }
27        }
28    }
29    None
30}
31
32/// Dispatches a step instance, handling intrinsic types or forwarding to runners via NATS.
33#[allow(clippy::too_many_arguments)]
34pub async fn dispatch_step_instance(
35    run_id: Uuid,
36    step_instance_id: Uuid,
37    step_name: &str,
38    step_type: &str,
39    resolved_spec: &Value,
40    resolved_params: &Value,
41    nats_client: async_nats::Client,
42    pool: PgPool,
43    tls_reloader: Arc<TlsReloader>,
44) -> Result<()> {
45    let mut step_type = step_type.to_string();
46    let mut resolved_spec = resolved_spec.clone();
47
48    super::intrinsic::git_checkout::mutate(&mut step_type, &mut resolved_spec);
49    super::intrinsic::jq::mutate_if_has_files(&mut step_type, &mut resolved_spec);
50
51    let run_context = fetch_run_context(run_id, &pool).await?;
52    let workflow: dsl::Workflow = serde_json::from_value(run_context.workflow_definition.clone())
53        .context("Failed to parse workflow definition from context")?;
54
55    let mut storage_urls = serde_json::Map::new();
56
57    if !workflow.storage.is_empty() {
58        for storage in workflow.storage {
59            let backend: Option<storage::StorageBackend> =
60                if let Some(ref backend_name) = storage.backend {
61                    crate::db::get_storage_backend_by_name(&pool, backend_name).await?
62                } else {
63                    crate::db::get_default_sfs_backend(&pool).await?
64                };
65
66            if let Some(backend) = backend {
67                let mut get_url = None;
68                let mut put_url = None;
69
70                if backend.backend_type == BackendType::S3 {
71                    let client = crate::s3::get_s3_client(&backend).await?;
72                    let bucket = backend.config["bucket"]
73                        .as_str()
74                        .context("Missing bucket in SFS backend config")?;
75
76                    let key = format!("{}/{}.tar.gz", run_id, storage.name);
77                    let expires = Duration::from_secs(3600);
78
79                    get_url = Some(
80                        crate::s3::generate_presigned_url(&client, bucket, &key, false, expires)
81                            .await?,
82                    );
83                    put_url = Some(
84                        crate::s3::generate_presigned_url(&client, bucket, &key, true, expires)
85                            .await?,
86                    );
87                }
88
89                let last_hash: Option<(String,)> =
90                    crate::db::get_run_storage_last_hash(&pool, run_id, &storage.name).await?;
91
92                let mut artifacts_data = serde_json::Map::new();
93                for artifact in storage.artifacts {
94                    let instructions = crate::artifact::generate_parking_instructions(
95                        &backend,
96                        run_id,
97                        &storage.name,
98                        &artifact,
99                    )
100                    .await?;
101                    artifacts_data.insert(artifact.name.clone(), instructions);
102                }
103
104                let mut provision_data = Vec::new();
105                for mut prov in storage.provision {
106                    if let Some(url) = &prov.url {
107                        let mut val = Value::String(url.clone());
108                        let hcl_ctx = crate::hcl_eval::create_context(
109                            run_context.inputs.clone(),
110                            run_id,
111                            run_context.secrets.clone(),
112                        );
113                        if crate::hcl_eval::resolve_expressions(&mut val, &hcl_ctx).is_ok() {
114                            prov.url = match val {
115                                Value::String(s) => Some(s),
116                                other => Some(other.to_string()),
117                            };
118                        }
119                    }
120                    provision_data.push(prov);
121                }
122
123                storage_urls.insert(
124                    storage.name.clone(),
125                    serde_json::json!({
126                        "get_url": get_url,
127                        "put_url": put_url,
128                        "expected_hash": last_hash.map(|h| h.0),
129                        "artifacts": artifacts_data,
130                        "provision": provision_data,
131                    }),
132                );
133            }
134        }
135    }
136
137    if super::intrinsic::wasm::try_dispatch(
138        run_id,
139        step_instance_id,
140        &step_type,
141        &resolved_spec,
142        resolved_params,
143        pool.clone(),
144        nats_client.clone(),
145        tls_reloader.clone(),
146    )
147    .await?
148    {
149        return Ok(());
150    }
151
152    if super::intrinsic::lambda::try_dispatch(
153        run_id,
154        step_instance_id,
155        &step_type,
156        &resolved_spec,
157        pool.clone(),
158        nats_client.clone(),
159        tls_reloader.clone(),
160    )
161    .await?
162    {
163        return Ok(());
164    }
165
166    if super::intrinsic::webhook::try_dispatch(
167        run_id,
168        step_instance_id,
169        &step_type,
170        &resolved_spec,
171        pool.clone(),
172        nats_client.clone(),
173        tls_reloader.clone(),
174    )
175    .await?
176    {
177        return Ok(());
178    }
179
180    if super::intrinsic::jinja::try_dispatch(
181        run_id,
182        step_instance_id,
183        &step_type,
184        &resolved_spec,
185        pool.clone(),
186        nats_client.clone(),
187        tls_reloader.clone(),
188    )
189    .await?
190    {
191        return Ok(());
192    }
193
194    if super::intrinsic::email::try_dispatch(
195        run_id,
196        step_instance_id,
197        &step_type,
198        &resolved_spec,
199        pool.clone(),
200        nats_client.clone(),
201        tls_reloader.clone(),
202    )
203    .await?
204    {
205        return Ok(());
206    }
207
208    if super::intrinsic::test_report_email::try_dispatch(
209        run_id,
210        step_instance_id,
211        &step_type,
212        &resolved_spec,
213        pool.clone(),
214        nats_client.clone(),
215        tls_reloader.clone(),
216    )
217    .await?
218    {
219        return Ok(());
220    }
221
222    if super::intrinsic::jq::try_dispatch(
223        run_id,
224        step_instance_id,
225        &step_type,
226        &resolved_spec,
227        pool.clone(),
228        nats_client.clone(),
229        tls_reloader.clone(),
230    )
231    .await?
232    {
233        return Ok(());
234    }
235
236    let mut dsl_step_val = Value::Null;
237    if let Some(found_step) = find_step(&workflow.steps, step_name) {
238        dsl_step_val = serde_json::to_value(found_step).unwrap_or(Value::Null);
239    }
240
241    let mut test_report_urls = serde_json::Map::new();
242    if let Some(step) = workflow.steps.iter().find(|s| s.name == step_name) {
243        if !step.reports.is_empty() {
244            let backend = crate::db::get_default_sfs_backend(&pool)
245                .await?
246                .context("Default SFS backend required for test reports")?;
247            let client = crate::s3::get_s3_client(&backend).await?;
248            let bucket = backend.config["bucket"]
249                .as_str()
250                .context("Missing bucket")?;
251
252            for report in &step.reports {
253                let report_key = format!(
254                    "test-reports/{}/{}/{}.tar.gz",
255                    run_id, step_instance_id, report.name
256                );
257                let expires = Duration::from_secs(3600);
258                let put_url =
259                    crate::s3::generate_presigned_url(&client, bucket, &report_key, true, expires)
260                        .await?;
261
262                test_report_urls.insert(
263                    report.name.clone(),
264                    serde_json::json!({
265                        "put_url": put_url,
266                        "remote_path": report_key,
267                        "backend_id": backend.id,
268                    }),
269                );
270            }
271        }
272    }
273
274    let payload = serde_json::json!({
275        "run_id": run_id,
276        "step_id": step_instance_id,
277        "step_name": step_name,
278        "step_type": step_type,
279        "spec": resolved_spec,
280        "params": resolved_params,
281        "storage": storage_urls,
282        "test_report_urls": test_report_urls,
283        "timestamp": Utc::now(),
284        "step_dsl": dsl_step_val,
285    });
286
287    let js = async_nats::jetstream::new(nats_client);
288    let subject = format!("stormchaser.step.scheduled.{}", step_type.to_lowercase());
289    js.publish(subject, payload.to_string().into()).await?;
290    Ok(())
291}