eventually_core/
store.rs

1//! Contains the Event Store trait for storing and streaming Aggregate [`Event`]s.
2//!
3//! [`Event`]: ../aggregate/trait.Aggregate.html#associatedtype.Event
4
5use std::ops::Deref;
6
7use futures::future::BoxFuture;
8use futures::stream::BoxStream;
9
10#[cfg(feature = "serde")]
11use serde::{Deserialize, Serialize};
12
13use crate::versioning::Versioned;
14
15/// Selection operation for the events to capture in an [`EventStream`].
16///
17/// [`EventStream`]: type.EventStream.html
18#[derive(Debug, Clone, Copy, PartialEq, Eq)]
19pub enum Select {
20    /// To return all the [`Event`]s in the [`EventStream`].
21    ///
22    /// [`Event`]: trait.EventStore.html#associatedtype.Event
23    /// [`EventStream`]: type.EventStream.html
24    All,
25
26    /// To return a slice of the [`EventStream`], starting from
27    /// those [`Event`]s with version **greater or equal** than
28    /// the one specified in this variant.
29    ///
30    /// [`Event`]: trait.EventStore.html#associatedtype.Event
31    /// [`EventStream`]: type.EventStream.html
32    From(u32),
33}
34
35/// Specifies the optimistic locking level when performing [`append`] from
36/// an [`EventStore`].
37///
38/// Check out [`append`] documentation for more info.
39///
40/// [`append`]: trait.EventStore.html#method.append
41/// [`EventStore`]: trait.EventStore.html
42#[derive(Debug, Clone, Copy, PartialEq, Eq)]
43pub enum Expected {
44    /// Append events disregarding the current [`Aggregate`] version.
45    ///
46    /// [`Aggregate`]: ../aggregate/trait.Aggregate.html
47    Any,
48
49    /// Append events only if the current version of the [`Aggregate`]
50    /// is the one specified by the value provided here.
51    ///
52    /// [`Aggregate`]: ../aggregate/trait.Aggregate.html
53    Exact(u32),
54}
55
56/// Stream type returned by the [`EventStore::stream`] method.
57///
58/// [`EventStore::stream`]: trait.EventStore.html#method.stream
59pub type EventStream<'a, S> = BoxStream<
60    'a,
61    Result<
62        Persisted<<S as EventStore>::SourceId, <S as EventStore>::Event>,
63        <S as EventStore>::Error,
64    >,
65>;
66
67/// Error type returned by [`append`] in [`EventStore`] implementations.
68///
69/// [`append`]: trait.EventStore.html#method.append
70/// [`EventStore`]: trait.EventStore.html
71pub trait AppendError: std::error::Error {
72    /// Returns true if the error is due to a version conflict
73    /// during [`append`].
74    ///
75    /// [`append`]: trait.EventStore.html#method.append
76    fn is_conflict_error(&self) -> bool;
77}
78
79impl AppendError for std::convert::Infallible {
80    fn is_conflict_error(&self) -> bool {
81        false
82    }
83}
84
85/// An Event Store is an append-only, ordered list of [`Event`]s
86/// for a certain "source" -- e.g. an [`Aggregate`].
87///
88/// [`Event`]: ../aggregate/trait.Aggregate.html#associatedtype.Event
89/// [`Aggregate`]: ../aggregate/trait.Aggregate.html
90pub trait EventStore {
91    /// Type of the Source id, typically an [`AggregateId`].
92    ///
93    /// [`AggregateId`]: ../aggregate/type.AggregateId.html
94    type SourceId: Eq;
95
96    /// Event to be stored in the `EventStore`, typically an [`Aggregate::Event`].
97    ///
98    /// [`Aggregate::Event`]: ../aggregate/trait.Aggregate.html#associatedtype.Event
99    type Event;
100
101    /// Possible errors returned by the `EventStore` when requesting operations.
102    type Error: AppendError;
103
104    /// Appends a new list of [`Event`]s to the Event Store, for the Source
105    /// entity specified by [`SourceId`].
106    ///
107    /// `append` is a transactional operation: it either appends all the events,
108    /// or none at all and returns an [`AppendError`].
109    ///
110    /// The desired version for the new [`Event`]s to append must be specified
111    /// through an [`Expected`] element.
112    ///
113    /// When using `Expected::Any`, no checks on the current [`Aggregate`]
114    /// values will be performed, disregarding optimistic locking.
115    ///
116    /// When using `Expected::Exact`, the Store will check that the current
117    /// version of the [`Aggregate`] is _exactly_ the one specified.
118    ///
119    /// If the version is not the one expected from the Store, implementations
120    /// should raise an [`AppendError::Conflict`] error.
121    ///
122    /// Implementations could decide to return an error if the expected
123    /// version is different from the one supplied in the method invocation.
124    ///
125    /// [`Event`]: trait.EventStore.html#associatedtype.Event
126    /// [`SourceId`]: trait.EventStore.html#associatedtype.SourceId
127    /// [`AppendError`]: enum.AppendError.html
128    /// [`AppendError::Conflict`]: enum.AppendError.html
129    fn append(
130        &mut self,
131        source_id: Self::SourceId,
132        version: Expected,
133        events: Vec<Self::Event>,
134    ) -> BoxFuture<Result<u32, Self::Error>>;
135
136    /// Streams a list of [`Event`]s from the `EventStore` back to the application,
137    /// by specifying the desired [`SourceId`] and [`Select`] operation.
138    ///
139    /// [`SourceId`] will be used to request a particular `EventStream`.
140    ///
141    /// [`Select`] specifies the selection strategy for the [`Event`]s
142    /// in the returned [`EventStream`]: take a look at type documentation
143    /// for all the available options.
144    ///
145    /// [`Event`]: trait.EventStore.html#associatedtype.Event
146    /// [`SourceId`]: trait.EventStore.html#associatedtype.SourceId
147    /// [`Select`]: enum.Select.html
148    /// [`EventStream`]: type.EventStream.html
149    fn stream(
150        &self,
151        source_id: Self::SourceId,
152        select: Select,
153    ) -> BoxFuture<Result<EventStream<Self>, Self::Error>>;
154
155    /// Streams a list of [`Event`]s from the `EventStore` back to the application,
156    /// disregarding the [`SourceId`] values but using a [`Select`] operation.
157    ///
158    /// [`SourceId`] will be used to request a particular `EventStream`.
159    ///
160    /// [`Select`] specifies the selection strategy for the [`Event`]s
161    /// in the returned [`EventStream`]: take a look at type documentation
162    /// for all the available options.
163    ///
164    /// [`Event`]: trait.EventStore.html#associatedtype.Event
165    /// [`SourceId`]: trait.EventStore.html#associatedtype.SourceId
166    /// [`Select`]: enum.Select.html
167    /// [`EventStream`]: type.EventStream.html
168    fn stream_all(&self, select: Select) -> BoxFuture<Result<EventStream<Self>, Self::Error>>;
169
170    /// Drops all the [`Event`]s related to one `Source`, specified by
171    /// the provided [`SourceId`].
172    ///
173    /// [`Event`]: trait.EventStore.html#associatedtype.Event
174    /// [`SourceId`]: trait.EventStore.html#associatedtype.SourceId
175    fn remove(&mut self, source_id: Self::SourceId) -> BoxFuture<Result<(), Self::Error>>;
176}
177
178/// An [`Event`] wrapper for events that have been
179/// successfully committed to the [`EventStore`].
180///
181/// [`EventStream`]s are composed of these events.
182///
183/// [`Event`]: trait.EventStore.html#associatedtype.Event
184/// [`EventStream`]: type.EventStream.html
185#[derive(Debug, Clone, PartialEq, Eq)]
186#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
187pub struct Persisted<SourceId, T> {
188    source_id: SourceId,
189    version: u32,
190    sequence_number: u32,
191    #[cfg_attr(feature = "serde", serde(flatten))]
192    event: T,
193}
194
195impl<SourceId, T> Versioned for Persisted<SourceId, T> {
196    #[inline]
197    fn version(&self) -> u32 {
198        self.version
199    }
200}
201
202impl<SourceId, T> Deref for Persisted<SourceId, T> {
203    type Target = T;
204
205    fn deref(&self) -> &Self::Target {
206        &self.event
207    }
208}
209
210impl<SourceId, T> Persisted<SourceId, T> {
211    /// Creates a new [`EventBuilder`] from the provided Event value.
212    ///
213    /// [`EventBuilder`]: persistent/struct.EventBuilder.html
214    #[inline]
215    pub fn from(source_id: SourceId, event: T) -> persistent::EventBuilder<SourceId, T> {
216        persistent::EventBuilder { source_id, event }
217    }
218
219    /// Returns the event sequence number.
220    #[inline]
221    pub fn sequence_number(&self) -> u32 {
222        self.sequence_number
223    }
224
225    /// Returns the [`SourceId`] of the persisted event.
226    ///
227    /// [`SourceId`]: trait.EventStore.html#associatedType.SourceId
228    #[inline]
229    pub fn source_id(&self) -> &SourceId {
230        &self.source_id
231    }
232
233    /// Unwraps the inner [`Event`] from the `Persisted` wrapper.
234    ///
235    /// [`Event`]: trait.EventStore.html#associatedtype.Event
236    #[inline]
237    pub fn take(self) -> T {
238        self.event
239    }
240}
241
242/// Contains a type-state builder for [`PersistentEvent`] type.
243///
244/// [`PersistentEvent`]: struct.Persisted.html
245pub mod persistent {
246    /// Creates a new [`Persisted`] by wrapping an Event value.
247    ///
248    /// [`PersistentEvent`]: ../struct.Persisted.html
249    pub struct EventBuilder<SourceId, T> {
250        pub(super) event: T,
251        pub(super) source_id: SourceId,
252    }
253
254    impl<SourceId, T> From<(SourceId, T)> for EventBuilder<SourceId, T> {
255        #[inline]
256        fn from(value: (SourceId, T)) -> Self {
257            let (source_id, event) = value;
258            Self { source_id, event }
259        }
260    }
261
262    impl<SourceId, T> EventBuilder<SourceId, T> {
263        /// Specifies the [`PersistentEvent`] version and moves to the next
264        /// builder state.
265        ///
266        /// [`PersistentEvent`]: ../struct.Persisted.html
267        #[inline]
268        pub fn version(self, value: u32) -> EventBuilderWithVersion<SourceId, T> {
269            EventBuilderWithVersion {
270                version: value,
271                event: self.event,
272                source_id: self.source_id,
273            }
274        }
275
276        /// Specifies the [`PersistentEvent`] sequence number and moves to the next
277        /// builder state.
278        ///
279        /// [`PersistentEvent`]: ../struct.Persisted.html
280        #[inline]
281        pub fn sequence_number(self, value: u32) -> EventBuilderWithSequenceNumber<SourceId, T> {
282            EventBuilderWithSequenceNumber {
283                sequence_number: value,
284                event: self.event,
285                source_id: self.source_id,
286            }
287        }
288    }
289
290    /// Next step in creating a new [`Persisted`] carrying an Event value
291    /// and its version.
292    ///
293    /// [`PersistentEvent`]: ../struct.Persisted.html
294    pub struct EventBuilderWithVersion<SourceId, T> {
295        version: u32,
296        event: T,
297        source_id: SourceId,
298    }
299
300    impl<SourceId, T> EventBuilderWithVersion<SourceId, T> {
301        /// Specifies the [`PersistentEvent`] sequence number and moves to the next
302        /// builder state.
303        ///
304        /// [`PersistentEvent`]: ../struct.Persisted.html
305        #[inline]
306        pub fn sequence_number(self, value: u32) -> super::Persisted<SourceId, T> {
307            super::Persisted {
308                version: self.version,
309                event: self.event,
310                source_id: self.source_id,
311                sequence_number: value,
312            }
313        }
314    }
315
316    /// Next step in creating a new [`Persisted`] carrying an Event value
317    /// and its sequence number.
318    ///
319    /// [`PersistentEvent`]: ../struct.Persisted.html
320    pub struct EventBuilderWithSequenceNumber<SourceId, T> {
321        sequence_number: u32,
322        event: T,
323        source_id: SourceId,
324    }
325
326    impl<SourceId, T> EventBuilderWithSequenceNumber<SourceId, T> {
327        /// Specifies the [`PersistentEvent`] version and moves to the next
328        /// builder state.
329        ///
330        /// [`PersistentEvent`]: ../struct.Persisted.html
331        #[inline]
332        pub fn version(self, value: u32) -> super::Persisted<SourceId, T> {
333            super::Persisted {
334                version: value,
335                event: self.event,
336                source_id: self.source_id,
337                sequence_number: self.sequence_number,
338            }
339        }
340    }
341}