mire 0.2.1

A small, generic PostgreSQL event-sourcing library: append-only event streams, aggregates with optimistic concurrency, and subscription-based projections (requires tokio + sqlx)
Documentation
//! `mire` — a small, generic PostgreSQL event-sourcing library.
//!
//! Events are appended to per-aggregate streams in an append-only log, ordered
//! both within a stream (`stream_version`) and globally (`global_position`).
//! Aggregates are rebuilt by replaying their events; read models are built by
//! a [`ProjectionRunner`], which polls the log and checkpoints under a
//! replica-safe fenced lease.
//!
//! The core building blocks are:
//! - [`Aggregate`] / [`AggregateRoot`] — your domain state + the events it records.
//! - [`EventStore`] — load, save, append and read events (Postgres-backed).
//! - [`ProjectionRunner`] — drive projections / read models across replicas,
//!   one leader per subscription via fenced leases. (The lower-level
//!   `Subscription` poll cursor it's built on is internal.)
//!
//! See the `examples/` directory for runnable end-to-end usage.

mod error;
mod event;
/// Replica-coordination primitives. Surfaced publicly so concurrency
/// tests can drive them directly; production code should use the
/// [`ProjectionRunner`] which wraps these correctly.
#[doc(hidden)]
pub mod lease;
mod projection;
mod snapshot;
mod store;
mod stream;
mod subscription;

pub use lease::LeaseStatus;

pub use error::{DbErrorKind, EventStoreError};
pub use event::{
    Event, EventData, EventMetadata, EventTypeStatic, IntoResponse, RecordedEvent, ResponseValue,
};

/// `#[derive(EventData)]`, re-exported from `mire_derive` together with the
/// `IntoResponse` macro — see the crate-level docs and [`EventData`] for the
/// runtime contract. Enabled by default via the `derive` feature; turn it
/// off with `mire = { ..., default-features = false }` if you don't want
/// the proc-macro dependency.
#[cfg(feature = "derive")]
pub use mire_derive::{EventData, IntoResponse};
pub use projection::{
    EventHandler, HandledEvent, ProjectionRunner, ProjectionRunnerBuilder,
    TransactionalEventHandler,
};
pub use snapshot::Snapshot;
pub use store::{CommittedEvents, EventStore, TransactionScope};
pub use stream::{ExpectedVersion, ReadDirection, StreamQuery};
/// Low-level poll cursor. `#[doc(hidden)]` and unblessed — production code
/// uses [`ProjectionRunner`], the replica-safe consumer built on top of it
/// (review CORE-4/SUB-2). Surfaced only so concurrency tests can drive it.
#[doc(hidden)]
pub use subscription::Subscription;

use serde::de::DeserializeOwned;

/// The domain model whose state is rebuilt by folding events.
///
/// **Why `Default`?** In event sourcing, an aggregate's state is *defined*
/// by its event history. `Default::default()` represents the state
/// *before any events have been applied* — a logically empty aggregate.
/// All real state arises through [`apply`](Self::apply). If your
/// aggregate has fields that look invalid in their default form (e.g.
/// `BankAccount { open: false, owner: "".into() }`), that's correct —
/// those values are *unreachable in practice* because the first event
/// (e.g. `Opened`) will set them. Mire constructs the empty aggregate,
/// then replays events onto it.
///
/// If your `apply` implementation guards against operating on a default
/// aggregate (e.g. early-returns when `!self.open`), the invariants
/// hold even if someone accidentally calls a method on a `Default::default()`
/// instance — the events that would unlock real behaviour simply
/// haven't been applied yet.
pub trait Aggregate: Default + Send + Sync {
    type Event: EventData;

    /// The category prefix for streams of this aggregate type. Streams
    /// are identified as `format!("{category}-{id}")`; the category is
    /// what `ProjectionRunner`s subscribe to and what
    /// `EventStore::read_category_after` filters on.
    fn stream_category() -> &'static str;

    /// Fold a single event into the aggregate's state. Must be
    /// deterministic and side-effect-free — it runs every time the
    /// aggregate is reloaded.
    fn apply(&mut self, event: &Self::Event);
}

pub struct AggregateRoot<A: Aggregate> {
    pub state: A,
    pub version: i64,
    pub stream_id: String,
    pending_events: Vec<A::Event>,
    pub metadata: EventMetadata,
}

impl<A: Aggregate> AggregateRoot<A> {
    pub fn new(id: &str) -> Self {
        let stream_id = format!("{}-{}", A::stream_category(), id);
        Self {
            state: A::default(),
            version: 0,
            stream_id,
            pending_events: Vec::new(),
            metadata: EventMetadata::default(),
        }
    }

    pub fn set_metadata(&mut self, metadata: EventMetadata) {
        self.metadata = metadata;
    }

    /// Build a root seeded from snapshotted `state` at `version`, with no
    /// pending events. The caller replays any events after `version` on top.
    pub fn from_snapshot(stream_id: String, state: A, version: i64) -> Self {
        Self {
            state,
            version,
            stream_id,
            pending_events: Vec::new(),
            metadata: EventMetadata::default(),
        }
    }

    /// Rebuild an aggregate by folding a **complete** event history.
    ///
    /// `events` must be the full forward history of the stream (versions
    /// `1..=version`, contiguous). Callers obtain it from a paged read
    /// (`EventStore::read_stream_all`) — never a row-limited read, which
    /// would silently truncate a long stream and produce state that still
    /// passes the optimistic-concurrency check on save (review C1/SNAP-1).
    /// The contiguity check here is the backstop that turns any such
    /// truncation — or a partial-append hole — into a hard
    /// [`EventStoreError::StreamCorruption`] instead of corrupt state.
    pub fn hydrate(
        stream_id: String,
        events: &[RecordedEvent],
        version: i64,
    ) -> Result<Self, EventStoreError>
    where
        A::Event: DeserializeOwned,
    {
        Self::check_contiguous(&stream_id, events, version)?;
        let mut state = A::default();
        for recorded in events {
            let event =
                serde_json::from_value::<A::Event>(recorded.data.clone()).map_err(|source| {
                    EventStoreError::Deserialization {
                        stream_id: recorded.stream_id.clone(),
                        global_position: recorded.global_position,
                        event_type: recorded.event_type.clone(),
                        source,
                    }
                })?;
            state.apply(&event);
        }
        Ok(Self {
            state,
            version,
            stream_id,
            pending_events: Vec::new(),
            metadata: EventMetadata::default(),
        })
    }

    /// Verify `events` is a contiguous prefix `1..=N` with `N >= version`.
    ///
    /// The backstop targets **truncation** (review C1/SNAP-1): reading
    /// *fewer* events than the stream actually has — a row-limited read, or a
    /// partial-append hole — which would fold incomplete state that still
    /// passes the optimistic-concurrency check on save. Those manifest as a
    /// missing prefix, a version gap, or a highest version *below* the
    /// recorded `version`, and are rejected.
    ///
    /// Reading *more* than `version` (`N > version`) is **not** corruption:
    /// `load` reads the `es_streams` version and the events in separate
    /// statements, so a concurrent append committing between them yields a
    /// few extra trailing events (the benign CORE-24 load race). The state is
    /// still a valid, fully-contiguous history; the only consequence is that
    /// the root's `version` is a slightly stale lower bound, so a subsequent
    /// save conflicts and retries — safe.
    fn check_contiguous(
        stream_id: &str,
        events: &[RecordedEvent],
        version: i64,
    ) -> Result<(), EventStoreError> {
        let corruption = |detail: String| EventStoreError::StreamCorruption {
            stream_id: stream_id.to_string(),
            recorded_version: version,
            read_count: events.len() as i64,
            detail,
        };

        let Some(last) = events.last() else {
            // A stream at version > 0 must have events; an empty read is a
            // truncation, not a valid empty aggregate.
            if version != 0 {
                return Err(corruption("read no events".to_string()));
            }
            return Ok(());
        };

        if events[0].stream_version != 1 {
            return Err(corruption(format!(
                "first event is version {} (expected 1 — missing prefix)",
                events[0].stream_version
            )));
        }
        if last.stream_version < version {
            return Err(corruption(format!(
                "highest event is version {} but recorded version is {version} (truncated read)",
                last.stream_version
            )));
        }
        // Contiguity: versions must be 1, 2, 3, … with no gaps or repeats.
        for pair in events.windows(2) {
            if pair[1].stream_version != pair[0].stream_version + 1 {
                return Err(corruption(format!(
                    "non-contiguous versions {} then {}",
                    pair[0].stream_version, pair[1].stream_version
                )));
            }
        }
        Ok(())
    }

    pub fn record(&mut self, event: A::Event) {
        self.state.apply(&event);
        self.pending_events.push(event);
    }

    /// Record many events in one call. Equivalent to calling
    /// [`record`](Self::record) in sequence; the convenience is that the
    /// subsequent [`EventStore::save`] writes all pending events in a
    /// single append transaction (amortising 8 round trips per save
    /// across N events). For high-throughput writers, this is the
    /// canonical batched-write recipe.
    pub fn record_many<I>(&mut self, events: I)
    where
        I: IntoIterator<Item = A::Event>,
    {
        for event in events {
            self.record(event);
        }
    }

    pub fn take_pending(&mut self) -> Vec<A::Event> {
        std::mem::take(&mut self.pending_events)
    }

    /// Put previously-[taken](Self::take_pending) events back at the front of
    /// the pending queue, preserving order ahead of anything recorded since.
    /// Used by [`EventStore::save`](crate::EventStore::save) to restore the
    /// root after a failed append so a retry is not a silent no-op (C8).
    pub fn restore_pending(&mut self, mut events: Vec<A::Event>) {
        events.append(&mut self.pending_events);
        self.pending_events = events;
    }

    pub fn has_pending(&self) -> bool {
        !self.pending_events.is_empty()
    }

    pub fn pending_count(&self) -> usize {
        self.pending_events.len()
    }
}