eventually_core/
repository.rs

1//! Contains the [Repository pattern] implementation for [`AggregateRoot`]
2//! instances.
3//!
4//! Check out [`Repository`] for more information.
5//!
6//! [`Repository`]: struct.Repository.html
7//! [`AggregateRoot`]: ../aggregate/struct.AggregateRoot.html
8//! [Repository pattern]: https://docs.microsoft.com/en-us/dotnet/architecture/microservices/microservice-ddd-cqrs-patterns/infrastructure-persistence-layer-design#the-repository-pattern
9
10use std::fmt::Debug;
11
12use futures::stream::TryStreamExt;
13
14use crate::aggregate::{Aggregate, AggregateRoot, AggregateRootBuilder};
15use crate::store::{EventStore, Expected, Select};
16use crate::versioning::Versioned;
17
18/// Error type returned by the [`Repository`].
19///
20/// [`Repository`]: trait.Repository.html
21#[derive(Debug, thiserror::Error)]
22pub enum Error<A, S>
23where
24    A: std::error::Error + 'static,
25    S: std::error::Error + 'static,
26{
27    /// Error returned by the [`Aggregate`], usually when recreating the [`State`].
28    ///
29    /// [`Aggregate`]: ../aggregate/trait.Aggregate.html
30    /// [`State`]: ../aggregate/trait.Aggregate.html#associatedtype.State
31    #[error("failed to rebuild aggregate state: {0}")]
32    Aggregate(#[source] A),
33
34    /// Error returned by the underlying [`EventStore`].
35    ///
36    /// [`EventStore`]: ../store/trait.EventStore.html
37    #[error("event store failed: {0}")]
38    Store(#[source] S),
39}
40
41/// Result type returned by the [`Repository`].
42///
43/// [`Repository`]: ../struct.Repository.html
44pub type Result<T, A, S> =
45    std::result::Result<T, Error<<A as Aggregate>::Error, <S as EventStore>::Error>>;
46
47/// Implementation of the [Repository pattern] for storing, retrieving
48/// and deleting [`Aggregate`]s.
49///
50/// A `Repository` instruments an [`EventStore`] to:
51///
52/// * **Insert** [`Event`]s in the [`EventStore`] for an Aggregate, using the [`AggregateRoot`],
53/// * **Get** all the [`Event`]s in the [`EventStore`] and rebuild the [`State`] of an Aggregate,
54/// into a new [`AggregateRoot`] instance,
55/// * **Remove** all the [`Event`]s for an Aggregate in the [`EventStore`].
56///
57/// [Repository pattern]: https://docs.microsoft.com/en-us/dotnet/architecture/microservices/microservice-ddd-cqrs-patterns/infrastructure-persistence-layer-design#the-repository-pattern
58/// [`AggregateRoot`]: ../aggregate/struct.AggregateRoot.html
59/// [`Aggregate`]: ../aggregate/trait.Aggregate.html
60/// [`State`]: ../aggregate/trait.Aggregate.html#associatedtype.State
61/// [`Event`]: ../aggregate/trait.Aggregate.html#associatedtype.Event
62/// [`EventStore`]: ../store/trait.EventStore.html
63#[derive(Clone)]
64pub struct Repository<T, Store>
65where
66    T: Aggregate + Clone + 'static,
67    Store: EventStore<SourceId = T::Id, Event = T::Event>,
68{
69    builder: AggregateRootBuilder<T>,
70    store: Store,
71}
72
73impl<T, Store> Repository<T, Store>
74where
75    T: Aggregate + Clone,
76    Store: EventStore<SourceId = T::Id, Event = T::Event>,
77{
78    /// Creates a new `Repository` instance, using the [`Aggregate`]
79    /// and [`EventStore`] provided.
80    ///
81    /// [`EventStore`]: ../store/trait.EventStore.html
82    /// [`Aggregate`]: ../aggregate/trait.Aggregate.html
83    #[inline]
84    pub fn new(builder: AggregateRootBuilder<T>, store: Store) -> Self {
85        Repository { builder, store }
86    }
87}
88
89impl<T, Store> Repository<T, Store>
90where
91    T: Aggregate + Clone,
92    T::Id: Debug + Clone,
93    T::Event: Clone,
94    T::Error: std::error::Error + 'static,
95    Store: EventStore<SourceId = T::Id, Event = T::Event>,
96    Store::Error: std::error::Error + 'static,
97{
98    /// Returns the [`Aggregate`] from the `Repository` with the specified id,
99    /// if any.
100    ///
101    /// In case the [`Aggregate`] with the specified id exists, returns
102    /// a new [`AggregateRoot`] instance with its latest [`State`].
103    ///
104    /// Otherwise, `None` is returned.
105    ///
106    /// [`Aggregate`]: ../aggregate/trait.Aggregate.html
107    /// [`State`]: ../aggregate/trait.Aggregate.html#associatedtype.State
108    /// [`AggregateRoot`]: ../aggregate/struct.AggregateRoot.html
109    #[cfg_attr(
110        feature = "with-tracing",
111        tracing::instrument(level = "info", name = "Repository::get", skip(self))
112    )]
113    pub async fn get(&self, id: T::Id) -> Result<AggregateRoot<T>, T, Store> {
114        self.store
115            .stream(id.clone(), Select::All)
116            .await
117            .map_err(Error::Store)?
118            // Re-map any errors from the Stream into a Repository error
119            .map_err(Error::Store)
120            // Try to fold all the Events into an Aggregate State.
121            .try_fold(
122                (0, T::State::default()),
123                |(version, state), event| async move {
124                    // Always consider the max version number for the next version.
125                    let new_version = std::cmp::max(event.version(), version);
126                    let state = T::apply(state, event.take()).map_err(Error::Aggregate)?;
127
128                    Ok((new_version, state))
129                },
130            )
131            .await
132            // ...and map the State to a new AggregateRoot only if there is
133            // at least one Event coming from the Event Stream.
134            .map(|(version, state)| self.builder.build_with_state(id, version, state))
135    }
136
137    /// Adds a new [`State`] of the [`Aggregate`] into the `Repository`,
138    /// through an [`AggregateRoot`] instance.
139    ///
140    /// Returns [`Error::NoEvents`] if there are no [`Event`]s to commit
141    /// in the [`AggregateRoot`].
142    ///
143    /// [`Error::NoEvents`]: ../repository/enum.Error.html#variant.NoEvents
144    /// [`Aggregate`]: ../aggregate/trait.Aggregate.html
145    /// [`State`]: ../aggregate/trait.Aggregate.html#associatedtype.State
146    /// [`Event`]: ../aggregate/trait.Aggregate.html#associatedtype.Event
147    /// [`AggregateRoot`]: ../aggregate/struct.AggregateRoot.html
148    #[cfg_attr(
149        feature = "with-tracing",
150        tracing::instrument(level = "info", name = "Repository::add", skip(self, root))
151    )]
152    pub async fn add(&mut self, mut root: AggregateRoot<T>) -> Result<AggregateRoot<T>, T, Store> {
153        let mut version = root.version();
154        let events_to_commit = root.take_events_to_commit();
155
156        if let Some(events) = events_to_commit {
157            if !events.is_empty() {
158                // Version is incremented at each events flush by the EventStore.
159                version = self
160                    .store
161                    .append(root.id().clone(), Expected::Exact(version), events)
162                    .await
163                    .map_err(Error::Store)?;
164            }
165        }
166
167        Ok(root.with_version(version))
168    }
169
170    /// Removes the specified [`Aggregate`] from the `Repository`,
171    /// using the provided [`AggregateId`].
172    ///
173    /// [`Aggregate`]: ../aggregate/trait.Aggregate.html
174    /// [`AggregateId`]: ../aggregate/type.AggregateId.html
175    #[cfg_attr(
176        feature = "with-tracing",
177        tracing::instrument(level = "info", name = "Repository::remove", skip(self))
178    )]
179    pub async fn remove(&mut self, id: T::Id) -> Result<(), T, Store> {
180        self.store.remove(id).await.map_err(Error::Store)
181    }
182}