Skip to main content

cqrs_rust_lib/
engine.rs

1use crate::aggregate::{AggregateIdGenerator, DefaultIdGenerator};
2use crate::context::CqrsContext;
3use crate::denormalizer::Dispatcher;
4use crate::errors::CqrsError;
5use crate::event::Event;
6use crate::{Aggregate, CommandHandler, DynEventStore, EventEnvelope};
7use std::collections::HashMap;
8use tracing::{debug, error, info};
9
10/// The `CqrsCommandEngine` struct is a Command Query Responsibility Segregation (CQRS) engine
11/// designed to handle commands and communication with an underlying event store and various dispatchers.
12/// It acts as the main entry point for command processing and encapsulates the behavior specific to an aggregate.
13///
14/// # Type Parameters
15/// - `A`: The type of the aggregate managed by this CQRS engine.
16///   The aggregate represents the domain behavior and state transitions.
17/// - `ES`: The type of the event store used to views events related to the aggregate.
18///
19/// # Bounds
20/// - `A`: Must implement the `Aggregate` trait. This ensures the aggregate provides necessary
21///   functionality such as validating commands or applying events to mutate its state.
22/// - `ES`: Must implement the `EventStore<A>` trait. This ensures the event store works
23///   with the specified aggregate type for persisting and retrieving events.
24///
25/// # Fields
26/// - `store: ES`
27///   The event store instance used to views and retrieve events associated with the aggregate.
28///   It allows the CQRS engine to save and load the aggregate's event stream to/from persistent storage.
29///
30/// - `dispatchers: Vec<Box<dyn Dispatcher<A>>>`
31///   A collection of dispatchers used by the CQRS engine to handle various external interactions such as
32///   messaging or integration with other systems. Dispatchers are responsible for forwarding
33///   or broadcasting events and can implement custom logic based on the use case.
34///
35/// - `services: A::Services`
36///   A collection of domain-specific services required by the aggregate to perform its business operations.
37///   These services are defined within the aggregate's associated types to provide dependencies
38///   such as external APIs, configuration, or infrastructure required for executing commands.
39///
40/// # Usage
41/// Typically, the `CqrsCommandEngine` is instantiated with a concrete implementation of an event store,
42/// one or more command dispatchers, and the services needed by the aggregate. Once initialized,
43/// it can be used to dispatch commands and manage the lifecycle of aggregate instances.
44///
45/// This struct facilitates the CQRS pattern by separating the responsibility of command handling
46/// from querying, while keeping event storage and dispatching modular and configurable.
47pub struct CqrsCommandEngine<A>
48where
49    A: Aggregate + CommandHandler + 'static,
50    A::Error: Into<CqrsError>,
51{
52    store: DynEventStore<A>,
53    #[cfg(not(target_arch = "wasm32"))]
54    dispatchers: Vec<Box<dyn Dispatcher<A> + Send + Sync>>,
55    #[cfg(target_arch = "wasm32")]
56    dispatchers: Vec<Box<dyn Dispatcher<A>>>,
57    services: A::Services,
58    #[cfg(not(target_arch = "wasm32"))]
59    error_handler: Box<dyn Fn(&CqrsError) + Send + Sync>,
60    #[cfg(target_arch = "wasm32")]
61    error_handler: Box<dyn Fn(&CqrsError)>,
62    #[cfg(not(target_arch = "wasm32"))]
63    id_generator: Box<dyn AggregateIdGenerator<A> + Send + Sync>,
64    #[cfg(target_arch = "wasm32")]
65    id_generator: Box<dyn AggregateIdGenerator<A>>,
66}
67
68impl<A> CqrsCommandEngine<A>
69where
70    A: Aggregate + CommandHandler + 'static,
71    A::Error: Into<CqrsError>,
72{
73    #[must_use]
74    #[cfg(not(target_arch = "wasm32"))]
75    pub fn new(
76        store: DynEventStore<A>,
77        dispatchers: Vec<Box<dyn Dispatcher<A> + Send + Sync>>,
78        services: A::Services,
79        error_handler: Box<dyn Fn(&CqrsError) + Send + Sync>,
80    ) -> Self {
81        Self {
82            store,
83            dispatchers,
84            services,
85            error_handler,
86            id_generator: Box::new(DefaultIdGenerator),
87        }
88    }
89
90    #[must_use]
91    #[cfg(target_arch = "wasm32")]
92    pub fn new(
93        store: DynEventStore<A>,
94        dispatchers: Vec<Box<dyn Dispatcher<A>>>,
95        services: A::Services,
96        error_handler: Box<dyn Fn(&CqrsError)>,
97    ) -> Self {
98        Self {
99            store,
100            dispatchers,
101            services,
102            error_handler,
103            id_generator: Box::new(DefaultIdGenerator),
104        }
105    }
106
107    #[cfg(not(target_arch = "wasm32"))]
108    pub fn with_id_generator(
109        mut self,
110        id_generator: Box<dyn AggregateIdGenerator<A> + Send + Sync>,
111    ) -> Self {
112        self.id_generator = id_generator;
113        self
114    }
115
116    #[cfg(target_arch = "wasm32")]
117    pub fn with_id_generator(mut self, id_generator: Box<dyn AggregateIdGenerator<A>>) -> Self {
118        self.id_generator = id_generator;
119        self
120    }
121
122    #[cfg(not(target_arch = "wasm32"))]
123    pub fn append_dispatcher(&mut self, dispatcher: Box<dyn Dispatcher<A> + Send + Sync>) {
124        self.dispatchers.push(dispatcher);
125    }
126
127    #[cfg(target_arch = "wasm32")]
128    pub fn append_dispatcher(&mut self, dispatcher: Box<dyn Dispatcher<A>>) {
129        self.dispatchers.push(dispatcher);
130    }
131
132    pub async fn execute_create(
133        &self,
134        command: A::CreateCommand,
135        context: &CqrsContext,
136    ) -> Result<String, CqrsError> {
137        debug!("Executing create command");
138        let result = self
139            .execute_create_with_metadata(command, HashMap::new(), context)
140            .await;
141        match &result {
142            Ok(id) => info!(aggregate_id = %id, "Aggregate created successfully"),
143            Err(e) => error!(error = %e, "Failed to create aggregate"),
144        }
145        result
146    }
147
148    pub async fn execute_update(
149        &self,
150        aggregate_id: &str,
151        command: A::UpdateCommand,
152        context: &CqrsContext,
153    ) -> Result<(), CqrsError> {
154        debug!("Executing update command");
155        let result = self
156            .execute_update_with_metadata(aggregate_id, command, HashMap::new(), context)
157            .await;
158        match &result {
159            Ok(_) => info!("Aggregate updated successfully"),
160            Err(e) => error!(error = %e, "Failed to update aggregate"),
161        }
162        result
163    }
164
165    pub async fn execute_create_with_metadata(
166        &self,
167        command: A::CreateCommand,
168        metadata: HashMap<String, String>,
169        context: &CqrsContext,
170    ) -> Result<String, CqrsError> {
171        debug!("Executing create command with metadata");
172        let aggregate_id = self.id_generator.next_id(&command, context);
173        debug!(aggregate_id = %aggregate_id, "Generated new aggregate ID");
174
175        let (aggregate, version) = match self.store.initialize_aggregate(&aggregate_id).await {
176            Ok(result) => {
177                let (_, v) = &result;
178                debug!(version = %v, "Initialized aggregate");
179                result
180            }
181            Err(e) => {
182                error!(error = %e, "Failed to initialize aggregate");
183                return Err(e);
184            }
185        };
186
187        let events = match aggregate
188            .handle_create(command, &self.services, context)
189            .await
190        {
191            Ok(events) => {
192                debug!(
193                    event_count = events.len(),
194                    "Generated events from create command"
195                );
196                events
197            }
198            Err(e) => {
199                error!(error = %e, "Failed to handle create command");
200                return Err(e.into());
201            }
202        };
203
204        match self
205            .process(&aggregate_id, aggregate, version, events, metadata, context)
206            .await
207        {
208            Ok(_) => {
209                debug!("Processed events successfully");
210            }
211            Err(e) => {
212                error!(error = %e, "Failed to process events");
213                return Err(e);
214            }
215        }
216
217        info!(aggregate_id = %aggregate_id, "Aggregate created successfully with metadata");
218        Ok(aggregate_id)
219    }
220
221    async fn handle_events(
222        &self,
223        aggregate_id: &str,
224        events: &[EventEnvelope<A>],
225        context: &CqrsContext,
226    ) {
227        debug!("Handling events for dispatchers");
228        let eh = &self.error_handler;
229        for (i, dispatcher) in self.dispatchers.iter().enumerate() {
230            debug!(dispatcher_index = i, "Dispatching events to dispatcher");
231            match dispatcher.dispatch(aggregate_id, events, context).await {
232                Ok(_) => debug!(dispatcher_index = i, "Successfully dispatched events"),
233                Err(e) => {
234                    error!(dispatcher_index = i, error = %e, "Failed to dispatch events");
235                    eh(&e);
236                }
237            };
238        }
239        debug!("Finished handling events for all dispatchers");
240    }
241
242    pub async fn execute_update_with_metadata(
243        &self,
244        aggregate_id: &str,
245        command: A::UpdateCommand,
246        metadata: HashMap<String, String>,
247        context: &CqrsContext,
248    ) -> Result<(), CqrsError> {
249        debug!("Executing update command with metadata");
250
251        let (mut aggregate, version) = match self.store.load_aggregate(aggregate_id).await {
252            Ok(result) => {
253                let (_, v) = &result;
254                debug!(version = %v, "Loaded aggregate");
255                result
256            }
257            Err(e) => {
258                error!(error = %e, "Failed to load aggregate");
259                return Err(e);
260            }
261        };
262
263        let events = match aggregate
264            .handle_update(command, &self.services, context)
265            .await
266        {
267            Ok(events) => {
268                debug!(
269                    event_count = events.len(),
270                    "Generated events from update command"
271                );
272                events
273            }
274            Err(e) => {
275                error!(error = %e, "Failed to handle update command");
276                return Err(e.into());
277            }
278        };
279
280        for event in &events {
281            if let Err(e) = aggregate.apply(event.clone()) {
282                error!(error = %e, "Failed to apply event to aggregate");
283                return Err(e.into());
284            }
285        }
286        debug!("Applied events to aggregate");
287
288        let committed_events = match self
289            .store
290            .commit(events, &aggregate, metadata, version, context)
291            .await
292        {
293            Ok(events) => {
294                debug!(event_count = events.len(), "Committed events to store");
295                events
296            }
297            Err(e) => {
298                error!(error = %e, "Failed to commit events");
299                return Err(e);
300            }
301        };
302
303        if committed_events.is_empty() {
304            debug!("No events committed, returning early");
305            return Ok(());
306        }
307
308        debug!(
309            event_count = committed_events.len(),
310            "Dispatching events to handlers"
311        );
312        self.handle_events(aggregate_id, &committed_events, context)
313            .await;
314
315        info!("Aggregate updated successfully with metadata");
316        Ok(())
317    }
318
319    async fn process(
320        &self,
321        aggregate_id: &str,
322        mut aggregate: A,
323        version: usize,
324        events: Vec<A::Event>,
325        metadata: HashMap<String, String>,
326        context: &CqrsContext,
327    ) -> Result<(), CqrsError> {
328        debug!("Processing events for aggregate");
329
330        for (i, event) in events.iter().enumerate() {
331            debug!(
332                event_index = i,
333                event_type = event.event_type(),
334                "Applying event to aggregate"
335            );
336            match aggregate.apply(event.clone()) {
337                Ok(_) => debug!(event_index = i, "Successfully applied event to aggregate"),
338                Err(e) => {
339                    error!(event_index = i, error = %e, "Failed to apply event to aggregate");
340                    return Err(e.into());
341                }
342            }
343        }
344        debug!("Applied all events to aggregate");
345
346        debug!("Committing events to store");
347        let committed_events = match self
348            .store
349            .commit(events, &aggregate, metadata, version, context)
350            .await
351        {
352            Ok(events) => {
353                debug!(
354                    event_count = events.len(),
355                    "Successfully committed events to store"
356                );
357                events
358            }
359            Err(e) => {
360                error!(error = %e, "Failed to commit events to store");
361                return Err(e);
362            }
363        };
364
365        if committed_events.is_empty() {
366            debug!("No events committed, returning early");
367            return Ok(());
368        }
369
370        debug!(
371            event_count = committed_events.len(),
372            "Dispatching committed events to handlers"
373        );
374        self.handle_events(aggregate_id, &committed_events, context)
375            .await;
376
377        debug!("Successfully processed all events");
378        Ok(())
379    }
380}
381
382#[cfg(test)]
383mod tests {
384    use crate::es::inmemory::InMemoryPersist;
385    use crate::es::EventStoreImpl;
386    use crate::testing::{CreateCommand, TestAggregate, TestEvent, UpdateCommand};
387    use crate::CqrsCommandEngine;
388    use crate::CqrsContext;
389    use crate::EventEnvelope;
390    use futures::StreamExt;
391
392    #[tokio::test]
393    async fn test_create_aggregate() {
394        // Preparation
395        let persist = InMemoryPersist::<TestAggregate>::new();
396        let store = EventStoreImpl::new(persist);
397        let engine = CqrsCommandEngine::new(store, vec![], (), Box::new(|_e| {}));
398
399        let context = CqrsContext::default();
400
401        // Execution
402        let aggregate_id = engine
403            .execute_create(
404                CreateCommand::Initialize {
405                    name: "toto".to_string(),
406                },
407                &context,
408            )
409            .await
410            .expect("Creation should succeed");
411
412        // Verification
413        assert!(!aggregate_id.is_empty(), "Aggregate ID should not be empty");
414    }
415
416    #[tokio::test]
417    async fn test_update_aggregate() {
418        // Preparation
419        let persist = InMemoryPersist::<TestAggregate>::new();
420        let store = EventStoreImpl::new(persist);
421        let engine = CqrsCommandEngine::new(store, vec![], (), Box::new(|_e| {}));
422
423        let context = CqrsContext::default();
424
425        // Create the aggregate
426        let aggregate_id = engine
427            .execute_create(
428                CreateCommand::Initialize {
429                    name: "toto".to_string(),
430                },
431                &context,
432            )
433            .await
434            .expect("Creation should succeed");
435
436        // Execute the update
437        engine
438            .execute_update(&aggregate_id, UpdateCommand::Increment, &context)
439            .await
440            .expect("Update should succeed");
441
442        // Verify via stored events
443        let event_stream = engine
444            .store
445            .load_events(&aggregate_id)
446            .await
447            .expect("Event loading should succeed");
448
449        let events: Vec<EventEnvelope<TestAggregate>> = event_stream
450            .collect::<Vec<_>>()
451            .await
452            .into_iter()
453            .collect::<Result<Vec<_>, _>>()
454            .expect("Events should be valid");
455
456        assert_eq!(events.len(), 2, "There should be two events");
457        assert_eq!(
458            events[0].payload,
459            TestEvent::Created {
460                name: "toto".to_string()
461            }
462        );
463        assert!(matches!(events[1].payload, TestEvent::Incremented));
464    }
465
466    #[tokio::test]
467    async fn test_multiple_updates() {
468        // Preparation
469        let persist = InMemoryPersist::<TestAggregate>::new();
470        let store = EventStoreImpl::new(persist);
471        let engine = CqrsCommandEngine::new(store, vec![], (), Box::new(|_e| {}));
472
473        let context = CqrsContext::default();
474
475        // Create the aggregate
476        let aggregate_id = engine
477            .execute_create(
478                CreateCommand::Initialize {
479                    name: "toto".to_string(),
480                },
481                &context,
482            )
483            .await
484            .expect("Creation should succeed");
485
486        // First update
487        engine
488            .execute_update(&aggregate_id, UpdateCommand::Increment, &context)
489            .await
490            .expect("First update should succeed");
491
492        // Second update
493        engine
494            .execute_update(&aggregate_id, UpdateCommand::Increment, &context)
495            .await
496            .expect("Second update should succeed");
497
498        // Verification
499        let event_stream = engine
500            .store
501            .load_events(&aggregate_id)
502            .await
503            .expect("Event loading should succeed");
504
505        let events: Vec<EventEnvelope<TestAggregate>> = event_stream
506            .collect::<Vec<_>>()
507            .await
508            .into_iter()
509            .collect::<Result<Vec<_>, _>>()
510            .expect("Events should be valid");
511
512        assert_eq!(events.len(), 3, "There should be three events");
513        assert_eq!(
514            events[0].payload,
515            TestEvent::Created {
516                name: "toto".to_string()
517            }
518        );
519        assert!(matches!(events[1].payload, TestEvent::Incremented));
520        assert!(matches!(events[2].payload, TestEvent::Incremented));
521    }
522}