mod append;
mod query;
#[cfg(test)]
mod tests;
use append::InsertEventsBuilder;
use futures::stream::BoxStream;
use query::CriteriaBuilder;
use sqlx::postgres::PgPool;
use sqlx::Row;
use std::error::Error as StdError;
use std::marker::PhantomData;
use crate::{Error, Migrator, PgEventId};
use async_stream::stream;
use async_trait::async_trait;
use disintegrate::EventStore;
use disintegrate::{Event, PersistedEvent};
use disintegrate::{StreamItem, StreamQuery};
use disintegrate_serde::Serde;
use futures::StreamExt;
/// PostgreSQL-backed event store.
///
/// Persists events of type `E` in the connected database, using the
/// configured `serde` implementation to (de)serialize event payloads.
#[derive(Clone)]
pub struct PgEventStore<E, S>
where
    S: Serde<E> + Send + Sync,
{
    // Connection pool; crate-visible so sibling modules (e.g. the Migrator)
    // can reuse the same connections.
    pub(crate) pool: PgPool,
    // Serializer/deserializer applied to every event payload.
    serde: S,
    // Zero-sized marker tying the store to event type `E` without storing one.
    event_type: PhantomData<E>,
}
impl<E, S> PgEventStore<E, S>
where
S: Serde<E> + Send + Sync + Clone,
E: Event + Clone,
{
pub async fn try_new(pool: PgPool, serde: S) -> Result<Self, Error> {
let event_store = Self::new_uninitialized(pool, serde);
Migrator::new(event_store.clone())
.init_event_store()
.await?;
Ok(event_store)
}
pub fn new_uninitialized(pool: PgPool, serde: S) -> Self {
Self {
pool,
serde,
event_type: PhantomData,
}
}
}
impl<E, S> PgEventStore<E, S>
where
    S: Serde<E> + Send + Sync,
    E: Event + Send + Sync,
{
    /// Appends `events` within the caller-provided transaction after
    /// checking for conflicting writes.
    ///
    /// Conflict check: an `EXISTS` probe runs over the `event` table using
    /// the criteria built from `query.change_origin(version)`; any match
    /// means another writer persisted an event the caller's decision did
    /// not observe, so [`Error::Concurrency`] is returned. The transaction
    /// is upgraded to SERIALIZABLE isolation first so the probe and the
    /// insert act atomically (commit may still fail with SQLSTATE 40001;
    /// see `map_concurrency_err`).
    ///
    /// `event_store_begin_epoch()` is a server-side function; NOTE(review):
    /// presumably installed by this crate's migrations to coordinate the
    /// epoch bookkeeping that `stream_with` reads — confirm against the
    /// migration SQL.
    ///
    /// Committing or rolling back `tx` is the caller's responsibility.
    /// Returns the input events paired with their database-assigned ids.
    pub(crate) async fn append_in_tx<QE>(
        &self,
        tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
        events: Vec<E>,
        query: StreamQuery<PgEventId, QE>,
        version: PgEventId,
    ) -> Result<Vec<PersistedEvent<PgEventId, E>>, Error>
    where
        E: Clone,
        QE: Event + Clone + Send + Sync,
    {
        sqlx::query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
            .execute(&mut **tx)
            .await?;
        sqlx::query("SELECT event_store_begin_epoch()")
            .execute(&mut **tx)
            .await?;
        // Conflict probe: any event matching the query past `version`
        // means the caller acted on stale state.
        if sqlx::query_scalar(&format!(
            "SELECT EXISTS (SELECT 1 FROM event WHERE {})",
            CriteriaBuilder::new(&query.change_origin(version)).build()
        ))
        .fetch_one(&mut **tx)
        .await?
        {
            return Err(Error::Concurrency);
        }
        // Bulk insert; the built query returns one row per event with the
        // generated event id in column 0.
        let mut insert = InsertEventsBuilder::new(&events, &self.serde);
        let event_ids: Vec<PgEventId> = insert
            .build()
            .fetch_all(&mut **tx)
            .await?
            .into_iter()
            .map(|r| r.get(0))
            .collect();
        // Pair the returned ids with the input events, preserving order.
        let persisted_events = event_ids
            .iter()
            .zip(events)
            .map(|(event_id, event)| PersistedEvent::new(*event_id, event))
            .collect::<Vec<_>>();
        Ok(persisted_events)
    }

    /// Streams the events selected by `query` from `executor`, terminating
    /// the stream with a [`StreamItem::End`] carrying the epoch id the read
    /// was taken at.
    ///
    /// The SQL wraps `event_store_current_epoch()` in a single-row VALUES
    /// and LEFT JOINs the `event` table against it, so:
    /// - only events with `event_id <= epoch` that match the criteria are
    ///   returned, and
    /// - at least one row is produced even with no matching events (the
    ///   event columns are then NULL), letting the epoch id be read
    ///   unconditionally from column 2.
    ///
    /// Each payload is deserialized to `E` and converted to `QE`;
    /// conversion failures surface as [`Error::QueryEventMapping`].
    pub(crate) fn stream_with<'a, QE, EX>(
        &'a self,
        executor: EX,
        query: &'a StreamQuery<PgEventId, QE>,
    ) -> BoxStream<'a, Result<StreamItem<PgEventId, QE>, Error>>
    where
        EX: sqlx::PgExecutor<'a> + Send + Sync + 'a,
        QE: TryFrom<E> + Event + Clone + Send + Sync + 'static,
        <QE as TryFrom<E>>::Error: StdError + Send + Sync + 'static,
    {
        // NOTE: the raw string's internal line layout is part of the SQL
        // text and is kept exactly as-is.
        let sql = format!(
            r#"SELECT event.event_id, event.payload, epoch.__epoch_id
FROM (values (event_store_current_epoch())) AS epoch(__epoch_id)
LEFT JOIN event ON event.event_id <= epoch.__epoch_id AND ({criteria})
ORDER BY event_id ASC"#,
            criteria = CriteriaBuilder::new(query).build()
        );
        stream! {
            let mut rows = sqlx::query(&sql).fetch(executor);
            // Fallback used only if the query yields no rows at all;
            // NOTE(review): the LEFT JOIN should always yield one row, so
            // this default is not expected to be observed.
            let mut epoch_id: PgEventId = 0;
            while let Some(row) = rows.next().await {
                let row = row?;
                // NULL event_id means the LEFT JOIN matched no event.
                let event_id: Option<i64> = row.get(0);
                epoch_id = row.get(2);
                if let Some(event_id) = event_id {
                    let payload = self.serde.deserialize(row.get(1))?;
                    let payload: QE = payload
                        .try_into()
                        .map_err(|e| Error::QueryEventMapping(Box::new(e)))?;
                    yield Ok(StreamItem::Event(PersistedEvent::new(event_id, payload)));
                }
            }
            // Signal completion along with the epoch snapshot of this read.
            yield Ok(StreamItem::End(epoch_id));
        }
        .boxed()
    }
}
#[async_trait]
impl<E, S> EventStore<PgEventId, E> for PgEventStore<E, S>
where
    E: Event + Send + Sync,
    S: Serde<E> + Send + Sync,
{
    type Error = Error;

    /// Streams events matching `query`, reading through the store's
    /// connection pool. Delegates to [`PgEventStore::stream_with`].
    fn stream<'a, QE>(
        &'a self,
        query: &'a StreamQuery<PgEventId, QE>,
    ) -> BoxStream<'a, Result<StreamItem<PgEventId, QE>, Self::Error>>
    where
        QE: TryFrom<E> + Event + 'static + Clone + Send + Sync,
        <QE as TryFrom<E>>::Error: StdError + 'static + Send + Sync,
    {
        self.stream_with(&self.pool, query)
    }

    /// Appends `events` with optimistic concurrency control: fails with
    /// [`Error::Concurrency`] if events matching `query` were written
    /// after `version`.
    ///
    /// Runs [`PgEventStore::append_in_tx`] in a fresh transaction; a
    /// commit-time serialization failure (SQLSTATE 40001) is also mapped
    /// to [`Error::Concurrency`] so callers can handle both cases
    /// uniformly (e.g. by retrying).
    async fn append<QE>(
        &self,
        events: Vec<E>,
        query: StreamQuery<PgEventId, QE>,
        version: PgEventId,
    ) -> Result<Vec<PersistedEvent<PgEventId, E>>, Self::Error>
    where
        E: Clone + 'async_trait,
        QE: Event + Clone + Send + Sync,
    {
        let mut tx = self.pool.begin().await?;
        let persisted_events = self.append_in_tx(&mut tx, events, query, version).await?;
        tx.commit().await.map_err(map_concurrency_err)?;
        Ok(persisted_events)
    }

    /// Appends `events` without any conflict detection.
    ///
    /// Unlike `append`, no SERIALIZABLE isolation is requested and no
    /// conflict probe runs, so concurrent writers are not detected and
    /// commit errors are not mapped to [`Error::Concurrency`].
    async fn append_without_validation(
        &self,
        events: Vec<E>,
    ) -> Result<Vec<PersistedEvent<PgEventId, E>>, Self::Error>
    where
        E: Clone + 'async_trait,
    {
        let mut tx = self.pool.begin().await?;
        // Server-side epoch bookkeeping; NOTE(review): presumably the same
        // function used in `append_in_tx` — confirm against the migrations.
        sqlx::query("SELECT event_store_begin_epoch()")
            .execute(&mut *tx)
            .await?;
        // Same insert-and-collect-ids pattern as `append_in_tx`.
        let mut insert = InsertEventsBuilder::new(&events, &self.serde);
        let event_ids: Vec<PgEventId> = insert
            .build()
            .fetch_all(&mut *tx)
            .await?
            .into_iter()
            .map(|r| r.get(0))
            .collect();
        let persisted_events = event_ids
            .iter()
            .zip(events)
            .map(|(event_id, event)| PersistedEvent::new(*event_id, event))
            .collect::<Vec<_>>();
        tx.commit().await?;
        Ok(persisted_events)
    }
}
fn map_concurrency_err(err: sqlx::Error) -> Error {
if let sqlx::Error::Database(ref description) = err {
if description.code().as_deref() == Some("40001") {
return Error::Concurrency;
}
}
Error::Database(err)
}