//! allsource_core — store.rs: high-performance event store implementation.

1use crate::application::dto::QueryEventsRequest;
2use crate::infrastructure::persistence::compaction::{CompactionConfig, CompactionManager};
3use crate::domain::entities::Event;
4use crate::error::{AllSourceError, Result};
5use crate::infrastructure::persistence::index::{EventIndex, IndexEntry};
6use crate::infrastructure::observability::metrics::MetricsRegistry;
7use crate::application::services::pipeline::PipelineManager;
8use crate::application::services::projection::{EntitySnapshotProjection, EventCounterProjection, ProjectionManager};
9use crate::application::services::replay::ReplayManager;
10use crate::application::services::schema::{SchemaRegistry, SchemaRegistryConfig};
11use crate::infrastructure::persistence::snapshot::{SnapshotConfig, SnapshotManager, SnapshotType};
12use crate::infrastructure::persistence::storage::ParquetStorage;
13use crate::infrastructure::persistence::wal::{WALConfig, WriteAheadLog};
14use crate::infrastructure::web::websocket::WebSocketManager;
15use chrono::{DateTime, Utc};
16use dashmap::DashMap;
17use parking_lot::RwLock;
18use std::path::PathBuf;
19use std::sync::Arc;
20
/// High-performance event store with columnar storage
///
/// Events live in an in-memory `Vec` behind a `RwLock`, with a concurrent
/// index for entity/type/time lookups. Persistence (Parquet), durability
/// (WAL), snapshots, compaction, schema validation, replay, pipelines, and
/// metrics are layered on as optional or auxiliary subsystems.
pub struct EventStore {
    /// In-memory event storage; an event's Vec offset is the key stored in the index
    events: Arc<RwLock<Vec<Event>>>,

    /// High-performance concurrent index
    index: Arc<EventIndex>,

    /// Projection manager for real-time aggregations
    pub(crate) projections: Arc<RwLock<ProjectionManager>>,

    /// Optional persistent storage (v0.2 feature); None when no storage_dir configured
    storage: Option<Arc<RwLock<ParquetStorage>>>,

    /// WebSocket manager for real-time event streaming (v0.2 feature)
    websocket_manager: Arc<WebSocketManager>,

    /// Snapshot manager for fast state recovery (v0.2 feature)
    snapshot_manager: Arc<SnapshotManager>,

    /// Write-Ahead Log for durability (v0.2 feature); None when no wal_dir configured
    wal: Option<Arc<WriteAheadLog>>,

    /// Compaction manager for Parquet optimization (v0.2 feature)
    compaction_manager: Option<Arc<CompactionManager>>,

    /// Schema registry for event validation (v0.5 feature)
    schema_registry: Arc<SchemaRegistry>,

    /// Replay manager for event replay and projection rebuilding (v0.5 feature)
    replay_manager: Arc<ReplayManager>,

    /// Pipeline manager for stream processing (v0.5 feature)
    pipeline_manager: Arc<PipelineManager>,

    /// Prometheus metrics registry (v0.6 feature)
    metrics: Arc<MetricsRegistry>,

    /// Total events ingested (for metrics); also set during WAL/Parquet recovery
    total_ingested: Arc<RwLock<u64>>,

    /// Projection state cache for Query Service integration (v0.7 feature)
    /// Key format: "{projection_name}:{entity_id}"
    /// This DashMap provides O(1) access with ~11.9 μs latency
    projection_state_cache: Arc<DashMap<String, serde_json::Value>>,
}
67
68impl EventStore {
69    /// Create a new in-memory event store
70    pub fn new() -> Self {
71        Self::with_config(EventStoreConfig::default())
72    }
73
    /// Create event store with custom configuration
    ///
    /// Construction order matters:
    /// 1. Build all subsystems. Storage/WAL init failures are logged and the
    ///    feature is silently disabled rather than propagated as an error.
    /// 2. Recover events from the WAL, then checkpoint to Parquet and
    ///    truncate the WAL.
    /// 3. Only if nothing was recovered from the WAL, load events from
    ///    Parquet — this guard prevents loading the same events twice after a
    ///    WAL checkpoint.
    pub fn with_config(config: EventStoreConfig) -> Self {
        let mut projections = ProjectionManager::new();

        // Register built-in projections
        projections.register(Arc::new(EntitySnapshotProjection::new("entity_snapshots")));
        projections.register(Arc::new(EventCounterProjection::new("event_counters")));

        // Initialize persistent storage if configured
        // NOTE(review): an init failure downgrades to in-memory-only operation;
        // callers are not notified beyond the error log.
        let storage = config
            .storage_dir
            .as_ref()
            .and_then(|dir| match ParquetStorage::new(dir) {
                Ok(storage) => {
                    tracing::info!("✅ Parquet persistence enabled at: {}", dir.display());
                    Some(Arc::new(RwLock::new(storage)))
                }
                Err(e) => {
                    tracing::error!("❌ Failed to initialize Parquet storage: {}", e);
                    None
                }
            });

        // Initialize WAL if configured (v0.2 feature) — same silent-downgrade
        // policy as Parquet storage above.
        let wal = config.wal_dir.as_ref().and_then(|dir| {
            match WriteAheadLog::new(dir, config.wal_config.clone()) {
                Ok(wal) => {
                    tracing::info!("✅ WAL enabled at: {}", dir.display());
                    Some(Arc::new(wal))
                }
                Err(e) => {
                    tracing::error!("❌ Failed to initialize WAL: {}", e);
                    None
                }
            }
        });

        // Initialize compaction manager if Parquet storage is enabled (v0.2 feature)
        let compaction_manager = config.storage_dir.as_ref().map(|dir| {
            let manager = CompactionManager::new(dir, config.compaction_config.clone());
            Arc::new(manager)
        });

        // Initialize schema registry (v0.5 feature)
        let schema_registry = Arc::new(SchemaRegistry::new(config.schema_registry_config.clone()));
        tracing::info!("✅ Schema registry enabled");

        // Initialize replay manager (v0.5 feature)
        let replay_manager = Arc::new(ReplayManager::new());
        tracing::info!("✅ Replay manager enabled");

        // Initialize pipeline manager (v0.5 feature)
        let pipeline_manager = Arc::new(PipelineManager::new());
        tracing::info!("✅ Pipeline manager enabled");

        // Initialize metrics registry (v0.6 feature)
        let metrics = MetricsRegistry::new();
        tracing::info!("✅ Prometheus metrics registry initialized");

        // Initialize projection state cache (v0.7 feature)
        let projection_state_cache = Arc::new(DashMap::new());
        tracing::info!("✅ Projection state cache initialized");

        let store = Self {
            events: Arc::new(RwLock::new(Vec::new())),
            index: Arc::new(EventIndex::new()),
            projections: Arc::new(RwLock::new(projections)),
            storage,
            websocket_manager: Arc::new(WebSocketManager::new()),
            snapshot_manager: Arc::new(SnapshotManager::new(config.snapshot_config)),
            wal,
            compaction_manager,
            schema_registry,
            replay_manager,
            pipeline_manager,
            metrics,
            total_ingested: Arc::new(RwLock::new(0)),
            projection_state_cache,
        };

        // Recover from WAL first (most recent data)
        let mut wal_recovered = false;
        if let Some(ref wal) = store.wal {
            match wal.recover() {
                Ok(recovered_events) if !recovered_events.is_empty() => {
                    tracing::info!(
                        "🔄 Recovering {} events from WAL...",
                        recovered_events.len()
                    );

                    for event in recovered_events {
                        // Re-index and process events from WAL.
                        // Locks are re-taken per event; acceptable here since
                        // construction is effectively single-threaded.
                        let offset = store.events.read().len();
                        if let Err(e) = store.index.index_event(
                            event.id,
                            event.entity_id_str(),
                            event.event_type_str(),
                            event.timestamp,
                            offset,
                        ) {
                            tracing::error!("Failed to re-index WAL event {}: {}", event.id, e);
                        }

                        if let Err(e) = store.projections.read().process_event(&event) {
                            tracing::error!("Failed to re-process WAL event {}: {}", event.id, e);
                        }

                        store.events.write().push(event);
                    }

                    let total = store.events.read().len();
                    *store.total_ingested.write() = total as u64;
                    tracing::info!("✅ Successfully recovered {} events from WAL", total);

                    // After successful recovery, checkpoint to Parquet if enabled
                    // NOTE(review): this recovery loop never calls
                    // storage.append_event for the recovered events, yet the WAL
                    // is truncated after flush_storage() succeeds — confirm the
                    // flush actually persists the recovered events, otherwise
                    // they exist only in memory after truncation.
                    if store.storage.is_some() {
                        tracing::info!("📸 Checkpointing WAL to Parquet storage...");
                        if let Err(e) = store.flush_storage() {
                            tracing::error!("Failed to checkpoint to Parquet: {}", e);
                        } else if let Err(e) = wal.truncate() {
                            tracing::error!("Failed to truncate WAL after checkpoint: {}", e);
                        } else {
                            tracing::info!("✅ WAL checkpointed and truncated");
                        }
                    }

                    wal_recovered = true;
                }
                Ok(_) => {
                    tracing::debug!("No events to recover from WAL");
                }
                Err(e) => {
                    tracing::error!("❌ WAL recovery failed: {}", e);
                }
            }
        }

        // Load persisted events from Parquet only if we didn't recover from WAL
        // (to avoid loading the same events twice after WAL checkpoint)
        if !wal_recovered {
            if let Some(ref storage) = store.storage {
                if let Ok(persisted_events) = storage.read().load_all_events() {
                    tracing::info!("📂 Loading {} persisted events...", persisted_events.len());

                    for event in persisted_events {
                        // Re-index loaded events
                        let offset = store.events.read().len();
                        if let Err(e) = store.index.index_event(
                            event.id,
                            event.entity_id_str(),
                            event.event_type_str(),
                            event.timestamp,
                            offset,
                        ) {
                            tracing::error!("Failed to re-index event {}: {}", event.id, e);
                        }

                        // Re-process through projections
                        if let Err(e) = store.projections.read().process_event(&event) {
                            tracing::error!("Failed to re-process event {}: {}", event.id, e);
                        }

                        store.events.write().push(event);
                    }

                    let total = store.events.read().len();
                    *store.total_ingested.write() = total as u64;
                    tracing::info!("✅ Successfully loaded {} events from storage", total);
                }
            }
        }

        store
    }
248
    /// Ingest a new event into the store
    ///
    /// Durability-first pipeline: validate → WAL append → index → projections
    /// → pipelines → Parquet append → in-memory push → WebSocket broadcast →
    /// auto-snapshot check → metrics.
    ///
    /// # Errors
    /// Returns the first failure from validation, the WAL append, indexing,
    /// projection processing, or the Parquet append.
    pub fn ingest(&self, event: Event) -> Result<()> {
        // Start metrics timer (v0.6 feature)
        let timer = self.metrics.ingestion_duration_seconds.start_timer();

        // Validate event
        let validation_result = self.validate_event(&event);
        if let Err(e) = validation_result {
            // Record ingestion error
            self.metrics.ingestion_errors_total.inc();
            timer.observe_duration();
            return Err(e);
        }

        // Write to WAL FIRST for durability (v0.2 feature)
        // This ensures event is persisted before processing
        if let Some(ref wal) = self.wal {
            if let Err(e) = wal.append(event.clone()) {
                self.metrics.ingestion_errors_total.inc();
                timer.observe_duration();
                return Err(e);
            }
        }

        // The events write lock is held from here through the Parquet append,
        // serializing ingestion against concurrent queries.
        let mut events = self.events.write();
        let offset = events.len();

        // Index the event
        // NOTE(review): if a later step in this function fails, this index
        // entry at `offset` is not rolled back while the event is never pushed,
        // so the entry would alias the NEXT ingested event — confirm whether a
        // rollback path is needed.
        self.index.index_event(
            event.id,
            event.entity_id_str(),
            event.event_type_str(),
            event.timestamp,
            offset,
        )?;

        // Process through projections
        let projections = self.projections.read();
        projections.process_event(&event)?;
        drop(projections); // Release lock

        // Process through pipelines (v0.5 feature)
        // Pipelines can transform, filter, and aggregate events in real-time
        let pipeline_results = self.pipeline_manager.process_event(&event);
        if !pipeline_results.is_empty() {
            tracing::debug!(
                "Event {} processed by {} pipeline(s)",
                event.id,
                pipeline_results.len()
            );
            // Pipeline results could be stored, emitted, or forwarded elsewhere
            // For now, we just log them for observability
            for (pipeline_id, result) in pipeline_results {
                tracing::trace!("Pipeline {} result: {:?}", pipeline_id, result);
            }
        }

        // Persist to Parquet storage if enabled (v0.2)
        if let Some(ref storage) = self.storage {
            let mut storage = storage.write();
            storage.append_event(event.clone())?;
        }

        // Store the event in memory
        events.push(event.clone());
        let total_events = events.len();
        drop(events); // Release lock early

        // Broadcast to WebSocket clients (v0.2 feature)
        self.websocket_manager
            .broadcast_event(Arc::new(event.clone()));

        // Check if automatic snapshot should be created (v0.2 feature)
        // (runs synchronously on the ingestion path)
        self.check_auto_snapshot(event.entity_id_str(), &event);

        // Update metrics (v0.6 feature)
        self.metrics.events_ingested_total.inc();
        self.metrics
            .events_ingested_by_type
            .with_label_values(&[event.event_type_str()])
            .inc();
        self.metrics.storage_events_total.set(total_events as i64);

        // Update legacy total counter
        let mut total = self.total_ingested.write();
        *total += 1;

        timer.observe_duration();

        tracing::debug!("Event ingested: {} (offset: {})", event.id, offset);

        Ok(())
    }
342
343    /// Get the WebSocket manager for this store
344    pub fn websocket_manager(&self) -> Arc<WebSocketManager> {
345        Arc::clone(&self.websocket_manager)
346    }
347
348    /// Get the snapshot manager for this store
349    pub fn snapshot_manager(&self) -> Arc<SnapshotManager> {
350        Arc::clone(&self.snapshot_manager)
351    }
352
353    /// Get the compaction manager for this store
354    pub fn compaction_manager(&self) -> Option<Arc<CompactionManager>> {
355        self.compaction_manager.as_ref().map(Arc::clone)
356    }
357
358    /// Get the schema registry for this store (v0.5 feature)
359    pub fn schema_registry(&self) -> Arc<SchemaRegistry> {
360        Arc::clone(&self.schema_registry)
361    }
362
363    /// Get the replay manager for this store (v0.5 feature)
364    pub fn replay_manager(&self) -> Arc<ReplayManager> {
365        Arc::clone(&self.replay_manager)
366    }
367
368    /// Get the pipeline manager for this store (v0.5 feature)
369    pub fn pipeline_manager(&self) -> Arc<PipelineManager> {
370        Arc::clone(&self.pipeline_manager)
371    }
372
373    /// Get the metrics registry for this store (v0.6 feature)
374    pub fn metrics(&self) -> Arc<MetricsRegistry> {
375        Arc::clone(&self.metrics)
376    }
377
378    /// Get the projection manager for this store (v0.7 feature)
379    pub fn projection_manager(&self) -> parking_lot::RwLockReadGuard<'_, ProjectionManager> {
380        self.projections.read()
381    }
382
383    /// Get the projection state cache for this store (v0.7 feature)
384    /// Used by Elixir Query Service for state synchronization
385    pub fn projection_state_cache(&self) -> Arc<DashMap<String, serde_json::Value>> {
386        Arc::clone(&self.projection_state_cache)
387    }
388
389    /// Manually flush any pending events to persistent storage
390    pub fn flush_storage(&self) -> Result<()> {
391        if let Some(ref storage) = self.storage {
392            let mut storage = storage.write();
393            storage.flush()?;
394            tracing::info!("✅ Flushed events to persistent storage");
395        }
396        Ok(())
397    }
398
399    /// Manually create a snapshot for an entity
400    pub fn create_snapshot(&self, entity_id: &str) -> Result<()> {
401        // Get all events for this entity
402        let events = self.query(QueryEventsRequest {
403            entity_id: Some(entity_id.to_string()),
404            event_type: None,
405            tenant_id: None,
406            as_of: None,
407            since: None,
408            until: None,
409            limit: None,
410        })?;
411
412        if events.is_empty() {
413            return Err(AllSourceError::EntityNotFound(entity_id.to_string()));
414        }
415
416        // Build current state
417        let mut state = serde_json::json!({});
418        for event in &events {
419            if let serde_json::Value::Object(ref mut state_map) = state {
420                if let serde_json::Value::Object(ref payload_map) = event.payload {
421                    for (key, value) in payload_map {
422                        state_map.insert(key.clone(), value.clone());
423                    }
424                }
425            }
426        }
427
428        let last_event = events.last().unwrap();
429        self.snapshot_manager.create_snapshot(
430            entity_id.to_string(),
431            state,
432            last_event.timestamp,
433            events.len(),
434            SnapshotType::Manual,
435        )?;
436
437        Ok(())
438    }
439
    /// Check and create automatic snapshots if needed
    ///
    /// Called from `ingest` after the event has been stored. Uses the entity
    /// index to count this entity's events and asks the snapshot manager
    /// whether a snapshot is due.
    ///
    /// NOTE(review): snapshot creation here is synchronous on the ingestion
    /// path — `create_snapshot` runs a full entity query before returning —
    /// despite the original "in background" comment. Failures are logged and
    /// never propagated.
    fn check_auto_snapshot(&self, entity_id: &str, event: &Event) {
        // Count events for this entity
        let entity_event_count = self
            .index
            .get_by_entity(entity_id)
            .map(|entries| entries.len())
            .unwrap_or(0);

        if self.snapshot_manager.should_create_snapshot(
            entity_id,
            entity_event_count,
            event.timestamp,
        ) {
            // Best-effort: log and continue on failure so ingestion succeeds.
            if let Err(e) = self.create_snapshot(entity_id) {
                tracing::warn!(
                    "Failed to create automatic snapshot for {}: {}",
                    entity_id,
                    e
                );
            }
        }
    }
464
465    /// Validate an event before ingestion
466    fn validate_event(&self, event: &Event) -> Result<()> {
467        // EntityId and EventType value objects already validate non-empty in their constructors
468        // So these checks are now redundant, but we keep them for explicit validation
469        if event.entity_id_str().is_empty() {
470            return Err(AllSourceError::ValidationError(
471                "entity_id cannot be empty".to_string(),
472            ));
473        }
474
475        if event.event_type_str().is_empty() {
476            return Err(AllSourceError::ValidationError(
477                "event_type cannot be empty".to_string(),
478            ));
479        }
480
481        Ok(())
482    }
483
484    /// Query events based on filters (optimized with indices)
485    pub fn query(&self, request: QueryEventsRequest) -> Result<Vec<Event>> {
486        // Determine query type for metrics (v0.6 feature)
487        let query_type = if request.entity_id.is_some() {
488            "entity"
489        } else if request.event_type.is_some() {
490            "type"
491        } else {
492            "full_scan"
493        };
494
495        // Start metrics timer (v0.6 feature)
496        let timer = self
497            .metrics
498            .query_duration_seconds
499            .with_label_values(&[query_type])
500            .start_timer();
501
502        // Increment query counter (v0.6 feature)
503        self.metrics
504            .queries_total
505            .with_label_values(&[query_type])
506            .inc();
507
508        let events = self.events.read();
509
510        // Use index for fast lookups
511        let offsets: Vec<usize> = if let Some(entity_id) = &request.entity_id {
512            // Use entity index
513            self.index
514                .get_by_entity(entity_id)
515                .map(|entries| self.filter_entries(entries, &request))
516                .unwrap_or_default()
517        } else if let Some(event_type) = &request.event_type {
518            // Use type index
519            self.index
520                .get_by_type(event_type)
521                .map(|entries| self.filter_entries(entries, &request))
522                .unwrap_or_default()
523        } else {
524            // Full scan (less efficient but necessary for complex queries)
525            (0..events.len()).collect()
526        };
527
528        // Fetch events and apply remaining filters
529        let mut results: Vec<Event> = offsets
530            .iter()
531            .filter_map(|&offset| events.get(offset).cloned())
532            .filter(|event| self.apply_filters(event, &request))
533            .collect();
534
535        // Sort by timestamp (ascending)
536        results.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
537
538        // Apply limit
539        if let Some(limit) = request.limit {
540            results.truncate(limit);
541        }
542
543        // Record query results count (v0.6 feature)
544        self.metrics
545            .query_results_total
546            .with_label_values(&[query_type])
547            .inc_by(results.len() as u64);
548
549        timer.observe_duration();
550
551        Ok(results)
552    }
553
554    /// Filter index entries based on query parameters
555    fn filter_entries(&self, entries: Vec<IndexEntry>, request: &QueryEventsRequest) -> Vec<usize> {
556        entries
557            .into_iter()
558            .filter(|entry| {
559                // Time filters
560                if let Some(as_of) = request.as_of {
561                    if entry.timestamp > as_of {
562                        return false;
563                    }
564                }
565                if let Some(since) = request.since {
566                    if entry.timestamp < since {
567                        return false;
568                    }
569                }
570                if let Some(until) = request.until {
571                    if entry.timestamp > until {
572                        return false;
573                    }
574                }
575                true
576            })
577            .map(|entry| entry.offset)
578            .collect()
579    }
580
581    /// Apply filters to an event
582    fn apply_filters(&self, event: &Event, request: &QueryEventsRequest) -> bool {
583        // Additional type filter if entity was primary
584        if request.entity_id.is_some() {
585            if let Some(ref event_type) = request.event_type {
586                if event.event_type_str() != event_type {
587                    return false;
588                }
589            }
590        }
591
592        true
593    }
594
    /// Reconstruct entity state as of a specific timestamp
    /// v0.2: Now uses snapshots for fast reconstruction
    ///
    /// Strategy: start from the closest snapshot at or before the requested
    /// time (or the latest snapshot when `as_of` is `None`), then replay only
    /// the events after that snapshot, merging each payload's top-level keys
    /// into the state (last write wins per key).
    ///
    /// # Errors
    /// Returns `EntityNotFound` when there is neither a snapshot nor any
    /// event for `entity_id`.
    pub fn reconstruct_state(
        &self,
        entity_id: &str,
        as_of: Option<DateTime<Utc>>,
    ) -> Result<serde_json::Value> {
        // Try to find a snapshot to use as a base (v0.2 optimization)
        let (merged_state, since_timestamp) = if let Some(as_of_time) = as_of {
            // Get snapshot closest to requested time
            if let Some(snapshot) = self
                .snapshot_manager
                .get_snapshot_as_of(entity_id, as_of_time)
            {
                tracing::debug!(
                    "Using snapshot from {} for entity {} (saved {} events)",
                    snapshot.as_of,
                    entity_id,
                    snapshot.event_count
                );
                (snapshot.state.clone(), Some(snapshot.as_of))
            } else {
                (serde_json::json!({}), None)
            }
        } else {
            // Get latest snapshot for current state
            if let Some(snapshot) = self.snapshot_manager.get_latest_snapshot(entity_id) {
                tracing::debug!(
                    "Using latest snapshot from {} for entity {}",
                    snapshot.as_of,
                    entity_id
                );
                (snapshot.state.clone(), Some(snapshot.as_of))
            } else {
                (serde_json::json!({}), None)
            }
        };

        // Query events after the snapshot (or all if no snapshot)
        // NOTE(review): `since` is inclusive in `filter_entries` (timestamp >=
        // since passes), so the event at exactly the snapshot timestamp may be
        // re-applied — harmless for this last-write-wins merge, but worth
        // confirming if event application ever becomes non-idempotent.
        let events = self.query(QueryEventsRequest {
            entity_id: Some(entity_id.to_string()),
            event_type: None,
            tenant_id: None,
            as_of,
            since: since_timestamp,
            until: None,
            limit: None,
        })?;

        // If no events and no snapshot, entity not found
        if events.is_empty() && since_timestamp.is_none() {
            return Err(AllSourceError::EntityNotFound(entity_id.to_string()));
        }

        // Merge events on top of snapshot (or from scratch if no snapshot)
        let mut merged_state = merged_state;
        for event in &events {
            if let serde_json::Value::Object(ref mut state_map) = merged_state {
                if let serde_json::Value::Object(ref payload_map) = event.payload {
                    for (key, value) in payload_map {
                        state_map.insert(key.clone(), value.clone());
                    }
                }
            }
        }

        // Wrap with metadata.
        // Note: `history` only lists the events replayed on top of the
        // snapshot, not the events already folded into the snapshot itself;
        // likewise `event_count` counts only the replayed delta.
        let state = serde_json::json!({
            "entity_id": entity_id,
            "last_updated": events.last().map(|e| e.timestamp),
            "event_count": events.len(),
            "as_of": as_of,
            "current_state": merged_state,
            "history": events.iter().map(|e| {
                serde_json::json!({
                    "event_id": e.id,
                    "type": e.event_type,
                    "timestamp": e.timestamp,
                    "payload": e.payload
                })
            }).collect::<Vec<_>>()
        });

        Ok(state)
    }
680
681    /// Get snapshot from projection (faster than reconstructing)
682    pub fn get_snapshot(&self, entity_id: &str) -> Result<serde_json::Value> {
683        let projections = self.projections.read();
684
685        if let Some(snapshot_projection) = projections.get_projection("entity_snapshots") {
686            if let Some(state) = snapshot_projection.get_state(entity_id) {
687                return Ok(serde_json::json!({
688                    "entity_id": entity_id,
689                    "snapshot": state,
690                    "from_projection": "entity_snapshots"
691                }));
692            }
693        }
694
695        Err(AllSourceError::EntityNotFound(entity_id.to_string()))
696    }
697
698    /// Get statistics about the event store
699    pub fn stats(&self) -> StoreStats {
700        let events = self.events.read();
701        let index_stats = self.index.stats();
702
703        StoreStats {
704            total_events: events.len(),
705            total_entities: index_stats.total_entities,
706            total_event_types: index_stats.total_event_types,
707            total_ingested: *self.total_ingested.read(),
708        }
709    }
710}
711
/// Configuration for EventStore
#[derive(Debug, Clone)]
pub struct EventStoreConfig {
    /// Optional directory for persistent Parquet storage (v0.2 feature).
    /// Setting this also enables the compaction manager.
    pub storage_dir: Option<PathBuf>,

    /// Snapshot configuration (v0.2 feature)
    pub snapshot_config: SnapshotConfig,

    /// Optional directory for WAL (Write-Ahead Log) (v0.2 feature)
    pub wal_dir: Option<PathBuf>,

    /// WAL configuration (v0.2 feature); only used when `wal_dir` is set
    pub wal_config: WALConfig,

    /// Compaction configuration (v0.2 feature); only used when `storage_dir` is set
    pub compaction_config: CompactionConfig,

    /// Schema registry configuration (v0.5 feature)
    pub schema_registry_config: SchemaRegistryConfig,
}
733
734impl Default for EventStoreConfig {
735    fn default() -> Self {
736        Self {
737            storage_dir: None,
738            snapshot_config: SnapshotConfig::default(),
739            wal_dir: None,
740            wal_config: WALConfig::default(),
741            compaction_config: CompactionConfig::default(),
742            schema_registry_config: SchemaRegistryConfig::default(),
743        }
744    }
745}
746
747impl EventStoreConfig {
748    /// Create config with persistent storage enabled
749    pub fn with_persistence(storage_dir: impl Into<PathBuf>) -> Self {
750        Self {
751            storage_dir: Some(storage_dir.into()),
752            snapshot_config: SnapshotConfig::default(),
753            wal_dir: None,
754            wal_config: WALConfig::default(),
755            compaction_config: CompactionConfig::default(),
756            schema_registry_config: SchemaRegistryConfig::default(),
757        }
758    }
759
760    /// Create config with custom snapshot settings
761    pub fn with_snapshots(snapshot_config: SnapshotConfig) -> Self {
762        Self {
763            storage_dir: None,
764            snapshot_config,
765            wal_dir: None,
766            wal_config: WALConfig::default(),
767            compaction_config: CompactionConfig::default(),
768            schema_registry_config: SchemaRegistryConfig::default(),
769        }
770    }
771
772    /// Create config with WAL enabled
773    pub fn with_wal(wal_dir: impl Into<PathBuf>, wal_config: WALConfig) -> Self {
774        Self {
775            storage_dir: None,
776            snapshot_config: SnapshotConfig::default(),
777            wal_dir: Some(wal_dir.into()),
778            wal_config,
779            compaction_config: CompactionConfig::default(),
780            schema_registry_config: SchemaRegistryConfig::default(),
781        }
782    }
783
784    /// Create config with both persistence and snapshots
785    pub fn with_all(storage_dir: impl Into<PathBuf>, snapshot_config: SnapshotConfig) -> Self {
786        Self {
787            storage_dir: Some(storage_dir.into()),
788            snapshot_config,
789            wal_dir: None,
790            wal_config: WALConfig::default(),
791            compaction_config: CompactionConfig::default(),
792            schema_registry_config: SchemaRegistryConfig::default(),
793        }
794    }
795
796    /// Create production config with all features enabled
797    pub fn production(
798        storage_dir: impl Into<PathBuf>,
799        wal_dir: impl Into<PathBuf>,
800        snapshot_config: SnapshotConfig,
801        wal_config: WALConfig,
802        compaction_config: CompactionConfig,
803    ) -> Self {
804        Self {
805            storage_dir: Some(storage_dir.into()),
806            snapshot_config,
807            wal_dir: Some(wal_dir.into()),
808            wal_config,
809            compaction_config,
810            schema_registry_config: SchemaRegistryConfig::default(),
811        }
812    }
813}
814
/// Point-in-time statistics returned by `EventStore::stats`.
#[derive(Debug, serde::Serialize)]
pub struct StoreStats {
    /// Number of events currently held in memory
    pub total_events: usize,
    /// Distinct entities tracked by the index
    pub total_entities: usize,
    /// Distinct event types tracked by the index
    pub total_event_types: usize,
    /// Running ingestion counter (also set during WAL/Parquet recovery)
    pub total_ingested: u64,
}
822
823impl Default for EventStore {
824    fn default() -> Self {
825        Self::new()
826    }
827}
828
829// Tests for store are covered in integration tests