1#[cfg(feature = "server")]
2use crate::application::services::webhook::WebhookRegistry;
3#[cfg(feature = "server")]
4use crate::infrastructure::observability::metrics::MetricsRegistry;
5#[cfg(feature = "server")]
6use crate::infrastructure::web::websocket::WebSocketManager;
7use crate::{
8 application::{
9 dto::QueryEventsRequest,
10 services::{
11 exactly_once::{ExactlyOnceConfig, ExactlyOnceRegistry},
12 pipeline::PipelineManager,
13 projection::{EntitySnapshotProjection, EventCounterProjection, ProjectionManager},
14 replay::ReplayManager,
15 schema::{SchemaRegistry, SchemaRegistryConfig},
16 schema_evolution::SchemaEvolutionManager,
17 },
18 },
19 domain::entities::Event,
20 error::{AllSourceError, Result},
21 infrastructure::{
22 persistence::{
23 compaction::{CompactionConfig, CompactionManager},
24 index::{EventIndex, IndexEntry},
25 snapshot::{SnapshotConfig, SnapshotManager, SnapshotType},
26 storage::ParquetStorage,
27 wal::{WALConfig, WriteAheadLog},
28 },
29 query::geospatial::GeoIndex,
30 },
31};
32use chrono::{DateTime, Utc};
33use dashmap::DashMap;
34use parking_lot::RwLock;
35use std::{path::PathBuf, sync::Arc};
36#[cfg(feature = "server")]
37use tokio::sync::mpsc;
38
39pub struct EventStore {
41 events: Arc<RwLock<Vec<Event>>>,
43
44 index: Arc<EventIndex>,
46
47 pub(crate) projections: Arc<RwLock<ProjectionManager>>,
49
50 storage: Option<Arc<RwLock<ParquetStorage>>>,
52
53 #[cfg(feature = "server")]
55 websocket_manager: Arc<WebSocketManager>,
56
57 snapshot_manager: Arc<SnapshotManager>,
59
60 wal: Option<Arc<WriteAheadLog>>,
62
63 compaction_manager: Option<Arc<CompactionManager>>,
65
66 schema_registry: Arc<SchemaRegistry>,
68
69 replay_manager: Arc<ReplayManager>,
71
72 pipeline_manager: Arc<PipelineManager>,
74
75 #[cfg(feature = "server")]
77 metrics: Arc<MetricsRegistry>,
78
79 total_ingested: Arc<RwLock<u64>>,
81
82 projection_state_cache: Arc<DashMap<String, serde_json::Value>>,
86
87 #[cfg(feature = "server")]
89 webhook_registry: Arc<WebhookRegistry>,
90
91 #[cfg(feature = "server")]
93 webhook_tx: Arc<RwLock<Option<mpsc::UnboundedSender<WebhookDeliveryTask>>>>,
94
95 geo_index: Arc<GeoIndex>,
97
98 exactly_once: Arc<ExactlyOnceRegistry>,
100
101 schema_evolution: Arc<SchemaEvolutionManager>,
103}
104
105#[cfg(feature = "server")]
107#[derive(Debug, Clone)]
108pub struct WebhookDeliveryTask {
109 pub webhook: crate::application::services::webhook::WebhookSubscription,
110 pub event: Event,
111}
112
113impl EventStore {
114 pub fn new() -> Self {
116 Self::with_config(EventStoreConfig::default())
117 }
118
119 pub fn with_config(config: EventStoreConfig) -> Self {
121 let mut projections = ProjectionManager::new();
122
123 projections.register(Arc::new(EntitySnapshotProjection::new("entity_snapshots")));
125 projections.register(Arc::new(EventCounterProjection::new("event_counters")));
126
127 let storage = config
129 .storage_dir
130 .as_ref()
131 .and_then(|dir| match ParquetStorage::new(dir) {
132 Ok(storage) => {
133 tracing::info!("✅ Parquet persistence enabled at: {}", dir.display());
134 Some(Arc::new(RwLock::new(storage)))
135 }
136 Err(e) => {
137 tracing::error!("❌ Failed to initialize Parquet storage: {}", e);
138 None
139 }
140 });
141
142 let wal = config.wal_dir.as_ref().and_then(|dir| {
144 match WriteAheadLog::new(dir, config.wal_config.clone()) {
145 Ok(wal) => {
146 tracing::info!("✅ WAL enabled at: {}", dir.display());
147 Some(Arc::new(wal))
148 }
149 Err(e) => {
150 tracing::error!("❌ Failed to initialize WAL: {}", e);
151 None
152 }
153 }
154 });
155
156 let compaction_manager = config.storage_dir.as_ref().map(|dir| {
158 let manager = CompactionManager::new(dir, config.compaction_config.clone());
159 Arc::new(manager)
160 });
161
162 let schema_registry = Arc::new(SchemaRegistry::new(config.schema_registry_config.clone()));
164 tracing::info!("✅ Schema registry enabled");
165
166 let replay_manager = Arc::new(ReplayManager::new());
168 tracing::info!("✅ Replay manager enabled");
169
170 let pipeline_manager = Arc::new(PipelineManager::new());
172 tracing::info!("✅ Pipeline manager enabled");
173
174 #[cfg(feature = "server")]
176 let metrics = {
177 let m = MetricsRegistry::new();
178 tracing::info!("✅ Prometheus metrics registry initialized");
179 m
180 };
181
182 let projection_state_cache = Arc::new(DashMap::new());
184 tracing::info!("✅ Projection state cache initialized");
185
186 #[cfg(feature = "server")]
188 let webhook_registry = {
189 let w = Arc::new(WebhookRegistry::new());
190 tracing::info!("✅ Webhook registry initialized");
191 w
192 };
193
194 let store = Self {
195 events: Arc::new(RwLock::new(Vec::new())),
196 index: Arc::new(EventIndex::new()),
197 projections: Arc::new(RwLock::new(projections)),
198 storage,
199 #[cfg(feature = "server")]
200 websocket_manager: Arc::new(WebSocketManager::new()),
201 snapshot_manager: Arc::new(SnapshotManager::new(config.snapshot_config)),
202 wal,
203 compaction_manager,
204 schema_registry,
205 replay_manager,
206 pipeline_manager,
207 #[cfg(feature = "server")]
208 metrics,
209 total_ingested: Arc::new(RwLock::new(0)),
210 projection_state_cache,
211 #[cfg(feature = "server")]
212 webhook_registry,
213 #[cfg(feature = "server")]
214 webhook_tx: Arc::new(RwLock::new(None)),
215 geo_index: Arc::new(GeoIndex::new()),
216 exactly_once: Arc::new(ExactlyOnceRegistry::new(ExactlyOnceConfig::default())),
217 schema_evolution: Arc::new(SchemaEvolutionManager::new()),
218 };
219
220 let mut wal_recovered = false;
222 if let Some(ref wal) = store.wal {
223 match wal.recover() {
224 Ok(recovered_events) if !recovered_events.is_empty() => {
225 tracing::info!(
226 "🔄 Recovering {} events from WAL...",
227 recovered_events.len()
228 );
229
230 for event in recovered_events {
231 let offset = store.events.read().len();
233 if let Err(e) = store.index.index_event(
234 event.id,
235 event.entity_id_str(),
236 event.event_type_str(),
237 event.timestamp,
238 offset,
239 ) {
240 tracing::error!("Failed to re-index WAL event {}: {}", event.id, e);
241 }
242
243 if let Err(e) = store.projections.read().process_event(&event) {
244 tracing::error!("Failed to re-process WAL event {}: {}", event.id, e);
245 }
246
247 store.events.write().push(event);
248 }
249
250 let total = store.events.read().len();
251 *store.total_ingested.write() = total as u64;
252 tracing::info!("✅ Successfully recovered {} events from WAL", total);
253
254 if store.storage.is_some() {
256 tracing::info!("📸 Checkpointing WAL to Parquet storage...");
257 if let Err(e) = store.flush_storage() {
258 tracing::error!("Failed to checkpoint to Parquet: {}", e);
259 } else if let Err(e) = wal.truncate() {
260 tracing::error!("Failed to truncate WAL after checkpoint: {}", e);
261 } else {
262 tracing::info!("✅ WAL checkpointed and truncated");
263 }
264 }
265
266 wal_recovered = true;
267 }
268 Ok(_) => {
269 tracing::debug!("No events to recover from WAL");
270 }
271 Err(e) => {
272 tracing::error!("❌ WAL recovery failed: {}", e);
273 }
274 }
275 }
276
277 if !wal_recovered
280 && let Some(ref storage) = store.storage
281 && let Ok(persisted_events) = storage.read().load_all_events()
282 {
283 tracing::info!("📂 Loading {} persisted events...", persisted_events.len());
284
285 for event in persisted_events {
286 let offset = store.events.read().len();
288 if let Err(e) = store.index.index_event(
289 event.id,
290 event.entity_id_str(),
291 event.event_type_str(),
292 event.timestamp,
293 offset,
294 ) {
295 tracing::error!("Failed to re-index event {}: {}", event.id, e);
296 }
297
298 if let Err(e) = store.projections.read().process_event(&event) {
300 tracing::error!("Failed to re-process event {}: {}", event.id, e);
301 }
302
303 store.events.write().push(event);
304 }
305
306 let total = store.events.read().len();
307 *store.total_ingested.write() = total as u64;
308 tracing::info!("✅ Successfully loaded {} events from storage", total);
309 }
310
311 store
312 }
313
314 pub fn ingest(&self, event: Event) -> Result<()> {
316 #[cfg(feature = "server")]
318 let timer = self.metrics.ingestion_duration_seconds.start_timer();
319
320 let validation_result = self.validate_event(&event);
322 if let Err(e) = validation_result {
323 #[cfg(feature = "server")]
324 {
325 self.metrics.ingestion_errors_total.inc();
326 timer.observe_duration();
327 }
328 return Err(e);
329 }
330
331 if let Some(ref wal) = self.wal
334 && let Err(e) = wal.append(event.clone())
335 {
336 #[cfg(feature = "server")]
337 {
338 self.metrics.ingestion_errors_total.inc();
339 timer.observe_duration();
340 }
341 return Err(e);
342 }
343
344 let mut events = self.events.write();
345 let offset = events.len();
346
347 self.index.index_event(
349 event.id,
350 event.entity_id_str(),
351 event.event_type_str(),
352 event.timestamp,
353 offset,
354 )?;
355
356 let projections = self.projections.read();
358 projections.process_event(&event)?;
359 drop(projections); let pipeline_results = self.pipeline_manager.process_event(&event);
364 if !pipeline_results.is_empty() {
365 tracing::debug!(
366 "Event {} processed by {} pipeline(s)",
367 event.id,
368 pipeline_results.len()
369 );
370 for (pipeline_id, result) in pipeline_results {
373 tracing::trace!("Pipeline {} result: {:?}", pipeline_id, result);
374 }
375 }
376
377 if let Some(ref storage) = self.storage {
379 let storage = storage.read();
380 storage.append_event(event.clone())?;
381 }
382
383 events.push(event.clone());
385 let total_events = events.len();
386 drop(events); #[cfg(feature = "server")]
390 self.websocket_manager
391 .broadcast_event(Arc::new(event.clone()));
392
393 #[cfg(feature = "server")]
395 self.dispatch_webhooks(&event);
396
397 self.geo_index.index_event(&event);
399
400 self.schema_evolution
402 .analyze_event(event.event_type_str(), &event.payload);
403
404 self.check_auto_snapshot(event.entity_id_str(), &event);
406
407 #[cfg(feature = "server")]
409 {
410 self.metrics.events_ingested_total.inc();
411 self.metrics
412 .events_ingested_by_type
413 .with_label_values(&[event.event_type_str()])
414 .inc();
415 self.metrics.storage_events_total.set(total_events as i64);
416 }
417
418 let mut total = self.total_ingested.write();
420 *total += 1;
421
422 #[cfg(feature = "server")]
423 timer.observe_duration();
424
425 tracing::debug!("Event ingested: {} (offset: {})", event.id, offset);
426
427 Ok(())
428 }
429
430 pub fn ingest_batch(&self, batch: Vec<Event>) -> Result<()> {
437 if batch.is_empty() {
438 return Ok(());
439 }
440
441 for event in &batch {
443 self.validate_event(event)?;
444 }
445
446 if let Some(ref wal) = self.wal {
448 for event in &batch {
449 wal.append(event.clone())?;
450 }
451 }
452
453 let mut events = self.events.write();
455 let projections = self.projections.read();
456
457 for event in batch {
458 let offset = events.len();
459
460 self.index.index_event(
461 event.id,
462 event.entity_id_str(),
463 event.event_type_str(),
464 event.timestamp,
465 offset,
466 )?;
467
468 projections.process_event(&event)?;
469 self.pipeline_manager.process_event(&event);
470
471 if let Some(ref storage) = self.storage {
472 let storage = storage.read();
473 storage.append_event(event.clone())?;
474 }
475
476 self.geo_index.index_event(&event);
477 self.schema_evolution
478 .analyze_event(event.event_type_str(), &event.payload);
479
480 events.push(event);
481 }
482
483 let total_events = events.len();
484 drop(projections);
485 drop(events);
486
487 let mut total = self.total_ingested.write();
488 *total += total_events as u64;
489
490 Ok(())
491 }
492
493 pub fn ingest_replicated(&self, event: Event) -> Result<()> {
500 #[cfg(feature = "server")]
501 let timer = self.metrics.ingestion_duration_seconds.start_timer();
502
503 let mut events = self.events.write();
504 let offset = events.len();
505
506 self.index.index_event(
508 event.id,
509 event.entity_id_str(),
510 event.event_type_str(),
511 event.timestamp,
512 offset,
513 )?;
514
515 let projections = self.projections.read();
517 projections.process_event(&event)?;
518 drop(projections);
519
520 let pipeline_results = self.pipeline_manager.process_event(&event);
522 if !pipeline_results.is_empty() {
523 tracing::debug!(
524 "Replicated event {} processed by {} pipeline(s)",
525 event.id,
526 pipeline_results.len()
527 );
528 }
529
530 events.push(event.clone());
532 let total_events = events.len();
533 drop(events);
534
535 #[cfg(feature = "server")]
537 self.websocket_manager
538 .broadcast_event(Arc::new(event.clone()));
539
540 #[cfg(feature = "server")]
542 {
543 self.metrics.events_ingested_total.inc();
544 self.metrics
545 .events_ingested_by_type
546 .with_label_values(&[event.event_type_str()])
547 .inc();
548 self.metrics.storage_events_total.set(total_events as i64);
549 }
550
551 let mut total = self.total_ingested.write();
552 *total += 1;
553
554 #[cfg(feature = "server")]
555 timer.observe_duration();
556
557 tracing::debug!(
558 "Replicated event ingested: {} (offset: {})",
559 event.id,
560 offset
561 );
562
563 Ok(())
564 }
565
566 #[cfg(feature = "server")]
568 pub fn websocket_manager(&self) -> Arc<WebSocketManager> {
569 Arc::clone(&self.websocket_manager)
570 }
571
572 pub fn snapshot_manager(&self) -> Arc<SnapshotManager> {
574 Arc::clone(&self.snapshot_manager)
575 }
576
577 pub fn compaction_manager(&self) -> Option<Arc<CompactionManager>> {
579 self.compaction_manager.as_ref().map(Arc::clone)
580 }
581
582 pub fn schema_registry(&self) -> Arc<SchemaRegistry> {
584 Arc::clone(&self.schema_registry)
585 }
586
587 pub fn replay_manager(&self) -> Arc<ReplayManager> {
589 Arc::clone(&self.replay_manager)
590 }
591
592 pub fn pipeline_manager(&self) -> Arc<PipelineManager> {
594 Arc::clone(&self.pipeline_manager)
595 }
596
597 #[cfg(feature = "server")]
599 pub fn metrics(&self) -> Arc<MetricsRegistry> {
600 Arc::clone(&self.metrics)
601 }
602
603 pub fn projection_manager(&self) -> parking_lot::RwLockReadGuard<'_, ProjectionManager> {
605 self.projections.read()
606 }
607
608 pub fn register_projection(
617 &self,
618 projection: Arc<dyn crate::application::services::projection::Projection>,
619 ) {
620 let mut pm = self.projections.write();
621 pm.register(projection);
622 }
623
624 pub fn register_projection_with_backfill(
630 &self,
631 projection: Arc<dyn crate::application::services::projection::Projection>,
632 ) -> Result<()> {
633 {
635 let mut pm = self.projections.write();
636 pm.register(Arc::clone(&projection));
637 }
638
639 let events = self.events.read();
641 for event in events.iter() {
642 projection.process(event)?;
643 }
644
645 Ok(())
646 }
647
648 pub fn projection_state_cache(&self) -> Arc<DashMap<String, serde_json::Value>> {
651 Arc::clone(&self.projection_state_cache)
652 }
653
654 pub fn geo_index(&self) -> Arc<GeoIndex> {
657 self.geo_index.clone()
658 }
659
660 pub fn exactly_once(&self) -> Arc<ExactlyOnceRegistry> {
662 self.exactly_once.clone()
663 }
664
665 pub fn schema_evolution(&self) -> Arc<SchemaEvolutionManager> {
667 self.schema_evolution.clone()
668 }
669
670 pub fn snapshot_events(&self) -> Vec<Event> {
676 self.events.read().clone()
677 }
678
679 pub fn compact_entity_tokens(
701 &self,
702 entity_id: &str,
703 token_event_type: &str,
704 merged_event: Event,
705 ) -> Result<bool> {
706 {
708 let events = self.events.read();
709 let has_tokens = events
710 .iter()
711 .any(|e| e.entity_id_str() == entity_id && e.event_type_str() == token_event_type);
712 if !has_tokens {
713 return Ok(false);
714 }
715 }
716
717 let projections = self.projections.read();
719 projections.process_event(&merged_event)?;
720 drop(projections);
721
722 let mut events = self.events.write();
724
725 events.retain(|e| {
726 !(e.entity_id_str() == entity_id && e.event_type_str() == token_event_type)
727 });
728
729 events.push(merged_event.clone());
730
731 if let Some(ref wal) = self.wal {
735 wal.append(merged_event)?;
736 }
737
738 self.index.clear();
743 for (offset, event) in events.iter().enumerate() {
744 if let Err(e) = self.index.index_event(
745 event.id,
746 event.entity_id_str(),
747 event.event_type_str(),
748 event.timestamp,
749 offset,
750 ) {
751 tracing::warn!(
752 event_id = %event.id,
753 offset,
754 "Failed to re-index event during compaction: {e}"
755 );
756 }
757 }
758
759 Ok(true)
760 }
761
762 #[cfg(feature = "server")]
763 pub fn webhook_registry(&self) -> Arc<WebhookRegistry> {
764 Arc::clone(&self.webhook_registry)
765 }
766
767 #[cfg(feature = "server")]
770 pub fn set_webhook_tx(&self, tx: mpsc::UnboundedSender<WebhookDeliveryTask>) {
771 *self.webhook_tx.write() = Some(tx);
772 tracing::info!("Webhook delivery channel connected");
773 }
774
775 #[cfg(feature = "server")]
777 fn dispatch_webhooks(&self, event: &Event) {
778 let matching = self.webhook_registry.find_matching(event);
779 if matching.is_empty() {
780 return;
781 }
782
783 let tx_guard = self.webhook_tx.read();
784 if let Some(ref tx) = *tx_guard {
785 for webhook in matching {
786 let task = WebhookDeliveryTask {
787 webhook,
788 event: event.clone(),
789 };
790 if let Err(e) = tx.send(task) {
791 tracing::warn!("Failed to queue webhook delivery: {}", e);
792 }
793 }
794 }
795 }
796
797 pub fn flush_storage(&self) -> Result<()> {
799 if let Some(ref storage) = self.storage {
800 let storage = storage.read();
801 storage.flush()?;
802 tracing::info!("✅ Flushed events to persistent storage");
803 }
804 Ok(())
805 }
806
807 pub fn create_snapshot(&self, entity_id: &str) -> Result<()> {
809 let events = self.query(QueryEventsRequest {
811 entity_id: Some(entity_id.to_string()),
812 event_type: None,
813 tenant_id: None,
814 as_of: None,
815 since: None,
816 until: None,
817 limit: None,
818 event_type_prefix: None,
819 payload_filter: None,
820 })?;
821
822 if events.is_empty() {
823 return Err(AllSourceError::EntityNotFound(entity_id.to_string()));
824 }
825
826 let mut state = serde_json::json!({});
828 for event in &events {
829 if let serde_json::Value::Object(ref mut state_map) = state
830 && let serde_json::Value::Object(ref payload_map) = event.payload
831 {
832 for (key, value) in payload_map {
833 state_map.insert(key.clone(), value.clone());
834 }
835 }
836 }
837
838 let last_event = events.last().unwrap();
839 self.snapshot_manager.create_snapshot(
840 entity_id.to_string(),
841 state,
842 last_event.timestamp,
843 events.len(),
844 SnapshotType::Manual,
845 )?;
846
847 Ok(())
848 }
849
850 fn check_auto_snapshot(&self, entity_id: &str, event: &Event) {
852 let entity_event_count = self
854 .index
855 .get_by_entity(entity_id)
856 .map(|entries| entries.len())
857 .unwrap_or(0);
858
859 if self.snapshot_manager.should_create_snapshot(
860 entity_id,
861 entity_event_count,
862 event.timestamp,
863 ) {
864 if let Err(e) = self.create_snapshot(entity_id) {
866 tracing::warn!(
867 "Failed to create automatic snapshot for {}: {}",
868 entity_id,
869 e
870 );
871 }
872 }
873 }
874
875 fn validate_event(&self, event: &Event) -> Result<()> {
877 if event.entity_id_str().is_empty() {
880 return Err(AllSourceError::ValidationError(
881 "entity_id cannot be empty".to_string(),
882 ));
883 }
884
885 if event.event_type_str().is_empty() {
886 return Err(AllSourceError::ValidationError(
887 "event_type cannot be empty".to_string(),
888 ));
889 }
890
891 if event.event_type().is_system() {
894 return Err(AllSourceError::ValidationError(
895 "Event types starting with '_system.' are reserved for internal use".to_string(),
896 ));
897 }
898
899 Ok(())
900 }
901
902 pub fn reset_projection(&self, name: &str) -> Result<usize> {
904 let projection_manager = self.projections.read();
905 let projection = projection_manager.get_projection(name).ok_or_else(|| {
906 AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
907 })?;
908
909 projection.clear();
911
912 let prefix = format!("{name}:");
914 let keys_to_remove: Vec<String> = self
915 .projection_state_cache
916 .iter()
917 .filter(|entry| entry.key().starts_with(&prefix))
918 .map(|entry| entry.key().clone())
919 .collect();
920 for key in keys_to_remove {
921 self.projection_state_cache.remove(&key);
922 }
923
924 let events = self.events.read();
926 let mut reprocessed = 0usize;
927 for event in events.iter() {
928 if projection.process(event).is_ok() {
929 reprocessed += 1;
930 }
931 }
932
933 Ok(reprocessed)
934 }
935
936 pub fn get_event_by_id(&self, event_id: &uuid::Uuid) -> Result<Option<Event>> {
938 if let Some(offset) = self.index.get_by_id(event_id) {
939 let events = self.events.read();
940 Ok(events.get(offset).cloned())
941 } else {
942 Ok(None)
943 }
944 }
945
946 pub fn query(&self, request: QueryEventsRequest) -> Result<Vec<Event>> {
948 let query_type = if request.entity_id.is_some() {
950 "entity"
951 } else if request.event_type.is_some() {
952 "type"
953 } else if request.event_type_prefix.is_some() {
954 "type_prefix"
955 } else {
956 "full_scan"
957 };
958
959 #[cfg(feature = "server")]
961 let timer = self
962 .metrics
963 .query_duration_seconds
964 .with_label_values(&[query_type])
965 .start_timer();
966
967 #[cfg(feature = "server")]
969 self.metrics
970 .queries_total
971 .with_label_values(&[query_type])
972 .inc();
973
974 let events = self.events.read();
975
976 let offsets: Vec<usize> = if let Some(entity_id) = &request.entity_id {
978 self.index
980 .get_by_entity(entity_id)
981 .map(|entries| self.filter_entries(entries, &request))
982 .unwrap_or_default()
983 } else if let Some(event_type) = &request.event_type {
984 self.index
986 .get_by_type(event_type)
987 .map(|entries| self.filter_entries(entries, &request))
988 .unwrap_or_default()
989 } else if let Some(prefix) = &request.event_type_prefix {
990 let entries = self.index.get_by_type_prefix(prefix);
992 self.filter_entries(entries, &request)
993 } else {
994 (0..events.len()).collect()
996 };
997
998 let mut results: Vec<Event> = offsets
1000 .iter()
1001 .filter_map(|&offset| events.get(offset).cloned())
1002 .filter(|event| self.apply_filters(event, &request))
1003 .collect();
1004
1005 results.sort_by_key(|x| x.timestamp);
1007
1008 if let Some(limit) = request.limit {
1010 results.truncate(limit);
1011 }
1012
1013 #[cfg(feature = "server")]
1015 {
1016 self.metrics
1017 .query_results_total
1018 .with_label_values(&[query_type])
1019 .inc_by(results.len() as u64);
1020 timer.observe_duration();
1021 }
1022
1023 Ok(results)
1024 }
1025
1026 fn filter_entries(&self, entries: Vec<IndexEntry>, request: &QueryEventsRequest) -> Vec<usize> {
1028 entries
1029 .into_iter()
1030 .filter(|entry| {
1031 if let Some(as_of) = request.as_of
1033 && entry.timestamp > as_of
1034 {
1035 return false;
1036 }
1037 if let Some(since) = request.since
1038 && entry.timestamp < since
1039 {
1040 return false;
1041 }
1042 if let Some(until) = request.until
1043 && entry.timestamp > until
1044 {
1045 return false;
1046 }
1047 true
1048 })
1049 .map(|entry| entry.offset)
1050 .collect()
1051 }
1052
1053 fn apply_filters(&self, event: &Event, request: &QueryEventsRequest) -> bool {
1055 if request.entity_id.is_some()
1057 && let Some(ref event_type) = request.event_type
1058 && event.event_type_str() != event_type
1059 {
1060 return false;
1061 }
1062
1063 if request.entity_id.is_some()
1065 && let Some(ref prefix) = request.event_type_prefix
1066 && !event.event_type_str().starts_with(prefix)
1067 {
1068 return false;
1069 }
1070
1071 if let Some(ref filter_str) = request.payload_filter
1073 && let Ok(filter_obj) =
1074 serde_json::from_str::<serde_json::Map<String, serde_json::Value>>(filter_str)
1075 {
1076 let payload = event.payload();
1077 for (key, expected_value) in &filter_obj {
1078 match payload.get(key) {
1079 Some(actual_value) if actual_value == expected_value => {}
1080 _ => return false,
1081 }
1082 }
1083 }
1084
1085 true
1086 }
1087
1088 pub fn reconstruct_state(
1091 &self,
1092 entity_id: &str,
1093 as_of: Option<DateTime<Utc>>,
1094 ) -> Result<serde_json::Value> {
1095 let (merged_state, since_timestamp) = if let Some(as_of_time) = as_of {
1097 if let Some(snapshot) = self
1099 .snapshot_manager
1100 .get_snapshot_as_of(entity_id, as_of_time)
1101 {
1102 tracing::debug!(
1103 "Using snapshot from {} for entity {} (saved {} events)",
1104 snapshot.as_of,
1105 entity_id,
1106 snapshot.event_count
1107 );
1108 (snapshot.state.clone(), Some(snapshot.as_of))
1109 } else {
1110 (serde_json::json!({}), None)
1111 }
1112 } else {
1113 if let Some(snapshot) = self.snapshot_manager.get_latest_snapshot(entity_id) {
1115 tracing::debug!(
1116 "Using latest snapshot from {} for entity {}",
1117 snapshot.as_of,
1118 entity_id
1119 );
1120 (snapshot.state.clone(), Some(snapshot.as_of))
1121 } else {
1122 (serde_json::json!({}), None)
1123 }
1124 };
1125
1126 let events = self.query(QueryEventsRequest {
1128 entity_id: Some(entity_id.to_string()),
1129 event_type: None,
1130 tenant_id: None,
1131 as_of,
1132 since: since_timestamp,
1133 until: None,
1134 limit: None,
1135 event_type_prefix: None,
1136 payload_filter: None,
1137 })?;
1138
1139 if events.is_empty() && since_timestamp.is_none() {
1141 return Err(AllSourceError::EntityNotFound(entity_id.to_string()));
1142 }
1143
1144 let mut merged_state = merged_state;
1146 for event in &events {
1147 if let serde_json::Value::Object(ref mut state_map) = merged_state
1148 && let serde_json::Value::Object(ref payload_map) = event.payload
1149 {
1150 for (key, value) in payload_map {
1151 state_map.insert(key.clone(), value.clone());
1152 }
1153 }
1154 }
1155
1156 let state = serde_json::json!({
1158 "entity_id": entity_id,
1159 "last_updated": events.last().map(|e| e.timestamp),
1160 "event_count": events.len(),
1161 "as_of": as_of,
1162 "current_state": merged_state,
1163 "history": events.iter().map(|e| {
1164 serde_json::json!({
1165 "event_id": e.id,
1166 "type": e.event_type,
1167 "timestamp": e.timestamp,
1168 "payload": e.payload
1169 })
1170 }).collect::<Vec<_>>()
1171 });
1172
1173 Ok(state)
1174 }
1175
1176 pub fn get_snapshot(&self, entity_id: &str) -> Result<serde_json::Value> {
1178 let projections = self.projections.read();
1179
1180 if let Some(snapshot_projection) = projections.get_projection("entity_snapshots")
1181 && let Some(state) = snapshot_projection.get_state(entity_id)
1182 {
1183 return Ok(serde_json::json!({
1184 "entity_id": entity_id,
1185 "snapshot": state,
1186 "from_projection": "entity_snapshots"
1187 }));
1188 }
1189
1190 Err(AllSourceError::EntityNotFound(entity_id.to_string()))
1191 }
1192
1193 pub fn stats(&self) -> StoreStats {
1195 let events = self.events.read();
1196 let index_stats = self.index.stats();
1197
1198 StoreStats {
1199 total_events: events.len(),
1200 total_entities: index_stats.total_entities,
1201 total_event_types: index_stats.total_event_types,
1202 total_ingested: *self.total_ingested.read(),
1203 }
1204 }
1205
1206 pub fn list_streams(&self) -> Vec<StreamInfo> {
1208 self.index
1209 .get_all_entities()
1210 .into_iter()
1211 .map(|entity_id| {
1212 let event_count = self
1213 .index
1214 .get_by_entity(&entity_id)
1215 .map(|entries| entries.len())
1216 .unwrap_or(0);
1217 let last_event_at = self
1218 .index
1219 .get_by_entity(&entity_id)
1220 .and_then(|entries| entries.last().map(|e| e.timestamp));
1221 StreamInfo {
1222 stream_id: entity_id,
1223 event_count,
1224 last_event_at,
1225 }
1226 })
1227 .collect()
1228 }
1229
1230 pub fn list_event_types(&self) -> Vec<EventTypeInfo> {
1232 self.index
1233 .get_all_types()
1234 .into_iter()
1235 .map(|event_type| {
1236 let event_count = self
1237 .index
1238 .get_by_type(&event_type)
1239 .map(|entries| entries.len())
1240 .unwrap_or(0);
1241 let last_event_at = self
1242 .index
1243 .get_by_type(&event_type)
1244 .and_then(|entries| entries.last().map(|e| e.timestamp));
1245 EventTypeInfo {
1246 event_type,
1247 event_count,
1248 last_event_at,
1249 }
1250 })
1251 .collect()
1252 }
1253
1254 pub fn enable_wal_replication(
1261 &self,
1262 tx: tokio::sync::broadcast::Sender<crate::infrastructure::persistence::wal::WALEntry>,
1263 ) {
1264 if let Some(ref wal_arc) = self.wal {
1265 wal_arc.set_replication_tx(tx);
1266 tracing::info!("WAL replication broadcast enabled");
1267 } else {
1268 tracing::warn!("Cannot enable WAL replication: WAL is not configured");
1269 }
1270 }
1271
1272 pub fn wal(&self) -> Option<&Arc<WriteAheadLog>> {
1275 self.wal.as_ref()
1276 }
1277
1278 pub fn parquet_storage(&self) -> Option<&Arc<RwLock<ParquetStorage>>> {
1281 self.storage.as_ref()
1282 }
1283}
1284
1285#[derive(Debug, Clone, Default)]
1287pub struct EventStoreConfig {
1288 pub storage_dir: Option<PathBuf>,
1290
1291 pub snapshot_config: SnapshotConfig,
1293
1294 pub wal_dir: Option<PathBuf>,
1296
1297 pub wal_config: WALConfig,
1299
1300 pub compaction_config: CompactionConfig,
1302
1303 pub schema_registry_config: SchemaRegistryConfig,
1305
1306 pub system_data_dir: Option<PathBuf>,
1311
1312 pub bootstrap_tenant: Option<String>,
1314}
1315
1316impl EventStoreConfig {
1317 pub fn with_persistence(storage_dir: impl Into<PathBuf>) -> Self {
1319 Self {
1320 storage_dir: Some(storage_dir.into()),
1321 ..Self::default()
1322 }
1323 }
1324
1325 pub fn with_snapshots(snapshot_config: SnapshotConfig) -> Self {
1327 Self {
1328 snapshot_config,
1329 ..Self::default()
1330 }
1331 }
1332
1333 pub fn with_wal(wal_dir: impl Into<PathBuf>, wal_config: WALConfig) -> Self {
1335 Self {
1336 wal_dir: Some(wal_dir.into()),
1337 wal_config,
1338 ..Self::default()
1339 }
1340 }
1341
1342 pub fn with_all(storage_dir: impl Into<PathBuf>, snapshot_config: SnapshotConfig) -> Self {
1344 Self {
1345 storage_dir: Some(storage_dir.into()),
1346 snapshot_config,
1347 ..Self::default()
1348 }
1349 }
1350
1351 pub fn production(
1353 storage_dir: impl Into<PathBuf>,
1354 wal_dir: impl Into<PathBuf>,
1355 snapshot_config: SnapshotConfig,
1356 wal_config: WALConfig,
1357 compaction_config: CompactionConfig,
1358 ) -> Self {
1359 let storage_dir = storage_dir.into();
1360 let system_data_dir = storage_dir.join("__system");
1361 Self {
1362 storage_dir: Some(storage_dir),
1363 snapshot_config,
1364 wal_dir: Some(wal_dir.into()),
1365 wal_config,
1366 compaction_config,
1367 system_data_dir: Some(system_data_dir),
1368 ..Self::default()
1369 }
1370 }
1371
1372 pub fn effective_system_data_dir(&self) -> Option<PathBuf> {
1377 self.system_data_dir
1378 .clone()
1379 .or_else(|| self.storage_dir.as_ref().map(|d| d.join("__system")))
1380 }
1381
1382 pub fn from_env() -> (Self, &'static str) {
1390 Self::from_env_vars(
1391 std::env::var("ALLSOURCE_DATA_DIR")
1392 .ok()
1393 .filter(|s| !s.is_empty()),
1394 std::env::var("ALLSOURCE_STORAGE_DIR")
1395 .ok()
1396 .filter(|s| !s.is_empty()),
1397 std::env::var("ALLSOURCE_WAL_DIR")
1398 .ok()
1399 .filter(|s| !s.is_empty()),
1400 std::env::var("ALLSOURCE_WAL_ENABLED").ok(),
1401 )
1402 }
1403
1404 pub fn from_env_vars(
1406 data_dir: Option<String>,
1407 explicit_storage_dir: Option<String>,
1408 explicit_wal_dir: Option<String>,
1409 wal_enabled_var: Option<String>,
1410 ) -> (Self, &'static str) {
1411 let data_dir = data_dir.filter(|s| !s.is_empty());
1412 let storage_dir = explicit_storage_dir
1413 .filter(|s| !s.is_empty())
1414 .or_else(|| data_dir.as_ref().map(|d| format!("{}/storage", d)));
1415 let wal_dir = explicit_wal_dir
1416 .filter(|s| !s.is_empty())
1417 .or_else(|| data_dir.as_ref().map(|d| format!("{}/wal", d)));
1418 let wal_enabled = wal_enabled_var.map(|v| v == "true").unwrap_or(true);
1419
1420 match (&storage_dir, &wal_dir) {
1421 (Some(sd), Some(wd)) if wal_enabled => {
1422 let config = Self::production(
1423 sd,
1424 wd,
1425 SnapshotConfig::default(),
1426 WALConfig::default(),
1427 CompactionConfig::default(),
1428 );
1429 (config, "wal+parquet")
1430 }
1431 (Some(sd), _) => {
1432 let config = Self::with_persistence(sd);
1433 (config, "parquet-only")
1434 }
1435 (_, Some(wd)) if wal_enabled => {
1436 let config = Self::with_wal(wd, WALConfig::default());
1437 (config, "wal-only")
1438 }
1439 _ => (Self::default(), "in-memory"),
1440 }
1441 }
1442}
1443
1444#[derive(Debug, serde::Serialize)]
1445pub struct StoreStats {
1446 pub total_events: usize,
1447 pub total_entities: usize,
1448 pub total_event_types: usize,
1449 pub total_ingested: u64,
1450}
1451
1452#[derive(Debug, Clone, serde::Serialize)]
1454pub struct StreamInfo {
1455 pub stream_id: String,
1457 pub event_count: usize,
1459 pub last_event_at: Option<chrono::DateTime<chrono::Utc>>,
1461}
1462
1463#[derive(Debug, Clone, serde::Serialize)]
1465pub struct EventTypeInfo {
1466 pub event_type: String,
1468 pub event_count: usize,
1470 pub last_event_at: Option<chrono::DateTime<chrono::Utc>>,
1472}
1473
1474impl Default for EventStore {
1475 fn default() -> Self {
1476 Self::new()
1477 }
1478}
1479
1480#[cfg(test)]
1481mod tests {
1482 use super::*;
1483 use crate::domain::entities::Event;
1484 use tempfile::TempDir;
1485
1486 fn create_test_event(entity_id: &str, event_type: &str) -> Event {
1487 Event::from_strings(
1488 event_type.to_string(),
1489 entity_id.to_string(),
1490 "default".to_string(),
1491 serde_json::json!({"name": "Test", "value": 42}),
1492 None,
1493 )
1494 .unwrap()
1495 }
1496
1497 fn create_test_event_with_payload(
1498 entity_id: &str,
1499 event_type: &str,
1500 payload: serde_json::Value,
1501 ) -> Event {
1502 Event::from_strings(
1503 event_type.to_string(),
1504 entity_id.to_string(),
1505 "default".to_string(),
1506 payload,
1507 None,
1508 )
1509 .unwrap()
1510 }
1511
1512 #[test]
1513 fn test_event_store_new() {
1514 let store = EventStore::new();
1515 assert_eq!(store.stats().total_events, 0);
1516 assert_eq!(store.stats().total_entities, 0);
1517 }
1518
1519 #[test]
1520 fn test_event_store_default() {
1521 let store = EventStore::default();
1522 assert_eq!(store.stats().total_events, 0);
1523 }
1524
1525 #[test]
1526 fn test_ingest_single_event() {
1527 let store = EventStore::new();
1528 let event = create_test_event("entity-1", "user.created");
1529
1530 store.ingest(event).unwrap();
1531
1532 assert_eq!(store.stats().total_events, 1);
1533 assert_eq!(store.stats().total_ingested, 1);
1534 }
1535
1536 #[test]
1537 fn test_ingest_multiple_events() {
1538 let store = EventStore::new();
1539
1540 for i in 0..10 {
1541 let event = create_test_event(&format!("entity-{}", i), "user.created");
1542 store.ingest(event).unwrap();
1543 }
1544
1545 assert_eq!(store.stats().total_events, 10);
1546 assert_eq!(store.stats().total_ingested, 10);
1547 }
1548
1549 #[test]
1550 fn test_query_by_entity_id() {
1551 let store = EventStore::new();
1552
1553 store
1554 .ingest(create_test_event("entity-1", "user.created"))
1555 .unwrap();
1556 store
1557 .ingest(create_test_event("entity-2", "user.created"))
1558 .unwrap();
1559 store
1560 .ingest(create_test_event("entity-1", "user.updated"))
1561 .unwrap();
1562
1563 let results = store
1564 .query(QueryEventsRequest {
1565 entity_id: Some("entity-1".to_string()),
1566 event_type: None,
1567 tenant_id: None,
1568 as_of: None,
1569 since: None,
1570 until: None,
1571 limit: None,
1572 event_type_prefix: None,
1573 payload_filter: None,
1574 })
1575 .unwrap();
1576
1577 assert_eq!(results.len(), 2);
1578 }
1579
1580 #[test]
1581 fn test_query_by_event_type() {
1582 let store = EventStore::new();
1583
1584 store
1585 .ingest(create_test_event("entity-1", "user.created"))
1586 .unwrap();
1587 store
1588 .ingest(create_test_event("entity-2", "user.updated"))
1589 .unwrap();
1590 store
1591 .ingest(create_test_event("entity-3", "user.created"))
1592 .unwrap();
1593
1594 let results = store
1595 .query(QueryEventsRequest {
1596 entity_id: None,
1597 event_type: Some("user.created".to_string()),
1598 tenant_id: None,
1599 as_of: None,
1600 since: None,
1601 until: None,
1602 limit: None,
1603 event_type_prefix: None,
1604 payload_filter: None,
1605 })
1606 .unwrap();
1607
1608 assert_eq!(results.len(), 2);
1609 }
1610
1611 #[test]
1612 fn test_query_with_limit() {
1613 let store = EventStore::new();
1614
1615 for i in 0..10 {
1616 let event = create_test_event(&format!("entity-{}", i), "user.created");
1617 store.ingest(event).unwrap();
1618 }
1619
1620 let results = store
1621 .query(QueryEventsRequest {
1622 entity_id: None,
1623 event_type: None,
1624 tenant_id: None,
1625 as_of: None,
1626 since: None,
1627 until: None,
1628 limit: Some(5),
1629 event_type_prefix: None,
1630 payload_filter: None,
1631 })
1632 .unwrap();
1633
1634 assert_eq!(results.len(), 5);
1635 }
1636
1637 #[test]
1638 fn test_query_empty_store() {
1639 let store = EventStore::new();
1640
1641 let results = store
1642 .query(QueryEventsRequest {
1643 entity_id: Some("non-existent".to_string()),
1644 event_type: None,
1645 tenant_id: None,
1646 as_of: None,
1647 since: None,
1648 until: None,
1649 limit: None,
1650 event_type_prefix: None,
1651 payload_filter: None,
1652 })
1653 .unwrap();
1654
1655 assert!(results.is_empty());
1656 }
1657
1658 #[test]
1659 fn test_reconstruct_state() {
1660 let store = EventStore::new();
1661
1662 store
1663 .ingest(create_test_event("entity-1", "user.created"))
1664 .unwrap();
1665
1666 let state = store.reconstruct_state("entity-1", None).unwrap();
1667 assert_eq!(state["current_state"]["name"], "Test");
1669 assert_eq!(state["current_state"]["value"], 42);
1670 }
1671
1672 #[test]
1673 fn test_reconstruct_state_not_found() {
1674 let store = EventStore::new();
1675
1676 let result = store.reconstruct_state("non-existent", None);
1677 assert!(result.is_err());
1678 }
1679
1680 #[test]
1681 fn test_get_snapshot_empty() {
1682 let store = EventStore::new();
1683
1684 let result = store.get_snapshot("non-existent");
1685 assert!(result.is_err());
1687 }
1688
1689 #[test]
1690 fn test_create_snapshot() {
1691 let store = EventStore::new();
1692
1693 store
1694 .ingest(create_test_event("entity-1", "user.created"))
1695 .unwrap();
1696
1697 store.create_snapshot("entity-1").unwrap();
1698
1699 let snapshot = store.get_snapshot("entity-1").unwrap();
1701 assert!(snapshot != serde_json::json!(null));
1702 }
1703
1704 #[test]
1705 fn test_create_snapshot_entity_not_found() {
1706 let store = EventStore::new();
1707
1708 let result = store.create_snapshot("non-existent");
1709 assert!(result.is_err());
1710 }
1711
1712 #[test]
1713 fn test_websocket_manager() {
1714 let store = EventStore::new();
1715 let manager = store.websocket_manager();
1716 assert!(Arc::strong_count(&manager) >= 1);
1718 }
1719
1720 #[test]
1721 fn test_snapshot_manager() {
1722 let store = EventStore::new();
1723 let manager = store.snapshot_manager();
1724 assert!(Arc::strong_count(&manager) >= 1);
1725 }
1726
1727 #[test]
1728 fn test_compaction_manager_none() {
1729 let store = EventStore::new();
1730 assert!(store.compaction_manager().is_none());
1732 }
1733
1734 #[test]
1735 fn test_schema_registry() {
1736 let store = EventStore::new();
1737 let registry = store.schema_registry();
1738 assert!(Arc::strong_count(®istry) >= 1);
1739 }
1740
1741 #[test]
1742 fn test_replay_manager() {
1743 let store = EventStore::new();
1744 let manager = store.replay_manager();
1745 assert!(Arc::strong_count(&manager) >= 1);
1746 }
1747
1748 #[test]
1749 fn test_pipeline_manager() {
1750 let store = EventStore::new();
1751 let manager = store.pipeline_manager();
1752 assert!(Arc::strong_count(&manager) >= 1);
1753 }
1754
1755 #[test]
1756 fn test_projection_manager() {
1757 let store = EventStore::new();
1758 let manager = store.projection_manager();
1759 let projections = manager.list_projections();
1761 assert!(projections.len() >= 2); }
1763
1764 #[test]
1765 fn test_projection_state_cache() {
1766 let store = EventStore::new();
1767 let cache = store.projection_state_cache();
1768
1769 cache.insert("test:key".to_string(), serde_json::json!({"value": 123}));
1770 assert_eq!(cache.len(), 1);
1771
1772 let value = cache.get("test:key").unwrap();
1773 assert_eq!(value["value"], 123);
1774 }
1775
1776 #[test]
1777 fn test_metrics() {
1778 let store = EventStore::new();
1779 let metrics = store.metrics();
1780 assert!(Arc::strong_count(&metrics) >= 1);
1781 }
1782
1783 #[test]
1784 fn test_store_stats() {
1785 let store = EventStore::new();
1786
1787 store
1788 .ingest(create_test_event("entity-1", "user.created"))
1789 .unwrap();
1790 store
1791 .ingest(create_test_event("entity-2", "order.placed"))
1792 .unwrap();
1793
1794 let stats = store.stats();
1795 assert_eq!(stats.total_events, 2);
1796 assert_eq!(stats.total_entities, 2);
1797 assert_eq!(stats.total_event_types, 2);
1798 assert_eq!(stats.total_ingested, 2);
1799 }
1800
1801 #[test]
1802 fn test_event_store_config_default() {
1803 let config = EventStoreConfig::default();
1804 assert!(config.storage_dir.is_none());
1805 assert!(config.wal_dir.is_none());
1806 }
1807
1808 #[test]
1809 fn test_event_store_config_with_persistence() {
1810 let temp_dir = TempDir::new().unwrap();
1811 let config = EventStoreConfig::with_persistence(temp_dir.path());
1812
1813 assert!(config.storage_dir.is_some());
1814 assert!(config.wal_dir.is_none());
1815 }
1816
1817 #[test]
1818 fn test_event_store_config_with_wal() {
1819 let temp_dir = TempDir::new().unwrap();
1820 let config = EventStoreConfig::with_wal(temp_dir.path(), WALConfig::default());
1821
1822 assert!(config.storage_dir.is_none());
1823 assert!(config.wal_dir.is_some());
1824 }
1825
1826 #[test]
1827 fn test_event_store_config_with_all() {
1828 let temp_dir = TempDir::new().unwrap();
1829 let config = EventStoreConfig::with_all(temp_dir.path(), SnapshotConfig::default());
1830
1831 assert!(config.storage_dir.is_some());
1832 }
1833
1834 #[test]
1835 fn test_event_store_config_production() {
1836 let storage_dir = TempDir::new().unwrap();
1837 let wal_dir = TempDir::new().unwrap();
1838 let config = EventStoreConfig::production(
1839 storage_dir.path(),
1840 wal_dir.path(),
1841 SnapshotConfig::default(),
1842 WALConfig::default(),
1843 CompactionConfig::default(),
1844 );
1845
1846 assert!(config.storage_dir.is_some());
1847 assert!(config.wal_dir.is_some());
1848 }
1849
1850 #[test]
1856 fn test_from_env_vars_data_dir_enables_full_persistence() {
1857 let (config, mode) =
1858 EventStoreConfig::from_env_vars(Some("/app/data".to_string()), None, None, None);
1859 assert_eq!(mode, "wal+parquet");
1860 assert_eq!(
1861 config.storage_dir.unwrap().to_str().unwrap(),
1862 "/app/data/storage"
1863 );
1864 assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/app/data/wal");
1865 }
1866
1867 #[test]
1868 fn test_from_env_vars_explicit_dirs() {
1869 let (config, mode) = EventStoreConfig::from_env_vars(
1870 None,
1871 Some("/custom/storage".to_string()),
1872 Some("/custom/wal".to_string()),
1873 None,
1874 );
1875 assert_eq!(mode, "wal+parquet");
1876 assert_eq!(
1877 config.storage_dir.unwrap().to_str().unwrap(),
1878 "/custom/storage"
1879 );
1880 assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/custom/wal");
1881 }
1882
1883 #[test]
1884 fn test_from_env_vars_wal_disabled() {
1885 let (config, mode) = EventStoreConfig::from_env_vars(
1886 Some("/app/data".to_string()),
1887 None,
1888 None,
1889 Some("false".to_string()),
1890 );
1891 assert_eq!(mode, "parquet-only");
1892 assert!(config.storage_dir.is_some());
1893 assert!(config.wal_dir.is_none());
1894 }
1895
1896 #[test]
1897 fn test_from_env_vars_no_dirs_is_in_memory() {
1898 let (config, mode) = EventStoreConfig::from_env_vars(None, None, None, None);
1899 assert_eq!(mode, "in-memory");
1900 assert!(config.storage_dir.is_none());
1901 assert!(config.wal_dir.is_none());
1902 }
1903
1904 #[test]
1905 fn test_from_env_vars_empty_strings_treated_as_none() {
1906 let (_, mode) = EventStoreConfig::from_env_vars(
1907 Some("".to_string()),
1908 Some("".to_string()),
1909 Some("".to_string()),
1910 None,
1911 );
1912 assert_eq!(mode, "in-memory");
1913 }
1914
1915 #[test]
1916 fn test_from_env_vars_explicit_overrides_data_dir() {
1917 let (config, mode) = EventStoreConfig::from_env_vars(
1918 Some("/app/data".to_string()),
1919 Some("/override/storage".to_string()),
1920 Some("/override/wal".to_string()),
1921 None,
1922 );
1923 assert_eq!(mode, "wal+parquet");
1924 assert_eq!(
1925 config.storage_dir.unwrap().to_str().unwrap(),
1926 "/override/storage"
1927 );
1928 assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/override/wal");
1929 }
1930
1931 #[test]
1932 fn test_from_env_vars_wal_only() {
1933 let (config, mode) =
1934 EventStoreConfig::from_env_vars(None, None, Some("/wal/only".to_string()), None);
1935 assert_eq!(mode, "wal-only");
1936 assert!(config.storage_dir.is_none());
1937 assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/wal/only");
1938 }
1939
1940 #[test]
1941 fn test_store_stats_serde() {
1942 let stats = StoreStats {
1943 total_events: 100,
1944 total_entities: 50,
1945 total_event_types: 10,
1946 total_ingested: 100,
1947 };
1948
1949 let json = serde_json::to_string(&stats).unwrap();
1950 assert!(json.contains("\"total_events\":100"));
1951 assert!(json.contains("\"total_entities\":50"));
1952 }
1953
1954 #[test]
1955 fn test_query_with_entity_and_type() {
1956 let store = EventStore::new();
1957
1958 store
1959 .ingest(create_test_event("entity-1", "user.created"))
1960 .unwrap();
1961 store
1962 .ingest(create_test_event("entity-1", "user.updated"))
1963 .unwrap();
1964 store
1965 .ingest(create_test_event("entity-2", "user.created"))
1966 .unwrap();
1967
1968 let results = store
1969 .query(QueryEventsRequest {
1970 entity_id: Some("entity-1".to_string()),
1971 event_type: Some("user.created".to_string()),
1972 tenant_id: None,
1973 as_of: None,
1974 since: None,
1975 until: None,
1976 limit: None,
1977 event_type_prefix: None,
1978 payload_filter: None,
1979 })
1980 .unwrap();
1981
1982 assert_eq!(results.len(), 1);
1983 assert_eq!(results[0].event_type_str(), "user.created");
1984 }
1985
1986 #[test]
1987 fn test_query_by_event_type_prefix() {
1988 let store = EventStore::new();
1989
1990 store
1992 .ingest(create_test_event("entity-1", "index.created"))
1993 .unwrap();
1994 store
1995 .ingest(create_test_event("entity-2", "index.updated"))
1996 .unwrap();
1997 store
1998 .ingest(create_test_event("entity-3", "trade.created"))
1999 .unwrap();
2000 store
2001 .ingest(create_test_event("entity-4", "trade.completed"))
2002 .unwrap();
2003 store
2004 .ingest(create_test_event("entity-5", "balance.updated"))
2005 .unwrap();
2006
2007 let results = store
2009 .query(QueryEventsRequest {
2010 entity_id: None,
2011 event_type: None,
2012 tenant_id: None,
2013 as_of: None,
2014 since: None,
2015 until: None,
2016 limit: None,
2017 event_type_prefix: Some("index.".to_string()),
2018 payload_filter: None,
2019 })
2020 .unwrap();
2021
2022 assert_eq!(results.len(), 2);
2023 assert!(
2024 results
2025 .iter()
2026 .all(|e| e.event_type_str().starts_with("index."))
2027 );
2028 }
2029
2030 #[test]
2031 fn test_query_by_event_type_prefix_empty_returns_all() {
2032 let store = EventStore::new();
2033
2034 store
2035 .ingest(create_test_event("entity-1", "index.created"))
2036 .unwrap();
2037 store
2038 .ingest(create_test_event("entity-2", "trade.created"))
2039 .unwrap();
2040
2041 let results = store
2043 .query(QueryEventsRequest {
2044 entity_id: None,
2045 event_type: None,
2046 tenant_id: None,
2047 as_of: None,
2048 since: None,
2049 until: None,
2050 limit: None,
2051 event_type_prefix: Some("".to_string()),
2052 payload_filter: None,
2053 })
2054 .unwrap();
2055
2056 assert_eq!(results.len(), 2);
2057 }
2058
2059 #[test]
2060 fn test_query_by_event_type_prefix_no_match() {
2061 let store = EventStore::new();
2062
2063 store
2064 .ingest(create_test_event("entity-1", "index.created"))
2065 .unwrap();
2066
2067 let results = store
2068 .query(QueryEventsRequest {
2069 entity_id: None,
2070 event_type: None,
2071 tenant_id: None,
2072 as_of: None,
2073 since: None,
2074 until: None,
2075 limit: None,
2076 event_type_prefix: Some("nonexistent.".to_string()),
2077 payload_filter: None,
2078 })
2079 .unwrap();
2080
2081 assert!(results.is_empty());
2082 }
2083
2084 #[test]
2085 fn test_query_by_entity_with_type_prefix() {
2086 let store = EventStore::new();
2087
2088 store
2089 .ingest(create_test_event("entity-1", "index.created"))
2090 .unwrap();
2091 store
2092 .ingest(create_test_event("entity-1", "trade.created"))
2093 .unwrap();
2094 store
2095 .ingest(create_test_event("entity-2", "index.updated"))
2096 .unwrap();
2097
2098 let results = store
2100 .query(QueryEventsRequest {
2101 entity_id: Some("entity-1".to_string()),
2102 event_type: None,
2103 tenant_id: None,
2104 as_of: None,
2105 since: None,
2106 until: None,
2107 limit: None,
2108 event_type_prefix: Some("index.".to_string()),
2109 payload_filter: None,
2110 })
2111 .unwrap();
2112
2113 assert_eq!(results.len(), 1);
2114 assert_eq!(results[0].event_type_str(), "index.created");
2115 }
2116
2117 #[test]
2118 fn test_query_prefix_with_limit() {
2119 let store = EventStore::new();
2120
2121 for i in 0..5 {
2122 store
2123 .ingest(create_test_event(&format!("entity-{}", i), "index.created"))
2124 .unwrap();
2125 }
2126
2127 let results = store
2128 .query(QueryEventsRequest {
2129 entity_id: None,
2130 event_type: None,
2131 tenant_id: None,
2132 as_of: None,
2133 since: None,
2134 until: None,
2135 limit: Some(3),
2136 event_type_prefix: Some("index.".to_string()),
2137 payload_filter: None,
2138 })
2139 .unwrap();
2140
2141 assert_eq!(results.len(), 3);
2142 }
2143
2144 #[test]
2145 fn test_query_prefix_alongside_existing_filters() {
2146 let store = EventStore::new();
2147
2148 store
2149 .ingest(create_test_event("entity-1", "index.created"))
2150 .unwrap();
2151 std::thread::sleep(std::time::Duration::from_millis(10));
2153 store
2154 .ingest(create_test_event("entity-2", "index.strategy.updated"))
2155 .unwrap();
2156 std::thread::sleep(std::time::Duration::from_millis(10));
2157 store
2158 .ingest(create_test_event("entity-3", "index.deleted"))
2159 .unwrap();
2160
2161 let results = store
2163 .query(QueryEventsRequest {
2164 entity_id: None,
2165 event_type: None,
2166 tenant_id: None,
2167 as_of: None,
2168 since: None,
2169 until: None,
2170 limit: Some(2),
2171 event_type_prefix: Some("index.".to_string()),
2172 payload_filter: None,
2173 })
2174 .unwrap();
2175
2176 assert_eq!(results.len(), 2);
2177 }
2178
2179 #[test]
2180 fn test_query_with_payload_filter() {
2181 let store = EventStore::new();
2182
2183 for i in 0..5 {
2185 store
2186 .ingest(create_test_event_with_payload(
2187 &format!("entity-{}", i),
2188 "user.action",
2189 serde_json::json!({"user_id": "alice", "action": "click"}),
2190 ))
2191 .unwrap();
2192 }
2193 for i in 5..10 {
2195 store
2196 .ingest(create_test_event_with_payload(
2197 &format!("entity-{}", i),
2198 "user.action",
2199 serde_json::json!({"user_id": "bob", "action": "view"}),
2200 ))
2201 .unwrap();
2202 }
2203
2204 let results = store
2206 .query(QueryEventsRequest {
2207 entity_id: None,
2208 event_type: Some("user.action".to_string()),
2209 tenant_id: None,
2210 as_of: None,
2211 since: None,
2212 until: None,
2213 limit: None,
2214 event_type_prefix: None,
2215 payload_filter: Some(r#"{"user_id":"alice"}"#.to_string()),
2216 })
2217 .unwrap();
2218
2219 assert_eq!(results.len(), 5);
2220 }
2221
2222 #[test]
2223 fn test_query_payload_filter_non_existent_field() {
2224 let store = EventStore::new();
2225
2226 store
2227 .ingest(create_test_event_with_payload(
2228 "entity-1",
2229 "user.action",
2230 serde_json::json!({"user_id": "alice"}),
2231 ))
2232 .unwrap();
2233
2234 let results = store
2236 .query(QueryEventsRequest {
2237 entity_id: None,
2238 event_type: None,
2239 tenant_id: None,
2240 as_of: None,
2241 since: None,
2242 until: None,
2243 limit: None,
2244 event_type_prefix: None,
2245 payload_filter: Some(r#"{"nonexistent":"value"}"#.to_string()),
2246 })
2247 .unwrap();
2248
2249 assert!(results.is_empty());
2250 }
2251
2252 #[test]
2253 fn test_query_payload_filter_with_prefix() {
2254 let store = EventStore::new();
2255
2256 store
2257 .ingest(create_test_event_with_payload(
2258 "entity-1",
2259 "index.created",
2260 serde_json::json!({"status": "active"}),
2261 ))
2262 .unwrap();
2263 store
2264 .ingest(create_test_event_with_payload(
2265 "entity-2",
2266 "index.created",
2267 serde_json::json!({"status": "inactive"}),
2268 ))
2269 .unwrap();
2270 store
2271 .ingest(create_test_event_with_payload(
2272 "entity-3",
2273 "trade.created",
2274 serde_json::json!({"status": "active"}),
2275 ))
2276 .unwrap();
2277
2278 let results = store
2280 .query(QueryEventsRequest {
2281 entity_id: None,
2282 event_type: None,
2283 tenant_id: None,
2284 as_of: None,
2285 since: None,
2286 until: None,
2287 limit: None,
2288 event_type_prefix: Some("index.".to_string()),
2289 payload_filter: Some(r#"{"status":"active"}"#.to_string()),
2290 })
2291 .unwrap();
2292
2293 assert_eq!(results.len(), 1);
2294 assert_eq!(results[0].entity_id().to_string(), "entity-1");
2295 }
2296
2297 #[test]
2298 fn test_flush_storage_no_storage() {
2299 let store = EventStore::new();
2300 let result = store.flush_storage();
2302 assert!(result.is_ok());
2303 }
2304
2305 #[test]
2306 fn test_state_evolution() {
2307 let store = EventStore::new();
2308
2309 store
2311 .ingest(
2312 Event::from_strings(
2313 "user.created".to_string(),
2314 "user-1".to_string(),
2315 "default".to_string(),
2316 serde_json::json!({"name": "Alice", "age": 25}),
2317 None,
2318 )
2319 .unwrap(),
2320 )
2321 .unwrap();
2322
2323 store
2325 .ingest(
2326 Event::from_strings(
2327 "user.updated".to_string(),
2328 "user-1".to_string(),
2329 "default".to_string(),
2330 serde_json::json!({"age": 26}),
2331 None,
2332 )
2333 .unwrap(),
2334 )
2335 .unwrap();
2336
2337 let state = store.reconstruct_state("user-1", None).unwrap();
2338 assert_eq!(state["current_state"]["name"], "Alice");
2340 assert_eq!(state["current_state"]["age"], 26);
2341 }
2342
2343 #[test]
2344 fn test_reject_system_event_types() {
2345 let store = EventStore::new();
2346
2347 let event = Event::reconstruct_from_strings(
2349 uuid::Uuid::new_v4(),
2350 "_system.tenant.created".to_string(),
2351 "_system:tenant:acme".to_string(),
2352 "_system".to_string(),
2353 serde_json::json!({"name": "ACME"}),
2354 chrono::Utc::now(),
2355 None,
2356 1,
2357 );
2358
2359 let result = store.ingest(event);
2360 assert!(result.is_err());
2361 let err = result.unwrap_err();
2362 assert!(
2363 err.to_string().contains("reserved for internal use"),
2364 "Expected system namespace rejection, got: {}",
2365 err
2366 );
2367 }
2368}