Skip to main content

cellos_server/routes/
events.rs

1//! `GET /v1/events` — one-shot snapshot of recent CloudEvents.
2//!
3//! EVT-001 (E2E report): `cellctl events` (without `--follow`) needs an
4//! HTTP fallback for environments where WebSocket isn't viable — same
5//! ergonomics as `kubectl get events`. The live-tail path
6//! (`/ws/events`, see `ws.rs`) continues to serve `cellctl events
7//! --follow` and the web UI.
8//!
9//! ## Response shape
10//!
11//! ```json
12//! {
13//!   "events": [{ "seq": 12345, "event": { /* CloudEvent */ } }, …],
14//!   "cursor": 12345
15//! }
16//! ```
17//!
18//! `seq` matches the WebSocket envelope (`ws.rs::build_envelope`) so
19//! clients can mix snapshot + follow flows without translating shapes:
20//! open `/v1/events?since=<N>`, take the returned `cursor`, then
21//! `/ws/events?since=<cursor>` to live-tail with no gap.
22//!
//! `cursor` is the resume point for the next request: normally the
//! highest `seq` in the returned page. When the handler read nothing
//! from the stream, the caller's `since` is echoed back, clamped *down*
//! to the JetStream tail (`state.last_sequence`) when the tail is
//! known. The cursor is never raised to the tail, because that would
//! skip events the caller has not been given. The tail clamp
//! (FUZZ-MED-3) prevents a hostile or stale client from pinning the
//! cursor to a future sequence — a client that asks
//! `?since=999999999` on a stream whose real tail is 42 sees
//! `cursor: 42`, not `999999999`. A follow-up `?since=cursor` therefore
//! cannot skip ahead of the stream's true tail.
31//!
32//! ## Query parameters
33//!
34//! - `since=<seq>` (optional, default 0) — return events with
35//!   `seq > since`. `0` means "from the oldest retained event".
36//! - `limit=<n>` (optional, default 100, clamped to 1..=1000) — page
37//!   ceiling. The handler clamps in-band rather than 400-ing on a bad
38//!   value because the cellctl/web flow is read-only and a stale client
39//!   that asked for `limit=99999` deserves a useful (capped) answer
40//!   rather than an error toast.
41//!
42//! ## What this is NOT
43//!
44//! - Not a search interface. Subject-filtering is `?subject=` on the
45//!   WS path and is intentionally absent here; one-shot snapshot is a
46//!   "give me the recent firehose" tool, not a query language.
47//! - Not durable. The handler creates an *ephemeral* JetStream consumer
48//!   per request, drains a single batch, and drops the consumer.
49//!   `inactive_threshold` on the consumer config bounds broker-side
50//!   cleanup if our process dies between batch-fetch and consumer-drop.
51
52use axum::extract::{Query, State};
53use axum::http::HeaderMap;
54use axum::Json;
55use futures_util::StreamExt;
56use serde::{Deserialize, Serialize};
57use tracing::{debug, warn};
58
59use crate::auth::require_bearer;
60use crate::error::AppError;
61use crate::jetstream::{create_ephemeral_consumer, deliver_policy_for};
62use crate::state::AppState;
63
/// Page size used when the request carries no `?limit=`. kubectl's
/// `--chunk-size=500` is the lineage, but the value is trimmed to 100
/// because `cellctl events` is normally read by a person in a
/// terminal rather than by a paginating batch crawler.
const DEFAULT_LIMIT: usize = 100;

/// Upper bound on the page size. Larger requests are capped to this
/// value so one round-trip never costs more than this many events.
/// 1000 comfortably exceeds a viewer's screen (60-200 lines) while
/// staying under what the broker will deliver before its own batch
/// timeout fires (`REPLAY_BATCH_TIMEOUT` in `jetstream.rs`).
const MAX_LIMIT: usize = 1_000;

/// Milliseconds the batch fetch is allowed to wait (passed as the
/// fetch's `expires`) before the handler concludes the caller has
/// reached the tail. Deliberately short: a longer wait only stalls a
/// viewer that already holds the snapshot it asked for.
const FETCH_TIMEOUT_MS: u64 = 250;

/// FUZZ-CRIT-1 (wave-1 report §Critical): wall-clock ceiling applied to
/// the NATS-side calls the handler makes (`get_stream`,
/// `create_ephemeral_consumer`, the batch `messages()` request,
/// `stream.info()`). Without it an unreachable broker turns each
/// `/v1/events` request into a ~10s hang (the async-nats default
/// JetStream request timeout is 5s; chained with a reconnect storm the
/// wave-1 fuzzer saw 10s wall-clock 500s). 1500 ms sits far above the
/// p99 of a healthy local-network NATS round-trip (low double-digit
/// ms) yet is short enough that a flood of `/v1/events` during a NATS
/// partition cannot pin handler tasks long enough to be a DoS lever.
const NATS_REQUEST_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(1_500);
92
93/// Query parameters for `GET /v1/events`. See module-level docs for the
94/// semantics. `serde(default)` so missing fields use the defaults below
95/// without rejecting the request — kubectl-style ergonomics.
96#[derive(Debug, Deserialize, Default)]
97pub struct EventsQuery {
98    /// Return events with `seq > since`. `None`/`0` means "from the
99    /// oldest retained event" (JetStream's `DeliverPolicy::All`).
100    #[serde(default)]
101    pub since: Option<u64>,
102    /// Page size; clamped to `[1, MAX_LIMIT]` after parsing.
103    #[serde(default)]
104    pub limit: Option<usize>,
105}
106
107/// One envelope in the response page. Wire-compatible with the
108/// `/ws/events` frame shape (`ws.rs::build_envelope`).
109#[derive(Debug, Serialize)]
110pub struct EventEnvelope {
111    pub seq: u64,
112    pub event: serde_json::Value,
113}
114
115/// Response body. `cursor` is the highest seq in `events`, or `since`
116/// echoed back when the page is empty (see module-level docs).
117#[derive(Debug, Serialize)]
118pub struct EventsResponse {
119    pub events: Vec<EventEnvelope>,
120    pub cursor: u64,
121}
122
123/// `GET /v1/events` — Bearer-protected one-shot snapshot of the
124/// JetStream event log. See module docs for the contract.
125pub async fn list_events(
126    State(state): State<AppState>,
127    headers: HeaderMap,
128    Query(query): Query<EventsQuery>,
129) -> Result<Json<EventsResponse>, AppError> {
130    require_bearer(&headers, &state.api_token)?;
131
132    let since = query.since.unwrap_or(0);
133    let limit = clamp_limit(query.limit);
134
135    // No broker configured (test mode, NATS unreachable at startup):
136    // return the current cursor with no events rather than a 5xx so
137    // operators inspecting a broker-less projection get a useful
138    // answer. The cursor still reflects whatever was replayed at boot.
139    let Some(ctx) = state.jetstream.clone() else {
140        debug!("GET /v1/events: no JetStream context configured; returning empty snapshot");
141        return Ok(Json(EventsResponse {
142            events: Vec::new(),
143            cursor: state.cursor().max(since),
144        }));
145    };
146
147    // FUZZ-CRIT-1: wrap every NATS-side call in a hard wall-clock
148    // timeout AND map any error (timeout, connection failure, JetStream
149    // RPC error) to a redacted 503. Operators get the underlying error
150    // via the WARN log; clients see only the generic problem+json body.
151    let mut stream = match tokio::time::timeout(
152        NATS_REQUEST_TIMEOUT,
153        ctx.get_stream(crate::jetstream::STREAM_NAME),
154    )
155    .await
156    {
157        Ok(Ok(s)) => s,
158        Ok(Err(e)) => {
159            warn!(
160                stream = crate::jetstream::STREAM_NAME,
161                error = %e,
162                stage = "get_stream",
163                "GET /v1/events: JetStream request failed; returning 503",
164            );
165            return Err(AppError::service_unavailable());
166        }
167        Err(_elapsed) => {
168            warn!(
169                stream = crate::jetstream::STREAM_NAME,
170                timeout_ms = NATS_REQUEST_TIMEOUT.as_millis() as u64,
171                stage = "get_stream",
172                "GET /v1/events: JetStream get_stream timed out; returning 503",
173            );
174            return Err(AppError::service_unavailable());
175        }
176    };
177
178    // `since == 0` → DeliverPolicy::All (everything currently retained).
179    // `since > 0` → DeliverPolicy::ByStartSequence { since + 1 }, which
180    // is exactly the ADR-0015 §D3 contract the WS path already uses.
181    let policy = if since == 0 {
182        deliver_policy_for(None)
183    } else {
184        deliver_policy_for(Some(since))
185    };
186
187    let consumer = match tokio::time::timeout(
188        NATS_REQUEST_TIMEOUT,
189        create_ephemeral_consumer(&stream, policy, None),
190    )
191    .await
192    {
193        Ok(Ok(c)) => c,
194        Ok(Err(e)) => {
195            warn!(
196                stream = crate::jetstream::STREAM_NAME,
197                error = %format!("{e:#}"),
198                stage = "create_ephemeral_consumer",
199                "GET /v1/events: JetStream consumer create failed; returning 503",
200            );
201            return Err(AppError::service_unavailable());
202        }
203        Err(_elapsed) => {
204            warn!(
205                stream = crate::jetstream::STREAM_NAME,
206                timeout_ms = NATS_REQUEST_TIMEOUT.as_millis() as u64,
207                stage = "create_ephemeral_consumer",
208                "GET /v1/events: JetStream consumer create timed out; returning 503",
209            );
210            return Err(AppError::service_unavailable());
211        }
212    };
213
214    let mut batch = match tokio::time::timeout(
215        NATS_REQUEST_TIMEOUT,
216        consumer
217            .fetch()
218            .max_messages(limit)
219            .expires(std::time::Duration::from_millis(FETCH_TIMEOUT_MS))
220            .messages(),
221    )
222    .await
223    {
224        Ok(Ok(b)) => b,
225        Ok(Err(e)) => {
226            warn!(
227                stream = crate::jetstream::STREAM_NAME,
228                error = %format!("{e:#}"),
229                stage = "fetch_batch",
230                "GET /v1/events: JetStream batch fetch failed; returning 503",
231            );
232            return Err(AppError::service_unavailable());
233        }
234        Err(_elapsed) => {
235            warn!(
236                stream = crate::jetstream::STREAM_NAME,
237                timeout_ms = NATS_REQUEST_TIMEOUT.as_millis() as u64,
238                stage = "fetch_batch",
239                "GET /v1/events: JetStream batch fetch timed out; returning 503",
240            );
241            return Err(AppError::service_unavailable());
242        }
243    };
244
245    let mut envelopes: Vec<EventEnvelope> = Vec::with_capacity(limit.min(64));
246    let mut highest: u64 = since;
247
248    while let Some(msg) = batch.next().await {
249        let msg = match msg {
250            Ok(m) => m,
251            Err(e) => {
252                warn!(error = %e, "GET /v1/events: batch read error; ending page");
253                break;
254            }
255        };
256        let seq = match msg.info() {
257            Ok(info) => info.stream_sequence,
258            Err(e) => {
259                warn!(error = %e, "GET /v1/events: message missing stream info; skipping");
260                continue;
261            }
262        };
263        let event_value = match serde_json::from_slice::<serde_json::Value>(&msg.payload) {
264            Ok(v) => v,
265            Err(e) => {
266                warn!(seq, error = %e, "GET /v1/events: payload not JSON; skipping");
267                continue;
268            }
269        };
270        if seq > highest {
271            highest = seq;
272        }
273        envelopes.push(EventEnvelope {
274            seq,
275            event: event_value,
276        });
277        if envelopes.len() >= limit {
278            break;
279        }
280    }
281
282    // FUZZ-MED-3: fetch the stream's true tail and clamp the response
283    // cursor against it. We tolerate `info()` failures (None) by
284    // passing `highest` through — a missing ceiling is no worse than
285    // the pre-fix behaviour. We keep the same wall-clock timeout
286    // wrapper used above so a degraded broker still can't pin the
287    // request beyond NATS_REQUEST_TIMEOUT.
288    let tail: Option<u64> = match tokio::time::timeout(NATS_REQUEST_TIMEOUT, stream.info()).await {
289        Ok(Ok(info)) => Some(info.state.last_sequence),
290        Ok(Err(e)) => {
291            debug!(
292                stream = crate::jetstream::STREAM_NAME,
293                error = %e,
294                stage = "stream_info",
295                "GET /v1/events: stream.info() failed; cursor will not be clamped",
296            );
297            None
298        }
299        Err(_elapsed) => {
300            debug!(
301                stream = crate::jetstream::STREAM_NAME,
302                timeout_ms = NATS_REQUEST_TIMEOUT.as_millis() as u64,
303                stage = "stream_info",
304                "GET /v1/events: stream.info() timed out; cursor will not be clamped",
305            );
306            None
307        }
308    };
309
310    Ok(Json(EventsResponse {
311        events: envelopes,
312        cursor: clamp_cursor(highest, tail),
313    }))
314}
315
/// FUZZ-MED-3: cap a response cursor at the stream's true tail.
///
/// `supplied` is the cursor the handler arrived at: it starts from the
/// caller's `?since=` and is raised by the sequences seen in the batch.
/// A hostile or stale client can send a `since` beyond the tail; if
/// that value were echoed back, a follow-up `?since=cursor` would
/// silently skip real events once they arrive. With a known `tail` the
/// result is therefore `min(supplied, tail)` — the cursor can never
/// exceed the highest sequence the stream actually holds.
///
/// `tail = None` (no broker, or stream info unreachable) leaves
/// `supplied` untouched, since there is no trustworthy ceiling to
/// clamp against.
///
/// Kept as a free function so the boundary conditions can be unit-
/// tested exhaustively without a real JetStream.
pub(crate) fn clamp_cursor(supplied: u64, tail: Option<u64>) -> u64 {
    tail.map_or(supplied, |ceiling| supplied.min(ceiling))
}
339
340/// Apply the documented `?limit=` defaults and cap. Pulled out so the
341/// clamping behaviour can be unit-tested without spinning a router.
342pub(crate) fn clamp_limit(requested: Option<usize>) -> usize {
343    match requested {
344        None => DEFAULT_LIMIT,
345        Some(0) => DEFAULT_LIMIT, // `?limit=0` is meaningless → use default rather than error
346        Some(n) if n > MAX_LIMIT => MAX_LIMIT,
347        Some(n) => n,
348    }
349}
350
#[cfg(test)]
mod tests {
    use super::*;

    /// A request without `?limit=` gets the default page size.
    #[test]
    fn clamp_limit_default_when_unset() {
        let resolved = clamp_limit(None);
        assert_eq!(resolved, DEFAULT_LIMIT);
    }

    /// `?limit=0` is meaningless, so it resolves to the default instead
    /// of producing an empty page the client would have to re-request.
    #[test]
    fn clamp_limit_default_when_zero() {
        let resolved = clamp_limit(Some(0));
        assert_eq!(resolved, DEFAULT_LIMIT);
    }

    /// Anything above the hard cap is reduced to the cap.
    #[test]
    fn clamp_limit_caps_at_max() {
        for oversized in [MAX_LIMIT + 1, usize::MAX] {
            assert_eq!(clamp_limit(Some(oversized)), MAX_LIMIT);
        }
    }

    /// Values inside `1..=MAX_LIMIT` come back untouched.
    #[test]
    fn clamp_limit_passes_through_in_range() {
        for in_range in [1, 50, MAX_LIMIT] {
            assert_eq!(clamp_limit(Some(in_range)), in_range);
        }
    }

    /// FUZZ-MED-3: a `?since=` beyond the stream's true tail must NOT
    /// be echoed back. Clamping to the tail guarantees a follow-up
    /// `?since=cursor` cannot skip ahead of real events still to come.
    #[test]
    fn clamp_cursor_clamps_future_cursor_to_stream_tail() {
        let tail = 42;
        // Far past the tail, and past it by exactly one.
        for beyond_tail in [1_000_000, 43] {
            assert_eq!(clamp_cursor(beyond_tail, Some(tail)), tail);
        }
    }

    /// FUZZ-MED-3: a `?since=` at or below the tail is a legitimate
    /// cursor and passes through unchanged, which keeps `cursor`
    /// monotone-non-decreasing across requests.
    #[test]
    fn clamp_cursor_passes_through_in_range() {
        let tail = 42;
        // Strictly below the tail, exactly at the tail, and zero (the
        // cold-start sentinel).
        for legitimate in [10, 42, 0] {
            assert_eq!(clamp_cursor(legitimate, Some(tail)), legitimate);
        }
    }

    /// FUZZ-MED-3: with an unknown tail (info() failure, no broker)
    /// nothing can be clamped. `supplied` passes through so the handler
    /// stays at parity with its pre-fix behaviour instead of silently
    /// zeroing the cursor.
    #[test]
    fn clamp_cursor_no_op_when_tail_unknown() {
        for supplied in [0, 42, 1_000_000] {
            assert_eq!(clamp_cursor(supplied, None), supplied);
        }
    }
}