cellos-server 0.5.1

HTTP control plane for CellOS — admission, projection over JetStream, WebSocket fan-out of CloudEvents. Pure event-sourced architecture.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
//! JetStream wiring for `cellos-server`.
//!
//! Closes the two open SEAMs from ADR-0011 (projection replay on boot)
//! and ADR-0015 (`?since=<seq>` historical replay via JetStream).
//!
//! ## Why the indirection
//!
//! `cellos-server` is a *projection* over the `cellos.events.>` event
//! log (CHATROOM Session 16). The in-memory `AppState` registry MUST be
//! rebuildable from JetStream alone — that is the property that makes
//! the server safely restartable. This module owns:
//!
//! 1. **Stream provisioning.** [`ensure_stream`] gets-or-creates the
//!    canonical `CELLOS_EVENTS` stream with the FC-74 retention floor
//!    (90 days, file-backed). Idempotent; a pre-existing stream is left
//!    untouched (we do not reconcile config drift here — that is a
//!    cluster-admin concern, not an HTTP-control-plane one).
//!
//! 2. **Ephemeral consumer construction.** [`create_ephemeral_consumer`]
//!    returns a pull consumer for a caller-supplied `DeliverPolicy`.
//!    [`deliver_policy_for`] derives that policy from a resume cursor:
//!    `None` → `DeliverPolicy::All` (replay everything from the
//!    earliest retained message), `Some(N)` →
//!    `DeliverPolicy::ByStartSequence { N+1 }` (resume from the message
//!    after the client's cursor, exactly matching the ADR-0015 §D3
//!    contract). [`deliver_policy_live_tail`] covers cursor-less
//!    WebSocket opens with `DeliverPolicy::New`.
//!
//! 3. **Boot-time replay.** [`replay_projection`] drains every message
//!    currently in the stream into `AppState`, advancing the snapshot
//!    cursor as it goes, then returns. Called before the HTTP listener
//!    binds so the first `GET /v1/formations` after a restart returns
//!    the fully-rebuilt view with a non-zero cursor.
//!
//! ## What we deliberately do *not* do
//!
//! - We never create *durable* consumers. The server's view is
//!   ephemeral by design: every restart replays the stream, so there
//!   is no consumer state to leak into JetStream's own consumer table.
//! - We do not acknowledge messages. `AckPolicy::None` keeps the
//!   consumer cheap and side-effect-free; this is a *read-only*
//!   projection feed, not a work queue.
//! - We do not reconcile stream-config drift. If `CELLOS_EVENTS`
//!   already exists with a different `max_age`, we use it as-is. The
//!   alternative — silently mutating an operator's stream config —
//!   would violate doctrine on least surprise.

use std::time::Duration;

use anyhow::Context as _;
use async_nats::jetstream::{
    self,
    consumer::{
        pull::{Config as PullConfig, Stream as PullStream},
        AckPolicy, Consumer, DeliverPolicy,
    },
    context::Context,
    stream::{Config as StreamConfig, RetentionPolicy, StorageType, Stream},
};
use futures_util::StreamExt;
use tracing::{debug, info, warn};

use crate::state::{AppState, ApplyOutcome};

/// Canonical stream name for every CloudEvent the platform emits.
/// Mirrors `cellos-supervisor`'s producer-side constant.
///
/// Used by [`ensure_stream`] when provisioning the stream and by every
/// `get_stream` lookup in this module.
pub const STREAM_NAME: &str = "CELLOS_EVENTS";

/// Subject filter for every CloudEvent. The MVP supervisor publishes
/// to `cellos.events.<entity>.<id>.<phase>`; the wildcard captures all
/// of them.
///
/// This is the only subject [`ensure_stream`] binds to the stream when
/// it has to create it.
pub const STREAM_SUBJECT: &str = "cellos.events.>";

/// FC-74 retention floor — operators have committed to keeping the
/// last 90 days of events available for forensic replay. Anything
/// older falls off the tail.
///
/// Passed as the stream's `max_age` by [`ensure_stream`]. A stream that
/// already exists is left alone, so this value only matters when this
/// process is the one creating the stream.
const RETENTION_MAX_AGE: Duration = Duration::from_secs(90 * 24 * 60 * 60);

/// Boot-replay drains messages in batches of this size. Picked to be
/// large enough that a fresh cluster with thousands of events doesn't
/// pay one round-trip per event, but small enough that a single batch
/// doesn't dominate startup latency on a small cluster.
///
/// Supplied as `max_messages` on each fetch in [`replay_projection`].
const REPLAY_BATCH: usize = 256;

/// Timeout for a single batch fetch during replay. Short because we
/// only need to detect "no more messages right now" — the next batch
/// either returns immediately with data or this timeout fires and we
/// exit the drain loop.
///
/// Supplied as `expires` on each fetch in [`replay_projection`].
const REPLAY_BATCH_TIMEOUT: Duration = Duration::from_millis(250);

/// Build a JetStream context on top of `client` and make sure the
/// canonical `CELLOS_EVENTS` stream is present before returning it.
///
/// The config assembled here (file storage, limits retention, FC-74
/// `max_age`, the `cellos.events.>` subject) is only a *creation*
/// template. The call is idempotent: a stream that already exists is
/// used as-is, whatever its config (see module docs for the rationale).
///
/// # Errors
///
/// Fails with the context `get_or_create CELLOS_EVENTS stream` when the
/// get-or-create call itself returns an error.
pub async fn ensure_stream(client: &async_nats::Client) -> anyhow::Result<Context> {
    let js = jetstream::new(client.clone());

    let desired = StreamConfig {
        max_age: RETENTION_MAX_AGE,
        storage: StorageType::File,
        retention: RetentionPolicy::Limits,
        subjects: vec![STREAM_SUBJECT.to_owned()],
        name: STREAM_NAME.to_owned(),
        ..Default::default()
    };

    // We only care that the stream exists; the returned handle is
    // discarded. Relying on get-or-create (rather than an explicit
    // update) is what keeps operator-tuned settings from being
    // overwritten on every boot.
    let _stream = js
        .get_or_create_stream(desired)
        .await
        .context("get_or_create CELLOS_EVENTS stream")?;

    Ok(js)
}

/// Map a resume cursor onto the `DeliverPolicy` a consumer should use.
///
/// Kept as a pure function so the mapping can be unit-tested without a
/// NATS broker. It encodes the ADR-0015 §D3 contract:
///
/// - `None` → `DeliverPolicy::All`: deliver everything still retained.
///   This is the boot-replay path, which rebuilds state from the oldest
///   retained message (the last 90 days under FC-74 retention).
/// - `Some(n)` → `DeliverPolicy::ByStartSequence` starting at `n + 1`:
///   the cursor is the last sequence the client applied, so delivery
///   resumes one past it to avoid re-emitting frames it already saw.
///
/// The increment saturates, so `Some(u64::MAX)` yields a start sequence
/// of `u64::MAX` instead of overflowing.
pub fn deliver_policy_for(start_seq: Option<u64>) -> DeliverPolicy {
    start_seq.map_or(DeliverPolicy::All, |last_applied| {
        DeliverPolicy::ByStartSequence {
            start_sequence: last_applied.saturating_add(1),
        }
    })
}

/// Live-tail policy for `/ws/events` opens without a `?since=` param.
///
/// Distinct from the resume path because "no cursor at all" should
/// NOT replay history; ad-hoc subscribers (cellctl `--follow`, debug
/// tools) want to see only what flows in after their connect.
///
/// Always returns `DeliverPolicy::New`. [`open_ws_message_stream`]
/// selects this policy when its `since` argument is `None`; boot replay
/// goes through [`deliver_policy_for`] instead.
pub fn deliver_policy_live_tail() -> DeliverPolicy {
    DeliverPolicy::New
}

/// How long a WS-bridge ephemeral consumer may sit idle on the broker
/// after its client disappears before JetStream itself garbage-collects
/// it. Without this, a WebSocket whose TCP connection died ungracefully
/// (laptop suspend, NAT eviction, OOM kill) leaves the ephemeral
/// consumer pinned to that ghost forever — under `AckPolicy::None` the
/// pending-ack count stays at zero, but the consumer record still
/// occupies broker resources.
///
/// 5 minutes is well clear of the web view's 30-second reconnect
/// ceiling (ADR-0015 §D4) plus a couple of retries, and short enough
/// that thousands of dead consumers do not accumulate over a day.
///
/// Set as `inactive_threshold` on every config built by
/// [`ephemeral_consumer_config_for`], so the boot-replay consumer is
/// subject to the same threshold as the WebSocket ones.
const EPHEMERAL_INACTIVE_THRESHOLD: Duration = Duration::from_secs(5 * 60);

/// Assemble the pull-consumer config shared by the WebSocket bridge and
/// boot-time replay. Pure (no broker access) so the choices made here
/// can be asserted in unit tests.
///
/// - `policy` is stored verbatim as the consumer's `deliver_policy`.
/// - `subject` becomes `filter_subject`; `None` maps to the empty
///   string.
///
/// Every config produced here leaves `durable_name` and `name` unset,
/// uses `AckPolicy::None`, and carries
/// `EPHEMERAL_INACTIVE_THRESHOLD` as its `inactive_threshold`. All
/// remaining fields come from `PullConfig::default()`.
pub fn ephemeral_consumer_config_for(policy: DeliverPolicy, subject: Option<&str>) -> PullConfig {
    let filter_subject = subject.map(str::to_owned).unwrap_or_default();

    PullConfig {
        deliver_policy: policy,
        filter_subject,
        // Read-only projection feed: nothing is ever acknowledged.
        ack_policy: AckPolicy::None,
        // Lets the broker reclaim consumers whose client vanished.
        inactive_threshold: EPHEMERAL_INACTIVE_THRESHOLD,
        // No names at all — the consumer must never be durable.
        name: None,
        durable_name: None,
        ..Default::default()
    }
}

/// Create an ephemeral pull consumer on `stream`.
///
/// `policy` is the `DeliverPolicy` the consumer starts with; callers
/// typically obtain it from [`deliver_policy_for`] (cursor-based
/// resume or full replay) or [`deliver_policy_live_tail`]. `subject`
/// optionally filters to a sub-tree of `cellos.events.>` so a
/// tenant-scoped WebSocket sees only its tenant's events.
///
/// The config comes from [`ephemeral_consumer_config_for`], so the
/// consumer is always ephemeral (`durable_name = None`), uses
/// `AckPolicy::None`, and is gated by `EPHEMERAL_INACTIVE_THRESHOLD`
/// for broker-side GC — see module docs and that constant.
///
/// # Errors
///
/// Fails with the context `create ephemeral pull consumer` when the
/// create call returns an error.
pub async fn create_ephemeral_consumer(
    stream: &Stream,
    policy: DeliverPolicy,
    subject: Option<&str>,
) -> anyhow::Result<Consumer<PullConfig>> {
    let consumer = stream
        .create_consumer(ephemeral_consumer_config_for(policy, subject))
        .await
        .context("create ephemeral pull consumer")?;
    Ok(consumer)
}

/// Rebuild the in-memory projection by draining an ephemeral
/// `DeliverPolicy::All` consumer into `AppState`.
///
/// Each message's payload is handed to `state.apply_event_payload` and
/// the snapshot cursor is bumped to the message's stream sequence. The
/// function returns as soon as one batch fetch yields no messages
/// within [`REPLAY_BATCH_TIMEOUT`], which is taken to mean the tail of
/// the stream has been reached.
///
/// This is the ADR-0011 SEAM closure: once this returns, the registry
/// reflects the event log, so the HTTP listener can open and serve
/// `GET /v1/formations` with a non-zero `cursor` immediately.
///
/// Per-message handling:
///
/// - A message without JetStream info is logged and skipped; it still
///   counts toward `events_seen` but does not move the cursor.
/// - An event that fails to apply is logged and skipped, and the cursor
///   is bumped past it anyway.
///
/// Setting `CELLOS_SERVER_SKIP_REPLAY` to `1` or `true` (any case)
/// skips replay entirely.
///
/// # Errors
///
/// Fails if the stream lookup, consumer creation, or a batch fetch
/// fails, or if reading a message out of a batch yields an error.
pub async fn replay_projection(state: &AppState, ctx: &Context) -> anyhow::Result<()> {
    // Escape hatch for tests that run `cellos-server` in isolation
    // (no live NATS, no JetStream). Production deployments leave the
    // variable unset.
    let skip_requested = matches!(
        std::env::var("CELLOS_SERVER_SKIP_REPLAY"),
        Ok(v) if v == "1" || v.eq_ignore_ascii_case("true")
    );
    if skip_requested {
        info!("CELLOS_SERVER_SKIP_REPLAY set; skipping projection replay");
        return Ok(());
    }

    let stream = ctx
        .get_stream(STREAM_NAME)
        .await
        .context("get_stream CELLOS_EVENTS for replay")?;
    let consumer = create_ephemeral_consumer(&stream, deliver_policy_for(None), None).await?;

    let mut events_seen: u64 = 0;
    let mut events_applied: u64 = 0;
    let mut cursor: u64 = 0;

    // `fetch().messages()` is used instead of `consumer.messages()`
    // because a fetch ends after a single empty batch, which is the
    // "drain until idle" behaviour replay wants. A long-lived
    // `messages()` stream would wait indefinitely for new publishes
    // and stall startup on a quiet cluster.
    loop {
        let mut batch = consumer
            .fetch()
            .max_messages(REPLAY_BATCH)
            .expires(REPLAY_BATCH_TIMEOUT)
            .messages()
            .await
            .context("replay: pull batch")?;

        let mut drained: usize = 0;
        while let Some(next) = batch.next().await {
            // The batch yields a boxed `dyn Error`, which anyhow's
            // `Context` cannot wrap directly — convert by hand.
            let msg = next.map_err(|e| anyhow::anyhow!("replay: read message from batch: {e}"))?;
            drained += 1;
            events_seen += 1;

            let seq = match msg.info().map(|meta| meta.stream_sequence) {
                Ok(seq) => seq,
                Err(e) => {
                    warn!(error = %e, "replay: message missing JetStream info; skipping");
                    continue;
                }
            };
            cursor = cursor.max(seq);

            let outcome = state.apply_event_payload(&msg.payload).await;
            match outcome {
                Ok(ApplyOutcome::Applied) => events_applied += 1,
                Ok(ApplyOutcome::Ignored) => {}
                Err(e) => {
                    warn!(seq, error = %e, "replay: failed to apply event; skipping");
                }
            }
            // The cursor advances even when the apply failed, so a
            // bad event never pins it.
            state.bump_cursor(seq);
        }

        if drained == 0 {
            // Nothing arrived inside the expiry window: tail reached.
            break;
        }
        debug!(
            batch_size = drained,
            total_seen = events_seen,
            "replay batch consumed"
        );
    }

    let formations = state.formations.read().await.len();
    info!(
        cursor,
        formations,
        events_seen,
        events_applied,
        "replay_complete"
    );
    Ok(())
}

/// Open an ephemeral pull-consumer message stream for the WebSocket
/// bridge. Distinct from `replay_projection`'s consumer because the
/// WS path needs a long-lived `messages()` stream (waits for new
/// publishes) rather than a fetch-and-terminate loop.
pub async fn open_ws_message_stream(
    ctx: &Context,
    subject: Option<&str>,
    since: Option<u64>,
) -> anyhow::Result<PullStream> {
    let stream = ctx
        .get_stream(STREAM_NAME)
        .await
        .context("get_stream CELLOS_EVENTS for ws")?;

    let policy = match since {
        Some(seq) => deliver_policy_for(Some(seq)),
        None => deliver_policy_live_tail(),
    };

    let consumer = create_ephemeral_consumer(&stream, policy, subject).await?;
    consumer
        .messages()
        .await
        .context("open consumer messages stream")
}

/// Look up the first sequence the live stream reports in its state, so
/// the WS bridge can report it to a client whose `since` fell below the
/// retention window.
///
/// Best-effort: if either the stream lookup or the `info()` call fails,
/// the failure is logged at debug level and `None` is returned, in
/// which case the caller should fall through to the generic close path.
pub async fn stream_first_seq(ctx: &Context) -> Option<u64> {
    let mut stream = match ctx.get_stream(STREAM_NAME).await {
        Ok(stream) => stream,
        Err(e) => {
            debug!(error = %e, "stream_first_seq: get_stream failed");
            return None;
        }
    };

    // Copy the sequence out inside this statement so nothing borrowed
    // from `stream` outlives it.
    let first_seq = stream.info().await.map(|info| info.state.first_sequence);
    match first_seq {
        Ok(seq) => Some(seq),
        Err(e) => {
            debug!(error = %e, "stream_first_seq: info() failed");
            None
        }
    }
}

/// Heuristic check: does `err` look like a consumer-creation failure
/// caused by a start sequence older than the stream's retained floor?
///
/// JetStream surfaces this as a server-side `ErrConsumerCreate` with
/// a description like "optional start sequence ... is too low".
/// Instead of depending on a specific error-code shape, the error is
/// rendered with its full context chain (`{err:#}`), lowercased, and
/// searched for the conventional message fragments. Any single
/// fragment is enough for a match.
///
/// If NATS changes the wording, the worst case is a `false` here and
/// the caller falls through to the generic close path, which is the
/// same behavior as before this commit.
pub fn looks_like_retention_exhausted(err: &anyhow::Error) -> bool {
    const MARKERS: [&str; 4] = [
        "optional start sequence",
        "too low",
        "no such sequence",
        "not found in stream",
    ];

    let rendered = format!("{err:#}").to_ascii_lowercase();
    MARKERS.iter().any(|marker| rendered.contains(*marker))
}

// Broker-free unit tests: they cover only the pure helpers
// (`deliver_policy_for`, `deliver_policy_live_tail`,
// `ephemeral_consumer_config_for`, `looks_like_retention_exhausted`).
#[cfg(test)]
mod tests {
    use super::*;

    /// ADR-0015 §D3 — boot replay (no cursor) must deliver every
    /// retained message. Encoding this as a test pins the contract:
    /// changing this enum variant in either direction breaks resume
    /// or breaks boot.
    #[test]
    fn deliver_policy_none_is_all() {
        assert!(matches!(deliver_policy_for(None), DeliverPolicy::All));
    }

    /// ADR-0015 §D3 — `?since=N` resumes at seq N+1, not N. The
    /// client's cursor is the *last applied* sequence, so re-emitting
    /// N would be a duplicate delivery.
    #[test]
    fn deliver_policy_some_starts_at_seq_plus_one() {
        match deliver_policy_for(Some(42)) {
            DeliverPolicy::ByStartSequence { start_sequence } => {
                assert_eq!(start_sequence, 43);
            }
            other => panic!("expected ByStartSequence(43), got {other:?}"),
        }
    }

    /// `?since=0` is a legitimate fresh client — they want everything
    /// from seq 1 onward. The +1 still applies (`0 + 1 = 1`), which
    /// is exactly the first retained sequence in a fresh stream.
    #[test]
    fn deliver_policy_since_zero_starts_at_one() {
        match deliver_policy_for(Some(0)) {
            DeliverPolicy::ByStartSequence { start_sequence } => {
                assert_eq!(start_sequence, 1);
            }
            other => panic!("expected ByStartSequence(1), got {other:?}"),
        }
    }

    /// `u64::MAX` saturates rather than overflows. A misbehaving
    /// client that hands us a sentinel value cannot panic the server.
    #[test]
    fn deliver_policy_since_saturates_at_u64_max() {
        match deliver_policy_for(Some(u64::MAX)) {
            DeliverPolicy::ByStartSequence { start_sequence } => {
                assert_eq!(start_sequence, u64::MAX);
            }
            other => panic!("expected ByStartSequence(u64::MAX), got {other:?}"),
        }
    }

    /// Live-tail must NOT replay history; an ad-hoc `/ws/events`
    /// subscriber sees only what publishes after their open.
    #[test]
    fn live_tail_is_deliver_new() {
        assert!(matches!(deliver_policy_live_tail(), DeliverPolicy::New));
    }

    /// Retention-exhaustion detection is best-effort string matching.
    /// These are the message shapes we currently know about. The
    /// inputs are hard-coded samples, so this test guards against
    /// someone dropping a substring from the matcher; it cannot notice
    /// NATS itself changing its error wording.
    #[test]
    fn retention_exhausted_matches_known_messages() {
        let cases = [
            "consumer create failed: optional start sequence 5 is too low",
            "no such sequence in stream",
            "sequence 7 not found in stream CELLOS_EVENTS",
            "ErrConsumerCreate: optional start sequence too low",
        ];
        for case in cases {
            let err = anyhow::anyhow!(case.to_string());
            assert!(
                looks_like_retention_exhausted(&err),
                "expected match for {case:?}",
            );
        }
    }

    /// Red-team finding: ephemeral consumer config MUST set
    /// `inactive_threshold` so the broker reclaims consumers whose WS
    /// clients died without a clean close. Without this, half-open TCP
    /// connections (laptop suspend, NAT eviction) accumulate consumer
    /// records on the broker indefinitely.
    ///
    /// Also pins the two other invariants of the config: no durable
    /// name, and `AckPolicy::None`.
    #[test]
    fn ephemeral_consumer_config_sets_inactive_threshold() {
        let cfg = ephemeral_consumer_config_for(DeliverPolicy::New, None);
        assert!(
            cfg.inactive_threshold > Duration::ZERO,
            "inactive_threshold must be non-zero so broker GCs dead consumers; got {:?}",
            cfg.inactive_threshold,
        );
        assert!(
            cfg.inactive_threshold >= Duration::from_secs(60),
            "inactive_threshold must ride out transient reconnect; got {:?}",
            cfg.inactive_threshold,
        );
        assert!(cfg.durable_name.is_none(), "consumer must be ephemeral");
        assert!(
            matches!(cfg.ack_policy, AckPolicy::None),
            "consumer must remain AckPolicy::None for read-only projection feed"
        );
    }

    /// Unrelated errors must NOT be classified as retention loss —
    /// otherwise we'd send 4410 close codes for transient broker
    /// hiccups and stampede clients into snapshot refetches.
    #[test]
    fn retention_exhausted_ignores_unrelated_errors() {
        let cases = [
            "broker connection refused",
            "timed out",
            "stream not found", // ambiguous; we match on "not found in stream" only
        ];
        for case in cases {
            let err = anyhow::anyhow!(case.to_string());
            assert!(
                !looks_like_retention_exhausted(&err),
                "unexpected match for {case:?}",
            );
        }
    }
}