//! allsource_core/store.rs
1#[cfg(feature = "server")]
2use crate::application::services::webhook::WebhookRegistry;
3#[cfg(feature = "server")]
4use crate::infrastructure::observability::metrics::MetricsRegistry;
5#[cfg(feature = "server")]
6use crate::infrastructure::web::websocket::WebSocketManager;
7use crate::{
8    application::{
9        dto::QueryEventsRequest,
10        services::{
11            exactly_once::{ExactlyOnceConfig, ExactlyOnceRegistry},
12            pipeline::PipelineManager,
13            projection::{EntitySnapshotProjection, EventCounterProjection, ProjectionManager},
14            replay::ReplayManager,
15            schema::{SchemaRegistry, SchemaRegistryConfig},
16            schema_evolution::SchemaEvolutionManager,
17        },
18    },
19    domain::entities::Event,
20    error::{AllSourceError, Result},
21    infrastructure::{
22        persistence::{
23            compaction::{CompactionConfig, CompactionManager},
24            index::{EventIndex, IndexEntry},
25            snapshot::{SnapshotConfig, SnapshotManager, SnapshotType},
26            storage::ParquetStorage,
27            wal::{WALConfig, WriteAheadLog},
28        },
29        query::geospatial::GeoIndex,
30    },
31};
32use chrono::{DateTime, Utc};
33use dashmap::DashMap;
34use parking_lot::RwLock;
35use std::{path::PathBuf, sync::Arc};
36#[cfg(feature = "server")]
37use tokio::sync::mpsc;
38
/// High-performance event store with columnar storage
///
/// All subsystems are held behind `Arc` so cheap handles can be shared with
/// background tasks; mutable state uses `parking_lot::RwLock` or `DashMap`.
pub struct EventStore {
    /// In-memory event storage; vector offsets double as index positions,
    /// so the vec is append-only (except for explicit compaction).
    events: Arc<RwLock<Vec<Event>>>,

    /// High-performance concurrent index
    index: Arc<EventIndex>,

    /// Projection manager for real-time aggregations
    pub(crate) projections: Arc<RwLock<ProjectionManager>>,

    /// Optional persistent storage (v0.2 feature); `None` when no storage
    /// dir is configured or initialization failed.
    storage: Option<Arc<RwLock<ParquetStorage>>>,

    /// WebSocket manager for real-time event streaming (v0.2 feature)
    #[cfg(feature = "server")]
    websocket_manager: Arc<WebSocketManager>,

    /// Snapshot manager for fast state recovery (v0.2 feature)
    snapshot_manager: Arc<SnapshotManager>,

    /// Write-Ahead Log for durability (v0.2 feature); `None` when no WAL
    /// dir is configured or initialization failed.
    wal: Option<Arc<WriteAheadLog>>,

    /// Compaction manager for Parquet optimization (v0.2 feature)
    compaction_manager: Option<Arc<CompactionManager>>,

    /// Schema registry for event validation (v0.5 feature)
    schema_registry: Arc<SchemaRegistry>,

    /// Replay manager for event replay and projection rebuilding (v0.5 feature)
    replay_manager: Arc<ReplayManager>,

    /// Pipeline manager for stream processing (v0.5 feature)
    pipeline_manager: Arc<PipelineManager>,

    /// Prometheus metrics registry (v0.6 feature)
    #[cfg(feature = "server")]
    metrics: Arc<MetricsRegistry>,

    /// Total events ingested (for metrics)
    total_ingested: Arc<RwLock<u64>>,

    /// Projection state cache for Query Service integration (v0.7 feature)
    /// Key format: "{projection_name}:{entity_id}"
    /// This DashMap provides O(1) access with ~11.9 μs latency
    projection_state_cache: Arc<DashMap<String, serde_json::Value>>,

    /// Webhook registry for outbound event delivery (v0.11 feature)
    #[cfg(feature = "server")]
    webhook_registry: Arc<WebhookRegistry>,

    /// Channel sender for async webhook delivery tasks; `None` until
    /// `set_webhook_tx` wires the delivery worker at startup.
    #[cfg(feature = "server")]
    webhook_tx: Arc<RwLock<Option<mpsc::UnboundedSender<WebhookDeliveryTask>>>>,

    /// Geospatial index for coordinate-based queries (v2.0 feature)
    geo_index: Arc<GeoIndex>,

    /// Exactly-once processing registry (v2.0 feature)
    exactly_once: Arc<ExactlyOnceRegistry>,

    /// Autonomous schema evolution manager (v2.0 feature)
    schema_evolution: Arc<SchemaEvolutionManager>,
}
104
/// A task queued for async webhook delivery
///
/// Produced by `EventStore::dispatch_webhooks` on the ingest path and
/// consumed by the delivery worker wired up via `set_webhook_tx`.
#[cfg(feature = "server")]
#[derive(Debug, Clone)]
pub struct WebhookDeliveryTask {
    /// The subscription whose endpoint should receive the event.
    pub webhook: crate::application::services::webhook::WebhookSubscription,
    /// The event to deliver (cloned from the ingest path).
    pub event: Event,
}
112
113impl EventStore {
114    /// Create a new in-memory event store
115    pub fn new() -> Self {
116        Self::with_config(EventStoreConfig::default())
117    }
118
    /// Create event store with custom configuration
    ///
    /// Construction proceeds in three phases:
    /// 1. Build subsystems: projections, optional Parquet storage, optional
    ///    WAL, compaction, schema registry, replay/pipeline managers, caches.
    /// 2. Recover events from the WAL (most recent data), then checkpoint
    ///    to Parquet and truncate the WAL.
    /// 3. If nothing came from the WAL, load persisted events from Parquet
    ///    (skipped after WAL recovery to avoid double-loading).
    ///
    /// Optional subsystems that fail to initialize (Parquet, WAL) are logged
    /// and disabled rather than aborting construction.
    pub fn with_config(config: EventStoreConfig) -> Self {
        let mut projections = ProjectionManager::new();

        // Register built-in projections
        projections.register(Arc::new(EntitySnapshotProjection::new("entity_snapshots")));
        projections.register(Arc::new(EventCounterProjection::new("event_counters")));

        // Initialize persistent storage if configured; on failure fall back
        // to in-memory-only operation (None) instead of propagating.
        let storage = config
            .storage_dir
            .as_ref()
            .and_then(|dir| match ParquetStorage::new(dir) {
                Ok(storage) => {
                    tracing::info!("✅ Parquet persistence enabled at: {}", dir.display());
                    Some(Arc::new(RwLock::new(storage)))
                }
                Err(e) => {
                    tracing::error!("❌ Failed to initialize Parquet storage: {}", e);
                    None
                }
            });

        // Initialize WAL if configured (v0.2 feature); same graceful
        // degradation as storage above.
        let wal = config.wal_dir.as_ref().and_then(|dir| {
            match WriteAheadLog::new(dir, config.wal_config.clone()) {
                Ok(wal) => {
                    tracing::info!("✅ WAL enabled at: {}", dir.display());
                    Some(Arc::new(wal))
                }
                Err(e) => {
                    tracing::error!("❌ Failed to initialize WAL: {}", e);
                    None
                }
            }
        });

        // Initialize compaction manager if Parquet storage is enabled (v0.2 feature)
        let compaction_manager = config.storage_dir.as_ref().map(|dir| {
            let manager = CompactionManager::new(dir, config.compaction_config.clone());
            Arc::new(manager)
        });

        // Initialize schema registry (v0.5 feature)
        let schema_registry = Arc::new(SchemaRegistry::new(config.schema_registry_config.clone()));
        tracing::info!("✅ Schema registry enabled");

        // Initialize replay manager (v0.5 feature)
        let replay_manager = Arc::new(ReplayManager::new());
        tracing::info!("✅ Replay manager enabled");

        // Initialize pipeline manager (v0.5 feature)
        let pipeline_manager = Arc::new(PipelineManager::new());
        tracing::info!("✅ Pipeline manager enabled");

        // Initialize metrics registry (v0.6 feature)
        #[cfg(feature = "server")]
        let metrics = {
            let m = MetricsRegistry::new();
            tracing::info!("✅ Prometheus metrics registry initialized");
            m
        };

        // Initialize projection state cache (v0.7 feature)
        let projection_state_cache = Arc::new(DashMap::new());
        tracing::info!("✅ Projection state cache initialized");

        // Initialize webhook registry (v0.11 feature)
        #[cfg(feature = "server")]
        let webhook_registry = {
            let w = Arc::new(WebhookRegistry::new());
            tracing::info!("✅ Webhook registry initialized");
            w
        };

        let store = Self {
            events: Arc::new(RwLock::new(Vec::new())),
            index: Arc::new(EventIndex::new()),
            projections: Arc::new(RwLock::new(projections)),
            storage,
            #[cfg(feature = "server")]
            websocket_manager: Arc::new(WebSocketManager::new()),
            snapshot_manager: Arc::new(SnapshotManager::new(config.snapshot_config)),
            wal,
            compaction_manager,
            schema_registry,
            replay_manager,
            pipeline_manager,
            #[cfg(feature = "server")]
            metrics,
            total_ingested: Arc::new(RwLock::new(0)),
            projection_state_cache,
            #[cfg(feature = "server")]
            webhook_registry,
            #[cfg(feature = "server")]
            webhook_tx: Arc::new(RwLock::new(None)),
            geo_index: Arc::new(GeoIndex::new()),
            exactly_once: Arc::new(ExactlyOnceRegistry::new(ExactlyOnceConfig::default())),
            schema_evolution: Arc::new(SchemaEvolutionManager::new()),
        };

        // Recover from WAL first (most recent data). The store is not yet
        // shared, so the per-event lock round-trips below are uncontended.
        let mut wal_recovered = false;
        if let Some(ref wal) = store.wal {
            match wal.recover() {
                Ok(recovered_events) if !recovered_events.is_empty() => {
                    tracing::info!(
                        "🔄 Recovering {} events from WAL...",
                        recovered_events.len()
                    );

                    for event in recovered_events {
                        // Re-index and process events from WAL. Index and
                        // projection failures are logged but do not abort
                        // recovery of the remaining events.
                        let offset = store.events.read().len();
                        if let Err(e) = store.index.index_event(
                            event.id,
                            event.entity_id_str(),
                            event.event_type_str(),
                            event.timestamp,
                            offset,
                        ) {
                            tracing::error!("Failed to re-index WAL event {}: {}", event.id, e);
                        }

                        if let Err(e) = store.projections.read().process_event(&event) {
                            tracing::error!("Failed to re-process WAL event {}: {}", event.id, e);
                        }

                        store.events.write().push(event);
                    }

                    let total = store.events.read().len();
                    *store.total_ingested.write() = total as u64;
                    tracing::info!("✅ Successfully recovered {} events from WAL", total);

                    // After successful recovery, checkpoint to Parquet if enabled.
                    // NOTE(review): the recovered events were pushed to memory but
                    // never passed through `storage.append_event` here — confirm
                    // that `flush_storage()` actually captures them; otherwise
                    // truncating the WAL below could lose them on the next restart.
                    if store.storage.is_some() {
                        tracing::info!("📸 Checkpointing WAL to Parquet storage...");
                        if let Err(e) = store.flush_storage() {
                            tracing::error!("Failed to checkpoint to Parquet: {}", e);
                        } else if let Err(e) = wal.truncate() {
                            tracing::error!("Failed to truncate WAL after checkpoint: {}", e);
                        } else {
                            tracing::info!("✅ WAL checkpointed and truncated");
                        }
                    }

                    wal_recovered = true;
                }
                Ok(_) => {
                    tracing::debug!("No events to recover from WAL");
                }
                Err(e) => {
                    tracing::error!("❌ WAL recovery failed: {}", e);
                }
            }
        }

        // Load persisted events from Parquet only if we didn't recover from WAL
        // (to avoid loading the same events twice after WAL checkpoint)
        if !wal_recovered
            && let Some(ref storage) = store.storage
            && let Ok(persisted_events) = storage.read().load_all_events()
        {
            tracing::info!("📂 Loading {} persisted events...", persisted_events.len());

            for event in persisted_events {
                // Re-index loaded events
                let offset = store.events.read().len();
                if let Err(e) = store.index.index_event(
                    event.id,
                    event.entity_id_str(),
                    event.event_type_str(),
                    event.timestamp,
                    offset,
                ) {
                    tracing::error!("Failed to re-index event {}: {}", event.id, e);
                }

                // Re-process through projections
                if let Err(e) = store.projections.read().process_event(&event) {
                    tracing::error!("Failed to re-process event {}: {}", event.id, e);
                }

                store.events.write().push(event);
            }

            let total = store.events.read().len();
            *store.total_ingested.write() = total as u64;
            tracing::info!("✅ Successfully loaded {} events from storage", total);
        }

        store
    }
313
    /// Ingest a new event into the store
    ///
    /// Processing order: validate → WAL append (durability) → index →
    /// projections → stream pipelines → Parquet append → in-memory push →
    /// WebSocket broadcast → webhook dispatch → geo index → schema
    /// evolution → auto-snapshot check → metrics.
    ///
    /// # Errors
    /// Returns an error from validation, WAL append, indexing, projection
    /// processing, or Parquet append. NOTE(review): an error after the WAL
    /// append leaves the event in the WAL but not in memory — confirm this
    /// is reconciled by the next WAL recovery.
    pub fn ingest(&self, event: Event) -> Result<()> {
        // Start metrics timer (v0.6 feature)
        #[cfg(feature = "server")]
        let timer = self.metrics.ingestion_duration_seconds.start_timer();

        // Validate event; on failure record the error metric and observe the
        // timer before propagating.
        let validation_result = self.validate_event(&event);
        if let Err(e) = validation_result {
            #[cfg(feature = "server")]
            {
                self.metrics.ingestion_errors_total.inc();
                timer.observe_duration();
            }
            return Err(e);
        }

        // Write to WAL FIRST for durability (v0.2 feature)
        // This ensures event is persisted before processing
        if let Some(ref wal) = self.wal
            && let Err(e) = wal.append(event.clone())
        {
            #[cfg(feature = "server")]
            {
                self.metrics.ingestion_errors_total.inc();
                timer.observe_duration();
            }
            return Err(e);
        }

        // The events write lock is held through indexing, projections,
        // pipelines and the Parquet append so `offset` stays consistent
        // with the final position of the event in the vec.
        let mut events = self.events.write();
        let offset = events.len();

        // Index the event
        self.index.index_event(
            event.id,
            event.entity_id_str(),
            event.event_type_str(),
            event.timestamp,
            offset,
        )?;

        // Process through projections
        let projections = self.projections.read();
        projections.process_event(&event)?;
        drop(projections); // Release lock

        // Process through pipelines (v0.5 feature)
        // Pipelines can transform, filter, and aggregate events in real-time
        let pipeline_results = self.pipeline_manager.process_event(&event);
        if !pipeline_results.is_empty() {
            tracing::debug!(
                "Event {} processed by {} pipeline(s)",
                event.id,
                pipeline_results.len()
            );
            // Pipeline results could be stored, emitted, or forwarded elsewhere
            // For now, we just log them for observability
            for (pipeline_id, result) in pipeline_results {
                tracing::trace!("Pipeline {} result: {:?}", pipeline_id, result);
            }
        }

        // Persist to Parquet storage if enabled (v0.2)
        if let Some(ref storage) = self.storage {
            let storage = storage.read();
            storage.append_event(event.clone())?;
        }

        // Store the event in memory
        events.push(event.clone());
        let total_events = events.len();
        drop(events); // Release lock early

        // Broadcast to WebSocket clients (v0.2 feature)
        #[cfg(feature = "server")]
        self.websocket_manager
            .broadcast_event(Arc::new(event.clone()));

        // Dispatch to matching webhook subscriptions (v0.11 feature)
        #[cfg(feature = "server")]
        self.dispatch_webhooks(&event);

        // Update geospatial index (v2.0 feature)
        self.geo_index.index_event(&event);

        // Autonomous schema evolution (v2.0 feature)
        self.schema_evolution
            .analyze_event(event.event_type_str(), &event.payload);

        // Check if automatic snapshot should be created (v0.2 feature)
        self.check_auto_snapshot(event.entity_id_str(), &event);

        // Update metrics (v0.6 feature)
        #[cfg(feature = "server")]
        {
            self.metrics.events_ingested_total.inc();
            self.metrics
                .events_ingested_by_type
                .with_label_values(&[event.event_type_str()])
                .inc();
            self.metrics.storage_events_total.set(total_events as i64);
        }

        // Update legacy total counter
        let mut total = self.total_ingested.write();
        *total += 1;

        #[cfg(feature = "server")]
        timer.observe_duration();

        tracing::debug!("Event ingested: {} (offset: {})", event.id, offset);

        Ok(())
    }
429
430    /// Ingest a batch of events with a single write lock acquisition.
431    ///
432    /// All events are validated first. If any event fails validation, no
433    /// events are stored (all-or-nothing validation). Events are then written
434    /// to WAL, indexed, processed through projections, and pushed to the
435    /// events vector under a single write lock.
436    pub fn ingest_batch(&self, batch: Vec<Event>) -> Result<()> {
437        if batch.is_empty() {
438            return Ok(());
439        }
440
441        // Phase 1: Validate all events before acquiring any locks
442        for event in &batch {
443            self.validate_event(event)?;
444        }
445
446        // Phase 2: Write all events to WAL (before write lock, for durability)
447        if let Some(ref wal) = self.wal {
448            for event in &batch {
449                wal.append(event.clone())?;
450            }
451        }
452
453        // Phase 3: Single write lock for index + projections + push
454        let mut events = self.events.write();
455        let projections = self.projections.read();
456
457        for event in batch {
458            let offset = events.len();
459
460            self.index.index_event(
461                event.id,
462                event.entity_id_str(),
463                event.event_type_str(),
464                event.timestamp,
465                offset,
466            )?;
467
468            projections.process_event(&event)?;
469            self.pipeline_manager.process_event(&event);
470
471            if let Some(ref storage) = self.storage {
472                let storage = storage.read();
473                storage.append_event(event.clone())?;
474            }
475
476            self.geo_index.index_event(&event);
477            self.schema_evolution
478                .analyze_event(event.event_type_str(), &event.payload);
479
480            events.push(event);
481        }
482
483        let total_events = events.len();
484        drop(projections);
485        drop(events);
486
487        let mut total = self.total_ingested.write();
488        *total += total_events as u64;
489
490        Ok(())
491    }
492
    /// Ingest a replicated event from the leader (follower mode).
    ///
    /// Unlike `ingest()`, this method:
    /// - Skips WAL writing (the follower's WalReceiver manages its own local WAL)
    /// - Skips schema validation (the leader already validated)
    /// - Still indexes, processes projections/pipelines, and broadcasts to WebSocket clients
    ///
    /// It also skips Parquet persistence, webhook dispatch, geo indexing,
    /// schema evolution, and the auto-snapshot check performed by `ingest()`.
    ///
    /// # Errors
    /// Returns an error if indexing or projection processing fails.
    pub fn ingest_replicated(&self, event: Event) -> Result<()> {
        // Replicated ingestion shares the same latency histogram as ingest().
        #[cfg(feature = "server")]
        let timer = self.metrics.ingestion_duration_seconds.start_timer();

        let mut events = self.events.write();
        let offset = events.len();

        // Index the event
        self.index.index_event(
            event.id,
            event.entity_id_str(),
            event.event_type_str(),
            event.timestamp,
            offset,
        )?;

        // Process through projections
        let projections = self.projections.read();
        projections.process_event(&event)?;
        drop(projections);

        // Process through pipelines
        let pipeline_results = self.pipeline_manager.process_event(&event);
        if !pipeline_results.is_empty() {
            tracing::debug!(
                "Replicated event {} processed by {} pipeline(s)",
                event.id,
                pipeline_results.len()
            );
        }

        // Store the event in memory
        events.push(event.clone());
        let total_events = events.len();
        drop(events);

        // Broadcast to WebSocket clients
        #[cfg(feature = "server")]
        self.websocket_manager
            .broadcast_event(Arc::new(event.clone()));

        // Update metrics
        #[cfg(feature = "server")]
        {
            self.metrics.events_ingested_total.inc();
            self.metrics
                .events_ingested_by_type
                .with_label_values(&[event.event_type_str()])
                .inc();
            self.metrics.storage_events_total.set(total_events as i64);
        }

        let mut total = self.total_ingested.write();
        *total += 1;

        #[cfg(feature = "server")]
        timer.observe_duration();

        tracing::debug!(
            "Replicated event ingested: {} (offset: {})",
            event.id,
            offset
        );

        Ok(())
    }
565
566    /// Get the WebSocket manager for this store
567    #[cfg(feature = "server")]
568    pub fn websocket_manager(&self) -> Arc<WebSocketManager> {
569        Arc::clone(&self.websocket_manager)
570    }
571
572    /// Get the snapshot manager for this store
573    pub fn snapshot_manager(&self) -> Arc<SnapshotManager> {
574        Arc::clone(&self.snapshot_manager)
575    }
576
577    /// Get the compaction manager for this store
578    pub fn compaction_manager(&self) -> Option<Arc<CompactionManager>> {
579        self.compaction_manager.as_ref().map(Arc::clone)
580    }
581
582    /// Get the schema registry for this store (v0.5 feature)
583    pub fn schema_registry(&self) -> Arc<SchemaRegistry> {
584        Arc::clone(&self.schema_registry)
585    }
586
587    /// Get the replay manager for this store (v0.5 feature)
588    pub fn replay_manager(&self) -> Arc<ReplayManager> {
589        Arc::clone(&self.replay_manager)
590    }
591
592    /// Get the pipeline manager for this store (v0.5 feature)
593    pub fn pipeline_manager(&self) -> Arc<PipelineManager> {
594        Arc::clone(&self.pipeline_manager)
595    }
596
597    /// Get the metrics registry for this store (v0.6 feature)
598    #[cfg(feature = "server")]
599    pub fn metrics(&self) -> Arc<MetricsRegistry> {
600        Arc::clone(&self.metrics)
601    }
602
    /// Get the projection manager for this store (v0.7 feature)
    ///
    /// Returns a read guard: the projections read lock is held for the
    /// guard's lifetime, so keep it short-lived — writers such as
    /// `register_projection` block while the guard is alive.
    pub fn projection_manager(&self) -> parking_lot::RwLockReadGuard<'_, ProjectionManager> {
        self.projections.read()
    }
607
608    /// Register a custom projection at runtime.
609    ///
610    /// The projection will receive all future events via `process()`.
611    /// Historical events are **not** replayed — only events ingested after
612    /// registration will be processed by this projection.
613    ///
614    /// See [`register_projection_with_backfill`](Self::register_projection_with_backfill)
615    /// to also process historical events.
616    pub fn register_projection(
617        &self,
618        projection: Arc<dyn crate::application::services::projection::Projection>,
619    ) {
620        let mut pm = self.projections.write();
621        pm.register(projection);
622    }
623
624    /// Register a custom projection and replay all existing events through it.
625    ///
626    /// After registration, the projection will also receive all future events.
627    /// Historical events are replayed under a read lock — the projection's
628    /// internal state (typically DashMap) handles concurrent access.
629    pub fn register_projection_with_backfill(
630        &self,
631        projection: Arc<dyn crate::application::services::projection::Projection>,
632    ) -> Result<()> {
633        // First register so future events are processed
634        {
635            let mut pm = self.projections.write();
636            pm.register(Arc::clone(&projection));
637        }
638
639        // Then replay existing events under read lock
640        let events = self.events.read();
641        for event in events.iter() {
642            projection.process(event)?;
643        }
644
645        Ok(())
646    }
647
648    /// Get the projection state cache for this store (v0.7 feature)
649    /// Used by Elixir Query Service for state synchronization
650    pub fn projection_state_cache(&self) -> Arc<DashMap<String, serde_json::Value>> {
651        Arc::clone(&self.projection_state_cache)
652    }
653
654    /// Get the webhook registry for this store (v0.11 feature)
655    /// Geospatial index for coordinate-based queries (v2.0 feature)
656    pub fn geo_index(&self) -> Arc<GeoIndex> {
657        self.geo_index.clone()
658    }
659
660    /// Exactly-once processing registry (v2.0 feature)
661    pub fn exactly_once(&self) -> Arc<ExactlyOnceRegistry> {
662        self.exactly_once.clone()
663    }
664
665    /// Schema evolution manager (v2.0 feature)
666    pub fn schema_evolution(&self) -> Arc<SchemaEvolutionManager> {
667        self.schema_evolution.clone()
668    }
669
    /// Get an owned snapshot of all events (for EventQL/GraphQL queries).
    ///
    /// NOTE: despite the old doc claiming an `Arc` reference, this CLONES
    /// the entire events vector under a short-lived read lock — O(n) in the
    /// number of stored events. Avoid calling in hot paths on large stores.
    pub fn snapshot_events(&self) -> Vec<Event> {
        self.events.read().clone()
    }
678
    /// Compact token events for an entity by replacing matching events with a
    /// single merged event. Used by the embedded streaming feature.
    ///
    /// Returns `Ok(true)` if compaction was performed, `Ok(false)` if no
    /// matching events were found.
    ///
    /// **Note:** The merged event is processed through projections *without*
    /// clearing the removed events' projection state first. Projections that
    /// accumulate state (e.g., counters) should be designed to handle this
    /// (the merged event replaces individual tokens, not adds to them).
    ///
    /// **Crash safety:** The WAL append happens *after* the in-memory swap
    /// under the write lock. If the process crashes before the WAL write,
    /// no change is persisted — WAL replay restores the pre-compaction state.
    /// If the process crashes after the WAL write, replay sees the merged
    /// event (and the original tokens, which are idempotent to replay since
    /// the merged event supersedes them).
    ///
    /// **NOTE(review):** Parquet storage is not rewritten here — compacted
    /// token events may still exist in Parquet and could be reloaded on a
    /// restart that loads from Parquet instead of the WAL; confirm.
    ///
    /// The write lock is held for the swap + WAL write + index rebuild.
    /// The index rebuild is O(N) over all events, which is acceptable for
    /// embedded workloads but should not be called in hot paths for large stores.
    pub fn compact_entity_tokens(
        &self,
        entity_id: &str,
        token_event_type: &str,
        merged_event: Event,
    ) -> Result<bool> {
        // Phase 1: Read-only check — do we have anything to compact?
        {
            let events = self.events.read();
            let has_tokens = events
                .iter()
                .any(|e| e.entity_id_str() == entity_id && e.event_type_str() == token_event_type);
            if !has_tokens {
                return Ok(false);
            }
        }

        // Phase 2: Process merged event through projections (no write lock held)
        let projections = self.projections.read();
        projections.process_event(&merged_event)?;
        drop(projections);

        // Phase 3: Acquire write lock for the swap + WAL + index rebuild
        let mut events = self.events.write();

        // Drop every token event for this entity, keeping everything else.
        events.retain(|e| {
            !(e.entity_id_str() == entity_id && e.event_type_str() == token_event_type)
        });

        events.push(merged_event.clone());

        // WAL append inside write lock: crash before this line = no change persisted.
        // Crash after = merged event in WAL, original tokens also in WAL but
        // superseded by the merged event's entity_id + event_type.
        if let Some(ref wal) = self.wal {
            wal.append(merged_event)?;
        }

        // Rebuild entire index since retain() shifted event positions.
        // Errors here indicate a corrupt event (missing entity_id/event_type)
        // which should not happen for well-formed events. Log and continue
        // rather than failing the entire compaction.
        self.index.clear();
        for (offset, event) in events.iter().enumerate() {
            if let Err(e) = self.index.index_event(
                event.id,
                event.entity_id_str(),
                event.event_type_str(),
                event.timestamp,
                offset,
            ) {
                tracing::warn!(
                    event_id = %event.id,
                    offset,
                    "Failed to re-index event during compaction: {e}"
                );
            }
        }

        Ok(true)
    }
761
762    #[cfg(feature = "server")]
763    pub fn webhook_registry(&self) -> Arc<WebhookRegistry> {
764        Arc::clone(&self.webhook_registry)
765    }
766
767    /// Set the channel for async webhook delivery.
768    /// Called during server startup to wire the delivery worker.
769    #[cfg(feature = "server")]
770    pub fn set_webhook_tx(&self, tx: mpsc::UnboundedSender<WebhookDeliveryTask>) {
771        *self.webhook_tx.write() = Some(tx);
772        tracing::info!("Webhook delivery channel connected");
773    }
774
775    /// Dispatch matching webhooks for a given event (non-blocking).
776    #[cfg(feature = "server")]
777    fn dispatch_webhooks(&self, event: &Event) {
778        let matching = self.webhook_registry.find_matching(event);
779        if matching.is_empty() {
780            return;
781        }
782
783        let tx_guard = self.webhook_tx.read();
784        if let Some(ref tx) = *tx_guard {
785            for webhook in matching {
786                let task = WebhookDeliveryTask {
787                    webhook,
788                    event: event.clone(),
789                };
790                if let Err(e) = tx.send(task) {
791                    tracing::warn!("Failed to queue webhook delivery: {}", e);
792                }
793            }
794        }
795    }
796
797    /// Manually flush any pending events to persistent storage
798    pub fn flush_storage(&self) -> Result<()> {
799        if let Some(ref storage) = self.storage {
800            let storage = storage.read();
801            storage.flush()?;
802            tracing::info!("✅ Flushed events to persistent storage");
803        }
804        Ok(())
805    }
806
807    /// Manually create a snapshot for an entity
808    pub fn create_snapshot(&self, entity_id: &str) -> Result<()> {
809        // Get all events for this entity
810        let events = self.query(QueryEventsRequest {
811            entity_id: Some(entity_id.to_string()),
812            event_type: None,
813            tenant_id: None,
814            as_of: None,
815            since: None,
816            until: None,
817            limit: None,
818            event_type_prefix: None,
819            payload_filter: None,
820        })?;
821
822        if events.is_empty() {
823            return Err(AllSourceError::EntityNotFound(entity_id.to_string()));
824        }
825
826        // Build current state
827        let mut state = serde_json::json!({});
828        for event in &events {
829            if let serde_json::Value::Object(ref mut state_map) = state
830                && let serde_json::Value::Object(ref payload_map) = event.payload
831            {
832                for (key, value) in payload_map {
833                    state_map.insert(key.clone(), value.clone());
834                }
835            }
836        }
837
838        let last_event = events.last().unwrap();
839        self.snapshot_manager.create_snapshot(
840            entity_id.to_string(),
841            state,
842            last_event.timestamp,
843            events.len(),
844            SnapshotType::Manual,
845        )?;
846
847        Ok(())
848    }
849
850    /// Check and create automatic snapshots if needed
851    fn check_auto_snapshot(&self, entity_id: &str, event: &Event) {
852        // Count events for this entity
853        let entity_event_count = self
854            .index
855            .get_by_entity(entity_id)
856            .map(|entries| entries.len())
857            .unwrap_or(0);
858
859        if self.snapshot_manager.should_create_snapshot(
860            entity_id,
861            entity_event_count,
862            event.timestamp,
863        ) {
864            // Create snapshot in background (don't block ingestion)
865            if let Err(e) = self.create_snapshot(entity_id) {
866                tracing::warn!(
867                    "Failed to create automatic snapshot for {}: {}",
868                    entity_id,
869                    e
870                );
871            }
872        }
873    }
874
875    /// Validate an event before ingestion
876    fn validate_event(&self, event: &Event) -> Result<()> {
877        // EntityId and EventType value objects already validate non-empty in their constructors
878        // So these checks are now redundant, but we keep them for explicit validation
879        if event.entity_id_str().is_empty() {
880            return Err(AllSourceError::ValidationError(
881                "entity_id cannot be empty".to_string(),
882            ));
883        }
884
885        if event.event_type_str().is_empty() {
886            return Err(AllSourceError::ValidationError(
887                "event_type cannot be empty".to_string(),
888            ));
889        }
890
891        // Reject system namespace events from user-facing ingestion.
892        // System events are written exclusively via SystemMetadataStore.
893        if event.event_type().is_system() {
894            return Err(AllSourceError::ValidationError(
895                "Event types starting with '_system.' are reserved for internal use".to_string(),
896            ));
897        }
898
899        Ok(())
900    }
901
902    /// Reset a projection by clearing its state and reprocessing all events
903    pub fn reset_projection(&self, name: &str) -> Result<usize> {
904        let projection_manager = self.projections.read();
905        let projection = projection_manager.get_projection(name).ok_or_else(|| {
906            AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
907        })?;
908
909        // Clear existing state
910        projection.clear();
911
912        // Clear cached state for this projection
913        let prefix = format!("{name}:");
914        let keys_to_remove: Vec<String> = self
915            .projection_state_cache
916            .iter()
917            .filter(|entry| entry.key().starts_with(&prefix))
918            .map(|entry| entry.key().clone())
919            .collect();
920        for key in keys_to_remove {
921            self.projection_state_cache.remove(&key);
922        }
923
924        // Reprocess all events through this projection
925        let events = self.events.read();
926        let mut reprocessed = 0usize;
927        for event in events.iter() {
928            if projection.process(event).is_ok() {
929                reprocessed += 1;
930            }
931        }
932
933        Ok(reprocessed)
934    }
935
936    /// Get a single event by its UUID
937    pub fn get_event_by_id(&self, event_id: &uuid::Uuid) -> Result<Option<Event>> {
938        if let Some(offset) = self.index.get_by_id(event_id) {
939            let events = self.events.read();
940            Ok(events.get(offset).cloned())
941        } else {
942            Ok(None)
943        }
944    }
945
    /// Query events based on filters (optimized with indices)
    ///
    /// Index selection order: `entity_id` first, then exact `event_type`,
    /// then `event_type_prefix`; with none of those set the entire in-memory
    /// log is scanned. Remaining filters are applied per event via
    /// `apply_filters`, results are sorted by timestamp ascending, and
    /// `limit` truncates *after* sorting.
    ///
    /// NOTE(review): `request.tenant_id` is not consulted anywhere in this
    /// method or in `apply_filters` — confirm tenant scoping is enforced
    /// upstream of this call.
    pub fn query(&self, request: QueryEventsRequest) -> Result<Vec<Event>> {
        // Determine query type for metrics (v0.6 feature)
        let query_type = if request.entity_id.is_some() {
            "entity"
        } else if request.event_type.is_some() {
            "type"
        } else if request.event_type_prefix.is_some() {
            "type_prefix"
        } else {
            "full_scan"
        };

        // Start metrics timer (v0.6 feature)
        #[cfg(feature = "server")]
        let timer = self
            .metrics
            .query_duration_seconds
            .with_label_values(&[query_type])
            .start_timer();

        // Increment query counter (v0.6 feature)
        #[cfg(feature = "server")]
        self.metrics
            .queries_total
            .with_label_values(&[query_type])
            .inc();

        // Read lock is held for the remainder of the method so offsets stay
        // valid while events are fetched.
        let events = self.events.read();

        // Use index for fast lookups
        let offsets: Vec<usize> = if let Some(entity_id) = &request.entity_id {
            // Use entity index
            self.index
                .get_by_entity(entity_id)
                .map(|entries| self.filter_entries(entries, &request))
                .unwrap_or_default()
        } else if let Some(event_type) = &request.event_type {
            // Use type index (exact match)
            self.index
                .get_by_type(event_type)
                .map(|entries| self.filter_entries(entries, &request))
                .unwrap_or_default()
        } else if let Some(prefix) = &request.event_type_prefix {
            // Use type index (prefix match)
            let entries = self.index.get_by_type_prefix(prefix);
            self.filter_entries(entries, &request)
        } else {
            // Full scan (less efficient but necessary for complex queries)
            (0..events.len()).collect()
        };

        // Fetch events and apply remaining filters
        let mut results: Vec<Event> = offsets
            .iter()
            .filter_map(|&offset| events.get(offset).cloned())
            .filter(|event| self.apply_filters(event, &request))
            .collect();

        // Sort by timestamp (ascending)
        results.sort_by_key(|x| x.timestamp);

        // Apply limit
        if let Some(limit) = request.limit {
            results.truncate(limit);
        }

        // Record query results count (v0.6 feature)
        #[cfg(feature = "server")]
        {
            self.metrics
                .query_results_total
                .with_label_values(&[query_type])
                .inc_by(results.len() as u64);
            timer.observe_duration();
        }

        Ok(results)
    }
1025
1026    /// Filter index entries based on query parameters
1027    fn filter_entries(&self, entries: Vec<IndexEntry>, request: &QueryEventsRequest) -> Vec<usize> {
1028        entries
1029            .into_iter()
1030            .filter(|entry| {
1031                // Time filters
1032                if let Some(as_of) = request.as_of
1033                    && entry.timestamp > as_of
1034                {
1035                    return false;
1036                }
1037                if let Some(since) = request.since
1038                    && entry.timestamp < since
1039                {
1040                    return false;
1041                }
1042                if let Some(until) = request.until
1043                    && entry.timestamp > until
1044                {
1045                    return false;
1046                }
1047                true
1048            })
1049            .map(|entry| entry.offset)
1050            .collect()
1051    }
1052
1053    /// Apply filters to an event
1054    fn apply_filters(&self, event: &Event, request: &QueryEventsRequest) -> bool {
1055        // Additional type filter if entity was primary
1056        if request.entity_id.is_some()
1057            && let Some(ref event_type) = request.event_type
1058            && event.event_type_str() != event_type
1059        {
1060            return false;
1061        }
1062
1063        // Additional prefix filter if entity was primary
1064        if request.entity_id.is_some()
1065            && let Some(ref prefix) = request.event_type_prefix
1066            && !event.event_type_str().starts_with(prefix)
1067        {
1068            return false;
1069        }
1070
1071        // Payload field filtering
1072        if let Some(ref filter_str) = request.payload_filter
1073            && let Ok(filter_obj) =
1074                serde_json::from_str::<serde_json::Map<String, serde_json::Value>>(filter_str)
1075        {
1076            let payload = event.payload();
1077            for (key, expected_value) in &filter_obj {
1078                match payload.get(key) {
1079                    Some(actual_value) if actual_value == expected_value => {}
1080                    _ => return false,
1081                }
1082            }
1083        }
1084
1085        true
1086    }
1087
1088    /// Reconstruct entity state as of a specific timestamp
1089    /// v0.2: Now uses snapshots for fast reconstruction
1090    pub fn reconstruct_state(
1091        &self,
1092        entity_id: &str,
1093        as_of: Option<DateTime<Utc>>,
1094    ) -> Result<serde_json::Value> {
1095        // Try to find a snapshot to use as a base (v0.2 optimization)
1096        let (merged_state, since_timestamp) = if let Some(as_of_time) = as_of {
1097            // Get snapshot closest to requested time
1098            if let Some(snapshot) = self
1099                .snapshot_manager
1100                .get_snapshot_as_of(entity_id, as_of_time)
1101            {
1102                tracing::debug!(
1103                    "Using snapshot from {} for entity {} (saved {} events)",
1104                    snapshot.as_of,
1105                    entity_id,
1106                    snapshot.event_count
1107                );
1108                (snapshot.state.clone(), Some(snapshot.as_of))
1109            } else {
1110                (serde_json::json!({}), None)
1111            }
1112        } else {
1113            // Get latest snapshot for current state
1114            if let Some(snapshot) = self.snapshot_manager.get_latest_snapshot(entity_id) {
1115                tracing::debug!(
1116                    "Using latest snapshot from {} for entity {}",
1117                    snapshot.as_of,
1118                    entity_id
1119                );
1120                (snapshot.state.clone(), Some(snapshot.as_of))
1121            } else {
1122                (serde_json::json!({}), None)
1123            }
1124        };
1125
1126        // Query events after the snapshot (or all if no snapshot)
1127        let events = self.query(QueryEventsRequest {
1128            entity_id: Some(entity_id.to_string()),
1129            event_type: None,
1130            tenant_id: None,
1131            as_of,
1132            since: since_timestamp,
1133            until: None,
1134            limit: None,
1135            event_type_prefix: None,
1136            payload_filter: None,
1137        })?;
1138
1139        // If no events and no snapshot, entity not found
1140        if events.is_empty() && since_timestamp.is_none() {
1141            return Err(AllSourceError::EntityNotFound(entity_id.to_string()));
1142        }
1143
1144        // Merge events on top of snapshot (or from scratch if no snapshot)
1145        let mut merged_state = merged_state;
1146        for event in &events {
1147            if let serde_json::Value::Object(ref mut state_map) = merged_state
1148                && let serde_json::Value::Object(ref payload_map) = event.payload
1149            {
1150                for (key, value) in payload_map {
1151                    state_map.insert(key.clone(), value.clone());
1152                }
1153            }
1154        }
1155
1156        // Wrap with metadata
1157        let state = serde_json::json!({
1158            "entity_id": entity_id,
1159            "last_updated": events.last().map(|e| e.timestamp),
1160            "event_count": events.len(),
1161            "as_of": as_of,
1162            "current_state": merged_state,
1163            "history": events.iter().map(|e| {
1164                serde_json::json!({
1165                    "event_id": e.id,
1166                    "type": e.event_type,
1167                    "timestamp": e.timestamp,
1168                    "payload": e.payload
1169                })
1170            }).collect::<Vec<_>>()
1171        });
1172
1173        Ok(state)
1174    }
1175
1176    /// Get snapshot from projection (faster than reconstructing)
1177    pub fn get_snapshot(&self, entity_id: &str) -> Result<serde_json::Value> {
1178        let projections = self.projections.read();
1179
1180        if let Some(snapshot_projection) = projections.get_projection("entity_snapshots")
1181            && let Some(state) = snapshot_projection.get_state(entity_id)
1182        {
1183            return Ok(serde_json::json!({
1184                "entity_id": entity_id,
1185                "snapshot": state,
1186                "from_projection": "entity_snapshots"
1187            }));
1188        }
1189
1190        Err(AllSourceError::EntityNotFound(entity_id.to_string()))
1191    }
1192
1193    /// Get statistics about the event store
1194    pub fn stats(&self) -> StoreStats {
1195        let events = self.events.read();
1196        let index_stats = self.index.stats();
1197
1198        StoreStats {
1199            total_events: events.len(),
1200            total_entities: index_stats.total_entities,
1201            total_event_types: index_stats.total_event_types,
1202            total_ingested: *self.total_ingested.read(),
1203        }
1204    }
1205
1206    /// Get all unique streams (entity_ids) in the store
1207    pub fn list_streams(&self) -> Vec<StreamInfo> {
1208        self.index
1209            .get_all_entities()
1210            .into_iter()
1211            .map(|entity_id| {
1212                let event_count = self
1213                    .index
1214                    .get_by_entity(&entity_id)
1215                    .map(|entries| entries.len())
1216                    .unwrap_or(0);
1217                let last_event_at = self
1218                    .index
1219                    .get_by_entity(&entity_id)
1220                    .and_then(|entries| entries.last().map(|e| e.timestamp));
1221                StreamInfo {
1222                    stream_id: entity_id,
1223                    event_count,
1224                    last_event_at,
1225                }
1226            })
1227            .collect()
1228    }
1229
1230    /// Get all unique event types in the store
1231    pub fn list_event_types(&self) -> Vec<EventTypeInfo> {
1232        self.index
1233            .get_all_types()
1234            .into_iter()
1235            .map(|event_type| {
1236                let event_count = self
1237                    .index
1238                    .get_by_type(&event_type)
1239                    .map(|entries| entries.len())
1240                    .unwrap_or(0);
1241                let last_event_at = self
1242                    .index
1243                    .get_by_type(&event_type)
1244                    .and_then(|entries| entries.last().map(|e| e.timestamp));
1245                EventTypeInfo {
1246                    event_type,
1247                    event_count,
1248                    last_event_at,
1249                }
1250            })
1251            .collect()
1252    }
1253
1254    /// Attach a broadcast sender to the WAL for replication.
1255    ///
1256    /// Thread-safe: can be called through `Arc<EventStore>` at runtime.
1257    /// Used during initial setup and during follower → leader promotion.
1258    /// When set, every WAL append publishes the entry to the broadcast
1259    /// channel so the WAL shipper can stream it to followers.
1260    pub fn enable_wal_replication(
1261        &self,
1262        tx: tokio::sync::broadcast::Sender<crate::infrastructure::persistence::wal::WALEntry>,
1263    ) {
1264        if let Some(ref wal_arc) = self.wal {
1265            wal_arc.set_replication_tx(tx);
1266            tracing::info!("WAL replication broadcast enabled");
1267        } else {
1268            tracing::warn!("Cannot enable WAL replication: WAL is not configured");
1269        }
1270    }
1271
    /// Get a reference to the WAL (if configured).
    /// Used by the replication catch-up protocol to determine oldest available offset.
    /// Returns `None` when the store runs without a WAL (in-memory or parquet-only modes).
    pub fn wal(&self) -> Option<&Arc<WriteAheadLog>> {
        self.wal.as_ref()
    }

    /// Get a reference to the Parquet storage (if configured).
    /// Used by the replication catch-up protocol to stream snapshot files to followers.
    /// Returns `None` when no `storage_dir` was configured.
    pub fn parquet_storage(&self) -> Option<&Arc<RwLock<ParquetStorage>>> {
        self.storage.as_ref()
    }
1283}
1284
/// Configuration for EventStore
///
/// `Default` produces a fully in-memory configuration (no storage dir, no
/// WAL, no system data dir); the builder-style constructors on the impl
/// enable individual persistence features.
#[derive(Debug, Clone, Default)]
pub struct EventStoreConfig {
    /// Optional directory for persistent Parquet storage (v0.2 feature)
    pub storage_dir: Option<PathBuf>,

    /// Snapshot configuration (v0.2 feature)
    pub snapshot_config: SnapshotConfig,

    /// Optional directory for WAL (Write-Ahead Log) (v0.2 feature)
    pub wal_dir: Option<PathBuf>,

    /// WAL configuration (v0.2 feature)
    pub wal_config: WALConfig,

    /// Compaction configuration (v0.2 feature)
    pub compaction_config: CompactionConfig,

    /// Schema registry configuration (v0.5 feature)
    pub schema_registry_config: SchemaRegistryConfig,

    /// Optional directory for system metadata storage (dogfood feature).
    /// When set, operational metadata (tenants, config, audit) is stored
    /// using AllSource's own event store rather than an external database.
    /// Defaults to `{storage_dir}/__system/` when storage_dir is set.
    pub system_data_dir: Option<PathBuf>,

    /// Name of the default tenant to auto-create on first boot.
    pub bootstrap_tenant: Option<String>,
}
1315
1316impl EventStoreConfig {
1317    /// Create config with persistent storage enabled
1318    pub fn with_persistence(storage_dir: impl Into<PathBuf>) -> Self {
1319        Self {
1320            storage_dir: Some(storage_dir.into()),
1321            ..Self::default()
1322        }
1323    }
1324
1325    /// Create config with custom snapshot settings
1326    pub fn with_snapshots(snapshot_config: SnapshotConfig) -> Self {
1327        Self {
1328            snapshot_config,
1329            ..Self::default()
1330        }
1331    }
1332
1333    /// Create config with WAL enabled
1334    pub fn with_wal(wal_dir: impl Into<PathBuf>, wal_config: WALConfig) -> Self {
1335        Self {
1336            wal_dir: Some(wal_dir.into()),
1337            wal_config,
1338            ..Self::default()
1339        }
1340    }
1341
1342    /// Create config with both persistence and snapshots
1343    pub fn with_all(storage_dir: impl Into<PathBuf>, snapshot_config: SnapshotConfig) -> Self {
1344        Self {
1345            storage_dir: Some(storage_dir.into()),
1346            snapshot_config,
1347            ..Self::default()
1348        }
1349    }
1350
1351    /// Create production config with all features enabled
1352    pub fn production(
1353        storage_dir: impl Into<PathBuf>,
1354        wal_dir: impl Into<PathBuf>,
1355        snapshot_config: SnapshotConfig,
1356        wal_config: WALConfig,
1357        compaction_config: CompactionConfig,
1358    ) -> Self {
1359        let storage_dir = storage_dir.into();
1360        let system_data_dir = storage_dir.join("__system");
1361        Self {
1362            storage_dir: Some(storage_dir),
1363            snapshot_config,
1364            wal_dir: Some(wal_dir.into()),
1365            wal_config,
1366            compaction_config,
1367            system_data_dir: Some(system_data_dir),
1368            ..Self::default()
1369        }
1370    }
1371
1372    /// Resolve the effective system data directory.
1373    ///
1374    /// If explicitly set, returns that. Otherwise, derives from storage_dir.
1375    /// Returns None if neither is configured (in-memory mode).
1376    pub fn effective_system_data_dir(&self) -> Option<PathBuf> {
1377        self.system_data_dir
1378            .clone()
1379            .or_else(|| self.storage_dir.as_ref().map(|d| d.join("__system")))
1380    }
1381
1382    /// Build config from environment variables.
1383    ///
1384    /// Reads `ALLSOURCE_DATA_DIR`, `ALLSOURCE_STORAGE_DIR`, `ALLSOURCE_WAL_DIR`,
1385    /// and `ALLSOURCE_WAL_ENABLED` to determine persistence mode.
1386    ///
1387    /// Returns `(config, description)` where description is a human-readable
1388    /// summary of the persistence mode for logging.
1389    pub fn from_env() -> (Self, &'static str) {
1390        Self::from_env_vars(
1391            std::env::var("ALLSOURCE_DATA_DIR")
1392                .ok()
1393                .filter(|s| !s.is_empty()),
1394            std::env::var("ALLSOURCE_STORAGE_DIR")
1395                .ok()
1396                .filter(|s| !s.is_empty()),
1397            std::env::var("ALLSOURCE_WAL_DIR")
1398                .ok()
1399                .filter(|s| !s.is_empty()),
1400            std::env::var("ALLSOURCE_WAL_ENABLED").ok(),
1401        )
1402    }
1403
1404    /// Build config from explicit env-var values (testable without mutating process env).
1405    pub fn from_env_vars(
1406        data_dir: Option<String>,
1407        explicit_storage_dir: Option<String>,
1408        explicit_wal_dir: Option<String>,
1409        wal_enabled_var: Option<String>,
1410    ) -> (Self, &'static str) {
1411        let data_dir = data_dir.filter(|s| !s.is_empty());
1412        let storage_dir = explicit_storage_dir
1413            .filter(|s| !s.is_empty())
1414            .or_else(|| data_dir.as_ref().map(|d| format!("{}/storage", d)));
1415        let wal_dir = explicit_wal_dir
1416            .filter(|s| !s.is_empty())
1417            .or_else(|| data_dir.as_ref().map(|d| format!("{}/wal", d)));
1418        let wal_enabled = wal_enabled_var.map(|v| v == "true").unwrap_or(true);
1419
1420        match (&storage_dir, &wal_dir) {
1421            (Some(sd), Some(wd)) if wal_enabled => {
1422                let config = Self::production(
1423                    sd,
1424                    wd,
1425                    SnapshotConfig::default(),
1426                    WALConfig::default(),
1427                    CompactionConfig::default(),
1428                );
1429                (config, "wal+parquet")
1430            }
1431            (Some(sd), _) => {
1432                let config = Self::with_persistence(sd);
1433                (config, "parquet-only")
1434            }
1435            (_, Some(wd)) if wal_enabled => {
1436                let config = Self::with_wal(wd, WALConfig::default());
1437                (config, "wal-only")
1438            }
1439            _ => (Self::default(), "in-memory"),
1440        }
1441    }
1442}
1443
/// Aggregate statistics about the event store's current contents.
#[derive(Debug, serde::Serialize)]
pub struct StoreStats {
    /// Number of events currently held in the in-memory log.
    pub total_events: usize,
    /// Number of distinct entity ids known to the index.
    pub total_entities: usize,
    /// Number of distinct event types known to the index.
    pub total_event_types: usize,
    /// Running total of events ever ingested (may exceed `total_events`).
    pub total_ingested: u64,
}
1451
/// Information about a stream (entity_id)
#[derive(Debug, Clone, serde::Serialize)]
pub struct StreamInfo {
    /// The stream identifier (entity_id)
    pub stream_id: String,
    /// Total number of events in this stream
    pub event_count: usize,
    /// Timestamp of the last event in this stream (None for an empty stream)
    pub last_event_at: Option<chrono::DateTime<chrono::Utc>>,
}

/// Information about an event type
#[derive(Debug, Clone, serde::Serialize)]
pub struct EventTypeInfo {
    /// The event type name
    pub event_type: String,
    /// Total number of events of this type
    pub event_count: usize,
    /// Timestamp of the last event of this type (None if none recorded)
    pub last_event_at: Option<chrono::DateTime<chrono::Utc>>,
}
1473
impl Default for EventStore {
    /// Equivalent to [`EventStore::new`]: an in-memory store with no
    /// persistence configured.
    fn default() -> Self {
        Self::new()
    }
}
1479
1480#[cfg(test)]
1481mod tests {
1482    use super::*;
1483    use crate::domain::entities::Event;
1484    use tempfile::TempDir;
1485
1486    fn create_test_event(entity_id: &str, event_type: &str) -> Event {
1487        Event::from_strings(
1488            event_type.to_string(),
1489            entity_id.to_string(),
1490            "default".to_string(),
1491            serde_json::json!({"name": "Test", "value": 42}),
1492            None,
1493        )
1494        .unwrap()
1495    }
1496
    /// Build a test event in the "default" tenant with a caller-supplied
    /// payload; panics on invalid input (acceptable inside tests).
    fn create_test_event_with_payload(
        entity_id: &str,
        event_type: &str,
        payload: serde_json::Value,
    ) -> Event {
        Event::from_strings(
            event_type.to_string(),
            entity_id.to_string(),
            "default".to_string(),
            payload,
            None,
        )
        .unwrap()
    }
1511
    // A freshly constructed store reports zero events and entities.
    #[test]
    fn test_event_store_new() {
        let store = EventStore::new();
        assert_eq!(store.stats().total_events, 0);
        assert_eq!(store.stats().total_entities, 0);
    }

    // `Default` must behave like `EventStore::new()`.
    #[test]
    fn test_event_store_default() {
        let store = EventStore::default();
        assert_eq!(store.stats().total_events, 0);
    }

    // Ingesting one event bumps both the live count and the lifetime counter.
    #[test]
    fn test_ingest_single_event() {
        let store = EventStore::new();
        let event = create_test_event("entity-1", "user.created");

        store.ingest(event).unwrap();

        assert_eq!(store.stats().total_events, 1);
        assert_eq!(store.stats().total_ingested, 1);
    }

    // Both counters track every event across distinct entities.
    #[test]
    fn test_ingest_multiple_events() {
        let store = EventStore::new();

        for i in 0..10 {
            let event = create_test_event(&format!("entity-{}", i), "user.created");
            store.ingest(event).unwrap();
        }

        assert_eq!(store.stats().total_events, 10);
        assert_eq!(store.stats().total_ingested, 10);
    }
1548
    // Entity filter returns only that entity's events (2 of 3 ingested).
    #[test]
    fn test_query_by_entity_id() {
        let store = EventStore::new();

        store
            .ingest(create_test_event("entity-1", "user.created"))
            .unwrap();
        store
            .ingest(create_test_event("entity-2", "user.created"))
            .unwrap();
        store
            .ingest(create_test_event("entity-1", "user.updated"))
            .unwrap();

        let results = store
            .query(QueryEventsRequest {
                entity_id: Some("entity-1".to_string()),
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();

        assert_eq!(results.len(), 2);
    }

    // Exact event-type filter returns only matching events (2 of 3 ingested).
    #[test]
    fn test_query_by_event_type() {
        let store = EventStore::new();

        store
            .ingest(create_test_event("entity-1", "user.created"))
            .unwrap();
        store
            .ingest(create_test_event("entity-2", "user.updated"))
            .unwrap();
        store
            .ingest(create_test_event("entity-3", "user.created"))
            .unwrap();

        let results = store
            .query(QueryEventsRequest {
                entity_id: None,
                event_type: Some("user.created".to_string()),
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();

        assert_eq!(results.len(), 2);
    }

    // `limit` truncates the (sorted) result set.
    #[test]
    fn test_query_with_limit() {
        let store = EventStore::new();

        for i in 0..10 {
            let event = create_test_event(&format!("entity-{}", i), "user.created");
            store.ingest(event).unwrap();
        }

        let results = store
            .query(QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: Some(5),
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();

        assert_eq!(results.len(), 5);
    }

    // Querying an unknown entity yields an empty result, not an error.
    #[test]
    fn test_query_empty_store() {
        let store = EventStore::new();

        let results = store
            .query(QueryEventsRequest {
                entity_id: Some("non-existent".to_string()),
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
                event_type_prefix: None,
                payload_filter: None,
            })
            .unwrap();

        assert!(results.is_empty());
    }
1657
    // Reconstructed state exposes the merged payload under "current_state".
    #[test]
    fn test_reconstruct_state() {
        let store = EventStore::new();

        store
            .ingest(create_test_event("entity-1", "user.created"))
            .unwrap();

        let state = store.reconstruct_state("entity-1", None).unwrap();
        // The state is wrapped with metadata
        assert_eq!(state["current_state"]["name"], "Test");
        assert_eq!(state["current_state"]["value"], 42);
    }

    // Unknown entities produce an error rather than an empty state.
    #[test]
    fn test_reconstruct_state_not_found() {
        let store = EventStore::new();

        let result = store.reconstruct_state("non-existent", None);
        assert!(result.is_err());
    }

    // Snapshot lookup for an unknown entity fails cleanly.
    #[test]
    fn test_get_snapshot_empty() {
        let store = EventStore::new();

        let result = store.get_snapshot("non-existent");
        // Entity not found error is expected
        assert!(result.is_err());
    }

    // A manual snapshot can be created and then read back.
    #[test]
    fn test_create_snapshot() {
        let store = EventStore::new();

        store
            .ingest(create_test_event("entity-1", "user.created"))
            .unwrap();

        store.create_snapshot("entity-1").unwrap();

        // Verify snapshot was created
        let snapshot = store.get_snapshot("entity-1").unwrap();
        assert!(snapshot != serde_json::json!(null));
    }

    // Snapshotting an entity with no events must fail.
    #[test]
    fn test_create_snapshot_entity_not_found() {
        let store = EventStore::new();

        let result = store.create_snapshot("non-existent");
        assert!(result.is_err());
    }
1711
1712    #[test]
1713    fn test_websocket_manager() {
1714        let store = EventStore::new();
1715        let manager = store.websocket_manager();
1716        // Manager should be accessible
1717        assert!(Arc::strong_count(&manager) >= 1);
1718    }
1719
1720    #[test]
1721    fn test_snapshot_manager() {
1722        let store = EventStore::new();
1723        let manager = store.snapshot_manager();
1724        assert!(Arc::strong_count(&manager) >= 1);
1725    }
1726
1727    #[test]
1728    fn test_compaction_manager_none() {
1729        let store = EventStore::new();
1730        // Without storage_dir, compaction manager should be None
1731        assert!(store.compaction_manager().is_none());
1732    }
1733
1734    #[test]
1735    fn test_schema_registry() {
1736        let store = EventStore::new();
1737        let registry = store.schema_registry();
1738        assert!(Arc::strong_count(&registry) >= 1);
1739    }
1740
1741    #[test]
1742    fn test_replay_manager() {
1743        let store = EventStore::new();
1744        let manager = store.replay_manager();
1745        assert!(Arc::strong_count(&manager) >= 1);
1746    }
1747
1748    #[test]
1749    fn test_pipeline_manager() {
1750        let store = EventStore::new();
1751        let manager = store.pipeline_manager();
1752        assert!(Arc::strong_count(&manager) >= 1);
1753    }
1754
1755    #[test]
1756    fn test_projection_manager() {
1757        let store = EventStore::new();
1758        let manager = store.projection_manager();
1759        // Built-in projections should be registered
1760        let projections = manager.list_projections();
1761        assert!(projections.len() >= 2); // entity_snapshots and event_counters
1762    }
1763
1764    #[test]
1765    fn test_projection_state_cache() {
1766        let store = EventStore::new();
1767        let cache = store.projection_state_cache();
1768
1769        cache.insert("test:key".to_string(), serde_json::json!({"value": 123}));
1770        assert_eq!(cache.len(), 1);
1771
1772        let value = cache.get("test:key").unwrap();
1773        assert_eq!(value["value"], 123);
1774    }
1775
1776    #[test]
1777    fn test_metrics() {
1778        let store = EventStore::new();
1779        let metrics = store.metrics();
1780        assert!(Arc::strong_count(&metrics) >= 1);
1781    }
1782
1783    #[test]
1784    fn test_store_stats() {
1785        let store = EventStore::new();
1786
1787        store
1788            .ingest(create_test_event("entity-1", "user.created"))
1789            .unwrap();
1790        store
1791            .ingest(create_test_event("entity-2", "order.placed"))
1792            .unwrap();
1793
1794        let stats = store.stats();
1795        assert_eq!(stats.total_events, 2);
1796        assert_eq!(stats.total_entities, 2);
1797        assert_eq!(stats.total_event_types, 2);
1798        assert_eq!(stats.total_ingested, 2);
1799    }
1800
1801    #[test]
1802    fn test_event_store_config_default() {
1803        let config = EventStoreConfig::default();
1804        assert!(config.storage_dir.is_none());
1805        assert!(config.wal_dir.is_none());
1806    }
1807
1808    #[test]
1809    fn test_event_store_config_with_persistence() {
1810        let temp_dir = TempDir::new().unwrap();
1811        let config = EventStoreConfig::with_persistence(temp_dir.path());
1812
1813        assert!(config.storage_dir.is_some());
1814        assert!(config.wal_dir.is_none());
1815    }
1816
1817    #[test]
1818    fn test_event_store_config_with_wal() {
1819        let temp_dir = TempDir::new().unwrap();
1820        let config = EventStoreConfig::with_wal(temp_dir.path(), WALConfig::default());
1821
1822        assert!(config.storage_dir.is_none());
1823        assert!(config.wal_dir.is_some());
1824    }
1825
1826    #[test]
1827    fn test_event_store_config_with_all() {
1828        let temp_dir = TempDir::new().unwrap();
1829        let config = EventStoreConfig::with_all(temp_dir.path(), SnapshotConfig::default());
1830
1831        assert!(config.storage_dir.is_some());
1832    }
1833
1834    #[test]
1835    fn test_event_store_config_production() {
1836        let storage_dir = TempDir::new().unwrap();
1837        let wal_dir = TempDir::new().unwrap();
1838        let config = EventStoreConfig::production(
1839            storage_dir.path(),
1840            wal_dir.path(),
1841            SnapshotConfig::default(),
1842            WALConfig::default(),
1843            CompactionConfig::default(),
1844        );
1845
1846        assert!(config.storage_dir.is_some());
1847        assert!(config.wal_dir.is_some());
1848    }
1849
1850    // -----------------------------------------------------------------------
1851    // from_env_vars tests — verifies the env-var-to-config wiring that
1852    // caused the durability bug (events lost on restart) in v0.10.3.
1853    // -----------------------------------------------------------------------
1854
1855    #[test]
1856    fn test_from_env_vars_data_dir_enables_full_persistence() {
1857        let (config, mode) =
1858            EventStoreConfig::from_env_vars(Some("/app/data".to_string()), None, None, None);
1859        assert_eq!(mode, "wal+parquet");
1860        assert_eq!(
1861            config.storage_dir.unwrap().to_str().unwrap(),
1862            "/app/data/storage"
1863        );
1864        assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/app/data/wal");
1865    }
1866
1867    #[test]
1868    fn test_from_env_vars_explicit_dirs() {
1869        let (config, mode) = EventStoreConfig::from_env_vars(
1870            None,
1871            Some("/custom/storage".to_string()),
1872            Some("/custom/wal".to_string()),
1873            None,
1874        );
1875        assert_eq!(mode, "wal+parquet");
1876        assert_eq!(
1877            config.storage_dir.unwrap().to_str().unwrap(),
1878            "/custom/storage"
1879        );
1880        assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/custom/wal");
1881    }
1882
1883    #[test]
1884    fn test_from_env_vars_wal_disabled() {
1885        let (config, mode) = EventStoreConfig::from_env_vars(
1886            Some("/app/data".to_string()),
1887            None,
1888            None,
1889            Some("false".to_string()),
1890        );
1891        assert_eq!(mode, "parquet-only");
1892        assert!(config.storage_dir.is_some());
1893        assert!(config.wal_dir.is_none());
1894    }
1895
1896    #[test]
1897    fn test_from_env_vars_no_dirs_is_in_memory() {
1898        let (config, mode) = EventStoreConfig::from_env_vars(None, None, None, None);
1899        assert_eq!(mode, "in-memory");
1900        assert!(config.storage_dir.is_none());
1901        assert!(config.wal_dir.is_none());
1902    }
1903
1904    #[test]
1905    fn test_from_env_vars_empty_strings_treated_as_none() {
1906        let (_, mode) = EventStoreConfig::from_env_vars(
1907            Some("".to_string()),
1908            Some("".to_string()),
1909            Some("".to_string()),
1910            None,
1911        );
1912        assert_eq!(mode, "in-memory");
1913    }
1914
1915    #[test]
1916    fn test_from_env_vars_explicit_overrides_data_dir() {
1917        let (config, mode) = EventStoreConfig::from_env_vars(
1918            Some("/app/data".to_string()),
1919            Some("/override/storage".to_string()),
1920            Some("/override/wal".to_string()),
1921            None,
1922        );
1923        assert_eq!(mode, "wal+parquet");
1924        assert_eq!(
1925            config.storage_dir.unwrap().to_str().unwrap(),
1926            "/override/storage"
1927        );
1928        assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/override/wal");
1929    }
1930
1931    #[test]
1932    fn test_from_env_vars_wal_only() {
1933        let (config, mode) =
1934            EventStoreConfig::from_env_vars(None, None, Some("/wal/only".to_string()), None);
1935        assert_eq!(mode, "wal-only");
1936        assert!(config.storage_dir.is_none());
1937        assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/wal/only");
1938    }
1939
1940    #[test]
1941    fn test_store_stats_serde() {
1942        let stats = StoreStats {
1943            total_events: 100,
1944            total_entities: 50,
1945            total_event_types: 10,
1946            total_ingested: 100,
1947        };
1948
1949        let json = serde_json::to_string(&stats).unwrap();
1950        assert!(json.contains("\"total_events\":100"));
1951        assert!(json.contains("\"total_entities\":50"));
1952    }
1953
1954    #[test]
1955    fn test_query_with_entity_and_type() {
1956        let store = EventStore::new();
1957
1958        store
1959            .ingest(create_test_event("entity-1", "user.created"))
1960            .unwrap();
1961        store
1962            .ingest(create_test_event("entity-1", "user.updated"))
1963            .unwrap();
1964        store
1965            .ingest(create_test_event("entity-2", "user.created"))
1966            .unwrap();
1967
1968        let results = store
1969            .query(QueryEventsRequest {
1970                entity_id: Some("entity-1".to_string()),
1971                event_type: Some("user.created".to_string()),
1972                tenant_id: None,
1973                as_of: None,
1974                since: None,
1975                until: None,
1976                limit: None,
1977                event_type_prefix: None,
1978                payload_filter: None,
1979            })
1980            .unwrap();
1981
1982        assert_eq!(results.len(), 1);
1983        assert_eq!(results[0].event_type_str(), "user.created");
1984    }
1985
1986    #[test]
1987    fn test_query_by_event_type_prefix() {
1988        let store = EventStore::new();
1989
1990        // Ingest events with various types
1991        store
1992            .ingest(create_test_event("entity-1", "index.created"))
1993            .unwrap();
1994        store
1995            .ingest(create_test_event("entity-2", "index.updated"))
1996            .unwrap();
1997        store
1998            .ingest(create_test_event("entity-3", "trade.created"))
1999            .unwrap();
2000        store
2001            .ingest(create_test_event("entity-4", "trade.completed"))
2002            .unwrap();
2003        store
2004            .ingest(create_test_event("entity-5", "balance.updated"))
2005            .unwrap();
2006
2007        // Query with prefix "index." should return exactly 2
2008        let results = store
2009            .query(QueryEventsRequest {
2010                entity_id: None,
2011                event_type: None,
2012                tenant_id: None,
2013                as_of: None,
2014                since: None,
2015                until: None,
2016                limit: None,
2017                event_type_prefix: Some("index.".to_string()),
2018                payload_filter: None,
2019            })
2020            .unwrap();
2021
2022        assert_eq!(results.len(), 2);
2023        assert!(
2024            results
2025                .iter()
2026                .all(|e| e.event_type_str().starts_with("index."))
2027        );
2028    }
2029
2030    #[test]
2031    fn test_query_by_event_type_prefix_empty_returns_all() {
2032        let store = EventStore::new();
2033
2034        store
2035            .ingest(create_test_event("entity-1", "index.created"))
2036            .unwrap();
2037        store
2038            .ingest(create_test_event("entity-2", "trade.created"))
2039            .unwrap();
2040
2041        // Empty prefix matches all types
2042        let results = store
2043            .query(QueryEventsRequest {
2044                entity_id: None,
2045                event_type: None,
2046                tenant_id: None,
2047                as_of: None,
2048                since: None,
2049                until: None,
2050                limit: None,
2051                event_type_prefix: Some("".to_string()),
2052                payload_filter: None,
2053            })
2054            .unwrap();
2055
2056        assert_eq!(results.len(), 2);
2057    }
2058
2059    #[test]
2060    fn test_query_by_event_type_prefix_no_match() {
2061        let store = EventStore::new();
2062
2063        store
2064            .ingest(create_test_event("entity-1", "index.created"))
2065            .unwrap();
2066
2067        let results = store
2068            .query(QueryEventsRequest {
2069                entity_id: None,
2070                event_type: None,
2071                tenant_id: None,
2072                as_of: None,
2073                since: None,
2074                until: None,
2075                limit: None,
2076                event_type_prefix: Some("nonexistent.".to_string()),
2077                payload_filter: None,
2078            })
2079            .unwrap();
2080
2081        assert!(results.is_empty());
2082    }
2083
2084    #[test]
2085    fn test_query_by_entity_with_type_prefix() {
2086        let store = EventStore::new();
2087
2088        store
2089            .ingest(create_test_event("entity-1", "index.created"))
2090            .unwrap();
2091        store
2092            .ingest(create_test_event("entity-1", "trade.created"))
2093            .unwrap();
2094        store
2095            .ingest(create_test_event("entity-2", "index.updated"))
2096            .unwrap();
2097
2098        // Query entity-1 with prefix "index." should return 1
2099        let results = store
2100            .query(QueryEventsRequest {
2101                entity_id: Some("entity-1".to_string()),
2102                event_type: None,
2103                tenant_id: None,
2104                as_of: None,
2105                since: None,
2106                until: None,
2107                limit: None,
2108                event_type_prefix: Some("index.".to_string()),
2109                payload_filter: None,
2110            })
2111            .unwrap();
2112
2113        assert_eq!(results.len(), 1);
2114        assert_eq!(results[0].event_type_str(), "index.created");
2115    }
2116
2117    #[test]
2118    fn test_query_prefix_with_limit() {
2119        let store = EventStore::new();
2120
2121        for i in 0..5 {
2122            store
2123                .ingest(create_test_event(&format!("entity-{}", i), "index.created"))
2124                .unwrap();
2125        }
2126
2127        let results = store
2128            .query(QueryEventsRequest {
2129                entity_id: None,
2130                event_type: None,
2131                tenant_id: None,
2132                as_of: None,
2133                since: None,
2134                until: None,
2135                limit: Some(3),
2136                event_type_prefix: Some("index.".to_string()),
2137                payload_filter: None,
2138            })
2139            .unwrap();
2140
2141        assert_eq!(results.len(), 3);
2142    }
2143
2144    #[test]
2145    fn test_query_prefix_alongside_existing_filters() {
2146        let store = EventStore::new();
2147
2148        store
2149            .ingest(create_test_event("entity-1", "index.created"))
2150            .unwrap();
2151        // Sleep briefly to ensure different timestamps
2152        std::thread::sleep(std::time::Duration::from_millis(10));
2153        store
2154            .ingest(create_test_event("entity-2", "index.strategy.updated"))
2155            .unwrap();
2156        std::thread::sleep(std::time::Duration::from_millis(10));
2157        store
2158            .ingest(create_test_event("entity-3", "index.deleted"))
2159            .unwrap();
2160
2161        // Prefix with limit
2162        let results = store
2163            .query(QueryEventsRequest {
2164                entity_id: None,
2165                event_type: None,
2166                tenant_id: None,
2167                as_of: None,
2168                since: None,
2169                until: None,
2170                limit: Some(2),
2171                event_type_prefix: Some("index.".to_string()),
2172                payload_filter: None,
2173            })
2174            .unwrap();
2175
2176        assert_eq!(results.len(), 2);
2177    }
2178
2179    #[test]
2180    fn test_query_with_payload_filter() {
2181        let store = EventStore::new();
2182
2183        // Ingest 5 events with user_id=alice
2184        for i in 0..5 {
2185            store
2186                .ingest(create_test_event_with_payload(
2187                    &format!("entity-{}", i),
2188                    "user.action",
2189                    serde_json::json!({"user_id": "alice", "action": "click"}),
2190                ))
2191                .unwrap();
2192        }
2193        // Ingest 5 events with user_id=bob
2194        for i in 5..10 {
2195            store
2196                .ingest(create_test_event_with_payload(
2197                    &format!("entity-{}", i),
2198                    "user.action",
2199                    serde_json::json!({"user_id": "bob", "action": "view"}),
2200                ))
2201                .unwrap();
2202        }
2203
2204        // Filter for alice
2205        let results = store
2206            .query(QueryEventsRequest {
2207                entity_id: None,
2208                event_type: Some("user.action".to_string()),
2209                tenant_id: None,
2210                as_of: None,
2211                since: None,
2212                until: None,
2213                limit: None,
2214                event_type_prefix: None,
2215                payload_filter: Some(r#"{"user_id":"alice"}"#.to_string()),
2216            })
2217            .unwrap();
2218
2219        assert_eq!(results.len(), 5);
2220    }
2221
2222    #[test]
2223    fn test_query_payload_filter_non_existent_field() {
2224        let store = EventStore::new();
2225
2226        store
2227            .ingest(create_test_event_with_payload(
2228                "entity-1",
2229                "user.action",
2230                serde_json::json!({"user_id": "alice"}),
2231            ))
2232            .unwrap();
2233
2234        // Filter for a field that doesn't exist — returns 0, not error
2235        let results = store
2236            .query(QueryEventsRequest {
2237                entity_id: None,
2238                event_type: None,
2239                tenant_id: None,
2240                as_of: None,
2241                since: None,
2242                until: None,
2243                limit: None,
2244                event_type_prefix: None,
2245                payload_filter: Some(r#"{"nonexistent":"value"}"#.to_string()),
2246            })
2247            .unwrap();
2248
2249        assert!(results.is_empty());
2250    }
2251
2252    #[test]
2253    fn test_query_payload_filter_with_prefix() {
2254        let store = EventStore::new();
2255
2256        store
2257            .ingest(create_test_event_with_payload(
2258                "entity-1",
2259                "index.created",
2260                serde_json::json!({"status": "active"}),
2261            ))
2262            .unwrap();
2263        store
2264            .ingest(create_test_event_with_payload(
2265                "entity-2",
2266                "index.created",
2267                serde_json::json!({"status": "inactive"}),
2268            ))
2269            .unwrap();
2270        store
2271            .ingest(create_test_event_with_payload(
2272                "entity-3",
2273                "trade.created",
2274                serde_json::json!({"status": "active"}),
2275            ))
2276            .unwrap();
2277
2278        // Combine prefix + payload filter
2279        let results = store
2280            .query(QueryEventsRequest {
2281                entity_id: None,
2282                event_type: None,
2283                tenant_id: None,
2284                as_of: None,
2285                since: None,
2286                until: None,
2287                limit: None,
2288                event_type_prefix: Some("index.".to_string()),
2289                payload_filter: Some(r#"{"status":"active"}"#.to_string()),
2290            })
2291            .unwrap();
2292
2293        assert_eq!(results.len(), 1);
2294        assert_eq!(results[0].entity_id().to_string(), "entity-1");
2295    }
2296
2297    #[test]
2298    fn test_flush_storage_no_storage() {
2299        let store = EventStore::new();
2300        // Without storage, flush should succeed (no-op)
2301        let result = store.flush_storage();
2302        assert!(result.is_ok());
2303    }
2304
2305    #[test]
2306    fn test_state_evolution() {
2307        let store = EventStore::new();
2308
2309        // Initial state
2310        store
2311            .ingest(
2312                Event::from_strings(
2313                    "user.created".to_string(),
2314                    "user-1".to_string(),
2315                    "default".to_string(),
2316                    serde_json::json!({"name": "Alice", "age": 25}),
2317                    None,
2318                )
2319                .unwrap(),
2320            )
2321            .unwrap();
2322
2323        // Update state
2324        store
2325            .ingest(
2326                Event::from_strings(
2327                    "user.updated".to_string(),
2328                    "user-1".to_string(),
2329                    "default".to_string(),
2330                    serde_json::json!({"age": 26}),
2331                    None,
2332                )
2333                .unwrap(),
2334            )
2335            .unwrap();
2336
2337        let state = store.reconstruct_state("user-1", None).unwrap();
2338        // The state is wrapped with metadata
2339        assert_eq!(state["current_state"]["name"], "Alice");
2340        assert_eq!(state["current_state"]["age"], 26);
2341    }
2342
2343    #[test]
2344    fn test_reject_system_event_types() {
2345        let store = EventStore::new();
2346
2347        // System event types should be rejected via user-facing ingestion
2348        let event = Event::reconstruct_from_strings(
2349            uuid::Uuid::new_v4(),
2350            "_system.tenant.created".to_string(),
2351            "_system:tenant:acme".to_string(),
2352            "_system".to_string(),
2353            serde_json::json!({"name": "ACME"}),
2354            chrono::Utc::now(),
2355            None,
2356            1,
2357        );
2358
2359        let result = store.ingest(event);
2360        assert!(result.is_err());
2361        let err = result.unwrap_err();
2362        assert!(
2363            err.to_string().contains("reserved for internal use"),
2364            "Expected system namespace rejection, got: {}",
2365            err
2366        );
2367    }
2368}