1use crate::{
2 ids::LogId,
3 stores::{EventStore, EventStoreError},
4 AppendOptions, EventRecord,
5};
6use chrono::Utc;
7use const_format::formatcp;
8use deadpool_postgres::{GenericClient, Pool, PoolError};
9use futures_util::TryStreamExt;
10use serde::{de::DeserializeOwned, Serialize};
11use std::fmt::Debug;
12use tokio_postgres::{
13 error::SqlState,
14 types::{FromSql, Json, ToSql, Type},
15 Error, Row,
16};
17
18const SCHEMA_NAME: &str = "eventlogs";
19const TABLE_NAME: &str = "events";
20const QUALIFIED_TABLE_NAME: &str = formatcp!("{SCHEMA_NAME}.{TABLE_NAME}");
21const PK_CONSTRAINT: &str = formatcp!("{TABLE_NAME}_pkey");
22const IDEMPOTENCY_KEY_CONSTRAINT: &str = "idempotency_key_unique";
23const COLUMN_LIST: &str = "log_id,event_index,recorded_at,idempotency_key,payload";
24const SELECT_EVENTS: &str = formatcp!(
25 "select {COLUMN_LIST} from {QUALIFIED_TABLE_NAME}
26 where log_id = $1 and event_index >= $2
27 order by log_id, event_index"
28);
29const SELECT_EVENTS_WITH_LIMIT: &str = formatcp!("{SELECT_EVENTS} limit $3");
30const INSERT_EVENT: &str =
31 formatcp!("insert into {QUALIFIED_TABLE_NAME} ({COLUMN_LIST}) values ($1,$2,$3,$4,$5)");
32const SELECT_EVENT_FOR_IDEMPOTENCY_KEY: &str =
33 formatcp!("select log_id, event_index from {QUALIFIED_TABLE_NAME} where idempotency_key=$1");
34
35impl From<PoolError> for EventStoreError {
36 fn from(value: PoolError) -> Self {
37 EventStoreError::DatabaseError {
38 error: Box::new(value),
39 }
40 }
41}
42
43impl From<Error> for EventStoreError {
44 fn from(value: Error) -> Self {
45 EventStoreError::DatabaseError {
46 error: Box::new(value),
47 }
48 }
49}
50
51impl<'a> FromSql<'a> for LogId {
52 fn from_sql(
53 _ty: &Type,
54 raw: &'a [u8],
55 ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
56 let s = String::from_utf8_lossy(raw);
57 let log_id: LogId = s.parse()?;
58 Ok(log_id)
59 }
60
61 fn accepts(ty: &Type) -> bool {
62 matches!(ty, &Type::VARCHAR | &Type::CHAR)
63 }
64}
65
66impl<E> EventRecord<E> for Row
67where
68 E: Serialize + DeserializeOwned + Debug + Send + Sync,
69{
70 fn index(&self) -> u32 {
71 self.get("event_index")
72 }
73
74 fn recorded_at(&self) -> chrono::DateTime<Utc> {
75 self.get("recorded_at")
76 }
77
78 fn idempotency_key(&self) -> Option<String> {
79 self.get("idempotency_key")
80 }
81
82 fn event(&self) -> E {
83 self.get::<_, Json<E>>("payload").0
84 }
85}
86
87pub struct PostgresEventStore {
115 pool: Pool,
116}
117
118impl PostgresEventStore {
119 pub fn new(pool: Pool) -> Self {
121 PostgresEventStore { pool }
122 }
123}
124
125impl<E> EventStore<E> for PostgresEventStore
126where
127 E: Serialize + DeserializeOwned + Debug + Send + Sync,
128{
129 async fn append(
130 &self,
131 log_id: &LogId,
132 event: &E,
133 event_index: u32,
134 append_options: &AppendOptions,
135 ) -> Result<(), EventStoreError> {
136 let conn = self.pool.get().await?;
137 let stmt = conn.prepare_cached(INSERT_EVENT).await?;
138 let result = conn
139 .execute(
140 &stmt,
141 &[
142 &log_id.to_string(),
143 &event_index,
144 &Utc::now(),
145 &append_options.idempotency_key,
146 &Json(event),
147 ],
148 )
149 .await;
150
151 if let Err(ref e) = result {
154 if e.code() == Some(&SqlState::UNIQUE_VIOLATION) {
155 if let Some(dbe) = e.as_db_error() {
156 if dbe.constraint() == Some(PK_CONSTRAINT) {
157 return Err(EventStoreError::EventIndexAlreadyExists {
158 log_id: log_id.clone(),
159 event_index,
160 });
161 }
162 if dbe.constraint() == Some(IDEMPOTENCY_KEY_CONSTRAINT) {
163 if let Some(ref key) = append_options.idempotency_key {
166 let query = conn
168 .prepare_cached(SELECT_EVENT_FOR_IDEMPOTENCY_KEY)
169 .await?;
170 let row = conn.query_one(&query, &[&key]).await?;
171 return Err(EventStoreError::IdempotentReplay {
172 idempotency_key: key.clone(),
173 log_id: row.get("log_id"),
174 event_index: row.get("event_index"),
175 });
176 }
177 }
178 }
179 }
180 }
181
182 Ok(result.map(|_| ())?)
183 }
184
185 async fn load(
186 &self,
187 log_id: &LogId,
188 starting_index: u32,
189 max_events: u32,
190 ) -> Result<
191 impl futures_util::Stream<Item = Result<impl EventRecord<E>, EventStoreError>>,
192 EventStoreError,
193 > {
194 let sql = if max_events == u32::MAX {
195 SELECT_EVENTS
196 } else {
197 SELECT_EVENTS_WITH_LIMIT
198 };
199 let conn = self.pool.get().await?;
200 let stmt = conn.prepare_cached(sql).await?;
201
202 let log_id_param = log_id.to_string();
203 let mut params: Vec<&(dyn ToSql + Sync)> = vec![&log_id_param, &starting_index];
204
205 let limit_param = max_events as i64;
207 if max_events < u32::MAX {
208 params.push(&limit_param);
209 }
210
211 let row_stream = conn.query_raw(&stmt, params).await?;
212
213 Ok(row_stream.map_err(|e| EventStoreError::DatabaseError { error: Box::new(e) }))
214 }
215}
216
217#[cfg(test)]
218mod tests {
219 use super::*;
220 use crate::tests::TestEvent;
221 use deadpool_postgres::Config;
222 use deadpool_redis::Runtime;
223 use futures_util::StreamExt;
224 use tokio_postgres::NoTls;
225 use uuid::Uuid;
226
227 fn store() -> impl EventStore<TestEvent> {
228 let mut cfg = Config::new();
229 cfg.host = Some("localhost".to_string());
230 cfg.user = Some("postgres".to_string());
231 cfg.password = Some("ci-postgres-password".to_string());
232 cfg.dbname = Some("postgres".to_string());
233 let pool = cfg.create_pool(Some(Runtime::Tokio1), NoTls).unwrap();
234 PostgresEventStore::new(pool)
235 }
236
237 #[tokio::test]
238 async fn append_load() {
239 let log_id = LogId::new();
240 let store = store();
241 store
242 .append(&log_id, &TestEvent::Increment, 0, &AppendOptions::default())
243 .await
244 .unwrap();
245
246 let row_stream = store.load(&log_id, 0, u32::MAX).await.unwrap();
247 assert_eq!(row_stream.count().await, 1);
248 }
249
250 #[tokio::test]
251 async fn append_multiple_load() {
252 let log_id = LogId::new();
253 let store = store();
254 store
255 .append(&log_id, &TestEvent::Increment, 0, &AppendOptions::default())
256 .await
257 .unwrap();
258
259 store
260 .append(&log_id, &TestEvent::Decrement, 1, &AppendOptions::default())
261 .await
262 .unwrap();
263
264 store
265 .append(&log_id, &TestEvent::Increment, 2, &AppendOptions::default())
266 .await
267 .unwrap();
268
269 let row_stream = store.load(&log_id, 0, u32::MAX).await.unwrap();
270 assert_eq!(row_stream.count().await, 3);
271 }
272
273 #[tokio::test]
274 async fn idempotent_create() {
275 let log_id = LogId::new();
276 let store = store();
277 let idempotency_key = Uuid::now_v7().to_string();
278 let options = AppendOptions {
279 idempotency_key: Some(idempotency_key.clone()),
280 ..Default::default()
281 };
282
283 store
284 .append(&log_id, &TestEvent::Increment, 0, &options)
285 .await
286 .unwrap();
287
288 let replay_log_id = LogId::new();
289 let result = store
290 .append(&replay_log_id, &TestEvent::Increment, 0, &options)
291 .await;
292
293 assert_eq!(
294 result,
295 Err(EventStoreError::IdempotentReplay {
296 idempotency_key: idempotency_key.clone(),
297 log_id: log_id.clone(), event_index: 0
299 })
300 )
301 }
302
303 #[tokio::test]
304 async fn concurrent_append() {
305 let log_id = LogId::new();
306 let store = store();
307 store
308 .append(&log_id, &TestEvent::Increment, 0, &AppendOptions::default())
309 .await
310 .unwrap();
311
312 store
313 .append(&log_id, &TestEvent::Decrement, 1, &AppendOptions::default())
314 .await
315 .unwrap();
316
317 let result = store
318 .append(&log_id, &TestEvent::Decrement, 1, &AppendOptions::default())
319 .await;
320
321 assert_eq!(
322 result,
323 Err(EventStoreError::EventIndexAlreadyExists {
324 log_id: log_id,
325 event_index: 1
326 })
327 )
328 }
329
330 #[tokio::test]
331 async fn idempotent_append() {
332 let log_id = LogId::new();
333 let store = store();
334 let idempotency_key = Uuid::now_v7().to_string();
335 let options = AppendOptions {
336 idempotency_key: Some(idempotency_key.clone()),
337 ..Default::default()
338 };
339
340 store
341 .append(&log_id, &TestEvent::Increment, 0, &AppendOptions::default())
342 .await
343 .unwrap();
344
345 store
346 .append(&log_id, &TestEvent::Decrement, 1, &options)
347 .await
348 .unwrap();
349
350 let result = store
351 .append(&log_id, &TestEvent::Decrement, 2, &options)
352 .await;
353
354 assert_eq!(
355 result,
356 Err(EventStoreError::IdempotentReplay {
357 idempotency_key: idempotency_key.clone(),
358 log_id: log_id.clone(), event_index: 1 })
361 );
362
363 let row_stream = store.load(&log_id, 0, u32::MAX).await.unwrap();
365 assert_eq!(row_stream.count().await, 2);
366 }
367
368 #[tokio::test]
369 async fn load_limit() {
370 let log_id = LogId::new();
371 let store = store();
372 for i in 0..10 {
373 store
374 .append(&log_id, &TestEvent::Increment, i, &AppendOptions::default())
375 .await
376 .unwrap();
377 }
378 let row_stream = store.load(&log_id, 0, 5).await.unwrap();
379 assert_eq!(row_stream.count().await, 5);
380 }
381}