postgres_es2/stores/
postgres_store.rs1use std::{
2 collections::HashMap,
3 marker::PhantomData,
4};
5
6use postgres::Client;
7
8use cqrs_es2::{
9 Aggregate,
10 AggregateError,
11 EventEnvelope,
12 EventStore,
13};
14
15use super::postgres_store_aggregate_context::PostgresStoreAggregateContext;
16
17static INSERT_EVENT: &str = "INSERT INTO events (aggregate_type, \
18 aggregate_id, sequence, payload, \
19 metadata)
20 VALUES ($1, $2, $3, $4, $5)";
21static SELECT_EVENTS: &str = "SELECT aggregate_type, aggregate_id, \
22 sequence, payload, metadata
23 FROM events
24 WHERE aggregate_type = $1 AND aggregate_id = $2 ORDER BY \
25 sequence";
26
27pub struct PostgresStore<A: Aggregate> {
30 conn: Client,
31 _phantom: PhantomData<A>,
32}
33
34impl<A: Aggregate> PostgresStore<A> {
35 pub fn new(conn: Client) -> Self {
38 PostgresStore {
39 conn,
40 _phantom: PhantomData,
41 }
42 }
43}
44
45impl<A: Aggregate> EventStore<A, PostgresStoreAggregateContext<A>>
46 for PostgresStore<A>
47{
48 fn load(
49 &mut self,
50 aggregate_id: &str,
51 ) -> Vec<EventEnvelope<A>> {
52 let agg_type = A::aggregate_type();
53 let id = aggregate_id.to_string();
54 let mut result = Vec::new();
55 match self
56 .conn
57 .query(SELECT_EVENTS, &[&agg_type, &id])
58 {
59 Ok(rows) => {
60 for row in rows.iter() {
61 let aggregate_type: String =
62 row.get("aggregate_type");
63 let aggregate_id: String =
64 row.get("aggregate_id");
65 let s: i64 = row.get("sequence");
66 let sequence = s as usize;
67 let payload: A::Event =
68 match serde_json::from_value(
69 row.get("payload"),
70 ) {
71 Ok(payload) => payload,
72 Err(err) => {
73 panic!(
74 "bad payload found in events \
75 table for aggregate id {} with \
76 error: {}",
77 &id, err
78 );
79 },
80 };
81 let event = EventEnvelope::new(
82 aggregate_id,
83 sequence,
84 aggregate_type,
85 payload,
86 );
87 result.push(event);
88 }
89 },
90 Err(e) => {
91 println!("{:?}", e);
92 },
93 }
94 result
95 }
96 fn load_aggregate(
97 &mut self,
98 aggregate_id: &str,
99 ) -> PostgresStoreAggregateContext<A> {
100 let committed_events = self.load(aggregate_id);
101 let mut aggregate = A::default();
102 let mut current_sequence = 0;
103 for envelope in committed_events {
104 current_sequence = envelope.sequence;
105 let event = envelope.payload;
106 aggregate.apply(&event);
107 }
108 PostgresStoreAggregateContext {
109 aggregate_id: aggregate_id.to_string(),
110 aggregate,
111 current_sequence,
112 }
113 }
114
115 fn commit(
116 &mut self,
117 events: Vec<A::Event>,
118 context: PostgresStoreAggregateContext<A>,
119 metadata: HashMap<String, String>,
120 ) -> Result<Vec<EventEnvelope<A>>, AggregateError> {
121 let aggregate_id = context.aggregate_id.as_str();
122 let current_sequence = context.current_sequence;
123 let wrapped_events = self.wrap_events(
124 aggregate_id,
125 current_sequence,
126 events,
127 metadata,
128 );
129 let mut trans = match self.conn.transaction() {
130 Ok(t) => t,
131 Err(err) => {
132 return Err(AggregateError::TechnicalError(
133 err.to_string(),
134 ));
135 },
136 };
137 for event in &wrapped_events {
138 let agg_type = event.aggregate_type.clone();
139 let id = context.aggregate_id.clone();
140 let sequence = event.sequence as i64;
141 let payload = match serde_json::to_value(&event.payload) {
142 Ok(payload) => payload,
143 Err(err) => {
144 panic!(
145 "bad payload found in events table for \
146 aggregate id {} with error: {}",
147 &id, err
148 );
149 },
150 };
151 let metadata = match serde_json::to_value(&event.metadata)
152 {
153 Ok(metadata) => metadata,
154 Err(err) => {
155 panic!(
156 "bad metadata found in events table for \
157 aggregate id {} with error: {}",
158 &id, err
159 );
160 },
161 };
162 match trans.execute(
163 INSERT_EVENT,
164 &[
165 &agg_type, &id, &sequence, &payload, &metadata,
166 ],
167 ) {
168 Ok(_) => {},
169 Err(err) => {
170 match err.code() {
171 None => {},
172 Some(state) => {
173 if state.code() == "23505" {
174 return Err(
175 AggregateError::TechnicalError(
176 "optimistic lock error"
177 .to_string(),
178 ),
179 );
180 }
181 },
182 }
183 panic!(
184 "unable to insert event table for aggregate \
185 id {} with error: {}\n and payload: {}",
186 &id, err, &payload
187 );
188 },
189 };
190 }
191 match trans.commit() {
192 Ok(_) => Ok(wrapped_events),
193 Err(err) => {
194 Err(AggregateError::TechnicalError(
195 err.to_string(),
196 ))
197 },
198 }
199 }
200}