cellos-server 0.5.2

HTTP control plane for CellOS — admission, projection over JetStream, WebSocket fan-out of CloudEvents. Pure event-sourced architecture.
Documentation
//! WebSocket → NATS bridge.
//!
//! `GET /ws/events` upgrades the connection, subscribes to
//! `cellos.events.>` (or the optional `?subject=` override), and forwards
//! every NATS message as a JSON envelope:
//!
//! ```json
//! { "seq": 12345, "event": { /* CloudEvent */ } }
//! ```
//!
//! `seq` is the cursor described in ADR-0015. The bridge in this file
//! reads from a JetStream consumer, so `seq` is the JetStream stream
//! sequence of each message. The core-NATS alternative (a
//! per-connection monotonic counter) is not a path this handler takes:
//! when no JetStream context is configured the socket is closed
//! instead. The contract on the wire is the same either way: `seq` is
//! monotonic, and the snapshot endpoint's `cursor` is comparable to it
//! on the same broker.
//!
//! ## ADR-0015 §D3 — `?since=<seq>` resume
//!
//! Clients open `/ws/events?since=<cursor>` to resume after a
//! reconnect. The handler forwards `since` to the JetStream consumer,
//! which maps to `DeliverPolicy::ByStartSequence` starting at
//! `since + 1`. If the consumer cannot be created and the error looks
//! like retention exhaustion, the server sends a problem+json text
//! frame and closes with code 4410 (ADR-0015 §D7) so the client can
//! re-hydrate from the snapshot and reconnect at the new cursor. A
//! core-NATS bridge would have to accept the parameter *without being
//! able to replay history* — it only delivers messages published after
//! the subscription was created — which is why the contract (every
//! frame carries `seq`) is written so clients work transparently
//! against either bridge. Omitting `since` keeps the "new messages
//! only" behavior for ad-hoc subscribers (cellctl `--follow`, debug
//! tools).
//!
//! ## ADR-0015 §D6 — heartbeat
//!
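//! A WebSocket can sit idle on a quiet stream without either side
//! knowing the underlying socket has died. The server sends a `Ping`
//! frame on a fixed 25-second interval (the timer is not reset by data
//! frames, so pings go out on busy streams as well). No pong handling
//! is needed in the bridge: inbound frames other than `Close` are read
//! and ignored.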

use std::time::Duration;

use axum::extract::ws::{Message, WebSocket};
use axum::extract::{Query, State, WebSocketUpgrade};
use axum::http::HeaderMap;
use axum::response::IntoResponse;
use futures_util::{SinkExt, StreamExt};
use serde::Deserialize;
use tokio::time::interval;
use tracing::{debug, info, warn};

use crate::auth::require_bearer;
use crate::error::AppError;
use crate::jetstream::{looks_like_retention_exhausted, open_ws_message_stream, stream_first_seq};
use crate::state::AppState;

/// Subject used when the client supplies no `?subject=` override. The
/// socket handler treats this exact value as "no subject filter" when
/// opening the message stream.
const DEFAULT_SUBJECT: &str = "cellos.events.>";
/// ADR-0015 §D6 — heartbeat interval. The web view treats >45s of
/// silence as a dead connection; 25s keeps us well inside that budget.
const HEARTBEAT: Duration = Duration::from_secs(25);

/// Per-frame / per-message ceiling on inbound WebSocket data. The same
/// value is applied to both `max_message_size` and `max_frame_size`
/// when the upgrade is configured.
///
/// axum defaults to 16 MiB / 64 MiB. `/ws/events` is a one-way feed
/// from the server to the client — we deliberately ignore all inbound
/// frames other than `Close`. Accepting the default ceiling would let
/// an authed client park a 64 MiB frame in the kernel buffer for free
/// (resource amplification post-auth). 64 KiB is far more than any
/// control frame a future client might send and below the threshold
/// where a single misbehaving client can hurt the server.
const WS_MAX_FRAME_BYTES: usize = 64 * 1024;

/// Per-frame send timeout, applied to both data frames and heartbeat
/// pings. Red-team wave 2 (MED-W2A-2):
/// without a bounded send, a client whose TCP receive window is wedged
/// (suspended laptop, congested link) parks the entire WS task on
/// `tx.send().await` — heartbeats stop, client-close goes unobserved,
/// the JetStream pull stream backs up behind the un-drained channel,
/// and the broker doesn't reclaim the consumer until
/// `EPHEMERAL_INACTIVE_THRESHOLD` (5 min). 50s is exactly two
/// `HEARTBEAT` intervals (2 × 25s); a client that cannot accept a
/// single frame for that long is effectively gone.
const WS_SEND_TIMEOUT: Duration = Duration::from_secs(50);

/// Query-string parameters accepted by `GET /ws/events`. Both fields
/// are optional; an absent field deserializes to `None`.
#[derive(Debug, Deserialize)]
pub struct WsParams {
    /// Optional NATS subject filter. Defaults to `cellos.events.>` which
    /// receives every CloudEvent the platform emits. Callers can scope
    /// to a tenant with e.g. `?subject=cellos.events.tenant1.>`.
    pub subject: Option<String>,
    /// ADR-0015 §D3 — resume cursor. When present it is forwarded to
    /// the JetStream consumer so delivery starts at `since + 1`; when
    /// absent the client live-tails new messages only. A cursor older
    /// than the stream's retention floor is answered with the 4410
    /// retention-exhausted close (ADR-0015 §D7).
    ///
    /// NOTE(review): this doc previously described a core-NATS MVP
    /// bridge that accepts `since` but cannot replay history. The
    /// handler in this file only opens a JetStream consumer, so that
    /// caveat looks stale — confirm before relying on it. The contract
    /// (every frame carries `seq`) is the same either way, so clients
    /// don't branch on the bridge implementation.
    pub since: Option<u64>,
}

pub async fn ws_events(
    State(state): State<AppState>,
    headers: HeaderMap,
    Query(params): Query<WsParams>,
    ws: WebSocketUpgrade,
) -> Result<impl IntoResponse, AppError> {
    // Auth runs BEFORE the upgrade so an unauthenticated client sees
    // 401 problem+json rather than a confusing protocol error.
    require_bearer(&headers, &state.api_token)?;

    let subject = params.subject.unwrap_or_else(|| DEFAULT_SUBJECT.to_owned());
    let since = params.since;
    // Cap inbound frame/message size — see WS_MAX_FRAME_BYTES. axum's
    // defaults (16 MiB / 64 MiB) are too generous for a one-way feed.
    let ws = ws
        .max_message_size(WS_MAX_FRAME_BYTES)
        .max_frame_size(WS_MAX_FRAME_BYTES);
    Ok(ws.on_upgrade(move |socket| handle_socket(socket, state, subject, since)))
}

/// Bridge one upgraded WebSocket to a JetStream message stream until
/// either side goes away.
///
/// * `socket` — the upgraded connection. It is closed with a policy
///   close frame when no JetStream context is configured or the
///   consumer cannot be opened, and with the 4410 retention-exhausted
///   contract when `since` is set and the open error looks like
///   retention exhaustion.
/// * `state` — supplies the JetStream context; `bump_cursor(seq)` is
///   called for every message that yields a valid envelope, before the
///   frame is sent.
/// * `subject` — subject filter. The default subject is passed to the
///   stream opener as "no filter".
/// * `since` — optional resume cursor forwarded to the stream opener.
///
/// The loop ends on client close or receive error, on a failed or
/// timed-out send (data or heartbeat), on a JetStream message error,
/// or when the message stream ends. Payloads that are not UTF-8 JSON,
/// and messages without stream info, are logged and skipped.
async fn handle_socket(socket: WebSocket, state: AppState, subject: String, since: Option<u64>) {
    let Some(ctx) = state.jetstream.clone() else {
        warn!("ws connect with no JetStream context configured; closing");
        let _ = socket
            .send_close_with_reason("no upstream broker configured")
            .await;
        return;
    };

    let subject_filter = if subject == DEFAULT_SUBJECT {
        None
    } else {
        Some(subject.as_str())
    };

    // ADR-0015 §D3 — open the JetStream consumer with the right
    // DeliverPolicy. When `since` is provided we resume at `since+1`;
    // otherwise we live-tail from the next published message.
    let mut messages = match open_ws_message_stream(&ctx, subject_filter, since).await {
        Ok(s) => s,
        Err(e) => {
            warn!(error = %format!("{e:#}"), subject = %subject, since = ?since, "jetstream consumer create failed");
            // ADR-0015 §D7 — when `since` is older than the stream's
            // retention floor, close with the 4410 retention-exhausted
            // contract. The client treats 4410 as "drop cache,
            // re-hydrate from snapshot, reconnect at new cursor".
            if since.is_some() && looks_like_retention_exhausted(&e) {
                let oldest = stream_first_seq(&ctx).await;
                close_retention_exhausted(socket, oldest).await;
            } else {
                let _ = socket.send_close_with_reason("subscribe failed").await;
            }
            return;
        }
    };

    info!(
        subject = %subject,
        since = ?since,
        "ws client connected, bridging JetStream messages",
    );
    let (mut tx, mut rx) = socket.split();

    let mut heartbeat = interval(HEARTBEAT);
    // A data send may stall for up to WS_SEND_TIMEOUT (two heartbeat
    // periods). With tokio's default `Burst` behavior the interval
    // would then fire its missed ticks back-to-back, and because the
    // select below is `biased` those redundant pings would be written
    // ahead of the queued data. `Delay` re-spaces ticks from the moment
    // the loop gets back to polling the timer.
    heartbeat.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
    // The first tick fires immediately; skip it so we don't ping
    // before the client has had a chance to settle.
    heartbeat.tick().await;

    loop {
        tokio::select! {
            // Red-team wave 2 (MED-W2A-1): `biased` so client-close and
            // heartbeat ticks take priority over a saturated message
            // firehose. Without this, a flood on `cellos.events.>` could
            // starve `rx.next()` and leave a dead consumer pinned on the
            // broker for the full 5-min inactive-threshold window after
            // the client gives up.
            biased;
            incoming = rx.next() => {
                match incoming {
                    Some(Ok(Message::Close(_))) | None => {
                        debug!("ws client closed");
                        break;
                    }
                    Some(Err(e)) => {
                        warn!(error = %e, "ws recv error");
                        break;
                    }
                    // Ignore inbound pings/pongs/text/binary — this is a
                    // one-way feed for the MVP.
                    Some(Ok(_)) => {}
                }
            }
            _ = heartbeat.tick() => {
                // ADR-0015 §D6 — keepalive. axum's WebSocket auto-
                // replies to pings *from* the client; sending one
                // *to* the client is on us. Empty payload is fine —
                // the client only cares that a frame arrived.
                match tokio::time::timeout(WS_SEND_TIMEOUT, tx.send(Message::Ping(Vec::new()))).await {
                    Ok(Ok(())) => {}
                    Ok(Err(_)) => {
                        debug!("ws heartbeat send failed; client gone");
                        break;
                    }
                    Err(_) => {
                        warn!("ws heartbeat send timed out after {:?}; closing", WS_SEND_TIMEOUT);
                        break;
                    }
                }
            }
            msg = messages.next() => {
                match msg {
                    Some(Ok(m)) => {
                        // ADR-0015 §D1 — seq is the JetStream stream
                        // sequence, broker-authoritative across reconnects.
                        let seq = match m.info() {
                            Ok(info) => info.stream_sequence,
                            Err(e) => {
                                warn!(error = %e, "ws msg missing stream info; skipping");
                                continue;
                            }
                        };
                        let payload = match build_envelope(seq, &m.payload) {
                            Ok(s) => s,
                            Err(EnvelopeError::NotUtf8) => {
                                warn!(subject = %subject, "dropping non-utf8 jetstream payload");
                                continue;
                            }
                            Err(EnvelopeError::NotJson(e)) => {
                                warn!(
                                    subject = %subject,
                                    error = %e,
                                    "dropping non-json jetstream payload",
                                );
                                continue;
                            }
                        };
                        // ADR-0015 §D2 — bump the projection cursor so
                        // future snapshot fetches advertise the latest
                        // applied seq. Monotonic; bump_cursor handles
                        // out-of-order under concurrent connections.
                        state.bump_cursor(seq);
                        // Red-team wave 2 (MED-W2A-2): bounded send.
                        // Without a timeout a wedged TCP receive window
                        // (suspended laptop, congested link) parks the
                        // entire WS task on `tx.send().await`, starving
                        // heartbeats and rx, and pinning the broker
                        // consumer for `EPHEMERAL_INACTIVE_THRESHOLD`.
                        match tokio::time::timeout(WS_SEND_TIMEOUT, tx.send(Message::Text(payload))).await {
                            Ok(Ok(())) => {}
                            Ok(Err(_)) => {
                                debug!("ws send failed; client gone");
                                break;
                            }
                            Err(_) => {
                                warn!(seq, "ws send timed out after {:?}; closing", WS_SEND_TIMEOUT);
                                break;
                            }
                        }
                        // The consumer is created with `AckPolicy::None`
                        // (see jetstream.rs::create_ephemeral_consumer)
                        // so JetStream never expects an ack and never
                        // redelivers. This call is a no-op under the
                        // current policy; we keep it so that flipping
                        // the consumer to `AckPolicy::Explicit` later
                        // does not silently lose ack semantics.
                        if let Err(e) = m.ack().await {
                            debug!(seq, error = %e, "jetstream ack failed (AckPolicy::None)");
                        }
                    }
                    Some(Err(e)) => {
                        warn!(error = %e, "jetstream message error; closing ws");
                        break;
                    }
                    None => {
                        debug!("jetstream message stream ended");
                        break;
                    }
                }
            }
        }
    }

    info!(subject = %subject, "ws client disconnected");
}

/// ADR-0015 §D7 — terminate the connection using the 4410
/// retention-exhausted contract.
///
/// Two frames are written, both best-effort (send errors are ignored):
/// first a problem+json text frame carrying `oldest_seq`, then a close
/// frame with the custom code 4410. On seeing this the client is
/// expected to drop its cached projection, re-hydrate from the
/// snapshot, and reconnect at the new cursor.
async fn close_retention_exhausted(mut socket: WebSocket, oldest_seq: Option<u64>) {
    use axum::extract::ws::CloseFrame;

    let problem_body = serde_json::json!({
        "type": "/problems/ws/retention-exhausted",
        "title": "Cursor older than stream retention",
        "oldest_seq": oldest_seq,
    })
    .to_string();
    // Even if the explanation cannot be delivered we still attempt the
    // close frame below, so the result is deliberately discarded.
    let _ = socket.send(Message::Text(problem_body)).await;

    let close_frame = CloseFrame {
        code: 4410,
        reason: "retention-exhausted".into(),
    };
    let _ = socket.send(Message::Close(Some(close_frame))).await;
}

/// Wrap a single NATS payload in the ADR-0015 `{seq, event}` JSON
/// envelope and return it serialized as a string.
///
/// Kept as a free function outside the socket loop so the wire
/// contract can be tested without a NATS broker.
///
/// # Errors
///
/// * [`EnvelopeError::NotUtf8`] — `payload` is not valid UTF-8.
/// * [`EnvelopeError::NotJson`] — `payload` is UTF-8 but not valid JSON.
///
/// The bridge logs and drops the message in both cases; the variants
/// exist so tests can tell the two apart.
pub(crate) fn build_envelope(seq: u64, payload: &[u8]) -> Result<String, EnvelopeError> {
    let Ok(text) = std::str::from_utf8(payload) else {
        return Err(EnvelopeError::NotUtf8);
    };
    // Parse into a structured value so `event` is embedded as JSON,
    // not as a string-of-JSON.
    let event = serde_json::from_str::<serde_json::Value>(text).map_err(EnvelopeError::NotJson)?;
    Ok(serde_json::json!({ "seq": seq, "event": event }).to_string())
}

/// Reasons `build_envelope` rejects a payload. The two cases are kept
/// separate so callers and tests can distinguish them.
#[derive(Debug)]
pub(crate) enum EnvelopeError {
    /// The payload bytes are not valid UTF-8.
    NotUtf8,
    /// The payload is valid UTF-8 but does not parse as JSON; carries
    /// the underlying `serde_json` parse error.
    NotJson(serde_json::Error),
}

/// Helper trait so the early-exit paths can write a close frame without
/// re-stating the (code, reason) tuple at every call site.
trait CloseExt {
    /// Consume the connection and send a close frame carrying `reason`.
    /// The close code is chosen by the implementation; the `WebSocket`
    /// impl in this file uses the `POLICY` close code. Returns the send
    /// result so callers can decide whether to ignore a failure.
    async fn send_close_with_reason(self, reason: &'static str) -> Result<(), axum::Error>;
}

impl CloseExt for WebSocket {
    /// Send a `POLICY` close frame with the given static reason and
    /// hand back the result of that single send. The socket is taken
    /// by value, so it is dropped once the frame has been written.
    async fn send_close_with_reason(mut self, reason: &'static str) -> Result<(), axum::Error> {
        use axum::extract::ws::{close_code, CloseFrame};

        let frame = CloseFrame {
            code: close_code::POLICY,
            reason: reason.into(),
        };
        self.send(Message::Close(Some(frame))).await
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// ADR-0015 §D1 — each `/ws/events` frame is a `{seq, event}` JSON
    /// envelope, and `event` is the CloudEvent as a structured value
    /// rather than a string-of-JSON. The web view reducer keys off this
    /// shape, so the check works on the serialized frame itself.
    #[test]
    fn ws_envelope_carries_seq() {
        let payload = serde_json::to_vec(&serde_json::json!({
            "specversion": "1.0",
            "type": "io.cellos.formation.v1.created",
            "source": "/formations/abc",
            "id": "evt-1",
            "data": { "name": "demo" }
        }))
        .unwrap();

        let frame = build_envelope(42, &payload).expect("envelope build");
        let parsed: serde_json::Value = serde_json::from_str(&frame).unwrap();
        let seq = &parsed["seq"];
        let event = &parsed["event"];

        assert_eq!(
            seq.as_u64(),
            Some(42),
            "envelope must carry the seq as the cursor field; got {}",
            seq
        );
        assert!(
            event.is_object(),
            "event must be a structured JSON object, not a string-of-JSON; got {}",
            event
        );
        assert_eq!(event["type"], "io.cellos.formation.v1.created");
        assert_eq!(event["data"]["name"], "demo");
    }

    /// Bytes that are not UTF-8 must be rejected outright, so a
    /// producer-side bug shows up in this test instead of as garbled
    /// frames on the client.
    #[test]
    fn ws_envelope_rejects_non_utf8_payload() {
        let outcome = build_envelope(1, &[0xff, 0xfe, 0xfd]);
        assert!(
            matches!(outcome, Err(EnvelopeError::NotUtf8)),
            "expected NotUtf8, got {outcome:?}"
        );
    }

    /// Producers MUST emit CloudEvent JSON; plain text violates the
    /// contract and the bridge drops it.
    #[test]
    fn ws_envelope_rejects_non_json_payload() {
        let outcome = build_envelope(1, b"hello, world");
        assert!(
            matches!(outcome, Err(EnvelopeError::NotJson(_))),
            "expected NotJson, got {outcome:?}"
        );
    }
}