Skip to main content

allsource_core/
store.rs

1#[cfg(feature = "server")]
2use crate::application::services::webhook::WebhookRegistry;
3#[cfg(feature = "server")]
4use crate::infrastructure::observability::metrics::MetricsRegistry;
5#[cfg(feature = "server")]
6use crate::infrastructure::web::websocket::WebSocketManager;
7use crate::{
8    application::{
9        dto::QueryEventsRequest,
10        services::{
11            consumer::ConsumerRegistry,
12            exactly_once::{ExactlyOnceConfig, ExactlyOnceRegistry},
13            pipeline::PipelineManager,
14            projection::{EntitySnapshotProjection, EventCounterProjection, ProjectionManager},
15            replay::ReplayManager,
16            schema::{SchemaRegistry, SchemaRegistryConfig},
17            schema_evolution::SchemaEvolutionManager,
18        },
19    },
20    domain::entities::Event,
21    error::{AllSourceError, Result},
22    infrastructure::{
23        persistence::{
24            compaction::{CompactionConfig, CompactionManager},
25            index::{EventIndex, IndexEntry},
26            snapshot::{SnapshotConfig, SnapshotManager, SnapshotType},
27            storage::ParquetStorage,
28            wal::{WALConfig, WriteAheadLog},
29        },
30        query::geospatial::GeoIndex,
31    },
32};
33use chrono::{DateTime, Utc};
34use dashmap::DashMap;
35use parking_lot::RwLock;
36use std::{path::PathBuf, sync::Arc};
37#[cfg(feature = "server")]
38use tokio::sync::mpsc;
39
/// High-performance event store with columnar storage.
///
/// Central aggregate of the crate: owns the in-memory event log plus every
/// optional subsystem (Parquet persistence, WAL, snapshots, projections,
/// pipelines, webhooks, metrics). Fields are wrapped in `Arc` so handles can
/// be shared across threads; interior mutability uses `parking_lot::RwLock`
/// for coarse-grained state and `DashMap` for per-key concurrent maps.
pub struct EventStore {
    /// In-memory event storage; the `Vec` index of an event is its offset.
    events: Arc<RwLock<Vec<Event>>>,

    /// High-performance concurrent index (by id / entity / type / time)
    index: Arc<EventIndex>,

    /// Projection manager for real-time aggregations
    pub(crate) projections: Arc<RwLock<ProjectionManager>>,

    /// Optional persistent storage (v0.2 feature); `None` when no
    /// storage dir is configured or initialization failed at startup
    storage: Option<Arc<RwLock<ParquetStorage>>>,

    /// WebSocket manager for real-time event streaming (v0.2 feature)
    #[cfg(feature = "server")]
    websocket_manager: Arc<WebSocketManager>,

    /// Snapshot manager for fast state recovery (v0.2 feature)
    snapshot_manager: Arc<SnapshotManager>,

    /// Write-Ahead Log for durability (v0.2 feature); `None` when no
    /// WAL dir is configured or initialization failed at startup
    wal: Option<Arc<WriteAheadLog>>,

    /// Compaction manager for Parquet optimization (v0.2 feature)
    compaction_manager: Option<Arc<CompactionManager>>,

    /// Schema registry for event validation (v0.5 feature)
    schema_registry: Arc<SchemaRegistry>,

    /// Replay manager for event replay and projection rebuilding (v0.5 feature)
    replay_manager: Arc<ReplayManager>,

    /// Pipeline manager for stream processing (v0.5 feature)
    pipeline_manager: Arc<PipelineManager>,

    /// Prometheus metrics registry (v0.6 feature)
    #[cfg(feature = "server")]
    metrics: Arc<MetricsRegistry>,

    /// Total events ingested (for metrics)
    total_ingested: Arc<RwLock<u64>>,

    /// Projection state cache for Query Service integration (v0.7 feature)
    /// Key format: "{projection_name}:{entity_id}"
    /// This DashMap provides O(1) access with ~11.9 μs latency
    projection_state_cache: Arc<DashMap<String, serde_json::Value>>,

    /// Projection status overrides (v0.13 feature)
    /// Tracks pause/start state per projection name: "running" or "paused"
    projection_status: Arc<DashMap<String, String>>,

    /// Webhook registry for outbound event delivery (v0.11 feature)
    #[cfg(feature = "server")]
    webhook_registry: Arc<WebhookRegistry>,

    /// Channel sender for async webhook delivery tasks; `None` until the
    /// delivery worker is wired up elsewhere
    #[cfg(feature = "server")]
    webhook_tx: Arc<RwLock<Option<mpsc::UnboundedSender<WebhookDeliveryTask>>>>,

    /// Geospatial index for coordinate-based queries (v2.0 feature)
    geo_index: Arc<GeoIndex>,

    /// Exactly-once processing registry (v2.0 feature)
    exactly_once: Arc<ExactlyOnceRegistry>,

    /// Autonomous schema evolution manager (v2.0 feature)
    schema_evolution: Arc<SchemaEvolutionManager>,

    /// Per-entity version counters for optimistic concurrency control (v0.14 feature)
    /// Key: entity_id string, Value: monotonic version (number of events for that entity)
    entity_versions: Arc<DashMap<String, u64>>,

    /// Durable consumer registry for subscription cursor tracking (v0.14 feature)
    consumer_registry: Arc<ConsumerRegistry>,
}
116
/// A task queued for async webhook delivery.
///
/// Carried over the `webhook_tx` channel to the delivery worker; both fields
/// are owned copies so the task can outlive the ingestion call that produced it.
#[cfg(feature = "server")]
#[derive(Debug, Clone)]
pub struct WebhookDeliveryTask {
    /// The subscription this event matched.
    pub webhook: crate::application::services::webhook::WebhookSubscription,
    /// The event to deliver.
    pub event: Event,
}
124
125impl EventStore {
126    /// Create a new in-memory event store
127    pub fn new() -> Self {
128        Self::with_config(EventStoreConfig::default())
129    }
130
    /// Create event store with custom configuration.
    ///
    /// Startup performs a two-step recovery:
    /// 1. Load persisted events from Parquet storage (the durable baseline).
    /// 2. Replay WAL entries written after the last Parquet checkpoint,
    ///    skipping event IDs already loaded, then checkpoint the new events
    ///    back to Parquet and truncate the WAL.
    ///
    /// Optional subsystems (Parquet storage, WAL, compaction) are enabled
    /// only when their directories are present in `config`; initialization
    /// failures are logged and the subsystem is disabled rather than
    /// propagated — construction itself never fails.
    pub fn with_config(config: EventStoreConfig) -> Self {
        let mut projections = ProjectionManager::new();

        // Register built-in projections
        projections.register(Arc::new(EntitySnapshotProjection::new("entity_snapshots")));
        projections.register(Arc::new(EventCounterProjection::new("event_counters")));

        // Initialize persistent storage if configured.
        // A failure here degrades to in-memory-only operation (logged, not fatal).
        let storage = config
            .storage_dir
            .as_ref()
            .and_then(|dir| match ParquetStorage::new(dir) {
                Ok(storage) => {
                    tracing::info!("✅ Parquet persistence enabled at: {}", dir.display());
                    Some(Arc::new(RwLock::new(storage)))
                }
                Err(e) => {
                    tracing::error!("❌ Failed to initialize Parquet storage: {}", e);
                    None
                }
            });

        // Initialize WAL if configured (v0.2 feature).
        // Like storage, a WAL failure disables the WAL rather than aborting.
        let wal = config.wal_dir.as_ref().and_then(|dir| {
            match WriteAheadLog::new(dir, config.wal_config.clone()) {
                Ok(wal) => {
                    tracing::info!("✅ WAL enabled at: {}", dir.display());
                    Some(Arc::new(wal))
                }
                Err(e) => {
                    tracing::error!("❌ Failed to initialize WAL: {}", e);
                    None
                }
            }
        });

        // Initialize compaction manager if Parquet storage is enabled (v0.2 feature)
        let compaction_manager = config.storage_dir.as_ref().map(|dir| {
            let manager = CompactionManager::new(dir, config.compaction_config.clone());
            Arc::new(manager)
        });

        // Initialize schema registry (v0.5 feature)
        let schema_registry = Arc::new(SchemaRegistry::new(config.schema_registry_config.clone()));
        tracing::info!("✅ Schema registry enabled");

        // Initialize replay manager (v0.5 feature)
        let replay_manager = Arc::new(ReplayManager::new());
        tracing::info!("✅ Replay manager enabled");

        // Initialize pipeline manager (v0.5 feature)
        let pipeline_manager = Arc::new(PipelineManager::new());
        tracing::info!("✅ Pipeline manager enabled");

        // Initialize metrics registry (v0.6 feature)
        #[cfg(feature = "server")]
        let metrics = {
            let m = MetricsRegistry::new();
            tracing::info!("✅ Prometheus metrics registry initialized");
            m
        };

        // Initialize projection state cache (v0.7 feature)
        let projection_state_cache = Arc::new(DashMap::new());
        tracing::info!("✅ Projection state cache initialized");

        // Initialize webhook registry (v0.11 feature)
        #[cfg(feature = "server")]
        let webhook_registry = {
            let w = Arc::new(WebhookRegistry::new());
            tracing::info!("✅ Webhook registry initialized");
            w
        };

        let store = Self {
            events: Arc::new(RwLock::new(Vec::new())),
            index: Arc::new(EventIndex::new()),
            projections: Arc::new(RwLock::new(projections)),
            storage,
            #[cfg(feature = "server")]
            websocket_manager: Arc::new(WebSocketManager::new()),
            snapshot_manager: Arc::new(SnapshotManager::new(config.snapshot_config)),
            wal,
            compaction_manager,
            schema_registry,
            replay_manager,
            pipeline_manager,
            #[cfg(feature = "server")]
            metrics,
            total_ingested: Arc::new(RwLock::new(0)),
            projection_state_cache,
            projection_status: Arc::new(DashMap::new()),
            #[cfg(feature = "server")]
            webhook_registry,
            #[cfg(feature = "server")]
            webhook_tx: Arc::new(RwLock::new(None)),
            geo_index: Arc::new(GeoIndex::new()),
            exactly_once: Arc::new(ExactlyOnceRegistry::new(ExactlyOnceConfig::default())),
            schema_evolution: Arc::new(SchemaEvolutionManager::new()),
            entity_versions: Arc::new(DashMap::new()),
            consumer_registry: Arc::new(ConsumerRegistry::new()),
        };

        // Step 1: Load persisted events from Parquet (the durable baseline).
        // NOTE(review): a load_all_events() error is silently treated as
        // "nothing persisted" by this let-chain — confirm that is intended.
        if let Some(ref storage) = store.storage
            && let Ok(persisted_events) = storage.read().load_all_events()
            && !persisted_events.is_empty()
        {
            tracing::info!("📂 Loading {} persisted events...", persisted_events.len());

            for event in persisted_events {
                // Offset = current length; the event is pushed at that slot below.
                let offset = store.events.read().len();
                if let Err(e) = store.index.index_event(
                    event.id,
                    event.entity_id_str(),
                    event.event_type_str(),
                    event.timestamp,
                    offset,
                ) {
                    tracing::error!("Failed to re-index event {}: {}", event.id, e);
                }

                if let Err(e) = store.projections.read().process_event(&event) {
                    tracing::error!("Failed to re-process event {}: {}", event.id, e);
                }

                // Track per-entity version
                *store
                    .entity_versions
                    .entry(event.entity_id_str().to_string())
                    .or_insert(0) += 1;

                store.events.write().push(event);
            }

            let total = store.events.read().len();
            *store.total_ingested.write() = total as u64;
            tracing::info!("✅ Successfully loaded {} events from storage", total);
        }

        // Step 2: Recover WAL events (written after last Parquet checkpoint)
        if let Some(ref wal) = store.wal {
            match wal.recover() {
                Ok(recovered_events) if !recovered_events.is_empty() => {
                    // Collect IDs already loaded from Parquet to skip duplicates
                    let existing_ids: std::collections::HashSet<uuid::Uuid> =
                        store.events.read().iter().map(|e| e.id).collect();

                    let mut wal_new = 0usize;
                    for event in recovered_events {
                        if existing_ids.contains(&event.id) {
                            continue; // already loaded from Parquet
                        }

                        let offset = store.events.read().len();
                        if let Err(e) = store.index.index_event(
                            event.id,
                            event.entity_id_str(),
                            event.event_type_str(),
                            event.timestamp,
                            offset,
                        ) {
                            tracing::error!("Failed to re-index WAL event {}: {}", event.id, e);
                        }

                        if let Err(e) = store.projections.read().process_event(&event) {
                            tracing::error!("Failed to re-process WAL event {}: {}", event.id, e);
                        }

                        // Track per-entity version
                        *store
                            .entity_versions
                            .entry(event.entity_id_str().to_string())
                            .or_insert(0) += 1;

                        store.events.write().push(event);
                        wal_new += 1;
                    }

                    if wal_new > 0 {
                        let total = store.events.read().len();
                        *store.total_ingested.write() = total as u64;
                        tracing::info!(
                            "✅ Recovered {} new events from WAL ({} total)",
                            wal_new,
                            total
                        );

                        // Checkpoint WAL events to Parquet — buffer them into
                        // the Parquet batch first, then flush. Without this,
                        // flush_storage() finds an empty current_batch and
                        // silently no-ops, then we truncate the WAL and the
                        // events exist only in memory (lost on next restart).
                        if let Some(ref storage) = store.storage {
                            tracing::info!(
                                "📸 Checkpointing {} WAL events to Parquet storage...",
                                wal_new
                            );
                            let parquet = storage.read();
                            let events = store.events.read();
                            let mut buffered = 0usize;
                            // The recovered events are exactly the last
                            // `wal_new` entries of the in-memory log.
                            for event in events.iter().skip(events.len() - wal_new) {
                                if let Err(e) = parquet.append_event(event.clone()) {
                                    tracing::error!(
                                        "Failed to buffer WAL event for Parquet: {}",
                                        e
                                    );
                                } else {
                                    buffered += 1;
                                }
                            }
                            // Release both read locks before flush_storage(),
                            // which needs to re-acquire them.
                            drop(events);
                            drop(parquet);

                            // Only truncate the WAL after a successful flush;
                            // order matters for durability.
                            if buffered > 0 {
                                if let Err(e) = store.flush_storage() {
                                    tracing::error!("Failed to checkpoint to Parquet: {}", e);
                                } else if let Err(e) = wal.truncate() {
                                    tracing::error!(
                                        "Failed to truncate WAL after checkpoint: {}",
                                        e
                                    );
                                } else {
                                    tracing::info!(
                                        "✅ WAL checkpointed and truncated ({} events)",
                                        buffered
                                    );
                                }
                            }
                        }
                    }
                }
                Ok(_) => {
                    tracing::debug!("No events to recover from WAL");
                }
                Err(e) => {
                    tracing::error!("❌ WAL recovery failed: {}", e);
                }
            }
        }

        store
    }
375
    /// Ingest a new event with optional optimistic concurrency check.
    ///
    /// If `expected_version` is `Some(v)`, the write is rejected with
    /// `VersionConflict` unless the entity's current version equals `v`.
    /// The version check and WAL append are atomic (locked together).
    ///
    /// Returns the new entity version after the append.
    ///
    /// # Errors
    /// - Validation failures from `validate_event`
    /// - `VersionConflict` when `expected_version` does not match
    /// - WAL append errors, and any error from the post-WAL pipeline
    ///
    /// NOTE(review): if `ingest_post_wal` fails, the WAL entry and version
    /// bump have already happened, so the event is durable but absent from
    /// memory until the next recovery — confirm this is the intended
    /// crash-consistency model.
    pub fn ingest_with_expected_version(
        &self,
        event: Event,
        expected_version: Option<u64>,
    ) -> Result<u64> {
        // Validate event first (before any locking)
        self.validate_event(&event)?;

        let entity_id = event.entity_id_str().to_string();

        // Atomic version check + append: hold the DashMap entry lock
        // to prevent TOCTOU races between check and write.
        let new_version = {
            let mut version_entry = self.entity_versions.entry(entity_id.clone()).or_insert(0);
            let current = *version_entry;

            if let Some(expected) = expected_version
                && current != expected
            {
                // Early return drops the entry guard, leaving the version untouched.
                return Err(crate::error::AllSourceError::VersionConflict {
                    expected,
                    current,
                });
            }

            // Write to WAL FIRST for durability (under version lock to keep atomicity)
            if let Some(ref wal) = self.wal {
                wal.append(event.clone())?;
            }

            *version_entry += 1;
            *version_entry
        };

        // From here on, the event is durable (WAL) and version is bumped.
        // Continue with indexing, projections, storage, and broadcast.
        self.ingest_post_wal(event)?;

        Ok(new_version)
    }
423
    /// Post-WAL ingestion: index, projections, storage, broadcast.
    /// Called after WAL append and version bump are complete.
    ///
    /// Processing order (the write lock on `events` is held through the
    /// Parquet append so the in-memory offset and persisted order agree):
    /// index → projections → pipelines → Parquet → push to memory →
    /// WebSocket broadcast → webhooks → geo index → schema evolution →
    /// auto-snapshot check → metrics.
    fn ingest_post_wal(&self, event: Event) -> Result<()> {
        // Histogram timer measuring this post-WAL pipeline (server builds only).
        #[cfg(feature = "server")]
        let timer = self.metrics.ingestion_duration_seconds.start_timer();

        let mut events = self.events.write();
        // Offset = position the event will occupy once pushed below.
        let offset = events.len();

        // Index the event
        self.index.index_event(
            event.id,
            event.entity_id_str(),
            event.event_type_str(),
            event.timestamp,
            offset,
        )?;

        // Process through projections (read lock: ProjectionManager handles
        // its own interior mutability)
        let projections = self.projections.read();
        projections.process_event(&event)?;
        drop(projections);

        // Process through pipelines
        let pipeline_results = self.pipeline_manager.process_event(&event);
        if !pipeline_results.is_empty() {
            tracing::debug!(
                "Event {} processed by {} pipeline(s)",
                event.id,
                pipeline_results.len()
            );
            for (pipeline_id, result) in pipeline_results {
                tracing::trace!("Pipeline {} result: {:?}", pipeline_id, result);
            }
        }

        // Persist to Parquet storage if enabled
        if let Some(ref storage) = self.storage {
            let storage = storage.read();
            storage.append_event(event.clone())?;
        }

        // Store the event in memory
        events.push(event.clone());
        let total_events = events.len();
        // Release the write lock before the (potentially slower) fan-out below.
        drop(events);

        // Broadcast to WebSocket clients
        #[cfg(feature = "server")]
        self.websocket_manager
            .broadcast_event(Arc::new(event.clone()));

        // Dispatch to matching webhook subscriptions
        #[cfg(feature = "server")]
        self.dispatch_webhooks(&event);

        // Update geospatial index
        self.geo_index.index_event(&event);

        // Autonomous schema evolution
        self.schema_evolution
            .analyze_event(event.event_type_str(), &event.payload);

        // Check if automatic snapshot should be created
        self.check_auto_snapshot(event.entity_id_str(), &event);

        // Update metrics
        #[cfg(feature = "server")]
        {
            self.metrics.events_ingested_total.inc();
            self.metrics
                .events_ingested_by_type
                .with_label_values(&[event.event_type_str()])
                .inc();
            self.metrics.storage_events_total.set(total_events as i64);
        }

        // Update legacy total counter
        let mut total = self.total_ingested.write();
        *total += 1;

        #[cfg(feature = "server")]
        timer.observe_duration();

        tracing::debug!("Event ingested: {} (offset: {})", event.id, offset);

        Ok(())
    }
512
513    /// Ingest a new event into the store
514    pub fn ingest(&self, event: Event) -> Result<()> {
515        // Start metrics timer (v0.6 feature)
516        #[cfg(feature = "server")]
517        let timer = self.metrics.ingestion_duration_seconds.start_timer();
518
519        // Validate event
520        let validation_result = self.validate_event(&event);
521        if let Err(e) = validation_result {
522            #[cfg(feature = "server")]
523            {
524                self.metrics.ingestion_errors_total.inc();
525                timer.observe_duration();
526            }
527            return Err(e);
528        }
529
530        // Write to WAL FIRST for durability (v0.2 feature)
531        // This ensures event is persisted before processing
532        if let Some(ref wal) = self.wal
533            && let Err(e) = wal.append(event.clone())
534        {
535            #[cfg(feature = "server")]
536            {
537                self.metrics.ingestion_errors_total.inc();
538                timer.observe_duration();
539            }
540            return Err(e);
541        }
542
543        // Track per-entity version (unconditional increment, no version check)
544        *self
545            .entity_versions
546            .entry(event.entity_id_str().to_string())
547            .or_insert(0) += 1;
548
549        let mut events = self.events.write();
550        let offset = events.len();
551
552        // Index the event
553        self.index.index_event(
554            event.id,
555            event.entity_id_str(),
556            event.event_type_str(),
557            event.timestamp,
558            offset,
559        )?;
560
561        // Process through projections
562        let projections = self.projections.read();
563        projections.process_event(&event)?;
564        drop(projections); // Release lock
565
566        // Process through pipelines (v0.5 feature)
567        // Pipelines can transform, filter, and aggregate events in real-time
568        let pipeline_results = self.pipeline_manager.process_event(&event);
569        if !pipeline_results.is_empty() {
570            tracing::debug!(
571                "Event {} processed by {} pipeline(s)",
572                event.id,
573                pipeline_results.len()
574            );
575            // Pipeline results could be stored, emitted, or forwarded elsewhere
576            // For now, we just log them for observability
577            for (pipeline_id, result) in pipeline_results {
578                tracing::trace!("Pipeline {} result: {:?}", pipeline_id, result);
579            }
580        }
581
582        // Persist to Parquet storage if enabled (v0.2)
583        if let Some(ref storage) = self.storage {
584            let storage = storage.read();
585            storage.append_event(event.clone())?;
586        }
587
588        // Store the event in memory
589        events.push(event.clone());
590        let total_events = events.len();
591        drop(events); // Release lock early
592
593        // Broadcast to WebSocket clients (v0.2 feature)
594        #[cfg(feature = "server")]
595        self.websocket_manager
596            .broadcast_event(Arc::new(event.clone()));
597
598        // Dispatch to matching webhook subscriptions (v0.11 feature)
599        #[cfg(feature = "server")]
600        self.dispatch_webhooks(&event);
601
602        // Update geospatial index (v2.0 feature)
603        self.geo_index.index_event(&event);
604
605        // Autonomous schema evolution (v2.0 feature)
606        self.schema_evolution
607            .analyze_event(event.event_type_str(), &event.payload);
608
609        // Check if automatic snapshot should be created (v0.2 feature)
610        self.check_auto_snapshot(event.entity_id_str(), &event);
611
612        // Update metrics (v0.6 feature)
613        #[cfg(feature = "server")]
614        {
615            self.metrics.events_ingested_total.inc();
616            self.metrics
617                .events_ingested_by_type
618                .with_label_values(&[event.event_type_str()])
619                .inc();
620            self.metrics.storage_events_total.set(total_events as i64);
621        }
622
623        // Update legacy total counter
624        let mut total = self.total_ingested.write();
625        *total += 1;
626
627        #[cfg(feature = "server")]
628        timer.observe_duration();
629
630        tracing::debug!("Event ingested: {} (offset: {})", event.id, offset);
631
632        Ok(())
633    }
634
635    /// Ingest a batch of events with a single write lock acquisition.
636    ///
637    /// All events are validated first. If any event fails validation, no
638    /// events are stored (all-or-nothing validation). Events are then written
639    /// to WAL, indexed, processed through projections, and pushed to the
640    /// events vector under a single write lock.
641    pub fn ingest_batch(&self, batch: Vec<Event>) -> Result<()> {
642        if batch.is_empty() {
643            return Ok(());
644        }
645
646        // Phase 1: Validate all events before acquiring any locks
647        for event in &batch {
648            self.validate_event(event)?;
649        }
650
651        // Phase 2: Write all events to WAL (before write lock, for durability)
652        if let Some(ref wal) = self.wal {
653            for event in &batch {
654                wal.append(event.clone())?;
655            }
656        }
657
658        // Phase 3: Single write lock for index + projections + push
659        let mut events = self.events.write();
660        let projections = self.projections.read();
661
662        for event in batch {
663            let offset = events.len();
664
665            self.index.index_event(
666                event.id,
667                event.entity_id_str(),
668                event.event_type_str(),
669                event.timestamp,
670                offset,
671            )?;
672
673            projections.process_event(&event)?;
674            self.pipeline_manager.process_event(&event);
675
676            if let Some(ref storage) = self.storage {
677                let storage = storage.read();
678                storage.append_event(event.clone())?;
679            }
680
681            self.geo_index.index_event(&event);
682            self.schema_evolution
683                .analyze_event(event.event_type_str(), &event.payload);
684
685            // Track per-entity version
686            *self
687                .entity_versions
688                .entry(event.entity_id_str().to_string())
689                .or_insert(0) += 1;
690
691            events.push(event);
692        }
693
694        let total_events = events.len();
695        drop(projections);
696        drop(events);
697
698        let mut total = self.total_ingested.write();
699        *total += total_events as u64;
700
701        Ok(())
702    }
703
    /// Ingest a replicated event from the leader (follower mode).
    ///
    /// Unlike `ingest()`, this method:
    /// - Skips WAL writing (the follower's WalReceiver manages its own local WAL)
    /// - Skips schema validation (the leader already validated)
    /// - Still indexes, processes projections/pipelines, and broadcasts to WebSocket clients
    ///
    /// NOTE(review): this path also skips Parquet persistence, webhook
    /// dispatch, geo indexing, schema evolution, and the auto-snapshot
    /// check — confirm the follower is expected to rebuild those from its
    /// own WAL/replication stream.
    pub fn ingest_replicated(&self, event: Event) -> Result<()> {
        // Same ingestion-duration histogram as the leader path.
        #[cfg(feature = "server")]
        let timer = self.metrics.ingestion_duration_seconds.start_timer();

        let mut events = self.events.write();
        // Offset = slot the event will occupy when pushed below.
        let offset = events.len();

        // Index the event
        self.index.index_event(
            event.id,
            event.entity_id_str(),
            event.event_type_str(),
            event.timestamp,
            offset,
        )?;

        // Process through projections
        let projections = self.projections.read();
        projections.process_event(&event)?;
        drop(projections);

        // Process through pipelines
        let pipeline_results = self.pipeline_manager.process_event(&event);
        if !pipeline_results.is_empty() {
            tracing::debug!(
                "Replicated event {} processed by {} pipeline(s)",
                event.id,
                pipeline_results.len()
            );
        }

        // Track per-entity version
        *self
            .entity_versions
            .entry(event.entity_id_str().to_string())
            .or_insert(0) += 1;

        // Store the event in memory
        events.push(event.clone());
        let total_events = events.len();
        // Release the write lock before broadcasting.
        drop(events);

        // Broadcast to WebSocket clients
        #[cfg(feature = "server")]
        self.websocket_manager
            .broadcast_event(Arc::new(event.clone()));

        // Update metrics
        #[cfg(feature = "server")]
        {
            self.metrics.events_ingested_total.inc();
            self.metrics
                .events_ingested_by_type
                .with_label_values(&[event.event_type_str()])
                .inc();
            self.metrics.storage_events_total.set(total_events as i64);
        }

        let mut total = self.total_ingested.write();
        *total += 1;

        #[cfg(feature = "server")]
        timer.observe_duration();

        tracing::debug!(
            "Replicated event ingested: {} (offset: {})",
            event.id,
            offset
        );

        Ok(())
    }
782
783    /// Get the current version for an entity (number of events appended for it).
784    /// Returns 0 if the entity has no events.
785    pub fn get_entity_version(&self, entity_id: &str) -> u64 {
786        self.entity_versions
787            .get(entity_id)
788            .map(|v| *v)
789            .unwrap_or(0)
790    }
791
    /// Get the consumer registry for durable subscriptions.
    ///
    /// Returns a borrow (not a cloned `Arc` handle, unlike the other
    /// manager getters); the reference lives as long as `self`.
    pub fn consumer_registry(&self) -> &ConsumerRegistry {
        &self.consumer_registry
    }
796
    /// Get the total number of events in the store (used as max offset for consumer ack).
    ///
    /// Takes a short read lock on the in-memory event vector.
    pub fn total_events(&self) -> usize {
        self.events.read().len()
    }
801
802    /// Get events after a given offset, optionally filtered by event type prefixes.
803    /// Used by consumer polling to fetch unprocessed events.
804    pub fn events_after_offset(
805        &self,
806        offset: u64,
807        filters: &[String],
808        limit: usize,
809    ) -> Vec<(u64, Event)> {
810        let events = self.events.read();
811        let start = offset as usize;
812        if start >= events.len() {
813            return vec![];
814        }
815
816        events[start..]
817            .iter()
818            .enumerate()
819            .filter(|(_, event)| {
820                ConsumerRegistry::matches_filters(event.event_type_str(), filters)
821            })
822            .take(limit)
823            .map(|(i, event)| ((start + i + 1) as u64, event.clone()))
824            .collect()
825    }
826
827    /// Get the WebSocket manager for this store
828    #[cfg(feature = "server")]
829    pub fn websocket_manager(&self) -> Arc<WebSocketManager> {
830        Arc::clone(&self.websocket_manager)
831    }
832
833    /// Get the snapshot manager for this store
834    pub fn snapshot_manager(&self) -> Arc<SnapshotManager> {
835        Arc::clone(&self.snapshot_manager)
836    }
837
838    /// Get the compaction manager for this store
839    pub fn compaction_manager(&self) -> Option<Arc<CompactionManager>> {
840        self.compaction_manager.as_ref().map(Arc::clone)
841    }
842
843    /// Get the schema registry for this store (v0.5 feature)
844    pub fn schema_registry(&self) -> Arc<SchemaRegistry> {
845        Arc::clone(&self.schema_registry)
846    }
847
848    /// Get the replay manager for this store (v0.5 feature)
849    pub fn replay_manager(&self) -> Arc<ReplayManager> {
850        Arc::clone(&self.replay_manager)
851    }
852
853    /// Get the pipeline manager for this store (v0.5 feature)
854    pub fn pipeline_manager(&self) -> Arc<PipelineManager> {
855        Arc::clone(&self.pipeline_manager)
856    }
857
858    /// Get the metrics registry for this store (v0.6 feature)
859    #[cfg(feature = "server")]
860    pub fn metrics(&self) -> Arc<MetricsRegistry> {
861        Arc::clone(&self.metrics)
862    }
863
    /// Get the projection manager for this store (v0.7 feature)
    ///
    /// Returns a read guard over the projection registry; the lock is held
    /// until the guard is dropped, so keep its lifetime short to avoid
    /// blocking `register_projection`/`register_projection_with_backfill`.
    pub fn projection_manager(&self) -> parking_lot::RwLockReadGuard<'_, ProjectionManager> {
        self.projections.read()
    }
868
869    /// Register a custom projection at runtime.
870    ///
871    /// The projection will receive all future events via `process()`.
872    /// Historical events are **not** replayed — only events ingested after
873    /// registration will be processed by this projection.
874    ///
875    /// See [`register_projection_with_backfill`](Self::register_projection_with_backfill)
876    /// to also process historical events.
877    pub fn register_projection(
878        &self,
879        projection: Arc<dyn crate::application::services::projection::Projection>,
880    ) {
881        let mut pm = self.projections.write();
882        pm.register(projection);
883    }
884
885    /// Register a custom projection and replay all existing events through it.
886    ///
887    /// After registration, the projection will also receive all future events.
888    /// Historical events are replayed under a read lock — the projection's
889    /// internal state (typically DashMap) handles concurrent access.
890    pub fn register_projection_with_backfill(
891        &self,
892        projection: Arc<dyn crate::application::services::projection::Projection>,
893    ) -> Result<()> {
894        // First register so future events are processed
895        {
896            let mut pm = self.projections.write();
897            pm.register(Arc::clone(&projection));
898        }
899
900        // Then replay existing events under read lock
901        let events = self.events.read();
902        for event in events.iter() {
903            projection.process(event)?;
904        }
905
906        Ok(())
907    }
908
909    /// Get the projection state cache for this store (v0.7 feature)
910    /// Used by Elixir Query Service for state synchronization
911    pub fn projection_state_cache(&self) -> Arc<DashMap<String, serde_json::Value>> {
912        Arc::clone(&self.projection_state_cache)
913    }
914
915    /// Get the projection status map (v0.13 feature)
916    pub fn projection_status(&self) -> Arc<DashMap<String, String>> {
917        Arc::clone(&self.projection_status)
918    }
919
    /// Geospatial index for coordinate-based queries (v2.0 feature)
    ///
    /// (The stray "webhook registry" doc line that sat here was a copy-paste
    /// leftover; the webhook accessor lives further down in this impl.)
    pub fn geo_index(&self) -> Arc<GeoIndex> {
        self.geo_index.clone()
    }
925
926    /// Exactly-once processing registry (v2.0 feature)
927    pub fn exactly_once(&self) -> Arc<ExactlyOnceRegistry> {
928        self.exactly_once.clone()
929    }
930
931    /// Schema evolution manager (v2.0 feature)
932    pub fn schema_evolution(&self) -> Arc<SchemaEvolutionManager> {
933        self.schema_evolution.clone()
934    }
935
    /// Get a cloned snapshot of all events (for EventQL/GraphQL queries).
    ///
    /// NOTE(review): the previous doc claimed an `Arc` reference was returned
    /// with the read lock held — that does not match the code. This performs a
    /// full O(N) clone of the event vec under a briefly-held read lock; no
    /// lock outlives the call. Avoid in hot paths on large stores.
    pub fn snapshot_events(&self) -> Vec<Event> {
        self.events.read().clone()
    }
944
    /// Compact token events for an entity by replacing matching events with a
    /// single merged event. Used by the embedded streaming feature.
    ///
    /// Returns `Ok(true)` if compaction was performed, `Ok(false)` if no
    /// matching events were found.
    ///
    /// **Note:** The merged event is processed through projections *without*
    /// clearing the removed events' projection state first. Projections that
    /// accumulate state (e.g., counters) should be designed to handle this
    /// (the merged event replaces individual tokens, not adds to them).
    ///
    /// **Crash safety:** The WAL append happens *after* the in-memory swap
    /// under the write lock. If the process crashes before the WAL write,
    /// no change is persisted — WAL replay restores the pre-compaction state.
    /// If the process crashes after the WAL write, replay sees the merged
    /// event (and the original tokens, which are idempotent to replay since
    /// the merged event supersedes them).
    ///
    /// The write lock is held for the swap + WAL write + index rebuild.
    /// The index rebuild is O(N) over all events, which is acceptable for
    /// embedded workloads but should not be called in hot paths for large stores.
    ///
    /// NOTE(review): no lock is held between Phase 1 and Phase 3, so tokens
    /// ingested in that window are also compacted by the retain below —
    /// presumably acceptable for the embedded use case; confirm with callers.
    pub fn compact_entity_tokens(
        &self,
        entity_id: &str,
        token_event_type: &str,
        merged_event: Event,
    ) -> Result<bool> {
        // Phase 1: Read-only check — do we have anything to compact?
        {
            let events = self.events.read();
            let has_tokens = events
                .iter()
                .any(|e| e.entity_id_str() == entity_id && e.event_type_str() == token_event_type);
            if !has_tokens {
                return Ok(false);
            }
        }

        // Phase 2: Process merged event through projections (no write lock held)
        let projections = self.projections.read();
        projections.process_event(&merged_event)?;
        drop(projections);

        // Phase 3: Acquire write lock for the swap + WAL + index rebuild
        let mut events = self.events.write();

        // Drop every token event for this entity/type pair in place (O(N)).
        events.retain(|e| {
            !(e.entity_id_str() == entity_id && e.event_type_str() == token_event_type)
        });

        events.push(merged_event.clone());

        // WAL append inside write lock: crash before this line = no change persisted.
        // Crash after = merged event in WAL, original tokens also in WAL but
        // superseded by the merged event's entity_id + event_type.
        if let Some(ref wal) = self.wal {
            wal.append(merged_event)?;
        }

        // Rebuild entire index since retain() shifted event positions.
        // Errors here indicate a corrupt event (missing entity_id/event_type)
        // which should not happen for well-formed events. Log and continue
        // rather than failing the entire compaction.
        self.index.clear();
        for (offset, event) in events.iter().enumerate() {
            if let Err(e) = self.index.index_event(
                event.id,
                event.entity_id_str(),
                event.event_type_str(),
                event.timestamp,
                offset,
            ) {
                tracing::warn!(
                    event_id = %event.id,
                    offset,
                    "Failed to re-index event during compaction: {e}"
                );
            }
        }

        Ok(true)
    }
1027
1028    #[cfg(feature = "server")]
1029    pub fn webhook_registry(&self) -> Arc<WebhookRegistry> {
1030        Arc::clone(&self.webhook_registry)
1031    }
1032
1033    /// Set the channel for async webhook delivery.
1034    /// Called during server startup to wire the delivery worker.
1035    #[cfg(feature = "server")]
1036    pub fn set_webhook_tx(&self, tx: mpsc::UnboundedSender<WebhookDeliveryTask>) {
1037        *self.webhook_tx.write() = Some(tx);
1038        tracing::info!("Webhook delivery channel connected");
1039    }
1040
1041    /// Dispatch matching webhooks for a given event (non-blocking).
1042    #[cfg(feature = "server")]
1043    fn dispatch_webhooks(&self, event: &Event) {
1044        let matching = self.webhook_registry.find_matching(event);
1045        if matching.is_empty() {
1046            return;
1047        }
1048
1049        let tx_guard = self.webhook_tx.read();
1050        if let Some(ref tx) = *tx_guard {
1051            for webhook in matching {
1052                let task = WebhookDeliveryTask {
1053                    webhook,
1054                    event: event.clone(),
1055                };
1056                if let Err(e) = tx.send(task) {
1057                    tracing::warn!("Failed to queue webhook delivery: {}", e);
1058                }
1059            }
1060        }
1061    }
1062
1063    /// Manually flush any pending events to persistent storage
1064    pub fn flush_storage(&self) -> Result<()> {
1065        if let Some(ref storage) = self.storage {
1066            let storage = storage.read();
1067            storage.flush()?;
1068            tracing::info!("✅ Flushed events to persistent storage");
1069        }
1070        Ok(())
1071    }
1072
1073    /// Manually create a snapshot for an entity
1074    pub fn create_snapshot(&self, entity_id: &str) -> Result<()> {
1075        // Get all events for this entity
1076        let events = self.query(QueryEventsRequest {
1077            entity_id: Some(entity_id.to_string()),
1078            event_type: None,
1079            tenant_id: None,
1080            as_of: None,
1081            since: None,
1082            until: None,
1083            limit: None,
1084            event_type_prefix: None,
1085            payload_filter: None,
1086        })?;
1087
1088        if events.is_empty() {
1089            return Err(AllSourceError::EntityNotFound(entity_id.to_string()));
1090        }
1091
1092        // Build current state
1093        let mut state = serde_json::json!({});
1094        for event in &events {
1095            if let serde_json::Value::Object(ref mut state_map) = state
1096                && let serde_json::Value::Object(ref payload_map) = event.payload
1097            {
1098                for (key, value) in payload_map {
1099                    state_map.insert(key.clone(), value.clone());
1100                }
1101            }
1102        }
1103
1104        let last_event = events.last().unwrap();
1105        self.snapshot_manager.create_snapshot(
1106            entity_id.to_string(),
1107            state,
1108            last_event.timestamp,
1109            events.len(),
1110            SnapshotType::Manual,
1111        )?;
1112
1113        Ok(())
1114    }
1115
    /// Check and create automatic snapshots if needed
    ///
    /// Counts the entity's indexed events and asks the snapshot manager
    /// whether a snapshot is due; failures are logged, never propagated.
    fn check_auto_snapshot(&self, entity_id: &str, event: &Event) {
        // Count events for this entity
        let entity_event_count = self
            .index
            .get_by_entity(entity_id)
            .map(|entries| entries.len())
            .unwrap_or(0);

        if self.snapshot_manager.should_create_snapshot(
            entity_id,
            entity_event_count,
            event.timestamp,
        ) {
            // NOTE(review): despite the earlier "in background" comment, this
            // is a synchronous call on the ingestion path — create_snapshot()
            // runs inline; only its errors are swallowed (logged below).
            if let Err(e) = self.create_snapshot(entity_id) {
                tracing::warn!(
                    "Failed to create automatic snapshot for {}: {}",
                    entity_id,
                    e
                );
            }
        }
    }
1140
1141    /// Validate an event before ingestion
1142    fn validate_event(&self, event: &Event) -> Result<()> {
1143        // EntityId and EventType value objects already validate non-empty in their constructors
1144        // So these checks are now redundant, but we keep them for explicit validation
1145        if event.entity_id_str().is_empty() {
1146            return Err(AllSourceError::ValidationError(
1147                "entity_id cannot be empty".to_string(),
1148            ));
1149        }
1150
1151        if event.event_type_str().is_empty() {
1152            return Err(AllSourceError::ValidationError(
1153                "event_type cannot be empty".to_string(),
1154            ));
1155        }
1156
1157        // Reject system namespace events from user-facing ingestion.
1158        // System events are written exclusively via SystemMetadataStore.
1159        if event.event_type().is_system() {
1160            return Err(AllSourceError::ValidationError(
1161                "Event types starting with '_system.' are reserved for internal use".to_string(),
1162            ));
1163        }
1164
1165        Ok(())
1166    }
1167
1168    /// Reset a projection by clearing its state and reprocessing all events
1169    pub fn reset_projection(&self, name: &str) -> Result<usize> {
1170        let projection_manager = self.projections.read();
1171        let projection = projection_manager.get_projection(name).ok_or_else(|| {
1172            AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1173        })?;
1174
1175        // Clear existing state
1176        projection.clear();
1177
1178        // Clear cached state for this projection
1179        let prefix = format!("{name}:");
1180        let keys_to_remove: Vec<String> = self
1181            .projection_state_cache
1182            .iter()
1183            .filter(|entry| entry.key().starts_with(&prefix))
1184            .map(|entry| entry.key().clone())
1185            .collect();
1186        for key in keys_to_remove {
1187            self.projection_state_cache.remove(&key);
1188        }
1189
1190        // Reprocess all events through this projection
1191        let events = self.events.read();
1192        let mut reprocessed = 0usize;
1193        for event in events.iter() {
1194            if projection.process(event).is_ok() {
1195                reprocessed += 1;
1196            }
1197        }
1198
1199        Ok(reprocessed)
1200    }
1201
1202    /// Get a single event by its UUID
1203    pub fn get_event_by_id(&self, event_id: &uuid::Uuid) -> Result<Option<Event>> {
1204        if let Some(offset) = self.index.get_by_id(event_id) {
1205            let events = self.events.read();
1206            Ok(events.get(offset).cloned())
1207        } else {
1208            Ok(None)
1209        }
1210    }
1211
    /// Query events based on filters (optimized with indices)
    ///
    /// Lookup strategy (first match wins): entity index → exact type index →
    /// type-prefix index → full scan. Index hits are time-filtered via
    /// `filter_entries`; every candidate then passes through `apply_filters`
    /// for the remaining predicates. Results are returned sorted by
    /// timestamp ascending, truncated to `limit` if set.
    pub fn query(&self, request: QueryEventsRequest) -> Result<Vec<Event>> {
        // Determine query type for metrics (v0.6 feature)
        let query_type = if request.entity_id.is_some() {
            "entity"
        } else if request.event_type.is_some() {
            "type"
        } else if request.event_type_prefix.is_some() {
            "type_prefix"
        } else {
            "full_scan"
        };

        // Start metrics timer (v0.6 feature)
        #[cfg(feature = "server")]
        let timer = self
            .metrics
            .query_duration_seconds
            .with_label_values(&[query_type])
            .start_timer();

        // Increment query counter (v0.6 feature)
        #[cfg(feature = "server")]
        self.metrics
            .queries_total
            .with_label_values(&[query_type])
            .inc();

        // Read lock held through fetch below so offsets stay valid.
        let events = self.events.read();

        // Use index for fast lookups
        let offsets: Vec<usize> = if let Some(entity_id) = &request.entity_id {
            // Use entity index
            self.index
                .get_by_entity(entity_id)
                .map(|entries| self.filter_entries(entries, &request))
                .unwrap_or_default()
        } else if let Some(event_type) = &request.event_type {
            // Use type index (exact match)
            self.index
                .get_by_type(event_type)
                .map(|entries| self.filter_entries(entries, &request))
                .unwrap_or_default()
        } else if let Some(prefix) = &request.event_type_prefix {
            // Use type index (prefix match)
            let entries = self.index.get_by_type_prefix(prefix);
            self.filter_entries(entries, &request)
        } else {
            // Full scan (less efficient but necessary for complex queries)
            (0..events.len()).collect()
        };

        // Fetch events and apply remaining filters
        let mut results: Vec<Event> = offsets
            .iter()
            .filter_map(|&offset| events.get(offset).cloned())
            .filter(|event| self.apply_filters(event, &request))
            .collect();

        // Sort by timestamp (ascending)
        results.sort_by_key(|x| x.timestamp);

        // Apply limit (after sorting, so it returns the earliest N)
        if let Some(limit) = request.limit {
            results.truncate(limit);
        }

        // Record query results count (v0.6 feature)
        #[cfg(feature = "server")]
        {
            self.metrics
                .query_results_total
                .with_label_values(&[query_type])
                .inc_by(results.len() as u64);
            timer.observe_duration();
        }

        Ok(results)
    }
1291
1292    /// Filter index entries based on query parameters
1293    fn filter_entries(&self, entries: Vec<IndexEntry>, request: &QueryEventsRequest) -> Vec<usize> {
1294        entries
1295            .into_iter()
1296            .filter(|entry| {
1297                // Time filters
1298                if let Some(as_of) = request.as_of
1299                    && entry.timestamp > as_of
1300                {
1301                    return false;
1302                }
1303                if let Some(since) = request.since
1304                    && entry.timestamp < since
1305                {
1306                    return false;
1307                }
1308                if let Some(until) = request.until
1309                    && entry.timestamp > until
1310                {
1311                    return false;
1312                }
1313                true
1314            })
1315            .map(|entry| entry.offset)
1316            .collect()
1317    }
1318
1319    /// Apply filters to an event
1320    fn apply_filters(&self, event: &Event, request: &QueryEventsRequest) -> bool {
1321        // Additional type filter if entity was primary
1322        if request.entity_id.is_some()
1323            && let Some(ref event_type) = request.event_type
1324            && event.event_type_str() != event_type
1325        {
1326            return false;
1327        }
1328
1329        // Additional prefix filter if entity was primary
1330        if request.entity_id.is_some()
1331            && let Some(ref prefix) = request.event_type_prefix
1332            && !event.event_type_str().starts_with(prefix)
1333        {
1334            return false;
1335        }
1336
1337        // Payload field filtering
1338        if let Some(ref filter_str) = request.payload_filter
1339            && let Ok(filter_obj) =
1340                serde_json::from_str::<serde_json::Map<String, serde_json::Value>>(filter_str)
1341        {
1342            let payload = event.payload();
1343            for (key, expected_value) in &filter_obj {
1344                match payload.get(key) {
1345                    Some(actual_value) if actual_value == expected_value => {}
1346                    _ => return false,
1347                }
1348            }
1349        }
1350
1351        true
1352    }
1353
    /// Reconstruct entity state as of a specific timestamp
    /// v0.2: Now uses snapshots for fast reconstruction
    ///
    /// Picks the best available snapshot as a base (time-bounded when `as_of`
    /// is given, otherwise the latest), then replays only events after the
    /// snapshot's timestamp, shallow-merging object payloads over the base.
    /// Returns a JSON envelope with metadata plus `current_state` and the
    /// replayed `history`. Errors with `EntityNotFound` only when there is
    /// neither a snapshot nor any event for the entity.
    pub fn reconstruct_state(
        &self,
        entity_id: &str,
        as_of: Option<DateTime<Utc>>,
    ) -> Result<serde_json::Value> {
        // Try to find a snapshot to use as a base (v0.2 optimization)
        let (merged_state, since_timestamp) = if let Some(as_of_time) = as_of {
            // Get snapshot closest to requested time
            if let Some(snapshot) = self
                .snapshot_manager
                .get_snapshot_as_of(entity_id, as_of_time)
            {
                tracing::debug!(
                    "Using snapshot from {} for entity {} (saved {} events)",
                    snapshot.as_of,
                    entity_id,
                    snapshot.event_count
                );
                (snapshot.state.clone(), Some(snapshot.as_of))
            } else {
                (serde_json::json!({}), None)
            }
        } else {
            // Get latest snapshot for current state
            if let Some(snapshot) = self.snapshot_manager.get_latest_snapshot(entity_id) {
                tracing::debug!(
                    "Using latest snapshot from {} for entity {}",
                    snapshot.as_of,
                    entity_id
                );
                (snapshot.state.clone(), Some(snapshot.as_of))
            } else {
                (serde_json::json!({}), None)
            }
        };

        // Query events after the snapshot (or all if no snapshot)
        let events = self.query(QueryEventsRequest {
            entity_id: Some(entity_id.to_string()),
            event_type: None,
            tenant_id: None,
            as_of,
            since: since_timestamp,
            until: None,
            limit: None,
            event_type_prefix: None,
            payload_filter: None,
        })?;

        // If no events and no snapshot, entity not found
        if events.is_empty() && since_timestamp.is_none() {
            return Err(AllSourceError::EntityNotFound(entity_id.to_string()));
        }

        // Merge events on top of snapshot (or from scratch if no snapshot);
        // non-object payloads are skipped by the let-chain below.
        let mut merged_state = merged_state;
        for event in &events {
            if let serde_json::Value::Object(ref mut state_map) = merged_state
                && let serde_json::Value::Object(ref payload_map) = event.payload
            {
                for (key, value) in payload_map {
                    state_map.insert(key.clone(), value.clone());
                }
            }
        }

        // Wrap with metadata. NOTE: `last_updated`/`event_count`/`history`
        // describe only the events replayed on top of the snapshot, not the
        // entity's full history when a snapshot was used as the base.
        let state = serde_json::json!({
            "entity_id": entity_id,
            "last_updated": events.last().map(|e| e.timestamp),
            "event_count": events.len(),
            "as_of": as_of,
            "current_state": merged_state,
            "history": events.iter().map(|e| {
                serde_json::json!({
                    "event_id": e.id,
                    "type": e.event_type,
                    "timestamp": e.timestamp,
                    "payload": e.payload
                })
            }).collect::<Vec<_>>()
        });

        Ok(state)
    }
1441
1442    /// Get snapshot from projection (faster than reconstructing)
1443    pub fn get_snapshot(&self, entity_id: &str) -> Result<serde_json::Value> {
1444        let projections = self.projections.read();
1445
1446        if let Some(snapshot_projection) = projections.get_projection("entity_snapshots")
1447            && let Some(state) = snapshot_projection.get_state(entity_id)
1448        {
1449            return Ok(serde_json::json!({
1450                "entity_id": entity_id,
1451                "snapshot": state,
1452                "from_projection": "entity_snapshots"
1453            }));
1454        }
1455
1456        Err(AllSourceError::EntityNotFound(entity_id.to_string()))
1457    }
1458
1459    /// Get statistics about the event store
1460    pub fn stats(&self) -> StoreStats {
1461        let events = self.events.read();
1462        let index_stats = self.index.stats();
1463
1464        StoreStats {
1465            total_events: events.len(),
1466            total_entities: index_stats.total_entities,
1467            total_event_types: index_stats.total_event_types,
1468            total_ingested: *self.total_ingested.read(),
1469        }
1470    }
1471
1472    /// Get all unique streams (entity_ids) in the store
1473    pub fn list_streams(&self) -> Vec<StreamInfo> {
1474        self.index
1475            .get_all_entities()
1476            .into_iter()
1477            .map(|entity_id| {
1478                let event_count = self
1479                    .index
1480                    .get_by_entity(&entity_id)
1481                    .map(|entries| entries.len())
1482                    .unwrap_or(0);
1483                let last_event_at = self
1484                    .index
1485                    .get_by_entity(&entity_id)
1486                    .and_then(|entries| entries.last().map(|e| e.timestamp));
1487                StreamInfo {
1488                    stream_id: entity_id,
1489                    event_count,
1490                    last_event_at,
1491                }
1492            })
1493            .collect()
1494    }
1495
1496    /// Get all unique event types in the store
1497    pub fn list_event_types(&self) -> Vec<EventTypeInfo> {
1498        self.index
1499            .get_all_types()
1500            .into_iter()
1501            .map(|event_type| {
1502                let event_count = self
1503                    .index
1504                    .get_by_type(&event_type)
1505                    .map(|entries| entries.len())
1506                    .unwrap_or(0);
1507                let last_event_at = self
1508                    .index
1509                    .get_by_type(&event_type)
1510                    .and_then(|entries| entries.last().map(|e| e.timestamp));
1511                EventTypeInfo {
1512                    event_type,
1513                    event_count,
1514                    last_event_at,
1515                }
1516            })
1517            .collect()
1518    }
1519
1520    /// Attach a broadcast sender to the WAL for replication.
1521    ///
1522    /// Thread-safe: can be called through `Arc<EventStore>` at runtime.
1523    /// Used during initial setup and during follower → leader promotion.
1524    /// When set, every WAL append publishes the entry to the broadcast
1525    /// channel so the WAL shipper can stream it to followers.
1526    pub fn enable_wal_replication(
1527        &self,
1528        tx: tokio::sync::broadcast::Sender<crate::infrastructure::persistence::wal::WALEntry>,
1529    ) {
1530        if let Some(ref wal_arc) = self.wal {
1531            wal_arc.set_replication_tx(tx);
1532            tracing::info!("WAL replication broadcast enabled");
1533        } else {
1534            tracing::warn!("Cannot enable WAL replication: WAL is not configured");
1535        }
1536    }
1537
    /// Get a reference to the WAL (if configured).
    /// Used by the replication catch-up protocol to determine oldest available offset.
    ///
    /// Returns `None` when the store runs without a write-ahead log.
    pub fn wal(&self) -> Option<&Arc<WriteAheadLog>> {
        self.wal.as_ref()
    }
1543
    /// Get a reference to the Parquet storage (if configured).
    /// Used by the replication catch-up protocol to stream snapshot files to followers.
    ///
    /// Returns `None` when persistent storage is not configured (in-memory mode).
    pub fn parquet_storage(&self) -> Option<&Arc<RwLock<ParquetStorage>>> {
        self.storage.as_ref()
    }
1549}
1550
/// Configuration for EventStore
///
/// All fields default to "disabled"/empty via `Default`; the associated
/// constructors (`with_persistence`, `with_wal`, `production`, `from_env`, …)
/// provide common combinations.
#[derive(Debug, Clone, Default)]
pub struct EventStoreConfig {
    /// Optional directory for persistent Parquet storage (v0.2 feature)
    pub storage_dir: Option<PathBuf>,

    /// Snapshot configuration (v0.2 feature)
    pub snapshot_config: SnapshotConfig,

    /// Optional directory for WAL (Write-Ahead Log) (v0.2 feature)
    pub wal_dir: Option<PathBuf>,

    /// WAL configuration (v0.2 feature)
    pub wal_config: WALConfig,

    /// Compaction configuration (v0.2 feature)
    pub compaction_config: CompactionConfig,

    /// Schema registry configuration (v0.5 feature)
    pub schema_registry_config: SchemaRegistryConfig,

    /// Optional directory for system metadata storage (dogfood feature).
    /// When set, operational metadata (tenants, config, audit) is stored
    /// using AllSource's own event store rather than an external database.
    /// Defaults to `{storage_dir}/__system/` when storage_dir is set.
    pub system_data_dir: Option<PathBuf>,

    /// Name of the default tenant to auto-create on first boot.
    pub bootstrap_tenant: Option<String>,
}
1581
1582impl EventStoreConfig {
1583    /// Create config with persistent storage enabled
1584    pub fn with_persistence(storage_dir: impl Into<PathBuf>) -> Self {
1585        Self {
1586            storage_dir: Some(storage_dir.into()),
1587            ..Self::default()
1588        }
1589    }
1590
1591    /// Create config with custom snapshot settings
1592    pub fn with_snapshots(snapshot_config: SnapshotConfig) -> Self {
1593        Self {
1594            snapshot_config,
1595            ..Self::default()
1596        }
1597    }
1598
1599    /// Create config with WAL enabled
1600    pub fn with_wal(wal_dir: impl Into<PathBuf>, wal_config: WALConfig) -> Self {
1601        Self {
1602            wal_dir: Some(wal_dir.into()),
1603            wal_config,
1604            ..Self::default()
1605        }
1606    }
1607
1608    /// Create config with both persistence and snapshots
1609    pub fn with_all(storage_dir: impl Into<PathBuf>, snapshot_config: SnapshotConfig) -> Self {
1610        Self {
1611            storage_dir: Some(storage_dir.into()),
1612            snapshot_config,
1613            ..Self::default()
1614        }
1615    }
1616
1617    /// Create production config with all features enabled
1618    pub fn production(
1619        storage_dir: impl Into<PathBuf>,
1620        wal_dir: impl Into<PathBuf>,
1621        snapshot_config: SnapshotConfig,
1622        wal_config: WALConfig,
1623        compaction_config: CompactionConfig,
1624    ) -> Self {
1625        let storage_dir = storage_dir.into();
1626        let system_data_dir = storage_dir.join("__system");
1627        Self {
1628            storage_dir: Some(storage_dir),
1629            snapshot_config,
1630            wal_dir: Some(wal_dir.into()),
1631            wal_config,
1632            compaction_config,
1633            system_data_dir: Some(system_data_dir),
1634            ..Self::default()
1635        }
1636    }
1637
1638    /// Resolve the effective system data directory.
1639    ///
1640    /// If explicitly set, returns that. Otherwise, derives from storage_dir.
1641    /// Returns None if neither is configured (in-memory mode).
1642    pub fn effective_system_data_dir(&self) -> Option<PathBuf> {
1643        self.system_data_dir
1644            .clone()
1645            .or_else(|| self.storage_dir.as_ref().map(|d| d.join("__system")))
1646    }
1647
1648    /// Build config from environment variables.
1649    ///
1650    /// Reads `ALLSOURCE_DATA_DIR`, `ALLSOURCE_STORAGE_DIR`, `ALLSOURCE_WAL_DIR`,
1651    /// and `ALLSOURCE_WAL_ENABLED` to determine persistence mode.
1652    ///
1653    /// Returns `(config, description)` where description is a human-readable
1654    /// summary of the persistence mode for logging.
1655    pub fn from_env() -> (Self, &'static str) {
1656        Self::from_env_vars(
1657            std::env::var("ALLSOURCE_DATA_DIR")
1658                .ok()
1659                .filter(|s| !s.is_empty()),
1660            std::env::var("ALLSOURCE_STORAGE_DIR")
1661                .ok()
1662                .filter(|s| !s.is_empty()),
1663            std::env::var("ALLSOURCE_WAL_DIR")
1664                .ok()
1665                .filter(|s| !s.is_empty()),
1666            std::env::var("ALLSOURCE_WAL_ENABLED").ok(),
1667        )
1668    }
1669
1670    /// Build config from explicit env-var values (testable without mutating process env).
1671    pub fn from_env_vars(
1672        data_dir: Option<String>,
1673        explicit_storage_dir: Option<String>,
1674        explicit_wal_dir: Option<String>,
1675        wal_enabled_var: Option<String>,
1676    ) -> (Self, &'static str) {
1677        let data_dir = data_dir.filter(|s| !s.is_empty());
1678        let storage_dir = explicit_storage_dir
1679            .filter(|s| !s.is_empty())
1680            .or_else(|| data_dir.as_ref().map(|d| format!("{}/storage", d)));
1681        let wal_dir = explicit_wal_dir
1682            .filter(|s| !s.is_empty())
1683            .or_else(|| data_dir.as_ref().map(|d| format!("{}/wal", d)));
1684        let wal_enabled = wal_enabled_var.map(|v| v == "true").unwrap_or(true);
1685
1686        match (&storage_dir, &wal_dir) {
1687            (Some(sd), Some(wd)) if wal_enabled => {
1688                let config = Self::production(
1689                    sd,
1690                    wd,
1691                    SnapshotConfig::default(),
1692                    WALConfig::default(),
1693                    CompactionConfig::default(),
1694                );
1695                (config, "wal+parquet")
1696            }
1697            (Some(sd), _) => {
1698                let config = Self::with_persistence(sd);
1699                (config, "parquet-only")
1700            }
1701            (_, Some(wd)) if wal_enabled => {
1702                let config = Self::with_wal(wd, WALConfig::default());
1703                (config, "wal-only")
1704            }
1705            _ => (Self::default(), "in-memory"),
1706        }
1707    }
1708}
1709
1710#[derive(Debug, serde::Serialize)]
1711pub struct StoreStats {
1712    pub total_events: usize,
1713    pub total_entities: usize,
1714    pub total_event_types: usize,
1715    pub total_ingested: u64,
1716}
1717
1718/// Information about a stream (entity_id)
1719#[derive(Debug, Clone, serde::Serialize)]
1720pub struct StreamInfo {
1721    /// The stream identifier (entity_id)
1722    pub stream_id: String,
1723    /// Total number of events in this stream
1724    pub event_count: usize,
1725    /// Timestamp of the last event in this stream
1726    pub last_event_at: Option<chrono::DateTime<chrono::Utc>>,
1727}
1728
1729/// Information about an event type
1730#[derive(Debug, Clone, serde::Serialize)]
1731pub struct EventTypeInfo {
1732    /// The event type name
1733    pub event_type: String,
1734    /// Total number of events of this type
1735    pub event_count: usize,
1736    /// Timestamp of the last event of this type
1737    pub last_event_at: Option<chrono::DateTime<chrono::Utc>>,
1738}
1739
1740impl Default for EventStore {
1741    fn default() -> Self {
1742        Self::new()
1743    }
1744}
1745
1746#[cfg(test)]
1747mod tests {
1748    use super::*;
1749    use crate::domain::entities::Event;
1750    use tempfile::TempDir;
1751
1752    fn create_test_event(entity_id: &str, event_type: &str) -> Event {
1753        Event::from_strings(
1754            event_type.to_string(),
1755            entity_id.to_string(),
1756            "default".to_string(),
1757            serde_json::json!({"name": "Test", "value": 42}),
1758            None,
1759        )
1760        .unwrap()
1761    }
1762
1763    fn create_test_event_with_payload(
1764        entity_id: &str,
1765        event_type: &str,
1766        payload: serde_json::Value,
1767    ) -> Event {
1768        Event::from_strings(
1769            event_type.to_string(),
1770            entity_id.to_string(),
1771            "default".to_string(),
1772            payload,
1773            None,
1774        )
1775        .unwrap()
1776    }
1777
1778    #[test]
1779    fn test_event_store_new() {
1780        let store = EventStore::new();
1781        assert_eq!(store.stats().total_events, 0);
1782        assert_eq!(store.stats().total_entities, 0);
1783    }
1784
1785    #[test]
1786    fn test_event_store_default() {
1787        let store = EventStore::default();
1788        assert_eq!(store.stats().total_events, 0);
1789    }
1790
1791    #[test]
1792    fn test_ingest_single_event() {
1793        let store = EventStore::new();
1794        let event = create_test_event("entity-1", "user.created");
1795
1796        store.ingest(event).unwrap();
1797
1798        assert_eq!(store.stats().total_events, 1);
1799        assert_eq!(store.stats().total_ingested, 1);
1800    }
1801
1802    #[test]
1803    fn test_ingest_multiple_events() {
1804        let store = EventStore::new();
1805
1806        for i in 0..10 {
1807            let event = create_test_event(&format!("entity-{}", i), "user.created");
1808            store.ingest(event).unwrap();
1809        }
1810
1811        assert_eq!(store.stats().total_events, 10);
1812        assert_eq!(store.stats().total_ingested, 10);
1813    }
1814
1815    #[test]
1816    fn test_query_by_entity_id() {
1817        let store = EventStore::new();
1818
1819        store
1820            .ingest(create_test_event("entity-1", "user.created"))
1821            .unwrap();
1822        store
1823            .ingest(create_test_event("entity-2", "user.created"))
1824            .unwrap();
1825        store
1826            .ingest(create_test_event("entity-1", "user.updated"))
1827            .unwrap();
1828
1829        let results = store
1830            .query(QueryEventsRequest {
1831                entity_id: Some("entity-1".to_string()),
1832                event_type: None,
1833                tenant_id: None,
1834                as_of: None,
1835                since: None,
1836                until: None,
1837                limit: None,
1838                event_type_prefix: None,
1839                payload_filter: None,
1840            })
1841            .unwrap();
1842
1843        assert_eq!(results.len(), 2);
1844    }
1845
1846    #[test]
1847    fn test_query_by_event_type() {
1848        let store = EventStore::new();
1849
1850        store
1851            .ingest(create_test_event("entity-1", "user.created"))
1852            .unwrap();
1853        store
1854            .ingest(create_test_event("entity-2", "user.updated"))
1855            .unwrap();
1856        store
1857            .ingest(create_test_event("entity-3", "user.created"))
1858            .unwrap();
1859
1860        let results = store
1861            .query(QueryEventsRequest {
1862                entity_id: None,
1863                event_type: Some("user.created".to_string()),
1864                tenant_id: None,
1865                as_of: None,
1866                since: None,
1867                until: None,
1868                limit: None,
1869                event_type_prefix: None,
1870                payload_filter: None,
1871            })
1872            .unwrap();
1873
1874        assert_eq!(results.len(), 2);
1875    }
1876
1877    #[test]
1878    fn test_query_with_limit() {
1879        let store = EventStore::new();
1880
1881        for i in 0..10 {
1882            let event = create_test_event(&format!("entity-{}", i), "user.created");
1883            store.ingest(event).unwrap();
1884        }
1885
1886        let results = store
1887            .query(QueryEventsRequest {
1888                entity_id: None,
1889                event_type: None,
1890                tenant_id: None,
1891                as_of: None,
1892                since: None,
1893                until: None,
1894                limit: Some(5),
1895                event_type_prefix: None,
1896                payload_filter: None,
1897            })
1898            .unwrap();
1899
1900        assert_eq!(results.len(), 5);
1901    }
1902
1903    #[test]
1904    fn test_query_empty_store() {
1905        let store = EventStore::new();
1906
1907        let results = store
1908            .query(QueryEventsRequest {
1909                entity_id: Some("non-existent".to_string()),
1910                event_type: None,
1911                tenant_id: None,
1912                as_of: None,
1913                since: None,
1914                until: None,
1915                limit: None,
1916                event_type_prefix: None,
1917                payload_filter: None,
1918            })
1919            .unwrap();
1920
1921        assert!(results.is_empty());
1922    }
1923
1924    #[test]
1925    fn test_reconstruct_state() {
1926        let store = EventStore::new();
1927
1928        store
1929            .ingest(create_test_event("entity-1", "user.created"))
1930            .unwrap();
1931
1932        let state = store.reconstruct_state("entity-1", None).unwrap();
1933        // The state is wrapped with metadata
1934        assert_eq!(state["current_state"]["name"], "Test");
1935        assert_eq!(state["current_state"]["value"], 42);
1936    }
1937
1938    #[test]
1939    fn test_reconstruct_state_not_found() {
1940        let store = EventStore::new();
1941
1942        let result = store.reconstruct_state("non-existent", None);
1943        assert!(result.is_err());
1944    }
1945
1946    #[test]
1947    fn test_get_snapshot_empty() {
1948        let store = EventStore::new();
1949
1950        let result = store.get_snapshot("non-existent");
1951        // Entity not found error is expected
1952        assert!(result.is_err());
1953    }
1954
1955    #[test]
1956    fn test_create_snapshot() {
1957        let store = EventStore::new();
1958
1959        store
1960            .ingest(create_test_event("entity-1", "user.created"))
1961            .unwrap();
1962
1963        store.create_snapshot("entity-1").unwrap();
1964
1965        // Verify snapshot was created
1966        let snapshot = store.get_snapshot("entity-1").unwrap();
1967        assert!(snapshot != serde_json::json!(null));
1968    }
1969
1970    #[test]
1971    fn test_create_snapshot_entity_not_found() {
1972        let store = EventStore::new();
1973
1974        let result = store.create_snapshot("non-existent");
1975        assert!(result.is_err());
1976    }
1977
1978    #[test]
1979    fn test_websocket_manager() {
1980        let store = EventStore::new();
1981        let manager = store.websocket_manager();
1982        // Manager should be accessible
1983        assert!(Arc::strong_count(&manager) >= 1);
1984    }
1985
1986    #[test]
1987    fn test_snapshot_manager() {
1988        let store = EventStore::new();
1989        let manager = store.snapshot_manager();
1990        assert!(Arc::strong_count(&manager) >= 1);
1991    }
1992
1993    #[test]
1994    fn test_compaction_manager_none() {
1995        let store = EventStore::new();
1996        // Without storage_dir, compaction manager should be None
1997        assert!(store.compaction_manager().is_none());
1998    }
1999
2000    #[test]
2001    fn test_schema_registry() {
2002        let store = EventStore::new();
2003        let registry = store.schema_registry();
2004        assert!(Arc::strong_count(&registry) >= 1);
2005    }
2006
2007    #[test]
2008    fn test_replay_manager() {
2009        let store = EventStore::new();
2010        let manager = store.replay_manager();
2011        assert!(Arc::strong_count(&manager) >= 1);
2012    }
2013
2014    #[test]
2015    fn test_pipeline_manager() {
2016        let store = EventStore::new();
2017        let manager = store.pipeline_manager();
2018        assert!(Arc::strong_count(&manager) >= 1);
2019    }
2020
2021    #[test]
2022    fn test_projection_manager() {
2023        let store = EventStore::new();
2024        let manager = store.projection_manager();
2025        // Built-in projections should be registered
2026        let projections = manager.list_projections();
2027        assert!(projections.len() >= 2); // entity_snapshots and event_counters
2028    }
2029
2030    #[test]
2031    fn test_projection_state_cache() {
2032        let store = EventStore::new();
2033        let cache = store.projection_state_cache();
2034
2035        cache.insert("test:key".to_string(), serde_json::json!({"value": 123}));
2036        assert_eq!(cache.len(), 1);
2037
2038        let value = cache.get("test:key").unwrap();
2039        assert_eq!(value["value"], 123);
2040    }
2041
2042    #[test]
2043    fn test_metrics() {
2044        let store = EventStore::new();
2045        let metrics = store.metrics();
2046        assert!(Arc::strong_count(&metrics) >= 1);
2047    }
2048
2049    #[test]
2050    fn test_store_stats() {
2051        let store = EventStore::new();
2052
2053        store
2054            .ingest(create_test_event("entity-1", "user.created"))
2055            .unwrap();
2056        store
2057            .ingest(create_test_event("entity-2", "order.placed"))
2058            .unwrap();
2059
2060        let stats = store.stats();
2061        assert_eq!(stats.total_events, 2);
2062        assert_eq!(stats.total_entities, 2);
2063        assert_eq!(stats.total_event_types, 2);
2064        assert_eq!(stats.total_ingested, 2);
2065    }
2066
2067    #[test]
2068    fn test_event_store_config_default() {
2069        let config = EventStoreConfig::default();
2070        assert!(config.storage_dir.is_none());
2071        assert!(config.wal_dir.is_none());
2072    }
2073
2074    #[test]
2075    fn test_event_store_config_with_persistence() {
2076        let temp_dir = TempDir::new().unwrap();
2077        let config = EventStoreConfig::with_persistence(temp_dir.path());
2078
2079        assert!(config.storage_dir.is_some());
2080        assert!(config.wal_dir.is_none());
2081    }
2082
2083    #[test]
2084    fn test_event_store_config_with_wal() {
2085        let temp_dir = TempDir::new().unwrap();
2086        let config = EventStoreConfig::with_wal(temp_dir.path(), WALConfig::default());
2087
2088        assert!(config.storage_dir.is_none());
2089        assert!(config.wal_dir.is_some());
2090    }
2091
2092    #[test]
2093    fn test_event_store_config_with_all() {
2094        let temp_dir = TempDir::new().unwrap();
2095        let config = EventStoreConfig::with_all(temp_dir.path(), SnapshotConfig::default());
2096
2097        assert!(config.storage_dir.is_some());
2098    }
2099
2100    #[test]
2101    fn test_event_store_config_production() {
2102        let storage_dir = TempDir::new().unwrap();
2103        let wal_dir = TempDir::new().unwrap();
2104        let config = EventStoreConfig::production(
2105            storage_dir.path(),
2106            wal_dir.path(),
2107            SnapshotConfig::default(),
2108            WALConfig::default(),
2109            CompactionConfig::default(),
2110        );
2111
2112        assert!(config.storage_dir.is_some());
2113        assert!(config.wal_dir.is_some());
2114    }
2115
2116    // -----------------------------------------------------------------------
2117    // from_env_vars tests — verifies the env-var-to-config wiring that
2118    // caused the durability bug (events lost on restart) in v0.10.3.
2119    // -----------------------------------------------------------------------
2120
2121    #[test]
2122    fn test_from_env_vars_data_dir_enables_full_persistence() {
2123        let (config, mode) =
2124            EventStoreConfig::from_env_vars(Some("/app/data".to_string()), None, None, None);
2125        assert_eq!(mode, "wal+parquet");
2126        assert_eq!(
2127            config.storage_dir.unwrap().to_str().unwrap(),
2128            "/app/data/storage"
2129        );
2130        assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/app/data/wal");
2131    }
2132
2133    #[test]
2134    fn test_from_env_vars_explicit_dirs() {
2135        let (config, mode) = EventStoreConfig::from_env_vars(
2136            None,
2137            Some("/custom/storage".to_string()),
2138            Some("/custom/wal".to_string()),
2139            None,
2140        );
2141        assert_eq!(mode, "wal+parquet");
2142        assert_eq!(
2143            config.storage_dir.unwrap().to_str().unwrap(),
2144            "/custom/storage"
2145        );
2146        assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/custom/wal");
2147    }
2148
2149    #[test]
2150    fn test_from_env_vars_wal_disabled() {
2151        let (config, mode) = EventStoreConfig::from_env_vars(
2152            Some("/app/data".to_string()),
2153            None,
2154            None,
2155            Some("false".to_string()),
2156        );
2157        assert_eq!(mode, "parquet-only");
2158        assert!(config.storage_dir.is_some());
2159        assert!(config.wal_dir.is_none());
2160    }
2161
2162    #[test]
2163    fn test_from_env_vars_no_dirs_is_in_memory() {
2164        let (config, mode) = EventStoreConfig::from_env_vars(None, None, None, None);
2165        assert_eq!(mode, "in-memory");
2166        assert!(config.storage_dir.is_none());
2167        assert!(config.wal_dir.is_none());
2168    }
2169
2170    #[test]
2171    fn test_from_env_vars_empty_strings_treated_as_none() {
2172        let (_, mode) = EventStoreConfig::from_env_vars(
2173            Some("".to_string()),
2174            Some("".to_string()),
2175            Some("".to_string()),
2176            None,
2177        );
2178        assert_eq!(mode, "in-memory");
2179    }
2180
2181    #[test]
2182    fn test_from_env_vars_explicit_overrides_data_dir() {
2183        let (config, mode) = EventStoreConfig::from_env_vars(
2184            Some("/app/data".to_string()),
2185            Some("/override/storage".to_string()),
2186            Some("/override/wal".to_string()),
2187            None,
2188        );
2189        assert_eq!(mode, "wal+parquet");
2190        assert_eq!(
2191            config.storage_dir.unwrap().to_str().unwrap(),
2192            "/override/storage"
2193        );
2194        assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/override/wal");
2195    }
2196
2197    #[test]
2198    fn test_from_env_vars_wal_only() {
2199        let (config, mode) =
2200            EventStoreConfig::from_env_vars(None, None, Some("/wal/only".to_string()), None);
2201        assert_eq!(mode, "wal-only");
2202        assert!(config.storage_dir.is_none());
2203        assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/wal/only");
2204    }
2205
2206    #[test]
2207    fn test_store_stats_serde() {
2208        let stats = StoreStats {
2209            total_events: 100,
2210            total_entities: 50,
2211            total_event_types: 10,
2212            total_ingested: 100,
2213        };
2214
2215        let json = serde_json::to_string(&stats).unwrap();
2216        assert!(json.contains("\"total_events\":100"));
2217        assert!(json.contains("\"total_entities\":50"));
2218    }
2219
2220    #[test]
2221    fn test_query_with_entity_and_type() {
2222        let store = EventStore::new();
2223
2224        store
2225            .ingest(create_test_event("entity-1", "user.created"))
2226            .unwrap();
2227        store
2228            .ingest(create_test_event("entity-1", "user.updated"))
2229            .unwrap();
2230        store
2231            .ingest(create_test_event("entity-2", "user.created"))
2232            .unwrap();
2233
2234        let results = store
2235            .query(QueryEventsRequest {
2236                entity_id: Some("entity-1".to_string()),
2237                event_type: Some("user.created".to_string()),
2238                tenant_id: None,
2239                as_of: None,
2240                since: None,
2241                until: None,
2242                limit: None,
2243                event_type_prefix: None,
2244                payload_filter: None,
2245            })
2246            .unwrap();
2247
2248        assert_eq!(results.len(), 1);
2249        assert_eq!(results[0].event_type_str(), "user.created");
2250    }
2251
2252    #[test]
2253    fn test_query_by_event_type_prefix() {
2254        let store = EventStore::new();
2255
2256        // Ingest events with various types
2257        store
2258            .ingest(create_test_event("entity-1", "index.created"))
2259            .unwrap();
2260        store
2261            .ingest(create_test_event("entity-2", "index.updated"))
2262            .unwrap();
2263        store
2264            .ingest(create_test_event("entity-3", "trade.created"))
2265            .unwrap();
2266        store
2267            .ingest(create_test_event("entity-4", "trade.completed"))
2268            .unwrap();
2269        store
2270            .ingest(create_test_event("entity-5", "balance.updated"))
2271            .unwrap();
2272
2273        // Query with prefix "index." should return exactly 2
2274        let results = store
2275            .query(QueryEventsRequest {
2276                entity_id: None,
2277                event_type: None,
2278                tenant_id: None,
2279                as_of: None,
2280                since: None,
2281                until: None,
2282                limit: None,
2283                event_type_prefix: Some("index.".to_string()),
2284                payload_filter: None,
2285            })
2286            .unwrap();
2287
2288        assert_eq!(results.len(), 2);
2289        assert!(
2290            results
2291                .iter()
2292                .all(|e| e.event_type_str().starts_with("index."))
2293        );
2294    }
2295
2296    #[test]
2297    fn test_query_by_event_type_prefix_empty_returns_all() {
2298        let store = EventStore::new();
2299
2300        store
2301            .ingest(create_test_event("entity-1", "index.created"))
2302            .unwrap();
2303        store
2304            .ingest(create_test_event("entity-2", "trade.created"))
2305            .unwrap();
2306
2307        // Empty prefix matches all types
2308        let results = store
2309            .query(QueryEventsRequest {
2310                entity_id: None,
2311                event_type: None,
2312                tenant_id: None,
2313                as_of: None,
2314                since: None,
2315                until: None,
2316                limit: None,
2317                event_type_prefix: Some("".to_string()),
2318                payload_filter: None,
2319            })
2320            .unwrap();
2321
2322        assert_eq!(results.len(), 2);
2323    }
2324
2325    #[test]
2326    fn test_query_by_event_type_prefix_no_match() {
2327        let store = EventStore::new();
2328
2329        store
2330            .ingest(create_test_event("entity-1", "index.created"))
2331            .unwrap();
2332
2333        let results = store
2334            .query(QueryEventsRequest {
2335                entity_id: None,
2336                event_type: None,
2337                tenant_id: None,
2338                as_of: None,
2339                since: None,
2340                until: None,
2341                limit: None,
2342                event_type_prefix: Some("nonexistent.".to_string()),
2343                payload_filter: None,
2344            })
2345            .unwrap();
2346
2347        assert!(results.is_empty());
2348    }
2349
2350    #[test]
2351    fn test_query_by_entity_with_type_prefix() {
2352        let store = EventStore::new();
2353
2354        store
2355            .ingest(create_test_event("entity-1", "index.created"))
2356            .unwrap();
2357        store
2358            .ingest(create_test_event("entity-1", "trade.created"))
2359            .unwrap();
2360        store
2361            .ingest(create_test_event("entity-2", "index.updated"))
2362            .unwrap();
2363
2364        // Query entity-1 with prefix "index." should return 1
2365        let results = store
2366            .query(QueryEventsRequest {
2367                entity_id: Some("entity-1".to_string()),
2368                event_type: None,
2369                tenant_id: None,
2370                as_of: None,
2371                since: None,
2372                until: None,
2373                limit: None,
2374                event_type_prefix: Some("index.".to_string()),
2375                payload_filter: None,
2376            })
2377            .unwrap();
2378
2379        assert_eq!(results.len(), 1);
2380        assert_eq!(results[0].event_type_str(), "index.created");
2381    }
2382
2383    #[test]
2384    fn test_query_prefix_with_limit() {
2385        let store = EventStore::new();
2386
2387        for i in 0..5 {
2388            store
2389                .ingest(create_test_event(&format!("entity-{}", i), "index.created"))
2390                .unwrap();
2391        }
2392
2393        let results = store
2394            .query(QueryEventsRequest {
2395                entity_id: None,
2396                event_type: None,
2397                tenant_id: None,
2398                as_of: None,
2399                since: None,
2400                until: None,
2401                limit: Some(3),
2402                event_type_prefix: Some("index.".to_string()),
2403                payload_filter: None,
2404            })
2405            .unwrap();
2406
2407        assert_eq!(results.len(), 3);
2408    }
2409
2410    #[test]
2411    fn test_query_prefix_alongside_existing_filters() {
2412        let store = EventStore::new();
2413
2414        store
2415            .ingest(create_test_event("entity-1", "index.created"))
2416            .unwrap();
2417        // Sleep briefly to ensure different timestamps
2418        std::thread::sleep(std::time::Duration::from_millis(10));
2419        store
2420            .ingest(create_test_event("entity-2", "index.strategy.updated"))
2421            .unwrap();
2422        std::thread::sleep(std::time::Duration::from_millis(10));
2423        store
2424            .ingest(create_test_event("entity-3", "index.deleted"))
2425            .unwrap();
2426
2427        // Prefix with limit
2428        let results = store
2429            .query(QueryEventsRequest {
2430                entity_id: None,
2431                event_type: None,
2432                tenant_id: None,
2433                as_of: None,
2434                since: None,
2435                until: None,
2436                limit: Some(2),
2437                event_type_prefix: Some("index.".to_string()),
2438                payload_filter: None,
2439            })
2440            .unwrap();
2441
2442        assert_eq!(results.len(), 2);
2443    }
2444
2445    #[test]
2446    fn test_query_with_payload_filter() {
2447        let store = EventStore::new();
2448
2449        // Ingest 5 events with user_id=alice
2450        for i in 0..5 {
2451            store
2452                .ingest(create_test_event_with_payload(
2453                    &format!("entity-{}", i),
2454                    "user.action",
2455                    serde_json::json!({"user_id": "alice", "action": "click"}),
2456                ))
2457                .unwrap();
2458        }
2459        // Ingest 5 events with user_id=bob
2460        for i in 5..10 {
2461            store
2462                .ingest(create_test_event_with_payload(
2463                    &format!("entity-{}", i),
2464                    "user.action",
2465                    serde_json::json!({"user_id": "bob", "action": "view"}),
2466                ))
2467                .unwrap();
2468        }
2469
2470        // Filter for alice
2471        let results = store
2472            .query(QueryEventsRequest {
2473                entity_id: None,
2474                event_type: Some("user.action".to_string()),
2475                tenant_id: None,
2476                as_of: None,
2477                since: None,
2478                until: None,
2479                limit: None,
2480                event_type_prefix: None,
2481                payload_filter: Some(r#"{"user_id":"alice"}"#.to_string()),
2482            })
2483            .unwrap();
2484
2485        assert_eq!(results.len(), 5);
2486    }
2487
2488    #[test]
2489    fn test_query_payload_filter_non_existent_field() {
2490        let store = EventStore::new();
2491
2492        store
2493            .ingest(create_test_event_with_payload(
2494                "entity-1",
2495                "user.action",
2496                serde_json::json!({"user_id": "alice"}),
2497            ))
2498            .unwrap();
2499
2500        // Filter for a field that doesn't exist — returns 0, not error
2501        let results = store
2502            .query(QueryEventsRequest {
2503                entity_id: None,
2504                event_type: None,
2505                tenant_id: None,
2506                as_of: None,
2507                since: None,
2508                until: None,
2509                limit: None,
2510                event_type_prefix: None,
2511                payload_filter: Some(r#"{"nonexistent":"value"}"#.to_string()),
2512            })
2513            .unwrap();
2514
2515        assert!(results.is_empty());
2516    }
2517
2518    #[test]
2519    fn test_query_payload_filter_with_prefix() {
2520        let store = EventStore::new();
2521
2522        store
2523            .ingest(create_test_event_with_payload(
2524                "entity-1",
2525                "index.created",
2526                serde_json::json!({"status": "active"}),
2527            ))
2528            .unwrap();
2529        store
2530            .ingest(create_test_event_with_payload(
2531                "entity-2",
2532                "index.created",
2533                serde_json::json!({"status": "inactive"}),
2534            ))
2535            .unwrap();
2536        store
2537            .ingest(create_test_event_with_payload(
2538                "entity-3",
2539                "trade.created",
2540                serde_json::json!({"status": "active"}),
2541            ))
2542            .unwrap();
2543
2544        // Combine prefix + payload filter
2545        let results = store
2546            .query(QueryEventsRequest {
2547                entity_id: None,
2548                event_type: None,
2549                tenant_id: None,
2550                as_of: None,
2551                since: None,
2552                until: None,
2553                limit: None,
2554                event_type_prefix: Some("index.".to_string()),
2555                payload_filter: Some(r#"{"status":"active"}"#.to_string()),
2556            })
2557            .unwrap();
2558
2559        assert_eq!(results.len(), 1);
2560        assert_eq!(results[0].entity_id().to_string(), "entity-1");
2561    }
2562
2563    #[test]
2564    fn test_flush_storage_no_storage() {
2565        let store = EventStore::new();
2566        // Without storage, flush should succeed (no-op)
2567        let result = store.flush_storage();
2568        assert!(result.is_ok());
2569    }
2570
2571    #[test]
2572    fn test_state_evolution() {
2573        let store = EventStore::new();
2574
2575        // Initial state
2576        store
2577            .ingest(
2578                Event::from_strings(
2579                    "user.created".to_string(),
2580                    "user-1".to_string(),
2581                    "default".to_string(),
2582                    serde_json::json!({"name": "Alice", "age": 25}),
2583                    None,
2584                )
2585                .unwrap(),
2586            )
2587            .unwrap();
2588
2589        // Update state
2590        store
2591            .ingest(
2592                Event::from_strings(
2593                    "user.updated".to_string(),
2594                    "user-1".to_string(),
2595                    "default".to_string(),
2596                    serde_json::json!({"age": 26}),
2597                    None,
2598                )
2599                .unwrap(),
2600            )
2601            .unwrap();
2602
2603        let state = store.reconstruct_state("user-1", None).unwrap();
2604        // The state is wrapped with metadata
2605        assert_eq!(state["current_state"]["name"], "Alice");
2606        assert_eq!(state["current_state"]["age"], 26);
2607    }
2608
2609    #[test]
2610    fn test_reject_system_event_types() {
2611        let store = EventStore::new();
2612
2613        // System event types should be rejected via user-facing ingestion
2614        let event = Event::reconstruct_from_strings(
2615            uuid::Uuid::new_v4(),
2616            "_system.tenant.created".to_string(),
2617            "_system:tenant:acme".to_string(),
2618            "_system".to_string(),
2619            serde_json::json!({"name": "ACME"}),
2620            chrono::Utc::now(),
2621            None,
2622            1,
2623        );
2624
2625        let result = store.ingest(event);
2626        assert!(result.is_err());
2627        let err = result.unwrap_err();
2628        assert!(
2629            err.to_string().contains("reserved for internal use"),
2630            "Expected system namespace rejection, got: {}",
2631            err
2632        );
2633    }
2634
2635    // -----------------------------------------------------------------------
2636    // Crash recovery: WAL events survive restart via Parquet checkpoint.
2637    // Regression test for GitHub issue #84 — flush_storage() was a no-op
2638    // during recovery because events were never buffered into Parquet's
2639    // current_batch before flushing.
2640    // -----------------------------------------------------------------------
2641
2642    #[test]
2643    fn test_wal_recovery_checkpoints_to_parquet() {
2644        let data_dir = TempDir::new().unwrap();
2645        let storage_dir = data_dir.path().join("storage");
2646        let wal_dir = data_dir.path().join("wal");
2647
2648        // Session 1: ingest events with WAL + Parquet
2649        {
2650            let config = EventStoreConfig::production(
2651                &storage_dir,
2652                &wal_dir,
2653                SnapshotConfig::default(),
2654                WALConfig {
2655                    sync_on_write: true,
2656                    ..WALConfig::default()
2657                },
2658                CompactionConfig::default(),
2659            );
2660            let store = EventStore::with_config(config);
2661
2662            for i in 0..5 {
2663                let event = Event::from_strings(
2664                    "test.created".to_string(),
2665                    format!("entity-{}", i),
2666                    "default".to_string(),
2667                    serde_json::json!({"index": i}),
2668                    None,
2669                )
2670                .unwrap();
2671                store.ingest(event).unwrap();
2672            }
2673
2674            assert_eq!(store.stats().total_events, 5);
2675
2676            // Do NOT call flush_storage or shutdown — simulate a crash.
2677            // Events are in WAL (sync_on_write: true) but NOT in Parquet.
2678        }
2679
2680        // Verify WAL file has data
2681        let wal_files: Vec<_> = std::fs::read_dir(&wal_dir)
2682            .unwrap()
2683            .filter_map(|e| e.ok())
2684            .filter(|e| e.path().extension().is_some_and(|ext| ext == "log"))
2685            .collect();
2686        assert!(!wal_files.is_empty(), "WAL file should exist");
2687        let wal_size = wal_files[0].metadata().unwrap().len();
2688        assert!(wal_size > 0, "WAL file should have data (got 0 bytes)");
2689
2690        // Session 2: reopen — recovery should checkpoint WAL to Parquet, then truncate
2691        {
2692            let config = EventStoreConfig::production(
2693                &storage_dir,
2694                &wal_dir,
2695                SnapshotConfig::default(),
2696                WALConfig {
2697                    sync_on_write: true,
2698                    ..WALConfig::default()
2699                },
2700                CompactionConfig::default(),
2701            );
2702            let store = EventStore::with_config(config);
2703
2704            // Events should be recovered
2705            assert_eq!(
2706                store.stats().total_events,
2707                5,
2708                "Session 2 should have all 5 events after WAL recovery"
2709            );
2710
2711            // Parquet should now have files (checkpoint happened)
2712            let parquet_files: Vec<_> = std::fs::read_dir(&storage_dir)
2713                .unwrap()
2714                .filter_map(|e| e.ok())
2715                .filter(|e| e.path().extension().is_some_and(|ext| ext == "parquet"))
2716                .collect();
2717            assert!(
2718                !parquet_files.is_empty(),
2719                "Parquet file should exist after WAL checkpoint"
2720            );
2721        }
2722
2723        // Session 3: reopen again — events should load from Parquet (WAL was truncated)
2724        {
2725            let config = EventStoreConfig::production(
2726                &storage_dir,
2727                &wal_dir,
2728                SnapshotConfig::default(),
2729                WALConfig {
2730                    sync_on_write: true,
2731                    ..WALConfig::default()
2732                },
2733                CompactionConfig::default(),
2734            );
2735            let store = EventStore::with_config(config);
2736
2737            assert_eq!(
2738                store.stats().total_events,
2739                5,
2740                "Session 3 should still have all 5 events from Parquet"
2741            );
2742        }
2743    }
2744}