Skip to main content

cellos_server/
jetstream.rs

1//! JetStream wiring for `cellos-server`.
2//!
3//! Closes the two open SEAMs from ADR-0011 (projection replay on boot)
4//! and ADR-0015 (`?since=<seq>` historical replay via JetStream).
5//!
6//! ## Why the indirection
7//!
8//! `cellos-server` is a *projection* over the `cellos.events.>` event
9//! log (CHATROOM Session 16). The in-memory `AppState` registry MUST be
10//! rebuildable from JetStream alone — that is the property that makes
11//! the server safely restartable. This module owns:
12//!
13//! 1. **Stream provisioning.** [`ensure_stream`] gets-or-creates the
14//!    canonical `CELLOS_EVENTS` stream with the FC-74 retention floor
15//!    (90 days, file-backed). Idempotent; a pre-existing stream is left
16//!    untouched (we do not reconcile config drift here — that is a
17//!    cluster-admin concern, not an HTTP-control-plane one).
18//!
//! 2. **Ephemeral consumer construction.** [`create_ephemeral_consumer`]
//!    returns a pull consumer that uses the caller-supplied
//!    `DeliverPolicy`. [`deliver_policy_for`] derives that policy from a
//!    resume cursor: `None` → `DeliverPolicy::All` (replay everything
//!    from the earliest retained message), `Some(N)` →
//!    `DeliverPolicy::ByStartSequence { N+1 }` (resume from the message
//!    after the client's cursor, exactly matching the ADR-0015 §D3
//!    contract).
26//!
27//! 3. **Boot-time replay.** [`replay_projection`] drains every message
28//!    currently in the stream into `AppState`, advancing the snapshot
29//!    cursor as it goes, then returns. Called before the HTTP listener
30//!    binds so the first `GET /v1/formations` after a restart returns
31//!    the fully-rebuilt view with a non-zero cursor.
32//!
33//! ## What we deliberately do *not* do
34//!
35//! - We never create *durable* consumers. The server's view is
36//!   ephemeral by design: every restart replays the stream, so there
37//!   is no consumer state to leak into JetStream's own consumer table.
38//! - We do not acknowledge messages. `AckPolicy::None` keeps the
39//!   consumer cheap and side-effect-free; this is a *read-only*
40//!   projection feed, not a work queue.
41//! - We do not reconcile stream-config drift. If `CELLOS_EVENTS`
42//!   already exists with a different `max_age`, we use it as-is. The
43//!   alternative — silently mutating an operator's stream config —
44//!   would violate doctrine on least surprise.
45
46use std::time::Duration;
47
48use anyhow::Context as _;
49use async_nats::jetstream::{
50    self,
51    consumer::{
52        pull::{Config as PullConfig, Stream as PullStream},
53        AckPolicy, Consumer, DeliverPolicy,
54    },
55    context::Context,
56    stream::{Config as StreamConfig, RetentionPolicy, StorageType, Stream},
57};
58use futures_util::StreamExt;
59use tracing::{debug, info, warn};
60
61use crate::state::{AppState, ApplyOutcome};
62
/// Name of the single JetStream stream that carries every CloudEvent
/// the platform emits. Kept in step with the producer-side constant in
/// `cellos-supervisor`.
pub const STREAM_NAME: &str = "CELLOS_EVENTS";

/// Wildcard subject bound to the stream. The MVP supervisor publishes
/// on `cellos.events.<entity>.<id>.<phase>`, all of which fall under
/// this filter.
pub const STREAM_SUBJECT: &str = "cellos.events.>";

/// FC-74 retention floor: 90 days, expressed in seconds. Events older
/// than this age out of the stream; everything younger stays available
/// for forensic replay.
const RETENTION_MAX_AGE: Duration = Duration::from_secs(90 * 86_400);

/// Upper bound on messages requested per fetch during boot replay.
/// Large enough that a stream holding thousands of events is not
/// drained one round-trip at a time, small enough that one batch does
/// not dominate startup latency on a small cluster.
const REPLAY_BATCH: usize = 256;

/// Expiry applied to each replay fetch. Kept short because its only
/// job is to detect "nothing more to read right now": a fetch either
/// yields data promptly or comes back empty, which ends the drain.
const REPLAY_BATCH_TIMEOUT: Duration = Duration::from_millis(250);
88
89/// Construct a JetStream context and ensure the canonical
90/// `CELLOS_EVENTS` stream exists with the FC-74 retention floor.
91///
92/// Idempotent: if the stream already exists with any config we leave
93/// it alone (see module docs for the rationale).
94pub async fn ensure_stream(client: &async_nats::Client) -> anyhow::Result<Context> {
95    let ctx = jetstream::new(client.clone());
96    let config = StreamConfig {
97        name: STREAM_NAME.to_string(),
98        subjects: vec![STREAM_SUBJECT.to_string()],
99        retention: RetentionPolicy::Limits,
100        storage: StorageType::File,
101        max_age: RETENTION_MAX_AGE,
102        ..Default::default()
103    };
104
105    // `get_or_create_stream` is server-idempotent: if a stream with
106    // this name already exists, the server returns it as-is and we
107    // never overwrite operator-tuned settings.
108    ctx.get_or_create_stream(config)
109        .await
110        .context("get_or_create CELLOS_EVENTS stream")?;
111
112    Ok(ctx)
113}
114
115/// Decide which `DeliverPolicy` to apply for a given resume cursor.
116///
117/// Pulled out as a pure function so the policy decision can be
118/// unit-tested without spinning up a NATS broker. The mapping is the
119/// ADR-0015 §D3 contract: `None` means "deliver everything currently
120/// retained" (used by boot replay), `Some(N)` means "deliver from N+1
121/// onward" (the client has already seen up through N).
122pub fn deliver_policy_for(start_seq: Option<u64>) -> DeliverPolicy {
123    match start_seq {
124        // Boot replay path: rebuild state from the oldest retained
125        // message. With FC-74 retention this is the last 90 days.
126        None => DeliverPolicy::All,
127        // Resume path: the client's cursor IS the last seq it applied,
128        // so we start delivery at seq+1 to avoid re-emitting frames it
129        // already processed.
130        Some(seq) => DeliverPolicy::ByStartSequence {
131            start_sequence: seq.saturating_add(1),
132        },
133    }
134}
135
136/// Live-tail policy for `/ws/events` opens without a `?since=` param.
137///
138/// Distinct from the resume path because "no cursor at all" should
139/// NOT replay history; ad-hoc subscribers (cellctl `--follow`, debug
140/// tools) want to see only what flows in after their connect.
141pub fn deliver_policy_live_tail() -> DeliverPolicy {
142    DeliverPolicy::New
143}
144
/// Idle time after which the broker may garbage-collect an ephemeral
/// consumer created by this module.
///
/// Guards against WebSocket clients whose TCP connection died without
/// a clean close (laptop suspend, NAT eviction, OOM kill): without a
/// threshold the consumer record would stay pinned to the vanished
/// client and keep occupying broker resources, even though
/// `AckPolicy::None` keeps its pending-ack count at zero.
///
/// Five minutes sits well above the web view's 30-second reconnect
/// ceiling (ADR-0015 §D4) plus a few retries, yet is short enough that
/// dead consumers do not pile up by the thousands over a day.
const EPHEMERAL_INACTIVE_THRESHOLD: Duration = Duration::from_secs(300);
157
158/// Construct the pull-consumer config that backs the WebSocket bridge
159/// and boot-time replay. Pulled out as a pure function so the policy
160/// decision can be unit-tested without a live NATS broker.
161pub fn ephemeral_consumer_config_for(policy: DeliverPolicy, subject: Option<&str>) -> PullConfig {
162    PullConfig {
163        durable_name: None,
164        name: None,
165        deliver_policy: policy,
166        ack_policy: AckPolicy::None,
167        filter_subject: subject.unwrap_or("").to_string(),
168        inactive_threshold: EPHEMERAL_INACTIVE_THRESHOLD,
169        ..Default::default()
170    }
171}
172
173/// Create an ephemeral pull consumer against the canonical stream.
174///
175/// `start_seq` follows the same convention as
176/// [`deliver_policy_for`]: `None` → `DeliverPolicy::All`, `Some(N)` →
177/// `DeliverPolicy::ByStartSequence { N+1 }`. `subject` optionally
178/// filters to a sub-tree of `cellos.events.>` so a tenant-scoped
179/// WebSocket sees only its tenant's events.
180///
181/// Always ephemeral (`durable_name = None`), `AckPolicy::None`, and
182/// gated by `EPHEMERAL_INACTIVE_THRESHOLD` for broker-side GC — see
183/// module docs and that constant.
184pub async fn create_ephemeral_consumer(
185    stream: &Stream,
186    policy: DeliverPolicy,
187    subject: Option<&str>,
188) -> anyhow::Result<Consumer<PullConfig>> {
189    let config = ephemeral_consumer_config_for(policy, subject);
190    stream
191        .create_consumer(config)
192        .await
193        .context("create ephemeral pull consumer")
194}
195
196/// Drive an ephemeral `DeliverPolicy::All` consumer to completion,
197/// applying each event into `AppState` and advancing the snapshot
198/// cursor. Returns once a batch fetch comes back empty within
199/// [`REPLAY_BATCH_TIMEOUT`], which indicates the stream's tail has
200/// been reached.
201///
202/// This is the ADR-0011 SEAM closure: after this returns the
203/// in-memory registry reflects the current authoritative view of the
204/// event log, so the HTTP listener can open and serve `GET
205/// /v1/formations` with a non-zero `cursor` immediately.
206pub async fn replay_projection(state: &AppState, ctx: &Context) -> anyhow::Result<()> {
207    // Operator escape hatch — tests that drive `cellos-server` in
208    // isolation (no live NATS, no JetStream) set this to skip replay
209    // entirely. Production deployments leave it unset.
210    if std::env::var("CELLOS_SERVER_SKIP_REPLAY")
211        .map(|v| v == "1" || v.eq_ignore_ascii_case("true"))
212        .unwrap_or(false)
213    {
214        info!("CELLOS_SERVER_SKIP_REPLAY set; skipping projection replay");
215        return Ok(());
216    }
217
218    let stream = ctx
219        .get_stream(STREAM_NAME)
220        .await
221        .context("get_stream CELLOS_EVENTS for replay")?;
222
223    let consumer = create_ephemeral_consumer(&stream, deliver_policy_for(None), None).await?;
224
225    let mut total_seen: u64 = 0;
226    let mut total_applied: u64 = 0;
227    let mut highest_seq: u64 = 0;
228
229    // We use `fetch().messages()` rather than `consumer.messages()`
230    // because `fetch` terminates on a single empty batch — exactly
231    // the "drain until idle" semantics replay needs. A long-lived
232    // `messages()` stream would idle forever waiting for new
233    // publishes, which would deadlock startup on a quiet cluster.
234    loop {
235        let mut batch = consumer
236            .fetch()
237            .max_messages(REPLAY_BATCH)
238            .expires(REPLAY_BATCH_TIMEOUT)
239            .messages()
240            .await
241            .context("replay: pull batch")?;
242
243        let mut batch_size: usize = 0;
244        while let Some(msg) = batch.next().await {
245            // async-nats yields Result<Message, Box<dyn StdError + Send + Sync>>;
246            // Box<dyn StdError> doesn't impl anyhow::Context. Convert via map_err.
247            let msg = msg.map_err(|e| anyhow::anyhow!("replay: read message from batch: {e}"))?;
248            batch_size += 1;
249            total_seen += 1;
250
251            let seq = match msg.info() {
252                Ok(info) => info.stream_sequence,
253                Err(e) => {
254                    warn!(error = %e, "replay: message missing JetStream info; skipping");
255                    continue;
256                }
257            };
258            highest_seq = highest_seq.max(seq);
259
260            match state.apply_event_payload(&msg.payload).await {
261                Ok(ApplyOutcome::Applied) => total_applied += 1,
262                Ok(ApplyOutcome::Ignored) => {}
263                Err(e) => {
264                    warn!(seq, error = %e, "replay: failed to apply event; skipping");
265                }
266            }
267            state.bump_cursor(seq);
268        }
269
270        if batch_size == 0 {
271            // No messages within the batch's expires window → we are
272            // at the tail. Done.
273            break;
274        }
275        debug!(batch_size, total_seen, "replay batch consumed");
276    }
277
278    let formations = state.formations.read().await.len();
279    info!(
280        cursor = highest_seq,
281        formations,
282        events_seen = total_seen,
283        events_applied = total_applied,
284        "replay_complete"
285    );
286    Ok(())
287}
288
289/// Drive a long-lived `DeliverPolicy::New` consumer that pumps every
290/// post-boot CloudEvent into `AppState` via `apply_event_payload`.
291///
292/// This closes the PROJECTION-LIVE-LAG gap (E2E-V2 acceptance report,
293/// 0.5.1 → 0.5.2 wave): [`replay_projection`] rebuilds the projection
294/// once at boot, but until this loop runs nothing fed new events into
295/// the projection while the server was alive — supervisor-emitted
296/// `cell.lifecycle.v1.*` events only appeared in `GET /v1/cells` after
297/// the next server restart re-replayed the stream.
298///
299/// The loop is intentionally minimal:
300///
301/// - `DeliverPolicy::New` (not `All`): replay has already drained the
302///   stream's history; we only want what publishes AFTER the listener
303///   binds. The two paths together cover the whole timeline without
304///   double-counting the boundary message.
305/// - No subject filter: same `cellos.events.>` surface the replay
306///   consumer used. `apply_event_payload` is the single discriminator.
307/// - `AckPolicy::None` + ephemeral: identical to the replay consumer's
308///   semantics, so a process restart re-derives the consumer cheaply
309///   and never accumulates broker-side state.
310/// - `messages()` (not `fetch()`): we want to block on the next publish
311///   indefinitely; this is the long-lived tail, not a drain.
312///
313/// The task is detached — `main.rs` keeps the `JoinHandle` only so a
314/// future graceful-shutdown path can abort it cleanly. On error the
315/// loop logs and exits; the projection will go stale until the server
316/// restarts, but that is strictly no worse than the pre-fix behavior
317/// (which never had a live projector at all).
318pub async fn spawn_live_projector(
319    state: AppState,
320    ctx: Context,
321) -> anyhow::Result<tokio::task::JoinHandle<()>> {
322    let stream = ctx
323        .get_stream(STREAM_NAME)
324        .await
325        .context("get_stream CELLOS_EVENTS for live projector")?;
326
327    // `DeliverPolicy::New` so we don't re-apply the replayed history.
328    // The post-boot stream tail is the projector's responsibility from
329    // here on.
330    let consumer = create_ephemeral_consumer(&stream, DeliverPolicy::New, None).await?;
331    let mut messages = consumer
332        .messages()
333        .await
334        .context("open live projector message stream")?;
335
336    let handle = tokio::spawn(async move {
337        info!("live projector subscribed; tailing cellos.events.>");
338        while let Some(msg) = messages.next().await {
339            let msg = match msg {
340                Ok(m) => m,
341                Err(e) => {
342                    warn!(error = %e, "live projector: read error; exiting tail");
343                    return;
344                }
345            };
346
347            let seq = match msg.info() {
348                Ok(info) => info.stream_sequence,
349                Err(e) => {
350                    warn!(error = %e, "live projector: message missing JetStream info; skipping");
351                    continue;
352                }
353            };
354
355            match state.apply_event_payload(&msg.payload).await {
356                Ok(ApplyOutcome::Applied) => {
357                    debug!(seq, "live projector: event applied");
358                }
359                Ok(ApplyOutcome::Ignored) => {}
360                Err(e) => {
361                    warn!(seq, error = %e, "live projector: failed to apply event; skipping");
362                }
363            }
364            // Mirror replay's contract: advance the cursor whether or
365            // not the projection mutated. WS clients reading the
366            // snapshot cursor see live events without a restart.
367            state.bump_cursor(seq);
368        }
369        debug!("live projector: stream ended");
370    });
371
372    Ok(handle)
373}
374
375/// Open an ephemeral pull-consumer message stream for the WebSocket
376/// bridge. Distinct from `replay_projection`'s consumer because the
377/// WS path needs a long-lived `messages()` stream (waits for new
378/// publishes) rather than a fetch-and-terminate loop.
379pub async fn open_ws_message_stream(
380    ctx: &Context,
381    subject: Option<&str>,
382    since: Option<u64>,
383) -> anyhow::Result<PullStream> {
384    let stream = ctx
385        .get_stream(STREAM_NAME)
386        .await
387        .context("get_stream CELLOS_EVENTS for ws")?;
388
389    let policy = match since {
390        Some(seq) => deliver_policy_for(Some(seq)),
391        None => deliver_policy_live_tail(),
392    };
393
394    let consumer = create_ephemeral_consumer(&stream, policy, subject).await?;
395    consumer
396        .messages()
397        .await
398        .context("open consumer messages stream")
399}
400
401/// Inspect the live stream's retained-sequence floor so the WS bridge
402/// can report it back to a client whose `since` was below the
403/// retention window. Returns `None` if the stream cannot be reached
404/// (in which case the caller should fall through to the generic
405/// close path).
406pub async fn stream_first_seq(ctx: &Context) -> Option<u64> {
407    match ctx.get_stream(STREAM_NAME).await {
408        Ok(mut stream) => match stream.info().await {
409            Ok(info) => Some(info.state.first_sequence),
410            Err(e) => {
411                debug!(error = %e, "stream_first_seq: info() failed");
412                None
413            }
414        },
415        Err(e) => {
416            debug!(error = %e, "stream_first_seq: get_stream failed");
417            None
418        }
419    }
420}
421
422/// Heuristic: did `create_ephemeral_consumer` fail because the
423/// caller's `start_seq` is older than the stream's retained floor?
424///
425/// JetStream surfaces this as a server-side `ErrConsumerCreate` with
426/// a description like "optional start sequence ... is too low".
427/// Rather than depend on a specific error-code shape we string-match
428/// the conventional message; if NATS changes the wording the worst
429/// case is we fall through to the generic close path, which is the
430/// same behavior as before this commit.
431pub fn looks_like_retention_exhausted(err: &anyhow::Error) -> bool {
432    let chain = format!("{err:#}").to_ascii_lowercase();
433    chain.contains("optional start sequence")
434        || chain.contains("too low")
435        || chain.contains("no such sequence")
436        || chain.contains("not found in stream")
437}
438
#[cfg(test)]
mod tests {
    use super::*;

    /// Resolve the policy for `?since=<since>` and return its start
    /// sequence, or hand back the unexpected policy so the calling
    /// test can report it.
    fn resume_start(since: u64) -> Result<u64, DeliverPolicy> {
        match deliver_policy_for(Some(since)) {
            DeliverPolicy::ByStartSequence { start_sequence } => Ok(start_sequence),
            other => Err(other),
        }
    }

    /// ADR-0015 §D3 — with no cursor (boot replay) every retained
    /// message must be delivered. Pinning the variant here means a
    /// change in either direction shows up as a broken boot or a
    /// broken resume.
    #[test]
    fn deliver_policy_none_is_all() {
        let policy = deliver_policy_for(None);
        assert!(matches!(policy, DeliverPolicy::All));
    }

    /// ADR-0015 §D3 — `?since=N` resumes at N+1. The cursor is the
    /// *last applied* sequence, so delivering N again would be a
    /// duplicate.
    #[test]
    fn deliver_policy_some_starts_at_seq_plus_one() {
        match resume_start(42) {
            Ok(start_sequence) => assert_eq!(start_sequence, 43),
            Err(other) => panic!("expected ByStartSequence(43), got {other:?}"),
        }
    }

    /// `?since=0` is a legitimate brand-new client asking for
    /// everything from sequence 1. The +1 still applies and lands on
    /// the first sequence of a fresh stream.
    #[test]
    fn deliver_policy_since_zero_starts_at_one() {
        match resume_start(0) {
            Ok(start_sequence) => assert_eq!(start_sequence, 1),
            Err(other) => panic!("expected ByStartSequence(1), got {other:?}"),
        }
    }

    /// `u64::MAX` saturates instead of overflowing, so a client that
    /// sends a sentinel value cannot panic the server.
    #[test]
    fn deliver_policy_since_saturates_at_u64_max() {
        match resume_start(u64::MAX) {
            Ok(start_sequence) => assert_eq!(start_sequence, u64::MAX),
            Err(other) => panic!("expected ByStartSequence(u64::MAX), got {other:?}"),
        }
    }

    /// Live tail must not replay history: an ad-hoc `/ws/events`
    /// subscriber only sees what is published after it connects.
    #[test]
    fn live_tail_is_deliver_new() {
        let policy = deliver_policy_live_tail();
        assert!(matches!(policy, DeliverPolicy::New));
    }

    /// Retention-exhaustion detection is best-effort string matching.
    /// These are the message shapes known today; new NATS wording is
    /// exactly the regression this test is meant to surface.
    #[test]
    fn retention_exhausted_matches_known_messages() {
        for case in [
            "consumer create failed: optional start sequence 5 is too low",
            "no such sequence in stream",
            "sequence 7 not found in stream CELLOS_EVENTS",
            "ErrConsumerCreate: optional start sequence too low",
        ] {
            let err = anyhow::anyhow!(case.to_string());
            let matched = looks_like_retention_exhausted(&err);
            assert!(matched, "expected match for {case:?}");
        }
    }

    /// Red-team finding: the ephemeral consumer config MUST carry an
    /// `inactive_threshold` so the broker reclaims consumers whose WS
    /// client vanished without a clean close. Otherwise half-open TCP
    /// connections (laptop suspend, NAT eviction) leave consumer
    /// records on the broker indefinitely.
    #[test]
    fn ephemeral_consumer_config_sets_inactive_threshold() {
        let config = ephemeral_consumer_config_for(DeliverPolicy::New, None);
        let threshold = config.inactive_threshold;

        assert!(
            threshold > Duration::ZERO,
            "inactive_threshold must be non-zero so broker GCs dead consumers; got {:?}",
            threshold,
        );
        assert!(
            threshold >= Duration::from_secs(60),
            "inactive_threshold must ride out transient reconnect; got {:?}",
            threshold,
        );
        assert!(config.durable_name.is_none(), "consumer must be ephemeral");
        assert!(
            matches!(config.ack_policy, AckPolicy::None),
            "consumer must remain AckPolicy::None for read-only projection feed"
        );
    }

    /// Unrelated errors must NOT be classified as retention loss;
    /// doing so would send 4410 close codes for transient broker
    /// hiccups and stampede clients into snapshot refetches.
    #[test]
    fn retention_exhausted_ignores_unrelated_errors() {
        for case in [
            "broker connection refused",
            "timed out",
            "stream not found", // ambiguous; we match on "not found in stream" only
        ] {
            let err = anyhow::anyhow!(case.to_string());
            let matched = looks_like_retention_exhausted(&err);
            assert!(!matched, "unexpected match for {case:?}");
        }
    }
}