use crate::aggregate::{
Aggregate, AggregateType, Generation, InitializeAggregate, VersionedAggregate, WithAggregateId,
};
use crate::command::{DomainCommand, HandleCommand};
use crate::event::{wrap_events, DomainEvent, EventType, Sequence};
use crate::store::{EventSink, EventSource};
use serde::{Deserialize, Serialize};
use std::fmt::{Debug, Display};
pub trait DispatchEvent<E, A>
where
E: EventType,
A: WithAggregateId,
{
fn dispatch(&self, event: DomainEvent<E, A>);
}
pub trait DispatchCommand<C, A> {
type Context;
type Output;
type Error: std::error::Error;
fn dispatch_command(
&self,
command: C,
context: &Self::Context,
) -> Result<Self::Output, Self::Error>;
}
#[derive(thiserror::Error, Debug, PartialEq, Serialize, Deserialize)]
pub enum CoreError<R, W, H>
where
R: Debug + Display,
W: Debug + Display,
H: Debug + Display,
{
#[error("failed to replay aggregate from event store: {0}")]
ReplayAggregateFailed(R),
#[error("failed to append events to the event store: {0}")]
AppendEventsFailed(W),
#[error("failed to read events from the event store: {0}")]
ReadEventsFailed(R),
#[error("handling of command failed: {0}")]
HandleCommandFailed(H),
#[error("actual aggregate version ({actual}) does not match the assumed version ({assumed})")]
GenerationConflict {
assumed: Generation,
actual: Generation,
},
}
pub type CoreDispatchError<S, C, A> = CoreError<
<S as EventSource<
<VersionedAggregate<A> as HandleCommand<C, VersionedAggregate<A>>>::Event,
VersionedAggregate<A>,
>>::Error,
<S as EventSink<
<VersionedAggregate<A> as HandleCommand<C, VersionedAggregate<A>>>::Event,
VersionedAggregate<A>,
>>::Error,
<A as HandleCommand<C, A>>::Error,
>;
#[derive(Debug)]
pub struct Core<S> {
event_store: S,
}
impl<S> Core<S> {
pub fn new(event_store: S) -> Self {
Self { event_store }
}
}
impl<C, A, S> DispatchCommand<DomainCommand<C, A>, A> for Core<S>
where
A: Aggregate<<A as HandleCommand<C, A>>::Event>
+ AggregateType
+ WithAggregateId
+ HandleCommand<C, A>
+ InitializeAggregate<State = A>,
<A as HandleCommand<C, A>>::Event: 'static + EventType,
S: EventSource<
<VersionedAggregate<A> as HandleCommand<C, VersionedAggregate<A>>>::Event,
VersionedAggregate<A>,
> + EventSink<
<VersionedAggregate<A> as HandleCommand<C, VersionedAggregate<A>>>::Event,
VersionedAggregate<A>,
>,
{
type Context = <A as HandleCommand<C, A>>::Context;
type Output = VersionedAggregate<A>;
type Error = CoreDispatchError<S, C, A>;
fn dispatch_command(
&self,
DomainCommand {
aggregate_id,
aggregate_generation,
data,
}: DomainCommand<C, A>,
context: &Self::Context,
) -> Result<Self::Output, Self::Error> {
let mut aggregate = VersionedAggregate::initialize(aggregate_id.clone());
self.event_store
.read(&aggregate_id, &mut aggregate)
.map_err(CoreError::ReplayAggregateFailed)?;
if aggregate_generation != aggregate.generation() {
return Err(CoreError::GenerationConflict {
assumed: aggregate_generation,
actual: aggregate.generation(),
});
}
let mut current_sequence = Sequence::from(aggregate.generation());
let offset = current_sequence;
let new_events = aggregate
.handle_command(data, context)
.map_err(CoreError::HandleCommandFailed)?;
let domain_events = wrap_events(&mut current_sequence, new_events);
self.event_store
.append_batch(domain_events)
.map_err(CoreError::AppendEventsFailed)?;
self.event_store
.read_from_offset(&aggregate_id, offset, &mut aggregate)
.map_err(CoreError::ReadEventsFailed)?;
Ok(aggregate)
}
}
#[cfg(test)]
mod tests;