Skip to main content

jmap_base_client/
client.rs

1//! Auth-agnostic base JMAP HTTP client (RFC 8620).
2//!
3//! Provides [`JmapClient`] for session fetch, API calls, blob transfer,
4//! SSE event streaming, and [`extract_response`] for parsing method results.
5
6use std::sync::Arc;
7
8use futures::StreamExt;
9
10use crate::auth::{AuthProvider, DefaultTransport, TransportConfig};
11use crate::error::ClientError;
12use crate::request::Session;
13use crate::sse::{parse_sse_block, SseFrame};
14
/// Internal state threaded through the `subscribe_events` unfold loop.
///
/// One value of this type is handed from each `unfold` step to the next.
/// `S` is the type of the underlying HTTP byte stream.
struct SseStreamState<S> {
    /// Underlying HTTP body stream; polled for the next raw chunk whenever
    /// `buf` does not yet contain a complete frame.
    stream: S,
    /// Accumulates raw bytes from the HTTP stream before UTF-8 decoding.
    /// Incomplete multi-byte sequences remain here until the next chunk
    /// completes them, preventing stream termination when a codepoint is
    /// split across adjacent chunks.
    raw_buf: Vec<u8>,
    /// Decoded text that has not yet been emitted as a frame. Delimiter
    /// scanning and frame extraction both operate on this buffer.
    buf: String,
    /// Byte offset from which the next delimiter scan begins.
    /// Must always be a valid UTF-8 char boundary of `buf`.
    scan_from: usize,
}
28
/// Per-client configuration for timeouts and body size limits.
///
/// Use [`ClientConfig::default()`] for production defaults (30s timeout, RFC-safe caps).
///
/// This type is `#[non_exhaustive]`, so new fields can be added in minor
/// versions without breaking callers. As a consequence, code outside this
/// crate cannot build it with a struct expression — not even with
/// `..ClientConfig::default()` functional-update syntax. External callers
/// should start from [`ClientConfig::default()`] and assign the public
/// fields they want to change:
///
/// ```rust,ignore
/// let mut config = ClientConfig::default();
/// config.request_timeout = std::time::Duration::from_secs(10);
/// ```
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ClientConfig {
    /// Timeout for HTTP request/response cycles (fetch_session, call, upload_blob, download_blob).
    /// Does NOT apply to SSE or WebSocket streams (which are indefinite by nature).
    /// Must be > 0. Use `Duration::from_secs(30)` for a 30-second timeout.
    /// Default: 30 seconds.
    pub request_timeout: std::time::Duration,
    /// Maximum response body, in bytes, for fetch_session. Must be > 0.
    /// Default: 1 MiB.
    pub max_session_body: u64,
    /// Maximum response body, in bytes, for call(). Must be > 0.
    /// Default: 8 MiB.
    pub max_call_body: u64,
    /// Maximum response body, in bytes, for download_blob(). Must be > 0.
    /// Default: 64 MiB.
    pub max_download_body: u64,
    /// Maximum size in bytes of the JSON response body returned by the
    /// server in reply to `upload_blob()`. Does NOT cap the size of the
    /// blob being uploaded — the JMAP server enforces that via its
    /// `maxSizeUpload` capability. This field caps only the small JSON
    /// envelope the server returns describing the stored blob.
    /// Must be > 0. Default: 1 MiB.
    pub max_upload_response_body: u64,
    /// Maximum byte length of a single SSE frame; applied independently to
    /// the raw incoming bytes (pre-UTF-8 decode) and the decoded text.
    /// Protects against memory exhaustion from a hostile or misbehaving
    /// server that sends a single very large frame. Must be > 0.
    /// Default: 1 MiB.
    ///
    /// **Memory residency note (bd:JMAP-6lsm.7):** because the raw byte
    /// buffer and the decoded text buffer are tracked separately, the
    /// in-flight footprint while parsing a single frame can momentarily
    /// reach ~2 × `max_sse_frame` (raw bytes accumulated up to the limit,
    /// plus decoded text not yet drained). If you tune this value for a
    /// tight memory budget, plan for that 2× peak. The independent
    /// tracking is correct for the streaming UTF-8 decoder (which needs
    /// the raw buffer to be at least one full frame to handle split
    /// multi-byte sequences across HTTP chunks); see `decode_utf8_chunk`
    /// for the rationale.
    pub max_sse_frame: usize,
    /// Maximum byte length of a single WebSocket message (and frame). Mirrors
    /// `max_sse_frame` for the WebSocket transport. Used by
    /// [`JmapClient::connect_ws_session`] and threaded through to
    /// `tokio_tungstenite::tungstenite::protocol::WebSocketConfig`'s
    /// `max_message_size` and `max_frame_size`. Must be > 0.
    /// Default: 1 MiB. (bd:JMAP-6lsm.5)
    pub max_ws_message: usize,
}
82
83impl Default for ClientConfig {
84    fn default() -> Self {
85        ClientConfig {
86            request_timeout: std::time::Duration::from_secs(30),
87            max_session_body: 1024 * 1024,
88            max_call_body: 8 * 1024 * 1024,
89            max_download_body: 64 * 1024 * 1024,
90            max_upload_response_body: 1024 * 1024,
91            max_sse_frame: 1024 * 1024,
92            max_ws_message: 1024 * 1024,
93        }
94    }
95}
96
97impl ClientConfig {
98    /// Validate that all config fields satisfy their constraints.
99    ///
100    /// Called automatically by [`JmapClient::new`].  Callers may also call
101    /// this directly to pre-validate a config before passing it to the
102    /// constructor.
103    ///
104    /// # Errors
105    ///
106    /// Returns [`ClientError::InvalidArgument`] when any field is zero or
107    /// out-of-range.
108    pub fn validate(&self) -> Result<(), ClientError> {
109        if self.max_session_body == 0 {
110            return Err(ClientError::InvalidArgument(
111                "ClientConfig.max_session_body must be > 0".into(),
112            ));
113        }
114        if self.max_call_body == 0 {
115            return Err(ClientError::InvalidArgument(
116                "ClientConfig.max_call_body must be > 0".into(),
117            ));
118        }
119        if self.max_download_body == 0 {
120            return Err(ClientError::InvalidArgument(
121                "ClientConfig.max_download_body must be > 0".into(),
122            ));
123        }
124        if self.max_upload_response_body == 0 {
125            return Err(ClientError::InvalidArgument(
126                "ClientConfig.max_upload_response_body must be > 0".into(),
127            ));
128        }
129        if self.request_timeout == std::time::Duration::ZERO {
130            return Err(ClientError::InvalidArgument(
131                "ClientConfig.request_timeout must be > 0; use Duration::from_secs(30) or similar"
132                    .into(),
133            ));
134        }
135        if self.max_sse_frame == 0 {
136            return Err(ClientError::InvalidArgument(
137                "ClientConfig.max_sse_frame must be > 0".into(),
138            ));
139        }
140        if self.max_ws_message == 0 {
141            return Err(ClientError::InvalidArgument(
142                "ClientConfig.max_ws_message must be > 0".into(),
143            ));
144        }
145        Ok(())
146    }
147}
148
/// Auth-agnostic JMAP base HTTP client.
///
/// Construct with [`JmapClient::new`] or [`JmapClient::new_plain`].
/// Extension-specific clients (`jmap-chat-client`, `jmap-mail-client`) depend
/// on this crate and add their method implementations via `impl JmapClient`.
#[derive(Clone)]
pub struct JmapClient {
    /// Parsed server base URL; `fetch_session` joins `.well-known/jmap` onto it.
    pub(crate) base_url: url::Url,
    /// Credential source consulted by `inject_auth` for each outgoing request.
    pub(crate) auth: Arc<dyn AuthProvider>,
    /// HTTP client produced by the transport's `build_client()` in `new`.
    pub(crate) http: reqwest::Client,
    /// Timeouts and size caps; validated in `new` before the client is built.
    pub(crate) config: ClientConfig,
}
161
162impl std::fmt::Debug for JmapClient {
163    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
164        f.debug_struct("JmapClient")
165            .field("base_url", &self.base_url)
166            .field("config", &self.config)
167            .finish_non_exhaustive()
168    }
169}
170
171impl JmapClient {
172    /// Create a new client.
173    ///
174    /// `transport` configures the underlying HTTP client (TLS trust roots,
175    /// client certificates, timeouts). `auth` injects per-request credentials
176    /// (Bearer token, Basic credentials, or none). The two are independent so
177    /// any transport can be paired with any credential scheme — for example,
178    /// `CustomCaTransport` with `BearerAuth`. `base_url` must be the server
179    /// origin (scheme, host, optional port) with no path, query, or fragment
180    /// — e.g. `"https://100.64.1.1:8008"`. Trailing slashes are normalized
181    /// away by the URL parser and are therefore accepted.
182    pub fn new(
183        transport: impl TransportConfig,
184        auth: impl AuthProvider + 'static,
185        base_url: &str,
186        config: ClientConfig,
187    ) -> Result<Self, ClientError> {
188        // Parse-don't-validate: parse_base_url returns a fully-validated
189        // url::Url so the constructor body can read as four facts (parse,
190        // validate config, build transport, construct Self) rather than
191        // a procedure (bd:JMAP-6lsm.26).
192        let parsed = parse_base_url(base_url)?;
193        config.validate()?;
194        let http = transport.build_client()?;
195        Ok(Self {
196            base_url: parsed,
197            auth: Arc::new(auth),
198            http,
199            config,
200        })
201    }
202
203    /// Convenience constructor for servers with publicly-trusted TLS.
204    ///
205    /// Equivalent to `JmapClient::new(DefaultTransport, auth, base_url, config)`.
206    /// Use [`JmapClient::new`] when you need a custom transport (e.g.
207    /// `CustomCaTransport` for a private-CA server).
208    pub fn new_plain(
209        auth: impl AuthProvider + 'static,
210        base_url: &str,
211        config: ClientConfig,
212    ) -> Result<Self, ClientError> {
213        Self::new(DefaultTransport, auth, base_url, config)
214    }
215
216    /// Apply the auth header (if any) to a request builder.
217    ///
218    /// Centralises the repeated `if let Some(...) = self.auth.auth_header()` pattern
219    /// that every HTTP method uses. Callers: `fetch_session`, `call`,
220    /// `subscribe_events`, `upload_blob`, `download_blob`.
221    pub(crate) fn inject_auth(&self, builder: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
222        if let Some((name, value)) = self.auth.auth_header() {
223            builder.header(name, value)
224        } else {
225            builder
226        }
227    }
228
229    /// Returns `Err(ClientError::AuthFailed)` when the HTTP status indicates an
230    /// authentication or authorization failure.
231    ///
232    /// Specifically handles:
233    /// - 401 Unauthorized (RFC 7235 §3.1) — missing or invalid credentials
234    /// - 403 Forbidden (RFC 7235 §3.2) — credentials present but insufficient
235    ///
236    /// Called before reading the response body so callers can distinguish
237    /// permanent auth failures from transient errors without consuming the body.
238    pub(crate) fn check_auth_status(status: reqwest::StatusCode) -> Result<(), ClientError> {
239        if status == reqwest::StatusCode::UNAUTHORIZED || status == reqwest::StatusCode::FORBIDDEN {
240            Err(ClientError::AuthFailed(status.as_u16()))
241        } else {
242            Ok(())
243        }
244    }
245
246    /// Fetch the JMAP Session object from `{base_url}/.well-known/jmap` (RFC 8620 §2).
247    ///
248    /// The response body is capped at 1 MiB. Returns `ClientError::ResponseTooLarge`
249    /// if the server sends more. Session URL fields (`apiUrl`, `uploadUrl`,
250    /// `downloadUrl`, `eventSourceUrl`) are validated to have http/https scheme;
251    /// a non-http scheme returns `ClientError::InvalidSession`.
252    ///
253    /// Returns `ClientError::AuthFailed` on HTTP 401 or 403.
254    pub async fn fetch_session(&self) -> Result<Session, ClientError> {
255        let limit = self.config.max_session_body;
256        let url = self.base_url.join(".well-known/jmap").map_err(|e| {
257            // base_url was pre-validated in JmapClient::new; this path is
258            // unreachable in practice but must be handled for completeness.
259            ClientError::InvalidSession(format!("cannot construct session URL: {e}"))
260        })?;
261
262        let req = self.inject_auth(self.http.get(url).timeout(self.config.request_timeout));
263
264        let resp = {
265            let raw_resp = req.send().await.map_err(ClientError::from_reqwest)?;
266            Self::check_auth_status(raw_resp.status())?;
267            raw_resp
268                .error_for_status()
269                .map_err(ClientError::from_reqwest)?
270        };
271
272        // Enforce size cap before reading. Content-Length can lie, so we check
273        // both the header and the actual read size.
274        if let Some(len) = resp.content_length() {
275            if len > limit {
276                return Err(ClientError::ResponseTooLarge { actual: len, limit });
277            }
278        }
279        let bytes = resp.bytes().await.map_err(ClientError::from_reqwest)?;
280        if bytes.len() as u64 > limit {
281            return Err(ClientError::ResponseTooLarge {
282                actual: bytes.len() as u64,
283                limit,
284            });
285        }
286
287        let session: Session = serde_json::from_slice(&bytes).map_err(ClientError::Parse)?;
288
289        validate_session_url_schemes(&session)?;
290
291        Ok(session)
292    }
293
294    /// POST a [`jmap_types::JmapRequest`] to `api_url` and return the parsed [`jmap_types::JmapResponse`]
295    /// (RFC 8620 §3.3).
296    ///
297    /// `api_url` is taken as an explicit parameter (not from `self`) because the
298    /// caller holds a [`Session`] and selects the correct URL from it.
299    ///
300    /// The response body is capped at 8 MiB. Returns `ClientError::ResponseTooLarge`
301    /// if the server sends more.
302    ///
303    /// Returns `ClientError::AuthFailed` on HTTP 401 or 403.
304    pub async fn call(
305        &self,
306        api_url: &str,
307        req: &jmap_types::JmapRequest,
308    ) -> Result<jmap_types::JmapResponse, ClientError> {
309        require_http_url(api_url)?;
310        let limit = self.config.max_call_body;
311
312        let builder = self.inject_auth(
313            self.http
314                .post(api_url)
315                .json(req)
316                .timeout(self.config.request_timeout),
317        );
318
319        let resp = {
320            let raw_resp = builder.send().await.map_err(ClientError::from_reqwest)?;
321            Self::check_auth_status(raw_resp.status())?;
322            raw_resp
323                .error_for_status()
324                .map_err(ClientError::from_reqwest)?
325        };
326
327        // Enforce size cap before reading.
328        if let Some(len) = resp.content_length() {
329            if len > limit {
330                return Err(ClientError::ResponseTooLarge { actual: len, limit });
331            }
332        }
333        let bytes = resp.bytes().await.map_err(ClientError::from_reqwest)?;
334        if bytes.len() as u64 > limit {
335            return Err(ClientError::ResponseTooLarge {
336                actual: bytes.len() as u64,
337                limit,
338            });
339        }
340
341        let jmap_resp: jmap_types::JmapResponse =
342            serde_json::from_slice(&bytes).map_err(ClientError::Parse)?;
343
344        Ok(jmap_resp)
345    }
346
    /// Open an SSE connection to `event_source_url` and return an async stream
    /// of parsed [`SseFrame`]s (RFC 8620 §7.3).
    ///
    /// # URI template expansion
    ///
    /// `Session.event_source_url` is a URI template (RFC 6570 Level-1) with
    /// variables `types`, `closeafter`, and `ping`. You **must** expand it
    /// before passing it to this function, or the server will receive the
    /// literal text `{types}` in the URL and return an error. Use
    /// [`expand_url_template`](crate::expand_url_template):
    ///
    /// ```rust,ignore
    /// let url = jmap_base_client::expand_url_template(
    ///     &session.event_source_url,
    ///     &[("types", "*"), ("closeafter", "no"), ("ping", "0")],
    /// )?;
    /// let stream = client.subscribe_events(&url, None).await?;
    /// ```
    ///
    /// If `last_event_id` is `Some`, sends a `Last-Event-ID` header so the
    /// server can resume from where the previous stream left off.
    ///
    /// Buffer growth is capped at [`ClientConfig::max_sse_frame`] bytes per
    /// frame (default: 1 MiB). If a single SSE frame exceeds this limit the
    /// stream yields `ClientError::SseFrameTooLarge` and terminates.
    ///
    /// No timeout is applied to this call or to the resulting stream.  The
    /// connect timeout (10 s, TCP only) is the only deadline enforced.  If the
    /// server stalls before sending HTTP response headers, or later goes silent
    /// on the open connection, this call or the stream will hang indefinitely.
    /// Wrap the entire call and/or stream iteration in [`tokio::time::timeout`]
    /// if you need to bound either phase.
    ///
    /// Returns `ClientError::AuthFailed` on HTTP 401 or 403 before the stream
    /// starts. Returns `ClientError::UnexpectedResponse` when the response's
    /// `Content-Type` essence is not `text/event-stream`.
    ///
    /// After any error item the stream yields nothing further: every error
    /// path hands `None` back as the next unfold state.
    pub async fn subscribe_events(
        &self,
        event_source_url: &str,
        last_event_id: Option<&str>,
    ) -> Result<futures::stream::BoxStream<'static, Result<SseFrame, ClientError>>, ClientError>
    {
        require_http_url(event_source_url)?;
        // Note: unlike fetch_session/call, no `.timeout(...)` is set on this
        // request — the stream is meant to stay open indefinitely.
        let mut req = self
            .http
            .get(event_source_url)
            .header("Accept", "text/event-stream");
        if let Some(id) = last_event_id {
            req = req.header("Last-Event-ID", id);
        }
        let req = self.inject_auth(req);

        let resp = req.send().await.map_err(ClientError::from_reqwest)?;
        Self::check_auth_status(resp.status())?;
        let resp = resp.error_for_status().map_err(ClientError::from_reqwest)?;

        // Verify Content-Type before streaming. A misconfigured server returning
        // application/json would silently produce no events (no SSE delimiter found).
        {
            // to_ascii_lowercase(): media types are case-insensitive per RFC 7231 §3.1.1.1.
            // A missing or non-UTF-8 header is treated as the empty string and
            // therefore fails the comparison below.
            let ct = resp
                .headers()
                .get(reqwest::header::CONTENT_TYPE)
                .and_then(|v| v.to_str().ok())
                .unwrap_or("")
                .to_ascii_lowercase();
            // RFC 7231 §3.1.1.1 / RFC 9110 §8.3: the media-type "essence" (type
            // + "/" + subtype) is bounded by ';', SP, HTAB, or end-of-string.
            // A naive starts_with("text/event-stream") would accept
            // "text/event-streamish" or "text/event-stream2" — exactly the bug
            // JMAP-6lsm.2 flagged. Split off the parameter list / trailing
            // whitespace and compare the essence exactly.
            let essence = ct
                .split(|c: char| c == ';' || c.is_whitespace())
                .next()
                .unwrap_or("");
            if essence != "text/event-stream" {
                return Err(ClientError::UnexpectedResponse(format!(
                    "subscribe_events: expected Content-Type text/event-stream, got: {ct:?}"
                )));
            }
        }

        let byte_stream = resp.bytes_stream();
        let sse_frame_limit = self.config.max_sse_frame;

        // The unfold state is an Option: `Some(state)` while the stream is
        // live, `None` once an error has been yielded, which makes the next
        // poll end the stream via `state?` below.
        Ok(futures::stream::unfold(
            Some(SseStreamState {
                stream: byte_stream,
                raw_buf: Vec::new(),
                buf: String::new(),
                scan_from: 0, // invariant: valid UTF-8 char boundary of buf; 0 always satisfies this
            }),
            move |state| async move {
                let SseStreamState {
                    mut stream,
                    mut raw_buf,
                    mut buf,
                    mut scan_from,
                } = state?;
                loop {
                    // Search for any double-newline delimiter (LF/CRLF/CR variants).
                    // scan_from is set to old_len.saturating_sub(3) after each append
                    // so we only re-scan the overlap region.  3 bytes back is the
                    // minimum that covers all delimiter prefixes that can straddle a
                    // chunk boundary:
                    //   - `\r\n\r\n` (4 bytes): longest prefix that fits in one chunk
                    //     but is incomplete is `\r\n\r` (3 bytes) — exactly covered.
                    //   - `\n\r\n` (3 bytes): longest incomplete prefix is `\n\r` (2
                    //     bytes) — covered by the 3-byte overlap.
                    //   - `\n\n` and `\r\r` (2 bytes each): longest incomplete prefix
                    //     is 1 byte — covered by the 3-byte overlap.
                    // The subtraction alone can land inside a multi-byte character
                    // of previously decoded text, so the append path below walks
                    // scan_from back to the nearest char boundary before it is
                    // ever used to slice `buf` here.
                    //
                    // All four delimiter forms are searched and the earliest match
                    // wins; `min_by_key` keeps the first of equal positions.
                    let frame_end = [
                        buf[scan_from..]
                            .find("\r\n\r\n")
                            .map(|p| (scan_from + p, 4usize)),
                        buf[scan_from..]
                            .find("\n\n")
                            .map(|p| (scan_from + p, 2usize)),
                        buf[scan_from..]
                            .find("\r\r")
                            .map(|p| (scan_from + p, 2usize)),
                        // Mixed: LF-terminated last field line followed by a
                        // CRLF-terminated blank line.  Not detected by \n\n (the
                        // LFs are separated by a CR) or \r\n\r\n (no leading \r\n).
                        buf[scan_from..]
                            .find("\n\r\n")
                            .map(|p| (scan_from + p, 3usize)),
                    ]
                    .into_iter()
                    .flatten()
                    .min_by_key(|&(pos, _)| pos);

                    if let Some((pos, delim_len)) = frame_end {
                        // Normalise line endings to LF before parsing; the
                        // replace is skipped when the frame has no CR at all.
                        let frame = {
                            let slice = &buf[..pos];
                            if slice.contains('\r') {
                                slice.replace("\r\n", "\n").replace('\r', "\n")
                            } else {
                                slice.to_owned()
                            }
                        };
                        // Remove the frame and its delimiter; whatever follows
                        // stays in buf and is scanned from the start next time.
                        buf.drain(..pos + delim_len);
                        scan_from = 0; // 0 satisfies the UTF-8 char boundary invariant
                        let sse_frame = parse_sse_block(&frame);
                        return Some((
                            Ok(sse_frame),
                            Some(SseStreamState {
                                stream,
                                raw_buf,
                                buf,
                                scan_from,
                            }),
                        ));
                    }

                    match stream.next().await {
                        // Body ended. Anything still in buf/raw_buf that was not
                        // terminated by a blank line is dropped without a frame.
                        None => return None,
                        Some(Err(e)) => {
                            return Some((Err(ClientError::from_reqwest(e)), None));
                        }
                        Some(Ok(bytes)) => {
                            // Accumulate raw bytes first. A multi-byte UTF-8 codepoint
                            // may be split across adjacent HTTP chunks; decode only the
                            // valid prefix and leave the remainder in raw_buf until the
                            // next chunk completes the sequence.
                            raw_buf.extend_from_slice(&bytes);
                            // Cap raw_buf using the same limit as the decoded buf cap.
                            // NOTE(review): this compares the carried-over tail plus the
                            // whole incoming chunk against the per-frame limit, so a
                            // single chunk larger than max_sse_frame is rejected even
                            // if it holds several small complete frames — confirm this
                            // is intended for small limits.
                            if raw_buf.len() > sse_frame_limit {
                                return Some((
                                    Err(ClientError::SseFrameTooLarge {
                                        limit: sse_frame_limit,
                                    }),
                                    None,
                                ));
                            }
                            let old_len = buf.len();
                            decode_utf8_chunk(&mut raw_buf, &mut buf);
                            scan_from = old_len.saturating_sub(3);
                            // Walk backward to a valid UTF-8 char boundary so that
                            // buf[scan_from..] never panics on multibyte characters.
                            while scan_from > 0 && !buf.is_char_boundary(scan_from) {
                                scan_from -= 1;
                            }
                            // Guard against unbounded buffer growth from a hostile server.
                            // Yield the error and terminate (state = None).
                            // NOTE(review): checked before the delimiter scan runs on
                            // the newly appended text, so it bounds total buffered
                            // text rather than the length of one frame.
                            if buf.len() > sse_frame_limit {
                                return Some((
                                    Err(ClientError::SseFrameTooLarge {
                                        limit: sse_frame_limit,
                                    }),
                                    None,
                                ));
                            }
                        }
                    }
                }
            },
        )
        .boxed())
    }
550
551    /// Open a WebSocket connection to `ws_url` using this client's
552    /// configured `max_ws_message` byte cap.
553    ///
554    /// Convenience wrapper around [`crate::ws::connect_ws_with_limit`] that
555    /// passes [`ClientConfig::max_ws_message`] as the per-message /
556    /// per-frame byte cap. Mirrors the [`Self::subscribe_events`]
557    /// pattern of "the JmapClient method uses ClientConfig; the free
558    /// function takes an explicit value".
559    ///
560    /// `ws_url` must come from the session document's WebSocket capability
561    /// URL. `auth_header` is an optional `(name, value)` pair for the
562    /// upgrade request; the auth provider on this client is NOT used here
563    /// because some servers attach WebSocket auth via cookie or session
564    /// header rather than the same scheme as HTTP requests.
565    ///
566    /// # Security
567    ///
568    /// The `auth_header` value is a credential and must not be logged or
569    /// echoed back to other systems. Treat it with the same care as a
570    /// [`crate::auth::BearerAuth`] token. Transport errors raised by this
571    /// method are constructed without the original credential bytes, but
572    /// downstream code that inspects [`ClientError`] should still avoid
573    /// printing or storing the `auth_header` itself.
574    ///
575    /// Returns [`ClientError::InvalidArgument`] for non-`ws://`/`wss://` URLs.
576    /// See [`crate::ws::connect_ws_with_limit`] for full error semantics.
577    pub async fn connect_ws_session(
578        &self,
579        ws_url: &str,
580        auth_header: Option<(&str, &str)>,
581    ) -> Result<crate::ws::WsSession, ClientError> {
582        crate::ws::connect_ws_with_limit(ws_url, auth_header, self.config.max_ws_message).await
583    }
584}
585
586/// Find the method response matching `call_id` in `resp` and deserialize its
587/// arguments into `T`.
588///
589/// Returns [`ClientError::MethodNotFound`] if no invocation with the given
590/// call_id exists. Returns [`ClientError::MethodError`] if any invocation
591/// with the matching call_id is a JMAP `"error"` response (RFC 8620 §3.6.1).
592///
593/// # Multiple invocations sharing a call_id
594///
595/// Per RFC 8620 §3.2, a single method call may produce multiple invocations
596/// in the response — for example, `Foo/copy` with `onSuccessDestroyOriginal:
597/// true` produces both a `Foo/copy` and an implicit `Foo/set` invocation,
598/// both stamped with the same call_id (RFC 8620 §5.8 example, lines 3158–
599/// 3180). This function handles that case by:
600///
601/// 1. **Errors take precedence.** If any invocation matching `call_id`
602///    has method name `"error"`, this function returns that error. A
603///    success response cannot mask a sibling error response with the same
604///    call_id — silently returning the success while the server reported
605///    failure would be data loss for the caller.
606/// 2. Otherwise, the **first** non-error invocation matching `call_id`
607///    is deserialized into `T`. In the §5.8 implicit-method case both
608///    invocations are successes; the first is the primary response and
609///    is what the caller wants.
610///
611/// This function is `pub` so extension crates (`jmap-chat-client`,
612/// `jmap-mail-client`) can use it to extract typed results from a
613/// [`jmap_types::JmapResponse`] without depending on internal details.
614pub fn extract_response<T: serde::de::DeserializeOwned>(
615    resp: &jmap_types::JmapResponse,
616    call_id: &str,
617) -> Result<T, ClientError> {
618    // Invocation is a type alias (String, Value, String) = (method, args, call_id).
619    // Two-pass scan: first look for any error invocation with this call_id
620    // (errors take precedence per §3.6.1 — see the doc-comment); then fall
621    // through to the first non-error invocation.
622    let mut first_success: Option<&jmap_types::Invocation> = None;
623    for inv in resp.method_responses.iter().filter(|inv| inv.2 == call_id) {
624        if inv.0 == "error" {
625            let args = &inv.1;
626            let err_type = args
627                .get("type")
628                .and_then(|v| v.as_str())
629                .unwrap_or("serverError") // safe: fallback literal, not user input
630                .to_owned();
631            let description = args
632                .get("description")
633                .and_then(|v| v.as_str())
634                .map(str::to_owned);
635            return Err(ClientError::MethodError {
636                error_type: err_type,
637                description,
638            });
639        }
640        if first_success.is_none() {
641            first_success = Some(inv);
642        }
643    }
644    let inv = first_success.ok_or_else(|| ClientError::MethodNotFound(call_id.to_owned()))?;
645    <T as serde::Deserialize>::deserialize(&inv.1).map_err(ClientError::Parse)
646}
647
/// Decode as much valid UTF-8 as possible from `raw` into `buf`, draining
/// every consumed (or definitively-invalid) byte from `raw`.
///
/// `raw` is processed left to right until it is exhausted or only an
/// incomplete trailing sequence remains:
/// - A run of valid UTF-8 is appended to `buf`.
/// - A **definitively invalid** sequence (`error_len == Some(n)`) is skipped:
///   its `n` bytes are dropped without being appended, and decoding
///   **continues** with the bytes that follow. Valid text after a bad byte —
///   possibly including a complete frame delimiter — therefore reaches `buf`
///   in the same call instead of waiting for another chunk to arrive.
/// - An **incomplete** multi-byte sequence at the end (`error_len == None`)
///   stops decoding; those head bytes are the only thing left in `raw`, so
///   the next chunk can complete them.
///
/// On return `raw` is either empty or holds only such an incomplete tail.
///
/// The caller is responsible for capping `raw.len()` before calling this
/// function; `subscribe_events` does so against
/// `ClientConfig::max_sse_frame`.
fn decode_utf8_chunk(raw: &mut Vec<u8>, buf: &mut String) {
    // Number of leading bytes of `raw` that are finished with (decoded or
    // discarded). Drained once at the end to avoid repeated front-shifts.
    let mut consumed = 0;

    while consumed < raw.len() {
        match std::str::from_utf8(&raw[consumed..]) {
            Ok(text) => {
                buf.push_str(text);
                consumed = raw.len();
            }
            Err(e) => {
                let valid_end = consumed + e.valid_up_to();
                // valid_up_to is always a char boundary by definition.
                buf.push_str(
                    std::str::from_utf8(&raw[consumed..valid_end])
                        .expect("valid_up_to is a valid UTF-8 boundary"),
                );
                match e.error_len() {
                    Some(n) => {
                        // Definitively invalid: step over the bad bytes and
                        // keep decoding whatever follows them.
                        consumed = (valid_end + n).min(raw.len());
                    }
                    None => {
                        // Incomplete multi-byte sequence at the end: leave
                        // the head bytes in raw until the next chunk arrives
                        // with the missing continuation bytes.
                        consumed = valid_end;
                        break;
                    }
                }
            }
        }
    }

    raw.drain(..consumed);
}
694
/// Borrow the scheme portion of `url`: everything before the first `"://"`.
///
/// Returns `None` when `url` contains no `"://"` at all. The returned slice
/// keeps the input's *original* casing, so callers must compare it with
/// [`str::eq_ignore_ascii_case`] (schemes are case-insensitive per
/// RFC 3986 §3.1).
///
/// Because this is a plain prefix scan rather than a full URL parse, URL
/// templates containing `{variable}` placeholders are handled as well.
///
/// A borrowed slice is returned instead of a lowercased `String` so that a
/// scheme check does not allocate (bd:JMAP-6lsm.10).
fn url_scheme(url: &str) -> Option<&str> {
    let separator_at = url.find("://")?;
    Some(&url[..separator_at])
}
711
712/// `true` if `url`'s scheme prefix is `http` or `https` (case-insensitive,
713/// RFC 3986 §3.1).
714fn is_http_or_https(url: &str) -> bool {
715    url_scheme(url)
716        .is_some_and(|s| s.eq_ignore_ascii_case("http") || s.eq_ignore_ascii_case("https"))
717}
718
719/// Parse and validate a JMAP base URL.
720///
721/// The input must be:
722/// - non-empty
723/// - syntactically valid (parses with [`url::Url::parse`])
724/// - http or https scheme (case-insensitive per RFC 3986 §3.1)
725/// - origin-only: no explicit path component, no query string, no fragment.
726///   "Origin" here means scheme + host + optional port — the form a JMAP
727///   server is identified by. Trailing slashes are accepted (the URL
728///   parser normalizes them away).
729///
730/// Examples that PASS: `https://jmap.example.com`,
731/// `https://jmap.example.com/`, `https://10.0.0.1:8008`,
732/// `http://localhost:8080`.
733///
734/// Examples that FAIL: `""`, `https://example.com/api`,
735/// `https://example.com?query=1`, `https://example.com#fragment`,
736/// `ftp://example.com`.
737///
738/// Extracted from [`JmapClient::new`] so the constructor reads as
739/// 'parse-don't-validate' rather than a six-step inline procedure
740/// (bd:JMAP-6lsm.26).
741fn parse_base_url(base_url: &str) -> Result<url::Url, ClientError> {
742    if base_url.is_empty() {
743        return Err(ClientError::InvalidArgument(
744            "base_url may not be empty".into(),
745        ));
746    }
747    let parsed = url::Url::parse(base_url)
748        .map_err(|e| ClientError::InvalidArgument(format!("base_url is not a valid URL: {e}")))?;
749    let scheme = parsed.scheme();
750    if scheme != "http" && scheme != "https" {
751        return Err(ClientError::InvalidArgument(format!(
752            "base_url scheme must be http or https, got: {scheme:?}"
753        )));
754    }
755    let path = parsed.path();
756    // url::Url::path() returns "/" for root-only URLs (no path segments);
757    // any value other than "/" means the URL contains an explicit path component.
758    if path != "/" {
759        return Err(ClientError::InvalidArgument(format!(
760            "base_url must not have a path component, got: {path:?}"
761        )));
762    }
763    if parsed.query().is_some() {
764        return Err(ClientError::InvalidArgument(
765            "base_url must not have a query string".into(),
766        ));
767    }
768    if parsed.fragment().is_some() {
769        return Err(ClientError::InvalidArgument(
770            "base_url must not have a fragment".into(),
771        ));
772    }
773    Ok(parsed)
774}
775
776/// Validate that `url` uses an http or https scheme.
777///
778/// Called at the top of each public method that accepts a URL parameter
779/// (`call`, `subscribe_events`, `upload_blob`, `download_blob`).  This is a
780/// defense-in-depth check: the primary protection is
781/// [`validate_session_url_schemes`] which rejects bad URLs in the Session
782/// document at fetch time.  This check makes each individual call site
783/// self-defending against accidentally passing a non-http URL (e.g. from a
784/// test fixture or a misused API).
785///
786/// Returns [`ClientError::InvalidArgument`] if the scheme is not http/https.
787pub(crate) fn require_http_url(url: &str) -> Result<(), ClientError> {
788    if !is_http_or_https(url) {
789        return Err(ClientError::InvalidArgument(format!(
790            "URL must have http or https scheme, got: {url:?}"
791        )));
792    }
793    Ok(())
794}
795
796/// Validate the *schemes only* of the four session URL fields.
797///
798/// Three of the four (`upload_url`, `download_url`, `event_source_url`) are
799/// RFC 6570 URI templates carrying `{accountId}`, `{blobId}`, `{types}`,
800/// etc. They cannot be fully parsed as URLs without first being expanded
801/// with the relevant variables, but the scheme prefix is always concrete
802/// (templates put the scheme on the left of `://`). This function does the
803/// minimal check that the scheme prefix is `http://` or `https://` — it
804/// does NOT verify that the templates carry the required variables, that
805/// the host is well-formed, or that the path is reachable.
806///
807/// Renamed from `validate_session_urls` for accuracy (bd:JMAP-6lsm.23):
808/// the name implied stronger validation than the function actually
809/// performs.
810fn validate_session_url_schemes(session: &Session) -> Result<(), ClientError> {
811    for url in [
812        &session.api_url,
813        &session.upload_url,
814        &session.download_url,
815        &session.event_source_url,
816    ] {
817        if !is_http_or_https(url) {
818            return Err(ClientError::InvalidSession(format!(
819                "session URL has non-http/https scheme: {url:?}"
820            )));
821        }
822    }
823    Ok(())
824}
825
826// ---------------------------------------------------------------------------
827// Tests
828// ---------------------------------------------------------------------------
829
#[cfg(test)]
mod tests {
    use super::decode_utf8_chunk;

    /// Run a single `decode_utf8_chunk` call over `bytes` with an empty
    /// output buffer. Returns `(decoded text, bytes left pending in raw)`.
    fn decode_once(bytes: &[u8]) -> (String, Vec<u8>) {
        let mut pending = bytes.to_vec();
        let mut text = String::new();
        decode_utf8_chunk(&mut pending, &mut text);
        (text, pending)
    }

    /// Pure ASCII input is decoded in full and nothing is left pending.
    #[test]
    fn decode_utf8_chunk_all_ascii() {
        let (text, pending) = decode_once(b"hello");
        assert_eq!(text, "hello");
        assert!(pending.is_empty());
    }

    /// A complete multi-byte code point (é, U+00E9 = 0xC3 0xA9) inside the
    /// input is decoded in full.
    #[test]
    fn decode_utf8_chunk_complete_multibyte() {
        let (text, pending) = decode_once("café".as_bytes());
        assert_eq!(text, "café");
        assert!(pending.is_empty());
    }

    /// Only the lead byte of a 2-byte sequence (U+00E9 = 0xC3 0xA9) has
    /// arrived. That is the incomplete case (`error_len == None`): the byte
    /// must be kept in raw, not dropped.
    /// Regression test for the bug where `valid_up_to.max(1)` discarded it.
    #[test]
    fn decode_utf8_chunk_incomplete_head_retained() {
        let (text, pending) = decode_once(&[0xC3u8]);
        assert_eq!(text, "", "no complete codepoints to push");
        assert_eq!(pending, vec![0xC3u8], "incomplete head must stay in raw");
    }

    /// ASCII prefix followed by the lead byte of a 2-byte sequence: the
    /// prefix is decoded, the lead byte stays pending.
    #[test]
    fn decode_utf8_chunk_prefix_then_incomplete_head() {
        let (text, pending) = decode_once(&[b'a', b'b', 0xC3u8]);
        assert_eq!(text, "ab");
        assert_eq!(pending, vec![0xC3u8], "incomplete head must stay in raw");
    }

    /// Two consecutive calls simulate an HTTP chunk boundary that splits a
    /// code point: the second call completes the sequence started by the
    /// first.
    #[test]
    fn decode_utf8_chunk_split_sequence_completed() {
        let mut pending = Vec::new();
        let mut text = String::new();

        // Chunk 1 carries only the lead byte of U+00E9 (é = 0xC3 0xA9).
        pending.push(0xC3u8);
        decode_utf8_chunk(&mut pending, &mut text);
        assert_eq!(
            pending,
            vec![0xC3u8],
            "incomplete head retained after chunk 1"
        );

        // Chunk 2 delivers the continuation byte.
        pending.push(0xA9u8);
        decode_utf8_chunk(&mut pending, &mut text);
        assert_eq!(text, "é", "character fully decoded after chunk 2");
        assert!(pending.is_empty());
    }

    /// A byte that can never appear in UTF-8 (0xFF) is discarded.
    #[test]
    fn decode_utf8_chunk_invalid_byte_drained() {
        let (text, pending) = decode_once(&[0xFFu8]);
        assert_eq!(text, "");
        assert!(
            pending.is_empty(),
            "definitively invalid byte must be drained"
        );
    }

    /// ASCII prefix followed by a never-valid byte: the prefix is decoded
    /// and both the prefix and the bad byte leave raw.
    #[test]
    fn decode_utf8_chunk_prefix_then_invalid_drained() {
        let (text, pending) = decode_once(&[b'a', b'b', 0xFFu8]);
        assert_eq!(text, "ab");
        assert!(pending.is_empty(), "prefix and invalid byte must be drained");
    }
}