Skip to main content

cellos_server/routes/
events.rs

1//! `GET /v1/events` — one-shot snapshot of recent CloudEvents.
2//!
3//! EVT-001 (E2E report): `cellctl events` (without `--follow`) needs an
4//! HTTP fallback for environments where WebSocket isn't viable — same
5//! ergonomics as `kubectl get events`. The live-tail path
6//! (`/ws/events`, see `ws.rs`) continues to serve `cellctl events
7//! --follow` and the web UI.
8//!
9//! ## Response shape
10//!
11//! ```json
12//! {
13//!   "events": [{ "seq": 12345, "event": { /* CloudEvent */ } }, …],
14//!   "cursor": 12345
15//! }
16//! ```
17//!
18//! `seq` matches the WebSocket envelope (`ws.rs::build_envelope`) so
19//! clients can mix snapshot + follow flows without translating shapes:
20//! open `/v1/events?since=<N>`, take the returned `cursor`, then
21//! `/ws/events?since=<cursor>` to live-tail with no gap.
22//!
23//! `cursor` is the highest `seq` in the returned page, or the caller's
24//! `since` echoed back when the page is empty (the JetStream tail has
25//! caught up to the requested cursor). It is NEVER lower than `since`,
26//! so a client can use `cursor` as the next page's `since` without
27//! risking duplicate delivery on a quiet stream.
28//!
29//! ## Query parameters
30//!
31//! - `since=<seq>` (optional, default 0) — return events with
32//!   `seq > since`. `0` means "from the oldest retained event".
//! - `limit=<n>` (optional, default 100, capped at 1000; `limit=0` is
//!   treated like an omitted value and falls back to the default) —
//!   page ceiling. The handler clamps in-band rather than 400-ing on a
//!   bad value because the cellctl/web flow is read-only and a stale
//!   client that asked for `limit=99999` deserves a useful (capped)
//!   answer rather than an error toast.
38//!
39//! ## What this is NOT
40//!
41//! - Not a search interface. Subject-filtering is `?subject=` on the
42//!   WS path and is intentionally absent here; one-shot snapshot is a
43//!   "give me the recent firehose" tool, not a query language.
44//! - Not durable. The handler creates an *ephemeral* JetStream consumer
45//!   per request, drains a single batch, and drops the consumer.
46//!   `inactive_threshold` on the consumer config bounds broker-side
47//!   cleanup if our process dies between batch-fetch and consumer-drop.
48
49use axum::extract::{Query, State};
50use axum::http::HeaderMap;
51use axum::Json;
52use futures_util::StreamExt;
53use serde::{Deserialize, Serialize};
54use tracing::{debug, warn};
55
56use crate::auth::require_bearer;
57use crate::error::{AppError, AppErrorKind};
58use crate::jetstream::{create_ephemeral_consumer, deliver_policy_for};
59use crate::state::AppState;
60
/// Page size used when the request carries no `?limit=` (and, via
/// `clamp_limit`, when it carries `limit=0`). Deliberately smaller than
/// kubectl's `--chunk-size=500`: the usual `cellctl events` consumer is
/// a person reading a terminal, not a bulk pagination client.
const DEFAULT_LIMIT: usize = 100;

/// Upper bound on the page size; larger requests are capped to this
/// value rather than rejected. Chosen to sit far above a typical
/// viewer's screen (60-200 lines) while staying below what the broker
/// will deliver before its own batch timeout (`REPLAY_BATCH_TIMEOUT`
/// in `jetstream.rs`) applies.
const MAX_LIMIT: usize = 1_000;

/// Milliseconds the handler is willing to wait on a JetStream batch
/// before treating the caller as caught up with the tail. Kept short:
/// a longer wait only delays a viewer whose snapshot is already
/// complete.
const FETCH_TIMEOUT_MS: u64 = 250;
78
79/// Query parameters for `GET /v1/events`. See module-level docs for the
80/// semantics. `serde(default)` so missing fields use the defaults below
81/// without rejecting the request — kubectl-style ergonomics.
82#[derive(Debug, Deserialize, Default)]
83pub struct EventsQuery {
84    /// Return events with `seq > since`. `None`/`0` means "from the
85    /// oldest retained event" (JetStream's `DeliverPolicy::All`).
86    #[serde(default)]
87    pub since: Option<u64>,
88    /// Page size; clamped to `[1, MAX_LIMIT]` after parsing.
89    #[serde(default)]
90    pub limit: Option<usize>,
91}
92
93/// One envelope in the response page. Wire-compatible with the
94/// `/ws/events` frame shape (`ws.rs::build_envelope`).
95#[derive(Debug, Serialize)]
96pub struct EventEnvelope {
97    pub seq: u64,
98    pub event: serde_json::Value,
99}
100
101/// Response body. `cursor` is the highest seq in `events`, or `since`
102/// echoed back when the page is empty (see module-level docs).
103#[derive(Debug, Serialize)]
104pub struct EventsResponse {
105    pub events: Vec<EventEnvelope>,
106    pub cursor: u64,
107}
108
109/// `GET /v1/events` — Bearer-protected one-shot snapshot of the
110/// JetStream event log. See module docs for the contract.
111pub async fn list_events(
112    State(state): State<AppState>,
113    headers: HeaderMap,
114    Query(query): Query<EventsQuery>,
115) -> Result<Json<EventsResponse>, AppError> {
116    require_bearer(&headers, &state.api_token)?;
117
118    let since = query.since.unwrap_or(0);
119    let limit = clamp_limit(query.limit);
120
121    // No broker configured (test mode, NATS unreachable at startup):
122    // return the current cursor with no events rather than a 5xx so
123    // operators inspecting a broker-less projection get a useful
124    // answer. The cursor still reflects whatever was replayed at boot.
125    let Some(ctx) = state.jetstream.clone() else {
126        debug!("GET /v1/events: no JetStream context configured; returning empty snapshot");
127        return Ok(Json(EventsResponse {
128            events: Vec::new(),
129            cursor: state.cursor().max(since),
130        }));
131    };
132
133    let stream = match ctx.get_stream(crate::jetstream::STREAM_NAME).await {
134        Ok(s) => s,
135        Err(e) => {
136            warn!(error = %e, "GET /v1/events: get_stream failed");
137            return Err(AppError::new(
138                AppErrorKind::Internal,
139                format!("get_stream {}: {e}", crate::jetstream::STREAM_NAME),
140            ));
141        }
142    };
143
144    // `since == 0` → DeliverPolicy::All (everything currently retained).
145    // `since > 0` → DeliverPolicy::ByStartSequence { since + 1 }, which
146    // is exactly the ADR-0015 §D3 contract the WS path already uses.
147    let policy = if since == 0 {
148        deliver_policy_for(None)
149    } else {
150        deliver_policy_for(Some(since))
151    };
152
153    let consumer = create_ephemeral_consumer(&stream, policy, None)
154        .await
155        .map_err(|e| {
156            AppError::new(
157                AppErrorKind::Internal,
158                format!("create one-shot consumer: {e:#}"),
159            )
160        })?;
161
162    let mut batch = consumer
163        .fetch()
164        .max_messages(limit)
165        .expires(std::time::Duration::from_millis(FETCH_TIMEOUT_MS))
166        .messages()
167        .await
168        .map_err(|e| AppError::new(AppErrorKind::Internal, format!("fetch events batch: {e:#}")))?;
169
170    let mut envelopes: Vec<EventEnvelope> = Vec::with_capacity(limit.min(64));
171    let mut highest: u64 = since;
172
173    while let Some(msg) = batch.next().await {
174        let msg = match msg {
175            Ok(m) => m,
176            Err(e) => {
177                warn!(error = %e, "GET /v1/events: batch read error; ending page");
178                break;
179            }
180        };
181        let seq = match msg.info() {
182            Ok(info) => info.stream_sequence,
183            Err(e) => {
184                warn!(error = %e, "GET /v1/events: message missing stream info; skipping");
185                continue;
186            }
187        };
188        let event_value = match serde_json::from_slice::<serde_json::Value>(&msg.payload) {
189            Ok(v) => v,
190            Err(e) => {
191                warn!(seq, error = %e, "GET /v1/events: payload not JSON; skipping");
192                continue;
193            }
194        };
195        if seq > highest {
196            highest = seq;
197        }
198        envelopes.push(EventEnvelope {
199            seq,
200            event: event_value,
201        });
202        if envelopes.len() >= limit {
203            break;
204        }
205    }
206
207    Ok(Json(EventsResponse {
208        events: envelopes,
209        cursor: highest,
210    }))
211}
212
213/// Apply the documented `?limit=` defaults and cap. Pulled out so the
214/// clamping behaviour can be unit-tested without spinning a router.
215pub(crate) fn clamp_limit(requested: Option<usize>) -> usize {
216    match requested {
217        None => DEFAULT_LIMIT,
218        Some(0) => DEFAULT_LIMIT, // `?limit=0` is meaningless → use default rather than error
219        Some(n) if n > MAX_LIMIT => MAX_LIMIT,
220        Some(n) => n,
221    }
222}
223
#[cfg(test)]
mod tests {
    use super::{clamp_limit, DEFAULT_LIMIT, MAX_LIMIT};

    /// An absent `?limit=` yields the default page size.
    #[test]
    fn clamp_limit_default_when_unset() {
        let resolved = clamp_limit(None);
        assert_eq!(resolved, DEFAULT_LIMIT);
    }

    /// `?limit=0` is meaningless, so it resolves to the default instead
    /// of producing an empty page the client would have to re-request.
    #[test]
    fn clamp_limit_default_when_zero() {
        let resolved = clamp_limit(Some(0));
        assert_eq!(resolved, DEFAULT_LIMIT);
    }

    /// Anything above the cap — just over it or absurdly large — is
    /// reduced to exactly `MAX_LIMIT`.
    #[test]
    fn clamp_limit_caps_at_max() {
        for oversized in [MAX_LIMIT + 1, usize::MAX] {
            assert_eq!(clamp_limit(Some(oversized)), MAX_LIMIT);
        }
    }

    /// Values inside `1..=MAX_LIMIT`, including both ends, come back
    /// unchanged.
    #[test]
    fn clamp_limit_passes_through_in_range() {
        for in_range in [1, 50, MAX_LIMIT] {
            assert_eq!(clamp_limit(Some(in_range)), in_range);
        }
    }
}