1use std::{
2 collections::HashMap,
3 marker::PhantomData,
4};
5
6use postgres::Client;
7use serde_json::Value;
8
9use cqrs_es2::{
10 AggregateContext,
11 AggregateError,
12 EventEnvelope,
13 IAggregate,
14 IEventStore,
15};
16
17use crate::sql::{
18 INSERT_EVENT,
19 INSERT_SNAPSHOT,
20 SELECT_EVENTS,
21 SELECT_SNAPSHOT,
22 UPDATE_SNAPSHOT,
23};
24
25pub struct SnapshotEventStore<A: IAggregate> {
30 conn: Client,
31 _phantom: PhantomData<A>,
32}
33
34impl<A: IAggregate> SnapshotEventStore<A> {
35 pub fn new(conn: Client) -> Self {
38 SnapshotEventStore {
39 conn,
40 _phantom: PhantomData,
41 }
42 }
43}
44
45impl<A: IAggregate> IEventStore<A> for SnapshotEventStore<A> {
46 fn load(
47 &mut self,
48 aggregate_id: &str,
49 ) -> Vec<EventEnvelope<A>> {
50 let agg_type = A::aggregate_type();
51 let id = aggregate_id.to_string();
52
53 let mut result = Vec::new();
54
55 match self
56 .conn
57 .query(SELECT_EVENTS, &[&agg_type, &id])
58 {
59 Ok(rows) => {
60 for row in rows.iter() {
61 let aggregate_id: String =
62 row.get("aggregate_id");
63 let s: i64 = row.get("sequence");
64 let sequence = s as usize;
65
66 let payload: A::Event =
67 match serde_json::from_value(
68 row.get("payload"),
69 ) {
70 Ok(payload) => payload,
71 Err(err) => {
72 panic!(
73 "bad payload found in events \
74 table for aggregate id {} with \
75 error: {}",
76 &id, err
77 );
78 },
79 };
80
81 let event = EventEnvelope::new(
82 aggregate_id,
83 sequence,
84 payload,
85 );
86
87 result.push(event);
88 }
89 },
90 Err(e) => {
91 panic!("{:?}", e);
92 },
93 }
94
95 result
96 }
97
98 fn load_aggregate(
99 &mut self,
100 aggregate_id: &str,
101 ) -> AggregateContext<A> {
102 let agg_type = A::aggregate_type();
103
104 match self.conn.query(
105 SELECT_SNAPSHOT,
106 &[&agg_type, &aggregate_id.to_string()],
107 ) {
108 Ok(rows) => {
109 match rows.iter().next() {
110 None => {
111 AggregateContext {
112 aggregate_id: aggregate_id.to_string(),
113 aggregate: A::default(),
114 current_sequence: 0,
115 }
116 },
117 Some(row) => {
118 let s: i64 = row.get("last_sequence");
119 let val: Value = row.get("payload");
120 let aggregate =
121 serde_json::from_value(val).unwrap();
122 AggregateContext {
123 aggregate_id: aggregate_id.to_string(),
124 aggregate: aggregate,
125 current_sequence: s as usize,
126 }
127 },
128 }
129 },
130 Err(e) => {
131 panic!("{:?}", e);
132 },
133 }
134 }
135
136 fn commit(
137 &mut self,
138 events: Vec<A::Event>,
139 context: AggregateContext<A>,
140 metadata: HashMap<String, String>,
141 ) -> Result<Vec<EventEnvelope<A>>, AggregateError> {
142 let mut updated_aggregate = context.aggregate.clone();
143
144 let agg_type = A::aggregate_type().to_string();
145 let aggregate_id = context.aggregate_id.as_str();
146 let current_sequence = context.current_sequence;
147
148 let wrapped_events = self.wrap_events(
149 aggregate_id,
150 current_sequence,
151 events,
152 metadata,
153 );
154
155 let mut trans = match self.conn.transaction() {
156 Ok(t) => t,
157 Err(err) => {
158 return Err(AggregateError::TechnicalError(
159 err.to_string(),
160 ));
161 },
162 };
163
164 let mut last_sequence = current_sequence as i64;
165
166 for event in wrapped_events.clone() {
167 let id = context.aggregate_id.clone();
168 let sequence = event.sequence as i64;
169 last_sequence = sequence;
170
171 let payload = match serde_json::to_value(&event.payload) {
172 Ok(payload) => payload,
173 Err(err) => {
174 panic!(
175 "bad payload found in events table for \
176 aggregate id {} with error: {}",
177 &id, err
178 );
179 },
180 };
181
182 let metadata = match serde_json::to_value(&event.metadata)
183 {
184 Ok(metadata) => metadata,
185 Err(err) => {
186 panic!(
187 "bad metadata found in events table for \
188 aggregate id {} with error: {}",
189 &id, err
190 );
191 },
192 };
193
194 match trans.execute(
195 INSERT_EVENT,
196 &[
197 &agg_type, &id, &sequence, &payload, &metadata,
198 ],
199 ) {
200 Ok(_) => {},
201 Err(err) => {
202 match err.code() {
203 None => {},
204 Some(state) => {
205 if state.code() == "23505" {
206 return Err(
207 AggregateError::TechnicalError(
208 "optimistic lock error"
209 .to_string(),
210 ),
211 );
212 }
213 },
214 }
215 panic!(
216 "unable to insert event table for aggregate \
217 id {} with error: {}\n and payload: {}",
218 &id, err, &payload
219 );
220 },
221 };
222
223 updated_aggregate.apply(&event.payload);
224 }
225
226 let aggregate_payload =
227 match serde_json::to_value(updated_aggregate) {
228 Ok(val) => val,
229 Err(err) => {
230 panic!(
231 "bad metadata found in events table for \
232 aggregate id {} with error: {}",
233 &aggregate_id, err
234 );
235 },
236 };
237
238 if context.current_sequence == 0 {
239 match trans.execute(
240 INSERT_SNAPSHOT,
241 &[
242 &agg_type,
243 &aggregate_id,
244 &last_sequence,
245 &aggregate_payload,
246 ],
247 ) {
248 Ok(_) => {},
249 Err(err) => {
250 panic!(
251 "unable to insert snapshot for aggregate id \
252 {} with error: {}\n and payload: {}",
253 &aggregate_id, err, &aggregate_payload
254 );
255 },
256 };
257 }
258 else {
259 match trans.execute(
260 UPDATE_SNAPSHOT,
261 &[
262 &agg_type,
263 &aggregate_id,
264 &last_sequence,
265 &aggregate_payload,
266 ],
267 ) {
268 Ok(_) => {},
269 Err(err) => {
270 panic!(
271 "unable to update snapshot for aggregate id \
272 {} with error: {}\n and payload: {}",
273 &aggregate_id, err, &aggregate_payload
274 );
275 },
276 };
277 }
278
279 match trans.commit() {
280 Ok(_) => Ok(wrapped_events),
281 Err(err) => {
282 Err(AggregateError::TechnicalError(
283 err.to_string(),
284 ))
285 },
286 }
287 }
288}