1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180
//! Contains the [Repository pattern] implementation for [`AggregateRoot`] //! instances. //! //! Check out [`Repository`] for more information. //! //! [`Repository`]: struct.Repository.html //! [`AggregateRoot`]: ../aggregate/struct.AggregateRoot.html //! [Repository pattern]: https://docs.microsoft.com/en-us/dotnet/architecture/microservices/microservice-ddd-cqrs-patterns/infrastructure-persistence-layer-design#the-repository-pattern use std::error::Error as StdError; use std::fmt::Debug; use futures::future; use futures::stream::TryStreamExt; use thiserror::Error as ThisError; use crate::aggregate::{Aggregate, AggregateId, AggregateRoot, Identifiable}; use crate::store::EventStore; /// Error type returned by the [`Repository`]. /// /// [`Repository`]: trait.Repository.html #[derive(Debug, ThisError, PartialEq, Eq)] pub enum Error<A, S> where A: Aggregate + Debug, S: EventStore + Debug, A::Error: StdError + 'static, S::Error: StdError + 'static, { /// Error returned by the [`Aggregate`], usually when recreating the [`State`]. /// /// [`Aggregate`]: ../aggregate/trait.Aggregate.html /// [`State`]: ../aggregate/trait.Aggregate.html#associatedtype.State #[error("failed to rebuild aggregate state: {0}")] Aggregate(#[source] A::Error), /// Error returned by the underlying [`EventStore`]. /// /// [`EventStore`]: ../store/trait.EventStore.html #[error("event store failed: {0}")] Store(#[source] S::Error), /// Error returned by [`add`] method when trying to add an [`AggregateRoot`] /// that has no [`Event`]s to be flushed to the [`EventStore`]. /// /// [`add`]: struct.Repository.html#method.add /// [`AggregateRoot`]: ../aggregate/struct.AggregateRoot.html /// [`State`]: ../aggregate/trait.Aggregate.html#associatedtype.State /// [`Event`]: ../aggregate/trait.Aggregate.html#associatedtype.Event /// [`EventStore`]: ../store/trait.EventStore.html #[error("no events to commit")] NoEvents, } /// Result type returned by the [`Repository`]. /// /// [`Repository`]: ../struct.Repository.html pub type Result<T, A, S> = std::result::Result<T, Error<A, S>>; /// Implementation of the [Repository pattern] for storing, retrieving /// and deleting [`Aggregate`]s. /// /// A `Repository` instruments an [`EventStore`] to: /// /// * **Insert** [`Event`]s in the [`EventStore`] for an Aggregate, using the [`AggregateRoot`], /// * **Get** all the [`Event`]s in the [`EventStore`] and rebuild the [`State`] of an Aggregate, /// into a new [`AggregateRoot`] instance, /// * **Remove** all the [`Event`]s for an Aggregate in the [`EventStore`]. /// /// [Repository pattern]: https://docs.microsoft.com/en-us/dotnet/architecture/microservices/microservice-ddd-cqrs-patterns/infrastructure-persistence-layer-design#the-repository-pattern /// [`AggregateRoot`]: ../aggregate/struct.AggregateRoot.html /// [`Aggregate`]: ../aggregate/trait.Aggregate.html /// [`State`]: ../aggregate/trait.Aggregate.html#associatedtype.State /// [`Event`]: ../aggregate/trait.Aggregate.html#associatedtype.Event /// [`EventStore`]: ../store/trait.EventStore.html pub struct Repository<T, Store> where T: Aggregate + 'static, { aggregate: T, store: Store, } impl<T, Store> Repository<T, Store> where T: Aggregate, Store: EventStore<SourceId = AggregateId<T>, Event = T::Event>, { /// Creates a new `Repository` instance, using the [`Aggregate`] /// and [`EventStore`] provided. /// /// [`EventStore`]: ../store/trait.EventStore.html /// [`Aggregate`]: ../aggregate/trait.Aggregate.html #[inline] pub fn new(aggregate: T, store: Store) -> Self { Repository { aggregate, store } } } impl<T, Store> Repository<T, Store> where T: Aggregate + Debug + Clone, T::Event: Clone, T::State: Default + Identifiable, T::Error: StdError + 'static, AggregateId<T>: Default, Store: EventStore<SourceId = AggregateId<T>, Event = T::Event> + Debug, Store::Offset: Default, Store::Error: StdError + 'static, { /// Returns the [`Aggregate`] from the `Repository` with the specified id, /// if any. /// /// In case the [`Aggregate`] with the specified id exists, returns /// a new [`AggregateRoot`] instance with its latest [`State`]. /// /// Otherwise, `None` is returned. /// /// [`Aggregate`]: ../aggregate/trait.Aggregate.html /// [`State`]: ../aggregate/trait.Aggregate.html#associatedtype.State /// [`AggregateRoot`]: ../aggregate/struct.AggregateRoot.html pub async fn get(&self, id: AggregateId<T>) -> Result<Option<AggregateRoot<T>>, T, Store> { Ok(self .store .stream(id, Store::Offset::default()) .await .map_err(Error::Store)? // Re-map any errors from the Stream into a Repository error .map_err(Error::Store) // Try to fold all the Events into an Aggregate State. // // However, since the Event Store might not have any Events yet, // use an Option<T::State> with the initial value set to None... .try_fold(None, |state, event| { let state = state.unwrap_or_else(T::State::default); future::ready(T::apply(state, event).map(Some).map_err(Error::Aggregate)) }) .await? // ...and map the State to a new AggregateRoot only if there is // at least one Event coming from the Event Stream. .map(|state| AggregateRoot::new(self.aggregate.clone(), state))) } /// Adds a new [`State`] of the [`Aggregate`] into the `Repository`, /// through an [`AggregateRoot`] instance. /// /// Returns [`Error::NoEvents`] if there are no [`Event`]s to commit /// in the [`AggregateRoot`]. /// /// [`Error::NoEvents`]: ../repository/enum.Error.html#variant.NoEvents /// [`Aggregate`]: ../aggregate/trait.Aggregate.html /// [`State`]: ../aggregate/trait.Aggregate.html#associatedtype.State /// [`Event`]: ../aggregate/trait.Aggregate.html#associatedtype.Event /// [`AggregateRoot`]: ../aggregate/struct.AggregateRoot.html pub async fn add(&mut self, mut root: AggregateRoot<T>) -> Result<AggregateRoot<T>, T, Store> { if root.to_commit.is_none() { return Err(Error::NoEvents); } self.store .append(root.id(), root.to_commit.unwrap()) .await .map_err(Error::Store)?; root.to_commit = None; Ok(root) } /// Removes the specified [`Aggregate`] from the `Repository`, /// using the provided [`AggregateId`]. /// /// [`Aggregate`]: ../aggregate/trait.Aggregate.html /// [`AggregateId`]: ../aggregate/type.AggregateId.html pub async fn remove(&mut self, id: AggregateId<T>) -> Result<(), T, Store> { self.store.remove(id).await.map_err(Error::Store) } }