1use crate::compaction::{CompactionConfig, CompactionManager};
2use crate::domain::entities::Event;
3use crate::error::{AllSourceError, Result};
4use crate::application::dto::QueryEventsRequest;
5use crate::index::{EventIndex, IndexEntry};
6use crate::metrics::MetricsRegistry;
7use crate::pipeline::PipelineManager;
8use crate::projection::{
9 EntitySnapshotProjection, EventCounterProjection, ProjectionManager,
10};
11use crate::replay::ReplayManager;
12use crate::schema::{SchemaRegistry, SchemaRegistryConfig};
13use crate::snapshot::{SnapshotConfig, SnapshotManager, SnapshotType};
14use crate::storage::ParquetStorage;
15use crate::wal::{WALConfig, WriteAheadLog};
16use crate::websocket::WebSocketManager;
17use chrono::{DateTime, Utc};
18use parking_lot::RwLock;
19use std::path::PathBuf;
20use std::sync::Arc;
21
/// Central event store: an in-memory append-only log with secondary indexes,
/// projections, optional Parquet persistence, optional WAL durability, and
/// supporting managers (snapshots, compaction, schemas, replay, pipelines,
/// metrics, WebSocket fan-out).
pub struct EventStore {
    /// Append-only in-memory log; an event's position in this Vec is its
    /// stable offset, referenced by the index.
    events: Arc<RwLock<Vec<Event>>>,

    /// Secondary indexes (by entity id / event type) mapping to offsets in `events`.
    index: Arc<EventIndex>,

    /// Registered projections; every ingested event is fed through these.
    pub(crate) projections: Arc<RwLock<ProjectionManager>>,

    /// Optional Parquet-backed persistence; `None` when running in-memory only.
    storage: Option<Arc<RwLock<ParquetStorage>>>,

    /// Broadcasts each ingested event to connected WebSocket subscribers.
    websocket_manager: Arc<WebSocketManager>,

    /// Creates and serves entity snapshots (manual and automatic).
    snapshot_manager: Arc<SnapshotManager>,

    /// Optional write-ahead log; when present, events are appended here
    /// before any in-memory mutation.
    wal: Option<Arc<WriteAheadLog>>,

    /// Optional storage compaction; only created when `storage_dir` is configured.
    compaction_manager: Option<Arc<CompactionManager>>,

    /// Event schema registry used during validation/ingestion.
    schema_registry: Arc<SchemaRegistry>,

    /// Manages event replay sessions.
    replay_manager: Arc<ReplayManager>,

    /// Runs registered processing pipelines over each ingested event.
    pipeline_manager: Arc<PipelineManager>,

    /// Prometheus metrics registry (counters, histograms, gauges).
    metrics: Arc<MetricsRegistry>,

    /// Lifetime ingest counter; also seeded from recovered/loaded events at startup.
    total_ingested: Arc<RwLock<u64>>,
}
63
64impl EventStore {
65 pub fn new() -> Self {
67 Self::with_config(EventStoreConfig::default())
68 }
69
    /// Builds a store from `config`, then restores prior state.
    ///
    /// Recovery order matters: if a WAL is configured and holds events, those
    /// are replayed and then checkpointed to Parquet; the Parquet load is
    /// skipped in that case to avoid double-counting. Only when the WAL
    /// recovered nothing is the persisted Parquet log loaded.
    ///
    /// Subsystem initialization failures (storage, WAL) are logged and
    /// degrade gracefully to `None` rather than aborting construction.
    pub fn with_config(config: EventStoreConfig) -> Self {
        let mut projections = ProjectionManager::new();

        // Built-in projections, always registered.
        projections.register(Arc::new(EntitySnapshotProjection::new("entity_snapshots")));
        projections.register(Arc::new(EventCounterProjection::new("event_counters")));

        // Parquet persistence is best-effort: a failed init logs and disables it.
        let storage = config.storage_dir.as_ref().and_then(|dir| {
            match ParquetStorage::new(dir) {
                Ok(storage) => {
                    tracing::info!("✅ Parquet persistence enabled at: {}", dir.display());
                    Some(Arc::new(RwLock::new(storage)))
                }
                Err(e) => {
                    tracing::error!("❌ Failed to initialize Parquet storage: {}", e);
                    None
                }
            }
        });

        // WAL is likewise best-effort.
        let wal = config.wal_dir.as_ref().and_then(|dir| {
            match WriteAheadLog::new(dir, config.wal_config.clone()) {
                Ok(wal) => {
                    tracing::info!("✅ WAL enabled at: {}", dir.display());
                    Some(Arc::new(wal))
                }
                Err(e) => {
                    tracing::error!("❌ Failed to initialize WAL: {}", e);
                    None
                }
            }
        });

        // Compaction only makes sense when on-disk storage is configured.
        let compaction_manager = config.storage_dir.as_ref().map(|dir| {
            let manager = CompactionManager::new(dir, config.compaction_config.clone());
            Arc::new(manager)
        });

        let schema_registry = Arc::new(SchemaRegistry::new(config.schema_registry_config.clone()));
        tracing::info!("✅ Schema registry enabled");

        let replay_manager = Arc::new(ReplayManager::new());
        tracing::info!("✅ Replay manager enabled");

        let pipeline_manager = Arc::new(PipelineManager::new());
        tracing::info!("✅ Pipeline manager enabled");

        let metrics = MetricsRegistry::new();
        tracing::info!("✅ Prometheus metrics registry initialized");

        let store = Self {
            events: Arc::new(RwLock::new(Vec::new())),
            index: Arc::new(EventIndex::new()),
            projections: Arc::new(RwLock::new(projections)),
            storage,
            websocket_manager: Arc::new(WebSocketManager::new()),
            snapshot_manager: Arc::new(SnapshotManager::new(config.snapshot_config)),
            wal,
            compaction_manager,
            schema_registry,
            replay_manager,
            pipeline_manager,
            metrics,
            total_ingested: Arc::new(RwLock::new(0)),
        };

        // --- Phase 1: WAL recovery (takes precedence over Parquet) ---
        let mut wal_recovered = false;
        if let Some(ref wal) = store.wal {
            match wal.recover() {
                Ok(recovered_events) if !recovered_events.is_empty() => {
                    tracing::info!("🔄 Recovering {} events from WAL...", recovered_events.len());

                    for event in recovered_events {
                        // Offset is the event's final position in the log.
                        let offset = store.events.read().len();
                        if let Err(e) = store.index.index_event(
                            event.id,
                            event.entity_id_str(),
                            event.event_type_str(),
                            event.timestamp,
                            offset,
                        ) {
                            tracing::error!("Failed to re-index WAL event {}: {}", event.id, e);
                        }

                        // Rebuild projection state; failures are logged, not fatal.
                        if let Err(e) = store.projections.read().process_event(&event) {
                            tracing::error!("Failed to re-process WAL event {}: {}", event.id, e);
                        }

                        store.events.write().push(event);
                    }

                    let total = store.events.read().len();
                    *store.total_ingested.write() = total as u64;
                    tracing::info!("✅ Successfully recovered {} events from WAL", total);

                    // Checkpoint recovered events to Parquet, and only truncate
                    // the WAL once the flush succeeded — never drop unflushed data.
                    if store.storage.is_some() {
                        tracing::info!("📸 Checkpointing WAL to Parquet storage...");
                        if let Err(e) = store.flush_storage() {
                            tracing::error!("Failed to checkpoint to Parquet: {}", e);
                        } else if let Err(e) = wal.truncate() {
                            tracing::error!("Failed to truncate WAL after checkpoint: {}", e);
                        } else {
                            tracing::info!("✅ WAL checkpointed and truncated");
                        }
                    }

                    wal_recovered = true;
                }
                Ok(_) => {
                    tracing::debug!("No events to recover from WAL");
                }
                Err(e) => {
                    tracing::error!("❌ WAL recovery failed: {}", e);
                }
            }
        }

        // --- Phase 2: Parquet load, only when the WAL yielded nothing ---
        if !wal_recovered {
            if let Some(ref storage) = store.storage {
                // NOTE(review): a load error is silently ignored here (if-let on Ok);
                // confirm that startup without persisted history is acceptable.
                if let Ok(persisted_events) = storage.read().load_all_events() {
                    tracing::info!("📂 Loading {} persisted events...", persisted_events.len());

                    for event in persisted_events {
                        let offset = store.events.read().len();
                        if let Err(e) = store.index.index_event(
                            event.id,
                            event.entity_id_str(),
                            event.event_type_str(),
                            event.timestamp,
                            offset,
                        ) {
                            tracing::error!("Failed to re-index event {}: {}", event.id, e);
                        }

                        if let Err(e) = store.projections.read().process_event(&event) {
                            tracing::error!("Failed to re-process event {}: {}", event.id, e);
                        }

                        store.events.write().push(event);
                    }

                    let total = store.events.read().len();
                    *store.total_ingested.write() = total as u64;
                    tracing::info!("✅ Successfully loaded {} events from storage", total);
                }
            }
        }

        store
    }
235
    /// Ingests a single event: validate → WAL append → index → projections →
    /// pipelines → persistent storage → in-memory log → broadcast → metrics.
    ///
    /// The WAL append happens *before* any in-memory mutation, so a crash
    /// mid-ingest is repaired by WAL recovery on restart.
    ///
    /// # Errors
    /// Returns validation, WAL, index, projection, or storage errors;
    /// the ingestion-error counter is bumped on early failures.
    pub fn ingest(&self, event: Event) -> Result<()> {
        let timer = self.metrics.ingestion_duration_seconds.start_timer();

        // Reject structurally invalid events up front.
        let validation_result = self.validate_event(&event);
        if let Err(e) = validation_result {
            self.metrics.ingestion_errors_total.inc();
            timer.observe_duration();
            return Err(e);
        }

        // Durability first: if the WAL write fails, nothing else is mutated.
        if let Some(ref wal) = self.wal {
            if let Err(e) = wal.append(event.clone()) {
                self.metrics.ingestion_errors_total.inc();
                timer.observe_duration();
                return Err(e);
            }
        }

        // The write lock is held through index/projection/pipeline/storage so
        // the offset recorded in the index matches the event's final position.
        let mut events = self.events.write();
        let offset = events.len();

        self.index.index_event(
            event.id,
            event.entity_id_str(),
            event.event_type_str(),
            event.timestamp,
            offset,
        )?;

        // NOTE(review): an error from here on leaves the WAL ahead of the
        // in-memory state; recovery will replay the event — confirm this is
        // the intended crash-consistency model.
        let projections = self.projections.read();
        projections.process_event(&event)?;
        drop(projections);

        let pipeline_results = self.pipeline_manager.process_event(&event);
        if !pipeline_results.is_empty() {
            tracing::debug!(
                "Event {} processed by {} pipeline(s)",
                event.id,
                pipeline_results.len()
            );
            for (pipeline_id, result) in pipeline_results {
                tracing::trace!("Pipeline {} result: {:?}", pipeline_id, result);
            }
        }

        if let Some(ref storage) = self.storage {
            let mut storage = storage.write();
            storage.append_event(event.clone())?;
        }

        events.push(event.clone());
        let total_events = events.len();
        // Release the log lock before the (potentially slower) broadcast.
        drop(events);

        self.websocket_manager.broadcast_event(Arc::new(event.clone()));

        // May trigger an automatic snapshot based on per-entity event count/time.
        self.check_auto_snapshot(event.entity_id_str(), &event);

        self.metrics.events_ingested_total.inc();
        self.metrics.events_ingested_by_type
            .with_label_values(&[event.event_type_str()])
            .inc();
        self.metrics.storage_events_total.set(total_events as i64);

        let mut total = self.total_ingested.write();
        *total += 1;

        timer.observe_duration();

        tracing::debug!(
            "Event ingested: {} (offset: {})",
            event.id,
            offset
        );

        Ok(())
    }
331
332 pub fn websocket_manager(&self) -> Arc<WebSocketManager> {
334 Arc::clone(&self.websocket_manager)
335 }
336
337 pub fn snapshot_manager(&self) -> Arc<SnapshotManager> {
339 Arc::clone(&self.snapshot_manager)
340 }
341
342 pub fn compaction_manager(&self) -> Option<Arc<CompactionManager>> {
344 self.compaction_manager.as_ref().map(Arc::clone)
345 }
346
347 pub fn schema_registry(&self) -> Arc<SchemaRegistry> {
349 Arc::clone(&self.schema_registry)
350 }
351
352 pub fn replay_manager(&self) -> Arc<ReplayManager> {
354 Arc::clone(&self.replay_manager)
355 }
356
357 pub fn pipeline_manager(&self) -> Arc<PipelineManager> {
359 Arc::clone(&self.pipeline_manager)
360 }
361
362 pub fn metrics(&self) -> Arc<MetricsRegistry> {
364 Arc::clone(&self.metrics)
365 }
366
367 pub fn flush_storage(&self) -> Result<()> {
369 if let Some(ref storage) = self.storage {
370 let mut storage = storage.write();
371 storage.flush()?;
372 tracing::info!("✅ Flushed events to persistent storage");
373 }
374 Ok(())
375 }
376
377 pub fn create_snapshot(&self, entity_id: &str) -> Result<()> {
379 let events = self.query(QueryEventsRequest {
381 entity_id: Some(entity_id.to_string()),
382 event_type: None,
383 tenant_id: None,
384 as_of: None,
385 since: None,
386 until: None,
387 limit: None,
388 })?;
389
390 if events.is_empty() {
391 return Err(AllSourceError::EntityNotFound(entity_id.to_string()));
392 }
393
394 let mut state = serde_json::json!({});
396 for event in &events {
397 if let serde_json::Value::Object(ref mut state_map) = state {
398 if let serde_json::Value::Object(ref payload_map) = event.payload {
399 for (key, value) in payload_map {
400 state_map.insert(key.clone(), value.clone());
401 }
402 }
403 }
404 }
405
406 let last_event = events.last().unwrap();
407 self.snapshot_manager.create_snapshot(
408 entity_id.to_string(),
409 state,
410 last_event.timestamp,
411 events.len(),
412 SnapshotType::Manual,
413 )?;
414
415 Ok(())
416 }
417
418 fn check_auto_snapshot(&self, entity_id: &str, event: &Event) {
420 let entity_event_count = self
422 .index
423 .get_by_entity(entity_id)
424 .map(|entries| entries.len())
425 .unwrap_or(0);
426
427 if self.snapshot_manager.should_create_snapshot(
428 entity_id,
429 entity_event_count,
430 event.timestamp,
431 ) {
432 if let Err(e) = self.create_snapshot(entity_id) {
434 tracing::warn!(
435 "Failed to create automatic snapshot for {}: {}",
436 entity_id,
437 e
438 );
439 }
440 }
441 }
442
443 fn validate_event(&self, event: &Event) -> Result<()> {
445 if event.entity_id_str().is_empty() {
448 return Err(AllSourceError::ValidationError(
449 "entity_id cannot be empty".to_string(),
450 ));
451 }
452
453 if event.event_type_str().is_empty() {
454 return Err(AllSourceError::ValidationError(
455 "event_type cannot be empty".to_string(),
456 ));
457 }
458
459 Ok(())
460 }
461
    /// Queries events, preferring the entity index, then the type index,
    /// and falling back to a full scan when neither filter is set.
    /// Results are sorted by timestamp ascending and optionally truncated
    /// to `request.limit`.
    ///
    /// Per-query-type metrics (count, duration, result count) are recorded.
    ///
    /// NOTE(review): `request.tenant_id` is never consulted here — confirm
    /// whether tenant filtering is handled upstream.
    pub fn query(&self, request: QueryEventsRequest) -> Result<Vec<Event>> {
        // Label used for all per-query metrics below.
        let query_type = if request.entity_id.is_some() {
            "entity"
        } else if request.event_type.is_some() {
            "type"
        } else {
            "full_scan"
        };

        let timer = self.metrics.query_duration_seconds
            .with_label_values(&[query_type])
            .start_timer();

        self.metrics.queries_total
            .with_label_values(&[query_type])
            .inc();

        let events = self.events.read();

        // Resolve candidate offsets: entity index wins over type index;
        // otherwise every offset is a candidate. Indexed paths apply the
        // time-range filters inside `filter_entries`.
        let offsets: Vec<usize> = if let Some(entity_id) = &request.entity_id {
            self.index
                .get_by_entity(entity_id)
                .map(|entries| self.filter_entries(entries, &request))
                .unwrap_or_default()
        } else if let Some(event_type) = &request.event_type {
            self.index
                .get_by_type(event_type)
                .map(|entries| self.filter_entries(entries, &request))
                .unwrap_or_default()
        } else {
            (0..events.len()).collect()
        };

        // Materialize candidates and apply the residual in-memory filters.
        let mut results: Vec<Event> = offsets
            .iter()
            .filter_map(|&offset| events.get(offset).cloned())
            .filter(|event| self.apply_filters(event, &request))
            .collect();

        results.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));

        // Limit applies after sorting, i.e. the earliest N events.
        if let Some(limit) = request.limit {
            results.truncate(limit);
        }

        self.metrics.query_results_total
            .with_label_values(&[query_type])
            .inc_by(results.len() as u64);

        timer.observe_duration();

        Ok(results)
    }
527
528 fn filter_entries(&self, entries: Vec<IndexEntry>, request: &QueryEventsRequest) -> Vec<usize> {
530 entries
531 .into_iter()
532 .filter(|entry| {
533 if let Some(as_of) = request.as_of {
535 if entry.timestamp > as_of {
536 return false;
537 }
538 }
539 if let Some(since) = request.since {
540 if entry.timestamp < since {
541 return false;
542 }
543 }
544 if let Some(until) = request.until {
545 if entry.timestamp > until {
546 return false;
547 }
548 }
549 true
550 })
551 .map(|entry| entry.offset)
552 .collect()
553 }
554
555 fn apply_filters(&self, event: &Event, request: &QueryEventsRequest) -> bool {
557 if request.entity_id.is_some() {
559 if let Some(ref event_type) = request.event_type {
560 if event.event_type_str() != event_type {
561 return false;
562 }
563 }
564 }
565
566 true
567 }
568
    /// Reconstructs an entity's state, optionally as of a point in time.
    ///
    /// Fast path: start from the newest applicable snapshot (as-of-aware when
    /// `as_of` is given, otherwise the latest) and replay only events at or
    /// after the snapshot's timestamp. Without a snapshot, replays from the
    /// beginning. Payload keys are merged last-write-wins in timestamp order.
    ///
    /// # Errors
    /// `EntityNotFound` when the entity has neither events nor a snapshot.
    pub fn reconstruct_state(
        &self,
        entity_id: &str,
        as_of: Option<DateTime<Utc>>,
    ) -> Result<serde_json::Value> {
        // Pick the starting state and the replay lower bound from snapshots.
        let (merged_state, since_timestamp) = if let Some(as_of_time) = as_of {
            if let Some(snapshot) = self.snapshot_manager.get_snapshot_as_of(entity_id, as_of_time) {
                tracing::debug!(
                    "Using snapshot from {} for entity {} (saved {} events)",
                    snapshot.as_of,
                    entity_id,
                    snapshot.event_count
                );
                (snapshot.state.clone(), Some(snapshot.as_of))
            } else {
                (serde_json::json!({}), None)
            }
        } else {
            if let Some(snapshot) = self.snapshot_manager.get_latest_snapshot(entity_id) {
                tracing::debug!(
                    "Using latest snapshot from {} for entity {}",
                    snapshot.as_of,
                    entity_id
                );
                (snapshot.state.clone(), Some(snapshot.as_of))
            } else {
                (serde_json::json!({}), None)
            }
        };

        // Replay window: [since_timestamp, as_of]; both bounds are inclusive.
        // NOTE(review): `since` is inclusive, so the event at exactly the
        // snapshot timestamp may be re-applied; events are sorted by time, so
        // the last-write-wins merge still converges — confirm event_count in
        // the output is acceptable with this overlap.
        let events = self.query(QueryEventsRequest {
            entity_id: Some(entity_id.to_string()),
            event_type: None,
            tenant_id: None,
            as_of,
            since: since_timestamp,
            until: None,
            limit: None,
        })?;

        // Unknown entity only when there is no snapshot AND no events.
        if events.is_empty() && since_timestamp.is_none() {
            return Err(AllSourceError::EntityNotFound(entity_id.to_string()));
        }

        // Fold event payloads into the starting state, later keys winning.
        let mut merged_state = merged_state;
        for event in &events {
            if let serde_json::Value::Object(ref mut state_map) = merged_state {
                if let serde_json::Value::Object(ref payload_map) = event.payload {
                    for (key, value) in payload_map {
                        state_map.insert(key.clone(), value.clone());
                    }
                }
            }
        }

        let state = serde_json::json!({
            "entity_id": entity_id,
            "last_updated": events.last().map(|e| e.timestamp),
            "event_count": events.len(),
            "as_of": as_of,
            "current_state": merged_state,
            "history": events.iter().map(|e| {
                serde_json::json!({
                    "event_id": e.id,
                    "type": e.event_type,
                    "timestamp": e.timestamp,
                    "payload": e.payload
                })
            }).collect::<Vec<_>>()
        });

        Ok(state)
    }
651
652 pub fn get_snapshot(&self, entity_id: &str) -> Result<serde_json::Value> {
654 let projections = self.projections.read();
655
656 if let Some(snapshot_projection) = projections.get_projection("entity_snapshots") {
657 if let Some(state) = snapshot_projection.get_state(entity_id) {
658 return Ok(serde_json::json!({
659 "entity_id": entity_id,
660 "snapshot": state,
661 "from_projection": "entity_snapshots"
662 }));
663 }
664 }
665
666 Err(AllSourceError::EntityNotFound(entity_id.to_string()))
667 }
668
669 pub fn stats(&self) -> StoreStats {
671 let events = self.events.read();
672 let index_stats = self.index.stats();
673
674 StoreStats {
675 total_events: events.len(),
676 total_entities: index_stats.total_entities,
677 total_event_types: index_stats.total_event_types,
678 total_ingested: *self.total_ingested.read(),
679 }
680 }
681}
682
/// Configuration for [`EventStore`]; all persistence-related features are
/// opt-in via the `Option` fields.
#[derive(Debug, Clone)]
pub struct EventStoreConfig {
    /// Directory for Parquet persistence; `None` disables it (and compaction).
    pub storage_dir: Option<PathBuf>,

    /// Policy for automatic/manual entity snapshots.
    pub snapshot_config: SnapshotConfig,

    /// Directory for the write-ahead log; `None` disables the WAL.
    pub wal_dir: Option<PathBuf>,

    /// WAL tuning; only used when `wal_dir` is set.
    pub wal_config: WALConfig,

    /// Compaction tuning; only used when `storage_dir` is set.
    pub compaction_config: CompactionConfig,

    /// Schema registry settings.
    pub schema_registry_config: SchemaRegistryConfig,
}
704
705impl Default for EventStoreConfig {
706 fn default() -> Self {
707 Self {
708 storage_dir: None,
709 snapshot_config: SnapshotConfig::default(),
710 wal_dir: None,
711 wal_config: WALConfig::default(),
712 compaction_config: CompactionConfig::default(),
713 schema_registry_config: SchemaRegistryConfig::default(),
714 }
715 }
716}
717
718impl EventStoreConfig {
719 pub fn with_persistence(storage_dir: impl Into<PathBuf>) -> Self {
721 Self {
722 storage_dir: Some(storage_dir.into()),
723 snapshot_config: SnapshotConfig::default(),
724 wal_dir: None,
725 wal_config: WALConfig::default(),
726 compaction_config: CompactionConfig::default(),
727 schema_registry_config: SchemaRegistryConfig::default(),
728 }
729 }
730
731 pub fn with_snapshots(snapshot_config: SnapshotConfig) -> Self {
733 Self {
734 storage_dir: None,
735 snapshot_config,
736 wal_dir: None,
737 wal_config: WALConfig::default(),
738 compaction_config: CompactionConfig::default(),
739 schema_registry_config: SchemaRegistryConfig::default(),
740 }
741 }
742
743 pub fn with_wal(wal_dir: impl Into<PathBuf>, wal_config: WALConfig) -> Self {
745 Self {
746 storage_dir: None,
747 snapshot_config: SnapshotConfig::default(),
748 wal_dir: Some(wal_dir.into()),
749 wal_config,
750 compaction_config: CompactionConfig::default(),
751 schema_registry_config: SchemaRegistryConfig::default(),
752 }
753 }
754
755 pub fn with_all(storage_dir: impl Into<PathBuf>, snapshot_config: SnapshotConfig) -> Self {
757 Self {
758 storage_dir: Some(storage_dir.into()),
759 snapshot_config,
760 wal_dir: None,
761 wal_config: WALConfig::default(),
762 compaction_config: CompactionConfig::default(),
763 schema_registry_config: SchemaRegistryConfig::default(),
764 }
765 }
766
767 pub fn production(
769 storage_dir: impl Into<PathBuf>,
770 wal_dir: impl Into<PathBuf>,
771 snapshot_config: SnapshotConfig,
772 wal_config: WALConfig,
773 compaction_config: CompactionConfig,
774 ) -> Self {
775 Self {
776 storage_dir: Some(storage_dir.into()),
777 snapshot_config,
778 wal_dir: Some(wal_dir.into()),
779 wal_config,
780 compaction_config,
781 schema_registry_config: SchemaRegistryConfig::default(),
782 }
783 }
784}
785
/// Serializable summary of store statistics, as returned by [`EventStore::stats`].
#[derive(Debug, serde::Serialize)]
pub struct StoreStats {
    /// Number of events currently held in the in-memory log.
    pub total_events: usize,
    /// Number of distinct entity ids seen by the index.
    pub total_entities: usize,
    /// Number of distinct event types seen by the index.
    pub total_event_types: usize,
    /// Lifetime ingest counter (also seeded from recovery/load at startup).
    pub total_ingested: u64,
}
793
794impl Default for EventStore {
795 fn default() -> Self {
796 Self::new()
797 }
798}
799
800