1use crate::application::dto::QueryEventsRequest;
2use crate::infrastructure::persistence::compaction::{CompactionConfig, CompactionManager};
3use crate::domain::entities::Event;
4use crate::error::{AllSourceError, Result};
5use crate::infrastructure::persistence::index::{EventIndex, IndexEntry};
6use crate::infrastructure::observability::metrics::MetricsRegistry;
7use crate::application::services::pipeline::PipelineManager;
8use crate::application::services::projection::{EntitySnapshotProjection, EventCounterProjection, ProjectionManager};
9use crate::application::services::replay::ReplayManager;
10use crate::application::services::schema::{SchemaRegistry, SchemaRegistryConfig};
11use crate::infrastructure::persistence::snapshot::{SnapshotConfig, SnapshotManager, SnapshotType};
12use crate::infrastructure::persistence::storage::ParquetStorage;
13use crate::infrastructure::persistence::wal::{WALConfig, WriteAheadLog};
14use crate::infrastructure::web::websocket::WebSocketManager;
15use chrono::{DateTime, Utc};
16use dashmap::DashMap;
17use parking_lot::RwLock;
18use std::path::PathBuf;
19use std::sync::Arc;
20
pub struct EventStore {
    /// Append-only in-memory event log; an event's position in this Vec is
    /// its canonical offset, referenced by the index.
    events: Arc<RwLock<Vec<Event>>>,

    /// Secondary lookups (by entity id, by event type) resolving to offsets
    /// into `events`.
    index: Arc<EventIndex>,

    /// Registered projections; updated synchronously on every ingest.
    pub(crate) projections: Arc<RwLock<ProjectionManager>>,

    /// Optional Parquet persistence; `None` when disabled or when
    /// initialization failed at construction time.
    storage: Option<Arc<RwLock<ParquetStorage>>>,

    /// Broadcasts each ingested event to WebSocket subscribers.
    websocket_manager: Arc<WebSocketManager>,

    /// Creates and serves entity snapshots (manual and automatic).
    snapshot_manager: Arc<SnapshotManager>,

    /// Optional write-ahead log; appended to before an event is applied,
    /// and replayed on startup for crash recovery.
    wal: Option<Arc<WriteAheadLog>>,

    /// Optional storage compaction; only created alongside Parquet storage.
    compaction_manager: Option<Arc<CompactionManager>>,

    /// Event schema registry (validation/evolution handled elsewhere).
    schema_registry: Arc<SchemaRegistry>,

    /// Replay coordination (see `ReplayManager` for semantics).
    replay_manager: Arc<ReplayManager>,

    /// Per-event processing pipelines, run synchronously on ingest.
    pipeline_manager: Arc<PipelineManager>,

    /// Prometheus metrics registry.
    metrics: Arc<MetricsRegistry>,

    /// Lifetime counter of accepted events; rebuilt during recovery.
    total_ingested: Arc<RwLock<u64>>,

    /// Cache of projection states keyed by a string id.
    projection_state_cache: Arc<DashMap<String, serde_json::Value>>,
}
67
68impl EventStore {
69 pub fn new() -> Self {
71 Self::with_config(EventStoreConfig::default())
72 }
73
    /// Builds an `EventStore` from `config`, wiring up the optional
    /// subsystems (Parquet storage, WAL, compaction) and then recovering
    /// state: WAL recovery takes precedence; only when the WAL yields no
    /// events are persisted events reloaded from Parquet storage.
    ///
    /// Initialization failures of optional subsystems are logged and leave
    /// that subsystem disabled instead of failing construction.
    pub fn with_config(config: EventStoreConfig) -> Self {
        let mut projections = ProjectionManager::new();

        // Built-in projections registered for every store instance.
        projections.register(Arc::new(EntitySnapshotProjection::new("entity_snapshots")));
        projections.register(Arc::new(EventCounterProjection::new("event_counters")));

        // Parquet storage is optional and best-effort: on error, log and run
        // without persistence.
        let storage = config
            .storage_dir
            .as_ref()
            .and_then(|dir| match ParquetStorage::new(dir) {
                Ok(storage) => {
                    tracing::info!("✅ Parquet persistence enabled at: {}", dir.display());
                    Some(Arc::new(RwLock::new(storage)))
                }
                Err(e) => {
                    tracing::error!("❌ Failed to initialize Parquet storage: {}", e);
                    None
                }
            });

        // WAL is likewise optional and best-effort at startup.
        let wal = config.wal_dir.as_ref().and_then(|dir| {
            match WriteAheadLog::new(dir, config.wal_config.clone()) {
                Ok(wal) => {
                    tracing::info!("✅ WAL enabled at: {}", dir.display());
                    Some(Arc::new(wal))
                }
                Err(e) => {
                    tracing::error!("❌ Failed to initialize WAL: {}", e);
                    None
                }
            }
        });

        // Compaction only makes sense with on-disk storage, so it is keyed
        // off storage_dir (even if ParquetStorage::new failed above).
        let compaction_manager = config.storage_dir.as_ref().map(|dir| {
            let manager = CompactionManager::new(dir, config.compaction_config.clone());
            Arc::new(manager)
        });

        let schema_registry = Arc::new(SchemaRegistry::new(config.schema_registry_config.clone()));
        tracing::info!("✅ Schema registry enabled");

        let replay_manager = Arc::new(ReplayManager::new());
        tracing::info!("✅ Replay manager enabled");

        let pipeline_manager = Arc::new(PipelineManager::new());
        tracing::info!("✅ Pipeline manager enabled");

        let metrics = MetricsRegistry::new();
        tracing::info!("✅ Prometheus metrics registry initialized");

        let projection_state_cache = Arc::new(DashMap::new());
        tracing::info!("✅ Projection state cache initialized");

        let store = Self {
            events: Arc::new(RwLock::new(Vec::new())),
            index: Arc::new(EventIndex::new()),
            projections: Arc::new(RwLock::new(projections)),
            storage,
            websocket_manager: Arc::new(WebSocketManager::new()),
            snapshot_manager: Arc::new(SnapshotManager::new(config.snapshot_config)),
            wal,
            compaction_manager,
            schema_registry,
            replay_manager,
            pipeline_manager,
            metrics,
            total_ingested: Arc::new(RwLock::new(0)),
            projection_state_cache,
        };

        // --- Recovery phase 1: replay the WAL, if any events survive. ---
        let mut wal_recovered = false;
        if let Some(ref wal) = store.wal {
            match wal.recover() {
                Ok(recovered_events) if !recovered_events.is_empty() => {
                    tracing::info!(
                        "🔄 Recovering {} events from WAL...",
                        recovered_events.len()
                    );

                    for event in recovered_events {
                        // Offset is the current length; the push below must
                        // stay last so index/projections see the right slot.
                        let offset = store.events.read().len();
                        if let Err(e) = store.index.index_event(
                            event.id,
                            event.entity_id_str(),
                            event.event_type_str(),
                            event.timestamp,
                            offset,
                        ) {
                            tracing::error!("Failed to re-index WAL event {}: {}", event.id, e);
                        }

                        if let Err(e) = store.projections.read().process_event(&event) {
                            tracing::error!("Failed to re-process WAL event {}: {}", event.id, e);
                        }

                        store.events.write().push(event);
                    }

                    let total = store.events.read().len();
                    *store.total_ingested.write() = total as u64;
                    tracing::info!("✅ Successfully recovered {} events from WAL", total);

                    // Checkpoint: flush storage, then truncate the WAL only
                    // if the flush succeeded.
                    // NOTE(review): recovered events are not appended to
                    // storage here before flush — confirm flush_storage
                    // actually persists them prior to the truncate.
                    if store.storage.is_some() {
                        tracing::info!("📸 Checkpointing WAL to Parquet storage...");
                        if let Err(e) = store.flush_storage() {
                            tracing::error!("Failed to checkpoint to Parquet: {}", e);
                        } else if let Err(e) = wal.truncate() {
                            tracing::error!("Failed to truncate WAL after checkpoint: {}", e);
                        } else {
                            tracing::info!("✅ WAL checkpointed and truncated");
                        }
                    }

                    wal_recovered = true;
                }
                Ok(_) => {
                    tracing::debug!("No events to recover from WAL");
                }
                Err(e) => {
                    tracing::error!("❌ WAL recovery failed: {}", e);
                }
            }
        }

        // --- Recovery phase 2: fall back to persisted Parquet events. ---
        // Load errors are silently ignored (Ok pattern only) — intentional
        // best-effort startup.
        if !wal_recovered {
            if let Some(ref storage) = store.storage {
                if let Ok(persisted_events) = storage.read().load_all_events() {
                    tracing::info!("📂 Loading {} persisted events...", persisted_events.len());

                    for event in persisted_events {
                        let offset = store.events.read().len();
                        if let Err(e) = store.index.index_event(
                            event.id,
                            event.entity_id_str(),
                            event.event_type_str(),
                            event.timestamp,
                            offset,
                        ) {
                            tracing::error!("Failed to re-index event {}: {}", event.id, e);
                        }

                        if let Err(e) = store.projections.read().process_event(&event) {
                            tracing::error!("Failed to re-process event {}: {}", event.id, e);
                        }

                        store.events.write().push(event);
                    }

                    let total = store.events.read().len();
                    *store.total_ingested.write() = total as u64;
                    tracing::info!("✅ Successfully loaded {} events from storage", total);
                }
            }
        }

        store
    }
248
    /// Ingests a single event through the full pipeline:
    /// validate → WAL append → index → projections → pipelines → storage →
    /// in-memory log → WebSocket broadcast → auto-snapshot check → metrics.
    ///
    /// # Errors
    /// Propagates validation, WAL, index, projection, or storage errors.
    /// NOTE(review): if a step after the WAL append fails, the event remains
    /// in the WAL but not in memory — it will reappear on recovery; confirm
    /// this at-least-once behavior is intended.
    pub fn ingest(&self, event: Event) -> Result<()> {
        let timer = self.metrics.ingestion_duration_seconds.start_timer();

        // Reject structurally invalid events before touching any subsystem.
        let validation_result = self.validate_event(&event);
        if let Err(e) = validation_result {
            self.metrics.ingestion_errors_total.inc();
            timer.observe_duration();
            return Err(e);
        }

        // Durability first: the WAL append must succeed before the event
        // becomes visible anywhere else, so recovery can replay it.
        if let Some(ref wal) = self.wal {
            if let Err(e) = wal.append(event.clone()) {
                self.metrics.ingestion_errors_total.inc();
                timer.observe_duration();
                return Err(e);
            }
        }

        // The events write lock is held from here through the push so that
        // `offset` matches the event's final position in the log.
        let mut events = self.events.write();
        let offset = events.len();

        self.index.index_event(
            event.id,
            event.entity_id_str(),
            event.event_type_str(),
            event.timestamp,
            offset,
        )?;

        // Projections read lock is released before pipeline processing.
        let projections = self.projections.read();
        projections.process_event(&event)?;
        drop(projections);

        let pipeline_results = self.pipeline_manager.process_event(&event);
        if !pipeline_results.is_empty() {
            tracing::debug!(
                "Event {} processed by {} pipeline(s)",
                event.id,
                pipeline_results.len()
            );
            for (pipeline_id, result) in pipeline_results {
                tracing::trace!("Pipeline {} result: {:?}", pipeline_id, result);
            }
        }

        // Buffered write to Parquet storage (flushed separately).
        if let Some(ref storage) = self.storage {
            let mut storage = storage.write();
            storage.append_event(event.clone())?;
        }

        events.push(event.clone());
        let total_events = events.len();
        // Release the log lock before broadcasting and snapshotting so
        // subscribers/snapshot queries cannot deadlock against it.
        drop(events);

        self.websocket_manager
            .broadcast_event(Arc::new(event.clone()));

        // May synchronously create a snapshot; failures are logged inside.
        self.check_auto_snapshot(event.entity_id_str(), &event);

        self.metrics.events_ingested_total.inc();
        self.metrics
            .events_ingested_by_type
            .with_label_values(&[event.event_type_str()])
            .inc();
        self.metrics.storage_events_total.set(total_events as i64);

        let mut total = self.total_ingested.write();
        *total += 1;

        timer.observe_duration();

        tracing::debug!("Event ingested: {} (offset: {})", event.id, offset);

        Ok(())
    }
342
343 pub fn websocket_manager(&self) -> Arc<WebSocketManager> {
345 Arc::clone(&self.websocket_manager)
346 }
347
348 pub fn snapshot_manager(&self) -> Arc<SnapshotManager> {
350 Arc::clone(&self.snapshot_manager)
351 }
352
353 pub fn compaction_manager(&self) -> Option<Arc<CompactionManager>> {
355 self.compaction_manager.as_ref().map(Arc::clone)
356 }
357
358 pub fn schema_registry(&self) -> Arc<SchemaRegistry> {
360 Arc::clone(&self.schema_registry)
361 }
362
363 pub fn replay_manager(&self) -> Arc<ReplayManager> {
365 Arc::clone(&self.replay_manager)
366 }
367
368 pub fn pipeline_manager(&self) -> Arc<PipelineManager> {
370 Arc::clone(&self.pipeline_manager)
371 }
372
373 pub fn metrics(&self) -> Arc<MetricsRegistry> {
375 Arc::clone(&self.metrics)
376 }
377
    /// Acquires a read guard over the projection manager. The guard blocks
    /// writers (e.g. projection registration) until it is dropped, so
    /// callers should keep it short-lived.
    pub fn projection_manager(&self) -> parking_lot::RwLockReadGuard<'_, ProjectionManager> {
        self.projections.read()
    }
382
383 pub fn projection_state_cache(&self) -> Arc<DashMap<String, serde_json::Value>> {
386 Arc::clone(&self.projection_state_cache)
387 }
388
389 pub fn flush_storage(&self) -> Result<()> {
391 if let Some(ref storage) = self.storage {
392 let mut storage = storage.write();
393 storage.flush()?;
394 tracing::info!("✅ Flushed events to persistent storage");
395 }
396 Ok(())
397 }
398
399 pub fn create_snapshot(&self, entity_id: &str) -> Result<()> {
401 let events = self.query(QueryEventsRequest {
403 entity_id: Some(entity_id.to_string()),
404 event_type: None,
405 tenant_id: None,
406 as_of: None,
407 since: None,
408 until: None,
409 limit: None,
410 })?;
411
412 if events.is_empty() {
413 return Err(AllSourceError::EntityNotFound(entity_id.to_string()));
414 }
415
416 let mut state = serde_json::json!({});
418 for event in &events {
419 if let serde_json::Value::Object(ref mut state_map) = state {
420 if let serde_json::Value::Object(ref payload_map) = event.payload {
421 for (key, value) in payload_map {
422 state_map.insert(key.clone(), value.clone());
423 }
424 }
425 }
426 }
427
428 let last_event = events.last().unwrap();
429 self.snapshot_manager.create_snapshot(
430 entity_id.to_string(),
431 state,
432 last_event.timestamp,
433 events.len(),
434 SnapshotType::Manual,
435 )?;
436
437 Ok(())
438 }
439
440 fn check_auto_snapshot(&self, entity_id: &str, event: &Event) {
442 let entity_event_count = self
444 .index
445 .get_by_entity(entity_id)
446 .map(|entries| entries.len())
447 .unwrap_or(0);
448
449 if self.snapshot_manager.should_create_snapshot(
450 entity_id,
451 entity_event_count,
452 event.timestamp,
453 ) {
454 if let Err(e) = self.create_snapshot(entity_id) {
456 tracing::warn!(
457 "Failed to create automatic snapshot for {}: {}",
458 entity_id,
459 e
460 );
461 }
462 }
463 }
464
465 fn validate_event(&self, event: &Event) -> Result<()> {
467 if event.entity_id_str().is_empty() {
470 return Err(AllSourceError::ValidationError(
471 "entity_id cannot be empty".to_string(),
472 ));
473 }
474
475 if event.event_type_str().is_empty() {
476 return Err(AllSourceError::ValidationError(
477 "event_type cannot be empty".to_string(),
478 ));
479 }
480
481 Ok(())
482 }
483
    /// Queries events matching `request`, using the entity index or the
    /// type index when possible and falling back to a full scan otherwise.
    /// Results are cloned out of the log, sorted by timestamp, and
    /// truncated to `request.limit`.
    ///
    /// NOTE(review): `request.tenant_id` is accepted but never applied
    /// here — confirm multi-tenancy filtering happens elsewhere.
    pub fn query(&self, request: QueryEventsRequest) -> Result<Vec<Event>> {
        // Label used to partition query metrics by access path.
        let query_type = if request.entity_id.is_some() {
            "entity"
        } else if request.event_type.is_some() {
            "type"
        } else {
            "full_scan"
        };

        let timer = self
            .metrics
            .query_duration_seconds
            .with_label_values(&[query_type])
            .start_timer();

        self.metrics
            .queries_total
            .with_label_values(&[query_type])
            .inc();

        let events = self.events.read();

        // Indexed paths pre-filter the time window via filter_entries.
        // NOTE(review): the full-scan branch yields every offset and does
        // not pass through filter_entries, so as_of/since/until depend
        // entirely on apply_filters — verify they are enforced there.
        let offsets: Vec<usize> = if let Some(entity_id) = &request.entity_id {
            self.index
                .get_by_entity(entity_id)
                .map(|entries| self.filter_entries(entries, &request))
                .unwrap_or_default()
        } else if let Some(event_type) = &request.event_type {
            self.index
                .get_by_type(event_type)
                .map(|entries| self.filter_entries(entries, &request))
                .unwrap_or_default()
        } else {
            (0..events.len()).collect()
        };

        // Materialize matching events and apply residual per-event filters.
        let mut results: Vec<Event> = offsets
            .iter()
            .filter_map(|&offset| events.get(offset).cloned())
            .filter(|event| self.apply_filters(event, &request))
            .collect();

        // Chronological order, then the caller's limit.
        results.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));

        if let Some(limit) = request.limit {
            results.truncate(limit);
        }

        self.metrics
            .query_results_total
            .with_label_values(&[query_type])
            .inc_by(results.len() as u64);

        timer.observe_duration();

        Ok(results)
    }
553
554 fn filter_entries(&self, entries: Vec<IndexEntry>, request: &QueryEventsRequest) -> Vec<usize> {
556 entries
557 .into_iter()
558 .filter(|entry| {
559 if let Some(as_of) = request.as_of {
561 if entry.timestamp > as_of {
562 return false;
563 }
564 }
565 if let Some(since) = request.since {
566 if entry.timestamp < since {
567 return false;
568 }
569 }
570 if let Some(until) = request.until {
571 if entry.timestamp > until {
572 return false;
573 }
574 }
575 true
576 })
577 .map(|entry| entry.offset)
578 .collect()
579 }
580
581 fn apply_filters(&self, event: &Event, request: &QueryEventsRequest) -> bool {
583 if request.entity_id.is_some() {
585 if let Some(ref event_type) = request.event_type {
586 if event.event_type_str() != event_type {
587 return false;
588 }
589 }
590 }
591
592 true
593 }
594
    /// Reconstructs an entity's state as of `as_of` (or the present),
    /// seeding from the newest applicable snapshot and replaying later
    /// events on top (last-writer-wins per payload key).
    ///
    /// # Errors
    /// Returns `EntityNotFound` when no snapshot seeded the state and no
    /// events matched the query.
    pub fn reconstruct_state(
        &self,
        entity_id: &str,
        as_of: Option<DateTime<Utc>>,
    ) -> Result<serde_json::Value> {
        // Seed state from a snapshot when available; remember its timestamp
        // so only newer events need replaying.
        let (merged_state, since_timestamp) = if let Some(as_of_time) = as_of {
            if let Some(snapshot) = self
                .snapshot_manager
                .get_snapshot_as_of(entity_id, as_of_time)
            {
                tracing::debug!(
                    "Using snapshot from {} for entity {} (saved {} events)",
                    snapshot.as_of,
                    entity_id,
                    snapshot.event_count
                );
                (snapshot.state.clone(), Some(snapshot.as_of))
            } else {
                (serde_json::json!({}), None)
            }
        } else {
            if let Some(snapshot) = self.snapshot_manager.get_latest_snapshot(entity_id) {
                tracing::debug!(
                    "Using latest snapshot from {} for entity {}",
                    snapshot.as_of,
                    entity_id
                );
                (snapshot.state.clone(), Some(snapshot.as_of))
            } else {
                (serde_json::json!({}), None)
            }
        };

        // `since` is inclusive at the snapshot timestamp (filter_entries
        // drops only strictly-older entries), so the snapshot's last event
        // may be replayed again; the merge is idempotent, so this is safe.
        let events = self.query(QueryEventsRequest {
            entity_id: Some(entity_id.to_string()),
            event_type: None,
            tenant_id: None,
            as_of,
            since: since_timestamp,
            until: None,
            limit: None,
        })?;

        // Unknown entity: no snapshot seeded the state and no events exist.
        if events.is_empty() && since_timestamp.is_none() {
            return Err(AllSourceError::EntityNotFound(entity_id.to_string()));
        }

        // Fold each event payload into the state (later keys overwrite).
        let mut merged_state = merged_state;
        for event in &events {
            if let serde_json::Value::Object(ref mut state_map) = merged_state {
                if let serde_json::Value::Object(ref payload_map) = event.payload {
                    for (key, value) in payload_map {
                        state_map.insert(key.clone(), value.clone());
                    }
                }
            }
        }

        // NOTE(review): `event_count`, `last_updated` and `history` cover
        // only the replayed events, not those already folded into the
        // snapshot — confirm callers expect this partial view.
        let state = serde_json::json!({
            "entity_id": entity_id,
            "last_updated": events.last().map(|e| e.timestamp),
            "event_count": events.len(),
            "as_of": as_of,
            "current_state": merged_state,
            "history": events.iter().map(|e| {
                serde_json::json!({
                    "event_id": e.id,
                    "type": e.event_type,
                    "timestamp": e.timestamp,
                    "payload": e.payload
                })
            }).collect::<Vec<_>>()
        });

        Ok(state)
    }
680
681 pub fn get_snapshot(&self, entity_id: &str) -> Result<serde_json::Value> {
683 let projections = self.projections.read();
684
685 if let Some(snapshot_projection) = projections.get_projection("entity_snapshots") {
686 if let Some(state) = snapshot_projection.get_state(entity_id) {
687 return Ok(serde_json::json!({
688 "entity_id": entity_id,
689 "snapshot": state,
690 "from_projection": "entity_snapshots"
691 }));
692 }
693 }
694
695 Err(AllSourceError::EntityNotFound(entity_id.to_string()))
696 }
697
698 pub fn stats(&self) -> StoreStats {
700 let events = self.events.read();
701 let index_stats = self.index.stats();
702
703 StoreStats {
704 total_events: events.len(),
705 total_entities: index_stats.total_entities,
706 total_event_types: index_stats.total_event_types,
707 total_ingested: *self.total_ingested.read(),
708 }
709 }
710}
711
/// Construction-time options for [`EventStore`].
#[derive(Debug, Clone)]
pub struct EventStoreConfig {
    /// Directory for Parquet persistence; `None` disables it (compaction
    /// is also only created when this is set).
    pub storage_dir: Option<PathBuf>,

    /// Snapshot creation policy and settings.
    pub snapshot_config: SnapshotConfig,

    /// Directory for the write-ahead log; `None` disables the WAL.
    pub wal_dir: Option<PathBuf>,

    /// WAL tuning options; only consulted when `wal_dir` is set.
    pub wal_config: WALConfig,

    /// Compaction settings; only consulted when `storage_dir` is set.
    pub compaction_config: CompactionConfig,

    /// Schema registry settings.
    pub schema_registry_config: SchemaRegistryConfig,
}
733
734impl Default for EventStoreConfig {
735 fn default() -> Self {
736 Self {
737 storage_dir: None,
738 snapshot_config: SnapshotConfig::default(),
739 wal_dir: None,
740 wal_config: WALConfig::default(),
741 compaction_config: CompactionConfig::default(),
742 schema_registry_config: SchemaRegistryConfig::default(),
743 }
744 }
745}
746
747impl EventStoreConfig {
748 pub fn with_persistence(storage_dir: impl Into<PathBuf>) -> Self {
750 Self {
751 storage_dir: Some(storage_dir.into()),
752 snapshot_config: SnapshotConfig::default(),
753 wal_dir: None,
754 wal_config: WALConfig::default(),
755 compaction_config: CompactionConfig::default(),
756 schema_registry_config: SchemaRegistryConfig::default(),
757 }
758 }
759
760 pub fn with_snapshots(snapshot_config: SnapshotConfig) -> Self {
762 Self {
763 storage_dir: None,
764 snapshot_config,
765 wal_dir: None,
766 wal_config: WALConfig::default(),
767 compaction_config: CompactionConfig::default(),
768 schema_registry_config: SchemaRegistryConfig::default(),
769 }
770 }
771
772 pub fn with_wal(wal_dir: impl Into<PathBuf>, wal_config: WALConfig) -> Self {
774 Self {
775 storage_dir: None,
776 snapshot_config: SnapshotConfig::default(),
777 wal_dir: Some(wal_dir.into()),
778 wal_config,
779 compaction_config: CompactionConfig::default(),
780 schema_registry_config: SchemaRegistryConfig::default(),
781 }
782 }
783
784 pub fn with_all(storage_dir: impl Into<PathBuf>, snapshot_config: SnapshotConfig) -> Self {
786 Self {
787 storage_dir: Some(storage_dir.into()),
788 snapshot_config,
789 wal_dir: None,
790 wal_config: WALConfig::default(),
791 compaction_config: CompactionConfig::default(),
792 schema_registry_config: SchemaRegistryConfig::default(),
793 }
794 }
795
796 pub fn production(
798 storage_dir: impl Into<PathBuf>,
799 wal_dir: impl Into<PathBuf>,
800 snapshot_config: SnapshotConfig,
801 wal_config: WALConfig,
802 compaction_config: CompactionConfig,
803 ) -> Self {
804 Self {
805 storage_dir: Some(storage_dir.into()),
806 snapshot_config,
807 wal_dir: Some(wal_dir.into()),
808 wal_config,
809 compaction_config,
810 schema_registry_config: SchemaRegistryConfig::default(),
811 }
812 }
813}
814
/// Point-in-time counters reported by `EventStore::stats`.
#[derive(Debug, serde::Serialize)]
pub struct StoreStats {
    /// Events currently held in the in-memory log.
    pub total_events: usize,
    /// Distinct entity ids known to the index.
    pub total_entities: usize,
    /// Distinct event types known to the index.
    pub total_event_types: usize,
    /// Lifetime count of accepted events (rebuilt on recovery).
    pub total_ingested: u64,
}
822
823impl Default for EventStore {
824 fn default() -> Self {
825 Self::new()
826 }
827}
828
829