graphile_worker_recovery 0.1.0

Dead worker recovery helpers for graphile_worker
Documentation
use std::sync::Arc;
use std::time::Duration;

use graphile_worker_database::{DbExecutorArg, Schema};
use graphile_worker_lifecycle_hooks::{FailureReason, HookRegistry, JobInterruptedContext};

use graphile_worker_queries::errors::GraphileWorkerError;
use graphile_worker_queries::recover_workers::{
    get_locked_jobs_for_recovery, recover_dead_worker_jobs,
};

use super::super::job_recovery::apply_job_recovery;
use super::super::types::JobRecoveryRequest;

/// Recovers jobs that are still locked by any of the given `worker_ids`,
/// treating each of those workers as crashed.
///
/// Two paths are taken depending on whether anyone is listening:
///
/// * If `hooks` is `None` or the registry is empty, the bulk query
///   `recover_dead_worker_jobs` is used and its count is returned as-is.
/// * Otherwise the locked jobs are fetched and recovered one by one through
///   `apply_job_recovery`. A `JobInterruptedContext` is emitted for every job
///   whose recovery outcome reports `was_handled()`.
///
/// # Arguments
///
/// * `executor` - database executor used for every query issued here.
/// * `schema` - graphile_worker schema the queries run against.
/// * `hooks` - optional lifecycle hook registry.
/// * `worker_id` - id of the worker performing the recovery. It is forwarded
///   to `apply_job_recovery` and placed in the emitted context.
/// * `worker_ids` - ids of the workers whose locked jobs should be recovered.
/// * `recovery_delay` - delay forwarded to the recovery query or request.
///
/// # Returns
///
/// The number of recovered jobs. Returns `0` immediately when `worker_ids` is
/// empty. In the hook path, this is the count of jobs whose outcome was handled.
///
/// # Errors
///
/// Propagates any `GraphileWorkerError` from the underlying queries or from
/// `apply_job_recovery`. This function does not itself undo work already done
/// for earlier jobs when a later one fails.
pub(super) async fn recover_jobs_from_workers(
    mut executor: impl DbExecutorArg,
    schema: &Schema,
    hooks: Option<&Arc<HookRegistry>>,
    worker_id: &str,
    worker_ids: &[String],
    recovery_delay: Duration,
) -> Result<i32, GraphileWorkerError> {
    // No dead workers to look at: avoid touching the database at all.
    if worker_ids.is_empty() {
        return Ok(0);
    }

    // Per-job processing only matters when some hook can observe it. Without
    // one, the single bulk recovery query does the whole job.
    let Some(registry) = hooks.filter(|registry| !registry.is_empty()) else {
        return recover_dead_worker_jobs(&mut executor, schema, worker_ids, recovery_delay).await;
    };

    let locked_jobs = get_locked_jobs_for_recovery(&mut executor, schema, worker_ids).await?;
    let mut handled = 0;

    for job in locked_jobs {
        // A job with no recorded lock owner cannot be attributed to a dead
        // worker, so it is skipped.
        let Some(previous_worker_id) = job.locked_by().as_deref() else {
            continue;
        };

        // `job` is cloned because the original is still needed for the
        // interrupted-hook context below.
        let request = JobRecoveryRequest {
            hooks,
            worker_id,
            job: job.clone(),
            previous_worker_id,
            reason: FailureReason::WorkerCrashed,
            recovery_delay,
        };
        let outcome = apply_job_recovery(&mut executor, schema, request).await?;

        if !outcome.was_handled() {
            continue;
        }

        handled += 1;
        // NOTE(review): this reports the recovering worker's id rather than
        // `previous_worker_id`. Confirm this is what JobInterruptedContext
        // consumers expect.
        registry
            .emit(JobInterruptedContext {
                job,
                worker_id: worker_id.to_string(),
                reason: FailureReason::WorkerCrashed,
            })
            .await;
    }

    Ok(handled)
}