Module jetstream

JetStream wiring for cellos-server.

Closes the two open SEAMs from ADR-0011 (projection replay on boot) and ADR-0015 (?since=<seq> historical replay via JetStream).

§Why the indirection

cellos-server is a projection over the cellos.events.> event log (CHATROOM Session 16). The in-memory AppState registry MUST be rebuildable from JetStream alone — that is the property that makes the server safely restartable. This module owns:

  1. Stream provisioning. ensure_stream gets-or-creates the canonical CELLOS_EVENTS stream with the FC-74 retention floor (90 days, file-backed). Idempotent; a pre-existing stream is left untouched (we do not reconcile config drift here — that is a cluster-admin concern, not an HTTP-control-plane one).

  2. Ephemeral consumer construction. create_ephemeral_consumer returns a pull consumer whose DeliverPolicy is selected from a caller-supplied start_seq: None → DeliverPolicy::All (replay everything from the earliest retained message); Some(N) → DeliverPolicy::ByStartSequence { N+1 } (resume from the message after the client’s cursor, exactly matching the ADR-0015 §D3 contract).

  3. Boot-time replay. replay_projection drains every message currently in the stream into AppState, advancing the snapshot cursor as it goes, then returns. Called before the HTTP listener binds so the first GET /v1/formations after a restart returns the fully-rebuilt view with a non-zero cursor.
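
The cursor-to-policy mapping in item 2 can be sketched as a pure function. This is a minimal illustration, not the module's actual code: the DeliverPolicy enum below is a local stand-in for the NATS client type, the function signature is assumed, and the saturating add is an assumption about how an overflowing cursor might be handled.

```rust
/// Local stand-in for the JetStream deliver policy. The real type lives in
/// the NATS client crate and has more variants than are modelled here.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DeliverPolicy {
    /// Replay everything from the earliest retained message.
    All,
    /// Deliver only messages published after the consumer is created.
    New,
    /// Start delivery at a specific stream sequence.
    ByStartSequence { start_sequence: u64 },
}

/// Map a caller-supplied resume cursor onto a deliver policy.
///
/// `None` means "no cursor": replay the whole retained stream.
/// `Some(n)` means "the client has already seen sequence n": resume at n + 1.
pub fn deliver_policy_for(start_seq: Option<u64>) -> DeliverPolicy {
    match start_seq {
        None => DeliverPolicy::All,
        Some(n) => DeliverPolicy::ByStartSequence {
            // Assumption: saturate rather than wrap if the cursor is u64::MAX.
            start_sequence: n.saturating_add(1),
        },
    }
}
```

Keeping the decision in a function with no I/O is what allows it to be unit-tested without a live broker: a client that reports cursor 41 should be resumed at sequence 42.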

§What we deliberately do not do

  • We never create durable consumers. The server’s view is ephemeral by design: every restart replays the stream, so there is no consumer state to leak into JetStream’s own consumer table.
  • We do not acknowledge messages. AckPolicy::None keeps the consumer cheap and side-effect-free; this is a read-only projection feed, not a work queue.
  • We do not reconcile stream-config drift. If CELLOS_EVENTS already exists with a different max_age, we use it as-is. The alternative — silently mutating an operator’s stream config — would violate doctrine on least surprise.
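
The "get-or-create, never reconcile" behaviour described above can be modelled against an in-memory stand-in for the broker. Everything here is hypothetical scaffolding for illustration: FakeBroker, StreamConfig, canonical_config and RETENTION_FLOOR are invented names, the subject value is inferred from the text, and the real ensure_stream talks to JetStream rather than a HashMap.

```rust
use std::collections::HashMap;
use std::time::Duration;

pub const STREAM_NAME: &str = "CELLOS_EVENTS";
// Assumption: the wildcard subject, inferred from the `cellos.events.>` log.
pub const STREAM_SUBJECT: &str = "cellos.events.>";
// 90-day retention floor, as stated for FC-74.
pub const RETENTION_FLOOR: Duration = Duration::from_secs(90 * 24 * 60 * 60);

/// Hypothetical, heavily reduced stream configuration.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamConfig {
    pub name: String,
    pub subjects: Vec<String>,
    pub max_age: Duration,
    pub file_backed: bool,
}

/// Hypothetical in-memory broker used only to demonstrate the semantics.
#[derive(Default)]
pub struct FakeBroker {
    streams: HashMap<String, StreamConfig>,
}

impl FakeBroker {
    /// Return the existing stream untouched, or create it from `wanted`.
    pub fn get_or_create(&mut self, wanted: StreamConfig) -> &StreamConfig {
        self.streams.entry(wanted.name.clone()).or_insert(wanted)
    }
}

pub fn canonical_config() -> StreamConfig {
    StreamConfig {
        name: STREAM_NAME.to_owned(),
        subjects: vec![STREAM_SUBJECT.to_owned()],
        max_age: RETENTION_FLOOR,
        file_backed: true,
    }
}

/// Idempotent: a pre-existing stream wins, even if its config has drifted.
pub fn ensure_stream(broker: &mut FakeBroker) -> StreamConfig {
    broker.get_or_create(canonical_config()).clone()
}
```

In this model, calling ensure_stream on a broker that already holds a CELLOS_EVENTS stream with a 7-day max_age returns the 7-day config unchanged, which mirrors the least-surprise rule: the operator's setting is observed, never overwritten.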

Constants§

STREAM_NAME
Canonical stream name for every CloudEvent the platform emits. Mirrors cellos-supervisor’s producer-side constant.
STREAM_SUBJECT
Subject filter for every CloudEvent. The MVP supervisor publishes to cellos.events.<entity>.<id>.<phase>; the wildcard captures all of them.
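
To make the wildcard's reach concrete, here is a small sketch of NATS-style subject matching, where `*` matches exactly one token and a trailing `>` matches one or more remaining tokens. The function name subject_matches is hypothetical and is not part of this module; the broker performs this matching itself.

```rust
/// Check whether a dot-separated subject matches a NATS-style filter.
///
/// `*` matches exactly one token; `>` must be the final filter token and
/// matches one or more remaining subject tokens.
pub fn subject_matches(filter: &str, subject: &str) -> bool {
    let mut filter_tokens = filter.split('.');
    let mut subject_tokens = subject.split('.');
    loop {
        match (filter_tokens.next(), subject_tokens.next()) {
            // `>` swallows the rest, but only if it is the last filter token
            // and at least one subject token remains (the one just consumed).
            (Some(">"), Some(_)) => return filter_tokens.next().is_none(),
            // `*` accepts any single token; keep walking.
            (Some("*"), Some(_)) => {}
            // Literal tokens must be equal.
            (Some(f), Some(s)) => {
                if f != s {
                    return false;
                }
            }
            // Both exhausted at the same time: full match.
            (None, None) => return true,
            // One side ran out before the other.
            _ => return false,
        }
    }
}
```

Under these rules a filter of cellos.events.> accepts a subject such as cellos.events.formation.f-1.started (a made-up entity, id and phase), and rejects both the bare prefix cellos.events and anything outside that prefix.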

Functions§

create_ephemeral_consumer
Create an ephemeral pull consumer against the canonical stream.
deliver_policy_for
Decide which DeliverPolicy to apply for a given resume cursor.
deliver_policy_live_tail
Live-tail policy for /ws/events opens without a ?since= param.
ensure_stream
Construct a JetStream context and ensure the canonical CELLOS_EVENTS stream exists with the FC-74 retention floor.
ephemeral_consumer_config_for
Construct the pull-consumer config that backs the WebSocket bridge and boot-time replay. Pulled out as a pure function so the policy decision can be unit-tested without a live NATS broker.
looks_like_retention_exhausted
Heuristic: did create_ephemeral_consumer fail because the caller’s start_seq is older than the stream’s retained floor?
open_ws_message_stream
Open an ephemeral pull-consumer message stream for the WebSocket bridge. Distinct from replay_projection’s consumer because the WS path needs a long-lived messages() stream (waits for new publishes) rather than a fetch-and-terminate loop.
replay_projection
Drive an ephemeral DeliverPolicy::All consumer to completion, applying each event into AppState and advancing the snapshot cursor. Returns once a batch fetch comes back empty within REPLAY_BATCH_TIMEOUT, which indicates the stream’s tail has been reached.
spawn_live_projector
Drive a long-lived DeliverPolicy::New consumer that pumps every post-boot CloudEvent into AppState via apply_event_payload.
stream_first_seq
Inspect the live stream’s retained-sequence floor so the WS bridge can report it back to a client whose since was below the retention window. Returns None if the stream cannot be reached (in which case the caller should fall through to the generic close path).
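
The fetch-and-terminate loop described for replay_projection can be sketched with an in-memory consumer. This is an illustrative model only: FakeConsumer, StoredMessage and the reduced AppState are hypothetical, the real function is presumably async and works against a JetStream pull consumer, and an empty Vec stands in for "the batch fetch timed out with nothing to deliver".

```rust
use std::collections::VecDeque;

/// Hypothetical stored message: a stream sequence plus an opaque payload.
#[derive(Debug, Clone)]
pub struct StoredMessage {
    pub seq: u64,
    pub payload: String,
}

/// Hypothetical, heavily reduced projection state.
#[derive(Debug, Default)]
pub struct AppState {
    pub applied: Vec<String>,
    /// Highest stream sequence folded into the projection so far.
    pub cursor: u64,
}

impl AppState {
    pub fn apply_event_payload(&mut self, payload: &str) {
        self.applied.push(payload.to_owned());
    }
}

/// Stand-in for a pull consumer with DeliverPolicy::All.
pub struct FakeConsumer {
    pending: VecDeque<StoredMessage>,
}

impl FakeConsumer {
    pub fn new(messages: Vec<StoredMessage>) -> Self {
        Self { pending: messages.into() }
    }

    /// Hand out up to `batch` messages; an empty result models a fetch that
    /// timed out because the stream's tail was reached.
    pub fn fetch(&mut self, batch: usize) -> Vec<StoredMessage> {
        let n = batch.min(self.pending.len());
        self.pending.drain(..n).collect()
    }
}

/// Drain the consumer into `state`, returning how many events were replayed.
pub fn replay_projection(
    consumer: &mut FakeConsumer,
    state: &mut AppState,
    batch: usize,
) -> u64 {
    let mut replayed = 0;
    loop {
        let messages = consumer.fetch(batch);
        if messages.is_empty() {
            // Tail reached: the projection is as complete as the stream.
            return replayed;
        }
        for message in messages {
            state.apply_event_payload(&message.payload);
            state.cursor = state.cursor.max(message.seq);
            replayed += 1;
        }
    }
}
```

Running this before the HTTP listener binds is what gives the first request after a restart a fully rebuilt view: with five retained messages and a batch size of two, the loop performs three non-empty fetches and one empty one, and leaves the cursor at the last retained sequence.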