// strev-postgres 0.6.0
//
// PostgreSQL backend for strev.
// Documentation
use sqlx::PgPool;

/// DDL that creates the `strev_messages` table when it does not already exist.
///
/// Each row has an auto-incrementing `id` primary key (`BIGSERIAL`), a `topic`,
/// a `uuid` stored as text, a binary `payload`, a JSONB `metadata` column that
/// defaults to an empty object, and a `created_at` timestamp that defaults to
/// `now()`. Every column is `NOT NULL`.
///
/// NOTE(review): the table declares no uniqueness constraint on `uuid`;
/// confirm whether duplicates are expected.
const MESSAGES_DDL: &str = "CREATE TABLE IF NOT EXISTS strev_messages (
    id BIGSERIAL PRIMARY KEY,
    topic TEXT NOT NULL,
    uuid TEXT NOT NULL,
    payload BYTEA NOT NULL,
    metadata JSONB NOT NULL DEFAULT '{}'::jsonb,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now()
)";

/// DDL that creates the `strev_messages_topic_id` index on
/// `strev_messages (topic, id)` when it does not already exist.
///
/// Presumably this supports reading one topic's messages in `id` order;
/// verify against the queries that use the table.
const MESSAGES_INDEX_DDL: &str =
    "CREATE INDEX IF NOT EXISTS strev_messages_topic_id ON strev_messages (topic, id)";

/// DDL that creates the `strev_offsets` table when it does not already exist.
///
/// The primary key is the pair `(consumer_group, topic)`, so there is at most
/// one row per consumer group and topic. `last_id` is a `BIGINT` that defaults
/// to `0`.
///
/// NOTE(review): `last_id` presumably holds the `strev_messages.id` of the
/// last message handled by that consumer group; confirm against the consumer
/// code.
const OFFSETS_DDL: &str = "CREATE TABLE IF NOT EXISTS strev_offsets (
    consumer_group TEXT NOT NULL,
    topic TEXT NOT NULL,
    last_id BIGINT NOT NULL DEFAULT 0,
    PRIMARY KEY (consumer_group, topic)
)";

/// Key that `ensure_schema` binds as the argument of `pg_advisory_xact_lock`.
///
/// The hex digits are the ASCII bytes of `"strev"` (`73 74 72 65 76`)
/// followed by a zero byte.
const SCHEMA_LOCK_KEY: i64 = 0x_7374_7265_7600;

/// Creates the strev tables and index if they are not present yet.
///
/// Everything runs inside a single transaction opened on `pool`:
///
/// 1. `pg_advisory_xact_lock` is called with `SCHEMA_LOCK_KEY`. PostgreSQL
///    holds a transaction-level advisory lock until the transaction ends, so
///    callers using the same key run the statements below one at a time.
/// 2. `MESSAGES_DDL`, `MESSAGES_INDEX_DDL` and `OFFSETS_DDL` are executed in
///    that order.
/// 3. The transaction is committed.
///
/// # Errors
///
/// Returns the first `sqlx::Error` produced while beginning the transaction,
/// taking the lock, executing a statement, or committing. On an early return
/// the transaction is dropped without being committed.
pub(crate) async fn ensure_schema(pool: &PgPool) -> Result<(), sqlx::Error> {
    let mut txn = pool.begin().await?;

    // Take the advisory lock before issuing any DDL.
    let lock_stmt = sqlx::query("SELECT pg_advisory_xact_lock($1)").bind(SCHEMA_LOCK_KEY);
    lock_stmt.execute(&mut *txn).await?;

    // Order matters: the index statement refers to the messages table.
    for ddl in [MESSAGES_DDL, MESSAGES_INDEX_DDL, OFFSETS_DDL] {
        sqlx::query(ddl).execute(&mut *txn).await?;
    }

    txn.commit().await
}