1#[cfg(feature = "server")]
2use crate::application::services::webhook::WebhookRegistry;
3#[cfg(feature = "server")]
4use crate::infrastructure::observability::metrics::MetricsRegistry;
5#[cfg(feature = "server")]
6use crate::infrastructure::web::websocket::WebSocketManager;
7use crate::{
8 application::{
9 dto::QueryEventsRequest,
10 services::{
11 consumer::ConsumerRegistry,
12 exactly_once::{ExactlyOnceConfig, ExactlyOnceRegistry},
13 pipeline::PipelineManager,
14 projection::{EntitySnapshotProjection, EventCounterProjection, ProjectionManager},
15 replay::ReplayManager,
16 schema::{SchemaRegistry, SchemaRegistryConfig},
17 schema_evolution::SchemaEvolutionManager,
18 },
19 },
20 domain::entities::Event,
21 error::{AllSourceError, Result},
22 infrastructure::{
23 persistence::{
24 compaction::{CompactionConfig, CompactionManager},
25 index::{EventIndex, IndexEntry},
26 snapshot::{SnapshotConfig, SnapshotManager, SnapshotType},
27 storage::ParquetStorage,
28 tenant_loader::TenantLoader,
29 wal::{WALConfig, WriteAheadLog},
30 },
31 query::geospatial::GeoIndex,
32 },
33};
34use chrono::{DateTime, Utc};
35use dashmap::DashMap;
36use parking_lot::RwLock;
37use std::{path::PathBuf, sync::Arc};
38#[cfg(feature = "server")]
39use tokio::sync::mpsc;
40
/// Central event store: an append-only in-memory event log with secondary
/// indexes, projections, optional Parquet persistence + WAL durability,
/// live broadcast channels, and per-tenant lazy loading / LRU eviction.
pub struct EventStore {
    /// Append-only in-memory event log; positions in this Vec are the
    /// "offsets" recorded by `index`.
    events: Arc<RwLock<Vec<Event>>>,

    /// Secondary indexes (by id / entity / type / timestamp) mapping to
    /// offsets in `events`; rebuilt from scratch on compaction/eviction.
    index: Arc<EventIndex>,

    /// Registered projections; every ingested event is pushed through them.
    pub(crate) projections: Arc<RwLock<ProjectionManager>>,

    /// Optional Parquet persistence; `None` when no storage dir is configured
    /// or initialization failed (store then runs memory-only).
    storage: Option<Arc<RwLock<ParquetStorage>>>,

    /// Fan-out of ingested events to connected WebSocket clients.
    #[cfg(feature = "server")]
    websocket_manager: Arc<WebSocketManager>,

    /// Creates manual and automatic per-entity state snapshots.
    snapshot_manager: Arc<SnapshotManager>,

    /// Optional write-ahead log; appended before in-memory apply, replayed
    /// and checkpointed at startup in `with_config`.
    wal: Option<Arc<WriteAheadLog>>,

    /// Present only when a storage directory is configured.
    compaction_manager: Option<Arc<CompactionManager>>,

    schema_registry: Arc<SchemaRegistry>,

    replay_manager: Arc<ReplayManager>,

    /// Pipelines that post-process each ingested event.
    pipeline_manager: Arc<PipelineManager>,

    /// Prometheus metrics (server builds only).
    #[cfg(feature = "server")]
    metrics: Arc<MetricsRegistry>,

    /// Running count of events accounted for in memory: bumped per ingest,
    /// set to the resident count after WAL recovery, reduced on eviction.
    total_ingested: Arc<RwLock<u64>>,

    /// Cached projection state; keys appear to be prefixed "{projection}:"
    /// (see `reset_projection`) — TODO confirm against writers.
    projection_state_cache: Arc<DashMap<String, serde_json::Value>>,

    /// Free-form per-projection status strings.
    projection_status: Arc<DashMap<String, String>>,

    #[cfg(feature = "server")]
    webhook_registry: Arc<WebhookRegistry>,

    /// Channel to the webhook delivery worker; `None` until `set_webhook_tx`
    /// is called — matching webhooks are dropped until then.
    #[cfg(feature = "server")]
    webhook_tx: Arc<RwLock<Option<mpsc::UnboundedSender<WebhookDeliveryTask>>>>,

    geo_index: Arc<GeoIndex>,

    exactly_once: Arc<ExactlyOnceRegistry>,

    schema_evolution: Arc<SchemaEvolutionManager>,

    /// Per-entity version counters used for optimistic concurrency
    /// (`ingest_with_expected_version`); rebuilt on eviction.
    entity_versions: Arc<DashMap<String, u64>>,

    consumer_registry: Arc<ConsumerRegistry>,

    /// Broadcast channel every successfully ingested event is sent on.
    event_broadcast_tx: tokio::sync::broadcast::Sender<Arc<Event>>,

    /// Tracks which tenants are resident, their byte footprints, and the
    /// LRU/budget state used for eviction.
    tenant_loader: Arc<TenantLoader>,

    /// Optional periodic checkpoint interval, in seconds.
    checkpoint_interval_secs: Option<u64>,
}
136
/// One webhook delivery unit: a matching subscription paired with the event
/// that triggered it. Queued onto the `webhook_tx` channel for a background
/// delivery worker (server builds only).
#[cfg(feature = "server")]
#[derive(Debug, Clone)]
pub struct WebhookDeliveryTask {
    // The subscription whose filter matched the event.
    pub webhook: crate::application::services::webhook::WebhookSubscription,
    // Owned copy of the triggering event.
    pub event: Event,
}
144
145impl EventStore {
146 pub fn new() -> Self {
148 Self::with_config(EventStoreConfig::default())
149 }
150
    /// Constructs an `EventStore` from `config`: registers the built-in
    /// projections, wires up the optional subsystems (Parquet storage, WAL,
    /// compaction), then replays any events left in the WAL by a previous
    /// run and checkpoints them to Parquet.
    ///
    /// Storage and WAL initialization are best-effort: failures are logged
    /// and the corresponding feature is simply disabled.
    pub fn with_config(config: EventStoreConfig) -> Self {
        // Built-in projections are registered before any event can flow.
        let mut projections = ProjectionManager::new();

        projections.register(Arc::new(EntitySnapshotProjection::new("entity_snapshots")));
        projections.register(Arc::new(EventCounterProjection::new("event_counters")));

        // Parquet persistence: optional and best-effort.
        let storage = config
            .storage_dir
            .as_ref()
            .and_then(|dir| match ParquetStorage::new(dir) {
                Ok(storage) => {
                    tracing::info!("✅ Parquet persistence enabled at: {}", dir.display());
                    Some(Arc::new(RwLock::new(storage)))
                }
                Err(e) => {
                    tracing::error!("❌ Failed to initialize Parquet storage: {}", e);
                    None
                }
            });

        // Write-ahead log: optional and best-effort.
        let wal = config.wal_dir.as_ref().and_then(|dir| {
            match WriteAheadLog::new(dir, config.wal_config.clone()) {
                Ok(wal) => {
                    tracing::info!("✅ WAL enabled at: {}", dir.display());
                    Some(Arc::new(wal))
                }
                Err(e) => {
                    tracing::error!("❌ Failed to initialize WAL: {}", e);
                    None
                }
            }
        });

        // Compaction only exists when a storage directory is configured.
        let compaction_manager = config.storage_dir.as_ref().map(|dir| {
            let manager = CompactionManager::new(dir, config.compaction_config.clone());
            Arc::new(manager)
        });

        let schema_registry = Arc::new(SchemaRegistry::new(config.schema_registry_config.clone()));
        tracing::info!("✅ Schema registry enabled");

        let replay_manager = Arc::new(ReplayManager::new());
        tracing::info!("✅ Replay manager enabled");

        let pipeline_manager = Arc::new(PipelineManager::new());
        tracing::info!("✅ Pipeline manager enabled");

        #[cfg(feature = "server")]
        let metrics = {
            let m = MetricsRegistry::new();
            tracing::info!("✅ Prometheus metrics registry initialized");
            m
        };

        let projection_state_cache = Arc::new(DashMap::new());
        tracing::info!("✅ Projection state cache initialized");

        #[cfg(feature = "server")]
        let webhook_registry = {
            let w = Arc::new(WebhookRegistry::new());
            tracing::info!("✅ Webhook registry initialized");
            w
        };

        // Live event broadcast channel; capacity 1024, receivers lag-drop.
        let (event_broadcast_tx, _) = tokio::sync::broadcast::channel(1024);

        let store = Self {
            events: Arc::new(RwLock::new(Vec::new())),
            index: Arc::new(EventIndex::new()),
            projections: Arc::new(RwLock::new(projections)),
            storage,
            #[cfg(feature = "server")]
            websocket_manager: Arc::new(WebSocketManager::new()),
            snapshot_manager: Arc::new(SnapshotManager::new(config.snapshot_config)),
            wal,
            compaction_manager,
            schema_registry,
            replay_manager,
            pipeline_manager,
            #[cfg(feature = "server")]
            metrics,
            total_ingested: Arc::new(RwLock::new(0)),
            projection_state_cache,
            projection_status: Arc::new(DashMap::new()),
            #[cfg(feature = "server")]
            webhook_registry,
            #[cfg(feature = "server")]
            webhook_tx: Arc::new(RwLock::new(None)),
            geo_index: Arc::new(GeoIndex::new()),
            exactly_once: Arc::new(ExactlyOnceRegistry::new(ExactlyOnceConfig::default())),
            schema_evolution: Arc::new(SchemaEvolutionManager::new()),
            entity_versions: Arc::new(DashMap::new()),
            consumer_registry: Arc::new(ConsumerRegistry::new()),
            event_broadcast_tx,
            tenant_loader: {
                // Optional byte budget enables LRU eviction of tenant caches.
                let loader = TenantLoader::new();
                if let Some(budget) = config.cache_byte_budget {
                    loader.set_byte_budget(budget);
                    tracing::info!(
                        "✅ Cache byte budget set to {} bytes ({:.2} GiB) — LRU eviction enabled",
                        budget,
                        budget as f64 / (1024.0 * 1024.0 * 1024.0)
                    );
                } else {
                    tracing::info!(
                        "✅ Cache budget unset — every loaded tenant stays resident \
                         (set ALLSOURCE_CACHE_BYTES to enable eviction)"
                    );
                }
                Arc::new(loader)
            },
            checkpoint_interval_secs: config.checkpoint_interval_secs,
        };

        // WAL recovery: replay events that never made it to Parquet, then
        // checkpoint them and truncate the WAL.
        if let Some(ref wal) = store.wal {
            match wal.recover() {
                Ok(recovered_events) if !recovered_events.is_empty() => {
                    let mut wal_new = 0usize;
                    for event in recovered_events {
                        // Index + project each recovered event; failures are
                        // logged but do not abort recovery.
                        let offset = store.events.read().len();
                        if let Err(e) = store.index.index_event(
                            event.id,
                            event.entity_id_str(),
                            event.event_type_str(),
                            event.timestamp,
                            offset,
                        ) {
                            tracing::error!("Failed to re-index WAL event {}: {}", event.id, e);
                        }

                        if let Err(e) = store.projections.read().process_event(&event) {
                            tracing::error!("Failed to re-process WAL event {}: {}", event.id, e);
                        }

                        *store
                            .entity_versions
                            .entry(event.entity_id_str().to_string())
                            .or_insert(0) += 1;

                        store.events.write().push(event);
                        wal_new += 1;
                    }

                    #[cfg(feature = "server")]
                    store.metrics.wal_replay_events_total.set(wal_new as i64);

                    if wal_new > 0 {
                        // Align the ingest counter with the resident count.
                        let total = store.events.read().len();
                        *store.total_ingested.write() = total as u64;
                        tracing::info!(
                            "✅ Recovered {} events from WAL (Parquet data stays cold until \
                             first per-tenant query)",
                            wal_new
                        );

                        // Checkpoint the recovered tail to Parquet so the WAL
                        // can be truncated.
                        if let Some(ref storage) = store.storage {
                            tracing::info!(
                                "📸 Checkpointing {} WAL events to Parquet storage...",
                                wal_new
                            );
                            let parquet = storage.read();
                            let events = store.events.read();
                            let mut buffered = 0usize;
                            for event in events.iter().skip(events.len() - wal_new) {
                                if let Err(e) = parquet.append_event(event.clone()) {
                                    tracing::error!(
                                        "Failed to buffer WAL event for Parquet: {}",
                                        e
                                    );
                                } else {
                                    buffered += 1;
                                }
                            }
                            drop(events);
                            drop(parquet);

                            // Only truncate the WAL after a successful flush —
                            // otherwise the events would exist nowhere durable.
                            if buffered > 0 {
                                if let Err(e) = store.flush_storage() {
                                    tracing::error!("Failed to checkpoint to Parquet: {}", e);
                                } else if let Err(e) = wal.truncate() {
                                    tracing::error!(
                                        "Failed to truncate WAL after checkpoint: {}",
                                        e
                                    );
                                } else {
                                    tracing::info!(
                                        "✅ WAL checkpointed and truncated ({} events)",
                                        buffered
                                    );
                                }
                            }
                        }
                    }
                }
                Ok(_) => {
                    tracing::debug!("No events to recover from WAL");
                    #[cfg(feature = "server")]
                    store.metrics.wal_replay_events_total.set(0);
                }
                Err(e) => {
                    tracing::error!("❌ WAL recovery failed: {}", e);
                }
            }
        } else if store.storage.is_some() {
            tracing::info!(
                "📂 Boot complete (lazy-load mode): Parquet data stays on disk until first \
                 per-tenant query"
            );
        }

        store
    }
415
    /// Ingests `event` with optimistic concurrency control.
    ///
    /// When `expected_version` is `Some`, the entity's current version must
    /// match it exactly or `VersionConflict` is returned with nothing
    /// written. On success the bumped entity version is returned.
    ///
    /// # Errors
    /// - `ValidationError` from `validate_event`.
    /// - `VersionConflict` on an expected-version mismatch.
    /// - Any WAL append or downstream ingestion error.
    #[cfg_attr(feature = "hotpath", hotpath::measure)]
    pub fn ingest_with_expected_version(
        &self,
        event: &Event,
        expected_version: Option<u64>,
    ) -> Result<u64> {
        self.validate_event(event)?;

        let entity_id = event.entity_id_str().to_string();

        let new_version = {
            // The DashMap entry guard is held across the version check, the
            // WAL append, and the bump, so no other writer can interleave a
            // version change for this entity in between.
            let mut version_entry = self.entity_versions.entry(entity_id.clone()).or_insert(0);
            let current = *version_entry;

            if let Some(expected) = expected_version
                && current != expected
            {
                return Err(crate::error::AllSourceError::VersionConflict { expected, current });
            }

            if let Some(ref wal) = self.wal {
                wal.append(event.clone())?;
            }

            *version_entry += 1;
            *version_entry
        };

        // NOTE(review): if this fails, the WAL append and version bump have
        // already happened — confirm callers tolerate that partial state.
        self.ingest_post_wal(event)?;

        Ok(new_version)
    }
461
    /// Applies an already-WAL-appended event to every in-memory subsystem:
    /// index, projections, pipelines, Parquet buffer, broadcast channels,
    /// webhooks, geo index, schema evolution, auto-snapshot, and counters.
    ///
    /// Lock order: `events` write lock first, then a short `projections`
    /// read lock — the same order used by `ingest` and `ingest_batch`.
    #[cfg_attr(feature = "hotpath", hotpath::measure)]
    fn ingest_post_wal(&self, event: &Event) -> Result<()> {
        #[cfg(feature = "server")]
        let timer = self.metrics.ingestion_duration_seconds.start_timer();

        // The offset is the event's final position in the log.
        let mut events = self.events.write();
        let offset = events.len();

        self.index.index_event(
            event.id,
            event.entity_id_str(),
            event.event_type_str(),
            event.timestamp,
            offset,
        )?;

        let projections = self.projections.read();
        projections.process_event(event)?;
        drop(projections);

        let pipeline_results = self.pipeline_manager.process_event(event);
        if !pipeline_results.is_empty() {
            tracing::debug!(
                "Event {} processed by {} pipeline(s)",
                event.id,
                pipeline_results.len()
            );
            for (pipeline_id, result) in pipeline_results {
                tracing::trace!("Pipeline {} result: {:?}", pipeline_id, result);
            }
        }

        if let Some(ref storage) = self.storage {
            let storage = storage.read();
            storage.append_event(event.clone())?;
        }

        events.push(event.clone());
        let total_events = events.len();
        drop(events);

        // Fan out to in-process subscribers; send errors (no receivers) are
        // deliberately ignored.
        let event_arc = Arc::new(event.clone());
        let _ = self.event_broadcast_tx.send(Arc::clone(&event_arc));
        #[cfg(feature = "server")]
        self.websocket_manager.broadcast_event(event_arc);

        #[cfg(feature = "server")]
        self.dispatch_webhooks(event);

        self.geo_index.index_event(event);

        self.schema_evolution
            .analyze_event(event.event_type_str(), &event.payload);

        self.check_auto_snapshot(event.entity_id_str(), event);

        #[cfg(feature = "server")]
        {
            self.metrics.events_ingested_total.inc();
            self.metrics
                .events_ingested_by_type
                .with_label_values(&[event.event_type_str()])
                .inc();
            self.metrics.storage_events_total.set(total_events as i64);
        }

        let mut total = self.total_ingested.write();
        *total += 1;

        #[cfg(feature = "server")]
        timer.observe_duration();

        tracing::debug!("Event ingested: {} (offset: {})", event.id, offset);

        Ok(())
    }
552
    /// Ingests a single event: validate → WAL append → bump entity version →
    /// apply to all in-memory subsystems.
    ///
    /// Validation and WAL failures increment the error counter and record
    /// the duration before returning.
    ///
    /// NOTE(review): the body after the WAL append largely duplicates
    /// `ingest_post_wal`; kept separate here, presumably so one timer covers
    /// validation + WAL as well — confirm before unifying.
    ///
    /// # Errors
    /// Validation, WAL, index, projection, or storage errors.
    #[cfg_attr(feature = "hotpath", hotpath::measure)]
    pub fn ingest(&self, event: &Event) -> Result<()> {
        #[cfg(feature = "server")]
        let timer = self.metrics.ingestion_duration_seconds.start_timer();

        let validation_result = self.validate_event(event);
        if let Err(e) = validation_result {
            #[cfg(feature = "server")]
            {
                self.metrics.ingestion_errors_total.inc();
                timer.observe_duration();
            }
            return Err(e);
        }

        // Durability first: the event must hit the WAL before any in-memory
        // state changes.
        if let Some(ref wal) = self.wal
            && let Err(e) = wal.append(event.clone())
        {
            #[cfg(feature = "server")]
            {
                self.metrics.ingestion_errors_total.inc();
                timer.observe_duration();
            }
            return Err(e);
        }

        *self
            .entity_versions
            .entry(event.entity_id_str().to_string())
            .or_insert(0) += 1;

        let mut events = self.events.write();
        let offset = events.len();

        self.index.index_event(
            event.id,
            event.entity_id_str(),
            event.event_type_str(),
            event.timestamp,
            offset,
        )?;

        let projections = self.projections.read();
        projections.process_event(event)?;
        drop(projections);

        let pipeline_results = self.pipeline_manager.process_event(event);
        if !pipeline_results.is_empty() {
            tracing::debug!(
                "Event {} processed by {} pipeline(s)",
                event.id,
                pipeline_results.len()
            );
            for (pipeline_id, result) in pipeline_results {
                tracing::trace!("Pipeline {} result: {:?}", pipeline_id, result);
            }
        }

        if let Some(ref storage) = self.storage {
            let storage = storage.read();
            storage.append_event(event.clone())?;
        }

        events.push(event.clone());
        let total_events = events.len();
        drop(events);

        // Fan out to broadcast / WebSocket subscribers; send errors ignored.
        let event_arc = Arc::new(event.clone());
        let _ = self.event_broadcast_tx.send(Arc::clone(&event_arc));
        #[cfg(feature = "server")]
        self.websocket_manager.broadcast_event(event_arc);

        #[cfg(feature = "server")]
        self.dispatch_webhooks(event);

        self.geo_index.index_event(event);

        self.schema_evolution
            .analyze_event(event.event_type_str(), &event.payload);

        self.check_auto_snapshot(event.entity_id_str(), event);

        #[cfg(feature = "server")]
        {
            self.metrics.events_ingested_total.inc();
            self.metrics
                .events_ingested_by_type
                .with_label_values(&[event.event_type_str()])
                .inc();
            self.metrics.storage_events_total.set(total_events as i64);
        }

        let mut total = self.total_ingested.write();
        *total += 1;

        #[cfg(feature = "server")]
        timer.observe_duration();

        tracing::debug!("Event ingested: {} (offset: {})", event.id, offset);

        Ok(())
    }
676
677 #[cfg_attr(feature = "hotpath", hotpath::measure)]
684 pub fn ingest_batch(&self, batch: Vec<Event>) -> Result<()> {
685 if batch.is_empty() {
686 return Ok(());
687 }
688
689 for event in &batch {
691 self.validate_event(event)?;
692 }
693
694 if let Some(ref wal) = self.wal {
696 for event in &batch {
697 wal.append(event.clone())?;
698 }
699 }
700
701 let mut events = self.events.write();
703 let projections = self.projections.read();
704
705 for event in batch {
706 let offset = events.len();
707
708 self.index.index_event(
709 event.id,
710 event.entity_id_str(),
711 event.event_type_str(),
712 event.timestamp,
713 offset,
714 )?;
715
716 projections.process_event(&event)?;
717 self.pipeline_manager.process_event(&event);
718
719 if let Some(ref storage) = self.storage {
720 let storage = storage.read();
721 storage.append_event(event.clone())?;
722 }
723
724 self.geo_index.index_event(&event);
725 self.schema_evolution
726 .analyze_event(event.event_type_str(), &event.payload);
727
728 *self
730 .entity_versions
731 .entry(event.entity_id_str().to_string())
732 .or_insert(0) += 1;
733
734 let _ = self.event_broadcast_tx.send(Arc::new(event.clone()));
736
737 events.push(event);
738 }
739
740 let total_events = events.len();
741 drop(projections);
742 drop(events);
743
744 let mut total = self.total_ingested.write();
745 *total += total_events as u64;
746
747 Ok(())
748 }
749
    /// Applies an event received via replication.
    ///
    /// NOTE(review): this path skips validation, the WAL, webhooks, the geo
    /// index, schema evolution, and auto-snapshots — presumably because the
    /// origin node already performed them; confirm against the replication
    /// protocol before changing.
    ///
    /// # Errors
    /// Index, projection, or (none here) storage errors.
    #[cfg_attr(feature = "hotpath", hotpath::measure)]
    pub fn ingest_replicated(&self, event: &Event) -> Result<()> {
        #[cfg(feature = "server")]
        let timer = self.metrics.ingestion_duration_seconds.start_timer();

        let mut events = self.events.write();
        let offset = events.len();

        self.index.index_event(
            event.id,
            event.entity_id_str(),
            event.event_type_str(),
            event.timestamp,
            offset,
        )?;

        let projections = self.projections.read();
        projections.process_event(event)?;
        drop(projections);

        let pipeline_results = self.pipeline_manager.process_event(event);
        if !pipeline_results.is_empty() {
            tracing::debug!(
                "Replicated event {} processed by {} pipeline(s)",
                event.id,
                pipeline_results.len()
            );
        }

        *self
            .entity_versions
            .entry(event.entity_id_str().to_string())
            .or_insert(0) += 1;

        events.push(event.clone());
        let total_events = events.len();
        drop(events);

        // Local subscribers still see replicated events.
        let event_arc = Arc::new(event.clone());
        let _ = self.event_broadcast_tx.send(Arc::clone(&event_arc));
        #[cfg(feature = "server")]
        self.websocket_manager.broadcast_event(event_arc);

        #[cfg(feature = "server")]
        {
            self.metrics.events_ingested_total.inc();
            self.metrics
                .events_ingested_by_type
                .with_label_values(&[event.event_type_str()])
                .inc();
            self.metrics.storage_events_total.set(total_events as i64);
        }

        let mut total = self.total_ingested.write();
        *total += 1;

        #[cfg(feature = "server")]
        timer.observe_duration();

        tracing::debug!(
            "Replicated event ingested: {} (offset: {})",
            event.id,
            offset
        );

        Ok(())
    }
830
831 #[cfg_attr(feature = "hotpath", hotpath::measure)]
834 pub fn get_entity_version(&self, entity_id: &str) -> u64 {
835 self.entity_versions.get(entity_id).map_or(0, |v| *v)
836 }
837
    /// Borrows the registry used for pull-based consumers.
    pub fn consumer_registry(&self) -> &ConsumerRegistry {
        &self.consumer_registry
    }

    /// Subscribes to the live broadcast channel; every successfully
    /// ingested event is delivered as an `Arc<Event>`.
    pub fn subscribe_events(&self) -> tokio::sync::broadcast::Receiver<Arc<Event>> {
        self.event_broadcast_tx.subscribe()
    }

    /// Replaces the consumer registry (requires exclusive store access).
    pub fn set_consumer_registry(&mut self, registry: Arc<ConsumerRegistry>) {
        self.consumer_registry = registry;
    }

    /// Number of events currently resident in memory.
    pub fn total_events(&self) -> usize {
        self.events.read().len()
    }
864
865 pub fn events_after_offset(
868 &self,
869 offset: u64,
870 filters: &[String],
871 limit: usize,
872 ) -> Vec<(u64, Event)> {
873 let events = self.events.read();
874 let start = offset as usize;
875 if start >= events.len() {
876 return vec![];
877 }
878
879 events[start..]
880 .iter()
881 .enumerate()
882 .filter(|(_, event)| ConsumerRegistry::matches_filters(event.event_type_str(), filters))
883 .take(limit)
884 .map(|(i, event)| ((start + i + 1) as u64, event.clone()))
885 .collect()
886 }
887
    /// Shared handle to the WebSocket broadcast manager (server builds only).
    #[cfg(feature = "server")]
    pub fn websocket_manager(&self) -> Arc<WebSocketManager> {
        Arc::clone(&self.websocket_manager)
    }

    /// Shared handle to the snapshot manager.
    pub fn snapshot_manager(&self) -> Arc<SnapshotManager> {
        Arc::clone(&self.snapshot_manager)
    }

    /// Shared handle to the compaction manager; `None` when no storage
    /// directory was configured.
    pub fn compaction_manager(&self) -> Option<Arc<CompactionManager>> {
        self.compaction_manager.as_ref().map(Arc::clone)
    }

    /// Shared handle to the schema registry.
    pub fn schema_registry(&self) -> Arc<SchemaRegistry> {
        Arc::clone(&self.schema_registry)
    }

    /// Shared handle to the replay manager.
    pub fn replay_manager(&self) -> Arc<ReplayManager> {
        Arc::clone(&self.replay_manager)
    }

    /// Shared handle to the pipeline manager.
    pub fn pipeline_manager(&self) -> Arc<PipelineManager> {
        Arc::clone(&self.pipeline_manager)
    }

    /// Shared handle to the Prometheus metrics registry (server builds only).
    #[cfg(feature = "server")]
    pub fn metrics(&self) -> Arc<MetricsRegistry> {
        Arc::clone(&self.metrics)
    }

    /// Read-locked view of the projection manager; the guard holds the lock
    /// until dropped.
    pub fn projection_manager(&self) -> parking_lot::RwLockReadGuard<'_, ProjectionManager> {
        self.projections.read()
    }
929
930 pub fn register_projection(
939 &self,
940 projection: Arc<dyn crate::application::services::projection::Projection>,
941 ) {
942 let mut pm = self.projections.write();
943 pm.register(projection);
944 }
945
    /// Registers a projection and then replays every event currently in
    /// memory through it.
    ///
    /// NOTE(review): an event ingested between registration and the backfill
    /// read below would be processed twice (once live via the manager, once
    /// by the backfill loop) — confirm whether callers only invoke this
    /// before ingestion starts.
    ///
    /// # Errors
    /// Propagates the first error from `projection.process`.
    pub fn register_projection_with_backfill(
        &self,
        projection: &Arc<dyn crate::application::services::projection::Projection>,
    ) -> Result<()> {
        {
            let mut pm = self.projections.write();
            pm.register(Arc::clone(projection));
        }

        // Backfill: feed the existing log directly to the new projection.
        let events = self.events.read();
        for event in events.iter() {
            projection.process(event)?;
        }

        Ok(())
    }
969
    /// Shared handle to the projection state cache. Keys appear to be
    /// prefixed "{projection_name}:" (see `reset_projection`) — confirm
    /// against the writers.
    pub fn projection_state_cache(&self) -> Arc<DashMap<String, serde_json::Value>> {
        Arc::clone(&self.projection_state_cache)
    }

    /// Shared handle to the per-projection status map.
    pub fn projection_status(&self) -> Arc<DashMap<String, String>> {
        Arc::clone(&self.projection_status)
    }

    /// Shared handle to the geospatial index.
    pub fn geo_index(&self) -> Arc<GeoIndex> {
        self.geo_index.clone()
    }

    /// Shared handle to the exactly-once delivery registry.
    pub fn exactly_once(&self) -> Arc<ExactlyOnceRegistry> {
        self.exactly_once.clone()
    }

    /// Shared handle to the schema evolution manager.
    pub fn schema_evolution(&self) -> Arc<SchemaEvolutionManager> {
        self.schema_evolution.clone()
    }

    /// Full copy of the in-memory event log (O(n) clone — use sparingly).
    pub fn snapshot_events(&self) -> Vec<Event> {
        self.events.read().clone()
    }
1005
    /// Replaces all of an entity's events of `token_event_type` with a single
    /// `merged_event`, then rebuilds the whole index (offsets shift after
    /// `retain`). Returns `Ok(false)` if the entity had no such events.
    ///
    /// NOTE(review): `entity_versions` and `total_ingested` are not adjusted
    /// for the removed events, and `merged_event` is pushed through the
    /// projections on top of the tokens already projected — confirm this is
    /// the intended merge semantics.
    ///
    /// # Errors
    /// Projection or WAL-append errors.
    pub fn compact_entity_tokens(
        &self,
        entity_id: &str,
        token_event_type: &str,
        merged_event: Event,
    ) -> Result<bool> {
        {
            // Cheap pre-check under a read lock before mutating anything.
            let events = self.events.read();
            let has_tokens = events
                .iter()
                .any(|e| e.entity_id_str() == entity_id && e.event_type_str() == token_event_type);
            if !has_tokens {
                return Ok(false);
            }
        }

        let projections = self.projections.read();
        projections.process_event(&merged_event)?;
        drop(projections);

        let mut events = self.events.write();

        // Drop every token event for this entity, then append the merge.
        events.retain(|e| {
            !(e.entity_id_str() == entity_id && e.event_type_str() == token_event_type)
        });

        events.push(merged_event.clone());

        // Persist the merged event so it survives a restart.
        if let Some(ref wal) = self.wal {
            wal.append(merged_event)?;
        }

        // Offsets changed wholesale; rebuild the index from scratch.
        self.index.clear();
        for (offset, event) in events.iter().enumerate() {
            if let Err(e) = self.index.index_event(
                event.id,
                event.entity_id_str(),
                event.event_type_str(),
                event.timestamp,
                offset,
            ) {
                tracing::warn!(
                    event_id = %event.id,
                    offset,
                    "Failed to re-index event during compaction: {e}"
                );
            }
        }

        Ok(true)
    }
1088
    /// Shared handle to the webhook registry (server builds only).
    #[cfg(feature = "server")]
    pub fn webhook_registry(&self) -> Arc<WebhookRegistry> {
        Arc::clone(&self.webhook_registry)
    }

    /// Connects the channel used to hand deliveries to the webhook worker;
    /// until this is called, matching webhooks are dropped (see
    /// `dispatch_webhooks`).
    #[cfg(feature = "server")]
    pub fn set_webhook_tx(&self, tx: mpsc::UnboundedSender<WebhookDeliveryTask>) {
        *self.webhook_tx.write() = Some(tx);
        tracing::info!("Webhook delivery channel connected");
    }
1101
1102 #[cfg(feature = "server")]
1104 fn dispatch_webhooks(&self, event: &Event) {
1105 let matching = self.webhook_registry.find_matching(event);
1106 if matching.is_empty() {
1107 return;
1108 }
1109
1110 let tx_guard = self.webhook_tx.read();
1111 if let Some(ref tx) = *tx_guard {
1112 for webhook in matching {
1113 let task = WebhookDeliveryTask {
1114 webhook,
1115 event: event.clone(),
1116 };
1117 if let Err(e) = tx.send(task) {
1118 tracing::warn!("Failed to queue webhook delivery: {}", e);
1119 }
1120 }
1121 }
1122 }
1123
1124 pub fn flush_storage(&self) -> Result<()> {
1126 if let Some(ref storage) = self.storage {
1127 let storage = storage.read();
1128 storage.flush()?;
1129 tracing::info!("✅ Flushed events to persistent storage");
1130 }
1131 Ok(())
1132 }
1133
1134 pub fn checkpoint(&self) -> Result<()> {
1155 let Some(ref wal) = self.wal else {
1156 return Ok(());
1157 };
1158
1159 self.flush_storage()?;
1164 wal.truncate()?;
1165 tracing::debug!("✅ Checkpoint complete: Parquet flushed, WAL truncated");
1166 Ok(())
1167 }
1168
    /// Configured periodic checkpoint interval, or `None` when disabled.
    pub fn checkpoint_interval(&self) -> Option<std::time::Duration> {
        self.checkpoint_interval_secs
            .map(std::time::Duration::from_secs)
    }
1174
    /// Lazily hydrates a tenant's events from Parquet into memory, exactly
    /// once, guarded by a per-tenant load lock with a timeout (double-checked
    /// around the lock). Without storage the tenant is just marked loaded.
    /// After loading, the cache byte budget is enforced (evicting other
    /// tenants if needed).
    ///
    /// # Errors
    /// `StorageError` when the per-tenant lock times out, or any error from
    /// reading the tenant's events from storage.
    pub fn ensure_tenant_loaded(&self, tenant_id: &str) -> Result<()> {
        // Fast path: already resident.
        if self.tenant_loader.is_loaded(tenant_id) {
            return Ok(());
        }

        // Memory-only mode: nothing to hydrate.
        let Some(storage) = self.storage.as_ref().map(Arc::clone) else {
            self.tenant_loader.mark_loaded(tenant_id);
            return Ok(());
        };

        // Per-tenant lock serializes concurrent loads of the same tenant.
        let lock = self.tenant_loader.lock_for(tenant_id);
        let timeout = self.tenant_loader.load_timeout();
        let _guard = lock.try_lock_for(timeout).ok_or_else(|| {
            AllSourceError::StorageError(format!(
                "ensure_tenant_loaded timed out after {timeout:?} waiting for in-flight load of \
                 tenant {tenant_id:?}"
            ))
        })?;

        // Double-check: another thread may have finished the load while we
        // were waiting on the lock.
        if self.tenant_loader.is_loaded(tenant_id) {
            return Ok(());
        }

        let started = std::time::Instant::now();
        let events = storage.read().load_events_for_tenant(tenant_id)?;
        let read_count = events.len();

        // `applied` can be < `read_count`: append_loaded_event skips events
        // already present in the index.
        let before = self.events.read().len();
        for event in events {
            self.append_loaded_event(event);
        }
        let applied = self.events.read().len() - before;

        *self.total_ingested.write() += applied as u64;
        self.tenant_loader.mark_loaded(tenant_id);

        tracing::info!(
            tenant_id = tenant_id,
            read = read_count,
            applied = applied,
            elapsed_ms = started.elapsed().as_millis() as u64,
            "ensure_tenant_loaded: tenant hydrated"
        );

        // Never evict the tenant we just loaded.
        self.enforce_cache_budget(tenant_id);

        #[cfg(feature = "server")]
        self.metrics
            .cache_bytes
            .set(self.tenant_loader.total_bytes() as i64);

        Ok(())
    }
1270
1271 fn enforce_cache_budget(&self, recently_touched: &str) {
1282 if !self.tenant_loader.over_budget() {
1283 return;
1284 }
1285 loop {
1286 let Some(victim) = self.tenant_loader.pick_lru_excluding(recently_touched) else {
1287 tracing::warn!(
1288 cache_bytes = self.tenant_loader.total_bytes(),
1289 budget = self.tenant_loader.byte_budget(),
1290 recently_touched = recently_touched,
1291 "cache over budget but no other tenant available to evict — \
1292 a single tenant exceeds the budget; consider raising it"
1293 );
1294 return;
1295 };
1296 self.evict_tenant(&victim);
1297 if !self.tenant_loader.over_budget() {
1298 return;
1299 }
1300 }
1301 }
1302
    /// Whether `tenant_id`'s events are currently resident in memory.
    pub fn is_tenant_loaded(&self, tenant_id: &str) -> bool {
        self.tenant_loader.is_loaded(tenant_id)
    }
1308
    /// Drops all of `tenant_id`'s events from the in-memory cache and
    /// rebuilds the index and `entity_versions` from the remaining events
    /// (offsets shift wholesale after `retain`).
    ///
    /// NOTE(review): this rebuild is O(total resident events) and clears
    /// version counters for *all* tenants before recounting — confirm no
    /// reader depends on versions being stable during eviction.
    pub fn evict_tenant(&self, tenant_id: &str) {
        let mut events = self.events.write();
        let before = events.len();
        // Capture the byte footprint before the loader forgets the tenant.
        let evicted_bytes = self.tenant_loader.bytes_for(tenant_id);

        events.retain(|e| e.tenant_id_str() != tenant_id);
        let after = events.len();
        let dropped = before - after;

        if dropped == 0 {
            // Nothing was resident; just clear the loaded flag.
            drop(events);
            self.tenant_loader.mark_unloaded(tenant_id);
            return;
        }

        // Offsets changed: rebuild index and version counters from scratch.
        self.index.clear();
        self.entity_versions.clear();
        for (offset, event) in events.iter().enumerate() {
            if let Err(e) = self.index.index_event(
                event.id,
                event.entity_id_str(),
                event.event_type_str(),
                event.timestamp,
                offset,
            ) {
                tracing::error!(
                    "Failed to re-index event during eviction of {}: {}",
                    tenant_id,
                    e
                );
            }
            *self
                .entity_versions
                .entry(event.entity_id_str().to_string())
                .or_insert(0) += 1;
        }
        drop(events);

        self.tenant_loader.mark_unloaded(tenant_id);

        let mut t = self.total_ingested.write();
        *t = t.saturating_sub(dropped as u64);
        drop(t);

        #[cfg(feature = "server")]
        {
            self.metrics.cache_evictions_total.inc();
            self.metrics
                .cache_bytes
                .set(self.tenant_loader.total_bytes() as i64);
        }

        tracing::info!(
            tenant_id = tenant_id,
            events_dropped = dropped,
            bytes_freed = evicted_bytes,
            "evicted tenant from memory cache"
        );
    }
1405
    /// Estimated resident bytes for one tenant's cached events (sums of
    /// `estimated_size_bytes` recorded at load time).
    pub fn tenant_resident_bytes(&self, tenant_id: &str) -> u64 {
        self.tenant_loader.bytes_for(tenant_id)
    }

    /// Estimated resident bytes across all cached tenants.
    pub fn cache_resident_bytes(&self) -> u64 {
        self.tenant_loader.total_bytes()
    }
1418
    /// Appends one event hydrated from storage: skips duplicates (already in
    /// the index), indexes, projects, bumps the entity version, and accounts
    /// its bytes to the tenant's cache footprint. Index/projection failures
    /// are logged, not propagated.
    ///
    /// NOTE(review): the duplicate check happens before the events write
    /// lock is taken — concurrent loads of the same event could race;
    /// presumably prevented by the per-tenant load lock in
    /// `ensure_tenant_loaded`. Confirm no other caller exists.
    fn append_loaded_event(&self, event: Event) {
        if self.index.get_by_id(&event.id).is_some() {
            return;
        }

        // Capture size/tenant before the event is moved into the log.
        let event_bytes = event.estimated_size_bytes();
        let tenant = event.tenant_id_str().to_string();

        let mut events = self.events.write();
        let offset = events.len();

        if let Err(e) = self.index.index_event(
            event.id,
            event.entity_id_str(),
            event.event_type_str(),
            event.timestamp,
            offset,
        ) {
            tracing::error!("Failed to index loaded event {}: {}", event.id, e);
        }

        if let Err(e) = self.projections.read().process_event(&event) {
            tracing::error!("Failed to project loaded event {}: {}", event.id, e);
        }

        *self
            .entity_versions
            .entry(event.entity_id_str().to_string())
            .or_insert(0) += 1;

        events.push(event);
        self.tenant_loader.add_bytes(&tenant, event_bytes);
    }
1476
    /// Creates a manual snapshot for `entity_id` by shallow-merging every
    /// event's payload object (later events overwrite earlier keys;
    /// non-object payloads are skipped).
    ///
    /// # Errors
    /// `EntityNotFound` when the entity has no events, or any query /
    /// snapshot-manager error.
    pub fn create_snapshot(&self, entity_id: &str) -> Result<()> {
        let events = self.query(&QueryEventsRequest {
            entity_id: Some(entity_id.to_string()),
            event_type: None,
            tenant_id: None,
            as_of: None,
            since: None,
            until: None,
            limit: None,
            event_type_prefix: None,
            payload_filter: None,
        })?;

        if events.is_empty() {
            return Err(AllSourceError::EntityNotFound(entity_id.to_string()));
        }

        // Fold payload objects into a single state map, last-write-wins.
        let mut state = serde_json::json!({});
        for event in &events {
            if let serde_json::Value::Object(ref mut state_map) = state
                && let serde_json::Value::Object(ref payload_map) = event.payload
            {
                for (key, value) in payload_map {
                    state_map.insert(key.clone(), value.clone());
                }
            }
        }

        // Safe unwrap: `events` was checked non-empty above.
        let last_event = events.last().unwrap();
        self.snapshot_manager.create_snapshot(
            entity_id,
            state,
            last_event.timestamp,
            events.len(),
            SnapshotType::Manual,
        )?;

        Ok(())
    }
1519
1520 fn check_auto_snapshot(&self, entity_id: &str, event: &Event) {
1522 let entity_event_count = self
1524 .index
1525 .get_by_entity(entity_id)
1526 .map_or(0, |entries| entries.len());
1527
1528 if self.snapshot_manager.should_create_snapshot(
1529 entity_id,
1530 entity_event_count,
1531 event.timestamp,
1532 ) {
1533 if let Err(e) = self.create_snapshot(entity_id) {
1535 tracing::warn!(
1536 "Failed to create automatic snapshot for {}: {}",
1537 entity_id,
1538 e
1539 );
1540 }
1541 }
1542 }
1543
1544 fn validate_event(&self, event: &Event) -> Result<()> {
1546 if event.entity_id_str().is_empty() {
1549 return Err(AllSourceError::ValidationError(
1550 "entity_id cannot be empty".to_string(),
1551 ));
1552 }
1553
1554 if event.event_type_str().is_empty() {
1555 return Err(AllSourceError::ValidationError(
1556 "event_type cannot be empty".to_string(),
1557 ));
1558 }
1559
1560 if event.event_type().is_system() {
1563 return Err(AllSourceError::ValidationError(
1564 "Event types starting with '_system.' are reserved for internal use".to_string(),
1565 ));
1566 }
1567
1568 Ok(())
1569 }
1570
    /// Clears a projection's state (and its "{name}:"-prefixed cache
    /// entries), then replays every in-memory event through it. Returns the
    /// number of events reprocessed successfully; per-event errors are
    /// silently counted out, not propagated.
    ///
    /// # Errors
    /// `EntityNotFound` when no projection named `name` is registered.
    pub fn reset_projection(&self, name: &str) -> Result<usize> {
        let projection_manager = self.projections.read();
        let projection = projection_manager.get_projection(name).ok_or_else(|| {
            AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
        })?;

        projection.clear();

        // Purge cached state belonging to this projection.
        let prefix = format!("{name}:");
        let keys_to_remove: Vec<String> = self
            .projection_state_cache
            .iter()
            .filter(|entry| entry.key().starts_with(&prefix))
            .map(|entry| entry.key().clone())
            .collect();
        for key in keys_to_remove {
            self.projection_state_cache.remove(&key);
        }

        // Replay history; count only events the projection accepted.
        let events = self.events.read();
        let mut reprocessed = 0usize;
        for event in events.iter() {
            if projection.process(event).is_ok() {
                reprocessed += 1;
            }
        }

        Ok(reprocessed)
    }
1604
1605 pub fn get_event_by_id(&self, event_id: &uuid::Uuid) -> Result<Option<Event>> {
1607 if let Some(offset) = self.index.get_by_id(event_id) {
1608 let events = self.events.read();
1609 Ok(events.get(offset).cloned())
1610 } else {
1611 Ok(None)
1612 }
1613 }
1614
1615 #[cfg_attr(feature = "hotpath", hotpath::measure)]
1617 pub fn query(&self, request: &QueryEventsRequest) -> Result<Vec<Event>> {
1618 if let Some(ref tenant_id) = request.tenant_id {
1637 self.ensure_tenant_loaded(tenant_id)?;
1638 self.tenant_loader.touch(tenant_id);
1642 }
1643
1644 let query_type = if request.entity_id.is_some() {
1646 "entity"
1647 } else if request.event_type.is_some() {
1648 "type"
1649 } else if request.event_type_prefix.is_some() {
1650 "type_prefix"
1651 } else {
1652 "full_scan"
1653 };
1654
1655 #[cfg(feature = "server")]
1657 let timer = self
1658 .metrics
1659 .query_duration_seconds
1660 .with_label_values(&[query_type])
1661 .start_timer();
1662
1663 #[cfg(feature = "server")]
1665 self.metrics
1666 .queries_total
1667 .with_label_values(&[query_type])
1668 .inc();
1669
1670 let events = self.events.read();
1671
1672 let offsets: Vec<usize> = if let Some(entity_id) = &request.entity_id {
1674 self.index
1676 .get_by_entity(entity_id)
1677 .map(|entries| self.filter_entries(entries, request))
1678 .unwrap_or_default()
1679 } else if let Some(event_type) = &request.event_type {
1680 self.index
1682 .get_by_type(event_type)
1683 .map(|entries| self.filter_entries(entries, request))
1684 .unwrap_or_default()
1685 } else if let Some(prefix) = &request.event_type_prefix {
1686 let entries = self.index.get_by_type_prefix(prefix);
1688 self.filter_entries(entries, request)
1689 } else {
1690 (0..events.len()).collect()
1692 };
1693
1694 let mut results: Vec<Event> = offsets
1696 .iter()
1697 .filter_map(|&offset| events.get(offset).cloned())
1698 .filter(|event| self.apply_filters(event, request))
1699 .collect();
1700
1701 results.sort_by_key(|x| x.timestamp);
1703
1704 if let Some(limit) = request.limit {
1706 results.truncate(limit);
1707 }
1708
1709 #[cfg(feature = "server")]
1711 {
1712 self.metrics
1713 .query_results_total
1714 .with_label_values(&[query_type])
1715 .inc_by(results.len() as u64);
1716 timer.observe_duration();
1717 }
1718
1719 Ok(results)
1720 }
1721
1722 #[cfg_attr(feature = "hotpath", hotpath::measure)]
1724 fn filter_entries(&self, entries: Vec<IndexEntry>, request: &QueryEventsRequest) -> Vec<usize> {
1725 entries
1726 .into_iter()
1727 .filter(|entry| {
1728 if let Some(as_of) = request.as_of
1730 && entry.timestamp > as_of
1731 {
1732 return false;
1733 }
1734 if let Some(since) = request.since
1735 && entry.timestamp < since
1736 {
1737 return false;
1738 }
1739 if let Some(until) = request.until
1740 && entry.timestamp > until
1741 {
1742 return false;
1743 }
1744 true
1745 })
1746 .map(|entry| entry.offset)
1747 .collect()
1748 }
1749
1750 #[cfg_attr(feature = "hotpath", hotpath::measure)]
1752 fn apply_filters(&self, event: &Event, request: &QueryEventsRequest) -> bool {
1753 if let Some(ref tid) = request.tenant_id
1755 && event.tenant_id_str() != tid
1756 {
1757 return false;
1758 }
1759
1760 if request.entity_id.is_some()
1762 && let Some(ref event_type) = request.event_type
1763 && event.event_type_str() != event_type
1764 {
1765 return false;
1766 }
1767
1768 if request.entity_id.is_some()
1770 && let Some(ref prefix) = request.event_type_prefix
1771 && !event.event_type_str().starts_with(prefix)
1772 {
1773 return false;
1774 }
1775
1776 if let Some(ref filter_str) = request.payload_filter
1778 && let Ok(filter_obj) =
1779 serde_json::from_str::<serde_json::Map<String, serde_json::Value>>(filter_str)
1780 {
1781 let payload = event.payload();
1782 for (key, expected_value) in &filter_obj {
1783 match payload.get(key) {
1784 Some(actual_value) if actual_value == expected_value => {}
1785 _ => return false,
1786 }
1787 }
1788 }
1789
1790 true
1791 }
1792
1793 #[cfg_attr(feature = "hotpath", hotpath::measure)]
1796 pub fn reconstruct_state(
1797 &self,
1798 entity_id: &str,
1799 as_of: Option<DateTime<Utc>>,
1800 ) -> Result<serde_json::Value> {
1801 let (merged_state, since_timestamp) = if let Some(as_of_time) = as_of {
1803 if let Some(snapshot) = self
1805 .snapshot_manager
1806 .get_snapshot_as_of(entity_id, as_of_time)
1807 {
1808 tracing::debug!(
1809 "Using snapshot from {} for entity {} (saved {} events)",
1810 snapshot.as_of,
1811 entity_id,
1812 snapshot.event_count
1813 );
1814 (snapshot.state.clone(), Some(snapshot.as_of))
1815 } else {
1816 (serde_json::json!({}), None)
1817 }
1818 } else {
1819 if let Some(snapshot) = self.snapshot_manager.get_latest_snapshot(entity_id) {
1821 tracing::debug!(
1822 "Using latest snapshot from {} for entity {}",
1823 snapshot.as_of,
1824 entity_id
1825 );
1826 (snapshot.state.clone(), Some(snapshot.as_of))
1827 } else {
1828 (serde_json::json!({}), None)
1829 }
1830 };
1831
1832 let events = self.query(&QueryEventsRequest {
1834 entity_id: Some(entity_id.to_string()),
1835 event_type: None,
1836 tenant_id: None,
1837 as_of,
1838 since: since_timestamp,
1839 until: None,
1840 limit: None,
1841 event_type_prefix: None,
1842 payload_filter: None,
1843 })?;
1844
1845 if events.is_empty() && since_timestamp.is_none() {
1847 return Err(AllSourceError::EntityNotFound(entity_id.to_string()));
1848 }
1849
1850 let mut merged_state = merged_state;
1852 for event in &events {
1853 if let serde_json::Value::Object(ref mut state_map) = merged_state
1854 && let serde_json::Value::Object(ref payload_map) = event.payload
1855 {
1856 for (key, value) in payload_map {
1857 state_map.insert(key.clone(), value.clone());
1858 }
1859 }
1860 }
1861
1862 let state = serde_json::json!({
1864 "entity_id": entity_id,
1865 "last_updated": events.last().map(|e| e.timestamp),
1866 "event_count": events.len(),
1867 "as_of": as_of,
1868 "current_state": merged_state,
1869 "history": events.iter().map(|e| {
1870 serde_json::json!({
1871 "event_id": e.id,
1872 "type": e.event_type,
1873 "timestamp": e.timestamp,
1874 "payload": e.payload
1875 })
1876 }).collect::<Vec<_>>()
1877 });
1878
1879 Ok(state)
1880 }
1881
1882 pub fn get_snapshot(&self, entity_id: &str) -> Result<serde_json::Value> {
1884 let projections = self.projections.read();
1885
1886 if let Some(snapshot_projection) = projections.get_projection("entity_snapshots")
1887 && let Some(state) = snapshot_projection.get_state(entity_id)
1888 {
1889 return Ok(serde_json::json!({
1890 "entity_id": entity_id,
1891 "snapshot": state,
1892 "from_projection": "entity_snapshots"
1893 }));
1894 }
1895
1896 Err(AllSourceError::EntityNotFound(entity_id.to_string()))
1897 }
1898
1899 pub fn stats(&self) -> StoreStats {
1901 let events = self.events.read();
1902 let index_stats = self.index.stats();
1903
1904 StoreStats {
1905 total_events: events.len(),
1906 total_entities: index_stats.total_entities,
1907 total_event_types: index_stats.total_event_types,
1908 total_ingested: *self.total_ingested.read(),
1909 }
1910 }
1911
1912 pub fn list_streams(&self) -> Vec<StreamInfo> {
1914 self.index
1915 .get_all_entities()
1916 .into_iter()
1917 .map(|entity_id| {
1918 let event_count = self
1919 .index
1920 .get_by_entity(&entity_id)
1921 .map_or(0, |entries| entries.len());
1922 let last_event_at = self
1923 .index
1924 .get_by_entity(&entity_id)
1925 .and_then(|entries| entries.last().map(|e| e.timestamp));
1926 StreamInfo {
1927 stream_id: entity_id,
1928 event_count,
1929 last_event_at,
1930 }
1931 })
1932 .collect()
1933 }
1934
1935 pub fn list_event_types(&self) -> Vec<EventTypeInfo> {
1937 self.index
1938 .get_all_types()
1939 .into_iter()
1940 .map(|event_type| {
1941 let event_count = self
1942 .index
1943 .get_by_type(&event_type)
1944 .map_or(0, |entries| entries.len());
1945 let last_event_at = self
1946 .index
1947 .get_by_type(&event_type)
1948 .and_then(|entries| entries.last().map(|e| e.timestamp));
1949 EventTypeInfo {
1950 event_type,
1951 event_count,
1952 last_event_at,
1953 }
1954 })
1955 .collect()
1956 }
1957
1958 pub fn enable_wal_replication(
1965 &self,
1966 tx: tokio::sync::broadcast::Sender<crate::infrastructure::persistence::wal::WALEntry>,
1967 ) {
1968 if let Some(ref wal_arc) = self.wal {
1969 wal_arc.set_replication_tx(tx);
1970 tracing::info!("WAL replication broadcast enabled");
1971 } else {
1972 tracing::warn!("Cannot enable WAL replication: WAL is not configured");
1973 }
1974 }
1975
    /// Borrows the write-ahead log, or `None` when WAL persistence is
    /// not configured for this store.
    pub fn wal(&self) -> Option<&Arc<WriteAheadLog>> {
        self.wal.as_ref()
    }
1981
    /// Borrows the Parquet storage backend, or `None` when the store
    /// runs without on-disk persistence.
    pub fn parquet_storage(&self) -> Option<&Arc<RwLock<ParquetStorage>>> {
        self.storage.as_ref()
    }
}
1988
/// Construction-time configuration for the event store.
///
/// All fields have usable defaults; the `with_*` constructors and
/// [`EventStoreConfig::from_env`] fill in the common combinations.
#[derive(Debug, Clone, Default)]
pub struct EventStoreConfig {
    /// Root directory for Parquet persistence; `None` keeps the store
    /// purely in memory.
    pub storage_dir: Option<PathBuf>,

    /// Policy controlling snapshot creation.
    pub snapshot_config: SnapshotConfig,

    /// Directory for the write-ahead log; `None` disables the WAL.
    pub wal_dir: Option<PathBuf>,

    /// Tuning options for the write-ahead log.
    pub wal_config: WALConfig,

    /// Tuning options for compaction.
    pub compaction_config: CompactionConfig,

    /// Configuration for the schema registry.
    pub schema_registry_config: SchemaRegistryConfig,

    /// Directory for internal system data; when `None`,
    /// [`EventStoreConfig::effective_system_data_dir`] derives
    /// `<storage_dir>/__system` instead.
    pub system_data_dir: Option<PathBuf>,

    // NOTE(review): not referenced in this chunk — presumably a tenant
    // to pre-load at startup; confirm against the consuming constructor.
    pub bootstrap_tenant: Option<String>,

    /// Byte budget for tenant data kept resident in memory (parsed from
    /// `ALLSOURCE_CACHE_BYTES`); `None` disables budget-based eviction.
    pub cache_byte_budget: Option<u64>,

    /// Seconds between checkpoints; `from_env_vars` sets this to 60 by
    /// default when the WAL is enabled and `None` otherwise.
    pub checkpoint_interval_secs: Option<u64>,
}
2040
impl EventStoreConfig {
    /// Parquet persistence only (no WAL, default snapshots).
    pub fn with_persistence(storage_dir: impl Into<PathBuf>) -> Self {
        Self {
            storage_dir: Some(storage_dir.into()),
            ..Self::default()
        }
    }

    /// In-memory store with a custom snapshot policy.
    pub fn with_snapshots(snapshot_config: SnapshotConfig) -> Self {
        Self {
            snapshot_config,
            ..Self::default()
        }
    }

    /// WAL-backed store without Parquet persistence.
    pub fn with_wal(wal_dir: impl Into<PathBuf>, wal_config: WALConfig) -> Self {
        Self {
            wal_dir: Some(wal_dir.into()),
            wal_config,
            ..Self::default()
        }
    }

    /// Parquet persistence plus a custom snapshot policy.
    pub fn with_all(storage_dir: impl Into<PathBuf>, snapshot_config: SnapshotConfig) -> Self {
        Self {
            storage_dir: Some(storage_dir.into()),
            snapshot_config,
            ..Self::default()
        }
    }

    /// Full production setup: Parquet + WAL + compaction, with the
    /// system data directory rooted at `<storage_dir>/__system`.
    pub fn production(
        storage_dir: impl Into<PathBuf>,
        wal_dir: impl Into<PathBuf>,
        snapshot_config: SnapshotConfig,
        wal_config: WALConfig,
        compaction_config: CompactionConfig,
    ) -> Self {
        let storage_dir = storage_dir.into();
        let system_data_dir = storage_dir.join("__system");
        Self {
            storage_dir: Some(storage_dir),
            snapshot_config,
            wal_dir: Some(wal_dir.into()),
            wal_config,
            compaction_config,
            system_data_dir: Some(system_data_dir),
            ..Self::default()
        }
    }

    /// Explicit `system_data_dir` if set, otherwise `<storage_dir>/__system`,
    /// otherwise `None` (pure in-memory store).
    pub fn effective_system_data_dir(&self) -> Option<PathBuf> {
        self.system_data_dir
            .clone()
            .or_else(|| self.storage_dir.as_ref().map(|d| d.join("__system")))
    }

    /// Builds a config from `ALLSOURCE_*` environment variables; empty
    /// directory variables are treated as unset. Returns the config and
    /// a short static label describing the chosen persistence mode.
    pub fn from_env() -> (Self, &'static str) {
        Self::from_env_vars(
            std::env::var("ALLSOURCE_DATA_DIR")
                .ok()
                .filter(|s| !s.is_empty()),
            std::env::var("ALLSOURCE_STORAGE_DIR")
                .ok()
                .filter(|s| !s.is_empty()),
            std::env::var("ALLSOURCE_WAL_DIR")
                .ok()
                .filter(|s| !s.is_empty()),
            std::env::var("ALLSOURCE_WAL_ENABLED").ok(),
            std::env::var("ALLSOURCE_CACHE_BYTES").ok(),
            std::env::var("ALLSOURCE_SNAPSHOT_INTERVAL_SECONDS").ok(),
            std::env::var("ALLSOURCE_RETENTION_SYSTEM_DAYS").ok(),
            std::env::var("ALLSOURCE_CHECKPOINT_INTERVAL_SECONDS").ok(),
        )
    }

    /// Testable core of [`Self::from_env`]: resolves directories
    /// (explicit values win over `<data_dir>/storage` and
    /// `<data_dir>/wal` fallbacks), parses numeric knobs with
    /// warn-and-fallback on bad input, and picks the persistence mode.
    pub fn from_env_vars(
        data_dir: Option<String>,
        explicit_storage_dir: Option<String>,
        explicit_wal_dir: Option<String>,
        wal_enabled_var: Option<String>,
        cache_bytes_var: Option<String>,
        snapshot_interval_var: Option<String>,
        retention_system_days_var: Option<String>,
        checkpoint_interval_var: Option<String>,
    ) -> (Self, &'static str) {
        let data_dir = data_dir.filter(|s| !s.is_empty());
        let storage_dir = explicit_storage_dir
            .filter(|s| !s.is_empty())
            .or_else(|| data_dir.as_ref().map(|d| format!("{d}/storage")));
        let wal_dir = explicit_wal_dir
            .filter(|s| !s.is_empty())
            .or_else(|| data_dir.as_ref().map(|d| format!("{d}/wal")));
        // WAL is on unless the variable is present and not exactly "true".
        let wal_enabled = wal_enabled_var.is_none_or(|v| v == "true");
        // Unparseable cache budget disables the budget (with a warning)
        // rather than failing startup.
        let cache_byte_budget =
            cache_bytes_var
                .filter(|s| !s.is_empty())
                .and_then(|s| match s.parse::<u64>() {
                    Ok(v) => Some(v),
                    Err(e) => {
                        tracing::warn!(
                            "ALLSOURCE_CACHE_BYTES={s:?} could not be parsed as u64: {e}; \
                             cache budget disabled"
                        );
                        None
                    }
                });
        let compaction_config =
            CompactionConfig::from_env_vars(snapshot_interval_var, retention_system_days_var);

        // Checkpointing only makes sense with a WAL; default 60s, and an
        // unparseable value also falls back to 60s (with a warning).
        let checkpoint_interval_secs = if wal_enabled {
            checkpoint_interval_var
                .filter(|s| !s.is_empty())
                .map(|s| match s.parse::<u64>() {
                    Ok(v) => v,
                    Err(e) => {
                        tracing::warn!(
                            "ALLSOURCE_CHECKPOINT_INTERVAL_SECONDS={s:?} could not be parsed as \
                             u64: {e}; falling back to default 60s"
                        );
                        60
                    }
                })
                .or(Some(60))
        } else {
            None
        };

        // Arm order matters: the guarded production arm must be tried
        // before the storage-only arm, which also matches (sd, Some(_))
        // when the WAL is disabled.
        let mut config = match (&storage_dir, &wal_dir) {
            (Some(sd), Some(wd)) if wal_enabled => Self::production(
                sd,
                wd,
                SnapshotConfig::default(),
                WALConfig::default(),
                compaction_config,
            ),
            (Some(sd), _) => Self::with_persistence(sd),
            (_, Some(wd)) if wal_enabled => Self::with_wal(wd, WALConfig::default()),
            _ => Self::default(),
        };
        config.cache_byte_budget = cache_byte_budget;
        config.checkpoint_interval_secs = checkpoint_interval_secs;

        // Human-readable mode label mirroring the match above.
        let mode = match (&storage_dir, &wal_dir) {
            (Some(_), Some(_)) if wal_enabled => "wal+parquet",
            (Some(_), _) => "parquet-only",
            (_, Some(_)) if wal_enabled => "wal-only",
            _ => "in-memory",
        };
        (config, mode)
    }
}
2218
/// Aggregate counters reported by `EventStore::stats`.
#[derive(Debug, serde::Serialize)]
pub struct StoreStats {
    /// Events currently resident in memory.
    pub total_events: usize,
    /// Distinct entity ids known to the index.
    pub total_entities: usize,
    /// Distinct event types known to the index.
    pub total_event_types: usize,
    /// Monotonic ingest counter, tracked separately from the resident
    /// event list.
    pub total_ingested: u64,
}
2226
/// Per-entity stream summary returned by `EventStore::list_streams`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct StreamInfo {
    /// Entity id identifying the stream.
    pub stream_id: String,
    /// Number of indexed events for this stream.
    pub event_count: usize,
    /// Timestamp of the most recent indexed event, if any.
    pub last_event_at: Option<chrono::DateTime<chrono::Utc>>,
}
2237
/// Per-event-type summary returned by `EventStore::list_event_types`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct EventTypeInfo {
    /// The event type name.
    pub event_type: String,
    /// Number of indexed events of this type.
    pub event_count: usize,
    /// Timestamp of the most recent indexed event of this type, if any.
    pub last_event_at: Option<chrono::DateTime<chrono::Utc>>,
}
2248
2249impl Default for EventStore {
2250 fn default() -> Self {
2251 Self::new()
2252 }
2253}
2254
2255#[cfg(test)]
2256mod tests {
2257 use super::*;
2258 use crate::domain::entities::Event;
2259 use tempfile::TempDir;
2260
2261 fn find_parquet_files(dir: &std::path::Path) -> Vec<std::path::PathBuf> {
2266 let mut out = Vec::new();
2267 let mut stack = vec![dir.to_path_buf()];
2268 while let Some(d) = stack.pop() {
2269 let Ok(entries) = std::fs::read_dir(&d) else {
2270 continue;
2271 };
2272 for e in entries.flatten() {
2273 let p = e.path();
2274 if p.is_dir() {
2275 stack.push(p);
2276 } else if p.extension().and_then(|s| s.to_str()) == Some("parquet") {
2277 out.push(p);
2278 }
2279 }
2280 }
2281 out
2282 }
2283
    /// Builds a test event for the `"default"` tenant with a fixed
    /// sample payload (`{"name": "Test", "value": 42}`).
    fn create_test_event(entity_id: &str, event_type: &str) -> Event {
        Event::from_strings(
            event_type.to_string(),
            entity_id.to_string(),
            "default".to_string(),
            serde_json::json!({"name": "Test", "value": 42}),
            None,
        )
        .unwrap()
    }
2294
    /// Builds a test event for the `"default"` tenant with a
    /// caller-supplied payload.
    fn create_test_event_with_payload(
        entity_id: &str,
        event_type: &str,
        payload: serde_json::Value,
    ) -> Event {
        Event::from_strings(
            event_type.to_string(),
            entity_id.to_string(),
            "default".to_string(),
            payload,
            None,
        )
        .unwrap()
    }
2309
2310 #[test]
2311 fn test_event_store_new() {
2312 let store = EventStore::new();
2313 assert_eq!(store.stats().total_events, 0);
2314 assert_eq!(store.stats().total_entities, 0);
2315 }
2316
2317 #[test]
2324 fn test_ensure_tenant_loaded_no_storage_is_a_noop() {
2325 let store = EventStore::new();
2329 assert!(!store.is_tenant_loaded("alice"));
2330 store.ensure_tenant_loaded("alice").unwrap();
2331 assert!(store.is_tenant_loaded("alice"));
2332 assert!(!store.is_tenant_loaded("bob"));
2334 }
2335
2336 #[test]
2337 fn test_ensure_tenant_loaded_warm_path_is_idempotent() {
2338 let store = EventStore::new();
2339 store.ensure_tenant_loaded("alice").unwrap();
2340 store.ensure_tenant_loaded("alice").unwrap();
2342 }
2343
2344 #[test]
2345 fn test_ensure_tenant_loaded_rejects_unsafe_tenant_id() {
2346 let temp_dir = TempDir::new().unwrap();
2352 let store = EventStore::with_config(EventStoreConfig::with_persistence(temp_dir.path()));
2353 for unsafe_tid in ["..", "a/b", "a\\b", ""] {
2354 let result = store.ensure_tenant_loaded(unsafe_tid);
2355 assert!(
2356 result.is_err(),
2357 "tenant_id {unsafe_tid:?} should have been rejected"
2358 );
2359 assert!(
2360 !store.is_tenant_loaded(unsafe_tid),
2361 "rejected tenant {unsafe_tid:?} must not be marked loaded"
2362 );
2363 }
2364 }
2365
2366 #[test]
2367 fn test_ensure_tenant_loaded_no_subtree_marks_loaded_with_zero_events() {
2368 let temp_dir = TempDir::new().unwrap();
2373 let store = EventStore::with_config(EventStoreConfig::with_persistence(temp_dir.path()));
2374 assert!(!store.is_tenant_loaded("never-existed"));
2375 store.ensure_tenant_loaded("never-existed").unwrap();
2376 assert!(store.is_tenant_loaded("never-existed"));
2377 }
2378
    /// Evicting one tenant removes its events and byte accounting while
    /// leaving the other tenant's data untouched.
    #[test]
    fn test_evict_tenant_drops_events_and_resets_bytes() {
        let temp_dir = TempDir::new().unwrap();
        let storage_dir = temp_dir.path().to_path_buf();

        // Seed 3 alice + 2 bob events and flush them to Parquet, then
        // drop the store so the next one starts cold.
        {
            let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
            for i in 0..3 {
                store
                    .ingest(
                        &Event::from_strings(
                            "test.event".to_string(),
                            format!("a-{i}"),
                            "alice".to_string(),
                            serde_json::json!({"i": i}),
                            None,
                        )
                        .unwrap(),
                    )
                    .unwrap();
            }
            for i in 0..2 {
                store
                    .ingest(
                        &Event::from_strings(
                            "test.event".to_string(),
                            format!("b-{i}"),
                            "bob".to_string(),
                            serde_json::json!({"i": i}),
                            None,
                        )
                        .unwrap(),
                    )
                    .unwrap();
            }
            store.flush_storage().unwrap();
        }

        // Warm both tenants in a fresh store and capture their byte usage.
        let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
        store.ensure_tenant_loaded("alice").unwrap();
        store.ensure_tenant_loaded("bob").unwrap();
        assert_eq!(store.stats().total_events, 5);
        let alice_bytes = store.tenant_resident_bytes("alice");
        let bob_bytes = store.tenant_resident_bytes("bob");
        assert!(alice_bytes > 0 && bob_bytes > 0);

        store.evict_tenant("alice");

        // Only alice's residency, bytes, and events are gone.
        assert!(!store.is_tenant_loaded("alice"));
        assert!(store.is_tenant_loaded("bob"));
        assert_eq!(store.tenant_resident_bytes("alice"), 0);
        assert_eq!(store.tenant_resident_bytes("bob"), bob_bytes);
        assert_eq!(store.stats().total_events, 2, "only bob's 2 events remain");
    }
2436
    /// An evicted tenant's events come back transparently when the next
    /// tenant-scoped query triggers a lazy re-load from Parquet.
    #[test]
    fn test_evict_tenant_then_query_re_loads_from_disk() {
        let temp_dir = TempDir::new().unwrap();
        let storage_dir = temp_dir.path().to_path_buf();

        // Seed and flush four alice events, then drop the store.
        {
            let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
            for i in 0..4 {
                store
                    .ingest(
                        &Event::from_strings(
                            "test.event".to_string(),
                            format!("a-{i}"),
                            "alice".to_string(),
                            serde_json::json!({"i": i}),
                            None,
                        )
                        .unwrap(),
                    )
                    .unwrap();
            }
            store.flush_storage().unwrap();
        }

        // Load then evict: the in-memory store is empty again.
        let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
        store.ensure_tenant_loaded("alice").unwrap();
        store.evict_tenant("alice");
        assert_eq!(store.stats().total_events, 0);

        // Querying alice re-loads her events from disk.
        let results = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: Some("alice".to_string()),
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();
        assert_eq!(results.len(), 4);
        assert!(store.is_tenant_loaded("alice"));
    }
2486
    /// Eviction must rebuild the index: after removing alice's events
    /// from the shared log, bob's entries get new offsets, and queries
    /// must still resolve them to bob's events only.
    #[test]
    fn test_evict_tenant_rebuilds_index_with_new_offsets() {
        let temp_dir = TempDir::new().unwrap();
        let store = EventStore::with_config(EventStoreConfig::with_persistence(temp_dir.path()));

        // Interleave alice (3 events) with bob (2 events) so their
        // offsets are mixed in the shared event log.
        for i in 0..3 {
            store
                .ingest(
                    &Event::from_strings(
                        "test.event".to_string(),
                        format!("a-{i}"),
                        "alice".to_string(),
                        serde_json::json!({"i": i}),
                        None,
                    )
                    .unwrap(),
                )
                .unwrap();
            if i < 2 {
                store
                    .ingest(
                        &Event::from_strings(
                            "test.event".to_string(),
                            format!("b-{i}"),
                            "bob".to_string(),
                            serde_json::json!({"i": i}),
                            None,
                        )
                        .unwrap(),
                    )
                    .unwrap();
            }
        }
        // Mark both tenants warm directly, bypassing the disk loader.
        store.tenant_loader.mark_loaded("alice");
        store.tenant_loader.mark_loaded("bob");

        store.evict_tenant("alice");

        let bob_results = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: Some("bob".to_string()),
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();
        assert_eq!(bob_results.len(), 2);
        for e in &bob_results {
            assert_eq!(e.tenant_id_str(), "bob");
        }
    }
2553
    /// With a small cache budget, loading tenants one after another must
    /// keep the resident set bounded by evicting least-recent tenants.
    /// The sleeps/touches establish a strict recency order.
    #[test]
    fn test_budget_eviction_keeps_resident_set_bounded() {
        let temp_dir = TempDir::new().unwrap();
        let storage_dir = temp_dir.path().to_path_buf();

        // ~1 KB payloads so each tenant's resident footprint is large
        // relative to the 12 KB budget below.
        let big_payload = serde_json::json!({"data": "x".repeat(1000)});
        {
            let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
            for tenant in ["alice", "bob", "carol"] {
                for i in 0..5 {
                    store
                        .ingest(
                            &Event::from_strings(
                                "test.event".to_string(),
                                format!("{tenant}-{i}"),
                                tenant.to_string(),
                                big_payload.clone(),
                                None,
                            )
                            .unwrap(),
                        )
                        .unwrap();
                }
            }
            store.flush_storage().unwrap();
        }

        let mut config = EventStoreConfig::with_persistence(&storage_dir);
        config.cache_byte_budget = Some(12_000);
        let store = EventStore::with_config(config);

        store.ensure_tenant_loaded("alice").unwrap();
        assert!(store.is_tenant_loaded("alice"));

        // Touch + sleep makes alice strictly older than bob's load.
        store.tenant_loader.touch("alice");
        std::thread::sleep(std::time::Duration::from_millis(10));
        store.ensure_tenant_loaded("bob").unwrap();
        assert!(store.is_tenant_loaded("bob"));

        store.tenant_loader.touch("bob");
        std::thread::sleep(std::time::Duration::from_millis(10));
        store.ensure_tenant_loaded("carol").unwrap();
        assert!(store.is_tenant_loaded("carol"));

        let resident = store.cache_resident_bytes();
        let budget = 12_000u64;

        // If we're still over budget, the policy must have trimmed the
        // resident set down to the single most recent tenant.
        if resident > budget {
            let loaded_count = ["alice", "bob", "carol"]
                .iter()
                .filter(|t| store.is_tenant_loaded(t))
                .count();
            assert_eq!(
                loaded_count, 1,
                "over budget but more than one tenant loaded — eviction policy didn't fire"
            );
        }

        // The most recently loaded tenant is never the eviction victim.
        assert!(store.is_tenant_loaded("carol"));
    }
2637
    /// Budget pressure evicts the older tenant when a second one is
    /// queried, and a follow-up query for the evicted tenant re-loads it
    /// transparently from disk.
    #[test]
    fn test_query_after_eviction_re_loads_transparently() {
        let temp_dir = TempDir::new().unwrap();
        let storage_dir = temp_dir.path().to_path_buf();

        // ~2 KB payloads so one tenant alone exceeds the 5 KB budget.
        let big_payload = serde_json::json!({"data": "x".repeat(2000)});
        {
            let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
            for tenant in ["alice", "bob"] {
                for i in 0..3 {
                    store
                        .ingest(
                            &Event::from_strings(
                                "test.event".to_string(),
                                format!("{tenant}-{i}"),
                                tenant.to_string(),
                                big_payload.clone(),
                                None,
                            )
                            .unwrap(),
                        )
                        .unwrap();
                }
            }
            store.flush_storage().unwrap();
        }

        let mut config = EventStoreConfig::with_persistence(&storage_dir);
        config.cache_byte_budget = Some(5_000);
        let store = EventStore::with_config(config);

        // First query lazily loads alice.
        let alice_first = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: Some("alice".to_string()),
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();
        assert_eq!(alice_first.len(), 3);

        // Sleep so bob's load is strictly newer, then query bob — the
        // budget forces alice out.
        std::thread::sleep(std::time::Duration::from_millis(15));
        let _bob = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: Some("bob".to_string()),
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();
        assert!(
            !store.is_tenant_loaded("alice"),
            "alice should have been evicted"
        );

        // Querying alice again re-loads her events from Parquet.
        let alice_second = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: Some("alice".to_string()),
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();
        assert_eq!(
            alice_second.len(),
            3,
            "alice's events come back via re-load"
        );
        assert!(store.is_tenant_loaded("alice"));
    }
2732
    /// The server-feature metrics must track the cache byte gauge and
    /// eviction counter as tenants are loaded under budget pressure.
    #[test]
    #[cfg(feature = "server")]
    fn test_cache_metrics_track_evictions_and_bytes() {
        let temp_dir = TempDir::new().unwrap();
        let storage_dir = temp_dir.path().to_path_buf();

        // ~2 KB payloads so one tenant alone exceeds the 5 KB budget.
        let big_payload = serde_json::json!({"data": "x".repeat(2000)});
        {
            let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
            for tenant in ["alice", "bob"] {
                for i in 0..3 {
                    store
                        .ingest(
                            &Event::from_strings(
                                "test.event".to_string(),
                                format!("{tenant}-{i}"),
                                tenant.to_string(),
                                big_payload.clone(),
                                None,
                            )
                            .unwrap(),
                        )
                        .unwrap();
                }
            }
            store.flush_storage().unwrap();
        }

        let mut config = EventStoreConfig::with_persistence(&storage_dir);
        config.cache_byte_budget = Some(5_000); let store = EventStore::with_config(config);

        // Nothing loaded yet: both metrics start at zero.
        assert_eq!(store.metrics.cache_evictions_total.get(), 0);
        assert_eq!(store.metrics.cache_bytes.get(), 0);

        store.ensure_tenant_loaded("alice").unwrap();
        let after_alice = store.metrics.cache_bytes.get();
        assert!(after_alice > 0, "gauge should reflect alice's bytes");
        assert_eq!(store.metrics.cache_evictions_total.get(), 0);

        // Bob's load pushes past the budget, forcing one eviction.
        std::thread::sleep(std::time::Duration::from_millis(10));
        store.ensure_tenant_loaded("bob").unwrap();

        assert_eq!(
            store.metrics.cache_evictions_total.get(),
            1,
            "exactly one tenant evicted after bob's load"
        );
        let after_bob = store.metrics.cache_bytes.get();
        assert!(after_bob > 0);
        assert!(after_bob <= after_alice, "gauge dropped after eviction");
    }
2793
    /// Stress: rolling per-tenant queries over 10 tenants (each ~500 KB
    /// of payload) against a 1 MiB budget. The resident set must stay
    /// near the budget the whole time while every query still returns
    /// complete results.
    #[test]
    fn test_stress_resident_set_stays_near_budget_under_rolling_queries() {
        let temp_dir = TempDir::new().unwrap();
        let storage_dir = temp_dir.path().to_path_buf();

        const TENANT_COUNT: usize = 10;
        const EVENTS_PER_TENANT: usize = 50;
        let big_payload = serde_json::json!({"data": "x".repeat(10_000)});

        // Seed all tenants and flush to Parquet, then drop the store.
        {
            let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
            for t in 0..TENANT_COUNT {
                let tenant = format!("tenant-{t}");
                for i in 0..EVENTS_PER_TENANT {
                    store
                        .ingest(
                            &Event::from_strings(
                                "test.event".to_string(),
                                format!("{tenant}-{i}"),
                                tenant.clone(),
                                big_payload.clone(),
                                None,
                            )
                            .unwrap(),
                        )
                        .unwrap();
                }
            }
            store.flush_storage().unwrap();
        }

        const BUDGET: u64 = 1_048_576;
        let mut config = EventStoreConfig::with_persistence(&storage_dir);
        config.cache_byte_budget = Some(BUDGET);
        let store = EventStore::with_config(config);

        // Query each tenant in turn, tracking the peak resident size.
        let mut peak_resident: u64 = 0;
        for t in 0..TENANT_COUNT {
            let tenant = format!("tenant-{t}");
            let results = store
                .query(&QueryEventsRequest {
                    entity_id: None,
                    event_type: None,
                    tenant_id: Some(tenant.clone()),
                    as_of: None,
                    since: None,
                    until: None,
                    limit: None,
                    event_type_prefix: None,
                    payload_filter: None,
                })
                .unwrap();
            assert_eq!(
                results.len(),
                EVENTS_PER_TENANT,
                "every per-tenant query must return all of that tenant's events"
            );
            let resident = store.cache_resident_bytes();
            if resident > peak_resident {
                peak_resident = resident;
            }
        }

        let final_resident = store.cache_resident_bytes();

        // Allow one budget's worth of slack (eviction runs after a load
        // momentarily overshoots).
        let tolerance = BUDGET; assert!(
            peak_resident <= BUDGET + tolerance,
            "peak resident {peak_resident} exceeds budget {BUDGET} by more than {tolerance} \
             — eviction policy not keeping up with the working-set churn"
        );
        assert!(
            final_resident <= BUDGET + tolerance,
            "final resident {final_resident} exceeds budget {BUDGET} by more than {tolerance}"
        );

        // The most recently queried tenant must survive the sweep.
        let last_tenant = format!("tenant-{}", TENANT_COUNT - 1);
        assert!(
            store.is_tenant_loaded(&last_tenant),
            "the most-recent tenant must remain loaded after the sweep"
        );

        // And at least one older tenant must have been evicted.
        let still_loaded = (0..TENANT_COUNT)
            .filter(|t| store.is_tenant_loaded(&format!("tenant-{t}")))
            .count();
        assert!(
            still_loaded < TENANT_COUNT,
            "no tenants evicted ({still_loaded}/{TENANT_COUNT} still loaded) — \
             budget enforcement didn't engage"
        );
    }
2909
2910 #[test]
2911 fn test_evict_tenant_when_not_loaded_is_a_noop() {
2912 let store = EventStore::new();
2915 store.evict_tenant("nobody"); assert!(!store.is_tenant_loaded("nobody"));
2917 }
2918
    /// Lazy loading must attribute resident bytes to the tenant that was
    /// loaded, and the store-wide total must equal that tenant's bytes.
    #[test]
    fn test_lazy_load_accounts_bytes_per_tenant() {
        let temp_dir = TempDir::new().unwrap();
        let storage_dir = temp_dir.path().to_path_buf();

        // Seed alice with five ~1 KB events and flush them.
        {
            let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
            for i in 0..5 {
                store
                    .ingest(
                        &Event::from_strings(
                            "test.event".to_string(),
                            format!("a-{i}"),
                            "alice".to_string(),
                            serde_json::json!({"data": "x".repeat(1000)}),
                            None,
                        )
                        .unwrap(),
                    )
                    .unwrap();
            }
            store.flush_storage().unwrap();
        }

        // Cold store: no bytes accounted anywhere yet.
        let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
        assert_eq!(store.tenant_resident_bytes("alice"), 0);
        assert_eq!(store.cache_resident_bytes(), 0);

        store.ensure_tenant_loaded("alice").unwrap();

        // Payloads alone are 5 x 1000 bytes, so alice's accounting must
        // be at least that; unloaded tenants stay at zero.
        let alice_bytes = store.tenant_resident_bytes("alice");
        assert!(
            alice_bytes >= 5 * 1000,
            "alice should have at least 5 KiB resident; got {alice_bytes}"
        );
        assert_eq!(store.tenant_resident_bytes("bob"), 0);
        assert_eq!(store.cache_resident_bytes(), alice_bytes);
    }
2966
    /// Boot must not pre-load Parquet data; the first tenant-scoped
    /// query lazily loads exactly that tenant.
    #[test]
    fn test_query_lazy_loads_tenant_on_first_call() {
        let temp_dir = TempDir::new().unwrap();
        let storage_dir = temp_dir.path().to_path_buf();

        // Seed three alice events and flush them to disk.
        {
            let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
            for i in 0..3 {
                let event = Event::from_strings(
                    "test.event".to_string(),
                    format!("e-{i}"),
                    "alice".to_string(),
                    serde_json::json!({"i": i}),
                    None,
                )
                .unwrap();
                store.ingest(&event).unwrap();
            }
            store.flush_storage().unwrap();
        }

        // A fresh store starts empty: boot is O(1), no Parquet pre-load.
        let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
        assert_eq!(
            store.stats().total_events,
            0,
            "boot must be O(1) — no Parquet pre-load"
        );
        assert!(!store.is_tenant_loaded("alice"));
        assert!(!store.is_tenant_loaded("bob"));

        // First tenant-scoped query triggers the lazy load.
        let results = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: Some("alice".to_string()),
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();
        assert_eq!(results.len(), 3, "alice's 3 events are returned");
        assert!(store.is_tenant_loaded("alice"), "alice now warm");
        assert!(!store.is_tenant_loaded("bob"), "bob still cold");
    }
3022
3023 #[test]
3024 fn test_query_invalid_tenant_id_returns_error_no_hang() {
3025 let temp_dir = TempDir::new().unwrap();
3029 let store = EventStore::with_config(EventStoreConfig::with_persistence(temp_dir.path()));
3030
3031 let result = store.query(&QueryEventsRequest {
3032 entity_id: None,
3033 event_type: None,
3034 tenant_id: Some("../etc".to_string()),
3035 as_of: None,
3036 since: None,
3037 until: None,
3038 limit: None,
3039 event_type_prefix: None,
3040 payload_filter: None,
3041 });
3042 assert!(result.is_err(), "unsafe tenant_id must surface as error");
3043 }
3044
    #[test]
    fn test_query_concurrent_first_queries_for_same_tenant_all_succeed() {
        let temp_dir = TempDir::new().unwrap();
        let storage_dir = temp_dir.path().to_path_buf();

        // Session 1: persist 25 events for "alice".
        {
            let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
            for i in 0..25 {
                let event = Event::from_strings(
                    "test.event".to_string(),
                    format!("e-{i}"),
                    "alice".to_string(),
                    serde_json::json!({"i": i}),
                    None,
                )
                .unwrap();
                store.ingest(&event).unwrap();
            }
            store.flush_storage().unwrap();
        }

        // Session 2: race 8 threads on the first (cold) query for the same
        // tenant to exercise the load-once path under contention.
        let store = Arc::new(EventStore::with_config(EventStoreConfig::with_persistence(
            &storage_dir,
        )));
        assert!(!store.is_tenant_loaded("alice"));

        let mut handles = Vec::new();
        for _ in 0..8 {
            let s = store.clone();
            handles.push(std::thread::spawn(move || {
                s.query(&QueryEventsRequest {
                    entity_id: None,
                    event_type: None,
                    tenant_id: Some("alice".to_string()),
                    as_of: None,
                    since: None,
                    until: None,
                    limit: None,
                    event_type_prefix: None,
                    payload_filter: None,
                })
            }));
        }

        // Every racer must observe the fully-loaded tenant: no partial
        // results, no errors, and (checked below) no double-counted events.
        for h in handles {
            let result = h.join().unwrap().unwrap();
            assert_eq!(
                result.len(),
                25,
                "every concurrent caller must see all 25 events"
            );
        }
        assert!(store.is_tenant_loaded("alice"));
        assert_eq!(store.stats().total_events, 25);
    }
3110
    #[test]
    fn test_query_two_cold_tenants_load_independently() {
        let temp_dir = TempDir::new().unwrap();
        let storage_dir = temp_dir.path().to_path_buf();

        // Session 1: persist 3 events for "alice" and 5 for "bob".
        {
            let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
            for i in 0..3 {
                store
                    .ingest(
                        &Event::from_strings(
                            "test.event".to_string(),
                            format!("a-{i}"),
                            "alice".to_string(),
                            serde_json::json!({"i": i}),
                            None,
                        )
                        .unwrap(),
                    )
                    .unwrap();
            }
            for i in 0..5 {
                store
                    .ingest(
                        &Event::from_strings(
                            "test.event".to_string(),
                            format!("b-{i}"),
                            "bob".to_string(),
                            serde_json::json!({"i": i}),
                            None,
                        )
                        .unwrap(),
                    )
                    .unwrap();
            }
            store.flush_storage().unwrap();
        }

        // Session 2: fresh store boots with nothing resident.
        let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
        assert_eq!(store.stats().total_events, 0);

        // Querying alice loads only alice's events; bob stays cold.
        let alice = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: Some("alice".to_string()),
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();
        assert_eq!(alice.len(), 3);
        assert!(store.is_tenant_loaded("alice"));
        assert!(!store.is_tenant_loaded("bob"));
        assert_eq!(store.stats().total_events, 3);

        // Querying bob afterwards adds bob's 5 events on top of alice's 3.
        let bob = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: Some("bob".to_string()),
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();
        assert_eq!(bob.len(), 5);
        assert!(store.is_tenant_loaded("bob"));
        assert_eq!(store.stats().total_events, 8);
    }
3192
    #[test]
    fn test_boot_with_persisted_data_is_o1() {
        let temp_dir = TempDir::new().unwrap();
        let storage_dir = temp_dir.path().to_path_buf();

        // Session 1: persist 16 events per tenant (0..50/3) across 3 tenants.
        {
            let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
            for tenant in ["alice", "bob", "carol"] {
                for i in 0..50 / 3 {
                    store
                        .ingest(
                            &Event::from_strings(
                                "test.event".to_string(),
                                format!("{tenant}-{i}"),
                                tenant.to_string(),
                                serde_json::json!({"i": i}),
                                None,
                            )
                            .unwrap(),
                        )
                        .unwrap();
                }
            }
            store.flush_storage().unwrap();
        }

        // Pre-condition: the flush actually produced Parquet files on disk,
        // otherwise the "boot doesn't read them" claim below is vacuous.
        let on_disk = find_parquet_files(&storage_dir);
        assert!(
            !on_disk.is_empty(),
            "session 1 should have produced parquet files; pre-condition for the test"
        );

        // Session 2: boot must not eagerly read any of those files.
        let started = std::time::Instant::now();
        let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
        let boot_elapsed = started.elapsed();

        assert_eq!(
            store.stats().total_events,
            0,
            "boot must not pre-load any Parquet events"
        );

        // Generous wall-clock bound; guards against accidental O(data) boots.
        assert!(
            boot_elapsed < std::time::Duration::from_secs(2),
            "boot took {boot_elapsed:?} — Step 2 boot should be O(1)"
        );
    }
3256
    #[test]
    fn test_query_warm_tenant_does_not_re_read_disk() {
        let temp_dir = TempDir::new().unwrap();
        let storage_dir = temp_dir.path().to_path_buf();

        // Ingest + flush 3 events, then warm "alice" with a first query.
        let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
        for i in 0..3 {
            let event = Event::from_strings(
                "test.event".to_string(),
                format!("e-{i}"),
                "alice".to_string(),
                serde_json::json!({"i": i}),
                None,
            )
            .unwrap();
            store.ingest(&event).unwrap();
        }
        store.flush_storage().unwrap();

        let _ = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: Some("alice".to_string()),
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();
        assert!(store.is_tenant_loaded("alice"));

        // Delete the backing Parquet files. A warm tenant must be served
        // entirely from memory, so the second query cannot notice.
        let parquet_files = find_parquet_files(&storage_dir);
        for f in parquet_files {
            std::fs::remove_file(&f).unwrap();
        }

        let results = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: Some("alice".to_string()),
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();
        assert_eq!(
            results.len(),
            3,
            "warm tenant query must not need disk; got {} events from a deleted parquet",
            results.len()
        );
    }
3324
3325 #[test]
3326 fn test_event_store_default() {
3327 let store = EventStore::default();
3328 assert_eq!(store.stats().total_events, 0);
3329 }
3330
3331 #[test]
3332 fn test_ingest_single_event() {
3333 let store = EventStore::new();
3334 let event = create_test_event("entity-1", "user.created");
3335
3336 store.ingest(&event).unwrap();
3337
3338 assert_eq!(store.stats().total_events, 1);
3339 assert_eq!(store.stats().total_ingested, 1);
3340 }
3341
3342 #[test]
3343 fn test_ingest_multiple_events() {
3344 let store = EventStore::new();
3345
3346 for i in 0..10 {
3347 let event = create_test_event(&format!("entity-{i}"), "user.created");
3348 store.ingest(&event).unwrap();
3349 }
3350
3351 assert_eq!(store.stats().total_events, 10);
3352 assert_eq!(store.stats().total_ingested, 10);
3353 }
3354
3355 #[test]
3356 fn test_query_by_entity_id() {
3357 let store = EventStore::new();
3358
3359 store
3360 .ingest(&create_test_event("entity-1", "user.created"))
3361 .unwrap();
3362 store
3363 .ingest(&create_test_event("entity-2", "user.created"))
3364 .unwrap();
3365 store
3366 .ingest(&create_test_event("entity-1", "user.updated"))
3367 .unwrap();
3368
3369 let results = store
3370 .query(&QueryEventsRequest {
3371 entity_id: Some("entity-1".to_string()),
3372 event_type: None,
3373 tenant_id: None,
3374 as_of: None,
3375 since: None,
3376 until: None,
3377 limit: None,
3378 event_type_prefix: None,
3379 payload_filter: None,
3380 })
3381 .unwrap();
3382
3383 assert_eq!(results.len(), 2);
3384 }
3385
3386 #[test]
3387 fn test_query_by_event_type() {
3388 let store = EventStore::new();
3389
3390 store
3391 .ingest(&create_test_event("entity-1", "user.created"))
3392 .unwrap();
3393 store
3394 .ingest(&create_test_event("entity-2", "user.updated"))
3395 .unwrap();
3396 store
3397 .ingest(&create_test_event("entity-3", "user.created"))
3398 .unwrap();
3399
3400 let results = store
3401 .query(&QueryEventsRequest {
3402 entity_id: None,
3403 event_type: Some("user.created".to_string()),
3404 tenant_id: None,
3405 as_of: None,
3406 since: None,
3407 until: None,
3408 limit: None,
3409 event_type_prefix: None,
3410 payload_filter: None,
3411 })
3412 .unwrap();
3413
3414 assert_eq!(results.len(), 2);
3415 }
3416
3417 #[test]
3418 fn test_query_with_limit() {
3419 let store = EventStore::new();
3420
3421 for i in 0..10 {
3422 let event = create_test_event(&format!("entity-{i}"), "user.created");
3423 store.ingest(&event).unwrap();
3424 }
3425
3426 let results = store
3427 .query(&QueryEventsRequest {
3428 entity_id: None,
3429 event_type: None,
3430 tenant_id: None,
3431 as_of: None,
3432 since: None,
3433 until: None,
3434 limit: Some(5),
3435 event_type_prefix: None,
3436 payload_filter: None,
3437 })
3438 .unwrap();
3439
3440 assert_eq!(results.len(), 5);
3441 }
3442
3443 #[test]
3444 fn test_query_empty_store() {
3445 let store = EventStore::new();
3446
3447 let results = store
3448 .query(&QueryEventsRequest {
3449 entity_id: Some("non-existent".to_string()),
3450 event_type: None,
3451 tenant_id: None,
3452 as_of: None,
3453 since: None,
3454 until: None,
3455 limit: None,
3456 event_type_prefix: None,
3457 payload_filter: None,
3458 })
3459 .unwrap();
3460
3461 assert!(results.is_empty());
3462 }
3463
3464 #[test]
3465 fn test_reconstruct_state() {
3466 let store = EventStore::new();
3467
3468 store
3469 .ingest(&create_test_event("entity-1", "user.created"))
3470 .unwrap();
3471
3472 let state = store.reconstruct_state("entity-1", None).unwrap();
3473 assert_eq!(state["current_state"]["name"], "Test");
3475 assert_eq!(state["current_state"]["value"], 42);
3476 }
3477
3478 #[test]
3479 fn test_reconstruct_state_not_found() {
3480 let store = EventStore::new();
3481
3482 let result = store.reconstruct_state("non-existent", None);
3483 assert!(result.is_err());
3484 }
3485
3486 #[test]
3487 fn test_get_snapshot_empty() {
3488 let store = EventStore::new();
3489
3490 let result = store.get_snapshot("non-existent");
3491 assert!(result.is_err());
3493 }
3494
3495 #[test]
3496 fn test_create_snapshot() {
3497 let store = EventStore::new();
3498
3499 store
3500 .ingest(&create_test_event("entity-1", "user.created"))
3501 .unwrap();
3502
3503 store.create_snapshot("entity-1").unwrap();
3504
3505 let snapshot = store.get_snapshot("entity-1").unwrap();
3507 assert!(snapshot != serde_json::json!(null));
3508 }
3509
3510 #[test]
3511 fn test_create_snapshot_entity_not_found() {
3512 let store = EventStore::new();
3513
3514 let result = store.create_snapshot("non-existent");
3515 assert!(result.is_err());
3516 }
3517
3518 #[test]
3519 fn test_websocket_manager() {
3520 let store = EventStore::new();
3521 let manager = store.websocket_manager();
3522 assert!(Arc::strong_count(&manager) >= 1);
3524 }
3525
3526 #[test]
3527 fn test_snapshot_manager() {
3528 let store = EventStore::new();
3529 let manager = store.snapshot_manager();
3530 assert!(Arc::strong_count(&manager) >= 1);
3531 }
3532
3533 #[test]
3534 fn test_compaction_manager_none() {
3535 let store = EventStore::new();
3536 assert!(store.compaction_manager().is_none());
3538 }
3539
3540 #[test]
3541 fn test_schema_registry() {
3542 let store = EventStore::new();
3543 let registry = store.schema_registry();
3544 assert!(Arc::strong_count(®istry) >= 1);
3545 }
3546
3547 #[test]
3548 fn test_replay_manager() {
3549 let store = EventStore::new();
3550 let manager = store.replay_manager();
3551 assert!(Arc::strong_count(&manager) >= 1);
3552 }
3553
3554 #[test]
3555 fn test_pipeline_manager() {
3556 let store = EventStore::new();
3557 let manager = store.pipeline_manager();
3558 assert!(Arc::strong_count(&manager) >= 1);
3559 }
3560
3561 #[test]
3562 fn test_projection_manager() {
3563 let store = EventStore::new();
3564 let manager = store.projection_manager();
3565 let projections = manager.list_projections();
3567 assert!(projections.len() >= 2); }
3569
3570 #[test]
3571 fn test_projection_state_cache() {
3572 let store = EventStore::new();
3573 let cache = store.projection_state_cache();
3574
3575 cache.insert("test:key".to_string(), serde_json::json!({"value": 123}));
3576 assert_eq!(cache.len(), 1);
3577
3578 let value = cache.get("test:key").unwrap();
3579 assert_eq!(value["value"], 123);
3580 }
3581
3582 #[test]
3583 fn test_metrics() {
3584 let store = EventStore::new();
3585 let metrics = store.metrics();
3586 assert!(Arc::strong_count(&metrics) >= 1);
3587 }
3588
3589 #[test]
3590 fn test_store_stats() {
3591 let store = EventStore::new();
3592
3593 store
3594 .ingest(&create_test_event("entity-1", "user.created"))
3595 .unwrap();
3596 store
3597 .ingest(&create_test_event("entity-2", "order.placed"))
3598 .unwrap();
3599
3600 let stats = store.stats();
3601 assert_eq!(stats.total_events, 2);
3602 assert_eq!(stats.total_entities, 2);
3603 assert_eq!(stats.total_event_types, 2);
3604 assert_eq!(stats.total_ingested, 2);
3605 }
3606
3607 #[test]
3608 fn test_event_store_config_default() {
3609 let config = EventStoreConfig::default();
3610 assert!(config.storage_dir.is_none());
3611 assert!(config.wal_dir.is_none());
3612 }
3613
3614 #[test]
3615 fn test_event_store_config_with_persistence() {
3616 let temp_dir = TempDir::new().unwrap();
3617 let config = EventStoreConfig::with_persistence(temp_dir.path());
3618
3619 assert!(config.storage_dir.is_some());
3620 assert!(config.wal_dir.is_none());
3621 }
3622
3623 #[test]
3624 fn test_event_store_config_with_wal() {
3625 let temp_dir = TempDir::new().unwrap();
3626 let config = EventStoreConfig::with_wal(temp_dir.path(), WALConfig::default());
3627
3628 assert!(config.storage_dir.is_none());
3629 assert!(config.wal_dir.is_some());
3630 }
3631
3632 #[test]
3633 fn test_event_store_config_with_all() {
3634 let temp_dir = TempDir::new().unwrap();
3635 let config = EventStoreConfig::with_all(temp_dir.path(), SnapshotConfig::default());
3636
3637 assert!(config.storage_dir.is_some());
3638 }
3639
3640 #[test]
3641 fn test_event_store_config_production() {
3642 let storage_dir = TempDir::new().unwrap();
3643 let wal_dir = TempDir::new().unwrap();
3644 let config = EventStoreConfig::production(
3645 storage_dir.path(),
3646 wal_dir.path(),
3647 SnapshotConfig::default(),
3648 WALConfig::default(),
3649 CompactionConfig::default(),
3650 );
3651
3652 assert!(config.storage_dir.is_some());
3653 assert!(config.wal_dir.is_some());
3654 }
3655
3656 #[test]
3662 fn test_from_env_vars_data_dir_enables_full_persistence() {
3663 let (config, mode) = EventStoreConfig::from_env_vars(
3664 Some("/app/data".to_string()),
3665 None,
3666 None,
3667 None,
3668 None,
3669 None,
3670 None,
3671 None,
3672 );
3673 assert_eq!(mode, "wal+parquet");
3674 assert_eq!(
3675 config.storage_dir.unwrap().to_str().unwrap(),
3676 "/app/data/storage"
3677 );
3678 assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/app/data/wal");
3679 }
3680
3681 #[test]
3682 fn test_from_env_vars_explicit_dirs() {
3683 let (config, mode) = EventStoreConfig::from_env_vars(
3684 None,
3685 Some("/custom/storage".to_string()),
3686 Some("/custom/wal".to_string()),
3687 None,
3688 None,
3689 None,
3690 None,
3691 None,
3692 );
3693 assert_eq!(mode, "wal+parquet");
3694 assert_eq!(
3695 config.storage_dir.unwrap().to_str().unwrap(),
3696 "/custom/storage"
3697 );
3698 assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/custom/wal");
3699 }
3700
3701 #[test]
3702 fn test_from_env_vars_wal_disabled() {
3703 let (config, mode) = EventStoreConfig::from_env_vars(
3704 Some("/app/data".to_string()),
3705 None,
3706 None,
3707 Some("false".to_string()),
3708 None,
3709 None,
3710 None,
3711 None,
3712 );
3713 assert_eq!(mode, "parquet-only");
3714 assert!(config.storage_dir.is_some());
3715 assert!(config.wal_dir.is_none());
3716 }
3717
3718 #[test]
3719 fn test_from_env_vars_no_dirs_is_in_memory() {
3720 let (config, mode) =
3721 EventStoreConfig::from_env_vars(None, None, None, None, None, None, None, None);
3722 assert_eq!(mode, "in-memory");
3723 assert!(config.storage_dir.is_none());
3724 assert!(config.wal_dir.is_none());
3725 }
3726
3727 #[test]
3728 fn test_from_env_vars_empty_strings_treated_as_none() {
3729 let (_, mode) = EventStoreConfig::from_env_vars(
3730 Some(String::new()),
3731 Some(String::new()),
3732 Some(String::new()),
3733 None,
3734 None,
3735 None,
3736 None,
3737 None,
3738 );
3739 assert_eq!(mode, "in-memory");
3740 }
3741
3742 #[test]
3743 fn test_from_env_vars_explicit_overrides_data_dir() {
3744 let (config, mode) = EventStoreConfig::from_env_vars(
3745 Some("/app/data".to_string()),
3746 Some("/override/storage".to_string()),
3747 Some("/override/wal".to_string()),
3748 None,
3749 None,
3750 None,
3751 None,
3752 None,
3753 );
3754 assert_eq!(mode, "wal+parquet");
3755 assert_eq!(
3756 config.storage_dir.unwrap().to_str().unwrap(),
3757 "/override/storage"
3758 );
3759 assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/override/wal");
3760 }
3761
3762 #[test]
3763 fn test_from_env_vars_wal_only() {
3764 let (config, mode) = EventStoreConfig::from_env_vars(
3765 None,
3766 None,
3767 Some("/wal/only".to_string()),
3768 None,
3769 None,
3770 None,
3771 None,
3772 None,
3773 );
3774 assert_eq!(mode, "wal-only");
3775 assert!(config.storage_dir.is_none());
3776 assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/wal/only");
3777 }
3778
3779 #[test]
3780 fn test_from_env_vars_cache_bytes_parses_decimal() {
3781 let (config, _) = EventStoreConfig::from_env_vars(
3782 Some("/app/data".to_string()),
3783 None,
3784 None,
3785 None,
3786 Some("536870912".to_string()),
3787 None,
3789 None,
3790 None,
3791 );
3792 assert_eq!(config.cache_byte_budget, Some(536_870_912));
3793 }
3794
3795 #[test]
3796 fn test_from_env_vars_cache_bytes_unparseable_disables_budget() {
3797 let (config, _) = EventStoreConfig::from_env_vars(
3801 Some("/app/data".to_string()),
3802 None,
3803 None,
3804 None,
3805 Some("not-a-number".to_string()),
3806 None,
3807 None,
3808 None,
3809 );
3810 assert_eq!(config.cache_byte_budget, None);
3811 }
3812
3813 #[test]
3814 fn test_from_env_vars_cache_bytes_empty_disables_budget() {
3815 let (config, _) = EventStoreConfig::from_env_vars(
3816 Some("/app/data".to_string()),
3817 None,
3818 None,
3819 None,
3820 Some(String::new()),
3821 None,
3822 None,
3823 None,
3824 );
3825 assert_eq!(config.cache_byte_budget, None);
3826 }
3827
3828 #[test]
3829 fn test_from_env_vars_snapshot_interval_overrides_default() {
3830 let (config, _) = EventStoreConfig::from_env_vars(
3834 Some("/app/data".to_string()),
3835 None,
3836 None,
3837 None,
3838 None,
3839 Some("60".to_string()),
3840 None,
3841 None,
3842 );
3843 assert_eq!(config.compaction_config.compaction_interval_seconds, 60);
3844 }
3845
3846 #[test]
3847 fn test_from_env_vars_snapshot_interval_default_is_hourly() {
3848 let (config, _) = EventStoreConfig::from_env_vars(
3849 Some("/app/data".to_string()),
3850 None,
3851 None,
3852 None,
3853 None,
3854 None,
3855 None,
3856 None,
3857 );
3858 assert_eq!(config.compaction_config.compaction_interval_seconds, 3600);
3859 }
3860
3861 #[test]
3862 fn test_from_env_vars_snapshot_interval_unparseable_falls_back() {
3863 let (config, _) = EventStoreConfig::from_env_vars(
3864 Some("/app/data".to_string()),
3865 None,
3866 None,
3867 None,
3868 None,
3869 Some("not-a-number".to_string()),
3870 None,
3871 None,
3872 );
3873 assert_eq!(config.compaction_config.compaction_interval_seconds, 3600);
3874 }
3875
3876 #[test]
3877 fn test_from_env_vars_retention_system_days_overrides_default() {
3878 let (config, _) = EventStoreConfig::from_env_vars(
3881 Some("/app/data".to_string()),
3882 None,
3883 None,
3884 None,
3885 None,
3886 None,
3887 Some("7".to_string()),
3888 None,
3889 );
3890 let ttl = config
3891 .compaction_config
3892 .retention
3893 .ttl_for("system")
3894 .unwrap();
3895 assert_eq!(ttl.as_secs(), 7 * 24 * 3600);
3896 }
3897
3898 #[test]
3899 fn test_from_env_vars_retention_default_is_30_days_for_system() {
3900 let (config, _) = EventStoreConfig::from_env_vars(
3901 Some("/app/data".to_string()),
3902 None,
3903 None,
3904 None,
3905 None,
3906 None,
3907 None,
3908 None,
3909 );
3910 let ttl = config
3911 .compaction_config
3912 .retention
3913 .ttl_for("system")
3914 .unwrap();
3915 assert_eq!(ttl.as_secs(), 30 * 24 * 3600);
3916 assert!(config.compaction_config.retention.ttl_for("acme").is_none());
3918 }
3919
3920 #[test]
3921 fn test_store_stats_serde() {
3922 let stats = StoreStats {
3923 total_events: 100,
3924 total_entities: 50,
3925 total_event_types: 10,
3926 total_ingested: 100,
3927 };
3928
3929 let json = serde_json::to_string(&stats).unwrap();
3930 assert!(json.contains("\"total_events\":100"));
3931 assert!(json.contains("\"total_entities\":50"));
3932 }
3933
3934 #[test]
3935 fn test_query_with_entity_and_type() {
3936 let store = EventStore::new();
3937
3938 store
3939 .ingest(&create_test_event("entity-1", "user.created"))
3940 .unwrap();
3941 store
3942 .ingest(&create_test_event("entity-1", "user.updated"))
3943 .unwrap();
3944 store
3945 .ingest(&create_test_event("entity-2", "user.created"))
3946 .unwrap();
3947
3948 let results = store
3949 .query(&QueryEventsRequest {
3950 entity_id: Some("entity-1".to_string()),
3951 event_type: Some("user.created".to_string()),
3952 tenant_id: None,
3953 as_of: None,
3954 since: None,
3955 until: None,
3956 limit: None,
3957 event_type_prefix: None,
3958 payload_filter: None,
3959 })
3960 .unwrap();
3961
3962 assert_eq!(results.len(), 1);
3963 assert_eq!(results[0].event_type_str(), "user.created");
3964 }
3965
3966 #[test]
3967 fn test_query_by_event_type_prefix() {
3968 let store = EventStore::new();
3969
3970 store
3972 .ingest(&create_test_event("entity-1", "index.created"))
3973 .unwrap();
3974 store
3975 .ingest(&create_test_event("entity-2", "index.updated"))
3976 .unwrap();
3977 store
3978 .ingest(&create_test_event("entity-3", "trade.created"))
3979 .unwrap();
3980 store
3981 .ingest(&create_test_event("entity-4", "trade.completed"))
3982 .unwrap();
3983 store
3984 .ingest(&create_test_event("entity-5", "balance.updated"))
3985 .unwrap();
3986
3987 let results = store
3989 .query(&QueryEventsRequest {
3990 entity_id: None,
3991 event_type: None,
3992 tenant_id: None,
3993 as_of: None,
3994 since: None,
3995 until: None,
3996 limit: None,
3997 event_type_prefix: Some("index.".to_string()),
3998 payload_filter: None,
3999 })
4000 .unwrap();
4001
4002 assert_eq!(results.len(), 2);
4003 assert!(
4004 results
4005 .iter()
4006 .all(|e| e.event_type_str().starts_with("index."))
4007 );
4008 }
4009
4010 #[test]
4011 fn test_query_by_event_type_prefix_empty_returns_all() {
4012 let store = EventStore::new();
4013
4014 store
4015 .ingest(&create_test_event("entity-1", "index.created"))
4016 .unwrap();
4017 store
4018 .ingest(&create_test_event("entity-2", "trade.created"))
4019 .unwrap();
4020
4021 let results = store
4023 .query(&QueryEventsRequest {
4024 entity_id: None,
4025 event_type: None,
4026 tenant_id: None,
4027 as_of: None,
4028 since: None,
4029 until: None,
4030 limit: None,
4031 event_type_prefix: Some(String::new()),
4032 payload_filter: None,
4033 })
4034 .unwrap();
4035
4036 assert_eq!(results.len(), 2);
4037 }
4038
4039 #[test]
4040 fn test_query_by_event_type_prefix_no_match() {
4041 let store = EventStore::new();
4042
4043 store
4044 .ingest(&create_test_event("entity-1", "index.created"))
4045 .unwrap();
4046
4047 let results = store
4048 .query(&QueryEventsRequest {
4049 entity_id: None,
4050 event_type: None,
4051 tenant_id: None,
4052 as_of: None,
4053 since: None,
4054 until: None,
4055 limit: None,
4056 event_type_prefix: Some("nonexistent.".to_string()),
4057 payload_filter: None,
4058 })
4059 .unwrap();
4060
4061 assert!(results.is_empty());
4062 }
4063
4064 #[test]
4065 fn test_query_by_entity_with_type_prefix() {
4066 let store = EventStore::new();
4067
4068 store
4069 .ingest(&create_test_event("entity-1", "index.created"))
4070 .unwrap();
4071 store
4072 .ingest(&create_test_event("entity-1", "trade.created"))
4073 .unwrap();
4074 store
4075 .ingest(&create_test_event("entity-2", "index.updated"))
4076 .unwrap();
4077
4078 let results = store
4080 .query(&QueryEventsRequest {
4081 entity_id: Some("entity-1".to_string()),
4082 event_type: None,
4083 tenant_id: None,
4084 as_of: None,
4085 since: None,
4086 until: None,
4087 limit: None,
4088 event_type_prefix: Some("index.".to_string()),
4089 payload_filter: None,
4090 })
4091 .unwrap();
4092
4093 assert_eq!(results.len(), 1);
4094 assert_eq!(results[0].event_type_str(), "index.created");
4095 }
4096
4097 #[test]
4098 fn test_query_prefix_with_limit() {
4099 let store = EventStore::new();
4100
4101 for i in 0..5 {
4102 store
4103 .ingest(&create_test_event(&format!("entity-{i}"), "index.created"))
4104 .unwrap();
4105 }
4106
4107 let results = store
4108 .query(&QueryEventsRequest {
4109 entity_id: None,
4110 event_type: None,
4111 tenant_id: None,
4112 as_of: None,
4113 since: None,
4114 until: None,
4115 limit: Some(3),
4116 event_type_prefix: Some("index.".to_string()),
4117 payload_filter: None,
4118 })
4119 .unwrap();
4120
4121 assert_eq!(results.len(), 3);
4122 }
4123
    #[test]
    fn test_query_prefix_alongside_existing_filters() {
        let store = EventStore::new();

        // NOTE(review): the sleeps presumably exist to give each event a
        // distinct timestamp so the limited result set is deterministic —
        // confirm before removing them.
        store
            .ingest(&create_test_event("entity-1", "index.created"))
            .unwrap();
        std::thread::sleep(std::time::Duration::from_millis(10));
        store
            .ingest(&create_test_event("entity-2", "index.strategy.updated"))
            .unwrap();
        std::thread::sleep(std::time::Duration::from_millis(10));
        store
            .ingest(&create_test_event("entity-3", "index.deleted"))
            .unwrap();

        // The prefix matches all three events; the limit trims them to two.
        let results = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: Some(2),
                event_type_prefix: Some("index.".to_string()),
                payload_filter: None,
            })
            .unwrap();

        assert_eq!(results.len(), 2);
    }
4158
4159 #[test]
4160 fn test_query_with_payload_filter() {
4161 let store = EventStore::new();
4162
4163 for i in 0..5 {
4165 store
4166 .ingest(&create_test_event_with_payload(
4167 &format!("entity-{i}"),
4168 "user.action",
4169 serde_json::json!({"user_id": "alice", "action": "click"}),
4170 ))
4171 .unwrap();
4172 }
4173 for i in 5..10 {
4175 store
4176 .ingest(&create_test_event_with_payload(
4177 &format!("entity-{i}"),
4178 "user.action",
4179 serde_json::json!({"user_id": "bob", "action": "view"}),
4180 ))
4181 .unwrap();
4182 }
4183
4184 let results = store
4186 .query(&QueryEventsRequest {
4187 entity_id: None,
4188 event_type: Some("user.action".to_string()),
4189 tenant_id: None,
4190 as_of: None,
4191 since: None,
4192 until: None,
4193 limit: None,
4194 event_type_prefix: None,
4195 payload_filter: Some(r#"{"user_id":"alice"}"#.to_string()),
4196 })
4197 .unwrap();
4198
4199 assert_eq!(results.len(), 5);
4200 }
4201
4202 #[test]
4203 fn test_query_payload_filter_non_existent_field() {
4204 let store = EventStore::new();
4205
4206 store
4207 .ingest(&create_test_event_with_payload(
4208 "entity-1",
4209 "user.action",
4210 serde_json::json!({"user_id": "alice"}),
4211 ))
4212 .unwrap();
4213
4214 let results = store
4216 .query(&QueryEventsRequest {
4217 entity_id: None,
4218 event_type: None,
4219 tenant_id: None,
4220 as_of: None,
4221 since: None,
4222 until: None,
4223 limit: None,
4224 event_type_prefix: None,
4225 payload_filter: Some(r#"{"nonexistent":"value"}"#.to_string()),
4226 })
4227 .unwrap();
4228
4229 assert!(results.is_empty());
4230 }
4231
4232 #[test]
4233 fn test_query_payload_filter_with_prefix() {
4234 let store = EventStore::new();
4235
4236 store
4237 .ingest(&create_test_event_with_payload(
4238 "entity-1",
4239 "index.created",
4240 serde_json::json!({"status": "active"}),
4241 ))
4242 .unwrap();
4243 store
4244 .ingest(&create_test_event_with_payload(
4245 "entity-2",
4246 "index.created",
4247 serde_json::json!({"status": "inactive"}),
4248 ))
4249 .unwrap();
4250 store
4251 .ingest(&create_test_event_with_payload(
4252 "entity-3",
4253 "trade.created",
4254 serde_json::json!({"status": "active"}),
4255 ))
4256 .unwrap();
4257
4258 let results = store
4260 .query(&QueryEventsRequest {
4261 entity_id: None,
4262 event_type: None,
4263 tenant_id: None,
4264 as_of: None,
4265 since: None,
4266 until: None,
4267 limit: None,
4268 event_type_prefix: Some("index.".to_string()),
4269 payload_filter: Some(r#"{"status":"active"}"#.to_string()),
4270 })
4271 .unwrap();
4272
4273 assert_eq!(results.len(), 1);
4274 assert_eq!(results[0].entity_id().to_string(), "entity-1");
4275 }
4276
4277 #[test]
4278 fn test_flush_storage_no_storage() {
4279 let store = EventStore::new();
4280 let result = store.flush_storage();
4282 assert!(result.is_ok());
4283 }
4284
4285 #[test]
4286 fn test_state_evolution() {
4287 let store = EventStore::new();
4288
4289 store
4291 .ingest(
4292 &Event::from_strings(
4293 "user.created".to_string(),
4294 "user-1".to_string(),
4295 "default".to_string(),
4296 serde_json::json!({"name": "Alice", "age": 25}),
4297 None,
4298 )
4299 .unwrap(),
4300 )
4301 .unwrap();
4302
4303 store
4305 .ingest(
4306 &Event::from_strings(
4307 "user.updated".to_string(),
4308 "user-1".to_string(),
4309 "default".to_string(),
4310 serde_json::json!({"age": 26}),
4311 None,
4312 )
4313 .unwrap(),
4314 )
4315 .unwrap();
4316
4317 let state = store.reconstruct_state("user-1", None).unwrap();
4318 assert_eq!(state["current_state"]["name"], "Alice");
4320 assert_eq!(state["current_state"]["age"], 26);
4321 }
4322
4323 #[test]
4324 fn test_reject_system_event_types() {
4325 let store = EventStore::new();
4326
4327 let event = Event::reconstruct_from_strings(
4329 uuid::Uuid::new_v4(),
4330 "_system.tenant.created".to_string(),
4331 "_system:tenant:acme".to_string(),
4332 "_system".to_string(),
4333 serde_json::json!({"name": "ACME"}),
4334 chrono::Utc::now(),
4335 None,
4336 1,
4337 );
4338
4339 let result = store.ingest(&event);
4340 assert!(result.is_err());
4341 let err = result.unwrap_err();
4342 assert!(
4343 err.to_string().contains("reserved for internal use"),
4344 "Expected system namespace rejection, got: {err}"
4345 );
4346 }
4347
4348 #[test]
4356 fn test_wal_recovery_checkpoints_to_parquet() {
4357 let data_dir = TempDir::new().unwrap();
4358 let storage_dir = data_dir.path().join("storage");
4359 let wal_dir = data_dir.path().join("wal");
4360
4361 {
4363 let config = EventStoreConfig::production(
4364 &storage_dir,
4365 &wal_dir,
4366 SnapshotConfig::default(),
4367 WALConfig {
4368 sync_on_write: true,
4369 ..WALConfig::default()
4370 },
4371 CompactionConfig::default(),
4372 );
4373 let store = EventStore::with_config(config);
4374
4375 for i in 0..5 {
4376 let event = Event::from_strings(
4377 "test.created".to_string(),
4378 format!("entity-{i}"),
4379 "default".to_string(),
4380 serde_json::json!({"index": i}),
4381 None,
4382 )
4383 .unwrap();
4384 store.ingest(&event).unwrap();
4385 }
4386
4387 assert_eq!(store.stats().total_events, 5);
4388
4389 }
4392
4393 let wal_files: Vec<_> = std::fs::read_dir(&wal_dir)
4395 .unwrap()
4396 .filter_map(std::result::Result::ok)
4397 .filter(|e| e.path().extension().is_some_and(|ext| ext == "log"))
4398 .collect();
4399 assert!(!wal_files.is_empty(), "WAL file should exist");
4400 let wal_size = wal_files[0].metadata().unwrap().len();
4401 assert!(wal_size > 0, "WAL file should have data (got 0 bytes)");
4402
4403 {
4405 let config = EventStoreConfig::production(
4406 &storage_dir,
4407 &wal_dir,
4408 SnapshotConfig::default(),
4409 WALConfig {
4410 sync_on_write: true,
4411 ..WALConfig::default()
4412 },
4413 CompactionConfig::default(),
4414 );
4415 let store = EventStore::with_config(config);
4416
4417 assert_eq!(
4419 store.stats().total_events,
4420 5,
4421 "Session 2 should have all 5 events after WAL recovery"
4422 );
4423
4424 let parquet_files = find_parquet_files(&storage_dir);
4428 assert!(
4429 !parquet_files.is_empty(),
4430 "Parquet file should exist after WAL checkpoint"
4431 );
4432 }
4433
4434 {
4437 let config = EventStoreConfig::production(
4438 &storage_dir,
4439 &wal_dir,
4440 SnapshotConfig::default(),
4441 WALConfig {
4442 sync_on_write: true,
4443 ..WALConfig::default()
4444 },
4445 CompactionConfig::default(),
4446 );
4447 let store = EventStore::with_config(config);
4448
4449 assert_eq!(
4453 store.stats().total_events,
4454 0,
4455 "Session 3 boot should not pre-load Parquet (lazy-load mode)"
4456 );
4457
4458 store.ensure_tenant_loaded("default").unwrap();
4461 assert_eq!(
4462 store.stats().total_events,
4463 5,
4464 "Session 3 should have all 5 events after ensure_tenant_loaded"
4465 );
4466 }
4467 }
4468
4469 #[test]
4470 fn test_parquet_restore_surfaces_errors_not_silent() {
4471 let data_dir = TempDir::new().unwrap();
4475 let storage_dir = data_dir.path().join("storage");
4476 let wal_dir = data_dir.path().join("wal");
4477
4478 {
4480 let config = EventStoreConfig::production(
4481 &storage_dir,
4482 &wal_dir,
4483 SnapshotConfig::default(),
4484 WALConfig {
4485 sync_on_write: true,
4486 ..WALConfig::default()
4487 },
4488 CompactionConfig::default(),
4489 );
4490 let store = EventStore::with_config(config);
4491
4492 for i in 0..3 {
4493 let event = Event::from_strings(
4494 "test.created".to_string(),
4495 format!("entity-{i}"),
4496 "default".to_string(),
4497 serde_json::json!({"i": i}),
4498 None,
4499 )
4500 .unwrap();
4501 store.ingest(&event).unwrap();
4502 }
4503
4504 store.flush_storage().unwrap();
4505 assert_eq!(store.stats().total_events, 3);
4506 }
4507
4508 let parquet_files = find_parquet_files(&storage_dir);
4511 assert!(!parquet_files.is_empty(), "Parquet file must exist");
4512
4513 std::fs::write(&parquet_files[0], b"corrupted data").unwrap();
4515
4516 for entry in std::fs::read_dir(&wal_dir).unwrap().flatten() {
4518 std::fs::write(entry.path(), b"").unwrap();
4519 }
4520
4521 {
4528 let config = EventStoreConfig::production(
4529 &storage_dir,
4530 &wal_dir,
4531 SnapshotConfig::default(),
4532 WALConfig::default(),
4533 CompactionConfig::default(),
4534 );
4535 let store = EventStore::with_config(config);
4536
4537 assert_eq!(store.stats().total_events, 0);
4540 }
4541 }
4542
4543 fn count_wal_entries(wal_dir: &std::path::Path) -> usize {
4553 use std::io::{BufRead, BufReader};
4554 let mut total = 0usize;
4555 let Ok(entries) = std::fs::read_dir(wal_dir) else {
4556 return 0;
4557 };
4558 for entry in entries.flatten() {
4559 let path = entry.path();
4560 if path.extension().is_none_or(|e| e != "log") {
4561 continue;
4562 }
4563 let Ok(file) = std::fs::File::open(&path) else {
4564 continue;
4565 };
4566 for line in BufReader::new(file)
4567 .lines()
4568 .map_while(std::result::Result::ok)
4569 {
4570 if !line.trim().is_empty() {
4571 total += 1;
4572 }
4573 }
4574 }
4575 total
4576 }
4577
4578 #[test]
4579 fn test_checkpoint_truncates_wal_after_flush() {
4580 let data_dir = TempDir::new().unwrap();
4585 let storage_dir = data_dir.path().join("storage");
4586 let wal_dir = data_dir.path().join("wal");
4587
4588 let config = EventStoreConfig::production(
4589 &storage_dir,
4590 &wal_dir,
4591 SnapshotConfig::default(),
4592 WALConfig {
4593 sync_on_write: true,
4594 ..WALConfig::default()
4595 },
4596 CompactionConfig::default(),
4597 );
4598 let store = EventStore::with_config(config);
4599
4600 for i in 0..10 {
4601 let event = Event::from_strings(
4602 "test.created".to_string(),
4603 format!("entity-{i}"),
4604 "default".to_string(),
4605 serde_json::json!({"i": i}),
4606 None,
4607 )
4608 .unwrap();
4609 store.ingest(&event).unwrap();
4610 }
4611
4612 assert_eq!(
4614 count_wal_entries(&wal_dir),
4615 10,
4616 "WAL should have 10 events before checkpoint"
4617 );
4618
4619 store.checkpoint().unwrap();
4620
4621 assert_eq!(
4622 count_wal_entries(&wal_dir),
4623 0,
4624 "WAL should be empty after successful checkpoint"
4625 );
4626 let parquet_files = find_parquet_files(&storage_dir);
4627 assert!(!parquet_files.is_empty(), "Parquet should hold the events");
4628 }
4629
4630 #[test]
4631 fn test_replay_only_post_checkpoint_events_after_crash() {
4632 let data_dir = TempDir::new().unwrap();
4639 let storage_dir = data_dir.path().join("storage");
4640 let wal_dir = data_dir.path().join("wal");
4641
4642 let config_factory = || {
4643 EventStoreConfig::production(
4644 &storage_dir,
4645 &wal_dir,
4646 SnapshotConfig::default(),
4647 WALConfig {
4648 sync_on_write: true,
4649 ..WALConfig::default()
4650 },
4651 CompactionConfig::default(),
4652 )
4653 };
4654
4655 const N: usize = 50;
4658 const K: usize = 5;
4659 {
4660 let store = EventStore::with_config(config_factory());
4661 for i in 0..N {
4662 store
4663 .ingest(
4664 &Event::from_strings(
4665 "pre.checkpoint".to_string(),
4666 format!("e-{i}"),
4667 "default".to_string(),
4668 serde_json::json!({"i": i}),
4669 None,
4670 )
4671 .unwrap(),
4672 )
4673 .unwrap();
4674 }
4675 store.checkpoint().unwrap();
4676 assert_eq!(
4677 count_wal_entries(&wal_dir),
4678 0,
4679 "WAL should be empty immediately after checkpoint"
4680 );
4681
4682 for i in 0..K {
4683 store
4684 .ingest(
4685 &Event::from_strings(
4686 "post.checkpoint".to_string(),
4687 format!("p-{i}"),
4688 "default".to_string(),
4689 serde_json::json!({"i": i}),
4690 None,
4691 )
4692 .unwrap(),
4693 )
4694 .unwrap();
4695 }
4696 assert_eq!(
4697 count_wal_entries(&wal_dir),
4698 K,
4699 "WAL should hold only post-checkpoint events"
4700 );
4701 }
4703
4704 {
4708 let store = EventStore::with_config(config_factory());
4709 assert_eq!(
4713 store.stats().total_events,
4714 K,
4715 "Boot should replay exactly K events from WAL (the post-checkpoint window), not N+K"
4716 );
4717
4718 store.ensure_tenant_loaded("default").unwrap();
4720 assert_eq!(
4721 store.stats().total_events,
4722 N + K,
4723 "After lazy-load, both pre- and post-checkpoint events should be reachable"
4724 );
4725 }
4726 }
4727
4728 #[test]
4729 fn test_checkpoint_is_idempotent() {
4730 let data_dir = TempDir::new().unwrap();
4733 let storage_dir = data_dir.path().join("storage");
4734 let wal_dir = data_dir.path().join("wal");
4735
4736 let store = EventStore::with_config(EventStoreConfig::production(
4737 &storage_dir,
4738 &wal_dir,
4739 SnapshotConfig::default(),
4740 WALConfig::default(),
4741 CompactionConfig::default(),
4742 ));
4743
4744 for i in 0..5 {
4745 store
4746 .ingest(
4747 &Event::from_strings(
4748 "x".to_string(),
4749 format!("e-{i}"),
4750 "default".to_string(),
4751 serde_json::json!({}),
4752 None,
4753 )
4754 .unwrap(),
4755 )
4756 .unwrap();
4757 }
4758
4759 store.checkpoint().unwrap();
4760 store.checkpoint().unwrap();
4762 assert_eq!(count_wal_entries(&wal_dir), 0);
4763 }
4764
4765 #[test]
4766 fn test_checkpoint_noop_in_memory_only_mode() {
4767 let store = EventStore::new();
4769 store.checkpoint().unwrap();
4770 }
4771
4772 #[test]
4773 fn test_checkpoint_interval_from_env_defaults_to_60s_when_wal_enabled() {
4774 let (config, _) = EventStoreConfig::from_env_vars(
4775 Some("/app/data".to_string()),
4776 None,
4777 None,
4778 None,
4779 None,
4780 None,
4781 None,
4782 None,
4783 );
4784 assert_eq!(config.checkpoint_interval_secs, Some(60));
4785 }
4786
4787 #[test]
4788 fn test_checkpoint_interval_from_env_overrides_default() {
4789 let (config, _) = EventStoreConfig::from_env_vars(
4790 Some("/app/data".to_string()),
4791 None,
4792 None,
4793 None,
4794 None,
4795 None,
4796 None,
4797 Some("15".to_string()),
4798 );
4799 assert_eq!(config.checkpoint_interval_secs, Some(15));
4800 }
4801
4802 #[test]
4803 fn test_checkpoint_interval_disabled_when_wal_disabled() {
4804 let (config, _) = EventStoreConfig::from_env_vars(
4806 Some("/app/data".to_string()),
4807 None,
4808 None,
4809 Some("false".to_string()),
4810 None,
4811 None,
4812 None,
4813 Some("15".to_string()),
4814 );
4815 assert_eq!(config.checkpoint_interval_secs, None);
4816 }
4817
4818 #[test]
4819 fn test_checkpoint_interval_unparseable_falls_back_to_default() {
4820 let (config, _) = EventStoreConfig::from_env_vars(
4821 Some("/app/data".to_string()),
4822 None,
4823 None,
4824 None,
4825 None,
4826 None,
4827 None,
4828 Some("not-a-number".to_string()),
4829 );
4830 assert_eq!(config.checkpoint_interval_secs, Some(60));
4831 }
4832}