graphile_worker 0.13.3

High performance Rust/PostgreSQL job queue (also suitable for getting jobs generated by PostgreSQL triggers/functions out into a different work queue)
Documentation
use chrono::Utc;
use futures::FutureExt;
use graphile_worker_lifecycle_hooks::{LocalQueueGetJobsCompleteContext, LocalQueueMode};
use graphile_worker_runtime as runtime;
use tracing::{debug, error};

use graphile_worker_queries::batch_get_jobs::batch_get_jobs;

use super::LocalQueue;

impl LocalQueue {
    /// Runs the fetch loop for this queue until its mode becomes `Released`.
    ///
    /// Each pass reads the current mode. While a fetch is not allowed (the
    /// mode is not `Polling`, a fetch is already in progress, or a refetch
    /// delay is active) the loop waits on `state_notify` and re-evaluates.
    /// Otherwise it calls `fetch` and then, if the mode is still `Polling`:
    ///
    /// * a non-continuous queue sets its mode to `Released` and stops;
    /// * a continuous queue loops again immediately when either "fetch again"
    ///   flag was set, and otherwise waits for the poll interval or a
    ///   notification, whichever completes first.
    pub(super) async fn schedule_fetch(&self) {
        loop {
            // Copy the mode out in its own statement so the guard returned by
            // `read()` is dropped before any of the awaits below.
            let mode = *self.0.mode.read().await;
            if mode == LocalQueueMode::Released {
                return;
            }

            let must_wait = mode != LocalQueueMode::Polling
                || self.is_fetch_in_progress()
                || self.is_refetch_delay_active();
            if must_wait {
                self.0.state_notify.notified().await;
                continue;
            }

            self.fetch().await;

            // `fetch` itself may switch the mode, so read it again afterwards.
            let mode_after_fetch = *self.0.mode.read().await;
            if mode_after_fetch != LocalQueueMode::Polling {
                continue;
            }

            if !self.0.continuous {
                self.set_mode(LocalQueueMode::Released).await;
                return;
            }

            // Call both `take_*` helpers before combining their results so
            // that neither call is skipped by short-circuit evaluation.
            let fetch_again = self.take_fetch_again();
            let delay_requested_fetch = self.take_refetch_delay_fetch_on_complete();
            if fetch_again || delay_requested_fetch {
                continue;
            }

            // Idle until the poll interval elapses or a notification arrives;
            // the timer is listed first, so it wins when both are ready.
            let poll_timer = runtime::sleep(self.0.poll_interval).fuse();
            let wake_up = self.0.state_notify.notified().fuse();
            futures::pin_mut!(poll_timer, wake_up);
            futures::select_biased! {
                _ = poll_timer => {}
                _ = wake_up => {}
            };
        }
    }

    /// Performs one batch fetch from the database, if a fetch may start.
    ///
    /// Returns immediately when `try_start_fetch` reports `false`. When a
    /// refetch delay is active, the "fetch on complete" flag is set, the fetch
    /// is ended, `state_notify` is notified and nothing is queried.
    ///
    /// Otherwise the method clears the "fetch again" flag, resets the refetch
    /// delay counter and calls `batch_get_jobs` for up to `config.size` jobs
    /// (capped at `i32::MAX`). Once the query finishes, the fetch is ended and
    /// `state_notify` is notified regardless of the outcome.
    ///
    /// On success a `LocalQueueGetJobsCompleteContext` hook is emitted. A
    /// refetch delay is then started if one is configured, fewer than
    /// `config.size` jobs came back, and the count does not exceed the
    /// configured threshold. Non-empty results are passed to `received_jobs`;
    /// an empty result on a non-continuous queue sets the mode to `Released`.
    /// On failure the error is logged and otherwise ignored.
    async fn fetch(&self) {
        if !self.try_start_fetch() {
            return;
        }

        if self.is_refetch_delay_active() {
            self.set_refetch_delay_fetch_on_complete(true);
            self.end_fetch();
            self.0.state_notify.notify_one();
            return;
        }

        self.set_fetch_again(false);
        self.reset_refetch_delay_counter();

        // Sizes that do not fit in an `i32` are clamped to `i32::MAX`.
        let batch_limit = i32::try_from(self.0.config.size).unwrap_or(i32::MAX);

        let task_details = self.0.task_details.read().await;
        let now = self.0.use_local_time.then(Utc::now);
        let fetch_result = batch_get_jobs(
            &self.0.database,
            &task_details,
            &self.0.schema,
            &self.0.worker_id,
            &[],
            batch_limit,
            now,
        )
        .await;
        // Release the task-details guard before doing anything else.
        drop(task_details);

        self.end_fetch();
        self.0.state_notify.notify_one();

        let jobs = match fetch_result {
            Ok(jobs) => jobs,
            Err(e) => {
                error!(error = %e, "LocalQueue failed to fetch jobs");
                return;
            }
        };

        let job_count = jobs.len();
        debug!(job_count, "LocalQueue fetched jobs from database");

        let completion = LocalQueueGetJobsCompleteContext {
            worker_id: self.0.worker_id.clone(),
            jobs_count: job_count,
        };
        self.0.hooks.emit(completion).await;

        let hit_batch_limit = job_count >= self.0.config.size;

        if let Some(delay_config) = &self.0.config.refetch_delay {
            // Only back off when the batch was not full and the number of
            // jobs did not exceed the configured threshold.
            let under_threshold = !hit_batch_limit && job_count <= delay_config.threshold;
            if under_threshold {
                self.start_refetch_delay(delay_config).await;
            }
        }

        if jobs.is_empty() {
            if !self.0.continuous {
                self.set_mode(LocalQueueMode::Released).await;
            }
        } else {
            self.received_jobs(jobs, hit_batch_limit).await;
        }
    }
}