pgmq 0.33.0

A distributed message queue for Rust applications, on Postgres.
Documentation
use rand::Rng;
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use std::env;

/// Sample payload sent through the queue in the tests below.
///
/// Derives `Serialize`/`Deserialize` so it can be passed to the queue's
/// `send`/`read` calls, and `Eq`/`PartialEq`/`Debug` so the sent and received
/// values can be compared with `assert_eq!`.
#[derive(Serialize, Debug, Deserialize, Eq, PartialEq)]
struct MyMessage {
    /// Free-form text field.
    foo: String,
    /// Numeric field; `Default` fills it with a random value.
    num: u64,
}

/// Default test payload: `foo` is always `"bar"`, `num` is a random value
/// drawn from `0..100`.
impl Default for MyMessage {
    fn default() -> Self {
        let foo = String::from("bar");
        let num = rand::thread_rng().gen_range(0..100);
        Self { foo, num }
    }
}

/// End-to-end lifecycle check: create a fresh database, install the embedded
/// pgmq SQL into it, then create a queue and send, read and archive a message.
///
/// The test is `#[ignore]`d and only compiled with the `install-sql-embedded`
/// feature. It connects to the server named by `DATABASE_URL`, falling back to
/// a local default when the variable is unset.
///
/// NOTE(review): the database created here is never dropped, so every run
/// leaves a `pgmq_test_<n>` database behind on the server.
#[ignore]
#[cfg(feature = "install-sql-embedded")]
#[tokio::test]
async fn test_sql_lifecycle() {
    // One random suffix keeps both the queue and the database name unique per run.
    let test_num = rand::thread_rng().gen_range(0..100000);
    let test_queue = format!("test_sql_lifecycle_{}", test_num);
    let db_url = env::var("DATABASE_URL")
        .unwrap_or_else(|_| "postgres://postgres:postgres@localhost:5432/postgres".to_owned());
    let test_db_name = format!("pgmq_test_{}", test_num);
    let test_db_url = replace_db_string(&db_url, &format!("/{test_db_name}"));
    println!("test_db_url: {test_db_url}");

    // The throwaway database has to be created through the original connection.
    let pool = PgPool::connect(&db_url)
        .await
        .expect("failed to connect to the admin database");
    sqlx::query(&format!("CREATE DATABASE {test_db_name};"))
        .execute(&pool)
        .await
        .expect("failed to create the test database");

    let queue = pgmq::PGMQueueExt::new(test_db_url, 1)
        .await
        .expect("failed to connect to the test database");
    // The whole function is already gated on `install-sql-embedded`, so no
    // additional `#[cfg]` is needed on this call.
    queue
        .install_sql_from_embedded()
        .await
        .expect("failed to install the embedded pgmq SQL");
    queue
        .create(&test_queue)
        .await
        .expect("failed to create the test queue");

    let sent_msg = MyMessage::default();
    let msg_id = queue
        .send(&test_queue, &sent_msg)
        .await
        .expect("failed to send the message");

    // Read with a zero visibility timeout so the message is visible again
    // right away. With a non-zero timeout the final `read` below would return
    // `None` even if `archive` had done nothing, because the message would
    // merely still be hidden.
    let read_msg = queue
        .read::<MyMessage>(&test_queue, 0)
        .await
        .expect("failed to read from the queue")
        .expect("the queue returned no message");
    assert_eq!(msg_id, read_msg.msg_id);
    assert_eq!(sent_msg, read_msg.message);

    queue
        .archive(&test_queue, msg_id)
        .await
        .expect("failed to archive the message");

    // The archived message must no longer be readable from the queue.
    let read_none = queue
        .read::<MyMessage>(&test_queue, 30)
        .await
        .expect("failed to read from the queue after archiving");
    assert!(read_none.is_none());
}

/// Returns `s` with its trailing database segment swapped for `replacement`.
///
/// `replacement` is expected to carry its own leading `/` (for example
/// `"/my_db"`). Everything from the last `/` of the path onward is replaced,
/// which includes any `?query` suffix.
///
/// Only the part after the `scheme://` separator and before the first `?` is
/// searched for the path separator, so:
///
/// * a URL without a database path (`postgres://host:5432`) gets
///   `replacement` appended instead of being cut inside `://`;
/// * a `/` inside the query string (`?sslrootcert=/etc/ca.pem`) is never
///   mistaken for the path separator.
///
/// A string with neither a scheme nor a `/` is returned unchanged.
fn replace_db_string(s: &str, replacement: &str) -> String {
    // Treat "://" as a scheme separator only when nothing path- or query-like
    // precedes it, so an embedded URL in a query value is not misread.
    let authority_start = s
        .find("://")
        .filter(|&idx| !s[..idx].contains(|c: char| matches!(c, '/' | '?')))
        .map(|idx| idx + "://".len());

    let search_from = authority_start.unwrap_or(0);
    let search_to = s[search_from..]
        .find('?')
        .map_or(s.len(), |idx| search_from + idx);

    match s[search_from..search_to].rfind('/') {
        Some(pos) => format!("{}{replacement}", &s[..search_from + pos]),
        // A URL with a scheme but no path: append the database segment.
        None if authority_start.is_some() => format!("{}{replacement}", &s[..search_to]),
        None => s.to_string(),
    }
}