cellos_server/ws.rs
1//! WebSocket → NATS bridge.
2//!
3//! `GET /ws/events` upgrades the connection, subscribes to
4//! `cellos.events.>` (or the optional `?subject=` override), and forwards
5//! every NATS message as a JSON envelope:
6//!
7//! ```json
8//! { "seq": 12345, "event": { /* CloudEvent */ } }
9//! ```
10//!
//! `seq` is the cursor described in ADR-0015. The handler in this file
//! always opens a JetStream consumer, so `seq` is the JetStream stream
//! sequence the broker reports for each message. (The core-NATS
//! variant, where `seq` would be a per-connection monotonic counter, is
//! not implemented here.) Either way the contract on the wire is the
//! same: `seq` is monotonic, and the snapshot endpoint's `cursor` is
//! comparable to it on the same broker.
17//!
18//! ## ADR-0015 §D3 — `?since=<seq>` resume
19//!
//! Clients open `/ws/events?since=<cursor>` to resume after a
//! reconnect. The value is handed to the JetStream consumer, where it
//! maps directly to
//! `DeliverPolicy::ByStartSequence { OptStartSeq: since + 1 }`. If the
//! consumer cannot be created because `since` is older than the
//! stream's retention floor, the socket is closed with the custom code
//! 4410 (ADR-0015 §D7) so the client can re-hydrate from the snapshot
//! endpoint. The wire contract (every frame carries `seq`) is the same
//! one described for the core-NATS bridge — which accepts `since` but
//! *cannot replay history* — so clients work transparently against
//! either bridge. Omitting `since` keeps the "new messages only"
//! behavior for ad-hoc subscribers (cellctl `--follow`, debug tools).
30//!
31//! ## ADR-0015 §D6 — heartbeat
32//!
//! A WebSocket can sit idle on a quiet stream without either side
//! knowing the underlying socket has died. The server therefore sends a
//! `Ping` frame every 25 seconds; the timer is not reset by data
//! frames, so pings go out on busy streams as well. The client's `Pong`
//! replies need no handling here: inbound frames other than `Close` are
//! ignored.
37
38use std::time::Duration;
39
40use axum::extract::ws::{Message, WebSocket};
41use axum::extract::{Query, State, WebSocketUpgrade};
42use axum::http::HeaderMap;
43use axum::response::IntoResponse;
44use futures_util::{SinkExt, StreamExt};
45use serde::Deserialize;
46use tokio::time::interval;
47use tracing::{debug, info, warn};
48
49use crate::auth::require_bearer;
50use crate::error::AppError;
51use crate::jetstream::{looks_like_retention_exhausted, open_ws_message_stream, stream_first_seq};
52use crate::state::AppState;
53
/// Subject used when the client does not pass `?subject=`; matches
/// every event published under `cellos.events.`.
const DEFAULT_SUBJECT: &str = "cellos.events.>";

/// ADR-0015 §D6 — period between server-initiated `Ping` frames.
///
/// The web view declares a connection dead after more than 45s of
/// silence, so a 25s period stays well inside that budget.
const HEARTBEAT: Duration = Duration::from_secs(25);

/// Upper bound, in bytes, applied to both inbound WebSocket frames and
/// inbound WebSocket messages (64 KiB).
///
/// axum's defaults are 16 MiB / 64 MiB. `/ws/events` only ever pushes
/// data from server to client, and every inbound frame except `Close`
/// is ignored, so keeping the defaults would merely let an
/// authenticated client make the server buffer a 64 MiB message for
/// nothing (post-auth resource amplification). 64 KiB is far more than
/// any control frame a future client might send, and small enough that
/// one misbehaving client cannot hurt the server.
const WS_MAX_FRAME_BYTES: usize = 64 << 10;

/// Deadline for a single outbound frame (data or heartbeat).
///
/// Red-team wave 2 (MED-W2A-2): with an unbounded send, a client whose
/// TCP receive window is wedged (suspended laptop, congested link)
/// parks the whole WS task on `tx.send().await` — heartbeats stop,
/// client-close goes unobserved, the JetStream pull stream backs up
/// behind the un-drained channel, and the broker does not reclaim the
/// consumer until `EPHEMERAL_INACTIVE_THRESHOLD` (5 min). 50s is
/// exactly two `HEARTBEAT` periods; a client that cannot accept one
/// frame in that time is effectively gone.
const WS_SEND_TIMEOUT: Duration = Duration::from_secs(50);
80
/// Query-string parameters accepted by `GET /ws/events`.
///
/// Both fields are optional; with neither present the handler tails
/// new messages on the default subject.
#[derive(Debug, Deserialize)]
pub struct WsParams {
    /// Optional NATS subject filter. Defaults to `cellos.events.>` which
    /// receives every CloudEvent the platform emits. Callers can scope
    /// to a tenant with e.g. `?subject=cellos.events.tenant1.>`.
    /// A value equal to the default is treated as "no filter" by the
    /// socket handler.
    pub subject: Option<String>,
    /// ADR-0015 §D3 — resume cursor. When present the server asks the
    /// JetStream consumer to start delivery at `since + 1`; when absent
    /// only messages published after the consumer is created are
    /// delivered. If consumer creation fails and the error looks like
    /// the cursor pre-dating the stream's retention floor, the socket
    /// is closed with code 4410 (ADR-0015 §D7).
    ///
    /// NOTE(review): earlier wording here described a core-NATS MVP
    /// bridge that accepted `since` without replaying history; the
    /// handler in this file only opens a JetStream consumer — confirm
    /// against the ADR before relying on either description.
    pub since: Option<u64>,
}
94
95pub async fn ws_events(
96 State(state): State<AppState>,
97 headers: HeaderMap,
98 Query(params): Query<WsParams>,
99 ws: WebSocketUpgrade,
100) -> Result<impl IntoResponse, AppError> {
101 // Auth runs BEFORE the upgrade so an unauthenticated client sees
102 // 401 problem+json rather than a confusing protocol error.
103 require_bearer(&headers, &state.api_token)?;
104
105 let subject = params.subject.unwrap_or_else(|| DEFAULT_SUBJECT.to_owned());
106 let since = params.since;
107 // Cap inbound frame/message size — see WS_MAX_FRAME_BYTES. axum's
108 // defaults (16 MiB / 64 MiB) are too generous for a one-way feed.
109 let ws = ws
110 .max_message_size(WS_MAX_FRAME_BYTES)
111 .max_frame_size(WS_MAX_FRAME_BYTES);
112 Ok(ws.on_upgrade(move |socket| handle_socket(socket, state, subject, since)))
113}
114
115async fn handle_socket(socket: WebSocket, state: AppState, subject: String, since: Option<u64>) {
116 let Some(ctx) = state.jetstream.clone() else {
117 warn!("ws connect with no JetStream context configured; closing");
118 let _ = socket
119 .send_close_with_reason("no upstream broker configured")
120 .await;
121 return;
122 };
123
124 let subject_filter = if subject == DEFAULT_SUBJECT {
125 None
126 } else {
127 Some(subject.as_str())
128 };
129
130 // ADR-0015 §D3 — open the JetStream consumer with the right
131 // DeliverPolicy. When `since` is provided we resume at `since+1`;
132 // otherwise we live-tail from the next published message.
133 let mut messages = match open_ws_message_stream(&ctx, subject_filter, since).await {
134 Ok(s) => s,
135 Err(e) => {
136 warn!(error = %format!("{e:#}"), subject = %subject, since = ?since, "jetstream consumer create failed");
137 // ADR-0015 §D7 — when `since` is older than the stream's
138 // retention floor, close with the 4410 retention-exhausted
139 // contract. The client treats 4410 as "drop cache,
140 // re-hydrate from snapshot, reconnect at new cursor".
141 if since.is_some() && looks_like_retention_exhausted(&e) {
142 let oldest = stream_first_seq(&ctx).await;
143 close_retention_exhausted(socket, oldest).await;
144 } else {
145 let _ = socket.send_close_with_reason("subscribe failed").await;
146 }
147 return;
148 }
149 };
150
151 info!(
152 subject = %subject,
153 since = ?since,
154 "ws client connected, bridging JetStream messages",
155 );
156 let (mut tx, mut rx) = socket.split();
157
158 let mut heartbeat = interval(HEARTBEAT);
159 // The first tick fires immediately; skip it so we don't ping
160 // before the client has had a chance to settle.
161 heartbeat.tick().await;
162
163 loop {
164 tokio::select! {
165 // Red-team wave 2 (MED-W2A-1): `biased` so client-close and
166 // heartbeat ticks take priority over a saturated message
167 // firehose. Without this, a flood on `cellos.events.>` could
168 // starve `rx.next()` and leave a dead consumer pinned on the
169 // broker for the full 5-min inactive-threshold window after
170 // the client gives up.
171 biased;
172 incoming = rx.next() => {
173 match incoming {
174 Some(Ok(Message::Close(_))) | None => {
175 debug!("ws client closed");
176 break;
177 }
178 Some(Err(e)) => {
179 warn!(error = %e, "ws recv error");
180 break;
181 }
182 // Ignore inbound pings/pongs/text/binary — this is a
183 // one-way feed for the MVP.
184 Some(Ok(_)) => {}
185 }
186 }
187 _ = heartbeat.tick() => {
188 // ADR-0015 §D6 — keepalive. axum's WebSocket auto-
189 // replies to pings *from* the client; sending one
190 // *to* the client is on us. Empty payload is fine —
191 // the client only cares that a frame arrived.
192 match tokio::time::timeout(WS_SEND_TIMEOUT, tx.send(Message::Ping(Vec::new()))).await {
193 Ok(Ok(())) => {}
194 Ok(Err(_)) => {
195 debug!("ws heartbeat send failed; client gone");
196 break;
197 }
198 Err(_) => {
199 warn!("ws heartbeat send timed out after {:?}; closing", WS_SEND_TIMEOUT);
200 break;
201 }
202 }
203 }
204 msg = messages.next() => {
205 match msg {
206 Some(Ok(m)) => {
207 // ADR-0015 §D1 — seq is the JetStream stream
208 // sequence, broker-authoritative across reconnects.
209 let seq = match m.info() {
210 Ok(info) => info.stream_sequence,
211 Err(e) => {
212 warn!(error = %e, "ws msg missing stream info; skipping");
213 continue;
214 }
215 };
216 let payload = match build_envelope(seq, &m.payload) {
217 Ok(s) => s,
218 Err(EnvelopeError::NotUtf8) => {
219 warn!(subject = %subject, "dropping non-utf8 jetstream payload");
220 continue;
221 }
222 Err(EnvelopeError::NotJson(e)) => {
223 warn!(
224 subject = %subject,
225 error = %e,
226 "dropping non-json jetstream payload",
227 );
228 continue;
229 }
230 };
231 // ADR-0015 §D2 — bump the projection cursor so
232 // future snapshot fetches advertise the latest
233 // applied seq. Monotonic; bump_cursor handles
234 // out-of-order under concurrent connections.
235 state.bump_cursor(seq);
236 // Red-team wave 2 (MED-W2A-2): bounded send.
237 // Without a timeout a wedged TCP receive window
238 // (suspended laptop, congested link) parks the
239 // entire WS task on `tx.send().await`, starving
240 // heartbeats and rx, and pinning the broker
241 // consumer for `EPHEMERAL_INACTIVE_THRESHOLD`.
242 match tokio::time::timeout(WS_SEND_TIMEOUT, tx.send(Message::Text(payload))).await {
243 Ok(Ok(())) => {}
244 Ok(Err(_)) => {
245 debug!("ws send failed; client gone");
246 break;
247 }
248 Err(_) => {
249 warn!(seq, "ws send timed out after {:?}; closing", WS_SEND_TIMEOUT);
250 break;
251 }
252 }
253 // The consumer is created with `AckPolicy::None`
254 // (see jetstream.rs::create_ephemeral_consumer)
255 // so JetStream never expects an ack and never
256 // redelivers. This call is a no-op under the
257 // current policy; we keep it so that flipping
258 // the consumer to `AckPolicy::Explicit` later
259 // does not silently lose ack semantics.
260 if let Err(e) = m.ack().await {
261 debug!(seq, error = %e, "jetstream ack failed (AckPolicy::None)");
262 }
263 }
264 Some(Err(e)) => {
265 warn!(error = %e, "jetstream message error; closing ws");
266 break;
267 }
268 None => {
269 debug!("jetstream message stream ended");
270 break;
271 }
272 }
273 }
274 }
275 }
276
277 info!(subject = %subject, "ws client disconnected");
278}
279
280/// ADR-0015 §D7 — close the WebSocket with the 4410 retention-exhausted
281/// contract. Sends a problem+json text frame describing the failure,
282/// then the close frame with the custom 4410 code. The client treats
283/// this as "drop my cached projection, re-hydrate from snapshot, and
284/// reconnect at the new cursor".
285async fn close_retention_exhausted(mut socket: WebSocket, oldest_seq: Option<u64>) {
286 let problem = serde_json::json!({
287 "type": "/problems/ws/retention-exhausted",
288 "title": "Cursor older than stream retention",
289 "oldest_seq": oldest_seq,
290 });
291 let _ = socket.send(Message::Text(problem.to_string())).await;
292 let _ = socket
293 .send(Message::Close(Some(axum::extract::ws::CloseFrame {
294 code: 4410,
295 reason: "retention-exhausted".into(),
296 })))
297 .await;
298}
299
300/// Build the ADR-0015 `{seq, event}` envelope JSON for a single NATS
301/// payload. Pulled out of the hot loop so tests can exercise the
302/// contract without spinning up a NATS broker.
303///
304/// Returns `Err` if the payload is not valid UTF-8 or not valid JSON —
305/// the bridge logs and drops in both cases, but the test surface needs
306/// to distinguish them.
307pub(crate) fn build_envelope(seq: u64, payload: &[u8]) -> Result<String, EnvelopeError> {
308 let s = std::str::from_utf8(payload).map_err(|_| EnvelopeError::NotUtf8)?;
309 let event_value: serde_json::Value = serde_json::from_str(s).map_err(EnvelopeError::NotJson)?;
310 let envelope = serde_json::json!({ "seq": seq, "event": event_value });
311 Ok(envelope.to_string())
312}
313
/// Reasons `build_envelope` refuses a payload. The bridge logs and
/// drops the message for either variant.
#[derive(Debug)]
pub(crate) enum EnvelopeError {
    /// The payload bytes are not valid UTF-8.
    NotUtf8,
    /// The payload is UTF-8 but failed to parse as JSON; carries the
    /// parser's error.
    NotJson(serde_json::Error),
}
319
320/// Helper trait so the early-exit paths can write a close frame without
321/// re-stating the (code, reason) tuple at every call site.
322trait CloseExt {
323 async fn send_close_with_reason(self, reason: &'static str) -> Result<(), axum::Error>;
324}
325
326impl CloseExt for WebSocket {
327 async fn send_close_with_reason(mut self, reason: &'static str) -> Result<(), axum::Error> {
328 self.send(Message::Close(Some(axum::extract::ws::CloseFrame {
329 code: axum::extract::ws::close_code::POLICY,
330 reason: reason.into(),
331 })))
332 .await
333 }
334}
335
#[cfg(test)]
mod tests {
    use super::*;

    /// ADR-0015 §D1 — every `/ws/events` frame is a JSON envelope
    /// `{seq, event}` where `event` is the CloudEvent as a structured
    /// value, not a string containing JSON. The web view reducer keys
    /// off this shape, so the test re-parses the produced frame and
    /// inspects both fields.
    #[test]
    fn ws_envelope_carries_seq() {
        let payload = serde_json::to_vec(&serde_json::json!({
            "specversion": "1.0",
            "type": "io.cellos.formation.v1.created",
            "source": "/formations/abc",
            "id": "evt-1",
            "data": { "name": "demo" }
        }))
        .unwrap();

        let frame = build_envelope(42, &payload).expect("envelope build");
        let parsed: serde_json::Value = serde_json::from_str(&frame).unwrap();
        let seq = &parsed["seq"];
        let event = &parsed["event"];

        assert_eq!(
            seq.as_u64(),
            Some(42),
            "envelope must carry the seq as the cursor field; got {}",
            seq
        );
        assert!(
            event.is_object(),
            "event must be a structured JSON object, not a string-of-JSON; got {}",
            event
        );
        assert_eq!(event["type"], "io.cellos.formation.v1.created");
        assert_eq!(event["data"]["name"], "demo");
    }

    /// Bytes that are not valid UTF-8 must be rejected outright rather
    /// than mangled, so a producer-side bug shows up here instead of
    /// as garbled frames on the client.
    #[test]
    fn ws_envelope_rejects_non_utf8_payload() {
        let bad = [0xffu8, 0xfe, 0xfd];
        let other = build_envelope(1, &bad);
        assert!(
            matches!(other, Err(EnvelopeError::NotUtf8)),
            "expected NotUtf8, got {other:?}"
        );
    }

    /// Producers MUST emit CloudEvent JSON; plain text violates the
    /// contract and is rejected as `NotJson`.
    #[test]
    fn ws_envelope_rejects_non_json_payload() {
        let bad = b"hello, world";
        let other = build_envelope(1, bad);
        assert!(
            matches!(other, Err(EnvelopeError::NotJson(_))),
            "expected NotJson, got {other:?}"
        );
    }
}