1#[cfg(feature = "server")]
2use crate::application::services::webhook::WebhookRegistry;
3#[cfg(feature = "server")]
4use crate::infrastructure::observability::metrics::MetricsRegistry;
5#[cfg(feature = "server")]
6use crate::infrastructure::web::websocket::WebSocketManager;
7use crate::{
8 application::{
9 dto::QueryEventsRequest,
10 services::{
11 exactly_once::{ExactlyOnceConfig, ExactlyOnceRegistry},
12 pipeline::PipelineManager,
13 projection::{EntitySnapshotProjection, EventCounterProjection, ProjectionManager},
14 replay::ReplayManager,
15 schema::{SchemaRegistry, SchemaRegistryConfig},
16 schema_evolution::SchemaEvolutionManager,
17 },
18 },
19 domain::entities::Event,
20 error::{AllSourceError, Result},
21 infrastructure::{
22 persistence::{
23 compaction::{CompactionConfig, CompactionManager},
24 index::{EventIndex, IndexEntry},
25 snapshot::{SnapshotConfig, SnapshotManager, SnapshotType},
26 storage::ParquetStorage,
27 wal::{WALConfig, WriteAheadLog},
28 },
29 query::geospatial::GeoIndex,
30 },
31};
32use chrono::{DateTime, Utc};
33use dashmap::DashMap;
34use parking_lot::RwLock;
35use std::{path::PathBuf, sync::Arc};
36#[cfg(feature = "server")]
37use tokio::sync::mpsc;
38
/// Core event store: an append-only in-memory log plus the machinery around
/// it (secondary index, projections, optional WAL/Parquet persistence, and
/// server-only realtime/metrics integrations).
pub struct EventStore {
    /// Append-only event log; `offset` values used elsewhere index into this Vec.
    events: Arc<RwLock<Vec<Event>>>,

    /// Secondary index over log offsets (by id, entity, type, timestamp).
    index: Arc<EventIndex>,

    /// Registered projections; fed on ingest and replayed during recovery.
    pub(crate) projections: Arc<RwLock<ProjectionManager>>,

    /// Optional Parquet-backed persistence (`None` = in-memory only).
    storage: Option<Arc<RwLock<ParquetStorage>>>,

    #[cfg(feature = "server")]
    /// Broadcasts ingested events to connected WebSocket clients.
    websocket_manager: Arc<WebSocketManager>,

    /// Creates and serves entity snapshots (manual + automatic).
    snapshot_manager: Arc<SnapshotManager>,

    /// Optional write-ahead log for durability before in-memory mutation.
    wal: Option<Arc<WriteAheadLog>>,

    /// Present only when `storage_dir` is configured.
    compaction_manager: Option<Arc<CompactionManager>>,

    /// Event schema registry.
    schema_registry: Arc<SchemaRegistry>,

    /// Historical replay coordination.
    replay_manager: Arc<ReplayManager>,

    /// Per-event processing pipelines.
    pipeline_manager: Arc<PipelineManager>,

    #[cfg(feature = "server")]
    /// Prometheus metrics registry.
    metrics: Arc<MetricsRegistry>,

    /// Lifetime count of ingested events (not just the current log length).
    total_ingested: Arc<RwLock<u64>>,

    /// Cached projection state keyed `"<projection>:<entity>"`
    /// (see `reset_projection`, which clears by `"<name>:"` prefix).
    projection_state_cache: Arc<DashMap<String, serde_json::Value>>,

    /// Free-form per-projection status strings.
    projection_status: Arc<DashMap<String, String>>,

    #[cfg(feature = "server")]
    /// Webhook subscriptions matched against each ingested event.
    webhook_registry: Arc<WebhookRegistry>,

    #[cfg(feature = "server")]
    /// Channel to the webhook delivery worker; `None` until `set_webhook_tx`.
    webhook_tx: Arc<RwLock<Option<mpsc::UnboundedSender<WebhookDeliveryTask>>>>,

    /// Geospatial index fed from event payloads.
    geo_index: Arc<GeoIndex>,

    /// Exactly-once / idempotency bookkeeping.
    exactly_once: Arc<ExactlyOnceRegistry>,

    /// Tracks observed payload shapes per event type.
    schema_evolution: Arc<SchemaEvolutionManager>,
}
108
/// Unit of work handed to the async webhook delivery loop: one matching
/// subscription paired with the event to deliver.
#[cfg(feature = "server")]
#[derive(Debug, Clone)]
pub struct WebhookDeliveryTask {
    /// Subscription whose filter matched the event.
    pub webhook: crate::application::services::webhook::WebhookSubscription,
    /// The event to deliver to the subscriber.
    pub event: Event,
}
116
117impl EventStore {
    /// Creates a fully in-memory store (no Parquet, no WAL) using
    /// [`EventStoreConfig::default`].
    pub fn new() -> Self {
        Self::with_config(EventStoreConfig::default())
    }
122
    /// Builds a store from `config`: registers built-in projections, wires
    /// optional Parquet storage / WAL / compaction, then replays persisted
    /// events and recovers the WAL so the in-memory log, index and
    /// projections agree on startup.
    pub fn with_config(config: EventStoreConfig) -> Self {
        let mut projections = ProjectionManager::new();

        // Built-in projections available in every store.
        projections.register(Arc::new(EntitySnapshotProjection::new("entity_snapshots")));
        projections.register(Arc::new(EventCounterProjection::new("event_counters")));

        // Persistence layers are best-effort: an init failure logs and
        // disables the layer rather than aborting startup.
        let storage = config
            .storage_dir
            .as_ref()
            .and_then(|dir| match ParquetStorage::new(dir) {
                Ok(storage) => {
                    tracing::info!("✅ Parquet persistence enabled at: {}", dir.display());
                    Some(Arc::new(RwLock::new(storage)))
                }
                Err(e) => {
                    tracing::error!("❌ Failed to initialize Parquet storage: {}", e);
                    None
                }
            });

        let wal = config.wal_dir.as_ref().and_then(|dir| {
            match WriteAheadLog::new(dir, config.wal_config.clone()) {
                Ok(wal) => {
                    tracing::info!("✅ WAL enabled at: {}", dir.display());
                    Some(Arc::new(wal))
                }
                Err(e) => {
                    tracing::error!("❌ Failed to initialize WAL: {}", e);
                    None
                }
            }
        });

        // Compaction only makes sense alongside Parquet storage.
        let compaction_manager = config.storage_dir.as_ref().map(|dir| {
            let manager = CompactionManager::new(dir, config.compaction_config.clone());
            Arc::new(manager)
        });

        let schema_registry = Arc::new(SchemaRegistry::new(config.schema_registry_config.clone()));
        tracing::info!("✅ Schema registry enabled");

        let replay_manager = Arc::new(ReplayManager::new());
        tracing::info!("✅ Replay manager enabled");

        let pipeline_manager = Arc::new(PipelineManager::new());
        tracing::info!("✅ Pipeline manager enabled");

        #[cfg(feature = "server")]
        let metrics = {
            let m = MetricsRegistry::new();
            tracing::info!("✅ Prometheus metrics registry initialized");
            m
        };

        let projection_state_cache = Arc::new(DashMap::new());
        tracing::info!("✅ Projection state cache initialized");

        #[cfg(feature = "server")]
        let webhook_registry = {
            let w = Arc::new(WebhookRegistry::new());
            tracing::info!("✅ Webhook registry initialized");
            w
        };

        let store = Self {
            events: Arc::new(RwLock::new(Vec::new())),
            index: Arc::new(EventIndex::new()),
            projections: Arc::new(RwLock::new(projections)),
            storage,
            #[cfg(feature = "server")]
            websocket_manager: Arc::new(WebSocketManager::new()),
            snapshot_manager: Arc::new(SnapshotManager::new(config.snapshot_config)),
            wal,
            compaction_manager,
            schema_registry,
            replay_manager,
            pipeline_manager,
            #[cfg(feature = "server")]
            metrics,
            total_ingested: Arc::new(RwLock::new(0)),
            projection_state_cache,
            projection_status: Arc::new(DashMap::new()),
            #[cfg(feature = "server")]
            webhook_registry,
            #[cfg(feature = "server")]
            webhook_tx: Arc::new(RwLock::new(None)),
            geo_index: Arc::new(GeoIndex::new()),
            exactly_once: Arc::new(ExactlyOnceRegistry::new(ExactlyOnceConfig::default())),
            schema_evolution: Arc::new(SchemaEvolutionManager::new()),
        };

        // Phase 1: replay events already persisted in Parquet, rebuilding
        // the index and projection state as we go.
        if let Some(ref storage) = store.storage
            && let Ok(persisted_events) = storage.read().load_all_events()
            && !persisted_events.is_empty()
        {
            tracing::info!("📂 Loading {} persisted events...", persisted_events.len());

            for event in persisted_events {
                let offset = store.events.read().len();
                if let Err(e) = store.index.index_event(
                    event.id,
                    event.entity_id_str(),
                    event.event_type_str(),
                    event.timestamp,
                    offset,
                ) {
                    tracing::error!("Failed to re-index event {}: {}", event.id, e);
                }

                if let Err(e) = store.projections.read().process_event(&event) {
                    tracing::error!("Failed to re-process event {}: {}", event.id, e);
                }

                store.events.write().push(event);
            }

            let total = store.events.read().len();
            *store.total_ingested.write() = total as u64;
            tracing::info!("✅ Successfully loaded {} events from storage", total);
        }

        // Phase 2: recover WAL entries that never reached Parquet,
        // de-duplicating against phase 1, then checkpoint the recovered
        // tail back to Parquet and truncate the WAL.
        if let Some(ref wal) = store.wal {
            match wal.recover() {
                Ok(recovered_events) if !recovered_events.is_empty() => {
                    let existing_ids: std::collections::HashSet<uuid::Uuid> =
                        store.events.read().iter().map(|e| e.id).collect();

                    let mut wal_new = 0usize;
                    for event in recovered_events {
                        // Already present via Parquet replay: skip.
                        if existing_ids.contains(&event.id) {
                            continue;
                        }

                        let offset = store.events.read().len();
                        if let Err(e) = store.index.index_event(
                            event.id,
                            event.entity_id_str(),
                            event.event_type_str(),
                            event.timestamp,
                            offset,
                        ) {
                            tracing::error!("Failed to re-index WAL event {}: {}", event.id, e);
                        }

                        if let Err(e) = store.projections.read().process_event(&event) {
                            tracing::error!("Failed to re-process WAL event {}: {}", event.id, e);
                        }

                        store.events.write().push(event);
                        wal_new += 1;
                    }

                    if wal_new > 0 {
                        let total = store.events.read().len();
                        *store.total_ingested.write() = total as u64;
                        tracing::info!(
                            "✅ Recovered {} new events from WAL ({} total)",
                            wal_new,
                            total
                        );

                        if let Some(ref storage) = store.storage {
                            tracing::info!(
                                "📸 Checkpointing {} WAL events to Parquet storage...",
                                wal_new
                            );
                            // The recovered events are the tail of the log.
                            let parquet = storage.read();
                            let events = store.events.read();
                            let mut buffered = 0usize;
                            for event in events.iter().skip(events.len() - wal_new) {
                                if let Err(e) = parquet.append_event(event.clone()) {
                                    tracing::error!(
                                        "Failed to buffer WAL event for Parquet: {}",
                                        e
                                    );
                                } else {
                                    buffered += 1;
                                }
                            }
                            // Release read locks before `flush_storage`, which
                            // re-acquires the storage lock.
                            drop(events);
                            drop(parquet);

                            // Truncate the WAL only after the flush succeeds;
                            // until then the WAL stays the source of truth.
                            if buffered > 0 {
                                if let Err(e) = store.flush_storage() {
                                    tracing::error!("Failed to checkpoint to Parquet: {}", e);
                                } else if let Err(e) = wal.truncate() {
                                    tracing::error!(
                                        "Failed to truncate WAL after checkpoint: {}",
                                        e
                                    );
                                } else {
                                    tracing::info!(
                                        "✅ WAL checkpointed and truncated ({} events)",
                                        buffered
                                    );
                                }
                            }
                        }
                    }
                }
                Ok(_) => {
                    tracing::debug!("No events to recover from WAL");
                }
                Err(e) => {
                    tracing::error!("❌ WAL recovery failed: {}", e);
                }
            }
        }

        store
    }
353
    /// Validates and appends a single event: WAL → index → projections →
    /// pipelines → Parquet → in-memory log, then fans out to WebSockets,
    /// webhooks, geo index, schema evolution and auto-snapshotting.
    ///
    /// # Errors
    /// Propagates the first validation, WAL, index, projection or storage
    /// failure.
    pub fn ingest(&self, event: Event) -> Result<()> {
        #[cfg(feature = "server")]
        let timer = self.metrics.ingestion_duration_seconds.start_timer();

        // Reject malformed events before touching any durable state.
        let validation_result = self.validate_event(&event);
        if let Err(e) = validation_result {
            #[cfg(feature = "server")]
            {
                self.metrics.ingestion_errors_total.inc();
                timer.observe_duration();
            }
            return Err(e);
        }

        // Durability first: WAL append precedes any in-memory mutation.
        if let Some(ref wal) = self.wal
            && let Err(e) = wal.append(event.clone())
        {
            #[cfg(feature = "server")]
            {
                self.metrics.ingestion_errors_total.inc();
                timer.observe_duration();
            }
            return Err(e);
        }

        // Write lock held through index/projection/storage updates so
        // `offset` stays valid until the event is pushed.
        let mut events = self.events.write();
        let offset = events.len();

        // NOTE(review): if a later `?` below fails (projection/storage), the
        // index already references `offset` while the event is never pushed,
        // so the index would point at whatever event lands there next; those
        // early returns also skip the error metric and timer. Confirm this
        // is acceptable or add rollback.
        self.index.index_event(
            event.id,
            event.entity_id_str(),
            event.event_type_str(),
            event.timestamp,
            offset,
        )?;

        let projections = self.projections.read();
        projections.process_event(&event)?;
        drop(projections);
        let pipeline_results = self.pipeline_manager.process_event(&event);
        if !pipeline_results.is_empty() {
            tracing::debug!(
                "Event {} processed by {} pipeline(s)",
                event.id,
                pipeline_results.len()
            );
            for (pipeline_id, result) in pipeline_results {
                tracing::trace!("Pipeline {} result: {:?}", pipeline_id, result);
            }
        }

        if let Some(ref storage) = self.storage {
            let storage = storage.read();
            storage.append_event(event.clone())?;
        }

        events.push(event.clone());
        let total_events = events.len();
        drop(events);

        // Post-commit fan-out: best-effort, non-fallible from here on.
        #[cfg(feature = "server")]
        self.websocket_manager
            .broadcast_event(Arc::new(event.clone()));

        #[cfg(feature = "server")]
        self.dispatch_webhooks(&event);

        self.geo_index.index_event(&event);

        self.schema_evolution
            .analyze_event(event.event_type_str(), &event.payload);

        self.check_auto_snapshot(event.entity_id_str(), &event);

        #[cfg(feature = "server")]
        {
            self.metrics.events_ingested_total.inc();
            self.metrics
                .events_ingested_by_type
                .with_label_values(&[event.event_type_str()])
                .inc();
            self.metrics.storage_events_total.set(total_events as i64);
        }

        let mut total = self.total_ingested.write();
        *total += 1;

        #[cfg(feature = "server")]
        timer.observe_duration();

        tracing::debug!("Event ingested: {} (offset: {})", event.id, offset);

        Ok(())
    }
469
470 pub fn ingest_batch(&self, batch: Vec<Event>) -> Result<()> {
477 if batch.is_empty() {
478 return Ok(());
479 }
480
481 for event in &batch {
483 self.validate_event(event)?;
484 }
485
486 if let Some(ref wal) = self.wal {
488 for event in &batch {
489 wal.append(event.clone())?;
490 }
491 }
492
493 let mut events = self.events.write();
495 let projections = self.projections.read();
496
497 for event in batch {
498 let offset = events.len();
499
500 self.index.index_event(
501 event.id,
502 event.entity_id_str(),
503 event.event_type_str(),
504 event.timestamp,
505 offset,
506 )?;
507
508 projections.process_event(&event)?;
509 self.pipeline_manager.process_event(&event);
510
511 if let Some(ref storage) = self.storage {
512 let storage = storage.read();
513 storage.append_event(event.clone())?;
514 }
515
516 self.geo_index.index_event(&event);
517 self.schema_evolution
518 .analyze_event(event.event_type_str(), &event.payload);
519
520 events.push(event);
521 }
522
523 let total_events = events.len();
524 drop(projections);
525 drop(events);
526
527 let mut total = self.total_ingested.write();
528 *total += total_events as u64;
529
530 Ok(())
531 }
532
    /// Ingests an event received from another node via replication.
    ///
    /// Skips validation, WAL append and Parquet persistence — NOTE(review):
    /// presumably the originating node already validated and persisted the
    /// event; confirm replicas are not expected to re-persist locally.
    ///
    /// # Errors
    /// Propagates index or projection failures.
    pub fn ingest_replicated(&self, event: Event) -> Result<()> {
        #[cfg(feature = "server")]
        let timer = self.metrics.ingestion_duration_seconds.start_timer();

        let mut events = self.events.write();
        let offset = events.len();

        self.index.index_event(
            event.id,
            event.entity_id_str(),
            event.event_type_str(),
            event.timestamp,
            offset,
        )?;

        let projections = self.projections.read();
        projections.process_event(&event)?;
        drop(projections);

        let pipeline_results = self.pipeline_manager.process_event(&event);
        if !pipeline_results.is_empty() {
            tracing::debug!(
                "Replicated event {} processed by {} pipeline(s)",
                event.id,
                pipeline_results.len()
            );
        }

        events.push(event.clone());
        let total_events = events.len();
        drop(events);

        // Local subscribers still get realtime notification.
        #[cfg(feature = "server")]
        self.websocket_manager
            .broadcast_event(Arc::new(event.clone()));

        #[cfg(feature = "server")]
        {
            self.metrics.events_ingested_total.inc();
            self.metrics
                .events_ingested_by_type
                .with_label_values(&[event.event_type_str()])
                .inc();
            self.metrics.storage_events_total.set(total_events as i64);
        }

        let mut total = self.total_ingested.write();
        *total += 1;

        #[cfg(feature = "server")]
        timer.observe_duration();

        tracing::debug!(
            "Replicated event ingested: {} (offset: {})",
            event.id,
            offset
        );

        Ok(())
    }
605
606 #[cfg(feature = "server")]
608 pub fn websocket_manager(&self) -> Arc<WebSocketManager> {
609 Arc::clone(&self.websocket_manager)
610 }
611
612 pub fn snapshot_manager(&self) -> Arc<SnapshotManager> {
614 Arc::clone(&self.snapshot_manager)
615 }
616
617 pub fn compaction_manager(&self) -> Option<Arc<CompactionManager>> {
619 self.compaction_manager.as_ref().map(Arc::clone)
620 }
621
622 pub fn schema_registry(&self) -> Arc<SchemaRegistry> {
624 Arc::clone(&self.schema_registry)
625 }
626
627 pub fn replay_manager(&self) -> Arc<ReplayManager> {
629 Arc::clone(&self.replay_manager)
630 }
631
632 pub fn pipeline_manager(&self) -> Arc<PipelineManager> {
634 Arc::clone(&self.pipeline_manager)
635 }
636
637 #[cfg(feature = "server")]
639 pub fn metrics(&self) -> Arc<MetricsRegistry> {
640 Arc::clone(&self.metrics)
641 }
642
643 pub fn projection_manager(&self) -> parking_lot::RwLockReadGuard<'_, ProjectionManager> {
645 self.projections.read()
646 }
647
648 pub fn register_projection(
657 &self,
658 projection: Arc<dyn crate::application::services::projection::Projection>,
659 ) {
660 let mut pm = self.projections.write();
661 pm.register(projection);
662 }
663
664 pub fn register_projection_with_backfill(
670 &self,
671 projection: Arc<dyn crate::application::services::projection::Projection>,
672 ) -> Result<()> {
673 {
675 let mut pm = self.projections.write();
676 pm.register(Arc::clone(&projection));
677 }
678
679 let events = self.events.read();
681 for event in events.iter() {
682 projection.process(event)?;
683 }
684
685 Ok(())
686 }
687
688 pub fn projection_state_cache(&self) -> Arc<DashMap<String, serde_json::Value>> {
691 Arc::clone(&self.projection_state_cache)
692 }
693
694 pub fn projection_status(&self) -> Arc<DashMap<String, String>> {
696 Arc::clone(&self.projection_status)
697 }
698
699 pub fn geo_index(&self) -> Arc<GeoIndex> {
702 self.geo_index.clone()
703 }
704
705 pub fn exactly_once(&self) -> Arc<ExactlyOnceRegistry> {
707 self.exactly_once.clone()
708 }
709
710 pub fn schema_evolution(&self) -> Arc<SchemaEvolutionManager> {
712 self.schema_evolution.clone()
713 }
714
715 pub fn snapshot_events(&self) -> Vec<Event> {
721 self.events.read().clone()
722 }
723
    /// Replaces every `token_event_type` event of `entity_id` with a single
    /// pre-merged event, then rebuilds the whole index (all offsets shift
    /// after `retain`). Returns `Ok(false)` when the entity has no such
    /// events, `Ok(true)` on successful compaction.
    ///
    /// # Errors
    /// Propagates projection or WAL append failures.
    pub fn compact_entity_tokens(
        &self,
        entity_id: &str,
        token_event_type: &str,
        merged_event: Event,
    ) -> Result<bool> {
        // Cheap existence check under a read lock before mutating anything.
        {
            let events = self.events.read();
            let has_tokens = events
                .iter()
                .any(|e| e.entity_id_str() == entity_id && e.event_type_str() == token_event_type);
            if !has_tokens {
                return Ok(false);
            }
        }

        // Feed the merged event through projections before swapping the log.
        let projections = self.projections.read();
        projections.process_event(&merged_event)?;
        drop(projections);

        let mut events = self.events.write();

        // Drop the raw token events; the merged event takes their place.
        events.retain(|e| {
            !(e.entity_id_str() == entity_id && e.event_type_str() == token_event_type)
        });

        events.push(merged_event.clone());

        // NOTE(review): the merged event is WAL-appended but not written to
        // Parquet here, and the removed events remain in Parquet — confirm a
        // separate storage compaction pass reconciles this.
        if let Some(ref wal) = self.wal {
            wal.append(merged_event)?;
        }

        // Offsets changed wholesale, so rebuild the index from scratch.
        // Individual index failures are logged, not propagated.
        self.index.clear();
        for (offset, event) in events.iter().enumerate() {
            if let Err(e) = self.index.index_event(
                event.id,
                event.entity_id_str(),
                event.event_type_str(),
                event.timestamp,
                offset,
            ) {
                tracing::warn!(
                    event_id = %event.id,
                    offset,
                    "Failed to re-index event during compaction: {e}"
                );
            }
        }

        Ok(true)
    }
806
807 #[cfg(feature = "server")]
808 pub fn webhook_registry(&self) -> Arc<WebhookRegistry> {
809 Arc::clone(&self.webhook_registry)
810 }
811
812 #[cfg(feature = "server")]
815 pub fn set_webhook_tx(&self, tx: mpsc::UnboundedSender<WebhookDeliveryTask>) {
816 *self.webhook_tx.write() = Some(tx);
817 tracing::info!("Webhook delivery channel connected");
818 }
819
820 #[cfg(feature = "server")]
822 fn dispatch_webhooks(&self, event: &Event) {
823 let matching = self.webhook_registry.find_matching(event);
824 if matching.is_empty() {
825 return;
826 }
827
828 let tx_guard = self.webhook_tx.read();
829 if let Some(ref tx) = *tx_guard {
830 for webhook in matching {
831 let task = WebhookDeliveryTask {
832 webhook,
833 event: event.clone(),
834 };
835 if let Err(e) = tx.send(task) {
836 tracing::warn!("Failed to queue webhook delivery: {}", e);
837 }
838 }
839 }
840 }
841
842 pub fn flush_storage(&self) -> Result<()> {
844 if let Some(ref storage) = self.storage {
845 let storage = storage.read();
846 storage.flush()?;
847 tracing::info!("✅ Flushed events to persistent storage");
848 }
849 Ok(())
850 }
851
    /// Builds a manual snapshot for `entity_id` by shallow-merging all of
    /// its event payloads (later events overwrite earlier keys) and handing
    /// the result to the snapshot manager.
    ///
    /// # Errors
    /// `EntityNotFound` when the entity has no events; otherwise propagates
    /// query or snapshot-manager failures.
    pub fn create_snapshot(&self, entity_id: &str) -> Result<()> {
        let events = self.query(QueryEventsRequest {
            entity_id: Some(entity_id.to_string()),
            event_type: None,
            tenant_id: None,
            as_of: None,
            since: None,
            until: None,
            limit: None,
            event_type_prefix: None,
            payload_filter: None,
        })?;

        if events.is_empty() {
            return Err(AllSourceError::EntityNotFound(entity_id.to_string()));
        }

        // Fold payload objects oldest→newest; non-object payloads are ignored.
        let mut state = serde_json::json!({});
        for event in &events {
            if let serde_json::Value::Object(ref mut state_map) = state
                && let serde_json::Value::Object(ref payload_map) = event.payload
            {
                for (key, value) in payload_map {
                    state_map.insert(key.clone(), value.clone());
                }
            }
        }

        // Safe: `events` was checked non-empty above.
        let last_event = events.last().unwrap();
        self.snapshot_manager.create_snapshot(
            entity_id.to_string(),
            state,
            last_event.timestamp,
            events.len(),
            SnapshotType::Manual,
        )?;

        Ok(())
    }
894
895 fn check_auto_snapshot(&self, entity_id: &str, event: &Event) {
897 let entity_event_count = self
899 .index
900 .get_by_entity(entity_id)
901 .map(|entries| entries.len())
902 .unwrap_or(0);
903
904 if self.snapshot_manager.should_create_snapshot(
905 entity_id,
906 entity_event_count,
907 event.timestamp,
908 ) {
909 if let Err(e) = self.create_snapshot(entity_id) {
911 tracing::warn!(
912 "Failed to create automatic snapshot for {}: {}",
913 entity_id,
914 e
915 );
916 }
917 }
918 }
919
920 fn validate_event(&self, event: &Event) -> Result<()> {
922 if event.entity_id_str().is_empty() {
925 return Err(AllSourceError::ValidationError(
926 "entity_id cannot be empty".to_string(),
927 ));
928 }
929
930 if event.event_type_str().is_empty() {
931 return Err(AllSourceError::ValidationError(
932 "event_type cannot be empty".to_string(),
933 ));
934 }
935
936 if event.event_type().is_system() {
939 return Err(AllSourceError::ValidationError(
940 "Event types starting with '_system.' are reserved for internal use".to_string(),
941 ));
942 }
943
944 Ok(())
945 }
946
947 pub fn reset_projection(&self, name: &str) -> Result<usize> {
949 let projection_manager = self.projections.read();
950 let projection = projection_manager.get_projection(name).ok_or_else(|| {
951 AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
952 })?;
953
954 projection.clear();
956
957 let prefix = format!("{name}:");
959 let keys_to_remove: Vec<String> = self
960 .projection_state_cache
961 .iter()
962 .filter(|entry| entry.key().starts_with(&prefix))
963 .map(|entry| entry.key().clone())
964 .collect();
965 for key in keys_to_remove {
966 self.projection_state_cache.remove(&key);
967 }
968
969 let events = self.events.read();
971 let mut reprocessed = 0usize;
972 for event in events.iter() {
973 if projection.process(event).is_ok() {
974 reprocessed += 1;
975 }
976 }
977
978 Ok(reprocessed)
979 }
980
981 pub fn get_event_by_id(&self, event_id: &uuid::Uuid) -> Result<Option<Event>> {
983 if let Some(offset) = self.index.get_by_id(event_id) {
984 let events = self.events.read();
985 Ok(events.get(offset).cloned())
986 } else {
987 Ok(None)
988 }
989 }
990
    /// Queries events using the cheapest applicable index (entity → type →
    /// type-prefix → full scan), applies remaining filters, sorts results
    /// chronologically and truncates to `limit`.
    ///
    /// # Errors
    /// Currently always returns `Ok`; the `Result` preserves API stability.
    pub fn query(&self, request: QueryEventsRequest) -> Result<Vec<Event>> {
        // Label used for per-query-shape metrics.
        let query_type = if request.entity_id.is_some() {
            "entity"
        } else if request.event_type.is_some() {
            "type"
        } else if request.event_type_prefix.is_some() {
            "type_prefix"
        } else {
            "full_scan"
        };

        #[cfg(feature = "server")]
        let timer = self
            .metrics
            .query_duration_seconds
            .with_label_values(&[query_type])
            .start_timer();

        #[cfg(feature = "server")]
        self.metrics
            .queries_total
            .with_label_values(&[query_type])
            .inc();

        let events = self.events.read();

        // Resolve candidate log offsets through the narrowest index match.
        let offsets: Vec<usize> = if let Some(entity_id) = &request.entity_id {
            self.index
                .get_by_entity(entity_id)
                .map(|entries| self.filter_entries(entries, &request))
                .unwrap_or_default()
        } else if let Some(event_type) = &request.event_type {
            self.index
                .get_by_type(event_type)
                .map(|entries| self.filter_entries(entries, &request))
                .unwrap_or_default()
        } else if let Some(prefix) = &request.event_type_prefix {
            let entries = self.index.get_by_type_prefix(prefix);
            self.filter_entries(entries, &request)
        } else {
            // NOTE(review): the full-scan path never applies the time-window
            // filters — `as_of`/`since`/`until` are only enforced by
            // `filter_entries` on the indexed paths, and `apply_filters`
            // below does not re-check them. Confirm whether full scans are
            // meant to honor time bounds.
            (0..events.len()).collect()
        };

        let mut results: Vec<Event> = offsets
            .iter()
            .filter_map(|&offset| events.get(offset).cloned())
            .filter(|event| self.apply_filters(event, &request))
            .collect();

        // Stable chronological order regardless of index iteration order.
        results.sort_by_key(|x| x.timestamp);

        if let Some(limit) = request.limit {
            results.truncate(limit);
        }

        #[cfg(feature = "server")]
        {
            self.metrics
                .query_results_total
                .with_label_values(&[query_type])
                .inc_by(results.len() as u64);
            timer.observe_duration();
        }

        Ok(results)
    }
1070
1071 fn filter_entries(&self, entries: Vec<IndexEntry>, request: &QueryEventsRequest) -> Vec<usize> {
1073 entries
1074 .into_iter()
1075 .filter(|entry| {
1076 if let Some(as_of) = request.as_of
1078 && entry.timestamp > as_of
1079 {
1080 return false;
1081 }
1082 if let Some(since) = request.since
1083 && entry.timestamp < since
1084 {
1085 return false;
1086 }
1087 if let Some(until) = request.until
1088 && entry.timestamp > until
1089 {
1090 return false;
1091 }
1092 true
1093 })
1094 .map(|entry| entry.offset)
1095 .collect()
1096 }
1097
1098 fn apply_filters(&self, event: &Event, request: &QueryEventsRequest) -> bool {
1100 if request.entity_id.is_some()
1102 && let Some(ref event_type) = request.event_type
1103 && event.event_type_str() != event_type
1104 {
1105 return false;
1106 }
1107
1108 if request.entity_id.is_some()
1110 && let Some(ref prefix) = request.event_type_prefix
1111 && !event.event_type_str().starts_with(prefix)
1112 {
1113 return false;
1114 }
1115
1116 if let Some(ref filter_str) = request.payload_filter
1118 && let Ok(filter_obj) =
1119 serde_json::from_str::<serde_json::Map<String, serde_json::Value>>(filter_str)
1120 {
1121 let payload = event.payload();
1122 for (key, expected_value) in &filter_obj {
1123 match payload.get(key) {
1124 Some(actual_value) if actual_value == expected_value => {}
1125 _ => return false,
1126 }
1127 }
1128 }
1129
1130 true
1131 }
1132
1133 pub fn reconstruct_state(
1136 &self,
1137 entity_id: &str,
1138 as_of: Option<DateTime<Utc>>,
1139 ) -> Result<serde_json::Value> {
1140 let (merged_state, since_timestamp) = if let Some(as_of_time) = as_of {
1142 if let Some(snapshot) = self
1144 .snapshot_manager
1145 .get_snapshot_as_of(entity_id, as_of_time)
1146 {
1147 tracing::debug!(
1148 "Using snapshot from {} for entity {} (saved {} events)",
1149 snapshot.as_of,
1150 entity_id,
1151 snapshot.event_count
1152 );
1153 (snapshot.state.clone(), Some(snapshot.as_of))
1154 } else {
1155 (serde_json::json!({}), None)
1156 }
1157 } else {
1158 if let Some(snapshot) = self.snapshot_manager.get_latest_snapshot(entity_id) {
1160 tracing::debug!(
1161 "Using latest snapshot from {} for entity {}",
1162 snapshot.as_of,
1163 entity_id
1164 );
1165 (snapshot.state.clone(), Some(snapshot.as_of))
1166 } else {
1167 (serde_json::json!({}), None)
1168 }
1169 };
1170
1171 let events = self.query(QueryEventsRequest {
1173 entity_id: Some(entity_id.to_string()),
1174 event_type: None,
1175 tenant_id: None,
1176 as_of,
1177 since: since_timestamp,
1178 until: None,
1179 limit: None,
1180 event_type_prefix: None,
1181 payload_filter: None,
1182 })?;
1183
1184 if events.is_empty() && since_timestamp.is_none() {
1186 return Err(AllSourceError::EntityNotFound(entity_id.to_string()));
1187 }
1188
1189 let mut merged_state = merged_state;
1191 for event in &events {
1192 if let serde_json::Value::Object(ref mut state_map) = merged_state
1193 && let serde_json::Value::Object(ref payload_map) = event.payload
1194 {
1195 for (key, value) in payload_map {
1196 state_map.insert(key.clone(), value.clone());
1197 }
1198 }
1199 }
1200
1201 let state = serde_json::json!({
1203 "entity_id": entity_id,
1204 "last_updated": events.last().map(|e| e.timestamp),
1205 "event_count": events.len(),
1206 "as_of": as_of,
1207 "current_state": merged_state,
1208 "history": events.iter().map(|e| {
1209 serde_json::json!({
1210 "event_id": e.id,
1211 "type": e.event_type,
1212 "timestamp": e.timestamp,
1213 "payload": e.payload
1214 })
1215 }).collect::<Vec<_>>()
1216 });
1217
1218 Ok(state)
1219 }
1220
1221 pub fn get_snapshot(&self, entity_id: &str) -> Result<serde_json::Value> {
1223 let projections = self.projections.read();
1224
1225 if let Some(snapshot_projection) = projections.get_projection("entity_snapshots")
1226 && let Some(state) = snapshot_projection.get_state(entity_id)
1227 {
1228 return Ok(serde_json::json!({
1229 "entity_id": entity_id,
1230 "snapshot": state,
1231 "from_projection": "entity_snapshots"
1232 }));
1233 }
1234
1235 Err(AllSourceError::EntityNotFound(entity_id.to_string()))
1236 }
1237
1238 pub fn stats(&self) -> StoreStats {
1240 let events = self.events.read();
1241 let index_stats = self.index.stats();
1242
1243 StoreStats {
1244 total_events: events.len(),
1245 total_entities: index_stats.total_entities,
1246 total_event_types: index_stats.total_event_types,
1247 total_ingested: *self.total_ingested.read(),
1248 }
1249 }
1250
1251 pub fn list_streams(&self) -> Vec<StreamInfo> {
1253 self.index
1254 .get_all_entities()
1255 .into_iter()
1256 .map(|entity_id| {
1257 let event_count = self
1258 .index
1259 .get_by_entity(&entity_id)
1260 .map(|entries| entries.len())
1261 .unwrap_or(0);
1262 let last_event_at = self
1263 .index
1264 .get_by_entity(&entity_id)
1265 .and_then(|entries| entries.last().map(|e| e.timestamp));
1266 StreamInfo {
1267 stream_id: entity_id,
1268 event_count,
1269 last_event_at,
1270 }
1271 })
1272 .collect()
1273 }
1274
1275 pub fn list_event_types(&self) -> Vec<EventTypeInfo> {
1277 self.index
1278 .get_all_types()
1279 .into_iter()
1280 .map(|event_type| {
1281 let event_count = self
1282 .index
1283 .get_by_type(&event_type)
1284 .map(|entries| entries.len())
1285 .unwrap_or(0);
1286 let last_event_at = self
1287 .index
1288 .get_by_type(&event_type)
1289 .and_then(|entries| entries.last().map(|e| e.timestamp));
1290 EventTypeInfo {
1291 event_type,
1292 event_count,
1293 last_event_at,
1294 }
1295 })
1296 .collect()
1297 }
1298
1299 pub fn enable_wal_replication(
1306 &self,
1307 tx: tokio::sync::broadcast::Sender<crate::infrastructure::persistence::wal::WALEntry>,
1308 ) {
1309 if let Some(ref wal_arc) = self.wal {
1310 wal_arc.set_replication_tx(tx);
1311 tracing::info!("WAL replication broadcast enabled");
1312 } else {
1313 tracing::warn!("Cannot enable WAL replication: WAL is not configured");
1314 }
1315 }
1316
    /// Direct access to the write-ahead log, when configured.
    pub fn wal(&self) -> Option<&Arc<WriteAheadLog>> {
        self.wal.as_ref()
    }

    /// Direct access to the Parquet storage layer, when configured.
    pub fn parquet_storage(&self) -> Option<&Arc<RwLock<ParquetStorage>>> {
        self.storage.as_ref()
    }
1328}
1329
/// Construction-time options for [`EventStore`]; `Default` yields a fully
/// in-memory store with no persistence.
#[derive(Debug, Clone, Default)]
pub struct EventStoreConfig {
    /// Directory for Parquet persistence; `None` disables it.
    pub storage_dir: Option<PathBuf>,

    /// Policy for automatic/manual snapshots.
    pub snapshot_config: SnapshotConfig,

    /// Directory for the write-ahead log; `None` disables the WAL.
    pub wal_dir: Option<PathBuf>,

    /// WAL tuning (used only when `wal_dir` is set).
    pub wal_config: WALConfig,

    /// Storage compaction tuning (used only when `storage_dir` is set).
    pub compaction_config: CompactionConfig,

    /// Schema registry options.
    pub schema_registry_config: SchemaRegistryConfig,

    /// Location of system data; falls back to `<storage_dir>/__system`
    /// via [`EventStoreConfig::effective_system_data_dir`].
    pub system_data_dir: Option<PathBuf>,

    /// NOTE(review): presumably the tenant seeded at startup — not used in
    /// this file; confirm against the consumer of this config.
    pub bootstrap_tenant: Option<String>,
}
1360
1361impl EventStoreConfig {
1362 pub fn with_persistence(storage_dir: impl Into<PathBuf>) -> Self {
1364 Self {
1365 storage_dir: Some(storage_dir.into()),
1366 ..Self::default()
1367 }
1368 }
1369
1370 pub fn with_snapshots(snapshot_config: SnapshotConfig) -> Self {
1372 Self {
1373 snapshot_config,
1374 ..Self::default()
1375 }
1376 }
1377
1378 pub fn with_wal(wal_dir: impl Into<PathBuf>, wal_config: WALConfig) -> Self {
1380 Self {
1381 wal_dir: Some(wal_dir.into()),
1382 wal_config,
1383 ..Self::default()
1384 }
1385 }
1386
1387 pub fn with_all(storage_dir: impl Into<PathBuf>, snapshot_config: SnapshotConfig) -> Self {
1389 Self {
1390 storage_dir: Some(storage_dir.into()),
1391 snapshot_config,
1392 ..Self::default()
1393 }
1394 }
1395
1396 pub fn production(
1398 storage_dir: impl Into<PathBuf>,
1399 wal_dir: impl Into<PathBuf>,
1400 snapshot_config: SnapshotConfig,
1401 wal_config: WALConfig,
1402 compaction_config: CompactionConfig,
1403 ) -> Self {
1404 let storage_dir = storage_dir.into();
1405 let system_data_dir = storage_dir.join("__system");
1406 Self {
1407 storage_dir: Some(storage_dir),
1408 snapshot_config,
1409 wal_dir: Some(wal_dir.into()),
1410 wal_config,
1411 compaction_config,
1412 system_data_dir: Some(system_data_dir),
1413 ..Self::default()
1414 }
1415 }
1416
1417 pub fn effective_system_data_dir(&self) -> Option<PathBuf> {
1422 self.system_data_dir
1423 .clone()
1424 .or_else(|| self.storage_dir.as_ref().map(|d| d.join("__system")))
1425 }
1426
1427 pub fn from_env() -> (Self, &'static str) {
1435 Self::from_env_vars(
1436 std::env::var("ALLSOURCE_DATA_DIR")
1437 .ok()
1438 .filter(|s| !s.is_empty()),
1439 std::env::var("ALLSOURCE_STORAGE_DIR")
1440 .ok()
1441 .filter(|s| !s.is_empty()),
1442 std::env::var("ALLSOURCE_WAL_DIR")
1443 .ok()
1444 .filter(|s| !s.is_empty()),
1445 std::env::var("ALLSOURCE_WAL_ENABLED").ok(),
1446 )
1447 }
1448
1449 pub fn from_env_vars(
1451 data_dir: Option<String>,
1452 explicit_storage_dir: Option<String>,
1453 explicit_wal_dir: Option<String>,
1454 wal_enabled_var: Option<String>,
1455 ) -> (Self, &'static str) {
1456 let data_dir = data_dir.filter(|s| !s.is_empty());
1457 let storage_dir = explicit_storage_dir
1458 .filter(|s| !s.is_empty())
1459 .or_else(|| data_dir.as_ref().map(|d| format!("{}/storage", d)));
1460 let wal_dir = explicit_wal_dir
1461 .filter(|s| !s.is_empty())
1462 .or_else(|| data_dir.as_ref().map(|d| format!("{}/wal", d)));
1463 let wal_enabled = wal_enabled_var.map(|v| v == "true").unwrap_or(true);
1464
1465 match (&storage_dir, &wal_dir) {
1466 (Some(sd), Some(wd)) if wal_enabled => {
1467 let config = Self::production(
1468 sd,
1469 wd,
1470 SnapshotConfig::default(),
1471 WALConfig::default(),
1472 CompactionConfig::default(),
1473 );
1474 (config, "wal+parquet")
1475 }
1476 (Some(sd), _) => {
1477 let config = Self::with_persistence(sd);
1478 (config, "parquet-only")
1479 }
1480 (_, Some(wd)) if wal_enabled => {
1481 let config = Self::with_wal(wd, WALConfig::default());
1482 (config, "wal-only")
1483 }
1484 _ => (Self::default(), "in-memory"),
1485 }
1486 }
1487}
1488
/// Aggregate counters describing the current contents of an `EventStore`.
#[derive(Debug, serde::Serialize)]
pub struct StoreStats {
    // Events currently held in the store.
    pub total_events: usize,
    // Distinct entity ids observed.
    pub total_entities: usize,
    // Distinct event types observed.
    pub total_event_types: usize,
    // Running count of accepted ingests. NOTE(review): appears to be a
    // monotonic counter that could diverge from `total_events` after
    // compaction — confirm against the ingest/compaction paths.
    pub total_ingested: u64,
}
1496
/// Summary of a single event stream (presumably one per entity id — confirm
/// against the producer of this type).
#[derive(Debug, Clone, serde::Serialize)]
pub struct StreamInfo {
    // Identifier of the stream.
    pub stream_id: String,
    // Number of events in the stream.
    pub event_count: usize,
    // Timestamp of the most recent event, if any.
    pub last_event_at: Option<chrono::DateTime<chrono::Utc>>,
}
1507
/// Per-event-type summary counters.
#[derive(Debug, Clone, serde::Serialize)]
pub struct EventTypeInfo {
    // The event type name.
    pub event_type: String,
    // Number of events recorded with this type.
    pub event_count: usize,
    // Timestamp of the most recent event of this type, if any.
    pub last_event_at: Option<chrono::DateTime<chrono::Utc>>,
}
1518
impl Default for EventStore {
    /// Delegates to `EventStore::new()` (an in-memory store with no
    /// persistence configured).
    fn default() -> Self {
        Self::new()
    }
}
1524
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::entities::Event;
    use tempfile::TempDir;

    // Builds a minimal event with a fixed `{"name": "Test", "value": 42}` payload.
    fn create_test_event(entity_id: &str, event_type: &str) -> Event {
        Event::from_strings(
            event_type.to_string(),
            entity_id.to_string(),
            "default".to_string(),
            serde_json::json!({"name": "Test", "value": 42}),
            None,
        )
        .unwrap()
    }

    // Builds an event with a caller-supplied payload.
    fn create_test_event_with_payload(
        entity_id: &str,
        event_type: &str,
        payload: serde_json::Value,
    ) -> Event {
        Event::from_strings(
            event_type.to_string(),
            entity_id.to_string(),
            "default".to_string(),
            payload,
            None,
        )
        .unwrap()
    }

    // --- construction & ingestion ---

    #[test]
    fn test_event_store_new() {
        let store = EventStore::new();
        assert_eq!(store.stats().total_events, 0);
        assert_eq!(store.stats().total_entities, 0);
    }

    #[test]
    fn test_event_store_default() {
        let store = EventStore::default();
        assert_eq!(store.stats().total_events, 0);
    }

    #[test]
    fn test_ingest_single_event() {
        let store = EventStore::new();
        let event = create_test_event("entity-1", "user.created");

        store.ingest(event).unwrap();

        assert_eq!(store.stats().total_events, 1);
        assert_eq!(store.stats().total_ingested, 1);
    }

    #[test]
    fn test_ingest_multiple_events() {
        let store = EventStore::new();

        for i in 0..10 {
            let event = create_test_event(&format!("entity-{}", i), "user.created");
            store.ingest(event).unwrap();
        }

        assert_eq!(store.stats().total_events, 10);
        assert_eq!(store.stats().total_ingested, 10);
    }

    // --- basic querying ---

    #[test]
    fn test_query_by_entity_id() {
        let store = EventStore::new();

        store
            .ingest(create_test_event("entity-1", "user.created"))
            .unwrap();
        store
            .ingest(create_test_event("entity-2", "user.created"))
            .unwrap();
        store
            .ingest(create_test_event("entity-1", "user.updated"))
            .unwrap();

        let results = store
            .query(QueryEventsRequest {
                entity_id: Some("entity-1".to_string()),
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();

        assert_eq!(results.len(), 2);
    }

    #[test]
    fn test_query_by_event_type() {
        let store = EventStore::new();

        store
            .ingest(create_test_event("entity-1", "user.created"))
            .unwrap();
        store
            .ingest(create_test_event("entity-2", "user.updated"))
            .unwrap();
        store
            .ingest(create_test_event("entity-3", "user.created"))
            .unwrap();

        let results = store
            .query(QueryEventsRequest {
                entity_id: None,
                event_type: Some("user.created".to_string()),
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();

        assert_eq!(results.len(), 2);
    }

    #[test]
    fn test_query_with_limit() {
        let store = EventStore::new();

        for i in 0..10 {
            let event = create_test_event(&format!("entity-{}", i), "user.created");
            store.ingest(event).unwrap();
        }

        let results = store
            .query(QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: Some(5),
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();

        assert_eq!(results.len(), 5);
    }

    #[test]
    fn test_query_empty_store() {
        let store = EventStore::new();

        let results = store
            .query(QueryEventsRequest {
                entity_id: Some("non-existent".to_string()),
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();

        assert!(results.is_empty());
    }

    // --- state reconstruction & snapshots ---

    #[test]
    fn test_reconstruct_state() {
        let store = EventStore::new();

        store
            .ingest(create_test_event("entity-1", "user.created"))
            .unwrap();

        let state = store.reconstruct_state("entity-1", None).unwrap();
        assert_eq!(state["current_state"]["name"], "Test");
        assert_eq!(state["current_state"]["value"], 42);
    }

    #[test]
    fn test_reconstruct_state_not_found() {
        let store = EventStore::new();

        let result = store.reconstruct_state("non-existent", None);
        assert!(result.is_err());
    }

    #[test]
    fn test_get_snapshot_empty() {
        let store = EventStore::new();

        let result = store.get_snapshot("non-existent");
        assert!(result.is_err());
    }

    #[test]
    fn test_create_snapshot() {
        let store = EventStore::new();

        store
            .ingest(create_test_event("entity-1", "user.created"))
            .unwrap();

        store.create_snapshot("entity-1").unwrap();

        let snapshot = store.get_snapshot("entity-1").unwrap();
        assert!(snapshot != serde_json::json!(null));
    }

    #[test]
    fn test_create_snapshot_entity_not_found() {
        let store = EventStore::new();

        let result = store.create_snapshot("non-existent");
        assert!(result.is_err());
    }

    // --- accessor smoke tests ---

    #[test]
    fn test_websocket_manager() {
        let store = EventStore::new();
        let manager = store.websocket_manager();
        assert!(Arc::strong_count(&manager) >= 1);
    }

    #[test]
    fn test_snapshot_manager() {
        let store = EventStore::new();
        let manager = store.snapshot_manager();
        assert!(Arc::strong_count(&manager) >= 1);
    }

    #[test]
    fn test_compaction_manager_none() {
        let store = EventStore::new();
        assert!(store.compaction_manager().is_none());
    }

    #[test]
    fn test_schema_registry() {
        let store = EventStore::new();
        let registry = store.schema_registry();
        // Fixed mojibake: this line previously read `®istry` (a decoding
        // artifact of `&registry`), which does not compile.
        assert!(Arc::strong_count(&registry) >= 1);
    }

    #[test]
    fn test_replay_manager() {
        let store = EventStore::new();
        let manager = store.replay_manager();
        assert!(Arc::strong_count(&manager) >= 1);
    }

    #[test]
    fn test_pipeline_manager() {
        let store = EventStore::new();
        let manager = store.pipeline_manager();
        assert!(Arc::strong_count(&manager) >= 1);
    }

    #[test]
    fn test_projection_manager() {
        let store = EventStore::new();
        let manager = store.projection_manager();
        let projections = manager.list_projections();
        assert!(projections.len() >= 2);
    }

    #[test]
    fn test_projection_state_cache() {
        let store = EventStore::new();
        let cache = store.projection_state_cache();

        cache.insert("test:key".to_string(), serde_json::json!({"value": 123}));
        assert_eq!(cache.len(), 1);

        let value = cache.get("test:key").unwrap();
        assert_eq!(value["value"], 123);
    }

    #[test]
    fn test_metrics() {
        let store = EventStore::new();
        let metrics = store.metrics();
        assert!(Arc::strong_count(&metrics) >= 1);
    }

    #[test]
    fn test_store_stats() {
        let store = EventStore::new();

        store
            .ingest(create_test_event("entity-1", "user.created"))
            .unwrap();
        store
            .ingest(create_test_event("entity-2", "order.placed"))
            .unwrap();

        let stats = store.stats();
        assert_eq!(stats.total_events, 2);
        assert_eq!(stats.total_entities, 2);
        assert_eq!(stats.total_event_types, 2);
        assert_eq!(stats.total_ingested, 2);
    }

    // --- EventStoreConfig constructors ---

    #[test]
    fn test_event_store_config_default() {
        let config = EventStoreConfig::default();
        assert!(config.storage_dir.is_none());
        assert!(config.wal_dir.is_none());
    }

    #[test]
    fn test_event_store_config_with_persistence() {
        let temp_dir = TempDir::new().unwrap();
        let config = EventStoreConfig::with_persistence(temp_dir.path());

        assert!(config.storage_dir.is_some());
        assert!(config.wal_dir.is_none());
    }

    #[test]
    fn test_event_store_config_with_wal() {
        let temp_dir = TempDir::new().unwrap();
        let config = EventStoreConfig::with_wal(temp_dir.path(), WALConfig::default());

        assert!(config.storage_dir.is_none());
        assert!(config.wal_dir.is_some());
    }

    #[test]
    fn test_event_store_config_with_all() {
        let temp_dir = TempDir::new().unwrap();
        let config = EventStoreConfig::with_all(temp_dir.path(), SnapshotConfig::default());

        assert!(config.storage_dir.is_some());
    }

    #[test]
    fn test_event_store_config_production() {
        let storage_dir = TempDir::new().unwrap();
        let wal_dir = TempDir::new().unwrap();
        let config = EventStoreConfig::production(
            storage_dir.path(),
            wal_dir.path(),
            SnapshotConfig::default(),
            WALConfig::default(),
            CompactionConfig::default(),
        );

        assert!(config.storage_dir.is_some());
        assert!(config.wal_dir.is_some());
    }

    // --- EventStoreConfig::from_env_vars resolution ---

    #[test]
    fn test_from_env_vars_data_dir_enables_full_persistence() {
        let (config, mode) =
            EventStoreConfig::from_env_vars(Some("/app/data".to_string()), None, None, None);
        assert_eq!(mode, "wal+parquet");
        assert_eq!(
            config.storage_dir.unwrap().to_str().unwrap(),
            "/app/data/storage"
        );
        assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/app/data/wal");
    }

    #[test]
    fn test_from_env_vars_explicit_dirs() {
        let (config, mode) = EventStoreConfig::from_env_vars(
            None,
            Some("/custom/storage".to_string()),
            Some("/custom/wal".to_string()),
            None,
        );
        assert_eq!(mode, "wal+parquet");
        assert_eq!(
            config.storage_dir.unwrap().to_str().unwrap(),
            "/custom/storage"
        );
        assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/custom/wal");
    }

    #[test]
    fn test_from_env_vars_wal_disabled() {
        let (config, mode) = EventStoreConfig::from_env_vars(
            Some("/app/data".to_string()),
            None,
            None,
            Some("false".to_string()),
        );
        assert_eq!(mode, "parquet-only");
        assert!(config.storage_dir.is_some());
        assert!(config.wal_dir.is_none());
    }

    #[test]
    fn test_from_env_vars_no_dirs_is_in_memory() {
        let (config, mode) = EventStoreConfig::from_env_vars(None, None, None, None);
        assert_eq!(mode, "in-memory");
        assert!(config.storage_dir.is_none());
        assert!(config.wal_dir.is_none());
    }

    #[test]
    fn test_from_env_vars_empty_strings_treated_as_none() {
        let (_, mode) = EventStoreConfig::from_env_vars(
            Some("".to_string()),
            Some("".to_string()),
            Some("".to_string()),
            None,
        );
        assert_eq!(mode, "in-memory");
    }

    #[test]
    fn test_from_env_vars_explicit_overrides_data_dir() {
        let (config, mode) = EventStoreConfig::from_env_vars(
            Some("/app/data".to_string()),
            Some("/override/storage".to_string()),
            Some("/override/wal".to_string()),
            None,
        );
        assert_eq!(mode, "wal+parquet");
        assert_eq!(
            config.storage_dir.unwrap().to_str().unwrap(),
            "/override/storage"
        );
        assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/override/wal");
    }

    #[test]
    fn test_from_env_vars_wal_only() {
        let (config, mode) =
            EventStoreConfig::from_env_vars(None, None, Some("/wal/only".to_string()), None);
        assert_eq!(mode, "wal-only");
        assert!(config.storage_dir.is_none());
        assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/wal/only");
    }

    #[test]
    fn test_store_stats_serde() {
        let stats = StoreStats {
            total_events: 100,
            total_entities: 50,
            total_event_types: 10,
            total_ingested: 100,
        };

        let json = serde_json::to_string(&stats).unwrap();
        assert!(json.contains("\"total_events\":100"));
        assert!(json.contains("\"total_entities\":50"));
    }

    // --- combined filters: entity + type, prefix, payload ---

    #[test]
    fn test_query_with_entity_and_type() {
        let store = EventStore::new();

        store
            .ingest(create_test_event("entity-1", "user.created"))
            .unwrap();
        store
            .ingest(create_test_event("entity-1", "user.updated"))
            .unwrap();
        store
            .ingest(create_test_event("entity-2", "user.created"))
            .unwrap();

        let results = store
            .query(QueryEventsRequest {
                entity_id: Some("entity-1".to_string()),
                event_type: Some("user.created".to_string()),
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();

        assert_eq!(results.len(), 1);
        assert_eq!(results[0].event_type_str(), "user.created");
    }

    #[test]
    fn test_query_by_event_type_prefix() {
        let store = EventStore::new();

        store
            .ingest(create_test_event("entity-1", "index.created"))
            .unwrap();
        store
            .ingest(create_test_event("entity-2", "index.updated"))
            .unwrap();
        store
            .ingest(create_test_event("entity-3", "trade.created"))
            .unwrap();
        store
            .ingest(create_test_event("entity-4", "trade.completed"))
            .unwrap();
        store
            .ingest(create_test_event("entity-5", "balance.updated"))
            .unwrap();

        let results = store
            .query(QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: Some("index.".to_string()),
                payload_filter: None,
            })
            .unwrap();

        assert_eq!(results.len(), 2);
        assert!(
            results
                .iter()
                .all(|e| e.event_type_str().starts_with("index."))
        );
    }

    #[test]
    fn test_query_by_event_type_prefix_empty_returns_all() {
        let store = EventStore::new();

        store
            .ingest(create_test_event("entity-1", "index.created"))
            .unwrap();
        store
            .ingest(create_test_event("entity-2", "trade.created"))
            .unwrap();

        let results = store
            .query(QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: Some("".to_string()),
                payload_filter: None,
            })
            .unwrap();

        assert_eq!(results.len(), 2);
    }

    #[test]
    fn test_query_by_event_type_prefix_no_match() {
        let store = EventStore::new();

        store
            .ingest(create_test_event("entity-1", "index.created"))
            .unwrap();

        let results = store
            .query(QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: Some("nonexistent.".to_string()),
                payload_filter: None,
            })
            .unwrap();

        assert!(results.is_empty());
    }

    #[test]
    fn test_query_by_entity_with_type_prefix() {
        let store = EventStore::new();

        store
            .ingest(create_test_event("entity-1", "index.created"))
            .unwrap();
        store
            .ingest(create_test_event("entity-1", "trade.created"))
            .unwrap();
        store
            .ingest(create_test_event("entity-2", "index.updated"))
            .unwrap();

        let results = store
            .query(QueryEventsRequest {
                entity_id: Some("entity-1".to_string()),
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: Some("index.".to_string()),
                payload_filter: None,
            })
            .unwrap();

        assert_eq!(results.len(), 1);
        assert_eq!(results[0].event_type_str(), "index.created");
    }

    #[test]
    fn test_query_prefix_with_limit() {
        let store = EventStore::new();

        for i in 0..5 {
            store
                .ingest(create_test_event(&format!("entity-{}", i), "index.created"))
                .unwrap();
        }

        let results = store
            .query(QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: Some(3),
                event_type_prefix: Some("index.".to_string()),
                payload_filter: None,
            })
            .unwrap();

        assert_eq!(results.len(), 3);
    }

    #[test]
    fn test_query_prefix_alongside_existing_filters() {
        let store = EventStore::new();

        store
            .ingest(create_test_event("entity-1", "index.created"))
            .unwrap();
        // Sleep so each event gets a distinct timestamp.
        std::thread::sleep(std::time::Duration::from_millis(10));
        store
            .ingest(create_test_event("entity-2", "index.strategy.updated"))
            .unwrap();
        std::thread::sleep(std::time::Duration::from_millis(10));
        store
            .ingest(create_test_event("entity-3", "index.deleted"))
            .unwrap();

        let results = store
            .query(QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: Some(2),
                event_type_prefix: Some("index.".to_string()),
                payload_filter: None,
            })
            .unwrap();

        assert_eq!(results.len(), 2);
    }

    #[test]
    fn test_query_with_payload_filter() {
        let store = EventStore::new();

        for i in 0..5 {
            store
                .ingest(create_test_event_with_payload(
                    &format!("entity-{}", i),
                    "user.action",
                    serde_json::json!({"user_id": "alice", "action": "click"}),
                ))
                .unwrap();
        }
        for i in 5..10 {
            store
                .ingest(create_test_event_with_payload(
                    &format!("entity-{}", i),
                    "user.action",
                    serde_json::json!({"user_id": "bob", "action": "view"}),
                ))
                .unwrap();
        }

        let results = store
            .query(QueryEventsRequest {
                entity_id: None,
                event_type: Some("user.action".to_string()),
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: Some(r#"{"user_id":"alice"}"#.to_string()),
            })
            .unwrap();

        assert_eq!(results.len(), 5);
    }

    #[test]
    fn test_query_payload_filter_non_existent_field() {
        let store = EventStore::new();

        store
            .ingest(create_test_event_with_payload(
                "entity-1",
                "user.action",
                serde_json::json!({"user_id": "alice"}),
            ))
            .unwrap();

        let results = store
            .query(QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: Some(r#"{"nonexistent":"value"}"#.to_string()),
            })
            .unwrap();

        assert!(results.is_empty());
    }

    #[test]
    fn test_query_payload_filter_with_prefix() {
        let store = EventStore::new();

        store
            .ingest(create_test_event_with_payload(
                "entity-1",
                "index.created",
                serde_json::json!({"status": "active"}),
            ))
            .unwrap();
        store
            .ingest(create_test_event_with_payload(
                "entity-2",
                "index.created",
                serde_json::json!({"status": "inactive"}),
            ))
            .unwrap();
        store
            .ingest(create_test_event_with_payload(
                "entity-3",
                "trade.created",
                serde_json::json!({"status": "active"}),
            ))
            .unwrap();

        let results = store
            .query(QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: Some("index.".to_string()),
                payload_filter: Some(r#"{"status":"active"}"#.to_string()),
            })
            .unwrap();

        assert_eq!(results.len(), 1);
        assert_eq!(results[0].entity_id().to_string(), "entity-1");
    }

    #[test]
    fn test_flush_storage_no_storage() {
        let store = EventStore::new();
        let result = store.flush_storage();
        assert!(result.is_ok());
    }

    #[test]
    fn test_state_evolution() {
        let store = EventStore::new();

        store
            .ingest(
                Event::from_strings(
                    "user.created".to_string(),
                    "user-1".to_string(),
                    "default".to_string(),
                    serde_json::json!({"name": "Alice", "age": 25}),
                    None,
                )
                .unwrap(),
            )
            .unwrap();

        store
            .ingest(
                Event::from_strings(
                    "user.updated".to_string(),
                    "user-1".to_string(),
                    "default".to_string(),
                    serde_json::json!({"age": 26}),
                    None,
                )
                .unwrap(),
            )
            .unwrap();

        let state = store.reconstruct_state("user-1", None).unwrap();
        assert_eq!(state["current_state"]["name"], "Alice");
        assert_eq!(state["current_state"]["age"], 26);
    }

    #[test]
    fn test_reject_system_event_types() {
        let store = EventStore::new();

        let event = Event::reconstruct_from_strings(
            uuid::Uuid::new_v4(),
            "_system.tenant.created".to_string(),
            "_system:tenant:acme".to_string(),
            "_system".to_string(),
            serde_json::json!({"name": "ACME"}),
            chrono::Utc::now(),
            None,
            1,
        );

        let result = store.ingest(event);
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("reserved for internal use"),
            "Expected system namespace rejection, got: {}",
            err
        );
    }

    // Durability round-trip: session 1 writes through the WAL, session 2
    // recovers from the WAL and checkpoints to Parquet, session 3 reads back
    // from Parquet alone.
    #[test]
    fn test_wal_recovery_checkpoints_to_parquet() {
        let data_dir = TempDir::new().unwrap();
        let storage_dir = data_dir.path().join("storage");
        let wal_dir = data_dir.path().join("wal");

        // Session 1: ingest 5 events with synchronous WAL writes.
        {
            let config = EventStoreConfig::production(
                &storage_dir,
                &wal_dir,
                SnapshotConfig::default(),
                WALConfig {
                    sync_on_write: true,
                    ..WALConfig::default()
                },
                CompactionConfig::default(),
            );
            let store = EventStore::with_config(config);

            for i in 0..5 {
                let event = Event::from_strings(
                    "test.created".to_string(),
                    format!("entity-{}", i),
                    "default".to_string(),
                    serde_json::json!({"index": i}),
                    None,
                )
                .unwrap();
                store.ingest(event).unwrap();
            }

            assert_eq!(store.stats().total_events, 5);

        }

        // The WAL on disk must contain the session-1 writes.
        let wal_files: Vec<_> = std::fs::read_dir(&wal_dir)
            .unwrap()
            .filter_map(|e| e.ok())
            .filter(|e| e.path().extension().is_some_and(|ext| ext == "log"))
            .collect();
        assert!(!wal_files.is_empty(), "WAL file should exist");
        let wal_size = wal_files[0].metadata().unwrap().len();
        assert!(wal_size > 0, "WAL file should have data (got 0 bytes)");

        // Session 2: recovery from the WAL should restore all events and
        // checkpoint them into Parquet.
        {
            let config = EventStoreConfig::production(
                &storage_dir,
                &wal_dir,
                SnapshotConfig::default(),
                WALConfig {
                    sync_on_write: true,
                    ..WALConfig::default()
                },
                CompactionConfig::default(),
            );
            let store = EventStore::with_config(config);

            assert_eq!(
                store.stats().total_events,
                5,
                "Session 2 should have all 5 events after WAL recovery"
            );

            let parquet_files: Vec<_> = std::fs::read_dir(&storage_dir)
                .unwrap()
                .filter_map(|e| e.ok())
                .filter(|e| e.path().extension().is_some_and(|ext| ext == "parquet"))
                .collect();
            assert!(
                !parquet_files.is_empty(),
                "Parquet file should exist after WAL checkpoint"
            );
        }

        // Session 3: events must survive purely from the Parquet checkpoint.
        {
            let config = EventStoreConfig::production(
                &storage_dir,
                &wal_dir,
                SnapshotConfig::default(),
                WALConfig {
                    sync_on_write: true,
                    ..WALConfig::default()
                },
                CompactionConfig::default(),
            );
            let store = EventStore::with_config(config);

            assert_eq!(
                store.stats().total_events,
                5,
                "Session 3 should still have all 5 events from Parquet"
            );
        }
    }
}