mire 0.2.4

A small, generic PostgreSQL event-sourcing library: append-only event streams, aggregates with optimistic concurrency, and subscription-based projections (requires tokio + sqlx)
Documentation
//! # Pattern: Commands & validation
//!
//! **Why this pattern exists.** Events are *facts* — things that
//! already happened. Recording an event you can't justify is how
//! event-sourced systems silently corrupt their own history.
//!
//! A **command** is the right place to ask "is this operation valid
//! given the current state?" — and to refuse with an error rather than
//! recording a nonsense event. The shape is always the same:
//!
//! 1. Load the aggregate (gives you the current state).
//! 2. Inspect the state. Bail if the operation is invalid.
//! 3. Otherwise, [`mire::AggregateRoot::record`] the event(s) the operation
//!    produces.
//! 4. [`EventStore::save`] commits them.
//!
//! The validation lives in the command function, **not in `apply`**.
//! `apply` is a fold over committed history — it must be deterministic
//! and side-effect-free, and it runs at every load. If you put a check
//! there, a replay that no longer satisfies the check will fail to
//! reconstruct the aggregate at all.
//!
//! Run:
//!
//! ```sh
//! mise run pg:up
//! cargo run --example commands
//! ```

use mire::{Aggregate, EventData, EventStore};
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use uuid::Uuid;

// ----------------------------------------------------------------------
// 1. Define the events. These are *what happened* in the past tense.
//    `#[derive(EventData)]` reads `#[mire(entity = "account")]` once,
//    then maps each variant via CamelCase → kebab-case:
//      Opened    → "account.opened"
//      Deposited → "account.deposited"
//      Withdrawn → "account.withdrawn"
//      Closed    → "account.closed"
//
//    Once an entity name is in production it is **load-bearing** —
//    changing it would orphan every historical event. Variant
//    renames similarly change the persisted event_type unless you
//    pin them with `#[mire(rename = "...")]`.
// ----------------------------------------------------------------------

/// Events that can appear in an account stream, each named in the past tense.
///
/// `#[serde(tag = "type")]` selects serde's internally tagged representation,
/// so the serialized payload carries the variant name in a `"type"` field.
/// `#[mire(entity = "account")]` supplies the entity prefix of the persisted
/// event type (e.g. `"account.opened"`).
#[derive(Debug, Clone, Serialize, Deserialize, EventData)]
#[serde(tag = "type")]
#[mire(entity = "account")]
enum AccountEvent {
    /// The account was opened for `owner`.
    Opened { owner: String },
    /// `amount` was added to the balance.
    // NOTE(review): the unit of `amount` (e.g. minor currency units) is not
    // stated anywhere in this file — confirm before reusing.
    Deposited { amount: i64 },
    /// `amount` was subtracted from the balance.
    Withdrawn { amount: i64 },
    /// The account was closed.
    Closed,
}

// ----------------------------------------------------------------------
// 2. Define the aggregate. `apply` is the fold over committed events.
//    No validation here — only state assembly.
// ----------------------------------------------------------------------

/// Account state, rebuilt by folding `AccountEvent`s through `apply`.
///
/// The derived `Default` is the state before any event: not open, empty
/// owner, zero balance.
#[derive(Debug, Default)]
struct Account {
    /// Set to `true` by `Opened` and back to `false` by `Closed`.
    open: bool,
    /// Owner name copied from the `Opened` event.
    owner: String,
    /// Running total: deposits added, withdrawals subtracted.
    balance: i64,
}

impl Aggregate for Account {
    type Event = AccountEvent;

    /// Stream category under which account streams are stored.
    fn stream_category() -> &'static str {
        "account"
    }

    /// Folds a single event into the state.
    ///
    /// Pure state assembly: no validation happens here, so every recorded
    /// event is folded in unconditionally.
    fn apply(&mut self, event: &AccountEvent) {
        match event {
            AccountEvent::Opened { owner } => {
                self.owner.clone_from(owner);
                self.open = true;
            }
            AccountEvent::Deposited { amount } => {
                self.balance += *amount;
            }
            AccountEvent::Withdrawn { amount } => {
                self.balance -= *amount;
            }
            AccountEvent::Closed => {
                self.open = false;
            }
        }
    }
}

// ----------------------------------------------------------------------
// 3. Commands. Each enforces the invariants it cares about and records
//    one or more events on success.
//
//    Convention: `&mut AggregateRoot<Account>` argument; returns
//    Result so the caller can react to invalid operations.
// ----------------------------------------------------------------------

/// Command: open the account for `owner`.
///
/// # Errors
///
/// Fails, without recording anything, when the account is currently open or
/// when `owner` is empty after trimming whitespace.
fn open(account: &mut mire::AggregateRoot<Account>, owner: &str) -> anyhow::Result<()> {
    anyhow::ensure!(!account.state.open, "account is already open");
    anyhow::ensure!(!owner.trim().is_empty(), "owner cannot be empty");

    // The owner is stored exactly as given; trimming is only used for the check.
    let opened = AccountEvent::Opened {
        owner: owner.to_owned(),
    };
    account.record(opened);
    Ok(())
}

/// Command: deposit `amount` into the account.
///
/// On success records a single `AccountEvent::Deposited`.
///
/// # Errors
///
/// Fails, without recording anything, when:
/// - the account is not open,
/// - `amount` is zero or negative,
/// - adding `amount` to the current balance would overflow `i64`.
fn deposit(account: &mut mire::AggregateRoot<Account>, amount: i64) -> anyhow::Result<()> {
    if !account.state.open {
        anyhow::bail!("cannot deposit into a closed account");
    }
    if amount <= 0 {
        anyhow::bail!("deposit amount must be positive, got {amount}");
    }
    // `apply` folds deposits with a plain `+=`, which panics in debug builds
    // and wraps in release builds on overflow. Refuse here, in the command,
    // so an event that cannot be folded never enters the history.
    if account.state.balance.checked_add(amount).is_none() {
        anyhow::bail!(
            "deposit of {amount} would overflow the balance ({})",
            account.state.balance
        );
    }
    account.record(AccountEvent::Deposited { amount });
    Ok(())
}

/// Command: withdraw `amount` from the account.
///
/// On success records a single `AccountEvent::Withdrawn`.
///
/// # Errors
///
/// Fails, without recording anything, when the account is not open, when
/// `amount` is zero or negative, or when `amount` exceeds the balance.
fn withdraw(account: &mut mire::AggregateRoot<Account>, amount: i64) -> anyhow::Result<()> {
    // Snapshot the fields the checks need; both are `Copy`.
    let (is_open, balance) = (account.state.open, account.state.balance);

    if !is_open {
        return Err(anyhow::anyhow!("cannot withdraw from a closed account"));
    }
    if amount <= 0 {
        return Err(anyhow::anyhow!(
            "withdrawal amount must be positive, got {amount}"
        ));
    }
    if amount > balance {
        return Err(anyhow::anyhow!(
            "insufficient funds: balance {}, tried to withdraw {amount}",
            balance
        ));
    }

    account.record(AccountEvent::Withdrawn { amount });
    Ok(())
}

/// Command: close the account.
///
/// On success records a single `AccountEvent::Closed`.
///
/// # Errors
///
/// Fails, without recording anything, when the account is not open or when
/// its balance is not exactly zero.
fn close(account: &mut mire::AggregateRoot<Account>) -> anyhow::Result<()> {
    match (account.state.open, account.state.balance) {
        (false, _) => anyhow::bail!("account is already closed"),
        (true, 0) => {
            account.record(AccountEvent::Closed);
            Ok(())
        }
        (true, remaining) => anyhow::bail!(
            "cannot close account with non-zero balance ({})",
            remaining
        ),
    }
}

// ----------------------------------------------------------------------
// 4. Drive a small scenario showing valid + rejected operations.
// ----------------------------------------------------------------------

/// Runs the example scenario against PostgreSQL.
///
/// Connects using `DATABASE_URL` (falling back to the local development URL
/// below), runs the store migrations, then walks one fresh account through a
/// happy path, a set of rejected commands, and a final drain-and-close,
/// asserting the replayed state at the end.
///
/// # Errors
///
/// Returns any connection, migration, load, or save error, as well as the
/// error of any command that is expected to succeed but is rejected.
///
/// # Panics
///
/// Panics if a stream that was just saved cannot be found on reload, or if one
/// of the scenario's assertions fails.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let url = std::env::var("DATABASE_URL")
        .unwrap_or_else(|_| "postgres://mire:mire@localhost:5434/mire".to_string());
    let pool = PgPool::connect(&url).await?;
    let store = EventStore::new(pool);
    store.migrate().await?;

    // A fresh random id, so every run works on its own stream.
    let id = Uuid::new_v4().to_string();
    let mut account = store.load_or_default::<Account>(&id).await?;

    // Happy path: 100 + 50 - 30 leaves a balance of 120.
    open(&mut account, "Ada Lovelace")?;
    deposit(&mut account, 100)?;
    deposit(&mut account, 50)?;
    withdraw(&mut account, 30)?;
    store.save(&mut account).await?;
    println!(
        "balance after deposits/withdrawal: {}",
        account.state.balance
    );

    // Invalid operations are rejected — no events recorded, no save.
    // Reload first so the commands below run against committed history.
    let mut account = store
        .load::<Account>(&id)
        .await?
        .expect("the stream was saved above, so loading it must find the account");

    if let Err(e) = withdraw(&mut account, 1_000_000) {
        println!("withdraw 1_000_000 rejected: {e}");
    }
    if let Err(e) = deposit(&mut account, -5) {
        println!("deposit -5 rejected: {e}");
    }
    if let Err(e) = close(&mut account) {
        println!("close with non-zero balance rejected: {e}");
    }
    assert!(
        !account.has_pending(),
        "no events should have been recorded for rejected commands"
    );

    // Drain to zero then close.
    withdraw(&mut account, 120)?;
    close(&mut account)?;
    store.save(&mut account).await?;
    println!("account closed; final balance: {}", account.state.balance);

    // Replay confirms history: events drove the state to (closed, 0).
    let final_view = store
        .load::<Account>(&id)
        .await?
        .expect("the stream was saved above, so loading it must find the account");
    assert_eq!(final_view.state.balance, 0);
    assert!(!final_view.state.open);
    assert_eq!(final_view.version, 6); // Opened, 2× Deposited, 2× Withdrawn, Closed
    println!("replay confirms: 6 events committed, balance 0, account closed");

    Ok(())
}