// PostgreSQL-backed event store for `cqrs_es` aggregates.
use std::marker::PhantomData;
use cqrs_es::{Aggregate, AggregateError, DomainEvent, EventStore, MessageEnvelope};
use postgres::Connection;
/// Event store that keeps the events of aggregate `A` (with event type `E`)
/// in a PostgreSQL database reached through a single `Connection`.
pub struct PostgresStore<A: Aggregate, E: DomainEvent<A>> {
    /// Connection used for every query issued by this store.
    conn: Connection,
    /// Ties the otherwise unused type parameters `A` and `E` to the struct.
    _phantom: PhantomData<(A, E)>,
}
impl<A: Aggregate, E: DomainEvent<A>> PostgresStore<A, E> {
    /// Creates a store that issues all of its queries over `conn`.
    ///
    /// The store takes ownership of the connection.
    pub fn new(conn: Connection) -> Self {
        Self {
            conn,
            _phantom: PhantomData,
        }
    }
}
// Parameterized statement that inserts one row into the `events` table.
// Placeholders, in order: $1 aggregate_type, $2 aggregate_id, $3 sequence,
// $4 payload, $5 metadata.
static INSERT_EVENT: &str = "INSERT INTO events (aggregate_type, aggregate_id, sequence, payload, metadata)
VALUES ($1, $2, $3, $4, $5)";
// Parameterized query that reads all rows of the `events` table for one
// aggregate, ordered by ascending `sequence`.
// Placeholders, in order: $1 aggregate_type, $2 aggregate_id.
static SELECT_EVENTS: &str = "SELECT aggregate_type, aggregate_id, sequence, payload, metadata
FROM events
WHERE aggregate_type = $1 AND aggregate_id = $2 ORDER BY sequence";
impl<A, E> EventStore<A, E> for PostgresStore<A, E>
where
    A: Aggregate,
    E: DomainEvent<A>,
{
    /// Loads the stored events of the aggregate identified by `aggregate_id`.
    ///
    /// Rows are read with `SELECT_EVENTS`, filtered by `A::aggregate_type()` and
    /// `aggregate_id`, and returned in ascending `sequence` order.
    ///
    /// A failed query is reported on stderr and yields an empty vector, because
    /// the return type offers no way to report an error to the caller.
    ///
    /// # Panics
    ///
    /// Panics if a stored payload cannot be deserialized into `E`.
    fn load(&self, aggregate_id: &str) -> Vec<MessageEnvelope<A, E>> {
        let agg_type = A::aggregate_type();
        let id = aggregate_id.to_string();

        let rows = match self.conn.query(SELECT_EVENTS, &[&agg_type, &id]) {
            Ok(rows) => rows,
            Err(e) => {
                // Diagnostics belong on stderr, not in the program's stdout.
                eprintln!("{:?}", e);
                return Vec::new();
            }
        };

        let mut result = Vec::new();
        for row in rows.iter() {
            let aggregate_type: String = row.get("aggregate_type");
            let aggregate_id: String = row.get("aggregate_id");
            // `commit` writes this column from a `usize`, so the value is
            // expected to be non-negative and the cast lossless.
            let s: i64 = row.get("sequence");
            let sequence = s as usize;
            let payload: E = serde_json::from_value(row.get("payload")).unwrap_or_else(|err| {
                panic!(
                    "bad payload found in events table for aggregate id {} with error: {}",
                    id, err
                )
            });
            // NOTE(review): the `metadata` column is selected but not restored
            // into the envelope here — confirm whether that is intended.
            result.push(MessageEnvelope::new(
                aggregate_id,
                sequence,
                aggregate_type,
                payload,
            ));
        }
        result
    }

    /// Persists `events` inside a single database transaction.
    ///
    /// Each event is inserted with `INSERT_EVENT`; payload and metadata are
    /// stored as JSON values. The transaction is committed only after every
    /// insert succeeded.
    ///
    /// # Errors
    ///
    /// Returns `AggregateError::TechnicalError` when
    /// - the transaction cannot be started or committed,
    /// - a payload or metadata value cannot be serialized to JSON,
    /// - an insert fails; a unique-constraint violation (SQLSTATE `23505`) is
    ///   reported as `"optimistic lock error"`.
    fn commit(&self, events: Vec<MessageEnvelope<A, E>>) -> Result<(), AggregateError> {
        let trans = self
            .conn
            .transaction()
            .map_err(|err| AggregateError::TechnicalError(err.to_string()))?;

        for event in events {
            let sequence = event.sequence as i64;
            let payload = serde_json::to_value(&event.payload).map_err(|err| {
                AggregateError::TechnicalError(format!(
                    "unable to serialize payload for aggregate id {} with error: {}",
                    event.aggregate_id, err
                ))
            })?;
            let metadata = serde_json::to_value(&event.metadata).map_err(|err| {
                AggregateError::TechnicalError(format!(
                    "unable to serialize metadata for aggregate id {} with error: {}",
                    event.aggregate_id, err
                ))
            })?;

            // Run the insert through the transaction handle rather than the raw
            // connection so the statement is explicitly part of this transaction.
            // On any early return below `trans` is dropped without a commit.
            let inserted = trans.execute(
                INSERT_EVENT,
                &[
                    &event.aggregate_type,
                    &event.aggregate_id,
                    &sequence,
                    &payload,
                    &metadata,
                ],
            );
            if let Err(err) = inserted {
                // 23505 is PostgreSQL's `unique_violation`; presumably raised by a
                // uniqueness constraint on the events table when another writer
                // already stored this sequence number.
                let unique_violation = err
                    .code()
                    .map_or(false, |state| state.code() == "23505");
                let message = if unique_violation {
                    "optimistic lock error".to_string()
                } else {
                    format!(
                        "unable to insert event table for aggregate id {} with error: {}\n and payload: {}",
                        event.aggregate_id, err, payload
                    )
                };
                return Err(AggregateError::TechnicalError(message));
            }
        }

        match trans.commit() {
            Ok(_) => Ok(()),
            Err(err) => Err(AggregateError::TechnicalError(err.to_string())),
        }
    }
}