mire 0.2.2

A small, generic PostgreSQL event-sourcing library: append-only event streams, aggregates with optimistic concurrency, and subscription-based projections (requires tokio + sqlx)
//! # Pattern: Snapshots (loading long streams fast)
//!
//! **Why this pattern exists.** Loading an aggregate replays its
//! entire event history. For a stream that has accumulated thousands
//! of events, that is O(N) I/O on every load. A **snapshot** is a
//! cached fold of state at some version K; subsequent loads seed from
//! the snapshot and replay only the tail since K.
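//!
//! A minimal sketch of that idea in plain Rust, independent of mire's
//! API. The `Snap` struct and `load` function are hypothetical names
//! used only for illustration; events are modelled as bare `i64`
//! amounts.
//!
//! ```rust
//! // Hypothetical illustration; not mire's actual types.
//! #[derive(Clone, Copy)]
//! struct Snap {
//!     version: usize, // number of events already folded into `sum`
//!     sum: i64,
//! }
//!
//! /// Seed from the snapshot if present, then fold only the tail.
//! /// Returns the final state and how many events were replayed.
//! fn load(events: &[i64], snap: Option<Snap>) -> (i64, usize) {
//!     let (seed, from) = match snap {
//!         Some(s) => (s.sum, s.version),
//!         None => (0, 0),
//!     };
//!     let tail = &events[from..];
//!     (seed + tail.iter().sum::<i64>(), tail.len())
//! }
//!
//! fn main() {
//!     let events: Vec<i64> = (1..=12).collect();
//!     let full = load(&events, None);
//!     let snap = Snap { version: 10, sum: events[..10].iter().sum() };
//!     let fast = load(&events, Some(snap));
//!     assert_eq!(full.0, fast.0); // same state either way
//!     assert_eq!((full.1, fast.1), (12, 2)); // but only 2 events replayed
//! }
//! ```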
//!
//! Snapshots are a **disposable optimisation**, never the source of
//! truth. If you change the shape of your aggregate state (anything
//! that alters its serialized form), bump `SNAPSHOT_VERSION`. Old
//! snapshots are then silently ignored and loads fall back to full
//! replay, so no code path returns a stale snapshot to your aggregate.
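//!
//! One way to picture the version gate, as a sketch only: it assumes
//! each stored snapshot records the schema version it was written
//! with, and it is not a description of mire's internals. `Stored`
//! and `load` are hypothetical names.
//!
//! ```rust
//! // Hypothetical illustration; not mire's actual types or internals.
//! const SNAPSHOT_VERSION: i32 = 2;
//!
//! struct Stored {
//!     snapshot_version: i32, // schema version the snapshot was written with
//!     version: usize,        // number of events folded into `sum`
//!     sum: i64,
//! }
//!
//! /// Returns (state, events replayed). A snapshot written under a different
//! /// schema version is ignored and the whole stream is replayed instead.
//! fn load(events: &[i64], stored: Option<&Stored>) -> (i64, usize) {
//!     let usable = stored.filter(|s| s.snapshot_version == SNAPSHOT_VERSION);
//!     let (seed, from) = usable.map_or((0, 0), |s| (s.sum, s.version));
//!     (seed + events[from..].iter().sum::<i64>(), events.len() - from)
//! }
//!
//! fn main() {
//!     let events = [1, 2, 3, 4];
//!     let stale = Stored { snapshot_version: 1, version: 3, sum: 999 };
//!     let fresh = Stored { snapshot_version: 2, version: 3, sum: 6 };
//!     assert_eq!(load(&events, Some(&stale)), (10, 4)); // ignored: full replay
//!     assert_eq!(load(&events, Some(&fresh)), (10, 1)); // seeded: tail only
//! }
//! ```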
//!
//! ## When to use
//!
//! - Stream depth typically > 1,000 events ✔︎
//! - Load happens on a hot path (every request rebuilds state) ✔︎
//! - Aggregate state is reasonably small to serialise ✔︎
//!
//! ## When NOT to use
//!
//! - Stream stays short (under a few hundred events) — no measurable
//!   win, and you pay snapshot write cost on every Nth save.
//! - The aggregate cycles to a new stream at a business boundary
//!   (per-year, per-session). Shorter-lived streams beat snapshots.
//!
//! Run:
//!
//! ```sh
//! mise run pg:up
//! cargo run --example snapshot
//! ```

use mire::{Aggregate, EventData, EventStore, Snapshot, StreamQuery};
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use uuid::Uuid;

#[derive(Debug, Clone, Serialize, Deserialize, EventData)]
#[serde(tag = "type")]
#[mire(entity = "tally")]
enum TallyEvent {
    Added { amount: i64 },
}

// State must be (de)serializable to be snapshotted.
#[derive(Debug, Default, Serialize, Deserialize)]
struct Tally {
    sum: i64,
    count: i64,
}

impl Aggregate for Tally {
    type Event = TallyEvent;

    fn stream_category() -> &'static str {
        "tally"
    }

    fn apply(&mut self, event: &TallyEvent) {
        match event {
            TallyEvent::Added { amount } => {
                self.sum += amount;
                self.count += 1;
            }
        }
    }
}

impl Snapshot for Tally {
    const SNAPSHOT_VERSION: i32 = 1;
    // Small so the example writes a few snapshots; production values are larger.
    const SNAPSHOT_FREQUENCY: i64 = 5;
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let database_url = std::env::var("DATABASE_URL")
        .unwrap_or_else(|_| "postgres://mire:mire@localhost:5432/mire".to_string());
    let pool = PgPool::connect(&database_url).await?;

    let store = EventStore::new(pool);
    store.migrate().await?;

    let id = Uuid::new_v4().to_string();

    // Append 12 events, snapshotting on the save path. With frequency 5 this
    // writes snapshots as the version crosses 5 and 10.
    let mut tally = store.load_or_default::<Tally>(&id).await?;
    for n in 1..=12 {
        tally.record(TallyEvent::Added { amount: n });
        store.save_snapshotting(&mut tally).await?;
    }
    println!(
        "wrote {} events; state: sum={} count={}",
        tally.version, tally.state.sum, tally.state.count
    );

    // Inspect the snapshot stream: snapshots are stored as events in a
    // sibling `<stream_id>-snapshot` stream, next to the main log.
    let snapshots = store
        .read_stream(
            &format!("{}-snapshot", tally.stream_id),
            StreamQuery::default(),
        )
        .await?;
    println!("snapshot stream holds {} snapshot(s):", snapshots.len());
    for snap in &snapshots {
        println!(
            "  reflects version {} (snapshot_version {})",
            snap.data["version"], snap.data["snapshot_version"]
        );
    }

    // Load via the snapshot path: seeds from the newest snapshot (version 10)
    // and replays only events 11..=12 instead of all 12.
    let loaded = store.load_snapshotted::<Tally>(&id).await?.expect("exists");
    println!(
        "loaded via snapshot: sum={} count={} version={}",
        loaded.state.sum, loaded.state.count, loaded.version
    );
    assert_eq!(loaded.state.sum, (1i64..=12).sum::<i64>());
    assert_eq!(loaded.version, 12);

    println!("done");
    Ok(())
}