1use crate::aggregate::{AggregateIdGenerator, DefaultIdGenerator};
2use crate::context::CqrsContext;
3use crate::denormalizer::Dispatcher;
4use crate::errors::CqrsError;
5use crate::event::Event;
6use crate::{Aggregate, CommandHandler, DynEventStore, EventEnvelope};
7use std::collections::HashMap;
8use tracing::{debug, error, info};
9
10pub struct CqrsCommandEngine<A>
48where
49 A: Aggregate + CommandHandler + 'static,
50 A::Error: Into<CqrsError>,
51{
52 store: DynEventStore<A>,
53 #[cfg(not(target_arch = "wasm32"))]
54 dispatchers: Vec<Box<dyn Dispatcher<A> + Send + Sync>>,
55 #[cfg(target_arch = "wasm32")]
56 dispatchers: Vec<Box<dyn Dispatcher<A>>>,
57 services: A::Services,
58 #[cfg(not(target_arch = "wasm32"))]
59 error_handler: Box<dyn Fn(&CqrsError) + Send + Sync>,
60 #[cfg(target_arch = "wasm32")]
61 error_handler: Box<dyn Fn(&CqrsError)>,
62 #[cfg(not(target_arch = "wasm32"))]
63 id_generator: Box<dyn AggregateIdGenerator<A> + Send + Sync>,
64 #[cfg(target_arch = "wasm32")]
65 id_generator: Box<dyn AggregateIdGenerator<A>>,
66}
67
68impl<A> CqrsCommandEngine<A>
69where
70 A: Aggregate + CommandHandler + 'static,
71 A::Error: Into<CqrsError>,
72{
73 #[must_use]
74 #[cfg(not(target_arch = "wasm32"))]
75 pub fn new(
76 store: DynEventStore<A>,
77 dispatchers: Vec<Box<dyn Dispatcher<A> + Send + Sync>>,
78 services: A::Services,
79 error_handler: Box<dyn Fn(&CqrsError) + Send + Sync>,
80 ) -> Self {
81 Self {
82 store,
83 dispatchers,
84 services,
85 error_handler,
86 id_generator: Box::new(DefaultIdGenerator),
87 }
88 }
89
90 #[must_use]
91 #[cfg(target_arch = "wasm32")]
92 pub fn new(
93 store: DynEventStore<A>,
94 dispatchers: Vec<Box<dyn Dispatcher<A>>>,
95 services: A::Services,
96 error_handler: Box<dyn Fn(&CqrsError)>,
97 ) -> Self {
98 Self {
99 store,
100 dispatchers,
101 services,
102 error_handler,
103 id_generator: Box::new(DefaultIdGenerator),
104 }
105 }
106
107 #[cfg(not(target_arch = "wasm32"))]
108 pub fn with_id_generator(
109 mut self,
110 id_generator: Box<dyn AggregateIdGenerator<A> + Send + Sync>,
111 ) -> Self {
112 self.id_generator = id_generator;
113 self
114 }
115
116 #[cfg(target_arch = "wasm32")]
117 pub fn with_id_generator(mut self, id_generator: Box<dyn AggregateIdGenerator<A>>) -> Self {
118 self.id_generator = id_generator;
119 self
120 }
121
122 #[cfg(not(target_arch = "wasm32"))]
123 pub fn append_dispatcher(&mut self, dispatcher: Box<dyn Dispatcher<A> + Send + Sync>) {
124 self.dispatchers.push(dispatcher);
125 }
126
127 #[cfg(target_arch = "wasm32")]
128 pub fn append_dispatcher(&mut self, dispatcher: Box<dyn Dispatcher<A>>) {
129 self.dispatchers.push(dispatcher);
130 }
131
132 pub async fn execute_create(
133 &self,
134 command: A::CreateCommand,
135 context: &CqrsContext,
136 ) -> Result<String, CqrsError> {
137 debug!("Executing create command");
138 let result = self
139 .execute_create_with_metadata(command, HashMap::new(), context)
140 .await;
141 match &result {
142 Ok(id) => info!(aggregate_id = %id, "Aggregate created successfully"),
143 Err(e) => error!(error = %e, "Failed to create aggregate"),
144 }
145 result
146 }
147
148 pub async fn execute_update(
149 &self,
150 aggregate_id: &str,
151 command: A::UpdateCommand,
152 context: &CqrsContext,
153 ) -> Result<(), CqrsError> {
154 debug!("Executing update command");
155 let result = self
156 .execute_update_with_metadata(aggregate_id, command, HashMap::new(), context)
157 .await;
158 match &result {
159 Ok(_) => info!("Aggregate updated successfully"),
160 Err(e) => error!(error = %e, "Failed to update aggregate"),
161 }
162 result
163 }
164
165 pub async fn execute_create_with_metadata(
166 &self,
167 command: A::CreateCommand,
168 metadata: HashMap<String, String>,
169 context: &CqrsContext,
170 ) -> Result<String, CqrsError> {
171 debug!("Executing create command with metadata");
172 let aggregate_id = self.id_generator.next_id(&command, context);
173 debug!(aggregate_id = %aggregate_id, "Generated new aggregate ID");
174
175 let (aggregate, version) = match self.store.initialize_aggregate(&aggregate_id).await {
176 Ok(result) => {
177 let (_, v) = &result;
178 debug!(version = %v, "Initialized aggregate");
179 result
180 }
181 Err(e) => {
182 error!(error = %e, "Failed to initialize aggregate");
183 return Err(e);
184 }
185 };
186
187 let events = match aggregate
188 .handle_create(command, &self.services, context)
189 .await
190 {
191 Ok(events) => {
192 debug!(
193 event_count = events.len(),
194 "Generated events from create command"
195 );
196 events
197 }
198 Err(e) => {
199 error!(error = %e, "Failed to handle create command");
200 return Err(e.into());
201 }
202 };
203
204 match self
205 .process(&aggregate_id, aggregate, version, events, metadata, context)
206 .await
207 {
208 Ok(_) => {
209 debug!("Processed events successfully");
210 }
211 Err(e) => {
212 error!(error = %e, "Failed to process events");
213 return Err(e);
214 }
215 }
216
217 info!(aggregate_id = %aggregate_id, "Aggregate created successfully with metadata");
218 Ok(aggregate_id)
219 }
220
221 async fn handle_events(
222 &self,
223 aggregate_id: &str,
224 events: &[EventEnvelope<A>],
225 context: &CqrsContext,
226 ) {
227 debug!("Handling events for dispatchers");
228 let eh = &self.error_handler;
229 for (i, dispatcher) in self.dispatchers.iter().enumerate() {
230 debug!(dispatcher_index = i, "Dispatching events to dispatcher");
231 match dispatcher.dispatch(aggregate_id, events, context).await {
232 Ok(_) => debug!(dispatcher_index = i, "Successfully dispatched events"),
233 Err(e) => {
234 error!(dispatcher_index = i, error = %e, "Failed to dispatch events");
235 eh(&e);
236 }
237 };
238 }
239 debug!("Finished handling events for all dispatchers");
240 }
241
242 pub async fn execute_update_with_metadata(
243 &self,
244 aggregate_id: &str,
245 command: A::UpdateCommand,
246 metadata: HashMap<String, String>,
247 context: &CqrsContext,
248 ) -> Result<(), CqrsError> {
249 debug!("Executing update command with metadata");
250
251 let (mut aggregate, version) = match self.store.load_aggregate(aggregate_id).await {
252 Ok(result) => {
253 let (_, v) = &result;
254 debug!(version = %v, "Loaded aggregate");
255 result
256 }
257 Err(e) => {
258 error!(error = %e, "Failed to load aggregate");
259 return Err(e);
260 }
261 };
262
263 let events = match aggregate
264 .handle_update(command, &self.services, context)
265 .await
266 {
267 Ok(events) => {
268 debug!(
269 event_count = events.len(),
270 "Generated events from update command"
271 );
272 events
273 }
274 Err(e) => {
275 error!(error = %e, "Failed to handle update command");
276 return Err(e.into());
277 }
278 };
279
280 for event in &events {
281 if let Err(e) = aggregate.apply(event.clone()) {
282 error!(error = %e, "Failed to apply event to aggregate");
283 return Err(e.into());
284 }
285 }
286 debug!("Applied events to aggregate");
287
288 let committed_events = match self
289 .store
290 .commit(events, &aggregate, metadata, version, context)
291 .await
292 {
293 Ok(events) => {
294 debug!(event_count = events.len(), "Committed events to store");
295 events
296 }
297 Err(e) => {
298 error!(error = %e, "Failed to commit events");
299 return Err(e);
300 }
301 };
302
303 if committed_events.is_empty() {
304 debug!("No events committed, returning early");
305 return Ok(());
306 }
307
308 debug!(
309 event_count = committed_events.len(),
310 "Dispatching events to handlers"
311 );
312 self.handle_events(aggregate_id, &committed_events, context)
313 .await;
314
315 info!("Aggregate updated successfully with metadata");
316 Ok(())
317 }
318
319 async fn process(
320 &self,
321 aggregate_id: &str,
322 mut aggregate: A,
323 version: usize,
324 events: Vec<A::Event>,
325 metadata: HashMap<String, String>,
326 context: &CqrsContext,
327 ) -> Result<(), CqrsError> {
328 debug!("Processing events for aggregate");
329
330 for (i, event) in events.iter().enumerate() {
331 debug!(
332 event_index = i,
333 event_type = event.event_type(),
334 "Applying event to aggregate"
335 );
336 match aggregate.apply(event.clone()) {
337 Ok(_) => debug!(event_index = i, "Successfully applied event to aggregate"),
338 Err(e) => {
339 error!(event_index = i, error = %e, "Failed to apply event to aggregate");
340 return Err(e.into());
341 }
342 }
343 }
344 debug!("Applied all events to aggregate");
345
346 debug!("Committing events to store");
347 let committed_events = match self
348 .store
349 .commit(events, &aggregate, metadata, version, context)
350 .await
351 {
352 Ok(events) => {
353 debug!(
354 event_count = events.len(),
355 "Successfully committed events to store"
356 );
357 events
358 }
359 Err(e) => {
360 error!(error = %e, "Failed to commit events to store");
361 return Err(e);
362 }
363 };
364
365 if committed_events.is_empty() {
366 debug!("No events committed, returning early");
367 return Ok(());
368 }
369
370 debug!(
371 event_count = committed_events.len(),
372 "Dispatching committed events to handlers"
373 );
374 self.handle_events(aggregate_id, &committed_events, context)
375 .await;
376
377 debug!("Successfully processed all events");
378 Ok(())
379 }
380}
381
#[cfg(test)]
mod tests {
    use crate::es::inmemory::InMemoryPersist;
    use crate::es::EventStoreImpl;
    use crate::testing::{CreateCommand, TestAggregate, TestEvent, UpdateCommand};
    use crate::CqrsCommandEngine;
    use crate::CqrsContext;
    use crate::EventEnvelope;
    use futures::StreamExt;

    /// Engine backed by a fresh in-memory store, with no dispatchers and a
    /// no-op error handler.
    fn new_engine() -> CqrsCommandEngine<TestAggregate> {
        let persist = InMemoryPersist::<TestAggregate>::new();
        let store = EventStoreImpl::new(persist);
        CqrsCommandEngine::new(store, vec![], (), Box::new(|_e| {}))
    }

    /// Creates the aggregate named "toto" and returns its identifier.
    async fn create_toto(
        engine: &CqrsCommandEngine<TestAggregate>,
        context: &CqrsContext,
    ) -> String {
        let command = CreateCommand::Initialize {
            name: "toto".to_string(),
        };
        engine
            .execute_create(command, context)
            .await
            .expect("Creation should succeed")
    }

    /// Reads back every stored event envelope for `aggregate_id`.
    async fn stored_events(
        engine: &CqrsCommandEngine<TestAggregate>,
        aggregate_id: &str,
    ) -> Vec<EventEnvelope<TestAggregate>> {
        let event_stream = engine
            .store
            .load_events(aggregate_id)
            .await
            .expect("Event loading should succeed");

        event_stream
            .collect::<Vec<_>>()
            .await
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .expect("Events should be valid")
    }

    /// Creating an aggregate yields a non-empty identifier.
    #[tokio::test]
    async fn test_create_aggregate() {
        let engine = new_engine();
        let context = CqrsContext::default();

        let aggregate_id = create_toto(&engine, &context).await;

        assert!(!aggregate_id.is_empty(), "Aggregate ID should not be empty");
    }

    /// One create followed by one update stores two events, in that order.
    #[tokio::test]
    async fn test_update_aggregate() {
        let engine = new_engine();
        let context = CqrsContext::default();

        let aggregate_id = create_toto(&engine, &context).await;

        engine
            .execute_update(&aggregate_id, UpdateCommand::Increment, &context)
            .await
            .expect("Update should succeed");

        let events = stored_events(&engine, &aggregate_id).await;

        assert_eq!(events.len(), 2, "There should be two events");
        assert_eq!(
            events[0].payload,
            TestEvent::Created {
                name: "toto".to_string()
            }
        );
        assert!(matches!(events[1].payload, TestEvent::Incremented));
    }

    /// One create followed by two updates stores three events, in that order.
    #[tokio::test]
    async fn test_multiple_updates() {
        let engine = new_engine();
        let context = CqrsContext::default();

        let aggregate_id = create_toto(&engine, &context).await;

        engine
            .execute_update(&aggregate_id, UpdateCommand::Increment, &context)
            .await
            .expect("First update should succeed");

        engine
            .execute_update(&aggregate_id, UpdateCommand::Increment, &context)
            .await
            .expect("Second update should succeed");

        let events = stored_events(&engine, &aggregate_id).await;

        assert_eq!(events.len(), 3, "There should be three events");
        assert_eq!(
            events[0].payload,
            TestEvent::Created {
                name: "toto".to_string()
            }
        );
        assert!(matches!(events[1].payload, TestEvent::Incremented));
        assert!(matches!(events[2].payload, TestEvent::Incremented));
    }
}