mire/lib.rs
1//! `mire` — a small, generic PostgreSQL event-sourcing library.
2//!
3//! Events are appended to per-aggregate streams in an append-only log, ordered
4//! both within a stream (`stream_version`) and globally (`global_position`).
5//! Aggregates are rebuilt by replaying their events; read models are built by
6//! a [`ProjectionRunner`], which polls the log and checkpoints under a
7//! replica-safe fenced lease.
8//!
9//! The core building blocks are:
10//! - [`Aggregate`] / [`AggregateRoot`] — your domain state + the events it records.
11//! - [`EventStore`] — load, save, append and read events (Postgres-backed).
12//! - [`ProjectionRunner`] — drive projections / read models across replicas,
13//! one leader per subscription via fenced leases. (The lower-level
14//! `Subscription` poll cursor it's built on is internal.)
15//!
16//! See the `examples/` directory for runnable end-to-end usage.
17
18mod error;
19mod event;
20/// Replica-coordination primitives. Surfaced publicly so concurrency
21/// tests can drive them directly; production code should use the
22/// [`ProjectionRunner`] which wraps these correctly.
23#[doc(hidden)]
24pub mod lease;
25mod projection;
26mod snapshot;
27mod store;
28mod stream;
29mod subscription;
30
31pub use lease::LeaseStatus;
32
33pub use error::{DbErrorKind, EventStoreError};
34pub use event::{
35 Event, EventData, EventMetadata, EventTypeStatic, IntoResponse, RecordedEvent, ResponseValue,
36};
37
/// `#[derive(EventData)]` and `#[derive(IntoResponse)]` — see the crate-level
/// docs and the [`EventData`] / [`IntoResponse`] traits for the runtime
/// contracts. Enabled by default via the `derive` feature; turn it off with
/// `mire = { ..., default-features = false }` if you don't want the
/// proc-macro dependency.
43#[cfg(feature = "derive")]
44pub use mire_derive::{EventData, IntoResponse};
45pub use projection::{
46 EventHandler, HandledEvent, ProjectionRunner, ProjectionRunnerBuilder,
47 TransactionalEventHandler,
48};
49pub use snapshot::Snapshot;
50pub use store::{CommittedEvents, EventStore, TransactionScope};
51pub use stream::{ExpectedVersion, ReadDirection, StreamQuery};
52/// Low-level poll cursor. `#[doc(hidden)]` and unblessed — production code
53/// uses [`ProjectionRunner`], the replica-safe consumer built on top of it
54/// (review CORE-4/SUB-2). Surfaced only so concurrency tests can drive it.
55#[doc(hidden)]
56pub use subscription::Subscription;
57
58use serde::de::DeserializeOwned;
59
60/// The domain model whose state is rebuilt by folding events.
61///
62/// **Why `Default`?** In event sourcing, an aggregate's state is *defined*
63/// by its event history. `Default::default()` represents the state
64/// *before any events have been applied* — a logically empty aggregate.
65/// All real state arises through [`apply`](Self::apply). If your
66/// aggregate has fields that look invalid in their default form (e.g.
67/// `BankAccount { open: false, owner: "".into() }`), that's correct —
68/// those values are *unreachable in practice* because the first event
69/// (e.g. `Opened`) will set them. Mire constructs the empty aggregate,
70/// then replays events onto it.
71///
72/// If your `apply` implementation guards against operating on a default
73/// aggregate (e.g. early-returns when `!self.open`), the invariants
74/// hold even if someone accidentally calls a method on a `Default::default()`
75/// instance — the events that would unlock real behaviour simply
76/// haven't been applied yet.
77pub trait Aggregate: Default + Send + Sync {
78 type Event: EventData;
79
80 /// The category prefix for streams of this aggregate type. Streams
81 /// are identified as `format!("{category}-{id}")`; the category is
82 /// what `ProjectionRunner`s subscribe to and what
83 /// `EventStore::read_category_after` filters on.
84 fn stream_category() -> &'static str;
85
86 /// Fold a single event into the aggregate's state. Must be
87 /// deterministic and side-effect-free — it runs every time the
88 /// aggregate is reloaded.
89 fn apply(&mut self, event: &Self::Event);
90}
91
92pub struct AggregateRoot<A: Aggregate> {
93 pub state: A,
94 pub version: i64,
95 pub stream_id: String,
96 pending_events: Vec<A::Event>,
97 pub metadata: EventMetadata,
98}
99
100impl<A: Aggregate> AggregateRoot<A> {
101 pub fn new(id: &str) -> Self {
102 let stream_id = format!("{}-{}", A::stream_category(), id);
103 Self {
104 state: A::default(),
105 version: 0,
106 stream_id,
107 pending_events: Vec::new(),
108 metadata: EventMetadata::default(),
109 }
110 }
111
112 pub fn set_metadata(&mut self, metadata: EventMetadata) {
113 self.metadata = metadata;
114 }
115
116 /// Build a root seeded from snapshotted `state` at `version`, with no
117 /// pending events. The caller replays any events after `version` on top.
118 pub fn from_snapshot(stream_id: String, state: A, version: i64) -> Self {
119 Self {
120 state,
121 version,
122 stream_id,
123 pending_events: Vec::new(),
124 metadata: EventMetadata::default(),
125 }
126 }
127
128 /// Rebuild an aggregate by folding a **complete** event history.
129 ///
130 /// `events` must be the full forward history of the stream (versions
131 /// `1..=version`, contiguous). Callers obtain it from a paged read
132 /// (`EventStore::read_stream_all`) — never a row-limited read, which
133 /// would silently truncate a long stream and produce state that still
134 /// passes the optimistic-concurrency check on save (review C1/SNAP-1).
135 /// The contiguity check here is the backstop that turns any such
136 /// truncation — or a partial-append hole — into a hard
137 /// [`EventStoreError::StreamCorruption`] instead of corrupt state.
138 pub fn hydrate(
139 stream_id: String,
140 events: &[RecordedEvent],
141 version: i64,
142 ) -> Result<Self, EventStoreError>
143 where
144 A::Event: DeserializeOwned,
145 {
146 Self::check_contiguous(&stream_id, events, version)?;
147 let mut state = A::default();
148 for recorded in events {
149 let event =
150 serde_json::from_value::<A::Event>(recorded.data.clone()).map_err(|source| {
151 EventStoreError::Deserialization {
152 stream_id: recorded.stream_id.clone(),
153 global_position: recorded.global_position,
154 event_type: recorded.event_type.clone(),
155 source,
156 }
157 })?;
158 state.apply(&event);
159 }
160 Ok(Self {
161 state,
162 version,
163 stream_id,
164 pending_events: Vec::new(),
165 metadata: EventMetadata::default(),
166 })
167 }
168
169 /// Verify `events` is a contiguous prefix `1..=N` with `N >= version`.
170 ///
171 /// The backstop targets **truncation** (review C1/SNAP-1): reading
172 /// *fewer* events than the stream actually has — a row-limited read, or a
173 /// partial-append hole — which would fold incomplete state that still
174 /// passes the optimistic-concurrency check on save. Those manifest as a
175 /// missing prefix, a version gap, or a highest version *below* the
176 /// recorded `version`, and are rejected.
177 ///
178 /// Reading *more* than `version` (`N > version`) is **not** corruption:
179 /// `load` reads the `es_streams` version and the events in separate
180 /// statements, so a concurrent append committing between them yields a
181 /// few extra trailing events (the benign CORE-24 load race). The state is
182 /// still a valid, fully-contiguous history; the only consequence is that
183 /// the root's `version` is a slightly stale lower bound, so a subsequent
184 /// save conflicts and retries — safe.
185 fn check_contiguous(
186 stream_id: &str,
187 events: &[RecordedEvent],
188 version: i64,
189 ) -> Result<(), EventStoreError> {
190 let corruption = |detail: String| EventStoreError::StreamCorruption {
191 stream_id: stream_id.to_string(),
192 recorded_version: version,
193 read_count: events.len() as i64,
194 detail,
195 };
196
197 let Some(last) = events.last() else {
198 // A stream at version > 0 must have events; an empty read is a
199 // truncation, not a valid empty aggregate.
200 if version != 0 {
201 return Err(corruption("read no events".to_string()));
202 }
203 return Ok(());
204 };
205
206 if events[0].stream_version != 1 {
207 return Err(corruption(format!(
208 "first event is version {} (expected 1 — missing prefix)",
209 events[0].stream_version
210 )));
211 }
212 if last.stream_version < version {
213 return Err(corruption(format!(
214 "highest event is version {} but recorded version is {version} (truncated read)",
215 last.stream_version
216 )));
217 }
218 // Contiguity: versions must be 1, 2, 3, … with no gaps or repeats.
219 for pair in events.windows(2) {
220 if pair[1].stream_version != pair[0].stream_version + 1 {
221 return Err(corruption(format!(
222 "non-contiguous versions {} then {}",
223 pair[0].stream_version, pair[1].stream_version
224 )));
225 }
226 }
227 Ok(())
228 }
229
230 pub fn record(&mut self, event: A::Event) {
231 self.state.apply(&event);
232 self.pending_events.push(event);
233 }
234
235 /// Record many events in one call. Equivalent to calling
236 /// [`record`](Self::record) in sequence; the convenience is that the
237 /// subsequent [`EventStore::save`] writes all pending events in a
238 /// single append transaction (amortising 8 round trips per save
239 /// across N events). For high-throughput writers, this is the
240 /// canonical batched-write recipe.
241 pub fn record_many<I>(&mut self, events: I)
242 where
243 I: IntoIterator<Item = A::Event>,
244 {
245 for event in events {
246 self.record(event);
247 }
248 }
249
250 pub fn take_pending(&mut self) -> Vec<A::Event> {
251 std::mem::take(&mut self.pending_events)
252 }
253
254 /// Put previously-[taken](Self::take_pending) events back at the front of
255 /// the pending queue, preserving order ahead of anything recorded since.
256 /// Used by [`EventStore::save`](crate::EventStore::save) to restore the
257 /// root after a failed append so a retry is not a silent no-op (C8).
258 pub fn restore_pending(&mut self, mut events: Vec<A::Event>) {
259 events.append(&mut self.pending_events);
260 self.pending_events = events;
261 }
262
263 pub fn has_pending(&self) -> bool {
264 !self.pending_events.is_empty()
265 }
266
267 pub fn pending_count(&self) -> usize {
268 self.pending_events.len()
269 }
270}