1#[cfg(feature = "server")]
2use crate::application::services::webhook::WebhookRegistry;
3#[cfg(feature = "server")]
4use crate::infrastructure::observability::metrics::MetricsRegistry;
5#[cfg(feature = "server")]
6use crate::infrastructure::web::websocket::WebSocketManager;
7use crate::{
8 application::{
9 dto::QueryEventsRequest,
10 services::{
11 consumer::ConsumerRegistry,
12 exactly_once::{ExactlyOnceConfig, ExactlyOnceRegistry},
13 pipeline::PipelineManager,
14 projection::{EntitySnapshotProjection, EventCounterProjection, ProjectionManager},
15 replay::ReplayManager,
16 schema::{SchemaRegistry, SchemaRegistryConfig},
17 schema_evolution::SchemaEvolutionManager,
18 },
19 },
20 domain::entities::Event,
21 error::{AllSourceError, Result},
22 infrastructure::{
23 persistence::{
24 compaction::{CompactionConfig, CompactionManager},
25 index::{EventIndex, IndexEntry},
26 snapshot::{SnapshotConfig, SnapshotManager, SnapshotType},
27 storage::ParquetStorage,
28 tenant_loader::TenantLoader,
29 wal::{WALConfig, WriteAheadLog},
30 },
31 query::geospatial::GeoIndex,
32 },
33};
34use chrono::{DateTime, Utc};
35use dashmap::DashMap;
36use parking_lot::RwLock;
37use std::{path::PathBuf, sync::Arc};
38#[cfg(feature = "server")]
39use tokio::sync::mpsc;
40
/// Central in-memory event store: append-only log plus secondary indexes,
/// synchronous projections, optional Parquet/WAL persistence, and fan-out
/// (broadcast channel, WebSocket, webhooks) — all behind `Arc`/locks so a
/// single instance can be shared across threads.
pub struct EventStore {
    /// Append-only in-memory event log; a Vec position is the event's
    /// canonical offset, which `EventIndex` entries refer back to.
    events: Arc<RwLock<Vec<Event>>>,

    /// Secondary lookups (by id / entity / type / time) over `events` offsets.
    index: Arc<EventIndex>,

    /// Registered projections; run synchronously on every ingest path.
    pub(crate) projections: Arc<RwLock<ProjectionManager>>,

    /// Optional Parquet persistence; `None` when no storage dir is configured
    /// or initialization failed (see `with_config`).
    storage: Option<Arc<RwLock<ParquetStorage>>>,

    /// Pushes ingested events to connected WebSocket clients (server builds).
    #[cfg(feature = "server")]
    websocket_manager: Arc<WebSocketManager>,

    /// Creates/serves entity snapshots (manual and automatic).
    snapshot_manager: Arc<SnapshotManager>,

    /// Optional write-ahead log; appended before an event becomes visible.
    wal: Option<Arc<WriteAheadLog>>,

    /// Optional background compaction for the Parquet storage dir.
    compaction_manager: Option<Arc<CompactionManager>>,

    /// Event schema registry used for validation/versioning.
    schema_registry: Arc<SchemaRegistry>,

    /// Coordinates historical replays.
    replay_manager: Arc<ReplayManager>,

    /// Per-event processing pipelines; results are logged, not stored.
    pipeline_manager: Arc<PipelineManager>,

    /// Prometheus metrics registry (server builds only).
    #[cfg(feature = "server")]
    metrics: Arc<MetricsRegistry>,

    /// Running count of ingested events; adjusted on eviction/recovery.
    total_ingested: Arc<RwLock<u64>>,

    /// Cached projection state keyed as "<projection>:<key>" (see
    /// `reset_projection` for the key prefix convention).
    projection_state_cache: Arc<DashMap<String, serde_json::Value>>,

    /// Human-readable status string per projection.
    projection_status: Arc<DashMap<String, String>>,

    /// Webhook subscriptions matched against each ingested event.
    #[cfg(feature = "server")]
    webhook_registry: Arc<WebhookRegistry>,

    /// Sender for the async webhook delivery worker; `None` until
    /// `set_webhook_tx` wires it up.
    #[cfg(feature = "server")]
    webhook_tx: Arc<RwLock<Option<mpsc::UnboundedSender<WebhookDeliveryTask>>>>,

    /// Geospatial index fed from event payloads.
    geo_index: Arc<GeoIndex>,

    /// Idempotency/exactly-once bookkeeping.
    exactly_once: Arc<ExactlyOnceRegistry>,

    /// Observes payloads to track schema drift per event type.
    schema_evolution: Arc<SchemaEvolutionManager>,

    /// Per-entity version counters for optimistic concurrency control.
    entity_versions: Arc<DashMap<String, u64>>,

    /// Consumer-group registry (offset tracking, filters).
    consumer_registry: Arc<ConsumerRegistry>,

    /// In-process broadcast of every ingested event (capacity set in
    /// `with_config`); receivers only see events after subscribing.
    event_broadcast_tx: tokio::sync::broadcast::Sender<Arc<Event>>,

    /// Lazy per-tenant hydration from Parquet + LRU byte-budget accounting.
    tenant_loader: Arc<TenantLoader>,

    /// Periodic checkpoint cadence; `None` disables periodic checkpoints.
    checkpoint_interval_secs: Option<u64>,
}
136
/// Work item queued to the webhook delivery worker: one matching
/// subscription paired with the event that triggered it.
#[cfg(feature = "server")]
#[derive(Debug, Clone)]
pub struct WebhookDeliveryTask {
    /// The subscription whose filter matched the event.
    pub webhook: crate::application::services::webhook::WebhookSubscription,
    /// Owned copy of the triggering event.
    pub event: Event,
}
144
145impl EventStore {
146 pub fn new() -> Self {
148 Self::with_config(EventStoreConfig::default())
149 }
150
    /// Builds an `EventStore` from `config`, wiring every subsystem, then
    /// performs WAL recovery (if a WAL is configured) before returning.
    ///
    /// Persistence subsystems are best-effort: a Parquet or WAL init failure
    /// is logged and the store runs without that subsystem rather than
    /// failing construction.
    pub fn with_config(config: EventStoreConfig) -> Self {
        let mut projections = ProjectionManager::new();

        // Built-in projections registered on every store.
        projections.register(Arc::new(EntitySnapshotProjection::new("entity_snapshots")));
        projections.register(Arc::new(EventCounterProjection::new("event_counters")));

        // Parquet persistence is optional; on init failure we degrade to
        // memory-only and log the error.
        let storage = config
            .storage_dir
            .as_ref()
            .and_then(|dir| match ParquetStorage::new(dir) {
                Ok(storage) => {
                    tracing::info!("✅ Parquet persistence enabled at: {}", dir.display());
                    Some(Arc::new(RwLock::new(storage)))
                }
                Err(e) => {
                    tracing::error!("❌ Failed to initialize Parquet storage: {}", e);
                    None
                }
            });

        // WAL is likewise optional and degrades gracefully.
        let wal = config.wal_dir.as_ref().and_then(|dir| {
            match WriteAheadLog::new(dir, config.wal_config.clone()) {
                Ok(wal) => {
                    tracing::info!("✅ WAL enabled at: {}", dir.display());
                    Some(Arc::new(wal))
                }
                Err(e) => {
                    tracing::error!("❌ Failed to initialize WAL: {}", e);
                    None
                }
            }
        });

        // Compaction only makes sense when a storage dir exists.
        let compaction_manager = config.storage_dir.as_ref().map(|dir| {
            let manager = CompactionManager::new(dir, config.compaction_config.clone());
            Arc::new(manager)
        });

        let schema_registry = Arc::new(SchemaRegistry::new(config.schema_registry_config.clone()));
        tracing::info!("✅ Schema registry enabled");

        let replay_manager = Arc::new(ReplayManager::new());
        tracing::info!("✅ Replay manager enabled");

        let pipeline_manager = Arc::new(PipelineManager::new());
        tracing::info!("✅ Pipeline manager enabled");

        #[cfg(feature = "server")]
        let metrics = {
            let m = MetricsRegistry::new();
            tracing::info!("✅ Prometheus metrics registry initialized");
            m
        };

        let projection_state_cache = Arc::new(DashMap::new());
        tracing::info!("✅ Projection state cache initialized");

        #[cfg(feature = "server")]
        let webhook_registry = {
            let w = Arc::new(WebhookRegistry::new());
            tracing::info!("✅ Webhook registry initialized");
            w
        };

        // In-process event broadcast; slow receivers lag past 1024 events.
        let (event_broadcast_tx, _) = tokio::sync::broadcast::channel(1024);

        let store = Self {
            events: Arc::new(RwLock::new(Vec::new())),
            index: Arc::new(EventIndex::new()),
            projections: Arc::new(RwLock::new(projections)),
            storage,
            #[cfg(feature = "server")]
            websocket_manager: Arc::new(WebSocketManager::new()),
            snapshot_manager: Arc::new(SnapshotManager::new(config.snapshot_config)),
            wal,
            compaction_manager,
            schema_registry,
            replay_manager,
            pipeline_manager,
            #[cfg(feature = "server")]
            metrics,
            total_ingested: Arc::new(RwLock::new(0)),
            projection_state_cache,
            projection_status: Arc::new(DashMap::new()),
            #[cfg(feature = "server")]
            webhook_registry,
            #[cfg(feature = "server")]
            webhook_tx: Arc::new(RwLock::new(None)),
            geo_index: Arc::new(GeoIndex::new()),
            exactly_once: Arc::new(ExactlyOnceRegistry::new(ExactlyOnceConfig::default())),
            schema_evolution: Arc::new(SchemaEvolutionManager::new()),
            entity_versions: Arc::new(DashMap::new()),
            consumer_registry: Arc::new(ConsumerRegistry::new()),
            event_broadcast_tx,
            tenant_loader: {
                let loader = TenantLoader::new();
                if let Some(budget) = config.cache_byte_budget {
                    loader.set_byte_budget(budget);
                    tracing::info!(
                        "✅ Cache byte budget set to {} bytes ({:.2} GiB) — LRU eviction enabled",
                        budget,
                        budget as f64 / (1024.0 * 1024.0 * 1024.0)
                    );
                } else {
                    tracing::info!(
                        "✅ Cache budget unset — every loaded tenant stays resident \
                         (set ALLSOURCE_CACHE_BYTES to enable eviction)"
                    );
                }
                Arc::new(loader)
            },
            checkpoint_interval_secs: config.checkpoint_interval_secs,
        };

        // --- WAL recovery ---------------------------------------------------
        // Replays un-checkpointed events through index/projections/versions,
        // then checkpoints them back to Parquet and truncates the WAL.
        if let Some(ref wal) = store.wal {
            match wal.recover() {
                Ok(recovered_events) if !recovered_events.is_empty() => {
                    let mut wal_new = 0usize;
                    for event in recovered_events {
                        // Offset must be read before the push below.
                        let offset = store.events.read().len();
                        if let Err(e) = store.index.index_event(
                            event.id,
                            event.entity_id_str(),
                            event.event_type_str(),
                            event.timestamp,
                            offset,
                        ) {
                            tracing::error!("Failed to re-index WAL event {}: {}", event.id, e);
                        }

                        if let Err(e) = store.projections.read().process_event(&event) {
                            tracing::error!("Failed to re-process WAL event {}: {}", event.id, e);
                        }

                        *store
                            .entity_versions
                            .entry(event.entity_id_str().to_string())
                            .or_insert(0) += 1;

                        store.events.write().push(event);
                        wal_new += 1;
                    }

                    #[cfg(feature = "server")]
                    store.metrics.wal_replay_events_total.set(wal_new as i64);

                    if wal_new > 0 {
                        let total = store.events.read().len();
                        *store.total_ingested.write() = total as u64;
                        tracing::info!(
                            "✅ Recovered {} events from WAL (Parquet data stays cold until \
                             first per-tenant query)",
                            wal_new
                        );

                        // Checkpoint only the recovered tail to Parquet so the
                        // WAL can be truncated safely.
                        if let Some(ref storage) = store.storage {
                            tracing::info!(
                                "📸 Checkpointing {} WAL events to Parquet storage...",
                                wal_new
                            );
                            let parquet = storage.read();
                            let events = store.events.read();
                            let mut buffered = 0usize;
                            for event in events.iter().skip(events.len() - wal_new) {
                                if let Err(e) = parquet.append_event(event.clone()) {
                                    tracing::error!(
                                        "Failed to buffer WAL event for Parquet: {}",
                                        e
                                    );
                                } else {
                                    buffered += 1;
                                }
                            }
                            drop(events);
                            drop(parquet);

                            // Truncate the WAL only after a successful flush.
                            if buffered > 0 {
                                if let Err(e) = store.flush_storage() {
                                    tracing::error!("Failed to checkpoint to Parquet: {}", e);
                                } else if let Err(e) = wal.truncate() {
                                    tracing::error!(
                                        "Failed to truncate WAL after checkpoint: {}",
                                        e
                                    );
                                } else {
                                    tracing::info!(
                                        "✅ WAL checkpointed and truncated ({} events)",
                                        buffered
                                    );
                                }
                            }
                        }
                    }
                }
                Ok(_) => {
                    tracing::debug!("No events to recover from WAL");
                    #[cfg(feature = "server")]
                    store.metrics.wal_replay_events_total.set(0);
                }
                Err(e) => {
                    tracing::error!("❌ WAL recovery failed: {}", e);
                }
            }
        } else if store.storage.is_some() {
            tracing::info!(
                "📂 Boot complete (lazy-load mode): Parquet data stays on disk until first \
                 per-tenant query"
            );
        }

        store
    }
415
    /// Ingests `event` with optimistic concurrency control.
    ///
    /// When `expected_version` is `Some(v)`, the ingest is rejected with
    /// `VersionConflict` unless the entity's current version equals `v`.
    /// Returns the entity's new version on success.
    ///
    /// # Errors
    /// Validation failures, `VersionConflict`, WAL append errors, and any
    /// error from the shared post-WAL path.
    #[cfg_attr(feature = "hotpath", hotpath::measure)]
    pub fn ingest_with_expected_version(
        &self,
        event: &Event,
        expected_version: Option<u64>,
    ) -> Result<u64> {
        self.validate_event(event)?;

        let entity_id = event.entity_id_str().to_string();

        let new_version = {
            // The DashMap entry guard is deliberately held across the version
            // check, the WAL append, and the bump, so concurrent writers to
            // the same entity serialize and durable order matches version
            // order.
            let mut version_entry = self.entity_versions.entry(entity_id.clone()).or_insert(0);
            let current = *version_entry;

            if let Some(expected) = expected_version
                && current != expected
            {
                return Err(crate::error::AllSourceError::VersionConflict { expected, current });
            }

            if let Some(ref wal) = self.wal {
                wal.append(event.clone())?;
            }

            *version_entry += 1;
            *version_entry
        };

        // Index, project, persist, and fan out via the shared post-WAL path.
        self.ingest_post_wal(event)?;

        Ok(new_version)
    }
461
    /// Shared ingestion tail, entered after validation / WAL append /
    /// version bump have already succeeded: indexes the event, runs
    /// projections and pipelines, persists to Parquet, appends to the
    /// in-memory log, then fans out (broadcast channel, WebSocket,
    /// webhooks), updates the geo index / schema evolution / auto-snapshot
    /// checks, and records metrics.
    #[cfg_attr(feature = "hotpath", hotpath::measure)]
    fn ingest_post_wal(&self, event: &Event) -> Result<()> {
        #[cfg(feature = "server")]
        let timer = self.metrics.ingestion_duration_seconds.start_timer();

        // The write lock is held from offset computation through the push so
        // the indexed offset matches the event's final position.
        let mut events = self.events.write();
        let offset = events.len();

        self.index.index_event(
            event.id,
            event.entity_id_str(),
            event.event_type_str(),
            event.timestamp,
            offset,
        )?;

        let projections = self.projections.read();
        projections.process_event(event)?;
        drop(projections);

        let pipeline_results = self.pipeline_manager.process_event(event);
        if !pipeline_results.is_empty() {
            tracing::debug!(
                "Event {} processed by {} pipeline(s)",
                event.id,
                pipeline_results.len()
            );
            for (pipeline_id, result) in pipeline_results {
                tracing::trace!("Pipeline {} result: {:?}", pipeline_id, result);
            }
        }

        if let Some(ref storage) = self.storage {
            let storage = storage.read();
            storage.append_event(event.clone())?;
        }

        events.push(event.clone());
        let total_events = events.len();
        drop(events);

        // Fan-out happens after the write lock is released.
        let event_arc = Arc::new(event.clone());
        let _ = self.event_broadcast_tx.send(Arc::clone(&event_arc));
        #[cfg(feature = "server")]
        self.websocket_manager.broadcast_event(event_arc);

        #[cfg(feature = "server")]
        self.dispatch_webhooks(event);

        self.geo_index.index_event(event);

        self.schema_evolution
            .analyze_event(event.event_type_str(), &event.payload);

        self.check_auto_snapshot(event.entity_id_str(), event);

        #[cfg(feature = "server")]
        {
            self.metrics.events_ingested_total.inc();
            self.metrics
                .events_ingested_by_type
                .with_label_values(&[event.event_type_str()])
                .inc();
            self.metrics.storage_events_total.set(total_events as i64);
        }

        let mut total = self.total_ingested.write();
        *total += 1;

        #[cfg(feature = "server")]
        timer.observe_duration();

        tracing::debug!("Event ingested: {} (offset: {})", event.id, offset);

        Ok(())
    }
552
553 #[cfg_attr(feature = "hotpath", hotpath::measure)]
555 pub fn ingest(&self, event: &Event) -> Result<()> {
556 #[cfg(feature = "server")]
558 let timer = self.metrics.ingestion_duration_seconds.start_timer();
559
560 let validation_result = self.validate_event(event);
562 if let Err(e) = validation_result {
563 #[cfg(feature = "server")]
564 {
565 self.metrics.ingestion_errors_total.inc();
566 timer.observe_duration();
567 }
568 return Err(e);
569 }
570
571 if let Some(ref wal) = self.wal
574 && let Err(e) = wal.append(event.clone())
575 {
576 #[cfg(feature = "server")]
577 {
578 self.metrics.ingestion_errors_total.inc();
579 timer.observe_duration();
580 }
581 return Err(e);
582 }
583
584 *self
586 .entity_versions
587 .entry(event.entity_id_str().to_string())
588 .or_insert(0) += 1;
589
590 let mut events = self.events.write();
591 let offset = events.len();
592
593 self.index.index_event(
595 event.id,
596 event.entity_id_str(),
597 event.event_type_str(),
598 event.timestamp,
599 offset,
600 )?;
601
602 let projections = self.projections.read();
604 projections.process_event(event)?;
605 drop(projections); let pipeline_results = self.pipeline_manager.process_event(event);
610 if !pipeline_results.is_empty() {
611 tracing::debug!(
612 "Event {} processed by {} pipeline(s)",
613 event.id,
614 pipeline_results.len()
615 );
616 for (pipeline_id, result) in pipeline_results {
619 tracing::trace!("Pipeline {} result: {:?}", pipeline_id, result);
620 }
621 }
622
623 if let Some(ref storage) = self.storage {
625 let storage = storage.read();
626 storage.append_event(event.clone())?;
627 }
628
629 events.push(event.clone());
631 let total_events = events.len();
632 drop(events); let event_arc = Arc::new(event.clone());
636 let _ = self.event_broadcast_tx.send(Arc::clone(&event_arc));
637 #[cfg(feature = "server")]
638 self.websocket_manager.broadcast_event(event_arc);
639
640 #[cfg(feature = "server")]
642 self.dispatch_webhooks(event);
643
644 self.geo_index.index_event(event);
646
647 self.schema_evolution
649 .analyze_event(event.event_type_str(), &event.payload);
650
651 self.check_auto_snapshot(event.entity_id_str(), event);
653
654 #[cfg(feature = "server")]
656 {
657 self.metrics.events_ingested_total.inc();
658 self.metrics
659 .events_ingested_by_type
660 .with_label_values(&[event.event_type_str()])
661 .inc();
662 self.metrics.storage_events_total.set(total_events as i64);
663 }
664
665 let mut total = self.total_ingested.write();
667 *total += 1;
668
669 #[cfg(feature = "server")]
670 timer.observe_duration();
671
672 tracing::debug!("Event ingested: {} (offset: {})", event.id, offset);
673
674 Ok(())
675 }
676
677 #[cfg_attr(feature = "hotpath", hotpath::measure)]
684 pub fn ingest_batch(&self, batch: Vec<Event>) -> Result<()> {
685 if batch.is_empty() {
686 return Ok(());
687 }
688
689 for event in &batch {
691 self.validate_event(event)?;
692 }
693
694 if let Some(ref wal) = self.wal {
696 for event in &batch {
697 wal.append(event.clone())?;
698 }
699 }
700
701 let mut events = self.events.write();
703 let projections = self.projections.read();
704
705 for event in batch {
706 let offset = events.len();
707
708 self.index.index_event(
709 event.id,
710 event.entity_id_str(),
711 event.event_type_str(),
712 event.timestamp,
713 offset,
714 )?;
715
716 projections.process_event(&event)?;
717 self.pipeline_manager.process_event(&event);
718
719 if let Some(ref storage) = self.storage {
720 let storage = storage.read();
721 storage.append_event(event.clone())?;
722 }
723
724 self.geo_index.index_event(&event);
725 self.schema_evolution
726 .analyze_event(event.event_type_str(), &event.payload);
727
728 *self
730 .entity_versions
731 .entry(event.entity_id_str().to_string())
732 .or_insert(0) += 1;
733
734 let _ = self.event_broadcast_tx.send(Arc::new(event.clone()));
736
737 events.push(event);
738 }
739
740 let total_events = events.len();
741 drop(projections);
742 drop(events);
743
744 let mut total = self.total_ingested.write();
745 *total += total_events as u64;
746
747 Ok(())
748 }
749
    /// Ingests an event received from a replication peer.
    ///
    /// Compared to `ingest`, this path skips validation, the WAL, Parquet
    /// persistence, webhooks, the geo index, schema-evolution analysis, and
    /// auto-snapshots — presumably the origin node already performed those;
    /// confirm before relying on them for replicated data.
    #[cfg_attr(feature = "hotpath", hotpath::measure)]
    pub fn ingest_replicated(&self, event: &Event) -> Result<()> {
        #[cfg(feature = "server")]
        let timer = self.metrics.ingestion_duration_seconds.start_timer();

        // Lock held from offset computation through the push so the indexed
        // offset matches the event's final position.
        let mut events = self.events.write();
        let offset = events.len();

        self.index.index_event(
            event.id,
            event.entity_id_str(),
            event.event_type_str(),
            event.timestamp,
            offset,
        )?;

        let projections = self.projections.read();
        projections.process_event(event)?;
        drop(projections);

        let pipeline_results = self.pipeline_manager.process_event(event);
        if !pipeline_results.is_empty() {
            tracing::debug!(
                "Replicated event {} processed by {} pipeline(s)",
                event.id,
                pipeline_results.len()
            );
        }

        *self
            .entity_versions
            .entry(event.entity_id_str().to_string())
            .or_insert(0) += 1;

        events.push(event.clone());
        let total_events = events.len();
        drop(events);

        // Fan-out after the write lock is released.
        let event_arc = Arc::new(event.clone());
        let _ = self.event_broadcast_tx.send(Arc::clone(&event_arc));
        #[cfg(feature = "server")]
        self.websocket_manager.broadcast_event(event_arc);

        #[cfg(feature = "server")]
        {
            self.metrics.events_ingested_total.inc();
            self.metrics
                .events_ingested_by_type
                .with_label_values(&[event.event_type_str()])
                .inc();
            self.metrics.storage_events_total.set(total_events as i64);
        }

        let mut total = self.total_ingested.write();
        *total += 1;

        #[cfg(feature = "server")]
        timer.observe_duration();

        tracing::debug!(
            "Replicated event ingested: {} (offset: {})",
            event.id,
            offset
        );

        Ok(())
    }
830
831 #[cfg_attr(feature = "hotpath", hotpath::measure)]
834 pub fn get_entity_version(&self, entity_id: &str) -> u64 {
835 self.entity_versions.get(entity_id).map_or(0, |v| *v)
836 }
837
    /// Borrows the consumer-group registry.
    pub fn consumer_registry(&self) -> &ConsumerRegistry {
        &self.consumer_registry
    }
842
    /// Subscribes to the live event broadcast; the receiver only sees
    /// events ingested after this call.
    pub fn subscribe_events(&self) -> tokio::sync::broadcast::Receiver<Arc<Event>> {
        self.event_broadcast_tx.subscribe()
    }
851
    /// Replaces the consumer registry. Requires exclusive access, so this
    /// is presumably only usable during construction/wiring — confirm.
    pub fn set_consumer_registry(&mut self, registry: Arc<ConsumerRegistry>) {
        self.consumer_registry = registry;
    }
859
    /// Number of events currently resident in the in-memory log.
    pub fn total_events(&self) -> usize {
        self.events.read().len()
    }
864
865 pub fn events_after_offset(
868 &self,
869 offset: u64,
870 filters: &[String],
871 limit: usize,
872 ) -> Vec<(u64, Event)> {
873 let events = self.events.read();
874 let start = offset as usize;
875 if start >= events.len() {
876 return vec![];
877 }
878
879 events[start..]
880 .iter()
881 .enumerate()
882 .filter(|(_, event)| ConsumerRegistry::matches_filters(event.event_type_str(), filters))
883 .take(limit)
884 .map(|(i, event)| ((start + i + 1) as u64, event.clone()))
885 .collect()
886 }
887
    /// Shared handle to the WebSocket broadcast manager (server builds).
    #[cfg(feature = "server")]
    pub fn websocket_manager(&self) -> Arc<WebSocketManager> {
        Arc::clone(&self.websocket_manager)
    }
893
    /// Shared handle to the snapshot manager.
    pub fn snapshot_manager(&self) -> Arc<SnapshotManager> {
        Arc::clone(&self.snapshot_manager)
    }
898
899 pub fn compaction_manager(&self) -> Option<Arc<CompactionManager>> {
901 self.compaction_manager.as_ref().map(Arc::clone)
902 }
903
    /// Shared handle to the schema registry.
    pub fn schema_registry(&self) -> Arc<SchemaRegistry> {
        Arc::clone(&self.schema_registry)
    }
908
    /// Shared handle to the replay manager.
    pub fn replay_manager(&self) -> Arc<ReplayManager> {
        Arc::clone(&self.replay_manager)
    }
913
    /// Shared handle to the pipeline manager.
    pub fn pipeline_manager(&self) -> Arc<PipelineManager> {
        Arc::clone(&self.pipeline_manager)
    }
918
    /// Shared handle to the Prometheus metrics registry (server builds).
    #[cfg(feature = "server")]
    pub fn metrics(&self) -> Arc<MetricsRegistry> {
        Arc::clone(&self.metrics)
    }
924
    /// Read-locks and returns the projection manager; the guard blocks
    /// writers (projection registration) while held.
    pub fn projection_manager(&self) -> parking_lot::RwLockReadGuard<'_, ProjectionManager> {
        self.projections.read()
    }
929
930 pub fn register_projection(
939 &self,
940 projection: Arc<dyn crate::application::services::projection::Projection>,
941 ) {
942 let mut pm = self.projections.write();
943 pm.register(projection);
944 }
945
    /// Registers a projection and replays every event already in the store
    /// through it.
    ///
    /// NOTE(review): an event ingested between the registration below and
    /// the read-lock acquisition is processed both by the live ingest path
    /// and by this backfill loop — confirm projections tolerate duplicate
    /// delivery (fixing this would require holding the projections write
    /// lock across the backfill, inverting the ingest lock order).
    pub fn register_projection_with_backfill(
        &self,
        projection: &Arc<dyn crate::application::services::projection::Projection>,
    ) -> Result<()> {
        {
            let mut pm = self.projections.write();
            pm.register(Arc::clone(projection));
        }

        // Backfill: replay the whole resident log through the new projection.
        let events = self.events.read();
        for event in events.iter() {
            projection.process(event)?;
        }

        Ok(())
    }
969
    /// Shared handle to the projection state cache
    /// (keys prefixed "<projection>:").
    pub fn projection_state_cache(&self) -> Arc<DashMap<String, serde_json::Value>> {
        Arc::clone(&self.projection_state_cache)
    }
975
    /// Shared handle to the per-projection status map.
    pub fn projection_status(&self) -> Arc<DashMap<String, String>> {
        Arc::clone(&self.projection_status)
    }
980
981 pub fn geo_index(&self) -> Arc<GeoIndex> {
984 self.geo_index.clone()
985 }
986
987 pub fn exactly_once(&self) -> Arc<ExactlyOnceRegistry> {
989 self.exactly_once.clone()
990 }
991
992 pub fn schema_evolution(&self) -> Arc<SchemaEvolutionManager> {
994 self.schema_evolution.clone()
995 }
996
    /// Returns a full deep copy of the in-memory event log (O(n) clone).
    pub fn snapshot_events(&self) -> Vec<Event> {
        self.events.read().clone()
    }
1005
    /// Replaces all events of `token_event_type` for `entity_id` with the
    /// single pre-merged `merged_event`, then rebuilds the index from
    /// scratch (offsets shift after the removal).
    ///
    /// Returns `Ok(false)` when the entity has no token events (no-op).
    ///
    /// NOTE(review): `entity_versions` is not adjusted for the removed
    /// events, and the WAL only receives the merged event (removals are not
    /// recorded) — confirm both are acceptable for OCC and recovery.
    pub fn compact_entity_tokens(
        &self,
        entity_id: &str,
        token_event_type: &str,
        merged_event: Event,
    ) -> Result<bool> {
        {
            let events = self.events.read();
            let has_tokens = events
                .iter()
                .any(|e| e.entity_id_str() == entity_id && e.event_type_str() == token_event_type);
            if !has_tokens {
                return Ok(false);
            }
        }

        // Run the merged event through projections before mutating the log.
        let projections = self.projections.read();
        projections.process_event(&merged_event)?;
        drop(projections);

        let mut events = self.events.write();

        events.retain(|e| {
            !(e.entity_id_str() == entity_id && e.event_type_str() == token_event_type)
        });

        events.push(merged_event.clone());

        if let Some(ref wal) = self.wal {
            wal.append(merged_event)?;
        }

        // Offsets shifted after retain(), so rebuild the whole index.
        self.index.clear();
        for (offset, event) in events.iter().enumerate() {
            if let Err(e) = self.index.index_event(
                event.id,
                event.entity_id_str(),
                event.event_type_str(),
                event.timestamp,
                offset,
            ) {
                tracing::warn!(
                    event_id = %event.id,
                    offset,
                    "Failed to re-index event during compaction: {e}"
                );
            }
        }

        Ok(true)
    }
1088
    /// Shared handle to the webhook subscription registry (server builds).
    #[cfg(feature = "server")]
    pub fn webhook_registry(&self) -> Arc<WebhookRegistry> {
        Arc::clone(&self.webhook_registry)
    }
1093
    /// Connects the webhook delivery channel; until this is called,
    /// `dispatch_webhooks` silently drops matching events.
    #[cfg(feature = "server")]
    pub fn set_webhook_tx(&self, tx: mpsc::UnboundedSender<WebhookDeliveryTask>) {
        *self.webhook_tx.write() = Some(tx);
        tracing::info!("Webhook delivery channel connected");
    }
1101
1102 #[cfg(feature = "server")]
1104 fn dispatch_webhooks(&self, event: &Event) {
1105 let matching = self.webhook_registry.find_matching(event);
1106 if matching.is_empty() {
1107 return;
1108 }
1109
1110 let tx_guard = self.webhook_tx.read();
1111 if let Some(ref tx) = *tx_guard {
1112 for webhook in matching {
1113 let task = WebhookDeliveryTask {
1114 webhook,
1115 event: event.clone(),
1116 };
1117 if let Err(e) = tx.send(task) {
1118 tracing::warn!("Failed to queue webhook delivery: {}", e);
1119 }
1120 }
1121 }
1122 }
1123
1124 pub fn flush_storage(&self) -> Result<()> {
1126 if let Some(ref storage) = self.storage {
1127 let storage = storage.read();
1128 storage.flush()?;
1129 tracing::info!("✅ Flushed events to persistent storage");
1130 }
1131 Ok(())
1132 }
1133
    /// Flushes buffered events to Parquet, then truncates the WAL.
    /// No-op when the WAL is disabled.
    ///
    /// NOTE(review): an event WAL-appended between the flush and the
    /// truncate would be dropped from the WAL without being flushed —
    /// confirm the caller serializes checkpoints with ingestion, or that
    /// the storage buffer ordering makes this impossible.
    pub fn checkpoint(&self) -> Result<()> {
        let Some(ref wal) = self.wal else {
            return Ok(());
        };

        self.flush_storage()?;
        wal.truncate()?;
        tracing::debug!("✅ Checkpoint complete: Parquet flushed, WAL truncated");
        Ok(())
    }
1168
1169 pub fn checkpoint_interval(&self) -> Option<std::time::Duration> {
1171 self.checkpoint_interval_secs
1172 .map(std::time::Duration::from_secs)
1173 }
1174
    /// Lazily hydrates `tenant_id`'s events from Parquet into memory.
    /// Idempotent: returns immediately when the tenant is already resident.
    ///
    /// # Errors
    /// Times out with `StorageError` when another thread's in-flight load of
    /// the same tenant holds the per-tenant lock past the configured
    /// timeout; also propagates storage read errors.
    pub fn ensure_tenant_loaded(&self, tenant_id: &str) -> Result<()> {
        // Fast path: already resident.
        if self.tenant_loader.is_loaded(tenant_id) {
            return Ok(());
        }

        // No persistent storage → nothing to hydrate; mark loaded so the
        // fast path short-circuits next time.
        let Some(storage) = self.storage.as_ref().map(Arc::clone) else {
            self.tenant_loader.mark_loaded(tenant_id);
            return Ok(());
        };

        // Per-tenant load lock (with timeout) so concurrent queries for the
        // same tenant don't hydrate twice — double-checked below.
        let lock = self.tenant_loader.lock_for(tenant_id);
        let timeout = self.tenant_loader.load_timeout();
        let _guard = lock.try_lock_for(timeout).ok_or_else(|| {
            AllSourceError::StorageError(format!(
                "ensure_tenant_loaded timed out after {timeout:?} waiting for in-flight load of \
                 tenant {tenant_id:?}"
            ))
        })?;

        // Second check: another thread may have finished the load while we
        // waited on the lock.
        if self.tenant_loader.is_loaded(tenant_id) {
            return Ok(());
        }

        let started = std::time::Instant::now();
        let events = storage.read().load_events_for_tenant(tenant_id)?;
        let read_count = events.len();

        let before = self.events.read().len();
        for event in events {
            self.append_loaded_event(event);
        }
        // `append_loaded_event` dedupes by id, so `applied` may be < read.
        let applied = self.events.read().len() - before;

        *self.total_ingested.write() += applied as u64;
        self.tenant_loader.mark_loaded(tenant_id);

        tracing::info!(
            tenant_id = tenant_id,
            read = read_count,
            applied = applied,
            elapsed_ms = started.elapsed().as_millis() as u64,
            "ensure_tenant_loaded: tenant hydrated"
        );

        // Evict LRU tenants (never the one just touched) if over budget.
        self.enforce_cache_budget(tenant_id);

        #[cfg(feature = "server")]
        self.metrics
            .cache_bytes
            .set(self.tenant_loader.total_bytes() as i64);

        Ok(())
    }
1270
1271 fn enforce_cache_budget(&self, recently_touched: &str) {
1282 if !self.tenant_loader.over_budget() {
1283 return;
1284 }
1285 loop {
1286 let Some(victim) = self.tenant_loader.pick_lru_excluding(recently_touched) else {
1287 tracing::warn!(
1288 cache_bytes = self.tenant_loader.total_bytes(),
1289 budget = self.tenant_loader.byte_budget(),
1290 recently_touched = recently_touched,
1291 "cache over budget but no other tenant available to evict — \
1292 a single tenant exceeds the budget; consider raising it"
1293 );
1294 return;
1295 };
1296 self.evict_tenant(&victim);
1297 if !self.tenant_loader.over_budget() {
1298 return;
1299 }
1300 }
1301 }
1302
    /// Whether `tenant_id`'s events are currently resident in memory.
    pub fn is_tenant_loaded(&self, tenant_id: &str) -> bool {
        self.tenant_loader.is_loaded(tenant_id)
    }
1308
    /// Drops all of `tenant_id`'s events from the in-memory cache and
    /// rebuilds the global index and per-entity version counters from the
    /// surviving events.
    ///
    /// NOTE(review): projection state is not rebuilt here — confirm whether
    /// projections are expected to retain the evicted tenant's
    /// contributions.
    pub fn evict_tenant(&self, tenant_id: &str) {
        let mut events = self.events.write();
        let before = events.len();
        let evicted_bytes = self.tenant_loader.bytes_for(tenant_id);

        events.retain(|e| e.tenant_id_str() != tenant_id);
        let after = events.len();
        let dropped = before - after;

        if dropped == 0 {
            // Nothing was resident; just clear the loaded flag.
            drop(events);
            self.tenant_loader.mark_unloaded(tenant_id);
            return;
        }

        // Offsets shifted after retain(), so the whole index and the
        // version counters must be rebuilt from the surviving events.
        self.index.clear();
        self.entity_versions.clear();
        for (offset, event) in events.iter().enumerate() {
            if let Err(e) = self.index.index_event(
                event.id,
                event.entity_id_str(),
                event.event_type_str(),
                event.timestamp,
                offset,
            ) {
                tracing::error!(
                    "Failed to re-index event during eviction of {}: {}",
                    tenant_id,
                    e
                );
            }
            *self
                .entity_versions
                .entry(event.entity_id_str().to_string())
                .or_insert(0) += 1;
        }
        drop(events);

        self.tenant_loader.mark_unloaded(tenant_id);

        // total_ingested tracks resident events, so subtract what was dropped.
        let mut t = self.total_ingested.write();
        *t = t.saturating_sub(dropped as u64);
        drop(t);

        #[cfg(feature = "server")]
        {
            self.metrics.cache_evictions_total.inc();
            self.metrics
                .cache_bytes
                .set(self.tenant_loader.total_bytes() as i64);
        }

        tracing::info!(
            tenant_id = tenant_id,
            events_dropped = dropped,
            bytes_freed = evicted_bytes,
            "evicted tenant from memory cache"
        );
    }
1405
    /// Estimated bytes currently resident in memory for `tenant_id`.
    pub fn tenant_resident_bytes(&self, tenant_id: &str) -> u64 {
        self.tenant_loader.bytes_for(tenant_id)
    }
1412
    /// Estimated total bytes resident in the tenant cache.
    pub fn cache_resident_bytes(&self) -> u64 {
        self.tenant_loader.total_bytes()
    }
1418
    /// Appends a single event hydrated from Parquet: dedupes by id,
    /// indexes, projects, bumps the entity version, and accounts its
    /// estimated size to the owning tenant for cache-budget tracking.
    fn append_loaded_event(&self, event: Event) {
        // Skip events already resident (e.g. replayed from the WAL at boot).
        if self.index.get_by_id(&event.id).is_some() {
            return;
        }

        let event_bytes = event.estimated_size_bytes();
        let tenant = event.tenant_id_str().to_string();

        // Hold the write lock across offset computation and push.
        let mut events = self.events.write();
        let offset = events.len();

        if let Err(e) = self.index.index_event(
            event.id,
            event.entity_id_str(),
            event.event_type_str(),
            event.timestamp,
            offset,
        ) {
            tracing::error!("Failed to index loaded event {}: {}", event.id, e);
        }

        if let Err(e) = self.projections.read().process_event(&event) {
            tracing::error!("Failed to project loaded event {}: {}", event.id, e);
        }

        *self
            .entity_versions
            .entry(event.entity_id_str().to_string())
            .or_insert(0) += 1;

        events.push(event);
        self.tenant_loader.add_bytes(&tenant, event_bytes);
    }
1476
1477 pub fn create_snapshot(&self, entity_id: &str) -> Result<()> {
1479 let events = self.query(&QueryEventsRequest {
1481 entity_id: Some(entity_id.to_string()),
1482 event_type: None,
1483 tenant_id: None,
1484 as_of: None,
1485 since: None,
1486 until: None,
1487 limit: None,
1488 event_type_prefix: None,
1489 payload_filter: None,
1490 })?;
1491
1492 if events.is_empty() {
1493 return Err(AllSourceError::EntityNotFound(entity_id.to_string()));
1494 }
1495
1496 let mut state = serde_json::json!({});
1498 for event in &events {
1499 if let serde_json::Value::Object(ref mut state_map) = state
1500 && let serde_json::Value::Object(ref payload_map) = event.payload
1501 {
1502 for (key, value) in payload_map {
1503 state_map.insert(key.clone(), value.clone());
1504 }
1505 }
1506 }
1507
1508 let last_event = events.last().unwrap();
1509 self.snapshot_manager.create_snapshot(
1510 entity_id,
1511 state,
1512 last_event.timestamp,
1513 events.len(),
1514 SnapshotType::Manual,
1515 )?;
1516
1517 Ok(())
1518 }
1519
1520 fn check_auto_snapshot(&self, entity_id: &str, event: &Event) {
1522 let entity_event_count = self
1524 .index
1525 .get_by_entity(entity_id)
1526 .map_or(0, |entries| entries.len());
1527
1528 if self.snapshot_manager.should_create_snapshot(
1529 entity_id,
1530 entity_event_count,
1531 event.timestamp,
1532 ) {
1533 if let Err(e) = self.create_snapshot(entity_id) {
1535 tracing::warn!(
1536 "Failed to create automatic snapshot for {}: {}",
1537 entity_id,
1538 e
1539 );
1540 }
1541 }
1542 }
1543
1544 fn validate_event(&self, event: &Event) -> Result<()> {
1546 if event.entity_id_str().is_empty() {
1549 return Err(AllSourceError::ValidationError(
1550 "entity_id cannot be empty".to_string(),
1551 ));
1552 }
1553
1554 if event.event_type_str().is_empty() {
1555 return Err(AllSourceError::ValidationError(
1556 "event_type cannot be empty".to_string(),
1557 ));
1558 }
1559
1560 if event.event_type().is_system() {
1563 return Err(AllSourceError::ValidationError(
1564 "Event types starting with '_system.' are reserved for internal use".to_string(),
1565 ));
1566 }
1567
1568 Ok(())
1569 }
1570
1571 pub fn reset_projection(&self, name: &str) -> Result<usize> {
1573 let projection_manager = self.projections.read();
1574 let projection = projection_manager.get_projection(name).ok_or_else(|| {
1575 AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1576 })?;
1577
1578 projection.clear();
1580
1581 let prefix = format!("{name}:");
1583 let keys_to_remove: Vec<String> = self
1584 .projection_state_cache
1585 .iter()
1586 .filter(|entry| entry.key().starts_with(&prefix))
1587 .map(|entry| entry.key().clone())
1588 .collect();
1589 for key in keys_to_remove {
1590 self.projection_state_cache.remove(&key);
1591 }
1592
1593 let events = self.events.read();
1595 let mut reprocessed = 0usize;
1596 for event in events.iter() {
1597 if projection.process(event).is_ok() {
1598 reprocessed += 1;
1599 }
1600 }
1601
1602 Ok(reprocessed)
1603 }
1604
1605 pub fn get_event_by_id(&self, event_id: &uuid::Uuid) -> Result<Option<Event>> {
1607 if let Some(offset) = self.index.get_by_id(event_id) {
1608 let events = self.events.read();
1609 Ok(events.get(offset).cloned())
1610 } else {
1611 Ok(None)
1612 }
1613 }
1614
    /// Queries events, serving from the in-memory set after lazily loading
    /// the requested tenant from persistent storage when needed.
    ///
    /// Index selection (most → least selective): entity id, exact event
    /// type, event-type prefix, full scan. Candidates then pass through
    /// `apply_filters`, are sorted by (timestamp, version), and truncated to
    /// `request.limit`.
    ///
    /// # Errors
    /// Propagates failures from tenant loading (e.g. unsafe tenant ids).
    #[cfg_attr(feature = "hotpath", hotpath::measure)]
    pub fn query(&self, request: &QueryEventsRequest) -> Result<Vec<Event>> {
        if let Some(ref tenant_id) = request.tenant_id {
            // Lazy-load the tenant's events and refresh its LRU recency.
            self.ensure_tenant_loaded(tenant_id)?;
            self.tenant_loader.touch(tenant_id);
        }

        // Label used for per-query-shape metrics below.
        let query_type = if request.entity_id.is_some() {
            "entity"
        } else if request.event_type.is_some() {
            "type"
        } else if request.event_type_prefix.is_some() {
            "type_prefix"
        } else {
            "full_scan"
        };

        #[cfg(feature = "server")]
        let timer = self
            .metrics
            .query_duration_seconds
            .with_label_values(&[query_type])
            .start_timer();

        #[cfg(feature = "server")]
        self.metrics
            .queries_total
            .with_label_values(&[query_type])
            .inc();

        let events = self.events.read();

        // Resolve candidate offsets via the narrowest applicable index;
        // without any index-able field, every resident offset is a candidate.
        let offsets: Vec<usize> = if let Some(entity_id) = &request.entity_id {
            self.index
                .get_by_entity(entity_id)
                .map(|entries| self.filter_entries(entries, request))
                .unwrap_or_default()
        } else if let Some(event_type) = &request.event_type {
            self.index
                .get_by_type(event_type)
                .map(|entries| self.filter_entries(entries, request))
                .unwrap_or_default()
        } else if let Some(prefix) = &request.event_type_prefix {
            let entries = self.index.get_by_type_prefix(prefix);
            self.filter_entries(entries, request)
        } else {
            (0..events.len()).collect()
        };

        let mut results: Vec<Event> = offsets
            .iter()
            .filter_map(|&offset| events.get(offset).cloned())
            .filter(|event| self.apply_filters(event, request))
            .collect();

        // Deterministic order: timestamp first, version as tie-breaker.
        results.sort_by(|a, b| {
            a.timestamp
                .cmp(&b.timestamp)
                .then_with(|| a.version.cmp(&b.version))
        });

        if let Some(limit) = request.limit {
            results.truncate(limit);
        }

        #[cfg(feature = "server")]
        {
            self.metrics
                .query_results_total
                .with_label_values(&[query_type])
                .inc_by(results.len() as u64);
            timer.observe_duration();
        }

        Ok(results)
    }
1728
1729 #[cfg_attr(feature = "hotpath", hotpath::measure)]
1731 fn filter_entries(&self, entries: Vec<IndexEntry>, request: &QueryEventsRequest) -> Vec<usize> {
1732 entries
1733 .into_iter()
1734 .filter(|entry| {
1735 if let Some(as_of) = request.as_of
1737 && entry.timestamp > as_of
1738 {
1739 return false;
1740 }
1741 if let Some(since) = request.since
1742 && entry.timestamp < since
1743 {
1744 return false;
1745 }
1746 if let Some(until) = request.until
1747 && entry.timestamp > until
1748 {
1749 return false;
1750 }
1751 true
1752 })
1753 .map(|entry| entry.offset)
1754 .collect()
1755 }
1756
1757 #[cfg_attr(feature = "hotpath", hotpath::measure)]
1759 fn apply_filters(&self, event: &Event, request: &QueryEventsRequest) -> bool {
1760 if let Some(ref tid) = request.tenant_id
1762 && event.tenant_id_str() != tid
1763 {
1764 return false;
1765 }
1766
1767 if request.entity_id.is_some()
1769 && let Some(ref event_type) = request.event_type
1770 && event.event_type_str() != event_type
1771 {
1772 return false;
1773 }
1774
1775 if request.entity_id.is_some()
1777 && let Some(ref prefix) = request.event_type_prefix
1778 && !event.event_type_str().starts_with(prefix)
1779 {
1780 return false;
1781 }
1782
1783 if let Some(ref filter_str) = request.payload_filter
1785 && let Ok(filter_obj) =
1786 serde_json::from_str::<serde_json::Map<String, serde_json::Value>>(filter_str)
1787 {
1788 let payload = event.payload();
1789 for (key, expected_value) in &filter_obj {
1790 match payload.get(key) {
1791 Some(actual_value) if actual_value == expected_value => {}
1792 _ => return false,
1793 }
1794 }
1795 }
1796
1797 true
1798 }
1799
1800 #[cfg_attr(feature = "hotpath", hotpath::measure)]
1803 pub fn reconstruct_state(
1804 &self,
1805 entity_id: &str,
1806 as_of: Option<DateTime<Utc>>,
1807 ) -> Result<serde_json::Value> {
1808 let (merged_state, since_timestamp) = if let Some(as_of_time) = as_of {
1810 if let Some(snapshot) = self
1812 .snapshot_manager
1813 .get_snapshot_as_of(entity_id, as_of_time)
1814 {
1815 tracing::debug!(
1816 "Using snapshot from {} for entity {} (saved {} events)",
1817 snapshot.as_of,
1818 entity_id,
1819 snapshot.event_count
1820 );
1821 (snapshot.state.clone(), Some(snapshot.as_of))
1822 } else {
1823 (serde_json::json!({}), None)
1824 }
1825 } else {
1826 if let Some(snapshot) = self.snapshot_manager.get_latest_snapshot(entity_id) {
1828 tracing::debug!(
1829 "Using latest snapshot from {} for entity {}",
1830 snapshot.as_of,
1831 entity_id
1832 );
1833 (snapshot.state.clone(), Some(snapshot.as_of))
1834 } else {
1835 (serde_json::json!({}), None)
1836 }
1837 };
1838
1839 let events = self.query(&QueryEventsRequest {
1841 entity_id: Some(entity_id.to_string()),
1842 event_type: None,
1843 tenant_id: None,
1844 as_of,
1845 since: since_timestamp,
1846 until: None,
1847 limit: None,
1848 event_type_prefix: None,
1849 payload_filter: None,
1850 })?;
1851
1852 if events.is_empty() && since_timestamp.is_none() {
1854 return Err(AllSourceError::EntityNotFound(entity_id.to_string()));
1855 }
1856
1857 let mut merged_state = merged_state;
1859 for event in &events {
1860 if let serde_json::Value::Object(ref mut state_map) = merged_state
1861 && let serde_json::Value::Object(ref payload_map) = event.payload
1862 {
1863 for (key, value) in payload_map {
1864 state_map.insert(key.clone(), value.clone());
1865 }
1866 }
1867 }
1868
1869 let state = serde_json::json!({
1871 "entity_id": entity_id,
1872 "last_updated": events.last().map(|e| e.timestamp),
1873 "event_count": events.len(),
1874 "as_of": as_of,
1875 "current_state": merged_state,
1876 "history": events.iter().map(|e| {
1877 serde_json::json!({
1878 "event_id": e.id,
1879 "type": e.event_type,
1880 "timestamp": e.timestamp,
1881 "payload": e.payload
1882 })
1883 }).collect::<Vec<_>>()
1884 });
1885
1886 Ok(state)
1887 }
1888
1889 pub fn get_snapshot(&self, entity_id: &str) -> Result<serde_json::Value> {
1891 let projections = self.projections.read();
1892
1893 if let Some(snapshot_projection) = projections.get_projection("entity_snapshots")
1894 && let Some(state) = snapshot_projection.get_state(entity_id)
1895 {
1896 return Ok(serde_json::json!({
1897 "entity_id": entity_id,
1898 "snapshot": state,
1899 "from_projection": "entity_snapshots"
1900 }));
1901 }
1902
1903 Err(AllSourceError::EntityNotFound(entity_id.to_string()))
1904 }
1905
1906 pub fn stats(&self) -> StoreStats {
1908 let events = self.events.read();
1909 let index_stats = self.index.stats();
1910
1911 StoreStats {
1912 total_events: events.len(),
1913 total_entities: index_stats.total_entities,
1914 total_event_types: index_stats.total_event_types,
1915 total_ingested: *self.total_ingested.read(),
1916 }
1917 }
1918
1919 pub fn list_streams(&self) -> Vec<StreamInfo> {
1921 self.index
1922 .get_all_entities()
1923 .into_iter()
1924 .map(|entity_id| {
1925 let event_count = self
1926 .index
1927 .get_by_entity(&entity_id)
1928 .map_or(0, |entries| entries.len());
1929 let last_event_at = self
1930 .index
1931 .get_by_entity(&entity_id)
1932 .and_then(|entries| entries.last().map(|e| e.timestamp));
1933 StreamInfo {
1934 stream_id: entity_id,
1935 event_count,
1936 last_event_at,
1937 }
1938 })
1939 .collect()
1940 }
1941
1942 pub fn list_event_types(&self) -> Vec<EventTypeInfo> {
1944 self.index
1945 .get_all_types()
1946 .into_iter()
1947 .map(|event_type| {
1948 let event_count = self
1949 .index
1950 .get_by_type(&event_type)
1951 .map_or(0, |entries| entries.len());
1952 let last_event_at = self
1953 .index
1954 .get_by_type(&event_type)
1955 .and_then(|entries| entries.last().map(|e| e.timestamp));
1956 EventTypeInfo {
1957 event_type,
1958 event_count,
1959 last_event_at,
1960 }
1961 })
1962 .collect()
1963 }
1964
1965 pub fn enable_wal_replication(
1972 &self,
1973 tx: tokio::sync::broadcast::Sender<crate::infrastructure::persistence::wal::WALEntry>,
1974 ) {
1975 if let Some(ref wal_arc) = self.wal {
1976 wal_arc.set_replication_tx(tx);
1977 tracing::info!("WAL replication broadcast enabled");
1978 } else {
1979 tracing::warn!("Cannot enable WAL replication: WAL is not configured");
1980 }
1981 }
1982
    /// Read-only access to the write-ahead log, when one is configured.
    pub fn wal(&self) -> Option<&Arc<WriteAheadLog>> {
        self.wal.as_ref()
    }
1988
    /// Read-only access to the Parquet storage backend, when configured.
    pub fn parquet_storage(&self) -> Option<&Arc<RwLock<ParquetStorage>>> {
        self.storage.as_ref()
    }
1994}
1995
/// Construction-time options for [`EventStore`]; `Default` yields a purely
/// in-memory store with no persistence, WAL, or cache budget.
#[derive(Debug, Clone, Default)]
pub struct EventStoreConfig {
    /// Root directory for Parquet persistence; `None` keeps events
    /// in-memory only.
    pub storage_dir: Option<PathBuf>,

    /// Policy governing automatic per-entity snapshots.
    pub snapshot_config: SnapshotConfig,

    /// Directory for write-ahead-log files; `None` disables the WAL.
    pub wal_dir: Option<PathBuf>,

    /// WAL tuning; only consulted when `wal_dir` is set.
    pub wal_config: WALConfig,

    /// Background compaction / retention policy.
    pub compaction_config: CompactionConfig,

    /// Configuration for the event schema registry.
    pub schema_registry_config: SchemaRegistryConfig,

    /// Directory for internal system data; when `None`, derived from
    /// `storage_dir` as `<storage_dir>/__system` (see
    /// `effective_system_data_dir`).
    pub system_data_dir: Option<PathBuf>,

    /// Tenant id to make available at startup — presumably pre-loaded
    /// eagerly; confirm against the constructor that consumes it.
    pub bootstrap_tenant: Option<String>,

    /// Soft cap on resident tenant bytes; exceeding it triggers LRU tenant
    /// eviction. `None` disables budget-based eviction.
    pub cache_byte_budget: Option<u64>,

    /// Seconds between periodic WAL checkpoints; `None` disables periodic
    /// checkpointing (only set when the WAL is enabled — see
    /// `from_env_vars`).
    pub checkpoint_interval_secs: Option<u64>,
}
2047
2048impl EventStoreConfig {
2049 pub fn with_persistence(storage_dir: impl Into<PathBuf>) -> Self {
2051 Self {
2052 storage_dir: Some(storage_dir.into()),
2053 ..Self::default()
2054 }
2055 }
2056
2057 pub fn with_snapshots(snapshot_config: SnapshotConfig) -> Self {
2059 Self {
2060 snapshot_config,
2061 ..Self::default()
2062 }
2063 }
2064
2065 pub fn with_wal(wal_dir: impl Into<PathBuf>, wal_config: WALConfig) -> Self {
2067 Self {
2068 wal_dir: Some(wal_dir.into()),
2069 wal_config,
2070 ..Self::default()
2071 }
2072 }
2073
2074 pub fn with_all(storage_dir: impl Into<PathBuf>, snapshot_config: SnapshotConfig) -> Self {
2076 Self {
2077 storage_dir: Some(storage_dir.into()),
2078 snapshot_config,
2079 ..Self::default()
2080 }
2081 }
2082
2083 pub fn production(
2085 storage_dir: impl Into<PathBuf>,
2086 wal_dir: impl Into<PathBuf>,
2087 snapshot_config: SnapshotConfig,
2088 wal_config: WALConfig,
2089 compaction_config: CompactionConfig,
2090 ) -> Self {
2091 let storage_dir = storage_dir.into();
2092 let system_data_dir = storage_dir.join("__system");
2093 Self {
2094 storage_dir: Some(storage_dir),
2095 snapshot_config,
2096 wal_dir: Some(wal_dir.into()),
2097 wal_config,
2098 compaction_config,
2099 system_data_dir: Some(system_data_dir),
2100 ..Self::default()
2101 }
2102 }
2103
2104 pub fn effective_system_data_dir(&self) -> Option<PathBuf> {
2109 self.system_data_dir
2110 .clone()
2111 .or_else(|| self.storage_dir.as_ref().map(|d| d.join("__system")))
2112 }
2113
2114 pub fn from_env() -> (Self, &'static str) {
2122 Self::from_env_vars(
2123 std::env::var("ALLSOURCE_DATA_DIR")
2124 .ok()
2125 .filter(|s| !s.is_empty()),
2126 std::env::var("ALLSOURCE_STORAGE_DIR")
2127 .ok()
2128 .filter(|s| !s.is_empty()),
2129 std::env::var("ALLSOURCE_WAL_DIR")
2130 .ok()
2131 .filter(|s| !s.is_empty()),
2132 std::env::var("ALLSOURCE_WAL_ENABLED").ok(),
2133 std::env::var("ALLSOURCE_CACHE_BYTES").ok(),
2134 std::env::var("ALLSOURCE_SNAPSHOT_INTERVAL_SECONDS").ok(),
2135 std::env::var("ALLSOURCE_RETENTION_SYSTEM_DAYS").ok(),
2136 std::env::var("ALLSOURCE_CHECKPOINT_INTERVAL_SECONDS").ok(),
2137 )
2138 }
2139
    /// Pure core of [`Self::from_env`]: derives a config plus a
    /// human-readable persistence-mode label from already-read environment
    /// variable values (injected for testability).
    ///
    /// Precedence: explicit storage/WAL dirs win; otherwise both are derived
    /// from `data_dir` as `<data>/storage` and `<data>/wal`. The WAL is
    /// enabled unless `wal_enabled_var` is set to something other than
    /// `"true"`. Unparsable numeric values warn and fall back (cache budget:
    /// disabled; checkpoint interval: 60s).
    pub fn from_env_vars(
        data_dir: Option<String>,
        explicit_storage_dir: Option<String>,
        explicit_wal_dir: Option<String>,
        wal_enabled_var: Option<String>,
        cache_bytes_var: Option<String>,
        snapshot_interval_var: Option<String>,
        retention_system_days_var: Option<String>,
        checkpoint_interval_var: Option<String>,
    ) -> (Self, &'static str) {
        let data_dir = data_dir.filter(|s| !s.is_empty());
        let storage_dir = explicit_storage_dir
            .filter(|s| !s.is_empty())
            .or_else(|| data_dir.as_ref().map(|d| format!("{d}/storage")));
        let wal_dir = explicit_wal_dir
            .filter(|s| !s.is_empty())
            .or_else(|| data_dir.as_ref().map(|d| format!("{d}/wal")));
        // Absent means enabled; any value other than "true" disables.
        let wal_enabled = wal_enabled_var.is_none_or(|v| v == "true");
        // A budget that fails to parse disables budget-based eviction rather
        // than failing startup.
        let cache_byte_budget =
            cache_bytes_var
                .filter(|s| !s.is_empty())
                .and_then(|s| match s.parse::<u64>() {
                    Ok(v) => Some(v),
                    Err(e) => {
                        tracing::warn!(
                            "ALLSOURCE_CACHE_BYTES={s:?} could not be parsed as u64: {e}; \
                             cache budget disabled"
                        );
                        None
                    }
                });
        let compaction_config =
            CompactionConfig::from_env_vars(snapshot_interval_var, retention_system_days_var);

        // Periodic checkpointing only makes sense with a WAL; default to 60s
        // when the WAL is enabled, also falling back to 60s on parse errors.
        let checkpoint_interval_secs = if wal_enabled {
            checkpoint_interval_var
                .filter(|s| !s.is_empty())
                .map(|s| match s.parse::<u64>() {
                    Ok(v) => v,
                    Err(e) => {
                        tracing::warn!(
                            "ALLSOURCE_CHECKPOINT_INTERVAL_SECONDS={s:?} could not be parsed as \
                             u64: {e}; falling back to default 60s"
                        );
                        60
                    }
                })
                .or(Some(60))
        } else {
            None
        };

        let mut config = match (&storage_dir, &wal_dir) {
            (Some(sd), Some(wd)) if wal_enabled => Self::production(
                sd,
                wd,
                SnapshotConfig::default(),
                WALConfig::default(),
                compaction_config,
            ),
            (Some(sd), _) => Self::with_persistence(sd),
            (_, Some(wd)) if wal_enabled => Self::with_wal(wd, WALConfig::default()),
            _ => Self::default(),
        };
        config.cache_byte_budget = cache_byte_budget;
        config.checkpoint_interval_secs = checkpoint_interval_secs;

        let mode = match (&storage_dir, &wal_dir) {
            (Some(_), Some(_)) if wal_enabled => "wal+parquet",
            (Some(_), _) => "parquet-only",
            (_, Some(_)) if wal_enabled => "wal-only",
            _ => "in-memory",
        };
        (config, mode)
    }
2224}
2225
/// Aggregate counters returned by `EventStore::stats`.
#[derive(Debug, serde::Serialize)]
pub struct StoreStats {
    // Events currently resident in memory.
    pub total_events: usize,
    // Distinct entity ids known to the index.
    pub total_entities: usize,
    // Distinct event types known to the index.
    pub total_event_types: usize,
    // Running ingest count, tracked separately from the resident set.
    pub total_ingested: u64,
}
2233
/// Per-entity stream summary returned by `EventStore::list_streams`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct StreamInfo {
    // The entity id identifying the stream.
    pub stream_id: String,
    // Number of indexed events in the stream.
    pub event_count: usize,
    // Timestamp of the stream's most recent indexed event, if any.
    pub last_event_at: Option<chrono::DateTime<chrono::Utc>>,
}
2244
/// Per-event-type summary returned by `EventStore::list_event_types`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct EventTypeInfo {
    // The event type name.
    pub event_type: String,
    // Number of indexed events of this type.
    pub event_count: usize,
    // Timestamp of the most recent indexed event of this type, if any.
    pub last_event_at: Option<chrono::DateTime<chrono::Utc>>,
}
2255
impl Default for EventStore {
    /// Equivalent to `EventStore::new`: an in-memory store with no
    /// persistence configured.
    fn default() -> Self {
        Self::new()
    }
}
2261
2262#[cfg(test)]
2263mod tests {
2264 use super::*;
2265 use crate::domain::entities::Event;
2266 use tempfile::TempDir;
2267
2268 fn find_parquet_files(dir: &std::path::Path) -> Vec<std::path::PathBuf> {
2273 let mut out = Vec::new();
2274 let mut stack = vec![dir.to_path_buf()];
2275 while let Some(d) = stack.pop() {
2276 let Ok(entries) = std::fs::read_dir(&d) else {
2277 continue;
2278 };
2279 for e in entries.flatten() {
2280 let p = e.path();
2281 if p.is_dir() {
2282 stack.push(p);
2283 } else if p.extension().and_then(|s| s.to_str()) == Some("parquet") {
2284 out.push(p);
2285 }
2286 }
2287 }
2288 out
2289 }
2290
    /// Builds a minimal event in the "default" tenant with a fixed payload.
    fn create_test_event(entity_id: &str, event_type: &str) -> Event {
        Event::from_strings(
            event_type.to_string(),
            entity_id.to_string(),
            "default".to_string(),
            serde_json::json!({"name": "Test", "value": 42}),
            None,
        )
        .unwrap()
    }
2301
    /// Like `create_test_event`, but with a caller-supplied payload.
    fn create_test_event_with_payload(
        entity_id: &str,
        event_type: &str,
        payload: serde_json::Value,
    ) -> Event {
        Event::from_strings(
            event_type.to_string(),
            entity_id.to_string(),
            "default".to_string(),
            payload,
            None,
        )
        .unwrap()
    }
2316
    #[test]
    // A fresh store starts completely empty.
    fn test_event_store_new() {
        let store = EventStore::new();
        assert_eq!(store.stats().total_events, 0);
        assert_eq!(store.stats().total_entities, 0);
    }
2323
    #[test]
    // Without persistence there is nothing to load, but the tenant must
    // still be marked warm so later calls short-circuit; other tenants stay
    // cold.
    fn test_ensure_tenant_loaded_no_storage_is_a_noop() {
        let store = EventStore::new();
        assert!(!store.is_tenant_loaded("alice"));
        store.ensure_tenant_loaded("alice").unwrap();
        assert!(store.is_tenant_loaded("alice"));
        assert!(!store.is_tenant_loaded("bob"));
    }
2342
    #[test]
    // A second ensure call for an already-warm tenant must succeed quietly.
    fn test_ensure_tenant_loaded_warm_path_is_idempotent() {
        let store = EventStore::new();
        store.ensure_tenant_loaded("alice").unwrap();
        store.ensure_tenant_loaded("alice").unwrap();
    }
2350
    #[test]
    // Path-traversal-style tenant ids must be rejected, and a rejected
    // tenant must never be marked warm.
    fn test_ensure_tenant_loaded_rejects_unsafe_tenant_id() {
        let temp_dir = TempDir::new().unwrap();
        let store = EventStore::with_config(EventStoreConfig::with_persistence(temp_dir.path()));
        for unsafe_tid in ["..", "a/b", "a\\b", ""] {
            let result = store.ensure_tenant_loaded(unsafe_tid);
            assert!(
                result.is_err(),
                "tenant_id {unsafe_tid:?} should have been rejected"
            );
            assert!(
                !store.is_tenant_loaded(unsafe_tid),
                "rejected tenant {unsafe_tid:?} must not be marked loaded"
            );
        }
    }
2372
    #[test]
    // A tenant with no on-disk subtree is still marked loaded (with zero
    // events) so repeated lookups don't re-hit the filesystem.
    fn test_ensure_tenant_loaded_no_subtree_marks_loaded_with_zero_events() {
        let temp_dir = TempDir::new().unwrap();
        let store = EventStore::with_config(EventStoreConfig::with_persistence(temp_dir.path()));
        assert!(!store.is_tenant_loaded("never-existed"));
        store.ensure_tenant_loaded("never-existed").unwrap();
        assert!(store.is_tenant_loaded("never-existed"));
    }
2385
    #[test]
    // Evicting one tenant must drop exactly its events and byte accounting,
    // leaving the other tenant's residency untouched.
    fn test_evict_tenant_drops_events_and_resets_bytes() {
        let temp_dir = TempDir::new().unwrap();
        let storage_dir = temp_dir.path().to_path_buf();

        // Seed disk with 3 alice events and 2 bob events, then drop the
        // store so the data lives only in Parquet.
        {
            let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
            for i in 0..3 {
                store
                    .ingest(
                        &Event::from_strings(
                            "test.event".to_string(),
                            format!("a-{i}"),
                            "alice".to_string(),
                            serde_json::json!({"i": i}),
                            None,
                        )
                        .unwrap(),
                    )
                    .unwrap();
            }
            for i in 0..2 {
                store
                    .ingest(
                        &Event::from_strings(
                            "test.event".to_string(),
                            format!("b-{i}"),
                            "bob".to_string(),
                            serde_json::json!({"i": i}),
                            None,
                        )
                        .unwrap(),
                    )
                    .unwrap();
            }
            store.flush_storage().unwrap();
        }

        let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
        store.ensure_tenant_loaded("alice").unwrap();
        store.ensure_tenant_loaded("bob").unwrap();
        assert_eq!(store.stats().total_events, 5);
        let alice_bytes = store.tenant_resident_bytes("alice");
        let bob_bytes = store.tenant_resident_bytes("bob");
        assert!(alice_bytes > 0 && bob_bytes > 0);

        store.evict_tenant("alice");

        assert!(!store.is_tenant_loaded("alice"));
        assert!(store.is_tenant_loaded("bob"));
        assert_eq!(store.tenant_resident_bytes("alice"), 0);
        assert_eq!(store.tenant_resident_bytes("bob"), bob_bytes);
        assert_eq!(store.stats().total_events, 2, "only bob's 2 events remain");
    }
2443
    #[test]
    // After an eviction, a tenant-scoped query must transparently re-load
    // the tenant's events from Parquet.
    fn test_evict_tenant_then_query_re_loads_from_disk() {
        let temp_dir = TempDir::new().unwrap();
        let storage_dir = temp_dir.path().to_path_buf();

        {
            let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
            for i in 0..4 {
                store
                    .ingest(
                        &Event::from_strings(
                            "test.event".to_string(),
                            format!("a-{i}"),
                            "alice".to_string(),
                            serde_json::json!({"i": i}),
                            None,
                        )
                        .unwrap(),
                    )
                    .unwrap();
            }
            store.flush_storage().unwrap();
        }

        let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
        store.ensure_tenant_loaded("alice").unwrap();
        store.evict_tenant("alice");
        assert_eq!(store.stats().total_events, 0);

        let results = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: Some("alice".to_string()),
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();
        assert_eq!(results.len(), 4);
        assert!(store.is_tenant_loaded("alice"));
    }
2493
    #[test]
    // Eviction compacts the event vector, so the index must be rebuilt with
    // the survivors' new offsets — stale offsets would resolve to the wrong
    // events.
    fn test_evict_tenant_rebuilds_index_with_new_offsets() {
        let temp_dir = TempDir::new().unwrap();
        let store = EventStore::with_config(EventStoreConfig::with_persistence(temp_dir.path()));

        // Interleave alice's and bob's events so bob's offsets shift when
        // alice is evicted.
        for i in 0..3 {
            store
                .ingest(
                    &Event::from_strings(
                        "test.event".to_string(),
                        format!("a-{i}"),
                        "alice".to_string(),
                        serde_json::json!({"i": i}),
                        None,
                    )
                    .unwrap(),
                )
                .unwrap();
            if i < 2 {
                store
                    .ingest(
                        &Event::from_strings(
                            "test.event".to_string(),
                            format!("b-{i}"),
                            "bob".to_string(),
                            serde_json::json!({"i": i}),
                            None,
                        )
                        .unwrap(),
                    )
                    .unwrap();
            }
        }
        store.tenant_loader.mark_loaded("alice");
        store.tenant_loader.mark_loaded("bob");

        store.evict_tenant("alice");

        let bob_results = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: Some("bob".to_string()),
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();
        assert_eq!(bob_results.len(), 2);
        for e in &bob_results {
            assert_eq!(e.tenant_id_str(), "bob");
        }
    }
2560
    #[test]
    // With a byte budget configured, loading new tenants should evict older
    // ones rather than let the resident set grow without bound.
    fn test_budget_eviction_keeps_resident_set_bounded() {
        let temp_dir = TempDir::new().unwrap();
        let storage_dir = temp_dir.path().to_path_buf();

        let big_payload = serde_json::json!({"data": "x".repeat(1000)});
        {
            let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
            for tenant in ["alice", "bob", "carol"] {
                for i in 0..5 {
                    store
                        .ingest(
                            &Event::from_strings(
                                "test.event".to_string(),
                                format!("{tenant}-{i}"),
                                tenant.to_string(),
                                big_payload.clone(),
                                None,
                            )
                            .unwrap(),
                        )
                        .unwrap();
                }
            }
            store.flush_storage().unwrap();
        }

        // Budget is too small for all three tenants at once.
        let mut config = EventStoreConfig::with_persistence(&storage_dir);
        config.cache_byte_budget = Some(12_000);
        let store = EventStore::with_config(config);

        store.ensure_tenant_loaded("alice").unwrap();
        assert!(store.is_tenant_loaded("alice"));

        // Sleeps separate the LRU timestamps so eviction order is
        // deterministic.
        store.tenant_loader.touch("alice");
        std::thread::sleep(std::time::Duration::from_millis(10));
        store.ensure_tenant_loaded("bob").unwrap();
        assert!(store.is_tenant_loaded("bob"));

        store.tenant_loader.touch("bob");
        std::thread::sleep(std::time::Duration::from_millis(10));
        store.ensure_tenant_loaded("carol").unwrap();
        assert!(store.is_tenant_loaded("carol"));

        let resident = store.cache_resident_bytes();
        let budget = 12_000u64;

        if resident > budget {
            let loaded_count = ["alice", "bob", "carol"]
                .iter()
                .filter(|t| store.is_tenant_loaded(t))
                .count();
            assert_eq!(
                loaded_count, 1,
                "over budget but more than one tenant loaded — eviction policy didn't fire"
            );
        }

        assert!(store.is_tenant_loaded("carol"));
    }
2644
    #[test]
    // End-to-end LRU behavior through the public query path: bob's load
    // evicts alice, and a later alice query transparently re-loads her.
    fn test_query_after_eviction_re_loads_transparently() {
        let temp_dir = TempDir::new().unwrap();
        let storage_dir = temp_dir.path().to_path_buf();

        let big_payload = serde_json::json!({"data": "x".repeat(2000)});
        {
            let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
            for tenant in ["alice", "bob"] {
                for i in 0..3 {
                    store
                        .ingest(
                            &Event::from_strings(
                                "test.event".to_string(),
                                format!("{tenant}-{i}"),
                                tenant.to_string(),
                                big_payload.clone(),
                                None,
                            )
                            .unwrap(),
                        )
                        .unwrap();
                }
            }
            store.flush_storage().unwrap();
        }

        // Budget too small for both tenants at once.
        let mut config = EventStoreConfig::with_persistence(&storage_dir);
        config.cache_byte_budget = Some(5_000);
        let store = EventStore::with_config(config);

        let alice_first = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: Some("alice".to_string()),
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();
        assert_eq!(alice_first.len(), 3);

        std::thread::sleep(std::time::Duration::from_millis(15));
        let _bob = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: Some("bob".to_string()),
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();
        assert!(
            !store.is_tenant_loaded("alice"),
            "alice should have been evicted"
        );

        let alice_second = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: Some("alice".to_string()),
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();
        assert_eq!(
            alice_second.len(),
            3,
            "alice's events come back via re-load"
        );
        assert!(store.is_tenant_loaded("alice"));
    }
2739
2740 #[test]
2741 #[cfg(feature = "server")]
2742 fn test_cache_metrics_track_evictions_and_bytes() {
2743 let temp_dir = TempDir::new().unwrap();
2747 let storage_dir = temp_dir.path().to_path_buf();
2748
2749 let big_payload = serde_json::json!({"data": "x".repeat(2000)});
2750 {
2751 let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
2752 for tenant in ["alice", "bob"] {
2753 for i in 0..3 {
2754 store
2755 .ingest(
2756 &Event::from_strings(
2757 "test.event".to_string(),
2758 format!("{tenant}-{i}"),
2759 tenant.to_string(),
2760 big_payload.clone(),
2761 None,
2762 )
2763 .unwrap(),
2764 )
2765 .unwrap();
2766 }
2767 }
2768 store.flush_storage().unwrap();
2769 }
2770
2771 let mut config = EventStoreConfig::with_persistence(&storage_dir);
2772 config.cache_byte_budget = Some(5_000); let store = EventStore::with_config(config);
2774
2775 assert_eq!(store.metrics.cache_evictions_total.get(), 0);
2776 assert_eq!(store.metrics.cache_bytes.get(), 0);
2777
2778 store.ensure_tenant_loaded("alice").unwrap();
2779 let after_alice = store.metrics.cache_bytes.get();
2781 assert!(after_alice > 0, "gauge should reflect alice's bytes");
2782 assert_eq!(store.metrics.cache_evictions_total.get(), 0);
2784
2785 std::thread::sleep(std::time::Duration::from_millis(10));
2786 store.ensure_tenant_loaded("bob").unwrap();
2787
2788 assert_eq!(
2791 store.metrics.cache_evictions_total.get(),
2792 1,
2793 "exactly one tenant evicted after bob's load"
2794 );
2795 let after_bob = store.metrics.cache_bytes.get();
2797 assert!(after_bob > 0);
2798 assert!(after_bob <= after_alice, "gauge dropped after eviction");
2799 }
2800
2801 #[test]
2802 fn test_stress_resident_set_stays_near_budget_under_rolling_queries() {
2803 let temp_dir = TempDir::new().unwrap();
2810 let storage_dir = temp_dir.path().to_path_buf();
2811
2812 const TENANT_COUNT: usize = 10;
2813 const EVENTS_PER_TENANT: usize = 50;
2814 let big_payload = serde_json::json!({"data": "x".repeat(10_000)});
2816
2817 {
2820 let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
2821 for t in 0..TENANT_COUNT {
2822 let tenant = format!("tenant-{t}");
2823 for i in 0..EVENTS_PER_TENANT {
2824 store
2825 .ingest(
2826 &Event::from_strings(
2827 "test.event".to_string(),
2828 format!("{tenant}-{i}"),
2829 tenant.clone(),
2830 big_payload.clone(),
2831 None,
2832 )
2833 .unwrap(),
2834 )
2835 .unwrap();
2836 }
2837 }
2838 store.flush_storage().unwrap();
2839 }
2840
2841 const BUDGET: u64 = 1_048_576;
2845 let mut config = EventStoreConfig::with_persistence(&storage_dir);
2846 config.cache_byte_budget = Some(BUDGET);
2847 let store = EventStore::with_config(config);
2848
2849 let mut peak_resident: u64 = 0;
2853 for t in 0..TENANT_COUNT {
2854 let tenant = format!("tenant-{t}");
2855 let results = store
2856 .query(&QueryEventsRequest {
2857 entity_id: None,
2858 event_type: None,
2859 tenant_id: Some(tenant.clone()),
2860 as_of: None,
2861 since: None,
2862 until: None,
2863 limit: None,
2864 event_type_prefix: None,
2865 payload_filter: None,
2866 })
2867 .unwrap();
2868 assert_eq!(
2869 results.len(),
2870 EVENTS_PER_TENANT,
2871 "every per-tenant query must return all of that tenant's events"
2872 );
2873 let resident = store.cache_resident_bytes();
2875 if resident > peak_resident {
2876 peak_resident = resident;
2877 }
2878 }
2879
2880 let final_resident = store.cache_resident_bytes();
2881
2882 let tolerance = BUDGET; assert!(
2888 peak_resident <= BUDGET + tolerance,
2889 "peak resident {peak_resident} exceeds budget {BUDGET} by more than {tolerance} \
2890 — eviction policy not keeping up with the working-set churn"
2891 );
2892 assert!(
2893 final_resident <= BUDGET + tolerance,
2894 "final resident {final_resident} exceeds budget {BUDGET} by more than {tolerance}"
2895 );
2896
2897 let last_tenant = format!("tenant-{}", TENANT_COUNT - 1);
2900 assert!(
2901 store.is_tenant_loaded(&last_tenant),
2902 "the most-recent tenant must remain loaded after the sweep"
2903 );
2904
2905 let still_loaded = (0..TENANT_COUNT)
2908 .filter(|t| store.is_tenant_loaded(&format!("tenant-{t}")))
2909 .count();
2910 assert!(
2911 still_loaded < TENANT_COUNT,
2912 "no tenants evicted ({still_loaded}/{TENANT_COUNT} still loaded) — \
2913 budget enforcement didn't engage"
2914 );
2915 }
2916
2917 #[test]
2918 fn test_evict_tenant_when_not_loaded_is_a_noop() {
2919 let store = EventStore::new();
2922 store.evict_tenant("nobody"); assert!(!store.is_tenant_loaded("nobody"));
2924 }
2925
    #[test]
    // Lazy loading must charge the loaded bytes to the owning tenant only.
    fn test_lazy_load_accounts_bytes_per_tenant() {
        let temp_dir = TempDir::new().unwrap();
        let storage_dir = temp_dir.path().to_path_buf();

        {
            let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
            for i in 0..5 {
                store
                    .ingest(
                        &Event::from_strings(
                            "test.event".to_string(),
                            format!("a-{i}"),
                            "alice".to_string(),
                            serde_json::json!({"data": "x".repeat(1000)}),
                            None,
                        )
                        .unwrap(),
                    )
                    .unwrap();
            }
            store.flush_storage().unwrap();
        }

        let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
        assert_eq!(store.tenant_resident_bytes("alice"), 0);
        assert_eq!(store.cache_resident_bytes(), 0);

        store.ensure_tenant_loaded("alice").unwrap();

        let alice_bytes = store.tenant_resident_bytes("alice");
        assert!(
            alice_bytes >= 5 * 1000,
            "alice should have at least 5 KiB resident; got {alice_bytes}"
        );
        assert_eq!(store.tenant_resident_bytes("bob"), 0);
        assert_eq!(store.cache_resident_bytes(), alice_bytes);
    }
2973
    #[test]
    // Boot must not pre-load Parquet; the first tenant-scoped query pulls
    // that tenant — and only that tenant — into memory.
    fn test_query_lazy_loads_tenant_on_first_call() {
        let temp_dir = TempDir::new().unwrap();
        let storage_dir = temp_dir.path().to_path_buf();

        {
            let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
            for i in 0..3 {
                let event = Event::from_strings(
                    "test.event".to_string(),
                    format!("e-{i}"),
                    "alice".to_string(),
                    serde_json::json!({"i": i}),
                    None,
                )
                .unwrap();
                store.ingest(&event).unwrap();
            }
            store.flush_storage().unwrap();
        }

        let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
        assert_eq!(
            store.stats().total_events,
            0,
            "boot must be O(1) — no Parquet pre-load"
        );
        assert!(!store.is_tenant_loaded("alice"));
        assert!(!store.is_tenant_loaded("bob"));

        let results = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: Some("alice".to_string()),
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();
        assert_eq!(results.len(), 3, "alice's 3 events are returned");
        assert!(store.is_tenant_loaded("alice"), "alice now warm");
        assert!(!store.is_tenant_loaded("bob"), "bob still cold");
    }
3029
3030 #[test]
3031 fn test_query_invalid_tenant_id_returns_error_no_hang() {
3032 let temp_dir = TempDir::new().unwrap();
3036 let store = EventStore::with_config(EventStoreConfig::with_persistence(temp_dir.path()));
3037
3038 let result = store.query(&QueryEventsRequest {
3039 entity_id: None,
3040 event_type: None,
3041 tenant_id: Some("../etc".to_string()),
3042 as_of: None,
3043 since: None,
3044 until: None,
3045 limit: None,
3046 event_type_prefix: None,
3047 payload_filter: None,
3048 });
3049 assert!(result.is_err(), "unsafe tenant_id must surface as error");
3050 }
3051
    #[test]
    fn test_query_concurrent_first_queries_for_same_tenant_all_succeed() {
        // Thundering-herd check: eight threads race the very first query for a
        // cold tenant; every caller must observe the full, consistent event set.
        let temp_dir = TempDir::new().unwrap();
        let storage_dir = temp_dir.path().to_path_buf();

        // Session 1: persist 25 events for "alice".
        {
            let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
            for i in 0..25 {
                let event = Event::from_strings(
                    "test.event".to_string(),
                    format!("e-{i}"),
                    "alice".to_string(),
                    serde_json::json!({"i": i}),
                    None,
                )
                .unwrap();
                store.ingest(&event).unwrap();
            }
            store.flush_storage().unwrap();
        }

        // Session 2: fresh store, tenant cold.
        let store = Arc::new(EventStore::with_config(EventStoreConfig::with_persistence(
            &storage_dir,
        )));
        assert!(!store.is_tenant_loaded("alice"));

        // Fire 8 first-touch queries concurrently against the same cold tenant.
        let mut handles = Vec::new();
        for _ in 0..8 {
            let s = store.clone();
            handles.push(std::thread::spawn(move || {
                s.query(&QueryEventsRequest {
                    entity_id: None,
                    event_type: None,
                    tenant_id: Some("alice".to_string()),
                    as_of: None,
                    since: None,
                    until: None,
                    limit: None,
                    event_type_prefix: None,
                    payload_filter: None,
                })
            }));
        }

        for h in handles {
            let result = h.join().unwrap().unwrap();
            assert_eq!(
                result.len(),
                25,
                "every concurrent caller must see all 25 events"
            );
        }
        // After the race: tenant is warm and the load did not duplicate events.
        assert!(store.is_tenant_loaded("alice"));
        assert_eq!(store.stats().total_events, 25);
    }
3117
    #[test]
    fn test_query_two_cold_tenants_load_independently() {
        // Querying one cold tenant must load only that tenant; a second cold
        // tenant loads later, and total_events grows stepwise (3, then 3+5).
        let temp_dir = TempDir::new().unwrap();
        let storage_dir = temp_dir.path().to_path_buf();

        // Session 1: persist 3 events for "alice" and 5 for "bob".
        {
            let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
            for i in 0..3 {
                store
                    .ingest(
                        &Event::from_strings(
                            "test.event".to_string(),
                            format!("a-{i}"),
                            "alice".to_string(),
                            serde_json::json!({"i": i}),
                            None,
                        )
                        .unwrap(),
                    )
                    .unwrap();
            }
            for i in 0..5 {
                store
                    .ingest(
                        &Event::from_strings(
                            "test.event".to_string(),
                            format!("b-{i}"),
                            "bob".to_string(),
                            serde_json::json!({"i": i}),
                            None,
                        )
                        .unwrap(),
                    )
                    .unwrap();
            }
            store.flush_storage().unwrap();
        }

        // Session 2: cold boot — nothing resident yet.
        let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
        assert_eq!(store.stats().total_events, 0);

        // First query loads only alice.
        let alice = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: Some("alice".to_string()),
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();
        assert_eq!(alice.len(), 3);
        assert!(store.is_tenant_loaded("alice"));
        assert!(!store.is_tenant_loaded("bob"));
        assert_eq!(store.stats().total_events, 3);

        // Second query loads bob on top; counts accumulate to 8.
        let bob = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: Some("bob".to_string()),
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();
        assert_eq!(bob.len(), 5);
        assert!(store.is_tenant_loaded("bob"));
        assert_eq!(store.stats().total_events, 8);
    }
3199
    #[test]
    fn test_boot_with_persisted_data_is_o1() {
        // Boot cost must not scale with the amount of persisted data: with
        // three tenants' worth of Parquet on disk, startup pre-loads nothing
        // and finishes quickly.
        let temp_dir = TempDir::new().unwrap();
        let storage_dir = temp_dir.path().to_path_buf();

        // Session 1: persist ~50 events split across three tenants (16 each).
        {
            let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
            for tenant in ["alice", "bob", "carol"] {
                for i in 0..50 / 3 {
                    store
                        .ingest(
                            &Event::from_strings(
                                "test.event".to_string(),
                                format!("{tenant}-{i}"),
                                tenant.to_string(),
                                serde_json::json!({"i": i}),
                                None,
                            )
                            .unwrap(),
                        )
                        .unwrap();
                }
            }
            store.flush_storage().unwrap();
        }

        // Pre-condition: session 1 really wrote Parquet to disk.
        let on_disk = find_parquet_files(&storage_dir);
        assert!(
            !on_disk.is_empty(),
            "session 1 should have produced parquet files; pre-condition for the test"
        );

        // Measure a fresh boot against the populated directory.
        let started = std::time::Instant::now();
        let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
        let boot_elapsed = started.elapsed();

        assert_eq!(
            store.stats().total_events,
            0,
            "boot must not pre-load any Parquet events"
        );

        // Generous wall-clock bound; catches accidental full-directory scans.
        assert!(
            boot_elapsed < std::time::Duration::from_secs(2),
            "boot took {boot_elapsed:?} — Step 2 boot should be O(1)"
        );
    }
3263
    #[test]
    fn test_query_warm_tenant_does_not_re_read_disk() {
        // Once a tenant is warm, repeat queries must be served from memory.
        // Proven by deleting the Parquet files and querying again successfully.
        let temp_dir = TempDir::new().unwrap();
        let storage_dir = temp_dir.path().to_path_buf();

        let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
        for i in 0..3 {
            let event = Event::from_strings(
                "test.event".to_string(),
                format!("e-{i}"),
                "alice".to_string(),
                serde_json::json!({"i": i}),
                None,
            )
            .unwrap();
            store.ingest(&event).unwrap();
        }
        store.flush_storage().unwrap();

        // First query warms the tenant (result intentionally discarded).
        let _ = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: Some("alice".to_string()),
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();
        assert!(store.is_tenant_loaded("alice"));

        // Pull the rug: remove every Parquet file from under the store.
        let parquet_files = find_parquet_files(&storage_dir);
        for f in parquet_files {
            std::fs::remove_file(&f).unwrap();
        }

        // Warm query must still succeed entirely from memory.
        let results = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: Some("alice".to_string()),
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();
        assert_eq!(
            results.len(),
            3,
            "warm tenant query must not need disk; got {} events from a deleted parquet",
            results.len()
        );
    }
3331
3332 #[test]
3333 fn test_event_store_default() {
3334 let store = EventStore::default();
3335 assert_eq!(store.stats().total_events, 0);
3336 }
3337
3338 #[test]
3339 fn test_ingest_single_event() {
3340 let store = EventStore::new();
3341 let event = create_test_event("entity-1", "user.created");
3342
3343 store.ingest(&event).unwrap();
3344
3345 assert_eq!(store.stats().total_events, 1);
3346 assert_eq!(store.stats().total_ingested, 1);
3347 }
3348
3349 #[test]
3350 fn test_ingest_multiple_events() {
3351 let store = EventStore::new();
3352
3353 for i in 0..10 {
3354 let event = create_test_event(&format!("entity-{i}"), "user.created");
3355 store.ingest(&event).unwrap();
3356 }
3357
3358 assert_eq!(store.stats().total_events, 10);
3359 assert_eq!(store.stats().total_ingested, 10);
3360 }
3361
3362 #[test]
3363 fn test_query_by_entity_id() {
3364 let store = EventStore::new();
3365
3366 store
3367 .ingest(&create_test_event("entity-1", "user.created"))
3368 .unwrap();
3369 store
3370 .ingest(&create_test_event("entity-2", "user.created"))
3371 .unwrap();
3372 store
3373 .ingest(&create_test_event("entity-1", "user.updated"))
3374 .unwrap();
3375
3376 let results = store
3377 .query(&QueryEventsRequest {
3378 entity_id: Some("entity-1".to_string()),
3379 event_type: None,
3380 tenant_id: None,
3381 as_of: None,
3382 since: None,
3383 until: None,
3384 limit: None,
3385 event_type_prefix: None,
3386 payload_filter: None,
3387 })
3388 .unwrap();
3389
3390 assert_eq!(results.len(), 2);
3391 }
3392
3393 #[test]
3394 fn test_query_by_event_type() {
3395 let store = EventStore::new();
3396
3397 store
3398 .ingest(&create_test_event("entity-1", "user.created"))
3399 .unwrap();
3400 store
3401 .ingest(&create_test_event("entity-2", "user.updated"))
3402 .unwrap();
3403 store
3404 .ingest(&create_test_event("entity-3", "user.created"))
3405 .unwrap();
3406
3407 let results = store
3408 .query(&QueryEventsRequest {
3409 entity_id: None,
3410 event_type: Some("user.created".to_string()),
3411 tenant_id: None,
3412 as_of: None,
3413 since: None,
3414 until: None,
3415 limit: None,
3416 event_type_prefix: None,
3417 payload_filter: None,
3418 })
3419 .unwrap();
3420
3421 assert_eq!(results.len(), 2);
3422 }
3423
3424 #[test]
3425 fn test_query_with_limit() {
3426 let store = EventStore::new();
3427
3428 for i in 0..10 {
3429 let event = create_test_event(&format!("entity-{i}"), "user.created");
3430 store.ingest(&event).unwrap();
3431 }
3432
3433 let results = store
3434 .query(&QueryEventsRequest {
3435 entity_id: None,
3436 event_type: None,
3437 tenant_id: None,
3438 as_of: None,
3439 since: None,
3440 until: None,
3441 limit: Some(5),
3442 event_type_prefix: None,
3443 payload_filter: None,
3444 })
3445 .unwrap();
3446
3447 assert_eq!(results.len(), 5);
3448 }
3449
3450 #[test]
3451 fn test_query_empty_store() {
3452 let store = EventStore::new();
3453
3454 let results = store
3455 .query(&QueryEventsRequest {
3456 entity_id: Some("non-existent".to_string()),
3457 event_type: None,
3458 tenant_id: None,
3459 as_of: None,
3460 since: None,
3461 until: None,
3462 limit: None,
3463 event_type_prefix: None,
3464 payload_filter: None,
3465 })
3466 .unwrap();
3467
3468 assert!(results.is_empty());
3469 }
3470
3471 #[test]
3472 fn test_reconstruct_state() {
3473 let store = EventStore::new();
3474
3475 store
3476 .ingest(&create_test_event("entity-1", "user.created"))
3477 .unwrap();
3478
3479 let state = store.reconstruct_state("entity-1", None).unwrap();
3480 assert_eq!(state["current_state"]["name"], "Test");
3482 assert_eq!(state["current_state"]["value"], 42);
3483 }
3484
3485 #[test]
3486 fn test_reconstruct_state_not_found() {
3487 let store = EventStore::new();
3488
3489 let result = store.reconstruct_state("non-existent", None);
3490 assert!(result.is_err());
3491 }
3492
3493 #[test]
3494 fn test_get_snapshot_empty() {
3495 let store = EventStore::new();
3496
3497 let result = store.get_snapshot("non-existent");
3498 assert!(result.is_err());
3500 }
3501
3502 #[test]
3503 fn test_create_snapshot() {
3504 let store = EventStore::new();
3505
3506 store
3507 .ingest(&create_test_event("entity-1", "user.created"))
3508 .unwrap();
3509
3510 store.create_snapshot("entity-1").unwrap();
3511
3512 let snapshot = store.get_snapshot("entity-1").unwrap();
3514 assert!(snapshot != serde_json::json!(null));
3515 }
3516
3517 #[test]
3518 fn test_create_snapshot_entity_not_found() {
3519 let store = EventStore::new();
3520
3521 let result = store.create_snapshot("non-existent");
3522 assert!(result.is_err());
3523 }
3524
3525 #[test]
3526 fn test_websocket_manager() {
3527 let store = EventStore::new();
3528 let manager = store.websocket_manager();
3529 assert!(Arc::strong_count(&manager) >= 1);
3531 }
3532
3533 #[test]
3534 fn test_snapshot_manager() {
3535 let store = EventStore::new();
3536 let manager = store.snapshot_manager();
3537 assert!(Arc::strong_count(&manager) >= 1);
3538 }
3539
3540 #[test]
3541 fn test_compaction_manager_none() {
3542 let store = EventStore::new();
3543 assert!(store.compaction_manager().is_none());
3545 }
3546
3547 #[test]
3548 fn test_schema_registry() {
3549 let store = EventStore::new();
3550 let registry = store.schema_registry();
3551 assert!(Arc::strong_count(®istry) >= 1);
3552 }
3553
3554 #[test]
3555 fn test_replay_manager() {
3556 let store = EventStore::new();
3557 let manager = store.replay_manager();
3558 assert!(Arc::strong_count(&manager) >= 1);
3559 }
3560
3561 #[test]
3562 fn test_pipeline_manager() {
3563 let store = EventStore::new();
3564 let manager = store.pipeline_manager();
3565 assert!(Arc::strong_count(&manager) >= 1);
3566 }
3567
3568 #[test]
3569 fn test_projection_manager() {
3570 let store = EventStore::new();
3571 let manager = store.projection_manager();
3572 let projections = manager.list_projections();
3574 assert!(projections.len() >= 2); }
3576
3577 #[test]
3578 fn test_projection_state_cache() {
3579 let store = EventStore::new();
3580 let cache = store.projection_state_cache();
3581
3582 cache.insert("test:key".to_string(), serde_json::json!({"value": 123}));
3583 assert_eq!(cache.len(), 1);
3584
3585 let value = cache.get("test:key").unwrap();
3586 assert_eq!(value["value"], 123);
3587 }
3588
3589 #[test]
3590 fn test_metrics() {
3591 let store = EventStore::new();
3592 let metrics = store.metrics();
3593 assert!(Arc::strong_count(&metrics) >= 1);
3594 }
3595
3596 #[test]
3597 fn test_store_stats() {
3598 let store = EventStore::new();
3599
3600 store
3601 .ingest(&create_test_event("entity-1", "user.created"))
3602 .unwrap();
3603 store
3604 .ingest(&create_test_event("entity-2", "order.placed"))
3605 .unwrap();
3606
3607 let stats = store.stats();
3608 assert_eq!(stats.total_events, 2);
3609 assert_eq!(stats.total_entities, 2);
3610 assert_eq!(stats.total_event_types, 2);
3611 assert_eq!(stats.total_ingested, 2);
3612 }
3613
3614 #[test]
3615 fn test_event_store_config_default() {
3616 let config = EventStoreConfig::default();
3617 assert!(config.storage_dir.is_none());
3618 assert!(config.wal_dir.is_none());
3619 }
3620
3621 #[test]
3622 fn test_event_store_config_with_persistence() {
3623 let temp_dir = TempDir::new().unwrap();
3624 let config = EventStoreConfig::with_persistence(temp_dir.path());
3625
3626 assert!(config.storage_dir.is_some());
3627 assert!(config.wal_dir.is_none());
3628 }
3629
3630 #[test]
3631 fn test_event_store_config_with_wal() {
3632 let temp_dir = TempDir::new().unwrap();
3633 let config = EventStoreConfig::with_wal(temp_dir.path(), WALConfig::default());
3634
3635 assert!(config.storage_dir.is_none());
3636 assert!(config.wal_dir.is_some());
3637 }
3638
3639 #[test]
3640 fn test_event_store_config_with_all() {
3641 let temp_dir = TempDir::new().unwrap();
3642 let config = EventStoreConfig::with_all(temp_dir.path(), SnapshotConfig::default());
3643
3644 assert!(config.storage_dir.is_some());
3645 }
3646
3647 #[test]
3648 fn test_event_store_config_production() {
3649 let storage_dir = TempDir::new().unwrap();
3650 let wal_dir = TempDir::new().unwrap();
3651 let config = EventStoreConfig::production(
3652 storage_dir.path(),
3653 wal_dir.path(),
3654 SnapshotConfig::default(),
3655 WALConfig::default(),
3656 CompactionConfig::default(),
3657 );
3658
3659 assert!(config.storage_dir.is_some());
3660 assert!(config.wal_dir.is_some());
3661 }
3662
3663 #[test]
3669 fn test_from_env_vars_data_dir_enables_full_persistence() {
3670 let (config, mode) = EventStoreConfig::from_env_vars(
3671 Some("/app/data".to_string()),
3672 None,
3673 None,
3674 None,
3675 None,
3676 None,
3677 None,
3678 None,
3679 );
3680 assert_eq!(mode, "wal+parquet");
3681 assert_eq!(
3682 config.storage_dir.unwrap().to_str().unwrap(),
3683 "/app/data/storage"
3684 );
3685 assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/app/data/wal");
3686 }
3687
3688 #[test]
3689 fn test_from_env_vars_explicit_dirs() {
3690 let (config, mode) = EventStoreConfig::from_env_vars(
3691 None,
3692 Some("/custom/storage".to_string()),
3693 Some("/custom/wal".to_string()),
3694 None,
3695 None,
3696 None,
3697 None,
3698 None,
3699 );
3700 assert_eq!(mode, "wal+parquet");
3701 assert_eq!(
3702 config.storage_dir.unwrap().to_str().unwrap(),
3703 "/custom/storage"
3704 );
3705 assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/custom/wal");
3706 }
3707
3708 #[test]
3709 fn test_from_env_vars_wal_disabled() {
3710 let (config, mode) = EventStoreConfig::from_env_vars(
3711 Some("/app/data".to_string()),
3712 None,
3713 None,
3714 Some("false".to_string()),
3715 None,
3716 None,
3717 None,
3718 None,
3719 );
3720 assert_eq!(mode, "parquet-only");
3721 assert!(config.storage_dir.is_some());
3722 assert!(config.wal_dir.is_none());
3723 }
3724
3725 #[test]
3726 fn test_from_env_vars_no_dirs_is_in_memory() {
3727 let (config, mode) =
3728 EventStoreConfig::from_env_vars(None, None, None, None, None, None, None, None);
3729 assert_eq!(mode, "in-memory");
3730 assert!(config.storage_dir.is_none());
3731 assert!(config.wal_dir.is_none());
3732 }
3733
3734 #[test]
3735 fn test_from_env_vars_empty_strings_treated_as_none() {
3736 let (_, mode) = EventStoreConfig::from_env_vars(
3737 Some(String::new()),
3738 Some(String::new()),
3739 Some(String::new()),
3740 None,
3741 None,
3742 None,
3743 None,
3744 None,
3745 );
3746 assert_eq!(mode, "in-memory");
3747 }
3748
3749 #[test]
3750 fn test_from_env_vars_explicit_overrides_data_dir() {
3751 let (config, mode) = EventStoreConfig::from_env_vars(
3752 Some("/app/data".to_string()),
3753 Some("/override/storage".to_string()),
3754 Some("/override/wal".to_string()),
3755 None,
3756 None,
3757 None,
3758 None,
3759 None,
3760 );
3761 assert_eq!(mode, "wal+parquet");
3762 assert_eq!(
3763 config.storage_dir.unwrap().to_str().unwrap(),
3764 "/override/storage"
3765 );
3766 assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/override/wal");
3767 }
3768
3769 #[test]
3770 fn test_from_env_vars_wal_only() {
3771 let (config, mode) = EventStoreConfig::from_env_vars(
3772 None,
3773 None,
3774 Some("/wal/only".to_string()),
3775 None,
3776 None,
3777 None,
3778 None,
3779 None,
3780 );
3781 assert_eq!(mode, "wal-only");
3782 assert!(config.storage_dir.is_none());
3783 assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/wal/only");
3784 }
3785
3786 #[test]
3787 fn test_from_env_vars_cache_bytes_parses_decimal() {
3788 let (config, _) = EventStoreConfig::from_env_vars(
3789 Some("/app/data".to_string()),
3790 None,
3791 None,
3792 None,
3793 Some("536870912".to_string()),
3794 None,
3796 None,
3797 None,
3798 );
3799 assert_eq!(config.cache_byte_budget, Some(536_870_912));
3800 }
3801
3802 #[test]
3803 fn test_from_env_vars_cache_bytes_unparseable_disables_budget() {
3804 let (config, _) = EventStoreConfig::from_env_vars(
3808 Some("/app/data".to_string()),
3809 None,
3810 None,
3811 None,
3812 Some("not-a-number".to_string()),
3813 None,
3814 None,
3815 None,
3816 );
3817 assert_eq!(config.cache_byte_budget, None);
3818 }
3819
3820 #[test]
3821 fn test_from_env_vars_cache_bytes_empty_disables_budget() {
3822 let (config, _) = EventStoreConfig::from_env_vars(
3823 Some("/app/data".to_string()),
3824 None,
3825 None,
3826 None,
3827 Some(String::new()),
3828 None,
3829 None,
3830 None,
3831 );
3832 assert_eq!(config.cache_byte_budget, None);
3833 }
3834
3835 #[test]
3836 fn test_from_env_vars_snapshot_interval_overrides_default() {
3837 let (config, _) = EventStoreConfig::from_env_vars(
3841 Some("/app/data".to_string()),
3842 None,
3843 None,
3844 None,
3845 None,
3846 Some("60".to_string()),
3847 None,
3848 None,
3849 );
3850 assert_eq!(config.compaction_config.compaction_interval_seconds, 60);
3851 }
3852
3853 #[test]
3854 fn test_from_env_vars_snapshot_interval_default_is_hourly() {
3855 let (config, _) = EventStoreConfig::from_env_vars(
3856 Some("/app/data".to_string()),
3857 None,
3858 None,
3859 None,
3860 None,
3861 None,
3862 None,
3863 None,
3864 );
3865 assert_eq!(config.compaction_config.compaction_interval_seconds, 3600);
3866 }
3867
3868 #[test]
3869 fn test_from_env_vars_snapshot_interval_unparseable_falls_back() {
3870 let (config, _) = EventStoreConfig::from_env_vars(
3871 Some("/app/data".to_string()),
3872 None,
3873 None,
3874 None,
3875 None,
3876 Some("not-a-number".to_string()),
3877 None,
3878 None,
3879 );
3880 assert_eq!(config.compaction_config.compaction_interval_seconds, 3600);
3881 }
3882
3883 #[test]
3884 fn test_from_env_vars_retention_system_days_overrides_default() {
3885 let (config, _) = EventStoreConfig::from_env_vars(
3888 Some("/app/data".to_string()),
3889 None,
3890 None,
3891 None,
3892 None,
3893 None,
3894 Some("7".to_string()),
3895 None,
3896 );
3897 let ttl = config
3898 .compaction_config
3899 .retention
3900 .ttl_for("system")
3901 .unwrap();
3902 assert_eq!(ttl.as_secs(), 7 * 24 * 3600);
3903 }
3904
3905 #[test]
3906 fn test_from_env_vars_retention_default_is_30_days_for_system() {
3907 let (config, _) = EventStoreConfig::from_env_vars(
3908 Some("/app/data".to_string()),
3909 None,
3910 None,
3911 None,
3912 None,
3913 None,
3914 None,
3915 None,
3916 );
3917 let ttl = config
3918 .compaction_config
3919 .retention
3920 .ttl_for("system")
3921 .unwrap();
3922 assert_eq!(ttl.as_secs(), 30 * 24 * 3600);
3923 assert!(config.compaction_config.retention.ttl_for("acme").is_none());
3925 }
3926
3927 #[test]
3928 fn test_store_stats_serde() {
3929 let stats = StoreStats {
3930 total_events: 100,
3931 total_entities: 50,
3932 total_event_types: 10,
3933 total_ingested: 100,
3934 };
3935
3936 let json = serde_json::to_string(&stats).unwrap();
3937 assert!(json.contains("\"total_events\":100"));
3938 assert!(json.contains("\"total_entities\":50"));
3939 }
3940
3941 #[test]
3942 fn test_query_with_entity_and_type() {
3943 let store = EventStore::new();
3944
3945 store
3946 .ingest(&create_test_event("entity-1", "user.created"))
3947 .unwrap();
3948 store
3949 .ingest(&create_test_event("entity-1", "user.updated"))
3950 .unwrap();
3951 store
3952 .ingest(&create_test_event("entity-2", "user.created"))
3953 .unwrap();
3954
3955 let results = store
3956 .query(&QueryEventsRequest {
3957 entity_id: Some("entity-1".to_string()),
3958 event_type: Some("user.created".to_string()),
3959 tenant_id: None,
3960 as_of: None,
3961 since: None,
3962 until: None,
3963 limit: None,
3964 event_type_prefix: None,
3965 payload_filter: None,
3966 })
3967 .unwrap();
3968
3969 assert_eq!(results.len(), 1);
3970 assert_eq!(results[0].event_type_str(), "user.created");
3971 }
3972
3973 #[test]
3974 fn test_query_by_event_type_prefix() {
3975 let store = EventStore::new();
3976
3977 store
3979 .ingest(&create_test_event("entity-1", "index.created"))
3980 .unwrap();
3981 store
3982 .ingest(&create_test_event("entity-2", "index.updated"))
3983 .unwrap();
3984 store
3985 .ingest(&create_test_event("entity-3", "trade.created"))
3986 .unwrap();
3987 store
3988 .ingest(&create_test_event("entity-4", "trade.completed"))
3989 .unwrap();
3990 store
3991 .ingest(&create_test_event("entity-5", "balance.updated"))
3992 .unwrap();
3993
3994 let results = store
3996 .query(&QueryEventsRequest {
3997 entity_id: None,
3998 event_type: None,
3999 tenant_id: None,
4000 as_of: None,
4001 since: None,
4002 until: None,
4003 limit: None,
4004 event_type_prefix: Some("index.".to_string()),
4005 payload_filter: None,
4006 })
4007 .unwrap();
4008
4009 assert_eq!(results.len(), 2);
4010 assert!(
4011 results
4012 .iter()
4013 .all(|e| e.event_type_str().starts_with("index."))
4014 );
4015 }
4016
4017 #[test]
4018 fn test_query_by_event_type_prefix_empty_returns_all() {
4019 let store = EventStore::new();
4020
4021 store
4022 .ingest(&create_test_event("entity-1", "index.created"))
4023 .unwrap();
4024 store
4025 .ingest(&create_test_event("entity-2", "trade.created"))
4026 .unwrap();
4027
4028 let results = store
4030 .query(&QueryEventsRequest {
4031 entity_id: None,
4032 event_type: None,
4033 tenant_id: None,
4034 as_of: None,
4035 since: None,
4036 until: None,
4037 limit: None,
4038 event_type_prefix: Some(String::new()),
4039 payload_filter: None,
4040 })
4041 .unwrap();
4042
4043 assert_eq!(results.len(), 2);
4044 }
4045
4046 #[test]
4047 fn test_query_by_event_type_prefix_no_match() {
4048 let store = EventStore::new();
4049
4050 store
4051 .ingest(&create_test_event("entity-1", "index.created"))
4052 .unwrap();
4053
4054 let results = store
4055 .query(&QueryEventsRequest {
4056 entity_id: None,
4057 event_type: None,
4058 tenant_id: None,
4059 as_of: None,
4060 since: None,
4061 until: None,
4062 limit: None,
4063 event_type_prefix: Some("nonexistent.".to_string()),
4064 payload_filter: None,
4065 })
4066 .unwrap();
4067
4068 assert!(results.is_empty());
4069 }
4070
4071 #[test]
4072 fn test_query_by_entity_with_type_prefix() {
4073 let store = EventStore::new();
4074
4075 store
4076 .ingest(&create_test_event("entity-1", "index.created"))
4077 .unwrap();
4078 store
4079 .ingest(&create_test_event("entity-1", "trade.created"))
4080 .unwrap();
4081 store
4082 .ingest(&create_test_event("entity-2", "index.updated"))
4083 .unwrap();
4084
4085 let results = store
4087 .query(&QueryEventsRequest {
4088 entity_id: Some("entity-1".to_string()),
4089 event_type: None,
4090 tenant_id: None,
4091 as_of: None,
4092 since: None,
4093 until: None,
4094 limit: None,
4095 event_type_prefix: Some("index.".to_string()),
4096 payload_filter: None,
4097 })
4098 .unwrap();
4099
4100 assert_eq!(results.len(), 1);
4101 assert_eq!(results[0].event_type_str(), "index.created");
4102 }
4103
4104 #[test]
4105 fn test_query_prefix_with_limit() {
4106 let store = EventStore::new();
4107
4108 for i in 0..5 {
4109 store
4110 .ingest(&create_test_event(&format!("entity-{i}"), "index.created"))
4111 .unwrap();
4112 }
4113
4114 let results = store
4115 .query(&QueryEventsRequest {
4116 entity_id: None,
4117 event_type: None,
4118 tenant_id: None,
4119 as_of: None,
4120 since: None,
4121 until: None,
4122 limit: Some(3),
4123 event_type_prefix: Some("index.".to_string()),
4124 payload_filter: None,
4125 })
4126 .unwrap();
4127
4128 assert_eq!(results.len(), 3);
4129 }
4130
4131 #[test]
4132 fn test_query_prefix_alongside_existing_filters() {
4133 let store = EventStore::new();
4134
4135 store
4136 .ingest(&create_test_event("entity-1", "index.created"))
4137 .unwrap();
4138 std::thread::sleep(std::time::Duration::from_millis(10));
4140 store
4141 .ingest(&create_test_event("entity-2", "index.strategy.updated"))
4142 .unwrap();
4143 std::thread::sleep(std::time::Duration::from_millis(10));
4144 store
4145 .ingest(&create_test_event("entity-3", "index.deleted"))
4146 .unwrap();
4147
4148 let results = store
4150 .query(&QueryEventsRequest {
4151 entity_id: None,
4152 event_type: None,
4153 tenant_id: None,
4154 as_of: None,
4155 since: None,
4156 until: None,
4157 limit: Some(2),
4158 event_type_prefix: Some("index.".to_string()),
4159 payload_filter: None,
4160 })
4161 .unwrap();
4162
4163 assert_eq!(results.len(), 2);
4164 }
4165
4166 #[test]
4167 fn test_query_with_payload_filter() {
4168 let store = EventStore::new();
4169
4170 for i in 0..5 {
4172 store
4173 .ingest(&create_test_event_with_payload(
4174 &format!("entity-{i}"),
4175 "user.action",
4176 serde_json::json!({"user_id": "alice", "action": "click"}),
4177 ))
4178 .unwrap();
4179 }
4180 for i in 5..10 {
4182 store
4183 .ingest(&create_test_event_with_payload(
4184 &format!("entity-{i}"),
4185 "user.action",
4186 serde_json::json!({"user_id": "bob", "action": "view"}),
4187 ))
4188 .unwrap();
4189 }
4190
4191 let results = store
4193 .query(&QueryEventsRequest {
4194 entity_id: None,
4195 event_type: Some("user.action".to_string()),
4196 tenant_id: None,
4197 as_of: None,
4198 since: None,
4199 until: None,
4200 limit: None,
4201 event_type_prefix: None,
4202 payload_filter: Some(r#"{"user_id":"alice"}"#.to_string()),
4203 })
4204 .unwrap();
4205
4206 assert_eq!(results.len(), 5);
4207 }
4208
4209 #[test]
4210 fn test_query_payload_filter_non_existent_field() {
4211 let store = EventStore::new();
4212
4213 store
4214 .ingest(&create_test_event_with_payload(
4215 "entity-1",
4216 "user.action",
4217 serde_json::json!({"user_id": "alice"}),
4218 ))
4219 .unwrap();
4220
4221 let results = store
4223 .query(&QueryEventsRequest {
4224 entity_id: None,
4225 event_type: None,
4226 tenant_id: None,
4227 as_of: None,
4228 since: None,
4229 until: None,
4230 limit: None,
4231 event_type_prefix: None,
4232 payload_filter: Some(r#"{"nonexistent":"value"}"#.to_string()),
4233 })
4234 .unwrap();
4235
4236 assert!(results.is_empty());
4237 }
4238
4239 #[test]
4240 fn test_query_payload_filter_with_prefix() {
4241 let store = EventStore::new();
4242
4243 store
4244 .ingest(&create_test_event_with_payload(
4245 "entity-1",
4246 "index.created",
4247 serde_json::json!({"status": "active"}),
4248 ))
4249 .unwrap();
4250 store
4251 .ingest(&create_test_event_with_payload(
4252 "entity-2",
4253 "index.created",
4254 serde_json::json!({"status": "inactive"}),
4255 ))
4256 .unwrap();
4257 store
4258 .ingest(&create_test_event_with_payload(
4259 "entity-3",
4260 "trade.created",
4261 serde_json::json!({"status": "active"}),
4262 ))
4263 .unwrap();
4264
4265 let results = store
4267 .query(&QueryEventsRequest {
4268 entity_id: None,
4269 event_type: None,
4270 tenant_id: None,
4271 as_of: None,
4272 since: None,
4273 until: None,
4274 limit: None,
4275 event_type_prefix: Some("index.".to_string()),
4276 payload_filter: Some(r#"{"status":"active"}"#.to_string()),
4277 })
4278 .unwrap();
4279
4280 assert_eq!(results.len(), 1);
4281 assert_eq!(results[0].entity_id().to_string(), "entity-1");
4282 }
4283
4284 #[test]
4285 fn test_flush_storage_no_storage() {
4286 let store = EventStore::new();
4287 let result = store.flush_storage();
4289 assert!(result.is_ok());
4290 }
4291
4292 #[test]
4293 fn test_state_evolution() {
4294 let store = EventStore::new();
4295
4296 store
4298 .ingest(
4299 &Event::from_strings(
4300 "user.created".to_string(),
4301 "user-1".to_string(),
4302 "default".to_string(),
4303 serde_json::json!({"name": "Alice", "age": 25}),
4304 None,
4305 )
4306 .unwrap(),
4307 )
4308 .unwrap();
4309
4310 store
4312 .ingest(
4313 &Event::from_strings(
4314 "user.updated".to_string(),
4315 "user-1".to_string(),
4316 "default".to_string(),
4317 serde_json::json!({"age": 26}),
4318 None,
4319 )
4320 .unwrap(),
4321 )
4322 .unwrap();
4323
4324 let state = store.reconstruct_state("user-1", None).unwrap();
4325 assert_eq!(state["current_state"]["name"], "Alice");
4327 assert_eq!(state["current_state"]["age"], 26);
4328 }
4329
4330 #[test]
4331 fn test_reject_system_event_types() {
4332 let store = EventStore::new();
4333
4334 let event = Event::reconstruct_from_strings(
4336 uuid::Uuid::new_v4(),
4337 "_system.tenant.created".to_string(),
4338 "_system:tenant:acme".to_string(),
4339 "_system".to_string(),
4340 serde_json::json!({"name": "ACME"}),
4341 chrono::Utc::now(),
4342 None,
4343 1,
4344 );
4345
4346 let result = store.ingest(&event);
4347 assert!(result.is_err());
4348 let err = result.unwrap_err();
4349 assert!(
4350 err.to_string().contains("reserved for internal use"),
4351 "Expected system namespace rejection, got: {err}"
4352 );
4353 }
4354
    /// Exercises the WAL-to-Parquet lifecycle across three simulated process
    /// restarts (each scope drops the store, standing in for shutdown/crash):
    /// session 1 ingests with only the WAL as the durable copy; session 2
    /// recovers from the WAL and checkpoints to Parquet; session 3 boots
    /// empty (lazy-load) and hydrates the tenant on demand.
    #[test]
    fn test_wal_recovery_checkpoints_to_parquet() {
        let data_dir = TempDir::new().unwrap();
        let storage_dir = data_dir.path().join("storage");
        let wal_dir = data_dir.path().join("wal");

        // Session 1: ingest 5 events; no explicit flush, so on drop the WAL
        // is the only durable record of them.
        {
            let config = EventStoreConfig::production(
                &storage_dir,
                &wal_dir,
                SnapshotConfig::default(),
                WALConfig {
                    sync_on_write: true, // sync each write so the WAL survives the "crash"
                    ..WALConfig::default()
                },
                CompactionConfig::default(),
            );
            let store = EventStore::with_config(config);

            for i in 0..5 {
                let event = Event::from_strings(
                    "test.created".to_string(),
                    format!("entity-{i}"),
                    "default".to_string(),
                    serde_json::json!({"index": i}),
                    None,
                )
                .unwrap();
                store.ingest(&event).unwrap();
            }

            assert_eq!(store.stats().total_events, 5);

        }

        // Between sessions: the on-disk WAL (`*.log`) must exist and be non-empty.
        let wal_files: Vec<_> = std::fs::read_dir(&wal_dir)
            .unwrap()
            .filter_map(std::result::Result::ok)
            .filter(|e| e.path().extension().is_some_and(|ext| ext == "log"))
            .collect();
        assert!(!wal_files.is_empty(), "WAL file should exist");
        let wal_size = wal_files[0].metadata().unwrap().len();
        assert!(wal_size > 0, "WAL file should have data (got 0 bytes)");

        // Session 2: booting over the same dirs replays the WAL, and the
        // recovered events get checkpointed into Parquet.
        {
            let config = EventStoreConfig::production(
                &storage_dir,
                &wal_dir,
                SnapshotConfig::default(),
                WALConfig {
                    sync_on_write: true,
                    ..WALConfig::default()
                },
                CompactionConfig::default(),
            );
            let store = EventStore::with_config(config);

            assert_eq!(
                store.stats().total_events,
                5,
                "Session 2 should have all 5 events after WAL recovery"
            );

            let parquet_files = find_parquet_files(&storage_dir);
            assert!(
                !parquet_files.is_empty(),
                "Parquet file should exist after WAL checkpoint"
            );
        }

        // Session 3: with the WAL checkpointed away, boot starts empty
        // (lazy-load mode) and events are loaded per tenant on request.
        {
            let config = EventStoreConfig::production(
                &storage_dir,
                &wal_dir,
                SnapshotConfig::default(),
                WALConfig {
                    sync_on_write: true,
                    ..WALConfig::default()
                },
                CompactionConfig::default(),
            );
            let store = EventStore::with_config(config);

            assert_eq!(
                store.stats().total_events,
                0,
                "Session 3 boot should not pre-load Parquet (lazy-load mode)"
            );

            store.ensure_tenant_loaded("default").unwrap();
            assert_eq!(
                store.stats().total_events,
                5,
                "Session 3 should have all 5 events after ensure_tenant_loaded"
            );
        }
    }
4475
    /// Boots against a deliberately corrupted Parquet file, with the WAL
    /// emptied so recovery cannot mask the damage.
    ///
    /// NOTE(review): the name promises that restore errors are surfaced, but
    /// the only assertion is that boot yields 0 events — which a healthy
    /// lazy-load boot also yields (see the WAL recovery test). Presumably the
    /// corruption error surfaces later via tenant loading; worth confirming
    /// and asserting on the error explicitly.
    #[test]
    fn test_parquet_restore_surfaces_errors_not_silent() {
        let data_dir = TempDir::new().unwrap();
        let storage_dir = data_dir.path().join("storage");
        let wal_dir = data_dir.path().join("wal");

        // Session 1: write 3 events and flush them to Parquet.
        {
            let config = EventStoreConfig::production(
                &storage_dir,
                &wal_dir,
                SnapshotConfig::default(),
                WALConfig {
                    sync_on_write: true,
                    ..WALConfig::default()
                },
                CompactionConfig::default(),
            );
            let store = EventStore::with_config(config);

            for i in 0..3 {
                let event = Event::from_strings(
                    "test.created".to_string(),
                    format!("entity-{i}"),
                    "default".to_string(),
                    serde_json::json!({"i": i}),
                    None,
                )
                .unwrap();
                store.ingest(&event).unwrap();
            }

            store.flush_storage().unwrap();
            assert_eq!(store.stats().total_events, 3);
        }

        let parquet_files = find_parquet_files(&storage_dir);
        assert!(!parquet_files.is_empty(), "Parquet file must exist");

        // Overwrite the Parquet file with garbage so any read of it must fail.
        std::fs::write(&parquet_files[0], b"corrupted data").unwrap();

        // Truncate every WAL file so the events cannot be recovered from it.
        for entry in std::fs::read_dir(&wal_dir).unwrap().flatten() {
            std::fs::write(entry.path(), b"").unwrap();
        }

        // Session 2: reboot against the corrupted on-disk state.
        {
            let config = EventStoreConfig::production(
                &storage_dir,
                &wal_dir,
                SnapshotConfig::default(),
                WALConfig::default(),
                CompactionConfig::default(),
            );
            let store = EventStore::with_config(config);

            assert_eq!(store.stats().total_events, 0);
        }
    }
4549
4550 fn count_wal_entries(wal_dir: &std::path::Path) -> usize {
4560 use std::io::{BufRead, BufReader};
4561 let mut total = 0usize;
4562 let Ok(entries) = std::fs::read_dir(wal_dir) else {
4563 return 0;
4564 };
4565 for entry in entries.flatten() {
4566 let path = entry.path();
4567 if path.extension().is_none_or(|e| e != "log") {
4568 continue;
4569 }
4570 let Ok(file) = std::fs::File::open(&path) else {
4571 continue;
4572 };
4573 for line in BufReader::new(file)
4574 .lines()
4575 .map_while(std::result::Result::ok)
4576 {
4577 if !line.trim().is_empty() {
4578 total += 1;
4579 }
4580 }
4581 }
4582 total
4583 }
4584
4585 #[test]
4586 fn test_checkpoint_truncates_wal_after_flush() {
4587 let data_dir = TempDir::new().unwrap();
4592 let storage_dir = data_dir.path().join("storage");
4593 let wal_dir = data_dir.path().join("wal");
4594
4595 let config = EventStoreConfig::production(
4596 &storage_dir,
4597 &wal_dir,
4598 SnapshotConfig::default(),
4599 WALConfig {
4600 sync_on_write: true,
4601 ..WALConfig::default()
4602 },
4603 CompactionConfig::default(),
4604 );
4605 let store = EventStore::with_config(config);
4606
4607 for i in 0..10 {
4608 let event = Event::from_strings(
4609 "test.created".to_string(),
4610 format!("entity-{i}"),
4611 "default".to_string(),
4612 serde_json::json!({"i": i}),
4613 None,
4614 )
4615 .unwrap();
4616 store.ingest(&event).unwrap();
4617 }
4618
4619 assert_eq!(
4621 count_wal_entries(&wal_dir),
4622 10,
4623 "WAL should have 10 events before checkpoint"
4624 );
4625
4626 store.checkpoint().unwrap();
4627
4628 assert_eq!(
4629 count_wal_entries(&wal_dir),
4630 0,
4631 "WAL should be empty after successful checkpoint"
4632 );
4633 let parquet_files = find_parquet_files(&storage_dir);
4634 assert!(!parquet_files.is_empty(), "Parquet should hold the events");
4635 }
4636
    /// After a checkpoint, a crash/restart must replay only the events written
    /// since that checkpoint (the WAL window), not the full history; the
    /// checkpointed prefix is lazily loaded from Parquet per tenant.
    #[test]
    fn test_replay_only_post_checkpoint_events_after_crash() {
        let data_dir = TempDir::new().unwrap();
        let storage_dir = data_dir.path().join("storage");
        let wal_dir = data_dir.path().join("wal");

        // Same production config for every "session"; sync_on_write keeps the
        // WAL durable so the dropped-store scopes below model a crash.
        let config_factory = || {
            EventStoreConfig::production(
                &storage_dir,
                &wal_dir,
                SnapshotConfig::default(),
                WALConfig {
                    sync_on_write: true,
                    ..WALConfig::default()
                },
                CompactionConfig::default(),
            )
        };

        // N events before the checkpoint, K after it.
        const N: usize = 50;
        const K: usize = 5;
        // Session 1: ingest N, checkpoint (truncates the WAL), ingest K more.
        {
            let store = EventStore::with_config(config_factory());
            for i in 0..N {
                store
                    .ingest(
                        &Event::from_strings(
                            "pre.checkpoint".to_string(),
                            format!("e-{i}"),
                            "default".to_string(),
                            serde_json::json!({"i": i}),
                            None,
                        )
                        .unwrap(),
                    )
                    .unwrap();
            }
            store.checkpoint().unwrap();
            assert_eq!(
                count_wal_entries(&wal_dir),
                0,
                "WAL should be empty immediately after checkpoint"
            );

            for i in 0..K {
                store
                    .ingest(
                        &Event::from_strings(
                            "post.checkpoint".to_string(),
                            format!("p-{i}"),
                            "default".to_string(),
                            serde_json::json!({"i": i}),
                            None,
                        )
                        .unwrap(),
                    )
                    .unwrap();
            }
            assert_eq!(
                count_wal_entries(&wal_dir),
                K,
                "WAL should hold only post-checkpoint events"
            );
        }

        // Session 2: boot replays just the K-entry WAL window; the N
        // checkpointed events appear only after the tenant is lazily loaded.
        {
            let store = EventStore::with_config(config_factory());
            assert_eq!(
                store.stats().total_events,
                K,
                "Boot should replay exactly K events from WAL (the post-checkpoint window), not N+K"
            );

            store.ensure_tenant_loaded("default").unwrap();
            assert_eq!(
                store.stats().total_events,
                N + K,
                "After lazy-load, both pre- and post-checkpoint events should be reachable"
            );
        }
    }
4734
4735 #[test]
4736 fn test_checkpoint_is_idempotent() {
4737 let data_dir = TempDir::new().unwrap();
4740 let storage_dir = data_dir.path().join("storage");
4741 let wal_dir = data_dir.path().join("wal");
4742
4743 let store = EventStore::with_config(EventStoreConfig::production(
4744 &storage_dir,
4745 &wal_dir,
4746 SnapshotConfig::default(),
4747 WALConfig::default(),
4748 CompactionConfig::default(),
4749 ));
4750
4751 for i in 0..5 {
4752 store
4753 .ingest(
4754 &Event::from_strings(
4755 "x".to_string(),
4756 format!("e-{i}"),
4757 "default".to_string(),
4758 serde_json::json!({}),
4759 None,
4760 )
4761 .unwrap(),
4762 )
4763 .unwrap();
4764 }
4765
4766 store.checkpoint().unwrap();
4767 store.checkpoint().unwrap();
4769 assert_eq!(count_wal_entries(&wal_dir), 0);
4770 }
4771
4772 #[test]
4773 fn test_checkpoint_noop_in_memory_only_mode() {
4774 let store = EventStore::new();
4776 store.checkpoint().unwrap();
4777 }
4778
4779 #[test]
4780 fn test_checkpoint_interval_from_env_defaults_to_60s_when_wal_enabled() {
4781 let (config, _) = EventStoreConfig::from_env_vars(
4782 Some("/app/data".to_string()),
4783 None,
4784 None,
4785 None,
4786 None,
4787 None,
4788 None,
4789 None,
4790 );
4791 assert_eq!(config.checkpoint_interval_secs, Some(60));
4792 }
4793
4794 #[test]
4795 fn test_checkpoint_interval_from_env_overrides_default() {
4796 let (config, _) = EventStoreConfig::from_env_vars(
4797 Some("/app/data".to_string()),
4798 None,
4799 None,
4800 None,
4801 None,
4802 None,
4803 None,
4804 Some("15".to_string()),
4805 );
4806 assert_eq!(config.checkpoint_interval_secs, Some(15));
4807 }
4808
4809 #[test]
4810 fn test_checkpoint_interval_disabled_when_wal_disabled() {
4811 let (config, _) = EventStoreConfig::from_env_vars(
4813 Some("/app/data".to_string()),
4814 None,
4815 None,
4816 Some("false".to_string()),
4817 None,
4818 None,
4819 None,
4820 Some("15".to_string()),
4821 );
4822 assert_eq!(config.checkpoint_interval_secs, None);
4823 }
4824
4825 #[test]
4826 fn test_checkpoint_interval_unparseable_falls_back_to_default() {
4827 let (config, _) = EventStoreConfig::from_env_vars(
4828 Some("/app/data".to_string()),
4829 None,
4830 None,
4831 None,
4832 None,
4833 None,
4834 None,
4835 Some("not-a-number".to_string()),
4836 );
4837 assert_eq!(config.checkpoint_interval_secs, Some(60));
4838 }
4839}