pub trait Aggregate: Default + Serialize + DeserializeOwned + Sync + Send {
    type Command;
    type Event: DomainEvent;
    type Error: Error;
    type Services: Send + Sync;

    // Required methods
    fn aggregate_type() -> String;
    fn handle<'life0, 'life1, 'async_trait>(
        &'life0 self,
        command: Self::Command,
        service: &'life1 Self::Services
    ) -> Pin<Box<dyn Future<Output = Result<Vec<Self::Event>, Self::Error>> + Send + 'async_trait>>
    where
        Self: 'async_trait,
        'life0: 'async_trait,
        'life1: 'async_trait;
    fn apply(&mut self, event: Self::Event);
}
In CQRS (and Domain Driven Design) an Aggregate is the fundamental component that encapsulates the state and application logic (aka business rules) for the application. An Aggregate is always composed of a DDD entity along with all entities and value objects associated with it.
§Example of a ‘Customer’ aggregate
use async_trait::async_trait;
use cqrs_es::Aggregate;
use serde::{Deserialize, Serialize};

#[derive(Default, Serialize, Deserialize)]
struct Customer {
    name: Option<String>,
    email: Option<String>,
}

#[async_trait]
impl Aggregate for Customer {
    type Command = CustomerCommand;
    type Event = CustomerEvent;
    type Error = CustomerError;
    type Services = CustomerService;

    fn aggregate_type() -> String { "customer".to_string() }

    // Validates the command against current state and returns the resulting events.
    async fn handle(&self, command: Self::Command, service: &Self::Services) -> Result<Vec<Self::Event>, Self::Error> {
        match command {
            CustomerCommand::AddCustomerName { name: changed_name } => {
                // Business rule: a name can only be added once.
                if self.name.is_some() {
                    return Err("a name has already been added".into());
                }
                Ok(vec![CustomerEvent::NameAdded { name: changed_name }])
            }
            CustomerCommand::UpdateEmail { new_email } => {
                Ok(vec![CustomerEvent::EmailUpdated { new_email }])
            }
        }
    }

    // Updates state from an event; no validation happens here.
    fn apply(&mut self, event: Self::Event) {
        match event {
            CustomerEvent::NameAdded { name: changed_name } => {
                self.name = Some(changed_name);
            }
            CustomerEvent::EmailUpdated { new_email } => {
                self.email = Some(new_email);
            }
        }
    }
}
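The example above refers to CustomerCommand, CustomerEvent, CustomerError and CustomerService without defining them. The following is a minimal sketch of what those supporting types might look like, using only the standard library. The variant and field names come from the example; the shape of CustomerError (a message wrapper with a From<&str> conversion, which is what makes `Err("...".into())` compile) and the empty CustomerService are assumptions. To be used with the real trait, CustomerEvent would also have to satisfy the DomainEvent bound, and the serde derives are omitted here to keep the block dependency-free.

```rust
use std::fmt;

// Hypothetical supporting types for the `Customer` example. Variant and field
// names are taken from the example; everything else is assumed.
#[derive(Debug, Clone, PartialEq)]
pub enum CustomerCommand {
    AddCustomerName { name: String },
    UpdateEmail { new_email: String },
}

#[derive(Debug, Clone, PartialEq)]
pub enum CustomerEvent {
    NameAdded { name: String },
    EmailUpdated { new_email: String },
}

// Assumed error type: a thin wrapper around a human-readable message.
#[derive(Debug, Clone, PartialEq)]
pub struct CustomerError(String);

impl fmt::Display for CustomerError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.0)
    }
}

// Satisfies the `type Error: Error` bound on the trait.
impl std::error::Error for CustomerError {}

// Allows `Err("a name has already been added".into())` inside `handle`.
impl From<&str> for CustomerError {
    fn from(message: &str) -> Self {
        CustomerError(message.to_string())
    }
}

// Assumed placeholder for external dependencies passed to `handle`.
// A unit struct is automatically `Send + Sync`.
#[derive(Debug, Default)]
pub struct CustomerService;
```

Keeping commands and events as separate enums, even when their payloads look alike, keeps the distinction between a request that may be rejected and a fact that has already happened.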
Required Associated Types§
type Event: DomainEvent
Specifies the published events representing some change in state of the Aggregate.
Required Methods§
fn aggregate_type() -> String
The aggregate type is used as the unique identifier for this aggregate and its events. This is used for persisting the events and snapshots to a database.
fn handle<'life0, 'life1, 'async_trait>(
    &'life0 self,
    command: Self::Command,
    service: &'life1 Self::Services
) -> Pin<Box<dyn Future<Output = Result<Vec<Self::Event>, Self::Error>> + Send + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,
    'life1: 'async_trait,
This method consumes and processes commands. The result should be either a vector of events if the command is successful, or an error if the command is rejected.
All business logic belongs in this method.
// Excerpt from an `impl Aggregate for Customer` block annotated with #[async_trait].
async fn handle(&self, command: Self::Command, service: &Self::Services) -> Result<Vec<Self::Event>, Self::Error> {
    match command {
        CustomerCommand::AddCustomerName { name: changed_name } => {
            // Reject the command if a name is already set.
            if self.name.is_some() {
                return Err("a name has already been added".into());
            }
            Ok(vec![CustomerEvent::NameAdded { name: changed_name }])
        }
        CustomerCommand::UpdateEmail { new_email } => {
            Ok(vec![CustomerEvent::EmailUpdated { new_email }])
        }
    }
}
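The contract described for handle (a vector of events on success, an error on rejection) can be exercised without the framework. The sketch below is a simplified, synchronous stand-in and not the cqrs_es API: the types mirror the Customer example, CustomerError is assumed to be a plain message wrapper, the services argument is dropped, and execute is a hypothetical driver that folds accepted events back into state.

```rust
// Simplified stand-ins for illustration; these are NOT the cqrs_es types.
#[derive(Debug, Clone, PartialEq)]
enum CustomerCommand {
    AddCustomerName { name: String },
    UpdateEmail { new_email: String },
}

#[derive(Debug, Clone, PartialEq)]
enum CustomerEvent {
    NameAdded { name: String },
    EmailUpdated { new_email: String },
}

// Assumed error shape: a message wrapper.
#[derive(Debug, PartialEq)]
struct CustomerError(String);

#[derive(Debug, Default)]
struct Customer {
    name: Option<String>,
    email: Option<String>,
}

impl Customer {
    // Synchronous analogue of `Aggregate::handle`: all business rules live here.
    fn handle(&self, command: CustomerCommand) -> Result<Vec<CustomerEvent>, CustomerError> {
        match command {
            CustomerCommand::AddCustomerName { name } => {
                if self.name.is_some() {
                    return Err(CustomerError("a name has already been added".to_string()));
                }
                Ok(vec![CustomerEvent::NameAdded { name }])
            }
            CustomerCommand::UpdateEmail { new_email } => {
                Ok(vec![CustomerEvent::EmailUpdated { new_email }])
            }
        }
    }

    // Analogue of `Aggregate::apply`: state mutation only.
    fn apply(&mut self, event: CustomerEvent) {
        match event {
            CustomerEvent::NameAdded { name } => self.name = Some(name),
            CustomerEvent::EmailUpdated { new_email } => self.email = Some(new_email),
        }
    }
}

// Hypothetical driver: handle a command, then apply the resulting events.
// Returns how many events were produced; state is untouched on rejection.
fn execute(customer: &mut Customer, command: CustomerCommand) -> Result<usize, CustomerError> {
    let events = customer.handle(command)?;
    let produced = events.len();
    for event in events {
        customer.apply(event);
    }
    Ok(produced)
}
```

Because handle takes &self and only returns events, a rejected command cannot leave the aggregate in a partially updated state.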
fn apply(&mut self, event: Self::Event)
This is used to update the aggregate’s state once an event has been committed. Any events returned from the handle method will be applied using this method in order to populate the state of the aggregate instance.
The source of truth used in the CQRS framework determines when the events are applied to an aggregate:
- event sourced - All events are applied every time the aggregate is loaded.
- aggregate sourced - Events are applied immediately after they are returned from handle (and before they are committed) and the resulting aggregate instance is serialized and persisted.
- snapshots - Uses a combination of the above patterns.
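The difference between the event sourced and aggregate sourced patterns can be sketched with plain Rust. This is an illustration under stated assumptions, not the framework's loading code: the types are simplified stand-ins for the Customer example, and rehydrate and apply_new are hypothetical helper names.

```rust
// Simplified stand-ins for illustration; these are NOT the cqrs_es types.
#[derive(Debug, Clone, PartialEq)]
enum CustomerEvent {
    NameAdded { name: String },
    EmailUpdated { new_email: String },
}

#[derive(Debug, Default, Clone, PartialEq)]
struct Customer {
    name: Option<String>,
    email: Option<String>,
}

impl Customer {
    // Mirrors `Aggregate::apply`: pure state mutation, no business rules.
    fn apply(&mut self, event: CustomerEvent) {
        match event {
            CustomerEvent::NameAdded { name } => self.name = Some(name),
            CustomerEvent::EmailUpdated { new_email } => self.email = Some(new_email),
        }
    }
}

// Hypothetical helper for the event sourced case: replay the full history
// onto a default instance every time the aggregate is loaded.
fn rehydrate(history: Vec<CustomerEvent>) -> Customer {
    let mut customer = Customer::default();
    for event in history {
        customer.apply(event);
    }
    customer
}

// Hypothetical helper for the aggregate sourced case: start from the stored
// state and apply only the events just returned from `handle`.
fn apply_new(mut stored: Customer, new_events: Vec<CustomerEvent>) -> Customer {
    for event in new_events {
        stored.apply(event);
    }
    stored
}
```

Both paths should arrive at the same state for the same sequence of events, which is only guaranteed when apply is deterministic and free of business logic. A snapshot approach would combine the two: load a stored instance, then replay only the events recorded after it.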
No business logic should be placed here; this method is used only for updating the aggregate state.
// Excerpt from an `impl Aggregate for Customer` block.
fn apply(&mut self, event: Self::Event) {
    match event {
        // Each arm copies event data into state without validating it.
        CustomerEvent::NameAdded { name } => {
            self.name = Some(name);
        }
        CustomerEvent::EmailUpdated { new_email } => {
            self.email = Some(new_email);
        }
    }
}