mire 0.2.0

A small, generic PostgreSQL event-sourcing library: append-only event streams, aggregates with optimistic concurrency, and subscription-based projections (requires tokio + sqlx)
Documentation
//! Shared test plumbing: a single, guarded, migrate-once entry point.
//!
//! Migration runs `CREATE INDEX` / `ALTER TABLE`, which take table-level
//! locks that conflict with in-flight writers — even on the idempotent
//! "already applied" path, since Postgres grabs the lock before the
//! `IF NOT EXISTS` check. If every test re-ran `migrate()` concurrently
//! with peers that hold an open [`mire::TransactionScope`], a migrate could
//! block on that writer's locks while the writer's own progress queues
//! behind the migrate's pending lock — a cross-connection cycle Postgres
//! can't break (the tx-holding connection just looks `idle in transaction`),
//! so the whole binary wedges.
//!
//! The fix is sequencing, not luck: migrate **exactly once per test
//! binary, on its own dedicated connection, behind a guard**; every test
//! blocks on that guard and only then opens its own pool + transaction
//! scopes. By the time any operational write runs, the DDL is long done,
//! so there is never migrate-vs-writer lock contention.

#![allow(dead_code)]

use mire::EventStore;
use sqlx::PgPool;
use tokio::sync::OnceCell;

/// The guard: the schema is migrated the first time any test asks for a
/// store, and never again for the lifetime of this binary.
static MIGRATED: OnceCell<()> = OnceCell::const_new();

fn database_url() -> Option<String> {
    std::env::var("DATABASE_URL").ok().filter(|s| !s.is_empty())
}

/// A store backed by a fresh per-call pool, with the schema guaranteed
/// migrated. Returns `None` (so the caller can skip) when `DATABASE_URL`
/// is unset.
pub async fn store() -> Option<EventStore> {
    let url = database_url()?;

    // Migrate once, on its own dedicated connection, before returning any
    // store. Concurrent first-callers all await this same future, so no
    // operational work begins until the DDL has fully committed.
    MIGRATED
        .get_or_init(|| async {
            let migrate_pool = PgPool::connect(&url)
                .await
                .expect("connect (one-time migration)");
            EventStore::new(migrate_pool)
                .migrate()
                .await
                .expect("one-time schema migration");
            // `migrate_pool` is dropped here — the migration connection is
            // not reused for operational work.
        })
        .await;

    let pool = PgPool::connect(&url).await.expect("connect (test pool)");
    Some(EventStore::new(pool))
}

/// Like [`store`], but lets the caller configure the operational pool
/// (e.g. a larger `max_connections` for contention tests). The one-time
/// migration still runs on its own dedicated connection behind the guard.
pub async fn store_with_pool(pool: PgPool) -> EventStore {
    let url = database_url().expect("DATABASE_URL set for store_with_pool");
    MIGRATED
        .get_or_init(|| async {
            let migrate_pool = PgPool::connect(&url)
                .await
                .expect("connect (one-time migration)");
            EventStore::new(migrate_pool)
                .migrate()
                .await
                .expect("one-time schema migration");
        })
        .await;
    EventStore::new(pool)
}