cellos-server 0.5.3

HTTP control plane for CellOS — admission, projection over JetStream, WebSocket fan-out of CloudEvents. Pure event-sourced architecture.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
//! `GET /v1/events` — one-shot snapshot of recent CloudEvents.
//!
//! EVT-001 (E2E report): `cellctl events` (without `--follow`) needs an
//! HTTP fallback for environments where WebSocket isn't viable — same
//! ergonomics as `kubectl get events`. The live-tail path
//! (`/ws/events`, see `ws.rs`) continues to serve `cellctl events
//! --follow` and the web UI.
//!
//! ## Response shape
//!
//! ```json
//! {
//!   "events": [{ "seq": 12345, "event": { /* CloudEvent */ } }, …],
//!   "cursor": 12345
//! }
//! ```
//!
//! `seq` matches the WebSocket envelope (`ws.rs::build_envelope`) so
//! clients can mix snapshot + follow flows without translating shapes:
//! open `/v1/events?since=<N>`, take the returned `cursor`, then
//! `/ws/events?since=<cursor>` to live-tail with no gap.
//!
//! `cursor` is the highest `seq` in the returned page. On an empty
//! page it falls back to the caller's `since`, capped at the JetStream
//! tail (`state.last_sequence`) when the tail is known; if the tail
//! cannot be read, `since` is echoed back unchanged. The tail clamp
//! (FUZZ-MED-3) prevents a hostile or stale client from pinning the
//! cursor to a future sequence — a client that asks
//! `?since=999999999` on a stream whose real tail is 42 sees
//! `cursor: 42`, not `999999999`. As a result, a follow-up
//! `?since=cursor` never starts past the stream's true tail.
//!
//! ## Query parameters
//!
//! - `since=<seq>` (optional, default 0) — return events with
//!   `seq > since`. `0` means "from the oldest retained event".
//! - `limit=<n>` (optional, default 100; `0` also selects the default,
//!   and values above 1000 are capped to 1000) — page ceiling. The
//!   handler clamps in-band rather than returning 400 on an out-of-range
//!   value because the cellctl/web flow is read-only, and a stale client
//!   that asked for `limit=99999` deserves a useful (capped) answer
//!   rather than an error toast.
//!
//! ## What this is NOT
//!
//! - Not a search interface. Subject-filtering is `?subject=` on the
//!   WS path and is intentionally absent here; one-shot snapshot is a
//!   "give me the recent firehose" tool, not a query language.
//! - Not durable. The handler creates an *ephemeral* JetStream consumer
//!   per request, drains a single batch, and drops the consumer.
//!   `inactive_threshold` on the consumer config bounds broker-side
//!   cleanup if our process dies between batch-fetch and consumer-drop.

use axum::extract::{Query, State};
use axum::http::HeaderMap;
use axum::Json;
use futures_util::StreamExt;
use serde::{Deserialize, Serialize};
use tracing::{debug, warn};

use crate::auth::require_bearer;
use crate::error::AppError;
use crate::jetstream::{create_ephemeral_consumer, deliver_policy_for};
use crate::state::AppState;

/// Page size used when the caller omits `?limit=` (or passes `0`).
///
/// kubectl's `--chunk-size` defaults to 500; this is deliberately
/// smaller because `cellctl events` output is read by a human in a
/// terminal, not consumed by a paginating batch crawler.
const DEFAULT_LIMIT: usize = 100;

/// Largest page the server will serve in one round-trip; bigger
/// `?limit=` requests are capped here rather than rejected.
///
/// Comfortably above a typical viewer's screen (60-200 lines) and
/// comfortably below what the broker will deliver before its own batch
/// timeout (`REPLAY_BATCH_TIMEOUT` in `jetstream.rs`) would fire.
const MAX_LIMIT: usize = 1_000;

/// `expires` (in milliseconds) for the single JetStream pull request:
/// once it lapses without filling the batch, the handler treats the
/// caller as caught up with the tail. Kept short so a viewer that
/// already has its snapshot is not left waiting.
const FETCH_TIMEOUT_MS: u64 = 250;

/// FUZZ-CRIT-1 (wave-1 report §Critical): hard wall-clock budget for
/// each NATS-side request the handler makes (`get_stream`,
/// `create_ephemeral_consumer`, batch `messages()`).
///
/// Without it, an unreachable broker stretched each `/v1/events`
/// request to roughly 10s: async-nats' default JetStream request timeout
/// is 5s, and a reconnect storm chained on top produced the 10s
/// wall-clock 500s the wave-1 fuzzer observed. 1.5s sits far above the
/// p99 of a healthy local-network NATS round-trip (low double-digit ms).
/// It is also short enough that a flood of `/v1/events` during a NATS
/// partition cannot tie up handler tasks long enough to act as a DoS
/// lever.
const NATS_REQUEST_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(1_500);

/// Query string accepted by `GET /v1/events` (semantics in the
/// module-level docs).
///
/// Every field is optional. The container-level `#[serde(default)]`
/// fills any parameter the caller leaves out from
/// `EventsQuery::default()` (i.e. `None`), so a bare `GET /v1/events`
/// is accepted — kubectl-style ergonomics. The handler applies the
/// documented defaults and clamps afterwards.
#[derive(Debug, Deserialize, Default)]
#[serde(default)]
pub struct EventsQuery {
    /// Only events with `seq > since` are returned. Absent or `0` means
    /// "start from the oldest retained event" (JetStream's
    /// `DeliverPolicy::All`).
    pub since: Option<u64>,
    /// Requested page size. The handler maps absent/`0` to
    /// `DEFAULT_LIMIT` and caps anything larger than `MAX_LIMIT`.
    pub limit: Option<usize>,
}

/// One event in a `GET /v1/events` page, paired with its JetStream
/// stream sequence.
///
/// Wire-compatible with the `/ws/events` frame shape
/// (`ws.rs::build_envelope`), so snapshot and live-tail clients can
/// share a single decoder.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct EventEnvelope {
    /// JetStream stream sequence of the message.
    pub seq: u64,
    /// The message payload parsed as JSON (the CloudEvent body).
    pub event: serde_json::Value,
}

/// Response body for `GET /v1/events`.
///
/// `cursor` is the resume point: pass it back as `?since=cursor` (or to
/// `/ws/events?since=cursor`) to continue without a gap. See the
/// module-level docs for the full contract.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct EventsResponse {
    /// The page, in delivery order; never longer than the clamped
    /// `?limit=`.
    pub events: Vec<EventEnvelope>,
    /// Sequence to resume from with `?since=cursor`.
    ///
    /// When the JetStream tail is known, the cursor never exceeds it
    /// (FUZZ-MED-3), so a stale or hostile `?since=` past the tail is
    /// not echoed back. If nothing past `since` was seen, it falls back
    /// to `since`, subject to that same clamp. With no broker
    /// configured, it is the larger of the projection cursor and
    /// `since`.
    pub cursor: u64,
}

/// `GET /v1/events` — Bearer-protected one-shot snapshot of the
/// JetStream event log. See module docs for the contract.
///
/// After authenticating, the handler (when a JetStream context is
/// configured) does the following:
///
/// 1. Looks up the stream.
/// 2. Creates an ephemeral consumer that starts after `?since=`.
/// 3. Fetches one batch of at most `?limit=` messages.
/// 4. Returns those messages as [`EventEnvelope`]s with a resume
///    `cursor` that is clamped to the stream tail when the tail can be
///    read.
///
/// # Errors
///
/// - Whatever `require_bearer` returns for a missing or wrong bearer
///   token.
/// - `AppError::service_unavailable()` (redacted 503) when `get_stream`,
///   consumer creation, or the batch fetch request fails or exceeds
///   `NATS_REQUEST_TIMEOUT`.
///
/// Per-message problems (read errors, missing stream info, non-JSON
/// payloads) and a stalled batch drain never fail the request. They are
/// logged, and the page is either cut short or the message is skipped.
pub async fn list_events(
    State(state): State<AppState>,
    headers: HeaderMap,
    Query(query): Query<EventsQuery>,
) -> Result<Json<EventsResponse>, AppError> {
    require_bearer(&headers, &state.api_token)?;

    let since = query.since.unwrap_or(0);
    let limit = clamp_limit(query.limit);

    // No broker configured (test mode, NATS unreachable at startup):
    // return the current cursor with no events rather than a 5xx so
    // operators inspecting a broker-less projection get a useful
    // answer. The cursor still reflects whatever was replayed at boot.
    let Some(ctx) = state.jetstream.clone() else {
        debug!("GET /v1/events: no JetStream context configured; returning empty snapshot");
        return Ok(Json(EventsResponse {
            events: Vec::new(),
            cursor: state.cursor().max(since),
        }));
    };

    // FUZZ-CRIT-1: wrap every NATS-side call in a hard wall-clock
    // timeout AND map any error (timeout, connection failure, JetStream
    // RPC error) to a redacted 503. Operators get the underlying error
    // via the WARN log; clients see only the generic problem+json body.
    let mut stream = match tokio::time::timeout(
        NATS_REQUEST_TIMEOUT,
        ctx.get_stream(crate::jetstream::STREAM_NAME),
    )
    .await
    {
        Ok(Ok(s)) => s,
        Ok(Err(e)) => {
            warn!(
                stream = crate::jetstream::STREAM_NAME,
                error = %e,
                stage = "get_stream",
                "GET /v1/events: JetStream request failed; returning 503",
            );
            return Err(AppError::service_unavailable());
        }
        Err(_elapsed) => {
            warn!(
                stream = crate::jetstream::STREAM_NAME,
                timeout_ms = NATS_REQUEST_TIMEOUT.as_millis() as u64,
                stage = "get_stream",
                "GET /v1/events: JetStream get_stream timed out; returning 503",
            );
            return Err(AppError::service_unavailable());
        }
    };

    // `since == 0` → DeliverPolicy::All (everything currently retained).
    // `since > 0` → DeliverPolicy::ByStartSequence { since + 1 }, which
    // is exactly the ADR-0015 §D3 contract the WS path already uses.
    let policy = deliver_policy_for((since > 0).then_some(since));

    let consumer = match tokio::time::timeout(
        NATS_REQUEST_TIMEOUT,
        create_ephemeral_consumer(&stream, policy, None),
    )
    .await
    {
        Ok(Ok(c)) => c,
        Ok(Err(e)) => {
            warn!(
                stream = crate::jetstream::STREAM_NAME,
                error = %format!("{e:#}"),
                stage = "create_ephemeral_consumer",
                "GET /v1/events: JetStream consumer create failed; returning 503",
            );
            return Err(AppError::service_unavailable());
        }
        Err(_elapsed) => {
            warn!(
                stream = crate::jetstream::STREAM_NAME,
                timeout_ms = NATS_REQUEST_TIMEOUT.as_millis() as u64,
                stage = "create_ephemeral_consumer",
                "GET /v1/events: JetStream consumer create timed out; returning 503",
            );
            return Err(AppError::service_unavailable());
        }
    };

    let mut batch = match tokio::time::timeout(
        NATS_REQUEST_TIMEOUT,
        consumer
            .fetch()
            .max_messages(limit)
            .expires(std::time::Duration::from_millis(FETCH_TIMEOUT_MS))
            .messages(),
    )
    .await
    {
        Ok(Ok(b)) => b,
        Ok(Err(e)) => {
            warn!(
                stream = crate::jetstream::STREAM_NAME,
                error = %format!("{e:#}"),
                stage = "fetch_batch",
                "GET /v1/events: JetStream batch fetch failed; returning 503",
            );
            return Err(AppError::service_unavailable());
        }
        Err(_elapsed) => {
            warn!(
                stream = crate::jetstream::STREAM_NAME,
                timeout_ms = NATS_REQUEST_TIMEOUT.as_millis() as u64,
                stage = "fetch_batch",
                "GET /v1/events: JetStream batch fetch timed out; returning 503",
            );
            return Err(AppError::service_unavailable());
        }
    };

    let mut envelopes: Vec<EventEnvelope> = Vec::with_capacity(limit.min(64));
    let mut highest: u64 = since;

    // FUZZ-CRIT-1: the timeout around `messages()` above only covers
    // issuing the pull request. Draining the batch is a separate wait on
    // the broker, so it gets the same wall-clock budget; a broker that
    // stalls mid-batch then cannot pin this handler. Everything read
    // before the deadline is still returned, and the cursor reflects
    // exactly what was read, so no gap is introduced.
    let drain_deadline = tokio::time::Instant::now() + NATS_REQUEST_TIMEOUT;

    loop {
        let next = match tokio::time::timeout_at(drain_deadline, batch.next()).await {
            Ok(next) => next,
            Err(_elapsed) => {
                warn!(
                    stream = crate::jetstream::STREAM_NAME,
                    timeout_ms = NATS_REQUEST_TIMEOUT.as_millis() as u64,
                    stage = "drain_batch",
                    "GET /v1/events: JetStream batch drain timed out; returning partial page",
                );
                break;
            }
        };
        let Some(msg) = next else { break };
        let msg = match msg {
            Ok(m) => m,
            Err(e) => {
                warn!(error = %e, "GET /v1/events: batch read error; ending page");
                break;
            }
        };
        let seq = match msg.info() {
            Ok(info) => info.stream_sequence,
            Err(e) => {
                warn!(error = %e, "GET /v1/events: message missing stream info; skipping");
                continue;
            }
        };
        // Advance the cursor as soon as the stream sequence is known,
        // *before* the payload is validated. A message skipped below is
        // skipped on every re-read as well, so resuming past it loses
        // nothing. Not advancing would let a run of >= `limit` non-JSON
        // messages pin the cursor at `since` forever: every follow-up
        // `?since=cursor` would re-fetch the same poison batch.
        highest = highest.max(seq);
        let event_value = match serde_json::from_slice::<serde_json::Value>(&msg.payload) {
            Ok(v) => v,
            Err(e) => {
                warn!(seq, error = %e, "GET /v1/events: payload not JSON; skipping");
                continue;
            }
        };
        envelopes.push(EventEnvelope {
            seq,
            event: event_value,
        });
        if envelopes.len() >= limit {
            break;
        }
    }

    // FUZZ-MED-3: fetch the stream's true tail and clamp the response
    // cursor against it. We tolerate `info()` failures (None) by
    // passing `highest` through — a missing ceiling is no worse than
    // the pre-fix behaviour. We keep the same wall-clock timeout
    // wrapper used above so a degraded broker still can't pin the
    // request beyond NATS_REQUEST_TIMEOUT.
    let tail: Option<u64> = match tokio::time::timeout(NATS_REQUEST_TIMEOUT, stream.info()).await {
        Ok(Ok(info)) => Some(info.state.last_sequence),
        Ok(Err(e)) => {
            debug!(
                stream = crate::jetstream::STREAM_NAME,
                error = %e,
                stage = "stream_info",
                "GET /v1/events: stream.info() failed; cursor will not be clamped",
            );
            None
        }
        Err(_elapsed) => {
            debug!(
                stream = crate::jetstream::STREAM_NAME,
                timeout_ms = NATS_REQUEST_TIMEOUT.as_millis() as u64,
                stage = "stream_info",
                "GET /v1/events: stream.info() timed out; cursor will not be clamped",
            );
            None
        }
    };

    Ok(Json(EventsResponse {
        events: envelopes,
        cursor: clamp_cursor(highest, tail),
    }))
}

/// FUZZ-MED-3: cap a response cursor at the stream's real tail.
///
/// `supplied` is what the handler would otherwise return: the highest
/// sequence it saw in the batch, or the caller's `?since=` when nothing
/// newer was seen. If that `?since=` lies beyond the tail (a stale or
/// hostile client), echoing it back would make a naive follow-up
/// `?since=cursor` silently skip real events once they are appended.
/// With a known `tail` the result is therefore `min(supplied, tail)`.
///
/// `tail = None` (stream info unavailable) returns `supplied` untouched,
/// because there is no trustworthy ceiling to clamp against.
///
/// Kept as a free function so its boundaries can be unit-tested without
/// a live JetStream.
pub(crate) fn clamp_cursor(supplied: u64, tail: Option<u64>) -> u64 {
    tail.map_or(supplied, |ceiling| supplied.min(ceiling))
}

/// Turn the raw `?limit=` into the page size the handler will use.
///
/// - absent or `0` → `DEFAULT_LIMIT` (`0` is meaningless, so it is
///   treated like an omitted limit instead of being rejected);
/// - anything above `MAX_LIMIT` → `MAX_LIMIT`;
/// - everything else passes through unchanged.
///
/// A free function so the clamping can be unit-tested without building
/// a router.
pub(crate) fn clamp_limit(requested: Option<usize>) -> usize {
    requested
        .filter(|&n| n != 0)
        .map_or(DEFAULT_LIMIT, |n| n.min(MAX_LIMIT))
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn clamp_limit_default_when_unset() {
        let limit = clamp_limit(None);
        assert_eq!(limit, DEFAULT_LIMIT);
    }

    #[test]
    fn clamp_limit_default_when_zero() {
        // A zero limit is treated like an omitted one: an empty page
        // would only force the client to ask again.
        let limit = clamp_limit(Some(0));
        assert_eq!(limit, DEFAULT_LIMIT);
    }

    #[test]
    fn clamp_limit_caps_at_max() {
        for requested in [MAX_LIMIT + 1, usize::MAX] {
            assert_eq!(
                clamp_limit(Some(requested)),
                MAX_LIMIT,
                "requested = {requested}"
            );
        }
    }

    #[test]
    fn clamp_limit_passes_through_in_range() {
        for requested in [1, 50, MAX_LIMIT] {
            assert_eq!(clamp_limit(Some(requested)), requested);
        }
    }

    /// FUZZ-MED-3: a `?since=` beyond the stream's real tail must not be
    /// echoed back. It is capped at the tail, so resuming from the
    /// returned cursor cannot skip events that have yet to arrive.
    #[test]
    fn clamp_cursor_clamps_future_cursor_to_stream_tail() {
        const TAIL: u64 = 42;
        // Far beyond the tail, then exactly one past it.
        for supplied in [1_000_000, TAIL + 1] {
            assert_eq!(
                clamp_cursor(supplied, Some(TAIL)),
                TAIL,
                "supplied = {supplied}"
            );
        }
    }

    /// FUZZ-MED-3: a `?since=` at or below the tail is a legitimate
    /// cursor and must come back unchanged, keeping `cursor`
    /// monotone-non-decreasing across requests.
    #[test]
    fn clamp_cursor_passes_through_in_range() {
        const TAIL: u64 = 42;
        // Strictly below the tail, exactly at it, and the cold-start
        // sentinel `0`.
        for supplied in [10, TAIL, 0] {
            assert_eq!(clamp_cursor(supplied, Some(TAIL)), supplied);
        }
    }

    /// FUZZ-MED-3: with no known tail there is nothing to clamp against,
    /// so `supplied` is returned as-is rather than being silently zeroed.
    #[test]
    fn clamp_cursor_no_op_when_tail_unknown() {
        for supplied in [0, 42, 1_000_000] {
            assert_eq!(clamp_cursor(supplied, None), supplied);
        }
    }
}