sourcery_core/store.rs
1//! Persistence layer abstractions.
2//!
3//! This module describes the storage contract ([`EventStore`]), wire formats,
4//! and a reference in-memory implementation. Filters and positions live here
5//! to keep storage concerns together.
6//!
7//! # Event Lifecycle
8//!
9//! Events flow through a simple lifecycle during persistence:
10//!
11//! ```text
//! DomainEvent ──commit_events()──▶ Database ───────────────────┐
//!                                                              │
//! DomainEvent ◀──decode_event()── StoredEvent ◀──load_events()─┘
15//! ```
16//!
17//! [`StoredEvent`] contains the serialised event data plus store-assigned
18//! metadata (position, aggregate info). Use [`EventStore::decode_event`] to
19//! deserialise back to a domain event.
20use std::future::Future;
21
22pub use nonempty::NonEmpty;
23use thiserror::Error;
24
25use crate::concurrency::ConcurrencyConflict;
26
27pub mod inmemory;
28
/// An event as it was read back from the store.
///
/// Pairs the serialised payload with the identifying and ordering metadata
/// kept alongside it. Values of this type are returned by
/// [`EventStore::load_events`]; hand one to [`EventStore::decode_event`] to
/// deserialise the `data` field back into a domain event.
///
/// # Type Parameters
///
/// - `Id`: The aggregate identifier type
/// - `Pos`: The position type used for ordering
/// - `Data`: The serialised event payload type (e.g., `serde_json::Value`)
/// - `M`: The metadata type
#[derive(Clone, Debug)]
pub struct StoredEvent<Id, Pos, Data, M> {
    /// The aggregate type identifier (e.g., `"account"`).
    pub aggregate_kind: String,
    /// The aggregate instance identifier.
    pub aggregate_id: Id,
    /// The event type identifier (e.g., `"account.deposited"`).
    pub kind: String,
    /// The global position assigned by the store.
    pub position: Pos,
    /// The serialised event payload.
    pub data: Data,
    /// Infrastructure metadata (timestamps, causation IDs, etc.).
    pub metadata: M,
}

impl<Id, Pos, Data, M> StoredEvent<Id, Pos, Data, M> {
    /// Returns the aggregate type identifier.
    #[inline]
    pub fn aggregate_kind(&self) -> &str {
        &self.aggregate_kind
    }

    /// Returns a reference to the aggregate instance identifier.
    #[inline]
    pub const fn aggregate_id(&self) -> &Id {
        &self.aggregate_id
    }

    /// Returns the event type identifier.
    #[inline]
    pub fn kind(&self) -> &str {
        &self.kind
    }

    /// Returns a reference to the serialised event payload.
    ///
    /// Provided so that every field has a borrowing accessor; the payload is
    /// returned as stored, without any decoding.
    #[inline]
    pub const fn data(&self) -> &Data {
        &self.data
    }

    /// Returns a reference to the metadata.
    #[inline]
    pub const fn metadata(&self) -> &M {
        &self.metadata
    }
}

impl<Id, Pos: Clone, Data, M> StoredEvent<Id, Pos, Data, M> {
    /// Returns a copy of the position.
    ///
    /// The position is cloned rather than borrowed, which is why this
    /// accessor is only available when `Pos: Clone`.
    #[inline]
    pub fn position(&self) -> Pos {
        self.position.clone()
    }
}
90
/// Selection criteria describing which events a load should return.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EventFilter<Id, Pos = ()> {
    /// The event type identifier to select.
    pub event_kind: String,
    /// When set, restricts the filter to this aggregate type.
    pub aggregate_kind: Option<String>,
    /// When set, restricts the filter to this aggregate instance.
    pub aggregate_id: Option<Id>,
    /// Only load events with position strictly greater than this value.
    /// Used for snapshot-based loading to skip already-applied events.
    pub after_position: Option<Pos>,
}

impl<Id, Pos> EventFilter<Id, Pos> {
    /// Builds a filter matching every event of `kind`, whichever aggregate
    /// it belongs to.
    #[must_use]
    pub fn for_event(kind: impl Into<String>) -> Self {
        let event_kind = kind.into();
        Self {
            event_kind,
            aggregate_kind: None,
            aggregate_id: None,
            after_position: None,
        }
    }

    /// Builds a filter matching events of `event_kind` that belong to one
    /// specific aggregate instance.
    #[must_use]
    pub fn for_aggregate(
        event_kind: impl Into<String>,
        aggregate_kind: impl Into<String>,
        aggregate_id: impl Into<Id>,
    ) -> Self {
        // Convert in declaration order so the conversions run in the same
        // sequence as the arguments were given.
        let event_kind = event_kind.into();
        let aggregate_kind = Some(aggregate_kind.into());
        let aggregate_id = Some(aggregate_id.into());
        Self {
            event_kind,
            aggregate_kind,
            aggregate_id,
            after_position: None,
        }
    }

    /// Restricts the filter to events positioned strictly after `position`.
    ///
    /// Intended for snapshot-based loading: restore a snapshot taken at
    /// position N, then load with `after(N)` to receive only the events
    /// recorded since that snapshot.
    #[must_use]
    pub fn after(self, position: Pos) -> Self {
        Self {
            after_position: Some(position),
            ..self
        }
    }
}
140
141/// Error from commit operations without version checking.
142#[derive(Debug, Error)]
143pub enum CommitError<StoreError>
144where
145 StoreError: std::error::Error,
146{
147 /// Failed to serialise an event.
148 #[error("failed to serialize event at index {index}")]
149 Serialization {
150 index: usize,
151 #[source]
152 source: StoreError,
153 },
154 /// Underlying store error.
155 #[error("store error: {0}")]
156 Store(#[source] StoreError),
157}
158
159/// Error from commit operations with optimistic concurrency checking.
160#[derive(Debug, Error)]
161pub enum OptimisticCommitError<Pos, StoreError>
162where
163 Pos: std::fmt::Debug,
164 StoreError: std::error::Error,
165{
166 /// Failed to serialise an event.
167 #[error("failed to serialize event at index {index}")]
168 Serialization {
169 index: usize,
170 #[source]
171 source: StoreError,
172 },
173 /// Concurrency conflict - another writer modified the stream.
174 #[error(transparent)]
175 Conflict(#[from] ConcurrencyConflict<Pos>),
176 /// Underlying store error.
177 #[error("store error: {0}")]
178 Store(#[source] StoreError),
179}
180
/// Outcome of a successful commit of events to a stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Committed<Pos> {
    /// Position of the final event written by the batch.
    pub last_position: Pos,
}
187
188/// A vector of stored events loaded from an event store.
189pub type StoredEvents<Id, Pos, Data, M> = Vec<StoredEvent<Id, Pos, Data, M>>;
190
191/// Result type for event loading operations.
192pub type LoadEventsResult<Id, Pos, Data, M, E> = Result<StoredEvents<Id, Pos, Data, M>, E>;
193
194/// Abstraction over the persistence layer for event streams.
195///
196/// This trait supports both stream-based and type-partitioned storage
197/// implementations. Stores own event serialisation/deserialisation.
198///
199/// Associated types allow stores to customise their behaviour:
200/// - `Id`: Aggregate identifier type
201/// - `Position`: Ordering strategy (`()` for stream-based, `u64` for global
202/// ordering)
203/// - `Metadata`: Infrastructure metadata type (timestamps, causation tracking,
204/// etc.)
205/// - `Data`: Serialised event payload type (e.g., `serde_json::Value` for JSON)
206// ANCHOR: event_store_trait
207pub trait EventStore: Send + Sync {
208 /// Aggregate identifier type.
209 ///
210 /// This type must be clonable so repositories can reuse IDs across calls.
211 /// Common choices: `String`, `Uuid`, or custom ID types.
212 type Id: Clone + Send + Sync + 'static;
213
214 /// Position type used for ordering events and version checking.
215 ///
216 /// Must be `Clone + PartialEq` to support optimistic concurrency.
217 /// Use `()` if ordering is not needed.
218 type Position: Clone + PartialEq + std::fmt::Debug + Send + Sync + 'static;
219
220 /// Store-specific error type.
221 type Error: std::error::Error + Send + Sync + 'static;
222
223 /// Metadata type for infrastructure concerns.
224 type Metadata: Send + Sync + 'static;
225
226 /// Serialised event payload type.
227 ///
228 /// This is the format used to store event data internally. Common choices:
229 /// - `serde_json::Value` for JSON-based stores
230 /// - `Vec<u8>` for binary stores
231 type Data: Clone + Send + Sync + 'static;
232
233 /// Decode a stored event into a concrete event type.
234 ///
235 /// Deserialises the `data` field of a [`StoredEvent`] back into a domain
236 /// event.
237 ///
238 /// # Errors
239 ///
240 /// Returns an error if deserialisation fails.
241 fn decode_event<E>(
242 &self,
243 stored: &StoredEvent<Self::Id, Self::Position, Self::Data, Self::Metadata>,
244 ) -> Result<E, Self::Error>
245 where
246 E: crate::event::DomainEvent + serde::de::DeserializeOwned;
247
248 /// Get the current version (latest position) for an aggregate stream.
249 ///
250 /// Returns `None` for streams with no events.
251 ///
252 /// # Errors
253 ///
254 /// Returns a store-specific error when the operation fails.
255 fn stream_version<'a>(
256 &'a self,
257 aggregate_kind: &'a str,
258 aggregate_id: &'a Self::Id,
259 ) -> impl Future<Output = Result<Option<Self::Position>, Self::Error>> + Send + 'a;
260
261 /// Commit events to an aggregate stream without version checking.
262 ///
263 /// Events are serialised and persisted atomically. No conflict detection
264 /// is performed (last-writer-wins).
265 ///
266 /// # Errors
267 ///
268 /// Returns [`CommitError::Serialization`] if an event fails to serialise,
269 /// or [`CommitError::Store`] if persistence fails.
270 fn commit_events<'a, E>(
271 &'a self,
272 aggregate_kind: &'a str,
273 aggregate_id: &'a Self::Id,
274 events: NonEmpty<E>,
275 metadata: &'a Self::Metadata,
276 ) -> impl Future<Output = Result<Committed<Self::Position>, CommitError<Self::Error>>> + Send + 'a
277 where
278 E: crate::event::EventKind + serde::Serialize + Send + Sync + 'a,
279 Self::Metadata: Clone;
280
281 /// Commit events to an aggregate stream with optimistic concurrency
282 /// control.
283 ///
284 /// Events are serialised and persisted atomically. The commit fails if:
285 /// - `expected_version` is `Some(v)` and the current version differs from
286 /// `v`
287 /// - `expected_version` is `None` and the stream already has events (new
288 /// aggregate expected)
289 ///
290 /// # Errors
291 ///
292 /// Returns [`OptimisticCommitError::Serialization`] if an event fails to
293 /// serialise, [`OptimisticCommitError::Conflict`] if the version check
294 /// fails, or [`OptimisticCommitError::Store`] if persistence fails.
295 #[allow(clippy::type_complexity)]
296 fn commit_events_optimistic<'a, E>(
297 &'a self,
298 aggregate_kind: &'a str,
299 aggregate_id: &'a Self::Id,
300 expected_version: Option<Self::Position>,
301 events: NonEmpty<E>,
302 metadata: &'a Self::Metadata,
303 ) -> impl Future<
304 Output = Result<
305 Committed<Self::Position>,
306 OptimisticCommitError<Self::Position, Self::Error>,
307 >,
308 > + Send
309 + 'a
310 where
311 E: crate::event::EventKind + serde::Serialize + Send + Sync + 'a,
312 Self::Metadata: Clone;
313
314 /// Load events matching the specified filters.
315 ///
316 /// Each filter describes an event kind and optional aggregate identity:
317 /// - [`EventFilter::for_event`] loads every event of the given kind
318 /// - [`EventFilter::for_aggregate`] narrows to a single aggregate instance
319 ///
320 /// The store optimises based on its storage model and returns events
321 /// merged by position (if positions are available).
322 ///
323 /// # Errors
324 ///
325 /// Returns a store-specific error when loading fails.
326 #[allow(clippy::type_complexity)]
327 fn load_events<'a>(
328 &'a self,
329 filters: &'a [EventFilter<Self::Id, Self::Position>],
330 ) -> impl Future<
331 Output = LoadEventsResult<
332 Self::Id,
333 Self::Position,
334 Self::Data,
335 Self::Metadata,
336 Self::Error,
337 >,
338 > + Send
339 + 'a;
340}
341
342/// Marker trait for stores that provide globally ordered positions.
343///
344/// Projection snapshots require this guarantee.
345pub trait GloballyOrderedStore: EventStore {}
346// ANCHOR_END: event_store_trait
347
/// Crate-internal key pairing an aggregate kind with an aggregate identifier.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub(crate) struct StreamKey<Id> {
    /// The aggregate type identifier.
    aggregate_kind: String,
    /// The aggregate instance identifier.
    aggregate_id: Id,
}

impl<Id> StreamKey<Id> {
    /// Builds a key from an aggregate kind (converted into an owned
    /// `String`) and an aggregate identifier.
    pub(crate) fn new(aggregate_kind: impl Into<String>, aggregate_id: Id) -> Self {
        let aggregate_kind = aggregate_kind.into();
        Self {
            aggregate_kind,
            aggregate_id,
        }
    }
}
362
#[cfg(test)]
mod tests {
    use super::*;

    /// `for_event` sets the event kind and leaves every restriction unset.
    #[test]
    fn event_filter_for_event_is_unrestricted() {
        let unrestricted: EventFilter<String> = EventFilter::for_event("my-event");

        assert_eq!(unrestricted.event_kind, "my-event");
        assert!(unrestricted.aggregate_kind.is_none());
        assert!(unrestricted.aggregate_id.is_none());
        assert!(unrestricted.after_position.is_none());
    }

    /// `for_aggregate` records both the aggregate kind and the instance id.
    #[test]
    fn event_filter_for_aggregate_is_restricted() {
        let scoped: EventFilter<String> =
            EventFilter::for_aggregate("my-event", "my-aggregate", "123");

        assert_eq!(scoped.event_kind, "my-event");
        assert_eq!(scoped.aggregate_kind.as_deref(), Some("my-aggregate"));
        assert_eq!(scoped.aggregate_id.as_deref(), Some("123"));
        assert!(scoped.after_position.is_none());
    }

    /// `after` stores the given position on the filter.
    #[test]
    fn event_filter_after_sets_after_position() {
        let resumed: EventFilter<String, u64> = EventFilter::for_event("e").after(10);

        assert_eq!(resumed.after_position, Some(10));
    }
}
391}