disintegrate_postgres/event_store.rs
1//! PostgreSQL Event Store
2//!
3//! This module provides an implementation of the `EventStore` trait using PostgreSQL as the underlying storage.
4//! It allows storing and retrieving events from a PostgreSQL database.
5mod append;
6mod query;
7#[cfg(test)]
8mod tests;
9
10use append::InsertEventsBuilder;
11use futures::stream::BoxStream;
12use query::CriteriaBuilder;
13use sqlx::postgres::PgPool;
14use sqlx::Row;
15use std::error::Error as StdError;
16
17use std::marker::PhantomData;
18
19use crate::{Error, Migrator, PgEventId};
20use async_stream::stream;
21use async_trait::async_trait;
22use disintegrate::EventStore;
23use disintegrate::{Event, PersistedEvent};
24use disintegrate::{StreamItem, StreamQuery};
25use disintegrate_serde::Serde;
26
27use futures::StreamExt;
28
29/// PostgreSQL event store implementation.
30#[derive(Clone)]
31pub struct PgEventStore<E, S>
32where
33 S: Serde<E> + Send + Sync,
34{
35 pub(crate) pool: PgPool,
36 serde: S,
37 event_type: PhantomData<E>,
38}
39
40impl<E, S> PgEventStore<E, S>
41where
42 S: Serde<E> + Send + Sync + Clone,
43 E: Event + Clone,
44{
45 /// Initializes the PostgreSQL DB and returns a new instance of `PgEventStore`.
46 ///
47 /// # Arguments
48 ///
49 /// * `pool` - The PostgreSQL connection pool.
50 /// * `serde` - The serialization implementation for the event payload.
51 pub async fn try_new(pool: PgPool, serde: S) -> Result<Self, Error> {
52 let event_store = Self::new_uninitialized(pool, serde);
53 Migrator::new(event_store.clone())
54 .init_event_store()
55 .await?;
56 Ok(event_store)
57 }
58 /// Creates a new instance of `PgEventStore`.
59 ///
60 /// This constructor does not initialize the database or add the
61 /// `domain_id` columns necessary for `disintegrate` to function properly.
62 /// If you need to initialize the database, use `PgEventStore::new` instead.
63 ///
64 /// If you plan to use this constructor, ensure that the `disintegrate` is
65 /// properly initialized. Refer to the SQL files in the "event_store/sql" directory
66 /// to recreate the default structure. Additionally, all `domain_id` columns
67 /// and their corresponding indexes must be created manually.
68 ///
69 /// # Arguments
70 ///
71 /// * `pool` - The PostgreSQL connection pool.
72 /// * `serde` - The serialization implementation for the event payload.
73 pub fn new_uninitialized(pool: PgPool, serde: S) -> Self {
74 Self {
75 pool,
76 serde,
77 event_type: PhantomData,
78 }
79 }
80}
81
82impl<E, S> PgEventStore<E, S>
83where
84 S: Serde<E> + Send + Sync,
85 E: Event + Send + Sync,
86{
87 /// Appends events within an existing transaction without committing.
88 ///
89 /// This allows callers to control when the transaction is committed.
90 /// Sets SERIALIZABLE isolation, calls `event_store_begin_epoch()` to
91 /// signal an in-flight write, performs the concurrency check, and inserts events.
92 pub(crate) async fn append_in_tx<QE>(
93 &self,
94 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
95 events: Vec<E>,
96 query: StreamQuery<PgEventId, QE>,
97 version: PgEventId,
98 ) -> Result<Vec<PersistedEvent<PgEventId, E>>, Error>
99 where
100 E: Clone,
101 QE: Event + Clone + Send + Sync,
102 {
103 sqlx::query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
104 .execute(&mut **tx)
105 .await?;
106
107 sqlx::query("SELECT event_store_begin_epoch()")
108 .execute(&mut **tx)
109 .await?;
110
111 if sqlx::query_scalar(&format!(
112 "SELECT EXISTS (SELECT 1 FROM event WHERE {})",
113 CriteriaBuilder::new(&query.change_origin(version)).build()
114 ))
115 .fetch_one(&mut **tx)
116 .await?
117 {
118 return Err(Error::Concurrency);
119 }
120
121 let mut insert = InsertEventsBuilder::new(&events, &self.serde);
122 let event_ids: Vec<PgEventId> = insert
123 .build()
124 .fetch_all(&mut **tx)
125 .await?
126 .into_iter()
127 .map(|r| r.get(0))
128 .collect();
129
130 let persisted_events = event_ids
131 .iter()
132 .zip(events)
133 .map(|(event_id, event)| PersistedEvent::new(*event_id, event))
134 .collect::<Vec<_>>();
135
136 Ok(persisted_events)
137 }
138
139 /// Streams events based on the provided query and executor.
140 ///
141 /// Events are returned up to the current epoch, which is the minimum between
142 /// the lowest in-flight writer's sequence position and the highest committed
143 /// event ID. This guarantees that readers never observe uncommitted events
144 /// and never skip events when concurrent writers commit out of sequence order.
145 ///
146 /// The stream yields `StreamItem::Event` for each matching event and ends
147 /// with `StreamItem::End(epoch_id)` indicating the epoch up to which events
148 /// were read.
149 ///
150 /// # Arguments
151 ///
152 /// * `executor` - The sqlx executor to use for database operations.
153 /// * `query` - The stream query specifying the criteria for filtering events.
154 ///
155 /// # Returns
156 ///
157 /// A boxed stream of `StreamItem` that contains matching events up to the
158 /// current epoch, or an error of type `Error`.
159 pub(crate) fn stream_with<'a, QE, EX>(
160 &'a self,
161 executor: EX,
162 query: &'a StreamQuery<PgEventId, QE>,
163 ) -> BoxStream<'a, Result<StreamItem<PgEventId, QE>, Error>>
164 where
165 EX: sqlx::PgExecutor<'a> + Send + Sync + 'a,
166 QE: TryFrom<E> + Event + Clone + Send + Sync + 'static,
167 <QE as TryFrom<E>>::Error: StdError + Send + Sync + 'static,
168 {
169 let sql = format!(
170 r#"SELECT event.event_id, event.payload, epoch.__epoch_id
171 FROM (values (event_store_current_epoch())) AS epoch(__epoch_id)
172 LEFT JOIN event ON event.event_id <= epoch.__epoch_id AND ({criteria})
173 ORDER BY event_id ASC"#,
174 criteria = CriteriaBuilder::new(query).build()
175 );
176
177 stream! {
178 let mut rows = sqlx::query(&sql).fetch(executor);
179 let mut epoch_id: PgEventId = 0;
180 while let Some(row) = rows.next().await {
181 let row = row?;
182 let event_id: Option<i64> = row.get(0);
183 epoch_id = row.get(2);
184 if let Some(event_id) = event_id {
185 let payload = self.serde.deserialize(row.get(1))?;
186 let payload: QE = payload
187 .try_into()
188 .map_err(|e| Error::QueryEventMapping(Box::new(e)))?;
189 yield Ok(StreamItem::Event(PersistedEvent::new(event_id, payload)));
190 }
191 }
192 yield Ok(StreamItem::End(epoch_id));
193 }
194 .boxed()
195 }
196}
197
198/// Implementation of the event store using PostgreSQL.
199///
200/// This module provides the implementation of the `EventStore` trait for `PgEventStore`,
201/// allowing interaction with a PostgreSQL event store. It enables streaming events based on
202/// a query and appending new events to the event store.
203#[async_trait]
204impl<E, S> EventStore<PgEventId, E> for PgEventStore<E, S>
205where
206 E: Event + Send + Sync,
207 S: Serde<E> + Send + Sync,
208{
209 type Error = Error;
210
211 /// Streams events based on the provided query.
212 ///
213 /// This function fetches events from the PostgreSQL event store based on the provided
214 /// `query`. It constructs a SQL query using the `SqlEventsCriteriaBuilder` and executes
215 /// the query using the `sqlx` crate. The fetched events are then converted into
216 /// `PersistedEvent` instances and streamed as a boxed stream.
217 ///
218 /// # Arguments
219 ///
220 /// * `query` - The stream query specifying the criteria for filtering events.
221 ///
222 /// # Returns
223 ///
224 /// A `Result` containing a boxed stream of `PersistedEvent` that matches the query criteria,
225 /// or an error of type `Self::Error`.
226 fn stream<'a, QE>(
227 &'a self,
228 query: &'a StreamQuery<PgEventId, QE>,
229 ) -> BoxStream<'a, Result<StreamItem<PgEventId, QE>, Self::Error>>
230 where
231 QE: TryFrom<E> + Event + 'static + Clone + Send + Sync,
232 <QE as TryFrom<E>>::Error: StdError + 'static + Send + Sync,
233 {
234 self.stream_with(&self.pool, query)
235 }
236
237 /// Appends new events to the event store.
238 ///
239 /// This function inserts the provided `events` into the PostgreSQL-backed event store.
240 /// Before inserting, it queries the `event` table to ensure that no events have been
241 /// appended since the given `version`. If newer events are found, a concurrency error
242 /// is returned to prevent invalid state transitions.
243 ///
244 /// If the concurrency check succeeds, the events are inserted into the `event` table.
245 ///
246 /// # Arguments
247 ///
248 /// * `events` - The events to append to the event store.
249 /// * `query` - The stream query that identifies the target event stream.
250 /// * `version` - The ID of the last consumed event, used for optimistic concurrency control.
251 ///
252 /// # Returns
253 ///
254 /// A `Result` containing a vector of `PersistedEvent` representing the appended events,
255 /// or an error of type `Self::Error`.
256 async fn append<QE>(
257 &self,
258 events: Vec<E>,
259 query: StreamQuery<PgEventId, QE>,
260 version: PgEventId,
261 ) -> Result<Vec<PersistedEvent<PgEventId, E>>, Self::Error>
262 where
263 E: Clone + 'async_trait,
264 QE: Event + Clone + Send + Sync,
265 {
266 let mut tx = self.pool.begin().await?;
267
268 let persisted_events = self.append_in_tx(&mut tx, events, query, version).await?;
269
270 tx.commit().await.map_err(map_concurrency_err)?;
271
272 Ok(persisted_events)
273 }
274
275 /// Appends a batch of events to the PostgreSQL-backed event store **without** verifying
276 /// whether new events have been added since the last read.
277 ///
278 /// # Arguments
279 ///
280 /// * `events` - A vector of events to be appended.
281 ///
282 /// # Returns
283 ///
284 /// A `Result` containing a vector of `PersistedEvent` representing the appended events,
285 /// or an error of type `Self::Error`.
286 async fn append_without_validation(
287 &self,
288 events: Vec<E>,
289 ) -> Result<Vec<PersistedEvent<PgEventId, E>>, Self::Error>
290 where
291 E: Clone + 'async_trait,
292 {
293 let mut tx = self.pool.begin().await?;
294
295 sqlx::query("SELECT event_store_begin_epoch()")
296 .execute(&mut *tx)
297 .await?;
298
299 let mut insert = InsertEventsBuilder::new(&events, &self.serde);
300 let event_ids: Vec<PgEventId> = insert
301 .build()
302 .fetch_all(&mut *tx)
303 .await?
304 .into_iter()
305 .map(|r| r.get(0))
306 .collect();
307
308 let persisted_events = event_ids
309 .iter()
310 .zip(events)
311 .map(|(event_id, event)| PersistedEvent::new(*event_id, event))
312 .collect::<Vec<_>>();
313
314 tx.commit().await?;
315
316 Ok(persisted_events)
317 }
318}
319
320fn map_concurrency_err(err: sqlx::Error) -> Error {
321 if let sqlx::Error::Database(ref description) = err {
322 if description.code().as_deref() == Some("40001") {
323 return Error::Concurrency;
324 }
325 }
326 Error::Database(err)
327}