mire 0.2.4

A small, generic PostgreSQL event-sourcing library: append-only event streams, aggregates with optimistic concurrency, and subscription-based projections (requires tokio + sqlx)
Documentation
//! # Pattern: Optimistic concurrency control & retry
//!
//! **Why this pattern exists.** When two processes try to mutate the
//! same aggregate at the same time, you have two choices:
//!
//! - **Pessimistic**: take a row lock; the second writer waits. Slow
//!   under contention; one slow writer blocks every other.
//! - **Optimistic** (what mire does): both writers proceed; whoever
//!   commits first wins; the other gets a [`ConcurrencyConflict`] and
//!   must reload + retry.
//!
//! In steady-state, conflicts are rare and the retry cost is tiny
//! compared to per-write locking. Under contention, the *successful*
//! writer doesn't slow down — only the loser pays.
//!
//! The retry shape is short and universal: load → mutate → save →
//! on conflict, reload and retry. Wrap it in a helper if you'll write
//! more than a couple of commands.
//!
//! Run:
//!
//! ```sh
//! mise run pg:up
//! cargo run --example concurrency
//! ```
//!
//! Expected output: two concurrent tasks each commit 50 deposits.
//! The final balance is 100 × $1 = $100, but at least one task
//! observes one or more conflict-retries on the way.
//!
//! [`ConcurrencyConflict`]: mire::EventStoreError::ConcurrencyConflict

use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

use mire::{Aggregate, EventData, EventStore, EventStoreError};
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use uuid::Uuid;

#[derive(Debug, Clone, Serialize, Deserialize, EventData)]
#[serde(tag = "type")]
#[mire(entity = "counter")]
enum CounterEvent {
    Deposited { by: i64 },
}

#[derive(Debug, Default)]
struct Counter {
    total: i64,
}

impl Aggregate for Counter {
    type Event = CounterEvent;
    fn stream_category() -> &'static str {
        "counter"
    }
    fn apply(&mut self, event: &CounterEvent) {
        match event {
            CounterEvent::Deposited { by } => self.total += by,
        }
    }
}

/// The canonical retry shape. Generic over a command closure that
/// applies to the loaded aggregate. On `ConcurrencyConflict`, reload
/// and try again.
///
/// In production you'd cap the retry count and add backoff; this is
/// the minimal teaching shape.
async fn with_retry<F>(
    store: &EventStore,
    id: &str,
    conflicts: &Arc<AtomicU64>,
    mut command: F,
) -> anyhow::Result<()>
where
    F: FnMut(&mut mire::AggregateRoot<Counter>) -> anyhow::Result<()>,
{
    loop {
        let mut agg = store.load_or_default::<Counter>(id).await?;
        command(&mut agg)?;
        match store.save(&mut agg).await {
            Ok(()) => return Ok(()),
            Err(EventStoreError::ConcurrencyConflict { .. }) => {
                conflicts.fetch_add(1, Ordering::Relaxed);
                // Loop: reload and try again. No locks were ever taken.
            }
            Err(e) => return Err(e.into()),
        }
    }
}

#[tokio::main(flavor = "multi_thread", worker_threads = 4)]
async fn main() -> anyhow::Result<()> {
    let url = std::env::var("DATABASE_URL")
        .unwrap_or_else(|_| "postgres://mire:mire@localhost:5434/mire".to_string());
    let pool = PgPool::connect(&url).await?;
    let store = EventStore::new(pool);
    store.migrate().await?;

    let id = Uuid::new_v4().to_string();

    // Two tasks, both depositing $1 fifty times to the same aggregate.
    // Without retry, the loser of each race would error out and the
    // total would be wrong. With retry, both eventually succeed.
    let conflicts = Arc::new(AtomicU64::new(0));
    let writers_per_task = 50;

    let mut handles = Vec::new();
    for task_label in ["A", "B"] {
        let store = store.clone();
        let id = id.clone();
        let conflicts = conflicts.clone();
        handles.push(tokio::spawn(async move {
            for i in 0..writers_per_task {
                with_retry(&store, &id, &conflicts, |agg| {
                    agg.record(CounterEvent::Deposited { by: 1 });
                    Ok(())
                })
                .await?;
                if i % 10 == 9 {
                    println!("  task {task_label} reached deposit #{}", i + 1);
                }
            }
            anyhow::Ok(())
        }));
    }
    for h in handles {
        h.await??;
    }

    let final_view = store.load::<Counter>(&id).await?.unwrap();
    let total_conflicts = conflicts.load(Ordering::Relaxed);

    println!("\nfinal balance: {}", final_view.state.total);
    println!("stream version (events committed): {}", final_view.version);
    println!("conflict retries observed: {total_conflicts}");

    assert_eq!(
        final_view.state.total,
        (writers_per_task * 2) as i64,
        "both tasks must succeed despite the contention"
    );
    assert_eq!(final_view.version, (writers_per_task * 2) as i64);

    if total_conflicts > 0 {
        println!(
            "\nThe contention produced {total_conflicts} retries; \
             each was a small cost paid by the loser, with no lock \
             held by the winner. Throughput would degrade with locking; \
             with OCC, the winning writer never pauses."
        );
    } else {
        println!(
            "\nNo conflicts this run (the scheduling happened to \
             serialise the two tasks). Run again or increase the load \
             to observe retries."
        );
    }

    Ok(())
}