stormchaser_engine/handler/
mod.rs1#![allow(clippy::explicit_auto_deref)]
2use serde_json::Value;
3pub mod integrations;
5pub mod runner;
7pub mod step;
9pub mod workflow;
11
12pub use integrations::*;
13pub use runner::*;
14pub use step::*;
15pub use workflow::*;
16
17use anyhow::{Context, Result};
18use sqlx::PgPool;
19use std::sync::Arc;
20use stormchaser_model::step::StepInstance;
21use stormchaser_model::workflow::{RunContext, RunQuotas, WorkflowRun};
22use stormchaser_tls::TlsReloader;
23use tracing::{debug, error, info};
24use uuid::Uuid;
25
26#[tracing::instrument(skip(executor), fields(run_id = %run_id))]
27pub async fn fetch_outputs(
29 run_id: Uuid,
30 executor: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
31) -> Result<Value> {
32 let rows: Vec<(String, String, Value)> =
33 crate::db::get_step_outputs_for_run(executor, run_id).await?;
34
35 let mut steps_obj = serde_json::Map::new();
36 for (step_name, key, value) in rows {
37 let step_entry = steps_obj
38 .entry(step_name)
39 .or_insert(serde_json::json!({"outputs": {}}));
40 if let Some(outputs) = step_entry
41 .as_object_mut()
42 .and_then(|o| o.get_mut("outputs"))
43 .and_then(|o| o.as_object_mut())
44 {
45 outputs.insert(key, value);
46 }
47 }
48
49 Ok(Value::Object(steps_obj))
50}
51
52#[tracing::instrument(skip(executor), fields(run_id = %run_id))]
53pub async fn fetch_run<'a, E>(run_id: Uuid, executor: E) -> Result<WorkflowRun>
55where
56 E: sqlx::Executor<'a, Database = sqlx::Postgres>,
57{
58 crate::db::get_workflow_run_by_id(executor, run_id)
59 .await
60 .map(|v: WorkflowRun| v)
61 .with_context(|| format!("Failed to fetch workflow run for {}", run_id))
62}
63
64#[tracing::instrument(skip(executor), fields(run_id = %run_id))]
65pub async fn fetch_run_context<'a, E>(run_id: Uuid, executor: E) -> Result<RunContext>
67where
68 E: sqlx::Executor<'a, Database = sqlx::Postgres>,
69{
70 crate::db::get_run_context_by_id(executor, run_id)
71 .await
72 .map(|v: RunContext| v)
73 .with_context(|| format!("Failed to fetch run context for {}", run_id))
74}
75
76#[tracing::instrument(skip(executor), fields(run_id = %run_id))]
77pub async fn fetch_quotas<'a, E>(run_id: Uuid, executor: E) -> Result<RunQuotas>
79where
80 E: sqlx::Executor<'a, Database = sqlx::Postgres>,
81{
82 crate::db::get_run_quota_by_id(executor, run_id)
83 .await
84 .map(|v: RunQuotas| v)
85 .with_context(|| format!("Failed to fetch run quotas for {}", run_id))
86}
87
88#[tracing::instrument(skip(executor), fields(run_id = %run_id))]
89pub async fn fetch_inputs<'a, E>(run_id: Uuid, executor: E) -> Result<Value>
91where
92 E: sqlx::Executor<'a, Database = sqlx::Postgres>,
93{
94 let row: (Value,) = crate::db::get_run_inputs_by_id(executor, run_id)
95 .await
96 .with_context(|| format!("Failed to fetch inputs for {}", run_id))?;
97 Ok(row.0)
98}
99
100#[tracing::instrument(skip(executor), fields(step_id = %step_id))]
101pub async fn fetch_step_instance<'a, E>(step_id: Uuid, executor: E) -> Result<StepInstance>
103where
104 E: sqlx::Executor<'a, Database = sqlx::Postgres>,
105{
106 crate::db::get_step_instance_by_id(executor, step_id)
107 .await?
108 .context("Step instance not found")
109}
110
111#[tracing::instrument(skip(pool, nats_client, tls_reloader), fields(run_id = %run_id))]
112pub async fn dispatch_pending_steps(
114 run_id: Uuid,
115 pool: PgPool,
116 nats_client: async_nats::Client,
117 tls_reloader: Arc<TlsReloader>,
118) -> Result<()> {
119 let quotas = fetch_quotas(run_id, &pool).await?;
121 let running_count: i64 = crate::db::count_running_steps_for_run(&pool, run_id).await?;
122
123 if running_count >= quotas.max_concurrency as i64 {
124 debug!(
125 "Run {}: Max concurrency ({}) reached, not dispatching more steps",
126 run_id, quotas.max_concurrency
127 );
128 return Ok(());
129 }
130
131 let available_slots = (quotas.max_concurrency as i64) - running_count;
132 debug!(
133 "Run {}: {} slots available for concurrent steps",
134 run_id, available_slots
135 );
136
137 let max_cpu = crate::resource_utils::parse_cpu("as.max_cpu).unwrap_or(0.0);
138 let max_mem = crate::resource_utils::parse_memory("as.max_memory).unwrap_or(0);
139
140 let pending_steps: Vec<StepInstance> =
142 crate::db::get_pending_step_instances_for_run(&pool, run_id, available_slots).await?;
143
144 for step in pending_steps {
145 info!("Run {}: Evaluating queued step {}", run_id, step.step_name);
146 let inst_data: (Value, Value) = crate::db::get_step_spec_and_params(&pool, step.id).await?;
148
149 let (cpu_req, mem_req) =
151 crate::resource_utils::get_step_resource_requirements(&step.step_type, &inst_data.0);
152
153 let can_dispatch = if max_cpu > 0.0 || max_mem > 0 {
154 let mut conn = pool.acquire().await?;
155 crate::db::claim_step_quota(&mut *conn, run_id, cpu_req, mem_req, max_cpu, max_mem)
156 .await?
157 } else {
158 true };
160
161 if !can_dispatch {
162 debug!(
163 "Run {}: Insufficient CPU/Memory quota to dispatch step {}",
164 run_id, step.step_name
165 );
166 continue; }
168
169 info!("Run {}: Dispatching queued step {}", run_id, step.step_name);
170
171 if let Err(e) = dispatch_step_instance(
175 run_id,
176 step.id,
177 &step.step_name,
178 &step.step_type,
179 &inst_data.0,
180 &inst_data.1,
181 nats_client.clone(),
182 pool.clone(),
183 tls_reloader.clone(),
184 )
185 .await
186 {
187 error!("Failed to dispatch step {}: {:?}", step.id, e);
188 if max_cpu > 0.0 || max_mem > 0 {
189 let mut conn = pool.acquire().await?;
190 let _ = crate::db::release_step_quota(&mut *conn, run_id, cpu_req, mem_req).await;
191 }
192 }
193 }
194
195 Ok(())
196}
197
198#[tracing::instrument(skip(pool), fields(run_id = %run_id))]
199pub async fn archive_workflow(run_id: Uuid, pool: PgPool) -> Result<()> {
201 use stormchaser_model::workflow::RunStatus;
202 let mut tx = pool.begin().await?;
203
204 let _ = crate::db::lock_workflow_run(&mut *tx, run_id).await?;
206
207 let run: Option<(RunStatus,)> = crate::db::get_workflow_run_status(&mut *tx, run_id).await?;
209
210 let run_status = match run {
211 Some(r) => r.0,
212 None => return Ok(()), };
214
215 match run_status {
216 RunStatus::Succeeded | RunStatus::Failed | RunStatus::Aborted => {
217 }
219 _ => return Ok(()), }
221
222 info!("Archiving workflow run {}", run_id);
223 crate::db::archive_and_delete_workflow_run(&mut *tx, run_id)
224 .await
225 .with_context(|| format!("Failed to archive and delete workflow run {}", run_id))?;
226
227 tx.commit().await?;
228
229 info!("Successfully archived workflow run {}", run_id);
230
231 Ok(())
232}