// allsource_core/store.rs

1use crate::compaction::{CompactionConfig, CompactionManager};
2use crate::domain::entities::Event;
3use crate::error::{AllSourceError, Result};
4use crate::application::dto::QueryEventsRequest;
5use crate::index::{EventIndex, IndexEntry};
6use crate::metrics::MetricsRegistry;
7use crate::pipeline::PipelineManager;
8use crate::projection::{
9    EntitySnapshotProjection, EventCounterProjection, ProjectionManager,
10};
11use crate::replay::ReplayManager;
12use crate::schema::{SchemaRegistry, SchemaRegistryConfig};
13use crate::snapshot::{SnapshotConfig, SnapshotManager, SnapshotType};
14use crate::storage::ParquetStorage;
15use crate::wal::{WALConfig, WriteAheadLog};
16use crate::websocket::WebSocketManager;
17use chrono::{DateTime, Utc};
18use parking_lot::RwLock;
19use std::path::PathBuf;
20use std::sync::Arc;
21
/// High-performance event store with columnar storage.
///
/// All shared state lives behind `Arc`s so cheap handles to the sub-managers
/// can be handed out via the accessor methods on the `impl` block.
pub struct EventStore {
    /// In-memory event log. An event's position in this vector is the
    /// `offset` recorded by the index at ingest/recovery time.
    events: Arc<RwLock<Vec<Event>>>,

    /// High-performance concurrent index (by entity id, event type, time)
    index: Arc<EventIndex>,

    /// Projection manager for real-time aggregations.
    /// NOTE(review): `process_event` is called through a read guard, so
    /// ProjectionManager presumably uses interior mutability — confirm.
    pub(crate) projections: Arc<RwLock<ProjectionManager>>,

    /// Optional persistent storage (v0.2 feature); `None` when no storage
    /// directory was configured or initialization failed at startup
    storage: Option<Arc<RwLock<ParquetStorage>>>,

    /// WebSocket manager for real-time event streaming (v0.2 feature)
    websocket_manager: Arc<WebSocketManager>,

    /// Snapshot manager for fast state recovery (v0.2 feature)
    snapshot_manager: Arc<SnapshotManager>,

    /// Write-Ahead Log for durability (v0.2 feature); `None` when no WAL
    /// directory was configured or initialization failed at startup
    wal: Option<Arc<WriteAheadLog>>,

    /// Compaction manager for Parquet optimization (v0.2 feature);
    /// created whenever a storage directory is configured
    compaction_manager: Option<Arc<CompactionManager>>,

    /// Schema registry for event validation (v0.5 feature)
    schema_registry: Arc<SchemaRegistry>,

    /// Replay manager for event replay and projection rebuilding (v0.5 feature)
    replay_manager: Arc<ReplayManager>,

    /// Pipeline manager for stream processing (v0.5 feature)
    pipeline_manager: Arc<PipelineManager>,

    /// Prometheus metrics registry (v0.6 feature)
    metrics: Arc<MetricsRegistry>,

    /// Lifetime count of ingested events (includes events recovered or
    /// loaded at startup; see `with_config`)
    total_ingested: Arc<RwLock<u64>>,
}
63
64impl EventStore {
65    /// Create a new in-memory event store
66    pub fn new() -> Self {
67        Self::with_config(EventStoreConfig::default())
68    }
69
    /// Create event store with custom configuration.
    ///
    /// Startup sequence (order matters):
    /// 1. Register the built-in projections.
    /// 2. Open optional subsystems: Parquet storage, WAL, compaction manager,
    ///    plus the always-on schema/replay/pipeline/metrics managers.
    ///    NOTE(review): a failure to open Parquet storage or the WAL is only
    ///    logged and the feature is silently disabled — the caller gets no
    ///    error signal; confirm this degradation is acceptable in production.
    /// 3. Recover events from the WAL (most recent, possibly un-checkpointed
    ///    writes), then checkpoint to Parquet and truncate the WAL.
    /// 4. Only if the WAL yielded nothing, load events from Parquet — this
    ///    avoids double-loading the same events after a WAL checkpoint.
    pub fn with_config(config: EventStoreConfig) -> Self {
        let mut projections = ProjectionManager::new();

        // Register built-in projections
        projections.register(Arc::new(EntitySnapshotProjection::new("entity_snapshots")));
        projections.register(Arc::new(EventCounterProjection::new("event_counters")));

        // Initialize persistent storage if configured (v0.2 feature)
        let storage = config.storage_dir.as_ref().and_then(|dir| {
            match ParquetStorage::new(dir) {
                Ok(storage) => {
                    tracing::info!("✅ Parquet persistence enabled at: {}", dir.display());
                    Some(Arc::new(RwLock::new(storage)))
                }
                Err(e) => {
                    // Degrades to in-memory only (see NOTE in doc comment).
                    tracing::error!("❌ Failed to initialize Parquet storage: {}", e);
                    None
                }
            }
        });

        // Initialize WAL if configured (v0.2 feature)
        let wal = config.wal_dir.as_ref().and_then(|dir| {
            match WriteAheadLog::new(dir, config.wal_config.clone()) {
                Ok(wal) => {
                    tracing::info!("✅ WAL enabled at: {}", dir.display());
                    Some(Arc::new(wal))
                }
                Err(e) => {
                    // Degrades to no write-ahead durability.
                    tracing::error!("❌ Failed to initialize WAL: {}", e);
                    None
                }
            }
        });

        // Compaction is keyed off the *configured* storage directory, so the
        // manager is created even if Parquet storage failed to open above.
        let compaction_manager = config.storage_dir.as_ref().map(|dir| {
            let manager = CompactionManager::new(dir, config.compaction_config.clone());
            Arc::new(manager)
        });

        // Initialize schema registry (v0.5 feature)
        let schema_registry = Arc::new(SchemaRegistry::new(config.schema_registry_config.clone()));
        tracing::info!("✅ Schema registry enabled");

        // Initialize replay manager (v0.5 feature)
        let replay_manager = Arc::new(ReplayManager::new());
        tracing::info!("✅ Replay manager enabled");

        // Initialize pipeline manager (v0.5 feature)
        let pipeline_manager = Arc::new(PipelineManager::new());
        tracing::info!("✅ Pipeline manager enabled");

        // Initialize metrics registry (v0.6 feature)
        let metrics = MetricsRegistry::new();
        tracing::info!("✅ Prometheus metrics registry initialized");

        let store = Self {
            events: Arc::new(RwLock::new(Vec::new())),
            index: Arc::new(EventIndex::new()),
            projections: Arc::new(RwLock::new(projections)),
            storage,
            websocket_manager: Arc::new(WebSocketManager::new()),
            snapshot_manager: Arc::new(SnapshotManager::new(config.snapshot_config)),
            wal,
            compaction_manager,
            schema_registry,
            replay_manager,
            pipeline_manager,
            metrics,
            total_ingested: Arc::new(RwLock::new(0)),
        };

        // Recover from WAL first (most recent data). Construction is
        // single-threaded, so the per-event read-then-write interleaving on
        // `store.events` below cannot race.
        let mut wal_recovered = false;
        if let Some(ref wal) = store.wal {
            match wal.recover() {
                Ok(recovered_events) if !recovered_events.is_empty() => {
                    tracing::info!("🔄 Recovering {} events from WAL...", recovered_events.len());

                    for event in recovered_events {
                        // Re-index and process events from WAL; `offset` is
                        // the event's final position in the in-memory log.
                        let offset = store.events.read().len();
                        if let Err(e) = store.index.index_event(
                            event.id,
                            event.entity_id_str(),
                            event.event_type_str(),
                            event.timestamp,
                            offset,
                        ) {
                            tracing::error!("Failed to re-index WAL event {}: {}", event.id, e);
                        }

                        if let Err(e) = store.projections.read().process_event(&event) {
                            tracing::error!("Failed to re-process WAL event {}: {}", event.id, e);
                        }

                        store.events.write().push(event);
                    }

                    let total = store.events.read().len();
                    *store.total_ingested.write() = total as u64;
                    tracing::info!("✅ Successfully recovered {} events from WAL", total);

                    // After successful recovery, checkpoint to Parquet if
                    // enabled; the WAL is truncated only if the flush succeeds.
                    // NOTE(review): recovered events are pushed only into the
                    // in-memory log above and never handed to `storage` via
                    // `append_event`, while `flush_storage` flushes the Parquet
                    // writer's own buffer — verify the checkpoint actually
                    // persists the recovered events before the WAL is
                    // truncated, otherwise they could be lost on next restart.
                    if store.storage.is_some() {
                        tracing::info!("📸 Checkpointing WAL to Parquet storage...");
                        if let Err(e) = store.flush_storage() {
                            tracing::error!("Failed to checkpoint to Parquet: {}", e);
                        } else if let Err(e) = wal.truncate() {
                            tracing::error!("Failed to truncate WAL after checkpoint: {}", e);
                        } else {
                            tracing::info!("✅ WAL checkpointed and truncated");
                        }
                    }

                    wal_recovered = true;
                }
                Ok(_) => {
                    tracing::debug!("No events to recover from WAL");
                }
                Err(e) => {
                    tracing::error!("❌ WAL recovery failed: {}", e);
                }
            }
        }

        // Load persisted events from Parquet only if we didn't recover from WAL
        // (to avoid loading the same events twice after WAL checkpoint)
        if !wal_recovered {
            if let Some(ref storage) = store.storage {
                // NOTE(review): a load error is silently swallowed here
                // (`if let Ok`), leaving the store empty — consider logging.
                if let Ok(persisted_events) = storage.read().load_all_events() {
                    tracing::info!("📂 Loading {} persisted events...", persisted_events.len());

                    for event in persisted_events {
                        // Re-index loaded events
                        let offset = store.events.read().len();
                        if let Err(e) = store.index.index_event(
                            event.id,
                            event.entity_id_str(),
                            event.event_type_str(),
                            event.timestamp,
                            offset,
                        ) {
                            tracing::error!("Failed to re-index event {}: {}", event.id, e);
                        }

                        // Re-process through projections
                        if let Err(e) = store.projections.read().process_event(&event) {
                            tracing::error!("Failed to re-process event {}: {}", event.id, e);
                        }

                        store.events.write().push(event);
                    }

                    let total = store.events.read().len();
                    *store.total_ingested.write() = total as u64;
                    tracing::info!("✅ Successfully loaded {} events from storage", total);
                }
            }
        }

        store
    }
235
    /// Ingest a new event into the store.
    ///
    /// Write path, in order:
    /// 1. Validate the event (non-empty entity id / event type).
    /// 2. Append to the WAL *before* touching any in-memory state, so a crash
    ///    after this point is recoverable on restart (see `with_config`).
    /// 3. Under the `events` write lock: index, run projections, run
    ///    pipelines, append to Parquet, and push into the in-memory log —
    ///    the lock scope guarantees `offset` matches the push position.
    /// 4. After releasing the lock: broadcast to WebSocket subscribers,
    ///    maybe create an automatic snapshot (synchronously), update metrics.
    ///
    /// # Errors
    /// Returns validation, WAL, index, projection, or storage errors; every
    /// error path increments the ingestion-error counter and observes the
    /// duration timer so latency histograms include failed ingests.
    pub fn ingest(&self, event: Event) -> Result<()> {
        // Start metrics timer (v0.6 feature)
        let timer = self.metrics.ingestion_duration_seconds.start_timer();

        // Validate event
        let validation_result = self.validate_event(&event);
        if let Err(e) = validation_result {
            // Record ingestion error
            self.metrics.ingestion_errors_total.inc();
            timer.observe_duration();
            return Err(e);
        }

        // Write to WAL FIRST for durability (v0.2 feature)
        // This ensures event is persisted before processing
        if let Some(ref wal) = self.wal {
            if let Err(e) = wal.append(event.clone()) {
                self.metrics.ingestion_errors_total.inc();
                timer.observe_duration();
                return Err(e);
            }
        }

        // Hold the write lock for the index/project/persist/push sequence so
        // `offset` stays consistent with where the event is actually pushed.
        let mut events = self.events.write();
        let offset = events.len();

        // Index the event
        self.index.index_event(
            event.id,
            event.entity_id_str(),
            event.event_type_str(),
            event.timestamp,
            offset,
        )?;

        // Process through projections (read guard only; ProjectionManager
        // presumably uses interior mutability — TODO confirm)
        let projections = self.projections.read();
        projections.process_event(&event)?;
        drop(projections); // Release lock

        // Process through pipelines (v0.5 feature)
        // Pipelines can transform, filter, and aggregate events in real-time
        let pipeline_results = self.pipeline_manager.process_event(&event);
        if !pipeline_results.is_empty() {
            tracing::debug!(
                "Event {} processed by {} pipeline(s)",
                event.id,
                pipeline_results.len()
            );
            // Pipeline results could be stored, emitted, or forwarded elsewhere
            // For now, we just log them for observability
            for (pipeline_id, result) in pipeline_results {
                tracing::trace!("Pipeline {} result: {:?}", pipeline_id, result);
            }
        }

        // Persist to Parquet storage if enabled (v0.2)
        // NOTE(review): this disk write happens while still holding the
        // `events` write lock, blocking all readers and writers for its
        // duration — confirm this serialization is acceptable at the
        // expected ingest rate.
        if let Some(ref storage) = self.storage {
            let mut storage = storage.write();
            storage.append_event(event.clone())?;
        }

        // Store the event in memory
        events.push(event.clone());
        let total_events = events.len();
        drop(events); // Release lock early

        // Broadcast to WebSocket clients (v0.2 feature)
        self.websocket_manager.broadcast_event(Arc::new(event.clone()));

        // Check if automatic snapshot should be created (v0.2 feature);
        // this runs synchronously on the calling thread.
        self.check_auto_snapshot(event.entity_id_str(), &event);

        // Update metrics (v0.6 feature)
        self.metrics.events_ingested_total.inc();
        self.metrics.events_ingested_by_type
            .with_label_values(&[event.event_type_str()])
            .inc();
        self.metrics.storage_events_total.set(total_events as i64);

        // Update legacy total counter
        let mut total = self.total_ingested.write();
        *total += 1;

        timer.observe_duration();

        tracing::debug!(
            "Event ingested: {} (offset: {})",
            event.id,
            offset
        );

        Ok(())
    }
331
332    /// Get the WebSocket manager for this store
333    pub fn websocket_manager(&self) -> Arc<WebSocketManager> {
334        Arc::clone(&self.websocket_manager)
335    }
336
337    /// Get the snapshot manager for this store
338    pub fn snapshot_manager(&self) -> Arc<SnapshotManager> {
339        Arc::clone(&self.snapshot_manager)
340    }
341
342    /// Get the compaction manager for this store
343    pub fn compaction_manager(&self) -> Option<Arc<CompactionManager>> {
344        self.compaction_manager.as_ref().map(Arc::clone)
345    }
346
347    /// Get the schema registry for this store (v0.5 feature)
348    pub fn schema_registry(&self) -> Arc<SchemaRegistry> {
349        Arc::clone(&self.schema_registry)
350    }
351
352    /// Get the replay manager for this store (v0.5 feature)
353    pub fn replay_manager(&self) -> Arc<ReplayManager> {
354        Arc::clone(&self.replay_manager)
355    }
356
357    /// Get the pipeline manager for this store (v0.5 feature)
358    pub fn pipeline_manager(&self) -> Arc<PipelineManager> {
359        Arc::clone(&self.pipeline_manager)
360    }
361
362    /// Get the metrics registry for this store (v0.6 feature)
363    pub fn metrics(&self) -> Arc<MetricsRegistry> {
364        Arc::clone(&self.metrics)
365    }
366
367    /// Manually flush any pending events to persistent storage
368    pub fn flush_storage(&self) -> Result<()> {
369        if let Some(ref storage) = self.storage {
370            let mut storage = storage.write();
371            storage.flush()?;
372            tracing::info!("✅ Flushed events to persistent storage");
373        }
374        Ok(())
375    }
376
377    /// Manually create a snapshot for an entity
378    pub fn create_snapshot(&self, entity_id: &str) -> Result<()> {
379        // Get all events for this entity
380        let events = self.query(QueryEventsRequest {
381            entity_id: Some(entity_id.to_string()),
382            event_type: None,
383            tenant_id: None,
384            as_of: None,
385            since: None,
386            until: None,
387            limit: None,
388        })?;
389
390        if events.is_empty() {
391            return Err(AllSourceError::EntityNotFound(entity_id.to_string()));
392        }
393
394        // Build current state
395        let mut state = serde_json::json!({});
396        for event in &events {
397            if let serde_json::Value::Object(ref mut state_map) = state {
398                if let serde_json::Value::Object(ref payload_map) = event.payload {
399                    for (key, value) in payload_map {
400                        state_map.insert(key.clone(), value.clone());
401                    }
402                }
403            }
404        }
405
406        let last_event = events.last().unwrap();
407        self.snapshot_manager.create_snapshot(
408            entity_id.to_string(),
409            state,
410            last_event.timestamp,
411            events.len(),
412            SnapshotType::Manual,
413        )?;
414
415        Ok(())
416    }
417
    /// Check whether an automatic snapshot is due for `entity_id` and, if
    /// so, create one. Failures are logged, never propagated, so snapshot
    /// problems cannot abort an ingestion.
    fn check_auto_snapshot(&self, entity_id: &str, event: &Event) {
        // Count indexed events for this entity (unknown entity => 0)
        let entity_event_count = self
            .index
            .get_by_entity(entity_id)
            .map(|entries| entries.len())
            .unwrap_or(0);

        if self.snapshot_manager.should_create_snapshot(
            entity_id,
            entity_event_count,
            event.timestamp,
        ) {
            // NOTE(review): this runs synchronously on the ingestion path
            // (not "in background" as previously claimed) — `create_snapshot`
            // re-queries all of the entity's events, so a large entity will
            // delay the caller.
            if let Err(e) = self.create_snapshot(entity_id) {
                tracing::warn!(
                    "Failed to create automatic snapshot for {}: {}",
                    entity_id,
                    e
                );
            }
        }
    }
442
443    /// Validate an event before ingestion
444    fn validate_event(&self, event: &Event) -> Result<()> {
445        // EntityId and EventType value objects already validate non-empty in their constructors
446        // So these checks are now redundant, but we keep them for explicit validation
447        if event.entity_id_str().is_empty() {
448            return Err(AllSourceError::ValidationError(
449                "entity_id cannot be empty".to_string(),
450            ));
451        }
452
453        if event.event_type_str().is_empty() {
454            return Err(AllSourceError::ValidationError(
455                "event_type cannot be empty".to_string(),
456            ));
457        }
458
459        Ok(())
460    }
461
462    /// Query events based on filters (optimized with indices)
463    pub fn query(&self, request: QueryEventsRequest) -> Result<Vec<Event>> {
464        // Determine query type for metrics (v0.6 feature)
465        let query_type = if request.entity_id.is_some() {
466            "entity"
467        } else if request.event_type.is_some() {
468            "type"
469        } else {
470            "full_scan"
471        };
472
473        // Start metrics timer (v0.6 feature)
474        let timer = self.metrics.query_duration_seconds
475            .with_label_values(&[query_type])
476            .start_timer();
477
478        // Increment query counter (v0.6 feature)
479        self.metrics.queries_total
480            .with_label_values(&[query_type])
481            .inc();
482
483        let events = self.events.read();
484
485        // Use index for fast lookups
486        let offsets: Vec<usize> = if let Some(entity_id) = &request.entity_id {
487            // Use entity index
488            self.index
489                .get_by_entity(entity_id)
490                .map(|entries| self.filter_entries(entries, &request))
491                .unwrap_or_default()
492        } else if let Some(event_type) = &request.event_type {
493            // Use type index
494            self.index
495                .get_by_type(event_type)
496                .map(|entries| self.filter_entries(entries, &request))
497                .unwrap_or_default()
498        } else {
499            // Full scan (less efficient but necessary for complex queries)
500            (0..events.len()).collect()
501        };
502
503        // Fetch events and apply remaining filters
504        let mut results: Vec<Event> = offsets
505            .iter()
506            .filter_map(|&offset| events.get(offset).cloned())
507            .filter(|event| self.apply_filters(event, &request))
508            .collect();
509
510        // Sort by timestamp (ascending)
511        results.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
512
513        // Apply limit
514        if let Some(limit) = request.limit {
515            results.truncate(limit);
516        }
517
518        // Record query results count (v0.6 feature)
519        self.metrics.query_results_total
520            .with_label_values(&[query_type])
521            .inc_by(results.len() as u64);
522
523        timer.observe_duration();
524
525        Ok(results)
526    }
527
528    /// Filter index entries based on query parameters
529    fn filter_entries(&self, entries: Vec<IndexEntry>, request: &QueryEventsRequest) -> Vec<usize> {
530        entries
531            .into_iter()
532            .filter(|entry| {
533                // Time filters
534                if let Some(as_of) = request.as_of {
535                    if entry.timestamp > as_of {
536                        return false;
537                    }
538                }
539                if let Some(since) = request.since {
540                    if entry.timestamp < since {
541                        return false;
542                    }
543                }
544                if let Some(until) = request.until {
545                    if entry.timestamp > until {
546                        return false;
547                    }
548                }
549                true
550            })
551            .map(|entry| entry.offset)
552            .collect()
553    }
554
555    /// Apply filters to an event
556    fn apply_filters(&self, event: &Event, request: &QueryEventsRequest) -> bool {
557        // Additional type filter if entity was primary
558        if request.entity_id.is_some() {
559            if let Some(ref event_type) = request.event_type {
560                if event.event_type_str() != event_type {
561                    return false;
562                }
563            }
564        }
565
566        true
567    }
568
    /// Reconstruct entity state as of a specific timestamp.
    /// v0.2: Now uses snapshots for fast reconstruction — a snapshot is used
    /// as the base state and only events after it are replayed.
    ///
    /// Merge semantics: each event's JSON-object payload is folded into the
    /// state, last write per key wins; non-object payloads are skipped.
    ///
    /// NOTE(review): `since` is an inclusive bound downstream
    /// (`filter_entries` keeps `timestamp >= since`), so the event exactly at
    /// the snapshot's `as_of` is re-applied — harmless because the merge is
    /// idempotent, but worth confirming. Also, the returned `event_count` and
    /// `history` cover only the replayed delta when a snapshot was used, not
    /// the entity's full history — TODO confirm that is the intended contract.
    ///
    /// # Errors
    /// `EntityNotFound` when there is neither a snapshot nor any events.
    pub fn reconstruct_state(
        &self,
        entity_id: &str,
        as_of: Option<DateTime<Utc>>,
    ) -> Result<serde_json::Value> {
        // Try to find a snapshot to use as a base (v0.2 optimization)
        let (merged_state, since_timestamp) = if let Some(as_of_time) = as_of {
            // Get snapshot closest to requested time
            if let Some(snapshot) = self.snapshot_manager.get_snapshot_as_of(entity_id, as_of_time) {
                tracing::debug!(
                    "Using snapshot from {} for entity {} (saved {} events)",
                    snapshot.as_of,
                    entity_id,
                    snapshot.event_count
                );
                (snapshot.state.clone(), Some(snapshot.as_of))
            } else {
                (serde_json::json!({}), None)
            }
        } else {
            // Get latest snapshot for current state
            if let Some(snapshot) = self.snapshot_manager.get_latest_snapshot(entity_id) {
                tracing::debug!(
                    "Using latest snapshot from {} for entity {}",
                    snapshot.as_of,
                    entity_id
                );
                (snapshot.state.clone(), Some(snapshot.as_of))
            } else {
                (serde_json::json!({}), None)
            }
        };

        // Query events after the snapshot (or all if no snapshot)
        let events = self.query(QueryEventsRequest {
            entity_id: Some(entity_id.to_string()),
            event_type: None,
            tenant_id: None,
            as_of,
            since: since_timestamp,
            until: None,
            limit: None,
        })?;

        // If no events and no snapshot, entity not found
        if events.is_empty() && since_timestamp.is_none() {
            return Err(AllSourceError::EntityNotFound(entity_id.to_string()));
        }

        // Merge events on top of snapshot (or from scratch if no snapshot);
        // later events overwrite earlier values for the same key
        let mut merged_state = merged_state;
        for event in &events {
            if let serde_json::Value::Object(ref mut state_map) = merged_state {
                if let serde_json::Value::Object(ref payload_map) = event.payload {
                    for (key, value) in payload_map {
                        state_map.insert(key.clone(), value.clone());
                    }
                }
            }
        }

        // Wrap with metadata for API consumers
        let state = serde_json::json!({
            "entity_id": entity_id,
            "last_updated": events.last().map(|e| e.timestamp),
            "event_count": events.len(),
            "as_of": as_of,
            "current_state": merged_state,
            "history": events.iter().map(|e| {
                serde_json::json!({
                    "event_id": e.id,
                    "type": e.event_type,
                    "timestamp": e.timestamp,
                    "payload": e.payload
                })
            }).collect::<Vec<_>>()
        });

        Ok(state)
    }
651
652    /// Get snapshot from projection (faster than reconstructing)
653    pub fn get_snapshot(&self, entity_id: &str) -> Result<serde_json::Value> {
654        let projections = self.projections.read();
655
656        if let Some(snapshot_projection) = projections.get_projection("entity_snapshots") {
657            if let Some(state) = snapshot_projection.get_state(entity_id) {
658                return Ok(serde_json::json!({
659                    "entity_id": entity_id,
660                    "snapshot": state,
661                    "from_projection": "entity_snapshots"
662                }));
663            }
664        }
665
666        Err(AllSourceError::EntityNotFound(entity_id.to_string()))
667    }
668
669    /// Get statistics about the event store
670    pub fn stats(&self) -> StoreStats {
671        let events = self.events.read();
672        let index_stats = self.index.stats();
673
674        StoreStats {
675            total_events: events.len(),
676            total_entities: index_stats.total_entities,
677            total_event_types: index_stats.total_event_types,
678            total_ingested: *self.total_ingested.read(),
679        }
680    }
681}
682
/// Configuration for `EventStore`.
///
/// All fields default to "feature off" / library defaults; see the
/// constructor helpers (`with_persistence`, `with_wal`, `production`, ...)
/// for common combinations.
#[derive(Debug, Clone)]
pub struct EventStoreConfig {
    /// Optional directory for persistent Parquet storage (v0.2 feature).
    /// Setting this also enables the compaction manager.
    pub storage_dir: Option<PathBuf>,

    /// Snapshot configuration (v0.2 feature)
    pub snapshot_config: SnapshotConfig,

    /// Optional directory for WAL (Write-Ahead Log) (v0.2 feature)
    pub wal_dir: Option<PathBuf>,

    /// WAL configuration (v0.2 feature); only consulted when `wal_dir` is set
    pub wal_config: WALConfig,

    /// Compaction configuration (v0.2 feature); only consulted when
    /// `storage_dir` is set
    pub compaction_config: CompactionConfig,

    /// Schema registry configuration (v0.5 feature)
    pub schema_registry_config: SchemaRegistryConfig,
}
704
705impl Default for EventStoreConfig {
706    fn default() -> Self {
707        Self {
708            storage_dir: None,
709            snapshot_config: SnapshotConfig::default(),
710            wal_dir: None,
711            wal_config: WALConfig::default(),
712            compaction_config: CompactionConfig::default(),
713            schema_registry_config: SchemaRegistryConfig::default(),
714        }
715    }
716}
717
718impl EventStoreConfig {
719    /// Create config with persistent storage enabled
720    pub fn with_persistence(storage_dir: impl Into<PathBuf>) -> Self {
721        Self {
722            storage_dir: Some(storage_dir.into()),
723            snapshot_config: SnapshotConfig::default(),
724            wal_dir: None,
725            wal_config: WALConfig::default(),
726            compaction_config: CompactionConfig::default(),
727            schema_registry_config: SchemaRegistryConfig::default(),
728        }
729    }
730
731    /// Create config with custom snapshot settings
732    pub fn with_snapshots(snapshot_config: SnapshotConfig) -> Self {
733        Self {
734            storage_dir: None,
735            snapshot_config,
736            wal_dir: None,
737            wal_config: WALConfig::default(),
738            compaction_config: CompactionConfig::default(),
739            schema_registry_config: SchemaRegistryConfig::default(),
740        }
741    }
742
743    /// Create config with WAL enabled
744    pub fn with_wal(wal_dir: impl Into<PathBuf>, wal_config: WALConfig) -> Self {
745        Self {
746            storage_dir: None,
747            snapshot_config: SnapshotConfig::default(),
748            wal_dir: Some(wal_dir.into()),
749            wal_config,
750            compaction_config: CompactionConfig::default(),
751            schema_registry_config: SchemaRegistryConfig::default(),
752        }
753    }
754
755    /// Create config with both persistence and snapshots
756    pub fn with_all(storage_dir: impl Into<PathBuf>, snapshot_config: SnapshotConfig) -> Self {
757        Self {
758            storage_dir: Some(storage_dir.into()),
759            snapshot_config,
760            wal_dir: None,
761            wal_config: WALConfig::default(),
762            compaction_config: CompactionConfig::default(),
763            schema_registry_config: SchemaRegistryConfig::default(),
764        }
765    }
766
767    /// Create production config with all features enabled
768    pub fn production(
769        storage_dir: impl Into<PathBuf>,
770        wal_dir: impl Into<PathBuf>,
771        snapshot_config: SnapshotConfig,
772        wal_config: WALConfig,
773        compaction_config: CompactionConfig,
774    ) -> Self {
775        Self {
776            storage_dir: Some(storage_dir.into()),
777            snapshot_config,
778            wal_dir: Some(wal_dir.into()),
779            wal_config,
780            compaction_config,
781            schema_registry_config: SchemaRegistryConfig::default(),
782        }
783    }
784}
785
/// Point-in-time statistics about the event store (see `EventStore::stats`).
#[derive(Debug, serde::Serialize)]
pub struct StoreStats {
    /// Events currently held in the in-memory log
    pub total_events: usize,
    /// Distinct entity ids known to the index
    pub total_entities: usize,
    /// Distinct event types known to the index
    pub total_event_types: usize,
    /// Lifetime ingestion counter (includes events recovered or loaded at
    /// startup)
    pub total_ingested: u64,
}
793
794impl Default for EventStore {
795    fn default() -> Self {
796        Self::new()
797    }
798}
799
800// Tests for store are covered in integration tests