graphile_worker_recovery 0.1.0

Dead worker recovery helpers for graphile_worker
Documentation
use std::sync::Arc;

use graphile_worker_database::{Database, Schema};
use graphile_worker_lifecycle_hooks::{HookRegistry, WorkerRecoveredContext};
use tracing::{debug, warn};

use graphile_worker_queries::errors::GraphileWorkerError;
use graphile_worker_queries::worker_heartbeat::lock::try_acquire_sweep_lock;
use graphile_worker_queries::worker_heartbeat::stale::delete_stale_workers;

use super::config::WorkerRecoveryConfig;
use super::types::{SweepStaleWorkersOptions, SweepStaleWorkersResult};

mod discovery;
mod jobs;

use discovery::find_dead_worker_ids;
use jobs::recover_jobs_from_workers;

pub async fn sweep_stale_workers(
    database: &Database,
    schema: impl Into<Schema>,
    hooks: Option<&Arc<HookRegistry>>,
    worker_id: &str,
    defaults: &WorkerRecoveryConfig,
    options: SweepStaleWorkersOptions,
) -> Result<SweepStaleWorkersResult, GraphileWorkerError> {
    let schema = schema.into();
    let config = options.resolve(defaults);

    let transaction = database.begin().await?;
    if !try_acquire_sweep_lock(&transaction).await? {
        debug!("Another worker is already running recovery sweep");
        return Ok(SweepStaleWorkersResult {
            worker_ids: Vec::new(),
            recovered_count: 0,
        });
    }

    let dead_worker_ids = find_dead_worker_ids(&transaction, &schema, &config).await?;

    if config.dry_run {
        transaction.commit().await?;
        return Ok(SweepStaleWorkersResult {
            worker_ids: dead_worker_ids,
            recovered_count: 0,
        });
    }

    let recovered_count = recover_jobs_from_workers(
        &transaction,
        &schema,
        hooks,
        worker_id,
        &dead_worker_ids,
        config.recovery_delay,
    )
    .await?;

    delete_stale_workers(&transaction, &schema, &dead_worker_ids).await?;
    transaction.commit().await?;

    if let Some(hooks) = hooks.filter(|hooks| !hooks.is_empty()) {
        hooks
            .emit(WorkerRecoveredContext {
                worker_id: worker_id.to_string(),
                dead_worker_ids: dead_worker_ids.clone(),
                jobs_recovered: recovered_count.max(0) as usize,
            })
            .await;
    }

    if !dead_worker_ids.is_empty() {
        warn!(
            dead_worker_ids = ?dead_worker_ids,
            recovered_count,
            "Recovered jobs from inactive workers"
        );
    }

    Ok(SweepStaleWorkersResult {
        worker_ids: dead_worker_ids,
        recovered_count,
    })
}