1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
//! A Decision serves as a building block for developing the business logic of an application.
use serde::de::DeserializeOwned;
use serde::Serialize;
use crate::event::EventId;
use crate::state_store::LoadedState;
use crate::stream_query::StreamQuery;
use crate::{event::Event, PersistedEvent};
use crate::{BoxDynError, IntoState, IntoStatePart, LoadState, MultiState};
/// Represents a business decision taken from a state built upon the occurred events.
pub trait Decision: Send + Sync {
type Event: Event + Clone + Send + Sync;
type StateQuery: Clone + Send + Sync;
type Error: Send + Sync;
/// Returns the state query to compute the decision state from the events in the event store.
///
/// If there are no events that match the specified query, the default values of the state query is utilized to make the decision.
fn state_query(&self) -> Self::StateQuery;
/// Returns the stream query used to validate the decision.
///
/// If the validation query is `None`, the state query will be used for validation.
/// This means that the decision will only be confirmed if new events that would make the state query outdated are not found.
/// However, if a `validation_query` is provided, it will be used to confirm the decision.
/// This allows narrowing down the set of events that could invalidate the decision.
///
/// For example, in a banking system, deposit events should not invalidate withdrawals if the account balance is already sufficient.
/// In this case, the state query needs to include both withdraw and deposit events to compute the available balance,
/// but only withdraw events should invalidate the decision.
/// In other words, once we have confirmed that the account has a sufficient amount,
/// only a withdraw event can reduce the balance below the requested amount and invalidate the decision.
fn validation_query<ID: EventId>(&self) -> Option<StreamQuery<ID, Self::Event>> {
None
}
/// Evaluates the decision based on the mutated state, ensuring that all business rules
/// are verified against the current state. This method generates a series of events
/// that capture the changes made by the decision, allowing the results to be
/// persisted in the event store.
///
/// # Parameters
///
/// - `state`: A reference to the current state of the system, obtained through
/// the implementation of the `StateQuery` trait.
///
/// # Returns
///
/// A `Result` indicating the success of the process. If successful, it contains
/// a vector of events representing the changes made. In case of an error, it
/// contains details about the encountered issue.
fn process(&self, state: &Self::StateQuery) -> Result<Vec<Self::Event>, Self::Error>;
}
#[derive(thiserror::Error, Debug)]
pub enum Error<DE> {
#[error("event store error: {0}")]
EventStore(#[source] BoxDynError),
#[error("state store error: {0}")]
StateStore(#[source] BoxDynError),
#[error("domain error: {0}")]
Domain(#[source] DE),
}
/// The `DecisionMaker` struct is responsible for executing and persisting business decisions.
#[derive(Clone)]
pub struct DecisionMaker<SS> {
state_store: SS,
}
impl<SS> DecisionMaker<SS> {
/// Creates a new instance of `DecisionMaker`.
///
/// # Parameters
///
/// - `state_store`: The state store backend used by the `DecisionMaker` to load the current state
/// and persist the decision.
pub fn new(state_store: SS) -> Self {
Self { state_store }
}
/// Makes the given business decision, persisting the resulting events in the event store.
///
/// # Parameters
///
/// - `decision`: The business decision to be executed, implementing the `Decision` trait.
///
/// # Returns
///
/// A `Result` indicating the success of the decision-making process. If successful,
/// it contains a vector of `PersistedEvent` representing the changes made. In case of
/// an error, it contains details about the encountered issue.
pub async fn make<D, S, ID, E>(
&self,
decision: D,
) -> Result<Vec<PersistedEvent<ID, E>>, Error<D::Error>>
where
ID: EventId,
E: Event + Clone + Sync + Send + 'static,
SS: LoadState<ID, S, E> + PersistDecision<ID, S, E>,
D: Decision<StateQuery = S, Event = E>,
S: Send + Sync + Serialize + DeserializeOwned + IntoStatePart<ID, S>,
<S as IntoStatePart<ID, S>>::Target:
Send + Sync + Serialize + DeserializeOwned + IntoState<S> + MultiState<ID, E>,
<D as Decision>::Error: 'static,
{
let loaded_state = self
.state_store
.load(decision.state_query())
.await
.map_err(Error::StateStore)?;
let changes = decision
.process(&loaded_state.state)
.map_err(Error::Domain)?;
let events = self
.state_store
.persist(
loaded_state,
changes.into_iter().collect(),
decision.validation_query(),
)
.await
.map_err(Error::StateStore)?;
Ok(events)
}
}
/// Persists decision changes to the event store.
#[async_trait::async_trait]
pub trait PersistDecision<ID: EventId, S, E: Event + Clone> {
/// Persists the decision changes to the event store.
///
/// # Parameters
///
/// - `loaded_state`: The current state loaded from the event store, used to check if the events to be persisted have been produced from a non-stale state.
/// - `events`: A vector of events representing the changes to be stored.
/// - `validation_query`: An optional stream query used to validate the state before persisting changes.
///
/// # Returns
///
/// A `Result` containing a vector of `PersistedEvent` if the operation is successful, or an error if the persist operation fails.
async fn persist(
&self,
loaded_state: LoadedState<ID, S>,
events: Vec<E>,
validation_query: Option<StreamQuery<ID, E>>,
) -> Result<Vec<PersistedEvent<ID, E>>, BoxDynError>;
}
#[cfg(test)]
mod test {
use mockall::predicate::eq;
use super::*;
use crate::{utils::tests::*, EventSourcedStateStore, NoSnapshot, StateQuery};
#[tokio::test]
async fn it_processes_a_decision() {
let mut database = MockDatabase::new();
database.expect_stream().once().return_once(|_| {
event_stream([item_added_event("p1", "c1"), item_removed_event("p1", "c1")])
});
let state_query = cart("c1", []).query().change_origin(0);
database
.expect_append()
.with(
eq(vec![item_added_event("p2", "c1")]),
eq(state_query),
eq(2),
)
.once()
.return_once(|_, _, _| vec![PersistedEvent::new(3, item_added_event("p2", "c1"))]);
let mut mock_add_item = MockDecision::new();
mock_add_item
.expect_state_query()
.once()
.return_once(|| cart("c1", []));
mock_add_item
.expect_validation_query()
.once()
.return_once(|| Option::<StreamQuery<i64, ShoppingCartEvent>>::None);
mock_add_item
.expect_process()
.once()
.return_once(|_| Ok(vec![item_added_event("p2", "c1")]));
let event_store = MockEventStore::new(database);
let state_store = EventSourcedStateStore::new(event_store, NoSnapshot);
let decision_maker = DecisionMaker::new(state_store);
decision_maker.make(mock_add_item).await.unwrap();
}
}