1use crate::application::dto::QueryEventsRequest;
2use crate::application::services::pipeline::PipelineManager;
3use crate::application::services::projection::{
4 EntitySnapshotProjection, EventCounterProjection, ProjectionManager,
5};
6use crate::application::services::replay::ReplayManager;
7use crate::application::services::schema::{SchemaRegistry, SchemaRegistryConfig};
8use crate::domain::entities::Event;
9use crate::error::{AllSourceError, Result};
10use crate::infrastructure::observability::metrics::MetricsRegistry;
11use crate::infrastructure::persistence::compaction::{CompactionConfig, CompactionManager};
12use crate::infrastructure::persistence::index::{EventIndex, IndexEntry};
13use crate::infrastructure::persistence::snapshot::{SnapshotConfig, SnapshotManager, SnapshotType};
14use crate::infrastructure::persistence::storage::ParquetStorage;
15use crate::infrastructure::persistence::wal::{WALConfig, WriteAheadLog};
16use crate::infrastructure::web::websocket::WebSocketManager;
17use chrono::{DateTime, Utc};
18use dashmap::DashMap;
19use parking_lot::RwLock;
20use std::path::PathBuf;
21use std::sync::Arc;
22
23pub struct EventStore {
25 events: Arc<RwLock<Vec<Event>>>,
27
28 index: Arc<EventIndex>,
30
31 pub(crate) projections: Arc<RwLock<ProjectionManager>>,
33
34 storage: Option<Arc<RwLock<ParquetStorage>>>,
36
37 websocket_manager: Arc<WebSocketManager>,
39
40 snapshot_manager: Arc<SnapshotManager>,
42
43 wal: Option<Arc<WriteAheadLog>>,
45
46 compaction_manager: Option<Arc<CompactionManager>>,
48
49 schema_registry: Arc<SchemaRegistry>,
51
52 replay_manager: Arc<ReplayManager>,
54
55 pipeline_manager: Arc<PipelineManager>,
57
58 metrics: Arc<MetricsRegistry>,
60
61 total_ingested: Arc<RwLock<u64>>,
63
64 projection_state_cache: Arc<DashMap<String, serde_json::Value>>,
68}
69
70impl EventStore {
71 pub fn new() -> Self {
73 Self::with_config(EventStoreConfig::default())
74 }
75
76 pub fn with_config(config: EventStoreConfig) -> Self {
78 let mut projections = ProjectionManager::new();
79
80 projections.register(Arc::new(EntitySnapshotProjection::new("entity_snapshots")));
82 projections.register(Arc::new(EventCounterProjection::new("event_counters")));
83
84 let storage = config
86 .storage_dir
87 .as_ref()
88 .and_then(|dir| match ParquetStorage::new(dir) {
89 Ok(storage) => {
90 tracing::info!("✅ Parquet persistence enabled at: {}", dir.display());
91 Some(Arc::new(RwLock::new(storage)))
92 }
93 Err(e) => {
94 tracing::error!("❌ Failed to initialize Parquet storage: {}", e);
95 None
96 }
97 });
98
99 let wal = config.wal_dir.as_ref().and_then(|dir| {
101 match WriteAheadLog::new(dir, config.wal_config.clone()) {
102 Ok(wal) => {
103 tracing::info!("✅ WAL enabled at: {}", dir.display());
104 Some(Arc::new(wal))
105 }
106 Err(e) => {
107 tracing::error!("❌ Failed to initialize WAL: {}", e);
108 None
109 }
110 }
111 });
112
113 let compaction_manager = config.storage_dir.as_ref().map(|dir| {
115 let manager = CompactionManager::new(dir, config.compaction_config.clone());
116 Arc::new(manager)
117 });
118
119 let schema_registry = Arc::new(SchemaRegistry::new(config.schema_registry_config.clone()));
121 tracing::info!("✅ Schema registry enabled");
122
123 let replay_manager = Arc::new(ReplayManager::new());
125 tracing::info!("✅ Replay manager enabled");
126
127 let pipeline_manager = Arc::new(PipelineManager::new());
129 tracing::info!("✅ Pipeline manager enabled");
130
131 let metrics = MetricsRegistry::new();
133 tracing::info!("✅ Prometheus metrics registry initialized");
134
135 let projection_state_cache = Arc::new(DashMap::new());
137 tracing::info!("✅ Projection state cache initialized");
138
139 let store = Self {
140 events: Arc::new(RwLock::new(Vec::new())),
141 index: Arc::new(EventIndex::new()),
142 projections: Arc::new(RwLock::new(projections)),
143 storage,
144 websocket_manager: Arc::new(WebSocketManager::new()),
145 snapshot_manager: Arc::new(SnapshotManager::new(config.snapshot_config)),
146 wal,
147 compaction_manager,
148 schema_registry,
149 replay_manager,
150 pipeline_manager,
151 metrics,
152 total_ingested: Arc::new(RwLock::new(0)),
153 projection_state_cache,
154 };
155
156 let mut wal_recovered = false;
158 if let Some(ref wal) = store.wal {
159 match wal.recover() {
160 Ok(recovered_events) if !recovered_events.is_empty() => {
161 tracing::info!(
162 "🔄 Recovering {} events from WAL...",
163 recovered_events.len()
164 );
165
166 for event in recovered_events {
167 let offset = store.events.read().len();
169 if let Err(e) = store.index.index_event(
170 event.id,
171 event.entity_id_str(),
172 event.event_type_str(),
173 event.timestamp,
174 offset,
175 ) {
176 tracing::error!("Failed to re-index WAL event {}: {}", event.id, e);
177 }
178
179 if let Err(e) = store.projections.read().process_event(&event) {
180 tracing::error!("Failed to re-process WAL event {}: {}", event.id, e);
181 }
182
183 store.events.write().push(event);
184 }
185
186 let total = store.events.read().len();
187 *store.total_ingested.write() = total as u64;
188 tracing::info!("✅ Successfully recovered {} events from WAL", total);
189
190 if store.storage.is_some() {
192 tracing::info!("📸 Checkpointing WAL to Parquet storage...");
193 if let Err(e) = store.flush_storage() {
194 tracing::error!("Failed to checkpoint to Parquet: {}", e);
195 } else if let Err(e) = wal.truncate() {
196 tracing::error!("Failed to truncate WAL after checkpoint: {}", e);
197 } else {
198 tracing::info!("✅ WAL checkpointed and truncated");
199 }
200 }
201
202 wal_recovered = true;
203 }
204 Ok(_) => {
205 tracing::debug!("No events to recover from WAL");
206 }
207 Err(e) => {
208 tracing::error!("❌ WAL recovery failed: {}", e);
209 }
210 }
211 }
212
213 if !wal_recovered {
216 if let Some(ref storage) = store.storage {
217 if let Ok(persisted_events) = storage.read().load_all_events() {
218 tracing::info!("📂 Loading {} persisted events...", persisted_events.len());
219
220 for event in persisted_events {
221 let offset = store.events.read().len();
223 if let Err(e) = store.index.index_event(
224 event.id,
225 event.entity_id_str(),
226 event.event_type_str(),
227 event.timestamp,
228 offset,
229 ) {
230 tracing::error!("Failed to re-index event {}: {}", event.id, e);
231 }
232
233 if let Err(e) = store.projections.read().process_event(&event) {
235 tracing::error!("Failed to re-process event {}: {}", event.id, e);
236 }
237
238 store.events.write().push(event);
239 }
240
241 let total = store.events.read().len();
242 *store.total_ingested.write() = total as u64;
243 tracing::info!("✅ Successfully loaded {} events from storage", total);
244 }
245 }
246 }
247
248 store
249 }
250
251 pub fn ingest(&self, event: Event) -> Result<()> {
253 let timer = self.metrics.ingestion_duration_seconds.start_timer();
255
256 let validation_result = self.validate_event(&event);
258 if let Err(e) = validation_result {
259 self.metrics.ingestion_errors_total.inc();
261 timer.observe_duration();
262 return Err(e);
263 }
264
265 if let Some(ref wal) = self.wal {
268 if let Err(e) = wal.append(event.clone()) {
269 self.metrics.ingestion_errors_total.inc();
270 timer.observe_duration();
271 return Err(e);
272 }
273 }
274
275 let mut events = self.events.write();
276 let offset = events.len();
277
278 self.index.index_event(
280 event.id,
281 event.entity_id_str(),
282 event.event_type_str(),
283 event.timestamp,
284 offset,
285 )?;
286
287 let projections = self.projections.read();
289 projections.process_event(&event)?;
290 drop(projections); let pipeline_results = self.pipeline_manager.process_event(&event);
295 if !pipeline_results.is_empty() {
296 tracing::debug!(
297 "Event {} processed by {} pipeline(s)",
298 event.id,
299 pipeline_results.len()
300 );
301 for (pipeline_id, result) in pipeline_results {
304 tracing::trace!("Pipeline {} result: {:?}", pipeline_id, result);
305 }
306 }
307
308 if let Some(ref storage) = self.storage {
310 let mut storage = storage.write();
311 storage.append_event(event.clone())?;
312 }
313
314 events.push(event.clone());
316 let total_events = events.len();
317 drop(events); self.websocket_manager
321 .broadcast_event(Arc::new(event.clone()));
322
323 self.check_auto_snapshot(event.entity_id_str(), &event);
325
326 self.metrics.events_ingested_total.inc();
328 self.metrics
329 .events_ingested_by_type
330 .with_label_values(&[event.event_type_str()])
331 .inc();
332 self.metrics.storage_events_total.set(total_events as i64);
333
334 let mut total = self.total_ingested.write();
336 *total += 1;
337
338 timer.observe_duration();
339
340 tracing::debug!("Event ingested: {} (offset: {})", event.id, offset);
341
342 Ok(())
343 }
344
345 pub fn websocket_manager(&self) -> Arc<WebSocketManager> {
347 Arc::clone(&self.websocket_manager)
348 }
349
350 pub fn snapshot_manager(&self) -> Arc<SnapshotManager> {
352 Arc::clone(&self.snapshot_manager)
353 }
354
355 pub fn compaction_manager(&self) -> Option<Arc<CompactionManager>> {
357 self.compaction_manager.as_ref().map(Arc::clone)
358 }
359
360 pub fn schema_registry(&self) -> Arc<SchemaRegistry> {
362 Arc::clone(&self.schema_registry)
363 }
364
365 pub fn replay_manager(&self) -> Arc<ReplayManager> {
367 Arc::clone(&self.replay_manager)
368 }
369
370 pub fn pipeline_manager(&self) -> Arc<PipelineManager> {
372 Arc::clone(&self.pipeline_manager)
373 }
374
375 pub fn metrics(&self) -> Arc<MetricsRegistry> {
377 Arc::clone(&self.metrics)
378 }
379
380 pub fn projection_manager(&self) -> parking_lot::RwLockReadGuard<'_, ProjectionManager> {
382 self.projections.read()
383 }
384
385 pub fn projection_state_cache(&self) -> Arc<DashMap<String, serde_json::Value>> {
388 Arc::clone(&self.projection_state_cache)
389 }
390
391 pub fn flush_storage(&self) -> Result<()> {
393 if let Some(ref storage) = self.storage {
394 let mut storage = storage.write();
395 storage.flush()?;
396 tracing::info!("✅ Flushed events to persistent storage");
397 }
398 Ok(())
399 }
400
401 pub fn create_snapshot(&self, entity_id: &str) -> Result<()> {
403 let events = self.query(QueryEventsRequest {
405 entity_id: Some(entity_id.to_string()),
406 event_type: None,
407 tenant_id: None,
408 as_of: None,
409 since: None,
410 until: None,
411 limit: None,
412 })?;
413
414 if events.is_empty() {
415 return Err(AllSourceError::EntityNotFound(entity_id.to_string()));
416 }
417
418 let mut state = serde_json::json!({});
420 for event in &events {
421 if let serde_json::Value::Object(ref mut state_map) = state {
422 if let serde_json::Value::Object(ref payload_map) = event.payload {
423 for (key, value) in payload_map {
424 state_map.insert(key.clone(), value.clone());
425 }
426 }
427 }
428 }
429
430 let last_event = events.last().unwrap();
431 self.snapshot_manager.create_snapshot(
432 entity_id.to_string(),
433 state,
434 last_event.timestamp,
435 events.len(),
436 SnapshotType::Manual,
437 )?;
438
439 Ok(())
440 }
441
442 fn check_auto_snapshot(&self, entity_id: &str, event: &Event) {
444 let entity_event_count = self
446 .index
447 .get_by_entity(entity_id)
448 .map(|entries| entries.len())
449 .unwrap_or(0);
450
451 if self.snapshot_manager.should_create_snapshot(
452 entity_id,
453 entity_event_count,
454 event.timestamp,
455 ) {
456 if let Err(e) = self.create_snapshot(entity_id) {
458 tracing::warn!(
459 "Failed to create automatic snapshot for {}: {}",
460 entity_id,
461 e
462 );
463 }
464 }
465 }
466
467 fn validate_event(&self, event: &Event) -> Result<()> {
469 if event.entity_id_str().is_empty() {
472 return Err(AllSourceError::ValidationError(
473 "entity_id cannot be empty".to_string(),
474 ));
475 }
476
477 if event.event_type_str().is_empty() {
478 return Err(AllSourceError::ValidationError(
479 "event_type cannot be empty".to_string(),
480 ));
481 }
482
483 Ok(())
484 }
485
486 pub fn query(&self, request: QueryEventsRequest) -> Result<Vec<Event>> {
488 let query_type = if request.entity_id.is_some() {
490 "entity"
491 } else if request.event_type.is_some() {
492 "type"
493 } else {
494 "full_scan"
495 };
496
497 let timer = self
499 .metrics
500 .query_duration_seconds
501 .with_label_values(&[query_type])
502 .start_timer();
503
504 self.metrics
506 .queries_total
507 .with_label_values(&[query_type])
508 .inc();
509
510 let events = self.events.read();
511
512 let offsets: Vec<usize> = if let Some(entity_id) = &request.entity_id {
514 self.index
516 .get_by_entity(entity_id)
517 .map(|entries| self.filter_entries(entries, &request))
518 .unwrap_or_default()
519 } else if let Some(event_type) = &request.event_type {
520 self.index
522 .get_by_type(event_type)
523 .map(|entries| self.filter_entries(entries, &request))
524 .unwrap_or_default()
525 } else {
526 (0..events.len()).collect()
528 };
529
530 let mut results: Vec<Event> = offsets
532 .iter()
533 .filter_map(|&offset| events.get(offset).cloned())
534 .filter(|event| self.apply_filters(event, &request))
535 .collect();
536
537 results.sort_by_key(|x| x.timestamp);
539
540 if let Some(limit) = request.limit {
542 results.truncate(limit);
543 }
544
545 self.metrics
547 .query_results_total
548 .with_label_values(&[query_type])
549 .inc_by(results.len() as u64);
550
551 timer.observe_duration();
552
553 Ok(results)
554 }
555
556 fn filter_entries(&self, entries: Vec<IndexEntry>, request: &QueryEventsRequest) -> Vec<usize> {
558 entries
559 .into_iter()
560 .filter(|entry| {
561 if let Some(as_of) = request.as_of {
563 if entry.timestamp > as_of {
564 return false;
565 }
566 }
567 if let Some(since) = request.since {
568 if entry.timestamp < since {
569 return false;
570 }
571 }
572 if let Some(until) = request.until {
573 if entry.timestamp > until {
574 return false;
575 }
576 }
577 true
578 })
579 .map(|entry| entry.offset)
580 .collect()
581 }
582
583 fn apply_filters(&self, event: &Event, request: &QueryEventsRequest) -> bool {
585 if request.entity_id.is_some() {
587 if let Some(ref event_type) = request.event_type {
588 if event.event_type_str() != event_type {
589 return false;
590 }
591 }
592 }
593
594 true
595 }
596
597 pub fn reconstruct_state(
600 &self,
601 entity_id: &str,
602 as_of: Option<DateTime<Utc>>,
603 ) -> Result<serde_json::Value> {
604 let (merged_state, since_timestamp) = if let Some(as_of_time) = as_of {
606 if let Some(snapshot) = self
608 .snapshot_manager
609 .get_snapshot_as_of(entity_id, as_of_time)
610 {
611 tracing::debug!(
612 "Using snapshot from {} for entity {} (saved {} events)",
613 snapshot.as_of,
614 entity_id,
615 snapshot.event_count
616 );
617 (snapshot.state.clone(), Some(snapshot.as_of))
618 } else {
619 (serde_json::json!({}), None)
620 }
621 } else {
622 if let Some(snapshot) = self.snapshot_manager.get_latest_snapshot(entity_id) {
624 tracing::debug!(
625 "Using latest snapshot from {} for entity {}",
626 snapshot.as_of,
627 entity_id
628 );
629 (snapshot.state.clone(), Some(snapshot.as_of))
630 } else {
631 (serde_json::json!({}), None)
632 }
633 };
634
635 let events = self.query(QueryEventsRequest {
637 entity_id: Some(entity_id.to_string()),
638 event_type: None,
639 tenant_id: None,
640 as_of,
641 since: since_timestamp,
642 until: None,
643 limit: None,
644 })?;
645
646 if events.is_empty() && since_timestamp.is_none() {
648 return Err(AllSourceError::EntityNotFound(entity_id.to_string()));
649 }
650
651 let mut merged_state = merged_state;
653 for event in &events {
654 if let serde_json::Value::Object(ref mut state_map) = merged_state {
655 if let serde_json::Value::Object(ref payload_map) = event.payload {
656 for (key, value) in payload_map {
657 state_map.insert(key.clone(), value.clone());
658 }
659 }
660 }
661 }
662
663 let state = serde_json::json!({
665 "entity_id": entity_id,
666 "last_updated": events.last().map(|e| e.timestamp),
667 "event_count": events.len(),
668 "as_of": as_of,
669 "current_state": merged_state,
670 "history": events.iter().map(|e| {
671 serde_json::json!({
672 "event_id": e.id,
673 "type": e.event_type,
674 "timestamp": e.timestamp,
675 "payload": e.payload
676 })
677 }).collect::<Vec<_>>()
678 });
679
680 Ok(state)
681 }
682
683 pub fn get_snapshot(&self, entity_id: &str) -> Result<serde_json::Value> {
685 let projections = self.projections.read();
686
687 if let Some(snapshot_projection) = projections.get_projection("entity_snapshots") {
688 if let Some(state) = snapshot_projection.get_state(entity_id) {
689 return Ok(serde_json::json!({
690 "entity_id": entity_id,
691 "snapshot": state,
692 "from_projection": "entity_snapshots"
693 }));
694 }
695 }
696
697 Err(AllSourceError::EntityNotFound(entity_id.to_string()))
698 }
699
700 pub fn stats(&self) -> StoreStats {
702 let events = self.events.read();
703 let index_stats = self.index.stats();
704
705 StoreStats {
706 total_events: events.len(),
707 total_entities: index_stats.total_entities,
708 total_event_types: index_stats.total_event_types,
709 total_ingested: *self.total_ingested.read(),
710 }
711 }
712}
713
714#[derive(Debug, Clone)]
716pub struct EventStoreConfig {
717 pub storage_dir: Option<PathBuf>,
719
720 pub snapshot_config: SnapshotConfig,
722
723 pub wal_dir: Option<PathBuf>,
725
726 pub wal_config: WALConfig,
728
729 pub compaction_config: CompactionConfig,
731
732 pub schema_registry_config: SchemaRegistryConfig,
734}
735
736impl Default for EventStoreConfig {
737 fn default() -> Self {
738 Self {
739 storage_dir: None,
740 snapshot_config: SnapshotConfig::default(),
741 wal_dir: None,
742 wal_config: WALConfig::default(),
743 compaction_config: CompactionConfig::default(),
744 schema_registry_config: SchemaRegistryConfig::default(),
745 }
746 }
747}
748
749impl EventStoreConfig {
750 pub fn with_persistence(storage_dir: impl Into<PathBuf>) -> Self {
752 Self {
753 storage_dir: Some(storage_dir.into()),
754 snapshot_config: SnapshotConfig::default(),
755 wal_dir: None,
756 wal_config: WALConfig::default(),
757 compaction_config: CompactionConfig::default(),
758 schema_registry_config: SchemaRegistryConfig::default(),
759 }
760 }
761
762 pub fn with_snapshots(snapshot_config: SnapshotConfig) -> Self {
764 Self {
765 storage_dir: None,
766 snapshot_config,
767 wal_dir: None,
768 wal_config: WALConfig::default(),
769 compaction_config: CompactionConfig::default(),
770 schema_registry_config: SchemaRegistryConfig::default(),
771 }
772 }
773
774 pub fn with_wal(wal_dir: impl Into<PathBuf>, wal_config: WALConfig) -> Self {
776 Self {
777 storage_dir: None,
778 snapshot_config: SnapshotConfig::default(),
779 wal_dir: Some(wal_dir.into()),
780 wal_config,
781 compaction_config: CompactionConfig::default(),
782 schema_registry_config: SchemaRegistryConfig::default(),
783 }
784 }
785
786 pub fn with_all(storage_dir: impl Into<PathBuf>, snapshot_config: SnapshotConfig) -> Self {
788 Self {
789 storage_dir: Some(storage_dir.into()),
790 snapshot_config,
791 wal_dir: None,
792 wal_config: WALConfig::default(),
793 compaction_config: CompactionConfig::default(),
794 schema_registry_config: SchemaRegistryConfig::default(),
795 }
796 }
797
798 pub fn production(
800 storage_dir: impl Into<PathBuf>,
801 wal_dir: impl Into<PathBuf>,
802 snapshot_config: SnapshotConfig,
803 wal_config: WALConfig,
804 compaction_config: CompactionConfig,
805 ) -> Self {
806 Self {
807 storage_dir: Some(storage_dir.into()),
808 snapshot_config,
809 wal_dir: Some(wal_dir.into()),
810 wal_config,
811 compaction_config,
812 schema_registry_config: SchemaRegistryConfig::default(),
813 }
814 }
815}
816
817#[derive(Debug, serde::Serialize)]
818pub struct StoreStats {
819 pub total_events: usize,
820 pub total_entities: usize,
821 pub total_event_types: usize,
822 pub total_ingested: u64,
823}
824
825impl Default for EventStore {
826 fn default() -> Self {
827 Self::new()
828 }
829}
830
831#[cfg(test)]
832mod tests {
833 use super::*;
834 use crate::domain::entities::Event;
835 use tempfile::TempDir;
836
837 fn create_test_event(entity_id: &str, event_type: &str) -> Event {
838 Event::from_strings(
839 event_type.to_string(),
840 entity_id.to_string(),
841 "default".to_string(),
842 serde_json::json!({"name": "Test", "value": 42}),
843 None,
844 )
845 .unwrap()
846 }
847
848 #[test]
849 fn test_event_store_new() {
850 let store = EventStore::new();
851 assert_eq!(store.stats().total_events, 0);
852 assert_eq!(store.stats().total_entities, 0);
853 }
854
855 #[test]
856 fn test_event_store_default() {
857 let store = EventStore::default();
858 assert_eq!(store.stats().total_events, 0);
859 }
860
861 #[test]
862 fn test_ingest_single_event() {
863 let store = EventStore::new();
864 let event = create_test_event("entity-1", "user.created");
865
866 store.ingest(event).unwrap();
867
868 assert_eq!(store.stats().total_events, 1);
869 assert_eq!(store.stats().total_ingested, 1);
870 }
871
872 #[test]
873 fn test_ingest_multiple_events() {
874 let store = EventStore::new();
875
876 for i in 0..10 {
877 let event = create_test_event(&format!("entity-{}", i), "user.created");
878 store.ingest(event).unwrap();
879 }
880
881 assert_eq!(store.stats().total_events, 10);
882 assert_eq!(store.stats().total_ingested, 10);
883 }
884
885 #[test]
886 fn test_query_by_entity_id() {
887 let store = EventStore::new();
888
889 store
890 .ingest(create_test_event("entity-1", "user.created"))
891 .unwrap();
892 store
893 .ingest(create_test_event("entity-2", "user.created"))
894 .unwrap();
895 store
896 .ingest(create_test_event("entity-1", "user.updated"))
897 .unwrap();
898
899 let results = store
900 .query(QueryEventsRequest {
901 entity_id: Some("entity-1".to_string()),
902 event_type: None,
903 tenant_id: None,
904 as_of: None,
905 since: None,
906 until: None,
907 limit: None,
908 })
909 .unwrap();
910
911 assert_eq!(results.len(), 2);
912 }
913
914 #[test]
915 fn test_query_by_event_type() {
916 let store = EventStore::new();
917
918 store
919 .ingest(create_test_event("entity-1", "user.created"))
920 .unwrap();
921 store
922 .ingest(create_test_event("entity-2", "user.updated"))
923 .unwrap();
924 store
925 .ingest(create_test_event("entity-3", "user.created"))
926 .unwrap();
927
928 let results = store
929 .query(QueryEventsRequest {
930 entity_id: None,
931 event_type: Some("user.created".to_string()),
932 tenant_id: None,
933 as_of: None,
934 since: None,
935 until: None,
936 limit: None,
937 })
938 .unwrap();
939
940 assert_eq!(results.len(), 2);
941 }
942
943 #[test]
944 fn test_query_with_limit() {
945 let store = EventStore::new();
946
947 for i in 0..10 {
948 let event = create_test_event(&format!("entity-{}", i), "user.created");
949 store.ingest(event).unwrap();
950 }
951
952 let results = store
953 .query(QueryEventsRequest {
954 entity_id: None,
955 event_type: None,
956 tenant_id: None,
957 as_of: None,
958 since: None,
959 until: None,
960 limit: Some(5),
961 })
962 .unwrap();
963
964 assert_eq!(results.len(), 5);
965 }
966
967 #[test]
968 fn test_query_empty_store() {
969 let store = EventStore::new();
970
971 let results = store
972 .query(QueryEventsRequest {
973 entity_id: Some("non-existent".to_string()),
974 event_type: None,
975 tenant_id: None,
976 as_of: None,
977 since: None,
978 until: None,
979 limit: None,
980 })
981 .unwrap();
982
983 assert!(results.is_empty());
984 }
985
986 #[test]
987 fn test_reconstruct_state() {
988 let store = EventStore::new();
989
990 store
991 .ingest(create_test_event("entity-1", "user.created"))
992 .unwrap();
993
994 let state = store.reconstruct_state("entity-1", None).unwrap();
995 assert_eq!(state["current_state"]["name"], "Test");
997 assert_eq!(state["current_state"]["value"], 42);
998 }
999
1000 #[test]
1001 fn test_reconstruct_state_not_found() {
1002 let store = EventStore::new();
1003
1004 let result = store.reconstruct_state("non-existent", None);
1005 assert!(result.is_err());
1006 }
1007
1008 #[test]
1009 fn test_get_snapshot_empty() {
1010 let store = EventStore::new();
1011
1012 let result = store.get_snapshot("non-existent");
1013 assert!(result.is_err());
1015 }
1016
1017 #[test]
1018 fn test_create_snapshot() {
1019 let store = EventStore::new();
1020
1021 store
1022 .ingest(create_test_event("entity-1", "user.created"))
1023 .unwrap();
1024
1025 store.create_snapshot("entity-1").unwrap();
1026
1027 let snapshot = store.get_snapshot("entity-1").unwrap();
1029 assert!(snapshot != serde_json::json!(null));
1030 }
1031
1032 #[test]
1033 fn test_create_snapshot_entity_not_found() {
1034 let store = EventStore::new();
1035
1036 let result = store.create_snapshot("non-existent");
1037 assert!(result.is_err());
1038 }
1039
1040 #[test]
1041 fn test_websocket_manager() {
1042 let store = EventStore::new();
1043 let manager = store.websocket_manager();
1044 assert!(Arc::strong_count(&manager) >= 1);
1046 }
1047
1048 #[test]
1049 fn test_snapshot_manager() {
1050 let store = EventStore::new();
1051 let manager = store.snapshot_manager();
1052 assert!(Arc::strong_count(&manager) >= 1);
1053 }
1054
1055 #[test]
1056 fn test_compaction_manager_none() {
1057 let store = EventStore::new();
1058 assert!(store.compaction_manager().is_none());
1060 }
1061
1062 #[test]
1063 fn test_schema_registry() {
1064 let store = EventStore::new();
1065 let registry = store.schema_registry();
1066 assert!(Arc::strong_count(®istry) >= 1);
1067 }
1068
1069 #[test]
1070 fn test_replay_manager() {
1071 let store = EventStore::new();
1072 let manager = store.replay_manager();
1073 assert!(Arc::strong_count(&manager) >= 1);
1074 }
1075
1076 #[test]
1077 fn test_pipeline_manager() {
1078 let store = EventStore::new();
1079 let manager = store.pipeline_manager();
1080 assert!(Arc::strong_count(&manager) >= 1);
1081 }
1082
1083 #[test]
1084 fn test_projection_manager() {
1085 let store = EventStore::new();
1086 let manager = store.projection_manager();
1087 let projections = manager.list_projections();
1089 assert!(projections.len() >= 2); }
1091
1092 #[test]
1093 fn test_projection_state_cache() {
1094 let store = EventStore::new();
1095 let cache = store.projection_state_cache();
1096
1097 cache.insert("test:key".to_string(), serde_json::json!({"value": 123}));
1098 assert_eq!(cache.len(), 1);
1099
1100 let value = cache.get("test:key").unwrap();
1101 assert_eq!(value["value"], 123);
1102 }
1103
1104 #[test]
1105 fn test_metrics() {
1106 let store = EventStore::new();
1107 let metrics = store.metrics();
1108 assert!(Arc::strong_count(&metrics) >= 1);
1109 }
1110
1111 #[test]
1112 fn test_store_stats() {
1113 let store = EventStore::new();
1114
1115 store
1116 .ingest(create_test_event("entity-1", "user.created"))
1117 .unwrap();
1118 store
1119 .ingest(create_test_event("entity-2", "order.placed"))
1120 .unwrap();
1121
1122 let stats = store.stats();
1123 assert_eq!(stats.total_events, 2);
1124 assert_eq!(stats.total_entities, 2);
1125 assert_eq!(stats.total_event_types, 2);
1126 assert_eq!(stats.total_ingested, 2);
1127 }
1128
1129 #[test]
1130 fn test_event_store_config_default() {
1131 let config = EventStoreConfig::default();
1132 assert!(config.storage_dir.is_none());
1133 assert!(config.wal_dir.is_none());
1134 }
1135
1136 #[test]
1137 fn test_event_store_config_with_persistence() {
1138 let temp_dir = TempDir::new().unwrap();
1139 let config = EventStoreConfig::with_persistence(temp_dir.path());
1140
1141 assert!(config.storage_dir.is_some());
1142 assert!(config.wal_dir.is_none());
1143 }
1144
1145 #[test]
1146 fn test_event_store_config_with_wal() {
1147 let temp_dir = TempDir::new().unwrap();
1148 let config = EventStoreConfig::with_wal(temp_dir.path(), WALConfig::default());
1149
1150 assert!(config.storage_dir.is_none());
1151 assert!(config.wal_dir.is_some());
1152 }
1153
1154 #[test]
1155 fn test_event_store_config_with_all() {
1156 let temp_dir = TempDir::new().unwrap();
1157 let config = EventStoreConfig::with_all(temp_dir.path(), SnapshotConfig::default());
1158
1159 assert!(config.storage_dir.is_some());
1160 }
1161
1162 #[test]
1163 fn test_event_store_config_production() {
1164 let storage_dir = TempDir::new().unwrap();
1165 let wal_dir = TempDir::new().unwrap();
1166 let config = EventStoreConfig::production(
1167 storage_dir.path(),
1168 wal_dir.path(),
1169 SnapshotConfig::default(),
1170 WALConfig::default(),
1171 CompactionConfig::default(),
1172 );
1173
1174 assert!(config.storage_dir.is_some());
1175 assert!(config.wal_dir.is_some());
1176 }
1177
1178 #[test]
1179 fn test_store_stats_serde() {
1180 let stats = StoreStats {
1181 total_events: 100,
1182 total_entities: 50,
1183 total_event_types: 10,
1184 total_ingested: 100,
1185 };
1186
1187 let json = serde_json::to_string(&stats).unwrap();
1188 assert!(json.contains("\"total_events\":100"));
1189 assert!(json.contains("\"total_entities\":50"));
1190 }
1191
1192 #[test]
1193 fn test_query_with_entity_and_type() {
1194 let store = EventStore::new();
1195
1196 store
1197 .ingest(create_test_event("entity-1", "user.created"))
1198 .unwrap();
1199 store
1200 .ingest(create_test_event("entity-1", "user.updated"))
1201 .unwrap();
1202 store
1203 .ingest(create_test_event("entity-2", "user.created"))
1204 .unwrap();
1205
1206 let results = store
1207 .query(QueryEventsRequest {
1208 entity_id: Some("entity-1".to_string()),
1209 event_type: Some("user.created".to_string()),
1210 tenant_id: None,
1211 as_of: None,
1212 since: None,
1213 until: None,
1214 limit: None,
1215 })
1216 .unwrap();
1217
1218 assert_eq!(results.len(), 1);
1219 assert_eq!(results[0].event_type_str(), "user.created");
1220 }
1221
1222 #[test]
1223 fn test_flush_storage_no_storage() {
1224 let store = EventStore::new();
1225 let result = store.flush_storage();
1227 assert!(result.is_ok());
1228 }
1229
1230 #[test]
1231 fn test_state_evolution() {
1232 let store = EventStore::new();
1233
1234 store
1236 .ingest(
1237 Event::from_strings(
1238 "user.created".to_string(),
1239 "user-1".to_string(),
1240 "default".to_string(),
1241 serde_json::json!({"name": "Alice", "age": 25}),
1242 None,
1243 )
1244 .unwrap(),
1245 )
1246 .unwrap();
1247
1248 store
1250 .ingest(
1251 Event::from_strings(
1252 "user.updated".to_string(),
1253 "user-1".to_string(),
1254 "default".to_string(),
1255 serde_json::json!({"age": 26}),
1256 None,
1257 )
1258 .unwrap(),
1259 )
1260 .unwrap();
1261
1262 let state = store.reconstruct_state("user-1", None).unwrap();
1263 assert_eq!(state["current_state"]["name"], "Alice");
1265 assert_eq!(state["current_state"]["age"], 26);
1266 }
1267}