1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
//! Contains the Event Store trait for storing and streaming Aggregate [`Event`]s.
//!
//! [`Event`]: ../aggregate/trait.Aggregate.html#associatedtype.Event

use std::ops::Deref;

use futures::future::BoxFuture;
use futures::stream::BoxStream;

#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};

use crate::versioning::Versioned;

/// Selection operation for the events to capture in an [`EventStream`].
///
/// [`EventStream`]: type.EventStream.html
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Select {
    /// To return all the [`Event`]s in the [`EventStream`].
    ///
    /// [`Event`]: trait.EventStore.html#associatedtype.Event
    /// [`EventStream`]: type.EventStream.html
    All,

    /// To return a slice of the [`EventStream`], starting from
    /// those [`Event`]s with version **greater or equal** than
    /// the one specified in this variant.
    ///
    /// [`Event`]: trait.EventStore.html#associatedtype.Event
    /// [`EventStream`]: type.EventStream.html
    From(u32),
}

/// Specifies the optimistic locking level when performing [`append`] from
/// an [`EventStore`].
///
/// Check out [`append`] documentation for more info.
///
/// [`append`]: trait.EventStore.html#method.append
/// [`EventStore`]: trait.EventStore.html
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Expected {
    /// Append events disregarding the current [`Aggregate`] version.
    ///
    /// [`Aggregate`]: ../aggregate/trait.Aggregate.html
    Any,

    /// Append events only if the current version of the [`Aggregate`]
    /// is the one specified by the value provided here.
    ///
    /// [`Aggregate`]: ../aggregate/trait.Aggregate.html
    Exact(u32),
}

/// Stream type returned by the [`EventStore::stream`] method.
///
/// [`EventStore::stream`]: trait.EventStore.html#method.stream
pub type EventStream<'a, S> = BoxStream<
    'a,
    Result<
        Persisted<<S as EventStore>::SourceId, <S as EventStore>::Event>,
        <S as EventStore>::Error,
    >,
>;

/// Error type returned by [`append`] in [`EventStore`] implementations.
///
/// [`append`]: trait.EventStore.html#method.append
/// [`EventStore`]: trait.EventStore.html
pub trait AppendError: std::error::Error {
    /// Returns true if the error is due to a version conflict
    /// during [`append`].
    ///
    /// [`append`]: trait.EventStore.html#method.append
    fn is_conflict_error(&self) -> bool;
}

impl AppendError for std::convert::Infallible {
    fn is_conflict_error(&self) -> bool {
        false
    }
}

/// An Event Store is an append-only, ordered list of [`Event`]s
/// for a certain "source" -- e.g. an [`Aggregate`].
///
/// [`Event`]: ../aggregate/trait.Aggregate.html#associatedtype.Event
/// [`Aggregate`]: ../aggregate/trait.Aggregate.html
pub trait EventStore {
    /// Type of the Source id, typically an [`AggregateId`].
    ///
    /// [`AggregateId`]: ../aggregate/type.AggregateId.html
    type SourceId: Eq;

    /// Event to be stored in the `EventStore`, typically an [`Aggregate::Event`].
    ///
    /// [`Aggregate::Event`]: ../aggregate/trait.Aggregate.html#associatedtype.Event
    type Event;

    /// Possible errors returned by the `EventStore` when requesting operations.
    type Error: AppendError;

    /// Appends a new list of [`Event`]s to the Event Store, for the Source
    /// entity specified by [`SourceId`].
    ///
    /// `append` is a transactional operation: it either appends all the events,
    /// or none at all and returns an [`AppendError`].
    ///
    /// The desired version for the new [`Event`]s to append must be specified
    /// through an [`Expected`] element.
    ///
    /// When using `Expected::Any`, no checks on the current [`Aggregate`]
    /// values will be performed, disregarding optimistic locking.
    ///
    /// When using `Expected::Exact`, the Store will check that the current
    /// version of the [`Aggregate`] is _exactly_ the one specified.
    ///
    /// If the version is not the one expected from the Store, implementations
    /// should raise an [`AppendError::Conflict`] error.
    ///
    /// Implementations could decide to return an error if the expected
    /// version is different from the one supplied in the method invocation.
    ///
    /// [`Event`]: trait.EventStore.html#associatedtype.Event
    /// [`SourceId`]: trait.EventStore.html#associatedtype.SourceId
    /// [`AppendError`]: enum.AppendError.html
    /// [`AppendError::Conflict`]: enum.AppendError.html
    fn append(
        &mut self,
        source_id: Self::SourceId,
        version: Expected,
        events: Vec<Self::Event>,
    ) -> BoxFuture<Result<u32, Self::Error>>;

    /// Streams a list of [`Event`]s from the `EventStore` back to the application,
    /// by specifying the desired [`SourceId`] and [`Select`] operation.
    ///
    /// [`SourceId`] will be used to request a particular `EventStream`.
    ///
    /// [`Select`] specifies the selection strategy for the [`Event`]s
    /// in the returned [`EventStream`]: take a look at type documentation
    /// for all the available options.
    ///
    /// [`Event`]: trait.EventStore.html#associatedtype.Event
    /// [`SourceId`]: trait.EventStore.html#associatedtype.SourceId
    /// [`Select`]: enum.Select.html
    /// [`EventStream`]: type.EventStream.html
    fn stream(
        &self,
        source_id: Self::SourceId,
        select: Select,
    ) -> BoxFuture<Result<EventStream<Self>, Self::Error>>;

    /// Streams a list of [`Event`]s from the `EventStore` back to the application,
    /// disregarding the [`SourceId`] values but using a [`Select`] operation.
    ///
    /// [`SourceId`] will be used to request a particular `EventStream`.
    ///
    /// [`Select`] specifies the selection strategy for the [`Event`]s
    /// in the returned [`EventStream`]: take a look at type documentation
    /// for all the available options.
    ///
    /// [`Event`]: trait.EventStore.html#associatedtype.Event
    /// [`SourceId`]: trait.EventStore.html#associatedtype.SourceId
    /// [`Select`]: enum.Select.html
    /// [`EventStream`]: type.EventStream.html
    fn stream_all(&self, select: Select) -> BoxFuture<Result<EventStream<Self>, Self::Error>>;

    /// Drops all the [`Event`]s related to one `Source`, specified by
    /// the provided [`SourceId`].
    ///
    /// [`Event`]: trait.EventStore.html#associatedtype.Event
    /// [`SourceId`]: trait.EventStore.html#associatedtype.SourceId
    fn remove(&mut self, source_id: Self::SourceId) -> BoxFuture<Result<(), Self::Error>>;
}

/// An [`Event`] wrapper for events that have been
/// successfully committed to the [`EventStore`].
///
/// [`EventStream`]s are composed of these events.
///
/// [`Event`]: trait.EventStore.html#associatedtype.Event
/// [`EventStream`]: type.EventStream.html
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct Persisted<SourceId, T> {
    source_id: SourceId,
    version: u32,
    sequence_number: u32,
    #[cfg_attr(feature = "serde", serde(flatten))]
    event: T,
}

impl<SourceId, T> Versioned for Persisted<SourceId, T> {
    #[inline]
    fn version(&self) -> u32 {
        self.version
    }
}

impl<SourceId, T> Deref for Persisted<SourceId, T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        &self.event
    }
}

impl<SourceId, T> Persisted<SourceId, T> {
    /// Creates a new [`EventBuilder`] from the provided Event value.
    ///
    /// [`EventBuilder`]: persistent/struct.EventBuilder.html
    #[inline]
    pub fn from(source_id: SourceId, event: T) -> persistent::EventBuilder<SourceId, T> {
        persistent::EventBuilder { source_id, event }
    }

    /// Returns the event sequence number.
    #[inline]
    pub fn sequence_number(&self) -> u32 {
        self.sequence_number
    }

    /// Returns the [`SourceId`] of the persisted event.
    ///
    /// [`SourceId`]: trait.EventStore.html#associatedType.SourceId
    #[inline]
    pub fn source_id(&self) -> &SourceId {
        &self.source_id
    }

    /// Unwraps the inner [`Event`] from the `Persisted` wrapper.
    ///
    /// [`Event`]: trait.EventStore.html#associatedtype.Event
    #[inline]
    pub fn take(self) -> T {
        self.event
    }
}

/// Contains a type-state builder for [`PersistentEvent`] type.
///
/// [`PersistentEvent`]: struct.Persisted.html
pub mod persistent {
    /// Creates a new [`Persisted`] by wrapping an Event value.
    ///
    /// [`PersistentEvent`]: ../struct.Persisted.html
    pub struct EventBuilder<SourceId, T> {
        pub(super) event: T,
        pub(super) source_id: SourceId,
    }

    impl<SourceId, T> From<(SourceId, T)> for EventBuilder<SourceId, T> {
        #[inline]
        fn from(value: (SourceId, T)) -> Self {
            let (source_id, event) = value;
            Self { source_id, event }
        }
    }

    impl<SourceId, T> EventBuilder<SourceId, T> {
        /// Specifies the [`PersistentEvent`] version and moves to the next
        /// builder state.
        ///
        /// [`PersistentEvent`]: ../struct.Persisted.html
        #[inline]
        pub fn version(self, value: u32) -> EventBuilderWithVersion<SourceId, T> {
            EventBuilderWithVersion {
                version: value,
                event: self.event,
                source_id: self.source_id,
            }
        }

        /// Specifies the [`PersistentEvent`] sequence number and moves to the next
        /// builder state.
        ///
        /// [`PersistentEvent`]: ../struct.Persisted.html
        #[inline]
        pub fn sequence_number(self, value: u32) -> EventBuilderWithSequenceNumber<SourceId, T> {
            EventBuilderWithSequenceNumber {
                sequence_number: value,
                event: self.event,
                source_id: self.source_id,
            }
        }
    }

    /// Next step in creating a new [`Persisted`] carrying an Event value
    /// and its version.
    ///
    /// [`PersistentEvent`]: ../struct.Persisted.html
    pub struct EventBuilderWithVersion<SourceId, T> {
        version: u32,
        event: T,
        source_id: SourceId,
    }

    impl<SourceId, T> EventBuilderWithVersion<SourceId, T> {
        /// Specifies the [`PersistentEvent`] sequence number and moves to the next
        /// builder state.
        ///
        /// [`PersistentEvent`]: ../struct.Persisted.html
        #[inline]
        pub fn sequence_number(self, value: u32) -> super::Persisted<SourceId, T> {
            super::Persisted {
                version: self.version,
                event: self.event,
                source_id: self.source_id,
                sequence_number: value,
            }
        }
    }

    /// Next step in creating a new [`Persisted`] carrying an Event value
    /// and its sequence number.
    ///
    /// [`PersistentEvent`]: ../struct.Persisted.html
    pub struct EventBuilderWithSequenceNumber<SourceId, T> {
        sequence_number: u32,
        event: T,
        source_id: SourceId,
    }

    impl<SourceId, T> EventBuilderWithSequenceNumber<SourceId, T> {
        /// Specifies the [`PersistentEvent`] version and moves to the next
        /// builder state.
        ///
        /// [`PersistentEvent`]: ../struct.Persisted.html
        #[inline]
        pub fn version(self, value: u32) -> super::Persisted<SourceId, T> {
            super::Persisted {
                version: value,
                event: self.event,
                source_id: self.source_id,
                sequence_number: self.sequence_number,
            }
        }
    }
}