stormchaser-engine 1.3.1

A robust, distributed workflow engine for event-driven and human-triggered workflows.
Documentation
use sqlx::{Executor, Postgres};
use stormchaser_model::RunId;

/// Get run quota by id.
///
/// Selects the `run_id`, `max_concurrency`, `max_cpu`, `max_memory`, `max_storage`,
/// `timeout`, `current_cpu_usage` and `current_memory_usage` columns of the
/// `run_quotas` row whose `run_id` equals the given id, and maps that row into `O`
/// through its `sqlx::FromRow` implementation.
///
/// # Errors
///
/// Returns the `sqlx::Error` produced by the query. Because the row is loaded with
/// `fetch_one`, a missing row is reported as an error rather than as an empty result.
/// A row that cannot be converted into `O` is reported as an error as well.
pub async fn get_run_quota_by_id<'a, E, O>(executor: E, run_id: RunId) -> Result<O, sqlx::Error>
where
    E: Executor<'a, Database = Postgres>,
    O: Send + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
{
    sqlx::query_as::<_, O>(
        r#"SELECT run_id, max_concurrency, max_cpu, max_memory, max_storage, timeout, current_cpu_usage, current_memory_usage FROM run_quotas WHERE run_id = $1"#
    )
    .bind(run_id)
    .fetch_one(executor)
    .await
}

/// Claim step quota.
///
/// Reads the `run_quotas` row for `run_id` with `SELECT ... FOR UPDATE` and, when the
/// request fits, adds `cpu_req` to `current_cpu_usage` and `mem_req` to
/// `current_memory_usage`.
///
/// The request fits when both `current_cpu_usage + cpu_req <= max_cpu` and
/// `current_memory_usage + mem_req <= max_mem` hold. A memory total that would
/// overflow `i64` is treated as not fitting.
///
/// `current_memory_usage` is read and written as text; it is parsed as an `i64` here.
///
/// NOTE(review): the `FOR UPDATE` row lock presumably relies on the caller running
/// this on a connection with an open transaction — confirm against callers.
///
/// # Returns
///
/// `Ok(true)` when the usage was recorded, `Ok(false)` when the request does not fit
/// (nothing is written in that case).
///
/// # Errors
///
/// Returns the `sqlx::Error` from either query (including a missing row, since the row
/// is loaded with `fetch_one`), from reading either column, or a `sqlx::Error::Decode`
/// when the stored memory usage is not a valid `i64`.
pub async fn claim_step_quota(
    executor: &mut sqlx::PgConnection,
    run_id: RunId,
    cpu_req: f64,
    mem_req: i64,
    max_cpu: f64,
    max_mem: i64,
) -> Result<bool, sqlx::Error> {
    use sqlx::Row;

    let row = sqlx::query(
        "SELECT current_cpu_usage, current_memory_usage FROM run_quotas WHERE run_id = $1 FOR UPDATE",
    )
    .bind(run_id)
    .fetch_one(&mut *executor)
    .await?;

    // `try_get` surfaces a missing column or type mismatch as an error instead of
    // panicking, matching how the memory column is read.
    let current_cpu: f64 = row.try_get("current_cpu_usage")?;
    let current_mem_str: String = row.try_get("current_memory_usage")?;
    let current_mem: i64 = current_mem_str
        .parse()
        .map_err(|e: std::num::ParseIntError| sqlx::Error::Decode(Box::new(e)))?;

    // Checked addition: an overflowing total must never wrap around into a value
    // that happens to pass the quota comparison.
    let new_mem = match current_mem.checked_add(mem_req) {
        Some(total) => total,
        None => return Ok(false),
    };

    // Keep the `<=` form so that a NaN CPU value fails the check rather than passing it.
    let fits = current_cpu + cpu_req <= max_cpu && new_mem <= max_mem;
    if !fits {
        return Ok(false);
    }

    sqlx::query(
        "UPDATE run_quotas SET current_cpu_usage = current_cpu_usage + $1, current_memory_usage = $2 WHERE run_id = $3",
    )
    .bind(cpu_req)
    .bind(new_mem.to_string())
    .bind(run_id)
    .execute(&mut *executor)
    .await?;

    Ok(true)
}

/// Release step quota.
///
/// Reads the `run_quotas` row for `run_id` with `SELECT ... FOR UPDATE` and subtracts
/// `cpu_req` from `current_cpu_usage` and `mem_req` from `current_memory_usage`.
/// Both results are clamped so they never drop below zero: the CPU value through
/// `GREATEST(0, ...)` in SQL, the memory value in Rust before it is written back.
///
/// `current_memory_usage` is read and written as text; it is parsed as an `i64` here.
///
/// When no row exists for `run_id` the function does nothing and returns `Ok(())`.
///
/// NOTE(review): the `FOR UPDATE` row lock presumably relies on the caller running
/// this on a connection with an open transaction — confirm against callers.
///
/// # Errors
///
/// Returns the `sqlx::Error` from either query, from reading the memory column, or a
/// `sqlx::Error::Decode` when the stored memory usage is not a valid `i64`.
pub async fn release_step_quota(
    executor: &mut sqlx::PgConnection,
    run_id: RunId,
    cpu_req: f64,
    mem_req: i64,
) -> Result<(), sqlx::Error> {
    use sqlx::Row;

    let row =
        sqlx::query("SELECT current_memory_usage FROM run_quotas WHERE run_id = $1 FOR UPDATE")
            .bind(run_id)
            .fetch_optional(&mut *executor)
            .await?;

    if let Some(row) = row {
        let current_mem_str: String = row.try_get("current_memory_usage")?;
        let current_mem: i64 = current_mem_str
            .parse()
            .map_err(|e: std::num::ParseIntError| sqlx::Error::Decode(Box::new(e)))?;
        // Saturating subtraction: a negative or extreme `mem_req` must not overflow
        // `i64` (panic in debug builds, wrap-around in release builds).
        let new_mem = current_mem.saturating_sub(mem_req).max(0).to_string();

        sqlx::query(
            "UPDATE run_quotas SET current_cpu_usage = GREATEST(0, current_cpu_usage - $1), current_memory_usage = $2 WHERE run_id = $3",
        )
        .bind(cpu_req)
        .bind(new_mem)
        .bind(run_id)
        .execute(&mut *executor)
        .await?;
    }

    Ok(())
}