cellos_server/routes/events.rs
1//! `GET /v1/events` — one-shot snapshot of recent CloudEvents.
2//!
3//! EVT-001 (E2E report): `cellctl events` (without `--follow`) needs an
4//! HTTP fallback for environments where WebSocket isn't viable — same
5//! ergonomics as `kubectl get events`. The live-tail path
6//! (`/ws/events`, see `ws.rs`) continues to serve `cellctl events
7//! --follow` and the web UI.
8//!
9//! ## Response shape
10//!
11//! ```json
12//! {
13//! "events": [{ "seq": 12345, "event": { /* CloudEvent */ } }, …],
14//! "cursor": 12345
15//! }
16//! ```
17//!
18//! `seq` matches the WebSocket envelope (`ws.rs::build_envelope`) so
19//! clients can mix snapshot + follow flows without translating shapes:
20//! open `/v1/events?since=<N>`, take the returned `cursor`, then
21//! `/ws/events?since=<cursor>` to live-tail with no gap.
22//!
//! `cursor` is where the next request should resume. It is normally the
//! highest `seq` in the returned page, or the caller's `since` echoed back
//! when the page is empty (the JetStream tail has caught up to the
//! requested cursor). It is NEVER lower than `since` or than any returned
//! `seq`, so a client can use `cursor` as the next page's `since` without
//! risking duplicate delivery on a quiet stream.
28//!
29//! ## Query parameters
30//!
31//! - `since=<seq>` (optional, default 0) — return events with
32//! `seq > since`. `0` means "from the oldest retained event".
//! - `limit=<n>` (optional, default 100) — page ceiling. `0` is treated
//!   like an omitted value (default 100), and anything above 1000 is capped
//!   to 1000. The handler normalises in-band rather than 400-ing on a bad
//!   value because the cellctl/web flow is read-only and a stale client
//!   that asked for `limit=99999` deserves a useful (capped) answer
//!   rather than an error toast.
38//!
39//! ## What this is NOT
40//!
41//! - Not a search interface. Subject-filtering is `?subject=` on the
42//! WS path and is intentionally absent here; one-shot snapshot is a
43//! "give me the recent firehose" tool, not a query language.
44//! - Not durable. The handler creates an *ephemeral* JetStream consumer
45//! per request, drains a single batch, and drops the consumer.
46//! `inactive_threshold` on the consumer config bounds broker-side
47//! cleanup if our process dies between batch-fetch and consumer-drop.
48
49use axum::extract::{Query, State};
50use axum::http::HeaderMap;
51use axum::Json;
52use futures_util::StreamExt;
53use serde::{Deserialize, Serialize};
54use tracing::{debug, warn};
55
56use crate::auth::require_bearer;
57use crate::error::AppError;
58use crate::jetstream::{create_ephemeral_consumer, deliver_policy_for};
59use crate::state::AppState;
60
/// Page size served when `?limit=` is omitted (or is `0`). kubectl's
/// `--chunk-size` uses 500; this is trimmed to 100 because `cellctl events`
/// is read by a person in a terminal, not paged through by a batch crawler.
const DEFAULT_LIMIT: usize = 100;

/// Ceiling on the page size. Larger requests are capped to this value rather
/// than rejected, so one round-trip never costs the server more than this.
/// 1000 sits well above a typical viewer's screen (60-200 lines) and well
/// below what the broker will deliver before its own batch timeout fires
/// (`REPLAY_BATCH_TIMEOUT` in `jetstream.rs`).
const MAX_LIMIT: usize = 1_000;

/// Pull-request `expires` window (milliseconds) for the single batch the
/// handler fetches; once it lapses the caller is treated as caught up to the
/// tail. Deliberately short — a long window only stalls a viewer that already
/// has the snapshot it asked for.
const FETCH_TIMEOUT_MS: u64 = 250;

/// FUZZ-CRIT-1 (wave-1 report §Critical): wall-clock cap applied to each
/// NATS-side request the handler makes (`get_stream`,
/// `create_ephemeral_consumer`, batch `messages()`).
///
/// Without it an unreachable broker turned every `/v1/events` request into a
/// ~10 s hang: async-nats' default JetStream request timeout is 5 s, and
/// chained with a reconnect storm the wave-1 fuzzer observed 10 s wall-clock
/// 500s. 1.5 s is far above the p99 of a healthy local-network NATS
/// round-trip (low double-digit ms) yet short enough that a flood of
/// `/v1/events` during a NATS partition cannot pin handler tasks long enough
/// to become a DoS lever.
const NATS_REQUEST_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(1_500);
89
90/// Query parameters for `GET /v1/events`. See module-level docs for the
91/// semantics. `serde(default)` so missing fields use the defaults below
92/// without rejecting the request — kubectl-style ergonomics.
93#[derive(Debug, Deserialize, Default)]
94pub struct EventsQuery {
95 /// Return events with `seq > since`. `None`/`0` means "from the
96 /// oldest retained event" (JetStream's `DeliverPolicy::All`).
97 #[serde(default)]
98 pub since: Option<u64>,
99 /// Page size; clamped to `[1, MAX_LIMIT]` after parsing.
100 #[serde(default)]
101 pub limit: Option<usize>,
102}
103
104/// One envelope in the response page. Wire-compatible with the
105/// `/ws/events` frame shape (`ws.rs::build_envelope`).
106#[derive(Debug, Serialize)]
107pub struct EventEnvelope {
108 pub seq: u64,
109 pub event: serde_json::Value,
110}
111
112/// Response body. `cursor` is the highest seq in `events`, or `since`
113/// echoed back when the page is empty (see module-level docs).
114#[derive(Debug, Serialize)]
115pub struct EventsResponse {
116 pub events: Vec<EventEnvelope>,
117 pub cursor: u64,
118}
119
120/// `GET /v1/events` — Bearer-protected one-shot snapshot of the
121/// JetStream event log. See module docs for the contract.
122pub async fn list_events(
123 State(state): State<AppState>,
124 headers: HeaderMap,
125 Query(query): Query<EventsQuery>,
126) -> Result<Json<EventsResponse>, AppError> {
127 require_bearer(&headers, &state.api_token)?;
128
129 let since = query.since.unwrap_or(0);
130 let limit = clamp_limit(query.limit);
131
132 // No broker configured (test mode, NATS unreachable at startup):
133 // return the current cursor with no events rather than a 5xx so
134 // operators inspecting a broker-less projection get a useful
135 // answer. The cursor still reflects whatever was replayed at boot.
136 let Some(ctx) = state.jetstream.clone() else {
137 debug!("GET /v1/events: no JetStream context configured; returning empty snapshot");
138 return Ok(Json(EventsResponse {
139 events: Vec::new(),
140 cursor: state.cursor().max(since),
141 }));
142 };
143
144 // FUZZ-CRIT-1: wrap every NATS-side call in a hard wall-clock
145 // timeout AND map any error (timeout, connection failure, JetStream
146 // RPC error) to a redacted 503. Operators get the underlying error
147 // via the WARN log; clients see only the generic problem+json body.
148 let stream = match tokio::time::timeout(
149 NATS_REQUEST_TIMEOUT,
150 ctx.get_stream(crate::jetstream::STREAM_NAME),
151 )
152 .await
153 {
154 Ok(Ok(s)) => s,
155 Ok(Err(e)) => {
156 warn!(
157 stream = crate::jetstream::STREAM_NAME,
158 error = %e,
159 stage = "get_stream",
160 "GET /v1/events: JetStream request failed; returning 503",
161 );
162 return Err(AppError::service_unavailable());
163 }
164 Err(_elapsed) => {
165 warn!(
166 stream = crate::jetstream::STREAM_NAME,
167 timeout_ms = NATS_REQUEST_TIMEOUT.as_millis() as u64,
168 stage = "get_stream",
169 "GET /v1/events: JetStream get_stream timed out; returning 503",
170 );
171 return Err(AppError::service_unavailable());
172 }
173 };
174
175 // `since == 0` → DeliverPolicy::All (everything currently retained).
176 // `since > 0` → DeliverPolicy::ByStartSequence { since + 1 }, which
177 // is exactly the ADR-0015 §D3 contract the WS path already uses.
178 let policy = if since == 0 {
179 deliver_policy_for(None)
180 } else {
181 deliver_policy_for(Some(since))
182 };
183
184 let consumer = match tokio::time::timeout(
185 NATS_REQUEST_TIMEOUT,
186 create_ephemeral_consumer(&stream, policy, None),
187 )
188 .await
189 {
190 Ok(Ok(c)) => c,
191 Ok(Err(e)) => {
192 warn!(
193 stream = crate::jetstream::STREAM_NAME,
194 error = %format!("{e:#}"),
195 stage = "create_ephemeral_consumer",
196 "GET /v1/events: JetStream consumer create failed; returning 503",
197 );
198 return Err(AppError::service_unavailable());
199 }
200 Err(_elapsed) => {
201 warn!(
202 stream = crate::jetstream::STREAM_NAME,
203 timeout_ms = NATS_REQUEST_TIMEOUT.as_millis() as u64,
204 stage = "create_ephemeral_consumer",
205 "GET /v1/events: JetStream consumer create timed out; returning 503",
206 );
207 return Err(AppError::service_unavailable());
208 }
209 };
210
211 let mut batch = match tokio::time::timeout(
212 NATS_REQUEST_TIMEOUT,
213 consumer
214 .fetch()
215 .max_messages(limit)
216 .expires(std::time::Duration::from_millis(FETCH_TIMEOUT_MS))
217 .messages(),
218 )
219 .await
220 {
221 Ok(Ok(b)) => b,
222 Ok(Err(e)) => {
223 warn!(
224 stream = crate::jetstream::STREAM_NAME,
225 error = %format!("{e:#}"),
226 stage = "fetch_batch",
227 "GET /v1/events: JetStream batch fetch failed; returning 503",
228 );
229 return Err(AppError::service_unavailable());
230 }
231 Err(_elapsed) => {
232 warn!(
233 stream = crate::jetstream::STREAM_NAME,
234 timeout_ms = NATS_REQUEST_TIMEOUT.as_millis() as u64,
235 stage = "fetch_batch",
236 "GET /v1/events: JetStream batch fetch timed out; returning 503",
237 );
238 return Err(AppError::service_unavailable());
239 }
240 };
241
242 let mut envelopes: Vec<EventEnvelope> = Vec::with_capacity(limit.min(64));
243 let mut highest: u64 = since;
244
245 while let Some(msg) = batch.next().await {
246 let msg = match msg {
247 Ok(m) => m,
248 Err(e) => {
249 warn!(error = %e, "GET /v1/events: batch read error; ending page");
250 break;
251 }
252 };
253 let seq = match msg.info() {
254 Ok(info) => info.stream_sequence,
255 Err(e) => {
256 warn!(error = %e, "GET /v1/events: message missing stream info; skipping");
257 continue;
258 }
259 };
260 let event_value = match serde_json::from_slice::<serde_json::Value>(&msg.payload) {
261 Ok(v) => v,
262 Err(e) => {
263 warn!(seq, error = %e, "GET /v1/events: payload not JSON; skipping");
264 continue;
265 }
266 };
267 if seq > highest {
268 highest = seq;
269 }
270 envelopes.push(EventEnvelope {
271 seq,
272 event: event_value,
273 });
274 if envelopes.len() >= limit {
275 break;
276 }
277 }
278
279 Ok(Json(EventsResponse {
280 events: envelopes,
281 cursor: highest,
282 }))
283}
284
285/// Apply the documented `?limit=` defaults and cap. Pulled out so the
286/// clamping behaviour can be unit-tested without spinning a router.
287pub(crate) fn clamp_limit(requested: Option<usize>) -> usize {
288 match requested {
289 None => DEFAULT_LIMIT,
290 Some(0) => DEFAULT_LIMIT, // `?limit=0` is meaningless → use default rather than error
291 Some(n) if n > MAX_LIMIT => MAX_LIMIT,
292 Some(n) => n,
293 }
294}
295
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn clamp_limit_default_when_unset() {
        let served = clamp_limit(None);
        assert_eq!(served, DEFAULT_LIMIT);
    }

    #[test]
    fn clamp_limit_default_when_zero() {
        // A zero-size page is meaningless. Falling back to the default
        // (instead of serving an empty page) spares the client a retry.
        let served = clamp_limit(Some(0));
        assert_eq!(served, DEFAULT_LIMIT);
    }

    #[test]
    fn clamp_limit_caps_at_max() {
        for &oversized in &[MAX_LIMIT + 1, usize::MAX] {
            assert_eq!(clamp_limit(Some(oversized)), MAX_LIMIT, "limit={oversized}");
        }
    }

    #[test]
    fn clamp_limit_passes_through_in_range() {
        for &n in &[1, 50, MAX_LIMIT] {
            assert_eq!(clamp_limit(Some(n)), n, "limit={n}");
        }
    }
}