//! `allsource_core` — high-performance event store (`store.rs`).
1#[cfg(feature = "server")]
2use crate::application::services::webhook::WebhookRegistry;
3#[cfg(feature = "server")]
4use crate::infrastructure::observability::metrics::MetricsRegistry;
5#[cfg(feature = "server")]
6use crate::infrastructure::web::websocket::WebSocketManager;
7use crate::{
8    application::{
9        dto::QueryEventsRequest,
10        services::{
11            exactly_once::{ExactlyOnceConfig, ExactlyOnceRegistry},
12            pipeline::PipelineManager,
13            projection::{EntitySnapshotProjection, EventCounterProjection, ProjectionManager},
14            replay::ReplayManager,
15            schema::{SchemaRegistry, SchemaRegistryConfig},
16            schema_evolution::SchemaEvolutionManager,
17        },
18    },
19    domain::entities::Event,
20    error::{AllSourceError, Result},
21    infrastructure::{
22        persistence::{
23            compaction::{CompactionConfig, CompactionManager},
24            index::{EventIndex, IndexEntry},
25            snapshot::{SnapshotConfig, SnapshotManager, SnapshotType},
26            storage::ParquetStorage,
27            wal::{WALConfig, WriteAheadLog},
28        },
29        query::geospatial::GeoIndex,
30    },
31};
32use chrono::{DateTime, Utc};
33use dashmap::DashMap;
34use parking_lot::RwLock;
35use std::{path::PathBuf, sync::Arc};
36#[cfg(feature = "server")]
37use tokio::sync::mpsc;
38
/// High-performance event store with columnar storage
///
/// Holds the in-memory event log plus all supporting subsystems (index,
/// projections, optional Parquet persistence, WAL, snapshots, …). Fields
/// gated behind `#[cfg(feature = "server")]` (WebSocket streaming,
/// Prometheus metrics, webhooks) are compiled in only for server builds.
pub struct EventStore {
    /// In-memory event storage
    events: Arc<RwLock<Vec<Event>>>,

    /// High-performance concurrent index
    index: Arc<EventIndex>,

    /// Projection manager for real-time aggregations
    pub(crate) projections: Arc<RwLock<ProjectionManager>>,

    /// Optional persistent storage (v0.2 feature)
    storage: Option<Arc<RwLock<ParquetStorage>>>,

    /// WebSocket manager for real-time event streaming (v0.2 feature)
    #[cfg(feature = "server")]
    websocket_manager: Arc<WebSocketManager>,

    /// Snapshot manager for fast state recovery (v0.2 feature)
    snapshot_manager: Arc<SnapshotManager>,

    /// Write-Ahead Log for durability (v0.2 feature)
    wal: Option<Arc<WriteAheadLog>>,

    /// Compaction manager for Parquet optimization (v0.2 feature)
    compaction_manager: Option<Arc<CompactionManager>>,

    /// Schema registry for event validation (v0.5 feature)
    schema_registry: Arc<SchemaRegistry>,

    /// Replay manager for event replay and projection rebuilding (v0.5 feature)
    replay_manager: Arc<ReplayManager>,

    /// Pipeline manager for stream processing (v0.5 feature)
    pipeline_manager: Arc<PipelineManager>,

    /// Prometheus metrics registry (v0.6 feature)
    #[cfg(feature = "server")]
    metrics: Arc<MetricsRegistry>,

    /// Total events ingested (for metrics)
    total_ingested: Arc<RwLock<u64>>,

    /// Projection state cache for Query Service integration (v0.7 feature)
    /// Key format: "{projection_name}:{entity_id}"
    /// This DashMap provides O(1) access with ~11.9 μs latency
    projection_state_cache: Arc<DashMap<String, serde_json::Value>>,

    /// Projection status overrides (v0.13 feature)
    /// Tracks pause/start state per projection name: "running" or "paused"
    projection_status: Arc<DashMap<String, String>>,

    /// Webhook registry for outbound event delivery (v0.11 feature)
    #[cfg(feature = "server")]
    webhook_registry: Arc<WebhookRegistry>,

    /// Channel sender for async webhook delivery tasks
    /// (None until `set_webhook_tx` wires the delivery worker at startup)
    #[cfg(feature = "server")]
    webhook_tx: Arc<RwLock<Option<mpsc::UnboundedSender<WebhookDeliveryTask>>>>,

    /// Geospatial index for coordinate-based queries (v2.0 feature)
    geo_index: Arc<GeoIndex>,

    /// Exactly-once processing registry (v2.0 feature)
    exactly_once: Arc<ExactlyOnceRegistry>,

    /// Autonomous schema evolution manager (v2.0 feature)
    schema_evolution: Arc<SchemaEvolutionManager>,
}
108
/// A task queued for async webhook delivery
#[cfg(feature = "server")]
#[derive(Debug, Clone)]
pub struct WebhookDeliveryTask {
    /// The matched subscription describing where to deliver.
    pub webhook: crate::application::services::webhook::WebhookSubscription,
    /// The event to deliver (cloned at dispatch time).
    pub event: Event,
}
116
117impl EventStore {
118    /// Create a new in-memory event store
119    pub fn new() -> Self {
120        Self::with_config(EventStoreConfig::default())
121    }
122
    /// Create event store with custom configuration.
    ///
    /// Startup sequence:
    /// 1. Register built-in projections (entity snapshots, event counters).
    /// 2. Initialize optional subsystems: Parquet storage, WAL, compaction.
    ///    Initialization failures are logged and the feature is disabled —
    ///    construction itself never fails.
    /// 3. Load persisted events from Parquet (the durable baseline),
    ///    re-indexing and re-projecting each one.
    /// 4. Recover WAL events written after the last Parquet checkpoint,
    ///    skipping events already loaded from Parquet, then checkpoint the
    ///    new ones back to Parquet and truncate the WAL.
    pub fn with_config(config: EventStoreConfig) -> Self {
        let mut projections = ProjectionManager::new();

        // Register built-in projections
        projections.register(Arc::new(EntitySnapshotProjection::new("entity_snapshots")));
        projections.register(Arc::new(EventCounterProjection::new("event_counters")));

        // Initialize persistent storage if configured.
        // A failed init degrades to memory-only operation (storage = None).
        let storage = config
            .storage_dir
            .as_ref()
            .and_then(|dir| match ParquetStorage::new(dir) {
                Ok(storage) => {
                    tracing::info!("✅ Parquet persistence enabled at: {}", dir.display());
                    Some(Arc::new(RwLock::new(storage)))
                }
                Err(e) => {
                    tracing::error!("❌ Failed to initialize Parquet storage: {}", e);
                    None
                }
            });

        // Initialize WAL if configured (v0.2 feature).
        // Like storage, a failed init is logged and the WAL is disabled.
        let wal = config.wal_dir.as_ref().and_then(|dir| {
            match WriteAheadLog::new(dir, config.wal_config.clone()) {
                Ok(wal) => {
                    tracing::info!("✅ WAL enabled at: {}", dir.display());
                    Some(Arc::new(wal))
                }
                Err(e) => {
                    tracing::error!("❌ Failed to initialize WAL: {}", e);
                    None
                }
            }
        });

        // Initialize compaction manager if Parquet storage is enabled (v0.2 feature)
        let compaction_manager = config.storage_dir.as_ref().map(|dir| {
            let manager = CompactionManager::new(dir, config.compaction_config.clone());
            Arc::new(manager)
        });

        // Initialize schema registry (v0.5 feature)
        let schema_registry = Arc::new(SchemaRegistry::new(config.schema_registry_config.clone()));
        tracing::info!("✅ Schema registry enabled");

        // Initialize replay manager (v0.5 feature)
        let replay_manager = Arc::new(ReplayManager::new());
        tracing::info!("✅ Replay manager enabled");

        // Initialize pipeline manager (v0.5 feature)
        let pipeline_manager = Arc::new(PipelineManager::new());
        tracing::info!("✅ Pipeline manager enabled");

        // Initialize metrics registry (v0.6 feature)
        #[cfg(feature = "server")]
        let metrics = {
            let m = MetricsRegistry::new();
            tracing::info!("✅ Prometheus metrics registry initialized");
            m
        };

        // Initialize projection state cache (v0.7 feature)
        let projection_state_cache = Arc::new(DashMap::new());
        tracing::info!("✅ Projection state cache initialized");

        // Initialize webhook registry (v0.11 feature)
        #[cfg(feature = "server")]
        let webhook_registry = {
            let w = Arc::new(WebhookRegistry::new());
            tracing::info!("✅ Webhook registry initialized");
            w
        };

        let store = Self {
            events: Arc::new(RwLock::new(Vec::new())),
            index: Arc::new(EventIndex::new()),
            projections: Arc::new(RwLock::new(projections)),
            storage,
            #[cfg(feature = "server")]
            websocket_manager: Arc::new(WebSocketManager::new()),
            snapshot_manager: Arc::new(SnapshotManager::new(config.snapshot_config)),
            wal,
            compaction_manager,
            schema_registry,
            replay_manager,
            pipeline_manager,
            #[cfg(feature = "server")]
            metrics,
            total_ingested: Arc::new(RwLock::new(0)),
            projection_state_cache,
            projection_status: Arc::new(DashMap::new()),
            #[cfg(feature = "server")]
            webhook_registry,
            #[cfg(feature = "server")]
            webhook_tx: Arc::new(RwLock::new(None)),
            geo_index: Arc::new(GeoIndex::new()),
            exactly_once: Arc::new(ExactlyOnceRegistry::new(ExactlyOnceConfig::default())),
            schema_evolution: Arc::new(SchemaEvolutionManager::new()),
        };

        // Step 1: Load persisted events from Parquet (the durable baseline)
        if let Some(ref storage) = store.storage
            && let Ok(persisted_events) = storage.read().load_all_events()
            && !persisted_events.is_empty()
        {
            tracing::info!("📂 Loading {} persisted events...", persisted_events.len());

            for event in persisted_events {
                let offset = store.events.read().len();
                if let Err(e) = store.index.index_event(
                    event.id,
                    event.entity_id_str(),
                    event.event_type_str(),
                    event.timestamp,
                    offset,
                ) {
                    tracing::error!("Failed to re-index event {}: {}", event.id, e);
                }

                if let Err(e) = store.projections.read().process_event(&event) {
                    tracing::error!("Failed to re-process event {}: {}", event.id, e);
                }

                store.events.write().push(event);
            }

            let total = store.events.read().len();
            *store.total_ingested.write() = total as u64;
            tracing::info!("✅ Successfully loaded {} events from storage", total);
        }

        // Step 2: Recover WAL events (written after last Parquet checkpoint)
        if let Some(ref wal) = store.wal {
            match wal.recover() {
                Ok(recovered_events) if !recovered_events.is_empty() => {
                    // Collect IDs already loaded from Parquet to skip duplicates
                    let existing_ids: std::collections::HashSet<uuid::Uuid> =
                        store.events.read().iter().map(|e| e.id).collect();

                    let mut wal_new = 0usize;
                    for event in recovered_events {
                        if existing_ids.contains(&event.id) {
                            continue; // already loaded from Parquet
                        }

                        let offset = store.events.read().len();
                        if let Err(e) = store.index.index_event(
                            event.id,
                            event.entity_id_str(),
                            event.event_type_str(),
                            event.timestamp,
                            offset,
                        ) {
                            tracing::error!("Failed to re-index WAL event {}: {}", event.id, e);
                        }

                        if let Err(e) = store.projections.read().process_event(&event) {
                            tracing::error!("Failed to re-process WAL event {}: {}", event.id, e);
                        }

                        store.events.write().push(event);
                        wal_new += 1;
                    }

                    if wal_new > 0 {
                        let total = store.events.read().len();
                        *store.total_ingested.write() = total as u64;
                        tracing::info!(
                            "✅ Recovered {} new events from WAL ({} total)",
                            wal_new,
                            total
                        );

                        // Checkpoint WAL events to Parquet — buffer them into
                        // the Parquet batch first, then flush. Without this,
                        // flush_storage() finds an empty current_batch and
                        // silently no-ops, then we truncate the WAL and the
                        // events exist only in memory (lost on next restart).
                        if let Some(ref storage) = store.storage {
                            tracing::info!(
                                "📸 Checkpointing {} WAL events to Parquet storage...",
                                wal_new
                            );
                            let parquet = storage.read();
                            let events = store.events.read();
                            let mut buffered = 0usize;
                            for event in events.iter().skip(events.len() - wal_new) {
                                if let Err(e) = parquet.append_event(event.clone()) {
                                    tracing::error!(
                                        "Failed to buffer WAL event for Parquet: {}",
                                        e
                                    );
                                } else {
                                    buffered += 1;
                                }
                            }
                            drop(events);
                            drop(parquet);

                            if buffered > 0 {
                                if let Err(e) = store.flush_storage() {
                                    tracing::error!("Failed to checkpoint to Parquet: {}", e);
                                } else if let Err(e) = wal.truncate() {
                                    tracing::error!(
                                        "Failed to truncate WAL after checkpoint: {}",
                                        e
                                    );
                                } else {
                                    tracing::info!(
                                        "✅ WAL checkpointed and truncated ({} events)",
                                        buffered
                                    );
                                }
                            }
                        }
                    }
                }
                Ok(_) => {
                    tracing::debug!("No events to recover from WAL");
                }
                Err(e) => {
                    tracing::error!("❌ WAL recovery failed: {}", e);
                }
            }
        }

        store
    }
353
    /// Ingest a new event into the store.
    ///
    /// Write path: validate → WAL append (durability first) → index →
    /// projections → pipelines → Parquet buffer → in-memory push →
    /// WebSocket broadcast / webhooks / geo index / schema evolution /
    /// auto-snapshot → metrics.
    ///
    /// # Errors
    ///
    /// Returns an error if validation, WAL append, indexing, projection
    /// processing, or Parquet buffering fails. A WAL-persisted event whose
    /// later steps fail is not removed from the WAL here.
    pub fn ingest(&self, event: Event) -> Result<()> {
        // Start metrics timer (v0.6 feature)
        #[cfg(feature = "server")]
        let timer = self.metrics.ingestion_duration_seconds.start_timer();

        // Validate event
        let validation_result = self.validate_event(&event);
        if let Err(e) = validation_result {
            // Record the failure before returning so error counts and
            // latency histograms stay accurate.
            #[cfg(feature = "server")]
            {
                self.metrics.ingestion_errors_total.inc();
                timer.observe_duration();
            }
            return Err(e);
        }

        // Write to WAL FIRST for durability (v0.2 feature)
        // This ensures event is persisted before processing
        if let Some(ref wal) = self.wal
            && let Err(e) = wal.append(event.clone())
        {
            #[cfg(feature = "server")]
            {
                self.metrics.ingestion_errors_total.inc();
                timer.observe_duration();
            }
            return Err(e);
        }

        // Write lock held from here until after the in-memory push so the
        // offset recorded in the index matches the final vector position.
        let mut events = self.events.write();
        let offset = events.len();

        // Index the event
        self.index.index_event(
            event.id,
            event.entity_id_str(),
            event.event_type_str(),
            event.timestamp,
            offset,
        )?;

        // Process through projections
        let projections = self.projections.read();
        projections.process_event(&event)?;
        drop(projections); // Release lock

        // Process through pipelines (v0.5 feature)
        // Pipelines can transform, filter, and aggregate events in real-time
        let pipeline_results = self.pipeline_manager.process_event(&event);
        if !pipeline_results.is_empty() {
            tracing::debug!(
                "Event {} processed by {} pipeline(s)",
                event.id,
                pipeline_results.len()
            );
            // Pipeline results could be stored, emitted, or forwarded elsewhere
            // For now, we just log them for observability
            for (pipeline_id, result) in pipeline_results {
                tracing::trace!("Pipeline {} result: {:?}", pipeline_id, result);
            }
        }

        // Persist to Parquet storage if enabled (v0.2)
        if let Some(ref storage) = self.storage {
            let storage = storage.read();
            storage.append_event(event.clone())?;
        }

        // Store the event in memory
        events.push(event.clone());
        let total_events = events.len();
        drop(events); // Release lock early

        // Broadcast to WebSocket clients (v0.2 feature)
        #[cfg(feature = "server")]
        self.websocket_manager
            .broadcast_event(Arc::new(event.clone()));

        // Dispatch to matching webhook subscriptions (v0.11 feature)
        #[cfg(feature = "server")]
        self.dispatch_webhooks(&event);

        // Update geospatial index (v2.0 feature)
        self.geo_index.index_event(&event);

        // Autonomous schema evolution (v2.0 feature)
        self.schema_evolution
            .analyze_event(event.event_type_str(), &event.payload);

        // Check if automatic snapshot should be created (v0.2 feature)
        self.check_auto_snapshot(event.entity_id_str(), &event);

        // Update metrics (v0.6 feature)
        #[cfg(feature = "server")]
        {
            self.metrics.events_ingested_total.inc();
            self.metrics
                .events_ingested_by_type
                .with_label_values(&[event.event_type_str()])
                .inc();
            self.metrics.storage_events_total.set(total_events as i64);
        }

        // Update legacy total counter
        let mut total = self.total_ingested.write();
        *total += 1;

        #[cfg(feature = "server")]
        timer.observe_duration();

        tracing::debug!("Event ingested: {} (offset: {})", event.id, offset);

        Ok(())
    }
469
470    /// Ingest a batch of events with a single write lock acquisition.
471    ///
472    /// All events are validated first. If any event fails validation, no
473    /// events are stored (all-or-nothing validation). Events are then written
474    /// to WAL, indexed, processed through projections, and pushed to the
475    /// events vector under a single write lock.
476    pub fn ingest_batch(&self, batch: Vec<Event>) -> Result<()> {
477        if batch.is_empty() {
478            return Ok(());
479        }
480
481        // Phase 1: Validate all events before acquiring any locks
482        for event in &batch {
483            self.validate_event(event)?;
484        }
485
486        // Phase 2: Write all events to WAL (before write lock, for durability)
487        if let Some(ref wal) = self.wal {
488            for event in &batch {
489                wal.append(event.clone())?;
490            }
491        }
492
493        // Phase 3: Single write lock for index + projections + push
494        let mut events = self.events.write();
495        let projections = self.projections.read();
496
497        for event in batch {
498            let offset = events.len();
499
500            self.index.index_event(
501                event.id,
502                event.entity_id_str(),
503                event.event_type_str(),
504                event.timestamp,
505                offset,
506            )?;
507
508            projections.process_event(&event)?;
509            self.pipeline_manager.process_event(&event);
510
511            if let Some(ref storage) = self.storage {
512                let storage = storage.read();
513                storage.append_event(event.clone())?;
514            }
515
516            self.geo_index.index_event(&event);
517            self.schema_evolution
518                .analyze_event(event.event_type_str(), &event.payload);
519
520            events.push(event);
521        }
522
523        let total_events = events.len();
524        drop(projections);
525        drop(events);
526
527        let mut total = self.total_ingested.write();
528        *total += total_events as u64;
529
530        Ok(())
531    }
532
    /// Ingest a replicated event from the leader (follower mode).
    ///
    /// Unlike `ingest()`, this method:
    /// - Skips WAL writing (the follower's WalReceiver manages its own local WAL)
    /// - Skips schema validation (the leader already validated)
    /// - Still indexes, processes projections/pipelines, and broadcasts to WebSocket clients
    ///
    /// # Errors
    ///
    /// Returns an error if indexing or projection processing fails.
    pub fn ingest_replicated(&self, event: Event) -> Result<()> {
        #[cfg(feature = "server")]
        let timer = self.metrics.ingestion_duration_seconds.start_timer();

        // Write lock held until after the push so the indexed offset
        // matches the event's final position in the vector.
        let mut events = self.events.write();
        let offset = events.len();

        // Index the event
        self.index.index_event(
            event.id,
            event.entity_id_str(),
            event.event_type_str(),
            event.timestamp,
            offset,
        )?;

        // Process through projections
        let projections = self.projections.read();
        projections.process_event(&event)?;
        drop(projections);

        // Process through pipelines
        let pipeline_results = self.pipeline_manager.process_event(&event);
        if !pipeline_results.is_empty() {
            tracing::debug!(
                "Replicated event {} processed by {} pipeline(s)",
                event.id,
                pipeline_results.len()
            );
        }

        // Store the event in memory
        events.push(event.clone());
        let total_events = events.len();
        drop(events);

        // Broadcast to WebSocket clients
        #[cfg(feature = "server")]
        self.websocket_manager
            .broadcast_event(Arc::new(event.clone()));

        // Update metrics
        #[cfg(feature = "server")]
        {
            self.metrics.events_ingested_total.inc();
            self.metrics
                .events_ingested_by_type
                .with_label_values(&[event.event_type_str()])
                .inc();
            self.metrics.storage_events_total.set(total_events as i64);
        }

        let mut total = self.total_ingested.write();
        *total += 1;

        #[cfg(feature = "server")]
        timer.observe_duration();

        tracing::debug!(
            "Replicated event ingested: {} (offset: {})",
            event.id,
            offset
        );

        Ok(())
    }
605
606    /// Get the WebSocket manager for this store
607    #[cfg(feature = "server")]
608    pub fn websocket_manager(&self) -> Arc<WebSocketManager> {
609        Arc::clone(&self.websocket_manager)
610    }
611
612    /// Get the snapshot manager for this store
613    pub fn snapshot_manager(&self) -> Arc<SnapshotManager> {
614        Arc::clone(&self.snapshot_manager)
615    }
616
617    /// Get the compaction manager for this store
618    pub fn compaction_manager(&self) -> Option<Arc<CompactionManager>> {
619        self.compaction_manager.as_ref().map(Arc::clone)
620    }
621
622    /// Get the schema registry for this store (v0.5 feature)
623    pub fn schema_registry(&self) -> Arc<SchemaRegistry> {
624        Arc::clone(&self.schema_registry)
625    }
626
627    /// Get the replay manager for this store (v0.5 feature)
628    pub fn replay_manager(&self) -> Arc<ReplayManager> {
629        Arc::clone(&self.replay_manager)
630    }
631
632    /// Get the pipeline manager for this store (v0.5 feature)
633    pub fn pipeline_manager(&self) -> Arc<PipelineManager> {
634        Arc::clone(&self.pipeline_manager)
635    }
636
637    /// Get the metrics registry for this store (v0.6 feature)
638    #[cfg(feature = "server")]
639    pub fn metrics(&self) -> Arc<MetricsRegistry> {
640        Arc::clone(&self.metrics)
641    }
642
643    /// Get the projection manager for this store (v0.7 feature)
644    pub fn projection_manager(&self) -> parking_lot::RwLockReadGuard<'_, ProjectionManager> {
645        self.projections.read()
646    }
647
648    /// Register a custom projection at runtime.
649    ///
650    /// The projection will receive all future events via `process()`.
651    /// Historical events are **not** replayed — only events ingested after
652    /// registration will be processed by this projection.
653    ///
654    /// See [`register_projection_with_backfill`](Self::register_projection_with_backfill)
655    /// to also process historical events.
656    pub fn register_projection(
657        &self,
658        projection: Arc<dyn crate::application::services::projection::Projection>,
659    ) {
660        let mut pm = self.projections.write();
661        pm.register(projection);
662    }
663
664    /// Register a custom projection and replay all existing events through it.
665    ///
666    /// After registration, the projection will also receive all future events.
667    /// Historical events are replayed under a read lock — the projection's
668    /// internal state (typically DashMap) handles concurrent access.
669    pub fn register_projection_with_backfill(
670        &self,
671        projection: Arc<dyn crate::application::services::projection::Projection>,
672    ) -> Result<()> {
673        // First register so future events are processed
674        {
675            let mut pm = self.projections.write();
676            pm.register(Arc::clone(&projection));
677        }
678
679        // Then replay existing events under read lock
680        let events = self.events.read();
681        for event in events.iter() {
682            projection.process(event)?;
683        }
684
685        Ok(())
686    }
687
688    /// Get the projection state cache for this store (v0.7 feature)
689    /// Used by Elixir Query Service for state synchronization
690    pub fn projection_state_cache(&self) -> Arc<DashMap<String, serde_json::Value>> {
691        Arc::clone(&self.projection_state_cache)
692    }
693
694    /// Get the projection status map (v0.13 feature)
695    pub fn projection_status(&self) -> Arc<DashMap<String, String>> {
696        Arc::clone(&self.projection_status)
697    }
698
    /// Geospatial index for coordinate-based queries (v2.0 feature)
    pub fn geo_index(&self) -> Arc<GeoIndex> {
        self.geo_index.clone()
    }

    /// Exactly-once processing registry (v2.0 feature)
    pub fn exactly_once(&self) -> Arc<ExactlyOnceRegistry> {
        self.exactly_once.clone()
    }

    /// Schema evolution manager (v2.0 feature)
    pub fn schema_evolution(&self) -> Arc<SchemaEvolutionManager> {
        self.schema_evolution.clone()
    }
714
    /// Get a cloned snapshot of all events (for EventQL/GraphQL queries).
    ///
    /// Takes a short-lived read lock and returns a full O(n) copy of the
    /// internal events vector, so the caller never holds a lock on the
    /// store. For large stores this allocation is not free — avoid calling
    /// it in hot paths.
    pub fn snapshot_events(&self) -> Vec<Event> {
        self.events.read().clone()
    }
723
    /// Compact token events for an entity by replacing matching events with a
    /// single merged event. Used by the embedded streaming feature.
    ///
    /// Returns `Ok(true)` if compaction was performed, `Ok(false)` if no
    /// matching events were found.
    ///
    /// **Note:** The merged event is processed through projections *without*
    /// clearing the removed events' projection state first. Projections that
    /// accumulate state (e.g., counters) should be designed to handle this
    /// (the merged event replaces individual tokens, not adds to them).
    ///
    /// **Crash safety:** The WAL append happens *after* the in-memory swap
    /// under the write lock. If the process crashes before the WAL write,
    /// no change is persisted — WAL replay restores the pre-compaction state.
    /// If the process crashes after the WAL write, replay sees the merged
    /// event (and the original tokens, which are idempotent to replay since
    /// the merged event supersedes them).
    ///
    /// The write lock is held for the swap + WAL write + index rebuild.
    /// The index rebuild is O(N) over all events, which is acceptable for
    /// embedded workloads but should not be called in hot paths for large stores.
    ///
    /// NOTE(review): Parquet storage is not rewritten here — token events
    /// already persisted to Parquet remain on disk and will be reloaded on
    /// the next restart alongside the merged event; confirm downstream
    /// consumers tolerate seeing both.
    pub fn compact_entity_tokens(
        &self,
        entity_id: &str,
        token_event_type: &str,
        merged_event: Event,
    ) -> Result<bool> {
        // Phase 1: Read-only check — do we have anything to compact?
        {
            let events = self.events.read();
            let has_tokens = events
                .iter()
                .any(|e| e.entity_id_str() == entity_id && e.event_type_str() == token_event_type);
            if !has_tokens {
                return Ok(false);
            }
        }

        // Phase 2: Process merged event through projections (no write lock held)
        let projections = self.projections.read();
        projections.process_event(&merged_event)?;
        drop(projections);

        // Phase 3: Acquire write lock for the swap + WAL + index rebuild
        let mut events = self.events.write();

        events.retain(|e| {
            !(e.entity_id_str() == entity_id && e.event_type_str() == token_event_type)
        });

        events.push(merged_event.clone());

        // WAL append inside write lock: crash before this line = no change persisted.
        // Crash after = merged event in WAL, original tokens also in WAL but
        // superseded by the merged event's entity_id + event_type.
        if let Some(ref wal) = self.wal {
            wal.append(merged_event)?;
        }

        // Rebuild entire index since retain() shifted event positions.
        // Errors here indicate a corrupt event (missing entity_id/event_type)
        // which should not happen for well-formed events. Log and continue
        // rather than failing the entire compaction.
        self.index.clear();
        for (offset, event) in events.iter().enumerate() {
            if let Err(e) = self.index.index_event(
                event.id,
                event.entity_id_str(),
                event.event_type_str(),
                event.timestamp,
                offset,
            ) {
                tracing::warn!(
                    event_id = %event.id,
                    offset,
                    "Failed to re-index event during compaction: {e}"
                );
            }
        }

        Ok(true)
    }
806
807    #[cfg(feature = "server")]
808    pub fn webhook_registry(&self) -> Arc<WebhookRegistry> {
809        Arc::clone(&self.webhook_registry)
810    }
811
812    /// Set the channel for async webhook delivery.
813    /// Called during server startup to wire the delivery worker.
814    #[cfg(feature = "server")]
815    pub fn set_webhook_tx(&self, tx: mpsc::UnboundedSender<WebhookDeliveryTask>) {
816        *self.webhook_tx.write() = Some(tx);
817        tracing::info!("Webhook delivery channel connected");
818    }
819
820    /// Dispatch matching webhooks for a given event (non-blocking).
821    #[cfg(feature = "server")]
822    fn dispatch_webhooks(&self, event: &Event) {
823        let matching = self.webhook_registry.find_matching(event);
824        if matching.is_empty() {
825            return;
826        }
827
828        let tx_guard = self.webhook_tx.read();
829        if let Some(ref tx) = *tx_guard {
830            for webhook in matching {
831                let task = WebhookDeliveryTask {
832                    webhook,
833                    event: event.clone(),
834                };
835                if let Err(e) = tx.send(task) {
836                    tracing::warn!("Failed to queue webhook delivery: {}", e);
837                }
838            }
839        }
840    }
841
842    /// Manually flush any pending events to persistent storage
843    pub fn flush_storage(&self) -> Result<()> {
844        if let Some(ref storage) = self.storage {
845            let storage = storage.read();
846            storage.flush()?;
847            tracing::info!("✅ Flushed events to persistent storage");
848        }
849        Ok(())
850    }
851
852    /// Manually create a snapshot for an entity
853    pub fn create_snapshot(&self, entity_id: &str) -> Result<()> {
854        // Get all events for this entity
855        let events = self.query(QueryEventsRequest {
856            entity_id: Some(entity_id.to_string()),
857            event_type: None,
858            tenant_id: None,
859            as_of: None,
860            since: None,
861            until: None,
862            limit: None,
863            event_type_prefix: None,
864            payload_filter: None,
865        })?;
866
867        if events.is_empty() {
868            return Err(AllSourceError::EntityNotFound(entity_id.to_string()));
869        }
870
871        // Build current state
872        let mut state = serde_json::json!({});
873        for event in &events {
874            if let serde_json::Value::Object(ref mut state_map) = state
875                && let serde_json::Value::Object(ref payload_map) = event.payload
876            {
877                for (key, value) in payload_map {
878                    state_map.insert(key.clone(), value.clone());
879                }
880            }
881        }
882
883        let last_event = events.last().unwrap();
884        self.snapshot_manager.create_snapshot(
885            entity_id.to_string(),
886            state,
887            last_event.timestamp,
888            events.len(),
889            SnapshotType::Manual,
890        )?;
891
892        Ok(())
893    }
894
895    /// Check and create automatic snapshots if needed
896    fn check_auto_snapshot(&self, entity_id: &str, event: &Event) {
897        // Count events for this entity
898        let entity_event_count = self
899            .index
900            .get_by_entity(entity_id)
901            .map(|entries| entries.len())
902            .unwrap_or(0);
903
904        if self.snapshot_manager.should_create_snapshot(
905            entity_id,
906            entity_event_count,
907            event.timestamp,
908        ) {
909            // Create snapshot in background (don't block ingestion)
910            if let Err(e) = self.create_snapshot(entity_id) {
911                tracing::warn!(
912                    "Failed to create automatic snapshot for {}: {}",
913                    entity_id,
914                    e
915                );
916            }
917        }
918    }
919
920    /// Validate an event before ingestion
921    fn validate_event(&self, event: &Event) -> Result<()> {
922        // EntityId and EventType value objects already validate non-empty in their constructors
923        // So these checks are now redundant, but we keep them for explicit validation
924        if event.entity_id_str().is_empty() {
925            return Err(AllSourceError::ValidationError(
926                "entity_id cannot be empty".to_string(),
927            ));
928        }
929
930        if event.event_type_str().is_empty() {
931            return Err(AllSourceError::ValidationError(
932                "event_type cannot be empty".to_string(),
933            ));
934        }
935
936        // Reject system namespace events from user-facing ingestion.
937        // System events are written exclusively via SystemMetadataStore.
938        if event.event_type().is_system() {
939            return Err(AllSourceError::ValidationError(
940                "Event types starting with '_system.' are reserved for internal use".to_string(),
941            ));
942        }
943
944        Ok(())
945    }
946
947    /// Reset a projection by clearing its state and reprocessing all events
948    pub fn reset_projection(&self, name: &str) -> Result<usize> {
949        let projection_manager = self.projections.read();
950        let projection = projection_manager.get_projection(name).ok_or_else(|| {
951            AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
952        })?;
953
954        // Clear existing state
955        projection.clear();
956
957        // Clear cached state for this projection
958        let prefix = format!("{name}:");
959        let keys_to_remove: Vec<String> = self
960            .projection_state_cache
961            .iter()
962            .filter(|entry| entry.key().starts_with(&prefix))
963            .map(|entry| entry.key().clone())
964            .collect();
965        for key in keys_to_remove {
966            self.projection_state_cache.remove(&key);
967        }
968
969        // Reprocess all events through this projection
970        let events = self.events.read();
971        let mut reprocessed = 0usize;
972        for event in events.iter() {
973            if projection.process(event).is_ok() {
974                reprocessed += 1;
975            }
976        }
977
978        Ok(reprocessed)
979    }
980
981    /// Get a single event by its UUID
982    pub fn get_event_by_id(&self, event_id: &uuid::Uuid) -> Result<Option<Event>> {
983        if let Some(offset) = self.index.get_by_id(event_id) {
984            let events = self.events.read();
985            Ok(events.get(offset).cloned())
986        } else {
987            Ok(None)
988        }
989    }
990
991    /// Query events based on filters (optimized with indices)
992    pub fn query(&self, request: QueryEventsRequest) -> Result<Vec<Event>> {
993        // Determine query type for metrics (v0.6 feature)
994        let query_type = if request.entity_id.is_some() {
995            "entity"
996        } else if request.event_type.is_some() {
997            "type"
998        } else if request.event_type_prefix.is_some() {
999            "type_prefix"
1000        } else {
1001            "full_scan"
1002        };
1003
1004        // Start metrics timer (v0.6 feature)
1005        #[cfg(feature = "server")]
1006        let timer = self
1007            .metrics
1008            .query_duration_seconds
1009            .with_label_values(&[query_type])
1010            .start_timer();
1011
1012        // Increment query counter (v0.6 feature)
1013        #[cfg(feature = "server")]
1014        self.metrics
1015            .queries_total
1016            .with_label_values(&[query_type])
1017            .inc();
1018
1019        let events = self.events.read();
1020
1021        // Use index for fast lookups
1022        let offsets: Vec<usize> = if let Some(entity_id) = &request.entity_id {
1023            // Use entity index
1024            self.index
1025                .get_by_entity(entity_id)
1026                .map(|entries| self.filter_entries(entries, &request))
1027                .unwrap_or_default()
1028        } else if let Some(event_type) = &request.event_type {
1029            // Use type index (exact match)
1030            self.index
1031                .get_by_type(event_type)
1032                .map(|entries| self.filter_entries(entries, &request))
1033                .unwrap_or_default()
1034        } else if let Some(prefix) = &request.event_type_prefix {
1035            // Use type index (prefix match)
1036            let entries = self.index.get_by_type_prefix(prefix);
1037            self.filter_entries(entries, &request)
1038        } else {
1039            // Full scan (less efficient but necessary for complex queries)
1040            (0..events.len()).collect()
1041        };
1042
1043        // Fetch events and apply remaining filters
1044        let mut results: Vec<Event> = offsets
1045            .iter()
1046            .filter_map(|&offset| events.get(offset).cloned())
1047            .filter(|event| self.apply_filters(event, &request))
1048            .collect();
1049
1050        // Sort by timestamp (ascending)
1051        results.sort_by_key(|x| x.timestamp);
1052
1053        // Apply limit
1054        if let Some(limit) = request.limit {
1055            results.truncate(limit);
1056        }
1057
1058        // Record query results count (v0.6 feature)
1059        #[cfg(feature = "server")]
1060        {
1061            self.metrics
1062                .query_results_total
1063                .with_label_values(&[query_type])
1064                .inc_by(results.len() as u64);
1065            timer.observe_duration();
1066        }
1067
1068        Ok(results)
1069    }
1070
1071    /// Filter index entries based on query parameters
1072    fn filter_entries(&self, entries: Vec<IndexEntry>, request: &QueryEventsRequest) -> Vec<usize> {
1073        entries
1074            .into_iter()
1075            .filter(|entry| {
1076                // Time filters
1077                if let Some(as_of) = request.as_of
1078                    && entry.timestamp > as_of
1079                {
1080                    return false;
1081                }
1082                if let Some(since) = request.since
1083                    && entry.timestamp < since
1084                {
1085                    return false;
1086                }
1087                if let Some(until) = request.until
1088                    && entry.timestamp > until
1089                {
1090                    return false;
1091                }
1092                true
1093            })
1094            .map(|entry| entry.offset)
1095            .collect()
1096    }
1097
1098    /// Apply filters to an event
1099    fn apply_filters(&self, event: &Event, request: &QueryEventsRequest) -> bool {
1100        // Additional type filter if entity was primary
1101        if request.entity_id.is_some()
1102            && let Some(ref event_type) = request.event_type
1103            && event.event_type_str() != event_type
1104        {
1105            return false;
1106        }
1107
1108        // Additional prefix filter if entity was primary
1109        if request.entity_id.is_some()
1110            && let Some(ref prefix) = request.event_type_prefix
1111            && !event.event_type_str().starts_with(prefix)
1112        {
1113            return false;
1114        }
1115
1116        // Payload field filtering
1117        if let Some(ref filter_str) = request.payload_filter
1118            && let Ok(filter_obj) =
1119                serde_json::from_str::<serde_json::Map<String, serde_json::Value>>(filter_str)
1120        {
1121            let payload = event.payload();
1122            for (key, expected_value) in &filter_obj {
1123                match payload.get(key) {
1124                    Some(actual_value) if actual_value == expected_value => {}
1125                    _ => return false,
1126                }
1127            }
1128        }
1129
1130        true
1131    }
1132
    /// Reconstruct entity state as of a specific timestamp
    /// v0.2: Now uses snapshots for fast reconstruction
    ///
    /// Looks up the newest applicable snapshot (latest when `as_of` is None,
    /// otherwise the closest one at-or-before `as_of`), then replays only the
    /// events after the snapshot on top of it. Payload keys are merged
    /// shallowly: later events win per top-level key. Returns
    /// `EntityNotFound` when the entity has neither events nor a snapshot.
    pub fn reconstruct_state(
        &self,
        entity_id: &str,
        as_of: Option<DateTime<Utc>>,
    ) -> Result<serde_json::Value> {
        // Try to find a snapshot to use as a base (v0.2 optimization)
        let (merged_state, since_timestamp) = if let Some(as_of_time) = as_of {
            // Get snapshot closest to requested time
            if let Some(snapshot) = self
                .snapshot_manager
                .get_snapshot_as_of(entity_id, as_of_time)
            {
                tracing::debug!(
                    "Using snapshot from {} for entity {} (saved {} events)",
                    snapshot.as_of,
                    entity_id,
                    snapshot.event_count
                );
                (snapshot.state.clone(), Some(snapshot.as_of))
            } else {
                // No usable snapshot: replay from the beginning of the log.
                (serde_json::json!({}), None)
            }
        } else {
            // Get latest snapshot for current state
            if let Some(snapshot) = self.snapshot_manager.get_latest_snapshot(entity_id) {
                tracing::debug!(
                    "Using latest snapshot from {} for entity {}",
                    snapshot.as_of,
                    entity_id
                );
                (snapshot.state.clone(), Some(snapshot.as_of))
            } else {
                (serde_json::json!({}), None)
            }
        };

        // Query events after the snapshot (or all if no snapshot)
        // NOTE(review): `since` is inclusive (see filter_entries), so an event
        // whose timestamp equals the snapshot's `as_of` is replayed again; the
        // shallow key-wise merge makes that re-application idempotent.
        let events = self.query(QueryEventsRequest {
            entity_id: Some(entity_id.to_string()),
            event_type: None,
            tenant_id: None,
            as_of,
            since: since_timestamp,
            until: None,
            limit: None,
            event_type_prefix: None,
            payload_filter: None,
        })?;

        // If no events and no snapshot, entity not found
        if events.is_empty() && since_timestamp.is_none() {
            return Err(AllSourceError::EntityNotFound(entity_id.to_string()));
        }

        // Merge events on top of snapshot (or from scratch if no snapshot).
        // Shallow merge: each event's top-level payload keys overwrite the
        // accumulated state; nested objects are replaced, not deep-merged.
        let mut merged_state = merged_state;
        for event in &events {
            if let serde_json::Value::Object(ref mut state_map) = merged_state
                && let serde_json::Value::Object(ref payload_map) = event.payload
            {
                for (key, value) in payload_map {
                    state_map.insert(key.clone(), value.clone());
                }
            }
        }

        // Wrap with metadata.
        // NOTE(review): when a snapshot was used, `event_count` and `history`
        // cover only the replayed tail, not the entity's full event history.
        let state = serde_json::json!({
            "entity_id": entity_id,
            "last_updated": events.last().map(|e| e.timestamp),
            "event_count": events.len(),
            "as_of": as_of,
            "current_state": merged_state,
            "history": events.iter().map(|e| {
                serde_json::json!({
                    "event_id": e.id,
                    "type": e.event_type,
                    "timestamp": e.timestamp,
                    "payload": e.payload
                })
            }).collect::<Vec<_>>()
        });

        Ok(state)
    }
1220
1221    /// Get snapshot from projection (faster than reconstructing)
1222    pub fn get_snapshot(&self, entity_id: &str) -> Result<serde_json::Value> {
1223        let projections = self.projections.read();
1224
1225        if let Some(snapshot_projection) = projections.get_projection("entity_snapshots")
1226            && let Some(state) = snapshot_projection.get_state(entity_id)
1227        {
1228            return Ok(serde_json::json!({
1229                "entity_id": entity_id,
1230                "snapshot": state,
1231                "from_projection": "entity_snapshots"
1232            }));
1233        }
1234
1235        Err(AllSourceError::EntityNotFound(entity_id.to_string()))
1236    }
1237
1238    /// Get statistics about the event store
1239    pub fn stats(&self) -> StoreStats {
1240        let events = self.events.read();
1241        let index_stats = self.index.stats();
1242
1243        StoreStats {
1244            total_events: events.len(),
1245            total_entities: index_stats.total_entities,
1246            total_event_types: index_stats.total_event_types,
1247            total_ingested: *self.total_ingested.read(),
1248        }
1249    }
1250
1251    /// Get all unique streams (entity_ids) in the store
1252    pub fn list_streams(&self) -> Vec<StreamInfo> {
1253        self.index
1254            .get_all_entities()
1255            .into_iter()
1256            .map(|entity_id| {
1257                let event_count = self
1258                    .index
1259                    .get_by_entity(&entity_id)
1260                    .map(|entries| entries.len())
1261                    .unwrap_or(0);
1262                let last_event_at = self
1263                    .index
1264                    .get_by_entity(&entity_id)
1265                    .and_then(|entries| entries.last().map(|e| e.timestamp));
1266                StreamInfo {
1267                    stream_id: entity_id,
1268                    event_count,
1269                    last_event_at,
1270                }
1271            })
1272            .collect()
1273    }
1274
1275    /// Get all unique event types in the store
1276    pub fn list_event_types(&self) -> Vec<EventTypeInfo> {
1277        self.index
1278            .get_all_types()
1279            .into_iter()
1280            .map(|event_type| {
1281                let event_count = self
1282                    .index
1283                    .get_by_type(&event_type)
1284                    .map(|entries| entries.len())
1285                    .unwrap_or(0);
1286                let last_event_at = self
1287                    .index
1288                    .get_by_type(&event_type)
1289                    .and_then(|entries| entries.last().map(|e| e.timestamp));
1290                EventTypeInfo {
1291                    event_type,
1292                    event_count,
1293                    last_event_at,
1294                }
1295            })
1296            .collect()
1297    }
1298
1299    /// Attach a broadcast sender to the WAL for replication.
1300    ///
1301    /// Thread-safe: can be called through `Arc<EventStore>` at runtime.
1302    /// Used during initial setup and during follower → leader promotion.
1303    /// When set, every WAL append publishes the entry to the broadcast
1304    /// channel so the WAL shipper can stream it to followers.
1305    pub fn enable_wal_replication(
1306        &self,
1307        tx: tokio::sync::broadcast::Sender<crate::infrastructure::persistence::wal::WALEntry>,
1308    ) {
1309        if let Some(ref wal_arc) = self.wal {
1310            wal_arc.set_replication_tx(tx);
1311            tracing::info!("WAL replication broadcast enabled");
1312        } else {
1313            tracing::warn!("Cannot enable WAL replication: WAL is not configured");
1314        }
1315    }
1316
1317    /// Get a reference to the WAL (if configured).
1318    /// Used by the replication catch-up protocol to determine oldest available offset.
1319    pub fn wal(&self) -> Option<&Arc<WriteAheadLog>> {
1320        self.wal.as_ref()
1321    }
1322
1323    /// Get a reference to the Parquet storage (if configured).
1324    /// Used by the replication catch-up protocol to stream snapshot files to followers.
1325    pub fn parquet_storage(&self) -> Option<&Arc<RwLock<ParquetStorage>>> {
1326        self.storage.as_ref()
1327    }
1328}
1329
/// Configuration for EventStore
///
/// All fields have sensible defaults via `Default`; the `with_*` constructors
/// and `from_env` build common combinations.
#[derive(Debug, Clone, Default)]
pub struct EventStoreConfig {
    /// Optional directory for persistent Parquet storage (v0.2 feature).
    /// `None` disables Parquet persistence.
    pub storage_dir: Option<PathBuf>,

    /// Snapshot configuration (v0.2 feature)
    pub snapshot_config: SnapshotConfig,

    /// Optional directory for WAL (Write-Ahead Log) (v0.2 feature).
    /// `None` disables the WAL.
    pub wal_dir: Option<PathBuf>,

    /// WAL configuration (v0.2 feature); only meaningful when `wal_dir` is set.
    pub wal_config: WALConfig,

    /// Compaction configuration (v0.2 feature)
    pub compaction_config: CompactionConfig,

    /// Schema registry configuration (v0.5 feature)
    pub schema_registry_config: SchemaRegistryConfig,

    /// Optional directory for system metadata storage (dogfood feature).
    /// When set, operational metadata (tenants, config, audit) is stored
    /// using AllSource's own event store rather than an external database.
    /// Defaults to `{storage_dir}/__system/` when storage_dir is set
    /// (see `effective_system_data_dir`).
    pub system_data_dir: Option<PathBuf>,

    /// Name of the default tenant to auto-create on first boot.
    pub bootstrap_tenant: Option<String>,
}
1360
1361impl EventStoreConfig {
1362    /// Create config with persistent storage enabled
1363    pub fn with_persistence(storage_dir: impl Into<PathBuf>) -> Self {
1364        Self {
1365            storage_dir: Some(storage_dir.into()),
1366            ..Self::default()
1367        }
1368    }
1369
1370    /// Create config with custom snapshot settings
1371    pub fn with_snapshots(snapshot_config: SnapshotConfig) -> Self {
1372        Self {
1373            snapshot_config,
1374            ..Self::default()
1375        }
1376    }
1377
1378    /// Create config with WAL enabled
1379    pub fn with_wal(wal_dir: impl Into<PathBuf>, wal_config: WALConfig) -> Self {
1380        Self {
1381            wal_dir: Some(wal_dir.into()),
1382            wal_config,
1383            ..Self::default()
1384        }
1385    }
1386
1387    /// Create config with both persistence and snapshots
1388    pub fn with_all(storage_dir: impl Into<PathBuf>, snapshot_config: SnapshotConfig) -> Self {
1389        Self {
1390            storage_dir: Some(storage_dir.into()),
1391            snapshot_config,
1392            ..Self::default()
1393        }
1394    }
1395
1396    /// Create production config with all features enabled
1397    pub fn production(
1398        storage_dir: impl Into<PathBuf>,
1399        wal_dir: impl Into<PathBuf>,
1400        snapshot_config: SnapshotConfig,
1401        wal_config: WALConfig,
1402        compaction_config: CompactionConfig,
1403    ) -> Self {
1404        let storage_dir = storage_dir.into();
1405        let system_data_dir = storage_dir.join("__system");
1406        Self {
1407            storage_dir: Some(storage_dir),
1408            snapshot_config,
1409            wal_dir: Some(wal_dir.into()),
1410            wal_config,
1411            compaction_config,
1412            system_data_dir: Some(system_data_dir),
1413            ..Self::default()
1414        }
1415    }
1416
1417    /// Resolve the effective system data directory.
1418    ///
1419    /// If explicitly set, returns that. Otherwise, derives from storage_dir.
1420    /// Returns None if neither is configured (in-memory mode).
1421    pub fn effective_system_data_dir(&self) -> Option<PathBuf> {
1422        self.system_data_dir
1423            .clone()
1424            .or_else(|| self.storage_dir.as_ref().map(|d| d.join("__system")))
1425    }
1426
1427    /// Build config from environment variables.
1428    ///
1429    /// Reads `ALLSOURCE_DATA_DIR`, `ALLSOURCE_STORAGE_DIR`, `ALLSOURCE_WAL_DIR`,
1430    /// and `ALLSOURCE_WAL_ENABLED` to determine persistence mode.
1431    ///
1432    /// Returns `(config, description)` where description is a human-readable
1433    /// summary of the persistence mode for logging.
1434    pub fn from_env() -> (Self, &'static str) {
1435        Self::from_env_vars(
1436            std::env::var("ALLSOURCE_DATA_DIR")
1437                .ok()
1438                .filter(|s| !s.is_empty()),
1439            std::env::var("ALLSOURCE_STORAGE_DIR")
1440                .ok()
1441                .filter(|s| !s.is_empty()),
1442            std::env::var("ALLSOURCE_WAL_DIR")
1443                .ok()
1444                .filter(|s| !s.is_empty()),
1445            std::env::var("ALLSOURCE_WAL_ENABLED").ok(),
1446        )
1447    }
1448
1449    /// Build config from explicit env-var values (testable without mutating process env).
1450    pub fn from_env_vars(
1451        data_dir: Option<String>,
1452        explicit_storage_dir: Option<String>,
1453        explicit_wal_dir: Option<String>,
1454        wal_enabled_var: Option<String>,
1455    ) -> (Self, &'static str) {
1456        let data_dir = data_dir.filter(|s| !s.is_empty());
1457        let storage_dir = explicit_storage_dir
1458            .filter(|s| !s.is_empty())
1459            .or_else(|| data_dir.as_ref().map(|d| format!("{}/storage", d)));
1460        let wal_dir = explicit_wal_dir
1461            .filter(|s| !s.is_empty())
1462            .or_else(|| data_dir.as_ref().map(|d| format!("{}/wal", d)));
1463        let wal_enabled = wal_enabled_var.map(|v| v == "true").unwrap_or(true);
1464
1465        match (&storage_dir, &wal_dir) {
1466            (Some(sd), Some(wd)) if wal_enabled => {
1467                let config = Self::production(
1468                    sd,
1469                    wd,
1470                    SnapshotConfig::default(),
1471                    WALConfig::default(),
1472                    CompactionConfig::default(),
1473                );
1474                (config, "wal+parquet")
1475            }
1476            (Some(sd), _) => {
1477                let config = Self::with_persistence(sd);
1478                (config, "parquet-only")
1479            }
1480            (_, Some(wd)) if wal_enabled => {
1481                let config = Self::with_wal(wd, WALConfig::default());
1482                (config, "wal-only")
1483            }
1484            _ => (Self::default(), "in-memory"),
1485        }
1486    }
1487}
1488
/// Aggregate counters describing the current contents of the store.
#[derive(Debug, serde::Serialize)]
pub struct StoreStats {
    /// Number of events currently held in memory.
    pub total_events: usize,
    /// Number of distinct entity_ids known to the index.
    pub total_entities: usize,
    /// Number of distinct event types known to the index.
    pub total_event_types: usize,
    /// Cumulative ingest counter, tracked separately from `total_events`.
    pub total_ingested: u64,
}
1496
/// Information about a stream (entity_id)
#[derive(Debug, Clone, serde::Serialize)]
pub struct StreamInfo {
    /// The stream identifier (entity_id)
    pub stream_id: String,
    /// Total number of events in this stream
    pub event_count: usize,
    /// Timestamp of the last event in this stream; `None` when the stream
    /// currently has no indexed events.
    pub last_event_at: Option<chrono::DateTime<chrono::Utc>>,
}
1507
/// Information about an event type
#[derive(Debug, Clone, serde::Serialize)]
pub struct EventTypeInfo {
    /// The event type name
    pub event_type: String,
    /// Total number of events of this type
    pub event_count: usize,
    /// Timestamp of the last event of this type; `None` when no events of
    /// this type are currently indexed.
    pub last_event_at: Option<chrono::DateTime<chrono::Utc>>,
}
1518
1519impl Default for EventStore {
1520    fn default() -> Self {
1521        Self::new()
1522    }
1523}
1524
1525#[cfg(test)]
1526mod tests {
1527    use super::*;
1528    use crate::domain::entities::Event;
1529    use tempfile::TempDir;
1530
1531    fn create_test_event(entity_id: &str, event_type: &str) -> Event {
1532        Event::from_strings(
1533            event_type.to_string(),
1534            entity_id.to_string(),
1535            "default".to_string(),
1536            serde_json::json!({"name": "Test", "value": 42}),
1537            None,
1538        )
1539        .unwrap()
1540    }
1541
1542    fn create_test_event_with_payload(
1543        entity_id: &str,
1544        event_type: &str,
1545        payload: serde_json::Value,
1546    ) -> Event {
1547        Event::from_strings(
1548            event_type.to_string(),
1549            entity_id.to_string(),
1550            "default".to_string(),
1551            payload,
1552            None,
1553        )
1554        .unwrap()
1555    }
1556
1557    #[test]
1558    fn test_event_store_new() {
1559        let store = EventStore::new();
1560        assert_eq!(store.stats().total_events, 0);
1561        assert_eq!(store.stats().total_entities, 0);
1562    }
1563
1564    #[test]
1565    fn test_event_store_default() {
1566        let store = EventStore::default();
1567        assert_eq!(store.stats().total_events, 0);
1568    }
1569
1570    #[test]
1571    fn test_ingest_single_event() {
1572        let store = EventStore::new();
1573        let event = create_test_event("entity-1", "user.created");
1574
1575        store.ingest(event).unwrap();
1576
1577        assert_eq!(store.stats().total_events, 1);
1578        assert_eq!(store.stats().total_ingested, 1);
1579    }
1580
1581    #[test]
1582    fn test_ingest_multiple_events() {
1583        let store = EventStore::new();
1584
1585        for i in 0..10 {
1586            let event = create_test_event(&format!("entity-{}", i), "user.created");
1587            store.ingest(event).unwrap();
1588        }
1589
1590        assert_eq!(store.stats().total_events, 10);
1591        assert_eq!(store.stats().total_ingested, 10);
1592    }
1593
1594    #[test]
1595    fn test_query_by_entity_id() {
1596        let store = EventStore::new();
1597
1598        store
1599            .ingest(create_test_event("entity-1", "user.created"))
1600            .unwrap();
1601        store
1602            .ingest(create_test_event("entity-2", "user.created"))
1603            .unwrap();
1604        store
1605            .ingest(create_test_event("entity-1", "user.updated"))
1606            .unwrap();
1607
1608        let results = store
1609            .query(QueryEventsRequest {
1610                entity_id: Some("entity-1".to_string()),
1611                event_type: None,
1612                tenant_id: None,
1613                as_of: None,
1614                since: None,
1615                until: None,
1616                limit: None,
1617                event_type_prefix: None,
1618                payload_filter: None,
1619            })
1620            .unwrap();
1621
1622        assert_eq!(results.len(), 2);
1623    }
1624
1625    #[test]
1626    fn test_query_by_event_type() {
1627        let store = EventStore::new();
1628
1629        store
1630            .ingest(create_test_event("entity-1", "user.created"))
1631            .unwrap();
1632        store
1633            .ingest(create_test_event("entity-2", "user.updated"))
1634            .unwrap();
1635        store
1636            .ingest(create_test_event("entity-3", "user.created"))
1637            .unwrap();
1638
1639        let results = store
1640            .query(QueryEventsRequest {
1641                entity_id: None,
1642                event_type: Some("user.created".to_string()),
1643                tenant_id: None,
1644                as_of: None,
1645                since: None,
1646                until: None,
1647                limit: None,
1648                event_type_prefix: None,
1649                payload_filter: None,
1650            })
1651            .unwrap();
1652
1653        assert_eq!(results.len(), 2);
1654    }
1655
1656    #[test]
1657    fn test_query_with_limit() {
1658        let store = EventStore::new();
1659
1660        for i in 0..10 {
1661            let event = create_test_event(&format!("entity-{}", i), "user.created");
1662            store.ingest(event).unwrap();
1663        }
1664
1665        let results = store
1666            .query(QueryEventsRequest {
1667                entity_id: None,
1668                event_type: None,
1669                tenant_id: None,
1670                as_of: None,
1671                since: None,
1672                until: None,
1673                limit: Some(5),
1674                event_type_prefix: None,
1675                payload_filter: None,
1676            })
1677            .unwrap();
1678
1679        assert_eq!(results.len(), 5);
1680    }
1681
1682    #[test]
1683    fn test_query_empty_store() {
1684        let store = EventStore::new();
1685
1686        let results = store
1687            .query(QueryEventsRequest {
1688                entity_id: Some("non-existent".to_string()),
1689                event_type: None,
1690                tenant_id: None,
1691                as_of: None,
1692                since: None,
1693                until: None,
1694                limit: None,
1695                event_type_prefix: None,
1696                payload_filter: None,
1697            })
1698            .unwrap();
1699
1700        assert!(results.is_empty());
1701    }
1702
1703    #[test]
1704    fn test_reconstruct_state() {
1705        let store = EventStore::new();
1706
1707        store
1708            .ingest(create_test_event("entity-1", "user.created"))
1709            .unwrap();
1710
1711        let state = store.reconstruct_state("entity-1", None).unwrap();
1712        // The state is wrapped with metadata
1713        assert_eq!(state["current_state"]["name"], "Test");
1714        assert_eq!(state["current_state"]["value"], 42);
1715    }
1716
1717    #[test]
1718    fn test_reconstruct_state_not_found() {
1719        let store = EventStore::new();
1720
1721        let result = store.reconstruct_state("non-existent", None);
1722        assert!(result.is_err());
1723    }
1724
1725    #[test]
1726    fn test_get_snapshot_empty() {
1727        let store = EventStore::new();
1728
1729        let result = store.get_snapshot("non-existent");
1730        // Entity not found error is expected
1731        assert!(result.is_err());
1732    }
1733
1734    #[test]
1735    fn test_create_snapshot() {
1736        let store = EventStore::new();
1737
1738        store
1739            .ingest(create_test_event("entity-1", "user.created"))
1740            .unwrap();
1741
1742        store.create_snapshot("entity-1").unwrap();
1743
1744        // Verify snapshot was created
1745        let snapshot = store.get_snapshot("entity-1").unwrap();
1746        assert!(snapshot != serde_json::json!(null));
1747    }
1748
1749    #[test]
1750    fn test_create_snapshot_entity_not_found() {
1751        let store = EventStore::new();
1752
1753        let result = store.create_snapshot("non-existent");
1754        assert!(result.is_err());
1755    }
1756
1757    #[test]
1758    fn test_websocket_manager() {
1759        let store = EventStore::new();
1760        let manager = store.websocket_manager();
1761        // Manager should be accessible
1762        assert!(Arc::strong_count(&manager) >= 1);
1763    }
1764
1765    #[test]
1766    fn test_snapshot_manager() {
1767        let store = EventStore::new();
1768        let manager = store.snapshot_manager();
1769        assert!(Arc::strong_count(&manager) >= 1);
1770    }
1771
1772    #[test]
1773    fn test_compaction_manager_none() {
1774        let store = EventStore::new();
1775        // Without storage_dir, compaction manager should be None
1776        assert!(store.compaction_manager().is_none());
1777    }
1778
1779    #[test]
1780    fn test_schema_registry() {
1781        let store = EventStore::new();
1782        let registry = store.schema_registry();
1783        assert!(Arc::strong_count(&registry) >= 1);
1784    }
1785
1786    #[test]
1787    fn test_replay_manager() {
1788        let store = EventStore::new();
1789        let manager = store.replay_manager();
1790        assert!(Arc::strong_count(&manager) >= 1);
1791    }
1792
1793    #[test]
1794    fn test_pipeline_manager() {
1795        let store = EventStore::new();
1796        let manager = store.pipeline_manager();
1797        assert!(Arc::strong_count(&manager) >= 1);
1798    }
1799
1800    #[test]
1801    fn test_projection_manager() {
1802        let store = EventStore::new();
1803        let manager = store.projection_manager();
1804        // Built-in projections should be registered
1805        let projections = manager.list_projections();
1806        assert!(projections.len() >= 2); // entity_snapshots and event_counters
1807    }
1808
1809    #[test]
1810    fn test_projection_state_cache() {
1811        let store = EventStore::new();
1812        let cache = store.projection_state_cache();
1813
1814        cache.insert("test:key".to_string(), serde_json::json!({"value": 123}));
1815        assert_eq!(cache.len(), 1);
1816
1817        let value = cache.get("test:key").unwrap();
1818        assert_eq!(value["value"], 123);
1819    }
1820
1821    #[test]
1822    fn test_metrics() {
1823        let store = EventStore::new();
1824        let metrics = store.metrics();
1825        assert!(Arc::strong_count(&metrics) >= 1);
1826    }
1827
1828    #[test]
1829    fn test_store_stats() {
1830        let store = EventStore::new();
1831
1832        store
1833            .ingest(create_test_event("entity-1", "user.created"))
1834            .unwrap();
1835        store
1836            .ingest(create_test_event("entity-2", "order.placed"))
1837            .unwrap();
1838
1839        let stats = store.stats();
1840        assert_eq!(stats.total_events, 2);
1841        assert_eq!(stats.total_entities, 2);
1842        assert_eq!(stats.total_event_types, 2);
1843        assert_eq!(stats.total_ingested, 2);
1844    }
1845
1846    #[test]
1847    fn test_event_store_config_default() {
1848        let config = EventStoreConfig::default();
1849        assert!(config.storage_dir.is_none());
1850        assert!(config.wal_dir.is_none());
1851    }
1852
1853    #[test]
1854    fn test_event_store_config_with_persistence() {
1855        let temp_dir = TempDir::new().unwrap();
1856        let config = EventStoreConfig::with_persistence(temp_dir.path());
1857
1858        assert!(config.storage_dir.is_some());
1859        assert!(config.wal_dir.is_none());
1860    }
1861
1862    #[test]
1863    fn test_event_store_config_with_wal() {
1864        let temp_dir = TempDir::new().unwrap();
1865        let config = EventStoreConfig::with_wal(temp_dir.path(), WALConfig::default());
1866
1867        assert!(config.storage_dir.is_none());
1868        assert!(config.wal_dir.is_some());
1869    }
1870
1871    #[test]
1872    fn test_event_store_config_with_all() {
1873        let temp_dir = TempDir::new().unwrap();
1874        let config = EventStoreConfig::with_all(temp_dir.path(), SnapshotConfig::default());
1875
1876        assert!(config.storage_dir.is_some());
1877    }
1878
1879    #[test]
1880    fn test_event_store_config_production() {
1881        let storage_dir = TempDir::new().unwrap();
1882        let wal_dir = TempDir::new().unwrap();
1883        let config = EventStoreConfig::production(
1884            storage_dir.path(),
1885            wal_dir.path(),
1886            SnapshotConfig::default(),
1887            WALConfig::default(),
1888            CompactionConfig::default(),
1889        );
1890
1891        assert!(config.storage_dir.is_some());
1892        assert!(config.wal_dir.is_some());
1893    }
1894
1895    // -----------------------------------------------------------------------
1896    // from_env_vars tests — verifies the env-var-to-config wiring that
1897    // caused the durability bug (events lost on restart) in v0.10.3.
1898    // -----------------------------------------------------------------------
1899
1900    #[test]
1901    fn test_from_env_vars_data_dir_enables_full_persistence() {
1902        let (config, mode) =
1903            EventStoreConfig::from_env_vars(Some("/app/data".to_string()), None, None, None);
1904        assert_eq!(mode, "wal+parquet");
1905        assert_eq!(
1906            config.storage_dir.unwrap().to_str().unwrap(),
1907            "/app/data/storage"
1908        );
1909        assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/app/data/wal");
1910    }
1911
1912    #[test]
1913    fn test_from_env_vars_explicit_dirs() {
1914        let (config, mode) = EventStoreConfig::from_env_vars(
1915            None,
1916            Some("/custom/storage".to_string()),
1917            Some("/custom/wal".to_string()),
1918            None,
1919        );
1920        assert_eq!(mode, "wal+parquet");
1921        assert_eq!(
1922            config.storage_dir.unwrap().to_str().unwrap(),
1923            "/custom/storage"
1924        );
1925        assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/custom/wal");
1926    }
1927
1928    #[test]
1929    fn test_from_env_vars_wal_disabled() {
1930        let (config, mode) = EventStoreConfig::from_env_vars(
1931            Some("/app/data".to_string()),
1932            None,
1933            None,
1934            Some("false".to_string()),
1935        );
1936        assert_eq!(mode, "parquet-only");
1937        assert!(config.storage_dir.is_some());
1938        assert!(config.wal_dir.is_none());
1939    }
1940
1941    #[test]
1942    fn test_from_env_vars_no_dirs_is_in_memory() {
1943        let (config, mode) = EventStoreConfig::from_env_vars(None, None, None, None);
1944        assert_eq!(mode, "in-memory");
1945        assert!(config.storage_dir.is_none());
1946        assert!(config.wal_dir.is_none());
1947    }
1948
1949    #[test]
1950    fn test_from_env_vars_empty_strings_treated_as_none() {
1951        let (_, mode) = EventStoreConfig::from_env_vars(
1952            Some("".to_string()),
1953            Some("".to_string()),
1954            Some("".to_string()),
1955            None,
1956        );
1957        assert_eq!(mode, "in-memory");
1958    }
1959
1960    #[test]
1961    fn test_from_env_vars_explicit_overrides_data_dir() {
1962        let (config, mode) = EventStoreConfig::from_env_vars(
1963            Some("/app/data".to_string()),
1964            Some("/override/storage".to_string()),
1965            Some("/override/wal".to_string()),
1966            None,
1967        );
1968        assert_eq!(mode, "wal+parquet");
1969        assert_eq!(
1970            config.storage_dir.unwrap().to_str().unwrap(),
1971            "/override/storage"
1972        );
1973        assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/override/wal");
1974    }
1975
1976    #[test]
1977    fn test_from_env_vars_wal_only() {
1978        let (config, mode) =
1979            EventStoreConfig::from_env_vars(None, None, Some("/wal/only".to_string()), None);
1980        assert_eq!(mode, "wal-only");
1981        assert!(config.storage_dir.is_none());
1982        assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/wal/only");
1983    }
1984
1985    #[test]
1986    fn test_store_stats_serde() {
1987        let stats = StoreStats {
1988            total_events: 100,
1989            total_entities: 50,
1990            total_event_types: 10,
1991            total_ingested: 100,
1992        };
1993
1994        let json = serde_json::to_string(&stats).unwrap();
1995        assert!(json.contains("\"total_events\":100"));
1996        assert!(json.contains("\"total_entities\":50"));
1997    }
1998
1999    #[test]
2000    fn test_query_with_entity_and_type() {
2001        let store = EventStore::new();
2002
2003        store
2004            .ingest(create_test_event("entity-1", "user.created"))
2005            .unwrap();
2006        store
2007            .ingest(create_test_event("entity-1", "user.updated"))
2008            .unwrap();
2009        store
2010            .ingest(create_test_event("entity-2", "user.created"))
2011            .unwrap();
2012
2013        let results = store
2014            .query(QueryEventsRequest {
2015                entity_id: Some("entity-1".to_string()),
2016                event_type: Some("user.created".to_string()),
2017                tenant_id: None,
2018                as_of: None,
2019                since: None,
2020                until: None,
2021                limit: None,
2022                event_type_prefix: None,
2023                payload_filter: None,
2024            })
2025            .unwrap();
2026
2027        assert_eq!(results.len(), 1);
2028        assert_eq!(results[0].event_type_str(), "user.created");
2029    }
2030
2031    #[test]
2032    fn test_query_by_event_type_prefix() {
2033        let store = EventStore::new();
2034
2035        // Ingest events with various types
2036        store
2037            .ingest(create_test_event("entity-1", "index.created"))
2038            .unwrap();
2039        store
2040            .ingest(create_test_event("entity-2", "index.updated"))
2041            .unwrap();
2042        store
2043            .ingest(create_test_event("entity-3", "trade.created"))
2044            .unwrap();
2045        store
2046            .ingest(create_test_event("entity-4", "trade.completed"))
2047            .unwrap();
2048        store
2049            .ingest(create_test_event("entity-5", "balance.updated"))
2050            .unwrap();
2051
2052        // Query with prefix "index." should return exactly 2
2053        let results = store
2054            .query(QueryEventsRequest {
2055                entity_id: None,
2056                event_type: None,
2057                tenant_id: None,
2058                as_of: None,
2059                since: None,
2060                until: None,
2061                limit: None,
2062                event_type_prefix: Some("index.".to_string()),
2063                payload_filter: None,
2064            })
2065            .unwrap();
2066
2067        assert_eq!(results.len(), 2);
2068        assert!(
2069            results
2070                .iter()
2071                .all(|e| e.event_type_str().starts_with("index."))
2072        );
2073    }
2074
2075    #[test]
2076    fn test_query_by_event_type_prefix_empty_returns_all() {
2077        let store = EventStore::new();
2078
2079        store
2080            .ingest(create_test_event("entity-1", "index.created"))
2081            .unwrap();
2082        store
2083            .ingest(create_test_event("entity-2", "trade.created"))
2084            .unwrap();
2085
2086        // Empty prefix matches all types
2087        let results = store
2088            .query(QueryEventsRequest {
2089                entity_id: None,
2090                event_type: None,
2091                tenant_id: None,
2092                as_of: None,
2093                since: None,
2094                until: None,
2095                limit: None,
2096                event_type_prefix: Some("".to_string()),
2097                payload_filter: None,
2098            })
2099            .unwrap();
2100
2101        assert_eq!(results.len(), 2);
2102    }
2103
2104    #[test]
2105    fn test_query_by_event_type_prefix_no_match() {
2106        let store = EventStore::new();
2107
2108        store
2109            .ingest(create_test_event("entity-1", "index.created"))
2110            .unwrap();
2111
2112        let results = store
2113            .query(QueryEventsRequest {
2114                entity_id: None,
2115                event_type: None,
2116                tenant_id: None,
2117                as_of: None,
2118                since: None,
2119                until: None,
2120                limit: None,
2121                event_type_prefix: Some("nonexistent.".to_string()),
2122                payload_filter: None,
2123            })
2124            .unwrap();
2125
2126        assert!(results.is_empty());
2127    }
2128
2129    #[test]
2130    fn test_query_by_entity_with_type_prefix() {
2131        let store = EventStore::new();
2132
2133        store
2134            .ingest(create_test_event("entity-1", "index.created"))
2135            .unwrap();
2136        store
2137            .ingest(create_test_event("entity-1", "trade.created"))
2138            .unwrap();
2139        store
2140            .ingest(create_test_event("entity-2", "index.updated"))
2141            .unwrap();
2142
2143        // Query entity-1 with prefix "index." should return 1
2144        let results = store
2145            .query(QueryEventsRequest {
2146                entity_id: Some("entity-1".to_string()),
2147                event_type: None,
2148                tenant_id: None,
2149                as_of: None,
2150                since: None,
2151                until: None,
2152                limit: None,
2153                event_type_prefix: Some("index.".to_string()),
2154                payload_filter: None,
2155            })
2156            .unwrap();
2157
2158        assert_eq!(results.len(), 1);
2159        assert_eq!(results[0].event_type_str(), "index.created");
2160    }
2161
2162    #[test]
2163    fn test_query_prefix_with_limit() {
2164        let store = EventStore::new();
2165
2166        for i in 0..5 {
2167            store
2168                .ingest(create_test_event(&format!("entity-{}", i), "index.created"))
2169                .unwrap();
2170        }
2171
2172        let results = store
2173            .query(QueryEventsRequest {
2174                entity_id: None,
2175                event_type: None,
2176                tenant_id: None,
2177                as_of: None,
2178                since: None,
2179                until: None,
2180                limit: Some(3),
2181                event_type_prefix: Some("index.".to_string()),
2182                payload_filter: None,
2183            })
2184            .unwrap();
2185
2186        assert_eq!(results.len(), 3);
2187    }
2188
2189    #[test]
2190    fn test_query_prefix_alongside_existing_filters() {
2191        let store = EventStore::new();
2192
2193        store
2194            .ingest(create_test_event("entity-1", "index.created"))
2195            .unwrap();
2196        // Sleep briefly to ensure different timestamps
2197        std::thread::sleep(std::time::Duration::from_millis(10));
2198        store
2199            .ingest(create_test_event("entity-2", "index.strategy.updated"))
2200            .unwrap();
2201        std::thread::sleep(std::time::Duration::from_millis(10));
2202        store
2203            .ingest(create_test_event("entity-3", "index.deleted"))
2204            .unwrap();
2205
2206        // Prefix with limit
2207        let results = store
2208            .query(QueryEventsRequest {
2209                entity_id: None,
2210                event_type: None,
2211                tenant_id: None,
2212                as_of: None,
2213                since: None,
2214                until: None,
2215                limit: Some(2),
2216                event_type_prefix: Some("index.".to_string()),
2217                payload_filter: None,
2218            })
2219            .unwrap();
2220
2221        assert_eq!(results.len(), 2);
2222    }
2223
2224    #[test]
2225    fn test_query_with_payload_filter() {
2226        let store = EventStore::new();
2227
2228        // Ingest 5 events with user_id=alice
2229        for i in 0..5 {
2230            store
2231                .ingest(create_test_event_with_payload(
2232                    &format!("entity-{}", i),
2233                    "user.action",
2234                    serde_json::json!({"user_id": "alice", "action": "click"}),
2235                ))
2236                .unwrap();
2237        }
2238        // Ingest 5 events with user_id=bob
2239        for i in 5..10 {
2240            store
2241                .ingest(create_test_event_with_payload(
2242                    &format!("entity-{}", i),
2243                    "user.action",
2244                    serde_json::json!({"user_id": "bob", "action": "view"}),
2245                ))
2246                .unwrap();
2247        }
2248
2249        // Filter for alice
2250        let results = store
2251            .query(QueryEventsRequest {
2252                entity_id: None,
2253                event_type: Some("user.action".to_string()),
2254                tenant_id: None,
2255                as_of: None,
2256                since: None,
2257                until: None,
2258                limit: None,
2259                event_type_prefix: None,
2260                payload_filter: Some(r#"{"user_id":"alice"}"#.to_string()),
2261            })
2262            .unwrap();
2263
2264        assert_eq!(results.len(), 5);
2265    }
2266
2267    #[test]
2268    fn test_query_payload_filter_non_existent_field() {
2269        let store = EventStore::new();
2270
2271        store
2272            .ingest(create_test_event_with_payload(
2273                "entity-1",
2274                "user.action",
2275                serde_json::json!({"user_id": "alice"}),
2276            ))
2277            .unwrap();
2278
2279        // Filter for a field that doesn't exist — returns 0, not error
2280        let results = store
2281            .query(QueryEventsRequest {
2282                entity_id: None,
2283                event_type: None,
2284                tenant_id: None,
2285                as_of: None,
2286                since: None,
2287                until: None,
2288                limit: None,
2289                event_type_prefix: None,
2290                payload_filter: Some(r#"{"nonexistent":"value"}"#.to_string()),
2291            })
2292            .unwrap();
2293
2294        assert!(results.is_empty());
2295    }
2296
2297    #[test]
2298    fn test_query_payload_filter_with_prefix() {
2299        let store = EventStore::new();
2300
2301        store
2302            .ingest(create_test_event_with_payload(
2303                "entity-1",
2304                "index.created",
2305                serde_json::json!({"status": "active"}),
2306            ))
2307            .unwrap();
2308        store
2309            .ingest(create_test_event_with_payload(
2310                "entity-2",
2311                "index.created",
2312                serde_json::json!({"status": "inactive"}),
2313            ))
2314            .unwrap();
2315        store
2316            .ingest(create_test_event_with_payload(
2317                "entity-3",
2318                "trade.created",
2319                serde_json::json!({"status": "active"}),
2320            ))
2321            .unwrap();
2322
2323        // Combine prefix + payload filter
2324        let results = store
2325            .query(QueryEventsRequest {
2326                entity_id: None,
2327                event_type: None,
2328                tenant_id: None,
2329                as_of: None,
2330                since: None,
2331                until: None,
2332                limit: None,
2333                event_type_prefix: Some("index.".to_string()),
2334                payload_filter: Some(r#"{"status":"active"}"#.to_string()),
2335            })
2336            .unwrap();
2337
2338        assert_eq!(results.len(), 1);
2339        assert_eq!(results[0].entity_id().to_string(), "entity-1");
2340    }
2341
2342    #[test]
2343    fn test_flush_storage_no_storage() {
2344        let store = EventStore::new();
2345        // Without storage, flush should succeed (no-op)
2346        let result = store.flush_storage();
2347        assert!(result.is_ok());
2348    }
2349
2350    #[test]
2351    fn test_state_evolution() {
2352        let store = EventStore::new();
2353
2354        // Initial state
2355        store
2356            .ingest(
2357                Event::from_strings(
2358                    "user.created".to_string(),
2359                    "user-1".to_string(),
2360                    "default".to_string(),
2361                    serde_json::json!({"name": "Alice", "age": 25}),
2362                    None,
2363                )
2364                .unwrap(),
2365            )
2366            .unwrap();
2367
2368        // Update state
2369        store
2370            .ingest(
2371                Event::from_strings(
2372                    "user.updated".to_string(),
2373                    "user-1".to_string(),
2374                    "default".to_string(),
2375                    serde_json::json!({"age": 26}),
2376                    None,
2377                )
2378                .unwrap(),
2379            )
2380            .unwrap();
2381
2382        let state = store.reconstruct_state("user-1", None).unwrap();
2383        // The state is wrapped with metadata
2384        assert_eq!(state["current_state"]["name"], "Alice");
2385        assert_eq!(state["current_state"]["age"], 26);
2386    }
2387
    #[test]
    fn test_reject_system_event_types() {
        let store = EventStore::new();

        // System event types should be rejected via user-facing ingestion.
        // NOTE(review): reconstruct_from_strings presumably skips the
        // namespace validation that the normal constructor performs, which is
        // why a "_system" event can even be built here — the rejection must
        // therefore come from ingest() itself. Confirm against Event's impl.
        let event = Event::reconstruct_from_strings(
            uuid::Uuid::new_v4(),
            "_system.tenant.created".to_string(), // reserved "_system" namespace
            "_system:tenant:acme".to_string(),
            "_system".to_string(),
            serde_json::json!({"name": "ACME"}),
            chrono::Utc::now(),
            None,
            1, // version
        );

        let result = store.ingest(event);
        assert!(result.is_err());
        let err = result.unwrap_err();
        // The error message is part of the contract checked here.
        assert!(
            err.to_string().contains("reserved for internal use"),
            "Expected system namespace rejection, got: {}",
            err
        );
    }
2413
2414    // -----------------------------------------------------------------------
2415    // Crash recovery: WAL events survive restart via Parquet checkpoint.
2416    // Regression test for GitHub issue #84 — flush_storage() was a no-op
2417    // during recovery because events were never buffered into Parquet's
2418    // current_batch before flushing.
2419    // -----------------------------------------------------------------------
2420
    #[test]
    fn test_wal_recovery_checkpoints_to_parquet() {
        let data_dir = TempDir::new().unwrap();
        let storage_dir = data_dir.path().join("storage");
        let wal_dir = data_dir.path().join("wal");

        // Session 1: ingest events with WAL + Parquet.
        // sync_on_write guarantees every ingest reaches the WAL file before
        // ingest() returns, so a "crash" (dropping the store) loses nothing.
        {
            let config = EventStoreConfig::production(
                &storage_dir,
                &wal_dir,
                SnapshotConfig::default(),
                WALConfig {
                    sync_on_write: true,
                    ..WALConfig::default()
                },
                CompactionConfig::default(),
            );
            let store = EventStore::with_config(config);

            for i in 0..5 {
                let event = Event::from_strings(
                    "test.created".to_string(),
                    format!("entity-{}", i),
                    "default".to_string(),
                    serde_json::json!({"index": i}),
                    None,
                )
                .unwrap();
                store.ingest(event).unwrap();
            }

            assert_eq!(store.stats().total_events, 5);

            // Do NOT call flush_storage or shutdown — simulate a crash.
            // Events are in WAL (sync_on_write: true) but NOT in Parquet.
        }

        // Verify WAL file has data (".log" extension is the WAL's format).
        let wal_files: Vec<_> = std::fs::read_dir(&wal_dir)
            .unwrap()
            .filter_map(|e| e.ok())
            .filter(|e| e.path().extension().is_some_and(|ext| ext == "log"))
            .collect();
        assert!(!wal_files.is_empty(), "WAL file should exist");
        let wal_size = wal_files[0].metadata().unwrap().len();
        assert!(wal_size > 0, "WAL file should have data (got 0 bytes)");

        // Session 2: reopen — recovery should checkpoint WAL to Parquet, then
        // truncate the WAL (the fix for issue #84 described above).
        {
            let config = EventStoreConfig::production(
                &storage_dir,
                &wal_dir,
                SnapshotConfig::default(),
                WALConfig {
                    sync_on_write: true,
                    ..WALConfig::default()
                },
                CompactionConfig::default(),
            );
            let store = EventStore::with_config(config);

            // Events should be recovered from the WAL written in session 1.
            assert_eq!(
                store.stats().total_events,
                5,
                "Session 2 should have all 5 events after WAL recovery"
            );

            // Parquet should now have files (checkpoint happened during open).
            let parquet_files: Vec<_> = std::fs::read_dir(&storage_dir)
                .unwrap()
                .filter_map(|e| e.ok())
                .filter(|e| e.path().extension().is_some_and(|ext| ext == "parquet"))
                .collect();
            assert!(
                !parquet_files.is_empty(),
                "Parquet file should exist after WAL checkpoint"
            );
        }

        // Session 3: reopen again — events should load from Parquet (the WAL
        // was truncated in session 2, so this proves the checkpoint persisted).
        {
            let config = EventStoreConfig::production(
                &storage_dir,
                &wal_dir,
                SnapshotConfig::default(),
                WALConfig {
                    sync_on_write: true,
                    ..WALConfig::default()
                },
                CompactionConfig::default(),
            );
            let store = EventStore::with_config(config);

            assert_eq!(
                store.stats().total_events,
                5,
                "Session 3 should still have all 5 events from Parquet"
            );
        }
    }
2523}