use crate::{
outbox::IOutBox,
prelude::{Aggregate, AtomicContextManager, BaseError},
repository::TRepository,
};
use async_trait::async_trait;
use std::{marker::PhantomData, sync::Arc};
use tokio::sync::RwLock;
/// Contract for an executor that a unit of work drives through
/// `begin` / `commit` / `rollback`.
///
/// Implementors must be `Send + Sync`. What the three operations mean
/// concretely is decided by the implementor (presumably a database
/// transaction — confirm per backend).
#[async_trait]
pub trait Executor: Sync + Send {
/// Asynchronously builds a new executor and returns it already wrapped in an
/// `Arc<RwLock<Self>>`.
async fn new() -> Arc<RwLock<Self>>;
/// Begins the executor's unit of work.
///
/// # Errors
/// Returns a `BaseError` when the implementor fails to begin.
async fn begin(&mut self) -> Result<(), BaseError>;
/// Commits the work performed since `begin`.
///
/// # Errors
/// Returns a `BaseError` when the implementor fails to commit.
async fn commit(&mut self) -> Result<(), BaseError>;
/// Rolls back the work performed since `begin`.
///
/// # Errors
/// Returns a `BaseError` when the implementor fails to roll back.
async fn rollback(&mut self) -> Result<(), BaseError>;
}
/// Unit-of-work contract tying together a repository `R`, an executor `E`
/// and an aggregate type `A`.
///
/// `commit` and `rollback` take `self` by value, so a unit of work cannot be
/// used again after either of them has been called.
#[async_trait]
pub trait TUnitOfWork<R, E, A>: Send + Sync
where
R: TRepository<E, A>,
E: Executor,
A: Aggregate,
{
/// Returns an `AtomicContextManager` handle for this unit of work.
fn clone_context(&self) -> AtomicContextManager;
/// Returns a shared handle to the executor used by this unit of work.
fn clone_executor(&self) -> Arc<RwLock<E>>;
/// Asynchronously constructs a unit of work bound to `context`.
///
/// The signature has no error channel, so an implementation that can fail
/// during construction has to panic or defer the failure.
async fn new(context: AtomicContextManager) -> Self;
/// Mutable access to the repository owned by this unit of work.
fn repository(&mut self) -> &mut R;
/// Begins the unit of work.
///
/// # Errors
/// Returns a `BaseError` when beginning fails.
async fn begin(&mut self) -> Result<(), BaseError>;
/// Consumes the unit of work and commits it; `O` selects the outbox
/// implementation used during the commit.
///
/// # Errors
/// Returns a `BaseError` when committing fails.
async fn commit<O: IOutBox<E>>(mut self) -> Result<(), BaseError>;
/// Consumes the unit of work and rolls it back.
///
/// # Errors
/// Returns a `BaseError` when rolling back fails.
async fn rollback(self) -> Result<(), BaseError>;
}
/// Default unit-of-work value: one shared executor, one context handle and
/// the repository that operates through that executor.
///
/// NOTE(review): `#[derive(Clone)]` adds a `Clone` bound for every type
/// parameter, so this type is only `Clone` when `R`, `E` and `A` all are —
/// even though `E` and `A` sit behind `Arc` / `PhantomData` here.
#[derive(Clone)]
pub struct UnitOfWork<R, E, A>
where
R: TRepository<E, A>,
E: Executor,
A: Aggregate,
{
// Shared, lock-guarded executor; the repository is constructed from a clone
// of this same handle.
executor: Arc<RwLock<E>>,
// Context handle; internally notifiable events are pushed into it during
// commit.
context: AtomicContextManager,
// Zero-sized marker so the otherwise unused type parameter `A` is allowed.
_aggregate: PhantomData<A>,
/// Repository through which aggregates are accessed; also the source of the
/// events collected at commit time.
pub repository: R,
}
impl<R, E, A> UnitOfWork<R, E, A>
where
R: TRepository<E, A>,
E: Executor,
A: Aggregate,
{
async fn _commit(&mut self) -> Result<(), BaseError> {
let mut executor = self.executor.write().await;
executor.commit().await
}
async fn _commit_hook<O: IOutBox<E>>(&mut self) -> Result<(), BaseError> {
let event_queue = &mut self.context.write().await;
let mut outboxes = vec![];
for e in self.repository.get_events() {
if e.externally_notifiable() {
outboxes.push(e.outbox());
};
if e.internally_notifiable() {
event_queue.push_back(e.message_clone());
}
}
O::add(self.executor.clone(), outboxes).await
}
}
#[async_trait]
impl<R, E, A> TUnitOfWork<R, E, A> for UnitOfWork<R, E, A>
where
    R: TRepository<E, A>,
    E: Executor,
    A: Aggregate,
{
    /// Returns another handle to the context held by this unit of work.
    fn clone_context(&self) -> AtomicContextManager {
        Arc::clone(&self.context)
    }

    /// Returns another handle to the shared executor.
    fn clone_executor(&self) -> Arc<RwLock<E>> {
        Arc::clone(&self.executor)
    }

    /// Creates the executor, builds the repository on top of it and
    /// immediately begins the unit of work.
    ///
    /// # Panics
    /// Panics if `begin` returns an error, because the signature offers no way
    /// to report it.
    async fn new(context: AtomicContextManager) -> Self {
        let executor = E::new().await;
        // The repository works through the very same executor handle.
        let repository = R::new(Arc::clone(&executor));

        let mut unit = Self {
            executor,
            context,
            _aggregate: PhantomData,
            repository,
        };
        unit.begin().await.unwrap();
        unit
    }

    /// Mutable access to the owned repository.
    fn repository(&mut self) -> &mut R {
        &mut self.repository
    }

    /// Begins the executor while holding its write lock.
    ///
    /// # Errors
    /// Propagates whatever `Executor::begin` returns.
    async fn begin(&mut self) -> Result<(), BaseError> {
        self.executor.write().await.begin().await
    }

    /// Runs the commit hook and, only when it succeeds, commits the executor.
    ///
    /// # Errors
    /// Returns the hook's error without attempting the commit, or otherwise
    /// the error produced by the commit itself.
    async fn commit<O: IOutBox<E>>(mut self) -> Result<(), BaseError> {
        // Events and outboxes are dispatched first; a failure here stops the commit.
        self._commit_hook::<O>().await?;
        self._commit().await
    }

    /// Rolls the executor back while holding its write lock.
    ///
    /// # Errors
    /// Propagates whatever `Executor::rollback` returns.
    async fn rollback(self) -> Result<(), BaseError> {
        let mut guard = self.executor.write().await;
        guard.rollback().await
    }
}