Module projection

Projection trait and ProjectionRunner.

Projections build read models from the event stream. They are:

  • Asynchronous — fed events independently of the write path
  • Disposable — the read model can be dropped and rebuilt at any time
  • Eventually consistent — they may lag behind the write head

Projection failures must never affect event persistence.
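
One way to honor this rule is to persist the event first and treat a projection error as something to log or retry later. The following is a minimal sketch under that assumption. `Event`, `InMemoryLog`, `ReadModel`, and `append_and_project` are hypothetical names used only for illustration and are not part of this crate's API. Projection is called inline here for brevity; a real setup would feed the projection from a separate task.

```rust
// Hypothetical types for illustration only; they are not this crate's API.
#[derive(Debug, Clone, PartialEq)]
pub struct Event {
    pub sequence: u64,
    pub payload: String,
}

/// Stand-in for the write side: an append-only in-memory log.
#[derive(Default)]
pub struct InMemoryLog {
    pub events: Vec<Event>,
}

impl InMemoryLog {
    /// Append a payload and return the stored event (sequences start at 1).
    pub fn append(&mut self, payload: &str) -> Event {
        let event = Event {
            sequence: self.events.len() as u64 + 1,
            payload: payload.to_string(),
        };
        self.events.push(event.clone());
        event
    }
}

/// Stand-in read model that rejects one specific payload.
#[derive(Default)]
pub struct ReadModel {
    pub applied: Vec<u64>,
}

impl ReadModel {
    pub fn apply(&mut self, event: &Event) -> Result<(), String> {
        if event.payload == "poison" {
            return Err(format!("cannot project event {}", event.sequence));
        }
        self.applied.push(event.sequence);
        Ok(())
    }
}

/// Persist first, then project. A projection error is handed back for
/// logging or retry, but the event stays in the log either way.
pub fn append_and_project(
    log: &mut InMemoryLog,
    model: &mut ReadModel,
    payload: &str,
) -> (Event, Option<String>) {
    let event = log.append(payload);
    let projection_error = model.apply(&event).err();
    (event, projection_error)
}
```

Because the read model is disposable, a failed projection can be repaired later by dropping it and replaying the log.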

§Incremental catch-up

Projections that track their cursor position can implement Projection::last_sequence so ProjectionRunner::catch_up and ProjectionRunner::catch_up_from_store feed only new events.
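
The cursor idea can be sketched as follows. This is a simplified, synchronous illustration, not the crate's actual trait or runner: the `Event` shape, the `apply` method, the `Option<u64>` return type of `last_sequence`, and the free function `catch_up` are all assumptions made for the example.

```rust
// Hypothetical, simplified shapes; the real trait and runner may differ.
#[derive(Debug, Clone)]
pub struct Event {
    pub sequence: u64,
    pub amount: i64,
}

pub trait Projection {
    /// Apply one event to the read model.
    fn apply(&mut self, event: &Event);

    /// Sequence of the last applied event, if the projection tracks it.
    fn last_sequence(&self) -> Option<u64> {
        None
    }
}

#[derive(Default)]
pub struct BalanceProjection {
    pub balance: i64,
    cursor: Option<u64>,
}

impl Projection for BalanceProjection {
    fn apply(&mut self, event: &Event) {
        self.balance += event.amount;
        self.cursor = Some(event.sequence);
    }

    fn last_sequence(&self) -> Option<u64> {
        self.cursor
    }
}

/// Feed only events past the projection's cursor.
/// Returns how many events were applied.
pub fn catch_up<P: Projection>(projection: &mut P, events: &[Event]) -> usize {
    let cursor = projection.last_sequence();
    let mut applied = 0;
    for event in events {
        // Without a cursor, every event counts as new.
        if cursor.map_or(true, |seen| event.sequence > seen) {
            projection.apply(event);
            applied += 1;
        }
    }
    applied
}
```

A projection that keeps the default `last_sequence` (returning `None` in this sketch) is simply replayed in full on every call.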

§Store-backed streaming

ProjectionRunner::run_from_store and ProjectionRunner::catch_up_from_store load events directly from an EventStore without requiring the caller to pre-load the entire event slice into memory.
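
To illustrate why loading from the store avoids holding the full history in memory, here is a minimal paging sketch. It assumes a hypothetical synchronous `VecStore` with a `read_after` method and a `page_size` parameter; the real `EventStore` is async, and its method names and paging behavior are not shown on this page.

```rust
// Hypothetical synchronous store, for illustration only.
#[derive(Debug, Clone)]
pub struct Event {
    pub sequence: u64,
}

pub struct VecStore {
    /// Events ordered by ascending sequence.
    pub events: Vec<Event>,
}

impl VecStore {
    /// Return up to `limit` events with a sequence greater than `after`.
    pub fn read_after(&self, after: u64, limit: usize) -> Vec<Event> {
        self.events
            .iter()
            .filter(|e| e.sequence > after)
            .take(limit)
            .cloned()
            .collect()
    }
}

/// Pull pages from the store until it is drained, so at most `page_size`
/// events are held by the caller at once. Returns the last sequence applied,
/// or `start_after` if nothing new was found.
pub fn run_from_store(
    store: &VecStore,
    start_after: u64,
    page_size: usize,
    mut apply: impl FnMut(&Event),
) -> u64 {
    let mut cursor = start_after;
    loop {
        let page = store.read_after(cursor, page_size);
        // An empty page means the projection has reached the head.
        let Some(last) = page.last() else { break };
        cursor = last.sequence;
        for event in &page {
            apply(event);
        }
    }
    cursor
}
```

Passing `0` as `start_after` gives a full replay in this sketch, while passing a stored cursor gives an incremental catch-up over the same code path.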

§Multi-stream projections

ProjectionRunner::run_all_streams and ProjectionRunner::catch_up_all_streams drive a projection across multiple streams simultaneously. This is required for process families where a read model aggregates across many process instances — for example, MABIS Bilanzkreisabrechnung aggregating events across thousands of MaLo-level process streams for a single billing period.

GlobalProjectionCheckpoint records one cursor per stream, so an incremental catch-up feeds each stream only the events newer than that stream's recorded cursor.
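
The per-stream cursor bookkeeping might look roughly like the sketch below. `Checkpoint` and `catch_up_all` are hypothetical stand-ins, not the real `GlobalProjectionCheckpoint` or runner. The sketch also assumes sequence numbers start at 1, so a cursor of 0 means "nothing replayed yet".

```rust
use std::collections::HashMap;

// Hypothetical stand-in; the real GlobalProjectionCheckpoint may differ.
#[derive(Debug, Default, Clone, PartialEq)]
pub struct Checkpoint {
    cursors: HashMap<String, u64>,
}

impl Checkpoint {
    /// Last sequence replayed for `stream_id`, or 0 if the stream is new.
    pub fn cursor(&self, stream_id: &str) -> u64 {
        self.cursors.get(stream_id).copied().unwrap_or(0)
    }

    /// Record progress; never moves a cursor backwards.
    pub fn advance(&mut self, stream_id: &str, sequence: u64) {
        let entry = self.cursors.entry(stream_id.to_string()).or_insert(0);
        *entry = (*entry).max(sequence);
    }
}

/// Given `(stream_id, sequence)` pairs, keep only those past the checkpoint
/// and return them together with an updated checkpoint.
pub fn catch_up_all(
    events: &[(&str, u64)],
    checkpoint: &Checkpoint,
) -> (Vec<(String, u64)>, Checkpoint) {
    let mut next = checkpoint.clone();
    let mut fresh = Vec::new();
    for &(stream_id, sequence) in events {
        // Compare against the old checkpoint so each stream is judged
        // by where the previous replay stopped.
        if sequence > checkpoint.cursor(stream_id) {
            fresh.push((stream_id.to_string(), sequence));
            next.advance(stream_id, sequence);
        }
    }
    (fresh, next)
}
```

Keeping a separate cursor per stream means a stream that received no new events costs nothing on the next catch-up, and a stream that appears for the first time is replayed from its beginning.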

// Initial full replay across all process streams:
let checkpoint = ProjectionRunner::run_all_streams(
    &mut billing_proj,
    &store,
    &stream_ids,
).await?;

// Later: incremental update after new events arrive:
let checkpoint = ProjectionRunner::catch_up_all_streams(
    &mut billing_proj,
    &store,
    &stream_ids,
    &checkpoint,
).await?;

To enumerate all process streams automatically, use EventStore::list_streams with a prefix. Pass "process/" to scan all tenants, or &format!("process/{tenant_id}/") to scope to one tenant:

// Either scan all tenants:
let streams = store.list_streams(Some("process/")).await?;
// ...or scope to a single tenant (this rebinds `streams`):
let streams = store.list_streams(Some(&format!("process/{tenant_id}/"))).await?;
let checkpoint = ProjectionRunner::run_all_streams(
    &mut billing_proj, &store, &streams,
).await?;
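
The trailing slash in the tenant prefix matters if the store matches prefixes as plain strings. The sketch below assumes exactly that behavior, which this page does not confirm. `filter_streams` and `tenant_prefix` are hypothetical helpers, not part of `EventStore`.

```rust
/// Hypothetical helper mirroring a plain string-prefix filter.
/// `None` means "no filter".
pub fn filter_streams<'a>(all: &[&'a str], prefix: Option<&str>) -> Vec<&'a str> {
    all.iter()
        .copied()
        .filter(|id| prefix.map_or(true, |p| id.starts_with(p)))
        .collect()
}

/// Build the single-tenant prefix, including the trailing slash.
pub fn tenant_prefix(tenant_id: &str) -> String {
    format!("process/{tenant_id}/")
}
```

Under this assumption, a prefix of `"process/t1"` without the slash would also match streams of a tenant named `t10`, whereas `tenant_prefix("t1")` matches only tenant `t1`.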

Structs§

GlobalProjectionCheckpoint
Per-stream sequence number cursors for multi-stream projections.
ProjectionRunner
Drives one or more projections over a slice of events.

Traits§

Projection
A read-model builder that consumes events and maintains queryable state.
ProjectionCheckpointStore
Persist and load named GlobalProjectionCheckpoint values.