Struct ProjectionRunner 

pub struct ProjectionRunner;
Drives one or more projections over a slice of events.

The runner is stateless — it simply iterates over events and calls Projection::handle_event for each.

Implementations§

impl ProjectionRunner

pub fn run<P: Projection>(projection: &mut P, events: &[EventEnvelope])

Feed all events into projection in order (full replay).

§Performance

This method requires the caller to have already loaded all events into a Vec. For large event streams, prefer run_from_store / run_all_streams which use fold_stream internally and avoid allocating the full event slice.
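
A minimal sketch of what a full replay amounts to. `EventEnvelope` and `Projection` here are simplified stand-ins, not the crate's real definitions: only the names `handle_event` and `sequence_number` come from the text above, while the `payload` field, `CountingProjection`, and `replay_demo` are hypothetical.

```rust
// Simplified stand-ins for illustration; the real types carry more data.
#[derive(Debug, Clone)]
pub struct EventEnvelope {
    pub sequence_number: u64,
    pub payload: String, // hypothetical field
}

pub trait Projection {
    fn handle_event(&mut self, event: &EventEnvelope);
}

/// Stateless full replay: visit every event in slice order.
pub fn run<P: Projection>(projection: &mut P, events: &[EventEnvelope]) {
    for event in events {
        projection.handle_event(event);
    }
}

/// Hypothetical projection that counts events and keeps the last payload.
#[derive(Default)]
pub struct CountingProjection {
    pub count: usize,
    pub last_payload: Option<String>,
}

impl Projection for CountingProjection {
    fn handle_event(&mut self, event: &EventEnvelope) {
        self.count += 1;
        self.last_payload = Some(event.payload.clone());
    }
}

/// Replays three in-memory events and returns the resulting projection.
pub fn replay_demo() -> CountingProjection {
    let events: Vec<EventEnvelope> = (1..=3)
        .map(|n| EventEnvelope {
            sequence_number: n,
            payload: format!("event-{}", n),
        })
        .collect();
    let mut projection = CountingProjection::default();
    run(&mut projection, &events);
    projection
}
```

The whole `events` vector exists in memory before `run` is called, which is the cost the performance note refers to.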

pub fn run_all(projections: &mut [&mut dyn Projection], events: &[EventEnvelope])

Feed all events into multiple projections simultaneously (single pass, full replay).

§Performance

Same caveat as run: the caller must supply a pre-loaded slice. For large streams, prefer run_all_streams which streams events directly from the store with O(1) working memory.
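
The single-pass behaviour can be sketched as an outer loop over events and an inner loop over projections. The types below are simplified stand-ins, and `Total`, `Count`, the `amount` field, and `run_all_demo` are hypothetical names used only for illustration.

```rust
// Simplified stand-ins for illustration; not the crate's real definitions.
pub struct EventEnvelope {
    pub sequence_number: u64,
    pub amount: i64, // hypothetical field
}

pub trait Projection {
    fn handle_event(&mut self, event: &EventEnvelope);
}

/// Single pass: the event slice is walked once, and each event is handed
/// to every projection before the loop moves to the next event.
pub fn run_all(projections: &mut [&mut dyn Projection], events: &[EventEnvelope]) {
    for event in events {
        for projection in projections.iter_mut() {
            projection.handle_event(event);
        }
    }
}

#[derive(Default)]
pub struct Total(pub i64);

#[derive(Default)]
pub struct Count(pub usize);

impl Projection for Total {
    fn handle_event(&mut self, event: &EventEnvelope) {
        self.0 += event.amount;
    }
}

impl Projection for Count {
    fn handle_event(&mut self, _event: &EventEnvelope) {
        self.0 += 1;
    }
}

/// Feeds three events into two different projections in one pass.
pub fn run_all_demo() -> (i64, usize) {
    let amounts: [i64; 3] = [10, -3, 5];
    let events: Vec<EventEnvelope> = amounts
        .iter()
        .enumerate()
        .map(|(i, &amount)| EventEnvelope {
            sequence_number: i as u64 + 1,
            amount,
        })
        .collect();

    let mut total = Total::default();
    let mut count = Count::default();
    {
        // Trait objects let projections of different types share one slice.
        let mut projections: [&mut dyn Projection; 2] = [&mut total, &mut count];
        run_all(&mut projections, &events);
    }
    (total.0, count.0)
}
```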

pub fn catch_up<P: Projection>(projection: &mut P, events: &[EventEnvelope])

Feed only events newer than the projection’s cursor into projection.

Queries Projection::last_sequence to determine the starting point. If the projection returns None, all events are fed (same as run).

events must be sorted by sequence_number in ascending order (which is the contract for all slices returned by EventStore::load / EventStore::load_from).

This is a binary-search–accelerated variant: it finds the first event past the cursor in O(log n) then feeds the tail in O(k) where k is the number of new events.
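
One way to get the O(log n) lookup is the standard library's `slice::partition_point`. The sketch below assumes that `last_sequence` returns `Option<u64>` and uses simplified stand-in types; `SeenProjection` and `catch_up_demo` are hypothetical, and the crate's actual implementation may differ.

```rust
// Simplified stand-ins for illustration; not the crate's real definitions.
pub struct EventEnvelope {
    pub sequence_number: u64,
}

pub trait Projection {
    fn handle_event(&mut self, event: &EventEnvelope);
    /// Assumed signature: the last sequence number already applied, if any.
    fn last_sequence(&self) -> Option<u64>;
}

/// Feeds only events past the projection's cursor.
/// Requires `events` to be sorted by `sequence_number`, ascending.
pub fn catch_up<P: Projection>(projection: &mut P, events: &[EventEnvelope]) {
    let start = match projection.last_sequence() {
        // Binary search for the first index whose sequence number exceeds
        // the cursor: O(log n).
        Some(cursor) => events.partition_point(|e| e.sequence_number <= cursor),
        // No cursor yet: behave like a full replay.
        None => 0,
    };
    // Feed the tail: O(k) for k new events.
    for event in &events[start..] {
        projection.handle_event(event);
    }
}

/// Hypothetical projection that records which sequence numbers it saw.
#[derive(Default)]
pub struct SeenProjection {
    pub cursor: Option<u64>,
    pub seen: Vec<u64>,
}

impl Projection for SeenProjection {
    fn handle_event(&mut self, event: &EventEnvelope) {
        self.seen.push(event.sequence_number);
        self.cursor = Some(event.sequence_number);
    }

    fn last_sequence(&self) -> Option<u64> {
        self.cursor
    }
}

/// A projection already at sequence 3 catches up over events 1..=5.
pub fn catch_up_demo() -> Vec<u64> {
    let events: Vec<EventEnvelope> = (1..=5)
        .map(|n| EventEnvelope { sequence_number: n })
        .collect();
    let mut projection = SeenProjection {
        cursor: Some(3),
        seen: Vec::new(),
    };
    catch_up(&mut projection, &events);
    projection.seen
}
```

If the slice is not sorted, `partition_point` returns an unspecified index, which is why the ordering contract above matters.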

pub async fn run_from_store<P, S>(
    projection: &mut P,
    store: &S,
    stream_id: &StreamId,
) -> Result<(), EngineError>
where
    P: Projection + Send,
    S: EventStore,

Full replay of stream_id into projection without pre-loading the event slice into a Vec.

Uses EventStore::fold_stream internally so production backends can stream events with cursor-based pagination rather than loading all events at once.

§Errors

Returns EngineError::Store on storage failure. Returns EngineError::Deserialization when the fold closure returns an error (propagated from the store).
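
The fold-based replay can be illustrated with a synchronous stand-in. Everything below is an assumption made for the sketch: the real `EventStore::fold_stream` is part of an async API whose exact signature is not shown on this page, and `PagedStore`, `SumProjection`, and `fold_demo` are hypothetical. The point is only that the store hands events to a closure one page at a time, so the caller never holds the full stream.

```rust
// Simplified, synchronous stand-ins; not the crate's real definitions.
pub struct EventEnvelope {
    pub sequence_number: u64,
}

pub trait Projection {
    fn handle_event(&mut self, event: &EventEnvelope);
}

pub trait EventStore {
    type Error;
    /// Assumed shape: calls `f` once per event, in order, threading an
    /// accumulator through the calls.
    fn fold_stream<A, F>(&self, stream_id: &str, init: A, f: F) -> Result<A, Self::Error>
    where
        F: FnMut(A, &EventEnvelope) -> A;
}

/// Full replay without building a Vec of the whole stream.
pub fn run_from_store<P, S>(projection: &mut P, store: &S, stream_id: &str) -> Result<(), S::Error>
where
    P: Projection,
    S: EventStore,
{
    store.fold_stream(stream_id, (), |(), event| projection.handle_event(event))
}

/// Hypothetical store that generates events 1..=total in pages.
pub struct PagedStore {
    pub total: u64,
    pub page_size: u64,
}

impl EventStore for PagedStore {
    type Error = String;

    fn fold_stream<A, F>(&self, _stream_id: &str, init: A, mut f: F) -> Result<A, Self::Error>
    where
        F: FnMut(A, &EventEnvelope) -> A,
    {
        if self.page_size == 0 {
            return Err("page_size must be positive".to_string());
        }
        let mut acc = init;
        let mut next: u64 = 1;
        while next <= self.total {
            let end = (next + self.page_size - 1).min(self.total);
            // Only one page of events is materialized at a time.
            let page: Vec<EventEnvelope> = (next..=end)
                .map(|n| EventEnvelope { sequence_number: n })
                .collect();
            for event in &page {
                acc = f(acc, event);
            }
            next = end + 1;
        }
        Ok(acc)
    }
}

/// Hypothetical projection that sums sequence numbers.
#[derive(Default)]
pub struct SumProjection(pub u64);

impl Projection for SumProjection {
    fn handle_event(&mut self, event: &EventEnvelope) {
        self.0 += event.sequence_number;
    }
}

/// Replays ten events in pages of three.
pub fn fold_demo() -> Result<u64, String> {
    let store = PagedStore { total: 10, page_size: 3 };
    let mut sum = SumProjection::default();
    run_from_store(&mut sum, &store, "orders-1")?;
    Ok(sum.0)
}
```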

pub async fn catch_up_from_store<P, S>(
    projection: &mut P,
    store: &S,
    stream_id: &StreamId,
) -> Result<(), EngineError>
where
    P: Projection + Send,
    S: EventStore,

Incremental catch-up of stream_id into projection without pre-loading the event slice into a Vec.

Queries Projection::last_sequence to determine the starting point. If the projection returns None, performs a full replay (same as run_from_store).

§Errors

Returns EngineError::Store on storage failure.

pub async fn run_all_streams<P, S>(
    projection: &mut P,
    store: &S,
    stream_ids: &[StreamId],
) -> Result<GlobalProjectionCheckpoint, EngineError>
where
    P: Projection + Send,
    S: EventStore,

Full replay of multiple stream_ids into projection.

Events from each stream are fed in sequence order within that stream. Streams are processed in the order given by stream_ids — if cross-stream event ordering matters, sort stream_ids accordingly or use a single global-sequence backend.

Returns a GlobalProjectionCheckpoint recording the last-processed sequence number for every stream. Pass this to catch_up_all_streams for subsequent incremental updates.

§Production workers: use catch_up_persistent instead

run_all_streams performs a full replay from sequence 0 every time it is called. In a long-running background worker this becomes prohibitively expensive as the event log grows. Use catch_up_persistent instead — it loads and saves a durable checkpoint so only events appended since the last run are fed to the projection.

This method is appropriate for one-shot diagnostic tools, tests, or the very first population of a new projection.

§Errors

Returns EngineError::Store on storage failure for any stream.
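
The ordering rule and the returned checkpoint can be sketched with in-memory stand-ins. Here `GlobalProjectionCheckpoint` is modeled as a plain map from stream id to last-processed sequence number and the store as a map of pre-sorted event vectors; both are assumptions for illustration, as are the `stream_id` field, `OrderLog`, and `run_all_streams_demo`. The real method is async and streams from the store.

```rust
use std::collections::BTreeMap;

// Simplified, synchronous stand-ins; not the crate's real definitions.
pub struct EventEnvelope {
    pub stream_id: String, // hypothetical field
    pub sequence_number: u64,
}

pub trait Projection {
    fn handle_event(&mut self, event: &EventEnvelope);
}

/// Assumed model: last-processed sequence number per stream.
pub type GlobalProjectionCheckpoint = BTreeMap<String, u64>;

/// Hypothetical in-memory store: stream id -> events sorted by sequence.
pub type MemoryStore = BTreeMap<String, Vec<EventEnvelope>>;

/// Replays each stream fully, in the order given by `stream_ids`.
pub fn run_all_streams<P: Projection>(
    projection: &mut P,
    store: &MemoryStore,
    stream_ids: &[&str],
) -> GlobalProjectionCheckpoint {
    let mut checkpoint = GlobalProjectionCheckpoint::new();
    for &stream_id in stream_ids {
        if let Some(events) = store.get(stream_id) {
            for event in events {
                projection.handle_event(event);
                checkpoint.insert(stream_id.to_string(), event.sequence_number);
            }
        }
    }
    checkpoint
}

/// Hypothetical projection that logs the order in which events arrive.
#[derive(Default)]
pub struct OrderLog(pub Vec<String>);

impl Projection for OrderLog {
    fn handle_event(&mut self, event: &EventEnvelope) {
        self.0
            .push(format!("{}:{}", event.stream_id, event.sequence_number));
    }
}

fn stream(id: &str, len: u64) -> (String, Vec<EventEnvelope>) {
    let events = (1..=len)
        .map(|n| EventEnvelope {
            stream_id: id.to_string(),
            sequence_number: n,
        })
        .collect();
    (id.to_string(), events)
}

/// Replays stream "b" before stream "a" and returns the observed order.
pub fn run_all_streams_demo() -> (Vec<String>, GlobalProjectionCheckpoint) {
    let store = MemoryStore::from([stream("a", 2), stream("b", 3)]);
    let mut log = OrderLog::default();
    let checkpoint = run_all_streams(&mut log, &store, &["b", "a"]);
    (log.0, checkpoint)
}
```

In the demo, all of stream `b` is applied before any event of stream `a`, because that is the order of `stream_ids`; there is no interleaving by timestamp or global position.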

pub async fn catch_up_all_streams<P, S>(
    projection: &mut P,
    store: &S,
    stream_ids: &[StreamId],
    checkpoint: &GlobalProjectionCheckpoint,
) -> Result<GlobalProjectionCheckpoint, EngineError>
where
    P: Projection + Send,
    S: EventStore,

Incremental catch-up of multiple stream_ids into projection.

For each stream, queries checkpoint for the last-processed sequence number and feeds only events newer than that cursor.

Returns an updated GlobalProjectionCheckpoint reflecting the new cursors after this catch-up pass. Pass the returned checkpoint to the next catch_up_all_streams call — do not reuse the input checkpoint.

§Errors

Returns EngineError::Store on storage failure for any stream.
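
The checkpoint hand-off between passes can be sketched as follows. As before, the checkpoint is modeled as a plain map and the store as in-memory vectors; these, along with `Count`, `make_events`, and `chained_catch_up_demo`, are assumptions for illustration rather than the crate's real types.

```rust
use std::collections::BTreeMap;

// Simplified, synchronous stand-ins; not the crate's real definitions.
pub struct EventEnvelope {
    pub sequence_number: u64,
}

pub trait Projection {
    fn handle_event(&mut self, event: &EventEnvelope);
}

/// Assumed model: last-processed sequence number per stream.
pub type GlobalProjectionCheckpoint = BTreeMap<String, u64>;

/// Hypothetical in-memory store: stream id -> events sorted by sequence.
pub type MemoryStore = BTreeMap<String, Vec<EventEnvelope>>;

pub fn catch_up_all_streams<P: Projection>(
    projection: &mut P,
    store: &MemoryStore,
    stream_ids: &[&str],
    checkpoint: &GlobalProjectionCheckpoint,
) -> GlobalProjectionCheckpoint {
    // Start from the old cursors so streams with no new events keep theirs.
    let mut updated = checkpoint.clone();
    for &stream_id in stream_ids {
        let events = match store.get(stream_id) {
            Some(events) => events,
            None => continue,
        };
        let start = match checkpoint.get(stream_id) {
            Some(&cursor) => events.partition_point(|e| e.sequence_number <= cursor),
            None => 0,
        };
        for event in &events[start..] {
            projection.handle_event(event);
            updated.insert(stream_id.to_string(), event.sequence_number);
        }
    }
    updated
}

/// Hypothetical projection that counts handled events.
#[derive(Default)]
pub struct Count(pub usize);

impl Projection for Count {
    fn handle_event(&mut self, _event: &EventEnvelope) {
        self.0 += 1;
    }
}

fn make_events(range: std::ops::RangeInclusive<u64>) -> Vec<EventEnvelope> {
    range.map(|n| EventEnvelope { sequence_number: n }).collect()
}

/// Two passes: the second one receives the checkpoint the first returned.
pub fn chained_catch_up_demo() -> (usize, usize, GlobalProjectionCheckpoint) {
    let mut store = MemoryStore::new();
    store.insert("orders".to_string(), make_events(1..=3));
    let mut count = Count::default();

    let empty = GlobalProjectionCheckpoint::new();
    let first = catch_up_all_streams(&mut count, &store, &["orders"], &empty);
    let after_first = count.0;

    // Two more events arrive before the next pass.
    store
        .get_mut("orders")
        .expect("stream was inserted above")
        .extend(make_events(4..=5));
    let second = catch_up_all_streams(&mut count, &store, &["orders"], &first);
    (after_first, count.0, second)
}
```

Passing `empty` again to the second call instead of `first` would feed events 1 to 3 a second time, which is the mistake the warning about reusing the input checkpoint guards against.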

pub async fn run_matching_streams<P, S>(
    projection: &mut P,
    store: &S,
    prefix: Option<&str>,
) -> Result<GlobalProjectionCheckpoint, EngineError>
where
    P: Projection + Send,
    S: EventStore,

Discover all streams matching prefix and replay them into projection.

Convenience wrapper around EventStore::list_streams + run_all_streams. Useful when the full set of streams is not known at compile time.

§Production workers: use catch_up_persistent instead

This function performs a full replay from sequence 0 every call. For persistent background workers, use catch_up_persistent so only events appended since the last checkpoint are processed.

§Errors

Returns EngineError::Store on storage failures.
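
The discovery step can be pictured as a prefix filter over known stream ids. This is only a sketch: the real `EventStore::list_streams` signature is not shown on this page, the function below is a hypothetical synchronous stand-in, and it assumes that a `None` prefix matches every stream.

```rust
/// Hypothetical stand-in for stream discovery over a list of known ids.
/// Assumption: a `None` prefix matches every stream.
pub fn list_streams<'a>(known: &'a [&'a str], prefix: Option<&str>) -> Vec<&'a str> {
    known
        .iter()
        .copied()
        .filter(|id| prefix.map_or(true, |p| id.starts_with(p)))
        .collect()
}

// Hypothetical stream ids used only by the demo below.
static KNOWN: [&str; 3] = ["orders-1", "orders-2", "users-1"];

/// Returns the stream ids that a replay with this prefix would cover.
pub fn discover_demo(prefix: Option<&str>) -> Vec<&'static str> {
    list_streams(&KNOWN, prefix)
}
```

The resulting list would then be handed to the multi-stream replay, so the set of streams is resolved at call time rather than hard-coded.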

pub async fn catch_up_matching_streams<P, S>(
    projection: &mut P,
    store: &S,
    prefix: Option<&str>,
    checkpoint: &GlobalProjectionCheckpoint,
) -> Result<GlobalProjectionCheckpoint, EngineError>
where
    P: Projection + Send,
    S: EventStore,

Incremental catch-up of all streams matching prefix.

Convenience wrapper for the common pattern of discovering streams and then calling catch_up_all_streams.

§Errors

Returns EngineError::Store on storage failures.

pub async fn catch_up_persistent<P, S>(
    projection: &mut P,
    store: &S,
    prefix: Option<&str>,
    checkpoint_name: &str,
) -> Result<GlobalProjectionCheckpoint, EngineError>

Incremental, persistent catch-up for all streams matching prefix.

Loads the named checkpoint from store, performs an incremental catch-up of every matching stream, then saves the updated checkpoint back atomically. On the next call, only events appended since the last run are processed — avoiding full replays across restarts.

This is the preferred entry point for background projection workers that must survive process restarts.

§Key space

The SlateDB implementation stores one entry per stream: the key is cp/{checkpoint_name}/{stream_id} and the value is the cursor as a u64 in little-endian byte order (8 bytes). Each cycle writes only the streams whose cursors advanced, so the write cost is O(changed_streams) rather than O(total_streams).

§Errors

Returns EngineError::Store on any storage failure (checkpoint load, event scan, or checkpoint save).
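
A sketch of the key space, reading the layout above as key `cp/{checkpoint_name}/{stream_id}` with an 8-byte little-endian `u64` value. The `BTreeMap` below is a hypothetical stand-in for the key-value store and is not SlateDB's API; the helper names are also made up for illustration, and the atomicity of the real save is not modeled.

```rust
use std::collections::BTreeMap;

/// Builds the per-stream checkpoint key: cp/{checkpoint_name}/{stream_id}.
pub fn checkpoint_key(checkpoint_name: &str, stream_id: &str) -> Vec<u8> {
    format!("cp/{}/{}", checkpoint_name, stream_id).into_bytes()
}

/// Encodes a cursor as 8 little-endian bytes.
pub fn encode_cursor(sequence: u64) -> [u8; 8] {
    sequence.to_le_bytes()
}

/// Decodes a cursor; returns None if the value is not exactly 8 bytes.
pub fn decode_cursor(bytes: &[u8]) -> Option<u64> {
    if bytes.len() != 8 {
        return None;
    }
    let mut array = [0u8; 8];
    array.copy_from_slice(bytes);
    Some(u64::from_le_bytes(array))
}

/// Writes only the cursors that differ from the previously saved ones and
/// returns the number of writes, i.e. O(changed_streams).
pub fn save_changed(
    kv: &mut BTreeMap<Vec<u8>, Vec<u8>>, // stand-in for the key-value store
    checkpoint_name: &str,
    old: &BTreeMap<String, u64>,
    new: &BTreeMap<String, u64>,
) -> usize {
    let mut writes = 0;
    for (stream_id, &sequence) in new {
        if old.get(stream_id) != Some(&sequence) {
            kv.insert(
                checkpoint_key(checkpoint_name, stream_id),
                encode_cursor(sequence).to_vec(),
            );
            writes += 1;
        }
    }
    writes
}

/// Stream "a" is unchanged, "b" advanced, "c" is new: two writes expected.
pub fn persist_demo() -> (usize, Option<u64>) {
    let old: BTreeMap<String, u64> =
        BTreeMap::from([("a".to_string(), 2), ("b".to_string(), 3)]);
    let new: BTreeMap<String, u64> = BTreeMap::from([
        ("a".to_string(), 2),
        ("b".to_string(), 7),
        ("c".to_string(), 1),
    ]);
    let mut kv = BTreeMap::new();
    let writes = save_changed(&mut kv, "orders-view", &old, &new);
    let stored = kv
        .get(&checkpoint_key("orders-view", "b"))
        .and_then(|value| decode_cursor(value));
    (writes, stored)
}
```

Keeping one small key per stream is what allows a cycle to skip streams that did not move, instead of rewriting a single large checkpoint blob.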
