// mire 0.2.0
//
// A small, generic PostgreSQL event-sourcing library: append-only event streams, aggregates with optimistic concurrency, and subscription-based projections (requires tokio + sqlx)
// Documentation
//! # Pattern: Projections (read models)
//!
//! **Why this pattern exists.** Aggregates are great for writing
//! decisions; they're terrible for querying. "Show me all open
//! accounts with balance > $1k" against the event log means replaying
//! every aggregate. The cure is a **projection**: a separate
//! query-optimised table that the [`ProjectionRunner`] keeps updated
//! by folding events into it as they arrive.
//!
//! Projections are eventually consistent: a write commits to the
//! event log first, then the runner picks it up and updates the read
//! model. The runner is built to survive: it checkpoints in
//! `es_subscriptions` so a restart resumes where it left off, retries
//! failing handlers with backoff, and stops rather than skip an event
//! (projections must never develop holes).
//!
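//! **Handlers must be idempotent.** Delivery is at-least-once — on
//! restart or lease failover, you may see the same event twice. The
//! canonical shape is an `INSERT ... ON CONFLICT ... DO UPDATE`
//! ("upsert"), shown below. An upsert is only idempotent when
//! re-applying the same event leaves the row unchanged: the additive
//! `total = total + delta` update used in this example would count a
//! redelivered event twice, so a production handler should also track
//! which events a row has already absorbed and skip repeats.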
//!
//! Run:
//!
//! ```sh
//! mise run pg:up
//! cargo run --example projection
//! ```

use std::time::Duration;

use mire::{Aggregate, EventData, EventHandler, EventStore, HandledEvent, ProjectionRunner};
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use tokio_util::sync::CancellationToken;
use uuid::Uuid;

// Events that can be recorded on a counter stream.
//
// `#[serde(tag = "type")]` selects serde's internally tagged representation:
// the variant name is serialized under a `"type"` key alongside the variant's
// own fields.
// NOTE(review): `#[mire(entity = "counter")]` presumably associates this event
// type with the "counter" entity — confirm against the `EventData` derive docs.
#[derive(Debug, Clone, Serialize, Deserialize, EventData)]
#[serde(tag = "type")]
#[mire(entity = "counter")]
enum CounterEvent {
    // The counter was increased by `by`.
    Incremented { by: i64 },
}

// Aggregate state for a single counter stream. The derived `Default` starts
// the counter at zero.
#[derive(Debug, Default)]
struct Counter {
    // Sum of every `Incremented` amount applied so far.
    total: i64,
}

impl Aggregate for Counter {
    type Event = CounterEvent;

    // Category name shared by every counter stream.
    fn stream_category() -> &'static str {
        "counter"
    }

    // Folds one event into the running total.
    fn apply(&mut self, event: &CounterEvent) {
        // `CounterEvent` has a single variant, so the pattern is irrefutable.
        let CounterEvent::Incremented { by: amount } = event;
        self.total += *amount;
    }
}

// The projection: a handler that folds counter events into a read-model
// table. `type Aggregate = Counter` ties this handler to the `counter`
// stream category and tells the runner how to decode each event. Handlers
// must be idempotent because delivery is at-least-once.
// NOTE(review): the upsert in `handle` *adds* each event's amount to the
// stored total, so a redelivered event would be counted twice. It is an
// upsert, but not an idempotent one — a production projection should also
// remember which events a row has already absorbed.
struct CounterTotalsProjection {
    // Pool used to write rows of the `counter_totals` read model.
    db: PgPool,
}

impl EventHandler for CounterTotalsProjection {
    type Aggregate = Counter;

    // Applies one counter event to the `counter_totals` read model: inserts a
    // row for a stream seen for the first time, otherwise adds the event's
    // amount to the stored total.
    //
    // NOTE(review): the update is additive, so handling the same event twice
    // increases the total twice. Guarding against redelivery would need a
    // per-row marker of the last applied event, which this example omits.
    async fn handle(&self, event: HandledEvent<CounterEvent>) -> anyhow::Result<()> {
        const UPSERT_TOTAL: &str = "INSERT INTO counter_totals (stream_id, total) VALUES ($1, $2)
             ON CONFLICT (stream_id) DO UPDATE SET total = counter_totals.total + EXCLUDED.total";

        // `by` is `Copy`, so this reads the amount without moving out of `event`.
        let amount = match event.event {
            CounterEvent::Incremented { by } => by,
        };

        sqlx::query(UPSERT_TOTAL)
            .bind(event.stream_id())
            .bind(amount)
            .execute(&self.db)
            .await?;
        Ok(())
    }
}

// Runs the example end to end: migrates the event store, creates the
// read-model table, starts the projection runner in the background, appends
// events to two fresh counter streams, waits (with a deadline) for the read
// model to reflect them, then shuts the runner down and prints the result.
//
// Returns an error if any database call fails, if the runner task fails, or
// if the read model does not reach the expected totals before the deadline.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Name under which the projection is subscribed and its lag is queried.
    const SUBSCRIPTION: &str = "counter-totals-projection";
    // Each stream below records +3 and +7.
    const EXPECTED_TOTAL: i64 = 10;
    // Upper bound on the catch-up wait. Without it the polling loop would spin
    // forever if the runner stopped or the total overshot the expected value.
    const CATCH_UP_TIMEOUT: Duration = Duration::from_secs(10);

    let database_url = std::env::var("DATABASE_URL")
        .unwrap_or_else(|_| "postgres://mire:mire@localhost:5432/mire".to_string());
    let pool = PgPool::connect(&database_url).await?;

    let store = EventStore::new(pool.clone());
    store.migrate().await?;

    // The read-model table the projection maintains. In a real app this lives in
    // its own migration; created inline here for the example.
    sqlx::raw_sql(
        "CREATE TABLE IF NOT EXISTS counter_totals (
            stream_id TEXT PRIMARY KEY,
            total     BIGINT NOT NULL
        )",
    )
    .execute(&pool)
    .await?;

    // Start the projection runner in the background.
    let runner = ProjectionRunner::builder(store.clone())
        .poll_interval(Duration::from_millis(50))
        .subscribe(SUBSCRIPTION, CounterTotalsProjection { db: pool.clone() })
        .build();

    let token = CancellationToken::new();
    let handle = {
        let token = token.clone();
        tokio::spawn(async move { runner.run(token).await })
    };

    // Meanwhile, produce some events across two counter streams.
    let mut stream_ids = Vec::new();
    for _ in 0..2 {
        let id = Uuid::new_v4().to_string();
        let mut counter = store.load_or_default::<Counter>(&id).await?;
        counter.record(CounterEvent::Incremented { by: 3 });
        counter.record(CounterEvent::Incremented { by: 7 });
        store.save(&mut counter).await?;
        stream_ids.push(counter.stream_id.clone());
    }

    // Wait until the projection has applied our streams to the read model, then
    // shut down gracefully. The deadline is shared across all streams.
    let deadline = std::time::Instant::now() + CATCH_UP_TIMEOUT;
    for stream_id in &stream_ids {
        loop {
            let observed: Option<i64> =
                sqlx::query_scalar("SELECT total FROM counter_totals WHERE stream_id = $1")
                    .bind(stream_id)
                    .fetch_optional(&pool)
                    .await?;
            if observed == Some(EXPECTED_TOTAL) {
                break;
            }
            if std::time::Instant::now() >= deadline {
                // Ask the runner to stop before bailing out so it does not keep
                // working while the process unwinds.
                token.cancel();
                anyhow::bail!(
                    "timed out after {:?} waiting for stream {} to reach total {} (last observed: {:?})",
                    CATCH_UP_TIMEOUT,
                    stream_id,
                    EXPECTED_TOTAL,
                    observed
                );
            }
            tokio::time::sleep(Duration::from_millis(20)).await;
        }
    }
    token.cancel();
    // First `?` surfaces a panicked/aborted task, second the runner's own error.
    handle.await??;

    println!("read model caught up for {} streams:", stream_ids.len());
    for stream_id in &stream_ids {
        let total: i64 =
            sqlx::query_scalar("SELECT total FROM counter_totals WHERE stream_id = $1")
                .bind(stream_id)
                .fetch_one(&pool)
                .await?;
        println!("  {stream_id} = {total}");
    }
    println!(
        "projection lag: {}",
        store.projection_lag(SUBSCRIPTION).await?
    );

    Ok(())
}