1use crate::application::dto::QueryEventsRequest;
2use crate::compaction::{CompactionConfig, CompactionManager};
3use crate::domain::entities::Event;
4use crate::error::{AllSourceError, Result};
5use crate::index::{EventIndex, IndexEntry};
6use crate::metrics::MetricsRegistry;
7use crate::pipeline::PipelineManager;
8use crate::projection::{EntitySnapshotProjection, EventCounterProjection, ProjectionManager};
9use crate::replay::ReplayManager;
10use crate::schema::{SchemaRegistry, SchemaRegistryConfig};
11use crate::snapshot::{SnapshotConfig, SnapshotManager, SnapshotType};
12use crate::storage::ParquetStorage;
13use crate::wal::{WALConfig, WriteAheadLog};
14use crate::websocket::WebSocketManager;
15use chrono::{DateTime, Utc};
16use parking_lot::RwLock;
17use std::path::PathBuf;
18use std::sync::Arc;
19
/// Central event store: an append-only in-memory log with secondary indexes,
/// synchronous projections/pipelines, and optional durability subsystems
/// (Parquet storage, write-ahead log, compaction).
pub struct EventStore {
    /// Append-only in-memory event log; an event's position in this Vec is
    /// its canonical offset, which the index entries refer back to.
    events: Arc<RwLock<Vec<Event>>>,

    /// Secondary indexes (by entity id and by event type) mapping to offsets
    /// into `events`; entries also carry the event timestamp for filtering.
    index: Arc<EventIndex>,

    /// Registered projections, updated synchronously on every ingest.
    pub(crate) projections: Arc<RwLock<ProjectionManager>>,

    /// Optional Parquet persistence; `None` when running in-memory only.
    storage: Option<Arc<RwLock<ParquetStorage>>>,

    /// Broadcasts each ingested event to websocket subscribers.
    websocket_manager: Arc<WebSocketManager>,

    /// Creates and serves entity state snapshots (manual and automatic).
    snapshot_manager: Arc<SnapshotManager>,

    /// Optional write-ahead log for durability; `None` when not configured.
    wal: Option<Arc<WriteAheadLog>>,

    /// Optional storage compaction; present only when storage is configured.
    compaction_manager: Option<Arc<CompactionManager>>,

    /// Registry of event schemas.
    schema_registry: Arc<SchemaRegistry>,

    /// Coordinates event replays.
    replay_manager: Arc<ReplayManager>,

    /// Synchronous per-event processing pipelines.
    pipeline_manager: Arc<PipelineManager>,

    /// Prometheus metrics registry.
    metrics: Arc<MetricsRegistry>,

    /// Lifetime count of ingested events (reset to the recovered total on
    /// startup recovery).
    total_ingested: Arc<RwLock<u64>>,
}
61
impl EventStore {
    /// Creates a store with [`EventStoreConfig::default`]: purely in-memory,
    /// with no Parquet persistence and no write-ahead log.
    pub fn new() -> Self {
        Self::with_config(EventStoreConfig::default())
    }

    /// Builds the store from `config`, then recovers prior state: events are
    /// replayed from the WAL if it contains any; otherwise they are loaded
    /// from Parquet storage (when configured).
    ///
    /// Subsystem initialization is best-effort: a storage or WAL setup
    /// failure is logged and that feature is simply disabled (`None`) rather
    /// than failing construction.
    pub fn with_config(config: EventStoreConfig) -> Self {
        let mut projections = ProjectionManager::new();

        // Built-in projections registered on every store.
        projections.register(Arc::new(EntitySnapshotProjection::new("entity_snapshots")));
        projections.register(Arc::new(EventCounterProjection::new("event_counters")));

        // Optional Parquet persistence; init failure degrades to in-memory.
        let storage = config
            .storage_dir
            .as_ref()
            .and_then(|dir| match ParquetStorage::new(dir) {
                Ok(storage) => {
                    tracing::info!("✅ Parquet persistence enabled at: {}", dir.display());
                    Some(Arc::new(RwLock::new(storage)))
                }
                Err(e) => {
                    tracing::error!("❌ Failed to initialize Parquet storage: {}", e);
                    None
                }
            });

        // Optional write-ahead log; same best-effort policy as storage.
        let wal = config.wal_dir.as_ref().and_then(|dir| {
            match WriteAheadLog::new(dir, config.wal_config.clone()) {
                Ok(wal) => {
                    tracing::info!("✅ WAL enabled at: {}", dir.display());
                    Some(Arc::new(wal))
                }
                Err(e) => {
                    tracing::error!("❌ Failed to initialize WAL: {}", e);
                    None
                }
            }
        });

        // Compaction only applies to on-disk storage, so it is keyed off the
        // same storage_dir.
        let compaction_manager = config.storage_dir.as_ref().map(|dir| {
            let manager = CompactionManager::new(dir, config.compaction_config.clone());
            Arc::new(manager)
        });

        let schema_registry = Arc::new(SchemaRegistry::new(config.schema_registry_config.clone()));
        tracing::info!("✅ Schema registry enabled");

        let replay_manager = Arc::new(ReplayManager::new());
        tracing::info!("✅ Replay manager enabled");

        let pipeline_manager = Arc::new(PipelineManager::new());
        tracing::info!("✅ Pipeline manager enabled");

        let metrics = MetricsRegistry::new();
        tracing::info!("✅ Prometheus metrics registry initialized");

        let store = Self {
            events: Arc::new(RwLock::new(Vec::new())),
            index: Arc::new(EventIndex::new()),
            projections: Arc::new(RwLock::new(projections)),
            storage,
            websocket_manager: Arc::new(WebSocketManager::new()),
            snapshot_manager: Arc::new(SnapshotManager::new(config.snapshot_config)),
            wal,
            compaction_manager,
            schema_registry,
            replay_manager,
            pipeline_manager,
            metrics,
            total_ingested: Arc::new(RwLock::new(0)),
        };

        // --- Recovery phase 1: WAL replay. ---
        // Recovered events are re-indexed and re-projected directly, without
        // going through ingest(): no re-append to the WAL, no per-event
        // metrics, no websocket broadcasts.
        let mut wal_recovered = false;
        if let Some(ref wal) = store.wal {
            match wal.recover() {
                Ok(recovered_events) if !recovered_events.is_empty() => {
                    tracing::info!(
                        "🔄 Recovering {} events from WAL...",
                        recovered_events.len()
                    );

                    for event in recovered_events {
                        // Offset = position the event will occupy once pushed
                        // below; index failures are logged but do not abort
                        // the replay.
                        let offset = store.events.read().len();
                        if let Err(e) = store.index.index_event(
                            event.id,
                            event.entity_id_str(),
                            event.event_type_str(),
                            event.timestamp,
                            offset,
                        ) {
                            tracing::error!("Failed to re-index WAL event {}: {}", event.id, e);
                        }

                        if let Err(e) = store.projections.read().process_event(&event) {
                            tracing::error!("Failed to re-process WAL event {}: {}", event.id, e);
                        }

                        store.events.write().push(event);
                    }

                    let total = store.events.read().len();
                    *store.total_ingested.write() = total as u64;
                    tracing::info!("✅ Successfully recovered {} events from WAL", total);

                    // Checkpoint the recovered events into Parquet, then
                    // truncate the WAL so they are not replayed again next
                    // startup. Truncation is skipped if the flush failed.
                    if store.storage.is_some() {
                        tracing::info!("📸 Checkpointing WAL to Parquet storage...");
                        if let Err(e) = store.flush_storage() {
                            tracing::error!("Failed to checkpoint to Parquet: {}", e);
                        } else if let Err(e) = wal.truncate() {
                            tracing::error!("Failed to truncate WAL after checkpoint: {}", e);
                        } else {
                            tracing::info!("✅ WAL checkpointed and truncated");
                        }
                    }

                    wal_recovered = true;
                }
                Ok(_) => {
                    tracing::debug!("No events to recover from WAL");
                }
                Err(e) => {
                    tracing::error!("❌ WAL recovery failed: {}", e);
                }
            }
        }

        // --- Recovery phase 2: Parquet load (only when the WAL yielded
        // nothing). ---
        // NOTE(review): a load_all_events() error is silently swallowed by
        // the `if let Ok` — the store then starts empty; confirm intended.
        if !wal_recovered {
            if let Some(ref storage) = store.storage {
                if let Ok(persisted_events) = storage.read().load_all_events() {
                    tracing::info!("📂 Loading {} persisted events...", persisted_events.len());

                    for event in persisted_events {
                        let offset = store.events.read().len();
                        if let Err(e) = store.index.index_event(
                            event.id,
                            event.entity_id_str(),
                            event.event_type_str(),
                            event.timestamp,
                            offset,
                        ) {
                            tracing::error!("Failed to re-index event {}: {}", event.id, e);
                        }

                        if let Err(e) = store.projections.read().process_event(&event) {
                            tracing::error!("Failed to re-process event {}: {}", event.id, e);
                        }

                        store.events.write().push(event);
                    }

                    let total = store.events.read().len();
                    *store.total_ingested.write() = total as u64;
                    tracing::info!("✅ Successfully loaded {} events from storage", total);
                }
            }
        }

        store
    }

    /// Ingests a single event: validate → WAL append → index → projections →
    /// pipelines → storage append → in-memory push → websocket broadcast →
    /// auto-snapshot check → metrics.
    ///
    /// # Errors
    /// Returns the first failure among validation, WAL append, indexing,
    /// projection processing, or storage append.
    ///
    /// NOTE(review): if indexing/projection/storage fails *after* the WAL
    /// append succeeded, the in-memory state is only partially updated while
    /// the event is already durable in the WAL (it would reappear on restart
    /// via WAL recovery) — confirm this partial-failure behavior is intended.
    pub fn ingest(&self, event: Event) -> Result<()> {
        // Times the whole ingest path, including the early error returns.
        let timer = self.metrics.ingestion_duration_seconds.start_timer();

        let validation_result = self.validate_event(&event);
        if let Err(e) = validation_result {
            self.metrics.ingestion_errors_total.inc();
            timer.observe_duration();
            return Err(e);
        }

        // Durability first: the event must reach the WAL before any
        // in-memory mutation happens.
        if let Some(ref wal) = self.wal {
            if let Err(e) = wal.append(event.clone()) {
                self.metrics.ingestion_errors_total.inc();
                timer.observe_duration();
                return Err(e);
            }
        }

        // Hold the events write lock while computing the offset so the
        // offset recorded in the index matches the push position below.
        let mut events = self.events.write();
        let offset = events.len();

        self.index.index_event(
            event.id,
            event.entity_id_str(),
            event.event_type_str(),
            event.timestamp,
            offset,
        )?;

        // Projections only need a read guard on the manager; drop it before
        // running pipelines to keep the lock scope minimal.
        let projections = self.projections.read();
        projections.process_event(&event)?;
        drop(projections);

        let pipeline_results = self.pipeline_manager.process_event(&event);
        if !pipeline_results.is_empty() {
            tracing::debug!(
                "Event {} processed by {} pipeline(s)",
                event.id,
                pipeline_results.len()
            );
            for (pipeline_id, result) in pipeline_results {
                tracing::trace!("Pipeline {} result: {:?}", pipeline_id, result);
            }
        }

        if let Some(ref storage) = self.storage {
            let mut storage = storage.write();
            storage.append_event(event.clone())?;
        }

        events.push(event.clone());
        let total_events = events.len();
        // Release the events write lock before check_auto_snapshot(): that
        // path calls query(), which re-acquires the events read lock.
        drop(events);

        self.websocket_manager
            .broadcast_event(Arc::new(event.clone()));

        self.check_auto_snapshot(event.entity_id_str(), &event);

        self.metrics.events_ingested_total.inc();
        self.metrics
            .events_ingested_by_type
            .with_label_values(&[event.event_type_str()])
            .inc();
        self.metrics.storage_events_total.set(total_events as i64);

        let mut total = self.total_ingested.write();
        *total += 1;

        timer.observe_duration();

        tracing::debug!("Event ingested: {} (offset: {})", event.id, offset);

        Ok(())
    }

    /// Shared handle to the websocket broadcast manager.
    pub fn websocket_manager(&self) -> Arc<WebSocketManager> {
        Arc::clone(&self.websocket_manager)
    }

    /// Shared handle to the snapshot manager.
    pub fn snapshot_manager(&self) -> Arc<SnapshotManager> {
        Arc::clone(&self.snapshot_manager)
    }

    /// Shared handle to the compaction manager, if storage was configured.
    pub fn compaction_manager(&self) -> Option<Arc<CompactionManager>> {
        self.compaction_manager.as_ref().map(Arc::clone)
    }

    /// Shared handle to the schema registry.
    pub fn schema_registry(&self) -> Arc<SchemaRegistry> {
        Arc::clone(&self.schema_registry)
    }

    /// Shared handle to the replay manager.
    pub fn replay_manager(&self) -> Arc<ReplayManager> {
        Arc::clone(&self.replay_manager)
    }

    /// Shared handle to the pipeline manager.
    pub fn pipeline_manager(&self) -> Arc<PipelineManager> {
        Arc::clone(&self.pipeline_manager)
    }

    /// Shared handle to the metrics registry.
    pub fn metrics(&self) -> Arc<MetricsRegistry> {
        Arc::clone(&self.metrics)
    }

    /// Flushes buffered events to Parquet storage; a no-op when persistence
    /// is disabled.
    ///
    /// # Errors
    /// Propagates the underlying storage flush error.
    pub fn flush_storage(&self) -> Result<()> {
        if let Some(ref storage) = self.storage {
            let mut storage = storage.write();
            storage.flush()?;
            tracing::info!("✅ Flushed events to persistent storage");
        }
        Ok(())
    }

    /// Creates a manual snapshot for `entity_id` by shallow-merging the
    /// top-level keys of every event payload in timestamp order (query()
    /// returns events sorted ascending, so later events win).
    ///
    /// # Errors
    /// `EntityNotFound` when the entity has no events; otherwise propagates
    /// query or snapshot-manager errors.
    pub fn create_snapshot(&self, entity_id: &str) -> Result<()> {
        let events = self.query(QueryEventsRequest {
            entity_id: Some(entity_id.to_string()),
            event_type: None,
            tenant_id: None,
            as_of: None,
            since: None,
            until: None,
            limit: None,
        })?;

        if events.is_empty() {
            return Err(AllSourceError::EntityNotFound(entity_id.to_string()));
        }

        // Fold every payload object into one flat JSON object (last write
        // wins per key). Non-object payloads are skipped.
        let mut state = serde_json::json!({});
        for event in &events {
            if let serde_json::Value::Object(ref mut state_map) = state {
                if let serde_json::Value::Object(ref payload_map) = event.payload {
                    for (key, value) in payload_map {
                        state_map.insert(key.clone(), value.clone());
                    }
                }
            }
        }

        // Safe: emptiness was checked above.
        let last_event = events.last().unwrap();
        self.snapshot_manager.create_snapshot(
            entity_id.to_string(),
            state,
            last_event.timestamp,
            events.len(),
            SnapshotType::Manual,
        )?;

        Ok(())
    }

    /// Asks the snapshot manager whether `entity_id` is due for an automatic
    /// snapshot (based on its indexed event count and the event timestamp)
    /// and creates one if so. Failures are logged, never propagated.
    fn check_auto_snapshot(&self, entity_id: &str, event: &Event) {
        let entity_event_count = self
            .index
            .get_by_entity(entity_id)
            .map(|entries| entries.len())
            .unwrap_or(0);

        if self.snapshot_manager.should_create_snapshot(
            entity_id,
            entity_event_count,
            event.timestamp,
        ) {
            if let Err(e) = self.create_snapshot(entity_id) {
                tracing::warn!(
                    "Failed to create automatic snapshot for {}: {}",
                    entity_id,
                    e
                );
            }
        }
    }

    /// Validates the minimal invariants for ingestion: non-empty entity id
    /// and non-empty event type.
    ///
    /// # Errors
    /// `ValidationError` naming the offending field.
    fn validate_event(&self, event: &Event) -> Result<()> {
        if event.entity_id_str().is_empty() {
            return Err(AllSourceError::ValidationError(
                "entity_id cannot be empty".to_string(),
            ));
        }

        if event.event_type_str().is_empty() {
            return Err(AllSourceError::ValidationError(
                "event_type cannot be empty".to_string(),
            ));
        }

        Ok(())
    }

    /// Queries events: uses the entity index when `entity_id` is set,
    /// otherwise the type index when `event_type` is set, otherwise a full
    /// scan. Results are sorted by timestamp ascending, then truncated to
    /// `limit`.
    ///
    /// NOTE(review): the time filters (as_of/since/until) are applied only in
    /// filter_entries(), i.e. on the two indexed paths — the full-scan path
    /// ignores them entirely. Also, `tenant_id` is accepted on the request
    /// but never filtered on anywhere here. Confirm both are intentional.
    pub fn query(&self, request: QueryEventsRequest) -> Result<Vec<Event>> {
        // Query-type label drives the per-path metrics below.
        let query_type = if request.entity_id.is_some() {
            "entity"
        } else if request.event_type.is_some() {
            "type"
        } else {
            "full_scan"
        };

        let timer = self
            .metrics
            .query_duration_seconds
            .with_label_values(&[query_type])
            .start_timer();

        self.metrics
            .queries_total
            .with_label_values(&[query_type])
            .inc();

        let events = self.events.read();

        // Resolve candidate offsets: index lookup (entity takes precedence
        // over type) or every offset for a full scan.
        let offsets: Vec<usize> = if let Some(entity_id) = &request.entity_id {
            self.index
                .get_by_entity(entity_id)
                .map(|entries| self.filter_entries(entries, &request))
                .unwrap_or_default()
        } else if let Some(event_type) = &request.event_type {
            self.index
                .get_by_type(event_type)
                .map(|entries| self.filter_entries(entries, &request))
                .unwrap_or_default()
        } else {
            (0..events.len()).collect()
        };

        // Materialize the candidates; stale offsets (out of range) are
        // silently skipped by filter_map/get.
        let mut results: Vec<Event> = offsets
            .iter()
            .filter_map(|&offset| events.get(offset).cloned())
            .filter(|event| self.apply_filters(event, &request))
            .collect();

        results.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));

        // Limit applies after the sort, i.e. it keeps the earliest events.
        if let Some(limit) = request.limit {
            results.truncate(limit);
        }

        self.metrics
            .query_results_total
            .with_label_values(&[query_type])
            .inc_by(results.len() as u64);

        timer.observe_duration();

        Ok(results)
    }

    /// Applies the request's time-window filters (as_of / since / until,
    /// all inclusive bounds) to index entries and returns the surviving
    /// offsets.
    fn filter_entries(&self, entries: Vec<IndexEntry>, request: &QueryEventsRequest) -> Vec<usize> {
        entries
            .into_iter()
            .filter(|entry| {
                if let Some(as_of) = request.as_of {
                    if entry.timestamp > as_of {
                        return false;
                    }
                }
                if let Some(since) = request.since {
                    if entry.timestamp < since {
                        return false;
                    }
                }
                if let Some(until) = request.until {
                    if entry.timestamp > until {
                        return false;
                    }
                }
                true
            })
            .map(|entry| entry.offset)
            .collect()
    }

    /// Post-materialization filter: when the entity index was used (so the
    /// type index wasn't), enforce the event_type constraint here. All other
    /// paths pass through unchanged.
    fn apply_filters(&self, event: &Event, request: &QueryEventsRequest) -> bool {
        if request.entity_id.is_some() {
            if let Some(ref event_type) = request.event_type {
                if event.event_type_str() != event_type {
                    return false;
                }
            }
        }

        true
    }

    /// Rebuilds an entity's state as a JSON document, optionally as of a
    /// point in time.
    ///
    /// Strategy: start from the newest applicable snapshot (if any), then
    /// query only the events since that snapshot's `as_of` (and up to
    /// `as_of` when given), shallow-merging each payload's top-level keys
    /// into the accumulated state (last write wins).
    ///
    /// NOTE(review): filter_entries keeps entries with timestamp == since,
    /// so the event(s) exactly at the snapshot's as_of are replayed again —
    /// harmless for this last-write-wins merge, but confirm.
    ///
    /// # Errors
    /// `EntityNotFound` when no snapshot was used and no events matched.
    pub fn reconstruct_state(
        &self,
        entity_id: &str,
        as_of: Option<DateTime<Utc>>,
    ) -> Result<serde_json::Value> {
        // Pick a snapshot baseline: the one valid at `as_of` for
        // point-in-time queries, else the latest available.
        let (merged_state, since_timestamp) = if let Some(as_of_time) = as_of {
            if let Some(snapshot) = self
                .snapshot_manager
                .get_snapshot_as_of(entity_id, as_of_time)
            {
                tracing::debug!(
                    "Using snapshot from {} for entity {} (saved {} events)",
                    snapshot.as_of,
                    entity_id,
                    snapshot.event_count
                );
                (snapshot.state.clone(), Some(snapshot.as_of))
            } else {
                (serde_json::json!({}), None)
            }
        } else {
            if let Some(snapshot) = self.snapshot_manager.get_latest_snapshot(entity_id) {
                tracing::debug!(
                    "Using latest snapshot from {} for entity {}",
                    snapshot.as_of,
                    entity_id
                );
                (snapshot.state.clone(), Some(snapshot.as_of))
            } else {
                (serde_json::json!({}), None)
            }
        };

        // Delta replay: only events after the snapshot baseline (or all
        // events when no snapshot was found).
        let events = self.query(QueryEventsRequest {
            entity_id: Some(entity_id.to_string()),
            event_type: None,
            tenant_id: None,
            as_of,
            since: since_timestamp,
            until: None,
            limit: None,
        })?;

        // No snapshot and no events at all => the entity is unknown.
        if events.is_empty() && since_timestamp.is_none() {
            return Err(AllSourceError::EntityNotFound(entity_id.to_string()));
        }

        // Shallow-merge payload objects on top of the snapshot state.
        let mut merged_state = merged_state;
        for event in &events {
            if let serde_json::Value::Object(ref mut state_map) = merged_state {
                if let serde_json::Value::Object(ref payload_map) = event.payload {
                    for (key, value) in payload_map {
                        state_map.insert(key.clone(), value.clone());
                    }
                }
            }
        }

        // Envelope includes the replayed-event history; note that
        // last_updated/event_count cover only the replayed delta, not the
        // events folded into the snapshot.
        let state = serde_json::json!({
            "entity_id": entity_id,
            "last_updated": events.last().map(|e| e.timestamp),
            "event_count": events.len(),
            "as_of": as_of,
            "current_state": merged_state,
            "history": events.iter().map(|e| {
                serde_json::json!({
                    "event_id": e.id,
                    "type": e.event_type,
                    "timestamp": e.timestamp,
                    "payload": e.payload
                })
            }).collect::<Vec<_>>()
        });

        Ok(state)
    }

    /// Returns the entity's current state as maintained by the
    /// "entity_snapshots" projection (not the snapshot manager).
    ///
    /// # Errors
    /// `EntityNotFound` when the projection is missing or has no state for
    /// this entity.
    pub fn get_snapshot(&self, entity_id: &str) -> Result<serde_json::Value> {
        let projections = self.projections.read();

        if let Some(snapshot_projection) = projections.get_projection("entity_snapshots") {
            if let Some(state) = snapshot_projection.get_state(entity_id) {
                return Ok(serde_json::json!({
                    "entity_id": entity_id,
                    "snapshot": state,
                    "from_projection": "entity_snapshots"
                }));
            }
        }

        Err(AllSourceError::EntityNotFound(entity_id.to_string()))
    }

    /// Point-in-time statistics combining the in-memory log, the index, and
    /// the lifetime ingestion counter.
    pub fn stats(&self) -> StoreStats {
        let events = self.events.read();
        let index_stats = self.index.stats();

        StoreStats {
            total_events: events.len(),
            total_entities: index_stats.total_entities,
            total_event_types: index_stats.total_event_types,
            total_ingested: *self.total_ingested.read(),
        }
    }
}
689
/// Construction-time options for [`EventStore`].
#[derive(Debug, Clone)]
pub struct EventStoreConfig {
    /// Directory for Parquet persistence (also enables compaction);
    /// `None` disables on-disk storage.
    pub storage_dir: Option<PathBuf>,

    /// Snapshot creation policy.
    pub snapshot_config: SnapshotConfig,

    /// Directory for the write-ahead log; `None` disables the WAL.
    pub wal_dir: Option<PathBuf>,

    /// WAL tuning options (only consulted when `wal_dir` is set).
    pub wal_config: WALConfig,

    /// Compaction tuning options (only consulted when `storage_dir` is set).
    pub compaction_config: CompactionConfig,

    /// Schema registry options.
    pub schema_registry_config: SchemaRegistryConfig,
}
711
712impl Default for EventStoreConfig {
713 fn default() -> Self {
714 Self {
715 storage_dir: None,
716 snapshot_config: SnapshotConfig::default(),
717 wal_dir: None,
718 wal_config: WALConfig::default(),
719 compaction_config: CompactionConfig::default(),
720 schema_registry_config: SchemaRegistryConfig::default(),
721 }
722 }
723}
724
725impl EventStoreConfig {
726 pub fn with_persistence(storage_dir: impl Into<PathBuf>) -> Self {
728 Self {
729 storage_dir: Some(storage_dir.into()),
730 snapshot_config: SnapshotConfig::default(),
731 wal_dir: None,
732 wal_config: WALConfig::default(),
733 compaction_config: CompactionConfig::default(),
734 schema_registry_config: SchemaRegistryConfig::default(),
735 }
736 }
737
738 pub fn with_snapshots(snapshot_config: SnapshotConfig) -> Self {
740 Self {
741 storage_dir: None,
742 snapshot_config,
743 wal_dir: None,
744 wal_config: WALConfig::default(),
745 compaction_config: CompactionConfig::default(),
746 schema_registry_config: SchemaRegistryConfig::default(),
747 }
748 }
749
750 pub fn with_wal(wal_dir: impl Into<PathBuf>, wal_config: WALConfig) -> Self {
752 Self {
753 storage_dir: None,
754 snapshot_config: SnapshotConfig::default(),
755 wal_dir: Some(wal_dir.into()),
756 wal_config,
757 compaction_config: CompactionConfig::default(),
758 schema_registry_config: SchemaRegistryConfig::default(),
759 }
760 }
761
762 pub fn with_all(storage_dir: impl Into<PathBuf>, snapshot_config: SnapshotConfig) -> Self {
764 Self {
765 storage_dir: Some(storage_dir.into()),
766 snapshot_config,
767 wal_dir: None,
768 wal_config: WALConfig::default(),
769 compaction_config: CompactionConfig::default(),
770 schema_registry_config: SchemaRegistryConfig::default(),
771 }
772 }
773
774 pub fn production(
776 storage_dir: impl Into<PathBuf>,
777 wal_dir: impl Into<PathBuf>,
778 snapshot_config: SnapshotConfig,
779 wal_config: WALConfig,
780 compaction_config: CompactionConfig,
781 ) -> Self {
782 Self {
783 storage_dir: Some(storage_dir.into()),
784 snapshot_config,
785 wal_dir: Some(wal_dir.into()),
786 wal_config,
787 compaction_config,
788 schema_registry_config: SchemaRegistryConfig::default(),
789 }
790 }
791}
792
/// Point-in-time store statistics, as reported by [`EventStore::stats`].
#[derive(Debug, serde::Serialize)]
pub struct StoreStats {
    /// Events currently held in the in-memory log.
    pub total_events: usize,
    /// Distinct entity ids known to the index.
    pub total_entities: usize,
    /// Distinct event types known to the index.
    pub total_event_types: usize,
    /// Lifetime ingested-event counter (set to the recovered total after
    /// startup recovery).
    pub total_ingested: u64,
}
800
801impl Default for EventStore {
802 fn default() -> Self {
803 Self::new()
804 }
805}
806
807