// allsource_core/store.rs

#[cfg(feature = "server")]
use crate::application::services::webhook::WebhookRegistry;
#[cfg(feature = "server")]
use crate::infrastructure::observability::metrics::MetricsRegistry;
#[cfg(feature = "server")]
use crate::infrastructure::web::websocket::WebSocketManager;
use crate::{
    application::{
        dto::QueryEventsRequest,
        services::{
            consumer::ConsumerRegistry,
            exactly_once::{ExactlyOnceConfig, ExactlyOnceRegistry},
            pipeline::PipelineManager,
            projection::{EntitySnapshotProjection, EventCounterProjection, ProjectionManager},
            replay::ReplayManager,
            schema::{SchemaRegistry, SchemaRegistryConfig},
            schema_evolution::SchemaEvolutionManager,
        },
    },
    domain::entities::Event,
    error::{AllSourceError, Result},
    infrastructure::{
        persistence::{
            compaction::{CompactionConfig, CompactionManager},
            index::{EventIndex, IndexEntry},
            snapshot::{SnapshotConfig, SnapshotManager, SnapshotType},
            storage::ParquetStorage,
            tenant_loader::TenantLoader,
            wal::{WALConfig, WriteAheadLog},
        },
        query::geospatial::GeoIndex,
    },
};
use chrono::{DateTime, Utc};
use dashmap::DashMap;
use parking_lot::RwLock;
use std::{path::PathBuf, sync::Arc};
#[cfg(feature = "server")]
use tokio::sync::mpsc;

/// High-performance event store with columnar storage
pub struct EventStore {
    /// In-memory event storage
    events: Arc<RwLock<Vec<Event>>>,

    /// High-performance concurrent index
    index: Arc<EventIndex>,

    /// Projection manager for real-time aggregations
    pub(crate) projections: Arc<RwLock<ProjectionManager>>,

    /// Optional persistent storage (v0.2 feature)
    storage: Option<Arc<RwLock<ParquetStorage>>>,

    /// WebSocket manager for real-time event streaming (v0.2 feature)
    #[cfg(feature = "server")]
    websocket_manager: Arc<WebSocketManager>,

    /// Snapshot manager for fast state recovery (v0.2 feature)
    snapshot_manager: Arc<SnapshotManager>,

    /// Write-Ahead Log for durability (v0.2 feature)
    wal: Option<Arc<WriteAheadLog>>,

    /// Compaction manager for Parquet optimization (v0.2 feature)
    compaction_manager: Option<Arc<CompactionManager>>,

    /// Schema registry for event validation (v0.5 feature)
    schema_registry: Arc<SchemaRegistry>,

    /// Replay manager for event replay and projection rebuilding (v0.5 feature)
    replay_manager: Arc<ReplayManager>,

    /// Pipeline manager for stream processing (v0.5 feature)
    pipeline_manager: Arc<PipelineManager>,

    /// Prometheus metrics registry (v0.6 feature)
    #[cfg(feature = "server")]
    metrics: Arc<MetricsRegistry>,

    /// Total events ingested (for metrics)
    total_ingested: Arc<RwLock<u64>>,

    /// Projection state cache for Query Service integration (v0.7 feature)
    /// Key format: "{projection_name}:{entity_id}"
    /// This DashMap provides O(1) access with ~11.9 μs latency
    projection_state_cache: Arc<DashMap<String, serde_json::Value>>,

    /// Projection status overrides (v0.13 feature)
    /// Tracks pause/start state per projection name: "running" or "paused"
    projection_status: Arc<DashMap<String, String>>,

    /// Webhook registry for outbound event delivery (v0.11 feature)
    #[cfg(feature = "server")]
    webhook_registry: Arc<WebhookRegistry>,

    /// Channel sender for async webhook delivery tasks
    #[cfg(feature = "server")]
    webhook_tx: Arc<RwLock<Option<mpsc::UnboundedSender<WebhookDeliveryTask>>>>,

    /// Geospatial index for coordinate-based queries (v2.0 feature)
    geo_index: Arc<GeoIndex>,

    /// Exactly-once processing registry (v2.0 feature)
    exactly_once: Arc<ExactlyOnceRegistry>,

    /// Autonomous schema evolution manager (v2.0 feature)
    schema_evolution: Arc<SchemaEvolutionManager>,

    /// Per-entity version counters for optimistic concurrency control (v0.14 feature)
    /// Key: entity_id string, Value: monotonic version (number of events for that entity)
    entity_versions: Arc<DashMap<String, u64>>,

    /// Durable consumer registry for subscription cursor tracking (v0.14 feature)
    consumer_registry: Arc<ConsumerRegistry>,

    /// In-process broadcast of every successfully-ingested event. Always enabled
    /// so embedded consumers (TUI, web) can tail changes without the `server`
    /// feature / HTTP stack. Lagging receivers see `RecvError::Lagged`.
    event_broadcast_tx: tokio::sync::broadcast::Sender<Arc<Event>>,

    /// Per-tenant lazy-load bookkeeping. Tracks which tenants have
    /// been hydrated from Parquet into the in-memory pile and
    /// serializes concurrent first-loads of the same tenant. See
    /// `ensure_tenant_loaded` and Step 2 of the sustainable data
    /// strategy.
    tenant_loader: Arc<TenantLoader>,

    /// Cadence of the runtime checkpoint loop (Step 6). `None` means
    /// the loop is disabled — WAL grows until boot. Production reads
    /// this from `ALLSOURCE_CHECKPOINT_INTERVAL_SECONDS`. Stored on
    /// the store so background tasks can read it without re-parsing
    /// the env.
    checkpoint_interval_secs: Option<u64>,
}

/// A task queued for async webhook delivery
#[cfg(feature = "server")]
#[derive(Debug, Clone)]
pub struct WebhookDeliveryTask {
    pub webhook: crate::application::services::webhook::WebhookSubscription,
    pub event: Event,
}

impl EventStore {
    /// Create a new in-memory event store
    pub fn new() -> Self {
        Self::with_config(EventStoreConfig::default())
    }

    /// Create event store with custom configuration
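    ///
    /// # Example
    ///
    /// A minimal sketch. Field names are the ones this constructor reads;
    /// `EventStoreConfig` is assumed to implement `Default` and to expose
    /// these fields for construction:
    ///
    /// ```ignore
    /// let config = EventStoreConfig {
    ///     storage_dir: Some(PathBuf::from("./data/parquet")),
    ///     wal_dir: Some(PathBuf::from("./data/wal")),
    ///     checkpoint_interval_secs: Some(30),
    ///     ..EventStoreConfig::default()
    /// };
    /// let store = EventStore::with_config(config);
    /// ```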
    pub fn with_config(config: EventStoreConfig) -> Self {
        let mut projections = ProjectionManager::new();

        // Register built-in projections
        projections.register(Arc::new(EntitySnapshotProjection::new("entity_snapshots")));
        projections.register(Arc::new(EventCounterProjection::new("event_counters")));

        // Initialize persistent storage if configured
        let storage = config
            .storage_dir
            .as_ref()
            .and_then(|dir| match ParquetStorage::new(dir) {
                Ok(storage) => {
                    tracing::info!("✅ Parquet persistence enabled at: {}", dir.display());
                    Some(Arc::new(RwLock::new(storage)))
                }
                Err(e) => {
                    tracing::error!("❌ Failed to initialize Parquet storage: {}", e);
                    None
                }
            });

        // Initialize WAL if configured (v0.2 feature)
        let wal = config.wal_dir.as_ref().and_then(|dir| {
            match WriteAheadLog::new(dir, config.wal_config.clone()) {
                Ok(wal) => {
                    tracing::info!("✅ WAL enabled at: {}", dir.display());
                    Some(Arc::new(wal))
                }
                Err(e) => {
                    tracing::error!("❌ Failed to initialize WAL: {}", e);
                    None
                }
            }
        });

        // Initialize compaction manager if Parquet storage is enabled (v0.2 feature)
        let compaction_manager = config.storage_dir.as_ref().map(|dir| {
            let manager = CompactionManager::new(dir, config.compaction_config.clone());
            Arc::new(manager)
        });

        // Initialize schema registry (v0.5 feature)
        let schema_registry = Arc::new(SchemaRegistry::new(config.schema_registry_config.clone()));
        tracing::info!("✅ Schema registry enabled");

        // Initialize replay manager (v0.5 feature)
        let replay_manager = Arc::new(ReplayManager::new());
        tracing::info!("✅ Replay manager enabled");

        // Initialize pipeline manager (v0.5 feature)
        let pipeline_manager = Arc::new(PipelineManager::new());
        tracing::info!("✅ Pipeline manager enabled");

        // Initialize metrics registry (v0.6 feature)
        #[cfg(feature = "server")]
        let metrics = {
            let m = MetricsRegistry::new();
            tracing::info!("✅ Prometheus metrics registry initialized");
            m
        };

        // Initialize projection state cache (v0.7 feature)
        let projection_state_cache = Arc::new(DashMap::new());
        tracing::info!("✅ Projection state cache initialized");

        // Initialize webhook registry (v0.11 feature)
        #[cfg(feature = "server")]
        let webhook_registry = {
            let w = Arc::new(WebhookRegistry::new());
            tracing::info!("✅ Webhook registry initialized");
            w
        };

        // Unconditional in-process event broadcaster so embedded consumers
        // (TUI, web) can live-reload without the `server` feature.
        let (event_broadcast_tx, _) = tokio::sync::broadcast::channel(1024);

        let store = Self {
            events: Arc::new(RwLock::new(Vec::new())),
            index: Arc::new(EventIndex::new()),
            projections: Arc::new(RwLock::new(projections)),
            storage,
            #[cfg(feature = "server")]
            websocket_manager: Arc::new(WebSocketManager::new()),
            snapshot_manager: Arc::new(SnapshotManager::new(config.snapshot_config)),
            wal,
            compaction_manager,
            schema_registry,
            replay_manager,
            pipeline_manager,
            #[cfg(feature = "server")]
            metrics,
            total_ingested: Arc::new(RwLock::new(0)),
            projection_state_cache,
            projection_status: Arc::new(DashMap::new()),
            #[cfg(feature = "server")]
            webhook_registry,
            #[cfg(feature = "server")]
            webhook_tx: Arc::new(RwLock::new(None)),
            geo_index: Arc::new(GeoIndex::new()),
            exactly_once: Arc::new(ExactlyOnceRegistry::new(ExactlyOnceConfig::default())),
            schema_evolution: Arc::new(SchemaEvolutionManager::new()),
            entity_versions: Arc::new(DashMap::new()),
            consumer_registry: Arc::new(ConsumerRegistry::new()),
            event_broadcast_tx,
            tenant_loader: {
                let loader = TenantLoader::new();
                if let Some(budget) = config.cache_byte_budget {
                    loader.set_byte_budget(budget);
                    tracing::info!(
                        "✅ Cache byte budget set to {} bytes ({:.2} GiB) — LRU eviction enabled",
                        budget,
                        budget as f64 / (1024.0 * 1024.0 * 1024.0)
                    );
                } else {
                    tracing::info!(
                        "✅ Cache budget unset — every loaded tenant stays resident \
                         (set ALLSOURCE_CACHE_BYTES to enable eviction)"
                    );
                }
                Arc::new(loader)
            },
            checkpoint_interval_secs: config.checkpoint_interval_secs,
        };

        // Boot is now O(1) regardless of dataset size (Step 2 of the
        // sustainable data strategy). Pre-Step-2 we scanned every
        // Parquet file at startup; on the production volume that
        // grew past available memory and Core OOM'd during recovery
        // (issue #160). Now:
        //
        //   - Parquet data stays on disk. Tenants are hydrated on
        //     demand by `ensure_tenant_loaded`, called from the
        //     query path on first access.
        //   - WAL is still recovered eagerly. WAL is bounded
        //     (rotates / truncates after each Parquet flush), so
        //     replaying it is O(recent un-flushed writes) — small
        //     by construction and required for correctness, since
        //     those events aren't durably in Parquet yet.
        //
        // After WAL recovery we still checkpoint recovered events
        // to Parquet and truncate the WAL. The lazy-load path's
        // dedupe in `append_loaded_event` (index.get_by_id check)
        // makes it safe for the same event to be reachable through
        // both WAL recovery and a subsequent ensure_tenant_loaded
        // pass on the same tenant.
        if let Some(ref wal) = store.wal {
            match wal.recover() {
                Ok(recovered_events) if !recovered_events.is_empty() => {
                    let mut wal_new = 0usize;
                    for event in recovered_events {
                        let offset = store.events.read().len();
                        if let Err(e) = store.index.index_event(
                            event.id,
                            event.entity_id_str(),
                            event.event_type_str(),
                            event.timestamp,
                            offset,
                        ) {
                            tracing::error!("Failed to re-index WAL event {}: {}", event.id, e);
                        }

                        if let Err(e) = store.projections.read().process_event(&event) {
                            tracing::error!("Failed to re-process WAL event {}: {}", event.id, e);
                        }

                        *store
                            .entity_versions
                            .entry(event.entity_id_str().to_string())
                            .or_insert(0) += 1;

                        store.events.write().push(event);
                        wal_new += 1;
                    }

                    // Step 6: expose replay size as a gauge so ops
                    // can graph "how big was the last replay?" and
                    // catch regressions where the checkpoint loop
                    // stops draining the WAL.
                    #[cfg(feature = "server")]
                    store.metrics.wal_replay_events_total.set(wal_new as i64);

                    if wal_new > 0 {
                        let total = store.events.read().len();
                        // total_ingested now reflects "events the
                        // process knows about" rather than "events
                        // ever written" — Parquet data isn't loaded
                        // on boot, so the count grows as tenants
                        // get hydrated. Consumers that need the
                        // historical total should look at Parquet
                        // file stats, not this counter.
                        *store.total_ingested.write() = total as u64;
                        tracing::info!(
                            "✅ Recovered {} events from WAL (Parquet data stays cold until \
                             first per-tenant query)",
                            wal_new
                        );

                        // Checkpoint WAL events to Parquet — buffer
                        // them into the per-tenant Parquet batches
                        // first, then flush. Without this,
                        // flush_storage() finds an empty current
                        // batch and silently no-ops, the WAL gets
                        // truncated, and the events exist only in
                        // memory (lost on next restart).
                        if let Some(ref storage) = store.storage {
                            tracing::info!(
                                "📸 Checkpointing {} WAL events to Parquet storage...",
                                wal_new
                            );
                            let parquet = storage.read();
                            let events = store.events.read();
                            let mut buffered = 0usize;
                            for event in events.iter().skip(events.len() - wal_new) {
                                if let Err(e) = parquet.append_event(event.clone()) {
                                    tracing::error!(
                                        "Failed to buffer WAL event for Parquet: {}",
                                        e
                                    );
                                } else {
                                    buffered += 1;
                                }
                            }
                            drop(events);
                            drop(parquet);

                            if buffered > 0 {
                                if let Err(e) = store.flush_storage() {
                                    tracing::error!("Failed to checkpoint to Parquet: {}", e);
                                } else if let Err(e) = wal.truncate() {
                                    tracing::error!(
                                        "Failed to truncate WAL after checkpoint: {}",
                                        e
                                    );
                                } else {
                                    tracing::info!(
                                        "✅ WAL checkpointed and truncated ({} events)",
                                        buffered
                                    );
                                }
                            }
                        }
                    }
                }
                Ok(_) => {
                    tracing::debug!("No events to recover from WAL");
                    #[cfg(feature = "server")]
                    store.metrics.wal_replay_events_total.set(0);
                }
                Err(e) => {
                    tracing::error!("❌ WAL recovery failed: {}", e);
                }
            }
        } else if store.storage.is_some() {
            tracing::info!(
                "📂 Boot complete (lazy-load mode): Parquet data stays on disk until first \
                 per-tenant query"
            );
        }

        store
    }

    /// Ingest a new event with optional optimistic concurrency check.
    ///
    /// If `expected_version` is `Some(v)`, the write is rejected with
    /// `VersionConflict` unless the entity's current version equals `v`.
    /// The version check and WAL append are atomic (locked together).
    ///
    /// Returns the new entity version after the append.
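    ///
    /// # Example
    ///
    /// A minimal compare-and-append sketch (how `event` is constructed is
    /// outside this module, so it is taken as given):
    ///
    /// ```ignore
    /// let current = store.get_entity_version("order-42");
    /// match store.ingest_with_expected_version(&event, Some(current)) {
    ///     Ok(new_version) => tracing::info!("appended at version {new_version}"),
    ///     Err(AllSourceError::VersionConflict { expected, current }) => {
    ///         // Another writer appended first: re-read state and retry.
    ///     }
    ///     Err(e) => return Err(e),
    /// }
    /// ```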
    #[cfg_attr(feature = "hotpath", hotpath::measure)]
    pub fn ingest_with_expected_version(
        &self,
        event: &Event,
        expected_version: Option<u64>,
    ) -> Result<u64> {
        // Validate event first (before any locking)
        self.validate_event(event)?;

        let entity_id = event.entity_id_str().to_string();

        // Atomic version check + append: hold the DashMap entry lock
        // to prevent TOCTOU races between check and write.
        let new_version = {
            let mut version_entry = self.entity_versions.entry(entity_id.clone()).or_insert(0);
            let current = *version_entry;

            if let Some(expected) = expected_version
                && current != expected
            {
                return Err(crate::error::AllSourceError::VersionConflict { expected, current });
            }

            // Write to WAL FIRST for durability (under version lock to keep atomicity)
            if let Some(ref wal) = self.wal {
                wal.append(event.clone())?;
            }

            *version_entry += 1;
            *version_entry
        };

        // From here on, the event is durable (WAL) and version is bumped.
        // Continue with indexing, projections, storage, and broadcast.
        self.ingest_post_wal(event)?;

        Ok(new_version)
    }

    /// Post-WAL ingestion: index, projections, storage, broadcast.
    /// Called after WAL append and version bump are complete.
    #[cfg_attr(feature = "hotpath", hotpath::measure)]
    fn ingest_post_wal(&self, event: &Event) -> Result<()> {
        #[cfg(feature = "server")]
        let timer = self.metrics.ingestion_duration_seconds.start_timer();

        let mut events = self.events.write();
        let offset = events.len();

        // Index the event
        self.index.index_event(
            event.id,
            event.entity_id_str(),
            event.event_type_str(),
            event.timestamp,
            offset,
        )?;

        // Process through projections
        let projections = self.projections.read();
        projections.process_event(event)?;
        drop(projections);

        // Process through pipelines
        let pipeline_results = self.pipeline_manager.process_event(event);
        if !pipeline_results.is_empty() {
            tracing::debug!(
                "Event {} processed by {} pipeline(s)",
                event.id,
                pipeline_results.len()
            );
            for (pipeline_id, result) in pipeline_results {
                tracing::trace!("Pipeline {} result: {:?}", pipeline_id, result);
            }
        }

        // Persist to Parquet storage if enabled
        if let Some(ref storage) = self.storage {
            let storage = storage.read();
            storage.append_event(event.clone())?;
        }

        // Store the event in memory
        events.push(event.clone());
        let total_events = events.len();
        drop(events);

        // Broadcast to in-process subscribers (always on) + optional WS.
        let event_arc = Arc::new(event.clone());
        let _ = self.event_broadcast_tx.send(Arc::clone(&event_arc));
        #[cfg(feature = "server")]
        self.websocket_manager.broadcast_event(event_arc);

        // Dispatch to matching webhook subscriptions
        #[cfg(feature = "server")]
        self.dispatch_webhooks(event);

        // Update geospatial index
        self.geo_index.index_event(event);

        // Autonomous schema evolution
        self.schema_evolution
            .analyze_event(event.event_type_str(), &event.payload);

        // Check if automatic snapshot should be created
        self.check_auto_snapshot(event.entity_id_str(), event);

        // Update metrics
        #[cfg(feature = "server")]
        {
            self.metrics.events_ingested_total.inc();
            self.metrics
                .events_ingested_by_type
                .with_label_values(&[event.event_type_str()])
                .inc();
            self.metrics.storage_events_total.set(total_events as i64);
        }

        // Update legacy total counter
        let mut total = self.total_ingested.write();
        *total += 1;

        #[cfg(feature = "server")]
        timer.observe_duration();

        tracing::debug!("Event ingested: {} (offset: {})", event.id, offset);

        Ok(())
    }

    /// Ingest a new event into the store
    #[cfg_attr(feature = "hotpath", hotpath::measure)]
    pub fn ingest(&self, event: &Event) -> Result<()> {
        // Start metrics timer (v0.6 feature)
        #[cfg(feature = "server")]
        let timer = self.metrics.ingestion_duration_seconds.start_timer();

        // Validate event
        let validation_result = self.validate_event(event);
        if let Err(e) = validation_result {
            #[cfg(feature = "server")]
            {
                self.metrics.ingestion_errors_total.inc();
                timer.observe_duration();
            }
            return Err(e);
        }

        // Write to WAL FIRST for durability (v0.2 feature)
        // This ensures event is persisted before processing
        if let Some(ref wal) = self.wal
            && let Err(e) = wal.append(event.clone())
        {
            #[cfg(feature = "server")]
            {
                self.metrics.ingestion_errors_total.inc();
                timer.observe_duration();
            }
            return Err(e);
        }

        // Track per-entity version (unconditional increment, no version check)
        *self
            .entity_versions
            .entry(event.entity_id_str().to_string())
            .or_insert(0) += 1;

        let mut events = self.events.write();
        let offset = events.len();

        // Index the event
        self.index.index_event(
            event.id,
            event.entity_id_str(),
            event.event_type_str(),
            event.timestamp,
            offset,
        )?;

        // Process through projections
        let projections = self.projections.read();
        projections.process_event(event)?;
        drop(projections); // Release lock

        // Process through pipelines (v0.5 feature)
        // Pipelines can transform, filter, and aggregate events in real-time
        let pipeline_results = self.pipeline_manager.process_event(event);
        if !pipeline_results.is_empty() {
            tracing::debug!(
                "Event {} processed by {} pipeline(s)",
                event.id,
                pipeline_results.len()
            );
            // Pipeline results could be stored, emitted, or forwarded elsewhere
            // For now, we just log them for observability
            for (pipeline_id, result) in pipeline_results {
                tracing::trace!("Pipeline {} result: {:?}", pipeline_id, result);
            }
        }

        // Persist to Parquet storage if enabled (v0.2)
        if let Some(ref storage) = self.storage {
            let storage = storage.read();
            storage.append_event(event.clone())?;
        }

        // Store the event in memory
        events.push(event.clone());
        let total_events = events.len();
        drop(events); // Release lock early

        // Broadcast to in-process subscribers + optional WS.
        let event_arc = Arc::new(event.clone());
        let _ = self.event_broadcast_tx.send(Arc::clone(&event_arc));
        #[cfg(feature = "server")]
        self.websocket_manager.broadcast_event(event_arc);

        // Dispatch to matching webhook subscriptions (v0.11 feature)
        #[cfg(feature = "server")]
        self.dispatch_webhooks(event);

        // Update geospatial index (v2.0 feature)
        self.geo_index.index_event(event);

        // Autonomous schema evolution (v2.0 feature)
        self.schema_evolution
            .analyze_event(event.event_type_str(), &event.payload);

        // Check if automatic snapshot should be created (v0.2 feature)
        self.check_auto_snapshot(event.entity_id_str(), event);

        // Update metrics (v0.6 feature)
        #[cfg(feature = "server")]
        {
            self.metrics.events_ingested_total.inc();
            self.metrics
                .events_ingested_by_type
                .with_label_values(&[event.event_type_str()])
                .inc();
            self.metrics.storage_events_total.set(total_events as i64);
        }

        // Update legacy total counter
        let mut total = self.total_ingested.write();
        *total += 1;

        #[cfg(feature = "server")]
        timer.observe_duration();

        tracing::debug!("Event ingested: {} (offset: {})", event.id, offset);

        Ok(())
    }

    /// Ingest a batch of events with a single write lock acquisition.
    ///
    /// All events are validated first. If any event fails validation, no
    /// events are stored (all-or-nothing validation). Events are then written
    /// to WAL, indexed, processed through projections, and pushed to the
    /// events vector under a single write lock.
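    ///
    /// # Example
    ///
    /// Sketch: amortizing lock acquisition over a pre-built batch (the
    /// events are assumed constructed elsewhere):
    ///
    /// ```ignore
    /// let batch: Vec<Event> = staged_events(); // `staged_events` is illustrative
    /// // One write-lock acquisition for the whole batch; a validation
    /// // failure on any event rejects the batch before anything is stored.
    /// store.ingest_batch(batch)?;
    /// ```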
    #[cfg_attr(feature = "hotpath", hotpath::measure)]
    pub fn ingest_batch(&self, batch: Vec<Event>) -> Result<()> {
        if batch.is_empty() {
            return Ok(());
        }
        let batch_len = batch.len();

        // Phase 1: Validate all events before acquiring any locks
        for event in &batch {
            self.validate_event(event)?;
        }

        // Phase 2: Write all events to WAL (before write lock, for durability)
        if let Some(ref wal) = self.wal {
            for event in &batch {
                wal.append(event.clone())?;
            }
        }

        // Phase 3: Single write lock for index + projections + push
        let mut events = self.events.write();
        let projections = self.projections.read();

        for event in batch {
            let offset = events.len();

            self.index.index_event(
                event.id,
                event.entity_id_str(),
                event.event_type_str(),
                event.timestamp,
                offset,
            )?;

            projections.process_event(&event)?;
            self.pipeline_manager.process_event(&event);

            if let Some(ref storage) = self.storage {
                let storage = storage.read();
                storage.append_event(event.clone())?;
            }

            self.geo_index.index_event(&event);
            self.schema_evolution
                .analyze_event(event.event_type_str(), &event.payload);

            // Track per-entity version
            *self
                .entity_versions
                .entry(event.entity_id_str().to_string())
                .or_insert(0) += 1;

            // Broadcast to in-process subscribers
            let _ = self.event_broadcast_tx.send(Arc::new(event.clone()));

            events.push(event);
        }

        drop(projections);
        drop(events);

        // Count only the events in this batch, not the store's total size.
        let mut total = self.total_ingested.write();
        *total += batch_len as u64;

        Ok(())
    }

    /// Ingest a replicated event from the leader (follower mode).
    ///
    /// Unlike `ingest()`, this method:
    /// - Skips WAL writing (the follower's WalReceiver manages its own local WAL)
    /// - Skips schema validation (the leader already validated)
    /// - Still indexes, processes projections/pipelines, and broadcasts to WebSocket clients
    #[cfg_attr(feature = "hotpath", hotpath::measure)]
    pub fn ingest_replicated(&self, event: &Event) -> Result<()> {
        #[cfg(feature = "server")]
        let timer = self.metrics.ingestion_duration_seconds.start_timer();

        let mut events = self.events.write();
        let offset = events.len();

        // Index the event
        self.index.index_event(
            event.id,
            event.entity_id_str(),
            event.event_type_str(),
            event.timestamp,
            offset,
        )?;

        // Process through projections
        let projections = self.projections.read();
        projections.process_event(event)?;
        drop(projections);

        // Process through pipelines
        let pipeline_results = self.pipeline_manager.process_event(event);
        if !pipeline_results.is_empty() {
            tracing::debug!(
                "Replicated event {} processed by {} pipeline(s)",
                event.id,
                pipeline_results.len()
            );
        }

        // Track per-entity version
        *self
            .entity_versions
            .entry(event.entity_id_str().to_string())
            .or_insert(0) += 1;

        // Store the event in memory
        events.push(event.clone());
        let total_events = events.len();
        drop(events);

        // Broadcast to in-process subscribers + optional WS.
        let event_arc = Arc::new(event.clone());
        let _ = self.event_broadcast_tx.send(Arc::clone(&event_arc));
        #[cfg(feature = "server")]
        self.websocket_manager.broadcast_event(event_arc);

        // Update metrics
        #[cfg(feature = "server")]
        {
            self.metrics.events_ingested_total.inc();
            self.metrics
                .events_ingested_by_type
                .with_label_values(&[event.event_type_str()])
                .inc();
            self.metrics.storage_events_total.set(total_events as i64);
        }

        let mut total = self.total_ingested.write();
        *total += 1;

        #[cfg(feature = "server")]
        timer.observe_duration();

        tracing::debug!(
            "Replicated event ingested: {} (offset: {})",
            event.id,
            offset
        );

        Ok(())
    }

    /// Get the current version for an entity (number of events appended for it).
    /// Returns 0 if the entity has no events.
    #[cfg_attr(feature = "hotpath", hotpath::measure)]
    pub fn get_entity_version(&self, entity_id: &str) -> u64 {
        self.entity_versions.get(entity_id).map_or(0, |v| *v)
    }

    /// Get the consumer registry for durable subscriptions.
    pub fn consumer_registry(&self) -> &ConsumerRegistry {
        &self.consumer_registry
    }

    /// Subscribe to every successfully-ingested event in this store.
    ///
    /// Returns a `tokio::sync::broadcast::Receiver` that yields an `Arc<Event>`
    /// for each ingest. Always available — does not require the `server`
    /// feature. Lagging receivers surface `RecvError::Lagged`.
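    ///
    /// # Example
    ///
    /// Sketch of an embedded tailer on a tokio runtime:
    ///
    /// ```ignore
    /// use tokio::sync::broadcast::error::RecvError;
    ///
    /// let mut rx = store.subscribe_events();
    /// tokio::spawn(async move {
    ///     loop {
    ///         match rx.recv().await {
    ///             Ok(event) => tracing::debug!("saw event {}", event.id),
    ///             // A slow consumer skips `n` events instead of blocking ingest.
    ///             Err(RecvError::Lagged(n)) => tracing::warn!("lagged by {n} events"),
    ///             Err(RecvError::Closed) => break,
    ///         }
    ///     }
    /// });
    /// ```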
    pub fn subscribe_events(&self) -> tokio::sync::broadcast::Receiver<Arc<Event>> {
        self.event_broadcast_tx.subscribe()
    }

    /// Replace the default in-memory consumer registry with a durable one.
    ///
    /// Called during startup when system repositories are available, so that
    /// consumer cursors survive Core restarts via WAL persistence.
    pub fn set_consumer_registry(&mut self, registry: Arc<ConsumerRegistry>) {
        self.consumer_registry = registry;
    }

    /// Get the total number of events in the store (used as max offset for consumer ack).
    pub fn total_events(&self) -> usize {
        self.events.read().len()
    }

    /// Get events after a given offset, optionally filtered by event type prefixes.
    /// Used by consumer polling to fetch unprocessed events.
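    ///
    /// # Example
    ///
    /// Sketch of a poll-then-ack consumer loop. The handler and cursor
    /// persistence are illustrative; the returned offset is the value to
    /// ack once the event is handled:
    ///
    /// ```ignore
    /// let filters = vec!["order.".to_string()]; // event-type prefix filter
    /// let mut cursor: u64 = 0;
    /// for (ack_offset, event) in store.events_after_offset(cursor, &filters, 100) {
    ///     handle(&event)?;     // `handle` is illustrative
    ///     cursor = ack_offset; // persist via the consumer registry once handled
    /// }
    /// ```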
    pub fn events_after_offset(
        &self,
        offset: u64,
        filters: &[String],
        limit: usize,
    ) -> Vec<(u64, Event)> {
        let events = self.events.read();
        let start = offset as usize;
        if start >= events.len() {
            return vec![];
        }

        events[start..]
            .iter()
            .enumerate()
            .filter(|(_, event)| ConsumerRegistry::matches_filters(event.event_type_str(), filters))
            .take(limit)
            .map(|(i, event)| ((start + i + 1) as u64, event.clone()))
            .collect()
    }

    /// Get the WebSocket manager for this store
    #[cfg(feature = "server")]
    pub fn websocket_manager(&self) -> Arc<WebSocketManager> {
        Arc::clone(&self.websocket_manager)
    }

    /// Get the snapshot manager for this store
    pub fn snapshot_manager(&self) -> Arc<SnapshotManager> {
        Arc::clone(&self.snapshot_manager)
    }

    /// Get the compaction manager for this store
    pub fn compaction_manager(&self) -> Option<Arc<CompactionManager>> {
        self.compaction_manager.as_ref().map(Arc::clone)
    }

    /// Get the schema registry for this store (v0.5 feature)
    pub fn schema_registry(&self) -> Arc<SchemaRegistry> {
        Arc::clone(&self.schema_registry)
    }

    /// Get the replay manager for this store (v0.5 feature)
    pub fn replay_manager(&self) -> Arc<ReplayManager> {
        Arc::clone(&self.replay_manager)
    }

    /// Get the pipeline manager for this store (v0.5 feature)
    pub fn pipeline_manager(&self) -> Arc<PipelineManager> {
        Arc::clone(&self.pipeline_manager)
    }

    /// Get the metrics registry for this store (v0.6 feature)
    #[cfg(feature = "server")]
    pub fn metrics(&self) -> Arc<MetricsRegistry> {
        Arc::clone(&self.metrics)
    }

    /// Get the projection manager for this store (v0.7 feature)
    pub fn projection_manager(&self) -> parking_lot::RwLockReadGuard<'_, ProjectionManager> {
        self.projections.read()
    }

    /// Register a custom projection at runtime.
    ///
    /// The projection will receive all future events via `process()`.
    /// Historical events are **not** replayed — only events ingested after
    /// registration will be processed by this projection.
    ///
    /// See [`register_projection_with_backfill`](Self::register_projection_with_backfill)
    /// to also process historical events.
    pub fn register_projection(
        &self,
        projection: Arc<dyn crate::application::services::projection::Projection>,
    ) {
        let mut pm = self.projections.write();
        pm.register(projection);
    }

    /// Register a custom projection and replay all existing events through it.
    ///
    /// After registration, the projection will also receive all future events.
    /// Historical events are replayed under a read lock — the projection's
    /// internal state (typically DashMap) handles concurrent access.
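    ///
    /// # Example
    ///
    /// Sketch, where `MyProjection` is a hypothetical type implementing the
    /// `Projection` trait from `application::services::projection`:
    ///
    /// ```ignore
    /// let projection: Arc<dyn Projection> = Arc::new(MyProjection::default());
    /// // Register first (future events), then replay history through it.
    /// store.register_projection_with_backfill(&projection)?;
    /// ```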
    pub fn register_projection_with_backfill(
        &self,
        projection: &Arc<dyn crate::application::services::projection::Projection>,
    ) -> Result<()> {
        // First register so future events are processed
        {
            let mut pm = self.projections.write();
            pm.register(Arc::clone(projection));
        }

        // Then replay existing events under read lock
        let events = self.events.read();
        for event in events.iter() {
            projection.process(event)?;
        }

        Ok(())
    }

    /// Get the projection state cache for this store (v0.7 feature)
    /// Used by Elixir Query Service for state synchronization
    pub fn projection_state_cache(&self) -> Arc<DashMap<String, serde_json::Value>> {
        Arc::clone(&self.projection_state_cache)
    }

    /// Get the projection status map (v0.13 feature)
    pub fn projection_status(&self) -> Arc<DashMap<String, String>> {
        Arc::clone(&self.projection_status)
    }

    /// Geospatial index for coordinate-based queries (v2.0 feature)
    pub fn geo_index(&self) -> Arc<GeoIndex> {
        self.geo_index.clone()
    }

    /// Exactly-once processing registry (v2.0 feature)
    pub fn exactly_once(&self) -> Arc<ExactlyOnceRegistry> {
        self.exactly_once.clone()
    }

    /// Schema evolution manager (v2.0 feature)
    pub fn schema_evolution(&self) -> Arc<SchemaEvolutionManager> {
        self.schema_evolution.clone()
    }

    /// Get a point-in-time snapshot of all events (for EventQL/GraphQL queries).
    ///
    /// Returns a full clone of the internal events vec, taken under a
    /// briefly-held read lock. This is O(N) in time and memory; prefer
    /// `events_after_offset` for incremental access on large stores.
    pub fn snapshot_events(&self) -> Vec<Event> {
        self.events.read().clone()
    }

    /// Compact token events for an entity by replacing matching events with a
    /// single merged event. Used by the embedded streaming feature.
    ///
    /// Returns `Ok(true)` if compaction was performed, `Ok(false)` if no
    /// matching events were found.
    ///
    /// **Note:** The merged event is processed through projections *without*
    /// clearing the removed events' projection state first. Projections that
    /// accumulate state (e.g., counters) should be designed to handle this
    /// (the merged event replaces individual tokens, not adds to them).
    ///
    /// **Crash safety:** The WAL append happens *after* the in-memory swap
    /// under the write lock. If the process crashes before the WAL write,
    /// no change is persisted — WAL replay restores the pre-compaction state.
    /// If the process crashes after the WAL write, replay sees the merged
    /// event (and the original tokens, which are idempotent to replay since
    /// the merged event supersedes them).
    ///
    /// The write lock is held for the swap + WAL write + index rebuild.
    /// The index rebuild is O(N) over all events, acceptable for embedded
    /// workloads; avoid calling this method in hot paths on large stores.
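    ///
    /// # Example
    ///
    /// Sketch: collapsing per-token streaming events into one merged event.
    /// The event type and the merged `Event` (same entity, superseding
    /// payload) are illustrative and built by the caller:
    ///
    /// ```ignore
    /// let compacted = store.compact_entity_tokens(
    ///     "chat-7",        // entity whose token events are replaced
    ///     "message.token", // hypothetical token event type
    ///     merged_event,    // single event carrying the full message
    /// )?;
    /// if compacted {
    ///     tracing::info!("token events merged for chat-7");
    /// }
    /// ```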
    pub fn compact_entity_tokens(
        &self,
        entity_id: &str,
        token_event_type: &str,
        merged_event: Event,
    ) -> Result<bool> {
        // Phase 1: Read-only check — do we have anything to compact?
        {
            let events = self.events.read();
            let has_tokens = events
                .iter()
                .any(|e| e.entity_id_str() == entity_id && e.event_type_str() == token_event_type);
            if !has_tokens {
                return Ok(false);
            }
        }

        // Phase 2: Process merged event through projections (no write lock held)
        let projections = self.projections.read();
        projections.process_event(&merged_event)?;
        drop(projections);

        // Phase 3: Acquire write lock for the swap + WAL + index rebuild
        let mut events = self.events.write();

        events.retain(|e| {
            !(e.entity_id_str() == entity_id && e.event_type_str() == token_event_type)
        });

        events.push(merged_event.clone());

        // WAL append inside write lock: crash before this line = no change persisted.
        // Crash after = merged event in WAL, original tokens also in WAL but
        // superseded by the merged event's entity_id + event_type.
        if let Some(ref wal) = self.wal {
            wal.append(merged_event)?;
        }

        // Rebuild entire index since retain() shifted event positions.
        // Errors here indicate a corrupt event (missing entity_id/event_type)
        // which should not happen for well-formed events. Log and continue
        // rather than failing the entire compaction.
        self.index.clear();
        for (offset, event) in events.iter().enumerate() {
            if let Err(e) = self.index.index_event(
                event.id,
                event.entity_id_str(),
                event.event_type_str(),
                event.timestamp,
                offset,
            ) {
                tracing::warn!(
                    event_id = %event.id,
                    offset,
                    "Failed to re-index event during compaction: {e}"
                );
            }
        }

        Ok(true)
    }

    /// Get the webhook registry for this store (v0.11 feature)
    #[cfg(feature = "server")]
    pub fn webhook_registry(&self) -> Arc<WebhookRegistry> {
        Arc::clone(&self.webhook_registry)
    }

    /// Set the channel for async webhook delivery.
    /// Called during server startup to wire the delivery worker.
    #[cfg(feature = "server")]
    pub fn set_webhook_tx(&self, tx: mpsc::UnboundedSender<WebhookDeliveryTask>) {
        *self.webhook_tx.write() = Some(tx);
        tracing::info!("Webhook delivery channel connected");
    }

    /// Dispatch matching webhooks for a given event (non-blocking).
    #[cfg(feature = "server")]
    fn dispatch_webhooks(&self, event: &Event) {
        let matching = self.webhook_registry.find_matching(event);
        if matching.is_empty() {
            return;
        }

        let tx_guard = self.webhook_tx.read();
        if let Some(ref tx) = *tx_guard {
            for webhook in matching {
                let task = WebhookDeliveryTask {
                    webhook,
                    event: event.clone(),
                };
                if let Err(e) = tx.send(task) {
                    tracing::warn!("Failed to queue webhook delivery: {}", e);
                }
            }
        }
    }

    /// Manually flush any pending events to persistent storage
    pub fn flush_storage(&self) -> Result<()> {
        if let Some(ref storage) = self.storage {
            let storage = storage.read();
            storage.flush()?;
            tracing::info!("✅ Flushed events to persistent storage");
        }
        Ok(())
    }

    /// Run a checkpoint: flush pending Parquet batches, then truncate the
    /// WAL through the checkpoint point (Step 6 of the sustainable data
    /// strategy).
    ///
    /// Order matters. We flush Parquet first; only on success do we
    /// truncate the WAL. If the process crashes between the flush and the
    /// truncate, the WAL still contains the events that were just durably
    /// written, and recovery will replay them. The dedupe in
    /// `append_loaded_event` (index probe) makes that idempotent — the
    /// event is already in Parquet, so the lazy-load splice no-ops once
    /// the tenant is hydrated.
    ///
    /// The reverse order would be unsafe: a crash between truncate and
    /// flush would lose committed events.
    ///
    /// This bounds dirty-restart replay time to one checkpoint interval
    /// regardless of total dataset size — that's the load-bearing
    /// property for cold-start time as ingest rate grows.
    ///
    /// No-op when no WAL is configured (in-memory-only mode).
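    ///
    /// # Example
    ///
    /// Sketch of the background loop this method is built for, driven by
    /// `checkpoint_interval()` (an `Arc<EventStore>` handle named `store`
    /// is assumed):
    ///
    /// ```ignore
    /// if let Some(interval) = store.checkpoint_interval() {
    ///     let store = Arc::clone(&store);
    ///     tokio::spawn(async move {
    ///         loop {
    ///             tokio::time::sleep(interval).await;
    ///             if let Err(e) = store.checkpoint() {
    ///                 tracing::error!("checkpoint failed: {e}");
    ///             }
    ///         }
    ///     });
    /// }
    /// ```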
    pub fn checkpoint(&self) -> Result<()> {
        let Some(ref wal) = self.wal else {
            return Ok(());
        };

        // Flush before recording the truncation target — both
        // because flush() may rotate the WAL underneath us and
        // because we want the truncate point to reflect what's
        // actually durable on disk.
        self.flush_storage()?;
        wal.truncate()?;
        tracing::debug!("✅ Checkpoint complete: Parquet flushed, WAL truncated");
        Ok(())
    }

    /// Get the configured checkpoint cadence (used by background tasks).
    pub fn checkpoint_interval(&self) -> Option<std::time::Duration> {
        self.checkpoint_interval_secs
            .map(std::time::Duration::from_secs)
    }

    /// Hydrate `tenant_id`'s persisted Parquet data into the in-memory
    /// pile if it isn't already loaded. Cheap on the warm path
    /// (DashMap probe); on the cold path it walks just that tenant's
    /// subtree (`load_events_for_tenant`) and splices the events into
    /// `events`/`index`/`projections`/`entity_versions`.
    ///
    /// Concurrent first-callers for the same tenant serialize on a
    /// per-tenant Mutex (singleflight) so the disk read happens once.
    /// Other tenants are unaffected — distinct lock per tenant.
    ///
    /// Returns `Err` if the tenant_id fails the path-safety
    /// whitelist, the Parquet read fails, or another in-flight load
    /// holds the lock past the configured timeout. The caller (a
    /// query handler) is expected to surface that as a 5xx — see
    /// Step 2's "no infinite hangs" acceptance criterion.
    ///
    /// On failure, `loaded` is NOT marked, so a transient error is
    /// retried on the next request rather than poisoning the tenant
    /// permanently. A future commit may add a circuit breaker if
    /// thrash becomes an issue.
    ///
    /// No-op (and Ok) when no Parquet storage is configured — the
    /// in-memory-only mode used by tests has nothing to hydrate.
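    ///
    /// # Example
    ///
    /// Sketch of the intended call site, a query handler hydrating on
    /// first access (the request fields are illustrative):
    ///
    /// ```ignore
    /// store.ensure_tenant_loaded(&request.tenant_id)?; // no-op when warm
    /// let version = store.get_entity_version(&request.entity_id);
    /// ```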
    pub fn ensure_tenant_loaded(&self, tenant_id: &str) -> Result<()> {
        // Fast path: warm tenant. Avoids the Mutex altogether.
        if self.tenant_loader.is_loaded(tenant_id) {
            return Ok(());
        }

        let Some(storage) = self.storage.as_ref().map(Arc::clone) else {
            // No persistent storage to load from. Mark loaded so we
            // don't keep re-entering the slow path.
            self.tenant_loader.mark_loaded(tenant_id);
            return Ok(());
        };

        // Singleflight: get-or-insert the per-tenant lock and try to
        // acquire it within the timeout budget.
        let lock = self.tenant_loader.lock_for(tenant_id);
        let timeout = self.tenant_loader.load_timeout();
        let _guard = lock.try_lock_for(timeout).ok_or_else(|| {
            AllSourceError::StorageError(format!(
                "ensure_tenant_loaded timed out after {timeout:?} waiting for in-flight load of \
                 tenant {tenant_id:?}"
            ))
        })?;

        // Re-check inside the lock — another thread may have completed
        // the load while we were waiting.
        if self.tenant_loader.is_loaded(tenant_id) {
            return Ok(());
        }

        let started = std::time::Instant::now();
        let events = storage.read().load_events_for_tenant(tenant_id)?;
        let read_count = events.len();

        let before = self.events.read().len();
        for event in events {
            self.append_loaded_event(event);
        }
        let applied = self.events.read().len() - before;

        // total_ingested only counts events newly added to memory.
        // Dedupe (e.g. WAL events re-checkpointed to Parquet) makes
        // applied < read_count possible.
        *self.total_ingested.write() += applied as u64;
        self.tenant_loader.mark_loaded(tenant_id);

        tracing::info!(
            tenant_id = tenant_id,
            read = read_count,
            applied = applied,
            elapsed_ms = started.elapsed().as_millis() as u64,
            "ensure_tenant_loaded: tenant hydrated"
        );

        // Budget check (Step 3 #3). After splicing the new
        // tenant in, evict LRU tenants until we're back under
        // budget. Excludes the just-loaded tenant from the
        // candidate set — otherwise a single oversized tenant
        // would evict itself in a tight loop. If no other tenant
        // is loaded, accept the over-budget state and log a
        // warning so ops can see it.
        self.enforce_cache_budget(tenant_id);

        // Refresh the resident-bytes gauge — covers both the
        // load-with-no-eviction case and the post-eviction state.
        #[cfg(feature = "server")]
        self.metrics
            .cache_bytes
            .set(self.tenant_loader.total_bytes() as i64);

        Ok(())
    }

    /// Walks the LRU until total resident bytes are within the
    /// configured budget, calling `evict_tenant` on each victim.
    /// Excludes `recently_touched` from the candidate set so we
    /// don't evict the tenant that just triggered the call. Step 3
    /// #3 entry point.
    ///
    /// No-op when no budget is set or when already under budget —
    /// most queries take the early-return fast path. Eviction is
    /// the cold path; with a well-sized budget this only fires
    /// during warm-up of new tenants past the working-set size.
    fn enforce_cache_budget(&self, recently_touched: &str) {
        if !self.tenant_loader.over_budget() {
            return;
        }
        loop {
            let Some(victim) = self.tenant_loader.pick_lru_excluding(recently_touched) else {
                tracing::warn!(
                    cache_bytes = self.tenant_loader.total_bytes(),
                    budget = self.tenant_loader.byte_budget(),
                    recently_touched = recently_touched,
                    "cache over budget but no other tenant available to evict — \
                     a single tenant exceeds the budget; consider raising it"
                );
                return;
            };
            self.evict_tenant(&victim);
            if !self.tenant_loader.over_budget() {
                return;
            }
        }
    }

    /// True iff `ensure_tenant_loaded` has previously succeeded for
    /// this tenant. Diagnostic / testing API.
    pub fn is_tenant_loaded(&self, tenant_id: &str) -> bool {
        self.tenant_loader.is_loaded(tenant_id)
    }

    /// Drop `tenant_id` from the in-memory cache. Step 3 #2 of the
    /// sustainable data strategy.
    ///
    /// Removes every event for this tenant from the events Vec,
    /// rebuilds the index/entity_versions for the retained events
    /// (Vec offsets shift on remove, so the index has to be
    /// rebuilt), and resets the tenant_loader bookkeeping so a
    /// subsequent query triggers a fresh `ensure_tenant_loaded`.
    ///
    /// **Parquet is canonical, in-memory is just cache.** This
    /// only affects the in-memory side. Disk data is untouched —
    /// that's why eviction is safe even for tenants with
    /// recently-ingested data: a query after eviction transparently
    /// re-reads from Parquet.
    ///
    /// Projection state is NOT rolled back. Projections accumulate
    /// across boots and tenants (their durability story is
    /// separate); subtracting them would need replay support that
    /// doesn't exist in this commit. After eviction + re-load,
    /// projections may double-count the re-loaded events. Step 3
    /// #4's stress test only asserts the cache budget is held; a
    /// future commit will tackle projection-aware eviction.
    ///
    /// Locking: takes the events write lock for the full duration
    /// of the filter + re-index. Concurrent ingest blocks until
    /// done. Eviction is the cold path; the working set should
    /// stay in budget so this rarely fires.
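    ///
    /// # Example
    ///
    /// Sketch: manual eviction followed by transparent re-hydration.
    ///
    /// ```ignore
    /// store.evict_tenant("tenant-a");
    /// assert!(!store.is_tenant_loaded("tenant-a"));
    /// // The next query re-reads from Parquet; disk stays canonical.
    /// store.ensure_tenant_loaded("tenant-a")?;
    /// ```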
1336    pub fn evict_tenant(&self, tenant_id: &str) {
1337        let mut events = self.events.write();
1338        let before = events.len();
1339        let evicted_bytes = self.tenant_loader.bytes_for(tenant_id);
1340
1341        events.retain(|e| e.tenant_id_str() != tenant_id);
1342        let after = events.len();
1343        let dropped = before - after;
1344
1345        if dropped == 0 {
1346            // Tenant had no events in memory. Still clear loader
1347            // state (e.g. a "loaded with zero events" marker) so
1348            // is_tenant_loaded reports the right thing.
1349            drop(events);
1350            self.tenant_loader.mark_unloaded(tenant_id);
1351            return;
1352        }
1353
1354        // Rebuild the index — Vec offsets shifted under retain().
1355        // Rebuild entity_versions from scratch too, since the
1356        // counter reflects "how many events of this entity remain".
1357        self.index.clear();
1358        self.entity_versions.clear();
1359        for (offset, event) in events.iter().enumerate() {
1360            if let Err(e) = self.index.index_event(
1361                event.id,
1362                event.entity_id_str(),
1363                event.event_type_str(),
1364                event.timestamp,
1365                offset,
1366            ) {
1367                tracing::error!(
1368                    "Failed to re-index event during eviction of {}: {}",
1369                    tenant_id,
1370                    e
1371                );
1372            }
1373            *self
1374                .entity_versions
1375                .entry(event.entity_id_str().to_string())
1376                .or_insert(0) += 1;
1377        }
1378        drop(events);
1379
1380        self.tenant_loader.mark_unloaded(tenant_id);
1381
1382        // total_ingested under Steps 2-3 means "events currently
1383        // resident in memory". Subtract what we just dropped.
1384        let mut t = self.total_ingested.write();
1385        *t = t.saturating_sub(dropped as u64);
1386        drop(t);
1387
1388        // Step 3 #4: cache observability. Increment the eviction
1389        // counter, refresh the resident-bytes gauge.
1390        #[cfg(feature = "server")]
1391        {
1392            self.metrics.cache_evictions_total.inc();
1393            self.metrics
1394                .cache_bytes
1395                .set(self.tenant_loader.total_bytes() as i64);
1396        }
1397
1398        tracing::info!(
1399            tenant_id = tenant_id,
1400            events_dropped = dropped,
1401            bytes_freed = evicted_bytes,
1402            "evicted tenant from memory cache"
1403        );
1404    }
1405
1406    /// Approximate resident bytes a single tenant occupies in the
1407    /// in-memory cache. Step 3 budget-tracking input. 0 for cold
1408    /// or evicted tenants.
1409    pub fn tenant_resident_bytes(&self, tenant_id: &str) -> u64 {
1410        self.tenant_loader.bytes_for(tenant_id)
1411    }
1412
1413    /// Sum of resident-byte estimates across every loaded tenant.
1414    /// What the budget check compares against.
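    ///
    /// A minimal sketch (`ignore`d) combining both byte-accounting
    /// getters; the tenant name is illustrative:
    ///
    /// ```ignore
    /// let total = store.cache_resident_bytes();
    /// let alice = store.tenant_resident_bytes("alice");
    /// assert!(alice <= total);
    /// ```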
1415    pub fn cache_resident_bytes(&self) -> u64 {
1416        self.tenant_loader.total_bytes()
1417    }
1418
1419    /// Splice a single loaded event into the in-memory structures
1420    /// (events vec, index, projections, entity_versions) atomically
1421    /// w.r.t. concurrent ingest. Used by `ensure_tenant_loaded`.
1422    ///
1423    /// The WAL recovery path on boot has its own (single-threaded)
1424    /// variant inline because boot can't race with ingest. This
1425    /// helper is the variant safe to call while traffic is flowing
1426    /// — it holds the events write lock across the index/offset
1427    /// assignment so (offset, push) stays atomic.
1428    ///
1429    /// Dedupes against events already in memory by event ID. Two
1430    /// paths can surface the same event:
1431    /// 1. WAL recovery on boot pushed it into memory.
1432    /// 2. The event was then checkpointed to Parquet and the
1433    ///    WAL truncated. A later ensure_tenant_loaded re-reads
1434    ///    the Parquet file, including this event.
1435    ///
1436    /// Without the dedupe, step 2 would double-count the event.
1437    /// The check is O(1) — DashMap probe by UUID — and the
1438    /// alternative (loading every tenant before truncating WAL)
1439    /// would defeat the lazy-load.
1440    fn append_loaded_event(&self, event: Event) {
1441        if self.index.get_by_id(&event.id).is_some() {
1442            return;
1443        }
1444
1445        let event_bytes = event.estimated_size_bytes();
1446        let tenant = event.tenant_id_str().to_string();
1447
1448        let mut events = self.events.write();
1449        let offset = events.len();
1450
1451        if let Err(e) = self.index.index_event(
1452            event.id,
1453            event.entity_id_str(),
1454            event.event_type_str(),
1455            event.timestamp,
1456            offset,
1457        ) {
1458            tracing::error!("Failed to index loaded event {}: {}", event.id, e);
1459        }
1460
1461        if let Err(e) = self.projections.read().process_event(&event) {
1462            tracing::error!("Failed to project loaded event {}: {}", event.id, e);
1463        }
1464
1465        *self
1466            .entity_versions
1467            .entry(event.entity_id_str().to_string())
1468            .or_insert(0) += 1;
1469
1470        events.push(event);
1471        // Account for the bytes AFTER the push so a panic in the
1472        // index/projection path doesn't leave the counter inflated.
1473        // Nothing fallible runs after the DashMap update.
1474        self.tenant_loader.add_bytes(&tenant, event_bytes);
1475    }
1476
1477    /// Manually create a snapshot for an entity
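    ///
    /// A minimal sketch (`ignore`d; the entity id is hypothetical):
    ///
    /// ```ignore
    /// store.create_snapshot("user-42")?;
    /// // Later reconstruct_state calls can start from this snapshot
    /// // instead of replaying the entity's full event history.
    /// ```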
1478    pub fn create_snapshot(&self, entity_id: &str) -> Result<()> {
1479        // Get all events for this entity
1480        let events = self.query(&QueryEventsRequest {
1481            entity_id: Some(entity_id.to_string()),
1482            event_type: None,
1483            tenant_id: None,
1484            as_of: None,
1485            since: None,
1486            until: None,
1487            limit: None,
1488            event_type_prefix: None,
1489            payload_filter: None,
1490        })?;
1491
1492        if events.is_empty() {
1493            return Err(AllSourceError::EntityNotFound(entity_id.to_string()));
1494        }
1495
1496        // Build current state
1497        let mut state = serde_json::json!({});
1498        for event in &events {
1499            if let serde_json::Value::Object(ref mut state_map) = state
1500                && let serde_json::Value::Object(ref payload_map) = event.payload
1501            {
1502                for (key, value) in payload_map {
1503                    state_map.insert(key.clone(), value.clone());
1504                }
1505            }
1506        }
1507
1508        let last_event = events.last().unwrap();
1509        self.snapshot_manager.create_snapshot(
1510            entity_id,
1511            state,
1512            last_event.timestamp,
1513            events.len(),
1514            SnapshotType::Manual,
1515        )?;
1516
1517        Ok(())
1518    }
1519
1520    /// Check and create automatic snapshots if needed
1521    fn check_auto_snapshot(&self, entity_id: &str, event: &Event) {
1522        // Count events for this entity
1523        let entity_event_count = self
1524            .index
1525            .get_by_entity(entity_id)
1526            .map_or(0, |entries| entries.len());
1527
1528        if self.snapshot_manager.should_create_snapshot(
1529            entity_id,
1530            entity_event_count,
1531            event.timestamp,
1532        ) {
1533            // Create the snapshot inline; a failure is logged, not fatal, so ingestion continues.
1534            if let Err(e) = self.create_snapshot(entity_id) {
1535                tracing::warn!(
1536                    "Failed to create automatic snapshot for {}: {}",
1537                    entity_id,
1538                    e
1539                );
1540            }
1541        }
1542    }
1543
1544    /// Validate an event before ingestion
1545    fn validate_event(&self, event: &Event) -> Result<()> {
1546        // EntityId and EventType value objects already validate non-empty in their
1547        // constructors, so these checks are redundant; kept as explicit defense-in-depth.
1548        if event.entity_id_str().is_empty() {
1549            return Err(AllSourceError::ValidationError(
1550                "entity_id cannot be empty".to_string(),
1551            ));
1552        }
1553
1554        if event.event_type_str().is_empty() {
1555            return Err(AllSourceError::ValidationError(
1556                "event_type cannot be empty".to_string(),
1557            ));
1558        }
1559
1560        // Reject system namespace events from user-facing ingestion.
1561        // System events are written exclusively via SystemMetadataStore.
1562        if event.event_type().is_system() {
1563            return Err(AllSourceError::ValidationError(
1564                "Event types starting with '_system.' are reserved for internal use".to_string(),
1565            ));
1566        }
1567
1568        Ok(())
1569    }
1570
1571    /// Reset a projection by clearing its state and reprocessing all events
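    ///
    /// A minimal sketch (`ignore`d), using the built-in
    /// `entity_snapshots` projection:
    ///
    /// ```ignore
    /// let reprocessed = store.reset_projection("entity_snapshots")?;
    /// tracing::info!("rebuilt entity_snapshots from {reprocessed} events");
    /// ```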
1572    pub fn reset_projection(&self, name: &str) -> Result<usize> {
1573        let projection_manager = self.projections.read();
1574        let projection = projection_manager.get_projection(name).ok_or_else(|| {
1575            AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1576        })?;
1577
1578        // Clear existing state
1579        projection.clear();
1580
1581        // Clear cached state for this projection
1582        let prefix = format!("{name}:");
1583        let keys_to_remove: Vec<String> = self
1584            .projection_state_cache
1585            .iter()
1586            .filter(|entry| entry.key().starts_with(&prefix))
1587            .map(|entry| entry.key().clone())
1588            .collect();
1589        for key in keys_to_remove {
1590            self.projection_state_cache.remove(&key);
1591        }
1592
1593        // Reprocess all events through this projection
1594        let events = self.events.read();
1595        let mut reprocessed = 0usize;
1596        for event in events.iter() {
1597            if projection.process(event).is_ok() {
1598                reprocessed += 1;
1599            }
1600        }
1601
1602        Ok(reprocessed)
1603    }
1604
1605    /// Get a single event by its UUID
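    ///
    /// A minimal sketch (`ignore`d; `id` is a hypothetical event UUID):
    ///
    /// ```ignore
    /// if let Some(event) = store.get_event_by_id(&id)? {
    ///     println!("{} at {}", event.event_type_str(), event.timestamp);
    /// }
    /// ```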
1606    pub fn get_event_by_id(&self, event_id: &uuid::Uuid) -> Result<Option<Event>> {
1607        if let Some(offset) = self.index.get_by_id(event_id) {
1608            let events = self.events.read();
1609            Ok(events.get(offset).cloned())
1610        } else {
1611            Ok(None)
1612        }
1613    }
1614
1615    /// Query events based on filters (optimized with indices)
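    ///
    /// # Example
    ///
    /// An illustrative sketch (`ignore`d; the event type and tenant are
    /// made-up values, but the request shape mirrors this file's tests):
    ///
    /// ```ignore
    /// let recent = store.query(&QueryEventsRequest {
    ///     entity_id: None,
    ///     event_type: Some("order.created".to_string()),
    ///     tenant_id: Some("alice".to_string()),
    ///     as_of: None,
    ///     since: None,
    ///     until: None,
    ///     limit: Some(100),
    ///     event_type_prefix: None,
    ///     payload_filter: None,
    /// })?;
    /// ```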
1616    #[cfg_attr(feature = "hotpath", hotpath::measure)]
1617    pub fn query(&self, request: &QueryEventsRequest) -> Result<Vec<Event>> {
1618        // Lazy-load gate (Step 2): if the request scopes to a tenant,
1619        // make sure that tenant's persisted data is in memory before
1620        // running the in-memory index lookup. First call for a cold
1621        // tenant blocks here for the disk read (single-digit seconds
1622        // on ~100k events); warm tenants take the DashMap fast path
1623        // and add no measurable latency.
1624        //
1625        // Errors propagate as `Err`; the HTTP layer turns that into
1626        // a 5xx, which is the explicit "no infinite hangs" contract
1627        // from the Step 2 acceptance criteria.
1628        //
1629        // Unfiltered (cross-tenant) queries — `tenant_id = None` —
1630        // run against whatever is currently in memory. They cannot
1631        // pre-load every tenant without defeating the whole point
1632        // of the lazy-load model. In practice the gateway always
1633        // injects an auth-derived `tenant_id`; an unfiltered query
1634        // is admin-only and gets degraded results until a future
1635        // commit adds an explicit "load all tenants" admin path.
1636        if let Some(ref tenant_id) = request.tenant_id {
1637            self.ensure_tenant_loaded(tenant_id)?;
1638            // LRU touch — the most-recently-queried tenant moves
1639            // to the back of the eviction queue. Cheap (single
1640            // DashMap insert), called on every per-tenant query.
1641            self.tenant_loader.touch(tenant_id);
1642        }
1643
1644        // Determine query type for metrics (v0.6 feature)
1645        let query_type = if request.entity_id.is_some() {
1646            "entity"
1647        } else if request.event_type.is_some() {
1648            "type"
1649        } else if request.event_type_prefix.is_some() {
1650            "type_prefix"
1651        } else {
1652            "full_scan"
1653        };
1654
1655        // Start metrics timer (v0.6 feature)
1656        #[cfg(feature = "server")]
1657        let timer = self
1658            .metrics
1659            .query_duration_seconds
1660            .with_label_values(&[query_type])
1661            .start_timer();
1662
1663        // Increment query counter (v0.6 feature)
1664        #[cfg(feature = "server")]
1665        self.metrics
1666            .queries_total
1667            .with_label_values(&[query_type])
1668            .inc();
1669
1670        let events = self.events.read();
1671
1672        // Use index for fast lookups
1673        let offsets: Vec<usize> = if let Some(entity_id) = &request.entity_id {
1674            // Use entity index
1675            self.index
1676                .get_by_entity(entity_id)
1677                .map(|entries| self.filter_entries(entries, request))
1678                .unwrap_or_default()
1679        } else if let Some(event_type) = &request.event_type {
1680            // Use type index (exact match)
1681            self.index
1682                .get_by_type(event_type)
1683                .map(|entries| self.filter_entries(entries, request))
1684                .unwrap_or_default()
1685        } else if let Some(prefix) = &request.event_type_prefix {
1686            // Use type index (prefix match)
1687            let entries = self.index.get_by_type_prefix(prefix);
1688            self.filter_entries(entries, request)
1689        } else {
1690            // Full scan (less efficient but necessary for complex queries)
1691            (0..events.len()).collect()
1692        };
1693
1694        // Fetch events and apply remaining filters
1695        let mut results: Vec<Event> = offsets
1696            .iter()
1697            .filter_map(|&offset| events.get(offset).cloned())
1698            .filter(|event| self.apply_filters(event, request))
1699            .collect();
1700
1701        // Sort by timestamp ascending, with version as a deterministic
1702        // tie-breaker so events that share a timestamp keep a stable,
1703        // well-defined order — "the latest event" must be unambiguous
1704        // (issue #177).
1705        results.sort_by(|a, b| {
1706            a.timestamp
1707                .cmp(&b.timestamp)
1708                .then_with(|| a.version.cmp(&b.version))
1709        });
1710
1711        // Apply limit
1712        if let Some(limit) = request.limit {
1713            results.truncate(limit);
1714        }
1715
1716        // Record query results count (v0.6 feature)
1717        #[cfg(feature = "server")]
1718        {
1719            self.metrics
1720                .query_results_total
1721                .with_label_values(&[query_type])
1722                .inc_by(results.len() as u64);
1723            timer.observe_duration();
1724        }
1725
1726        Ok(results)
1727    }
1728
1729    /// Filter index entries based on query parameters
1730    #[cfg_attr(feature = "hotpath", hotpath::measure)]
1731    fn filter_entries(&self, entries: Vec<IndexEntry>, request: &QueryEventsRequest) -> Vec<usize> {
1732        entries
1733            .into_iter()
1734            .filter(|entry| {
1735                // Time filters
1736                if let Some(as_of) = request.as_of
1737                    && entry.timestamp > as_of
1738                {
1739                    return false;
1740                }
1741                if let Some(since) = request.since
1742                    && entry.timestamp < since
1743                {
1744                    return false;
1745                }
1746                if let Some(until) = request.until
1747                    && entry.timestamp > until
1748                {
1749                    return false;
1750                }
1751                true
1752            })
1753            .map(|entry| entry.offset)
1754            .collect()
1755    }
1756
1757    /// Apply filters to an event
1758    #[cfg_attr(feature = "hotpath", hotpath::measure)]
1759    fn apply_filters(&self, event: &Event, request: &QueryEventsRequest) -> bool {
1760        // Tenant isolation: if a tenant_id is specified, only return events from that tenant
1761        if let Some(ref tid) = request.tenant_id
1762            && event.tenant_id_str() != tid
1763        {
1764            return false;
1765        }
1766
1767        // Additional type filter if entity was primary
1768        if request.entity_id.is_some()
1769            && let Some(ref event_type) = request.event_type
1770            && event.event_type_str() != event_type
1771        {
1772            return false;
1773        }
1774
1775        // Additional prefix filter if entity was primary
1776        if request.entity_id.is_some()
1777            && let Some(ref prefix) = request.event_type_prefix
1778            && !event.event_type_str().starts_with(prefix)
1779        {
1780            return false;
1781        }
1782
1783        // Payload field filtering. An unparseable filter string is
1783        // silently ignored and the event passes through.
1784        if let Some(ref filter_str) = request.payload_filter
1785            && let Ok(filter_obj) =
1786                serde_json::from_str::<serde_json::Map<String, serde_json::Value>>(filter_str)
1787        {
1788            let payload = event.payload();
1789            for (key, expected_value) in &filter_obj {
1790                match payload.get(key) {
1791                    Some(actual_value) if actual_value == expected_value => {}
1792                    _ => return false,
1793                }
1794            }
1795        }
1796
1797        true
1798    }
1799
1800    /// Reconstruct entity state as of a specific timestamp
1801    /// v0.2: Now uses snapshots for fast reconstruction
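    ///
    /// An illustrative sketch (`ignore`d; the entity id is hypothetical):
    ///
    /// ```ignore
    /// // Current state: latest snapshot plus any newer events.
    /// let now = store.reconstruct_state("user-42", None)?;
    /// // Point-in-time state, one day back.
    /// let yesterday = chrono::Utc::now() - chrono::Duration::days(1);
    /// let then = store.reconstruct_state("user-42", Some(yesterday))?;
    /// assert_eq!(now["entity_id"], "user-42");
    /// ```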
1802    #[cfg_attr(feature = "hotpath", hotpath::measure)]
1803    pub fn reconstruct_state(
1804        &self,
1805        entity_id: &str,
1806        as_of: Option<DateTime<Utc>>,
1807    ) -> Result<serde_json::Value> {
1808        // Try to find a snapshot to use as a base (v0.2 optimization)
1809        let (merged_state, since_timestamp) = if let Some(as_of_time) = as_of {
1810            // Get snapshot closest to requested time
1811            if let Some(snapshot) = self
1812                .snapshot_manager
1813                .get_snapshot_as_of(entity_id, as_of_time)
1814            {
1815                tracing::debug!(
1816                    "Using snapshot from {} for entity {} (saved {} events)",
1817                    snapshot.as_of,
1818                    entity_id,
1819                    snapshot.event_count
1820                );
1821                (snapshot.state.clone(), Some(snapshot.as_of))
1822            } else {
1823                (serde_json::json!({}), None)
1824            }
1825        } else {
1826            // Get latest snapshot for current state
1827            if let Some(snapshot) = self.snapshot_manager.get_latest_snapshot(entity_id) {
1828                tracing::debug!(
1829                    "Using latest snapshot from {} for entity {}",
1830                    snapshot.as_of,
1831                    entity_id
1832                );
1833                (snapshot.state.clone(), Some(snapshot.as_of))
1834            } else {
1835                (serde_json::json!({}), None)
1836            }
1837        };
1838
1839        // Query events after the snapshot (or all if no snapshot)
1840        let events = self.query(&QueryEventsRequest {
1841            entity_id: Some(entity_id.to_string()),
1842            event_type: None,
1843            tenant_id: None,
1844            as_of,
1845            since: since_timestamp,
1846            until: None,
1847            limit: None,
1848            event_type_prefix: None,
1849            payload_filter: None,
1850        })?;
1851
1852        // If no events and no snapshot, entity not found
1853        if events.is_empty() && since_timestamp.is_none() {
1854            return Err(AllSourceError::EntityNotFound(entity_id.to_string()));
1855        }
1856
1857        // Merge events on top of snapshot (or from scratch if no snapshot)
1858        let mut merged_state = merged_state;
1859        for event in &events {
1860            if let serde_json::Value::Object(ref mut state_map) = merged_state
1861                && let serde_json::Value::Object(ref payload_map) = event.payload
1862            {
1863                for (key, value) in payload_map {
1864                    state_map.insert(key.clone(), value.clone());
1865                }
1866            }
1867        }
1868
1869        // Wrap with metadata
1870        let state = serde_json::json!({
1871            "entity_id": entity_id,
1872            "last_updated": events.last().map(|e| e.timestamp),
1873            "event_count": events.len(),
1874            "as_of": as_of,
1875            "current_state": merged_state,
1876            "history": events.iter().map(|e| {
1877                serde_json::json!({
1878                    "event_id": e.id,
1879                    "type": e.event_type,
1880                    "timestamp": e.timestamp,
1881                    "payload": e.payload
1882                })
1883            }).collect::<Vec<_>>()
1884        });
1885
1886        Ok(state)
1887    }
1888
1889    /// Get snapshot from projection (faster than reconstructing)
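    ///
    /// A minimal sketch (`ignore`d; the entity id is hypothetical):
    ///
    /// ```ignore
    /// let snap = store.get_snapshot("user-42")?;
    /// assert_eq!(snap["from_projection"], "entity_snapshots");
    /// ```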
1890    pub fn get_snapshot(&self, entity_id: &str) -> Result<serde_json::Value> {
1891        let projections = self.projections.read();
1892
1893        if let Some(snapshot_projection) = projections.get_projection("entity_snapshots")
1894            && let Some(state) = snapshot_projection.get_state(entity_id)
1895        {
1896            return Ok(serde_json::json!({
1897                "entity_id": entity_id,
1898                "snapshot": state,
1899                "from_projection": "entity_snapshots"
1900            }));
1901        }
1902
1903        Err(AllSourceError::EntityNotFound(entity_id.to_string()))
1904    }
1905
1906    /// Get statistics about the event store
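    ///
    /// A minimal sketch (`ignore`d):
    ///
    /// ```ignore
    /// let stats = store.stats();
    /// println!(
    ///     "{} events, {} entities, {} event types",
    ///     stats.total_events, stats.total_entities, stats.total_event_types
    /// );
    /// ```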
1907    pub fn stats(&self) -> StoreStats {
1908        let events = self.events.read();
1909        let index_stats = self.index.stats();
1910
1911        StoreStats {
1912            total_events: events.len(),
1913            total_entities: index_stats.total_entities,
1914            total_event_types: index_stats.total_event_types,
1915            total_ingested: *self.total_ingested.read(),
1916        }
1917    }
1918
1919    /// Get all unique streams (entity_ids) in the store
1920    pub fn list_streams(&self) -> Vec<StreamInfo> {
1921        self.index
1922            .get_all_entities()
1923            .into_iter()
1924            .map(|entity_id| {
1925                // One index lookup serves both the event count and
1926                // the last-event timestamp.
1927                let entries = self.index.get_by_entity(&entity_id);
1928                let event_count = entries.as_ref().map_or(0, |entries| entries.len());
1929                let last_event_at = entries
1930                    .as_deref()
1931                    .and_then(|entries| entries.last())
1932                    .map(|e| e.timestamp);
1933                StreamInfo {
1934                    stream_id: entity_id,
1935                    event_count,
1936                    last_event_at,
1937                }
1938            })
1939            .collect()
1940    }
1941
1942    /// Get all unique event types in the store
1943    pub fn list_event_types(&self) -> Vec<EventTypeInfo> {
1944        self.index
1945            .get_all_types()
1946            .into_iter()
1947            .map(|event_type| {
1948                // One index lookup serves both the event count and
1949                // the last-event timestamp.
1950                let entries = self.index.get_by_type(&event_type);
1951                let event_count = entries.as_ref().map_or(0, |entries| entries.len());
1952                let last_event_at = entries
1953                    .as_deref()
1954                    .and_then(|entries| entries.last())
1955                    .map(|e| e.timestamp);
1956                EventTypeInfo {
1957                    event_type,
1958                    event_count,
1959                    last_event_at,
1960                }
1961            })
1962            .collect()
1963    }
1964
1965    /// Attach a broadcast sender to the WAL for replication.
1966    ///
1967    /// Thread-safe: can be called through `Arc<EventStore>` at runtime.
1968    /// Used during initial setup and during follower → leader promotion.
1969    /// When set, every WAL append publishes the entry to the broadcast
1970    /// channel so the WAL shipper can stream it to followers.
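    ///
    /// An illustrative sketch (`ignore`d; the channel capacity is an
    /// arbitrary choice, not a project default):
    ///
    /// ```ignore
    /// let (tx, _rx) = tokio::sync::broadcast::channel(1024);
    /// store.enable_wal_replication(tx);
    /// // From here on, every WAL append is published to subscribers.
    /// ```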
1971    pub fn enable_wal_replication(
1972        &self,
1973        tx: tokio::sync::broadcast::Sender<crate::infrastructure::persistence::wal::WALEntry>,
1974    ) {
1975        if let Some(ref wal_arc) = self.wal {
1976            wal_arc.set_replication_tx(tx);
1977            tracing::info!("WAL replication broadcast enabled");
1978        } else {
1979            tracing::warn!("Cannot enable WAL replication: WAL is not configured");
1980        }
1981    }
1982
1983    /// Get a reference to the WAL (if configured).
1984    /// Used by the replication catch-up protocol to determine oldest available offset.
1985    pub fn wal(&self) -> Option<&Arc<WriteAheadLog>> {
1986        self.wal.as_ref()
1987    }
1988
1989    /// Get a reference to the Parquet storage (if configured).
1990    /// Used by the replication catch-up protocol to stream snapshot files to followers.
1991    pub fn parquet_storage(&self) -> Option<&Arc<RwLock<ParquetStorage>>> {
1992        self.storage.as_ref()
1993    }
1994}
1995
1996/// Configuration for EventStore
1997#[derive(Debug, Clone, Default)]
1998pub struct EventStoreConfig {
1999    /// Optional directory for persistent Parquet storage (v0.2 feature)
2000    pub storage_dir: Option<PathBuf>,
2001
2002    /// Snapshot configuration (v0.2 feature)
2003    pub snapshot_config: SnapshotConfig,
2004
2005    /// Optional directory for WAL (Write-Ahead Log) (v0.2 feature)
2006    pub wal_dir: Option<PathBuf>,
2007
2008    /// WAL configuration (v0.2 feature)
2009    pub wal_config: WALConfig,
2010
2011    /// Compaction configuration (v0.2 feature)
2012    pub compaction_config: CompactionConfig,
2013
2014    /// Schema registry configuration (v0.5 feature)
2015    pub schema_registry_config: SchemaRegistryConfig,
2016
2017    /// Optional directory for system metadata storage (dogfood feature).
2018    /// When set, operational metadata (tenants, config, audit) is stored
2019    /// using AllSource's own event store rather than an external database.
2020    /// Defaults to `{storage_dir}/__system/` when storage_dir is set.
2021    pub system_data_dir: Option<PathBuf>,
2022
2023    /// Name of the default tenant to auto-create on first boot.
2024    pub bootstrap_tenant: Option<String>,
2025
2026    /// In-memory cache budget in bytes (Step 3). When the resident
2027    /// total exceeds this after a load, the LRU tenant is evicted
2028    /// until the cache fits. `None` (the default in tests) disables
2029    /// the budget — every loaded tenant stays resident. Production
2030    /// reads this from the `ALLSOURCE_CACHE_BYTES` env var; see
2031    /// `from_env`.
2032    pub cache_byte_budget: Option<u64>,
2033
2034    /// Cadence of the runtime checkpoint loop, in seconds (Step 6).
2035    /// Each tick flushes pending Parquet batches and, on success,
2036    /// truncates the WAL up through the checkpoint. This bounds
2037    /// dirty-restart replay time to one interval of writes
2038    /// regardless of total dataset size.
2039    ///
2040    /// `None` disables the loop — the WAL still grows but is only
2041    /// truncated at boot, which is the pre-Step-6 behavior. Tests
2042    /// default to `None`; production reads
2043    /// `ALLSOURCE_CHECKPOINT_INTERVAL_SECONDS` (default 60s) via
2044    /// `from_env_vars`.
2045    pub checkpoint_interval_secs: Option<u64>,
2046}
2047
2048impl EventStoreConfig {
2049    /// Create config with persistent storage enabled
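    ///
    /// A minimal sketch (`ignore`d; the path is illustrative):
    ///
    /// ```ignore
    /// let config = EventStoreConfig::with_persistence("/var/lib/allsource");
    /// let store = EventStore::with_config(config);
    /// ```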
2050    pub fn with_persistence(storage_dir: impl Into<PathBuf>) -> Self {
2051        Self {
2052            storage_dir: Some(storage_dir.into()),
2053            ..Self::default()
2054        }
2055    }
2056
2057    /// Create config with custom snapshot settings
2058    pub fn with_snapshots(snapshot_config: SnapshotConfig) -> Self {
2059        Self {
2060            snapshot_config,
2061            ..Self::default()
2062        }
2063    }
2064
2065    /// Create config with WAL enabled
2066    pub fn with_wal(wal_dir: impl Into<PathBuf>, wal_config: WALConfig) -> Self {
2067        Self {
2068            wal_dir: Some(wal_dir.into()),
2069            wal_config,
2070            ..Self::default()
2071        }
2072    }
2073
2074    /// Create config with both persistence and snapshots
2075    pub fn with_all(storage_dir: impl Into<PathBuf>, snapshot_config: SnapshotConfig) -> Self {
2076        Self {
2077            storage_dir: Some(storage_dir.into()),
2078            snapshot_config,
2079            ..Self::default()
2080        }
2081    }
2082
2083    /// Create production config with all features enabled
2084    pub fn production(
2085        storage_dir: impl Into<PathBuf>,
2086        wal_dir: impl Into<PathBuf>,
2087        snapshot_config: SnapshotConfig,
2088        wal_config: WALConfig,
2089        compaction_config: CompactionConfig,
2090    ) -> Self {
2091        let storage_dir = storage_dir.into();
2092        let system_data_dir = storage_dir.join("__system");
2093        Self {
2094            storage_dir: Some(storage_dir),
2095            snapshot_config,
2096            wal_dir: Some(wal_dir.into()),
2097            wal_config,
2098            compaction_config,
2099            system_data_dir: Some(system_data_dir),
2100            ..Self::default()
2101        }
2102    }
2103
2104    /// Resolve the effective system data directory.
2105    ///
2106    /// If explicitly set, returns that. Otherwise, derives from storage_dir.
2107    /// Returns None if neither is configured (in-memory mode).
2108    pub fn effective_system_data_dir(&self) -> Option<PathBuf> {
2109        self.system_data_dir
2110            .clone()
2111            .or_else(|| self.storage_dir.as_ref().map(|d| d.join("__system")))
2112    }
2113
2114    /// Build config from environment variables.
2115    ///
2116    /// Reads `ALLSOURCE_DATA_DIR`, `ALLSOURCE_STORAGE_DIR`, `ALLSOURCE_WAL_DIR`,
2117    /// and `ALLSOURCE_WAL_ENABLED` to determine persistence mode.
2118    ///
2119    /// Returns `(config, description)` where description is a human-readable
2120    /// summary of the persistence mode for logging.
2121    pub fn from_env() -> (Self, &'static str) {
2122        Self::from_env_vars(
2123            std::env::var("ALLSOURCE_DATA_DIR")
2124                .ok()
2125                .filter(|s| !s.is_empty()),
2126            std::env::var("ALLSOURCE_STORAGE_DIR")
2127                .ok()
2128                .filter(|s| !s.is_empty()),
2129            std::env::var("ALLSOURCE_WAL_DIR")
2130                .ok()
2131                .filter(|s| !s.is_empty()),
2132            std::env::var("ALLSOURCE_WAL_ENABLED").ok(),
2133            std::env::var("ALLSOURCE_CACHE_BYTES").ok(),
2134            std::env::var("ALLSOURCE_SNAPSHOT_INTERVAL_SECONDS").ok(),
2135            std::env::var("ALLSOURCE_RETENTION_SYSTEM_DAYS").ok(),
2136            std::env::var("ALLSOURCE_CHECKPOINT_INTERVAL_SECONDS").ok(),
2137        )
2138    }
2139
2140    /// Build config from explicit env-var values (testable without mutating process env).
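    ///
    /// A minimal sketch (`ignore`d; values are illustrative). With only a
    /// data dir and a cache budget, the storage and WAL paths are derived
    /// and WAL defaults to enabled:
    ///
    /// ```ignore
    /// let (config, mode) = EventStoreConfig::from_env_vars(
    ///     Some("/data".to_string()),      // ALLSOURCE_DATA_DIR
    ///     None,                           // ALLSOURCE_STORAGE_DIR (derived)
    ///     None,                           // ALLSOURCE_WAL_DIR (derived)
    ///     None,                           // ALLSOURCE_WAL_ENABLED (defaults on)
    ///     Some("1073741824".to_string()), // ALLSOURCE_CACHE_BYTES: 1 GiB
    ///     None,                           // ALLSOURCE_SNAPSHOT_INTERVAL_SECONDS
    ///     None,                           // ALLSOURCE_RETENTION_SYSTEM_DAYS
    ///     None,                           // ALLSOURCE_CHECKPOINT_INTERVAL_SECONDS
    /// );
    /// assert_eq!(mode, "wal+parquet");
    /// assert_eq!(config.cache_byte_budget, Some(1_073_741_824));
    /// ```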
2141    pub fn from_env_vars(
2142        data_dir: Option<String>,
2143        explicit_storage_dir: Option<String>,
2144        explicit_wal_dir: Option<String>,
2145        wal_enabled_var: Option<String>,
2146        cache_bytes_var: Option<String>,
2147        snapshot_interval_var: Option<String>,
2148        retention_system_days_var: Option<String>,
2149        checkpoint_interval_var: Option<String>,
2150    ) -> (Self, &'static str) {
2151        let data_dir = data_dir.filter(|s| !s.is_empty());
2152        let storage_dir = explicit_storage_dir
2153            .filter(|s| !s.is_empty())
2154            .or_else(|| data_dir.as_ref().map(|d| format!("{d}/storage")));
2155        let wal_dir = explicit_wal_dir
2156            .filter(|s| !s.is_empty())
2157            .or_else(|| data_dir.as_ref().map(|d| format!("{d}/wal")));
2158        let wal_enabled = wal_enabled_var.is_none_or(|v| v == "true");
2159        // ALLSOURCE_CACHE_BYTES: parse decimal bytes. Unparseable
2160        // input is logged and ignored rather than failing boot —
2161        // the unbounded fallback is safe (worst case is the
2162        // original pre-Step-3 behavior).
2163        let cache_byte_budget =
2164            cache_bytes_var
2165                .filter(|s| !s.is_empty())
2166                .and_then(|s| match s.parse::<u64>() {
2167                    Ok(v) => Some(v),
2168                    Err(e) => {
2169                        tracing::warn!(
2170                            "ALLSOURCE_CACHE_BYTES={s:?} could not be parsed as u64: {e}; \
2171                         cache budget disabled"
2172                        );
2173                        None
2174                    }
2175                });
2176        let compaction_config =
2177            CompactionConfig::from_env_vars(snapshot_interval_var, retention_system_days_var);
2178
2179        // ALLSOURCE_CHECKPOINT_INTERVAL_SECONDS: parse decimal seconds. The
2180        // default (60s) only applies when WAL is enabled — there's no
2181        // checkpoint loop to run otherwise. Unparseable input is logged
2182        // and falls back to the default rather than failing boot.
2183        let checkpoint_interval_secs = if wal_enabled {
2184            checkpoint_interval_var
2185                .filter(|s| !s.is_empty())
2186                .map(|s| match s.parse::<u64>() {
2187                    Ok(v) => v,
2188                    Err(e) => {
2189                        tracing::warn!(
2190                            "ALLSOURCE_CHECKPOINT_INTERVAL_SECONDS={s:?} could not be parsed as \
2191                             u64: {e}; falling back to default 60s"
2192                        );
2193                        60
2194                    }
2195                })
2196                .or(Some(60))
2197        } else {
2198            None
2199        };
2200
2201        let mut config = match (&storage_dir, &wal_dir) {
2202            (Some(sd), Some(wd)) if wal_enabled => Self::production(
2203                sd,
2204                wd,
2205                SnapshotConfig::default(),
2206                WALConfig::default(),
2207                compaction_config,
2208            ),
2209            (Some(sd), _) => Self::with_persistence(sd),
2210            (_, Some(wd)) if wal_enabled => Self::with_wal(wd, WALConfig::default()),
2211            _ => Self::default(),
2212        };
2213        config.cache_byte_budget = cache_byte_budget;
2214        config.checkpoint_interval_secs = checkpoint_interval_secs;
2215
2216        let mode = match (&storage_dir, &wal_dir) {
2217            (Some(_), Some(_)) if wal_enabled => "wal+parquet",
2218            (Some(_), _) => "parquet-only",
2219            (_, Some(_)) if wal_enabled => "wal-only",
2220            _ => "in-memory",
2221        };
2222        (config, mode)
2223    }
2224}
2225
2226#[derive(Debug, serde::Serialize)]
2227pub struct StoreStats {
2228    pub total_events: usize,
2229    pub total_entities: usize,
2230    pub total_event_types: usize,
2231    pub total_ingested: u64,
2232}
2233
2234/// Information about a stream (entity_id)
2235#[derive(Debug, Clone, serde::Serialize)]
2236pub struct StreamInfo {
2237    /// The stream identifier (entity_id)
2238    pub stream_id: String,
2239    /// Total number of events in this stream
2240    pub event_count: usize,
2241    /// Timestamp of the last event in this stream
2242    pub last_event_at: Option<chrono::DateTime<chrono::Utc>>,
2243}
2244
2245/// Information about an event type
2246#[derive(Debug, Clone, serde::Serialize)]
2247pub struct EventTypeInfo {
2248    /// The event type name
2249    pub event_type: String,
2250    /// Total number of events of this type
2251    pub event_count: usize,
2252    /// Timestamp of the last event of this type
2253    pub last_event_at: Option<chrono::DateTime<chrono::Utc>>,
2254}
2255
2256impl Default for EventStore {
2257    fn default() -> Self {
2258        Self::new()
2259    }
2260}
2261
2262#[cfg(test)]
2263mod tests {
2264    use super::*;
2265    use crate::domain::entities::Event;
2266    use tempfile::TempDir;
2267
2268    /// Recursively walk `dir` looking for `*.parquet` files.
2269    /// Tests that pre-date Step 1's tenant-partitioned layout used a
2270    /// flat `read_dir` here; after the move to <root>/<tenant>/<yyyy-mm>/
2271    /// they need to walk subdirectories.
2272    fn find_parquet_files(dir: &std::path::Path) -> Vec<std::path::PathBuf> {
2273        let mut out = Vec::new();
2274        let mut stack = vec![dir.to_path_buf()];
2275        while let Some(d) = stack.pop() {
2276            let Ok(entries) = std::fs::read_dir(&d) else {
2277                continue;
2278            };
2279            for e in entries.flatten() {
2280                let p = e.path();
2281                if p.is_dir() {
2282                    stack.push(p);
2283                } else if p.extension().and_then(|s| s.to_str()) == Some("parquet") {
2284                    out.push(p);
2285                }
2286            }
2287        }
2288        out
2289    }
2290
2291    fn create_test_event(entity_id: &str, event_type: &str) -> Event {
2292        Event::from_strings(
2293            event_type.to_string(),
2294            entity_id.to_string(),
2295            "default".to_string(),
2296            serde_json::json!({"name": "Test", "value": 42}),
2297            None,
2298        )
2299        .unwrap()
2300    }
2301
2302    fn create_test_event_with_payload(
2303        entity_id: &str,
2304        event_type: &str,
2305        payload: serde_json::Value,
2306    ) -> Event {
2307        Event::from_strings(
2308            event_type.to_string(),
2309            entity_id.to_string(),
2310            "default".to_string(),
2311            payload,
2312            None,
2313        )
2314        .unwrap()
2315    }
2316
2317    #[test]
2318    fn test_event_store_new() {
2319        let store = EventStore::new();
2320        assert_eq!(store.stats().total_events, 0);
2321        assert_eq!(store.stats().total_entities, 0);
2322    }
2323
2324    // -----------------------------------------------------------------
2325    // Step 2: ensure_tenant_loaded smoke tests. The full
2326    // cold-boot/lazy-hydrate paths land in commit #2 (skip boot
2327    // load) and commit #4 (integration test).
2328    // -----------------------------------------------------------------
2329
2330    #[test]
2331    fn test_ensure_tenant_loaded_no_storage_is_a_noop() {
2332        // An in-memory-only store (no ParquetStorage configured) has
2333        // nothing to hydrate. The method must succeed and mark the
2334        // tenant loaded so subsequent calls hit the fast path.
2335        let store = EventStore::new();
2336        assert!(!store.is_tenant_loaded("alice"));
2337        store.ensure_tenant_loaded("alice").unwrap();
2338        assert!(store.is_tenant_loaded("alice"));
2339        // Other tenants stay cold — the call is per-tenant.
2340        assert!(!store.is_tenant_loaded("bob"));
2341    }
2342
2343    #[test]
2344    fn test_ensure_tenant_loaded_warm_path_is_idempotent() {
2345        let store = EventStore::new();
2346        store.ensure_tenant_loaded("alice").unwrap();
2347        // Second call hits the DashMap fast path and returns Ok.
2348        store.ensure_tenant_loaded("alice").unwrap();
2349    }
2350
2351    #[test]
2352    fn test_ensure_tenant_loaded_rejects_unsafe_tenant_id() {
2353        // With persistence configured, the call has to walk a
2354        // tenant subtree, so the path-safety whitelist applies.
2355        // The error must propagate; the tenant must NOT be marked
2356        // loaded (otherwise an attacker probing path-traversal
2357        // strings could spam the loaded-set with junk).
2358        let temp_dir = TempDir::new().unwrap();
2359        let store = EventStore::with_config(EventStoreConfig::with_persistence(temp_dir.path()));
2360        for unsafe_tid in ["..", "a/b", "a\\b", ""] {
2361            let result = store.ensure_tenant_loaded(unsafe_tid);
2362            assert!(
2363                result.is_err(),
2364                "tenant_id {unsafe_tid:?} should have been rejected"
2365            );
2366            assert!(
2367                !store.is_tenant_loaded(unsafe_tid),
2368                "rejected tenant {unsafe_tid:?} must not be marked loaded"
2369            );
2370        }
2371    }
2372
2373    #[test]
2374    fn test_ensure_tenant_loaded_no_subtree_marks_loaded_with_zero_events() {
2375        // A tenant that has no on-disk data (fresh tenant, never
2376        // persisted) must still succeed — load_events_for_tenant
2377        // returns empty, ensure_tenant_loaded marks it loaded so we
2378        // don't re-walk the empty subtree on every query.
2379        let temp_dir = TempDir::new().unwrap();
2380        let store = EventStore::with_config(EventStoreConfig::with_persistence(temp_dir.path()));
2381        assert!(!store.is_tenant_loaded("never-existed"));
2382        store.ensure_tenant_loaded("never-existed").unwrap();
2383        assert!(store.is_tenant_loaded("never-existed"));
2384    }
2385
2386    #[test]
2387    fn test_evict_tenant_drops_events_and_resets_bytes() {
2388        // After eviction, the tenant's events are gone from memory,
2389        // its byte counter is reset, and is_tenant_loaded returns
2390        // false. Other tenants are untouched.
2391        let temp_dir = TempDir::new().unwrap();
2392        let storage_dir = temp_dir.path().to_path_buf();
2393
2394        {
2395            let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
2396            for i in 0..3 {
2397                store
2398                    .ingest(
2399                        &Event::from_strings(
2400                            "test.event".to_string(),
2401                            format!("a-{i}"),
2402                            "alice".to_string(),
2403                            serde_json::json!({"i": i}),
2404                            None,
2405                        )
2406                        .unwrap(),
2407                    )
2408                    .unwrap();
2409            }
2410            for i in 0..2 {
2411                store
2412                    .ingest(
2413                        &Event::from_strings(
2414                            "test.event".to_string(),
2415                            format!("b-{i}"),
2416                            "bob".to_string(),
2417                            serde_json::json!({"i": i}),
2418                            None,
2419                        )
2420                        .unwrap(),
2421                    )
2422                    .unwrap();
2423            }
2424            store.flush_storage().unwrap();
2425        }
2426
2427        let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
2428        store.ensure_tenant_loaded("alice").unwrap();
2429        store.ensure_tenant_loaded("bob").unwrap();
2430        assert_eq!(store.stats().total_events, 5);
2431        let alice_bytes = store.tenant_resident_bytes("alice");
2432        let bob_bytes = store.tenant_resident_bytes("bob");
2433        assert!(alice_bytes > 0 && bob_bytes > 0);
2434
2435        store.evict_tenant("alice");
2436
2437        assert!(!store.is_tenant_loaded("alice"));
2438        assert!(store.is_tenant_loaded("bob"));
2439        assert_eq!(store.tenant_resident_bytes("alice"), 0);
2440        assert_eq!(store.tenant_resident_bytes("bob"), bob_bytes);
2441        assert_eq!(store.stats().total_events, 2, "only bob's 2 events remain");
2442    }
2443
2444    #[test]
2445    fn test_evict_tenant_then_query_re_loads_from_disk() {
2446        // The transparent re-load behavior the bead's AC #5 calls
2447        // out: evict, then query the same tenant — its data comes
2448        // back via ensure_tenant_loaded, sourced from Parquet.
2449        let temp_dir = TempDir::new().unwrap();
2450        let storage_dir = temp_dir.path().to_path_buf();
2451
2452        {
2453            let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
2454            for i in 0..4 {
2455                store
2456                    .ingest(
2457                        &Event::from_strings(
2458                            "test.event".to_string(),
2459                            format!("a-{i}"),
2460                            "alice".to_string(),
2461                            serde_json::json!({"i": i}),
2462                            None,
2463                        )
2464                        .unwrap(),
2465                    )
2466                    .unwrap();
2467            }
2468            store.flush_storage().unwrap();
2469        }
2470
2471        let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
2472        store.ensure_tenant_loaded("alice").unwrap();
2473        store.evict_tenant("alice");
2474        assert_eq!(store.stats().total_events, 0);
2475
2476        // Query — re-load happens transparently.
2477        let results = store
2478            .query(&QueryEventsRequest {
2479                entity_id: None,
2480                event_type: None,
2481                tenant_id: Some("alice".to_string()),
2482                as_of: None,
2483                since: None,
2484                until: None,
2485                limit: None,
2486                event_type_prefix: None,
2487                payload_filter: None,
2488            })
2489            .unwrap();
2490        assert_eq!(results.len(), 4);
2491        assert!(store.is_tenant_loaded("alice"));
2492    }
2493
2494    #[test]
2495    fn test_evict_tenant_rebuilds_index_with_new_offsets() {
2496        // After eviction, the events Vec is compacted. The index
2497        // must be rebuilt against the new offsets — otherwise
2498        // queries return stale or wrong events. This test checks
2499        // index correctness end-to-end via a query for the
2500        // surviving tenant after the evicted tenant's events are
2501        // gone.
2502        let temp_dir = TempDir::new().unwrap();
2503        let store = EventStore::with_config(EventStoreConfig::with_persistence(temp_dir.path()));
2504
2505        // Interleave: alice, bob, alice, bob, alice. After
2506        // evicting alice, the events Vec compacts to [bob, bob]
2507        // and the index must reflect the new layout.
2508        for i in 0..3 {
2509            store
2510                .ingest(
2511                    &Event::from_strings(
2512                        "test.event".to_string(),
2513                        format!("a-{i}"),
2514                        "alice".to_string(),
2515                        serde_json::json!({"i": i}),
2516                        None,
2517                    )
2518                    .unwrap(),
2519                )
2520                .unwrap();
2521            if i < 2 {
2522                store
2523                    .ingest(
2524                        &Event::from_strings(
2525                            "test.event".to_string(),
2526                            format!("b-{i}"),
2527                            "bob".to_string(),
2528                            serde_json::json!({"i": i}),
2529                            None,
2530                        )
2531                        .unwrap(),
2532                    )
2533                    .unwrap();
2534            }
2535        }
2536        // Mark both as loaded for accurate eviction bookkeeping.
2537        store.tenant_loader.mark_loaded("alice");
2538        store.tenant_loader.mark_loaded("bob");
2539
2540        store.evict_tenant("alice");
2541
2542        let bob_results = store
2543            .query(&QueryEventsRequest {
2544                entity_id: None,
2545                event_type: None,
2546                tenant_id: Some("bob".to_string()),
2547                as_of: None,
2548                since: None,
2549                until: None,
2550                limit: None,
2551                event_type_prefix: None,
2552                payload_filter: None,
2553            })
2554            .unwrap();
2555        assert_eq!(bob_results.len(), 2);
2556        for e in &bob_results {
2557            assert_eq!(e.tenant_id_str(), "bob");
2558        }
2559    }
2560
2561    #[test]
2562    fn test_budget_eviction_keeps_resident_set_bounded() {
2563        // Configure a tiny budget. Load three tenants in sequence;
2564        // the third load must evict the LRU tenant, keeping the
2565        // resident set under (or near) the budget.
2566        let temp_dir = TempDir::new().unwrap();
2567        let storage_dir = temp_dir.path().to_path_buf();
2568
2569        // Persist 5 events per tenant with ~1 KB payloads. Each
2570        // tenant ends up at ~5 KB + overhead.
2571        let big_payload = serde_json::json!({"data": "x".repeat(1000)});
2572        {
2573            let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
2574            for tenant in ["alice", "bob", "carol"] {
2575                for i in 0..5 {
2576                    store
2577                        .ingest(
2578                            &Event::from_strings(
2579                                "test.event".to_string(),
2580                                format!("{tenant}-{i}"),
2581                                tenant.to_string(),
2582                                big_payload.clone(),
2583                                None,
2584                            )
2585                            .unwrap(),
2586                        )
2587                        .unwrap();
2588                }
2589            }
2590            store.flush_storage().unwrap();
2591        }
2592
2593        // Budget = 12 KB (12_000 bytes). Two tenants (~6 KB each) is
2594        // tight; loading a third must evict.
2595        let mut config = EventStoreConfig::with_persistence(&storage_dir);
2596        config.cache_byte_budget = Some(12_000);
2597        let store = EventStore::with_config(config);
2598
2599        // Load alice — under budget, no eviction.
2600        store.ensure_tenant_loaded("alice").unwrap();
2601        assert!(store.is_tenant_loaded("alice"));
2602
2603        // Touch alice and immediately load bob. Bob is the
2604        // freshly-loaded one, so bob is excluded from eviction.
2605        // Alice is the next-oldest. After the load, total may
2606        // exceed budget — if so, evict alice.
2607        store.tenant_loader.touch("alice");
2608        std::thread::sleep(std::time::Duration::from_millis(10));
2609        store.ensure_tenant_loaded("bob").unwrap();
2610        assert!(store.is_tenant_loaded("bob"));
2611
2612        // Touch bob, load carol. Carol is freshly-loaded; the LRU
2613        // candidate is the older of {alice, bob} — alice (since
2614        // bob was just touched).
2615        store.tenant_loader.touch("bob");
2616        std::thread::sleep(std::time::Duration::from_millis(10));
2617        store.ensure_tenant_loaded("carol").unwrap();
2618        assert!(store.is_tenant_loaded("carol"));
2619
2620        // After all loads, the cache must respect the budget OR
2621        // (if a single tenant alone exceeds it) we should at most
2622        // hold the just-loaded tenant. The test budget is small
2623        // enough that we expect at least one eviction.
2624        let resident = store.cache_resident_bytes();
2625        let budget = 12_000u64;
2626
2627        // Either we're within the budget, or only the freshly-loaded
2628        // tenant is left (the "single oversized tenant" fallback).
2629        if resident > budget {
2630            let loaded_count = ["alice", "bob", "carol"]
2631                .iter()
2632                .filter(|t| store.is_tenant_loaded(t))
2633                .count();
2634            assert_eq!(
2635                loaded_count, 1,
2636                "over budget but more than one tenant loaded — eviction policy didn't fire"
2637            );
2638        }
2639
2640        // Carol must still be loaded — it's the most recent and
2641        // never picked as a victim.
2642        assert!(store.is_tenant_loaded("carol"));
2643    }
2644
2645    #[test]
2646    fn test_query_after_eviction_re_loads_transparently() {
2647        // The end-to-end shape of AC #5: query → evict → query
2648        // again returns the right data.
2649        let temp_dir = TempDir::new().unwrap();
2650        let storage_dir = temp_dir.path().to_path_buf();
2651
2652        let big_payload = serde_json::json!({"data": "x".repeat(2000)});
2653        {
2654            let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
2655            for tenant in ["alice", "bob"] {
2656                for i in 0..3 {
2657                    store
2658                        .ingest(
2659                            &Event::from_strings(
2660                                "test.event".to_string(),
2661                                format!("{tenant}-{i}"),
2662                                tenant.to_string(),
2663                                big_payload.clone(),
2664                                None,
2665                            )
2666                            .unwrap(),
2667                        )
2668                        .unwrap();
2669                }
2670            }
2671            store.flush_storage().unwrap();
2672        }
2673
2674        // Budget = 5 KB — one tenant fits, two don't.
2675        let mut config = EventStoreConfig::with_persistence(&storage_dir);
2676        config.cache_byte_budget = Some(5_000);
2677        let store = EventStore::with_config(config);
2678
2679        // Query alice — sized at ~6 KB, so over budget but no
2680        // peer to evict; alice stays as the single-oversized-tenant
2681        // case.
2682        let alice_first = store
2683            .query(&QueryEventsRequest {
2684                entity_id: None,
2685                event_type: None,
2686                tenant_id: Some("alice".to_string()),
2687                as_of: None,
2688                since: None,
2689                until: None,
2690                limit: None,
2691                event_type_prefix: None,
2692                payload_filter: None,
2693            })
2694            .unwrap();
2695        assert_eq!(alice_first.len(), 3);
2696
2697        // Sleep to make alice older than bob in the LRU ordering.
2698        std::thread::sleep(std::time::Duration::from_millis(15));
2699        // Query bob — alice will get evicted.
2700        let _bob = store
2701            .query(&QueryEventsRequest {
2702                entity_id: None,
2703                event_type: None,
2704                tenant_id: Some("bob".to_string()),
2705                as_of: None,
2706                since: None,
2707                until: None,
2708                limit: None,
2709                event_type_prefix: None,
2710                payload_filter: None,
2711            })
2712            .unwrap();
2713        assert!(
2714            !store.is_tenant_loaded("alice"),
2715            "alice should have been evicted"
2716        );
2717
2718        // Re-query alice — must transparently re-load.
2719        let alice_second = store
2720            .query(&QueryEventsRequest {
2721                entity_id: None,
2722                event_type: None,
2723                tenant_id: Some("alice".to_string()),
2724                as_of: None,
2725                since: None,
2726                until: None,
2727                limit: None,
2728                event_type_prefix: None,
2729                payload_filter: None,
2730            })
2731            .unwrap();
2732        assert_eq!(
2733            alice_second.len(),
2734            3,
2735            "alice's events come back via re-load"
2736        );
2737        assert!(store.is_tenant_loaded("alice"));
2738    }
2739
2740    #[test]
2741    #[cfg(feature = "server")]
2742    fn test_cache_metrics_track_evictions_and_bytes() {
2743        // Smoke test for the Step 3 #4 Prometheus metrics —
2744        // confirms the counter increments on eviction and the
2745        // gauge tracks the resident bytes.
2746        let temp_dir = TempDir::new().unwrap();
2747        let storage_dir = temp_dir.path().to_path_buf();
2748
2749        let big_payload = serde_json::json!({"data": "x".repeat(2000)});
2750        {
2751            let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
2752            for tenant in ["alice", "bob"] {
2753                for i in 0..3 {
2754                    store
2755                        .ingest(
2756                            &Event::from_strings(
2757                                "test.event".to_string(),
2758                                format!("{tenant}-{i}"),
2759                                tenant.to_string(),
2760                                big_payload.clone(),
2761                                None,
2762                            )
2763                            .unwrap(),
2764                        )
2765                        .unwrap();
2766                }
2767            }
2768            store.flush_storage().unwrap();
2769        }
2770
2771        let mut config = EventStoreConfig::with_persistence(&storage_dir);
2772        config.cache_byte_budget = Some(5_000); // forces eviction
2773        let store = EventStore::with_config(config);
2774
2775        assert_eq!(store.metrics.cache_evictions_total.get(), 0);
2776        assert_eq!(store.metrics.cache_bytes.get(), 0);
2777
2778        store.ensure_tenant_loaded("alice").unwrap();
2779        // After loading alice, gauge reflects her bytes.
2780        let after_alice = store.metrics.cache_bytes.get();
2781        assert!(after_alice > 0, "gauge should reflect alice's bytes");
2782        // Single oversized tenant — no eviction yet.
2783        assert_eq!(store.metrics.cache_evictions_total.get(), 0);
2784
2785        std::thread::sleep(std::time::Duration::from_millis(10));
2786        store.ensure_tenant_loaded("bob").unwrap();
2787
2788        // Bob's load pushed total over budget; alice (older) was
2789        // evicted. Counter increments.
2790        assert_eq!(
2791            store.metrics.cache_evictions_total.get(),
2792            1,
2793            "exactly one tenant evicted after bob's load"
2794        );
2795        // Gauge now reflects only bob's bytes.
2796        let after_bob = store.metrics.cache_bytes.get();
2797        assert!(after_bob > 0);
2798        assert!(after_bob <= after_alice, "gauge must not exceed the pre-eviction level");
2799    }
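
    // A minimal sketch of the counter/gauge pair the test above reads
    // back, assuming the `prometheus` crate is what MetricsRegistry
    // wraps under the server feature. The metric names and the ±bytes
    // bookkeeping here are illustrative, not the registry's real wiring.
    #[test]
    #[cfg(feature = "server")]
    fn sketch_eviction_metrics_shape() {
        use prometheus::{IntCounter, IntGauge};

        let evictions = IntCounter::new("sketch_evictions_total", "tenant evictions").unwrap();
        let bytes = IntGauge::new("sketch_cache_bytes", "resident cache bytes").unwrap();

        bytes.add(6_000); // load a tenant: gauge rises by its size
        evictions.inc(); // evict it: counter bumps…
        bytes.sub(6_000); // …and the gauge gives the bytes back

        assert_eq!(evictions.get(), 1);
        assert_eq!(bytes.get(), 0);
    }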
2800
2801    #[test]
2802    fn test_stress_resident_set_stays_near_budget_under_rolling_queries() {
2803        // Scaled-down version of the bead's stress test: the bead's
2804        // 10 × 50 MB tenants vs a 100 MB budget become 10 × ~500 KB
2805        // tenants vs a 1 MB budget, preserving the tenants-to-headroom
2806        // ratio while staying unit-test-fast. The correctness property
2807        // is the same: after many rolling queries across more tenants
2808        // than fit, the resident set must stay at or near the budget.
2809        let temp_dir = TempDir::new().unwrap();
2810        let storage_dir = temp_dir.path().to_path_buf();
2811
2812        const TENANT_COUNT: usize = 10;
2813        const EVENTS_PER_TENANT: usize = 50;
2814        // Per-event payload ~10 KB → tenant ~500 KB.
2815        let big_payload = serde_json::json!({"data": "x".repeat(10_000)});
2816
2817        // Persist all tenants. Each ends up at ~500 KB on disk
2818        // (and roughly the same in memory once loaded).
2819        {
2820            let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
2821            for t in 0..TENANT_COUNT {
2822                let tenant = format!("tenant-{t}");
2823                for i in 0..EVENTS_PER_TENANT {
2824                    store
2825                        .ingest(
2826                            &Event::from_strings(
2827                                "test.event".to_string(),
2828                                format!("{tenant}-{i}"),
2829                                tenant.clone(),
2830                                big_payload.clone(),
2831                                None,
2832                            )
2833                            .unwrap(),
2834                        )
2835                        .unwrap();
2836                }
2837            }
2838            store.flush_storage().unwrap();
2839        }
2840
2841        // Budget = 1 MiB → fits ~2 tenants. We're going to query
2842        // all 10, so the LRU policy must hold the resident set
2843        // near 1 MiB across the rolling sequence.
2844        const BUDGET: u64 = 1_048_576;
2845        let mut config = EventStoreConfig::with_persistence(&storage_dir);
2846        config.cache_byte_budget = Some(BUDGET);
2847        let store = EventStore::with_config(config);
2848
2849        // Sweep through tenants in order. Each query loads its
2850        // tenant; if budget is exceeded after the load, an LRU
2851        // eviction fires.
2852        let mut peak_resident: u64 = 0;
2853        for t in 0..TENANT_COUNT {
2854            let tenant = format!("tenant-{t}");
2855            let results = store
2856                .query(&QueryEventsRequest {
2857                    entity_id: None,
2858                    event_type: None,
2859                    tenant_id: Some(tenant.clone()),
2860                    as_of: None,
2861                    since: None,
2862                    until: None,
2863                    limit: None,
2864                    event_type_prefix: None,
2865                    payload_filter: None,
2866                })
2867                .unwrap();
2868            assert_eq!(
2869                results.len(),
2870                EVENTS_PER_TENANT,
2871                "every per-tenant query must return all of that tenant's events"
2872            );
2873            // Track peak resident bytes seen during the sweep.
2874            let resident = store.cache_resident_bytes();
2875            if resident > peak_resident {
2876                peak_resident = resident;
2877            }
2878        }
2879
2880        let final_resident = store.cache_resident_bytes();
2881
2882        // Tolerance: a tenant's bytes get added before eviction
2883        // fires, so peak transiently exceeds the budget by at
2884        // most one tenant's worth (~500 KB). The final state
2885        // after the sweep should be well-bounded.
2886        let tolerance = BUDGET; // generous: 2× budget upper bound
2887        assert!(
2888            peak_resident <= BUDGET + tolerance,
2889            "peak resident {peak_resident} exceeds budget {BUDGET} by more than {tolerance} \
2890             — eviction policy not keeping up with the working-set churn"
2891        );
2892        assert!(
2893            final_resident <= BUDGET + tolerance,
2894            "final resident {final_resident} exceeds budget {BUDGET} by more than {tolerance}"
2895        );
2896
2897        // The most-recently-queried tenant must still be loaded
2898        // (it was just touched).
2899        let last_tenant = format!("tenant-{}", TENANT_COUNT - 1);
2900        assert!(
2901            store.is_tenant_loaded(&last_tenant),
2902            "the most-recent tenant must remain loaded after the sweep"
2903        );
2904
2905        // At least some tenants must have been evicted — otherwise
2906        // the budget didn't fire.
2907        let still_loaded = (0..TENANT_COUNT)
2908            .filter(|t| store.is_tenant_loaded(&format!("tenant-{t}")))
2909            .count();
2910        assert!(
2911            still_loaded < TENANT_COUNT,
2912            "no tenants evicted ({still_loaded}/{TENANT_COUNT} still loaded) — \
2913             budget enforcement didn't engage"
2914        );
2915    }
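
    // Sketch of the evict-until-under-budget loop the stress test
    // exercises, modeled on a toy ordered map (insertion order stands
    // in for LRU order). Assumption: eviction runs after each load,
    // removing the oldest entry until the total fits, but never
    // evicting the entry that was just loaded.
    #[test]
    fn sketch_budget_enforcement_loop() {
        use std::collections::BTreeMap;

        let budget: u64 = 1_000;
        let mut resident: BTreeMap<u32, u64> = BTreeMap::new();
        for t in 0..10u32 {
            resident.insert(t, 500); // each tenant ~500 "bytes"
            while resident.values().sum::<u64>() > budget && resident.len() > 1 {
                let oldest = *resident.keys().next().unwrap();
                resident.remove(&oldest);
            }
        }
        assert!(resident.values().sum::<u64>() <= budget);
        assert!(resident.contains_key(&9), "newest tenant survives the sweep");
    }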
2916
2917    #[test]
2918    fn test_evict_tenant_when_not_loaded_is_a_noop() {
2919        // Eviction of a never-loaded tenant must not panic and
2920        // must not affect other tenants.
2921        let store = EventStore::new();
2922        store.evict_tenant("nobody"); // should not panic
2923        assert!(!store.is_tenant_loaded("nobody"));
2924    }
2925
2926    #[test]
2927    fn test_lazy_load_accounts_bytes_per_tenant() {
2928        // Step 3 #1: per-tenant byte tracking. Loading a tenant
2929        // should accumulate bytes proportional to its event
2930        // payload sizes; another tenant's counter must stay 0.
2931        let temp_dir = TempDir::new().unwrap();
2932        let storage_dir = temp_dir.path().to_path_buf();
2933
2934        // Persist 5 events for alice with measurable-size payloads.
2935        {
2936            let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
2937            for i in 0..5 {
2938                store
2939                    .ingest(
2940                        &Event::from_strings(
2941                            "test.event".to_string(),
2942                            format!("a-{i}"),
2943                            "alice".to_string(),
2944                            serde_json::json!({"data": "x".repeat(1000)}),
2945                            None,
2946                        )
2947                        .unwrap(),
2948                    )
2949                    .unwrap();
2950            }
2951            store.flush_storage().unwrap();
2952        }
2953
2954        let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
2955        // Cold: zero bytes accounted.
2956        assert_eq!(store.tenant_resident_bytes("alice"), 0);
2957        assert_eq!(store.cache_resident_bytes(), 0);
2958
2959        store.ensure_tenant_loaded("alice").unwrap();
2960
2961        // After load: alice's counter is non-trivial (5 events
2962        // each carrying ~1000 bytes of payload + overhead).
2963        let alice_bytes = store.tenant_resident_bytes("alice");
2964        assert!(
2965            alice_bytes >= 5 * 1000,
2966            "alice should have at least 5 KB resident; got {alice_bytes}"
2967        );
2968        // Bob never loaded → 0.
2969        assert_eq!(store.tenant_resident_bytes("bob"), 0);
2970        // Total equals alice's portion (only loaded tenant).
2971        assert_eq!(store.cache_resident_bytes(), alice_bytes);
2972    }
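
    // Sketch of one plausible byte-accounting rule consistent with the
    // assertion above: serialized payload length plus a fixed per-event
    // overhead. The store's real estimator may differ; the point is
    // that 5 events with ~1000-byte payloads account to at least 5 KB.
    #[test]
    fn sketch_event_byte_estimate() {
        let payload = serde_json::json!({"data": "x".repeat(1000)});
        let estimate = |p: &serde_json::Value| -> u64 {
            serde_json::to_vec(p).map(|v| v.len() as u64).unwrap_or(0) + 64
        };
        let total: u64 = (0..5).map(|_| estimate(&payload)).sum();
        assert!(total >= 5_000);
    }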
2973
2974    #[test]
2975    fn test_query_lazy_loads_tenant_on_first_call() {
2976        // The end-to-end shape of Step 2: persist events for a
2977        // tenant in session 1, restart, and confirm session 2 boots
2978        // empty but a query for that tenant pulls them in.
2979        let temp_dir = TempDir::new().unwrap();
2980        let storage_dir = temp_dir.path().to_path_buf();
2981
2982        // Session 1: ingest 3 events for tenant "alice", flush, drop.
2983        {
2984            let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
2985            for i in 0..3 {
2986                let event = Event::from_strings(
2987                    "test.event".to_string(),
2988                    format!("e-{i}"),
2989                    "alice".to_string(),
2990                    serde_json::json!({"i": i}),
2991                    None,
2992                )
2993                .unwrap();
2994                store.ingest(&event).unwrap();
2995            }
2996            store.flush_storage().unwrap();
2997        }
2998
2999        // Session 2: fresh boot. Events on disk, nothing in memory.
3000        let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
3001        assert_eq!(
3002            store.stats().total_events,
3003            0,
3004            "boot must be O(1) — no Parquet pre-load"
3005        );
3006        assert!(!store.is_tenant_loaded("alice"));
3007        assert!(!store.is_tenant_loaded("bob"));
3008
3009        // First query for alice: triggers ensure_tenant_loaded.
3010        let results = store
3011            .query(&QueryEventsRequest {
3012                entity_id: None,
3013                event_type: None,
3014                tenant_id: Some("alice".to_string()),
3015                as_of: None,
3016                since: None,
3017                until: None,
3018                limit: None,
3019                event_type_prefix: None,
3020                payload_filter: None,
3021            })
3022            .unwrap();
3023        assert_eq!(results.len(), 3, "alice's 3 events are returned");
3024        assert!(store.is_tenant_loaded("alice"), "alice now warm");
3025        // bob untouched — load is per-tenant, so a query for alice
3026        // must not have hydrated bob.
3027        assert!(!store.is_tenant_loaded("bob"), "bob still cold");
3028    }
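
    // The load-on-first-touch shape in miniature: a per-tenant "loaded"
    // set gates a one-time hydrate. Names are local to this sketch; the
    // store's real path goes through ensure_tenant_loaded.
    #[test]
    fn sketch_load_on_first_touch() {
        use std::collections::HashSet;

        let mut loaded: HashSet<String> = HashSet::new();
        let mut loads = 0usize;
        for tenant in ["alice", "alice"] {
            if !loaded.contains(tenant) {
                loads += 1; // the one-time Parquet hydrate would run here
                loaded.insert(tenant.to_string());
            }
        }
        assert_eq!(loads, 1, "second query takes the warm path");
        assert!(!loaded.contains("bob"), "bob is never hydrated");
    }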
3029
3030    #[test]
3031    fn test_query_invalid_tenant_id_returns_error_no_hang() {
3032        // Step 2 acceptance criterion: in-flight load failures
3033        // surface as errors, not infinite hangs. Path-traversal
3034        // input fails fast at sanitization and propagates.
3035        let temp_dir = TempDir::new().unwrap();
3036        let store = EventStore::with_config(EventStoreConfig::with_persistence(temp_dir.path()));
3037
3038        let result = store.query(&QueryEventsRequest {
3039            entity_id: None,
3040            event_type: None,
3041            tenant_id: Some("../etc".to_string()),
3042            as_of: None,
3043            since: None,
3044            until: None,
3045            limit: None,
3046            event_type_prefix: None,
3047            payload_filter: None,
3048        });
3049        assert!(result.is_err(), "unsafe tenant_id must surface as error");
3050    }
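
    // Sketch of a path-traversal guard like the one the query path
    // applies before touching disk. The store's actual rule may be
    // stricter; this illustrates the fail-fast shape: reject parent
    // components and separators outright, before any I/O.
    #[test]
    fn sketch_tenant_id_sanitization() {
        let is_safe = |t: &str| {
            !t.is_empty() && !t.contains("..") && !t.contains('/') && !t.contains('\\')
        };
        assert!(is_safe("alice"));
        assert!(!is_safe("../etc"));
        assert!(!is_safe("tenants/alice"));
        assert!(!is_safe(""));
    }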
3051
3052    #[test]
3053    fn test_query_concurrent_first_queries_for_same_tenant_all_succeed() {
3054        // Singleflight: N threads racing to query the same cold
3055        // tenant must all return the same correct result. The
3056        // tenant-load must happen exactly once (verified
3057        // structurally by the per-tenant Mutex in tenant_loader,
3058        // tested directly in test_singleflight_blocks_second_caller).
3059        // This integration test confirms the wiring at the query
3060        // level — no thread observes a half-loaded state.
3061        let temp_dir = TempDir::new().unwrap();
3062        let storage_dir = temp_dir.path().to_path_buf();
3063
3064        // Persist 25 events for tenant "alice".
3065        {
3066            let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
3067            for i in 0..25 {
3068                let event = Event::from_strings(
3069                    "test.event".to_string(),
3070                    format!("e-{i}"),
3071                    "alice".to_string(),
3072                    serde_json::json!({"i": i}),
3073                    None,
3074                )
3075                .unwrap();
3076                store.ingest(&event).unwrap();
3077            }
3078            store.flush_storage().unwrap();
3079        }
3080
3081        // Fresh boot, then 8 threads simultaneously query alice.
3082        let store = Arc::new(EventStore::with_config(EventStoreConfig::with_persistence(
3083            &storage_dir,
3084        )));
3085        assert!(!store.is_tenant_loaded("alice"));
3086
3087        let mut handles = Vec::new();
3088        for _ in 0..8 {
3089            let s = store.clone();
3090            handles.push(std::thread::spawn(move || {
3091                s.query(&QueryEventsRequest {
3092                    entity_id: None,
3093                    event_type: None,
3094                    tenant_id: Some("alice".to_string()),
3095                    as_of: None,
3096                    since: None,
3097                    until: None,
3098                    limit: None,
3099                    event_type_prefix: None,
3100                    payload_filter: None,
3101                })
3102            }));
3103        }
3104
3105        for h in handles {
3106            let result = h.join().unwrap().unwrap();
3107            assert_eq!(
3108                result.len(),
3109                25,
3110                "every concurrent caller must see all 25 events"
3111            );
3112        }
3113        assert!(store.is_tenant_loaded("alice"));
3114        // Memory has exactly 25 events — no double-load.
3115        assert_eq!(store.stats().total_events, 25);
3116    }
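
    // The singleflight shape in isolation: a per-tenant lock plus a
    // "loaded" flag means racing callers serialize on the lock and only
    // the first performs the load. A sketch of the mechanism the
    // per-tenant Mutex in tenant_loader provides, not its actual code.
    #[test]
    fn sketch_singleflight_per_tenant_lock() {
        use std::sync::atomic::{AtomicUsize, Ordering};
        use std::sync::{Arc, Mutex};

        let lock = Arc::new(Mutex::new(false)); // "loaded yet?"
        let loads = Arc::new(AtomicUsize::new(0));

        let mut handles = Vec::new();
        for _ in 0..8 {
            let lock = lock.clone();
            let loads = loads.clone();
            handles.push(std::thread::spawn(move || {
                let mut loaded = lock.lock().unwrap();
                if !*loaded {
                    loads.fetch_add(1, Ordering::SeqCst); // the one real load
                    *loaded = true;
                }
            }));
        }
        for h in handles {
            h.join().unwrap();
        }
        assert_eq!(loads.load(Ordering::SeqCst), 1, "exactly one load ran");
    }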
3117
3118    #[test]
3119    fn test_query_two_cold_tenants_load_independently() {
3120        // Querying tenant A loads only A; querying B then loads
3121        // only B. State after both queries: both tenants warm,
3122        // memory has exactly the expected event counts.
3123        let temp_dir = TempDir::new().unwrap();
3124        let storage_dir = temp_dir.path().to_path_buf();
3125
3126        {
3127            let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
3128            for i in 0..3 {
3129                store
3130                    .ingest(
3131                        &Event::from_strings(
3132                            "test.event".to_string(),
3133                            format!("a-{i}"),
3134                            "alice".to_string(),
3135                            serde_json::json!({"i": i}),
3136                            None,
3137                        )
3138                        .unwrap(),
3139                    )
3140                    .unwrap();
3141            }
3142            for i in 0..5 {
3143                store
3144                    .ingest(
3145                        &Event::from_strings(
3146                            "test.event".to_string(),
3147                            format!("b-{i}"),
3148                            "bob".to_string(),
3149                            serde_json::json!({"i": i}),
3150                            None,
3151                        )
3152                        .unwrap(),
3153                    )
3154                    .unwrap();
3155            }
3156            store.flush_storage().unwrap();
3157        }
3158
3159        let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
3160        assert_eq!(store.stats().total_events, 0);
3161
3162        // Query alice — bob stays cold.
3163        let alice = store
3164            .query(&QueryEventsRequest {
3165                entity_id: None,
3166                event_type: None,
3167                tenant_id: Some("alice".to_string()),
3168                as_of: None,
3169                since: None,
3170                until: None,
3171                limit: None,
3172                event_type_prefix: None,
3173                payload_filter: None,
3174            })
3175            .unwrap();
3176        assert_eq!(alice.len(), 3);
3177        assert!(store.is_tenant_loaded("alice"));
3178        assert!(!store.is_tenant_loaded("bob"));
3179        assert_eq!(store.stats().total_events, 3);
3180
3181        // Query bob — both warm now.
3182        let bob = store
3183            .query(&QueryEventsRequest {
3184                entity_id: None,
3185                event_type: None,
3186                tenant_id: Some("bob".to_string()),
3187                as_of: None,
3188                since: None,
3189                until: None,
3190                limit: None,
3191                event_type_prefix: None,
3192                payload_filter: None,
3193            })
3194            .unwrap();
3195        assert_eq!(bob.len(), 5);
3196        assert!(store.is_tenant_loaded("bob"));
3197        assert_eq!(store.stats().total_events, 8);
3198    }
3199
3200    #[test]
3201    fn test_boot_with_persisted_data_is_o1() {
3202        // Step 2's headline acceptance criterion: boot time does
3203        // not scale with persisted-data size. The 5M-events / <2s
3204        // target is too large for a unit test, so this asserts the
3205        // weaker but structural property: boot reads zero events
3206        // into memory regardless of how many are on disk.
3207        //
3208        // We persist 16 events per tenant across 3 tenants in
3209        // session 1, restart in session 2, and verify session 2's
3210        // total_events is 0. A loose wall-clock bound is asserted
3211        // below as a sanity check, but the absence of any in-memory
3212        // data is the structural proxy that the boot path no longer
3213        // iterates Parquet.
3214        let temp_dir = TempDir::new().unwrap();
3215        let storage_dir = temp_dir.path().to_path_buf();
3216
3217        {
3218            let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
3219            for tenant in ["alice", "bob", "carol"] {
3220                for i in 0..50 / 3 {
3221                    store
3222                        .ingest(
3223                            &Event::from_strings(
3224                                "test.event".to_string(),
3225                                format!("{tenant}-{i}"),
3226                                tenant.to_string(),
3227                                serde_json::json!({"i": i}),
3228                                None,
3229                            )
3230                            .unwrap(),
3231                        )
3232                        .unwrap();
3233                }
3234            }
3235            store.flush_storage().unwrap();
3236        }
3237
3238        // Confirm there is in fact data on disk to load.
3239        let on_disk = find_parquet_files(&storage_dir);
3240        assert!(
3241            !on_disk.is_empty(),
3242            "session 1 should have produced parquet files; pre-condition for the test"
3243        );
3244
3245        let started = std::time::Instant::now();
3246        let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
3247        let boot_elapsed = started.elapsed();
3248
3249        assert_eq!(
3250            store.stats().total_events,
3251            0,
3252            "boot must not pre-load any Parquet events"
3253        );
3254
3255        // Sanity: even on a slow CI box, an O(1) boot finishes well
3256        // inside the 2-second bound. If this trips, it's a strong
3257        // signal the boot path regressed to scanning the Parquet tree.
3258        assert!(
3259            boot_elapsed < std::time::Duration::from_secs(2),
3260            "boot took {boot_elapsed:?} — Step 2 boot should be O(1)"
3261        );
3262    }
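
    // Structural companion to the test above: an O(1)-flavored boot may
    // enumerate tenant directories, but it must not open event files.
    // Modeled here with a plain directory listing — an illustration of
    // the property, not the store's actual boot code.
    #[test]
    fn sketch_o1_boot_lists_without_reading() {
        let temp_dir = TempDir::new().unwrap();
        std::fs::create_dir(temp_dir.path().join("alice")).unwrap();
        std::fs::create_dir(temp_dir.path().join("bob")).unwrap();

        // "Boot": note which tenant dirs exist; open nothing inside them.
        let tenants: Vec<_> = std::fs::read_dir(temp_dir.path())
            .unwrap()
            .filter_map(|e| e.ok())
            .filter(|e| e.path().is_dir())
            .collect();
        assert_eq!(tenants.len(), 2);
    }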
3263
3264    #[test]
3265    fn test_query_warm_tenant_does_not_re_read_disk() {
3266        // Performance contract: a warm tenant query goes through the
3267        // DashMap fast path. We can't easily assert "no disk read"
3268        // directly in a unit test, but we CAN assert the query still
3269        // succeeds after the on-disk Parquet files are removed —
3270        // proving the warm path never re-reads them.
3271        let temp_dir = TempDir::new().unwrap();
3272        let storage_dir = temp_dir.path().to_path_buf();
3273
3274        let store = EventStore::with_config(EventStoreConfig::with_persistence(&storage_dir));
3275        for i in 0..3 {
3276            let event = Event::from_strings(
3277                "test.event".to_string(),
3278                format!("e-{i}"),
3279                "alice".to_string(),
3280                serde_json::json!({"i": i}),
3281                None,
3282            )
3283            .unwrap();
3284            store.ingest(&event).unwrap();
3285        }
3286        store.flush_storage().unwrap();
3287
3288        // First query: cold, hits disk.
3289        let _ = store
3290            .query(&QueryEventsRequest {
3291                entity_id: None,
3292                event_type: None,
3293                tenant_id: Some("alice".to_string()),
3294                as_of: None,
3295                since: None,
3296                until: None,
3297                limit: None,
3298                event_type_prefix: None,
3299                payload_filter: None,
3300            })
3301            .unwrap();
3302        assert!(store.is_tenant_loaded("alice"));
3303
3304        // Now wipe the on-disk file. A warm-path query must still
3305        // succeed because it doesn't need disk.
3306        let parquet_files = find_parquet_files(&storage_dir);
3307        for f in parquet_files {
3308            std::fs::remove_file(&f).unwrap();
3309        }
3310
3311        let results = store
3312            .query(&QueryEventsRequest {
3313                entity_id: None,
3314                event_type: None,
3315                tenant_id: Some("alice".to_string()),
3316                as_of: None,
3317                since: None,
3318                until: None,
3319                limit: None,
3320                event_type_prefix: None,
3321                payload_filter: None,
3322            })
3323            .unwrap();
3324        assert_eq!(
3325            results.len(),
3326            3,
3327            "warm tenant query must not need disk; got {} events from a deleted parquet",
3328            results.len()
3329        );
3330    }
3331
3332    #[test]
3333    fn test_event_store_default() {
3334        let store = EventStore::default();
3335        assert_eq!(store.stats().total_events, 0);
3336    }
3337
3338    #[test]
3339    fn test_ingest_single_event() {
3340        let store = EventStore::new();
3341        let event = create_test_event("entity-1", "user.created");
3342
3343        store.ingest(&event).unwrap();
3344
3345        assert_eq!(store.stats().total_events, 1);
3346        assert_eq!(store.stats().total_ingested, 1);
3347    }
3348
3349    #[test]
3350    fn test_ingest_multiple_events() {
3351        let store = EventStore::new();
3352
3353        for i in 0..10 {
3354            let event = create_test_event(&format!("entity-{i}"), "user.created");
3355            store.ingest(&event).unwrap();
3356        }
3357
3358        assert_eq!(store.stats().total_events, 10);
3359        assert_eq!(store.stats().total_ingested, 10);
3360    }
3361
3362    #[test]
3363    fn test_query_by_entity_id() {
3364        let store = EventStore::new();
3365
3366        store
3367            .ingest(&create_test_event("entity-1", "user.created"))
3368            .unwrap();
3369        store
3370            .ingest(&create_test_event("entity-2", "user.created"))
3371            .unwrap();
3372        store
3373            .ingest(&create_test_event("entity-1", "user.updated"))
3374            .unwrap();
3375
3376        let results = store
3377            .query(&QueryEventsRequest {
3378                entity_id: Some("entity-1".to_string()),
3379                event_type: None,
3380                tenant_id: None,
3381                as_of: None,
3382                since: None,
3383                until: None,
3384                limit: None,
3385                event_type_prefix: None,
3386                payload_filter: None,
3387            })
3388            .unwrap();
3389
3390        assert_eq!(results.len(), 2);
3391    }
3392
3393    #[test]
3394    fn test_query_by_event_type() {
3395        let store = EventStore::new();
3396
3397        store
3398            .ingest(&create_test_event("entity-1", "user.created"))
3399            .unwrap();
3400        store
3401            .ingest(&create_test_event("entity-2", "user.updated"))
3402            .unwrap();
3403        store
3404            .ingest(&create_test_event("entity-3", "user.created"))
3405            .unwrap();
3406
3407        let results = store
3408            .query(&QueryEventsRequest {
3409                entity_id: None,
3410                event_type: Some("user.created".to_string()),
3411                tenant_id: None,
3412                as_of: None,
3413                since: None,
3414                until: None,
3415                limit: None,
3416                event_type_prefix: None,
3417                payload_filter: None,
3418            })
3419            .unwrap();
3420
3421        assert_eq!(results.len(), 2);
3422    }
3423
3424    #[test]
3425    fn test_query_with_limit() {
3426        let store = EventStore::new();
3427
3428        for i in 0..10 {
3429            let event = create_test_event(&format!("entity-{i}"), "user.created");
3430            store.ingest(&event).unwrap();
3431        }
3432
3433        let results = store
3434            .query(&QueryEventsRequest {
3435                entity_id: None,
3436                event_type: None,
3437                tenant_id: None,
3438                as_of: None,
3439                since: None,
3440                until: None,
3441                limit: Some(5),
3442                event_type_prefix: None,
3443                payload_filter: None,
3444            })
3445            .unwrap();
3446
3447        assert_eq!(results.len(), 5);
3448    }
3449
3450    #[test]
3451    fn test_query_empty_store() {
3452        let store = EventStore::new();
3453
3454        let results = store
3455            .query(&QueryEventsRequest {
3456                entity_id: Some("non-existent".to_string()),
3457                event_type: None,
3458                tenant_id: None,
3459                as_of: None,
3460                since: None,
3461                until: None,
3462                limit: None,
3463                event_type_prefix: None,
3464                payload_filter: None,
3465            })
3466            .unwrap();
3467
3468        assert!(results.is_empty());
3469    }
3470
3471    #[test]
3472    fn test_reconstruct_state() {
3473        let store = EventStore::new();
3474
3475        store
3476            .ingest(&create_test_event("entity-1", "user.created"))
3477            .unwrap();
3478
3479        let state = store.reconstruct_state("entity-1", None).unwrap();
3480        // The state is wrapped with metadata
3481        assert_eq!(state["current_state"]["name"], "Test");
3482        assert_eq!(state["current_state"]["value"], 42);
3483    }
3484
3485    #[test]
3486    fn test_reconstruct_state_not_found() {
3487        let store = EventStore::new();
3488
3489        let result = store.reconstruct_state("non-existent", None);
3490        assert!(result.is_err());
3491    }
3492
3493    #[test]
3494    fn test_get_snapshot_empty() {
3495        let store = EventStore::new();
3496
3497        let result = store.get_snapshot("non-existent");
3498        // Entity not found error is expected
3499        assert!(result.is_err());
3500    }
3501
3502    #[test]
3503    fn test_create_snapshot() {
3504        let store = EventStore::new();
3505
3506        store
3507            .ingest(&create_test_event("entity-1", "user.created"))
3508            .unwrap();
3509
3510        store.create_snapshot("entity-1").unwrap();
3511
3512        // Verify snapshot was created
3513        let snapshot = store.get_snapshot("entity-1").unwrap();
3514        assert_ne!(snapshot, serde_json::json!(null));
3515    }
3516
3517    #[test]
3518    fn test_create_snapshot_entity_not_found() {
3519        let store = EventStore::new();
3520
3521        let result = store.create_snapshot("non-existent");
3522        assert!(result.is_err());
3523    }
3524
3525    #[test]
3526    fn test_websocket_manager() {
3527        let store = EventStore::new();
3528        let manager = store.websocket_manager();
3529        // Manager should be accessible
3530        assert!(Arc::strong_count(&manager) >= 1);
3531    }
3532
3533    #[test]
3534    fn test_snapshot_manager() {
3535        let store = EventStore::new();
3536        let manager = store.snapshot_manager();
3537        assert!(Arc::strong_count(&manager) >= 1);
3538    }
3539
3540    #[test]
3541    fn test_compaction_manager_none() {
3542        let store = EventStore::new();
3543        // Without storage_dir, compaction manager should be None
3544        assert!(store.compaction_manager().is_none());
3545    }
3546
3547    #[test]
3548    fn test_schema_registry() {
3549        let store = EventStore::new();
3550        let registry = store.schema_registry();
3551        assert!(Arc::strong_count(&registry) >= 1);
3552    }
3553
3554    #[test]
3555    fn test_replay_manager() {
3556        let store = EventStore::new();
3557        let manager = store.replay_manager();
3558        assert!(Arc::strong_count(&manager) >= 1);
3559    }
3560
3561    #[test]
3562    fn test_pipeline_manager() {
3563        let store = EventStore::new();
3564        let manager = store.pipeline_manager();
3565        assert!(Arc::strong_count(&manager) >= 1);
3566    }
3567
3568    #[test]
3569    fn test_projection_manager() {
3570        let store = EventStore::new();
3571        let manager = store.projection_manager();
3572        // Built-in projections should be registered
3573        let projections = manager.list_projections();
3574        assert!(projections.len() >= 2); // entity_snapshots and event_counters
3575    }
3576
3577    #[test]
3578    fn test_projection_state_cache() {
3579        let store = EventStore::new();
3580        let cache = store.projection_state_cache();
3581
3582        cache.insert("test:key".to_string(), serde_json::json!({"value": 123}));
3583        assert_eq!(cache.len(), 1);
3584
3585        let value = cache.get("test:key").unwrap();
3586        assert_eq!(value["value"], 123);
3587    }
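
    // The cache keys above follow a "{projection}:{entity}" shape
    // (an assumed convention — compare the "test:key" entry). A sketch
    // of building and splitting such a key.
    #[test]
    fn sketch_projection_cache_key_format() {
        let key = format!("{}:{}", "entity_snapshots", "user-1");
        assert_eq!(key, "entity_snapshots:user-1");
        let (projection, entity) = key.split_once(':').unwrap();
        assert_eq!(projection, "entity_snapshots");
        assert_eq!(entity, "user-1");
    }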
3588
3589    #[test]
3590    fn test_metrics() {
3591        let store = EventStore::new();
3592        let metrics = store.metrics();
3593        assert!(Arc::strong_count(&metrics) >= 1);
3594    }
3595
3596    #[test]
3597    fn test_store_stats() {
3598        let store = EventStore::new();
3599
3600        store
3601            .ingest(&create_test_event("entity-1", "user.created"))
3602            .unwrap();
3603        store
3604            .ingest(&create_test_event("entity-2", "order.placed"))
3605            .unwrap();
3606
3607        let stats = store.stats();
3608        assert_eq!(stats.total_events, 2);
3609        assert_eq!(stats.total_entities, 2);
3610        assert_eq!(stats.total_event_types, 2);
3611        assert_eq!(stats.total_ingested, 2);
3612    }
3613
3614    #[test]
3615    fn test_event_store_config_default() {
3616        let config = EventStoreConfig::default();
3617        assert!(config.storage_dir.is_none());
3618        assert!(config.wal_dir.is_none());
3619    }
3620
3621    #[test]
3622    fn test_event_store_config_with_persistence() {
3623        let temp_dir = TempDir::new().unwrap();
3624        let config = EventStoreConfig::with_persistence(temp_dir.path());
3625
3626        assert!(config.storage_dir.is_some());
3627        assert!(config.wal_dir.is_none());
3628    }
3629
3630    #[test]
3631    fn test_event_store_config_with_wal() {
3632        let temp_dir = TempDir::new().unwrap();
3633        let config = EventStoreConfig::with_wal(temp_dir.path(), WALConfig::default());
3634
3635        assert!(config.storage_dir.is_none());
3636        assert!(config.wal_dir.is_some());
3637    }
3638
3639    #[test]
3640    fn test_event_store_config_with_all() {
3641        let temp_dir = TempDir::new().unwrap();
3642        let config = EventStoreConfig::with_all(temp_dir.path(), SnapshotConfig::default());
3643
3644        assert!(config.storage_dir.is_some());
3645    }
3646
3647    #[test]
3648    fn test_event_store_config_production() {
3649        let storage_dir = TempDir::new().unwrap();
3650        let wal_dir = TempDir::new().unwrap();
3651        let config = EventStoreConfig::production(
3652            storage_dir.path(),
3653            wal_dir.path(),
3654            SnapshotConfig::default(),
3655            WALConfig::default(),
3656            CompactionConfig::default(),
3657        );
3658
3659        assert!(config.storage_dir.is_some());
3660        assert!(config.wal_dir.is_some());
3661    }
3662
3663    // -----------------------------------------------------------------------
3664    // from_env_vars tests — verify the env-var-to-config wiring behind
3665    // the v0.10.3 durability bug (events lost on restart).
3666    // -----------------------------------------------------------------------
3667
3668    #[test]
3669    fn test_from_env_vars_data_dir_enables_full_persistence() {
3670        let (config, mode) = EventStoreConfig::from_env_vars(
3671            Some("/app/data".to_string()),
3672            None,
3673            None,
3674            None,
3675            None,
3676            None,
3677            None,
3678            None,
3679        );
3680        assert_eq!(mode, "wal+parquet");
3681        assert_eq!(
3682            config.storage_dir.unwrap().to_str().unwrap(),
3683            "/app/data/storage"
3684        );
3685        assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/app/data/wal");
3686    }
3687
3688    #[test]
3689    fn test_from_env_vars_explicit_dirs() {
3690        let (config, mode) = EventStoreConfig::from_env_vars(
3691            None,
3692            Some("/custom/storage".to_string()),
3693            Some("/custom/wal".to_string()),
3694            None,
3695            None,
3696            None,
3697            None,
3698            None,
3699        );
3700        assert_eq!(mode, "wal+parquet");
3701        assert_eq!(
3702            config.storage_dir.unwrap().to_str().unwrap(),
3703            "/custom/storage"
3704        );
3705        assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/custom/wal");
3706    }
3707
3708    #[test]
3709    fn test_from_env_vars_wal_disabled() {
3710        let (config, mode) = EventStoreConfig::from_env_vars(
3711            Some("/app/data".to_string()),
3712            None,
3713            None,
3714            Some("false".to_string()),
3715            None,
3716            None,
3717            None,
3718            None,
3719        );
3720        assert_eq!(mode, "parquet-only");
3721        assert!(config.storage_dir.is_some());
3722        assert!(config.wal_dir.is_none());
3723    }
3724
3725    #[test]
3726    fn test_from_env_vars_no_dirs_is_in_memory() {
3727        let (config, mode) =
3728            EventStoreConfig::from_env_vars(None, None, None, None, None, None, None, None);
3729        assert_eq!(mode, "in-memory");
3730        assert!(config.storage_dir.is_none());
3731        assert!(config.wal_dir.is_none());
3732    }
3733
3734    #[test]
3735    fn test_from_env_vars_empty_strings_treated_as_none() {
3736        let (_, mode) = EventStoreConfig::from_env_vars(
3737            Some(String::new()),
3738            Some(String::new()),
3739            Some(String::new()),
3740            None,
3741            None,
3742            None,
3743            None,
3744            None,
3745        );
3746        assert_eq!(mode, "in-memory");
3747    }
3748
3749    #[test]
3750    fn test_from_env_vars_explicit_overrides_data_dir() {
3751        let (config, mode) = EventStoreConfig::from_env_vars(
3752            Some("/app/data".to_string()),
3753            Some("/override/storage".to_string()),
3754            Some("/override/wal".to_string()),
3755            None,
3756            None,
3757            None,
3758            None,
3759            None,
3760        );
3761        assert_eq!(mode, "wal+parquet");
3762        assert_eq!(
3763            config.storage_dir.unwrap().to_str().unwrap(),
3764            "/override/storage"
3765        );
3766        assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/override/wal");
3767    }
3768
3769    #[test]
3770    fn test_from_env_vars_wal_only() {
3771        let (config, mode) = EventStoreConfig::from_env_vars(
3772            None,
3773            None,
3774            Some("/wal/only".to_string()),
3775            None,
3776            None,
3777            None,
3778            None,
3779            None,
3780        );
3781        assert_eq!(mode, "wal-only");
3782        assert!(config.storage_dir.is_none());
3783        assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/wal/only");
3784    }
3785
3786    #[test]
3787    fn test_from_env_vars_cache_bytes_parses_decimal() {
3788        let (config, _) = EventStoreConfig::from_env_vars(
3789            Some("/app/data".to_string()),
3790            None,
3791            None,
3792            None,
3793            Some("536870912".to_string()),
3794            // 512 MiB
3795            None,
3796            None,
3797            None,
3798        );
3799        assert_eq!(config.cache_byte_budget, Some(536_870_912));
3800    }
3801
3802    #[test]
3803    fn test_from_env_vars_cache_bytes_unparseable_disables_budget() {
3804        // Garbage in CACHE_BYTES doesn't fail boot — we log and
3805        // fall back to no-budget. The unbounded fallback is safe
3806        // (just the pre-Step-3 behavior).
3807        let (config, _) = EventStoreConfig::from_env_vars(
3808            Some("/app/data".to_string()),
3809            None,
3810            None,
3811            None,
3812            Some("not-a-number".to_string()),
3813            None,
3814            None,
3815            None,
3816        );
3817        assert_eq!(config.cache_byte_budget, None);
3818    }
3819
3820    #[test]
3821    fn test_from_env_vars_cache_bytes_empty_disables_budget() {
3822        let (config, _) = EventStoreConfig::from_env_vars(
3823            Some("/app/data".to_string()),
3824            None,
3825            None,
3826            None,
3827            Some(String::new()),
3828            None,
3829            None,
3830            None,
3831        );
3832        assert_eq!(config.cache_byte_budget, None);
3833    }
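
    // Sketch of the lenient parse the three cache-bytes tests pin down:
    // missing, empty, or unparseable input yields None (no budget)
    // rather than failing boot. The helper name is illustrative.
    #[test]
    fn sketch_cache_budget_parse_fallback() {
        let parse = |raw: Option<&str>| -> Option<u64> {
            raw.filter(|s| !s.is_empty())?.parse::<u64>().ok()
        };
        assert_eq!(parse(Some("536870912")), Some(536_870_912));
        assert_eq!(parse(Some("not-a-number")), None);
        assert_eq!(parse(Some("")), None);
        assert_eq!(parse(None), None);
    }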
3834
3835    #[test]
3836    fn test_from_env_vars_snapshot_interval_overrides_default() {
3837        // ALLSOURCE_SNAPSHOT_INTERVAL_SECONDS plumbs through to
3838        // CompactionConfig.compaction_interval_seconds. Default is
3839        // 3600s (hourly) per the bead.
3840        let (config, _) = EventStoreConfig::from_env_vars(
3841            Some("/app/data".to_string()),
3842            None,
3843            None,
3844            None,
3845            None,
3846            Some("60".to_string()),
3847            None,
3848            None,
3849        );
3850        assert_eq!(config.compaction_config.compaction_interval_seconds, 60);
3851    }
3852
3853    #[test]
3854    fn test_from_env_vars_snapshot_interval_default_is_hourly() {
3855        let (config, _) = EventStoreConfig::from_env_vars(
3856            Some("/app/data".to_string()),
3857            None,
3858            None,
3859            None,
3860            None,
3861            None,
3862            None,
3863            None,
3864        );
3865        assert_eq!(config.compaction_config.compaction_interval_seconds, 3600);
3866    }
3867
3868    #[test]
3869    fn test_from_env_vars_snapshot_interval_unparseable_falls_back() {
3870        let (config, _) = EventStoreConfig::from_env_vars(
3871            Some("/app/data".to_string()),
3872            None,
3873            None,
3874            None,
3875            None,
3876            Some("not-a-number".to_string()),
3877            None,
3878            None,
3879        );
3880        assert_eq!(config.compaction_config.compaction_interval_seconds, 3600);
3881    }
3882
3883    #[test]
3884    fn test_from_env_vars_retention_system_days_overrides_default() {
3885        // Step 5: ALLSOURCE_RETENTION_SYSTEM_DAYS overrides the
3886        // default 30-day TTL for the system tenant.
3887        let (config, _) = EventStoreConfig::from_env_vars(
3888            Some("/app/data".to_string()),
3889            None,
3890            None,
3891            None,
3892            None,
3893            None,
3894            Some("7".to_string()),
3895            None,
3896        );
3897        let ttl = config
3898            .compaction_config
3899            .retention
3900            .ttl_for("system")
3901            .unwrap();
3902        assert_eq!(ttl.as_secs(), 7 * 24 * 3600);
3903    }
3904
3905    #[test]
3906    fn test_from_env_vars_retention_default_is_30_days_for_system() {
3907        let (config, _) = EventStoreConfig::from_env_vars(
3908            Some("/app/data".to_string()),
3909            None,
3910            None,
3911            None,
3912            None,
3913            None,
3914            None,
3915            None,
3916        );
3917        let ttl = config
3918            .compaction_config
3919            .retention
3920            .ttl_for("system")
3921            .unwrap();
3922        assert_eq!(ttl.as_secs(), 30 * 24 * 3600);
3923        // Other tenants keep forever by default.
3924        assert!(config.compaction_config.retention.ttl_for("acme").is_none());
3925    }
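
    // The retention TTLs above are plain day arithmetic; spelled out
    // here so the expected seconds in the assertions are easy to verify.
    #[test]
    fn sketch_retention_days_to_seconds() {
        let days_to_secs = |days: u64| days * 24 * 3600;
        assert_eq!(days_to_secs(7), 604_800); // the override case
        assert_eq!(days_to_secs(30), 2_592_000); // the default case
    }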
3926
3927    #[test]
3928    fn test_store_stats_serde() {
3929        let stats = StoreStats {
3930            total_events: 100,
3931            total_entities: 50,
3932            total_event_types: 10,
3933            total_ingested: 100,
3934        };
3935
3936        let json = serde_json::to_string(&stats).unwrap();
3937        assert!(json.contains("\"total_events\":100"));
3938        assert!(json.contains("\"total_entities\":50"));
3939    }
3940
3941    #[test]
3942    fn test_query_with_entity_and_type() {
3943        let store = EventStore::new();
3944
3945        store
3946            .ingest(&create_test_event("entity-1", "user.created"))
3947            .unwrap();
3948        store
3949            .ingest(&create_test_event("entity-1", "user.updated"))
3950            .unwrap();
3951        store
3952            .ingest(&create_test_event("entity-2", "user.created"))
3953            .unwrap();
3954
3955        let results = store
3956            .query(&QueryEventsRequest {
3957                entity_id: Some("entity-1".to_string()),
3958                event_type: Some("user.created".to_string()),
3959                tenant_id: None,
3960                as_of: None,
3961                since: None,
3962                until: None,
3963                limit: None,
3964                event_type_prefix: None,
3965                payload_filter: None,
3966            })
3967            .unwrap();
3968
3969        assert_eq!(results.len(), 1);
3970        assert_eq!(results[0].event_type_str(), "user.created");
3971    }
3972
3973    #[test]
3974    fn test_query_by_event_type_prefix() {
3975        let store = EventStore::new();
3976
3977        // Ingest events with various types
3978        store
3979            .ingest(&create_test_event("entity-1", "index.created"))
3980            .unwrap();
3981        store
3982            .ingest(&create_test_event("entity-2", "index.updated"))
3983            .unwrap();
3984        store
3985            .ingest(&create_test_event("entity-3", "trade.created"))
3986            .unwrap();
3987        store
3988            .ingest(&create_test_event("entity-4", "trade.completed"))
3989            .unwrap();
3990        store
3991            .ingest(&create_test_event("entity-5", "balance.updated"))
3992            .unwrap();
3993
3994        // Query with prefix "index." should return exactly 2
3995        let results = store
3996            .query(&QueryEventsRequest {
3997                entity_id: None,
3998                event_type: None,
3999                tenant_id: None,
4000                as_of: None,
4001                since: None,
4002                until: None,
4003                limit: None,
4004                event_type_prefix: Some("index.".to_string()),
4005                payload_filter: None,
4006            })
4007            .unwrap();
4008
4009        assert_eq!(results.len(), 2);
4010        assert!(
4011            results
4012                .iter()
4013                .all(|e| e.event_type_str().starts_with("index."))
4014        );
4015    }
4016
4017    #[test]
4018    fn test_query_by_event_type_prefix_empty_returns_all() {
4019        let store = EventStore::new();
4020
4021        store
4022            .ingest(&create_test_event("entity-1", "index.created"))
4023            .unwrap();
4024        store
4025            .ingest(&create_test_event("entity-2", "trade.created"))
4026            .unwrap();
4027
4028        // Empty prefix matches all types
4029        let results = store
4030            .query(&QueryEventsRequest {
4031                entity_id: None,
4032                event_type: None,
4033                tenant_id: None,
4034                as_of: None,
4035                since: None,
4036                until: None,
4037                limit: None,
4038                event_type_prefix: Some(String::new()),
4039                payload_filter: None,
4040            })
4041            .unwrap();
4042
4043        assert_eq!(results.len(), 2);
4044    }
4045
4046    #[test]
4047    fn test_query_by_event_type_prefix_no_match() {
4048        let store = EventStore::new();
4049
4050        store
4051            .ingest(&create_test_event("entity-1", "index.created"))
4052            .unwrap();
4053
4054        let results = store
4055            .query(&QueryEventsRequest {
4056                entity_id: None,
4057                event_type: None,
4058                tenant_id: None,
4059                as_of: None,
4060                since: None,
4061                until: None,
4062                limit: None,
4063                event_type_prefix: Some("nonexistent.".to_string()),
4064                payload_filter: None,
4065            })
4066            .unwrap();
4067
4068        assert!(results.is_empty());
4069    }
4070
4071    #[test]
4072    fn test_query_by_entity_with_type_prefix() {
4073        let store = EventStore::new();
4074
4075        store
4076            .ingest(&create_test_event("entity-1", "index.created"))
4077            .unwrap();
4078        store
4079            .ingest(&create_test_event("entity-1", "trade.created"))
4080            .unwrap();
4081        store
4082            .ingest(&create_test_event("entity-2", "index.updated"))
4083            .unwrap();
4084
4085        // Query entity-1 with prefix "index." should return 1
4086        let results = store
4087            .query(&QueryEventsRequest {
4088                entity_id: Some("entity-1".to_string()),
4089                event_type: None,
4090                tenant_id: None,
4091                as_of: None,
4092                since: None,
4093                until: None,
4094                limit: None,
4095                event_type_prefix: Some("index.".to_string()),
4096                payload_filter: None,
4097            })
4098            .unwrap();
4099
4100        assert_eq!(results.len(), 1);
4101        assert_eq!(results[0].event_type_str(), "index.created");
4102    }
4103
4104    #[test]
4105    fn test_query_prefix_with_limit() {
4106        let store = EventStore::new();
4107
4108        for i in 0..5 {
4109            store
4110                .ingest(&create_test_event(&format!("entity-{i}"), "index.created"))
4111                .unwrap();
4112        }
4113
4114        let results = store
4115            .query(&QueryEventsRequest {
4116                entity_id: None,
4117                event_type: None,
4118                tenant_id: None,
4119                as_of: None,
4120                since: None,
4121                until: None,
4122                limit: Some(3),
4123                event_type_prefix: Some("index.".to_string()),
4124                payload_filter: None,
4125            })
4126            .unwrap();
4127
4128        assert_eq!(results.len(), 3);
4129    }
4130
4131    #[test]
4132    fn test_query_prefix_alongside_existing_filters() {
4133        let store = EventStore::new();
4134
4135        store
4136            .ingest(&create_test_event("entity-1", "index.created"))
4137            .unwrap();
4138        // Sleep briefly to ensure different timestamps
4139        std::thread::sleep(std::time::Duration::from_millis(10));
4140        store
4141            .ingest(&create_test_event("entity-2", "index.strategy.updated"))
4142            .unwrap();
4143        std::thread::sleep(std::time::Duration::from_millis(10));
4144        store
4145            .ingest(&create_test_event("entity-3", "index.deleted"))
4146            .unwrap();
4147
4148        // Prefix with limit
4149        let results = store
4150            .query(&QueryEventsRequest {
4151                entity_id: None,
4152                event_type: None,
4153                tenant_id: None,
4154                as_of: None,
4155                since: None,
4156                until: None,
4157                limit: Some(2),
4158                event_type_prefix: Some("index.".to_string()),
4159                payload_filter: None,
4160            })
4161            .unwrap();
4162
4163        assert_eq!(results.len(), 2);
4164    }
4165
4166    #[test]
4167    fn test_query_with_payload_filter() {
4168        let store = EventStore::new();
4169
4170        // Ingest 5 events with user_id=alice
4171        for i in 0..5 {
4172            store
4173                .ingest(&create_test_event_with_payload(
4174                    &format!("entity-{i}"),
4175                    "user.action",
4176                    serde_json::json!({"user_id": "alice", "action": "click"}),
4177                ))
4178                .unwrap();
4179        }
4180        // Ingest 5 events with user_id=bob
4181        for i in 5..10 {
4182            store
4183                .ingest(&create_test_event_with_payload(
4184                    &format!("entity-{i}"),
4185                    "user.action",
4186                    serde_json::json!({"user_id": "bob", "action": "view"}),
4187                ))
4188                .unwrap();
4189        }
4190
4191        // Filter for alice
4192        let results = store
4193            .query(&QueryEventsRequest {
4194                entity_id: None,
4195                event_type: Some("user.action".to_string()),
4196                tenant_id: None,
4197                as_of: None,
4198                since: None,
4199                until: None,
4200                limit: None,
4201                event_type_prefix: None,
4202                payload_filter: Some(r#"{"user_id":"alice"}"#.to_string()),
4203            })
4204            .unwrap();
4205
4206        assert_eq!(results.len(), 5);
4207    }

    #[test]
    fn test_query_payload_filter_non_existent_field() {
        let store = EventStore::new();

        store
            .ingest(&create_test_event_with_payload(
                "entity-1",
                "user.action",
                serde_json::json!({"user_id": "alice"}),
            ))
            .unwrap();

        // Filtering on a field that doesn't exist returns 0 results, not an error
        let results = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: Some(r#"{"nonexistent":"value"}"#.to_string()),
            })
            .unwrap();

        assert!(results.is_empty());
    }

    #[test]
    fn test_query_payload_filter_with_prefix() {
        let store = EventStore::new();

        store
            .ingest(&create_test_event_with_payload(
                "entity-1",
                "index.created",
                serde_json::json!({"status": "active"}),
            ))
            .unwrap();
        store
            .ingest(&create_test_event_with_payload(
                "entity-2",
                "index.created",
                serde_json::json!({"status": "inactive"}),
            ))
            .unwrap();
        store
            .ingest(&create_test_event_with_payload(
                "entity-3",
                "trade.created",
                serde_json::json!({"status": "active"}),
            ))
            .unwrap();

        // Combine prefix + payload filter: both must match
        let results = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: Some("index.".to_string()),
                payload_filter: Some(r#"{"status":"active"}"#.to_string()),
            })
            .unwrap();

        assert_eq!(results.len(), 1);
        assert_eq!(results[0].entity_id().to_string(), "entity-1");
    }
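
    // Added sketch (not in the original suite): mirror of the test above with
    // the prefix flipped to "trade.". Under the same AND composition of
    // event_type_prefix and payload_filter, only entity-3 should match.
    #[test]
    fn test_query_payload_filter_with_other_prefix_sketch() {
        let store = EventStore::new();

        store
            .ingest(&create_test_event_with_payload(
                "entity-1",
                "index.created",
                serde_json::json!({"status": "active"}),
            ))
            .unwrap();
        store
            .ingest(&create_test_event_with_payload(
                "entity-3",
                "trade.created",
                serde_json::json!({"status": "active"}),
            ))
            .unwrap();

        let results = store
            .query(&QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: Some("trade.".to_string()),
                payload_filter: Some(r#"{"status":"active"}"#.to_string()),
            })
            .unwrap();

        assert_eq!(results.len(), 1);
        assert_eq!(results[0].entity_id().to_string(), "entity-3");
    }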

    #[test]
    fn test_flush_storage_no_storage() {
        let store = EventStore::new();
        // Without persistent storage configured, flush succeeds as a no-op
        let result = store.flush_storage();
        assert!(result.is_ok());
    }

    #[test]
    fn test_state_evolution() {
        let store = EventStore::new();

        // Initial state
        store
            .ingest(
                &Event::from_strings(
                    "user.created".to_string(),
                    "user-1".to_string(),
                    "default".to_string(),
                    serde_json::json!({"name": "Alice", "age": 25}),
                    None,
                )
                .unwrap(),
            )
            .unwrap();

        // Update state
        store
            .ingest(
                &Event::from_strings(
                    "user.updated".to_string(),
                    "user-1".to_string(),
                    "default".to_string(),
                    serde_json::json!({"age": 26}),
                    None,
                )
                .unwrap(),
            )
            .unwrap();

        let state = store.reconstruct_state("user-1", None).unwrap();
        // reconstruct_state wraps the folded payload in metadata; the merged
        // fields live under "current_state".
        assert_eq!(state["current_state"]["name"], "Alice");
        assert_eq!(state["current_state"]["age"], 26);
    }
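
    // Added sketch (not in the original suite): extends test_state_evolution
    // on the assumption that reconstruct_state folds payloads by shallow
    // merge, as demonstrated above, so a later event introducing a brand-new
    // field should add it without disturbing fields from earlier events.
    #[test]
    fn test_state_evolution_adds_new_field_sketch() {
        let store = EventStore::new();

        store
            .ingest(
                &Event::from_strings(
                    "user.created".to_string(),
                    "user-1".to_string(),
                    "default".to_string(),
                    serde_json::json!({"name": "Alice", "age": 25}),
                    None,
                )
                .unwrap(),
            )
            .unwrap();

        store
            .ingest(
                &Event::from_strings(
                    "user.updated".to_string(),
                    "user-1".to_string(),
                    "default".to_string(),
                    serde_json::json!({"email": "alice@example.com"}),
                    None,
                )
                .unwrap(),
            )
            .unwrap();

        let state = store.reconstruct_state("user-1", None).unwrap();
        // Old fields survive; the new field appears alongside them.
        assert_eq!(state["current_state"]["name"], "Alice");
        assert_eq!(state["current_state"]["age"], 25);
        assert_eq!(state["current_state"]["email"], "alice@example.com");
    }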

    #[test]
    fn test_reject_system_event_types() {
        let store = EventStore::new();

        // The user-facing ingest path must reject events in the reserved
        // `_system` namespace.
        let event = Event::reconstruct_from_strings(
            uuid::Uuid::new_v4(),
            "_system.tenant.created".to_string(),
            "_system:tenant:acme".to_string(),
            "_system".to_string(),
            serde_json::json!({"name": "ACME"}),
            chrono::Utc::now(),
            None,
            1,
        );

        let result = store.ingest(&event);
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("reserved for internal use"),
            "Expected system namespace rejection, got: {err}"
        );
    }

    // -----------------------------------------------------------------------
    // Crash recovery: WAL events survive restart via Parquet checkpoint.
    // Regression test for GitHub issue #84 — flush_storage() was a no-op
    // during recovery because events were never buffered into Parquet's
    // current_batch before flushing.
    // -----------------------------------------------------------------------

    #[test]
    fn test_wal_recovery_checkpoints_to_parquet() {
        let data_dir = TempDir::new().unwrap();
        let storage_dir = data_dir.path().join("storage");
        let wal_dir = data_dir.path().join("wal");

        // Session 1: ingest events with WAL + Parquet
        {
            let config = EventStoreConfig::production(
                &storage_dir,
                &wal_dir,
                SnapshotConfig::default(),
                WALConfig {
                    sync_on_write: true,
                    ..WALConfig::default()
                },
                CompactionConfig::default(),
            );
            let store = EventStore::with_config(config);

            for i in 0..5 {
                let event = Event::from_strings(
                    "test.created".to_string(),
                    format!("entity-{i}"),
                    "default".to_string(),
                    serde_json::json!({"index": i}),
                    None,
                )
                .unwrap();
                store.ingest(&event).unwrap();
            }

            assert_eq!(store.stats().total_events, 5);

            // Do NOT call flush_storage or shutdown — simulate a crash.
            // Events are in the WAL (sync_on_write: true) but NOT in Parquet.
        }

        // Verify the WAL file has data
        let wal_files: Vec<_> = std::fs::read_dir(&wal_dir)
            .unwrap()
            .filter_map(std::result::Result::ok)
            .filter(|e| e.path().extension().is_some_and(|ext| ext == "log"))
            .collect();
        assert!(!wal_files.is_empty(), "WAL file should exist");
        let wal_size = wal_files[0].metadata().unwrap().len();
        assert!(wal_size > 0, "WAL file should have data (got 0 bytes)");

        // Session 2: reopen — recovery should checkpoint the WAL to Parquet,
        // then truncate it
        {
            let config = EventStoreConfig::production(
                &storage_dir,
                &wal_dir,
                SnapshotConfig::default(),
                WALConfig {
                    sync_on_write: true,
                    ..WALConfig::default()
                },
                CompactionConfig::default(),
            );
            let store = EventStore::with_config(config);

            // Events should be recovered
            assert_eq!(
                store.stats().total_events,
                5,
                "Session 2 should have all 5 events after WAL recovery"
            );

            // Parquet should now have files (checkpoint happened).
            // After Step 1, files live under <root>/<tenant>/<yyyy-mm>/,
            // so walk recursively.
            let parquet_files = find_parquet_files(&storage_dir);
            assert!(
                !parquet_files.is_empty(),
                "Parquet file should exist after WAL checkpoint"
            );
        }

        // Session 3: reopen again — events should be reachable via
        // lazy-load (Step 2: boot does not pre-load Parquet).
        {
            let config = EventStoreConfig::production(
                &storage_dir,
                &wal_dir,
                SnapshotConfig::default(),
                WALConfig {
                    sync_on_write: true,
                    ..WALConfig::default()
                },
                CompactionConfig::default(),
            );
            let store = EventStore::with_config(config);

            // Boot is now O(1) — Parquet stays cold until the first
            // per-tenant query. The WAL was truncated in session 2,
            // so nothing is pre-loaded.
            assert_eq!(
                store.stats().total_events,
                0,
                "Session 3 boot should not pre-load Parquet (lazy-load mode)"
            );

            // Trigger lazy load for the test tenant (events were
            // ingested with tenant_id="default").
            store.ensure_tenant_loaded("default").unwrap();
            assert_eq!(
                store.stats().total_events,
                5,
                "Session 3 should have all 5 events after ensure_tenant_loaded"
            );
        }
    }

    #[test]
    fn test_parquet_restore_surfaces_errors_not_silent() {
        // Write events with WAL + Parquet, flush to Parquet, then corrupt the
        // Parquet file. On reload, the error must be logged (not silently
        // swallowed as 0 events).
        let data_dir = TempDir::new().unwrap();
        let storage_dir = data_dir.path().join("storage");
        let wal_dir = data_dir.path().join("wal");

        // Session 1: write events and flush to Parquet
        {
            let config = EventStoreConfig::production(
                &storage_dir,
                &wal_dir,
                SnapshotConfig::default(),
                WALConfig {
                    sync_on_write: true,
                    ..WALConfig::default()
                },
                CompactionConfig::default(),
            );
            let store = EventStore::with_config(config);

            for i in 0..3 {
                let event = Event::from_strings(
                    "test.created".to_string(),
                    format!("entity-{i}"),
                    "default".to_string(),
                    serde_json::json!({"i": i}),
                    None,
                )
                .unwrap();
                store.ingest(&event).unwrap();
            }

            store.flush_storage().unwrap();
            assert_eq!(store.stats().total_events, 3);
        }

        // Verify the Parquet file exists. After Step 1 the file lives
        // under <root>/<tenant>/<yyyy-mm>/, so walk recursively.
        let parquet_files = find_parquet_files(&storage_dir);
        assert!(!parquet_files.is_empty(), "Parquet file must exist");

        // Corrupt the Parquet file
        std::fs::write(&parquet_files[0], b"corrupted data").unwrap();

        // Truncate the WAL so only Parquet matters
        for entry in std::fs::read_dir(&wal_dir).unwrap().flatten() {
            std::fs::write(entry.path(), b"").unwrap();
        }

        // Session 2: reload — the store must NOT silently report 0 events as
        // if no data existed. The error is logged via tracing::error!, which a
        // unit test can't capture, but we CAN verify the store has 0 events
        // (previously this looked identical to "no data on disk" — now there
        // is an error log). The key behavioral change: with_config no longer
        // uses a let-chain that silently drops the Err variant.
        {
            let config = EventStoreConfig::production(
                &storage_dir,
                &wal_dir,
                SnapshotConfig::default(),
                WALConfig::default(),
                CompactionConfig::default(),
            );
            let store = EventStore::with_config(config);

            // The store has 0 events because Parquet is corrupted — but the
            // error is now logged rather than silently swallowed.
            assert_eq!(store.stats().total_events, 0);
        }
    }

    // -----------------------------------------------------------------------
    // Step 6: Bounded WAL replay. Each successful checkpoint truncates the
    // WAL so cold-start replay is O(one checkpoint interval) regardless of
    // total dataset size.
    // -----------------------------------------------------------------------

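    // Back-of-envelope illustration of that bound (added; the 10k events/s
    // ingest rate below is an assumed figure, not taken from this codebase;
    // only the 60 s default checkpoint interval is, per the env tests at the
    // bottom of this module): worst-case cold-start replay is roughly
    // ingest rate × checkpoint interval.
    #[allow(dead_code)]
    const ASSUMED_INGEST_RATE_PER_SEC: u64 = 10_000;
    #[allow(dead_code)]
    const DEFAULT_CHECKPOINT_INTERVAL_SECS: u64 = 60;
    #[allow(dead_code)]
    const WORST_CASE_REPLAY_EVENTS: u64 =
        ASSUMED_INGEST_RATE_PER_SEC * DEFAULT_CHECKPOINT_INTERVAL_SECS; // 600_000
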
    /// Count entries in every WAL file under `wal_dir`. The on-disk format
    /// is one JSON-serialized WALEntry per line (see WALFile::write_entry),
    /// so counting non-empty lines counts entries.
    fn count_wal_entries(wal_dir: &std::path::Path) -> usize {
        use std::io::{BufRead, BufReader};
        let mut total = 0usize;
        let Ok(entries) = std::fs::read_dir(wal_dir) else {
            return 0;
        };
        for entry in entries.flatten() {
            let path = entry.path();
            if path.extension().is_none_or(|e| e != "log") {
                continue;
            }
            let Ok(file) = std::fs::File::open(&path) else {
                continue;
            };
            for line in BufReader::new(file)
                .lines()
                .map_while(std::result::Result::ok)
            {
                if !line.trim().is_empty() {
                    total += 1;
                }
            }
        }
        total
    }
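
    // Added self-test for the helper above: pins down its counting contract
    // (non-empty lines in *.log files only) with synthetic files, independent
    // of the real WAL layout.
    #[test]
    fn test_count_wal_entries_counts_nonempty_log_lines() {
        let dir = TempDir::new().unwrap();
        // Two entries plus a trailing blank line; blanks are not counted.
        std::fs::write(dir.path().join("00001.log"), "{\"a\":1}\n{\"b\":2}\n\n").unwrap();
        // Files without a .log extension are ignored entirely.
        std::fs::write(dir.path().join("notes.txt"), "{\"c\":3}\n").unwrap();
        assert_eq!(count_wal_entries(dir.path()), 2);
    }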

    #[test]
    fn test_checkpoint_truncates_wal_after_flush() {
        // After a successful checkpoint, every previously-ingested event
        // should be in Parquet and the WAL should be empty (truncated).
        // This is the load-bearing invariant behind Step 6's bounded-replay
        // promise — without truncation, the WAL grows without bound.
        let data_dir = TempDir::new().unwrap();
        let storage_dir = data_dir.path().join("storage");
        let wal_dir = data_dir.path().join("wal");

        let config = EventStoreConfig::production(
            &storage_dir,
            &wal_dir,
            SnapshotConfig::default(),
            WALConfig {
                sync_on_write: true,
                ..WALConfig::default()
            },
            CompactionConfig::default(),
        );
        let store = EventStore::with_config(config);

        for i in 0..10 {
            let event = Event::from_strings(
                "test.created".to_string(),
                format!("entity-{i}"),
                "default".to_string(),
                serde_json::json!({"i": i}),
                None,
            )
            .unwrap();
            store.ingest(&event).unwrap();
        }

        // Sanity check: all 10 events are in the WAL pre-checkpoint.
        assert_eq!(
            count_wal_entries(&wal_dir),
            10,
            "WAL should have 10 events before checkpoint"
        );

        store.checkpoint().unwrap();

        assert_eq!(
            count_wal_entries(&wal_dir),
            0,
            "WAL should be empty after successful checkpoint"
        );
        let parquet_files = find_parquet_files(&storage_dir);
        assert!(!parquet_files.is_empty(), "Parquet should hold the events");
    }

    #[test]
    fn test_replay_only_post_checkpoint_events_after_crash() {
        // Headline acceptance criterion for this change: write N events,
        // checkpoint, write K more, simulate a crash, restart, and verify
        // that only K events go through replay (not N + K).
        //
        // Uses a small N (50) and K (5) for test speed — the property is
        // the same as the spec's 1M + 10k example, just scaled down.
        let data_dir = TempDir::new().unwrap();
        let storage_dir = data_dir.path().join("storage");
        let wal_dir = data_dir.path().join("wal");

        let config_factory = || {
            EventStoreConfig::production(
                &storage_dir,
                &wal_dir,
                SnapshotConfig::default(),
                WALConfig {
                    sync_on_write: true,
                    ..WALConfig::default()
                },
                CompactionConfig::default(),
            )
        };

        // Session 1: ingest N, checkpoint, ingest K, then drop without
        // a graceful shutdown — that's the crash.
        const N: usize = 50;
        const K: usize = 5;
        {
            let store = EventStore::with_config(config_factory());
            for i in 0..N {
                store
                    .ingest(
                        &Event::from_strings(
                            "pre.checkpoint".to_string(),
                            format!("e-{i}"),
                            "default".to_string(),
                            serde_json::json!({"i": i}),
                            None,
                        )
                        .unwrap(),
                    )
                    .unwrap();
            }
            store.checkpoint().unwrap();
            assert_eq!(
                count_wal_entries(&wal_dir),
                0,
                "WAL should be empty immediately after checkpoint"
            );

            for i in 0..K {
                store
                    .ingest(
                        &Event::from_strings(
                            "post.checkpoint".to_string(),
                            format!("p-{i}"),
                            "default".to_string(),
                            serde_json::json!({"i": i}),
                            None,
                        )
                        .unwrap(),
                    )
                    .unwrap();
            }
            assert_eq!(
                count_wal_entries(&wal_dir),
                K,
                "WAL should hold only post-checkpoint events"
            );
            // Drop without flushing — simulates a crash mid-write.
        }

        // Session 2: reopen. Recovery should replay only the K post-
        // checkpoint events from the WAL — the N pre-checkpoint events
        // are durable in Parquet and lazy-loaded on demand.
        {
            let store = EventStore::with_config(config_factory());
            // total_events reflects only WAL-recovered events at boot
            // (Step 2 — Parquet stays cold until the first per-tenant
            // query), so the WAL replay size IS exactly K.
            assert_eq!(
                store.stats().total_events,
                K,
                "Boot should replay exactly K events from WAL (the post-checkpoint window), not N+K"
            );

            // Lazy-load brings the rest in.
            store.ensure_tenant_loaded("default").unwrap();
            assert_eq!(
                store.stats().total_events,
                N + K,
                "After lazy-load, both pre- and post-checkpoint events should be reachable"
            );
        }
    }
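
    // Added sketch (not in the original suite): assumes ensure_tenant_loaded
    // is idempotent, as its name implies, so loading a tenant that is already
    // resident must not duplicate events. This is an unverified assumption
    // about the lazy-load contract, not a documented guarantee.
    #[test]
    fn test_ensure_tenant_loaded_idempotent_sketch() {
        let data_dir = TempDir::new().unwrap();
        let storage_dir = data_dir.path().join("storage");
        let wal_dir = data_dir.path().join("wal");

        // Session 1: persist 3 events to Parquet via checkpoint, then drop.
        {
            let store = EventStore::with_config(EventStoreConfig::production(
                &storage_dir,
                &wal_dir,
                SnapshotConfig::default(),
                WALConfig::default(),
                CompactionConfig::default(),
            ));
            for i in 0..3 {
                store
                    .ingest(
                        &Event::from_strings(
                            "test.created".to_string(),
                            format!("e-{i}"),
                            "default".to_string(),
                            serde_json::json!({"i": i}),
                            None,
                        )
                        .unwrap(),
                    )
                    .unwrap();
            }
            store.checkpoint().unwrap();
        }

        // Session 2: boot cold, then lazy-load the tenant twice.
        let store = EventStore::with_config(EventStoreConfig::production(
            &storage_dir,
            &wal_dir,
            SnapshotConfig::default(),
            WALConfig::default(),
            CompactionConfig::default(),
        ));
        store.ensure_tenant_loaded("default").unwrap();
        assert_eq!(store.stats().total_events, 3);
        // The second call must be a no-op, not a re-append.
        store.ensure_tenant_loaded("default").unwrap();
        assert_eq!(store.stats().total_events, 3);
    }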

    #[test]
    fn test_checkpoint_is_idempotent() {
        // Calling checkpoint() twice in a row is safe: the second call
        // finds an empty WAL and an empty Parquet batch, and no-ops.
        let data_dir = TempDir::new().unwrap();
        let storage_dir = data_dir.path().join("storage");
        let wal_dir = data_dir.path().join("wal");

        let store = EventStore::with_config(EventStoreConfig::production(
            &storage_dir,
            &wal_dir,
            SnapshotConfig::default(),
            WALConfig::default(),
            CompactionConfig::default(),
        ));

        for i in 0..5 {
            store
                .ingest(
                    &Event::from_strings(
                        "x".to_string(),
                        format!("e-{i}"),
                        "default".to_string(),
                        serde_json::json!({}),
                        None,
                    )
                    .unwrap(),
                )
                .unwrap();
        }

        store.checkpoint().unwrap();
        // The second call is a no-op and must not error.
        store.checkpoint().unwrap();
        assert_eq!(count_wal_entries(&wal_dir), 0);
    }

    #[test]
    fn test_checkpoint_noop_in_memory_only_mode() {
        // Without a WAL configured, checkpoint() is a no-op.
        let store = EventStore::new();
        store.checkpoint().unwrap();
    }

    #[test]
    fn test_checkpoint_interval_from_env_defaults_to_60s_when_wal_enabled() {
        let (config, _) = EventStoreConfig::from_env_vars(
            Some("/app/data".to_string()), // data dir
            None,
            None,
            None,
            None,
            None,
            None,
            None, // checkpoint interval: unset, so the WAL-enabled default of 60 s applies
        );
        assert_eq!(config.checkpoint_interval_secs, Some(60));
    }

    #[test]
    fn test_checkpoint_interval_from_env_overrides_default() {
        let (config, _) = EventStoreConfig::from_env_vars(
            Some("/app/data".to_string()),
            None,
            None,
            None,
            None,
            None,
            None,
            Some("15".to_string()), // checkpoint interval override, in seconds
        );
        assert_eq!(config.checkpoint_interval_secs, Some(15));
    }

    #[test]
    fn test_checkpoint_interval_disabled_when_wal_disabled() {
        // No WAL → no checkpoint loop, regardless of the interval value.
        let (config, _) = EventStoreConfig::from_env_vars(
            Some("/app/data".to_string()),
            None,
            None,
            Some("false".to_string()), // WAL disabled
            None,
            None,
            None,
            Some("15".to_string()),
        );
        assert_eq!(config.checkpoint_interval_secs, None);
    }

    #[test]
    fn test_checkpoint_interval_unparseable_falls_back_to_default() {
        let (config, _) = EventStoreConfig::from_env_vars(
            Some("/app/data".to_string()),
            None,
            None,
            None,
            None,
            None,
            None,
            Some("not-a-number".to_string()), // unparseable, so the 60 s default applies
        );
        assert_eq!(config.checkpoint_interval_secs, Some(60));
    }
}