cellos_server/jetstream.rs
1//! JetStream wiring for `cellos-server`.
2//!
3//! Closes the two open SEAMs from ADR-0011 (projection replay on boot)
4//! and ADR-0015 (`?since=<seq>` historical replay via JetStream).
5//!
6//! ## Why the indirection
7//!
8//! `cellos-server` is a *projection* over the `cellos.events.>` event
9//! log (CHATROOM Session 16). The in-memory `AppState` registry MUST be
10//! rebuildable from JetStream alone — that is the property that makes
11//! the server safely restartable. This module owns:
12//!
13//! 1. **Stream provisioning.** [`ensure_stream`] gets-or-creates the
14//! canonical `CELLOS_EVENTS` stream with the FC-74 retention floor
15//! (90 days, file-backed). Idempotent; a pre-existing stream is left
16//! untouched (we do not reconcile config drift here — that is a
17//! cluster-admin concern, not an HTTP-control-plane one).
18//!
//! 2. **Ephemeral consumer construction.** [`create_ephemeral_consumer`]
//!    returns a pull consumer for a caller-supplied `DeliverPolicy`.
//!    [`deliver_policy_for`] maps a resume cursor to that policy: `None`
//!    → `DeliverPolicy::All` (replay everything from the earliest retained
//!    message), `Some(N)` → `DeliverPolicy::ByStartSequence { N+1 }`
//!    (resume from the message after the client's cursor, exactly matching
//!    the ADR-0015 §D3 contract). Cursor-less WebSocket opens use
//!    [`deliver_policy_live_tail`] instead.
26//!
27//! 3. **Boot-time replay.** [`replay_projection`] drains every message
28//! currently in the stream into `AppState`, advancing the snapshot
29//! cursor as it goes, then returns. Called before the HTTP listener
30//! binds so the first `GET /v1/formations` after a restart returns
31//! the fully-rebuilt view with a non-zero cursor.
32//!
33//! ## What we deliberately do *not* do
34//!
35//! - We never create *durable* consumers. The server's view is
36//! ephemeral by design: every restart replays the stream, so there
37//! is no consumer state to leak into JetStream's own consumer table.
38//! - We do not acknowledge messages. `AckPolicy::None` keeps the
39//! consumer cheap and side-effect-free; this is a *read-only*
40//! projection feed, not a work queue.
41//! - We do not reconcile stream-config drift. If `CELLOS_EVENTS`
42//! already exists with a different `max_age`, we use it as-is. The
43//! alternative — silently mutating an operator's stream config —
44//! would violate doctrine on least surprise.
45
46use std::time::Duration;
47
48use anyhow::Context as _;
49use async_nats::jetstream::{
50 self,
51 consumer::{
52 pull::{Config as PullConfig, Stream as PullStream},
53 AckPolicy, Consumer, DeliverPolicy,
54 },
55 context::Context,
56 stream::{Config as StreamConfig, RetentionPolicy, StorageType, Stream},
57};
58use futures_util::StreamExt;
59use tracing::{debug, info, warn};
60
61use crate::state::{AppState, ApplyOutcome};
62
/// Name of the stream that carries every platform CloudEvent. Must stay in
/// lockstep with the producer-side constant in `cellos-supervisor`.
pub const STREAM_NAME: &str = "CELLOS_EVENTS";

/// Subject wildcard bound to [`STREAM_NAME`]. Supervisors publish on
/// `cellos.events.<entity>.<id>.<phase>`; the trailing `>` matches every
/// one of those subjects.
pub const STREAM_SUBJECT: &str = "cellos.events.>";

/// FC-74 retention floor: 90 days of events must remain available for
/// forensic replay, and anything older ages out of the stream.
const RETENTION_MAX_AGE: Duration = Duration::from_secs(90 * 86_400);

/// Per-batch message cap for boot replay. Large enough that a cluster
/// holding thousands of events is not drained one round-trip per event,
/// small enough that a single batch does not dominate startup on a small
/// cluster.
const REPLAY_BATCH: usize = 256;

/// How long one replay batch fetch may wait. Kept short because replay only
/// needs to notice "nothing more right now": a batch either returns data
/// promptly or comes back empty when this expires, which ends the drain.
const REPLAY_BATCH_TIMEOUT: Duration = Duration::from_millis(250);
88
89/// Construct a JetStream context and ensure the canonical
90/// `CELLOS_EVENTS` stream exists with the FC-74 retention floor.
91///
92/// Idempotent: if the stream already exists with any config we leave
93/// it alone (see module docs for the rationale).
94pub async fn ensure_stream(client: &async_nats::Client) -> anyhow::Result<Context> {
95 let ctx = jetstream::new(client.clone());
96 let config = StreamConfig {
97 name: STREAM_NAME.to_string(),
98 subjects: vec![STREAM_SUBJECT.to_string()],
99 retention: RetentionPolicy::Limits,
100 storage: StorageType::File,
101 max_age: RETENTION_MAX_AGE,
102 ..Default::default()
103 };
104
105 // `get_or_create_stream` is server-idempotent: if a stream with
106 // this name already exists, the server returns it as-is and we
107 // never overwrite operator-tuned settings.
108 ctx.get_or_create_stream(config)
109 .await
110 .context("get_or_create CELLOS_EVENTS stream")?;
111
112 Ok(ctx)
113}
114
115/// Decide which `DeliverPolicy` to apply for a given resume cursor.
116///
117/// Pulled out as a pure function so the policy decision can be
118/// unit-tested without spinning up a NATS broker. The mapping is the
119/// ADR-0015 §D3 contract: `None` means "deliver everything currently
120/// retained" (used by boot replay), `Some(N)` means "deliver from N+1
121/// onward" (the client has already seen up through N).
122pub fn deliver_policy_for(start_seq: Option<u64>) -> DeliverPolicy {
123 match start_seq {
124 // Boot replay path: rebuild state from the oldest retained
125 // message. With FC-74 retention this is the last 90 days.
126 None => DeliverPolicy::All,
127 // Resume path: the client's cursor IS the last seq it applied,
128 // so we start delivery at seq+1 to avoid re-emitting frames it
129 // already processed.
130 Some(seq) => DeliverPolicy::ByStartSequence {
131 start_sequence: seq.saturating_add(1),
132 },
133 }
134}
135
136/// Live-tail policy for `/ws/events` opens without a `?since=` param.
137///
138/// Distinct from the resume path because "no cursor at all" should
139/// NOT replay history; ad-hoc subscribers (cellctl `--follow`, debug
140/// tools) want to see only what flows in after their connect.
141pub fn deliver_policy_live_tail() -> DeliverPolicy {
142 DeliverPolicy::New
143}
144
/// Idle time after which the broker may garbage-collect a WS-bridge
/// ephemeral consumer whose client has gone away.
///
/// A WebSocket can die without a clean close (laptop suspend, NAT eviction,
/// OOM kill). `AckPolicy::None` keeps the pending-ack count at zero, but the
/// orphaned consumer record would otherwise keep occupying broker resources.
///
/// Five minutes comfortably exceeds the web view's 30-second reconnect
/// ceiling (ADR-0015 §D4) plus a few retries. It is still short enough that
/// dead consumers do not pile up into the thousands over a day.
const EPHEMERAL_INACTIVE_THRESHOLD: Duration = Duration::from_secs(300);
157
158/// Construct the pull-consumer config that backs the WebSocket bridge
159/// and boot-time replay. Pulled out as a pure function so the policy
160/// decision can be unit-tested without a live NATS broker.
161pub fn ephemeral_consumer_config_for(policy: DeliverPolicy, subject: Option<&str>) -> PullConfig {
162 PullConfig {
163 durable_name: None,
164 name: None,
165 deliver_policy: policy,
166 ack_policy: AckPolicy::None,
167 filter_subject: subject.unwrap_or("").to_string(),
168 inactive_threshold: EPHEMERAL_INACTIVE_THRESHOLD,
169 ..Default::default()
170 }
171}
172
173/// Create an ephemeral pull consumer against the canonical stream.
174///
175/// `start_seq` follows the same convention as
176/// [`deliver_policy_for`]: `None` → `DeliverPolicy::All`, `Some(N)` →
177/// `DeliverPolicy::ByStartSequence { N+1 }`. `subject` optionally
178/// filters to a sub-tree of `cellos.events.>` so a tenant-scoped
179/// WebSocket sees only its tenant's events.
180///
181/// Always ephemeral (`durable_name = None`), `AckPolicy::None`, and
182/// gated by `EPHEMERAL_INACTIVE_THRESHOLD` for broker-side GC — see
183/// module docs and that constant.
184pub async fn create_ephemeral_consumer(
185 stream: &Stream,
186 policy: DeliverPolicy,
187 subject: Option<&str>,
188) -> anyhow::Result<Consumer<PullConfig>> {
189 let config = ephemeral_consumer_config_for(policy, subject);
190 stream
191 .create_consumer(config)
192 .await
193 .context("create ephemeral pull consumer")
194}
195
196/// Drive an ephemeral `DeliverPolicy::All` consumer to completion,
197/// applying each event into `AppState` and advancing the snapshot
198/// cursor. Returns once a batch fetch comes back empty within
199/// [`REPLAY_BATCH_TIMEOUT`], which indicates the stream's tail has
200/// been reached.
201///
202/// This is the ADR-0011 SEAM closure: after this returns the
203/// in-memory registry reflects the current authoritative view of the
204/// event log, so the HTTP listener can open and serve `GET
205/// /v1/formations` with a non-zero `cursor` immediately.
206pub async fn replay_projection(state: &AppState, ctx: &Context) -> anyhow::Result<()> {
207 // Operator escape hatch — tests that drive `cellos-server` in
208 // isolation (no live NATS, no JetStream) set this to skip replay
209 // entirely. Production deployments leave it unset.
210 if std::env::var("CELLOS_SERVER_SKIP_REPLAY")
211 .map(|v| v == "1" || v.eq_ignore_ascii_case("true"))
212 .unwrap_or(false)
213 {
214 info!("CELLOS_SERVER_SKIP_REPLAY set; skipping projection replay");
215 return Ok(());
216 }
217
218 let stream = ctx
219 .get_stream(STREAM_NAME)
220 .await
221 .context("get_stream CELLOS_EVENTS for replay")?;
222
223 let consumer = create_ephemeral_consumer(&stream, deliver_policy_for(None), None).await?;
224
225 let mut total_seen: u64 = 0;
226 let mut total_applied: u64 = 0;
227 let mut highest_seq: u64 = 0;
228
229 // We use `fetch().messages()` rather than `consumer.messages()`
230 // because `fetch` terminates on a single empty batch — exactly
231 // the "drain until idle" semantics replay needs. A long-lived
232 // `messages()` stream would idle forever waiting for new
233 // publishes, which would deadlock startup on a quiet cluster.
234 loop {
235 let mut batch = consumer
236 .fetch()
237 .max_messages(REPLAY_BATCH)
238 .expires(REPLAY_BATCH_TIMEOUT)
239 .messages()
240 .await
241 .context("replay: pull batch")?;
242
243 let mut batch_size: usize = 0;
244 while let Some(msg) = batch.next().await {
245 // async-nats yields Result<Message, Box<dyn StdError + Send + Sync>>;
246 // Box<dyn StdError> doesn't impl anyhow::Context. Convert via map_err.
247 let msg = msg.map_err(|e| anyhow::anyhow!("replay: read message from batch: {e}"))?;
248 batch_size += 1;
249 total_seen += 1;
250
251 let seq = match msg.info() {
252 Ok(info) => info.stream_sequence,
253 Err(e) => {
254 warn!(error = %e, "replay: message missing JetStream info; skipping");
255 continue;
256 }
257 };
258 highest_seq = highest_seq.max(seq);
259
260 match state.apply_event_payload(&msg.payload).await {
261 Ok(ApplyOutcome::Applied) => total_applied += 1,
262 Ok(ApplyOutcome::Ignored) => {}
263 Err(e) => {
264 warn!(seq, error = %e, "replay: failed to apply event; skipping");
265 }
266 }
267 state.bump_cursor(seq);
268 }
269
270 if batch_size == 0 {
271 // No messages within the batch's expires window → we are
272 // at the tail. Done.
273 break;
274 }
275 debug!(batch_size, total_seen, "replay batch consumed");
276 }
277
278 let formations = state.formations.read().await.len();
279 info!(
280 cursor = highest_seq,
281 formations,
282 events_seen = total_seen,
283 events_applied = total_applied,
284 "replay_complete"
285 );
286 Ok(())
287}
288
289/// Open an ephemeral pull-consumer message stream for the WebSocket
290/// bridge. Distinct from `replay_projection`'s consumer because the
291/// WS path needs a long-lived `messages()` stream (waits for new
292/// publishes) rather than a fetch-and-terminate loop.
293pub async fn open_ws_message_stream(
294 ctx: &Context,
295 subject: Option<&str>,
296 since: Option<u64>,
297) -> anyhow::Result<PullStream> {
298 let stream = ctx
299 .get_stream(STREAM_NAME)
300 .await
301 .context("get_stream CELLOS_EVENTS for ws")?;
302
303 let policy = match since {
304 Some(seq) => deliver_policy_for(Some(seq)),
305 None => deliver_policy_live_tail(),
306 };
307
308 let consumer = create_ephemeral_consumer(&stream, policy, subject).await?;
309 consumer
310 .messages()
311 .await
312 .context("open consumer messages stream")
313}
314
315/// Inspect the live stream's retained-sequence floor so the WS bridge
316/// can report it back to a client whose `since` was below the
317/// retention window. Returns `None` if the stream cannot be reached
318/// (in which case the caller should fall through to the generic
319/// close path).
320pub async fn stream_first_seq(ctx: &Context) -> Option<u64> {
321 match ctx.get_stream(STREAM_NAME).await {
322 Ok(mut stream) => match stream.info().await {
323 Ok(info) => Some(info.state.first_sequence),
324 Err(e) => {
325 debug!(error = %e, "stream_first_seq: info() failed");
326 None
327 }
328 },
329 Err(e) => {
330 debug!(error = %e, "stream_first_seq: get_stream failed");
331 None
332 }
333 }
334}
335
336/// Heuristic: did `create_ephemeral_consumer` fail because the
337/// caller's `start_seq` is older than the stream's retained floor?
338///
339/// JetStream surfaces this as a server-side `ErrConsumerCreate` with
340/// a description like "optional start sequence ... is too low".
341/// Rather than depend on a specific error-code shape we string-match
342/// the conventional message; if NATS changes the wording the worst
343/// case is we fall through to the generic close path, which is the
344/// same behavior as before this commit.
345pub fn looks_like_retention_exhausted(err: &anyhow::Error) -> bool {
346 let chain = format!("{err:#}").to_ascii_lowercase();
347 chain.contains("optional start sequence")
348 || chain.contains("too low")
349 || chain.contains("no such sequence")
350 || chain.contains("not found in stream")
351}
352
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns the `start_sequence` carried by a `ByStartSequence` policy,
    /// or `None` for any other variant.
    fn start_sequence_of(policy: &DeliverPolicy) -> Option<u64> {
        match policy {
            DeliverPolicy::ByStartSequence { start_sequence } => Some(*start_sequence),
            _ => None,
        }
    }

    /// ADR-0015 §D3 — with no cursor (boot replay) every retained message
    /// is delivered. Pinning the variant means a change in either direction
    /// fails here instead of silently breaking resume or boot.
    #[test]
    fn deliver_policy_none_is_all() {
        let policy = deliver_policy_for(None);
        assert!(matches!(policy, DeliverPolicy::All));
    }

    /// ADR-0015 §D3 — `?since=N` resumes at N+1. The cursor names the last
    /// *applied* sequence, so starting at N would deliver it twice.
    #[test]
    fn deliver_policy_some_starts_at_seq_plus_one() {
        let policy = deliver_policy_for(Some(42));
        match start_sequence_of(&policy) {
            Some(start) => assert_eq!(start, 43),
            None => panic!("expected ByStartSequence(43), got {policy:?}"),
        }
    }

    /// `?since=0` is a legitimate fresh client asking for everything. The
    /// +1 still applies, landing on seq 1 — the first sequence of a fresh
    /// stream.
    #[test]
    fn deliver_policy_since_zero_starts_at_one() {
        let policy = deliver_policy_for(Some(0));
        match start_sequence_of(&policy) {
            Some(start) => assert_eq!(start, 1),
            None => panic!("expected ByStartSequence(1), got {policy:?}"),
        }
    }

    /// A sentinel `u64::MAX` cursor saturates instead of overflowing, so a
    /// misbehaving client cannot panic the server.
    #[test]
    fn deliver_policy_since_saturates_at_u64_max() {
        let policy = deliver_policy_for(Some(u64::MAX));
        match start_sequence_of(&policy) {
            Some(start) => assert_eq!(start, u64::MAX),
            None => panic!("expected ByStartSequence(u64::MAX), got {policy:?}"),
        }
    }

    /// Live-tail must not replay history: an ad-hoc `/ws/events`
    /// subscriber only sees what is published after it connects.
    #[test]
    fn live_tail_is_deliver_new() {
        let policy = deliver_policy_live_tail();
        assert!(matches!(policy, DeliverPolicy::New));
    }

    /// Retention-exhaustion detection is best-effort string matching over
    /// the error wordings known today. A new NATS wording is exactly the
    /// regression this test should catch.
    #[test]
    fn retention_exhausted_matches_known_messages() {
        const KNOWN: [&str; 4] = [
            "consumer create failed: optional start sequence 5 is too low",
            "no such sequence in stream",
            "sequence 7 not found in stream CELLOS_EVENTS",
            "ErrConsumerCreate: optional start sequence too low",
        ];
        for case in KNOWN {
            let err = anyhow::anyhow!(case.to_string());
            assert!(
                looks_like_retention_exhausted(&err),
                "expected match for {case:?}",
            );
        }
    }

    /// Red-team finding: the ephemeral consumer config MUST set
    /// `inactive_threshold`, or consumers behind half-open TCP connections
    /// (laptop suspend, NAT eviction) linger on the broker indefinitely.
    #[test]
    fn ephemeral_consumer_config_sets_inactive_threshold() {
        let cfg = ephemeral_consumer_config_for(DeliverPolicy::New, None);
        let threshold = cfg.inactive_threshold;

        assert!(
            threshold > Duration::ZERO,
            "inactive_threshold must be non-zero so broker GCs dead consumers; got {:?}",
            threshold,
        );
        assert!(
            threshold >= Duration::from_secs(60),
            "inactive_threshold must ride out transient reconnect; got {:?}",
            threshold,
        );
        assert!(cfg.durable_name.is_none(), "consumer must be ephemeral");
        assert!(
            matches!(cfg.ack_policy, AckPolicy::None),
            "consumer must remain AckPolicy::None for read-only projection feed"
        );
    }

    /// Unrelated errors must NOT be classified as retention loss. Otherwise
    /// transient broker hiccups would produce 4410 close codes and stampede
    /// clients into snapshot refetches.
    #[test]
    fn retention_exhausted_ignores_unrelated_errors() {
        const UNRELATED: [&str; 3] = [
            "broker connection refused",
            "timed out",
            "stream not found", // ambiguous; we match on "not found in stream" only
        ];
        for case in UNRELATED {
            let err = anyhow::anyhow!(case.to_string());
            assert!(
                !looks_like_retention_exhausted(&err),
                "unexpected match for {case:?}",
            );
        }
    }
}