

Function spawn_live_projector 

pub async fn spawn_live_projector(
    state: AppState,
    ctx: Context,
) -> Result<JoinHandle<()>>

Drive a long-lived DeliverPolicy::New consumer that pumps every post-boot CloudEvent into AppState via apply_event_payload.

This closes the PROJECTION-LIVE-LAG gap (E2E-V2 acceptance report, 0.5.1 → 0.5.2 wave). replay_projection rebuilds the projection once at boot, but before this loop existed nothing fed new events into the projection while the server was alive: supervisor-emitted cell.lifecycle.v1.* events only appeared in GET /v1/cells after the next server restart re-replayed the stream.

The loop is intentionally minimal:

  • DeliverPolicy::New (not All): replay has already drained the stream’s history; we only want what publishes AFTER the listener binds. The two paths together cover the whole timeline without double-counting the boundary message.
  • No subject filter: the consumer covers the same cellos.events.> surface as the replay consumer. apply_event_payload is the single discriminator.
  • AckPolicy::None + ephemeral: identical to the replay consumer’s semantics, so a process restart re-derives the consumer cheaply and never accumulates broker-side state.
  • messages() (not fetch()): we want to block on the next publish indefinitely; this is the long-lived tail, not a drain.
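The division of labor between the boot-time replay and the live tail can be modeled without a broker. The sketch below is not this crate's code: Event, Projection, replay, tail_new, and demo are hypothetical names, an in-memory slice stands in for the stream's history, and a std::sync::mpsc channel stands in for the DeliverPolicy::New consumer. It only illustrates that a one-shot drain followed by a blocking tail applies each event exactly once.

```rust
use std::collections::BTreeMap;
use std::sync::mpsc::{channel, Receiver};

/// Hypothetical stand-in for a CloudEvent on the stream.
#[derive(Clone, Debug, PartialEq)]
pub struct Event {
    pub seq: u64,
    pub cell_id: String,
    pub state: String,
}

/// Hypothetical projection: latest state per cell plus a count of applied events.
#[derive(Default, Debug)]
pub struct Projection {
    pub cells: BTreeMap<String, String>,
    pub applied: u64,
}

impl Projection {
    /// Plays the role of apply_event_payload: the one place events are interpreted.
    pub fn apply(&mut self, ev: &Event) {
        self.cells.insert(ev.cell_id.clone(), ev.state.clone());
        self.applied += 1;
    }
}

/// Boot-time drain: applies everything already in the history, then returns.
pub fn replay(history: &[Event], proj: &mut Projection) {
    for ev in history {
        proj.apply(ev);
    }
}

/// Live tail: blocks on each new publish and ends only when all senders are dropped.
pub fn tail_new(rx: Receiver<Event>, proj: &mut Projection) {
    for ev in rx {
        proj.apply(&ev);
    }
}

/// Runs replay over two historical events, then tails one event published afterwards.
pub fn demo() -> Projection {
    let history = vec![
        Event { seq: 1, cell_id: "cell-a".into(), state: "created".into() },
        Event { seq: 2, cell_id: "cell-b".into(), state: "created".into() },
    ];
    let mut proj = Projection::default();
    replay(&history, &mut proj);

    // The channel is created after replay, so it never sees seq 1 or 2.
    let (tx, rx) = channel();
    tx.send(Event { seq: 3, cell_id: "cell-a".into(), state: "running".into() })
        .unwrap();
    drop(tx); // Close the channel so the tail loop terminates in this demo.
    tail_new(rx, &mut proj);
    proj
}
```

In this model the tail stops when the sender is dropped; the real loop has no such end condition and keeps waiting for the next publish.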

The task is detached — main.rs keeps the JoinHandle only so a future graceful-shutdown path can abort it cleanly. On error the loop logs and exits; the projection will go stale until the server restarts, but that is strictly no worse than the pre-fix behavior (which never had a live projector at all).
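The log-and-exit behavior described above can be sketched with standard-library primitives. This is an illustration under assumptions, not the function's implementation: spawn_projector_sketch, run_until_error, and SharedState are hypothetical names, a std thread replaces the async task, and a channel of Result values replaces the consumer's message stream. std::thread::JoinHandle has no abort method, so the sketch joins the handle instead of aborting it.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

/// Hypothetical shared state: the payloads applied so far.
pub type SharedState = Arc<Mutex<Vec<String>>>;

/// Spawns the tail loop and returns the handle so the caller can hold on to it.
pub fn spawn_projector_sketch(
    state: SharedState,
    rx: Receiver<Result<String, String>>,
) -> JoinHandle<()> {
    thread::spawn(move || {
        for item in rx {
            match item {
                // Normal path: apply the payload to the shared state.
                Ok(payload) => state.lock().unwrap().push(payload),
                // Error path: log once and exit; the state goes stale from here on.
                Err(err) => {
                    eprintln!("live projector stopped: {}", err);
                    return;
                }
            }
        }
    })
}

/// Feeds one good payload, one error, and one more payload that is never applied.
pub fn run_until_error() -> Vec<String> {
    let state: SharedState = Arc::new(Mutex::new(Vec::new()));
    let (tx, rx) = channel();
    let handle = spawn_projector_sketch(Arc::clone(&state), rx);

    tx.send(Ok("started".to_string())).unwrap();
    tx.send(Err("consumer error".to_string())).unwrap();
    // The loop may already have exited, so this send is allowed to fail.
    let _ = tx.send(Ok("stopped".to_string()));

    handle.join().unwrap();
    let applied = state.lock().unwrap().clone();
    applied
}
```

Everything sent after the error is dropped, which mirrors the stale-until-restart behavior: the state keeps whatever was applied before the loop exited.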