graphile_worker 0.13.3

High performance Rust/PostgreSQL job queue (also suitable for getting jobs generated by PostgreSQL triggers/functions out into a different work queue)
Documentation
use std::future::Future;
use std::pin::Pin;
use std::time::Duration;

use futures::FutureExt;
use graphile_worker_runtime as runtime;
use graphile_worker_shutdown_signal::ShutdownSignal;

pub(super) struct RecvOrShutdown<T> {
    pub(super) item: Option<T>,
    pub(super) shutdown: bool,
}

pub(super) async fn recv_or_shutdown<T>(
    rx: &runtime::Receiver<T>,
    shutdown_signal: &mut ShutdownSignal,
) -> RecvOrShutdown<T> {
    let recv = rx.recv().fuse();
    let shutdown = shutdown_signal.fuse();
    futures::pin_mut!(recv, shutdown);

    futures::select_biased! {
        _ = shutdown => RecvOrShutdown { item: None, shutdown: true },
        item = recv => RecvOrShutdown { item: item.ok(), shutdown: false },
    }
}

pub(super) trait BatchProcessor<T>: Send + Sync + 'static {
    fn flush<'a>(&'a self, batch: &'a [T]) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>;
}

pub(super) async fn run_batcher_task<T, P>(
    rx: runtime::Receiver<T>,
    delay: Duration,
    processor: P,
    mut shutdown_signal: ShutdownSignal,
) where
    T: Send + 'static,
    P: BatchProcessor<T>,
{
    let mut batch = Vec::new();

    loop {
        let first = recv_or_shutdown(&rx, &mut shutdown_signal).await;

        if first.shutdown {
            drain_and_flush(&rx, &mut batch, &processor).await;
            return;
        }

        let Some(first) = first.item else {
            processor.flush(&batch).await;
            return;
        };

        batch.push(first);

        let timeout = runtime::sleep(delay).fuse();
        futures::pin_mut!(timeout);

        loop {
            let recv = rx.recv().fuse();
            let shutdown = (&mut shutdown_signal).fuse();
            futures::pin_mut!(recv, shutdown);

            let result = futures::select_biased! {
                _ = shutdown => {
                    drain_and_flush(&rx, &mut batch, &processor).await;
                    return;
                }
                _ = timeout => break,
                result = recv => result,
            };

            match result {
                Ok(item) => batch.push(item),
                Err(_) => {
                    processor.flush(&batch).await;
                    return;
                }
            }
        }

        processor.flush(&batch).await;
        batch.clear();
    }
}

async fn drain_and_flush<T, P>(rx: &runtime::Receiver<T>, batch: &mut Vec<T>, processor: &P)
where
    T: Send + 'static,
    P: BatchProcessor<T>,
{
    while let Ok(item) = rx.try_recv() {
        batch.push(item);
    }
    processor.flush(batch).await;
}