use std::marker::PhantomData;
use std::sync::Arc;
use sqlx::{PgConnection, Pool, Postgres};
use tokio::sync::RwLock;
use crate::bus::EventBus;
use crate::handler::{EventHandler, TransactionalEventHandler};
use crate::sql::migrations::{Migrations, MigrationsHandler};
use crate::sql::statements::{Statements, StatementsHandler};
use crate::store::postgres::{InnerPgStore, PgStoreError};
use crate::Aggregate;
use super::persistable::Persistable;
use super::{PgStore, Schema};
/// UUID format selector stored by the store builder as the event id format.
///
/// The type is a plain fieldless enum, so it derives the usual value-type
/// traits: it can be debug-printed, freely copied and compared for equality.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UuidFormat {
    /// UUID version 4.
    V4,
    /// UUID version 7.
    V7,
}
/// Builder that gathers everything needed to create a [`PgStore`].
///
/// `A` is the aggregate the store is built for. `S` is a type-level marker
/// only (it is held as [`PhantomData`], never as a value) and defaults to the
/// aggregate's own event type.
pub struct PgStoreBuilder<A: Aggregate, S = <A as Aggregate>::Event> {
    /// Postgres connection pool handed over to the built store.
    pool: Pool<Postgres>,
    /// SQL statements prepared for the aggregate `A`.
    statements: Statements,
    /// Event handlers registered so far.
    event_handlers: Vec<Box<dyn EventHandler<A> + Send>>,
    /// Transactional event handlers registered so far.
    transactional_event_handlers:
        Vec<Box<dyn TransactionalEventHandler<A, PgStoreError, PgConnection> + Send>>,
    /// Event buses registered so far.
    event_buses: Vec<Box<dyn EventBus<A> + Send>>,
    /// UUID format forwarded to the built store as its event id format.
    event_id_format: UuidFormat,
    /// Whether migrations are run when the store is built.
    run_migrations: bool,
    /// Zero-sized marker carrying the schema type `S`.
    _schema: PhantomData<S>,
}
impl<A> PgStoreBuilder<A, <A as Aggregate>::Event>
where
    A: Aggregate,
{
    /// Creates a builder with default settings on top of the given `pool`.
    ///
    /// Defaults: no event handlers, no transactional event handlers, no event
    /// buses, [`UuidFormat::V4`] as event id format, migrations enabled, and
    /// the aggregate's own event type as schema marker.
    pub fn new(pool: Pool<Postgres>) -> Self {
        // Statements are derived from the aggregate type alone.
        let statements = Statements::new::<A>();

        Self {
            pool,
            statements,
            event_handlers: Vec::new(),
            transactional_event_handlers: Vec::new(),
            event_buses: Vec::new(),
            event_id_format: UuidFormat::V4,
            run_migrations: true,
            _schema: PhantomData,
        }
    }
}
impl<A, S> PgStoreBuilder<A, S>
where
    A: Aggregate,
{
    /// Replaces the whole list of event handlers with `event_handlers`.
    ///
    /// Handlers registered before this call are discarded.
    pub fn with_event_handlers(self, event_handlers: Vec<Box<dyn EventHandler<A> + Send>>) -> Self {
        Self {
            event_handlers,
            ..self
        }
    }

    /// Appends a single event handler to the ones already registered.
    pub fn add_event_handler(mut self, event_handler: impl EventHandler<A> + Send + 'static) -> Self {
        let boxed: Box<dyn EventHandler<A> + Send> = Box::new(event_handler);
        self.event_handlers.push(boxed);
        self
    }

    /// Replaces the whole list of transactional event handlers with
    /// `transactional_event_handlers`.
    ///
    /// Handlers registered before this call are discarded.
    pub fn with_transactional_event_handlers(
        self,
        transactional_event_handlers: Vec<Box<dyn TransactionalEventHandler<A, PgStoreError, PgConnection> + Send>>,
    ) -> Self {
        Self {
            transactional_event_handlers,
            ..self
        }
    }

    /// Appends a single transactional event handler to the ones already
    /// registered.
    pub fn add_transactional_event_handler(
        mut self,
        transaction_event_handler: impl TransactionalEventHandler<A, PgStoreError, PgConnection> + Send + 'static,
    ) -> Self {
        let boxed: Box<dyn TransactionalEventHandler<A, PgStoreError, PgConnection> + Send> =
            Box::new(transaction_event_handler);
        self.transactional_event_handlers.push(boxed);
        self
    }

    /// Replaces the whole list of event buses with `event_buses`.
    ///
    /// Buses registered before this call are discarded.
    pub fn with_event_buses(self, event_buses: Vec<Box<dyn EventBus<A> + Send>>) -> Self {
        Self { event_buses, ..self }
    }

    /// Appends a single event bus to the ones already registered.
    pub fn add_event_bus(mut self, event_bus: impl EventBus<A> + Send + 'static) -> Self {
        let boxed: Box<dyn EventBus<A> + Send> = Box::new(event_bus);
        self.event_buses.push(boxed);
        self
    }

    /// Disables the migrations step that [`Self::try_build`] runs by default.
    pub fn without_running_migrations(self) -> Self {
        Self {
            run_migrations: false,
            ..self
        }
    }

    /// Switches the schema marker type to `N`.
    ///
    /// Every other setting collected so far is carried over unchanged; only
    /// the second type parameter of the builder changes.
    pub fn with_schema<N>(self) -> PgStoreBuilder<A, N>
    where
        N: Schema<A::Event> + Persistable + Send + Sync,
    {
        // The previous marker is tied to `S`, so it is dropped and a fresh one
        // (inferred as `PhantomData<N>`) takes its place.
        let Self {
            pool,
            statements,
            event_handlers,
            transactional_event_handlers,
            event_buses,
            event_id_format,
            run_migrations,
            _schema: _,
        } = self;

        PgStoreBuilder {
            pool,
            statements,
            run_migrations,
            event_handlers,
            transactional_event_handlers,
            event_buses,
            event_id_format,
            _schema: PhantomData,
        }
    }

    /// Sets the UUID format that the built store receives as its event id
    /// format.
    pub fn with_event_id_format(self, event_id_format: UuidFormat) -> Self {
        Self {
            event_id_format,
            ..self
        }
    }

    /// Consumes the builder and produces the [`PgStore`].
    ///
    /// Unless [`Self::without_running_migrations`] was called, migrations for
    /// the aggregate `A` are run against the pool first.
    ///
    /// # Errors
    ///
    /// Returns the [`sqlx::Error`] raised by the migrations step, if any.
    pub async fn try_build(self) -> Result<PgStore<A, S>, sqlx::Error> {
        let Self {
            pool,
            statements,
            event_handlers,
            transactional_event_handlers,
            event_buses,
            event_id_format,
            run_migrations,
            _schema: schema_marker,
        } = self;

        if run_migrations {
            Migrations::run::<A>(&pool).await?;
        }

        let inner = InnerPgStore {
            pool,
            statements,
            // Event handlers are the only collection wrapped in a lock.
            event_handlers: RwLock::new(event_handlers),
            transactional_event_handlers,
            event_buses,
            event_id_format,
        };

        Ok(PgStore {
            inner: Arc::new(inner),
            _schema: schema_marker,
        })
    }
}