use crate::aggregate::{AggregateIdGenerator, DefaultIdGenerator};
use crate::context::CqrsContext;
use crate::denormalizer::Dispatcher;
use crate::errors::CqrsError;
use crate::event::Event;
use crate::{Aggregate, CommandHandler, DynEventStore, EventEnvelope};
use std::collections::HashMap;
use tracing::{debug, error, info};
/// Command-side engine for an aggregate type `A`.
///
/// It owns the event store, the dispatchers that receive committed events,
/// the services handed to the aggregate's command handlers, an error callback
/// for dispatcher failures and the generator used to mint ids for newly
/// created aggregates.
///
/// On non-`wasm32` targets the boxed trait objects additionally require
/// `Send + Sync`; on `wasm32` those bounds are dropped.
pub struct CqrsCommandEngine<A>
where
A: Aggregate + CommandHandler + 'static,
A::Error: Into<CqrsError>,
{
/// Event store used to initialize/load aggregates and to commit new events.
store: DynEventStore<A>,
/// Dispatchers that are handed the committed events, in insertion order
/// (`Send + Sync` variant).
#[cfg(not(target_arch = "wasm32"))]
dispatchers: Vec<Box<dyn Dispatcher<A> + Send + Sync>>,
/// Dispatchers that are handed the committed events, in insertion order
/// (`wasm32` variant without thread-safety bounds).
#[cfg(target_arch = "wasm32")]
dispatchers: Vec<Box<dyn Dispatcher<A>>>,
/// Services passed by reference to the aggregate's command handlers.
services: A::Services,
/// Callback invoked with every error returned by a dispatcher
/// (`Send + Sync` variant).
#[cfg(not(target_arch = "wasm32"))]
error_handler: Box<dyn Fn(&CqrsError) + Send + Sync>,
/// Callback invoked with every error returned by a dispatcher
/// (`wasm32` variant without thread-safety bounds).
#[cfg(target_arch = "wasm32")]
error_handler: Box<dyn Fn(&CqrsError)>,
/// Generator producing the id of each newly created aggregate
/// (`Send + Sync` variant).
#[cfg(not(target_arch = "wasm32"))]
id_generator: Box<dyn AggregateIdGenerator<A> + Send + Sync>,
/// Generator producing the id of each newly created aggregate
/// (`wasm32` variant without thread-safety bounds).
#[cfg(target_arch = "wasm32")]
id_generator: Box<dyn AggregateIdGenerator<A>>,
}
impl<A> CqrsCommandEngine<A>
where
    A: Aggregate + CommandHandler + 'static,
    A::Error: Into<CqrsError>,
{
    /// Creates an engine from its collaborators.
    ///
    /// The engine starts with [`DefaultIdGenerator`]; use
    /// [`Self::with_id_generator`] to replace it.
    ///
    /// * `store` - event store used to load aggregates and commit events.
    /// * `dispatchers` - receivers of committed events, called in order.
    /// * `services` - value passed by reference to the command handlers.
    /// * `error_handler` - callback invoked with each dispatcher error.
    #[must_use]
    #[cfg(not(target_arch = "wasm32"))]
    pub fn new(
        store: DynEventStore<A>,
        dispatchers: Vec<Box<dyn Dispatcher<A> + Send + Sync>>,
        services: A::Services,
        error_handler: Box<dyn Fn(&CqrsError) + Send + Sync>,
    ) -> Self {
        Self {
            store,
            dispatchers,
            services,
            error_handler,
            id_generator: Box::new(DefaultIdGenerator),
        }
    }

    /// Creates an engine from its collaborators (`wasm32` variant without
    /// `Send + Sync` bounds).
    ///
    /// The engine starts with [`DefaultIdGenerator`]; use
    /// [`Self::with_id_generator`] to replace it.
    ///
    /// * `store` - event store used to load aggregates and commit events.
    /// * `dispatchers` - receivers of committed events, called in order.
    /// * `services` - value passed by reference to the command handlers.
    /// * `error_handler` - callback invoked with each dispatcher error.
    #[must_use]
    #[cfg(target_arch = "wasm32")]
    pub fn new(
        store: DynEventStore<A>,
        dispatchers: Vec<Box<dyn Dispatcher<A>>>,
        services: A::Services,
        error_handler: Box<dyn Fn(&CqrsError)>,
    ) -> Self {
        Self {
            store,
            dispatchers,
            services,
            error_handler,
            id_generator: Box::new(DefaultIdGenerator),
        }
    }

    /// Replaces the aggregate id generator and returns the updated engine.
    ///
    /// This consumes `self`, so the returned engine must be kept.
    #[must_use]
    #[cfg(not(target_arch = "wasm32"))]
    pub fn with_id_generator(
        mut self,
        id_generator: Box<dyn AggregateIdGenerator<A> + Send + Sync>,
    ) -> Self {
        self.id_generator = id_generator;
        self
    }

    /// Replaces the aggregate id generator and returns the updated engine
    /// (`wasm32` variant without `Send + Sync` bounds).
    ///
    /// This consumes `self`, so the returned engine must be kept.
    #[must_use]
    #[cfg(target_arch = "wasm32")]
    pub fn with_id_generator(mut self, id_generator: Box<dyn AggregateIdGenerator<A>>) -> Self {
        self.id_generator = id_generator;
        self
    }

    /// Appends a dispatcher; it is called after all previously registered ones.
    #[cfg(not(target_arch = "wasm32"))]
    pub fn append_dispatcher(&mut self, dispatcher: Box<dyn Dispatcher<A> + Send + Sync>) {
        self.dispatchers.push(dispatcher);
    }

    /// Appends a dispatcher; it is called after all previously registered ones
    /// (`wasm32` variant without `Send + Sync` bounds).
    #[cfg(target_arch = "wasm32")]
    pub fn append_dispatcher(&mut self, dispatcher: Box<dyn Dispatcher<A>>) {
        self.dispatchers.push(dispatcher);
    }

    /// Executes a create command with empty metadata.
    ///
    /// Returns the id of the newly created aggregate.
    ///
    /// # Errors
    ///
    /// Returns whatever [`Self::execute_create_with_metadata`] returns.
    pub async fn execute_create(
        &self,
        command: A::CreateCommand,
        context: &CqrsContext,
    ) -> Result<String, CqrsError> {
        debug!("Executing create command");
        let result = self
            .execute_create_with_metadata(command, HashMap::new(), context)
            .await;
        match &result {
            Ok(id) => info!(aggregate_id = %id, "Aggregate created successfully"),
            Err(e) => error!(error = %e, "Failed to create aggregate"),
        }
        result
    }

    /// Executes an update command against `aggregate_id` with empty metadata.
    ///
    /// # Errors
    ///
    /// Returns whatever [`Self::execute_update_with_metadata`] returns.
    pub async fn execute_update(
        &self,
        aggregate_id: &str,
        command: A::UpdateCommand,
        context: &CqrsContext,
    ) -> Result<(), CqrsError> {
        debug!("Executing update command");
        let result = self
            .execute_update_with_metadata(aggregate_id, command, HashMap::new(), context)
            .await;
        match &result {
            Ok(_) => info!("Aggregate updated successfully"),
            Err(e) => error!(error = %e, "Failed to update aggregate"),
        }
        result
    }

    /// Creates a new aggregate from `command`, attaching `metadata` to the
    /// commit.
    ///
    /// The id comes from the configured id generator; the aggregate is then
    /// initialized through the store, the create handler produces the events,
    /// and those events are applied, committed and dispatched.
    ///
    /// Returns the generated aggregate id.
    ///
    /// # Errors
    ///
    /// Fails if the store cannot initialize the aggregate, if the create
    /// handler rejects the command, if an event cannot be applied, or if the
    /// commit fails. Dispatcher failures do not fail the call.
    pub async fn execute_create_with_metadata(
        &self,
        command: A::CreateCommand,
        metadata: HashMap<String, String>,
        context: &CqrsContext,
    ) -> Result<String, CqrsError> {
        debug!("Executing create command with metadata");
        let aggregate_id = self.id_generator.next_id(&command, context);
        debug!(aggregate_id = %aggregate_id, "Generated new aggregate ID");

        let (aggregate, version) = self
            .store
            .initialize_aggregate(&aggregate_id)
            .await
            .map_err(|e| {
                error!(error = %e, "Failed to initialize aggregate");
                e
            })?;
        debug!(version = %version, "Initialized aggregate");

        // Only `A::Error: Into<CqrsError>` is guaranteed, so the conversion is
        // spelled out instead of relying on the `From` lookup done by `?`.
        let events = aggregate
            .handle_create(command, &self.services, context)
            .await
            .map_err(|e| -> CqrsError {
                error!(error = %e, "Failed to handle create command");
                e.into()
            })?;
        debug!(
            event_count = events.len(),
            "Generated events from create command"
        );

        self.process(&aggregate_id, aggregate, version, events, metadata, context)
            .await
            .map_err(|e| {
                error!(error = %e, "Failed to process events");
                e
            })?;
        debug!("Processed events successfully");

        info!(aggregate_id = %aggregate_id, "Aggregate created successfully with metadata");
        Ok(aggregate_id)
    }

    /// Hands the committed `events` to every registered dispatcher, in order.
    ///
    /// A failing dispatcher is logged and reported to the error handler; the
    /// remaining dispatchers are still called.
    async fn handle_events(
        &self,
        aggregate_id: &str,
        events: &[EventEnvelope<A>],
        context: &CqrsContext,
    ) {
        debug!("Handling events for dispatchers");
        for (i, dispatcher) in self.dispatchers.iter().enumerate() {
            debug!(dispatcher_index = i, "Dispatching events to dispatcher");
            match dispatcher.dispatch(aggregate_id, events, context).await {
                Ok(_) => debug!(dispatcher_index = i, "Successfully dispatched events"),
                Err(e) => {
                    error!(dispatcher_index = i, error = %e, "Failed to dispatch events");
                    (self.error_handler)(&e);
                }
            }
        }
        debug!("Finished handling events for all dispatchers");
    }

    /// Runs `command` against the existing aggregate `aggregate_id`, attaching
    /// `metadata` to the commit.
    ///
    /// The aggregate is loaded from the store, the update handler produces the
    /// events, and those events go through the same apply / commit / dispatch
    /// pipeline as the create path.
    ///
    /// # Errors
    ///
    /// Fails if the aggregate cannot be loaded, if the update handler rejects
    /// the command, if an event cannot be applied, or if the commit fails.
    /// Dispatcher failures do not fail the call.
    pub async fn execute_update_with_metadata(
        &self,
        aggregate_id: &str,
        command: A::UpdateCommand,
        metadata: HashMap<String, String>,
        context: &CqrsContext,
    ) -> Result<(), CqrsError> {
        debug!("Executing update command with metadata");
        let (aggregate, version) = self
            .store
            .load_aggregate(aggregate_id)
            .await
            .map_err(|e| {
                error!(error = %e, "Failed to load aggregate");
                e
            })?;
        debug!(version = %version, "Loaded aggregate");

        // Only `A::Error: Into<CqrsError>` is guaranteed, so the conversion is
        // spelled out instead of relying on the `From` lookup done by `?`.
        let events = aggregate
            .handle_update(command, &self.services, context)
            .await
            .map_err(|e| -> CqrsError {
                error!(error = %e, "Failed to handle update command");
                e.into()
            })?;
        debug!(
            event_count = events.len(),
            "Generated events from update command"
        );

        // Shared with the create path so both apply, commit and dispatch the
        // same way; `process` logs its own failures.
        self.process(aggregate_id, aggregate, version, events, metadata, context)
            .await?;

        info!("Aggregate updated successfully with metadata");
        Ok(())
    }

    /// Applies `events` to `aggregate`, commits them and dispatches the
    /// committed envelopes.
    ///
    /// Events are applied in order before the commit, so the store receives
    /// the aggregate in its post-event state. If the store reports no
    /// committed events, dispatching is skipped.
    ///
    /// # Errors
    ///
    /// Fails on the first event that cannot be applied or if the commit
    /// fails. Dispatcher failures are handled inside `handle_events` and do
    /// not surface here.
    async fn process(
        &self,
        aggregate_id: &str,
        mut aggregate: A,
        version: usize,
        events: Vec<A::Event>,
        metadata: HashMap<String, String>,
        context: &CqrsContext,
    ) -> Result<(), CqrsError> {
        debug!("Processing events for aggregate");
        for (i, event) in events.iter().enumerate() {
            debug!(
                event_index = i,
                event_type = event.event_type(),
                "Applying event to aggregate"
            );
            // `apply` consumes the event, while the originals are still needed
            // for the commit below, hence the clone.
            aggregate.apply(event.clone()).map_err(|e| -> CqrsError {
                error!(event_index = i, error = %e, "Failed to apply event to aggregate");
                e.into()
            })?;
            debug!(event_index = i, "Successfully applied event to aggregate");
        }
        debug!("Applied all events to aggregate");

        debug!("Committing events to store");
        let committed_events = self
            .store
            .commit(events, &aggregate, metadata, version, context)
            .await
            .map_err(|e| {
                error!(error = %e, "Failed to commit events to store");
                e
            })?;
        debug!(
            event_count = committed_events.len(),
            "Successfully committed events to store"
        );

        if committed_events.is_empty() {
            debug!("No events committed, returning early");
            return Ok(());
        }

        debug!(
            event_count = committed_events.len(),
            "Dispatching committed events to handlers"
        );
        self.handle_events(aggregate_id, &committed_events, context)
            .await;
        debug!("Successfully processed all events");
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use crate::es::inmemory::InMemoryPersist;
    use crate::es::EventStoreImpl;
    use crate::testing::{CreateCommand, TestAggregate, TestEvent, UpdateCommand};
    use crate::CqrsCommandEngine;
    use crate::CqrsContext;
    use crate::EventEnvelope;
    use futures::StreamExt;

    /// Builds an engine over an in-memory store, with no dispatchers and a
    /// no-op error handler.
    fn in_memory_engine() -> CqrsCommandEngine<TestAggregate> {
        let store = EventStoreImpl::new(InMemoryPersist::<TestAggregate>::new());
        CqrsCommandEngine::new(store, vec![], (), Box::new(|_e| {}))
    }

    /// Creates the "toto" aggregate and returns its id.
    async fn create_toto(
        engine: &CqrsCommandEngine<TestAggregate>,
        context: &CqrsContext,
    ) -> String {
        let command = CreateCommand::Initialize {
            name: "toto".to_string(),
        };
        engine
            .execute_create(command, context)
            .await
            .expect("Creation should succeed")
    }

    /// Sends one `Increment` update, panicking with `failure` if it is rejected.
    async fn increment(
        engine: &CqrsCommandEngine<TestAggregate>,
        aggregate_id: &str,
        context: &CqrsContext,
        failure: &str,
    ) {
        engine
            .execute_update(aggregate_id, UpdateCommand::Increment, context)
            .await
            .expect(failure);
    }

    /// Reads every stored event envelope of `aggregate_id` from the engine's store.
    async fn stored_events(
        engine: &CqrsCommandEngine<TestAggregate>,
        aggregate_id: &str,
    ) -> Vec<EventEnvelope<TestAggregate>> {
        let stream = engine
            .store
            .load_events(aggregate_id)
            .await
            .expect("Event loading should succeed");
        let items = stream.collect::<Vec<_>>().await;
        items
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .expect("Events should be valid")
    }

    /// Asserts that the first stored event is the creation of "toto".
    fn assert_created_first(events: &[EventEnvelope<TestAggregate>]) {
        assert_eq!(
            events[0].payload,
            TestEvent::Created {
                name: "toto".to_string()
            }
        );
    }

    #[tokio::test]
    async fn test_create_aggregate() {
        let engine = in_memory_engine();
        let context = CqrsContext::default();

        let aggregate_id = create_toto(&engine, &context).await;

        assert!(!aggregate_id.is_empty(), "Aggregate ID should not be empty");
    }

    #[tokio::test]
    async fn test_update_aggregate() {
        let engine = in_memory_engine();
        let context = CqrsContext::default();
        let aggregate_id = create_toto(&engine, &context).await;

        increment(&engine, &aggregate_id, &context, "Update should succeed").await;

        let events = stored_events(&engine, &aggregate_id).await;
        assert_eq!(events.len(), 2, "There should be two events");
        assert_created_first(&events);
        assert!(matches!(events[1].payload, TestEvent::Incremented));
    }

    #[tokio::test]
    async fn test_multiple_updates() {
        let engine = in_memory_engine();
        let context = CqrsContext::default();
        let aggregate_id = create_toto(&engine, &context).await;

        increment(
            &engine,
            &aggregate_id,
            &context,
            "First update should succeed",
        )
        .await;
        increment(
            &engine,
            &aggregate_id,
            &context,
            "Second update should succeed",
        )
        .await;

        let events = stored_events(&engine, &aggregate_id).await;
        assert_eq!(events.len(), 3, "There should be three events");
        assert_created_first(&events);
        assert!(matches!(events[1].payload, TestEvent::Incremented));
        assert!(matches!(events[2].payload, TestEvent::Incremented));
    }
}