cqrs_es2_sql/stores/
event_store.rs1use std::{
2 collections::HashMap,
3 marker::PhantomData,
4};
5
6use postgres::Client;
7
8use cqrs_es2::{
9 AggregateContext,
10 AggregateError,
11 EventEnvelope,
12 IAggregate,
13 IEventStore,
14};
15
16use crate::sql::{
17 INSERT_EVENT,
18 SELECT_EVENTS,
19};
20
21pub struct EventStore<A: IAggregate> {
24 conn: Client,
25 _phantom: PhantomData<A>,
26}
27
28impl<A: IAggregate> EventStore<A> {
29 pub fn new(conn: Client) -> Self {
32 EventStore {
33 conn,
34 _phantom: PhantomData,
35 }
36 }
37}
38
39impl<A: IAggregate> IEventStore<A> for EventStore<A> {
40 fn load(
41 &mut self,
42 aggregate_id: &str,
43 ) -> Vec<EventEnvelope<A>> {
44 let agg_type = A::aggregate_type();
45 let id = aggregate_id.to_string();
46
47 let mut result = Vec::new();
48
49 match self
50 .conn
51 .query(SELECT_EVENTS, &[&agg_type, &id])
52 {
53 Ok(rows) => {
54 for row in rows.iter() {
55 let aggregate_id: String =
56 row.get("aggregate_id");
57 let s: i64 = row.get("sequence");
58 let sequence = s as usize;
59
60 let payload: A::Event =
61 match serde_json::from_value(
62 row.get("payload"),
63 ) {
64 Ok(payload) => payload,
65 Err(err) => {
66 panic!(
67 "bad payload found in events \
68 table for aggregate id {} with \
69 error: {}",
70 &id, err
71 );
72 },
73 };
74
75 let event = EventEnvelope::new(
76 aggregate_id,
77 sequence,
78 payload,
79 );
80
81 result.push(event);
82 }
83 },
84 Err(e) => {
85 println!("{:?}", e);
86 },
87 }
88
89 result
90 }
91
92 fn load_aggregate(
93 &mut self,
94 aggregate_id: &str,
95 ) -> AggregateContext<A> {
96 let committed_events = self.load(aggregate_id);
97
98 let mut aggregate = A::default();
99 let mut current_sequence = 0;
100
101 for envelope in committed_events {
102 current_sequence = envelope.sequence;
103 let event = envelope.payload;
104 aggregate.apply(&event);
105 }
106
107 AggregateContext {
108 aggregate_id: aggregate_id.to_string(),
109 aggregate,
110 current_sequence,
111 }
112 }
113
114 fn commit(
115 &mut self,
116 events: Vec<A::Event>,
117 context: AggregateContext<A>,
118 metadata: HashMap<String, String>,
119 ) -> Result<Vec<EventEnvelope<A>>, AggregateError> {
120 let agg_type = A::aggregate_type().to_string();
121 let aggregate_id = context.aggregate_id.as_str();
122 let current_sequence = context.current_sequence;
123
124 let wrapped_events = self.wrap_events(
125 aggregate_id,
126 current_sequence,
127 events,
128 metadata,
129 );
130
131 let mut trans = match self.conn.transaction() {
132 Ok(t) => t,
133 Err(err) => {
134 return Err(AggregateError::TechnicalError(
135 err.to_string(),
136 ));
137 },
138 };
139
140 for event in &wrapped_events {
141 let id = context.aggregate_id.clone();
142 let sequence = event.sequence as i64;
143
144 let payload = match serde_json::to_value(&event.payload) {
145 Ok(payload) => payload,
146 Err(err) => {
147 panic!(
148 "bad payload found in events table for \
149 aggregate id {} with error: {}",
150 &id, err
151 );
152 },
153 };
154
155 let metadata = match serde_json::to_value(&event.metadata)
156 {
157 Ok(metadata) => metadata,
158 Err(err) => {
159 panic!(
160 "bad metadata found in events table for \
161 aggregate id {} with error: {}",
162 &id, err
163 );
164 },
165 };
166
167 match trans.execute(
168 INSERT_EVENT,
169 &[
170 &agg_type, &id, &sequence, &payload, &metadata,
171 ],
172 ) {
173 Ok(_) => {},
174 Err(err) => {
175 match err.code() {
176 None => {},
177 Some(state) => {
178 if state.code() == "23505" {
179 return Err(
180 AggregateError::TechnicalError(
181 "optimistic lock error"
182 .to_string(),
183 ),
184 );
185 }
186 },
187 }
188 panic!(
189 "unable to insert event table for aggregate \
190 id {} with error: {}\n and payload: {}",
191 &id, err, &payload
192 );
193 },
194 };
195 }
196
197 match trans.commit() {
198 Ok(_) => Ok(wrapped_events),
199 Err(err) => {
200 Err(AggregateError::TechnicalError(
201 err.to_string(),
202 ))
203 },
204 }
205 }
206}