//! allsource_core — `store.rs`: the `EventStore` event store and its
//! supporting types.
1use crate::{
2    application::{
3        dto::QueryEventsRequest,
4        services::{
5            exactly_once::{ExactlyOnceConfig, ExactlyOnceRegistry},
6            pipeline::PipelineManager,
7            projection::{EntitySnapshotProjection, EventCounterProjection, ProjectionManager},
8            replay::ReplayManager,
9            schema::{SchemaRegistry, SchemaRegistryConfig},
10            schema_evolution::SchemaEvolutionManager,
11            webhook::WebhookRegistry,
12        },
13    },
14    domain::entities::Event,
15    error::{AllSourceError, Result},
16    infrastructure::{
17        observability::metrics::MetricsRegistry,
18        persistence::{
19            compaction::{CompactionConfig, CompactionManager},
20            index::{EventIndex, IndexEntry},
21            snapshot::{SnapshotConfig, SnapshotManager, SnapshotType},
22            storage::ParquetStorage,
23            wal::{WALConfig, WriteAheadLog},
24        },
25        query::geospatial::GeoIndex,
26        web::websocket::WebSocketManager,
27    },
28};
29use chrono::{DateTime, Utc};
30use dashmap::DashMap;
31use parking_lot::RwLock;
32use std::{path::PathBuf, sync::Arc};
33use tokio::sync::mpsc;
34
/// High-performance event store with columnar storage.
///
/// Combines an append-only in-memory `Vec<Event>` (the index stores stable
/// offsets into it) with a concurrent index, optional Parquet persistence,
/// an optional write-ahead log, and managers for projections, snapshots,
/// replay, pipelines, webhooks, geospatial queries, and metrics.
pub struct EventStore {
    /// In-memory event storage. Append-only; `EventIndex` entries reference
    /// positions in this Vec, so elements must never be removed or reordered.
    events: Arc<RwLock<Vec<Event>>>,

    /// High-performance concurrent index (by id, entity, type, time).
    index: Arc<EventIndex>,

    /// Projection manager for real-time aggregations.
    pub(crate) projections: Arc<RwLock<ProjectionManager>>,

    /// Optional persistent storage (v0.2 feature). `None` when no
    /// `storage_dir` was configured or initialization failed.
    storage: Option<Arc<RwLock<ParquetStorage>>>,

    /// WebSocket manager for real-time event streaming (v0.2 feature)
    websocket_manager: Arc<WebSocketManager>,

    /// Snapshot manager for fast state recovery (v0.2 feature)
    snapshot_manager: Arc<SnapshotManager>,

    /// Write-Ahead Log for durability (v0.2 feature). `None` when no
    /// `wal_dir` was configured or initialization failed.
    wal: Option<Arc<WriteAheadLog>>,

    /// Compaction manager for Parquet optimization (v0.2 feature).
    /// Present only when Parquet storage is configured.
    compaction_manager: Option<Arc<CompactionManager>>,

    /// Schema registry for event validation (v0.5 feature)
    schema_registry: Arc<SchemaRegistry>,

    /// Replay manager for event replay and projection rebuilding (v0.5 feature)
    replay_manager: Arc<ReplayManager>,

    /// Pipeline manager for stream processing (v0.5 feature)
    pipeline_manager: Arc<PipelineManager>,

    /// Prometheus metrics registry (v0.6 feature)
    metrics: Arc<MetricsRegistry>,

    /// Total events ingested (for metrics). Kept alongside the Prometheus
    /// counters for legacy callers.
    total_ingested: Arc<RwLock<u64>>,

    /// Projection state cache for Query Service integration (v0.7 feature).
    /// Key format: "{projection_name}:{entity_id}".
    /// This DashMap provides O(1) access with ~11.9 μs latency.
    projection_state_cache: Arc<DashMap<String, serde_json::Value>>,

    /// Webhook registry for outbound event delivery (v0.11 feature)
    webhook_registry: Arc<WebhookRegistry>,

    /// Channel sender for async webhook delivery tasks. `None` until the
    /// delivery worker is wired up via `set_webhook_tx`.
    webhook_tx: Arc<RwLock<Option<mpsc::UnboundedSender<WebhookDeliveryTask>>>>,

    /// Geospatial index for coordinate-based queries (v2.0 feature)
    geo_index: Arc<GeoIndex>,

    /// Exactly-once processing registry (v2.0 feature)
    exactly_once: Arc<ExactlyOnceRegistry>,

    /// Autonomous schema evolution manager (v2.0 feature)
    schema_evolution: Arc<SchemaEvolutionManager>,
}
96
/// A task queued for async webhook delivery.
///
/// Produced by `EventStore::dispatch_webhooks` and consumed by the delivery
/// worker connected via `EventStore::set_webhook_tx`.
#[derive(Debug, Clone)]
pub struct WebhookDeliveryTask {
    // The subscription the event matched.
    pub webhook: crate::application::services::webhook::WebhookSubscription,
    // The event to deliver to the subscriber.
    pub event: Event,
}
103
104impl EventStore {
105    /// Create a new in-memory event store
106    pub fn new() -> Self {
107        Self::with_config(EventStoreConfig::default())
108    }
109
    /// Create event store with custom configuration.
    ///
    /// Initialization order:
    /// 1. Register built-in projections.
    /// 2. Optionally open Parquet storage and the WAL (failures are logged
    ///    and the feature is disabled rather than aborting startup).
    /// 3. Recover events from the WAL; if anything was recovered, checkpoint
    ///    to Parquet and truncate the WAL.
    /// 4. Load events from Parquet ONLY if nothing came from the WAL, so the
    ///    same events are never loaded twice after a checkpoint.
    pub fn with_config(config: EventStoreConfig) -> Self {
        let mut projections = ProjectionManager::new();

        // Register built-in projections
        projections.register(Arc::new(EntitySnapshotProjection::new("entity_snapshots")));
        projections.register(Arc::new(EventCounterProjection::new("event_counters")));

        // Initialize persistent storage if configured. A failed init logs an
        // error and leaves `storage = None` (store degrades to memory-only).
        let storage = config
            .storage_dir
            .as_ref()
            .and_then(|dir| match ParquetStorage::new(dir) {
                Ok(storage) => {
                    tracing::info!("✅ Parquet persistence enabled at: {}", dir.display());
                    Some(Arc::new(RwLock::new(storage)))
                }
                Err(e) => {
                    tracing::error!("❌ Failed to initialize Parquet storage: {}", e);
                    None
                }
            });

        // Initialize WAL if configured (v0.2 feature). Same degrade-on-failure
        // policy as storage above.
        let wal = config.wal_dir.as_ref().and_then(|dir| {
            match WriteAheadLog::new(dir, config.wal_config.clone()) {
                Ok(wal) => {
                    tracing::info!("✅ WAL enabled at: {}", dir.display());
                    Some(Arc::new(wal))
                }
                Err(e) => {
                    tracing::error!("❌ Failed to initialize WAL: {}", e);
                    None
                }
            }
        });

        // Initialize compaction manager if Parquet storage is enabled (v0.2 feature).
        // NOTE(review): keyed off `config.storage_dir`, not off `storage` above,
        // so a compaction manager is created even when ParquetStorage::new failed
        // — confirm that is intended.
        let compaction_manager = config.storage_dir.as_ref().map(|dir| {
            let manager = CompactionManager::new(dir, config.compaction_config.clone());
            Arc::new(manager)
        });

        // Initialize schema registry (v0.5 feature)
        let schema_registry = Arc::new(SchemaRegistry::new(config.schema_registry_config.clone()));
        tracing::info!("✅ Schema registry enabled");

        // Initialize replay manager (v0.5 feature)
        let replay_manager = Arc::new(ReplayManager::new());
        tracing::info!("✅ Replay manager enabled");

        // Initialize pipeline manager (v0.5 feature)
        let pipeline_manager = Arc::new(PipelineManager::new());
        tracing::info!("✅ Pipeline manager enabled");

        // Initialize metrics registry (v0.6 feature)
        let metrics = MetricsRegistry::new();
        tracing::info!("✅ Prometheus metrics registry initialized");

        // Initialize projection state cache (v0.7 feature)
        let projection_state_cache = Arc::new(DashMap::new());
        tracing::info!("✅ Projection state cache initialized");

        // Initialize webhook registry (v0.11 feature)
        let webhook_registry = Arc::new(WebhookRegistry::new());
        tracing::info!("✅ Webhook registry initialized");

        let store = Self {
            events: Arc::new(RwLock::new(Vec::new())),
            index: Arc::new(EventIndex::new()),
            projections: Arc::new(RwLock::new(projections)),
            storage,
            websocket_manager: Arc::new(WebSocketManager::new()),
            snapshot_manager: Arc::new(SnapshotManager::new(config.snapshot_config)),
            wal,
            compaction_manager,
            schema_registry,
            replay_manager,
            pipeline_manager,
            metrics,
            total_ingested: Arc::new(RwLock::new(0)),
            projection_state_cache,
            webhook_registry,
            webhook_tx: Arc::new(RwLock::new(None)),
            geo_index: Arc::new(GeoIndex::new()),
            exactly_once: Arc::new(ExactlyOnceRegistry::new(ExactlyOnceConfig::default())),
            schema_evolution: Arc::new(SchemaEvolutionManager::new()),
        };

        // Recover from WAL first (most recent data). Per-event index /
        // projection failures are logged but do not abort recovery.
        let mut wal_recovered = false;
        if let Some(ref wal) = store.wal {
            match wal.recover() {
                Ok(recovered_events) if !recovered_events.is_empty() => {
                    tracing::info!(
                        "🔄 Recovering {} events from WAL...",
                        recovered_events.len()
                    );

                    for event in recovered_events {
                        // Re-index and process events from WAL. The offset is
                        // the Vec position the event will be pushed to below.
                        let offset = store.events.read().len();
                        if let Err(e) = store.index.index_event(
                            event.id,
                            event.entity_id_str(),
                            event.event_type_str(),
                            event.timestamp,
                            offset,
                        ) {
                            tracing::error!("Failed to re-index WAL event {}: {}", event.id, e);
                        }

                        if let Err(e) = store.projections.read().process_event(&event) {
                            tracing::error!("Failed to re-process WAL event {}: {}", event.id, e);
                        }

                        store.events.write().push(event);
                    }

                    let total = store.events.read().len();
                    *store.total_ingested.write() = total as u64;
                    tracing::info!("✅ Successfully recovered {} events from WAL", total);

                    // After successful recovery, checkpoint to Parquet if enabled.
                    // The WAL is truncated only after the flush succeeded, so a
                    // failed checkpoint keeps the events replayable.
                    if store.storage.is_some() {
                        tracing::info!("📸 Checkpointing WAL to Parquet storage...");
                        if let Err(e) = store.flush_storage() {
                            tracing::error!("Failed to checkpoint to Parquet: {}", e);
                        } else if let Err(e) = wal.truncate() {
                            tracing::error!("Failed to truncate WAL after checkpoint: {}", e);
                        } else {
                            tracing::info!("✅ WAL checkpointed and truncated");
                        }
                    }

                    wal_recovered = true;
                }
                Ok(_) => {
                    tracing::debug!("No events to recover from WAL");
                }
                Err(e) => {
                    tracing::error!("❌ WAL recovery failed: {}", e);
                }
            }
        }

        // Load persisted events from Parquet only if we didn't recover from WAL
        // (to avoid loading the same events twice after WAL checkpoint).
        // NOTE(review): a `load_all_events` error is silently ignored by this
        // let-chain — consider logging it.
        if !wal_recovered
            && let Some(ref storage) = store.storage
            && let Ok(persisted_events) = storage.read().load_all_events()
        {
            tracing::info!("📂 Loading {} persisted events...", persisted_events.len());

            for event in persisted_events {
                // Re-index loaded events
                let offset = store.events.read().len();
                if let Err(e) = store.index.index_event(
                    event.id,
                    event.entity_id_str(),
                    event.event_type_str(),
                    event.timestamp,
                    offset,
                ) {
                    tracing::error!("Failed to re-index event {}: {}", event.id, e);
                }

                // Re-process through projections
                if let Err(e) = store.projections.read().process_event(&event) {
                    tracing::error!("Failed to re-process event {}: {}", event.id, e);
                }

                store.events.write().push(event);
            }

            let total = store.events.read().len();
            *store.total_ingested.write() = total as u64;
            tracing::info!("✅ Successfully loaded {} events from storage", total);
        }

        store
    }
292
    /// Ingest a new event into the store.
    ///
    /// Write path, in order: validate → WAL append (durability) → index →
    /// projections → pipelines → Parquet append → in-memory Vec → WebSocket
    /// broadcast → webhooks → geo index → schema evolution → auto-snapshot
    /// check → metrics.
    ///
    /// # Errors
    /// Returns validation, WAL, index, projection, or storage errors.
    /// NOTE(review): only the validation and WAL failure paths increment
    /// `ingestion_errors_total`; index/projection/storage failures propagate
    /// via `?` without bumping the counter, and may leave a WAL entry (and an
    /// index entry) for an event that was never pushed to `events` — confirm
    /// whether that partial state is acceptable.
    pub fn ingest(&self, event: Event) -> Result<()> {
        // Start metrics timer (v0.6 feature)
        let timer = self.metrics.ingestion_duration_seconds.start_timer();

        // Validate event
        let validation_result = self.validate_event(&event);
        if let Err(e) = validation_result {
            // Record ingestion error
            self.metrics.ingestion_errors_total.inc();
            timer.observe_duration();
            return Err(e);
        }

        // Write to WAL FIRST for durability (v0.2 feature)
        // This ensures event is persisted before processing
        if let Some(ref wal) = self.wal
            && let Err(e) = wal.append(event.clone())
        {
            self.metrics.ingestion_errors_total.inc();
            timer.observe_duration();
            return Err(e);
        }

        // The write lock on `events` is held from here until after the push,
        // which keeps `offset` consistent with the eventual Vec position.
        // NOTE(review): this also holds the lock across the Parquet append
        // below — verify that is acceptable for ingestion latency.
        let mut events = self.events.write();
        let offset = events.len();

        // Index the event
        self.index.index_event(
            event.id,
            event.entity_id_str(),
            event.event_type_str(),
            event.timestamp,
            offset,
        )?;

        // Process through projections
        let projections = self.projections.read();
        projections.process_event(&event)?;
        drop(projections); // Release lock

        // Process through pipelines (v0.5 feature)
        // Pipelines can transform, filter, and aggregate events in real-time
        let pipeline_results = self.pipeline_manager.process_event(&event);
        if !pipeline_results.is_empty() {
            tracing::debug!(
                "Event {} processed by {} pipeline(s)",
                event.id,
                pipeline_results.len()
            );
            // Pipeline results could be stored, emitted, or forwarded elsewhere
            // For now, we just log them for observability
            for (pipeline_id, result) in pipeline_results {
                tracing::trace!("Pipeline {} result: {:?}", pipeline_id, result);
            }
        }

        // Persist to Parquet storage if enabled (v0.2)
        if let Some(ref storage) = self.storage {
            let storage = storage.read();
            storage.append_event(event.clone())?;
        }

        // Store the event in memory
        events.push(event.clone());
        let total_events = events.len();
        drop(events); // Release lock early

        // Broadcast to WebSocket clients (v0.2 feature)
        self.websocket_manager
            .broadcast_event(Arc::new(event.clone()));

        // Dispatch to matching webhook subscriptions (v0.11 feature)
        self.dispatch_webhooks(&event);

        // Update geospatial index (v2.0 feature)
        self.geo_index.index_event(&event);

        // Autonomous schema evolution (v2.0 feature)
        self.schema_evolution
            .analyze_event(event.event_type_str(), &event.payload);

        // Check if automatic snapshot should be created (v0.2 feature)
        self.check_auto_snapshot(event.entity_id_str(), &event);

        // Update metrics (v0.6 feature)
        self.metrics.events_ingested_total.inc();
        self.metrics
            .events_ingested_by_type
            .with_label_values(&[event.event_type_str()])
            .inc();
        self.metrics.storage_events_total.set(total_events as i64);

        // Update legacy total counter
        let mut total = self.total_ingested.write();
        *total += 1;

        timer.observe_duration();

        tracing::debug!("Event ingested: {} (offset: {})", event.id, offset);

        Ok(())
    }
396
    /// Ingest a replicated event from the leader (follower mode).
    ///
    /// Unlike `ingest()`, this method:
    /// - Skips WAL writing (the follower's WalReceiver manages its own local WAL)
    /// - Skips schema validation (the leader already validated)
    /// - Skips Parquet persistence, webhook dispatch, geo indexing, schema
    ///   evolution, and auto-snapshotting
    /// - Still indexes, processes projections/pipelines, and broadcasts to
    ///   WebSocket clients
    ///
    /// # Errors
    /// Propagates index and projection errors; as in `ingest`, a failure
    /// after indexing leaves an index entry for an offset that is never
    /// filled.
    pub fn ingest_replicated(&self, event: Event) -> Result<()> {
        let timer = self.metrics.ingestion_duration_seconds.start_timer();

        // Hold the write lock until after the push so `offset` stays valid.
        let mut events = self.events.write();
        let offset = events.len();

        // Index the event
        self.index.index_event(
            event.id,
            event.entity_id_str(),
            event.event_type_str(),
            event.timestamp,
            offset,
        )?;

        // Process through projections
        let projections = self.projections.read();
        projections.process_event(&event)?;
        drop(projections);

        // Process through pipelines
        let pipeline_results = self.pipeline_manager.process_event(&event);
        if !pipeline_results.is_empty() {
            tracing::debug!(
                "Replicated event {} processed by {} pipeline(s)",
                event.id,
                pipeline_results.len()
            );
        }

        // Store the event in memory
        events.push(event.clone());
        let total_events = events.len();
        drop(events);

        // Broadcast to WebSocket clients
        self.websocket_manager
            .broadcast_event(Arc::new(event.clone()));

        // Update metrics
        self.metrics.events_ingested_total.inc();
        self.metrics
            .events_ingested_by_type
            .with_label_values(&[event.event_type_str()])
            .inc();
        self.metrics.storage_events_total.set(total_events as i64);

        let mut total = self.total_ingested.write();
        *total += 1;

        timer.observe_duration();

        tracing::debug!(
            "Replicated event ingested: {} (offset: {})",
            event.id,
            offset
        );

        Ok(())
    }
463
464    /// Get the WebSocket manager for this store
465    pub fn websocket_manager(&self) -> Arc<WebSocketManager> {
466        Arc::clone(&self.websocket_manager)
467    }
468
469    /// Get the snapshot manager for this store
470    pub fn snapshot_manager(&self) -> Arc<SnapshotManager> {
471        Arc::clone(&self.snapshot_manager)
472    }
473
474    /// Get the compaction manager for this store
475    pub fn compaction_manager(&self) -> Option<Arc<CompactionManager>> {
476        self.compaction_manager.as_ref().map(Arc::clone)
477    }
478
479    /// Get the schema registry for this store (v0.5 feature)
480    pub fn schema_registry(&self) -> Arc<SchemaRegistry> {
481        Arc::clone(&self.schema_registry)
482    }
483
484    /// Get the replay manager for this store (v0.5 feature)
485    pub fn replay_manager(&self) -> Arc<ReplayManager> {
486        Arc::clone(&self.replay_manager)
487    }
488
489    /// Get the pipeline manager for this store (v0.5 feature)
490    pub fn pipeline_manager(&self) -> Arc<PipelineManager> {
491        Arc::clone(&self.pipeline_manager)
492    }
493
494    /// Get the metrics registry for this store (v0.6 feature)
495    pub fn metrics(&self) -> Arc<MetricsRegistry> {
496        Arc::clone(&self.metrics)
497    }
498
499    /// Get the projection manager for this store (v0.7 feature)
500    pub fn projection_manager(&self) -> parking_lot::RwLockReadGuard<'_, ProjectionManager> {
501        self.projections.read()
502    }
503
504    /// Get the projection state cache for this store (v0.7 feature)
505    /// Used by Elixir Query Service for state synchronization
506    pub fn projection_state_cache(&self) -> Arc<DashMap<String, serde_json::Value>> {
507        Arc::clone(&self.projection_state_cache)
508    }
509
510    /// Get the webhook registry for this store (v0.11 feature)
511    /// Geospatial index for coordinate-based queries (v2.0 feature)
512    pub fn geo_index(&self) -> Arc<GeoIndex> {
513        self.geo_index.clone()
514    }
515
516    /// Exactly-once processing registry (v2.0 feature)
517    pub fn exactly_once(&self) -> Arc<ExactlyOnceRegistry> {
518        self.exactly_once.clone()
519    }
520
521    /// Schema evolution manager (v2.0 feature)
522    pub fn schema_evolution(&self) -> Arc<SchemaEvolutionManager> {
523        self.schema_evolution.clone()
524    }
525
526    /// Get a read-locked snapshot of all events (for EventQL/GraphQL queries)
527    pub fn snapshot_events(&self) -> Vec<Event> {
528        self.events.read().clone()
529    }
530
531    pub fn webhook_registry(&self) -> Arc<WebhookRegistry> {
532        Arc::clone(&self.webhook_registry)
533    }
534
535    /// Set the channel for async webhook delivery.
536    /// Called during server startup to wire the delivery worker.
537    pub fn set_webhook_tx(&self, tx: mpsc::UnboundedSender<WebhookDeliveryTask>) {
538        *self.webhook_tx.write() = Some(tx);
539        tracing::info!("Webhook delivery channel connected");
540    }
541
542    /// Dispatch matching webhooks for a given event (non-blocking).
543    fn dispatch_webhooks(&self, event: &Event) {
544        let matching = self.webhook_registry.find_matching(event);
545        if matching.is_empty() {
546            return;
547        }
548
549        let tx_guard = self.webhook_tx.read();
550        if let Some(ref tx) = *tx_guard {
551            for webhook in matching {
552                let task = WebhookDeliveryTask {
553                    webhook,
554                    event: event.clone(),
555                };
556                if let Err(e) = tx.send(task) {
557                    tracing::warn!("Failed to queue webhook delivery: {}", e);
558                }
559            }
560        }
561    }
562
563    /// Manually flush any pending events to persistent storage
564    pub fn flush_storage(&self) -> Result<()> {
565        if let Some(ref storage) = self.storage {
566            let storage = storage.read();
567            storage.flush()?;
568            tracing::info!("✅ Flushed events to persistent storage");
569        }
570        Ok(())
571    }
572
573    /// Manually create a snapshot for an entity
574    pub fn create_snapshot(&self, entity_id: &str) -> Result<()> {
575        // Get all events for this entity
576        let events = self.query(QueryEventsRequest {
577            entity_id: Some(entity_id.to_string()),
578            event_type: None,
579            tenant_id: None,
580            as_of: None,
581            since: None,
582            until: None,
583            limit: None,
584        })?;
585
586        if events.is_empty() {
587            return Err(AllSourceError::EntityNotFound(entity_id.to_string()));
588        }
589
590        // Build current state
591        let mut state = serde_json::json!({});
592        for event in &events {
593            if let serde_json::Value::Object(ref mut state_map) = state
594                && let serde_json::Value::Object(ref payload_map) = event.payload
595            {
596                for (key, value) in payload_map {
597                    state_map.insert(key.clone(), value.clone());
598                }
599            }
600        }
601
602        let last_event = events.last().unwrap();
603        self.snapshot_manager.create_snapshot(
604            entity_id.to_string(),
605            state,
606            last_event.timestamp,
607            events.len(),
608            SnapshotType::Manual,
609        )?;
610
611        Ok(())
612    }
613
614    /// Check and create automatic snapshots if needed
615    fn check_auto_snapshot(&self, entity_id: &str, event: &Event) {
616        // Count events for this entity
617        let entity_event_count = self
618            .index
619            .get_by_entity(entity_id)
620            .map(|entries| entries.len())
621            .unwrap_or(0);
622
623        if self.snapshot_manager.should_create_snapshot(
624            entity_id,
625            entity_event_count,
626            event.timestamp,
627        ) {
628            // Create snapshot in background (don't block ingestion)
629            if let Err(e) = self.create_snapshot(entity_id) {
630                tracing::warn!(
631                    "Failed to create automatic snapshot for {}: {}",
632                    entity_id,
633                    e
634                );
635            }
636        }
637    }
638
639    /// Validate an event before ingestion
640    fn validate_event(&self, event: &Event) -> Result<()> {
641        // EntityId and EventType value objects already validate non-empty in their constructors
642        // So these checks are now redundant, but we keep them for explicit validation
643        if event.entity_id_str().is_empty() {
644            return Err(AllSourceError::ValidationError(
645                "entity_id cannot be empty".to_string(),
646            ));
647        }
648
649        if event.event_type_str().is_empty() {
650            return Err(AllSourceError::ValidationError(
651                "event_type cannot be empty".to_string(),
652            ));
653        }
654
655        // Reject system namespace events from user-facing ingestion.
656        // System events are written exclusively via SystemMetadataStore.
657        if event.event_type().is_system() {
658            return Err(AllSourceError::ValidationError(
659                "Event types starting with '_system.' are reserved for internal use".to_string(),
660            ));
661        }
662
663        Ok(())
664    }
665
666    /// Reset a projection by clearing its state and reprocessing all events
667    pub fn reset_projection(&self, name: &str) -> Result<usize> {
668        let projection_manager = self.projections.read();
669        let projection = projection_manager.get_projection(name).ok_or_else(|| {
670            AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
671        })?;
672
673        // Clear existing state
674        projection.clear();
675
676        // Clear cached state for this projection
677        let prefix = format!("{name}:");
678        let keys_to_remove: Vec<String> = self
679            .projection_state_cache
680            .iter()
681            .filter(|entry| entry.key().starts_with(&prefix))
682            .map(|entry| entry.key().clone())
683            .collect();
684        for key in keys_to_remove {
685            self.projection_state_cache.remove(&key);
686        }
687
688        // Reprocess all events through this projection
689        let events = self.events.read();
690        let mut reprocessed = 0usize;
691        for event in events.iter() {
692            if projection.process(event).is_ok() {
693                reprocessed += 1;
694            }
695        }
696
697        Ok(reprocessed)
698    }
699
700    /// Get a single event by its UUID
701    pub fn get_event_by_id(&self, event_id: &uuid::Uuid) -> Result<Option<Event>> {
702        if let Some(offset) = self.index.get_by_id(event_id) {
703            let events = self.events.read();
704            Ok(events.get(offset).cloned())
705        } else {
706            Ok(None)
707        }
708    }
709
    /// Query events based on filters (optimized with indices).
    ///
    /// Lookup strategy: entity index when `entity_id` is set, otherwise the
    /// type index when `event_type` is set, otherwise a full scan. Results
    /// are sorted by timestamp ascending, then truncated to `limit`.
    /// Per-query metrics are recorded under the labels "entity", "type",
    /// or "full_scan".
    ///
    /// NOTE(review): time filters (`as_of`/`since`/`until`) are applied via
    /// `filter_entries` only on the indexed paths — the full-scan path does
    /// not time-filter (and `apply_filters` only checks `event_type`).
    /// Confirm that is intended.
    pub fn query(&self, request: QueryEventsRequest) -> Result<Vec<Event>> {
        // Determine query type for metrics (v0.6 feature)
        let query_type = if request.entity_id.is_some() {
            "entity"
        } else if request.event_type.is_some() {
            "type"
        } else {
            "full_scan"
        };

        // Start metrics timer (v0.6 feature)
        let timer = self
            .metrics
            .query_duration_seconds
            .with_label_values(&[query_type])
            .start_timer();

        // Increment query counter (v0.6 feature)
        self.metrics
            .queries_total
            .with_label_values(&[query_type])
            .inc();

        let events = self.events.read();

        // Use index for fast lookups
        let offsets: Vec<usize> = if let Some(entity_id) = &request.entity_id {
            // Use entity index
            self.index
                .get_by_entity(entity_id)
                .map(|entries| self.filter_entries(entries, &request))
                .unwrap_or_default()
        } else if let Some(event_type) = &request.event_type {
            // Use type index
            self.index
                .get_by_type(event_type)
                .map(|entries| self.filter_entries(entries, &request))
                .unwrap_or_default()
        } else {
            // Full scan (less efficient but necessary for complex queries)
            (0..events.len()).collect()
        };

        // Fetch events and apply remaining filters
        let mut results: Vec<Event> = offsets
            .iter()
            .filter_map(|&offset| events.get(offset).cloned())
            .filter(|event| self.apply_filters(event, &request))
            .collect();

        // Sort by timestamp (ascending)
        results.sort_by_key(|x| x.timestamp);

        // Apply limit
        if let Some(limit) = request.limit {
            results.truncate(limit);
        }

        // Record query results count (v0.6 feature)
        self.metrics
            .query_results_total
            .with_label_values(&[query_type])
            .inc_by(results.len() as u64);

        timer.observe_duration();

        Ok(results)
    }
779
780    /// Filter index entries based on query parameters
781    fn filter_entries(&self, entries: Vec<IndexEntry>, request: &QueryEventsRequest) -> Vec<usize> {
782        entries
783            .into_iter()
784            .filter(|entry| {
785                // Time filters
786                if let Some(as_of) = request.as_of
787                    && entry.timestamp > as_of
788                {
789                    return false;
790                }
791                if let Some(since) = request.since
792                    && entry.timestamp < since
793                {
794                    return false;
795                }
796                if let Some(until) = request.until
797                    && entry.timestamp > until
798                {
799                    return false;
800                }
801                true
802            })
803            .map(|entry| entry.offset)
804            .collect()
805    }
806
807    /// Apply filters to an event
808    fn apply_filters(&self, event: &Event, request: &QueryEventsRequest) -> bool {
809        // Additional type filter if entity was primary
810        if request.entity_id.is_some()
811            && let Some(ref event_type) = request.event_type
812            && event.event_type_str() != event_type
813        {
814            return false;
815        }
816
817        true
818    }
819
    /// Reconstruct entity state as of a specific timestamp
    /// v0.2: Now uses snapshots for fast reconstruction
    ///
    /// The returned JSON wraps the merged state with metadata: `entity_id`,
    /// `last_updated`, `event_count`, `as_of`, `current_state`, and a
    /// `history` array of the events that were replayed on top of the
    /// snapshot base (if any).
    ///
    /// # Errors
    /// Returns `AllSourceError::EntityNotFound` when the entity has neither
    /// events nor a snapshot to start from.
    pub fn reconstruct_state(
        &self,
        entity_id: &str,
        as_of: Option<DateTime<Utc>>,
    ) -> Result<serde_json::Value> {
        // Try to find a snapshot to use as a base (v0.2 optimization)
        // `since_timestamp` marks the snapshot time so only events AFTER it
        // are replayed; None means "replay everything from scratch".
        let (merged_state, since_timestamp) = if let Some(as_of_time) = as_of {
            // Get snapshot closest to requested time
            if let Some(snapshot) = self
                .snapshot_manager
                .get_snapshot_as_of(entity_id, as_of_time)
            {
                tracing::debug!(
                    "Using snapshot from {} for entity {} (saved {} events)",
                    snapshot.as_of,
                    entity_id,
                    snapshot.event_count
                );
                (snapshot.state.clone(), Some(snapshot.as_of))
            } else {
                // No usable snapshot: start from an empty object.
                (serde_json::json!({}), None)
            }
        } else {
            // Get latest snapshot for current state
            if let Some(snapshot) = self.snapshot_manager.get_latest_snapshot(entity_id) {
                tracing::debug!(
                    "Using latest snapshot from {} for entity {}",
                    snapshot.as_of,
                    entity_id
                );
                (snapshot.state.clone(), Some(snapshot.as_of))
            } else {
                (serde_json::json!({}), None)
            }
        };

        // Query events after the snapshot (or all if no snapshot)
        let events = self.query(QueryEventsRequest {
            entity_id: Some(entity_id.to_string()),
            event_type: None,
            tenant_id: None,
            as_of,
            since: since_timestamp,
            until: None,
            limit: None,
        })?;

        // If no events and no snapshot, entity not found
        if events.is_empty() && since_timestamp.is_none() {
            return Err(AllSourceError::EntityNotFound(entity_id.to_string()));
        }

        // Merge events on top of snapshot (or from scratch if no snapshot)
        // Last-write-wins per key: each event's payload keys overwrite any
        // keys already present in the accumulated state.
        let mut merged_state = merged_state;
        for event in &events {
            // Only object payloads are merged; non-object payloads (or a
            // non-object base state) are skipped silently.
            if let serde_json::Value::Object(ref mut state_map) = merged_state
                && let serde_json::Value::Object(ref payload_map) = event.payload
            {
                for (key, value) in payload_map {
                    state_map.insert(key.clone(), value.clone());
                }
            }
        }

        // Wrap with metadata
        // NOTE: `last_updated`/`event_count` describe only the replayed
        // events, not events already folded into the snapshot base.
        let state = serde_json::json!({
            "entity_id": entity_id,
            "last_updated": events.last().map(|e| e.timestamp),
            "event_count": events.len(),
            "as_of": as_of,
            "current_state": merged_state,
            "history": events.iter().map(|e| {
                serde_json::json!({
                    "event_id": e.id,
                    "type": e.event_type,
                    "timestamp": e.timestamp,
                    "payload": e.payload
                })
            }).collect::<Vec<_>>()
        });

        Ok(state)
    }
905
906    /// Get snapshot from projection (faster than reconstructing)
907    pub fn get_snapshot(&self, entity_id: &str) -> Result<serde_json::Value> {
908        let projections = self.projections.read();
909
910        if let Some(snapshot_projection) = projections.get_projection("entity_snapshots")
911            && let Some(state) = snapshot_projection.get_state(entity_id)
912        {
913            return Ok(serde_json::json!({
914                "entity_id": entity_id,
915                "snapshot": state,
916                "from_projection": "entity_snapshots"
917            }));
918        }
919
920        Err(AllSourceError::EntityNotFound(entity_id.to_string()))
921    }
922
923    /// Get statistics about the event store
924    pub fn stats(&self) -> StoreStats {
925        let events = self.events.read();
926        let index_stats = self.index.stats();
927
928        StoreStats {
929            total_events: events.len(),
930            total_entities: index_stats.total_entities,
931            total_event_types: index_stats.total_event_types,
932            total_ingested: *self.total_ingested.read(),
933        }
934    }
935
936    /// Get all unique streams (entity_ids) in the store
937    pub fn list_streams(&self) -> Vec<StreamInfo> {
938        self.index
939            .get_all_entities()
940            .into_iter()
941            .map(|entity_id| {
942                let event_count = self
943                    .index
944                    .get_by_entity(&entity_id)
945                    .map(|entries| entries.len())
946                    .unwrap_or(0);
947                let last_event_at = self
948                    .index
949                    .get_by_entity(&entity_id)
950                    .and_then(|entries| entries.last().map(|e| e.timestamp));
951                StreamInfo {
952                    stream_id: entity_id,
953                    event_count,
954                    last_event_at,
955                }
956            })
957            .collect()
958    }
959
960    /// Get all unique event types in the store
961    pub fn list_event_types(&self) -> Vec<EventTypeInfo> {
962        self.index
963            .get_all_types()
964            .into_iter()
965            .map(|event_type| {
966                let event_count = self
967                    .index
968                    .get_by_type(&event_type)
969                    .map(|entries| entries.len())
970                    .unwrap_or(0);
971                let last_event_at = self
972                    .index
973                    .get_by_type(&event_type)
974                    .and_then(|entries| entries.last().map(|e| e.timestamp));
975                EventTypeInfo {
976                    event_type,
977                    event_count,
978                    last_event_at,
979                }
980            })
981            .collect()
982    }
983
984    /// Attach a broadcast sender to the WAL for replication.
985    ///
986    /// Thread-safe: can be called through `Arc<EventStore>` at runtime.
987    /// Used during initial setup and during follower → leader promotion.
988    /// When set, every WAL append publishes the entry to the broadcast
989    /// channel so the WAL shipper can stream it to followers.
990    pub fn enable_wal_replication(
991        &self,
992        tx: tokio::sync::broadcast::Sender<crate::infrastructure::persistence::wal::WALEntry>,
993    ) {
994        if let Some(ref wal_arc) = self.wal {
995            wal_arc.set_replication_tx(tx);
996            tracing::info!("WAL replication broadcast enabled");
997        } else {
998            tracing::warn!("Cannot enable WAL replication: WAL is not configured");
999        }
1000    }
1001
    /// Get a reference to the WAL (if configured).
    /// Used by the replication catch-up protocol to determine oldest available offset.
    /// Returns `None` when the store was built without a WAL.
    pub fn wal(&self) -> Option<&Arc<WriteAheadLog>> {
        self.wal.as_ref()
    }
1007
    /// Get a reference to the Parquet storage (if configured).
    /// Used by the replication catch-up protocol to stream snapshot files to followers.
    /// Returns `None` when the store was built without persistent storage.
    pub fn parquet_storage(&self) -> Option<&Arc<RwLock<ParquetStorage>>> {
        self.storage.as_ref()
    }
1013}
1014
/// Configuration for EventStore
///
/// The `Default` value configures a fully in-memory store: no Parquet
/// storage, no WAL, and no system metadata directory.
#[derive(Debug, Clone, Default)]
pub struct EventStoreConfig {
    /// Optional directory for persistent Parquet storage (v0.2 feature)
    pub storage_dir: Option<PathBuf>,

    /// Snapshot configuration (v0.2 feature)
    pub snapshot_config: SnapshotConfig,

    /// Optional directory for WAL (Write-Ahead Log) (v0.2 feature)
    pub wal_dir: Option<PathBuf>,

    /// WAL configuration (v0.2 feature)
    pub wal_config: WALConfig,

    /// Compaction configuration (v0.2 feature)
    pub compaction_config: CompactionConfig,

    /// Schema registry configuration (v0.5 feature)
    pub schema_registry_config: SchemaRegistryConfig,

    /// Optional directory for system metadata storage (dogfood feature).
    /// When set, operational metadata (tenants, config, audit) is stored
    /// using AllSource's own event store rather than an external database.
    /// Defaults to `{storage_dir}/__system/` when storage_dir is set.
    pub system_data_dir: Option<PathBuf>,

    /// Name of the default tenant to auto-create on first boot.
    pub bootstrap_tenant: Option<String>,
}
1045
1046impl EventStoreConfig {
1047    /// Create config with persistent storage enabled
1048    pub fn with_persistence(storage_dir: impl Into<PathBuf>) -> Self {
1049        Self {
1050            storage_dir: Some(storage_dir.into()),
1051            ..Self::default()
1052        }
1053    }
1054
1055    /// Create config with custom snapshot settings
1056    pub fn with_snapshots(snapshot_config: SnapshotConfig) -> Self {
1057        Self {
1058            snapshot_config,
1059            ..Self::default()
1060        }
1061    }
1062
1063    /// Create config with WAL enabled
1064    pub fn with_wal(wal_dir: impl Into<PathBuf>, wal_config: WALConfig) -> Self {
1065        Self {
1066            wal_dir: Some(wal_dir.into()),
1067            wal_config,
1068            ..Self::default()
1069        }
1070    }
1071
1072    /// Create config with both persistence and snapshots
1073    pub fn with_all(storage_dir: impl Into<PathBuf>, snapshot_config: SnapshotConfig) -> Self {
1074        Self {
1075            storage_dir: Some(storage_dir.into()),
1076            snapshot_config,
1077            ..Self::default()
1078        }
1079    }
1080
1081    /// Create production config with all features enabled
1082    pub fn production(
1083        storage_dir: impl Into<PathBuf>,
1084        wal_dir: impl Into<PathBuf>,
1085        snapshot_config: SnapshotConfig,
1086        wal_config: WALConfig,
1087        compaction_config: CompactionConfig,
1088    ) -> Self {
1089        let storage_dir = storage_dir.into();
1090        let system_data_dir = storage_dir.join("__system");
1091        Self {
1092            storage_dir: Some(storage_dir),
1093            snapshot_config,
1094            wal_dir: Some(wal_dir.into()),
1095            wal_config,
1096            compaction_config,
1097            system_data_dir: Some(system_data_dir),
1098            ..Self::default()
1099        }
1100    }
1101
1102    /// Resolve the effective system data directory.
1103    ///
1104    /// If explicitly set, returns that. Otherwise, derives from storage_dir.
1105    /// Returns None if neither is configured (in-memory mode).
1106    pub fn effective_system_data_dir(&self) -> Option<PathBuf> {
1107        self.system_data_dir
1108            .clone()
1109            .or_else(|| self.storage_dir.as_ref().map(|d| d.join("__system")))
1110    }
1111
1112    /// Build config from environment variables.
1113    ///
1114    /// Reads `ALLSOURCE_DATA_DIR`, `ALLSOURCE_STORAGE_DIR`, `ALLSOURCE_WAL_DIR`,
1115    /// and `ALLSOURCE_WAL_ENABLED` to determine persistence mode.
1116    ///
1117    /// Returns `(config, description)` where description is a human-readable
1118    /// summary of the persistence mode for logging.
1119    pub fn from_env() -> (Self, &'static str) {
1120        Self::from_env_vars(
1121            std::env::var("ALLSOURCE_DATA_DIR")
1122                .ok()
1123                .filter(|s| !s.is_empty()),
1124            std::env::var("ALLSOURCE_STORAGE_DIR")
1125                .ok()
1126                .filter(|s| !s.is_empty()),
1127            std::env::var("ALLSOURCE_WAL_DIR")
1128                .ok()
1129                .filter(|s| !s.is_empty()),
1130            std::env::var("ALLSOURCE_WAL_ENABLED").ok(),
1131        )
1132    }
1133
1134    /// Build config from explicit env-var values (testable without mutating process env).
1135    pub fn from_env_vars(
1136        data_dir: Option<String>,
1137        explicit_storage_dir: Option<String>,
1138        explicit_wal_dir: Option<String>,
1139        wal_enabled_var: Option<String>,
1140    ) -> (Self, &'static str) {
1141        let data_dir = data_dir.filter(|s| !s.is_empty());
1142        let storage_dir = explicit_storage_dir
1143            .filter(|s| !s.is_empty())
1144            .or_else(|| data_dir.as_ref().map(|d| format!("{}/storage", d)));
1145        let wal_dir = explicit_wal_dir
1146            .filter(|s| !s.is_empty())
1147            .or_else(|| data_dir.as_ref().map(|d| format!("{}/wal", d)));
1148        let wal_enabled = wal_enabled_var.map(|v| v == "true").unwrap_or(true);
1149
1150        match (&storage_dir, &wal_dir) {
1151            (Some(sd), Some(wd)) if wal_enabled => {
1152                let config = Self::production(
1153                    sd,
1154                    wd,
1155                    SnapshotConfig::default(),
1156                    WALConfig::default(),
1157                    CompactionConfig::default(),
1158                );
1159                (config, "wal+parquet")
1160            }
1161            (Some(sd), _) => {
1162                let config = Self::with_persistence(sd);
1163                (config, "parquet-only")
1164            }
1165            (_, Some(wd)) if wal_enabled => {
1166                let config = Self::with_wal(wd, WALConfig::default());
1167                (config, "wal-only")
1168            }
1169            _ => (Self::default(), "in-memory"),
1170        }
1171    }
1172}
1173
/// Aggregate statistics about the event store, as returned by `EventStore::stats`.
#[derive(Debug, serde::Serialize)]
pub struct StoreStats {
    /// Number of events currently held in the in-memory event vector.
    pub total_events: usize,
    /// Number of distinct entity ids known to the index.
    pub total_entities: usize,
    /// Number of distinct event types known to the index.
    pub total_event_types: usize,
    /// Value of the store's `total_ingested` counter at snapshot time.
    pub total_ingested: u64,
}
1181
/// Information about a stream (entity_id)
///
/// One item per stream, as produced by `EventStore::list_streams`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct StreamInfo {
    /// The stream identifier (entity_id)
    pub stream_id: String,
    /// Total number of events in this stream
    pub event_count: usize,
    /// Timestamp of the last event in this stream
    pub last_event_at: Option<chrono::DateTime<chrono::Utc>>,
}
1192
/// Information about an event type
///
/// One item per type, as produced by `EventStore::list_event_types`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct EventTypeInfo {
    /// The event type name
    pub event_type: String,
    /// Total number of events of this type
    pub event_count: usize,
    /// Timestamp of the last event of this type
    pub last_event_at: Option<chrono::DateTime<chrono::Utc>>,
}
1203
impl Default for EventStore {
    /// Delegates to `EventStore::new` (in-memory store, no persistence).
    fn default() -> Self {
        Self::new()
    }
}
1209
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::entities::Event;
    use tempfile::TempDir;

    /// Build a minimal test event in the "default" tenant with a fixed
    /// payload of `{"name": "Test", "value": 42}`.
    fn create_test_event(entity_id: &str, event_type: &str) -> Event {
        Event::from_strings(
            event_type.to_string(),
            entity_id.to_string(),
            "default".to_string(),
            serde_json::json!({"name": "Test", "value": 42}),
            None,
        )
        .unwrap()
    }

    // --- store construction & ingestion -----------------------------------

    #[test]
    fn test_event_store_new() {
        let store = EventStore::new();
        assert_eq!(store.stats().total_events, 0);
        assert_eq!(store.stats().total_entities, 0);
    }

    #[test]
    fn test_event_store_default() {
        let store = EventStore::default();
        assert_eq!(store.stats().total_events, 0);
    }

    #[test]
    fn test_ingest_single_event() {
        let store = EventStore::new();
        let event = create_test_event("entity-1", "user.created");

        store.ingest(event).unwrap();

        assert_eq!(store.stats().total_events, 1);
        assert_eq!(store.stats().total_ingested, 1);
    }

    #[test]
    fn test_ingest_multiple_events() {
        let store = EventStore::new();

        for i in 0..10 {
            let event = create_test_event(&format!("entity-{}", i), "user.created");
            store.ingest(event).unwrap();
        }

        assert_eq!(store.stats().total_events, 10);
        assert_eq!(store.stats().total_ingested, 10);
    }

    // --- querying ----------------------------------------------------------

    #[test]
    fn test_query_by_entity_id() {
        let store = EventStore::new();

        store
            .ingest(create_test_event("entity-1", "user.created"))
            .unwrap();
        store
            .ingest(create_test_event("entity-2", "user.created"))
            .unwrap();
        store
            .ingest(create_test_event("entity-1", "user.updated"))
            .unwrap();

        let results = store
            .query(QueryEventsRequest {
                entity_id: Some("entity-1".to_string()),
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
            })
            .unwrap();

        assert_eq!(results.len(), 2);
    }

    #[test]
    fn test_query_by_event_type() {
        let store = EventStore::new();

        store
            .ingest(create_test_event("entity-1", "user.created"))
            .unwrap();
        store
            .ingest(create_test_event("entity-2", "user.updated"))
            .unwrap();
        store
            .ingest(create_test_event("entity-3", "user.created"))
            .unwrap();

        let results = store
            .query(QueryEventsRequest {
                entity_id: None,
                event_type: Some("user.created".to_string()),
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
            })
            .unwrap();

        assert_eq!(results.len(), 2);
    }

    #[test]
    fn test_query_with_limit() {
        let store = EventStore::new();

        for i in 0..10 {
            let event = create_test_event(&format!("entity-{}", i), "user.created");
            store.ingest(event).unwrap();
        }

        let results = store
            .query(QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: Some(5),
            })
            .unwrap();

        assert_eq!(results.len(), 5);
    }

    #[test]
    fn test_query_empty_store() {
        let store = EventStore::new();

        let results = store
            .query(QueryEventsRequest {
                entity_id: Some("non-existent".to_string()),
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
            })
            .unwrap();

        assert!(results.is_empty());
    }

    // --- state reconstruction & snapshots ----------------------------------

    #[test]
    fn test_reconstruct_state() {
        let store = EventStore::new();

        store
            .ingest(create_test_event("entity-1", "user.created"))
            .unwrap();

        let state = store.reconstruct_state("entity-1", None).unwrap();
        // The state is wrapped with metadata
        assert_eq!(state["current_state"]["name"], "Test");
        assert_eq!(state["current_state"]["value"], 42);
    }

    #[test]
    fn test_reconstruct_state_not_found() {
        let store = EventStore::new();

        let result = store.reconstruct_state("non-existent", None);
        assert!(result.is_err());
    }

    #[test]
    fn test_get_snapshot_empty() {
        let store = EventStore::new();

        let result = store.get_snapshot("non-existent");
        // Entity not found error is expected
        assert!(result.is_err());
    }

    #[test]
    fn test_create_snapshot() {
        let store = EventStore::new();

        store
            .ingest(create_test_event("entity-1", "user.created"))
            .unwrap();

        store.create_snapshot("entity-1").unwrap();

        // Verify snapshot was created
        let snapshot = store.get_snapshot("entity-1").unwrap();
        assert!(snapshot != serde_json::json!(null));
    }

    #[test]
    fn test_create_snapshot_entity_not_found() {
        let store = EventStore::new();

        let result = store.create_snapshot("non-existent");
        assert!(result.is_err());
    }

    // --- subsystem accessors ------------------------------------------------

    #[test]
    fn test_websocket_manager() {
        let store = EventStore::new();
        let manager = store.websocket_manager();
        // Manager should be accessible
        assert!(Arc::strong_count(&manager) >= 1);
    }

    #[test]
    fn test_snapshot_manager() {
        let store = EventStore::new();
        let manager = store.snapshot_manager();
        assert!(Arc::strong_count(&manager) >= 1);
    }

    #[test]
    fn test_compaction_manager_none() {
        let store = EventStore::new();
        // Without storage_dir, compaction manager should be None
        assert!(store.compaction_manager().is_none());
    }

    #[test]
    fn test_schema_registry() {
        let store = EventStore::new();
        let registry = store.schema_registry();
        assert!(Arc::strong_count(&registry) >= 1);
    }

    #[test]
    fn test_replay_manager() {
        let store = EventStore::new();
        let manager = store.replay_manager();
        assert!(Arc::strong_count(&manager) >= 1);
    }

    #[test]
    fn test_pipeline_manager() {
        let store = EventStore::new();
        let manager = store.pipeline_manager();
        assert!(Arc::strong_count(&manager) >= 1);
    }

    #[test]
    fn test_projection_manager() {
        let store = EventStore::new();
        let manager = store.projection_manager();
        // Built-in projections should be registered
        let projections = manager.list_projections();
        assert!(projections.len() >= 2); // entity_snapshots and event_counters
    }

    #[test]
    fn test_projection_state_cache() {
        let store = EventStore::new();
        let cache = store.projection_state_cache();

        cache.insert("test:key".to_string(), serde_json::json!({"value": 123}));
        assert_eq!(cache.len(), 1);

        let value = cache.get("test:key").unwrap();
        assert_eq!(value["value"], 123);
    }

    #[test]
    fn test_metrics() {
        let store = EventStore::new();
        let metrics = store.metrics();
        assert!(Arc::strong_count(&metrics) >= 1);
    }

    #[test]
    fn test_store_stats() {
        let store = EventStore::new();

        store
            .ingest(create_test_event("entity-1", "user.created"))
            .unwrap();
        store
            .ingest(create_test_event("entity-2", "order.placed"))
            .unwrap();

        let stats = store.stats();
        assert_eq!(stats.total_events, 2);
        assert_eq!(stats.total_entities, 2);
        assert_eq!(stats.total_event_types, 2);
        assert_eq!(stats.total_ingested, 2);
    }

    // --- configuration constructors ----------------------------------------

    #[test]
    fn test_event_store_config_default() {
        let config = EventStoreConfig::default();
        assert!(config.storage_dir.is_none());
        assert!(config.wal_dir.is_none());
    }

    #[test]
    fn test_event_store_config_with_persistence() {
        let temp_dir = TempDir::new().unwrap();
        let config = EventStoreConfig::with_persistence(temp_dir.path());

        assert!(config.storage_dir.is_some());
        assert!(config.wal_dir.is_none());
    }

    #[test]
    fn test_event_store_config_with_wal() {
        let temp_dir = TempDir::new().unwrap();
        let config = EventStoreConfig::with_wal(temp_dir.path(), WALConfig::default());

        assert!(config.storage_dir.is_none());
        assert!(config.wal_dir.is_some());
    }

    #[test]
    fn test_event_store_config_with_all() {
        let temp_dir = TempDir::new().unwrap();
        let config = EventStoreConfig::with_all(temp_dir.path(), SnapshotConfig::default());

        assert!(config.storage_dir.is_some());
    }

    #[test]
    fn test_event_store_config_production() {
        let storage_dir = TempDir::new().unwrap();
        let wal_dir = TempDir::new().unwrap();
        let config = EventStoreConfig::production(
            storage_dir.path(),
            wal_dir.path(),
            SnapshotConfig::default(),
            WALConfig::default(),
            CompactionConfig::default(),
        );

        assert!(config.storage_dir.is_some());
        assert!(config.wal_dir.is_some());
    }

    // -----------------------------------------------------------------------
    // from_env_vars tests — verifies the env-var-to-config wiring that
    // caused the durability bug (events lost on restart) in v0.10.3.
    // -----------------------------------------------------------------------

    #[test]
    fn test_from_env_vars_data_dir_enables_full_persistence() {
        let (config, mode) =
            EventStoreConfig::from_env_vars(Some("/app/data".to_string()), None, None, None);
        assert_eq!(mode, "wal+parquet");
        assert_eq!(
            config.storage_dir.unwrap().to_str().unwrap(),
            "/app/data/storage"
        );
        assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/app/data/wal");
    }

    #[test]
    fn test_from_env_vars_explicit_dirs() {
        let (config, mode) = EventStoreConfig::from_env_vars(
            None,
            Some("/custom/storage".to_string()),
            Some("/custom/wal".to_string()),
            None,
        );
        assert_eq!(mode, "wal+parquet");
        assert_eq!(
            config.storage_dir.unwrap().to_str().unwrap(),
            "/custom/storage"
        );
        assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/custom/wal");
    }

    #[test]
    fn test_from_env_vars_wal_disabled() {
        let (config, mode) = EventStoreConfig::from_env_vars(
            Some("/app/data".to_string()),
            None,
            None,
            Some("false".to_string()),
        );
        assert_eq!(mode, "parquet-only");
        assert!(config.storage_dir.is_some());
        assert!(config.wal_dir.is_none());
    }

    #[test]
    fn test_from_env_vars_no_dirs_is_in_memory() {
        let (config, mode) = EventStoreConfig::from_env_vars(None, None, None, None);
        assert_eq!(mode, "in-memory");
        assert!(config.storage_dir.is_none());
        assert!(config.wal_dir.is_none());
    }

    #[test]
    fn test_from_env_vars_empty_strings_treated_as_none() {
        let (_, mode) = EventStoreConfig::from_env_vars(
            Some("".to_string()),
            Some("".to_string()),
            Some("".to_string()),
            None,
        );
        assert_eq!(mode, "in-memory");
    }

    #[test]
    fn test_from_env_vars_explicit_overrides_data_dir() {
        let (config, mode) = EventStoreConfig::from_env_vars(
            Some("/app/data".to_string()),
            Some("/override/storage".to_string()),
            Some("/override/wal".to_string()),
            None,
        );
        assert_eq!(mode, "wal+parquet");
        assert_eq!(
            config.storage_dir.unwrap().to_str().unwrap(),
            "/override/storage"
        );
        assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/override/wal");
    }

    #[test]
    fn test_from_env_vars_wal_only() {
        let (config, mode) =
            EventStoreConfig::from_env_vars(None, None, Some("/wal/only".to_string()), None);
        assert_eq!(mode, "wal-only");
        assert!(config.storage_dir.is_none());
        assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/wal/only");
    }

    #[test]
    fn test_store_stats_serde() {
        let stats = StoreStats {
            total_events: 100,
            total_entities: 50,
            total_event_types: 10,
            total_ingested: 100,
        };

        let json = serde_json::to_string(&stats).unwrap();
        assert!(json.contains("\"total_events\":100"));
        assert!(json.contains("\"total_entities\":50"));
    }

    #[test]
    fn test_query_with_entity_and_type() {
        let store = EventStore::new();

        store
            .ingest(create_test_event("entity-1", "user.created"))
            .unwrap();
        store
            .ingest(create_test_event("entity-1", "user.updated"))
            .unwrap();
        store
            .ingest(create_test_event("entity-2", "user.created"))
            .unwrap();

        let results = store
            .query(QueryEventsRequest {
                entity_id: Some("entity-1".to_string()),
                event_type: Some("user.created".to_string()),
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
            })
            .unwrap();

        assert_eq!(results.len(), 1);
        assert_eq!(results[0].event_type_str(), "user.created");
    }

    #[test]
    fn test_flush_storage_no_storage() {
        let store = EventStore::new();
        // Without storage, flush should succeed (no-op)
        let result = store.flush_storage();
        assert!(result.is_ok());
    }

    #[test]
    fn test_state_evolution() {
        let store = EventStore::new();

        // Initial state
        store
            .ingest(
                Event::from_strings(
                    "user.created".to_string(),
                    "user-1".to_string(),
                    "default".to_string(),
                    serde_json::json!({"name": "Alice", "age": 25}),
                    None,
                )
                .unwrap(),
            )
            .unwrap();

        // Update state
        store
            .ingest(
                Event::from_strings(
                    "user.updated".to_string(),
                    "user-1".to_string(),
                    "default".to_string(),
                    serde_json::json!({"age": 26}),
                    None,
                )
                .unwrap(),
            )
            .unwrap();

        let state = store.reconstruct_state("user-1", None).unwrap();
        // The state is wrapped with metadata
        assert_eq!(state["current_state"]["name"], "Alice");
        assert_eq!(state["current_state"]["age"], 26);
    }

    #[test]
    fn test_reject_system_event_types() {
        let store = EventStore::new();

        // System event types should be rejected via user-facing ingestion
        let event = Event::reconstruct_from_strings(
            uuid::Uuid::new_v4(),
            "_system.tenant.created".to_string(),
            "_system:tenant:acme".to_string(),
            "_system".to_string(),
            serde_json::json!({"name": "ACME"}),
            chrono::Utc::now(),
            None,
            1,
        );

        let result = store.ingest(event);
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("reserved for internal use"),
            "Expected system namespace rejection, got: {}",
            err
        );
    }
}