#![allow(clippy::explicit_auto_deref)]
use serde_json::Value;
use stormchaser_model::workflow::RunStatus;
use stormchaser_model::RunId;
use stormchaser_model::StepInstance;
use stormchaser_model::StepInstanceId;
pub mod integrations;
pub mod runner;
pub mod step;
pub mod workflow;
pub use integrations::*;
pub use runner::*;
pub use step::*;
pub use workflow::*;
use anyhow::{Context, Result};
use sqlx::PgPool;
use std::sync::Arc;
use stormchaser_model::workflow::{RunContext, RunQuotas, WorkflowRun};
use stormchaser_tls::TlsReloader;
use tracing::{debug, error, info};
/// Collects the recorded step outputs of a run into a single JSON object.
///
/// Each row returned by `crate::db::get_step_outputs_for_run` is a
/// `(step_name, key, value)` triple. The result is shaped as
/// `{ "<step_name>": { "outputs": { "<key>": <value>, ... } }, ... }`.
/// When the same `(step_name, key)` pair appears more than once, the last row wins.
///
/// # Errors
///
/// Returns the error of the underlying query, wrapped with a context message
/// naming the run.
#[tracing::instrument(skip(executor), fields(run_id = %run_id))]
pub async fn fetch_outputs(
    run_id: RunId,
    executor: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
) -> Result<Value> {
    let rows: Vec<(String, String, Value)> =
        crate::db::get_step_outputs_for_run(executor, run_id)
            .await
            .with_context(|| format!("Failed to fetch step outputs for {}", run_id))?;

    let mut steps_obj = serde_json::Map::new();
    for (step_name, key, value) in rows {
        // Build the `{"outputs": {}}` skeleton lazily: only the first row of a
        // step needs it, later rows reuse the existing entry.
        let step_entry = steps_obj
            .entry(step_name)
            .or_insert_with(|| serde_json::json!({"outputs": {}}));
        if let Some(outputs) = step_entry
            .get_mut("outputs")
            .and_then(Value::as_object_mut)
        {
            outputs.insert(key, value);
        }
    }
    Ok(Value::Object(steps_obj))
}
/// Loads the [`WorkflowRun`] identified by `run_id`.
///
/// # Errors
///
/// Returns the error of `crate::db::get_workflow_run_by_id`, wrapped with a
/// context message naming the run.
#[tracing::instrument(skip(executor), fields(run_id = %run_id))]
pub async fn fetch_run<'a, E>(run_id: RunId, executor: E) -> Result<WorkflowRun>
where
    E: sqlx::Executor<'a, Database = sqlx::Postgres>,
{
    // The explicit binding type pins the row type expected from the query.
    let workflow_run: WorkflowRun = crate::db::get_workflow_run_by_id(executor, run_id)
        .await
        .with_context(|| format!("Failed to fetch workflow run for {}", run_id))?;
    Ok(workflow_run)
}
/// Loads the [`RunContext`] associated with `run_id`.
///
/// # Errors
///
/// Returns the error of `crate::db::get_run_context_by_id`, wrapped with a
/// context message naming the run.
#[tracing::instrument(skip(executor), fields(run_id = %run_id))]
pub async fn fetch_run_context<'a, E>(run_id: RunId, executor: E) -> Result<RunContext>
where
    E: sqlx::Executor<'a, Database = sqlx::Postgres>,
{
    // The explicit binding type pins the row type expected from the query.
    let run_context: RunContext = crate::db::get_run_context_by_id(executor, run_id)
        .await
        .with_context(|| format!("Failed to fetch run context for {}", run_id))?;
    Ok(run_context)
}
/// Loads the [`RunQuotas`] configured for `run_id`.
///
/// # Errors
///
/// Returns the error of `crate::db::get_run_quota_by_id`, wrapped with a
/// context message naming the run.
#[tracing::instrument(skip(executor), fields(run_id = %run_id))]
pub async fn fetch_quotas<'a, E>(run_id: RunId, executor: E) -> Result<RunQuotas>
where
    E: sqlx::Executor<'a, Database = sqlx::Postgres>,
{
    // The explicit binding type pins the row type expected from the query.
    let run_quotas: RunQuotas = crate::db::get_run_quota_by_id(executor, run_id)
        .await
        .with_context(|| format!("Failed to fetch run quotas for {}", run_id))?;
    Ok(run_quotas)
}
/// Loads the JSON inputs stored for `run_id`.
///
/// The query yields a single-column row; its only field is returned.
///
/// # Errors
///
/// Returns the error of `crate::db::get_run_inputs_by_id`, wrapped with a
/// context message naming the run.
#[tracing::instrument(skip(executor), fields(run_id = %run_id))]
pub async fn fetch_inputs<'a, E>(run_id: RunId, executor: E) -> Result<Value>
where
    E: sqlx::Executor<'a, Database = sqlx::Postgres>,
{
    let fetched: Result<(Value,)> = crate::db::get_run_inputs_by_id(executor, run_id)
        .await
        .with_context(|| format!("Failed to fetch inputs for {}", run_id));
    // Unwrap the one-element tuple so callers receive the bare JSON value.
    fetched.map(|(inputs,)| inputs)
}
/// Loads the [`StepInstance`] identified by `step_id`.
///
/// # Errors
///
/// Propagates the error of `crate::db::get_step_instance_by_id`, and fails
/// with "Step instance not found" when the lookup succeeds but yields nothing.
#[tracing::instrument(skip(executor), fields(step_id = %step_id))]
pub async fn fetch_step_instance<'a, E>(
    step_id: StepInstanceId,
    executor: E,
) -> Result<StepInstance>
where
    E: sqlx::Executor<'a, Database = sqlx::Postgres>,
{
    let lookup: Option<StepInstance> =
        crate::db::get_step_instance_by_id(executor, step_id).await?;
    // A missing row is reported as an error rather than returned as `None`.
    lookup.context("Step instance not found")
}
#[tracing::instrument(skip(pool, nats_client, tls_reloader), fields(run_id = %run_id))]
pub async fn dispatch_pending_steps(
run_id: RunId,
pool: PgPool,
nats_client: async_nats::Client,
tls_reloader: Arc<TlsReloader>,
) -> Result<()> {
let quotas = fetch_quotas(run_id, &pool).await?;
let running_count: i64 = crate::db::count_running_steps_for_run(&pool, run_id).await?;
if running_count >= quotas.max_concurrency as i64 {
debug!(
"Run {}: Max concurrency ({}) reached, not dispatching more steps",
run_id, quotas.max_concurrency
);
return Ok(());
}
let available_slots = (quotas.max_concurrency as i64) - running_count;
debug!(
"Run {}: {} slots available for concurrent steps",
run_id, available_slots
);
let max_cpu = crate::resource_utils::parse_cpu("as.max_cpu).unwrap_or(0.0);
let max_mem = crate::resource_utils::parse_memory("as.max_memory).unwrap_or(0);
let pending_steps: Vec<StepInstance> =
crate::db::get_pending_step_instances_for_run(&pool, run_id, available_slots).await?;
for step in pending_steps {
info!("Run {}: Evaluating queued step {}", run_id, step.step_name);
let inst_data: (Value, Value) = crate::db::get_step_spec_and_params(&pool, step.id).await?;
let (cpu_req, mem_req) =
crate::resource_utils::get_step_resource_requirements(&step.step_type, &inst_data.0);
let can_dispatch = if max_cpu > 0.0 || max_mem > 0 {
let mut conn = pool.acquire().await?;
crate::db::claim_step_quota(&mut *conn, run_id, cpu_req, mem_req, max_cpu, max_mem)
.await?
} else {
true };
if !can_dispatch {
debug!(
"Run {}: Insufficient CPU/Memory quota to dispatch step {}",
run_id, step.step_name
);
continue; }
info!("Run {}: Dispatching queued step {}", run_id, step.step_name);
if let Err(e) = dispatch_step_instance(
run_id,
step.id,
&step.step_name,
&step.step_type,
&inst_data.0,
&inst_data.1,
nats_client.clone(),
pool.clone(),
tls_reloader.clone(),
)
.await
{
error!("Failed to dispatch step {}: {:?}", step.id, e);
if max_cpu > 0.0 || max_mem > 0 {
let mut conn = pool.acquire().await?;
let _ = crate::db::release_step_quota(&mut *conn, run_id, cpu_req, mem_req).await;
}
}
}
Ok(())
}
/// Archives and deletes a workflow run once it has reached a terminal state.
///
/// Everything happens inside one transaction: the run is locked via
/// `crate::db::lock_workflow_run`, its status is read, and only when that
/// status is `Succeeded`, `Failed` or `Aborted` is
/// `crate::db::archive_and_delete_workflow_run` called and the transaction
/// committed. If the run has no status row or is in any other state, the
/// function returns `Ok(())` without committing.
///
/// # Errors
///
/// Returns an error if beginning the transaction, locking, reading the
/// status, archiving, or committing fails.
#[tracing::instrument(skip(pool), fields(run_id = %run_id))]
pub async fn archive_workflow(run_id: RunId, pool: PgPool) -> Result<()> {
    let mut tx = pool.begin().await?;
    let _ = crate::db::lock_workflow_run(&mut *tx, run_id).await?;

    let status_row: Option<(RunStatus,)> =
        crate::db::get_workflow_run_status(&mut *tx, run_id).await?;
    // Only finished runs are archived; a missing row is treated like a
    // non-terminal status and leaves the run untouched.
    let is_terminal = matches!(
        status_row,
        Some((RunStatus::Succeeded | RunStatus::Failed | RunStatus::Aborted,))
    );
    if !is_terminal {
        return Ok(());
    }

    info!("Archiving workflow run {}", run_id);
    crate::db::archive_and_delete_workflow_run(&mut *tx, run_id)
        .await
        .with_context(|| format!("Failed to archive and delete workflow run {}", run_id))?;
    tx.commit().await?;
    info!("Successfully archived workflow run {}", run_id);
    Ok(())
}