1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170
//! Contains the [Repository pattern] implementation for [`AggregateRoot`] //! instances. //! //! Check out [`Repository`] for more information. //! //! [`Repository`]: struct.Repository.html //! [`AggregateRoot`]: ../aggregate/struct.AggregateRoot.html //! [Repository pattern]: https://docs.microsoft.com/en-us/dotnet/architecture/microservices/microservice-ddd-cqrs-patterns/infrastructure-persistence-layer-design#the-repository-pattern use std::fmt::Debug; use futures::stream::TryStreamExt; use crate::aggregate::{Aggregate, AggregateRoot, AggregateRootBuilder}; use crate::store::{EventStore, Expected, Select}; use crate::versioning::Versioned; /// Error type returned by the [`Repository`]. /// /// [`Repository`]: trait.Repository.html #[derive(Debug, thiserror::Error)] pub enum Error<A, S> where A: std::error::Error + 'static, S: std::error::Error + 'static, { /// Error returned by the [`Aggregate`], usually when recreating the [`State`]. /// /// [`Aggregate`]: ../aggregate/trait.Aggregate.html /// [`State`]: ../aggregate/trait.Aggregate.html#associatedtype.State #[error("failed to rebuild aggregate state: {0}")] Aggregate(#[source] A), /// Error returned by the underlying [`EventStore`]. /// /// [`EventStore`]: ../store/trait.EventStore.html #[error("event store failed: {0}")] Store(#[source] S), } /// Result type returned by the [`Repository`]. /// /// [`Repository`]: ../struct.Repository.html pub type Result<T, A, S> = std::result::Result<T, Error<<A as Aggregate>::Error, <S as EventStore>::Error>>; /// Implementation of the [Repository pattern] for storing, retrieving /// and deleting [`Aggregate`]s. /// /// A `Repository` instruments an [`EventStore`] to: /// /// * **Insert** [`Event`]s in the [`EventStore`] for an Aggregate, using the [`AggregateRoot`], /// * **Get** all the [`Event`]s in the [`EventStore`] and rebuild the [`State`] of an Aggregate, /// into a new [`AggregateRoot`] instance, /// * **Remove** all the [`Event`]s for an Aggregate in the [`EventStore`]. /// /// [Repository pattern]: https://docs.microsoft.com/en-us/dotnet/architecture/microservices/microservice-ddd-cqrs-patterns/infrastructure-persistence-layer-design#the-repository-pattern /// [`AggregateRoot`]: ../aggregate/struct.AggregateRoot.html /// [`Aggregate`]: ../aggregate/trait.Aggregate.html /// [`State`]: ../aggregate/trait.Aggregate.html#associatedtype.State /// [`Event`]: ../aggregate/trait.Aggregate.html#associatedtype.Event /// [`EventStore`]: ../store/trait.EventStore.html #[derive(Clone)] pub struct Repository<T, Store> where T: Aggregate + Clone + 'static, Store: EventStore<SourceId = T::Id, Event = T::Event>, { builder: AggregateRootBuilder<T>, store: Store, } impl<T, Store> Repository<T, Store> where T: Aggregate + Clone, Store: EventStore<SourceId = T::Id, Event = T::Event>, { /// Creates a new `Repository` instance, using the [`Aggregate`] /// and [`EventStore`] provided. /// /// [`EventStore`]: ../store/trait.EventStore.html /// [`Aggregate`]: ../aggregate/trait.Aggregate.html #[inline] pub fn new(builder: AggregateRootBuilder<T>, store: Store) -> Self { Repository { builder, store } } } impl<T, Store> Repository<T, Store> where T: Aggregate + Clone, T::Id: Clone, T::Event: Clone, T::Error: std::error::Error + 'static, Store: EventStore<SourceId = T::Id, Event = T::Event>, Store::Error: std::error::Error + 'static, { /// Returns the [`Aggregate`] from the `Repository` with the specified id, /// if any. /// /// In case the [`Aggregate`] with the specified id exists, returns /// a new [`AggregateRoot`] instance with its latest [`State`]. /// /// Otherwise, `None` is returned. /// /// [`Aggregate`]: ../aggregate/trait.Aggregate.html /// [`State`]: ../aggregate/trait.Aggregate.html#associatedtype.State /// [`AggregateRoot`]: ../aggregate/struct.AggregateRoot.html pub async fn get(&self, id: T::Id) -> Result<AggregateRoot<T>, T, Store> { self.store .stream(id.clone(), Select::All) .await .map_err(Error::Store)? // Re-map any errors from the Stream into a Repository error .map_err(Error::Store) // Try to fold all the Events into an Aggregate State. .try_fold( (0, T::State::default()), |(version, state), event| async move { // Always consider the max version number for the next version. let new_version = std::cmp::max(event.version(), version); let state = T::apply(state, event.take()).map_err(Error::Aggregate)?; Ok((new_version, state)) }, ) .await // ...and map the State to a new AggregateRoot only if there is // at least one Event coming from the Event Stream. .map(|(version, state)| self.builder.build_with_state(id, version, state)) } /// Adds a new [`State`] of the [`Aggregate`] into the `Repository`, /// through an [`AggregateRoot`] instance. /// /// Returns [`Error::NoEvents`] if there are no [`Event`]s to commit /// in the [`AggregateRoot`]. /// /// [`Error::NoEvents`]: ../repository/enum.Error.html#variant.NoEvents /// [`Aggregate`]: ../aggregate/trait.Aggregate.html /// [`State`]: ../aggregate/trait.Aggregate.html#associatedtype.State /// [`Event`]: ../aggregate/trait.Aggregate.html#associatedtype.Event /// [`AggregateRoot`]: ../aggregate/struct.AggregateRoot.html pub async fn add(&mut self, mut root: AggregateRoot<T>) -> Result<AggregateRoot<T>, T, Store> { let mut version = root.version(); let events_to_commit = root.take_events_to_commit(); if let Some(events) = events_to_commit { if !events.is_empty() { // Version is incremented at each events flush by the EventStore. version = self .store .append(root.id().clone(), Expected::Exact(version), events) .await .map_err(Error::Store)?; } } Ok(root.with_version(version)) } /// Removes the specified [`Aggregate`] from the `Repository`, /// using the provided [`AggregateId`]. /// /// [`Aggregate`]: ../aggregate/trait.Aggregate.html /// [`AggregateId`]: ../aggregate/type.AggregateId.html pub async fn remove(&mut self, id: T::Id) -> Result<(), T, Store> { self.store.remove(id).await.map_err(Error::Store) } }