cqrs_core/
types.rs

1use std::{fmt, num::NonZeroU64};
2
3/// Represents an event sequence number, starting at 1
4#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)]
5pub struct EventNumber(NonZeroU64);
6
7impl EventNumber {
8    /// The minimum [EventNumber].
9    #[allow(unsafe_code)]
10    pub const MIN_VALUE: EventNumber =
11        // One is absolutely non-zero, and this is required for this to be usable in a `const` context.
12        EventNumber(unsafe {NonZeroU64::new_unchecked(1)});
13
14    /// Attempts to create a new event number from a given number. Will return non if the given number is `0`.
15    #[inline]
16    pub fn new(x: u64) -> Option<Self> {
17        Some(EventNumber(NonZeroU64::new(x)?))
18    }
19
20    /// Extracts the event number as a plain `u64`.
21    #[inline]
22    pub fn get(self) -> u64 {
23        self.0.get()
24    }
25
26    /// Increments the event number to the next value.
27    #[inline]
28    pub fn incr(&mut self) {
29        self.0 = NonZeroU64::new(self.0.get() + 1).unwrap();
30    }
31
32    /// Gets the event number after the current one.
33    #[inline]
34    #[must_use]
35    pub fn next(self) -> Self {
36        let mut slf = self;
37        slf.0 = NonZeroU64::new(self.0.get() + 1).unwrap();
38        slf
39    }
40}
41
42impl fmt::Display for EventNumber {
43    #[inline]
44    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
45        fmt::Display::fmt(&self.0, f)
46    }
47}
48
49//impl PartialEq<usize> for EventNumber {
50//    #[inline]
51//    fn eq(&self, rhs: &usize) -> bool {
52//        *rhs == self.0
53//    }
54//}
55//
56//impl PartialEq<EventNumber> for usize {
57//    #[inline]
58//    fn eq(&self, rhs: &EventNumber) -> bool {
59//        rhs.0 == *self
60//    }
61//}
62
63/// An aggregate version.
64#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)]
65pub enum Version {
66    /// The version of an aggregate that has not had any events applied to it.
67    Initial,
68    /// The version of the last event applied to the aggregate.
69    Number(EventNumber),
70}
71
72impl Version {
73    /// Creates a new `Version` from a number.
74    ///
75    /// The number `0` gets interpreted as being `Version::Initial`, while any other number is interpreted as the
76    /// latest event number applied.
77    #[inline]
78    pub fn new(number: u64) -> Self {
79        NonZeroU64::new(number)
80            .map(EventNumber)
81            .map(Version::Number)
82            .unwrap_or(Version::Initial)
83    }
84
85    /// Increments the version number to the next in sequence.
86    #[inline]
87    pub fn incr(&mut self) {
88        match *self {
89            Version::Initial => *self = Version::Number(EventNumber::MIN_VALUE),
90            Version::Number(ref mut en) => en.incr(),
91        }
92    }
93
94    /// Returns the next event number in the sequence.
95    #[inline]
96    pub fn next_event(self) -> EventNumber {
97        match self {
98            Version::Initial => EventNumber::MIN_VALUE,
99            Version::Number(mut en) => {
100                en.incr();
101                en
102            }
103        }
104    }
105
106    /// Gets the version number as a raw `u64`.
107    #[inline]
108    pub fn get(self) -> u64 {
109        match self {
110            Version::Initial => 0,
111            Version::Number(en) => en.get(),
112        }
113    }
114
115    /// Gets the version number as an [EventNumber], returning `None` if the current verison is [Version::Initial].
116    #[inline]
117    pub fn event_number(self) -> Option<EventNumber> {
118        match self {
119            Version::Initial => None,
120            Version::Number(en) => Some(en),
121        }
122    }
123}
124
125impl fmt::Display for Version {
126    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
127        match *self {
128            Version::Initial => f.write_str("initial"),
129            Version::Number(ref event_number) => event_number.fmt(f),
130        }
131    }
132}
133
134impl Default for Version {
135    #[inline]
136    fn default() -> Self {
137        Version::Initial
138    }
139}
140
141//impl PartialEq<EventNumber> for Version {
142//    fn eq(&self, rhs: &EventNumber) -> bool {
143//        if let Version::Number(ref v) = *self {
144//            v == rhs
145//        } else {
146//            false
147//        }
148//    }
149//}
150
151impl From<EventNumber> for Version {
152    #[inline]
153    fn from(event_number: EventNumber) -> Self {
154        Version::Number(event_number)
155    }
156}
157
158impl ::std::ops::Sub for Version {
159    type Output = i64;
160
161    fn sub(self, rhs: Version) -> Self::Output {
162        let l = match self {
163            Version::Initial => 0,
164            Version::Number(n) => n.get() as i64,
165        };
166        let r = match rhs {
167            Version::Initial => 0,
168            Version::Number(n) => n.get() as i64,
169        };
170
171        l - r
172    }
173}
174
175/// A precondition that must be upheld for a command to be executed or for events to be persisted.
176#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
177pub enum Precondition {
178    /// Requires that the target aggregate must not have had any events previously applied to it.
179    New,
180    /// Requires that the target event at least exist, whether as a snapshot, or as having had at least one event applied.
181    Exists,
182    /// Requires that the target aggregate have the exact version specified.
183    ExpectedVersion(Version),
184}
185
186impl Precondition {
187    /// Verifies that the precondition holds, given the `current_version`. If the precondition is violated, an error is
188    /// returned with the precondition.
189    pub fn verify(self, current_version: Option<Version>) -> Result<(), Self> {
190        match (self, current_version) {
191            (Precondition::ExpectedVersion(Version::Initial), None) => Ok(()),
192            (Precondition::ExpectedVersion(Version::Initial), Some(Version::Initial)) => Ok(()),
193            (Precondition::ExpectedVersion(e), Some(x)) if e == x => Ok(()),
194            (Precondition::New, None) => Ok(()),
195            (Precondition::Exists, Some(_)) => Ok(()),
196            (precondition, _) => Err(precondition),
197        }
198    }
199}
200
201impl From<Version> for Precondition {
202    #[inline]
203    fn from(v: Version) -> Self {
204        Precondition::ExpectedVersion(v)
205    }
206}
207
208impl fmt::Display for Precondition {
209    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
210        match *self {
211            Precondition::Exists => f.write_str("expect aggregate exists"),
212            Precondition::New => f.write_str("expect aggregate does not exist"),
213            Precondition::ExpectedVersion(Version::Initial) => {
214                f.write_str("expect aggregate to exist in initial state")
215            }
216            Precondition::ExpectedVersion(Version::Number(v)) => {
217                write!(f, "expect aggregate to exist with version {}", v)
218            }
219        }
220    }
221}
222
223/// A structured tuple combining an event number and an event.
224#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
225pub struct VersionedEvent<E> {
226    /// The event number.
227    pub sequence: EventNumber,
228
229    /// The event.
230    pub event: E,
231}
232
233/// A structured tuple combining an event number and an event.
234#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
235pub struct VersionedEventWithMetadata<E, M> {
236    /// The event number.
237    pub sequence: EventNumber,
238
239    /// The event.
240    pub event: E,
241
242    /// The event metadata.
243    pub metadata: M,
244}
245
246/// A structured tuple combining an aggregate and its current version.
247#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
248pub struct VersionedAggregate<A> {
249    /// The current version of the aggregate.
250    pub version: Version,
251
252    /// The aggregate.
253    pub payload: A,
254}
255
256/// The starting point when reading a stream of values from an [EventSource].
257#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)]
258pub enum Before {
259    /// Begins reading events from the end of the stream.
260    EndOfStream,
261
262    /// Begins reading events before the given [EventNumber].
263    ///
264    /// E.g. if the event number were 4, then reading should begin at event number 3.
265    Event(EventNumber),
266}
267
268/// The starting point when reading a stream of values from an [EventSource].
269#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)]
270pub enum Since {
271    /// Begins reading events from the very beginning of the stream.
272    BeginningOfStream,
273
274    /// Begins reading events after the given [EventNumber].
275    ///
276    /// E.g. if the event number were 4, then reading should begin at event number 5.
277    Event(EventNumber),
278}
279
280impl From<Version> for Since {
281    fn from(v: Version) -> Self {
282        match v {
283            Version::Initial => Since::BeginningOfStream,
284            Version::Number(x) => Since::Event(x),
285        }
286    }
287}
288
289/// A recommendation on whether or not a snapshot should be persisted.
290#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
291pub enum SnapshotRecommendation {
292    /// Recommends that a snapshot be taken.
293    ShouldSnapshot,
294
295    /// Recommends that a snapshot should not be taken.
296    DoNotSnapshot,
297}
298
299/// Represents a common trait that all errors handled by CQRS should implement.
300pub trait CqrsError: fmt::Debug + fmt::Display + Send + Sync + 'static {}
301
302impl<T> CqrsError for T where T: fmt::Debug + fmt::Display + Send + Sync + 'static {}
303
304/// An owned, raw view of event data.
305#[derive(Clone, Debug, Hash, PartialEq, Eq)]
306pub struct RawEvent {
307    /// The event id.
308    pub event_id: EventNumber,
309    /// The aggregate type.
310    pub aggregate_type: String,
311    /// The entity id.
312    pub entity_id: String,
313    /// The sequence number of this event in the entity's event stream.
314    pub sequence: EventNumber,
315    /// The event type.
316    pub event_type: String,
317    /// The raw event payload.
318    pub payload: Vec<u8>,
319}
320
321/// An owned, raw view of event data.
322#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
323pub struct BorrowedRawEvent<'row> {
324    /// The event id.
325    pub event_id: EventNumber,
326    /// The aggregate type.
327    pub aggregate_type: &'row str,
328    /// The entity id.
329    pub entity_id: &'row str,
330    /// The sequence number of this event in the entity's event stream.
331    pub sequence: EventNumber,
332    /// The event type.
333    pub event_type: &'row str,
334    /// The raw event payload.
335    pub payload: &'row [u8],
336}