Skip to main content

jmap_base_client/
client.rs

1//! Auth-agnostic base JMAP HTTP client (RFC 8620).
2//!
3//! Provides [`JmapClient`] for session fetch, API calls, blob transfer,
4//! SSE event streaming, and [`extract_response`] for parsing method results.
5
6use std::sync::Arc;
7
8use futures::StreamExt;
9
10use crate::auth::{AuthProvider, DefaultTransport, TransportConfig};
11use crate::error::ClientError;
12use crate::request::Session;
13use crate::sse::{parse_sse_block, SseFrame};
14
/// Mutable state carried from one step of the `subscribe_events` unfold loop
/// to the next.
///
/// Incoming bytes pass through two buffers: `raw_buf` (undecoded bytes) and
/// `buf` (text that has been decoded but not yet split into frames).
struct SseStreamState<ByteStream> {
    /// The HTTP body stream that chunks are pulled from.
    stream: ByteStream,
    /// Bytes received but not yet UTF-8 decoded. When a multi-byte codepoint
    /// is split across two HTTP chunks, its leading bytes wait here until the
    /// next chunk completes it, so a split codepoint never ends the stream.
    raw_buf: Vec<u8>,
    /// Decoded text awaiting an SSE frame delimiter.
    buf: String,
    /// Offset in `buf` where the next delimiter search starts. Invariant: this
    /// is always a valid UTF-8 char boundary of `buf`.
    scan_from: usize,
}
28
/// Per-client timeout and body-size limits.
///
/// [`ClientConfig::default()`] supplies the production defaults:
///
/// - a 30-second request timeout;
/// - 1 MiB caps for the session document, the upload-response envelope, SSE
///   frames and WebSocket messages;
/// - an 8 MiB cap for API call responses;
/// - a 64 MiB cap for blob downloads.
///
/// The struct is `#[non_exhaustive]`. Code outside this crate must therefore
/// build it with struct-update syntax (`..ClientConfig::default()`), which lets
/// new fields be added in a minor release without breaking callers.
///
/// # Why some fields are `u64` and others `usize` (bd:JMAP-6r7c.21)
///
/// Each limit uses the integer type of whatever it is compared against:
///
/// - **HTTP body caps are `u64`** (`max_session_body`, `max_call_body`,
///   `max_download_body`, `max_upload_response_body`).
///   - They are compared with `reqwest`'s `Content-Length`, which is a `u64`.
///   - They are also compared with a running byte total before anything is
///     buffered.
///   - An HTTP body may legitimately exceed `usize::MAX` on a 32-bit target
///     (4 GiB).
/// - **In-memory frame/message caps are `usize`** (`max_sse_frame`,
///   `max_ws_message`).
///   - They bound a `Vec<u8>`/`String` that must fit in the address space.
///   - `usize` is also what
///     `tokio_tungstenite::tungstenite::protocol::WebSocketConfig` expects.
///
/// The cost is a small ergonomic wrinkle: one untyped literal cannot be passed
/// to both kinds of field without a `_u64`/`_usize` suffix or an `as` cast. A
/// future minor release MAY move everything to `u64` and convert with
/// `usize::try_from` at the call sites that need it.
///
/// # Fields are validated independently
///
/// `validate()` checks each field on its own. No cross-field rule exists; for
/// example, `max_sse_frame <= max_call_body` is not required. The limits
/// defend against different threats (one oversized frame vs one oversized
/// body), so either ordering can be reasonable. Record the values you choose
/// in your own deployment notes.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ClientConfig {
    /// Deadline for one HTTP request/response cycle (`fetch_session`, `call`,
    /// `upload_blob`, `download_blob`). Default: 30 seconds.
    ///
    /// It is NOT applied to SSE or WebSocket streams, which are open-ended by
    /// design.
    ///
    /// Must be non-zero; `validate()` rejects `Duration::ZERO`. This crate
    /// intentionally forbids running without a per-request timeout
    /// (bd:JMAP-6r7c.29). A bounded default protects against:
    ///
    /// - indefinite hangs on stalled servers;
    /// - slowloris-style resource exhaustion in clients that keep one request
    ///   outstanding per account.
    ///
    /// The recorded rationale is that `reqwest::RequestBuilder::timeout` does
    /// not treat `Duration::ZERO` as "fail immediately".
    /// NOTE(review): confirm that against the reqwest version in use. Do not
    /// relax the check without re-deriving the no-timeout DoS argument.
    pub request_timeout: std::time::Duration,
    /// Cap, in bytes, on the `fetch_session` response body. Must be > 0.
    /// Default: 1 MiB.
    pub max_session_body: u64,
    /// Cap, in bytes, on the `call()` response body. Must be > 0.
    /// Default: 8 MiB.
    pub max_call_body: u64,
    /// Cap, in bytes, on the `download_blob()` response body. Must be > 0.
    /// Default: 64 MiB.
    pub max_download_body: u64,
    /// Cap, in bytes, on the small JSON envelope the server returns from
    /// `upload_blob()` describing the stored blob. Must be > 0.
    /// Default: 1 MiB.
    ///
    /// It does NOT limit the size of the blob being uploaded; the JMAP server
    /// enforces that through its `maxSizeUpload` capability.
    pub max_upload_response_body: u64,
    /// Largest accepted SSE frame, in bytes. Must be > 0. Default: 1 MiB.
    ///
    /// The limit is enforced separately on the raw bytes (before UTF-8
    /// decoding) and on the decoded text. A frame over the limit ends the
    /// stream with an error instead of letting a hostile or misbehaving server
    /// grow the buffer without bound.
    ///
    /// **Peak memory (bd:JMAP-6lsm.7):** raw bytes and decoded text are
    /// buffered separately, so parsing one frame can briefly hold about
    /// 2 × `max_sse_frame` (raw bytes up to the limit, plus decoded text not
    /// yet drained). Budget for that peak when tuning for a tight memory
    /// limit.
    ///
    /// The split exists for the streaming UTF-8 decoder, which keeps
    /// incomplete multi-byte sequences in the raw buffer until the next HTTP
    /// chunk arrives; see `decode_utf8_chunk`.
    pub max_sse_frame: usize,
    /// Largest accepted WebSocket message (and frame), in bytes: the WebSocket
    /// counterpart of `max_sse_frame`. Must be > 0. Default: 1 MiB.
    /// (bd:JMAP-6lsm.5)
    ///
    /// [`JmapClient::connect_ws_session`] passes it to
    /// `tokio_tungstenite::tungstenite::protocol::WebSocketConfig`'s
    /// `max_message_size` and `max_frame_size`.
    pub max_ws_message: usize,
}

impl Default for ClientConfig {
    /// Production defaults: see the type-level documentation for each value.
    fn default() -> Self {
        // One mebibyte, spelled once per integer type the fields use.
        const MIB_U64: u64 = 1 << 20;
        const MIB_USIZE: usize = 1 << 20;

        Self {
            request_timeout: std::time::Duration::from_secs(30),
            max_session_body: MIB_U64,
            max_call_body: 8 * MIB_U64,
            max_download_body: 64 * MIB_U64,
            max_upload_response_body: MIB_U64,
            max_sse_frame: MIB_USIZE,
            max_ws_message: MIB_USIZE,
        }
    }
}
140
141impl ClientConfig {
142    /// Validate that all config fields satisfy their constraints.
143    ///
144    /// Called automatically by [`JmapClient::new`].  Callers may also call
145    /// this directly to pre-validate a config before passing it to the
146    /// constructor.
147    ///
148    /// # Errors
149    ///
150    /// Returns [`ClientError::InvalidArgument`] when any field is zero or
151    /// out-of-range.
152    pub fn validate(&self) -> Result<(), ClientError> {
153        if self.max_session_body == 0 {
154            return Err(ClientError::InvalidArgument(
155                "ClientConfig.max_session_body must be > 0".into(),
156            ));
157        }
158        if self.max_call_body == 0 {
159            return Err(ClientError::InvalidArgument(
160                "ClientConfig.max_call_body must be > 0".into(),
161            ));
162        }
163        if self.max_download_body == 0 {
164            return Err(ClientError::InvalidArgument(
165                "ClientConfig.max_download_body must be > 0".into(),
166            ));
167        }
168        if self.max_upload_response_body == 0 {
169            return Err(ClientError::InvalidArgument(
170                "ClientConfig.max_upload_response_body must be > 0".into(),
171            ));
172        }
173        if self.request_timeout == std::time::Duration::ZERO {
174            // Duration::ZERO would let reqwest run requests without a
175            // per-request timeout (reqwest treats ZERO as "no timeout").
176            // Reject explicitly — see ClientConfig::request_timeout
177            // rustdoc for the DoS-resistance rationale (bd:JMAP-6r7c.29).
178            return Err(ClientError::InvalidArgument(
179                "ClientConfig.request_timeout must be > 0; Duration::ZERO would disable the per-request timeout in reqwest, not 'fail immediately'. Use Duration::from_secs(30) or similar."
180                    .into(),
181            ));
182        }
183        if self.max_sse_frame == 0 {
184            return Err(ClientError::InvalidArgument(
185                "ClientConfig.max_sse_frame must be > 0".into(),
186            ));
187        }
188        if self.max_ws_message == 0 {
189            return Err(ClientError::InvalidArgument(
190                "ClientConfig.max_ws_message must be > 0".into(),
191            ));
192        }
193        Ok(())
194    }
195}
196
/// Auth-agnostic JMAP base HTTP client.
///
/// Construct with [`JmapClient::new`] or [`JmapClient::new_plain`].
/// Extension-specific clients (`jmap-chat-client`, `jmap-mail-client`) depend
/// on this crate and add their method implementations via `impl JmapClient`.
///
/// # Thread-safety (bd:JMAP-6r7c.25)
///
/// `JmapClient` is `Send + Sync + Clone`. Share it by cloning across threads
/// or `tokio::spawn` tasks:
///
/// - the underlying `reqwest::Client` is reference-counted (Arc-backed);
/// - the [`AuthProvider`] trait requires `Send + Sync` on every
///   implementation.
///
/// A compile-time assertion in this crate's test suite pins the
/// `Send + Sync` contract. A future refactor that adds a non-`Sync` field
/// (e.g. `Rc<T>`, `RefCell<T>`, `Cell<T>`) would then fail in CI before any
/// downstream consumer is exposed.
#[derive(Clone)]
pub struct JmapClient {
    /// Server origin, produced by `parse_base_url` in the constructors.
    /// `fetch_session` joins `.well-known/jmap` onto it.
    pub(crate) base_url: url::Url,
    /// Credential source. `inject_auth` consults it on each request it builds.
    /// Shared by reference count, so clones of this client (and clients built
    /// with `new_with_shared_auth`) see the same provider.
    pub(crate) auth: Arc<dyn AuthProvider>,
    /// HTTP client built from the `TransportConfig` at construction time.
    pub(crate) http: reqwest::Client,
    /// Timeouts and body caps; checked with `ClientConfig::validate` by the
    /// constructors.
    pub(crate) config: ClientConfig,
}
219
220impl std::fmt::Debug for JmapClient {
221    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
222        f.debug_struct("JmapClient")
223            .field("base_url", &self.base_url)
224            .field("config", &self.config)
225            .finish_non_exhaustive()
226    }
227}
228
229impl JmapClient {
230    /// Create a new client.
231    ///
232    /// `transport` configures the underlying HTTP client (TLS trust roots,
233    /// client certificates, timeouts). `auth` injects per-request credentials
234    /// (Bearer token, Basic credentials, or none). The two are independent so
235    /// any transport can be paired with any credential scheme — for example,
236    /// `CustomCaTransport` with `BearerAuth`. `base_url` must be the server
237    /// origin (scheme, host, optional port) with no path, query, or fragment
238    /// — e.g. `"https://100.64.1.1:8008"`. Trailing slashes are normalized
239    /// away by the URL parser and are therefore accepted.
240    pub fn new(
241        transport: impl TransportConfig,
242        auth: impl AuthProvider + 'static,
243        base_url: &str,
244        config: ClientConfig,
245    ) -> Result<Self, ClientError> {
246        // Parse-don't-validate: parse_base_url returns a fully-validated
247        // url::Url so the constructor body can read as four facts (parse,
248        // validate config, build transport, construct Self) rather than
249        // a procedure (bd:JMAP-6lsm.26).
250        let parsed = parse_base_url(base_url)?;
251        config.validate()?;
252        // Unwrap the opaque HttpClient at construction time (bd:JMAP-6r7c.36).
253        // The wrapper exists to keep reqwest::Client out of the
254        // TransportConfig trait signature; once the client is owned by
255        // JmapClient, the inner reqwest::Client is the working type for
256        // internal request building. into_inner is pub(crate) so external
257        // code cannot reach inside the opaque wrapper.
258        let http = transport.build_client()?.into_inner();
259        Ok(Self {
260            base_url: parsed,
261            auth: Arc::new(auth),
262            http,
263            config,
264        })
265    }
266
267    /// Convenience constructor for servers with publicly-trusted TLS.
268    ///
269    /// Equivalent to `JmapClient::new(DefaultTransport, auth, base_url, config)`.
270    /// Use [`JmapClient::new`] when you need a custom transport (e.g.
271    /// `CustomCaTransport` for a private-CA server).
272    pub fn new_plain(
273        auth: impl AuthProvider + 'static,
274        base_url: &str,
275        config: ClientConfig,
276    ) -> Result<Self, ClientError> {
277        Self::new(DefaultTransport, auth, base_url, config)
278    }
279
280    /// Create a new client sharing an existing `Arc<dyn AuthProvider>`
281    /// (bd:JMAP-6r7c.27).
282    ///
283    /// `JmapClient::new` and `new_plain` take `auth` by value and wrap it
284    /// in a fresh `Arc` internally. That is the ergonomic case for a
285    /// caller constructing one client with one auth provider. It is the
286    /// wrong shape for a caller who:
287    ///
288    /// - Operates multiple `JmapClient` instances against different
289    ///   shards or accounts but uses the **same** credential holder
290    ///   (e.g. a shared OAuth token-refresh state machine, a shared
291    ///   service-account principal).
292    /// - Wants a credential refresh in one client to be visible to all
293    ///   sibling clients without rebuilding each one.
294    ///
295    /// This constructor takes a pre-built `Arc<dyn AuthProvider>` so
296    /// callers can clone the Arc and pass clones to multiple
297    /// `JmapClient::new_with_shared_auth` calls. The auth provider is
298    /// then shared by reference-count, and any interior-mutable state
299    /// (e.g. an `RwLock<TokenState>` inside a custom `OAuthAuth`
300    /// implementation that holds a refreshable bearer) is genuinely
301    /// shared across all sibling clients.
302    ///
303    /// Arguments mirror [`JmapClient::new`] otherwise.
304    ///
305    /// ```rust,ignore
306    /// use std::sync::Arc;
307    /// use jmap_base_client::{auth::{AuthProvider, BearerAuth, DefaultTransport}, client::{JmapClient, ClientConfig}};
308    ///
309    /// let auth: Arc<dyn AuthProvider> = Arc::new(BearerAuth::new("token")?);
310    /// let shard_a = JmapClient::new_with_shared_auth(
311    ///     DefaultTransport,
312    ///     auth.clone(),
313    ///     "https://a.example.com",
314    ///     ClientConfig::default(),
315    /// )?;
316    /// let shard_b = JmapClient::new_with_shared_auth(
317    ///     DefaultTransport,
318    ///     auth,
319    ///     "https://b.example.com",
320    ///     ClientConfig::default(),
321    /// )?;
322    /// ```
323    pub fn new_with_shared_auth(
324        transport: impl TransportConfig,
325        auth: Arc<dyn AuthProvider>,
326        base_url: &str,
327        config: ClientConfig,
328    ) -> Result<Self, ClientError> {
329        let parsed = parse_base_url(base_url)?;
330        config.validate()?;
331        // Unwrap the opaque HttpClient at construction time (bd:JMAP-6r7c.36).
332        let http = transport.build_client()?.into_inner();
333        Ok(Self {
334            base_url: parsed,
335            auth,
336            http,
337            config,
338        })
339    }
340
341    /// Apply the auth header (if any) to a request builder.
342    ///
343    /// Centralises the repeated `if let Some(...) = self.auth.auth_header()` pattern
344    /// that every HTTP method uses. Callers: `fetch_session`, `call`,
345    /// `subscribe_events`, `upload_blob`, `download_blob`.
346    pub(crate) fn inject_auth(&self, builder: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
347        if let Some(header) = self.auth.auth_header() {
348            builder.header(header.name(), header.expose_value())
349        } else {
350            builder
351        }
352    }
353
354    /// Returns `Err(ClientError::AuthFailed)` when the HTTP status indicates an
355    /// authentication or authorization failure.
356    ///
357    /// Specifically handles:
358    /// - 401 Unauthorized (RFC 7235 §3.1) — missing or invalid credentials
359    /// - 403 Forbidden (RFC 7235 §3.2) — credentials present but insufficient
360    ///
361    /// Called before reading the response body so callers can distinguish
362    /// permanent auth failures from transient errors without consuming the body.
363    pub(crate) fn check_auth_status(status: reqwest::StatusCode) -> Result<(), ClientError> {
364        if status == reqwest::StatusCode::UNAUTHORIZED || status == reqwest::StatusCode::FORBIDDEN {
365            Err(ClientError::AuthFailed(status.as_u16()))
366        } else {
367            Ok(())
368        }
369    }
370}
371
372/// Read a response body into memory, capped at `limit` bytes.
373///
374/// Performs the Content-Length pre-check as a fast-path rejection for honest
375/// oversized responses, then streams the body chunk-by-chunk and enforces the
376/// cap before each accumulation. This prevents buffering a response that
377/// exceeds the limit when Content-Length is absent (chunked Transfer-Encoding)
378/// or under-reported by a hostile server — without per-chunk streaming, a
379/// `.bytes().await` call would buffer the entire response before the
380/// post-buffer length check could fire (bd:JMAP-6r7c.1).
381///
382/// Threat model: matters when connecting to untrusted servers (federation
383/// peers, third-party JMAP services, DNS-hijacked endpoints, MITM scenarios).
384/// On low-memory devices the peak allocation from naive buffering can OOM-kill
385/// the client even if the post-buffer check would otherwise reject.
386///
387/// Returns `ClientError::ResponseTooLarge` when either the advertised
388/// Content-Length or the accumulated chunk total exceeds `limit`.
389pub(crate) async fn read_capped_body(
390    resp: reqwest::Response,
391    limit: u64,
392) -> Result<Vec<u8>, ClientError> {
393    if let Some(len) = resp.content_length() {
394        if len > limit {
395            return Err(ClientError::ResponseTooLarge { actual: len, limit });
396        }
397    }
398
399    let mut stream = resp.bytes_stream();
400    let mut body: Vec<u8> = Vec::new();
401    while let Some(chunk) = stream.next().await {
402        let chunk = chunk.map_err(ClientError::from_reqwest)?;
403        let new_len = body.len() as u64 + chunk.len() as u64;
404        if new_len > limit {
405            return Err(ClientError::ResponseTooLarge {
406                actual: new_len,
407                limit,
408            });
409        }
410        body.extend_from_slice(&chunk);
411    }
412    Ok(body)
413}
414
415impl JmapClient {
416    /// Fetch the JMAP Session object from `{base_url}/.well-known/jmap` (RFC 8620 §2).
417    ///
418    /// The response body is capped at 1 MiB. Returns `ClientError::ResponseTooLarge`
419    /// if the server sends more. Session URL fields (`apiUrl`, `uploadUrl`,
420    /// `downloadUrl`, `eventSourceUrl`) are validated to have http/https scheme;
421    /// a non-http scheme returns `ClientError::InvalidSession`.
422    ///
423    /// Returns `ClientError::AuthFailed` on HTTP 401 or 403.
424    ///
425    /// # Charset
426    ///
427    /// The response body MUST be UTF-8-encoded JSON (RFC 8259 §8.1). A server
428    /// that sends UTF-16 or UTF-32 JSON — even with a matching
429    /// `charset=utf-16` `Content-Type` parameter — will fail to parse as
430    /// `ClientError::Parse`; the error does not specifically call out the
431    /// charset mismatch. Every shipped JMAP server uses UTF-8, but a
432    /// non-conformant server can produce a confusing parse error
433    /// (bd:JMAP-6r7c.28).
434    pub async fn fetch_session(&self) -> Result<Session, ClientError> {
435        let limit = self.config.max_session_body;
436        let url = self.base_url.join(".well-known/jmap").map_err(|e| {
437            // base_url was pre-validated in JmapClient::new; this path is
438            // unreachable in practice but must be handled for completeness.
439            ClientError::InvalidSession(format!("cannot construct session URL: {e}"))
440        })?;
441
442        let req = self.inject_auth(self.http.get(url).timeout(self.config.request_timeout));
443
444        let resp = {
445            let raw_resp = req.send().await.map_err(ClientError::from_reqwest)?;
446            Self::check_auth_status(raw_resp.status())?;
447            raw_resp
448                .error_for_status()
449                .map_err(ClientError::from_reqwest)?
450        };
451
452        // Stream the body chunk-by-chunk with the cap enforced before each
453        // accumulation. The Content-Length pre-check inside read_capped_body
454        // is a fast-path rejection for honest oversized responses; the
455        // streaming cap is the actual DOS guard against a hostile server
456        // that under-reports Content-Length or omits it entirely under
457        // chunked Transfer-Encoding (bd:JMAP-6r7c.1).
458        let body = read_capped_body(resp, limit).await?;
459
460        let session: Session = serde_json::from_slice(&body).map_err(ClientError::from_parse)?;
461
462        validate_session_url_schemes(&session)?;
463
464        Ok(session)
465    }
466
467    /// POST a [`jmap_types::JmapRequest`] to `api_url` and return the parsed [`jmap_types::JmapResponse`]
468    /// (RFC 8620 §3.3).
469    ///
470    /// `api_url` is taken as an explicit parameter (not from `self`) because the
471    /// caller holds a [`Session`] and selects the correct URL from it.
472    ///
473    /// The response body is capped at 8 MiB. Returns `ClientError::ResponseTooLarge`
474    /// if the server sends more.
475    ///
476    /// Returns `ClientError::AuthFailed` on HTTP 401 or 403.
477    ///
478    /// # Charset
479    ///
480    /// The response body MUST be UTF-8-encoded JSON (RFC 8259 §8.1). A server
481    /// that sends UTF-16 or UTF-32 JSON — even with a matching
482    /// `charset=utf-16` `Content-Type` parameter — will fail to parse as
483    /// `ClientError::Parse`; the error does not specifically call out the
484    /// charset mismatch. Every shipped JMAP server uses UTF-8, but a
485    /// non-conformant server can produce a confusing parse error
486    /// (bd:JMAP-6r7c.28).
487    ///
488    /// # See also
489    ///
490    /// Prefer [`JmapClient::call_session`] when you have a [`Session`] —
491    /// it picks the correct URL field automatically and prevents the
492    /// "I passed `session.upload_url` instead of `session.api_url`"
493    /// confusion (bd:JMAP-6r7c.39).
494    pub async fn call(
495        &self,
496        api_url: &str,
497        req: &jmap_types::JmapRequest,
498    ) -> Result<jmap_types::JmapResponse, ClientError> {
499        require_http_url(api_url)?;
500        let limit = self.config.max_call_body;
501
502        let builder = self.inject_auth(
503            self.http
504                .post(api_url)
505                .json(req)
506                .timeout(self.config.request_timeout),
507        );
508
509        let resp = {
510            let raw_resp = builder.send().await.map_err(ClientError::from_reqwest)?;
511            Self::check_auth_status(raw_resp.status())?;
512            raw_resp
513                .error_for_status()
514                .map_err(ClientError::from_reqwest)?
515        };
516
517        // Stream the body chunk-by-chunk with the cap enforced before each
518        // accumulation (bd:JMAP-6r7c.1). See `read_capped_body` for the DOS
519        // rationale; without per-chunk streaming, a server that under-reports
520        // or omits Content-Length can force unbounded allocation here.
521        let body = read_capped_body(resp, limit).await?;
522
523        let jmap_resp: jmap_types::JmapResponse =
524            serde_json::from_slice(&body).map_err(ClientError::from_parse)?;
525
526        Ok(jmap_resp)
527    }
528
529    /// POST a [`jmap_types::JmapRequest`] to the `api_url` field of `session`
530    /// and return the parsed response (bd:JMAP-6r7c.39).
531    ///
532    /// Type-safe alternative to [`JmapClient::call`]: takes a [`Session`]
533    /// reference and reads `session.api_url` internally. The
534    /// "I passed `session.upload_url` instead of `session.api_url`"
535    /// confusion is impossible at the call site because the caller does
536    /// not select a URL — only `Session::api_url` is used.
537    ///
538    /// Same body cap, auth, and error semantics as [`JmapClient::call`].
539    pub async fn call_session(
540        &self,
541        session: &Session,
542        req: &jmap_types::JmapRequest,
543    ) -> Result<jmap_types::JmapResponse, ClientError> {
544        self.call(session.api_url.as_str(), req).await
545    }
546
547    /// Open an SSE connection to `event_source_url` and return an async stream
548    /// of parsed [`SseFrame`]s (RFC 8620 §7.3).
549    ///
550    /// # URI template expansion
551    ///
552    /// `Session.event_source_url` is a URI template (RFC 6570 Level-1) with
553    /// variables `types`, `closeafter`, and `ping`. You **must** expand it
554    /// before passing it to this function, or the server will receive the
555    /// literal text `{types}` in the URL and return an error. Use
556    /// [`expand_url_template`](crate::expand_url_template):
557    ///
558    /// ```rust,ignore
559    /// let url = jmap_base_client::expand_url_template(
560    ///     &session.event_source_url,
561    ///     &[("types", "*"), ("closeafter", "no"), ("ping", "0")],
562    /// )?;
563    /// let stream = client.subscribe_events(&url, None).await?;
564    /// ```
565    ///
566    /// If `last_event_id` is `Some`, sends a `Last-Event-ID` header so the
567    /// server can resume from where the previous stream left off.
568    ///
569    /// Buffer growth is capped at [`ClientConfig::max_sse_frame`] bytes per
570    /// frame (default: 1 MiB). If a single SSE frame exceeds this limit the
571    /// stream yields `ClientError::SseFrameTooLarge` and terminates.
572    ///
573    /// No timeout is applied to this call or to the resulting stream.  The
574    /// connect timeout (10 s, TCP only) is the only deadline enforced.  If the
575    /// server stalls before sending HTTP response headers, or later goes silent
576    /// on the open connection, this call or the stream will hang indefinitely.
577    /// Wrap the entire call and/or stream iteration in [`tokio::time::timeout`]
578    /// if you need to bound either phase.
579    ///
580    /// Returns `ClientError::AuthFailed` on HTTP 401 or 403 before the stream
581    /// starts.
582    ///
583    /// # Stream drop and cancellation (bd:JMAP-6r7c.24)
584    ///
585    /// The returned `BoxStream` may be dropped at any point — mid-frame,
586    /// while awaiting `StreamExt::next`, or from inside a `tokio::select!`
587    /// losing-branch cancellation. Dropping is always safe and always
588    /// synchronous:
589    ///
590    /// - **Partial frame bytes are discarded.** Any bytes accumulated in
591    ///   the internal `raw_buf` or `buf` that have not yet been parsed
592    ///   into an [`SseFrame`] are lost. There is no buffering or replay
593    ///   inside the client — the server is the source of truth for what
594    ///   was emitted vs what was acknowledged.
595    /// - **The underlying HTTP connection is released.** The
596    ///   `reqwest::Response::bytes_stream` held inside the stream is
597    ///   dropped along with the stream; reqwest returns the connection
598    ///   to its pool (or closes it) per its own pool policy.
599    /// - **Resumption is the caller's job.** If you want to resume from
600    ///   the last successfully-parsed frame, capture the most recent
601    ///   `SseFrame::id` (if the server sets one) and pass it as
602    ///   `last_event_id` on the next `subscribe_events` call. The
603    ///   server will replay events from that point per RFC 8895 §9.
604    ///
605    /// `tokio::select!` cancellation is the canonical use case: a caller
606    /// racing the SSE stream against a shutdown signal can drop the
607    /// stream by selecting the shutdown branch without leaking the HTTP
608    /// connection or memory.
609    pub async fn subscribe_events(
610        &self,
611        event_source_url: &str,
612        last_event_id: Option<&str>,
613    ) -> Result<futures::stream::BoxStream<'static, Result<SseFrame, ClientError>>, ClientError>
614    {
615        require_http_url(event_source_url)?;
616        let mut req = self
617            .http
618            .get(event_source_url)
619            .header("Accept", "text/event-stream");
620        if let Some(id) = last_event_id {
621            req = req.header("Last-Event-ID", id);
622        }
623        let req = self.inject_auth(req);
624
625        let resp = req.send().await.map_err(ClientError::from_reqwest)?;
626        Self::check_auth_status(resp.status())?;
627        let resp = resp.error_for_status().map_err(ClientError::from_reqwest)?;
628
629        // Verify Content-Type before streaming. A misconfigured server returning
630        // application/json would silently produce no events (no SSE delimiter found).
631        {
632            // to_ascii_lowercase(): media types are case-insensitive per RFC 7231 §3.1.1.1.
633            let ct = resp
634                .headers()
635                .get(reqwest::header::CONTENT_TYPE)
636                .and_then(|v| v.to_str().ok())
637                .unwrap_or("")
638                .to_ascii_lowercase();
639            // RFC 7231 §3.1.1.1 / RFC 9110 §8.3: the media-type "essence" (type
640            // + "/" + subtype) is bounded by ';', SP, HTAB, or end-of-string.
641            // A naive starts_with("text/event-stream") would accept
642            // "text/event-streamish" or "text/event-stream2" — exactly the bug
643            // JMAP-6lsm.2 flagged. Split off the parameter list / trailing
644            // whitespace and compare the essence exactly.
645            let essence = ct
646                .split(|c: char| c == ';' || c.is_whitespace())
647                .next()
648                .unwrap_or("");
649            if essence != "text/event-stream" {
650                return Err(ClientError::UnexpectedResponse(format!(
651                    "subscribe_events: expected Content-Type text/event-stream, got: {ct:?}"
652                )));
653            }
654        }
655
656        let byte_stream = resp.bytes_stream();
657        let sse_frame_limit = self.config.max_sse_frame;
658
659        Ok(futures::stream::unfold(
660            Some(SseStreamState {
661                stream: byte_stream,
662                raw_buf: Vec::new(),
663                buf: String::new(),
664                scan_from: 0, // invariant: valid UTF-8 char boundary of buf; 0 always satisfies this
665            }),
666            move |state| async move {
667                let SseStreamState {
668                    mut stream,
669                    mut raw_buf,
670                    mut buf,
671                    mut scan_from,
672                } = state?;
673                loop {
674                    // Search for any double-newline delimiter (LF/CRLF/CR variants).
675                    // scan_from is set to old_len.saturating_sub(3) after each append
676                    // so we only re-scan the overlap region.  3 bytes back is the
677                    // minimum that covers all delimiter prefixes that can straddle a
678                    // chunk boundary:
679                    //   - `\r\n\r\n` (4 bytes): longest prefix that fits in one chunk
680                    //     but is incomplete is `\r\n\r` (3 bytes) — exactly covered.
681                    //   - `\n\r\n` (3 bytes): longest incomplete prefix is `\n\r` (2
682                    //     bytes) — covered by the 3-byte overlap.
683                    //   - `\n\n` and `\r\r` (2 bytes each): longest incomplete prefix
684                    //     is 1 byte — covered by the 3-byte overlap.
685                    // Since \r and \n are single-byte UTF-8 codepoints, 3 bytes back
686                    // is always a valid char boundary — no adjustment needed.
687                    //
688                    // Do not simplify to "rescan the whole buffer" (bd:JMAP-6r7c.19).
689                    // The four-delimiter min_by_key + 3-byte overlap pattern is
690                    // load-bearing for four independent reasons:
691                    //
692                    //   1. Overlap-rescan is O(chunk_size) per chunk; whole-buffer
693                    //      rescan is O(n) per chunk, i.e. O(n²) over the lifetime
694                    //      of a long-lived SSE stream. JMAP push streams can run
695                    //      for hours with thousands of small events; the
696                    //      difference is measurable.
697                    //   2. All four delimiter forms are necessary. The WHATWG
698                    //      EventSource spec + RFC 8895 line-terminator rules let
699                    //      a single server emit mixed line endings within one
700                    //      frame. min_by_key over all four catches the mixed
701                    //      `\n\r\n` case that neither `\n\n` nor `\r\n\r\n`
702                    //      alone catches. Removing forms produces silent
703                    //      frame-misalignment on those servers.
704                    //   3. The walk-back to a UTF-8 char boundary on the
705                    //      `scan_from = old_len.saturating_sub(3)` assignment
706                    //      below is needed because the byte preceding scan_from
707                    //      might be in the middle of a multi-byte codepoint;
708                    //      `buf[scan_from..]` would panic on the next `.find()`
709                    //      without it.
710                    //   4. Tests in tests/client_tests.rs pin all three line-
711                    //      ending variants against hand-crafted server bodies:
712                    //      `test_subscribe_events_crlf_line_endings`,
713                    //      `test_subscribe_events_lf_crlf_frame_delimiter`,
714                    //      and `test_subscribe_events_cr_line_endings`. The
715                    //      mixed `\n\r\n` case (the second test) is the
716                    //      trickiest and is the canonical example of why all
717                    //      four delimiter searches are present.
718                    //
719                    // Resist the "just rescan the whole buffer" refactor.
720                    let frame_end = [
721                        buf[scan_from..]
722                            .find("\r\n\r\n")
723                            .map(|p| (scan_from + p, 4usize)),
724                        buf[scan_from..]
725                            .find("\n\n")
726                            .map(|p| (scan_from + p, 2usize)),
727                        buf[scan_from..]
728                            .find("\r\r")
729                            .map(|p| (scan_from + p, 2usize)),
730                        // Mixed: LF-terminated last field line followed by a
731                        // CRLF-terminated blank line.  Not detected by \n\n (the
732                        // LFs are separated by a CR) or \r\n\r\n (no leading \r\n).
733                        buf[scan_from..]
734                            .find("\n\r\n")
735                            .map(|p| (scan_from + p, 3usize)),
736                    ]
737                    .into_iter()
738                    .flatten()
739                    .min_by_key(|&(pos, _)| pos);
740
741                    if let Some((pos, delim_len)) = frame_end {
742                        let frame = {
743                            let slice = &buf[..pos];
744                            if slice.contains('\r') {
745                                slice.replace("\r\n", "\n").replace('\r', "\n")
746                            } else {
747                                slice.to_owned()
748                            }
749                        };
750                        buf.drain(..pos + delim_len);
751                        scan_from = 0; // 0 satisfies the UTF-8 char boundary invariant
752                        let sse_frame = parse_sse_block(&frame);
753                        return Some((
754                            Ok(sse_frame),
755                            Some(SseStreamState {
756                                stream,
757                                raw_buf,
758                                buf,
759                                scan_from,
760                            }),
761                        ));
762                    }
763
764                    match stream.next().await {
765                        None => return None,
766                        Some(Err(e)) => {
767                            return Some((Err(ClientError::from_reqwest(e)), None));
768                        }
769                        Some(Ok(bytes)) => {
770                            // Accumulate raw bytes first. A multi-byte UTF-8 codepoint
771                            // may be split across adjacent HTTP chunks; decode only the
772                            // valid prefix and leave the remainder in raw_buf until the
773                            // next chunk completes the sequence.
774                            raw_buf.extend_from_slice(&bytes);
775                            // Cap raw_buf to prevent OOM on persistent invalid UTF-8 input.
776                            // Use the same limit as the decoded buf cap.
777                            if raw_buf.len() > sse_frame_limit {
778                                return Some((
779                                    Err(ClientError::SseFrameTooLarge {
780                                        limit: sse_frame_limit,
781                                    }),
782                                    None,
783                                ));
784                            }
785                            let old_len = buf.len();
786                            decode_utf8_chunk(&mut raw_buf, &mut buf);
787                            scan_from = old_len.saturating_sub(3);
788                            // Walk backward to a valid UTF-8 char boundary so that
789                            // buf[scan_from..] never panics on multibyte characters.
790                            while scan_from > 0 && !buf.is_char_boundary(scan_from) {
791                                scan_from -= 1;
792                            }
793                            // Guard against unbounded buffer growth from a hostile server.
794                            // Yield the error and terminate (state = None).
795                            if buf.len() > sse_frame_limit {
796                                return Some((
797                                    Err(ClientError::SseFrameTooLarge {
798                                        limit: sse_frame_limit,
799                                    }),
800                                    None,
801                                ));
802                            }
803                        }
804                    }
805                }
806            },
807        )
808        .boxed())
809    }
810
811    /// Open a WebSocket connection to `ws_url` using this client's
812    /// configured `max_ws_message` byte cap.
813    ///
814    /// Convenience wrapper around [`crate::ws::connect_ws_with_limit`] that
815    /// passes [`ClientConfig::max_ws_message`] as the per-message /
816    /// per-frame byte cap. Mirrors the [`Self::subscribe_events`]
817    /// pattern of "the JmapClient method uses ClientConfig; the free
818    /// function takes an explicit value".
819    ///
820    /// `ws_url` must come from the session document's WebSocket capability
821    /// URL. `auth_header` is an optional `(name, value)` pair for the
822    /// upgrade request; the auth provider on this client is NOT used here
823    /// because some servers attach WebSocket auth via cookie or session
824    /// header rather than the same scheme as HTTP requests.
825    ///
826    /// # Security
827    ///
828    /// The `auth_header` value is a credential and must not be logged or
829    /// echoed back to other systems. Treat it with the same care as a
830    /// [`crate::auth::BearerAuth`] token. Transport errors raised by this
831    /// method are constructed without the original credential bytes, but
832    /// downstream code that inspects [`ClientError`] should still avoid
833    /// printing or storing the `auth_header` itself.
834    ///
835    /// Returns [`ClientError::InvalidArgument`] for non-`ws://`/`wss://` URLs.
836    /// See [`crate::ws::connect_ws_with_limit`] for full error semantics.
837    pub async fn connect_ws_session(
838        &self,
839        ws_url: &str,
840        auth_header: Option<crate::auth::AuthHeader<'_>>,
841    ) -> Result<crate::ws::WsSession, ClientError> {
842        crate::ws::connect_ws_with_limit(ws_url, auth_header, self.config.max_ws_message).await
843    }
844
845    /// Open an SSE connection via a [`Session`]-supplied
846    /// URL template (bd:JMAP-6r7c.64).
847    ///
848    /// Type-safe convenience wrapper over [`Self::subscribe_events`] —
849    /// expands `session.event_source_url` internally using the
850    /// caller-supplied [`SubscribeEventsSessionParams`] template
851    /// variables. The caller cannot accidentally pass `session.api_url`
852    /// or any other URL field because no URL is exposed at the call
853    /// site.
854    ///
855    /// Template variables that are `None` expand to an empty string,
856    /// per the RFC 8620 §7.3 default-omission semantics. Most JMAP
857    /// servers accept `types=` (subscribe to all types),
858    /// `closeafter=` (stay-open), and `ping=` (no server pings) as
859    /// defaults; if your server requires explicit values, supply them
860    /// via the params struct.
861    pub async fn subscribe_events_session(
862        &self,
863        session: &Session,
864        params: SubscribeEventsSessionParams<'_>,
865    ) -> Result<futures::stream::BoxStream<'static, Result<SseFrame, ClientError>>, ClientError>
866    {
867        let SubscribeEventsSessionParams {
868            types,
869            close_after,
870            ping,
871            last_event_id,
872        } = params;
873
874        // RFC 8620 §7.3 event_source_url variables: types, closeafter, ping.
875        // ping is u32; format to a String only when needed so the empty-default
876        // path stays allocation-free.
877        let ping_owned = ping.map(|p| p.to_string());
878        let vars = [
879            ("types", types.unwrap_or("")),
880            ("closeafter", close_after.unwrap_or("")),
881            ("ping", ping_owned.as_deref().unwrap_or("")),
882        ];
883        let expanded = crate::blob::expand_url_template(session.event_source_url.as_str(), &vars)?;
884        self.subscribe_events(&expanded, last_event_id).await
885    }
886}
887
/// Inputs for [`JmapClient::subscribe_events_session`] (bd:JMAP-6r7c.64).
///
/// The first three fields fill the RFC 8620 §7.3 event-source URL template
/// variables (`types`, `closeafter`, `ping`). The last field supplies the
/// optional `Last-Event-ID` request header (RFC 8895 §9). A template field
/// left as `None` is substituted with the empty string.
///
/// Build one with a struct literal, falling back to `Default` for the
/// fields you do not need:
///
/// ```rust,ignore
/// client.subscribe_events_session(&session, SubscribeEventsSessionParams {
///     types: Some("Email,Mailbox"),
///     close_after: Some("state"),
///     ping: Some(60),
///     last_event_id: Some("evt-1234"),
/// }).await?;
/// ```
#[derive(Clone, Copy, Debug, Default)]
pub struct SubscribeEventsSessionParams<'a> {
    /// Value for `{types}`: a comma-separated list of JMAP data-type names
    /// (for example `"Email,Mailbox"`), or `"*"` for every type the server
    /// supports. `None` becomes `""`.
    pub types: Option<&'a str>,
    /// Value for `{closeafter}`: `"state"` ends the stream after the first
    /// state-change push; `"no"` keeps it open. `None` becomes `""`.
    pub close_after: Option<&'a str>,
    /// Value for `{ping}`: the server keep-alive interval in seconds, with
    /// `0` disabling pings. `None` becomes `""`.
    pub ping: Option<u32>,
    /// `Last-Event-ID` header value used to resume a stream (RFC 8895 §9).
    pub last_event_id: Option<&'a str>,
}
922
923/// Find the method response matching `call_id` in `resp` and deserialize its
924/// arguments into `T`.
925///
926/// Returns [`ClientError::MethodNotFound`] if no invocation with the given
927/// call_id exists. Returns [`ClientError::MethodError`] if any invocation
928/// with the matching call_id is a JMAP `"error"` response (RFC 8620 §3.6.1).
929///
930/// # Multiple invocations sharing a call_id
931///
932/// Per RFC 8620 §3.2, a single method call may produce multiple invocations
933/// in the response — for example, `Foo/copy` with `onSuccessDestroyOriginal:
934/// true` produces both a `Foo/copy` and an implicit `Foo/set` invocation,
935/// both stamped with the same call_id (RFC 8620 §5.8 example, lines 3158–
936/// 3180). This function handles that case by:
937///
938/// 1. **Errors take precedence.** If any invocation matching `call_id`
939///    has method name `"error"`, this function returns that error. A
940///    success response cannot mask a sibling error response with the same
941///    call_id — silently returning the success while the server reported
942///    failure would be data loss for the caller.
943/// 2. Otherwise, the **first** non-error invocation matching `call_id`
944///    is deserialized into `T`. In the §5.8 implicit-method case both
945///    invocations are successes; the first is the primary response and
946///    is what the caller wants.
947///
948/// # Contract: `call_id` is the only matcher (bd:JMAP-6r7c.23)
949///
950/// The method-name field of the matching invocation is **not** checked
951/// against `T`. The function trusts the caller's choice of `call_id` to
952/// identify the invocation and trusts the server's choice of method name
953/// to be consistent with the request it was answering. Two consequences
954/// callers should be aware of:
955///
956/// - **Wrong `T` may still parse.** If a caller asks for
957///   `extract_response::<EmailGetResponse>(&resp, "r1")` but the matching
958///   invocation is actually a `Mailbox/get` response, `serde_json::from_value`
959///   will succeed on any structural overlap between the two shapes (both
960///   carry `accountId`, an `ids`/`list` field, etc.) and return a
961///   default-shaped `EmailGetResponse`. The function cannot detect this
962///   because it has no view of what method name the caller expected.
963/// - **Server ordering is trusted.** When multiple non-error invocations
964///   share a `call_id`, the function returns the first one in
965///   `resp.method_responses` order. RFC 8620 §5.8 implies the primary
966///   response is first, but the spec does not normatively require it; a
967///   non-conformant server that returns the implicit method first would
968///   silently get the wrong invocation deserialized as `T`.
969///
970/// Callers that need method-name verification or want to disambiguate
971/// among multiple non-error matches should iterate
972/// `resp.method_responses` directly. The field is public and the
973/// [`jmap_types::Invocation`] type is `(method, args, call_id)`.
974///
975/// # Panics
976///
977/// This function does not catch panics from `T`'s [`serde::Deserialize`]
978/// implementation. If a custom `T` type's `deserialize` impl panics — e.g.
979/// because of an `.unwrap()` on a sub-field — the panic propagates through
980/// `extract_response` to the caller's await point. The standard derived
981/// `Deserialize` impls in the workspace type crates (`jmap-types`,
982/// `jmap-mail-types`, etc.) do not panic; this caveat only affects
983/// hand-rolled `Deserialize` impls outside the workspace
984/// (bd:JMAP-6r7c.44).
985///
986/// This function is `pub` so extension crates (`jmap-chat-client`,
987/// `jmap-mail-client`) can use it to extract typed results from a
988/// [`jmap_types::JmapResponse`] without depending on internal details.
989pub fn extract_response<T: serde::de::DeserializeOwned>(
990    resp: &jmap_types::JmapResponse,
991    call_id: &str,
992) -> Result<T, ClientError> {
993    // Invocation is a type alias (String, Value, String) = (method, args, call_id).
994    // Two-pass scan: errors take precedence per §3.6.1, so look for an error
995    // invocation first; otherwise return the first non-error invocation.
996    // The underlying iterator is slice::Iter, so .clone() is a cheap pointer
997    // copy — no allocation.
998    let mut candidates = resp.method_responses.iter().filter(|inv| inv.2 == call_id);
999
1000    if let Some(err_inv) = candidates.clone().find(|inv| inv.0 == "error") {
1001        let args = &err_inv.1;
1002        let err_type = args
1003            .get("type")
1004            .and_then(|v| v.as_str())
1005            .unwrap_or("serverError") // safe: fallback literal, not user input
1006            .to_owned();
1007        let description = args
1008            .get("description")
1009            .and_then(|v| v.as_str())
1010            .map(str::to_owned);
1011        return Err(ClientError::MethodError {
1012            error_type: err_type,
1013            description,
1014        });
1015    }
1016
1017    // The early return above already handled the error case; every remaining
1018    // candidate is a success invocation, so we just take the first one.
1019    let inv = candidates
1020        .next()
1021        .ok_or_else(|| ClientError::MethodNotFound(call_id.to_owned()))?;
1022    <T as serde::Deserialize>::deserialize(&inv.1).map_err(ClientError::from_parse)
1023}
1024
/// Decode every decodable byte of `raw` as UTF-8, append the text to `buf`,
/// and drain the consumed bytes from `raw`.
///
/// Outcomes:
/// - **Fully valid input.** All of it is appended to `buf`, and `raw` ends
///   empty.
/// - **Definitively invalid sequences** (`Utf8Error::error_len() == Some(n)`).
///   - The `n` offending bytes are discarded (not replaced), and decoding
///     **continues** with the bytes after them.
///   - Every invalid sequence in `raw` is skipped within a single call, so
///     valid text after an invalid byte reaches `buf` immediately. This
///     includes the blank line that terminates an SSE frame; previously it
///     could wait in `raw` until the next network chunk arrived.
///   - Skipping all of them in one call also stops repeated invalid bytes
///     from piling up in `raw`.
/// - **Truncated trailing sequence** (`error_len() == None`). The bytes
///   before it are appended. The incomplete head stays in `raw` so the next
///   chunk can complete it.
///
/// On return, `raw` is either empty or holds only such an incomplete
/// trailing sequence.
///
/// The caller is still responsible for capping `raw.len()` before calling.
/// In `subscribe_events` the cap is the `ClientConfig::max_sse_frame` limit,
/// checked right after each network chunk is appended.
fn decode_utf8_chunk(raw: &mut Vec<u8>, buf: &mut String) {
    // Number of leading bytes of `raw` that have been decoded or discarded.
    let mut consumed = 0usize;
    loop {
        let rest = &raw[consumed..];
        match std::str::from_utf8(rest) {
            Ok(text) => {
                // Everything after the last invalid sequence is valid.
                buf.push_str(text);
                consumed += rest.len();
                break;
            }
            Err(e) => {
                let valid_up_to = e.valid_up_to();
                // valid_up_to is the documented prefix length that IS valid UTF-8
                // by construction (std::str::Utf8Error contract). The crate-root
                // #![forbid(unsafe_code)] rules out std::str::from_utf8_unchecked,
                // which would be the canonical zero-cost path for "I have a slice
                // I know is valid UTF-8 but the type system doesn't". With unsafe
                // forbidden, .expect() is the cheapest legal alternative; the
                // expect message tracks the soundness argument so a future
                // reviewer does not "fix" this with unwrap_or_default()
                // (which would silently drop valid UTF-8 prefix bytes on a
                // hypothetical impossible failure path) (bd:JMAP-6r7c.54).
                buf.push_str(
                    std::str::from_utf8(&rest[..valid_up_to])
                        .expect("valid_up_to is a valid UTF-8 boundary"),
                );
                match e.error_len() {
                    Some(n) => {
                        // Definitively invalid: skip the bad bytes and keep
                        // decoding whatever follows them. n >= 1, so every
                        // iteration makes progress and the loop terminates.
                        consumed = (consumed + valid_up_to + n).min(raw.len());
                    }
                    None => {
                        // Incomplete multi-byte sequence at the end: keep its
                        // head in raw until the next chunk supplies the
                        // missing continuation bytes.
                        consumed += valid_up_to;
                        break;
                    }
                }
            }
        }
    }
    // One drain at the end instead of one per invalid sequence.
    raw.drain(..consumed);
}
1080
/// Return the scheme of `url`, i.e. everything before the first `"://"`,
/// or `None` if `url` contains no `"://"`.
///
/// The slice borrows from `url` and keeps its original letter case. RFC
/// 3986 §3.1 makes schemes case-insensitive, so callers compare it with
/// [`str::eq_ignore_ascii_case`]. Only a prefix scan is performed, with no
/// full URL parse, so URL templates containing `{variable}` placeholders
/// work as well.
///
/// Borrowing instead of returning a lower-cased `String` keeps each scheme
/// check allocation-free (bd:JMAP-6lsm.10).
fn url_scheme(url: &str) -> Option<&str> {
    let separator_at = url.find("://")?;
    Some(&url[..separator_at])
}
1097
1098/// `true` if `url`'s scheme prefix is `http` or `https` (case-insensitive,
1099/// RFC 3986 §3.1).
1100fn is_http_or_https(url: &str) -> bool {
1101    url_scheme(url)
1102        .is_some_and(|s| s.eq_ignore_ascii_case("http") || s.eq_ignore_ascii_case("https"))
1103}
1104
1105/// Parse and validate a JMAP base URL.
1106///
1107/// The input must be:
1108/// - non-empty
1109/// - syntactically valid (parses with [`url::Url::parse`])
1110/// - http or https scheme (case-insensitive per RFC 3986 §3.1)
1111/// - origin-only: no explicit path component, no query string, no fragment.
1112///   "Origin" here means scheme + host + optional port — the form a JMAP
1113///   server is identified by. Trailing slashes are accepted (the URL
1114///   parser normalizes them away).
1115///
1116/// Examples that PASS: `https://jmap.example.com`,
1117/// `https://jmap.example.com/`, `https://10.0.0.1:8008`,
1118/// `http://localhost:8080`.
1119///
1120/// Examples that FAIL: `""`, `https://example.com/api`,
1121/// `https://example.com?query=1`, `https://example.com#fragment`,
1122/// `ftp://example.com`.
1123///
1124/// Extracted from [`JmapClient::new`] so the constructor reads as
1125/// 'parse-don't-validate' rather than a six-step inline procedure
1126/// (bd:JMAP-6lsm.26).
1127fn parse_base_url(base_url: &str) -> Result<url::Url, ClientError> {
1128    if base_url.is_empty() {
1129        return Err(ClientError::InvalidArgument(
1130            "base_url may not be empty".into(),
1131        ));
1132    }
1133    let parsed = url::Url::parse(base_url)
1134        .map_err(|e| ClientError::InvalidArgument(format!("base_url is not a valid URL: {e}")))?;
1135    // Reject RFC 3986 user-info (`user:password@host`) before any further
1136    // validation. The url crate's Display impl echoes user-info verbatim
1137    // (lossless-round-trip, NOT safe-display — RFC 3986 §7.5 warns
1138    // against the latter), so an unrejected user-info value would surface
1139    // through every subsequent error message, every reqwest::Error's
1140    // url() field, and any tracing instrumentation that captures the
1141    // base URL or a derived ClientError. JMAP authenticates via the
1142    // Authorization header (AuthProvider trait), not URL user-info; the
1143    // user-info component has no legitimate use here (bd:JMAP-6r7c.58).
1144    //
1145    // The error message DOES NOT echo `base_url` back — doing so would
1146    // route the password into the error chain we are trying to keep it
1147    // out of.
1148    if !parsed.username().is_empty() || parsed.password().is_some() {
1149        return Err(ClientError::InvalidArgument(
1150            "base_url must not contain user-info (`user:password@host`); pass credentials via \
1151             AuthProvider instead"
1152                .into(),
1153        ));
1154    }
1155    let scheme = parsed.scheme();
1156    if scheme != "http" && scheme != "https" {
1157        return Err(ClientError::InvalidArgument(format!(
1158            "base_url scheme must be http or https, got: {scheme:?}"
1159        )));
1160    }
1161    let path = parsed.path();
1162    // url::Url::path() returns "/" for root-only URLs (no path segments);
1163    // any value other than "/" means the URL contains an explicit path component.
1164    if path != "/" {
1165        return Err(ClientError::InvalidArgument(format!(
1166            "base_url must not have a path component, got: {path:?}"
1167        )));
1168    }
1169    if parsed.query().is_some() {
1170        return Err(ClientError::InvalidArgument(
1171            "base_url must not have a query string".into(),
1172        ));
1173    }
1174    if parsed.fragment().is_some() {
1175        return Err(ClientError::InvalidArgument(
1176            "base_url must not have a fragment".into(),
1177        ));
1178    }
1179    Ok(parsed)
1180}
1181
1182/// Validate that `url` uses an http or https scheme.
1183///
1184/// Called at the top of each public method that accepts a URL parameter
1185/// (`call`, `subscribe_events`, `upload_blob`, `download_blob`).  This is a
1186/// defense-in-depth check: the primary protection is
1187/// [`validate_session_url_schemes`] which rejects bad URLs in the Session
1188/// document at fetch time.  This check makes each individual call site
1189/// self-defending against accidentally passing a non-http URL (e.g. from a
1190/// test fixture or a misused API).
1191///
1192/// Returns [`ClientError::InvalidArgument`] if the scheme is not http/https.
1193pub(crate) fn require_http_url(url: &str) -> Result<(), ClientError> {
1194    if !is_http_or_https(url) {
1195        return Err(ClientError::InvalidArgument(format!(
1196            "URL must have http or https scheme, got: {url:?}"
1197        )));
1198    }
1199    Ok(())
1200}
1201
1202/// Validate the *schemes only* of the four session URL fields.
1203///
1204/// Three of the four (`upload_url`, `download_url`, `event_source_url`) are
1205/// RFC 6570 URI templates carrying `{accountId}`, `{blobId}`, `{types}`,
1206/// etc. They cannot be fully parsed as URLs without first being expanded
1207/// with the relevant variables, but the scheme prefix is always concrete
1208/// (templates put the scheme on the left of `://`). This function does the
1209/// minimal check that the scheme prefix is `http://` or `https://` — it
1210/// does NOT verify that the templates carry the required variables, that
1211/// the host is well-formed, or that the path is reachable.
1212///
1213/// Renamed from `validate_session_urls` for accuracy (bd:JMAP-6lsm.23):
1214/// the name implied stronger validation than the function actually
1215/// performs.
1216fn validate_session_url_schemes(session: &Session) -> Result<(), ClientError> {
1217    // Mixed types (JmapUrl, JmapUrlTemplate) deliberately — both expose
1218    // .as_str() identically. Iterating &str values keeps the validation
1219    // loop oblivious to the typed-URL distinction (bd:JMAP-6r7c.40),
1220    // which is the right scope for a scheme check.
1221    for url in [
1222        session.api_url.as_str(),
1223        session.upload_url.as_str(),
1224        session.download_url.as_str(),
1225        session.event_source_url.as_str(),
1226    ] {
1227        if !is_http_or_https(url) {
1228            return Err(ClientError::InvalidSession(format!(
1229                "session URL has non-http/https scheme: {url:?}"
1230            )));
1231        }
1232    }
1233    Ok(())
1234}
1235
1236// ---------------------------------------------------------------------------
1237// Tests
1238// ---------------------------------------------------------------------------
1239
1240#[cfg(test)]
1241mod tests {
1242    use super::decode_utf8_chunk;
1243
1244    /// Oracle: all-ASCII bytes pushed to buf; raw cleared.
1245    #[test]
1246    fn decode_utf8_chunk_all_ascii() {
1247        let mut raw = b"hello".to_vec();
1248        let mut buf = String::new();
1249        decode_utf8_chunk(&mut raw, &mut buf);
1250        assert_eq!(buf, "hello");
1251        assert!(raw.is_empty());
1252    }
1253
1254    /// Oracle: complete multi-byte codepoint (U+00E9 café = 0xC3 0xA9) pushed fully.
1255    #[test]
1256    fn decode_utf8_chunk_complete_multibyte() {
1257        let mut raw = "café".as_bytes().to_vec();
1258        let mut buf = String::new();
1259        decode_utf8_chunk(&mut raw, &mut buf);
1260        assert_eq!(buf, "café");
1261        assert!(raw.is_empty());
1262    }
1263
1264    /// Oracle: first byte of a 2-byte sequence (U+00E9 = 0xC3 0xA9) arrives alone.
1265    /// error_len == None (incomplete) — the byte must be RETAINED in raw, not dropped.
1266    /// Regression test for the bug where valid_up_to.max(1) discarded this byte.
1267    #[test]
1268    fn decode_utf8_chunk_incomplete_head_retained() {
1269        let mut raw = vec![0xC3u8]; // first byte of 2-byte sequence
1270        let mut buf = String::new();
1271        decode_utf8_chunk(&mut raw, &mut buf);
1272        assert_eq!(buf, "", "no complete codepoints to push");
1273        assert_eq!(raw, vec![0xC3u8], "incomplete head must stay in raw");
1274    }
1275
1276    /// Oracle: valid ASCII prefix then first byte of a 2-byte sequence.
1277    /// Valid prefix goes to buf; incomplete head stays in raw.
1278    #[test]
1279    fn decode_utf8_chunk_prefix_then_incomplete_head() {
1280        let mut raw = vec![b'a', b'b', 0xC3u8];
1281        let mut buf = String::new();
1282        decode_utf8_chunk(&mut raw, &mut buf);
1283        assert_eq!(buf, "ab");
1284        assert_eq!(raw, vec![0xC3u8], "incomplete head must stay in raw");
1285    }
1286
1287    /// Oracle: two-call simulation — incomplete sequence completed on second call.
1288    /// This is the exact HTTP chunk-split scenario the fix targets.
1289    #[test]
1290    fn decode_utf8_chunk_split_sequence_completed() {
1291        // Chunk 1: only the first byte of U+00E9 (é = 0xC3 0xA9)
1292        let mut raw = vec![0xC3u8];
1293        let mut buf = String::new();
1294        decode_utf8_chunk(&mut raw, &mut buf);
1295        assert_eq!(raw, vec![0xC3u8], "incomplete head retained after chunk 1");
1296
1297        // Chunk 2: completion byte
1298        raw.push(0xA9u8);
1299        decode_utf8_chunk(&mut raw, &mut buf);
1300        assert_eq!(buf, "é", "character fully decoded after chunk 2");
1301        assert!(raw.is_empty());
1302    }
1303
1304    /// Oracle: definitively invalid byte (0xFF is never valid in UTF-8) is drained.
1305    #[test]
1306    fn decode_utf8_chunk_invalid_byte_drained() {
1307        let mut raw = vec![0xFFu8];
1308        let mut buf = String::new();
1309        decode_utf8_chunk(&mut raw, &mut buf);
1310        assert_eq!(buf, "");
1311        assert!(raw.is_empty(), "definitively invalid byte must be drained");
1312    }
1313
1314    /// Oracle: valid prefix then definitively invalid byte — prefix pushed and both drained.
1315    #[test]
1316    fn decode_utf8_chunk_prefix_then_invalid_drained() {
1317        let mut raw = vec![b'a', b'b', 0xFFu8];
1318        let mut buf = String::new();
1319        decode_utf8_chunk(&mut raw, &mut buf);
1320        assert_eq!(buf, "ab");
1321        assert!(raw.is_empty(), "prefix and invalid byte must be drained");
1322    }
1323
1324    /// Compile-time assertion that [`JmapClient`] is `Send + Sync + Clone`
1325    /// (bd:JMAP-6r7c.25). A future refactor that adds a non-`Sync` field
1326    /// (e.g. `Rc<T>`, `RefCell<T>`, `Cell<T>`) would fail this test in CI
1327    /// before any downstream consumer that shares a `JmapClient` across
1328    /// tokio tasks is exposed. The body is fenceposts: the function bodies
1329    /// of `assert_send`, `assert_sync`, and `assert_clone` never run; what
1330    /// matters is that the trait-bound monomorphization succeeds.
1331    #[test]
1332    fn jmap_client_is_send_sync_clone() {
1333        fn assert_send<T: Send>() {}
1334        fn assert_sync<T: Sync>() {}
1335        fn assert_clone<T: Clone>() {}
1336        assert_send::<super::JmapClient>();
1337        assert_sync::<super::JmapClient>();
1338        assert_clone::<super::JmapClient>();
1339    }
1340}