pub trait EventStore: Send + Sync {
// Required methods
async fn append(
&self,
stream_id: &StreamId,
expected_version: ExpectedVersion,
events: &[NewEvent],
) -> Result<AppendResult, EngineError>;
async fn load(
&self,
stream_id: &StreamId,
) -> Result<Vec<EventEnvelope>, EngineError>;
async fn load_from(
&self,
stream_id: &StreamId,
from_sequence: u64,
) -> Result<Vec<EventEnvelope>, EngineError>;
async fn stream_version(
&self,
stream_id: &StreamId,
) -> Result<u64, EngineError>;
async fn list_streams(
&self,
prefix: Option<&str>,
) -> Result<Vec<StreamId>, EngineError>;
async fn fold_stream<T, F>(
&self,
stream_id: &StreamId,
from_sequence: u64,
initial: T,
f: F,
) -> Result<T, EngineError>
where T: Send,
F: FnMut(T, EventEnvelope) -> Result<T, EngineError> + Send;
// Provided method
async fn list_streams_page(
&self,
prefix: Option<&str>,
cursor: Option<&StreamId>,
limit: usize,
) -> Result<Vec<StreamId>, EngineError> { ... }
}Expand description
Append-only, ordered event stream storage contract.
§Implementation requirements
- Ordered: events within a stream are always returned in append order.
- Atomic: a multi-event append either fully succeeds or fully fails.
- Optimistic concurrency: detect concurrent writers via
ExpectedVersion. - Append-only: events are never modified or deleted through this API.
- Sequence number ownership: the store assigns
sequence_number,event_id,stream_id, andtimestampon each appended envelope. Callers submitNewEventvalues without these fields.
§Blanket Arc implementation
Arc<S> implements EventStore whenever S: EventStore, so
Process<W, Arc<MyStore>> works without any extra wrapper type.
Required Methods§
Sourceasync fn append(
&self,
stream_id: &StreamId,
expected_version: ExpectedVersion,
events: &[NewEvent],
) -> Result<AppendResult, EngineError>
async fn append( &self, stream_id: &StreamId, expected_version: ExpectedVersion, events: &[NewEvent], ) -> Result<AppendResult, EngineError>
Atomically append events to stream_id.
The store assigns event_id, sequence_number, stream_id, and
timestamp on each event. The fully materialised envelopes are
returned in AppendResult::events.
§Errors
EngineError::VersionConflictwhenexpected_versionisExpectedVersion::NoStreamorExpectedVersion::Exactand the actual stream version does not match.EngineError::Storefor underlying storage failures.
Sourceasync fn load(
&self,
stream_id: &StreamId,
) -> Result<Vec<EventEnvelope>, EngineError>
async fn load( &self, stream_id: &StreamId, ) -> Result<Vec<EventEnvelope>, EngineError>
Load all events from stream_id in sequence order.
Returns an empty Vec when the stream does not exist.
§Errors
Returns EngineError::Store for underlying storage failures.
Sourceasync fn load_from(
&self,
stream_id: &StreamId,
from_sequence: u64,
) -> Result<Vec<EventEnvelope>, EngineError>
async fn load_from( &self, stream_id: &StreamId, from_sequence: u64, ) -> Result<Vec<EventEnvelope>, EngineError>
Load events from stream_id starting after from_sequence (exclusive).
Useful for incremental projection catch-up: pass the projection’s last processed sequence number to load only new events.
Returns an empty Vec when no new events exist.
§Errors
Returns EngineError::Store for underlying storage failures.
Sourceasync fn stream_version(&self, stream_id: &StreamId) -> Result<u64, EngineError>
async fn stream_version(&self, stream_id: &StreamId) -> Result<u64, EngineError>
Return the current sequence number of stream_id.
The sequence number equals the number of events in the stream (1-based
after the first append). Returns 0 when the stream does not exist.
Use this instead of load(…).await?.len() when you only need the
count — backends can implement this as a cheap metadata query without
transferring event payloads.
Required. There is no default implementation — a fallback that
loads all events defeats the O(1) metadata-query contract. Implementors
must read the stored sequence counter directly (e.g. a sv/{stream_id}
key in SlateDB) to avoid O(n) event-payload transfers.
§Errors
Returns EngineError::Store for underlying storage failures.
Sourceasync fn list_streams(
&self,
prefix: Option<&str>,
) -> Result<Vec<StreamId>, EngineError>
async fn list_streams( &self, prefix: Option<&str>, ) -> Result<Vec<StreamId>, EngineError>
Return all known stream identifiers in this store, optionally filtered
by prefix.
When prefix is Some("process/"), only streams whose identifiers
start with "process/" are returned (e.g. all process-instance
streams). When prefix is None, all streams are returned.
This is the primary enumeration API for multi-stream projections: the
caller discovers all relevant streams, then passes the list to
crate::projection::ProjectionRunner::run_all_streams /
crate::projection::ProjectionRunner::catch_up_all_streams.
The returned order is unspecified. Stable ordering can be achieved by
sorting the Vec before use if deterministic replay is required.
Required. There is no default implementation — a missing override silently returns no streams, causing multi-stream projections (e.g. MABIS billing aggregations) to process zero streams and return empty read models with no error signal.
§Errors
Returns EngineError::Store for underlying storage failures.
Sourceasync fn fold_stream<T, F>(
&self,
stream_id: &StreamId,
from_sequence: u64,
initial: T,
f: F,
) -> Result<T, EngineError>
async fn fold_stream<T, F>( &self, stream_id: &StreamId, from_sequence: u64, initial: T, f: F, ) -> Result<T, EngineError>
Fold over events in stream_id starting after from_sequence
(exclusive), accumulating state without materialising the full
Vec<EventEnvelope>.
This is the memory-efficient alternative to load_from for large
streams. Instead of returning all events as a Vec, it applies f to
each event in order and returns the final accumulated value.
from_sequence = 0 folds from the beginning of the stream.
// Reconstruct process state event-by-event without a Vec allocation:
let state = store.fold_stream(
&stream_id, 0, W::State::default(),
|acc, env| Ok(acc.apply(env.event()))
).await?;Required. There is no default implementation — a fallback that
materialises load_from(...) into a Vec defeats the purpose of this
method for large MABIS billing streams (potentially thousands of events
per billing period). Implementors must provide a cursor-based scan for
constant-memory behaviour.
§Errors
Returns EngineError::Store for underlying storage failures.
Returns any error produced by f.
Provided Methods§
Sourceasync fn list_streams_page(
&self,
prefix: Option<&str>,
cursor: Option<&StreamId>,
limit: usize,
) -> Result<Vec<StreamId>, EngineError>
async fn list_streams_page( &self, prefix: Option<&str>, cursor: Option<&StreamId>, limit: usize, ) -> Result<Vec<StreamId>, EngineError>
Paginated stream enumeration — equivalent to list_streams but returns
at most limit entries starting after cursor (exclusive, UTF-8
stream-ID order).
§Parameters
prefix— optional key prefix to restrict the scan (same semantics aslist_streams).cursor— ifSome(s), resume after stream IDs;Nonestarts from the beginning.limit— maximum number of stream IDs to return per page. A return count strictly less thanlimitindicates the last page.
§Page iteration pattern
let mut cursor: Option<StreamId> = None;
loop {
let page = store.list_streams_page(Some("process/"), cursor.as_ref(), 100).await?;
let done = page.len() < 100;
for id in &page { /* process */ }
cursor = page.into_iter().last();
if done { break; }
}§Default implementation
Falls back to list_streams + in-memory slicing for stores that do not
provide a native cursor scan.
§⚠️ Override required for production stores
This default loads all matching stream IDs into memory on every call,
making list_streams_page loops O(n²) in total stream count. Any
production EventStore implementation (e.g. PostgreSQL, CockroachDB)
must override this method with an efficient cursor-based scan. The
SlateDB store already provides such an override. Failure to override
this method will cause projection catch-up to degrade silently under
deployments with > 10,000 active process streams.
§Errors
Returns EngineError::Store for underlying storage failures.
Dyn Compatibility§
This trait is not dyn compatible.
In older versions of Rust, dyn compatibility was called "object safety".