postgres_es2/aggregates/
postgres_snapshot_store.rs

1use std::{
2    collections::HashMap,
3    marker::PhantomData,
4};
5
6use postgres::Client;
7use serde_json::Value;
8
9use cqrs_es2::{
10    Aggregate,
11    AggregateError,
12    EventEnvelope,
13    EventStore,
14};
15
16use super::postgres_snapshot_store_aggregate_context::PostgresSnapshotStoreAggregateContext;
17
/// Inserts one event row (`aggregate_type`, `aggregate_id`,
/// `sequence`, `payload`, `metadata`) into the `events` table.
static INSERT_EVENT: &str = "INSERT INTO events (aggregate_type, aggregate_id, sequence, \
                             payload, metadata)\n    \
                             VALUES ($1, $2, $3, $4, $5)";

/// Selects every event row of one aggregate instance, ordered by
/// `sequence`.
static SELECT_EVENTS: &str = "SELECT aggregate_type, aggregate_id, sequence, payload, \
                              metadata\n     \
                              FROM events\n     \
                              WHERE aggregate_type = $1 AND aggregate_id = $2 \
                              ORDER BY sequence";

/// Inserts a snapshot row for an aggregate instance.
static INSERT_SNAPSHOT: &str = "INSERT INTO snapshots (aggregate_type, aggregate_id, \
                                last_sequence, payload)\n    \
                                VALUES ($1, $2, $3, $4)";

/// Overwrites `last_sequence` and `payload` of the snapshot row that
/// matches the aggregate type and id.
static UPDATE_SNAPSHOT: &str = "UPDATE snapshots\n    \
                                SET last_sequence= $3 , payload= $4\n    \
                                WHERE aggregate_type= $1 AND aggregate_id= $2";

/// Selects the snapshot row that matches the aggregate type and id.
static SELECT_SNAPSHOT: &str = "SELECT aggregate_type, aggregate_id, last_sequence, \
                                payload\n     \
                                FROM snapshots\n     \
                                WHERE aggregate_type = $1 AND aggregate_id = $2";
39
40/// Storage engine using an Postgres backing and relying on a
41/// serialization of the aggregate rather than individual events. This
42/// is similar to the "snapshot strategy" seen in many CQRS
43/// frameworks.
44pub struct PostgresSnapshotStore<A: Aggregate> {
45    conn: Client,
46    _phantom: PhantomData<A>,
47}
48
49impl<A: Aggregate> PostgresSnapshotStore<A> {
50    /// Creates a new `PostgresSnapshotStore` from the provided
51    /// database connection.
52    pub fn new(conn: Client) -> Self {
53        PostgresSnapshotStore {
54            conn,
55            _phantom: PhantomData,
56        }
57    }
58}
59
60impl<A: Aggregate>
61    EventStore<A, PostgresSnapshotStoreAggregateContext<A>>
62    for PostgresSnapshotStore<A>
63{
64    fn load(
65        &mut self,
66        aggregate_id: &str,
67    ) -> Vec<EventEnvelope<A>> {
68        let agg_type = A::aggregate_type();
69        let id = aggregate_id.to_string();
70        let mut result = Vec::new();
71        match self
72            .conn
73            .query(SELECT_EVENTS, &[&agg_type, &id])
74        {
75            Ok(rows) => {
76                for row in rows.iter() {
77                    let aggregate_type: String =
78                        row.get("aggregate_type");
79                    let aggregate_id: String =
80                        row.get("aggregate_id");
81                    let s: i64 = row.get("sequence");
82                    let sequence = s as usize;
83                    let payload: A::Event =
84                        match serde_json::from_value(
85                            row.get("payload"),
86                        ) {
87                            Ok(payload) => payload,
88                            Err(err) => {
89                                panic!(
90                                    "bad payload found in events \
91                                     table for aggregate id {} with \
92                                     error: {}",
93                                    &id, err
94                                );
95                            },
96                        };
97                    let event = EventEnvelope::new(
98                        aggregate_id,
99                        sequence,
100                        aggregate_type,
101                        payload,
102                    );
103                    result.push(event);
104                }
105            },
106            Err(e) => {
107                panic!("{:?}", e);
108            },
109        }
110        result
111    }
112    fn load_aggregate(
113        &mut self,
114        aggregate_id: &str,
115    ) -> PostgresSnapshotStoreAggregateContext<A> {
116        let agg_type = A::aggregate_type();
117        match self.conn.query(
118            SELECT_SNAPSHOT,
119            &[&agg_type, &aggregate_id.to_string()],
120        ) {
121            Ok(rows) => {
122                match rows.iter().next() {
123                    None => {
124                        let current_sequence = 0;
125                        PostgresSnapshotStoreAggregateContext::new(
126                            aggregate_id.to_string(),
127                            A::default(),
128                            current_sequence,
129                        )
130                    },
131                    Some(row) => {
132                        let s: i64 = row.get("last_sequence");
133                        let val: Value = row.get("payload");
134                        let aggregate =
135                            serde_json::from_value(val).unwrap();
136                        PostgresSnapshotStoreAggregateContext::new(
137                            aggregate_id.to_string(),
138                            aggregate,
139                            s as usize,
140                        )
141                    },
142                }
143            },
144            Err(e) => {
145                panic!("{:?}", e);
146            },
147        }
148    }
149
150    fn commit(
151        &mut self,
152        events: Vec<A::Event>,
153        context: PostgresSnapshotStoreAggregateContext<A>,
154        metadata: HashMap<String, String>,
155    ) -> Result<Vec<EventEnvelope<A>>, AggregateError> {
156        let mut updated_aggregate = context.aggregate_copy();
157        let aggregate_id = context.aggregate_id.as_str();
158        let current_sequence = context.current_sequence;
159        let wrapped_events = self.wrap_events(
160            aggregate_id,
161            current_sequence,
162            events,
163            metadata,
164        );
165        let mut trans = match self.conn.transaction() {
166            Ok(t) => t,
167            Err(err) => {
168                return Err(AggregateError::TechnicalError(
169                    err.to_string(),
170                ));
171            },
172        };
173        let mut last_sequence = current_sequence as i64;
174        for event in wrapped_events.clone() {
175            let agg_type = event.aggregate_type.clone();
176            let id = context.aggregate_id.clone();
177            let sequence = event.sequence as i64;
178            last_sequence = sequence;
179            let payload = match serde_json::to_value(&event.payload) {
180                Ok(payload) => payload,
181                Err(err) => {
182                    panic!(
183                        "bad payload found in events table for \
184                         aggregate id {} with error: {}",
185                        &id, err
186                    );
187                },
188            };
189            let metadata = match serde_json::to_value(&event.metadata)
190            {
191                Ok(metadata) => metadata,
192                Err(err) => {
193                    panic!(
194                        "bad metadata found in events table for \
195                         aggregate id {} with error: {}",
196                        &id, err
197                    );
198                },
199            };
200            match trans.execute(
201                INSERT_EVENT,
202                &[
203                    &agg_type, &id, &sequence, &payload, &metadata,
204                ],
205            ) {
206                Ok(_) => {},
207                Err(err) => {
208                    match err.code() {
209                        None => {},
210                        Some(state) => {
211                            if state.code() == "23505" {
212                                return Err(
213                                    AggregateError::TechnicalError(
214                                        "optimistic lock error"
215                                            .to_string(),
216                                    ),
217                                );
218                            }
219                        },
220                    }
221                    panic!(
222                        "unable to insert event table for aggregate \
223                         id {} with error: {}\n  and payload: {}",
224                        &id, err, &payload
225                    );
226                },
227            };
228            updated_aggregate.apply(&event.payload);
229        }
230
231        let agg_type = A::aggregate_type();
232        let aggregate_payload =
233            match serde_json::to_value(updated_aggregate) {
234                Ok(val) => val,
235                Err(err) => {
236                    panic!(
237                        "bad metadata found in events table for \
238                         aggregate id {} with error: {}",
239                        &aggregate_id, err
240                    );
241                },
242            };
243        if context.current_sequence == 0 {
244            match trans.execute(
245                INSERT_SNAPSHOT,
246                &[
247                    &agg_type,
248                    &aggregate_id,
249                    &last_sequence,
250                    &aggregate_payload,
251                ],
252            ) {
253                Ok(_) => {},
254                Err(err) => {
255                    panic!(
256                        "unable to insert snapshot for aggregate id \
257                         {} with error: {}\n  and payload: {}",
258                        &aggregate_id, err, &aggregate_payload
259                    );
260                },
261            };
262        }
263        else {
264            match trans.execute(
265                UPDATE_SNAPSHOT,
266                &[
267                    &agg_type,
268                    &aggregate_id,
269                    &last_sequence,
270                    &aggregate_payload,
271                ],
272            ) {
273                Ok(_) => {},
274                Err(err) => {
275                    panic!(
276                        "unable to update snapshot for aggregate id \
277                         {} with error: {}\n  and payload: {}",
278                        &aggregate_id, err, &aggregate_payload
279                    );
280                },
281            };
282        }
283
284        match trans.commit() {
285            Ok(_) => Ok(wrapped_events),
286            Err(err) => {
287                Err(AggregateError::TechnicalError(
288                    err.to_string(),
289                ))
290            },
291        }
292    }
293}