stormchaser_engine/handler/step/
dispatch.rs1use anyhow::{Context, Result};
2use chrono::Utc;
3use serde_json::Value;
4use sqlx::PgPool;
5use std::sync::Arc;
6use std::time::Duration;
7use stormchaser_model::dsl::Step;
8use stormchaser_tls::TlsReloader;
9use uuid::Uuid;
10
11use crate::handler::fetch_run_context;
12
13use stormchaser_model::dsl;
14use stormchaser_model::storage;
15use stormchaser_model::storage::BackendType;
16
17pub fn find_step<'a>(steps: &'a [Step], name: &str) -> Option<&'a Step> {
19 for step in steps {
20 if step.name == name {
21 return Some(step);
22 }
23 if let Some(inner) = &step.steps {
24 if let Some(found) = find_step(inner, name) {
25 return Some(found);
26 }
27 }
28 }
29 None
30}
31
32#[allow(clippy::too_many_arguments)]
34pub async fn dispatch_step_instance(
35 run_id: Uuid,
36 step_instance_id: Uuid,
37 step_name: &str,
38 step_type: &str,
39 resolved_spec: &Value,
40 resolved_params: &Value,
41 nats_client: async_nats::Client,
42 pool: PgPool,
43 tls_reloader: Arc<TlsReloader>,
44) -> Result<()> {
45 let mut step_type = step_type.to_string();
46 let mut resolved_spec = resolved_spec.clone();
47
48 super::intrinsic::git_checkout::mutate(&mut step_type, &mut resolved_spec);
49 super::intrinsic::jq::mutate_if_has_files(&mut step_type, &mut resolved_spec);
50
51 let run_context = fetch_run_context(run_id, &pool).await?;
52 let workflow: dsl::Workflow = serde_json::from_value(run_context.workflow_definition.clone())
53 .context("Failed to parse workflow definition from context")?;
54
55 let mut storage_urls = serde_json::Map::new();
56
57 if !workflow.storage.is_empty() {
58 for storage in workflow.storage {
59 let backend: Option<storage::StorageBackend> =
60 if let Some(ref backend_name) = storage.backend {
61 crate::db::get_storage_backend_by_name(&pool, backend_name).await?
62 } else {
63 crate::db::get_default_sfs_backend(&pool).await?
64 };
65
66 if let Some(backend) = backend {
67 let mut get_url = None;
68 let mut put_url = None;
69
70 if backend.backend_type == BackendType::S3 {
71 let client = crate::s3::get_s3_client(&backend).await?;
72 let bucket = backend.config["bucket"]
73 .as_str()
74 .context("Missing bucket in SFS backend config")?;
75
76 let key = format!("{}/{}.tar.gz", run_id, storage.name);
77 let expires = Duration::from_secs(3600);
78
79 get_url = Some(
80 crate::s3::generate_presigned_url(&client, bucket, &key, false, expires)
81 .await?,
82 );
83 put_url = Some(
84 crate::s3::generate_presigned_url(&client, bucket, &key, true, expires)
85 .await?,
86 );
87 }
88
89 let last_hash: Option<(String,)> =
90 crate::db::get_run_storage_last_hash(&pool, run_id, &storage.name).await?;
91
92 let mut artifacts_data = serde_json::Map::new();
93 for artifact in storage.artifacts {
94 let instructions = crate::artifact::generate_parking_instructions(
95 &backend,
96 run_id,
97 &storage.name,
98 &artifact,
99 )
100 .await?;
101 artifacts_data.insert(artifact.name.clone(), instructions);
102 }
103
104 let mut provision_data = Vec::new();
105 for mut prov in storage.provision {
106 if let Some(url) = &prov.url {
107 let mut val = Value::String(url.clone());
108 let hcl_ctx = crate::hcl_eval::create_context(
109 run_context.inputs.clone(),
110 run_id,
111 run_context.secrets.clone(),
112 );
113 if crate::hcl_eval::resolve_expressions(&mut val, &hcl_ctx).is_ok() {
114 prov.url = match val {
115 Value::String(s) => Some(s),
116 other => Some(other.to_string()),
117 };
118 }
119 }
120 provision_data.push(prov);
121 }
122
123 storage_urls.insert(
124 storage.name.clone(),
125 serde_json::json!({
126 "get_url": get_url,
127 "put_url": put_url,
128 "expected_hash": last_hash.map(|h| h.0),
129 "artifacts": artifacts_data,
130 "provision": provision_data,
131 }),
132 );
133 }
134 }
135 }
136
137 if super::intrinsic::wasm::try_dispatch(
138 run_id,
139 step_instance_id,
140 &step_type,
141 &resolved_spec,
142 resolved_params,
143 pool.clone(),
144 nats_client.clone(),
145 tls_reloader.clone(),
146 )
147 .await?
148 {
149 return Ok(());
150 }
151
152 if super::intrinsic::lambda::try_dispatch(
153 run_id,
154 step_instance_id,
155 &step_type,
156 &resolved_spec,
157 pool.clone(),
158 nats_client.clone(),
159 tls_reloader.clone(),
160 )
161 .await?
162 {
163 return Ok(());
164 }
165
166 if super::intrinsic::webhook::try_dispatch(
167 run_id,
168 step_instance_id,
169 &step_type,
170 &resolved_spec,
171 pool.clone(),
172 nats_client.clone(),
173 tls_reloader.clone(),
174 )
175 .await?
176 {
177 return Ok(());
178 }
179
180 if super::intrinsic::jinja::try_dispatch(
181 run_id,
182 step_instance_id,
183 &step_type,
184 &resolved_spec,
185 pool.clone(),
186 nats_client.clone(),
187 tls_reloader.clone(),
188 )
189 .await?
190 {
191 return Ok(());
192 }
193
194 if super::intrinsic::email::try_dispatch(
195 run_id,
196 step_instance_id,
197 &step_type,
198 &resolved_spec,
199 pool.clone(),
200 nats_client.clone(),
201 tls_reloader.clone(),
202 )
203 .await?
204 {
205 return Ok(());
206 }
207
208 if super::intrinsic::test_report_email::try_dispatch(
209 run_id,
210 step_instance_id,
211 &step_type,
212 &resolved_spec,
213 pool.clone(),
214 nats_client.clone(),
215 tls_reloader.clone(),
216 )
217 .await?
218 {
219 return Ok(());
220 }
221
222 if super::intrinsic::jq::try_dispatch(
223 run_id,
224 step_instance_id,
225 &step_type,
226 &resolved_spec,
227 pool.clone(),
228 nats_client.clone(),
229 tls_reloader.clone(),
230 )
231 .await?
232 {
233 return Ok(());
234 }
235
236 let mut dsl_step_val = Value::Null;
237 if let Some(found_step) = find_step(&workflow.steps, step_name) {
238 dsl_step_val = serde_json::to_value(found_step).unwrap_or(Value::Null);
239 }
240
241 let mut test_report_urls = serde_json::Map::new();
242 if let Some(step) = workflow.steps.iter().find(|s| s.name == step_name) {
243 if !step.reports.is_empty() {
244 let backend = crate::db::get_default_sfs_backend(&pool)
245 .await?
246 .context("Default SFS backend required for test reports")?;
247 let client = crate::s3::get_s3_client(&backend).await?;
248 let bucket = backend.config["bucket"]
249 .as_str()
250 .context("Missing bucket")?;
251
252 for report in &step.reports {
253 let report_key = format!(
254 "test-reports/{}/{}/{}.tar.gz",
255 run_id, step_instance_id, report.name
256 );
257 let expires = Duration::from_secs(3600);
258 let put_url =
259 crate::s3::generate_presigned_url(&client, bucket, &report_key, true, expires)
260 .await?;
261
262 test_report_urls.insert(
263 report.name.clone(),
264 serde_json::json!({
265 "put_url": put_url,
266 "remote_path": report_key,
267 "backend_id": backend.id,
268 }),
269 );
270 }
271 }
272 }
273
274 let payload = serde_json::json!({
275 "run_id": run_id,
276 "step_id": step_instance_id,
277 "step_name": step_name,
278 "step_type": step_type,
279 "spec": resolved_spec,
280 "params": resolved_params,
281 "storage": storage_urls,
282 "test_report_urls": test_report_urls,
283 "timestamp": Utc::now(),
284 "step_dsl": dsl_step_val,
285 });
286
287 let js = async_nats::jetstream::new(nats_client);
288 let subject = format!("stormchaser.step.scheduled.{}", step_type.to_lowercase());
289 js.publish(subject, payload.to_string().into()).await?;
290 Ok(())
291}