strev-postgres 0.6.0

PostgreSQL backend for strev
Documentation
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;

use async_trait::async_trait;
use bytes::Bytes;
use sqlx::PgPool;
use sqlx::postgres::PgPoolOptions;
use strev::{HandlerResult, Message, Publisher, Router, ShutdownSignal, Subscriber, Topic};
use strev_postgres::{
    PostgresPublisher, PostgresPublisherConfig, PostgresSubscriber, PostgresSubscriberConfig,
};
use strev_testsuite::Backend;
use tokio_util::sync::CancellationToken;

async fn pg_pool() -> Option<PgPool> {
    let url = std::env::var("DATABASE_URL")
        .unwrap_or_else(|_| "postgres://postgres:postgres@127.0.0.1:5432/postgres".into());
    PgPoolOptions::new()
        .max_connections(5)
        .acquire_timeout(Duration::from_secs(3))
        .connect(&url)
        .await
        .ok()
}

struct PostgresBackend {
    pool: PgPool,
}

#[async_trait]
impl Backend for PostgresBackend {
    async fn publisher(&self) -> Box<dyn Publisher> {
        Box::new(
            PostgresPublisher::new(PostgresPublisherConfig::new(self.pool.clone()))
                .await
                .unwrap(),
        )
    }

    async fn subscriber(&self, group: &str) -> Box<dyn Subscriber> {
        Box::new(PostgresSubscriber::new(PostgresSubscriberConfig::new(
            self.pool.clone(),
            group,
        )))
    }
}

async fn backend() -> Option<PostgresBackend> {
    Some(PostgresBackend {
        pool: pg_pool().await?,
    })
}

#[tokio::test]
async fn conformance_roundtrip() {
    let Some(backend) = backend().await else {
        eprintln!("skipping: postgres not available");
        return;
    };
    strev_testsuite::roundtrip(&backend).await;
}

#[tokio::test]
async fn conformance_ordering() {
    let Some(backend) = backend().await else {
        eprintln!("skipping: postgres not available");
        return;
    };
    strev_testsuite::ordering(&backend).await;
}

#[tokio::test]
async fn conformance_metadata() {
    let Some(backend) = backend().await else {
        eprintln!("skipping: postgres not available");
        return;
    };
    strev_testsuite::metadata_fidelity(&backend).await;
}

#[tokio::test]
async fn conformance_consumer_group_resume() {
    let Some(backend) = backend().await else {
        eprintln!("skipping: postgres not available");
        return;
    };
    strev_testsuite::consumer_group_resume(&backend).await;
}

#[tokio::test]
async fn router_with_postgres() {
    let pool = match pg_pool().await {
        Some(p) => p,
        None => {
            eprintln!("skipping: postgres not available");
            return;
        }
    };

    let topic = Topic::new(format!("router.{}", uuid::Uuid::new_v4().simple()));
    let processed = Arc::new(AtomicU32::new(0));

    let mut router = Router::new();
    let counter = processed.clone();
    let group = format!("grp-{}", uuid::Uuid::new_v4().simple());
    router.add_consumer(
        "postgres_handler",
        topic.clone(),
        PostgresSubscriber::new(PostgresSubscriberConfig::new(pool.clone(), group)),
        move |msg: Message| {
            let counter = counter.clone();
            async move {
                counter.fetch_add(1, Ordering::SeqCst);
                Ok(HandlerResult::ack(msg))
            }
        },
    );

    let token = CancellationToken::new();
    let tc = token.clone();
    let handle = tokio::spawn(async move { router.run(ShutdownSignal::Token(tc)).await });

    tokio::time::sleep(Duration::from_millis(500)).await;

    let publisher = PostgresPublisher::new(PostgresPublisherConfig::new(pool))
        .await
        .unwrap();
    for i in 0..3 {
        Publisher::publish(
            &publisher,
            &topic,
            vec![Message::new(Bytes::from(format!("m-{i}")))],
        )
        .await
        .unwrap();
    }

    tokio::time::sleep(Duration::from_secs(3)).await;
    token.cancel();
    handle.await.unwrap().unwrap();

    assert_eq!(processed.load(Ordering::SeqCst), 3);
}