graphile_worker_queries 0.1.1

Database query helpers for graphile_worker
Documentation
use std::time::Duration;

use chrono::Utc;
use graphile_worker_database::{DbExecutorArg, DbParams, DbValue, Schema};
use indoc::formatdoc;

use crate::duration::duration_as_millis_i64;
use crate::errors::Result;
use crate::rows::{collect_column, get_required};
use crate::schema_names::{PrivateTable, WorkerFunction};

/// Lists the worker ids returned by the schema-qualified
/// `WorkerFunction::ListStaleWorkers` database function for `stale_threshold`.
///
/// `schema` is converted into a [`Schema`] and used only to qualify the
/// function name; the query itself is run by
/// `list_workers_from_threshold_function`.
///
/// # Errors
///
/// Propagates whatever error the delegated query helper returns.
pub async fn list_stale_workers(
    mut executor: impl DbExecutorArg,
    schema: impl Into<Schema>,
    stale_threshold: Duration,
) -> Result<Vec<String>> {
    let schema: Schema = schema.into();
    let function_name = WorkerFunction::ListStaleWorkers
        .qualified(&schema)
        .to_string();

    list_workers_from_threshold_function(&mut executor, function_name, stale_threshold).await
}

/// Lists the worker ids returned by the schema-qualified
/// `WorkerFunction::ListOrphanLockedWorkers` database function for
/// `stale_threshold`.
///
/// `schema` is converted into a [`Schema`] and used only to qualify the
/// function name; the query itself is run by
/// `list_workers_from_threshold_function`.
///
/// # Errors
///
/// Propagates whatever error the delegated query helper returns.
pub async fn list_orphan_locked_workers(
    mut executor: impl DbExecutorArg,
    schema: impl Into<Schema>,
    stale_threshold: Duration,
) -> Result<Vec<String>> {
    let schema: Schema = schema.into();
    let function_name = WorkerFunction::ListOrphanLockedWorkers
        .qualified(&schema)
        .to_string();

    list_workers_from_threshold_function(&mut executor, function_name, stale_threshold).await
}

/// Runs `SELECT worker_id FROM <function>(<interval>)` and collects the
/// `worker_id` column of every returned row.
///
/// `function` is interpolated verbatim into the SQL text, so callers must pass
/// an already-qualified function name. `threshold` is converted with
/// `duration_as_millis_i64` and bound as `$1`, which the query multiplies by
/// `interval '1 millisecond'`.
///
/// # Errors
///
/// Propagates errors from `fetch_all` and from `collect_column`.
async fn list_workers_from_threshold_function(
    executor: &mut impl DbExecutorArg,
    function: String,
    threshold: Duration,
) -> Result<Vec<String>> {
    let sql = formatdoc!(
        r#"
            SELECT worker_id
            FROM {function}($1::bigint * interval '1 millisecond');
        "#
    );

    // The single bind parameter is the threshold expressed in milliseconds.
    let threshold_millis = DbValue::I64(duration_as_millis_i64(threshold));
    let params = DbParams::from(vec![threshold_millis]);

    let rows = executor.fetch_all(&sql, params).await?;
    collect_column(&rows, "worker_id")
}

/// Reports whether any row of the jobs table is locked by `worker_id` and has
/// `flags` matching at least one entry of `resilient_flags` (the `?|`
/// operator against a `text[]` parameter).
///
/// Returns `Ok(false)` without touching the database when `resilient_flags`
/// is empty.
///
/// # Errors
///
/// Propagates errors from `fetch_one` and from reading the
/// `has_resilient_locks` column.
pub async fn worker_holds_resilient_locks(
    mut executor: impl DbExecutorArg,
    schema: impl Into<Schema>,
    worker_id: &str,
    resilient_flags: &[String],
) -> Result<bool> {
    // With no flags to match there is nothing to query.
    if resilient_flags.is_empty() {
        return Ok(false);
    }

    let schema: Schema = schema.into();
    let jobs_table = PrivateTable::Jobs.qualified(&schema);
    let sql = formatdoc!(
        r#"
            SELECT EXISTS (
                SELECT 1
                FROM {jobs} AS jobs
                WHERE jobs.locked_by = $1::text
                AND jobs.flags ?| $2::text[]
            ) AS has_resilient_locks;
        "#,
        jobs = jobs_table
    );

    // $1 is the worker id, $2 the list of flags.
    let params = DbParams::from(vec![
        DbValue::Text(worker_id.to_owned()),
        DbValue::TextArray(resilient_flags.to_vec()),
    ]);

    let row = executor.fetch_one(&sql, params).await?;
    get_required(&row, "has_resilient_locks")
}

/// Looks up `last_heartbeat_at` for the row of the workers table whose `id`
/// equals `worker_id`.
///
/// Returns `Ok(None)` when the query yields no row, and `Ok(Some(timestamp))`
/// otherwise.
///
/// # Errors
///
/// Propagates errors from `fetch_optional` and from reading the
/// `last_heartbeat_at` column.
pub async fn get_worker_last_heartbeat(
    mut executor: impl DbExecutorArg,
    schema: impl Into<Schema>,
    worker_id: &str,
) -> Result<Option<chrono::DateTime<Utc>>> {
    let schema: Schema = schema.into();
    let workers_table = PrivateTable::Workers.qualified(&schema);
    let sql = formatdoc!(
        r#"
            SELECT workers.last_heartbeat_at
            FROM {workers} AS workers
            WHERE workers.id = $1::text;
        "#,
        workers = workers_table
    );

    let params = DbParams::from(vec![DbValue::Text(worker_id.to_owned())]);
    let maybe_row = executor.fetch_optional(&sql, params).await?;

    match maybe_row {
        Some(row) => get_required(&row, "last_heartbeat_at").map(Some),
        None => Ok(None),
    }
}

/// Calls the schema-qualified `WorkerFunction::DeleteStaleWorkers` database
/// function with `worker_ids` bound as a `text[]` parameter.
///
/// Returns `Ok(())` without touching the database when `worker_ids` is empty.
/// Whatever `execute` returns on success is discarded.
///
/// # Errors
///
/// Propagates any error returned by `execute`.
pub async fn delete_stale_workers(
    mut executor: impl DbExecutorArg,
    schema: impl Into<Schema>,
    worker_ids: &[String],
) -> Result<()> {
    // Nothing to delete: skip the database call entirely.
    if worker_ids.is_empty() {
        return Ok(());
    }

    let schema: Schema = schema.into();
    let function_name = WorkerFunction::DeleteStaleWorkers.qualified(&schema);
    let sql = formatdoc!(
        r#"
            SELECT * FROM {delete_stale_workers}($1::text[]);
        "#,
        delete_stale_workers = function_name
    );

    let params = vec![DbValue::TextArray(worker_ids.to_vec())].into();
    executor.execute(&sql, params).await?;

    Ok(())
}