//! allsource_core/store.rs
//!
//! High-performance event store with columnar (Parquet) storage, WAL-based
//! durability, real-time projections, and optional server-only features
//! (WebSockets, webhooks, Prometheus metrics) behind the `server` feature.
1#[cfg(feature = "server")]
2use crate::application::services::webhook::WebhookRegistry;
3#[cfg(feature = "server")]
4use crate::infrastructure::observability::metrics::MetricsRegistry;
5#[cfg(feature = "server")]
6use crate::infrastructure::web::websocket::WebSocketManager;
7use crate::{
8    application::{
9        dto::QueryEventsRequest,
10        services::{
11            consumer::ConsumerRegistry,
12            exactly_once::{ExactlyOnceConfig, ExactlyOnceRegistry},
13            pipeline::PipelineManager,
14            projection::{EntitySnapshotProjection, EventCounterProjection, ProjectionManager},
15            replay::ReplayManager,
16            schema::{SchemaRegistry, SchemaRegistryConfig},
17            schema_evolution::SchemaEvolutionManager,
18        },
19    },
20    domain::entities::Event,
21    error::{AllSourceError, Result},
22    infrastructure::{
23        persistence::{
24            compaction::{CompactionConfig, CompactionManager},
25            index::{EventIndex, IndexEntry},
26            snapshot::{SnapshotConfig, SnapshotManager, SnapshotType},
27            storage::ParquetStorage,
28            wal::{WALConfig, WriteAheadLog},
29        },
30        query::geospatial::GeoIndex,
31    },
32};
33use chrono::{DateTime, Utc};
34use dashmap::DashMap;
35use parking_lot::RwLock;
36use std::{path::PathBuf, sync::Arc};
37#[cfg(feature = "server")]
38use tokio::sync::mpsc;
39
40/// High-performance event store with columnar storage
/// High-performance event store with columnar storage.
///
/// Owns the in-memory event log plus every subsystem that reacts to an
/// append: index, projections, optional Parquet persistence, WAL, metrics,
/// and real-time fan-out. All fields are `Arc`-wrapped so cheap handles can
/// be shared across threads/tasks.
pub struct EventStore {
    /// In-memory event storage; the Vec index of an event is its offset.
    events: Arc<RwLock<Vec<Event>>>,

    /// High-performance concurrent index (by id, entity, type, timestamp)
    index: Arc<EventIndex>,

    /// Projection manager for real-time aggregations.
    /// `pub(crate)` so sibling modules (e.g. replay) can drive it directly.
    pub(crate) projections: Arc<RwLock<ProjectionManager>>,

    /// Optional persistent storage (v0.2 feature); `None` when no
    /// `storage_dir` was configured or initialization failed.
    storage: Option<Arc<RwLock<ParquetStorage>>>,

    /// WebSocket manager for real-time event streaming (v0.2 feature)
    #[cfg(feature = "server")]
    websocket_manager: Arc<WebSocketManager>,

    /// Snapshot manager for fast state recovery (v0.2 feature)
    snapshot_manager: Arc<SnapshotManager>,

    /// Write-Ahead Log for durability (v0.2 feature); `None` when no
    /// `wal_dir` was configured or initialization failed.
    wal: Option<Arc<WriteAheadLog>>,

    /// Compaction manager for Parquet optimization (v0.2 feature);
    /// present only when Parquet storage is configured.
    compaction_manager: Option<Arc<CompactionManager>>,

    /// Schema registry for event validation (v0.5 feature)
    schema_registry: Arc<SchemaRegistry>,

    /// Replay manager for event replay and projection rebuilding (v0.5 feature)
    replay_manager: Arc<ReplayManager>,

    /// Pipeline manager for stream processing (v0.5 feature)
    pipeline_manager: Arc<PipelineManager>,

    /// Prometheus metrics registry (v0.6 feature)
    #[cfg(feature = "server")]
    metrics: Arc<MetricsRegistry>,

    /// Total events ingested (for metrics); kept in sync with `events.len()`
    /// after recovery and on every ingest path.
    total_ingested: Arc<RwLock<u64>>,

    /// Projection state cache for Query Service integration (v0.7 feature)
    /// Key format: "{projection_name}:{entity_id}"
    /// This DashMap provides O(1) access with ~11.9 μs latency
    projection_state_cache: Arc<DashMap<String, serde_json::Value>>,

    /// Projection status overrides (v0.13 feature)
    /// Tracks pause/start state per projection name: "running" or "paused"
    projection_status: Arc<DashMap<String, String>>,

    /// Webhook registry for outbound event delivery (v0.11 feature)
    #[cfg(feature = "server")]
    webhook_registry: Arc<WebhookRegistry>,

    /// Channel sender for async webhook delivery tasks; `None` until the
    /// delivery worker is wired up at server startup.
    #[cfg(feature = "server")]
    webhook_tx: Arc<RwLock<Option<mpsc::UnboundedSender<WebhookDeliveryTask>>>>,

    /// Geospatial index for coordinate-based queries (v2.0 feature)
    geo_index: Arc<GeoIndex>,

    /// Exactly-once processing registry (v2.0 feature)
    exactly_once: Arc<ExactlyOnceRegistry>,

    /// Autonomous schema evolution manager (v2.0 feature)
    schema_evolution: Arc<SchemaEvolutionManager>,

    /// Per-entity version counters for optimistic concurrency control (v0.14 feature)
    /// Key: entity_id string, Value: monotonic version (number of events for that entity)
    entity_versions: Arc<DashMap<String, u64>>,

    /// Durable consumer registry for subscription cursor tracking (v0.14 feature)
    consumer_registry: Arc<ConsumerRegistry>,
}
116
/// A task queued for async webhook delivery.
///
/// Produced on the ingest path when an event matches a subscription and
/// consumed by the delivery worker behind `webhook_tx`.
#[cfg(feature = "server")]
#[derive(Debug, Clone)]
pub struct WebhookDeliveryTask {
    /// The matched subscription (carries the delivery target/configuration).
    pub webhook: crate::application::services::webhook::WebhookSubscription,
    /// The event to deliver to the subscriber.
    pub event: Event,
}
124
125impl EventStore {
126    /// Create a new in-memory event store
127    pub fn new() -> Self {
128        Self::with_config(EventStoreConfig::default())
129    }
130
    /// Create event store with custom configuration.
    ///
    /// Construction also performs crash recovery, in two steps:
    /// 1. Load the durable baseline from Parquet (when `storage_dir` is set).
    /// 2. Replay WAL entries written after the last Parquet checkpoint,
    ///    skipping any already present, then checkpoint them to Parquet and
    ///    truncate the WAL.
    ///
    /// Subsystems that fail to initialize (Parquet, WAL) are logged and
    /// disabled rather than aborting construction.
    pub fn with_config(config: EventStoreConfig) -> Self {
        let mut projections = ProjectionManager::new();

        // Register built-in projections
        projections.register(Arc::new(EntitySnapshotProjection::new("entity_snapshots")));
        projections.register(Arc::new(EventCounterProjection::new("event_counters")));

        // Initialize persistent storage if configured; on failure we degrade
        // to memory-only operation instead of panicking
        let storage = config
            .storage_dir
            .as_ref()
            .and_then(|dir| match ParquetStorage::new(dir) {
                Ok(storage) => {
                    tracing::info!("✅ Parquet persistence enabled at: {}", dir.display());
                    Some(Arc::new(RwLock::new(storage)))
                }
                Err(e) => {
                    tracing::error!("❌ Failed to initialize Parquet storage: {}", e);
                    None
                }
            });

        // Initialize WAL if configured (v0.2 feature); same degrade-on-failure
        // policy as Parquet above
        let wal = config.wal_dir.as_ref().and_then(|dir| {
            match WriteAheadLog::new(dir, config.wal_config.clone()) {
                Ok(wal) => {
                    tracing::info!("✅ WAL enabled at: {}", dir.display());
                    Some(Arc::new(wal))
                }
                Err(e) => {
                    tracing::error!("❌ Failed to initialize WAL: {}", e);
                    None
                }
            }
        });

        // Initialize compaction manager if Parquet storage is enabled (v0.2 feature)
        let compaction_manager = config.storage_dir.as_ref().map(|dir| {
            let manager = CompactionManager::new(dir, config.compaction_config.clone());
            Arc::new(manager)
        });

        // Initialize schema registry (v0.5 feature)
        let schema_registry = Arc::new(SchemaRegistry::new(config.schema_registry_config.clone()));
        tracing::info!("✅ Schema registry enabled");

        // Initialize replay manager (v0.5 feature)
        let replay_manager = Arc::new(ReplayManager::new());
        tracing::info!("✅ Replay manager enabled");

        // Initialize pipeline manager (v0.5 feature)
        let pipeline_manager = Arc::new(PipelineManager::new());
        tracing::info!("✅ Pipeline manager enabled");

        // Initialize metrics registry (v0.6 feature, server builds only)
        #[cfg(feature = "server")]
        let metrics = {
            let m = MetricsRegistry::new();
            tracing::info!("✅ Prometheus metrics registry initialized");
            m
        };

        // Initialize projection state cache (v0.7 feature)
        let projection_state_cache = Arc::new(DashMap::new());
        tracing::info!("✅ Projection state cache initialized");

        // Initialize webhook registry (v0.11 feature)
        #[cfg(feature = "server")]
        let webhook_registry = {
            let w = Arc::new(WebhookRegistry::new());
            tracing::info!("✅ Webhook registry initialized");
            w
        };

        let store = Self {
            events: Arc::new(RwLock::new(Vec::new())),
            index: Arc::new(EventIndex::new()),
            projections: Arc::new(RwLock::new(projections)),
            storage,
            #[cfg(feature = "server")]
            websocket_manager: Arc::new(WebSocketManager::new()),
            snapshot_manager: Arc::new(SnapshotManager::new(config.snapshot_config)),
            wal,
            compaction_manager,
            schema_registry,
            replay_manager,
            pipeline_manager,
            #[cfg(feature = "server")]
            metrics,
            total_ingested: Arc::new(RwLock::new(0)),
            projection_state_cache,
            projection_status: Arc::new(DashMap::new()),
            #[cfg(feature = "server")]
            webhook_registry,
            #[cfg(feature = "server")]
            // Delivery worker is attached later; until then the sender is None.
            webhook_tx: Arc::new(RwLock::new(None)),
            geo_index: Arc::new(GeoIndex::new()),
            exactly_once: Arc::new(ExactlyOnceRegistry::new(ExactlyOnceConfig::default())),
            schema_evolution: Arc::new(SchemaEvolutionManager::new()),
            entity_versions: Arc::new(DashMap::new()),
            consumer_registry: Arc::new(ConsumerRegistry::new()),
        };

        // Step 1: Load persisted events from Parquet (the durable baseline)
        if let Some(ref storage) = store.storage {
            match storage.read().load_all_events() {
                Err(e) => {
                    tracing::error!(
                        "❌ Failed to load events from Parquet — data exists on disk but \
                         could not be read. This means events are NOT available until the \
                         issue is resolved. Error: {e}"
                    );
                }
                Ok(persisted_events) if persisted_events.is_empty() => {
                    tracing::info!("📂 No persisted events found in Parquet storage");
                }
                Ok(persisted_events) => {
                    tracing::info!("📂 Loading {} persisted events...", persisted_events.len());

                    for event in persisted_events {
                        // Offset is the event's position in the in-memory log.
                        let offset = store.events.read().len();
                        if let Err(e) = store.index.index_event(
                            event.id,
                            event.entity_id_str(),
                            event.event_type_str(),
                            event.timestamp,
                            offset,
                        ) {
                            tracing::error!("Failed to re-index event {}: {}", event.id, e);
                        }

                        if let Err(e) = store.projections.read().process_event(&event) {
                            tracing::error!("Failed to re-process event {}: {}", event.id, e);
                        }

                        // Track per-entity version
                        *store
                            .entity_versions
                            .entry(event.entity_id_str().to_string())
                            .or_insert(0) += 1;

                        store.events.write().push(event);
                    }

                    let total = store.events.read().len();
                    *store.total_ingested.write() = total as u64;
                    tracing::info!("✅ Successfully loaded {} events from storage", total);
                }
            }
        }

        // Step 2: Recover WAL events (written after last Parquet checkpoint)
        if let Some(ref wal) = store.wal {
            match wal.recover() {
                Ok(recovered_events) if !recovered_events.is_empty() => {
                    // Collect IDs already loaded from Parquet to skip duplicates
                    let existing_ids: std::collections::HashSet<uuid::Uuid> =
                        store.events.read().iter().map(|e| e.id).collect();

                    let mut wal_new = 0usize;
                    for event in recovered_events {
                        if existing_ids.contains(&event.id) {
                            continue; // already loaded from Parquet
                        }

                        let offset = store.events.read().len();
                        if let Err(e) = store.index.index_event(
                            event.id,
                            event.entity_id_str(),
                            event.event_type_str(),
                            event.timestamp,
                            offset,
                        ) {
                            tracing::error!("Failed to re-index WAL event {}: {}", event.id, e);
                        }

                        if let Err(e) = store.projections.read().process_event(&event) {
                            tracing::error!("Failed to re-process WAL event {}: {}", event.id, e);
                        }

                        // Track per-entity version
                        *store
                            .entity_versions
                            .entry(event.entity_id_str().to_string())
                            .or_insert(0) += 1;

                        store.events.write().push(event);
                        wal_new += 1;
                    }

                    if wal_new > 0 {
                        let total = store.events.read().len();
                        *store.total_ingested.write() = total as u64;
                        tracing::info!(
                            "✅ Recovered {} new events from WAL ({} total)",
                            wal_new,
                            total
                        );

                        // Checkpoint WAL events to Parquet — buffer them into
                        // the Parquet batch first, then flush. Without this,
                        // flush_storage() finds an empty current_batch and
                        // silently no-ops, then we truncate the WAL and the
                        // events exist only in memory (lost on next restart).
                        if let Some(ref storage) = store.storage {
                            tracing::info!(
                                "📸 Checkpointing {} WAL events to Parquet storage...",
                                wal_new
                            );
                            let parquet = storage.read();
                            let events = store.events.read();
                            let mut buffered = 0usize;
                            // The WAL-recovered events are exactly the last
                            // `wal_new` entries of the log (just pushed above).
                            for event in events.iter().skip(events.len() - wal_new) {
                                if let Err(e) = parquet.append_event(event.clone()) {
                                    tracing::error!(
                                        "Failed to buffer WAL event for Parquet: {}",
                                        e
                                    );
                                } else {
                                    buffered += 1;
                                }
                            }
                            // Release both guards before flushing: flush_storage()
                            // re-acquires the storage lock itself.
                            drop(events);
                            drop(parquet);

                            if buffered > 0 {
                                // Only truncate the WAL once the flush succeeded,
                                // otherwise the WAL remains the sole durable copy.
                                if let Err(e) = store.flush_storage() {
                                    tracing::error!("Failed to checkpoint to Parquet: {}", e);
                                } else if let Err(e) = wal.truncate() {
                                    tracing::error!(
                                        "Failed to truncate WAL after checkpoint: {}",
                                        e
                                    );
                                } else {
                                    tracing::info!(
                                        "✅ WAL checkpointed and truncated ({} events)",
                                        buffered
                                    );
                                }
                            }
                        }
                    }
                }
                Ok(_) => {
                    tracing::debug!("No events to recover from WAL");
                }
                Err(e) => {
                    tracing::error!("❌ WAL recovery failed: {}", e);
                }
            }
        }

        store
    }
386
387    /// Ingest a new event with optional optimistic concurrency check.
388    ///
389    /// If `expected_version` is `Some(v)`, the write is rejected with
390    /// `VersionConflict` unless the entity's current version equals `v`.
391    /// The version check and WAL append are atomic (locked together).
392    ///
393    /// Returns the new entity version after the append.
394    #[cfg_attr(feature = "hotpath", hotpath::measure)]
395    pub fn ingest_with_expected_version(
396        &self,
397        event: &Event,
398        expected_version: Option<u64>,
399    ) -> Result<u64> {
400        // Validate event first (before any locking)
401        self.validate_event(event)?;
402
403        let entity_id = event.entity_id_str().to_string();
404
405        // Atomic version check + append: hold the DashMap entry lock
406        // to prevent TOCTOU races between check and write.
407        let new_version = {
408            let mut version_entry = self.entity_versions.entry(entity_id.clone()).or_insert(0);
409            let current = *version_entry;
410
411            if let Some(expected) = expected_version
412                && current != expected
413            {
414                return Err(crate::error::AllSourceError::VersionConflict { expected, current });
415            }
416
417            // Write to WAL FIRST for durability (under version lock to keep atomicity)
418            if let Some(ref wal) = self.wal {
419                wal.append(event.clone())?;
420            }
421
422            *version_entry += 1;
423            *version_entry
424        };
425
426        // From here on, the event is durable (WAL) and version is bumped.
427        // Continue with indexing, projections, storage, and broadcast.
428        self.ingest_post_wal(event)?;
429
430        Ok(new_version)
431    }
432
    /// Post-WAL ingestion: index, projections, storage, broadcast.
    /// Called after WAL append and version bump are complete, so every
    /// failure here happens on an event that is already durable.
    #[cfg_attr(feature = "hotpath", hotpath::measure)]
    fn ingest_post_wal(&self, event: &Event) -> Result<()> {
        // Timer observes total post-WAL ingestion latency (server builds only).
        #[cfg(feature = "server")]
        let timer = self.metrics.ingestion_duration_seconds.start_timer();

        // Hold the write lock across index + projections + push so the
        // event's offset cannot be taken by a concurrent writer.
        let mut events = self.events.write();
        let offset = events.len();

        // Index the event at its append offset
        self.index.index_event(
            event.id,
            event.entity_id_str(),
            event.event_type_str(),
            event.timestamp,
            offset,
        )?;

        // Process through projections (read guard released immediately after)
        let projections = self.projections.read();
        projections.process_event(event)?;
        drop(projections);

        // Process through pipelines; results are log-only here
        let pipeline_results = self.pipeline_manager.process_event(event);
        if !pipeline_results.is_empty() {
            tracing::debug!(
                "Event {} processed by {} pipeline(s)",
                event.id,
                pipeline_results.len()
            );
            for (pipeline_id, result) in pipeline_results {
                tracing::trace!("Pipeline {} result: {:?}", pipeline_id, result);
            }
        }

        // Persist to Parquet storage if enabled (buffers into current batch)
        if let Some(ref storage) = self.storage {
            let storage = storage.read();
            storage.append_event(event.clone())?;
        }

        // Store the event in memory, then release the write lock before the
        // broadcast/metrics work below
        events.push(event.clone());
        let total_events = events.len();
        drop(events);

        // Broadcast to WebSocket clients
        #[cfg(feature = "server")]
        self.websocket_manager
            .broadcast_event(Arc::new(event.clone()));

        // Dispatch to matching webhook subscriptions
        #[cfg(feature = "server")]
        self.dispatch_webhooks(event);

        // Update geospatial index
        self.geo_index.index_event(event);

        // Autonomous schema evolution
        self.schema_evolution
            .analyze_event(event.event_type_str(), &event.payload);

        // Check if automatic snapshot should be created
        self.check_auto_snapshot(event.entity_id_str(), event);

        // Update metrics
        #[cfg(feature = "server")]
        {
            self.metrics.events_ingested_total.inc();
            self.metrics
                .events_ingested_by_type
                .with_label_values(&[event.event_type_str()])
                .inc();
            self.metrics.storage_events_total.set(total_events as i64);
        }

        // Update legacy total counter
        let mut total = self.total_ingested.write();
        *total += 1;

        #[cfg(feature = "server")]
        timer.observe_duration();

        tracing::debug!("Event ingested: {} (offset: {})", event.id, offset);

        Ok(())
    }
522
    /// Ingest a new event into the store.
    ///
    /// Order of operations: validate → WAL append → version bump → index /
    /// projections / pipelines / storage → in-memory push → broadcast /
    /// webhooks / geo / schema-evolution / snapshot check → metrics.
    ///
    /// NOTE(review): this path largely duplicates `ingest_post_wal()`; keep
    /// the two in sync when changing ingestion behavior.
    #[cfg_attr(feature = "hotpath", hotpath::measure)]
    pub fn ingest(&self, event: &Event) -> Result<()> {
        // Start metrics timer (v0.6 feature)
        #[cfg(feature = "server")]
        let timer = self.metrics.ingestion_duration_seconds.start_timer();

        // Validate event; failures are counted before returning
        let validation_result = self.validate_event(event);
        if let Err(e) = validation_result {
            #[cfg(feature = "server")]
            {
                self.metrics.ingestion_errors_total.inc();
                timer.observe_duration();
            }
            return Err(e);
        }

        // Write to WAL FIRST for durability (v0.2 feature)
        // This ensures event is persisted before processing
        if let Some(ref wal) = self.wal
            && let Err(e) = wal.append(event.clone())
        {
            #[cfg(feature = "server")]
            {
                self.metrics.ingestion_errors_total.inc();
                timer.observe_duration();
            }
            return Err(e);
        }

        // Track per-entity version (unconditional increment, no version check;
        // callers needing optimistic concurrency use ingest_with_expected_version)
        *self
            .entity_versions
            .entry(event.entity_id_str().to_string())
            .or_insert(0) += 1;

        // Hold the write lock across index + projections + push so the
        // event's offset cannot be taken by a concurrent writer
        let mut events = self.events.write();
        let offset = events.len();

        // Index the event
        self.index.index_event(
            event.id,
            event.entity_id_str(),
            event.event_type_str(),
            event.timestamp,
            offset,
        )?;

        // Process through projections
        let projections = self.projections.read();
        projections.process_event(event)?;
        drop(projections); // Release lock

        // Process through pipelines (v0.5 feature)
        // Pipelines can transform, filter, and aggregate events in real-time
        let pipeline_results = self.pipeline_manager.process_event(event);
        if !pipeline_results.is_empty() {
            tracing::debug!(
                "Event {} processed by {} pipeline(s)",
                event.id,
                pipeline_results.len()
            );
            // Pipeline results could be stored, emitted, or forwarded elsewhere
            // For now, we just log them for observability
            for (pipeline_id, result) in pipeline_results {
                tracing::trace!("Pipeline {} result: {:?}", pipeline_id, result);
            }
        }

        // Persist to Parquet storage if enabled (v0.2)
        if let Some(ref storage) = self.storage {
            let storage = storage.read();
            storage.append_event(event.clone())?;
        }

        // Store the event in memory
        events.push(event.clone());
        let total_events = events.len();
        drop(events); // Release lock early

        // Broadcast to WebSocket clients (v0.2 feature)
        #[cfg(feature = "server")]
        self.websocket_manager
            .broadcast_event(Arc::new(event.clone()));

        // Dispatch to matching webhook subscriptions (v0.11 feature)
        #[cfg(feature = "server")]
        self.dispatch_webhooks(event);

        // Update geospatial index (v2.0 feature)
        self.geo_index.index_event(event);

        // Autonomous schema evolution (v2.0 feature)
        self.schema_evolution
            .analyze_event(event.event_type_str(), &event.payload);

        // Check if automatic snapshot should be created (v0.2 feature)
        self.check_auto_snapshot(event.entity_id_str(), event);

        // Update metrics (v0.6 feature)
        #[cfg(feature = "server")]
        {
            self.metrics.events_ingested_total.inc();
            self.metrics
                .events_ingested_by_type
                .with_label_values(&[event.event_type_str()])
                .inc();
            self.metrics.storage_events_total.set(total_events as i64);
        }

        // Update legacy total counter
        let mut total = self.total_ingested.write();
        *total += 1;

        #[cfg(feature = "server")]
        timer.observe_duration();

        tracing::debug!("Event ingested: {} (offset: {})", event.id, offset);

        Ok(())
    }
645
646    /// Ingest a batch of events with a single write lock acquisition.
647    ///
648    /// All events are validated first. If any event fails validation, no
649    /// events are stored (all-or-nothing validation). Events are then written
650    /// to WAL, indexed, processed through projections, and pushed to the
651    /// events vector under a single write lock.
652    #[cfg_attr(feature = "hotpath", hotpath::measure)]
653    pub fn ingest_batch(&self, batch: Vec<Event>) -> Result<()> {
654        if batch.is_empty() {
655            return Ok(());
656        }
657
658        // Phase 1: Validate all events before acquiring any locks
659        for event in &batch {
660            self.validate_event(event)?;
661        }
662
663        // Phase 2: Write all events to WAL (before write lock, for durability)
664        if let Some(ref wal) = self.wal {
665            for event in &batch {
666                wal.append(event.clone())?;
667            }
668        }
669
670        // Phase 3: Single write lock for index + projections + push
671        let mut events = self.events.write();
672        let projections = self.projections.read();
673
674        for event in batch {
675            let offset = events.len();
676
677            self.index.index_event(
678                event.id,
679                event.entity_id_str(),
680                event.event_type_str(),
681                event.timestamp,
682                offset,
683            )?;
684
685            projections.process_event(&event)?;
686            self.pipeline_manager.process_event(&event);
687
688            if let Some(ref storage) = self.storage {
689                let storage = storage.read();
690                storage.append_event(event.clone())?;
691            }
692
693            self.geo_index.index_event(&event);
694            self.schema_evolution
695                .analyze_event(event.event_type_str(), &event.payload);
696
697            // Track per-entity version
698            *self
699                .entity_versions
700                .entry(event.entity_id_str().to_string())
701                .or_insert(0) += 1;
702
703            events.push(event);
704        }
705
706        let total_events = events.len();
707        drop(projections);
708        drop(events);
709
710        let mut total = self.total_ingested.write();
711        *total += total_events as u64;
712
713        Ok(())
714    }
715
716    /// Ingest a replicated event from the leader (follower mode).
717    ///
718    /// Unlike `ingest()`, this method:
719    /// - Skips WAL writing (the follower's WalReceiver manages its own local WAL)
720    /// - Skips schema validation (the leader already validated)
721    /// - Still indexes, processes projections/pipelines, and broadcasts to WebSocket clients
722    #[cfg_attr(feature = "hotpath", hotpath::measure)]
723    pub fn ingest_replicated(&self, event: &Event) -> Result<()> {
724        #[cfg(feature = "server")]
725        let timer = self.metrics.ingestion_duration_seconds.start_timer();
726
727        let mut events = self.events.write();
728        let offset = events.len();
729
730        // Index the event
731        self.index.index_event(
732            event.id,
733            event.entity_id_str(),
734            event.event_type_str(),
735            event.timestamp,
736            offset,
737        )?;
738
739        // Process through projections
740        let projections = self.projections.read();
741        projections.process_event(event)?;
742        drop(projections);
743
744        // Process through pipelines
745        let pipeline_results = self.pipeline_manager.process_event(event);
746        if !pipeline_results.is_empty() {
747            tracing::debug!(
748                "Replicated event {} processed by {} pipeline(s)",
749                event.id,
750                pipeline_results.len()
751            );
752        }
753
754        // Track per-entity version
755        *self
756            .entity_versions
757            .entry(event.entity_id_str().to_string())
758            .or_insert(0) += 1;
759
760        // Store the event in memory
761        events.push(event.clone());
762        let total_events = events.len();
763        drop(events);
764
765        // Broadcast to WebSocket clients
766        #[cfg(feature = "server")]
767        self.websocket_manager
768            .broadcast_event(Arc::new(event.clone()));
769
770        // Update metrics
771        #[cfg(feature = "server")]
772        {
773            self.metrics.events_ingested_total.inc();
774            self.metrics
775                .events_ingested_by_type
776                .with_label_values(&[event.event_type_str()])
777                .inc();
778            self.metrics.storage_events_total.set(total_events as i64);
779        }
780
781        let mut total = self.total_ingested.write();
782        *total += 1;
783
784        #[cfg(feature = "server")]
785        timer.observe_duration();
786
787        tracing::debug!(
788            "Replicated event ingested: {} (offset: {})",
789            event.id,
790            offset
791        );
792
793        Ok(())
794    }
795
796    /// Get the current version for an entity (number of events appended for it).
797    /// Returns 0 if the entity has no events.
798    #[cfg_attr(feature = "hotpath", hotpath::measure)]
799    pub fn get_entity_version(&self, entity_id: &str) -> u64 {
800        self.entity_versions.get(entity_id).map_or(0, |v| *v)
801    }
802
803    /// Get the consumer registry for durable subscriptions.
804    pub fn consumer_registry(&self) -> &ConsumerRegistry {
805        &self.consumer_registry
806    }
807
808    /// Replace the default in-memory consumer registry with a durable one.
809    ///
810    /// Called during startup when system repositories are available, so that
811    /// consumer cursors survive Core restarts via WAL persistence.
812    pub fn set_consumer_registry(&mut self, registry: Arc<ConsumerRegistry>) {
813        self.consumer_registry = registry;
814    }
815
816    /// Get the total number of events in the store (used as max offset for consumer ack).
817    pub fn total_events(&self) -> usize {
818        self.events.read().len()
819    }
820
821    /// Get events after a given offset, optionally filtered by event type prefixes.
822    /// Used by consumer polling to fetch unprocessed events.
823    pub fn events_after_offset(
824        &self,
825        offset: u64,
826        filters: &[String],
827        limit: usize,
828    ) -> Vec<(u64, Event)> {
829        let events = self.events.read();
830        let start = offset as usize;
831        if start >= events.len() {
832            return vec![];
833        }
834
835        events[start..]
836            .iter()
837            .enumerate()
838            .filter(|(_, event)| ConsumerRegistry::matches_filters(event.event_type_str(), filters))
839            .take(limit)
840            .map(|(i, event)| ((start + i + 1) as u64, event.clone()))
841            .collect()
842    }
843
844    /// Get the WebSocket manager for this store
845    #[cfg(feature = "server")]
846    pub fn websocket_manager(&self) -> Arc<WebSocketManager> {
847        Arc::clone(&self.websocket_manager)
848    }
849
850    /// Get the snapshot manager for this store
851    pub fn snapshot_manager(&self) -> Arc<SnapshotManager> {
852        Arc::clone(&self.snapshot_manager)
853    }
854
855    /// Get the compaction manager for this store
856    pub fn compaction_manager(&self) -> Option<Arc<CompactionManager>> {
857        self.compaction_manager.as_ref().map(Arc::clone)
858    }
859
860    /// Get the schema registry for this store (v0.5 feature)
861    pub fn schema_registry(&self) -> Arc<SchemaRegistry> {
862        Arc::clone(&self.schema_registry)
863    }
864
865    /// Get the replay manager for this store (v0.5 feature)
866    pub fn replay_manager(&self) -> Arc<ReplayManager> {
867        Arc::clone(&self.replay_manager)
868    }
869
870    /// Get the pipeline manager for this store (v0.5 feature)
871    pub fn pipeline_manager(&self) -> Arc<PipelineManager> {
872        Arc::clone(&self.pipeline_manager)
873    }
874
875    /// Get the metrics registry for this store (v0.6 feature)
876    #[cfg(feature = "server")]
877    pub fn metrics(&self) -> Arc<MetricsRegistry> {
878        Arc::clone(&self.metrics)
879    }
880
881    /// Get the projection manager for this store (v0.7 feature)
882    pub fn projection_manager(&self) -> parking_lot::RwLockReadGuard<'_, ProjectionManager> {
883        self.projections.read()
884    }
885
886    /// Register a custom projection at runtime.
887    ///
888    /// The projection will receive all future events via `process()`.
889    /// Historical events are **not** replayed — only events ingested after
890    /// registration will be processed by this projection.
891    ///
892    /// See [`register_projection_with_backfill`](Self::register_projection_with_backfill)
893    /// to also process historical events.
894    pub fn register_projection(
895        &self,
896        projection: Arc<dyn crate::application::services::projection::Projection>,
897    ) {
898        let mut pm = self.projections.write();
899        pm.register(projection);
900    }
901
902    /// Register a custom projection and replay all existing events through it.
903    ///
904    /// After registration, the projection will also receive all future events.
905    /// Historical events are replayed under a read lock — the projection's
906    /// internal state (typically DashMap) handles concurrent access.
907    pub fn register_projection_with_backfill(
908        &self,
909        projection: &Arc<dyn crate::application::services::projection::Projection>,
910    ) -> Result<()> {
911        // First register so future events are processed
912        {
913            let mut pm = self.projections.write();
914            pm.register(Arc::clone(projection));
915        }
916
917        // Then replay existing events under read lock
918        let events = self.events.read();
919        for event in events.iter() {
920            projection.process(event)?;
921        }
922
923        Ok(())
924    }
925
926    /// Get the projection state cache for this store (v0.7 feature)
927    /// Used by Elixir Query Service for state synchronization
928    pub fn projection_state_cache(&self) -> Arc<DashMap<String, serde_json::Value>> {
929        Arc::clone(&self.projection_state_cache)
930    }
931
932    /// Get the projection status map (v0.13 feature)
933    pub fn projection_status(&self) -> Arc<DashMap<String, String>> {
934        Arc::clone(&self.projection_status)
935    }
936
937    /// Get the webhook registry for this store (v0.11 feature)
938    /// Geospatial index for coordinate-based queries (v2.0 feature)
939    pub fn geo_index(&self) -> Arc<GeoIndex> {
940        self.geo_index.clone()
941    }
942
943    /// Exactly-once processing registry (v2.0 feature)
944    pub fn exactly_once(&self) -> Arc<ExactlyOnceRegistry> {
945        self.exactly_once.clone()
946    }
947
948    /// Schema evolution manager (v2.0 feature)
949    pub fn schema_evolution(&self) -> Arc<SchemaEvolutionManager> {
950        self.schema_evolution.clone()
951    }
952
953    /// Get a read-locked snapshot of all events (for EventQL/GraphQL queries).
954    ///
955    /// Returns an `Arc` reference to the internal events vec, avoiding a full
956    /// clone. The caller holds a read lock for the duration of the `Arc`
957    /// lifetime — prefer short-lived usage.
958    pub fn snapshot_events(&self) -> Vec<Event> {
959        self.events.read().clone()
960    }
961
    /// Compact token events for an entity by replacing matching events with a
    /// single merged event. Used by the embedded streaming feature.
    ///
    /// Returns `Ok(true)` if compaction was performed, `Ok(false)` if no
    /// matching events were found.
    ///
    /// **Note:** The merged event is processed through projections *without*
    /// clearing the removed events' projection state first. Projections that
    /// accumulate state (e.g., counters) should be designed to handle this
    /// (the merged event replaces individual tokens, not adds to them).
    ///
    /// **Crash safety:** The WAL append happens *after* the in-memory swap
    /// under the write lock. If the process crashes before the WAL write,
    /// no change is persisted — WAL replay restores the pre-compaction state.
    /// If the process crashes after the WAL write, replay sees the merged
    /// event (and the original tokens, which are idempotent to replay since
    /// the merged event supersedes them).
    ///
    /// The write lock is held for the swap + WAL write + index rebuild.
    /// The index rebuild is O(N) over all events, which is acceptable for
    /// embedded workloads but should not be called in hot paths for large stores.
    pub fn compact_entity_tokens(
        &self,
        entity_id: &str,
        token_event_type: &str,
        merged_event: Event,
    ) -> Result<bool> {
        // Phase 1: Read-only check — do we have anything to compact?
        // NOTE(review): check-then-act window — a matching token appended
        // between this check and Phase 3 would be removed by the retain()
        // below without being represented in `merged_event`. Presumably
        // callers serialize compaction with ingestion per entity — confirm.
        {
            let events = self.events.read();
            let has_tokens = events
                .iter()
                .any(|e| e.entity_id_str() == entity_id && e.event_type_str() == token_event_type);
            if !has_tokens {
                return Ok(false);
            }
        }

        // Phase 2: Process merged event through projections (no write lock held)
        let projections = self.projections.read();
        projections.process_event(&merged_event)?;
        drop(projections);

        // Phase 3: Acquire write lock for the swap + WAL + index rebuild
        let mut events = self.events.write();

        // Drop every token event for this entity; the merged event replaces them.
        events.retain(|e| {
            !(e.entity_id_str() == entity_id && e.event_type_str() == token_event_type)
        });

        events.push(merged_event.clone());

        // WAL append inside write lock: crash before this line = no change persisted.
        // Crash after = merged event in WAL, original tokens also in WAL but
        // superseded by the merged event's entity_id + event_type.
        if let Some(ref wal) = self.wal {
            wal.append(merged_event)?;
        }

        // Rebuild entire index since retain() shifted event positions.
        // Errors here indicate a corrupt event (missing entity_id/event_type)
        // which should not happen for well-formed events. Log and continue
        // rather than failing the entire compaction.
        self.index.clear();
        for (offset, event) in events.iter().enumerate() {
            if let Err(e) = self.index.index_event(
                event.id,
                event.entity_id_str(),
                event.event_type_str(),
                event.timestamp,
                offset,
            ) {
                tracing::warn!(
                    event_id = %event.id,
                    offset,
                    "Failed to re-index event during compaction: {e}"
                );
            }
        }

        Ok(true)
    }
1044
1045    #[cfg(feature = "server")]
1046    pub fn webhook_registry(&self) -> Arc<WebhookRegistry> {
1047        Arc::clone(&self.webhook_registry)
1048    }
1049
1050    /// Set the channel for async webhook delivery.
1051    /// Called during server startup to wire the delivery worker.
1052    #[cfg(feature = "server")]
1053    pub fn set_webhook_tx(&self, tx: mpsc::UnboundedSender<WebhookDeliveryTask>) {
1054        *self.webhook_tx.write() = Some(tx);
1055        tracing::info!("Webhook delivery channel connected");
1056    }
1057
1058    /// Dispatch matching webhooks for a given event (non-blocking).
1059    #[cfg(feature = "server")]
1060    fn dispatch_webhooks(&self, event: &Event) {
1061        let matching = self.webhook_registry.find_matching(event);
1062        if matching.is_empty() {
1063            return;
1064        }
1065
1066        let tx_guard = self.webhook_tx.read();
1067        if let Some(ref tx) = *tx_guard {
1068            for webhook in matching {
1069                let task = WebhookDeliveryTask {
1070                    webhook,
1071                    event: event.clone(),
1072                };
1073                if let Err(e) = tx.send(task) {
1074                    tracing::warn!("Failed to queue webhook delivery: {}", e);
1075                }
1076            }
1077        }
1078    }
1079
1080    /// Manually flush any pending events to persistent storage
1081    pub fn flush_storage(&self) -> Result<()> {
1082        if let Some(ref storage) = self.storage {
1083            let storage = storage.read();
1084            storage.flush()?;
1085            tracing::info!("✅ Flushed events to persistent storage");
1086        }
1087        Ok(())
1088    }
1089
1090    /// Manually create a snapshot for an entity
1091    pub fn create_snapshot(&self, entity_id: &str) -> Result<()> {
1092        // Get all events for this entity
1093        let events = self.query(&QueryEventsRequest {
1094            entity_id: Some(entity_id.to_string()),
1095            event_type: None,
1096            tenant_id: None,
1097            as_of: None,
1098            since: None,
1099            until: None,
1100            limit: None,
1101            event_type_prefix: None,
1102            payload_filter: None,
1103        })?;
1104
1105        if events.is_empty() {
1106            return Err(AllSourceError::EntityNotFound(entity_id.to_string()));
1107        }
1108
1109        // Build current state
1110        let mut state = serde_json::json!({});
1111        for event in &events {
1112            if let serde_json::Value::Object(ref mut state_map) = state
1113                && let serde_json::Value::Object(ref payload_map) = event.payload
1114            {
1115                for (key, value) in payload_map {
1116                    state_map.insert(key.clone(), value.clone());
1117                }
1118            }
1119        }
1120
1121        let last_event = events.last().unwrap();
1122        self.snapshot_manager.create_snapshot(
1123            entity_id,
1124            state,
1125            last_event.timestamp,
1126            events.len(),
1127            SnapshotType::Manual,
1128        )?;
1129
1130        Ok(())
1131    }
1132
1133    /// Check and create automatic snapshots if needed
1134    fn check_auto_snapshot(&self, entity_id: &str, event: &Event) {
1135        // Count events for this entity
1136        let entity_event_count = self
1137            .index
1138            .get_by_entity(entity_id)
1139            .map_or(0, |entries| entries.len());
1140
1141        if self.snapshot_manager.should_create_snapshot(
1142            entity_id,
1143            entity_event_count,
1144            event.timestamp,
1145        ) {
1146            // Create snapshot in background (don't block ingestion)
1147            if let Err(e) = self.create_snapshot(entity_id) {
1148                tracing::warn!(
1149                    "Failed to create automatic snapshot for {}: {}",
1150                    entity_id,
1151                    e
1152                );
1153            }
1154        }
1155    }
1156
1157    /// Validate an event before ingestion
1158    fn validate_event(&self, event: &Event) -> Result<()> {
1159        // EntityId and EventType value objects already validate non-empty in their constructors
1160        // So these checks are now redundant, but we keep them for explicit validation
1161        if event.entity_id_str().is_empty() {
1162            return Err(AllSourceError::ValidationError(
1163                "entity_id cannot be empty".to_string(),
1164            ));
1165        }
1166
1167        if event.event_type_str().is_empty() {
1168            return Err(AllSourceError::ValidationError(
1169                "event_type cannot be empty".to_string(),
1170            ));
1171        }
1172
1173        // Reject system namespace events from user-facing ingestion.
1174        // System events are written exclusively via SystemMetadataStore.
1175        if event.event_type().is_system() {
1176            return Err(AllSourceError::ValidationError(
1177                "Event types starting with '_system.' are reserved for internal use".to_string(),
1178            ));
1179        }
1180
1181        Ok(())
1182    }
1183
1184    /// Reset a projection by clearing its state and reprocessing all events
1185    pub fn reset_projection(&self, name: &str) -> Result<usize> {
1186        let projection_manager = self.projections.read();
1187        let projection = projection_manager.get_projection(name).ok_or_else(|| {
1188            AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1189        })?;
1190
1191        // Clear existing state
1192        projection.clear();
1193
1194        // Clear cached state for this projection
1195        let prefix = format!("{name}:");
1196        let keys_to_remove: Vec<String> = self
1197            .projection_state_cache
1198            .iter()
1199            .filter(|entry| entry.key().starts_with(&prefix))
1200            .map(|entry| entry.key().clone())
1201            .collect();
1202        for key in keys_to_remove {
1203            self.projection_state_cache.remove(&key);
1204        }
1205
1206        // Reprocess all events through this projection
1207        let events = self.events.read();
1208        let mut reprocessed = 0usize;
1209        for event in events.iter() {
1210            if projection.process(event).is_ok() {
1211                reprocessed += 1;
1212            }
1213        }
1214
1215        Ok(reprocessed)
1216    }
1217
1218    /// Get a single event by its UUID
1219    pub fn get_event_by_id(&self, event_id: &uuid::Uuid) -> Result<Option<Event>> {
1220        if let Some(offset) = self.index.get_by_id(event_id) {
1221            let events = self.events.read();
1222            Ok(events.get(offset).cloned())
1223        } else {
1224            Ok(None)
1225        }
1226    }
1227
1228    /// Query events based on filters (optimized with indices)
1229    #[cfg_attr(feature = "hotpath", hotpath::measure)]
1230    pub fn query(&self, request: &QueryEventsRequest) -> Result<Vec<Event>> {
1231        // Determine query type for metrics (v0.6 feature)
1232        let query_type = if request.entity_id.is_some() {
1233            "entity"
1234        } else if request.event_type.is_some() {
1235            "type"
1236        } else if request.event_type_prefix.is_some() {
1237            "type_prefix"
1238        } else {
1239            "full_scan"
1240        };
1241
1242        // Start metrics timer (v0.6 feature)
1243        #[cfg(feature = "server")]
1244        let timer = self
1245            .metrics
1246            .query_duration_seconds
1247            .with_label_values(&[query_type])
1248            .start_timer();
1249
1250        // Increment query counter (v0.6 feature)
1251        #[cfg(feature = "server")]
1252        self.metrics
1253            .queries_total
1254            .with_label_values(&[query_type])
1255            .inc();
1256
1257        let events = self.events.read();
1258
1259        // Use index for fast lookups
1260        let offsets: Vec<usize> = if let Some(entity_id) = &request.entity_id {
1261            // Use entity index
1262            self.index
1263                .get_by_entity(entity_id)
1264                .map(|entries| self.filter_entries(entries, request))
1265                .unwrap_or_default()
1266        } else if let Some(event_type) = &request.event_type {
1267            // Use type index (exact match)
1268            self.index
1269                .get_by_type(event_type)
1270                .map(|entries| self.filter_entries(entries, request))
1271                .unwrap_or_default()
1272        } else if let Some(prefix) = &request.event_type_prefix {
1273            // Use type index (prefix match)
1274            let entries = self.index.get_by_type_prefix(prefix);
1275            self.filter_entries(entries, request)
1276        } else {
1277            // Full scan (less efficient but necessary for complex queries)
1278            (0..events.len()).collect()
1279        };
1280
1281        // Fetch events and apply remaining filters
1282        let mut results: Vec<Event> = offsets
1283            .iter()
1284            .filter_map(|&offset| events.get(offset).cloned())
1285            .filter(|event| self.apply_filters(event, request))
1286            .collect();
1287
1288        // Sort by timestamp (ascending)
1289        results.sort_by_key(|x| x.timestamp);
1290
1291        // Apply limit
1292        if let Some(limit) = request.limit {
1293            results.truncate(limit);
1294        }
1295
1296        // Record query results count (v0.6 feature)
1297        #[cfg(feature = "server")]
1298        {
1299            self.metrics
1300                .query_results_total
1301                .with_label_values(&[query_type])
1302                .inc_by(results.len() as u64);
1303            timer.observe_duration();
1304        }
1305
1306        Ok(results)
1307    }
1308
1309    /// Filter index entries based on query parameters
1310    #[cfg_attr(feature = "hotpath", hotpath::measure)]
1311    fn filter_entries(&self, entries: Vec<IndexEntry>, request: &QueryEventsRequest) -> Vec<usize> {
1312        entries
1313            .into_iter()
1314            .filter(|entry| {
1315                // Time filters
1316                if let Some(as_of) = request.as_of
1317                    && entry.timestamp > as_of
1318                {
1319                    return false;
1320                }
1321                if let Some(since) = request.since
1322                    && entry.timestamp < since
1323                {
1324                    return false;
1325                }
1326                if let Some(until) = request.until
1327                    && entry.timestamp > until
1328                {
1329                    return false;
1330                }
1331                true
1332            })
1333            .map(|entry| entry.offset)
1334            .collect()
1335    }
1336
1337    /// Apply filters to an event
1338    #[cfg_attr(feature = "hotpath", hotpath::measure)]
1339    fn apply_filters(&self, event: &Event, request: &QueryEventsRequest) -> bool {
1340        // Additional type filter if entity was primary
1341        if request.entity_id.is_some()
1342            && let Some(ref event_type) = request.event_type
1343            && event.event_type_str() != event_type
1344        {
1345            return false;
1346        }
1347
1348        // Additional prefix filter if entity was primary
1349        if request.entity_id.is_some()
1350            && let Some(ref prefix) = request.event_type_prefix
1351            && !event.event_type_str().starts_with(prefix)
1352        {
1353            return false;
1354        }
1355
1356        // Payload field filtering
1357        if let Some(ref filter_str) = request.payload_filter
1358            && let Ok(filter_obj) =
1359                serde_json::from_str::<serde_json::Map<String, serde_json::Value>>(filter_str)
1360        {
1361            let payload = event.payload();
1362            for (key, expected_value) in &filter_obj {
1363                match payload.get(key) {
1364                    Some(actual_value) if actual_value == expected_value => {}
1365                    _ => return false,
1366                }
1367            }
1368        }
1369
1370        true
1371    }
1372
    /// Reconstruct entity state as of a specific timestamp
    /// v0.2: Now uses snapshots for fast reconstruction
    ///
    /// Starts from the nearest snapshot (if any), merges subsequent event
    /// payloads on top (last write wins per key), and wraps the result with
    /// metadata plus the replayed event history.
    ///
    /// # Errors
    /// Returns `EntityNotFound` when the entity has no events and no snapshot.
    #[cfg_attr(feature = "hotpath", hotpath::measure)]
    pub fn reconstruct_state(
        &self,
        entity_id: &str,
        as_of: Option<DateTime<Utc>>,
    ) -> Result<serde_json::Value> {
        // Try to find a snapshot to use as a base (v0.2 optimization)
        let (merged_state, since_timestamp) = if let Some(as_of_time) = as_of {
            // Get snapshot closest to requested time
            if let Some(snapshot) = self
                .snapshot_manager
                .get_snapshot_as_of(entity_id, as_of_time)
            {
                tracing::debug!(
                    "Using snapshot from {} for entity {} (saved {} events)",
                    snapshot.as_of,
                    entity_id,
                    snapshot.event_count
                );
                (snapshot.state.clone(), Some(snapshot.as_of))
            } else {
                // No snapshot at or before as_of — replay from the beginning.
                (serde_json::json!({}), None)
            }
        } else {
            // Get latest snapshot for current state
            if let Some(snapshot) = self.snapshot_manager.get_latest_snapshot(entity_id) {
                tracing::debug!(
                    "Using latest snapshot from {} for entity {}",
                    snapshot.as_of,
                    entity_id
                );
                (snapshot.state.clone(), Some(snapshot.as_of))
            } else {
                (serde_json::json!({}), None)
            }
        };

        // Query events after the snapshot (or all if no snapshot)
        // NOTE(review): `since` is inclusive (filter_entries keeps entries
        // with timestamp == since), so an event stamped exactly at the
        // snapshot time is re-applied on top of the snapshot — harmless for
        // this last-write-wins key merge, but confirm if semantics change.
        let events = self.query(&QueryEventsRequest {
            entity_id: Some(entity_id.to_string()),
            event_type: None,
            tenant_id: None,
            as_of,
            since: since_timestamp,
            until: None,
            limit: None,
            event_type_prefix: None,
            payload_filter: None,
        })?;

        // If no events and no snapshot, entity not found
        if events.is_empty() && since_timestamp.is_none() {
            return Err(AllSourceError::EntityNotFound(entity_id.to_string()));
        }

        // Merge events on top of snapshot (or from scratch if no snapshot)
        let mut merged_state = merged_state;
        for event in &events {
            if let serde_json::Value::Object(ref mut state_map) = merged_state
                && let serde_json::Value::Object(ref payload_map) = event.payload
            {
                for (key, value) in payload_map {
                    state_map.insert(key.clone(), value.clone());
                }
            }
        }

        // Wrap with metadata
        // `event_count`/`history` cover only the replayed tail, not events
        // already folded into the snapshot base.
        let state = serde_json::json!({
            "entity_id": entity_id,
            "last_updated": events.last().map(|e| e.timestamp),
            "event_count": events.len(),
            "as_of": as_of,
            "current_state": merged_state,
            "history": events.iter().map(|e| {
                serde_json::json!({
                    "event_id": e.id,
                    "type": e.event_type,
                    "timestamp": e.timestamp,
                    "payload": e.payload
                })
            }).collect::<Vec<_>>()
        });

        Ok(state)
    }
1461
1462    /// Get snapshot from projection (faster than reconstructing)
1463    pub fn get_snapshot(&self, entity_id: &str) -> Result<serde_json::Value> {
1464        let projections = self.projections.read();
1465
1466        if let Some(snapshot_projection) = projections.get_projection("entity_snapshots")
1467            && let Some(state) = snapshot_projection.get_state(entity_id)
1468        {
1469            return Ok(serde_json::json!({
1470                "entity_id": entity_id,
1471                "snapshot": state,
1472                "from_projection": "entity_snapshots"
1473            }));
1474        }
1475
1476        Err(AllSourceError::EntityNotFound(entity_id.to_string()))
1477    }
1478
1479    /// Get statistics about the event store
1480    pub fn stats(&self) -> StoreStats {
1481        let events = self.events.read();
1482        let index_stats = self.index.stats();
1483
1484        StoreStats {
1485            total_events: events.len(),
1486            total_entities: index_stats.total_entities,
1487            total_event_types: index_stats.total_event_types,
1488            total_ingested: *self.total_ingested.read(),
1489        }
1490    }
1491
1492    /// Get all unique streams (entity_ids) in the store
1493    pub fn list_streams(&self) -> Vec<StreamInfo> {
1494        self.index
1495            .get_all_entities()
1496            .into_iter()
1497            .map(|entity_id| {
1498                let event_count = self
1499                    .index
1500                    .get_by_entity(&entity_id)
1501                    .map_or(0, |entries| entries.len());
1502                let last_event_at = self
1503                    .index
1504                    .get_by_entity(&entity_id)
1505                    .and_then(|entries| entries.last().map(|e| e.timestamp));
1506                StreamInfo {
1507                    stream_id: entity_id,
1508                    event_count,
1509                    last_event_at,
1510                }
1511            })
1512            .collect()
1513    }
1514
1515    /// Get all unique event types in the store
1516    pub fn list_event_types(&self) -> Vec<EventTypeInfo> {
1517        self.index
1518            .get_all_types()
1519            .into_iter()
1520            .map(|event_type| {
1521                let event_count = self
1522                    .index
1523                    .get_by_type(&event_type)
1524                    .map_or(0, |entries| entries.len());
1525                let last_event_at = self
1526                    .index
1527                    .get_by_type(&event_type)
1528                    .and_then(|entries| entries.last().map(|e| e.timestamp));
1529                EventTypeInfo {
1530                    event_type,
1531                    event_count,
1532                    last_event_at,
1533                }
1534            })
1535            .collect()
1536    }
1537
1538    /// Attach a broadcast sender to the WAL for replication.
1539    ///
1540    /// Thread-safe: can be called through `Arc<EventStore>` at runtime.
1541    /// Used during initial setup and during follower → leader promotion.
1542    /// When set, every WAL append publishes the entry to the broadcast
1543    /// channel so the WAL shipper can stream it to followers.
1544    pub fn enable_wal_replication(
1545        &self,
1546        tx: tokio::sync::broadcast::Sender<crate::infrastructure::persistence::wal::WALEntry>,
1547    ) {
1548        if let Some(ref wal_arc) = self.wal {
1549            wal_arc.set_replication_tx(tx);
1550            tracing::info!("WAL replication broadcast enabled");
1551        } else {
1552            tracing::warn!("Cannot enable WAL replication: WAL is not configured");
1553        }
1554    }
1555
1556    /// Get a reference to the WAL (if configured).
1557    /// Used by the replication catch-up protocol to determine oldest available offset.
1558    pub fn wal(&self) -> Option<&Arc<WriteAheadLog>> {
1559        self.wal.as_ref()
1560    }
1561
1562    /// Get a reference to the Parquet storage (if configured).
1563    /// Used by the replication catch-up protocol to stream snapshot files to followers.
1564    pub fn parquet_storage(&self) -> Option<&Arc<RwLock<ParquetStorage>>> {
1565        self.storage.as_ref()
1566    }
1567}
1568
/// Configuration for EventStore
///
/// All optional directories default to `None` (in-memory mode) via `Default`;
/// the nested config structs fall back to their own `Default` impls.
#[derive(Debug, Clone, Default)]
pub struct EventStoreConfig {
    /// Optional directory for persistent Parquet storage (v0.2 feature)
    pub storage_dir: Option<PathBuf>,

    /// Snapshot configuration (v0.2 feature)
    pub snapshot_config: SnapshotConfig,

    /// Optional directory for WAL (Write-Ahead Log) (v0.2 feature)
    pub wal_dir: Option<PathBuf>,

    /// WAL configuration (v0.2 feature)
    pub wal_config: WALConfig,

    /// Compaction configuration (v0.2 feature)
    pub compaction_config: CompactionConfig,

    /// Schema registry configuration (v0.5 feature)
    pub schema_registry_config: SchemaRegistryConfig,

    /// Optional directory for system metadata storage (dogfood feature).
    /// When set, operational metadata (tenants, config, audit) is stored
    /// using AllSource's own event store rather than an external database.
    /// Defaults to `{storage_dir}/__system/` when storage_dir is set.
    pub system_data_dir: Option<PathBuf>,

    /// Name of the default tenant to auto-create on first boot.
    pub bootstrap_tenant: Option<String>,
}
1599
1600impl EventStoreConfig {
1601    /// Create config with persistent storage enabled
1602    pub fn with_persistence(storage_dir: impl Into<PathBuf>) -> Self {
1603        Self {
1604            storage_dir: Some(storage_dir.into()),
1605            ..Self::default()
1606        }
1607    }
1608
1609    /// Create config with custom snapshot settings
1610    pub fn with_snapshots(snapshot_config: SnapshotConfig) -> Self {
1611        Self {
1612            snapshot_config,
1613            ..Self::default()
1614        }
1615    }
1616
1617    /// Create config with WAL enabled
1618    pub fn with_wal(wal_dir: impl Into<PathBuf>, wal_config: WALConfig) -> Self {
1619        Self {
1620            wal_dir: Some(wal_dir.into()),
1621            wal_config,
1622            ..Self::default()
1623        }
1624    }
1625
1626    /// Create config with both persistence and snapshots
1627    pub fn with_all(storage_dir: impl Into<PathBuf>, snapshot_config: SnapshotConfig) -> Self {
1628        Self {
1629            storage_dir: Some(storage_dir.into()),
1630            snapshot_config,
1631            ..Self::default()
1632        }
1633    }
1634
1635    /// Create production config with all features enabled
1636    pub fn production(
1637        storage_dir: impl Into<PathBuf>,
1638        wal_dir: impl Into<PathBuf>,
1639        snapshot_config: SnapshotConfig,
1640        wal_config: WALConfig,
1641        compaction_config: CompactionConfig,
1642    ) -> Self {
1643        let storage_dir = storage_dir.into();
1644        let system_data_dir = storage_dir.join("__system");
1645        Self {
1646            storage_dir: Some(storage_dir),
1647            snapshot_config,
1648            wal_dir: Some(wal_dir.into()),
1649            wal_config,
1650            compaction_config,
1651            system_data_dir: Some(system_data_dir),
1652            ..Self::default()
1653        }
1654    }
1655
1656    /// Resolve the effective system data directory.
1657    ///
1658    /// If explicitly set, returns that. Otherwise, derives from storage_dir.
1659    /// Returns None if neither is configured (in-memory mode).
1660    pub fn effective_system_data_dir(&self) -> Option<PathBuf> {
1661        self.system_data_dir
1662            .clone()
1663            .or_else(|| self.storage_dir.as_ref().map(|d| d.join("__system")))
1664    }
1665
1666    /// Build config from environment variables.
1667    ///
1668    /// Reads `ALLSOURCE_DATA_DIR`, `ALLSOURCE_STORAGE_DIR`, `ALLSOURCE_WAL_DIR`,
1669    /// and `ALLSOURCE_WAL_ENABLED` to determine persistence mode.
1670    ///
1671    /// Returns `(config, description)` where description is a human-readable
1672    /// summary of the persistence mode for logging.
1673    pub fn from_env() -> (Self, &'static str) {
1674        Self::from_env_vars(
1675            std::env::var("ALLSOURCE_DATA_DIR")
1676                .ok()
1677                .filter(|s| !s.is_empty()),
1678            std::env::var("ALLSOURCE_STORAGE_DIR")
1679                .ok()
1680                .filter(|s| !s.is_empty()),
1681            std::env::var("ALLSOURCE_WAL_DIR")
1682                .ok()
1683                .filter(|s| !s.is_empty()),
1684            std::env::var("ALLSOURCE_WAL_ENABLED").ok(),
1685        )
1686    }
1687
1688    /// Build config from explicit env-var values (testable without mutating process env).
1689    pub fn from_env_vars(
1690        data_dir: Option<String>,
1691        explicit_storage_dir: Option<String>,
1692        explicit_wal_dir: Option<String>,
1693        wal_enabled_var: Option<String>,
1694    ) -> (Self, &'static str) {
1695        let data_dir = data_dir.filter(|s| !s.is_empty());
1696        let storage_dir = explicit_storage_dir
1697            .filter(|s| !s.is_empty())
1698            .or_else(|| data_dir.as_ref().map(|d| format!("{d}/storage")));
1699        let wal_dir = explicit_wal_dir
1700            .filter(|s| !s.is_empty())
1701            .or_else(|| data_dir.as_ref().map(|d| format!("{d}/wal")));
1702        let wal_enabled = wal_enabled_var.is_none_or(|v| v == "true");
1703
1704        match (&storage_dir, &wal_dir) {
1705            (Some(sd), Some(wd)) if wal_enabled => {
1706                let config = Self::production(
1707                    sd,
1708                    wd,
1709                    SnapshotConfig::default(),
1710                    WALConfig::default(),
1711                    CompactionConfig::default(),
1712                );
1713                (config, "wal+parquet")
1714            }
1715            (Some(sd), _) => {
1716                let config = Self::with_persistence(sd);
1717                (config, "parquet-only")
1718            }
1719            (_, Some(wd)) if wal_enabled => {
1720                let config = Self::with_wal(wd, WALConfig::default());
1721                (config, "wal-only")
1722            }
1723            _ => (Self::default(), "in-memory"),
1724        }
1725    }
1726}
1727
/// Aggregate counters describing the current contents of an event store.
#[derive(Debug, serde::Serialize)]
pub struct StoreStats {
    /// Number of events currently held in the store.
    pub total_events: usize,
    /// Number of distinct entities (streams) observed.
    pub total_entities: usize,
    /// Number of distinct event types observed.
    pub total_event_types: usize,
    /// Cumulative count of ingested events. NOTE(review): presumably a
    /// monotonic counter that, unlike `total_events`, never decreases — confirm
    /// against the ingest/compaction paths.
    pub total_ingested: u64,
}
1735
/// Information about a stream (entity_id)
#[derive(Debug, Clone, serde::Serialize)]
pub struct StreamInfo {
    /// The stream identifier (entity_id)
    pub stream_id: String,
    /// Total number of events in this stream
    pub event_count: usize,
    /// Timestamp of the last event in this stream; `None` when the stream is empty
    pub last_event_at: Option<chrono::DateTime<chrono::Utc>>,
}
1746
/// Information about an event type
#[derive(Debug, Clone, serde::Serialize)]
pub struct EventTypeInfo {
    /// The event type name
    pub event_type: String,
    /// Total number of events of this type
    pub event_count: usize,
    /// Timestamp of the last event of this type; `None` when no events exist
    pub last_event_at: Option<chrono::DateTime<chrono::Utc>>,
}
1757
1758impl Default for EventStore {
1759    fn default() -> Self {
1760        Self::new()
1761    }
1762}
1763
1764#[cfg(test)]
1765mod tests {
1766    use super::*;
1767    use crate::domain::entities::Event;
1768    use tempfile::TempDir;
1769
1770    fn create_test_event(entity_id: &str, event_type: &str) -> Event {
1771        Event::from_strings(
1772            event_type.to_string(),
1773            entity_id.to_string(),
1774            "default".to_string(),
1775            serde_json::json!({"name": "Test", "value": 42}),
1776            None,
1777        )
1778        .unwrap()
1779    }
1780
1781    fn create_test_event_with_payload(
1782        entity_id: &str,
1783        event_type: &str,
1784        payload: serde_json::Value,
1785    ) -> Event {
1786        Event::from_strings(
1787            event_type.to_string(),
1788            entity_id.to_string(),
1789            "default".to_string(),
1790            payload,
1791            None,
1792        )
1793        .unwrap()
1794    }
1795
1796    #[test]
1797    fn test_event_store_new() {
1798        let store = EventStore::new();
1799        assert_eq!(store.stats().total_events, 0);
1800        assert_eq!(store.stats().total_entities, 0);
1801    }
1802
1803    #[test]
1804    fn test_event_store_default() {
1805        let store = EventStore::default();
1806        assert_eq!(store.stats().total_events, 0);
1807    }
1808
1809    #[test]
1810    fn test_ingest_single_event() {
1811        let store = EventStore::new();
1812        let event = create_test_event("entity-1", "user.created");
1813
1814        store.ingest(&event).unwrap();
1815
1816        assert_eq!(store.stats().total_events, 1);
1817        assert_eq!(store.stats().total_ingested, 1);
1818    }
1819
1820    #[test]
1821    fn test_ingest_multiple_events() {
1822        let store = EventStore::new();
1823
1824        for i in 0..10 {
1825            let event = create_test_event(&format!("entity-{i}"), "user.created");
1826            store.ingest(&event).unwrap();
1827        }
1828
1829        assert_eq!(store.stats().total_events, 10);
1830        assert_eq!(store.stats().total_ingested, 10);
1831    }
1832
1833    #[test]
1834    fn test_query_by_entity_id() {
1835        let store = EventStore::new();
1836
1837        store
1838            .ingest(&create_test_event("entity-1", "user.created"))
1839            .unwrap();
1840        store
1841            .ingest(&create_test_event("entity-2", "user.created"))
1842            .unwrap();
1843        store
1844            .ingest(&create_test_event("entity-1", "user.updated"))
1845            .unwrap();
1846
1847        let results = store
1848            .query(&QueryEventsRequest {
1849                entity_id: Some("entity-1".to_string()),
1850                event_type: None,
1851                tenant_id: None,
1852                as_of: None,
1853                since: None,
1854                until: None,
1855                limit: None,
1856                event_type_prefix: None,
1857                payload_filter: None,
1858            })
1859            .unwrap();
1860
1861        assert_eq!(results.len(), 2);
1862    }
1863
1864    #[test]
1865    fn test_query_by_event_type() {
1866        let store = EventStore::new();
1867
1868        store
1869            .ingest(&create_test_event("entity-1", "user.created"))
1870            .unwrap();
1871        store
1872            .ingest(&create_test_event("entity-2", "user.updated"))
1873            .unwrap();
1874        store
1875            .ingest(&create_test_event("entity-3", "user.created"))
1876            .unwrap();
1877
1878        let results = store
1879            .query(&QueryEventsRequest {
1880                entity_id: None,
1881                event_type: Some("user.created".to_string()),
1882                tenant_id: None,
1883                as_of: None,
1884                since: None,
1885                until: None,
1886                limit: None,
1887                event_type_prefix: None,
1888                payload_filter: None,
1889            })
1890            .unwrap();
1891
1892        assert_eq!(results.len(), 2);
1893    }
1894
1895    #[test]
1896    fn test_query_with_limit() {
1897        let store = EventStore::new();
1898
1899        for i in 0..10 {
1900            let event = create_test_event(&format!("entity-{i}"), "user.created");
1901            store.ingest(&event).unwrap();
1902        }
1903
1904        let results = store
1905            .query(&QueryEventsRequest {
1906                entity_id: None,
1907                event_type: None,
1908                tenant_id: None,
1909                as_of: None,
1910                since: None,
1911                until: None,
1912                limit: Some(5),
1913                event_type_prefix: None,
1914                payload_filter: None,
1915            })
1916            .unwrap();
1917
1918        assert_eq!(results.len(), 5);
1919    }
1920
1921    #[test]
1922    fn test_query_empty_store() {
1923        let store = EventStore::new();
1924
1925        let results = store
1926            .query(&QueryEventsRequest {
1927                entity_id: Some("non-existent".to_string()),
1928                event_type: None,
1929                tenant_id: None,
1930                as_of: None,
1931                since: None,
1932                until: None,
1933                limit: None,
1934                event_type_prefix: None,
1935                payload_filter: None,
1936            })
1937            .unwrap();
1938
1939        assert!(results.is_empty());
1940    }
1941
1942    #[test]
1943    fn test_reconstruct_state() {
1944        let store = EventStore::new();
1945
1946        store
1947            .ingest(&create_test_event("entity-1", "user.created"))
1948            .unwrap();
1949
1950        let state = store.reconstruct_state("entity-1", None).unwrap();
1951        // The state is wrapped with metadata
1952        assert_eq!(state["current_state"]["name"], "Test");
1953        assert_eq!(state["current_state"]["value"], 42);
1954    }
1955
1956    #[test]
1957    fn test_reconstruct_state_not_found() {
1958        let store = EventStore::new();
1959
1960        let result = store.reconstruct_state("non-existent", None);
1961        assert!(result.is_err());
1962    }
1963
1964    #[test]
1965    fn test_get_snapshot_empty() {
1966        let store = EventStore::new();
1967
1968        let result = store.get_snapshot("non-existent");
1969        // Entity not found error is expected
1970        assert!(result.is_err());
1971    }
1972
1973    #[test]
1974    fn test_create_snapshot() {
1975        let store = EventStore::new();
1976
1977        store
1978            .ingest(&create_test_event("entity-1", "user.created"))
1979            .unwrap();
1980
1981        store.create_snapshot("entity-1").unwrap();
1982
1983        // Verify snapshot was created
1984        let snapshot = store.get_snapshot("entity-1").unwrap();
1985        assert!(snapshot != serde_json::json!(null));
1986    }
1987
1988    #[test]
1989    fn test_create_snapshot_entity_not_found() {
1990        let store = EventStore::new();
1991
1992        let result = store.create_snapshot("non-existent");
1993        assert!(result.is_err());
1994    }
1995
1996    #[test]
1997    fn test_websocket_manager() {
1998        let store = EventStore::new();
1999        let manager = store.websocket_manager();
2000        // Manager should be accessible
2001        assert!(Arc::strong_count(&manager) >= 1);
2002    }
2003
2004    #[test]
2005    fn test_snapshot_manager() {
2006        let store = EventStore::new();
2007        let manager = store.snapshot_manager();
2008        assert!(Arc::strong_count(&manager) >= 1);
2009    }
2010
2011    #[test]
2012    fn test_compaction_manager_none() {
2013        let store = EventStore::new();
2014        // Without storage_dir, compaction manager should be None
2015        assert!(store.compaction_manager().is_none());
2016    }
2017
2018    #[test]
2019    fn test_schema_registry() {
2020        let store = EventStore::new();
2021        let registry = store.schema_registry();
2022        assert!(Arc::strong_count(&registry) >= 1);
2023    }
2024
2025    #[test]
2026    fn test_replay_manager() {
2027        let store = EventStore::new();
2028        let manager = store.replay_manager();
2029        assert!(Arc::strong_count(&manager) >= 1);
2030    }
2031
2032    #[test]
2033    fn test_pipeline_manager() {
2034        let store = EventStore::new();
2035        let manager = store.pipeline_manager();
2036        assert!(Arc::strong_count(&manager) >= 1);
2037    }
2038
2039    #[test]
2040    fn test_projection_manager() {
2041        let store = EventStore::new();
2042        let manager = store.projection_manager();
2043        // Built-in projections should be registered
2044        let projections = manager.list_projections();
2045        assert!(projections.len() >= 2); // entity_snapshots and event_counters
2046    }
2047
2048    #[test]
2049    fn test_projection_state_cache() {
2050        let store = EventStore::new();
2051        let cache = store.projection_state_cache();
2052
2053        cache.insert("test:key".to_string(), serde_json::json!({"value": 123}));
2054        assert_eq!(cache.len(), 1);
2055
2056        let value = cache.get("test:key").unwrap();
2057        assert_eq!(value["value"], 123);
2058    }
2059
2060    #[test]
2061    fn test_metrics() {
2062        let store = EventStore::new();
2063        let metrics = store.metrics();
2064        assert!(Arc::strong_count(&metrics) >= 1);
2065    }
2066
2067    #[test]
2068    fn test_store_stats() {
2069        let store = EventStore::new();
2070
2071        store
2072            .ingest(&create_test_event("entity-1", "user.created"))
2073            .unwrap();
2074        store
2075            .ingest(&create_test_event("entity-2", "order.placed"))
2076            .unwrap();
2077
2078        let stats = store.stats();
2079        assert_eq!(stats.total_events, 2);
2080        assert_eq!(stats.total_entities, 2);
2081        assert_eq!(stats.total_event_types, 2);
2082        assert_eq!(stats.total_ingested, 2);
2083    }
2084
2085    #[test]
2086    fn test_event_store_config_default() {
2087        let config = EventStoreConfig::default();
2088        assert!(config.storage_dir.is_none());
2089        assert!(config.wal_dir.is_none());
2090    }
2091
2092    #[test]
2093    fn test_event_store_config_with_persistence() {
2094        let temp_dir = TempDir::new().unwrap();
2095        let config = EventStoreConfig::with_persistence(temp_dir.path());
2096
2097        assert!(config.storage_dir.is_some());
2098        assert!(config.wal_dir.is_none());
2099    }
2100
2101    #[test]
2102    fn test_event_store_config_with_wal() {
2103        let temp_dir = TempDir::new().unwrap();
2104        let config = EventStoreConfig::with_wal(temp_dir.path(), WALConfig::default());
2105
2106        assert!(config.storage_dir.is_none());
2107        assert!(config.wal_dir.is_some());
2108    }
2109
2110    #[test]
2111    fn test_event_store_config_with_all() {
2112        let temp_dir = TempDir::new().unwrap();
2113        let config = EventStoreConfig::with_all(temp_dir.path(), SnapshotConfig::default());
2114
2115        assert!(config.storage_dir.is_some());
2116    }
2117
2118    #[test]
2119    fn test_event_store_config_production() {
2120        let storage_dir = TempDir::new().unwrap();
2121        let wal_dir = TempDir::new().unwrap();
2122        let config = EventStoreConfig::production(
2123            storage_dir.path(),
2124            wal_dir.path(),
2125            SnapshotConfig::default(),
2126            WALConfig::default(),
2127            CompactionConfig::default(),
2128        );
2129
2130        assert!(config.storage_dir.is_some());
2131        assert!(config.wal_dir.is_some());
2132    }
2133
2134    // -----------------------------------------------------------------------
2135    // from_env_vars tests — verifies the env-var-to-config wiring that
2136    // caused the durability bug (events lost on restart) in v0.10.3.
2137    // -----------------------------------------------------------------------
2138
2139    #[test]
2140    fn test_from_env_vars_data_dir_enables_full_persistence() {
2141        let (config, mode) =
2142            EventStoreConfig::from_env_vars(Some("/app/data".to_string()), None, None, None);
2143        assert_eq!(mode, "wal+parquet");
2144        assert_eq!(
2145            config.storage_dir.unwrap().to_str().unwrap(),
2146            "/app/data/storage"
2147        );
2148        assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/app/data/wal");
2149    }
2150
2151    #[test]
2152    fn test_from_env_vars_explicit_dirs() {
2153        let (config, mode) = EventStoreConfig::from_env_vars(
2154            None,
2155            Some("/custom/storage".to_string()),
2156            Some("/custom/wal".to_string()),
2157            None,
2158        );
2159        assert_eq!(mode, "wal+parquet");
2160        assert_eq!(
2161            config.storage_dir.unwrap().to_str().unwrap(),
2162            "/custom/storage"
2163        );
2164        assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/custom/wal");
2165    }
2166
2167    #[test]
2168    fn test_from_env_vars_wal_disabled() {
2169        let (config, mode) = EventStoreConfig::from_env_vars(
2170            Some("/app/data".to_string()),
2171            None,
2172            None,
2173            Some("false".to_string()),
2174        );
2175        assert_eq!(mode, "parquet-only");
2176        assert!(config.storage_dir.is_some());
2177        assert!(config.wal_dir.is_none());
2178    }
2179
2180    #[test]
2181    fn test_from_env_vars_no_dirs_is_in_memory() {
2182        let (config, mode) = EventStoreConfig::from_env_vars(None, None, None, None);
2183        assert_eq!(mode, "in-memory");
2184        assert!(config.storage_dir.is_none());
2185        assert!(config.wal_dir.is_none());
2186    }
2187
2188    #[test]
2189    fn test_from_env_vars_empty_strings_treated_as_none() {
2190        let (_, mode) = EventStoreConfig::from_env_vars(
2191            Some(String::new()),
2192            Some(String::new()),
2193            Some(String::new()),
2194            None,
2195        );
2196        assert_eq!(mode, "in-memory");
2197    }
2198
2199    #[test]
2200    fn test_from_env_vars_explicit_overrides_data_dir() {
2201        let (config, mode) = EventStoreConfig::from_env_vars(
2202            Some("/app/data".to_string()),
2203            Some("/override/storage".to_string()),
2204            Some("/override/wal".to_string()),
2205            None,
2206        );
2207        assert_eq!(mode, "wal+parquet");
2208        assert_eq!(
2209            config.storage_dir.unwrap().to_str().unwrap(),
2210            "/override/storage"
2211        );
2212        assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/override/wal");
2213    }
2214
2215    #[test]
2216    fn test_from_env_vars_wal_only() {
2217        let (config, mode) =
2218            EventStoreConfig::from_env_vars(None, None, Some("/wal/only".to_string()), None);
2219        assert_eq!(mode, "wal-only");
2220        assert!(config.storage_dir.is_none());
2221        assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/wal/only");
2222    }
2223
2224    #[test]
2225    fn test_store_stats_serde() {
2226        let stats = StoreStats {
2227            total_events: 100,
2228            total_entities: 50,
2229            total_event_types: 10,
2230            total_ingested: 100,
2231        };
2232
2233        let json = serde_json::to_string(&stats).unwrap();
2234        assert!(json.contains("\"total_events\":100"));
2235        assert!(json.contains("\"total_entities\":50"));
2236    }
2237
2238    #[test]
2239    fn test_query_with_entity_and_type() {
2240        let store = EventStore::new();
2241
2242        store
2243            .ingest(&create_test_event("entity-1", "user.created"))
2244            .unwrap();
2245        store
2246            .ingest(&create_test_event("entity-1", "user.updated"))
2247            .unwrap();
2248        store
2249            .ingest(&create_test_event("entity-2", "user.created"))
2250            .unwrap();
2251
2252        let results = store
2253            .query(&QueryEventsRequest {
2254                entity_id: Some("entity-1".to_string()),
2255                event_type: Some("user.created".to_string()),
2256                tenant_id: None,
2257                as_of: None,
2258                since: None,
2259                until: None,
2260                limit: None,
2261                event_type_prefix: None,
2262                payload_filter: None,
2263            })
2264            .unwrap();
2265
2266        assert_eq!(results.len(), 1);
2267        assert_eq!(results[0].event_type_str(), "user.created");
2268    }
2269
2270    #[test]
2271    fn test_query_by_event_type_prefix() {
2272        let store = EventStore::new();
2273
2274        // Ingest events with various types
2275        store
2276            .ingest(&create_test_event("entity-1", "index.created"))
2277            .unwrap();
2278        store
2279            .ingest(&create_test_event("entity-2", "index.updated"))
2280            .unwrap();
2281        store
2282            .ingest(&create_test_event("entity-3", "trade.created"))
2283            .unwrap();
2284        store
2285            .ingest(&create_test_event("entity-4", "trade.completed"))
2286            .unwrap();
2287        store
2288            .ingest(&create_test_event("entity-5", "balance.updated"))
2289            .unwrap();
2290
2291        // Query with prefix "index." should return exactly 2
2292        let results = store
2293            .query(&QueryEventsRequest {
2294                entity_id: None,
2295                event_type: None,
2296                tenant_id: None,
2297                as_of: None,
2298                since: None,
2299                until: None,
2300                limit: None,
2301                event_type_prefix: Some("index.".to_string()),
2302                payload_filter: None,
2303            })
2304            .unwrap();
2305
2306        assert_eq!(results.len(), 2);
2307        assert!(
2308            results
2309                .iter()
2310                .all(|e| e.event_type_str().starts_with("index."))
2311        );
2312    }
2313
2314    #[test]
2315    fn test_query_by_event_type_prefix_empty_returns_all() {
2316        let store = EventStore::new();
2317
2318        store
2319            .ingest(&create_test_event("entity-1", "index.created"))
2320            .unwrap();
2321        store
2322            .ingest(&create_test_event("entity-2", "trade.created"))
2323            .unwrap();
2324
2325        // Empty prefix matches all types
2326        let results = store
2327            .query(&QueryEventsRequest {
2328                entity_id: None,
2329                event_type: None,
2330                tenant_id: None,
2331                as_of: None,
2332                since: None,
2333                until: None,
2334                limit: None,
2335                event_type_prefix: Some(String::new()),
2336                payload_filter: None,
2337            })
2338            .unwrap();
2339
2340        assert_eq!(results.len(), 2);
2341    }
2342
2343    #[test]
2344    fn test_query_by_event_type_prefix_no_match() {
2345        let store = EventStore::new();
2346
2347        store
2348            .ingest(&create_test_event("entity-1", "index.created"))
2349            .unwrap();
2350
2351        let results = store
2352            .query(&QueryEventsRequest {
2353                entity_id: None,
2354                event_type: None,
2355                tenant_id: None,
2356                as_of: None,
2357                since: None,
2358                until: None,
2359                limit: None,
2360                event_type_prefix: Some("nonexistent.".to_string()),
2361                payload_filter: None,
2362            })
2363            .unwrap();
2364
2365        assert!(results.is_empty());
2366    }
2367
2368    #[test]
2369    fn test_query_by_entity_with_type_prefix() {
2370        let store = EventStore::new();
2371
2372        store
2373            .ingest(&create_test_event("entity-1", "index.created"))
2374            .unwrap();
2375        store
2376            .ingest(&create_test_event("entity-1", "trade.created"))
2377            .unwrap();
2378        store
2379            .ingest(&create_test_event("entity-2", "index.updated"))
2380            .unwrap();
2381
2382        // Query entity-1 with prefix "index." should return 1
2383        let results = store
2384            .query(&QueryEventsRequest {
2385                entity_id: Some("entity-1".to_string()),
2386                event_type: None,
2387                tenant_id: None,
2388                as_of: None,
2389                since: None,
2390                until: None,
2391                limit: None,
2392                event_type_prefix: Some("index.".to_string()),
2393                payload_filter: None,
2394            })
2395            .unwrap();
2396
2397        assert_eq!(results.len(), 1);
2398        assert_eq!(results[0].event_type_str(), "index.created");
2399    }
2400
2401    #[test]
2402    fn test_query_prefix_with_limit() {
2403        let store = EventStore::new();
2404
2405        for i in 0..5 {
2406            store
2407                .ingest(&create_test_event(&format!("entity-{i}"), "index.created"))
2408                .unwrap();
2409        }
2410
2411        let results = store
2412            .query(&QueryEventsRequest {
2413                entity_id: None,
2414                event_type: None,
2415                tenant_id: None,
2416                as_of: None,
2417                since: None,
2418                until: None,
2419                limit: Some(3),
2420                event_type_prefix: Some("index.".to_string()),
2421                payload_filter: None,
2422            })
2423            .unwrap();
2424
2425        assert_eq!(results.len(), 3);
2426    }
2427
2428    #[test]
2429    fn test_query_prefix_alongside_existing_filters() {
2430        let store = EventStore::new();
2431
2432        store
2433            .ingest(&create_test_event("entity-1", "index.created"))
2434            .unwrap();
2435        // Sleep briefly to ensure different timestamps
2436        std::thread::sleep(std::time::Duration::from_millis(10));
2437        store
2438            .ingest(&create_test_event("entity-2", "index.strategy.updated"))
2439            .unwrap();
2440        std::thread::sleep(std::time::Duration::from_millis(10));
2441        store
2442            .ingest(&create_test_event("entity-3", "index.deleted"))
2443            .unwrap();
2444
2445        // Prefix with limit
2446        let results = store
2447            .query(&QueryEventsRequest {
2448                entity_id: None,
2449                event_type: None,
2450                tenant_id: None,
2451                as_of: None,
2452                since: None,
2453                until: None,
2454                limit: Some(2),
2455                event_type_prefix: Some("index.".to_string()),
2456                payload_filter: None,
2457            })
2458            .unwrap();
2459
2460        assert_eq!(results.len(), 2);
2461    }
2462
2463    #[test]
2464    fn test_query_with_payload_filter() {
2465        let store = EventStore::new();
2466
2467        // Ingest 5 events with user_id=alice
2468        for i in 0..5 {
2469            store
2470                .ingest(&create_test_event_with_payload(
2471                    &format!("entity-{i}"),
2472                    "user.action",
2473                    serde_json::json!({"user_id": "alice", "action": "click"}),
2474                ))
2475                .unwrap();
2476        }
2477        // Ingest 5 events with user_id=bob
2478        for i in 5..10 {
2479            store
2480                .ingest(&create_test_event_with_payload(
2481                    &format!("entity-{i}"),
2482                    "user.action",
2483                    serde_json::json!({"user_id": "bob", "action": "view"}),
2484                ))
2485                .unwrap();
2486        }
2487
2488        // Filter for alice
2489        let results = store
2490            .query(&QueryEventsRequest {
2491                entity_id: None,
2492                event_type: Some("user.action".to_string()),
2493                tenant_id: None,
2494                as_of: None,
2495                since: None,
2496                until: None,
2497                limit: None,
2498                event_type_prefix: None,
2499                payload_filter: Some(r#"{"user_id":"alice"}"#.to_string()),
2500            })
2501            .unwrap();
2502
2503        assert_eq!(results.len(), 5);
2504    }
2505
2506    #[test]
2507    fn test_query_payload_filter_non_existent_field() {
2508        let store = EventStore::new();
2509
2510        store
2511            .ingest(&create_test_event_with_payload(
2512                "entity-1",
2513                "user.action",
2514                serde_json::json!({"user_id": "alice"}),
2515            ))
2516            .unwrap();
2517
2518        // Filter for a field that doesn't exist — returns 0, not error
2519        let results = store
2520            .query(&QueryEventsRequest {
2521                entity_id: None,
2522                event_type: None,
2523                tenant_id: None,
2524                as_of: None,
2525                since: None,
2526                until: None,
2527                limit: None,
2528                event_type_prefix: None,
2529                payload_filter: Some(r#"{"nonexistent":"value"}"#.to_string()),
2530            })
2531            .unwrap();
2532
2533        assert!(results.is_empty());
2534    }
2535
2536    #[test]
2537    fn test_query_payload_filter_with_prefix() {
2538        let store = EventStore::new();
2539
2540        store
2541            .ingest(&create_test_event_with_payload(
2542                "entity-1",
2543                "index.created",
2544                serde_json::json!({"status": "active"}),
2545            ))
2546            .unwrap();
2547        store
2548            .ingest(&create_test_event_with_payload(
2549                "entity-2",
2550                "index.created",
2551                serde_json::json!({"status": "inactive"}),
2552            ))
2553            .unwrap();
2554        store
2555            .ingest(&create_test_event_with_payload(
2556                "entity-3",
2557                "trade.created",
2558                serde_json::json!({"status": "active"}),
2559            ))
2560            .unwrap();
2561
2562        // Combine prefix + payload filter
2563        let results = store
2564            .query(&QueryEventsRequest {
2565                entity_id: None,
2566                event_type: None,
2567                tenant_id: None,
2568                as_of: None,
2569                since: None,
2570                until: None,
2571                limit: None,
2572                event_type_prefix: Some("index.".to_string()),
2573                payload_filter: Some(r#"{"status":"active"}"#.to_string()),
2574            })
2575            .unwrap();
2576
2577        assert_eq!(results.len(), 1);
2578        assert_eq!(results[0].entity_id().to_string(), "entity-1");
2579    }
2580
2581    #[test]
2582    fn test_flush_storage_no_storage() {
2583        let store = EventStore::new();
2584        // Without storage, flush should succeed (no-op)
2585        let result = store.flush_storage();
2586        assert!(result.is_ok());
2587    }
2588
2589    #[test]
2590    fn test_state_evolution() {
2591        let store = EventStore::new();
2592
2593        // Initial state
2594        store
2595            .ingest(
2596                &Event::from_strings(
2597                    "user.created".to_string(),
2598                    "user-1".to_string(),
2599                    "default".to_string(),
2600                    serde_json::json!({"name": "Alice", "age": 25}),
2601                    None,
2602                )
2603                .unwrap(),
2604            )
2605            .unwrap();
2606
2607        // Update state
2608        store
2609            .ingest(
2610                &Event::from_strings(
2611                    "user.updated".to_string(),
2612                    "user-1".to_string(),
2613                    "default".to_string(),
2614                    serde_json::json!({"age": 26}),
2615                    None,
2616                )
2617                .unwrap(),
2618            )
2619            .unwrap();
2620
2621        let state = store.reconstruct_state("user-1", None).unwrap();
2622        // The state is wrapped with metadata
2623        assert_eq!(state["current_state"]["name"], "Alice");
2624        assert_eq!(state["current_state"]["age"], 26);
2625    }
2626
2627    #[test]
2628    fn test_reject_system_event_types() {
2629        let store = EventStore::new();
2630
2631        // System event types should be rejected via user-facing ingestion
2632        let event = Event::reconstruct_from_strings(
2633            uuid::Uuid::new_v4(),
2634            "_system.tenant.created".to_string(),
2635            "_system:tenant:acme".to_string(),
2636            "_system".to_string(),
2637            serde_json::json!({"name": "ACME"}),
2638            chrono::Utc::now(),
2639            None,
2640            1,
2641        );
2642
2643        let result = store.ingest(&event);
2644        assert!(result.is_err());
2645        let err = result.unwrap_err();
2646        assert!(
2647            err.to_string().contains("reserved for internal use"),
2648            "Expected system namespace rejection, got: {err}"
2649        );
2650    }
2651
2652    // -----------------------------------------------------------------------
2653    // Crash recovery: WAL events survive restart via Parquet checkpoint.
2654    // Regression test for GitHub issue #84 — flush_storage() was a no-op
2655    // during recovery because events were never buffered into Parquet's
2656    // current_batch before flushing.
2657    // -----------------------------------------------------------------------
2658
2659    #[test]
2660    fn test_wal_recovery_checkpoints_to_parquet() {
2661        let data_dir = TempDir::new().unwrap();
2662        let storage_dir = data_dir.path().join("storage");
2663        let wal_dir = data_dir.path().join("wal");
2664
2665        // Session 1: ingest events with WAL + Parquet
2666        {
2667            let config = EventStoreConfig::production(
2668                &storage_dir,
2669                &wal_dir,
2670                SnapshotConfig::default(),
2671                WALConfig {
2672                    sync_on_write: true,
2673                    ..WALConfig::default()
2674                },
2675                CompactionConfig::default(),
2676            );
2677            let store = EventStore::with_config(config);
2678
2679            for i in 0..5 {
2680                let event = Event::from_strings(
2681                    "test.created".to_string(),
2682                    format!("entity-{i}"),
2683                    "default".to_string(),
2684                    serde_json::json!({"index": i}),
2685                    None,
2686                )
2687                .unwrap();
2688                store.ingest(&event).unwrap();
2689            }
2690
2691            assert_eq!(store.stats().total_events, 5);
2692
2693            // Do NOT call flush_storage or shutdown — simulate a crash.
2694            // Events are in WAL (sync_on_write: true) but NOT in Parquet.
2695        }
2696
2697        // Verify WAL file has data
2698        let wal_files: Vec<_> = std::fs::read_dir(&wal_dir)
2699            .unwrap()
2700            .filter_map(std::result::Result::ok)
2701            .filter(|e| e.path().extension().is_some_and(|ext| ext == "log"))
2702            .collect();
2703        assert!(!wal_files.is_empty(), "WAL file should exist");
2704        let wal_size = wal_files[0].metadata().unwrap().len();
2705        assert!(wal_size > 0, "WAL file should have data (got 0 bytes)");
2706
2707        // Session 2: reopen — recovery should checkpoint WAL to Parquet, then truncate
2708        {
2709            let config = EventStoreConfig::production(
2710                &storage_dir,
2711                &wal_dir,
2712                SnapshotConfig::default(),
2713                WALConfig {
2714                    sync_on_write: true,
2715                    ..WALConfig::default()
2716                },
2717                CompactionConfig::default(),
2718            );
2719            let store = EventStore::with_config(config);
2720
2721            // Events should be recovered
2722            assert_eq!(
2723                store.stats().total_events,
2724                5,
2725                "Session 2 should have all 5 events after WAL recovery"
2726            );
2727
2728            // Parquet should now have files (checkpoint happened)
2729            let parquet_files: Vec<_> = std::fs::read_dir(&storage_dir)
2730                .unwrap()
2731                .filter_map(std::result::Result::ok)
2732                .filter(|e| e.path().extension().is_some_and(|ext| ext == "parquet"))
2733                .collect();
2734            assert!(
2735                !parquet_files.is_empty(),
2736                "Parquet file should exist after WAL checkpoint"
2737            );
2738        }
2739
2740        // Session 3: reopen again — events should load from Parquet (WAL was truncated)
2741        {
2742            let config = EventStoreConfig::production(
2743                &storage_dir,
2744                &wal_dir,
2745                SnapshotConfig::default(),
2746                WALConfig {
2747                    sync_on_write: true,
2748                    ..WALConfig::default()
2749                },
2750                CompactionConfig::default(),
2751            );
2752            let store = EventStore::with_config(config);
2753
2754            assert_eq!(
2755                store.stats().total_events,
2756                5,
2757                "Session 3 should still have all 5 events from Parquet"
2758            );
2759        }
2760    }
2761
2762    #[test]
2763    fn test_parquet_restore_surfaces_errors_not_silent() {
2764        // Write events with WAL+Parquet, flush to Parquet, then corrupt the
2765        // Parquet file. On reload, the error must be logged (not silently
2766        // swallowed as 0 events).
2767        let data_dir = TempDir::new().unwrap();
2768        let storage_dir = data_dir.path().join("storage");
2769        let wal_dir = data_dir.path().join("wal");
2770
2771        // Session 1: write events and flush to Parquet
2772        {
2773            let config = EventStoreConfig::production(
2774                &storage_dir,
2775                &wal_dir,
2776                SnapshotConfig::default(),
2777                WALConfig {
2778                    sync_on_write: true,
2779                    ..WALConfig::default()
2780                },
2781                CompactionConfig::default(),
2782            );
2783            let store = EventStore::with_config(config);
2784
2785            for i in 0..3 {
2786                let event = Event::from_strings(
2787                    "test.created".to_string(),
2788                    format!("entity-{i}"),
2789                    "default".to_string(),
2790                    serde_json::json!({"i": i}),
2791                    None,
2792                )
2793                .unwrap();
2794                store.ingest(&event).unwrap();
2795            }
2796
2797            store.flush_storage().unwrap();
2798            assert_eq!(store.stats().total_events, 3);
2799        }
2800
2801        // Verify parquet file exists
2802        let parquet_files: Vec<_> = std::fs::read_dir(&storage_dir)
2803            .unwrap()
2804            .filter_map(std::result::Result::ok)
2805            .filter(|e| e.path().extension().is_some_and(|ext| ext == "parquet"))
2806            .collect();
2807        assert!(!parquet_files.is_empty(), "Parquet file must exist");
2808
2809        // Corrupt the parquet file
2810        std::fs::write(parquet_files[0].path(), b"corrupted data").unwrap();
2811
2812        // Truncate WAL so only Parquet matters
2813        for entry in std::fs::read_dir(&wal_dir).unwrap().flatten() {
2814            std::fs::write(entry.path(), b"").unwrap();
2815        }
2816
2817        // Session 2: reload — should NOT silently report 0 events.
2818        // The error is logged via tracing::error! which we can't capture in a
2819        // unit test, but we CAN verify the store has 0 events (previously this
2820        // looked identical to "no data on disk" — now there's an error log).
2821        // The key behavioral change is that with_config no longer uses a
2822        // let-chain that silently drops the Err variant.
2823        {
2824            let config = EventStoreConfig::production(
2825                &storage_dir,
2826                &wal_dir,
2827                SnapshotConfig::default(),
2828                WALConfig::default(),
2829                CompactionConfig::default(),
2830            );
2831            let store = EventStore::with_config(config);
2832
2833            // Store has 0 events because Parquet is corrupted — but the error
2834            // is now logged (not silently swallowed).
2835            assert_eq!(store.stats().total_events, 0);
2836        }
2837    }
2838}