Skip to main content

mire/
lib.rs

1//! `mire` — a small, generic PostgreSQL event-sourcing library.
2//!
3//! Events are appended to per-aggregate streams in an append-only log, ordered
4//! both within a stream (`stream_version`) and globally (`global_position`).
5//! Aggregates are rebuilt by replaying their events; read models are built by
6//! a [`ProjectionRunner`], which polls the log and checkpoints under a
7//! replica-safe fenced lease.
8//!
9//! The core building blocks are:
10//! - [`Aggregate`] / [`AggregateRoot`] — your domain state + the events it records.
11//! - [`EventStore`] — load, save, append and read events (Postgres-backed).
12//! - [`ProjectionRunner`] — drive projections / read models across replicas,
13//!   one leader per subscription via fenced leases. (The lower-level
14//!   `Subscription` poll cursor it's built on is internal.)
15//!
16//! See the `examples/` directory for runnable end-to-end usage.
17
18mod error;
19mod event;
20/// Replica-coordination primitives. Surfaced publicly so concurrency
21/// tests can drive them directly; production code should use the
22/// [`ProjectionRunner`] which wraps these correctly.
23#[doc(hidden)]
24pub mod lease;
25mod projection;
26mod snapshot;
27mod store;
28mod stream;
29mod subscription;
30
31pub use lease::LeaseStatus;
32
33pub use error::{DbErrorKind, EventStoreError};
34pub use event::{
35    Event, EventData, EventMetadata, EventTypeStatic, IntoResponse, RecordedEvent, ResponseValue,
36};
37
38/// `#[derive(EventData)]` — see the crate-level docs and
39/// [`EventData`] for the runtime contract. Enabled by default via
40/// the `derive` feature; turn it off with
41/// `mire = { ..., default-features = false }` if you don't want the
42/// proc-macro dep.
43#[cfg(feature = "derive")]
44pub use mire_derive::{EventData, IntoResponse};
45pub use projection::{
46    EventHandler, HandledEvent, ProjectionRunner, ProjectionRunnerBuilder,
47    TransactionalEventHandler,
48};
49pub use snapshot::Snapshot;
50pub use store::{CommittedEvents, EventStore, TransactionScope};
51pub use stream::{ExpectedVersion, ReadDirection, StreamQuery};
52/// Low-level poll cursor. `#[doc(hidden)]` and unblessed — production code
53/// uses [`ProjectionRunner`], the replica-safe consumer built on top of it
54/// (review CORE-4/SUB-2). Surfaced only so concurrency tests can drive it.
55#[doc(hidden)]
56pub use subscription::Subscription;
57
58use serde::de::DeserializeOwned;
59
60/// The domain model whose state is rebuilt by folding events.
61///
62/// **Why `Default`?** In event sourcing, an aggregate's state is *defined*
63/// by its event history. `Default::default()` represents the state
64/// *before any events have been applied* — a logically empty aggregate.
65/// All real state arises through [`apply`](Self::apply). If your
66/// aggregate has fields that look invalid in their default form (e.g.
67/// `BankAccount { open: false, owner: "".into() }`), that's correct —
68/// those values are *unreachable in practice* because the first event
69/// (e.g. `Opened`) will set them. Mire constructs the empty aggregate,
70/// then replays events onto it.
71///
72/// If your `apply` implementation guards against operating on a default
73/// aggregate (e.g. early-returns when `!self.open`), the invariants
74/// hold even if someone accidentally calls a method on a `Default::default()`
75/// instance — the events that would unlock real behaviour simply
76/// haven't been applied yet.
77pub trait Aggregate: Default + Send + Sync {
78    type Event: EventData;
79
80    /// The category prefix for streams of this aggregate type. Streams
81    /// are identified as `format!("{category}-{id}")`; the category is
82    /// what `ProjectionRunner`s subscribe to and what
83    /// `EventStore::read_category_after` filters on.
84    fn stream_category() -> &'static str;
85
86    /// Fold a single event into the aggregate's state. Must be
87    /// deterministic and side-effect-free — it runs every time the
88    /// aggregate is reloaded.
89    fn apply(&mut self, event: &Self::Event);
90}
91
92pub struct AggregateRoot<A: Aggregate> {
93    pub state: A,
94    pub version: i64,
95    pub stream_id: String,
96    pending_events: Vec<A::Event>,
97    pub metadata: EventMetadata,
98}
99
100impl<A: Aggregate> AggregateRoot<A> {
101    pub fn new(id: &str) -> Self {
102        let stream_id = format!("{}-{}", A::stream_category(), id);
103        Self {
104            state: A::default(),
105            version: 0,
106            stream_id,
107            pending_events: Vec::new(),
108            metadata: EventMetadata::default(),
109        }
110    }
111
112    pub fn set_metadata(&mut self, metadata: EventMetadata) {
113        self.metadata = metadata;
114    }
115
116    /// Build a root seeded from snapshotted `state` at `version`, with no
117    /// pending events. The caller replays any events after `version` on top.
118    pub fn from_snapshot(stream_id: String, state: A, version: i64) -> Self {
119        Self {
120            state,
121            version,
122            stream_id,
123            pending_events: Vec::new(),
124            metadata: EventMetadata::default(),
125        }
126    }
127
128    /// Rebuild an aggregate by folding a **complete** event history.
129    ///
130    /// `events` must be the full forward history of the stream (versions
131    /// `1..=version`, contiguous). Callers obtain it from a paged read
132    /// (`EventStore::read_stream_all`) — never a row-limited read, which
133    /// would silently truncate a long stream and produce state that still
134    /// passes the optimistic-concurrency check on save (review C1/SNAP-1).
135    /// The contiguity check here is the backstop that turns any such
136    /// truncation — or a partial-append hole — into a hard
137    /// [`EventStoreError::StreamCorruption`] instead of corrupt state.
138    pub fn hydrate(
139        stream_id: String,
140        events: &[RecordedEvent],
141        version: i64,
142    ) -> Result<Self, EventStoreError>
143    where
144        A::Event: DeserializeOwned,
145    {
146        Self::check_contiguous(&stream_id, events, version)?;
147        let mut state = A::default();
148        for recorded in events {
149            let event =
150                serde_json::from_value::<A::Event>(recorded.data.clone()).map_err(|source| {
151                    EventStoreError::Deserialization {
152                        stream_id: recorded.stream_id.clone(),
153                        global_position: recorded.global_position,
154                        event_type: recorded.event_type.clone(),
155                        source,
156                    }
157                })?;
158            state.apply(&event);
159        }
160        Ok(Self {
161            state,
162            version,
163            stream_id,
164            pending_events: Vec::new(),
165            metadata: EventMetadata::default(),
166        })
167    }
168
169    /// Verify `events` is a contiguous prefix `1..=N` with `N >= version`.
170    ///
171    /// The backstop targets **truncation** (review C1/SNAP-1): reading
172    /// *fewer* events than the stream actually has — a row-limited read, or a
173    /// partial-append hole — which would fold incomplete state that still
174    /// passes the optimistic-concurrency check on save. Those manifest as a
175    /// missing prefix, a version gap, or a highest version *below* the
176    /// recorded `version`, and are rejected.
177    ///
178    /// Reading *more* than `version` (`N > version`) is **not** corruption:
179    /// `load` reads the `es_streams` version and the events in separate
180    /// statements, so a concurrent append committing between them yields a
181    /// few extra trailing events (the benign CORE-24 load race). The state is
182    /// still a valid, fully-contiguous history; the only consequence is that
183    /// the root's `version` is a slightly stale lower bound, so a subsequent
184    /// save conflicts and retries — safe.
185    fn check_contiguous(
186        stream_id: &str,
187        events: &[RecordedEvent],
188        version: i64,
189    ) -> Result<(), EventStoreError> {
190        let corruption = |detail: String| EventStoreError::StreamCorruption {
191            stream_id: stream_id.to_string(),
192            recorded_version: version,
193            read_count: events.len() as i64,
194            detail,
195        };
196
197        let Some(last) = events.last() else {
198            // A stream at version > 0 must have events; an empty read is a
199            // truncation, not a valid empty aggregate.
200            if version != 0 {
201                return Err(corruption("read no events".to_string()));
202            }
203            return Ok(());
204        };
205
206        if events[0].stream_version != 1 {
207            return Err(corruption(format!(
208                "first event is version {} (expected 1 — missing prefix)",
209                events[0].stream_version
210            )));
211        }
212        if last.stream_version < version {
213            return Err(corruption(format!(
214                "highest event is version {} but recorded version is {version} (truncated read)",
215                last.stream_version
216            )));
217        }
218        // Contiguity: versions must be 1, 2, 3, … with no gaps or repeats.
219        for pair in events.windows(2) {
220            if pair[1].stream_version != pair[0].stream_version + 1 {
221                return Err(corruption(format!(
222                    "non-contiguous versions {} then {}",
223                    pair[0].stream_version, pair[1].stream_version
224                )));
225            }
226        }
227        Ok(())
228    }
229
230    pub fn record(&mut self, event: A::Event) {
231        self.state.apply(&event);
232        self.pending_events.push(event);
233    }
234
235    /// Record many events in one call. Equivalent to calling
236    /// [`record`](Self::record) in sequence; the convenience is that the
237    /// subsequent [`EventStore::save`] writes all pending events in a
238    /// single append transaction (amortising 8 round trips per save
239    /// across N events). For high-throughput writers, this is the
240    /// canonical batched-write recipe.
241    pub fn record_many<I>(&mut self, events: I)
242    where
243        I: IntoIterator<Item = A::Event>,
244    {
245        for event in events {
246            self.record(event);
247        }
248    }
249
250    pub fn take_pending(&mut self) -> Vec<A::Event> {
251        std::mem::take(&mut self.pending_events)
252    }
253
254    /// Put previously-[taken](Self::take_pending) events back at the front of
255    /// the pending queue, preserving order ahead of anything recorded since.
256    /// Used by [`EventStore::save`](crate::EventStore::save) to restore the
257    /// root after a failed append so a retry is not a silent no-op (C8).
258    pub fn restore_pending(&mut self, mut events: Vec<A::Event>) {
259        events.append(&mut self.pending_events);
260        self.pending_events = events;
261    }
262
263    pub fn has_pending(&self) -> bool {
264        !self.pending_events.is_empty()
265    }
266
267    pub fn pending_count(&self) -> usize {
268        self.pending_events.len()
269    }
270}