eventually_core/repository.rs
1//! Contains the [Repository pattern] implementation for [`AggregateRoot`]
2//! instances.
3//!
4//! Check out [`Repository`] for more information.
5//!
6//! [`Repository`]: struct.Repository.html
7//! [`AggregateRoot`]: ../aggregate/struct.AggregateRoot.html
8//! [Repository pattern]: https://docs.microsoft.com/en-us/dotnet/architecture/microservices/microservice-ddd-cqrs-patterns/infrastructure-persistence-layer-design#the-repository-pattern
9
10use std::fmt::Debug;
11
12use futures::stream::TryStreamExt;
13
14use crate::aggregate::{Aggregate, AggregateRoot, AggregateRootBuilder};
15use crate::store::{EventStore, Expected, Select};
16use crate::versioning::Versioned;
17
18/// Error type returned by the [`Repository`].
19///
20/// [`Repository`]: trait.Repository.html
21#[derive(Debug, thiserror::Error)]
22pub enum Error<A, S>
23where
24 A: std::error::Error + 'static,
25 S: std::error::Error + 'static,
26{
27 /// Error returned by the [`Aggregate`], usually when recreating the [`State`].
28 ///
29 /// [`Aggregate`]: ../aggregate/trait.Aggregate.html
30 /// [`State`]: ../aggregate/trait.Aggregate.html#associatedtype.State
31 #[error("failed to rebuild aggregate state: {0}")]
32 Aggregate(#[source] A),
33
34 /// Error returned by the underlying [`EventStore`].
35 ///
36 /// [`EventStore`]: ../store/trait.EventStore.html
37 #[error("event store failed: {0}")]
38 Store(#[source] S),
39}
40
41/// Result type returned by the [`Repository`].
42///
43/// [`Repository`]: ../struct.Repository.html
44pub type Result<T, A, S> =
45 std::result::Result<T, Error<<A as Aggregate>::Error, <S as EventStore>::Error>>;
46
47/// Implementation of the [Repository pattern] for storing, retrieving
48/// and deleting [`Aggregate`]s.
49///
50/// A `Repository` instruments an [`EventStore`] to:
51///
52/// * **Insert** [`Event`]s in the [`EventStore`] for an Aggregate, using the [`AggregateRoot`],
53/// * **Get** all the [`Event`]s in the [`EventStore`] and rebuild the [`State`] of an Aggregate,
54/// into a new [`AggregateRoot`] instance,
55/// * **Remove** all the [`Event`]s for an Aggregate in the [`EventStore`].
56///
57/// [Repository pattern]: https://docs.microsoft.com/en-us/dotnet/architecture/microservices/microservice-ddd-cqrs-patterns/infrastructure-persistence-layer-design#the-repository-pattern
58/// [`AggregateRoot`]: ../aggregate/struct.AggregateRoot.html
59/// [`Aggregate`]: ../aggregate/trait.Aggregate.html
60/// [`State`]: ../aggregate/trait.Aggregate.html#associatedtype.State
61/// [`Event`]: ../aggregate/trait.Aggregate.html#associatedtype.Event
62/// [`EventStore`]: ../store/trait.EventStore.html
63#[derive(Clone)]
64pub struct Repository<T, Store>
65where
66 T: Aggregate + Clone + 'static,
67 Store: EventStore<SourceId = T::Id, Event = T::Event>,
68{
69 builder: AggregateRootBuilder<T>,
70 store: Store,
71}
72
73impl<T, Store> Repository<T, Store>
74where
75 T: Aggregate + Clone,
76 Store: EventStore<SourceId = T::Id, Event = T::Event>,
77{
78 /// Creates a new `Repository` instance, using the [`Aggregate`]
79 /// and [`EventStore`] provided.
80 ///
81 /// [`EventStore`]: ../store/trait.EventStore.html
82 /// [`Aggregate`]: ../aggregate/trait.Aggregate.html
83 #[inline]
84 pub fn new(builder: AggregateRootBuilder<T>, store: Store) -> Self {
85 Repository { builder, store }
86 }
87}
88
89impl<T, Store> Repository<T, Store>
90where
91 T: Aggregate + Clone,
92 T::Id: Debug + Clone,
93 T::Event: Clone,
94 T::Error: std::error::Error + 'static,
95 Store: EventStore<SourceId = T::Id, Event = T::Event>,
96 Store::Error: std::error::Error + 'static,
97{
98 /// Returns the [`Aggregate`] from the `Repository` with the specified id,
99 /// if any.
100 ///
101 /// In case the [`Aggregate`] with the specified id exists, returns
102 /// a new [`AggregateRoot`] instance with its latest [`State`].
103 ///
104 /// Otherwise, `None` is returned.
105 ///
106 /// [`Aggregate`]: ../aggregate/trait.Aggregate.html
107 /// [`State`]: ../aggregate/trait.Aggregate.html#associatedtype.State
108 /// [`AggregateRoot`]: ../aggregate/struct.AggregateRoot.html
109 #[cfg_attr(
110 feature = "with-tracing",
111 tracing::instrument(level = "info", name = "Repository::get", skip(self))
112 )]
113 pub async fn get(&self, id: T::Id) -> Result<AggregateRoot<T>, T, Store> {
114 self.store
115 .stream(id.clone(), Select::All)
116 .await
117 .map_err(Error::Store)?
118 // Re-map any errors from the Stream into a Repository error
119 .map_err(Error::Store)
120 // Try to fold all the Events into an Aggregate State.
121 .try_fold(
122 (0, T::State::default()),
123 |(version, state), event| async move {
124 // Always consider the max version number for the next version.
125 let new_version = std::cmp::max(event.version(), version);
126 let state = T::apply(state, event.take()).map_err(Error::Aggregate)?;
127
128 Ok((new_version, state))
129 },
130 )
131 .await
132 // ...and map the State to a new AggregateRoot only if there is
133 // at least one Event coming from the Event Stream.
134 .map(|(version, state)| self.builder.build_with_state(id, version, state))
135 }
136
137 /// Adds a new [`State`] of the [`Aggregate`] into the `Repository`,
138 /// through an [`AggregateRoot`] instance.
139 ///
140 /// Returns [`Error::NoEvents`] if there are no [`Event`]s to commit
141 /// in the [`AggregateRoot`].
142 ///
143 /// [`Error::NoEvents`]: ../repository/enum.Error.html#variant.NoEvents
144 /// [`Aggregate`]: ../aggregate/trait.Aggregate.html
145 /// [`State`]: ../aggregate/trait.Aggregate.html#associatedtype.State
146 /// [`Event`]: ../aggregate/trait.Aggregate.html#associatedtype.Event
147 /// [`AggregateRoot`]: ../aggregate/struct.AggregateRoot.html
148 #[cfg_attr(
149 feature = "with-tracing",
150 tracing::instrument(level = "info", name = "Repository::add", skip(self, root))
151 )]
152 pub async fn add(&mut self, mut root: AggregateRoot<T>) -> Result<AggregateRoot<T>, T, Store> {
153 let mut version = root.version();
154 let events_to_commit = root.take_events_to_commit();
155
156 if let Some(events) = events_to_commit {
157 if !events.is_empty() {
158 // Version is incremented at each events flush by the EventStore.
159 version = self
160 .store
161 .append(root.id().clone(), Expected::Exact(version), events)
162 .await
163 .map_err(Error::Store)?;
164 }
165 }
166
167 Ok(root.with_version(version))
168 }
169
170 /// Removes the specified [`Aggregate`] from the `Repository`,
171 /// using the provided [`AggregateId`].
172 ///
173 /// [`Aggregate`]: ../aggregate/trait.Aggregate.html
174 /// [`AggregateId`]: ../aggregate/type.AggregateId.html
175 #[cfg_attr(
176 feature = "with-tracing",
177 tracing::instrument(level = "info", name = "Repository::remove", skip(self))
178 )]
179 pub async fn remove(&mut self, id: T::Id) -> Result<(), T, Store> {
180 self.store.remove(id).await.map_err(Error::Store)
181 }
182}