JetStream wiring for cellos-server.
Closes the two open SEAMs from ADR-0011 (projection replay on boot)
and ADR-0015 (?since=<seq> historical replay via JetStream).
§Why the indirection
cellos-server is a projection over the cellos.events.> event
log (CHATROOM Session 16). The in-memory AppState registry MUST be
rebuildable from JetStream alone — that is the property that makes
the server safely restartable. This module owns:
- Stream provisioning. ensure_stream gets-or-creates the canonical CELLOS_EVENTS stream with the FC-74 retention floor (90 days, file-backed). Idempotent; a pre-existing stream is left untouched (we do not reconcile config drift here; that is a cluster-admin concern, not an HTTP-control-plane one).
- Ephemeral consumer construction. create_ephemeral_consumer returns a pull consumer whose DeliverPolicy is selected from a caller-supplied start_seq: None → DeliverPolicy::All (replay everything from the earliest retained message), Some(N) → DeliverPolicy::ByStartSequence { N+1 } (resume from the message after the client’s cursor, exactly matching the ADR-0015 §D3 contract).
- Boot-time replay. replay_projection drains every message currently in the stream into AppState, advancing the snapshot cursor as it goes, then returns. Called before the HTTP listener binds so the first GET /v1/formations after a restart returns the fully-rebuilt view with a non-zero cursor.
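The cursor-to-policy mapping described above can be sketched in isolation. This is a minimal illustration, not the module's actual code: the DeliverPolicy enum below is a local stand-in that models only the two variants mentioned here, deliver_policy_sketch is a hypothetical name, and the saturating add at u64::MAX is an assumption the source does not state.

```rust
/// Local stand-in for the JetStream client's deliver-policy type.
/// Only the two variants discussed in this module are modelled.
#[derive(Debug, PartialEq, Eq)]
enum DeliverPolicy {
    /// Replay everything from the earliest retained message.
    All,
    /// Start delivery at a specific stream sequence.
    ByStartSequence { start_sequence: u64 },
}

/// Hypothetical sketch: map a client's resume cursor to a deliver policy.
/// `None` means "no cursor", so replay everything; `Some(n)` means the
/// client has already seen sequence `n`, so delivery resumes at `n + 1`.
fn deliver_policy_sketch(start_seq: Option<u64>) -> DeliverPolicy {
    match start_seq {
        None => DeliverPolicy::All,
        Some(n) => DeliverPolicy::ByStartSequence {
            // Assumption: saturate rather than overflow at u64::MAX.
            start_sequence: n.saturating_add(1),
        },
    }
}
```

Under these assumptions, deliver_policy_sketch(Some(41)) yields ByStartSequence { start_sequence: 42 }, and deliver_policy_sketch(None) yields All.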
§What we deliberately do not do
- We never create durable consumers. The server’s view is ephemeral by design: every restart replays the stream, so there is no consumer state to leak into JetStream’s own consumer table.
- We do not acknowledge messages. AckPolicy::None keeps the consumer cheap and side-effect-free; this is a read-only projection feed, not a work queue.
- We do not reconcile stream-config drift. If CELLOS_EVENTS already exists with a different max_age, we use it as-is. The alternative, silently mutating an operator’s stream config, would violate doctrine on least surprise.
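The first two constraints can be expressed as a property of the consumer config. The sketch below uses local stand-in types rather than the real client's config struct; ConsumerSketch, ephemeral_consumer_sketch, and leaves_no_broker_state are hypothetical names chosen for illustration only.

```rust
/// Local stand-in for the client's acknowledgement policy.
#[derive(Debug, PartialEq, Eq)]
enum AckPolicy {
    /// Messages are never acknowledged (read-only feed).
    None,
    /// Each message must be acknowledged (work-queue style).
    Explicit,
}

/// Hypothetical, trimmed-down consumer config with only the fields
/// relevant to the constraints above.
#[derive(Debug, PartialEq, Eq)]
struct ConsumerSketch {
    /// `None` means the consumer is ephemeral.
    durable_name: Option<String>,
    ack_policy: AckPolicy,
    filter_subject: String,
}

/// Build a config that is ephemeral and never acknowledges.
fn ephemeral_consumer_sketch(filter_subject: &str) -> ConsumerSketch {
    ConsumerSketch {
        durable_name: None,
        ack_policy: AckPolicy::None,
        filter_subject: filter_subject.to_string(),
    }
}

/// True when the config has no durable name and no ack tracking, i.e.
/// nothing about it outlives the connection that created it.
fn leaves_no_broker_state(cfg: &ConsumerSketch) -> bool {
    cfg.durable_name.is_none() && cfg.ack_policy == AckPolicy::None
}
```

Keeping the config construction pure, as in this sketch, means the "no durable name, no acks" property can be asserted in a unit test without a broker.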
Constants§
- STREAM_NAME - Canonical stream name for every CloudEvent the platform emits. Mirrors cellos-supervisor’s producer-side constant.
- STREAM_SUBJECT - Subject filter for every CloudEvent. The MVP supervisor publishes to cellos.events.<entity>.<id>.<phase>; the wildcard captures all of them.
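To illustrate why a single wildcard filter covers every published subject, here is a small sketch of NATS-style subject matching, where `*` matches exactly one token and a trailing `>` matches one or more remaining tokens. The constant value below is assumed from the cellos.events.> pattern mentioned earlier, and subject_matches is a hypothetical helper, not part of this module; matching is done by the broker in practice.

```rust
/// Assumed value of the subject filter, inferred from the module text.
const SUBJECT_FILTER_SKETCH: &str = "cellos.events.>";

/// Hypothetical helper: does `subject` match the NATS-style `filter`?
/// Assumes `filter` is well-formed (any `>` is its final token).
fn subject_matches(filter: &str, subject: &str) -> bool {
    let mut f = filter.split('.');
    let mut s = subject.split('.');
    loop {
        match (f.next(), s.next()) {
            // `>` swallows the rest, but needs at least one non-empty token.
            (Some(">"), Some(tok)) => return !tok.is_empty(),
            // `*` matches exactly one non-empty token.
            (Some("*"), Some(tok)) => {
                if tok.is_empty() {
                    return false;
                }
            }
            // Literal tokens must be equal.
            (Some(ft), Some(st)) => {
                if ft != st {
                    return false;
                }
            }
            // Both exhausted at the same time: full match.
            (None, None) => return true,
            // One side ran out before the other.
            _ => return false,
        }
    }
}
```

With this sketch, a subject such as cellos.events.formation.f-1.created matches the filter, while the bare prefix cellos.events does not, because `>` requires at least one further token.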
Functions§
- create_ephemeral_consumer - Create an ephemeral pull consumer against the canonical stream.
- deliver_policy_for - Decide which DeliverPolicy to apply for a given resume cursor.
- deliver_policy_live_tail - Live-tail policy for /ws/events opens without a ?since= param.
- ensure_stream - Construct a JetStream context and ensure the canonical CELLOS_EVENTS stream exists with the FC-74 retention floor.
- ephemeral_consumer_config_for - Construct the pull-consumer config that backs the WebSocket bridge and boot-time replay. Pulled out as a pure function so the policy decision can be unit-tested without a live NATS broker.
- looks_like_retention_exhausted - Heuristic: did create_ephemeral_consumer fail because the caller’s start_seq is older than the stream’s retained floor?
- open_ws_message_stream - Open an ephemeral pull-consumer message stream for the WebSocket bridge. Distinct from replay_projection’s consumer because the WS path needs a long-lived messages() stream (waits for new publishes) rather than a fetch-and-terminate loop.
- replay_projection - Drive an ephemeral DeliverPolicy::All consumer to completion, applying each event into AppState and advancing the snapshot cursor. Returns once a batch fetch comes back empty within REPLAY_BATCH_TIMEOUT, which indicates the stream’s tail has been reached.
- stream_first_seq - Inspect the live stream’s retained-sequence floor so the WS bridge can report it back to a client whose since was below the retention window. Returns None if the stream cannot be reached (in which case the caller should fall through to the generic close path).
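The fetch-and-terminate loop behind replay_projection can be sketched against an in-memory stand-in. Everything here is hypothetical: FakeConsumer replaces the real pull consumer, an empty batch stands in for a fetch that returns nothing within REPLAY_BATCH_TIMEOUT, and AppStateSketch models only a cursor and a list of applied payloads.

```rust
use std::collections::VecDeque;

/// Hypothetical event: a stream sequence number plus an opaque payload.
struct Event {
    seq: u64,
    payload: String,
}

/// Hypothetical stand-in for the in-memory projection.
#[derive(Debug, Default)]
struct AppStateSketch {
    /// Sequence of the last event applied; 0 means nothing applied yet.
    cursor: u64,
    applied: Vec<String>,
}

/// Stand-in for a pull consumer. An empty batch models a fetch that
/// timed out because the stream's tail was reached.
struct FakeConsumer {
    pending: VecDeque<Event>,
}

impl FakeConsumer {
    /// Hand out up to `max` pending events, oldest first.
    fn fetch(&mut self, max: usize) -> Vec<Event> {
        let n = max.min(self.pending.len());
        self.pending.drain(..n).collect()
    }
}

/// Drain the consumer in batches, applying each event and advancing the
/// cursor, and stop at the first empty batch. Returns the final cursor.
fn replay_sketch(consumer: &mut FakeConsumer, state: &mut AppStateSketch, batch: usize) -> u64 {
    loop {
        let events = consumer.fetch(batch);
        if events.is_empty() {
            return state.cursor;
        }
        for ev in events {
            state.applied.push(ev.payload);
            state.cursor = ev.seq;
        }
    }
}
```

In this sketch, replaying a consumer holding sequences 1 through 5 with a batch size of 2 returns a cursor of 5, and replaying an empty consumer returns 0. The WebSocket path differs precisely in that it would not stop at the first empty batch.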