1#[cfg(feature = "server")]
2use crate::application::services::webhook::WebhookRegistry;
3#[cfg(feature = "server")]
4use crate::infrastructure::observability::metrics::MetricsRegistry;
5#[cfg(feature = "server")]
6use crate::infrastructure::web::websocket::WebSocketManager;
7use crate::{
8 application::{
9 dto::QueryEventsRequest,
10 services::{
11 consumer::ConsumerRegistry,
12 exactly_once::{ExactlyOnceConfig, ExactlyOnceRegistry},
13 pipeline::PipelineManager,
14 projection::{EntitySnapshotProjection, EventCounterProjection, ProjectionManager},
15 replay::ReplayManager,
16 schema::{SchemaRegistry, SchemaRegistryConfig},
17 schema_evolution::SchemaEvolutionManager,
18 },
19 },
20 domain::entities::Event,
21 error::{AllSourceError, Result},
22 infrastructure::{
23 persistence::{
24 compaction::{CompactionConfig, CompactionManager},
25 index::{EventIndex, IndexEntry},
26 snapshot::{SnapshotConfig, SnapshotManager, SnapshotType},
27 storage::ParquetStorage,
28 wal::{WALConfig, WriteAheadLog},
29 },
30 query::geospatial::GeoIndex,
31 },
32};
33use chrono::{DateTime, Utc};
34use dashmap::DashMap;
35use parking_lot::RwLock;
36use std::{path::PathBuf, sync::Arc};
37#[cfg(feature = "server")]
38use tokio::sync::mpsc;
39
/// Central event store: an append-only in-memory log with secondary indexes,
/// synchronous projections, optional durability (WAL + Parquet), and
/// server-only fan-out (websockets, webhooks, metrics).
pub struct EventStore {
    /// Append-ordered in-memory event log; a vector position is the event's
    /// offset, which the index entries refer back to.
    events: Arc<RwLock<Vec<Event>>>,

    /// Secondary lookups over `events` offsets (by id / entity / type / time).
    index: Arc<EventIndex>,

    /// Registered projections, run synchronously on every ingested event.
    pub(crate) projections: Arc<RwLock<ProjectionManager>>,

    /// Optional Parquet persistence; `None` when disabled or when
    /// initialization failed (store then runs in-memory only).
    storage: Option<Arc<RwLock<ParquetStorage>>>,

    /// Broadcast hub for live websocket subscribers (server builds only).
    #[cfg(feature = "server")]
    websocket_manager: Arc<WebSocketManager>,

    /// Creates and serves per-entity state snapshots.
    snapshot_manager: Arc<SnapshotManager>,

    /// Optional write-ahead log; appended to before in-memory mutation.
    wal: Option<Arc<WriteAheadLog>>,

    /// Optional compaction over the Parquet storage directory.
    compaction_manager: Option<Arc<CompactionManager>>,

    /// Event schema registration/validation.
    schema_registry: Arc<SchemaRegistry>,

    /// Historical event replay sessions.
    replay_manager: Arc<ReplayManager>,

    /// User-defined per-event processing pipelines.
    pipeline_manager: Arc<PipelineManager>,

    /// Prometheus metrics registry (server builds only).
    #[cfg(feature = "server")]
    metrics: Arc<MetricsRegistry>,

    /// Monotonic count of events ingested over the store's lifetime.
    total_ingested: Arc<RwLock<u64>>,

    /// Cached projection state, keyed "<projection_name>:<key>"
    /// (see `reset_projection`, which clears by that prefix).
    projection_state_cache: Arc<DashMap<String, serde_json::Value>>,

    /// Free-form status string per projection name.
    projection_status: Arc<DashMap<String, String>>,

    /// Registered webhook subscriptions (server builds only).
    #[cfg(feature = "server")]
    webhook_registry: Arc<WebhookRegistry>,

    /// Sender side of the webhook delivery worker channel; `None` until
    /// wired up via `set_webhook_tx`.
    #[cfg(feature = "server")]
    webhook_tx: Arc<RwLock<Option<mpsc::UnboundedSender<WebhookDeliveryTask>>>>,

    /// Geospatial index maintained from ingested events.
    geo_index: Arc<GeoIndex>,

    /// Exactly-once ingestion bookkeeping (dedup registry).
    exactly_once: Arc<ExactlyOnceRegistry>,

    /// Observes payloads per event type to track schema drift.
    schema_evolution: Arc<SchemaEvolutionManager>,

    /// Per-entity version counters used for optimistic concurrency.
    entity_versions: Arc<DashMap<String, u64>>,

    /// Consumer-group/offset tracking for pull-based consumers.
    consumer_registry: Arc<ConsumerRegistry>,
}
116
/// One unit of webhook work: a matched subscription plus the event to
/// deliver. Produced by `EventStore::dispatch_webhooks` and consumed by the
/// delivery worker behind the `webhook_tx` channel.
#[cfg(feature = "server")]
#[derive(Debug, Clone)]
pub struct WebhookDeliveryTask {
    /// The subscription whose filters matched the event.
    pub webhook: crate::application::services::webhook::WebhookSubscription,
    /// The event to deliver (cloned per matching subscription).
    pub event: Event,
}
124
125impl EventStore {
126 pub fn new() -> Self {
128 Self::with_config(EventStoreConfig::default())
129 }
130
    /// Builds a fully wired `EventStore` from `config`, then replays any
    /// persisted state: first all events from Parquet storage, then any
    /// WAL entries not yet present in the log.
    ///
    /// Optional subsystems (Parquet storage, WAL, compaction) are enabled
    /// only when the corresponding config fields are set. Storage/WAL
    /// initialization failures are logged and degrade the store to
    /// in-memory-only operation rather than aborting construction.
    pub fn with_config(config: EventStoreConfig) -> Self {
        let mut projections = ProjectionManager::new();

        // Built-in projections registered on every store.
        projections.register(Arc::new(EntitySnapshotProjection::new("entity_snapshots")));
        projections.register(Arc::new(EventCounterProjection::new("event_counters")));

        // Parquet persistence is best-effort: a failed init logs the error
        // and leaves `storage` as `None`.
        let storage = config
            .storage_dir
            .as_ref()
            .and_then(|dir| match ParquetStorage::new(dir) {
                Ok(storage) => {
                    tracing::info!("✅ Parquet persistence enabled at: {}", dir.display());
                    Some(Arc::new(RwLock::new(storage)))
                }
                Err(e) => {
                    tracing::error!("❌ Failed to initialize Parquet storage: {}", e);
                    None
                }
            });

        // Write-ahead log, likewise best-effort.
        let wal = config.wal_dir.as_ref().and_then(|dir| {
            match WriteAheadLog::new(dir, config.wal_config.clone()) {
                Ok(wal) => {
                    tracing::info!("✅ WAL enabled at: {}", dir.display());
                    Some(Arc::new(wal))
                }
                Err(e) => {
                    tracing::error!("❌ Failed to initialize WAL: {}", e);
                    None
                }
            }
        });

        // Compaction shares the Parquet storage directory.
        let compaction_manager = config.storage_dir.as_ref().map(|dir| {
            let manager = CompactionManager::new(dir, config.compaction_config.clone());
            Arc::new(manager)
        });

        let schema_registry = Arc::new(SchemaRegistry::new(config.schema_registry_config.clone()));
        tracing::info!("✅ Schema registry enabled");

        let replay_manager = Arc::new(ReplayManager::new());
        tracing::info!("✅ Replay manager enabled");

        let pipeline_manager = Arc::new(PipelineManager::new());
        tracing::info!("✅ Pipeline manager enabled");

        #[cfg(feature = "server")]
        let metrics = {
            let m = MetricsRegistry::new();
            tracing::info!("✅ Prometheus metrics registry initialized");
            m
        };

        let projection_state_cache = Arc::new(DashMap::new());
        tracing::info!("✅ Projection state cache initialized");

        #[cfg(feature = "server")]
        let webhook_registry = {
            let w = Arc::new(WebhookRegistry::new());
            tracing::info!("✅ Webhook registry initialized");
            w
        };

        let store = Self {
            events: Arc::new(RwLock::new(Vec::new())),
            index: Arc::new(EventIndex::new()),
            projections: Arc::new(RwLock::new(projections)),
            storage,
            #[cfg(feature = "server")]
            websocket_manager: Arc::new(WebSocketManager::new()),
            snapshot_manager: Arc::new(SnapshotManager::new(config.snapshot_config)),
            wal,
            compaction_manager,
            schema_registry,
            replay_manager,
            pipeline_manager,
            #[cfg(feature = "server")]
            metrics,
            total_ingested: Arc::new(RwLock::new(0)),
            projection_state_cache,
            projection_status: Arc::new(DashMap::new()),
            #[cfg(feature = "server")]
            webhook_registry,
            #[cfg(feature = "server")]
            webhook_tx: Arc::new(RwLock::new(None)),
            geo_index: Arc::new(GeoIndex::new()),
            exactly_once: Arc::new(ExactlyOnceRegistry::new(ExactlyOnceConfig::default())),
            schema_evolution: Arc::new(SchemaEvolutionManager::new()),
            entity_versions: Arc::new(DashMap::new()),
            consumer_registry: Arc::new(ConsumerRegistry::new()),
        };

        // Phase 1: reload everything previously flushed to Parquet, rebuilding
        // the index, projections and per-entity version counters as we go.
        if let Some(ref storage) = store.storage {
            match storage.read().load_all_events() {
                Err(e) => {
                    tracing::error!(
                        "❌ Failed to load events from Parquet — data exists on disk but \
                        could not be read. This means events are NOT available until the \
                        issue is resolved. Error: {e}"
                    );
                }
                Ok(persisted_events) if persisted_events.is_empty() => {
                    tracing::info!("📂 No persisted events found in Parquet storage");
                }
                Ok(persisted_events) => {
                    tracing::info!("📂 Loading {} persisted events...", persisted_events.len());

                    for event in persisted_events {
                        // The next offset is the current log length.
                        let offset = store.events.read().len();
                        if let Err(e) = store.index.index_event(
                            event.id,
                            event.entity_id_str(),
                            event.event_type_str(),
                            event.timestamp,
                            offset,
                        ) {
                            tracing::error!("Failed to re-index event {}: {}", event.id, e);
                        }

                        if let Err(e) = store.projections.read().process_event(&event) {
                            tracing::error!("Failed to re-process event {}: {}", event.id, e);
                        }

                        // Rebuild the optimistic-concurrency counter.
                        *store
                            .entity_versions
                            .entry(event.entity_id_str().to_string())
                            .or_insert(0) += 1;

                        store.events.write().push(event);
                    }

                    let total = store.events.read().len();
                    *store.total_ingested.write() = total as u64;
                    tracing::info!("✅ Successfully loaded {} events from storage", total);
                }
            }
        }

        // Phase 2: recover WAL entries that never made it into Parquet,
        // skipping any event id already loaded in phase 1.
        if let Some(ref wal) = store.wal {
            match wal.recover() {
                Ok(recovered_events) if !recovered_events.is_empty() => {
                    // Dedup set of everything already in the log.
                    let existing_ids: std::collections::HashSet<uuid::Uuid> =
                        store.events.read().iter().map(|e| e.id).collect();

                    let mut wal_new = 0usize;
                    for event in recovered_events {
                        if existing_ids.contains(&event.id) {
                            continue;
                        }

                        let offset = store.events.read().len();
                        if let Err(e) = store.index.index_event(
                            event.id,
                            event.entity_id_str(),
                            event.event_type_str(),
                            event.timestamp,
                            offset,
                        ) {
                            tracing::error!("Failed to re-index WAL event {}: {}", event.id, e);
                        }

                        if let Err(e) = store.projections.read().process_event(&event) {
                            tracing::error!("Failed to re-process WAL event {}: {}", event.id, e);
                        }

                        *store
                            .entity_versions
                            .entry(event.entity_id_str().to_string())
                            .or_insert(0) += 1;

                        store.events.write().push(event);
                        wal_new += 1;
                    }

                    if wal_new > 0 {
                        let total = store.events.read().len();
                        *store.total_ingested.write() = total as u64;
                        tracing::info!(
                            "✅ Recovered {} new events from WAL ({} total)",
                            wal_new,
                            total
                        );

                        // Checkpoint the recovered tail into Parquet so the
                        // WAL can be truncated. Truncation only happens after
                        // a successful flush.
                        if let Some(ref storage) = store.storage {
                            tracing::info!(
                                "📸 Checkpointing {} WAL events to Parquet storage...",
                                wal_new
                            );
                            let parquet = storage.read();
                            let events = store.events.read();
                            let mut buffered = 0usize;
                            // The recovered events are the last `wal_new`
                            // entries of the log.
                            for event in events.iter().skip(events.len() - wal_new) {
                                if let Err(e) = parquet.append_event(event.clone()) {
                                    tracing::error!(
                                        "Failed to buffer WAL event for Parquet: {}",
                                        e
                                    );
                                } else {
                                    buffered += 1;
                                }
                            }
                            drop(events);
                            drop(parquet);

                            if buffered > 0 {
                                if let Err(e) = store.flush_storage() {
                                    tracing::error!("Failed to checkpoint to Parquet: {}", e);
                                } else if let Err(e) = wal.truncate() {
                                    tracing::error!(
                                        "Failed to truncate WAL after checkpoint: {}",
                                        e
                                    );
                                } else {
                                    tracing::info!(
                                        "✅ WAL checkpointed and truncated ({} events)",
                                        buffered
                                    );
                                }
                            }
                        }
                    }
                }
                Ok(_) => {
                    tracing::debug!("No events to recover from WAL");
                }
                Err(e) => {
                    tracing::error!("❌ WAL recovery failed: {}", e);
                }
            }
        }

        store
    }
386
    /// Ingests `event` with optimistic concurrency control.
    ///
    /// When `expected_version` is `Some`, the entity's current version must
    /// match it exactly or `VersionConflict` is returned before anything is
    /// written. Returns the entity's new version on success.
    #[cfg_attr(feature = "hotpath", hotpath::measure)]
    pub fn ingest_with_expected_version(
        &self,
        event: &Event,
        expected_version: Option<u64>,
    ) -> Result<u64> {
        self.validate_event(event)?;

        let entity_id = event.entity_id_str().to_string();

        let new_version = {
            // Holding the DashMap entry guard serializes concurrent writers
            // on the same entity across the version check + WAL append.
            let mut version_entry = self.entity_versions.entry(entity_id.clone()).or_insert(0);
            let current = *version_entry;

            if let Some(expected) = expected_version
                && current != expected
            {
                return Err(crate::error::AllSourceError::VersionConflict { expected, current });
            }

            // WAL append happens inside the guard, so the version bump below
            // only occurs for events that were durably logged.
            if let Some(ref wal) = self.wal {
                wal.append(event.clone())?;
            }

            *version_entry += 1;
            *version_entry
        };

        // NOTE(review): if this fails, the version counter and WAL have
        // already advanced — confirm startup recovery reconciles that.
        self.ingest_post_wal(event)?;

        Ok(new_version)
    }
432
    /// Applies an already-validated (and, when enabled, WAL-persisted) event
    /// to every in-memory structure: index, projections, pipelines, Parquet
    /// buffer, the event log, fan-out (websockets/webhooks), and metrics.
    ///
    /// Called by `ingest_with_expected_version` after its WAL append and
    /// version bump succeed. Does not touch `entity_versions` itself.
    #[cfg_attr(feature = "hotpath", hotpath::measure)]
    fn ingest_post_wal(&self, event: &Event) -> Result<()> {
        #[cfg(feature = "server")]
        let timer = self.metrics.ingestion_duration_seconds.start_timer();

        // The write lock on `events` is held across indexing, projections,
        // pipelines and storage so `offset` stays consistent with the final
        // position of the pushed event.
        let mut events = self.events.write();
        let offset = events.len();

        self.index.index_event(
            event.id,
            event.entity_id_str(),
            event.event_type_str(),
            event.timestamp,
            offset,
        )?;

        let projections = self.projections.read();
        projections.process_event(event)?;
        drop(projections);

        let pipeline_results = self.pipeline_manager.process_event(event);
        if !pipeline_results.is_empty() {
            tracing::debug!(
                "Event {} processed by {} pipeline(s)",
                event.id,
                pipeline_results.len()
            );
            for (pipeline_id, result) in pipeline_results {
                tracing::trace!("Pipeline {} result: {:?}", pipeline_id, result);
            }
        }

        // Appends to the Parquet buffer; flushing is a separate operation
        // (see `flush_storage`).
        if let Some(ref storage) = self.storage {
            let storage = storage.read();
            storage.append_event(event.clone())?;
        }

        events.push(event.clone());
        let total_events = events.len();
        drop(events);

        // Fan-out only after the log lock is released.
        #[cfg(feature = "server")]
        self.websocket_manager
            .broadcast_event(Arc::new(event.clone()));

        #[cfg(feature = "server")]
        self.dispatch_webhooks(event);

        self.geo_index.index_event(event);

        self.schema_evolution
            .analyze_event(event.event_type_str(), &event.payload);

        self.check_auto_snapshot(event.entity_id_str(), event);

        #[cfg(feature = "server")]
        {
            self.metrics.events_ingested_total.inc();
            self.metrics
                .events_ingested_by_type
                .with_label_values(&[event.event_type_str()])
                .inc();
            self.metrics.storage_events_total.set(total_events as i64);
        }

        let mut total = self.total_ingested.write();
        *total += 1;

        #[cfg(feature = "server")]
        timer.observe_duration();

        tracing::debug!("Event ingested: {} (offset: {})", event.id, offset);

        Ok(())
    }
522
523 #[cfg_attr(feature = "hotpath", hotpath::measure)]
525 pub fn ingest(&self, event: &Event) -> Result<()> {
526 #[cfg(feature = "server")]
528 let timer = self.metrics.ingestion_duration_seconds.start_timer();
529
530 let validation_result = self.validate_event(event);
532 if let Err(e) = validation_result {
533 #[cfg(feature = "server")]
534 {
535 self.metrics.ingestion_errors_total.inc();
536 timer.observe_duration();
537 }
538 return Err(e);
539 }
540
541 if let Some(ref wal) = self.wal
544 && let Err(e) = wal.append(event.clone())
545 {
546 #[cfg(feature = "server")]
547 {
548 self.metrics.ingestion_errors_total.inc();
549 timer.observe_duration();
550 }
551 return Err(e);
552 }
553
554 *self
556 .entity_versions
557 .entry(event.entity_id_str().to_string())
558 .or_insert(0) += 1;
559
560 let mut events = self.events.write();
561 let offset = events.len();
562
563 self.index.index_event(
565 event.id,
566 event.entity_id_str(),
567 event.event_type_str(),
568 event.timestamp,
569 offset,
570 )?;
571
572 let projections = self.projections.read();
574 projections.process_event(event)?;
575 drop(projections); let pipeline_results = self.pipeline_manager.process_event(event);
580 if !pipeline_results.is_empty() {
581 tracing::debug!(
582 "Event {} processed by {} pipeline(s)",
583 event.id,
584 pipeline_results.len()
585 );
586 for (pipeline_id, result) in pipeline_results {
589 tracing::trace!("Pipeline {} result: {:?}", pipeline_id, result);
590 }
591 }
592
593 if let Some(ref storage) = self.storage {
595 let storage = storage.read();
596 storage.append_event(event.clone())?;
597 }
598
599 events.push(event.clone());
601 let total_events = events.len();
602 drop(events); #[cfg(feature = "server")]
606 self.websocket_manager
607 .broadcast_event(Arc::new(event.clone()));
608
609 #[cfg(feature = "server")]
611 self.dispatch_webhooks(event);
612
613 self.geo_index.index_event(event);
615
616 self.schema_evolution
618 .analyze_event(event.event_type_str(), &event.payload);
619
620 self.check_auto_snapshot(event.entity_id_str(), event);
622
623 #[cfg(feature = "server")]
625 {
626 self.metrics.events_ingested_total.inc();
627 self.metrics
628 .events_ingested_by_type
629 .with_label_values(&[event.event_type_str()])
630 .inc();
631 self.metrics.storage_events_total.set(total_events as i64);
632 }
633
634 let mut total = self.total_ingested.write();
636 *total += 1;
637
638 #[cfg(feature = "server")]
639 timer.observe_duration();
640
641 tracing::debug!("Event ingested: {} (offset: {})", event.id, offset);
642
643 Ok(())
644 }
645
646 #[cfg_attr(feature = "hotpath", hotpath::measure)]
653 pub fn ingest_batch(&self, batch: Vec<Event>) -> Result<()> {
654 if batch.is_empty() {
655 return Ok(());
656 }
657
658 for event in &batch {
660 self.validate_event(event)?;
661 }
662
663 if let Some(ref wal) = self.wal {
665 for event in &batch {
666 wal.append(event.clone())?;
667 }
668 }
669
670 let mut events = self.events.write();
672 let projections = self.projections.read();
673
674 for event in batch {
675 let offset = events.len();
676
677 self.index.index_event(
678 event.id,
679 event.entity_id_str(),
680 event.event_type_str(),
681 event.timestamp,
682 offset,
683 )?;
684
685 projections.process_event(&event)?;
686 self.pipeline_manager.process_event(&event);
687
688 if let Some(ref storage) = self.storage {
689 let storage = storage.read();
690 storage.append_event(event.clone())?;
691 }
692
693 self.geo_index.index_event(&event);
694 self.schema_evolution
695 .analyze_event(event.event_type_str(), &event.payload);
696
697 *self
699 .entity_versions
700 .entry(event.entity_id_str().to_string())
701 .or_insert(0) += 1;
702
703 events.push(event);
704 }
705
706 let total_events = events.len();
707 drop(projections);
708 drop(events);
709
710 let mut total = self.total_ingested.write();
711 *total += total_events as u64;
712
713 Ok(())
714 }
715
    /// Ingests an event received from a replication peer.
    ///
    /// Unlike `ingest`, this path performs no validation, no WAL append, no
    /// Parquet write, no webhook/geo/schema-evolution processing and no
    /// auto-snapshot check — presumably because the originating node already
    /// handled those (TODO confirm against the replication caller).
    #[cfg_attr(feature = "hotpath", hotpath::measure)]
    pub fn ingest_replicated(&self, event: &Event) -> Result<()> {
        #[cfg(feature = "server")]
        let timer = self.metrics.ingestion_duration_seconds.start_timer();

        // Lock held across index/projection updates so `offset` matches the
        // pushed position.
        let mut events = self.events.write();
        let offset = events.len();

        self.index.index_event(
            event.id,
            event.entity_id_str(),
            event.event_type_str(),
            event.timestamp,
            offset,
        )?;

        let projections = self.projections.read();
        projections.process_event(event)?;
        drop(projections);

        let pipeline_results = self.pipeline_manager.process_event(event);
        if !pipeline_results.is_empty() {
            tracing::debug!(
                "Replicated event {} processed by {} pipeline(s)",
                event.id,
                pipeline_results.len()
            );
        }

        // Keep the optimistic-concurrency counter in sync with the peer.
        *self
            .entity_versions
            .entry(event.entity_id_str().to_string())
            .or_insert(0) += 1;

        events.push(event.clone());
        let total_events = events.len();
        drop(events);

        // Local websocket subscribers still get replicated events.
        #[cfg(feature = "server")]
        self.websocket_manager
            .broadcast_event(Arc::new(event.clone()));

        #[cfg(feature = "server")]
        {
            self.metrics.events_ingested_total.inc();
            self.metrics
                .events_ingested_by_type
                .with_label_values(&[event.event_type_str()])
                .inc();
            self.metrics.storage_events_total.set(total_events as i64);
        }

        let mut total = self.total_ingested.write();
        *total += 1;

        #[cfg(feature = "server")]
        timer.observe_duration();

        tracing::debug!(
            "Replicated event ingested: {} (offset: {})",
            event.id,
            offset
        );

        Ok(())
    }
795
796 #[cfg_attr(feature = "hotpath", hotpath::measure)]
799 pub fn get_entity_version(&self, entity_id: &str) -> u64 {
800 self.entity_versions.get(entity_id).map_or(0, |v| *v)
801 }
802
803 pub fn consumer_registry(&self) -> &ConsumerRegistry {
805 &self.consumer_registry
806 }
807
    /// Replaces the consumer registry. Requires `&mut self`, so this is only
    /// usable before the store is shared (e.g. wrapped in an `Arc`).
    pub fn set_consumer_registry(&mut self, registry: Arc<ConsumerRegistry>) {
        self.consumer_registry = registry;
    }
815
816 pub fn total_events(&self) -> usize {
818 self.events.read().len()
819 }
820
821 pub fn events_after_offset(
824 &self,
825 offset: u64,
826 filters: &[String],
827 limit: usize,
828 ) -> Vec<(u64, Event)> {
829 let events = self.events.read();
830 let start = offset as usize;
831 if start >= events.len() {
832 return vec![];
833 }
834
835 events[start..]
836 .iter()
837 .enumerate()
838 .filter(|(_, event)| ConsumerRegistry::matches_filters(event.event_type_str(), filters))
839 .take(limit)
840 .map(|(i, event)| ((start + i + 1) as u64, event.clone()))
841 .collect()
842 }
843
844 #[cfg(feature = "server")]
846 pub fn websocket_manager(&self) -> Arc<WebSocketManager> {
847 Arc::clone(&self.websocket_manager)
848 }
849
850 pub fn snapshot_manager(&self) -> Arc<SnapshotManager> {
852 Arc::clone(&self.snapshot_manager)
853 }
854
855 pub fn compaction_manager(&self) -> Option<Arc<CompactionManager>> {
857 self.compaction_manager.as_ref().map(Arc::clone)
858 }
859
860 pub fn schema_registry(&self) -> Arc<SchemaRegistry> {
862 Arc::clone(&self.schema_registry)
863 }
864
865 pub fn replay_manager(&self) -> Arc<ReplayManager> {
867 Arc::clone(&self.replay_manager)
868 }
869
870 pub fn pipeline_manager(&self) -> Arc<PipelineManager> {
872 Arc::clone(&self.pipeline_manager)
873 }
874
875 #[cfg(feature = "server")]
877 pub fn metrics(&self) -> Arc<MetricsRegistry> {
878 Arc::clone(&self.metrics)
879 }
880
    /// Acquires and returns a read guard over the projection manager.
    /// Callers should drop the guard promptly: ingestion takes this same
    /// lock on every event.
    pub fn projection_manager(&self) -> parking_lot::RwLockReadGuard<'_, ProjectionManager> {
        self.projections.read()
    }
885
886 pub fn register_projection(
895 &self,
896 projection: Arc<dyn crate::application::services::projection::Projection>,
897 ) {
898 let mut pm = self.projections.write();
899 pm.register(projection);
900 }
901
902 pub fn register_projection_with_backfill(
908 &self,
909 projection: &Arc<dyn crate::application::services::projection::Projection>,
910 ) -> Result<()> {
911 {
913 let mut pm = self.projections.write();
914 pm.register(Arc::clone(projection));
915 }
916
917 let events = self.events.read();
919 for event in events.iter() {
920 projection.process(event)?;
921 }
922
923 Ok(())
924 }
925
926 pub fn projection_state_cache(&self) -> Arc<DashMap<String, serde_json::Value>> {
929 Arc::clone(&self.projection_state_cache)
930 }
931
932 pub fn projection_status(&self) -> Arc<DashMap<String, String>> {
934 Arc::clone(&self.projection_status)
935 }
936
937 pub fn geo_index(&self) -> Arc<GeoIndex> {
940 self.geo_index.clone()
941 }
942
943 pub fn exactly_once(&self) -> Arc<ExactlyOnceRegistry> {
945 self.exactly_once.clone()
946 }
947
948 pub fn schema_evolution(&self) -> Arc<SchemaEvolutionManager> {
950 self.schema_evolution.clone()
951 }
952
953 pub fn snapshot_events(&self) -> Vec<Event> {
959 self.events.read().clone()
960 }
961
    /// Replaces all events of `token_event_type` for `entity_id` with a
    /// single pre-merged event, then rebuilds the whole index (offsets shift
    /// after removal).
    ///
    /// Returns `Ok(false)` when the entity has no such token events (nothing
    /// to compact), `Ok(true)` on success.
    pub fn compact_entity_tokens(
        &self,
        entity_id: &str,
        token_event_type: &str,
        merged_event: Event,
    ) -> Result<bool> {
        // Cheap pre-check under a read lock; bail out before mutating.
        {
            let events = self.events.read();
            let has_tokens = events
                .iter()
                .any(|e| e.entity_id_str() == entity_id && e.event_type_str() == token_event_type);
            if !has_tokens {
                return Ok(false);
            }
        }

        // Let projections observe the merged event before the raw token
        // events disappear from the log.
        let projections = self.projections.read();
        projections.process_event(&merged_event)?;
        drop(projections);

        let mut events = self.events.write();

        // Remove the individual token events, then append the merged one.
        events.retain(|e| {
            !(e.entity_id_str() == entity_id && e.event_type_str() == token_event_type)
        });

        events.push(merged_event.clone());

        // Durability for the merged event. NOTE(review): the removed token
        // events are not deleted from WAL/Parquet here — confirm replay
        // after restart tolerates seeing them again.
        if let Some(ref wal) = self.wal {
            wal.append(merged_event)?;
        }

        // Offsets shifted after retain(), so rebuild the index from scratch.
        self.index.clear();
        for (offset, event) in events.iter().enumerate() {
            if let Err(e) = self.index.index_event(
                event.id,
                event.entity_id_str(),
                event.event_type_str(),
                event.timestamp,
                offset,
            ) {
                tracing::warn!(
                    event_id = %event.id,
                    offset,
                    "Failed to re-index event during compaction: {e}"
                );
            }
        }

        Ok(true)
    }
1044
1045 #[cfg(feature = "server")]
1046 pub fn webhook_registry(&self) -> Arc<WebhookRegistry> {
1047 Arc::clone(&self.webhook_registry)
1048 }
1049
1050 #[cfg(feature = "server")]
1053 pub fn set_webhook_tx(&self, tx: mpsc::UnboundedSender<WebhookDeliveryTask>) {
1054 *self.webhook_tx.write() = Some(tx);
1055 tracing::info!("Webhook delivery channel connected");
1056 }
1057
1058 #[cfg(feature = "server")]
1060 fn dispatch_webhooks(&self, event: &Event) {
1061 let matching = self.webhook_registry.find_matching(event);
1062 if matching.is_empty() {
1063 return;
1064 }
1065
1066 let tx_guard = self.webhook_tx.read();
1067 if let Some(ref tx) = *tx_guard {
1068 for webhook in matching {
1069 let task = WebhookDeliveryTask {
1070 webhook,
1071 event: event.clone(),
1072 };
1073 if let Err(e) = tx.send(task) {
1074 tracing::warn!("Failed to queue webhook delivery: {}", e);
1075 }
1076 }
1077 }
1078 }
1079
1080 pub fn flush_storage(&self) -> Result<()> {
1082 if let Some(ref storage) = self.storage {
1083 let storage = storage.read();
1084 storage.flush()?;
1085 tracing::info!("✅ Flushed events to persistent storage");
1086 }
1087 Ok(())
1088 }
1089
    /// Builds and stores a manual snapshot for `entity_id` by shallow-merging
    /// every event payload (later events overwrite earlier top-level keys).
    ///
    /// # Errors
    /// `EntityNotFound` when the entity has no events; otherwise propagates
    /// query / snapshot-manager errors.
    pub fn create_snapshot(&self, entity_id: &str) -> Result<()> {
        let events = self.query(&QueryEventsRequest {
            entity_id: Some(entity_id.to_string()),
            event_type: None,
            tenant_id: None,
            as_of: None,
            since: None,
            until: None,
            limit: None,
            event_type_prefix: None,
            payload_filter: None,
        })?;

        if events.is_empty() {
            return Err(AllSourceError::EntityNotFound(entity_id.to_string()));
        }

        // Shallow top-level merge: only object payloads contribute keys;
        // non-object payloads are skipped.
        let mut state = serde_json::json!({});
        for event in &events {
            if let serde_json::Value::Object(ref mut state_map) = state
                && let serde_json::Value::Object(ref payload_map) = event.payload
            {
                for (key, value) in payload_map {
                    state_map.insert(key.clone(), value.clone());
                }
            }
        }

        // Safe: emptiness was checked above.
        let last_event = events.last().unwrap();
        self.snapshot_manager.create_snapshot(
            entity_id,
            state,
            last_event.timestamp,
            events.len(),
            SnapshotType::Manual,
        )?;

        Ok(())
    }
1132
1133 fn check_auto_snapshot(&self, entity_id: &str, event: &Event) {
1135 let entity_event_count = self
1137 .index
1138 .get_by_entity(entity_id)
1139 .map_or(0, |entries| entries.len());
1140
1141 if self.snapshot_manager.should_create_snapshot(
1142 entity_id,
1143 entity_event_count,
1144 event.timestamp,
1145 ) {
1146 if let Err(e) = self.create_snapshot(entity_id) {
1148 tracing::warn!(
1149 "Failed to create automatic snapshot for {}: {}",
1150 entity_id,
1151 e
1152 );
1153 }
1154 }
1155 }
1156
1157 fn validate_event(&self, event: &Event) -> Result<()> {
1159 if event.entity_id_str().is_empty() {
1162 return Err(AllSourceError::ValidationError(
1163 "entity_id cannot be empty".to_string(),
1164 ));
1165 }
1166
1167 if event.event_type_str().is_empty() {
1168 return Err(AllSourceError::ValidationError(
1169 "event_type cannot be empty".to_string(),
1170 ));
1171 }
1172
1173 if event.event_type().is_system() {
1176 return Err(AllSourceError::ValidationError(
1177 "Event types starting with '_system.' are reserved for internal use".to_string(),
1178 ));
1179 }
1180
1181 Ok(())
1182 }
1183
1184 pub fn reset_projection(&self, name: &str) -> Result<usize> {
1186 let projection_manager = self.projections.read();
1187 let projection = projection_manager.get_projection(name).ok_or_else(|| {
1188 AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1189 })?;
1190
1191 projection.clear();
1193
1194 let prefix = format!("{name}:");
1196 let keys_to_remove: Vec<String> = self
1197 .projection_state_cache
1198 .iter()
1199 .filter(|entry| entry.key().starts_with(&prefix))
1200 .map(|entry| entry.key().clone())
1201 .collect();
1202 for key in keys_to_remove {
1203 self.projection_state_cache.remove(&key);
1204 }
1205
1206 let events = self.events.read();
1208 let mut reprocessed = 0usize;
1209 for event in events.iter() {
1210 if projection.process(event).is_ok() {
1211 reprocessed += 1;
1212 }
1213 }
1214
1215 Ok(reprocessed)
1216 }
1217
1218 pub fn get_event_by_id(&self, event_id: &uuid::Uuid) -> Result<Option<Event>> {
1220 if let Some(offset) = self.index.get_by_id(event_id) {
1221 let events = self.events.read();
1222 Ok(events.get(offset).cloned())
1223 } else {
1224 Ok(None)
1225 }
1226 }
1227
1228 #[cfg_attr(feature = "hotpath", hotpath::measure)]
1230 pub fn query(&self, request: &QueryEventsRequest) -> Result<Vec<Event>> {
1231 let query_type = if request.entity_id.is_some() {
1233 "entity"
1234 } else if request.event_type.is_some() {
1235 "type"
1236 } else if request.event_type_prefix.is_some() {
1237 "type_prefix"
1238 } else {
1239 "full_scan"
1240 };
1241
1242 #[cfg(feature = "server")]
1244 let timer = self
1245 .metrics
1246 .query_duration_seconds
1247 .with_label_values(&[query_type])
1248 .start_timer();
1249
1250 #[cfg(feature = "server")]
1252 self.metrics
1253 .queries_total
1254 .with_label_values(&[query_type])
1255 .inc();
1256
1257 let events = self.events.read();
1258
1259 let offsets: Vec<usize> = if let Some(entity_id) = &request.entity_id {
1261 self.index
1263 .get_by_entity(entity_id)
1264 .map(|entries| self.filter_entries(entries, request))
1265 .unwrap_or_default()
1266 } else if let Some(event_type) = &request.event_type {
1267 self.index
1269 .get_by_type(event_type)
1270 .map(|entries| self.filter_entries(entries, request))
1271 .unwrap_or_default()
1272 } else if let Some(prefix) = &request.event_type_prefix {
1273 let entries = self.index.get_by_type_prefix(prefix);
1275 self.filter_entries(entries, request)
1276 } else {
1277 (0..events.len()).collect()
1279 };
1280
1281 let mut results: Vec<Event> = offsets
1283 .iter()
1284 .filter_map(|&offset| events.get(offset).cloned())
1285 .filter(|event| self.apply_filters(event, request))
1286 .collect();
1287
1288 results.sort_by_key(|x| x.timestamp);
1290
1291 if let Some(limit) = request.limit {
1293 results.truncate(limit);
1294 }
1295
1296 #[cfg(feature = "server")]
1298 {
1299 self.metrics
1300 .query_results_total
1301 .with_label_values(&[query_type])
1302 .inc_by(results.len() as u64);
1303 timer.observe_duration();
1304 }
1305
1306 Ok(results)
1307 }
1308
1309 #[cfg_attr(feature = "hotpath", hotpath::measure)]
1311 fn filter_entries(&self, entries: Vec<IndexEntry>, request: &QueryEventsRequest) -> Vec<usize> {
1312 entries
1313 .into_iter()
1314 .filter(|entry| {
1315 if let Some(as_of) = request.as_of
1317 && entry.timestamp > as_of
1318 {
1319 return false;
1320 }
1321 if let Some(since) = request.since
1322 && entry.timestamp < since
1323 {
1324 return false;
1325 }
1326 if let Some(until) = request.until
1327 && entry.timestamp > until
1328 {
1329 return false;
1330 }
1331 true
1332 })
1333 .map(|entry| entry.offset)
1334 .collect()
1335 }
1336
1337 #[cfg_attr(feature = "hotpath", hotpath::measure)]
1339 fn apply_filters(&self, event: &Event, request: &QueryEventsRequest) -> bool {
1340 if request.entity_id.is_some()
1342 && let Some(ref event_type) = request.event_type
1343 && event.event_type_str() != event_type
1344 {
1345 return false;
1346 }
1347
1348 if request.entity_id.is_some()
1350 && let Some(ref prefix) = request.event_type_prefix
1351 && !event.event_type_str().starts_with(prefix)
1352 {
1353 return false;
1354 }
1355
1356 if let Some(ref filter_str) = request.payload_filter
1358 && let Ok(filter_obj) =
1359 serde_json::from_str::<serde_json::Map<String, serde_json::Value>>(filter_str)
1360 {
1361 let payload = event.payload();
1362 for (key, expected_value) in &filter_obj {
1363 match payload.get(key) {
1364 Some(actual_value) if actual_value == expected_value => {}
1365 _ => return false,
1366 }
1367 }
1368 }
1369
1370 true
1371 }
1372
1373 #[cfg_attr(feature = "hotpath", hotpath::measure)]
1376 pub fn reconstruct_state(
1377 &self,
1378 entity_id: &str,
1379 as_of: Option<DateTime<Utc>>,
1380 ) -> Result<serde_json::Value> {
1381 let (merged_state, since_timestamp) = if let Some(as_of_time) = as_of {
1383 if let Some(snapshot) = self
1385 .snapshot_manager
1386 .get_snapshot_as_of(entity_id, as_of_time)
1387 {
1388 tracing::debug!(
1389 "Using snapshot from {} for entity {} (saved {} events)",
1390 snapshot.as_of,
1391 entity_id,
1392 snapshot.event_count
1393 );
1394 (snapshot.state.clone(), Some(snapshot.as_of))
1395 } else {
1396 (serde_json::json!({}), None)
1397 }
1398 } else {
1399 if let Some(snapshot) = self.snapshot_manager.get_latest_snapshot(entity_id) {
1401 tracing::debug!(
1402 "Using latest snapshot from {} for entity {}",
1403 snapshot.as_of,
1404 entity_id
1405 );
1406 (snapshot.state.clone(), Some(snapshot.as_of))
1407 } else {
1408 (serde_json::json!({}), None)
1409 }
1410 };
1411
1412 let events = self.query(&QueryEventsRequest {
1414 entity_id: Some(entity_id.to_string()),
1415 event_type: None,
1416 tenant_id: None,
1417 as_of,
1418 since: since_timestamp,
1419 until: None,
1420 limit: None,
1421 event_type_prefix: None,
1422 payload_filter: None,
1423 })?;
1424
1425 if events.is_empty() && since_timestamp.is_none() {
1427 return Err(AllSourceError::EntityNotFound(entity_id.to_string()));
1428 }
1429
1430 let mut merged_state = merged_state;
1432 for event in &events {
1433 if let serde_json::Value::Object(ref mut state_map) = merged_state
1434 && let serde_json::Value::Object(ref payload_map) = event.payload
1435 {
1436 for (key, value) in payload_map {
1437 state_map.insert(key.clone(), value.clone());
1438 }
1439 }
1440 }
1441
1442 let state = serde_json::json!({
1444 "entity_id": entity_id,
1445 "last_updated": events.last().map(|e| e.timestamp),
1446 "event_count": events.len(),
1447 "as_of": as_of,
1448 "current_state": merged_state,
1449 "history": events.iter().map(|e| {
1450 serde_json::json!({
1451 "event_id": e.id,
1452 "type": e.event_type,
1453 "timestamp": e.timestamp,
1454 "payload": e.payload
1455 })
1456 }).collect::<Vec<_>>()
1457 });
1458
1459 Ok(state)
1460 }
1461
1462 pub fn get_snapshot(&self, entity_id: &str) -> Result<serde_json::Value> {
1464 let projections = self.projections.read();
1465
1466 if let Some(snapshot_projection) = projections.get_projection("entity_snapshots")
1467 && let Some(state) = snapshot_projection.get_state(entity_id)
1468 {
1469 return Ok(serde_json::json!({
1470 "entity_id": entity_id,
1471 "snapshot": state,
1472 "from_projection": "entity_snapshots"
1473 }));
1474 }
1475
1476 Err(AllSourceError::EntityNotFound(entity_id.to_string()))
1477 }
1478
1479 pub fn stats(&self) -> StoreStats {
1481 let events = self.events.read();
1482 let index_stats = self.index.stats();
1483
1484 StoreStats {
1485 total_events: events.len(),
1486 total_entities: index_stats.total_entities,
1487 total_event_types: index_stats.total_event_types,
1488 total_ingested: *self.total_ingested.read(),
1489 }
1490 }
1491
1492 pub fn list_streams(&self) -> Vec<StreamInfo> {
1494 self.index
1495 .get_all_entities()
1496 .into_iter()
1497 .map(|entity_id| {
1498 let event_count = self
1499 .index
1500 .get_by_entity(&entity_id)
1501 .map_or(0, |entries| entries.len());
1502 let last_event_at = self
1503 .index
1504 .get_by_entity(&entity_id)
1505 .and_then(|entries| entries.last().map(|e| e.timestamp));
1506 StreamInfo {
1507 stream_id: entity_id,
1508 event_count,
1509 last_event_at,
1510 }
1511 })
1512 .collect()
1513 }
1514
1515 pub fn list_event_types(&self) -> Vec<EventTypeInfo> {
1517 self.index
1518 .get_all_types()
1519 .into_iter()
1520 .map(|event_type| {
1521 let event_count = self
1522 .index
1523 .get_by_type(&event_type)
1524 .map_or(0, |entries| entries.len());
1525 let last_event_at = self
1526 .index
1527 .get_by_type(&event_type)
1528 .and_then(|entries| entries.last().map(|e| e.timestamp));
1529 EventTypeInfo {
1530 event_type,
1531 event_count,
1532 last_event_at,
1533 }
1534 })
1535 .collect()
1536 }
1537
1538 pub fn enable_wal_replication(
1545 &self,
1546 tx: tokio::sync::broadcast::Sender<crate::infrastructure::persistence::wal::WALEntry>,
1547 ) {
1548 if let Some(ref wal_arc) = self.wal {
1549 wal_arc.set_replication_tx(tx);
1550 tracing::info!("WAL replication broadcast enabled");
1551 } else {
1552 tracing::warn!("Cannot enable WAL replication: WAL is not configured");
1553 }
1554 }
1555
    /// Write-ahead log handle, if WAL persistence was configured; `None` for
    /// in-memory / Parquet-only stores.
    pub fn wal(&self) -> Option<&Arc<WriteAheadLog>> {
        self.wal.as_ref()
    }
1561
    /// Parquet storage backend, if persistence was configured; `None` for
    /// in-memory stores.
    pub fn parquet_storage(&self) -> Option<&Arc<RwLock<ParquetStorage>>> {
        self.storage.as_ref()
    }
1567}
1568
/// Construction-time configuration for `EventStore` persistence and services.
#[derive(Debug, Clone, Default)]
pub struct EventStoreConfig {
    /// Directory for Parquet storage; `None` disables Parquet persistence.
    pub storage_dir: Option<PathBuf>,

    /// Snapshot behavior settings.
    pub snapshot_config: SnapshotConfig,

    /// Directory for the write-ahead log; `None` disables the WAL.
    pub wal_dir: Option<PathBuf>,

    /// WAL behavior settings (e.g. `sync_on_write`).
    pub wal_config: WALConfig,

    /// Compaction settings.
    pub compaction_config: CompactionConfig,

    /// Schema registry settings.
    pub schema_registry_config: SchemaRegistryConfig,

    /// Explicit root for internal system data; when `None`, callers derive it
    /// as `<storage_dir>/__system` (see `effective_system_data_dir`).
    pub system_data_dir: Option<PathBuf>,

    /// Initial tenant name — presumably auto-created at startup when set;
    /// TODO(review): confirm against the bootstrap path.
    pub bootstrap_tenant: Option<String>,
}
1599
1600impl EventStoreConfig {
1601 pub fn with_persistence(storage_dir: impl Into<PathBuf>) -> Self {
1603 Self {
1604 storage_dir: Some(storage_dir.into()),
1605 ..Self::default()
1606 }
1607 }
1608
1609 pub fn with_snapshots(snapshot_config: SnapshotConfig) -> Self {
1611 Self {
1612 snapshot_config,
1613 ..Self::default()
1614 }
1615 }
1616
1617 pub fn with_wal(wal_dir: impl Into<PathBuf>, wal_config: WALConfig) -> Self {
1619 Self {
1620 wal_dir: Some(wal_dir.into()),
1621 wal_config,
1622 ..Self::default()
1623 }
1624 }
1625
1626 pub fn with_all(storage_dir: impl Into<PathBuf>, snapshot_config: SnapshotConfig) -> Self {
1628 Self {
1629 storage_dir: Some(storage_dir.into()),
1630 snapshot_config,
1631 ..Self::default()
1632 }
1633 }
1634
1635 pub fn production(
1637 storage_dir: impl Into<PathBuf>,
1638 wal_dir: impl Into<PathBuf>,
1639 snapshot_config: SnapshotConfig,
1640 wal_config: WALConfig,
1641 compaction_config: CompactionConfig,
1642 ) -> Self {
1643 let storage_dir = storage_dir.into();
1644 let system_data_dir = storage_dir.join("__system");
1645 Self {
1646 storage_dir: Some(storage_dir),
1647 snapshot_config,
1648 wal_dir: Some(wal_dir.into()),
1649 wal_config,
1650 compaction_config,
1651 system_data_dir: Some(system_data_dir),
1652 ..Self::default()
1653 }
1654 }
1655
1656 pub fn effective_system_data_dir(&self) -> Option<PathBuf> {
1661 self.system_data_dir
1662 .clone()
1663 .or_else(|| self.storage_dir.as_ref().map(|d| d.join("__system")))
1664 }
1665
1666 pub fn from_env() -> (Self, &'static str) {
1674 Self::from_env_vars(
1675 std::env::var("ALLSOURCE_DATA_DIR")
1676 .ok()
1677 .filter(|s| !s.is_empty()),
1678 std::env::var("ALLSOURCE_STORAGE_DIR")
1679 .ok()
1680 .filter(|s| !s.is_empty()),
1681 std::env::var("ALLSOURCE_WAL_DIR")
1682 .ok()
1683 .filter(|s| !s.is_empty()),
1684 std::env::var("ALLSOURCE_WAL_ENABLED").ok(),
1685 )
1686 }
1687
1688 pub fn from_env_vars(
1690 data_dir: Option<String>,
1691 explicit_storage_dir: Option<String>,
1692 explicit_wal_dir: Option<String>,
1693 wal_enabled_var: Option<String>,
1694 ) -> (Self, &'static str) {
1695 let data_dir = data_dir.filter(|s| !s.is_empty());
1696 let storage_dir = explicit_storage_dir
1697 .filter(|s| !s.is_empty())
1698 .or_else(|| data_dir.as_ref().map(|d| format!("{d}/storage")));
1699 let wal_dir = explicit_wal_dir
1700 .filter(|s| !s.is_empty())
1701 .or_else(|| data_dir.as_ref().map(|d| format!("{d}/wal")));
1702 let wal_enabled = wal_enabled_var.is_none_or(|v| v == "true");
1703
1704 match (&storage_dir, &wal_dir) {
1705 (Some(sd), Some(wd)) if wal_enabled => {
1706 let config = Self::production(
1707 sd,
1708 wd,
1709 SnapshotConfig::default(),
1710 WALConfig::default(),
1711 CompactionConfig::default(),
1712 );
1713 (config, "wal+parquet")
1714 }
1715 (Some(sd), _) => {
1716 let config = Self::with_persistence(sd);
1717 (config, "parquet-only")
1718 }
1719 (_, Some(wd)) if wal_enabled => {
1720 let config = Self::with_wal(wd, WALConfig::default());
1721 (config, "wal-only")
1722 }
1723 _ => (Self::default(), "in-memory"),
1724 }
1725 }
1726}
1727
/// Point-in-time counters reported by `EventStore::stats`.
#[derive(Debug, serde::Serialize)]
pub struct StoreStats {
    /// Events currently held in memory.
    pub total_events: usize,
    /// Distinct entity ids known to the index.
    pub total_entities: usize,
    /// Distinct event types known to the index.
    pub total_event_types: usize,
    /// Running count of events ever ingested (monotonic).
    pub total_ingested: u64,
}
1735
/// Per-entity-stream summary returned by `EventStore::list_streams`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct StreamInfo {
    /// Entity id this stream belongs to.
    pub stream_id: String,
    /// Number of indexed events in the stream.
    pub event_count: usize,
    /// Timestamp of the stream's most recent event, if any.
    pub last_event_at: Option<chrono::DateTime<chrono::Utc>>,
}
1746
/// Per-event-type summary returned by `EventStore::list_event_types`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct EventTypeInfo {
    /// Event type name.
    pub event_type: String,
    /// Number of indexed events of this type.
    pub event_count: usize,
    /// Timestamp of the most recent event of this type, if any.
    pub last_event_at: Option<chrono::DateTime<chrono::Utc>>,
}
1757
/// `Default` yields a fully in-memory store, delegating to `EventStore::new`.
impl Default for EventStore {
    fn default() -> Self {
        Self::new()
    }
}
1763
#[cfg(test)]
mod tests {
    //! Unit tests for `EventStore`, `EventStoreConfig`, and the query path.
    use super::*;
    use crate::domain::entities::Event;
    use tempfile::TempDir;

    // Builds a minimal event in the "default" tenant with a fixed payload.
    fn create_test_event(entity_id: &str, event_type: &str) -> Event {
        Event::from_strings(
            event_type.to_string(),
            entity_id.to_string(),
            "default".to_string(),
            serde_json::json!({"name": "Test", "value": 42}),
            None,
        )
        .unwrap()
    }

    // Like `create_test_event`, but with a caller-supplied payload.
    fn create_test_event_with_payload(
        entity_id: &str,
        event_type: &str,
        payload: serde_json::Value,
    ) -> Event {
        Event::from_strings(
            event_type.to_string(),
            entity_id.to_string(),
            "default".to_string(),
            payload,
            None,
        )
        .unwrap()
    }

    // ---- Construction & ingestion ----

    #[test]
    fn test_event_store_new() {
        let store = EventStore::new();
        assert_eq!(store.stats().total_events, 0);
        assert_eq!(store.stats().total_entities, 0);
    }

    #[test]
    fn test_event_store_default() {
        let store = EventStore::default();
        assert_eq!(store.stats().total_events, 0);
    }

    #[test]
    fn test_ingest_single_event() {
        let store = EventStore::new();
        let event = create_test_event("entity-1", "user.created");

        store.ingest(&event).unwrap();

        assert_eq!(store.stats().total_events, 1);
        assert_eq!(store.stats().total_ingested, 1);
    }

    #[test]
    fn test_ingest_multiple_events() {
        let store = EventStore::new();

        for i in 0..10 {
            let event = create_test_event(&format!("entity-{i}"), "user.created");
            store.ingest(&event).unwrap();
        }

        assert_eq!(store.stats().total_events, 10);
        assert_eq!(store.stats().total_ingested, 10);
    }

    // ---- Basic query filters ----

    #[test]
    fn test_query_by_entity_id() {
        let store = EventStore::new();

        store
            .ingest(&create_test_event("entity-1", "user.created"))
            .unwrap();
        store
            .ingest(&create_test_event("entity-2", "user.created"))
            .unwrap();
        store
            .ingest(&create_test_event("entity-1", "user.updated"))
            .unwrap();

        let results = store
            .query(&QueryEventsRequest {
                entity_id: Some("entity-1".to_string()),
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();

        assert_eq!(results.len(), 2);
    }

    #[test]
    fn test_query_by_event_type() {
        let store = EventStore::new();

        store
            .ingest(&create_test_event("entity-1", "user.created"))
            .unwrap();
        store
            .ingest(&create_test_event("entity-2", "user.updated"))
            .unwrap();
        store
            .ingest(&create_test_event("entity-3", "user.created"))
            .unwrap();

        let results = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: Some("user.created".to_string()),
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();

        assert_eq!(results.len(), 2);
    }

    #[test]
    fn test_query_with_limit() {
        let store = EventStore::new();

        for i in 0..10 {
            let event = create_test_event(&format!("entity-{i}"), "user.created");
            store.ingest(&event).unwrap();
        }

        let results = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: Some(5),
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();

        assert_eq!(results.len(), 5);
    }

    #[test]
    fn test_query_empty_store() {
        let store = EventStore::new();

        let results = store
            .query(&QueryEventsRequest {
                entity_id: Some("non-existent".to_string()),
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();

        assert!(results.is_empty());
    }

    // ---- State reconstruction & snapshots ----

    #[test]
    fn test_reconstruct_state() {
        let store = EventStore::new();

        store
            .ingest(&create_test_event("entity-1", "user.created"))
            .unwrap();

        let state = store.reconstruct_state("entity-1", None).unwrap();
        assert_eq!(state["current_state"]["name"], "Test");
        assert_eq!(state["current_state"]["value"], 42);
    }

    #[test]
    fn test_reconstruct_state_not_found() {
        let store = EventStore::new();

        let result = store.reconstruct_state("non-existent", None);
        assert!(result.is_err());
    }

    #[test]
    fn test_get_snapshot_empty() {
        let store = EventStore::new();

        let result = store.get_snapshot("non-existent");
        assert!(result.is_err());
    }

    #[test]
    fn test_create_snapshot() {
        let store = EventStore::new();

        store
            .ingest(&create_test_event("entity-1", "user.created"))
            .unwrap();

        store.create_snapshot("entity-1").unwrap();

        let snapshot = store.get_snapshot("entity-1").unwrap();
        assert!(snapshot != serde_json::json!(null));
    }

    #[test]
    fn test_create_snapshot_entity_not_found() {
        let store = EventStore::new();

        let result = store.create_snapshot("non-existent");
        assert!(result.is_err());
    }

    // ---- Service accessors (smoke tests: accessor returns a live Arc) ----

    #[test]
    fn test_websocket_manager() {
        let store = EventStore::new();
        let manager = store.websocket_manager();
        assert!(Arc::strong_count(&manager) >= 1);
    }

    #[test]
    fn test_snapshot_manager() {
        let store = EventStore::new();
        let manager = store.snapshot_manager();
        assert!(Arc::strong_count(&manager) >= 1);
    }

    #[test]
    fn test_compaction_manager_none() {
        let store = EventStore::new();
        assert!(store.compaction_manager().is_none());
    }

    #[test]
    fn test_schema_registry() {
        let store = EventStore::new();
        let registry = store.schema_registry();
        assert!(Arc::strong_count(&registry) >= 1);
    }

    #[test]
    fn test_replay_manager() {
        let store = EventStore::new();
        let manager = store.replay_manager();
        assert!(Arc::strong_count(&manager) >= 1);
    }

    #[test]
    fn test_pipeline_manager() {
        let store = EventStore::new();
        let manager = store.pipeline_manager();
        assert!(Arc::strong_count(&manager) >= 1);
    }

    #[test]
    fn test_projection_manager() {
        let store = EventStore::new();
        let manager = store.projection_manager();
        let projections = manager.list_projections();
        // At least the two built-in projections should be registered.
        assert!(projections.len() >= 2);
    }

    #[test]
    fn test_projection_state_cache() {
        let store = EventStore::new();
        let cache = store.projection_state_cache();

        cache.insert("test:key".to_string(), serde_json::json!({"value": 123}));
        assert_eq!(cache.len(), 1);

        let value = cache.get("test:key").unwrap();
        assert_eq!(value["value"], 123);
    }

    #[test]
    fn test_metrics() {
        let store = EventStore::new();
        let metrics = store.metrics();
        assert!(Arc::strong_count(&metrics) >= 1);
    }

    #[test]
    fn test_store_stats() {
        let store = EventStore::new();

        store
            .ingest(&create_test_event("entity-1", "user.created"))
            .unwrap();
        store
            .ingest(&create_test_event("entity-2", "order.placed"))
            .unwrap();

        let stats = store.stats();
        assert_eq!(stats.total_events, 2);
        assert_eq!(stats.total_entities, 2);
        assert_eq!(stats.total_event_types, 2);
        assert_eq!(stats.total_ingested, 2);
    }

    // ---- EventStoreConfig constructors ----

    #[test]
    fn test_event_store_config_default() {
        let config = EventStoreConfig::default();
        assert!(config.storage_dir.is_none());
        assert!(config.wal_dir.is_none());
    }

    #[test]
    fn test_event_store_config_with_persistence() {
        let temp_dir = TempDir::new().unwrap();
        let config = EventStoreConfig::with_persistence(temp_dir.path());

        assert!(config.storage_dir.is_some());
        assert!(config.wal_dir.is_none());
    }

    #[test]
    fn test_event_store_config_with_wal() {
        let temp_dir = TempDir::new().unwrap();
        let config = EventStoreConfig::with_wal(temp_dir.path(), WALConfig::default());

        assert!(config.storage_dir.is_none());
        assert!(config.wal_dir.is_some());
    }

    #[test]
    fn test_event_store_config_with_all() {
        let temp_dir = TempDir::new().unwrap();
        let config = EventStoreConfig::with_all(temp_dir.path(), SnapshotConfig::default());

        assert!(config.storage_dir.is_some());
    }

    #[test]
    fn test_event_store_config_production() {
        let storage_dir = TempDir::new().unwrap();
        let wal_dir = TempDir::new().unwrap();
        let config = EventStoreConfig::production(
            storage_dir.path(),
            wal_dir.path(),
            SnapshotConfig::default(),
            WALConfig::default(),
            CompactionConfig::default(),
        );

        assert!(config.storage_dir.is_some());
        assert!(config.wal_dir.is_some());
    }

    // ---- EventStoreConfig::from_env_vars mode resolution ----

    #[test]
    fn test_from_env_vars_data_dir_enables_full_persistence() {
        let (config, mode) =
            EventStoreConfig::from_env_vars(Some("/app/data".to_string()), None, None, None);
        assert_eq!(mode, "wal+parquet");
        assert_eq!(
            config.storage_dir.unwrap().to_str().unwrap(),
            "/app/data/storage"
        );
        assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/app/data/wal");
    }

    #[test]
    fn test_from_env_vars_explicit_dirs() {
        let (config, mode) = EventStoreConfig::from_env_vars(
            None,
            Some("/custom/storage".to_string()),
            Some("/custom/wal".to_string()),
            None,
        );
        assert_eq!(mode, "wal+parquet");
        assert_eq!(
            config.storage_dir.unwrap().to_str().unwrap(),
            "/custom/storage"
        );
        assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/custom/wal");
    }

    #[test]
    fn test_from_env_vars_wal_disabled() {
        let (config, mode) = EventStoreConfig::from_env_vars(
            Some("/app/data".to_string()),
            None,
            None,
            Some("false".to_string()),
        );
        assert_eq!(mode, "parquet-only");
        assert!(config.storage_dir.is_some());
        assert!(config.wal_dir.is_none());
    }

    #[test]
    fn test_from_env_vars_no_dirs_is_in_memory() {
        let (config, mode) = EventStoreConfig::from_env_vars(None, None, None, None);
        assert_eq!(mode, "in-memory");
        assert!(config.storage_dir.is_none());
        assert!(config.wal_dir.is_none());
    }

    #[test]
    fn test_from_env_vars_empty_strings_treated_as_none() {
        let (_, mode) = EventStoreConfig::from_env_vars(
            Some(String::new()),
            Some(String::new()),
            Some(String::new()),
            None,
        );
        assert_eq!(mode, "in-memory");
    }

    #[test]
    fn test_from_env_vars_explicit_overrides_data_dir() {
        let (config, mode) = EventStoreConfig::from_env_vars(
            Some("/app/data".to_string()),
            Some("/override/storage".to_string()),
            Some("/override/wal".to_string()),
            None,
        );
        assert_eq!(mode, "wal+parquet");
        assert_eq!(
            config.storage_dir.unwrap().to_str().unwrap(),
            "/override/storage"
        );
        assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/override/wal");
    }

    #[test]
    fn test_from_env_vars_wal_only() {
        let (config, mode) =
            EventStoreConfig::from_env_vars(None, None, Some("/wal/only".to_string()), None);
        assert_eq!(mode, "wal-only");
        assert!(config.storage_dir.is_none());
        assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/wal/only");
    }

    #[test]
    fn test_store_stats_serde() {
        let stats = StoreStats {
            total_events: 100,
            total_entities: 50,
            total_event_types: 10,
            total_ingested: 100,
        };

        let json = serde_json::to_string(&stats).unwrap();
        assert!(json.contains("\"total_events\":100"));
        assert!(json.contains("\"total_entities\":50"));
    }

    // ---- Combined filters, type prefixes, payload filters ----

    #[test]
    fn test_query_with_entity_and_type() {
        let store = EventStore::new();

        store
            .ingest(&create_test_event("entity-1", "user.created"))
            .unwrap();
        store
            .ingest(&create_test_event("entity-1", "user.updated"))
            .unwrap();
        store
            .ingest(&create_test_event("entity-2", "user.created"))
            .unwrap();

        let results = store
            .query(&QueryEventsRequest {
                entity_id: Some("entity-1".to_string()),
                event_type: Some("user.created".to_string()),
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();

        assert_eq!(results.len(), 1);
        assert_eq!(results[0].event_type_str(), "user.created");
    }

    #[test]
    fn test_query_by_event_type_prefix() {
        let store = EventStore::new();

        store
            .ingest(&create_test_event("entity-1", "index.created"))
            .unwrap();
        store
            .ingest(&create_test_event("entity-2", "index.updated"))
            .unwrap();
        store
            .ingest(&create_test_event("entity-3", "trade.created"))
            .unwrap();
        store
            .ingest(&create_test_event("entity-4", "trade.completed"))
            .unwrap();
        store
            .ingest(&create_test_event("entity-5", "balance.updated"))
            .unwrap();

        let results = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: Some("index.".to_string()),
                payload_filter: None,
            })
            .unwrap();

        assert_eq!(results.len(), 2);
        assert!(
            results
                .iter()
                .all(|e| e.event_type_str().starts_with("index."))
        );
    }

    #[test]
    fn test_query_by_event_type_prefix_empty_returns_all() {
        let store = EventStore::new();

        store
            .ingest(&create_test_event("entity-1", "index.created"))
            .unwrap();
        store
            .ingest(&create_test_event("entity-2", "trade.created"))
            .unwrap();

        let results = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: Some(String::new()),
                payload_filter: None,
            })
            .unwrap();

        assert_eq!(results.len(), 2);
    }

    #[test]
    fn test_query_by_event_type_prefix_no_match() {
        let store = EventStore::new();

        store
            .ingest(&create_test_event("entity-1", "index.created"))
            .unwrap();

        let results = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: Some("nonexistent.".to_string()),
                payload_filter: None,
            })
            .unwrap();

        assert!(results.is_empty());
    }

    #[test]
    fn test_query_by_entity_with_type_prefix() {
        let store = EventStore::new();

        store
            .ingest(&create_test_event("entity-1", "index.created"))
            .unwrap();
        store
            .ingest(&create_test_event("entity-1", "trade.created"))
            .unwrap();
        store
            .ingest(&create_test_event("entity-2", "index.updated"))
            .unwrap();

        let results = store
            .query(&QueryEventsRequest {
                entity_id: Some("entity-1".to_string()),
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: Some("index.".to_string()),
                payload_filter: None,
            })
            .unwrap();

        assert_eq!(results.len(), 1);
        assert_eq!(results[0].event_type_str(), "index.created");
    }

    #[test]
    fn test_query_prefix_with_limit() {
        let store = EventStore::new();

        for i in 0..5 {
            store
                .ingest(&create_test_event(&format!("entity-{i}"), "index.created"))
                .unwrap();
        }

        let results = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: Some(3),
                event_type_prefix: Some("index.".to_string()),
                payload_filter: None,
            })
            .unwrap();

        assert_eq!(results.len(), 3);
    }

    #[test]
    fn test_query_prefix_alongside_existing_filters() {
        let store = EventStore::new();

        store
            .ingest(&create_test_event("entity-1", "index.created"))
            .unwrap();
        // Sleeps give each event a distinct timestamp for ordering.
        std::thread::sleep(std::time::Duration::from_millis(10));
        store
            .ingest(&create_test_event("entity-2", "index.strategy.updated"))
            .unwrap();
        std::thread::sleep(std::time::Duration::from_millis(10));
        store
            .ingest(&create_test_event("entity-3", "index.deleted"))
            .unwrap();

        let results = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: Some(2),
                event_type_prefix: Some("index.".to_string()),
                payload_filter: None,
            })
            .unwrap();

        assert_eq!(results.len(), 2);
    }

    #[test]
    fn test_query_with_payload_filter() {
        let store = EventStore::new();

        for i in 0..5 {
            store
                .ingest(&create_test_event_with_payload(
                    &format!("entity-{i}"),
                    "user.action",
                    serde_json::json!({"user_id": "alice", "action": "click"}),
                ))
                .unwrap();
        }
        for i in 5..10 {
            store
                .ingest(&create_test_event_with_payload(
                    &format!("entity-{i}"),
                    "user.action",
                    serde_json::json!({"user_id": "bob", "action": "view"}),
                ))
                .unwrap();
        }

        let results = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: Some("user.action".to_string()),
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: Some(r#"{"user_id":"alice"}"#.to_string()),
            })
            .unwrap();

        assert_eq!(results.len(), 5);
    }

    #[test]
    fn test_query_payload_filter_non_existent_field() {
        let store = EventStore::new();

        store
            .ingest(&create_test_event_with_payload(
                "entity-1",
                "user.action",
                serde_json::json!({"user_id": "alice"}),
            ))
            .unwrap();

        let results = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: Some(r#"{"nonexistent":"value"}"#.to_string()),
            })
            .unwrap();

        assert!(results.is_empty());
    }

    #[test]
    fn test_query_payload_filter_with_prefix() {
        let store = EventStore::new();

        store
            .ingest(&create_test_event_with_payload(
                "entity-1",
                "index.created",
                serde_json::json!({"status": "active"}),
            ))
            .unwrap();
        store
            .ingest(&create_test_event_with_payload(
                "entity-2",
                "index.created",
                serde_json::json!({"status": "inactive"}),
            ))
            .unwrap();
        store
            .ingest(&create_test_event_with_payload(
                "entity-3",
                "trade.created",
                serde_json::json!({"status": "active"}),
            ))
            .unwrap();

        let results = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: Some("index.".to_string()),
                payload_filter: Some(r#"{"status":"active"}"#.to_string()),
            })
            .unwrap();

        assert_eq!(results.len(), 1);
        assert_eq!(results[0].entity_id().to_string(), "entity-1");
    }

    #[test]
    fn test_flush_storage_no_storage() {
        let store = EventStore::new();
        // Flushing with no Parquet backend configured is a successful no-op.
        let result = store.flush_storage();
        assert!(result.is_ok());
    }

    #[test]
    fn test_state_evolution() {
        let store = EventStore::new();

        store
            .ingest(
                &Event::from_strings(
                    "user.created".to_string(),
                    "user-1".to_string(),
                    "default".to_string(),
                    serde_json::json!({"name": "Alice", "age": 25}),
                    None,
                )
                .unwrap(),
            )
            .unwrap();

        store
            .ingest(
                &Event::from_strings(
                    "user.updated".to_string(),
                    "user-1".to_string(),
                    "default".to_string(),
                    serde_json::json!({"age": 26}),
                    None,
                )
                .unwrap(),
            )
            .unwrap();

        // Later events overwrite only the keys they carry.
        let state = store.reconstruct_state("user-1", None).unwrap();
        assert_eq!(state["current_state"]["name"], "Alice");
        assert_eq!(state["current_state"]["age"], 26);
    }

    #[test]
    fn test_reject_system_event_types() {
        let store = EventStore::new();

        // `_system.*` types are reserved; ingest must refuse them.
        let event = Event::reconstruct_from_strings(
            uuid::Uuid::new_v4(),
            "_system.tenant.created".to_string(),
            "_system:tenant:acme".to_string(),
            "_system".to_string(),
            serde_json::json!({"name": "ACME"}),
            chrono::Utc::now(),
            None,
            1,
        );

        let result = store.ingest(&event);
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("reserved for internal use"),
            "Expected system namespace rejection, got: {err}"
        );
    }

    // ---- Durability: WAL recovery / Parquet checkpoint round-trips ----

    #[test]
    fn test_wal_recovery_checkpoints_to_parquet() {
        let data_dir = TempDir::new().unwrap();
        let storage_dir = data_dir.path().join("storage");
        let wal_dir = data_dir.path().join("wal");

        // Session 1: ingest with WAL enabled, then drop the store.
        {
            let config = EventStoreConfig::production(
                &storage_dir,
                &wal_dir,
                SnapshotConfig::default(),
                WALConfig {
                    sync_on_write: true,
                    ..WALConfig::default()
                },
                CompactionConfig::default(),
            );
            let store = EventStore::with_config(config);

            for i in 0..5 {
                let event = Event::from_strings(
                    "test.created".to_string(),
                    format!("entity-{i}"),
                    "default".to_string(),
                    serde_json::json!({"index": i}),
                    None,
                )
                .unwrap();
                store.ingest(&event).unwrap();
            }

            assert_eq!(store.stats().total_events, 5);
        }

        // The on-disk WAL should contain the ingested entries.
        let wal_files: Vec<_> = std::fs::read_dir(&wal_dir)
            .unwrap()
            .filter_map(std::result::Result::ok)
            .filter(|e| e.path().extension().is_some_and(|ext| ext == "log"))
            .collect();
        assert!(!wal_files.is_empty(), "WAL file should exist");
        let wal_size = wal_files[0].metadata().unwrap().len();
        assert!(wal_size > 0, "WAL file should have data (got 0 bytes)");

        // Session 2: startup replays the WAL and checkpoints into Parquet.
        {
            let config = EventStoreConfig::production(
                &storage_dir,
                &wal_dir,
                SnapshotConfig::default(),
                WALConfig {
                    sync_on_write: true,
                    ..WALConfig::default()
                },
                CompactionConfig::default(),
            );
            let store = EventStore::with_config(config);

            assert_eq!(
                store.stats().total_events,
                5,
                "Session 2 should have all 5 events after WAL recovery"
            );

            let parquet_files: Vec<_> = std::fs::read_dir(&storage_dir)
                .unwrap()
                .filter_map(std::result::Result::ok)
                .filter(|e| e.path().extension().is_some_and(|ext| ext == "parquet"))
                .collect();
            assert!(
                !parquet_files.is_empty(),
                "Parquet file should exist after WAL checkpoint"
            );
        }

        // Session 3: events now come from Parquet alone.
        {
            let config = EventStoreConfig::production(
                &storage_dir,
                &wal_dir,
                SnapshotConfig::default(),
                WALConfig {
                    sync_on_write: true,
                    ..WALConfig::default()
                },
                CompactionConfig::default(),
            );
            let store = EventStore::with_config(config);

            assert_eq!(
                store.stats().total_events,
                5,
                "Session 3 should still have all 5 events from Parquet"
            );
        }
    }

    #[test]
    fn test_parquet_restore_surfaces_errors_not_silent() {
        let data_dir = TempDir::new().unwrap();
        let storage_dir = data_dir.path().join("storage");
        let wal_dir = data_dir.path().join("wal");

        // Seed a persisted store, flushing so Parquet files exist on disk.
        {
            let config = EventStoreConfig::production(
                &storage_dir,
                &wal_dir,
                SnapshotConfig::default(),
                WALConfig {
                    sync_on_write: true,
                    ..WALConfig::default()
                },
                CompactionConfig::default(),
            );
            let store = EventStore::with_config(config);

            for i in 0..3 {
                let event = Event::from_strings(
                    "test.created".to_string(),
                    format!("entity-{i}"),
                    "default".to_string(),
                    serde_json::json!({"i": i}),
                    None,
                )
                .unwrap();
                store.ingest(&event).unwrap();
            }

            store.flush_storage().unwrap();
            assert_eq!(store.stats().total_events, 3);
        }

        let parquet_files: Vec<_> = std::fs::read_dir(&storage_dir)
            .unwrap()
            .filter_map(std::result::Result::ok)
            .filter(|e| e.path().extension().is_some_and(|ext| ext == "parquet"))
            .collect();
        assert!(!parquet_files.is_empty(), "Parquet file must exist");

        // Corrupt the Parquet file and empty the WAL so recovery has nothing.
        std::fs::write(parquet_files[0].path(), b"corrupted data").unwrap();

        for entry in std::fs::read_dir(&wal_dir).unwrap().flatten() {
            std::fs::write(entry.path(), b"").unwrap();
        }

        // Startup against corrupted storage yields an empty (not crashed)
        // store.
        {
            let config = EventStoreConfig::production(
                &storage_dir,
                &wal_dir,
                SnapshotConfig::default(),
                WALConfig::default(),
                CompactionConfig::default(),
            );
            let store = EventStore::with_config(config);

            assert_eq!(store.stats().total_events, 0);
        }
    }
}