1use std::{
2 collections::HashMap,
3 marker::PhantomData,
4};
5
6use postgres::Client;
7use serde_json::Value;
8
9use cqrs_es2::{
10 Aggregate,
11 AggregateError,
12 EventEnvelope,
13 EventStore,
14};
15
16use super::postgres_snapshot_store_aggregate_context::PostgresSnapshotStoreAggregateContext;
17
18static INSERT_EVENT: &str = "INSERT INTO events (aggregate_type, \
19 aggregate_id, sequence, payload, \
20 metadata)
21 VALUES ($1, $2, $3, $4, $5)";
22static SELECT_EVENTS: &str = "SELECT aggregate_type, aggregate_id, \
23 sequence, payload, metadata
24 FROM events
25 WHERE aggregate_type = $1 AND aggregate_id = $2 ORDER BY \
26 sequence";
27
28static INSERT_SNAPSHOT: &str = "INSERT INTO snapshots \
29 (aggregate_type, aggregate_id, \
30 last_sequence, payload)
31 VALUES ($1, $2, $3, $4)";
32static UPDATE_SNAPSHOT: &str = "UPDATE snapshots
33 SET last_sequence= $3 , payload= $4
34 WHERE aggregate_type= $1 AND aggregate_id= $2";
35static SELECT_SNAPSHOT: &str = "SELECT aggregate_type, \
36 aggregate_id, last_sequence, payload
37 FROM snapshots
38 WHERE aggregate_type = $1 AND aggregate_id = $2";
39
40pub struct PostgresSnapshotStore<A: Aggregate> {
45 conn: Client,
46 _phantom: PhantomData<A>,
47}
48
49impl<A: Aggregate> PostgresSnapshotStore<A> {
50 pub fn new(conn: Client) -> Self {
53 PostgresSnapshotStore {
54 conn,
55 _phantom: PhantomData,
56 }
57 }
58}
59
60impl<A: Aggregate>
61 EventStore<A, PostgresSnapshotStoreAggregateContext<A>>
62 for PostgresSnapshotStore<A>
63{
64 fn load(
65 &mut self,
66 aggregate_id: &str,
67 ) -> Vec<EventEnvelope<A>> {
68 let agg_type = A::aggregate_type();
69 let id = aggregate_id.to_string();
70 let mut result = Vec::new();
71 match self
72 .conn
73 .query(SELECT_EVENTS, &[&agg_type, &id])
74 {
75 Ok(rows) => {
76 for row in rows.iter() {
77 let aggregate_type: String =
78 row.get("aggregate_type");
79 let aggregate_id: String =
80 row.get("aggregate_id");
81 let s: i64 = row.get("sequence");
82 let sequence = s as usize;
83 let payload: A::Event =
84 match serde_json::from_value(
85 row.get("payload"),
86 ) {
87 Ok(payload) => payload,
88 Err(err) => {
89 panic!(
90 "bad payload found in events \
91 table for aggregate id {} with \
92 error: {}",
93 &id, err
94 );
95 },
96 };
97 let event = EventEnvelope::new(
98 aggregate_id,
99 sequence,
100 aggregate_type,
101 payload,
102 );
103 result.push(event);
104 }
105 },
106 Err(e) => {
107 panic!("{:?}", e);
108 },
109 }
110 result
111 }
112 fn load_aggregate(
113 &mut self,
114 aggregate_id: &str,
115 ) -> PostgresSnapshotStoreAggregateContext<A> {
116 let agg_type = A::aggregate_type();
117 match self.conn.query(
118 SELECT_SNAPSHOT,
119 &[&agg_type, &aggregate_id.to_string()],
120 ) {
121 Ok(rows) => {
122 match rows.iter().next() {
123 None => {
124 let current_sequence = 0;
125 PostgresSnapshotStoreAggregateContext::new(
126 aggregate_id.to_string(),
127 A::default(),
128 current_sequence,
129 )
130 },
131 Some(row) => {
132 let s: i64 = row.get("last_sequence");
133 let val: Value = row.get("payload");
134 let aggregate =
135 serde_json::from_value(val).unwrap();
136 PostgresSnapshotStoreAggregateContext::new(
137 aggregate_id.to_string(),
138 aggregate,
139 s as usize,
140 )
141 },
142 }
143 },
144 Err(e) => {
145 panic!("{:?}", e);
146 },
147 }
148 }
149
150 fn commit(
151 &mut self,
152 events: Vec<A::Event>,
153 context: PostgresSnapshotStoreAggregateContext<A>,
154 metadata: HashMap<String, String>,
155 ) -> Result<Vec<EventEnvelope<A>>, AggregateError> {
156 let mut updated_aggregate = context.aggregate_copy();
157 let aggregate_id = context.aggregate_id.as_str();
158 let current_sequence = context.current_sequence;
159 let wrapped_events = self.wrap_events(
160 aggregate_id,
161 current_sequence,
162 events,
163 metadata,
164 );
165 let mut trans = match self.conn.transaction() {
166 Ok(t) => t,
167 Err(err) => {
168 return Err(AggregateError::TechnicalError(
169 err.to_string(),
170 ));
171 },
172 };
173 let mut last_sequence = current_sequence as i64;
174 for event in wrapped_events.clone() {
175 let agg_type = event.aggregate_type.clone();
176 let id = context.aggregate_id.clone();
177 let sequence = event.sequence as i64;
178 last_sequence = sequence;
179 let payload = match serde_json::to_value(&event.payload) {
180 Ok(payload) => payload,
181 Err(err) => {
182 panic!(
183 "bad payload found in events table for \
184 aggregate id {} with error: {}",
185 &id, err
186 );
187 },
188 };
189 let metadata = match serde_json::to_value(&event.metadata)
190 {
191 Ok(metadata) => metadata,
192 Err(err) => {
193 panic!(
194 "bad metadata found in events table for \
195 aggregate id {} with error: {}",
196 &id, err
197 );
198 },
199 };
200 match trans.execute(
201 INSERT_EVENT,
202 &[
203 &agg_type, &id, &sequence, &payload, &metadata,
204 ],
205 ) {
206 Ok(_) => {},
207 Err(err) => {
208 match err.code() {
209 None => {},
210 Some(state) => {
211 if state.code() == "23505" {
212 return Err(
213 AggregateError::TechnicalError(
214 "optimistic lock error"
215 .to_string(),
216 ),
217 );
218 }
219 },
220 }
221 panic!(
222 "unable to insert event table for aggregate \
223 id {} with error: {}\n and payload: {}",
224 &id, err, &payload
225 );
226 },
227 };
228 updated_aggregate.apply(&event.payload);
229 }
230
231 let agg_type = A::aggregate_type();
232 let aggregate_payload =
233 match serde_json::to_value(updated_aggregate) {
234 Ok(val) => val,
235 Err(err) => {
236 panic!(
237 "bad metadata found in events table for \
238 aggregate id {} with error: {}",
239 &aggregate_id, err
240 );
241 },
242 };
243 if context.current_sequence == 0 {
244 match trans.execute(
245 INSERT_SNAPSHOT,
246 &[
247 &agg_type,
248 &aggregate_id,
249 &last_sequence,
250 &aggregate_payload,
251 ],
252 ) {
253 Ok(_) => {},
254 Err(err) => {
255 panic!(
256 "unable to insert snapshot for aggregate id \
257 {} with error: {}\n and payload: {}",
258 &aggregate_id, err, &aggregate_payload
259 );
260 },
261 };
262 }
263 else {
264 match trans.execute(
265 UPDATE_SNAPSHOT,
266 &[
267 &agg_type,
268 &aggregate_id,
269 &last_sequence,
270 &aggregate_payload,
271 ],
272 ) {
273 Ok(_) => {},
274 Err(err) => {
275 panic!(
276 "unable to update snapshot for aggregate id \
277 {} with error: {}\n and payload: {}",
278 &aggregate_id, err, &aggregate_payload
279 );
280 },
281 };
282 }
283
284 match trans.commit() {
285 Ok(_) => Ok(wrapped_events),
286 Err(err) => {
287 Err(AggregateError::TechnicalError(
288 err.to_string(),
289 ))
290 },
291 }
292 }
293}