//! allsource_core — `store.rs`: high-performance event store with columnar storage.
1#[cfg(feature = "server")]
2use crate::application::services::webhook::WebhookRegistry;
3#[cfg(feature = "server")]
4use crate::infrastructure::observability::metrics::MetricsRegistry;
5#[cfg(feature = "server")]
6use crate::infrastructure::web::websocket::WebSocketManager;
7use crate::{
8    application::{
9        dto::QueryEventsRequest,
10        services::{
11            consumer::ConsumerRegistry,
12            exactly_once::{ExactlyOnceConfig, ExactlyOnceRegistry},
13            pipeline::PipelineManager,
14            projection::{EntitySnapshotProjection, EventCounterProjection, ProjectionManager},
15            replay::ReplayManager,
16            schema::{SchemaRegistry, SchemaRegistryConfig},
17            schema_evolution::SchemaEvolutionManager,
18        },
19    },
20    domain::entities::Event,
21    error::{AllSourceError, Result},
22    infrastructure::{
23        persistence::{
24            compaction::{CompactionConfig, CompactionManager},
25            index::{EventIndex, IndexEntry},
26            snapshot::{SnapshotConfig, SnapshotManager, SnapshotType},
27            storage::ParquetStorage,
28            wal::{WALConfig, WriteAheadLog},
29        },
30        query::geospatial::GeoIndex,
31    },
32};
33use chrono::{DateTime, Utc};
34use dashmap::DashMap;
35use parking_lot::RwLock;
36use std::{path::PathBuf, sync::Arc};
37#[cfg(feature = "server")]
38use tokio::sync::mpsc;
39
/// High-performance event store with columnar storage.
///
/// Fields gated on `#[cfg(feature = "server")]` only exist in server builds
/// (WebSocket fan-out, Prometheus metrics, webhook delivery); everything else
/// is available in embedded builds as well.
pub struct EventStore {
    /// In-memory event storage; the Vec index of an event is its "offset"
    /// recorded in the `EventIndex`.
    events: Arc<RwLock<Vec<Event>>>,

    /// High-performance concurrent index
    index: Arc<EventIndex>,

    /// Projection manager for real-time aggregations
    pub(crate) projections: Arc<RwLock<ProjectionManager>>,

    /// Optional persistent storage (v0.2 feature).
    /// `None` when no storage dir was configured or initialization failed
    /// (the store then runs in-memory only).
    storage: Option<Arc<RwLock<ParquetStorage>>>,

    /// WebSocket manager for real-time event streaming (v0.2 feature)
    #[cfg(feature = "server")]
    websocket_manager: Arc<WebSocketManager>,

    /// Snapshot manager for fast state recovery (v0.2 feature)
    snapshot_manager: Arc<SnapshotManager>,

    /// Write-Ahead Log for durability (v0.2 feature).
    /// `None` when no WAL dir was configured or initialization failed.
    wal: Option<Arc<WriteAheadLog>>,

    /// Compaction manager for Parquet optimization (v0.2 feature)
    compaction_manager: Option<Arc<CompactionManager>>,

    /// Schema registry for event validation (v0.5 feature)
    schema_registry: Arc<SchemaRegistry>,

    /// Replay manager for event replay and projection rebuilding (v0.5 feature)
    replay_manager: Arc<ReplayManager>,

    /// Pipeline manager for stream processing (v0.5 feature)
    pipeline_manager: Arc<PipelineManager>,

    /// Prometheus metrics registry (v0.6 feature)
    #[cfg(feature = "server")]
    metrics: Arc<MetricsRegistry>,

    /// Total events ingested (for metrics)
    total_ingested: Arc<RwLock<u64>>,

    /// Projection state cache for Query Service integration (v0.7 feature)
    /// Key format: "{projection_name}:{entity_id}"
    /// This DashMap provides O(1) access with ~11.9 μs latency
    projection_state_cache: Arc<DashMap<String, serde_json::Value>>,

    /// Projection status overrides (v0.13 feature)
    /// Tracks pause/start state per projection name: "running" or "paused"
    projection_status: Arc<DashMap<String, String>>,

    /// Webhook registry for outbound event delivery (v0.11 feature)
    #[cfg(feature = "server")]
    webhook_registry: Arc<WebhookRegistry>,

    /// Channel sender for async webhook delivery tasks.
    /// `None` until the delivery worker is wired up at startup.
    #[cfg(feature = "server")]
    webhook_tx: Arc<RwLock<Option<mpsc::UnboundedSender<WebhookDeliveryTask>>>>,

    /// Geospatial index for coordinate-based queries (v2.0 feature)
    geo_index: Arc<GeoIndex>,

    /// Exactly-once processing registry (v2.0 feature)
    exactly_once: Arc<ExactlyOnceRegistry>,

    /// Autonomous schema evolution manager (v2.0 feature)
    schema_evolution: Arc<SchemaEvolutionManager>,

    /// Per-entity version counters for optimistic concurrency control (v0.14 feature)
    /// Key: entity_id string, Value: monotonic version (number of events for that entity)
    entity_versions: Arc<DashMap<String, u64>>,

    /// Durable consumer registry for subscription cursor tracking (v0.14 feature)
    consumer_registry: Arc<ConsumerRegistry>,

    /// In-process broadcast of every successfully-ingested event. Always enabled
    /// so embedded consumers (TUI, web) can tail changes without the `server`
    /// feature / HTTP stack. Lagging receivers see `RecvError::Lagged`.
    event_broadcast_tx: tokio::sync::broadcast::Sender<Arc<Event>>,
}
121
/// A task queued for async webhook delivery
#[cfg(feature = "server")]
#[derive(Debug, Clone)]
pub struct WebhookDeliveryTask {
    /// Target subscription — presumably carries the endpoint and matching
    /// configuration; see `WebhookRegistry` for the delivery contract.
    pub webhook: crate::application::services::webhook::WebhookSubscription,
    /// Owned copy of the event so delivery can proceed after the ingest call
    /// returns.
    pub event: Event,
}
129
130impl EventStore {
131    /// Create a new in-memory event store
132    pub fn new() -> Self {
133        Self::with_config(EventStoreConfig::default())
134    }
135
    /// Create event store with custom configuration.
    ///
    /// Recovery runs in two steps after construction:
    /// 1. Load the durable baseline from Parquet (if storage is configured).
    /// 2. Replay WAL entries written after the last Parquet checkpoint,
    ///    skipping IDs already loaded, then checkpoint them back to Parquet
    ///    and truncate the WAL.
    ///
    /// Storage/WAL initialization failures are non-fatal: they are logged and
    /// the corresponding subsystem is disabled (`None`).
    pub fn with_config(config: EventStoreConfig) -> Self {
        let mut projections = ProjectionManager::new();

        // Register built-in projections
        projections.register(Arc::new(EntitySnapshotProjection::new("entity_snapshots")));
        projections.register(Arc::new(EventCounterProjection::new("event_counters")));

        // Initialize persistent storage if configured.
        // Failure degrades to in-memory operation rather than aborting.
        let storage = config
            .storage_dir
            .as_ref()
            .and_then(|dir| match ParquetStorage::new(dir) {
                Ok(storage) => {
                    tracing::info!("✅ Parquet persistence enabled at: {}", dir.display());
                    Some(Arc::new(RwLock::new(storage)))
                }
                Err(e) => {
                    tracing::error!("❌ Failed to initialize Parquet storage: {}", e);
                    None
                }
            });

        // Initialize WAL if configured (v0.2 feature).
        // Like storage, WAL failure is logged and the WAL disabled.
        let wal = config.wal_dir.as_ref().and_then(|dir| {
            match WriteAheadLog::new(dir, config.wal_config.clone()) {
                Ok(wal) => {
                    tracing::info!("✅ WAL enabled at: {}", dir.display());
                    Some(Arc::new(wal))
                }
                Err(e) => {
                    tracing::error!("❌ Failed to initialize WAL: {}", e);
                    None
                }
            }
        });

        // Initialize compaction manager if Parquet storage is enabled (v0.2 feature).
        // Keyed off storage_dir (not `storage`), so it is created even if
        // Parquet init failed above — TODO confirm that is intended.
        let compaction_manager = config.storage_dir.as_ref().map(|dir| {
            let manager = CompactionManager::new(dir, config.compaction_config.clone());
            Arc::new(manager)
        });

        // Initialize schema registry (v0.5 feature)
        let schema_registry = Arc::new(SchemaRegistry::new(config.schema_registry_config.clone()));
        tracing::info!("✅ Schema registry enabled");

        // Initialize replay manager (v0.5 feature)
        let replay_manager = Arc::new(ReplayManager::new());
        tracing::info!("✅ Replay manager enabled");

        // Initialize pipeline manager (v0.5 feature)
        let pipeline_manager = Arc::new(PipelineManager::new());
        tracing::info!("✅ Pipeline manager enabled");

        // Initialize metrics registry (v0.6 feature)
        #[cfg(feature = "server")]
        let metrics = {
            let m = MetricsRegistry::new();
            tracing::info!("✅ Prometheus metrics registry initialized");
            m
        };

        // Initialize projection state cache (v0.7 feature)
        let projection_state_cache = Arc::new(DashMap::new());
        tracing::info!("✅ Projection state cache initialized");

        // Initialize webhook registry (v0.11 feature)
        #[cfg(feature = "server")]
        let webhook_registry = {
            let w = Arc::new(WebhookRegistry::new());
            tracing::info!("✅ Webhook registry initialized");
            w
        };

        // Unconditional in-process event broadcaster so embedded consumers
        // (TUI, web) can live-reload without the `server` feature.
        let (event_broadcast_tx, _) = tokio::sync::broadcast::channel(1024);

        let store = Self {
            events: Arc::new(RwLock::new(Vec::new())),
            index: Arc::new(EventIndex::new()),
            projections: Arc::new(RwLock::new(projections)),
            storage,
            #[cfg(feature = "server")]
            websocket_manager: Arc::new(WebSocketManager::new()),
            snapshot_manager: Arc::new(SnapshotManager::new(config.snapshot_config)),
            wal,
            compaction_manager,
            schema_registry,
            replay_manager,
            pipeline_manager,
            #[cfg(feature = "server")]
            metrics,
            total_ingested: Arc::new(RwLock::new(0)),
            projection_state_cache,
            projection_status: Arc::new(DashMap::new()),
            #[cfg(feature = "server")]
            webhook_registry,
            #[cfg(feature = "server")]
            webhook_tx: Arc::new(RwLock::new(None)),
            geo_index: Arc::new(GeoIndex::new()),
            exactly_once: Arc::new(ExactlyOnceRegistry::new(ExactlyOnceConfig::default())),
            schema_evolution: Arc::new(SchemaEvolutionManager::new()),
            entity_versions: Arc::new(DashMap::new()),
            consumer_registry: Arc::new(ConsumerRegistry::new()),
            event_broadcast_tx,
        };

        // Step 1: Load persisted events from Parquet (the durable baseline)
        if let Some(ref storage) = store.storage {
            match storage.read().load_all_events() {
                Err(e) => {
                    tracing::error!(
                        "❌ Failed to load events from Parquet — data exists on disk but \
                         could not be read. This means events are NOT available until the \
                         issue is resolved. Error: {e}"
                    );
                }
                Ok(persisted_events) if persisted_events.is_empty() => {
                    tracing::info!("📂 No persisted events found in Parquet storage");
                }
                Ok(persisted_events) => {
                    tracing::info!("📂 Loading {} persisted events...", persisted_events.len());

                    // Locks here are uncontended — the store has not been
                    // shared yet — so per-iteration read/write acquisition
                    // is safe, if not cheap.
                    for event in persisted_events {
                        let offset = store.events.read().len();
                        if let Err(e) = store.index.index_event(
                            event.id,
                            event.entity_id_str(),
                            event.event_type_str(),
                            event.timestamp,
                            offset,
                        ) {
                            tracing::error!("Failed to re-index event {}: {}", event.id, e);
                        }

                        if let Err(e) = store.projections.read().process_event(&event) {
                            tracing::error!("Failed to re-process event {}: {}", event.id, e);
                        }

                        // Track per-entity version
                        *store
                            .entity_versions
                            .entry(event.entity_id_str().to_string())
                            .or_insert(0) += 1;

                        store.events.write().push(event);
                    }

                    let total = store.events.read().len();
                    *store.total_ingested.write() = total as u64;
                    tracing::info!("✅ Successfully loaded {} events from storage", total);
                }
            }
        }

        // Step 2: Recover WAL events (written after last Parquet checkpoint)
        if let Some(ref wal) = store.wal {
            match wal.recover() {
                Ok(recovered_events) if !recovered_events.is_empty() => {
                    // Collect IDs already loaded from Parquet to skip duplicates
                    let existing_ids: std::collections::HashSet<uuid::Uuid> =
                        store.events.read().iter().map(|e| e.id).collect();

                    let mut wal_new = 0usize;
                    for event in recovered_events {
                        if existing_ids.contains(&event.id) {
                            continue; // already loaded from Parquet
                        }

                        let offset = store.events.read().len();
                        if let Err(e) = store.index.index_event(
                            event.id,
                            event.entity_id_str(),
                            event.event_type_str(),
                            event.timestamp,
                            offset,
                        ) {
                            tracing::error!("Failed to re-index WAL event {}: {}", event.id, e);
                        }

                        if let Err(e) = store.projections.read().process_event(&event) {
                            tracing::error!("Failed to re-process WAL event {}: {}", event.id, e);
                        }

                        // Track per-entity version
                        *store
                            .entity_versions
                            .entry(event.entity_id_str().to_string())
                            .or_insert(0) += 1;

                        store.events.write().push(event);
                        wal_new += 1;
                    }

                    if wal_new > 0 {
                        let total = store.events.read().len();
                        *store.total_ingested.write() = total as u64;
                        tracing::info!(
                            "✅ Recovered {} new events from WAL ({} total)",
                            wal_new,
                            total
                        );

                        // Checkpoint WAL events to Parquet — buffer them into
                        // the Parquet batch first, then flush. Without this,
                        // flush_storage() finds an empty current_batch and
                        // silently no-ops, then we truncate the WAL and the
                        // events exist only in memory (lost on next restart).
                        if let Some(ref storage) = store.storage {
                            tracing::info!(
                                "📸 Checkpointing {} WAL events to Parquet storage...",
                                wal_new
                            );
                            let parquet = storage.read();
                            let events = store.events.read();
                            let mut buffered = 0usize;
                            // WAL-recovered events are exactly the tail of
                            // `events` (appended last), hence the skip().
                            for event in events.iter().skip(events.len() - wal_new) {
                                if let Err(e) = parquet.append_event(event.clone()) {
                                    tracing::error!(
                                        "Failed to buffer WAL event for Parquet: {}",
                                        e
                                    );
                                } else {
                                    buffered += 1;
                                }
                            }
                            // Release read guards before flush_storage(),
                            // which needs its own access to storage.
                            drop(events);
                            drop(parquet);

                            // Only truncate the WAL once the flush succeeded;
                            // otherwise the WAL remains the source of truth.
                            if buffered > 0 {
                                if let Err(e) = store.flush_storage() {
                                    tracing::error!("Failed to checkpoint to Parquet: {}", e);
                                } else if let Err(e) = wal.truncate() {
                                    tracing::error!(
                                        "Failed to truncate WAL after checkpoint: {}",
                                        e
                                    );
                                } else {
                                    tracing::info!(
                                        "✅ WAL checkpointed and truncated ({} events)",
                                        buffered
                                    );
                                }
                            }
                        }
                    }
                }
                Ok(_) => {
                    tracing::debug!("No events to recover from WAL");
                }
                Err(e) => {
                    tracing::error!("❌ WAL recovery failed: {}", e);
                }
            }
        }

        store
    }
396
    /// Ingest a new event with optional optimistic concurrency check.
    ///
    /// If `expected_version` is `Some(v)`, the write is rejected with
    /// `VersionConflict` unless the entity's current version equals `v`.
    /// The version check and WAL append are atomic (locked together).
    ///
    /// Returns the new entity version after the append.
    #[cfg_attr(feature = "hotpath", hotpath::measure)]
    pub fn ingest_with_expected_version(
        &self,
        event: &Event,
        expected_version: Option<u64>,
    ) -> Result<u64> {
        // Validate event first (before any locking)
        self.validate_event(event)?;

        let entity_id = event.entity_id_str().to_string();

        // Atomic version check + append: hold the DashMap entry lock
        // to prevent TOCTOU races between check and write.
        let new_version = {
            let mut version_entry = self.entity_versions.entry(entity_id.clone()).or_insert(0);
            let current = *version_entry;

            if let Some(expected) = expected_version
                && current != expected
            {
                // Early return drops `version_entry`, releasing the shard lock.
                return Err(crate::error::AllSourceError::VersionConflict { expected, current });
            }

            // Write to WAL FIRST for durability (under version lock to keep atomicity)
            // NOTE(review): this holds a DashMap shard lock across a disk
            // append, blocking concurrent writers whose entities hash to the
            // same shard for the duration — confirm that is acceptable.
            // A failed append (`?`) also drops the guard without bumping.
            if let Some(ref wal) = self.wal {
                wal.append(event.clone())?;
            }

            // Bump and capture the new version while still holding the guard.
            *version_entry += 1;
            *version_entry
        };

        // From here on, the event is durable (WAL) and version is bumped.
        // Continue with indexing, projections, storage, and broadcast.
        self.ingest_post_wal(event)?;

        Ok(new_version)
    }
442
    /// Post-WAL ingestion: index, projections, storage, broadcast.
    /// Called after WAL append and version bump are complete.
    ///
    /// # Errors
    /// Propagates failures from indexing, projection processing, or the
    /// Parquet append; earlier steps are not rolled back on a later failure.
    #[cfg_attr(feature = "hotpath", hotpath::measure)]
    fn ingest_post_wal(&self, event: &Event) -> Result<()> {
        #[cfg(feature = "server")]
        let timer = self.metrics.ingestion_duration_seconds.start_timer();

        // Hold the events write lock through index/projections/storage so
        // `offset` matches the position the event is eventually pushed at.
        let mut events = self.events.write();
        let offset = events.len();

        // Index the event
        self.index.index_event(
            event.id,
            event.entity_id_str(),
            event.event_type_str(),
            event.timestamp,
            offset,
        )?;

        // Process through projections
        let projections = self.projections.read();
        projections.process_event(event)?;
        drop(projections);

        // Process through pipelines
        let pipeline_results = self.pipeline_manager.process_event(event);
        if !pipeline_results.is_empty() {
            tracing::debug!(
                "Event {} processed by {} pipeline(s)",
                event.id,
                pipeline_results.len()
            );
            // Results are currently log-only; see `ingest` for the same note.
            for (pipeline_id, result) in pipeline_results {
                tracing::trace!("Pipeline {} result: {:?}", pipeline_id, result);
            }
        }

        // Persist to Parquet storage if enabled
        if let Some(ref storage) = self.storage {
            let storage = storage.read();
            storage.append_event(event.clone())?;
        }

        // Store the event in memory
        events.push(event.clone());
        let total_events = events.len();
        drop(events);

        // Broadcast to in-process subscribers (always on) + optional WS.
        let event_arc = Arc::new(event.clone());
        let _ = self.event_broadcast_tx.send(Arc::clone(&event_arc));
        #[cfg(feature = "server")]
        self.websocket_manager.broadcast_event(event_arc);

        // Dispatch to matching webhook subscriptions
        #[cfg(feature = "server")]
        self.dispatch_webhooks(event);

        // Update geospatial index
        self.geo_index.index_event(event);

        // Autonomous schema evolution
        self.schema_evolution
            .analyze_event(event.event_type_str(), &event.payload);

        // Check if automatic snapshot should be created
        self.check_auto_snapshot(event.entity_id_str(), event);

        // Update metrics
        #[cfg(feature = "server")]
        {
            self.metrics.events_ingested_total.inc();
            self.metrics
                .events_ingested_by_type
                .with_label_values(&[event.event_type_str()])
                .inc();
            self.metrics.storage_events_total.set(total_events as i64);
        }

        // Update legacy total counter
        let mut total = self.total_ingested.write();
        *total += 1;

        #[cfg(feature = "server")]
        timer.observe_duration();

        tracing::debug!("Event ingested: {} (offset: {})", event.id, offset);

        Ok(())
    }
533
    /// Ingest a new event into the store.
    ///
    /// Pipeline order: validate → WAL append → version bump → index →
    /// projections → pipelines → Parquet → in-memory push → broadcast →
    /// webhooks → geo index → schema evolution → auto-snapshot → metrics.
    ///
    /// NOTE(review): everything after the version bump duplicates
    /// `ingest_post_wal` nearly line for line; consider delegating so the
    /// two ingest paths cannot drift apart. Kept as-is here because the
    /// error-path metric/timer handling differs subtly.
    #[cfg_attr(feature = "hotpath", hotpath::measure)]
    pub fn ingest(&self, event: &Event) -> Result<()> {
        // Start metrics timer (v0.6 feature)
        #[cfg(feature = "server")]
        let timer = self.metrics.ingestion_duration_seconds.start_timer();

        // Validate event; record the error metric and observe the timer
        // explicitly before bailing out.
        let validation_result = self.validate_event(event);
        if let Err(e) = validation_result {
            #[cfg(feature = "server")]
            {
                self.metrics.ingestion_errors_total.inc();
                timer.observe_duration();
            }
            return Err(e);
        }

        // Write to WAL FIRST for durability (v0.2 feature)
        // This ensures event is persisted before processing
        if let Some(ref wal) = self.wal
            && let Err(e) = wal.append(event.clone())
        {
            #[cfg(feature = "server")]
            {
                self.metrics.ingestion_errors_total.inc();
                timer.observe_duration();
            }
            return Err(e);
        }

        // Track per-entity version (unconditional increment, no version check)
        *self
            .entity_versions
            .entry(event.entity_id_str().to_string())
            .or_insert(0) += 1;

        // Hold the events write lock through index/projections/storage so
        // `offset` matches the position the event is eventually pushed at.
        let mut events = self.events.write();
        let offset = events.len();

        // Index the event
        self.index.index_event(
            event.id,
            event.entity_id_str(),
            event.event_type_str(),
            event.timestamp,
            offset,
        )?;

        // Process through projections
        let projections = self.projections.read();
        projections.process_event(event)?;
        drop(projections); // Release lock

        // Process through pipelines (v0.5 feature)
        // Pipelines can transform, filter, and aggregate events in real-time
        let pipeline_results = self.pipeline_manager.process_event(event);
        if !pipeline_results.is_empty() {
            tracing::debug!(
                "Event {} processed by {} pipeline(s)",
                event.id,
                pipeline_results.len()
            );
            // Pipeline results could be stored, emitted, or forwarded elsewhere
            // For now, we just log them for observability
            for (pipeline_id, result) in pipeline_results {
                tracing::trace!("Pipeline {} result: {:?}", pipeline_id, result);
            }
        }

        // Persist to Parquet storage if enabled (v0.2)
        if let Some(ref storage) = self.storage {
            let storage = storage.read();
            storage.append_event(event.clone())?;
        }

        // Store the event in memory
        events.push(event.clone());
        let total_events = events.len();
        drop(events); // Release lock early

        // Broadcast to in-process subscribers + optional WS.
        let event_arc = Arc::new(event.clone());
        let _ = self.event_broadcast_tx.send(Arc::clone(&event_arc));
        #[cfg(feature = "server")]
        self.websocket_manager.broadcast_event(event_arc);

        // Dispatch to matching webhook subscriptions (v0.11 feature)
        #[cfg(feature = "server")]
        self.dispatch_webhooks(event);

        // Update geospatial index (v2.0 feature)
        self.geo_index.index_event(event);

        // Autonomous schema evolution (v2.0 feature)
        self.schema_evolution
            .analyze_event(event.event_type_str(), &event.payload);

        // Check if automatic snapshot should be created (v0.2 feature)
        self.check_auto_snapshot(event.entity_id_str(), event);

        // Update metrics (v0.6 feature)
        #[cfg(feature = "server")]
        {
            self.metrics.events_ingested_total.inc();
            self.metrics
                .events_ingested_by_type
                .with_label_values(&[event.event_type_str()])
                .inc();
            self.metrics.storage_events_total.set(total_events as i64);
        }

        // Update legacy total counter
        let mut total = self.total_ingested.write();
        *total += 1;

        #[cfg(feature = "server")]
        timer.observe_duration();

        tracing::debug!("Event ingested: {} (offset: {})", event.id, offset);

        Ok(())
    }
657
658    /// Ingest a batch of events with a single write lock acquisition.
659    ///
660    /// All events are validated first. If any event fails validation, no
661    /// events are stored (all-or-nothing validation). Events are then written
662    /// to WAL, indexed, processed through projections, and pushed to the
663    /// events vector under a single write lock.
664    #[cfg_attr(feature = "hotpath", hotpath::measure)]
665    pub fn ingest_batch(&self, batch: Vec<Event>) -> Result<()> {
666        if batch.is_empty() {
667            return Ok(());
668        }
669
670        // Phase 1: Validate all events before acquiring any locks
671        for event in &batch {
672            self.validate_event(event)?;
673        }
674
675        // Phase 2: Write all events to WAL (before write lock, for durability)
676        if let Some(ref wal) = self.wal {
677            for event in &batch {
678                wal.append(event.clone())?;
679            }
680        }
681
682        // Phase 3: Single write lock for index + projections + push
683        let mut events = self.events.write();
684        let projections = self.projections.read();
685
686        for event in batch {
687            let offset = events.len();
688
689            self.index.index_event(
690                event.id,
691                event.entity_id_str(),
692                event.event_type_str(),
693                event.timestamp,
694                offset,
695            )?;
696
697            projections.process_event(&event)?;
698            self.pipeline_manager.process_event(&event);
699
700            if let Some(ref storage) = self.storage {
701                let storage = storage.read();
702                storage.append_event(event.clone())?;
703            }
704
705            self.geo_index.index_event(&event);
706            self.schema_evolution
707                .analyze_event(event.event_type_str(), &event.payload);
708
709            // Track per-entity version
710            *self
711                .entity_versions
712                .entry(event.entity_id_str().to_string())
713                .or_insert(0) += 1;
714
715            // Broadcast to in-process subscribers
716            let _ = self.event_broadcast_tx.send(Arc::new(event.clone()));
717
718            events.push(event);
719        }
720
721        let total_events = events.len();
722        drop(projections);
723        drop(events);
724
725        let mut total = self.total_ingested.write();
726        *total += total_events as u64;
727
728        Ok(())
729    }
730
    /// Ingest a replicated event from the leader (follower mode).
    ///
    /// Unlike `ingest()`, this method:
    /// - Skips WAL writing (the follower's WalReceiver manages its own local WAL)
    /// - Skips schema validation (the leader already validated)
    /// - Still indexes, processes projections/pipelines, and broadcasts to WebSocket clients
    ///
    /// NOTE(review): it also skips webhook dispatch, the geo index, schema
    /// evolution, and auto-snapshots — confirm these omissions are intended
    /// for the replication path rather than an oversight.
    #[cfg_attr(feature = "hotpath", hotpath::measure)]
    pub fn ingest_replicated(&self, event: &Event) -> Result<()> {
        #[cfg(feature = "server")]
        let timer = self.metrics.ingestion_duration_seconds.start_timer();

        // Hold the events write lock so `offset` matches the push position.
        let mut events = self.events.write();
        let offset = events.len();

        // Index the event
        self.index.index_event(
            event.id,
            event.entity_id_str(),
            event.event_type_str(),
            event.timestamp,
            offset,
        )?;

        // Process through projections
        let projections = self.projections.read();
        projections.process_event(event)?;
        drop(projections);

        // Process through pipelines
        let pipeline_results = self.pipeline_manager.process_event(event);
        if !pipeline_results.is_empty() {
            tracing::debug!(
                "Replicated event {} processed by {} pipeline(s)",
                event.id,
                pipeline_results.len()
            );
        }

        // Track per-entity version
        *self
            .entity_versions
            .entry(event.entity_id_str().to_string())
            .or_insert(0) += 1;

        // Store the event in memory
        events.push(event.clone());
        let total_events = events.len();
        drop(events);

        // Broadcast to in-process subscribers + optional WS.
        let event_arc = Arc::new(event.clone());
        let _ = self.event_broadcast_tx.send(Arc::clone(&event_arc));
        #[cfg(feature = "server")]
        self.websocket_manager.broadcast_event(event_arc);

        // Update metrics
        #[cfg(feature = "server")]
        {
            self.metrics.events_ingested_total.inc();
            self.metrics
                .events_ingested_by_type
                .with_label_values(&[event.event_type_str()])
                .inc();
            self.metrics.storage_events_total.set(total_events as i64);
        }

        // Update legacy total counter
        let mut total = self.total_ingested.write();
        *total += 1;

        #[cfg(feature = "server")]
        timer.observe_duration();

        tracing::debug!(
            "Replicated event ingested: {} (offset: {})",
            event.id,
            offset
        );

        Ok(())
    }
811
812    /// Get the current version for an entity (number of events appended for it).
813    /// Returns 0 if the entity has no events.
814    #[cfg_attr(feature = "hotpath", hotpath::measure)]
815    pub fn get_entity_version(&self, entity_id: &str) -> u64 {
816        self.entity_versions.get(entity_id).map_or(0, |v| *v)
817    }
818
819    /// Get the consumer registry for durable subscriptions.
820    pub fn consumer_registry(&self) -> &ConsumerRegistry {
821        &self.consumer_registry
822    }
823
824    /// Subscribe to every successfully-ingested event in this store.
825    ///
826    /// Returns a `tokio::sync::broadcast::Receiver` that yields an `Arc<Event>`
827    /// for each ingest. Always available — does not require the `server`
828    /// feature. Lagging receivers surface `RecvError::Lagged`.
829    pub fn subscribe_events(&self) -> tokio::sync::broadcast::Receiver<Arc<Event>> {
830        self.event_broadcast_tx.subscribe()
831    }
832
833    /// Replace the default in-memory consumer registry with a durable one.
834    ///
835    /// Called during startup when system repositories are available, so that
836    /// consumer cursors survive Core restarts via WAL persistence.
837    pub fn set_consumer_registry(&mut self, registry: Arc<ConsumerRegistry>) {
838        self.consumer_registry = registry;
839    }
840
841    /// Get the total number of events in the store (used as max offset for consumer ack).
842    pub fn total_events(&self) -> usize {
843        self.events.read().len()
844    }
845
846    /// Get events after a given offset, optionally filtered by event type prefixes.
847    /// Used by consumer polling to fetch unprocessed events.
848    pub fn events_after_offset(
849        &self,
850        offset: u64,
851        filters: &[String],
852        limit: usize,
853    ) -> Vec<(u64, Event)> {
854        let events = self.events.read();
855        let start = offset as usize;
856        if start >= events.len() {
857            return vec![];
858        }
859
860        events[start..]
861            .iter()
862            .enumerate()
863            .filter(|(_, event)| ConsumerRegistry::matches_filters(event.event_type_str(), filters))
864            .take(limit)
865            .map(|(i, event)| ((start + i + 1) as u64, event.clone()))
866            .collect()
867    }
868
869    /// Get the WebSocket manager for this store
870    #[cfg(feature = "server")]
871    pub fn websocket_manager(&self) -> Arc<WebSocketManager> {
872        Arc::clone(&self.websocket_manager)
873    }
874
875    /// Get the snapshot manager for this store
876    pub fn snapshot_manager(&self) -> Arc<SnapshotManager> {
877        Arc::clone(&self.snapshot_manager)
878    }
879
880    /// Get the compaction manager for this store
881    pub fn compaction_manager(&self) -> Option<Arc<CompactionManager>> {
882        self.compaction_manager.as_ref().map(Arc::clone)
883    }
884
885    /// Get the schema registry for this store (v0.5 feature)
886    pub fn schema_registry(&self) -> Arc<SchemaRegistry> {
887        Arc::clone(&self.schema_registry)
888    }
889
890    /// Get the replay manager for this store (v0.5 feature)
891    pub fn replay_manager(&self) -> Arc<ReplayManager> {
892        Arc::clone(&self.replay_manager)
893    }
894
895    /// Get the pipeline manager for this store (v0.5 feature)
896    pub fn pipeline_manager(&self) -> Arc<PipelineManager> {
897        Arc::clone(&self.pipeline_manager)
898    }
899
900    /// Get the metrics registry for this store (v0.6 feature)
901    #[cfg(feature = "server")]
902    pub fn metrics(&self) -> Arc<MetricsRegistry> {
903        Arc::clone(&self.metrics)
904    }
905
906    /// Get the projection manager for this store (v0.7 feature)
907    pub fn projection_manager(&self) -> parking_lot::RwLockReadGuard<'_, ProjectionManager> {
908        self.projections.read()
909    }
910
911    /// Register a custom projection at runtime.
912    ///
913    /// The projection will receive all future events via `process()`.
914    /// Historical events are **not** replayed — only events ingested after
915    /// registration will be processed by this projection.
916    ///
917    /// See [`register_projection_with_backfill`](Self::register_projection_with_backfill)
918    /// to also process historical events.
919    pub fn register_projection(
920        &self,
921        projection: Arc<dyn crate::application::services::projection::Projection>,
922    ) {
923        let mut pm = self.projections.write();
924        pm.register(projection);
925    }
926
927    /// Register a custom projection and replay all existing events through it.
928    ///
929    /// After registration, the projection will also receive all future events.
930    /// Historical events are replayed under a read lock — the projection's
931    /// internal state (typically DashMap) handles concurrent access.
932    pub fn register_projection_with_backfill(
933        &self,
934        projection: &Arc<dyn crate::application::services::projection::Projection>,
935    ) -> Result<()> {
936        // First register so future events are processed
937        {
938            let mut pm = self.projections.write();
939            pm.register(Arc::clone(projection));
940        }
941
942        // Then replay existing events under read lock
943        let events = self.events.read();
944        for event in events.iter() {
945            projection.process(event)?;
946        }
947
948        Ok(())
949    }
950
951    /// Get the projection state cache for this store (v0.7 feature)
952    /// Used by Elixir Query Service for state synchronization
953    pub fn projection_state_cache(&self) -> Arc<DashMap<String, serde_json::Value>> {
954        Arc::clone(&self.projection_state_cache)
955    }
956
957    /// Get the projection status map (v0.13 feature)
958    pub fn projection_status(&self) -> Arc<DashMap<String, String>> {
959        Arc::clone(&self.projection_status)
960    }
961
    /// Get the geospatial index for coordinate-based queries (v2.0 feature)
    pub fn geo_index(&self) -> Arc<GeoIndex> {
        self.geo_index.clone()
    }
967
968    /// Exactly-once processing registry (v2.0 feature)
969    pub fn exactly_once(&self) -> Arc<ExactlyOnceRegistry> {
970        self.exactly_once.clone()
971    }
972
973    /// Schema evolution manager (v2.0 feature)
974    pub fn schema_evolution(&self) -> Arc<SchemaEvolutionManager> {
975        self.schema_evolution.clone()
976    }
977
    /// Get a point-in-time copy of all events (for EventQL/GraphQL queries).
    ///
    /// Returns a full clone of the internal events vec. The read lock is held
    /// only for the duration of the clone, so the caller may keep the result
    /// as long as needed — but the copy is O(n) in events, so avoid calling
    /// this in hot paths on large stores.
    pub fn snapshot_events(&self) -> Vec<Event> {
        self.events.read().clone()
    }
986
    /// Compact token events for an entity by replacing matching events with a
    /// single merged event. Used by the embedded streaming feature.
    ///
    /// Returns `Ok(true)` if compaction was performed, `Ok(false)` if no
    /// matching events were found.
    ///
    /// **Note:** The merged event is processed through projections *without*
    /// clearing the removed events' projection state first. Projections that
    /// accumulate state (e.g., counters) should be designed to handle this
    /// (the merged event replaces individual tokens, not adds to them).
    ///
    /// **Crash safety:** The WAL append happens *after* the in-memory swap
    /// under the write lock. If the process crashes before the WAL write,
    /// no change is persisted — WAL replay restores the pre-compaction state.
    /// If the process crashes after the WAL write, replay sees the merged
    /// event (and the original tokens, which are idempotent to replay since
    /// the merged event supersedes them).
    ///
    /// **Concurrency:** The Phase 1 existence check is advisory only — the
    /// write lock is not taken until Phase 3, so token events ingested between
    /// the phases are also removed by the retain even though they are not
    /// reflected in `merged_event`. NOTE(review): confirm callers serialize
    /// compaction with ingestion for the affected entity.
    ///
    /// The write lock is held for the swap + WAL write + index rebuild.
    /// The index rebuild is O(N) over all events, which is acceptable for
    /// embedded workloads but should not be called in hot paths for large stores.
    pub fn compact_entity_tokens(
        &self,
        entity_id: &str,
        token_event_type: &str,
        merged_event: Event,
    ) -> Result<bool> {
        // Phase 1: Read-only check — do we have anything to compact?
        {
            let events = self.events.read();
            let has_tokens = events
                .iter()
                .any(|e| e.entity_id_str() == entity_id && e.event_type_str() == token_event_type);
            if !has_tokens {
                return Ok(false);
            }
        }

        // Phase 2: Process merged event through projections (no write lock held)
        let projections = self.projections.read();
        projections.process_event(&merged_event)?;
        drop(projections);

        // Phase 3: Acquire write lock for the swap + WAL + index rebuild
        let mut events = self.events.write();

        // Drop every token event for this entity; the merged event replaces them.
        events.retain(|e| {
            !(e.entity_id_str() == entity_id && e.event_type_str() == token_event_type)
        });

        events.push(merged_event.clone());

        // WAL append inside write lock: crash before this line = no change persisted.
        // Crash after = merged event in WAL, original tokens also in WAL but
        // superseded by the merged event's entity_id + event_type.
        if let Some(ref wal) = self.wal {
            wal.append(merged_event)?;
        }

        // Rebuild entire index since retain() shifted event positions.
        // Errors here indicate a corrupt event (missing entity_id/event_type)
        // which should not happen for well-formed events. Log and continue
        // rather than failing the entire compaction.
        self.index.clear();
        for (offset, event) in events.iter().enumerate() {
            if let Err(e) = self.index.index_event(
                event.id,
                event.entity_id_str(),
                event.event_type_str(),
                event.timestamp,
                offset,
            ) {
                tracing::warn!(
                    event_id = %event.id,
                    offset,
                    "Failed to re-index event during compaction: {e}"
                );
            }
        }

        Ok(true)
    }
1069
1070    #[cfg(feature = "server")]
1071    pub fn webhook_registry(&self) -> Arc<WebhookRegistry> {
1072        Arc::clone(&self.webhook_registry)
1073    }
1074
1075    /// Set the channel for async webhook delivery.
1076    /// Called during server startup to wire the delivery worker.
1077    #[cfg(feature = "server")]
1078    pub fn set_webhook_tx(&self, tx: mpsc::UnboundedSender<WebhookDeliveryTask>) {
1079        *self.webhook_tx.write() = Some(tx);
1080        tracing::info!("Webhook delivery channel connected");
1081    }
1082
1083    /// Dispatch matching webhooks for a given event (non-blocking).
1084    #[cfg(feature = "server")]
1085    fn dispatch_webhooks(&self, event: &Event) {
1086        let matching = self.webhook_registry.find_matching(event);
1087        if matching.is_empty() {
1088            return;
1089        }
1090
1091        let tx_guard = self.webhook_tx.read();
1092        if let Some(ref tx) = *tx_guard {
1093            for webhook in matching {
1094                let task = WebhookDeliveryTask {
1095                    webhook,
1096                    event: event.clone(),
1097                };
1098                if let Err(e) = tx.send(task) {
1099                    tracing::warn!("Failed to queue webhook delivery: {}", e);
1100                }
1101            }
1102        }
1103    }
1104
1105    /// Manually flush any pending events to persistent storage
1106    pub fn flush_storage(&self) -> Result<()> {
1107        if let Some(ref storage) = self.storage {
1108            let storage = storage.read();
1109            storage.flush()?;
1110            tracing::info!("✅ Flushed events to persistent storage");
1111        }
1112        Ok(())
1113    }
1114
1115    /// Manually create a snapshot for an entity
1116    pub fn create_snapshot(&self, entity_id: &str) -> Result<()> {
1117        // Get all events for this entity
1118        let events = self.query(&QueryEventsRequest {
1119            entity_id: Some(entity_id.to_string()),
1120            event_type: None,
1121            tenant_id: None,
1122            as_of: None,
1123            since: None,
1124            until: None,
1125            limit: None,
1126            event_type_prefix: None,
1127            payload_filter: None,
1128        })?;
1129
1130        if events.is_empty() {
1131            return Err(AllSourceError::EntityNotFound(entity_id.to_string()));
1132        }
1133
1134        // Build current state
1135        let mut state = serde_json::json!({});
1136        for event in &events {
1137            if let serde_json::Value::Object(ref mut state_map) = state
1138                && let serde_json::Value::Object(ref payload_map) = event.payload
1139            {
1140                for (key, value) in payload_map {
1141                    state_map.insert(key.clone(), value.clone());
1142                }
1143            }
1144        }
1145
1146        let last_event = events.last().unwrap();
1147        self.snapshot_manager.create_snapshot(
1148            entity_id,
1149            state,
1150            last_event.timestamp,
1151            events.len(),
1152            SnapshotType::Manual,
1153        )?;
1154
1155        Ok(())
1156    }
1157
    /// Check and create automatic snapshots if needed
    fn check_auto_snapshot(&self, entity_id: &str, event: &Event) {
        // Count events for this entity via the index (no events-vec lock needed)
        let entity_event_count = self
            .index
            .get_by_entity(entity_id)
            .map_or(0, |entries| entries.len());

        if self.snapshot_manager.should_create_snapshot(
            entity_id,
            entity_event_count,
            event.timestamp,
        ) {
            // NOTE(review): despite the original "in background" intent, this
            // runs synchronously on the caller's thread — `create_snapshot`
            // re-queries the entity's full history, which can add latency on
            // the ingestion path for hot entities. Failures are logged, never
            // propagated.
            if let Err(e) = self.create_snapshot(entity_id) {
                tracing::warn!(
                    "Failed to create automatic snapshot for {}: {}",
                    entity_id,
                    e
                );
            }
        }
    }
1181
1182    /// Validate an event before ingestion
1183    fn validate_event(&self, event: &Event) -> Result<()> {
1184        // EntityId and EventType value objects already validate non-empty in their constructors
1185        // So these checks are now redundant, but we keep them for explicit validation
1186        if event.entity_id_str().is_empty() {
1187            return Err(AllSourceError::ValidationError(
1188                "entity_id cannot be empty".to_string(),
1189            ));
1190        }
1191
1192        if event.event_type_str().is_empty() {
1193            return Err(AllSourceError::ValidationError(
1194                "event_type cannot be empty".to_string(),
1195            ));
1196        }
1197
1198        // Reject system namespace events from user-facing ingestion.
1199        // System events are written exclusively via SystemMetadataStore.
1200        if event.event_type().is_system() {
1201            return Err(AllSourceError::ValidationError(
1202                "Event types starting with '_system.' are reserved for internal use".to_string(),
1203            ));
1204        }
1205
1206        Ok(())
1207    }
1208
1209    /// Reset a projection by clearing its state and reprocessing all events
1210    pub fn reset_projection(&self, name: &str) -> Result<usize> {
1211        let projection_manager = self.projections.read();
1212        let projection = projection_manager.get_projection(name).ok_or_else(|| {
1213            AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1214        })?;
1215
1216        // Clear existing state
1217        projection.clear();
1218
1219        // Clear cached state for this projection
1220        let prefix = format!("{name}:");
1221        let keys_to_remove: Vec<String> = self
1222            .projection_state_cache
1223            .iter()
1224            .filter(|entry| entry.key().starts_with(&prefix))
1225            .map(|entry| entry.key().clone())
1226            .collect();
1227        for key in keys_to_remove {
1228            self.projection_state_cache.remove(&key);
1229        }
1230
1231        // Reprocess all events through this projection
1232        let events = self.events.read();
1233        let mut reprocessed = 0usize;
1234        for event in events.iter() {
1235            if projection.process(event).is_ok() {
1236                reprocessed += 1;
1237            }
1238        }
1239
1240        Ok(reprocessed)
1241    }
1242
1243    /// Get a single event by its UUID
1244    pub fn get_event_by_id(&self, event_id: &uuid::Uuid) -> Result<Option<Event>> {
1245        if let Some(offset) = self.index.get_by_id(event_id) {
1246            let events = self.events.read();
1247            Ok(events.get(offset).cloned())
1248        } else {
1249            Ok(None)
1250        }
1251    }
1252
1253    /// Query events based on filters (optimized with indices)
1254    #[cfg_attr(feature = "hotpath", hotpath::measure)]
1255    pub fn query(&self, request: &QueryEventsRequest) -> Result<Vec<Event>> {
1256        // Determine query type for metrics (v0.6 feature)
1257        let query_type = if request.entity_id.is_some() {
1258            "entity"
1259        } else if request.event_type.is_some() {
1260            "type"
1261        } else if request.event_type_prefix.is_some() {
1262            "type_prefix"
1263        } else {
1264            "full_scan"
1265        };
1266
1267        // Start metrics timer (v0.6 feature)
1268        #[cfg(feature = "server")]
1269        let timer = self
1270            .metrics
1271            .query_duration_seconds
1272            .with_label_values(&[query_type])
1273            .start_timer();
1274
1275        // Increment query counter (v0.6 feature)
1276        #[cfg(feature = "server")]
1277        self.metrics
1278            .queries_total
1279            .with_label_values(&[query_type])
1280            .inc();
1281
1282        let events = self.events.read();
1283
1284        // Use index for fast lookups
1285        let offsets: Vec<usize> = if let Some(entity_id) = &request.entity_id {
1286            // Use entity index
1287            self.index
1288                .get_by_entity(entity_id)
1289                .map(|entries| self.filter_entries(entries, request))
1290                .unwrap_or_default()
1291        } else if let Some(event_type) = &request.event_type {
1292            // Use type index (exact match)
1293            self.index
1294                .get_by_type(event_type)
1295                .map(|entries| self.filter_entries(entries, request))
1296                .unwrap_or_default()
1297        } else if let Some(prefix) = &request.event_type_prefix {
1298            // Use type index (prefix match)
1299            let entries = self.index.get_by_type_prefix(prefix);
1300            self.filter_entries(entries, request)
1301        } else {
1302            // Full scan (less efficient but necessary for complex queries)
1303            (0..events.len()).collect()
1304        };
1305
1306        // Fetch events and apply remaining filters
1307        let mut results: Vec<Event> = offsets
1308            .iter()
1309            .filter_map(|&offset| events.get(offset).cloned())
1310            .filter(|event| self.apply_filters(event, request))
1311            .collect();
1312
1313        // Sort by timestamp (ascending)
1314        results.sort_by_key(|x| x.timestamp);
1315
1316        // Apply limit
1317        if let Some(limit) = request.limit {
1318            results.truncate(limit);
1319        }
1320
1321        // Record query results count (v0.6 feature)
1322        #[cfg(feature = "server")]
1323        {
1324            self.metrics
1325                .query_results_total
1326                .with_label_values(&[query_type])
1327                .inc_by(results.len() as u64);
1328            timer.observe_duration();
1329        }
1330
1331        Ok(results)
1332    }
1333
1334    /// Filter index entries based on query parameters
1335    #[cfg_attr(feature = "hotpath", hotpath::measure)]
1336    fn filter_entries(&self, entries: Vec<IndexEntry>, request: &QueryEventsRequest) -> Vec<usize> {
1337        entries
1338            .into_iter()
1339            .filter(|entry| {
1340                // Time filters
1341                if let Some(as_of) = request.as_of
1342                    && entry.timestamp > as_of
1343                {
1344                    return false;
1345                }
1346                if let Some(since) = request.since
1347                    && entry.timestamp < since
1348                {
1349                    return false;
1350                }
1351                if let Some(until) = request.until
1352                    && entry.timestamp > until
1353                {
1354                    return false;
1355                }
1356                true
1357            })
1358            .map(|entry| entry.offset)
1359            .collect()
1360    }
1361
1362    /// Apply filters to an event
1363    #[cfg_attr(feature = "hotpath", hotpath::measure)]
1364    fn apply_filters(&self, event: &Event, request: &QueryEventsRequest) -> bool {
1365        // Tenant isolation: if a tenant_id is specified, only return events from that tenant
1366        if let Some(ref tid) = request.tenant_id
1367            && event.tenant_id_str() != tid
1368        {
1369            return false;
1370        }
1371
1372        // Additional type filter if entity was primary
1373        if request.entity_id.is_some()
1374            && let Some(ref event_type) = request.event_type
1375            && event.event_type_str() != event_type
1376        {
1377            return false;
1378        }
1379
1380        // Additional prefix filter if entity was primary
1381        if request.entity_id.is_some()
1382            && let Some(ref prefix) = request.event_type_prefix
1383            && !event.event_type_str().starts_with(prefix)
1384        {
1385            return false;
1386        }
1387
1388        // Payload field filtering
1389        if let Some(ref filter_str) = request.payload_filter
1390            && let Ok(filter_obj) =
1391                serde_json::from_str::<serde_json::Map<String, serde_json::Value>>(filter_str)
1392        {
1393            let payload = event.payload();
1394            for (key, expected_value) in &filter_obj {
1395                match payload.get(key) {
1396                    Some(actual_value) if actual_value == expected_value => {}
1397                    _ => return false,
1398                }
1399            }
1400        }
1401
1402        true
1403    }
1404
    /// Reconstruct entity state as of a specific timestamp
    /// v0.2: Now uses snapshots for fast reconstruction
    ///
    /// Starts from the nearest snapshot (when one exists), then replays only
    /// the events after the snapshot's `as_of`, merging each payload key over
    /// the state (last write wins). Returns a JSON object wrapping the merged
    /// state with `entity_id`, `last_updated`, `event_count`, `as_of`, and a
    /// `history` array — note `history`/`event_count`/`last_updated` reflect
    /// only the post-snapshot events, not the entity's full history.
    ///
    /// # Errors
    /// `EntityNotFound` when no snapshot exists and no events match.
    #[cfg_attr(feature = "hotpath", hotpath::measure)]
    pub fn reconstruct_state(
        &self,
        entity_id: &str,
        as_of: Option<DateTime<Utc>>,
    ) -> Result<serde_json::Value> {
        // Try to find a snapshot to use as a base (v0.2 optimization)
        let (merged_state, since_timestamp) = if let Some(as_of_time) = as_of {
            // Get snapshot closest to requested time
            if let Some(snapshot) = self
                .snapshot_manager
                .get_snapshot_as_of(entity_id, as_of_time)
            {
                tracing::debug!(
                    "Using snapshot from {} for entity {} (saved {} events)",
                    snapshot.as_of,
                    entity_id,
                    snapshot.event_count
                );
                (snapshot.state.clone(), Some(snapshot.as_of))
            } else {
                (serde_json::json!({}), None)
            }
        } else {
            // Get latest snapshot for current state
            if let Some(snapshot) = self.snapshot_manager.get_latest_snapshot(entity_id) {
                tracing::debug!(
                    "Using latest snapshot from {} for entity {}",
                    snapshot.as_of,
                    entity_id
                );
                (snapshot.state.clone(), Some(snapshot.as_of))
            } else {
                (serde_json::json!({}), None)
            }
        };

        // Query events after the snapshot (or all if no snapshot).
        // `since` is inclusive, so events stamped exactly at the snapshot's
        // `as_of` are replayed again — harmless, because the merge below is
        // last-write-wins and therefore idempotent.
        let events = self.query(&QueryEventsRequest {
            entity_id: Some(entity_id.to_string()),
            event_type: None,
            tenant_id: None,
            as_of,
            since: since_timestamp,
            until: None,
            limit: None,
            event_type_prefix: None,
            payload_filter: None,
        })?;

        // If no events and no snapshot, entity not found
        if events.is_empty() && since_timestamp.is_none() {
            return Err(AllSourceError::EntityNotFound(entity_id.to_string()));
        }

        // Merge events on top of snapshot (or from scratch if no snapshot)
        let mut merged_state = merged_state;
        for event in &events {
            if let serde_json::Value::Object(ref mut state_map) = merged_state
                && let serde_json::Value::Object(ref payload_map) = event.payload
            {
                for (key, value) in payload_map {
                    state_map.insert(key.clone(), value.clone());
                }
            }
        }

        // Wrap with metadata; `last_updated` is null when no events were replayed.
        let state = serde_json::json!({
            "entity_id": entity_id,
            "last_updated": events.last().map(|e| e.timestamp),
            "event_count": events.len(),
            "as_of": as_of,
            "current_state": merged_state,
            "history": events.iter().map(|e| {
                serde_json::json!({
                    "event_id": e.id,
                    "type": e.event_type,
                    "timestamp": e.timestamp,
                    "payload": e.payload
                })
            }).collect::<Vec<_>>()
        });

        Ok(state)
    }
1493
1494    /// Get snapshot from projection (faster than reconstructing)
1495    pub fn get_snapshot(&self, entity_id: &str) -> Result<serde_json::Value> {
1496        let projections = self.projections.read();
1497
1498        if let Some(snapshot_projection) = projections.get_projection("entity_snapshots")
1499            && let Some(state) = snapshot_projection.get_state(entity_id)
1500        {
1501            return Ok(serde_json::json!({
1502                "entity_id": entity_id,
1503                "snapshot": state,
1504                "from_projection": "entity_snapshots"
1505            }));
1506        }
1507
1508        Err(AllSourceError::EntityNotFound(entity_id.to_string()))
1509    }
1510
1511    /// Get statistics about the event store
1512    pub fn stats(&self) -> StoreStats {
1513        let events = self.events.read();
1514        let index_stats = self.index.stats();
1515
1516        StoreStats {
1517            total_events: events.len(),
1518            total_entities: index_stats.total_entities,
1519            total_event_types: index_stats.total_event_types,
1520            total_ingested: *self.total_ingested.read(),
1521        }
1522    }
1523
1524    /// Get all unique streams (entity_ids) in the store
1525    pub fn list_streams(&self) -> Vec<StreamInfo> {
1526        self.index
1527            .get_all_entities()
1528            .into_iter()
1529            .map(|entity_id| {
1530                let event_count = self
1531                    .index
1532                    .get_by_entity(&entity_id)
1533                    .map_or(0, |entries| entries.len());
1534                let last_event_at = self
1535                    .index
1536                    .get_by_entity(&entity_id)
1537                    .and_then(|entries| entries.last().map(|e| e.timestamp));
1538                StreamInfo {
1539                    stream_id: entity_id,
1540                    event_count,
1541                    last_event_at,
1542                }
1543            })
1544            .collect()
1545    }
1546
1547    /// Get all unique event types in the store
1548    pub fn list_event_types(&self) -> Vec<EventTypeInfo> {
1549        self.index
1550            .get_all_types()
1551            .into_iter()
1552            .map(|event_type| {
1553                let event_count = self
1554                    .index
1555                    .get_by_type(&event_type)
1556                    .map_or(0, |entries| entries.len());
1557                let last_event_at = self
1558                    .index
1559                    .get_by_type(&event_type)
1560                    .and_then(|entries| entries.last().map(|e| e.timestamp));
1561                EventTypeInfo {
1562                    event_type,
1563                    event_count,
1564                    last_event_at,
1565                }
1566            })
1567            .collect()
1568    }
1569
1570    /// Attach a broadcast sender to the WAL for replication.
1571    ///
1572    /// Thread-safe: can be called through `Arc<EventStore>` at runtime.
1573    /// Used during initial setup and during follower → leader promotion.
1574    /// When set, every WAL append publishes the entry to the broadcast
1575    /// channel so the WAL shipper can stream it to followers.
1576    pub fn enable_wal_replication(
1577        &self,
1578        tx: tokio::sync::broadcast::Sender<crate::infrastructure::persistence::wal::WALEntry>,
1579    ) {
1580        if let Some(ref wal_arc) = self.wal {
1581            wal_arc.set_replication_tx(tx);
1582            tracing::info!("WAL replication broadcast enabled");
1583        } else {
1584            tracing::warn!("Cannot enable WAL replication: WAL is not configured");
1585        }
1586    }
1587
    /// Get a reference to the WAL (if configured).
    ///
    /// Returns `None` when the store was constructed without a WAL.
    /// Used by the replication catch-up protocol to determine oldest available offset.
    pub fn wal(&self) -> Option<&Arc<WriteAheadLog>> {
        self.wal.as_ref()
    }
1593
    /// Get a reference to the Parquet storage (if configured).
    ///
    /// Returns `None` for purely in-memory stores (no `storage_dir`).
    /// Used by the replication catch-up protocol to stream snapshot files to followers.
    pub fn parquet_storage(&self) -> Option<&Arc<RwLock<ParquetStorage>>> {
        self.storage.as_ref()
    }
1599}
1600
/// Configuration for EventStore
///
/// `Default` yields a fully in-memory store (no Parquet storage, no WAL).
/// Use the `with_*` constructors, [`EventStoreConfig::production`], or
/// [`EventStoreConfig::from_env`] to enable durability features.
#[derive(Debug, Clone, Default)]
pub struct EventStoreConfig {
    /// Optional directory for persistent Parquet storage (v0.2 feature)
    pub storage_dir: Option<PathBuf>,

    /// Snapshot configuration (v0.2 feature)
    pub snapshot_config: SnapshotConfig,

    /// Optional directory for WAL (Write-Ahead Log) (v0.2 feature)
    pub wal_dir: Option<PathBuf>,

    /// WAL configuration (v0.2 feature)
    pub wal_config: WALConfig,

    /// Compaction configuration (v0.2 feature)
    pub compaction_config: CompactionConfig,

    /// Schema registry configuration (v0.5 feature)
    pub schema_registry_config: SchemaRegistryConfig,

    /// Optional directory for system metadata storage (dogfood feature).
    /// When set, operational metadata (tenants, config, audit) is stored
    /// using AllSource's own event store rather than an external database.
    /// Defaults to `{storage_dir}/__system/` when storage_dir is set.
    pub system_data_dir: Option<PathBuf>,

    /// Name of the default tenant to auto-create on first boot.
    pub bootstrap_tenant: Option<String>,
}
1631
1632impl EventStoreConfig {
1633    /// Create config with persistent storage enabled
1634    pub fn with_persistence(storage_dir: impl Into<PathBuf>) -> Self {
1635        Self {
1636            storage_dir: Some(storage_dir.into()),
1637            ..Self::default()
1638        }
1639    }
1640
1641    /// Create config with custom snapshot settings
1642    pub fn with_snapshots(snapshot_config: SnapshotConfig) -> Self {
1643        Self {
1644            snapshot_config,
1645            ..Self::default()
1646        }
1647    }
1648
1649    /// Create config with WAL enabled
1650    pub fn with_wal(wal_dir: impl Into<PathBuf>, wal_config: WALConfig) -> Self {
1651        Self {
1652            wal_dir: Some(wal_dir.into()),
1653            wal_config,
1654            ..Self::default()
1655        }
1656    }
1657
1658    /// Create config with both persistence and snapshots
1659    pub fn with_all(storage_dir: impl Into<PathBuf>, snapshot_config: SnapshotConfig) -> Self {
1660        Self {
1661            storage_dir: Some(storage_dir.into()),
1662            snapshot_config,
1663            ..Self::default()
1664        }
1665    }
1666
1667    /// Create production config with all features enabled
1668    pub fn production(
1669        storage_dir: impl Into<PathBuf>,
1670        wal_dir: impl Into<PathBuf>,
1671        snapshot_config: SnapshotConfig,
1672        wal_config: WALConfig,
1673        compaction_config: CompactionConfig,
1674    ) -> Self {
1675        let storage_dir = storage_dir.into();
1676        let system_data_dir = storage_dir.join("__system");
1677        Self {
1678            storage_dir: Some(storage_dir),
1679            snapshot_config,
1680            wal_dir: Some(wal_dir.into()),
1681            wal_config,
1682            compaction_config,
1683            system_data_dir: Some(system_data_dir),
1684            ..Self::default()
1685        }
1686    }
1687
1688    /// Resolve the effective system data directory.
1689    ///
1690    /// If explicitly set, returns that. Otherwise, derives from storage_dir.
1691    /// Returns None if neither is configured (in-memory mode).
1692    pub fn effective_system_data_dir(&self) -> Option<PathBuf> {
1693        self.system_data_dir
1694            .clone()
1695            .or_else(|| self.storage_dir.as_ref().map(|d| d.join("__system")))
1696    }
1697
1698    /// Build config from environment variables.
1699    ///
1700    /// Reads `ALLSOURCE_DATA_DIR`, `ALLSOURCE_STORAGE_DIR`, `ALLSOURCE_WAL_DIR`,
1701    /// and `ALLSOURCE_WAL_ENABLED` to determine persistence mode.
1702    ///
1703    /// Returns `(config, description)` where description is a human-readable
1704    /// summary of the persistence mode for logging.
1705    pub fn from_env() -> (Self, &'static str) {
1706        Self::from_env_vars(
1707            std::env::var("ALLSOURCE_DATA_DIR")
1708                .ok()
1709                .filter(|s| !s.is_empty()),
1710            std::env::var("ALLSOURCE_STORAGE_DIR")
1711                .ok()
1712                .filter(|s| !s.is_empty()),
1713            std::env::var("ALLSOURCE_WAL_DIR")
1714                .ok()
1715                .filter(|s| !s.is_empty()),
1716            std::env::var("ALLSOURCE_WAL_ENABLED").ok(),
1717        )
1718    }
1719
1720    /// Build config from explicit env-var values (testable without mutating process env).
1721    pub fn from_env_vars(
1722        data_dir: Option<String>,
1723        explicit_storage_dir: Option<String>,
1724        explicit_wal_dir: Option<String>,
1725        wal_enabled_var: Option<String>,
1726    ) -> (Self, &'static str) {
1727        let data_dir = data_dir.filter(|s| !s.is_empty());
1728        let storage_dir = explicit_storage_dir
1729            .filter(|s| !s.is_empty())
1730            .or_else(|| data_dir.as_ref().map(|d| format!("{d}/storage")));
1731        let wal_dir = explicit_wal_dir
1732            .filter(|s| !s.is_empty())
1733            .or_else(|| data_dir.as_ref().map(|d| format!("{d}/wal")));
1734        let wal_enabled = wal_enabled_var.is_none_or(|v| v == "true");
1735
1736        match (&storage_dir, &wal_dir) {
1737            (Some(sd), Some(wd)) if wal_enabled => {
1738                let config = Self::production(
1739                    sd,
1740                    wd,
1741                    SnapshotConfig::default(),
1742                    WALConfig::default(),
1743                    CompactionConfig::default(),
1744                );
1745                (config, "wal+parquet")
1746            }
1747            (Some(sd), _) => {
1748                let config = Self::with_persistence(sd);
1749                (config, "parquet-only")
1750            }
1751            (_, Some(wd)) if wal_enabled => {
1752                let config = Self::with_wal(wd, WALConfig::default());
1753                (config, "wal-only")
1754            }
1755            _ => (Self::default(), "in-memory"),
1756        }
1757    }
1758}
1759
/// Aggregate counters describing the store's contents, as returned by `stats()`.
#[derive(Debug, serde::Serialize)]
pub struct StoreStats {
    /// Number of events currently held in the store.
    pub total_events: usize,
    /// Number of distinct entities (streams).
    pub total_entities: usize,
    /// Number of distinct event types.
    pub total_event_types: usize,
    /// Running count of events ingested.
    pub total_ingested: u64,
}
1767
/// Information about a stream (entity_id)
///
/// Serialized as-is in API responses (derives `serde::Serialize`).
#[derive(Debug, Clone, serde::Serialize)]
pub struct StreamInfo {
    /// The stream identifier (entity_id)
    pub stream_id: String,
    /// Total number of events in this stream
    pub event_count: usize,
    /// Timestamp of the last event in this stream
    pub last_event_at: Option<chrono::DateTime<chrono::Utc>>,
}
1778
/// Information about an event type
///
/// Produced by `EventStore::list_event_types`; serialized as-is in API
/// responses (derives `serde::Serialize`).
#[derive(Debug, Clone, serde::Serialize)]
pub struct EventTypeInfo {
    /// The event type name
    pub event_type: String,
    /// Total number of events of this type
    pub event_count: usize,
    /// Timestamp of the last event of this type
    pub last_event_at: Option<chrono::DateTime<chrono::Utc>>,
}
1789
1790impl Default for EventStore {
1791    fn default() -> Self {
1792        Self::new()
1793    }
1794}
1795
1796#[cfg(test)]
1797mod tests {
1798    use super::*;
1799    use crate::domain::entities::Event;
1800    use tempfile::TempDir;
1801
    // -----------------------------------------------------------------------
    // Fixtures
    // -----------------------------------------------------------------------

    /// Build a test event in the `default` tenant with a fixed payload of
    /// `{"name": "Test", "value": 42}`.
    fn create_test_event(entity_id: &str, event_type: &str) -> Event {
        Event::from_strings(
            event_type.to_string(),
            entity_id.to_string(),
            "default".to_string(),
            serde_json::json!({"name": "Test", "value": 42}),
            None,
        )
        .unwrap()
    }

    /// Build a test event in the `default` tenant with a caller-supplied payload.
    fn create_test_event_with_payload(
        entity_id: &str,
        event_type: &str,
        payload: serde_json::Value,
    ) -> Event {
        Event::from_strings(
            event_type.to_string(),
            entity_id.to_string(),
            "default".to_string(),
            payload,
            None,
        )
        .unwrap()
    }

    // -----------------------------------------------------------------------
    // Construction and ingestion
    // -----------------------------------------------------------------------

    #[test]
    fn test_event_store_new() {
        let store = EventStore::new();
        assert_eq!(store.stats().total_events, 0);
        assert_eq!(store.stats().total_entities, 0);
    }

    #[test]
    fn test_event_store_default() {
        let store = EventStore::default();
        assert_eq!(store.stats().total_events, 0);
    }

    #[test]
    fn test_ingest_single_event() {
        let store = EventStore::new();
        let event = create_test_event("entity-1", "user.created");

        store.ingest(&event).unwrap();

        assert_eq!(store.stats().total_events, 1);
        assert_eq!(store.stats().total_ingested, 1);
    }

    #[test]
    fn test_ingest_multiple_events() {
        let store = EventStore::new();

        for i in 0..10 {
            let event = create_test_event(&format!("entity-{i}"), "user.created");
            store.ingest(&event).unwrap();
        }

        assert_eq!(store.stats().total_events, 10);
        assert_eq!(store.stats().total_ingested, 10);
    }

    // -----------------------------------------------------------------------
    // Basic query filters (entity_id, event_type, limit)
    // -----------------------------------------------------------------------

    #[test]
    fn test_query_by_entity_id() {
        let store = EventStore::new();

        store
            .ingest(&create_test_event("entity-1", "user.created"))
            .unwrap();
        store
            .ingest(&create_test_event("entity-2", "user.created"))
            .unwrap();
        store
            .ingest(&create_test_event("entity-1", "user.updated"))
            .unwrap();

        let results = store
            .query(&QueryEventsRequest {
                entity_id: Some("entity-1".to_string()),
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();

        // Two of the three ingested events belong to entity-1.
        assert_eq!(results.len(), 2);
    }

    #[test]
    fn test_query_by_event_type() {
        let store = EventStore::new();

        store
            .ingest(&create_test_event("entity-1", "user.created"))
            .unwrap();
        store
            .ingest(&create_test_event("entity-2", "user.updated"))
            .unwrap();
        store
            .ingest(&create_test_event("entity-3", "user.created"))
            .unwrap();

        let results = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: Some("user.created".to_string()),
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();

        assert_eq!(results.len(), 2);
    }

    #[test]
    fn test_query_with_limit() {
        let store = EventStore::new();

        for i in 0..10 {
            let event = create_test_event(&format!("entity-{i}"), "user.created");
            store.ingest(&event).unwrap();
        }

        let results = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: Some(5),
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();

        assert_eq!(results.len(), 5);
    }

    #[test]
    fn test_query_empty_store() {
        let store = EventStore::new();

        let results = store
            .query(&QueryEventsRequest {
                entity_id: Some("non-existent".to_string()),
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();

        assert!(results.is_empty());
    }

    // -----------------------------------------------------------------------
    // State reconstruction and snapshots
    // -----------------------------------------------------------------------

    #[test]
    fn test_reconstruct_state() {
        let store = EventStore::new();

        store
            .ingest(&create_test_event("entity-1", "user.created"))
            .unwrap();

        let state = store.reconstruct_state("entity-1", None).unwrap();
        // The state is wrapped with metadata
        assert_eq!(state["current_state"]["name"], "Test");
        assert_eq!(state["current_state"]["value"], 42);
    }

    #[test]
    fn test_reconstruct_state_not_found() {
        let store = EventStore::new();

        let result = store.reconstruct_state("non-existent", None);
        assert!(result.is_err());
    }

    #[test]
    fn test_get_snapshot_empty() {
        let store = EventStore::new();

        let result = store.get_snapshot("non-existent");
        // Entity not found error is expected
        assert!(result.is_err());
    }

    #[test]
    fn test_create_snapshot() {
        let store = EventStore::new();

        store
            .ingest(&create_test_event("entity-1", "user.created"))
            .unwrap();

        store.create_snapshot("entity-1").unwrap();

        // Verify snapshot was created
        let snapshot = store.get_snapshot("entity-1").unwrap();
        assert!(snapshot != serde_json::json!(null));
    }

    #[test]
    fn test_create_snapshot_entity_not_found() {
        let store = EventStore::new();

        let result = store.create_snapshot("non-existent");
        assert!(result.is_err());
    }
2027
    // -----------------------------------------------------------------------
    // Sub-manager accessors — each should hand back a live Arc.
    // -----------------------------------------------------------------------

    #[test]
    fn test_websocket_manager() {
        let store = EventStore::new();
        let manager = store.websocket_manager();
        // Manager should be accessible
        assert!(Arc::strong_count(&manager) >= 1);
    }

    #[test]
    fn test_snapshot_manager() {
        let store = EventStore::new();
        let manager = store.snapshot_manager();
        assert!(Arc::strong_count(&manager) >= 1);
    }

    #[test]
    fn test_compaction_manager_none() {
        let store = EventStore::new();
        // Without storage_dir, compaction manager should be None
        assert!(store.compaction_manager().is_none());
    }

    #[test]
    fn test_schema_registry() {
        let store = EventStore::new();
        let registry = store.schema_registry();
        assert!(Arc::strong_count(&registry) >= 1);
    }

    #[test]
    fn test_replay_manager() {
        let store = EventStore::new();
        let manager = store.replay_manager();
        assert!(Arc::strong_count(&manager) >= 1);
    }

    #[test]
    fn test_pipeline_manager() {
        let store = EventStore::new();
        let manager = store.pipeline_manager();
        assert!(Arc::strong_count(&manager) >= 1);
    }

    #[test]
    fn test_projection_manager() {
        let store = EventStore::new();
        let manager = store.projection_manager();
        // Built-in projections should be registered
        let projections = manager.list_projections();
        assert!(projections.len() >= 2); // entity_snapshots and event_counters
    }

    #[test]
    fn test_projection_state_cache() {
        let store = EventStore::new();
        let cache = store.projection_state_cache();

        cache.insert("test:key".to_string(), serde_json::json!({"value": 123}));
        assert_eq!(cache.len(), 1);

        let value = cache.get("test:key").unwrap();
        assert_eq!(value["value"], 123);
    }

    #[test]
    fn test_metrics() {
        let store = EventStore::new();
        let metrics = store.metrics();
        assert!(Arc::strong_count(&metrics) >= 1);
    }

    // -----------------------------------------------------------------------
    // Stats and config constructors
    // -----------------------------------------------------------------------

    #[test]
    fn test_store_stats() {
        let store = EventStore::new();

        store
            .ingest(&create_test_event("entity-1", "user.created"))
            .unwrap();
        store
            .ingest(&create_test_event("entity-2", "order.placed"))
            .unwrap();

        let stats = store.stats();
        assert_eq!(stats.total_events, 2);
        assert_eq!(stats.total_entities, 2);
        assert_eq!(stats.total_event_types, 2);
        assert_eq!(stats.total_ingested, 2);
    }

    #[test]
    fn test_event_store_config_default() {
        let config = EventStoreConfig::default();
        assert!(config.storage_dir.is_none());
        assert!(config.wal_dir.is_none());
    }

    #[test]
    fn test_event_store_config_with_persistence() {
        let temp_dir = TempDir::new().unwrap();
        let config = EventStoreConfig::with_persistence(temp_dir.path());

        assert!(config.storage_dir.is_some());
        assert!(config.wal_dir.is_none());
    }

    #[test]
    fn test_event_store_config_with_wal() {
        let temp_dir = TempDir::new().unwrap();
        let config = EventStoreConfig::with_wal(temp_dir.path(), WALConfig::default());

        assert!(config.storage_dir.is_none());
        assert!(config.wal_dir.is_some());
    }

    #[test]
    fn test_event_store_config_with_all() {
        let temp_dir = TempDir::new().unwrap();
        let config = EventStoreConfig::with_all(temp_dir.path(), SnapshotConfig::default());

        assert!(config.storage_dir.is_some());
    }

    #[test]
    fn test_event_store_config_production() {
        let storage_dir = TempDir::new().unwrap();
        let wal_dir = TempDir::new().unwrap();
        let config = EventStoreConfig::production(
            storage_dir.path(),
            wal_dir.path(),
            SnapshotConfig::default(),
            WALConfig::default(),
            CompactionConfig::default(),
        );

        assert!(config.storage_dir.is_some());
        assert!(config.wal_dir.is_some());
    }
2165
2166    // -----------------------------------------------------------------------
2167    // from_env_vars tests — verifies the env-var-to-config wiring that
2168    // caused the durability bug (events lost on restart) in v0.10.3.
2169    // -----------------------------------------------------------------------
2170
2171    #[test]
2172    fn test_from_env_vars_data_dir_enables_full_persistence() {
2173        let (config, mode) =
2174            EventStoreConfig::from_env_vars(Some("/app/data".to_string()), None, None, None);
2175        assert_eq!(mode, "wal+parquet");
2176        assert_eq!(
2177            config.storage_dir.unwrap().to_str().unwrap(),
2178            "/app/data/storage"
2179        );
2180        assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/app/data/wal");
2181    }
2182
2183    #[test]
2184    fn test_from_env_vars_explicit_dirs() {
2185        let (config, mode) = EventStoreConfig::from_env_vars(
2186            None,
2187            Some("/custom/storage".to_string()),
2188            Some("/custom/wal".to_string()),
2189            None,
2190        );
2191        assert_eq!(mode, "wal+parquet");
2192        assert_eq!(
2193            config.storage_dir.unwrap().to_str().unwrap(),
2194            "/custom/storage"
2195        );
2196        assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/custom/wal");
2197    }
2198
2199    #[test]
2200    fn test_from_env_vars_wal_disabled() {
2201        let (config, mode) = EventStoreConfig::from_env_vars(
2202            Some("/app/data".to_string()),
2203            None,
2204            None,
2205            Some("false".to_string()),
2206        );
2207        assert_eq!(mode, "parquet-only");
2208        assert!(config.storage_dir.is_some());
2209        assert!(config.wal_dir.is_none());
2210    }
2211
2212    #[test]
2213    fn test_from_env_vars_no_dirs_is_in_memory() {
2214        let (config, mode) = EventStoreConfig::from_env_vars(None, None, None, None);
2215        assert_eq!(mode, "in-memory");
2216        assert!(config.storage_dir.is_none());
2217        assert!(config.wal_dir.is_none());
2218    }
2219
2220    #[test]
2221    fn test_from_env_vars_empty_strings_treated_as_none() {
2222        let (_, mode) = EventStoreConfig::from_env_vars(
2223            Some(String::new()),
2224            Some(String::new()),
2225            Some(String::new()),
2226            None,
2227        );
2228        assert_eq!(mode, "in-memory");
2229    }
2230
2231    #[test]
2232    fn test_from_env_vars_explicit_overrides_data_dir() {
2233        let (config, mode) = EventStoreConfig::from_env_vars(
2234            Some("/app/data".to_string()),
2235            Some("/override/storage".to_string()),
2236            Some("/override/wal".to_string()),
2237            None,
2238        );
2239        assert_eq!(mode, "wal+parquet");
2240        assert_eq!(
2241            config.storage_dir.unwrap().to_str().unwrap(),
2242            "/override/storage"
2243        );
2244        assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/override/wal");
2245    }
2246
2247    #[test]
2248    fn test_from_env_vars_wal_only() {
2249        let (config, mode) =
2250            EventStoreConfig::from_env_vars(None, None, Some("/wal/only".to_string()), None);
2251        assert_eq!(mode, "wal-only");
2252        assert!(config.storage_dir.is_none());
2253        assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/wal/only");
2254    }
2255
    // -----------------------------------------------------------------------
    // Serialization and combined query filters
    // -----------------------------------------------------------------------

    #[test]
    fn test_store_stats_serde() {
        let stats = StoreStats {
            total_events: 100,
            total_entities: 50,
            total_event_types: 10,
            total_ingested: 100,
        };

        let json = serde_json::to_string(&stats).unwrap();
        assert!(json.contains("\"total_events\":100"));
        assert!(json.contains("\"total_entities\":50"));
    }

    // entity_id and event_type filters must compose with AND semantics.
    #[test]
    fn test_query_with_entity_and_type() {
        let store = EventStore::new();

        store
            .ingest(&create_test_event("entity-1", "user.created"))
            .unwrap();
        store
            .ingest(&create_test_event("entity-1", "user.updated"))
            .unwrap();
        store
            .ingest(&create_test_event("entity-2", "user.created"))
            .unwrap();

        let results = store
            .query(&QueryEventsRequest {
                entity_id: Some("entity-1".to_string()),
                event_type: Some("user.created".to_string()),
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();

        assert_eq!(results.len(), 1);
        assert_eq!(results[0].event_type_str(), "user.created");
    }

    // -----------------------------------------------------------------------
    // event_type_prefix filter
    // -----------------------------------------------------------------------

    #[test]
    fn test_query_by_event_type_prefix() {
        let store = EventStore::new();

        // Ingest events with various types
        store
            .ingest(&create_test_event("entity-1", "index.created"))
            .unwrap();
        store
            .ingest(&create_test_event("entity-2", "index.updated"))
            .unwrap();
        store
            .ingest(&create_test_event("entity-3", "trade.created"))
            .unwrap();
        store
            .ingest(&create_test_event("entity-4", "trade.completed"))
            .unwrap();
        store
            .ingest(&create_test_event("entity-5", "balance.updated"))
            .unwrap();

        // Query with prefix "index." should return exactly 2
        let results = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: Some("index.".to_string()),
                payload_filter: None,
            })
            .unwrap();

        assert_eq!(results.len(), 2);
        assert!(
            results
                .iter()
                .all(|e| e.event_type_str().starts_with("index."))
        );
    }

    #[test]
    fn test_query_by_event_type_prefix_empty_returns_all() {
        let store = EventStore::new();

        store
            .ingest(&create_test_event("entity-1", "index.created"))
            .unwrap();
        store
            .ingest(&create_test_event("entity-2", "trade.created"))
            .unwrap();

        // Empty prefix matches all types
        let results = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: Some(String::new()),
                payload_filter: None,
            })
            .unwrap();

        assert_eq!(results.len(), 2);
    }

    #[test]
    fn test_query_by_event_type_prefix_no_match() {
        let store = EventStore::new();

        store
            .ingest(&create_test_event("entity-1", "index.created"))
            .unwrap();

        let results = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: Some("nonexistent.".to_string()),
                payload_filter: None,
            })
            .unwrap();

        assert!(results.is_empty());
    }

    #[test]
    fn test_query_by_entity_with_type_prefix() {
        let store = EventStore::new();

        store
            .ingest(&create_test_event("entity-1", "index.created"))
            .unwrap();
        store
            .ingest(&create_test_event("entity-1", "trade.created"))
            .unwrap();
        store
            .ingest(&create_test_event("entity-2", "index.updated"))
            .unwrap();

        // Query entity-1 with prefix "index." should return 1
        let results = store
            .query(&QueryEventsRequest {
                entity_id: Some("entity-1".to_string()),
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: Some("index.".to_string()),
                payload_filter: None,
            })
            .unwrap();

        assert_eq!(results.len(), 1);
        assert_eq!(results[0].event_type_str(), "index.created");
    }

    #[test]
    fn test_query_prefix_with_limit() {
        let store = EventStore::new();

        for i in 0..5 {
            store
                .ingest(&create_test_event(&format!("entity-{i}"), "index.created"))
                .unwrap();
        }

        let results = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: Some(3),
                event_type_prefix: Some("index.".to_string()),
                payload_filter: None,
            })
            .unwrap();

        assert_eq!(results.len(), 3);
    }

    #[test]
    fn test_query_prefix_alongside_existing_filters() {
        let store = EventStore::new();

        store
            .ingest(&create_test_event("entity-1", "index.created"))
            .unwrap();
        // Sleep briefly to ensure different timestamps
        std::thread::sleep(std::time::Duration::from_millis(10));
        store
            .ingest(&create_test_event("entity-2", "index.strategy.updated"))
            .unwrap();
        std::thread::sleep(std::time::Duration::from_millis(10));
        store
            .ingest(&create_test_event("entity-3", "index.deleted"))
            .unwrap();

        // Prefix with limit
        let results = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: Some(2),
                event_type_prefix: Some("index.".to_string()),
                payload_filter: None,
            })
            .unwrap();

        assert_eq!(results.len(), 2);
    }

    // -----------------------------------------------------------------------
    // payload_filter (JSON field match)
    // -----------------------------------------------------------------------

    #[test]
    fn test_query_with_payload_filter() {
        let store = EventStore::new();

        // Ingest 5 events with user_id=alice
        for i in 0..5 {
            store
                .ingest(&create_test_event_with_payload(
                    &format!("entity-{i}"),
                    "user.action",
                    serde_json::json!({"user_id": "alice", "action": "click"}),
                ))
                .unwrap();
        }
        // Ingest 5 events with user_id=bob
        for i in 5..10 {
            store
                .ingest(&create_test_event_with_payload(
                    &format!("entity-{i}"),
                    "user.action",
                    serde_json::json!({"user_id": "bob", "action": "view"}),
                ))
                .unwrap();
        }

        // Filter for alice
        let results = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: Some("user.action".to_string()),
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: Some(r#"{"user_id":"alice"}"#.to_string()),
            })
            .unwrap();

        assert_eq!(results.len(), 5);
    }

    #[test]
    fn test_query_payload_filter_non_existent_field() {
        let store = EventStore::new();

        store
            .ingest(&create_test_event_with_payload(
                "entity-1",
                "user.action",
                serde_json::json!({"user_id": "alice"}),
            ))
            .unwrap();

        // Filter for a field that doesn't exist — returns 0, not error
        let results = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: Some(r#"{"nonexistent":"value"}"#.to_string()),
            })
            .unwrap();

        assert!(results.is_empty());
    }
2567
2568    #[test]
2569    fn test_query_payload_filter_with_prefix() {
2570        let store = EventStore::new();
2571
2572        store
2573            .ingest(&create_test_event_with_payload(
2574                "entity-1",
2575                "index.created",
2576                serde_json::json!({"status": "active"}),
2577            ))
2578            .unwrap();
2579        store
2580            .ingest(&create_test_event_with_payload(
2581                "entity-2",
2582                "index.created",
2583                serde_json::json!({"status": "inactive"}),
2584            ))
2585            .unwrap();
2586        store
2587            .ingest(&create_test_event_with_payload(
2588                "entity-3",
2589                "trade.created",
2590                serde_json::json!({"status": "active"}),
2591            ))
2592            .unwrap();
2593
2594        // Combine prefix + payload filter
2595        let results = store
2596            .query(&QueryEventsRequest {
2597                entity_id: None,
2598                event_type: None,
2599                tenant_id: None,
2600                as_of: None,
2601                since: None,
2602                until: None,
2603                limit: None,
2604                event_type_prefix: Some("index.".to_string()),
2605                payload_filter: Some(r#"{"status":"active"}"#.to_string()),
2606            })
2607            .unwrap();
2608
2609        assert_eq!(results.len(), 1);
2610        assert_eq!(results[0].entity_id().to_string(), "entity-1");
2611    }
2612
2613    #[test]
2614    fn test_flush_storage_no_storage() {
2615        let store = EventStore::new();
2616        // Without storage, flush should succeed (no-op)
2617        let result = store.flush_storage();
2618        assert!(result.is_ok());
2619    }
2620
2621    #[test]
2622    fn test_state_evolution() {
2623        let store = EventStore::new();
2624
2625        // Initial state
2626        store
2627            .ingest(
2628                &Event::from_strings(
2629                    "user.created".to_string(),
2630                    "user-1".to_string(),
2631                    "default".to_string(),
2632                    serde_json::json!({"name": "Alice", "age": 25}),
2633                    None,
2634                )
2635                .unwrap(),
2636            )
2637            .unwrap();
2638
2639        // Update state
2640        store
2641            .ingest(
2642                &Event::from_strings(
2643                    "user.updated".to_string(),
2644                    "user-1".to_string(),
2645                    "default".to_string(),
2646                    serde_json::json!({"age": 26}),
2647                    None,
2648                )
2649                .unwrap(),
2650            )
2651            .unwrap();
2652
2653        let state = store.reconstruct_state("user-1", None).unwrap();
2654        // The state is wrapped with metadata
2655        assert_eq!(state["current_state"]["name"], "Alice");
2656        assert_eq!(state["current_state"]["age"], 26);
2657    }
2658
2659    #[test]
2660    fn test_reject_system_event_types() {
2661        let store = EventStore::new();
2662
2663        // System event types should be rejected via user-facing ingestion
2664        let event = Event::reconstruct_from_strings(
2665            uuid::Uuid::new_v4(),
2666            "_system.tenant.created".to_string(),
2667            "_system:tenant:acme".to_string(),
2668            "_system".to_string(),
2669            serde_json::json!({"name": "ACME"}),
2670            chrono::Utc::now(),
2671            None,
2672            1,
2673        );
2674
2675        let result = store.ingest(&event);
2676        assert!(result.is_err());
2677        let err = result.unwrap_err();
2678        assert!(
2679            err.to_string().contains("reserved for internal use"),
2680            "Expected system namespace rejection, got: {err}"
2681        );
2682    }
2683
2684    // -----------------------------------------------------------------------
2685    // Crash recovery: WAL events survive restart via Parquet checkpoint.
2686    // Regression test for GitHub issue #84 — flush_storage() was a no-op
2687    // during recovery because events were never buffered into Parquet's
2688    // current_batch before flushing.
2689    // -----------------------------------------------------------------------
2690
2691    #[test]
2692    fn test_wal_recovery_checkpoints_to_parquet() {
2693        let data_dir = TempDir::new().unwrap();
2694        let storage_dir = data_dir.path().join("storage");
2695        let wal_dir = data_dir.path().join("wal");
2696
2697        // Session 1: ingest events with WAL + Parquet
2698        {
2699            let config = EventStoreConfig::production(
2700                &storage_dir,
2701                &wal_dir,
2702                SnapshotConfig::default(),
2703                WALConfig {
2704                    sync_on_write: true,
2705                    ..WALConfig::default()
2706                },
2707                CompactionConfig::default(),
2708            );
2709            let store = EventStore::with_config(config);
2710
2711            for i in 0..5 {
2712                let event = Event::from_strings(
2713                    "test.created".to_string(),
2714                    format!("entity-{i}"),
2715                    "default".to_string(),
2716                    serde_json::json!({"index": i}),
2717                    None,
2718                )
2719                .unwrap();
2720                store.ingest(&event).unwrap();
2721            }
2722
2723            assert_eq!(store.stats().total_events, 5);
2724
2725            // Do NOT call flush_storage or shutdown — simulate a crash.
2726            // Events are in WAL (sync_on_write: true) but NOT in Parquet.
2727        }
2728
2729        // Verify WAL file has data
2730        let wal_files: Vec<_> = std::fs::read_dir(&wal_dir)
2731            .unwrap()
2732            .filter_map(std::result::Result::ok)
2733            .filter(|e| e.path().extension().is_some_and(|ext| ext == "log"))
2734            .collect();
2735        assert!(!wal_files.is_empty(), "WAL file should exist");
2736        let wal_size = wal_files[0].metadata().unwrap().len();
2737        assert!(wal_size > 0, "WAL file should have data (got 0 bytes)");
2738
2739        // Session 2: reopen — recovery should checkpoint WAL to Parquet, then truncate
2740        {
2741            let config = EventStoreConfig::production(
2742                &storage_dir,
2743                &wal_dir,
2744                SnapshotConfig::default(),
2745                WALConfig {
2746                    sync_on_write: true,
2747                    ..WALConfig::default()
2748                },
2749                CompactionConfig::default(),
2750            );
2751            let store = EventStore::with_config(config);
2752
2753            // Events should be recovered
2754            assert_eq!(
2755                store.stats().total_events,
2756                5,
2757                "Session 2 should have all 5 events after WAL recovery"
2758            );
2759
2760            // Parquet should now have files (checkpoint happened)
2761            let parquet_files: Vec<_> = std::fs::read_dir(&storage_dir)
2762                .unwrap()
2763                .filter_map(std::result::Result::ok)
2764                .filter(|e| e.path().extension().is_some_and(|ext| ext == "parquet"))
2765                .collect();
2766            assert!(
2767                !parquet_files.is_empty(),
2768                "Parquet file should exist after WAL checkpoint"
2769            );
2770        }
2771
2772        // Session 3: reopen again — events should load from Parquet (WAL was truncated)
2773        {
2774            let config = EventStoreConfig::production(
2775                &storage_dir,
2776                &wal_dir,
2777                SnapshotConfig::default(),
2778                WALConfig {
2779                    sync_on_write: true,
2780                    ..WALConfig::default()
2781                },
2782                CompactionConfig::default(),
2783            );
2784            let store = EventStore::with_config(config);
2785
2786            assert_eq!(
2787                store.stats().total_events,
2788                5,
2789                "Session 3 should still have all 5 events from Parquet"
2790            );
2791        }
2792    }
2793
2794    #[test]
2795    fn test_parquet_restore_surfaces_errors_not_silent() {
2796        // Write events with WAL+Parquet, flush to Parquet, then corrupt the
2797        // Parquet file. On reload, the error must be logged (not silently
2798        // swallowed as 0 events).
2799        let data_dir = TempDir::new().unwrap();
2800        let storage_dir = data_dir.path().join("storage");
2801        let wal_dir = data_dir.path().join("wal");
2802
2803        // Session 1: write events and flush to Parquet
2804        {
2805            let config = EventStoreConfig::production(
2806                &storage_dir,
2807                &wal_dir,
2808                SnapshotConfig::default(),
2809                WALConfig {
2810                    sync_on_write: true,
2811                    ..WALConfig::default()
2812                },
2813                CompactionConfig::default(),
2814            );
2815            let store = EventStore::with_config(config);
2816
2817            for i in 0..3 {
2818                let event = Event::from_strings(
2819                    "test.created".to_string(),
2820                    format!("entity-{i}"),
2821                    "default".to_string(),
2822                    serde_json::json!({"i": i}),
2823                    None,
2824                )
2825                .unwrap();
2826                store.ingest(&event).unwrap();
2827            }
2828
2829            store.flush_storage().unwrap();
2830            assert_eq!(store.stats().total_events, 3);
2831        }
2832
2833        // Verify parquet file exists
2834        let parquet_files: Vec<_> = std::fs::read_dir(&storage_dir)
2835            .unwrap()
2836            .filter_map(std::result::Result::ok)
2837            .filter(|e| e.path().extension().is_some_and(|ext| ext == "parquet"))
2838            .collect();
2839        assert!(!parquet_files.is_empty(), "Parquet file must exist");
2840
2841        // Corrupt the parquet file
2842        std::fs::write(parquet_files[0].path(), b"corrupted data").unwrap();
2843
2844        // Truncate WAL so only Parquet matters
2845        for entry in std::fs::read_dir(&wal_dir).unwrap().flatten() {
2846            std::fs::write(entry.path(), b"").unwrap();
2847        }
2848
2849        // Session 2: reload — should NOT silently report 0 events.
2850        // The error is logged via tracing::error! which we can't capture in a
2851        // unit test, but we CAN verify the store has 0 events (previously this
2852        // looked identical to "no data on disk" — now there's an error log).
2853        // The key behavioral change is that with_config no longer uses a
2854        // let-chain that silently drops the Err variant.
2855        {
2856            let config = EventStoreConfig::production(
2857                &storage_dir,
2858                &wal_dir,
2859                SnapshotConfig::default(),
2860                WALConfig::default(),
2861                CompactionConfig::default(),
2862            );
2863            let store = EventStore::with_config(config);
2864
2865            // Store has 0 events because Parquet is corrupted — but the error
2866            // is now logged (not silently swallowed).
2867            assert_eq!(store.stats().total_events, 0);
2868        }
2869    }
2870}