eventually_core/store.rs
1//! Contains the Event Store trait for storing and streaming Aggregate [`Event`]s.
2//!
3//! [`Event`]: ../aggregate/trait.Aggregate.html#associatedtype.Event
4
5use std::ops::Deref;
6
7use futures::future::BoxFuture;
8use futures::stream::BoxStream;
9
10#[cfg(feature = "serde")]
11use serde::{Deserialize, Serialize};
12
13use crate::versioning::Versioned;
14
15/// Selection operation for the events to capture in an [`EventStream`].
16///
17/// [`EventStream`]: type.EventStream.html
18#[derive(Debug, Clone, Copy, PartialEq, Eq)]
19pub enum Select {
20 /// To return all the [`Event`]s in the [`EventStream`].
21 ///
22 /// [`Event`]: trait.EventStore.html#associatedtype.Event
23 /// [`EventStream`]: type.EventStream.html
24 All,
25
26 /// To return a slice of the [`EventStream`], starting from
27 /// those [`Event`]s with version **greater or equal** than
28 /// the one specified in this variant.
29 ///
30 /// [`Event`]: trait.EventStore.html#associatedtype.Event
31 /// [`EventStream`]: type.EventStream.html
32 From(u32),
33}
34
35/// Specifies the optimistic locking level when performing [`append`] from
36/// an [`EventStore`].
37///
38/// Check out [`append`] documentation for more info.
39///
40/// [`append`]: trait.EventStore.html#method.append
41/// [`EventStore`]: trait.EventStore.html
42#[derive(Debug, Clone, Copy, PartialEq, Eq)]
43pub enum Expected {
44 /// Append events disregarding the current [`Aggregate`] version.
45 ///
46 /// [`Aggregate`]: ../aggregate/trait.Aggregate.html
47 Any,
48
49 /// Append events only if the current version of the [`Aggregate`]
50 /// is the one specified by the value provided here.
51 ///
52 /// [`Aggregate`]: ../aggregate/trait.Aggregate.html
53 Exact(u32),
54}
55
56/// Stream type returned by the [`EventStore::stream`] method.
57///
58/// [`EventStore::stream`]: trait.EventStore.html#method.stream
59pub type EventStream<'a, S> = BoxStream<
60 'a,
61 Result<
62 Persisted<<S as EventStore>::SourceId, <S as EventStore>::Event>,
63 <S as EventStore>::Error,
64 >,
65>;
66
67/// Error type returned by [`append`] in [`EventStore`] implementations.
68///
69/// [`append`]: trait.EventStore.html#method.append
70/// [`EventStore`]: trait.EventStore.html
71pub trait AppendError: std::error::Error {
72 /// Returns true if the error is due to a version conflict
73 /// during [`append`].
74 ///
75 /// [`append`]: trait.EventStore.html#method.append
76 fn is_conflict_error(&self) -> bool;
77}
78
79impl AppendError for std::convert::Infallible {
80 fn is_conflict_error(&self) -> bool {
81 false
82 }
83}
84
85/// An Event Store is an append-only, ordered list of [`Event`]s
86/// for a certain "source" -- e.g. an [`Aggregate`].
87///
88/// [`Event`]: ../aggregate/trait.Aggregate.html#associatedtype.Event
89/// [`Aggregate`]: ../aggregate/trait.Aggregate.html
90pub trait EventStore {
91 /// Type of the Source id, typically an [`AggregateId`].
92 ///
93 /// [`AggregateId`]: ../aggregate/type.AggregateId.html
94 type SourceId: Eq;
95
96 /// Event to be stored in the `EventStore`, typically an [`Aggregate::Event`].
97 ///
98 /// [`Aggregate::Event`]: ../aggregate/trait.Aggregate.html#associatedtype.Event
99 type Event;
100
101 /// Possible errors returned by the `EventStore` when requesting operations.
102 type Error: AppendError;
103
104 /// Appends a new list of [`Event`]s to the Event Store, for the Source
105 /// entity specified by [`SourceId`].
106 ///
107 /// `append` is a transactional operation: it either appends all the events,
108 /// or none at all and returns an [`AppendError`].
109 ///
110 /// The desired version for the new [`Event`]s to append must be specified
111 /// through an [`Expected`] element.
112 ///
113 /// When using `Expected::Any`, no checks on the current [`Aggregate`]
114 /// values will be performed, disregarding optimistic locking.
115 ///
116 /// When using `Expected::Exact`, the Store will check that the current
117 /// version of the [`Aggregate`] is _exactly_ the one specified.
118 ///
119 /// If the version is not the one expected from the Store, implementations
120 /// should raise an [`AppendError::Conflict`] error.
121 ///
122 /// Implementations could decide to return an error if the expected
123 /// version is different from the one supplied in the method invocation.
124 ///
125 /// [`Event`]: trait.EventStore.html#associatedtype.Event
126 /// [`SourceId`]: trait.EventStore.html#associatedtype.SourceId
127 /// [`AppendError`]: enum.AppendError.html
128 /// [`AppendError::Conflict`]: enum.AppendError.html
129 fn append(
130 &mut self,
131 source_id: Self::SourceId,
132 version: Expected,
133 events: Vec<Self::Event>,
134 ) -> BoxFuture<Result<u32, Self::Error>>;
135
136 /// Streams a list of [`Event`]s from the `EventStore` back to the application,
137 /// by specifying the desired [`SourceId`] and [`Select`] operation.
138 ///
139 /// [`SourceId`] will be used to request a particular `EventStream`.
140 ///
141 /// [`Select`] specifies the selection strategy for the [`Event`]s
142 /// in the returned [`EventStream`]: take a look at type documentation
143 /// for all the available options.
144 ///
145 /// [`Event`]: trait.EventStore.html#associatedtype.Event
146 /// [`SourceId`]: trait.EventStore.html#associatedtype.SourceId
147 /// [`Select`]: enum.Select.html
148 /// [`EventStream`]: type.EventStream.html
149 fn stream(
150 &self,
151 source_id: Self::SourceId,
152 select: Select,
153 ) -> BoxFuture<Result<EventStream<Self>, Self::Error>>;
154
155 /// Streams a list of [`Event`]s from the `EventStore` back to the application,
156 /// disregarding the [`SourceId`] values but using a [`Select`] operation.
157 ///
158 /// [`SourceId`] will be used to request a particular `EventStream`.
159 ///
160 /// [`Select`] specifies the selection strategy for the [`Event`]s
161 /// in the returned [`EventStream`]: take a look at type documentation
162 /// for all the available options.
163 ///
164 /// [`Event`]: trait.EventStore.html#associatedtype.Event
165 /// [`SourceId`]: trait.EventStore.html#associatedtype.SourceId
166 /// [`Select`]: enum.Select.html
167 /// [`EventStream`]: type.EventStream.html
168 fn stream_all(&self, select: Select) -> BoxFuture<Result<EventStream<Self>, Self::Error>>;
169
170 /// Drops all the [`Event`]s related to one `Source`, specified by
171 /// the provided [`SourceId`].
172 ///
173 /// [`Event`]: trait.EventStore.html#associatedtype.Event
174 /// [`SourceId`]: trait.EventStore.html#associatedtype.SourceId
175 fn remove(&mut self, source_id: Self::SourceId) -> BoxFuture<Result<(), Self::Error>>;
176}
177
178/// An [`Event`] wrapper for events that have been
179/// successfully committed to the [`EventStore`].
180///
181/// [`EventStream`]s are composed of these events.
182///
183/// [`Event`]: trait.EventStore.html#associatedtype.Event
184/// [`EventStream`]: type.EventStream.html
185#[derive(Debug, Clone, PartialEq, Eq)]
186#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
187pub struct Persisted<SourceId, T> {
188 source_id: SourceId,
189 version: u32,
190 sequence_number: u32,
191 #[cfg_attr(feature = "serde", serde(flatten))]
192 event: T,
193}
194
195impl<SourceId, T> Versioned for Persisted<SourceId, T> {
196 #[inline]
197 fn version(&self) -> u32 {
198 self.version
199 }
200}
201
202impl<SourceId, T> Deref for Persisted<SourceId, T> {
203 type Target = T;
204
205 fn deref(&self) -> &Self::Target {
206 &self.event
207 }
208}
209
210impl<SourceId, T> Persisted<SourceId, T> {
211 /// Creates a new [`EventBuilder`] from the provided Event value.
212 ///
213 /// [`EventBuilder`]: persistent/struct.EventBuilder.html
214 #[inline]
215 pub fn from(source_id: SourceId, event: T) -> persistent::EventBuilder<SourceId, T> {
216 persistent::EventBuilder { source_id, event }
217 }
218
219 /// Returns the event sequence number.
220 #[inline]
221 pub fn sequence_number(&self) -> u32 {
222 self.sequence_number
223 }
224
225 /// Returns the [`SourceId`] of the persisted event.
226 ///
227 /// [`SourceId`]: trait.EventStore.html#associatedType.SourceId
228 #[inline]
229 pub fn source_id(&self) -> &SourceId {
230 &self.source_id
231 }
232
233 /// Unwraps the inner [`Event`] from the `Persisted` wrapper.
234 ///
235 /// [`Event`]: trait.EventStore.html#associatedtype.Event
236 #[inline]
237 pub fn take(self) -> T {
238 self.event
239 }
240}
241
242/// Contains a type-state builder for [`PersistentEvent`] type.
243///
244/// [`PersistentEvent`]: struct.Persisted.html
245pub mod persistent {
246 /// Creates a new [`Persisted`] by wrapping an Event value.
247 ///
248 /// [`PersistentEvent`]: ../struct.Persisted.html
249 pub struct EventBuilder<SourceId, T> {
250 pub(super) event: T,
251 pub(super) source_id: SourceId,
252 }
253
254 impl<SourceId, T> From<(SourceId, T)> for EventBuilder<SourceId, T> {
255 #[inline]
256 fn from(value: (SourceId, T)) -> Self {
257 let (source_id, event) = value;
258 Self { source_id, event }
259 }
260 }
261
262 impl<SourceId, T> EventBuilder<SourceId, T> {
263 /// Specifies the [`PersistentEvent`] version and moves to the next
264 /// builder state.
265 ///
266 /// [`PersistentEvent`]: ../struct.Persisted.html
267 #[inline]
268 pub fn version(self, value: u32) -> EventBuilderWithVersion<SourceId, T> {
269 EventBuilderWithVersion {
270 version: value,
271 event: self.event,
272 source_id: self.source_id,
273 }
274 }
275
276 /// Specifies the [`PersistentEvent`] sequence number and moves to the next
277 /// builder state.
278 ///
279 /// [`PersistentEvent`]: ../struct.Persisted.html
280 #[inline]
281 pub fn sequence_number(self, value: u32) -> EventBuilderWithSequenceNumber<SourceId, T> {
282 EventBuilderWithSequenceNumber {
283 sequence_number: value,
284 event: self.event,
285 source_id: self.source_id,
286 }
287 }
288 }
289
290 /// Next step in creating a new [`Persisted`] carrying an Event value
291 /// and its version.
292 ///
293 /// [`PersistentEvent`]: ../struct.Persisted.html
294 pub struct EventBuilderWithVersion<SourceId, T> {
295 version: u32,
296 event: T,
297 source_id: SourceId,
298 }
299
300 impl<SourceId, T> EventBuilderWithVersion<SourceId, T> {
301 /// Specifies the [`PersistentEvent`] sequence number and moves to the next
302 /// builder state.
303 ///
304 /// [`PersistentEvent`]: ../struct.Persisted.html
305 #[inline]
306 pub fn sequence_number(self, value: u32) -> super::Persisted<SourceId, T> {
307 super::Persisted {
308 version: self.version,
309 event: self.event,
310 source_id: self.source_id,
311 sequence_number: value,
312 }
313 }
314 }
315
316 /// Next step in creating a new [`Persisted`] carrying an Event value
317 /// and its sequence number.
318 ///
319 /// [`PersistentEvent`]: ../struct.Persisted.html
320 pub struct EventBuilderWithSequenceNumber<SourceId, T> {
321 sequence_number: u32,
322 event: T,
323 source_id: SourceId,
324 }
325
326 impl<SourceId, T> EventBuilderWithSequenceNumber<SourceId, T> {
327 /// Specifies the [`PersistentEvent`] version and moves to the next
328 /// builder state.
329 ///
330 /// [`PersistentEvent`]: ../struct.Persisted.html
331 #[inline]
332 pub fn version(self, value: u32) -> super::Persisted<SourceId, T> {
333 super::Persisted {
334 version: value,
335 event: self.event,
336 source_id: self.source_id,
337 sequence_number: self.sequence_number,
338 }
339 }
340 }
341}