cellos_server/jetstream.rs
1//! JetStream wiring for `cellos-server`.
2//!
3//! Closes the two open SEAMs from ADR-0011 (projection replay on boot)
4//! and ADR-0015 (`?since=<seq>` historical replay via JetStream).
5//!
6//! ## Why the indirection
7//!
8//! `cellos-server` is a *projection* over the `cellos.events.>` event
9//! log (CHATROOM Session 16). The in-memory `AppState` registry MUST be
10//! rebuildable from JetStream alone — that is the property that makes
11//! the server safely restartable. This module owns:
12//!
13//! 1. **Stream provisioning.** [`ensure_stream`] gets-or-creates the
14//! canonical `CELLOS_EVENTS` stream with the FC-74 retention floor
15//! (90 days, file-backed). Idempotent; a pre-existing stream is left
16//! untouched (we do not reconcile config drift here — that is a
17//! cluster-admin concern, not an HTTP-control-plane one).
18//!
//! 2. **Ephemeral consumer construction.** [`create_ephemeral_consumer`]
//!    returns a pull consumer for a caller-supplied `DeliverPolicy`.
//!    [`deliver_policy_for`] derives that policy from a resume cursor
//!    `start_seq`: `None` → `DeliverPolicy::All` (replay everything from
//!    the earliest retained message), `Some(N)` →
//!    `DeliverPolicy::ByStartSequence { N+1 }` (resume from the message
//!    after the client's cursor, exactly matching the ADR-0015 §D3
//!    contract).
26//!
27//! 3. **Boot-time replay.** [`replay_projection`] drains every message
28//! currently in the stream into `AppState`, advancing the snapshot
29//! cursor as it goes, then returns. Called before the HTTP listener
30//! binds so the first `GET /v1/formations` after a restart returns
31//! the fully-rebuilt view with a non-zero cursor.
32//!
33//! ## What we deliberately do *not* do
34//!
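//! - We never create *durable* consumers. The server's view is
//!   ephemeral by design: every restart replays the stream, and every
//!   consumer we create is reclaimed by the broker once it has been idle
//!   for `EPHEMERAL_INACTIVE_THRESHOLD`, so no consumer state
//!   accumulates in JetStream's own consumer table.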
38//! - We do not acknowledge messages. `AckPolicy::None` keeps the
39//! consumer cheap and side-effect-free; this is a *read-only*
40//! projection feed, not a work queue.
41//! - We do not reconcile stream-config drift. If `CELLOS_EVENTS`
42//! already exists with a different `max_age`, we use it as-is. The
43//! alternative — silently mutating an operator's stream config —
44//! would violate doctrine on least surprise.
45
46use std::time::Duration;
47
48use anyhow::Context as _;
49use async_nats::jetstream::{
50 self,
51 consumer::{
52 pull::{Config as PullConfig, Stream as PullStream},
53 AckPolicy, Consumer, DeliverPolicy,
54 },
55 context::Context,
56 stream::{Config as StreamConfig, RetentionPolicy, StorageType, Stream},
57};
58use futures_util::StreamExt;
59use tracing::{debug, info, warn};
60
61use crate::state::{AppState, ApplyOutcome};
62
/// Name of the JetStream stream that holds every CloudEvent the platform
/// emits. Kept identical to the producer-side constant in
/// `cellos-supervisor`.
pub const STREAM_NAME: &str = "CELLOS_EVENTS";

/// Subject binding for [`STREAM_NAME`]. The MVP supervisor publishes on
/// `cellos.events.<entity>.<id>.<phase>`; the trailing `>` wildcard
/// matches every one of those subjects.
pub const STREAM_SUBJECT: &str = "cellos.events.>";

/// FC-74 retention floor: the last 90 days of events stay available for
/// forensic replay, and anything older ages out of the stream.
// 86_400 seconds per day.
const RETENTION_MAX_AGE: Duration = Duration::from_secs(90 * 86_400);

/// Maximum number of messages requested per pull during boot replay.
///
/// Sized so a cluster holding thousands of events is not paying one
/// round-trip per event, while a single batch on a small cluster still
/// stays cheap enough not to dominate startup latency.
const REPLAY_BATCH: usize = 256;

/// Expiry attached to each boot-replay fetch.
///
/// Kept short because replay only needs to notice "nothing more right
/// now". A fetch that produces no messages within this window ends the
/// drain loop.
const REPLAY_BATCH_TIMEOUT: Duration = Duration::from_millis(250);
88
89/// Construct a JetStream context and ensure the canonical
90/// `CELLOS_EVENTS` stream exists with the FC-74 retention floor.
91///
92/// Idempotent: if the stream already exists with any config we leave
93/// it alone (see module docs for the rationale).
94pub async fn ensure_stream(client: &async_nats::Client) -> anyhow::Result<Context> {
95 let ctx = jetstream::new(client.clone());
96 let config = StreamConfig {
97 name: STREAM_NAME.to_string(),
98 subjects: vec![STREAM_SUBJECT.to_string()],
99 retention: RetentionPolicy::Limits,
100 storage: StorageType::File,
101 max_age: RETENTION_MAX_AGE,
102 ..Default::default()
103 };
104
105 // `get_or_create_stream` is server-idempotent: if a stream with
106 // this name already exists, the server returns it as-is and we
107 // never overwrite operator-tuned settings.
108 ctx.get_or_create_stream(config)
109 .await
110 .context("get_or_create CELLOS_EVENTS stream")?;
111
112 Ok(ctx)
113}
114
115/// Decide which `DeliverPolicy` to apply for a given resume cursor.
116///
117/// Pulled out as a pure function so the policy decision can be
118/// unit-tested without spinning up a NATS broker. The mapping is the
119/// ADR-0015 §D3 contract: `None` means "deliver everything currently
120/// retained" (used by boot replay), `Some(N)` means "deliver from N+1
121/// onward" (the client has already seen up through N).
122pub fn deliver_policy_for(start_seq: Option<u64>) -> DeliverPolicy {
123 match start_seq {
124 // Boot replay path: rebuild state from the oldest retained
125 // message. With FC-74 retention this is the last 90 days.
126 None => DeliverPolicy::All,
127 // Resume path: the client's cursor IS the last seq it applied,
128 // so we start delivery at seq+1 to avoid re-emitting frames it
129 // already processed.
130 Some(seq) => DeliverPolicy::ByStartSequence {
131 start_sequence: seq.saturating_add(1),
132 },
133 }
134}
135
136/// Live-tail policy for `/ws/events` opens without a `?since=` param.
137///
138/// Distinct from the resume path because "no cursor at all" should
139/// NOT replay history; ad-hoc subscribers (cellctl `--follow`, debug
140/// tools) want to see only what flows in after their connect.
141pub fn deliver_policy_live_tail() -> DeliverPolicy {
142 DeliverPolicy::New
143}
144
/// Idle period after which the broker itself removes one of our
/// ephemeral consumers.
///
/// A WebSocket whose TCP connection dies without a clean close (laptop
/// suspend, NAT eviction, OOM kill) never gets to tear down its
/// consumer. `AckPolicy::None` keeps that consumer's pending-ack count at
/// zero, but its record would still occupy broker resources forever
/// without this threshold.
///
/// Five minutes comfortably exceeds the web view's 30-second reconnect
/// ceiling (ADR-0015 §D4) plus a few retries. It is still short enough
/// that abandoned consumers do not pile up into the thousands over a day.
const EPHEMERAL_INACTIVE_THRESHOLD: Duration = Duration::from_secs(300);
157
158/// Construct the pull-consumer config that backs the WebSocket bridge
159/// and boot-time replay. Pulled out as a pure function so the policy
160/// decision can be unit-tested without a live NATS broker.
161pub fn ephemeral_consumer_config_for(policy: DeliverPolicy, subject: Option<&str>) -> PullConfig {
162 PullConfig {
163 durable_name: None,
164 name: None,
165 deliver_policy: policy,
166 ack_policy: AckPolicy::None,
167 filter_subject: subject.unwrap_or("").to_string(),
168 inactive_threshold: EPHEMERAL_INACTIVE_THRESHOLD,
169 ..Default::default()
170 }
171}
172
173/// Create an ephemeral pull consumer against the canonical stream.
174///
175/// `start_seq` follows the same convention as
176/// [`deliver_policy_for`]: `None` → `DeliverPolicy::All`, `Some(N)` →
177/// `DeliverPolicy::ByStartSequence { N+1 }`. `subject` optionally
178/// filters to a sub-tree of `cellos.events.>` so a tenant-scoped
179/// WebSocket sees only its tenant's events.
180///
181/// Always ephemeral (`durable_name = None`), `AckPolicy::None`, and
182/// gated by `EPHEMERAL_INACTIVE_THRESHOLD` for broker-side GC — see
183/// module docs and that constant.
184pub async fn create_ephemeral_consumer(
185 stream: &Stream,
186 policy: DeliverPolicy,
187 subject: Option<&str>,
188) -> anyhow::Result<Consumer<PullConfig>> {
189 let config = ephemeral_consumer_config_for(policy, subject);
190 stream
191 .create_consumer(config)
192 .await
193 .context("create ephemeral pull consumer")
194}
195
196/// Drive an ephemeral `DeliverPolicy::All` consumer to completion,
197/// applying each event into `AppState` and advancing the snapshot
198/// cursor. Returns once a batch fetch comes back empty within
199/// [`REPLAY_BATCH_TIMEOUT`], which indicates the stream's tail has
200/// been reached.
201///
202/// This is the ADR-0011 SEAM closure: after this returns the
203/// in-memory registry reflects the current authoritative view of the
204/// event log, so the HTTP listener can open and serve `GET
205/// /v1/formations` with a non-zero `cursor` immediately.
206pub async fn replay_projection(state: &AppState, ctx: &Context) -> anyhow::Result<()> {
207 // Operator escape hatch — tests that drive `cellos-server` in
208 // isolation (no live NATS, no JetStream) set this to skip replay
209 // entirely. Production deployments leave it unset.
210 if std::env::var("CELLOS_SERVER_SKIP_REPLAY")
211 .map(|v| v == "1" || v.eq_ignore_ascii_case("true"))
212 .unwrap_or(false)
213 {
214 info!("CELLOS_SERVER_SKIP_REPLAY set; skipping projection replay");
215 return Ok(());
216 }
217
218 let stream = ctx
219 .get_stream(STREAM_NAME)
220 .await
221 .context("get_stream CELLOS_EVENTS for replay")?;
222
223 let consumer = create_ephemeral_consumer(&stream, deliver_policy_for(None), None).await?;
224
225 let mut total_seen: u64 = 0;
226 let mut total_applied: u64 = 0;
227 let mut highest_seq: u64 = 0;
228
229 // We use `fetch().messages()` rather than `consumer.messages()`
230 // because `fetch` terminates on a single empty batch — exactly
231 // the "drain until idle" semantics replay needs. A long-lived
232 // `messages()` stream would idle forever waiting for new
233 // publishes, which would deadlock startup on a quiet cluster.
234 loop {
235 let mut batch = consumer
236 .fetch()
237 .max_messages(REPLAY_BATCH)
238 .expires(REPLAY_BATCH_TIMEOUT)
239 .messages()
240 .await
241 .context("replay: pull batch")?;
242
243 let mut batch_size: usize = 0;
244 while let Some(msg) = batch.next().await {
245 // async-nats yields Result<Message, Box<dyn StdError + Send + Sync>>;
246 // Box<dyn StdError> doesn't impl anyhow::Context. Convert via map_err.
247 let msg = msg.map_err(|e| anyhow::anyhow!("replay: read message from batch: {e}"))?;
248 batch_size += 1;
249 total_seen += 1;
250
251 let seq = match msg.info() {
252 Ok(info) => info.stream_sequence,
253 Err(e) => {
254 warn!(error = %e, "replay: message missing JetStream info; skipping");
255 continue;
256 }
257 };
258 highest_seq = highest_seq.max(seq);
259
260 match state.apply_event_payload(&msg.payload).await {
261 Ok(ApplyOutcome::Applied) => total_applied += 1,
262 Ok(ApplyOutcome::Ignored) => {}
263 Err(e) => {
264 warn!(seq, error = %e, "replay: failed to apply event; skipping");
265 }
266 }
267 state.bump_cursor(seq);
268 }
269
270 if batch_size == 0 {
271 // No messages within the batch's expires window → we are
272 // at the tail. Done.
273 break;
274 }
275 debug!(batch_size, total_seen, "replay batch consumed");
276 }
277
278 let formations = state.formations.read().await.len();
279 info!(
280 cursor = highest_seq,
281 formations,
282 events_seen = total_seen,
283 events_applied = total_applied,
284 "replay_complete"
285 );
286 Ok(())
287}
288
289/// Drive a long-lived `DeliverPolicy::New` consumer that pumps every
290/// post-boot CloudEvent into `AppState` via `apply_event_payload`.
291///
292/// This closes the PROJECTION-LIVE-LAG gap (E2E-V2 acceptance report,
293/// 0.5.1 → 0.5.2 wave): [`replay_projection`] rebuilds the projection
294/// once at boot, but until this loop runs nothing fed new events into
295/// the projection while the server was alive — supervisor-emitted
296/// `cell.lifecycle.v1.*` events only appeared in `GET /v1/cells` after
297/// the next server restart re-replayed the stream.
298///
299/// The loop is intentionally minimal:
300///
301/// - `DeliverPolicy::New` (not `All`): replay has already drained the
302/// stream's history; we only want what publishes AFTER the listener
303/// binds. The two paths together cover the whole timeline without
304/// double-counting the boundary message.
305/// - No subject filter: same `cellos.events.>` surface the replay
306/// consumer used. `apply_event_payload` is the single discriminator.
307/// - `AckPolicy::None` + ephemeral: identical to the replay consumer's
308/// semantics, so a process restart re-derives the consumer cheaply
309/// and never accumulates broker-side state.
310/// - `messages()` (not `fetch()`): we want to block on the next publish
311/// indefinitely; this is the long-lived tail, not a drain.
312///
313/// The task is detached — `main.rs` keeps the `JoinHandle` only so a
314/// future graceful-shutdown path can abort it cleanly. On error the
315/// loop logs and exits; the projection will go stale until the server
316/// restarts, but that is strictly no worse than the pre-fix behavior
317/// (which never had a live projector at all).
318pub async fn spawn_live_projector(
319 state: AppState,
320 ctx: Context,
321) -> anyhow::Result<tokio::task::JoinHandle<()>> {
322 let stream = ctx
323 .get_stream(STREAM_NAME)
324 .await
325 .context("get_stream CELLOS_EVENTS for live projector")?;
326
327 // `DeliverPolicy::New` so we don't re-apply the replayed history.
328 // The post-boot stream tail is the projector's responsibility from
329 // here on.
330 let consumer = create_ephemeral_consumer(&stream, DeliverPolicy::New, None).await?;
331 let mut messages = consumer
332 .messages()
333 .await
334 .context("open live projector message stream")?;
335
336 let handle = tokio::spawn(async move {
337 info!("live projector subscribed; tailing cellos.events.>");
338 while let Some(msg) = messages.next().await {
339 let msg = match msg {
340 Ok(m) => m,
341 Err(e) => {
342 warn!(error = %e, "live projector: read error; exiting tail");
343 return;
344 }
345 };
346
347 let seq = match msg.info() {
348 Ok(info) => info.stream_sequence,
349 Err(e) => {
350 warn!(error = %e, "live projector: message missing JetStream info; skipping");
351 continue;
352 }
353 };
354
355 match state.apply_event_payload(&msg.payload).await {
356 Ok(ApplyOutcome::Applied) => {
357 debug!(seq, "live projector: event applied");
358 }
359 Ok(ApplyOutcome::Ignored) => {}
360 Err(e) => {
361 warn!(seq, error = %e, "live projector: failed to apply event; skipping");
362 }
363 }
364 // Mirror replay's contract: advance the cursor whether or
365 // not the projection mutated. WS clients reading the
366 // snapshot cursor see live events without a restart.
367 state.bump_cursor(seq);
368 }
369 debug!("live projector: stream ended");
370 });
371
372 Ok(handle)
373}
374
375/// Open an ephemeral pull-consumer message stream for the WebSocket
376/// bridge. Distinct from `replay_projection`'s consumer because the
377/// WS path needs a long-lived `messages()` stream (waits for new
378/// publishes) rather than a fetch-and-terminate loop.
379pub async fn open_ws_message_stream(
380 ctx: &Context,
381 subject: Option<&str>,
382 since: Option<u64>,
383) -> anyhow::Result<PullStream> {
384 let stream = ctx
385 .get_stream(STREAM_NAME)
386 .await
387 .context("get_stream CELLOS_EVENTS for ws")?;
388
389 let policy = match since {
390 Some(seq) => deliver_policy_for(Some(seq)),
391 None => deliver_policy_live_tail(),
392 };
393
394 let consumer = create_ephemeral_consumer(&stream, policy, subject).await?;
395 consumer
396 .messages()
397 .await
398 .context("open consumer messages stream")
399}
400
401/// Inspect the live stream's retained-sequence floor so the WS bridge
402/// can report it back to a client whose `since` was below the
403/// retention window. Returns `None` if the stream cannot be reached
404/// (in which case the caller should fall through to the generic
405/// close path).
406pub async fn stream_first_seq(ctx: &Context) -> Option<u64> {
407 match ctx.get_stream(STREAM_NAME).await {
408 Ok(mut stream) => match stream.info().await {
409 Ok(info) => Some(info.state.first_sequence),
410 Err(e) => {
411 debug!(error = %e, "stream_first_seq: info() failed");
412 None
413 }
414 },
415 Err(e) => {
416 debug!(error = %e, "stream_first_seq: get_stream failed");
417 None
418 }
419 }
420}
421
422/// Heuristic: did `create_ephemeral_consumer` fail because the
423/// caller's `start_seq` is older than the stream's retained floor?
424///
425/// JetStream surfaces this as a server-side `ErrConsumerCreate` with
426/// a description like "optional start sequence ... is too low".
427/// Rather than depend on a specific error-code shape we string-match
428/// the conventional message; if NATS changes the wording the worst
429/// case is we fall through to the generic close path, which is the
430/// same behavior as before this commit.
431pub fn looks_like_retention_exhausted(err: &anyhow::Error) -> bool {
432 let chain = format!("{err:#}").to_ascii_lowercase();
433 chain.contains("optional start sequence")
434 || chain.contains("too low")
435 || chain.contains("no such sequence")
436 || chain.contains("not found in stream")
437}
438
#[cfg(test)]
mod tests {
    use super::*;

    /// ADR-0015 §D3: boot replay (no cursor) must deliver every retained
    /// message. This pins the contract; changing the variant in either
    /// direction breaks resume or breaks boot.
    #[test]
    fn deliver_policy_none_is_all() {
        assert!(matches!(deliver_policy_for(None), DeliverPolicy::All));
    }

    /// ADR-0015 §D3: `?since=N` resumes at seq N+1, not N. The client's
    /// cursor is the *last applied* sequence, so re-emitting N would be a
    /// duplicate delivery.
    #[test]
    fn deliver_policy_some_starts_at_seq_plus_one() {
        match deliver_policy_for(Some(42)) {
            DeliverPolicy::ByStartSequence { start_sequence } => {
                assert_eq!(start_sequence, 43);
            }
            other => panic!("expected ByStartSequence(43), got {other:?}"),
        }
    }

    /// `?since=0` is a legitimate fresh client that wants everything
    /// from seq 1 onward. The +1 still applies (`0 + 1 = 1`), which is
    /// exactly the first sequence of a fresh stream.
    #[test]
    fn deliver_policy_since_zero_starts_at_one() {
        match deliver_policy_for(Some(0)) {
            DeliverPolicy::ByStartSequence { start_sequence } => {
                assert_eq!(start_sequence, 1);
            }
            other => panic!("expected ByStartSequence(1), got {other:?}"),
        }
    }

    /// `u64::MAX` saturates rather than overflows, so a misbehaving
    /// client that hands us a sentinel value cannot panic the server.
    #[test]
    fn deliver_policy_since_saturates_at_u64_max() {
        match deliver_policy_for(Some(u64::MAX)) {
            DeliverPolicy::ByStartSequence { start_sequence } => {
                assert_eq!(start_sequence, u64::MAX);
            }
            other => panic!("expected ByStartSequence(u64::MAX), got {other:?}"),
        }
    }

    /// Live-tail must NOT replay history; an ad-hoc `/ws/events`
    /// subscriber sees only what is published after it connects.
    #[test]
    fn live_tail_is_deliver_new() {
        assert!(matches!(deliver_policy_live_tail(), DeliverPolicy::New));
    }

    /// Retention-exhaustion detection is best-effort string matching.
    /// These are the message shapes we currently know about. The strings
    /// are hard-coded, so this test cannot notice a NATS wording change
    /// by itself. When a new wording is observed in the wild, add it
    /// here so the heuristic is extended deliberately.
    #[test]
    fn retention_exhausted_matches_known_messages() {
        let cases = [
            "consumer create failed: optional start sequence 5 is too low",
            "no such sequence in stream",
            "sequence 7 not found in stream CELLOS_EVENTS",
            "ErrConsumerCreate: optional start sequence too low",
        ];
        for case in cases {
            let err = anyhow::anyhow!(case.to_string());
            assert!(
                looks_like_retention_exhausted(&err),
                "expected match for {case:?}",
            );
        }
    }

    /// `create_ephemeral_consumer` wraps broker errors in an outer
    /// `"create ephemeral pull consumer"` context. The heuristic must
    /// look through that wrapper at the underlying server message, which
    /// is what the alternate (`{:#}`) chain rendering provides.
    #[test]
    fn retention_exhausted_sees_through_context_chain() {
        let err = anyhow::anyhow!("optional start sequence 5 is too low")
            .context("create ephemeral pull consumer");
        assert!(
            looks_like_retention_exhausted(&err),
            "context wrapper must not hide the server message",
        );
    }

    /// Red-team finding: ephemeral consumer config MUST set
    /// `inactive_threshold` so the broker reclaims consumers whose WS
    /// clients died without a clean close. Without this, half-open TCP
    /// connections (laptop suspend, NAT eviction) accumulate consumer
    /// records on the broker indefinitely.
    #[test]
    fn ephemeral_consumer_config_sets_inactive_threshold() {
        let cfg = ephemeral_consumer_config_for(DeliverPolicy::New, None);
        assert!(
            cfg.inactive_threshold > Duration::ZERO,
            "inactive_threshold must be non-zero so broker GCs dead consumers; got {:?}",
            cfg.inactive_threshold,
        );
        assert!(
            cfg.inactive_threshold >= Duration::from_secs(60),
            "inactive_threshold must ride out transient reconnect; got {:?}",
            cfg.inactive_threshold,
        );
        assert!(cfg.durable_name.is_none(), "consumer must be ephemeral");
        assert!(cfg.name.is_none(), "consumer name must be server-assigned");
        assert!(
            matches!(cfg.ack_policy, AckPolicy::None),
            "consumer must remain AckPolicy::None for read-only projection feed"
        );
    }

    /// A tenant-scoped subject must reach the consumer config verbatim,
    /// and "no subject" must become an empty (unfiltered) filter. The
    /// requested delivery policy is carried through unchanged.
    #[test]
    fn ephemeral_consumer_config_passes_subject_filter_through() {
        let scoped =
            ephemeral_consumer_config_for(DeliverPolicy::All, Some("cellos.events.tenant-a.>"));
        assert_eq!(scoped.filter_subject, "cellos.events.tenant-a.>");
        assert!(matches!(scoped.deliver_policy, DeliverPolicy::All));

        let unscoped = ephemeral_consumer_config_for(DeliverPolicy::All, None);
        assert!(
            unscoped.filter_subject.is_empty(),
            "no subject must map to an empty filter; got {:?}",
            unscoped.filter_subject,
        );
    }

    /// Unrelated errors must NOT be classified as retention loss.
    /// Otherwise we would send 4410 close codes for transient broker
    /// hiccups and stampede clients into snapshot refetches.
    #[test]
    fn retention_exhausted_ignores_unrelated_errors() {
        let cases = [
            "broker connection refused",
            "timed out",
            "stream not found", // ambiguous; we match on "not found in stream" only
        ];
        for case in cases {
            let err = anyhow::anyhow!(case.to_string());
            assert!(
                !looks_like_retention_exhausted(&err),
                "unexpected match for {case:?}",
            );
        }
    }
}