graphile_worker 0.13.3

High-performance Rust/PostgreSQL job queue (also suitable for getting jobs generated by PostgreSQL triggers/functions out into a different work queue)
Documentation
use std::time::Duration;

use futures::FutureExt;
use graphile_worker_lifecycle_hooks::{
    LocalQueueMode, LocalQueueRefetchDelayAbortContext, LocalQueueRefetchDelayExpiredContext,
    LocalQueueRefetchDelayStartContext,
};
use graphile_worker_runtime as runtime;
use rand::RngExt;
use tracing::trace;

use super::{LocalQueue, RefetchDelayConfig};

impl LocalQueue {
    /// Starts a randomized refetch delay and spawns the task that ends it.
    ///
    /// The steps, in order:
    /// 1. Pick an abort threshold: `usize::MAX` is kept as-is, otherwise the
    ///    maximum is scaled by a random factor and clamped to at least 1.
    /// 2. Store the threshold, mark the delay as active and clear the
    ///    "fetch on complete" flag.
    /// 3. Pick a delay of 50–100% of `config.duration` (whole milliseconds).
    /// 4. Emit [`LocalQueueRefetchDelayStartContext`] to the hooks.
    /// 5. Spawn a task that waits for either the sleep to elapse or an abort
    ///    notification and then calls `refetch_delay_complete`.
    ///
    /// `config` supplies the base duration, the trigger `threshold` reported to
    /// the hooks, and an optional upper bound for the abort threshold. When no
    /// upper bound is configured, five times the local queue size is used.
    pub(super) async fn start_refetch_delay(&self, config: &RefetchDelayConfig) {
        // The default is computed lazily and with a saturating multiply: the
        // previous eager `5 * size` could overflow (panic in debug builds,
        // silent wrap-around in release builds) even when an explicit maximum
        // was configured. A saturated value falls into the `usize::MAX` branch
        // below.
        let max_abort = config
            .max_abort_threshold
            .unwrap_or_else(|| self.0.config.size.saturating_mul(5));
        let abort_threshold = if max_abort == usize::MAX {
            usize::MAX
        } else {
            // `rand::rng()` is only used as a temporary so that no RNG handle
            // is held across the `.await` points further down.
            let random: f64 = rand::rng().random();
            // Never go below 1, so a threshold of 0 is impossible.
            ((random * max_abort as f64) as usize).max(1)
        };

        *self.0.refetch_delay.abort_threshold.write().await = abort_threshold;
        self.set_refetch_delay_active(true);
        self.set_refetch_delay_fetch_on_complete(false);

        // Jitter: somewhere between half of and the full configured duration.
        let duration = Duration::from_millis(
            ((0.5 + rand::rng().random::<f64>() * 0.5) * config.duration.as_millis() as f64) as u64,
        );

        trace!(
            ?duration,
            abort_threshold,
            "LocalQueue starting refetch delay"
        );

        self.0
            .hooks
            .emit(LocalQueueRefetchDelayStartContext {
                worker_id: self.0.worker_id.clone(),
                duration,
                threshold: config.threshold,
                abort_threshold,
            })
            .await;

        let queue_clone = self.clone();
        let refetch_delay_task = runtime::spawn(async move {
            let sleep = runtime::sleep(duration).fuse();
            let notified = queue_clone.0.refetch_delay.abort_notify.notified().fuse();
            futures::pin_mut!(sleep, notified);
            // `select_biased!` polls the branches in order, so a delay that has
            // already elapsed is reported as expired even if an abort
            // notification is pending at the same time.
            futures::select_biased! {
                _ = sleep => {
                    queue_clone.refetch_delay_complete(false).await;
                }
                _ = notified => {
                    queue_clone.refetch_delay_complete(true).await;
                }
            };
        });
        // NOTE(review): presumably this also aborts a previously stored delay
        // task — confirm against `replace_abort`.
        self.0.refetch_delay_task.replace_abort(refetch_delay_task);
    }

    /// Finishes the current refetch delay.
    ///
    /// Clears the "active" flag and signals `state_notify`. When `aborted` is
    /// `true`, the "fetch on complete" flag is set and
    /// [`LocalQueueRefetchDelayAbortContext`] is emitted with the current
    /// counter and abort threshold; otherwise
    /// [`LocalQueueRefetchDelayExpiredContext`] is emitted.
    async fn refetch_delay_complete(&self, aborted: bool) {
        self.set_refetch_delay_active(false);
        self.0.state_notify.notify_one();

        if aborted {
            let count = self.get_refetch_delay_counter();
            let abort_threshold = *self.0.refetch_delay.abort_threshold.read().await;

            // NOTE(review): this flag is set after `state_notify` has already
            // been signalled above — confirm that the woken consumer cannot
            // observe the flag before it is set here.
            self.set_refetch_delay_fetch_on_complete(true);
            trace!("LocalQueue refetch delay aborted");

            self.0
                .hooks
                .emit(LocalQueueRefetchDelayAbortContext {
                    worker_id: self.0.worker_id.clone(),
                    count,
                    abort_threshold,
                })
                .await;
        } else {
            trace!("LocalQueue refetch delay expired");

            self.0
                .hooks
                .emit(LocalQueueRefetchDelayExpiredContext {
                    worker_id: self.0.worker_id.clone(),
                })
                .await;
        }
    }

    /// Signals `abort_notify` when a refetch delay is active and the refetch
    /// delay counter has reached the stored abort threshold.
    ///
    /// Does nothing when no refetch delay is active.
    async fn check_refetch_delay_abort(&self) {
        if !self.is_refetch_delay_active() {
            return;
        }

        let counter = self.get_refetch_delay_counter();
        let abort_threshold = *self.0.refetch_delay.abort_threshold.read().await;

        if counter >= abort_threshold {
            self.0.refetch_delay.abort_notify.notify_one();
        }
    }

    /// Handles a pulse reporting `count` items.
    ///
    /// Adds `count` to the refetch delay counter and checks whether an active
    /// refetch delay should be aborted. In `Polling` mode it then either
    /// requests another fetch (when one is already in progress) or signals
    /// `state_notify` (unless a refetch delay is active). In every other mode
    /// nothing further happens.
    pub async fn pulse(&self, count: usize) {
        trace!(count, "LocalQueue received pulse");

        self.increment_refetch_delay_counter(count);
        self.check_refetch_delay_abort().await;

        let mode = *self.0.mode.read().await;

        match mode {
            LocalQueueMode::Polling => {
                if self.is_fetch_in_progress() {
                    // A fetch is already running: flag that another one is
                    // needed once it finishes.
                    self.set_fetch_again(true);
                    self.0.state_notify.notify_one();
                } else if !self.is_refetch_delay_active() {
                    self.0.state_notify.notify_one();
                }
            }
            // Listed explicitly (no wildcard) so that adding a new mode forces
            // this match to be revisited.
            LocalQueueMode::Waiting
            | LocalQueueMode::TtlExpired
            | LocalQueueMode::Released
            | LocalQueueMode::Starting => {}
        }
    }
}