cqrs_es2_sql/stores/
snapshot_event_store.rs

1use std::{
2    collections::HashMap,
3    marker::PhantomData,
4};
5
6use postgres::Client;
7use serde_json::Value;
8
9use cqrs_es2::{
10    AggregateContext,
11    AggregateError,
12    EventEnvelope,
13    IAggregate,
14    IEventStore,
15};
16
17use crate::sql::{
18    INSERT_EVENT,
19    INSERT_SNAPSHOT,
20    SELECT_EVENTS,
21    SELECT_SNAPSHOT,
22    UPDATE_SNAPSHOT,
23};
24
25/// Storage engine using an Postgres backing and relying on a
26/// serialization of the aggregate rather than individual events. This
27/// is similar to the "snapshot strategy" seen in many CQRS
28/// frameworks.
29pub struct SnapshotEventStore<A: IAggregate> {
30    conn: Client,
31    _phantom: PhantomData<A>,
32}
33
34impl<A: IAggregate> SnapshotEventStore<A> {
35    /// Creates a new `SnapshotEventStore` from the provided
36    /// database connection.
37    pub fn new(conn: Client) -> Self {
38        SnapshotEventStore {
39            conn,
40            _phantom: PhantomData,
41        }
42    }
43}
44
45impl<A: IAggregate> IEventStore<A> for SnapshotEventStore<A> {
46    fn load(
47        &mut self,
48        aggregate_id: &str,
49    ) -> Vec<EventEnvelope<A>> {
50        let agg_type = A::aggregate_type();
51        let id = aggregate_id.to_string();
52
53        let mut result = Vec::new();
54
55        match self
56            .conn
57            .query(SELECT_EVENTS, &[&agg_type, &id])
58        {
59            Ok(rows) => {
60                for row in rows.iter() {
61                    let aggregate_id: String =
62                        row.get("aggregate_id");
63                    let s: i64 = row.get("sequence");
64                    let sequence = s as usize;
65
66                    let payload: A::Event =
67                        match serde_json::from_value(
68                            row.get("payload"),
69                        ) {
70                            Ok(payload) => payload,
71                            Err(err) => {
72                                panic!(
73                                    "bad payload found in events \
74                                     table for aggregate id {} with \
75                                     error: {}",
76                                    &id, err
77                                );
78                            },
79                        };
80
81                    let event = EventEnvelope::new(
82                        aggregate_id,
83                        sequence,
84                        payload,
85                    );
86
87                    result.push(event);
88                }
89            },
90            Err(e) => {
91                panic!("{:?}", e);
92            },
93        }
94
95        result
96    }
97
98    fn load_aggregate(
99        &mut self,
100        aggregate_id: &str,
101    ) -> AggregateContext<A> {
102        let agg_type = A::aggregate_type();
103
104        match self.conn.query(
105            SELECT_SNAPSHOT,
106            &[&agg_type, &aggregate_id.to_string()],
107        ) {
108            Ok(rows) => {
109                match rows.iter().next() {
110                    None => {
111                        AggregateContext {
112                            aggregate_id: aggregate_id.to_string(),
113                            aggregate: A::default(),
114                            current_sequence: 0,
115                        }
116                    },
117                    Some(row) => {
118                        let s: i64 = row.get("last_sequence");
119                        let val: Value = row.get("payload");
120                        let aggregate =
121                            serde_json::from_value(val).unwrap();
122                        AggregateContext {
123                            aggregate_id: aggregate_id.to_string(),
124                            aggregate: aggregate,
125                            current_sequence: s as usize,
126                        }
127                    },
128                }
129            },
130            Err(e) => {
131                panic!("{:?}", e);
132            },
133        }
134    }
135
136    fn commit(
137        &mut self,
138        events: Vec<A::Event>,
139        context: AggregateContext<A>,
140        metadata: HashMap<String, String>,
141    ) -> Result<Vec<EventEnvelope<A>>, AggregateError> {
142        let mut updated_aggregate = context.aggregate.clone();
143
144        let agg_type = A::aggregate_type().to_string();
145        let aggregate_id = context.aggregate_id.as_str();
146        let current_sequence = context.current_sequence;
147
148        let wrapped_events = self.wrap_events(
149            aggregate_id,
150            current_sequence,
151            events,
152            metadata,
153        );
154
155        let mut trans = match self.conn.transaction() {
156            Ok(t) => t,
157            Err(err) => {
158                return Err(AggregateError::TechnicalError(
159                    err.to_string(),
160                ));
161            },
162        };
163
164        let mut last_sequence = current_sequence as i64;
165
166        for event in wrapped_events.clone() {
167            let id = context.aggregate_id.clone();
168            let sequence = event.sequence as i64;
169            last_sequence = sequence;
170
171            let payload = match serde_json::to_value(&event.payload) {
172                Ok(payload) => payload,
173                Err(err) => {
174                    panic!(
175                        "bad payload found in events table for \
176                         aggregate id {} with error: {}",
177                        &id, err
178                    );
179                },
180            };
181
182            let metadata = match serde_json::to_value(&event.metadata)
183            {
184                Ok(metadata) => metadata,
185                Err(err) => {
186                    panic!(
187                        "bad metadata found in events table for \
188                         aggregate id {} with error: {}",
189                        &id, err
190                    );
191                },
192            };
193
194            match trans.execute(
195                INSERT_EVENT,
196                &[
197                    &agg_type, &id, &sequence, &payload, &metadata,
198                ],
199            ) {
200                Ok(_) => {},
201                Err(err) => {
202                    match err.code() {
203                        None => {},
204                        Some(state) => {
205                            if state.code() == "23505" {
206                                return Err(
207                                    AggregateError::TechnicalError(
208                                        "optimistic lock error"
209                                            .to_string(),
210                                    ),
211                                );
212                            }
213                        },
214                    }
215                    panic!(
216                        "unable to insert event table for aggregate \
217                         id {} with error: {}\n  and payload: {}",
218                        &id, err, &payload
219                    );
220                },
221            };
222
223            updated_aggregate.apply(&event.payload);
224        }
225
226        let aggregate_payload =
227            match serde_json::to_value(updated_aggregate) {
228                Ok(val) => val,
229                Err(err) => {
230                    panic!(
231                        "bad metadata found in events table for \
232                         aggregate id {} with error: {}",
233                        &aggregate_id, err
234                    );
235                },
236            };
237
238        if context.current_sequence == 0 {
239            match trans.execute(
240                INSERT_SNAPSHOT,
241                &[
242                    &agg_type,
243                    &aggregate_id,
244                    &last_sequence,
245                    &aggregate_payload,
246                ],
247            ) {
248                Ok(_) => {},
249                Err(err) => {
250                    panic!(
251                        "unable to insert snapshot for aggregate id \
252                         {} with error: {}\n  and payload: {}",
253                        &aggregate_id, err, &aggregate_payload
254                    );
255                },
256            };
257        }
258        else {
259            match trans.execute(
260                UPDATE_SNAPSHOT,
261                &[
262                    &agg_type,
263                    &aggregate_id,
264                    &last_sequence,
265                    &aggregate_payload,
266                ],
267            ) {
268                Ok(_) => {},
269                Err(err) => {
270                    panic!(
271                        "unable to update snapshot for aggregate id \
272                         {} with error: {}\n  and payload: {}",
273                        &aggregate_id, err, &aggregate_payload
274                    );
275                },
276            };
277        }
278
279        match trans.commit() {
280            Ok(_) => Ok(wrapped_events),
281            Err(err) => {
282                Err(AggregateError::TechnicalError(
283                    err.to_string(),
284                ))
285            },
286        }
287    }
288}