cellos-server 0.5.2

HTTP control plane for CellOS — admission, projection over JetStream, WebSocket fan-out of CloudEvents. Pure event-sourced architecture.
Documentation
//! `GET /v1/events` — one-shot snapshot of recent CloudEvents.
//!
//! EVT-001 (E2E report): `cellctl events` (without `--follow`) needs an
//! HTTP fallback for environments where WebSocket isn't viable — same
//! ergonomics as `kubectl get events`. The live-tail path
//! (`/ws/events`, see `ws.rs`) continues to serve `cellctl events
//! --follow` and the web UI.
//!
//! ## Response shape
//!
//! ```json
//! {
//!   "events": [{ "seq": 12345, "event": { /* CloudEvent */ } }, …],
//!   "cursor": 12345
//! }
//! ```
//!
//! `seq` matches the WebSocket envelope (`ws.rs::build_envelope`) so
//! clients can mix snapshot + follow flows without translating shapes:
//! open `/v1/events?since=<N>`, take the returned `cursor`, then
//! `/ws/events?since=<cursor>` to live-tail with no gap.
//!
//! `cursor` is the highest `seq` in the returned page, or the caller's
//! `since` echoed back when the page is empty (the JetStream tail has
//! caught up to the requested cursor). It is NEVER lower than `since`,
//! so a client can use `cursor` as the next page's `since` without
//! risking duplicate delivery on a quiet stream.
//!
//! ## Query parameters
//!
//! - `since=<seq>` (optional, default 0) — return events with
//!   `seq > since`. `0` means "from the oldest retained event".
//! - `limit=<n>` (optional, default 100) — page ceiling. `0` is treated
//!   like an omitted value (default 100) and anything above 1000 is
//!   capped at 1000. The handler adjusts in-band rather than 400-ing on
//!   an out-of-range value because the cellctl/web flow is read-only
//!   and a stale client that asked for `limit=99999` deserves a useful
//!   (capped) answer rather than an error toast.
//!
//! ## What this is NOT
//!
//! - Not a search interface. Subject-filtering is `?subject=` on the
//!   WS path and is intentionally absent here; one-shot snapshot is a
//!   "give me the recent firehose" tool, not a query language.
//! - Not durable. The handler creates an *ephemeral* JetStream consumer
//!   per request, drains a single batch, and drops the consumer.
//!   `inactive_threshold` on the consumer config bounds broker-side
//!   cleanup if our process dies between batch-fetch and consumer-drop.

use axum::extract::{Query, State};
use axum::http::HeaderMap;
use axum::Json;
use futures_util::StreamExt;
use serde::{Deserialize, Serialize};
use tracing::{debug, warn};

use crate::auth::require_bearer;
use crate::error::AppError;
use crate::jetstream::{create_ephemeral_consumer, deliver_policy_for};
use crate::state::AppState;

/// Default page size when `?limit=` is omitted. Aligned with kubectl's
/// `--chunk-size=500` lineage but trimmed to 100 because the typical
/// `cellctl events` viewer is a human-readable terminal, not a paginated
/// batch crawler.
///
/// `clamp_limit` also returns this value for an explicit `?limit=0`.
const DEFAULT_LIMIT: usize = 100;

/// Hard cap on the page size — operators can ask for more but the server
/// will not pay more than this in one round-trip. 1000 is well above the
/// usual viewer's screen (60-200 lines) and well below the per-request
/// budget the broker is willing to deliver before its own batch timeout
/// kicks in (`REPLAY_BATCH_TIMEOUT` in `jetstream.rs`).
///
/// Enforced by `clamp_limit`: a larger requested value is reduced to this
/// cap rather than rejected.
const MAX_LIMIT: usize = 1000;

/// How long the handler waits for a JetStream batch before deciding the
/// caller has caught up to the tail. Short on purpose — a long timeout
/// just blocks a viewer that already has the snapshot it asked for.
///
/// Expressed in milliseconds; `list_events` passes it as the `expires`
/// duration of the pull fetch.
const FETCH_TIMEOUT_MS: u64 = 250;

/// FUZZ-CRIT-1 (wave-1 report §Critical): wall-clock cap on every NATS-
/// side request the handler issues (`get_stream`, `create_ephemeral_
/// consumer`, batch `messages()`). Without this, an unreachable broker
/// turns each `/v1/events` request into a ~10s hang (async-nats default
/// JetStream request timeout is 5s; chained with a reconnect storm the
/// wave-1 fuzzer saw 10s wall-clock 500s). 1500 ms is well above the
/// p99 of a healthy local-network NATS round-trip (low double-digit ms)
/// and short enough that a flood of `/v1/events` during a NATS partition
/// can't pin handler tasks long enough to be a DoS lever.
///
/// In `list_events` each of those three calls is wrapped in
/// `tokio::time::timeout` with this duration; an elapsed timeout is
/// logged at WARN and surfaced as `AppError::service_unavailable()`.
/// The loop that drains the returned batch stream is not wrapped by
/// this timeout.
const NATS_REQUEST_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(1500);

/// Query parameters for `GET /v1/events`. See module-level docs for the
/// semantics. `serde(default)` so missing fields use the defaults below
/// without rejecting the request — kubectl-style ergonomics.
///
/// Both fields are `Option`s; `list_events` resolves them with
/// `since.unwrap_or(0)` and `clamp_limit(limit)`.
#[derive(Debug, Deserialize, Default)]
pub struct EventsQuery {
    /// Return events with `seq > since`. `None`/`0` means "from the
    /// oldest retained event" (JetStream's `DeliverPolicy::All`).
    #[serde(default)]
    pub since: Option<u64>,
    /// Requested page size. Resolved by `clamp_limit`: `None` or `0`
    /// becomes `DEFAULT_LIMIT`, and anything above `MAX_LIMIT` is capped
    /// at `MAX_LIMIT`.
    #[serde(default)]
    pub limit: Option<usize>,
}

/// One envelope in the response page. Wire-compatible with the
/// `/ws/events` frame shape (`ws.rs::build_envelope`).
#[derive(Debug, Serialize)]
pub struct EventEnvelope {
    /// JetStream stream sequence of the message, taken from
    /// `info.stream_sequence` in `list_events`.
    pub seq: u64,
    /// Message payload parsed as arbitrary JSON. `list_events` skips
    /// messages whose payload does not parse, so this is always valid JSON.
    pub event: serde_json::Value,
}

/// Response body of `GET /v1/events`.
///
/// `cursor` is the value a client feeds back as the next request's
/// `since` (see module-level docs). `list_events` seeds it with the
/// request's `since` and only ever raises it while reading messages, so
/// it is never lower than `since`. When no JetStream context is
/// configured the handler returns `state.cursor().max(since)` instead.
#[derive(Debug, Serialize)]
pub struct EventsResponse {
    /// Events in the order they were read from the batch.
    pub events: Vec<EventEnvelope>,
    /// Resume position for the next page or a live-tail subscription.
    pub cursor: u64,
}

/// `GET /v1/events` — Bearer-protected one-shot snapshot of the
/// JetStream event log. See module docs for the contract.
///
/// Steps:
/// 1. `require_bearer` validates the request headers against
///    `state.api_token`; its error is propagated unchanged.
/// 2. `since` defaults to `0` and `limit` is resolved by `clamp_limit`.
/// 3. With no JetStream context configured, an empty page is returned
///    with `cursor = state.cursor().max(since)`.
/// 4. Otherwise the stream is looked up, an ephemeral consumer is
///    created and a single batch is fetched. Each of those calls is
///    bounded by `NATS_REQUEST_TIMEOUT`.
/// 5. The batch is drained into envelopes. Messages without stream info
///    or with a non-JSON payload are logged and skipped.
///
/// # Cursor
///
/// `cursor` starts at `since` and is raised to the stream sequence of
/// every message whose sequence could be read — including messages that
/// are then skipped because their payload is not JSON. Advancing past
/// skipped messages is deliberate: if the cursor only followed returned
/// events, a page made up of (or ending in) undecodable messages would
/// hand back the old cursor, the next request would fetch the same
/// messages again, and pagination would never move past them.
///
/// # Errors
///
/// - Whatever `require_bearer` returns when authentication fails.
/// - `AppError::service_unavailable()` when any of the three JetStream
///   calls fails or exceeds `NATS_REQUEST_TIMEOUT`. The underlying error
///   is only written to the WARN log.
pub async fn list_events(
    State(state): State<AppState>,
    headers: HeaderMap,
    Query(query): Query<EventsQuery>,
) -> Result<Json<EventsResponse>, AppError> {
    require_bearer(&headers, &state.api_token)?;

    let since = query.since.unwrap_or(0);
    let limit = clamp_limit(query.limit);

    // No broker configured (test mode, NATS unreachable at startup):
    // return the current cursor with no events rather than a 5xx so
    // operators inspecting a broker-less projection get a useful
    // answer. The cursor still reflects whatever was replayed at boot.
    let Some(ctx) = state.jetstream.clone() else {
        debug!("GET /v1/events: no JetStream context configured; returning empty snapshot");
        return Ok(Json(EventsResponse {
            events: Vec::new(),
            cursor: state.cursor().max(since),
        }));
    };

    // FUZZ-CRIT-1: wrap every NATS-side call in a hard wall-clock
    // timeout AND map any error (timeout, connection failure, JetStream
    // RPC error) to a redacted 503. Operators get the underlying error
    // via the WARN log; clients see only the generic problem+json body.
    let stream = match tokio::time::timeout(
        NATS_REQUEST_TIMEOUT,
        ctx.get_stream(crate::jetstream::STREAM_NAME),
    )
    .await
    {
        Ok(Ok(s)) => s,
        Ok(Err(e)) => {
            warn!(
                stream = crate::jetstream::STREAM_NAME,
                error = %e,
                stage = "get_stream",
                "GET /v1/events: JetStream request failed; returning 503",
            );
            return Err(AppError::service_unavailable());
        }
        Err(_elapsed) => {
            warn!(
                stream = crate::jetstream::STREAM_NAME,
                timeout_ms = NATS_REQUEST_TIMEOUT.as_millis() as u64,
                stage = "get_stream",
                "GET /v1/events: JetStream get_stream timed out; returning 503",
            );
            return Err(AppError::service_unavailable());
        }
    };

    // `since == 0` → DeliverPolicy::All (everything currently retained).
    // `since > 0` → DeliverPolicy::ByStartSequence { since + 1 }, which
    // is exactly the ADR-0015 §D3 contract the WS path already uses.
    let policy = if since == 0 {
        deliver_policy_for(None)
    } else {
        deliver_policy_for(Some(since))
    };

    let consumer = match tokio::time::timeout(
        NATS_REQUEST_TIMEOUT,
        create_ephemeral_consumer(&stream, policy, None),
    )
    .await
    {
        Ok(Ok(c)) => c,
        Ok(Err(e)) => {
            warn!(
                stream = crate::jetstream::STREAM_NAME,
                error = %format!("{e:#}"),
                stage = "create_ephemeral_consumer",
                "GET /v1/events: JetStream consumer create failed; returning 503",
            );
            return Err(AppError::service_unavailable());
        }
        Err(_elapsed) => {
            warn!(
                stream = crate::jetstream::STREAM_NAME,
                timeout_ms = NATS_REQUEST_TIMEOUT.as_millis() as u64,
                stage = "create_ephemeral_consumer",
                "GET /v1/events: JetStream consumer create timed out; returning 503",
            );
            return Err(AppError::service_unavailable());
        }
    };

    let mut batch = match tokio::time::timeout(
        NATS_REQUEST_TIMEOUT,
        consumer
            .fetch()
            .max_messages(limit)
            .expires(std::time::Duration::from_millis(FETCH_TIMEOUT_MS))
            .messages(),
    )
    .await
    {
        Ok(Ok(b)) => b,
        Ok(Err(e)) => {
            warn!(
                stream = crate::jetstream::STREAM_NAME,
                error = %format!("{e:#}"),
                stage = "fetch_batch",
                "GET /v1/events: JetStream batch fetch failed; returning 503",
            );
            return Err(AppError::service_unavailable());
        }
        Err(_elapsed) => {
            warn!(
                stream = crate::jetstream::STREAM_NAME,
                timeout_ms = NATS_REQUEST_TIMEOUT.as_millis() as u64,
                stage = "fetch_batch",
                "GET /v1/events: JetStream batch fetch timed out; returning 503",
            );
            return Err(AppError::service_unavailable());
        }
    };

    // Pre-size modestly: most pages are small even when `limit` is large.
    let mut envelopes: Vec<EventEnvelope> = Vec::with_capacity(limit.min(64));
    // Seeded with `since` so the returned cursor can never move backwards.
    let mut highest: u64 = since;

    // NOTE(review): this drain loop is not wrapped in
    // `NATS_REQUEST_TIMEOUT`; it presumably relies on the fetch's
    // `expires` to terminate the stream — confirm against the client
    // library's batch semantics.
    while let Some(msg) = batch.next().await {
        let msg = match msg {
            Ok(m) => m,
            Err(e) => {
                // A mid-batch read error ends the page; whatever was
                // collected so far is still returned.
                warn!(error = %e, "GET /v1/events: batch read error; ending page");
                break;
            }
        };
        let seq = match msg.info() {
            Ok(info) => info.stream_sequence,
            Err(e) => {
                // Without a sequence the cursor cannot be advanced for
                // this message, so it is dropped from the page.
                warn!(error = %e, "GET /v1/events: message missing stream info; skipping");
                continue;
            }
        };
        // Advance the cursor as soon as the sequence is known, BEFORE the
        // payload is validated. Otherwise a run of undecodable messages
        // leaves the cursor stuck at `since` and every follow-up request
        // re-reads the same messages without ever reaching later events.
        highest = highest.max(seq);
        let event_value = match serde_json::from_slice::<serde_json::Value>(&msg.payload) {
            Ok(v) => v,
            Err(e) => {
                warn!(seq, error = %e, "GET /v1/events: payload not JSON; skipping");
                continue;
            }
        };
        envelopes.push(EventEnvelope {
            seq,
            event: event_value,
        });
        // Defensive stop in case the stream yields more than requested.
        if envelopes.len() >= limit {
            break;
        }
    }

    Ok(Json(EventsResponse {
        events: envelopes,
        cursor: highest,
    }))
}

/// Resolve the effective page size for a raw `?limit=` value.
///
/// - `None` (parameter omitted) or `Some(0)` → `DEFAULT_LIMIT`
/// - anything above `MAX_LIMIT` → `MAX_LIMIT`
/// - every other value is returned unchanged
///
/// Kept as a free function so the clamping rules can be unit-tested
/// without spinning a router.
pub(crate) fn clamp_limit(requested: Option<usize>) -> usize {
    requested
        // `?limit=0` is meaningless → use default rather than error
        .filter(|&n| n != 0)
        .map_or(DEFAULT_LIMIT, |n| n.min(MAX_LIMIT))
}

#[cfg(test)]
mod tests {
    use super::*;

    /// An omitted `?limit=` resolves to the default page size.
    #[test]
    fn clamp_limit_default_when_unset() {
        let resolved = clamp_limit(None);
        assert_eq!(resolved, DEFAULT_LIMIT);
    }

    /// `?limit=0` is meaningless, so it resolves to the default instead
    /// of producing an empty page the client would have to re-request.
    #[test]
    fn clamp_limit_default_when_zero() {
        let resolved = clamp_limit(Some(0));
        assert_eq!(resolved, DEFAULT_LIMIT);
    }

    /// Anything above the cap collapses to exactly `MAX_LIMIT`.
    #[test]
    fn clamp_limit_caps_at_max() {
        for oversized in [MAX_LIMIT + 1, usize::MAX] {
            assert_eq!(clamp_limit(Some(oversized)), MAX_LIMIT);
        }
    }

    /// Values inside `1..=MAX_LIMIT` are returned untouched.
    #[test]
    fn clamp_limit_passes_through_in_range() {
        for in_range in [1, 50, MAX_LIMIT] {
            assert_eq!(clamp_limit(Some(in_range)), in_range);
        }
    }
}