// allsource_core/store.rs

1use crate::application::dto::QueryEventsRequest;
2use crate::compaction::{CompactionConfig, CompactionManager};
3use crate::domain::entities::Event;
4use crate::error::{AllSourceError, Result};
5use crate::index::{EventIndex, IndexEntry};
6use crate::metrics::MetricsRegistry;
7use crate::pipeline::PipelineManager;
8use crate::projection::{EntitySnapshotProjection, EventCounterProjection, ProjectionManager};
9use crate::replay::ReplayManager;
10use crate::schema::{SchemaRegistry, SchemaRegistryConfig};
11use crate::snapshot::{SnapshotConfig, SnapshotManager, SnapshotType};
12use crate::storage::ParquetStorage;
13use crate::wal::{WALConfig, WriteAheadLog};
14use crate::websocket::WebSocketManager;
15use chrono::{DateTime, Utc};
16use parking_lot::RwLock;
17use std::path::PathBuf;
18use std::sync::Arc;
19
/// High-performance event store with columnar storage.
///
/// All shared state lives behind `Arc` + `parking_lot::RwLock`, so methods
/// take `&self` and the store can be shared across threads.
pub struct EventStore {
    /// In-memory event storage. Append-only: positions in this `Vec` are the
    /// offsets recorded in `index`.
    events: Arc<RwLock<Vec<Event>>>,

    /// High-performance concurrent index (lookup by entity id and event type)
    index: Arc<EventIndex>,

    /// Projection manager for real-time aggregations
    pub(crate) projections: Arc<RwLock<ProjectionManager>>,

    /// Optional persistent storage (v0.2 feature); `None` when no
    /// `storage_dir` was configured or initialization failed
    storage: Option<Arc<RwLock<ParquetStorage>>>,

    /// WebSocket manager for real-time event streaming (v0.2 feature)
    websocket_manager: Arc<WebSocketManager>,

    /// Snapshot manager for fast state recovery (v0.2 feature)
    snapshot_manager: Arc<SnapshotManager>,

    /// Write-Ahead Log for durability (v0.2 feature); `None` when no
    /// `wal_dir` was configured or initialization failed
    wal: Option<Arc<WriteAheadLog>>,

    /// Compaction manager for Parquet optimization (v0.2 feature);
    /// present only when a `storage_dir` is configured
    compaction_manager: Option<Arc<CompactionManager>>,

    /// Schema registry for event validation (v0.5 feature)
    schema_registry: Arc<SchemaRegistry>,

    /// Replay manager for event replay and projection rebuilding (v0.5 feature)
    replay_manager: Arc<ReplayManager>,

    /// Pipeline manager for stream processing (v0.5 feature)
    pipeline_manager: Arc<PipelineManager>,

    /// Prometheus metrics registry (v0.6 feature)
    metrics: Arc<MetricsRegistry>,

    /// Total events ingested (for metrics); also set to the recovered/loaded
    /// count during startup recovery
    total_ingested: Arc<RwLock<u64>>,
}
61
62impl EventStore {
63    /// Create a new in-memory event store
64    pub fn new() -> Self {
65        Self::with_config(EventStoreConfig::default())
66    }
67
    /// Create event store with custom configuration.
    ///
    /// Construction order:
    /// 1. Register built-in projections (`entity_snapshots`, `event_counters`).
    /// 2. Optionally initialize Parquet storage, the WAL, and the compaction
    ///    manager; storage/WAL init failures are logged and the feature is
    ///    disabled rather than propagated as errors.
    /// 3. Recover state: WAL first (most recent data). If the WAL yielded
    ///    events, checkpoint to Parquet (when enabled) and truncate the WAL.
    ///    Parquet is loaded only when WAL recovery produced nothing, to avoid
    ///    loading the same events twice after a WAL checkpoint.
    ///
    /// Per-event index/projection failures during recovery are logged and
    /// skipped; they never abort construction.
    pub fn with_config(config: EventStoreConfig) -> Self {
        let mut projections = ProjectionManager::new();

        // Register built-in projections
        projections.register(Arc::new(EntitySnapshotProjection::new("entity_snapshots")));
        projections.register(Arc::new(EventCounterProjection::new("event_counters")));

        // Initialize persistent storage if configured
        let storage = config
            .storage_dir
            .as_ref()
            .and_then(|dir| match ParquetStorage::new(dir) {
                Ok(storage) => {
                    tracing::info!("✅ Parquet persistence enabled at: {}", dir.display());
                    Some(Arc::new(RwLock::new(storage)))
                }
                Err(e) => {
                    // Degrade to in-memory only instead of failing construction.
                    tracing::error!("❌ Failed to initialize Parquet storage: {}", e);
                    None
                }
            });

        // Initialize WAL if configured (v0.2 feature)
        let wal = config.wal_dir.as_ref().and_then(|dir| {
            match WriteAheadLog::new(dir, config.wal_config.clone()) {
                Ok(wal) => {
                    tracing::info!("✅ WAL enabled at: {}", dir.display());
                    Some(Arc::new(wal))
                }
                Err(e) => {
                    // Same degradation policy as storage: log and continue.
                    tracing::error!("❌ Failed to initialize WAL: {}", e);
                    None
                }
            }
        });

        // Initialize compaction manager if Parquet storage is enabled (v0.2 feature)
        let compaction_manager = config.storage_dir.as_ref().map(|dir| {
            let manager = CompactionManager::new(dir, config.compaction_config.clone());
            Arc::new(manager)
        });

        // Initialize schema registry (v0.5 feature)
        let schema_registry = Arc::new(SchemaRegistry::new(config.schema_registry_config.clone()));
        tracing::info!("✅ Schema registry enabled");

        // Initialize replay manager (v0.5 feature)
        let replay_manager = Arc::new(ReplayManager::new());
        tracing::info!("✅ Replay manager enabled");

        // Initialize pipeline manager (v0.5 feature)
        let pipeline_manager = Arc::new(PipelineManager::new());
        tracing::info!("✅ Pipeline manager enabled");

        // Initialize metrics registry (v0.6 feature)
        let metrics = MetricsRegistry::new();
        tracing::info!("✅ Prometheus metrics registry initialized");

        let store = Self {
            events: Arc::new(RwLock::new(Vec::new())),
            index: Arc::new(EventIndex::new()),
            projections: Arc::new(RwLock::new(projections)),
            storage,
            websocket_manager: Arc::new(WebSocketManager::new()),
            snapshot_manager: Arc::new(SnapshotManager::new(config.snapshot_config)),
            wal,
            compaction_manager,
            schema_registry,
            replay_manager,
            pipeline_manager,
            metrics,
            total_ingested: Arc::new(RwLock::new(0)),
        };

        // Recover from WAL first (most recent data)
        let mut wal_recovered = false;
        if let Some(ref wal) = store.wal {
            match wal.recover() {
                Ok(recovered_events) if !recovered_events.is_empty() => {
                    tracing::info!(
                        "🔄 Recovering {} events from WAL...",
                        recovered_events.len()
                    );

                    for event in recovered_events {
                        // Re-index and process events from WAL
                        let offset = store.events.read().len();
                        if let Err(e) = store.index.index_event(
                            event.id,
                            event.entity_id_str(),
                            event.event_type_str(),
                            event.timestamp,
                            offset,
                        ) {
                            tracing::error!("Failed to re-index WAL event {}: {}", event.id, e);
                        }

                        if let Err(e) = store.projections.read().process_event(&event) {
                            tracing::error!("Failed to re-process WAL event {}: {}", event.id, e);
                        }

                        store.events.write().push(event);
                    }

                    let total = store.events.read().len();
                    *store.total_ingested.write() = total as u64;
                    tracing::info!("✅ Successfully recovered {} events from WAL", total);

                    // After successful recovery, checkpoint to Parquet if enabled
                    // NOTE(review): recovered events are pushed to memory above but
                    // never passed to storage.append_event here, so this relies on
                    // `flush_storage` (which only calls `storage.flush()`) actually
                    // persisting them before the WAL is truncated — confirm, or
                    // events could be lost on the next restart.
                    if store.storage.is_some() {
                        tracing::info!("📸 Checkpointing WAL to Parquet storage...");
                        if let Err(e) = store.flush_storage() {
                            tracing::error!("Failed to checkpoint to Parquet: {}", e);
                        } else if let Err(e) = wal.truncate() {
                            tracing::error!("Failed to truncate WAL after checkpoint: {}", e);
                        } else {
                            tracing::info!("✅ WAL checkpointed and truncated");
                        }
                    }

                    wal_recovered = true;
                }
                Ok(_) => {
                    tracing::debug!("No events to recover from WAL");
                }
                Err(e) => {
                    tracing::error!("❌ WAL recovery failed: {}", e);
                }
            }
        }

        // Load persisted events from Parquet only if we didn't recover from WAL
        // (to avoid loading the same events twice after WAL checkpoint)
        if !wal_recovered {
            if let Some(ref storage) = store.storage {
                if let Ok(persisted_events) = storage.read().load_all_events() {
                    tracing::info!("📂 Loading {} persisted events...", persisted_events.len());

                    for event in persisted_events {
                        // Re-index loaded events
                        let offset = store.events.read().len();
                        if let Err(e) = store.index.index_event(
                            event.id,
                            event.entity_id_str(),
                            event.event_type_str(),
                            event.timestamp,
                            offset,
                        ) {
                            tracing::error!("Failed to re-index event {}: {}", event.id, e);
                        }

                        // Re-process through projections
                        if let Err(e) = store.projections.read().process_event(&event) {
                            tracing::error!("Failed to re-process event {}: {}", event.id, e);
                        }

                        store.events.write().push(event);
                    }

                    let total = store.events.read().len();
                    *store.total_ingested.write() = total as u64;
                    tracing::info!("✅ Successfully loaded {} events from storage", total);
                }
            }
        }

        store
    }
237
238    /// Ingest a new event into the store
239    pub fn ingest(&self, event: Event) -> Result<()> {
240        // Start metrics timer (v0.6 feature)
241        let timer = self.metrics.ingestion_duration_seconds.start_timer();
242
243        // Validate event
244        let validation_result = self.validate_event(&event);
245        if let Err(e) = validation_result {
246            // Record ingestion error
247            self.metrics.ingestion_errors_total.inc();
248            timer.observe_duration();
249            return Err(e);
250        }
251
252        // Write to WAL FIRST for durability (v0.2 feature)
253        // This ensures event is persisted before processing
254        if let Some(ref wal) = self.wal {
255            if let Err(e) = wal.append(event.clone()) {
256                self.metrics.ingestion_errors_total.inc();
257                timer.observe_duration();
258                return Err(e);
259            }
260        }
261
262        let mut events = self.events.write();
263        let offset = events.len();
264
265        // Index the event
266        self.index.index_event(
267            event.id,
268            event.entity_id_str(),
269            event.event_type_str(),
270            event.timestamp,
271            offset,
272        )?;
273
274        // Process through projections
275        let projections = self.projections.read();
276        projections.process_event(&event)?;
277        drop(projections); // Release lock
278
279        // Process through pipelines (v0.5 feature)
280        // Pipelines can transform, filter, and aggregate events in real-time
281        let pipeline_results = self.pipeline_manager.process_event(&event);
282        if !pipeline_results.is_empty() {
283            tracing::debug!(
284                "Event {} processed by {} pipeline(s)",
285                event.id,
286                pipeline_results.len()
287            );
288            // Pipeline results could be stored, emitted, or forwarded elsewhere
289            // For now, we just log them for observability
290            for (pipeline_id, result) in pipeline_results {
291                tracing::trace!("Pipeline {} result: {:?}", pipeline_id, result);
292            }
293        }
294
295        // Persist to Parquet storage if enabled (v0.2)
296        if let Some(ref storage) = self.storage {
297            let mut storage = storage.write();
298            storage.append_event(event.clone())?;
299        }
300
301        // Store the event in memory
302        events.push(event.clone());
303        let total_events = events.len();
304        drop(events); // Release lock early
305
306        // Broadcast to WebSocket clients (v0.2 feature)
307        self.websocket_manager
308            .broadcast_event(Arc::new(event.clone()));
309
310        // Check if automatic snapshot should be created (v0.2 feature)
311        self.check_auto_snapshot(event.entity_id_str(), &event);
312
313        // Update metrics (v0.6 feature)
314        self.metrics.events_ingested_total.inc();
315        self.metrics
316            .events_ingested_by_type
317            .with_label_values(&[event.event_type_str()])
318            .inc();
319        self.metrics.storage_events_total.set(total_events as i64);
320
321        // Update legacy total counter
322        let mut total = self.total_ingested.write();
323        *total += 1;
324
325        timer.observe_duration();
326
327        tracing::debug!("Event ingested: {} (offset: {})", event.id, offset);
328
329        Ok(())
330    }
331
332    /// Get the WebSocket manager for this store
333    pub fn websocket_manager(&self) -> Arc<WebSocketManager> {
334        Arc::clone(&self.websocket_manager)
335    }
336
337    /// Get the snapshot manager for this store
338    pub fn snapshot_manager(&self) -> Arc<SnapshotManager> {
339        Arc::clone(&self.snapshot_manager)
340    }
341
342    /// Get the compaction manager for this store
343    pub fn compaction_manager(&self) -> Option<Arc<CompactionManager>> {
344        self.compaction_manager.as_ref().map(Arc::clone)
345    }
346
347    /// Get the schema registry for this store (v0.5 feature)
348    pub fn schema_registry(&self) -> Arc<SchemaRegistry> {
349        Arc::clone(&self.schema_registry)
350    }
351
352    /// Get the replay manager for this store (v0.5 feature)
353    pub fn replay_manager(&self) -> Arc<ReplayManager> {
354        Arc::clone(&self.replay_manager)
355    }
356
357    /// Get the pipeline manager for this store (v0.5 feature)
358    pub fn pipeline_manager(&self) -> Arc<PipelineManager> {
359        Arc::clone(&self.pipeline_manager)
360    }
361
362    /// Get the metrics registry for this store (v0.6 feature)
363    pub fn metrics(&self) -> Arc<MetricsRegistry> {
364        Arc::clone(&self.metrics)
365    }
366
367    /// Manually flush any pending events to persistent storage
368    pub fn flush_storage(&self) -> Result<()> {
369        if let Some(ref storage) = self.storage {
370            let mut storage = storage.write();
371            storage.flush()?;
372            tracing::info!("✅ Flushed events to persistent storage");
373        }
374        Ok(())
375    }
376
377    /// Manually create a snapshot for an entity
378    pub fn create_snapshot(&self, entity_id: &str) -> Result<()> {
379        // Get all events for this entity
380        let events = self.query(QueryEventsRequest {
381            entity_id: Some(entity_id.to_string()),
382            event_type: None,
383            tenant_id: None,
384            as_of: None,
385            since: None,
386            until: None,
387            limit: None,
388        })?;
389
390        if events.is_empty() {
391            return Err(AllSourceError::EntityNotFound(entity_id.to_string()));
392        }
393
394        // Build current state
395        let mut state = serde_json::json!({});
396        for event in &events {
397            if let serde_json::Value::Object(ref mut state_map) = state {
398                if let serde_json::Value::Object(ref payload_map) = event.payload {
399                    for (key, value) in payload_map {
400                        state_map.insert(key.clone(), value.clone());
401                    }
402                }
403            }
404        }
405
406        let last_event = events.last().unwrap();
407        self.snapshot_manager.create_snapshot(
408            entity_id.to_string(),
409            state,
410            last_event.timestamp,
411            events.len(),
412            SnapshotType::Manual,
413        )?;
414
415        Ok(())
416    }
417
418    /// Check and create automatic snapshots if needed
419    fn check_auto_snapshot(&self, entity_id: &str, event: &Event) {
420        // Count events for this entity
421        let entity_event_count = self
422            .index
423            .get_by_entity(entity_id)
424            .map(|entries| entries.len())
425            .unwrap_or(0);
426
427        if self.snapshot_manager.should_create_snapshot(
428            entity_id,
429            entity_event_count,
430            event.timestamp,
431        ) {
432            // Create snapshot in background (don't block ingestion)
433            if let Err(e) = self.create_snapshot(entity_id) {
434                tracing::warn!(
435                    "Failed to create automatic snapshot for {}: {}",
436                    entity_id,
437                    e
438                );
439            }
440        }
441    }
442
443    /// Validate an event before ingestion
444    fn validate_event(&self, event: &Event) -> Result<()> {
445        // EntityId and EventType value objects already validate non-empty in their constructors
446        // So these checks are now redundant, but we keep them for explicit validation
447        if event.entity_id_str().is_empty() {
448            return Err(AllSourceError::ValidationError(
449                "entity_id cannot be empty".to_string(),
450            ));
451        }
452
453        if event.event_type_str().is_empty() {
454            return Err(AllSourceError::ValidationError(
455                "event_type cannot be empty".to_string(),
456            ));
457        }
458
459        Ok(())
460    }
461
462    /// Query events based on filters (optimized with indices)
463    pub fn query(&self, request: QueryEventsRequest) -> Result<Vec<Event>> {
464        // Determine query type for metrics (v0.6 feature)
465        let query_type = if request.entity_id.is_some() {
466            "entity"
467        } else if request.event_type.is_some() {
468            "type"
469        } else {
470            "full_scan"
471        };
472
473        // Start metrics timer (v0.6 feature)
474        let timer = self
475            .metrics
476            .query_duration_seconds
477            .with_label_values(&[query_type])
478            .start_timer();
479
480        // Increment query counter (v0.6 feature)
481        self.metrics
482            .queries_total
483            .with_label_values(&[query_type])
484            .inc();
485
486        let events = self.events.read();
487
488        // Use index for fast lookups
489        let offsets: Vec<usize> = if let Some(entity_id) = &request.entity_id {
490            // Use entity index
491            self.index
492                .get_by_entity(entity_id)
493                .map(|entries| self.filter_entries(entries, &request))
494                .unwrap_or_default()
495        } else if let Some(event_type) = &request.event_type {
496            // Use type index
497            self.index
498                .get_by_type(event_type)
499                .map(|entries| self.filter_entries(entries, &request))
500                .unwrap_or_default()
501        } else {
502            // Full scan (less efficient but necessary for complex queries)
503            (0..events.len()).collect()
504        };
505
506        // Fetch events and apply remaining filters
507        let mut results: Vec<Event> = offsets
508            .iter()
509            .filter_map(|&offset| events.get(offset).cloned())
510            .filter(|event| self.apply_filters(event, &request))
511            .collect();
512
513        // Sort by timestamp (ascending)
514        results.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
515
516        // Apply limit
517        if let Some(limit) = request.limit {
518            results.truncate(limit);
519        }
520
521        // Record query results count (v0.6 feature)
522        self.metrics
523            .query_results_total
524            .with_label_values(&[query_type])
525            .inc_by(results.len() as u64);
526
527        timer.observe_duration();
528
529        Ok(results)
530    }
531
532    /// Filter index entries based on query parameters
533    fn filter_entries(&self, entries: Vec<IndexEntry>, request: &QueryEventsRequest) -> Vec<usize> {
534        entries
535            .into_iter()
536            .filter(|entry| {
537                // Time filters
538                if let Some(as_of) = request.as_of {
539                    if entry.timestamp > as_of {
540                        return false;
541                    }
542                }
543                if let Some(since) = request.since {
544                    if entry.timestamp < since {
545                        return false;
546                    }
547                }
548                if let Some(until) = request.until {
549                    if entry.timestamp > until {
550                        return false;
551                    }
552                }
553                true
554            })
555            .map(|entry| entry.offset)
556            .collect()
557    }
558
559    /// Apply filters to an event
560    fn apply_filters(&self, event: &Event, request: &QueryEventsRequest) -> bool {
561        // Additional type filter if entity was primary
562        if request.entity_id.is_some() {
563            if let Some(ref event_type) = request.event_type {
564                if event.event_type_str() != event_type {
565                    return false;
566                }
567            }
568        }
569
570        true
571    }
572
    /// Reconstruct entity state as of a specific timestamp
    /// v0.2: Now uses snapshots for fast reconstruction
    ///
    /// Strategy: pick a snapshot as the base state (the one closest at/before
    /// `as_of`, or the latest snapshot when `as_of` is `None`), then replay
    /// only the events since that snapshot, merging each payload's keys over
    /// the state (last-writer-wins). Returns a JSON envelope containing the
    /// merged state plus metadata and the replayed event history.
    ///
    /// # Errors
    /// `EntityNotFound` when there is neither a snapshot nor any event for
    /// `entity_id` in the requested window.
    pub fn reconstruct_state(
        &self,
        entity_id: &str,
        as_of: Option<DateTime<Utc>>,
    ) -> Result<serde_json::Value> {
        // Try to find a snapshot to use as a base (v0.2 optimization)
        let (merged_state, since_timestamp) = if let Some(as_of_time) = as_of {
            // Get snapshot closest to requested time
            if let Some(snapshot) = self
                .snapshot_manager
                .get_snapshot_as_of(entity_id, as_of_time)
            {
                tracing::debug!(
                    "Using snapshot from {} for entity {} (saved {} events)",
                    snapshot.as_of,
                    entity_id,
                    snapshot.event_count
                );
                (snapshot.state.clone(), Some(snapshot.as_of))
            } else {
                // No usable snapshot: replay from the beginning.
                (serde_json::json!({}), None)
            }
        } else {
            // Get latest snapshot for current state
            if let Some(snapshot) = self.snapshot_manager.get_latest_snapshot(entity_id) {
                tracing::debug!(
                    "Using latest snapshot from {} for entity {}",
                    snapshot.as_of,
                    entity_id
                );
                (snapshot.state.clone(), Some(snapshot.as_of))
            } else {
                (serde_json::json!({}), None)
            }
        };

        // Query events after the snapshot (or all if no snapshot)
        // NOTE(review): `since` is inclusive in the query, so an event stamped
        // exactly at the snapshot's `as_of` is replayed again on top of it.
        // The merge is last-writer-wins so the state stays correct, but the
        // `event_count`/`history` metadata below may include that event —
        // confirm this is acceptable.
        let events = self.query(QueryEventsRequest {
            entity_id: Some(entity_id.to_string()),
            event_type: None,
            tenant_id: None,
            as_of,
            since: since_timestamp,
            until: None,
            limit: None,
        })?;

        // If no events and no snapshot, entity not found
        if events.is_empty() && since_timestamp.is_none() {
            return Err(AllSourceError::EntityNotFound(entity_id.to_string()));
        }

        // Merge events on top of snapshot (or from scratch if no snapshot);
        // later events overwrite earlier keys.
        let mut merged_state = merged_state;
        for event in &events {
            if let serde_json::Value::Object(ref mut state_map) = merged_state {
                if let serde_json::Value::Object(ref payload_map) = event.payload {
                    for (key, value) in payload_map {
                        state_map.insert(key.clone(), value.clone());
                    }
                }
            }
        }

        // Wrap with metadata; `last_updated` is the newest replayed event's
        // timestamp (null when only a snapshot contributed).
        let state = serde_json::json!({
            "entity_id": entity_id,
            "last_updated": events.last().map(|e| e.timestamp),
            "event_count": events.len(),
            "as_of": as_of,
            "current_state": merged_state,
            "history": events.iter().map(|e| {
                serde_json::json!({
                    "event_id": e.id,
                    "type": e.event_type,
                    "timestamp": e.timestamp,
                    "payload": e.payload
                })
            }).collect::<Vec<_>>()
        });

        Ok(state)
    }
658
659    /// Get snapshot from projection (faster than reconstructing)
660    pub fn get_snapshot(&self, entity_id: &str) -> Result<serde_json::Value> {
661        let projections = self.projections.read();
662
663        if let Some(snapshot_projection) = projections.get_projection("entity_snapshots") {
664            if let Some(state) = snapshot_projection.get_state(entity_id) {
665                return Ok(serde_json::json!({
666                    "entity_id": entity_id,
667                    "snapshot": state,
668                    "from_projection": "entity_snapshots"
669                }));
670            }
671        }
672
673        Err(AllSourceError::EntityNotFound(entity_id.to_string()))
674    }
675
676    /// Get statistics about the event store
677    pub fn stats(&self) -> StoreStats {
678        let events = self.events.read();
679        let index_stats = self.index.stats();
680
681        StoreStats {
682            total_events: events.len(),
683            total_entities: index_stats.total_entities,
684            total_event_types: index_stats.total_event_types,
685            total_ingested: *self.total_ingested.read(),
686        }
687    }
688}
689
/// Configuration for EventStore
#[derive(Debug, Clone)]
pub struct EventStoreConfig {
    /// Optional directory for persistent Parquet storage (v0.2 feature).
    /// Also enables the compaction manager when set.
    pub storage_dir: Option<PathBuf>,

    /// Snapshot configuration (v0.2 feature)
    pub snapshot_config: SnapshotConfig,

    /// Optional directory for WAL (Write-Ahead Log) (v0.2 feature)
    pub wal_dir: Option<PathBuf>,

    /// WAL configuration (v0.2 feature); only consulted when `wal_dir` is set
    pub wal_config: WALConfig,

    /// Compaction configuration (v0.2 feature); only consulted when
    /// `storage_dir` is set
    pub compaction_config: CompactionConfig,

    /// Schema registry configuration (v0.5 feature)
    pub schema_registry_config: SchemaRegistryConfig,
}
711
712impl Default for EventStoreConfig {
713    fn default() -> Self {
714        Self {
715            storage_dir: None,
716            snapshot_config: SnapshotConfig::default(),
717            wal_dir: None,
718            wal_config: WALConfig::default(),
719            compaction_config: CompactionConfig::default(),
720            schema_registry_config: SchemaRegistryConfig::default(),
721        }
722    }
723}
724
725impl EventStoreConfig {
726    /// Create config with persistent storage enabled
727    pub fn with_persistence(storage_dir: impl Into<PathBuf>) -> Self {
728        Self {
729            storage_dir: Some(storage_dir.into()),
730            snapshot_config: SnapshotConfig::default(),
731            wal_dir: None,
732            wal_config: WALConfig::default(),
733            compaction_config: CompactionConfig::default(),
734            schema_registry_config: SchemaRegistryConfig::default(),
735        }
736    }
737
738    /// Create config with custom snapshot settings
739    pub fn with_snapshots(snapshot_config: SnapshotConfig) -> Self {
740        Self {
741            storage_dir: None,
742            snapshot_config,
743            wal_dir: None,
744            wal_config: WALConfig::default(),
745            compaction_config: CompactionConfig::default(),
746            schema_registry_config: SchemaRegistryConfig::default(),
747        }
748    }
749
750    /// Create config with WAL enabled
751    pub fn with_wal(wal_dir: impl Into<PathBuf>, wal_config: WALConfig) -> Self {
752        Self {
753            storage_dir: None,
754            snapshot_config: SnapshotConfig::default(),
755            wal_dir: Some(wal_dir.into()),
756            wal_config,
757            compaction_config: CompactionConfig::default(),
758            schema_registry_config: SchemaRegistryConfig::default(),
759        }
760    }
761
762    /// Create config with both persistence and snapshots
763    pub fn with_all(storage_dir: impl Into<PathBuf>, snapshot_config: SnapshotConfig) -> Self {
764        Self {
765            storage_dir: Some(storage_dir.into()),
766            snapshot_config,
767            wal_dir: None,
768            wal_config: WALConfig::default(),
769            compaction_config: CompactionConfig::default(),
770            schema_registry_config: SchemaRegistryConfig::default(),
771        }
772    }
773
774    /// Create production config with all features enabled
775    pub fn production(
776        storage_dir: impl Into<PathBuf>,
777        wal_dir: impl Into<PathBuf>,
778        snapshot_config: SnapshotConfig,
779        wal_config: WALConfig,
780        compaction_config: CompactionConfig,
781    ) -> Self {
782        Self {
783            storage_dir: Some(storage_dir.into()),
784            snapshot_config,
785            wal_dir: Some(wal_dir.into()),
786            wal_config,
787            compaction_config,
788            schema_registry_config: SchemaRegistryConfig::default(),
789        }
790    }
791}
792
/// Point-in-time statistics reported by `EventStore::stats`.
#[derive(Debug, serde::Serialize)]
pub struct StoreStats {
    // Number of events currently held in the in-memory log.
    pub total_events: usize,
    // Distinct entities known to the index.
    pub total_entities: usize,
    // Distinct event types known to the index.
    pub total_event_types: usize,
    // Lifetime ingestion counter (reset to the recovered/loaded count at startup).
    pub total_ingested: u64,
}
800
impl Default for EventStore {
    /// Equivalent to `EventStore::new`: an in-memory store with default config.
    fn default() -> Self {
        Self::new()
    }
}
806
807// Tests for store are covered in integration tests