Skip to main content

sourcery_core/
store.rs

1//! Persistence layer abstractions.
2//!
3//! This module describes the storage contract ([`EventStore`]), wire formats,
4//! and a reference in-memory implementation. Filters and positions live here
5//! to keep storage concerns together.
6//!
7//! # Event Lifecycle
8//!
9//! Events flow through a simple lifecycle during persistence:
10//!
11//! ```text
12//! DomainEvent ──commit_events()──▶ Database
13//!                                      │
14//! DomainEvent ◀──decode_event()── StoredEvent ◀──load_events()─┘
15//! ```
16//!
17//! [`StoredEvent`] contains the serialised event data plus store-assigned
18//! metadata (position, aggregate info). Use [`EventStore::decode_event`] to
19//! deserialise back to a domain event.
20use std::future::Future;
21
22pub use nonempty::NonEmpty;
23use thiserror::Error;
24
25use crate::concurrency::ConcurrencyConflict;
26
27pub mod inmemory;
28
/// An event loaded from the store.
///
/// Pairs the serialised event payload with the store-assigned metadata
/// captured at commit time. Values of this type are produced by
/// [`EventStore::load_events`]; hand them to [`EventStore::decode_event`] to
/// turn the `data` field back into a domain event.
///
/// # Type Parameters
///
/// - `Id`: The aggregate identifier type
/// - `Pos`: The position type used for ordering
/// - `Data`: The serialised event payload type (e.g., `serde_json::Value`)
/// - `M`: The metadata type
#[derive(Clone, Debug)]
pub struct StoredEvent<Id, Pos, Data, M> {
    /// The aggregate type identifier (e.g., `"account"`).
    pub aggregate_kind: String,
    /// The aggregate instance identifier.
    pub aggregate_id: Id,
    /// The event type identifier (e.g., `"account.deposited"`).
    pub kind: String,
    /// The global position assigned by the store.
    pub position: Pos,
    /// The serialised event payload.
    pub data: Data,
    /// Infrastructure metadata (timestamps, causation IDs, etc.).
    pub metadata: M,
}

impl<Id, Pos, Data, M> StoredEvent<Id, Pos, Data, M> {
    /// Returns the aggregate type identifier.
    #[inline]
    pub fn aggregate_kind(&self) -> &str {
        self.aggregate_kind.as_str()
    }

    /// Returns a reference to the aggregate instance identifier.
    #[inline]
    pub const fn aggregate_id(&self) -> &Id {
        &self.aggregate_id
    }

    /// Returns the event type identifier.
    #[inline]
    pub fn kind(&self) -> &str {
        self.kind.as_str()
    }

    /// Returns a reference to the metadata.
    #[inline]
    pub const fn metadata(&self) -> &M {
        &self.metadata
    }

    /// Returns a copy of the position.
    #[inline]
    pub fn position(&self) -> Pos
    where
        Pos: Clone,
    {
        self.position.clone()
    }
}
90
/// Filter describing which events should be loaded from the store.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EventFilter<Id, Pos = ()> {
    /// The event type identifier to match (e.g., `"account.deposited"`).
    pub event_kind: String,
    /// When set, restrict matches to this aggregate type.
    pub aggregate_kind: Option<String>,
    /// When set, restrict matches to this aggregate instance.
    pub aggregate_id: Option<Id>,
    /// Only load events with position strictly greater than this value.
    /// Used for snapshot-based loading to skip already-applied events.
    pub after_position: Option<Pos>,
}

impl<Id, Pos> EventFilter<Id, Pos> {
    /// Load all events of the specified kind across every aggregate.
    #[must_use]
    pub fn for_event(kind: impl Into<String>) -> Self {
        EventFilter {
            event_kind: kind.into(),
            aggregate_kind: None,
            aggregate_id: None,
            after_position: None,
        }
    }

    /// Load events of the specified kind for a single aggregate instance.
    #[must_use]
    pub fn for_aggregate(
        event_kind: impl Into<String>,
        aggregate_kind: impl Into<String>,
        aggregate_id: impl Into<Id>,
    ) -> Self {
        // Start from the unrestricted filter, then narrow to one aggregate.
        let mut filter = Self::for_event(event_kind);
        filter.aggregate_kind = Some(aggregate_kind.into());
        filter.aggregate_id = Some(aggregate_id.into());
        filter
    }

    /// Only load events with position strictly greater than the given value.
    ///
    /// This is used for snapshot-based loading: load a snapshot at position N,
    /// then load events with `after(N)` to get only the events that occurred
    /// after the snapshot was taken.
    #[must_use]
    pub fn after(self, position: Pos) -> Self {
        EventFilter {
            after_position: Some(position),
            ..self
        }
    }
}
140
/// Error from commit operations without version checking.
///
/// Returned by [`EventStore::commit_events`]. Both variants carry the
/// store-specific error type as their source.
#[derive(Debug, Error)]
pub enum CommitError<StoreError>
where
    StoreError: std::error::Error,
{
    /// Failed to serialise an event.
    #[error("failed to serialize event at index {index}")]
    Serialization {
        /// Zero-based index of the offending event within the committed batch.
        index: usize,
        /// The underlying serialisation failure reported by the store.
        #[source]
        source: StoreError,
    },
    /// Underlying store error.
    #[error("store error: {0}")]
    Store(#[source] StoreError),
}
158
/// Error from commit operations with optimistic concurrency checking.
///
/// Returned by [`EventStore::commit_events_optimistic`]. Extends
/// [`CommitError`]'s cases with a [`ConcurrencyConflict`] variant for failed
/// version checks.
#[derive(Debug, Error)]
pub enum OptimisticCommitError<Pos, StoreError>
where
    Pos: std::fmt::Debug,
    StoreError: std::error::Error,
{
    /// Failed to serialise an event.
    #[error("failed to serialize event at index {index}")]
    Serialization {
        /// Zero-based index of the offending event within the committed batch.
        index: usize,
        /// The underlying serialisation failure reported by the store.
        #[source]
        source: StoreError,
    },
    /// Concurrency conflict - another writer modified the stream.
    #[error(transparent)]
    Conflict(#[from] ConcurrencyConflict<Pos>),
    /// Underlying store error.
    #[error("store error: {0}")]
    Store(#[source] StoreError),
}
180
/// Successful commit of events to a stream.
///
/// Returned by [`EventStore::commit_events`] and
/// [`EventStore::commit_events_optimistic`] on success.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct Committed<Pos> {
    /// Position of the last event written in the batch.
    ///
    /// Typically fed back as the `expected_version` of a subsequent
    /// optimistic commit.
    pub last_position: Pos,
}
187
/// A vector of stored events loaded from an event store.
///
/// Shorthand for the collection returned by [`EventStore::load_events`].
pub type StoredEvents<Id, Pos, Data, M> = Vec<StoredEvent<Id, Pos, Data, M>>;

/// Result type for event loading operations.
///
/// `E` is the store-specific error type ([`EventStore::Error`]).
pub type LoadEventsResult<Id, Pos, Data, M, E> = Result<StoredEvents<Id, Pos, Data, M>, E>;
193
/// Abstraction over the persistence layer for event streams.
///
/// This trait supports both stream-based and type-partitioned storage
/// implementations. Stores own event serialisation/deserialisation.
///
/// All I/O methods return `impl Future + Send`, so implementations can be
/// asynchronous without boxing and the futures can be awaited from
/// multi-threaded executors.
///
/// Associated types allow stores to customise their behaviour:
/// - `Id`: Aggregate identifier type
/// - `Position`: Ordering strategy (`()` for stream-based, `u64` for global
///   ordering)
/// - `Metadata`: Infrastructure metadata type (timestamps, causation tracking,
///   etc.)
/// - `Data`: Serialised event payload type (e.g., `serde_json::Value` for JSON)
// ANCHOR: event_store_trait
pub trait EventStore: Send + Sync {
    /// Aggregate identifier type.
    ///
    /// This type must be clonable so repositories can reuse IDs across calls.
    /// Common choices: `String`, `Uuid`, or custom ID types.
    type Id: Clone + Send + Sync + 'static;

    /// Position type used for ordering events and version checking.
    ///
    /// Must be `Clone + PartialEq` to support optimistic concurrency.
    /// Use `()` if ordering is not needed.
    type Position: Clone + PartialEq + std::fmt::Debug + Send + Sync + 'static;

    /// Store-specific error type.
    type Error: std::error::Error + Send + Sync + 'static;

    /// Metadata type for infrastructure concerns.
    type Metadata: Send + Sync + 'static;

    /// Serialised event payload type.
    ///
    /// This is the format used to store event data internally. Common choices:
    /// - `serde_json::Value` for JSON-based stores
    /// - `Vec<u8>` for binary stores
    type Data: Clone + Send + Sync + 'static;

    /// Decode a stored event into a concrete event type.
    ///
    /// Deserialises the `data` field of a [`StoredEvent`] back into a domain
    /// event. The target type `E` must implement
    /// [`crate::event::DomainEvent`] and `serde`'s `DeserializeOwned`.
    ///
    /// # Errors
    ///
    /// Returns an error if deserialisation fails.
    fn decode_event<E>(
        &self,
        stored: &StoredEvent<Self::Id, Self::Position, Self::Data, Self::Metadata>,
    ) -> Result<E, Self::Error>
    where
        E: crate::event::DomainEvent + serde::de::DeserializeOwned;

    /// Get the current version (latest position) for an aggregate stream.
    ///
    /// Returns `None` for streams with no events.
    ///
    /// # Errors
    ///
    /// Returns a store-specific error when the operation fails.
    fn stream_version<'a>(
        &'a self,
        aggregate_kind: &'a str,
        aggregate_id: &'a Self::Id,
    ) -> impl Future<Output = Result<Option<Self::Position>, Self::Error>> + Send + 'a;

    /// Commit events to an aggregate stream without version checking.
    ///
    /// Events are serialised and persisted atomically. No conflict detection
    /// is performed (last-writer-wins). The `events` batch is a [`NonEmpty`]
    /// list, so every successful commit writes at least one event.
    ///
    /// # Errors
    ///
    /// Returns [`CommitError::Serialization`] if an event fails to serialise,
    /// or [`CommitError::Store`] if persistence fails.
    fn commit_events<'a, E>(
        &'a self,
        aggregate_kind: &'a str,
        aggregate_id: &'a Self::Id,
        events: NonEmpty<E>,
        metadata: &'a Self::Metadata,
    ) -> impl Future<Output = Result<Committed<Self::Position>, CommitError<Self::Error>>> + Send + 'a
    where
        E: crate::event::EventKind + serde::Serialize + Send + Sync + 'a,
        Self::Metadata: Clone;

    /// Commit events to an aggregate stream with optimistic concurrency
    /// control.
    ///
    /// Events are serialised and persisted atomically. The commit fails if:
    /// - `expected_version` is `Some(v)` and the current version differs from
    ///   `v`
    /// - `expected_version` is `None` and the stream already has events (new
    ///   aggregate expected)
    ///
    /// # Errors
    ///
    /// Returns [`OptimisticCommitError::Serialization`] if an event fails to
    /// serialise, [`OptimisticCommitError::Conflict`] if the version check
    /// fails, or [`OptimisticCommitError::Store`] if persistence fails.
    #[allow(clippy::type_complexity)]
    fn commit_events_optimistic<'a, E>(
        &'a self,
        aggregate_kind: &'a str,
        aggregate_id: &'a Self::Id,
        expected_version: Option<Self::Position>,
        events: NonEmpty<E>,
        metadata: &'a Self::Metadata,
    ) -> impl Future<
        Output = Result<
            Committed<Self::Position>,
            OptimisticCommitError<Self::Position, Self::Error>,
        >,
    > + Send
    + 'a
    where
        E: crate::event::EventKind + serde::Serialize + Send + Sync + 'a,
        Self::Metadata: Clone;

    /// Load events matching the specified filters.
    ///
    /// Each filter describes an event kind and optional aggregate identity:
    /// - [`EventFilter::for_event`] loads every event of the given kind
    /// - [`EventFilter::for_aggregate`] narrows to a single aggregate instance
    ///
    /// The store optimises based on its storage model and returns events
    /// merged by position (if positions are available).
    ///
    /// # Errors
    ///
    /// Returns a store-specific error when loading fails.
    #[allow(clippy::type_complexity)]
    fn load_events<'a>(
        &'a self,
        filters: &'a [EventFilter<Self::Id, Self::Position>],
    ) -> impl Future<
        Output = LoadEventsResult<
            Self::Id,
            Self::Position,
            Self::Data,
            Self::Metadata,
            Self::Error,
        >,
    > + Send
    + 'a;
}
341
/// Marker trait for stores that provide globally ordered positions.
///
/// Implement this only when [`EventStore::Position`] forms a single total
/// order across all streams (e.g. a global sequence number), not merely a
/// per-stream order. Projection snapshots require this guarantee.
pub trait GloballyOrderedStore: EventStore {}
// ANCHOR_END: event_store_trait
347
/// Composite key identifying a single aggregate stream by its type name and
/// instance identifier. Derives `Hash`/`Eq` so it can serve as a map key.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub(crate) struct StreamKey<Id> {
    aggregate_kind: String,
    aggregate_id: Id,
}

impl<Id> StreamKey<Id> {
    /// Builds a key from an aggregate type name and an instance identifier.
    pub(crate) fn new(aggregate_kind: impl Into<String>, aggregate_id: Id) -> Self {
        let aggregate_kind = aggregate_kind.into();
        StreamKey {
            aggregate_kind,
            aggregate_id,
        }
    }
}
362
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn event_filter_for_event_is_unrestricted() {
        let filter = EventFilter::<String>::for_event("my-event");
        assert_eq!(filter.event_kind, "my-event");
        assert!(filter.aggregate_kind.is_none());
        assert!(filter.aggregate_id.is_none());
        assert!(filter.after_position.is_none());
    }

    #[test]
    fn event_filter_for_aggregate_is_restricted() {
        let filter = EventFilter::<String>::for_aggregate("my-event", "my-aggregate", "123");
        assert_eq!(filter.event_kind, "my-event");
        assert_eq!(filter.aggregate_kind.as_deref(), Some("my-aggregate"));
        assert_eq!(filter.aggregate_id.as_deref(), Some("123"));
        assert!(filter.after_position.is_none());
    }

    #[test]
    fn event_filter_after_sets_after_position() {
        let filter = EventFilter::<String, u64>::for_event("e").after(10);
        assert_eq!(filter.after_position, Some(10));
    }
}