event_driven_core/
unit_of_work.rs1use crate::{
60 outbox::IOutBox,
61 prelude::{Aggregate, AtomicContextManager, BaseError},
62 repository::TRepository,
63};
64use async_trait::async_trait;
65use std::{marker::PhantomData, sync::Arc};
66use tokio::sync::RwLock;
67
68#[async_trait]
71pub trait Executor: Sync + Send {
72 async fn new() -> Arc<RwLock<Self>>;
73 async fn begin(&mut self) -> Result<(), BaseError>;
74 async fn commit(&mut self) -> Result<(), BaseError>;
75 async fn rollback(&mut self) -> Result<(), BaseError>;
76}
77
78#[async_trait]
79pub trait TUnitOfWork<R, E, A>: Send + Sync
80where
81 R: TRepository<E, A>,
82 E: Executor,
83 A: Aggregate,
84{
85 fn clone_context(&self) -> AtomicContextManager;
86 fn clone_executor(&self) -> Arc<RwLock<E>>;
87
88 async fn new(context: AtomicContextManager) -> Self;
90
91 fn repository(&mut self) -> &mut R;
92
93 async fn begin(&mut self) -> Result<(), BaseError>;
94
95 async fn commit<O: IOutBox<E>>(mut self) -> Result<(), BaseError>;
96
97 async fn rollback(self) -> Result<(), BaseError>;
98}
99
100#[derive(Clone)]
101pub struct UnitOfWork<R, E, A>
102where
103 R: TRepository<E, A>,
104 E: Executor,
105 A: Aggregate,
106{
107 executor: Arc<RwLock<E>>,
109 context: AtomicContextManager,
111 _aggregate: PhantomData<A>,
112
113 pub repository: R,
115}
116impl<R, E, A> UnitOfWork<R, E, A>
117where
118 R: TRepository<E, A>,
119 E: Executor,
120 A: Aggregate,
121{
122 async fn _commit(&mut self) -> Result<(), BaseError> {
123 let mut executor = self.executor.write().await;
124
125 executor.commit().await
126 }
127 async fn _commit_hook<O: IOutBox<E>>(&mut self) -> Result<(), BaseError> {
130 let event_queue = &mut self.context.write().await;
131 let mut outboxes = vec![];
132
133 for e in self.repository.get_events() {
134 if e.externally_notifiable() {
135 outboxes.push(e.outbox());
136 };
137 if e.internally_notifiable() {
138 event_queue.push_back(e.message_clone());
139 }
140 }
141 O::add(self.executor.clone(), outboxes).await
142 }
143}
144
145#[async_trait]
146impl<R, E, A> TUnitOfWork<R, E, A> for UnitOfWork<R, E, A>
147where
148 R: TRepository<E, A>,
149 E: Executor,
150 A: Aggregate,
151{
152 fn clone_context(&self) -> AtomicContextManager {
153 Arc::clone(&self.context)
154 }
155 fn clone_executor(&self) -> Arc<RwLock<E>> {
156 self.executor.clone()
157 }
158
159 async fn new(context: AtomicContextManager) -> Self {
161 let executor: Arc<RwLock<E>> = E::new().await;
162
163 let mut uow = Self {
164 repository: R::new(Arc::clone(&executor)),
165 context,
166 executor,
167 _aggregate: PhantomData,
168 };
169 uow.begin().await.unwrap();
170 uow
171 }
172
173 fn repository(&mut self) -> &mut R {
175 &mut self.repository
176 }
177
178 async fn begin(&mut self) -> Result<(), BaseError> {
180 let mut executor = self.executor.write().await;
181 executor.begin().await
182 }
183
184 async fn commit<O: IOutBox<E>>(mut self) -> Result<(), BaseError> {
186 self._commit_hook::<O>().await?;
190
191 self._commit().await
193 }
194
195 async fn rollback(self) -> Result<(), BaseError> {
197 let mut executor = self.executor.write().await;
198 executor.rollback().await
199 }
200}