pub struct EventStore { /* private fields */ }
High-performance event store with columnar storage
Implementations
impl EventStore
pub fn with_config(config: EventStoreConfig) -> Self
Create an event store with a custom configuration.
pub fn ingest_with_expected_version(
    &self,
    event: &Event,
    expected_version: Option<u64>,
) -> Result<u64>
Ingest a new event with optional optimistic concurrency check.
If expected_version is Some(v), the write is rejected with
VersionConflict unless the entity’s current version equals v.
The version check and WAL append are atomic (locked together).
Returns the new entity version after the append.
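The check-then-append contract can be sketched in plain Rust. This is an illustrative stand-in, not the crate's implementation: the version map, the simplified `VersionConflict` struct, and the entity-id-only signature are assumptions; the point is that the check and the append happen under one lock.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

// Hypothetical stand-in for the crate's VersionConflict error.
#[derive(Debug, PartialEq)]
struct VersionConflict { expected: u64, actual: u64 }

struct Store { versions: Mutex<HashMap<String, u64>> }

impl Store {
    // The version check and the append happen under a single lock,
    // mirroring the documented atomicity of check + WAL append.
    fn ingest_with_expected_version(
        &self,
        entity_id: &str,
        expected: Option<u64>,
    ) -> Result<u64, VersionConflict> {
        let mut versions = self.versions.lock().unwrap();
        let current = *versions.get(entity_id).unwrap_or(&0);
        if let Some(v) = expected {
            if current != v {
                return Err(VersionConflict { expected: v, actual: current });
            }
        }
        let new = current + 1;
        versions.insert(entity_id.to_string(), new);
        Ok(new) // new entity version after the append
    }
}
```

Passing `None` skips the check entirely, which matches the "optional" concurrency semantics above.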
pub fn ingest_batch(&self, batch: Vec<Event>) -> Result<()>
Ingest a batch of events with a single write lock acquisition.
All events are validated first. If any event fails validation, no events are stored (all-or-nothing validation). Events are then written to WAL, indexed, processed through projections, and pushed to the events vector under a single write lock.
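The all-or-nothing validation contract can be sketched as a two-phase loop: validate everything first, mutate nothing until the whole batch passes. The `Event` shape and `validate` rule below are illustrative assumptions, not the crate's types.

```rust
// Sketch of the all-or-nothing batch contract. Phase 1 validates every
// event without touching the store; phase 2 commits them all at once
// (standing in for the WAL write + indexing under one lock).
#[derive(Clone)]
struct Event { event_type: String }

fn validate(e: &Event) -> Result<(), String> {
    if e.event_type.is_empty() { Err("empty event_type".into()) } else { Ok(()) }
}

fn ingest_batch(store: &mut Vec<Event>, batch: Vec<Event>) -> Result<(), String> {
    // Phase 1: fail fast before any mutation.
    for e in &batch {
        validate(e)?;
    }
    // Phase 2: commit everything.
    store.extend(batch);
    Ok(())
}
```

A failed batch leaves the store exactly as it was, so callers can fix the offending event and retry the whole batch.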
pub fn ingest_replicated(&self, event: &Event) -> Result<()>
Ingest a replicated event from the leader (follower mode).
Unlike ingest(), this method:
- Skips WAL writing (the follower’s WalReceiver manages its own local WAL)
- Skips schema validation (the leader already validated)
- Still indexes, processes projections/pipelines, and broadcasts to WebSocket clients
pub fn get_entity_version(&self, entity_id: &str) -> u64
Get the current version for an entity (number of events appended for it). Returns 0 if the entity has no events.
pub fn consumer_registry(&self) -> &ConsumerRegistry
Get the consumer registry for durable subscriptions.
pub fn subscribe_events(&self) -> Receiver<Arc<Event>>
Subscribe to every successfully-ingested event in this store.
Returns a tokio::sync::broadcast::Receiver that yields an Arc<Event>
for each ingest. Always available — does not require the server
feature. Lagging receivers surface RecvError::Lagged.
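The lagging-receiver behavior follows tokio's broadcast semantics: when a receiver falls further behind than the channel's capacity, the oldest messages are dropped and the receiver observes a `Lagged(n)` error telling it how many it missed. A minimal std-only sketch of those semantics (not the tokio implementation — the ring buffer and cursor are illustrative):

```rust
use std::collections::VecDeque;

// Bounded history + per-receiver cursor. When the cursor falls behind
// the retained window, the receiver learns how many messages it missed.
#[derive(Debug, PartialEq)]
enum RecvError { Lagged(u64) }

struct Channel<T> { history: VecDeque<T>, next_seq: u64, capacity: usize }

impl<T: Clone> Channel<T> {
    fn new(capacity: usize) -> Self {
        Self { history: VecDeque::new(), next_seq: 0, capacity }
    }
    fn send(&mut self, msg: T) {
        if self.history.len() == self.capacity {
            self.history.pop_front(); // oldest message dropped
        }
        self.history.push_back(msg);
        self.next_seq += 1;
    }
    fn recv(&self, cursor: &mut u64) -> Result<Option<T>, RecvError> {
        let oldest = self.next_seq - self.history.len() as u64;
        if *cursor < oldest {
            let missed = oldest - *cursor;
            *cursor = oldest; // skip ahead, like tokio's Lagged recovery
            return Err(RecvError::Lagged(missed));
        }
        if *cursor == self.next_seq {
            return Ok(None); // nothing new yet
        }
        let msg = self.history[(*cursor - oldest) as usize].clone();
        *cursor += 1;
        Ok(Some(msg))
    }
}
```

After a `Lagged` error the receiver is repositioned at the oldest retained message, so slow consumers lose data rather than stall the producer.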
pub fn set_consumer_registry(&mut self, registry: Arc<ConsumerRegistry>)
Replace the default in-memory consumer registry with a durable one.
Called during startup when system repositories are available, so that consumer cursors survive Core restarts via WAL persistence.
pub fn total_events(&self) -> usize
Get the total number of events in the store (used as max offset for consumer ack).
pub fn events_after_offset(
    &self,
    offset: u64,
    filters: &[String],
    limit: usize,
) -> Vec<(u64, Event)>
Get events after a given offset, optionally filtered by event type prefixes. Used by consumer polling to fetch unprocessed events.
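The polling shape can be sketched over a plain slice. Assumptions: offsets are positions in the global event log, and an empty filter list matches all event types (the source says filtering is optional but does not spell out empty-list semantics).

```rust
// Sketch of offset-based polling with event-type prefix filters.
struct Event { event_type: String }

fn events_after_offset<'a>(
    events: &'a [Event],
    offset: u64,
    filters: &[String],
    limit: usize,
) -> Vec<(u64, &'a Event)> {
    events
        .iter()
        .enumerate()
        .skip(offset as usize)
        .filter(|(_, e)| {
            // Assumed semantics: no filters means "match everything".
            filters.is_empty() || filters.iter().any(|f| e.event_type.starts_with(f.as_str()))
        })
        .map(|(i, e)| (i as u64, e))
        .take(limit)
        .collect()
}
```

Returning the offset alongside each event lets the consumer ack the highest offset it has processed and resume from there on the next poll.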
pub fn websocket_manager(&self) -> Arc<WebSocketManager>
Get the WebSocket manager for this store
pub fn snapshot_manager(&self) -> Arc<SnapshotManager>
Get the snapshot manager for this store
pub fn compaction_manager(&self) -> Option<Arc<CompactionManager>>
Get the compaction manager for this store
pub fn schema_registry(&self) -> Arc<SchemaRegistry>
Get the schema registry for this store (v0.5 feature)
pub fn replay_manager(&self) -> Arc<ReplayManager>
Get the replay manager for this store (v0.5 feature)
pub fn pipeline_manager(&self) -> Arc<PipelineManager>
Get the pipeline manager for this store (v0.5 feature)
pub fn metrics(&self) -> Arc<MetricsRegistry>
Get the metrics registry for this store (v0.6 feature)
pub fn projection_manager(&self) -> RwLockReadGuard<'_, ProjectionManager>
Get the projection manager for this store (v0.7 feature)
pub fn register_projection(&self, projection: Arc<dyn Projection>)
Register a custom projection at runtime.
The projection will receive all future events via process().
Historical events are not replayed — only events ingested after
registration will be processed by this projection.
See register_projection_with_backfill
to also process historical events.
pub fn register_projection_with_backfill(
    &self,
    projection: &Arc<dyn Projection>,
) -> Result<()>
Register a custom projection and replay all existing events through it.
After registration, the projection will also receive all future events. Historical events are replayed under a read lock — the projection’s internal state (typically DashMap) handles concurrent access.
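The replay-then-subscribe order can be sketched as follows. The `Projection` trait and `Counter` below are simplified stand-ins (events are plain strings here, and the real trait surely takes `&Event`); the point is that history flows through `process` before the projection joins the live set.

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

// Simplified stand-in for the crate's Projection trait.
trait Projection: Send + Sync {
    fn process(&self, event: &str);
}

struct Counter(AtomicUsize);
impl Projection for Counter {
    fn process(&self, _event: &str) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

// Replay history through the projection, then add it to the live set
// so future ingests reach it too — no event is seen zero or two times.
fn register_with_backfill(
    history: &[String],
    live: &mut Vec<Arc<dyn Projection>>,
    projection: Arc<dyn Projection>,
) {
    for e in history {
        projection.process(e); // backfill existing events first
    }
    live.push(projection); // then subscribe to future events
}
```

Because the real backfill runs under a read lock while ingest continues, the projection's state must tolerate concurrent `process` calls — hence the note above about DashMap-style internal state.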
pub fn projection_state_cache(&self) -> Arc<DashMap<String, Value>>
Get the projection state cache for this store (v0.7 feature). Used by the Elixir Query Service for state synchronization.
pub fn projection_status(&self) -> Arc<DashMap<String, String>>
Get the projection status map (v0.13 feature)
pub fn geo_index(&self) -> Arc<GeoIndex>
Get the geospatial index for coordinate-based queries (v2.0 feature).
pub fn exactly_once(&self) -> Arc<ExactlyOnceRegistry>
Exactly-once processing registry (v2.0 feature)
pub fn schema_evolution(&self) -> Arc<SchemaEvolutionManager>
Schema evolution manager (v2.0 feature)
pub fn snapshot_events(&self) -> Vec<Event>
Get a snapshot of all events (for EventQL/GraphQL queries).
The events are copied out of the internal vec under a read lock held
only for the duration of the copy. Prefer infrequent, short-lived usage
on large stores.
pub fn compact_entity_tokens(
    &self,
    entity_id: &str,
    token_event_type: &str,
    merged_event: Event,
) -> Result<bool>
Compact token events for an entity by replacing matching events with a single merged event. Used by the embedded streaming feature.
Returns Ok(true) if compaction was performed, Ok(false) if no
matching events were found.
Note: The merged event is processed through projections without clearing the removed events’ projection state first. Projections that accumulate state (e.g., counters) should be designed to handle this (the merged event replaces individual tokens, not adds to them).
Crash safety: The WAL append happens after the in-memory swap under the write lock. If the process crashes before the WAL write, no change is persisted — WAL replay restores the pre-compaction state. If the process crashes after the WAL write, replay sees the merged event (and the original tokens, which are idempotent to replay since the merged event supersedes them).
The write lock is held for the swap + WAL write + index rebuild. The index rebuild is O(N) over all events, which is acceptable for embedded workloads but should not be called in hot paths for large stores.
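The in-memory swap at the heart of compaction can be sketched as a retain-then-append over the events vec. The `Event` field names below are illustrative, not the crate's actual layout, and the WAL/index steps described above are omitted.

```rust
// Sketch of token compaction: remove every event for the entity that
// matches the token type, then append one merged replacement.
#[derive(Clone, Debug, PartialEq)]
struct Event { entity_id: String, event_type: String }

fn compact_entity_tokens(
    events: &mut Vec<Event>,
    entity_id: &str,
    token_event_type: &str,
    merged_event: Event,
) -> bool {
    let before = events.len();
    events.retain(|e| !(e.entity_id == entity_id && e.event_type == token_event_type));
    if events.len() == before {
        return false; // no matching events: nothing to compact
    }
    events.push(merged_event); // single merged event supersedes the tokens
    true
}
```

Other entities' events are untouched, which is why the `Ok(false)` path (no match) is cheap: one pass over the vec and no append.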
pub fn webhook_registry(&self) -> Arc<WebhookRegistry>
Get the webhook registry for this store (v0.11 feature).
pub fn set_webhook_tx(&self, tx: UnboundedSender<WebhookDeliveryTask>)
Set the channel for async webhook delivery. Called during server startup to wire the delivery worker.
pub fn flush_storage(&self) -> Result<()>
Manually flush any pending events to persistent storage
pub fn checkpoint(&self) -> Result<()>
Run a checkpoint: flush pending Parquet batches, then truncate the WAL through the checkpoint point (Step 6 of the sustainable data strategy).
Order matters. We flush Parquet first; only on success do we
truncate the WAL. If the process crashes between the flush and the
truncate, the WAL still contains the events that were just durably
written, and recovery will replay them. The dedupe in
append_loaded_event (index probe) makes that idempotent — the
event is already in Parquet, so the lazy-load splice no-ops once
the tenant is hydrated.
The reverse order would be unsafe: a crash between truncate and flush would lose committed events.
This bounds dirty-restart replay time to one checkpoint interval regardless of total dataset size — that’s the load-bearing property for cold-start time as ingest rate grows.
No-op when no WAL is configured (in-memory-only mode).
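The flush-before-truncate ordering can be sketched with two in-memory logs standing in for Parquet and the WAL. The failure injection is illustrative; the invariant it demonstrates is the one stated above: the WAL is only truncated after the flush succeeds, so a crash in between leaves events in both places and replay must be idempotent.

```rust
// Sketch of the checkpoint ordering. `parquet` and `wal` are toy logs.
struct Checkpoint { parquet: Vec<u64>, wal: Vec<u64> }

impl Checkpoint {
    fn checkpoint(&mut self, flush_fails: bool) -> Result<(), String> {
        // Step 1: flush pending events to durable columnar storage.
        if flush_fails {
            // Failed flush: the WAL is left intact, nothing is lost.
            return Err("flush failed; WAL left intact".into());
        }
        self.parquet.extend(self.wal.iter().copied());
        // Step 2: only after a successful flush, truncate the WAL.
        self.wal.clear();
        Ok(())
    }
}
```

Reversing the two steps would lose committed events on a crash between them, which is exactly the unsafe ordering the doc warns against.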
pub fn checkpoint_interval(&self) -> Option<Duration>
Get the configured checkpoint cadence (used by background tasks).
pub fn ensure_tenant_loaded(&self, tenant_id: &str) -> Result<()>
Hydrate tenant_id’s persisted Parquet data into the in-memory
pile if it isn’t already loaded. Cheap on the warm path
(DashMap probe); on the cold path it walks just that tenant’s
subtree (load_events_for_tenant) and splices the events into
events/index/projections/entity_versions.
Concurrent first-callers for the same tenant serialize on a per-tenant Mutex (singleflight) so the disk read happens once. Other tenants are unaffected — distinct lock per tenant.
Returns Err if the tenant_id fails the path-safety
whitelist, the Parquet read fails, or another in-flight load
holds the lock past the configured timeout. The caller (a
query handler) is expected to surface that as a 5xx — see
Step 2’s “no infinite hangs” acceptance criterion.
On failure, loaded is NOT marked, so a transient error is
retried on the next request rather than poisoning the tenant
permanently. A future commit may add a circuit breaker if
thrash becomes an issue.
No-op (and Ok) when no Parquet storage is configured — the in-memory-only mode used by tests has nothing to hydrate.
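The warm-path probe and the "don't mark `loaded` on failure" retry behavior can be sketched as follows. This collapses the per-tenant singleflight mutex into a simple flag-plus-counter (the `fail` parameter injects a transient load error for illustration); the real per-tenant locking is described above.

```rust
use std::collections::{HashMap, HashSet};
use std::sync::Mutex;

struct TenantLoader {
    loaded: Mutex<HashSet<String>>,
    load_count: Mutex<HashMap<String, u32>>, // how many disk reads happened
}

impl TenantLoader {
    fn ensure_tenant_loaded(&self, tenant_id: &str, fail: bool) -> Result<(), String> {
        if self.loaded.lock().unwrap().contains(tenant_id) {
            return Ok(()); // warm path: cheap probe, no disk read
        }
        // Cold path (in the real store, guarded by a per-tenant mutex
        // so concurrent first-callers do this read exactly once).
        *self.load_count.lock().unwrap().entry(tenant_id.to_string()).or_insert(0) += 1;
        if fail {
            // loaded is NOT marked: the next request retries instead of
            // the tenant being poisoned permanently.
            return Err(format!("load failed for {tenant_id}"));
        }
        self.loaded.lock().unwrap().insert(tenant_id.to_string());
        Ok(())
    }
}
```

A transient failure costs one extra disk read on the next request; a success flips the flag so every later call takes the probe-only warm path.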
pub fn is_tenant_loaded(&self, tenant_id: &str) -> bool
True iff ensure_tenant_loaded has previously succeeded for
this tenant. Diagnostic / testing API.
pub fn evict_tenant(&self, tenant_id: &str)
Drop tenant_id from the in-memory cache. Step 3 #2 of the
sustainable data strategy.
Removes every event for this tenant from the events Vec,
rebuilds the index/entity_versions for the retained events
(Vec offsets shift on remove, so the index has to be
rebuilt), and resets the tenant_loader bookkeeping so a
subsequent query triggers a fresh ensure_tenant_loaded.
Parquet is canonical, in-memory is just cache. This only affects the in-memory side. Disk data is untouched — that’s why eviction is safe even for tenants with recently-ingested data: a query after eviction transparently re-reads from Parquet.
Projection state is NOT rolled back. Projections accumulate across boots and tenants (their durability story is separate); subtracting them would need replay support that doesn’t exist in this commit. After eviction + re-load, projections may double-count the re-loaded events. Step 3 #4’s stress test only asserts the cache budget is held; a future commit will tackle projection-aware eviction.
Locking: takes the events write lock for the full duration of the filter + re-index. Concurrent ingest blocks until done. Eviction is the cold path; the working set should stay in budget so this rarely fires.
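The filter-plus-reindex step can be sketched over a plain `Vec` and `HashMap`. Field names are illustrative; the point is the one made above: `Vec` offsets shift after removal, so the index cannot be patched in place and must be rebuilt.

```rust
use std::collections::HashMap;

#[derive(Clone)]
struct Event { tenant_id: String, entity_id: String }

// Remove the tenant's events, then rebuild the entity index because
// every retained event's offset may have shifted.
fn evict_tenant(
    events: &mut Vec<Event>,
    index: &mut HashMap<String, Vec<usize>>,
    tenant_id: &str,
) {
    events.retain(|e| e.tenant_id != tenant_id);
    index.clear();
    for (i, e) in events.iter().enumerate() {
        index.entry(e.entity_id.clone()).or_default().push(i);
    }
}
```

Both passes are O(N) over the retained events, which is why the doc confines this to the cold path under the write lock.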
pub fn tenant_resident_bytes(&self, tenant_id: &str) -> u64
Approximate resident bytes a single tenant occupies in the in-memory cache. Step 3 budget-tracking input. 0 for cold or evicted tenants.
pub fn cache_resident_bytes(&self) -> u64
Sum of resident-byte estimates across every loaded tenant. What the budget check compares against.
pub fn create_snapshot(&self, entity_id: &str) -> Result<()>
Manually create a snapshot for an entity
pub fn reset_projection(&self, name: &str) -> Result<usize>
Reset a projection by clearing its state and reprocessing all events
pub fn get_event_by_id(&self, event_id: &Uuid) -> Result<Option<Event>>
Get a single event by its UUID
pub fn query(&self, request: &QueryEventsRequest) -> Result<Vec<Event>>
Query events based on filters (optimized with indices)
pub fn reconstruct_state(
    &self,
    entity_id: &str,
    as_of: Option<DateTime<Utc>>,
) -> Result<Value>
Reconstruct entity state as of a specific timestamp. v0.2: now uses snapshots for fast reconstruction.
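As-of reconstruction is a fold over the entity's events up to the cutoff. The sketch below uses plain `u64` timestamps and a single key/value patch per event to stay self-contained (the real API takes `DateTime<Utc>` and returns a JSON `Value`); it assumes events are stored in time order.

```rust
use std::collections::BTreeMap;

struct Event { timestamp: u64, key: String, value: String }

// Fold the entity's events in order, stopping at the cutoff.
fn reconstruct_state(events: &[Event], as_of: Option<u64>) -> BTreeMap<String, String> {
    let mut state = BTreeMap::new();
    for e in events {
        if let Some(cutoff) = as_of {
            if e.timestamp > cutoff {
                break; // events are time-ordered; later ones don't apply
            }
        }
        state.insert(e.key.clone(), e.value.clone()); // last write wins
    }
    state
}
```

The snapshot optimization mentioned above amounts to starting this fold from a stored intermediate `state` instead of an empty one, then applying only the events after the snapshot.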
pub fn get_snapshot(&self, entity_id: &str) -> Result<Value>
Get snapshot from projection (faster than reconstructing)
pub fn stats(&self) -> StoreStats
Get statistics about the event store
pub fn list_streams(&self) -> Vec<StreamInfo>
Get all unique streams (entity_ids) in the store
pub fn list_event_types(&self) -> Vec<EventTypeInfo>
Get all unique event types in the store
pub fn enable_wal_replication(&self, tx: Sender<WALEntry>)
Attach a broadcast sender to the WAL for replication.
Thread-safe: can be called through Arc<EventStore> at runtime.
Used during initial setup and during follower → leader promotion.
When set, every WAL append publishes the entry to the broadcast
channel so the WAL shipper can stream it to followers.
pub fn wal(&self) -> Option<&Arc<WriteAheadLog>>
Get a reference to the WAL (if configured). Used by the replication catch-up protocol to determine oldest available offset.
pub fn parquet_storage(&self) -> Option<&Arc<RwLock<ParquetStorage>>>
Get a reference to the Parquet storage (if configured). Used by the replication catch-up protocol to stream snapshot files to followers.