graphile_worker 0.13.3

High performance Rust/PostgreSQL job queue (also suitable for getting jobs generated by PostgreSQL triggers/functions out into a different work queue)
Documentation
use futures::{stream::FuturesUnordered, FutureExt, Stream, StreamExt};
use graphile_worker_runtime as runtime;

use super::super::errors::{ProcessJobError, WorkerRuntimeError};
use crate::streams::job_signal::JobSignalSource;

#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum FanoutResult {
    Open,
    Closed,
}

pub(in crate::runner) async fn dispatch_job_signals<S>(
    job_signal: S,
    source_tx: runtime::Sender<JobSignalSource>,
    mut worker_handles: FuturesUnordered<runtime::JoinHandle<Result<(), ProcessJobError>>>,
    fanout: usize,
) -> Result<(), WorkerRuntimeError>
where
    S: Stream<Item = JobSignalSource>,
{
    let job_signal = job_signal.fuse();
    futures::pin_mut!(job_signal);

    loop {
        let next_source = job_signal.next().fuse();
        let worker_done = worker_handles.next().fuse();
        futures::pin_mut!(next_source, worker_done);

        futures::select_biased! {
            worker_result = worker_done => {
                match worker_result {
                    Some(Ok(Ok(()))) => {
                        if worker_handles.is_empty() {
                            break;
                        }
                    }
                    Some(Ok(Err(e))) => {
                        source_tx.close();
                        return Err(e.into());
                    }
                    Some(Err(e)) => {
                        source_tx.close();
                        return Err(e.into());
                    }
                    None => break,
                }
            }
            source = next_source => {
                let Some(source) = source else {
                    break;
                };

                if fanout_job_signal(&source_tx, source, fanout) == FanoutResult::Closed {
                    break;
                }
            }
        }
    }

    drop(source_tx);

    while let Some(result) = worker_handles.next().await {
        result??;
    }

    Ok(())
}

fn fanout_job_signal(
    source_tx: &runtime::Sender<JobSignalSource>,
    source: JobSignalSource,
    fanout: usize,
) -> FanoutResult {
    for _ in 0..fanout {
        match source_tx.try_send(source) {
            Ok(()) => {}
            Err(error) if error.is_closed() => return FanoutResult::Closed,
            Err(_) => return FanoutResult::Open,
        }
    }

    FanoutResult::Open
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn fanout_queues_requested_signals_when_channel_has_capacity() {
        let (tx, rx) = runtime::channel(4);

        let result = fanout_job_signal(&tx, JobSignalSource::Notification, 3);

        assert_eq!(result, FanoutResult::Open);
        assert!(matches!(rx.try_recv(), Ok(JobSignalSource::Notification)));
        assert!(matches!(rx.try_recv(), Ok(JobSignalSource::Notification)));
        assert!(matches!(rx.try_recv(), Ok(JobSignalSource::Notification)));
        assert!(rx.try_recv().is_err());
    }

    #[test]
    fn fanout_coalesces_when_worker_channel_is_full() {
        let (tx, rx) = runtime::channel(1);
        tx.try_send(JobSignalSource::Polling)
            .expect("initial signal should fit");

        let result = fanout_job_signal(&tx, JobSignalSource::Notification, 3);

        assert_eq!(result, FanoutResult::Open);
        assert!(matches!(rx.try_recv(), Ok(JobSignalSource::Polling)));
        assert!(rx.try_recv().is_err());
    }

    #[test]
    fn fanout_reports_closed_worker_channel() {
        let (tx, rx) = runtime::channel(1);
        drop(rx);

        let result = fanout_job_signal(&tx, JobSignalSource::Notification, 1);

        assert_eq!(result, FanoutResult::Closed);
    }
}