graphile_worker 0.11.4

High performance Rust/PostgreSQL job queue (also suitable for getting jobs generated by PostgreSQL triggers/functions out into a different work queue)
Documentation
use std::str::FromStr;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;

use graphile_worker::{
    HookRegistry, IntoTaskHandlerResult, LocalQueueConfig, LocalQueueGetJobsComplete,
    LocalQueueInit, LocalQueueRefetchDelayAbort, LocalQueueRefetchDelayExpired,
    LocalQueueRefetchDelayStart, LocalQueueReturnJobs, LocalQueueSetMode, Plugin,
    RefetchDelayConfig, WorkerContext, WorkerOptions,
};
use graphile_worker_task_handler::TaskHandler;
use serde::{Deserialize, Serialize};
use sqlx::postgres::PgConnectOptions;
use tracing_subscriber::{
    filter::EnvFilter, prelude::__tracing_subscriber_SubscriberExt, util::SubscriberInitExt,
};

fn enable_logs() {
    let fmt_layer = tracing_subscriber::fmt::layer();
    let filter_layer = EnvFilter::try_new("info,graphile_worker=debug").unwrap();

    tracing_subscriber::registry()
        .with(filter_layer)
        .with(fmt_layer)
        .init();
}

#[derive(Debug, Default)]
struct LocalQueueMonitorPlugin {
    jobs_fetched: AtomicUsize,
    jobs_returned: AtomicUsize,
    refetch_delays_started: AtomicUsize,
    refetch_delays_aborted: AtomicUsize,
}

impl Plugin for LocalQueueMonitorPlugin {
    fn register(self, hooks: &mut HookRegistry) {
        let plugin = Arc::new(self);

        hooks.on(LocalQueueInit, async |ctx| {
            println!(
                "[LocalQueueMonitor] LocalQueue initialized for worker {}",
                ctx.worker_id
            );
        });

        hooks.on(LocalQueueSetMode, async |ctx| {
            println!(
                "[LocalQueueMonitor] Mode changed: {:?} -> {:?}",
                ctx.old_mode, ctx.new_mode
            );
        });

        {
            let plugin = plugin.clone();
            hooks.on(LocalQueueGetJobsComplete, move |ctx| {
                let plugin = plugin.clone();
                async move {
                    plugin
                        .jobs_fetched
                        .fetch_add(ctx.jobs_count, Ordering::Relaxed);
                    println!(
                        "[LocalQueueMonitor] Batch fetched {} jobs (total: {})",
                        ctx.jobs_count,
                        plugin.jobs_fetched.load(Ordering::Relaxed)
                    );
                }
            });
        }

        {
            let plugin = plugin.clone();
            hooks.on(LocalQueueReturnJobs, move |ctx| {
                let plugin = plugin.clone();
                async move {
                    plugin.jobs_returned.fetch_add(ctx.jobs_count, Ordering::Relaxed);
                    println!(
                        "[LocalQueueMonitor] Returned {} jobs to database (TTL expired or shutdown)",
                        ctx.jobs_count
                    );
                }
            });
        }

        {
            let plugin = plugin.clone();
            hooks.on(LocalQueueRefetchDelayStart, move |ctx| {
                let plugin = plugin.clone();
                async move {
                    plugin.refetch_delays_started.fetch_add(1, Ordering::Relaxed);
                    println!(
                        "[LocalQueueMonitor] Refetch delay started: {:?} (threshold: {}, abort_threshold: {})",
                        ctx.duration, ctx.threshold, ctx.abort_threshold
                    );
                }
            });
        }

        {
            let plugin = plugin.clone();
            hooks.on(LocalQueueRefetchDelayAbort, move |ctx| {
                let plugin = plugin.clone();
                async move {
                    plugin
                        .refetch_delays_aborted
                        .fetch_add(1, Ordering::Relaxed);
                    println!(
                        "[LocalQueueMonitor] Refetch delay aborted! count={} >= abort_threshold={}",
                        ctx.count, ctx.abort_threshold
                    );
                }
            });
        }

        hooks.on(LocalQueueRefetchDelayExpired, async |_ctx| {
            println!("[LocalQueueMonitor] Refetch delay expired naturally");
        });
    }
}

#[derive(Deserialize, Serialize)]
struct ProcessItem {
    item_id: u32,
}

impl TaskHandler for ProcessItem {
    const IDENTIFIER: &'static str = "process_item";

    async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
        tokio::time::sleep(Duration::from_millis(10)).await;
        println!("Processed item {}", self.item_id);
        Ok::<(), String>(())
    }
}

fn get_database_url() -> String {
    std::env::var("DATABASE_URL")
        .unwrap_or_else(|_| "postgres://postgres:root@localhost:5432".to_string())
}

#[tokio::main]
async fn main() {
    enable_logs();

    let pg_options = PgConnectOptions::from_str(&get_database_url()).unwrap();

    let pg_pool = sqlx::postgres::PgPoolOptions::new()
        .max_connections(5)
        .connect_with(pg_options)
        .await
        .unwrap();

    let worker = WorkerOptions::default()
        .concurrency(4)
        .schema("example_local_queue")
        .define_job::<ProcessItem>()
        .pg_pool(pg_pool)
        .add_plugin(LocalQueueMonitorPlugin::default())
        .local_queue(
            LocalQueueConfig::default()
                .with_size(50)
                .with_ttl(Duration::from_secs(60))
                .with_refetch_delay(
                    RefetchDelayConfig::default()
                        .with_duration(Duration::from_millis(100))
                        .with_threshold(5)
                        .with_max_abort_threshold(20),
                ),
        )
        .init()
        .await
        .unwrap();

    let utils = worker.create_utils();

    println!("\n=== LocalQueue Example ===");
    println!("LocalQueue batch-fetches jobs to reduce database load.");
    println!("Configuration:");
    println!("  - Size: 50 (max jobs to fetch per batch)");
    println!("  - TTL: 60s (return unfetched jobs after this time)");
    println!("  - Refetch delay: 100ms (pause before next fetch when queue is low)");
    println!("  - Refetch threshold: 5 (trigger delay when fewer jobs fetched)");
    println!("  - Abort threshold: 20 (cancel delay after this many pulses)\n");

    println!("Adding 20 jobs...\n");

    for i in 1..=20 {
        utils
            .add_job(ProcessItem { item_id: i }, Default::default())
            .await
            .unwrap();
    }

    println!("Jobs added. Watch the LocalQueue behavior:\n");
    println!("1. Jobs are batch-fetched from the database");
    println!("2. When batch size < threshold, refetch delay activates");
    println!("3. New job notifications can abort the delay early\n");

    worker.run().await.unwrap();
}