mire 0.2.4

A small, generic PostgreSQL event-sourcing library: append-only event streams, aggregates with optimistic concurrency, and subscription-based projections (requires tokio + sqlx)
Documentation
//! # Pattern: Batched writes for high-throughput ingest
//!
//! **Why this pattern exists.** Every call to [`EventStore::save`]
//! incurs ~8 Postgres round trips (load aggregate, begin tx, CAS the
//! stream row, INSERT events, commit). For a single write that's
//! fine. For *bulk ingest* — importing a CSV, replaying from another
//! system, applying a backlog — calling `save` once per event is
//! 10× slower than necessary.
//!
//! The fix: record many events in one aggregate-state mutation, then
//! save once. mire's `append` writes N events in a single transaction;
//! the cost is `8 + N` round trips, not `8 × N`.
//!
//! Two recipes, depending on whether you have an aggregate in hand:
//!
//! - **Going through the aggregate** (validation runs, state stays
//!   coherent): [`AggregateRoot::record_many`] +
//!   [`EventStore::save`].
//! - **Bypassing the aggregate** (you already have events, no
//!   per-event business logic): [`EventStore::append`] directly with
//!   a `Vec<Event<E>>`.
//!
//! This example does both and times them against the per-call
//! baseline so the speedup is visible.
//!
//! Run:
//!
//! ```sh
//! mise run pg:up
//! cargo run --release --example batched_write
//! ```
//!
//! (Run it in release mode: in a debug build, serialisation overhead
//! masks the I/O win.)

use std::time::Instant;

use mire::{Aggregate, Event, EventData, EventStore, ExpectedVersion};
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use uuid::Uuid;

/// Events that can be written to a ledger stream in this example.
///
/// serde's `tag = "type"` attribute selects the internally tagged
/// representation, so the variant name is serialised into a `"type"` field
/// next to the variant's own fields.
#[derive(Debug, Clone, Serialize, Deserialize, EventData)]
#[serde(tag = "type")]
// NOTE(review): presumably ties this event type to the "ledger" entity —
// confirm against the `EventData` derive documentation.
#[mire(entity = "ledger")]
enum LedgerEvent {
    /// A signed amount posted to the ledger; `Ledger::apply` adds it to the
    /// running balance.
    Posted { amount: i64 },
}

/// Aggregate state for a ledger: the running total of posted amounts.
///
/// The derived `Default` starts the balance at 0.
#[derive(Debug, Default)]
struct Ledger {
    /// Sum of the `amount` of every `Posted` event applied so far.
    balance: i64,
}

impl Aggregate for Ledger {
    type Event = LedgerEvent;

    /// Category name under which ledger streams are stored.
    fn stream_category() -> &'static str {
        const CATEGORY: &str = "ledger";
        CATEGORY
    }

    /// Folds a single event into the running balance.
    fn apply(&mut self, event: &LedgerEvent) {
        // `Posted` is the only variant, so this destructure is irrefutable.
        // Adding a variant later turns it into a compile error, just like a
        // non-exhaustive `match` would.
        let LedgerEvent::Posted { amount } = event;
        self.balance += *amount;
    }
}

/// Times three ways of writing `events_per_recipe` ledger events and checks
/// that all of them leave their stream in the same final state.
///
/// Connects to the database named by `DATABASE_URL` (falling back to a local
/// default URL), runs the store's migrations, then measures:
///
/// 1. the baseline: one load + `save` per event,
/// 2. Recipe A: `record_many` followed by a single `save`,
/// 3. Recipe B: a single direct `append` of pre-built events.
///
/// # Errors
///
/// Returns an error if connecting, migrating, or any store call fails, or if
/// one of the freshly written streams cannot be loaded back.
///
/// # Panics
///
/// Panics if a reloaded ledger's balance or version differs from the number
/// of events written.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let url = std::env::var("DATABASE_URL")
        .unwrap_or_else(|_| "postgres://mire:mire@localhost:5434/mire".to_string());
    let pool = PgPool::connect(&url).await?;
    let store = EventStore::new(pool);
    store.migrate().await?;

    let events_per_recipe = 200;
    println!("ingesting {events_per_recipe} events per recipe\n");

    // ---------------------------------------------------------------
    // Baseline: one save per event. Worst case.
    // ---------------------------------------------------------------

    let id_baseline = Uuid::new_v4().to_string();
    let start = Instant::now();
    for _ in 0..events_per_recipe {
        let mut ledger = store.load_or_default::<Ledger>(&id_baseline).await?;
        ledger.record(LedgerEvent::Posted { amount: 1 });
        store.save(&mut ledger).await?;
    }
    let baseline = start.elapsed();
    println!("  baseline (one save per event): {baseline:.2?}");

    // ---------------------------------------------------------------
    // Recipe A: aggregate-level batching with record_many + save.
    // Useful when you want command-style validation per event.
    // ---------------------------------------------------------------

    let id_a = Uuid::new_v4().to_string();
    let start = Instant::now();
    let mut ledger = store.load_or_default::<Ledger>(&id_a).await?;
    ledger.record_many((0..events_per_recipe).map(|_| LedgerEvent::Posted { amount: 1 }));
    store.save(&mut ledger).await?;
    let recipe_a = start.elapsed();
    println!(
        "  Recipe A (record_many + one save):    {recipe_a:.2?}  ({:.1}× faster)",
        baseline.as_secs_f64() / recipe_a.as_secs_f64()
    );

    // ---------------------------------------------------------------
    // Recipe B: bypass the aggregate, append events directly. Useful
    // for raw ingest where you already have the events and don't need
    // per-event command validation.
    // ---------------------------------------------------------------

    let id_b = Uuid::new_v4().to_string();
    // Build the stream id from the aggregate's own category rather than a
    // second "ledger" literal, so the id written here cannot drift from the
    // one `load::<Ledger>` is expected to resolve below.
    let stream_id = format!("{}-{id_b}", Ledger::stream_category());
    // The events are built before the timer starts: this recipe models the
    // case where the events already exist, so only the append is measured.
    let events: Vec<Event<LedgerEvent>> = (0..events_per_recipe)
        .map(|_| Event::new(LedgerEvent::Posted { amount: 1 }))
        .collect();
    let start = Instant::now();
    store
        .append::<LedgerEvent>(
            &stream_id,
            Ledger::stream_category(),
            ExpectedVersion::NoStream,
            &events,
        )
        .await?;
    let recipe_b = start.elapsed();
    println!(
        "  Recipe B (append directly, no agg):   {recipe_b:.2?}  ({:.1}× faster)",
        baseline.as_secs_f64() / recipe_b.as_secs_f64()
    );

    // Confirm correctness: all three produce the same end-state.
    // A missing stream is reported as an error naming the stream instead of
    // panicking on a bare `unwrap()`.
    let baseline_state = store
        .load::<Ledger>(&id_baseline)
        .await?
        .ok_or_else(|| anyhow::anyhow!("baseline ledger {} not found after write", id_baseline))?;
    let a_state = store
        .load::<Ledger>(&id_a)
        .await?
        .ok_or_else(|| anyhow::anyhow!("Recipe A ledger {} not found after write", id_a))?;
    let b_state = store
        .load::<Ledger>(&id_b)
        .await?
        .ok_or_else(|| anyhow::anyhow!("Recipe B ledger {} not found after write", id_b))?;
    assert_eq!(baseline_state.state.balance, events_per_recipe);
    assert_eq!(a_state.state.balance, events_per_recipe);
    assert_eq!(b_state.state.balance, events_per_recipe);
    assert_eq!(baseline_state.version, events_per_recipe);
    assert_eq!(a_state.version, events_per_recipe);
    assert_eq!(b_state.version, events_per_recipe);
    println!(
        "\nall three streams ended at balance={events_per_recipe}, version={events_per_recipe}"
    );
    println!(
        "\nUse Recipe A when each event passes through a command (you \
         want the aggregate's apply() to validate as you go). Use \
         Recipe B when you have a stream of pre-validated events and \
         only need durability."
    );

    Ok(())
}