postgres_es2/stores/
postgres_store.rs

1use std::{
2    collections::HashMap,
3    marker::PhantomData,
4};
5
6use postgres::Client;
7
8use cqrs_es2::{
9    Aggregate,
10    AggregateError,
11    EventEnvelope,
12    EventStore,
13};
14
15use super::postgres_store_aggregate_context::PostgresStoreAggregateContext;
16
/// SQL statement that appends one row to the `events` table.
///
/// Positional parameters `$1..$5` bind, in order: `aggregate_type`,
/// `aggregate_id`, `sequence`, `payload`, `metadata`.
static INSERT_EVENT: &str = concat!(
    "INSERT INTO events (aggregate_type, aggregate_id, sequence, payload, metadata)",
    "\n    VALUES ($1, $2, $3, $4, $5)",
);
/// SQL query that reads the rows of the `events` table for one
/// aggregate, ordered by `sequence`.
///
/// Positional parameters: `$1` is the aggregate type, `$2` is the
/// aggregate id.
static SELECT_EVENTS: &str = concat!(
    "SELECT aggregate_type, aggregate_id, sequence, payload, metadata",
    "\n     FROM events",
    "\n     WHERE aggregate_type = $1 AND aggregate_id = $2 ORDER BY sequence",
);
26
27/// Storage engine using an Postgres backing. This is the only
28/// persistent store currently provided.
29pub struct PostgresStore<A: Aggregate> {
30    conn: Client,
31    _phantom: PhantomData<A>,
32}
33
34impl<A: Aggregate> PostgresStore<A> {
35    /// Creates a new `PostgresStore` from the provided database
36    /// connection.
37    pub fn new(conn: Client) -> Self {
38        PostgresStore {
39            conn,
40            _phantom: PhantomData,
41        }
42    }
43}
44
45impl<A: Aggregate> EventStore<A, PostgresStoreAggregateContext<A>>
46    for PostgresStore<A>
47{
48    fn load(
49        &mut self,
50        aggregate_id: &str,
51    ) -> Vec<EventEnvelope<A>> {
52        let agg_type = A::aggregate_type();
53        let id = aggregate_id.to_string();
54        let mut result = Vec::new();
55        match self
56            .conn
57            .query(SELECT_EVENTS, &[&agg_type, &id])
58        {
59            Ok(rows) => {
60                for row in rows.iter() {
61                    let aggregate_type: String =
62                        row.get("aggregate_type");
63                    let aggregate_id: String =
64                        row.get("aggregate_id");
65                    let s: i64 = row.get("sequence");
66                    let sequence = s as usize;
67                    let payload: A::Event =
68                        match serde_json::from_value(
69                            row.get("payload"),
70                        ) {
71                            Ok(payload) => payload,
72                            Err(err) => {
73                                panic!(
74                                    "bad payload found in events \
75                                     table for aggregate id {} with \
76                                     error: {}",
77                                    &id, err
78                                );
79                            },
80                        };
81                    let event = EventEnvelope::new(
82                        aggregate_id,
83                        sequence,
84                        aggregate_type,
85                        payload,
86                    );
87                    result.push(event);
88                }
89            },
90            Err(e) => {
91                println!("{:?}", e);
92            },
93        }
94        result
95    }
96    fn load_aggregate(
97        &mut self,
98        aggregate_id: &str,
99    ) -> PostgresStoreAggregateContext<A> {
100        let committed_events = self.load(aggregate_id);
101        let mut aggregate = A::default();
102        let mut current_sequence = 0;
103        for envelope in committed_events {
104            current_sequence = envelope.sequence;
105            let event = envelope.payload;
106            aggregate.apply(&event);
107        }
108        PostgresStoreAggregateContext {
109            aggregate_id: aggregate_id.to_string(),
110            aggregate,
111            current_sequence,
112        }
113    }
114
115    fn commit(
116        &mut self,
117        events: Vec<A::Event>,
118        context: PostgresStoreAggregateContext<A>,
119        metadata: HashMap<String, String>,
120    ) -> Result<Vec<EventEnvelope<A>>, AggregateError> {
121        let aggregate_id = context.aggregate_id.as_str();
122        let current_sequence = context.current_sequence;
123        let wrapped_events = self.wrap_events(
124            aggregate_id,
125            current_sequence,
126            events,
127            metadata,
128        );
129        let mut trans = match self.conn.transaction() {
130            Ok(t) => t,
131            Err(err) => {
132                return Err(AggregateError::TechnicalError(
133                    err.to_string(),
134                ));
135            },
136        };
137        for event in &wrapped_events {
138            let agg_type = event.aggregate_type.clone();
139            let id = context.aggregate_id.clone();
140            let sequence = event.sequence as i64;
141            let payload = match serde_json::to_value(&event.payload) {
142                Ok(payload) => payload,
143                Err(err) => {
144                    panic!(
145                        "bad payload found in events table for \
146                         aggregate id {} with error: {}",
147                        &id, err
148                    );
149                },
150            };
151            let metadata = match serde_json::to_value(&event.metadata)
152            {
153                Ok(metadata) => metadata,
154                Err(err) => {
155                    panic!(
156                        "bad metadata found in events table for \
157                         aggregate id {} with error: {}",
158                        &id, err
159                    );
160                },
161            };
162            match trans.execute(
163                INSERT_EVENT,
164                &[
165                    &agg_type, &id, &sequence, &payload, &metadata,
166                ],
167            ) {
168                Ok(_) => {},
169                Err(err) => {
170                    match err.code() {
171                        None => {},
172                        Some(state) => {
173                            if state.code() == "23505" {
174                                return Err(
175                                    AggregateError::TechnicalError(
176                                        "optimistic lock error"
177                                            .to_string(),
178                                    ),
179                                );
180                            }
181                        },
182                    }
183                    panic!(
184                        "unable to insert event table for aggregate \
185                         id {} with error: {}\n  and payload: {}",
186                        &id, err, &payload
187                    );
188                },
189            };
190        }
191        match trans.commit() {
192            Ok(_) => Ok(wrapped_events),
193            Err(err) => {
194                Err(AggregateError::TechnicalError(
195                    err.to_string(),
196                ))
197            },
198        }
199    }
200}