// allsource_core/store.rs

1use crate::application::dto::QueryEventsRequest;
2use crate::application::services::pipeline::PipelineManager;
3use crate::application::services::projection::{
4    EntitySnapshotProjection, EventCounterProjection, ProjectionManager,
5};
6use crate::application::services::replay::ReplayManager;
7use crate::application::services::schema::{SchemaRegistry, SchemaRegistryConfig};
8use crate::domain::entities::Event;
9use crate::error::{AllSourceError, Result};
10use crate::infrastructure::observability::metrics::MetricsRegistry;
11use crate::infrastructure::persistence::compaction::{CompactionConfig, CompactionManager};
12use crate::infrastructure::persistence::index::{EventIndex, IndexEntry};
13use crate::infrastructure::persistence::snapshot::{SnapshotConfig, SnapshotManager, SnapshotType};
14use crate::infrastructure::persistence::storage::ParquetStorage;
15use crate::infrastructure::persistence::wal::{WALConfig, WriteAheadLog};
16use crate::infrastructure::web::websocket::WebSocketManager;
17use chrono::{DateTime, Utc};
18use dashmap::DashMap;
19use parking_lot::RwLock;
20use std::path::PathBuf;
21use std::sync::Arc;
22
/// High-performance event store with columnar storage
///
/// The in-memory `events` vector is the append-only log of record at
/// runtime; `index` stores offsets into that vector, and the optional
/// WAL/Parquet layers add durability and startup recovery.
pub struct EventStore {
    /// In-memory event storage (append-only; index offsets point into this vector)
    events: Arc<RwLock<Vec<Event>>>,

    /// High-performance concurrent index
    index: Arc<EventIndex>,

    /// Projection manager for real-time aggregations
    pub(crate) projections: Arc<RwLock<ProjectionManager>>,

    /// Optional persistent storage (v0.2 feature); `None` = memory-only
    storage: Option<Arc<RwLock<ParquetStorage>>>,

    /// WebSocket manager for real-time event streaming (v0.2 feature)
    websocket_manager: Arc<WebSocketManager>,

    /// Snapshot manager for fast state recovery (v0.2 feature)
    snapshot_manager: Arc<SnapshotManager>,

    /// Write-Ahead Log for durability (v0.2 feature); `None` = WAL disabled
    wal: Option<Arc<WriteAheadLog>>,

    /// Compaction manager for Parquet optimization (v0.2 feature);
    /// only present when Parquet storage is configured
    compaction_manager: Option<Arc<CompactionManager>>,

    /// Schema registry for event validation (v0.5 feature)
    schema_registry: Arc<SchemaRegistry>,

    /// Replay manager for event replay and projection rebuilding (v0.5 feature)
    replay_manager: Arc<ReplayManager>,

    /// Pipeline manager for stream processing (v0.5 feature)
    pipeline_manager: Arc<PipelineManager>,

    /// Prometheus metrics registry (v0.6 feature)
    metrics: Arc<MetricsRegistry>,

    /// Total events ingested (for metrics); also set during startup recovery
    total_ingested: Arc<RwLock<u64>>,

    /// Projection state cache for Query Service integration (v0.7 feature)
    /// Key format: "{projection_name}:{entity_id}"
    /// This DashMap provides O(1) access with ~11.9 μs latency
    projection_state_cache: Arc<DashMap<String, serde_json::Value>>,
}
69
70impl EventStore {
71    /// Create a new in-memory event store
72    pub fn new() -> Self {
73        Self::with_config(EventStoreConfig::default())
74    }
75
76    /// Create event store with custom configuration
77    pub fn with_config(config: EventStoreConfig) -> Self {
78        let mut projections = ProjectionManager::new();
79
80        // Register built-in projections
81        projections.register(Arc::new(EntitySnapshotProjection::new("entity_snapshots")));
82        projections.register(Arc::new(EventCounterProjection::new("event_counters")));
83
84        // Initialize persistent storage if configured
85        let storage = config
86            .storage_dir
87            .as_ref()
88            .and_then(|dir| match ParquetStorage::new(dir) {
89                Ok(storage) => {
90                    tracing::info!("✅ Parquet persistence enabled at: {}", dir.display());
91                    Some(Arc::new(RwLock::new(storage)))
92                }
93                Err(e) => {
94                    tracing::error!("❌ Failed to initialize Parquet storage: {}", e);
95                    None
96                }
97            });
98
99        // Initialize WAL if configured (v0.2 feature)
100        let wal = config.wal_dir.as_ref().and_then(|dir| {
101            match WriteAheadLog::new(dir, config.wal_config.clone()) {
102                Ok(wal) => {
103                    tracing::info!("✅ WAL enabled at: {}", dir.display());
104                    Some(Arc::new(wal))
105                }
106                Err(e) => {
107                    tracing::error!("❌ Failed to initialize WAL: {}", e);
108                    None
109                }
110            }
111        });
112
113        // Initialize compaction manager if Parquet storage is enabled (v0.2 feature)
114        let compaction_manager = config.storage_dir.as_ref().map(|dir| {
115            let manager = CompactionManager::new(dir, config.compaction_config.clone());
116            Arc::new(manager)
117        });
118
119        // Initialize schema registry (v0.5 feature)
120        let schema_registry = Arc::new(SchemaRegistry::new(config.schema_registry_config.clone()));
121        tracing::info!("✅ Schema registry enabled");
122
123        // Initialize replay manager (v0.5 feature)
124        let replay_manager = Arc::new(ReplayManager::new());
125        tracing::info!("✅ Replay manager enabled");
126
127        // Initialize pipeline manager (v0.5 feature)
128        let pipeline_manager = Arc::new(PipelineManager::new());
129        tracing::info!("✅ Pipeline manager enabled");
130
131        // Initialize metrics registry (v0.6 feature)
132        let metrics = MetricsRegistry::new();
133        tracing::info!("✅ Prometheus metrics registry initialized");
134
135        // Initialize projection state cache (v0.7 feature)
136        let projection_state_cache = Arc::new(DashMap::new());
137        tracing::info!("✅ Projection state cache initialized");
138
139        let store = Self {
140            events: Arc::new(RwLock::new(Vec::new())),
141            index: Arc::new(EventIndex::new()),
142            projections: Arc::new(RwLock::new(projections)),
143            storage,
144            websocket_manager: Arc::new(WebSocketManager::new()),
145            snapshot_manager: Arc::new(SnapshotManager::new(config.snapshot_config)),
146            wal,
147            compaction_manager,
148            schema_registry,
149            replay_manager,
150            pipeline_manager,
151            metrics,
152            total_ingested: Arc::new(RwLock::new(0)),
153            projection_state_cache,
154        };
155
156        // Recover from WAL first (most recent data)
157        let mut wal_recovered = false;
158        if let Some(ref wal) = store.wal {
159            match wal.recover() {
160                Ok(recovered_events) if !recovered_events.is_empty() => {
161                    tracing::info!(
162                        "🔄 Recovering {} events from WAL...",
163                        recovered_events.len()
164                    );
165
166                    for event in recovered_events {
167                        // Re-index and process events from WAL
168                        let offset = store.events.read().len();
169                        if let Err(e) = store.index.index_event(
170                            event.id,
171                            event.entity_id_str(),
172                            event.event_type_str(),
173                            event.timestamp,
174                            offset,
175                        ) {
176                            tracing::error!("Failed to re-index WAL event {}: {}", event.id, e);
177                        }
178
179                        if let Err(e) = store.projections.read().process_event(&event) {
180                            tracing::error!("Failed to re-process WAL event {}: {}", event.id, e);
181                        }
182
183                        store.events.write().push(event);
184                    }
185
186                    let total = store.events.read().len();
187                    *store.total_ingested.write() = total as u64;
188                    tracing::info!("✅ Successfully recovered {} events from WAL", total);
189
190                    // After successful recovery, checkpoint to Parquet if enabled
191                    if store.storage.is_some() {
192                        tracing::info!("📸 Checkpointing WAL to Parquet storage...");
193                        if let Err(e) = store.flush_storage() {
194                            tracing::error!("Failed to checkpoint to Parquet: {}", e);
195                        } else if let Err(e) = wal.truncate() {
196                            tracing::error!("Failed to truncate WAL after checkpoint: {}", e);
197                        } else {
198                            tracing::info!("✅ WAL checkpointed and truncated");
199                        }
200                    }
201
202                    wal_recovered = true;
203                }
204                Ok(_) => {
205                    tracing::debug!("No events to recover from WAL");
206                }
207                Err(e) => {
208                    tracing::error!("❌ WAL recovery failed: {}", e);
209                }
210            }
211        }
212
213        // Load persisted events from Parquet only if we didn't recover from WAL
214        // (to avoid loading the same events twice after WAL checkpoint)
215        if !wal_recovered {
216            if let Some(ref storage) = store.storage {
217                if let Ok(persisted_events) = storage.read().load_all_events() {
218                    tracing::info!("📂 Loading {} persisted events...", persisted_events.len());
219
220                    for event in persisted_events {
221                        // Re-index loaded events
222                        let offset = store.events.read().len();
223                        if let Err(e) = store.index.index_event(
224                            event.id,
225                            event.entity_id_str(),
226                            event.event_type_str(),
227                            event.timestamp,
228                            offset,
229                        ) {
230                            tracing::error!("Failed to re-index event {}: {}", event.id, e);
231                        }
232
233                        // Re-process through projections
234                        if let Err(e) = store.projections.read().process_event(&event) {
235                            tracing::error!("Failed to re-process event {}: {}", event.id, e);
236                        }
237
238                        store.events.write().push(event);
239                    }
240
241                    let total = store.events.read().len();
242                    *store.total_ingested.write() = total as u64;
243                    tracing::info!("✅ Successfully loaded {} events from storage", total);
244                }
245            }
246        }
247
248        store
249    }
250
    /// Ingest a new event into the store.
    ///
    /// Ordering is deliberate: validate → WAL append (durability first) →
    /// index → projections → pipelines → Parquet → in-memory log →
    /// WebSocket broadcast → metrics. The in-memory write lock is held from
    /// offset computation until the event is pushed, so the indexed offset
    /// stays consistent under concurrent ingestion.
    ///
    /// NOTE(review): if indexing, projections, or the Parquet append fail
    /// (the `?` returns below), the event has already been written to the
    /// WAL but is never pushed onto `self.events`; the reserved offset is
    /// then reused by the next event while stale index entries may point at
    /// it — confirm whether WAL recovery reconciles this.
    pub fn ingest(&self, event: Event) -> Result<()> {
        // Start metrics timer (v0.6 feature)
        let timer = self.metrics.ingestion_duration_seconds.start_timer();

        // Validate event (non-empty entity_id / event_type)
        let validation_result = self.validate_event(&event);
        if let Err(e) = validation_result {
            // Record ingestion error
            self.metrics.ingestion_errors_total.inc();
            timer.observe_duration();
            return Err(e);
        }

        // Write to WAL FIRST for durability (v0.2 feature)
        // This ensures event is persisted before processing
        if let Some(ref wal) = self.wal {
            if let Err(e) = wal.append(event.clone()) {
                self.metrics.ingestion_errors_total.inc();
                timer.observe_duration();
                return Err(e);
            }
        }

        // Take the write lock now so the offset recorded in the index is the
        // slot this event will occupy when pushed below.
        let mut events = self.events.write();
        let offset = events.len();

        // Index the event
        self.index.index_event(
            event.id,
            event.entity_id_str(),
            event.event_type_str(),
            event.timestamp,
            offset,
        )?;

        // Process through projections
        let projections = self.projections.read();
        projections.process_event(&event)?;
        drop(projections); // Release lock

        // Process through pipelines (v0.5 feature)
        // Pipelines can transform, filter, and aggregate events in real-time
        let pipeline_results = self.pipeline_manager.process_event(&event);
        if !pipeline_results.is_empty() {
            tracing::debug!(
                "Event {} processed by {} pipeline(s)",
                event.id,
                pipeline_results.len()
            );
            // Pipeline results could be stored, emitted, or forwarded elsewhere
            // For now, we just log them for observability
            for (pipeline_id, result) in pipeline_results {
                tracing::trace!("Pipeline {} result: {:?}", pipeline_id, result);
            }
        }

        // Persist to Parquet storage if enabled (v0.2)
        if let Some(ref storage) = self.storage {
            let mut storage = storage.write();
            storage.append_event(event.clone())?;
        }

        // Store the event in memory
        events.push(event.clone());
        let total_events = events.len();
        drop(events); // Release lock early

        // Broadcast to WebSocket clients (v0.2 feature)
        self.websocket_manager
            .broadcast_event(Arc::new(event.clone()));

        // Check if automatic snapshot should be created (v0.2 feature)
        self.check_auto_snapshot(event.entity_id_str(), &event);

        // Update metrics (v0.6 feature)
        self.metrics.events_ingested_total.inc();
        self.metrics
            .events_ingested_by_type
            .with_label_values(&[event.event_type_str()])
            .inc();
        self.metrics.storage_events_total.set(total_events as i64);

        // Update legacy total counter
        let mut total = self.total_ingested.write();
        *total += 1;

        timer.observe_duration();

        tracing::debug!("Event ingested: {} (offset: {})", event.id, offset);

        Ok(())
    }
344
345    /// Get the WebSocket manager for this store
346    pub fn websocket_manager(&self) -> Arc<WebSocketManager> {
347        Arc::clone(&self.websocket_manager)
348    }
349
350    /// Get the snapshot manager for this store
351    pub fn snapshot_manager(&self) -> Arc<SnapshotManager> {
352        Arc::clone(&self.snapshot_manager)
353    }
354
355    /// Get the compaction manager for this store
356    pub fn compaction_manager(&self) -> Option<Arc<CompactionManager>> {
357        self.compaction_manager.as_ref().map(Arc::clone)
358    }
359
360    /// Get the schema registry for this store (v0.5 feature)
361    pub fn schema_registry(&self) -> Arc<SchemaRegistry> {
362        Arc::clone(&self.schema_registry)
363    }
364
365    /// Get the replay manager for this store (v0.5 feature)
366    pub fn replay_manager(&self) -> Arc<ReplayManager> {
367        Arc::clone(&self.replay_manager)
368    }
369
370    /// Get the pipeline manager for this store (v0.5 feature)
371    pub fn pipeline_manager(&self) -> Arc<PipelineManager> {
372        Arc::clone(&self.pipeline_manager)
373    }
374
375    /// Get the metrics registry for this store (v0.6 feature)
376    pub fn metrics(&self) -> Arc<MetricsRegistry> {
377        Arc::clone(&self.metrics)
378    }
379
    /// Get the projection manager for this store (v0.7 feature)
    ///
    /// Returns a read guard on the projections lock; writers (e.g. anything
    /// taking `projections.write()`) block for as long as the guard is held,
    /// so callers should drop it promptly.
    pub fn projection_manager(&self) -> parking_lot::RwLockReadGuard<'_, ProjectionManager> {
        self.projections.read()
    }
384
385    /// Get the projection state cache for this store (v0.7 feature)
386    /// Used by Elixir Query Service for state synchronization
387    pub fn projection_state_cache(&self) -> Arc<DashMap<String, serde_json::Value>> {
388        Arc::clone(&self.projection_state_cache)
389    }
390
391    /// Manually flush any pending events to persistent storage
392    pub fn flush_storage(&self) -> Result<()> {
393        if let Some(ref storage) = self.storage {
394            let mut storage = storage.write();
395            storage.flush()?;
396            tracing::info!("✅ Flushed events to persistent storage");
397        }
398        Ok(())
399    }
400
401    /// Manually create a snapshot for an entity
402    pub fn create_snapshot(&self, entity_id: &str) -> Result<()> {
403        // Get all events for this entity
404        let events = self.query(QueryEventsRequest {
405            entity_id: Some(entity_id.to_string()),
406            event_type: None,
407            tenant_id: None,
408            as_of: None,
409            since: None,
410            until: None,
411            limit: None,
412        })?;
413
414        if events.is_empty() {
415            return Err(AllSourceError::EntityNotFound(entity_id.to_string()));
416        }
417
418        // Build current state
419        let mut state = serde_json::json!({});
420        for event in &events {
421            if let serde_json::Value::Object(ref mut state_map) = state {
422                if let serde_json::Value::Object(ref payload_map) = event.payload {
423                    for (key, value) in payload_map {
424                        state_map.insert(key.clone(), value.clone());
425                    }
426                }
427            }
428        }
429
430        let last_event = events.last().unwrap();
431        self.snapshot_manager.create_snapshot(
432            entity_id.to_string(),
433            state,
434            last_event.timestamp,
435            events.len(),
436            SnapshotType::Manual,
437        )?;
438
439        Ok(())
440    }
441
442    /// Check and create automatic snapshots if needed
443    fn check_auto_snapshot(&self, entity_id: &str, event: &Event) {
444        // Count events for this entity
445        let entity_event_count = self
446            .index
447            .get_by_entity(entity_id)
448            .map(|entries| entries.len())
449            .unwrap_or(0);
450
451        if self.snapshot_manager.should_create_snapshot(
452            entity_id,
453            entity_event_count,
454            event.timestamp,
455        ) {
456            // Create snapshot in background (don't block ingestion)
457            if let Err(e) = self.create_snapshot(entity_id) {
458                tracing::warn!(
459                    "Failed to create automatic snapshot for {}: {}",
460                    entity_id,
461                    e
462                );
463            }
464        }
465    }
466
467    /// Validate an event before ingestion
468    fn validate_event(&self, event: &Event) -> Result<()> {
469        // EntityId and EventType value objects already validate non-empty in their constructors
470        // So these checks are now redundant, but we keep them for explicit validation
471        if event.entity_id_str().is_empty() {
472            return Err(AllSourceError::ValidationError(
473                "entity_id cannot be empty".to_string(),
474            ));
475        }
476
477        if event.event_type_str().is_empty() {
478            return Err(AllSourceError::ValidationError(
479                "event_type cannot be empty".to_string(),
480            ));
481        }
482
483        Ok(())
484    }
485
    /// Query events based on filters (optimized with indices)
    ///
    /// Chooses the cheapest access path: entity index, then event-type
    /// index, then a full scan of the in-memory log. Matches are sorted by
    /// timestamp ascending and truncated to `limit` when given.
    ///
    /// NOTE(review): `request.tenant_id` is never consulted anywhere in
    /// this path — confirm whether tenant filtering happens upstream.
    /// NOTE(review): the full-scan branch collects every offset without
    /// applying `as_of`/`since`/`until` here; verify that `apply_filters`
    /// enforces the time window for non-indexed queries.
    pub fn query(&self, request: QueryEventsRequest) -> Result<Vec<Event>> {
        // Determine query type for metrics (v0.6 feature)
        let query_type = if request.entity_id.is_some() {
            "entity"
        } else if request.event_type.is_some() {
            "type"
        } else {
            "full_scan"
        };

        // Start metrics timer (v0.6 feature)
        let timer = self
            .metrics
            .query_duration_seconds
            .with_label_values(&[query_type])
            .start_timer();

        // Increment query counter (v0.6 feature)
        self.metrics
            .queries_total
            .with_label_values(&[query_type])
            .inc();

        let events = self.events.read();

        // Use index for fast lookups; filter_entries applies the time
        // filters on the indexed paths and converts entries to offsets.
        let offsets: Vec<usize> = if let Some(entity_id) = &request.entity_id {
            // Use entity index
            self.index
                .get_by_entity(entity_id)
                .map(|entries| self.filter_entries(entries, &request))
                .unwrap_or_default()
        } else if let Some(event_type) = &request.event_type {
            // Use type index
            self.index
                .get_by_type(event_type)
                .map(|entries| self.filter_entries(entries, &request))
                .unwrap_or_default()
        } else {
            // Full scan (less efficient but necessary for complex queries)
            (0..events.len()).collect()
        };

        // Fetch events and apply remaining filters
        let mut results: Vec<Event> = offsets
            .iter()
            .filter_map(|&offset| events.get(offset).cloned())
            .filter(|event| self.apply_filters(event, &request))
            .collect();

        // Sort by timestamp (ascending)
        results.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));

        // Apply limit
        if let Some(limit) = request.limit {
            results.truncate(limit);
        }

        // Record query results count (v0.6 feature)
        self.metrics
            .query_results_total
            .with_label_values(&[query_type])
            .inc_by(results.len() as u64);

        timer.observe_duration();

        Ok(results)
    }
555
556    /// Filter index entries based on query parameters
557    fn filter_entries(&self, entries: Vec<IndexEntry>, request: &QueryEventsRequest) -> Vec<usize> {
558        entries
559            .into_iter()
560            .filter(|entry| {
561                // Time filters
562                if let Some(as_of) = request.as_of {
563                    if entry.timestamp > as_of {
564                        return false;
565                    }
566                }
567                if let Some(since) = request.since {
568                    if entry.timestamp < since {
569                        return false;
570                    }
571                }
572                if let Some(until) = request.until {
573                    if entry.timestamp > until {
574                        return false;
575                    }
576                }
577                true
578            })
579            .map(|entry| entry.offset)
580            .collect()
581    }
582
583    /// Apply filters to an event
584    fn apply_filters(&self, event: &Event, request: &QueryEventsRequest) -> bool {
585        // Additional type filter if entity was primary
586        if request.entity_id.is_some() {
587            if let Some(ref event_type) = request.event_type {
588                if event.event_type_str() != event_type {
589                    return false;
590                }
591            }
592        }
593
594        true
595    }
596
597    /// Reconstruct entity state as of a specific timestamp
598    /// v0.2: Now uses snapshots for fast reconstruction
599    pub fn reconstruct_state(
600        &self,
601        entity_id: &str,
602        as_of: Option<DateTime<Utc>>,
603    ) -> Result<serde_json::Value> {
604        // Try to find a snapshot to use as a base (v0.2 optimization)
605        let (merged_state, since_timestamp) = if let Some(as_of_time) = as_of {
606            // Get snapshot closest to requested time
607            if let Some(snapshot) = self
608                .snapshot_manager
609                .get_snapshot_as_of(entity_id, as_of_time)
610            {
611                tracing::debug!(
612                    "Using snapshot from {} for entity {} (saved {} events)",
613                    snapshot.as_of,
614                    entity_id,
615                    snapshot.event_count
616                );
617                (snapshot.state.clone(), Some(snapshot.as_of))
618            } else {
619                (serde_json::json!({}), None)
620            }
621        } else {
622            // Get latest snapshot for current state
623            if let Some(snapshot) = self.snapshot_manager.get_latest_snapshot(entity_id) {
624                tracing::debug!(
625                    "Using latest snapshot from {} for entity {}",
626                    snapshot.as_of,
627                    entity_id
628                );
629                (snapshot.state.clone(), Some(snapshot.as_of))
630            } else {
631                (serde_json::json!({}), None)
632            }
633        };
634
635        // Query events after the snapshot (or all if no snapshot)
636        let events = self.query(QueryEventsRequest {
637            entity_id: Some(entity_id.to_string()),
638            event_type: None,
639            tenant_id: None,
640            as_of,
641            since: since_timestamp,
642            until: None,
643            limit: None,
644        })?;
645
646        // If no events and no snapshot, entity not found
647        if events.is_empty() && since_timestamp.is_none() {
648            return Err(AllSourceError::EntityNotFound(entity_id.to_string()));
649        }
650
651        // Merge events on top of snapshot (or from scratch if no snapshot)
652        let mut merged_state = merged_state;
653        for event in &events {
654            if let serde_json::Value::Object(ref mut state_map) = merged_state {
655                if let serde_json::Value::Object(ref payload_map) = event.payload {
656                    for (key, value) in payload_map {
657                        state_map.insert(key.clone(), value.clone());
658                    }
659                }
660            }
661        }
662
663        // Wrap with metadata
664        let state = serde_json::json!({
665            "entity_id": entity_id,
666            "last_updated": events.last().map(|e| e.timestamp),
667            "event_count": events.len(),
668            "as_of": as_of,
669            "current_state": merged_state,
670            "history": events.iter().map(|e| {
671                serde_json::json!({
672                    "event_id": e.id,
673                    "type": e.event_type,
674                    "timestamp": e.timestamp,
675                    "payload": e.payload
676                })
677            }).collect::<Vec<_>>()
678        });
679
680        Ok(state)
681    }
682
683    /// Get snapshot from projection (faster than reconstructing)
684    pub fn get_snapshot(&self, entity_id: &str) -> Result<serde_json::Value> {
685        let projections = self.projections.read();
686
687        if let Some(snapshot_projection) = projections.get_projection("entity_snapshots") {
688            if let Some(state) = snapshot_projection.get_state(entity_id) {
689                return Ok(serde_json::json!({
690                    "entity_id": entity_id,
691                    "snapshot": state,
692                    "from_projection": "entity_snapshots"
693                }));
694            }
695        }
696
697        Err(AllSourceError::EntityNotFound(entity_id.to_string()))
698    }
699
700    /// Get statistics about the event store
701    pub fn stats(&self) -> StoreStats {
702        let events = self.events.read();
703        let index_stats = self.index.stats();
704
705        StoreStats {
706            total_events: events.len(),
707            total_entities: index_stats.total_entities,
708            total_event_types: index_stats.total_event_types,
709            total_ingested: *self.total_ingested.read(),
710        }
711    }
712}
713
/// Configuration for EventStore
///
/// `None` directories disable the corresponding feature; the sub-config
/// structs always exist and default to their own `Default` values.
#[derive(Debug, Clone)]
pub struct EventStoreConfig {
    /// Optional directory for persistent Parquet storage (v0.2 feature);
    /// `None` disables persistence (and compaction, which depends on it)
    pub storage_dir: Option<PathBuf>,

    /// Snapshot configuration (v0.2 feature)
    pub snapshot_config: SnapshotConfig,

    /// Optional directory for WAL (Write-Ahead Log) (v0.2 feature);
    /// `None` disables the WAL
    pub wal_dir: Option<PathBuf>,

    /// WAL configuration (v0.2 feature)
    pub wal_config: WALConfig,

    /// Compaction configuration (v0.2 feature)
    pub compaction_config: CompactionConfig,

    /// Schema registry configuration (v0.5 feature)
    pub schema_registry_config: SchemaRegistryConfig,
}
735
736impl Default for EventStoreConfig {
737    fn default() -> Self {
738        Self {
739            storage_dir: None,
740            snapshot_config: SnapshotConfig::default(),
741            wal_dir: None,
742            wal_config: WALConfig::default(),
743            compaction_config: CompactionConfig::default(),
744            schema_registry_config: SchemaRegistryConfig::default(),
745        }
746    }
747}
748
749impl EventStoreConfig {
750    /// Create config with persistent storage enabled
751    pub fn with_persistence(storage_dir: impl Into<PathBuf>) -> Self {
752        Self {
753            storage_dir: Some(storage_dir.into()),
754            snapshot_config: SnapshotConfig::default(),
755            wal_dir: None,
756            wal_config: WALConfig::default(),
757            compaction_config: CompactionConfig::default(),
758            schema_registry_config: SchemaRegistryConfig::default(),
759        }
760    }
761
762    /// Create config with custom snapshot settings
763    pub fn with_snapshots(snapshot_config: SnapshotConfig) -> Self {
764        Self {
765            storage_dir: None,
766            snapshot_config,
767            wal_dir: None,
768            wal_config: WALConfig::default(),
769            compaction_config: CompactionConfig::default(),
770            schema_registry_config: SchemaRegistryConfig::default(),
771        }
772    }
773
774    /// Create config with WAL enabled
775    pub fn with_wal(wal_dir: impl Into<PathBuf>, wal_config: WALConfig) -> Self {
776        Self {
777            storage_dir: None,
778            snapshot_config: SnapshotConfig::default(),
779            wal_dir: Some(wal_dir.into()),
780            wal_config,
781            compaction_config: CompactionConfig::default(),
782            schema_registry_config: SchemaRegistryConfig::default(),
783        }
784    }
785
786    /// Create config with both persistence and snapshots
787    pub fn with_all(storage_dir: impl Into<PathBuf>, snapshot_config: SnapshotConfig) -> Self {
788        Self {
789            storage_dir: Some(storage_dir.into()),
790            snapshot_config,
791            wal_dir: None,
792            wal_config: WALConfig::default(),
793            compaction_config: CompactionConfig::default(),
794            schema_registry_config: SchemaRegistryConfig::default(),
795        }
796    }
797
798    /// Create production config with all features enabled
799    pub fn production(
800        storage_dir: impl Into<PathBuf>,
801        wal_dir: impl Into<PathBuf>,
802        snapshot_config: SnapshotConfig,
803        wal_config: WALConfig,
804        compaction_config: CompactionConfig,
805    ) -> Self {
806        Self {
807            storage_dir: Some(storage_dir.into()),
808            snapshot_config,
809            wal_dir: Some(wal_dir.into()),
810            wal_config,
811            compaction_config,
812            schema_registry_config: SchemaRegistryConfig::default(),
813        }
814    }
815}
816
/// Point-in-time statistics about the event store, assembled by
/// [`EventStore::stats`] and serialized for API responses.
#[derive(Debug, serde::Serialize)]
pub struct StoreStats {
    /// Number of events currently held in the in-memory log
    pub total_events: usize,
    /// Distinct entities tracked by the index
    pub total_entities: usize,
    /// Distinct event types tracked by the index
    pub total_event_types: usize,
    /// Lifetime ingestion counter (also set from recovered events at startup)
    pub total_ingested: u64,
}
824
825impl Default for EventStore {
826    fn default() -> Self {
827        Self::new()
828    }
829}
830
831// Tests for store are covered in integration tests