use crate::application::dto::QueryEventsRequest;
use crate::application::services::pipeline::PipelineManager;
use crate::application::services::projection::{
    EntitySnapshotProjection, EventCounterProjection, ProjectionManager,
};
use crate::application::services::replay::ReplayManager;
use crate::application::services::schema::{SchemaRegistry, SchemaRegistryConfig};
use crate::domain::entities::Event;
use crate::error::{AllSourceError, Result};
use crate::infrastructure::observability::metrics::MetricsRegistry;
use crate::infrastructure::persistence::compaction::{CompactionConfig, CompactionManager};
use crate::infrastructure::persistence::index::{EventIndex, IndexEntry};
use crate::infrastructure::persistence::snapshot::{SnapshotConfig, SnapshotManager, SnapshotType};
use crate::infrastructure::persistence::storage::ParquetStorage;
use crate::infrastructure::persistence::wal::{WALConfig, WriteAheadLog};
use crate::infrastructure::web::websocket::WebSocketManager;
use chrono::{DateTime, Utc};
use dashmap::DashMap;
use parking_lot::RwLock;
use std::path::PathBuf;
use std::sync::Arc;

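/// Central event store: ingests events, indexes them, feeds projections and
/// pipelines, and optionally persists them via a write-ahead log and Parquet
/// storage.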
pub struct EventStore {
    events: Arc<RwLock<Vec<Event>>>,

    index: Arc<EventIndex>,

    pub(crate) projections: Arc<RwLock<ProjectionManager>>,

    storage: Option<Arc<RwLock<ParquetStorage>>>,

    websocket_manager: Arc<WebSocketManager>,

    snapshot_manager: Arc<SnapshotManager>,

    wal: Option<Arc<WriteAheadLog>>,

    compaction_manager: Option<Arc<CompactionManager>>,

    schema_registry: Arc<SchemaRegistry>,

    replay_manager: Arc<ReplayManager>,

    pipeline_manager: Arc<PipelineManager>,

    metrics: Arc<MetricsRegistry>,

    total_ingested: Arc<RwLock<u64>>,

    projection_state_cache: Arc<DashMap<String, serde_json::Value>>,
}

impl EventStore {
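    /// Creates an event store with the default in-memory configuration
    /// (no Parquet persistence, no WAL).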
    pub fn new() -> Self {
        Self::with_config(EventStoreConfig::default())
    }

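    /// Builds an event store from `config`, wiring up optional Parquet
    /// persistence, WAL recovery, compaction, the schema registry, replay,
    /// pipelines, and metrics. Illustrative usage (the path is a placeholder):
    ///
    /// ```ignore
    /// let store = EventStore::with_config(EventStoreConfig::with_persistence("./data"));
    /// ```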
    pub fn with_config(config: EventStoreConfig) -> Self {
        let mut projections = ProjectionManager::new();

        projections.register(Arc::new(EntitySnapshotProjection::new("entity_snapshots")));
        projections.register(Arc::new(EventCounterProjection::new("event_counters")));

        let storage = config
            .storage_dir
            .as_ref()
            .and_then(|dir| match ParquetStorage::new(dir) {
                Ok(storage) => {
                    tracing::info!("✅ Parquet persistence enabled at: {}", dir.display());
                    Some(Arc::new(RwLock::new(storage)))
                }
                Err(e) => {
                    tracing::error!("❌ Failed to initialize Parquet storage: {}", e);
                    None
                }
            });

        let wal = config.wal_dir.as_ref().and_then(|dir| {
            match WriteAheadLog::new(dir, config.wal_config.clone()) {
                Ok(wal) => {
                    tracing::info!("✅ WAL enabled at: {}", dir.display());
                    Some(Arc::new(wal))
                }
                Err(e) => {
                    tracing::error!("❌ Failed to initialize WAL: {}", e);
                    None
                }
            }
        });

        let compaction_manager = config.storage_dir.as_ref().map(|dir| {
            let manager = CompactionManager::new(dir, config.compaction_config.clone());
            Arc::new(manager)
        });

        let schema_registry = Arc::new(SchemaRegistry::new(config.schema_registry_config.clone()));
        tracing::info!("✅ Schema registry enabled");

        let replay_manager = Arc::new(ReplayManager::new());
        tracing::info!("✅ Replay manager enabled");

        let pipeline_manager = Arc::new(PipelineManager::new());
        tracing::info!("✅ Pipeline manager enabled");

        let metrics = MetricsRegistry::new();
        tracing::info!("✅ Prometheus metrics registry initialized");

        let projection_state_cache = Arc::new(DashMap::new());
        tracing::info!("✅ Projection state cache initialized");

        let store = Self {
            events: Arc::new(RwLock::new(Vec::new())),
            index: Arc::new(EventIndex::new()),
            projections: Arc::new(RwLock::new(projections)),
            storage,
            websocket_manager: Arc::new(WebSocketManager::new()),
            snapshot_manager: Arc::new(SnapshotManager::new(config.snapshot_config)),
            wal,
            compaction_manager,
            schema_registry,
            replay_manager,
            pipeline_manager,
            metrics,
            total_ingested: Arc::new(RwLock::new(0)),
            projection_state_cache,
        };

        // Recover any events that were durably logged but not yet persisted.
        let mut wal_recovered = false;
        if let Some(ref wal) = store.wal {
            match wal.recover() {
                Ok(recovered_events) if !recovered_events.is_empty() => {
                    tracing::info!(
                        "🔄 Recovering {} events from WAL...",
                        recovered_events.len()
                    );

                    for event in recovered_events {
                        let offset = store.events.read().len();
                        if let Err(e) = store.index.index_event(
                            event.id,
                            event.entity_id_str(),
                            event.event_type_str(),
                            event.timestamp,
                            offset,
                        ) {
                            tracing::error!("Failed to re-index WAL event {}: {}", event.id, e);
                        }

                        if let Err(e) = store.projections.read().process_event(&event) {
                            tracing::error!("Failed to re-process WAL event {}: {}", event.id, e);
                        }

                        store.events.write().push(event);
                    }

                    let total = store.events.read().len();
                    *store.total_ingested.write() = total as u64;
                    tracing::info!("✅ Successfully recovered {} events from WAL", total);

                    // Checkpoint the recovered events to Parquet so the WAL can be truncated.
                    if store.storage.is_some() {
                        tracing::info!("📸 Checkpointing WAL to Parquet storage...");
                        if let Err(e) = store.flush_storage() {
                            tracing::error!("Failed to checkpoint to Parquet: {}", e);
                        } else if let Err(e) = wal.truncate() {
                            tracing::error!("Failed to truncate WAL after checkpoint: {}", e);
                        } else {
                            tracing::info!("✅ WAL checkpointed and truncated");
                        }
                    }

                    wal_recovered = true;
                }
                Ok(_) => {
                    tracing::debug!("No events to recover from WAL");
                }
                Err(e) => {
                    tracing::error!("❌ WAL recovery failed: {}", e);
                }
            }
        }

        // If the WAL had nothing to replay, fall back to events already persisted in Parquet.
        if !wal_recovered {
            if let Some(ref storage) = store.storage {
                if let Ok(persisted_events) = storage.read().load_all_events() {
                    tracing::info!("📂 Loading {} persisted events...", persisted_events.len());

                    for event in persisted_events {
                        let offset = store.events.read().len();
                        if let Err(e) = store.index.index_event(
                            event.id,
                            event.entity_id_str(),
                            event.event_type_str(),
                            event.timestamp,
                            offset,
                        ) {
                            tracing::error!("Failed to re-index event {}: {}", event.id, e);
                        }

                        if let Err(e) = store.projections.read().process_event(&event) {
                            tracing::error!("Failed to re-process event {}: {}", event.id, e);
                        }

                        store.events.write().push(event);
                    }

                    let total = store.events.read().len();
                    *store.total_ingested.write() = total as u64;
                    tracing::info!("✅ Successfully loaded {} events from storage", total);
                }
            }
        }

        store
    }

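    /// Ingests a single event: validates it, appends it to the WAL (if
    /// enabled), indexes it, updates projections and pipelines, persists it,
    /// broadcasts it over WebSocket, and records metrics.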
    pub fn ingest(&self, event: Event) -> Result<()> {
        let timer = self.metrics.ingestion_duration_seconds.start_timer();

        let validation_result = self.validate_event(&event);
        if let Err(e) = validation_result {
            self.metrics.ingestion_errors_total.inc();
            timer.observe_duration();
            return Err(e);
        }

        // Durably log the event before touching in-memory state.
        if let Some(ref wal) = self.wal {
            if let Err(e) = wal.append(event.clone()) {
                self.metrics.ingestion_errors_total.inc();
                timer.observe_duration();
                return Err(e);
            }
        }

        let mut events = self.events.write();
        let offset = events.len();

        self.index.index_event(
            event.id,
            event.entity_id_str(),
            event.event_type_str(),
            event.timestamp,
            offset,
        )?;

        let projections = self.projections.read();
        projections.process_event(&event)?;
        // Drop the read guard before handing the event to pipelines.
        drop(projections);

        let pipeline_results = self.pipeline_manager.process_event(&event);
        if !pipeline_results.is_empty() {
            tracing::debug!(
                "Event {} processed by {} pipeline(s)",
                event.id,
                pipeline_results.len()
            );
            for (pipeline_id, result) in pipeline_results {
                tracing::trace!("Pipeline {} result: {:?}", pipeline_id, result);
            }
        }

        if let Some(ref storage) = self.storage {
            let mut storage = storage.write();
            storage.append_event(event.clone())?;
        }

        events.push(event.clone());
        let total_events = events.len();
        // Release the write lock before broadcasting and snapshotting.
        drop(events);

        self.websocket_manager
            .broadcast_event(Arc::new(event.clone()));

        self.check_auto_snapshot(event.entity_id_str(), &event);

        self.metrics.events_ingested_total.inc();
        self.metrics
            .events_ingested_by_type
            .with_label_values(&[event.event_type_str()])
            .inc();
        self.metrics.storage_events_total.set(total_events as i64);

        let mut total = self.total_ingested.write();
        *total += 1;

        timer.observe_duration();

        tracing::debug!("Event ingested: {} (offset: {})", event.id, offset);

        Ok(())
    }

    pub fn websocket_manager(&self) -> Arc<WebSocketManager> {
        Arc::clone(&self.websocket_manager)
    }

    pub fn snapshot_manager(&self) -> Arc<SnapshotManager> {
        Arc::clone(&self.snapshot_manager)
    }

    pub fn compaction_manager(&self) -> Option<Arc<CompactionManager>> {
        self.compaction_manager.as_ref().map(Arc::clone)
    }

    pub fn schema_registry(&self) -> Arc<SchemaRegistry> {
        Arc::clone(&self.schema_registry)
    }

    pub fn replay_manager(&self) -> Arc<ReplayManager> {
        Arc::clone(&self.replay_manager)
    }

    pub fn pipeline_manager(&self) -> Arc<PipelineManager> {
        Arc::clone(&self.pipeline_manager)
    }

    pub fn metrics(&self) -> Arc<MetricsRegistry> {
        Arc::clone(&self.metrics)
    }

    pub fn projection_manager(&self) -> parking_lot::RwLockReadGuard<'_, ProjectionManager> {
        self.projections.read()
    }

    pub fn projection_state_cache(&self) -> Arc<DashMap<String, serde_json::Value>> {
        Arc::clone(&self.projection_state_cache)
    }

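    /// Flushes buffered events to persistent Parquet storage, if persistence
    /// is enabled.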
    pub fn flush_storage(&self) -> Result<()> {
        if let Some(ref storage) = self.storage {
            let mut storage = storage.write();
            storage.flush()?;
            tracing::info!("✅ Flushed events to persistent storage");
        }
        Ok(())
    }

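    /// Creates a manual snapshot for `entity_id` by folding all of its event
    /// payloads into a single JSON object.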
    pub fn create_snapshot(&self, entity_id: &str) -> Result<()> {
        let events = self.query(QueryEventsRequest {
            entity_id: Some(entity_id.to_string()),
            event_type: None,
            tenant_id: None,
            as_of: None,
            since: None,
            until: None,
            limit: None,
        })?;

        if events.is_empty() {
            return Err(AllSourceError::EntityNotFound(entity_id.to_string()));
        }

        let mut state = serde_json::json!({});
        for event in &events {
            if let serde_json::Value::Object(ref mut state_map) = state {
                if let serde_json::Value::Object(ref payload_map) = event.payload {
                    for (key, value) in payload_map {
                        state_map.insert(key.clone(), value.clone());
                    }
                }
            }
        }

        let last_event = events.last().unwrap();
        self.snapshot_manager.create_snapshot(
            entity_id.to_string(),
            state,
            last_event.timestamp,
            events.len(),
            SnapshotType::Manual,
        )?;

        Ok(())
    }

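    /// Creates an automatic snapshot when the snapshot manager decides one is
    /// due for this entity; failures are logged but not propagated.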
    fn check_auto_snapshot(&self, entity_id: &str, event: &Event) {
        let entity_event_count = self
            .index
            .get_by_entity(entity_id)
            .map(|entries| entries.len())
            .unwrap_or(0);

        if self.snapshot_manager.should_create_snapshot(
            entity_id,
            entity_event_count,
            event.timestamp,
        ) {
            if let Err(e) = self.create_snapshot(entity_id) {
                tracing::warn!(
                    "Failed to create automatic snapshot for {}: {}",
                    entity_id,
                    e
                );
            }
        }
    }

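    /// Rejects events with an empty entity id or event type.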
    fn validate_event(&self, event: &Event) -> Result<()> {
        if event.entity_id_str().is_empty() {
            return Err(AllSourceError::ValidationError(
                "entity_id cannot be empty".to_string(),
            ));
        }

        if event.event_type_str().is_empty() {
            return Err(AllSourceError::ValidationError(
                "event_type cannot be empty".to_string(),
            ));
        }

        Ok(())
    }

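    /// Queries events, using the entity or type index when the request allows
    /// it and falling back to a full scan otherwise. Results are sorted by
    /// timestamp and truncated to `limit` if one is given.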
    pub fn query(&self, request: QueryEventsRequest) -> Result<Vec<Event>> {
        let query_type = if request.entity_id.is_some() {
            "entity"
        } else if request.event_type.is_some() {
            "type"
        } else {
            "full_scan"
        };

        let timer = self
            .metrics
            .query_duration_seconds
            .with_label_values(&[query_type])
            .start_timer();

        self.metrics
            .queries_total
            .with_label_values(&[query_type])
            .inc();

        let events = self.events.read();

        let offsets: Vec<usize> = if let Some(entity_id) = &request.entity_id {
            self.index
                .get_by_entity(entity_id)
                .map(|entries| self.filter_entries(entries, &request))
                .unwrap_or_default()
        } else if let Some(event_type) = &request.event_type {
            self.index
                .get_by_type(event_type)
                .map(|entries| self.filter_entries(entries, &request))
                .unwrap_or_default()
        } else {
            (0..events.len()).collect()
        };

        let mut results: Vec<Event> = offsets
            .iter()
            .filter_map(|&offset| events.get(offset).cloned())
            .filter(|event| self.apply_filters(event, &request))
            .collect();

        results.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));

        if let Some(limit) = request.limit {
            results.truncate(limit);
        }

        self.metrics
            .query_results_total
            .with_label_values(&[query_type])
            .inc_by(results.len() as u64);

        timer.observe_duration();

        Ok(results)
    }

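    /// Applies the request's time filters (`as_of`, `since`, `until`) to index
    /// entries and returns the surviving offsets.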
    fn filter_entries(&self, entries: Vec<IndexEntry>, request: &QueryEventsRequest) -> Vec<usize> {
        entries
            .into_iter()
            .filter(|entry| {
                if let Some(as_of) = request.as_of {
                    if entry.timestamp > as_of {
                        return false;
                    }
                }
                if let Some(since) = request.since {
                    if entry.timestamp < since {
                        return false;
                    }
                }
                if let Some(until) = request.until {
                    if entry.timestamp > until {
                        return false;
                    }
                }
                true
            })
            .map(|entry| entry.offset)
            .collect()
    }

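    /// Applies filters that the index lookup could not: when events were
    /// fetched by entity, an additional `event_type` filter is checked here.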
    fn apply_filters(&self, event: &Event, request: &QueryEventsRequest) -> bool {
        if request.entity_id.is_some() {
            if let Some(ref event_type) = request.event_type {
                if event.event_type_str() != event_type {
                    return false;
                }
            }
        }

        true
    }

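    /// Reconstructs an entity's state, optionally as of a point in time,
    /// starting from the most recent applicable snapshot and folding in any
    /// newer events.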
    pub fn reconstruct_state(
        &self,
        entity_id: &str,
        as_of: Option<DateTime<Utc>>,
    ) -> Result<serde_json::Value> {
        let (merged_state, since_timestamp) = if let Some(as_of_time) = as_of {
            if let Some(snapshot) = self
                .snapshot_manager
                .get_snapshot_as_of(entity_id, as_of_time)
            {
                tracing::debug!(
                    "Using snapshot from {} for entity {} (saved {} events)",
                    snapshot.as_of,
                    entity_id,
                    snapshot.event_count
                );
                (snapshot.state.clone(), Some(snapshot.as_of))
            } else {
                (serde_json::json!({}), None)
            }
        } else if let Some(snapshot) = self.snapshot_manager.get_latest_snapshot(entity_id) {
            tracing::debug!(
                "Using latest snapshot from {} for entity {}",
                snapshot.as_of,
                entity_id
            );
            (snapshot.state.clone(), Some(snapshot.as_of))
        } else {
            (serde_json::json!({}), None)
        };

        let events = self.query(QueryEventsRequest {
            entity_id: Some(entity_id.to_string()),
            event_type: None,
            tenant_id: None,
            as_of,
            since: since_timestamp,
            until: None,
            limit: None,
        })?;

        if events.is_empty() && since_timestamp.is_none() {
            return Err(AllSourceError::EntityNotFound(entity_id.to_string()));
        }

        let mut merged_state = merged_state;
        for event in &events {
            if let serde_json::Value::Object(ref mut state_map) = merged_state {
                if let serde_json::Value::Object(ref payload_map) = event.payload {
                    for (key, value) in payload_map {
                        state_map.insert(key.clone(), value.clone());
                    }
                }
            }
        }

        let state = serde_json::json!({
            "entity_id": entity_id,
            "last_updated": events.last().map(|e| e.timestamp),
            "event_count": events.len(),
            "as_of": as_of,
            "current_state": merged_state,
            "history": events.iter().map(|e| {
                serde_json::json!({
                    "event_id": e.id,
                    "type": e.event_type,
                    "timestamp": e.timestamp,
                    "payload": e.payload
                })
            }).collect::<Vec<_>>()
        });

        Ok(state)
    }

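    /// Returns the current snapshot of an entity from the `entity_snapshots`
    /// projection, if the projection has state for it.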
    pub fn get_snapshot(&self, entity_id: &str) -> Result<serde_json::Value> {
        let projections = self.projections.read();

        if let Some(snapshot_projection) = projections.get_projection("entity_snapshots") {
            if let Some(state) = snapshot_projection.get_state(entity_id) {
                return Ok(serde_json::json!({
                    "entity_id": entity_id,
                    "snapshot": state,
                    "from_projection": "entity_snapshots"
                }));
            }
        }

        Err(AllSourceError::EntityNotFound(entity_id.to_string()))
    }

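    /// Returns counts of stored events, indexed entities and event types, and
    /// the running total of ingested events.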
    pub fn stats(&self) -> StoreStats {
        let events = self.events.read();
        let index_stats = self.index.stats();

        StoreStats {
            total_events: events.len(),
            total_entities: index_stats.total_entities,
            total_event_types: index_stats.total_event_types,
            total_ingested: *self.total_ingested.read(),
        }
    }
}

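/// Configuration for an [`EventStore`]: optional Parquet storage and WAL
/// directories plus snapshot, WAL, compaction, and schema-registry settings.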
#[derive(Debug, Clone)]
pub struct EventStoreConfig {
    pub storage_dir: Option<PathBuf>,

    pub snapshot_config: SnapshotConfig,

    pub wal_dir: Option<PathBuf>,

    pub wal_config: WALConfig,

    pub compaction_config: CompactionConfig,

    pub schema_registry_config: SchemaRegistryConfig,
}

impl Default for EventStoreConfig {
    fn default() -> Self {
        Self {
            storage_dir: None,
            snapshot_config: SnapshotConfig::default(),
            wal_dir: None,
            wal_config: WALConfig::default(),
            compaction_config: CompactionConfig::default(),
            schema_registry_config: SchemaRegistryConfig::default(),
        }
    }
}

impl EventStoreConfig {
    pub fn with_persistence(storage_dir: impl Into<PathBuf>) -> Self {
        Self {
            storage_dir: Some(storage_dir.into()),
            snapshot_config: SnapshotConfig::default(),
            wal_dir: None,
            wal_config: WALConfig::default(),
            compaction_config: CompactionConfig::default(),
            schema_registry_config: SchemaRegistryConfig::default(),
        }
    }

    pub fn with_snapshots(snapshot_config: SnapshotConfig) -> Self {
        Self {
            storage_dir: None,
            snapshot_config,
            wal_dir: None,
            wal_config: WALConfig::default(),
            compaction_config: CompactionConfig::default(),
            schema_registry_config: SchemaRegistryConfig::default(),
        }
    }

    pub fn with_wal(wal_dir: impl Into<PathBuf>, wal_config: WALConfig) -> Self {
        Self {
            storage_dir: None,
            snapshot_config: SnapshotConfig::default(),
            wal_dir: Some(wal_dir.into()),
            wal_config,
            compaction_config: CompactionConfig::default(),
            schema_registry_config: SchemaRegistryConfig::default(),
        }
    }

    pub fn with_all(storage_dir: impl Into<PathBuf>, snapshot_config: SnapshotConfig) -> Self {
        Self {
            storage_dir: Some(storage_dir.into()),
            snapshot_config,
            wal_dir: None,
            wal_config: WALConfig::default(),
            compaction_config: CompactionConfig::default(),
            schema_registry_config: SchemaRegistryConfig::default(),
        }
    }

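    /// Production preset: Parquet persistence and WAL both enabled, with
    /// explicit snapshot, WAL, and compaction settings.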
    pub fn production(
        storage_dir: impl Into<PathBuf>,
        wal_dir: impl Into<PathBuf>,
        snapshot_config: SnapshotConfig,
        wal_config: WALConfig,
        compaction_config: CompactionConfig,
    ) -> Self {
        Self {
            storage_dir: Some(storage_dir.into()),
            snapshot_config,
            wal_dir: Some(wal_dir.into()),
            wal_config,
            compaction_config,
            schema_registry_config: SchemaRegistryConfig::default(),
        }
    }
}

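/// Summary statistics reported by [`EventStore::stats`].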
#[derive(Debug, serde::Serialize)]
pub struct StoreStats {
    pub total_events: usize,
    pub total_entities: usize,
    pub total_event_types: usize,
    pub total_ingested: u64,
}

impl Default for EventStore {
    fn default() -> Self {
        Self::new()
    }
}