use std::marker::PhantomData;
use async_trait::async_trait;
use futures::StreamExt;
use sqlx::{PgConnection, Pool, Postgres, Transaction};
use uuid::Uuid;
use crate::bus::EventBus;
use crate::handler::{ReplayableEventHandler, TransactionalEventHandler};
use crate::rebuilder::Rebuilder;
use crate::store::postgres::persistable::Persistable;
use crate::store::postgres::{PgStore, PgStoreBuilder, PgStoreError, Schema};
use crate::store::{EventStore, StoreEvent};
use crate::Aggregate;
/// Rebuilder that replays the events of aggregate `A` stored in Postgres through
/// the configured handlers and buses.
///
/// The second type parameter selects the persisted schema used when building the
/// store; it defaults to the aggregate's own event type.
pub struct PgRebuilder<A, Schema = <A as Aggregate>::Event>
where
    A: Aggregate,
{
    /// Handlers invoked outside of any database transaction, after the
    /// transactional handlers have been committed.
    event_handlers: Vec<Box<dyn ReplayableEventHandler<A> + Send>>,
    /// Handlers invoked with the rebuild transaction's connection.
    transactional_event_handlers: Vec<
        Box<dyn TransactionalEventHandler<A, PgStoreError, PgConnection> + Send>,
    >,
    /// Buses on which the replayed events are published.
    event_buses: Vec<Box<dyn EventBus<A> + Send>>,
    /// Zero-sized marker carrying the schema type; holds no data.
    _schema: PhantomData<Schema>,
}
impl<A> PgRebuilder<A>
where
    A: Aggregate,
{
    /// Creates a rebuilder with no event handlers, no transactional event
    /// handlers and no event buses configured.
    pub fn new() -> Self {
        Self::default()
    }

    /// Replaces the list of (non-transactional) event handlers and returns the
    /// updated rebuilder.
    pub fn with_event_handlers(
        mut self,
        event_handlers: Vec<Box<dyn ReplayableEventHandler<A> + Send>>,
    ) -> Self {
        self.event_handlers = event_handlers;
        self
    }

    /// Replaces the list of transactional event handlers and returns the
    /// updated rebuilder.
    pub fn with_transactional_event_handlers(
        mut self,
        transactional_event_handlers: Vec<
            Box<dyn TransactionalEventHandler<A, PgStoreError, PgConnection> + Send>,
        >,
    ) -> Self {
        self.transactional_event_handlers = transactional_event_handlers;
        self
    }

    /// Replaces the list of event buses and returns the updated rebuilder.
    pub fn with_event_buses(mut self, event_buses: Vec<Box<dyn EventBus<A> + Send>>) -> Self {
        self.event_buses = event_buses;
        self
    }
}
impl<A> Default for PgRebuilder<A>
where
    A: Aggregate,
{
    /// Builds an empty rebuilder: every handler and bus list starts out empty.
    fn default() -> Self {
        let _schema = PhantomData;
        Self {
            event_handlers: Vec::new(),
            transactional_event_handlers: Vec::new(),
            event_buses: Vec::new(),
            _schema,
        }
    }
}
#[async_trait]
impl<A, S> Rebuilder<A> for PgRebuilder<A, S>
where
    A: Aggregate,
    A::State: Send,
    A::Event: Send + Sync,
    S: Schema<A::Event> + Persistable + Send + Sync,
{
    type Executor = Pool<Postgres>;
    type Error = PgStoreError;

    /// Replays the stored events one aggregate at a time.
    ///
    /// For every distinct aggregate id found in the store table:
    /// 1. a transaction is opened; each transactional handler first gets a
    ///    `delete` call for that id and is then fed the aggregate's events;
    /// 2. the transaction is committed;
    /// 3. each non-transactional handler gets a `delete` call followed by the
    ///    aggregate's events;
    /// 4. every event is published on every configured bus.
    ///
    /// # Errors
    ///
    /// Returns an error if the store cannot be built, if the aggregate ids or
    /// events cannot be loaded, if a transaction cannot be opened or committed,
    /// or if a transactional handler fails. On error the open transaction is
    /// dropped without being committed; aggregates processed earlier stay
    /// committed.
    async fn by_aggregate_id(&self, pool: Pool<Postgres>) -> Result<(), Self::Error> {
        let store: PgStore<A, _> = PgStoreBuilder::new(pool.clone())
            .without_running_migrations()
            .with_schema::<S>()
            .try_build()
            .await?;

        let aggregate_ids: Vec<Uuid> = get_all_aggregate_ids(&pool, store.table_name()).await?;

        for id in aggregate_ids {
            // Database failures are propagated to the caller instead of panicking.
            let mut transaction: Transaction<Postgres> = pool.begin().await?;
            let events = store.by_aggregate_id(id).await?;

            for handler in self.transactional_event_handlers.iter() {
                handler.delete(id, &mut transaction).await?;
                for event in &events {
                    handler.handle(event, &mut transaction).await?;
                }
            }

            transaction.commit().await?;

            for handler in self.event_handlers.iter() {
                handler.delete(id).await;
                for event in &events {
                    handler.handle(event).await;
                }
            }

            for bus in self.event_buses.iter() {
                for event in &events {
                    bus.publish(event).await;
                }
            }
        }

        Ok(())
    }

    /// Replays every stored event in a single pass.
    ///
    /// All events are streamed from the store into memory inside one
    /// transaction. Each transactional handler then receives one `delete` call
    /// per distinct aggregate id followed by every event, and the transaction is
    /// committed. Afterwards the non-transactional handlers are driven the same
    /// way and, finally, every event is published exactly once on every bus.
    ///
    /// `delete` is issued once per aggregate *before* any event is handled:
    /// issuing it between two events of the same aggregate would (presumably)
    /// wipe what the earlier event had just rebuilt.
    ///
    /// # Errors
    ///
    /// Returns an error if the store cannot be built, if the transaction cannot
    /// be opened or committed, if reading the event stream fails, or if a
    /// transactional handler fails. On error the transaction is dropped without
    /// being committed.
    async fn all_at_once(&self, pool: Pool<Postgres>) -> Result<(), Self::Error> {
        let store: PgStore<A, _> = PgStoreBuilder::new(pool.clone())
            .with_schema::<S>()
            .without_running_migrations()
            .try_build()
            .await?;

        let mut transaction: Transaction<Postgres> = pool.begin().await?;

        let events: Vec<StoreEvent<A::Event>> = store
            .stream_events(&mut *transaction)
            .collect::<Vec<Result<StoreEvent<A::Event>, Self::Error>>>()
            .await
            .into_iter()
            .collect::<Result<Vec<StoreEvent<A::Event>>, Self::Error>>()?;

        // Distinct aggregate ids, in order of first appearance in the stream.
        let aggregate_ids: Vec<Uuid> = {
            let mut seen = std::collections::HashSet::new();
            events
                .iter()
                .map(|event| event.aggregate_id)
                .filter(|aggregate_id| seen.insert(*aggregate_id))
                .collect()
        };

        for handler in self.transactional_event_handlers.iter() {
            for aggregate_id in &aggregate_ids {
                handler.delete(*aggregate_id, &mut transaction).await?;
            }
            for event in &events {
                handler.handle(event, &mut transaction).await?;
            }
        }

        transaction.commit().await?;

        for handler in self.event_handlers.iter() {
            for aggregate_id in &aggregate_ids {
                handler.delete(*aggregate_id).await;
            }
            for event in &events {
                handler.handle(event).await;
            }
        }

        // Each event is published once per bus.
        for bus in self.event_buses.iter() {
            for event in &events {
                bus.publish(event).await;
            }
        }

        Ok(())
    }
}
/// Loads the distinct `aggregate_id` values present in `store_table_name`.
///
/// The table name is interpolated directly into the SQL text, so it must be an
/// identifier produced by the application itself.
///
/// # Errors
///
/// Returns the `sqlx::Error` produced while running the query or decoding rows.
async fn get_all_aggregate_ids(pool: &Pool<Postgres>, store_table_name: &str) -> Result<Vec<Uuid>, sqlx::Error> {
    let sql = format!("SELECT DISTINCT(aggregate_id) FROM {}", store_table_name);

    let rows = sqlx::query_as::<_, (Uuid,)>(sql.as_str())
        .fetch_all(pool)
        .await?;

    // Unwrap each single-column row into its bare id.
    let mut aggregate_ids = Vec::with_capacity(rows.len());
    for (aggregate_id,) in rows {
        aggregate_ids.push(aggregate_id);
    }
    Ok(aggregate_ids)
}