Skip to main content

disintegrate_postgres/
event_store.rs

1//! PostgreSQL Event Store
2//!
3//! This module provides an implementation of the `EventStore` trait using PostgreSQL as the underlying storage.
4//! It allows storing and retrieving events from a PostgreSQL database.
5mod append;
6mod query;
7#[cfg(test)]
8mod tests;
9
10use append::InsertEventsBuilder;
11use futures::stream::BoxStream;
12use query::CriteriaBuilder;
13use sqlx::postgres::PgPool;
14use sqlx::Row;
15use std::error::Error as StdError;
16
17use std::marker::PhantomData;
18
19use crate::{Error, Migrator, PgEventId};
20use async_stream::stream;
21use async_trait::async_trait;
22use disintegrate::EventStore;
23use disintegrate::{Event, PersistedEvent};
24use disintegrate::{StreamItem, StreamQuery};
25use disintegrate_serde::Serde;
26
27use futures::StreamExt;
28
/// PostgreSQL event store implementation.
///
/// `E` is the event type handled by the store and `S` is the `Serde`
/// implementation for that event type. The store itself only holds a
/// connection pool and the serializer, so cloning it (when `E` and `S` are
/// `Clone`) does not copy any event data.
#[derive(Clone)]
pub struct PgEventStore<E, S>
where
    S: Serde<E> + Send + Sync,
{
    /// Connection pool used to begin transactions and to run stream queries.
    pub(crate) pool: PgPool,
    /// Serializer used to decode payloads when streaming; it is also handed to
    /// the insert builder when appending events.
    serde: S,
    /// Binds the store to the event type `E` without storing a value of it.
    event_type: PhantomData<E>,
}
39
40impl<E, S> PgEventStore<E, S>
41where
42    S: Serde<E> + Send + Sync + Clone,
43    E: Event + Clone,
44{
45    /// Initializes the PostgreSQL DB and returns a new instance of `PgEventStore`.
46    ///
47    /// # Arguments
48    ///
49    /// * `pool` - The PostgreSQL connection pool.
50    /// * `serde` - The serialization implementation for the event payload.
51    pub async fn try_new(pool: PgPool, serde: S) -> Result<Self, Error> {
52        let event_store = Self::new_uninitialized(pool, serde);
53        Migrator::new(event_store.clone())
54            .init_event_store()
55            .await?;
56        Ok(event_store)
57    }
58    /// Creates a new instance of `PgEventStore`.
59    ///
60    /// This constructor does not initialize the database or add the
61    /// `domain_id` columns necessary for `disintegrate` to function properly.
62    /// If you need to initialize the database, use `PgEventStore::new` instead.
63    ///
64    /// If you plan to use this constructor, ensure that the `disintegrate` is
65    /// properly initialized. Refer to the SQL files in the "event_store/sql" directory
66    /// to recreate the default structure. Additionally, all `domain_id` columns
67    /// and their corresponding indexes must be created manually.
68    ///
69    /// # Arguments
70    ///
71    /// * `pool` - The PostgreSQL connection pool.
72    /// * `serde` - The serialization implementation for the event payload.
73    pub fn new_uninitialized(pool: PgPool, serde: S) -> Self {
74        Self {
75            pool,
76            serde,
77            event_type: PhantomData,
78        }
79    }
80}
81
82impl<E, S> PgEventStore<E, S>
83where
84    S: Serde<E> + Send + Sync,
85    E: Event + Send + Sync,
86{
87    /// Streams events based on the provided query and executor.
88    ///
89    /// # Arguments
90    ///
91    /// * `executor` - The sqlx executor to use for database operations.
92    /// * `query` - The stream query specifying the criteria for filtering events.
93    ///
94    /// # Returns
95    ///
96    /// A `Result` containing a boxed stream of `PersistedEvent` that matches the query criteria,
97    /// or an error of type `Error`.
98    pub(crate) fn stream_with<'a, QE, EX>(
99        &'a self,
100        executor: EX,
101        query: &'a StreamQuery<PgEventId, QE>,
102    ) -> BoxStream<'a, Result<StreamItem<PgEventId, QE>, Error>>
103    where
104        EX: sqlx::PgExecutor<'a> + Send + Sync + 'a,
105        QE: TryFrom<E> + Event + Clone + Send + Sync + 'static,
106        <QE as TryFrom<E>>::Error: StdError + Send + Sync + 'static,
107    {
108        let sql = format!(
109            r#"SELECT event.event_id, event.payload, epoch.__epoch_id
110            FROM (SELECT COALESCE(MAX(event_id),0) AS __epoch_id FROM event) AS epoch
111            LEFT JOIN event ON event.event_id <= epoch.__epoch_id AND ({criteria})
112            ORDER BY event_id ASC"#,
113            criteria = CriteriaBuilder::new(query).build()
114        );
115
116        stream! {
117            let mut rows = sqlx::query(&sql).fetch(executor);
118            let mut epoch_id: PgEventId = 0;
119            while let Some(row) = rows.next().await {
120                let row = row?;
121                let event_id: Option<i64> = row.get(0);
122                epoch_id = row.get(2);
123                if let Some(event_id) = event_id {
124                    let payload = self.serde.deserialize(row.get(1))?;
125                    let payload: QE = payload
126                        .try_into()
127                        .map_err(|e| Error::QueryEventMapping(Box::new(e)))?;
128                    yield Ok(StreamItem::Event(PersistedEvent::new(event_id, payload)));
129                }
130            }
131            yield Ok(StreamItem::End(epoch_id));
132        }
133        .boxed()
134    }
135}
136
137/// Implementation of the event store using PostgreSQL.
138///
139/// This module provides the implementation of the `EventStore` trait for `PgEventStore`,
140/// allowing interaction with a PostgreSQL event store. It enables streaming events based on
141/// a query and appending new events to the event store.
142#[async_trait]
143impl<E, S> EventStore<PgEventId, E> for PgEventStore<E, S>
144where
145    E: Event + Send + Sync,
146    S: Serde<E> + Send + Sync,
147{
148    type Error = Error;
149
150    /// Streams events based on the provided query.
151    ///
152    /// This function fetches events from the PostgreSQL event store based on the provided
153    /// `query`. It constructs a SQL query using the `SqlEventsCriteriaBuilder` and executes
154    /// the query using the `sqlx` crate. The fetched events are then converted into
155    /// `PersistedEvent` instances and streamed as a boxed stream.
156    ///
157    /// # Arguments
158    ///
159    /// * `query` - The stream query specifying the criteria for filtering events.
160    ///
161    /// # Returns
162    ///
163    /// A `Result` containing a boxed stream of `PersistedEvent` that matches the query criteria,
164    /// or an error of type `Self::Error`.
165    fn stream<'a, QE>(
166        &'a self,
167        query: &'a StreamQuery<PgEventId, QE>,
168    ) -> BoxStream<'a, Result<StreamItem<PgEventId, QE>, Self::Error>>
169    where
170        QE: TryFrom<E> + Event + 'static + Clone + Send + Sync,
171        <QE as TryFrom<E>>::Error: StdError + 'static + Send + Sync,
172    {
173        self.stream_with(&self.pool, query)
174    }
175
176    /// Appends new events to the event store.
177    ///
178    /// This function inserts the provided `events` into the PostgreSQL-backed event store.
179    /// Before inserting, it queries the `event` table to ensure that no events have been
180    /// appended since the given `version`. If newer events are found, a concurrency error
181    /// is returned to prevent invalid state transitions.
182    ///
183    /// If the concurrency check succeeds, the events are inserted into the `event` table.
184    ///
185    /// # Arguments
186    ///
187    /// * `events` - The events to append to the event store.
188    /// * `query` - The stream query that identifies the target event stream.
189    /// * `version` - The ID of the last consumed event, used for optimistic concurrency control.
190    ///
191    /// # Returns
192    ///
193    /// A `Result` containing a vector of `PersistedEvent` representing the appended events,
194    /// or an error of type `Self::Error`.
195    async fn append<QE>(
196        &self,
197        events: Vec<E>,
198        query: StreamQuery<PgEventId, QE>,
199        version: PgEventId,
200    ) -> Result<Vec<PersistedEvent<PgEventId, E>>, Self::Error>
201    where
202        E: Clone + 'async_trait,
203        QE: Event + Clone + Send + Sync,
204    {
205        let mut tx = self.pool.begin().await?;
206        sqlx::query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
207            .execute(&mut *tx)
208            .await?;
209
210        if sqlx::query_scalar(&format!(
211            "SELECT EXISTS (SELECT 1 FROM event WHERE {})",
212            CriteriaBuilder::new(&query.change_origin(version)).build()
213        ))
214        .fetch_one(&mut *tx)
215        .await?
216        {
217            return Err(Error::Concurrency);
218        }
219
220        let mut insert = InsertEventsBuilder::new(&events, &self.serde);
221        let event_ids: Vec<PgEventId> = insert
222            .build()
223            .fetch_all(&mut *tx)
224            .await?
225            .into_iter()
226            .map(|r| r.get(0))
227            .collect();
228
229        let persisted_events = event_ids
230            .iter()
231            .zip(events)
232            .map(|(event_id, event)| PersistedEvent::new(*event_id, event))
233            .collect::<Vec<_>>();
234
235        tx.commit().await.map_err(map_concurrency_err)?;
236
237        Ok(persisted_events)
238    }
239
240    /// Appends a batch of events to the PostgreSQL-backed event store **without** verifying
241    /// whether new events have been added since the last read.
242    ///
243    /// # Arguments
244    ///
245    /// * `events` - A vector of events to be appended.
246    ///
247    /// # Returns
248    ///
249    /// A `Result` containing a vector of `PersistedEvent` representing the appended events,
250    /// or an error of type `Self::Error`.
251    async fn append_without_validation(
252        &self,
253        events: Vec<E>,
254    ) -> Result<Vec<PersistedEvent<PgEventId, E>>, Self::Error>
255    where
256        E: Clone + 'async_trait,
257    {
258        let mut tx = self.pool.begin().await?;
259
260        let mut insert = InsertEventsBuilder::new(&events, &self.serde);
261        let event_ids: Vec<PgEventId> = insert
262            .build()
263            .fetch_all(&mut *tx)
264            .await?
265            .into_iter()
266            .map(|r| r.get(0))
267            .collect();
268
269        let persisted_events = event_ids
270            .iter()
271            .zip(events)
272            .map(|(event_id, event)| PersistedEvent::new(*event_id, event))
273            .collect::<Vec<_>>();
274
275        tx.commit().await?;
276
277        Ok(persisted_events)
278    }
279}
280
281fn map_concurrency_err(err: sqlx::Error) -> Error {
282    if let sqlx::Error::Database(ref description) = err {
283        if description.code().as_deref() == Some("40001") {
284            return Error::Concurrency;
285        }
286    }
287    Error::Database(err)
288}