cqrs_es2_sql/stores/
event_store.rs

1use std::{
2    collections::HashMap,
3    marker::PhantomData,
4};
5
6use postgres::Client;
7
8use cqrs_es2::{
9    AggregateContext,
10    AggregateError,
11    EventEnvelope,
12    IAggregate,
13    IEventStore,
14};
15
16use crate::sql::{
17    INSERT_EVENT,
18    SELECT_EVENTS,
19};
20
21/// Storage engine using an Postgres backing. This is the only
22/// persistent store currently provided.
23pub struct EventStore<A: IAggregate> {
24    conn: Client,
25    _phantom: PhantomData<A>,
26}
27
28impl<A: IAggregate> EventStore<A> {
29    /// Creates a new `EventStore` from the provided database
30    /// connection.
31    pub fn new(conn: Client) -> Self {
32        EventStore {
33            conn,
34            _phantom: PhantomData,
35        }
36    }
37}
38
39impl<A: IAggregate> IEventStore<A> for EventStore<A> {
40    fn load(
41        &mut self,
42        aggregate_id: &str,
43    ) -> Vec<EventEnvelope<A>> {
44        let agg_type = A::aggregate_type();
45        let id = aggregate_id.to_string();
46
47        let mut result = Vec::new();
48
49        match self
50            .conn
51            .query(SELECT_EVENTS, &[&agg_type, &id])
52        {
53            Ok(rows) => {
54                for row in rows.iter() {
55                    let aggregate_id: String =
56                        row.get("aggregate_id");
57                    let s: i64 = row.get("sequence");
58                    let sequence = s as usize;
59
60                    let payload: A::Event =
61                        match serde_json::from_value(
62                            row.get("payload"),
63                        ) {
64                            Ok(payload) => payload,
65                            Err(err) => {
66                                panic!(
67                                    "bad payload found in events \
68                                     table for aggregate id {} with \
69                                     error: {}",
70                                    &id, err
71                                );
72                            },
73                        };
74
75                    let event = EventEnvelope::new(
76                        aggregate_id,
77                        sequence,
78                        payload,
79                    );
80
81                    result.push(event);
82                }
83            },
84            Err(e) => {
85                println!("{:?}", e);
86            },
87        }
88
89        result
90    }
91
92    fn load_aggregate(
93        &mut self,
94        aggregate_id: &str,
95    ) -> AggregateContext<A> {
96        let committed_events = self.load(aggregate_id);
97
98        let mut aggregate = A::default();
99        let mut current_sequence = 0;
100
101        for envelope in committed_events {
102            current_sequence = envelope.sequence;
103            let event = envelope.payload;
104            aggregate.apply(&event);
105        }
106
107        AggregateContext {
108            aggregate_id: aggregate_id.to_string(),
109            aggregate,
110            current_sequence,
111        }
112    }
113
114    fn commit(
115        &mut self,
116        events: Vec<A::Event>,
117        context: AggregateContext<A>,
118        metadata: HashMap<String, String>,
119    ) -> Result<Vec<EventEnvelope<A>>, AggregateError> {
120        let agg_type = A::aggregate_type().to_string();
121        let aggregate_id = context.aggregate_id.as_str();
122        let current_sequence = context.current_sequence;
123
124        let wrapped_events = self.wrap_events(
125            aggregate_id,
126            current_sequence,
127            events,
128            metadata,
129        );
130
131        let mut trans = match self.conn.transaction() {
132            Ok(t) => t,
133            Err(err) => {
134                return Err(AggregateError::TechnicalError(
135                    err.to_string(),
136                ));
137            },
138        };
139
140        for event in &wrapped_events {
141            let id = context.aggregate_id.clone();
142            let sequence = event.sequence as i64;
143
144            let payload = match serde_json::to_value(&event.payload) {
145                Ok(payload) => payload,
146                Err(err) => {
147                    panic!(
148                        "bad payload found in events table for \
149                         aggregate id {} with error: {}",
150                        &id, err
151                    );
152                },
153            };
154
155            let metadata = match serde_json::to_value(&event.metadata)
156            {
157                Ok(metadata) => metadata,
158                Err(err) => {
159                    panic!(
160                        "bad metadata found in events table for \
161                         aggregate id {} with error: {}",
162                        &id, err
163                    );
164                },
165            };
166
167            match trans.execute(
168                INSERT_EVENT,
169                &[
170                    &agg_type, &id, &sequence, &payload, &metadata,
171                ],
172            ) {
173                Ok(_) => {},
174                Err(err) => {
175                    match err.code() {
176                        None => {},
177                        Some(state) => {
178                            if state.code() == "23505" {
179                                return Err(
180                                    AggregateError::TechnicalError(
181                                        "optimistic lock error"
182                                            .to_string(),
183                                    ),
184                                );
185                            }
186                        },
187                    }
188                    panic!(
189                        "unable to insert event table for aggregate \
190                         id {} with error: {}\n  and payload: {}",
191                        &id, err, &payload
192                    );
193                },
194            };
195        }
196
197        match trans.commit() {
198            Ok(_) => Ok(wrapped_events),
199            Err(err) => {
200                Err(AggregateError::TechnicalError(
201                    err.to_string(),
202                ))
203            },
204        }
205    }
206}