Skip to main content

jmap_base_client/ws/
mod.rs

//! WebSocket transport for JMAP (RFC 8887).
//!
//! Provides [`connect_ws`], which establishes a WebSocket connection and
//! returns a [`WsSession`] for sending and receiving frames.
//!
//! URL source: `Session::capabilities["urn:ietf:params:jmap:websocket"].url`
//! (the session document advertises the WebSocket endpoint).
8
9use std::str::FromStr as _;
10
11use futures::SinkExt as _;
12use futures::StreamExt as _;
13use tokio_tungstenite::tungstenite::client::IntoClientRequest as _;
14use tokio_tungstenite::tungstenite::protocol::WebSocketConfig;
15use tokio_tungstenite::tungstenite::Message;
16
17use crate::push::StateChange;
18
19/// Wire frame sent from the client to the server over WebSocket (RFC 8887 §4.3.2).
20///
21/// Wraps a [`jmap_types::JmapRequest`] and injects the mandatory `@type: "Request"`
22/// field (and optional `id`) in a single `serde_json::to_string` pass, avoiding
23/// the `to_value` + mutation + `to_string` double-serialization that the naive
24/// approach requires.
25#[derive(serde::Serialize)]
26struct WsRequestFrame<'a> {
27    /// RFC 8887 §4.3.2 — every JMAP request frame MUST carry "@type": "Request".
28    #[serde(rename = "@type")]
29    ws_type: &'static str,
30    /// Optional correlation ID echoed back in the server's Response frame.
31    #[serde(skip_serializing_if = "Option::is_none")]
32    id: Option<&'a str>,
33    /// The JMAP request payload; flattened into the enclosing JSON object.
34    #[serde(flatten)]
35    inner: &'a jmap_types::JmapRequest,
36}
37
/// Default per-message / per-frame byte cap for WebSocket connections: 1 MiB.
///
/// This is the limit applied by [`connect_ws`], which takes no limit
/// parameter. It is kept consistent with the SSE frame limit and prevents a
/// misbehaving or hostile server from forcing the client to buffer large
/// messages over the event connection.
///
/// Callers that need a different cap should use [`connect_ws_with_limit`], or
/// the [`crate::JmapClient::connect_ws_session`] convenience method, which
/// reads the `max_ws_message` field from `ClientConfig`.
pub const DEFAULT_WS_MAX_MESSAGE_BYTES: usize = 1024 * 1024;
47
48/// A parsed frame received from the JMAP WebSocket.
49///
50/// Marked `#[non_exhaustive]` because the spec may define additional
51/// `@type` values in future revisions.
52///
53/// # `Debug` redaction
54///
55/// `WsFrame` has a hand-written [`std::fmt::Debug`] impl rather than
56/// `#[derive(Debug)]` so the [`Unknown::raw`](WsFrame::Unknown) field is
57/// printed as `[REDACTED]` instead of being serialised verbatim. See the
58/// field doc on `Unknown.raw` for the credential-leak class this guards
59/// against (bd:JMAP-6r7c.5). Use the structured accessors on the frame
60/// itself, not `{:?}`, when you need the raw value.
61#[non_exhaustive]
62#[derive(Clone, PartialEq)]
63pub enum WsFrame {
64    /// RFC 8620 §7.1 StateChange — one or more object types have changed
65    /// state; client must re-fetch the affected data types.
66    StateChange(StateChange),
67    /// RFC 8887 Response — reply to a JMAP request sent on this connection.
68    Response(jmap_types::JmapResponse),
69    /// Unrecognized `@type` — silently ignored per forward-compatibility rules
70    /// (RFC 8887 §4.3.1: clients SHOULD ignore unknown message types).
71    ///
72    /// Also produced when a known type (`"Response"` or `"StateChange"`) fails
73    /// to deserialize — `type_name` will be `"Response"` or `"StateChange"` in
74    /// that case, which can signal server misbehavior or a schema version
75    /// mismatch. Callers that log unknown frames should check for these names.
76    Unknown {
77        /// Value of the `@type` field. Either an unrecognized message type
78        /// per RFC 8887 §4.3.1, or `"Response"` / `"StateChange"` when a known
79        /// type failed to deserialize into its typed variant.
80        type_name: String,
81        /// Raw JSON object as received from the server, preserved for
82        /// forward-compatibility diagnostics.
83        ///
84        /// **DO NOT log this field verbatim.** Future or extension JMAP
85        /// WebSocket message types may carry credential-grade material —
86        /// push verification codes (RFC 8887 §7.2), federation handshake
87        /// tokens, session-rotation challenges, etc. — and a malformed
88        /// `Response` to a method like `PushSubscription/get` can echo a
89        /// `verificationCode` back into this field.
90        ///
91        /// The enclosing `WsFrame` uses a hand-written `Debug` impl that
92        /// renders this field as `[REDACTED]` to neutralise the natural
93        /// `{:?}`-leaks-the-field failure mode (bd:JMAP-6r7c.5). For
94        /// operator logs, prefer logging `type_name` only, or apply a
95        /// project-specific redaction filter before passing `raw` to a
96        /// logging sink. See bd:JMAP-sc1b.98.
97        raw: serde_json::Value,
98    },
99}
100
101impl std::fmt::Debug for WsFrame {
102    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
103        match self {
104            WsFrame::StateChange(sc) => f.debug_tuple("StateChange").field(sc).finish(),
105            WsFrame::Response(r) => f.debug_tuple("Response").field(r).finish(),
106            // `raw` may carry credential-grade material in failure modes
107            // (see the field doc); render as a literal string so neither
108            // `{:?}` nor `tracing::*` with `?frame` reveals the payload.
109            WsFrame::Unknown { type_name, raw: _ } => f
110                .debug_struct("Unknown")
111                .field("type_name", type_name)
112                .field("raw", &"[REDACTED]")
113                .finish(),
114        }
115    }
116}
117
118type Inner =
119    tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>;
120
121/// An established JMAP WebSocket session (RFC 8887).
122///
123/// Call [`next_frame`](WsSession::next_frame) in a loop to receive events.
124/// Use [`send_request`](WsSession::send_request) to transmit JMAP requests.
125///
126/// The caller is responsible for reconnecting after the stream ends or returns
127/// a transport error. Use exponential backoff.
128///
129/// # Drop and cancellation (bd:JMAP-6r7c.24)
130///
131/// `WsSession` may be dropped at any point — including from inside a
132/// `tokio::select!` losing branch while [`next_frame`](WsSession::next_frame)
133/// is awaiting. Dropping is always safe and always synchronous:
134///
135/// - **Partial frame state is discarded.** The split sink and stream halves
136///   are owned by the session; dropping the session drops both halves and
137///   any in-flight tungstenite buffering.
138/// - **The underlying TCP / TLS connection is closed.** No `Close` frame is
139///   sent — dropping skips the WebSocket close handshake. Callers that want
140///   the server to see a clean shutdown should arrange to receive the
141///   server's `Close` (which `next_frame` returns as `None`) before drop;
142///   this crate does not currently expose an explicit client-initiated
143///   `close()` method.
144/// - **Resumption is the caller's job.** If you need to recover the
145///   conversation state, you must either persist enough application-level
146///   state to replay, or rely on the server's push-state replay protocol.
147///   `WsSession` itself buffers nothing the caller can replay.
148///
149/// `next_frame` is cancel-safe in the sense that cancelling its future
150/// (via `select!` or `drop`) does not corrupt subsequent calls to
151/// `next_frame` on the same `WsSession` — but cancelling means giving up
152/// on any partial frame that was being read; the next `next_frame` call
153/// starts from the next complete message.
154pub struct WsSession {
155    sender: WsSender,
156    receiver: WsReceiver,
157}
158
159/// Owning send-half of a WebSocket connection (bd:JMAP-6r7c.31).
160///
161/// Returned from [`WsSession::split`]. Holds the underlying tungstenite
162/// sink and exposes the per-direction send methods that the unified
163/// [`WsSession`] previously bundled with the receive half. Use with
164/// [`WsReceiver`] in two-task topologies (one task in a `next_frame` loop,
165/// one task occasionally sending requests) — the previous unified shape
166/// required serialising send and receive through a single `&mut WsSession`
167/// borrow, which made concurrent send-while-receiving impossible without a
168/// `Mutex` that holds across `.await`.
169pub struct WsSender {
170    sink: futures::stream::SplitSink<Inner, Message>,
171}
172
173/// Owning receive-half of a WebSocket connection (bd:JMAP-6r7c.31).
174///
175/// Returned from [`WsSession::split`]. Holds the underlying tungstenite
176/// stream and exposes [`next_frame`](WsReceiver::next_frame). See
177/// [`WsSender`] for the companion type and the concurrency rationale.
178pub struct WsReceiver {
179    stream: futures::stream::SplitStream<Inner>,
180}
181
/// Upper bound on how many consecutive frames that are neither Text, Close
/// nor Binary (i.e. Ping, Pong, Frame, etc.) a single `next_frame` call will
/// silently skip.
///
/// Tungstenite answers ping/pong at the protocol layer, so meeting them at
/// the `Message` layer is unusual but legal, and they are skipped. Without a
/// limit, a misbehaving or hostile server could flood the stream with no-op
/// frames and keep a `next_frame` caller waiting forever; once the cap is
/// exceeded an `UnexpectedResponse` error is returned instead. The value 64
/// is large enough that a normal connection never reaches it (typical SSE/WS
/// streams interleave at most a handful of pings between data frames) and
/// small enough that a caller is not kept waiting long by a server talking
/// nonsense.
///
/// `Binary` frames are NOT counted against this cap: they violate RFC 8887
/// §4.1 and are reported as `UnexpectedResponse` on their first occurrence.
///
/// # Do not remove the cap (bd:JMAP-6r7c.17, originally bd:JMAP-6lsm.6)
///
/// A future contributor may propose "tungstenite handles pings at the
/// protocol layer, just continue the loop silently for any non-text non-close
/// frame." Doing so brings back the resource-exhaustion bug that
/// bd:JMAP-6lsm.6 fixed. The cap is load-bearing for five reasons:
///
/// 1. **Starvation defense.** A hostile or misbehaving server can flood the
///    stream with no-op frames (Ping, Pong, Frame, future variants) and
///    starve a `next_frame()` caller indefinitely. The cap turns that into an
///    error before the caller's outer loop is starved.
/// 2. **Calibrated threshold.** 64 is high enough that a normal connection
///    never trips it. A lower number (e.g. 8) would false-positive on
///    networks with weak NAT keepalives, where the server pings frequently to
///    keep the connection alive.
/// 3. **Binary frames are a separate guarantee.** They are NOT subsumed by
///    the cap: they violate RFC 8887 §4.1 and surface immediately on first
///    occurrence (see the `MessageDisposition::Binary` result of
///    `classify_message`). Removing the cap would not change Binary handling;
///    the two threats are different.
/// 4. **Unit-testable policy.** `classify_message` is a free function rather
///    than a method, so it can be tested without a real WebSocket. The test
///    suite in this file exercises the classification explicitly so that a
///    refactor which "just changes the loop" cannot silently lose the policy.
/// 5. **Pinned by tripwire test.**
///    `consecutive_skip_cap_matches_documented_value` asserts that the
///    constant equals 64. Retuning it fails that test loudly, making the
///    change visible in CI and forcing a deliberate choice rather than a
///    silent regression.
///
/// Resist requests to "simplify" by removing the count.
const MAX_CONSECUTIVE_NON_TEXT_FRAMES: usize = 64;
231
232/// Classify a single tungstenite [`Message`] into a [`MessageDisposition`]
233/// that tells the [`WsSession::next_frame`] loop what to do with it.
234///
235/// Extracted as a free function so the policy is unit-testable without a
236/// real WebSocket: see the inline test module. Pure function over the
237/// message variant.
238fn classify_message(msg: &Message) -> MessageDisposition {
239    match msg {
240        Message::Text(_) => MessageDisposition::Text,
241        Message::Close(_) => MessageDisposition::Close,
242        Message::Binary(_) => MessageDisposition::Binary,
243        // Ping, Pong, Frame, and any future variants: skip, but count.
244        _ => MessageDisposition::Skip,
245    }
246}
247
/// What one iteration of the `next_frame` loop should do with the
/// [`Message`] it just read. Produced by [`classify_message`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum MessageDisposition {
    /// A Text frame: pass it to `parse_ws_frame` and return the outcome.
    Text,
    /// A Close frame: the stream is over, so return `None`.
    Close,
    /// A Binary frame: this violates RFC 8887 §4.1 and is reported as
    /// `UnexpectedResponse` the first time it is seen.
    Binary,
    /// Ping / Pong / Frame / future variants: skip silently and keep looping,
    /// bounded by [`MAX_CONSECUTIVE_NON_TEXT_FRAMES`].
    Skip,
}
263
264impl WsReceiver {
265    /// Receive the next parsed frame from the server.
266    ///
267    /// Returns `None` when the server has cleanly closed the connection.
268    /// Returns `Some(Err(...))` on parse failure, transport error, RFC 8887
269    /// §4.1 violation (Binary frame), or starvation cap (more than 64
270    /// consecutive Ping/Pong/Frame messages — see the private
271    /// `MAX_CONSECUTIVE_NON_TEXT_FRAMES` constant for the exact value).
272    /// After a transport error the connection is broken and `next_frame`
273    /// must not be called again. After an `UnexpectedResponse` error the
274    /// underlying stream is still healthy — the caller may choose to
275    /// ignore it and retry, or to disconnect.
276    pub async fn next_frame(&mut self) -> Option<Result<WsFrame, crate::error::ClientError>> {
277        let mut consecutive_skips = 0usize;
278        loop {
279            let msg = match self.stream.next().await? {
280                Ok(m) => m,
281                Err(e) => return Some(Err(crate::error::ClientError::from_ws(e))),
282            };
283            match classify_message(&msg) {
284                MessageDisposition::Text => {
285                    let Message::Text(text) = msg else {
286                        // Unreachable: classify_message returned Text only for
287                        // Message::Text. Defensive in case the variant grows.
288                        return Some(Err(crate::error::ClientError::UnexpectedResponse(
289                            "WebSocket: classify_message returned Text for non-Text variant".into(),
290                        )));
291                    };
292                    return Some(parse_ws_frame(&text));
293                }
294                MessageDisposition::Close => return None,
295                MessageDisposition::Binary => {
296                    // RFC 8887 §4.1: JMAP only uses text frames. Surface the
297                    // violation; underlying stream is still healthy so the
298                    // caller can choose to retry next_frame if it wants.
299                    return Some(Err(crate::error::ClientError::UnexpectedResponse(
300                        "WebSocket: server sent Binary frame; RFC 8887 §4.1 mandates text frames"
301                            .into(),
302                    )));
303                }
304                MessageDisposition::Skip => {
305                    consecutive_skips = consecutive_skips.saturating_add(1);
306                    if consecutive_skips > MAX_CONSECUTIVE_NON_TEXT_FRAMES {
307                        return Some(Err(crate::error::ClientError::UnexpectedResponse(
308                            format!(
309                                "WebSocket: exceeded {MAX_CONSECUTIVE_NON_TEXT_FRAMES} consecutive non-text frames; possible server misbehaviour"
310                            ),
311                        )));
312                    }
313                }
314            }
315        }
316    }
317}
318
319impl WsSender {
320    /// Send a raw text frame over the WebSocket connection.
321    ///
322    /// Used by extension crates to send non-JMAP frames (e.g., JMAP Chat
323    /// ephemeral stream control messages).
324    pub async fn send_text(&mut self, text: String) -> Result<(), crate::error::ClientError> {
325        self.sink
326            .send(Message::Text(text.into()))
327            .await
328            .map_err(crate::error::ClientError::from_ws)
329    }
330
331    /// Send a JMAP request over the WebSocket connection.
332    ///
333    /// Serializes `req` and injects `"@type": "Request"` into the outgoing
334    /// JSON object as required by RFC 8887 §4.3.2.  The optional `id` is
335    /// echoed back in the corresponding `Response` frame, enabling out-of-order
336    /// correlation.
337    ///
338    /// # Errors
339    ///
340    /// Returns `ClientError::Serialize` if `req` cannot be serialized, or
341    /// `ClientError::WebSocket` on a transport failure.
342    pub async fn send_request(
343        &mut self,
344        req: &jmap_types::JmapRequest,
345        id: Option<&str>,
346    ) -> Result<(), crate::error::ClientError> {
347        // Wrap req in WsRequestFrame to inject @type and optional id in one
348        // serialization pass (no intermediate serde_json::Value allocation).
349        let frame = WsRequestFrame {
350            ws_type: "Request",
351            id,
352            inner: req,
353        };
354        let text =
355            serde_json::to_string(&frame).map_err(crate::error::ClientError::from_serialize)?;
356        self.sink
357            .send(Message::Text(text.into()))
358            .await
359            .map_err(crate::error::ClientError::from_ws)
360    }
361}
362
363impl WsSession {
364    /// Receive the next parsed frame from the server.
365    ///
366    /// Delegates to [`WsReceiver::next_frame`]. Use this unified handle when
367    /// a single task drives both send and receive; for the
368    /// receive-loop-in-a-separate-task topology, call [`split`](Self::split)
369    /// to get owned [`WsReceiver`] / [`WsSender`] halves.
370    pub async fn next_frame(&mut self) -> Option<Result<WsFrame, crate::error::ClientError>> {
371        self.receiver.next_frame().await
372    }
373
374    /// Send a raw text frame over the WebSocket connection.
375    ///
376    /// Delegates to [`WsSender::send_text`].
377    pub async fn send_text(&mut self, text: String) -> Result<(), crate::error::ClientError> {
378        self.sender.send_text(text).await
379    }
380
381    /// Send a JMAP request over the WebSocket connection.
382    ///
383    /// Delegates to [`WsSender::send_request`].
384    pub async fn send_request(
385        &mut self,
386        req: &jmap_types::JmapRequest,
387        id: Option<&str>,
388    ) -> Result<(), crate::error::ClientError> {
389        self.sender.send_request(req, id).await
390    }
391
392    /// Consume the session and return its owned send and receive halves
393    /// (bd:JMAP-6r7c.31).
394    ///
395    /// Use this when a caller needs to drive the receive loop and the
396    /// send path concurrently — typically from two `tokio::spawn`-ed
397    /// tasks, one running a `while let Some(...) = receiver.next_frame()
398    /// .await` loop and one occasionally sending requests via
399    /// `sender.send_request(...)`. The unified `WsSession` API requires
400    /// `&mut self` for both directions and therefore cannot service them
401    /// concurrently; `split` is the explicit opt-in.
402    ///
403    /// The two halves are independent owners of their tungstenite
404    /// sub-streams. Dropping one half does not close the connection until
405    /// the other half is also dropped (tungstenite's `WebSocketStream`
406    /// only initiates the close handshake when both halves are gone). To
407    /// initiate a clean shutdown from a split session, drop the sender
408    /// after sending a final request and read from the receiver until it
409    /// returns `None`.
410    pub fn split(self) -> (WsSender, WsReceiver) {
411        let WsSession { sender, receiver } = self;
412        (sender, receiver)
413    }
414}
415
416/// Parse a raw WebSocket text frame into a `WsFrame`.
417///
418/// Two passes over `text`:
419///
420/// 1. Parse to [`serde_json::Value`] to extract `@type` (and to keep a
421///    structured fallback alive for the Unknown branch).
422/// 2. For the typed branches (`StateChange`, `Response`), call
423///    [`serde_json::from_str`] directly against the original `text`.
424///
425/// The previous shape `let raw = val.clone(); from_value::<T>(val)` paid a
426/// deep Value clone on every successful frame even though `raw` was thrown
427/// away. For 1-MiB-cap WS messages on a hot push path, the clone allocates
428/// a HashMap per `Value::Object` and a `String` per `Value::String` and
429/// dropped them moments later. Two text parses are cheaper for typical
430/// payload shapes than one parse + one deep Value clone, and the borrow
431/// checker no longer needs ownership tricks (bd:JMAP-6lsm.11).
432fn parse_ws_frame(text: &str) -> Result<WsFrame, crate::error::ClientError> {
433    let val: serde_json::Value =
434        serde_json::from_str(text).map_err(crate::error::ClientError::from_parse)?;
435
436    let type_name = val
437        .get("@type")
438        .and_then(|v| v.as_str())
439        .unwrap_or("<no @type>")
440        .to_owned();
441
442    match type_name.as_str() {
443        // A malformed StateChange is degraded to Unknown rather than a
444        // transport error. A single bad server frame must not kill the
445        // entire WebSocket connection; only tungstenite transport errors
446        // warrant a reconnect. The `val` we already parsed is the Unknown
447        // payload — no clone needed.
448        "StateChange" => match serde_json::from_str::<StateChange>(text) {
449            Ok(sc) => Ok(WsFrame::StateChange(sc)),
450            Err(_) => Ok(WsFrame::Unknown {
451                type_name,
452                raw: val,
453            }),
454        },
455        // Same degradation policy for malformed Response frames.
456        "Response" => match serde_json::from_str::<jmap_types::JmapResponse>(text) {
457            Ok(r) => Ok(WsFrame::Response(r)),
458            Err(_) => Ok(WsFrame::Unknown {
459                type_name,
460                raw: val,
461            }),
462        },
463        _ => Ok(WsFrame::Unknown {
464            type_name,
465            raw: val,
466        }),
467    }
468}
469
470/// Open a JMAP WebSocket connection (RFC 8887).
471///
472/// `ws_url` must come from the session document's WebSocket capability URL
473/// (a `wss://` endpoint in production; `ws://` is accepted in tests).
474///
475/// `auth_header` is an optional `(header-name, header-value)` pair injected
476/// into the WebSocket upgrade request. Pass `None` when the server does not
477/// require authentication headers on the WebSocket handshake.
478///
479/// Returns `ClientError::InvalidArgument` if the URL scheme is not
480/// `ws://` or `wss://`, preventing accidental use with untrusted URLs.
481///
482/// The returned [`WsSession`] provides [`WsSession::next_frame`] for receiving
483/// events. The caller is responsible for reconnecting after disconnect with
484/// exponential backoff.
485///
486/// Uses [`DEFAULT_WS_MAX_MESSAGE_BYTES`] as the per-message / per-frame cap.
487/// Callers that need a different cap should use [`connect_ws_with_limit`] or
488/// [`crate::JmapClient::connect_ws_session`] (which reads `ClientConfig::max_ws_message`).
489///
490/// # Security
491///
492/// The `auth_header` value is a credential and must not be logged or
493/// echoed back to other systems. Treat it with the same care as a
494/// [`crate::auth::BearerAuth`] token.
495pub async fn connect_ws(
496    ws_url: &str,
497    auth_header: Option<crate::auth::AuthHeader<'_>>,
498) -> Result<WsSession, crate::error::ClientError> {
499    connect_ws_with_limit(ws_url, auth_header, DEFAULT_WS_MAX_MESSAGE_BYTES).await
500}
501
502/// Establish a WebSocket connection with an explicit per-message / per-frame
503/// byte cap.
504///
505/// Same contract as [`connect_ws`] but lets the caller pin the
506/// `max_message_size` / `max_frame_size` config passed to tungstenite.
507/// Useful when the JMAP server is known to send larger pushes than the
508/// 1 MiB default (e.g. some Mailbox/changes push payloads on accounts with
509/// many mailboxes can exceed 1 MiB).
510///
511/// `max_message_bytes` MUST be > 0; tungstenite treats `Some(0)` as
512/// "no message of any size is acceptable" which is a misconfiguration trap.
513/// We surface `ClientError::InvalidArgument` instead.
514///
515/// # Security
516///
517/// The `auth_header` value is a credential and must not be logged or
518/// echoed back to other systems. Treat it with the same care as a
519/// [`crate::auth::BearerAuth`] token. The `ClientError::InvalidArgument`
520/// values produced for malformed auth header names or values are
521/// constructed without the original bytes, but callers should still
522/// avoid printing or storing the `auth_header` they passed in.
523pub async fn connect_ws_with_limit(
524    ws_url: &str,
525    auth_header: Option<crate::auth::AuthHeader<'_>>,
526    max_message_bytes: usize,
527) -> Result<WsSession, crate::error::ClientError> {
528    if max_message_bytes == 0 {
529        return Err(crate::error::ClientError::InvalidArgument(
530            "connect_ws_with_limit: max_message_bytes must be > 0".to_owned(),
531        ));
532    }
533    // Validate scheme to prevent SSRF via a compromised or MITM'd session.
534    // Case-insensitive check per RFC 3986 §3.1: only the SCHEME component is
535    // case-insensitive, not the path/query — so split off the scheme and
536    // compare with eq_ignore_ascii_case rather than lowercasing the whole
537    // URL. Lowercasing the whole URL allocated a fresh String the size of
538    // the URL on every connect (bd:JMAP-6lsm.9). The original (unmodified)
539    // URL is passed to tungstenite and kept in error messages for diagnostics.
540    let scheme_ok = ws_url
541        .split_once("://")
542        .is_some_and(|(s, _)| s.eq_ignore_ascii_case("ws") || s.eq_ignore_ascii_case("wss"));
543    if !scheme_ok {
544        return Err(crate::error::ClientError::InvalidArgument(format!(
545            "WebSocket URL must start with ws:// or wss://, got: {ws_url:?}"
546        )));
547    }
548
549    let mut request = ws_url
550        .into_client_request()
551        .map_err(crate::error::ClientError::from_ws)?;
552
553    if let Some(header) = auth_header {
554        // Both arms construct ClientError::InvalidArgument with a fixed
555        // string and deliberately discard the http-crate's Display output
556        // for the inner error. The original `name` / `value` bytes are
557        // credential-adjacent (the name component is less sensitive than
558        // the value, but a future http-crate version could begin echoing
559        // bytes in its Display impl). Defense-in-depth: keep neither in
560        // the error chain.
561        let hdr_name = http::HeaderName::from_str(header.name()).map_err(|_| {
562            crate::error::ClientError::InvalidArgument("invalid auth header name".to_owned())
563        })?;
564        let hdr_value = http::HeaderValue::from_str(header.expose_value()).map_err(|_| {
565            crate::error::ClientError::InvalidArgument("invalid auth header value".to_owned())
566        })?;
567        request.headers_mut().insert(hdr_name, hdr_value);
568    }
569
570    // WebSocketConfig is #[non_exhaustive] in tungstenite; use Default + field assignment.
571    let mut config = WebSocketConfig::default();
572    config.max_message_size = Some(max_message_bytes);
573    config.max_frame_size = Some(max_message_bytes);
574
575    // Apply a 10-second connect timeout, consistent with the HTTP transport's
576    // connect_timeout in DefaultTransport/CustomCaTransport.  tungstenite does
577    // not expose a connect timeout parameter, so we wrap at the Future level.
578    // A stalled TCP or TLS handshake would otherwise block indefinitely.
579    let connect_result = tokio::time::timeout(
580        std::time::Duration::from_secs(10),
581        tokio_tungstenite::connect_async_with_config(request, Some(config), false),
582    )
583    .await
584    .map_err(|_elapsed| {
585        // Synthesize an Io-kind transport error to surface the timeout
586        // through the public WebSocketError accessors (is_io() will be
587        // true). The third-party error type is constructed locally and
588        // immediately wrapped, so it does not leak to callers.
589        crate::error::ClientError::from_ws(tokio_tungstenite::tungstenite::Error::Io(
590            std::io::Error::new(
591                std::io::ErrorKind::TimedOut,
592                "WebSocket connect timed out after 10 seconds",
593            ),
594        ))
595    })?;
596    let (ws_stream, _response) = connect_result.map_err(crate::error::ClientError::from_ws)?;
597
598    let (sink, stream) = ws_stream.split();
599    Ok(WsSession {
600        sender: WsSender { sink },
601        receiver: WsReceiver { stream },
602    })
603}
604
605impl std::fmt::Debug for WsSession {
606    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
607        f.debug_struct("WsSession").finish_non_exhaustive()
608    }
609}
610
611impl std::fmt::Debug for WsSender {
612    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
613        f.debug_struct("WsSender").finish_non_exhaustive()
614    }
615}
616
617impl std::fmt::Debug for WsReceiver {
618    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
619        f.debug_struct("WsReceiver").finish_non_exhaustive()
620    }
621}
622
623#[cfg(test)]
624mod tests {
625    use super::*;
626
627    /// Verify WsFrame does not contain ChatTyping or ChatPresence variants.
628    /// This exhaustive match will fail to compile if either variant is reintroduced.
629    #[test]
630    fn ws_frame_has_no_chat_variants() {
631        let frame = WsFrame::Unknown {
632            type_name: "test".to_owned(),
633            raw: serde_json::Value::Null,
634        };
635        match frame {
636            WsFrame::StateChange(_) => {}
637            WsFrame::Response(_) => {}
638            WsFrame::Unknown { .. } => {}
639        }
640    }
641
642    /// Oracle: parse_ws_frame dispatches on @type field and produces a typed StateChange.
643    /// Wire format from RFC 8620 §7.1.1 example.
644    #[test]
645    fn parse_state_change() {
646        let json = r#"{"@type":"StateChange","changed":{"account1":{"Mail":"s2"}}}"#;
647        let frame = parse_ws_frame(json).expect("must parse");
648        match frame {
649            WsFrame::StateChange(sc) => {
650                let account = sc
651                    .changed
652                    .get("account1")
653                    .expect("account1 must be present");
654                assert_eq!(account.get("Mail").map(|s| s.as_ref()), Some("s2"));
655            }
656            other => panic!("expected StateChange, got {other:?}"),
657        }
658    }
659
660    /// Oracle: a StateChange with missing `changed` field degrades to Unknown.
661    #[test]
662    fn parse_malformed_state_change_degrades_to_unknown() {
663        let json = r#"{"@type":"StateChange","unexpected_field":42}"#;
664        let frame = parse_ws_frame(json).expect("must not error");
665        match frame {
666            WsFrame::Unknown { type_name, .. } => assert_eq!(type_name, "StateChange"),
667            other => panic!("expected Unknown, got {other:?}"),
668        }
669    }
670
671    /// Oracle: parse_ws_frame returns Unknown for unrecognized @type.
672    /// Derived from parse_unknown_type test in source ws/mod.rs.
673    #[test]
674    fn parse_unknown_type() {
675        let json = r#"{"@type":"FutureEvent","foo":"bar"}"#;
676        let frame = parse_ws_frame(json).expect("must parse");
677        match frame {
678            WsFrame::Unknown { type_name, .. } => assert_eq!(type_name, "FutureEvent"),
679            other => panic!("expected Unknown, got {other:?}"),
680        }
681    }
682
683    /// Oracle: parse_ws_frame returns Unknown for missing @type.
684    /// Derived from parse_missing_type_field test in source ws/mod.rs.
685    #[test]
686    fn parse_missing_type_field() {
687        let json = r#"{"foo":"bar"}"#;
688        let frame = parse_ws_frame(json).expect("must parse");
689        assert!(matches!(frame, WsFrame::Unknown { .. }));
690    }
691
692    /// Oracle: parse_ws_frame returns Err(Parse) for invalid JSON.
693    /// Derived from parse_invalid_json_returns_parse_error test in source ws/mod.rs.
694    #[test]
695    fn parse_invalid_json_returns_parse_error() {
696        let err = parse_ws_frame("not json").expect_err("must fail");
697        assert!(matches!(err, crate::error::ClientError::Parse(_)));
698    }
699
700    /// Oracle: RFC 8887 §4.3.2 — every JMAP request sent over WebSocket MUST
701    /// include "@type": "Request".  Tests WsRequestFrame serde directly to
702    /// verify the #[serde(rename = "@type")] attribute and flatten are correct.
703    #[test]
704    fn send_request_includes_at_type_request() {
705        let req = jmap_types::JmapRequest::new(
706            vec!["urn:ietf:params:jmap:core".to_owned()],
707            vec![],
708            None,
709        );
710        let frame = WsRequestFrame {
711            ws_type: "Request",
712            id: None,
713            inner: &req,
714        };
715        let serialized = serde_json::to_string(&frame).expect("WsRequestFrame must serialize");
716        assert!(
717            serialized.contains("\"@type\":\"Request\""),
718            "RFC 8887 §4.3.2 requires @type:Request in outgoing WS frames; got: {serialized}"
719        );
720    }
721
722    /// Oracle: RFC 8887 §4.3.2 — optional `id` field is echoed in the response.
723    /// When an id is supplied, WsRequestFrame must include it in the serialized frame.
724    #[test]
725    fn send_request_includes_id_when_provided() {
726        let req = jmap_types::JmapRequest::new(
727            vec!["urn:ietf:params:jmap:core".to_owned()],
728            vec![],
729            None,
730        );
731        let frame = WsRequestFrame {
732            ws_type: "Request",
733            id: Some("req-42"),
734            inner: &req,
735        };
736        let serialized = serde_json::to_string(&frame).expect("WsRequestFrame must serialize");
737        assert!(
738            serialized.contains("\"id\":\"req-42\""),
739            "RFC 8887 §4.3.2 optional id must be present when provided; got: {serialized}"
740        );
741    }
742
743    /// Oracle: RFC 8887 §4.3.2 — when id is None, no `id` field appears in the frame.
744    /// WsRequestFrame uses skip_serializing_if to omit the field entirely.
745    #[test]
746    fn send_request_omits_id_when_none() {
747        let req = jmap_types::JmapRequest::new(
748            vec!["urn:ietf:params:jmap:core".to_owned()],
749            vec![],
750            None,
751        );
752        let frame = WsRequestFrame {
753            ws_type: "Request",
754            id: None,
755            inner: &req,
756        };
757        let serialized = serde_json::to_string(&frame).expect("WsRequestFrame must serialize");
758        assert!(
759            !serialized.contains("\"id\":"),
760            "RFC 8887 §4.3.2: no id field must appear when id is None; got: {serialized}"
761        );
762    }
763
764    /// Oracle: connect_ws must reject http:// and https:// URLs with InvalidArgument.
765    ///
766    /// This is the documented SSRF prevention guard: a compromised or MITM'd session
767    /// could send an http:// URL; we must not follow it as a WebSocket URL.
768    /// The scheme check runs before any network I/O.
769    /// Derived from connect_ws_rejects_non_ws_schemes test in source ws/mod.rs.
770    #[tokio::test]
771    async fn connect_ws_rejects_non_ws_schemes() {
772        for bad_url in &["http://host/", "https://host/", "ftp://host/"] {
773            let result = connect_ws(bad_url, None).await.map(|_| ());
774            match result {
775                Err(crate::error::ClientError::InvalidArgument(_)) => {}
776                other => panic!("expected InvalidArgument for {bad_url:?}, got {other:?}"),
777            }
778        }
779    }
780
781    // -----------------------------------------------------------------------
782    // classify_message — bd:JMAP-6lsm.6
783    // -----------------------------------------------------------------------
784
785    /// Oracle: Text frames classify as Text. The independent oracle is
786    /// the next_frame contract in the docstring above.
787    #[test]
788    fn classify_text_message() {
789        let m = Message::Text("hi".into());
790        assert_eq!(classify_message(&m), MessageDisposition::Text);
791    }
792
793    /// Oracle: Close frames classify as Close, ending the stream.
794    #[test]
795    fn classify_close_message() {
796        let m = Message::Close(None);
797        assert_eq!(classify_message(&m), MessageDisposition::Close);
798    }
799
800    /// Oracle: Binary frames violate RFC 8887 §4.1 and must classify as
801    /// Binary so the next_frame loop surfaces UnexpectedResponse rather
802    /// than silently skipping (the bug JMAP-6lsm.6 fixes). The independent
803    /// oracle is RFC 8887 §4.1.
804    #[test]
805    fn classify_binary_message_is_not_skipped() {
806        let m = Message::Binary(vec![1, 2, 3].into());
807        assert_eq!(classify_message(&m), MessageDisposition::Binary);
808        assert_ne!(
809            classify_message(&m),
810            MessageDisposition::Skip,
811            "Binary must NOT be silently skipped (RFC 8887 §4.1)"
812        );
813    }
814
815    /// Oracle: Ping/Pong frames classify as Skip. Tungstenite handles
816    /// them at the protocol layer, so seeing them at the Message layer
817    /// is unusual but legal — skip and continue.
818    #[test]
819    fn classify_ping_pong_messages_are_skipped() {
820        let ping = Message::Ping(vec![].into());
821        let pong = Message::Pong(vec![].into());
822        assert_eq!(classify_message(&ping), MessageDisposition::Skip);
823        assert_eq!(classify_message(&pong), MessageDisposition::Skip);
824    }
825
826    /// Tripwire: the consecutive-skip cap is the documented value.
827    /// A future retune will fail this test loudly so the change is
828    /// visible in CI. Documented value is 64 (see the const docstring).
829    #[test]
830    fn consecutive_skip_cap_matches_documented_value() {
831        assert_eq!(MAX_CONSECUTIVE_NON_TEXT_FRAMES, 64);
832    }
833
834    /// Oracle: bd:JMAP-6r7c.5 / workspace AGENTS.md "Security testing"
835    /// pattern 1 — per-type Debug redaction canary.
836    ///
837    /// A future contributor restoring `#[derive(Debug)]` on `WsFrame` would
838    /// re-expose the credential-grade material that can land in
839    /// `Unknown.raw` (push verification codes, federation handshake tokens,
840    /// session-rotation challenges). This test constructs a canary literal
841    /// inside the `raw` field and asserts the literal does not appear in
842    /// the `{:?}`-formatted output of the frame.
843    #[test]
844    fn ws_frame_unknown_raw_is_redacted_in_debug_output() {
845        let canary = "redaction-canary-cred-WFTMr8FoYpfP-do-not-leak";
846        let frame = WsFrame::Unknown {
847            type_name: "PushVerification".to_owned(),
848            raw: serde_json::json!({
849                "verificationCode": canary,
850            }),
851        };
852        let rendered = format!("{frame:?}");
853        assert!(
854            !rendered.contains(canary),
855            "WsFrame::Unknown Debug must redact `raw`; the canary literal \
856             appeared in the rendered output, indicating either \
857             #[derive(Debug)] was restored or the manual Debug impl \
858             forgot to redact the raw field. Rendered output: {rendered}"
859        );
860        assert!(
861            rendered.contains("[REDACTED]"),
862            "WsFrame::Unknown Debug must render the redaction placeholder; \
863             rendered output: {rendered}"
864        );
865        assert!(
866            rendered.contains("PushVerification"),
867            "WsFrame::Unknown Debug must still surface type_name for \
868             diagnostics; rendered output: {rendered}"
869        );
870    }
871
872    /// Oracle: bd:JMAP-6r7c.5 — the Debug impl must still render the
873    /// other two variants usefully. A naive impl that redacted everything
874    /// would also be wrong; the security goal is narrowly scoped to the
875    /// `Unknown.raw` field.
876    #[test]
877    fn ws_frame_other_variants_remain_useful_in_debug_output() {
878        let response_frame = WsFrame::Response(jmap_types::JmapResponse::new(
879            vec![],
880            "test-session".into(),
881            None,
882        ));
883        let rendered = format!("{response_frame:?}");
884        assert!(
885            rendered.starts_with("Response"),
886            "Response variant Debug must surface variant tag; got: {rendered}"
887        );
888        assert!(
889            rendered.contains("test-session"),
890            "Response variant Debug must surface session_state for \
891             diagnostics; got: {rendered}"
892        );
893    }
894
895    /// bd:JMAP-6r7c.31 — WsSender and WsReceiver must each be `Send` so a
896    /// caller can move one half into a separate tokio task while keeping
897    /// the other half in the current task. The whole point of the split
898    /// is two-task concurrent send-while-receiving; if either half were
899    /// `!Send`, the split would not enable it.
900    ///
901    /// `Sync` is required because `WsSender` behind `Arc` (for concurrent
902    /// sends from multiple tasks) needs `Arc<WsSender>: Send`, which
903    /// requires `WsSender: Sync`. Compile-time check only — nothing to
904    /// assert at runtime.
905    #[test]
906    fn ws_sender_and_receiver_are_send_and_sync() {
907        fn assert_send_sync<T: Send + Sync>() {}
908        assert_send_sync::<WsSender>();
909        assert_send_sync::<WsReceiver>();
910        // The unified session was already Send+Sync before the split
911        // landed; assert here as a regression guard so a future refactor
912        // that accidentally introduces a `!Send` or `!Sync` field in
913        // WsSession is caught.
914        assert_send_sync::<WsSession>();
915    }
916}