1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
use async_trait::async_trait;
use std::collections::HashMap;
use std::marker::PhantomData;
use crate::event_repository::EventRepository;
use cqrs_es::{Aggregate, AggregateContext, AggregateError, EventEnvelope, EventStore};
use sqlx::{Pool, Postgres};
pub struct PostgresStore<A: Aggregate + Send + Sync> {
repo: EventRepository<A>,
_phantom: PhantomData<A>,
}
impl<A: Aggregate> PostgresStore<A> {
pub fn new(pool: Pool<Postgres>) -> Self {
let repo = EventRepository::new(pool);
PostgresStore {
repo,
_phantom: PhantomData,
}
}
}
#[async_trait]
impl<A: Aggregate> EventStore<A> for PostgresStore<A> {
type AC = PostgresStoreAggregateContext<A>;
async fn load(&self, aggregate_id: &str) -> Vec<EventEnvelope<A>> {
match self.repo.get_events(aggregate_id).await {
Ok(val) => val,
Err(_err) => {
Default::default()
}
}
}
async fn load_aggregate(&self, aggregate_id: &str) -> PostgresStoreAggregateContext<A> {
let committed_events = self.load(aggregate_id).await;
let mut aggregate = A::default();
let mut current_sequence = 0;
for envelope in committed_events {
current_sequence = envelope.sequence;
let event = envelope.payload;
aggregate.apply(event);
}
PostgresStoreAggregateContext {
aggregate_id: aggregate_id.to_string(),
aggregate,
current_sequence,
}
}
async fn commit(
&self,
events: Vec<A::Event>,
context: PostgresStoreAggregateContext<A>,
metadata: HashMap<String, String>,
) -> Result<Vec<EventEnvelope<A>>, AggregateError> {
let aggregate_id = context.aggregate_id.as_str();
let current_sequence = context.current_sequence;
let wrapped_events = self.wrap_events(aggregate_id, current_sequence, events, metadata);
self.repo.insert_events(wrapped_events.clone()).await?;
Ok(wrapped_events)
}
}
pub struct PostgresStoreAggregateContext<A: Aggregate> {
aggregate_id: String,
aggregate: A,
current_sequence: usize,
}
impl<A: Aggregate> AggregateContext<A> for PostgresStoreAggregateContext<A> {
fn aggregate(&self) -> &A {
&self.aggregate
}
}