runledger-postgres 0.3.0

PostgreSQL persistence layer for the Runledger durable job and workflow system
Documentation
use std::collections::{BTreeMap, BTreeSet};

use runledger_core::jobs::{
    JobStage, WorkflowDependencyReleaseMode, WorkflowRunEnqueue, WorkflowStepEnqueue,
    WorkflowStepExecutionKind,
};
use sqlx::types::Uuid;

use crate::{DbTx, Error, Result};

use super::errors::{workflow_definition_not_available_error, workflow_internal_state_error};
use super::validation::workflow_dependency_count_overflow_error;

pub(in crate::jobs::workflows) type DefaultsByJobType = BTreeMap<String, JobDefinitionDefaults>;
pub(in crate::jobs::workflows) type WorkflowStepIdsByKey = BTreeMap<String, Uuid>;

#[derive(Clone, Debug)]
pub(in crate::jobs::workflows) struct JobDefinitionDefaults {
    default_priority: i32,
    max_attempts: i32,
    default_timeout_seconds: i32,
}

pub(in crate::jobs::workflows) fn dependency_count_total(
    step: &WorkflowStepEnqueue<'_>,
) -> Result<i32> {
    let dependency_count = step.dependencies().len();
    i32::try_from(dependency_count).map_err(|error| {
        workflow_dependency_count_overflow_error(step.step_key().as_str(), dependency_count, error)
    })
}

pub(in crate::jobs::workflows) fn workflow_step_effective_organization_id(
    workflow_organization_id: Option<Uuid>,
    step: &WorkflowStepEnqueue<'_>,
) -> Option<Uuid> {
    step.organization_id().or(workflow_organization_id)
}

pub(in crate::jobs::workflows) fn workflow_step_effective_stage(
    step: &WorkflowStepEnqueue<'_>,
) -> Option<&'static str> {
    match step.execution_kind() {
        WorkflowStepExecutionKind::Job => {
            Some(step.stage().unwrap_or(JobStage::Queued).as_db_value())
        }
        WorkflowStepExecutionKind::External => None,
    }
}

pub(in crate::jobs::workflows) fn workflow_step_defaults<'a>(
    defaults_by_job_type: &'a DefaultsByJobType,
    step: &WorkflowStepEnqueue<'_>,
) -> Result<&'a JobDefinitionDefaults> {
    let job_type = step
        .job_type()
        .ok_or_else(|| workflow_internal_state_error("job workflow step missing job_type"))?;

    defaults_by_job_type
        .get(job_type.as_str())
        .ok_or_else(|| workflow_definition_not_available_error(job_type.as_str()))
}

pub(in crate::jobs::workflows) async fn insert_workflow_step_record_tx(
    tx: &mut DbTx<'_>,
    workflow_run_id: Uuid,
    organization_id: Option<Uuid>,
    step: &WorkflowStepEnqueue<'_>,
    defaults: Option<&JobDefinitionDefaults>,
    dependency_count_pending: i32,
    dependency_count_unsatisfied: i32,
) -> Result<Uuid> {
    let dependency_count_total = dependency_count_total(step)?;
    let (job_type, priority, max_attempts, timeout_seconds, stage) = match step.execution_kind() {
        WorkflowStepExecutionKind::Job => {
            let defaults = defaults.ok_or_else(|| {
                workflow_internal_state_error("missing job definition defaults for job step")
            })?;
            let job_type = step.job_type().ok_or_else(|| {
                workflow_internal_state_error("missing job_type for job workflow step")
            })?;

            (
                Some(job_type.as_str()),
                Some(step.priority().unwrap_or(defaults.default_priority)),
                Some(step.max_attempts().unwrap_or(defaults.max_attempts)),
                Some(
                    step.timeout_seconds()
                        .unwrap_or(defaults.default_timeout_seconds),
                ),
                workflow_step_effective_stage(step),
            )
        }
        WorkflowStepExecutionKind::External => (None, None, None, None, None),
    };
    let step_id: Uuid = sqlx::query_scalar!(
        "INSERT INTO workflow_steps (
            workflow_run_id,
            step_key,
            execution_kind,
            job_type,
            organization_id,
            payload,
            priority,
            max_attempts,
            timeout_seconds,
            stage,
            status,
            dependency_count_total,
            dependency_count_pending,
            dependency_count_unsatisfied
         )
         VALUES (
            $1,
            $2,
            $3::text::workflow_step_execution_kind,
            $4,
            $5,
            $6::jsonb,
            $7,
            $8,
            $9,
            $10,
            'BLOCKED',
            $11,
            $12,
            $13
         )
         RETURNING id",
        workflow_run_id,
        step.step_key() as _,
        step.execution_kind().as_db_value(),
        job_type,
        organization_id,
        step.payload(),
        priority,
        max_attempts,
        timeout_seconds,
        stage,
        dependency_count_total,
        dependency_count_pending,
        dependency_count_unsatisfied,
    )
    .fetch_one(&mut **tx)
    .await
    .map_err(|error| Error::from_query_sqlx_with_context("insert workflow step", error))?;

    Ok(step_id)
}

pub(in crate::jobs::workflows) async fn insert_workflow_steps_tx(
    tx: &mut DbTx<'_>,
    payload: &WorkflowRunEnqueue<'_>,
    workflow_run_id: Uuid,
    defaults_by_job_type: &DefaultsByJobType,
) -> Result<WorkflowStepIdsByKey> {
    let mut step_id_by_key = WorkflowStepIdsByKey::new();
    for step in payload.steps() {
        let defaults = match step.execution_kind() {
            WorkflowStepExecutionKind::Job => {
                Some(workflow_step_defaults(defaults_by_job_type, step)?)
            }
            WorkflowStepExecutionKind::External => None,
        };
        let step_id = insert_workflow_step_record_tx(
            tx,
            workflow_run_id,
            workflow_step_effective_organization_id(payload.organization_id(), step),
            step,
            defaults,
            dependency_count_total(step)?,
            0,
        )
        .await?;
        step_id_by_key.insert(step.step_key().as_str().to_owned(), step_id);
    }

    Ok(step_id_by_key)
}

pub(in crate::jobs::workflows) fn step_id_for_key(
    step_id_by_key: &WorkflowStepIdsByKey,
    step_key: &str,
    missing_error: &'static str,
) -> Result<Uuid> {
    step_id_by_key
        .get(step_key)
        .copied()
        .ok_or_else(|| workflow_internal_state_error(missing_error))
}

pub(in crate::jobs::workflows) async fn insert_workflow_step_dependency_record_tx(
    tx: &mut DbTx<'_>,
    workflow_run_id: Uuid,
    prerequisite_step_id: Uuid,
    dependent_step_id: Uuid,
    release_mode: &str,
) -> Result<()> {
    sqlx::query!(
        "INSERT INTO workflow_step_dependencies (
            workflow_run_id,
            prerequisite_step_id,
            dependent_step_id,
            release_mode
         )
         VALUES ($1, $2, $3, $4::text::workflow_dependency_release_mode)",
        workflow_run_id,
        prerequisite_step_id,
        dependent_step_id,
        release_mode,
    )
    .execute(&mut **tx)
    .await
    .map_err(|error| {
        Error::from_query_sqlx_with_context("insert workflow step dependency", error)
    })?;

    Ok(())
}

pub(in crate::jobs::workflows) async fn insert_workflow_step_dependencies_tx(
    tx: &mut DbTx<'_>,
    payload: &WorkflowRunEnqueue<'_>,
    workflow_run_id: Uuid,
    step_id_by_key: &WorkflowStepIdsByKey,
) -> Result<()> {
    for step in payload.steps() {
        let dependent_step_id = step_id_for_key(
            step_id_by_key,
            step.step_key().as_str(),
            "missing dependent workflow step id",
        )?;
        for dependency in step.dependencies() {
            let prerequisite_step_id = step_id_for_key(
                step_id_by_key,
                dependency.prerequisite_step_key.as_str(),
                "missing prerequisite workflow step id",
            )?;
            let release_mode = dependency
                .release_mode
                .unwrap_or(WorkflowDependencyReleaseMode::OnTerminal)
                .as_db_value();
            insert_workflow_step_dependency_record_tx(
                tx,
                workflow_run_id,
                prerequisite_step_id,
                dependent_step_id,
                release_mode,
            )
            .await?;
        }
    }

    Ok(())
}

pub(in crate::jobs::workflows) async fn fetch_job_definition_defaults_tx(
    tx: &mut DbTx<'_>,
    steps: &[WorkflowStepEnqueue<'_>],
) -> Result<DefaultsByJobType> {
    let job_types: Vec<String> = steps
        .iter()
        .filter(|step| step.execution_kind() == WorkflowStepExecutionKind::Job)
        .map(|step| {
            step.job_type()
                .map(|job_type| job_type.as_str().to_owned())
                .ok_or_else(|| workflow_internal_state_error("job workflow step missing job_type"))
        })
        .collect::<Result<BTreeSet<_>>>()?
        .into_iter()
        .collect();

    let rows = sqlx::query!(
        "SELECT job_type, default_priority, max_attempts, default_timeout_seconds
         FROM job_definitions jd
         WHERE is_enabled = true
           AND job_type = ANY($1::text[])
         FOR SHARE OF jd",
        &job_types,
    )
    .fetch_all(&mut **tx)
    .await
    .map_err(|error| {
        Error::from_query_sqlx_with_context("lookup workflow step job definition defaults", error)
    })?;

    let defaults_by_job_type: DefaultsByJobType = rows
        .into_iter()
        .map(|row| {
            (
                row.job_type,
                JobDefinitionDefaults {
                    default_priority: row.default_priority,
                    max_attempts: row.max_attempts,
                    default_timeout_seconds: row.default_timeout_seconds,
                },
            )
        })
        .collect();

    if let Some(step) = steps
        .iter()
        .filter(|step| step.execution_kind() == WorkflowStepExecutionKind::Job)
        .find(|step| {
            step.job_type()
                .is_none_or(|job_type| !defaults_by_job_type.contains_key(job_type.as_str()))
        })
    {
        return Err(workflow_definition_not_available_error(
            step.job_type()
                .map(|job_type| job_type.as_str())
                .unwrap_or("<missing-job-type>"),
        ));
    }

    Ok(defaults_by_job_type)
}