use crate::{
application::{
dto::QueryEventsRequest,
services::{
exactly_once::{ExactlyOnceConfig, ExactlyOnceRegistry},
pipeline::PipelineManager,
projection::{EntitySnapshotProjection, EventCounterProjection, ProjectionManager},
replay::ReplayManager,
schema::{SchemaRegistry, SchemaRegistryConfig},
schema_evolution::SchemaEvolutionManager,
webhook::WebhookRegistry,
},
},
domain::entities::Event,
error::{AllSourceError, Result},
infrastructure::{
observability::metrics::MetricsRegistry,
persistence::{
compaction::{CompactionConfig, CompactionManager},
index::{EventIndex, IndexEntry},
snapshot::{SnapshotConfig, SnapshotManager, SnapshotType},
storage::ParquetStorage,
wal::{WALConfig, WriteAheadLog},
},
query::geospatial::GeoIndex,
web::websocket::WebSocketManager,
},
};
use chrono::{DateTime, Utc};
use dashmap::DashMap;
use parking_lot::RwLock;
use std::{path::PathBuf, sync::Arc};
use tokio::sync::mpsc;
/// In-memory event store with optional durability layers (WAL, Parquet) and
/// the derived subsystems that are fed from ingestion (index, projections,
/// pipelines, WebSocket broadcast, webhooks, geo index, schema evolution,
/// snapshots, metrics).
pub struct EventStore {
/// Append-only in-memory event log; index entries store offsets into this Vec.
events: Arc<RwLock<Vec<Event>>>,
/// Lookup index mapping event ids / entity ids / event types to log offsets.
index: Arc<EventIndex>,
/// Registered projections; `entity_snapshots` and `event_counters` are registered at construction.
pub(crate) projections: Arc<RwLock<ProjectionManager>>,
/// Parquet persistence; `None` when no storage dir is configured or initialization failed.
storage: Option<Arc<RwLock<ParquetStorage>>>,
/// Broadcasts ingested events to WebSocket subscribers.
websocket_manager: Arc<WebSocketManager>,
/// Stores per-entity state snapshots (manual and automatic).
snapshot_manager: Arc<SnapshotManager>,
/// Write-ahead log; `None` when no WAL dir is configured or initialization failed.
wal: Option<Arc<WriteAheadLog>>,
/// Created only when a storage dir is configured.
compaction_manager: Option<Arc<CompactionManager>>,
schema_registry: Arc<SchemaRegistry>,
replay_manager: Arc<ReplayManager>,
/// Pipelines run against every ingested event.
pipeline_manager: Arc<PipelineManager>,
/// Prometheus metrics updated by ingestion and queries.
metrics: Arc<MetricsRegistry>,
/// Events restored at startup plus events ingested since.
total_ingested: Arc<RwLock<u64>>,
/// Cached projection state; `reset_projection` evicts entries whose key starts with `<projection name>:`.
projection_state_cache: Arc<DashMap<String, serde_json::Value>>,
webhook_registry: Arc<WebhookRegistry>,
/// Delivery channel for webhook tasks; while `None`, matching webhooks are not queued.
webhook_tx: Arc<RwLock<Option<mpsc::UnboundedSender<WebhookDeliveryTask>>>>,
geo_index: Arc<GeoIndex>,
/// Exactly-once registry, built with `ExactlyOnceConfig::default()`.
exactly_once: Arc<ExactlyOnceRegistry>,
/// Receives `(event_type, payload)` of every client-ingested event.
schema_evolution: Arc<SchemaEvolutionManager>,
}
/// One queued webhook delivery: a matching subscription paired with the
/// event that triggered it. Sent over the channel installed via
/// `EventStore::set_webhook_tx`.
#[derive(Debug, Clone)]
pub struct WebhookDeliveryTask {
/// Subscription whose filter matched `event`.
pub webhook: crate::application::services::webhook::WebhookSubscription,
/// Copy of the ingested event to deliver.
pub event: Event,
}
impl EventStore {
pub fn new() -> Self {
Self::with_config(EventStoreConfig::default())
}
pub fn with_config(config: EventStoreConfig) -> Self {
let mut projections = ProjectionManager::new();
projections.register(Arc::new(EntitySnapshotProjection::new("entity_snapshots")));
projections.register(Arc::new(EventCounterProjection::new("event_counters")));
let storage = config
.storage_dir
.as_ref()
.and_then(|dir| match ParquetStorage::new(dir) {
Ok(storage) => {
tracing::info!("✅ Parquet persistence enabled at: {}", dir.display());
Some(Arc::new(RwLock::new(storage)))
}
Err(e) => {
tracing::error!("❌ Failed to initialize Parquet storage: {}", e);
None
}
});
let wal = config.wal_dir.as_ref().and_then(|dir| {
match WriteAheadLog::new(dir, config.wal_config.clone()) {
Ok(wal) => {
tracing::info!("✅ WAL enabled at: {}", dir.display());
Some(Arc::new(wal))
}
Err(e) => {
tracing::error!("❌ Failed to initialize WAL: {}", e);
None
}
}
});
let compaction_manager = config.storage_dir.as_ref().map(|dir| {
let manager = CompactionManager::new(dir, config.compaction_config.clone());
Arc::new(manager)
});
let schema_registry = Arc::new(SchemaRegistry::new(config.schema_registry_config.clone()));
tracing::info!("✅ Schema registry enabled");
let replay_manager = Arc::new(ReplayManager::new());
tracing::info!("✅ Replay manager enabled");
let pipeline_manager = Arc::new(PipelineManager::new());
tracing::info!("✅ Pipeline manager enabled");
let metrics = MetricsRegistry::new();
tracing::info!("✅ Prometheus metrics registry initialized");
let projection_state_cache = Arc::new(DashMap::new());
tracing::info!("✅ Projection state cache initialized");
let webhook_registry = Arc::new(WebhookRegistry::new());
tracing::info!("✅ Webhook registry initialized");
let store = Self {
events: Arc::new(RwLock::new(Vec::new())),
index: Arc::new(EventIndex::new()),
projections: Arc::new(RwLock::new(projections)),
storage,
websocket_manager: Arc::new(WebSocketManager::new()),
snapshot_manager: Arc::new(SnapshotManager::new(config.snapshot_config)),
wal,
compaction_manager,
schema_registry,
replay_manager,
pipeline_manager,
metrics,
total_ingested: Arc::new(RwLock::new(0)),
projection_state_cache,
webhook_registry,
webhook_tx: Arc::new(RwLock::new(None)),
geo_index: Arc::new(GeoIndex::new()),
exactly_once: Arc::new(ExactlyOnceRegistry::new(ExactlyOnceConfig::default())),
schema_evolution: Arc::new(SchemaEvolutionManager::new()),
};
let mut wal_recovered = false;
if let Some(ref wal) = store.wal {
match wal.recover() {
Ok(recovered_events) if !recovered_events.is_empty() => {
tracing::info!(
"🔄 Recovering {} events from WAL...",
recovered_events.len()
);
for event in recovered_events {
let offset = store.events.read().len();
if let Err(e) = store.index.index_event(
event.id,
event.entity_id_str(),
event.event_type_str(),
event.timestamp,
offset,
) {
tracing::error!("Failed to re-index WAL event {}: {}", event.id, e);
}
if let Err(e) = store.projections.read().process_event(&event) {
tracing::error!("Failed to re-process WAL event {}: {}", event.id, e);
}
store.events.write().push(event);
}
let total = store.events.read().len();
*store.total_ingested.write() = total as u64;
tracing::info!("✅ Successfully recovered {} events from WAL", total);
if store.storage.is_some() {
tracing::info!("📸 Checkpointing WAL to Parquet storage...");
if let Err(e) = store.flush_storage() {
tracing::error!("Failed to checkpoint to Parquet: {}", e);
} else if let Err(e) = wal.truncate() {
tracing::error!("Failed to truncate WAL after checkpoint: {}", e);
} else {
tracing::info!("✅ WAL checkpointed and truncated");
}
}
wal_recovered = true;
}
Ok(_) => {
tracing::debug!("No events to recover from WAL");
}
Err(e) => {
tracing::error!("❌ WAL recovery failed: {}", e);
}
}
}
if !wal_recovered
&& let Some(ref storage) = store.storage
&& let Ok(persisted_events) = storage.read().load_all_events()
{
tracing::info!("📂 Loading {} persisted events...", persisted_events.len());
for event in persisted_events {
let offset = store.events.read().len();
if let Err(e) = store.index.index_event(
event.id,
event.entity_id_str(),
event.event_type_str(),
event.timestamp,
offset,
) {
tracing::error!("Failed to re-index event {}: {}", event.id, e);
}
if let Err(e) = store.projections.read().process_event(&event) {
tracing::error!("Failed to re-process event {}: {}", event.id, e);
}
store.events.write().push(event);
}
let total = store.events.read().len();
*store.total_ingested.write() = total as u64;
tracing::info!("✅ Successfully loaded {} events from storage", total);
}
store
}
/// Validates, durably logs and applies a client-submitted event.
///
/// Order of work:
/// 1. validation;
/// 2. WAL append;
/// 3. in-memory commit under the event-log write lock (projections,
///    pipelines, Parquet buffer, index, log);
/// 4. best-effort fan-out (WebSocket, webhooks, geo index, schema evolution,
///    auto-snapshot);
/// 5. metrics.
///
/// NOTE: the WAL append comes before the commit. If a later step fails, the
/// event stays in the WAL and is replayed at the next startup.
///
/// # Errors
/// Returns the first error from validation, the WAL, projections, Parquet
/// storage or the index. Each of these failures increments
/// `ingestion_errors_total`.
pub fn ingest(&self, event: Event) -> Result<()> {
    let timer = self.metrics.ingestion_duration_seconds.start_timer();
    let committed = self.validate_event(&event).and_then(|()| {
        if let Some(ref wal) = self.wal {
            wal.append(event.clone())?;
        }
        self.commit_ingested_event(&event)
    });
    let (offset, total_events) = match committed {
        Ok(position) => position,
        Err(e) => {
            self.metrics.ingestion_errors_total.inc();
            timer.observe_duration();
            return Err(e);
        }
    };
    self.websocket_manager
        .broadcast_event(Arc::new(event.clone()));
    self.dispatch_webhooks(&event);
    self.geo_index.index_event(&event);
    self.schema_evolution
        .analyze_event(event.event_type_str(), &event.payload);
    self.check_auto_snapshot(event.entity_id_str(), &event);
    self.metrics.events_ingested_total.inc();
    self.metrics
        .events_ingested_by_type
        .with_label_values(&[event.event_type_str()])
        .inc();
    self.metrics.storage_events_total.set(total_events as i64);
    *self.total_ingested.write() += 1;
    timer.observe_duration();
    tracing::debug!("Event ingested: {} (offset: {})", event.id, offset);
    Ok(())
}

/// Applies an accepted event to projections, pipelines, Parquet, the index
/// and the in-memory log while holding the log's write lock.
///
/// Returns `(offset, total_events)`. The index is updated last, just before
/// the push. If an earlier step fails, no index entry is left pointing at
/// `offset`, which the next ingested event would otherwise occupy.
fn commit_ingested_event(&self, event: &Event) -> Result<(usize, usize)> {
    let mut events = self.events.write();
    let offset = events.len();
    self.projections.read().process_event(event)?;
    let pipeline_results = self.pipeline_manager.process_event(event);
    if !pipeline_results.is_empty() {
        tracing::debug!(
            "Event {} processed by {} pipeline(s)",
            event.id,
            pipeline_results.len()
        );
        for (pipeline_id, result) in pipeline_results {
            tracing::trace!("Pipeline {} result: {:?}", pipeline_id, result);
        }
    }
    if let Some(ref storage) = self.storage {
        storage.read().append_event(event.clone())?;
    }
    self.index.index_event(
        event.id,
        event.entity_id_str(),
        event.event_type_str(),
        event.timestamp,
        offset,
    )?;
    events.push(event.clone());
    Ok((offset, events.len()))
}
/// Applies an event received through replication to in-memory state.
///
/// Runs projections and pipelines, indexes the event, appends it to the log,
/// broadcasts it over WebSocket and updates the ingestion metrics.
///
/// NOTE(review): unlike `ingest`, this path skips `validate_event`, the local
/// WAL, Parquet storage, webhooks, the geo index, schema evolution and
/// auto-snapshots. Confirm these omissions are intended for replicas.
///
/// # Errors
/// Returns projection or index errors. The index is updated only after
/// projections succeed, so a failure cannot leave a dangling index entry for
/// an offset that a later event would take.
pub fn ingest_replicated(&self, event: Event) -> Result<()> {
    let timer = self.metrics.ingestion_duration_seconds.start_timer();
    let mut events = self.events.write();
    let offset = events.len();
    self.projections.read().process_event(&event)?;
    let pipeline_results = self.pipeline_manager.process_event(&event);
    if !pipeline_results.is_empty() {
        tracing::debug!(
            "Replicated event {} processed by {} pipeline(s)",
            event.id,
            pipeline_results.len()
        );
    }
    self.index.index_event(
        event.id,
        event.entity_id_str(),
        event.event_type_str(),
        event.timestamp,
        offset,
    )?;
    events.push(event.clone());
    let total_events = events.len();
    drop(events);
    self.websocket_manager
        .broadcast_event(Arc::new(event.clone()));
    self.metrics.events_ingested_total.inc();
    self.metrics
        .events_ingested_by_type
        .with_label_values(&[event.event_type_str()])
        .inc();
    self.metrics.storage_events_total.set(total_events as i64);
    *self.total_ingested.write() += 1;
    timer.observe_duration();
    tracing::debug!(
        "Replicated event ingested: {} (offset: {})",
        event.id,
        offset
    );
    Ok(())
}
/// Returns a shared handle to the WebSocket broadcaster.
pub fn websocket_manager(&self) -> Arc<WebSocketManager> {
    self.websocket_manager.clone()
}
/// Returns a shared handle to the snapshot manager.
pub fn snapshot_manager(&self) -> Arc<SnapshotManager> {
    self.snapshot_manager.clone()
}
/// Returns the compaction manager, or `None` when no storage dir was configured.
pub fn compaction_manager(&self) -> Option<Arc<CompactionManager>> {
    self.compaction_manager.clone()
}
/// Returns a shared handle to the schema registry.
pub fn schema_registry(&self) -> Arc<SchemaRegistry> {
    self.schema_registry.clone()
}
/// Returns a shared handle to the replay manager.
pub fn replay_manager(&self) -> Arc<ReplayManager> {
    self.replay_manager.clone()
}
/// Returns a shared handle to the pipeline manager.
pub fn pipeline_manager(&self) -> Arc<PipelineManager> {
    self.pipeline_manager.clone()
}
/// Returns a shared handle to the Prometheus metrics registry.
pub fn metrics(&self) -> Arc<MetricsRegistry> {
    self.metrics.clone()
}
pub fn projection_manager(&self) -> parking_lot::RwLockReadGuard<'_, ProjectionManager> {
self.projections.read()
}
/// Returns a shared handle to the projection state cache.
pub fn projection_state_cache(&self) -> Arc<DashMap<String, serde_json::Value>> {
    self.projection_state_cache.clone()
}
/// Returns a shared handle to the geospatial index.
pub fn geo_index(&self) -> Arc<GeoIndex> {
    Arc::clone(&self.geo_index)
}
/// Returns a shared handle to the exactly-once registry.
pub fn exactly_once(&self) -> Arc<ExactlyOnceRegistry> {
    Arc::clone(&self.exactly_once)
}
/// Returns a shared handle to the schema-evolution manager.
pub fn schema_evolution(&self) -> Arc<SchemaEvolutionManager> {
    Arc::clone(&self.schema_evolution)
}
/// Returns a copy of the whole in-memory event log, in log order.
pub fn snapshot_events(&self) -> Vec<Event> {
    let log = self.events.read();
    log.to_vec()
}
/// Returns a shared handle to the webhook registry.
pub fn webhook_registry(&self) -> Arc<WebhookRegistry> {
    self.webhook_registry.clone()
}
/// Installs the channel that webhook deliveries are queued on, replacing any
/// previous sender. Until this is called, matching webhooks are not queued.
pub fn set_webhook_tx(&self, tx: mpsc::UnboundedSender<WebhookDeliveryTask>) {
    {
        let mut slot = self.webhook_tx.write();
        *slot = Some(tx);
    }
    tracing::info!("Webhook delivery channel connected");
}
fn dispatch_webhooks(&self, event: &Event) {
let matching = self.webhook_registry.find_matching(event);
if matching.is_empty() {
return;
}
let tx_guard = self.webhook_tx.read();
if let Some(ref tx) = *tx_guard {
for webhook in matching {
let task = WebhookDeliveryTask {
webhook,
event: event.clone(),
};
if let Err(e) = tx.send(task) {
tracing::warn!("Failed to queue webhook delivery: {}", e);
}
}
}
}
/// Flushes buffered events to Parquet storage. Does nothing when storage is
/// not configured.
///
/// # Errors
/// Propagates the storage flush error.
pub fn flush_storage(&self) -> Result<()> {
    let Some(parquet) = &self.storage else {
        return Ok(());
    };
    let guard = parquet.read();
    guard.flush()?;
    tracing::info!("✅ Flushed events to persistent storage");
    Ok(())
}
/// Creates a manual snapshot of `entity_id`.
///
/// The snapshot state merges the top-level fields of each event payload, in
/// timestamp order; later values overwrite earlier ones. It is recorded at the
/// last event's timestamp together with the number of events merged.
///
/// # Errors
/// `EntityNotFound` when the entity has no events. Query and snapshot-manager
/// errors are propagated.
pub fn create_snapshot(&self, entity_id: &str) -> Result<()> {
    let history = self.query(QueryEventsRequest {
        entity_id: Some(entity_id.to_string()),
        event_type: None,
        tenant_id: None,
        as_of: None,
        since: None,
        until: None,
        limit: None,
    })?;
    let Some(last_event) = history.last() else {
        return Err(AllSourceError::EntityNotFound(entity_id.to_string()));
    };
    let mut merged = serde_json::Map::new();
    for fields in history.iter().filter_map(|e| e.payload.as_object()) {
        for (key, value) in fields {
            merged.insert(key.clone(), value.clone());
        }
    }
    self.snapshot_manager.create_snapshot(
        entity_id.to_string(),
        serde_json::Value::Object(merged),
        last_event.timestamp,
        history.len(),
        SnapshotType::Manual,
    )?;
    Ok(())
}
/// Creates a snapshot of `entity_id` when the snapshot manager's policy says
/// one is due. The policy is given the entity's indexed event count and the
/// triggering event's timestamp. Failures are logged, not propagated.
fn check_auto_snapshot(&self, entity_id: &str, event: &Event) {
    let indexed_count = match self.index.get_by_entity(entity_id) {
        Some(entries) => entries.len(),
        None => 0,
    };
    let due = self
        .snapshot_manager
        .should_create_snapshot(entity_id, indexed_count, event.timestamp);
    if !due {
        return;
    }
    if let Err(e) = self.create_snapshot(entity_id) {
        tracing::warn!(
            "Failed to create automatic snapshot for {}: {}",
            entity_id,
            e
        );
    }
}
/// Rejects events with an empty entity id, an empty event type, or a
/// reserved `_system.` event type.
///
/// # Errors
/// `ValidationError` describing the first rule violated, in that order.
fn validate_event(&self, event: &Event) -> Result<()> {
    let violation = if event.entity_id_str().is_empty() {
        Some("entity_id cannot be empty")
    } else if event.event_type_str().is_empty() {
        Some("event_type cannot be empty")
    } else if event.event_type().is_system() {
        Some("Event types starting with '_system.' are reserved for internal use")
    } else {
        None
    };
    match violation {
        Some(message) => Err(AllSourceError::ValidationError(message.to_string())),
        None => Ok(()),
    }
}
/// Clears projection `name`, evicts its cached state (keys prefixed
/// `"<name>:"`) and replays every stored event through it.
///
/// Returns the number of events the projection processed successfully.
///
/// The event-log read lock is taken before the projection is cleared and is
/// held through the replay. Ingestion feeds projections while holding the
/// log's write lock, so a concurrent ingest can no longer land between the
/// clear and the replay and be applied twice. Taking the log lock before the
/// projections lock also matches the order used by ingestion.
///
/// # Errors
/// `EntityNotFound` when no projection named `name` is registered.
pub fn reset_projection(&self, name: &str) -> Result<usize> {
    let events = self.events.read();
    let projection_manager = self.projections.read();
    let projection = projection_manager.get_projection(name).ok_or_else(|| {
        AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
    })?;
    projection.clear();
    let prefix = format!("{name}:");
    self.projection_state_cache
        .retain(|key, _| !key.starts_with(&prefix));
    let reprocessed = events
        .iter()
        .filter(|event| projection.process(event).is_ok())
        .count();
    Ok(reprocessed)
}
/// Looks up an event by id through the index. Returns `Ok(None)` when the id
/// is not indexed or its offset is not in the log.
pub fn get_event_by_id(&self, event_id: &uuid::Uuid) -> Result<Option<Event>> {
    let found = self
        .index
        .get_by_id(event_id)
        .and_then(|offset| self.events.read().get(offset).cloned());
    Ok(found)
}
/// Returns events matching `request`, sorted by timestamp and truncated to
/// `limit`. The sort is stable, so equal timestamps keep log order.
///
/// Candidates come from:
/// - the entity index when `entity_id` is set;
/// - otherwise the type index when `event_type` is set;
/// - otherwise a full scan of the log.
///
/// The time bounds apply on every path, including the full scan:
/// - `as_of`: timestamp <= as_of;
/// - `since`: timestamp >= since;
/// - `until`: timestamp <= until.
///
/// When both `entity_id` and `event_type` are set, the type is checked per
/// event.
///
/// NOTE(review): `request.tenant_id` is not consulted anywhere in this method.
///
/// Records `queries_total`, `query_duration_seconds` and
/// `query_results_total`, labelled `entity` / `type` / `full_scan`.
pub fn query(&self, request: QueryEventsRequest) -> Result<Vec<Event>> {
    let query_type = if request.entity_id.is_some() {
        "entity"
    } else if request.event_type.is_some() {
        "type"
    } else {
        "full_scan"
    };
    let timer = self
        .metrics
        .query_duration_seconds
        .with_label_values(&[query_type])
        .start_timer();
    self.metrics
        .queries_total
        .with_label_values(&[query_type])
        .inc();
    let events = self.events.read();
    let offsets: Vec<usize> = if let Some(entity_id) = &request.entity_id {
        self.index
            .get_by_entity(entity_id)
            .map(|entries| self.filter_entries(entries, &request))
            .unwrap_or_default()
    } else if let Some(event_type) = &request.event_type {
        self.index
            .get_by_type(event_type)
            .map(|entries| self.filter_entries(entries, &request))
            .unwrap_or_default()
    } else {
        // No index entries to filter here, so check the time bounds on each event.
        events
            .iter()
            .enumerate()
            .filter(|(_, event)| {
                request.as_of.is_none_or(|as_of| event.timestamp <= as_of)
                    && request.since.is_none_or(|since| event.timestamp >= since)
                    && request.until.is_none_or(|until| event.timestamp <= until)
            })
            .map(|(offset, _)| offset)
            .collect()
    };
    let mut results: Vec<Event> = offsets
        .iter()
        .filter_map(|&offset| events.get(offset).cloned())
        .filter(|event| self.apply_filters(event, &request))
        .collect();
    results.sort_by_key(|x| x.timestamp);
    if let Some(limit) = request.limit {
        results.truncate(limit);
    }
    self.metrics
        .query_results_total
        .with_label_values(&[query_type])
        .inc_by(results.len() as u64);
    timer.observe_duration();
    Ok(results)
}
fn filter_entries(&self, entries: Vec<IndexEntry>, request: &QueryEventsRequest) -> Vec<usize> {
entries
.into_iter()
.filter(|entry| {
if let Some(as_of) = request.as_of
&& entry.timestamp > as_of
{
return false;
}
if let Some(since) = request.since
&& entry.timestamp < since
{
return false;
}
if let Some(until) = request.until
&& entry.timestamp > until
{
return false;
}
true
})
.map(|entry| entry.offset)
.collect()
}
fn apply_filters(&self, event: &Event, request: &QueryEventsRequest) -> bool {
if request.entity_id.is_some()
&& let Some(ref event_type) = request.event_type
&& event.event_type_str() != event_type
{
return false;
}
true
}
/// Rebuilds the state of `entity_id` by merging event payloads on top of a
/// snapshot.
///
/// The snapshot is picked as follows:
/// - with `as_of`, the one returned by `get_snapshot_as_of(entity_id, as_of)`,
///   and only events up to `as_of` are considered;
/// - otherwise, the latest snapshot.
///
/// Events from the snapshot's timestamp onwards (inclusive) are re-queried.
/// Their top-level payload fields overwrite the seed state in timestamp order.
///
/// The returned JSON has these keys:
/// - `entity_id`;
/// - `last_updated`: timestamp of the last applied event;
/// - `event_count`: events applied on top of the snapshot, not the entity total;
/// - `as_of`;
/// - `current_state`;
/// - `history`: the applied events.
///
/// # Errors
/// `EntityNotFound` when there is no snapshot and no matching event.
pub fn reconstruct_state(
    &self,
    entity_id: &str,
    as_of: Option<DateTime<Utc>>,
) -> Result<serde_json::Value> {
    let seed = match as_of {
        Some(as_of_time) => self
            .snapshot_manager
            .get_snapshot_as_of(entity_id, as_of_time)
            .map(|snapshot| {
                tracing::debug!(
                    "Using snapshot from {} for entity {} (saved {} events)",
                    snapshot.as_of,
                    entity_id,
                    snapshot.event_count
                );
                (snapshot.state.clone(), snapshot.as_of)
            }),
        None => self
            .snapshot_manager
            .get_latest_snapshot(entity_id)
            .map(|snapshot| {
                tracing::debug!(
                    "Using latest snapshot from {} for entity {}",
                    snapshot.as_of,
                    entity_id
                );
                (snapshot.state.clone(), snapshot.as_of)
            }),
    };
    let (mut merged_state, since_timestamp) = match seed {
        Some((state, snapshot_at)) => (state, Some(snapshot_at)),
        None => (serde_json::json!({}), None),
    };
    let events = self.query(QueryEventsRequest {
        entity_id: Some(entity_id.to_string()),
        event_type: None,
        tenant_id: None,
        as_of,
        since: since_timestamp,
        until: None,
        limit: None,
    })?;
    if since_timestamp.is_none() && events.is_empty() {
        return Err(AllSourceError::EntityNotFound(entity_id.to_string()));
    }
    if let serde_json::Value::Object(state_map) = &mut merged_state {
        for fields in events.iter().filter_map(|e| e.payload.as_object()) {
            for (key, value) in fields {
                state_map.insert(key.clone(), value.clone());
            }
        }
    }
    let history: Vec<serde_json::Value> = events
        .iter()
        .map(|e| {
            serde_json::json!({
                "event_id": e.id,
                "type": e.event_type,
                "timestamp": e.timestamp,
                "payload": e.payload
            })
        })
        .collect();
    Ok(serde_json::json!({
        "entity_id": entity_id,
        "last_updated": events.last().map(|e| e.timestamp),
        "event_count": events.len(),
        "as_of": as_of,
        "current_state": merged_state,
        "history": history
    }))
}
/// Returns the `entity_snapshots` projection's current state for `entity_id`.
///
/// This reads the projection, not the snapshot manager.
///
/// # Errors
/// `EntityNotFound` when the projection is missing or holds no state for the
/// entity.
pub fn get_snapshot(&self, entity_id: &str) -> Result<serde_json::Value> {
    let not_found = || AllSourceError::EntityNotFound(entity_id.to_string());
    let projections = self.projections.read();
    let Some(snapshot_projection) = projections.get_projection("entity_snapshots") else {
        return Err(not_found());
    };
    let Some(state) = snapshot_projection.get_state(entity_id) else {
        return Err(not_found());
    };
    Ok(serde_json::json!({
        "entity_id": entity_id,
        "snapshot": state,
        "from_projection": "entity_snapshots"
    }))
}
/// Summarizes store size: log length, distinct entities and event types from
/// the index, and the cumulative ingested counter.
pub fn stats(&self) -> StoreStats {
    // The log read lock is held while the index is sampled. Ingestion updates
    // the index under the log's write lock, so the counts agree with each other.
    let log_guard = self.events.read();
    let index_summary = self.index.stats();
    let ingested = *self.total_ingested.read();
    StoreStats {
        total_events: log_guard.len(),
        total_entities: index_summary.total_entities,
        total_event_types: index_summary.total_event_types,
        total_ingested: ingested,
    }
}
/// Lists every entity known to the index as a stream.
///
/// Each entry carries the entity's indexed event count and the timestamp of
/// its last index entry.
pub fn list_streams(&self) -> Vec<StreamInfo> {
    self.index
        .get_all_entities()
        .into_iter()
        .map(|entity_id| {
            // One lookup per entity: the count and last timestamp come from
            // the same set of entries, and the entries are fetched only once.
            let (event_count, last_event_at) = self
                .index
                .get_by_entity(&entity_id)
                .map_or((0, None), |entries| {
                    (entries.len(), entries.last().map(|e| e.timestamp))
                });
            StreamInfo {
                stream_id: entity_id,
                event_count,
                last_event_at,
            }
        })
        .collect()
}
/// Lists every event type known to the index.
///
/// Each entry carries the type's indexed event count and the timestamp of its
/// last index entry.
pub fn list_event_types(&self) -> Vec<EventTypeInfo> {
    self.index
        .get_all_types()
        .into_iter()
        .map(|event_type| {
            // One lookup per type: the count and last timestamp come from the
            // same set of entries, and the entries are fetched only once.
            let (event_count, last_event_at) = self
                .index
                .get_by_type(&event_type)
                .map_or((0, None), |entries| {
                    (entries.len(), entries.last().map(|e| e.timestamp))
                });
            EventTypeInfo {
                event_type,
                event_count,
                last_event_at,
            }
        })
        .collect()
}
pub fn enable_wal_replication(
&self,
tx: tokio::sync::broadcast::Sender<crate::infrastructure::persistence::wal::WALEntry>,
) {
if let Some(ref wal_arc) = self.wal {
wal_arc.set_replication_tx(tx);
tracing::info!("WAL replication broadcast enabled");
} else {
tracing::warn!("Cannot enable WAL replication: WAL is not configured");
}
}
/// Borrows the write-ahead log, if one is configured.
pub fn wal(&self) -> Option<&Arc<WriteAheadLog>> {
    Option::as_ref(&self.wal)
}
/// Borrows the Parquet storage handle, if one is configured.
pub fn parquet_storage(&self) -> Option<&Arc<RwLock<ParquetStorage>>> {
    Option::as_ref(&self.storage)
}
}
/// Construction-time configuration for `EventStore`. `Default` yields a
/// purely in-memory store (no storage dir, no WAL dir).
#[derive(Debug, Clone, Default)]
pub struct EventStoreConfig {
/// Parquet storage directory; when set, a compaction manager is also created over it.
pub storage_dir: Option<PathBuf>,
/// Snapshot manager settings.
pub snapshot_config: SnapshotConfig,
/// Write-ahead-log directory; the WAL is disabled when `None`.
pub wal_dir: Option<PathBuf>,
/// WAL settings; only used when `wal_dir` is set.
pub wal_config: WALConfig,
/// Compaction settings; only used when `storage_dir` is set.
pub compaction_config: CompactionConfig,
/// Schema registry settings.
pub schema_registry_config: SchemaRegistryConfig,
/// System data directory; `effective_system_data_dir` falls back to `<storage_dir>/__system`.
pub system_data_dir: Option<PathBuf>,
/// NOTE(review): not read in this file; presumably consumed by tenant bootstrap code elsewhere.
pub bootstrap_tenant: Option<String>,
}
impl EventStoreConfig {
pub fn with_persistence(storage_dir: impl Into<PathBuf>) -> Self {
Self {
storage_dir: Some(storage_dir.into()),
..Self::default()
}
}
/// In-memory config with custom snapshot settings.
pub fn with_snapshots(snapshot_config: SnapshotConfig) -> Self {
    Self {
        snapshot_config,
        ..Default::default()
    }
}
/// Config with a WAL in `wal_dir` but no Parquet storage.
pub fn with_wal(wal_dir: impl Into<PathBuf>, wal_config: WALConfig) -> Self {
    let wal_dir = Some(wal_dir.into());
    Self {
        wal_dir,
        wal_config,
        ..Default::default()
    }
}
/// Config with Parquet persistence and custom snapshot settings.
///
/// Despite the name, this does not enable the WAL.
pub fn with_all(storage_dir: impl Into<PathBuf>, snapshot_config: SnapshotConfig) -> Self {
    let storage_dir = Some(storage_dir.into());
    Self {
        storage_dir,
        snapshot_config,
        ..Default::default()
    }
}
/// Full-durability config: Parquet storage, WAL, explicit snapshot and
/// compaction settings, and system data under `<storage_dir>/__system`.
pub fn production(
    storage_dir: impl Into<PathBuf>,
    wal_dir: impl Into<PathBuf>,
    snapshot_config: SnapshotConfig,
    wal_config: WALConfig,
    compaction_config: CompactionConfig,
) -> Self {
    let storage_root: PathBuf = storage_dir.into();
    Self {
        system_data_dir: Some(storage_root.join("__system")),
        storage_dir: Some(storage_root),
        wal_dir: Some(wal_dir.into()),
        snapshot_config,
        wal_config,
        compaction_config,
        ..Default::default()
    }
}
/// Resolves the system data directory.
///
/// Uses the explicit `system_data_dir` if set, otherwise
/// `<storage_dir>/__system`, otherwise `None`.
pub fn effective_system_data_dir(&self) -> Option<PathBuf> {
    match &self.system_data_dir {
        Some(explicit) => Some(explicit.clone()),
        None => self
            .storage_dir
            .as_ref()
            .map(|root| root.join("__system")),
    }
}
/// Builds a config from the process environment.
///
/// Reads `ALLSOURCE_DATA_DIR`, `ALLSOURCE_STORAGE_DIR`, `ALLSOURCE_WAL_DIR`
/// and `ALLSOURCE_WAL_ENABLED`. See `from_env_vars` for how they combine.
/// Blank directory variables are treated as unset.
pub fn from_env() -> (Self, &'static str) {
    let non_empty = |key: &str| std::env::var(key).ok().filter(|value| !value.is_empty());
    let data_dir = non_empty("ALLSOURCE_DATA_DIR");
    let storage_dir = non_empty("ALLSOURCE_STORAGE_DIR");
    let wal_dir = non_empty("ALLSOURCE_WAL_DIR");
    let wal_enabled = std::env::var("ALLSOURCE_WAL_ENABLED").ok();
    Self::from_env_vars(data_dir, storage_dir, wal_dir, wal_enabled)
}
/// Derives a config from raw environment values. Kept separate from
/// `from_env` so it can be tested without touching the process environment.
///
/// Arguments:
/// - `data_dir`: base directory. When set, `<data_dir>/storage` and
///   `<data_dir>/wal` are used unless overridden.
/// - `explicit_storage_dir` / `explicit_wal_dir`: take precedence over
///   `data_dir`.
/// - `wal_enabled_var`: unset or blank means enabled. Otherwise `true`, `1`,
///   `yes` and `on` enable the WAL (case-insensitive, surrounding whitespace
///   ignored); any other value (e.g. `false`, `0`, `off`) disables it.
///
/// Empty directory strings are treated as unset. Returns the config and its
/// mode label: `"wal+parquet"`, `"parquet-only"`, `"wal-only"` or
/// `"in-memory"`.
pub fn from_env_vars(
    data_dir: Option<String>,
    explicit_storage_dir: Option<String>,
    explicit_wal_dir: Option<String>,
    wal_enabled_var: Option<String>,
) -> (Self, &'static str) {
    let data_dir = data_dir.filter(|s| !s.is_empty());
    let storage_dir = explicit_storage_dir
        .filter(|s| !s.is_empty())
        .or_else(|| data_dir.as_ref().map(|d| format!("{}/storage", d)));
    let wal_dir = explicit_wal_dir
        .filter(|s| !s.is_empty())
        .or_else(|| data_dir.as_ref().map(|d| format!("{}/wal", d)));
    // Previously only the exact string "true" enabled the WAL, so "TRUE",
    // "1" or a blank value silently turned durability off.
    let wal_enabled = wal_enabled_var
        .map(|v| v.trim().to_ascii_lowercase())
        .filter(|v| !v.is_empty())
        .is_none_or(|v| matches!(v.as_str(), "true" | "1" | "yes" | "on"));
    match (&storage_dir, &wal_dir) {
        (Some(sd), Some(wd)) if wal_enabled => {
            let config = Self::production(
                sd,
                wd,
                SnapshotConfig::default(),
                WALConfig::default(),
                CompactionConfig::default(),
            );
            (config, "wal+parquet")
        }
        (Some(sd), _) => {
            let config = Self::with_persistence(sd);
            (config, "parquet-only")
        }
        (_, Some(wd)) if wal_enabled => {
            let config = Self::with_wal(wd, WALConfig::default());
            (config, "wal-only")
        }
        _ => (Self::default(), "in-memory"),
    }
}
}
/// Store size summary returned by `EventStore::stats`.
#[derive(Debug, serde::Serialize)]
pub struct StoreStats {
/// Number of events currently in the in-memory log.
pub total_events: usize,
/// Distinct entities, as reported by the index.
pub total_entities: usize,
/// Distinct event types, as reported by the index.
pub total_event_types: usize,
/// Events restored at startup plus events ingested since.
pub total_ingested: u64,
}
/// Per-entity stream summary returned by `EventStore::list_streams`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct StreamInfo {
/// The entity id.
pub stream_id: String,
/// Number of index entries for the entity.
pub event_count: usize,
/// Timestamp of the entity's last index entry; `None` if it has none.
pub last_event_at: Option<chrono::DateTime<chrono::Utc>>,
}
/// Per-event-type summary returned by `EventStore::list_event_types`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct EventTypeInfo {
/// The event type name.
pub event_type: String,
/// Number of index entries with this type.
pub event_count: usize,
/// Timestamp of the type's last index entry; `None` if it has none.
pub last_event_at: Option<chrono::DateTime<chrono::Utc>>,
}
impl Default for EventStore {
fn default() -> Self {
Self::new()
}
}
/// Unit tests for `EventStore` and `EventStoreConfig`.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::entities::Event;
    use tempfile::TempDir;

    /// Builds a valid event for `entity_id` / `event_type` in tenant
    /// "default" with payload `{"name": "Test", "value": 42}`.
    fn create_test_event(entity_id: &str, event_type: &str) -> Event {
        Event::from_strings(
            event_type.to_string(),
            entity_id.to_string(),
            "default".to_string(),
            serde_json::json!({"name": "Test", "value": 42}),
            None,
        )
        .unwrap()
    }

    #[test]
    fn test_event_store_new() {
        let store = EventStore::new();
        assert_eq!(store.stats().total_events, 0);
        assert_eq!(store.stats().total_entities, 0);
    }

    #[test]
    fn test_event_store_default() {
        let store = EventStore::default();
        assert_eq!(store.stats().total_events, 0);
    }

    #[test]
    fn test_ingest_single_event() {
        let store = EventStore::new();
        let event = create_test_event("entity-1", "user.created");
        store.ingest(event).unwrap();
        assert_eq!(store.stats().total_events, 1);
        assert_eq!(store.stats().total_ingested, 1);
    }

    #[test]
    fn test_ingest_multiple_events() {
        let store = EventStore::new();
        for i in 0..10 {
            let event = create_test_event(&format!("entity-{}", i), "user.created");
            store.ingest(event).unwrap();
        }
        assert_eq!(store.stats().total_events, 10);
        assert_eq!(store.stats().total_ingested, 10);
    }

    #[test]
    fn test_query_by_entity_id() {
        let store = EventStore::new();
        store
            .ingest(create_test_event("entity-1", "user.created"))
            .unwrap();
        store
            .ingest(create_test_event("entity-2", "user.created"))
            .unwrap();
        store
            .ingest(create_test_event("entity-1", "user.updated"))
            .unwrap();
        let results = store
            .query(QueryEventsRequest {
                entity_id: Some("entity-1".to_string()),
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
            })
            .unwrap();
        assert_eq!(results.len(), 2);
    }

    #[test]
    fn test_query_by_event_type() {
        let store = EventStore::new();
        store
            .ingest(create_test_event("entity-1", "user.created"))
            .unwrap();
        store
            .ingest(create_test_event("entity-2", "user.updated"))
            .unwrap();
        store
            .ingest(create_test_event("entity-3", "user.created"))
            .unwrap();
        let results = store
            .query(QueryEventsRequest {
                entity_id: None,
                event_type: Some("user.created".to_string()),
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
            })
            .unwrap();
        assert_eq!(results.len(), 2);
    }

    #[test]
    fn test_query_with_limit() {
        let store = EventStore::new();
        for i in 0..10 {
            let event = create_test_event(&format!("entity-{}", i), "user.created");
            store.ingest(event).unwrap();
        }
        let results = store
            .query(QueryEventsRequest {
                entity_id: None,
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: Some(5),
            })
            .unwrap();
        assert_eq!(results.len(), 5);
    }

    #[test]
    fn test_query_empty_store() {
        let store = EventStore::new();
        let results = store
            .query(QueryEventsRequest {
                entity_id: Some("non-existent".to_string()),
                event_type: None,
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
            })
            .unwrap();
        assert!(results.is_empty());
    }

    #[test]
    fn test_reconstruct_state() {
        let store = EventStore::new();
        store
            .ingest(create_test_event("entity-1", "user.created"))
            .unwrap();
        let state = store.reconstruct_state("entity-1", None).unwrap();
        assert_eq!(state["current_state"]["name"], "Test");
        assert_eq!(state["current_state"]["value"], 42);
    }

    #[test]
    fn test_reconstruct_state_not_found() {
        let store = EventStore::new();
        let result = store.reconstruct_state("non-existent", None);
        assert!(result.is_err());
    }

    #[test]
    fn test_get_snapshot_empty() {
        let store = EventStore::new();
        let result = store.get_snapshot("non-existent");
        assert!(result.is_err());
    }

    #[test]
    fn test_create_snapshot() {
        let store = EventStore::new();
        store
            .ingest(create_test_event("entity-1", "user.created"))
            .unwrap();
        store.create_snapshot("entity-1").unwrap();
        let snapshot = store.get_snapshot("entity-1").unwrap();
        assert!(snapshot != serde_json::json!(null));
    }

    #[test]
    fn test_create_snapshot_entity_not_found() {
        let store = EventStore::new();
        let result = store.create_snapshot("non-existent");
        assert!(result.is_err());
    }

    #[test]
    fn test_websocket_manager() {
        let store = EventStore::new();
        let manager = store.websocket_manager();
        assert!(Arc::strong_count(&manager) >= 1);
    }

    #[test]
    fn test_snapshot_manager() {
        let store = EventStore::new();
        let manager = store.snapshot_manager();
        assert!(Arc::strong_count(&manager) >= 1);
    }

    #[test]
    fn test_compaction_manager_none() {
        let store = EventStore::new();
        assert!(store.compaction_manager().is_none());
    }

    #[test]
    fn test_schema_registry() {
        let store = EventStore::new();
        let registry = store.schema_registry();
        assert!(Arc::strong_count(&registry) >= 1);
    }

    #[test]
    fn test_replay_manager() {
        let store = EventStore::new();
        let manager = store.replay_manager();
        assert!(Arc::strong_count(&manager) >= 1);
    }

    #[test]
    fn test_pipeline_manager() {
        let store = EventStore::new();
        let manager = store.pipeline_manager();
        assert!(Arc::strong_count(&manager) >= 1);
    }

    #[test]
    fn test_projection_manager() {
        let store = EventStore::new();
        let manager = store.projection_manager();
        let projections = manager.list_projections();
        // `entity_snapshots` and `event_counters` are registered at construction.
        assert!(projections.len() >= 2);
    }

    #[test]
    fn test_projection_state_cache() {
        let store = EventStore::new();
        let cache = store.projection_state_cache();
        cache.insert("test:key".to_string(), serde_json::json!({"value": 123}));
        assert_eq!(cache.len(), 1);
        let value = cache.get("test:key").unwrap();
        assert_eq!(value["value"], 123);
    }

    #[test]
    fn test_metrics() {
        let store = EventStore::new();
        let metrics = store.metrics();
        assert!(Arc::strong_count(&metrics) >= 1);
    }

    #[test]
    fn test_store_stats() {
        let store = EventStore::new();
        store
            .ingest(create_test_event("entity-1", "user.created"))
            .unwrap();
        store
            .ingest(create_test_event("entity-2", "order.placed"))
            .unwrap();
        let stats = store.stats();
        assert_eq!(stats.total_events, 2);
        assert_eq!(stats.total_entities, 2);
        assert_eq!(stats.total_event_types, 2);
        assert_eq!(stats.total_ingested, 2);
    }

    #[test]
    fn test_event_store_config_default() {
        let config = EventStoreConfig::default();
        assert!(config.storage_dir.is_none());
        assert!(config.wal_dir.is_none());
    }

    #[test]
    fn test_event_store_config_with_persistence() {
        let temp_dir = TempDir::new().unwrap();
        let config = EventStoreConfig::with_persistence(temp_dir.path());
        assert!(config.storage_dir.is_some());
        assert!(config.wal_dir.is_none());
    }

    #[test]
    fn test_event_store_config_with_wal() {
        let temp_dir = TempDir::new().unwrap();
        let config = EventStoreConfig::with_wal(temp_dir.path(), WALConfig::default());
        assert!(config.storage_dir.is_none());
        assert!(config.wal_dir.is_some());
    }

    #[test]
    fn test_event_store_config_with_all() {
        let temp_dir = TempDir::new().unwrap();
        let config = EventStoreConfig::with_all(temp_dir.path(), SnapshotConfig::default());
        assert!(config.storage_dir.is_some());
    }

    #[test]
    fn test_event_store_config_production() {
        let storage_dir = TempDir::new().unwrap();
        let wal_dir = TempDir::new().unwrap();
        let config = EventStoreConfig::production(
            storage_dir.path(),
            wal_dir.path(),
            SnapshotConfig::default(),
            WALConfig::default(),
            CompactionConfig::default(),
        );
        assert!(config.storage_dir.is_some());
        assert!(config.wal_dir.is_some());
    }

    #[test]
    fn test_from_env_vars_data_dir_enables_full_persistence() {
        let (config, mode) =
            EventStoreConfig::from_env_vars(Some("/app/data".to_string()), None, None, None);
        assert_eq!(mode, "wal+parquet");
        assert_eq!(
            config.storage_dir.unwrap().to_str().unwrap(),
            "/app/data/storage"
        );
        assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/app/data/wal");
    }

    #[test]
    fn test_from_env_vars_explicit_dirs() {
        let (config, mode) = EventStoreConfig::from_env_vars(
            None,
            Some("/custom/storage".to_string()),
            Some("/custom/wal".to_string()),
            None,
        );
        assert_eq!(mode, "wal+parquet");
        assert_eq!(
            config.storage_dir.unwrap().to_str().unwrap(),
            "/custom/storage"
        );
        assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/custom/wal");
    }

    #[test]
    fn test_from_env_vars_wal_disabled() {
        let (config, mode) = EventStoreConfig::from_env_vars(
            Some("/app/data".to_string()),
            None,
            None,
            Some("false".to_string()),
        );
        assert_eq!(mode, "parquet-only");
        assert!(config.storage_dir.is_some());
        assert!(config.wal_dir.is_none());
    }

    #[test]
    fn test_from_env_vars_no_dirs_is_in_memory() {
        let (config, mode) = EventStoreConfig::from_env_vars(None, None, None, None);
        assert_eq!(mode, "in-memory");
        assert!(config.storage_dir.is_none());
        assert!(config.wal_dir.is_none());
    }

    #[test]
    fn test_from_env_vars_empty_strings_treated_as_none() {
        let (_, mode) = EventStoreConfig::from_env_vars(
            Some("".to_string()),
            Some("".to_string()),
            Some("".to_string()),
            None,
        );
        assert_eq!(mode, "in-memory");
    }

    #[test]
    fn test_from_env_vars_explicit_overrides_data_dir() {
        let (config, mode) = EventStoreConfig::from_env_vars(
            Some("/app/data".to_string()),
            Some("/override/storage".to_string()),
            Some("/override/wal".to_string()),
            None,
        );
        assert_eq!(mode, "wal+parquet");
        assert_eq!(
            config.storage_dir.unwrap().to_str().unwrap(),
            "/override/storage"
        );
        assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/override/wal");
    }

    #[test]
    fn test_from_env_vars_wal_only() {
        let (config, mode) =
            EventStoreConfig::from_env_vars(None, None, Some("/wal/only".to_string()), None);
        assert_eq!(mode, "wal-only");
        assert!(config.storage_dir.is_none());
        assert_eq!(config.wal_dir.unwrap().to_str().unwrap(), "/wal/only");
    }

    #[test]
    fn test_store_stats_serde() {
        let stats = StoreStats {
            total_events: 100,
            total_entities: 50,
            total_event_types: 10,
            total_ingested: 100,
        };
        let json = serde_json::to_string(&stats).unwrap();
        assert!(json.contains("\"total_events\":100"));
        assert!(json.contains("\"total_entities\":50"));
    }

    #[test]
    fn test_query_with_entity_and_type() {
        let store = EventStore::new();
        store
            .ingest(create_test_event("entity-1", "user.created"))
            .unwrap();
        store
            .ingest(create_test_event("entity-1", "user.updated"))
            .unwrap();
        store
            .ingest(create_test_event("entity-2", "user.created"))
            .unwrap();
        let results = store
            .query(QueryEventsRequest {
                entity_id: Some("entity-1".to_string()),
                event_type: Some("user.created".to_string()),
                tenant_id: None,
                as_of: None,
                since: None,
                until: None,
                limit: None,
            })
            .unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].event_type_str(), "user.created");
    }

    #[test]
    fn test_flush_storage_no_storage() {
        let store = EventStore::new();
        let result = store.flush_storage();
        assert!(result.is_ok());
    }

    #[test]
    fn test_state_evolution() {
        let store = EventStore::new();
        store
            .ingest(
                Event::from_strings(
                    "user.created".to_string(),
                    "user-1".to_string(),
                    "default".to_string(),
                    serde_json::json!({"name": "Alice", "age": 25}),
                    None,
                )
                .unwrap(),
            )
            .unwrap();
        store
            .ingest(
                Event::from_strings(
                    "user.updated".to_string(),
                    "user-1".to_string(),
                    "default".to_string(),
                    serde_json::json!({"age": 26}),
                    None,
                )
                .unwrap(),
            )
            .unwrap();
        let state = store.reconstruct_state("user-1", None).unwrap();
        assert_eq!(state["current_state"]["name"], "Alice");
        assert_eq!(state["current_state"]["age"], 26);
    }

    #[test]
    fn test_reject_system_event_types() {
        let store = EventStore::new();
        let event = Event::reconstruct_from_strings(
            uuid::Uuid::new_v4(),
            "_system.tenant.created".to_string(),
            "_system:tenant:acme".to_string(),
            "_system".to_string(),
            serde_json::json!({"name": "ACME"}),
            chrono::Utc::now(),
            None,
            1,
        );
        let result = store.ingest(event);
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("reserved for internal use"),
            "Expected system namespace rejection, got: {}",
            err
        );
    }
}