1#![allow(clippy::explicit_auto_deref)]
2use crate::handler::StepInstance;
3use crate::handler::{
4 archive_workflow, dispatch_pending_steps, fetch_outputs, fetch_run, fetch_run_context,
5 fetch_step_instance,
6};
7use crate::workflow_machine::{state, WorkflowMachine};
8use anyhow::{Context, Result};
9use chrono::Utc;
10use serde_json::Value;
11use sqlx::PgPool;
12use std::sync::Arc;
13use stormchaser_dsl::ast::{self, Workflow};
14use stormchaser_model::dsl::OutputExtraction;
15use stormchaser_model::events::WorkflowCompletedEvent;
16use stormchaser_model::events::{EventSource, EventType, WorkflowEventType};
17use stormchaser_model::nats::publish_cloudevent;
18use stormchaser_model::step::StepStatus;
19use stormchaser_model::LogBackend;
20use stormchaser_tls::TlsReloader;
21use tracing::{debug, error, info};
22use uuid::Uuid;
23
24use crate::handler::step::dispatch::{dispatch_step_instance, find_step};
25use crate::handler::step::quota::release_step_quota_for_instance;
26use crate::handler::step::scheduling::schedule_step;
27
28use super::helpers::persist_step_test_reports;
29
30#[tracing::instrument(skip(event, pool, nats_client, log_backend, tls_reloader), fields(run_id = tracing::field::Empty, step_id = tracing::field::Empty))]
31pub async fn handle_step_completed(
33 event: stormchaser_model::events::StepCompletedEvent,
34 pool: PgPool,
35 nats_client: async_nats::Client,
36 log_backend: Arc<Option<LogBackend>>,
37 tls_reloader: Arc<TlsReloader>,
38) -> Result<()> {
39 let run_id = event.run_id;
40 let step_id = event.step_id;
41
42 let span = tracing::Span::current();
43 span.record("run_id", tracing::field::display(run_id));
44 span.record("step_id", tracing::field::display(step_id));
45
46 info!("Step {} (Run {}) completed successfully", step_id, run_id);
47
48 let mut tx = pool.begin().await?;
49
50 if crate::db::lock_workflow_run(&mut *tx, run_id)
52 .await?
53 .is_none()
54 {
55 debug!(
56 "Workflow run {} already archived or missing, skipping handler",
57 run_id
58 );
59 return Ok(());
60 }
61
62 let instance = fetch_step_instance(step_id, &mut *tx).await?;
64
65 if instance.status == StepStatus::Succeeded || instance.status == StepStatus::Failed {
67 return Ok(());
68 }
69
70 let _ = release_step_quota_for_instance(&mut *tx, run_id, step_id).await;
71
72 let machine =
73 crate::step_machine::StepMachine::<crate::step_machine::state::Running>::from_instance(
74 instance.clone(),
75 );
76 let _ = machine.succeed(&mut *tx).await?;
77
78 let attributes = [
79 opentelemetry::KeyValue::new("step_name", instance.step_name),
80 opentelemetry::KeyValue::new("step_type", instance.step_type),
81 opentelemetry::KeyValue::new("runner_id", instance.runner_id.unwrap_or_default()),
82 ];
83
84 crate::STEPS_COMPLETED.add(1, &attributes);
85
86 if let Some(started_at) = instance.started_at {
87 let duration = (Utc::now() - started_at).to_std().unwrap_or_default();
88 crate::STEP_DURATION.record(duration.as_secs_f64(), &attributes);
89 }
90
91 let all_steps: Vec<StepInstance> =
93 crate::db::get_step_instances_by_run_id(&mut *tx, run_id).await?;
94
95 if let Some(outputs) = event
97 .outputs
98 .as_ref()
99 .map(|m| {
100 m.clone()
101 .into_iter()
102 .collect::<serde_json::Map<String, serde_json::Value>>()
103 })
104 .as_ref()
105 {
106 for (key, value) in outputs {
107 crate::db::upsert_step_output(&mut *tx, step_id, key, value).await?;
108 }
109 }
110
111 if let Some(storage_hashes) = event
113 .storage_hashes
114 .as_ref()
115 .map(|m| {
116 m.clone()
117 .into_iter()
118 .collect::<serde_json::Map<String, serde_json::Value>>()
119 })
120 .as_ref()
121 {
122 for (name, hash_val) in storage_hashes {
123 if let Some(hash) = hash_val.as_str() {
124 crate::db::upsert_run_storage_state(&mut *tx, run_id.into_inner(), name, hash)
125 .await?;
126 }
127 }
128 }
129
130 if let Some(artifacts) = event
132 .artifacts
133 .as_ref()
134 .map(|m| {
135 m.clone()
136 .into_iter()
137 .collect::<serde_json::Map<String, serde_json::Value>>()
138 })
139 .as_ref()
140 {
141 let context = fetch_run_context(run_id, &mut *tx).await?;
142 let workflow: Workflow = serde_json::from_value(context.workflow_definition)
143 .context("Failed to parse workflow definition from DB")?;
144
145 for (name, meta) in artifacts {
146 let storage = workflow
148 .storage
149 .iter()
150 .find(|s| s.artifacts.iter().any(|a| a.name == *name));
151 if let Some(s) = storage {
152 let backend_id: Option<Uuid> = if let Some(ref backend_name) = s.backend {
153 crate::db::get_storage_backend_id_by_name(&mut *tx, backend_name).await?
154 } else {
155 crate::db::get_default_sfs_backend_id(&mut *tx).await?
156 };
157
158 if let Some(bid) = backend_id {
159 let remote_path = format!("artifacts/{}/{}/{}", run_id, s.name, name);
160 crate::db::insert_artifact_registry(
161 &mut *tx,
162 run_id,
163 step_id,
164 name,
165 stormchaser_model::BackendId::new(bid),
166 remote_path,
167 meta.clone(),
168 )
169 .await?;
170 }
171 }
172 }
173 }
174
175 persist_step_test_reports(event.test_reports.as_ref(), &mut tx, run_id, step_id, &pool).await?;
177
178 let context = fetch_run_context(run_id, &mut *tx).await?;
180 let workflow: Workflow = serde_json::from_value(context.workflow_definition)
181 .context("Failed to parse workflow definition from DB")?;
182
183 let all_steps_initial: Vec<StepInstance> =
184 crate::db::get_step_instances_by_run_id(&mut *tx, run_id).await?;
185
186 let current_step_instance = all_steps_initial
187 .iter()
188 .find(|s| s.id.into_inner() == step_id.into_inner())
189 .context("Completed step not found in DB")?;
190
191 let mut dsl_step = find_step(&workflow.steps, ¤t_step_instance.step_name).cloned();
192
193 if let Some(step) = &mut dsl_step {
194 if step.r#type == "TerraformApply" {
195 step.outputs.push(OutputExtraction {
196 name: "terraform".to_string(),
197 source: "stdout".to_string(),
198 marker: Some("--- TF OUTPUTS ---".to_string()),
199 format: Some("json".to_string()),
200 regex: Some(r"--- TF OUTPUTS ---\s*(.*)".to_string()),
201 group: Some(1),
202 sensitive: Some(false),
203 });
204 } else if step.r#type == "TerraformPlan" {
205 step.outputs.push(OutputExtraction {
206 name: "plan_summary".to_string(),
207 source: "stdout".to_string(),
208 marker: Some("--- TF PLAN SUMMARY ---".to_string()),
209 format: Some("string".to_string()),
210 regex: Some(r"--- TF PLAN SUMMARY ---\s*(.*)".to_string()),
211 group: Some(1),
212 sensitive: Some(false),
213 });
214 step.outputs.push(OutputExtraction {
215 name: "plan_json".to_string(),
216 source: "stdout".to_string(),
217 marker: Some("--- TF PLAN JSON ---".to_string()),
218 format: Some("json".to_string()),
219 regex: Some(r"--- TF PLAN JSON ---\s*(.*)".to_string()),
220 group: Some(1),
221 sensitive: Some(false),
222 });
223 }
224 }
225
226 if let (Some(dsl_step), Some(backend)) = (&dsl_step, &*log_backend) {
227 if !dsl_step.outputs.is_empty() {
228 tracing::info!("Scraping outputs from logs for step {}", dsl_step.name);
229 let logs = backend
230 .fetch_step_logs(
231 &dsl_step.name,
232 step_id,
233 current_step_instance.started_at,
234 current_step_instance.finished_at,
235 Some(5000), )
237 .await
238 .unwrap_or_default();
239
240 let mut filtered_logs = Vec::new();
241 let mut in_output_block = dsl_step.start_marker.is_none();
242
243 for line in &logs {
244 if let Some(start) = &dsl_step.start_marker {
245 if line.contains(start) {
246 in_output_block = true;
247 continue;
248 }
249 }
250 if let Some(end) = &dsl_step.end_marker {
251 if line.contains(end) {
252 in_output_block = false;
253 continue;
254 }
255 }
256 if in_output_block {
257 filtered_logs.push(line);
258 }
259 }
260
261 for extraction in &dsl_step.outputs {
262 if extraction.source == "logs" || extraction.source == "stdout" {
263 if let Some(regex_str) = &extraction.regex {
264 if let Ok(re) = regex::Regex::new(regex_str) {
265 for line in filtered_logs.iter().rev() {
266 if let Some(caps) = re.captures(line) {
267 let value = if let Some(marker) = &extraction.marker {
268 if line.contains(marker) {
269 caps.get(extraction.group.unwrap_or(1) as usize)
270 .map(|m| m.as_str().to_string())
271 } else {
272 None
273 }
274 } else {
275 caps.get(extraction.group.unwrap_or(1) as usize)
276 .map(|m| m.as_str().to_string())
277 };
278
279 if let Some(val) = value {
280 let final_val =
281 if extraction.format.as_deref() == Some("json") {
282 serde_json::from_str(&val)
283 .unwrap_or(serde_json::json!(val))
284 } else {
285 serde_json::json!(val)
286 };
287
288 crate::db::upsert_step_output_with_sensitivity(
289 &mut *tx,
290 step_id,
291 &extraction.name,
292 &final_val,
293 extraction.sensitive.unwrap_or(false),
294 )
295 .await?;
296 break;
297 }
298 }
299 }
300 }
301 }
302 }
303 }
304 }
305 }
306
307 if let Some(dsl_step) = dsl_step {
308 if !process_step_completion(
309 dsl_step,
310 &all_steps,
311 run_id,
312 &mut *tx,
313 nats_client.clone(),
314 pool.clone(),
315 tls_reloader.clone(),
316 context.inputs.clone(),
317 &workflow,
318 )
319 .await?
320 {
321 tx.commit().await?;
322 return Ok(());
323 }
324 }
325
326 let all_steps_final: Vec<StepInstance> =
328 crate::db::get_step_instances_by_run_id(&mut *tx, run_id).await?;
329
330 let all_dsl_steps_done = workflow.steps.iter().all(|dsl_step| {
331 let step_instances: Vec<&StepInstance> = all_steps_final
332 .iter()
333 .filter(|s| s.step_name == dsl_step.name)
334 .collect();
335
336 !step_instances.is_empty()
337 && step_instances
338 .iter()
339 .all(|s| s.status == StepStatus::Succeeded || s.status == StepStatus::Skipped)
340 });
341
342 if all_dsl_steps_done {
343 let run = fetch_run(run_id, &mut *tx).await?;
344 let machine = WorkflowMachine::<state::Running>::new_from_run(run.clone());
345 let _ = machine.succeed(&mut *tx).await?;
346
347 let js = async_nats::jetstream::new(nats_client.clone());
348 use stormchaser_model::nats::NatsSubject;
349 if let Err(e) = publish_cloudevent(
350 &js,
351 NatsSubject::RunCompleted,
352 EventType::Workflow(WorkflowEventType::Completed),
353 EventSource::Engine,
354 serde_json::to_value(WorkflowCompletedEvent {
355 run_id,
356 event_type: EventType::Workflow(WorkflowEventType::Completed),
357 timestamp: chrono::Utc::now(),
358 })
359 .unwrap(),
360 None,
361 None,
362 )
363 .await
364 {
365 error!(
366 "Failed to publish workflow completed event for {}: {:?}",
367 run_id, e
368 );
369 }
370
371 crate::RUNS_COMPLETED.add(
372 1,
373 &[
374 opentelemetry::KeyValue::new("workflow_name", run.workflow_name),
375 opentelemetry::KeyValue::new("initiating_user", run.initiating_user),
376 ],
377 );
378
379 tx.commit().await?;
380 info!(
381 "Workflow run {} completed successfully, archiving...",
382 run_id
383 );
384 archive_workflow(run_id, pool.clone()).await?;
385 return Ok(());
386 } else {
387 tx.commit().await?;
388 }
389
390 if let Err(e) = dispatch_pending_steps(run_id, pool, nats_client, tls_reloader).await {
391 error!(
392 "Failed to dispatch pending steps for run {}: {:?}",
393 run_id, e
394 );
395 }
396
397 Ok(())
398}
399
400#[allow(clippy::too_many_arguments)]
401async fn process_step_completion(
402 dsl_step: ast::Step,
403 all_steps: &[StepInstance],
404 run_id: stormchaser_model::RunId,
405 tx: &mut sqlx::PgConnection,
406 nats_client: async_nats::Client,
407 pool: PgPool,
408 tls_reloader: Arc<TlsReloader>,
409 inputs: serde_json::Value,
410 workflow: &ast::Workflow,
411) -> Result<bool> {
412 let all_instances_of_this_step: Vec<&StepInstance> = all_steps
414 .iter()
415 .filter(|s| s.step_name == dsl_step.name)
416 .collect();
417
418 let finished_instances = all_instances_of_this_step
419 .iter()
420 .filter(|s| s.status == StepStatus::Succeeded || s.status == StepStatus::Skipped)
421 .count();
422
423 let total_instances = all_instances_of_this_step.len();
424
425 if finished_instances < total_instances {
426 let waiting_instances: Vec<&&StepInstance> = all_instances_of_this_step
428 .iter()
429 .filter(|s| s.status == StepStatus::WaitingForEvent)
430 .collect();
431
432 if !waiting_instances.is_empty() {
433 let running_or_pending = all_instances_of_this_step
434 .iter()
435 .filter(|s| s.status == StepStatus::Running || s.status == StepStatus::Pending)
436 .count();
437
438 let max_parallel = dsl_step
439 .strategy
440 .as_ref()
441 .and_then(|s| s.max_parallel)
442 .unwrap_or(u32::MAX);
443
444 if (running_or_pending as u32) < max_parallel {
445 let to_schedule = max_parallel - (running_or_pending as u32);
446 for next_instance in waiting_instances.iter().take(to_schedule as usize) {
447 let machine = crate::step_machine::StepMachine::<
448 crate::step_machine::state::WaitingForEvent,
449 >::from_instance((**next_instance).clone());
450 let _ = machine.reschedule(&mut *tx).await?;
451
452 let inst_data: (Value, Value) =
453 crate::db::get_step_spec_and_params(&mut *tx, next_instance.id).await?;
454
455 dispatch_step_instance(
456 run_id,
457 next_instance.id,
458 &dsl_step.name,
459 &dsl_step.r#type,
460 &inst_data.0,
461 &inst_data.1,
462 nats_client.clone(),
463 pool.clone(),
464 tls_reloader.clone(),
465 )
466 .await?;
467 }
468 }
469 }
470 return Ok(false);
471 }
472
473 if !dsl_step.next.is_empty() {
475 let hcl_ctx =
476 crate::hcl_eval::create_context(inputs, run_id, fetch_outputs(run_id, &mut *tx).await?);
477
478 for next_step_name in &dsl_step.next {
479 let predecessors: Vec<&ast::Step> = workflow
480 .steps
481 .iter()
482 .filter(|s| s.next.contains(next_step_name))
483 .collect();
484
485 let all_predecessors_done = predecessors.iter().all(|pred_dsl| {
486 let pred_instances: Vec<&StepInstance> = all_steps
487 .iter()
488 .filter(|s| s.step_name == pred_dsl.name)
489 .collect();
490
491 !pred_instances.is_empty()
492 && pred_instances.iter().all(|s| {
493 s.status == StepStatus::Succeeded || s.status == StepStatus::Skipped
494 })
495 });
496
497 if all_predecessors_done {
498 if let Some(next_dsl) = workflow.steps.iter().find(|s| s.name == *next_step_name) {
499 #[allow(clippy::explicit_auto_deref)]
500 schedule_step(
501 run_id,
502 next_dsl,
503 &mut *tx,
504 nats_client.clone(),
505 &hcl_ctx,
506 pool.clone(),
507 workflow,
508 )
509 .await?;
510 }
511 }
512 }
513 }
514
515 Ok(true)
516}
517
#[cfg(test)]
mod tests {
    use super::*;

    /// Compile-time guard: the suite stops building if `process_step_completion`
    /// is removed or renamed.
    ///
    /// Nothing is awaited, so a plain synchronous test is enough and it can run
    /// instead of being permanently ignored.
    #[test]
    fn test_process_step_completion_compiles() {
        let _f = process_step_completion;
    }
}