ddd_rs/application/
repository.rs

1use std::sync::Arc;
2
3use crate::domain::{AggregateRoot, AggregateRootEx, Entity};
4
5use super::DomainEventHandler;
6
7/// Trait for representing a **Repository**.
8///
9/// > Therefore, use a Repository, the purpose of which is to encapsulate all the logic needed to
10/// > obtain object references. The domain objects won’t have to deal with the infrastructure to get
11/// > the needed references to other objects of the domain. They will just get them from the
12/// > Repository and the model is regaining its clarity and focus.
13///
14/// # Examples
15///
16/// This example uses the [InMemoryRepository](crate::infrastructure::memory::InMemoryRepository)
17/// which is a sample implementation of this trait.
18///
19/// ```
20/// use ddd_rs::{
21///     application::{ReadRepository, Repository},
22///     infrastructure::InMemoryRepository
23/// };
24///
25/// // By definition, only `AggregateRoot`s have repositories.
26/// //
27/// // Common entities must be retrieved and persisted through their associated aggregate roots.
28/// #[derive(ddd_rs::AggregateRoot, ddd_rs::Entity, Clone)]
29/// struct MyEntity {
30///     #[entity(id)]
31///     id: u32,
32///     my_field: String,
33/// }
34///
35/// impl MyEntity {
36///     pub fn new(id: u32, my_field: impl ToString) -> Self {
37///         Self {
38///             id,
39///             my_field: my_field.to_string(),
40///         }
41///     }
42/// }
43///
44/// # tokio_test::block_on(async {
45/// let repository: InMemoryRepository<MyEntity> = InMemoryRepository::new();
46///
47/// // Add some entities to the repository.
48/// repository.add(MyEntity::new(1, "foo")).await.unwrap();
49/// repository.add(MyEntity::new(2, "bar")).await.unwrap();
50/// repository.add(MyEntity::new(3, "baz")).await.unwrap();
51///
52/// // Attempt to retrieve an entity by its ID.
53/// let my_entity_2 = repository.get_by_id(2).await.unwrap();
54///
55/// assert!(my_entity_2.is_some());
56/// assert_eq!(my_entity_2.as_ref().map(|e| e.my_field.as_str()), Some("bar"));
57///
58/// let mut my_entity_2 = my_entity_2.unwrap();
59///
60/// // Update the entity, then persist its changes.
61/// my_entity_2.my_field = "qux".to_string();
62///
63/// let my_entity_2 = repository.update(my_entity_2).await.unwrap();
64///
65/// assert_eq!(my_entity_2.my_field.as_str(), "qux");
66///
67/// // Delete the entity permanently.
68/// repository.delete(my_entity_2).await.unwrap();
69///
70/// // Assert it no longer exists.
71/// assert!(!repository.exists(2).await.unwrap());
72/// # })
73/// ```
74#[async_trait::async_trait]
75pub trait Repository<T: AggregateRoot>: ReadRepository<T> {
76    /// Adds an entity to the repository.
77    async fn add(&self, entity: T) -> crate::Result<T>;
78
79    /// Updates an entity on the repository.
80    async fn update(&self, entity: T) -> crate::Result<T>;
81
82    /// Deletes the entity from the repository.
83    async fn delete(&self, entity: T) -> crate::Result<()>;
84
85    /// Adds the given entities to the repository.
86    async fn add_range(&self, entities: Vec<T>) -> crate::Result<Vec<T>> {
87        let mut added_entities = Vec::new();
88
89        for entity in entities {
90            self.add(entity).await.map(|e| added_entities.push(e))?;
91        }
92
93        Ok(added_entities)
94    }
95
96    /// Updates the given entities on the repository.
97    async fn update_range(&self, entities: Vec<T>) -> crate::Result<Vec<T>> {
98        let mut updated_entities = Vec::new();
99
100        for entity in entities {
101            self.update(entity)
102                .await
103                .map(|e| updated_entities.push(e))?;
104        }
105
106        Ok(updated_entities)
107    }
108
109    /// Deletes the given entities from the repository.
110    async fn delete_range(&self, entities: Vec<T>) -> crate::Result<()> {
111        for entity in entities {
112            self.delete(entity).await?;
113        }
114
115        Ok(())
116    }
117}
118
119/// Trait for representing a read-only **Repository**.
120///
121/// See the [Repository] trait for the definition of a repository and a sample of its usage.
122#[async_trait::async_trait]
123pub trait ReadRepository<T: AggregateRoot>: Send + Sync {
124    /// Gets an entity with the given ID.
125    async fn get_by_id(&self, id: <T as Entity>::Id) -> crate::Result<Option<T>>;
126
127    /// Lists all entities within a given page.
128    async fn list(&self, skip: usize, take: usize) -> crate::Result<Vec<T>>;
129
130    /// Returns the total number of entities in the repository.
131    async fn count(&self) -> crate::Result<usize>;
132
133    /// Checks whether an entity with the given ID exists in the repository.
134    async fn exists(&self, id: <T as Entity>::Id) -> crate::Result<bool> {
135        self.get_by_id(id).await.map(|e| e.is_some())
136    }
137
138    /// Checks if the repository is empty.
139    async fn is_empty(&self) -> crate::Result<bool> {
140        self.count().await.map(|c| c == 0)
141    }
142}
143
144/// Repository extension abstraction, for performing operations over aggregates that implement the
145/// [AggregateRootEx] trait.
146///
147/// # Examples
148///
149/// Building upon the [Repository] sample, this example shows how a repository object can be
150/// extended in order to support concepts from the [AggregateRootEx] trait.
151///
152/// ```
153/// use std::sync::Arc;
154///
155/// use ddd_rs::{
156///     application::{DomainEventHandler, ReadRepository, Repository, RepositoryEx},
157///     infrastructure::InMemoryRepository
158/// };
159///
160/// // The aggregate below requires an action to be performed asynchronously, but doing so directly
161/// // would require the aggregate root to:
162/// //
163/// // - Have a reference to one or many application services, thus breaching the isolation between
164/// //   the Application and Domain layers;
165/// // - Expect an async runtime, which is generally associated with I/O operations and
166/// //   long-running tasks, to be available for the implementation of business rules.
167/// //
168/// // These can be seem as contrary to the modeling principles of DDD, since the domain model
169/// // should be self-sufficient when enforcing its own business rules.
170/// //
171/// // Instead, the aggregate will register a domain event requesting the async action to be
172/// // performed prior to being persisted to the repository.
173/// #[derive(Clone, Debug, PartialEq)]
174/// enum MyDomainEvent {
175///     AsyncActionRequested { action: String },
176/// }
177///
178/// #[derive(ddd_rs::AggregateRoot, ddd_rs::Entity, Clone)]
179/// struct MyEntity {
180///     #[entity(id)]
181///     id: u32,
182///     last_performed_action: Option<String>,
183///     #[aggregate_root(domain_events)]
184///     domain_events: Vec<MyDomainEvent>,
185/// }
186///
187/// impl MyEntity {
188///     pub fn new(id: u32) -> Self {
189///         Self {
190///             id,
191///             last_performed_action: None,
192///             domain_events: Default::default(),
193///         }
194///     }
195///
196///     pub fn request_async_action(&mut self, action: impl ToString) {
197///         let domain_event = MyDomainEvent::AsyncActionRequested { action: action.to_string() };
198///
199///         self.register_domain_event(domain_event);
200///     }
201///
202///     pub fn confirm_async_action_performed(&mut self, action: impl ToString) {
203///         self.last_performed_action.replace(action.to_string());
204///     }
205/// }
206///
207/// // The domain event handler will usually be a context that holds references to all necessary
208/// // services and providers to handle domain events.
209/// struct MyDomainEventHandler {
210///     repository: Arc<dyn Repository<MyEntity>>,
211/// }
212///
213/// impl MyDomainEventHandler {
214///     pub fn new(repository: Arc<dyn Repository<MyEntity>>) -> Self {
215///         Self { repository }
216///     }
217/// }
218///
219/// #[async_trait::async_trait]
220/// impl DomainEventHandler<MyEntity> for MyDomainEventHandler {
221///     async fn handle(&self, mut entity: MyEntity, event: MyDomainEvent) -> ddd_rs::Result<MyEntity> {
222///         let action = match event {
223///             MyDomainEvent::AsyncActionRequested { action, .. } => action,
224///         };
225///
226///         // Perform the async action...
227///
228///         entity.confirm_async_action_performed(action);
229///
230///         self.repository.update(entity).await
231///     }
232/// }
233///
234/// # tokio_test::block_on(async {
235///
236/// // Extend the basic repository to enable processing of domain events registered by the
237/// // aggregate, upon persistence.
238/// let repository = Arc::new(InMemoryRepository::new());
239/// let domain_event_handler = Arc::new(MyDomainEventHandler::new(repository.clone()));
240///
241/// let repository_ex = RepositoryEx::new(domain_event_handler, repository);
242///
243/// // Create a new entity and request an async action.
244/// let mut entity = MyEntity::new(42);
245///
246/// entity.request_async_action("foo");
247///
248/// // Assert that the action was not performed yet, but registered a domain event.
249/// assert!(entity.last_performed_action.is_none());
250/// assert_eq!(entity.domain_events.len(), 1);
251///
252/// // Persist the entity and assert that the action was performed as a result.
253/// repository_ex.add(entity).await.unwrap();
254///
255/// let entity = repository_ex.get_by_id(42).await.unwrap().unwrap();
256///
257/// assert_eq!(entity.last_performed_action.unwrap(), "foo");
258/// assert!(entity.domain_events.is_empty());
259/// # })
260/// ```
261pub struct RepositoryEx<T: AggregateRootEx> {
262    domain_event_handler: Arc<dyn DomainEventHandler<T>>,
263    repository: Arc<dyn Repository<T>>,
264}
265
266impl<T: AggregateRootEx> RepositoryEx<T> {
267    /// Creates a new instance of the extended repository.
268    pub fn new(
269        domain_event_handler: Arc<dyn DomainEventHandler<T>>,
270        repository: Arc<dyn Repository<T>>,
271    ) -> Self {
272        Self {
273            domain_event_handler,
274            repository,
275        }
276    }
277}
278
279#[async_trait::async_trait]
280impl<T: AggregateRootEx> ReadRepository<T> for RepositoryEx<T> {
281    async fn get_by_id(&self, id: <T as Entity>::Id) -> crate::Result<Option<T>> {
282        self.repository.get_by_id(id).await
283    }
284
285    async fn list(&self, skip: usize, take: usize) -> crate::Result<Vec<T>> {
286        self.repository.list(skip, take).await
287    }
288
289    async fn count(&self) -> crate::Result<usize> {
290        self.repository.count().await
291    }
292}
293
294#[async_trait::async_trait]
295impl<T: AggregateRootEx> Repository<T> for RepositoryEx<T> {
296    async fn add(&self, mut entity: T) -> crate::Result<T> {
297        let domain_events = entity.take_domain_events();
298
299        let mut entity = self.repository.add(entity).await?;
300
301        for event in domain_events {
302            entity = self.domain_event_handler.handle(entity, event).await?;
303        }
304
305        Ok(entity)
306    }
307
308    async fn update(&self, mut entity: T) -> crate::Result<T> {
309        let domain_events = entity.take_domain_events();
310
311        let mut entity = self.repository.update(entity).await?;
312
313        for event in domain_events {
314            entity = self.domain_event_handler.handle(entity, event).await?;
315        }
316
317        Ok(entity)
318    }
319
320    async fn delete(&self, mut entity: T) -> crate::Result<()> {
321        let domain_events = entity.take_domain_events();
322
323        let mut entity = entity;
324
325        for event in domain_events {
326            entity = self.domain_event_handler.handle(entity, event).await?;
327        }
328
329        self.repository.delete(entity).await
330    }
331}