simple-queue 0.2.2

A simple persistent queue implementation in Rust backed by PostgreSQL and tokio
Documentation
use std::sync::{Arc, OnceLock};

use simple_queue::SimpleQueue;
use sqlx::{PgPool, postgres::PgPoolOptions};

const DB_URL: &str = "postgres://postgres@localhost:5432/simple_queue";

pub async fn spawn_queue_with(
    pool: &PgPool,
    config_fn: impl FnOnce(SimpleQueue) -> SimpleQueue,
) -> Arc<SimpleQueue> {
    let queue = Arc::new(config_fn(SimpleQueue::new(pool.clone())));
    let queue2 = queue.clone();
    tokio::spawn(async move {
        let joinset = simple_queue::start(queue2.clone()).await;
        std::future::pending::<()>().await;
        let _ = joinset;
    });
    queue
}

#[allow(dead_code)]
pub async fn spawn_queue(pool: &PgPool) -> Arc<SimpleQueue> {
    let default_queue = |queue: SimpleQueue| {
        queue
            .with_default_backoff_strategy(simple_queue::sync::BackoffStrategy::Linear {
                delay: chrono::Duration::milliseconds(100),
            })
            .with_max_reprocess_count(10)
            .with_empty_poll_sleep(tokio::time::Duration::from_millis(100))
            .with_heartbeat_interval(tokio::time::Duration::from_millis(500))
    };
    return spawn_queue_with(pool, default_queue).await;
}

pub struct TestContext {
    pub pool: PgPool,
    pub admin_pool: PgPool,
    pub schema: String,
}
impl TestContext {
    pub async fn new() -> Self {
        init_tracing();
        let schema = format!(
            "test_schema_{}",
            uuid::Uuid::new_v4().to_string().replace("-", "")
        );
        tracing::info!("schema: {}", schema);

        let admin_pool = PgPool::connect(DB_URL).await.unwrap();

        sqlx::raw_sql(&format!("CREATE SCHEMA \"{}\"", schema))
            .execute(&admin_pool)
            .await
            .unwrap();

        let pool = PgPoolOptions::new()
            .max_connections(20)
            .min_connections(5)
            .connect(&format!("{}?options=--search_path%3D{}", DB_URL, schema))
            .await
            .unwrap();

        simple_queue::setup(&pool).await.unwrap();

        Self {
            pool,
            admin_pool,
            schema,
        }
    }
}

impl Drop for TestContext {
    fn drop(&mut self) {
        use tokio::runtime::*;
        let schema = self.schema.clone();
        let admin_pool = self.admin_pool.clone();

        let cleanup = async move {
            let _ = sqlx::raw_sql(&format!("DROP SCHEMA \"{}\" CASCADE", &schema))
                .execute(&admin_pool)
                .await;
            admin_pool.close().await;
        };

        match Handle::try_current() {
            Ok(handle) => match handle.runtime_flavor() {
                RuntimeFlavor::MultiThread => {
                    tokio::task::block_in_place(|| handle.block_on(cleanup));
                }
                _ => {
                    handle.spawn(cleanup);
                }
            },
            Err(_) => {
                tokio::runtime::Runtime::new().unwrap().block_on(cleanup);
            }
        }
    }
}

pub fn init_tracing() {
    static TRACING: OnceLock<()> = OnceLock::new();
    TRACING.get_or_init(|| {
        tracing_subscriber::fmt()
            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
            .with_test_writer() // routes to cargo test output
            .init();
    });
}