use crate::{
EventSourcingDbError, PostgresTransaction, SideEffectStorage, encryption::EncryptionProvider,
pickle::Pickle, reader_impl, table_config::TableConfig,
};
use async_trait::async_trait;
use eventastic::{
aggregate::{Aggregate, Context, SideEffect},
event::{DomainEvent, EventStoreEvent},
repository::{Repository, RepositoryError, RepositoryReader, Snapshot},
};
use sqlx::{
Pool, Postgres,
postgres::{PgConnectOptions, PgPoolOptions},
types::Uuid,
};
use std::marker::PhantomData;
#[derive(Clone)]
pub struct PostgresRepository<T, O, E> {
inner: Pool<Postgres>,
outbox: O,
table_config: TableConfig,
encryption_provider: E,
phantom_aggregate: std::marker::PhantomData<T>,
}
impl<T, O, E> PostgresRepository<T, O, E> {
pub async fn new(
connect_options: PgConnectOptions,
pool_options: PgPoolOptions,
table_config: TableConfig,
outbox: O,
encryption_provider: E,
) -> Result<Self, sqlx::Error> {
let pool = pool_options.connect_with(connect_options).await?;
Ok(Self {
inner: pool,
outbox,
table_config,
encryption_provider,
phantom_aggregate: PhantomData,
})
}
pub async fn begin_transaction(&self) -> Result<PostgresTransaction<'_, T, O, E>, sqlx::Error> {
Ok(PostgresTransaction {
inner: self.inner.begin().await?,
outbox: &self.outbox,
table_config: &self.table_config,
encryption_provider: &self.encryption_provider,
phantom_aggregate: PhantomData,
})
}
pub fn transaction_from<'a>(
&'a self,
transaction: sqlx::Transaction<'a, Postgres>,
) -> PostgresTransaction<'a, T, O, E> {
PostgresTransaction {
inner: transaction,
outbox: &self.outbox,
table_config: &self.table_config,
encryption_provider: &self.encryption_provider,
phantom_aggregate: PhantomData,
}
}
pub async fn run_migrations(&self) -> Result<(), sqlx::Error> {
sqlx::migrate!("./migrations").run(&self.inner).await?;
Ok(())
}
}
#[async_trait]
impl<T, O, E> RepositoryReader<T> for PostgresRepository<T, O, E>
where
T: Aggregate<AggregateId = Uuid> + Pickle + Send + Sync + 'static,
T::DomainEvent: DomainEvent<EventId = Uuid> + Pickle + Send + Sync,
T::SideEffect: SideEffect<SideEffectId = Uuid> + Pickle + Send + Sync,
T::ApplyError: Send + Sync,
O: SideEffectStorage<E::Error, T::SideEffect> + Clone + Send + Sync,
E: EncryptionProvider + Clone + Send + Sync,
{
type DbError = EventSourcingDbError<E, T>;
fn stream_from(
&mut self,
id: &T::AggregateId,
version: u64,
) -> impl futures::Stream<
Item = std::result::Result<
eventastic::event::EventStoreEvent<
<T as eventastic::aggregate::Aggregate>::DomainEvent,
>,
Self::DbError,
>,
> {
let query = &self.table_config.stream_events_query;
Box::pin(reader_impl::stream_from::<_, T, E>(
&self.inner,
id,
version,
query.clone(),
&self.encryption_provider,
))
}
async fn get_event(
&mut self,
aggregate_id: &T::AggregateId,
event_id: &<<T as Aggregate>::DomainEvent as DomainEvent>::EventId,
) -> Result<Option<EventStoreEvent<<T as Aggregate>::DomainEvent>>, Self::DbError> {
let query = &self.table_config.get_event_query;
reader_impl::get_event::<_, T, E>(
&self.inner,
aggregate_id,
event_id,
query,
&self.encryption_provider,
)
.await
}
async fn get_snapshot(
&mut self,
id: &T::AggregateId,
) -> Result<Option<Snapshot<T>>, Self::DbError> {
let query = &self.table_config.get_snapshot_query;
reader_impl::get_snapshot::<_, T, E>(&self.inner, id, query, &self.encryption_provider)
.await
}
}
#[async_trait]
impl<T, O, E> Repository<T> for PostgresRepository<T, O, E>
where
T: Aggregate<AggregateId = Uuid> + Pickle + Send + Sync + 'static,
T::DomainEvent: DomainEvent<EventId = Uuid> + Pickle + Send + Sync,
T::SideEffect: eventastic::aggregate::SideEffect<SideEffectId = Uuid> + Pickle + Send + Sync,
T::ApplyError: Send + Sync,
O: SideEffectStorage<E::Error, T::SideEffect> + Clone + Send + Sync,
E: EncryptionProvider + Clone + Send + Sync,
{
type Error = RepositoryError<
T::ApplyError,
<<T as Aggregate>::DomainEvent as DomainEvent>::EventId,
EventSourcingDbError<E, T>,
>;
async fn load(&self, aggregate_id: &T::AggregateId) -> Result<Context<T>, Self::Error> {
let mut repo_ref = self.clone();
Context::load(&mut repo_ref, aggregate_id).await
}
}