1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
use crate::aggregate::{
Aggregate, AggregateType, Generation, InitializeAggregate, VersionedAggregate, WithAggregateId,
};
use crate::command::{DomainCommand, HandleCommand};
use crate::event::{wrap_events, DomainEvent, EventType, Sequence};
use crate::store::{EventSink, EventSource};
use serde::{Deserialize, Serialize};
use std::fmt::{Debug, Display};
/// Implemented by types that can be handed a [`DomainEvent`] for dispatching.
///
/// `E` is the event payload type (it must implement [`EventType`]) and `A` is
/// the aggregate type carried by the event (it must implement
/// [`WithAggregateId`]).
pub trait DispatchEvent<E: EventType, A: WithAggregateId> {
    /// Passes one `event` to the implementor, which takes ownership of it.
    ///
    /// The method returns nothing, so a failure cannot be reported to the
    /// caller through the return value.
    fn dispatch(&self, event: DomainEvent<E, A>);
}
/// Implemented by types that can execute a command of type `C`.
///
/// The second type parameter `A` is not used by any item of the trait itself;
/// the `Core` implementation in this file sets it to the aggregate type the
/// command is addressed to.
pub trait DispatchCommand<C, A> {
    /// Extra data handed to every dispatch call by shared reference.
    type Context;

    /// Value produced by a successful dispatch.
    type Output;

    /// Error produced by a failed dispatch.
    type Error: std::error::Error;

    /// Executes `command`, giving the implementor read access to `context`.
    ///
    /// # Errors
    ///
    /// Returns `Self::Error` when the implementor cannot carry out the
    /// command.
    fn dispatch_command(
        &self,
        command: C,
        context: &Self::Context,
    ) -> Result<Self::Output, Self::Error>;
}
/// Errors reported by `Core` when dispatching a command.
///
/// The three type parameters are the payload types of the wrapped errors:
/// `R` for failures while reading from the event store, `W` for failures
/// while appending to it, and `H` for failures raised by the command handler.
#[derive(thiserror::Error, Debug, PartialEq, Serialize, Deserialize)]
pub enum CoreError<R: Debug + Display, W: Debug + Display, H: Debug + Display> {
    /// Reading the existing events to rebuild the aggregate failed.
    #[error("failed to replay aggregate from event store: {0}")]
    ReplayAggregateFailed(R),

    /// Appending the newly produced events to the event store failed.
    #[error("failed to append events to the event store: {0}")]
    AppendEventsFailed(W),

    /// Reading events back from the event store failed.
    #[error("failed to read events from the event store: {0}")]
    ReadEventsFailed(R),

    /// The command handler rejected the command.
    #[error("handling of command failed: {0}")]
    HandleCommandFailed(H),

    /// The generation the command assumed differs from the generation of the
    /// aggregate as replayed from the store.
    #[error("actual aggregate version ({actual}) does not match the assumed version ({assumed})")]
    GenerationConflict {
        /// Generation stated by the command.
        assumed: Generation,
        /// Generation of the replayed aggregate.
        actual: Generation,
    },
}
/// Concrete [`CoreError`] for an event store `S`, a command payload `C` and an
/// aggregate `A`.
///
/// The three `CoreError` parameters are filled in, in order, with the store's
/// `EventSource` error, the store's `EventSink` error, and the error of the
/// aggregate's own `HandleCommand` implementation.
pub type CoreDispatchError<S, C, A> = CoreError<
// Read-side error of the store (`R`).
<S as EventSource<
<VersionedAggregate<A> as HandleCommand<C, VersionedAggregate<A>>>::Event,
VersionedAggregate<A>,
>>::Error,
// Write-side error of the store (`W`).
<S as EventSink<
<VersionedAggregate<A> as HandleCommand<C, VersionedAggregate<A>>>::Event,
VersionedAggregate<A>,
>>::Error,
// Command-handling error (`H`), taken from `A` itself rather than from
// `VersionedAggregate<A>`.
<A as HandleCommand<C, A>>::Error,
>;
/// Command dispatcher that owns an event store of type `S`.
///
/// No bounds are placed on `S` here; the trait implementations for `Core`
/// state the store capabilities they need.
#[derive(Debug)]
pub struct Core<S> {
    /// The event store that dispatching reads from and appends to.
    event_store: S,
}

impl<S> Core<S> {
    /// Creates a `Core` that takes ownership of `event_store`.
    pub fn new(event_store: S) -> Core<S> {
        Core { event_store }
    }
}
/// Command dispatching for a [`Core`] whose store can both read and append
/// the events of `VersionedAggregate<A>`.
impl<C, A, S> DispatchCommand<DomainCommand<C, A>, A> for Core<S>
where
    A: Aggregate<<A as HandleCommand<C, A>>::Event>
        + AggregateType
        + WithAggregateId
        + HandleCommand<C, A>
        + InitializeAggregate<State = A>,
    <A as HandleCommand<C, A>>::Event: 'static + EventType,
    S: EventSource<
            <VersionedAggregate<A> as HandleCommand<C, VersionedAggregate<A>>>::Event,
            VersionedAggregate<A>,
        > + EventSink<
            <VersionedAggregate<A> as HandleCommand<C, VersionedAggregate<A>>>::Event,
            VersionedAggregate<A>,
        >,
{
    type Context = <A as HandleCommand<C, A>>::Context;
    type Output = VersionedAggregate<A>;
    type Error = CoreDispatchError<S, C, A>;

    /// Runs `command` against the aggregate it addresses and returns the
    /// aggregate afterwards.
    ///
    /// The steps are, in order:
    /// 1. initialize an aggregate for the command's id and read it from the
    ///    event store,
    /// 2. compare the generation assumed by the command with the generation
    ///    of that aggregate,
    /// 3. let the aggregate handle the command payload,
    /// 4. wrap the resulting events and append them to the store as a batch,
    /// 5. read from the store again, starting at the sequence recorded before
    ///    the new events were wrapped.
    ///
    /// # Errors
    ///
    /// * `CoreError::ReplayAggregateFailed` if the first read fails.
    /// * `CoreError::GenerationConflict` if the generations differ.
    /// * `CoreError::HandleCommandFailed` if the aggregate rejects the command.
    /// * `CoreError::AppendEventsFailed` if the batch cannot be appended.
    /// * `CoreError::ReadEventsFailed` if the second read fails.
    fn dispatch_command(
        &self,
        command: DomainCommand<C, A>,
        context: &Self::Context,
    ) -> Result<Self::Output, Self::Error> {
        let DomainCommand {
            aggregate_id,
            aggregate_generation,
            data,
        } = command;

        // Start from a freshly initialized aggregate and let the store fill it in.
        let mut aggregate = VersionedAggregate::initialize(aggregate_id.clone());
        self.event_store
            .read(&aggregate_id, &mut aggregate)
            .map_err(CoreError::ReplayAggregateFailed)?;

        // Refuse to continue when the caller's assumed generation is not the
        // one just read from the store.
        let actual = aggregate.generation();
        if aggregate_generation != actual {
            return Err(CoreError::GenerationConflict {
                assumed: aggregate_generation,
                actual,
            });
        }

        // `offset` keeps the sequence as it was before wrapping; `next_sequence`
        // is handed to `wrap_events` by mutable reference.
        let offset = Sequence::from(aggregate.generation());
        let mut next_sequence = offset;

        let produced = aggregate
            .handle_command(data, context)
            .map_err(CoreError::HandleCommandFailed)?;
        let batch = wrap_events(&mut next_sequence, produced);

        self.event_store
            .append_batch(batch)
            .map_err(CoreError::AppendEventsFailed)?;

        // NOTE(review): presumably this applies the just-appended events to
        // `aggregate` so the returned value reflects what was persisted;
        // confirm against the `EventSource` trait.
        self.event_store
            .read_from_offset(&aggregate_id, offset, &mut aggregate)
            .map_err(CoreError::ReadEventsFailed)?;

        Ok(aggregate)
    }
}
#[cfg(test)]
mod tests;