graphile_worker 0.13.3

High performance Rust/PostgreSQL job queue (also suitable for getting jobs generated by PostgreSQL triggers/functions out into a different work queue)
Documentation
use std::sync::Arc;

use graphile_worker_lifecycle_hooks::JobFetchContext;
use graphile_worker_runtime as runtime;

use super::super::errors::ProcessJobError;
use super::super::job_execution::run_and_release_job;
use super::super::{Worker, WorkerRunner};
use crate::local_queue::{LocalQueue, LocalQueueParams, LocalQueueSignalReceiver};
use crate::streams::job_signal::JobSignalSource;
use crate::Job;

/// Builds the local queues for `worker`, plus the receiver for their shared
/// job-signal channel.
///
/// Returns `None` when `worker.local_queue_config` is unset. Otherwise the
/// number of queues created is `config.queue_count`, capped at
/// `worker.concurrency`. Every queue receives its own clone of the channel
/// sender; the matching receiver is handed back to the caller.
pub(in crate::runner) fn create_local_queues(
    worker: &Worker,
) -> Option<(Vec<LocalQueue>, LocalQueueSignalReceiver)> {
    let config = worker.local_queue_config.as_ref()?;

    // The channel is created with `concurrency * 2` as its argument
    // (presumably the buffer capacity -- confirm against `runtime::channel`).
    let (signal_tx, signal_rx) = runtime::channel(worker.concurrency * 2);

    // Never create more queues than the worker's concurrency.
    let queue_count = config.queue_count.min(worker.concurrency);

    // Each queue gets independent clones of the worker's shared settings.
    let build_queue = || {
        let params = LocalQueueParams {
            config: config.clone(),
            database: worker.database.clone(),
            schema: worker.schema.clone(),
            worker_id: worker.worker_id.clone(),
            task_details: worker.task_details.clone(),
            poll_interval: worker.poll_interval,
            continuous: true,
            shutdown_signal: Some(worker.shutdown_signal.clone()),
            hooks: worker.hooks.clone(),
            job_signal_sender: signal_tx.clone(),
            use_local_time: worker.use_local_time,
        };
        LocalQueue::new(params)
    };

    let queues = (0..queue_count).map(|_| build_queue()).collect();

    Some((queues, signal_rx))
}

pub(in crate::runner) async fn process_local_queue_source(
    worker: &WorkerRunner,
    local_queues: &[LocalQueue],
    start_index: usize,
    source: JobSignalSource,
) -> Result<(), ProcessJobError> {
    if matches!(source, JobSignalSource::Notification) {
        local_queues[start_index % local_queues.len()]
            .pulse(1)
            .await;
    }

    let mut source = source;
    loop {
        let job = get_job_from_local_queues(worker, local_queues, start_index).await;

        let Some(job) = job else {
            break;
        };

        let job = Arc::new(job);

        if !worker.hooks.is_empty() {
            worker
                .hooks
                .emit(JobFetchContext {
                    job: job.clone(),
                    worker_id: worker.worker_id.clone(),
                })
                .await;
        }

        run_and_release_job(job.clone(), worker, &source).await?;
        source = JobSignalSource::LocalQueue;
    }

    Ok(())
}

/// Fetches the next available job from `local_queues`, beginning at the queue
/// selected by `start_index % local_queues.len()`.
///
/// * If `worker.forbidden_flags` is non-empty, only that starting queue is
///   asked for a job and its answer is returned as-is.
///   NOTE(review): the reason the other queues are skipped in this case is
///   not visible here -- confirm against `LocalQueue::get_job`.
/// * Otherwise the queues are tried in order from the starting queue,
///   wrapping around to the front of the slice, and the first job found is
///   returned.
///
/// Returns `None` when no consulted queue yields a job, or when
/// `local_queues` is empty.
async fn get_job_from_local_queues(
    worker: &WorkerRunner,
    local_queues: &[LocalQueue],
    start_index: usize,
) -> Option<Job> {
    // Guard first: the modulo below would panic on an empty slice.
    if local_queues.is_empty() {
        return None;
    }
    let first = start_index % local_queues.len();

    if !worker.forbidden_flags.is_empty() {
        return local_queues[first]
            .get_job(&worker.forbidden_flags)
            .await;
    }

    // Visit `first..len` and then `0..first`. This is the same order as
    // `(start_index + offset) % len`, without the addition that could overflow.
    let (wrapped, leading) = local_queues.split_at(first);
    for queue in leading.iter().chain(wrapped) {
        if let Some(job) = queue.get_job(&worker.forbidden_flags).await {
            return Some(job);
        }
    }

    None
}