1use crate::event_sourcing::{EventStoreTrait, EventStream};
8use crate::StreamEvent;
9use anyhow::{anyhow, Result};
10use chrono::{DateTime, Utc};
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15use tokio::sync::{broadcast, RwLock};
16use tracing::{debug, error, info, warn};
17use uuid::Uuid;
18
/// Tunables for the CQRS system: timeouts, retry/cache policy, and
/// concurrency ceilings shared by the command and query buses.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CQRSConfig {
    /// Upper bound for a single command execution, in milliseconds.
    pub command_timeout_ms: u64,
    /// Default query timeout in milliseconds; a query may override it via
    /// `Query::timeout_ms`.
    pub query_timeout_ms: u64,
    /// When true, `Command::validate` runs before the handler is invoked.
    pub enable_command_validation: bool,
    /// Query-optimization toggle. NOTE(review): not read anywhere in this
    /// file — presumably consumed elsewhere; confirm before removing.
    pub enable_query_optimization: bool,
    /// Retry/backoff policy applied to failed commands.
    pub command_retry_config: RetryConfig,
    /// Policy for the query-result cache.
    pub query_cache_config: QueryCacheConfig,
    /// Eventual-consistency window in milliseconds. NOTE(review): not read
    /// in this file; verify against callers.
    pub consistency_window_ms: u64,
    /// Maximum commands allowed in flight (semaphore permits).
    pub max_concurrent_commands: usize,
    /// Maximum queries allowed in flight (semaphore permits).
    pub max_concurrent_queries: usize,
}
41
42impl Default for CQRSConfig {
43 fn default() -> Self {
44 Self {
45 command_timeout_ms: 30000,
46 query_timeout_ms: 10000,
47 enable_command_validation: true,
48 enable_query_optimization: true,
49 command_retry_config: RetryConfig::default(),
50 query_cache_config: QueryCacheConfig::default(),
51 consistency_window_ms: 5000,
52 max_concurrent_commands: 1000,
53 max_concurrent_queries: 10000,
54 }
55 }
56}
57
/// Exponential-backoff retry policy for command execution.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryConfig {
    /// Maximum number of retries after the first attempt.
    pub max_retries: u32,
    /// Delay before the first retry, in milliseconds.
    pub initial_delay_ms: u64,
    /// Cap applied to the growing delay, in milliseconds.
    pub max_delay_ms: u64,
    /// Factor the delay is multiplied by after each failed attempt.
    pub backoff_multiplier: f64,
}
66
67impl Default for RetryConfig {
68 fn default() -> Self {
69 Self {
70 max_retries: 3,
71 initial_delay_ms: 100,
72 max_delay_ms: 5000,
73 backoff_multiplier: 2.0,
74 }
75 }
76}
77
/// Policy for the in-memory query-result cache.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryCacheConfig {
    /// Master switch; when false both reads and writes are no-ops.
    pub enabled: bool,
    /// Maximum number of cached entries before oldest-first eviction.
    pub max_entries: usize,
    /// Time-to-live of an entry, in seconds.
    pub ttl_seconds: u64,
    /// Memory budget (encoded payload bytes) before oldest-first eviction.
    pub max_memory_mb: usize,
}
86
87impl Default for QueryCacheConfig {
88 fn default() -> Self {
89 Self {
90 enabled: true,
91 max_entries: 10000,
92 ttl_seconds: 300,
93 max_memory_mb: 512,
94 }
95 }
96}
97
/// A state-changing request targeting a single aggregate.
pub trait Command: Send + Sync + Clone + std::fmt::Debug {
    /// Identifier type of the aggregate this command targets.
    type AggregateId: Clone + std::fmt::Debug + Send + Sync;
    /// Event type associated with this command. NOTE(review): handlers in
    /// this file always return `Vec<StreamEvent>`; confirm this associated
    /// type is still required by other callers.
    type EventType: Send + Sync + Clone;

    /// Unique id of this command instance (used for logging/tracing).
    fn command_id(&self) -> Uuid;

    /// Id of the aggregate the command applies to.
    fn aggregate_id(&self) -> Self::AggregateId;

    /// Validates the command payload; an `Err` aborts dispatch.
    fn validate(&self) -> Result<()>;

    /// Optimistic-concurrency guard: the aggregate version the caller
    /// expects, or `None` to skip the check.
    fn expected_version(&self) -> Option<u64>;
}
115
/// A read-only request answered from the read side.
pub trait Query: Send + Sync + Clone + std::fmt::Debug {
    /// Value type the query resolves to.
    type Result: Send + Sync + Clone;

    /// Unique id of this query instance (used for logging/tracing).
    fn query_id(&self) -> Uuid;

    /// Validates the query parameters; an `Err` aborts execution.
    fn validate(&self) -> Result<()>;

    /// Cache key for the result, or `None` if this query is uncacheable.
    fn cache_key(&self) -> Option<String>;

    /// Per-query timeout override in milliseconds; `None` falls back to
    /// the bus-wide `query_timeout_ms`.
    fn timeout_ms(&self) -> Option<u64>;
}
132
/// Handles commands of type `C`, returning the events to persist.
#[async_trait::async_trait]
pub trait CommandHandler<C: Command>: Send + Sync {
    /// Executes the command and returns the resulting events (possibly empty).
    async fn handle(&self, command: C) -> Result<Vec<StreamEvent>>;

    /// Hook for handler-specific validation; defaults to delegating to the
    /// command's own `validate`.
    async fn validate_command(&self, command: &C) -> Result<()> {
        command.validate()
    }
}
144
/// Handles queries of type `Q`, producing the query's result value.
#[async_trait::async_trait]
pub trait QueryHandler<Q: Query>: Send + Sync {
    /// Executes the query and returns its result.
    async fn handle(&self, query: Q) -> Result<Q::Result>;

    /// Hook for handler-specific validation; defaults to delegating to the
    /// query's own `validate`.
    async fn validate_query(&self, query: &Q) -> Result<()> {
        query.validate()
    }
}
156
/// A read-model projection that folds stream events into queryable state.
#[async_trait::async_trait]
pub trait ReadModelProjection: Send + Sync {
    /// Event type this projection consumes.
    type Event: Send + Sync;

    /// Applies one event to the projection's state.
    async fn handle_event(&self, event: &Self::Event) -> Result<()>;

    /// Stable name used as the registry key and for position tracking.
    fn projection_name(&self) -> &str;

    /// Clears the projection's state so the stream can be replayed.
    async fn reset(&self) -> Result<()>;
}
171
/// Outcome of a successfully executed command.
#[derive(Debug, Clone)]
pub struct CommandResult {
    /// Id of the command that produced this result.
    pub command_id: Uuid,
    /// Debug-formatted id of the aggregate the command applied to.
    pub aggregate_id: String,
    /// Number of events the handler produced.
    pub events_count: usize,
    /// Wall-clock duration of the successful attempt.
    pub execution_time: Duration,
    /// Aggregate version after the append (0 when no events were produced).
    pub version: u64,
    /// Completion timestamp (UTC).
    pub timestamp: DateTime<Utc>,
}
182
/// Outcome of a successfully executed query.
#[derive(Debug, Clone)]
pub struct QueryResult<T> {
    /// Id of the query that produced this result.
    pub query_id: Uuid,
    /// The resolved value.
    pub result: T,
    /// Wall-clock duration including any cache lookup.
    pub execution_time: Duration,
    /// True when the result was served from the cache.
    pub cache_hit: bool,
    /// Completion timestamp (UTC).
    pub timestamp: DateTime<Utc>,
}
192
/// Dispatches commands to registered handlers, persists resulting events,
/// and broadcasts them to subscribers.
pub struct CommandBus {
    /// Bus-wide configuration (timeouts, retry policy, concurrency limits).
    config: CQRSConfig,
    /// Store that command-produced events are appended to.
    event_store: Arc<dyn EventStoreTrait>,
    /// Command-type name -> type-erased handler (downcast on dispatch).
    handlers: Arc<RwLock<HashMap<String, Box<dyn std::any::Any + Send + Sync>>>>,
    /// Caps concurrent command executions at `max_concurrent_commands`.
    command_semaphore: Arc<tokio::sync::Semaphore>,
    /// Running counters for observability.
    metrics: Arc<RwLock<CommandBusMetrics>>,
    /// Fan-out channel for events that were successfully persisted.
    event_publisher: broadcast::Sender<StreamEvent>,
}
202
203impl CommandBus {
204 pub fn new(config: CQRSConfig, event_store: Arc<dyn EventStoreTrait>) -> Self {
206 let (event_publisher, _) = broadcast::channel(10000);
207
208 Self {
209 command_semaphore: Arc::new(tokio::sync::Semaphore::new(
210 config.max_concurrent_commands,
211 )),
212 config,
213 event_store,
214 handlers: Arc::new(RwLock::new(HashMap::new())),
215 metrics: Arc::new(RwLock::new(CommandBusMetrics::default())),
216 event_publisher,
217 }
218 }
219
220 pub async fn register_handler<C, H>(&self, handler: H)
222 where
223 C: Command + 'static,
224 H: CommandHandler<C> + 'static,
225 {
226 let type_name = std::any::type_name::<C>();
227 let mut handlers = self.handlers.write().await;
228 handlers.insert(type_name.to_string(), Box::new(handler));
229 info!("Registered command handler for {}", type_name);
230 }
231
232 pub async fn execute<C>(&self, command: C) -> Result<CommandResult>
234 where
235 C: Command + 'static,
236 {
237 let start_time = Instant::now();
238 let command_id = command.command_id();
239
240 debug!(
241 "Executing command {} for aggregate {:?}",
242 command_id,
243 command.aggregate_id()
244 );
245
246 let _permit = self.command_semaphore.acquire().await?;
248
249 {
251 let mut metrics = self.metrics.write().await;
252 metrics.commands_received += 1;
253 metrics.active_commands += 1;
254 }
255
256 let result = self.execute_with_retry(command).await;
257
258 {
260 let mut metrics = self.metrics.write().await;
261 metrics.active_commands -= 1;
262 match &result {
263 Ok(_) => metrics.commands_succeeded += 1,
264 Err(_) => metrics.commands_failed += 1,
265 }
266 }
267
268 let execution_time = start_time.elapsed();
269 debug!("Command {} executed in {:?}", command_id, execution_time);
270
271 result
272 }
273
274 async fn execute_with_retry<C>(&self, command: C) -> Result<CommandResult>
276 where
277 C: Command + 'static,
278 {
279 let mut attempt = 0;
280 let mut delay = Duration::from_millis(self.config.command_retry_config.initial_delay_ms);
281
282 loop {
283 match self.execute_once(command.clone()).await {
284 Ok(result) => return Ok(result),
285 Err(e) if attempt >= self.config.command_retry_config.max_retries => {
286 error!(
287 "Command {} failed after {} attempts: {}",
288 command.command_id(),
289 attempt + 1,
290 e
291 );
292 return Err(e);
293 }
294 Err(e) => {
295 warn!(
296 "Command {} failed on attempt {}: {}",
297 command.command_id(),
298 attempt + 1,
299 e
300 );
301 attempt += 1;
302
303 tokio::time::sleep(delay).await;
304 delay = Duration::from_millis(
305 (delay.as_millis() as f64
306 * self.config.command_retry_config.backoff_multiplier)
307 as u64,
308 )
309 .min(Duration::from_millis(
310 self.config.command_retry_config.max_delay_ms,
311 ));
312 }
313 }
314 }
315 }
316
317 async fn execute_once<C>(&self, command: C) -> Result<CommandResult>
319 where
320 C: Command + 'static,
321 {
322 let start_time = Instant::now();
323 let command_id = command.command_id();
324 let aggregate_id = format!("{:?}", command.aggregate_id());
325
326 if self.config.enable_command_validation {
328 command.validate()?;
329 }
330
331 let type_name = std::any::type_name::<C>();
333 let handlers = self.handlers.read().await;
334 let handler = handlers
335 .get(type_name)
336 .ok_or_else(|| anyhow!("No handler registered for command type {}", type_name))?;
337
338 let handler = handler
340 .downcast_ref::<Box<dyn CommandHandler<C>>>()
341 .ok_or_else(|| anyhow!("Handler type mismatch for command {}", type_name))?;
342
343 let events = handler
345 .handle(command.clone())
346 .await
347 .map_err(|e| anyhow!("Command handler error: {}", e))?;
348
349 let version = if events.is_empty() {
351 0
352 } else {
353 self.event_store
354 .append_events(&aggregate_id, &events, command.expected_version())
355 .await?
356 };
357
358 for event in &events {
360 let _ = self.event_publisher.send(event.clone());
361 }
362
363 Ok(CommandResult {
364 command_id,
365 aggregate_id,
366 events_count: events.len(),
367 execution_time: start_time.elapsed(),
368 version,
369 timestamp: Utc::now(),
370 })
371 }
372
373 pub async fn get_metrics(&self) -> CommandBusMetrics {
375 self.metrics.read().await.clone()
376 }
377
378 pub fn subscribe_to_events(&self) -> broadcast::Receiver<StreamEvent> {
380 self.event_publisher.subscribe()
381 }
382}
383
384#[derive(Debug)]
386pub struct QueryBus {
387 config: CQRSConfig,
388 handlers: Arc<RwLock<HashMap<String, Box<dyn std::any::Any + Send + Sync>>>>,
389 query_semaphore: Arc<tokio::sync::Semaphore>,
390 cache: Arc<RwLock<QueryCache>>,
391 metrics: Arc<RwLock<QueryBusMetrics>>,
392}
393
394impl QueryBus {
395 pub fn new(config: CQRSConfig) -> Self {
397 Self {
398 query_semaphore: Arc::new(tokio::sync::Semaphore::new(config.max_concurrent_queries)),
399 cache: Arc::new(RwLock::new(QueryCache::new(
400 config.query_cache_config.clone(),
401 ))),
402 config,
403 handlers: Arc::new(RwLock::new(HashMap::new())),
404 metrics: Arc::new(RwLock::new(QueryBusMetrics::default())),
405 }
406 }
407
408 pub async fn register_handler<Q, H>(&self, handler: H)
410 where
411 Q: Query + 'static,
412 H: QueryHandler<Q> + 'static,
413 {
414 let type_name = std::any::type_name::<Q>();
415 let mut handlers = self.handlers.write().await;
416 handlers.insert(type_name.to_string(), Box::new(handler));
417 info!("Registered query handler for {}", type_name);
418 }
419
420 pub async fn execute<Q>(&self, query: Q) -> Result<QueryResult<Q::Result>>
422 where
423 Q: Query + 'static,
424 Q::Result:
425 Clone + Serialize + for<'de> Deserialize<'de> + oxicode::Encode + oxicode::Decode,
426 {
427 let start_time = Instant::now();
428 let query_id = query.query_id();
429
430 debug!("Executing query {}", query_id);
431
432 let _permit = self.query_semaphore.acquire().await?;
434
435 {
437 let mut metrics = self.metrics.write().await;
438 metrics.queries_received += 1;
439 metrics.active_queries += 1;
440 }
441
442 let cache_hit = if self.config.query_cache_config.enabled {
444 if let Some(cache_key) = query.cache_key() {
445 let cache = self.cache.read().await;
446 if let Some(cached_result) = cache.get::<Q::Result>(&cache_key) {
447 let mut metrics = self.metrics.write().await;
448 metrics.active_queries -= 1;
449 metrics.queries_succeeded += 1;
450 metrics.cache_hits += 1;
451
452 return Ok(QueryResult {
453 query_id,
454 result: cached_result,
455 execution_time: start_time.elapsed(),
456 cache_hit: true,
457 timestamp: Utc::now(),
458 });
459 }
460 }
461 false
462 } else {
463 false
464 };
465
466 let result = self.execute_query_handler(query).await;
467
468 {
470 let mut metrics = self.metrics.write().await;
471 metrics.active_queries -= 1;
472 match &result {
473 Ok(_) => {
474 metrics.queries_succeeded += 1;
475 if !cache_hit {
476 metrics.cache_misses += 1;
477 }
478 }
479 Err(_) => metrics.queries_failed += 1,
480 }
481 }
482
483 let execution_time = start_time.elapsed();
484 debug!("Query {} executed in {:?}", query_id, execution_time);
485
486 result.map(|r| QueryResult {
487 query_id,
488 result: r,
489 execution_time,
490 cache_hit,
491 timestamp: Utc::now(),
492 })
493 }
494
495 async fn execute_query_handler<Q>(&self, query: Q) -> Result<Q::Result>
497 where
498 Q: Query + 'static,
499 Q::Result:
500 Clone + Serialize + for<'de> Deserialize<'de> + oxicode::Encode + oxicode::Decode,
501 {
502 query.validate()?;
504
505 let type_name = std::any::type_name::<Q>();
507 let handlers = self.handlers.read().await;
508 let handler = handlers
509 .get(type_name)
510 .ok_or_else(|| anyhow!("No handler registered for query type {}", type_name))?;
511
512 let handler = handler
514 .downcast_ref::<Box<dyn QueryHandler<Q>>>()
515 .ok_or_else(|| anyhow!("Handler type mismatch for query {}", type_name))?;
516
517 let timeout =
519 Duration::from_millis(query.timeout_ms().unwrap_or(self.config.query_timeout_ms));
520
521 let result = tokio::time::timeout(timeout, handler.handle(query.clone()))
522 .await
523 .map_err(|_| anyhow!("Query timeout"))?
524 .map_err(|e| anyhow!("Query handler error: {}", e))?;
525
526 if self.config.query_cache_config.enabled {
528 if let Some(cache_key) = query.cache_key() {
529 let mut cache = self.cache.write().await;
530 cache.set(cache_key, result.clone());
531 }
532 }
533
534 Ok(result)
535 }
536
537 pub async fn get_metrics(&self) -> QueryBusMetrics {
539 self.metrics.read().await.clone()
540 }
541
542 pub async fn clear_cache(&self) {
544 let mut cache = self.cache.write().await;
545 cache.clear();
546 }
547}
548
/// Type-erased registry: projection name -> projection instance, shared
/// behind an async `RwLock`.
type ProjectionMap =
    Arc<RwLock<HashMap<String, Box<dyn ReadModelProjection<Event = StreamEvent>>>>>;
552
/// Drives registered projections by feeding them events from the stream
/// and tracking each projection's position independently.
pub struct ReadModelManager {
    /// Registered projections, keyed by `projection_name`.
    projections: ProjectionMap,
    /// Projection name -> next stream position to read from.
    projection_positions: Arc<RwLock<HashMap<String, u64>>>,
    /// Source of stored events for catch-up processing.
    event_stream: Arc<dyn EventStream>,
    /// Running counters for observability.
    metrics: Arc<RwLock<ReadModelMetrics>>,
}
560
561impl ReadModelManager {
562 pub fn new(event_stream: Arc<dyn EventStream>) -> Self {
564 Self {
565 projections: Arc::new(RwLock::new(HashMap::new())),
566 projection_positions: Arc::new(RwLock::new(HashMap::new())),
567 event_stream,
568 metrics: Arc::new(RwLock::new(ReadModelMetrics::default())),
569 }
570 }
571
572 pub async fn register_projection<P>(&self, projection: P)
574 where
575 P: ReadModelProjection<Event = StreamEvent> + 'static,
576 {
577 let name = projection.projection_name().to_string();
578 let mut projections = self.projections.write().await;
579 projections.insert(name.clone(), Box::new(projection));
580
581 let mut positions = self.projection_positions.write().await;
582 positions.entry(name.clone()).or_insert(0);
583
584 info!("Registered read model projection: {}", name);
585 }
586
587 pub async fn process_events(&self) -> Result<()> {
589 let projections = self.projections.read().await;
590
591 for (name, projection) in projections.iter() {
592 if let Err(e) = self.process_projection(name, projection.as_ref()).await {
593 error!("Error processing projection {}: {}", name, e);
594
595 let mut metrics = self.metrics.write().await;
596 *metrics.projection_errors.entry(name.clone()).or_insert(0) += 1;
597 }
598 }
599
600 Ok(())
601 }
602
603 async fn process_projection(
605 &self,
606 name: &str,
607 projection: &dyn ReadModelProjection<Event = StreamEvent>,
608 ) -> Result<()> {
609 let position = {
610 let positions = self.projection_positions.read().await;
611 positions.get(name).copied().unwrap_or(0)
612 };
613
614 let events = self
615 .event_stream
616 .read_events_from_position(position, 1000)
617 .await?;
618
619 for stored_event in events {
620 if let Err(e) = projection.handle_event(&stored_event.event_data).await {
621 error!("Projection {} failed to handle event: {}", name, e);
622 return Err(anyhow!("Projection error: {}", e));
623 }
624
625 let mut positions = self.projection_positions.write().await;
627 *positions.entry(name.to_string()).or_insert(0) += 1;
628 }
629
630 Ok(())
631 }
632
633 pub async fn reset_projection(&self, name: &str) -> Result<()> {
635 let projections = self.projections.read().await;
636 let projection = projections
637 .get(name)
638 .ok_or_else(|| anyhow!("Projection not found: {}", name))?;
639
640 projection
641 .reset()
642 .await
643 .map_err(|e| anyhow!("Failed to reset projection {}: {}", name, e))?;
644
645 let mut positions = self.projection_positions.write().await;
646 positions.insert(name.to_string(), 0);
647
648 info!("Reset projection: {}", name);
649 Ok(())
650 }
651
652 pub async fn get_metrics(&self) -> ReadModelMetrics {
654 self.metrics.read().await.clone()
655 }
656}
657
/// In-memory cache of encoded query results with TTL, entry-count, and
/// memory-budget eviction.
#[derive(Debug)]
struct QueryCache {
    /// Policy governing TTL, capacity, and memory budget.
    config: QueryCacheConfig,
    /// Cache key -> encoded entry.
    entries: HashMap<String, CacheEntry>,
}
664
/// One cached, already-encoded query result.
#[derive(Debug, Clone)]
struct CacheEntry {
    /// Encoded result bytes.
    data: Vec<u8>,
    /// Insertion time; used for TTL expiry and oldest-first eviction.
    created_at: DateTime<Utc>,
    /// Size of `data`, tracked for the memory budget.
    size_bytes: usize,
}
671
672impl QueryCache {
673 fn new(config: QueryCacheConfig) -> Self {
674 Self {
675 config,
676 entries: HashMap::new(),
677 }
678 }
679
680 fn get<T>(&self, key: &str) -> Option<T>
681 where
682 T: for<'de> Deserialize<'de> + oxicode::Decode,
683 {
684 if !self.config.enabled {
685 return None;
686 }
687
688 if let Some(entry) = self.entries.get(key) {
689 let age = Utc::now().signed_duration_since(entry.created_at);
690 if age.num_seconds() < self.config.ttl_seconds as i64 {
691 if let Ok((value, _)) =
692 oxicode::serde::decode_from_slice(&entry.data, oxicode::config::standard())
693 {
694 return Some(value);
695 }
696 }
697 }
698
699 None
700 }
701
702 fn set<T>(&mut self, key: String, value: T)
703 where
704 T: Serialize + oxicode::Encode,
705 {
706 if !self.config.enabled {
707 return;
708 }
709
710 if let Ok(data) = oxicode::serde::encode_to_vec(&value, oxicode::config::standard()) {
711 let entry = CacheEntry {
712 size_bytes: data.len(),
713 data,
714 created_at: Utc::now(),
715 };
716
717 self.entries.insert(key, entry);
718 self.evict_if_needed();
719 }
720 }
721
722 fn clear(&mut self) {
723 self.entries.clear();
724 }
725
726 fn evict_if_needed(&mut self) {
727 let now = Utc::now();
729 self.entries.retain(|_, entry| {
730 let age = now.signed_duration_since(entry.created_at);
731 age.num_seconds() < self.config.ttl_seconds as i64
732 });
733
734 if self.entries.len() > self.config.max_entries {
736 let mut entries: Vec<_> = self
737 .entries
738 .iter()
739 .map(|(k, v)| (k.clone(), v.created_at))
740 .collect();
741 entries.sort_by_key(|(_, created_at)| *created_at);
742
743 let to_remove = self.entries.len() - self.config.max_entries;
744 for (key, _) in entries.iter().take(to_remove) {
745 self.entries.remove(key);
746 }
747 }
748
749 let total_size: usize = self.entries.values().map(|e| e.size_bytes).sum();
751 let max_size = self.config.max_memory_mb * 1024 * 1024;
752
753 if total_size > max_size {
754 let mut entries: Vec<_> = self
755 .entries
756 .iter()
757 .map(|(k, v)| (k.clone(), v.created_at, v.size_bytes))
758 .collect();
759 entries.sort_by_key(|(_, created_at, _)| *created_at);
760
761 let mut current_size = total_size;
762 for (key, _, size_bytes) in entries {
763 if current_size <= max_size {
764 break;
765 }
766 current_size -= size_bytes;
767 self.entries.remove(&key);
768 }
769 }
770 }
771}
772
/// Running counters maintained by `CommandBus::execute`.
#[derive(Debug, Clone, Default)]
pub struct CommandBusMetrics {
    /// Total commands accepted for execution.
    pub commands_received: u64,
    /// Commands that completed successfully (after any retries).
    pub commands_succeeded: u64,
    /// Commands that exhausted retries and failed.
    pub commands_failed: u64,
    /// Commands currently in flight.
    pub active_commands: u64,
}
781
/// Running counters maintained by `QueryBus::execute`.
#[derive(Debug, Clone, Default)]
pub struct QueryBusMetrics {
    /// Total queries accepted for execution.
    pub queries_received: u64,
    /// Queries that completed successfully (cached or handled).
    pub queries_succeeded: u64,
    /// Queries that failed (validation, missing handler, timeout, handler error).
    pub queries_failed: u64,
    /// Queries currently in flight.
    pub active_queries: u64,
    /// Queries served from the cache.
    pub cache_hits: u64,
    /// Successful queries that were not served from the cache.
    pub cache_misses: u64,
}
792
/// Running counters maintained by `ReadModelManager`.
#[derive(Debug, Clone, Default)]
pub struct ReadModelMetrics {
    /// Projection name -> number of failed processing passes.
    pub projection_errors: HashMap<String, u64>,
    /// Total events successfully applied across all projections.
    pub events_processed: u64,
    /// Number of currently registered projections.
    pub projections_active: u64,
}
800
/// Facade bundling the command bus, query bus, and read-model manager.
pub struct CQRSSystem {
    /// Write side: dispatches commands and persists events.
    pub command_bus: CommandBus,
    /// Read side: dispatches queries with caching.
    pub query_bus: QueryBus,
    /// Keeps read-model projections caught up with the event stream.
    pub read_model_manager: ReadModelManager,
    /// Shared configuration, retained for health checks.
    config: CQRSConfig,
}
808
809impl CQRSSystem {
810 pub fn new(
812 config: CQRSConfig,
813 event_store: Arc<dyn EventStoreTrait>,
814 event_stream: Arc<dyn EventStream>,
815 ) -> Self {
816 let command_bus = CommandBus::new(config.clone(), event_store);
817 let query_bus = QueryBus::new(config.clone());
818 let read_model_manager = ReadModelManager::new(event_stream);
819
820 Self {
821 command_bus,
822 query_bus,
823 read_model_manager,
824 config,
825 }
826 }
827
828 pub async fn start(self: Arc<Self>) -> Result<()> {
830 info!("Starting CQRS system");
831
832 let system_clone = Arc::clone(&self);
834
835 tokio::spawn(async move {
836 let mut interval = tokio::time::interval(Duration::from_millis(1000));
837 loop {
838 interval.tick().await;
839 if let Err(e) = system_clone.read_model_manager.process_events().await {
840 error!("Error processing read model events: {}", e);
841 }
842 }
843 });
844
845 info!("CQRS system started successfully");
846 Ok(())
847 }
848
849 pub async fn health_check(&self) -> CQRSHealthStatus {
851 let command_metrics = self.command_bus.get_metrics().await;
852 let query_metrics = self.query_bus.get_metrics().await;
853 let read_model_metrics = self.read_model_manager.get_metrics().await;
854
855 CQRSHealthStatus {
856 command_bus_healthy: command_metrics.active_commands
857 < self.config.max_concurrent_commands as u64,
858 query_bus_healthy: query_metrics.active_queries
859 < self.config.max_concurrent_queries as u64,
860 read_models_healthy: read_model_metrics.projection_errors.is_empty(),
861 command_metrics,
862 query_metrics,
863 read_model_metrics,
864 }
865 }
866}
867
/// Snapshot returned by `CQRSSystem::health_check`.
#[derive(Debug, Clone)]
pub struct CQRSHealthStatus {
    /// True while the command bus has spare concurrency capacity.
    pub command_bus_healthy: bool,
    /// True while the query bus has spare concurrency capacity.
    pub query_bus_healthy: bool,
    /// True if no projection has recorded an error.
    pub read_models_healthy: bool,
    /// Command-bus counters at snapshot time.
    pub command_metrics: CommandBusMetrics,
    /// Query-bus counters at snapshot time.
    pub query_metrics: QueryBusMetrics,
    /// Read-model counters at snapshot time.
    pub read_model_metrics: ReadModelMetrics,
}
878
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal command used to exercise the `Command` trait contract.
    #[derive(Debug, Clone)]
    struct TestCommand {
        id: Uuid,
        aggregate_id: String,
        data: String,
    }

    impl Command for TestCommand {
        type AggregateId = String;
        type EventType = String;

        fn command_id(&self) -> Uuid {
            self.id
        }

        fn aggregate_id(&self) -> Self::AggregateId {
            self.aggregate_id.clone()
        }

        fn validate(&self) -> Result<()> {
            if self.data.is_empty() {
                return Err(anyhow!("Data cannot be empty"));
            }
            Ok(())
        }

        fn expected_version(&self) -> Option<u64> {
            None
        }
    }

    /// Minimal query used to exercise the `Query` trait contract.
    #[derive(Debug, Clone)]
    struct TestQuery {
        id: Uuid,
        filter: String,
    }

    impl Query for TestQuery {
        type Result = Vec<String>;

        fn query_id(&self) -> Uuid {
            self.id
        }

        fn validate(&self) -> Result<()> {
            Ok(())
        }

        fn cache_key(&self) -> Option<String> {
            Some(format!("test_query_{}", self.filter))
        }

        fn timeout_ms(&self) -> Option<u64> {
            Some(5000)
        }
    }

    // These tests never `.await`, so plain `#[test]` is used instead of
    // `#[tokio::test]` to avoid spinning up a runtime per test.
    #[test]
    fn test_cqrs_config_defaults() {
        let config = CQRSConfig::default();
        assert_eq!(config.command_timeout_ms, 30000);
        assert_eq!(config.query_timeout_ms, 10000);
        assert!(config.enable_command_validation);
        assert!(config.enable_query_optimization);
    }

    #[test]
    fn test_command_validation() {
        let valid_command = TestCommand {
            id: Uuid::new_v4(),
            aggregate_id: "test".to_string(),
            data: "valid data".to_string(),
        };

        let invalid_command = TestCommand {
            id: Uuid::new_v4(),
            aggregate_id: "test".to_string(),
            data: "".to_string(),
        };

        assert!(valid_command.validate().is_ok());
        assert!(invalid_command.validate().is_err());
    }

    #[test]
    fn test_query_cache_key() {
        let query = TestQuery {
            id: Uuid::new_v4(),
            filter: "status=active".to_string(),
        };

        assert_eq!(
            query.cache_key(),
            Some("test_query_status=active".to_string())
        );
    }
}