jmap_base_client/ws/mod.rs
1//! WebSocket transport for JMAP (RFC 8887).
2//!
3//! Provides [`connect_ws`] which establishes a WebSocket connection and
4//! returns a [`WsSession`] for sending and receiving frames.
5//!
6//! URL source: `Session::capabilities["urn:ietf:params:jmap:websocket"].url`
7//! (the session document advertises the WebSocket endpoint).
8
9use std::str::FromStr as _;
10
11use futures::SinkExt as _;
12use futures::StreamExt as _;
13use tokio_tungstenite::tungstenite::client::IntoClientRequest as _;
14use tokio_tungstenite::tungstenite::protocol::WebSocketConfig;
15use tokio_tungstenite::tungstenite::Message;
16
17use crate::push::StateChange;
18
19/// Wire frame sent from the client to the server over WebSocket (RFC 8887 §4.3.2).
20///
21/// Wraps a [`jmap_types::JmapRequest`] and injects the mandatory `@type: "Request"`
22/// field (and optional `id`) in a single `serde_json::to_string` pass, avoiding
23/// the `to_value` + mutation + `to_string` double-serialization that the naive
24/// approach requires.
25#[derive(serde::Serialize)]
26struct WsRequestFrame<'a> {
27 /// RFC 8887 §4.3.2 — every JMAP request frame MUST carry "@type": "Request".
28 #[serde(rename = "@type")]
29 ws_type: &'static str,
30 /// Optional correlation ID echoed back in the server's Response frame.
31 #[serde(skip_serializing_if = "Option::is_none")]
32 id: Option<&'a str>,
33 /// The JMAP request payload; flattened into the enclosing JSON object.
34 #[serde(flatten)]
35 inner: &'a jmap_types::JmapRequest,
36}
37
38/// Maximum WebSocket message size (1 MiB), consistent with the SSE frame limit.
39/// Prevents a misbehaving or hostile server from forcing the client to buffer
40/// large messages over the event connection.
41/// Default per-message / per-frame byte cap for WebSocket connections opened
42/// via [`connect_ws`] (which does not take a limit parameter). Callers that
43/// need a different cap should use [`connect_ws_with_limit`] or the
44/// [`crate::JmapClient::connect_ws_session`] convenience method which
45/// reads the `max_ws_message` field from `ClientConfig`. Default: 1 MiB.
46pub const DEFAULT_WS_MAX_MESSAGE_BYTES: usize = 1 << 20;
47
48/// A parsed frame received from the JMAP WebSocket.
49///
50/// Marked `#[non_exhaustive]` because the spec may define additional
51/// `@type` values in future revisions.
52///
53/// # `Debug` redaction
54///
55/// `WsFrame` has a hand-written [`std::fmt::Debug`] impl rather than
56/// `#[derive(Debug)]` so the [`Unknown::raw`](WsFrame::Unknown) field is
57/// printed as `[REDACTED]` instead of being serialised verbatim. See the
58/// field doc on `Unknown.raw` for the credential-leak class this guards
59/// against (bd:JMAP-6r7c.5). Use the structured accessors on the frame
60/// itself, not `{:?}`, when you need the raw value.
61#[non_exhaustive]
62#[derive(Clone, PartialEq)]
63pub enum WsFrame {
64 /// RFC 8620 §7.1 StateChange — one or more object types have changed
65 /// state; client must re-fetch the affected data types.
66 StateChange(StateChange),
67 /// RFC 8887 Response — reply to a JMAP request sent on this connection.
68 Response(jmap_types::JmapResponse),
69 /// Unrecognized `@type` — silently ignored per forward-compatibility rules
70 /// (RFC 8887 §4.3.1: clients SHOULD ignore unknown message types).
71 ///
72 /// Also produced when a known type (`"Response"` or `"StateChange"`) fails
73 /// to deserialize — `type_name` will be `"Response"` or `"StateChange"` in
74 /// that case, which can signal server misbehavior or a schema version
75 /// mismatch. Callers that log unknown frames should check for these names.
76 Unknown {
77 /// Value of the `@type` field. Either an unrecognized message type
78 /// per RFC 8887 §4.3.1, or `"Response"` / `"StateChange"` when a known
79 /// type failed to deserialize into its typed variant.
80 type_name: String,
81 /// Raw JSON object as received from the server, preserved for
82 /// forward-compatibility diagnostics.
83 ///
84 /// **DO NOT log this field verbatim.** Future or extension JMAP
85 /// WebSocket message types may carry credential-grade material —
86 /// push verification codes (RFC 8887 §7.2), federation handshake
87 /// tokens, session-rotation challenges, etc. — and a malformed
88 /// `Response` to a method like `PushSubscription/get` can echo a
89 /// `verificationCode` back into this field.
90 ///
91 /// The enclosing `WsFrame` uses a hand-written `Debug` impl that
92 /// renders this field as `[REDACTED]` to neutralise the natural
93 /// `{:?}`-leaks-the-field failure mode (bd:JMAP-6r7c.5). For
94 /// operator logs, prefer logging `type_name` only, or apply a
95 /// project-specific redaction filter before passing `raw` to a
96 /// logging sink. See bd:JMAP-sc1b.98.
97 raw: serde_json::Value,
98 },
99}
100
101impl std::fmt::Debug for WsFrame {
102 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
103 match self {
104 WsFrame::StateChange(sc) => f.debug_tuple("StateChange").field(sc).finish(),
105 WsFrame::Response(r) => f.debug_tuple("Response").field(r).finish(),
106 // `raw` may carry credential-grade material in failure modes
107 // (see the field doc); render as a literal string so neither
108 // `{:?}` nor `tracing::*` with `?frame` reveals the payload.
109 WsFrame::Unknown { type_name, raw: _ } => f
110 .debug_struct("Unknown")
111 .field("type_name", type_name)
112 .field("raw", &"[REDACTED]")
113 .finish(),
114 }
115 }
116}
117
118type Inner =
119 tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>;
120
121/// An established JMAP WebSocket session (RFC 8887).
122///
123/// Call [`next_frame`](WsSession::next_frame) in a loop to receive events.
124/// Use [`send_request`](WsSession::send_request) to transmit JMAP requests.
125///
126/// The caller is responsible for reconnecting after the stream ends or returns
127/// a transport error. Use exponential backoff.
128///
129/// # Drop and cancellation (bd:JMAP-6r7c.24)
130///
131/// `WsSession` may be dropped at any point — including from inside a
132/// `tokio::select!` losing branch while [`next_frame`](WsSession::next_frame)
133/// is awaiting. Dropping is always safe and always synchronous:
134///
135/// - **Partial frame state is discarded.** The split sink and stream halves
136/// are owned by the session; dropping the session drops both halves and
137/// any in-flight tungstenite buffering.
138/// - **The underlying TCP / TLS connection is closed.** No `Close` frame is
139/// sent — dropping skips the WebSocket close handshake. Callers that want
140/// the server to see a clean shutdown should arrange to receive the
141/// server's `Close` (which `next_frame` returns as `None`) before drop;
142/// this crate does not currently expose an explicit client-initiated
143/// `close()` method.
144/// - **Resumption is the caller's job.** If you need to recover the
145/// conversation state, you must either persist enough application-level
146/// state to replay, or rely on the server's push-state replay protocol.
147/// `WsSession` itself buffers nothing the caller can replay.
148///
149/// `next_frame` is cancel-safe in the sense that cancelling its future
150/// (via `select!` or `drop`) does not corrupt subsequent calls to
151/// `next_frame` on the same `WsSession` — but cancelling means giving up
152/// on any partial frame that was being read; the next `next_frame` call
153/// starts from the next complete message.
154pub struct WsSession {
155 sender: WsSender,
156 receiver: WsReceiver,
157}
158
159/// Owning send-half of a WebSocket connection (bd:JMAP-6r7c.31).
160///
161/// Returned from [`WsSession::split`]. Holds the underlying tungstenite
162/// sink and exposes the per-direction send methods that the unified
163/// [`WsSession`] previously bundled with the receive half. Use with
164/// [`WsReceiver`] in two-task topologies (one task in a `next_frame` loop,
165/// one task occasionally sending requests) — the previous unified shape
166/// required serialising send and receive through a single `&mut WsSession`
167/// borrow, which made concurrent send-while-receiving impossible without a
168/// `Mutex` that holds across `.await`.
169pub struct WsSender {
170 sink: futures::stream::SplitSink<Inner, Message>,
171}
172
173/// Owning receive-half of a WebSocket connection (bd:JMAP-6r7c.31).
174///
175/// Returned from [`WsSession::split`]. Holds the underlying tungstenite
176/// stream and exposes [`next_frame`](WsReceiver::next_frame). See
177/// [`WsSender`] for the companion type and the concurrency rationale.
178pub struct WsReceiver {
179 stream: futures::stream::SplitStream<Inner>,
180}
181
182/// Maximum number of consecutive non-Text non-Close non-Binary frames
183/// (Ping, Pong, Frame, etc.) `next_frame` will silently skip in a single call.
184///
185/// Tungstenite handles ping/pong at the protocol layer, so seeing them at the
186/// `Message` layer is unusual but legal — we skip them. A misbehaving or
187/// hostile server that floods the stream with no-op frames could otherwise
188/// starve a caller of `next_frame` indefinitely; this cap surfaces an
189/// `UnexpectedResponse` error before that can happen. 64 is high enough that
190/// a normal connection never trips it (typical SSE/WS streams interleave at
191/// most a handful of pings between data frames) and low enough that the
192/// caller doesn't wait long if a bad server is talking nonsense.
193///
194/// `Binary` frames are NOT counted here — they violate RFC 8887 §4.1 and
195/// surface as `UnexpectedResponse` immediately on the first occurrence.
196///
197/// # Do not remove the cap (bd:JMAP-6r7c.17, originally bd:JMAP-6lsm.6)
198///
199/// A future contributor may suggest "tungstenite handles pings at the
200/// protocol layer, just continue the loop silently for any non-text
201/// non-close frame." That re-introduces the resource-exhaustion bug
202/// bd:JMAP-6lsm.6 fixed. The cap is load-bearing for five reasons:
203///
204/// 1. **Starvation defense.** A hostile or misbehaving server can
205/// flood the stream with no-op frames (Ping, Pong, Frame, future
206/// variants) and starve a caller of `next_frame()` indefinitely.
207/// The cap surfaces this as an error before the caller's outer
208/// loop is starved.
209/// 2. **Calibrated threshold.** 64 is high enough that a normal
210/// connection never trips it. Lower numbers (e.g. 8) would
211/// false-positive on networks with weak NAT keepalives where the
212/// server sends frequent pings to keep the connection alive.
213/// 3. **Binary frames are a separate guarantee.** They are NOT
214/// subsumed by the cap — they violate RFC 8887 §4.1 and surface
215/// immediately on first occurrence (see `classify_message`
216/// `MessageDisposition::UnexpectedFrame` path). Removing the cap
217/// does not affect Binary handling; the threats are different.
218/// 4. **Unit-testable policy.** `classify_message` (free function,
219/// not a method) is testable without a real WebSocket. The test
220/// suite in this file exercises the classification explicitly so
221/// a refactor that "just changes the loop" doesn't silently lose
222/// the policy.
223/// 5. **Pinned by tripwire test.**
224/// `consecutive_skip_cap_matches_documented_value` asserts the
225/// constant equals 64. A retune fails the test loudly so the
226/// change is visible in CI, forcing a deliberate choice rather
227/// than a silent regression.
228///
229/// Resist requests to "simplify" by removing the count.
230const MAX_CONSECUTIVE_NON_TEXT_FRAMES: usize = 64;
231
232/// Classify a single tungstenite [`Message`] into a [`MessageDisposition`]
233/// that tells the [`WsSession::next_frame`] loop what to do with it.
234///
235/// Extracted as a free function so the policy is unit-testable without a
236/// real WebSocket: see the inline test module. Pure function over the
237/// message variant.
238fn classify_message(msg: &Message) -> MessageDisposition {
239 match msg {
240 Message::Text(_) => MessageDisposition::Text,
241 Message::Close(_) => MessageDisposition::Close,
242 Message::Binary(_) => MessageDisposition::Binary,
243 // Ping, Pong, Frame, and any future variants: skip, but count.
244 _ => MessageDisposition::Skip,
245 }
246}
247
248/// Decision a `next_frame` loop iteration takes after looking at one
249/// [`Message`]. See [`classify_message`].
250#[derive(Debug, Clone, Copy, PartialEq, Eq)]
251enum MessageDisposition {
252 /// Text frame: hand to `parse_ws_frame` and return its result.
253 Text,
254 /// Close frame: end the stream by returning `None`.
255 Close,
256 /// Binary frame: violates RFC 8887 §4.1; surface as
257 /// `UnexpectedResponse` immediately on the first occurrence.
258 Binary,
259 /// Ping / Pong / Frame / future variants: silently skip and continue
260 /// the loop, subject to [`MAX_CONSECUTIVE_NON_TEXT_FRAMES`].
261 Skip,
262}
263
264impl WsReceiver {
265 /// Receive the next parsed frame from the server.
266 ///
267 /// Returns `None` when the server has cleanly closed the connection.
268 /// Returns `Some(Err(...))` on parse failure, transport error, RFC 8887
269 /// §4.1 violation (Binary frame), or starvation cap (more than 64
270 /// consecutive Ping/Pong/Frame messages — see the private
271 /// `MAX_CONSECUTIVE_NON_TEXT_FRAMES` constant for the exact value).
272 /// After a transport error the connection is broken and `next_frame`
273 /// must not be called again. After an `UnexpectedResponse` error the
274 /// underlying stream is still healthy — the caller may choose to
275 /// ignore it and retry, or to disconnect.
276 pub async fn next_frame(&mut self) -> Option<Result<WsFrame, crate::error::ClientError>> {
277 let mut consecutive_skips = 0usize;
278 loop {
279 let msg = match self.stream.next().await? {
280 Ok(m) => m,
281 Err(e) => return Some(Err(crate::error::ClientError::from_ws(e))),
282 };
283 match classify_message(&msg) {
284 MessageDisposition::Text => {
285 let Message::Text(text) = msg else {
286 // Unreachable: classify_message returned Text only for
287 // Message::Text. Defensive in case the variant grows.
288 return Some(Err(crate::error::ClientError::UnexpectedResponse(
289 "WebSocket: classify_message returned Text for non-Text variant".into(),
290 )));
291 };
292 return Some(parse_ws_frame(&text));
293 }
294 MessageDisposition::Close => return None,
295 MessageDisposition::Binary => {
296 // RFC 8887 §4.1: JMAP only uses text frames. Surface the
297 // violation; underlying stream is still healthy so the
298 // caller can choose to retry next_frame if it wants.
299 return Some(Err(crate::error::ClientError::UnexpectedResponse(
300 "WebSocket: server sent Binary frame; RFC 8887 §4.1 mandates text frames"
301 .into(),
302 )));
303 }
304 MessageDisposition::Skip => {
305 consecutive_skips = consecutive_skips.saturating_add(1);
306 if consecutive_skips > MAX_CONSECUTIVE_NON_TEXT_FRAMES {
307 return Some(Err(crate::error::ClientError::UnexpectedResponse(
308 format!(
309 "WebSocket: exceeded {MAX_CONSECUTIVE_NON_TEXT_FRAMES} consecutive non-text frames; possible server misbehaviour"
310 ),
311 )));
312 }
313 }
314 }
315 }
316 }
317}
318
319impl WsSender {
320 /// Send a raw text frame over the WebSocket connection.
321 ///
322 /// Used by extension crates to send non-JMAP frames (e.g., JMAP Chat
323 /// ephemeral stream control messages).
324 pub async fn send_text(&mut self, text: String) -> Result<(), crate::error::ClientError> {
325 self.sink
326 .send(Message::Text(text.into()))
327 .await
328 .map_err(crate::error::ClientError::from_ws)
329 }
330
331 /// Send a JMAP request over the WebSocket connection.
332 ///
333 /// Serializes `req` and injects `"@type": "Request"` into the outgoing
334 /// JSON object as required by RFC 8887 §4.3.2. The optional `id` is
335 /// echoed back in the corresponding `Response` frame, enabling out-of-order
336 /// correlation.
337 ///
338 /// # Errors
339 ///
340 /// Returns `ClientError::Serialize` if `req` cannot be serialized, or
341 /// `ClientError::WebSocket` on a transport failure.
342 pub async fn send_request(
343 &mut self,
344 req: &jmap_types::JmapRequest,
345 id: Option<&str>,
346 ) -> Result<(), crate::error::ClientError> {
347 // Wrap req in WsRequestFrame to inject @type and optional id in one
348 // serialization pass (no intermediate serde_json::Value allocation).
349 let frame = WsRequestFrame {
350 ws_type: "Request",
351 id,
352 inner: req,
353 };
354 let text =
355 serde_json::to_string(&frame).map_err(crate::error::ClientError::from_serialize)?;
356 self.sink
357 .send(Message::Text(text.into()))
358 .await
359 .map_err(crate::error::ClientError::from_ws)
360 }
361}
362
363impl WsSession {
364 /// Receive the next parsed frame from the server.
365 ///
366 /// Delegates to [`WsReceiver::next_frame`]. Use this unified handle when
367 /// a single task drives both send and receive; for the
368 /// receive-loop-in-a-separate-task topology, call [`split`](Self::split)
369 /// to get owned [`WsReceiver`] / [`WsSender`] halves.
370 pub async fn next_frame(&mut self) -> Option<Result<WsFrame, crate::error::ClientError>> {
371 self.receiver.next_frame().await
372 }
373
374 /// Send a raw text frame over the WebSocket connection.
375 ///
376 /// Delegates to [`WsSender::send_text`].
377 pub async fn send_text(&mut self, text: String) -> Result<(), crate::error::ClientError> {
378 self.sender.send_text(text).await
379 }
380
381 /// Send a JMAP request over the WebSocket connection.
382 ///
383 /// Delegates to [`WsSender::send_request`].
384 pub async fn send_request(
385 &mut self,
386 req: &jmap_types::JmapRequest,
387 id: Option<&str>,
388 ) -> Result<(), crate::error::ClientError> {
389 self.sender.send_request(req, id).await
390 }
391
392 /// Consume the session and return its owned send and receive halves
393 /// (bd:JMAP-6r7c.31).
394 ///
395 /// Use this when a caller needs to drive the receive loop and the
396 /// send path concurrently — typically from two `tokio::spawn`-ed
397 /// tasks, one running a `while let Some(...) = receiver.next_frame()
398 /// .await` loop and one occasionally sending requests via
399 /// `sender.send_request(...)`. The unified `WsSession` API requires
400 /// `&mut self` for both directions and therefore cannot service them
401 /// concurrently; `split` is the explicit opt-in.
402 ///
403 /// The two halves are independent owners of their tungstenite
404 /// sub-streams. Dropping one half does not close the connection until
405 /// the other half is also dropped (tungstenite's `WebSocketStream`
406 /// only initiates the close handshake when both halves are gone). To
407 /// initiate a clean shutdown from a split session, drop the sender
408 /// after sending a final request and read from the receiver until it
409 /// returns `None`.
410 pub fn split(self) -> (WsSender, WsReceiver) {
411 let WsSession { sender, receiver } = self;
412 (sender, receiver)
413 }
414}
415
416/// Parse a raw WebSocket text frame into a `WsFrame`.
417///
418/// Two passes over `text`:
419///
420/// 1. Parse to [`serde_json::Value`] to extract `@type` (and to keep a
421/// structured fallback alive for the Unknown branch).
422/// 2. For the typed branches (`StateChange`, `Response`), call
423/// [`serde_json::from_str`] directly against the original `text`.
424///
425/// The previous shape `let raw = val.clone(); from_value::<T>(val)` paid a
426/// deep Value clone on every successful frame even though `raw` was thrown
427/// away. For 1-MiB-cap WS messages on a hot push path, the clone allocates
428/// a HashMap per `Value::Object` and a `String` per `Value::String` and
429/// dropped them moments later. Two text parses are cheaper for typical
430/// payload shapes than one parse + one deep Value clone, and the borrow
431/// checker no longer needs ownership tricks (bd:JMAP-6lsm.11).
432fn parse_ws_frame(text: &str) -> Result<WsFrame, crate::error::ClientError> {
433 let val: serde_json::Value =
434 serde_json::from_str(text).map_err(crate::error::ClientError::from_parse)?;
435
436 let type_name = val
437 .get("@type")
438 .and_then(|v| v.as_str())
439 .unwrap_or("<no @type>")
440 .to_owned();
441
442 match type_name.as_str() {
443 // A malformed StateChange is degraded to Unknown rather than a
444 // transport error. A single bad server frame must not kill the
445 // entire WebSocket connection; only tungstenite transport errors
446 // warrant a reconnect. The `val` we already parsed is the Unknown
447 // payload — no clone needed.
448 "StateChange" => match serde_json::from_str::<StateChange>(text) {
449 Ok(sc) => Ok(WsFrame::StateChange(sc)),
450 Err(_) => Ok(WsFrame::Unknown {
451 type_name,
452 raw: val,
453 }),
454 },
455 // Same degradation policy for malformed Response frames.
456 "Response" => match serde_json::from_str::<jmap_types::JmapResponse>(text) {
457 Ok(r) => Ok(WsFrame::Response(r)),
458 Err(_) => Ok(WsFrame::Unknown {
459 type_name,
460 raw: val,
461 }),
462 },
463 _ => Ok(WsFrame::Unknown {
464 type_name,
465 raw: val,
466 }),
467 }
468}
469
470/// Open a JMAP WebSocket connection (RFC 8887).
471///
472/// `ws_url` must come from the session document's WebSocket capability URL
473/// (a `wss://` endpoint in production; `ws://` is accepted in tests).
474///
475/// `auth_header` is an optional `(header-name, header-value)` pair injected
476/// into the WebSocket upgrade request. Pass `None` when the server does not
477/// require authentication headers on the WebSocket handshake.
478///
479/// Returns `ClientError::InvalidArgument` if the URL scheme is not
480/// `ws://` or `wss://`, preventing accidental use with untrusted URLs.
481///
482/// The returned [`WsSession`] provides [`WsSession::next_frame`] for receiving
483/// events. The caller is responsible for reconnecting after disconnect with
484/// exponential backoff.
485///
486/// Uses [`DEFAULT_WS_MAX_MESSAGE_BYTES`] as the per-message / per-frame cap.
487/// Callers that need a different cap should use [`connect_ws_with_limit`] or
488/// [`crate::JmapClient::connect_ws_session`] (which reads `ClientConfig::max_ws_message`).
489///
490/// # Security
491///
492/// The `auth_header` value is a credential and must not be logged or
493/// echoed back to other systems. Treat it with the same care as a
494/// [`crate::auth::BearerAuth`] token.
495pub async fn connect_ws(
496 ws_url: &str,
497 auth_header: Option<crate::auth::AuthHeader<'_>>,
498) -> Result<WsSession, crate::error::ClientError> {
499 connect_ws_with_limit(ws_url, auth_header, DEFAULT_WS_MAX_MESSAGE_BYTES).await
500}
501
502/// Establish a WebSocket connection with an explicit per-message / per-frame
503/// byte cap.
504///
505/// Same contract as [`connect_ws`] but lets the caller pin the
506/// `max_message_size` / `max_frame_size` config passed to tungstenite.
507/// Useful when the JMAP server is known to send larger pushes than the
508/// 1 MiB default (e.g. some Mailbox/changes push payloads on accounts with
509/// many mailboxes can exceed 1 MiB).
510///
511/// `max_message_bytes` MUST be > 0; tungstenite treats `Some(0)` as
512/// "no message of any size is acceptable" which is a misconfiguration trap.
513/// We surface `ClientError::InvalidArgument` instead.
514///
515/// # Security
516///
517/// The `auth_header` value is a credential and must not be logged or
518/// echoed back to other systems. Treat it with the same care as a
519/// [`crate::auth::BearerAuth`] token. The `ClientError::InvalidArgument`
520/// values produced for malformed auth header names or values are
521/// constructed without the original bytes, but callers should still
522/// avoid printing or storing the `auth_header` they passed in.
523pub async fn connect_ws_with_limit(
524 ws_url: &str,
525 auth_header: Option<crate::auth::AuthHeader<'_>>,
526 max_message_bytes: usize,
527) -> Result<WsSession, crate::error::ClientError> {
528 if max_message_bytes == 0 {
529 return Err(crate::error::ClientError::InvalidArgument(
530 "connect_ws_with_limit: max_message_bytes must be > 0".to_owned(),
531 ));
532 }
533 // Validate scheme to prevent SSRF via a compromised or MITM'd session.
534 // Case-insensitive check per RFC 3986 §3.1: only the SCHEME component is
535 // case-insensitive, not the path/query — so split off the scheme and
536 // compare with eq_ignore_ascii_case rather than lowercasing the whole
537 // URL. Lowercasing the whole URL allocated a fresh String the size of
538 // the URL on every connect (bd:JMAP-6lsm.9). The original (unmodified)
539 // URL is passed to tungstenite and kept in error messages for diagnostics.
540 let scheme_ok = ws_url
541 .split_once("://")
542 .is_some_and(|(s, _)| s.eq_ignore_ascii_case("ws") || s.eq_ignore_ascii_case("wss"));
543 if !scheme_ok {
544 return Err(crate::error::ClientError::InvalidArgument(format!(
545 "WebSocket URL must start with ws:// or wss://, got: {ws_url:?}"
546 )));
547 }
548
549 let mut request = ws_url
550 .into_client_request()
551 .map_err(crate::error::ClientError::from_ws)?;
552
553 if let Some(header) = auth_header {
554 // Both arms construct ClientError::InvalidArgument with a fixed
555 // string and deliberately discard the http-crate's Display output
556 // for the inner error. The original `name` / `value` bytes are
557 // credential-adjacent (the name component is less sensitive than
558 // the value, but a future http-crate version could begin echoing
559 // bytes in its Display impl). Defense-in-depth: keep neither in
560 // the error chain.
561 let hdr_name = http::HeaderName::from_str(header.name()).map_err(|_| {
562 crate::error::ClientError::InvalidArgument("invalid auth header name".to_owned())
563 })?;
564 let hdr_value = http::HeaderValue::from_str(header.expose_value()).map_err(|_| {
565 crate::error::ClientError::InvalidArgument("invalid auth header value".to_owned())
566 })?;
567 request.headers_mut().insert(hdr_name, hdr_value);
568 }
569
570 // WebSocketConfig is #[non_exhaustive] in tungstenite; use Default + field assignment.
571 let mut config = WebSocketConfig::default();
572 config.max_message_size = Some(max_message_bytes);
573 config.max_frame_size = Some(max_message_bytes);
574
575 // Apply a 10-second connect timeout, consistent with the HTTP transport's
576 // connect_timeout in DefaultTransport/CustomCaTransport. tungstenite does
577 // not expose a connect timeout parameter, so we wrap at the Future level.
578 // A stalled TCP or TLS handshake would otherwise block indefinitely.
579 let connect_result = tokio::time::timeout(
580 std::time::Duration::from_secs(10),
581 tokio_tungstenite::connect_async_with_config(request, Some(config), false),
582 )
583 .await
584 .map_err(|_elapsed| {
585 // Synthesize an Io-kind transport error to surface the timeout
586 // through the public WebSocketError accessors (is_io() will be
587 // true). The third-party error type is constructed locally and
588 // immediately wrapped, so it does not leak to callers.
589 crate::error::ClientError::from_ws(tokio_tungstenite::tungstenite::Error::Io(
590 std::io::Error::new(
591 std::io::ErrorKind::TimedOut,
592 "WebSocket connect timed out after 10 seconds",
593 ),
594 ))
595 })?;
596 let (ws_stream, _response) = connect_result.map_err(crate::error::ClientError::from_ws)?;
597
598 let (sink, stream) = ws_stream.split();
599 Ok(WsSession {
600 sender: WsSender { sink },
601 receiver: WsReceiver { stream },
602 })
603}
604
605impl std::fmt::Debug for WsSession {
606 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
607 f.debug_struct("WsSession").finish_non_exhaustive()
608 }
609}
610
611impl std::fmt::Debug for WsSender {
612 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
613 f.debug_struct("WsSender").finish_non_exhaustive()
614 }
615}
616
617impl std::fmt::Debug for WsReceiver {
618 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
619 f.debug_struct("WsReceiver").finish_non_exhaustive()
620 }
621}
622
623#[cfg(test)]
624mod tests {
625 use super::*;
626
627 /// Verify WsFrame does not contain ChatTyping or ChatPresence variants.
628 /// This exhaustive match will fail to compile if either variant is reintroduced.
629 #[test]
630 fn ws_frame_has_no_chat_variants() {
631 let frame = WsFrame::Unknown {
632 type_name: "test".to_owned(),
633 raw: serde_json::Value::Null,
634 };
635 match frame {
636 WsFrame::StateChange(_) => {}
637 WsFrame::Response(_) => {}
638 WsFrame::Unknown { .. } => {}
639 }
640 }
641
642 /// Oracle: parse_ws_frame dispatches on @type field and produces a typed StateChange.
643 /// Wire format from RFC 8620 §7.1.1 example.
644 #[test]
645 fn parse_state_change() {
646 let json = r#"{"@type":"StateChange","changed":{"account1":{"Mail":"s2"}}}"#;
647 let frame = parse_ws_frame(json).expect("must parse");
648 match frame {
649 WsFrame::StateChange(sc) => {
650 let account = sc
651 .changed
652 .get("account1")
653 .expect("account1 must be present");
654 assert_eq!(account.get("Mail").map(|s| s.as_ref()), Some("s2"));
655 }
656 other => panic!("expected StateChange, got {other:?}"),
657 }
658 }
659
660 /// Oracle: a StateChange with missing `changed` field degrades to Unknown.
661 #[test]
662 fn parse_malformed_state_change_degrades_to_unknown() {
663 let json = r#"{"@type":"StateChange","unexpected_field":42}"#;
664 let frame = parse_ws_frame(json).expect("must not error");
665 match frame {
666 WsFrame::Unknown { type_name, .. } => assert_eq!(type_name, "StateChange"),
667 other => panic!("expected Unknown, got {other:?}"),
668 }
669 }
670
671 /// Oracle: parse_ws_frame returns Unknown for unrecognized @type.
672 /// Derived from parse_unknown_type test in source ws/mod.rs.
673 #[test]
674 fn parse_unknown_type() {
675 let json = r#"{"@type":"FutureEvent","foo":"bar"}"#;
676 let frame = parse_ws_frame(json).expect("must parse");
677 match frame {
678 WsFrame::Unknown { type_name, .. } => assert_eq!(type_name, "FutureEvent"),
679 other => panic!("expected Unknown, got {other:?}"),
680 }
681 }
682
683 /// Oracle: parse_ws_frame returns Unknown for missing @type.
684 /// Derived from parse_missing_type_field test in source ws/mod.rs.
685 #[test]
686 fn parse_missing_type_field() {
687 let json = r#"{"foo":"bar"}"#;
688 let frame = parse_ws_frame(json).expect("must parse");
689 assert!(matches!(frame, WsFrame::Unknown { .. }));
690 }
691
692 /// Oracle: parse_ws_frame returns Err(Parse) for invalid JSON.
693 /// Derived from parse_invalid_json_returns_parse_error test in source ws/mod.rs.
694 #[test]
695 fn parse_invalid_json_returns_parse_error() {
696 let err = parse_ws_frame("not json").expect_err("must fail");
697 assert!(matches!(err, crate::error::ClientError::Parse(_)));
698 }
699
700 /// Oracle: RFC 8887 §4.3.2 — every JMAP request sent over WebSocket MUST
701 /// include "@type": "Request". Tests WsRequestFrame serde directly to
702 /// verify the #[serde(rename = "@type")] attribute and flatten are correct.
703 #[test]
704 fn send_request_includes_at_type_request() {
705 let req = jmap_types::JmapRequest::new(
706 vec!["urn:ietf:params:jmap:core".to_owned()],
707 vec![],
708 None,
709 );
710 let frame = WsRequestFrame {
711 ws_type: "Request",
712 id: None,
713 inner: &req,
714 };
715 let serialized = serde_json::to_string(&frame).expect("WsRequestFrame must serialize");
716 assert!(
717 serialized.contains("\"@type\":\"Request\""),
718 "RFC 8887 §4.3.2 requires @type:Request in outgoing WS frames; got: {serialized}"
719 );
720 }
721
722 /// Oracle: RFC 8887 §4.3.2 — optional `id` field is echoed in the response.
723 /// When an id is supplied, WsRequestFrame must include it in the serialized frame.
724 #[test]
725 fn send_request_includes_id_when_provided() {
726 let req = jmap_types::JmapRequest::new(
727 vec!["urn:ietf:params:jmap:core".to_owned()],
728 vec![],
729 None,
730 );
731 let frame = WsRequestFrame {
732 ws_type: "Request",
733 id: Some("req-42"),
734 inner: &req,
735 };
736 let serialized = serde_json::to_string(&frame).expect("WsRequestFrame must serialize");
737 assert!(
738 serialized.contains("\"id\":\"req-42\""),
739 "RFC 8887 §4.3.2 optional id must be present when provided; got: {serialized}"
740 );
741 }
742
743 /// Oracle: RFC 8887 §4.3.2 — when id is None, no `id` field appears in the frame.
744 /// WsRequestFrame uses skip_serializing_if to omit the field entirely.
745 #[test]
746 fn send_request_omits_id_when_none() {
747 let req = jmap_types::JmapRequest::new(
748 vec!["urn:ietf:params:jmap:core".to_owned()],
749 vec![],
750 None,
751 );
752 let frame = WsRequestFrame {
753 ws_type: "Request",
754 id: None,
755 inner: &req,
756 };
757 let serialized = serde_json::to_string(&frame).expect("WsRequestFrame must serialize");
758 assert!(
759 !serialized.contains("\"id\":"),
760 "RFC 8887 §4.3.2: no id field must appear when id is None; got: {serialized}"
761 );
762 }
763
764 /// Oracle: connect_ws must reject http:// and https:// URLs with InvalidArgument.
765 ///
766 /// This is the documented SSRF prevention guard: a compromised or MITM'd session
767 /// could send an http:// URL; we must not follow it as a WebSocket URL.
768 /// The scheme check runs before any network I/O.
769 /// Derived from connect_ws_rejects_non_ws_schemes test in source ws/mod.rs.
770 #[tokio::test]
771 async fn connect_ws_rejects_non_ws_schemes() {
772 for bad_url in &["http://host/", "https://host/", "ftp://host/"] {
773 let result = connect_ws(bad_url, None).await.map(|_| ());
774 match result {
775 Err(crate::error::ClientError::InvalidArgument(_)) => {}
776 other => panic!("expected InvalidArgument for {bad_url:?}, got {other:?}"),
777 }
778 }
779 }
780
781 // -----------------------------------------------------------------------
782 // classify_message — bd:JMAP-6lsm.6
783 // -----------------------------------------------------------------------
784
785 /// Oracle: Text frames classify as Text. The independent oracle is
786 /// the next_frame contract in the docstring above.
787 #[test]
788 fn classify_text_message() {
789 let m = Message::Text("hi".into());
790 assert_eq!(classify_message(&m), MessageDisposition::Text);
791 }
792
793 /// Oracle: Close frames classify as Close, ending the stream.
794 #[test]
795 fn classify_close_message() {
796 let m = Message::Close(None);
797 assert_eq!(classify_message(&m), MessageDisposition::Close);
798 }
799
800 /// Oracle: Binary frames violate RFC 8887 §4.1 and must classify as
801 /// Binary so the next_frame loop surfaces UnexpectedResponse rather
802 /// than silently skipping (the bug JMAP-6lsm.6 fixes). The independent
803 /// oracle is RFC 8887 §4.1.
804 #[test]
805 fn classify_binary_message_is_not_skipped() {
806 let m = Message::Binary(vec![1, 2, 3].into());
807 assert_eq!(classify_message(&m), MessageDisposition::Binary);
808 assert_ne!(
809 classify_message(&m),
810 MessageDisposition::Skip,
811 "Binary must NOT be silently skipped (RFC 8887 §4.1)"
812 );
813 }
814
815 /// Oracle: Ping/Pong frames classify as Skip. Tungstenite handles
816 /// them at the protocol layer, so seeing them at the Message layer
817 /// is unusual but legal — skip and continue.
818 #[test]
819 fn classify_ping_pong_messages_are_skipped() {
820 let ping = Message::Ping(vec![].into());
821 let pong = Message::Pong(vec![].into());
822 assert_eq!(classify_message(&ping), MessageDisposition::Skip);
823 assert_eq!(classify_message(&pong), MessageDisposition::Skip);
824 }
825
826 /// Tripwire: the consecutive-skip cap is the documented value.
827 /// A future retune will fail this test loudly so the change is
828 /// visible in CI. Documented value is 64 (see the const docstring).
829 #[test]
830 fn consecutive_skip_cap_matches_documented_value() {
831 assert_eq!(MAX_CONSECUTIVE_NON_TEXT_FRAMES, 64);
832 }
833
834 /// Oracle: bd:JMAP-6r7c.5 / workspace AGENTS.md "Security testing"
835 /// pattern 1 — per-type Debug redaction canary.
836 ///
837 /// A future contributor restoring `#[derive(Debug)]` on `WsFrame` would
838 /// re-expose the credential-grade material that can land in
839 /// `Unknown.raw` (push verification codes, federation handshake tokens,
840 /// session-rotation challenges). This test constructs a canary literal
841 /// inside the `raw` field and asserts the literal does not appear in
842 /// the `{:?}`-formatted output of the frame.
843 #[test]
844 fn ws_frame_unknown_raw_is_redacted_in_debug_output() {
845 let canary = "redaction-canary-cred-WFTMr8FoYpfP-do-not-leak";
846 let frame = WsFrame::Unknown {
847 type_name: "PushVerification".to_owned(),
848 raw: serde_json::json!({
849 "verificationCode": canary,
850 }),
851 };
852 let rendered = format!("{frame:?}");
853 assert!(
854 !rendered.contains(canary),
855 "WsFrame::Unknown Debug must redact `raw`; the canary literal \
856 appeared in the rendered output, indicating either \
857 #[derive(Debug)] was restored or the manual Debug impl \
858 forgot to redact the raw field. Rendered output: {rendered}"
859 );
860 assert!(
861 rendered.contains("[REDACTED]"),
862 "WsFrame::Unknown Debug must render the redaction placeholder; \
863 rendered output: {rendered}"
864 );
865 assert!(
866 rendered.contains("PushVerification"),
867 "WsFrame::Unknown Debug must still surface type_name for \
868 diagnostics; rendered output: {rendered}"
869 );
870 }
871
872 /// Oracle: bd:JMAP-6r7c.5 — the Debug impl must still render the
873 /// other two variants usefully. A naive impl that redacted everything
874 /// would also be wrong; the security goal is narrowly scoped to the
875 /// `Unknown.raw` field.
876 #[test]
877 fn ws_frame_other_variants_remain_useful_in_debug_output() {
878 let response_frame = WsFrame::Response(jmap_types::JmapResponse::new(
879 vec![],
880 "test-session".into(),
881 None,
882 ));
883 let rendered = format!("{response_frame:?}");
884 assert!(
885 rendered.starts_with("Response"),
886 "Response variant Debug must surface variant tag; got: {rendered}"
887 );
888 assert!(
889 rendered.contains("test-session"),
890 "Response variant Debug must surface session_state for \
891 diagnostics; got: {rendered}"
892 );
893 }
894
895 /// bd:JMAP-6r7c.31 — WsSender and WsReceiver must each be `Send` so a
896 /// caller can move one half into a separate tokio task while keeping
897 /// the other half in the current task. The whole point of the split
898 /// is two-task concurrent send-while-receiving; if either half were
899 /// `!Send`, the split would not enable it.
900 ///
901 /// `Sync` is required because `WsSender` behind `Arc` (for concurrent
902 /// sends from multiple tasks) needs `Arc<WsSender>: Send`, which
903 /// requires `WsSender: Sync`. Compile-time check only — nothing to
904 /// assert at runtime.
905 #[test]
906 fn ws_sender_and_receiver_are_send_and_sync() {
907 fn assert_send_sync<T: Send + Sync>() {}
908 assert_send_sync::<WsSender>();
909 assert_send_sync::<WsReceiver>();
910 // The unified session was already Send+Sync before the split
911 // landed; assert here as a regression guard so a future refactor
912 // that accidentally introduces a `!Send` or `!Sync` field in
913 // WsSession is caught.
914 assert_send_sync::<WsSession>();
915 }
916}