cellos_server/routes/events.rs
1//! `GET /v1/events` — one-shot snapshot of recent CloudEvents.
2//!
3//! EVT-001 (E2E report): `cellctl events` (without `--follow`) needs an
4//! HTTP fallback for environments where WebSocket isn't viable — same
5//! ergonomics as `kubectl get events`. The live-tail path
6//! (`/ws/events`, see `ws.rs`) continues to serve `cellctl events
7//! --follow` and the web UI.
8//!
9//! ## Response shape
10//!
11//! ```json
12//! {
13//! "events": [{ "seq": 12345, "event": { /* CloudEvent */ } }, …],
14//! "cursor": 12345
15//! }
16//! ```
17//!
18//! `seq` matches the WebSocket envelope (`ws.rs::build_envelope`) so
19//! clients can mix snapshot + follow flows without translating shapes:
20//! open `/v1/events?since=<N>`, take the returned `cursor`, then
21//! `/ws/events?since=<cursor>` to live-tail with no gap.
22//!
//! `cursor` is the highest `seq` in the returned page. When the fetch
//! yields no messages, the cursor falls back to the caller's `since`,
//! capped at the JetStream tail (`state.last_sequence`) when the tail
//! is known. The tail clamp (FUZZ-MED-3) prevents a hostile or stale
//! client from pinning the cursor to a future sequence — a client that
//! asks `?since=999999999` on a stream whose real tail is 42 sees
//! `cursor: 42`, not `999999999`. A follow-up `?since=cursor` therefore
//! never skips ahead of the stream's true tail.
31//!
32//! ## Query parameters
33//!
34//! - `since=<seq>` (optional, default 0) — return events with
35//! `seq > since`. `0` means "from the oldest retained event".
//! - `limit=<n>` (optional, default 100, capped at 1000; `limit=0` is
//!   treated as unset and yields the default) — page ceiling. The
//!   handler clamps in-band rather than 400-ing on a bad value because
//!   the cellctl/web flow is read-only and a stale client that asked
//!   for `limit=99999` deserves a useful (capped) answer rather than
//!   an error toast.
41//!
42//! ## What this is NOT
43//!
44//! - Not a search interface. Subject-filtering is `?subject=` on the
45//! WS path and is intentionally absent here; one-shot snapshot is a
46//! "give me the recent firehose" tool, not a query language.
47//! - Not durable. The handler creates an *ephemeral* JetStream consumer
48//! per request, drains a single batch, and drops the consumer.
49//! `inactive_threshold` on the consumer config bounds broker-side
50//! cleanup if our process dies between batch-fetch and consumer-drop.
51
52use axum::extract::{Query, State};
53use axum::http::HeaderMap;
54use axum::Json;
55use futures_util::StreamExt;
56use serde::{Deserialize, Serialize};
57use tracing::{debug, warn};
58
59use crate::auth::require_bearer;
60use crate::error::AppError;
61use crate::jetstream::{create_ephemeral_consumer, deliver_policy_for};
62use crate::state::AppState;
63
/// Page size applied when the caller omits `?limit=`. Descends from
/// kubectl's `--chunk-size=500`, but trimmed to 100 because the usual
/// `cellctl events` consumer is a human reading a terminal rather than
/// a paginated batch crawler.
const DEFAULT_LIMIT: usize = 100;

/// Upper bound on the page size. Operators may request more, but the
/// server will not pay for more than this in a single round-trip. 1000
/// sits comfortably above a typical viewer's screen (60-200 lines) and
/// comfortably below the per-request budget the broker will deliver
/// before its own batch timeout fires (`REPLAY_BATCH_TIMEOUT` in
/// `jetstream.rs`).
const MAX_LIMIT: usize = 1_000;

/// Milliseconds the handler waits on a JetStream batch before it
/// concludes the caller has reached the tail. Deliberately short — a
/// longer wait only stalls a viewer that already holds the snapshot it
/// asked for.
const FETCH_TIMEOUT_MS: u64 = 250;

/// FUZZ-CRIT-1 (wave-1 report §Critical): wall-clock ceiling applied to
/// every NATS-side request the handler issues (`get_stream`,
/// `create_ephemeral_consumer`, batch `messages()`). Without it, an
/// unreachable broker turns each `/v1/events` request into a ~10s hang
/// (the async-nats default JetStream request timeout is 5s; chained
/// with a reconnect storm the wave-1 fuzzer observed 10s wall-clock
/// 500s). 1500 ms is far above the p99 of a healthy local-network NATS
/// round-trip (low double-digit ms) yet short enough that a flood of
/// `/v1/events` calls during a NATS partition cannot pin handler tasks
/// for long enough to become a DoS lever.
const NATS_REQUEST_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(1_500);
92
93/// Query parameters for `GET /v1/events`. See module-level docs for the
94/// semantics. `serde(default)` so missing fields use the defaults below
95/// without rejecting the request — kubectl-style ergonomics.
96#[derive(Debug, Deserialize, Default)]
97pub struct EventsQuery {
98 /// Return events with `seq > since`. `None`/`0` means "from the
99 /// oldest retained event" (JetStream's `DeliverPolicy::All`).
100 #[serde(default)]
101 pub since: Option<u64>,
102 /// Page size; clamped to `[1, MAX_LIMIT]` after parsing.
103 #[serde(default)]
104 pub limit: Option<usize>,
105}
106
107/// One envelope in the response page. Wire-compatible with the
108/// `/ws/events` frame shape (`ws.rs::build_envelope`).
109#[derive(Debug, Serialize)]
110pub struct EventEnvelope {
111 pub seq: u64,
112 pub event: serde_json::Value,
113}
114
115/// Response body. `cursor` is the highest seq in `events`, or `since`
116/// echoed back when the page is empty (see module-level docs).
117#[derive(Debug, Serialize)]
118pub struct EventsResponse {
119 pub events: Vec<EventEnvelope>,
120 pub cursor: u64,
121}
122
123/// `GET /v1/events` — Bearer-protected one-shot snapshot of the
124/// JetStream event log. See module docs for the contract.
125pub async fn list_events(
126 State(state): State<AppState>,
127 headers: HeaderMap,
128 Query(query): Query<EventsQuery>,
129) -> Result<Json<EventsResponse>, AppError> {
130 require_bearer(&headers, &state.api_token)?;
131
132 let since = query.since.unwrap_or(0);
133 let limit = clamp_limit(query.limit);
134
135 // No broker configured (test mode, NATS unreachable at startup):
136 // return the current cursor with no events rather than a 5xx so
137 // operators inspecting a broker-less projection get a useful
138 // answer. The cursor still reflects whatever was replayed at boot.
139 let Some(ctx) = state.jetstream.clone() else {
140 debug!("GET /v1/events: no JetStream context configured; returning empty snapshot");
141 return Ok(Json(EventsResponse {
142 events: Vec::new(),
143 cursor: state.cursor().max(since),
144 }));
145 };
146
147 // FUZZ-CRIT-1: wrap every NATS-side call in a hard wall-clock
148 // timeout AND map any error (timeout, connection failure, JetStream
149 // RPC error) to a redacted 503. Operators get the underlying error
150 // via the WARN log; clients see only the generic problem+json body.
151 let mut stream = match tokio::time::timeout(
152 NATS_REQUEST_TIMEOUT,
153 ctx.get_stream(crate::jetstream::STREAM_NAME),
154 )
155 .await
156 {
157 Ok(Ok(s)) => s,
158 Ok(Err(e)) => {
159 warn!(
160 stream = crate::jetstream::STREAM_NAME,
161 error = %e,
162 stage = "get_stream",
163 "GET /v1/events: JetStream request failed; returning 503",
164 );
165 return Err(AppError::service_unavailable());
166 }
167 Err(_elapsed) => {
168 warn!(
169 stream = crate::jetstream::STREAM_NAME,
170 timeout_ms = NATS_REQUEST_TIMEOUT.as_millis() as u64,
171 stage = "get_stream",
172 "GET /v1/events: JetStream get_stream timed out; returning 503",
173 );
174 return Err(AppError::service_unavailable());
175 }
176 };
177
178 // `since == 0` → DeliverPolicy::All (everything currently retained).
179 // `since > 0` → DeliverPolicy::ByStartSequence { since + 1 }, which
180 // is exactly the ADR-0015 §D3 contract the WS path already uses.
181 let policy = if since == 0 {
182 deliver_policy_for(None)
183 } else {
184 deliver_policy_for(Some(since))
185 };
186
187 let consumer = match tokio::time::timeout(
188 NATS_REQUEST_TIMEOUT,
189 create_ephemeral_consumer(&stream, policy, None),
190 )
191 .await
192 {
193 Ok(Ok(c)) => c,
194 Ok(Err(e)) => {
195 warn!(
196 stream = crate::jetstream::STREAM_NAME,
197 error = %format!("{e:#}"),
198 stage = "create_ephemeral_consumer",
199 "GET /v1/events: JetStream consumer create failed; returning 503",
200 );
201 return Err(AppError::service_unavailable());
202 }
203 Err(_elapsed) => {
204 warn!(
205 stream = crate::jetstream::STREAM_NAME,
206 timeout_ms = NATS_REQUEST_TIMEOUT.as_millis() as u64,
207 stage = "create_ephemeral_consumer",
208 "GET /v1/events: JetStream consumer create timed out; returning 503",
209 );
210 return Err(AppError::service_unavailable());
211 }
212 };
213
214 let mut batch = match tokio::time::timeout(
215 NATS_REQUEST_TIMEOUT,
216 consumer
217 .fetch()
218 .max_messages(limit)
219 .expires(std::time::Duration::from_millis(FETCH_TIMEOUT_MS))
220 .messages(),
221 )
222 .await
223 {
224 Ok(Ok(b)) => b,
225 Ok(Err(e)) => {
226 warn!(
227 stream = crate::jetstream::STREAM_NAME,
228 error = %format!("{e:#}"),
229 stage = "fetch_batch",
230 "GET /v1/events: JetStream batch fetch failed; returning 503",
231 );
232 return Err(AppError::service_unavailable());
233 }
234 Err(_elapsed) => {
235 warn!(
236 stream = crate::jetstream::STREAM_NAME,
237 timeout_ms = NATS_REQUEST_TIMEOUT.as_millis() as u64,
238 stage = "fetch_batch",
239 "GET /v1/events: JetStream batch fetch timed out; returning 503",
240 );
241 return Err(AppError::service_unavailable());
242 }
243 };
244
245 let mut envelopes: Vec<EventEnvelope> = Vec::with_capacity(limit.min(64));
246 let mut highest: u64 = since;
247
248 while let Some(msg) = batch.next().await {
249 let msg = match msg {
250 Ok(m) => m,
251 Err(e) => {
252 warn!(error = %e, "GET /v1/events: batch read error; ending page");
253 break;
254 }
255 };
256 let seq = match msg.info() {
257 Ok(info) => info.stream_sequence,
258 Err(e) => {
259 warn!(error = %e, "GET /v1/events: message missing stream info; skipping");
260 continue;
261 }
262 };
263 let event_value = match serde_json::from_slice::<serde_json::Value>(&msg.payload) {
264 Ok(v) => v,
265 Err(e) => {
266 warn!(seq, error = %e, "GET /v1/events: payload not JSON; skipping");
267 continue;
268 }
269 };
270 if seq > highest {
271 highest = seq;
272 }
273 envelopes.push(EventEnvelope {
274 seq,
275 event: event_value,
276 });
277 if envelopes.len() >= limit {
278 break;
279 }
280 }
281
282 // FUZZ-MED-3: fetch the stream's true tail and clamp the response
283 // cursor against it. We tolerate `info()` failures (None) by
284 // passing `highest` through — a missing ceiling is no worse than
285 // the pre-fix behaviour. We keep the same wall-clock timeout
286 // wrapper used above so a degraded broker still can't pin the
287 // request beyond NATS_REQUEST_TIMEOUT.
288 let tail: Option<u64> = match tokio::time::timeout(NATS_REQUEST_TIMEOUT, stream.info()).await {
289 Ok(Ok(info)) => Some(info.state.last_sequence),
290 Ok(Err(e)) => {
291 debug!(
292 stream = crate::jetstream::STREAM_NAME,
293 error = %e,
294 stage = "stream_info",
295 "GET /v1/events: stream.info() failed; cursor will not be clamped",
296 );
297 None
298 }
299 Err(_elapsed) => {
300 debug!(
301 stream = crate::jetstream::STREAM_NAME,
302 timeout_ms = NATS_REQUEST_TIMEOUT.as_millis() as u64,
303 stage = "stream_info",
304 "GET /v1/events: stream.info() timed out; cursor will not be clamped",
305 );
306 None
307 }
308 };
309
310 Ok(Json(EventsResponse {
311 events: envelopes,
312 cursor: clamp_cursor(highest, tail),
313 }))
314}
315
/// FUZZ-MED-3: cap a response cursor at the stream's true tail.
///
/// `supplied` is the cursor the handler would otherwise return: the
/// highest sequence it saw in the batch, or the caller's `?since=` when
/// the batch was empty. A hostile or stale client can pass a `since`
/// beyond the tail; echoing that back would let a naive follow-up
/// `?since=cursor` silently skip real events once they arrive. When
/// `tail` is `Some`, the result is therefore never greater than it.
///
/// When `tail` is `None` (no broker, or stream info unreachable) there
/// is no trustworthy ceiling, so `supplied` is returned unchanged.
///
/// Kept as a free function so the boundary cases can be unit-tested
/// exhaustively without a real JetStream.
pub(crate) fn clamp_cursor(supplied: u64, tail: Option<u64>) -> u64 {
    tail.map_or(supplied, |ceiling| supplied.min(ceiling))
}
339
340/// Apply the documented `?limit=` defaults and cap. Pulled out so the
341/// clamping behaviour can be unit-tested without spinning a router.
342pub(crate) fn clamp_limit(requested: Option<usize>) -> usize {
343 match requested {
344 None => DEFAULT_LIMIT,
345 Some(0) => DEFAULT_LIMIT, // `?limit=0` is meaningless → use default rather than error
346 Some(n) if n > MAX_LIMIT => MAX_LIMIT,
347 Some(n) => n,
348 }
349}
350
#[cfg(test)]
mod tests {
    use super::*;

    /// An absent `?limit=` resolves to the default page size.
    #[test]
    fn clamp_limit_default_when_unset() {
        assert_eq!(DEFAULT_LIMIT, clamp_limit(None));
    }

    /// `?limit=0` is meaningless — it falls back to the default instead
    /// of producing an empty page (which would force the client to
    /// re-request).
    #[test]
    fn clamp_limit_default_when_zero() {
        assert_eq!(DEFAULT_LIMIT, clamp_limit(Some(0)));
    }

    /// Anything above the hard cap is reduced to the cap.
    #[test]
    fn clamp_limit_caps_at_max() {
        for oversized in [MAX_LIMIT + 1, usize::MAX] {
            assert_eq!(clamp_limit(Some(oversized)), MAX_LIMIT);
        }
    }

    /// Values inside `1..=MAX_LIMIT` come back untouched.
    #[test]
    fn clamp_limit_passes_through_in_range() {
        for in_range in [1, 50, MAX_LIMIT] {
            assert_eq!(clamp_limit(Some(in_range)), in_range);
        }
    }

    /// FUZZ-MED-3: a `?since=` beyond the stream's true tail must NOT
    /// be echoed back. The cursor is clamped to the tail so a follow-up
    /// `?since=cursor` cannot skip ahead of real events that will
    /// arrive.
    #[test]
    fn clamp_cursor_clamps_future_cursor_to_stream_tail() {
        let tail = Some(42);
        // Far beyond the tail, and just one past it.
        for beyond_tail in [1_000_000, 43] {
            assert_eq!(clamp_cursor(beyond_tail, tail), 42);
        }
    }

    /// FUZZ-MED-3: a `?since=` at or below the tail is a legitimate
    /// cursor and passes through unchanged, so `cursor` stays
    /// monotone-non-decreasing across requests.
    #[test]
    fn clamp_cursor_passes_through_in_range() {
        let tail = Some(42);
        // Strictly below the tail, exactly at the tail, and zero (the
        // cold-start sentinel).
        for legit in [10, 42, 0] {
            assert_eq!(clamp_cursor(legit, tail), legit);
        }
    }

    /// FUZZ-MED-3: with an unknown tail (info() failure, no broker)
    /// there is nothing to clamp against — `supplied` passes through so
    /// the handler stays at parity with its pre-fix behaviour rather
    /// than silently zeroing the cursor.
    #[test]
    fn clamp_cursor_no_op_when_tail_unknown() {
        for supplied in [0, 42, 1_000_000] {
            assert_eq!(clamp_cursor(supplied, None), supplied);
        }
    }
}