graphile_worker_utils 0.1.1

Worker utility helpers for graphile_worker
Documentation
use std::time::Duration;

use graphile_worker_migrations::{migrate as run_migrations, MigrateError};

use super::client::WorkerUtils;
use super::types::CleanupTask;
use graphile_worker_queries::errors::GraphileWorkerError;
use graphile_worker_queries::task_identifiers::get_tasks_details;
use graphile_worker_queries::worker_heartbeat::active::list_active_workers as list_heartbeat_workers;
use graphile_worker_recovery::{
    sweep_stale_workers as run_recovery_sweep, ActiveWorkerRow, WorkerRecoveryConfig,
};
use graphile_worker_recovery::{SweepStaleWorkersOptions, SweepStaleWorkersResult};

pub(super) async fn list_active_workers(
    utils: &WorkerUtils,
    sweep_threshold: Duration,
) -> Result<Vec<ActiveWorkerRow>, GraphileWorkerError> {
    list_heartbeat_workers(&utils.database, &utils.schema, sweep_threshold).await
}

pub(super) async fn sweep_stale_workers(
    utils: &WorkerUtils,
    options: SweepStaleWorkersOptions,
) -> Result<SweepStaleWorkersResult, GraphileWorkerError> {
    let recovery_config = WorkerRecoveryConfig::default();
    sweep_stale_workers_with_config(utils, &recovery_config, options).await
}

pub(super) async fn sweep_stale_workers_with_config(
    utils: &WorkerUtils,
    recovery_config: &WorkerRecoveryConfig,
    options: SweepStaleWorkersOptions,
) -> Result<SweepStaleWorkersResult, GraphileWorkerError> {
    run_recovery_sweep(
        &utils.database,
        &utils.schema,
        utils.hooks.as_ref(),
        "worker_utils",
        recovery_config,
        options,
    )
    .await
}

pub(super) async fn cleanup(
    utils: &WorkerUtils,
    tasks: &[CleanupTask],
) -> Result<(), GraphileWorkerError> {
    let should_refresh_task_identifiers = tasks
        .iter()
        .any(|task| matches!(task, CleanupTask::GcTaskIdentifiers));

    if should_refresh_task_identifiers {
        let mut guard = utils.task_details.write().await;
        let task_names = guard.task_names();

        for task in tasks {
            task.execute(&utils.database, &utils.schema, &task_names)
                .await?;
        }

        let refreshed = get_tasks_details(&utils.database, &utils.schema, task_names).await?;
        *guard = refreshed;
        return Ok(());
    }

    for task in tasks {
        task.execute(&utils.database, &utils.schema, &[]).await?;
    }

    Ok(())
}

pub(super) async fn migrate(utils: &WorkerUtils) -> Result<(), MigrateError> {
    run_migrations(&utils.database, &utils.schema).await?;
    Ok(())
}