1#[cfg(feature = "server")]
2use crate::application::services::webhook::WebhookRegistry;
3#[cfg(feature = "server")]
4use crate::infrastructure::observability::metrics::MetricsRegistry;
5#[cfg(feature = "server")]
6use crate::infrastructure::web::websocket::WebSocketManager;
7use crate::{
8 application::{
9 dto::QueryEventsRequest,
10 services::{
11 consumer::ConsumerRegistry,
12 exactly_once::{ExactlyOnceConfig, ExactlyOnceRegistry},
13 pipeline::PipelineManager,
14 projection::{EntitySnapshotProjection, EventCounterProjection, ProjectionManager},
15 replay::ReplayManager,
16 schema::{SchemaRegistry, SchemaRegistryConfig},
17 schema_evolution::SchemaEvolutionManager,
18 },
19 },
20 domain::entities::Event,
21 error::{AllSourceError, Result},
22 infrastructure::{
23 persistence::{
24 compaction::{CompactionConfig, CompactionManager},
25 index::{EventIndex, IndexEntry},
26 snapshot::{SnapshotConfig, SnapshotManager, SnapshotType},
27 storage::ParquetStorage,
28 wal::{WALConfig, WriteAheadLog},
29 },
30 query::geospatial::GeoIndex,
31 },
32};
33use chrono::{DateTime, Utc};
34use dashmap::DashMap;
35use parking_lot::RwLock;
36use std::{path::PathBuf, sync::Arc};
37#[cfg(feature = "server")]
38use tokio::sync::mpsc;
39
/// The core event store: an append-only in-memory event log surrounded by
/// secondary indexes, projections, and optional durability layers
/// (write-ahead log + Parquet storage).
pub struct EventStore {
    /// Append-only in-memory log; a Vec index is an event's canonical offset.
    events: Arc<RwLock<Vec<Event>>>,

    /// Secondary indexes (by id / entity / type, with timestamps) into `events`.
    index: Arc<EventIndex>,

    /// Registered projections, updated synchronously on every ingest.
    pub(crate) projections: Arc<RwLock<ProjectionManager>>,

    /// Optional Parquet-backed persistence; `None` when no storage dir is configured.
    storage: Option<Arc<RwLock<ParquetStorage>>>,

    /// Fan-out of freshly ingested events to WebSocket subscribers.
    #[cfg(feature = "server")]
    websocket_manager: Arc<WebSocketManager>,

    /// Entity snapshots for fast state reconstruction.
    snapshot_manager: Arc<SnapshotManager>,

    /// Optional write-ahead log for durability; `None` when no WAL dir is configured.
    wal: Option<Arc<WriteAheadLog>>,

    /// Background compaction for the Parquet directory; present only with storage.
    compaction_manager: Option<Arc<CompactionManager>>,

    /// Event schema registration/validation.
    schema_registry: Arc<SchemaRegistry>,

    /// Historical event replay sessions.
    replay_manager: Arc<ReplayManager>,

    /// Inline event-processing pipelines run on every ingest.
    pipeline_manager: Arc<PipelineManager>,

    /// Prometheus metrics (server builds only).
    #[cfg(feature = "server")]
    metrics: Arc<MetricsRegistry>,

    /// Lifetime count of ingested events (includes events recovered at startup).
    total_ingested: Arc<RwLock<u64>>,

    /// Materialized projection state cache; `reset_projection` treats keys as
    /// `"<projection>:"`-prefixed strings.
    projection_state_cache: Arc<DashMap<String, serde_json::Value>>,

    /// Free-form status string per projection; written by callers via the
    /// `projection_status()` accessor (not mutated in this file).
    projection_status: Arc<DashMap<String, String>>,

    /// Registered webhook subscriptions (server builds only).
    #[cfg(feature = "server")]
    webhook_registry: Arc<WebhookRegistry>,

    /// Channel to the webhook delivery worker; `None` until `set_webhook_tx`.
    #[cfg(feature = "server")]
    webhook_tx: Arc<RwLock<Option<mpsc::UnboundedSender<WebhookDeliveryTask>>>>,

    /// Geospatial index over event payloads.
    geo_index: Arc<GeoIndex>,

    /// Idempotency / exactly-once bookkeeping.
    exactly_once: Arc<ExactlyOnceRegistry>,

    /// Observes payloads to track schema drift per event type.
    schema_evolution: Arc<SchemaEvolutionManager>,

    /// Per-entity monotonically increasing version counters used for
    /// optimistic concurrency (see `ingest_with_expected_version`).
    entity_versions: Arc<DashMap<String, u64>>,

    /// Consumer-group offsets and filters.
    consumer_registry: Arc<ConsumerRegistry>,
}
116
/// A single webhook delivery unit: which subscription to call and the event
/// to deliver. Queued on the channel installed by `EventStore::set_webhook_tx`.
#[cfg(feature = "server")]
#[derive(Debug, Clone)]
pub struct WebhookDeliveryTask {
    /// The matched webhook subscription.
    pub webhook: crate::application::services::webhook::WebhookSubscription,
    /// The event to deliver (cloned per matching subscription).
    pub event: Event,
}
124
125impl EventStore {
126 pub fn new() -> Self {
128 Self::with_config(EventStoreConfig::default())
129 }
130
    /// Builds a store from `config`, wiring up optional persistence
    /// (Parquet, WAL, compaction) and recovering previously persisted state.
    ///
    /// Recovery order matters:
    /// 1. Load all events from Parquet storage (if configured).
    /// 2. Replay the WAL, skipping events already loaded from Parquet.
    /// 3. Checkpoint newly recovered WAL events back to Parquet and truncate
    ///    the WAL so it does not grow without bound.
    pub fn with_config(config: EventStoreConfig) -> Self {
        let mut projections = ProjectionManager::new();

        // Built-in projections that ship with every store instance.
        projections.register(Arc::new(EntitySnapshotProjection::new("entity_snapshots")));
        projections.register(Arc::new(EventCounterProjection::new("event_counters")));

        // Optional Parquet persistence: an initialization failure is logged
        // and the store falls back to memory-only operation.
        let storage = config
            .storage_dir
            .as_ref()
            .and_then(|dir| match ParquetStorage::new(dir) {
                Ok(storage) => {
                    tracing::info!("✅ Parquet persistence enabled at: {}", dir.display());
                    Some(Arc::new(RwLock::new(storage)))
                }
                Err(e) => {
                    tracing::error!("❌ Failed to initialize Parquet storage: {}", e);
                    None
                }
            });

        // Optional write-ahead log; same best-effort fallback as storage.
        let wal = config.wal_dir.as_ref().and_then(|dir| {
            match WriteAheadLog::new(dir, config.wal_config.clone()) {
                Ok(wal) => {
                    tracing::info!("✅ WAL enabled at: {}", dir.display());
                    Some(Arc::new(wal))
                }
                Err(e) => {
                    tracing::error!("❌ Failed to initialize WAL: {}", e);
                    None
                }
            }
        });

        // Compaction works over the same directory as Parquet storage.
        let compaction_manager = config.storage_dir.as_ref().map(|dir| {
            let manager = CompactionManager::new(dir, config.compaction_config.clone());
            Arc::new(manager)
        });

        let schema_registry = Arc::new(SchemaRegistry::new(config.schema_registry_config.clone()));
        tracing::info!("✅ Schema registry enabled");

        let replay_manager = Arc::new(ReplayManager::new());
        tracing::info!("✅ Replay manager enabled");

        let pipeline_manager = Arc::new(PipelineManager::new());
        tracing::info!("✅ Pipeline manager enabled");

        #[cfg(feature = "server")]
        let metrics = {
            let m = MetricsRegistry::new();
            tracing::info!("✅ Prometheus metrics registry initialized");
            m
        };

        let projection_state_cache = Arc::new(DashMap::new());
        tracing::info!("✅ Projection state cache initialized");

        #[cfg(feature = "server")]
        let webhook_registry = {
            let w = Arc::new(WebhookRegistry::new());
            tracing::info!("✅ Webhook registry initialized");
            w
        };

        let store = Self {
            events: Arc::new(RwLock::new(Vec::new())),
            index: Arc::new(EventIndex::new()),
            projections: Arc::new(RwLock::new(projections)),
            storage,
            #[cfg(feature = "server")]
            websocket_manager: Arc::new(WebSocketManager::new()),
            snapshot_manager: Arc::new(SnapshotManager::new(config.snapshot_config)),
            wal,
            compaction_manager,
            schema_registry,
            replay_manager,
            pipeline_manager,
            #[cfg(feature = "server")]
            metrics,
            total_ingested: Arc::new(RwLock::new(0)),
            projection_state_cache,
            projection_status: Arc::new(DashMap::new()),
            #[cfg(feature = "server")]
            webhook_registry,
            #[cfg(feature = "server")]
            webhook_tx: Arc::new(RwLock::new(None)),
            geo_index: Arc::new(GeoIndex::new()),
            exactly_once: Arc::new(ExactlyOnceRegistry::new(ExactlyOnceConfig::default())),
            schema_evolution: Arc::new(SchemaEvolutionManager::new()),
            entity_versions: Arc::new(DashMap::new()),
            consumer_registry: Arc::new(ConsumerRegistry::new()),
        };

        // Phase 1: rebuild in-memory state from Parquet storage.
        if let Some(ref storage) = store.storage
            && let Ok(persisted_events) = storage.read().load_all_events()
            && !persisted_events.is_empty()
        {
            tracing::info!("📂 Loading {} persisted events...", persisted_events.len());

            for event in persisted_events {
                // Offset is the event's position in the log at push time.
                let offset = store.events.read().len();
                if let Err(e) = store.index.index_event(
                    event.id,
                    event.entity_id_str(),
                    event.event_type_str(),
                    event.timestamp,
                    offset,
                ) {
                    tracing::error!("Failed to re-index event {}: {}", event.id, e);
                }

                // Indexing/projection failures are logged but do not abort
                // recovery — the event is still appended to the log.
                if let Err(e) = store.projections.read().process_event(&event) {
                    tracing::error!("Failed to re-process event {}: {}", event.id, e);
                }

                // Rebuild per-entity optimistic-concurrency counters.
                *store
                    .entity_versions
                    .entry(event.entity_id_str().to_string())
                    .or_insert(0) += 1;

                store.events.write().push(event);
            }

            // Recovered events count toward the lifetime ingestion total.
            let total = store.events.read().len();
            *store.total_ingested.write() = total as u64;
            tracing::info!("✅ Successfully loaded {} events from storage", total);
        }

        // Phase 2: replay the WAL for events not yet flushed to Parquet.
        if let Some(ref wal) = store.wal {
            match wal.recover() {
                Ok(recovered_events) if !recovered_events.is_empty() => {
                    // Skip events already loaded from Parquet (dedupe by id).
                    let existing_ids: std::collections::HashSet<uuid::Uuid> =
                        store.events.read().iter().map(|e| e.id).collect();

                    let mut wal_new = 0usize;
                    for event in recovered_events {
                        if existing_ids.contains(&event.id) {
                            continue;
                        }

                        let offset = store.events.read().len();
                        if let Err(e) = store.index.index_event(
                            event.id,
                            event.entity_id_str(),
                            event.event_type_str(),
                            event.timestamp,
                            offset,
                        ) {
                            tracing::error!("Failed to re-index WAL event {}: {}", event.id, e);
                        }

                        if let Err(e) = store.projections.read().process_event(&event) {
                            tracing::error!("Failed to re-process WAL event {}: {}", event.id, e);
                        }

                        *store
                            .entity_versions
                            .entry(event.entity_id_str().to_string())
                            .or_insert(0) += 1;

                        store.events.write().push(event);
                        wal_new += 1;
                    }

                    if wal_new > 0 {
                        let total = store.events.read().len();
                        *store.total_ingested.write() = total as u64;
                        tracing::info!(
                            "✅ Recovered {} new events from WAL ({} total)",
                            wal_new,
                            total
                        );

                        // Checkpoint: the WAL-recovered events are the last
                        // `wal_new` entries of the log (they were just pushed),
                        // so buffer exactly those into Parquet, flush, then
                        // truncate the WAL.
                        if let Some(ref storage) = store.storage {
                            tracing::info!(
                                "📸 Checkpointing {} WAL events to Parquet storage...",
                                wal_new
                            );
                            let parquet = storage.read();
                            let events = store.events.read();
                            let mut buffered = 0usize;
                            for event in events.iter().skip(events.len() - wal_new) {
                                if let Err(e) = parquet.append_event(event.clone()) {
                                    tracing::error!(
                                        "Failed to buffer WAL event for Parquet: {}",
                                        e
                                    );
                                } else {
                                    buffered += 1;
                                }
                            }
                            // Release locks before flush_storage re-acquires them.
                            drop(events);
                            drop(parquet);

                            // Only truncate the WAL once the flush succeeded,
                            // so a failed flush cannot lose events.
                            if buffered > 0 {
                                if let Err(e) = store.flush_storage() {
                                    tracing::error!("Failed to checkpoint to Parquet: {}", e);
                                } else if let Err(e) = wal.truncate() {
                                    tracing::error!(
                                        "Failed to truncate WAL after checkpoint: {}",
                                        e
                                    );
                                } else {
                                    tracing::info!(
                                        "✅ WAL checkpointed and truncated ({} events)",
                                        buffered
                                    );
                                }
                            }
                        }
                    }
                }
                Ok(_) => {
                    tracing::debug!("No events to recover from WAL");
                }
                Err(e) => {
                    tracing::error!("❌ WAL recovery failed: {}", e);
                }
            }
        }

        store
    }
375
    /// Ingests `event` with optimistic concurrency control.
    ///
    /// When `expected_version` is `Some`, the entity's current version must
    /// match exactly or `VersionConflict` is returned and nothing is written.
    /// Returns the entity's new version on success.
    pub fn ingest_with_expected_version(
        &self,
        event: Event,
        expected_version: Option<u64>,
    ) -> Result<u64> {
        self.validate_event(&event)?;

        let entity_id = event.entity_id_str().to_string();

        // The DashMap entry guard is held across check + WAL append + bump,
        // serializing concurrent writers of the same entity.
        let new_version = {
            let mut version_entry = self.entity_versions.entry(entity_id.clone()).or_insert(0);
            let current = *version_entry;

            if let Some(expected) = expected_version
                && current != expected
            {
                return Err(crate::error::AllSourceError::VersionConflict {
                    expected,
                    current,
                });
            }

            // WAL append happens inside the version lock so a conflicting
            // writer cannot slip between the check and the durable write.
            if let Some(ref wal) = self.wal {
                wal.append(event.clone())?;
            }

            *version_entry += 1;
            *version_entry
        };

        // NOTE(review): if `ingest_post_wal` fails here, the version counter
        // and the WAL are already advanced while the in-memory log is not —
        // TODO confirm this partial state is acceptable.
        self.ingest_post_wal(event)?;

        Ok(new_version)
    }
423
    /// Applies an event whose validation and WAL append have already
    /// happened (see `ingest_with_expected_version`): index, projections,
    /// pipelines, Parquet buffer, log append, then fan-out and metrics.
    fn ingest_post_wal(&self, event: Event) -> Result<()> {
        #[cfg(feature = "server")]
        let timer = self.metrics.ingestion_duration_seconds.start_timer();

        // The write lock on `events` is held through indexing/projections so
        // the computed offset stays valid until the push below.
        let mut events = self.events.write();
        let offset = events.len();

        self.index.index_event(
            event.id,
            event.entity_id_str(),
            event.event_type_str(),
            event.timestamp,
            offset,
        )?;

        let projections = self.projections.read();
        projections.process_event(&event)?;
        drop(projections);

        let pipeline_results = self.pipeline_manager.process_event(&event);
        if !pipeline_results.is_empty() {
            tracing::debug!(
                "Event {} processed by {} pipeline(s)",
                event.id,
                pipeline_results.len()
            );
            for (pipeline_id, result) in pipeline_results {
                tracing::trace!("Pipeline {} result: {:?}", pipeline_id, result);
            }
        }

        // Buffer into Parquet (flushed separately via `flush_storage`).
        if let Some(ref storage) = self.storage {
            let storage = storage.read();
            storage.append_event(event.clone())?;
        }

        events.push(event.clone());
        let total_events = events.len();
        drop(events);

        // Fan out only after the event is in the log.
        #[cfg(feature = "server")]
        self.websocket_manager
            .broadcast_event(Arc::new(event.clone()));

        #[cfg(feature = "server")]
        self.dispatch_webhooks(&event);

        self.geo_index.index_event(&event);

        self.schema_evolution
            .analyze_event(event.event_type_str(), &event.payload);

        self.check_auto_snapshot(event.entity_id_str(), &event);

        #[cfg(feature = "server")]
        {
            self.metrics.events_ingested_total.inc();
            self.metrics
                .events_ingested_by_type
                .with_label_values(&[event.event_type_str()])
                .inc();
            self.metrics.storage_events_total.set(total_events as i64);
        }

        let mut total = self.total_ingested.write();
        *total += 1;

        #[cfg(feature = "server")]
        timer.observe_duration();

        tracing::debug!("Event ingested: {} (offset: {})", event.id, offset);

        Ok(())
    }
512
    /// Validates, persists, and applies a single event.
    ///
    /// Order matters for durability: validate → WAL append → version bump →
    /// index → projections → pipelines → Parquet buffer → log append →
    /// fan-out (websocket/webhooks) → auxiliary indexes → metrics.
    ///
    /// NOTE(review): this is a near-duplicate of `ingest_post_wal`; keep the
    /// two paths in sync when either changes.
    pub fn ingest(&self, event: Event) -> Result<()> {
        #[cfg(feature = "server")]
        let timer = self.metrics.ingestion_duration_seconds.start_timer();

        // Reject invalid events before anything is persisted.
        let validation_result = self.validate_event(&event);
        if let Err(e) = validation_result {
            #[cfg(feature = "server")]
            {
                self.metrics.ingestion_errors_total.inc();
                timer.observe_duration();
            }
            return Err(e);
        }

        // Durability first: a failed WAL append rejects the event without
        // touching in-memory state.
        if let Some(ref wal) = self.wal
            && let Err(e) = wal.append(event.clone())
        {
            #[cfg(feature = "server")]
            {
                self.metrics.ingestion_errors_total.inc();
                timer.observe_duration();
            }
            return Err(e);
        }

        // Bump the per-entity version; no expected-version check on this
        // path (see `ingest_with_expected_version` for that).
        *self
            .entity_versions
            .entry(event.entity_id_str().to_string())
            .or_insert(0) += 1;

        let mut events = self.events.write();
        let offset = events.len();

        // NOTE(review): a failure from here on leaves the WAL and version
        // counter ahead of the in-memory log — TODO confirm acceptable.
        self.index.index_event(
            event.id,
            event.entity_id_str(),
            event.event_type_str(),
            event.timestamp,
            offset,
        )?;

        let projections = self.projections.read();
        projections.process_event(&event)?;
        drop(projections);

        let pipeline_results = self.pipeline_manager.process_event(&event);
        if !pipeline_results.is_empty() {
            tracing::debug!(
                "Event {} processed by {} pipeline(s)",
                event.id,
                pipeline_results.len()
            );
            for (pipeline_id, result) in pipeline_results {
                tracing::trace!("Pipeline {} result: {:?}", pipeline_id, result);
            }
        }

        // Buffer into Parquet (flushed separately via `flush_storage`).
        if let Some(ref storage) = self.storage {
            let storage = storage.read();
            storage.append_event(event.clone())?;
        }

        events.push(event.clone());
        let total_events = events.len();
        drop(events);

        // Fan out to live subscribers only after the event is in the log.
        #[cfg(feature = "server")]
        self.websocket_manager
            .broadcast_event(Arc::new(event.clone()));

        #[cfg(feature = "server")]
        self.dispatch_webhooks(&event);

        self.geo_index.index_event(&event);

        self.schema_evolution
            .analyze_event(event.event_type_str(), &event.payload);

        self.check_auto_snapshot(event.entity_id_str(), &event);

        #[cfg(feature = "server")]
        {
            self.metrics.events_ingested_total.inc();
            self.metrics
                .events_ingested_by_type
                .with_label_values(&[event.event_type_str()])
                .inc();
            self.metrics.storage_events_total.set(total_events as i64);
        }

        let mut total = self.total_ingested.write();
        *total += 1;

        #[cfg(feature = "server")]
        timer.observe_duration();

        tracing::debug!("Event ingested: {} (offset: {})", event.id, offset);

        Ok(())
    }
634
635 pub fn ingest_batch(&self, batch: Vec<Event>) -> Result<()> {
642 if batch.is_empty() {
643 return Ok(());
644 }
645
646 for event in &batch {
648 self.validate_event(event)?;
649 }
650
651 if let Some(ref wal) = self.wal {
653 for event in &batch {
654 wal.append(event.clone())?;
655 }
656 }
657
658 let mut events = self.events.write();
660 let projections = self.projections.read();
661
662 for event in batch {
663 let offset = events.len();
664
665 self.index.index_event(
666 event.id,
667 event.entity_id_str(),
668 event.event_type_str(),
669 event.timestamp,
670 offset,
671 )?;
672
673 projections.process_event(&event)?;
674 self.pipeline_manager.process_event(&event);
675
676 if let Some(ref storage) = self.storage {
677 let storage = storage.read();
678 storage.append_event(event.clone())?;
679 }
680
681 self.geo_index.index_event(&event);
682 self.schema_evolution
683 .analyze_event(event.event_type_str(), &event.payload);
684
685 *self
687 .entity_versions
688 .entry(event.entity_id_str().to_string())
689 .or_insert(0) += 1;
690
691 events.push(event);
692 }
693
694 let total_events = events.len();
695 drop(projections);
696 drop(events);
697
698 let mut total = self.total_ingested.write();
699 *total += total_events as u64;
700
701 Ok(())
702 }
703
    /// Applies an event received from a replication peer.
    ///
    /// Unlike `ingest`, this path performs no validation, no WAL append and
    /// no Parquet write — durability is the originating node's job. It also
    /// skips webhooks, geo/schema indexing, and auto-snapshots, but still
    /// updates projections, pipelines, versions, websockets and metrics.
    pub fn ingest_replicated(&self, event: Event) -> Result<()> {
        #[cfg(feature = "server")]
        let timer = self.metrics.ingestion_duration_seconds.start_timer();

        let mut events = self.events.write();
        let offset = events.len();

        self.index.index_event(
            event.id,
            event.entity_id_str(),
            event.event_type_str(),
            event.timestamp,
            offset,
        )?;

        let projections = self.projections.read();
        projections.process_event(&event)?;
        drop(projections);

        let pipeline_results = self.pipeline_manager.process_event(&event);
        if !pipeline_results.is_empty() {
            tracing::debug!(
                "Replicated event {} processed by {} pipeline(s)",
                event.id,
                pipeline_results.len()
            );
        }

        // Keep optimistic-concurrency counters in step with the peer.
        *self
            .entity_versions
            .entry(event.entity_id_str().to_string())
            .or_insert(0) += 1;

        events.push(event.clone());
        let total_events = events.len();
        drop(events);

        #[cfg(feature = "server")]
        self.websocket_manager
            .broadcast_event(Arc::new(event.clone()));

        #[cfg(feature = "server")]
        {
            self.metrics.events_ingested_total.inc();
            self.metrics
                .events_ingested_by_type
                .with_label_values(&[event.event_type_str()])
                .inc();
            self.metrics.storage_events_total.set(total_events as i64);
        }

        let mut total = self.total_ingested.write();
        *total += 1;

        #[cfg(feature = "server")]
        timer.observe_duration();

        tracing::debug!(
            "Replicated event ingested: {} (offset: {})",
            event.id,
            offset
        );

        Ok(())
    }
782
783 pub fn get_entity_version(&self, entity_id: &str) -> u64 {
786 self.entity_versions
787 .get(entity_id)
788 .map(|v| *v)
789 .unwrap_or(0)
790 }
791
    /// Borrowed access to the consumer-group registry.
    pub fn consumer_registry(&self) -> &ConsumerRegistry {
        &self.consumer_registry
    }

    /// Number of events currently held in the in-memory log.
    pub fn total_events(&self) -> usize {
        self.events.read().len()
    }
801
    /// Returns up to `limit` events at log positions >= `offset` that match
    /// `filters`, each paired with a resume offset.
    ///
    /// The paired offset is `index + 1` — i.e. the offset a consumer should
    /// pass back to continue after that event (presumably a 1-based "events
    /// consumed so far" cursor; TODO confirm against consumer callers).
    pub fn events_after_offset(
        &self,
        offset: u64,
        filters: &[String],
        limit: usize,
    ) -> Vec<(u64, Event)> {
        let events = self.events.read();
        let start = offset as usize;
        if start >= events.len() {
            return vec![];
        }

        events[start..]
            .iter()
            .enumerate()
            .filter(|(_, event)| {
                ConsumerRegistry::matches_filters(event.event_type_str(), filters)
            })
            .take(limit)
            // `i` is the index within the slice, so the absolute position is
            // `start + i`; the cursor handed back is one past it.
            .map(|(i, event)| ((start + i + 1) as u64, event.clone()))
            .collect()
    }
826
    /// Shared handle to the WebSocket fan-out manager (server builds only).
    #[cfg(feature = "server")]
    pub fn websocket_manager(&self) -> Arc<WebSocketManager> {
        Arc::clone(&self.websocket_manager)
    }

    /// Shared handle to the snapshot manager.
    pub fn snapshot_manager(&self) -> Arc<SnapshotManager> {
        Arc::clone(&self.snapshot_manager)
    }

    /// Compaction manager, present only when Parquet storage is configured.
    pub fn compaction_manager(&self) -> Option<Arc<CompactionManager>> {
        self.compaction_manager.as_ref().map(Arc::clone)
    }

    /// Shared handle to the schema registry.
    pub fn schema_registry(&self) -> Arc<SchemaRegistry> {
        Arc::clone(&self.schema_registry)
    }

    /// Shared handle to the replay manager.
    pub fn replay_manager(&self) -> Arc<ReplayManager> {
        Arc::clone(&self.replay_manager)
    }

    /// Shared handle to the pipeline manager.
    pub fn pipeline_manager(&self) -> Arc<PipelineManager> {
        Arc::clone(&self.pipeline_manager)
    }

    /// Shared handle to the Prometheus metrics registry (server builds only).
    #[cfg(feature = "server")]
    pub fn metrics(&self) -> Arc<MetricsRegistry> {
        Arc::clone(&self.metrics)
    }

    /// Read-guarded access to the projection manager. The read lock is held
    /// for the lifetime of the returned guard, blocking projection writes.
    pub fn projection_manager(&self) -> parking_lot::RwLockReadGuard<'_, ProjectionManager> {
        self.projections.read()
    }
868
869 pub fn register_projection(
878 &self,
879 projection: Arc<dyn crate::application::services::projection::Projection>,
880 ) {
881 let mut pm = self.projections.write();
882 pm.register(projection);
883 }
884
    /// Registers a projection and replays the entire existing event log
    /// through it.
    ///
    /// NOTE(review): the write lock is released before the backfill read
    /// starts, so an event ingested in that window would be processed twice
    /// (once live via the manager, once by the backfill) — TODO confirm.
    pub fn register_projection_with_backfill(
        &self,
        projection: Arc<dyn crate::application::services::projection::Projection>,
    ) -> Result<()> {
        {
            let mut pm = self.projections.write();
            pm.register(Arc::clone(&projection));
        }

        // Backfill: feed every stored event directly to the new projection,
        // aborting on the first processing error.
        let events = self.events.read();
        for event in events.iter() {
            projection.process(event)?;
        }

        Ok(())
    }
908
909 pub fn projection_state_cache(&self) -> Arc<DashMap<String, serde_json::Value>> {
912 Arc::clone(&self.projection_state_cache)
913 }
914
915 pub fn projection_status(&self) -> Arc<DashMap<String, String>> {
917 Arc::clone(&self.projection_status)
918 }
919
920 pub fn geo_index(&self) -> Arc<GeoIndex> {
923 self.geo_index.clone()
924 }
925
926 pub fn exactly_once(&self) -> Arc<ExactlyOnceRegistry> {
928 self.exactly_once.clone()
929 }
930
931 pub fn schema_evolution(&self) -> Arc<SchemaEvolutionManager> {
933 self.schema_evolution.clone()
934 }
935
936 pub fn snapshot_events(&self) -> Vec<Event> {
942 self.events.read().clone()
943 }
944
    /// Replaces every event of `token_event_type` belonging to `entity_id`
    /// with the single `merged_event`, then rebuilds the whole index (all
    /// offsets shift after the retain).
    ///
    /// Returns `Ok(false)` when the entity has no matching token events.
    ///
    /// NOTE(review): the merged event is appended to the WAL, but Parquet
    /// storage is not rewritten — a restart that reloads from Parquet could
    /// resurrect the compacted token events. TODO confirm intended.
    /// NOTE(review): projections only receive the merged event; state they
    /// derived from the removed token events is not rewound.
    pub fn compact_entity_tokens(
        &self,
        entity_id: &str,
        token_event_type: &str,
        merged_event: Event,
    ) -> Result<bool> {
        // Cheap pre-check under a read lock before taking the write lock.
        {
            let events = self.events.read();
            let has_tokens = events
                .iter()
                .any(|e| e.entity_id_str() == entity_id && e.event_type_str() == token_event_type);
            if !has_tokens {
                return Ok(false);
            }
        }

        let projections = self.projections.read();
        projections.process_event(&merged_event)?;
        drop(projections);

        let mut events = self.events.write();

        // Drop every token event for this entity, then append the merged one.
        events.retain(|e| {
            !(e.entity_id_str() == entity_id && e.event_type_str() == token_event_type)
        });

        events.push(merged_event.clone());

        // Persist the merged event so the compaction survives WAL replay.
        if let Some(ref wal) = self.wal {
            wal.append(merged_event)?;
        }

        // Offsets changed wholesale; rebuild the index from scratch.
        // Indexing failures are logged but do not abort the compaction.
        self.index.clear();
        for (offset, event) in events.iter().enumerate() {
            if let Err(e) = self.index.index_event(
                event.id,
                event.entity_id_str(),
                event.event_type_str(),
                event.timestamp,
                offset,
            ) {
                tracing::warn!(
                    event_id = %event.id,
                    offset,
                    "Failed to re-index event during compaction: {e}"
                );
            }
        }

        Ok(true)
    }
1027
    /// Shared handle to the webhook subscription registry (server builds only).
    #[cfg(feature = "server")]
    pub fn webhook_registry(&self) -> Arc<WebhookRegistry> {
        Arc::clone(&self.webhook_registry)
    }

    /// Installs the channel used to hand deliveries to the webhook worker.
    /// Until this is called, matched webhooks are silently dropped.
    #[cfg(feature = "server")]
    pub fn set_webhook_tx(&self, tx: mpsc::UnboundedSender<WebhookDeliveryTask>) {
        *self.webhook_tx.write() = Some(tx);
        tracing::info!("Webhook delivery channel connected");
    }
1040
1041 #[cfg(feature = "server")]
1043 fn dispatch_webhooks(&self, event: &Event) {
1044 let matching = self.webhook_registry.find_matching(event);
1045 if matching.is_empty() {
1046 return;
1047 }
1048
1049 let tx_guard = self.webhook_tx.read();
1050 if let Some(ref tx) = *tx_guard {
1051 for webhook in matching {
1052 let task = WebhookDeliveryTask {
1053 webhook,
1054 event: event.clone(),
1055 };
1056 if let Err(e) = tx.send(task) {
1057 tracing::warn!("Failed to queue webhook delivery: {}", e);
1058 }
1059 }
1060 }
1061 }
1062
1063 pub fn flush_storage(&self) -> Result<()> {
1065 if let Some(ref storage) = self.storage {
1066 let storage = storage.read();
1067 storage.flush()?;
1068 tracing::info!("✅ Flushed events to persistent storage");
1069 }
1070 Ok(())
1071 }
1072
    /// Builds a manual snapshot for `entity_id` by shallow-merging every
    /// event payload in timestamp order (later events overwrite earlier
    /// keys) and storing the result with the snapshot manager.
    ///
    /// # Errors
    /// `EntityNotFound` when the entity has no events.
    pub fn create_snapshot(&self, entity_id: &str) -> Result<()> {
        let events = self.query(QueryEventsRequest {
            entity_id: Some(entity_id.to_string()),
            event_type: None,
            tenant_id: None,
            as_of: None,
            since: None,
            until: None,
            limit: None,
            event_type_prefix: None,
            payload_filter: None,
        })?;

        if events.is_empty() {
            return Err(AllSourceError::EntityNotFound(entity_id.to_string()));
        }

        // Shallow merge: only top-level payload keys are combined; non-object
        // payloads are skipped by the `if let` guard.
        let mut state = serde_json::json!({});
        for event in &events {
            if let serde_json::Value::Object(ref mut state_map) = state
                && let serde_json::Value::Object(ref payload_map) = event.payload
            {
                for (key, value) in payload_map {
                    state_map.insert(key.clone(), value.clone());
                }
            }
        }

        // Safe: emptiness was checked above.
        let last_event = events.last().unwrap();
        self.snapshot_manager.create_snapshot(
            entity_id.to_string(),
            state,
            last_event.timestamp,
            events.len(),
            SnapshotType::Manual,
        )?;

        Ok(())
    }
1115
1116 fn check_auto_snapshot(&self, entity_id: &str, event: &Event) {
1118 let entity_event_count = self
1120 .index
1121 .get_by_entity(entity_id)
1122 .map(|entries| entries.len())
1123 .unwrap_or(0);
1124
1125 if self.snapshot_manager.should_create_snapshot(
1126 entity_id,
1127 entity_event_count,
1128 event.timestamp,
1129 ) {
1130 if let Err(e) = self.create_snapshot(entity_id) {
1132 tracing::warn!(
1133 "Failed to create automatic snapshot for {}: {}",
1134 entity_id,
1135 e
1136 );
1137 }
1138 }
1139 }
1140
1141 fn validate_event(&self, event: &Event) -> Result<()> {
1143 if event.entity_id_str().is_empty() {
1146 return Err(AllSourceError::ValidationError(
1147 "entity_id cannot be empty".to_string(),
1148 ));
1149 }
1150
1151 if event.event_type_str().is_empty() {
1152 return Err(AllSourceError::ValidationError(
1153 "event_type cannot be empty".to_string(),
1154 ));
1155 }
1156
1157 if event.event_type().is_system() {
1160 return Err(AllSourceError::ValidationError(
1161 "Event types starting with '_system.' are reserved for internal use".to_string(),
1162 ));
1163 }
1164
1165 Ok(())
1166 }
1167
    /// Clears the named projection (and its `"<name>:"`-prefixed entries in
    /// the state cache) and replays the entire event log through it.
    ///
    /// Returns the number of events that reprocessed successfully; per-event
    /// failures are skipped, not propagated.
    ///
    /// NOTE(review): only a read lock is held on the projection manager, so
    /// events ingested concurrently can interleave with the replay — TODO
    /// confirm the projection tolerates duplicate processing.
    pub fn reset_projection(&self, name: &str) -> Result<usize> {
        let projection_manager = self.projections.read();
        let projection = projection_manager.get_projection(name).ok_or_else(|| {
            AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
        })?;

        projection.clear();

        // Evict this projection's cached state entries.
        let prefix = format!("{name}:");
        let keys_to_remove: Vec<String> = self
            .projection_state_cache
            .iter()
            .filter(|entry| entry.key().starts_with(&prefix))
            .map(|entry| entry.key().clone())
            .collect();
        for key in keys_to_remove {
            self.projection_state_cache.remove(&key);
        }

        // Full replay; count successes only.
        let events = self.events.read();
        let mut reprocessed = 0usize;
        for event in events.iter() {
            if projection.process(event).is_ok() {
                reprocessed += 1;
            }
        }

        Ok(reprocessed)
    }
1201
1202 pub fn get_event_by_id(&self, event_id: &uuid::Uuid) -> Result<Option<Event>> {
1204 if let Some(offset) = self.index.get_by_id(event_id) {
1205 let events = self.events.read();
1206 Ok(events.get(offset).cloned())
1207 } else {
1208 Ok(None)
1209 }
1210 }
1211
1212 pub fn query(&self, request: QueryEventsRequest) -> Result<Vec<Event>> {
1214 let query_type = if request.entity_id.is_some() {
1216 "entity"
1217 } else if request.event_type.is_some() {
1218 "type"
1219 } else if request.event_type_prefix.is_some() {
1220 "type_prefix"
1221 } else {
1222 "full_scan"
1223 };
1224
1225 #[cfg(feature = "server")]
1227 let timer = self
1228 .metrics
1229 .query_duration_seconds
1230 .with_label_values(&[query_type])
1231 .start_timer();
1232
1233 #[cfg(feature = "server")]
1235 self.metrics
1236 .queries_total
1237 .with_label_values(&[query_type])
1238 .inc();
1239
1240 let events = self.events.read();
1241
1242 let offsets: Vec<usize> = if let Some(entity_id) = &request.entity_id {
1244 self.index
1246 .get_by_entity(entity_id)
1247 .map(|entries| self.filter_entries(entries, &request))
1248 .unwrap_or_default()
1249 } else if let Some(event_type) = &request.event_type {
1250 self.index
1252 .get_by_type(event_type)
1253 .map(|entries| self.filter_entries(entries, &request))
1254 .unwrap_or_default()
1255 } else if let Some(prefix) = &request.event_type_prefix {
1256 let entries = self.index.get_by_type_prefix(prefix);
1258 self.filter_entries(entries, &request)
1259 } else {
1260 (0..events.len()).collect()
1262 };
1263
1264 let mut results: Vec<Event> = offsets
1266 .iter()
1267 .filter_map(|&offset| events.get(offset).cloned())
1268 .filter(|event| self.apply_filters(event, &request))
1269 .collect();
1270
1271 results.sort_by_key(|x| x.timestamp);
1273
1274 if let Some(limit) = request.limit {
1276 results.truncate(limit);
1277 }
1278
1279 #[cfg(feature = "server")]
1281 {
1282 self.metrics
1283 .query_results_total
1284 .with_label_values(&[query_type])
1285 .inc_by(results.len() as u64);
1286 timer.observe_duration();
1287 }
1288
1289 Ok(results)
1290 }
1291
1292 fn filter_entries(&self, entries: Vec<IndexEntry>, request: &QueryEventsRequest) -> Vec<usize> {
1294 entries
1295 .into_iter()
1296 .filter(|entry| {
1297 if let Some(as_of) = request.as_of
1299 && entry.timestamp > as_of
1300 {
1301 return false;
1302 }
1303 if let Some(since) = request.since
1304 && entry.timestamp < since
1305 {
1306 return false;
1307 }
1308 if let Some(until) = request.until
1309 && entry.timestamp > until
1310 {
1311 return false;
1312 }
1313 true
1314 })
1315 .map(|entry| entry.offset)
1316 .collect()
1317 }
1318
1319 fn apply_filters(&self, event: &Event, request: &QueryEventsRequest) -> bool {
1321 if request.entity_id.is_some()
1323 && let Some(ref event_type) = request.event_type
1324 && event.event_type_str() != event_type
1325 {
1326 return false;
1327 }
1328
1329 if request.entity_id.is_some()
1331 && let Some(ref prefix) = request.event_type_prefix
1332 && !event.event_type_str().starts_with(prefix)
1333 {
1334 return false;
1335 }
1336
1337 if let Some(ref filter_str) = request.payload_filter
1339 && let Ok(filter_obj) =
1340 serde_json::from_str::<serde_json::Map<String, serde_json::Value>>(filter_str)
1341 {
1342 let payload = event.payload();
1343 for (key, expected_value) in &filter_obj {
1344 match payload.get(key) {
1345 Some(actual_value) if actual_value == expected_value => {}
1346 _ => return false,
1347 }
1348 }
1349 }
1350
1351 true
1352 }
1353
    /// Reconstructs an entity's state at `as_of` (or now, when `None`).
    ///
    /// Fast path: start from the newest usable snapshot and replay only the
    /// events since it; otherwise replay everything. State is a shallow
    /// merge of payloads in timestamp order (later keys win).
    ///
    /// # Errors
    /// `EntityNotFound` when no snapshot exists and no events match.
    pub fn reconstruct_state(
        &self,
        entity_id: &str,
        as_of: Option<DateTime<Utc>>,
    ) -> Result<serde_json::Value> {
        // Pick a snapshot baseline and the timestamp to replay from.
        let (merged_state, since_timestamp) = if let Some(as_of_time) = as_of {
            if let Some(snapshot) = self
                .snapshot_manager
                .get_snapshot_as_of(entity_id, as_of_time)
            {
                tracing::debug!(
                    "Using snapshot from {} for entity {} (saved {} events)",
                    snapshot.as_of,
                    entity_id,
                    snapshot.event_count
                );
                (snapshot.state.clone(), Some(snapshot.as_of))
            } else {
                (serde_json::json!({}), None)
            }
        } else {
            if let Some(snapshot) = self.snapshot_manager.get_latest_snapshot(entity_id) {
                tracing::debug!(
                    "Using latest snapshot from {} for entity {}",
                    snapshot.as_of,
                    entity_id
                );
                (snapshot.state.clone(), Some(snapshot.as_of))
            } else {
                (serde_json::json!({}), None)
            }
        };

        // Only the events after the snapshot (if any) need replaying.
        let events = self.query(QueryEventsRequest {
            entity_id: Some(entity_id.to_string()),
            event_type: None,
            tenant_id: None,
            as_of,
            since: since_timestamp,
            until: None,
            limit: None,
            event_type_prefix: None,
            payload_filter: None,
        })?;

        // A snapshot with no newer events is still a valid answer; only fail
        // when there is neither a snapshot nor any event.
        if events.is_empty() && since_timestamp.is_none() {
            return Err(AllSourceError::EntityNotFound(entity_id.to_string()));
        }

        // Shallow merge: non-object payloads are skipped by the guard.
        let mut merged_state = merged_state;
        for event in &events {
            if let serde_json::Value::Object(ref mut state_map) = merged_state
                && let serde_json::Value::Object(ref payload_map) = event.payload
            {
                for (key, value) in payload_map {
                    state_map.insert(key.clone(), value.clone());
                }
            }
        }

        // Note: `event_count`/`history` cover only the replayed tail, not
        // events folded into the snapshot baseline.
        let state = serde_json::json!({
            "entity_id": entity_id,
            "last_updated": events.last().map(|e| e.timestamp),
            "event_count": events.len(),
            "as_of": as_of,
            "current_state": merged_state,
            "history": events.iter().map(|e| {
                serde_json::json!({
                    "event_id": e.id,
                    "type": e.event_type,
                    "timestamp": e.timestamp,
                    "payload": e.payload
                })
            }).collect::<Vec<_>>()
        });

        Ok(state)
    }
1441
1442 pub fn get_snapshot(&self, entity_id: &str) -> Result<serde_json::Value> {
1444 let projections = self.projections.read();
1445
1446 if let Some(snapshot_projection) = projections.get_projection("entity_snapshots")
1447 && let Some(state) = snapshot_projection.get_state(entity_id)
1448 {
1449 return Ok(serde_json::json!({
1450 "entity_id": entity_id,
1451 "snapshot": state,
1452 "from_projection": "entity_snapshots"
1453 }));
1454 }
1455
1456 Err(AllSourceError::EntityNotFound(entity_id.to_string()))
1457 }
1458
1459 pub fn stats(&self) -> StoreStats {
1461 let events = self.events.read();
1462 let index_stats = self.index.stats();
1463
1464 StoreStats {
1465 total_events: events.len(),
1466 total_entities: index_stats.total_entities,
1467 total_event_types: index_stats.total_event_types,
1468 total_ingested: *self.total_ingested.read(),
1469 }
1470 }
1471
1472 pub fn list_streams(&self) -> Vec<StreamInfo> {
1474 self.index
1475 .get_all_entities()
1476 .into_iter()
1477 .map(|entity_id| {
1478 let event_count = self
1479 .index
1480 .get_by_entity(&entity_id)
1481 .map(|entries| entries.len())
1482 .unwrap_or(0);
1483 let last_event_at = self
1484 .index
1485 .get_by_entity(&entity_id)
1486 .and_then(|entries| entries.last().map(|e| e.timestamp));
1487 StreamInfo {
1488 stream_id: entity_id,
1489 event_count,
1490 last_event_at,
1491 }
1492 })
1493 .collect()
1494 }
1495
1496 pub fn list_event_types(&self) -> Vec<EventTypeInfo> {
1498 self.index
1499 .get_all_types()
1500 .into_iter()
1501 .map(|event_type| {
1502 let event_count = self
1503 .index
1504 .get_by_type(&event_type)
1505 .map(|entries| entries.len())
1506 .unwrap_or(0);
1507 let last_event_at = self
1508 .index
1509 .get_by_type(&event_type)
1510 .and_then(|entries| entries.last().map(|e| e.timestamp));
1511 EventTypeInfo {
1512 event_type,
1513 event_count,
1514 last_event_at,
1515 }
1516 })
1517 .collect()
1518 }
1519
1520 pub fn enable_wal_replication(
1527 &self,
1528 tx: tokio::sync::broadcast::Sender<crate::infrastructure::persistence::wal::WALEntry>,
1529 ) {
1530 if let Some(ref wal_arc) = self.wal {
1531 wal_arc.set_replication_tx(tx);
1532 tracing::info!("WAL replication broadcast enabled");
1533 } else {
1534 tracing::warn!("Cannot enable WAL replication: WAL is not configured");
1535 }
1536 }
1537
    /// Borrow the write-ahead log handle, if WAL persistence is configured.
    pub fn wal(&self) -> Option<&Arc<WriteAheadLog>> {
        self.wal.as_ref()
    }
1543
    /// Borrow the Parquet storage handle, if on-disk persistence is configured.
    pub fn parquet_storage(&self) -> Option<&Arc<RwLock<ParquetStorage>>> {
        self.storage.as_ref()
    }
1549}
1550
/// Construction options for `EventStore`. All fields default to the in-memory,
/// no-persistence configuration.
#[derive(Debug, Clone, Default)]
pub struct EventStoreConfig {
    /// Directory for Parquet event storage; `None` keeps events in memory only.
    pub storage_dir: Option<PathBuf>,

    /// Snapshot policy passed to the `SnapshotManager`.
    pub snapshot_config: SnapshotConfig,

    /// Directory for the write-ahead log; `None` disables the WAL.
    pub wal_dir: Option<PathBuf>,

    /// WAL tuning (e.g. `sync_on_write`).
    pub wal_config: WALConfig,

    /// Background compaction policy.
    pub compaction_config: CompactionConfig,

    /// Schema registry configuration.
    pub schema_registry_config: SchemaRegistryConfig,

    /// Directory for internal system data; when `None`,
    /// `effective_system_data_dir` derives `<storage_dir>/__system` instead.
    pub system_data_dir: Option<PathBuf>,

    /// Optional tenant bootstrapped at startup (consumed elsewhere — usage is
    /// not visible in this file).
    pub bootstrap_tenant: Option<String>,
}
1581
1582impl EventStoreConfig {
1583 pub fn with_persistence(storage_dir: impl Into<PathBuf>) -> Self {
1585 Self {
1586 storage_dir: Some(storage_dir.into()),
1587 ..Self::default()
1588 }
1589 }
1590
1591 pub fn with_snapshots(snapshot_config: SnapshotConfig) -> Self {
1593 Self {
1594 snapshot_config,
1595 ..Self::default()
1596 }
1597 }
1598
1599 pub fn with_wal(wal_dir: impl Into<PathBuf>, wal_config: WALConfig) -> Self {
1601 Self {
1602 wal_dir: Some(wal_dir.into()),
1603 wal_config,
1604 ..Self::default()
1605 }
1606 }
1607
1608 pub fn with_all(storage_dir: impl Into<PathBuf>, snapshot_config: SnapshotConfig) -> Self {
1610 Self {
1611 storage_dir: Some(storage_dir.into()),
1612 snapshot_config,
1613 ..Self::default()
1614 }
1615 }
1616
1617 pub fn production(
1619 storage_dir: impl Into<PathBuf>,
1620 wal_dir: impl Into<PathBuf>,
1621 snapshot_config: SnapshotConfig,
1622 wal_config: WALConfig,
1623 compaction_config: CompactionConfig,
1624 ) -> Self {
1625 let storage_dir = storage_dir.into();
1626 let system_data_dir = storage_dir.join("__system");
1627 Self {
1628 storage_dir: Some(storage_dir),
1629 snapshot_config,
1630 wal_dir: Some(wal_dir.into()),
1631 wal_config,
1632 compaction_config,
1633 system_data_dir: Some(system_data_dir),
1634 ..Self::default()
1635 }
1636 }
1637
1638 pub fn effective_system_data_dir(&self) -> Option<PathBuf> {
1643 self.system_data_dir
1644 .clone()
1645 .or_else(|| self.storage_dir.as_ref().map(|d| d.join("__system")))
1646 }
1647
1648 pub fn from_env() -> (Self, &'static str) {
1656 Self::from_env_vars(
1657 std::env::var("ALLSOURCE_DATA_DIR")
1658 .ok()
1659 .filter(|s| !s.is_empty()),
1660 std::env::var("ALLSOURCE_STORAGE_DIR")
1661 .ok()
1662 .filter(|s| !s.is_empty()),
1663 std::env::var("ALLSOURCE_WAL_DIR")
1664 .ok()
1665 .filter(|s| !s.is_empty()),
1666 std::env::var("ALLSOURCE_WAL_ENABLED").ok(),
1667 )
1668 }
1669
1670 pub fn from_env_vars(
1672 data_dir: Option<String>,
1673 explicit_storage_dir: Option<String>,
1674 explicit_wal_dir: Option<String>,
1675 wal_enabled_var: Option<String>,
1676 ) -> (Self, &'static str) {
1677 let data_dir = data_dir.filter(|s| !s.is_empty());
1678 let storage_dir = explicit_storage_dir
1679 .filter(|s| !s.is_empty())
1680 .or_else(|| data_dir.as_ref().map(|d| format!("{}/storage", d)));
1681 let wal_dir = explicit_wal_dir
1682 .filter(|s| !s.is_empty())
1683 .or_else(|| data_dir.as_ref().map(|d| format!("{}/wal", d)));
1684 let wal_enabled = wal_enabled_var.map(|v| v == "true").unwrap_or(true);
1685
1686 match (&storage_dir, &wal_dir) {
1687 (Some(sd), Some(wd)) if wal_enabled => {
1688 let config = Self::production(
1689 sd,
1690 wd,
1691 SnapshotConfig::default(),
1692 WALConfig::default(),
1693 CompactionConfig::default(),
1694 );
1695 (config, "wal+parquet")
1696 }
1697 (Some(sd), _) => {
1698 let config = Self::with_persistence(sd);
1699 (config, "parquet-only")
1700 }
1701 (_, Some(wd)) if wal_enabled => {
1702 let config = Self::with_wal(wd, WALConfig::default());
1703 (config, "wal-only")
1704 }
1705 _ => (Self::default(), "in-memory"),
1706 }
1707 }
1708}
1709
/// Aggregate counters returned by `EventStore::stats`.
#[derive(Debug, serde::Serialize)]
pub struct StoreStats {
    /// Events currently held in the in-memory log.
    pub total_events: usize,
    /// Distinct entity ids known to the index.
    pub total_entities: usize,
    /// Distinct event types known to the index.
    pub total_event_types: usize,
    /// Lifetime ingest counter (tracked separately from `total_events`).
    pub total_ingested: u64,
}
1717
1718#[derive(Debug, Clone, serde::Serialize)]
1720pub struct StreamInfo {
1721 pub stream_id: String,
1723 pub event_count: usize,
1725 pub last_event_at: Option<chrono::DateTime<chrono::Utc>>,
1727}
1728
1729#[derive(Debug, Clone, serde::Serialize)]
1731pub struct EventTypeInfo {
1732 pub event_type: String,
1734 pub event_count: usize,
1736 pub last_event_at: Option<chrono::DateTime<chrono::Utc>>,
1738}
1739
1740impl Default for EventStore {
1741 fn default() -> Self {
1742 Self::new()
1743 }
1744}
1745
#[cfg(test)]
mod tests {
    //! Unit tests for `EventStore` (ingest, query, state reconstruction,
    //! snapshots, manager accessors) and for `EventStoreConfig`
    //! (constructors and env-var driven configuration).
    //!
    //! Fix in this revision: `test_schema_registry` contained a mis-encoded
    //! `®istry` token (mojibake for `&registry`) that did not compile.

    use super::*;
    use crate::domain::entities::Event;
    use tempfile::TempDir;

    /// Build a minimal event with a fixed `{"name": "Test", "value": 42}` payload.
    fn create_test_event(entity_id: &str, event_type: &str) -> Event {
        Event::from_strings(
            event_type.to_string(),
            entity_id.to_string(),
            "default".to_string(),
            serde_json::json!({"name": "Test", "value": 42}),
            None,
        )
        .unwrap()
    }

    /// Build an event with a caller-supplied payload.
    fn create_test_event_with_payload(
        entity_id: &str,
        event_type: &str,
        payload: serde_json::Value,
    ) -> Event {
        Event::from_strings(
            event_type.to_string(),
            entity_id.to_string(),
            "default".to_string(),
            payload,
            None,
        )
        .unwrap()
    }

    // --- construction & ingestion ---

    #[test]
    fn test_event_store_new() {
        let store = EventStore::new();
        assert_eq!(store.stats().total_events, 0);
        assert_eq!(store.stats().total_entities, 0);
    }

    #[test]
    fn test_event_store_default() {
        let store = EventStore::default();
        assert_eq!(store.stats().total_events, 0);
    }

    #[test]
    fn test_ingest_single_event() {
        let store = EventStore::new();
        let event = create_test_event("entity-1", "user.created");

        store.ingest(event).unwrap();

        assert_eq!(store.stats().total_events, 1);
        assert_eq!(store.stats().total_ingested, 1);
    }

    #[test]
    fn test_ingest_multiple_events() {
        let store = EventStore::new();

        for i in 0..10 {
            let event = create_test_event(&format!("entity-{}", i), "user.created");
            store.ingest(event).unwrap();
        }

        assert_eq!(store.stats().total_events, 10);
        assert_eq!(store.stats().total_ingested, 10);
    }

    // --- querying ---

    #[test]
    fn test_query_by_entity_id() {
        let store = EventStore::new();

        store
            .ingest(create_test_event("entity-1", "user.created"))
            .unwrap();
        store
            .ingest(create_test_event("entity-2", "user.created"))
            .unwrap();
        store
            .ingest(create_test_event("entity-1", "user.updated"))
            .unwrap();

        let results = store
            .query(QueryEventsRequest {
                entity_id: Some("entity-1".to_string()),
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();

        assert_eq!(results.len(), 2);
    }

    #[test]
    fn test_query_by_event_type() {
        let store = EventStore::new();

        store
            .ingest(create_test_event("entity-1", "user.created"))
            .unwrap();
        store
            .ingest(create_test_event("entity-2", "user.updated"))
            .unwrap();
        store
            .ingest(create_test_event("entity-3", "user.created"))
            .unwrap();

        let results = store
            .query(QueryEventsRequest {
                entity_id: None,
                event_type: Some("user.created".to_string()),
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();

        assert_eq!(results.len(), 2);
    }

    #[test]
    fn test_query_with_limit() {
        let store = EventStore::new();

        for i in 0..10 {
            let event = create_test_event(&format!("entity-{}", i), "user.created");
            store.ingest(event).unwrap();
        }

        let results = store
            .query(QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: Some(5),
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();

        assert_eq!(results.len(), 5);
    }

    #[test]
    fn test_query_empty_store() {
        let store = EventStore::new();

        let results = store
            .query(QueryEventsRequest {
                entity_id: Some("non-existent".to_string()),
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();

        assert!(results.is_empty());
    }

    // --- state reconstruction & snapshots ---

    #[test]
    fn test_reconstruct_state() {
        let store = EventStore::new();

        store
            .ingest(create_test_event("entity-1", "user.created"))
            .unwrap();

        let state = store.reconstruct_state("entity-1", None).unwrap();
        assert_eq!(state["current_state"]["name"], "Test");
        assert_eq!(state["current_state"]["value"], 42);
    }

    #[test]
    fn test_reconstruct_state_not_found() {
        let store = EventStore::new();

        let result = store.reconstruct_state("non-existent", None);
        assert!(result.is_err());
    }

    #[test]
    fn test_get_snapshot_empty() {
        let store = EventStore::new();

        let result = store.get_snapshot("non-existent");
        assert!(result.is_err());
    }

    #[test]
    fn test_create_snapshot() {
        let store = EventStore::new();

        store
            .ingest(create_test_event("entity-1", "user.created"))
            .unwrap();

        store.create_snapshot("entity-1").unwrap();

        let snapshot = store.get_snapshot("entity-1").unwrap();
        assert!(snapshot != serde_json::json!(null));
    }

    #[test]
    fn test_create_snapshot_entity_not_found() {
        let store = EventStore::new();

        let result = store.create_snapshot("non-existent");
        assert!(result.is_err());
    }

    // --- manager accessors (smoke tests: the Arc is alive and shared) ---

    #[test]
    fn test_websocket_manager() {
        let store = EventStore::new();
        let manager = store.websocket_manager();
        assert!(Arc::strong_count(&manager) >= 1);
    }

    #[test]
    fn test_snapshot_manager() {
        let store = EventStore::new();
        let manager = store.snapshot_manager();
        assert!(Arc::strong_count(&manager) >= 1);
    }

    #[test]
    fn test_compaction_manager_none() {
        // No compaction without persistent storage configured.
        let store = EventStore::new();
        assert!(store.compaction_manager().is_none());
    }

    #[test]
    fn test_schema_registry() {
        let store = EventStore::new();
        let registry = store.schema_registry();
        assert!(Arc::strong_count(&registry) >= 1);
    }

    #[test]
    fn test_replay_manager() {
        let store = EventStore::new();
        let manager = store.replay_manager();
        assert!(Arc::strong_count(&manager) >= 1);
    }

    #[test]
    fn test_pipeline_manager() {
        let store = EventStore::new();
        let manager = store.pipeline_manager();
        assert!(Arc::strong_count(&manager) >= 1);
    }

    #[test]
    fn test_projection_manager() {
        let store = EventStore::new();
        let manager = store.projection_manager();
        let projections = manager.list_projections();
        // A fresh store registers built-in projections (entity snapshots,
        // event counters — see the imports at the top of this file).
        assert!(projections.len() >= 2);
    }

    #[test]
    fn test_projection_state_cache() {
        let store = EventStore::new();
        let cache = store.projection_state_cache();

        cache.insert("test:key".to_string(), serde_json::json!({"value": 123}));
        assert_eq!(cache.len(), 1);

        let value = cache.get("test:key").unwrap();
        assert_eq!(value["value"], 123);
    }

    #[test]
    fn test_metrics() {
        let store = EventStore::new();
        let metrics = store.metrics();
        assert!(Arc::strong_count(&metrics) >= 1);
    }

    #[test]
    fn test_store_stats() {
        let store = EventStore::new();

        store
            .ingest(create_test_event("entity-1", "user.created"))
            .unwrap();
        store
            .ingest(create_test_event("entity-2", "order.placed"))
            .unwrap();

        let stats = store.stats();
        assert_eq!(stats.total_events, 2);
        assert_eq!(stats.total_entities, 2);
        assert_eq!(stats.total_event_types, 2);
        assert_eq!(stats.total_ingested, 2);
    }

    // --- EventStoreConfig constructors ---

    #[test]
    fn test_event_store_config_default() {
        let config = EventStoreConfig::default();
        assert!(config.storage_dir.is_none());
        assert!(config.wal_dir.is_none());
    }

    #[test]
    fn test_event_store_config_with_persistence() {
        let temp_dir = TempDir::new().unwrap();
        let config = EventStoreConfig::with_persistence(temp_dir.path());

        assert!(config.storage_dir.is_some());
        assert!(config.wal_dir.is_none());
    }

    #[test]
    fn test_event_store_config_with_wal() {
        let temp_dir = TempDir::new().unwrap();
        let config = EventStoreConfig::with_wal(temp_dir.path(), WALConfig::default());

        assert!(config.storage_dir.is_none());
        assert!(config.wal_dir.is_some());
    }

    #[test]
    fn test_event_store_config_with_all() {
        let temp_dir = TempDir::new().unwrap();
        let config = EventStoreConfig::with_all(temp_dir.path(), SnapshotConfig::default());

        assert!(config.storage_dir.is_some());
    }

    #[test]
    fn test_event_store_config_production() {
        let storage_dir = TempDir::new().unwrap();
        let wal_dir = TempDir::new().unwrap();
        let config = EventStoreConfig::production(
            storage_dir.path(),
            wal_dir.path(),
            SnapshotConfig::default(),
            WALConfig::default(),
            CompactionConfig::default(),
        );

        assert!(config.storage_dir.is_some());
        assert!(config.wal_dir.is_some());
    }

    // --- EventStoreConfig::from_env_vars ---

    #[test]
    fn test_from_env_vars_data_dir_enables_full_persistence() {
        let (config, mode) =
            EventStoreConfig::from_env_vars(Some("/app/data".to_string()), None, None, None);
        assert_eq!(mode, "wal+parquet");
        assert_eq!(
            config.storage_dir.unwrap().to_str().unwrap(),
            "/app/data/storage"
        );
        assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/app/data/wal");
    }

    #[test]
    fn test_from_env_vars_explicit_dirs() {
        let (config, mode) = EventStoreConfig::from_env_vars(
            None,
            Some("/custom/storage".to_string()),
            Some("/custom/wal".to_string()),
            None,
        );
        assert_eq!(mode, "wal+parquet");
        assert_eq!(
            config.storage_dir.unwrap().to_str().unwrap(),
            "/custom/storage"
        );
        assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/custom/wal");
    }

    #[test]
    fn test_from_env_vars_wal_disabled() {
        let (config, mode) = EventStoreConfig::from_env_vars(
            Some("/app/data".to_string()),
            None,
            None,
            Some("false".to_string()),
        );
        assert_eq!(mode, "parquet-only");
        assert!(config.storage_dir.is_some());
        assert!(config.wal_dir.is_none());
    }

    #[test]
    fn test_from_env_vars_no_dirs_is_in_memory() {
        let (config, mode) = EventStoreConfig::from_env_vars(None, None, None, None);
        assert_eq!(mode, "in-memory");
        assert!(config.storage_dir.is_none());
        assert!(config.wal_dir.is_none());
    }

    #[test]
    fn test_from_env_vars_empty_strings_treated_as_none() {
        let (_, mode) = EventStoreConfig::from_env_vars(
            Some("".to_string()),
            Some("".to_string()),
            Some("".to_string()),
            None,
        );
        assert_eq!(mode, "in-memory");
    }

    #[test]
    fn test_from_env_vars_explicit_overrides_data_dir() {
        let (config, mode) = EventStoreConfig::from_env_vars(
            Some("/app/data".to_string()),
            Some("/override/storage".to_string()),
            Some("/override/wal".to_string()),
            None,
        );
        assert_eq!(mode, "wal+parquet");
        assert_eq!(
            config.storage_dir.unwrap().to_str().unwrap(),
            "/override/storage"
        );
        assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/override/wal");
    }

    #[test]
    fn test_from_env_vars_wal_only() {
        let (config, mode) =
            EventStoreConfig::from_env_vars(None, None, Some("/wal/only".to_string()), None);
        assert_eq!(mode, "wal-only");
        assert!(config.storage_dir.is_none());
        assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/wal/only");
    }

    #[test]
    fn test_store_stats_serde() {
        let stats = StoreStats {
            total_events: 100,
            total_entities: 50,
            total_event_types: 10,
            total_ingested: 100,
        };

        let json = serde_json::to_string(&stats).unwrap();
        assert!(json.contains("\"total_events\":100"));
        assert!(json.contains("\"total_entities\":50"));
    }

    // --- combined filters, prefix and payload matching ---

    #[test]
    fn test_query_with_entity_and_type() {
        let store = EventStore::new();

        store
            .ingest(create_test_event("entity-1", "user.created"))
            .unwrap();
        store
            .ingest(create_test_event("entity-1", "user.updated"))
            .unwrap();
        store
            .ingest(create_test_event("entity-2", "user.created"))
            .unwrap();

        let results = store
            .query(QueryEventsRequest {
                entity_id: Some("entity-1".to_string()),
                event_type: Some("user.created".to_string()),
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();

        assert_eq!(results.len(), 1);
        assert_eq!(results[0].event_type_str(), "user.created");
    }

    #[test]
    fn test_query_by_event_type_prefix() {
        let store = EventStore::new();

        store
            .ingest(create_test_event("entity-1", "index.created"))
            .unwrap();
        store
            .ingest(create_test_event("entity-2", "index.updated"))
            .unwrap();
        store
            .ingest(create_test_event("entity-3", "trade.created"))
            .unwrap();
        store
            .ingest(create_test_event("entity-4", "trade.completed"))
            .unwrap();
        store
            .ingest(create_test_event("entity-5", "balance.updated"))
            .unwrap();

        let results = store
            .query(QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: Some("index.".to_string()),
                payload_filter: None,
            })
            .unwrap();

        assert_eq!(results.len(), 2);
        assert!(
            results
                .iter()
                .all(|e| e.event_type_str().starts_with("index."))
        );
    }

    #[test]
    fn test_query_by_event_type_prefix_empty_returns_all() {
        let store = EventStore::new();

        store
            .ingest(create_test_event("entity-1", "index.created"))
            .unwrap();
        store
            .ingest(create_test_event("entity-2", "trade.created"))
            .unwrap();

        let results = store
            .query(QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: Some("".to_string()),
                payload_filter: None,
            })
            .unwrap();

        assert_eq!(results.len(), 2);
    }

    #[test]
    fn test_query_by_event_type_prefix_no_match() {
        let store = EventStore::new();

        store
            .ingest(create_test_event("entity-1", "index.created"))
            .unwrap();

        let results = store
            .query(QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: Some("nonexistent.".to_string()),
                payload_filter: None,
            })
            .unwrap();

        assert!(results.is_empty());
    }

    #[test]
    fn test_query_by_entity_with_type_prefix() {
        let store = EventStore::new();

        store
            .ingest(create_test_event("entity-1", "index.created"))
            .unwrap();
        store
            .ingest(create_test_event("entity-1", "trade.created"))
            .unwrap();
        store
            .ingest(create_test_event("entity-2", "index.updated"))
            .unwrap();

        let results = store
            .query(QueryEventsRequest {
                entity_id: Some("entity-1".to_string()),
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: Some("index.".to_string()),
                payload_filter: None,
            })
            .unwrap();

        assert_eq!(results.len(), 1);
        assert_eq!(results[0].event_type_str(), "index.created");
    }

    #[test]
    fn test_query_prefix_with_limit() {
        let store = EventStore::new();

        for i in 0..5 {
            store
                .ingest(create_test_event(&format!("entity-{}", i), "index.created"))
                .unwrap();
        }

        let results = store
            .query(QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: Some(3),
                event_type_prefix: Some("index.".to_string()),
                payload_filter: None,
            })
            .unwrap();

        assert_eq!(results.len(), 3);
    }

    #[test]
    fn test_query_prefix_alongside_existing_filters() {
        let store = EventStore::new();

        store
            .ingest(create_test_event("entity-1", "index.created"))
            .unwrap();
        // Sleep between ingests so timestamps are strictly ordered.
        std::thread::sleep(std::time::Duration::from_millis(10));
        store
            .ingest(create_test_event("entity-2", "index.strategy.updated"))
            .unwrap();
        std::thread::sleep(std::time::Duration::from_millis(10));
        store
            .ingest(create_test_event("entity-3", "index.deleted"))
            .unwrap();

        let results = store
            .query(QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: Some(2),
                event_type_prefix: Some("index.".to_string()),
                payload_filter: None,
            })
            .unwrap();

        assert_eq!(results.len(), 2);
    }

    #[test]
    fn test_query_with_payload_filter() {
        let store = EventStore::new();

        for i in 0..5 {
            store
                .ingest(create_test_event_with_payload(
                    &format!("entity-{}", i),
                    "user.action",
                    serde_json::json!({"user_id": "alice", "action": "click"}),
                ))
                .unwrap();
        }
        for i in 5..10 {
            store
                .ingest(create_test_event_with_payload(
                    &format!("entity-{}", i),
                    "user.action",
                    serde_json::json!({"user_id": "bob", "action": "view"}),
                ))
                .unwrap();
        }

        let results = store
            .query(QueryEventsRequest {
                entity_id: None,
                event_type: Some("user.action".to_string()),
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: Some(r#"{"user_id":"alice"}"#.to_string()),
            })
            .unwrap();

        assert_eq!(results.len(), 5);
    }

    #[test]
    fn test_query_payload_filter_non_existent_field() {
        let store = EventStore::new();

        store
            .ingest(create_test_event_with_payload(
                "entity-1",
                "user.action",
                serde_json::json!({"user_id": "alice"}),
            ))
            .unwrap();

        let results = store
            .query(QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: Some(r#"{"nonexistent":"value"}"#.to_string()),
            })
            .unwrap();

        assert!(results.is_empty());
    }

    #[test]
    fn test_query_payload_filter_with_prefix() {
        let store = EventStore::new();

        store
            .ingest(create_test_event_with_payload(
                "entity-1",
                "index.created",
                serde_json::json!({"status": "active"}),
            ))
            .unwrap();
        store
            .ingest(create_test_event_with_payload(
                "entity-2",
                "index.created",
                serde_json::json!({"status": "inactive"}),
            ))
            .unwrap();
        store
            .ingest(create_test_event_with_payload(
                "entity-3",
                "trade.created",
                serde_json::json!({"status": "active"}),
            ))
            .unwrap();

        let results = store
            .query(QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: Some("index.".to_string()),
                payload_filter: Some(r#"{"status":"active"}"#.to_string()),
            })
            .unwrap();

        assert_eq!(results.len(), 1);
        assert_eq!(results[0].entity_id().to_string(), "entity-1");
    }

    #[test]
    fn test_flush_storage_no_storage() {
        // Flushing is a no-op success when no storage is configured.
        let store = EventStore::new();
        let result = store.flush_storage();
        assert!(result.is_ok());
    }

    #[test]
    fn test_state_evolution() {
        let store = EventStore::new();

        store
            .ingest(
                Event::from_strings(
                    "user.created".to_string(),
                    "user-1".to_string(),
                    "default".to_string(),
                    serde_json::json!({"name": "Alice", "age": 25}),
                    None,
                )
                .unwrap(),
            )
            .unwrap();

        store
            .ingest(
                Event::from_strings(
                    "user.updated".to_string(),
                    "user-1".to_string(),
                    "default".to_string(),
                    serde_json::json!({"age": 26}),
                    None,
                )
                .unwrap(),
            )
            .unwrap();

        // Later payloads shallow-merge over earlier ones.
        let state = store.reconstruct_state("user-1", None).unwrap();
        assert_eq!(state["current_state"]["name"], "Alice");
        assert_eq!(state["current_state"]["age"], 26);
    }

    #[test]
    fn test_reject_system_event_types() {
        let store = EventStore::new();

        // `_system.*` events must not be ingestible through the public path.
        let event = Event::reconstruct_from_strings(
            uuid::Uuid::new_v4(),
            "_system.tenant.created".to_string(),
            "_system:tenant:acme".to_string(),
            "_system".to_string(),
            serde_json::json!({"name": "ACME"}),
            chrono::Utc::now(),
            None,
            1,
        );

        let result = store.ingest(event);
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("reserved for internal use"),
            "Expected system namespace rejection, got: {}",
            err
        );
    }

    #[test]
    fn test_wal_recovery_checkpoints_to_parquet() {
        let data_dir = TempDir::new().unwrap();
        let storage_dir = data_dir.path().join("storage");
        let wal_dir = data_dir.path().join("wal");

        // Session 1: ingest 5 events with sync-on-write WAL, then drop the store.
        {
            let config = EventStoreConfig::production(
                &storage_dir,
                &wal_dir,
                SnapshotConfig::default(),
                WALConfig {
                    sync_on_write: true,
                    ..WALConfig::default()
                },
                CompactionConfig::default(),
            );
            let store = EventStore::with_config(config);

            for i in 0..5 {
                let event = Event::from_strings(
                    "test.created".to_string(),
                    format!("entity-{}", i),
                    "default".to_string(),
                    serde_json::json!({"index": i}),
                    None,
                )
                .unwrap();
                store.ingest(event).unwrap();
            }

            assert_eq!(store.stats().total_events, 5);
        }

        // The WAL segment must exist on disk and be non-empty.
        let wal_files: Vec<_> = std::fs::read_dir(&wal_dir)
            .unwrap()
            .filter_map(|e| e.ok())
            .filter(|e| e.path().extension().is_some_and(|ext| ext == "log"))
            .collect();
        assert!(!wal_files.is_empty(), "WAL file should exist");
        let wal_size = wal_files[0].metadata().unwrap().len();
        assert!(wal_size > 0, "WAL file should have data (got 0 bytes)");

        // Session 2: recovery replays the WAL and checkpoints to Parquet.
        {
            let config = EventStoreConfig::production(
                &storage_dir,
                &wal_dir,
                SnapshotConfig::default(),
                WALConfig {
                    sync_on_write: true,
                    ..WALConfig::default()
                },
                CompactionConfig::default(),
            );
            let store = EventStore::with_config(config);

            assert_eq!(
                store.stats().total_events,
                5,
                "Session 2 should have all 5 events after WAL recovery"
            );

            let parquet_files: Vec<_> = std::fs::read_dir(&storage_dir)
                .unwrap()
                .filter_map(|e| e.ok())
                .filter(|e| e.path().extension().is_some_and(|ext| ext == "parquet"))
                .collect();
            assert!(
                !parquet_files.is_empty(),
                "Parquet file should exist after WAL checkpoint"
            );
        }

        // Session 3: events survive via Parquet even after the WAL checkpoint.
        {
            let config = EventStoreConfig::production(
                &storage_dir,
                &wal_dir,
                SnapshotConfig::default(),
                WALConfig {
                    sync_on_write: true,
                    ..WALConfig::default()
                },
                CompactionConfig::default(),
            );
            let store = EventStore::with_config(config);

            assert_eq!(
                store.stats().total_events,
                5,
                "Session 3 should still have all 5 events from Parquet"
            );
        }
    }
}