1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
use std::fmt::Debug;

use async_trait::async_trait;
use serde::de::DeserializeOwned;
use serde::Serialize;
use uuid::Uuid;

use crate::esrs::state::AggregateState;
use crate::esrs::store::{EventStore, StoreEvent};
use crate::esrs::SequenceNumber;

/// The Identifier trait is responsible for naming an aggregate type.
/// Each aggregate type should have an identifier that is unique among all the aggregate types in your application.
///
/// Aggregates are linked to their instances & events using their `name` and their `aggregate_id`.  Be very careful
/// when changing `name`, as doing so will break the link between all the aggregates of that type and their events!
pub trait Identifier {
    /// Returns the aggregate name.
    ///
    /// This is an associated function (it takes no `self`) returning a `'static` string, so the name
    /// belongs to the aggregate *type* rather than to any particular instance.
    fn name() -> &'static str
    where
        Self: Sized;
}

#[async_trait]
/// The Eraser trait is responsible for erasing an aggregate instance from history.
///
/// `Event` is the event payload type; `Error` is the error type returned by `delete`, and must be
/// constructible from both `sqlx::Error` and `serde_json::Error`.
pub trait Eraser<
    Event: Serialize + DeserializeOwned + Send + Sync,
    Error: From<sqlx::Error> + From<serde_json::Error> + Send + Sync,
>
{
    /// `delete` should either completely remove the aggregate instance identified by `aggregate_id`,
    /// along with all its associated events, or fail.
    /// If the deletion succeeds only partially, it _must_ return an error.
    async fn delete(&self, aggregate_id: Uuid) -> Result<(), Error>;
}

/// The AggregateManager is responsible for loading an aggregate from the store, mapping commands to events, and
/// persisting those events in the store.  Be careful when implementing this trait, as you will be responsible for
/// threading AggregateState/Commands/Events correctly.  For example, a bad implementation could result in an
/// AggregateState that is not replicated on load.
///
/// Unless you need to perform side effects as part of your command handling/verification you should implement the
/// safer `Aggregate` trait instead.
#[async_trait]
pub trait AggregateManager: Identifier {
    /// Inner state of the aggregate, wrapped in an `AggregateState` and rebuilt by folding events onto it.
    type State: Default + Clone + Debug + Send + Sync;
    /// Command accepted by `validate_command` / `handle_command`.
    type Command: Send + Sync;
    /// Event payload persisted in, and read back from, the event store.
    type Event: Serialize + DeserializeOwned + Send + Sync;
    /// Error returned by command validation, command handling and persistence.
    type Error: Send + Sync;

    /// Returns the event store, configured for the aggregate
    fn event_store(&self) -> &(dyn EventStore<Self::Event, Self::Error> + Send + Sync);

    /// This function applies the event onto the aggregate and returns a new one, updated with the event data
    fn apply_event(id: &Uuid, state: Self::State, event: &StoreEvent<Self::Event>) -> Self::State;

    /// Validation should reject any command that is inconsistent with the current aggregate state, or would result
    /// in one or more events that could not be applied onto the aggregate state.
    fn validate_command(aggregate_state: &AggregateState<Self::State>, cmd: &Self::Command) -> Result<(), Self::Error>;

    /// Handles a command that has already been validated and returns the resulting aggregate state.
    ///
    /// The default `handle_command` only calls this after `validate_command` has succeeded.
    async fn do_handle_command(
        &self,
        aggregate_state: AggregateState<Self::State>,
        cmd: Self::Command,
    ) -> Result<AggregateState<Self::State>, Self::Error>;

    /// Validates `cmd` against `aggregate_state` and, if validation succeeds, delegates to `do_handle_command`.
    ///
    /// A validation error is returned as-is and `do_handle_command` is not invoked.
    async fn handle_command(
        &self,
        aggregate_state: AggregateState<Self::State>,
        cmd: Self::Command,
    ) -> Result<AggregateState<Self::State>, Self::Error> {
        Self::validate_command(&aggregate_state, &cmd)?;
        self.do_handle_command(aggregate_state, cmd).await
    }

    /// Responsible for applying events in order onto the aggregate state, and updating the sequence number.
    /// You should avoid implementing this method, and be _very_ careful if you decide to do so.
    ///
    /// `events` will be passed in order of ascending sequence number.  The sequence number of the returned state
    /// is the one of the last event in `events`; if `events` is empty the current sequence number is preserved.
    fn apply_events(
        aggregate_state: AggregateState<Self::State>,
        events: Vec<StoreEvent<Self::Event>>,
    ) -> AggregateState<Self::State> {
        let aggregate_id: &Uuid = &aggregate_state.id;
        let inner: Self::State = events.iter().fold(
            aggregate_state.inner,
            |acc: Self::State, event: &StoreEvent<Self::Event>| Self::apply_event(aggregate_id, acc, event),
        );

        // With no events to apply, keep the current sequence number: falling back to 0 here would rewind an
        // aggregate that already has history, and the next `persist` would start again from sequence number 1.
        let sequence_number: SequenceNumber = events
            .last()
            .map_or(aggregate_state.sequence_number, |e| e.sequence_number());

        AggregateState {
            inner,
            sequence_number,
            ..aggregate_state
        }
    }

    /// Loads an aggregate instance from the event store, by applying previously persisted events onto
    /// the aggregate state by order of their sequence number.
    ///
    /// Returns `None` when the store holds no events for `aggregate_id`.  Note that an error returned by the
    /// store is discarded and also reported as `None`.
    async fn load(&self, aggregate_id: Uuid) -> Option<AggregateState<Self::State>> {
        let events: Vec<StoreEvent<Self::Event>> = self
            .event_store()
            .by_aggregate_id(aggregate_id)
            .await
            .ok()?
            .into_iter()
            .collect();

        if events.is_empty() {
            None
        } else {
            Some(Self::apply_events(AggregateState::new(aggregate_id), events))
        }
    }

    /// Persists events into the event store - recording them in the aggregate instance's history.
    ///
    /// The store is asked to persist `events` starting at the sequence number following the current one; the
    /// events returned by the store are then applied onto `aggregate_state`, and the updated state is returned.
    async fn persist(
        &self,
        aggregate_state: AggregateState<Self::State>,
        events: Vec<Self::Event>,
    ) -> Result<AggregateState<Self::State>, Self::Error> {
        let next_sequence_number: SequenceNumber = aggregate_state.sequence_number + 1;
        let events = self
            .event_store()
            .persist(aggregate_state.id, events, next_sequence_number)
            .await?;

        Ok(Self::apply_events(aggregate_state, events))
    }
}

/// The Aggregate trait is responsible for validating commands, mapping commands to events, and applying
/// events onto the aggregate state.
///
/// An Aggregate should be able to derive its own state from nothing but its initial configuration, and its
/// event stream.  Applying the same events, in the same order, to the same aggregate, should always yield an
/// identical aggregate state.
///
/// This trait is purposefully _synchronous_.  If you are implementing this trait, your aggregate
/// should not have any side effects.  If you need additional information to handle commands correctly, then
/// consider either looking up that information and placing it in the command, or if that is not an option
/// you can implement the more powerful _asynchronous_ `AggregateManager` trait - it will then be your
/// responsibility to uphold the Aggregate _invariants_.
pub trait Aggregate {
    /// Inner state of the aggregate, updated by `apply_event`.
    type State: Default + Clone + Debug + Send + Sync;
    /// Command validated by `validate_command` and turned into events by `handle_command`.
    type Command: Send + Sync;
    /// Event emitted by `handle_command` and applied by `apply_event`.
    type Event: Serialize + DeserializeOwned + Send + Sync;
    /// Error returned when command validation fails; also the error type of the event store.
    type Error: Send + Sync;

    /// Event store configured for aggregate - required for the default implementation of AggregateManager
    fn event_store(&self) -> &(dyn EventStore<Self::Event, Self::Error> + Send + Sync);

    /// Updates the aggregate state using the new event.
    fn apply_event(state: Self::State, event: &Self::Event) -> Self::State;

    /// Validates a command against the current aggregate state.  The aggregate must be able to handle the command
    /// if validation succeeds.
    fn validate_command(aggregate_state: &AggregateState<Self::State>, cmd: &Self::Command) -> Result<(), Self::Error>;

    /// Handles a validated command, and emits events.
    ///
    /// This method cannot fail: it returns the events directly rather than a `Result`, so every rejection
    /// has to happen in `validate_command`.
    fn handle_command(&self, aggregate_state: &AggregateState<Self::State>, cmd: Self::Command) -> Vec<Self::Event>;
}

/// Blanket implementation: every `Aggregate` that is also `Sync` and an `Identifier` gets an
/// `AggregateManager` for free.  Each method forwards to the corresponding `Aggregate` method, and
/// command handling persists the emitted events through `AggregateManager::persist`.
#[async_trait]
impl<T: Aggregate + Sync + Identifier> AggregateManager for T {
    type State = T::State;
    type Command = T::Command;
    type Event = T::Event;
    type Error = T::Error;

    /// Forwards to the event store configured on the underlying `Aggregate`.
    fn event_store(&self) -> &(dyn EventStore<Self::Event, Self::Error> + Send + Sync) {
        <T as Aggregate>::event_store(self)
    }

    /// Applies the payload of the stored event through `Aggregate::apply_event`; the aggregate id is unused.
    fn apply_event(_id: &Uuid, state: Self::State, event: &StoreEvent<Self::Event>) -> Self::State {
        let payload = event.payload();
        <T as Aggregate>::apply_event(state, payload)
    }

    /// Forwards validation to `Aggregate::validate_command`.
    fn validate_command(aggregate_state: &AggregateState<Self::State>, cmd: &Self::Command) -> Result<(), Self::Error> {
        <T as Aggregate>::validate_command(aggregate_state, cmd)
    }

    /// Lets the aggregate map the command to events, then persists those events and returns the updated state.
    async fn do_handle_command(
        &self,
        aggregate_state: AggregateState<Self::State>,
        cmd: Self::Command,
    ) -> Result<AggregateState<T::State>, T::Error> {
        let emitted = <T as Aggregate>::handle_command(self, &aggregate_state, cmd);
        <T as AggregateManager>::persist(self, aggregate_state, emitted).await
    }

    /// Validates the command first; only a successfully validated command reaches `do_handle_command`.
    async fn handle_command(
        &self,
        aggregate_state: AggregateState<Self::State>,
        cmd: Self::Command,
    ) -> Result<AggregateState<Self::State>, Self::Error> {
        <T as Aggregate>::validate_command(&aggregate_state, &cmd)?;
        <T as AggregateManager>::do_handle_command(self, aggregate_state, cmd).await
    }
}