Skip to main content

stormchaser_engine/handler/
mod.rs

1#![allow(clippy::explicit_auto_deref)]
2use serde_json::Value;
3use stormchaser_model::workflow::RunStatus;
4use stormchaser_model::RunId;
5use stormchaser_model::StepInstance;
6use stormchaser_model::StepInstanceId;
7/// Module for integrations.
8pub mod integrations;
9/// Module for runner.
10pub mod runner;
11/// Module for step.
12pub mod step;
13/// Module for workflow.
14pub mod workflow;
15
16pub use integrations::*;
17pub use runner::*;
18pub use step::*;
19pub use workflow::*;
20
21use anyhow::{Context, Result};
22use sqlx::PgPool;
23use std::sync::Arc;
24use stormchaser_model::workflow::{RunContext, RunQuotas, WorkflowRun};
25use stormchaser_tls::TlsReloader;
26use tracing::{debug, error, info};
27
28#[tracing::instrument(skip(executor), fields(run_id = %run_id))]
29/// Fetch outputs.
30pub async fn fetch_outputs(
31    run_id: RunId,
32    executor: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
33) -> Result<Value> {
34    let rows: Vec<(String, String, Value)> =
35        crate::db::get_step_outputs_for_run(executor, run_id).await?;
36
37    let mut steps_obj = serde_json::Map::new();
38    for (step_name, key, value) in rows {
39        let step_entry = steps_obj
40            .entry(step_name)
41            .or_insert(serde_json::json!({"outputs": {}}));
42        if let Some(outputs) = step_entry
43            .as_object_mut()
44            .and_then(|o| o.get_mut("outputs"))
45            .and_then(|o| o.as_object_mut())
46        {
47            outputs.insert(key, value);
48        }
49    }
50
51    Ok(Value::Object(steps_obj))
52}
53
54#[tracing::instrument(skip(executor), fields(run_id = %run_id))]
55/// Fetch run.
56pub async fn fetch_run<'a, E>(run_id: RunId, executor: E) -> Result<WorkflowRun>
57where
58    E: sqlx::Executor<'a, Database = sqlx::Postgres>,
59{
60    crate::db::get_workflow_run_by_id(executor, run_id)
61        .await
62        .map(|v: WorkflowRun| v)
63        .with_context(|| format!("Failed to fetch workflow run for {}", run_id))
64}
65
66#[tracing::instrument(skip(executor), fields(run_id = %run_id))]
67/// Fetch run context.
68pub async fn fetch_run_context<'a, E>(run_id: RunId, executor: E) -> Result<RunContext>
69where
70    E: sqlx::Executor<'a, Database = sqlx::Postgres>,
71{
72    crate::db::get_run_context_by_id(executor, run_id)
73        .await
74        .map(|v: RunContext| v)
75        .with_context(|| format!("Failed to fetch run context for {}", run_id))
76}
77
78#[tracing::instrument(skip(executor), fields(run_id = %run_id))]
79/// Fetch quotas.
80pub async fn fetch_quotas<'a, E>(run_id: RunId, executor: E) -> Result<RunQuotas>
81where
82    E: sqlx::Executor<'a, Database = sqlx::Postgres>,
83{
84    crate::db::get_run_quota_by_id(executor, run_id)
85        .await
86        .map(|v: RunQuotas| v)
87        .with_context(|| format!("Failed to fetch run quotas for {}", run_id))
88}
89
90#[tracing::instrument(skip(executor), fields(run_id = %run_id))]
91/// Fetch inputs.
92pub async fn fetch_inputs<'a, E>(run_id: RunId, executor: E) -> Result<Value>
93where
94    E: sqlx::Executor<'a, Database = sqlx::Postgres>,
95{
96    let row: (Value,) = crate::db::get_run_inputs_by_id(executor, run_id)
97        .await
98        .with_context(|| format!("Failed to fetch inputs for {}", run_id))?;
99    Ok(row.0)
100}
101
102#[tracing::instrument(skip(executor), fields(step_id = %step_id))]
103/// Fetch step instance.
104pub async fn fetch_step_instance<'a, E>(
105    step_id: StepInstanceId,
106    executor: E,
107) -> Result<StepInstance>
108where
109    E: sqlx::Executor<'a, Database = sqlx::Postgres>,
110{
111    crate::db::get_step_instance_by_id(executor, step_id)
112        .await?
113        .context("Step instance not found")
114}
115
116#[tracing::instrument(skip(pool, nats_client, tls_reloader), fields(run_id = %run_id))]
117/// Dispatch pending steps.
118pub async fn dispatch_pending_steps(
119    run_id: RunId,
120    pool: PgPool,
121    nats_client: async_nats::Client,
122    tls_reloader: Arc<TlsReloader>,
123) -> Result<()> {
124    // 1. Fetch Quotas and Current Running Count
125    let quotas = fetch_quotas(run_id, &pool).await?;
126    let running_count: i64 = crate::db::count_running_steps_for_run(&pool, run_id).await?;
127
128    if running_count >= quotas.max_concurrency as i64 {
129        debug!(
130            "Run {}: Max concurrency ({}) reached, not dispatching more steps",
131            run_id, quotas.max_concurrency
132        );
133        return Ok(());
134    }
135
136    let available_slots = (quotas.max_concurrency as i64) - running_count;
137    debug!(
138        "Run {}: {} slots available for concurrent steps",
139        run_id, available_slots
140    );
141
142    let max_cpu = crate::resource_utils::parse_cpu(&quotas.max_cpu).unwrap_or(0.0);
143    let max_mem = crate::resource_utils::parse_memory(&quotas.max_memory).unwrap_or(0);
144
145    // 2. Fetch Pending steps (excluding Approval/Wait which are handled via events)
146    let pending_steps: Vec<StepInstance> =
147        crate::db::get_pending_step_instances_for_run(&pool, run_id, available_slots).await?;
148
149    for step in pending_steps {
150        info!("Run {}: Evaluating queued step {}", run_id, step.step_name);
151        // We need to fetch the resolved spec and params for this step
152        let inst_data: (Value, Value) = crate::db::get_step_spec_and_params(&pool, step.id).await?;
153
154        // 3. Enforce CPU and Memory quotas before dispatching
155        let (cpu_req, mem_req) =
156            crate::resource_utils::get_step_resource_requirements(&step.step_type, &inst_data.0);
157
158        let can_dispatch = if max_cpu > 0.0 || max_mem > 0 {
159            let mut conn = pool.acquire().await?;
160            crate::db::claim_step_quota(&mut *conn, run_id, cpu_req, mem_req, max_cpu, max_mem)
161                .await?
162        } else {
163            true // If no quota configured, allow dispatch
164        };
165
166        if !can_dispatch {
167            debug!(
168                "Run {}: Insufficient CPU/Memory quota to dispatch step {}",
169                run_id, step.step_name
170            );
171            continue; // Try next step, maybe it's smaller and fits
172        }
173
174        info!("Run {}: Dispatching queued step {}", run_id, step.step_name);
175
176        // Dispatch can fail, but we'll assume success for the quota claim.
177        // If it fails, the step state handles it, and we might leak quota until the step completes/fails.
178        // Ideally we'd release it on immediate dispatch failure, but we'll stick to the completion hooks for now.
179        if let Err(e) = dispatch_step_instance(
180            run_id,
181            step.id,
182            &step.step_name,
183            &step.step_type,
184            &inst_data.0,
185            &inst_data.1,
186            nats_client.clone(),
187            pool.clone(),
188            tls_reloader.clone(),
189        )
190        .await
191        {
192            error!("Failed to dispatch step {}: {:?}", step.id, e);
193            if max_cpu > 0.0 || max_mem > 0 {
194                let mut conn = pool.acquire().await?;
195                let _ = crate::db::release_step_quota(&mut *conn, run_id, cpu_req, mem_req).await;
196            }
197        }
198    }
199
200    Ok(())
201}
202
203#[tracing::instrument(skip(pool), fields(run_id = %run_id))]
204/// Archive workflow.
205pub async fn archive_workflow(run_id: RunId, pool: PgPool) -> Result<()> {
206    let mut tx = pool.begin().await?;
207
208    // Lock the workflow run to serialize archiving
209    let _ = crate::db::lock_workflow_run(&mut *tx, run_id).await?;
210
211    // Check if it's already archived or if it's terminal
212    let run: Option<(RunStatus,)> = crate::db::get_workflow_run_status(&mut *tx, run_id).await?;
213
214    let run_status = match run {
215        Some(r) => r.0,
216        None => return Ok(()), // Already archived or doesn't exist
217    };
218
219    match run_status {
220        RunStatus::Succeeded | RunStatus::Failed | RunStatus::Aborted => {
221            // Proceed with archiving
222        }
223        _ => return Ok(()), // Not terminal
224    }
225
226    info!("Archiving workflow run {}", run_id);
227    crate::db::archive_and_delete_workflow_run(&mut *tx, run_id)
228        .await
229        .with_context(|| format!("Failed to archive and delete workflow run {}", run_id))?;
230
231    tx.commit().await?;
232
233    info!("Successfully archived workflow run {}", run_id);
234
235    Ok(())
236}