stormchaser_engine/handler/
mod.rs1#![allow(clippy::explicit_auto_deref)]
2use serde_json::Value;
3use stormchaser_model::workflow::RunStatus;
4use stormchaser_model::RunId;
5use stormchaser_model::StepInstance;
6use stormchaser_model::StepInstanceId;
7pub mod integrations;
9pub mod runner;
11pub mod step;
13pub mod workflow;
15
16pub use integrations::*;
17pub use runner::*;
18pub use step::*;
19pub use workflow::*;
20
21use anyhow::{Context, Result};
22use sqlx::PgPool;
23use std::sync::Arc;
24use stormchaser_model::workflow::{RunContext, RunQuotas, WorkflowRun};
25use stormchaser_tls::TlsReloader;
26use tracing::{debug, error, info};
27
28#[tracing::instrument(skip(executor), fields(run_id = %run_id))]
29pub async fn fetch_outputs(
31 run_id: RunId,
32 executor: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
33) -> Result<Value> {
34 let rows: Vec<(String, String, Value)> =
35 crate::db::get_step_outputs_for_run(executor, run_id).await?;
36
37 let mut steps_obj = serde_json::Map::new();
38 for (step_name, key, value) in rows {
39 let step_entry = steps_obj
40 .entry(step_name)
41 .or_insert(serde_json::json!({"outputs": {}}));
42 if let Some(outputs) = step_entry
43 .as_object_mut()
44 .and_then(|o| o.get_mut("outputs"))
45 .and_then(|o| o.as_object_mut())
46 {
47 outputs.insert(key, value);
48 }
49 }
50
51 Ok(Value::Object(steps_obj))
52}
53
54#[tracing::instrument(skip(executor), fields(run_id = %run_id))]
55pub async fn fetch_run<'a, E>(run_id: RunId, executor: E) -> Result<WorkflowRun>
57where
58 E: sqlx::Executor<'a, Database = sqlx::Postgres>,
59{
60 crate::db::get_workflow_run_by_id(executor, run_id)
61 .await
62 .map(|v: WorkflowRun| v)
63 .with_context(|| format!("Failed to fetch workflow run for {}", run_id))
64}
65
66#[tracing::instrument(skip(executor), fields(run_id = %run_id))]
67pub async fn fetch_run_context<'a, E>(run_id: RunId, executor: E) -> Result<RunContext>
69where
70 E: sqlx::Executor<'a, Database = sqlx::Postgres>,
71{
72 crate::db::get_run_context_by_id(executor, run_id)
73 .await
74 .map(|v: RunContext| v)
75 .with_context(|| format!("Failed to fetch run context for {}", run_id))
76}
77
78#[tracing::instrument(skip(executor), fields(run_id = %run_id))]
79pub async fn fetch_quotas<'a, E>(run_id: RunId, executor: E) -> Result<RunQuotas>
81where
82 E: sqlx::Executor<'a, Database = sqlx::Postgres>,
83{
84 crate::db::get_run_quota_by_id(executor, run_id)
85 .await
86 .map(|v: RunQuotas| v)
87 .with_context(|| format!("Failed to fetch run quotas for {}", run_id))
88}
89
90#[tracing::instrument(skip(executor), fields(run_id = %run_id))]
91pub async fn fetch_inputs<'a, E>(run_id: RunId, executor: E) -> Result<Value>
93where
94 E: sqlx::Executor<'a, Database = sqlx::Postgres>,
95{
96 let row: (Value,) = crate::db::get_run_inputs_by_id(executor, run_id)
97 .await
98 .with_context(|| format!("Failed to fetch inputs for {}", run_id))?;
99 Ok(row.0)
100}
101
102#[tracing::instrument(skip(executor), fields(step_id = %step_id))]
103pub async fn fetch_step_instance<'a, E>(
105 step_id: StepInstanceId,
106 executor: E,
107) -> Result<StepInstance>
108where
109 E: sqlx::Executor<'a, Database = sqlx::Postgres>,
110{
111 crate::db::get_step_instance_by_id(executor, step_id)
112 .await?
113 .context("Step instance not found")
114}
115
116#[tracing::instrument(skip(pool, nats_client, tls_reloader), fields(run_id = %run_id))]
117pub async fn dispatch_pending_steps(
119 run_id: RunId,
120 pool: PgPool,
121 nats_client: async_nats::Client,
122 tls_reloader: Arc<TlsReloader>,
123) -> Result<()> {
124 let quotas = fetch_quotas(run_id, &pool).await?;
126 let running_count: i64 = crate::db::count_running_steps_for_run(&pool, run_id).await?;
127
128 if running_count >= quotas.max_concurrency as i64 {
129 debug!(
130 "Run {}: Max concurrency ({}) reached, not dispatching more steps",
131 run_id, quotas.max_concurrency
132 );
133 return Ok(());
134 }
135
136 let available_slots = (quotas.max_concurrency as i64) - running_count;
137 debug!(
138 "Run {}: {} slots available for concurrent steps",
139 run_id, available_slots
140 );
141
142 let max_cpu = crate::resource_utils::parse_cpu("as.max_cpu).unwrap_or(0.0);
143 let max_mem = crate::resource_utils::parse_memory("as.max_memory).unwrap_or(0);
144
145 let pending_steps: Vec<StepInstance> =
147 crate::db::get_pending_step_instances_for_run(&pool, run_id, available_slots).await?;
148
149 for step in pending_steps {
150 info!("Run {}: Evaluating queued step {}", run_id, step.step_name);
151 let inst_data: (Value, Value) = crate::db::get_step_spec_and_params(&pool, step.id).await?;
153
154 let (cpu_req, mem_req) =
156 crate::resource_utils::get_step_resource_requirements(&step.step_type, &inst_data.0);
157
158 let can_dispatch = if max_cpu > 0.0 || max_mem > 0 {
159 let mut conn = pool.acquire().await?;
160 crate::db::claim_step_quota(&mut *conn, run_id, cpu_req, mem_req, max_cpu, max_mem)
161 .await?
162 } else {
163 true };
165
166 if !can_dispatch {
167 debug!(
168 "Run {}: Insufficient CPU/Memory quota to dispatch step {}",
169 run_id, step.step_name
170 );
171 continue; }
173
174 info!("Run {}: Dispatching queued step {}", run_id, step.step_name);
175
176 if let Err(e) = dispatch_step_instance(
180 run_id,
181 step.id,
182 &step.step_name,
183 &step.step_type,
184 &inst_data.0,
185 &inst_data.1,
186 nats_client.clone(),
187 pool.clone(),
188 tls_reloader.clone(),
189 )
190 .await
191 {
192 error!("Failed to dispatch step {}: {:?}", step.id, e);
193 if max_cpu > 0.0 || max_mem > 0 {
194 let mut conn = pool.acquire().await?;
195 let _ = crate::db::release_step_quota(&mut *conn, run_id, cpu_req, mem_req).await;
196 }
197 }
198 }
199
200 Ok(())
201}
202
203#[tracing::instrument(skip(pool), fields(run_id = %run_id))]
204pub async fn archive_workflow(run_id: RunId, pool: PgPool) -> Result<()> {
206 let mut tx = pool.begin().await?;
207
208 let _ = crate::db::lock_workflow_run(&mut *tx, run_id).await?;
210
211 let run: Option<(RunStatus,)> = crate::db::get_workflow_run_status(&mut *tx, run_id).await?;
213
214 let run_status = match run {
215 Some(r) => r.0,
216 None => return Ok(()), };
218
219 match run_status {
220 RunStatus::Succeeded | RunStatus::Failed | RunStatus::Aborted => {
221 }
223 _ => return Ok(()), }
225
226 info!("Archiving workflow run {}", run_id);
227 crate::db::archive_and_delete_workflow_run(&mut *tx, run_id)
228 .await
229 .with_context(|| format!("Failed to archive and delete workflow run {}", run_id))?;
230
231 tx.commit().await?;
232
233 info!("Successfully archived workflow run {}", run_id);
234
235 Ok(())
236}