1use crate::{
2 application::{
3 dto::QueryEventsRequest,
4 services::{
5 pipeline::PipelineManager,
6 projection::{EntitySnapshotProjection, EventCounterProjection, ProjectionManager},
7 replay::ReplayManager,
8 schema::{SchemaRegistry, SchemaRegistryConfig},
9 },
10 },
11 domain::entities::Event,
12 error::{AllSourceError, Result},
13 infrastructure::{
14 observability::metrics::MetricsRegistry,
15 persistence::{
16 compaction::{CompactionConfig, CompactionManager},
17 index::{EventIndex, IndexEntry},
18 snapshot::{SnapshotConfig, SnapshotManager, SnapshotType},
19 storage::ParquetStorage,
20 wal::{WALConfig, WriteAheadLog},
21 },
22 web::websocket::WebSocketManager,
23 },
24};
25use chrono::{DateTime, Utc};
26use dashmap::DashMap;
27use parking_lot::RwLock;
28use std::{path::PathBuf, sync::Arc};
29
/// In-process event store combining an in-memory event log, its index,
/// projections, and the optional durability layers (WAL and Parquet).
pub struct EventStore {
    /// In-memory event log. An event's position in this vector is the offset
    /// recorded for it in `index`.
    events: Arc<RwLock<Vec<Event>>>,

    /// Index from entity id / event type to offsets into `events`.
    index: Arc<EventIndex>,

    /// Registered projections; every ingested event is passed through them.
    pub(crate) projections: Arc<RwLock<ProjectionManager>>,

    /// Parquet-backed storage. `None` when no storage directory is configured
    /// or when initialization failed.
    storage: Option<Arc<RwLock<ParquetStorage>>>,

    /// Receives every successfully ingested event via `broadcast_event`.
    websocket_manager: Arc<WebSocketManager>,

    /// Holds entity state snapshots used by `create_snapshot` and
    /// `reconstruct_state`.
    snapshot_manager: Arc<SnapshotManager>,

    /// Write-ahead log that `ingest` appends to before applying an event.
    /// `None` when no WAL directory is configured or initialization failed.
    wal: Option<Arc<WriteAheadLog>>,

    /// Compaction manager; created only when a storage directory is configured.
    compaction_manager: Option<Arc<CompactionManager>>,

    /// Schema registry handle; in this file it is only constructed and exposed
    /// through its accessor.
    schema_registry: Arc<SchemaRegistry>,

    /// Replay manager handle; in this file it is only constructed and exposed
    /// through its accessor.
    replay_manager: Arc<ReplayManager>,

    /// Pipelines that each ingested event is passed to via `process_event`.
    pipeline_manager: Arc<PipelineManager>,

    /// Metrics registry used for ingestion and query counters/timers.
    metrics: Arc<MetricsRegistry>,

    /// Number of events ingested; after startup restoration it is set to the
    /// number of restored events.
    total_ingested: Arc<RwLock<u64>>,

    /// String-keyed cache of JSON values, handed out by
    /// `projection_state_cache()`. Nothing in this file populates it.
    // NOTE(review): presumably filled by projection-related callers — confirm.
    projection_state_cache: Arc<DashMap<String, serde_json::Value>>,
}
76
77impl EventStore {
78 pub fn new() -> Self {
80 Self::with_config(EventStoreConfig::default())
81 }
82
83 pub fn with_config(config: EventStoreConfig) -> Self {
85 let mut projections = ProjectionManager::new();
86
87 projections.register(Arc::new(EntitySnapshotProjection::new("entity_snapshots")));
89 projections.register(Arc::new(EventCounterProjection::new("event_counters")));
90
91 let storage = config
93 .storage_dir
94 .as_ref()
95 .and_then(|dir| match ParquetStorage::new(dir) {
96 Ok(storage) => {
97 tracing::info!("✅ Parquet persistence enabled at: {}", dir.display());
98 Some(Arc::new(RwLock::new(storage)))
99 }
100 Err(e) => {
101 tracing::error!("❌ Failed to initialize Parquet storage: {}", e);
102 None
103 }
104 });
105
106 let wal = config.wal_dir.as_ref().and_then(|dir| {
108 match WriteAheadLog::new(dir, config.wal_config.clone()) {
109 Ok(wal) => {
110 tracing::info!("✅ WAL enabled at: {}", dir.display());
111 Some(Arc::new(wal))
112 }
113 Err(e) => {
114 tracing::error!("❌ Failed to initialize WAL: {}", e);
115 None
116 }
117 }
118 });
119
120 let compaction_manager = config.storage_dir.as_ref().map(|dir| {
122 let manager = CompactionManager::new(dir, config.compaction_config.clone());
123 Arc::new(manager)
124 });
125
126 let schema_registry = Arc::new(SchemaRegistry::new(config.schema_registry_config.clone()));
128 tracing::info!("✅ Schema registry enabled");
129
130 let replay_manager = Arc::new(ReplayManager::new());
132 tracing::info!("✅ Replay manager enabled");
133
134 let pipeline_manager = Arc::new(PipelineManager::new());
136 tracing::info!("✅ Pipeline manager enabled");
137
138 let metrics = MetricsRegistry::new();
140 tracing::info!("✅ Prometheus metrics registry initialized");
141
142 let projection_state_cache = Arc::new(DashMap::new());
144 tracing::info!("✅ Projection state cache initialized");
145
146 let store = Self {
147 events: Arc::new(RwLock::new(Vec::new())),
148 index: Arc::new(EventIndex::new()),
149 projections: Arc::new(RwLock::new(projections)),
150 storage,
151 websocket_manager: Arc::new(WebSocketManager::new()),
152 snapshot_manager: Arc::new(SnapshotManager::new(config.snapshot_config)),
153 wal,
154 compaction_manager,
155 schema_registry,
156 replay_manager,
157 pipeline_manager,
158 metrics,
159 total_ingested: Arc::new(RwLock::new(0)),
160 projection_state_cache,
161 };
162
163 let mut wal_recovered = false;
165 if let Some(ref wal) = store.wal {
166 match wal.recover() {
167 Ok(recovered_events) if !recovered_events.is_empty() => {
168 tracing::info!(
169 "🔄 Recovering {} events from WAL...",
170 recovered_events.len()
171 );
172
173 for event in recovered_events {
174 let offset = store.events.read().len();
176 if let Err(e) = store.index.index_event(
177 event.id,
178 event.entity_id_str(),
179 event.event_type_str(),
180 event.timestamp,
181 offset,
182 ) {
183 tracing::error!("Failed to re-index WAL event {}: {}", event.id, e);
184 }
185
186 if let Err(e) = store.projections.read().process_event(&event) {
187 tracing::error!("Failed to re-process WAL event {}: {}", event.id, e);
188 }
189
190 store.events.write().push(event);
191 }
192
193 let total = store.events.read().len();
194 *store.total_ingested.write() = total as u64;
195 tracing::info!("✅ Successfully recovered {} events from WAL", total);
196
197 if store.storage.is_some() {
199 tracing::info!("📸 Checkpointing WAL to Parquet storage...");
200 if let Err(e) = store.flush_storage() {
201 tracing::error!("Failed to checkpoint to Parquet: {}", e);
202 } else if let Err(e) = wal.truncate() {
203 tracing::error!("Failed to truncate WAL after checkpoint: {}", e);
204 } else {
205 tracing::info!("✅ WAL checkpointed and truncated");
206 }
207 }
208
209 wal_recovered = true;
210 }
211 Ok(_) => {
212 tracing::debug!("No events to recover from WAL");
213 }
214 Err(e) => {
215 tracing::error!("❌ WAL recovery failed: {}", e);
216 }
217 }
218 }
219
220 if !wal_recovered
223 && let Some(ref storage) = store.storage
224 && let Ok(persisted_events) = storage.read().load_all_events()
225 {
226 tracing::info!("📂 Loading {} persisted events...", persisted_events.len());
227
228 for event in persisted_events {
229 let offset = store.events.read().len();
231 if let Err(e) = store.index.index_event(
232 event.id,
233 event.entity_id_str(),
234 event.event_type_str(),
235 event.timestamp,
236 offset,
237 ) {
238 tracing::error!("Failed to re-index event {}: {}", event.id, e);
239 }
240
241 if let Err(e) = store.projections.read().process_event(&event) {
243 tracing::error!("Failed to re-process event {}: {}", event.id, e);
244 }
245
246 store.events.write().push(event);
247 }
248
249 let total = store.events.read().len();
250 *store.total_ingested.write() = total as u64;
251 tracing::info!("✅ Successfully loaded {} events from storage", total);
252 }
253
254 store
255 }
256
257 pub fn ingest(&self, event: Event) -> Result<()> {
259 let timer = self.metrics.ingestion_duration_seconds.start_timer();
261
262 let validation_result = self.validate_event(&event);
264 if let Err(e) = validation_result {
265 self.metrics.ingestion_errors_total.inc();
267 timer.observe_duration();
268 return Err(e);
269 }
270
271 if let Some(ref wal) = self.wal
274 && let Err(e) = wal.append(event.clone())
275 {
276 self.metrics.ingestion_errors_total.inc();
277 timer.observe_duration();
278 return Err(e);
279 }
280
281 let mut events = self.events.write();
282 let offset = events.len();
283
284 self.index.index_event(
286 event.id,
287 event.entity_id_str(),
288 event.event_type_str(),
289 event.timestamp,
290 offset,
291 )?;
292
293 let projections = self.projections.read();
295 projections.process_event(&event)?;
296 drop(projections); let pipeline_results = self.pipeline_manager.process_event(&event);
301 if !pipeline_results.is_empty() {
302 tracing::debug!(
303 "Event {} processed by {} pipeline(s)",
304 event.id,
305 pipeline_results.len()
306 );
307 for (pipeline_id, result) in pipeline_results {
310 tracing::trace!("Pipeline {} result: {:?}", pipeline_id, result);
311 }
312 }
313
314 if let Some(ref storage) = self.storage {
316 let storage = storage.read();
317 storage.append_event(event.clone())?;
318 }
319
320 events.push(event.clone());
322 let total_events = events.len();
323 drop(events); self.websocket_manager
327 .broadcast_event(Arc::new(event.clone()));
328
329 self.check_auto_snapshot(event.entity_id_str(), &event);
331
332 self.metrics.events_ingested_total.inc();
334 self.metrics
335 .events_ingested_by_type
336 .with_label_values(&[event.event_type_str()])
337 .inc();
338 self.metrics.storage_events_total.set(total_events as i64);
339
340 let mut total = self.total_ingested.write();
342 *total += 1;
343
344 timer.observe_duration();
345
346 tracing::debug!("Event ingested: {} (offset: {})", event.id, offset);
347
348 Ok(())
349 }
350
351 pub fn ingest_replicated(&self, event: Event) -> Result<()> {
358 let timer = self.metrics.ingestion_duration_seconds.start_timer();
359
360 let mut events = self.events.write();
361 let offset = events.len();
362
363 self.index.index_event(
365 event.id,
366 event.entity_id_str(),
367 event.event_type_str(),
368 event.timestamp,
369 offset,
370 )?;
371
372 let projections = self.projections.read();
374 projections.process_event(&event)?;
375 drop(projections);
376
377 let pipeline_results = self.pipeline_manager.process_event(&event);
379 if !pipeline_results.is_empty() {
380 tracing::debug!(
381 "Replicated event {} processed by {} pipeline(s)",
382 event.id,
383 pipeline_results.len()
384 );
385 }
386
387 events.push(event.clone());
389 let total_events = events.len();
390 drop(events);
391
392 self.websocket_manager
394 .broadcast_event(Arc::new(event.clone()));
395
396 self.metrics.events_ingested_total.inc();
398 self.metrics
399 .events_ingested_by_type
400 .with_label_values(&[event.event_type_str()])
401 .inc();
402 self.metrics.storage_events_total.set(total_events as i64);
403
404 let mut total = self.total_ingested.write();
405 *total += 1;
406
407 timer.observe_duration();
408
409 tracing::debug!(
410 "Replicated event ingested: {} (offset: {})",
411 event.id,
412 offset
413 );
414
415 Ok(())
416 }
417
418 pub fn websocket_manager(&self) -> Arc<WebSocketManager> {
420 Arc::clone(&self.websocket_manager)
421 }
422
423 pub fn snapshot_manager(&self) -> Arc<SnapshotManager> {
425 Arc::clone(&self.snapshot_manager)
426 }
427
428 pub fn compaction_manager(&self) -> Option<Arc<CompactionManager>> {
430 self.compaction_manager.as_ref().map(Arc::clone)
431 }
432
433 pub fn schema_registry(&self) -> Arc<SchemaRegistry> {
435 Arc::clone(&self.schema_registry)
436 }
437
438 pub fn replay_manager(&self) -> Arc<ReplayManager> {
440 Arc::clone(&self.replay_manager)
441 }
442
443 pub fn pipeline_manager(&self) -> Arc<PipelineManager> {
445 Arc::clone(&self.pipeline_manager)
446 }
447
448 pub fn metrics(&self) -> Arc<MetricsRegistry> {
450 Arc::clone(&self.metrics)
451 }
452
453 pub fn projection_manager(&self) -> parking_lot::RwLockReadGuard<'_, ProjectionManager> {
455 self.projections.read()
456 }
457
458 pub fn projection_state_cache(&self) -> Arc<DashMap<String, serde_json::Value>> {
461 Arc::clone(&self.projection_state_cache)
462 }
463
464 pub fn flush_storage(&self) -> Result<()> {
466 if let Some(ref storage) = self.storage {
467 let storage = storage.read();
468 storage.flush()?;
469 tracing::info!("✅ Flushed events to persistent storage");
470 }
471 Ok(())
472 }
473
474 pub fn create_snapshot(&self, entity_id: &str) -> Result<()> {
476 let events = self.query(QueryEventsRequest {
478 entity_id: Some(entity_id.to_string()),
479 event_type: None,
480 tenant_id: None,
481 as_of: None,
482 since: None,
483 until: None,
484 limit: None,
485 })?;
486
487 if events.is_empty() {
488 return Err(AllSourceError::EntityNotFound(entity_id.to_string()));
489 }
490
491 let mut state = serde_json::json!({});
493 for event in &events {
494 if let serde_json::Value::Object(ref mut state_map) = state
495 && let serde_json::Value::Object(ref payload_map) = event.payload
496 {
497 for (key, value) in payload_map {
498 state_map.insert(key.clone(), value.clone());
499 }
500 }
501 }
502
503 let last_event = events.last().unwrap();
504 self.snapshot_manager.create_snapshot(
505 entity_id.to_string(),
506 state,
507 last_event.timestamp,
508 events.len(),
509 SnapshotType::Manual,
510 )?;
511
512 Ok(())
513 }
514
515 fn check_auto_snapshot(&self, entity_id: &str, event: &Event) {
517 let entity_event_count = self
519 .index
520 .get_by_entity(entity_id)
521 .map(|entries| entries.len())
522 .unwrap_or(0);
523
524 if self.snapshot_manager.should_create_snapshot(
525 entity_id,
526 entity_event_count,
527 event.timestamp,
528 ) {
529 if let Err(e) = self.create_snapshot(entity_id) {
531 tracing::warn!(
532 "Failed to create automatic snapshot for {}: {}",
533 entity_id,
534 e
535 );
536 }
537 }
538 }
539
540 fn validate_event(&self, event: &Event) -> Result<()> {
542 if event.entity_id_str().is_empty() {
545 return Err(AllSourceError::ValidationError(
546 "entity_id cannot be empty".to_string(),
547 ));
548 }
549
550 if event.event_type_str().is_empty() {
551 return Err(AllSourceError::ValidationError(
552 "event_type cannot be empty".to_string(),
553 ));
554 }
555
556 if event.event_type().is_system() {
559 return Err(AllSourceError::ValidationError(
560 "Event types starting with '_system.' are reserved for internal use".to_string(),
561 ));
562 }
563
564 Ok(())
565 }
566
567 pub fn query(&self, request: QueryEventsRequest) -> Result<Vec<Event>> {
569 let query_type = if request.entity_id.is_some() {
571 "entity"
572 } else if request.event_type.is_some() {
573 "type"
574 } else {
575 "full_scan"
576 };
577
578 let timer = self
580 .metrics
581 .query_duration_seconds
582 .with_label_values(&[query_type])
583 .start_timer();
584
585 self.metrics
587 .queries_total
588 .with_label_values(&[query_type])
589 .inc();
590
591 let events = self.events.read();
592
593 let offsets: Vec<usize> = if let Some(entity_id) = &request.entity_id {
595 self.index
597 .get_by_entity(entity_id)
598 .map(|entries| self.filter_entries(entries, &request))
599 .unwrap_or_default()
600 } else if let Some(event_type) = &request.event_type {
601 self.index
603 .get_by_type(event_type)
604 .map(|entries| self.filter_entries(entries, &request))
605 .unwrap_or_default()
606 } else {
607 (0..events.len()).collect()
609 };
610
611 let mut results: Vec<Event> = offsets
613 .iter()
614 .filter_map(|&offset| events.get(offset).cloned())
615 .filter(|event| self.apply_filters(event, &request))
616 .collect();
617
618 results.sort_by_key(|x| x.timestamp);
620
621 if let Some(limit) = request.limit {
623 results.truncate(limit);
624 }
625
626 self.metrics
628 .query_results_total
629 .with_label_values(&[query_type])
630 .inc_by(results.len() as u64);
631
632 timer.observe_duration();
633
634 Ok(results)
635 }
636
637 fn filter_entries(&self, entries: Vec<IndexEntry>, request: &QueryEventsRequest) -> Vec<usize> {
639 entries
640 .into_iter()
641 .filter(|entry| {
642 if let Some(as_of) = request.as_of
644 && entry.timestamp > as_of
645 {
646 return false;
647 }
648 if let Some(since) = request.since
649 && entry.timestamp < since
650 {
651 return false;
652 }
653 if let Some(until) = request.until
654 && entry.timestamp > until
655 {
656 return false;
657 }
658 true
659 })
660 .map(|entry| entry.offset)
661 .collect()
662 }
663
664 fn apply_filters(&self, event: &Event, request: &QueryEventsRequest) -> bool {
666 if request.entity_id.is_some()
668 && let Some(ref event_type) = request.event_type
669 && event.event_type_str() != event_type
670 {
671 return false;
672 }
673
674 true
675 }
676
677 pub fn reconstruct_state(
680 &self,
681 entity_id: &str,
682 as_of: Option<DateTime<Utc>>,
683 ) -> Result<serde_json::Value> {
684 let (merged_state, since_timestamp) = if let Some(as_of_time) = as_of {
686 if let Some(snapshot) = self
688 .snapshot_manager
689 .get_snapshot_as_of(entity_id, as_of_time)
690 {
691 tracing::debug!(
692 "Using snapshot from {} for entity {} (saved {} events)",
693 snapshot.as_of,
694 entity_id,
695 snapshot.event_count
696 );
697 (snapshot.state.clone(), Some(snapshot.as_of))
698 } else {
699 (serde_json::json!({}), None)
700 }
701 } else {
702 if let Some(snapshot) = self.snapshot_manager.get_latest_snapshot(entity_id) {
704 tracing::debug!(
705 "Using latest snapshot from {} for entity {}",
706 snapshot.as_of,
707 entity_id
708 );
709 (snapshot.state.clone(), Some(snapshot.as_of))
710 } else {
711 (serde_json::json!({}), None)
712 }
713 };
714
715 let events = self.query(QueryEventsRequest {
717 entity_id: Some(entity_id.to_string()),
718 event_type: None,
719 tenant_id: None,
720 as_of,
721 since: since_timestamp,
722 until: None,
723 limit: None,
724 })?;
725
726 if events.is_empty() && since_timestamp.is_none() {
728 return Err(AllSourceError::EntityNotFound(entity_id.to_string()));
729 }
730
731 let mut merged_state = merged_state;
733 for event in &events {
734 if let serde_json::Value::Object(ref mut state_map) = merged_state
735 && let serde_json::Value::Object(ref payload_map) = event.payload
736 {
737 for (key, value) in payload_map {
738 state_map.insert(key.clone(), value.clone());
739 }
740 }
741 }
742
743 let state = serde_json::json!({
745 "entity_id": entity_id,
746 "last_updated": events.last().map(|e| e.timestamp),
747 "event_count": events.len(),
748 "as_of": as_of,
749 "current_state": merged_state,
750 "history": events.iter().map(|e| {
751 serde_json::json!({
752 "event_id": e.id,
753 "type": e.event_type,
754 "timestamp": e.timestamp,
755 "payload": e.payload
756 })
757 }).collect::<Vec<_>>()
758 });
759
760 Ok(state)
761 }
762
763 pub fn get_snapshot(&self, entity_id: &str) -> Result<serde_json::Value> {
765 let projections = self.projections.read();
766
767 if let Some(snapshot_projection) = projections.get_projection("entity_snapshots")
768 && let Some(state) = snapshot_projection.get_state(entity_id)
769 {
770 return Ok(serde_json::json!({
771 "entity_id": entity_id,
772 "snapshot": state,
773 "from_projection": "entity_snapshots"
774 }));
775 }
776
777 Err(AllSourceError::EntityNotFound(entity_id.to_string()))
778 }
779
780 pub fn stats(&self) -> StoreStats {
782 let events = self.events.read();
783 let index_stats = self.index.stats();
784
785 StoreStats {
786 total_events: events.len(),
787 total_entities: index_stats.total_entities,
788 total_event_types: index_stats.total_event_types,
789 total_ingested: *self.total_ingested.read(),
790 }
791 }
792
793 pub fn list_streams(&self) -> Vec<StreamInfo> {
795 self.index
796 .get_all_entities()
797 .into_iter()
798 .map(|entity_id| {
799 let event_count = self
800 .index
801 .get_by_entity(&entity_id)
802 .map(|entries| entries.len())
803 .unwrap_or(0);
804 let last_event_at = self
805 .index
806 .get_by_entity(&entity_id)
807 .and_then(|entries| entries.last().map(|e| e.timestamp));
808 StreamInfo {
809 stream_id: entity_id,
810 event_count,
811 last_event_at,
812 }
813 })
814 .collect()
815 }
816
817 pub fn list_event_types(&self) -> Vec<EventTypeInfo> {
819 self.index
820 .get_all_types()
821 .into_iter()
822 .map(|event_type| {
823 let event_count = self
824 .index
825 .get_by_type(&event_type)
826 .map(|entries| entries.len())
827 .unwrap_or(0);
828 let last_event_at = self
829 .index
830 .get_by_type(&event_type)
831 .and_then(|entries| entries.last().map(|e| e.timestamp));
832 EventTypeInfo {
833 event_type,
834 event_count,
835 last_event_at,
836 }
837 })
838 .collect()
839 }
840
841 pub fn enable_wal_replication(
848 &self,
849 tx: tokio::sync::broadcast::Sender<crate::infrastructure::persistence::wal::WALEntry>,
850 ) {
851 if let Some(ref wal_arc) = self.wal {
852 wal_arc.set_replication_tx(tx);
853 tracing::info!("WAL replication broadcast enabled");
854 } else {
855 tracing::warn!("Cannot enable WAL replication: WAL is not configured");
856 }
857 }
858
859 pub fn wal(&self) -> Option<&Arc<WriteAheadLog>> {
862 self.wal.as_ref()
863 }
864
865 pub fn parquet_storage(&self) -> Option<&Arc<RwLock<ParquetStorage>>> {
868 self.storage.as_ref()
869 }
870}
871
/// Configuration consumed by [`EventStore::with_config`].
#[derive(Debug, Clone, Default)]
pub struct EventStoreConfig {
    /// Directory for Parquet storage. When set, Parquet persistence is
    /// attempted and a compaction manager is created for the same directory.
    pub storage_dir: Option<PathBuf>,

    /// Settings passed to the snapshot manager.
    pub snapshot_config: SnapshotConfig,

    /// Directory for the write-ahead log. When set, the WAL is opened there.
    pub wal_dir: Option<PathBuf>,

    /// Settings passed to the write-ahead log.
    pub wal_config: WALConfig,

    /// Settings passed to the compaction manager.
    pub compaction_config: CompactionConfig,

    /// Settings passed to the schema registry.
    pub schema_registry_config: SchemaRegistryConfig,

    /// Explicit system-data directory. When `None`,
    /// `effective_system_data_dir` falls back to `<storage_dir>/__system`.
    pub system_data_dir: Option<PathBuf>,

    /// Not read anywhere in this file.
    // NOTE(review): presumably the tenant created at bootstrap — confirm
    // against the code that consumes it.
    pub bootstrap_tenant: Option<String>,
}
902
903impl EventStoreConfig {
904 pub fn with_persistence(storage_dir: impl Into<PathBuf>) -> Self {
906 Self {
907 storage_dir: Some(storage_dir.into()),
908 ..Self::default()
909 }
910 }
911
912 pub fn with_snapshots(snapshot_config: SnapshotConfig) -> Self {
914 Self {
915 snapshot_config,
916 ..Self::default()
917 }
918 }
919
920 pub fn with_wal(wal_dir: impl Into<PathBuf>, wal_config: WALConfig) -> Self {
922 Self {
923 wal_dir: Some(wal_dir.into()),
924 wal_config,
925 ..Self::default()
926 }
927 }
928
929 pub fn with_all(storage_dir: impl Into<PathBuf>, snapshot_config: SnapshotConfig) -> Self {
931 Self {
932 storage_dir: Some(storage_dir.into()),
933 snapshot_config,
934 ..Self::default()
935 }
936 }
937
938 pub fn production(
940 storage_dir: impl Into<PathBuf>,
941 wal_dir: impl Into<PathBuf>,
942 snapshot_config: SnapshotConfig,
943 wal_config: WALConfig,
944 compaction_config: CompactionConfig,
945 ) -> Self {
946 let storage_dir = storage_dir.into();
947 let system_data_dir = storage_dir.join("__system");
948 Self {
949 storage_dir: Some(storage_dir),
950 snapshot_config,
951 wal_dir: Some(wal_dir.into()),
952 wal_config,
953 compaction_config,
954 system_data_dir: Some(system_data_dir),
955 ..Self::default()
956 }
957 }
958
959 pub fn effective_system_data_dir(&self) -> Option<PathBuf> {
964 self.system_data_dir
965 .clone()
966 .or_else(|| self.storage_dir.as_ref().map(|d| d.join("__system")))
967 }
968}
969
/// Counters returned by [`EventStore::stats`].
#[derive(Debug, serde::Serialize)]
pub struct StoreStats {
    /// Number of events currently held in the in-memory log.
    pub total_events: usize,
    /// Entity count as reported by the index statistics.
    pub total_entities: usize,
    /// Event-type count as reported by the index statistics.
    pub total_event_types: usize,
    /// Value of the store's running ingestion counter.
    pub total_ingested: u64,
}
977
/// Per-entity summary returned by [`EventStore::list_streams`].
#[derive(Debug, Clone, serde::Serialize)]
pub struct StreamInfo {
    /// The entity id the stream belongs to.
    pub stream_id: String,
    /// Number of index entries for the entity.
    pub event_count: usize,
    /// Timestamp of the entity's last index entry, if it has any.
    pub last_event_at: Option<chrono::DateTime<chrono::Utc>>,
}
988
/// Per-event-type summary returned by [`EventStore::list_event_types`].
#[derive(Debug, Clone, serde::Serialize)]
pub struct EventTypeInfo {
    /// The event type name.
    pub event_type: String,
    /// Number of index entries for the type.
    pub event_count: usize,
    /// Timestamp of the type's last index entry, if it has any.
    pub last_event_at: Option<chrono::DateTime<chrono::Utc>>,
}
999
1000impl Default for EventStore {
1001 fn default() -> Self {
1002 Self::new()
1003 }
1004}
1005
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::entities::Event;
    use tempfile::TempDir;

    /// Builds an event in the `default` tenant with the given payload.
    fn event_with_payload(entity_id: &str, event_type: &str, payload: serde_json::Value) -> Event {
        Event::from_strings(
            event_type.to_string(),
            entity_id.to_string(),
            "default".to_string(),
            payload,
            None,
        )
        .unwrap()
    }

    /// Builds an event with a fixed `{"name": "Test", "value": 42}` payload.
    fn create_test_event(entity_id: &str, event_type: &str) -> Event {
        event_with_payload(
            entity_id,
            event_type,
            serde_json::json!({"name": "Test", "value": 42}),
        )
    }

    /// Builds a request with only the given filters set.
    fn query_request(
        entity_id: Option<&str>,
        event_type: Option<&str>,
        limit: Option<usize>,
    ) -> QueryEventsRequest {
        QueryEventsRequest {
            entity_id: entity_id.map(str::to_string),
            event_type: event_type.map(str::to_string),
            tenant_id: None,
            as_of: None,
            since: None,
            until: None,
            limit,
        }
    }

    /// Creates an in-memory store and ingests one test event per
    /// `(entity_id, event_type)` pair, in order.
    fn store_with(events: &[(&str, &str)]) -> EventStore {
        let store = EventStore::new();
        for (entity_id, event_type) in events {
            store
                .ingest(create_test_event(entity_id, event_type))
                .unwrap();
        }
        store
    }

    #[test]
    fn test_event_store_new() {
        let store = EventStore::new();
        assert_eq!(store.stats().total_events, 0);
        assert_eq!(store.stats().total_entities, 0);
    }

    #[test]
    fn test_event_store_default() {
        let store = EventStore::default();
        assert_eq!(store.stats().total_events, 0);
    }

    #[test]
    fn test_ingest_single_event() {
        let store = store_with(&[("entity-1", "user.created")]);

        assert_eq!(store.stats().total_events, 1);
        assert_eq!(store.stats().total_ingested, 1);
    }

    #[test]
    fn test_ingest_multiple_events() {
        let store = EventStore::new();

        for i in 0..10 {
            let event = create_test_event(&format!("entity-{}", i), "user.created");
            store.ingest(event).unwrap();
        }

        assert_eq!(store.stats().total_events, 10);
        assert_eq!(store.stats().total_ingested, 10);
    }

    #[test]
    fn test_query_by_entity_id() {
        let store = store_with(&[
            ("entity-1", "user.created"),
            ("entity-2", "user.created"),
            ("entity-1", "user.updated"),
        ]);

        let results = store
            .query(query_request(Some("entity-1"), None, None))
            .unwrap();

        assert_eq!(results.len(), 2);
    }

    #[test]
    fn test_query_by_event_type() {
        let store = store_with(&[
            ("entity-1", "user.created"),
            ("entity-2", "user.updated"),
            ("entity-3", "user.created"),
        ]);

        let results = store
            .query(query_request(None, Some("user.created"), None))
            .unwrap();

        assert_eq!(results.len(), 2);
    }

    #[test]
    fn test_query_with_limit() {
        let store = EventStore::new();

        for i in 0..10 {
            let event = create_test_event(&format!("entity-{}", i), "user.created");
            store.ingest(event).unwrap();
        }

        let results = store.query(query_request(None, None, Some(5))).unwrap();

        assert_eq!(results.len(), 5);
    }

    #[test]
    fn test_query_empty_store() {
        let store = EventStore::new();

        let results = store
            .query(query_request(Some("non-existent"), None, None))
            .unwrap();

        assert!(results.is_empty());
    }

    #[test]
    fn test_reconstruct_state() {
        let store = store_with(&[("entity-1", "user.created")]);

        let state = store.reconstruct_state("entity-1", None).unwrap();
        assert_eq!(state["current_state"]["name"], "Test");
        assert_eq!(state["current_state"]["value"], 42);
    }

    #[test]
    fn test_reconstruct_state_not_found() {
        let store = EventStore::new();

        let result = store.reconstruct_state("non-existent", None);
        assert!(result.is_err());
    }

    #[test]
    fn test_get_snapshot_empty() {
        let store = EventStore::new();

        let result = store.get_snapshot("non-existent");
        assert!(result.is_err());
    }

    #[test]
    fn test_create_snapshot() {
        let store = store_with(&[("entity-1", "user.created")]);

        store.create_snapshot("entity-1").unwrap();

        let snapshot = store.get_snapshot("entity-1").unwrap();
        assert!(snapshot != serde_json::json!(null));
    }

    #[test]
    fn test_create_snapshot_entity_not_found() {
        let store = EventStore::new();

        let result = store.create_snapshot("non-existent");
        assert!(result.is_err());
    }

    #[test]
    fn test_websocket_manager() {
        let store = EventStore::new();
        let manager = store.websocket_manager();
        assert!(Arc::strong_count(&manager) >= 1);
    }

    #[test]
    fn test_snapshot_manager() {
        let store = EventStore::new();
        let manager = store.snapshot_manager();
        assert!(Arc::strong_count(&manager) >= 1);
    }

    #[test]
    fn test_compaction_manager_none() {
        let store = EventStore::new();
        assert!(store.compaction_manager().is_none());
    }

    #[test]
    fn test_schema_registry() {
        let store = EventStore::new();
        let registry = store.schema_registry();
        assert!(Arc::strong_count(&registry) >= 1);
    }

    #[test]
    fn test_replay_manager() {
        let store = EventStore::new();
        let manager = store.replay_manager();
        assert!(Arc::strong_count(&manager) >= 1);
    }

    #[test]
    fn test_pipeline_manager() {
        let store = EventStore::new();
        let manager = store.pipeline_manager();
        assert!(Arc::strong_count(&manager) >= 1);
    }

    #[test]
    fn test_projection_manager() {
        let store = EventStore::new();
        let manager = store.projection_manager();
        let projections = manager.list_projections();
        // `with_config` registers two built-in projections.
        assert!(projections.len() >= 2);
    }

    #[test]
    fn test_projection_state_cache() {
        let store = EventStore::new();
        let cache = store.projection_state_cache();

        cache.insert("test:key".to_string(), serde_json::json!({"value": 123}));
        assert_eq!(cache.len(), 1);

        let value = cache.get("test:key").unwrap();
        assert_eq!(value["value"], 123);
    }

    #[test]
    fn test_metrics() {
        let store = EventStore::new();
        let metrics = store.metrics();
        assert!(Arc::strong_count(&metrics) >= 1);
    }

    #[test]
    fn test_store_stats() {
        let store = store_with(&[("entity-1", "user.created"), ("entity-2", "order.placed")]);

        let stats = store.stats();
        assert_eq!(stats.total_events, 2);
        assert_eq!(stats.total_entities, 2);
        assert_eq!(stats.total_event_types, 2);
        assert_eq!(stats.total_ingested, 2);
    }

    #[test]
    fn test_event_store_config_default() {
        let config = EventStoreConfig::default();
        assert!(config.storage_dir.is_none());
        assert!(config.wal_dir.is_none());
    }

    #[test]
    fn test_event_store_config_with_persistence() {
        let temp_dir = TempDir::new().unwrap();
        let config = EventStoreConfig::with_persistence(temp_dir.path());

        assert!(config.storage_dir.is_some());
        assert!(config.wal_dir.is_none());
    }

    #[test]
    fn test_event_store_config_with_wal() {
        let temp_dir = TempDir::new().unwrap();
        let config = EventStoreConfig::with_wal(temp_dir.path(), WALConfig::default());

        assert!(config.storage_dir.is_none());
        assert!(config.wal_dir.is_some());
    }

    #[test]
    fn test_event_store_config_with_all() {
        let temp_dir = TempDir::new().unwrap();
        let config = EventStoreConfig::with_all(temp_dir.path(), SnapshotConfig::default());

        assert!(config.storage_dir.is_some());
    }

    #[test]
    fn test_event_store_config_production() {
        let storage_dir = TempDir::new().unwrap();
        let wal_dir = TempDir::new().unwrap();
        let config = EventStoreConfig::production(
            storage_dir.path(),
            wal_dir.path(),
            SnapshotConfig::default(),
            WALConfig::default(),
            CompactionConfig::default(),
        );

        assert!(config.storage_dir.is_some());
        assert!(config.wal_dir.is_some());
    }

    #[test]
    fn test_store_stats_serde() {
        let stats = StoreStats {
            total_events: 100,
            total_entities: 50,
            total_event_types: 10,
            total_ingested: 100,
        };

        let json = serde_json::to_string(&stats).unwrap();
        assert!(json.contains("\"total_events\":100"));
        assert!(json.contains("\"total_entities\":50"));
    }

    #[test]
    fn test_query_with_entity_and_type() {
        let store = store_with(&[
            ("entity-1", "user.created"),
            ("entity-1", "user.updated"),
            ("entity-2", "user.created"),
        ]);

        let results = store
            .query(query_request(Some("entity-1"), Some("user.created"), None))
            .unwrap();

        assert_eq!(results.len(), 1);
        assert_eq!(results[0].event_type_str(), "user.created");
    }

    #[test]
    fn test_flush_storage_no_storage() {
        let store = EventStore::new();
        let result = store.flush_storage();
        // Without configured storage the flush is a successful no-op.
        assert!(result.is_ok());
    }

    #[test]
    fn test_state_evolution() {
        let store = EventStore::new();

        store
            .ingest(event_with_payload(
                "user-1",
                "user.created",
                serde_json::json!({"name": "Alice", "age": 25}),
            ))
            .unwrap();

        store
            .ingest(event_with_payload(
                "user-1",
                "user.updated",
                serde_json::json!({"age": 26}),
            ))
            .unwrap();

        let state = store.reconstruct_state("user-1", None).unwrap();
        // `name` survives from the first event; `age` is overwritten.
        assert_eq!(state["current_state"]["name"], "Alice");
        assert_eq!(state["current_state"]["age"], 26);
    }

    #[test]
    fn test_reject_system_event_types() {
        let store = EventStore::new();

        let event = Event::reconstruct_from_strings(
            uuid::Uuid::new_v4(),
            "_system.tenant.created".to_string(),
            "_system:tenant:acme".to_string(),
            "_system".to_string(),
            serde_json::json!({"name": "ACME"}),
            chrono::Utc::now(),
            None,
            1,
        );

        let result = store.ingest(event);
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("reserved for internal use"),
            "Expected system namespace rejection, got: {}",
            err
        );
    }
}