allsource_core/store.rs

#[cfg(feature = "server")]
use crate::application::services::webhook::WebhookRegistry;
#[cfg(feature = "server")]
use crate::infrastructure::observability::metrics::MetricsRegistry;
#[cfg(feature = "server")]
use crate::infrastructure::web::websocket::WebSocketManager;
use crate::{
    application::{
        dto::QueryEventsRequest,
        services::{
            consumer::ConsumerRegistry,
            exactly_once::{ExactlyOnceConfig, ExactlyOnceRegistry},
            pipeline::PipelineManager,
            projection::{EntitySnapshotProjection, EventCounterProjection, ProjectionManager},
            replay::ReplayManager,
            schema::{SchemaRegistry, SchemaRegistryConfig},
            schema_evolution::SchemaEvolutionManager,
        },
    },
    domain::entities::Event,
    error::{AllSourceError, Result},
    infrastructure::{
        persistence::{
            compaction::{CompactionConfig, CompactionManager},
            index::{EventIndex, IndexEntry},
            snapshot::{SnapshotConfig, SnapshotManager, SnapshotType},
            storage::ParquetStorage,
            tenant_loader::TenantLoader,
            wal::{WALConfig, WriteAheadLog},
        },
        query::geospatial::GeoIndex,
    },
};
use chrono::{DateTime, Utc};
use dashmap::DashMap;
use parking_lot::RwLock;
use std::{path::PathBuf, sync::Arc};
#[cfg(feature = "server")]
use tokio::sync::mpsc;

/// High-performance event store with columnar storage
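///
/// # Example
///
/// A minimal end-to-end sketch (the `Event::new` constructor shown here is
/// illustrative, not necessarily the real `Event` API):
///
/// ```rust,ignore
/// // Purely in-memory store: no Parquet, no WAL.
/// let store = EventStore::new();
///
/// // Hypothetical constructor; adapt to the actual `Event` API.
/// let event = Event::new("user-1", "user.created", serde_json::json!({ "name": "Ada" }));
/// store.ingest(&event)?;
///
/// assert_eq!(store.total_events(), 1);
/// assert_eq!(store.get_entity_version("user-1"), 1);
/// ```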
pub struct EventStore {
    /// In-memory event storage
    events: Arc<RwLock<Vec<Event>>>,

    /// High-performance concurrent index
    index: Arc<EventIndex>,

    /// Projection manager for real-time aggregations
    pub(crate) projections: Arc<RwLock<ProjectionManager>>,

    /// Optional persistent storage (v0.2 feature)
    storage: Option<Arc<RwLock<ParquetStorage>>>,

    /// WebSocket manager for real-time event streaming (v0.2 feature)
    #[cfg(feature = "server")]
    websocket_manager: Arc<WebSocketManager>,

    /// Snapshot manager for fast state recovery (v0.2 feature)
    snapshot_manager: Arc<SnapshotManager>,

    /// Write-Ahead Log for durability (v0.2 feature)
    wal: Option<Arc<WriteAheadLog>>,

    /// Compaction manager for Parquet optimization (v0.2 feature)
    compaction_manager: Option<Arc<CompactionManager>>,

    /// Schema registry for event validation (v0.5 feature)
    schema_registry: Arc<SchemaRegistry>,

    /// Replay manager for event replay and projection rebuilding (v0.5 feature)
    replay_manager: Arc<ReplayManager>,

    /// Pipeline manager for stream processing (v0.5 feature)
    pipeline_manager: Arc<PipelineManager>,

    /// Prometheus metrics registry (v0.6 feature)
    #[cfg(feature = "server")]
    metrics: Arc<MetricsRegistry>,

    /// Total events ingested (for metrics)
    total_ingested: Arc<RwLock<u64>>,

    /// Projection state cache for Query Service integration (v0.7 feature)
    /// Key format: "{projection_name}:{entity_id}"
    /// This DashMap provides O(1) access with ~11.9 μs latency
    projection_state_cache: Arc<DashMap<String, serde_json::Value>>,

    /// Projection status overrides (v0.13 feature)
    /// Tracks pause/start state per projection name: "running" or "paused"
    projection_status: Arc<DashMap<String, String>>,

    /// Webhook registry for outbound event delivery (v0.11 feature)
    #[cfg(feature = "server")]
    webhook_registry: Arc<WebhookRegistry>,

    /// Channel sender for async webhook delivery tasks
    #[cfg(feature = "server")]
    webhook_tx: Arc<RwLock<Option<mpsc::UnboundedSender<WebhookDeliveryTask>>>>,

    /// Geospatial index for coordinate-based queries (v2.0 feature)
    geo_index: Arc<GeoIndex>,

    /// Exactly-once processing registry (v2.0 feature)
    exactly_once: Arc<ExactlyOnceRegistry>,

    /// Autonomous schema evolution manager (v2.0 feature)
    schema_evolution: Arc<SchemaEvolutionManager>,

    /// Per-entity version counters for optimistic concurrency control (v0.14 feature)
    /// Key: entity_id string, Value: monotonic version (number of events for that entity)
    entity_versions: Arc<DashMap<String, u64>>,

    /// Durable consumer registry for subscription cursor tracking (v0.14 feature)
    consumer_registry: Arc<ConsumerRegistry>,

    /// In-process broadcast of every successfully-ingested event. Always enabled
    /// so embedded consumers (TUI, web) can tail changes without the `server`
    /// feature / HTTP stack. Lagging receivers see `RecvError::Lagged`.
    event_broadcast_tx: tokio::sync::broadcast::Sender<Arc<Event>>,

    /// Per-tenant lazy-load bookkeeping. Tracks which tenants have
    /// been hydrated from Parquet into the in-memory pile and
    /// serializes concurrent first-loads of the same tenant. See
    /// `ensure_tenant_loaded` and Step 2 of the sustainable data
    /// strategy.
    tenant_loader: Arc<TenantLoader>,
    /// Cadence of the runtime checkpoint loop (Step 6). `None` means
    /// the loop is disabled and the WAL grows until the next boot-time
    /// checkpoint. Production reads this from
    /// `ALLSOURCE_CHECKPOINT_INTERVAL_SECONDS`. Stored on the store so
    /// background tasks can read it without re-parsing the env.
    checkpoint_interval_secs: Option<u64>,
}

/// A task queued for async webhook delivery
#[cfg(feature = "server")]
#[derive(Debug, Clone)]
pub struct WebhookDeliveryTask {
    pub webhook: crate::application::services::webhook::WebhookSubscription,
    pub event: Event,
}

impl EventStore {
    /// Create a new in-memory event store
    pub fn new() -> Self {
        Self::with_config(EventStoreConfig::default())
    }

    /// Create event store with custom configuration
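    ///
    /// # Example
    ///
    /// A sketch of enabling persistence and checkpointing. Field names are
    /// taken from how `with_config` reads them below; everything else is
    /// left at its default:
    ///
    /// ```rust,ignore
    /// let config = EventStoreConfig {
    ///     storage_dir: Some(PathBuf::from("/var/lib/allsource/parquet")),
    ///     wal_dir: Some(PathBuf::from("/var/lib/allsource/wal")),
    ///     checkpoint_interval_secs: Some(30),
    ///     ..EventStoreConfig::default()
    /// };
    /// let store = EventStore::with_config(config);
    /// ```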
    pub fn with_config(config: EventStoreConfig) -> Self {
        let mut projections = ProjectionManager::new();

        // Register built-in projections
        projections.register(Arc::new(EntitySnapshotProjection::new("entity_snapshots")));
        projections.register(Arc::new(EventCounterProjection::new("event_counters")));

        // Initialize persistent storage if configured
        let storage = config
            .storage_dir
            .as_ref()
            .and_then(|dir| match ParquetStorage::new(dir) {
                Ok(storage) => {
                    tracing::info!("✅ Parquet persistence enabled at: {}", dir.display());
                    Some(Arc::new(RwLock::new(storage)))
                }
                Err(e) => {
                    tracing::error!("❌ Failed to initialize Parquet storage: {}", e);
                    None
                }
            });

        // Initialize WAL if configured (v0.2 feature)
        let wal = config.wal_dir.as_ref().and_then(|dir| {
            match WriteAheadLog::new(dir, config.wal_config.clone()) {
                Ok(wal) => {
                    tracing::info!("✅ WAL enabled at: {}", dir.display());
                    Some(Arc::new(wal))
                }
                Err(e) => {
                    tracing::error!("❌ Failed to initialize WAL: {}", e);
                    None
                }
            }
        });

        // Initialize compaction manager if Parquet storage is enabled (v0.2 feature)
        let compaction_manager = config.storage_dir.as_ref().map(|dir| {
            let manager = CompactionManager::new(dir, config.compaction_config.clone());
            Arc::new(manager)
        });

        // Initialize schema registry (v0.5 feature)
        let schema_registry = Arc::new(SchemaRegistry::new(config.schema_registry_config.clone()));
        tracing::info!("✅ Schema registry enabled");

        // Initialize replay manager (v0.5 feature)
        let replay_manager = Arc::new(ReplayManager::new());
        tracing::info!("✅ Replay manager enabled");

        // Initialize pipeline manager (v0.5 feature)
        let pipeline_manager = Arc::new(PipelineManager::new());
        tracing::info!("✅ Pipeline manager enabled");

        // Initialize metrics registry (v0.6 feature)
        #[cfg(feature = "server")]
        let metrics = {
            let m = MetricsRegistry::new();
            tracing::info!("✅ Prometheus metrics registry initialized");
            m
        };

        // Initialize projection state cache (v0.7 feature)
        let projection_state_cache = Arc::new(DashMap::new());
        tracing::info!("✅ Projection state cache initialized");

        // Initialize webhook registry (v0.11 feature)
        #[cfg(feature = "server")]
        let webhook_registry = {
            let w = Arc::new(WebhookRegistry::new());
            tracing::info!("✅ Webhook registry initialized");
            w
        };

        // Unconditional in-process event broadcaster so embedded consumers
        // (TUI, web) can live-reload without the `server` feature.
        let (event_broadcast_tx, _) = tokio::sync::broadcast::channel(1024);

        let store = Self {
            events: Arc::new(RwLock::new(Vec::new())),
            index: Arc::new(EventIndex::new()),
            projections: Arc::new(RwLock::new(projections)),
            storage,
            #[cfg(feature = "server")]
            websocket_manager: Arc::new(WebSocketManager::new()),
            snapshot_manager: Arc::new(SnapshotManager::new(config.snapshot_config)),
            wal,
            compaction_manager,
            schema_registry,
            replay_manager,
            pipeline_manager,
            #[cfg(feature = "server")]
            metrics,
            total_ingested: Arc::new(RwLock::new(0)),
            projection_state_cache,
            projection_status: Arc::new(DashMap::new()),
            #[cfg(feature = "server")]
            webhook_registry,
            #[cfg(feature = "server")]
            webhook_tx: Arc::new(RwLock::new(None)),
            geo_index: Arc::new(GeoIndex::new()),
            exactly_once: Arc::new(ExactlyOnceRegistry::new(ExactlyOnceConfig::default())),
            schema_evolution: Arc::new(SchemaEvolutionManager::new()),
            entity_versions: Arc::new(DashMap::new()),
            consumer_registry: Arc::new(ConsumerRegistry::new()),
            event_broadcast_tx,
            tenant_loader: {
                let loader = TenantLoader::new();
                if let Some(budget) = config.cache_byte_budget {
                    loader.set_byte_budget(budget);
                    tracing::info!(
                        "✅ Cache byte budget set to {} bytes ({:.2} GiB) — LRU eviction enabled",
                        budget,
                        budget as f64 / (1024.0 * 1024.0 * 1024.0)
                    );
                } else {
                    tracing::info!(
                        "✅ Cache budget unset — every loaded tenant stays resident \
                         (set ALLSOURCE_CACHE_BYTES to enable eviction)"
                    );
                }
                Arc::new(loader)
            },
            checkpoint_interval_secs: config.checkpoint_interval_secs,
        };

        // Boot is now O(1) regardless of dataset size (Step 2 of the
        // sustainable data strategy). Pre-Step-2 we scanned every
        // Parquet file at startup; on the production volume that
        // grew past available memory and Core OOM'd during recovery
        // (issue #160). Now:
        //
        //   - Parquet data stays on disk. Tenants are hydrated on
        //     demand by `ensure_tenant_loaded`, called from the
        //     query path on first access.
        //   - WAL is still recovered eagerly. WAL is bounded
        //     (rotates / truncates after each Parquet flush), so
        //     replaying it is O(recent un-flushed writes) — small
        //     by construction and required for correctness, since
        //     those events aren't durably in Parquet yet.
        //
        // After WAL recovery we still checkpoint recovered events
        // to Parquet and truncate the WAL. The lazy-load path's
        // dedupe in `append_loaded_event` (index.get_by_id check)
        // makes it safe for the same event to be reachable through
        // both WAL recovery and a subsequent ensure_tenant_loaded
        // pass on the same tenant.
        if let Some(ref wal) = store.wal {
            match wal.recover() {
                Ok(recovered_events) if !recovered_events.is_empty() => {
                    let mut wal_new = 0usize;
                    for event in recovered_events {
                        let offset = store.events.read().len();
                        if let Err(e) = store.index.index_event(
                            event.id,
                            event.entity_id_str(),
                            event.event_type_str(),
                            event.timestamp,
                            offset,
                        ) {
                            tracing::error!("Failed to re-index WAL event {}: {}", event.id, e);
                        }

                        if let Err(e) = store.projections.read().process_event(&event) {
                            tracing::error!("Failed to re-process WAL event {}: {}", event.id, e);
                        }

                        *store
                            .entity_versions
                            .entry(event.entity_id_str().to_string())
                            .or_insert(0) += 1;

                        store.events.write().push(event);
                        wal_new += 1;
                    }

                    // Step 6: expose replay size as a gauge so ops
                    // can graph "how big was the last replay?" and
                    // catch regressions where the checkpoint loop
                    // stops draining the WAL.
                    #[cfg(feature = "server")]
                    store.metrics.wal_replay_events_total.set(wal_new as i64);

                    if wal_new > 0 {
                        let total = store.events.read().len();
                        // total_ingested now reflects "events the
                        // process knows about" rather than "events
                        // ever written" — Parquet data isn't loaded
                        // on boot, so the count grows as tenants
                        // get hydrated. Consumers that need the
                        // historical total should look at Parquet
                        // file stats, not this counter.
                        *store.total_ingested.write() = total as u64;
                        tracing::info!(
                            "✅ Recovered {} events from WAL (Parquet data stays cold until \
                             first per-tenant query)",
                            wal_new
                        );

                        // Checkpoint WAL events to Parquet — buffer
                        // them into the per-tenant Parquet batches
                        // first, then flush. Without this,
                        // flush_storage() finds an empty current
                        // batch and silently no-ops, the WAL gets
                        // truncated, and the events exist only in
                        // memory (lost on next restart).
                        if let Some(ref storage) = store.storage {
                            tracing::info!(
                                "📸 Checkpointing {} WAL events to Parquet storage...",
                                wal_new
                            );
                            let parquet = storage.read();
                            let events = store.events.read();
                            let mut buffered = 0usize;
                            for event in events.iter().skip(events.len() - wal_new) {
                                if let Err(e) = parquet.append_event(event.clone()) {
                                    tracing::error!(
                                        "Failed to buffer WAL event for Parquet: {}",
                                        e
                                    );
                                } else {
                                    buffered += 1;
                                }
                            }
                            drop(events);
                            drop(parquet);

                            if buffered > 0 {
                                if let Err(e) = store.flush_storage() {
                                    tracing::error!("Failed to checkpoint to Parquet: {}", e);
                                } else if let Err(e) = wal.truncate() {
                                    tracing::error!(
                                        "Failed to truncate WAL after checkpoint: {}",
                                        e
                                    );
                                } else {
                                    tracing::info!(
                                        "✅ WAL checkpointed and truncated ({} events)",
                                        buffered
                                    );
                                }
                            }
                        }
                    }
                }
                Ok(_) => {
                    tracing::debug!("No events to recover from WAL");
                    #[cfg(feature = "server")]
                    store.metrics.wal_replay_events_total.set(0);
                }
                Err(e) => {
                    tracing::error!("❌ WAL recovery failed: {}", e);
                }
            }
        } else if store.storage.is_some() {
            tracing::info!(
                "📂 Boot complete (lazy-load mode): Parquet data stays on disk until first \
                 per-tenant query"
            );
        }

        store
    }

    /// Ingest a new event with optional optimistic concurrency check.
    ///
    /// If `expected_version` is `Some(v)`, the write is rejected with
    /// `VersionConflict` unless the entity's current version equals `v`.
    /// The version check and WAL append are atomic (locked together).
    ///
    /// Returns the new entity version after the append.
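    ///
    /// # Example
    ///
    /// A sketch of a compare-and-append loop (event construction elided):
    ///
    /// ```rust,ignore
    /// // Read the version the caller's decision was based on...
    /// let current = store.get_entity_version("order-42");
    /// // ...and reject the write if someone else appended in between.
    /// match store.ingest_with_expected_version(&event, Some(current)) {
    ///     Ok(new_version) => tracing::info!("appended at version {new_version}"),
    ///     Err(AllSourceError::VersionConflict { expected, current }) => {
    ///         // Reload state and retry, or surface a 409 to the caller.
    ///     }
    ///     Err(e) => return Err(e),
    /// }
    /// ```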
    #[cfg_attr(feature = "hotpath", hotpath::measure)]
    pub fn ingest_with_expected_version(
        &self,
        event: &Event,
        expected_version: Option<u64>,
    ) -> Result<u64> {
        // Validate event first (before any locking)
        self.validate_event(event)?;

        let entity_id = event.entity_id_str().to_string();

        // Atomic version check + append: hold the DashMap entry lock
        // to prevent TOCTOU races between check and write.
        let new_version = {
            let mut version_entry = self.entity_versions.entry(entity_id.clone()).or_insert(0);
            let current = *version_entry;

            if let Some(expected) = expected_version
                && current != expected
            {
                return Err(crate::error::AllSourceError::VersionConflict { expected, current });
            }

            // Write to WAL FIRST for durability (under version lock to keep atomicity)
            if let Some(ref wal) = self.wal {
                wal.append(event.clone())?;
            }

            *version_entry += 1;
            *version_entry
        };

        // From here on, the event is durable (WAL) and version is bumped.
        // Continue with indexing, projections, storage, and broadcast.
        self.ingest_post_wal(event)?;

        Ok(new_version)
    }

    /// Post-WAL ingestion: index, projections, storage, broadcast.
    /// Called after WAL append and version bump are complete.
    #[cfg_attr(feature = "hotpath", hotpath::measure)]
    fn ingest_post_wal(&self, event: &Event) -> Result<()> {
        #[cfg(feature = "server")]
        let timer = self.metrics.ingestion_duration_seconds.start_timer();

        let mut events = self.events.write();
        let offset = events.len();

        // Index the event
        self.index.index_event(
            event.id,
            event.entity_id_str(),
            event.event_type_str(),
            event.timestamp,
            offset,
        )?;

        // Process through projections
        let projections = self.projections.read();
        projections.process_event(event)?;
        drop(projections);

        // Process through pipelines
        let pipeline_results = self.pipeline_manager.process_event(event);
        if !pipeline_results.is_empty() {
            tracing::debug!(
                "Event {} processed by {} pipeline(s)",
                event.id,
                pipeline_results.len()
            );
            for (pipeline_id, result) in pipeline_results {
                tracing::trace!("Pipeline {} result: {:?}", pipeline_id, result);
            }
        }

        // Persist to Parquet storage if enabled
        if let Some(ref storage) = self.storage {
            let storage = storage.read();
            storage.append_event(event.clone())?;
        }

        // Store the event in memory
        events.push(event.clone());
        let total_events = events.len();
        drop(events);

        // Broadcast to in-process subscribers (always on) + optional WS.
        let event_arc = Arc::new(event.clone());
        let _ = self.event_broadcast_tx.send(Arc::clone(&event_arc));
        #[cfg(feature = "server")]
        self.websocket_manager.broadcast_event(event_arc);

        // Dispatch to matching webhook subscriptions
        #[cfg(feature = "server")]
        self.dispatch_webhooks(event);

        // Update geospatial index
        self.geo_index.index_event(event);

        // Autonomous schema evolution
        self.schema_evolution
            .analyze_event(event.event_type_str(), &event.payload);

        // Check if automatic snapshot should be created
        self.check_auto_snapshot(event.entity_id_str(), event);

        // Update metrics
        #[cfg(feature = "server")]
        {
            self.metrics.events_ingested_total.inc();
            self.metrics
                .events_ingested_by_type
                .with_label_values(&[event.event_type_str()])
                .inc();
            self.metrics.storage_events_total.set(total_events as i64);
        }

        // Update legacy total counter
        let mut total = self.total_ingested.write();
        *total += 1;

        #[cfg(feature = "server")]
        timer.observe_duration();

        tracing::debug!("Event ingested: {} (offset: {})", event.id, offset);

        Ok(())
    }

    /// Ingest a new event into the store
    #[cfg_attr(feature = "hotpath", hotpath::measure)]
    pub fn ingest(&self, event: &Event) -> Result<()> {
        // Start metrics timer (v0.6 feature)
        #[cfg(feature = "server")]
        let timer = self.metrics.ingestion_duration_seconds.start_timer();

        // Validate event
        let validation_result = self.validate_event(event);
        if let Err(e) = validation_result {
            #[cfg(feature = "server")]
            {
                self.metrics.ingestion_errors_total.inc();
                timer.observe_duration();
            }
            return Err(e);
        }

        // Write to WAL FIRST for durability (v0.2 feature)
        // This ensures event is persisted before processing
        if let Some(ref wal) = self.wal
            && let Err(e) = wal.append(event.clone())
        {
            #[cfg(feature = "server")]
            {
                self.metrics.ingestion_errors_total.inc();
                timer.observe_duration();
            }
            return Err(e);
        }

        // Track per-entity version (unconditional increment, no version check)
        *self
            .entity_versions
            .entry(event.entity_id_str().to_string())
            .or_insert(0) += 1;

        let mut events = self.events.write();
        let offset = events.len();

        // Index the event
        self.index.index_event(
            event.id,
            event.entity_id_str(),
            event.event_type_str(),
            event.timestamp,
            offset,
        )?;

        // Process through projections
        let projections = self.projections.read();
        projections.process_event(event)?;
        drop(projections); // Release lock

        // Process through pipelines (v0.5 feature)
        // Pipelines can transform, filter, and aggregate events in real-time
        let pipeline_results = self.pipeline_manager.process_event(event);
        if !pipeline_results.is_empty() {
            tracing::debug!(
                "Event {} processed by {} pipeline(s)",
                event.id,
                pipeline_results.len()
            );
            // Pipeline results could be stored, emitted, or forwarded elsewhere
            // For now, we just log them for observability
            for (pipeline_id, result) in pipeline_results {
                tracing::trace!("Pipeline {} result: {:?}", pipeline_id, result);
            }
        }

        // Persist to Parquet storage if enabled (v0.2)
        if let Some(ref storage) = self.storage {
            let storage = storage.read();
            storage.append_event(event.clone())?;
        }

        // Store the event in memory
        events.push(event.clone());
        let total_events = events.len();
        drop(events); // Release lock early

        // Broadcast to in-process subscribers + optional WS.
        let event_arc = Arc::new(event.clone());
        let _ = self.event_broadcast_tx.send(Arc::clone(&event_arc));
        #[cfg(feature = "server")]
        self.websocket_manager.broadcast_event(event_arc);

        // Dispatch to matching webhook subscriptions (v0.11 feature)
        #[cfg(feature = "server")]
        self.dispatch_webhooks(event);

        // Update geospatial index (v2.0 feature)
        self.geo_index.index_event(event);

        // Autonomous schema evolution (v2.0 feature)
        self.schema_evolution
            .analyze_event(event.event_type_str(), &event.payload);

        // Check if automatic snapshot should be created (v0.2 feature)
        self.check_auto_snapshot(event.entity_id_str(), event);

        // Update metrics (v0.6 feature)
        #[cfg(feature = "server")]
        {
            self.metrics.events_ingested_total.inc();
            self.metrics
                .events_ingested_by_type
                .with_label_values(&[event.event_type_str()])
                .inc();
            self.metrics.storage_events_total.set(total_events as i64);
        }

        // Update legacy total counter
        let mut total = self.total_ingested.write();
        *total += 1;

        #[cfg(feature = "server")]
        timer.observe_duration();

        tracing::debug!("Event ingested: {} (offset: {})", event.id, offset);

        Ok(())
    }

    /// Ingest a batch of events with a single write lock acquisition.
    ///
    /// All events are validated first. If any event fails validation, no
    /// events are stored (all-or-nothing validation). Events are then written
    /// to WAL, indexed, processed through projections, and pushed to the
    /// events vector under a single write lock.
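    ///
    /// # Example
    ///
    /// A sketch; `readings` and `to_event` are placeholders for the caller's
    /// own data and conversion:
    ///
    /// ```rust,ignore
    /// let batch: Vec<Event> = readings.into_iter().map(to_event).collect();
    /// // One write-lock acquisition for the whole batch.
    /// store.ingest_batch(batch)?;
    /// ```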
    #[cfg_attr(feature = "hotpath", hotpath::measure)]
    pub fn ingest_batch(&self, batch: Vec<Event>) -> Result<()> {
        if batch.is_empty() {
            return Ok(());
        }

        // Phase 1: Validate all events before acquiring any locks
        for event in &batch {
            self.validate_event(event)?;
        }

        // Phase 2: Write all events to WAL (before write lock, for durability)
        if let Some(ref wal) = self.wal {
            for event in &batch {
                wal.append(event.clone())?;
            }
        }

        // Phase 3: Single write lock for index + projections + push
        let mut events = self.events.write();
        let projections = self.projections.read();

        for event in batch {
            let offset = events.len();

            self.index.index_event(
                event.id,
                event.entity_id_str(),
                event.event_type_str(),
                event.timestamp,
                offset,
            )?;

            projections.process_event(&event)?;
            self.pipeline_manager.process_event(&event);

            if let Some(ref storage) = self.storage {
                let storage = storage.read();
                storage.append_event(event.clone())?;
            }

            self.geo_index.index_event(&event);
            self.schema_evolution
                .analyze_event(event.event_type_str(), &event.payload);

            // Track per-entity version
            *self
                .entity_versions
                .entry(event.entity_id_str().to_string())
                .or_insert(0) += 1;

            // Broadcast to in-process subscribers
            let _ = self.event_broadcast_tx.send(Arc::new(event.clone()));

            events.push(event);
        }

        let total_events = events.len();
        drop(projections);
        drop(events);

        let mut total = self.total_ingested.write();
        *total += total_events as u64;

        Ok(())
    }

    /// Ingest a replicated event from the leader (follower mode).
    ///
    /// Unlike `ingest()`, this method:
    /// - Skips WAL writing (the follower's WalReceiver manages its own local WAL)
    /// - Skips schema validation (the leader already validated)
    /// - Still indexes, processes projections/pipelines, and broadcasts to WebSocket clients
    #[cfg_attr(feature = "hotpath", hotpath::measure)]
    pub fn ingest_replicated(&self, event: &Event) -> Result<()> {
        #[cfg(feature = "server")]
        let timer = self.metrics.ingestion_duration_seconds.start_timer();

        let mut events = self.events.write();
        let offset = events.len();

        // Index the event
        self.index.index_event(
            event.id,
            event.entity_id_str(),
            event.event_type_str(),
            event.timestamp,
            offset,
        )?;

        // Process through projections
        let projections = self.projections.read();
        projections.process_event(event)?;
        drop(projections);

        // Process through pipelines
        let pipeline_results = self.pipeline_manager.process_event(event);
        if !pipeline_results.is_empty() {
            tracing::debug!(
                "Replicated event {} processed by {} pipeline(s)",
                event.id,
                pipeline_results.len()
            );
        }

        // Track per-entity version
        *self
            .entity_versions
            .entry(event.entity_id_str().to_string())
            .or_insert(0) += 1;

        // Store the event in memory
        events.push(event.clone());
        let total_events = events.len();
        drop(events);

        // Broadcast to in-process subscribers + optional WS.
        let event_arc = Arc::new(event.clone());
        let _ = self.event_broadcast_tx.send(Arc::clone(&event_arc));
        #[cfg(feature = "server")]
        self.websocket_manager.broadcast_event(event_arc);

        // Update metrics
        #[cfg(feature = "server")]
        {
            self.metrics.events_ingested_total.inc();
            self.metrics
                .events_ingested_by_type
                .with_label_values(&[event.event_type_str()])
                .inc();
            self.metrics.storage_events_total.set(total_events as i64);
        }

        let mut total = self.total_ingested.write();
        *total += 1;

        #[cfg(feature = "server")]
        timer.observe_duration();

        tracing::debug!(
            "Replicated event ingested: {} (offset: {})",
            event.id,
            offset
        );

        Ok(())
    }

    /// Get the current version for an entity (number of events appended for it).
    /// Returns 0 if the entity has no events.
    #[cfg_attr(feature = "hotpath", hotpath::measure)]
    pub fn get_entity_version(&self, entity_id: &str) -> u64 {
        self.entity_versions.get(entity_id).map_or(0, |v| *v)
    }

    /// Get the consumer registry for durable subscriptions.
    pub fn consumer_registry(&self) -> &ConsumerRegistry {
        &self.consumer_registry
    }

    /// Subscribe to every successfully-ingested event in this store.
    ///
    /// Returns a `tokio::sync::broadcast::Receiver` that yields an `Arc<Event>`
    /// for each ingest. Always available — does not require the `server`
    /// feature. Lagging receivers surface `RecvError::Lagged`.
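    ///
    /// # Example
    ///
    /// A sketch of tailing the store from an embedded consumer:
    ///
    /// ```rust,ignore
    /// use tokio::sync::broadcast::error::RecvError;
    ///
    /// let mut rx = store.subscribe_events();
    /// tokio::spawn(async move {
    ///     loop {
    ///         match rx.recv().await {
    ///             Ok(event) => tracing::debug!("saw event {}", event.id),
    ///             // Fell behind the 1024-slot buffer; `n` events were skipped.
    ///             Err(RecvError::Lagged(n)) => tracing::warn!("lagged by {n} events"),
    ///             Err(RecvError::Closed) => break,
    ///         }
    ///     }
    /// });
    /// ```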
    pub fn subscribe_events(&self) -> tokio::sync::broadcast::Receiver<Arc<Event>> {
        self.event_broadcast_tx.subscribe()
    }

    /// Replace the default in-memory consumer registry with a durable one.
    ///
    /// Called during startup when system repositories are available, so that
    /// consumer cursors survive Core restarts via WAL persistence.
    pub fn set_consumer_registry(&mut self, registry: Arc<ConsumerRegistry>) {
        self.consumer_registry = registry;
    }

    /// Get the total number of events in the store (used as max offset for consumer ack).
    pub fn total_events(&self) -> usize {
        self.events.read().len()
    }

    /// Get events after a given offset, optionally filtered by event type prefixes.
    /// Used by consumer polling to fetch unprocessed events.
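    ///
    /// # Example
    ///
    /// A sketch of a poll loop; `handle` and cursor persistence are
    /// placeholders:
    ///
    /// ```rust,ignore
    /// let mut cursor: u64 = 0; // last acked offset
    /// loop {
    ///     let page = store.events_after_offset(cursor, &["user.".to_string()], 100);
    ///     if page.is_empty() {
    ///         break;
    ///     }
    ///     for (offset, event) in page {
    ///         handle(&event)?;
    ///         cursor = offset; // the returned offset already points past this event
    ///     }
    /// }
    /// ```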
    pub fn events_after_offset(
        &self,
        offset: u64,
        filters: &[String],
        limit: usize,
    ) -> Vec<(u64, Event)> {
        let events = self.events.read();
        let start = offset as usize;
        if start >= events.len() {
            return vec![];
        }

        events[start..]
            .iter()
            .enumerate()
            .filter(|(_, event)| ConsumerRegistry::matches_filters(event.event_type_str(), filters))
            .take(limit)
            // The returned offset is the cursor *after* this event, so acking
            // it advances the consumer past the event.
            .map(|(i, event)| ((start + i + 1) as u64, event.clone()))
            .collect()
    }

    /// Get the WebSocket manager for this store
    #[cfg(feature = "server")]
    pub fn websocket_manager(&self) -> Arc<WebSocketManager> {
        Arc::clone(&self.websocket_manager)
    }

    /// Get the snapshot manager for this store
    pub fn snapshot_manager(&self) -> Arc<SnapshotManager> {
        Arc::clone(&self.snapshot_manager)
    }

    /// Get the compaction manager for this store
    pub fn compaction_manager(&self) -> Option<Arc<CompactionManager>> {
        self.compaction_manager.as_ref().map(Arc::clone)
    }

    /// Get the schema registry for this store (v0.5 feature)
    pub fn schema_registry(&self) -> Arc<SchemaRegistry> {
        Arc::clone(&self.schema_registry)
    }

    /// Get the replay manager for this store (v0.5 feature)
    pub fn replay_manager(&self) -> Arc<ReplayManager> {
        Arc::clone(&self.replay_manager)
    }

    /// Get the pipeline manager for this store (v0.5 feature)
    pub fn pipeline_manager(&self) -> Arc<PipelineManager> {
        Arc::clone(&self.pipeline_manager)
    }

    /// Get the metrics registry for this store (v0.6 feature)
    #[cfg(feature = "server")]
    pub fn metrics(&self) -> Arc<MetricsRegistry> {
        Arc::clone(&self.metrics)
    }

    /// Get the projection manager for this store (v0.7 feature)
    pub fn projection_manager(&self) -> parking_lot::RwLockReadGuard<'_, ProjectionManager> {
        self.projections.read()
    }

    /// Register a custom projection at runtime.
    ///
    /// The projection will receive all future events via `process()`.
    /// Historical events are **not** replayed — only events ingested after
    /// registration will be processed by this projection.
    ///
    /// See [`register_projection_with_backfill`](Self::register_projection_with_backfill)
    /// to also process historical events.
    pub fn register_projection(
        &self,
        projection: Arc<dyn crate::application::services::projection::Projection>,
    ) {
        let mut pm = self.projections.write();
        pm.register(projection);
    }

    /// Register a custom projection and replay all existing events through it.
    ///
    /// After registration, the projection will also receive all future events.
    /// Historical events are replayed under a read lock — the projection's
    /// internal state (typically DashMap) handles concurrent access.
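    ///
    /// # Example
    ///
    /// A sketch; `MyProjection` is a placeholder implementing the
    /// `Projection` trait:
    ///
    /// ```rust,ignore
    /// let projection: Arc<dyn Projection> = Arc::new(MyProjection::default());
    /// store.register_projection_with_backfill(&projection)?;
    /// // The projection has now seen all historical events and will
    /// // receive every future ingest.
    /// ```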
    pub fn register_projection_with_backfill(
        &self,
        projection: &Arc<dyn crate::application::services::projection::Projection>,
    ) -> Result<()> {
        // First register so future events are processed
        {
            let mut pm = self.projections.write();
            pm.register(Arc::clone(projection));
        }

        // Then replay existing events under read lock
        let events = self.events.read();
        for event in events.iter() {
            projection.process(event)?;
        }

        Ok(())
    }

    /// Get the projection state cache for this store (v0.7 feature)
    /// Used by Elixir Query Service for state synchronization
    pub fn projection_state_cache(&self) -> Arc<DashMap<String, serde_json::Value>> {
        Arc::clone(&self.projection_state_cache)
    }

    /// Get the projection status map (v0.13 feature)
    pub fn projection_status(&self) -> Arc<DashMap<String, String>> {
        Arc::clone(&self.projection_status)
    }

    /// Geospatial index for coordinate-based queries (v2.0 feature)
    pub fn geo_index(&self) -> Arc<GeoIndex> {
        self.geo_index.clone()
    }

    /// Exactly-once processing registry (v2.0 feature)
    pub fn exactly_once(&self) -> Arc<ExactlyOnceRegistry> {
        self.exactly_once.clone()
    }

    /// Schema evolution manager (v2.0 feature)
    pub fn schema_evolution(&self) -> Arc<SchemaEvolutionManager> {
        self.schema_evolution.clone()
    }

    /// Get a cloned snapshot of all events (for EventQL/GraphQL queries).
    ///
    /// Clones the internal events vec under a short-lived read lock, so the
    /// caller receives an owned `Vec` and holds no lock afterwards. The copy
    /// is O(N) over all resident events; prefer indexed queries on hot paths.
    pub fn snapshot_events(&self) -> Vec<Event> {
        self.events.read().clone()
    }

    /// Compact token events for an entity by replacing matching events with a
    /// single merged event. Used by the embedded streaming feature.
    ///
    /// Returns `Ok(true)` if compaction was performed, `Ok(false)` if no
    /// matching events were found.
    ///
    /// **Note:** The merged event is processed through projections *without*
    /// clearing the removed events' projection state first. Projections that
    /// accumulate state (e.g., counters) should be designed to handle this
    /// (the merged event replaces individual tokens, not adds to them).
    ///
    /// **Crash safety:** The WAL append happens *after* the in-memory swap
    /// under the write lock. If the process crashes before the WAL write,
    /// no change is persisted — WAL replay restores the pre-compaction state.
    /// If the process crashes after the WAL write, replay sees the merged
    /// event (and the original tokens, which are idempotent to replay since
    /// the merged event supersedes them).
    ///
    /// The write lock is held for the swap + WAL write + index rebuild.
    /// The index rebuild is O(N) over all events, which is acceptable for
    /// embedded workloads but should not be called in hot paths for large stores.
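    ///
    /// # Example
    ///
    /// A sketch; `merge_tokens` is a placeholder for caller-side logic that
    /// builds the merged event (e.g. by concatenating token payloads):
    ///
    /// ```rust,ignore
    /// let merged = merge_tokens(store.snapshot_events(), "doc-1", "doc.token");
    /// let compacted = store.compact_entity_tokens("doc-1", "doc.token", merged)?;
    /// assert!(compacted);
    /// ```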
    pub fn compact_entity_tokens(
        &self,
        entity_id: &str,
        token_event_type: &str,
        merged_event: Event,
    ) -> Result<bool> {
        // Phase 1: Read-only check — do we have anything to compact?
        {
            let events = self.events.read();
            let has_tokens = events
                .iter()
                .any(|e| e.entity_id_str() == entity_id && e.event_type_str() == token_event_type);
            if !has_tokens {
                return Ok(false);
            }
        }

        // Phase 2: Process merged event through projections (no write lock held)
        let projections = self.projections.read();
        projections.process_event(&merged_event)?;
        drop(projections);

        // Phase 3: Acquire write lock for the swap + WAL + index rebuild
        let mut events = self.events.write();

        events.retain(|e| {
            !(e.entity_id_str() == entity_id && e.event_type_str() == token_event_type)
        });

        events.push(merged_event.clone());

        // WAL append inside write lock: crash before this line = no change persisted.
        // Crash after = merged event in WAL, original tokens also in WAL but
        // superseded by the merged event's entity_id + event_type.
        if let Some(ref wal) = self.wal {
            wal.append(merged_event)?;
        }

        // Rebuild entire index since retain() shifted event positions.
        // Errors here indicate a corrupt event (missing entity_id/event_type)
        // which should not happen for well-formed events. Log and continue
        // rather than failing the entire compaction.
        self.index.clear();
        for (offset, event) in events.iter().enumerate() {
            if let Err(e) = self.index.index_event(
                event.id,
                event.entity_id_str(),
                event.event_type_str(),
                event.timestamp,
                offset,
            ) {
                tracing::warn!(
                    event_id = %event.id,
                    offset,
                    "Failed to re-index event during compaction: {e}"
                );
            }
        }

        Ok(true)
    }

    /// Get the webhook registry for this store (v0.11 feature)
    #[cfg(feature = "server")]
    pub fn webhook_registry(&self) -> Arc<WebhookRegistry> {
        Arc::clone(&self.webhook_registry)
    }

    /// Set the channel for async webhook delivery.
    /// Called during server startup to wire the delivery worker.
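    ///
    /// # Example
    ///
    /// A sketch of wiring the worker at startup; the HTTP delivery itself
    /// is a placeholder:
    ///
    /// ```rust,ignore
    /// let (tx, mut rx) = mpsc::unbounded_channel::<WebhookDeliveryTask>();
    /// store.set_webhook_tx(tx);
    /// tokio::spawn(async move {
    ///     while let Some(task) = rx.recv().await {
    ///         // Placeholder for the real HTTP POST with retries:
    ///         // deliver(&task.webhook, &task.event).await;
    ///     }
    /// });
    /// ```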
    #[cfg(feature = "server")]
    pub fn set_webhook_tx(&self, tx: mpsc::UnboundedSender<WebhookDeliveryTask>) {
        *self.webhook_tx.write() = Some(tx);
        tracing::info!("Webhook delivery channel connected");
    }

    /// Dispatch matching webhooks for a given event (non-blocking).
    #[cfg(feature = "server")]
    fn dispatch_webhooks(&self, event: &Event) {
        let matching = self.webhook_registry.find_matching(event);
        if matching.is_empty() {
            return;
        }

        let tx_guard = self.webhook_tx.read();
        if let Some(ref tx) = *tx_guard {
            for webhook in matching {
                let task = WebhookDeliveryTask {
                    webhook,
                    event: event.clone(),
                };
                if let Err(e) = tx.send(task) {
                    tracing::warn!("Failed to queue webhook delivery: {}", e);
                }
            }
        }
    }

    /// Manually flush any pending events to persistent storage
    pub fn flush_storage(&self) -> Result<()> {
        if let Some(ref storage) = self.storage {
            let storage = storage.read();
            storage.flush()?;
            tracing::info!("✅ Flushed events to persistent storage");
        }
        Ok(())
    }

    /// Run a checkpoint: flush pending Parquet batches, then truncate the
    /// WAL through the checkpoint point (Step 6 of the sustainable data
    /// strategy).
    ///
    /// Order matters. We flush Parquet first; only on success do we
    /// truncate the WAL. If the process crashes between the flush and the
    /// truncate, the WAL still contains the events that were just durably
    /// written, and recovery will replay them. The dedupe in
    /// `append_loaded_event` (index probe) makes that idempotent — the
    /// event is already in Parquet, so the lazy-load splice no-ops once
    /// the tenant is hydrated.
    ///
    /// The reverse order would be unsafe: a crash between truncate and
    /// flush would lose committed events.
    ///
    /// This bounds dirty-restart replay time to one checkpoint interval
    /// regardless of total dataset size — that's the load-bearing
    /// property for cold-start time as ingest rate grows.
    ///
    /// No-op when no WAL is configured (in-memory-only mode).
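    ///
    /// # Example
    ///
    /// A sketch of the background loop this method is designed for,
    /// assuming the store is shared behind an `Arc`:
    ///
    /// ```rust,ignore
    /// if let Some(interval) = store.checkpoint_interval() {
    ///     let store = Arc::clone(&store);
    ///     tokio::spawn(async move {
    ///         let mut ticker = tokio::time::interval(interval);
    ///         loop {
    ///             ticker.tick().await;
    ///             if let Err(e) = store.checkpoint() {
    ///                 tracing::error!("checkpoint failed: {e}");
    ///             }
    ///         }
    ///     });
    /// }
    /// ```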
    pub fn checkpoint(&self) -> Result<()> {
        let Some(ref wal) = self.wal else {
            return Ok(());
        };

        // Flush before recording the truncation target — both
        // because flush() may rotate the WAL underneath us and
        // because we want the truncate point to reflect what's
        // actually durable on disk.
        self.flush_storage()?;
        wal.truncate()?;
        tracing::debug!("✅ Checkpoint complete: Parquet flushed, WAL truncated");
        Ok(())
    }

    /// Get the configured checkpoint cadence (used by background tasks).
    pub fn checkpoint_interval(&self) -> Option<std::time::Duration> {
        self.checkpoint_interval_secs
            .map(std::time::Duration::from_secs)
    }

    /// Hydrate `tenant_id`'s persisted Parquet data into the in-memory
    /// pile if it isn't already loaded. Cheap on the warm path
    /// (DashMap probe); on the cold path it walks just that tenant's
    /// subtree (`load_events_for_tenant`) and splices the events into
    /// `events`/`index`/`projections`/`entity_versions`.
    ///
    /// Concurrent first-callers for the same tenant serialize on a
    /// per-tenant Mutex (singleflight) so the disk read happens once.
    /// Other tenants are unaffected — distinct lock per tenant.
    ///
    /// Returns `Err` if the tenant_id fails the path-safety
    /// whitelist, the Parquet read fails, or another in-flight load
    /// holds the lock past the configured timeout. The caller (a
    /// query handler) is expected to surface that as a 5xx — see
    /// Step 2's "no infinite hangs" acceptance criterion.
    ///
    /// On failure, `loaded` is NOT marked, so a transient error is
    /// retried on the next request rather than poisoning the tenant
    /// permanently. A future commit may add a circuit breaker if
    /// thrash becomes an issue.
    ///
    /// No-op (and Ok) when no Parquet storage is configured — the
    /// in-memory-only mode used by tests has nothing to hydrate.
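    ///
    /// # Example
    ///
    /// A sketch of the intended call site, a query handler (the query API
    /// shown is a placeholder):
    ///
    /// ```rust,ignore
    /// store.ensure_tenant_loaded(&request.tenant_id)?; // hydrate on first access
    /// let events = store.query_events(&request)?;      // placeholder query API
    /// ```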
    pub fn ensure_tenant_loaded(&self, tenant_id: &str) -> Result<()> {
        // Fast path: warm tenant. Avoids the Mutex altogether.
        if self.tenant_loader.is_loaded(tenant_id) {
            return Ok(());
        }

        let Some(storage) = self.storage.as_ref().map(Arc::clone) else {
            // No persistent storage to load from. Mark loaded so we
            // don't keep re-entering the slow path.
            self.tenant_loader.mark_loaded(tenant_id);
            return Ok(());
        };

        // Singleflight: get-or-insert the per-tenant lock and try to
        // acquire it within the timeout budget.
        let lock = self.tenant_loader.lock_for(tenant_id);
        let timeout = self.tenant_loader.load_timeout();
        let _guard = lock.try_lock_for(timeout).ok_or_else(|| {
            AllSourceError::StorageError(format!(
                "ensure_tenant_loaded timed out after {timeout:?} waiting for in-flight load of \
                 tenant {tenant_id:?}"
            ))
        })?;

        // Re-check inside the lock — another thread may have completed
        // the load while we were waiting.
        if self.tenant_loader.is_loaded(tenant_id) {
            return Ok(());
        }

        let started = std::time::Instant::now();
        let events = storage.read().load_events_for_tenant(tenant_id)?;
        let read_count = events.len();

        let before = self.events.read().len();
        for event in events {
            self.append_loaded_event(event);
        }
        let applied = self.events.read().len() - before;

        // total_ingested only counts events newly added to memory.
        // Dedupe (e.g. WAL events re-checkpointed to Parquet) makes
        // applied < read_count possible.
        *self.total_ingested.write() += applied as u64;
        self.tenant_loader.mark_loaded(tenant_id);

        tracing::info!(
            tenant_id = tenant_id,
            read = read_count,
            applied = applied,
            elapsed_ms = started.elapsed().as_millis() as u64,
            "ensure_tenant_loaded: tenant hydrated"
        );

        // Budget check (Step 3 #3). After splicing the new
        // tenant in, evict LRU tenants until we're back under
        // budget. Excludes the just-loaded tenant from the
        // candidate set — otherwise a single oversized tenant
        // would evict itself in a tight loop. If no other tenant
        // is loaded, accept the over-budget state and log a
        // warning so ops can see it.
        self.enforce_cache_budget(tenant_id);

        // Refresh the resident-bytes gauge — covers both the
        // load-with-no-eviction case and the post-eviction state.
        #[cfg(feature = "server")]
        self.metrics
            .cache_bytes
            .set(self.tenant_loader.total_bytes() as i64);

        Ok(())
    }

    /// Walks the LRU until total resident bytes are within the
    /// configured budget, calling `evict_tenant` on each victim.
    /// Excludes `recently_touched` from the candidate set so we
    /// don't evict the tenant that just triggered the call. Step 3
    /// #3 entry point.
    ///
    /// No-op when no budget is set or when already under budget —
    /// most queries take the early-return fast path. Eviction is
    /// the cold path; with a well-sized budget this only fires
    /// during warm-up of new tenants past the working-set size.
    fn enforce_cache_budget(&self, recently_touched: &str) {
        if !self.tenant_loader.over_budget() {
            return;
        }
        loop {
            let Some(victim) = self.tenant_loader.pick_lru_excluding(recently_touched) else {
                tracing::warn!(
                    cache_bytes = self.tenant_loader.total_bytes(),
                    budget = self.tenant_loader.byte_budget(),
                    recently_touched = recently_touched,
                    "cache over budget but no other tenant available to evict — \
                     a single tenant exceeds the budget; consider raising it"
                );
                return;
            };
            self.evict_tenant(&victim);
            if !self.tenant_loader.over_budget() {
                return;
            }
        }
    }

    /// True iff `ensure_tenant_loaded` has previously succeeded for
    /// this tenant. Diagnostic / testing API.
    pub fn is_tenant_loaded(&self, tenant_id: &str) -> bool {
        self.tenant_loader.is_loaded(tenant_id)
    }

    /// Drop `tenant_id` from the in-memory cache. Step 3 #2 of the
    /// sustainable data strategy.
    ///
    /// Removes every event for this tenant from the events Vec,
    /// rebuilds the index/entity_versions for the retained events
    /// (Vec offsets shift on remove, so the index has to be
    /// rebuilt), and resets the tenant_loader bookkeeping so a
    /// subsequent query triggers a fresh `ensure_tenant_loaded`.
    ///
    /// **Parquet is canonical, in-memory is just cache.** This
    /// only affects the in-memory side. Disk data is untouched —
    /// that's why eviction is safe even for tenants with
    /// recently-ingested data: a query after eviction transparently
    /// re-reads from Parquet.
    ///
    /// Projection state is NOT rolled back. Projections accumulate
    /// across boots and tenants (their durability story is
    /// separate); subtracting them would need replay support that
    /// doesn't exist in this commit. After eviction + re-load,
    /// projections may double-count the re-loaded events. Step 3
    /// #4's stress test only asserts the cache budget is held; a
    /// future commit will tackle projection-aware eviction.
    ///
    /// Locking: takes the events write lock for the full duration
    /// of the filter + re-index. Concurrent ingest blocks until
    /// done. Eviction is the cold path; the working set should
    /// stay in budget so this rarely fires.
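    ///
    /// # Example
    ///
    /// A sketch of manually shedding a tenant; a later query transparently
    /// re-hydrates it from Parquet:
    ///
    /// ```rust,ignore
    /// store.evict_tenant("tenant-a");
    /// assert!(!store.is_tenant_loaded("tenant-a"));
    /// assert_eq!(store.tenant_resident_bytes("tenant-a"), 0);
    /// ```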
    pub fn evict_tenant(&self, tenant_id: &str) {
        let mut events = self.events.write();
        let before = events.len();
        let evicted_bytes = self.tenant_loader.bytes_for(tenant_id);

        events.retain(|e| e.tenant_id_str() != tenant_id);
        let after = events.len();
        let dropped = before - after;

        if dropped == 0 {
            // Tenant had no events in memory. Still clear loader
            // state (e.g. a "loaded with zero events" marker) so
            // is_tenant_loaded reports the right thing.
            drop(events);
            self.tenant_loader.mark_unloaded(tenant_id);
            return;
        }

        // Rebuild the index — Vec offsets shifted under retain().
        // Rebuild entity_versions from scratch too, since the
        // counter reflects "how many events of this entity remain".
        self.index.clear();
        self.entity_versions.clear();
        for (offset, event) in events.iter().enumerate() {
            if let Err(e) = self.index.index_event(
                event.id,
                event.entity_id_str(),
                event.event_type_str(),
                event.timestamp,
                offset,
            ) {
                tracing::error!(
                    "Failed to re-index event during eviction of {}: {}",
                    tenant_id,
                    e
                );
            }
            *self
                .entity_versions
                .entry(event.entity_id_str().to_string())
                .or_insert(0) += 1;
        }
        drop(events);

        self.tenant_loader.mark_unloaded(tenant_id);

        // total_ingested under Steps 2-3 means "events currently
        // resident in memory". Subtract what we just dropped.
        let mut t = self.total_ingested.write();
        *t = t.saturating_sub(dropped as u64);
        drop(t);

        // Step 3 #4: cache observability. Increment the eviction
        // counter, refresh the resident-bytes gauge.
        #[cfg(feature = "server")]
        {
            self.metrics.cache_evictions_total.inc();
            self.metrics
                .cache_bytes
                .set(self.tenant_loader.total_bytes() as i64);
        }

        tracing::info!(
            tenant_id = tenant_id,
            events_dropped = dropped,
            bytes_freed = evicted_bytes,
            "evicted tenant from memory cache"
        );
    }

    /// Approximate resident bytes a single tenant occupies in the
    /// in-memory cache. Step 3 budget-tracking input. 0 for cold
    /// or evicted tenants.
    pub fn tenant_resident_bytes(&self, tenant_id: &str) -> u64 {
        self.tenant_loader.bytes_for(tenant_id)
    }

    /// Sum of resident-byte estimates across every loaded tenant.
    /// What the budget check compares against.
    pub fn cache_resident_bytes(&self) -> u64 {
        self.tenant_loader.total_bytes()
    }

    /// Splice a single loaded event into the in-memory structures
    /// (events vec, index, projections, entity_versions) atomically
    /// w.r.t. concurrent ingest. Used by `ensure_tenant_loaded`.
    ///
    /// The WAL recovery path on boot has its own (single-threaded)
    /// variant inline because boot can't race with ingest. This
    /// helper is the variant safe to call while traffic is flowing
    /// — it holds the events write lock across the index/offset
    /// assignment so (offset, push) stays atomic.
    ///
    /// Dedupes against events already in memory by event ID. Two
    /// paths can surface the same event:
1431    /// 1. WAL recovery on boot pushed it into memory.
1432    /// 2. The event was then checkpointed to Parquet and the
1433    ///    WAL truncated. A later ensure_tenant_loaded re-reads
1434    ///    the Parquet file, including this event.
1435    ///
1436    /// Without the dedupe, step 2 would double-count the event.
1437    /// The check is O(1) — DashMap probe by UUID — and the
1438    /// alternative (loading every tenant before truncating WAL)
1439    /// would defeat the lazy-load.
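    ///
    /// Timeline of the double-surface case the dedupe guards against:
    ///
    /// ```text
    /// boot       : WAL recovery pushes event E into memory
    /// checkpoint : E is flushed to Parquet, WAL truncated
    /// later      : ensure_tenant_loaded re-reads Parquet and sees E again
    ///              -> get_by_id(E) hits -> early return, no double count
    /// ```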
1440    fn append_loaded_event(&self, event: Event) {
1441        if self.index.get_by_id(&event.id).is_some() {
1442            return;
1443        }
1444
1445        let event_bytes = event.estimated_size_bytes();
1446        let tenant = event.tenant_id_str().to_string();
1447
1448        let mut events = self.events.write();
1449        let offset = events.len();
1450
1451        if let Err(e) = self.index.index_event(
1452            event.id,
1453            event.entity_id_str(),
1454            event.event_type_str(),
1455            event.timestamp,
1456            offset,
1457        ) {
1458            tracing::error!("Failed to index loaded event {}: {}", event.id, e);
1459        }
1460
1461        if let Err(e) = self.projections.read().process_event(&event) {
1462            tracing::error!("Failed to project loaded event {}: {}", event.id, e);
1463        }
1464
1465        *self
1466            .entity_versions
1467            .entry(event.entity_id_str().to_string())
1468            .or_insert(0) += 1;
1469
1470        events.push(event);
1471        // Account for the bytes AFTER the push so a panic in the
1472        // index/projection path doesn't leave the counter inflated.
1473        // The DashMap update itself runs last, after every step that can fail.
1474        self.tenant_loader.add_bytes(&tenant, event_bytes);
1475    }
1476
1477    /// Manually create a snapshot for an entity
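    ///
    /// # Example (sketch)
    ///
    /// Hypothetical usage; fails with `EntityNotFound` when the entity
    /// has no events.
    ///
    /// ```ignore
    /// store.ingest(&event)?;              // entity needs at least one event
    /// store.create_snapshot("user-42")?;  // stored as SnapshotType::Manual
    /// ```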
1478    pub fn create_snapshot(&self, entity_id: &str) -> Result<()> {
1479        // Get all events for this entity
1480        let events = self.query(&QueryEventsRequest {
1481            entity_id: Some(entity_id.to_string()),
1482            event_type: None,
1483            tenant_id: None,
1484            as_of: None,
1485            since: None,
1486            until: None,
1487            limit: None,
1488            event_type_prefix: None,
1489            payload_filter: None,
1490        })?;
1491
1492        if events.is_empty() {
1493            return Err(AllSourceError::EntityNotFound(entity_id.to_string()));
1494        }
1495
1496        // Build current state
1497        let mut state = serde_json::json!({});
1498        for event in &events {
1499            if let serde_json::Value::Object(ref mut state_map) = state
1500                && let serde_json::Value::Object(ref payload_map) = event.payload
1501            {
1502                for (key, value) in payload_map {
1503                    state_map.insert(key.clone(), value.clone());
1504                }
1505            }
1506        }
1507
1508        let last_event = events.last().unwrap();
1509        self.snapshot_manager.create_snapshot(
1510            entity_id,
1511            state,
1512            last_event.timestamp,
1513            events.len(),
1514            SnapshotType::Manual,
1515        )?;
1516
1517        Ok(())
1518    }
1519
1520    /// Check and create automatic snapshots if needed
1521    fn check_auto_snapshot(&self, entity_id: &str, event: &Event) {
1522        // Count events for this entity
1523        let entity_event_count = self
1524            .index
1525            .get_by_entity(entity_id)
1526            .map_or(0, |entries| entries.len());
1527
1528        if self.snapshot_manager.should_create_snapshot(
1529            entity_id,
1530            entity_event_count,
1531            event.timestamp,
1532        ) {
1533            // Create the snapshot synchronously; failures are only logged so ingestion continues
1534            if let Err(e) = self.create_snapshot(entity_id) {
1535                tracing::warn!(
1536                    "Failed to create automatic snapshot for {}: {}",
1537                    entity_id,
1538                    e
1539                );
1540            }
1541        }
1542    }
1543
1544    /// Validate an event before ingestion
1545    fn validate_event(&self, event: &Event) -> Result<()> {
1546        // EntityId and EventType value objects already validate non-empty in their constructors,
1547        // so these checks are redundant; kept as explicit defense in depth at the ingestion boundary.
1548        if event.entity_id_str().is_empty() {
1549            return Err(AllSourceError::ValidationError(
1550                "entity_id cannot be empty".to_string(),
1551            ));
1552        }
1553
1554        if event.event_type_str().is_empty() {
1555            return Err(AllSourceError::ValidationError(
1556                "event_type cannot be empty".to_string(),
1557            ));
1558        }
1559
1560        // Reject system namespace events from user-facing ingestion.
1561        // System events are written exclusively via SystemMetadataStore.
1562        if event.event_type().is_system() {
1563            return Err(AllSourceError::ValidationError(
1564                "Event types starting with '_system.' are reserved for internal use".to_string(),
1565            ));
1566        }
1567
1568        Ok(())
1569    }
1570
1571    /// Reset a projection by clearing its state and reprocessing all events
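    ///
    /// # Example (sketch)
    ///
    /// "entity_snapshots" is one of the projections this store registers
    /// (see `get_snapshot` below); the return value is how many events
    /// were reprocessed.
    ///
    /// ```ignore
    /// let reprocessed = store.reset_projection("entity_snapshots")?;
    /// ```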
1572    pub fn reset_projection(&self, name: &str) -> Result<usize> {
1573        let projection_manager = self.projections.read();
1574        let projection = projection_manager.get_projection(name).ok_or_else(|| {
1575            AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1576        })?;
1577
1578        // Clear existing state
1579        projection.clear();
1580
1581        // Clear cached state for this projection
1582        let prefix = format!("{name}:");
1583        let keys_to_remove: Vec<String> = self
1584            .projection_state_cache
1585            .iter()
1586            .filter(|entry| entry.key().starts_with(&prefix))
1587            .map(|entry| entry.key().clone())
1588            .collect();
1589        for key in keys_to_remove {
1590            self.projection_state_cache.remove(&key);
1591        }
1592
1593        // Reprocess all events through this projection
1594        let events = self.events.read();
1595        let mut reprocessed = 0usize;
1596        for event in events.iter() {
1597            if projection.process(event).is_ok() {
1598                reprocessed += 1;
1599            }
1600        }
1601
1602        Ok(reprocessed)
1603    }
1604
1605    /// Get a single event by its UUID
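    ///
    /// A minimal sketch:
    ///
    /// ```ignore
    /// // O(1) index probe, then a Vec fetch by offset.
    /// if let Some(event) = store.get_event_by_id(&event_id)? {
    ///     println!("{}", event.event_type_str());
    /// }
    /// ```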
1606    pub fn get_event_by_id(&self, event_id: &uuid::Uuid) -> Result<Option<Event>> {
1607        if let Some(offset) = self.index.get_by_id(event_id) {
1608            let events = self.events.read();
1609            Ok(events.get(offset).cloned())
1610        } else {
1611            Ok(None)
1612        }
1613    }
1614
1615    /// Query events based on filters (optimized with indices)
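    ///
    /// # Example (sketch)
    ///
    /// Illustrative request shape; the field list mirrors
    /// `QueryEventsRequest` as used in the tests below, and `tenant_id`
    /// is what gates the lazy load.
    ///
    /// ```ignore
    /// let results = store.query(&QueryEventsRequest {
    ///     entity_id: Some("user-42".to_string()), // entity-index fast path
    ///     event_type: None,
    ///     tenant_id: Some("acme".to_string()),    // lazy-load gate + LRU touch
    ///     as_of: None,
    ///     since: None,
    ///     until: None,
    ///     limit: Some(100),
    ///     event_type_prefix: None,
    ///     payload_filter: None,
    /// })?;
    /// ```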
1616    #[cfg_attr(feature = "hotpath", hotpath::measure)]
1617    pub fn query(&self, request: &QueryEventsRequest) -> Result<Vec<Event>> {
1618        // Lazy-load gate (Step 2): if the request scopes to a tenant,
1619        // make sure that tenant's persisted data is in memory before
1620        // running the in-memory index lookup. First call for a cold
1621        // tenant blocks here for the disk read (single-digit seconds
1622        // on ~100k events); warm tenants take the DashMap fast path
1623        // and add no measurable latency.
1624        //
1625        // Errors propagate as `Err`; the HTTP layer turns that into
1626        // a 5xx, which is the explicit "no infinite hangs" contract
1627        // from the Step 2 acceptance criteria.
1628        //
1629        // Unfiltered (cross-tenant) queries — `tenant_id = None` —
1630        // run against whatever is currently in memory. They cannot
1631        // pre-load every tenant without defeating the whole point
1632        // of the lazy-load model. In practice the gateway always
1633        // injects an auth-derived `tenant_id`; an unfiltered query
1634        // is admin-only and gets degraded results until a future
1635        // commit adds an explicit "load all tenants" admin path.
1636        if let Some(ref tenant_id) = request.tenant_id {
1637            self.ensure_tenant_loaded(tenant_id)?;
1638            // LRU touch — the most-recently-queried tenant moves
1639            // to the back of the eviction queue. Cheap (single
1640            // DashMap insert), called on every per-tenant query.
1641            self.tenant_loader.touch(tenant_id);
1642        }
1643
1644        // Determine query type for metrics (v0.6 feature)
1645        let query_type = if request.entity_id.is_some() {
1646            "entity"
1647        } else if request.event_type.is_some() {
1648            "type"
1649        } else if request.event_type_prefix.is_some() {
1650            "type_prefix"
1651        } else {
1652            "full_scan"
1653        };
1654
1655        // Start metrics timer (v0.6 feature)
1656        #[cfg(feature = "server")]
1657        let timer = self
1658            .metrics
1659            .query_duration_seconds
1660            .with_label_values(&[query_type])
1661            .start_timer();
1662
1663        // Increment query counter (v0.6 feature)
1664        #[cfg(feature = "server")]
1665        self.metrics
1666            .queries_total
1667            .with_label_values(&[query_type])
1668            .inc();
1669
1670        let events = self.events.read();
1671
1672        // Use index for fast lookups
1673        let offsets: Vec<usize> = if let Some(entity_id) = &request.entity_id {
1674            // Use entity index
1675            self.index
1676                .get_by_entity(entity_id)
1677                .map(|entries| self.filter_entries(entries, request))
1678                .unwrap_or_default()
1679        } else if let Some(event_type) = &request.event_type {
1680            // Use type index (exact match)
1681            self.index
1682                .get_by_type(event_type)
1683                .map(|entries| self.filter_entries(entries, request))
1684                .unwrap_or_default()
1685        } else if let Some(prefix) = &request.event_type_prefix {
1686            // Use type index (prefix match)
1687            let entries = self.index.get_by_type_prefix(prefix);
1688            self.filter_entries(entries, request)
1689        } else {
1690            // Full scan (less efficient but necessary for complex queries)
1691            (0..events.len()).collect()
1692        };
1693
1694        // Fetch events and apply remaining filters
1695        let mut results: Vec<Event> = offsets
1696            .iter()
1697            .filter_map(|&offset| events.get(offset).cloned())
1698            .filter(|event| self.apply_filters(event, request))
1699            .collect();
1700
1701        // Sort by timestamp (ascending)
1702        results.sort_by_key(|x| x.timestamp);
1703
1704        // Apply limit
1705        if let Some(limit) = request.limit {
1706            results.truncate(limit);
1707        }
1708
1709        // Record query results count (v0.6 feature)
1710        #[cfg(feature = "server")]
1711        {
1712            self.metrics
1713                .query_results_total
1714                .with_label_values(&[query_type])
1715                .inc_by(results.len() as u64);
1716            timer.observe_duration();
1717        }
1718
1719        Ok(results)
1720    }
1721
1722    /// Filter index entries based on query parameters
1723    #[cfg_attr(feature = "hotpath", hotpath::measure)]
1724    fn filter_entries(&self, entries: Vec<IndexEntry>, request: &QueryEventsRequest) -> Vec<usize> {
1725        entries
1726            .into_iter()
1727            .filter(|entry| {
1728                // Time filters
1729                if let Some(as_of) = request.as_of
1730                    && entry.timestamp > as_of
1731                {
1732                    return false;
1733                }
1734                if let Some(since) = request.since
1735                    && entry.timestamp < since
1736                {
1737                    return false;
1738                }
1739                if let Some(until) = request.until
1740                    && entry.timestamp > until
1741                {
1742                    return false;
1743                }
1744                true
1745            })
1746            .map(|entry| entry.offset)
1747            .collect()
1748    }
1749
1750    /// Apply filters to an event
1751    #[cfg_attr(feature = "hotpath", hotpath::measure)]
1752    fn apply_filters(&self, event: &Event, request: &QueryEventsRequest) -> bool {
1753        // Tenant isolation: if a tenant_id is specified, only return events from that tenant
1754        if let Some(ref tid) = request.tenant_id
1755            && event.tenant_id_str() != tid
1756        {
1757            return false;
1758        }
1759
1760        // Additional type filter if entity was primary
1761        if request.entity_id.is_some()
1762            && let Some(ref event_type) = request.event_type
1763            && event.event_type_str() != event_type
1764        {
1765            return false;
1766        }
1767
1768        // Additional prefix filter if entity was primary
1769        if request.entity_id.is_some()
1770            && let Some(ref prefix) = request.event_type_prefix
1771            && !event.event_type_str().starts_with(prefix)
1772        {
1773            return false;
1774        }
1775
1776        // Payload field filtering
1777        if let Some(ref filter_str) = request.payload_filter
1778            && let Ok(filter_obj) =
1779                serde_json::from_str::<serde_json::Map<String, serde_json::Value>>(filter_str)
1780        {
1781            let payload = event.payload();
1782            for (key, expected_value) in &filter_obj {
1783                match payload.get(key) {
1784                    Some(actual_value) if actual_value == expected_value => {}
1785                    _ => return false,
1786                }
1787            }
1788        }
1789
1790        true
1791    }
1792
1793    /// Reconstruct entity state as of a specific timestamp
1794    /// v0.2: Now uses snapshots for fast reconstruction
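    ///
    /// # Example (sketch)
    ///
    /// Hypothetical usage (`yesterday` is a placeholder timestamp); the
    /// envelope keys match the `json!` block at the end of this method.
    ///
    /// ```ignore
    /// let current = store.reconstruct_state("user-42", None)?;
    /// let past = store.reconstruct_state("user-42", Some(yesterday))?; // time travel
    /// assert_eq!(current["entity_id"], "user-42");
    /// ```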
1795    #[cfg_attr(feature = "hotpath", hotpath::measure)]
1796    pub fn reconstruct_state(
1797        &self,
1798        entity_id: &str,
1799        as_of: Option<DateTime<Utc>>,
1800    ) -> Result<serde_json::Value> {
1801        // Try to find a snapshot to use as a base (v0.2 optimization)
1802        let (merged_state, since_timestamp) = if let Some(as_of_time) = as_of {
1803            // Get snapshot closest to requested time
1804            if let Some(snapshot) = self
1805                .snapshot_manager
1806                .get_snapshot_as_of(entity_id, as_of_time)
1807            {
1808                tracing::debug!(
1809                    "Using snapshot from {} for entity {} (saved {} events)",
1810                    snapshot.as_of,
1811                    entity_id,
1812                    snapshot.event_count
1813                );
1814                (snapshot.state.clone(), Some(snapshot.as_of))
1815            } else {
1816                (serde_json::json!({}), None)
1817            }
1818        } else {
1819            // Get latest snapshot for current state
1820            if let Some(snapshot) = self.snapshot_manager.get_latest_snapshot(entity_id) {
1821                tracing::debug!(
1822                    "Using latest snapshot from {} for entity {}",
1823                    snapshot.as_of,
1824                    entity_id
1825                );
1826                (snapshot.state.clone(), Some(snapshot.as_of))
1827            } else {
1828                (serde_json::json!({}), None)
1829            }
1830        };
1831
1832        // Query events after the snapshot (or all if no snapshot)
1833        let events = self.query(&QueryEventsRequest {
1834            entity_id: Some(entity_id.to_string()),
1835            event_type: None,
1836            tenant_id: None,
1837            as_of,
1838            since: since_timestamp,
1839            until: None,
1840            limit: None,
1841            event_type_prefix: None,
1842            payload_filter: None,
1843        })?;
1844
1845        // If no events and no snapshot, entity not found
1846        if events.is_empty() && since_timestamp.is_none() {
1847            return Err(AllSourceError::EntityNotFound(entity_id.to_string()));
1848        }
1849
1850        // Merge events on top of snapshot (or from scratch if no snapshot)
1851        let mut merged_state = merged_state;
1852        for event in &events {
1853            if let serde_json::Value::Object(ref mut state_map) = merged_state
1854                && let serde_json::Value::Object(ref payload_map) = event.payload
1855            {
1856                for (key, value) in payload_map {
1857                    state_map.insert(key.clone(), value.clone());
1858                }
1859            }
1860        }
1861
1862        // Wrap with metadata
1863        let state = serde_json::json!({
1864            "entity_id": entity_id,
1865            "last_updated": events.last().map(|e| e.timestamp),
1866            "event_count": events.len(),
1867            "as_of": as_of,
1868            "current_state": merged_state,
1869            "history": events.iter().map(|e| {
1870                serde_json::json!({
1871                    "event_id": e.id,
1872                    "type": e.event_type,
1873                    "timestamp": e.timestamp,
1874                    "payload": e.payload
1875                })
1876            }).collect::<Vec<_>>()
1877        });
1878
1879        Ok(state)
1880    }
1881
1882    /// Get snapshot from projection (faster than reconstructing)
1883    pub fn get_snapshot(&self, entity_id: &str) -> Result<serde_json::Value> {
1884        let projections = self.projections.read();
1885
1886        if let Some(snapshot_projection) = projections.get_projection("entity_snapshots")
1887            && let Some(state) = snapshot_projection.get_state(entity_id)
1888        {
1889            return Ok(serde_json::json!({
1890                "entity_id": entity_id,
1891                "snapshot": state,
1892                "from_projection": "entity_snapshots"
1893            }));
1894        }
1895
1896        Err(AllSourceError::EntityNotFound(entity_id.to_string()))
1897    }
1898
1899    /// Get statistics about the event store
1900    pub fn stats(&self) -> StoreStats {
1901        let events = self.events.read();
1902        let index_stats = self.index.stats();
1903
1904        StoreStats {
1905            total_events: events.len(),
1906            total_entities: index_stats.total_entities,
1907            total_event_types: index_stats.total_event_types,
1908            total_ingested: *self.total_ingested.read(),
1909        }
1910    }
1911
1912    /// Get all unique streams (entity_ids) in the store
1913    pub fn list_streams(&self) -> Vec<StreamInfo> {
1914        self.index
1915            .get_all_entities()
1916            .into_iter()
1917            .map(|entity_id| {
1918                let event_count = self
1919                    .index
1920                    .get_by_entity(&entity_id)
1921                    .map_or(0, |entries| entries.len());
1922                let last_event_at = self
1923                    .index
1924                    .get_by_entity(&entity_id)
1925                    .and_then(|entries| entries.last().map(|e| e.timestamp));
1926                StreamInfo {
1927                    stream_id: entity_id,
1928                    event_count,
1929                    last_event_at,
1930                }
1931            })
1932            .collect()
1933    }
1934
1935    /// Get all unique event types in the store
1936    pub fn list_event_types(&self) -> Vec<EventTypeInfo> {
1937        self.index
1938            .get_all_types()
1939            .into_iter()
1940            .map(|event_type| {
1941                let event_count = self
1942                    .index
1943                    .get_by_type(&event_type)
1944                    .map_or(0, |entries| entries.len());
1945                let last_event_at = self
1946                    .index
1947                    .get_by_type(&event_type)
1948                    .and_then(|entries| entries.last().map(|e| e.timestamp));
1949                EventTypeInfo {
1950                    event_type,
1951                    event_count,
1952                    last_event_at,
1953                }
1954            })
1955            .collect()
1956    }
1957
1958    /// Attach a broadcast sender to the WAL for replication.
1959    ///
1960    /// Thread-safe: can be called through `Arc<EventStore>` at runtime.
1961    /// Used during initial setup and during follower → leader promotion.
1962    /// When set, every WAL append publishes the entry to the broadcast
1963    /// channel so the WAL shipper can stream it to followers.
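    ///
    /// # Example (sketch)
    ///
    /// Minimal wiring; the channel capacity is an arbitrary placeholder.
    ///
    /// ```ignore
    /// let (tx, _rx) = tokio::sync::broadcast::channel(1024);
    /// store.enable_wal_replication(tx);
    /// ```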
1964    pub fn enable_wal_replication(
1965        &self,
1966        tx: tokio::sync::broadcast::Sender<crate::infrastructure::persistence::wal::WALEntry>,
1967    ) {
1968        if let Some(ref wal_arc) = self.wal {
1969            wal_arc.set_replication_tx(tx);
1970            tracing::info!("WAL replication broadcast enabled");
1971        } else {
1972            tracing::warn!("Cannot enable WAL replication: WAL is not configured");
1973        }
1974    }
1975
1976    /// Get a reference to the WAL (if configured).
1977    /// Used by the replication catch-up protocol to determine oldest available offset.
1978    pub fn wal(&self) -> Option<&Arc<WriteAheadLog>> {
1979        self.wal.as_ref()
1980    }
1981
1982    /// Get a reference to the Parquet storage (if configured).
1983    /// Used by the replication catch-up protocol to stream snapshot files to followers.
1984    pub fn parquet_storage(&self) -> Option<&Arc<RwLock<ParquetStorage>>> {
1985        self.storage.as_ref()
1986    }
1987}
1988
1989/// Configuration for EventStore
1990#[derive(Debug, Clone, Default)]
1991pub struct EventStoreConfig {
1992    /// Optional directory for persistent Parquet storage (v0.2 feature)
1993    pub storage_dir: Option<PathBuf>,
1994
1995    /// Snapshot configuration (v0.2 feature)
1996    pub snapshot_config: SnapshotConfig,
1997
1998    /// Optional directory for WAL (Write-Ahead Log) (v0.2 feature)
1999    pub wal_dir: Option<PathBuf>,
2000
2001    /// WAL configuration (v0.2 feature)
2002    pub wal_config: WALConfig,
2003
2004    /// Compaction configuration (v0.2 feature)
2005    pub compaction_config: CompactionConfig,
2006
2007    /// Schema registry configuration (v0.5 feature)
2008    pub schema_registry_config: SchemaRegistryConfig,
2009
2010    /// Optional directory for system metadata storage (dogfood feature).
2011    /// When set, operational metadata (tenants, config, audit) is stored
2012    /// using AllSource's own event store rather than an external database.
2013    /// Defaults to `{storage_dir}/__system/` when storage_dir is set.
2014    pub system_data_dir: Option<PathBuf>,
2015
2016    /// Name of the default tenant to auto-create on first boot.
2017    pub bootstrap_tenant: Option<String>,
2018
2019    /// In-memory cache budget in bytes (Step 3). When the resident
2020    /// total exceeds this after a load, the LRU tenant is evicted
2021    /// until the cache fits. `None` (the default in tests) disables
2022    /// the budget — every loaded tenant stays resident. Production
2023    /// reads this from the `ALLSOURCE_CACHE_BYTES` env var; see
2024    /// `from_env`.
2025    pub cache_byte_budget: Option<u64>,
2026
2027    /// Cadence of the runtime checkpoint loop, in seconds (Step 6).
2028    /// Each tick flushes pending Parquet batches and, on success,
2029    /// truncates the WAL up through the checkpoint. This bounds
2030    /// dirty-restart replay time to one interval of writes
2031    /// regardless of total dataset size.
2032    ///
2033    /// `None` disables the loop — the WAL still grows but is only
2034    /// truncated at boot, which is the pre-Step-6 behavior. Tests
2035    /// default to `None`; production reads
2036    /// `ALLSOURCE_CHECKPOINT_INTERVAL_SECONDS` (default 60s) via
2037    /// `from_env_vars`.
2038    pub checkpoint_interval_secs: Option<u64>,
2039}
2040
2041impl EventStoreConfig {
2042    /// Create config with persistent storage enabled
2043    pub fn with_persistence(storage_dir: impl Into<PathBuf>) -> Self {
2044        Self {
2045            storage_dir: Some(storage_dir.into()),
2046            ..Self::default()
2047        }
2048    }
2049
2050    /// Create config with custom snapshot settings
2051    pub fn with_snapshots(snapshot_config: SnapshotConfig) -> Self {
2052        Self {
2053            snapshot_config,
2054            ..Self::default()
2055        }
2056    }
2057
2058    /// Create config with WAL enabled
2059    pub fn with_wal(wal_dir: impl Into<PathBuf>, wal_config: WALConfig) -> Self {
2060        Self {
2061            wal_dir: Some(wal_dir.into()),
2062            wal_config,
2063            ..Self::default()
2064        }
2065    }
2066
2067    /// Create config with both persistence and snapshots
2068    pub fn with_all(storage_dir: impl Into<PathBuf>, snapshot_config: SnapshotConfig) -> Self {
2069        Self {
2070            storage_dir: Some(storage_dir.into()),
2071            snapshot_config,
2072            ..Self::default()
2073        }
2074    }
2075
2076    /// Create production config with all features enabled
2077    pub fn production(
2078        storage_dir: impl Into<PathBuf>,
2079        wal_dir: impl Into<PathBuf>,
2080        snapshot_config: SnapshotConfig,
2081        wal_config: WALConfig,
2082        compaction_config: CompactionConfig,
2083    ) -> Self {
2084        let storage_dir = storage_dir.into();
2085        let system_data_dir = storage_dir.join("__system");
2086        Self {
2087            storage_dir: Some(storage_dir),
2088            snapshot_config,
2089            wal_dir: Some(wal_dir.into()),
2090            wal_config,
2091            compaction_config,
2092            system_data_dir: Some(system_data_dir),
2093            ..Self::default()
2094        }
2095    }
2096
2097    /// Resolve the effective system data directory.
2098    ///
2099    /// If explicitly set, returns that. Otherwise, derives from storage_dir.
2100    /// Returns None if neither is configured (in-memory mode).
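    ///
    /// A sketch of the derivation when only `storage_dir` is set:
    ///
    /// ```ignore
    /// let cfg = EventStoreConfig::with_persistence("/data/storage");
    /// assert_eq!(
    ///     cfg.effective_system_data_dir(),
    ///     Some(PathBuf::from("/data/storage/__system")),
    /// );
    /// ```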
2101    pub fn effective_system_data_dir(&self) -> Option<PathBuf> {
2102        self.system_data_dir
2103            .clone()
2104            .or_else(|| self.storage_dir.as_ref().map(|d| d.join("__system")))
2105    }
2106
2107    /// Build config from environment variables.
2108    ///
2109    /// Reads `ALLSOURCE_DATA_DIR`, `ALLSOURCE_STORAGE_DIR`, `ALLSOURCE_WAL_DIR`,
2110    /// and `ALLSOURCE_WAL_ENABLED` to determine persistence mode, plus `ALLSOURCE_CACHE_BYTES`, `ALLSOURCE_SNAPSHOT_INTERVAL_SECONDS`, `ALLSOURCE_RETENTION_SYSTEM_DAYS`, and `ALLSOURCE_CHECKPOINT_INTERVAL_SECONDS` for cache, compaction, and checkpoint tuning.
2111    ///
2112    /// Returns `(config, description)` where description is a human-readable
2113    /// summary of the persistence mode for logging.
2114    pub fn from_env() -> (Self, &'static str) {
2115        Self::from_env_vars(
2116            std::env::var("ALLSOURCE_DATA_DIR")
2117                .ok()
2118                .filter(|s| !s.is_empty()),
2119            std::env::var("ALLSOURCE_STORAGE_DIR")
2120                .ok()
2121                .filter(|s| !s.is_empty()),
2122            std::env::var("ALLSOURCE_WAL_DIR")
2123                .ok()
2124                .filter(|s| !s.is_empty()),
2125            std::env::var("ALLSOURCE_WAL_ENABLED").ok(),
2126            std::env::var("ALLSOURCE_CACHE_BYTES").ok(),
2127            std::env::var("ALLSOURCE_SNAPSHOT_INTERVAL_SECONDS").ok(),
2128            std::env::var("ALLSOURCE_RETENTION_SYSTEM_DAYS").ok(),
2129            std::env::var("ALLSOURCE_CHECKPOINT_INTERVAL_SECONDS").ok(),
2130        )
2131    }
2132
2133    /// Build config from explicit env-var values (testable without mutating process env).
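    ///
    /// # Example (sketch)
    ///
    /// Mirrors the match below: `ALLSOURCE_DATA_DIR` alone, with WAL
    /// left enabled, derives both subdirectories.
    ///
    /// ```ignore
    /// let (config, mode) = EventStoreConfig::from_env_vars(
    ///     Some("/data".to_string()), // ALLSOURCE_DATA_DIR
    ///     None,                      // no explicit storage dir -> /data/storage
    ///     None,                      // no explicit WAL dir     -> /data/wal
    ///     None,                      // ALLSOURCE_WAL_ENABLED unset -> true
    ///     None, None, None, None,    // cache / snapshot / retention / checkpoint
    /// );
    /// assert_eq!(mode, "wal+parquet");
    /// assert_eq!(config.checkpoint_interval_secs, Some(60));
    /// ```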
2134    pub fn from_env_vars(
2135        data_dir: Option<String>,
2136        explicit_storage_dir: Option<String>,
2137        explicit_wal_dir: Option<String>,
2138        wal_enabled_var: Option<String>,
2139        cache_bytes_var: Option<String>,
2140        snapshot_interval_var: Option<String>,
2141        retention_system_days_var: Option<String>,
2142        checkpoint_interval_var: Option<String>,
2143    ) -> (Self, &'static str) {
2144        let data_dir = data_dir.filter(|s| !s.is_empty());
2145        let storage_dir = explicit_storage_dir
2146            .filter(|s| !s.is_empty())
2147            .or_else(|| data_dir.as_ref().map(|d| format!("{d}/storage")));
2148        let wal_dir = explicit_wal_dir
2149            .filter(|s| !s.is_empty())
2150            .or_else(|| data_dir.as_ref().map(|d| format!("{d}/wal")));
2151        let wal_enabled = wal_enabled_var.is_none_or(|v| v == "true");
2152        // ALLSOURCE_CACHE_BYTES: parse decimal bytes. Unparseable
2153        // input is logged and ignored rather than failing boot —
2154        // the unbounded fallback is safe (worst case is the
2155        // original pre-Step-3 behavior).
2156        let cache_byte_budget =
2157            cache_bytes_var
2158                .filter(|s| !s.is_empty())
2159                .and_then(|s| match s.parse::<u64>() {
2160                    Ok(v) => Some(v),
2161                    Err(e) => {
2162                        tracing::warn!(
2163                            "ALLSOURCE_CACHE_BYTES={s:?} could not be parsed as u64: {e}; \
2164                         cache budget disabled"
2165                        );
2166                        None
2167                    }
2168                });
2169        let compaction_config =
2170            CompactionConfig::from_env_vars(snapshot_interval_var, retention_system_days_var);
2171
2172        // ALLSOURCE_CHECKPOINT_INTERVAL_SECONDS: parse decimal seconds. The
2173        // default (60s) only applies when WAL is enabled — there's no
2174        // checkpoint loop to run otherwise. Unparseable input is logged
2175        // and falls back to the default rather than failing boot.
2176        let checkpoint_interval_secs = if wal_enabled {
2177            checkpoint_interval_var
2178                .filter(|s| !s.is_empty())
2179                .map(|s| match s.parse::<u64>() {
2180                    Ok(v) => v,
2181                    Err(e) => {
2182                        tracing::warn!(
2183                            "ALLSOURCE_CHECKPOINT_INTERVAL_SECONDS={s:?} could not be parsed as \
2184                             u64: {e}; falling back to default 60s"
2185                        );
2186                        60
2187                    }
2188                })
2189                .or(Some(60))
2190        } else {
2191            None
2192        };
2193
2194        let mut config = match (&storage_dir, &wal_dir) {
2195            (Some(sd), Some(wd)) if wal_enabled => Self::production(
2196                sd,
2197                wd,
2198                SnapshotConfig::default(),
2199                WALConfig::default(),
2200                compaction_config,
2201            ),
2202            (Some(sd), _) => Self::with_persistence(sd),
2203            (_, Some(wd)) if wal_enabled => Self::with_wal(wd, WALConfig::default()),
2204            _ => Self::default(),
2205        };
2206        config.cache_byte_budget = cache_byte_budget;
2207        config.checkpoint_interval_secs = checkpoint_interval_secs;
2208
2209        let mode = match (&storage_dir, &wal_dir) {
2210            (Some(_), Some(_)) if wal_enabled => "wal+parquet",
2211            (Some(_), _) => "parquet-only",
2212            (_, Some(_)) if wal_enabled => "wal-only",
2213            _ => "in-memory",
2214        };
2215        (config, mode)
2216    }
2217}
2218
2219#[derive(Debug, serde::Serialize)]
2220pub struct StoreStats {
2221    pub total_events: usize,
2222    pub total_entities: usize,
2223    pub total_event_types: usize,
2224    pub total_ingested: u64,
2225}
2226
2227/// Information about a stream (entity_id)
2228#[derive(Debug, Clone, serde::Serialize)]
2229pub struct StreamInfo {
2230    /// The stream identifier (entity_id)
2231    pub stream_id: String,
2232    /// Total number of events in this stream
2233    pub event_count: usize,
2234    /// Timestamp of the last event in this stream
2235    pub last_event_at: Option<chrono::DateTime<chrono::Utc>>,
2236}
2237
2238/// Information about an event type
2239#[derive(Debug, Clone, serde::Serialize)]
2240pub struct EventTypeInfo {
2241    /// The event type name
2242    pub event_type: String,
2243    /// Total number of events of this type
2244    pub event_count: usize,
2245    /// Timestamp of the last event of this type
2246    pub last_event_at: Option<chrono::DateTime<chrono::Utc>>,
2247}
2248
2249impl Default for EventStore {
2250    fn default() -> Self {
2251        Self::new()
2252    }
2253}
2254
2255#[cfg(test)]
2256mod tests {
2257    use super::*;
2258    use crate::domain::entities::Event;
2259    use tempfile::TempDir;
2260
2261    /// Recursively walk `dir` looking for `*.parquet` files.
2262    /// Tests that pre-date Step 1's tenant-partitioned layout used a
2263    /// flat `read_dir` here; after the move to <root>/<tenant>/<yyyy-mm>/
2264    /// they need to walk subdirectories.
2265    fn find_parquet_files(dir: &std::path::Path) -> Vec<std::path::PathBuf> {
2266        let mut out = Vec::new();
2267        let mut stack = vec![dir.to_path_buf()];
2268        while let Some(d) = stack.pop() {
2269            let Ok(entries) = std::fs::read_dir(&d) else {
2270                continue;
2271            };
2272            for e in entries.flatten() {
2273                let p = e.path();
2274                if p.is_dir() {
2275                    stack.push(p);
2276                } else if p.extension().and_then(|s| s.to_str()) == Some("parquet") {
2277                    out.push(p);
2278                }
2279            }
2280        }
2281        out
2282    }
2283
2284    fn create_test_event(entity_id: &str, event_type: &str) -> Event {
2285        Event::from_strings(
2286            event_type.to_string(),
2287            entity_id.to_string(),
2288            "default".to_string(),
2289            serde_json::json!({"name": "Test", "value": 42}),
2290            None,
2291        )
2292        .unwrap()
2293    }
2294
2295    fn create_test_event_with_payload(
2296        entity_id: &str,
2297        event_type: &str,
2298        payload: serde_json::Value,
2299    ) -> Event {
2300        Event::from_strings(
2301            event_type.to_string(),
2302            entity_id.to_string(),
2303            "default".to_string(),
2304            payload,
2305            None,
2306        )
2307        .unwrap()
2308    }
2309
2310    #[test]
2311    fn test_event_store_new() {
2312        let store = EventStore::new();
2313        assert_eq!(store.stats().total_events, 0);
2314        assert_eq!(store.stats().total_entities, 0);
2315    }
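
    // Hedged addition: a minimal smoke test that the stats() counters
    // move after one ingest, built only from this module's helpers.
    #[test]
    fn test_stats_after_single_ingest() {
        let store = EventStore::new();
        store
            .ingest(&create_test_event("entity-1", "test.created"))
            .unwrap();
        let stats = store.stats();
        assert_eq!(stats.total_events, 1);
        assert_eq!(stats.total_entities, 1);
        assert_eq!(stats.total_event_types, 1);
    }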
2316
2317    // -----------------------------------------------------------------
2318    // Step 2: ensure_tenant_loaded smoke tests. The full
2319    // cold-boot/lazy-hydrate paths land in commit #2 (skip boot
2320    // load) and commit #4 (integration test).
2321    // -----------------------------------------------------------------
2322
2323    #[test]
2324    fn test_ensure_tenant_loaded_no_storage_is_a_noop() {
2325        // An in-memory-only store (no ParquetStorage configured) has
2326        // nothing to hydrate. The method must succeed and mark the
2327        // tenant loaded so subsequent calls hit the fast path.
2328        let store = EventStore::new();
2329        assert!(!store.is_tenant_loaded("alice"));
2330        store.ensure_tenant_loaded("alice").unwrap();
2331        assert!(store.is_tenant_loaded("alice"));
2332        // Other tenants stay cold — the call is per-tenant.
2333        assert!(!store.is_tenant_loaded("bob"));
2334    }
2335
2336    #[test]
2337    fn test_ensure_tenant_loaded_warm_path_is_idempotent() {
2338        let store = EventStore::new();
2339        store.ensure_tenant_loaded("alice").unwrap();
2340        // Second call hits the DashMap fast path and returns Ok.
2341        store.ensure_tenant_loaded("alice").unwrap();
2342    }
2343
2344    #[test]
2345    fn test_ensure_tenant_loaded_rejects_unsafe_tenant_id() {
2346        // With persistence configured, the call has to walk a
2347        // tenant subtree, so the path-safety whitelist applies.
2348        // The error must propagate; the tenant must NOT be marked
2349        // loaded (otherwise an attacker probing path-traversal
2350        // strings could spam the loaded-set with junk).
2351        let temp_dir = TempDir::new().unwrap();
2352        let store = EventStore::with_config(EventStoreConfig::with_persistence(temp_dir.path()));
2353        for unsafe_tid in ["..", "a/b", "a\\b", ""] {
2354            let result = store.ensure_tenant_loaded(unsafe_tid);
2355            assert!(
2356                result.is_err(),
2357                "tenant_id {unsafe_tid:?} should have been rejected"
2358            );
2359            assert!(
2360                !store.is_tenant_loaded(unsafe_tid),
2361                "rejected tenant {unsafe_tid:?} must not be marked loaded"
2362            );
2363        }
2364    }
2365
2366    #[test]
2367    fn test_ensure_tenant_loaded_no_subtree_marks_loaded_with_zero_events() {
2368        // A tenant that has no on-disk data (fresh tenant, never
2369        // persisted) must still succeed — load_events_for_tenant
2370        // returns empty, ensure_tenant_loaded marks it loaded so we
2371        // don't re-walk the empty subtree on every query.
2372        let temp_dir = TempDir::new().unwrap();
2373        let store = EventStore::with_config(EventStoreConfig::with_persistence(temp_dir.path()));
2374        assert!(!store.is_tenant_loaded("never-existed"));
2375        store.ensure_tenant_loaded("never-existed").unwrap();
2376        assert!(store.is_tenant_loaded("never-existed"));
2377    }
2378
2379    #[test]
2380    fn test_evict_tenant_drops_events_and_resets_bytes() {
2381        // After eviction, the tenant's events are gone from memory,
2382        // its byte counter is reset, and is_tenant_loaded returns
2383        // false. Other tenants are untouched.
2384        let temp_dir = TempDir::new().unwrap();
2385        let storage_dir = temp_dir.path().to_path_buf();
2386
2387        {
2388            let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
2389            for i in 0..3 {
2390                store
2391                    .ingest(
2392                        &Event::from_strings(
2393                            "test.event".to_string(),
2394                            format!("a-{i}"),
2395                            "alice".to_string(),
2396                            serde_json::json!({"i": i}),
2397                            None,
2398                        )
2399                        .unwrap(),
2400                    )
2401                    .unwrap();
2402            }
2403            for i in 0..2 {
2404                store
2405                    .ingest(
2406                        &Event::from_strings(
2407                            "test.event".to_string(),
2408                            format!("b-{i}"),
2409                            "bob".to_string(),
2410                            serde_json::json!({"i": i}),
2411                            None,
2412                        )
2413                        .unwrap(),
2414                    )
2415                    .unwrap();
2416            }
2417            store.flush_storage().unwrap();
2418        }
2419
2420        let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
2421        store.ensure_tenant_loaded("alice").unwrap();
2422        store.ensure_tenant_loaded("bob").unwrap();
2423        assert_eq!(store.stats().total_events, 5);
2424        let alice_bytes = store.tenant_resident_bytes("alice");
2425        let bob_bytes = store.tenant_resident_bytes("bob");
2426        assert!(alice_bytes > 0 && bob_bytes > 0);
2427
2428        store.evict_tenant("alice");
2429
2430        assert!(!store.is_tenant_loaded("alice"));
2431        assert!(store.is_tenant_loaded("bob"));
2432        assert_eq!(store.tenant_resident_bytes("alice"), 0);
2433        assert_eq!(store.tenant_resident_bytes("bob"), bob_bytes);
2434        assert_eq!(store.stats().total_events, 2, "only bob's 2 events remain");
2435    }
2436
2437    #[test]
2438    fn test_evict_tenant_then_query_re_loads_from_disk() {
2439        // The transparent re-load behavior the bead's AC #5 calls
2440        // out: evict, then query the same tenant — its data comes
2441        // back via ensure_tenant_loaded, sourced from Parquet.
2442        let temp_dir = TempDir::new().unwrap();
2443        let storage_dir = temp_dir.path().to_path_buf();
2444
2445        {
2446            let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
2447            for i in 0..4 {
2448                store
2449                    .ingest(
2450                        &Event::from_strings(
2451                            "test.event".to_string(),
2452                            format!("a-{i}"),
2453                            "alice".to_string(),
2454                            serde_json::json!({"i": i}),
2455                            None,
2456                        )
2457                        .unwrap(),
2458                    )
2459                    .unwrap();
2460            }
2461            store.flush_storage().unwrap();
2462        }
2463
2464        let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
2465        store.ensure_tenant_loaded("alice").unwrap();
2466        store.evict_tenant("alice");
2467        assert_eq!(store.stats().total_events, 0);
2468
2469        // Query — re-load happens transparently.
2470        let results = store
2471            .query(&QueryEventsRequest {
2472                entity_id: None,
2473                event_type: None,
2474                tenant_id: Some("alice".to_string()),
2475                as_of: None,
2476                since: None,
2477                until: None,
2478                limit: None,
2479                event_type_prefix: None,
2480                payload_filter: None,
2481            })
2482            .unwrap();
2483        assert_eq!(results.len(), 4);
2484        assert!(store.is_tenant_loaded("alice"));
2485    }
2486
2487    #[test]
2488    fn test_evict_tenant_rebuilds_index_with_new_offsets() {
2489        // After eviction, the events Vec is compacted. The index
2490        // must be rebuilt against the new offsets — otherwise
2491        // queries return stale or wrong events. This test checks
2492        // index correctness end-to-end via a query for the
2493        // surviving tenant after the evicted tenant's events are
2494        // gone.
2495        let temp_dir = TempDir::new().unwrap();
2496        let store = EventStore::with_config(EventStoreConfig::with_persistence(temp_dir.path()));
2497
2498        // Interleave: alice, bob, alice, bob, alice. After
2499        // evicting alice, the events Vec compacts to [bob, bob]
2500        // and the index must reflect the new layout.
2501        for i in 0..3 {
2502            store
2503                .ingest(
2504                    &Event::from_strings(
2505                        "test.event".to_string(),
2506                        format!("a-{i}"),
2507                        "alice".to_string(),
2508                        serde_json::json!({"i": i}),
2509                        None,
2510                    )
2511                    .unwrap(),
2512                )
2513                .unwrap();
2514            if i < 2 {
2515                store
2516                    .ingest(
2517                        &Event::from_strings(
2518                            "test.event".to_string(),
2519                            format!("b-{i}"),
2520                            "bob".to_string(),
2521                            serde_json::json!({"i": i}),
2522                            None,
2523                        )
2524                        .unwrap(),
2525                    )
2526                    .unwrap();
2527            }
2528        }
2529        // Mark both as loaded for accurate eviction bookkeeping.
2530        store.tenant_loader.mark_loaded("alice");
2531        store.tenant_loader.mark_loaded("bob");
2532
2533        store.evict_tenant("alice");
2534
2535        let bob_results = store
2536            .query(&QueryEventsRequest {
2537                entity_id: None,
2538                event_type: None,
2539                tenant_id: Some("bob".to_string()),
2540                as_of: None,
2541                since: None,
2542                until: None,
2543                limit: None,
2544                event_type_prefix: None,
2545                payload_filter: None,
2546            })
2547            .unwrap();
2548        assert_eq!(bob_results.len(), 2);
2549        for e in &bob_results {
2550            assert_eq!(e.tenant_id_str(), "bob");
2551        }
2552    }
2553
2554    #[test]
2555    fn test_budget_eviction_keeps_resident_set_bounded() {
2556        // Configure a tiny budget. Load three tenants in sequence;
2557        // the third load must evict the LRU tenant, keeping the
2558        // resident set under (or near) the budget.
2559        let temp_dir = TempDir::new().unwrap();
2560        let storage_dir = temp_dir.path().to_path_buf();
2561
2562        // Persist 5 events per tenant with ~1 KiB payloads. Each
2563        // tenant ends up at ~5 KiB + overhead.
2564        let big_payload = serde_json::json!({"data": "x".repeat(1000)});
2565        {
2566            let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
2567            for tenant in ["alice", "bob", "carol"] {
2568                for i in 0..5 {
2569                    store
2570                        .ingest(
2571                            &Event::from_strings(
2572                                "test.event".to_string(),
2573                                format!("{tenant}-{i}"),
2574                                tenant.to_string(),
2575                                big_payload.clone(),
2576                                None,
2577                            )
2578                            .unwrap(),
2579                        )
2580                        .unwrap();
2581                }
2582            }
2583            store.flush_storage().unwrap();
2584        }
2585
2586        // Budget = 12 KB (12_000 bytes). Two tenants (~6 KB each) sit
2587        // right at the limit; loading a third must evict.
2588        let mut config = EventStoreConfig::with_persistence(&storage_dir);
2589        config.cache_byte_budget = Some(12_000);
2590        let store = EventStore::with_config(config);
2591
2592        // Load alice — under budget, no eviction.
2593        store.ensure_tenant_loaded("alice").unwrap();
2594        assert!(store.is_tenant_loaded("alice"));
2595
2596        // Touch alice and immediately load bob. Bob is the
2597        // freshly-loaded one, so bob is excluded from eviction.
2598        // Alice is the next-oldest. After the load, total may
2599        // exceed budget — if so, evict alice.
2600        store.tenant_loader.touch("alice");
2601        std::thread::sleep(std::time::Duration::from_millis(10));
2602        store.ensure_tenant_loaded("bob").unwrap();
2603        assert!(store.is_tenant_loaded("bob"));
2604
2605        // Touch bob, load carol. Carol is freshly-loaded; the LRU
2606        // candidate is the older of {alice, bob} — alice (since
2607        // bob was just touched).
2608        store.tenant_loader.touch("bob");
2609        std::thread::sleep(std::time::Duration::from_millis(10));
2610        store.ensure_tenant_loaded("carol").unwrap();
2611        assert!(store.is_tenant_loaded("carol"));
2612
2613        // After all loads, the cache must respect the budget OR
2614        // (if a single tenant alone exceeds it) we should at most
2615        // hold the just-loaded tenant. The test budget is small
2616        // enough that we expect at least one eviction.
2617        let resident = store.cache_resident_bytes();
2618        let budget = 12_000u64;
2619
2620        // Either we're within the budget, or only the freshly-loaded
2621        // tenant is left (the "single oversized tenant" fallback).
2622        if resident > budget {
2623            let loaded_count = ["alice", "bob", "carol"]
2624                .iter()
2625                .filter(|t| store.is_tenant_loaded(t))
2626                .count();
2627            assert_eq!(
2628                loaded_count, 1,
2629                "over budget but more than one tenant loaded — eviction policy didn't fire"
2630            );
2631        }
2632
2633        // Carol must still be loaded — it's the most recent and
2634        // never picked as a victim.
2635        assert!(store.is_tenant_loaded("carol"));
2636    }
2637
2638    #[test]
2639    fn test_query_after_eviction_re_loads_transparently() {
2640        // The end-to-end shape of AC #5: query → evict → query
2641        // again returns the right data.
2642        let temp_dir = TempDir::new().unwrap();
2643        let storage_dir = temp_dir.path().to_path_buf();
2644
2645        let big_payload = serde_json::json!({"data": "x".repeat(2000)});
2646        {
2647            let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
2648            for tenant in ["alice", "bob"] {
2649                for i in 0..3 {
2650                    store
2651                        .ingest(
2652                            &Event::from_strings(
2653                                "test.event".to_string(),
2654                                format!("{tenant}-{i}"),
2655                                tenant.to_string(),
2656                                big_payload.clone(),
2657                                None,
2658                            )
2659                            .unwrap(),
2660                        )
2661                        .unwrap();
2662                }
2663            }
2664            store.flush_storage().unwrap();
2665        }
2666
2667        // Budget = 5 KB (5_000 bytes): even a single ~6 KB tenant exceeds it, so loading a second must evict the first.
2668        let mut config = EventStoreConfig::with_persistence(&storage_dir);
2669        config.cache_byte_budget = Some(5_000);
2670        let store = EventStore::with_config(config);
2671
2672        // Query alice — sized at ~6 KiB, so over budget but no
2673        // peer to evict; alice stays as the single-oversized-tenant
2674        // case.
        let alice_first = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: Some("alice".to_string()),
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();
        assert_eq!(alice_first.len(), 3);

        // Sleep to make alice older than bob in the LRU ordering.
        std::thread::sleep(std::time::Duration::from_millis(15));
        // Query bob — alice will get evicted.
        let _bob = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: Some("bob".to_string()),
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();
        assert!(
            !store.is_tenant_loaded("alice"),
            "alice should have been evicted"
        );

        // Re-query alice — must transparently re-load.
        let alice_second = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: Some("alice".to_string()),
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();
        assert_eq!(
            alice_second.len(),
            3,
            "alice's events come back via re-load"
        );
        assert!(store.is_tenant_loaded("alice"));
    }

    #[test]
    #[cfg(feature = "server")]
    fn test_cache_metrics_track_evictions_and_bytes() {
        // Smoke test for the Step 3 #4 Prometheus metrics —
        // confirms the counter increments on eviction and the
        // gauge tracks the resident bytes.
        let temp_dir = TempDir::new().unwrap();
        let storage_dir = temp_dir.path().to_path_buf();

        let big_payload = serde_json::json!({"data": "x".repeat(2000)});
        {
            let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
            for tenant in ["alice", "bob"] {
                for i in 0..3 {
                    store
                        .ingest(
                            &Event::from_strings(
                                "test.event".to_string(),
                                format!("{tenant}-{i}"),
                                tenant.to_string(),
                                big_payload.clone(),
                                None,
                            )
                            .unwrap(),
                        )
                        .unwrap();
                }
            }
            store.flush_storage().unwrap();
        }

        let mut config = EventStoreConfig::with_persistence(&storage_dir);
        config.cache_byte_budget = Some(5_000); // forces eviction
        let store = EventStore::with_config(config);

        assert_eq!(store.metrics.cache_evictions_total.get(), 0);
        assert_eq!(store.metrics.cache_bytes.get(), 0);

        store.ensure_tenant_loaded("alice").unwrap();
        // After loading alice, gauge reflects her bytes.
        let after_alice = store.metrics.cache_bytes.get();
        assert!(after_alice > 0, "gauge should reflect alice's bytes");
        // Single oversized tenant — no eviction yet.
        assert_eq!(store.metrics.cache_evictions_total.get(), 0);

        std::thread::sleep(std::time::Duration::from_millis(10));
        store.ensure_tenant_loaded("bob").unwrap();

        // Bob's load pushed total over budget; alice (older) was
        // evicted. Counter increments.
        assert_eq!(
            store.metrics.cache_evictions_total.get(),
            1,
            "exactly one tenant evicted after bob's load"
        );
        // Gauge now reflects only bob's bytes.
        let after_bob = store.metrics.cache_bytes.get();
        assert!(after_bob > 0);
        assert!(after_bob <= after_alice, "gauge dropped after eviction");
    }

    #[test]
    fn test_stress_resident_set_stays_near_budget_under_rolling_queries() {
        // Scaled-down version of the bead's stress test: the bead's
        // 10 tenants × 50 MB against a 100 MB budget becomes
        // 10 tenants × ~500 KB against a 1 MiB budget here, keeping
        // the same oversubscription ratio while staying
        // unit-test-fast. The correctness property is identical:
        // after many rolling queries across more tenants than fit,
        // the resident set must stay at or near the budget.
        let temp_dir = TempDir::new().unwrap();
        let storage_dir = temp_dir.path().to_path_buf();

        const TENANT_COUNT: usize = 10;
        const EVENTS_PER_TENANT: usize = 50;
        // Per-event payload ~10 KB → tenant ~500 KB.
        let big_payload = serde_json::json!({"data": "x".repeat(10_000)});

        // Persist all tenants. Each ends up at ~500 KB on disk
        // (and roughly the same in memory once loaded).
        {
            let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
            for t in 0..TENANT_COUNT {
                let tenant = format!("tenant-{t}");
                for i in 0..EVENTS_PER_TENANT {
                    store
                        .ingest(
                            &Event::from_strings(
                                "test.event".to_string(),
                                format!("{tenant}-{i}"),
                                tenant.clone(),
                                big_payload.clone(),
                                None,
                            )
                            .unwrap(),
                        )
                        .unwrap();
                }
            }
            store.flush_storage().unwrap();
        }

        // Budget = 1 MiB → fits ~2 tenants. We're going to query
        // all 10, so the LRU policy must hold the resident set
        // near 1 MiB across the rolling sequence.
        const BUDGET: u64 = 1_048_576;
        let mut config = EventStoreConfig::with_persistence(&storage_dir);
        config.cache_byte_budget = Some(BUDGET);
        let store = EventStore::with_config(config);

        // Sweep through tenants in order. Each query loads its
        // tenant; if budget is exceeded after the load, an LRU
        // eviction fires.
        let mut peak_resident: u64 = 0;
        for t in 0..TENANT_COUNT {
            let tenant = format!("tenant-{t}");
            let results = store
                .query(&QueryEventsRequest {
                    entity_id: None,
                    event_type: None,
                    tenant_id: Some(tenant.clone()),
                    as_of: None,
                    since: None,
                    until: None,
                    limit: None,
                    event_type_prefix: None,
                    payload_filter: None,
                })
                .unwrap();
            assert_eq!(
                results.len(),
                EVENTS_PER_TENANT,
                "every per-tenant query must return all of that tenant's events"
            );
            // Track peak resident bytes seen during the sweep.
            let resident = store.cache_resident_bytes();
            if resident > peak_resident {
                peak_resident = resident;
            }
        }

        let final_resident = store.cache_resident_bytes();

        // Tolerance: a tenant's bytes get added before eviction
        // fires, so peak transiently exceeds the budget by at
        // most one tenant's worth (~500 KB). The final state
        // after the sweep should be well-bounded.
        let tolerance = BUDGET; // generous: 2× budget upper bound
        assert!(
            peak_resident <= BUDGET + tolerance,
            "peak resident {peak_resident} exceeds budget {BUDGET} by more than {tolerance} \
             — eviction policy not keeping up with the working-set churn"
        );
        assert!(
            final_resident <= BUDGET + tolerance,
            "final resident {final_resident} exceeds budget {BUDGET} by more than {tolerance}"
        );

        // The most-recently-queried tenant must still be loaded
        // (it was just touched).
        let last_tenant = format!("tenant-{}", TENANT_COUNT - 1);
        assert!(
            store.is_tenant_loaded(&last_tenant),
            "the most-recent tenant must remain loaded after the sweep"
        );

        // At least some tenants must have been evicted — otherwise
        // the budget didn't fire.
        let still_loaded = (0..TENANT_COUNT)
            .filter(|t| store.is_tenant_loaded(&format!("tenant-{t}")))
            .count();
        assert!(
            still_loaded < TENANT_COUNT,
            "no tenants evicted ({still_loaded}/{TENANT_COUNT} still loaded) — \
             budget enforcement didn't engage"
        );
    }

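    // The bound the sweep above enforces, stated once: at any instant,
    // resident_bytes <= budget + max_tenant_bytes, because a tenant's
    // bytes are accounted before the eviction pass runs. The tolerance
    // in the asserts (a full extra budget) is a deliberately looser,
    // CI-safe version of that inequality.
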
    #[test]
    fn test_evict_tenant_when_not_loaded_is_a_noop() {
        // Eviction of a never-loaded tenant must not panic and
        // must not affect other tenants.
        let store = EventStore::new();
        store.evict_tenant("nobody"); // should not panic
        assert!(!store.is_tenant_loaded("nobody"));
    }

    #[test]
    fn test_lazy_load_accounts_bytes_per_tenant() {
        // Step 3 #1: per-tenant byte tracking. Loading a tenant
        // should accumulate bytes proportional to its event
        // payload sizes; another tenant's counter must stay 0.
        let temp_dir = TempDir::new().unwrap();
        let storage_dir = temp_dir.path().to_path_buf();

        // Persist 5 events for alice with measurable-size payloads.
        {
            let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
            for i in 0..5 {
                store
                    .ingest(
                        &Event::from_strings(
                            "test.event".to_string(),
                            format!("a-{i}"),
                            "alice".to_string(),
                            serde_json::json!({"data": "x".repeat(1000)}),
                            None,
                        )
                        .unwrap(),
                    )
                    .unwrap();
            }
            store.flush_storage().unwrap();
        }

        let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
        // Cold: zero bytes accounted.
        assert_eq!(store.tenant_resident_bytes("alice"), 0);
        assert_eq!(store.cache_resident_bytes(), 0);

        store.ensure_tenant_loaded("alice").unwrap();

        // After load: alice's counter is non-trivial (5 events
        // each carrying ~1000 bytes of payload + overhead).
        let alice_bytes = store.tenant_resident_bytes("alice");
        assert!(
            alice_bytes >= 5 * 1000,
            "alice should have at least 5 KB resident; got {alice_bytes}"
        );
        // Bob never loaded → 0.
        assert_eq!(store.tenant_resident_bytes("bob"), 0);
        // Total equals alice's portion (only loaded tenant).
        assert_eq!(store.cache_resident_bytes(), alice_bytes);
    }

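    // Back-of-envelope for the assertion above: 5 events × 1000 payload
    // bytes is a 5000-byte floor before per-event overhead (UUID,
    // timestamp, event-type and entity-id strings), so the measured
    // resident figure should land somewhat above it.
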
    #[test]
    fn test_query_lazy_loads_tenant_on_first_call() {
        // The end-to-end shape of Step 2: persist events for a
        // tenant in session 1, restart, and confirm session 2 boots
        // empty but a query for that tenant pulls them in.
        let temp_dir = TempDir::new().unwrap();
        let storage_dir = temp_dir.path().to_path_buf();

        // Session 1: ingest 3 events for tenant "alice", flush, drop.
        {
            let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
            for i in 0..3 {
                let event = Event::from_strings(
                    "test.event".to_string(),
                    format!("e-{i}"),
                    "alice".to_string(),
                    serde_json::json!({"i": i}),
                    None,
                )
                .unwrap();
                store.ingest(&event).unwrap();
            }
            store.flush_storage().unwrap();
        }

        // Session 2: fresh boot. Events on disk, nothing in memory.
        let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
        assert_eq!(
            store.stats().total_events,
            0,
            "boot must be O(1) — no Parquet pre-load"
        );
        assert!(!store.is_tenant_loaded("alice"));
        assert!(!store.is_tenant_loaded("bob"));

        // First query for alice: triggers ensure_tenant_loaded.
        let results = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: Some("alice".to_string()),
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();
        assert_eq!(results.len(), 3, "alice's 3 events are returned");
        assert!(store.is_tenant_loaded("alice"), "alice now warm");
        // bob untouched — load is per-tenant, so a query for alice
        // must not have hydrated bob.
        assert!(!store.is_tenant_loaded("bob"), "bob still cold");
    }

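    // Shape of the query path the previous test implies (a sketch of
    // the control flow, not the literal implementation): resolve the
    // tenant filter, call ensure_tenant_loaded for that tenant only,
    // then scan the in-memory events. Hydrating just the named tenant
    // is what keeps bob cold above.
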
    #[test]
    fn test_query_invalid_tenant_id_returns_error_no_hang() {
        // Step 2 acceptance criterion: in-flight load failures
        // surface as errors, not infinite hangs. Path-traversal
        // input fails fast at sanitization and propagates.
        let temp_dir = TempDir::new().unwrap();
        let store = EventStore::with_config(EventStoreConfig::with_persistence(temp_dir.path()));

        let result = store.query(&QueryEventsRequest {
            entity_id: None,
            event_type: None,
            tenant_id: Some("../etc".to_string()),
            as_of: None,
            since: None,
            until: None,
            limit: None,
            event_type_prefix: None,
            payload_filter: None,
        });
        assert!(result.is_err(), "unsafe tenant_id must surface as error");
    }

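    // Hedged sketch of the sanitization the test above relies on. The
    // real check lives in the persistence layer; this only shows the
    // assumed shape: reject tenant ids containing path separators or
    // "..", so input like "../etc" can never escape the storage root.
    #[allow(dead_code)]
    fn tenant_id_is_safe(id: &str) -> bool {
        !id.is_empty() && !id.contains('/') && !id.contains('\\') && !id.contains("..")
    }
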
    #[test]
    fn test_query_concurrent_first_queries_for_same_tenant_all_succeed() {
        // Singleflight: N threads racing to query the same cold
        // tenant must all return the same correct result. The
        // tenant-load must happen exactly once (verified
        // structurally by the per-tenant Mutex in tenant_loader,
        // tested directly in test_singleflight_blocks_second_caller).
        // This integration test confirms the wiring at the query
        // level — no thread observes a half-loaded state.
        let temp_dir = TempDir::new().unwrap();
        let storage_dir = temp_dir.path().to_path_buf();

        // Persist 25 events for tenant "alice".
        {
            let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
            for i in 0..25 {
                let event = Event::from_strings(
                    "test.event".to_string(),
                    format!("e-{i}"),
                    "alice".to_string(),
                    serde_json::json!({"i": i}),
                    None,
                )
                .unwrap();
                store.ingest(&event).unwrap();
            }
            store.flush_storage().unwrap();
        }

        // Fresh boot, then 8 threads simultaneously query alice.
        let store = Arc::new(EventStore::with_config(EventStoreConfig::with_persistence(
            &storage_dir,
        )));
        assert!(!store.is_tenant_loaded("alice"));

        let mut handles = Vec::new();
        for _ in 0..8 {
            let s = store.clone();
            handles.push(std::thread::spawn(move || {
                s.query(&QueryEventsRequest {
                    entity_id: None,
                    event_type: None,
                    tenant_id: Some("alice".to_string()),
                    as_of: None,
                    since: None,
                    until: None,
                    limit: None,
                    event_type_prefix: None,
                    payload_filter: None,
                })
            }));
        }

        for h in handles {
            let result = h.join().unwrap().unwrap();
            assert_eq!(
                result.len(),
                25,
                "every concurrent caller must see all 25 events"
            );
        }
        assert!(store.is_tenant_loaded("alice"));
        // Memory has exactly 25 events — no double-load.
        assert_eq!(store.stats().total_events, 25);
    }

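    // Structural sketch of the singleflight pattern the test above
    // exercises. This is not the tenant_loader implementation, only the
    // minimal per-tenant-Mutex shape it is assumed to follow: racing
    // cold callers serialize on one lock per tenant, and whoever loses
    // the race finds the tenant already loaded and skips the load.
    #[allow(dead_code)]
    fn singleflight_load(
        locks: &DashMap<String, Arc<std::sync::Mutex<()>>>,
        tenant: &str,
        already_loaded: impl Fn() -> bool,
        load: impl FnOnce(),
    ) {
        let lock = locks
            .entry(tenant.to_string())
            .or_insert_with(|| Arc::new(std::sync::Mutex::new(())))
            .clone();
        let _guard = lock.lock().unwrap();
        // Double-check under the lock: a racing caller may have
        // finished the load while this thread was waiting.
        if !already_loaded() {
            load();
        }
    }
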
    #[test]
    fn test_query_two_cold_tenants_load_independently() {
        // Querying tenant A loads only A; querying B then loads
        // only B. State after both queries: both tenants warm,
        // memory has exactly the expected event counts.
        let temp_dir = TempDir::new().unwrap();
        let storage_dir = temp_dir.path().to_path_buf();

        {
            let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
            for i in 0..3 {
                store
                    .ingest(
                        &Event::from_strings(
                            "test.event".to_string(),
                            format!("a-{i}"),
                            "alice".to_string(),
                            serde_json::json!({"i": i}),
                            None,
                        )
                        .unwrap(),
                    )
                    .unwrap();
            }
            for i in 0..5 {
                store
                    .ingest(
                        &Event::from_strings(
                            "test.event".to_string(),
                            format!("b-{i}"),
                            "bob".to_string(),
                            serde_json::json!({"i": i}),
                            None,
                        )
                        .unwrap(),
                    )
                    .unwrap();
            }
            store.flush_storage().unwrap();
        }

        let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
        assert_eq!(store.stats().total_events, 0);

        // Query alice — bob stays cold.
        let alice = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: Some("alice".to_string()),
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();
        assert_eq!(alice.len(), 3);
        assert!(store.is_tenant_loaded("alice"));
        assert!(!store.is_tenant_loaded("bob"));
        assert_eq!(store.stats().total_events, 3);

        // Query bob — both warm now.
        let bob = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: Some("bob".to_string()),
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();
        assert_eq!(bob.len(), 5);
        assert!(store.is_tenant_loaded("bob"));
        assert_eq!(store.stats().total_events, 8);
    }

    #[test]
    fn test_boot_with_persisted_data_is_o1() {
        // Step 2's headline acceptance criterion: boot time does
        // not scale with persisted-data size. The 5M-events / <2s
        // target is too large for a unit test, so this asserts the
        // weaker but structural property: boot reads zero events
        // into memory regardless of how many are on disk.
        //
        // We persist 48 events (16 per tenant across 3 tenants) in
        // session 1, restart in session 2, and verify session 2's
        // total_events is 0. The actual boot wall-clock isn't
        // asserted here — it's machine-dependent — but the absence
        // of any in-memory data is the structural proxy that the
        // boot path no longer iterates Parquet.
        let temp_dir = TempDir::new().unwrap();
        let storage_dir = temp_dir.path().to_path_buf();

        {
            let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
            for tenant in ["alice", "bob", "carol"] {
                for i in 0..(50 / 3) {
                    store
                        .ingest(
                            &Event::from_strings(
                                "test.event".to_string(),
                                format!("{tenant}-{i}"),
                                tenant.to_string(),
                                serde_json::json!({"i": i}),
                                None,
                            )
                            .unwrap(),
                        )
                        .unwrap();
                }
            }
            store.flush_storage().unwrap();
        }

        // Confirm there is in fact data on disk to load.
        let on_disk = find_parquet_files(&storage_dir);
        assert!(
            !on_disk.is_empty(),
            "session 1 should have produced parquet files; pre-condition for the test"
        );

        let started = std::time::Instant::now();
        let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
        let boot_elapsed = started.elapsed();

        assert_eq!(
            store.stats().total_events,
            0,
            "boot must not pre-load any Parquet events"
        );

        // Sanity: even on a slow CI box, an O(1) boot finishes well
        // inside this 2-second bound. If this trips, it's a strong
        // signal the boot path regressed to scanning the Parquet tree.
        assert!(
            boot_elapsed < std::time::Duration::from_secs(2),
            "boot took {boot_elapsed:?} — Step 2 boot should be O(1)"
        );
    }

    #[test]
    fn test_query_warm_tenant_does_not_re_read_disk() {
        // Performance contract: a warm tenant query goes through the
        // DashMap fast path. We can't easily assert "no disk read"
        // directly in a unit test, but we CAN assert that the query
        // still succeeds after the on-disk file is removed — proving
        // the warm path never re-walked it.
        let temp_dir = TempDir::new().unwrap();
        let storage_dir = temp_dir.path().to_path_buf();

        let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
        for i in 0..3 {
            let event = Event::from_strings(
                "test.event".to_string(),
                format!("e-{i}"),
                "alice".to_string(),
                serde_json::json!({"i": i}),
                None,
            )
            .unwrap();
            store.ingest(&event).unwrap();
        }
        store.flush_storage().unwrap();

        // First query: cold, hits disk.
        let _ = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: Some("alice".to_string()),
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();
        assert!(store.is_tenant_loaded("alice"));

        // Now wipe the on-disk file. A warm-path query must still
        // succeed because it doesn't need disk.
        let parquet_files = find_parquet_files(&storage_dir);
        for f in parquet_files {
            std::fs::remove_file(&f).unwrap();
        }

        let results = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: Some("alice".to_string()),
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();
        assert_eq!(
            results.len(),
            3,
            "warm tenant query must not need disk; got {} events from a deleted parquet",
            results.len()
        );
    }

    #[test]
    fn test_event_store_default() {
        let store = EventStore::default();
        assert_eq!(store.stats().total_events, 0);
    }

    #[test]
    fn test_ingest_single_event() {
        let store = EventStore::new();
        let event = create_test_event("entity-1", "user.created");

        store.ingest(&event).unwrap();

        assert_eq!(store.stats().total_events, 1);
        assert_eq!(store.stats().total_ingested, 1);
    }

    #[test]
    fn test_ingest_multiple_events() {
        let store = EventStore::new();

        for i in 0..10 {
            let event = create_test_event(&format!("entity-{i}"), "user.created");
            store.ingest(&event).unwrap();
        }

        assert_eq!(store.stats().total_events, 10);
        assert_eq!(store.stats().total_ingested, 10);
    }

    #[test]
    fn test_query_by_entity_id() {
        let store = EventStore::new();

        store
            .ingest(&create_test_event("entity-1", "user.created"))
            .unwrap();
        store
            .ingest(&create_test_event("entity-2", "user.created"))
            .unwrap();
        store
            .ingest(&create_test_event("entity-1", "user.updated"))
            .unwrap();

        let results = store
            .query(&QueryEventsRequest {
                entity_id: Some("entity-1".to_string()),
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();

        assert_eq!(results.len(), 2);
    }

    #[test]
    fn test_query_by_event_type() {
        let store = EventStore::new();

        store
            .ingest(&create_test_event("entity-1", "user.created"))
            .unwrap();
        store
            .ingest(&create_test_event("entity-2", "user.updated"))
            .unwrap();
        store
            .ingest(&create_test_event("entity-3", "user.created"))
            .unwrap();

        let results = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: Some("user.created".to_string()),
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();

        assert_eq!(results.len(), 2);
    }

    #[test]
    fn test_query_with_limit() {
        let store = EventStore::new();

        for i in 0..10 {
            let event = create_test_event(&format!("entity-{i}"), "user.created");
            store.ingest(&event).unwrap();
        }

        let results = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: Some(5),
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();

        assert_eq!(results.len(), 5);
    }

    #[test]
    fn test_query_empty_store() {
        let store = EventStore::new();

        let results = store
            .query(&QueryEventsRequest {
                entity_id: Some("non-existent".to_string()),
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();

        assert!(results.is_empty());
    }

    #[test]
    fn test_reconstruct_state() {
        let store = EventStore::new();

        store
            .ingest(&create_test_event("entity-1", "user.created"))
            .unwrap();

        let state = store.reconstruct_state("entity-1", None).unwrap();
        // The state is wrapped with metadata
        assert_eq!(state["current_state"]["name"], "Test");
        assert_eq!(state["current_state"]["value"], 42);
    }

    #[test]
    fn test_reconstruct_state_not_found() {
        let store = EventStore::new();

        let result = store.reconstruct_state("non-existent", None);
        assert!(result.is_err());
    }

    #[test]
    fn test_get_snapshot_empty() {
        let store = EventStore::new();

        let result = store.get_snapshot("non-existent");
        // Entity not found error is expected
        assert!(result.is_err());
    }

    #[test]
    fn test_create_snapshot() {
        let store = EventStore::new();

        store
            .ingest(&create_test_event("entity-1", "user.created"))
            .unwrap();

        store.create_snapshot("entity-1").unwrap();

        // Verify snapshot was created
        let snapshot = store.get_snapshot("entity-1").unwrap();
        assert_ne!(snapshot, serde_json::json!(null));
    }

    #[test]
    fn test_create_snapshot_entity_not_found() {
        let store = EventStore::new();

        let result = store.create_snapshot("non-existent");
        assert!(result.is_err());
    }

    #[test]
    fn test_websocket_manager() {
        let store = EventStore::new();
        let manager = store.websocket_manager();
        // Manager should be accessible
        assert!(Arc::strong_count(&manager) >= 1);
    }

    #[test]
    fn test_snapshot_manager() {
        let store = EventStore::new();
        let manager = store.snapshot_manager();
        assert!(Arc::strong_count(&manager) >= 1);
    }

    #[test]
    fn test_compaction_manager_none() {
        let store = EventStore::new();
        // Without storage_dir, compaction manager should be None
        assert!(store.compaction_manager().is_none());
    }

    #[test]
    fn test_schema_registry() {
        let store = EventStore::new();
        let registry = store.schema_registry();
        assert!(Arc::strong_count(&registry) >= 1);
    }

    #[test]
    fn test_replay_manager() {
        let store = EventStore::new();
        let manager = store.replay_manager();
        assert!(Arc::strong_count(&manager) >= 1);
    }

    #[test]
    fn test_pipeline_manager() {
        let store = EventStore::new();
        let manager = store.pipeline_manager();
        assert!(Arc::strong_count(&manager) >= 1);
    }

    #[test]
    fn test_projection_manager() {
        let store = EventStore::new();
        let manager = store.projection_manager();
        // Built-in projections should be registered
        let projections = manager.list_projections();
        assert!(projections.len() >= 2); // entity_snapshots and event_counters
    }

    #[test]
    fn test_projection_state_cache() {
        let store = EventStore::new();
        let cache = store.projection_state_cache();

        cache.insert("test:key".to_string(), serde_json::json!({"value": 123}));
        assert_eq!(cache.len(), 1);

        let value = cache.get("test:key").unwrap();
        assert_eq!(value["value"], 123);
    }

    #[test]
    fn test_metrics() {
        let store = EventStore::new();
        let metrics = store.metrics();
        assert!(Arc::strong_count(&metrics) >= 1);
    }

    #[test]
    fn test_store_stats() {
        let store = EventStore::new();

        store
            .ingest(&create_test_event("entity-1", "user.created"))
            .unwrap();
        store
            .ingest(&create_test_event("entity-2", "order.placed"))
            .unwrap();

        let stats = store.stats();
        assert_eq!(stats.total_events, 2);
        assert_eq!(stats.total_entities, 2);
        assert_eq!(stats.total_event_types, 2);
        assert_eq!(stats.total_ingested, 2);
    }

    #[test]
    fn test_event_store_config_default() {
        let config = EventStoreConfig::default();
        assert!(config.storage_dir.is_none());
        assert!(config.wal_dir.is_none());
    }

    #[test]
    fn test_event_store_config_with_persistence() {
        let temp_dir = TempDir::new().unwrap();
        let config = EventStoreConfig::with_persistence(temp_dir.path());

        assert!(config.storage_dir.is_some());
        assert!(config.wal_dir.is_none());
    }

    #[test]
    fn test_event_store_config_with_wal() {
        let temp_dir = TempDir::new().unwrap();
        let config = EventStoreConfig::with_wal(temp_dir.path(), WALConfig::default());

        assert!(config.storage_dir.is_none());
        assert!(config.wal_dir.is_some());
    }

    #[test]
    fn test_event_store_config_with_all() {
        let temp_dir = TempDir::new().unwrap();
        let config = EventStoreConfig::with_all(temp_dir.path(), SnapshotConfig::default());

        assert!(config.storage_dir.is_some());
    }

    #[test]
    fn test_event_store_config_production() {
        let storage_dir = TempDir::new().unwrap();
        let wal_dir = TempDir::new().unwrap();
        let config = EventStoreConfig::production(
            storage_dir.path(),
            wal_dir.path(),
            SnapshotConfig::default(),
            WALConfig::default(),
            CompactionConfig::default(),
        );

        assert!(config.storage_dir.is_some());
        assert!(config.wal_dir.is_some());
    }

    // -----------------------------------------------------------------------
    // from_env_vars tests — verify the env-var-to-config wiring that
    // caused the durability bug (events lost on restart) in v0.10.3.
    // -----------------------------------------------------------------------

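    // Mode decision table implied by the tests below:
    //   storage_dir + wal_dir resolved    → "wal+parquet"
    //   storage_dir only (WAL disabled)   → "parquet-only"
    //   wal_dir only                      → "wal-only"
    //   neither                           → "in-memory"
    // A data dir expands to {data}/storage and {data}/wal, explicit
    // dirs override it, and empty strings count as unset.
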
    #[test]
    fn test_from_env_vars_data_dir_enables_full_persistence() {
        let (config, mode) = EventStoreConfig::from_env_vars(
            Some("/app/data".to_string()),
            None,
            None,
            None,
            None,
            None,
            None,
            None,
        );
        assert_eq!(mode, "wal+parquet");
        assert_eq!(
            config.storage_dir.unwrap().to_str().unwrap(),
            "/app/data/storage"
        );
        assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/app/data/wal");
    }

    #[test]
    fn test_from_env_vars_explicit_dirs() {
        let (config, mode) = EventStoreConfig::from_env_vars(
            None,
            Some("/custom/storage".to_string()),
            Some("/custom/wal".to_string()),
            None,
            None,
            None,
            None,
            None,
        );
        assert_eq!(mode, "wal+parquet");
        assert_eq!(
            config.storage_dir.unwrap().to_str().unwrap(),
            "/custom/storage"
        );
        assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/custom/wal");
    }

    #[test]
    fn test_from_env_vars_wal_disabled() {
        let (config, mode) = EventStoreConfig::from_env_vars(
            Some("/app/data".to_string()),
            None,
            None,
            Some("false".to_string()),
            None,
            None,
            None,
            None,
        );
        assert_eq!(mode, "parquet-only");
        assert!(config.storage_dir.is_some());
        assert!(config.wal_dir.is_none());
    }

    #[test]
    fn test_from_env_vars_no_dirs_is_in_memory() {
        let (config, mode) =
            EventStoreConfig::from_env_vars(None, None, None, None, None, None, None, None);
        assert_eq!(mode, "in-memory");
        assert!(config.storage_dir.is_none());
        assert!(config.wal_dir.is_none());
    }

    #[test]
    fn test_from_env_vars_empty_strings_treated_as_none() {
        let (_, mode) = EventStoreConfig::from_env_vars(
            Some(String::new()),
            Some(String::new()),
            Some(String::new()),
            None,
            None,
            None,
            None,
            None,
        );
        assert_eq!(mode, "in-memory");
    }

    #[test]
    fn test_from_env_vars_explicit_overrides_data_dir() {
        let (config, mode) = EventStoreConfig::from_env_vars(
            Some("/app/data".to_string()),
            Some("/override/storage".to_string()),
            Some("/override/wal".to_string()),
            None,
            None,
            None,
            None,
            None,
        );
        assert_eq!(mode, "wal+parquet");
        assert_eq!(
            config.storage_dir.unwrap().to_str().unwrap(),
            "/override/storage"
        );
        assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/override/wal");
    }

    #[test]
    fn test_from_env_vars_wal_only() {
        let (config, mode) = EventStoreConfig::from_env_vars(
            None,
            None,
            Some("/wal/only".to_string()),
            None,
            None,
            None,
            None,
            None,
        );
        assert_eq!(mode, "wal-only");
        assert!(config.storage_dir.is_none());
        assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/wal/only");
    }

    #[test]
    fn test_from_env_vars_cache_bytes_parses_decimal() {
        let (config, _) = EventStoreConfig::from_env_vars(
            Some("/app/data".to_string()),
            None,
            None,
            None,
            Some("536870912".to_string()), // 512 MiB
            None,
            None,
            None,
        );
        assert_eq!(config.cache_byte_budget, Some(536_870_912));
    }

    #[test]
    fn test_from_env_vars_cache_bytes_unparseable_disables_budget() {
        // Garbage in CACHE_BYTES doesn't fail boot — we log and
        // fall back to no-budget. The unbounded fallback is safe
        // (just the pre-Step-3 behavior).
        let (config, _) = EventStoreConfig::from_env_vars(
            Some("/app/data".to_string()),
            None,
            None,
            None,
            Some("not-a-number".to_string()),
            None,
            None,
            None,
        );
        assert_eq!(config.cache_byte_budget, None);
    }

    #[test]
    fn test_from_env_vars_cache_bytes_empty_disables_budget() {
        let (config, _) = EventStoreConfig::from_env_vars(
            Some("/app/data".to_string()),
            None,
            None,
            None,
            Some(String::new()),
            None,
            None,
            None,
        );
        assert_eq!(config.cache_byte_budget, None);
    }

    #[test]
    fn test_from_env_vars_snapshot_interval_overrides_default() {
        // ALLSOURCE_SNAPSHOT_INTERVAL_SECONDS plumbs through to
        // CompactionConfig.compaction_interval_seconds. Default is
        // 3600s (hourly) per the bead.
        let (config, _) = EventStoreConfig::from_env_vars(
            Some("/app/data".to_string()),
            None,
            None,
            None,
            None,
            Some("60".to_string()),
            None,
            None,
        );
        assert_eq!(config.compaction_config.compaction_interval_seconds, 60);
    }

    #[test]
    fn test_from_env_vars_snapshot_interval_default_is_hourly() {
        let (config, _) = EventStoreConfig::from_env_vars(
            Some("/app/data".to_string()),
            None,
            None,
            None,
            None,
            None,
            None,
            None,
        );
        assert_eq!(config.compaction_config.compaction_interval_seconds, 3600);
    }

    #[test]
    fn test_from_env_vars_snapshot_interval_unparseable_falls_back() {
        let (config, _) = EventStoreConfig::from_env_vars(
            Some("/app/data".to_string()),
            None,
            None,
            None,
            None,
            Some("not-a-number".to_string()),
            None,
            None,
        );
        assert_eq!(config.compaction_config.compaction_interval_seconds, 3600);
    }

    #[test]
    fn test_from_env_vars_retention_system_days_overrides_default() {
        // Step 5: ALLSOURCE_RETENTION_SYSTEM_DAYS overrides the
        // default 30-day TTL for the system tenant.
        let (config, _) = EventStoreConfig::from_env_vars(
            Some("/app/data".to_string()),
            None,
            None,
            None,
            None,
            None,
            Some("7".to_string()),
            None,
        );
        let ttl = config
            .compaction_config
            .retention
            .ttl_for("system")
            .unwrap();
        assert_eq!(ttl.as_secs(), 7 * 24 * 3600);
    }

    #[test]
    fn test_from_env_vars_retention_default_is_30_days_for_system() {
        let (config, _) = EventStoreConfig::from_env_vars(
            Some("/app/data".to_string()),
            None,
            None,
            None,
            None,
            None,
            None,
            None,
        );
        let ttl = config
            .compaction_config
            .retention
            .ttl_for("system")
            .unwrap();
        assert_eq!(ttl.as_secs(), 30 * 24 * 3600);
        // Other tenants retain their events forever by default.
        assert!(config.compaction_config.retention.ttl_for("acme").is_none());
    }

    #[test]
    fn test_store_stats_serde() {
        let stats = StoreStats {
            total_events: 100,
            total_entities: 50,
            total_event_types: 10,
            total_ingested: 100,
        };

        let json = serde_json::to_string(&stats).unwrap();
        assert!(json.contains("\"total_events\":100"));
        assert!(json.contains("\"total_entities\":50"));
    }

    #[test]
    fn test_query_with_entity_and_type() {
        let store = EventStore::new();

        store
            .ingest(&create_test_event("entity-1", "user.created"))
            .unwrap();
        store
            .ingest(&create_test_event("entity-1", "user.updated"))
            .unwrap();
        store
            .ingest(&create_test_event("entity-2", "user.created"))
            .unwrap();

        let results = store
            .query(&QueryEventsRequest {
                entity_id: Some("entity-1".to_string()),
                event_type: Some("user.created".to_string()),
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();

        assert_eq!(results.len(), 1);
        assert_eq!(results[0].event_type_str(), "user.created");
    }

    #[test]
    fn test_query_by_event_type_prefix() {
        let store = EventStore::new();

        // Ingest events with various types
        store
            .ingest(&create_test_event("entity-1", "index.created"))
            .unwrap();
        store
            .ingest(&create_test_event("entity-2", "index.updated"))
            .unwrap();
        store
            .ingest(&create_test_event("entity-3", "trade.created"))
            .unwrap();
        store
            .ingest(&create_test_event("entity-4", "trade.completed"))
            .unwrap();
        store
            .ingest(&create_test_event("entity-5", "balance.updated"))
            .unwrap();

        // Query with prefix "index." should return exactly 2
        let results = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: Some("index.".to_string()),
                payload_filter: None,
            })
            .unwrap();

        assert_eq!(results.len(), 2);
        assert!(
            results
                .iter()
                .all(|e| e.event_type_str().starts_with("index."))
        );
    }

    #[test]
    fn test_query_by_event_type_prefix_empty_returns_all() {
        let store = EventStore::new();

        store
            .ingest(&create_test_event("entity-1", "index.created"))
            .unwrap();
        store
            .ingest(&create_test_event("entity-2", "trade.created"))
            .unwrap();

        // Empty prefix matches all types
        let results = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: Some(String::new()),
                payload_filter: None,
            })
            .unwrap();

        assert_eq!(results.len(), 2);
    }

    #[test]
    fn test_query_by_event_type_prefix_no_match() {
        let store = EventStore::new();

        store
            .ingest(&create_test_event("entity-1", "index.created"))
            .unwrap();

        let results = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: Some("nonexistent.".to_string()),
                payload_filter: None,
            })
            .unwrap();

        assert!(results.is_empty());
    }

    #[test]
    fn test_query_by_entity_with_type_prefix() {
        let store = EventStore::new();

        store
            .ingest(&create_test_event("entity-1", "index.created"))
            .unwrap();
        store
            .ingest(&create_test_event("entity-1", "trade.created"))
            .unwrap();
        store
            .ingest(&create_test_event("entity-2", "index.updated"))
            .unwrap();

        // Query entity-1 with prefix "index." should return 1
        let results = store
            .query(&QueryEventsRequest {
                entity_id: Some("entity-1".to_string()),
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: Some("index.".to_string()),
                payload_filter: None,
            })
            .unwrap();

        assert_eq!(results.len(), 1);
        assert_eq!(results[0].event_type_str(), "index.created");
    }

    #[test]
    fn test_query_prefix_with_limit() {
        let store = EventStore::new();

        for i in 0..5 {
            store
                .ingest(&create_test_event(&format!("entity-{i}"), "index.created"))
                .unwrap();
        }

        let results = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: Some(3),
                event_type_prefix: Some("index.".to_string()),
                payload_filter: None,
            })
            .unwrap();

        assert_eq!(results.len(), 3);
    }

    #[test]
    fn test_query_prefix_alongside_existing_filters() {
        let store = EventStore::new();

        store
            .ingest(&create_test_event("entity-1", "index.created"))
            .unwrap();
        // Sleep briefly to ensure different timestamps
        std::thread::sleep(std::time::Duration::from_millis(10));
        store
            .ingest(&create_test_event("entity-2", "index.strategy.updated"))
            .unwrap();
        std::thread::sleep(std::time::Duration::from_millis(10));
        store
            .ingest(&create_test_event("entity-3", "index.deleted"))
            .unwrap();

        // Prefix with limit
        let results = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: Some(2),
                event_type_prefix: Some("index.".to_string()),
                payload_filter: None,
            })
            .unwrap();

        assert_eq!(results.len(), 2);
    }

    #[test]
    fn test_query_with_payload_filter() {
        let store = EventStore::new();

        // Ingest 5 events with user_id=alice
        for i in 0..5 {
            store
                .ingest(&create_test_event_with_payload(
                    &format!("entity-{i}"),
                    "user.action",
                    serde_json::json!({"user_id": "alice", "action": "click"}),
                ))
                .unwrap();
        }
        // Ingest 5 events with user_id=bob
        for i in 5..10 {
            store
                .ingest(&create_test_event_with_payload(
                    &format!("entity-{i}"),
                    "user.action",
                    serde_json::json!({"user_id": "bob", "action": "view"}),
                ))
                .unwrap();
        }

        // Filter for alice
        let results = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: Some("user.action".to_string()),
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: Some(r#"{"user_id":"alice"}"#.to_string()),
            })
            .unwrap();

        assert_eq!(results.len(), 5);
    }

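    // Matching semantics implied by the payload-filter tests (an
    // inference from observed behavior, not a spec): the filter string
    // parses as a JSON object, and an event matches when every filter
    // key is present in its payload with an equal value. A minimal
    // reference predicate under that assumption:
    #[allow(dead_code)]
    fn payload_matches(payload: &serde_json::Value, filter: &str) -> bool {
        match serde_json::from_str::<serde_json::Value>(filter) {
            Ok(serde_json::Value::Object(map)) => {
                map.iter().all(|(k, v)| payload.get(k) == Some(v))
            }
            _ => false,
        }
    }
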
4202    #[test]
4203    fn test_query_payload_filter_non_existent_field() {
4204        let store = EventStore::new();
4205
4206        store
4207            .ingest(&create_test_event_with_payload(
4208                "entity-1",
4209                "user.action",
4210                serde_json::json!({"user_id": "alice"}),
4211            ))
4212            .unwrap();
4213
4214        // Filter for a field that doesn't exist — returns 0, not error
4215        let results = store
4216            .query(&QueryEventsRequest {
4217                entity_id: None,
4218                event_type: None,
4219                tenant_id: None,
4220                as_of: None,
4221                since: None,
4222                until: None,
4223                limit: None,
4224                event_type_prefix: None,
4225                payload_filter: Some(r#"{"nonexistent":"value"}"#.to_string()),
4226            })
4227            .unwrap();
4228
4229        assert!(results.is_empty());
4230    }
4231
4232    #[test]
4233    fn test_query_payload_filter_with_prefix() {
4234        let store = EventStore::new();
4235
4236        store
4237            .ingest(&create_test_event_with_payload(
4238                "entity-1",
4239                "index.created",
4240                serde_json::json!({"status": "active"}),
4241            ))
4242            .unwrap();
4243        store
4244            .ingest(&create_test_event_with_payload(
4245                "entity-2",
4246                "index.created",
4247                serde_json::json!({"status": "inactive"}),
4248            ))
4249            .unwrap();
4250        store
4251            .ingest(&create_test_event_with_payload(
4252                "entity-3",
4253                "trade.created",
4254                serde_json::json!({"status": "active"}),
4255            ))
4256            .unwrap();
4257
4258        // Combine prefix + payload filter
4259        let results = store
4260            .query(&QueryEventsRequest {
4261                entity_id: None,
4262                event_type: None,
4263                tenant_id: None,
4264                as_of: None,
4265                since: None,
4266                until: None,
4267                limit: None,
4268                event_type_prefix: Some("index.".to_string()),
4269                payload_filter: Some(r#"{"status":"active"}"#.to_string()),
4270            })
4271            .unwrap();
4272
4273        assert_eq!(results.len(), 1);
4274        assert_eq!(results[0].entity_id().to_string(), "entity-1");
4275    }
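
    // Taken together, the three payload-filter tests above pin down the rule
    // they exercise: the filter is a JSON object, and an event matches when
    // each filter field equals the corresponding payload field. Extra payload
    // fields are ignored; a filter field missing from the payload yields zero
    // results, not an error. Illustrative cases:
    //
    //     payload {"user_id": "alice", "action": "click"}
    //       filter {"user_id": "alice"}       -> match
    //       filter {"nonexistent": "value"}   -> no match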

    #[test]
    fn test_flush_storage_no_storage() {
        let store = EventStore::new();
        // Without storage, flush should succeed (no-op)
        let result = store.flush_storage();
        assert!(result.is_ok());
    }

    #[test]
    fn test_state_evolution() {
        let store = EventStore::new();

        // Initial state
        store
            .ingest(
                &Event::from_strings(
                    "user.created".to_string(),
                    "user-1".to_string(),
                    "default".to_string(),
                    serde_json::json!({"name": "Alice", "age": 25}),
                    None,
                )
                .unwrap(),
            )
            .unwrap();

        // Update state
        store
            .ingest(
                &Event::from_strings(
                    "user.updated".to_string(),
                    "user-1".to_string(),
                    "default".to_string(),
                    serde_json::json!({"age": 26}),
                    None,
                )
                .unwrap(),
            )
            .unwrap();

        let state = store.reconstruct_state("user-1", None).unwrap();
        // The state is wrapped with metadata
        assert_eq!(state["current_state"]["name"], "Alice");
        assert_eq!(state["current_state"]["age"], 26);
    }
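
    // As asserted above, reconstruct_state folds an entity's events in
    // order: later payload fields overwrite earlier ones ("age" goes from
    // 25 to 26 while "name" is untouched), and the merged result is wrapped
    // under "current_state" alongside metadata. Shape (other fields elided):
    //
    //     {"current_state": {"name": "Alice", "age": 26}, ...}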

    #[test]
    fn test_reject_system_event_types() {
        let store = EventStore::new();

        // System event types should be rejected via user-facing ingestion
        let event = Event::reconstruct_from_strings(
            uuid::Uuid::new_v4(),
            "_system.tenant.created".to_string(),
            "_system:tenant:acme".to_string(),
            "_system".to_string(),
            serde_json::json!({"name": "ACME"}),
            chrono::Utc::now(),
            None,
            1,
        );

        let result = store.ingest(&event);
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("reserved for internal use"),
            "Expected system namespace rejection, got: {err}"
        );
    }

    // -----------------------------------------------------------------------
    // Crash recovery: WAL events survive restart via Parquet checkpoint.
    // Regression test for GitHub issue #84 — flush_storage() was a no-op
    // during recovery because events were never buffered into Parquet's
    // current_batch before flushing.
    // -----------------------------------------------------------------------
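    //
    // The fix, paraphrased (`replay()` and `buffer_event()` are illustrative
    // names, not necessarily the crate's exact internals): recovery now
    // routes WAL entries through the same buffering path as live ingestion,
    // so the post-recovery flush actually has rows to write.
    //
    //     for entry in wal.replay()? {              // hypothetical iterator
    //         storage.buffer_event(&entry.event)?;  // fills current_batch
    //     }
    //     store.flush_storage()?;                   // checkpoint to Parquet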

    #[test]
    fn test_wal_recovery_checkpoints_to_parquet() {
        let data_dir = TempDir::new().unwrap();
        let storage_dir = data_dir.path().join("storage");
        let wal_dir = data_dir.path().join("wal");

        // Session 1: ingest events with WAL + Parquet
        {
            let config = EventStoreConfig::production(
                &storage_dir,
                &wal_dir,
                SnapshotConfig::default(),
                WALConfig {
                    sync_on_write: true,
                    ..WALConfig::default()
                },
                CompactionConfig::default(),
            );
            let store = EventStore::with_config(config);

            for i in 0..5 {
                let event = Event::from_strings(
                    "test.created".to_string(),
                    format!("entity-{i}"),
                    "default".to_string(),
                    serde_json::json!({"index": i}),
                    None,
                )
                .unwrap();
                store.ingest(&event).unwrap();
            }

            assert_eq!(store.stats().total_events, 5);

            // Do NOT call flush_storage or shutdown — simulate a crash.
            // Events are in WAL (sync_on_write: true) but NOT in Parquet.
        }

        // Verify WAL file has data
        let wal_files: Vec<_> = std::fs::read_dir(&wal_dir)
            .unwrap()
            .filter_map(std::result::Result::ok)
            .filter(|e| e.path().extension().is_some_and(|ext| ext == "log"))
            .collect();
        assert!(!wal_files.is_empty(), "WAL file should exist");
        let wal_size = wal_files[0].metadata().unwrap().len();
        assert!(wal_size > 0, "WAL file should have data (got 0 bytes)");

        // Session 2: reopen — recovery should checkpoint WAL to Parquet, then truncate
        {
            let config = EventStoreConfig::production(
                &storage_dir,
                &wal_dir,
                SnapshotConfig::default(),
                WALConfig {
                    sync_on_write: true,
                    ..WALConfig::default()
                },
                CompactionConfig::default(),
            );
            let store = EventStore::with_config(config);

            // Events should be recovered
            assert_eq!(
                store.stats().total_events,
                5,
                "Session 2 should have all 5 events after WAL recovery"
            );

            // Parquet should now have files (checkpoint happened).
            // After Step 1, files live under <root>/<tenant>/<yyyy-mm>/,
            // so walk recursively.
            let parquet_files = find_parquet_files(&storage_dir);
            assert!(
                !parquet_files.is_empty(),
                "Parquet file should exist after WAL checkpoint"
            );
        }

        // Session 3: reopen again — events should be reachable via
        // lazy-load (Step 2: boot does not pre-load Parquet).
        {
            let config = EventStoreConfig::production(
                &storage_dir,
                &wal_dir,
                SnapshotConfig::default(),
                WALConfig {
                    sync_on_write: true,
                    ..WALConfig::default()
                },
                CompactionConfig::default(),
            );
            let store = EventStore::with_config(config);

            // Boot is now O(1) — Parquet stays cold until first
            // per-tenant query. WAL was truncated in session 2,
            // so nothing is pre-loaded.
            assert_eq!(
                store.stats().total_events,
                0,
                "Session 3 boot should not pre-load Parquet (lazy-load mode)"
            );

            // Trigger lazy load for the test tenant (events were
            // ingested with tenant_id="default").
            store.ensure_tenant_loaded("default").unwrap();
            assert_eq!(
                store.stats().total_events,
                5,
                "Session 3 should have all 5 events after ensure_tenant_loaded"
            );
        }
    }

    #[test]
    fn test_parquet_restore_surfaces_errors_not_silent() {
        // Write events with WAL+Parquet, flush to Parquet, then corrupt the
        // Parquet file. On reload, the error must be logged (not silently
        // swallowed as 0 events).
        let data_dir = TempDir::new().unwrap();
        let storage_dir = data_dir.path().join("storage");
        let wal_dir = data_dir.path().join("wal");

        // Session 1: write events and flush to Parquet
        {
            let config = EventStoreConfig::production(
                &storage_dir,
                &wal_dir,
                SnapshotConfig::default(),
                WALConfig {
                    sync_on_write: true,
                    ..WALConfig::default()
                },
                CompactionConfig::default(),
            );
            let store = EventStore::with_config(config);

            for i in 0..3 {
                let event = Event::from_strings(
                    "test.created".to_string(),
                    format!("entity-{i}"),
                    "default".to_string(),
                    serde_json::json!({"i": i}),
                    None,
                )
                .unwrap();
                store.ingest(&event).unwrap();
            }

            store.flush_storage().unwrap();
            assert_eq!(store.stats().total_events, 3);
        }

        // Verify parquet file exists. After Step 1 the file lives
        // under <root>/<tenant>/<yyyy-mm>/, so walk recursively.
        let parquet_files = find_parquet_files(&storage_dir);
        assert!(!parquet_files.is_empty(), "Parquet file must exist");

        // Corrupt the parquet file
        std::fs::write(&parquet_files[0], b"corrupted data").unwrap();

        // Truncate WAL so only Parquet matters
        for entry in std::fs::read_dir(&wal_dir).unwrap().flatten() {
            std::fs::write(entry.path(), b"").unwrap();
        }

        // Session 2: reload — should NOT silently report 0 events.
        // The error is logged via tracing::error! which we can't capture in a
        // unit test, but we CAN verify the store has 0 events (previously this
        // looked identical to "no data on disk" — now there's an error log).
        // The key behavioral change is that with_config no longer uses a
        // let-chain that silently drops the Err variant.
        {
            let config = EventStoreConfig::production(
                &storage_dir,
                &wal_dir,
                SnapshotConfig::default(),
                WALConfig::default(),
                CompactionConfig::default(),
            );
            let store = EventStore::with_config(config);

            // Store has 0 events because Parquet is corrupted — but the error
            // is now logged (not silently swallowed).
            assert_eq!(store.stats().total_events, 0);
        }
    }

    // -----------------------------------------------------------------------
    // Step 6: Bounded WAL replay. Each successful checkpoint truncates the
    // WAL so cold-start replay is O(one checkpoint interval) regardless of
    // total dataset size.
    // -----------------------------------------------------------------------
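    //
    // Conceptually, checkpoint() is "flush, then truncate". Sketched with
    // paraphrased internals (`wal.truncate()` is an illustrative name, not
    // necessarily the crate's exact method):
    //
    //     pub fn checkpoint(&self) -> Result<()> {
    //         self.flush_storage()?;   // buffered events -> durable Parquet
    //         if let Some(wal) = &self.wal {
    //             wal.truncate()?;     // replay window resets to empty
    //         }
    //         Ok(())
    //     }
    //
    // Ordering matters: truncate only after a successful flush, or a crash
    // between the two steps would lose the in-flight window.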

    /// Count entries in every WAL file under `wal_dir`. The format is one
    /// JSON-serialized WALEntry per line (see WALFile::write_entry), so each
    /// non-empty line counts as one entry.
    fn count_wal_entries(wal_dir: &std::path::Path) -> usize {
        use std::io::{BufRead, BufReader};
        let mut total = 0usize;
        let Ok(entries) = std::fs::read_dir(wal_dir) else {
            return 0;
        };
        for entry in entries.flatten() {
            let path = entry.path();
            if path.extension().is_none_or(|e| e != "log") {
                continue;
            }
            let Ok(file) = std::fs::File::open(&path) else {
                continue;
            };
            for line in BufReader::new(file)
                .lines()
                .map_while(std::result::Result::ok)
            {
                if !line.trim().is_empty() {
                    total += 1;
                }
            }
        }
        total
    }

    #[test]
    fn test_checkpoint_truncates_wal_after_flush() {
        // After a successful checkpoint, every previously-ingested event
        // should be in Parquet, and the WAL should be empty (truncated).
        // This is the load-bearing invariant for Step 6's bounded-replay
        // promise — without truncation, the WAL grows without bound.
        let data_dir = TempDir::new().unwrap();
        let storage_dir = data_dir.path().join("storage");
        let wal_dir = data_dir.path().join("wal");

        let config = EventStoreConfig::production(
            &storage_dir,
            &wal_dir,
            SnapshotConfig::default(),
            WALConfig {
                sync_on_write: true,
                ..WALConfig::default()
            },
            CompactionConfig::default(),
        );
        let store = EventStore::with_config(config);

        for i in 0..10 {
            let event = Event::from_strings(
                "test.created".to_string(),
                format!("entity-{i}"),
                "default".to_string(),
                serde_json::json!({"i": i}),
                None,
            )
            .unwrap();
            store.ingest(&event).unwrap();
        }

        // Sanity: all 10 events are in the WAL pre-checkpoint.
        assert_eq!(
            count_wal_entries(&wal_dir),
            10,
            "WAL should have 10 events before checkpoint"
        );

        store.checkpoint().unwrap();

        assert_eq!(
            count_wal_entries(&wal_dir),
            0,
            "WAL should be empty after successful checkpoint"
        );
        let parquet_files = find_parquet_files(&storage_dir);
        assert!(!parquet_files.is_empty(), "Parquet should hold the events");
    }

    #[test]
    fn test_replay_only_post_checkpoint_events_after_crash() {
        // Headline acceptance criterion for this bead: write N events,
        // checkpoint, write K more, simulate a crash, restart, and verify
        // only K events go through replay (not N+K).
        //
        // Uses small N (50) and K (5) for test speed — the property
        // is the same as the spec's 1M+10k example, just scaled down.
        let data_dir = TempDir::new().unwrap();
        let storage_dir = data_dir.path().join("storage");
        let wal_dir = data_dir.path().join("wal");

        let config_factory = || {
            EventStoreConfig::production(
                &storage_dir,
                &wal_dir,
                SnapshotConfig::default(),
                WALConfig {
                    sync_on_write: true,
                    ..WALConfig::default()
                },
                CompactionConfig::default(),
            )
        };

        // Session 1: ingest N, checkpoint, ingest K, then drop without
        // a graceful shutdown — that's the crash.
        const N: usize = 50;
        const K: usize = 5;
        {
            let store = EventStore::with_config(config_factory());
            for i in 0..N {
                store
                    .ingest(
                        &Event::from_strings(
                            "pre.checkpoint".to_string(),
                            format!("e-{i}"),
                            "default".to_string(),
                            serde_json::json!({"i": i}),
                            None,
                        )
                        .unwrap(),
                    )
                    .unwrap();
            }
            store.checkpoint().unwrap();
            assert_eq!(
                count_wal_entries(&wal_dir),
                0,
                "WAL should be empty immediately after checkpoint"
            );

            for i in 0..K {
                store
                    .ingest(
                        &Event::from_strings(
                            "post.checkpoint".to_string(),
                            format!("p-{i}"),
                            "default".to_string(),
                            serde_json::json!({"i": i}),
                            None,
                        )
                        .unwrap(),
                    )
                    .unwrap();
            }
            assert_eq!(
                count_wal_entries(&wal_dir),
                K,
                "WAL should hold only post-checkpoint events"
            );
            // Drop without flushing — simulates a crash mid-write.
        }

        // Session 2: reopen. Recovery should replay only the K post-
        // checkpoint events from the WAL — the N pre-checkpoint events
        // are durable in Parquet and lazy-loaded on demand.
        {
            let store = EventStore::with_config(config_factory());
            // total_events reflects only WAL-recovered events at boot
            // (Step 2 — Parquet stays cold until first per-tenant
            // query). So the WAL replay size IS exactly K.
            assert_eq!(
                store.stats().total_events,
                K,
                "Boot should replay exactly K events from WAL (the post-checkpoint window), not N+K"
            );

            // Lazy-load brings the rest in.
            store.ensure_tenant_loaded("default").unwrap();
            assert_eq!(
                store.stats().total_events,
                N + K,
                "After lazy-load, both pre- and post-checkpoint events should be reachable"
            );
        }
    }
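
    // Scaled to production numbers (illustrative arithmetic, not a measured
    // figure): with the default 60 s checkpoint interval, restart replay is
    // bounded by one interval of ingest. At, say, 10k events/s that is at
    // most ~600k WAL entries, no matter how many events live in Parquet.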

    #[test]
    fn test_checkpoint_is_idempotent() {
        // Calling checkpoint() twice in a row is safe: the second call
        // finds an empty WAL and an empty Parquet batch, and no-ops.
        let data_dir = TempDir::new().unwrap();
        let storage_dir = data_dir.path().join("storage");
        let wal_dir = data_dir.path().join("wal");

        let store = EventStore::with_config(EventStoreConfig::production(
            &storage_dir,
            &wal_dir,
            SnapshotConfig::default(),
            WALConfig::default(),
            CompactionConfig::default(),
        ));

        for i in 0..5 {
            store
                .ingest(
                    &Event::from_strings(
                        "x".to_string(),
                        format!("e-{i}"),
                        "default".to_string(),
                        serde_json::json!({}),
                        None,
                    )
                    .unwrap(),
                )
                .unwrap();
        }

        store.checkpoint().unwrap();
        // Second call is a no-op and must not error.
        store.checkpoint().unwrap();
        assert_eq!(count_wal_entries(&wal_dir), 0);
    }

    #[test]
    fn test_checkpoint_noop_in_memory_only_mode() {
        // Without WAL configured, checkpoint() is a no-op.
        let store = EventStore::new();
        store.checkpoint().unwrap();
    }

    #[test]
    fn test_checkpoint_interval_from_env_defaults_to_60s_when_wal_enabled() {
        let (config, _) = EventStoreConfig::from_env_vars(
            Some("/app/data".to_string()),
            None,
            None,
            None,
            None,
            None,
            None,
            None,
        );
        assert_eq!(config.checkpoint_interval_secs, Some(60));
    }

    #[test]
    fn test_checkpoint_interval_from_env_overrides_default() {
        let (config, _) = EventStoreConfig::from_env_vars(
            Some("/app/data".to_string()),
            None,
            None,
            None,
            None,
            None,
            None,
            Some("15".to_string()),
        );
        assert_eq!(config.checkpoint_interval_secs, Some(15));
    }

    #[test]
    fn test_checkpoint_interval_disabled_when_wal_disabled() {
        // No WAL → no checkpoint loop, regardless of env var value.
        let (config, _) = EventStoreConfig::from_env_vars(
            Some("/app/data".to_string()),
            None,
            None,
            Some("false".to_string()),
            None,
            None,
            None,
            Some("15".to_string()),
        );
        assert_eq!(config.checkpoint_interval_secs, None);
    }

    #[test]
    fn test_checkpoint_interval_unparseable_falls_back_to_default() {
        let (config, _) = EventStoreConfig::from_env_vars(
            Some("/app/data".to_string()),
            None,
            None,
            None,
            None,
            None,
            None,
            Some("not-a-number".to_string()),
        );
        assert_eq!(config.checkpoint_interval_secs, Some(60));
    }
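
    /// Illustrative only — a minimal sketch of how `checkpoint_interval_secs`
    /// could drive a periodic checkpoint. The function name and the threaded
    /// loop are hypothetical; the real server may wire this differently
    /// (e.g. onto an async task). It relies only on `checkpoint()` being
    /// cheap when idle (see test_checkpoint_is_idempotent above).
    #[allow(dead_code)]
    fn run_checkpoint_loop_sketch(store: Arc<EventStore>, interval_secs: u64) {
        std::thread::spawn(move || loop {
            std::thread::sleep(std::time::Duration::from_secs(interval_secs));
            // A failed checkpoint leaves the WAL intact, so nothing is lost;
            // log and keep going rather than tearing the loop down.
            if let Err(e) = store.checkpoint() {
                eprintln!("checkpoint failed: {e}");
            }
        });
    }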
}