pub struct ProjectionRunner;Expand description
Drives one or more projections over a slice of events.
The runner is stateless — it simply iterates over events and calls
Projection::handle_event for each.
Implementations§
Source§impl ProjectionRunner
impl ProjectionRunner
Sourcepub fn run<P: Projection>(projection: &mut P, events: &[EventEnvelope])
pub fn run<P: Projection>(projection: &mut P, events: &[EventEnvelope])
Feed all events into projection in order (full replay).
§Performance
This method requires the caller to have already loaded all events into
a Vec. For large event streams, prefer run_from_store /
run_all_streams which use fold_stream internally and avoid
allocating the full event slice.
Sourcepub fn run_all(
projections: &mut [&mut dyn Projection],
events: &[EventEnvelope],
)
pub fn run_all( projections: &mut [&mut dyn Projection], events: &[EventEnvelope], )
Feed all events into multiple projections simultaneously (single pass,
full replay).
§Performance
Same caveat as run: the caller must supply a pre-loaded slice.
For large streams, prefer run_all_streams which streams events
directly from the store with O(1) working memory.
Sourcepub fn catch_up<P: Projection>(projection: &mut P, events: &[EventEnvelope])
pub fn catch_up<P: Projection>(projection: &mut P, events: &[EventEnvelope])
Feed only events newer than the projection’s cursor into projection.
Queries Projection::last_sequence to determine the starting point.
If the projection returns None, all events are fed (same as run).
events must be sorted by sequence_number in ascending order (which
is the contract for all slices returned by EventStore::load /
EventStore::load_from).
This is a binary-search–accelerated variant: it finds the first event past the cursor in O(log n) then feeds the tail in O(k) where k is the number of new events.
Sourcepub async fn run_from_store<P, S>(
projection: &mut P,
store: &S,
stream_id: &StreamId,
) -> Result<(), EngineError>
pub async fn run_from_store<P, S>( projection: &mut P, store: &S, stream_id: &StreamId, ) -> Result<(), EngineError>
Full replay of stream_id into projection without pre-loading the
event slice into a Vec.
Uses EventStore::fold_stream internally so production backends can
stream events with cursor-based pagination rather than loading all
events at once.
§Errors
Returns EngineError::Store on storage failure.
Returns EngineError::Deserialization when the fold closure returns
an error (propagated from the store).
Sourcepub async fn catch_up_from_store<P, S>(
projection: &mut P,
store: &S,
stream_id: &StreamId,
) -> Result<(), EngineError>
pub async fn catch_up_from_store<P, S>( projection: &mut P, store: &S, stream_id: &StreamId, ) -> Result<(), EngineError>
Incremental catch-up of stream_id into projection without
pre-loading the event slice into a Vec.
Queries Projection::last_sequence to determine the starting point.
If the projection returns None, performs a full replay (same as
run_from_store).
§Errors
Returns EngineError::Store on storage failure.
Sourcepub async fn run_all_streams<P, S>(
projection: &mut P,
store: &S,
stream_ids: &[StreamId],
) -> Result<GlobalProjectionCheckpoint, EngineError>
pub async fn run_all_streams<P, S>( projection: &mut P, store: &S, stream_ids: &[StreamId], ) -> Result<GlobalProjectionCheckpoint, EngineError>
Full replay of multiple stream_ids into projection.
Events from each stream are fed in sequence order within that stream.
Streams are processed in the order given by stream_ids — if
cross-stream event ordering matters, sort stream_ids accordingly
or use a single global-sequence backend.
Returns a GlobalProjectionCheckpoint recording the last-processed
sequence number for every stream. Pass this to
catch_up_all_streams for subsequent incremental updates.
§Production workers: use catch_up_persistent instead
run_all_streams performs a full replay from sequence 0 every
time it is called. In a long-running background worker this becomes
prohibitively expensive as the event log grows. Use
catch_up_persistent instead — it loads and saves a durable
checkpoint so only events appended since the last run are fed to the
projection.
This method is appropriate for one-shot diagnostic tools, tests, or the very first population of a new projection.
§Errors
Returns EngineError::Store on storage failure for any stream.
Sourcepub async fn catch_up_all_streams<P, S>(
projection: &mut P,
store: &S,
stream_ids: &[StreamId],
checkpoint: &GlobalProjectionCheckpoint,
) -> Result<GlobalProjectionCheckpoint, EngineError>
pub async fn catch_up_all_streams<P, S>( projection: &mut P, store: &S, stream_ids: &[StreamId], checkpoint: &GlobalProjectionCheckpoint, ) -> Result<GlobalProjectionCheckpoint, EngineError>
Incremental catch-up of multiple stream_ids into projection.
For each stream, queries checkpoint for the last-processed sequence
number and feeds only events newer than that cursor.
Returns an updated GlobalProjectionCheckpoint reflecting the new
cursors after this catch-up pass. Pass the returned checkpoint to the
next catch_up_all_streams call — do not reuse the input checkpoint.
§Errors
Returns EngineError::Store on storage failure for any stream.
Sourcepub async fn run_matching_streams<P, S>(
projection: &mut P,
store: &S,
prefix: Option<&str>,
) -> Result<GlobalProjectionCheckpoint, EngineError>
pub async fn run_matching_streams<P, S>( projection: &mut P, store: &S, prefix: Option<&str>, ) -> Result<GlobalProjectionCheckpoint, EngineError>
Discover all streams matching prefix and replay them into projection.
Convenience wrapper around EventStore::list_streams +
run_all_streams. Useful when the full set of streams is not known
at compile time.
§Production workers: use catch_up_persistent instead
This function performs a full replay from sequence 0 every call.
For persistent background workers, use catch_up_persistent so only
events appended since the last checkpoint are processed.
§Errors
Returns EngineError::Store on storage failures.
Sourcepub async fn catch_up_matching_streams<P, S>(
projection: &mut P,
store: &S,
prefix: Option<&str>,
checkpoint: &GlobalProjectionCheckpoint,
) -> Result<GlobalProjectionCheckpoint, EngineError>
pub async fn catch_up_matching_streams<P, S>( projection: &mut P, store: &S, prefix: Option<&str>, checkpoint: &GlobalProjectionCheckpoint, ) -> Result<GlobalProjectionCheckpoint, EngineError>
Incremental catch-up of all streams matching prefix.
Convenience wrapper for the common pattern of discovering streams and
then calling catch_up_all_streams.
§Errors
Returns EngineError::Store on storage failures.
Sourcepub async fn catch_up_persistent<P, S>(
projection: &mut P,
store: &S,
prefix: Option<&str>,
checkpoint_name: &str,
) -> Result<GlobalProjectionCheckpoint, EngineError>
pub async fn catch_up_persistent<P, S>( projection: &mut P, store: &S, prefix: Option<&str>, checkpoint_name: &str, ) -> Result<GlobalProjectionCheckpoint, EngineError>
Incremental, persistent catch-up for all streams matching prefix.
Loads the named checkpoint from store, performs an incremental
catch-up of every matching stream, then saves the updated checkpoint
back atomically. On the next call, only events appended since the last
run are processed — avoiding full replays across restarts.
This is the preferred entry point for background projection workers that must survive process restarts.
§Key space
The SlateDB implementation stores cp/{checkpoint_name}/{stream_id} →
u64 LE (8 bytes) per stream. Each cycle only writes the streams
whose cursors advanced, giving O(changed_streams) write cost instead
of O(total_streams).
§Errors
Returns EngineError::Store on any storage failure (checkpoint load,
event scan, or checkpoint save).