Skip to main content

cellos_server/
jetstream.rs

1//! JetStream wiring for `cellos-server`.
2//!
3//! Closes the two open SEAMs from ADR-0011 (projection replay on boot)
4//! and ADR-0015 (`?since=<seq>` historical replay via JetStream).
5//!
6//! ## Why the indirection
7//!
8//! `cellos-server` is a *projection* over the `cellos.events.>` event
9//! log (CHATROOM Session 16). The in-memory `AppState` registry MUST be
10//! rebuildable from JetStream alone — that is the property that makes
11//! the server safely restartable. This module owns:
12//!
13//! 1. **Stream provisioning.** [`ensure_stream`] gets-or-creates the
14//!    canonical `CELLOS_EVENTS` stream with the FC-74 retention floor
15//!    (90 days, file-backed). Idempotent; a pre-existing stream is left
16//!    untouched (we do not reconcile config drift here — that is a
17//!    cluster-admin concern, not an HTTP-control-plane one).
18//!
//! 2. **Ephemeral consumer construction.** [`create_ephemeral_consumer`]
//!    returns a pull consumer that delivers according to a
//!    caller-supplied `DeliverPolicy`. [`deliver_policy_for`] derives
//!    that policy from a resume cursor: `None` → `DeliverPolicy::All`
//!    (replay everything from the earliest retained message), `Some(N)`
//!    → `DeliverPolicy::ByStartSequence { N+1 }` (resume from the
//!    message after the client's cursor, exactly matching the ADR-0015
//!    §D3 contract).
26//!
27//! 3. **Boot-time replay.** [`replay_projection`] drains every message
28//!    currently in the stream into `AppState`, advancing the snapshot
29//!    cursor as it goes, then returns. Called before the HTTP listener
30//!    binds so the first `GET /v1/formations` after a restart returns
31//!    the fully-rebuilt view with a non-zero cursor.
32//!
33//! ## What we deliberately do *not* do
34//!
35//! - We never create *durable* consumers. The server's view is
36//!   ephemeral by design: every restart replays the stream, so there
37//!   is no consumer state to leak into JetStream's own consumer table.
38//! - We do not acknowledge messages. `AckPolicy::None` keeps the
39//!   consumer cheap and side-effect-free; this is a *read-only*
40//!   projection feed, not a work queue.
41//! - We do not reconcile stream-config drift. If `CELLOS_EVENTS`
42//!   already exists with a different `max_age`, we use it as-is. The
43//!   alternative — silently mutating an operator's stream config —
44//!   would violate doctrine on least surprise.
45
46use std::time::Duration;
47
48use anyhow::Context as _;
49use async_nats::jetstream::{
50    self,
51    consumer::{
52        pull::{Config as PullConfig, Stream as PullStream},
53        AckPolicy, Consumer, DeliverPolicy,
54    },
55    context::Context,
56    stream::{Config as StreamConfig, RetentionPolicy, StorageType, Stream},
57};
58use futures_util::StreamExt;
59use tracing::{debug, info, warn};
60
61use crate::state::{AppState, ApplyOutcome};
62
/// Name of the single JetStream stream that holds every CloudEvent the
/// platform emits. Mirrors `cellos-supervisor`'s producer-side constant.
pub const STREAM_NAME: &str = "CELLOS_EVENTS";

/// Wildcard subject bound to [`STREAM_NAME`]. The MVP supervisor
/// publishes to `cellos.events.<entity>.<id>.<phase>`, all of which fall
/// under this filter.
pub const STREAM_SUBJECT: &str = "cellos.events.>";

/// Number of seconds in one day; used to spell the retention floor in days.
const SECONDS_PER_DAY: u64 = 24 * 60 * 60;

/// FC-74 retention floor: the last 90 days of events stay available for
/// forensic replay; anything older falls off the tail of the stream.
const RETENTION_MAX_AGE: Duration = Duration::from_secs(90 * SECONDS_PER_DAY);

/// Maximum number of messages requested per fetch during boot replay.
/// Large enough that a stream with thousands of events does not cost one
/// round-trip per event, small enough that one batch does not dominate
/// startup latency on a small cluster.
const REPLAY_BATCH: usize = 256;

/// Expiry applied to each replay fetch. Kept short: its only job is to
/// detect "no more messages right now", at which point the drain loop
/// exits.
const REPLAY_BATCH_TIMEOUT: Duration = Duration::from_millis(250);
88
89/// Construct a JetStream context and ensure the canonical
90/// `CELLOS_EVENTS` stream exists with the FC-74 retention floor.
91///
92/// Idempotent: if the stream already exists with any config we leave
93/// it alone (see module docs for the rationale).
94pub async fn ensure_stream(client: &async_nats::Client) -> anyhow::Result<Context> {
95    let ctx = jetstream::new(client.clone());
96    let config = StreamConfig {
97        name: STREAM_NAME.to_string(),
98        subjects: vec![STREAM_SUBJECT.to_string()],
99        retention: RetentionPolicy::Limits,
100        storage: StorageType::File,
101        max_age: RETENTION_MAX_AGE,
102        ..Default::default()
103    };
104
105    // `get_or_create_stream` is server-idempotent: if a stream with
106    // this name already exists, the server returns it as-is and we
107    // never overwrite operator-tuned settings.
108    ctx.get_or_create_stream(config)
109        .await
110        .context("get_or_create CELLOS_EVENTS stream")?;
111
112    Ok(ctx)
113}
114
115/// Decide which `DeliverPolicy` to apply for a given resume cursor.
116///
117/// Pulled out as a pure function so the policy decision can be
118/// unit-tested without spinning up a NATS broker. The mapping is the
119/// ADR-0015 §D3 contract: `None` means "deliver everything currently
120/// retained" (used by boot replay), `Some(N)` means "deliver from N+1
121/// onward" (the client has already seen up through N).
122pub fn deliver_policy_for(start_seq: Option<u64>) -> DeliverPolicy {
123    match start_seq {
124        // Boot replay path: rebuild state from the oldest retained
125        // message. With FC-74 retention this is the last 90 days.
126        None => DeliverPolicy::All,
127        // Resume path: the client's cursor IS the last seq it applied,
128        // so we start delivery at seq+1 to avoid re-emitting frames it
129        // already processed.
130        Some(seq) => DeliverPolicy::ByStartSequence {
131            start_sequence: seq.saturating_add(1),
132        },
133    }
134}
135
136/// Live-tail policy for `/ws/events` opens without a `?since=` param.
137///
138/// Distinct from the resume path because "no cursor at all" should
139/// NOT replay history; ad-hoc subscribers (cellctl `--follow`, debug
140/// tools) want to see only what flows in after their connect.
141pub fn deliver_policy_live_tail() -> DeliverPolicy {
142    DeliverPolicy::New
143}
144
/// Idle period after which the broker may garbage-collect an ephemeral
/// consumer created by this module.
///
/// This guards against WebSocket clients whose TCP connection died
/// ungracefully (laptop suspend, NAT eviction, OOM kill): with
/// `AckPolicy::None` there are no pending acks, but the consumer record
/// itself would otherwise keep occupying broker resources.
///
/// Five minutes sits comfortably above the web view's 30-second
/// reconnect ceiling (ADR-0015 §D4) plus a few retries, while still being
/// short enough that dead consumers do not pile up over a day.
const EPHEMERAL_INACTIVE_THRESHOLD: Duration = Duration::from_secs(300);
157
158/// Construct the pull-consumer config that backs the WebSocket bridge
159/// and boot-time replay. Pulled out as a pure function so the policy
160/// decision can be unit-tested without a live NATS broker.
161pub fn ephemeral_consumer_config_for(policy: DeliverPolicy, subject: Option<&str>) -> PullConfig {
162    PullConfig {
163        durable_name: None,
164        name: None,
165        deliver_policy: policy,
166        ack_policy: AckPolicy::None,
167        filter_subject: subject.unwrap_or("").to_string(),
168        inactive_threshold: EPHEMERAL_INACTIVE_THRESHOLD,
169        ..Default::default()
170    }
171}
172
173/// Create an ephemeral pull consumer against the canonical stream.
174///
175/// `start_seq` follows the same convention as
176/// [`deliver_policy_for`]: `None` → `DeliverPolicy::All`, `Some(N)` →
177/// `DeliverPolicy::ByStartSequence { N+1 }`. `subject` optionally
178/// filters to a sub-tree of `cellos.events.>` so a tenant-scoped
179/// WebSocket sees only its tenant's events.
180///
181/// Always ephemeral (`durable_name = None`), `AckPolicy::None`, and
182/// gated by `EPHEMERAL_INACTIVE_THRESHOLD` for broker-side GC — see
183/// module docs and that constant.
184pub async fn create_ephemeral_consumer(
185    stream: &Stream,
186    policy: DeliverPolicy,
187    subject: Option<&str>,
188) -> anyhow::Result<Consumer<PullConfig>> {
189    let config = ephemeral_consumer_config_for(policy, subject);
190    stream
191        .create_consumer(config)
192        .await
193        .context("create ephemeral pull consumer")
194}
195
196/// Drive an ephemeral `DeliverPolicy::All` consumer to completion,
197/// applying each event into `AppState` and advancing the snapshot
198/// cursor. Returns once a batch fetch comes back empty within
199/// [`REPLAY_BATCH_TIMEOUT`], which indicates the stream's tail has
200/// been reached.
201///
202/// This is the ADR-0011 SEAM closure: after this returns the
203/// in-memory registry reflects the current authoritative view of the
204/// event log, so the HTTP listener can open and serve `GET
205/// /v1/formations` with a non-zero `cursor` immediately.
206pub async fn replay_projection(state: &AppState, ctx: &Context) -> anyhow::Result<()> {
207    // Operator escape hatch — tests that drive `cellos-server` in
208    // isolation (no live NATS, no JetStream) set this to skip replay
209    // entirely. Production deployments leave it unset.
210    if std::env::var("CELLOS_SERVER_SKIP_REPLAY")
211        .map(|v| v == "1" || v.eq_ignore_ascii_case("true"))
212        .unwrap_or(false)
213    {
214        info!("CELLOS_SERVER_SKIP_REPLAY set; skipping projection replay");
215        return Ok(());
216    }
217
218    let stream = ctx
219        .get_stream(STREAM_NAME)
220        .await
221        .context("get_stream CELLOS_EVENTS for replay")?;
222
223    let consumer = create_ephemeral_consumer(&stream, deliver_policy_for(None), None).await?;
224
225    let mut total_seen: u64 = 0;
226    let mut total_applied: u64 = 0;
227    let mut highest_seq: u64 = 0;
228
229    // We use `fetch().messages()` rather than `consumer.messages()`
230    // because `fetch` terminates on a single empty batch — exactly
231    // the "drain until idle" semantics replay needs. A long-lived
232    // `messages()` stream would idle forever waiting for new
233    // publishes, which would deadlock startup on a quiet cluster.
234    loop {
235        let mut batch = consumer
236            .fetch()
237            .max_messages(REPLAY_BATCH)
238            .expires(REPLAY_BATCH_TIMEOUT)
239            .messages()
240            .await
241            .context("replay: pull batch")?;
242
243        let mut batch_size: usize = 0;
244        while let Some(msg) = batch.next().await {
245            // async-nats yields Result<Message, Box<dyn StdError + Send + Sync>>;
246            // Box<dyn StdError> doesn't impl anyhow::Context. Convert via map_err.
247            let msg = msg.map_err(|e| anyhow::anyhow!("replay: read message from batch: {e}"))?;
248            batch_size += 1;
249            total_seen += 1;
250
251            let seq = match msg.info() {
252                Ok(info) => info.stream_sequence,
253                Err(e) => {
254                    warn!(error = %e, "replay: message missing JetStream info; skipping");
255                    continue;
256                }
257            };
258            highest_seq = highest_seq.max(seq);
259
260            match state.apply_event_payload(&msg.payload).await {
261                Ok(ApplyOutcome::Applied) => total_applied += 1,
262                Ok(ApplyOutcome::Ignored) => {}
263                Err(e) => {
264                    warn!(seq, error = %e, "replay: failed to apply event; skipping");
265                }
266            }
267            state.bump_cursor(seq);
268        }
269
270        if batch_size == 0 {
271            // No messages within the batch's expires window → we are
272            // at the tail. Done.
273            break;
274        }
275        debug!(batch_size, total_seen, "replay batch consumed");
276    }
277
278    let formations = state.formations.read().await.len();
279    info!(
280        cursor = highest_seq,
281        formations,
282        events_seen = total_seen,
283        events_applied = total_applied,
284        "replay_complete"
285    );
286    Ok(())
287}
288
289/// Open an ephemeral pull-consumer message stream for the WebSocket
290/// bridge. Distinct from `replay_projection`'s consumer because the
291/// WS path needs a long-lived `messages()` stream (waits for new
292/// publishes) rather than a fetch-and-terminate loop.
293pub async fn open_ws_message_stream(
294    ctx: &Context,
295    subject: Option<&str>,
296    since: Option<u64>,
297) -> anyhow::Result<PullStream> {
298    let stream = ctx
299        .get_stream(STREAM_NAME)
300        .await
301        .context("get_stream CELLOS_EVENTS for ws")?;
302
303    let policy = match since {
304        Some(seq) => deliver_policy_for(Some(seq)),
305        None => deliver_policy_live_tail(),
306    };
307
308    let consumer = create_ephemeral_consumer(&stream, policy, subject).await?;
309    consumer
310        .messages()
311        .await
312        .context("open consumer messages stream")
313}
314
315/// Inspect the live stream's retained-sequence floor so the WS bridge
316/// can report it back to a client whose `since` was below the
317/// retention window. Returns `None` if the stream cannot be reached
318/// (in which case the caller should fall through to the generic
319/// close path).
320pub async fn stream_first_seq(ctx: &Context) -> Option<u64> {
321    match ctx.get_stream(STREAM_NAME).await {
322        Ok(mut stream) => match stream.info().await {
323            Ok(info) => Some(info.state.first_sequence),
324            Err(e) => {
325                debug!(error = %e, "stream_first_seq: info() failed");
326                None
327            }
328        },
329        Err(e) => {
330            debug!(error = %e, "stream_first_seq: get_stream failed");
331            None
332        }
333    }
334}
335
336/// Heuristic: did `create_ephemeral_consumer` fail because the
337/// caller's `start_seq` is older than the stream's retained floor?
338///
339/// JetStream surfaces this as a server-side `ErrConsumerCreate` with
340/// a description like "optional start sequence ... is too low".
341/// Rather than depend on a specific error-code shape we string-match
342/// the conventional message; if NATS changes the wording the worst
343/// case is we fall through to the generic close path, which is the
344/// same behavior as before this commit.
345pub fn looks_like_retention_exhausted(err: &anyhow::Error) -> bool {
346    let chain = format!("{err:#}").to_ascii_lowercase();
347    chain.contains("optional start sequence")
348        || chain.contains("too low")
349        || chain.contains("no such sequence")
350        || chain.contains("not found in stream")
351}
352
#[cfg(test)]
mod tests {
    use super::*;

    /// ADR-0015 §D3: with no cursor (boot replay) every retained message
    /// must be delivered. Pinning the variant here means a change in
    /// either direction shows up as a test failure.
    #[test]
    fn deliver_policy_none_is_all() {
        let policy = deliver_policy_for(None);
        assert!(matches!(policy, DeliverPolicy::All));
    }

    /// ADR-0015 §D3: `?since=N` resumes at N+1. The cursor is the last
    /// sequence the client applied, so delivering N again would be a
    /// duplicate.
    #[test]
    fn deliver_policy_some_starts_at_seq_plus_one() {
        let policy = deliver_policy_for(Some(42));
        if let DeliverPolicy::ByStartSequence { start_sequence } = policy {
            assert_eq!(start_sequence, 43);
        } else {
            panic!("expected ByStartSequence(43), got {policy:?}");
        }
    }

    /// `?since=0` is a legitimate fresh client asking for everything from
    /// sequence 1 onward; the +1 still applies (`0 + 1 = 1`).
    #[test]
    fn deliver_policy_since_zero_starts_at_one() {
        let policy = deliver_policy_for(Some(0));
        if let DeliverPolicy::ByStartSequence { start_sequence } = policy {
            assert_eq!(start_sequence, 1);
        } else {
            panic!("expected ByStartSequence(1), got {policy:?}");
        }
    }

    /// A `u64::MAX` cursor saturates instead of overflowing, so a client
    /// sending a sentinel value cannot panic the server.
    #[test]
    fn deliver_policy_since_saturates_at_u64_max() {
        let policy = deliver_policy_for(Some(u64::MAX));
        if let DeliverPolicy::ByStartSequence { start_sequence } = policy {
            assert_eq!(start_sequence, u64::MAX);
        } else {
            panic!("expected ByStartSequence(u64::MAX), got {policy:?}");
        }
    }

    /// A live tail must not replay history: an ad-hoc `/ws/events`
    /// subscriber sees only what is published after it connects.
    #[test]
    fn live_tail_is_deliver_new() {
        let policy = deliver_policy_live_tail();
        assert!(matches!(policy, DeliverPolicy::New));
    }

    /// Retention-exhaustion detection is best-effort string matching.
    /// These are the message shapes currently known; a new NATS wording
    /// is exactly the regression this test is meant to surface.
    #[test]
    fn retention_exhausted_matches_known_messages() {
        let known_messages = [
            "consumer create failed: optional start sequence 5 is too low",
            "no such sequence in stream",
            "sequence 7 not found in stream CELLOS_EVENTS",
            "ErrConsumerCreate: optional start sequence too low",
        ];
        for case in known_messages {
            let err = anyhow::anyhow!("{case}");
            let matched = looks_like_retention_exhausted(&err);
            assert!(matched, "expected match for {case:?}");
        }
    }

    /// Red-team finding: the ephemeral consumer config must set
    /// `inactive_threshold` so the broker reclaims consumers whose WS
    /// clients vanished without a clean close (laptop suspend, NAT
    /// eviction). Also pins the ephemeral / no-ack properties.
    #[test]
    fn ephemeral_consumer_config_sets_inactive_threshold() {
        let cfg = ephemeral_consumer_config_for(DeliverPolicy::New, None);
        let threshold = cfg.inactive_threshold;

        assert!(
            threshold > Duration::ZERO,
            "inactive_threshold must be non-zero so broker GCs dead consumers; got {:?}",
            threshold,
        );
        assert!(
            threshold >= Duration::from_secs(60),
            "inactive_threshold must ride out transient reconnect; got {:?}",
            threshold,
        );
        assert!(cfg.durable_name.is_none(), "consumer must be ephemeral");
        assert!(
            matches!(cfg.ack_policy, AckPolicy::None),
            "consumer must remain AckPolicy::None for read-only projection feed"
        );
    }

    /// Unrelated errors must not be classified as retention loss;
    /// otherwise transient broker hiccups would produce 4410 close codes
    /// and push clients into snapshot refetches.
    #[test]
    fn retention_exhausted_ignores_unrelated_errors() {
        let unrelated_messages = [
            "broker connection refused",
            "timed out",
            // Ambiguous on purpose: only "not found in stream" is matched.
            "stream not found",
        ];
        for case in unrelated_messages {
            let err = anyhow::anyhow!("{case}");
            let matched = looks_like_retention_exhausted(&err);
            assert!(!matched, "unexpected match for {case:?}");
        }
    }
}