runledger-postgres 0.1.1

PostgreSQL persistence layer for the Runledger durable job and workflow system
Documentation
mod enqueue;
mod hooks;
mod locking;
mod mutate;
mod read;
mod runtime;
mod validation;

pub use self::enqueue::{enqueue_workflow_run, enqueue_workflow_run_tx};
pub(crate) use self::hooks::{on_claim_released, on_claimed, on_retry_scheduled, on_terminal};
pub(crate) use self::locking::{
    lock_workflow_run_release_tx, try_lock_workflow_run_release_shared_tx,
};
pub use self::mutate::{
    append_workflow_steps, append_workflow_steps_tx, cancel_workflow_run_tx,
    list_workflow_step_keys_for_update_tx, update_workflow_step_and_pending_job_payload_tx,
};
pub use self::read::{
    get_latest_workflow_run_by_type, get_workflow_run_by_id,
    get_workflow_run_by_type_and_idempotency_key, get_workflow_run_by_type_and_idempotency_key_tx,
    get_workflow_run_id_for_job, list_workflow_runs, list_workflow_step_dependencies,
    list_workflow_steps,
};
pub use self::runtime::{complete_external_workflow_step, complete_external_workflow_step_tx};
#[cfg(feature = "test-support")]
pub mod test_support {
    pub use super::locking::test_support::workflow_run_release_lock_key;
}

use runledger_core::jobs::{
    JobStage, JobTypeName, WorkflowRunStatus, WorkflowStepExecutionKind, WorkflowStepStatus,
};
use sqlx::types::Uuid;

use crate::{Error, QueryError, QueryErrorCategory};

#[derive(Debug, Clone)]
struct JobDefinitionDefaults {
    default_priority: i32,
    max_attempts: i32,
    default_timeout_seconds: i32,
}

#[derive(Debug, Clone)]
pub(crate) struct StepReleaseCandidate {
    id: Uuid,
    workflow_run_id: Uuid,
    execution_kind: WorkflowStepExecutionKind,
    job_type: Option<JobTypeName>,
    organization_id: Option<Uuid>,
    payload: serde_json::Value,
    priority: Option<i32>,
    max_attempts: Option<i32>,
    timeout_seconds: Option<i32>,
    stage: Option<JobStage>,
}

fn workflow_definition_not_available_error(job_type: &str) -> Error {
    Error::QueryError(QueryError::from_classified(
        QueryErrorCategory::Validation,
        "workflow.definition_not_found_or_disabled",
        "Workflow step job type is not available.",
        format!("workflow step job definition missing or disabled: {job_type}"),
    ))
}

fn workflow_internal_state_error(message: impl Into<String>) -> Error {
    Error::QueryError(QueryError::from_classified(
        QueryErrorCategory::Internal,
        "workflow.internal_state",
        "Workflow state is invalid.",
        message,
    ))
}

fn workflow_release_conflict_error(workflow_run_id: Uuid) -> Error {
    Error::QueryError(QueryError::from_classified(
        QueryErrorCategory::Conflict,
        "workflow.release_conflict",
        "Workflow step release conflicted with another workflow mutation.",
        format!("workflow run {workflow_run_id} is locked for an exclusive mutation"),
    ))
}

fn workflow_external_step_not_found_error() -> Error {
    Error::QueryError(QueryError::from_classified(
        QueryErrorCategory::Validation,
        "workflow.external_step_not_found",
        "External workflow step was not found.",
        "external workflow step lookup failed",
    ))
}

fn workflow_external_step_not_external_error(step_key: &str) -> Error {
    Error::QueryError(QueryError::from_classified(
        QueryErrorCategory::Validation,
        "workflow.external_step_not_external",
        "Workflow step is not an external gate.",
        format!("workflow step '{step_key}' is not an EXTERNAL step"),
    ))
}

fn workflow_external_step_not_waiting_error(step_key: &str, status: WorkflowStepStatus) -> Error {
    Error::QueryError(QueryError::from_classified(
        QueryErrorCategory::Validation,
        "workflow.external_step_not_waiting",
        "External workflow step is not waiting for completion.",
        format!(
            "workflow step '{step_key}' is not waiting for external completion; current status={}",
            status.as_db_value()
        ),
    ))
}

fn workflow_external_completion_conflict_error(
    step_key: &str,
    current_status: WorkflowStepStatus,
    requested_status: WorkflowStepStatus,
) -> Error {
    Error::QueryError(QueryError::from_classified(
        QueryErrorCategory::Validation,
        "workflow.external_step_conflicting_completion_retry",
        "External workflow step was already completed with a different result.",
        format!(
            "workflow step '{step_key}' was already completed as {} and cannot be retried as {}",
            current_status.as_db_value(),
            requested_status.as_db_value()
        ),
    ))
}

fn workflow_external_completion_invalid_status_error(status: WorkflowStepStatus) -> Error {
    Error::QueryError(QueryError::from_classified(
        QueryErrorCategory::Validation,
        "workflow.external_step_invalid_terminal_status",
        "External workflow step completion status is invalid.",
        format!(
            "external workflow step completion requires SUCCEEDED, FAILED, or CANCELED; got {}",
            status.as_db_value()
        ),
    ))
}

fn workflow_run_not_found_error() -> Error {
    Error::QueryError(QueryError::from_classified(
        QueryErrorCategory::Validation,
        "workflow.run_not_found",
        "Workflow run was not found.",
        "workflow run lookup failed",
    ))
}

fn workflow_append_blank_mutation_key_error() -> Error {
    Error::QueryError(QueryError::from_classified(
        QueryErrorCategory::Validation,
        "workflow.append_mutation_key_blank",
        "Workflow append mutation key must be non-empty.",
        "workflow append mutation key must be non-empty",
    ))
}

fn workflow_append_terminal_run_error(status: WorkflowRunStatus) -> Error {
    Error::QueryError(QueryError::from_classified(
        QueryErrorCategory::Validation,
        "workflow.append_terminal_run",
        "Workflow steps cannot be appended to a terminal workflow run.",
        format!(
            "workflow append rejected because run is terminal: status={}",
            status.as_db_value()
        ),
    ))
}

fn workflow_append_conflicting_retry_error(mutation_key: &str) -> Error {
    Error::QueryError(QueryError::from_classified(
        QueryErrorCategory::Validation,
        "workflow.append_conflicting_retry",
        "Workflow append retry conflicts with the existing mutation.",
        format!(
            "workflow append mutation '{mutation_key}' does not match the already-recorded request"
        ),
    ))
}

fn workflow_append_window_missing_error(step_key: &str) -> Error {
    Error::QueryError(QueryError::from_classified(
        QueryErrorCategory::Validation,
        "workflow.append_window_missing",
        "Workflow append window was not found.",
        format!("workflow append window step '{step_key}' was not found"),
    ))
}

fn workflow_append_window_not_external_error(step_key: &str) -> Error {
    Error::QueryError(QueryError::from_classified(
        QueryErrorCategory::Validation,
        "workflow.append_window_not_external",
        "Workflow append window must be an external gate.",
        format!("workflow append window step '{step_key}' is not an EXTERNAL step"),
    ))
}

fn workflow_append_window_not_open_error(step_key: &str, status: WorkflowStepStatus) -> Error {
    Error::QueryError(QueryError::from_classified(
        QueryErrorCategory::Validation,
        "workflow.append_window_not_open",
        "Workflow append window is no longer open.",
        format!(
            "workflow append window step '{step_key}' is not waiting for external completion; current status={}",
            status.as_db_value()
        ),
    ))
}

pub(crate) use self::validation::workflow_dag_validation_error;
pub(crate) use self::validation::workflow_dependency_count_overflow_error;