Skip to main content

oxirs_stream/
cqrs.rs

1//! # CQRS (Command Query Responsibility Segregation) Implementation
2//!
3//! This module provides a complete CQRS pattern implementation for OxiRS Stream,
4//! separating command (write) and query (read) responsibilities. It integrates
5//! with the event sourcing framework for eventual consistency and scalability.
6
7use crate::event_sourcing::{EventStoreTrait, EventStream};
8use crate::StreamEvent;
9use anyhow::{anyhow, Result};
10use chrono::{DateTime, Utc};
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15use tokio::sync::{broadcast, RwLock};
16use tracing::{debug, error, info, warn};
17use uuid::Uuid;
18
/// CQRS configuration for the streaming system.
///
/// One instance is handed to both the command bus and the query bus; each bus
/// reads only the fields relevant to it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CQRSConfig {
    /// Maximum command processing time before timeout
    // NOTE(review): nothing visible in this file reads this field — confirm
    // whether the command timeout is enforced elsewhere.
    pub command_timeout_ms: u64,
    /// Maximum query processing time before timeout
    ///
    /// Used as the fallback when a query's own `timeout_ms()` returns `None`.
    pub query_timeout_ms: u64,
    /// Enable command validation
    ///
    /// When `true` the command bus calls `Command::validate` before looking up
    /// the handler.
    pub enable_command_validation: bool,
    /// Enable query optimization
    // NOTE(review): not consulted by any code visible in this file.
    pub enable_query_optimization: bool,
    /// Command retry configuration
    pub command_retry_config: RetryConfig,
    /// Query cache configuration
    pub query_cache_config: QueryCacheConfig,
    /// Eventual consistency window (max time for read models to catch up)
    // NOTE(review): not consulted by any code visible in this file.
    pub consistency_window_ms: u64,
    /// Maximum concurrent commands
    ///
    /// Number of permits in the command bus semaphore.
    pub max_concurrent_commands: usize,
    /// Maximum concurrent queries
    ///
    /// Number of permits in the query bus semaphore.
    pub max_concurrent_queries: usize,
}
41
42impl Default for CQRSConfig {
43    fn default() -> Self {
44        Self {
45            command_timeout_ms: 30000,
46            query_timeout_ms: 10000,
47            enable_command_validation: true,
48            enable_query_optimization: true,
49            command_retry_config: RetryConfig::default(),
50            query_cache_config: QueryCacheConfig::default(),
51            consistency_window_ms: 5000,
52            max_concurrent_commands: 1000,
53            max_concurrent_queries: 10000,
54        }
55    }
56}
57
/// Retry configuration for commands
///
/// Drives the backoff loop in the command bus: a failed attempt is followed by
/// a sleep and another attempt until `max_retries` retries have been used.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryConfig {
    /// Number of retries allowed after the first attempt (so at most
    /// `max_retries + 1` attempts are made).
    pub max_retries: u32,
    /// Sleep before the first retry, in milliseconds.
    pub initial_delay_ms: u64,
    /// Upper bound for the sleep between attempts, in milliseconds.
    pub max_delay_ms: u64,
    /// Factor the delay is multiplied by after each retry.
    pub backoff_multiplier: f64,
}
66
67impl Default for RetryConfig {
68    fn default() -> Self {
69        Self {
70            max_retries: 3,
71            initial_delay_ms: 100,
72            max_delay_ms: 5000,
73            backoff_multiplier: 2.0,
74        }
75    }
76}
77
/// Query cache configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryCacheConfig {
    /// Master switch: when `false` the cache neither stores nor returns entries.
    pub enabled: bool,
    /// Maximum number of entries kept; the oldest entries are evicted first.
    pub max_entries: usize,
    /// Age in seconds at which an entry stops being served and becomes
    /// eligible for eviction.
    pub ttl_seconds: u64,
    /// Cap on the summed size of the serialized entries, in mebibytes
    /// (the cache multiplies this by 1024 * 1024).
    pub max_memory_mb: usize,
}
86
87impl Default for QueryCacheConfig {
88    fn default() -> Self {
89        Self {
90            enabled: true,
91            max_entries: 10000,
92            ttl_seconds: 300,
93            max_memory_mb: 512,
94        }
95    }
96}
97
/// Base trait for all commands in the CQRS system
///
/// A command must be cloneable because the command bus clones it for every
/// execution attempt.
pub trait Command: Send + Sync + Clone + std::fmt::Debug {
    /// Identifier type of the aggregate the command targets. The command bus
    /// formats it with `{:?}` to obtain the string key passed to the event store.
    type AggregateId: Clone + std::fmt::Debug + Send + Sync;
    /// Event type associated with the command.
    // NOTE(review): not referenced by any code visible in this file.
    type EventType: Send + Sync + Clone;

    /// Get the unique identifier for this command
    fn command_id(&self) -> Uuid;

    /// Get the aggregate ID this command operates on
    fn aggregate_id(&self) -> Self::AggregateId;

    /// Validate the command before execution
    ///
    /// Returning `Err` makes the execution attempt fail before the handler runs
    /// (when command validation is enabled in the configuration).
    fn validate(&self) -> Result<()>;

    /// Get the expected version for optimistic concurrency control
    ///
    /// Forwarded unchanged to the event store when events are appended.
    fn expected_version(&self) -> Option<u64>;
}
115
/// Base trait for all queries in the CQRS system
pub trait Query: Send + Sync + Clone + std::fmt::Debug {
    /// Value produced by the query handler for this query.
    type Result: Send + Sync + Clone;

    /// Get the unique identifier for this query
    fn query_id(&self) -> Uuid;

    /// Validate the query before execution
    ///
    /// Called by the query bus before the handler is looked up.
    fn validate(&self) -> Result<()>;

    /// Get cache key for this query (if cacheable)
    ///
    /// `None` means the result is neither looked up in nor written to the cache.
    fn cache_key(&self) -> Option<String>;

    /// Get query timeout in milliseconds
    ///
    /// `None` falls back to `CQRSConfig::query_timeout_ms`.
    fn timeout_ms(&self) -> Option<u64>;
}
132
/// Command handler trait for processing commands
///
/// Implementations are registered on a `CommandBus` for one command type `C`.
#[async_trait::async_trait]
pub trait CommandHandler<C: Command>: Send + Sync {
    /// Handle the command and return events to be persisted
    ///
    /// The returned events are appended to the event store and then broadcast;
    /// an empty vector results in no append.
    async fn handle(&self, command: C) -> Result<Vec<StreamEvent>>;

    /// Validate the command (optional additional validation)
    ///
    /// The default implementation delegates to `Command::validate`.
    // NOTE(review): no code visible in this file calls this hook — the bus
    // calls `Command::validate` directly; confirm whether it is used elsewhere.
    async fn validate_command(&self, command: &C) -> Result<()> {
        command.validate()
    }
}
144
/// Query handler trait for processing queries
///
/// Implementations are registered on a `QueryBus` for one query type `Q`.
#[async_trait::async_trait]
pub trait QueryHandler<Q: Query>: Send + Sync {
    /// Handle the query and return the result
    ///
    /// The query bus runs this future under a timeout.
    async fn handle(&self, query: Q) -> Result<Q::Result>;

    /// Validate the query (optional additional validation)
    ///
    /// The default implementation delegates to `Query::validate`.
    // NOTE(review): no code visible in this file calls this hook — the bus
    // calls `Query::validate` directly; confirm whether it is used elsewhere.
    async fn validate_query(&self, query: &Q) -> Result<()> {
        query.validate()
    }
}
156
/// Read model projection trait for updating read models from events
///
/// `ReadModelManager` only accepts projections whose `Event` is `StreamEvent`.
#[async_trait::async_trait]
pub trait ReadModelProjection: Send + Sync {
    /// Event type this projection consumes.
    type Event: Send + Sync;

    /// Handle an event and update the read model
    ///
    /// Returning `Err` stops the manager from processing further events for
    /// this projection in the current pass; the position is not advanced past
    /// the failing event.
    async fn handle_event(&self, event: &Self::Event) -> Result<()>;

    /// Get the projection name for tracking progress
    ///
    /// Used as the key under which the projection and its position are stored.
    fn projection_name(&self) -> &str;

    /// Reset the projection (for rebuilding)
    async fn reset(&self) -> Result<()>;
}
171
/// Command result containing execution metadata
#[derive(Debug, Clone)]
pub struct CommandResult {
    /// Identifier of the executed command.
    pub command_id: Uuid,
    /// `{:?}` rendering of the command's aggregate id.
    pub aggregate_id: String,
    /// Number of events returned by the handler.
    pub events_count: usize,
    /// Wall-clock duration of the successful execution attempt.
    pub execution_time: Duration,
    /// Version returned by the event store append, or 0 when the handler
    /// produced no events.
    pub version: u64,
    /// UTC time at which the result was created.
    pub timestamp: DateTime<Utc>,
}
182
/// Query result containing execution metadata
#[derive(Debug, Clone)]
pub struct QueryResult<T> {
    /// Identifier of the executed query.
    pub query_id: Uuid,
    /// Value produced by the handler or read from the cache.
    pub result: T,
    /// Wall-clock duration measured from the start of `QueryBus::execute`.
    pub execution_time: Duration,
    /// `true` when the value was served from the query cache.
    pub cache_hit: bool,
    /// UTC time at which the result was created.
    pub timestamp: DateTime<Utc>,
}
192
/// CQRS command bus for handling commands
///
/// Looks up the handler registered for a command's type, appends the events it
/// returns to the event store and broadcasts them to subscribers.
pub struct CommandBus {
    /// Bus configuration (validation switch, retry policy, concurrency limit).
    config: CQRSConfig,
    /// Store that receives the events produced by command handlers.
    event_store: Arc<dyn EventStoreTrait>,
    /// Type-erased handlers keyed by `std::any::type_name` of the command type.
    handlers: Arc<RwLock<HashMap<String, Box<dyn std::any::Any + Send + Sync>>>>,
    /// Bounds the number of commands executing at the same time.
    command_semaphore: Arc<tokio::sync::Semaphore>,
    /// Counters returned by `get_metrics`.
    metrics: Arc<RwLock<CommandBusMetrics>>,
    /// Sender side of the broadcast channel on which produced events are published.
    event_publisher: broadcast::Sender<StreamEvent>,
}
202
203impl CommandBus {
204    /// Create a new command bus
205    pub fn new(config: CQRSConfig, event_store: Arc<dyn EventStoreTrait>) -> Self {
206        let (event_publisher, _) = broadcast::channel(10000);
207
208        Self {
209            command_semaphore: Arc::new(tokio::sync::Semaphore::new(
210                config.max_concurrent_commands,
211            )),
212            config,
213            event_store,
214            handlers: Arc::new(RwLock::new(HashMap::new())),
215            metrics: Arc::new(RwLock::new(CommandBusMetrics::default())),
216            event_publisher,
217        }
218    }
219
220    /// Register a command handler
221    pub async fn register_handler<C, H>(&self, handler: H)
222    where
223        C: Command + 'static,
224        H: CommandHandler<C> + 'static,
225    {
226        let type_name = std::any::type_name::<C>();
227        let mut handlers = self.handlers.write().await;
228        handlers.insert(type_name.to_string(), Box::new(handler));
229        info!("Registered command handler for {}", type_name);
230    }
231
232    /// Execute a command
233    pub async fn execute<C>(&self, command: C) -> Result<CommandResult>
234    where
235        C: Command + 'static,
236    {
237        let start_time = Instant::now();
238        let command_id = command.command_id();
239
240        debug!(
241            "Executing command {} for aggregate {:?}",
242            command_id,
243            command.aggregate_id()
244        );
245
246        // Acquire semaphore for concurrency control
247        let _permit = self.command_semaphore.acquire().await?;
248
249        // Update metrics
250        {
251            let mut metrics = self.metrics.write().await;
252            metrics.commands_received += 1;
253            metrics.active_commands += 1;
254        }
255
256        let result = self.execute_with_retry(command).await;
257
258        // Update metrics
259        {
260            let mut metrics = self.metrics.write().await;
261            metrics.active_commands -= 1;
262            match &result {
263                Ok(_) => metrics.commands_succeeded += 1,
264                Err(_) => metrics.commands_failed += 1,
265            }
266        }
267
268        let execution_time = start_time.elapsed();
269        debug!("Command {} executed in {:?}", command_id, execution_time);
270
271        result
272    }
273
274    /// Execute command with retry logic
275    async fn execute_with_retry<C>(&self, command: C) -> Result<CommandResult>
276    where
277        C: Command + 'static,
278    {
279        let mut attempt = 0;
280        let mut delay = Duration::from_millis(self.config.command_retry_config.initial_delay_ms);
281
282        loop {
283            match self.execute_once(command.clone()).await {
284                Ok(result) => return Ok(result),
285                Err(e) if attempt >= self.config.command_retry_config.max_retries => {
286                    error!(
287                        "Command {} failed after {} attempts: {}",
288                        command.command_id(),
289                        attempt + 1,
290                        e
291                    );
292                    return Err(e);
293                }
294                Err(e) => {
295                    warn!(
296                        "Command {} failed on attempt {}: {}",
297                        command.command_id(),
298                        attempt + 1,
299                        e
300                    );
301                    attempt += 1;
302
303                    tokio::time::sleep(delay).await;
304                    delay = Duration::from_millis(
305                        (delay.as_millis() as f64
306                            * self.config.command_retry_config.backoff_multiplier)
307                            as u64,
308                    )
309                    .min(Duration::from_millis(
310                        self.config.command_retry_config.max_delay_ms,
311                    ));
312                }
313            }
314        }
315    }
316
317    /// Execute command once
318    async fn execute_once<C>(&self, command: C) -> Result<CommandResult>
319    where
320        C: Command + 'static,
321    {
322        let start_time = Instant::now();
323        let command_id = command.command_id();
324        let aggregate_id = format!("{:?}", command.aggregate_id());
325
326        // Validate command
327        if self.config.enable_command_validation {
328            command.validate()?;
329        }
330
331        // Get handler
332        let type_name = std::any::type_name::<C>();
333        let handlers = self.handlers.read().await;
334        let handler = handlers
335            .get(type_name)
336            .ok_or_else(|| anyhow!("No handler registered for command type {}", type_name))?;
337
338        // Downcast handler
339        let handler = handler
340            .downcast_ref::<Box<dyn CommandHandler<C>>>()
341            .ok_or_else(|| anyhow!("Handler type mismatch for command {}", type_name))?;
342
343        // Execute handler
344        let events = handler
345            .handle(command.clone())
346            .await
347            .map_err(|e| anyhow!("Command handler error: {}", e))?;
348
349        // Store events
350        let version = if events.is_empty() {
351            0
352        } else {
353            self.event_store
354                .append_events(&aggregate_id, &events, command.expected_version())
355                .await?
356        };
357
358        // Publish events
359        for event in &events {
360            let _ = self.event_publisher.send(event.clone());
361        }
362
363        Ok(CommandResult {
364            command_id,
365            aggregate_id,
366            events_count: events.len(),
367            execution_time: start_time.elapsed(),
368            version,
369            timestamp: Utc::now(),
370        })
371    }
372
373    /// Get command bus metrics
374    pub async fn get_metrics(&self) -> CommandBusMetrics {
375        self.metrics.read().await.clone()
376    }
377
378    /// Subscribe to events
379    pub fn subscribe_to_events(&self) -> broadcast::Receiver<StreamEvent> {
380        self.event_publisher.subscribe()
381    }
382}
383
/// CQRS query bus for handling queries
///
/// Looks up the handler registered for a query's type, runs it under a timeout
/// and caches results for queries that provide a cache key.
#[derive(Debug)]
pub struct QueryBus {
    /// Bus configuration (cache switch, default timeout, concurrency limit).
    config: CQRSConfig,
    /// Type-erased handlers keyed by `std::any::type_name` of the query type.
    handlers: Arc<RwLock<HashMap<String, Box<dyn std::any::Any + Send + Sync>>>>,
    /// Bounds the number of queries executing at the same time.
    query_semaphore: Arc<tokio::sync::Semaphore>,
    /// Serialized query results keyed by `Query::cache_key`.
    cache: Arc<RwLock<QueryCache>>,
    /// Counters returned by `get_metrics`.
    metrics: Arc<RwLock<QueryBusMetrics>>,
}
393
394impl QueryBus {
395    /// Create a new query bus
396    pub fn new(config: CQRSConfig) -> Self {
397        Self {
398            query_semaphore: Arc::new(tokio::sync::Semaphore::new(config.max_concurrent_queries)),
399            cache: Arc::new(RwLock::new(QueryCache::new(
400                config.query_cache_config.clone(),
401            ))),
402            config,
403            handlers: Arc::new(RwLock::new(HashMap::new())),
404            metrics: Arc::new(RwLock::new(QueryBusMetrics::default())),
405        }
406    }
407
408    /// Register a query handler
409    pub async fn register_handler<Q, H>(&self, handler: H)
410    where
411        Q: Query + 'static,
412        H: QueryHandler<Q> + 'static,
413    {
414        let type_name = std::any::type_name::<Q>();
415        let mut handlers = self.handlers.write().await;
416        handlers.insert(type_name.to_string(), Box::new(handler));
417        info!("Registered query handler for {}", type_name);
418    }
419
420    /// Execute a query
421    pub async fn execute<Q>(&self, query: Q) -> Result<QueryResult<Q::Result>>
422    where
423        Q: Query + 'static,
424        Q::Result:
425            Clone + Serialize + for<'de> Deserialize<'de> + oxicode::Encode + oxicode::Decode,
426    {
427        let start_time = Instant::now();
428        let query_id = query.query_id();
429
430        debug!("Executing query {}", query_id);
431
432        // Acquire semaphore for concurrency control
433        let _permit = self.query_semaphore.acquire().await?;
434
435        // Update metrics
436        {
437            let mut metrics = self.metrics.write().await;
438            metrics.queries_received += 1;
439            metrics.active_queries += 1;
440        }
441
442        // Check cache first
443        let cache_hit = if self.config.query_cache_config.enabled {
444            if let Some(cache_key) = query.cache_key() {
445                let cache = self.cache.read().await;
446                if let Some(cached_result) = cache.get::<Q::Result>(&cache_key) {
447                    let mut metrics = self.metrics.write().await;
448                    metrics.active_queries -= 1;
449                    metrics.queries_succeeded += 1;
450                    metrics.cache_hits += 1;
451
452                    return Ok(QueryResult {
453                        query_id,
454                        result: cached_result,
455                        execution_time: start_time.elapsed(),
456                        cache_hit: true,
457                        timestamp: Utc::now(),
458                    });
459                }
460            }
461            false
462        } else {
463            false
464        };
465
466        let result = self.execute_query_handler(query).await;
467
468        // Update metrics
469        {
470            let mut metrics = self.metrics.write().await;
471            metrics.active_queries -= 1;
472            match &result {
473                Ok(_) => {
474                    metrics.queries_succeeded += 1;
475                    if !cache_hit {
476                        metrics.cache_misses += 1;
477                    }
478                }
479                Err(_) => metrics.queries_failed += 1,
480            }
481        }
482
483        let execution_time = start_time.elapsed();
484        debug!("Query {} executed in {:?}", query_id, execution_time);
485
486        result.map(|r| QueryResult {
487            query_id,
488            result: r,
489            execution_time,
490            cache_hit,
491            timestamp: Utc::now(),
492        })
493    }
494
495    /// Execute query handler
496    async fn execute_query_handler<Q>(&self, query: Q) -> Result<Q::Result>
497    where
498        Q: Query + 'static,
499        Q::Result:
500            Clone + Serialize + for<'de> Deserialize<'de> + oxicode::Encode + oxicode::Decode,
501    {
502        // Validate query
503        query.validate()?;
504
505        // Get handler
506        let type_name = std::any::type_name::<Q>();
507        let handlers = self.handlers.read().await;
508        let handler = handlers
509            .get(type_name)
510            .ok_or_else(|| anyhow!("No handler registered for query type {}", type_name))?;
511
512        // Downcast handler
513        let handler = handler
514            .downcast_ref::<Box<dyn QueryHandler<Q>>>()
515            .ok_or_else(|| anyhow!("Handler type mismatch for query {}", type_name))?;
516
517        // Execute handler with timeout
518        let timeout =
519            Duration::from_millis(query.timeout_ms().unwrap_or(self.config.query_timeout_ms));
520
521        let result = tokio::time::timeout(timeout, handler.handle(query.clone()))
522            .await
523            .map_err(|_| anyhow!("Query timeout"))?
524            .map_err(|e| anyhow!("Query handler error: {}", e))?;
525
526        // Cache result if applicable
527        if self.config.query_cache_config.enabled {
528            if let Some(cache_key) = query.cache_key() {
529                let mut cache = self.cache.write().await;
530                cache.set(cache_key, result.clone());
531            }
532        }
533
534        Ok(result)
535    }
536
537    /// Get query bus metrics
538    pub async fn get_metrics(&self) -> QueryBusMetrics {
539        self.metrics.read().await.clone()
540    }
541
542    /// Clear query cache
543    pub async fn clear_cache(&self) {
544        let mut cache = self.cache.write().await;
545        cache.clear();
546    }
547}
548
// Type alias for complex projection type: a shared, lock-protected map from
// projection name to a boxed projection that consumes `StreamEvent`s.
type ProjectionMap =
    Arc<RwLock<HashMap<String, Box<dyn ReadModelProjection<Event = StreamEvent>>>>>;
552
/// Read model manager for handling projections
///
/// Feeds events read from the event stream to every registered projection and
/// tracks, per projection, how far it has progressed.
pub struct ReadModelManager {
    /// Registered projections keyed by `projection_name()`.
    projections: ProjectionMap,
    /// Per-projection stream position, keyed by projection name.
    projection_positions: Arc<RwLock<HashMap<String, u64>>>,
    /// Source the projections read their events from.
    event_stream: Arc<dyn EventStream>,
    /// Counters returned by `get_metrics`.
    metrics: Arc<RwLock<ReadModelMetrics>>,
}
560
561impl ReadModelManager {
562    /// Create a new read model manager
563    pub fn new(event_stream: Arc<dyn EventStream>) -> Self {
564        Self {
565            projections: Arc::new(RwLock::new(HashMap::new())),
566            projection_positions: Arc::new(RwLock::new(HashMap::new())),
567            event_stream,
568            metrics: Arc::new(RwLock::new(ReadModelMetrics::default())),
569        }
570    }
571
572    /// Register a read model projection
573    pub async fn register_projection<P>(&self, projection: P)
574    where
575        P: ReadModelProjection<Event = StreamEvent> + 'static,
576    {
577        let name = projection.projection_name().to_string();
578        let mut projections = self.projections.write().await;
579        projections.insert(name.clone(), Box::new(projection));
580
581        let mut positions = self.projection_positions.write().await;
582        positions.entry(name.clone()).or_insert(0);
583
584        info!("Registered read model projection: {}", name);
585    }
586
587    /// Process events for all projections
588    pub async fn process_events(&self) -> Result<()> {
589        let projections = self.projections.read().await;
590
591        for (name, projection) in projections.iter() {
592            if let Err(e) = self.process_projection(name, projection.as_ref()).await {
593                error!("Error processing projection {}: {}", name, e);
594
595                let mut metrics = self.metrics.write().await;
596                *metrics.projection_errors.entry(name.clone()).or_insert(0) += 1;
597            }
598        }
599
600        Ok(())
601    }
602
603    /// Process events for a specific projection
604    async fn process_projection(
605        &self,
606        name: &str,
607        projection: &dyn ReadModelProjection<Event = StreamEvent>,
608    ) -> Result<()> {
609        let position = {
610            let positions = self.projection_positions.read().await;
611            positions.get(name).copied().unwrap_or(0)
612        };
613
614        let events = self
615            .event_stream
616            .read_events_from_position(position, 1000)
617            .await?;
618
619        for stored_event in events {
620            if let Err(e) = projection.handle_event(&stored_event.event_data).await {
621                error!("Projection {} failed to handle event: {}", name, e);
622                return Err(anyhow!("Projection error: {}", e));
623            }
624
625            // Update position
626            let mut positions = self.projection_positions.write().await;
627            *positions.entry(name.to_string()).or_insert(0) += 1;
628        }
629
630        Ok(())
631    }
632
633    /// Reset a projection
634    pub async fn reset_projection(&self, name: &str) -> Result<()> {
635        let projections = self.projections.read().await;
636        let projection = projections
637            .get(name)
638            .ok_or_else(|| anyhow!("Projection not found: {}", name))?;
639
640        projection
641            .reset()
642            .await
643            .map_err(|e| anyhow!("Failed to reset projection {}: {}", name, e))?;
644
645        let mut positions = self.projection_positions.write().await;
646        positions.insert(name.to_string(), 0);
647
648        info!("Reset projection: {}", name);
649        Ok(())
650    }
651
652    /// Get read model metrics
653    pub async fn get_metrics(&self) -> ReadModelMetrics {
654        self.metrics.read().await.clone()
655    }
656}
657
/// Query cache implementation
///
/// Stores serialized query results keyed by cache key; limits come from the
/// embedded `QueryCacheConfig`.
#[derive(Debug)]
struct QueryCache {
    /// Limits and the enable switch for this cache.
    config: QueryCacheConfig,
    /// Cached entries keyed by the query's cache key.
    entries: HashMap<String, CacheEntry>,
}
664
/// A single cached query result.
#[derive(Debug, Clone)]
struct CacheEntry {
    /// Serialized result bytes.
    data: Vec<u8>,
    /// UTC time at which the entry was stored; used for TTL and eviction order.
    created_at: DateTime<Utc>,
    /// Length of `data` in bytes, used for the memory budget.
    size_bytes: usize,
}
671
672impl QueryCache {
673    fn new(config: QueryCacheConfig) -> Self {
674        Self {
675            config,
676            entries: HashMap::new(),
677        }
678    }
679
680    fn get<T>(&self, key: &str) -> Option<T>
681    where
682        T: for<'de> Deserialize<'de> + oxicode::Decode,
683    {
684        if !self.config.enabled {
685            return None;
686        }
687
688        if let Some(entry) = self.entries.get(key) {
689            let age = Utc::now().signed_duration_since(entry.created_at);
690            if age.num_seconds() < self.config.ttl_seconds as i64 {
691                if let Ok((value, _)) =
692                    oxicode::serde::decode_from_slice(&entry.data, oxicode::config::standard())
693                {
694                    return Some(value);
695                }
696            }
697        }
698
699        None
700    }
701
702    fn set<T>(&mut self, key: String, value: T)
703    where
704        T: Serialize + oxicode::Encode,
705    {
706        if !self.config.enabled {
707            return;
708        }
709
710        if let Ok(data) = oxicode::serde::encode_to_vec(&value, oxicode::config::standard()) {
711            let entry = CacheEntry {
712                size_bytes: data.len(),
713                data,
714                created_at: Utc::now(),
715            };
716
717            self.entries.insert(key, entry);
718            self.evict_if_needed();
719        }
720    }
721
722    fn clear(&mut self) {
723        self.entries.clear();
724    }
725
726    fn evict_if_needed(&mut self) {
727        // Remove expired entries
728        let now = Utc::now();
729        self.entries.retain(|_, entry| {
730            let age = now.signed_duration_since(entry.created_at);
731            age.num_seconds() < self.config.ttl_seconds as i64
732        });
733
734        // Remove entries if over limit
735        if self.entries.len() > self.config.max_entries {
736            let mut entries: Vec<_> = self
737                .entries
738                .iter()
739                .map(|(k, v)| (k.clone(), v.created_at))
740                .collect();
741            entries.sort_by_key(|(_, created_at)| *created_at);
742
743            let to_remove = self.entries.len() - self.config.max_entries;
744            for (key, _) in entries.iter().take(to_remove) {
745                self.entries.remove(key);
746            }
747        }
748
749        // Check memory usage
750        let total_size: usize = self.entries.values().map(|e| e.size_bytes).sum();
751        let max_size = self.config.max_memory_mb * 1024 * 1024;
752
753        if total_size > max_size {
754            let mut entries: Vec<_> = self
755                .entries
756                .iter()
757                .map(|(k, v)| (k.clone(), v.created_at, v.size_bytes))
758                .collect();
759            entries.sort_by_key(|(_, created_at, _)| *created_at);
760
761            let mut current_size = total_size;
762            for (key, _, size_bytes) in entries {
763                if current_size <= max_size {
764                    break;
765                }
766                current_size -= size_bytes;
767                self.entries.remove(&key);
768            }
769        }
770    }
771}
772
/// Command bus metrics
///
/// Counters start at zero and are updated by `CommandBus::execute`.
#[derive(Debug, Clone, Default)]
pub struct CommandBusMetrics {
    /// Commands that obtained a concurrency permit and started executing.
    pub commands_received: u64,
    /// Commands whose execution (including retries) ended in `Ok`.
    pub commands_succeeded: u64,
    /// Commands whose execution (including retries) ended in `Err`.
    pub commands_failed: u64,
    /// Commands currently between start and completion.
    pub active_commands: u64,
}
781
/// Query bus metrics
///
/// Counters start at zero and are updated by `QueryBus::execute`.
#[derive(Debug, Clone, Default)]
pub struct QueryBusMetrics {
    /// Queries that obtained a concurrency permit and started executing.
    pub queries_received: u64,
    /// Queries answered successfully, from the cache or by a handler.
    pub queries_succeeded: u64,
    /// Queries whose handler path returned an error.
    pub queries_failed: u64,
    /// Queries currently between start and completion.
    pub active_queries: u64,
    /// Queries answered from the cache.
    pub cache_hits: u64,
    /// Queries answered successfully by a handler rather than the cache.
    pub cache_misses: u64,
}
792
/// Read model metrics
#[derive(Debug, Clone, Default)]
pub struct ReadModelMetrics {
    /// Number of failed processing passes per projection name. Entries are only
    /// ever added or incremented, never cleared, by the code in this file.
    pub projection_errors: HashMap<String, u64>,
    /// Count of events processed by projections.
    pub events_processed: u64,
    /// Count of active projections.
    pub projections_active: u64,
}
800
/// Complete CQRS system coordinator
///
/// Bundles the command bus, the query bus and the read model manager built
/// from one shared configuration.
pub struct CQRSSystem {
    /// Write side: executes commands and publishes the resulting events.
    pub command_bus: CommandBus,
    /// Read side: executes queries, with caching.
    pub query_bus: QueryBus,
    /// Keeps read model projections up to date from the event stream.
    pub read_model_manager: ReadModelManager,
    /// Configuration the system was built with; used by `health_check` for the
    /// concurrency limits.
    config: CQRSConfig,
}
808
809impl CQRSSystem {
810    /// Create a new CQRS system
811    pub fn new(
812        config: CQRSConfig,
813        event_store: Arc<dyn EventStoreTrait>,
814        event_stream: Arc<dyn EventStream>,
815    ) -> Self {
816        let command_bus = CommandBus::new(config.clone(), event_store);
817        let query_bus = QueryBus::new(config.clone());
818        let read_model_manager = ReadModelManager::new(event_stream);
819
820        Self {
821            command_bus,
822            query_bus,
823            read_model_manager,
824            config,
825        }
826    }
827
828    /// Start the CQRS system
829    pub async fn start(self: Arc<Self>) -> Result<()> {
830        info!("Starting CQRS system");
831
832        // Start read model processing
833        let system_clone = Arc::clone(&self);
834
835        tokio::spawn(async move {
836            let mut interval = tokio::time::interval(Duration::from_millis(1000));
837            loop {
838                interval.tick().await;
839                if let Err(e) = system_clone.read_model_manager.process_events().await {
840                    error!("Error processing read model events: {}", e);
841                }
842            }
843        });
844
845        info!("CQRS system started successfully");
846        Ok(())
847    }
848
849    /// Get system health status
850    pub async fn health_check(&self) -> CQRSHealthStatus {
851        let command_metrics = self.command_bus.get_metrics().await;
852        let query_metrics = self.query_bus.get_metrics().await;
853        let read_model_metrics = self.read_model_manager.get_metrics().await;
854
855        CQRSHealthStatus {
856            command_bus_healthy: command_metrics.active_commands
857                < self.config.max_concurrent_commands as u64,
858            query_bus_healthy: query_metrics.active_queries
859                < self.config.max_concurrent_queries as u64,
860            read_models_healthy: read_model_metrics.projection_errors.is_empty(),
861            command_metrics,
862            query_metrics,
863            read_model_metrics,
864        }
865    }
866}
867
/// CQRS system health status
///
/// Produced by `CQRSSystem::health_check`.
#[derive(Debug, Clone)]
pub struct CQRSHealthStatus {
    /// `true` while active commands are below `max_concurrent_commands`.
    pub command_bus_healthy: bool,
    /// `true` while active queries are below `max_concurrent_queries`.
    pub query_bus_healthy: bool,
    /// `true` while no projection error has been recorded.
    pub read_models_healthy: bool,
    /// Command bus metrics snapshot the verdict was derived from.
    pub command_metrics: CommandBusMetrics,
    /// Query bus metrics snapshot the verdict was derived from.
    pub query_metrics: QueryBusMetrics,
    /// Read model metrics snapshot the verdict was derived from.
    pub read_model_metrics: ReadModelMetrics,
}
878
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal command whose only rule is that `data` must be non-empty.
    #[derive(Debug, Clone)]
    struct TestCommand {
        id: Uuid,
        aggregate_id: String,
        data: String,
    }

    impl Command for TestCommand {
        type AggregateId = String;
        type EventType = String;

        fn command_id(&self) -> Uuid {
            self.id
        }

        fn aggregate_id(&self) -> Self::AggregateId {
            self.aggregate_id.clone()
        }

        fn validate(&self) -> Result<()> {
            if self.data.is_empty() {
                Err(anyhow!("Data cannot be empty"))
            } else {
                Ok(())
            }
        }

        fn expected_version(&self) -> Option<u64> {
            None
        }
    }

    /// Minimal query that is always valid and cacheable by its filter.
    #[derive(Debug, Clone)]
    struct TestQuery {
        id: Uuid,
        filter: String,
    }

    impl Query for TestQuery {
        type Result = Vec<String>;

        fn query_id(&self) -> Uuid {
            self.id
        }

        fn validate(&self) -> Result<()> {
            Ok(())
        }

        fn cache_key(&self) -> Option<String> {
            let key = format!("test_query_{}", self.filter);
            Some(key)
        }

        fn timeout_ms(&self) -> Option<u64> {
            Some(5000)
        }
    }

    /// Builds a command for aggregate "test" carrying the given payload.
    fn command_with_data(data: &str) -> TestCommand {
        TestCommand {
            id: Uuid::new_v4(),
            aggregate_id: "test".to_string(),
            data: data.to_string(),
        }
    }

    /// The default configuration exposes the documented timeouts and switches.
    #[tokio::test]
    async fn test_cqrs_config_defaults() {
        let CQRSConfig {
            command_timeout_ms,
            query_timeout_ms,
            enable_command_validation,
            enable_query_optimization,
            ..
        } = CQRSConfig::default();

        assert_eq!(command_timeout_ms, 30000);
        assert_eq!(query_timeout_ms, 10000);
        assert!(enable_command_validation);
        assert!(enable_query_optimization);
    }

    /// Non-empty payloads validate; empty payloads are rejected.
    #[tokio::test]
    async fn test_command_validation() {
        let valid_command = command_with_data("valid data");
        let invalid_command = command_with_data("");

        assert!(valid_command.validate().is_ok());
        assert!(invalid_command.validate().is_err());
    }

    /// The cache key is the fixed prefix followed by the filter text.
    #[tokio::test]
    async fn test_query_cache_key() {
        let query = TestQuery {
            id: Uuid::new_v4(),
            filter: "status=active".to_string(),
        };

        let expected = Some("test_query_status=active".to_string());
        assert_eq!(query.cache_key(), expected);
    }
}
979}