Skip to main content

allsource_core/
store.rs

1use crate::{
2    application::{
3        dto::QueryEventsRequest,
4        services::{
5            pipeline::PipelineManager,
6            projection::{EntitySnapshotProjection, EventCounterProjection, ProjectionManager},
7            replay::ReplayManager,
8            schema::{SchemaRegistry, SchemaRegistryConfig},
9        },
10    },
11    domain::entities::Event,
12    error::{AllSourceError, Result},
13    infrastructure::{
14        observability::metrics::MetricsRegistry,
15        persistence::{
16            compaction::{CompactionConfig, CompactionManager},
17            index::{EventIndex, IndexEntry},
18            snapshot::{SnapshotConfig, SnapshotManager, SnapshotType},
19            storage::ParquetStorage,
20            wal::{WALConfig, WriteAheadLog},
21        },
22        web::websocket::WebSocketManager,
23    },
24};
25use chrono::{DateTime, Utc};
26use dashmap::DashMap;
27use parking_lot::RwLock;
28use std::{path::PathBuf, sync::Arc};
29
/// High-performance event store with columnar storage.
///
/// Owns the in-memory event log together with the index, the projections and
/// the optional durability components (Parquet storage, WAL, compaction).
/// Most components are held behind `Arc` so the accessor methods on this type
/// can hand out cheap shared handles.
pub struct EventStore {
    /// In-memory event storage. An event's position in this vector is the
    /// offset that gets recorded for it in `index` at ingestion time.
    events: Arc<RwLock<Vec<Event>>>,

    /// High-performance concurrent index, looked up by entity id or by event
    /// type; its entries carry the event's timestamp and offset into `events`.
    index: Arc<EventIndex>,

    /// Projection manager for real-time aggregations. Every ingested event is
    /// passed through it; the built-in projections are registered at construction.
    pub(crate) projections: Arc<RwLock<ProjectionManager>>,

    /// Optional persistent storage (v0.2 feature). `None` when no storage
    /// directory is configured or when opening the storage failed.
    storage: Option<Arc<RwLock<ParquetStorage>>>,

    /// WebSocket manager for real-time event streaming (v0.2 feature).
    /// Receives a broadcast for every successfully ingested event.
    websocket_manager: Arc<WebSocketManager>,

    /// Snapshot manager for fast state recovery (v0.2 feature)
    snapshot_manager: Arc<SnapshotManager>,

    /// Write-Ahead Log for durability (v0.2 feature). `None` when no WAL
    /// directory is configured or when opening the WAL failed.
    wal: Option<Arc<WriteAheadLog>>,

    /// Compaction manager for Parquet optimization (v0.2 feature).
    /// Present whenever a storage directory is configured.
    compaction_manager: Option<Arc<CompactionManager>>,

    /// Schema registry for event validation (v0.5 feature)
    schema_registry: Arc<SchemaRegistry>,

    /// Replay manager for event replay and projection rebuilding (v0.5 feature)
    replay_manager: Arc<ReplayManager>,

    /// Pipeline manager for stream processing (v0.5 feature)
    pipeline_manager: Arc<PipelineManager>,

    /// Prometheus metrics registry (v0.6 feature)
    metrics: Arc<MetricsRegistry>,

    /// Total events ingested (for metrics). Reset to the number of restored
    /// events at startup and incremented once per successful ingestion.
    total_ingested: Arc<RwLock<u64>>,

    /// Projection state cache for Query Service integration (v0.7 feature)
    /// Key format: "{projection_name}:{entity_id}"
    /// NOTE(review): the original comment quoted "O(1) access with ~11.9 μs
    /// latency" for this DashMap; that figure cannot be confirmed from this file.
    projection_state_cache: Arc<DashMap<String, serde_json::Value>>,
}
76
77impl EventStore {
78    /// Create a new in-memory event store
79    pub fn new() -> Self {
80        Self::with_config(EventStoreConfig::default())
81    }
82
83    /// Create event store with custom configuration
84    pub fn with_config(config: EventStoreConfig) -> Self {
85        let mut projections = ProjectionManager::new();
86
87        // Register built-in projections
88        projections.register(Arc::new(EntitySnapshotProjection::new("entity_snapshots")));
89        projections.register(Arc::new(EventCounterProjection::new("event_counters")));
90
91        // Initialize persistent storage if configured
92        let storage = config
93            .storage_dir
94            .as_ref()
95            .and_then(|dir| match ParquetStorage::new(dir) {
96                Ok(storage) => {
97                    tracing::info!("✅ Parquet persistence enabled at: {}", dir.display());
98                    Some(Arc::new(RwLock::new(storage)))
99                }
100                Err(e) => {
101                    tracing::error!("❌ Failed to initialize Parquet storage: {}", e);
102                    None
103                }
104            });
105
106        // Initialize WAL if configured (v0.2 feature)
107        let wal = config.wal_dir.as_ref().and_then(|dir| {
108            match WriteAheadLog::new(dir, config.wal_config.clone()) {
109                Ok(wal) => {
110                    tracing::info!("✅ WAL enabled at: {}", dir.display());
111                    Some(Arc::new(wal))
112                }
113                Err(e) => {
114                    tracing::error!("❌ Failed to initialize WAL: {}", e);
115                    None
116                }
117            }
118        });
119
120        // Initialize compaction manager if Parquet storage is enabled (v0.2 feature)
121        let compaction_manager = config.storage_dir.as_ref().map(|dir| {
122            let manager = CompactionManager::new(dir, config.compaction_config.clone());
123            Arc::new(manager)
124        });
125
126        // Initialize schema registry (v0.5 feature)
127        let schema_registry = Arc::new(SchemaRegistry::new(config.schema_registry_config.clone()));
128        tracing::info!("✅ Schema registry enabled");
129
130        // Initialize replay manager (v0.5 feature)
131        let replay_manager = Arc::new(ReplayManager::new());
132        tracing::info!("✅ Replay manager enabled");
133
134        // Initialize pipeline manager (v0.5 feature)
135        let pipeline_manager = Arc::new(PipelineManager::new());
136        tracing::info!("✅ Pipeline manager enabled");
137
138        // Initialize metrics registry (v0.6 feature)
139        let metrics = MetricsRegistry::new();
140        tracing::info!("✅ Prometheus metrics registry initialized");
141
142        // Initialize projection state cache (v0.7 feature)
143        let projection_state_cache = Arc::new(DashMap::new());
144        tracing::info!("✅ Projection state cache initialized");
145
146        let store = Self {
147            events: Arc::new(RwLock::new(Vec::new())),
148            index: Arc::new(EventIndex::new()),
149            projections: Arc::new(RwLock::new(projections)),
150            storage,
151            websocket_manager: Arc::new(WebSocketManager::new()),
152            snapshot_manager: Arc::new(SnapshotManager::new(config.snapshot_config)),
153            wal,
154            compaction_manager,
155            schema_registry,
156            replay_manager,
157            pipeline_manager,
158            metrics,
159            total_ingested: Arc::new(RwLock::new(0)),
160            projection_state_cache,
161        };
162
163        // Recover from WAL first (most recent data)
164        let mut wal_recovered = false;
165        if let Some(ref wal) = store.wal {
166            match wal.recover() {
167                Ok(recovered_events) if !recovered_events.is_empty() => {
168                    tracing::info!(
169                        "🔄 Recovering {} events from WAL...",
170                        recovered_events.len()
171                    );
172
173                    for event in recovered_events {
174                        // Re-index and process events from WAL
175                        let offset = store.events.read().len();
176                        if let Err(e) = store.index.index_event(
177                            event.id,
178                            event.entity_id_str(),
179                            event.event_type_str(),
180                            event.timestamp,
181                            offset,
182                        ) {
183                            tracing::error!("Failed to re-index WAL event {}: {}", event.id, e);
184                        }
185
186                        if let Err(e) = store.projections.read().process_event(&event) {
187                            tracing::error!("Failed to re-process WAL event {}: {}", event.id, e);
188                        }
189
190                        store.events.write().push(event);
191                    }
192
193                    let total = store.events.read().len();
194                    *store.total_ingested.write() = total as u64;
195                    tracing::info!("✅ Successfully recovered {} events from WAL", total);
196
197                    // After successful recovery, checkpoint to Parquet if enabled
198                    if store.storage.is_some() {
199                        tracing::info!("📸 Checkpointing WAL to Parquet storage...");
200                        if let Err(e) = store.flush_storage() {
201                            tracing::error!("Failed to checkpoint to Parquet: {}", e);
202                        } else if let Err(e) = wal.truncate() {
203                            tracing::error!("Failed to truncate WAL after checkpoint: {}", e);
204                        } else {
205                            tracing::info!("✅ WAL checkpointed and truncated");
206                        }
207                    }
208
209                    wal_recovered = true;
210                }
211                Ok(_) => {
212                    tracing::debug!("No events to recover from WAL");
213                }
214                Err(e) => {
215                    tracing::error!("❌ WAL recovery failed: {}", e);
216                }
217            }
218        }
219
220        // Load persisted events from Parquet only if we didn't recover from WAL
221        // (to avoid loading the same events twice after WAL checkpoint)
222        if !wal_recovered
223            && let Some(ref storage) = store.storage
224            && let Ok(persisted_events) = storage.read().load_all_events()
225        {
226            tracing::info!("📂 Loading {} persisted events...", persisted_events.len());
227
228            for event in persisted_events {
229                // Re-index loaded events
230                let offset = store.events.read().len();
231                if let Err(e) = store.index.index_event(
232                    event.id,
233                    event.entity_id_str(),
234                    event.event_type_str(),
235                    event.timestamp,
236                    offset,
237                ) {
238                    tracing::error!("Failed to re-index event {}: {}", event.id, e);
239                }
240
241                // Re-process through projections
242                if let Err(e) = store.projections.read().process_event(&event) {
243                    tracing::error!("Failed to re-process event {}: {}", event.id, e);
244                }
245
246                store.events.write().push(event);
247            }
248
249            let total = store.events.read().len();
250            *store.total_ingested.write() = total as u64;
251            tracing::info!("✅ Successfully loaded {} events from storage", total);
252        }
253
254        store
255    }
256
257    /// Ingest a new event into the store
258    pub fn ingest(&self, event: Event) -> Result<()> {
259        // Start metrics timer (v0.6 feature)
260        let timer = self.metrics.ingestion_duration_seconds.start_timer();
261
262        // Validate event
263        let validation_result = self.validate_event(&event);
264        if let Err(e) = validation_result {
265            // Record ingestion error
266            self.metrics.ingestion_errors_total.inc();
267            timer.observe_duration();
268            return Err(e);
269        }
270
271        // Write to WAL FIRST for durability (v0.2 feature)
272        // This ensures event is persisted before processing
273        if let Some(ref wal) = self.wal
274            && let Err(e) = wal.append(event.clone())
275        {
276            self.metrics.ingestion_errors_total.inc();
277            timer.observe_duration();
278            return Err(e);
279        }
280
281        let mut events = self.events.write();
282        let offset = events.len();
283
284        // Index the event
285        self.index.index_event(
286            event.id,
287            event.entity_id_str(),
288            event.event_type_str(),
289            event.timestamp,
290            offset,
291        )?;
292
293        // Process through projections
294        let projections = self.projections.read();
295        projections.process_event(&event)?;
296        drop(projections); // Release lock
297
298        // Process through pipelines (v0.5 feature)
299        // Pipelines can transform, filter, and aggregate events in real-time
300        let pipeline_results = self.pipeline_manager.process_event(&event);
301        if !pipeline_results.is_empty() {
302            tracing::debug!(
303                "Event {} processed by {} pipeline(s)",
304                event.id,
305                pipeline_results.len()
306            );
307            // Pipeline results could be stored, emitted, or forwarded elsewhere
308            // For now, we just log them for observability
309            for (pipeline_id, result) in pipeline_results {
310                tracing::trace!("Pipeline {} result: {:?}", pipeline_id, result);
311            }
312        }
313
314        // Persist to Parquet storage if enabled (v0.2)
315        if let Some(ref storage) = self.storage {
316            let storage = storage.read();
317            storage.append_event(event.clone())?;
318        }
319
320        // Store the event in memory
321        events.push(event.clone());
322        let total_events = events.len();
323        drop(events); // Release lock early
324
325        // Broadcast to WebSocket clients (v0.2 feature)
326        self.websocket_manager
327            .broadcast_event(Arc::new(event.clone()));
328
329        // Check if automatic snapshot should be created (v0.2 feature)
330        self.check_auto_snapshot(event.entity_id_str(), &event);
331
332        // Update metrics (v0.6 feature)
333        self.metrics.events_ingested_total.inc();
334        self.metrics
335            .events_ingested_by_type
336            .with_label_values(&[event.event_type_str()])
337            .inc();
338        self.metrics.storage_events_total.set(total_events as i64);
339
340        // Update legacy total counter
341        let mut total = self.total_ingested.write();
342        *total += 1;
343
344        timer.observe_duration();
345
346        tracing::debug!("Event ingested: {} (offset: {})", event.id, offset);
347
348        Ok(())
349    }
350
351    /// Ingest a replicated event from the leader (follower mode).
352    ///
353    /// Unlike `ingest()`, this method:
354    /// - Skips WAL writing (the follower's WalReceiver manages its own local WAL)
355    /// - Skips schema validation (the leader already validated)
356    /// - Still indexes, processes projections/pipelines, and broadcasts to WebSocket clients
357    pub fn ingest_replicated(&self, event: Event) -> Result<()> {
358        let timer = self.metrics.ingestion_duration_seconds.start_timer();
359
360        let mut events = self.events.write();
361        let offset = events.len();
362
363        // Index the event
364        self.index.index_event(
365            event.id,
366            event.entity_id_str(),
367            event.event_type_str(),
368            event.timestamp,
369            offset,
370        )?;
371
372        // Process through projections
373        let projections = self.projections.read();
374        projections.process_event(&event)?;
375        drop(projections);
376
377        // Process through pipelines
378        let pipeline_results = self.pipeline_manager.process_event(&event);
379        if !pipeline_results.is_empty() {
380            tracing::debug!(
381                "Replicated event {} processed by {} pipeline(s)",
382                event.id,
383                pipeline_results.len()
384            );
385        }
386
387        // Store the event in memory
388        events.push(event.clone());
389        let total_events = events.len();
390        drop(events);
391
392        // Broadcast to WebSocket clients
393        self.websocket_manager
394            .broadcast_event(Arc::new(event.clone()));
395
396        // Update metrics
397        self.metrics.events_ingested_total.inc();
398        self.metrics
399            .events_ingested_by_type
400            .with_label_values(&[event.event_type_str()])
401            .inc();
402        self.metrics.storage_events_total.set(total_events as i64);
403
404        let mut total = self.total_ingested.write();
405        *total += 1;
406
407        timer.observe_duration();
408
409        tracing::debug!(
410            "Replicated event ingested: {} (offset: {})",
411            event.id,
412            offset
413        );
414
415        Ok(())
416    }
417
418    /// Get the WebSocket manager for this store
419    pub fn websocket_manager(&self) -> Arc<WebSocketManager> {
420        Arc::clone(&self.websocket_manager)
421    }
422
423    /// Get the snapshot manager for this store
424    pub fn snapshot_manager(&self) -> Arc<SnapshotManager> {
425        Arc::clone(&self.snapshot_manager)
426    }
427
428    /// Get the compaction manager for this store
429    pub fn compaction_manager(&self) -> Option<Arc<CompactionManager>> {
430        self.compaction_manager.as_ref().map(Arc::clone)
431    }
432
433    /// Get the schema registry for this store (v0.5 feature)
434    pub fn schema_registry(&self) -> Arc<SchemaRegistry> {
435        Arc::clone(&self.schema_registry)
436    }
437
438    /// Get the replay manager for this store (v0.5 feature)
439    pub fn replay_manager(&self) -> Arc<ReplayManager> {
440        Arc::clone(&self.replay_manager)
441    }
442
443    /// Get the pipeline manager for this store (v0.5 feature)
444    pub fn pipeline_manager(&self) -> Arc<PipelineManager> {
445        Arc::clone(&self.pipeline_manager)
446    }
447
448    /// Get the metrics registry for this store (v0.6 feature)
449    pub fn metrics(&self) -> Arc<MetricsRegistry> {
450        Arc::clone(&self.metrics)
451    }
452
453    /// Get the projection manager for this store (v0.7 feature)
454    pub fn projection_manager(&self) -> parking_lot::RwLockReadGuard<'_, ProjectionManager> {
455        self.projections.read()
456    }
457
458    /// Get the projection state cache for this store (v0.7 feature)
459    /// Used by Elixir Query Service for state synchronization
460    pub fn projection_state_cache(&self) -> Arc<DashMap<String, serde_json::Value>> {
461        Arc::clone(&self.projection_state_cache)
462    }
463
464    /// Manually flush any pending events to persistent storage
465    pub fn flush_storage(&self) -> Result<()> {
466        if let Some(ref storage) = self.storage {
467            let storage = storage.read();
468            storage.flush()?;
469            tracing::info!("✅ Flushed events to persistent storage");
470        }
471        Ok(())
472    }
473
474    /// Manually create a snapshot for an entity
475    pub fn create_snapshot(&self, entity_id: &str) -> Result<()> {
476        // Get all events for this entity
477        let events = self.query(QueryEventsRequest {
478            entity_id: Some(entity_id.to_string()),
479            event_type: None,
480            tenant_id: None,
481            as_of: None,
482            since: None,
483            until: None,
484            limit: None,
485        })?;
486
487        if events.is_empty() {
488            return Err(AllSourceError::EntityNotFound(entity_id.to_string()));
489        }
490
491        // Build current state
492        let mut state = serde_json::json!({});
493        for event in &events {
494            if let serde_json::Value::Object(ref mut state_map) = state
495                && let serde_json::Value::Object(ref payload_map) = event.payload
496            {
497                for (key, value) in payload_map {
498                    state_map.insert(key.clone(), value.clone());
499                }
500            }
501        }
502
503        let last_event = events.last().unwrap();
504        self.snapshot_manager.create_snapshot(
505            entity_id.to_string(),
506            state,
507            last_event.timestamp,
508            events.len(),
509            SnapshotType::Manual,
510        )?;
511
512        Ok(())
513    }
514
515    /// Check and create automatic snapshots if needed
516    fn check_auto_snapshot(&self, entity_id: &str, event: &Event) {
517        // Count events for this entity
518        let entity_event_count = self
519            .index
520            .get_by_entity(entity_id)
521            .map(|entries| entries.len())
522            .unwrap_or(0);
523
524        if self.snapshot_manager.should_create_snapshot(
525            entity_id,
526            entity_event_count,
527            event.timestamp,
528        ) {
529            // Create snapshot in background (don't block ingestion)
530            if let Err(e) = self.create_snapshot(entity_id) {
531                tracing::warn!(
532                    "Failed to create automatic snapshot for {}: {}",
533                    entity_id,
534                    e
535                );
536            }
537        }
538    }
539
540    /// Validate an event before ingestion
541    fn validate_event(&self, event: &Event) -> Result<()> {
542        // EntityId and EventType value objects already validate non-empty in their constructors
543        // So these checks are now redundant, but we keep them for explicit validation
544        if event.entity_id_str().is_empty() {
545            return Err(AllSourceError::ValidationError(
546                "entity_id cannot be empty".to_string(),
547            ));
548        }
549
550        if event.event_type_str().is_empty() {
551            return Err(AllSourceError::ValidationError(
552                "event_type cannot be empty".to_string(),
553            ));
554        }
555
556        // Reject system namespace events from user-facing ingestion.
557        // System events are written exclusively via SystemMetadataStore.
558        if event.event_type().is_system() {
559            return Err(AllSourceError::ValidationError(
560                "Event types starting with '_system.' are reserved for internal use".to_string(),
561            ));
562        }
563
564        Ok(())
565    }
566
567    /// Query events based on filters (optimized with indices)
568    pub fn query(&self, request: QueryEventsRequest) -> Result<Vec<Event>> {
569        // Determine query type for metrics (v0.6 feature)
570        let query_type = if request.entity_id.is_some() {
571            "entity"
572        } else if request.event_type.is_some() {
573            "type"
574        } else {
575            "full_scan"
576        };
577
578        // Start metrics timer (v0.6 feature)
579        let timer = self
580            .metrics
581            .query_duration_seconds
582            .with_label_values(&[query_type])
583            .start_timer();
584
585        // Increment query counter (v0.6 feature)
586        self.metrics
587            .queries_total
588            .with_label_values(&[query_type])
589            .inc();
590
591        let events = self.events.read();
592
593        // Use index for fast lookups
594        let offsets: Vec<usize> = if let Some(entity_id) = &request.entity_id {
595            // Use entity index
596            self.index
597                .get_by_entity(entity_id)
598                .map(|entries| self.filter_entries(entries, &request))
599                .unwrap_or_default()
600        } else if let Some(event_type) = &request.event_type {
601            // Use type index
602            self.index
603                .get_by_type(event_type)
604                .map(|entries| self.filter_entries(entries, &request))
605                .unwrap_or_default()
606        } else {
607            // Full scan (less efficient but necessary for complex queries)
608            (0..events.len()).collect()
609        };
610
611        // Fetch events and apply remaining filters
612        let mut results: Vec<Event> = offsets
613            .iter()
614            .filter_map(|&offset| events.get(offset).cloned())
615            .filter(|event| self.apply_filters(event, &request))
616            .collect();
617
618        // Sort by timestamp (ascending)
619        results.sort_by_key(|x| x.timestamp);
620
621        // Apply limit
622        if let Some(limit) = request.limit {
623            results.truncate(limit);
624        }
625
626        // Record query results count (v0.6 feature)
627        self.metrics
628            .query_results_total
629            .with_label_values(&[query_type])
630            .inc_by(results.len() as u64);
631
632        timer.observe_duration();
633
634        Ok(results)
635    }
636
637    /// Filter index entries based on query parameters
638    fn filter_entries(&self, entries: Vec<IndexEntry>, request: &QueryEventsRequest) -> Vec<usize> {
639        entries
640            .into_iter()
641            .filter(|entry| {
642                // Time filters
643                if let Some(as_of) = request.as_of
644                    && entry.timestamp > as_of
645                {
646                    return false;
647                }
648                if let Some(since) = request.since
649                    && entry.timestamp < since
650                {
651                    return false;
652                }
653                if let Some(until) = request.until
654                    && entry.timestamp > until
655                {
656                    return false;
657                }
658                true
659            })
660            .map(|entry| entry.offset)
661            .collect()
662    }
663
664    /// Apply filters to an event
665    fn apply_filters(&self, event: &Event, request: &QueryEventsRequest) -> bool {
666        // Additional type filter if entity was primary
667        if request.entity_id.is_some()
668            && let Some(ref event_type) = request.event_type
669            && event.event_type_str() != event_type
670        {
671            return false;
672        }
673
674        true
675    }
676
677    /// Reconstruct entity state as of a specific timestamp
678    /// v0.2: Now uses snapshots for fast reconstruction
679    pub fn reconstruct_state(
680        &self,
681        entity_id: &str,
682        as_of: Option<DateTime<Utc>>,
683    ) -> Result<serde_json::Value> {
684        // Try to find a snapshot to use as a base (v0.2 optimization)
685        let (merged_state, since_timestamp) = if let Some(as_of_time) = as_of {
686            // Get snapshot closest to requested time
687            if let Some(snapshot) = self
688                .snapshot_manager
689                .get_snapshot_as_of(entity_id, as_of_time)
690            {
691                tracing::debug!(
692                    "Using snapshot from {} for entity {} (saved {} events)",
693                    snapshot.as_of,
694                    entity_id,
695                    snapshot.event_count
696                );
697                (snapshot.state.clone(), Some(snapshot.as_of))
698            } else {
699                (serde_json::json!({}), None)
700            }
701        } else {
702            // Get latest snapshot for current state
703            if let Some(snapshot) = self.snapshot_manager.get_latest_snapshot(entity_id) {
704                tracing::debug!(
705                    "Using latest snapshot from {} for entity {}",
706                    snapshot.as_of,
707                    entity_id
708                );
709                (snapshot.state.clone(), Some(snapshot.as_of))
710            } else {
711                (serde_json::json!({}), None)
712            }
713        };
714
715        // Query events after the snapshot (or all if no snapshot)
716        let events = self.query(QueryEventsRequest {
717            entity_id: Some(entity_id.to_string()),
718            event_type: None,
719            tenant_id: None,
720            as_of,
721            since: since_timestamp,
722            until: None,
723            limit: None,
724        })?;
725
726        // If no events and no snapshot, entity not found
727        if events.is_empty() && since_timestamp.is_none() {
728            return Err(AllSourceError::EntityNotFound(entity_id.to_string()));
729        }
730
731        // Merge events on top of snapshot (or from scratch if no snapshot)
732        let mut merged_state = merged_state;
733        for event in &events {
734            if let serde_json::Value::Object(ref mut state_map) = merged_state
735                && let serde_json::Value::Object(ref payload_map) = event.payload
736            {
737                for (key, value) in payload_map {
738                    state_map.insert(key.clone(), value.clone());
739                }
740            }
741        }
742
743        // Wrap with metadata
744        let state = serde_json::json!({
745            "entity_id": entity_id,
746            "last_updated": events.last().map(|e| e.timestamp),
747            "event_count": events.len(),
748            "as_of": as_of,
749            "current_state": merged_state,
750            "history": events.iter().map(|e| {
751                serde_json::json!({
752                    "event_id": e.id,
753                    "type": e.event_type,
754                    "timestamp": e.timestamp,
755                    "payload": e.payload
756                })
757            }).collect::<Vec<_>>()
758        });
759
760        Ok(state)
761    }
762
763    /// Get snapshot from projection (faster than reconstructing)
764    pub fn get_snapshot(&self, entity_id: &str) -> Result<serde_json::Value> {
765        let projections = self.projections.read();
766
767        if let Some(snapshot_projection) = projections.get_projection("entity_snapshots")
768            && let Some(state) = snapshot_projection.get_state(entity_id)
769        {
770            return Ok(serde_json::json!({
771                "entity_id": entity_id,
772                "snapshot": state,
773                "from_projection": "entity_snapshots"
774            }));
775        }
776
777        Err(AllSourceError::EntityNotFound(entity_id.to_string()))
778    }
779
780    /// Get statistics about the event store
781    pub fn stats(&self) -> StoreStats {
782        let events = self.events.read();
783        let index_stats = self.index.stats();
784
785        StoreStats {
786            total_events: events.len(),
787            total_entities: index_stats.total_entities,
788            total_event_types: index_stats.total_event_types,
789            total_ingested: *self.total_ingested.read(),
790        }
791    }
792
793    /// Get all unique streams (entity_ids) in the store
794    pub fn list_streams(&self) -> Vec<StreamInfo> {
795        self.index
796            .get_all_entities()
797            .into_iter()
798            .map(|entity_id| {
799                let event_count = self
800                    .index
801                    .get_by_entity(&entity_id)
802                    .map(|entries| entries.len())
803                    .unwrap_or(0);
804                let last_event_at = self
805                    .index
806                    .get_by_entity(&entity_id)
807                    .and_then(|entries| entries.last().map(|e| e.timestamp));
808                StreamInfo {
809                    stream_id: entity_id,
810                    event_count,
811                    last_event_at,
812                }
813            })
814            .collect()
815    }
816
817    /// Get all unique event types in the store
818    pub fn list_event_types(&self) -> Vec<EventTypeInfo> {
819        self.index
820            .get_all_types()
821            .into_iter()
822            .map(|event_type| {
823                let event_count = self
824                    .index
825                    .get_by_type(&event_type)
826                    .map(|entries| entries.len())
827                    .unwrap_or(0);
828                let last_event_at = self
829                    .index
830                    .get_by_type(&event_type)
831                    .and_then(|entries| entries.last().map(|e| e.timestamp));
832                EventTypeInfo {
833                    event_type,
834                    event_count,
835                    last_event_at,
836                }
837            })
838            .collect()
839    }
840
841    /// Attach a broadcast sender to the WAL for replication.
842    ///
843    /// Thread-safe: can be called through `Arc<EventStore>` at runtime.
844    /// Used during initial setup and during follower → leader promotion.
845    /// When set, every WAL append publishes the entry to the broadcast
846    /// channel so the WAL shipper can stream it to followers.
847    pub fn enable_wal_replication(
848        &self,
849        tx: tokio::sync::broadcast::Sender<crate::infrastructure::persistence::wal::WALEntry>,
850    ) {
851        if let Some(ref wal_arc) = self.wal {
852            wal_arc.set_replication_tx(tx);
853            tracing::info!("WAL replication broadcast enabled");
854        } else {
855            tracing::warn!("Cannot enable WAL replication: WAL is not configured");
856        }
857    }
858
859    /// Get a reference to the WAL (if configured).
860    /// Used by the replication catch-up protocol to determine oldest available offset.
861    pub fn wal(&self) -> Option<&Arc<WriteAheadLog>> {
862        self.wal.as_ref()
863    }
864
865    /// Get a reference to the Parquet storage (if configured).
866    /// Used by the replication catch-up protocol to stream snapshot files to followers.
867    pub fn parquet_storage(&self) -> Option<&Arc<RwLock<ParquetStorage>>> {
868        self.storage.as_ref()
869    }
870}
871
872/// Configuration for EventStore
873#[derive(Debug, Clone, Default)]
874pub struct EventStoreConfig {
875    /// Optional directory for persistent Parquet storage (v0.2 feature)
876    pub storage_dir: Option<PathBuf>,
877
878    /// Snapshot configuration (v0.2 feature)
879    pub snapshot_config: SnapshotConfig,
880
881    /// Optional directory for WAL (Write-Ahead Log) (v0.2 feature)
882    pub wal_dir: Option<PathBuf>,
883
884    /// WAL configuration (v0.2 feature)
885    pub wal_config: WALConfig,
886
887    /// Compaction configuration (v0.2 feature)
888    pub compaction_config: CompactionConfig,
889
890    /// Schema registry configuration (v0.5 feature)
891    pub schema_registry_config: SchemaRegistryConfig,
892
893    /// Optional directory for system metadata storage (dogfood feature).
894    /// When set, operational metadata (tenants, config, audit) is stored
895    /// using AllSource's own event store rather than an external database.
896    /// Defaults to `{storage_dir}/__system/` when storage_dir is set.
897    pub system_data_dir: Option<PathBuf>,
898
899    /// Name of the default tenant to auto-create on first boot.
900    pub bootstrap_tenant: Option<String>,
901}
902
903impl EventStoreConfig {
904    /// Create config with persistent storage enabled
905    pub fn with_persistence(storage_dir: impl Into<PathBuf>) -> Self {
906        Self {
907            storage_dir: Some(storage_dir.into()),
908            ..Self::default()
909        }
910    }
911
912    /// Create config with custom snapshot settings
913    pub fn with_snapshots(snapshot_config: SnapshotConfig) -> Self {
914        Self {
915            snapshot_config,
916            ..Self::default()
917        }
918    }
919
920    /// Create config with WAL enabled
921    pub fn with_wal(wal_dir: impl Into<PathBuf>, wal_config: WALConfig) -> Self {
922        Self {
923            wal_dir: Some(wal_dir.into()),
924            wal_config,
925            ..Self::default()
926        }
927    }
928
929    /// Create config with both persistence and snapshots
930    pub fn with_all(storage_dir: impl Into<PathBuf>, snapshot_config: SnapshotConfig) -> Self {
931        Self {
932            storage_dir: Some(storage_dir.into()),
933            snapshot_config,
934            ..Self::default()
935        }
936    }
937
938    /// Create production config with all features enabled
939    pub fn production(
940        storage_dir: impl Into<PathBuf>,
941        wal_dir: impl Into<PathBuf>,
942        snapshot_config: SnapshotConfig,
943        wal_config: WALConfig,
944        compaction_config: CompactionConfig,
945    ) -> Self {
946        let storage_dir = storage_dir.into();
947        let system_data_dir = storage_dir.join("__system");
948        Self {
949            storage_dir: Some(storage_dir),
950            snapshot_config,
951            wal_dir: Some(wal_dir.into()),
952            wal_config,
953            compaction_config,
954            system_data_dir: Some(system_data_dir),
955            ..Self::default()
956        }
957    }
958
959    /// Resolve the effective system data directory.
960    ///
961    /// If explicitly set, returns that. Otherwise, derives from storage_dir.
962    /// Returns None if neither is configured (in-memory mode).
963    pub fn effective_system_data_dir(&self) -> Option<PathBuf> {
964        self.system_data_dir
965            .clone()
966            .or_else(|| self.storage_dir.as_ref().map(|d| d.join("__system")))
967    }
968}
969
970#[derive(Debug, serde::Serialize)]
971pub struct StoreStats {
972    pub total_events: usize,
973    pub total_entities: usize,
974    pub total_event_types: usize,
975    pub total_ingested: u64,
976}
977
978/// Information about a stream (entity_id)
979#[derive(Debug, Clone, serde::Serialize)]
980pub struct StreamInfo {
981    /// The stream identifier (entity_id)
982    pub stream_id: String,
983    /// Total number of events in this stream
984    pub event_count: usize,
985    /// Timestamp of the last event in this stream
986    pub last_event_at: Option<chrono::DateTime<chrono::Utc>>,
987}
988
989/// Information about an event type
990#[derive(Debug, Clone, serde::Serialize)]
991pub struct EventTypeInfo {
992    /// The event type name
993    pub event_type: String,
994    /// Total number of events of this type
995    pub event_count: usize,
996    /// Timestamp of the last event of this type
997    pub last_event_at: Option<chrono::DateTime<chrono::Utc>>,
998}
999
1000impl Default for EventStore {
1001    fn default() -> Self {
1002        Self::new()
1003    }
1004}
1005
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::entities::Event;
    use tempfile::TempDir;

    /// Build an event for the `default` tenant carrying the given payload.
    fn create_event_with_payload(
        entity_id: &str,
        event_type: &str,
        payload: serde_json::Value,
    ) -> Event {
        Event::from_strings(
            event_type.to_string(),
            entity_id.to_string(),
            "default".to_string(),
            payload,
            None,
        )
        .unwrap()
    }

    /// Build an event for the `default` tenant with a fixed sample payload.
    fn create_test_event(entity_id: &str, event_type: &str) -> Event {
        create_event_with_payload(
            entity_id,
            event_type,
            serde_json::json!({"name": "Test", "value": 42}),
        )
    }

    /// Create an in-memory store and ingest one test event per
    /// `(entity_id, event_type)` pair, in the given order.
    fn store_with_events(events: &[(&str, &str)]) -> EventStore {
        let store = EventStore::new();
        for &(entity_id, event_type) in events {
            store
                .ingest(create_test_event(entity_id, event_type))
                .unwrap();
        }
        store
    }

    /// Build a query that filters on the given entity and/or event type and
    /// leaves every other field unset.
    fn query_for(entity_id: Option<&str>, event_type: Option<&str>) -> QueryEventsRequest {
        QueryEventsRequest {
            entity_id: entity_id.map(String::from),
            event_type: event_type.map(String::from),
            tenant_id: None,
            as_of: None,
            since: None,
            until: None,
            limit: None,
        }
    }

    #[test]
    fn test_event_store_new() {
        let store = EventStore::new();
        assert_eq!(store.stats().total_events, 0);
        assert_eq!(store.stats().total_entities, 0);
    }

    #[test]
    fn test_event_store_default() {
        let store = EventStore::default();
        assert_eq!(store.stats().total_events, 0);
    }

    #[test]
    fn test_ingest_single_event() {
        let store = store_with_events(&[("entity-1", "user.created")]);

        assert_eq!(store.stats().total_events, 1);
        assert_eq!(store.stats().total_ingested, 1);
    }

    #[test]
    fn test_ingest_multiple_events() {
        let store = EventStore::new();

        for i in 0..10 {
            let event = create_test_event(&format!("entity-{}", i), "user.created");
            store.ingest(event).unwrap();
        }

        assert_eq!(store.stats().total_events, 10);
        assert_eq!(store.stats().total_ingested, 10);
    }

    #[test]
    fn test_query_by_entity_id() {
        let store = store_with_events(&[
            ("entity-1", "user.created"),
            ("entity-2", "user.created"),
            ("entity-1", "user.updated"),
        ]);

        let results = store.query(query_for(Some("entity-1"), None)).unwrap();

        assert_eq!(results.len(), 2);
    }

    #[test]
    fn test_query_by_event_type() {
        let store = store_with_events(&[
            ("entity-1", "user.created"),
            ("entity-2", "user.updated"),
            ("entity-3", "user.created"),
        ]);

        let results = store
            .query(query_for(None, Some("user.created")))
            .unwrap();

        assert_eq!(results.len(), 2);
    }

    #[test]
    fn test_query_with_limit() {
        let store = EventStore::new();

        for i in 0..10 {
            let event = create_test_event(&format!("entity-{}", i), "user.created");
            store.ingest(event).unwrap();
        }

        let results = store
            .query(QueryEventsRequest {
                limit: Some(5),
                ..query_for(None, None)
            })
            .unwrap();

        assert_eq!(results.len(), 5);
    }

    #[test]
    fn test_query_empty_store() {
        let store = EventStore::new();

        let results = store
            .query(query_for(Some("non-existent"), None))
            .unwrap();

        assert!(results.is_empty());
    }

    #[test]
    fn test_query_with_entity_and_type() {
        let store = store_with_events(&[
            ("entity-1", "user.created"),
            ("entity-1", "user.updated"),
            ("entity-2", "user.created"),
        ]);

        let results = store
            .query(query_for(Some("entity-1"), Some("user.created")))
            .unwrap();

        assert_eq!(results.len(), 1);
        assert_eq!(results[0].event_type_str(), "user.created");
    }

    #[test]
    fn test_reconstruct_state() {
        let store = store_with_events(&[("entity-1", "user.created")]);

        let state = store.reconstruct_state("entity-1", None).unwrap();
        // The state is wrapped with metadata
        assert_eq!(state["current_state"]["name"], "Test");
        assert_eq!(state["current_state"]["value"], 42);
    }

    #[test]
    fn test_reconstruct_state_not_found() {
        let store = EventStore::new();

        let result = store.reconstruct_state("non-existent", None);
        assert!(result.is_err());
    }

    #[test]
    fn test_state_evolution() {
        let store = EventStore::new();

        // Initial state
        store
            .ingest(create_event_with_payload(
                "user-1",
                "user.created",
                serde_json::json!({"name": "Alice", "age": 25}),
            ))
            .unwrap();

        // Update state
        store
            .ingest(create_event_with_payload(
                "user-1",
                "user.updated",
                serde_json::json!({"age": 26}),
            ))
            .unwrap();

        let state = store.reconstruct_state("user-1", None).unwrap();
        // The state is wrapped with metadata
        assert_eq!(state["current_state"]["name"], "Alice");
        assert_eq!(state["current_state"]["age"], 26);
    }

    #[test]
    fn test_get_snapshot_empty() {
        let store = EventStore::new();

        let result = store.get_snapshot("non-existent");
        // Entity not found error is expected
        assert!(result.is_err());
    }

    #[test]
    fn test_create_snapshot() {
        let store = store_with_events(&[("entity-1", "user.created")]);

        store.create_snapshot("entity-1").unwrap();

        // Verify snapshot was created; `assert_ne!` prints the value on failure.
        let snapshot = store.get_snapshot("entity-1").unwrap();
        assert_ne!(snapshot, serde_json::Value::Null);
    }

    #[test]
    fn test_create_snapshot_entity_not_found() {
        let store = EventStore::new();

        let result = store.create_snapshot("non-existent");
        assert!(result.is_err());
    }

    #[test]
    fn test_websocket_manager() {
        let store = EventStore::new();
        let manager = store.websocket_manager();
        // Manager should be accessible
        assert!(Arc::strong_count(&manager) >= 1);
    }

    #[test]
    fn test_snapshot_manager() {
        let store = EventStore::new();
        let manager = store.snapshot_manager();
        assert!(Arc::strong_count(&manager) >= 1);
    }

    #[test]
    fn test_compaction_manager_none() {
        let store = EventStore::new();
        // Without storage_dir, compaction manager should be None
        assert!(store.compaction_manager().is_none());
    }

    #[test]
    fn test_schema_registry() {
        let store = EventStore::new();
        let registry = store.schema_registry();
        assert!(Arc::strong_count(&registry) >= 1);
    }

    #[test]
    fn test_replay_manager() {
        let store = EventStore::new();
        let manager = store.replay_manager();
        assert!(Arc::strong_count(&manager) >= 1);
    }

    #[test]
    fn test_pipeline_manager() {
        let store = EventStore::new();
        let manager = store.pipeline_manager();
        assert!(Arc::strong_count(&manager) >= 1);
    }

    #[test]
    fn test_projection_manager() {
        let store = EventStore::new();
        let manager = store.projection_manager();
        // Built-in projections should be registered
        let projections = manager.list_projections();
        assert!(projections.len() >= 2); // entity_snapshots and event_counters
    }

    #[test]
    fn test_projection_state_cache() {
        let store = EventStore::new();
        let cache = store.projection_state_cache();

        cache.insert("test:key".to_string(), serde_json::json!({"value": 123}));
        assert_eq!(cache.len(), 1);

        let value = cache.get("test:key").unwrap();
        assert_eq!(value["value"], 123);
    }

    #[test]
    fn test_metrics() {
        let store = EventStore::new();
        let metrics = store.metrics();
        assert!(Arc::strong_count(&metrics) >= 1);
    }

    #[test]
    fn test_store_stats() {
        let store = store_with_events(&[
            ("entity-1", "user.created"),
            ("entity-2", "order.placed"),
        ]);

        let stats = store.stats();
        assert_eq!(stats.total_events, 2);
        assert_eq!(stats.total_entities, 2);
        assert_eq!(stats.total_event_types, 2);
        assert_eq!(stats.total_ingested, 2);
    }

    #[test]
    fn test_store_stats_serde() {
        let stats = StoreStats {
            total_events: 100,
            total_entities: 50,
            total_event_types: 10,
            total_ingested: 100,
        };

        let json = serde_json::to_string(&stats).unwrap();
        assert!(json.contains("\"total_events\":100"));
        assert!(json.contains("\"total_entities\":50"));
    }

    #[test]
    fn test_list_streams_empty_store() {
        let store = EventStore::new();
        assert!(store.list_streams().is_empty());
    }

    #[test]
    fn test_list_streams() {
        let store = store_with_events(&[
            ("entity-1", "user.created"),
            ("entity-2", "user.created"),
            ("entity-1", "user.updated"),
        ]);

        let streams = store.list_streams();

        // Two distinct entities holding three events in total, two of them on one stream.
        assert_eq!(streams.len(), 2);
        assert_eq!(streams.iter().map(|s| s.event_count).sum::<usize>(), 3);
        assert_eq!(streams.iter().map(|s| s.event_count).max(), Some(2));
        assert!(streams.iter().all(|s| s.last_event_at.is_some()));
    }

    #[test]
    fn test_list_event_types_empty_store() {
        let store = EventStore::new();
        assert!(store.list_event_types().is_empty());
    }

    #[test]
    fn test_list_event_types() {
        let store = store_with_events(&[
            ("entity-1", "user.created"),
            ("entity-2", "user.created"),
            ("entity-1", "user.updated"),
        ]);

        let event_types = store.list_event_types();

        // Two distinct types covering three events in total, two of them of one type.
        assert_eq!(event_types.len(), 2);
        assert_eq!(event_types.iter().map(|t| t.event_count).sum::<usize>(), 3);
        assert_eq!(event_types.iter().map(|t| t.event_count).max(), Some(2));
        assert!(event_types.iter().all(|t| t.last_event_at.is_some()));
    }

    #[test]
    fn test_event_store_config_default() {
        let config = EventStoreConfig::default();
        assert!(config.storage_dir.is_none());
        assert!(config.wal_dir.is_none());
    }

    #[test]
    fn test_event_store_config_with_persistence() {
        let temp_dir = TempDir::new().unwrap();
        let config = EventStoreConfig::with_persistence(temp_dir.path());

        assert!(config.storage_dir.is_some());
        assert!(config.wal_dir.is_none());
    }

    #[test]
    fn test_event_store_config_with_wal() {
        let temp_dir = TempDir::new().unwrap();
        let config = EventStoreConfig::with_wal(temp_dir.path(), WALConfig::default());

        assert!(config.storage_dir.is_none());
        assert!(config.wal_dir.is_some());
    }

    #[test]
    fn test_event_store_config_with_all() {
        let temp_dir = TempDir::new().unwrap();
        let config = EventStoreConfig::with_all(temp_dir.path(), SnapshotConfig::default());

        assert!(config.storage_dir.is_some());
    }

    #[test]
    fn test_event_store_config_production() {
        let storage_dir = TempDir::new().unwrap();
        let wal_dir = TempDir::new().unwrap();
        let config = EventStoreConfig::production(
            storage_dir.path(),
            wal_dir.path(),
            SnapshotConfig::default(),
            WALConfig::default(),
            CompactionConfig::default(),
        );

        assert!(config.storage_dir.is_some());
        assert!(config.wal_dir.is_some());
        assert_eq!(
            config.system_data_dir,
            Some(storage_dir.path().join("__system"))
        );
    }

    #[test]
    fn test_effective_system_data_dir_in_memory() {
        let config = EventStoreConfig::default();
        assert!(config.effective_system_data_dir().is_none());
    }

    #[test]
    fn test_effective_system_data_dir_derived_from_storage_dir() {
        let config = EventStoreConfig::with_persistence("/data/allsource");

        assert_eq!(
            config.effective_system_data_dir(),
            Some(PathBuf::from("/data/allsource").join("__system"))
        );
    }

    #[test]
    fn test_effective_system_data_dir_prefers_explicit() {
        let config = EventStoreConfig {
            system_data_dir: Some(PathBuf::from("/data/system")),
            ..EventStoreConfig::with_persistence("/data/allsource")
        };

        assert_eq!(
            config.effective_system_data_dir(),
            Some(PathBuf::from("/data/system"))
        );
    }

    #[test]
    fn test_flush_storage_no_storage() {
        let store = EventStore::new();
        // Without storage, flush should succeed (no-op)
        let result = store.flush_storage();
        assert!(result.is_ok());
    }

    #[test]
    fn test_reject_system_event_types() {
        let store = EventStore::new();

        // System event types should be rejected via user-facing ingestion
        let event = Event::reconstruct_from_strings(
            uuid::Uuid::new_v4(),
            "_system.tenant.created".to_string(),
            "_system:tenant:acme".to_string(),
            "_system".to_string(),
            serde_json::json!({"name": "ACME"}),
            chrono::Utc::now(),
            None,
            1,
        );

        let result = store.ingest(event);
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("reserved for internal use"),
            "Expected system namespace rejection, got: {}",
            err
        );
    }
}