// disintegrate_postgres/event_store.rs
1//! PostgreSQL Event Store
2//!
3//! This module provides an implementation of the `EventStore` trait using PostgreSQL as the underlying storage.
4//! It allows storing and retrieving events from a PostgreSQL database.
5mod append;
6mod query;
7#[cfg(test)]
8mod tests;
9
10use append::InsertEventsBuilder;
11use futures::stream::BoxStream;
12use query::CriteriaBuilder;
13use sqlx::postgres::PgPool;
14use sqlx::Row;
15use std::error::Error as StdError;
16
17use std::marker::PhantomData;
18
19use crate::{Error, Migrator, PgEventId};
20use async_stream::stream;
21use async_trait::async_trait;
22use disintegrate::EventStore;
23use disintegrate::{Event, PersistedEvent};
24use disintegrate::{StreamItem, StreamQuery};
25use disintegrate_serde::Serde;
26
27use futures::StreamExt;
28
29/// PostgreSQL event store implementation.
30#[derive(Clone)]
31pub struct PgEventStore<E, S>
32where
33 S: Serde<E> + Send + Sync,
34{
35 pub(crate) pool: PgPool,
36 serde: S,
37 event_type: PhantomData<E>,
38}
39
40impl<E, S> PgEventStore<E, S>
41where
42 S: Serde<E> + Send + Sync + Clone,
43 E: Event + Clone,
44{
45 /// Initializes the PostgreSQL DB and returns a new instance of `PgEventStore`.
46 ///
47 /// # Arguments
48 ///
49 /// * `pool` - The PostgreSQL connection pool.
50 /// * `serde` - The serialization implementation for the event payload.
51 pub async fn try_new(pool: PgPool, serde: S) -> Result<Self, Error> {
52 let event_store = Self::new_uninitialized(pool, serde);
53 Migrator::new(event_store.clone())
54 .init_event_store()
55 .await?;
56 Ok(event_store)
57 }
58 /// Creates a new instance of `PgEventStore`.
59 ///
60 /// This constructor does not initialize the database or add the
61 /// `domain_id` columns necessary for `disintegrate` to function properly.
62 /// If you need to initialize the database, use `PgEventStore::new` instead.
63 ///
64 /// If you plan to use this constructor, ensure that the `disintegrate` is
65 /// properly initialized. Refer to the SQL files in the "event_store/sql" directory
66 /// to recreate the default structure. Additionally, all `domain_id` columns
67 /// and their corresponding indexes must be created manually.
68 ///
69 /// # Arguments
70 ///
71 /// * `pool` - The PostgreSQL connection pool.
72 /// * `serde` - The serialization implementation for the event payload.
73 pub fn new_uninitialized(pool: PgPool, serde: S) -> Self {
74 Self {
75 pool,
76 serde,
77 event_type: PhantomData,
78 }
79 }
80}
81
impl<E, S> PgEventStore<E, S>
where
    S: Serde<E> + Send + Sync,
    E: Event + Send + Sync,
{
    /// Streams events based on the provided query and executor.
    ///
    /// Builds a single SQL query that both selects the matching events and
    /// computes the current "epoch": the highest `event_id` present when the
    /// query runs (`0` for an empty table). The epoch bounds the scan
    /// (`event_id <= epoch`) so the stream reflects a consistent snapshot,
    /// and it is emitted as the final `StreamItem::End` item.
    ///
    /// # Arguments
    ///
    /// * `executor` - The sqlx executor to use for database operations.
    /// * `query` - The stream query specifying the criteria for filtering events.
    ///
    /// # Returns
    ///
    /// A boxed stream yielding `StreamItem::Event` for each matching event in
    /// ascending `event_id` order, terminated by a single `StreamItem::End`
    /// carrying the epoch id. Failures are yielded in-stream as `Err(Error)`.
    pub(crate) fn stream_with<'a, QE, EX>(
        &'a self,
        executor: EX,
        query: &'a StreamQuery<PgEventId, QE>,
    ) -> BoxStream<'a, Result<StreamItem<PgEventId, QE>, Error>>
    where
        EX: sqlx::PgExecutor<'a> + Send + Sync + 'a,
        QE: TryFrom<E> + Event + Clone + Send + Sync + 'static,
        <QE as TryFrom<E>>::Error: StdError + Send + Sync + 'static,
    {
        // LEFT JOIN guarantees at least one row even when no event matches,
        // so the epoch id is always observed. `CriteriaBuilder` renders the
        // query's filter criteria as the SQL predicate.
        let sql = format!(
            r#"SELECT event.event_id, event.payload, epoch.__epoch_id
            FROM (SELECT COALESCE(MAX(event_id),0) AS __epoch_id FROM event) AS epoch
            LEFT JOIN event ON event.event_id <= epoch.__epoch_id AND ({criteria})
            ORDER BY event_id ASC"#,
            criteria = CriteriaBuilder::new(query).build()
        );

        stream! {
            let mut rows = sqlx::query(&sql).fetch(executor);
            let mut epoch_id: PgEventId = 0;
            while let Some(row) = rows.next().await {
                let row = row?;
                // NULL when the LEFT JOIN matched no event on this row
                // (empty result set: the single epoch-only row).
                let event_id: Option<i64> = row.get(0);
                epoch_id = row.get(2);
                if let Some(event_id) = event_id {
                    let payload = self.serde.deserialize(row.get(1))?;
                    // Convert the stored event type E into the queried type QE;
                    // a mismatch is surfaced as Error::QueryEventMapping.
                    let payload: QE = payload
                        .try_into()
                        .map_err(|e| Error::QueryEventMapping(Box::new(e)))?;
                    yield Ok(StreamItem::Event(PersistedEvent::new(event_id, payload)));
                }
            }
            // Always close the stream with the epoch observed by this query.
            yield Ok(StreamItem::End(epoch_id));
        }
        .boxed()
    }
}
136
137/// Implementation of the event store using PostgreSQL.
138///
139/// This module provides the implementation of the `EventStore` trait for `PgEventStore`,
140/// allowing interaction with a PostgreSQL event store. It enables streaming events based on
141/// a query and appending new events to the event store.
142#[async_trait]
143impl<E, S> EventStore<PgEventId, E> for PgEventStore<E, S>
144where
145 E: Event + Send + Sync,
146 S: Serde<E> + Send + Sync,
147{
148 type Error = Error;
149
150 /// Streams events based on the provided query.
151 ///
152 /// This function fetches events from the PostgreSQL event store based on the provided
153 /// `query`. It constructs a SQL query using the `SqlEventsCriteriaBuilder` and executes
154 /// the query using the `sqlx` crate. The fetched events are then converted into
155 /// `PersistedEvent` instances and streamed as a boxed stream.
156 ///
157 /// # Arguments
158 ///
159 /// * `query` - The stream query specifying the criteria for filtering events.
160 ///
161 /// # Returns
162 ///
163 /// A `Result` containing a boxed stream of `PersistedEvent` that matches the query criteria,
164 /// or an error of type `Self::Error`.
165 fn stream<'a, QE>(
166 &'a self,
167 query: &'a StreamQuery<PgEventId, QE>,
168 ) -> BoxStream<'a, Result<StreamItem<PgEventId, QE>, Self::Error>>
169 where
170 QE: TryFrom<E> + Event + 'static + Clone + Send + Sync,
171 <QE as TryFrom<E>>::Error: StdError + 'static + Send + Sync,
172 {
173 self.stream_with(&self.pool, query)
174 }
175
176 /// Appends new events to the event store.
177 ///
178 /// This function inserts the provided `events` into the PostgreSQL-backed event store.
179 /// Before inserting, it queries the `event` table to ensure that no events have been
180 /// appended since the given `version`. If newer events are found, a concurrency error
181 /// is returned to prevent invalid state transitions.
182 ///
183 /// If the concurrency check succeeds, the events are inserted into the `event` table.
184 ///
185 /// # Arguments
186 ///
187 /// * `events` - The events to append to the event store.
188 /// * `query` - The stream query that identifies the target event stream.
189 /// * `version` - The ID of the last consumed event, used for optimistic concurrency control.
190 ///
191 /// # Returns
192 ///
193 /// A `Result` containing a vector of `PersistedEvent` representing the appended events,
194 /// or an error of type `Self::Error`.
195 async fn append<QE>(
196 &self,
197 events: Vec<E>,
198 query: StreamQuery<PgEventId, QE>,
199 version: PgEventId,
200 ) -> Result<Vec<PersistedEvent<PgEventId, E>>, Self::Error>
201 where
202 E: Clone + 'async_trait,
203 QE: Event + Clone + Send + Sync,
204 {
205 let mut tx = self.pool.begin().await?;
206 sqlx::query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
207 .execute(&mut *tx)
208 .await?;
209
210 if sqlx::query_scalar(&format!(
211 "SELECT EXISTS (SELECT 1 FROM event WHERE {})",
212 CriteriaBuilder::new(&query.change_origin(version)).build()
213 ))
214 .fetch_one(&mut *tx)
215 .await?
216 {
217 return Err(Error::Concurrency);
218 }
219
220 let mut insert = InsertEventsBuilder::new(&events, &self.serde);
221 let event_ids: Vec<PgEventId> = insert
222 .build()
223 .fetch_all(&mut *tx)
224 .await?
225 .into_iter()
226 .map(|r| r.get(0))
227 .collect();
228
229 let persisted_events = event_ids
230 .iter()
231 .zip(events)
232 .map(|(event_id, event)| PersistedEvent::new(*event_id, event))
233 .collect::<Vec<_>>();
234
235 tx.commit().await.map_err(map_concurrency_err)?;
236
237 Ok(persisted_events)
238 }
239
240 /// Appends a batch of events to the PostgreSQL-backed event store **without** verifying
241 /// whether new events have been added since the last read.
242 ///
243 /// # Arguments
244 ///
245 /// * `events` - A vector of events to be appended.
246 ///
247 /// # Returns
248 ///
249 /// A `Result` containing a vector of `PersistedEvent` representing the appended events,
250 /// or an error of type `Self::Error`.
251 async fn append_without_validation(
252 &self,
253 events: Vec<E>,
254 ) -> Result<Vec<PersistedEvent<PgEventId, E>>, Self::Error>
255 where
256 E: Clone + 'async_trait,
257 {
258 let mut tx = self.pool.begin().await?;
259
260 let mut insert = InsertEventsBuilder::new(&events, &self.serde);
261 let event_ids: Vec<PgEventId> = insert
262 .build()
263 .fetch_all(&mut *tx)
264 .await?
265 .into_iter()
266 .map(|r| r.get(0))
267 .collect();
268
269 let persisted_events = event_ids
270 .iter()
271 .zip(events)
272 .map(|(event_id, event)| PersistedEvent::new(*event_id, event))
273 .collect::<Vec<_>>();
274
275 tx.commit().await?;
276
277 Ok(persisted_events)
278 }
279}
280
281fn map_concurrency_err(err: sqlx::Error) -> Error {
282 if let sqlx::Error::Database(ref description) = err {
283 if description.code().as_deref() == Some("40001") {
284 return Error::Concurrency;
285 }
286 }
287 Error::Database(err)
288}