mire 0.2.3

A small, generic PostgreSQL event-sourcing library: append-only event streams, aggregates with optimistic concurrency, and subscription-based projections (requires tokio + sqlx)
Documentation
//! # Pattern: Read-before-write across aggregates (`TransactionScope`)
//!
//! **Why this pattern exists.** Most of the time, mire wants you to
//! keep one aggregate independent of another — that's what makes
//! event sourcing scale. But occasionally a single business decision
//! genuinely spans two aggregates: "transfer $X from account A to
//! account B; both succeed or both fail."
//!
//! For that case [`EventStore::begin_transaction`] opens a
//! [`TransactionScope`] — a Postgres transaction that lets you:
//!
//! 1. Load multiple aggregates inside the same DB transaction.
//! 2. Use [`load_for_update`] to take a row-lock on the streams that
//!    the decision depends on, so a concurrent writer can't slip in
//!    between your read and write.
//! 3. Save events for all of them atomically — commit or rollback as
//!    one unit.
//!
//! This is a deliberate **escape hatch**, not the default. Read its
//! docs: "If you find yourself reaching for `load_for_update` in
//! steady-state command code, that's a signal the projection boundary
//! is in the wrong place." Use it sparingly, for genuinely
//! cross-aggregate invariants.
//!
//! Run:
//!
//! ```sh
//! mise run pg:up
//! cargo run --example transaction_scope
//! ```
//!
//! [`load_for_update`]: mire::TransactionScope::load_for_update

use mire::{Aggregate, EventData, EventStore};
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use uuid::Uuid;

/// Events recorded on an account aggregate.
///
/// `#[serde(tag = "type")]` selects serde's internally tagged
/// representation: the variant name is stored under a `"type"` key next
/// to the variant's own fields.
#[derive(Debug, Clone, Serialize, Deserialize, EventData)]
#[serde(tag = "type")]
#[mire(entity = "account")]
enum AccountEvent {
    /// The account was opened; `Account::apply` sets the balance to `initial`.
    Opened { initial: i64 },
    /// Money left the account; `Account::apply` subtracts `amount`.
    Debited { amount: i64 },
    /// Money entered the account; `Account::apply` adds `amount`.
    Credited { amount: i64 },
}

/// State of a single account, updated by `Aggregate::apply` for each
/// [`AccountEvent`].
///
/// The derived `Default` yields a balance of zero.
#[derive(Debug, Default)]
struct Account {
    /// Current balance, in the same unit as the event amounts.
    balance: i64,
}

impl Aggregate for Account {
    type Event = AccountEvent;

    /// Category name used for account streams.
    fn stream_category() -> &'static str {
        "account"
    }

    /// Fold one event into the balance: `Opened` replaces it, `Debited`
    /// lowers it and `Credited` raises it by the event's amount.
    fn apply(&mut self, event: &AccountEvent) {
        self.balance = match *event {
            AccountEvent::Opened { initial } => initial,
            AccountEvent::Debited { amount } => self.balance - amount,
            AccountEvent::Credited { amount } => self.balance + amount,
        };
    }
}

/// Create the account stream `id` by recording a single `Opened` event
/// carrying `initial`. Only one aggregate is involved, so the plain
/// store API is enough and no transaction scope is needed.
///
/// NOTE(review): `load_or_default` presumably returns the existing
/// aggregate when `id` is already in use, in which case this appends a
/// second `Opened` and resets the balance — confirm callers pass fresh ids.
async fn open_account(store: &EventStore, id: &str, initial: i64) -> anyhow::Result<()> {
    let opened = AccountEvent::Opened { initial };

    let mut root = store.load_or_default::<Account>(id).await?;
    root.record(opened);
    store.save(&mut root).await?;

    Ok(())
}

/// Atomic transfer between two accounts. This is the pattern in focus.
///
/// Why `load_for_update`? Without the lock, a concurrent writer could
/// modify `from`'s balance between our read and our save, slipping in
/// a withdrawal that takes the balance below zero. With the lock,
/// our scope's view of `from` is stable until we commit.
///
/// # Errors
///
/// Returns an error, with nothing committed, when:
///
/// - `amount` is zero or negative;
/// - `from_id` and `to_id` name the same account;
/// - either account does not exist;
/// - the source balance is lower than `amount`;
/// - any store operation (begin, load, save, rollback, commit) fails.
async fn transfer(
    store: &EventStore,
    from_id: &str,
    to_id: &str,
    amount: i64,
) -> anyhow::Result<()> {
    if amount <= 0 {
        anyhow::bail!("transfer amount must be positive");
    }
    // A self-transfer would load two handles onto the same stream and
    // record one event on each; the second save would then be working
    // from a stale copy (presumably rejected by the store's optimistic
    // concurrency check). Refuse it up front with a clear message,
    // before opening a transaction.
    if from_id == to_id {
        anyhow::bail!("cannot transfer from account {from_id} to itself");
    }

    let mut scope = store.begin_transaction().await?;

    // Lock both stream rows so concurrent writers can't slip in.
    // (Lock acquisition order is consistent — lexicographic by stream id
    // here — to avoid deadlocks under concurrent transfers in both
    // directions.)
    let (first, second) = if from_id <= to_id {
        (from_id, to_id)
    } else {
        (to_id, from_id)
    };
    let _ = scope.load_for_update::<Account>(first).await?;
    let _ = scope.load_for_update::<Account>(second).await?;

    // Now do the real loads (cheap; the rows are already locked).
    // NOTE(review): on the early `?` returns below the scope is dropped
    // without an explicit rollback — presumably it rolls back on drop,
    // like an sqlx transaction; confirm.
    let mut from = scope
        .load::<Account>(from_id)
        .await?
        .ok_or_else(|| anyhow::anyhow!("source account {from_id} does not exist"))?;
    let mut to = scope
        .load::<Account>(to_id)
        .await?
        .ok_or_else(|| anyhow::anyhow!("destination account {to_id} does not exist"))?;

    // Cross-aggregate invariant: source can't overdraft.
    if from.state.balance < amount {
        // Rolling back releases both locks. Other writers proceed.
        scope.rollback().await?;
        anyhow::bail!(
            "insufficient funds in {from_id}: balance {}, transfer {amount}",
            from.state.balance
        );
    }

    from.record(AccountEvent::Debited { amount });
    to.record(AccountEvent::Credited { amount });

    // Both saves go through the same scope, so they become visible
    // together at commit.
    scope.save(&mut from).await?;
    scope.save(&mut to).await?;
    scope.commit().await?;
    Ok(())
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let url = std::env::var("DATABASE_URL")
        .unwrap_or_else(|_| "postgres://mire:mire@localhost:5434/mire".to_string());
    let pool = PgPool::connect(&url).await?;
    let store = EventStore::new(pool);
    store.migrate().await?;

    let alice = Uuid::new_v4().to_string();
    let bob = Uuid::new_v4().to_string();

    open_account(&store, &alice, 100).await?;
    open_account(&store, &bob, 0).await?;

    // Happy-path transfer.
    transfer(&store, &alice, &bob, 30).await?;
    let alice_after = store.load::<Account>(&alice).await?.unwrap();
    let bob_after = store.load::<Account>(&bob).await?.unwrap();
    println!(
        "after $30 transfer: Alice={}, Bob={} (sum = {} ✓)",
        alice_after.state.balance,
        bob_after.state.balance,
        alice_after.state.balance + bob_after.state.balance
    );

    // Reject: overdraft is caught inside the scope; nothing commits.
    match transfer(&store, &alice, &bob, 1_000_000).await {
        Err(e) => println!("rejected: {e}"),
        Ok(()) => unreachable!(),
    }
    let alice_after = store.load::<Account>(&alice).await?.unwrap();
    let bob_after = store.load::<Account>(&bob).await?.unwrap();
    println!(
        "after rejected transfer: Alice={}, Bob={} (unchanged ✓)",
        alice_after.state.balance, bob_after.state.balance
    );

    println!(
        "\nThe two aggregates moved together (commit) or not at all \
         (rollback). The lock prevented another concurrent transfer \
         from racing the read-decide-write."
    );

    Ok(())
}