runledger-postgres 0.1.1

PostgreSQL persistence layer for the Runledger durable job and workflow system
Documentation
use sqlx::types::Uuid;

use crate::{DbPool, Error, Result};

use super::common::{HEARTBEAT_LEASE_MISMATCH_CONTEXT, rollback_and_return_lease_mismatch};

pub async fn heartbeat_job(
    pool: &DbPool,
    job_id: Uuid,
    run_number: i32,
    attempt: i32,
    worker_id: &str,
    lease_duration_seconds: i32,
) -> Result<()> {
    let mut tx = pool
        .begin()
        .await
        .map_err(|error| Error::ConnectionError(error.to_string()))?;

    let updated = sqlx::query!(
        "UPDATE job_queue
         SET lease_expires_at = now() + make_interval(secs => $5::int4),
             last_heartbeat_at = now(),
             updated_at = now()
         WHERE id = $1
           AND run_number = $2
           AND attempt = $3
           AND worker_id = $4
           AND status = 'LEASED'",
        job_id,
        run_number,
        attempt,
        worker_id,
        lease_duration_seconds,
    )
    .execute(&mut *tx)
    .await
    .map_err(|error| Error::from_query_sqlx_with_context("heartbeat job", error))?
    .rows_affected();

    if updated == 0 {
        return rollback_and_return_lease_mismatch(tx, HEARTBEAT_LEASE_MISMATCH_CONTEXT).await;
    }

    sqlx::query!(
        "INSERT INTO job_events (
            job_id,
            run_number,
            attempt,
            event_type,
            payload
         )
         VALUES (
            $1,
            $2,
            $3,
            'HEARTBEAT',
            jsonb_build_object('worker_id', $4::text)
         )",
        job_id,
        run_number,
        attempt,
        worker_id,
    )
    .execute(&mut *tx)
    .await
    .map_err(|error| Error::from_query_sqlx_with_context("heartbeat job event", error))?;

    tx.commit()
        .await
        .map_err(|error| Error::ConnectionError(error.to_string()))?;

    Ok(())
}