
Trait EventStore 

pub trait EventStore: Send + Sync {
    // Required methods
    async fn append(
        &self,
        stream_id: &StreamId,
        expected_version: ExpectedVersion,
        events: &[NewEvent],
    ) -> Result<AppendResult, EngineError>;
    async fn load(
        &self,
        stream_id: &StreamId,
    ) -> Result<Vec<EventEnvelope>, EngineError>;
    async fn load_from(
        &self,
        stream_id: &StreamId,
        from_sequence: u64,
    ) -> Result<Vec<EventEnvelope>, EngineError>;
    async fn stream_version(
        &self,
        stream_id: &StreamId,
    ) -> Result<u64, EngineError>;
    async fn list_streams(
        &self,
        prefix: Option<&str>,
    ) -> Result<Vec<StreamId>, EngineError>;
    async fn fold_stream<T, F>(
        &self,
        stream_id: &StreamId,
        from_sequence: u64,
        initial: T,
        f: F,
    ) -> Result<T, EngineError>
       where T: Send,
             F: FnMut(T, EventEnvelope) -> Result<T, EngineError> + Send;

    // Provided method
    async fn list_streams_page(
        &self,
        prefix: Option<&str>,
        cursor: Option<&StreamId>,
        limit: usize,
    ) -> Result<Vec<StreamId>, EngineError> { ... }
}

Append-only, ordered event stream storage contract.

§Implementation requirements

  • Ordered: events within a stream are always returned in append order.
  • Atomic: a multi-event append either fully succeeds or fully fails.
  • Optimistic concurrency: detect concurrent writers via ExpectedVersion.
  • Append-only: events are never modified or deleted through this API.
  • Sequence number ownership: the store assigns sequence_number, event_id, stream_id, and timestamp on each appended envelope. Callers submit NewEvent values without these fields.
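
The requirements above can be illustrated with a minimal in-memory sketch. It is synchronous and uses hypothetical stand-in types (`Expected`, `Envelope`, `Conflict`, `MemStore`) rather than the crate's `ExpectedVersion`, `EventEnvelope`, and `EngineError`, whose exact shapes are not shown on this page. It only shows how the version check, atomic batch append, and store-assigned sequence numbers might fit together.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for `ExpectedVersion`; the real variants may differ.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Expected {
    /// Skip the concurrency check.
    Any,
    /// The stream must currently be at exactly this sequence number.
    Exact(u64),
}

/// Hypothetical simplified envelope: a store-assigned sequence number and a payload.
#[derive(Debug, Clone, PartialEq)]
pub struct Envelope {
    pub sequence_number: u64,
    pub payload: String,
}

/// Hypothetical conflict error reported when another writer got there first.
#[derive(Debug, PartialEq)]
pub struct Conflict {
    pub expected: u64,
    pub actual: u64,
}

#[derive(Default)]
pub struct MemStore {
    streams: HashMap<String, Vec<Envelope>>,
}

impl MemStore {
    pub fn append(
        &mut self,
        stream_id: &str,
        expected: Expected,
        payloads: &[&str],
    ) -> Result<Vec<Envelope>, Conflict> {
        let current = self.stream_version(stream_id);
        // Check the version before mutating anything, so a conflict leaves the stream untouched.
        if let Expected::Exact(v) = expected {
            if v != current {
                return Err(Conflict { expected: v, actual: current });
            }
        }
        // Build the whole batch first; the store, not the caller, assigns sequence numbers.
        let batch: Vec<Envelope> = payloads
            .iter()
            .enumerate()
            .map(|(i, p)| Envelope {
                sequence_number: current + 1 + i as u64,
                payload: p.to_string(),
            })
            .collect();
        // Append the batch in one step, preserving append order.
        self.streams
            .entry(stream_id.to_string())
            .or_default()
            .extend(batch.iter().cloned());
        Ok(batch)
    }

    /// Number of events in the stream, or 0 when the stream does not exist.
    pub fn stream_version(&self, stream_id: &str) -> u64 {
        self.streams.get(stream_id).map_or(0, |s| s.len() as u64)
    }
}
```

A real backend would need a transaction or compare-and-swap to make the check and the write atomic across concurrent writers; the `&mut self` receiver sidesteps that here.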

§Blanket Arc implementation

Arc<S> implements EventStore whenever S: EventStore, so Process<W, Arc<MyStore>> works without any extra wrapper type.
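
As a rough illustration of the blanket-implementation pattern, the sketch below uses a hypothetical one-method synchronous trait (`VersionSource`) in place of the full async `EventStore` trait. `FixedStore` and `version_of` are likewise made up for the example.

```rust
use std::sync::Arc;

/// Hypothetical, cut-down stand-in for `EventStore` with a single synchronous method.
pub trait VersionSource {
    fn stream_version(&self, stream_id: &str) -> u64;
}

/// Blanket implementation: an `Arc<S>` forwards every call to the inner `S`.
impl<S: VersionSource> VersionSource for Arc<S> {
    fn stream_version(&self, stream_id: &str) -> u64 {
        (**self).stream_version(stream_id)
    }
}

/// Toy store that reports the same version for every stream.
pub struct FixedStore(pub u64);

impl VersionSource for FixedStore {
    fn stream_version(&self, _stream_id: &str) -> u64 {
        self.0
    }
}

/// Generic consumer: accepts a store by value, so both `S` and `Arc<S>` fit.
pub fn version_of<S: VersionSource>(store: S, stream_id: &str) -> u64 {
    store.stream_version(stream_id)
}
```

Because the `Arc` itself satisfies the trait bound, several consumers can each hold a cheap clone of the same store without a newtype wrapper.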

Required Methods§

async fn append( &self, stream_id: &StreamId, expected_version: ExpectedVersion, events: &[NewEvent], ) -> Result<AppendResult, EngineError>

Atomically append events to stream_id.

The store assigns event_id, sequence_number, stream_id, and timestamp on each event. The fully materialised envelopes are returned in AppendResult::events.

§Errors

async fn load( &self, stream_id: &StreamId, ) -> Result<Vec<EventEnvelope>, EngineError>

Load all events from stream_id in sequence order.

Returns an empty Vec when the stream does not exist.

§Errors

Returns EngineError::Store for underlying storage failures.

async fn load_from( &self, stream_id: &StreamId, from_sequence: u64, ) -> Result<Vec<EventEnvelope>, EngineError>

Load events from stream_id starting after from_sequence (exclusive).

Useful for incremental projection catch-up: pass the projection’s last processed sequence number to load only new events.

Returns an empty Vec when no new events exist.

§Errors

Returns EngineError::Store for underlying storage failures.
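
The catch-up pattern described above might look like the following synchronous sketch. `Envelope`, `TotalProjection`, and the free function `load_from` are hypothetical stand-ins that model only the exclusive `from_sequence` semantics, not the crate's real types or storage access.

```rust
/// Hypothetical simplified envelope: a store-assigned sequence number plus a numeric payload.
#[derive(Debug, Clone, PartialEq)]
pub struct Envelope {
    pub sequence_number: u64,
    pub amount: i64,
}

/// Hypothetical read model that remembers the last sequence number it processed.
#[derive(Debug, Default, PartialEq)]
pub struct TotalProjection {
    pub last_sequence: u64,
    pub total: i64,
}

/// Synchronous stand-in for `load_from`: returns events strictly after `from_sequence`.
pub fn load_from(stream: &[Envelope], from_sequence: u64) -> Vec<Envelope> {
    stream
        .iter()
        .filter(|e| e.sequence_number > from_sequence)
        .cloned()
        .collect()
}

/// Applies only the events the projection has not seen yet and returns how many were applied.
pub fn catch_up(projection: &mut TotalProjection, stream: &[Envelope]) -> usize {
    let new_events = load_from(stream, projection.last_sequence);
    for event in &new_events {
        projection.total += event.amount;
        // Record progress so the next catch-up resumes after this event.
        projection.last_sequence = event.sequence_number;
    }
    new_events.len()
}
```

Since the bound is exclusive, passing the last processed sequence number never re-delivers that event, and a second catch-up with no new appends applies nothing.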

async fn stream_version(&self, stream_id: &StreamId) -> Result<u64, EngineError>

Return the current sequence number of stream_id.

Sequence numbers are 1-based: the first appended event gets sequence number 1, so the current sequence number equals the number of events in the stream. Returns 0 when the stream does not exist.

Use this instead of load(…).await?.len() when you only need the count — backends can implement this as a cheap metadata query without transferring event payloads.

Required. There is no default implementation — a fallback that loads all events defeats the O(1) metadata-query contract. Implementors must read the stored sequence counter directly (e.g. a sv/{stream_id} key in SlateDB) to avoid O(n) event-payload transfers.

§Errors

Returns EngineError::Store for underlying storage failures.

async fn list_streams( &self, prefix: Option<&str>, ) -> Result<Vec<StreamId>, EngineError>

Return all known stream identifiers in this store, optionally filtered by prefix.

When prefix is Some("process/"), only streams whose identifiers start with "process/" are returned (e.g. all process-instance streams). When prefix is None, all streams are returned.

This is the primary enumeration API for multi-stream projections: the caller discovers all relevant streams, then passes the list to crate::projection::ProjectionRunner::run_all_streams / crate::projection::ProjectionRunner::catch_up_all_streams.

The returned order is unspecified. If deterministic replay is required, sort the Vec before use.

Required. There is deliberately no default implementation: a default that returned an empty list would let a store that forgot to override it silently report no streams, so multi-stream projections (e.g. MABIS billing aggregations) would process zero streams and return empty read models with no error signal.

§Errors

Returns EngineError::Store for underlying storage failures.

async fn fold_stream<T, F>( &self, stream_id: &StreamId, from_sequence: u64, initial: T, f: F, ) -> Result<T, EngineError>
where T: Send, F: FnMut(T, EventEnvelope) -> Result<T, EngineError> + Send,

Fold over events in stream_id starting after from_sequence (exclusive), accumulating state without materialising the full Vec<EventEnvelope>.

This is the memory-efficient alternative to load_from for large streams. Instead of returning all events as a Vec, it applies f to each event in order and returns the final accumulated value.

from_sequence = 0 folds from the beginning of the stream.

// Reconstruct process state event-by-event without a Vec allocation:
let state = store.fold_stream(
    &stream_id, 0, W::State::default(),
    |acc, env| Ok(acc.apply(env.event()))
).await?;

Required. There is no default implementation — a fallback that materialises load_from(...) into a Vec defeats the purpose of this method for large MABIS billing streams (potentially thousands of events per billing period). Implementors must provide a cursor-based scan for constant-memory behaviour.

§Errors

Returns EngineError::Store for underlying storage failures. Returns any error produced by f.
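
To make the fold semantics concrete, here is a small synchronous sketch over any iterator of events. The function name `fold_events` and the `(sequence_number, value)` tuple standing in for `EventEnvelope` are assumptions for illustration; a real store would drive the same loop from a storage cursor.

```rust
/// Folds over `(sequence_number, value)` pairs strictly after `from_sequence`.
/// Events are consumed one at a time, so no intermediate Vec is built.
pub fn fold_events<T, E, I, F>(
    events: I,
    from_sequence: u64,
    initial: T,
    mut f: F,
) -> Result<T, E>
where
    I: IntoIterator<Item = (u64, i64)>,
    F: FnMut(T, (u64, i64)) -> Result<T, E>,
{
    let mut acc = initial;
    for event in events {
        // `from_sequence` is exclusive: skip everything up to and including it.
        if event.0 <= from_sequence {
            continue;
        }
        // The first error returned by `f` stops the fold and is passed to the caller.
        acc = f(acc, event)?;
    }
    Ok(acc)
}
```

Passing `from_sequence = 0` visits every event because sequence numbers start at 1. An error from the closure ends the fold early, which matches the "returns any error produced by f" contract.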

Provided Methods§

async fn list_streams_page( &self, prefix: Option<&str>, cursor: Option<&StreamId>, limit: usize, ) -> Result<Vec<StreamId>, EngineError>

Paginated stream enumeration — equivalent to list_streams but returns at most limit entries starting after cursor (exclusive, UTF-8 stream-ID order).

§Parameters

  • prefix: optional key prefix to restrict the scan (same semantics as list_streams).
  • cursor: if Some(s), resume after stream ID s; None starts from the beginning.
  • limit: maximum number of stream IDs to return per page. A page with strictly fewer than limit entries is the last page. A page with exactly limit entries may also be the last one, in which case the next call returns an empty page.

§Page iteration pattern

let mut cursor: Option<StreamId> = None;
loop {
    let page = store.list_streams_page(Some("process/"), cursor.as_ref(), 100).await?;
    let done = page.len() < 100;
    for id in &page { /* process */ }
    cursor = page.into_iter().last();
    if done { break; }
}

§Default implementation

Falls back to list_streams + in-memory slicing for stores that do not provide a native cursor scan.
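
A fallback of this kind could be sketched as below. This is not the crate's actual default body: the function name `page_from_full_list` and the use of plain `String` IDs instead of `StreamId` are assumptions, and the full list is passed in rather than fetched through `list_streams`.

```rust
/// Slices one page out of a fully materialised list of stream IDs.
/// `cursor` is exclusive; IDs are ordered by their UTF-8 bytes (the `String` ordering).
pub fn page_from_full_list(
    mut all: Vec<String>,
    prefix: Option<&str>,
    cursor: Option<&str>,
    limit: usize,
) -> Vec<String> {
    // The full list arrives in unspecified order, so sort before applying the cursor.
    all.sort();
    all.into_iter()
        // Keep only IDs under the requested prefix, if one was given.
        .filter(|id| prefix.map_or(true, |p| id.starts_with(p)))
        // Resume strictly after the cursor, if one was given.
        .filter(|id| cursor.map_or(true, |c| id.as_str() > c))
        .take(limit)
        .collect()
}
```

Every call sorts and scans the complete list, which is why a loop over pages built this way does work proportional to the whole stream set on each iteration.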

§⚠️ Override required for production stores

This default loads all matching stream IDs into memory on every call. A full list_streams_page loop therefore makes roughly n / limit calls that each cost O(n), which is O(n²) in the total stream count n. Any production EventStore implementation (e.g. PostgreSQL, CockroachDB) must override this method with an efficient cursor-based scan. The SlateDB store already provides such an override. Without an override, projection catch-up degrades silently in deployments with more than 10,000 active process streams.

§Errors

Returns EngineError::Store for underlying storage failures.

Dyn Compatibility§

This trait is not dyn compatible.

In older versions of Rust, dyn compatibility was called "object safety".

Implementations on Foreign Types§

impl<S: EventStore> EventStore for Arc<S>

async fn append( &self, stream_id: &StreamId, expected_version: ExpectedVersion, events: &[NewEvent], ) -> Result<AppendResult, EngineError>

async fn load( &self, stream_id: &StreamId, ) -> Result<Vec<EventEnvelope>, EngineError>

async fn load_from( &self, stream_id: &StreamId, from_sequence: u64, ) -> Result<Vec<EventEnvelope>, EngineError>

async fn stream_version(&self, stream_id: &StreamId) -> Result<u64, EngineError>

async fn list_streams( &self, prefix: Option<&str>, ) -> Result<Vec<StreamId>, EngineError>

async fn list_streams_page( &self, prefix: Option<&str>, cursor: Option<&StreamId>, limit: usize, ) -> Result<Vec<StreamId>, EngineError>

async fn fold_stream<T, F>( &self, stream_id: &StreamId, from_sequence: u64, initial: T, f: F, ) -> Result<T, EngineError>
where T: Send, F: FnMut(T, EventEnvelope) -> Result<T, EngineError> + Send,

Implementors§