1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
use crate::aggregate::{
    Aggregate, AggregateType, Generation, InitializeAggregate, VersionedAggregate, WithAggregateId,
};
use crate::command::{DomainCommand, HandleCommand};
use crate::event::{wrap_events, DomainEvent, EventType, Sequence};
use crate::store::{EventSink, EventSource};
use serde::{Deserialize, Serialize};
use std::fmt::{Debug, Display};

pub trait DispatchEvent<E, A>
where
    E: EventType,
    A: WithAggregateId,
{
    fn dispatch(&self, event: DomainEvent<E, A>);
}

pub trait DispatchCommand<C, A> {
    type Context;
    type Output;
    type Error: std::error::Error;

    fn dispatch_command(
        &self,
        command: C,
        context: &Self::Context,
    ) -> Result<Self::Output, Self::Error>;
}

#[derive(thiserror::Error, Debug, PartialEq, Serialize, Deserialize)]
pub enum CoreError<R, W, H>
where
    R: Debug + Display,
    W: Debug + Display,
    H: Debug + Display,
{
    #[error("failed to replay aggregate from event store: {0}")]
    ReplayAggregateFailed(R),
    #[error("failed to append events to the event store: {0}")]
    AppendEventsFailed(W),
    #[error("failed to read events from the event store: {0}")]
    ReadEventsFailed(R),
    #[error("handling of command failed: {0}")]
    HandleCommandFailed(H),
    #[error("actual aggregate version ({actual}) does not match the assumed version ({assumed})")]
    GenerationConflict {
        assumed: Generation,
        actual: Generation,
    },
}

pub type CoreDispatchError<S, C, A> = CoreError<
    <S as EventSource<
        <VersionedAggregate<A> as HandleCommand<C, VersionedAggregate<A>>>::Event,
        VersionedAggregate<A>,
    >>::Error,
    <S as EventSink<
        <VersionedAggregate<A> as HandleCommand<C, VersionedAggregate<A>>>::Event,
        VersionedAggregate<A>,
    >>::Error,
    <A as HandleCommand<C, A>>::Error,
>;

#[derive(Debug)]
pub struct Core<S> {
    event_store: S,
}

impl<S> Core<S> {
    pub fn new(event_store: S) -> Self {
        Self { event_store }
    }
}

impl<C, A, S> DispatchCommand<DomainCommand<C, A>, A> for Core<S>
where
    A: Aggregate<<A as HandleCommand<C, A>>::Event>
        + AggregateType
        + WithAggregateId
        + HandleCommand<C, A>
        + InitializeAggregate<State = A>,
    <A as HandleCommand<C, A>>::Event: 'static + EventType,
    S: EventSource<
            <VersionedAggregate<A> as HandleCommand<C, VersionedAggregate<A>>>::Event,
            VersionedAggregate<A>,
        > + EventSink<
            <VersionedAggregate<A> as HandleCommand<C, VersionedAggregate<A>>>::Event,
            VersionedAggregate<A>,
        >,
{
    type Context = <A as HandleCommand<C, A>>::Context;
    type Output = VersionedAggregate<A>;
    type Error = CoreDispatchError<S, C, A>;

    fn dispatch_command(
        &self,
        DomainCommand {
            aggregate_id,
            aggregate_generation,
            data,
        }: DomainCommand<C, A>,
        context: &Self::Context,
    ) -> Result<Self::Output, Self::Error> {
        // Replay aggregate
        let mut aggregate = VersionedAggregate::initialize(aggregate_id.clone());
        self.event_store
            .read(&aggregate_id, &mut aggregate)
            .map_err(CoreError::ReplayAggregateFailed)?;

        // Check for generation conflict, which means whether the command was
        // issued based on the current state of the aggregate
        if aggregate_generation != aggregate.generation() {
            return Err(CoreError::GenerationConflict {
                assumed: aggregate_generation,
                actual: aggregate.generation(),
            });
        }

        let mut current_sequence = Sequence::from(aggregate.generation());
        let offset = current_sequence;

        // Handle the command
        let new_events = aggregate
            .handle_command(data, context)
            .map_err(CoreError::HandleCommandFailed)?;
        let domain_events = wrap_events(&mut current_sequence, new_events);

        // Save new events in the event store
        self.event_store
            .append_batch(domain_events)
            .map_err(CoreError::AppendEventsFailed)?;

        // Apply new domain events
        self.event_store
            .read_from_offset(&aggregate_id, offset, &mut aggregate)
            .map_err(CoreError::ReadEventsFailed)?;

        Ok(aggregate)
    }
}

#[cfg(test)]
mod tests;