1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
//! Contains the [Repository pattern] implementation for [`AggregateRoot`]
//! instances.
//!
//! Check out [`Repository`] for more information.
//!
//! [`Repository`]: struct.Repository.html
//! [`AggregateRoot`]: ../aggregate/struct.AggregateRoot.html
//! [Repository pattern]: https://docs.microsoft.com/en-us/dotnet/architecture/microservices/microservice-ddd-cqrs-patterns/infrastructure-persistence-layer-design#the-repository-pattern

use std::fmt::Debug;

use futures::stream::TryStreamExt;

use crate::aggregate::{Aggregate, AggregateRoot, AggregateRootBuilder};
use crate::store::{EventStore, Expected, Select};
use crate::versioning::Versioned;

/// Error type returned by the [`Repository`].
///
/// [`Repository`]: trait.Repository.html
#[derive(Debug, thiserror::Error)]
pub enum Error<A, S>
where
    A: std::error::Error + 'static,
    S: std::error::Error + 'static,
{
    /// Error returned by the [`Aggregate`], usually when recreating the [`State`].
    ///
    /// [`Aggregate`]: ../aggregate/trait.Aggregate.html
    /// [`State`]: ../aggregate/trait.Aggregate.html#associatedtype.State
    #[error("failed to rebuild aggregate state: {0}")]
    Aggregate(#[source] A),

    /// Error returned by the underlying [`EventStore`].
    ///
    /// [`EventStore`]: ../store/trait.EventStore.html
    #[error("event store failed: {0}")]
    Store(#[source] S),
}

/// Result type returned by the [`Repository`].
///
/// [`Repository`]: ../struct.Repository.html
pub type Result<T, A, S> =
    std::result::Result<T, Error<<A as Aggregate>::Error, <S as EventStore>::Error>>;

/// Implementation of the [Repository pattern] for storing, retrieving
/// and deleting [`Aggregate`]s.
///
/// A `Repository` instruments an [`EventStore`] to:
///
/// * **Insert** [`Event`]s in the [`EventStore`] for an Aggregate, using the [`AggregateRoot`],
/// * **Get** all the [`Event`]s in the [`EventStore`] and rebuild the [`State`] of an Aggregate,
/// into a new [`AggregateRoot`] instance,
/// * **Remove** all the [`Event`]s for an Aggregate in the [`EventStore`].
///
/// [Repository pattern]: https://docs.microsoft.com/en-us/dotnet/architecture/microservices/microservice-ddd-cqrs-patterns/infrastructure-persistence-layer-design#the-repository-pattern
/// [`AggregateRoot`]: ../aggregate/struct.AggregateRoot.html
/// [`Aggregate`]: ../aggregate/trait.Aggregate.html
/// [`State`]: ../aggregate/trait.Aggregate.html#associatedtype.State
/// [`Event`]: ../aggregate/trait.Aggregate.html#associatedtype.Event
/// [`EventStore`]: ../store/trait.EventStore.html
#[derive(Clone)]
pub struct Repository<T, Store>
where
    T: Aggregate + Clone + 'static,
    Store: EventStore<SourceId = T::Id, Event = T::Event>,
{
    builder: AggregateRootBuilder<T>,
    store: Store,
}

impl<T, Store> Repository<T, Store>
where
    T: Aggregate + Clone,
    Store: EventStore<SourceId = T::Id, Event = T::Event>,
{
    /// Creates a new `Repository` instance, using the [`Aggregate`]
    /// and [`EventStore`] provided.
    ///
    /// [`EventStore`]: ../store/trait.EventStore.html
    /// [`Aggregate`]: ../aggregate/trait.Aggregate.html
    #[inline]
    pub fn new(builder: AggregateRootBuilder<T>, store: Store) -> Self {
        Repository { builder, store }
    }
}

impl<T, Store> Repository<T, Store>
where
    T: Aggregate + Clone,
    T::Id: Clone,
    T::Event: Clone,
    T::Error: std::error::Error + 'static,
    Store: EventStore<SourceId = T::Id, Event = T::Event>,
    Store::Error: std::error::Error + 'static,
{
    /// Returns the [`Aggregate`] from the `Repository` with the specified id,
    /// if any.
    ///
    /// In case the [`Aggregate`] with the specified id exists, returns
    /// a new [`AggregateRoot`] instance with its latest [`State`].
    ///
    /// Otherwise, `None` is returned.
    ///
    /// [`Aggregate`]: ../aggregate/trait.Aggregate.html
    /// [`State`]: ../aggregate/trait.Aggregate.html#associatedtype.State
    /// [`AggregateRoot`]: ../aggregate/struct.AggregateRoot.html
    pub async fn get(&self, id: T::Id) -> Result<AggregateRoot<T>, T, Store> {
        self.store
            .stream(id.clone(), Select::All)
            .await
            .map_err(Error::Store)?
            // Re-map any errors from the Stream into a Repository error
            .map_err(Error::Store)
            // Try to fold all the Events into an Aggregate State.
            .try_fold(
                (0, T::State::default()),
                |(version, state), event| async move {
                    // Always consider the max version number for the next version.
                    let new_version = std::cmp::max(event.version(), version);
                    let state = T::apply(state, event.take()).map_err(Error::Aggregate)?;

                    Ok((new_version, state))
                },
            )
            .await
            // ...and map the State to a new AggregateRoot only if there is
            // at least one Event coming from the Event Stream.
            .map(|(version, state)| self.builder.build_with_state(id, version, state))
    }

    /// Adds a new [`State`] of the [`Aggregate`] into the `Repository`,
    /// through an [`AggregateRoot`] instance.
    ///
    /// Returns [`Error::NoEvents`] if there are no [`Event`]s to commit
    /// in the [`AggregateRoot`].
    ///
    /// [`Error::NoEvents`]: ../repository/enum.Error.html#variant.NoEvents
    /// [`Aggregate`]: ../aggregate/trait.Aggregate.html
    /// [`State`]: ../aggregate/trait.Aggregate.html#associatedtype.State
    /// [`Event`]: ../aggregate/trait.Aggregate.html#associatedtype.Event
    /// [`AggregateRoot`]: ../aggregate/struct.AggregateRoot.html
    pub async fn add(&mut self, mut root: AggregateRoot<T>) -> Result<AggregateRoot<T>, T, Store> {
        let mut version = root.version();
        let events_to_commit = root.take_events_to_commit();

        if let Some(events) = events_to_commit {
            if !events.is_empty() {
                // Version is incremented at each events flush by the EventStore.
                version = self
                    .store
                    .append(root.id().clone(), Expected::Exact(version), events)
                    .await
                    .map_err(Error::Store)?;
            }
        }

        Ok(root.with_version(version))
    }

    /// Removes the specified [`Aggregate`] from the `Repository`,
    /// using the provided [`AggregateId`].
    ///
    /// [`Aggregate`]: ../aggregate/trait.Aggregate.html
    /// [`AggregateId`]: ../aggregate/type.AggregateId.html
    pub async fn remove(&mut self, id: T::Id) -> Result<(), T, Store> {
        self.store.remove(id).await.map_err(Error::Store)
    }
}