// disintegrate_postgres/event_store.rs

//! PostgreSQL Event Store
//!
//! This module provides an implementation of the `EventStore` trait using PostgreSQL as the underlying storage.
//! It allows storing and retrieving events from a PostgreSQL database.
5mod append;
6mod query;
7#[cfg(test)]
8mod tests;
9
10use append::InsertEventsBuilder;
11use futures::stream::BoxStream;
12use query::CriteriaBuilder;
13use sqlx::postgres::PgPool;
14use sqlx::Row;
15use std::error::Error as StdError;
16
17use std::marker::PhantomData;
18
19use crate::{Error, Migrator, PgEventId};
20use async_stream::stream;
21use async_trait::async_trait;
22use disintegrate::EventStore;
23use disintegrate::{Event, PersistedEvent};
24use disintegrate::{StreamItem, StreamQuery};
25use disintegrate_serde::Serde;
26
27use futures::StreamExt;
28
/// PostgreSQL event store implementation.
///
/// Generic over the event type `E` and the serializer `S` used to
/// (de)serialize event payloads.
#[derive(Clone)]
pub struct PgEventStore<E, S>
where
    S: Serde<E> + Send + Sync,
{
    // Connection pool; `pub(crate)` so sibling modules (e.g. the migrator)
    // can reuse the same pool.
    pub(crate) pool: PgPool,
    // Serializer for event payloads.
    serde: S,
    // Marker only: the store never owns an `E` value.
    event_type: PhantomData<E>,
}

impl<E, S> PgEventStore<E, S>
where
    S: Serde<E> + Send + Sync + Clone,
    E: Event + Clone,
{
    /// Initializes the PostgreSQL DB and returns a new instance of `PgEventStore`.
    ///
    /// # Arguments
    ///
    /// * `pool` - The PostgreSQL connection pool.
    /// * `serde` - The serialization implementation for the event payload.
    ///
    /// # Errors
    ///
    /// Returns an error if the event store schema migration fails.
    pub async fn try_new(pool: PgPool, serde: S) -> Result<Self, Error> {
        let event_store = Self::new_uninitialized(pool, serde);
        // Run the built-in migrations before handing the store to the caller.
        Migrator::new(event_store.clone())
            .init_event_store()
            .await?;
        Ok(event_store)
    }
    /// Creates a new instance of `PgEventStore`.
    ///
    /// This constructor does not initialize the database or add the
    /// `domain_id` columns necessary for `disintegrate` to function properly.
    /// If you need to initialize the database, use `PgEventStore::try_new` instead.
    ///
    /// If you plan to use this constructor, ensure that the `disintegrate` schema is
    /// properly initialized. Refer to the SQL files in the "event_store/sql" directory
    /// to recreate the default structure. Additionally, all `domain_id` columns
    /// and their corresponding indexes must be created manually.
    ///
    /// # Arguments
    ///
    /// * `pool` - The PostgreSQL connection pool.
    /// * `serde` - The serialization implementation for the event payload.
    pub fn new_uninitialized(pool: PgPool, serde: S) -> Self {
        Self {
            pool,
            serde,
            event_type: PhantomData,
        }
    }
}

impl<E, S> PgEventStore<E, S>
where
    S: Serde<E> + Send + Sync,
    E: Event + Send + Sync,
{
    /// Appends events within an existing transaction without committing.
    ///
    /// This allows callers to control when the transaction is committed.
    /// Sets SERIALIZABLE isolation, calls `event_store_begin_epoch()` to
    /// signal an in-flight write, performs the concurrency check, and inserts events.
    ///
    /// # Arguments
    ///
    /// * `tx` - An open transaction; committing or rolling back is the caller's job.
    /// * `events` - The events to insert.
    /// * `query` - The stream query identifying the target event stream.
    /// * `version` - The last event ID observed by the caller (optimistic lock).
    ///
    /// # Errors
    ///
    /// Returns `Error::Concurrency` when an event matching `query` exists past
    /// `version`; otherwise propagates database/serialization errors.
    pub(crate) async fn append_in_tx<QE>(
        &self,
        tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
        events: Vec<E>,
        query: StreamQuery<PgEventId, QE>,
        version: PgEventId,
    ) -> Result<Vec<PersistedEvent<PgEventId, E>>, Error>
    where
        E: Clone,
        QE: Event + Clone + Send + Sync,
    {
        // Must be the first statement in the transaction: PostgreSQL rejects
        // SET TRANSACTION after any query has been executed.
        sqlx::query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
            .execute(&mut **tx)
            .await?;

        // Register this in-flight writer so concurrent readers cap their
        // epoch below our not-yet-committed event IDs.
        sqlx::query("SELECT event_store_begin_epoch()")
            .execute(&mut **tx)
            .await?;

        // Optimistic concurrency check: any event matching the query with an
        // ID after `version` means the caller acted on stale state.
        if sqlx::query_scalar(&format!(
            "SELECT EXISTS (SELECT 1 FROM event WHERE {})",
            CriteriaBuilder::new(&query.change_origin(version)).build()
        ))
        .fetch_one(&mut **tx)
        .await?
        {
            return Err(Error::Concurrency);
        }

        // Bulk-insert; each returned row carries the generated event ID in
        // column 0.
        let mut insert = InsertEventsBuilder::new(&events, &self.serde);
        let event_ids: Vec<PgEventId> = insert
            .build()
            .fetch_all(&mut **tx)
            .await?
            .into_iter()
            .map(|r| r.get(0))
            .collect();

        // Pair each generated ID with its event, preserving insertion order.
        let persisted_events = event_ids
            .iter()
            .zip(events)
            .map(|(event_id, event)| PersistedEvent::new(*event_id, event))
            .collect::<Vec<_>>();

        Ok(persisted_events)
    }

    /// Streams events based on the provided query and executor.
    ///
    /// Events are returned up to the current epoch, which is the minimum between
    /// the lowest in-flight writer's sequence position and the highest committed
    /// event ID. This guarantees that readers never observe uncommitted events
    /// and never skip events when concurrent writers commit out of sequence order.
    ///
    /// The stream yields `StreamItem::Event` for each matching event and ends
    /// with `StreamItem::End(epoch_id)` indicating the epoch up to which events
    /// were read.
    ///
    /// # Arguments
    ///
    /// * `executor` - The sqlx executor to use for database operations.
    /// * `query` - The stream query specifying the criteria for filtering events.
    ///
    /// # Returns
    ///
    /// A boxed stream of `StreamItem` that contains matching events up to the
    /// current epoch, or an error of type `Error`.
    pub(crate) fn stream_with<'a, QE, EX>(
        &'a self,
        executor: EX,
        query: &'a StreamQuery<PgEventId, QE>,
    ) -> BoxStream<'a, Result<StreamItem<PgEventId, QE>, Error>>
    where
        EX: sqlx::PgExecutor<'a> + Send + Sync + 'a,
        QE: TryFrom<E> + Event + Clone + Send + Sync + 'static,
        <QE as TryFrom<E>>::Error: StdError + Send + Sync + 'static,
    {
        // The single-row VALUES clause with a LEFT JOIN guarantees at least
        // one row, so the epoch ID is observed even when no events match.
        let sql = format!(
            r#"SELECT event.event_id, event.payload, epoch.__epoch_id
            FROM (values (event_store_current_epoch())) AS epoch(__epoch_id)
            LEFT JOIN event ON event.event_id <= epoch.__epoch_id AND ({criteria})
            ORDER BY event_id ASC"#,
            criteria = CriteriaBuilder::new(query).build()
        );

        stream! {
            let mut rows = sqlx::query(&sql).fetch(executor);
            // Stays 0 only if the query yields no rows at all — not expected
            // given the unconditional VALUES row above.
            let mut epoch_id: PgEventId = 0;
            while let Some(row) = rows.next().await {
                let row = row?;
                // NULL event_id means the LEFT JOIN matched no event for this row.
                let event_id: Option<i64> = row.get(0);
                epoch_id = row.get(2);
                if let Some(event_id) = event_id {
                    let payload = self.serde.deserialize(row.get(1))?;
                    let payload: QE = payload
                        .try_into()
                        .map_err(|e| Error::QueryEventMapping(Box::new(e)))?;
                    yield Ok(StreamItem::Event(PersistedEvent::new(event_id, payload)));
                }
            }
            // Tell the consumer how far this read extends.
            yield Ok(StreamItem::End(epoch_id));
        }
        .boxed()
    }
}

/// Implementation of the event store using PostgreSQL.
///
/// This module provides the implementation of the `EventStore` trait for `PgEventStore`,
/// allowing interaction with a PostgreSQL event store. It enables streaming events based on
/// a query and appending new events to the event store.
#[async_trait]
impl<E, S> EventStore<PgEventId, E> for PgEventStore<E, S>
where
    E: Event + Send + Sync,
    S: Serde<E> + Send + Sync,
{
    type Error = Error;

    /// Streams events based on the provided query.
    ///
    /// This function fetches events from the PostgreSQL event store based on the provided
    /// `query`. It constructs a SQL query using the `SqlEventsCriteriaBuilder` and executes
    /// the query using the `sqlx` crate. The fetched events are then converted into
    /// `PersistedEvent` instances and streamed as a boxed stream.
    ///
    /// # Arguments
    ///
    /// * `query` - The stream query specifying the criteria for filtering events.
    ///
    /// # Returns
    ///
    /// A `Result` containing a boxed stream of `PersistedEvent` that matches the query criteria,
    /// or an error of type `Self::Error`.
    fn stream<'a, QE>(
        &'a self,
        query: &'a StreamQuery<PgEventId, QE>,
    ) -> BoxStream<'a, Result<StreamItem<PgEventId, QE>, Self::Error>>
    where
        QE: TryFrom<E> + Event + 'static + Clone + Send + Sync,
        <QE as TryFrom<E>>::Error: StdError + 'static + Send + Sync,
    {
        // Delegate to the internal helper, reading through the shared pool.
        self.stream_with(&self.pool, query)
    }

    /// Appends new events to the event store.
    ///
    /// This function inserts the provided `events` into the PostgreSQL-backed event store.
    /// Before inserting, it queries the `event` table to ensure that no events have been
    /// appended since the given `version`. If newer events are found, a concurrency error
    /// is returned to prevent invalid state transitions.
    ///
    /// If the concurrency check succeeds, the events are inserted into the `event` table.
    ///
    /// # Arguments
    ///
    /// * `events` - The events to append to the event store.
    /// * `query` - The stream query that identifies the target event stream.
    /// * `version` - The ID of the last consumed event, used for optimistic concurrency control.
    ///
    /// # Returns
    ///
    /// A `Result` containing a vector of `PersistedEvent` representing the appended events,
    /// or an error of type `Self::Error`.
    async fn append<QE>(
        &self,
        events: Vec<E>,
        query: StreamQuery<PgEventId, QE>,
        version: PgEventId,
    ) -> Result<Vec<PersistedEvent<PgEventId, E>>, Self::Error>
    where
        E: Clone + 'async_trait,
        QE: Event + Clone + Send + Sync,
    {
        let mut tx = self.pool.begin().await?;

        // NOTE(review): a SERIALIZABLE conflict (SQLSTATE 40001) can also be
        // raised by statements inside `append_in_tx`, not only at commit;
        // those currently surface as a plain database error rather than
        // `Error::Concurrency` — confirm whether they should be mapped too.
        let persisted_events = self.append_in_tx(&mut tx, events, query, version).await?;

        // A serialization failure detected at commit maps to a retryable
        // `Error::Concurrency`.
        tx.commit().await.map_err(map_concurrency_err)?;

        Ok(persisted_events)
    }

    /// Appends a batch of events to the PostgreSQL-backed event store **without** verifying
    /// whether new events have been added since the last read.
    ///
    /// # Arguments
    ///
    /// * `events` - A vector of events to be appended.
    ///
    /// # Returns
    ///
    /// A `Result` containing a vector of `PersistedEvent` representing the appended events,
    /// or an error of type `Self::Error`.
    async fn append_without_validation(
        &self,
        events: Vec<E>,
    ) -> Result<Vec<PersistedEvent<PgEventId, E>>, Self::Error>
    where
        E: Clone + 'async_trait,
    {
        // No SET TRANSACTION here: the session's default isolation level is
        // used, and no optimistic concurrency check is performed.
        let mut tx = self.pool.begin().await?;

        // Still register as an in-flight writer so readers' epoch handling
        // stays consistent with validated appends.
        sqlx::query("SELECT event_store_begin_epoch()")
            .execute(&mut *tx)
            .await?;

        // Bulk-insert; column 0 of each returned row is the generated event ID.
        let mut insert = InsertEventsBuilder::new(&events, &self.serde);
        let event_ids: Vec<PgEventId> = insert
            .build()
            .fetch_all(&mut *tx)
            .await?
            .into_iter()
            .map(|r| r.get(0))
            .collect();

        // Pair each generated ID with its event, preserving insertion order.
        let persisted_events = event_ids
            .iter()
            .zip(events)
            .map(|(event_id, event)| PersistedEvent::new(*event_id, event))
            .collect::<Vec<_>>();

        tx.commit().await?;

        Ok(persisted_events)
    }
}

320fn map_concurrency_err(err: sqlx::Error) -> Error {
321    if let sqlx::Error::Database(ref description) = err {
322        if description.code().as_deref() == Some("40001") {
323            return Error::Concurrency;
324        }
325    }
326    Error::Database(err)
327}