eventlogs/stores/
postgres.rs

1use crate::{
2    ids::LogId,
3    stores::{EventStore, EventStoreError},
4    AppendOptions, EventRecord,
5};
6use chrono::Utc;
7use const_format::formatcp;
8use deadpool_postgres::{GenericClient, Pool, PoolError};
9use futures_util::TryStreamExt;
10use serde::{de::DeserializeOwned, Serialize};
11use std::fmt::Debug;
12use tokio_postgres::{
13    error::SqlState,
14    types::{FromSql, Json, ToSql, Type},
15    Error, Row,
16};
17
18const SCHEMA_NAME: &str = "eventlogs";
19const TABLE_NAME: &str = "events";
20const QUALIFIED_TABLE_NAME: &str = formatcp!("{SCHEMA_NAME}.{TABLE_NAME}");
21const PK_CONSTRAINT: &str = formatcp!("{TABLE_NAME}_pkey");
22const IDEMPOTENCY_KEY_CONSTRAINT: &str = "idempotency_key_unique";
23const COLUMN_LIST: &str = "log_id,event_index,recorded_at,idempotency_key,payload";
24const SELECT_EVENTS: &str = formatcp!(
25    "select {COLUMN_LIST} from {QUALIFIED_TABLE_NAME} 
26    where log_id = $1 and event_index >= $2
27    order by log_id, event_index"
28);
29const SELECT_EVENTS_WITH_LIMIT: &str = formatcp!("{SELECT_EVENTS} limit $3");
30const INSERT_EVENT: &str =
31    formatcp!("insert into {QUALIFIED_TABLE_NAME} ({COLUMN_LIST}) values ($1,$2,$3,$4,$5)");
32const SELECT_EVENT_FOR_IDEMPOTENCY_KEY: &str =
33    formatcp!("select log_id, event_index from {QUALIFIED_TABLE_NAME} where idempotency_key=$1");
34
35impl From<PoolError> for EventStoreError {
36    fn from(value: PoolError) -> Self {
37        EventStoreError::DatabaseError {
38            error: Box::new(value),
39        }
40    }
41}
42
43impl From<Error> for EventStoreError {
44    fn from(value: Error) -> Self {
45        EventStoreError::DatabaseError {
46            error: Box::new(value),
47        }
48    }
49}
50
51impl<'a> FromSql<'a> for LogId {
52    fn from_sql(
53        _ty: &Type,
54        raw: &'a [u8],
55    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
56        let s = String::from_utf8_lossy(raw);
57        let log_id: LogId = s.parse()?;
58        Ok(log_id)
59    }
60
61    fn accepts(ty: &Type) -> bool {
62        matches!(ty, &Type::VARCHAR | &Type::CHAR)
63    }
64}
65
66impl<E> EventRecord<E> for Row
67where
68    E: Serialize + DeserializeOwned + Debug + Send + Sync,
69{
70    fn index(&self) -> u32 {
71        self.get("event_index")
72    }
73
74    fn recorded_at(&self) -> chrono::DateTime<Utc> {
75        self.get("recorded_at")
76    }
77
78    fn idempotency_key(&self) -> Option<String> {
79        self.get("idempotency_key")
80    }
81
82    fn event(&self) -> E {
83        self.get::<_, Json<E>>("payload").0
84    }
85}
86
/// An implementation of [EventStore] backed by a Postgres database.
///
/// This assumes the following schema and table exist in the target server:
/// ```sql
/// create schema eventlogs;
///
/// create table eventlogs.events (
///     log_id varchar(128) not null,
///     event_index OID not null,
///     recorded_at timestamp with time zone not null,
///     idempotency_key varchar(256) null constraint idempotency_key_unique unique,
///     payload json not null,
///     primary key(log_id, event_index)
/// );
/// ```
/// The `payload` column may be typed either as `json` or `jsonb`. The former will
/// give you slightly faster insert latency, but you won't be able to do any queries
/// directly against the database that refer to the contents of that field. A `jsonb`
/// column has slightly slower insert latency, but you can then refer to the contents
/// in your own SQL queries, and even index fields within the JSON. That said,
/// indexing the JSON will slow down the inserts even more, so consider using a
/// separate database/table for searching and filtering events.
///
/// The [GitHub repo](https://github.com/davestearns/eventlogs/tree/main/docker/postgres)
/// contains a `Dockerfile` and `schema.sql` you can use
/// to build a custom Postgres image with all of this pre-defined. Or run the
/// schema file against your own existing/hosted Postgres instance.
pub struct PostgresEventStore {
    /// Connection pool from which each `append` and `load` call obtains its connection.
    pool: Pool,
}
117
118impl PostgresEventStore {
119    /// Constructs a new instance given a pre-configured deadpool-postgres Pool.
120    pub fn new(pool: Pool) -> Self {
121        PostgresEventStore { pool }
122    }
123}
124
125impl<E> EventStore<E> for PostgresEventStore
126where
127    E: Serialize + DeserializeOwned + Debug + Send + Sync,
128{
129    async fn append(
130        &self,
131        log_id: &LogId,
132        event: &E,
133        event_index: u32,
134        append_options: &AppendOptions,
135    ) -> Result<(), EventStoreError> {
136        let conn = self.pool.get().await?;
137        let stmt = conn.prepare_cached(INSERT_EVENT).await?;
138        let result = conn
139            .execute(
140                &stmt,
141                &[
142                    &log_id.to_string(),
143                    &event_index,
144                    &Utc::now(),
145                    &append_options.idempotency_key,
146                    &Json(event),
147                ],
148            )
149            .await;
150
151        // If there was a unique constraint violation,
152        // return th appropriate EventStoreError
153        if let Err(ref e) = result {
154            if e.code() == Some(&SqlState::UNIQUE_VIOLATION) {
155                if let Some(dbe) = e.as_db_error() {
156                    if dbe.constraint() == Some(PK_CONSTRAINT) {
157                        return Err(EventStoreError::EventIndexAlreadyExists {
158                            log_id: log_id.clone(),
159                            event_index,
160                        });
161                    }
162                    if dbe.constraint() == Some(IDEMPOTENCY_KEY_CONSTRAINT) {
163                        // If we got a duplicate idempotency key error, idempotency_key
164                        // should have Some value, but just to be safe...
165                        if let Some(ref key) = append_options.idempotency_key {
166                            // Find the event with that idempotency key
167                            let query = conn
168                                .prepare_cached(SELECT_EVENT_FOR_IDEMPOTENCY_KEY)
169                                .await?;
170                            let row = conn.query_one(&query, &[&key]).await?;
171                            return Err(EventStoreError::IdempotentReplay {
172                                idempotency_key: key.clone(),
173                                log_id: row.get("log_id"),
174                                event_index: row.get("event_index"),
175                            });
176                        }
177                    }
178                }
179            }
180        }
181
182        Ok(result.map(|_| ())?)
183    }
184
185    async fn load(
186        &self,
187        log_id: &LogId,
188        starting_index: u32,
189        max_events: u32,
190    ) -> Result<
191        impl futures_util::Stream<Item = Result<impl EventRecord<E>, EventStoreError>>,
192        EventStoreError,
193    > {
194        let sql = if max_events == u32::MAX {
195            SELECT_EVENTS
196        } else {
197            SELECT_EVENTS_WITH_LIMIT
198        };
199        let conn = self.pool.get().await?;
200        let stmt = conn.prepare_cached(sql).await?;
201
202        let log_id_param = log_id.to_string();
203        let mut params: Vec<&(dyn ToSql + Sync)> = vec![&log_id_param, &starting_index];
204
205        // for some reason postgres won't accept a u32 for a limit value
206        let limit_param = max_events as i64;
207        if max_events < u32::MAX {
208            params.push(&limit_param);
209        }
210
211        let row_stream = conn.query_raw(&stmt, params).await?;
212
213        Ok(row_stream.map_err(|e| EventStoreError::DatabaseError { error: Box::new(e) }))
214    }
215}
216
/// Integration tests for [PostgresEventStore].
///
/// These talk to a real Postgres server on `localhost` (see `store()` for the
/// credentials) that must already contain the `eventlogs.events` table.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tests::TestEvent;
    // `Runtime` comes from deadpool-postgres itself so these tests do not
    // depend on the unrelated redis crate being compiled in.
    use deadpool_postgres::{Config, Runtime};
    use futures_util::StreamExt;
    use tokio_postgres::NoTls;
    use uuid::Uuid;

    /// Builds a store connected to the local CI Postgres instance.
    fn store() -> impl EventStore<TestEvent> {
        let mut cfg = Config::new();
        cfg.host = Some("localhost".to_string());
        cfg.user = Some("postgres".to_string());
        cfg.password = Some("ci-postgres-password".to_string());
        cfg.dbname = Some("postgres".to_string());
        let pool = cfg.create_pool(Some(Runtime::Tokio1), NoTls).unwrap();
        PostgresEventStore::new(pool)
    }

    /// A single appended event can be loaded back.
    #[tokio::test]
    async fn append_load() {
        let log_id = LogId::new();
        let store = store();
        store
            .append(&log_id, &TestEvent::Increment, 0, &AppendOptions::default())
            .await
            .unwrap();

        let row_stream = store.load(&log_id, 0, u32::MAX).await.unwrap();
        assert_eq!(row_stream.count().await, 1);
    }

    /// Several appended events are all loaded back.
    #[tokio::test]
    async fn append_multiple_load() {
        let log_id = LogId::new();
        let store = store();
        store
            .append(&log_id, &TestEvent::Increment, 0, &AppendOptions::default())
            .await
            .unwrap();

        store
            .append(&log_id, &TestEvent::Decrement, 1, &AppendOptions::default())
            .await
            .unwrap();

        store
            .append(&log_id, &TestEvent::Increment, 2, &AppendOptions::default())
            .await
            .unwrap();

        let row_stream = store.load(&log_id, 0, u32::MAX).await.unwrap();
        assert_eq!(row_stream.count().await, 3);
    }

    /// Reusing an idempotency key on a different log reports the original log.
    #[tokio::test]
    async fn idempotent_create() {
        let log_id = LogId::new();
        let store = store();
        let idempotency_key = Uuid::now_v7().to_string();
        let options = AppendOptions {
            idempotency_key: Some(idempotency_key.clone()),
            ..Default::default()
        };

        store
            .append(&log_id, &TestEvent::Increment, 0, &options)
            .await
            .unwrap();

        let replay_log_id = LogId::new();
        let result = store
            .append(&replay_log_id, &TestEvent::Increment, 0, &options)
            .await;

        assert_eq!(
            result,
            Err(EventStoreError::IdempotentReplay {
                idempotency_key: idempotency_key.clone(),
                log_id: log_id.clone(), // original log id, not replay log id
                event_index: 0
            })
        )
    }

    /// Appending twice at the same index is rejected the second time.
    #[tokio::test]
    async fn concurrent_append() {
        let log_id = LogId::new();
        let store = store();
        store
            .append(&log_id, &TestEvent::Increment, 0, &AppendOptions::default())
            .await
            .unwrap();

        store
            .append(&log_id, &TestEvent::Decrement, 1, &AppendOptions::default())
            .await
            .unwrap();

        let result = store
            .append(&log_id, &TestEvent::Decrement, 1, &AppendOptions::default())
            .await;

        assert_eq!(
            result,
            Err(EventStoreError::EventIndexAlreadyExists {
                log_id,
                event_index: 1
            })
        )
    }

    /// Reusing an idempotency key within one log reports the original index
    /// and does not add another event.
    #[tokio::test]
    async fn idempotent_append() {
        let log_id = LogId::new();
        let store = store();
        let idempotency_key = Uuid::now_v7().to_string();
        let options = AppendOptions {
            idempotency_key: Some(idempotency_key.clone()),
            ..Default::default()
        };

        store
            .append(&log_id, &TestEvent::Increment, 0, &AppendOptions::default())
            .await
            .unwrap();

        store
            .append(&log_id, &TestEvent::Decrement, 1, &options)
            .await
            .unwrap();

        let result = store
            .append(&log_id, &TestEvent::Decrement, 2, &options)
            .await;

        assert_eq!(
            result,
            Err(EventStoreError::IdempotentReplay {
                idempotency_key: idempotency_key.clone(),
                log_id: log_id.clone(), // original log id
                event_index: 1          // original event index
            })
        );

        // ensure this log only has 2 events
        let row_stream = store.load(&log_id, 0, u32::MAX).await.unwrap();
        assert_eq!(row_stream.count().await, 2);
    }

    /// `max_events` caps the number of rows returned by `load`.
    #[tokio::test]
    async fn load_limit() {
        let log_id = LogId::new();
        let store = store();
        for i in 0..10 {
            store
                .append(&log_id, &TestEvent::Increment, i, &AppendOptions::default())
                .await
                .unwrap();
        }
        let row_stream = store.load(&log_id, 0, 5).await.unwrap();
        assert_eq!(row_stream.count().await, 5);
    }
}
381}