pub struct EventStore { /* private fields */ }Expand description
High-performance event store with columnar storage
Implementations§
Source§impl EventStore
impl EventStore
Sourcepub fn with_config(config: EventStoreConfig) -> Self
pub fn with_config(config: EventStoreConfig) -> Self
Create event store with custom configuration
Sourcepub fn ingest_with_expected_version(
&self,
event: &Event,
expected_version: Option<u64>,
) -> Result<u64>
pub fn ingest_with_expected_version( &self, event: &Event, expected_version: Option<u64>, ) -> Result<u64>
Ingest a new event with optional optimistic concurrency check.
If expected_version is Some(v), the write is rejected with
VersionConflict unless the entity’s current version equals v.
The version check and WAL append are atomic (locked together).
Returns the new entity version after the append.
Sourcepub fn ingest_batch(&self, batch: Vec<Event>) -> Result<()>
pub fn ingest_batch(&self, batch: Vec<Event>) -> Result<()>
Ingest a batch of events with a single write lock acquisition.
All events are validated first. If any event fails validation, no events are stored (all-or-nothing validation). Events are then written to WAL, indexed, processed through projections, and pushed to the events vector under a single write lock.
Sourcepub fn ingest_replicated(&self, event: &Event) -> Result<()>
pub fn ingest_replicated(&self, event: &Event) -> Result<()>
Ingest a replicated event from the leader (follower mode).
Unlike ingest(), this method:
- Skips WAL writing (the follower’s WalReceiver manages its own local WAL)
- Skips schema validation (the leader already validated)
- Still indexes, processes projections/pipelines, and broadcasts to WebSocket clients
Sourcepub fn get_entity_version(&self, entity_id: &str) -> u64
pub fn get_entity_version(&self, entity_id: &str) -> u64
Get the current version for an entity (number of events appended for it). Returns 0 if the entity has no events.
Sourcepub fn consumer_registry(&self) -> &ConsumerRegistry
pub fn consumer_registry(&self) -> &ConsumerRegistry
Get the consumer registry for durable subscriptions.
Sourcepub fn set_consumer_registry(&mut self, registry: Arc<ConsumerRegistry>)
pub fn set_consumer_registry(&mut self, registry: Arc<ConsumerRegistry>)
Replace the default in-memory consumer registry with a durable one.
Called during startup when system repositories are available, so that consumer cursors survive Core restarts via WAL persistence.
Sourcepub fn total_events(&self) -> usize
pub fn total_events(&self) -> usize
Get the total number of events in the store (used as max offset for consumer ack).
Sourcepub fn events_after_offset(
&self,
offset: u64,
filters: &[String],
limit: usize,
) -> Vec<(u64, Event)>
pub fn events_after_offset( &self, offset: u64, filters: &[String], limit: usize, ) -> Vec<(u64, Event)>
Get events after a given offset, optionally filtered by event type prefixes. Used by consumer polling to fetch unprocessed events.
Sourcepub fn websocket_manager(&self) -> Arc<WebSocketManager>
pub fn websocket_manager(&self) -> Arc<WebSocketManager>
Get the WebSocket manager for this store
Sourcepub fn snapshot_manager(&self) -> Arc<SnapshotManager>
pub fn snapshot_manager(&self) -> Arc<SnapshotManager>
Get the snapshot manager for this store
Sourcepub fn compaction_manager(&self) -> Option<Arc<CompactionManager>>
pub fn compaction_manager(&self) -> Option<Arc<CompactionManager>>
Get the compaction manager for this store
Sourcepub fn schema_registry(&self) -> Arc<SchemaRegistry>
pub fn schema_registry(&self) -> Arc<SchemaRegistry>
Get the schema registry for this store (v0.5 feature)
Sourcepub fn replay_manager(&self) -> Arc<ReplayManager>
pub fn replay_manager(&self) -> Arc<ReplayManager>
Get the replay manager for this store (v0.5 feature)
Sourcepub fn pipeline_manager(&self) -> Arc<PipelineManager>
pub fn pipeline_manager(&self) -> Arc<PipelineManager>
Get the pipeline manager for this store (v0.5 feature)
Sourcepub fn metrics(&self) -> Arc<MetricsRegistry>
pub fn metrics(&self) -> Arc<MetricsRegistry>
Get the metrics registry for this store (v0.6 feature)
Sourcepub fn projection_manager(&self) -> RwLockReadGuard<'_, ProjectionManager>
pub fn projection_manager(&self) -> RwLockReadGuard<'_, ProjectionManager>
Get the projection manager for this store (v0.7 feature)
Sourcepub fn register_projection(&self, projection: Arc<dyn Projection>)
pub fn register_projection(&self, projection: Arc<dyn Projection>)
Register a custom projection at runtime.
The projection will receive all future events via process().
Historical events are not replayed — only events ingested after
registration will be processed by this projection.
See register_projection_with_backfill
to also process historical events.
Sourcepub fn register_projection_with_backfill(
&self,
projection: &Arc<dyn Projection>,
) -> Result<()>
pub fn register_projection_with_backfill( &self, projection: &Arc<dyn Projection>, ) -> Result<()>
Register a custom projection and replay all existing events through it.
After registration, the projection will also receive all future events. Historical events are replayed under a read lock — the projection’s internal state (typically DashMap) handles concurrent access.
Sourcepub fn projection_state_cache(&self) -> Arc<DashMap<String, Value>>
pub fn projection_state_cache(&self) -> Arc<DashMap<String, Value>>
Get the projection state cache for this store (v0.7 feature) Used by Elixir Query Service for state synchronization
Sourcepub fn projection_status(&self) -> Arc<DashMap<String, String>>
pub fn projection_status(&self) -> Arc<DashMap<String, String>>
Get the projection status map (v0.13 feature)
Sourcepub fn geo_index(&self) -> Arc<GeoIndex>
pub fn geo_index(&self) -> Arc<GeoIndex>
Get the webhook registry for this store (v0.11 feature) Geospatial index for coordinate-based queries (v2.0 feature)
Sourcepub fn exactly_once(&self) -> Arc<ExactlyOnceRegistry>
pub fn exactly_once(&self) -> Arc<ExactlyOnceRegistry>
Exactly-once processing registry (v2.0 feature)
Sourcepub fn schema_evolution(&self) -> Arc<SchemaEvolutionManager>
pub fn schema_evolution(&self) -> Arc<SchemaEvolutionManager>
Schema evolution manager (v2.0 feature)
Sourcepub fn snapshot_events(&self) -> Vec<Event>
pub fn snapshot_events(&self) -> Vec<Event>
Get a read-locked snapshot of all events (for EventQL/GraphQL queries).
Returns an Arc reference to the internal events vec, avoiding a full
clone. The caller holds a read lock for the duration of the Arc
lifetime — prefer short-lived usage.
Sourcepub fn compact_entity_tokens(
&self,
entity_id: &str,
token_event_type: &str,
merged_event: Event,
) -> Result<bool>
pub fn compact_entity_tokens( &self, entity_id: &str, token_event_type: &str, merged_event: Event, ) -> Result<bool>
Compact token events for an entity by replacing matching events with a single merged event. Used by the embedded streaming feature.
Returns Ok(true) if compaction was performed, Ok(false) if no
matching events were found.
Note: The merged event is processed through projections without clearing the removed events’ projection state first. Projections that accumulate state (e.g., counters) should be designed to handle this (the merged event replaces individual tokens, not adds to them).
Crash safety: The WAL append happens after the in-memory swap under the write lock. If the process crashes before the WAL write, no change is persisted — WAL replay restores the pre-compaction state. If the process crashes after the WAL write, replay sees the merged event (and the original tokens, which are idempotent to replay since the merged event supersedes them).
The write lock is held for the swap + WAL write + index rebuild. The index rebuild is O(N) over all events, which is acceptable for embedded workloads but should not be called in hot paths for large stores.
pub fn webhook_registry(&self) -> Arc<WebhookRegistry>
Sourcepub fn set_webhook_tx(&self, tx: UnboundedSender<WebhookDeliveryTask>)
pub fn set_webhook_tx(&self, tx: UnboundedSender<WebhookDeliveryTask>)
Set the channel for async webhook delivery. Called during server startup to wire the delivery worker.
Sourcepub fn flush_storage(&self) -> Result<()>
pub fn flush_storage(&self) -> Result<()>
Manually flush any pending events to persistent storage
Sourcepub fn create_snapshot(&self, entity_id: &str) -> Result<()>
pub fn create_snapshot(&self, entity_id: &str) -> Result<()>
Manually create a snapshot for an entity
Sourcepub fn reset_projection(&self, name: &str) -> Result<usize>
pub fn reset_projection(&self, name: &str) -> Result<usize>
Reset a projection by clearing its state and reprocessing all events
Sourcepub fn get_event_by_id(&self, event_id: &Uuid) -> Result<Option<Event>>
pub fn get_event_by_id(&self, event_id: &Uuid) -> Result<Option<Event>>
Get a single event by its UUID
Sourcepub fn query(&self, request: &QueryEventsRequest) -> Result<Vec<Event>>
pub fn query(&self, request: &QueryEventsRequest) -> Result<Vec<Event>>
Query events based on filters (optimized with indices)
Sourcepub fn reconstruct_state(
&self,
entity_id: &str,
as_of: Option<DateTime<Utc>>,
) -> Result<Value>
pub fn reconstruct_state( &self, entity_id: &str, as_of: Option<DateTime<Utc>>, ) -> Result<Value>
Reconstruct entity state as of a specific timestamp v0.2: Now uses snapshots for fast reconstruction
Sourcepub fn get_snapshot(&self, entity_id: &str) -> Result<Value>
pub fn get_snapshot(&self, entity_id: &str) -> Result<Value>
Get snapshot from projection (faster than reconstructing)
Sourcepub fn stats(&self) -> StoreStats
pub fn stats(&self) -> StoreStats
Get statistics about the event store
Sourcepub fn list_streams(&self) -> Vec<StreamInfo>
pub fn list_streams(&self) -> Vec<StreamInfo>
Get all unique streams (entity_ids) in the store
Sourcepub fn list_event_types(&self) -> Vec<EventTypeInfo>
pub fn list_event_types(&self) -> Vec<EventTypeInfo>
Get all unique event types in the store
Sourcepub fn enable_wal_replication(&self, tx: Sender<WALEntry>)
pub fn enable_wal_replication(&self, tx: Sender<WALEntry>)
Attach a broadcast sender to the WAL for replication.
Thread-safe: can be called through Arc<EventStore> at runtime.
Used during initial setup and during follower → leader promotion.
When set, every WAL append publishes the entry to the broadcast
channel so the WAL shipper can stream it to followers.
Sourcepub fn wal(&self) -> Option<&Arc<WriteAheadLog>>
pub fn wal(&self) -> Option<&Arc<WriteAheadLog>>
Get a reference to the WAL (if configured). Used by the replication catch-up protocol to determine oldest available offset.
Sourcepub fn parquet_storage(&self) -> Option<&Arc<RwLock<ParquetStorage>>>
pub fn parquet_storage(&self) -> Option<&Arc<RwLock<ParquetStorage>>>
Get a reference to the Parquet storage (if configured). Used by the replication catch-up protocol to stream snapshot files to followers.