Skip to main content

cellos_server/
ws.rs

1//! WebSocket → NATS bridge.
2//!
3//! `GET /ws/events` upgrades the connection, subscribes to
4//! `cellos.events.>` (or the optional `?subject=` override), and forwards
5//! every NATS message as a JSON envelope:
6//!
7//! ```json
8//! { "seq": 12345, "event": { /* CloudEvent */ } }
9//! ```
10//!
//! `seq` is the cursor described in ADR-0015. The handler in this
//! module always reads from a JetStream consumer, so `seq` is the
//! JetStream stream sequence. Under a core-NATS subscription `seq`
//! would instead be a per-connection monotonic counter, but no such
//! path exists in this module. Either way the contract on the wire is
//! the same: `seq` is monotonic, and the snapshot endpoint's `cursor`
//! is comparable to it on the same broker.
17//!
18//! ## ADR-0015 §D3 — `?since=<seq>` resume
19//!
//! Clients open `/ws/events?since=<cursor>` to resume after a
//! reconnect. The handler passes the cursor to the JetStream consumer,
//! where it maps directly to
//! `DeliverPolicy::ByStartSequence { OptStartSeq: since + 1 }`. If the
//! consumer cannot be created because the cursor is older than the
//! stream's retention floor, the socket is closed with code 4410
//! (ADR-0015 §D7). A core-NATS bridge would accept the parameter but
//! *could not replay history* — it would only deliver messages
//! published after the subscription was created. The contract is
//! preserved (every frame still carries `seq`) so clients work
//! transparently against either bridge; this handler already uses
//! JetStream. Omitting `since` keeps the "new messages only" behavior
//! for ad-hoc subscribers (cellctl `--follow`, debug tools).
30//!
31//! ## ADR-0015 §D6 — heartbeat
32//!
//! A WebSocket can sit idle on a quiet stream without either side
//! knowing the underlying socket has died. The server sends a `Ping`
//! frame every 25 seconds, whether or not events are flowing; axum's
//! WebSocket type handles the pong roundtrip transparently, and this
//! bridge ignores every inbound frame other than `Close`.
37
38use std::time::Duration;
39
40use axum::extract::ws::{Message, WebSocket};
41use axum::extract::{Query, State, WebSocketUpgrade};
42use axum::http::HeaderMap;
43use axum::response::IntoResponse;
44use futures_util::{SinkExt, StreamExt};
45use serde::Deserialize;
46use tokio::time::interval;
47use tracing::{debug, info, warn};
48
49use crate::auth::require_bearer;
50use crate::error::AppError;
51use crate::jetstream::{looks_like_retention_exhausted, open_ws_message_stream, stream_first_seq};
52use crate::state::AppState;
53
/// Subject used when the client does not pass `?subject=`; matches every
/// event under `cellos.events.`.
const DEFAULT_SUBJECT: &str = "cellos.events.>";

/// ADR-0015 §D6 — period between server-initiated `Ping` frames. The
/// web view treats more than 45s of silence as a dead connection, so a
/// 25s period stays well inside that budget.
const HEARTBEAT: Duration = Duration::from_secs(25);

/// Upper bound (64 KiB) applied to both the inbound frame size and the
/// inbound message size of the WebSocket.
///
/// `/ws/events` only pushes data from server to client; every inbound
/// frame except `Close` is ignored. axum's defaults (16 MiB / 64 MiB)
/// would let an authenticated client make the server buffer a 64 MiB
/// message at no cost to itself (post-auth resource amplification).
/// 64 KiB is far more than any control frame a future client might
/// send, yet small enough that one misbehaving client cannot hurt the
/// server.
const WS_MAX_FRAME_BYTES: usize = 64 << 10;

/// Deadline for a single outbound frame (data or heartbeat). Red-team
/// wave 2 (MED-W2A-2): with an unbounded send, a client whose TCP
/// receive window is wedged (suspended laptop, congested link) parks
/// the whole WS task on `tx.send().await` — heartbeats stop,
/// client-close goes unobserved, the JetStream pull stream backs up
/// behind the un-drained channel, and the broker does not reclaim the
/// consumer until `EPHEMERAL_INACTIVE_THRESHOLD` (5 min). 50s equals
/// two heartbeat intervals; a client that cannot accept one frame in
/// that time is effectively gone.
const WS_SEND_TIMEOUT: Duration = Duration::from_secs(50);
80
81#[derive(Debug, Deserialize)]
82pub struct WsParams {
83    /// Optional NATS subject filter. Defaults to `cellos.events.>` which
84    /// receives every CloudEvent the platform emits. Callers can scope
85    /// to a tenant with e.g. `?subject=cellos.events.tenant1.>`.
86    pub subject: Option<String>,
87    /// ADR-0015 §D3 — resume cursor. When present the server starts
88    /// delivery at `since + 1`. With the core-NATS MVP bridge this is
89    /// accepted but historical replay is unavailable; the contract
90    /// (every frame carries `seq`) is preserved either way so clients
91    /// don't branch on the bridge implementation.
92    pub since: Option<u64>,
93}
94
95pub async fn ws_events(
96    State(state): State<AppState>,
97    headers: HeaderMap,
98    Query(params): Query<WsParams>,
99    ws: WebSocketUpgrade,
100) -> Result<impl IntoResponse, AppError> {
101    // Auth runs BEFORE the upgrade so an unauthenticated client sees
102    // 401 problem+json rather than a confusing protocol error.
103    require_bearer(&headers, &state.api_token)?;
104
105    let subject = params.subject.unwrap_or_else(|| DEFAULT_SUBJECT.to_owned());
106    let since = params.since;
107    // Cap inbound frame/message size — see WS_MAX_FRAME_BYTES. axum's
108    // defaults (16 MiB / 64 MiB) are too generous for a one-way feed.
109    let ws = ws
110        .max_message_size(WS_MAX_FRAME_BYTES)
111        .max_frame_size(WS_MAX_FRAME_BYTES);
112    Ok(ws.on_upgrade(move |socket| handle_socket(socket, state, subject, since)))
113}
114
115async fn handle_socket(socket: WebSocket, state: AppState, subject: String, since: Option<u64>) {
116    let Some(ctx) = state.jetstream.clone() else {
117        warn!("ws connect with no JetStream context configured; closing");
118        let _ = socket
119            .send_close_with_reason("no upstream broker configured")
120            .await;
121        return;
122    };
123
124    let subject_filter = if subject == DEFAULT_SUBJECT {
125        None
126    } else {
127        Some(subject.as_str())
128    };
129
130    // ADR-0015 §D3 — open the JetStream consumer with the right
131    // DeliverPolicy. When `since` is provided we resume at `since+1`;
132    // otherwise we live-tail from the next published message.
133    let mut messages = match open_ws_message_stream(&ctx, subject_filter, since).await {
134        Ok(s) => s,
135        Err(e) => {
136            warn!(error = %format!("{e:#}"), subject = %subject, since = ?since, "jetstream consumer create failed");
137            // ADR-0015 §D7 — when `since` is older than the stream's
138            // retention floor, close with the 4410 retention-exhausted
139            // contract. The client treats 4410 as "drop cache,
140            // re-hydrate from snapshot, reconnect at new cursor".
141            if since.is_some() && looks_like_retention_exhausted(&e) {
142                let oldest = stream_first_seq(&ctx).await;
143                close_retention_exhausted(socket, oldest).await;
144            } else {
145                let _ = socket.send_close_with_reason("subscribe failed").await;
146            }
147            return;
148        }
149    };
150
151    info!(
152        subject = %subject,
153        since = ?since,
154        "ws client connected, bridging JetStream messages",
155    );
156    let (mut tx, mut rx) = socket.split();
157
158    let mut heartbeat = interval(HEARTBEAT);
159    // The first tick fires immediately; skip it so we don't ping
160    // before the client has had a chance to settle.
161    heartbeat.tick().await;
162
163    loop {
164        tokio::select! {
165            // Red-team wave 2 (MED-W2A-1): `biased` so client-close and
166            // heartbeat ticks take priority over a saturated message
167            // firehose. Without this, a flood on `cellos.events.>` could
168            // starve `rx.next()` and leave a dead consumer pinned on the
169            // broker for the full 5-min inactive-threshold window after
170            // the client gives up.
171            biased;
172            incoming = rx.next() => {
173                match incoming {
174                    Some(Ok(Message::Close(_))) | None => {
175                        debug!("ws client closed");
176                        break;
177                    }
178                    Some(Err(e)) => {
179                        warn!(error = %e, "ws recv error");
180                        break;
181                    }
182                    // Ignore inbound pings/pongs/text/binary — this is a
183                    // one-way feed for the MVP.
184                    Some(Ok(_)) => {}
185                }
186            }
187            _ = heartbeat.tick() => {
188                // ADR-0015 §D6 — keepalive. axum's WebSocket auto-
189                // replies to pings *from* the client; sending one
190                // *to* the client is on us. Empty payload is fine —
191                // the client only cares that a frame arrived.
192                match tokio::time::timeout(WS_SEND_TIMEOUT, tx.send(Message::Ping(Vec::new()))).await {
193                    Ok(Ok(())) => {}
194                    Ok(Err(_)) => {
195                        debug!("ws heartbeat send failed; client gone");
196                        break;
197                    }
198                    Err(_) => {
199                        warn!("ws heartbeat send timed out after {:?}; closing", WS_SEND_TIMEOUT);
200                        break;
201                    }
202                }
203            }
204            msg = messages.next() => {
205                match msg {
206                    Some(Ok(m)) => {
207                        // ADR-0015 §D1 — seq is the JetStream stream
208                        // sequence, broker-authoritative across reconnects.
209                        let seq = match m.info() {
210                            Ok(info) => info.stream_sequence,
211                            Err(e) => {
212                                warn!(error = %e, "ws msg missing stream info; skipping");
213                                continue;
214                            }
215                        };
216                        let payload = match build_envelope(seq, &m.payload) {
217                            Ok(s) => s,
218                            Err(EnvelopeError::NotUtf8) => {
219                                warn!(subject = %subject, "dropping non-utf8 jetstream payload");
220                                continue;
221                            }
222                            Err(EnvelopeError::NotJson(e)) => {
223                                warn!(
224                                    subject = %subject,
225                                    error = %e,
226                                    "dropping non-json jetstream payload",
227                                );
228                                continue;
229                            }
230                        };
231                        // ADR-0015 §D2 — bump the projection cursor so
232                        // future snapshot fetches advertise the latest
233                        // applied seq. Monotonic; bump_cursor handles
234                        // out-of-order under concurrent connections.
235                        state.bump_cursor(seq);
236                        // Red-team wave 2 (MED-W2A-2): bounded send.
237                        // Without a timeout a wedged TCP receive window
238                        // (suspended laptop, congested link) parks the
239                        // entire WS task on `tx.send().await`, starving
240                        // heartbeats and rx, and pinning the broker
241                        // consumer for `EPHEMERAL_INACTIVE_THRESHOLD`.
242                        match tokio::time::timeout(WS_SEND_TIMEOUT, tx.send(Message::Text(payload))).await {
243                            Ok(Ok(())) => {}
244                            Ok(Err(_)) => {
245                                debug!("ws send failed; client gone");
246                                break;
247                            }
248                            Err(_) => {
249                                warn!(seq, "ws send timed out after {:?}; closing", WS_SEND_TIMEOUT);
250                                break;
251                            }
252                        }
253                        // The consumer is created with `AckPolicy::None`
254                        // (see jetstream.rs::create_ephemeral_consumer)
255                        // so JetStream never expects an ack and never
256                        // redelivers. This call is a no-op under the
257                        // current policy; we keep it so that flipping
258                        // the consumer to `AckPolicy::Explicit` later
259                        // does not silently lose ack semantics.
260                        if let Err(e) = m.ack().await {
261                            debug!(seq, error = %e, "jetstream ack failed (AckPolicy::None)");
262                        }
263                    }
264                    Some(Err(e)) => {
265                        warn!(error = %e, "jetstream message error; closing ws");
266                        break;
267                    }
268                    None => {
269                        debug!("jetstream message stream ended");
270                        break;
271                    }
272                }
273            }
274        }
275    }
276
277    info!(subject = %subject, "ws client disconnected");
278}
279
280/// ADR-0015 §D7 — close the WebSocket with the 4410 retention-exhausted
281/// contract. Sends a problem+json text frame describing the failure,
282/// then the close frame with the custom 4410 code. The client treats
283/// this as "drop my cached projection, re-hydrate from snapshot, and
284/// reconnect at the new cursor".
285async fn close_retention_exhausted(mut socket: WebSocket, oldest_seq: Option<u64>) {
286    let problem = serde_json::json!({
287        "type": "/problems/ws/retention-exhausted",
288        "title": "Cursor older than stream retention",
289        "oldest_seq": oldest_seq,
290    });
291    let _ = socket.send(Message::Text(problem.to_string())).await;
292    let _ = socket
293        .send(Message::Close(Some(axum::extract::ws::CloseFrame {
294            code: 4410,
295            reason: "retention-exhausted".into(),
296        })))
297        .await;
298}
299
300/// Build the ADR-0015 `{seq, event}` envelope JSON for a single NATS
301/// payload. Pulled out of the hot loop so tests can exercise the
302/// contract without spinning up a NATS broker.
303///
304/// Returns `Err` if the payload is not valid UTF-8 or not valid JSON —
305/// the bridge logs and drops in both cases, but the test surface needs
306/// to distinguish them.
307pub(crate) fn build_envelope(seq: u64, payload: &[u8]) -> Result<String, EnvelopeError> {
308    let s = std::str::from_utf8(payload).map_err(|_| EnvelopeError::NotUtf8)?;
309    let event_value: serde_json::Value = serde_json::from_str(s).map_err(EnvelopeError::NotJson)?;
310    let envelope = serde_json::json!({ "seq": seq, "event": event_value });
311    Ok(envelope.to_string())
312}
313
314#[derive(Debug)]
315pub(crate) enum EnvelopeError {
316    NotUtf8,
317    NotJson(serde_json::Error),
318}
319
320/// Helper trait so the early-exit paths can write a close frame without
321/// re-stating the (code, reason) tuple at every call site.
322trait CloseExt {
323    async fn send_close_with_reason(self, reason: &'static str) -> Result<(), axum::Error>;
324}
325
326impl CloseExt for WebSocket {
327    async fn send_close_with_reason(mut self, reason: &'static str) -> Result<(), axum::Error> {
328        self.send(Message::Close(Some(axum::extract::ws::CloseFrame {
329            code: axum::extract::ws::close_code::POLICY,
330            reason: reason.into(),
331        })))
332        .await
333    }
334}
335
#[cfg(test)]
mod tests {
    use super::*;

    /// ADR-0015 §D1 — a `/ws/events` frame is the JSON envelope
    /// `{seq, event}`, with `event` embedded as a structured value
    /// rather than a string-of-JSON. The web view reducer keys off this
    /// shape, so the test round-trips through the serialized frame.
    #[test]
    fn ws_envelope_carries_seq() {
        let payload = serde_json::json!({
            "specversion": "1.0",
            "type": "io.cellos.formation.v1.created",
            "source": "/formations/abc",
            "id": "evt-1",
            "data": { "name": "demo" }
        })
        .to_string()
        .into_bytes();

        let frame = build_envelope(42, &payload).expect("envelope build");
        let parsed = frame.parse::<serde_json::Value>().unwrap();

        let seq = &parsed["seq"];
        assert_eq!(
            seq.as_u64(),
            Some(42),
            "envelope must carry the seq as the cursor field; got {}",
            seq
        );

        let event = &parsed["event"];
        assert!(
            event.is_object(),
            "event must be a structured JSON object, not a string-of-JSON; got {}",
            event
        );
        assert_eq!(event["type"], "io.cellos.formation.v1.created");
        assert_eq!(event["data"]["name"], "demo");
    }

    /// Bytes that are not UTF-8 must be rejected as `NotUtf8` instead
    /// of being mangled, so a producer-side bug shows up here and not
    /// as garbled frames on the client.
    #[test]
    fn ws_envelope_rejects_non_utf8_payload() {
        let bad = [0xffu8, 0xfe, 0xfd];
        let other = build_envelope(1, &bad);
        if !matches!(other, Err(EnvelopeError::NotUtf8)) {
            panic!("expected NotUtf8, got {other:?}");
        }
    }

    /// Producers MUST emit CloudEvent JSON; plain text violates the
    /// contract and must be rejected as `NotJson`.
    #[test]
    fn ws_envelope_rejects_non_json_payload() {
        let bad = b"hello, world";
        let other = build_envelope(1, bad);
        if !matches!(other, Err(EnvelopeError::NotJson(_))) {
            panic!("expected NotJson, got {other:?}");
        }
    }
}
396}