1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173
use async_trait::async_trait;
use serde::de::DeserializeOwned;
use serde::Serialize;
use crate::DomainEvent;
/// In CQRS (and Domain Driven Design) an `Aggregate` is the fundamental component that
/// encapsulates the state and application logic (aka business rules) for the application.
/// An `Aggregate` is always composed of a
/// [DDD entity](https://docs.microsoft.com/en-us/dotnet/architecture/microservices/microservice-ddd-cqrs-patterns/microservice-domain-model#the-domain-entity-pattern)
/// along with all entities and
/// [value objects](https://docs.microsoft.com/en-us/dotnet/architecture/microservices/microservice-ddd-cqrs-patterns/microservice-domain-model#the-value-object-pattern)
/// associated with it.
///
/// # Example of a 'Customer' aggregate
/// ```rust
/// # use cqrs_es::doc::{CustomerEvent, CustomerError, CustomerCommand, CustomerService};
/// # use cqrs_es::{Aggregate, AggregateError};
/// # use serde::{Serialize,Deserialize};
/// # use async_trait::async_trait;
/// #[derive(Default,Serialize,Deserialize)]
/// struct Customer {
/// name: Option<String>,
/// email: Option<String>,
/// }
///
/// #[async_trait]
/// impl Aggregate for Customer {
/// type Command = CustomerCommand;
/// type Event = CustomerEvent;
/// type Error = CustomerError;
/// type Services = CustomerService;
///
///
/// fn aggregate_type() -> String { "customer".to_string() }
///
/// async fn handle(&self, command: Self::Command, service: &Self::Services) -> Result<Vec<Self::Event>, Self::Error> {
/// match command {
/// CustomerCommand::AddCustomerName{name: changed_name} => {
/// if self.name.is_some() {
/// return Err("a name has already been added".into());
/// }
/// Ok(vec![CustomerEvent::NameAdded{name:changed_name}])
/// }
///
/// CustomerCommand::UpdateEmail { new_email } => {
/// Ok(vec![CustomerEvent::EmailUpdated { new_email }])
/// }
/// }
/// }
///
/// fn apply(&mut self, event: Self::Event) {
/// match event {
/// CustomerEvent::NameAdded{name: changed_name} => {
/// self.name = Some(changed_name);
/// }
///
/// CustomerEvent::EmailUpdated{new_email} => {
/// self.email = Some(new_email);
/// }
/// }
/// }
/// }
/// ```
#[async_trait]
pub trait Aggregate: Default + Serialize + DeserializeOwned + Sync + Send {
/// Specifies the inbound command used to make changes in the state of the Aggregate.
type Command;
/// Specifies the published events representing some change in state of the Aggregate.
type Event: DomainEvent;
/// The error returned when a command fails due to business logic.
/// This is used to provide feedback to the user as to the nature of why the command was refused.
type Error: std::error::Error;
/// The external services required for the logic within the Aggregate
type Services: Send + Sync;
/// The aggregate type is used as the unique identifier for this aggregate and its events.
/// This is used for persisting the events and snapshots to a database.
fn aggregate_type() -> String;
/// This method consumes and processes commands.
/// The result should be either a vector of events if the command is successful,
/// or an error if the command is rejected.
///
/// _All business logic belongs in this method_.
///
/// ```rust
/// # use std::sync::Arc;
/// use cqrs_es::{Aggregate, AggregateError};
/// # use serde::{Serialize, Deserialize, de::DeserializeOwned};
/// # use cqrs_es::doc::{CustomerCommand, CustomerError, CustomerEvent, CustomerService};
/// # use async_trait::async_trait;
/// #[derive(Default,Serialize,Deserialize)]
/// # struct Customer {
/// # name: Option<String>,
/// # email: Option<String>,
/// # }
/// # #[async_trait]
/// # impl Aggregate for Customer {
/// # type Command = CustomerCommand;
/// # type Event = CustomerEvent;
/// # type Error = CustomerError;
/// # type Services = CustomerService;
/// # fn aggregate_type() -> String { "customer".to_string() }
/// async fn handle(&self, command: Self::Command, service: &Self::Services) -> Result<Vec<Self::Event>, Self::Error> {
/// match command {
/// CustomerCommand::AddCustomerName{name: changed_name} => {
/// if self.name.is_some() {
/// return Err("a name has already been added".into());
/// }
/// Ok(vec![CustomerEvent::NameAdded{ name: changed_name}])
/// }
///
/// CustomerCommand::UpdateEmail { new_email } => {
/// Ok(vec![CustomerEvent::EmailUpdated { new_email }])
/// }
/// }
/// }
/// # fn apply(&mut self, event: Self::Event) {}
/// # }
/// ```
async fn handle(
&self,
command: Self::Command,
service: &Self::Services,
) -> Result<Vec<Self::Event>, Self::Error>;
/// This is used to update the aggregate's state once an event has been committed.
/// Any events returned from the `handle` method will be applied using this method
/// in order to populate the state of the aggregate instance.
///
/// The source of truth used in the CQRS framework determines when the events are
/// applied to an aggregate:
/// - event sourced - All events are applied every time the aggregate is loaded.
/// - aggregate sourced - Events are applied immediately after they are returned from `handle`
/// (and before they are committed) and the resulting aggregate instance is serialized and persisted.
/// - snapshots - Uses a combination of the above patterns.
///
/// _No business logic should be placed here_, this is only used for updating the aggregate state.
///
/// ```rust
/// # use std::sync::Arc;
/// # use serde::{Serialize, Deserialize, de::DeserializeOwned};
/// # use cqrs_es::doc::{CustomerCommand, CustomerError, CustomerEvent, CustomerService};
/// use cqrs_es::{Aggregate, AggregateError};
/// use async_trait::async_trait;
/// #[derive(Default,Serialize,Deserialize)]
/// # struct Customer {
/// # name: Option<String>,
/// # email: Option<String>,
/// # }
/// # #[async_trait]
/// # impl Aggregate for Customer {
/// # type Command = CustomerCommand;
/// # type Event = CustomerEvent;
/// # type Error = CustomerError;
/// # type Services = CustomerService;
/// # fn aggregate_type() -> String { "customer".to_string() }
/// # async fn handle(&self, command: Self::Command, service: &Self::Services) -> Result<Vec<Self::Event>, Self::Error> {
/// # Ok(vec![])
/// # }
/// fn apply(&mut self, event: Self::Event) {
/// match event {
/// CustomerEvent::NameAdded{name} => {
/// self.name = Some(name);
/// }
///
/// CustomerEvent::EmailUpdated{new_email} => {
/// self.email = Some(new_email);
/// }
/// }
/// }
/// # }
/// ```
fn apply(&mut self, event: Self::Event);
}