1use crate::{
2 application::{
3 dto::QueryEventsRequest,
4 services::{
5 exactly_once::{ExactlyOnceConfig, ExactlyOnceRegistry},
6 pipeline::PipelineManager,
7 projection::{EntitySnapshotProjection, EventCounterProjection, ProjectionManager},
8 replay::ReplayManager,
9 schema::{SchemaRegistry, SchemaRegistryConfig},
10 schema_evolution::SchemaEvolutionManager,
11 webhook::WebhookRegistry,
12 },
13 },
14 domain::entities::Event,
15 error::{AllSourceError, Result},
16 infrastructure::{
17 observability::metrics::MetricsRegistry,
18 persistence::{
19 compaction::{CompactionConfig, CompactionManager},
20 index::{EventIndex, IndexEntry},
21 snapshot::{SnapshotConfig, SnapshotManager, SnapshotType},
22 storage::ParquetStorage,
23 wal::{WALConfig, WriteAheadLog},
24 },
25 query::geospatial::GeoIndex,
26 web::websocket::WebSocketManager,
27 },
28};
29use chrono::{DateTime, Utc};
30use dashmap::DashMap;
31use parking_lot::RwLock;
32use std::{path::PathBuf, sync::Arc};
33use tokio::sync::mpsc;
34
/// Central event store: an in-memory, append-only event log with optional
/// write-ahead-log and Parquet persistence, secondary indexes, projections,
/// and real-time fan-out (WebSockets, webhooks, pipelines).
pub struct EventStore {
    /// Append-only log; an event's offset is its index into this vector.
    events: Arc<RwLock<Vec<Event>>>,

    /// Secondary indexes (by id, entity, and event type) over `events`.
    index: Arc<EventIndex>,

    /// Registered projections; fed on every ingest and during recovery.
    pub(crate) projections: Arc<RwLock<ProjectionManager>>,

    /// Optional Parquet persistence backend (None = in-memory only).
    storage: Option<Arc<RwLock<ParquetStorage>>>,

    /// Broadcasts ingested events to WebSocket subscribers.
    websocket_manager: Arc<WebSocketManager>,

    /// Creates and serves entity snapshots (manual and automatic).
    snapshot_manager: Arc<SnapshotManager>,

    /// Optional write-ahead log for crash recovery.
    wal: Option<Arc<WriteAheadLog>>,

    /// Optional storage compaction; present only when storage is configured.
    compaction_manager: Option<Arc<CompactionManager>>,

    /// Event schema registry (exposed via `schema_registry()`).
    schema_registry: Arc<SchemaRegistry>,

    /// Event replay coordination (exposed via `replay_manager()`).
    replay_manager: Arc<ReplayManager>,

    /// Processing pipelines run against every ingested event.
    pipeline_manager: Arc<PipelineManager>,

    /// Prometheus metrics registry.
    metrics: Arc<MetricsRegistry>,

    /// Lifetime count of ingested events; rebuilt from the log on recovery.
    total_ingested: Arc<RwLock<u64>>,

    /// Cached projection state; keys are prefixed `"<projection-name>:"`.
    projection_state_cache: Arc<DashMap<String, serde_json::Value>>,

    /// Webhook subscriptions matched against each ingested event.
    webhook_registry: Arc<WebhookRegistry>,

    /// Channel to the webhook delivery worker; `None` until connected via
    /// `set_webhook_tx` (deliveries are skipped while unset).
    webhook_tx: Arc<RwLock<Option<mpsc::UnboundedSender<WebhookDeliveryTask>>>>,

    /// Geospatial index fed on every ingest.
    geo_index: Arc<GeoIndex>,

    /// Exactly-once delivery registry.
    exactly_once: Arc<ExactlyOnceRegistry>,

    /// Analyzes event payloads per event type on ingest (schema drift).
    schema_evolution: Arc<SchemaEvolutionManager>,
}
96
/// A single pending webhook delivery: the matched subscription plus the
/// event that triggered it. Produced by `EventStore::dispatch_webhooks` and
/// consumed by the delivery worker on the other end of the channel.
#[derive(Debug, Clone)]
pub struct WebhookDeliveryTask {
    /// The webhook subscription that matched the event.
    pub webhook: crate::application::services::webhook::WebhookSubscription,
    /// The event to deliver.
    pub event: Event,
}
103
104impl EventStore {
105 pub fn new() -> Self {
107 Self::with_config(EventStoreConfig::default())
108 }
109
    /// Builds a store from `config`, wiring optional persistence subsystems
    /// and then recovering prior state.
    ///
    /// Recovery order: WAL first (crash recovery; recovered events are then
    /// checkpointed to Parquet and the WAL truncated). Only if the WAL
    /// yielded nothing are events loaded from Parquet storage. Subsystem
    /// initialization is best-effort: failures are logged and the feature is
    /// disabled rather than propagated.
    pub fn with_config(config: EventStoreConfig) -> Self {
        let mut projections = ProjectionManager::new();

        // Built-in projections registered in every store.
        projections.register(Arc::new(EntitySnapshotProjection::new("entity_snapshots")));
        projections.register(Arc::new(EventCounterProjection::new("event_counters")));

        // Parquet storage is optional; a failed init degrades to in-memory.
        let storage = config
            .storage_dir
            .as_ref()
            .and_then(|dir| match ParquetStorage::new(dir) {
                Ok(storage) => {
                    tracing::info!("✅ Parquet persistence enabled at: {}", dir.display());
                    Some(Arc::new(RwLock::new(storage)))
                }
                Err(e) => {
                    tracing::error!("❌ Failed to initialize Parquet storage: {}", e);
                    None
                }
            });

        // Write-ahead log is likewise optional and best-effort.
        let wal = config.wal_dir.as_ref().and_then(|dir| {
            match WriteAheadLog::new(dir, config.wal_config.clone()) {
                Ok(wal) => {
                    tracing::info!("✅ WAL enabled at: {}", dir.display());
                    Some(Arc::new(wal))
                }
                Err(e) => {
                    tracing::error!("❌ Failed to initialize WAL: {}", e);
                    None
                }
            }
        });

        // Compaction only applies when there is on-disk storage to compact.
        let compaction_manager = config.storage_dir.as_ref().map(|dir| {
            let manager = CompactionManager::new(dir, config.compaction_config.clone());
            Arc::new(manager)
        });

        let schema_registry = Arc::new(SchemaRegistry::new(config.schema_registry_config.clone()));
        tracing::info!("✅ Schema registry enabled");

        let replay_manager = Arc::new(ReplayManager::new());
        tracing::info!("✅ Replay manager enabled");

        let pipeline_manager = Arc::new(PipelineManager::new());
        tracing::info!("✅ Pipeline manager enabled");

        let metrics = MetricsRegistry::new();
        tracing::info!("✅ Prometheus metrics registry initialized");

        let projection_state_cache = Arc::new(DashMap::new());
        tracing::info!("✅ Projection state cache initialized");

        let webhook_registry = Arc::new(WebhookRegistry::new());
        tracing::info!("✅ Webhook registry initialized");

        let store = Self {
            events: Arc::new(RwLock::new(Vec::new())),
            index: Arc::new(EventIndex::new()),
            projections: Arc::new(RwLock::new(projections)),
            storage,
            websocket_manager: Arc::new(WebSocketManager::new()),
            snapshot_manager: Arc::new(SnapshotManager::new(config.snapshot_config)),
            wal,
            compaction_manager,
            schema_registry,
            replay_manager,
            pipeline_manager,
            metrics,
            total_ingested: Arc::new(RwLock::new(0)),
            projection_state_cache,
            webhook_registry,
            webhook_tx: Arc::new(RwLock::new(None)),
            geo_index: Arc::new(GeoIndex::new()),
            exactly_once: Arc::new(ExactlyOnceRegistry::new(ExactlyOnceConfig::default())),
            schema_evolution: Arc::new(SchemaEvolutionManager::new()),
        };

        // --- Recovery phase 1: replay events still sitting in the WAL. ---
        let mut wal_recovered = false;
        if let Some(ref wal) = store.wal {
            match wal.recover() {
                Ok(recovered_events) if !recovered_events.is_empty() => {
                    tracing::info!(
                        "🔄 Recovering {} events from WAL...",
                        recovered_events.len()
                    );

                    for event in recovered_events {
                        // Offset = current log length; index/projection errors
                        // are logged but do not abort recovery.
                        let offset = store.events.read().len();
                        if let Err(e) = store.index.index_event(
                            event.id,
                            event.entity_id_str(),
                            event.event_type_str(),
                            event.timestamp,
                            offset,
                        ) {
                            tracing::error!("Failed to re-index WAL event {}: {}", event.id, e);
                        }

                        if let Err(e) = store.projections.read().process_event(&event) {
                            tracing::error!("Failed to re-process WAL event {}: {}", event.id, e);
                        }

                        store.events.write().push(event);
                    }

                    let total = store.events.read().len();
                    *store.total_ingested.write() = total as u64;
                    tracing::info!("✅ Successfully recovered {} events from WAL", total);

                    // Checkpoint recovered events to Parquet, then truncate
                    // the WAL so they are not replayed again next start.
                    if store.storage.is_some() {
                        tracing::info!("📸 Checkpointing WAL to Parquet storage...");
                        if let Err(e) = store.flush_storage() {
                            tracing::error!("Failed to checkpoint to Parquet: {}", e);
                        } else if let Err(e) = wal.truncate() {
                            tracing::error!("Failed to truncate WAL after checkpoint: {}", e);
                        } else {
                            tracing::info!("✅ WAL checkpointed and truncated");
                        }
                    }

                    wal_recovered = true;
                }
                Ok(_) => {
                    tracing::debug!("No events to recover from WAL");
                }
                Err(e) => {
                    tracing::error!("❌ WAL recovery failed: {}", e);
                }
            }
        }

        // --- Recovery phase 2: fall back to Parquet only when the WAL
        // recovered nothing (avoids double-loading the same events). ---
        if !wal_recovered
            && let Some(ref storage) = store.storage
            && let Ok(persisted_events) = storage.read().load_all_events()
        {
            tracing::info!("📂 Loading {} persisted events...", persisted_events.len());

            for event in persisted_events {
                let offset = store.events.read().len();
                if let Err(e) = store.index.index_event(
                    event.id,
                    event.entity_id_str(),
                    event.event_type_str(),
                    event.timestamp,
                    offset,
                ) {
                    tracing::error!("Failed to re-index event {}: {}", event.id, e);
                }

                if let Err(e) = store.projections.read().process_event(&event) {
                    tracing::error!("Failed to re-process event {}: {}", event.id, e);
                }

                store.events.write().push(event);
            }

            let total = store.events.read().len();
            *store.total_ingested.write() = total as u64;
            tracing::info!("✅ Successfully loaded {} events from storage", total);
        }

        store
    }
292
293 pub fn ingest(&self, event: Event) -> Result<()> {
295 let timer = self.metrics.ingestion_duration_seconds.start_timer();
297
298 let validation_result = self.validate_event(&event);
300 if let Err(e) = validation_result {
301 self.metrics.ingestion_errors_total.inc();
303 timer.observe_duration();
304 return Err(e);
305 }
306
307 if let Some(ref wal) = self.wal
310 && let Err(e) = wal.append(event.clone())
311 {
312 self.metrics.ingestion_errors_total.inc();
313 timer.observe_duration();
314 return Err(e);
315 }
316
317 let mut events = self.events.write();
318 let offset = events.len();
319
320 self.index.index_event(
322 event.id,
323 event.entity_id_str(),
324 event.event_type_str(),
325 event.timestamp,
326 offset,
327 )?;
328
329 let projections = self.projections.read();
331 projections.process_event(&event)?;
332 drop(projections); let pipeline_results = self.pipeline_manager.process_event(&event);
337 if !pipeline_results.is_empty() {
338 tracing::debug!(
339 "Event {} processed by {} pipeline(s)",
340 event.id,
341 pipeline_results.len()
342 );
343 for (pipeline_id, result) in pipeline_results {
346 tracing::trace!("Pipeline {} result: {:?}", pipeline_id, result);
347 }
348 }
349
350 if let Some(ref storage) = self.storage {
352 let storage = storage.read();
353 storage.append_event(event.clone())?;
354 }
355
356 events.push(event.clone());
358 let total_events = events.len();
359 drop(events); self.websocket_manager
363 .broadcast_event(Arc::new(event.clone()));
364
365 self.dispatch_webhooks(&event);
367
368 self.geo_index.index_event(&event);
370
371 self.schema_evolution
373 .analyze_event(event.event_type_str(), &event.payload);
374
375 self.check_auto_snapshot(event.entity_id_str(), &event);
377
378 self.metrics.events_ingested_total.inc();
380 self.metrics
381 .events_ingested_by_type
382 .with_label_values(&[event.event_type_str()])
383 .inc();
384 self.metrics.storage_events_total.set(total_events as i64);
385
386 let mut total = self.total_ingested.write();
388 *total += 1;
389
390 timer.observe_duration();
391
392 tracing::debug!("Event ingested: {} (offset: {})", event.id, offset);
393
394 Ok(())
395 }
396
397 pub fn ingest_replicated(&self, event: Event) -> Result<()> {
404 let timer = self.metrics.ingestion_duration_seconds.start_timer();
405
406 let mut events = self.events.write();
407 let offset = events.len();
408
409 self.index.index_event(
411 event.id,
412 event.entity_id_str(),
413 event.event_type_str(),
414 event.timestamp,
415 offset,
416 )?;
417
418 let projections = self.projections.read();
420 projections.process_event(&event)?;
421 drop(projections);
422
423 let pipeline_results = self.pipeline_manager.process_event(&event);
425 if !pipeline_results.is_empty() {
426 tracing::debug!(
427 "Replicated event {} processed by {} pipeline(s)",
428 event.id,
429 pipeline_results.len()
430 );
431 }
432
433 events.push(event.clone());
435 let total_events = events.len();
436 drop(events);
437
438 self.websocket_manager
440 .broadcast_event(Arc::new(event.clone()));
441
442 self.metrics.events_ingested_total.inc();
444 self.metrics
445 .events_ingested_by_type
446 .with_label_values(&[event.event_type_str()])
447 .inc();
448 self.metrics.storage_events_total.set(total_events as i64);
449
450 let mut total = self.total_ingested.write();
451 *total += 1;
452
453 timer.observe_duration();
454
455 tracing::debug!(
456 "Replicated event ingested: {} (offset: {})",
457 event.id,
458 offset
459 );
460
461 Ok(())
462 }
463
464 pub fn websocket_manager(&self) -> Arc<WebSocketManager> {
466 Arc::clone(&self.websocket_manager)
467 }
468
469 pub fn snapshot_manager(&self) -> Arc<SnapshotManager> {
471 Arc::clone(&self.snapshot_manager)
472 }
473
474 pub fn compaction_manager(&self) -> Option<Arc<CompactionManager>> {
476 self.compaction_manager.as_ref().map(Arc::clone)
477 }
478
479 pub fn schema_registry(&self) -> Arc<SchemaRegistry> {
481 Arc::clone(&self.schema_registry)
482 }
483
484 pub fn replay_manager(&self) -> Arc<ReplayManager> {
486 Arc::clone(&self.replay_manager)
487 }
488
489 pub fn pipeline_manager(&self) -> Arc<PipelineManager> {
491 Arc::clone(&self.pipeline_manager)
492 }
493
494 pub fn metrics(&self) -> Arc<MetricsRegistry> {
496 Arc::clone(&self.metrics)
497 }
498
499 pub fn projection_manager(&self) -> parking_lot::RwLockReadGuard<'_, ProjectionManager> {
501 self.projections.read()
502 }
503
504 pub fn projection_state_cache(&self) -> Arc<DashMap<String, serde_json::Value>> {
507 Arc::clone(&self.projection_state_cache)
508 }
509
510 pub fn geo_index(&self) -> Arc<GeoIndex> {
513 self.geo_index.clone()
514 }
515
516 pub fn exactly_once(&self) -> Arc<ExactlyOnceRegistry> {
518 self.exactly_once.clone()
519 }
520
521 pub fn schema_evolution(&self) -> Arc<SchemaEvolutionManager> {
523 self.schema_evolution.clone()
524 }
525
526 pub fn snapshot_events(&self) -> Vec<Event> {
528 self.events.read().clone()
529 }
530
531 pub fn webhook_registry(&self) -> Arc<WebhookRegistry> {
532 Arc::clone(&self.webhook_registry)
533 }
534
535 pub fn set_webhook_tx(&self, tx: mpsc::UnboundedSender<WebhookDeliveryTask>) {
538 *self.webhook_tx.write() = Some(tx);
539 tracing::info!("Webhook delivery channel connected");
540 }
541
542 fn dispatch_webhooks(&self, event: &Event) {
544 let matching = self.webhook_registry.find_matching(event);
545 if matching.is_empty() {
546 return;
547 }
548
549 let tx_guard = self.webhook_tx.read();
550 if let Some(ref tx) = *tx_guard {
551 for webhook in matching {
552 let task = WebhookDeliveryTask {
553 webhook,
554 event: event.clone(),
555 };
556 if let Err(e) = tx.send(task) {
557 tracing::warn!("Failed to queue webhook delivery: {}", e);
558 }
559 }
560 }
561 }
562
563 pub fn flush_storage(&self) -> Result<()> {
565 if let Some(ref storage) = self.storage {
566 let storage = storage.read();
567 storage.flush()?;
568 tracing::info!("✅ Flushed events to persistent storage");
569 }
570 Ok(())
571 }
572
573 pub fn create_snapshot(&self, entity_id: &str) -> Result<()> {
575 let events = self.query(QueryEventsRequest {
577 entity_id: Some(entity_id.to_string()),
578 event_type: None,
579 tenant_id: None,
580 as_of: None,
581 since: None,
582 until: None,
583 limit: None,
584 })?;
585
586 if events.is_empty() {
587 return Err(AllSourceError::EntityNotFound(entity_id.to_string()));
588 }
589
590 let mut state = serde_json::json!({});
592 for event in &events {
593 if let serde_json::Value::Object(ref mut state_map) = state
594 && let serde_json::Value::Object(ref payload_map) = event.payload
595 {
596 for (key, value) in payload_map {
597 state_map.insert(key.clone(), value.clone());
598 }
599 }
600 }
601
602 let last_event = events.last().unwrap();
603 self.snapshot_manager.create_snapshot(
604 entity_id.to_string(),
605 state,
606 last_event.timestamp,
607 events.len(),
608 SnapshotType::Manual,
609 )?;
610
611 Ok(())
612 }
613
614 fn check_auto_snapshot(&self, entity_id: &str, event: &Event) {
616 let entity_event_count = self
618 .index
619 .get_by_entity(entity_id)
620 .map(|entries| entries.len())
621 .unwrap_or(0);
622
623 if self.snapshot_manager.should_create_snapshot(
624 entity_id,
625 entity_event_count,
626 event.timestamp,
627 ) {
628 if let Err(e) = self.create_snapshot(entity_id) {
630 tracing::warn!(
631 "Failed to create automatic snapshot for {}: {}",
632 entity_id,
633 e
634 );
635 }
636 }
637 }
638
639 fn validate_event(&self, event: &Event) -> Result<()> {
641 if event.entity_id_str().is_empty() {
644 return Err(AllSourceError::ValidationError(
645 "entity_id cannot be empty".to_string(),
646 ));
647 }
648
649 if event.event_type_str().is_empty() {
650 return Err(AllSourceError::ValidationError(
651 "event_type cannot be empty".to_string(),
652 ));
653 }
654
655 if event.event_type().is_system() {
658 return Err(AllSourceError::ValidationError(
659 "Event types starting with '_system.' are reserved for internal use".to_string(),
660 ));
661 }
662
663 Ok(())
664 }
665
666 pub fn reset_projection(&self, name: &str) -> Result<usize> {
668 let projection_manager = self.projections.read();
669 let projection = projection_manager.get_projection(name).ok_or_else(|| {
670 AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
671 })?;
672
673 projection.clear();
675
676 let prefix = format!("{name}:");
678 let keys_to_remove: Vec<String> = self
679 .projection_state_cache
680 .iter()
681 .filter(|entry| entry.key().starts_with(&prefix))
682 .map(|entry| entry.key().clone())
683 .collect();
684 for key in keys_to_remove {
685 self.projection_state_cache.remove(&key);
686 }
687
688 let events = self.events.read();
690 let mut reprocessed = 0usize;
691 for event in events.iter() {
692 if projection.process(event).is_ok() {
693 reprocessed += 1;
694 }
695 }
696
697 Ok(reprocessed)
698 }
699
700 pub fn get_event_by_id(&self, event_id: &uuid::Uuid) -> Result<Option<Event>> {
702 if let Some(offset) = self.index.get_by_id(event_id) {
703 let events = self.events.read();
704 Ok(events.get(offset).cloned())
705 } else {
706 Ok(None)
707 }
708 }
709
710 pub fn query(&self, request: QueryEventsRequest) -> Result<Vec<Event>> {
712 let query_type = if request.entity_id.is_some() {
714 "entity"
715 } else if request.event_type.is_some() {
716 "type"
717 } else {
718 "full_scan"
719 };
720
721 let timer = self
723 .metrics
724 .query_duration_seconds
725 .with_label_values(&[query_type])
726 .start_timer();
727
728 self.metrics
730 .queries_total
731 .with_label_values(&[query_type])
732 .inc();
733
734 let events = self.events.read();
735
736 let offsets: Vec<usize> = if let Some(entity_id) = &request.entity_id {
738 self.index
740 .get_by_entity(entity_id)
741 .map(|entries| self.filter_entries(entries, &request))
742 .unwrap_or_default()
743 } else if let Some(event_type) = &request.event_type {
744 self.index
746 .get_by_type(event_type)
747 .map(|entries| self.filter_entries(entries, &request))
748 .unwrap_or_default()
749 } else {
750 (0..events.len()).collect()
752 };
753
754 let mut results: Vec<Event> = offsets
756 .iter()
757 .filter_map(|&offset| events.get(offset).cloned())
758 .filter(|event| self.apply_filters(event, &request))
759 .collect();
760
761 results.sort_by_key(|x| x.timestamp);
763
764 if let Some(limit) = request.limit {
766 results.truncate(limit);
767 }
768
769 self.metrics
771 .query_results_total
772 .with_label_values(&[query_type])
773 .inc_by(results.len() as u64);
774
775 timer.observe_duration();
776
777 Ok(results)
778 }
779
780 fn filter_entries(&self, entries: Vec<IndexEntry>, request: &QueryEventsRequest) -> Vec<usize> {
782 entries
783 .into_iter()
784 .filter(|entry| {
785 if let Some(as_of) = request.as_of
787 && entry.timestamp > as_of
788 {
789 return false;
790 }
791 if let Some(since) = request.since
792 && entry.timestamp < since
793 {
794 return false;
795 }
796 if let Some(until) = request.until
797 && entry.timestamp > until
798 {
799 return false;
800 }
801 true
802 })
803 .map(|entry| entry.offset)
804 .collect()
805 }
806
807 fn apply_filters(&self, event: &Event, request: &QueryEventsRequest) -> bool {
809 if request.entity_id.is_some()
811 && let Some(ref event_type) = request.event_type
812 && event.event_type_str() != event_type
813 {
814 return false;
815 }
816
817 true
818 }
819
820 pub fn reconstruct_state(
823 &self,
824 entity_id: &str,
825 as_of: Option<DateTime<Utc>>,
826 ) -> Result<serde_json::Value> {
827 let (merged_state, since_timestamp) = if let Some(as_of_time) = as_of {
829 if let Some(snapshot) = self
831 .snapshot_manager
832 .get_snapshot_as_of(entity_id, as_of_time)
833 {
834 tracing::debug!(
835 "Using snapshot from {} for entity {} (saved {} events)",
836 snapshot.as_of,
837 entity_id,
838 snapshot.event_count
839 );
840 (snapshot.state.clone(), Some(snapshot.as_of))
841 } else {
842 (serde_json::json!({}), None)
843 }
844 } else {
845 if let Some(snapshot) = self.snapshot_manager.get_latest_snapshot(entity_id) {
847 tracing::debug!(
848 "Using latest snapshot from {} for entity {}",
849 snapshot.as_of,
850 entity_id
851 );
852 (snapshot.state.clone(), Some(snapshot.as_of))
853 } else {
854 (serde_json::json!({}), None)
855 }
856 };
857
858 let events = self.query(QueryEventsRequest {
860 entity_id: Some(entity_id.to_string()),
861 event_type: None,
862 tenant_id: None,
863 as_of,
864 since: since_timestamp,
865 until: None,
866 limit: None,
867 })?;
868
869 if events.is_empty() && since_timestamp.is_none() {
871 return Err(AllSourceError::EntityNotFound(entity_id.to_string()));
872 }
873
874 let mut merged_state = merged_state;
876 for event in &events {
877 if let serde_json::Value::Object(ref mut state_map) = merged_state
878 && let serde_json::Value::Object(ref payload_map) = event.payload
879 {
880 for (key, value) in payload_map {
881 state_map.insert(key.clone(), value.clone());
882 }
883 }
884 }
885
886 let state = serde_json::json!({
888 "entity_id": entity_id,
889 "last_updated": events.last().map(|e| e.timestamp),
890 "event_count": events.len(),
891 "as_of": as_of,
892 "current_state": merged_state,
893 "history": events.iter().map(|e| {
894 serde_json::json!({
895 "event_id": e.id,
896 "type": e.event_type,
897 "timestamp": e.timestamp,
898 "payload": e.payload
899 })
900 }).collect::<Vec<_>>()
901 });
902
903 Ok(state)
904 }
905
906 pub fn get_snapshot(&self, entity_id: &str) -> Result<serde_json::Value> {
908 let projections = self.projections.read();
909
910 if let Some(snapshot_projection) = projections.get_projection("entity_snapshots")
911 && let Some(state) = snapshot_projection.get_state(entity_id)
912 {
913 return Ok(serde_json::json!({
914 "entity_id": entity_id,
915 "snapshot": state,
916 "from_projection": "entity_snapshots"
917 }));
918 }
919
920 Err(AllSourceError::EntityNotFound(entity_id.to_string()))
921 }
922
923 pub fn stats(&self) -> StoreStats {
925 let events = self.events.read();
926 let index_stats = self.index.stats();
927
928 StoreStats {
929 total_events: events.len(),
930 total_entities: index_stats.total_entities,
931 total_event_types: index_stats.total_event_types,
932 total_ingested: *self.total_ingested.read(),
933 }
934 }
935
936 pub fn list_streams(&self) -> Vec<StreamInfo> {
938 self.index
939 .get_all_entities()
940 .into_iter()
941 .map(|entity_id| {
942 let event_count = self
943 .index
944 .get_by_entity(&entity_id)
945 .map(|entries| entries.len())
946 .unwrap_or(0);
947 let last_event_at = self
948 .index
949 .get_by_entity(&entity_id)
950 .and_then(|entries| entries.last().map(|e| e.timestamp));
951 StreamInfo {
952 stream_id: entity_id,
953 event_count,
954 last_event_at,
955 }
956 })
957 .collect()
958 }
959
960 pub fn list_event_types(&self) -> Vec<EventTypeInfo> {
962 self.index
963 .get_all_types()
964 .into_iter()
965 .map(|event_type| {
966 let event_count = self
967 .index
968 .get_by_type(&event_type)
969 .map(|entries| entries.len())
970 .unwrap_or(0);
971 let last_event_at = self
972 .index
973 .get_by_type(&event_type)
974 .and_then(|entries| entries.last().map(|e| e.timestamp));
975 EventTypeInfo {
976 event_type,
977 event_count,
978 last_event_at,
979 }
980 })
981 .collect()
982 }
983
984 pub fn enable_wal_replication(
991 &self,
992 tx: tokio::sync::broadcast::Sender<crate::infrastructure::persistence::wal::WALEntry>,
993 ) {
994 if let Some(ref wal_arc) = self.wal {
995 wal_arc.set_replication_tx(tx);
996 tracing::info!("WAL replication broadcast enabled");
997 } else {
998 tracing::warn!("Cannot enable WAL replication: WAL is not configured");
999 }
1000 }
1001
    /// Borrows the write-ahead log, when one is configured.
    pub fn wal(&self) -> Option<&Arc<WriteAheadLog>> {
        self.wal.as_ref()
    }

    /// Borrows the Parquet storage backend, when one is configured.
    pub fn parquet_storage(&self) -> Option<&Arc<RwLock<ParquetStorage>>> {
        self.storage.as_ref()
    }
1013}
1014
/// Construction-time options for [`EventStore`]; the `Default` value is a
/// fully in-memory store.
#[derive(Debug, Clone, Default)]
pub struct EventStoreConfig {
    /// Directory for Parquet persistence; `None` disables it.
    pub storage_dir: Option<PathBuf>,

    /// Snapshot creation policy.
    pub snapshot_config: SnapshotConfig,

    /// Directory for the write-ahead log; `None` disables the WAL.
    pub wal_dir: Option<PathBuf>,

    /// WAL tuning options (only used when `wal_dir` is set).
    pub wal_config: WALConfig,

    /// Storage compaction options (only used when `storage_dir` is set).
    pub compaction_config: CompactionConfig,

    /// Schema registry options.
    pub schema_registry_config: SchemaRegistryConfig,

    /// Directory for internal system data; when `None`,
    /// `effective_system_data_dir` falls back to `<storage_dir>/__system`.
    pub system_data_dir: Option<PathBuf>,

    /// Optional tenant identifier used during bootstrap
    /// (not read within this module — consumed by the caller).
    pub bootstrap_tenant: Option<String>,
}
1045
1046impl EventStoreConfig {
1047 pub fn with_persistence(storage_dir: impl Into<PathBuf>) -> Self {
1049 Self {
1050 storage_dir: Some(storage_dir.into()),
1051 ..Self::default()
1052 }
1053 }
1054
1055 pub fn with_snapshots(snapshot_config: SnapshotConfig) -> Self {
1057 Self {
1058 snapshot_config,
1059 ..Self::default()
1060 }
1061 }
1062
1063 pub fn with_wal(wal_dir: impl Into<PathBuf>, wal_config: WALConfig) -> Self {
1065 Self {
1066 wal_dir: Some(wal_dir.into()),
1067 wal_config,
1068 ..Self::default()
1069 }
1070 }
1071
1072 pub fn with_all(storage_dir: impl Into<PathBuf>, snapshot_config: SnapshotConfig) -> Self {
1074 Self {
1075 storage_dir: Some(storage_dir.into()),
1076 snapshot_config,
1077 ..Self::default()
1078 }
1079 }
1080
1081 pub fn production(
1083 storage_dir: impl Into<PathBuf>,
1084 wal_dir: impl Into<PathBuf>,
1085 snapshot_config: SnapshotConfig,
1086 wal_config: WALConfig,
1087 compaction_config: CompactionConfig,
1088 ) -> Self {
1089 let storage_dir = storage_dir.into();
1090 let system_data_dir = storage_dir.join("__system");
1091 Self {
1092 storage_dir: Some(storage_dir),
1093 snapshot_config,
1094 wal_dir: Some(wal_dir.into()),
1095 wal_config,
1096 compaction_config,
1097 system_data_dir: Some(system_data_dir),
1098 ..Self::default()
1099 }
1100 }
1101
1102 pub fn effective_system_data_dir(&self) -> Option<PathBuf> {
1107 self.system_data_dir
1108 .clone()
1109 .or_else(|| self.storage_dir.as_ref().map(|d| d.join("__system")))
1110 }
1111
1112 pub fn from_env() -> (Self, &'static str) {
1120 Self::from_env_vars(
1121 std::env::var("ALLSOURCE_DATA_DIR")
1122 .ok()
1123 .filter(|s| !s.is_empty()),
1124 std::env::var("ALLSOURCE_STORAGE_DIR")
1125 .ok()
1126 .filter(|s| !s.is_empty()),
1127 std::env::var("ALLSOURCE_WAL_DIR")
1128 .ok()
1129 .filter(|s| !s.is_empty()),
1130 std::env::var("ALLSOURCE_WAL_ENABLED").ok(),
1131 )
1132 }
1133
1134 pub fn from_env_vars(
1136 data_dir: Option<String>,
1137 explicit_storage_dir: Option<String>,
1138 explicit_wal_dir: Option<String>,
1139 wal_enabled_var: Option<String>,
1140 ) -> (Self, &'static str) {
1141 let data_dir = data_dir.filter(|s| !s.is_empty());
1142 let storage_dir = explicit_storage_dir
1143 .filter(|s| !s.is_empty())
1144 .or_else(|| data_dir.as_ref().map(|d| format!("{}/storage", d)));
1145 let wal_dir = explicit_wal_dir
1146 .filter(|s| !s.is_empty())
1147 .or_else(|| data_dir.as_ref().map(|d| format!("{}/wal", d)));
1148 let wal_enabled = wal_enabled_var.map(|v| v == "true").unwrap_or(true);
1149
1150 match (&storage_dir, &wal_dir) {
1151 (Some(sd), Some(wd)) if wal_enabled => {
1152 let config = Self::production(
1153 sd,
1154 wd,
1155 SnapshotConfig::default(),
1156 WALConfig::default(),
1157 CompactionConfig::default(),
1158 );
1159 (config, "wal+parquet")
1160 }
1161 (Some(sd), _) => {
1162 let config = Self::with_persistence(sd);
1163 (config, "parquet-only")
1164 }
1165 (_, Some(wd)) if wal_enabled => {
1166 let config = Self::with_wal(wd, WALConfig::default());
1167 (config, "wal-only")
1168 }
1169 _ => (Self::default(), "in-memory"),
1170 }
1171 }
1172}
1173
/// Aggregate counters reported by [`EventStore::stats`].
#[derive(Debug, serde::Serialize)]
pub struct StoreStats {
    /// Events currently held in the in-memory log.
    pub total_events: usize,
    /// Distinct entity ids known to the index.
    pub total_entities: usize,
    /// Distinct event types known to the index.
    pub total_event_types: usize,
    /// Lifetime count of ingested events.
    pub total_ingested: u64,
}
1181
/// Per-entity stream summary returned by [`EventStore::list_streams`].
#[derive(Debug, Clone, serde::Serialize)]
pub struct StreamInfo {
    /// The entity id identifying the stream.
    pub stream_id: String,
    /// Number of events in the stream.
    pub event_count: usize,
    /// Timestamp of the stream's most recent event, if any.
    pub last_event_at: Option<chrono::DateTime<chrono::Utc>>,
}
1192
/// Per-event-type summary returned by [`EventStore::list_event_types`].
#[derive(Debug, Clone, serde::Serialize)]
pub struct EventTypeInfo {
    /// The event type name.
    pub event_type: String,
    /// Number of events of this type.
    pub event_count: usize,
    /// Timestamp of the most recent event of this type, if any.
    pub last_event_at: Option<chrono::DateTime<chrono::Utc>>,
}
1203
/// The default store is fully in-memory (no WAL, no Parquet persistence).
impl Default for EventStore {
    fn default() -> Self {
        Self::new()
    }
}
1209
1210#[cfg(test)]
1211mod tests {
1212 use super::*;
1213 use crate::domain::entities::Event;
1214 use tempfile::TempDir;
1215
    /// Builds a minimal event for the "default" tenant with a fixed payload
    /// (`{"name": "Test", "value": 42}`).
    fn create_test_event(entity_id: &str, event_type: &str) -> Event {
        Event::from_strings(
            event_type.to_string(),
            entity_id.to_string(),
            "default".to_string(),
            serde_json::json!({"name": "Test", "value": 42}),
            None,
        )
        .unwrap()
    }
1226
    // A fresh store starts completely empty.
    #[test]
    fn test_event_store_new() {
        let store = EventStore::new();
        assert_eq!(store.stats().total_events, 0);
        assert_eq!(store.stats().total_entities, 0);
    }

    // `Default` delegates to `new()` and is likewise empty.
    #[test]
    fn test_event_store_default() {
        let store = EventStore::default();
        assert_eq!(store.stats().total_events, 0);
    }

    // A single ingest bumps both the log length and the lifetime counter.
    #[test]
    fn test_ingest_single_event() {
        let store = EventStore::new();
        let event = create_test_event("entity-1", "user.created");

        store.ingest(event).unwrap();

        assert_eq!(store.stats().total_events, 1);
        assert_eq!(store.stats().total_ingested, 1);
    }

    // Counters track multiple ingests across distinct entities.
    #[test]
    fn test_ingest_multiple_events() {
        let store = EventStore::new();

        for i in 0..10 {
            let event = create_test_event(&format!("entity-{}", i), "user.created");
            store.ingest(event).unwrap();
        }

        assert_eq!(store.stats().total_events, 10);
        assert_eq!(store.stats().total_ingested, 10);
    }
1263
    // Entity-id queries return only that entity's events.
    #[test]
    fn test_query_by_entity_id() {
        let store = EventStore::new();

        store
            .ingest(create_test_event("entity-1", "user.created"))
            .unwrap();
        store
            .ingest(create_test_event("entity-2", "user.created"))
            .unwrap();
        store
            .ingest(create_test_event("entity-1", "user.updated"))
            .unwrap();

        let results = store
            .query(QueryEventsRequest {
                entity_id: Some("entity-1".to_string()),
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
            })
            .unwrap();

        assert_eq!(results.len(), 2);
    }

    // Event-type queries return only events of that type.
    #[test]
    fn test_query_by_event_type() {
        let store = EventStore::new();

        store
            .ingest(create_test_event("entity-1", "user.created"))
            .unwrap();
        store
            .ingest(create_test_event("entity-2", "user.updated"))
            .unwrap();
        store
            .ingest(create_test_event("entity-3", "user.created"))
            .unwrap();

        let results = store
            .query(QueryEventsRequest {
                entity_id: None,
                event_type: Some("user.created".to_string()),
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
            })
            .unwrap();

        assert_eq!(results.len(), 2);
    }

    // `limit` truncates the (sorted) result set.
    #[test]
    fn test_query_with_limit() {
        let store = EventStore::new();

        for i in 0..10 {
            let event = create_test_event(&format!("entity-{}", i), "user.created");
            store.ingest(event).unwrap();
        }

        let results = store
            .query(QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: Some(5),
            })
            .unwrap();

        assert_eq!(results.len(), 5);
    }

    // Querying an unknown entity yields an empty Ok, not an error.
    #[test]
    fn test_query_empty_store() {
        let store = EventStore::new();

        let results = store
            .query(QueryEventsRequest {
                entity_id: Some("non-existent".to_string()),
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
            })
            .unwrap();

        assert!(results.is_empty());
    }
1364
    // State reconstruction surfaces the merged payload under "current_state".
    #[test]
    fn test_reconstruct_state() {
        let store = EventStore::new();

        store
            .ingest(create_test_event("entity-1", "user.created"))
            .unwrap();

        let state = store.reconstruct_state("entity-1", None).unwrap();
        assert_eq!(state["current_state"]["name"], "Test");
        assert_eq!(state["current_state"]["value"], 42);
    }

    // Reconstructing an unknown entity is an error.
    #[test]
    fn test_reconstruct_state_not_found() {
        let store = EventStore::new();

        let result = store.reconstruct_state("non-existent", None);
        assert!(result.is_err());
    }

    // Projection-backed snapshot lookup errors for unknown entities.
    #[test]
    fn test_get_snapshot_empty() {
        let store = EventStore::new();

        let result = store.get_snapshot("non-existent");
        assert!(result.is_err());
    }

    // A manual snapshot can be created after ingesting and then read back.
    #[test]
    fn test_create_snapshot() {
        let store = EventStore::new();

        store
            .ingest(create_test_event("entity-1", "user.created"))
            .unwrap();

        store.create_snapshot("entity-1").unwrap();

        let snapshot = store.get_snapshot("entity-1").unwrap();
        assert!(snapshot != serde_json::json!(null));
    }

    // Snapshotting an entity with no events is an error.
    #[test]
    fn test_create_snapshot_entity_not_found() {
        let store = EventStore::new();

        let result = store.create_snapshot("non-existent");
        assert!(result.is_err());
    }
1418
1419 #[test]
1420 fn test_websocket_manager() {
1421 let store = EventStore::new();
1422 let manager = store.websocket_manager();
1423 assert!(Arc::strong_count(&manager) >= 1);
1425 }
1426
1427 #[test]
1428 fn test_snapshot_manager() {
1429 let store = EventStore::new();
1430 let manager = store.snapshot_manager();
1431 assert!(Arc::strong_count(&manager) >= 1);
1432 }
1433
1434 #[test]
1435 fn test_compaction_manager_none() {
1436 let store = EventStore::new();
1437 assert!(store.compaction_manager().is_none());
1439 }
1440
1441 #[test]
1442 fn test_schema_registry() {
1443 let store = EventStore::new();
1444 let registry = store.schema_registry();
1445 assert!(Arc::strong_count(®istry) >= 1);
1446 }
1447
1448 #[test]
1449 fn test_replay_manager() {
1450 let store = EventStore::new();
1451 let manager = store.replay_manager();
1452 assert!(Arc::strong_count(&manager) >= 1);
1453 }
1454
1455 #[test]
1456 fn test_pipeline_manager() {
1457 let store = EventStore::new();
1458 let manager = store.pipeline_manager();
1459 assert!(Arc::strong_count(&manager) >= 1);
1460 }
1461
1462 #[test]
1463 fn test_projection_manager() {
1464 let store = EventStore::new();
1465 let manager = store.projection_manager();
1466 let projections = manager.list_projections();
1468 assert!(projections.len() >= 2); }
1470
1471 #[test]
1472 fn test_projection_state_cache() {
1473 let store = EventStore::new();
1474 let cache = store.projection_state_cache();
1475
1476 cache.insert("test:key".to_string(), serde_json::json!({"value": 123}));
1477 assert_eq!(cache.len(), 1);
1478
1479 let value = cache.get("test:key").unwrap();
1480 assert_eq!(value["value"], 123);
1481 }
1482
1483 #[test]
1484 fn test_metrics() {
1485 let store = EventStore::new();
1486 let metrics = store.metrics();
1487 assert!(Arc::strong_count(&metrics) >= 1);
1488 }
1489
1490 #[test]
1491 fn test_store_stats() {
1492 let store = EventStore::new();
1493
1494 store
1495 .ingest(create_test_event("entity-1", "user.created"))
1496 .unwrap();
1497 store
1498 .ingest(create_test_event("entity-2", "order.placed"))
1499 .unwrap();
1500
1501 let stats = store.stats();
1502 assert_eq!(stats.total_events, 2);
1503 assert_eq!(stats.total_entities, 2);
1504 assert_eq!(stats.total_event_types, 2);
1505 assert_eq!(stats.total_ingested, 2);
1506 }
1507
1508 #[test]
1509 fn test_event_store_config_default() {
1510 let config = EventStoreConfig::default();
1511 assert!(config.storage_dir.is_none());
1512 assert!(config.wal_dir.is_none());
1513 }
1514
1515 #[test]
1516 fn test_event_store_config_with_persistence() {
1517 let temp_dir = TempDir::new().unwrap();
1518 let config = EventStoreConfig::with_persistence(temp_dir.path());
1519
1520 assert!(config.storage_dir.is_some());
1521 assert!(config.wal_dir.is_none());
1522 }
1523
1524 #[test]
1525 fn test_event_store_config_with_wal() {
1526 let temp_dir = TempDir::new().unwrap();
1527 let config = EventStoreConfig::with_wal(temp_dir.path(), WALConfig::default());
1528
1529 assert!(config.storage_dir.is_none());
1530 assert!(config.wal_dir.is_some());
1531 }
1532
1533 #[test]
1534 fn test_event_store_config_with_all() {
1535 let temp_dir = TempDir::new().unwrap();
1536 let config = EventStoreConfig::with_all(temp_dir.path(), SnapshotConfig::default());
1537
1538 assert!(config.storage_dir.is_some());
1539 }
1540
1541 #[test]
1542 fn test_event_store_config_production() {
1543 let storage_dir = TempDir::new().unwrap();
1544 let wal_dir = TempDir::new().unwrap();
1545 let config = EventStoreConfig::production(
1546 storage_dir.path(),
1547 wal_dir.path(),
1548 SnapshotConfig::default(),
1549 WALConfig::default(),
1550 CompactionConfig::default(),
1551 );
1552
1553 assert!(config.storage_dir.is_some());
1554 assert!(config.wal_dir.is_some());
1555 }
1556
1557 #[test]
1563 fn test_from_env_vars_data_dir_enables_full_persistence() {
1564 let (config, mode) =
1565 EventStoreConfig::from_env_vars(Some("/app/data".to_string()), None, None, None);
1566 assert_eq!(mode, "wal+parquet");
1567 assert_eq!(
1568 config.storage_dir.unwrap().to_str().unwrap(),
1569 "/app/data/storage"
1570 );
1571 assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/app/data/wal");
1572 }
1573
1574 #[test]
1575 fn test_from_env_vars_explicit_dirs() {
1576 let (config, mode) = EventStoreConfig::from_env_vars(
1577 None,
1578 Some("/custom/storage".to_string()),
1579 Some("/custom/wal".to_string()),
1580 None,
1581 );
1582 assert_eq!(mode, "wal+parquet");
1583 assert_eq!(
1584 config.storage_dir.unwrap().to_str().unwrap(),
1585 "/custom/storage"
1586 );
1587 assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/custom/wal");
1588 }
1589
1590 #[test]
1591 fn test_from_env_vars_wal_disabled() {
1592 let (config, mode) = EventStoreConfig::from_env_vars(
1593 Some("/app/data".to_string()),
1594 None,
1595 None,
1596 Some("false".to_string()),
1597 );
1598 assert_eq!(mode, "parquet-only");
1599 assert!(config.storage_dir.is_some());
1600 assert!(config.wal_dir.is_none());
1601 }
1602
1603 #[test]
1604 fn test_from_env_vars_no_dirs_is_in_memory() {
1605 let (config, mode) = EventStoreConfig::from_env_vars(None, None, None, None);
1606 assert_eq!(mode, "in-memory");
1607 assert!(config.storage_dir.is_none());
1608 assert!(config.wal_dir.is_none());
1609 }
1610
1611 #[test]
1612 fn test_from_env_vars_empty_strings_treated_as_none() {
1613 let (_, mode) = EventStoreConfig::from_env_vars(
1614 Some("".to_string()),
1615 Some("".to_string()),
1616 Some("".to_string()),
1617 None,
1618 );
1619 assert_eq!(mode, "in-memory");
1620 }
1621
1622 #[test]
1623 fn test_from_env_vars_explicit_overrides_data_dir() {
1624 let (config, mode) = EventStoreConfig::from_env_vars(
1625 Some("/app/data".to_string()),
1626 Some("/override/storage".to_string()),
1627 Some("/override/wal".to_string()),
1628 None,
1629 );
1630 assert_eq!(mode, "wal+parquet");
1631 assert_eq!(
1632 config.storage_dir.unwrap().to_str().unwrap(),
1633 "/override/storage"
1634 );
1635 assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/override/wal");
1636 }
1637
1638 #[test]
1639 fn test_from_env_vars_wal_only() {
1640 let (config, mode) =
1641 EventStoreConfig::from_env_vars(None, None, Some("/wal/only".to_string()), None);
1642 assert_eq!(mode, "wal-only");
1643 assert!(config.storage_dir.is_none());
1644 assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/wal/only");
1645 }
1646
1647 #[test]
1648 fn test_store_stats_serde() {
1649 let stats = StoreStats {
1650 total_events: 100,
1651 total_entities: 50,
1652 total_event_types: 10,
1653 total_ingested: 100,
1654 };
1655
1656 let json = serde_json::to_string(&stats).unwrap();
1657 assert!(json.contains("\"total_events\":100"));
1658 assert!(json.contains("\"total_entities\":50"));
1659 }
1660
1661 #[test]
1662 fn test_query_with_entity_and_type() {
1663 let store = EventStore::new();
1664
1665 store
1666 .ingest(create_test_event("entity-1", "user.created"))
1667 .unwrap();
1668 store
1669 .ingest(create_test_event("entity-1", "user.updated"))
1670 .unwrap();
1671 store
1672 .ingest(create_test_event("entity-2", "user.created"))
1673 .unwrap();
1674
1675 let results = store
1676 .query(QueryEventsRequest {
1677 entity_id: Some("entity-1".to_string()),
1678 event_type: Some("user.created".to_string()),
1679 tenant_id: None,
1680 as_of: None,
1681 since: None,
1682 until: None,
1683 limit: None,
1684 })
1685 .unwrap();
1686
1687 assert_eq!(results.len(), 1);
1688 assert_eq!(results[0].event_type_str(), "user.created");
1689 }
1690
1691 #[test]
1692 fn test_flush_storage_no_storage() {
1693 let store = EventStore::new();
1694 let result = store.flush_storage();
1696 assert!(result.is_ok());
1697 }
1698
1699 #[test]
1700 fn test_state_evolution() {
1701 let store = EventStore::new();
1702
1703 store
1705 .ingest(
1706 Event::from_strings(
1707 "user.created".to_string(),
1708 "user-1".to_string(),
1709 "default".to_string(),
1710 serde_json::json!({"name": "Alice", "age": 25}),
1711 None,
1712 )
1713 .unwrap(),
1714 )
1715 .unwrap();
1716
1717 store
1719 .ingest(
1720 Event::from_strings(
1721 "user.updated".to_string(),
1722 "user-1".to_string(),
1723 "default".to_string(),
1724 serde_json::json!({"age": 26}),
1725 None,
1726 )
1727 .unwrap(),
1728 )
1729 .unwrap();
1730
1731 let state = store.reconstruct_state("user-1", None).unwrap();
1732 assert_eq!(state["current_state"]["name"], "Alice");
1734 assert_eq!(state["current_state"]["age"], 26);
1735 }
1736
1737 #[test]
1738 fn test_reject_system_event_types() {
1739 let store = EventStore::new();
1740
1741 let event = Event::reconstruct_from_strings(
1743 uuid::Uuid::new_v4(),
1744 "_system.tenant.created".to_string(),
1745 "_system:tenant:acme".to_string(),
1746 "_system".to_string(),
1747 serde_json::json!({"name": "ACME"}),
1748 chrono::Utc::now(),
1749 None,
1750 1,
1751 );
1752
1753 let result = store.ingest(event);
1754 assert!(result.is_err());
1755 let err = result.unwrap_err();
1756 assert!(
1757 err.to_string().contains("reserved for internal use"),
1758 "Expected system namespace rejection, got: {}",
1759 err
1760 );
1761 }
1762}