ddd_rs/application/repository.rs
1use std::sync::Arc;
2
3use crate::domain::{AggregateRoot, AggregateRootEx, Entity};
4
5use super::DomainEventHandler;
6
7/// Trait for representing a **Repository**.
8///
9/// > Therefore, use a Repository, the purpose of which is to encapsulate all the logic needed to
10/// > obtain object references. The domain objects won’t have to deal with the infrastructure to get
11/// > the needed references to other objects of the domain. They will just get them from the
12/// > Repository and the model is regaining its clarity and focus.
13///
14/// # Examples
15///
16/// This example uses the [InMemoryRepository](crate::infrastructure::memory::InMemoryRepository)
17/// which is a sample implementation of this trait.
18///
19/// ```
20/// use ddd_rs::{
21/// application::{ReadRepository, Repository},
22/// infrastructure::InMemoryRepository
23/// };
24///
25/// // By definition, only `AggregateRoot`s have repositories.
26/// //
27/// // Common entities must be retrieved and persisted through their associated aggregate roots.
28/// #[derive(ddd_rs::AggregateRoot, ddd_rs::Entity, Clone)]
29/// struct MyEntity {
30/// #[entity(id)]
31/// id: u32,
32/// my_field: String,
33/// }
34///
35/// impl MyEntity {
36/// pub fn new(id: u32, my_field: impl ToString) -> Self {
37/// Self {
38/// id,
39/// my_field: my_field.to_string(),
40/// }
41/// }
42/// }
43///
44/// # tokio_test::block_on(async {
45/// let repository: InMemoryRepository<MyEntity> = InMemoryRepository::new();
46///
47/// // Add some entities to the repository.
48/// repository.add(MyEntity::new(1, "foo")).await.unwrap();
49/// repository.add(MyEntity::new(2, "bar")).await.unwrap();
50/// repository.add(MyEntity::new(3, "baz")).await.unwrap();
51///
52/// // Attempt to retrieve an entity by its ID.
53/// let my_entity_2 = repository.get_by_id(2).await.unwrap();
54///
55/// assert!(my_entity_2.is_some());
56/// assert_eq!(my_entity_2.as_ref().map(|e| e.my_field.as_str()), Some("bar"));
57///
58/// let mut my_entity_2 = my_entity_2.unwrap();
59///
60/// // Update the entity, then persist its changes.
61/// my_entity_2.my_field = "qux".to_string();
62///
63/// let my_entity_2 = repository.update(my_entity_2).await.unwrap();
64///
65/// assert_eq!(my_entity_2.my_field.as_str(), "qux");
66///
67/// // Delete the entity permanently.
68/// repository.delete(my_entity_2).await.unwrap();
69///
70/// // Assert it no longer exists.
71/// assert!(!repository.exists(2).await.unwrap());
72/// # })
73/// ```
74#[async_trait::async_trait]
75pub trait Repository<T: AggregateRoot>: ReadRepository<T> {
76 /// Adds an entity to the repository.
77 async fn add(&self, entity: T) -> crate::Result<T>;
78
79 /// Updates an entity on the repository.
80 async fn update(&self, entity: T) -> crate::Result<T>;
81
82 /// Deletes the entity from the repository.
83 async fn delete(&self, entity: T) -> crate::Result<()>;
84
85 /// Adds the given entities to the repository.
86 async fn add_range(&self, entities: Vec<T>) -> crate::Result<Vec<T>> {
87 let mut added_entities = Vec::new();
88
89 for entity in entities {
90 self.add(entity).await.map(|e| added_entities.push(e))?;
91 }
92
93 Ok(added_entities)
94 }
95
96 /// Updates the given entities on the repository.
97 async fn update_range(&self, entities: Vec<T>) -> crate::Result<Vec<T>> {
98 let mut updated_entities = Vec::new();
99
100 for entity in entities {
101 self.update(entity)
102 .await
103 .map(|e| updated_entities.push(e))?;
104 }
105
106 Ok(updated_entities)
107 }
108
109 /// Deletes the given entities from the repository.
110 async fn delete_range(&self, entities: Vec<T>) -> crate::Result<()> {
111 for entity in entities {
112 self.delete(entity).await?;
113 }
114
115 Ok(())
116 }
117}
118
119/// Trait for representing a read-only **Repository**.
120///
121/// See the [Repository] trait for the definition of a repository and a sample of its usage.
122#[async_trait::async_trait]
123pub trait ReadRepository<T: AggregateRoot>: Send + Sync {
124 /// Gets an entity with the given ID.
125 async fn get_by_id(&self, id: <T as Entity>::Id) -> crate::Result<Option<T>>;
126
127 /// Lists all entities within a given page.
128 async fn list(&self, skip: usize, take: usize) -> crate::Result<Vec<T>>;
129
130 /// Returns the total number of entities in the repository.
131 async fn count(&self) -> crate::Result<usize>;
132
133 /// Checks whether an entity with the given ID exists in the repository.
134 async fn exists(&self, id: <T as Entity>::Id) -> crate::Result<bool> {
135 self.get_by_id(id).await.map(|e| e.is_some())
136 }
137
138 /// Checks if the repository is empty.
139 async fn is_empty(&self) -> crate::Result<bool> {
140 self.count().await.map(|c| c == 0)
141 }
142}
143
144/// Repository extension abstraction, for performing operations over aggregates that implement the
145/// [AggregateRootEx] trait.
146///
147/// # Examples
148///
149/// Building upon the [Repository] sample, this example shows how a repository object can be
150/// extended in order to support concepts from the [AggregateRootEx] trait.
151///
152/// ```
153/// use std::sync::Arc;
154///
155/// use ddd_rs::{
156/// application::{DomainEventHandler, ReadRepository, Repository, RepositoryEx},
157/// infrastructure::InMemoryRepository
158/// };
159///
160/// // The aggregate below requires an action to be performed asynchronously, but doing so directly
161/// // would require the aggregate root to:
162/// //
163/// // - Have a reference to one or many application services, thus breaching the isolation between
164/// // the Application and Domain layers;
165/// // - Expect an async runtime, which is generally associated with I/O operations and
166/// // long-running tasks, to be available for the implementation of business rules.
167/// //
168/// // These can be seem as contrary to the modeling principles of DDD, since the domain model
169/// // should be self-sufficient when enforcing its own business rules.
170/// //
171/// // Instead, the aggregate will register a domain event requesting the async action to be
172/// // performed prior to being persisted to the repository.
173/// #[derive(Clone, Debug, PartialEq)]
174/// enum MyDomainEvent {
175/// AsyncActionRequested { action: String },
176/// }
177///
178/// #[derive(ddd_rs::AggregateRoot, ddd_rs::Entity, Clone)]
179/// struct MyEntity {
180/// #[entity(id)]
181/// id: u32,
182/// last_performed_action: Option<String>,
183/// #[aggregate_root(domain_events)]
184/// domain_events: Vec<MyDomainEvent>,
185/// }
186///
187/// impl MyEntity {
188/// pub fn new(id: u32) -> Self {
189/// Self {
190/// id,
191/// last_performed_action: None,
192/// domain_events: Default::default(),
193/// }
194/// }
195///
196/// pub fn request_async_action(&mut self, action: impl ToString) {
197/// let domain_event = MyDomainEvent::AsyncActionRequested { action: action.to_string() };
198///
199/// self.register_domain_event(domain_event);
200/// }
201///
202/// pub fn confirm_async_action_performed(&mut self, action: impl ToString) {
203/// self.last_performed_action.replace(action.to_string());
204/// }
205/// }
206///
207/// // The domain event handler will usually be a context that holds references to all necessary
208/// // services and providers to handle domain events.
209/// struct MyDomainEventHandler {
210/// repository: Arc<dyn Repository<MyEntity>>,
211/// }
212///
213/// impl MyDomainEventHandler {
214/// pub fn new(repository: Arc<dyn Repository<MyEntity>>) -> Self {
215/// Self { repository }
216/// }
217/// }
218///
219/// #[async_trait::async_trait]
220/// impl DomainEventHandler<MyEntity> for MyDomainEventHandler {
221/// async fn handle(&self, mut entity: MyEntity, event: MyDomainEvent) -> ddd_rs::Result<MyEntity> {
222/// let action = match event {
223/// MyDomainEvent::AsyncActionRequested { action, .. } => action,
224/// };
225///
226/// // Perform the async action...
227///
228/// entity.confirm_async_action_performed(action);
229///
230/// self.repository.update(entity).await
231/// }
232/// }
233///
234/// # tokio_test::block_on(async {
235///
236/// // Extend the basic repository to enable processing of domain events registered by the
237/// // aggregate, upon persistence.
238/// let repository = Arc::new(InMemoryRepository::new());
239/// let domain_event_handler = Arc::new(MyDomainEventHandler::new(repository.clone()));
240///
241/// let repository_ex = RepositoryEx::new(domain_event_handler, repository);
242///
243/// // Create a new entity and request an async action.
244/// let mut entity = MyEntity::new(42);
245///
246/// entity.request_async_action("foo");
247///
248/// // Assert that the action was not performed yet, but registered a domain event.
249/// assert!(entity.last_performed_action.is_none());
250/// assert_eq!(entity.domain_events.len(), 1);
251///
252/// // Persist the entity and assert that the action was performed as a result.
253/// repository_ex.add(entity).await.unwrap();
254///
255/// let entity = repository_ex.get_by_id(42).await.unwrap().unwrap();
256///
257/// assert_eq!(entity.last_performed_action.unwrap(), "foo");
258/// assert!(entity.domain_events.is_empty());
259/// # })
260/// ```
261pub struct RepositoryEx<T: AggregateRootEx> {
262 domain_event_handler: Arc<dyn DomainEventHandler<T>>,
263 repository: Arc<dyn Repository<T>>,
264}
265
266impl<T: AggregateRootEx> RepositoryEx<T> {
267 /// Creates a new instance of the extended repository.
268 pub fn new(
269 domain_event_handler: Arc<dyn DomainEventHandler<T>>,
270 repository: Arc<dyn Repository<T>>,
271 ) -> Self {
272 Self {
273 domain_event_handler,
274 repository,
275 }
276 }
277}
278
279#[async_trait::async_trait]
280impl<T: AggregateRootEx> ReadRepository<T> for RepositoryEx<T> {
281 async fn get_by_id(&self, id: <T as Entity>::Id) -> crate::Result<Option<T>> {
282 self.repository.get_by_id(id).await
283 }
284
285 async fn list(&self, skip: usize, take: usize) -> crate::Result<Vec<T>> {
286 self.repository.list(skip, take).await
287 }
288
289 async fn count(&self) -> crate::Result<usize> {
290 self.repository.count().await
291 }
292}
293
294#[async_trait::async_trait]
295impl<T: AggregateRootEx> Repository<T> for RepositoryEx<T> {
296 async fn add(&self, mut entity: T) -> crate::Result<T> {
297 let domain_events = entity.take_domain_events();
298
299 let mut entity = self.repository.add(entity).await?;
300
301 for event in domain_events {
302 entity = self.domain_event_handler.handle(entity, event).await?;
303 }
304
305 Ok(entity)
306 }
307
308 async fn update(&self, mut entity: T) -> crate::Result<T> {
309 let domain_events = entity.take_domain_events();
310
311 let mut entity = self.repository.update(entity).await?;
312
313 for event in domain_events {
314 entity = self.domain_event_handler.handle(entity, event).await?;
315 }
316
317 Ok(entity)
318 }
319
320 async fn delete(&self, mut entity: T) -> crate::Result<()> {
321 let domain_events = entity.take_domain_events();
322
323 let mut entity = entity;
324
325 for event in domain_events {
326 entity = self.domain_event_handler.handle(entity, event).await?;
327 }
328
329 self.repository.delete(entity).await
330 }
331}