event_driven_core/
unit_of_work.rs

1//! ### UnitOfWork
//! [UnitOfWork][UOW] is a unit that manages an atomic transaction.
3//!
4//! Its [executor][Exec] is supposed to be shared with its sub type [Repository][TRepository].
5//!
//! `commit` and `rollback` are governed by this implementation.
7//!
//! When events are collected in [`Repository`][TRepository], they are gathered
//! automatically on commit thanks to the `_commit_hook` method.
11//!
12//! [UOW]: crate::unit_of_work::UnitOfWork
13//! [TRepository]: crate::repository::TRepository
14//! [Exec]: crate::unit_of_work::Executor
15//! [Handler]: crate::unit_of_work::Handler
16//!
17//! #### Usage Pattern
18//!
19//! ```ignore
//! // Initialize UoW, start transaction
21//! let mut uow = UnitOfWork::<Repository<TaskAggregate>, Executor,TaskAggregate>::new(context).await;
22//!
23//! // Fetch data
24//! let mut aggregate = uow.repository().get(&cmd.aggregate_id).await?;
25//!
26//! // Process business logic
27//! aggregate.process_business_logic(cmd)?;
28//!
29//! // Apply changes
30//! uow.repository().update(&mut aggregate).await?;
31//!
32//! // Commit transaction
33//! uow.commit::<ServiceOutBox>().await?;
34//! ```
35//!
36//!
37//!
38//! ### Handler
//! [Handler] orchestrates an operation end to end: fetching data, running the business logic
//! and storing the changes back to the db. This is where the transaction occurs.
41//!
42//! ### Example
43//! ```ignore
44//! struct ApplicationHandler;
45//! impl Handler for ApplicationHandler{
46//!     type E = ApplicationExecutor;
//!     type R = ApplicationRepository<Aggregate>;
48//! }
49//!
//! impl ApplicationHandler {
//!     pub async fn serve_request(
//!         cmd: Command1,
//!         context: AtomicContextManager,
//!     ) -> Result<(), ServiceError> {
//!         let mut uow = Self::uow(context).await;
//!         // ... fetch, process and commit as in the usage pattern above ...
//!         Ok(())
//!     }
//! }
57//! ```
58
59use crate::{
60	outbox::IOutBox,
61	prelude::{Aggregate, AtomicContextManager, BaseError},
62	repository::TRepository,
63};
64use async_trait::async_trait;
65use std::{marker::PhantomData, sync::Arc};
66use tokio::sync::RwLock;
67
68/// Executor is abstract implementation of whatever storage layer you use.
69/// Among examples are RDBMS, Queue, NoSQLs.
70#[async_trait]
71pub trait Executor: Sync + Send {
72	async fn new() -> Arc<RwLock<Self>>;
73	async fn begin(&mut self) -> Result<(), BaseError>;
74	async fn commit(&mut self) -> Result<(), BaseError>;
75	async fn rollback(&mut self) -> Result<(), BaseError>;
76}
77
78#[async_trait]
79pub trait TUnitOfWork<R, E, A>: Send + Sync
80where
81	R: TRepository<E, A>,
82	E: Executor,
83	A: Aggregate,
84{
85	fn clone_context(&self) -> AtomicContextManager;
86	fn clone_executor(&self) -> Arc<RwLock<E>>;
87
88	/// Creeate UOW object with context manager.
89	async fn new(context: AtomicContextManager) -> Self;
90
91	fn repository(&mut self) -> &mut R;
92
93	async fn begin(&mut self) -> Result<(), BaseError>;
94
95	async fn commit<O: IOutBox<E>>(mut self) -> Result<(), BaseError>;
96
97	async fn rollback(self) -> Result<(), BaseError>;
98}
99
100#[derive(Clone)]
101pub struct UnitOfWork<R, E, A>
102where
103	R: TRepository<E, A>,
104	E: Executor,
105	A: Aggregate,
106{
107	/// real transaction executor
108	executor: Arc<RwLock<E>>,
109	/// global event event_queue
110	context: AtomicContextManager,
111	_aggregate: PhantomData<A>,
112
113	/// event local repository for Executor
114	pub repository: R,
115}
116impl<R, E, A> UnitOfWork<R, E, A>
117where
118	R: TRepository<E, A>,
119	E: Executor,
120	A: Aggregate,
121{
122	async fn _commit(&mut self) -> Result<(), BaseError> {
123		let mut executor = self.executor.write().await;
124
125		executor.commit().await
126	}
127	/// commit_hook is invoked right before the calling for commit
128	/// which sorts out and processes outboxes and internally processable events.
129	async fn _commit_hook<O: IOutBox<E>>(&mut self) -> Result<(), BaseError> {
130		let event_queue = &mut self.context.write().await;
131		let mut outboxes = vec![];
132
133		for e in self.repository.get_events() {
134			if e.externally_notifiable() {
135				outboxes.push(e.outbox());
136			};
137			if e.internally_notifiable() {
138				event_queue.push_back(e.message_clone());
139			}
140		}
141		O::add(self.executor.clone(), outboxes).await
142	}
143}
144
145#[async_trait]
146impl<R, E, A> TUnitOfWork<R, E, A> for UnitOfWork<R, E, A>
147where
148	R: TRepository<E, A>,
149	E: Executor,
150	A: Aggregate,
151{
152	fn clone_context(&self) -> AtomicContextManager {
153		Arc::clone(&self.context)
154	}
155	fn clone_executor(&self) -> Arc<RwLock<E>> {
156		self.executor.clone()
157	}
158
159	/// Creeate UOW object with context manager.
160	async fn new(context: AtomicContextManager) -> Self {
161		let executor: Arc<RwLock<E>> = E::new().await;
162
163		let mut uow = Self {
164			repository: R::new(Arc::clone(&executor)),
165			context,
166			executor,
167			_aggregate: PhantomData,
168		};
169		uow.begin().await.unwrap();
170		uow
171	}
172
173	/// Get local event repository.
174	fn repository(&mut self) -> &mut R {
175		&mut self.repository
176	}
177
178	/// Begin transaction.
179	async fn begin(&mut self) -> Result<(), BaseError> {
180		let mut executor = self.executor.write().await;
181		executor.begin().await
182	}
183
184	/// Commit transaction.
185	async fn commit<O: IOutBox<E>>(mut self) -> Result<(), BaseError> {
186		// To drop uow itself!
187
188		// run commit hook
189		self._commit_hook::<O>().await?;
190
191		// commit
192		self._commit().await
193	}
194
195	/// Rollback transaction.
196	async fn rollback(self) -> Result<(), BaseError> {
197		let mut executor = self.executor.write().await;
198		executor.rollback().await
199	}
200}