Struct EventStore 

Source
pub struct EventStore { /* private fields */ }
High-performance event store with columnar storage

Implementations§

Source§

impl EventStore

Source

pub fn new() -> Self

Create a new in-memory event store

Source

pub fn with_config(config: EventStoreConfig) -> Self

Create event store with custom configuration

Source

pub fn ingest_with_expected_version( &self, event: &Event, expected_version: Option<u64>, ) -> Result<u64>

Ingest a new event with optional optimistic concurrency check.

If expected_version is Some(v), the write is rejected with VersionConflict unless the entity’s current version equals v. The version check and WAL append are atomic (locked together).

Returns the new entity version after the append.
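
The check-then-append rule above can be sketched with a toy model. This is not the store's implementation: `VersionedLog`, `AppendError`, and `append` are hypothetical names, and a single `Mutex` stands in for whatever lock the real store takes around the version check and WAL append.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Hypothetical error type standing in for the store's `VersionConflict`.
#[derive(Debug, PartialEq)]
pub enum AppendError {
    VersionConflict { expected: u64, actual: u64 },
}

/// Toy model: per-entity version counters guarded by one lock.
#[derive(Default)]
pub struct VersionedLog {
    versions: Mutex<HashMap<String, u64>>,
}

impl VersionedLog {
    /// Check the expected version and append under the same lock,
    /// so no other writer can slip in between the two steps.
    pub fn append(&self, entity_id: &str, expected: Option<u64>) -> Result<u64, AppendError> {
        let mut versions = self.versions.lock().unwrap();
        // An entity with no events is at version 0.
        let current = versions.get(entity_id).copied().unwrap_or(0);
        if let Some(v) = expected {
            if v != current {
                return Err(AppendError::VersionConflict { expected: v, actual: current });
            }
        }
        let next = current + 1;
        versions.insert(entity_id.to_string(), next);
        Ok(next)
    }
}
```

In this model, passing `None` always appends, while a stale `Some(v)` is rejected and leaves the version untouched. A caller would typically re-read the version and retry.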

Source

pub fn ingest(&self, event: &Event) -> Result<()>

Ingest a new event into the store

Source

pub fn ingest_batch(&self, batch: Vec<Event>) -> Result<()>

Ingest a batch of events with a single write lock acquisition.

All events are validated first. If any event fails validation, no events are stored (all-or-nothing validation). Events are then written to WAL, indexed, processed through projections, and pushed to the events vector under a single write lock.
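
A minimal sketch of the all-or-nothing validation described above, under stated assumptions: `ToyEvent` is a hypothetical stand-in for `Event`, the non-empty-field rule in `validate` is invented for illustration, and a plain `Vec` replaces the WAL, indices, and projections.

```rust
/// Hypothetical minimal event; the real `Event` type has more fields.
#[derive(Debug, Clone, PartialEq)]
pub struct ToyEvent {
    pub entity_id: String,
    pub event_type: String,
}

/// Assumed validation rule, for illustration only: both fields must be non-empty.
fn validate(event: &ToyEvent) -> Result<(), String> {
    if event.entity_id.is_empty() || event.event_type.is_empty() {
        return Err(format!("invalid event: {:?}", event));
    }
    Ok(())
}

/// Validate every event first, then append the whole batch in one step.
pub fn ingest_batch(store: &mut Vec<ToyEvent>, batch: Vec<ToyEvent>) -> Result<(), String> {
    for event in &batch {
        // Any failure returns before the store is touched.
        validate(event)?;
    }
    store.extend(batch);
    Ok(())
}
```

Because validation finishes before the first write, a bad event anywhere in the batch leaves the store exactly as it was.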

Source

pub fn ingest_replicated(&self, event: &Event) -> Result<()>

Ingest a replicated event from the leader (follower mode).

Unlike ingest(), this method:

  • Skips WAL writing (the follower’s WalReceiver manages its own local WAL)
  • Skips schema validation (the leader already validated)
  • Still indexes, processes projections/pipelines, and broadcasts to WebSocket clients
Source

pub fn get_entity_version(&self, entity_id: &str) -> u64

Get the current version for an entity (number of events appended for it). Returns 0 if the entity has no events.

Source

pub fn consumer_registry(&self) -> &ConsumerRegistry

Get the consumer registry for durable subscriptions.

Source

pub fn set_consumer_registry(&mut self, registry: Arc<ConsumerRegistry>)

Replace the default in-memory consumer registry with a durable one.

Called during startup when system repositories are available, so that consumer cursors survive Core restarts via WAL persistence.

Source

pub fn total_events(&self) -> usize

Get the total number of events in the store (used as max offset for consumer ack).

Source

pub fn events_after_offset( &self, offset: u64, filters: &[String], limit: usize, ) -> Vec<(u64, Event)>

Get events after a given offset, optionally filtered by event type prefixes. Used by consumer polling to fetch unprocessed events.
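
The polling behavior can be illustrated with a standalone function over a list of event type names. This is a sketch, not the store's code. It assumes an event's offset is its 1-based position in the log (suggested by total_events serving as the max offset) and that an empty filter list matches everything.

```rust
/// Return up to `limit` `(offset, event_type)` pairs whose offset is
/// strictly greater than `offset` and whose type starts with one of the
/// given prefixes. An empty `filters` slice matches every event.
pub fn events_after_offset(
    events: &[String],
    offset: u64,
    filters: &[String],
    limit: usize,
) -> Vec<(u64, String)> {
    events
        .iter()
        .enumerate()
        // Assumption: offsets are 1-based positions in the log.
        .map(|(i, ty)| (i as u64 + 1, ty))
        .filter(|(off, _)| *off > offset)
        .filter(|(_, ty)| {
            filters.is_empty() || filters.iter().any(|p| ty.starts_with(p.as_str()))
        })
        .take(limit)
        .map(|(off, ty)| (off, ty.clone()))
        .collect()
}
```

A consumer would pass its last acknowledged offset and then acknowledge the highest offset in the returned batch.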

Source

pub fn websocket_manager(&self) -> Arc<WebSocketManager>

Get the WebSocket manager for this store

Source

pub fn snapshot_manager(&self) -> Arc<SnapshotManager>

Get the snapshot manager for this store

Source

pub fn compaction_manager(&self) -> Option<Arc<CompactionManager>>

Get the compaction manager for this store, if one is configured.

Source

pub fn schema_registry(&self) -> Arc<SchemaRegistry>

Get the schema registry for this store (v0.5 feature)

Source

pub fn replay_manager(&self) -> Arc<ReplayManager>

Get the replay manager for this store (v0.5 feature)

Source

pub fn pipeline_manager(&self) -> Arc<PipelineManager>

Get the pipeline manager for this store (v0.5 feature)

Source

pub fn metrics(&self) -> Arc<MetricsRegistry>

Get the metrics registry for this store (v0.6 feature)

Source

pub fn projection_manager(&self) -> RwLockReadGuard<'_, ProjectionManager>

Get the projection manager for this store (v0.7 feature)

Source

pub fn register_projection(&self, projection: Arc<dyn Projection>)

Register a custom projection at runtime.

The projection will receive all future events via process(). Historical events are not replayed — only events ingested after registration will be processed by this projection.

See register_projection_with_backfill to also process historical events.

Source

pub fn register_projection_with_backfill( &self, projection: &Arc<dyn Projection>, ) -> Result<()>

Register a custom projection and replay all existing events through it.

After registration, the projection will also receive all future events. Historical events are replayed under a read lock — the projection’s internal state (typically DashMap) handles concurrent access.
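
The difference between the two registration methods can be shown with a toy model. `ToyStore` and `CountByType` are hypothetical and single-threaded. The real store takes `Arc<dyn Projection>` and relies on the projection's own concurrent state, which this sketch does not attempt to reproduce.

```rust
use std::collections::HashMap;

/// Hypothetical projection: counts events per event type.
#[derive(Default)]
pub struct CountByType {
    pub counts: HashMap<String, usize>,
}

impl CountByType {
    pub fn process(&mut self, event_type: &str) {
        *self.counts.entry(event_type.to_string()).or_insert(0) += 1;
    }
}

/// Toy store: a log of event type names plus the registered projections.
#[derive(Default)]
pub struct ToyStore {
    log: Vec<String>,
    projections: Vec<CountByType>,
}

impl ToyStore {
    pub fn ingest(&mut self, event_type: &str) {
        self.log.push(event_type.to_string());
        for p in &mut self.projections {
            p.process(event_type);
        }
    }

    /// Register without backfill: the projection sees only future events.
    /// Returns the projection's index for later lookups.
    pub fn register(&mut self, p: CountByType) -> usize {
        self.projections.push(p);
        self.projections.len() - 1
    }

    /// Replay the existing log through the projection, then register it.
    pub fn register_with_backfill(&mut self, mut p: CountByType) -> usize {
        for ty in &self.log {
            p.process(ty);
        }
        self.register(p)
    }

    pub fn count(&self, idx: usize, ty: &str) -> usize {
        self.projections[idx].counts.get(ty).copied().unwrap_or(0)
    }
}
```

With two events ingested before registration and one after, the projection registered without backfill counts one event and the backfilled projection counts three.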

Source

pub fn projection_state_cache(&self) -> Arc<DashMap<String, Value>>

Get the projection state cache for this store (v0.7 feature). Used by the Elixir Query Service for state synchronization.

Source

pub fn projection_status(&self) -> Arc<DashMap<String, String>>

Get the projection status map (v0.13 feature)

Source

pub fn geo_index(&self) -> Arc<GeoIndex>

Geospatial index for coordinate-based queries (v2.0 feature)

Source

pub fn exactly_once(&self) -> Arc<ExactlyOnceRegistry>

Exactly-once processing registry (v2.0 feature)

Source

pub fn schema_evolution(&self) -> Arc<SchemaEvolutionManager>

Schema evolution manager (v2.0 feature)

Source

pub fn snapshot_events(&self) -> Vec<Event>

Get a snapshot of all events, taken under a read lock (for EventQL/GraphQL queries).

The return type is an owned Vec<Event>, so the caller receives a copy of the events rather than a reference into the store. For large stores the copy can be expensive; prefer short-lived, infrequent usage.

Source

pub fn compact_entity_tokens( &self, entity_id: &str, token_event_type: &str, merged_event: Event, ) -> Result<bool>

Compact token events for an entity by replacing matching events with a single merged event. Used by the embedded streaming feature.

Returns Ok(true) if compaction was performed, Ok(false) if no matching events were found.

Note: The merged event is processed through projections without first clearing the projection state left by the removed events. Projections that accumulate state (e.g., counters) should be designed with this in mind: the merged event replaces the individual tokens rather than adding to them.

Crash safety: The WAL append happens after the in-memory swap under the write lock. If the process crashes before the WAL write, no change is persisted — WAL replay restores the pre-compaction state. If the process crashes after the WAL write, replay sees the merged event (and the original tokens, which are idempotent to replay since the merged event supersedes them).

The write lock is held for the swap + WAL write + index rebuild. The index rebuild is O(N) over all events, which is acceptable for embedded workloads but should not be called in hot paths for large stores.
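
The in-memory swap can be sketched as a standalone function. This is a toy model with a hypothetical `ToyEvent`. It omits the WAL write, projections, and index rebuild, and it assumes the merged event takes the position of the first removed token, which the description above does not specify.

```rust
/// Hypothetical minimal event; the real `Event` type has more fields.
#[derive(Debug, Clone, PartialEq)]
pub struct ToyEvent {
    pub entity_id: String,
    pub event_type: String,
    pub payload: String,
}

/// Replace all token events of one entity with a single merged event.
/// Returns `true` if compaction happened, `false` if nothing matched.
pub fn compact_tokens(
    events: &mut Vec<ToyEvent>,
    entity_id: &str,
    token_type: &str,
    merged: ToyEvent,
) -> bool {
    let is_token = |e: &ToyEvent| e.entity_id == entity_id && e.event_type == token_type;
    let first = match events.iter().position(|e| is_token(e)) {
        Some(i) => i,
        None => return false,
    };
    // Drop every matching token; events of other entities or types stay.
    events.retain(|e| !is_token(e));
    // Assumption: the merged event goes where the first token used to be.
    events.insert(first, merged);
    true
}
```

Both `retain` and the position scan walk the whole vector, which mirrors the O(N) cost noted above.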

Source

pub fn webhook_registry(&self) -> Arc<WebhookRegistry>

Get the webhook registry for this store (v0.11 feature)

Source

pub fn set_webhook_tx(&self, tx: UnboundedSender<WebhookDeliveryTask>)

Set the channel for async webhook delivery. Called during server startup to wire the delivery worker.

Source

pub fn flush_storage(&self) -> Result<()>

Manually flush any pending events to persistent storage

Source

pub fn create_snapshot(&self, entity_id: &str) -> Result<()>

Manually create a snapshot for an entity

Source

pub fn reset_projection(&self, name: &str) -> Result<usize>

Reset a projection by clearing its state and reprocessing all events

Source

pub fn get_event_by_id(&self, event_id: &Uuid) -> Result<Option<Event>>

Get a single event by its UUID

Source

pub fn query(&self, request: &QueryEventsRequest) -> Result<Vec<Event>>

Query events based on filters (optimized with indices)

Source

pub fn reconstruct_state( &self, entity_id: &str, as_of: Option<DateTime<Utc>>, ) -> Result<Value>

Reconstruct entity state as of a specific timestamp.

v0.2: Now uses snapshots for fast reconstruction.
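
Point-in-time reconstruction can be pictured as a fold over an entity's events up to the cutoff. The sketch below is illustrative only. `Change` is a hypothetical event shape, timestamps are plain `u64` values instead of `DateTime<Utc>`, state is a `BTreeMap` instead of a JSON `Value`, fields are merged last-write-wins, the cutoff is treated as inclusive, and no snapshots are used.

```rust
use std::collections::BTreeMap;

/// Hypothetical event shape: one field change on one entity.
pub struct Change {
    pub entity_id: String,
    pub timestamp: u64,
    pub field: String,
    pub value: String,
}

/// Fold an entity's events, in log order, into a field map.
/// With `as_of = None` every event is applied; otherwise only events
/// with `timestamp <= as_of` are applied (inclusive cutoff is an assumption).
pub fn reconstruct_state(
    events: &[Change],
    entity_id: &str,
    as_of: Option<u64>,
) -> BTreeMap<String, String> {
    let mut state = BTreeMap::new();
    for e in events.iter().filter(|e| e.entity_id == entity_id) {
        if as_of.map_or(true, |t| e.timestamp <= t) {
            // Last write wins for each field.
            state.insert(e.field.clone(), e.value.clone());
        }
    }
    state
}
```

A snapshot-based variant would start the fold from the latest stored state at or before the cutoff instead of from an empty map, applying only the events after it.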

Source

pub fn get_snapshot(&self, entity_id: &str) -> Result<Value>

Get snapshot from projection (faster than reconstructing)

Source

pub fn stats(&self) -> StoreStats

Get statistics about the event store

Source

pub fn list_streams(&self) -> Vec<StreamInfo>

Get all unique streams (entity_ids) in the store

Source

pub fn list_event_types(&self) -> Vec<EventTypeInfo>

Get all unique event types in the store

Source

pub fn enable_wal_replication(&self, tx: Sender<WALEntry>)

Attach a broadcast sender to the WAL for replication.

Thread-safe: can be called through Arc<EventStore> at runtime. Used during initial setup and during follower → leader promotion. When set, every WAL append publishes the entry to the broadcast channel so the WAL shipper can stream it to followers.

Source

pub fn wal(&self) -> Option<&Arc<WriteAheadLog>>

Get a reference to the WAL (if configured). Used by the replication catch-up protocol to determine oldest available offset.

Source

pub fn parquet_storage(&self) -> Option<&Arc<RwLock<ParquetStorage>>>

Get a reference to the Parquet storage (if configured). Used by the replication catch-up protocol to stream snapshot files to followers.

Trait Implementations§

Source§

impl Default for EventStore

Source§

fn default() -> Self

Returns the “default value” for a type. Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> Pointable for T

Source§

const ALIGN: usize

The alignment of pointer.
Source§

type Init = T

The type for initializers.
Source§

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer. Read more
Source§

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer. Read more
Source§

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer. Read more
Source§

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer. Read more
Source§

impl<T> PolicyExt for T
where T: ?Sized,

Source§

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow. Read more
Source§

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow. Read more
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

impl<A, B, T> HttpServerConnExec<A, B> for T
where B: Body,