mire 0.2.4

A small, generic PostgreSQL event-sourcing library: append-only event streams, aggregates with optimistic concurrency, and subscription-based projections (requires tokio + sqlx)
Documentation
//! # Pattern: Multi-replica deployment (lease failover)
//!
//! **Why this pattern exists.** Every production service runs more
//! than one replica. Without coordination, every replica's
//! [`ProjectionRunner`] would process every event — duplicate work
//! and read-model contention. With mire's lease system, **exactly
//! one replica is leader** for each `subscription_id` at any given
//! moment, and when the leader dies the others take over within
//! `lease_ttl + lease_recheck_interval`.
//!
//! No coordination service required — the lease lives in Postgres
//! (the database you already have). Fence-token semantics prevent a
//! paused leader that wakes up from clobbering the new leader's
//! progress.
//!
//! This example simulates two replicas in one process (they'd be
//! separate pods in production). The lease mechanics are identical
//! across-process or in-process.
//!
//! Run:
//!
//! ```sh
//! mise run pg:up
//! cargo run --example multi_replica
//! ```
//!
//! Expected: A acquires first → processes 5 events; we kill A → B
//! takes over (fence_token bumps from 1 → 2); B processes the next
//! 5 events.
//!
//! Delivery is **effectively-once in the happy path, at-least-once under
//! failover**: a leader that pauses mid-batch and is superseded may have
//! already applied some events that the new leader, resuming from the last
//! *checkpoint*, applies again. Handlers must be idempotent (and, for the
//! current non-transactional path, order-tolerant). True exactly-once for
//! Postgres read models is the transactional-handler API tracked in the
//! review (C2/LEASE-1).

use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;

use mire::{Aggregate, EventData, EventHandler, EventStore, HandledEvent, ProjectionRunner};
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use tokio_util::sync::CancellationToken;
use uuid::Uuid;

#[derive(Debug, Clone, Serialize, Deserialize, EventData)]
#[serde(tag = "type")]
#[mire(entity = "order")]
enum OrderEvent {
    Placed { amount: i64 },
}

#[derive(Default)]
struct Order {
    amount: i64,
}

impl Aggregate for Order {
    type Event = OrderEvent;
    fn stream_category() -> &'static str {
        "order"
    }
    fn apply(&mut self, event: &OrderEvent) {
        match event {
            OrderEvent::Placed { amount } => self.amount += amount,
        }
    }
}

/// Handler prints which worker_id processed each event. In real code
/// you'd write to a read model; the print is the demo.
struct PrintingHandler {
    worker_label: String,
    counter: Arc<AtomicU64>,
}

impl EventHandler for PrintingHandler {
    type Aggregate = Order;

    async fn handle(&self, event: HandledEvent<OrderEvent>) -> anyhow::Result<()> {
        let OrderEvent::Placed { amount } = event.event;
        let n = self.counter.fetch_add(1, Ordering::Relaxed) + 1;
        println!(
            "  [{}] handled event #{n} (amount={amount}) on {}",
            self.worker_label,
            event.stream_id(),
        );
        Ok(())
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let url = std::env::var("DATABASE_URL")
        .unwrap_or_else(|_| "postgres://mire:mire@localhost:5434/mire".to_string());
    let pool = PgPool::connect(&url).await?;
    let store = EventStore::new(pool.clone());
    store.migrate().await?;

    // Unique subscription_id per demo run so reruns don't collide.
    let suffix = Uuid::new_v4().simple().to_string();
    let subscription_id = format!("orders-projection-{suffix}");

    // Short lease_ttl so failover is visible within a demo timescale.
    // Production defaults are 30s ttl / 5s recheck.
    let make_runner = |label: &str| {
        let counter = Arc::new(AtomicU64::new(0));
        let runner = ProjectionRunner::builder(store.clone())
            .worker_id(format!("replica-{label}"))
            .poll_interval(Duration::from_millis(50))
            .lease_ttl(Duration::from_secs(3))
            .lease_recheck_interval(Duration::from_millis(500))
            .subscribe(
                subscription_id.clone(),
                PrintingHandler {
                    worker_label: format!("replica-{label}"),
                    counter: counter.clone(),
                },
            )
            .build();
        (runner, counter)
    };

    let (runner_a, count_a) = make_runner("A");
    let (runner_b, count_b) = make_runner("B");

    let cancel_a = CancellationToken::new();
    let cancel_b = CancellationToken::new();

    let mut handle_a = Some({
        let cancel = cancel_a.clone();
        tokio::spawn(async move { runner_a.run(cancel).await })
    });
    let mut handle_b = Some({
        let cancel = cancel_b.clone();
        tokio::spawn(async move { runner_b.run(cancel).await })
    });

    // Wait for either replica to acquire the lease.
    let lease_pool = store.pool();
    let leader_id = loop {
        if let Some(s) = mire::lease::status(lease_pool, &subscription_id).await?
            && s.leased_until > chrono::Utc::now()
        {
            break s.worker_id;
        }
        tokio::time::sleep(Duration::from_millis(50)).await;
    };
    println!("\n--- after startup ---");
    println!("lease holder: {leader_id}");

    // First batch: 5 events. The current leader processes them.
    println!("\n--- appending 5 events; leader processes ---");
    for i in 0..5 {
        let id = Uuid::new_v4().to_string();
        let mut order = store.load_or_default::<Order>(&id).await?;
        order.record(OrderEvent::Placed {
            amount: 100 * (i + 1),
        });
        store.save(&mut order).await?;
    }
    tokio::time::sleep(Duration::from_millis(800)).await;
    let before_a = count_a.load(Ordering::Relaxed);
    let before_b = count_b.load(Ordering::Relaxed);
    println!("  A processed {before_a} events");
    println!("  B processed {before_b} events");

    // Kill whichever replica won the initial race — this is the demo of
    // failover. The follower must observe the dead lease and take over.
    println!("\n--- killing the current leader ({leader_id}); follower should take over ---");
    if leader_id == "replica-A" {
        cancel_a.cancel();
        if let Some(h) = handle_a.take() {
            let _ = h.await;
        }
    } else {
        cancel_b.cancel();
        if let Some(h) = handle_b.take() {
            let _ = h.await;
        }
    }

    // Wait for failover: lease_ttl (3s) for the released `-infinity` to
    // be visible to the follower's next acquire attempt.
    tokio::time::sleep(Duration::from_secs(2)).await;
    let status_after = mire::lease::status(lease_pool, &subscription_id).await?;
    if let Some(s) = &status_after {
        println!(
            "lease holder after failover: {} (fence_token={})",
            s.worker_id, s.fence_token,
        );
    }

    println!("\n--- appending 5 more events; new leader processes ---");
    for i in 5..10 {
        let id = Uuid::new_v4().to_string();
        let mut order = store.load_or_default::<Order>(&id).await?;
        order.record(OrderEvent::Placed {
            amount: 100 * (i + 1),
        });
        store.save(&mut order).await?;
    }
    tokio::time::sleep(Duration::from_millis(800)).await;
    let after_a = count_a.load(Ordering::Relaxed);
    let after_b = count_b.load(Ordering::Relaxed);
    println!(
        "  A processed {after_a} events total (Δ {})",
        after_a - before_a
    );
    println!(
        "  B processed {after_b} events total (Δ {})",
        after_b - before_b
    );

    cancel_a.cancel();
    cancel_b.cancel();
    if let Some(h) = handle_a {
        let _ = h.await;
    }
    if let Some(h) = handle_b {
        let _ = h.await;
    }

    println!("\n--- summary ---");
    println!(
        "  total events processed: A={after_a}, B={after_b} (sum {})",
        after_a + after_b
    );
    println!(
        "  10 events were appended; in this clean handoff each was delivered \
         once to the current leader. Under a mid-batch pause+failover, \
         delivery is at-least-once — idempotent handlers absorb the \
         re-delivery."
    );

    Ok(())
}