1#[cfg(feature = "server")]
2use crate::application::services::webhook::WebhookRegistry;
3#[cfg(feature = "server")]
4use crate::infrastructure::observability::metrics::MetricsRegistry;
5#[cfg(feature = "server")]
6use crate::infrastructure::web::websocket::WebSocketManager;
7use crate::{
8 application::{
9 dto::QueryEventsRequest,
10 services::{
11 consumer::ConsumerRegistry,
12 exactly_once::{ExactlyOnceConfig, ExactlyOnceRegistry},
13 pipeline::PipelineManager,
14 projection::{EntitySnapshotProjection, EventCounterProjection, ProjectionManager},
15 replay::ReplayManager,
16 schema::{SchemaRegistry, SchemaRegistryConfig},
17 schema_evolution::SchemaEvolutionManager,
18 },
19 },
20 domain::entities::Event,
21 error::{AllSourceError, Result},
22 infrastructure::{
23 persistence::{
24 compaction::{CompactionConfig, CompactionManager},
25 index::{EventIndex, IndexEntry},
26 snapshot::{SnapshotConfig, SnapshotManager, SnapshotType},
27 storage::ParquetStorage,
28 wal::{WALConfig, WriteAheadLog},
29 },
30 query::geospatial::GeoIndex,
31 },
32};
33use chrono::{DateTime, Utc};
34use dashmap::DashMap;
35use parking_lot::RwLock;
36use std::{path::PathBuf, sync::Arc};
37#[cfg(feature = "server")]
38use tokio::sync::mpsc;
39
/// Central in-memory event store with optional durability (write-ahead log +
/// Parquet storage), secondary indexing, synchronous projections, and event
/// fan-out (broadcast channel, WebSockets, webhooks).
pub struct EventStore {
    /// Append-only in-memory event log; index entries refer to offsets into
    /// this Vec.
    events: Arc<RwLock<Vec<Event>>>,

    /// Lookup structures over `events` (by id / entity / type / time).
    index: Arc<EventIndex>,

    /// Registered projections, processed synchronously on every ingest path.
    pub(crate) projections: Arc<RwLock<ProjectionManager>>,

    /// Optional Parquet-backed persistence (`None` = in-memory only).
    storage: Option<Arc<RwLock<ParquetStorage>>>,

    /// Broadcasts ingested events to connected WebSocket clients.
    #[cfg(feature = "server")]
    websocket_manager: Arc<WebSocketManager>,

    /// Creates and serves entity snapshots (manual and automatic).
    snapshot_manager: Arc<SnapshotManager>,

    /// Optional write-ahead log, appended to before in-memory mutation.
    wal: Option<Arc<WriteAheadLog>>,

    /// Compaction over the storage directory, present when persistence is on.
    compaction_manager: Option<Arc<CompactionManager>>,

    /// Event schema registry (see `SchemaRegistry`).
    schema_registry: Arc<SchemaRegistry>,

    /// Historical event replay coordinator.
    replay_manager: Arc<ReplayManager>,

    /// Processing pipelines run against each ingested event.
    pipeline_manager: Arc<PipelineManager>,

    /// Prometheus metrics registry (server builds only).
    #[cfg(feature = "server")]
    metrics: Arc<MetricsRegistry>,

    /// Monotonic count of events accepted since startup/recovery.
    total_ingested: Arc<RwLock<u64>>,

    /// Cached projection state values, keyed with a "<projection-name>:"
    /// prefix (see `reset_projection`).
    projection_state_cache: Arc<DashMap<String, serde_json::Value>>,

    /// Status string per projection name.
    projection_status: Arc<DashMap<String, String>>,

    /// Webhook subscriptions matched against every ingested event.
    #[cfg(feature = "server")]
    webhook_registry: Arc<WebhookRegistry>,

    /// Channel to the webhook delivery worker; `None` until connected via
    /// `set_webhook_tx`.
    #[cfg(feature = "server")]
    webhook_tx: Arc<RwLock<Option<mpsc::UnboundedSender<WebhookDeliveryTask>>>>,

    /// Geospatial index updated on each ingest.
    geo_index: Arc<GeoIndex>,

    /// Exactly-once registry, exposed via `exactly_once()`.
    exactly_once: Arc<ExactlyOnceRegistry>,

    /// Receives every ingested payload for schema-evolution analysis.
    schema_evolution: Arc<SchemaEvolutionManager>,

    /// Per-entity version counter used for optimistic concurrency.
    entity_versions: Arc<DashMap<String, u64>>,

    /// Consumer offset/filter registry (replaceable via
    /// `set_consumer_registry`).
    consumer_registry: Arc<ConsumerRegistry>,

    /// In-process broadcast of ingested events (capacity 1024).
    event_broadcast_tx: tokio::sync::broadcast::Sender<Arc<Event>>,
}
121
/// Unit of work handed to the webhook delivery worker: one matching
/// subscription paired with the event that triggered it.
#[cfg(feature = "server")]
#[derive(Debug, Clone)]
pub struct WebhookDeliveryTask {
    /// The subscription whose filter matched the event.
    pub webhook: crate::application::services::webhook::WebhookSubscription,
    /// The event to deliver.
    pub event: Event,
}
129
130impl EventStore {
131 pub fn new() -> Self {
133 Self::with_config(EventStoreConfig::default())
134 }
135
    /// Builds an `EventStore` from `config`, wiring up the optional
    /// subsystems (Parquet storage, WAL, compaction) and then recovering
    /// persisted state back into memory.
    ///
    /// Recovery order matters: Parquet events are loaded first, then WAL
    /// events not already present are replayed on top and checkpointed back
    /// to Parquet so the WAL can be truncated.
    pub fn with_config(config: EventStoreConfig) -> Self {
        let mut projections = ProjectionManager::new();

        // Built-in projections available in every store.
        projections.register(Arc::new(EntitySnapshotProjection::new("entity_snapshots")));
        projections.register(Arc::new(EventCounterProjection::new("event_counters")));

        // Optional Parquet persistence; an init failure is logged and the
        // store degrades to in-memory-only operation.
        let storage = config
            .storage_dir
            .as_ref()
            .and_then(|dir| match ParquetStorage::new(dir) {
                Ok(storage) => {
                    tracing::info!("✅ Parquet persistence enabled at: {}", dir.display());
                    Some(Arc::new(RwLock::new(storage)))
                }
                Err(e) => {
                    tracing::error!("❌ Failed to initialize Parquet storage: {}", e);
                    None
                }
            });

        // Optional write-ahead log, likewise best-effort at init time.
        let wal = config.wal_dir.as_ref().and_then(|dir| {
            match WriteAheadLog::new(dir, config.wal_config.clone()) {
                Ok(wal) => {
                    tracing::info!("✅ WAL enabled at: {}", dir.display());
                    Some(Arc::new(wal))
                }
                Err(e) => {
                    tracing::error!("❌ Failed to initialize WAL: {}", e);
                    None
                }
            }
        });

        // Compaction only makes sense when a storage directory exists.
        let compaction_manager = config.storage_dir.as_ref().map(|dir| {
            let manager = CompactionManager::new(dir, config.compaction_config.clone());
            Arc::new(manager)
        });

        let schema_registry = Arc::new(SchemaRegistry::new(config.schema_registry_config.clone()));
        tracing::info!("✅ Schema registry enabled");

        let replay_manager = Arc::new(ReplayManager::new());
        tracing::info!("✅ Replay manager enabled");

        let pipeline_manager = Arc::new(PipelineManager::new());
        tracing::info!("✅ Pipeline manager enabled");

        #[cfg(feature = "server")]
        let metrics = {
            let m = MetricsRegistry::new();
            tracing::info!("✅ Prometheus metrics registry initialized");
            m
        };

        let projection_state_cache = Arc::new(DashMap::new());
        tracing::info!("✅ Projection state cache initialized");

        #[cfg(feature = "server")]
        let webhook_registry = {
            let w = Arc::new(WebhookRegistry::new());
            tracing::info!("✅ Webhook registry initialized");
            w
        };

        // Receiver dropped here: subscribers attach later via subscribe_events.
        let (event_broadcast_tx, _) = tokio::sync::broadcast::channel(1024);

        let store = Self {
            events: Arc::new(RwLock::new(Vec::new())),
            index: Arc::new(EventIndex::new()),
            projections: Arc::new(RwLock::new(projections)),
            storage,
            #[cfg(feature = "server")]
            websocket_manager: Arc::new(WebSocketManager::new()),
            snapshot_manager: Arc::new(SnapshotManager::new(config.snapshot_config)),
            wal,
            compaction_manager,
            schema_registry,
            replay_manager,
            pipeline_manager,
            #[cfg(feature = "server")]
            metrics,
            total_ingested: Arc::new(RwLock::new(0)),
            projection_state_cache,
            projection_status: Arc::new(DashMap::new()),
            #[cfg(feature = "server")]
            webhook_registry,
            #[cfg(feature = "server")]
            webhook_tx: Arc::new(RwLock::new(None)),
            geo_index: Arc::new(GeoIndex::new()),
            exactly_once: Arc::new(ExactlyOnceRegistry::new(ExactlyOnceConfig::default())),
            schema_evolution: Arc::new(SchemaEvolutionManager::new()),
            entity_versions: Arc::new(DashMap::new()),
            consumer_registry: Arc::new(ConsumerRegistry::new()),
            event_broadcast_tx,
        };

        // Phase 1: rebuild in-memory state from Parquet, if configured.
        if let Some(ref storage) = store.storage {
            match storage.read().load_all_events() {
                Err(e) => {
                    tracing::error!(
                        "❌ Failed to load events from Parquet — data exists on disk but \
                         could not be read. This means events are NOT available until the \
                         issue is resolved. Error: {e}"
                    );
                }
                Ok(persisted_events) if persisted_events.is_empty() => {
                    tracing::info!("📂 No persisted events found in Parquet storage");
                }
                Ok(persisted_events) => {
                    tracing::info!("📂 Loading {} persisted events...", persisted_events.len());

                    for event in persisted_events {
                        // Offset = current log length; index before push so the
                        // entry points at the slot the event will occupy.
                        let offset = store.events.read().len();
                        if let Err(e) = store.index.index_event(
                            event.id,
                            event.entity_id_str(),
                            event.event_type_str(),
                            event.timestamp,
                            offset,
                        ) {
                            tracing::error!("Failed to re-index event {}: {}", event.id, e);
                        }

                        if let Err(e) = store.projections.read().process_event(&event) {
                            tracing::error!("Failed to re-process event {}: {}", event.id, e);
                        }

                        *store
                            .entity_versions
                            .entry(event.entity_id_str().to_string())
                            .or_insert(0) += 1;

                        store.events.write().push(event);
                    }

                    let total = store.events.read().len();
                    *store.total_ingested.write() = total as u64;
                    tracing::info!("✅ Successfully loaded {} events from storage", total);
                }
            }
        }

        // Phase 2: replay WAL events that never made it into Parquet,
        // deduplicating by event id against what phase 1 loaded.
        if let Some(ref wal) = store.wal {
            match wal.recover() {
                Ok(recovered_events) if !recovered_events.is_empty() => {
                    let existing_ids: std::collections::HashSet<uuid::Uuid> =
                        store.events.read().iter().map(|e| e.id).collect();

                    let mut wal_new = 0usize;
                    for event in recovered_events {
                        if existing_ids.contains(&event.id) {
                            continue;
                        }

                        let offset = store.events.read().len();
                        if let Err(e) = store.index.index_event(
                            event.id,
                            event.entity_id_str(),
                            event.event_type_str(),
                            event.timestamp,
                            offset,
                        ) {
                            tracing::error!("Failed to re-index WAL event {}: {}", event.id, e);
                        }

                        if let Err(e) = store.projections.read().process_event(&event) {
                            tracing::error!("Failed to re-process WAL event {}: {}", event.id, e);
                        }

                        *store
                            .entity_versions
                            .entry(event.entity_id_str().to_string())
                            .or_insert(0) += 1;

                        store.events.write().push(event);
                        wal_new += 1;
                    }

                    if wal_new > 0 {
                        let total = store.events.read().len();
                        *store.total_ingested.write() = total as u64;
                        tracing::info!(
                            "✅ Recovered {} new events from WAL ({} total)",
                            wal_new,
                            total
                        );

                        // Checkpoint: copy the recovered tail into Parquet so
                        // the WAL can be truncated. Only truncate after a
                        // successful flush to avoid losing the only copy.
                        if let Some(ref storage) = store.storage {
                            tracing::info!(
                                "📸 Checkpointing {} WAL events to Parquet storage...",
                                wal_new
                            );
                            let parquet = storage.read();
                            let events = store.events.read();
                            let mut buffered = 0usize;
                            for event in events.iter().skip(events.len() - wal_new) {
                                if let Err(e) = parquet.append_event(event.clone()) {
                                    tracing::error!(
                                        "Failed to buffer WAL event for Parquet: {}",
                                        e
                                    );
                                } else {
                                    buffered += 1;
                                }
                            }
                            drop(events);
                            drop(parquet);

                            if buffered > 0 {
                                if let Err(e) = store.flush_storage() {
                                    tracing::error!("Failed to checkpoint to Parquet: {}", e);
                                } else if let Err(e) = wal.truncate() {
                                    tracing::error!(
                                        "Failed to truncate WAL after checkpoint: {}",
                                        e
                                    );
                                } else {
                                    tracing::info!(
                                        "✅ WAL checkpointed and truncated ({} events)",
                                        buffered
                                    );
                                }
                            }
                        }
                    }
                }
                Ok(_) => {
                    tracing::debug!("No events to recover from WAL");
                }
                Err(e) => {
                    tracing::error!("❌ WAL recovery failed: {}", e);
                }
            }
        }

        store
    }
396
    /// Ingests `event` with optimistic concurrency control.
    ///
    /// When `expected_version` is `Some`, the entity's current version must
    /// match it exactly or `VersionConflict` is returned and nothing is
    /// written. Returns the entity's new version on success.
    ///
    /// # Errors
    /// Validation errors, `VersionConflict`, WAL append errors, or any error
    /// from the post-WAL ingestion steps.
    #[cfg_attr(feature = "hotpath", hotpath::measure)]
    pub fn ingest_with_expected_version(
        &self,
        event: &Event,
        expected_version: Option<u64>,
    ) -> Result<u64> {
        self.validate_event(event)?;

        let entity_id = event.entity_id_str().to_string();

        // The version check, WAL append, and version bump all happen while
        // the DashMap entry guard is held, so concurrent writers to the same
        // entity are serialized and cannot both pass the same expected
        // version.
        let new_version = {
            let mut version_entry = self.entity_versions.entry(entity_id.clone()).or_insert(0);
            let current = *version_entry;

            if let Some(expected) = expected_version
                && current != expected
            {
                return Err(crate::error::AllSourceError::VersionConflict { expected, current });
            }

            // Durability before the in-memory version bump.
            if let Some(ref wal) = self.wal {
                wal.append(event.clone())?;
            }

            *version_entry += 1;
            *version_entry
        };

        // NOTE(review): if this step fails, the version was already bumped
        // and the event already WAL-appended — confirm this partial-failure
        // behavior is intended.
        self.ingest_post_wal(event)?;

        Ok(new_version)
    }
442
    /// Everything that happens after validation + WAL + version bump:
    /// index, projections, pipelines, Parquet append, in-memory log push,
    /// fan-out (broadcast / WebSocket / webhooks), geo + schema bookkeeping,
    /// auto-snapshot check, and metrics.
    ///
    /// # Errors
    /// Index, projection, or storage append errors.
    #[cfg_attr(feature = "hotpath", hotpath::measure)]
    fn ingest_post_wal(&self, event: &Event) -> Result<()> {
        #[cfg(feature = "server")]
        let timer = self.metrics.ingestion_duration_seconds.start_timer();

        // Write lock held through index/projection/storage so the offset the
        // index records matches the slot this event lands in.
        let mut events = self.events.write();
        let offset = events.len();

        self.index.index_event(
            event.id,
            event.entity_id_str(),
            event.event_type_str(),
            event.timestamp,
            offset,
        )?;

        let projections = self.projections.read();
        projections.process_event(event)?;
        drop(projections);

        let pipeline_results = self.pipeline_manager.process_event(event);
        if !pipeline_results.is_empty() {
            tracing::debug!(
                "Event {} processed by {} pipeline(s)",
                event.id,
                pipeline_results.len()
            );
            for (pipeline_id, result) in pipeline_results {
                tracing::trace!("Pipeline {} result: {:?}", pipeline_id, result);
            }
        }

        if let Some(ref storage) = self.storage {
            let storage = storage.read();
            storage.append_event(event.clone())?;
        }

        events.push(event.clone());
        let total_events = events.len();
        drop(events);

        // Fan-out; broadcast send errors (no subscribers) are ignored.
        let event_arc = Arc::new(event.clone());
        let _ = self.event_broadcast_tx.send(Arc::clone(&event_arc));
        #[cfg(feature = "server")]
        self.websocket_manager.broadcast_event(event_arc);

        #[cfg(feature = "server")]
        self.dispatch_webhooks(event);

        self.geo_index.index_event(event);

        self.schema_evolution
            .analyze_event(event.event_type_str(), &event.payload);

        self.check_auto_snapshot(event.entity_id_str(), event);

        #[cfg(feature = "server")]
        {
            self.metrics.events_ingested_total.inc();
            self.metrics
                .events_ingested_by_type
                .with_label_values(&[event.event_type_str()])
                .inc();
            self.metrics.storage_events_total.set(total_events as i64);
        }

        let mut total = self.total_ingested.write();
        *total += 1;

        #[cfg(feature = "server")]
        timer.observe_duration();

        tracing::debug!("Event ingested: {} (offset: {})", event.id, offset);

        Ok(())
    }
533
    /// Ingests a single event: validate → WAL → unconditional version bump →
    /// index/projections/pipelines/storage/log → fan-out → bookkeeping →
    /// metrics.
    ///
    /// NOTE(review): the post-WAL half of this function mirrors
    /// `ingest_post_wal` almost line for line — confirm intentional before
    /// deduplicating, since the error-metric handling differs.
    ///
    /// # Errors
    /// Validation, WAL, index, projection, or storage errors; validation and
    /// WAL failures increment the ingestion-error metric before returning.
    #[cfg_attr(feature = "hotpath", hotpath::measure)]
    pub fn ingest(&self, event: &Event) -> Result<()> {
        #[cfg(feature = "server")]
        let timer = self.metrics.ingestion_duration_seconds.start_timer();

        // Reject invalid events before any durable write.
        let validation_result = self.validate_event(event);
        if let Err(e) = validation_result {
            #[cfg(feature = "server")]
            {
                self.metrics.ingestion_errors_total.inc();
                timer.observe_duration();
            }
            return Err(e);
        }

        // Durability first: a WAL append failure aborts the ingest.
        if let Some(ref wal) = self.wal
            && let Err(e) = wal.append(event.clone())
        {
            #[cfg(feature = "server")]
            {
                self.metrics.ingestion_errors_total.inc();
                timer.observe_duration();
            }
            return Err(e);
        }

        // Unconditional bump — no optimistic-concurrency check here (see
        // `ingest_with_expected_version` for that path).
        *self
            .entity_versions
            .entry(event.entity_id_str().to_string())
            .or_insert(0) += 1;

        // Write lock held through index/projection/storage so the recorded
        // offset matches the slot this event lands in.
        let mut events = self.events.write();
        let offset = events.len();

        self.index.index_event(
            event.id,
            event.entity_id_str(),
            event.event_type_str(),
            event.timestamp,
            offset,
        )?;

        let projections = self.projections.read();
        projections.process_event(event)?;
        drop(projections);

        let pipeline_results = self.pipeline_manager.process_event(event);
        if !pipeline_results.is_empty() {
            tracing::debug!(
                "Event {} processed by {} pipeline(s)",
                event.id,
                pipeline_results.len()
            );
            for (pipeline_id, result) in pipeline_results {
                tracing::trace!("Pipeline {} result: {:?}", pipeline_id, result);
            }
        }

        if let Some(ref storage) = self.storage {
            let storage = storage.read();
            storage.append_event(event.clone())?;
        }

        events.push(event.clone());
        let total_events = events.len();
        drop(events);

        // Fan-out; broadcast send errors (no subscribers) are ignored.
        let event_arc = Arc::new(event.clone());
        let _ = self.event_broadcast_tx.send(Arc::clone(&event_arc));
        #[cfg(feature = "server")]
        self.websocket_manager.broadcast_event(event_arc);

        #[cfg(feature = "server")]
        self.dispatch_webhooks(event);

        self.geo_index.index_event(event);

        self.schema_evolution
            .analyze_event(event.event_type_str(), &event.payload);

        self.check_auto_snapshot(event.entity_id_str(), event);

        #[cfg(feature = "server")]
        {
            self.metrics.events_ingested_total.inc();
            self.metrics
                .events_ingested_by_type
                .with_label_values(&[event.event_type_str()])
                .inc();
            self.metrics.storage_events_total.set(total_events as i64);
        }

        let mut total = self.total_ingested.write();
        *total += 1;

        #[cfg(feature = "server")]
        timer.observe_duration();

        tracing::debug!("Event ingested: {} (offset: {})", event.id, offset);

        Ok(())
    }
657
658 #[cfg_attr(feature = "hotpath", hotpath::measure)]
665 pub fn ingest_batch(&self, batch: Vec<Event>) -> Result<()> {
666 if batch.is_empty() {
667 return Ok(());
668 }
669
670 for event in &batch {
672 self.validate_event(event)?;
673 }
674
675 if let Some(ref wal) = self.wal {
677 for event in &batch {
678 wal.append(event.clone())?;
679 }
680 }
681
682 let mut events = self.events.write();
684 let projections = self.projections.read();
685
686 for event in batch {
687 let offset = events.len();
688
689 self.index.index_event(
690 event.id,
691 event.entity_id_str(),
692 event.event_type_str(),
693 event.timestamp,
694 offset,
695 )?;
696
697 projections.process_event(&event)?;
698 self.pipeline_manager.process_event(&event);
699
700 if let Some(ref storage) = self.storage {
701 let storage = storage.read();
702 storage.append_event(event.clone())?;
703 }
704
705 self.geo_index.index_event(&event);
706 self.schema_evolution
707 .analyze_event(event.event_type_str(), &event.payload);
708
709 *self
711 .entity_versions
712 .entry(event.entity_id_str().to_string())
713 .or_insert(0) += 1;
714
715 let _ = self.event_broadcast_tx.send(Arc::new(event.clone()));
717
718 events.push(event);
719 }
720
721 let total_events = events.len();
722 drop(projections);
723 drop(events);
724
725 let mut total = self.total_ingested.write();
726 *total += total_events as u64;
727
728 Ok(())
729 }
730
    /// Ingests an event received from another node.
    ///
    /// Unlike `ingest`, this path performs no validation, no WAL append, and
    /// no Parquet write, and does not dispatch webhooks or auto-snapshots —
    /// NOTE(review): presumably the originating node owns durability and
    /// external side effects; confirm against the replication design.
    ///
    /// # Errors
    /// Index or projection processing errors.
    #[cfg_attr(feature = "hotpath", hotpath::measure)]
    pub fn ingest_replicated(&self, event: &Event) -> Result<()> {
        #[cfg(feature = "server")]
        let timer = self.metrics.ingestion_duration_seconds.start_timer();

        // Write lock held through indexing so the recorded offset matches
        // the slot this event lands in.
        let mut events = self.events.write();
        let offset = events.len();

        self.index.index_event(
            event.id,
            event.entity_id_str(),
            event.event_type_str(),
            event.timestamp,
            offset,
        )?;

        let projections = self.projections.read();
        projections.process_event(event)?;
        drop(projections);

        let pipeline_results = self.pipeline_manager.process_event(event);
        if !pipeline_results.is_empty() {
            tracing::debug!(
                "Replicated event {} processed by {} pipeline(s)",
                event.id,
                pipeline_results.len()
            );
        }

        *self
            .entity_versions
            .entry(event.entity_id_str().to_string())
            .or_insert(0) += 1;

        events.push(event.clone());
        let total_events = events.len();
        drop(events);

        // Local fan-out still happens for replicated events.
        let event_arc = Arc::new(event.clone());
        let _ = self.event_broadcast_tx.send(Arc::clone(&event_arc));
        #[cfg(feature = "server")]
        self.websocket_manager.broadcast_event(event_arc);

        #[cfg(feature = "server")]
        {
            self.metrics.events_ingested_total.inc();
            self.metrics
                .events_ingested_by_type
                .with_label_values(&[event.event_type_str()])
                .inc();
            self.metrics.storage_events_total.set(total_events as i64);
        }

        let mut total = self.total_ingested.write();
        *total += 1;

        #[cfg(feature = "server")]
        timer.observe_duration();

        tracing::debug!(
            "Replicated event ingested: {} (offset: {})",
            event.id,
            offset
        );

        Ok(())
    }
811
812 #[cfg_attr(feature = "hotpath", hotpath::measure)]
815 pub fn get_entity_version(&self, entity_id: &str) -> u64 {
816 self.entity_versions.get(entity_id).map_or(0, |v| *v)
817 }
818
819 pub fn consumer_registry(&self) -> &ConsumerRegistry {
821 &self.consumer_registry
822 }
823
824 pub fn subscribe_events(&self) -> tokio::sync::broadcast::Receiver<Arc<Event>> {
830 self.event_broadcast_tx.subscribe()
831 }
832
    /// Replaces the consumer registry; requires exclusive access to the
    /// store, so this is only usable before the store is shared.
    pub fn set_consumer_registry(&mut self, registry: Arc<ConsumerRegistry>) {
        self.consumer_registry = registry;
    }
840
841 pub fn total_events(&self) -> usize {
843 self.events.read().len()
844 }
845
846 pub fn events_after_offset(
849 &self,
850 offset: u64,
851 filters: &[String],
852 limit: usize,
853 ) -> Vec<(u64, Event)> {
854 let events = self.events.read();
855 let start = offset as usize;
856 if start >= events.len() {
857 return vec![];
858 }
859
860 events[start..]
861 .iter()
862 .enumerate()
863 .filter(|(_, event)| ConsumerRegistry::matches_filters(event.event_type_str(), filters))
864 .take(limit)
865 .map(|(i, event)| ((start + i + 1) as u64, event.clone()))
866 .collect()
867 }
868
869 #[cfg(feature = "server")]
871 pub fn websocket_manager(&self) -> Arc<WebSocketManager> {
872 Arc::clone(&self.websocket_manager)
873 }
874
875 pub fn snapshot_manager(&self) -> Arc<SnapshotManager> {
877 Arc::clone(&self.snapshot_manager)
878 }
879
880 pub fn compaction_manager(&self) -> Option<Arc<CompactionManager>> {
882 self.compaction_manager.as_ref().map(Arc::clone)
883 }
884
885 pub fn schema_registry(&self) -> Arc<SchemaRegistry> {
887 Arc::clone(&self.schema_registry)
888 }
889
890 pub fn replay_manager(&self) -> Arc<ReplayManager> {
892 Arc::clone(&self.replay_manager)
893 }
894
895 pub fn pipeline_manager(&self) -> Arc<PipelineManager> {
897 Arc::clone(&self.pipeline_manager)
898 }
899
900 #[cfg(feature = "server")]
902 pub fn metrics(&self) -> Arc<MetricsRegistry> {
903 Arc::clone(&self.metrics)
904 }
905
    /// Read guard over the projection manager. Hold it briefly: the ingest
    /// paths also take this lock.
    pub fn projection_manager(&self) -> parking_lot::RwLockReadGuard<'_, ProjectionManager> {
        self.projections.read()
    }
910
911 pub fn register_projection(
920 &self,
921 projection: Arc<dyn crate::application::services::projection::Projection>,
922 ) {
923 let mut pm = self.projections.write();
924 pm.register(projection);
925 }
926
    /// Registers a projection and immediately replays the current event log
    /// through it.
    ///
    /// NOTE(review): events ingested between registration and the replay
    /// below may be processed twice (once live, once by the backfill) —
    /// confirm projections are idempotent or tolerate duplicates.
    ///
    /// # Errors
    /// The first processing error raised during the backfill.
    pub fn register_projection_with_backfill(
        &self,
        projection: &Arc<dyn crate::application::services::projection::Projection>,
    ) -> Result<()> {
        {
            let mut pm = self.projections.write();
            pm.register(Arc::clone(projection));
        }

        let events = self.events.read();
        for event in events.iter() {
            projection.process(event)?;
        }

        Ok(())
    }
950
951 pub fn projection_state_cache(&self) -> Arc<DashMap<String, serde_json::Value>> {
954 Arc::clone(&self.projection_state_cache)
955 }
956
957 pub fn projection_status(&self) -> Arc<DashMap<String, String>> {
959 Arc::clone(&self.projection_status)
960 }
961
962 pub fn geo_index(&self) -> Arc<GeoIndex> {
965 self.geo_index.clone()
966 }
967
968 pub fn exactly_once(&self) -> Arc<ExactlyOnceRegistry> {
970 self.exactly_once.clone()
971 }
972
973 pub fn schema_evolution(&self) -> Arc<SchemaEvolutionManager> {
975 self.schema_evolution.clone()
976 }
977
978 pub fn snapshot_events(&self) -> Vec<Event> {
984 self.events.read().clone()
985 }
986
    /// Replaces all events of `token_event_type` for `entity_id` with a
    /// single pre-merged event, then rebuilds the index from scratch.
    ///
    /// Returns `Ok(false)` (and does nothing) when the entity has no events
    /// of that type.
    ///
    /// NOTE(review): the merged event is WAL-appended, but Parquet storage
    /// and `entity_versions` are not rewritten here — confirm downstream
    /// consumers tolerate that divergence.
    ///
    /// # Errors
    /// Projection or WAL errors; individual index rebuild failures are only
    /// logged.
    pub fn compact_entity_tokens(
        &self,
        entity_id: &str,
        token_event_type: &str,
        merged_event: Event,
    ) -> Result<bool> {
        // Cheap existence check under a read lock before taking the write
        // lock.
        {
            let events = self.events.read();
            let has_tokens = events
                .iter()
                .any(|e| e.entity_id_str() == entity_id && e.event_type_str() == token_event_type);
            if !has_tokens {
                return Ok(false);
            }
        }

        // Let projections see the merged event before the log is rewritten.
        let projections = self.projections.read();
        projections.process_event(&merged_event)?;
        drop(projections);

        let mut events = self.events.write();

        events.retain(|e| {
            !(e.entity_id_str() == entity_id && e.event_type_str() == token_event_type)
        });

        events.push(merged_event.clone());

        if let Some(ref wal) = self.wal {
            wal.append(merged_event)?;
        }

        // Offsets shifted after retain/push, so rebuild the whole index
        // while still holding the write lock.
        self.index.clear();
        for (offset, event) in events.iter().enumerate() {
            if let Err(e) = self.index.index_event(
                event.id,
                event.entity_id_str(),
                event.event_type_str(),
                event.timestamp,
                offset,
            ) {
                tracing::warn!(
                    event_id = %event.id,
                    offset,
                    "Failed to re-index event during compaction: {e}"
                );
            }
        }

        Ok(true)
    }
1069
1070 #[cfg(feature = "server")]
1071 pub fn webhook_registry(&self) -> Arc<WebhookRegistry> {
1072 Arc::clone(&self.webhook_registry)
1073 }
1074
1075 #[cfg(feature = "server")]
1078 pub fn set_webhook_tx(&self, tx: mpsc::UnboundedSender<WebhookDeliveryTask>) {
1079 *self.webhook_tx.write() = Some(tx);
1080 tracing::info!("Webhook delivery channel connected");
1081 }
1082
1083 #[cfg(feature = "server")]
1085 fn dispatch_webhooks(&self, event: &Event) {
1086 let matching = self.webhook_registry.find_matching(event);
1087 if matching.is_empty() {
1088 return;
1089 }
1090
1091 let tx_guard = self.webhook_tx.read();
1092 if let Some(ref tx) = *tx_guard {
1093 for webhook in matching {
1094 let task = WebhookDeliveryTask {
1095 webhook,
1096 event: event.clone(),
1097 };
1098 if let Err(e) = tx.send(task) {
1099 tracing::warn!("Failed to queue webhook delivery: {}", e);
1100 }
1101 }
1102 }
1103 }
1104
1105 pub fn flush_storage(&self) -> Result<()> {
1107 if let Some(ref storage) = self.storage {
1108 let storage = storage.read();
1109 storage.flush()?;
1110 tracing::info!("✅ Flushed events to persistent storage");
1111 }
1112 Ok(())
1113 }
1114
    /// Builds a manual snapshot for `entity_id` by shallow-merging the
    /// payloads of all its events in timestamp order (later events win per
    /// top-level key) and handing the result to the snapshot manager.
    ///
    /// # Errors
    /// `EntityNotFound` if the entity has no events; otherwise query or
    /// snapshot-manager errors.
    pub fn create_snapshot(&self, entity_id: &str) -> Result<()> {
        let events = self.query(&QueryEventsRequest {
            entity_id: Some(entity_id.to_string()),
            event_type: None,
            tenant_id: None,
            as_of: None,
            since: None,
            until: None,
            limit: None,
            event_type_prefix: None,
            payload_filter: None,
        })?;

        if events.is_empty() {
            return Err(AllSourceError::EntityNotFound(entity_id.to_string()));
        }

        // Fold every object payload into one flat state map; non-object
        // payloads are skipped.
        let mut state = serde_json::json!({});
        for event in &events {
            if let serde_json::Value::Object(ref mut state_map) = state
                && let serde_json::Value::Object(ref payload_map) = event.payload
            {
                for (key, value) in payload_map {
                    state_map.insert(key.clone(), value.clone());
                }
            }
        }

        // Safe: emptiness was checked above.
        let last_event = events.last().unwrap();
        self.snapshot_manager.create_snapshot(
            entity_id,
            state,
            last_event.timestamp,
            events.len(),
            SnapshotType::Manual,
        )?;

        Ok(())
    }
1157
1158 fn check_auto_snapshot(&self, entity_id: &str, event: &Event) {
1160 let entity_event_count = self
1162 .index
1163 .get_by_entity(entity_id)
1164 .map_or(0, |entries| entries.len());
1165
1166 if self.snapshot_manager.should_create_snapshot(
1167 entity_id,
1168 entity_event_count,
1169 event.timestamp,
1170 ) {
1171 if let Err(e) = self.create_snapshot(entity_id) {
1173 tracing::warn!(
1174 "Failed to create automatic snapshot for {}: {}",
1175 entity_id,
1176 e
1177 );
1178 }
1179 }
1180 }
1181
1182 fn validate_event(&self, event: &Event) -> Result<()> {
1184 if event.entity_id_str().is_empty() {
1187 return Err(AllSourceError::ValidationError(
1188 "entity_id cannot be empty".to_string(),
1189 ));
1190 }
1191
1192 if event.event_type_str().is_empty() {
1193 return Err(AllSourceError::ValidationError(
1194 "event_type cannot be empty".to_string(),
1195 ));
1196 }
1197
1198 if event.event_type().is_system() {
1201 return Err(AllSourceError::ValidationError(
1202 "Event types starting with '_system.' are reserved for internal use".to_string(),
1203 ));
1204 }
1205
1206 Ok(())
1207 }
1208
1209 pub fn reset_projection(&self, name: &str) -> Result<usize> {
1211 let projection_manager = self.projections.read();
1212 let projection = projection_manager.get_projection(name).ok_or_else(|| {
1213 AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1214 })?;
1215
1216 projection.clear();
1218
1219 let prefix = format!("{name}:");
1221 let keys_to_remove: Vec<String> = self
1222 .projection_state_cache
1223 .iter()
1224 .filter(|entry| entry.key().starts_with(&prefix))
1225 .map(|entry| entry.key().clone())
1226 .collect();
1227 for key in keys_to_remove {
1228 self.projection_state_cache.remove(&key);
1229 }
1230
1231 let events = self.events.read();
1233 let mut reprocessed = 0usize;
1234 for event in events.iter() {
1235 if projection.process(event).is_ok() {
1236 reprocessed += 1;
1237 }
1238 }
1239
1240 Ok(reprocessed)
1241 }
1242
1243 pub fn get_event_by_id(&self, event_id: &uuid::Uuid) -> Result<Option<Event>> {
1245 if let Some(offset) = self.index.get_by_id(event_id) {
1246 let events = self.events.read();
1247 Ok(events.get(offset).cloned())
1248 } else {
1249 Ok(None)
1250 }
1251 }
1252
1253 #[cfg_attr(feature = "hotpath", hotpath::measure)]
1255 pub fn query(&self, request: &QueryEventsRequest) -> Result<Vec<Event>> {
1256 let query_type = if request.entity_id.is_some() {
1258 "entity"
1259 } else if request.event_type.is_some() {
1260 "type"
1261 } else if request.event_type_prefix.is_some() {
1262 "type_prefix"
1263 } else {
1264 "full_scan"
1265 };
1266
1267 #[cfg(feature = "server")]
1269 let timer = self
1270 .metrics
1271 .query_duration_seconds
1272 .with_label_values(&[query_type])
1273 .start_timer();
1274
1275 #[cfg(feature = "server")]
1277 self.metrics
1278 .queries_total
1279 .with_label_values(&[query_type])
1280 .inc();
1281
1282 let events = self.events.read();
1283
1284 let offsets: Vec<usize> = if let Some(entity_id) = &request.entity_id {
1286 self.index
1288 .get_by_entity(entity_id)
1289 .map(|entries| self.filter_entries(entries, request))
1290 .unwrap_or_default()
1291 } else if let Some(event_type) = &request.event_type {
1292 self.index
1294 .get_by_type(event_type)
1295 .map(|entries| self.filter_entries(entries, request))
1296 .unwrap_or_default()
1297 } else if let Some(prefix) = &request.event_type_prefix {
1298 let entries = self.index.get_by_type_prefix(prefix);
1300 self.filter_entries(entries, request)
1301 } else {
1302 (0..events.len()).collect()
1304 };
1305
1306 let mut results: Vec<Event> = offsets
1308 .iter()
1309 .filter_map(|&offset| events.get(offset).cloned())
1310 .filter(|event| self.apply_filters(event, request))
1311 .collect();
1312
1313 results.sort_by_key(|x| x.timestamp);
1315
1316 if let Some(limit) = request.limit {
1318 results.truncate(limit);
1319 }
1320
1321 #[cfg(feature = "server")]
1323 {
1324 self.metrics
1325 .query_results_total
1326 .with_label_values(&[query_type])
1327 .inc_by(results.len() as u64);
1328 timer.observe_duration();
1329 }
1330
1331 Ok(results)
1332 }
1333
1334 #[cfg_attr(feature = "hotpath", hotpath::measure)]
1336 fn filter_entries(&self, entries: Vec<IndexEntry>, request: &QueryEventsRequest) -> Vec<usize> {
1337 entries
1338 .into_iter()
1339 .filter(|entry| {
1340 if let Some(as_of) = request.as_of
1342 && entry.timestamp > as_of
1343 {
1344 return false;
1345 }
1346 if let Some(since) = request.since
1347 && entry.timestamp < since
1348 {
1349 return false;
1350 }
1351 if let Some(until) = request.until
1352 && entry.timestamp > until
1353 {
1354 return false;
1355 }
1356 true
1357 })
1358 .map(|entry| entry.offset)
1359 .collect()
1360 }
1361
1362 #[cfg_attr(feature = "hotpath", hotpath::measure)]
1364 fn apply_filters(&self, event: &Event, request: &QueryEventsRequest) -> bool {
1365 if let Some(ref tid) = request.tenant_id
1367 && event.tenant_id_str() != tid
1368 {
1369 return false;
1370 }
1371
1372 if request.entity_id.is_some()
1374 && let Some(ref event_type) = request.event_type
1375 && event.event_type_str() != event_type
1376 {
1377 return false;
1378 }
1379
1380 if request.entity_id.is_some()
1382 && let Some(ref prefix) = request.event_type_prefix
1383 && !event.event_type_str().starts_with(prefix)
1384 {
1385 return false;
1386 }
1387
1388 if let Some(ref filter_str) = request.payload_filter
1390 && let Ok(filter_obj) =
1391 serde_json::from_str::<serde_json::Map<String, serde_json::Value>>(filter_str)
1392 {
1393 let payload = event.payload();
1394 for (key, expected_value) in &filter_obj {
1395 match payload.get(key) {
1396 Some(actual_value) if actual_value == expected_value => {}
1397 _ => return false,
1398 }
1399 }
1400 }
1401
1402 true
1403 }
1404
    /// Rebuilds an entity's state, optionally as of a point in time.
    ///
    /// Starts from the newest applicable snapshot (if any) and shallow-merges
    /// the payloads of all later events into it, top-level key by key (later
    /// events win). Returns a JSON document with the merged state plus the
    /// contributing event history.
    ///
    /// # Errors
    /// `EntityNotFound` when there is neither a snapshot nor any events.
    #[cfg_attr(feature = "hotpath", hotpath::measure)]
    pub fn reconstruct_state(
        &self,
        entity_id: &str,
        as_of: Option<DateTime<Utc>>,
    ) -> Result<serde_json::Value> {
        // Pick a snapshot base: time-bounded when `as_of` is given, latest
        // otherwise. `since_timestamp` limits the replay to events after the
        // snapshot.
        let (merged_state, since_timestamp) = if let Some(as_of_time) = as_of {
            if let Some(snapshot) = self
                .snapshot_manager
                .get_snapshot_as_of(entity_id, as_of_time)
            {
                tracing::debug!(
                    "Using snapshot from {} for entity {} (saved {} events)",
                    snapshot.as_of,
                    entity_id,
                    snapshot.event_count
                );
                (snapshot.state.clone(), Some(snapshot.as_of))
            } else {
                (serde_json::json!({}), None)
            }
        } else {
            if let Some(snapshot) = self.snapshot_manager.get_latest_snapshot(entity_id) {
                tracing::debug!(
                    "Using latest snapshot from {} for entity {}",
                    snapshot.as_of,
                    entity_id
                );
                (snapshot.state.clone(), Some(snapshot.as_of))
            } else {
                (serde_json::json!({}), None)
            }
        };

        let events = self.query(&QueryEventsRequest {
            entity_id: Some(entity_id.to_string()),
            event_type: None,
            tenant_id: None,
            as_of,
            since: since_timestamp,
            until: None,
            limit: None,
            event_type_prefix: None,
            payload_filter: None,
        })?;

        // Only an error when there was no snapshot base either.
        if events.is_empty() && since_timestamp.is_none() {
            return Err(AllSourceError::EntityNotFound(entity_id.to_string()));
        }

        // Shallow-merge object payloads over the snapshot base; non-object
        // payloads are skipped.
        let mut merged_state = merged_state;
        for event in &events {
            if let serde_json::Value::Object(ref mut state_map) = merged_state
                && let serde_json::Value::Object(ref payload_map) = event.payload
            {
                for (key, value) in payload_map {
                    state_map.insert(key.clone(), value.clone());
                }
            }
        }

        let state = serde_json::json!({
            "entity_id": entity_id,
            "last_updated": events.last().map(|e| e.timestamp),
            "event_count": events.len(),
            "as_of": as_of,
            "current_state": merged_state,
            "history": events.iter().map(|e| {
                serde_json::json!({
                    "event_id": e.id,
                    "type": e.event_type,
                    "timestamp": e.timestamp,
                    "payload": e.payload
                })
            }).collect::<Vec<_>>()
        });

        Ok(state)
    }
1493
1494 pub fn get_snapshot(&self, entity_id: &str) -> Result<serde_json::Value> {
1496 let projections = self.projections.read();
1497
1498 if let Some(snapshot_projection) = projections.get_projection("entity_snapshots")
1499 && let Some(state) = snapshot_projection.get_state(entity_id)
1500 {
1501 return Ok(serde_json::json!({
1502 "entity_id": entity_id,
1503 "snapshot": state,
1504 "from_projection": "entity_snapshots"
1505 }));
1506 }
1507
1508 Err(AllSourceError::EntityNotFound(entity_id.to_string()))
1509 }
1510
1511 pub fn stats(&self) -> StoreStats {
1513 let events = self.events.read();
1514 let index_stats = self.index.stats();
1515
1516 StoreStats {
1517 total_events: events.len(),
1518 total_entities: index_stats.total_entities,
1519 total_event_types: index_stats.total_event_types,
1520 total_ingested: *self.total_ingested.read(),
1521 }
1522 }
1523
1524 pub fn list_streams(&self) -> Vec<StreamInfo> {
1526 self.index
1527 .get_all_entities()
1528 .into_iter()
1529 .map(|entity_id| {
1530 let event_count = self
1531 .index
1532 .get_by_entity(&entity_id)
1533 .map_or(0, |entries| entries.len());
1534 let last_event_at = self
1535 .index
1536 .get_by_entity(&entity_id)
1537 .and_then(|entries| entries.last().map(|e| e.timestamp));
1538 StreamInfo {
1539 stream_id: entity_id,
1540 event_count,
1541 last_event_at,
1542 }
1543 })
1544 .collect()
1545 }
1546
1547 pub fn list_event_types(&self) -> Vec<EventTypeInfo> {
1549 self.index
1550 .get_all_types()
1551 .into_iter()
1552 .map(|event_type| {
1553 let event_count = self
1554 .index
1555 .get_by_type(&event_type)
1556 .map_or(0, |entries| entries.len());
1557 let last_event_at = self
1558 .index
1559 .get_by_type(&event_type)
1560 .and_then(|entries| entries.last().map(|e| e.timestamp));
1561 EventTypeInfo {
1562 event_type,
1563 event_count,
1564 last_event_at,
1565 }
1566 })
1567 .collect()
1568 }
1569
1570 pub fn enable_wal_replication(
1577 &self,
1578 tx: tokio::sync::broadcast::Sender<crate::infrastructure::persistence::wal::WALEntry>,
1579 ) {
1580 if let Some(ref wal_arc) = self.wal {
1581 wal_arc.set_replication_tx(tx);
1582 tracing::info!("WAL replication broadcast enabled");
1583 } else {
1584 tracing::warn!("Cannot enable WAL replication: WAL is not configured");
1585 }
1586 }
1587
    /// Returns the write-ahead log handle, or `None` when the store was
    /// built without WAL persistence.
    pub fn wal(&self) -> Option<&Arc<WriteAheadLog>> {
        self.wal.as_ref()
    }
1593
    /// Returns the Parquet storage backend, or `None` when the store was
    /// built without on-disk storage.
    pub fn parquet_storage(&self) -> Option<&Arc<RwLock<ParquetStorage>>> {
        self.storage.as_ref()
    }
1599}
1600
/// Construction-time configuration for an `EventStore`.
///
/// `Default` yields a purely in-memory store; the `with_*` and `production`
/// constructors opt into persistence features.
#[derive(Debug, Clone, Default)]
pub struct EventStoreConfig {
    /// Directory for Parquet event storage; `None` disables Parquet persistence.
    pub storage_dir: Option<PathBuf>,

    /// Snapshot manager settings.
    pub snapshot_config: SnapshotConfig,

    /// Directory for the write-ahead log; `None` disables the WAL.
    pub wal_dir: Option<PathBuf>,

    /// Write-ahead log tuning (e.g. `sync_on_write`).
    pub wal_config: WALConfig,

    /// Compaction manager settings.
    pub compaction_config: CompactionConfig,

    /// Schema registry settings.
    pub schema_registry_config: SchemaRegistryConfig,

    /// Directory for internal system data; when `None`,
    /// `effective_system_data_dir` falls back to `<storage_dir>/__system`.
    pub system_data_dir: Option<PathBuf>,

    /// Optional tenant identifier to bootstrap at startup.
    /// NOTE(review): consumed outside this file — verify semantics at the
    /// call site.
    pub bootstrap_tenant: Option<String>,
}
1631
1632impl EventStoreConfig {
1633 pub fn with_persistence(storage_dir: impl Into<PathBuf>) -> Self {
1635 Self {
1636 storage_dir: Some(storage_dir.into()),
1637 ..Self::default()
1638 }
1639 }
1640
1641 pub fn with_snapshots(snapshot_config: SnapshotConfig) -> Self {
1643 Self {
1644 snapshot_config,
1645 ..Self::default()
1646 }
1647 }
1648
1649 pub fn with_wal(wal_dir: impl Into<PathBuf>, wal_config: WALConfig) -> Self {
1651 Self {
1652 wal_dir: Some(wal_dir.into()),
1653 wal_config,
1654 ..Self::default()
1655 }
1656 }
1657
1658 pub fn with_all(storage_dir: impl Into<PathBuf>, snapshot_config: SnapshotConfig) -> Self {
1660 Self {
1661 storage_dir: Some(storage_dir.into()),
1662 snapshot_config,
1663 ..Self::default()
1664 }
1665 }
1666
1667 pub fn production(
1669 storage_dir: impl Into<PathBuf>,
1670 wal_dir: impl Into<PathBuf>,
1671 snapshot_config: SnapshotConfig,
1672 wal_config: WALConfig,
1673 compaction_config: CompactionConfig,
1674 ) -> Self {
1675 let storage_dir = storage_dir.into();
1676 let system_data_dir = storage_dir.join("__system");
1677 Self {
1678 storage_dir: Some(storage_dir),
1679 snapshot_config,
1680 wal_dir: Some(wal_dir.into()),
1681 wal_config,
1682 compaction_config,
1683 system_data_dir: Some(system_data_dir),
1684 ..Self::default()
1685 }
1686 }
1687
1688 pub fn effective_system_data_dir(&self) -> Option<PathBuf> {
1693 self.system_data_dir
1694 .clone()
1695 .or_else(|| self.storage_dir.as_ref().map(|d| d.join("__system")))
1696 }
1697
1698 pub fn from_env() -> (Self, &'static str) {
1706 Self::from_env_vars(
1707 std::env::var("ALLSOURCE_DATA_DIR")
1708 .ok()
1709 .filter(|s| !s.is_empty()),
1710 std::env::var("ALLSOURCE_STORAGE_DIR")
1711 .ok()
1712 .filter(|s| !s.is_empty()),
1713 std::env::var("ALLSOURCE_WAL_DIR")
1714 .ok()
1715 .filter(|s| !s.is_empty()),
1716 std::env::var("ALLSOURCE_WAL_ENABLED").ok(),
1717 )
1718 }
1719
1720 pub fn from_env_vars(
1722 data_dir: Option<String>,
1723 explicit_storage_dir: Option<String>,
1724 explicit_wal_dir: Option<String>,
1725 wal_enabled_var: Option<String>,
1726 ) -> (Self, &'static str) {
1727 let data_dir = data_dir.filter(|s| !s.is_empty());
1728 let storage_dir = explicit_storage_dir
1729 .filter(|s| !s.is_empty())
1730 .or_else(|| data_dir.as_ref().map(|d| format!("{d}/storage")));
1731 let wal_dir = explicit_wal_dir
1732 .filter(|s| !s.is_empty())
1733 .or_else(|| data_dir.as_ref().map(|d| format!("{d}/wal")));
1734 let wal_enabled = wal_enabled_var.is_none_or(|v| v == "true");
1735
1736 match (&storage_dir, &wal_dir) {
1737 (Some(sd), Some(wd)) if wal_enabled => {
1738 let config = Self::production(
1739 sd,
1740 wd,
1741 SnapshotConfig::default(),
1742 WALConfig::default(),
1743 CompactionConfig::default(),
1744 );
1745 (config, "wal+parquet")
1746 }
1747 (Some(sd), _) => {
1748 let config = Self::with_persistence(sd);
1749 (config, "parquet-only")
1750 }
1751 (_, Some(wd)) if wal_enabled => {
1752 let config = Self::with_wal(wd, WALConfig::default());
1753 (config, "wal-only")
1754 }
1755 _ => (Self::default(), "in-memory"),
1756 }
1757 }
1758}
1759
/// Aggregate counters reported by `EventStore::stats`.
#[derive(Debug, serde::Serialize)]
pub struct StoreStats {
    /// Events currently held in the in-memory buffer.
    pub total_events: usize,
    /// Distinct entity ids known to the index.
    pub total_entities: usize,
    /// Distinct event types known to the index.
    pub total_event_types: usize,
    /// Running count of ingested events.
    pub total_ingested: u64,
}
1767
/// Per-entity stream summary returned by `EventStore::list_streams`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct StreamInfo {
    /// Entity id identifying the stream.
    pub stream_id: String,
    /// Number of indexed events in the stream.
    pub event_count: usize,
    /// Timestamp of the most recent indexed event, if any.
    pub last_event_at: Option<chrono::DateTime<chrono::Utc>>,
}
1778
/// Per-event-type summary returned by `EventStore::list_event_types`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct EventTypeInfo {
    /// The event type string.
    pub event_type: String,
    /// Number of indexed events of this type.
    pub event_count: usize,
    /// Timestamp of the most recent indexed event of this type, if any.
    pub last_event_at: Option<chrono::DateTime<chrono::Utc>>,
}
1789
/// `EventStore::new()` is the canonical no-persistence constructor.
impl Default for EventStore {
    fn default() -> Self {
        Self::new()
    }
}
1795
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::entities::Event;
    use tempfile::TempDir;

    /// Builds an event with a fixed `{"name": "Test", "value": 42}` payload
    /// in the `default` tenant.
    fn create_test_event(entity_id: &str, event_type: &str) -> Event {
        Event::from_strings(
            event_type.to_string(),
            entity_id.to_string(),
            "default".to_string(),
            serde_json::json!({"name": "Test", "value": 42}),
            None,
        )
        .unwrap()
    }

    /// Like `create_test_event` but with a caller-supplied payload.
    fn create_test_event_with_payload(
        entity_id: &str,
        event_type: &str,
        payload: serde_json::Value,
    ) -> Event {
        Event::from_strings(
            event_type.to_string(),
            entity_id.to_string(),
            "default".to_string(),
            payload,
            None,
        )
        .unwrap()
    }

    // --- construction and ingestion -------------------------------------

    #[test]
    fn test_event_store_new() {
        let store = EventStore::new();
        assert_eq!(store.stats().total_events, 0);
        assert_eq!(store.stats().total_entities, 0);
    }

    #[test]
    fn test_event_store_default() {
        let store = EventStore::default();
        assert_eq!(store.stats().total_events, 0);
    }

    #[test]
    fn test_ingest_single_event() {
        let store = EventStore::new();
        let event = create_test_event("entity-1", "user.created");

        store.ingest(&event).unwrap();

        assert_eq!(store.stats().total_events, 1);
        assert_eq!(store.stats().total_ingested, 1);
    }

    #[test]
    fn test_ingest_multiple_events() {
        let store = EventStore::new();

        for i in 0..10 {
            let event = create_test_event(&format!("entity-{i}"), "user.created");
            store.ingest(&event).unwrap();
        }

        assert_eq!(store.stats().total_events, 10);
        assert_eq!(store.stats().total_ingested, 10);
    }

    // --- basic queries ---------------------------------------------------

    #[test]
    fn test_query_by_entity_id() {
        let store = EventStore::new();

        store
            .ingest(&create_test_event("entity-1", "user.created"))
            .unwrap();
        store
            .ingest(&create_test_event("entity-2", "user.created"))
            .unwrap();
        store
            .ingest(&create_test_event("entity-1", "user.updated"))
            .unwrap();

        let results = store
            .query(&QueryEventsRequest {
                entity_id: Some("entity-1".to_string()),
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();

        assert_eq!(results.len(), 2);
    }

    #[test]
    fn test_query_by_event_type() {
        let store = EventStore::new();

        store
            .ingest(&create_test_event("entity-1", "user.created"))
            .unwrap();
        store
            .ingest(&create_test_event("entity-2", "user.updated"))
            .unwrap();
        store
            .ingest(&create_test_event("entity-3", "user.created"))
            .unwrap();

        let results = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: Some("user.created".to_string()),
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();

        assert_eq!(results.len(), 2);
    }

    #[test]
    fn test_query_with_limit() {
        let store = EventStore::new();

        for i in 0..10 {
            let event = create_test_event(&format!("entity-{i}"), "user.created");
            store.ingest(&event).unwrap();
        }

        let results = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: Some(5),
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();

        assert_eq!(results.len(), 5);
    }

    #[test]
    fn test_query_empty_store() {
        let store = EventStore::new();

        let results = store
            .query(&QueryEventsRequest {
                entity_id: Some("non-existent".to_string()),
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();

        assert!(results.is_empty());
    }

    // --- state reconstruction and snapshots ------------------------------

    #[test]
    fn test_reconstruct_state() {
        let store = EventStore::new();

        store
            .ingest(&create_test_event("entity-1", "user.created"))
            .unwrap();

        let state = store.reconstruct_state("entity-1", None).unwrap();
        assert_eq!(state["current_state"]["name"], "Test");
        assert_eq!(state["current_state"]["value"], 42);
    }

    #[test]
    fn test_reconstruct_state_not_found() {
        let store = EventStore::new();

        let result = store.reconstruct_state("non-existent", None);
        assert!(result.is_err());
    }

    #[test]
    fn test_get_snapshot_empty() {
        let store = EventStore::new();

        let result = store.get_snapshot("non-existent");
        assert!(result.is_err());
    }

    #[test]
    fn test_create_snapshot() {
        let store = EventStore::new();

        store
            .ingest(&create_test_event("entity-1", "user.created"))
            .unwrap();

        store.create_snapshot("entity-1").unwrap();

        let snapshot = store.get_snapshot("entity-1").unwrap();
        assert!(snapshot != serde_json::json!(null));
    }

    #[test]
    fn test_create_snapshot_entity_not_found() {
        let store = EventStore::new();

        let result = store.create_snapshot("non-existent");
        assert!(result.is_err());
    }

    // --- accessor smoke tests --------------------------------------------

    #[test]
    fn test_websocket_manager() {
        let store = EventStore::new();
        let manager = store.websocket_manager();
        assert!(Arc::strong_count(&manager) >= 1);
    }

    #[test]
    fn test_snapshot_manager() {
        let store = EventStore::new();
        let manager = store.snapshot_manager();
        assert!(Arc::strong_count(&manager) >= 1);
    }

    #[test]
    fn test_compaction_manager_none() {
        let store = EventStore::new();
        assert!(store.compaction_manager().is_none());
    }

    #[test]
    fn test_schema_registry() {
        let store = EventStore::new();
        let registry = store.schema_registry();
        assert!(Arc::strong_count(&registry) >= 1);
    }

    #[test]
    fn test_replay_manager() {
        let store = EventStore::new();
        let manager = store.replay_manager();
        assert!(Arc::strong_count(&manager) >= 1);
    }

    #[test]
    fn test_pipeline_manager() {
        let store = EventStore::new();
        let manager = store.pipeline_manager();
        assert!(Arc::strong_count(&manager) >= 1);
    }

    #[test]
    fn test_projection_manager() {
        let store = EventStore::new();
        let manager = store.projection_manager();
        let projections = manager.list_projections();
        assert!(projections.len() >= 2); }

    #[test]
    fn test_projection_state_cache() {
        let store = EventStore::new();
        let cache = store.projection_state_cache();

        cache.insert("test:key".to_string(), serde_json::json!({"value": 123}));
        assert_eq!(cache.len(), 1);

        let value = cache.get("test:key").unwrap();
        assert_eq!(value["value"], 123);
    }

    #[test]
    fn test_metrics() {
        let store = EventStore::new();
        let metrics = store.metrics();
        assert!(Arc::strong_count(&metrics) >= 1);
    }

    #[test]
    fn test_store_stats() {
        let store = EventStore::new();

        store
            .ingest(&create_test_event("entity-1", "user.created"))
            .unwrap();
        store
            .ingest(&create_test_event("entity-2", "order.placed"))
            .unwrap();

        let stats = store.stats();
        assert_eq!(stats.total_events, 2);
        assert_eq!(stats.total_entities, 2);
        assert_eq!(stats.total_event_types, 2);
        assert_eq!(stats.total_ingested, 2);
    }

    // --- configuration ----------------------------------------------------

    #[test]
    fn test_event_store_config_default() {
        let config = EventStoreConfig::default();
        assert!(config.storage_dir.is_none());
        assert!(config.wal_dir.is_none());
    }

    #[test]
    fn test_event_store_config_with_persistence() {
        let temp_dir = TempDir::new().unwrap();
        let config = EventStoreConfig::with_persistence(temp_dir.path());

        assert!(config.storage_dir.is_some());
        assert!(config.wal_dir.is_none());
    }

    #[test]
    fn test_event_store_config_with_wal() {
        let temp_dir = TempDir::new().unwrap();
        let config = EventStoreConfig::with_wal(temp_dir.path(), WALConfig::default());

        assert!(config.storage_dir.is_none());
        assert!(config.wal_dir.is_some());
    }

    #[test]
    fn test_event_store_config_with_all() {
        let temp_dir = TempDir::new().unwrap();
        let config = EventStoreConfig::with_all(temp_dir.path(), SnapshotConfig::default());

        assert!(config.storage_dir.is_some());
    }

    #[test]
    fn test_event_store_config_production() {
        let storage_dir = TempDir::new().unwrap();
        let wal_dir = TempDir::new().unwrap();
        let config = EventStoreConfig::production(
            storage_dir.path(),
            wal_dir.path(),
            SnapshotConfig::default(),
            WALConfig::default(),
            CompactionConfig::default(),
        );

        assert!(config.storage_dir.is_some());
        assert!(config.wal_dir.is_some());
    }

    #[test]
    fn test_from_env_vars_data_dir_enables_full_persistence() {
        let (config, mode) =
            EventStoreConfig::from_env_vars(Some("/app/data".to_string()), None, None, None);
        assert_eq!(mode, "wal+parquet");
        assert_eq!(
            config.storage_dir.unwrap().to_str().unwrap(),
            "/app/data/storage"
        );
        assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/app/data/wal");
    }

    #[test]
    fn test_from_env_vars_explicit_dirs() {
        let (config, mode) = EventStoreConfig::from_env_vars(
            None,
            Some("/custom/storage".to_string()),
            Some("/custom/wal".to_string()),
            None,
        );
        assert_eq!(mode, "wal+parquet");
        assert_eq!(
            config.storage_dir.unwrap().to_str().unwrap(),
            "/custom/storage"
        );
        assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/custom/wal");
    }

    #[test]
    fn test_from_env_vars_wal_disabled() {
        let (config, mode) = EventStoreConfig::from_env_vars(
            Some("/app/data".to_string()),
            None,
            None,
            Some("false".to_string()),
        );
        assert_eq!(mode, "parquet-only");
        assert!(config.storage_dir.is_some());
        assert!(config.wal_dir.is_none());
    }

    #[test]
    fn test_from_env_vars_no_dirs_is_in_memory() {
        let (config, mode) = EventStoreConfig::from_env_vars(None, None, None, None);
        assert_eq!(mode, "in-memory");
        assert!(config.storage_dir.is_none());
        assert!(config.wal_dir.is_none());
    }

    #[test]
    fn test_from_env_vars_empty_strings_treated_as_none() {
        let (_, mode) = EventStoreConfig::from_env_vars(
            Some(String::new()),
            Some(String::new()),
            Some(String::new()),
            None,
        );
        assert_eq!(mode, "in-memory");
    }

    #[test]
    fn test_from_env_vars_explicit_overrides_data_dir() {
        let (config, mode) = EventStoreConfig::from_env_vars(
            Some("/app/data".to_string()),
            Some("/override/storage".to_string()),
            Some("/override/wal".to_string()),
            None,
        );
        assert_eq!(mode, "wal+parquet");
        assert_eq!(
            config.storage_dir.unwrap().to_str().unwrap(),
            "/override/storage"
        );
        assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/override/wal");
    }

    #[test]
    fn test_from_env_vars_wal_only() {
        let (config, mode) =
            EventStoreConfig::from_env_vars(None, None, Some("/wal/only".to_string()), None);
        assert_eq!(mode, "wal-only");
        assert!(config.storage_dir.is_none());
        assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/wal/only");
    }

    #[test]
    fn test_store_stats_serde() {
        let stats = StoreStats {
            total_events: 100,
            total_entities: 50,
            total_event_types: 10,
            total_ingested: 100,
        };

        let json = serde_json::to_string(&stats).unwrap();
        assert!(json.contains("\"total_events\":100"));
        assert!(json.contains("\"total_entities\":50"));
    }

    // --- combined, prefix, and payload filters ----------------------------

    #[test]
    fn test_query_with_entity_and_type() {
        let store = EventStore::new();

        store
            .ingest(&create_test_event("entity-1", "user.created"))
            .unwrap();
        store
            .ingest(&create_test_event("entity-1", "user.updated"))
            .unwrap();
        store
            .ingest(&create_test_event("entity-2", "user.created"))
            .unwrap();

        let results = store
            .query(&QueryEventsRequest {
                entity_id: Some("entity-1".to_string()),
                event_type: Some("user.created".to_string()),
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();

        assert_eq!(results.len(), 1);
        assert_eq!(results[0].event_type_str(), "user.created");
    }

    #[test]
    fn test_query_by_event_type_prefix() {
        let store = EventStore::new();

        store
            .ingest(&create_test_event("entity-1", "index.created"))
            .unwrap();
        store
            .ingest(&create_test_event("entity-2", "index.updated"))
            .unwrap();
        store
            .ingest(&create_test_event("entity-3", "trade.created"))
            .unwrap();
        store
            .ingest(&create_test_event("entity-4", "trade.completed"))
            .unwrap();
        store
            .ingest(&create_test_event("entity-5", "balance.updated"))
            .unwrap();

        let results = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: Some("index.".to_string()),
                payload_filter: None,
            })
            .unwrap();

        assert_eq!(results.len(), 2);
        assert!(
            results
                .iter()
                .all(|e| e.event_type_str().starts_with("index."))
        );
    }

    #[test]
    fn test_query_by_event_type_prefix_empty_returns_all() {
        let store = EventStore::new();

        store
            .ingest(&create_test_event("entity-1", "index.created"))
            .unwrap();
        store
            .ingest(&create_test_event("entity-2", "trade.created"))
            .unwrap();

        let results = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: Some(String::new()),
                payload_filter: None,
            })
            .unwrap();

        assert_eq!(results.len(), 2);
    }

    #[test]
    fn test_query_by_event_type_prefix_no_match() {
        let store = EventStore::new();

        store
            .ingest(&create_test_event("entity-1", "index.created"))
            .unwrap();

        let results = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: Some("nonexistent.".to_string()),
                payload_filter: None,
            })
            .unwrap();

        assert!(results.is_empty());
    }

    #[test]
    fn test_query_by_entity_with_type_prefix() {
        let store = EventStore::new();

        store
            .ingest(&create_test_event("entity-1", "index.created"))
            .unwrap();
        store
            .ingest(&create_test_event("entity-1", "trade.created"))
            .unwrap();
        store
            .ingest(&create_test_event("entity-2", "index.updated"))
            .unwrap();

        let results = store
            .query(&QueryEventsRequest {
                entity_id: Some("entity-1".to_string()),
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: Some("index.".to_string()),
                payload_filter: None,
            })
            .unwrap();

        assert_eq!(results.len(), 1);
        assert_eq!(results[0].event_type_str(), "index.created");
    }

    #[test]
    fn test_query_prefix_with_limit() {
        let store = EventStore::new();

        for i in 0..5 {
            store
                .ingest(&create_test_event(&format!("entity-{i}"), "index.created"))
                .unwrap();
        }

        let results = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: Some(3),
                event_type_prefix: Some("index.".to_string()),
                payload_filter: None,
            })
            .unwrap();

        assert_eq!(results.len(), 3);
    }

    #[test]
    fn test_query_prefix_alongside_existing_filters() {
        let store = EventStore::new();

        store
            .ingest(&create_test_event("entity-1", "index.created"))
            .unwrap();
        std::thread::sleep(std::time::Duration::from_millis(10));
        store
            .ingest(&create_test_event("entity-2", "index.strategy.updated"))
            .unwrap();
        std::thread::sleep(std::time::Duration::from_millis(10));
        store
            .ingest(&create_test_event("entity-3", "index.deleted"))
            .unwrap();

        let results = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: Some(2),
                event_type_prefix: Some("index.".to_string()),
                payload_filter: None,
            })
            .unwrap();

        assert_eq!(results.len(), 2);
    }

    #[test]
    fn test_query_with_payload_filter() {
        let store = EventStore::new();

        // Five "alice" events and five "bob" events of the same type.
        for i in 0..5 {
            store
                .ingest(&create_test_event_with_payload(
                    &format!("entity-{i}"),
                    "user.action",
                    serde_json::json!({"user_id": "alice", "action": "click"}),
                ))
                .unwrap();
        }
        for i in 5..10 {
            store
                .ingest(&create_test_event_with_payload(
                    &format!("entity-{i}"),
                    "user.action",
                    serde_json::json!({"user_id": "bob", "action": "view"}),
                ))
                .unwrap();
        }

        let results = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: Some("user.action".to_string()),
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: Some(r#"{"user_id":"alice"}"#.to_string()),
            })
            .unwrap();

        assert_eq!(results.len(), 5);
    }

    #[test]
    fn test_query_payload_filter_non_existent_field() {
        let store = EventStore::new();

        store
            .ingest(&create_test_event_with_payload(
                "entity-1",
                "user.action",
                serde_json::json!({"user_id": "alice"}),
            ))
            .unwrap();

        let results = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: Some(r#"{"nonexistent":"value"}"#.to_string()),
            })
            .unwrap();

        assert!(results.is_empty());
    }

    #[test]
    fn test_query_payload_filter_with_prefix() {
        let store = EventStore::new();

        store
            .ingest(&create_test_event_with_payload(
                "entity-1",
                "index.created",
                serde_json::json!({"status": "active"}),
            ))
            .unwrap();
        store
            .ingest(&create_test_event_with_payload(
                "entity-2",
                "index.created",
                serde_json::json!({"status": "inactive"}),
            ))
            .unwrap();
        store
            .ingest(&create_test_event_with_payload(
                "entity-3",
                "trade.created",
                serde_json::json!({"status": "active"}),
            ))
            .unwrap();

        let results = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: Some("index.".to_string()),
                payload_filter: Some(r#"{"status":"active"}"#.to_string()),
            })
            .unwrap();

        assert_eq!(results.len(), 1);
        assert_eq!(results[0].entity_id().to_string(), "entity-1");
    }

    #[test]
    fn test_flush_storage_no_storage() {
        let store = EventStore::new();
        let result = store.flush_storage();
        assert!(result.is_ok());
    }

    #[test]
    fn test_state_evolution() {
        let store = EventStore::new();

        store
            .ingest(
                &Event::from_strings(
                    "user.created".to_string(),
                    "user-1".to_string(),
                    "default".to_string(),
                    serde_json::json!({"name": "Alice", "age": 25}),
                    None,
                )
                .unwrap(),
            )
            .unwrap();

        store
            .ingest(
                &Event::from_strings(
                    "user.updated".to_string(),
                    "user-1".to_string(),
                    "default".to_string(),
                    serde_json::json!({"age": 26}),
                    None,
                )
                .unwrap(),
            )
            .unwrap();

        let state = store.reconstruct_state("user-1", None).unwrap();
        assert_eq!(state["current_state"]["name"], "Alice");
        assert_eq!(state["current_state"]["age"], 26);
    }

    #[test]
    fn test_reject_system_event_types() {
        let store = EventStore::new();

        // `_system`-namespaced events must be rejected by `ingest`.
        let event = Event::reconstruct_from_strings(
            uuid::Uuid::new_v4(),
            "_system.tenant.created".to_string(),
            "_system:tenant:acme".to_string(),
            "_system".to_string(),
            serde_json::json!({"name": "ACME"}),
            chrono::Utc::now(),
            None,
            1,
        );

        let result = store.ingest(&event);
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("reserved for internal use"),
            "Expected system namespace rejection, got: {err}"
        );
    }

    // --- persistence round-trips ------------------------------------------

    #[test]
    fn test_wal_recovery_checkpoints_to_parquet() {
        let data_dir = TempDir::new().unwrap();
        let storage_dir = data_dir.path().join("storage");
        let wal_dir = data_dir.path().join("wal");

        // Session 1: ingest five events with a synchronous WAL, then drop
        // the store without flushing Parquet.
        {
            let config = EventStoreConfig::production(
                &storage_dir,
                &wal_dir,
                SnapshotConfig::default(),
                WALConfig {
                    sync_on_write: true,
                    ..WALConfig::default()
                },
                CompactionConfig::default(),
            );
            let store = EventStore::with_config(config);

            for i in 0..5 {
                let event = Event::from_strings(
                    "test.created".to_string(),
                    format!("entity-{i}"),
                    "default".to_string(),
                    serde_json::json!({"index": i}),
                    None,
                )
                .unwrap();
                store.ingest(&event).unwrap();
            }

            assert_eq!(store.stats().total_events, 5);

        }

        // The WAL on disk must contain the ingested events.
        let wal_files: Vec<_> = std::fs::read_dir(&wal_dir)
            .unwrap()
            .filter_map(std::result::Result::ok)
            .filter(|e| e.path().extension().is_some_and(|ext| ext == "log"))
            .collect();
        assert!(!wal_files.is_empty(), "WAL file should exist");
        let wal_size = wal_files[0].metadata().unwrap().len();
        assert!(wal_size > 0, "WAL file should have data (got 0 bytes)");

        // Session 2: a fresh store must recover all events from the WAL and
        // checkpoint them into Parquet.
        {
            let config = EventStoreConfig::production(
                &storage_dir,
                &wal_dir,
                SnapshotConfig::default(),
                WALConfig {
                    sync_on_write: true,
                    ..WALConfig::default()
                },
                CompactionConfig::default(),
            );
            let store = EventStore::with_config(config);

            assert_eq!(
                store.stats().total_events,
                5,
                "Session 2 should have all 5 events after WAL recovery"
            );

            let parquet_files: Vec<_> = std::fs::read_dir(&storage_dir)
                .unwrap()
                .filter_map(std::result::Result::ok)
                .filter(|e| e.path().extension().is_some_and(|ext| ext == "parquet"))
                .collect();
            assert!(
                !parquet_files.is_empty(),
                "Parquet file should exist after WAL checkpoint"
            );
        }

        // Session 3: the events must now survive via Parquet alone.
        {
            let config = EventStoreConfig::production(
                &storage_dir,
                &wal_dir,
                SnapshotConfig::default(),
                WALConfig {
                    sync_on_write: true,
                    ..WALConfig::default()
                },
                CompactionConfig::default(),
            );
            let store = EventStore::with_config(config);

            assert_eq!(
                store.stats().total_events,
                5,
                "Session 3 should still have all 5 events from Parquet"
            );
        }
    }

    #[test]
    fn test_parquet_restore_surfaces_errors_not_silent() {
        let data_dir = TempDir::new().unwrap();
        let storage_dir = data_dir.path().join("storage");
        let wal_dir = data_dir.path().join("wal");

        // Write three events and flush them to Parquet.
        {
            let config = EventStoreConfig::production(
                &storage_dir,
                &wal_dir,
                SnapshotConfig::default(),
                WALConfig {
                    sync_on_write: true,
                    ..WALConfig::default()
                },
                CompactionConfig::default(),
            );
            let store = EventStore::with_config(config);

            for i in 0..3 {
                let event = Event::from_strings(
                    "test.created".to_string(),
                    format!("entity-{i}"),
                    "default".to_string(),
                    serde_json::json!({"i": i}),
                    None,
                )
                .unwrap();
                store.ingest(&event).unwrap();
            }

            store.flush_storage().unwrap();
            assert_eq!(store.stats().total_events, 3);
        }

        let parquet_files: Vec<_> = std::fs::read_dir(&storage_dir)
            .unwrap()
            .filter_map(std::result::Result::ok)
            .filter(|e| e.path().extension().is_some_and(|ext| ext == "parquet"))
            .collect();
        assert!(!parquet_files.is_empty(), "Parquet file must exist");

        // Corrupt the Parquet file and truncate every WAL file so recovery
        // has no valid source of events.
        std::fs::write(parquet_files[0].path(), b"corrupted data").unwrap();

        for entry in std::fs::read_dir(&wal_dir).unwrap().flatten() {
            std::fs::write(entry.path(), b"").unwrap();
        }

        // With both Parquet and WAL unreadable, the store must come up empty
        // rather than crash or report phantom events.
        {
            let config = EventStoreConfig::production(
                &storage_dir,
                &wal_dir,
                SnapshotConfig::default(),
                WALConfig::default(),
                CompactionConfig::default(),
            );
            let store = EventStore::with_config(config);

            assert_eq!(store.stats().total_events, 0);
        }
    }
}