jmap_base_client/client.rs
1//! Auth-agnostic base JMAP HTTP client (RFC 8620).
2//!
3//! Provides [`JmapClient`] for session fetch, API calls, blob transfer,
4//! SSE event streaming, and [`extract_response`] for parsing method results.
5
6use std::sync::Arc;
7
8use futures::StreamExt;
9
10use crate::auth::{AuthProvider, DefaultTransport, TransportConfig};
11use crate::error::ClientError;
12use crate::request::Session;
13use crate::sse::{parse_sse_block, SseFrame};
14
/// Mutable state carried from one step of the `subscribe_events` `unfold`
/// loop to the next.
struct SseStreamState<S> {
    /// The HTTP body stream that `subscribe_events` pulls byte chunks from.
    stream: S,
    /// Bytes received from the HTTP stream that have not been decoded as
    /// UTF-8 yet. When a multi-byte codepoint is split across two adjacent
    /// chunks, its leading bytes wait here until the next chunk supplies the
    /// continuation bytes, so the split does not terminate the stream.
    raw_buf: Vec<u8>,
    /// Decoded text that has not yet been cut into SSE frames.
    buf: String,
    /// Byte offset in `buf` at which the next delimiter search starts.
    /// Invariant: always a valid UTF-8 char boundary of `buf`.
    scan_from: usize,
}
28
/// Per-client configuration for timeouts and body size limits.
///
/// Use [`ClientConfig::default()`] for production defaults (30 s timeout,
/// RFC-safe caps).
///
/// This type is `#[non_exhaustive]`. Code outside this crate therefore
/// cannot build it with a struct expression, and that includes
/// `..ClientConfig::default()` functional-update syntax (rustc error E0639).
/// Instead, start from the default and assign the public fields you want to
/// change:
///
/// ```rust,ignore
/// let mut config = ClientConfig::default();
/// config.request_timeout = std::time::Duration::from_secs(10);
/// config.max_call_body = 16 * 1024 * 1024;
/// ```
///
/// Because of this restriction, new fields can be added in minor versions
/// without breaking callers. [`ClientConfig::validate`] checks the
/// constraints documented on each field.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ClientConfig {
    /// Timeout for HTTP request/response cycles (fetch_session, call,
    /// upload_blob, download_blob).
    /// Does NOT apply to SSE or WebSocket streams (which are indefinite by nature).
    /// Must be > 0. Use `Duration::from_secs(30)` for a 30-second timeout.
    /// Default: 30 seconds.
    pub request_timeout: std::time::Duration,
    /// Maximum response body for fetch_session. Default: 1 MiB.
    pub max_session_body: u64,
    /// Maximum response body for call(). Default: 8 MiB.
    pub max_call_body: u64,
    /// Maximum response body for download_blob(). Default: 64 MiB.
    pub max_download_body: u64,
    /// Maximum size in bytes of the JSON response body returned by the
    /// server in reply to `upload_blob()`. This does NOT cap the size of the
    /// blob being uploaded; the JMAP server enforces that through its
    /// `maxSizeUpload` capability. This field caps only the small JSON
    /// envelope the server returns to describe the stored blob.
    /// Default: 1 MiB.
    pub max_upload_response_body: u64,
    /// Maximum byte length of a single SSE frame. The limit is enforced
    /// separately on the buffered raw bytes (before UTF-8 decoding) and on
    /// the decoded text. It protects against memory exhaustion from a
    /// hostile or misbehaving server that sends one very large frame.
    /// Must be > 0. Default: 1 MiB.
    ///
    /// **Memory residency note (bd:JMAP-6lsm.7):** the raw byte buffer and
    /// the decoded text buffer are capped independently. While a single
    /// frame is being parsed, the in-flight footprint can therefore
    /// momentarily approach ~2 × `max_sse_frame`. If you tune this value for
    /// a tight memory budget, plan for that 2× peak.
    pub max_sse_frame: usize,
    /// Maximum byte length of a single WebSocket message (and frame). This
    /// mirrors `max_sse_frame` for the WebSocket transport.
    /// [`JmapClient::connect_ws_session`] passes it through to
    /// `tokio_tungstenite::tungstenite::protocol::WebSocketConfig`'s
    /// `max_message_size` and `max_frame_size`. Must be > 0.
    /// Default: 1 MiB. (bd:JMAP-6lsm.5)
    pub max_ws_message: usize,
}
82
83impl Default for ClientConfig {
84 fn default() -> Self {
85 ClientConfig {
86 request_timeout: std::time::Duration::from_secs(30),
87 max_session_body: 1024 * 1024,
88 max_call_body: 8 * 1024 * 1024,
89 max_download_body: 64 * 1024 * 1024,
90 max_upload_response_body: 1024 * 1024,
91 max_sse_frame: 1024 * 1024,
92 max_ws_message: 1024 * 1024,
93 }
94 }
95}
96
97impl ClientConfig {
98 /// Validate that all config fields satisfy their constraints.
99 ///
100 /// Called automatically by [`JmapClient::new`]. Callers may also call
101 /// this directly to pre-validate a config before passing it to the
102 /// constructor.
103 ///
104 /// # Errors
105 ///
106 /// Returns [`ClientError::InvalidArgument`] when any field is zero or
107 /// out-of-range.
108 pub fn validate(&self) -> Result<(), ClientError> {
109 if self.max_session_body == 0 {
110 return Err(ClientError::InvalidArgument(
111 "ClientConfig.max_session_body must be > 0".into(),
112 ));
113 }
114 if self.max_call_body == 0 {
115 return Err(ClientError::InvalidArgument(
116 "ClientConfig.max_call_body must be > 0".into(),
117 ));
118 }
119 if self.max_download_body == 0 {
120 return Err(ClientError::InvalidArgument(
121 "ClientConfig.max_download_body must be > 0".into(),
122 ));
123 }
124 if self.max_upload_response_body == 0 {
125 return Err(ClientError::InvalidArgument(
126 "ClientConfig.max_upload_response_body must be > 0".into(),
127 ));
128 }
129 if self.request_timeout == std::time::Duration::ZERO {
130 return Err(ClientError::InvalidArgument(
131 "ClientConfig.request_timeout must be > 0; use Duration::from_secs(30) or similar"
132 .into(),
133 ));
134 }
135 if self.max_sse_frame == 0 {
136 return Err(ClientError::InvalidArgument(
137 "ClientConfig.max_sse_frame must be > 0".into(),
138 ));
139 }
140 if self.max_ws_message == 0 {
141 return Err(ClientError::InvalidArgument(
142 "ClientConfig.max_ws_message must be > 0".into(),
143 ));
144 }
145 Ok(())
146 }
147}
148
149/// Auth-agnostic JMAP base HTTP client.
150///
151/// Construct with [`JmapClient::new`] or [`JmapClient::new_plain`].
152/// Extension-specific clients (`jmap-chat-client`, `jmap-mail-client`) depend
153/// on this crate and add their method implementations via `impl JmapClient`.
154#[derive(Clone)]
155pub struct JmapClient {
156 pub(crate) base_url: url::Url,
157 pub(crate) auth: Arc<dyn AuthProvider>,
158 pub(crate) http: reqwest::Client,
159 pub(crate) config: ClientConfig,
160}
161
162impl std::fmt::Debug for JmapClient {
163 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
164 f.debug_struct("JmapClient")
165 .field("base_url", &self.base_url)
166 .field("config", &self.config)
167 .finish_non_exhaustive()
168 }
169}
170
171impl JmapClient {
172 /// Create a new client.
173 ///
174 /// `transport` configures the underlying HTTP client (TLS trust roots,
175 /// client certificates, timeouts). `auth` injects per-request credentials
176 /// (Bearer token, Basic credentials, or none). The two are independent so
177 /// any transport can be paired with any credential scheme — for example,
178 /// `CustomCaTransport` with `BearerAuth`. `base_url` must be the server
179 /// origin (scheme, host, optional port) with no path, query, or fragment
180 /// — e.g. `"https://100.64.1.1:8008"`. Trailing slashes are normalized
181 /// away by the URL parser and are therefore accepted.
182 pub fn new(
183 transport: impl TransportConfig,
184 auth: impl AuthProvider + 'static,
185 base_url: &str,
186 config: ClientConfig,
187 ) -> Result<Self, ClientError> {
188 // Parse-don't-validate: parse_base_url returns a fully-validated
189 // url::Url so the constructor body can read as four facts (parse,
190 // validate config, build transport, construct Self) rather than
191 // a procedure (bd:JMAP-6lsm.26).
192 let parsed = parse_base_url(base_url)?;
193 config.validate()?;
194 let http = transport.build_client()?;
195 Ok(Self {
196 base_url: parsed,
197 auth: Arc::new(auth),
198 http,
199 config,
200 })
201 }
202
203 /// Convenience constructor for servers with publicly-trusted TLS.
204 ///
205 /// Equivalent to `JmapClient::new(DefaultTransport, auth, base_url, config)`.
206 /// Use [`JmapClient::new`] when you need a custom transport (e.g.
207 /// `CustomCaTransport` for a private-CA server).
208 pub fn new_plain(
209 auth: impl AuthProvider + 'static,
210 base_url: &str,
211 config: ClientConfig,
212 ) -> Result<Self, ClientError> {
213 Self::new(DefaultTransport, auth, base_url, config)
214 }
215
216 /// Apply the auth header (if any) to a request builder.
217 ///
218 /// Centralises the repeated `if let Some(...) = self.auth.auth_header()` pattern
219 /// that every HTTP method uses. Callers: `fetch_session`, `call`,
220 /// `subscribe_events`, `upload_blob`, `download_blob`.
221 pub(crate) fn inject_auth(&self, builder: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
222 if let Some((name, value)) = self.auth.auth_header() {
223 builder.header(name, value)
224 } else {
225 builder
226 }
227 }
228
229 /// Returns `Err(ClientError::AuthFailed)` when the HTTP status indicates an
230 /// authentication or authorization failure.
231 ///
232 /// Specifically handles:
233 /// - 401 Unauthorized (RFC 7235 §3.1) — missing or invalid credentials
234 /// - 403 Forbidden (RFC 7235 §3.2) — credentials present but insufficient
235 ///
236 /// Called before reading the response body so callers can distinguish
237 /// permanent auth failures from transient errors without consuming the body.
238 pub(crate) fn check_auth_status(status: reqwest::StatusCode) -> Result<(), ClientError> {
239 if status == reqwest::StatusCode::UNAUTHORIZED || status == reqwest::StatusCode::FORBIDDEN {
240 Err(ClientError::AuthFailed(status.as_u16()))
241 } else {
242 Ok(())
243 }
244 }
245
246 /// Fetch the JMAP Session object from `{base_url}/.well-known/jmap` (RFC 8620 §2).
247 ///
248 /// The response body is capped at 1 MiB. Returns `ClientError::ResponseTooLarge`
249 /// if the server sends more. Session URL fields (`apiUrl`, `uploadUrl`,
250 /// `downloadUrl`, `eventSourceUrl`) are validated to have http/https scheme;
251 /// a non-http scheme returns `ClientError::InvalidSession`.
252 ///
253 /// Returns `ClientError::AuthFailed` on HTTP 401 or 403.
254 pub async fn fetch_session(&self) -> Result<Session, ClientError> {
255 let limit = self.config.max_session_body;
256 let url = self.base_url.join(".well-known/jmap").map_err(|e| {
257 // base_url was pre-validated in JmapClient::new; this path is
258 // unreachable in practice but must be handled for completeness.
259 ClientError::InvalidSession(format!("cannot construct session URL: {e}"))
260 })?;
261
262 let req = self.inject_auth(self.http.get(url).timeout(self.config.request_timeout));
263
264 let resp = {
265 let raw_resp = req.send().await.map_err(ClientError::from_reqwest)?;
266 Self::check_auth_status(raw_resp.status())?;
267 raw_resp
268 .error_for_status()
269 .map_err(ClientError::from_reqwest)?
270 };
271
272 // Enforce size cap before reading. Content-Length can lie, so we check
273 // both the header and the actual read size.
274 if let Some(len) = resp.content_length() {
275 if len > limit {
276 return Err(ClientError::ResponseTooLarge { actual: len, limit });
277 }
278 }
279 let bytes = resp.bytes().await.map_err(ClientError::from_reqwest)?;
280 if bytes.len() as u64 > limit {
281 return Err(ClientError::ResponseTooLarge {
282 actual: bytes.len() as u64,
283 limit,
284 });
285 }
286
287 let session: Session = serde_json::from_slice(&bytes).map_err(ClientError::Parse)?;
288
289 validate_session_url_schemes(&session)?;
290
291 Ok(session)
292 }
293
294 /// POST a [`jmap_types::JmapRequest`] to `api_url` and return the parsed [`jmap_types::JmapResponse`]
295 /// (RFC 8620 §3.3).
296 ///
297 /// `api_url` is taken as an explicit parameter (not from `self`) because the
298 /// caller holds a [`Session`] and selects the correct URL from it.
299 ///
300 /// The response body is capped at 8 MiB. Returns `ClientError::ResponseTooLarge`
301 /// if the server sends more.
302 ///
303 /// Returns `ClientError::AuthFailed` on HTTP 401 or 403.
304 pub async fn call(
305 &self,
306 api_url: &str,
307 req: &jmap_types::JmapRequest,
308 ) -> Result<jmap_types::JmapResponse, ClientError> {
309 require_http_url(api_url)?;
310 let limit = self.config.max_call_body;
311
312 let builder = self.inject_auth(
313 self.http
314 .post(api_url)
315 .json(req)
316 .timeout(self.config.request_timeout),
317 );
318
319 let resp = {
320 let raw_resp = builder.send().await.map_err(ClientError::from_reqwest)?;
321 Self::check_auth_status(raw_resp.status())?;
322 raw_resp
323 .error_for_status()
324 .map_err(ClientError::from_reqwest)?
325 };
326
327 // Enforce size cap before reading.
328 if let Some(len) = resp.content_length() {
329 if len > limit {
330 return Err(ClientError::ResponseTooLarge { actual: len, limit });
331 }
332 }
333 let bytes = resp.bytes().await.map_err(ClientError::from_reqwest)?;
334 if bytes.len() as u64 > limit {
335 return Err(ClientError::ResponseTooLarge {
336 actual: bytes.len() as u64,
337 limit,
338 });
339 }
340
341 let jmap_resp: jmap_types::JmapResponse =
342 serde_json::from_slice(&bytes).map_err(ClientError::Parse)?;
343
344 Ok(jmap_resp)
345 }
346
347 /// Open an SSE connection to `event_source_url` and return an async stream
348 /// of parsed [`SseFrame`]s (RFC 8620 §7.3).
349 ///
350 /// # URI template expansion
351 ///
352 /// `Session.event_source_url` is a URI template (RFC 6570 Level-1) with
353 /// variables `types`, `closeafter`, and `ping`. You **must** expand it
354 /// before passing it to this function, or the server will receive the
355 /// literal text `{types}` in the URL and return an error. Use
356 /// [`expand_url_template`](crate::expand_url_template):
357 ///
358 /// ```rust,ignore
359 /// let url = jmap_base_client::expand_url_template(
360 /// &session.event_source_url,
361 /// &[("types", "*"), ("closeafter", "no"), ("ping", "0")],
362 /// )?;
363 /// let stream = client.subscribe_events(&url, None).await?;
364 /// ```
365 ///
366 /// If `last_event_id` is `Some`, sends a `Last-Event-ID` header so the
367 /// server can resume from where the previous stream left off.
368 ///
369 /// Buffer growth is capped at [`ClientConfig::max_sse_frame`] bytes per
370 /// frame (default: 1 MiB). If a single SSE frame exceeds this limit the
371 /// stream yields `ClientError::SseFrameTooLarge` and terminates.
372 ///
373 /// No timeout is applied to this call or to the resulting stream. The
374 /// connect timeout (10 s, TCP only) is the only deadline enforced. If the
375 /// server stalls before sending HTTP response headers, or later goes silent
376 /// on the open connection, this call or the stream will hang indefinitely.
377 /// Wrap the entire call and/or stream iteration in [`tokio::time::timeout`]
378 /// if you need to bound either phase.
379 ///
380 /// Returns `ClientError::AuthFailed` on HTTP 401 or 403 before the stream
381 /// starts.
382 pub async fn subscribe_events(
383 &self,
384 event_source_url: &str,
385 last_event_id: Option<&str>,
386 ) -> Result<futures::stream::BoxStream<'static, Result<SseFrame, ClientError>>, ClientError>
387 {
388 require_http_url(event_source_url)?;
389 let mut req = self
390 .http
391 .get(event_source_url)
392 .header("Accept", "text/event-stream");
393 if let Some(id) = last_event_id {
394 req = req.header("Last-Event-ID", id);
395 }
396 let req = self.inject_auth(req);
397
398 let resp = req.send().await.map_err(ClientError::from_reqwest)?;
399 Self::check_auth_status(resp.status())?;
400 let resp = resp.error_for_status().map_err(ClientError::from_reqwest)?;
401
402 // Verify Content-Type before streaming. A misconfigured server returning
403 // application/json would silently produce no events (no SSE delimiter found).
404 {
405 // to_ascii_lowercase(): media types are case-insensitive per RFC 7231 §3.1.1.1.
406 let ct = resp
407 .headers()
408 .get(reqwest::header::CONTENT_TYPE)
409 .and_then(|v| v.to_str().ok())
410 .unwrap_or("")
411 .to_ascii_lowercase();
412 // RFC 7231 §3.1.1.1 / RFC 9110 §8.3: the media-type "essence" (type
413 // + "/" + subtype) is bounded by ';', SP, HTAB, or end-of-string.
414 // A naive starts_with("text/event-stream") would accept
415 // "text/event-streamish" or "text/event-stream2" — exactly the bug
416 // JMAP-6lsm.2 flagged. Split off the parameter list / trailing
417 // whitespace and compare the essence exactly.
418 let essence = ct
419 .split(|c: char| c == ';' || c.is_whitespace())
420 .next()
421 .unwrap_or("");
422 if essence != "text/event-stream" {
423 return Err(ClientError::UnexpectedResponse(format!(
424 "subscribe_events: expected Content-Type text/event-stream, got: {ct:?}"
425 )));
426 }
427 }
428
429 let byte_stream = resp.bytes_stream();
430 let sse_frame_limit = self.config.max_sse_frame;
431
432 Ok(futures::stream::unfold(
433 Some(SseStreamState {
434 stream: byte_stream,
435 raw_buf: Vec::new(),
436 buf: String::new(),
437 scan_from: 0, // invariant: valid UTF-8 char boundary of buf; 0 always satisfies this
438 }),
439 move |state| async move {
440 let SseStreamState {
441 mut stream,
442 mut raw_buf,
443 mut buf,
444 mut scan_from,
445 } = state?;
446 loop {
447 // Search for any double-newline delimiter (LF/CRLF/CR variants).
448 // scan_from is set to old_len.saturating_sub(3) after each append
449 // so we only re-scan the overlap region. 3 bytes back is the
450 // minimum that covers all delimiter prefixes that can straddle a
451 // chunk boundary:
452 // - `\r\n\r\n` (4 bytes): longest prefix that fits in one chunk
453 // but is incomplete is `\r\n\r` (3 bytes) — exactly covered.
454 // - `\n\r\n` (3 bytes): longest incomplete prefix is `\n\r` (2
455 // bytes) — covered by the 3-byte overlap.
456 // - `\n\n` and `\r\r` (2 bytes each): longest incomplete prefix
457 // is 1 byte — covered by the 3-byte overlap.
458 // Since \r and \n are single-byte UTF-8 codepoints, 3 bytes back
459 // is always a valid char boundary — no adjustment needed.
460 let frame_end = [
461 buf[scan_from..]
462 .find("\r\n\r\n")
463 .map(|p| (scan_from + p, 4usize)),
464 buf[scan_from..]
465 .find("\n\n")
466 .map(|p| (scan_from + p, 2usize)),
467 buf[scan_from..]
468 .find("\r\r")
469 .map(|p| (scan_from + p, 2usize)),
470 // Mixed: LF-terminated last field line followed by a
471 // CRLF-terminated blank line. Not detected by \n\n (the
472 // LFs are separated by a CR) or \r\n\r\n (no leading \r\n).
473 buf[scan_from..]
474 .find("\n\r\n")
475 .map(|p| (scan_from + p, 3usize)),
476 ]
477 .into_iter()
478 .flatten()
479 .min_by_key(|&(pos, _)| pos);
480
481 if let Some((pos, delim_len)) = frame_end {
482 let frame = {
483 let slice = &buf[..pos];
484 if slice.contains('\r') {
485 slice.replace("\r\n", "\n").replace('\r', "\n")
486 } else {
487 slice.to_owned()
488 }
489 };
490 buf.drain(..pos + delim_len);
491 scan_from = 0; // 0 satisfies the UTF-8 char boundary invariant
492 let sse_frame = parse_sse_block(&frame);
493 return Some((
494 Ok(sse_frame),
495 Some(SseStreamState {
496 stream,
497 raw_buf,
498 buf,
499 scan_from,
500 }),
501 ));
502 }
503
504 match stream.next().await {
505 None => return None,
506 Some(Err(e)) => {
507 return Some((Err(ClientError::from_reqwest(e)), None));
508 }
509 Some(Ok(bytes)) => {
510 // Accumulate raw bytes first. A multi-byte UTF-8 codepoint
511 // may be split across adjacent HTTP chunks; decode only the
512 // valid prefix and leave the remainder in raw_buf until the
513 // next chunk completes the sequence.
514 raw_buf.extend_from_slice(&bytes);
515 // Cap raw_buf to prevent OOM on persistent invalid UTF-8 input.
516 // Use the same limit as the decoded buf cap.
517 if raw_buf.len() > sse_frame_limit {
518 return Some((
519 Err(ClientError::SseFrameTooLarge {
520 limit: sse_frame_limit,
521 }),
522 None,
523 ));
524 }
525 let old_len = buf.len();
526 decode_utf8_chunk(&mut raw_buf, &mut buf);
527 scan_from = old_len.saturating_sub(3);
528 // Walk backward to a valid UTF-8 char boundary so that
529 // buf[scan_from..] never panics on multibyte characters.
530 while scan_from > 0 && !buf.is_char_boundary(scan_from) {
531 scan_from -= 1;
532 }
533 // Guard against unbounded buffer growth from a hostile server.
534 // Yield the error and terminate (state = None).
535 if buf.len() > sse_frame_limit {
536 return Some((
537 Err(ClientError::SseFrameTooLarge {
538 limit: sse_frame_limit,
539 }),
540 None,
541 ));
542 }
543 }
544 }
545 }
546 },
547 )
548 .boxed())
549 }
550
551 /// Open a WebSocket connection to `ws_url` using this client's
552 /// configured `max_ws_message` byte cap.
553 ///
554 /// Convenience wrapper around [`crate::ws::connect_ws_with_limit`] that
555 /// passes [`ClientConfig::max_ws_message`] as the per-message /
556 /// per-frame byte cap. Mirrors the [`Self::subscribe_events`]
557 /// pattern of "the JmapClient method uses ClientConfig; the free
558 /// function takes an explicit value".
559 ///
560 /// `ws_url` must come from the session document's WebSocket capability
561 /// URL. `auth_header` is an optional `(name, value)` pair for the
562 /// upgrade request; the auth provider on this client is NOT used here
563 /// because some servers attach WebSocket auth via cookie or session
564 /// header rather than the same scheme as HTTP requests.
565 ///
566 /// # Security
567 ///
568 /// The `auth_header` value is a credential and must not be logged or
569 /// echoed back to other systems. Treat it with the same care as a
570 /// [`crate::auth::BearerAuth`] token. Transport errors raised by this
571 /// method are constructed without the original credential bytes, but
572 /// downstream code that inspects [`ClientError`] should still avoid
573 /// printing or storing the `auth_header` itself.
574 ///
575 /// Returns [`ClientError::InvalidArgument`] for non-`ws://`/`wss://` URLs.
576 /// See [`crate::ws::connect_ws_with_limit`] for full error semantics.
577 pub async fn connect_ws_session(
578 &self,
579 ws_url: &str,
580 auth_header: Option<(&str, &str)>,
581 ) -> Result<crate::ws::WsSession, ClientError> {
582 crate::ws::connect_ws_with_limit(ws_url, auth_header, self.config.max_ws_message).await
583 }
584}
585
586/// Find the method response matching `call_id` in `resp` and deserialize its
587/// arguments into `T`.
588///
589/// Returns [`ClientError::MethodNotFound`] if no invocation with the given
590/// call_id exists. Returns [`ClientError::MethodError`] if any invocation
591/// with the matching call_id is a JMAP `"error"` response (RFC 8620 §3.6.1).
592///
593/// # Multiple invocations sharing a call_id
594///
595/// Per RFC 8620 §3.2, a single method call may produce multiple invocations
596/// in the response — for example, `Foo/copy` with `onSuccessDestroyOriginal:
597/// true` produces both a `Foo/copy` and an implicit `Foo/set` invocation,
598/// both stamped with the same call_id (RFC 8620 §5.8 example, lines 3158–
599/// 3180). This function handles that case by:
600///
601/// 1. **Errors take precedence.** If any invocation matching `call_id`
602/// has method name `"error"`, this function returns that error. A
603/// success response cannot mask a sibling error response with the same
604/// call_id — silently returning the success while the server reported
605/// failure would be data loss for the caller.
606/// 2. Otherwise, the **first** non-error invocation matching `call_id`
607/// is deserialized into `T`. In the §5.8 implicit-method case both
608/// invocations are successes; the first is the primary response and
609/// is what the caller wants.
610///
611/// This function is `pub` so extension crates (`jmap-chat-client`,
612/// `jmap-mail-client`) can use it to extract typed results from a
613/// [`jmap_types::JmapResponse`] without depending on internal details.
614pub fn extract_response<T: serde::de::DeserializeOwned>(
615 resp: &jmap_types::JmapResponse,
616 call_id: &str,
617) -> Result<T, ClientError> {
618 // Invocation is a type alias (String, Value, String) = (method, args, call_id).
619 // Two-pass scan: first look for any error invocation with this call_id
620 // (errors take precedence per §3.6.1 — see the doc-comment); then fall
621 // through to the first non-error invocation.
622 let mut first_success: Option<&jmap_types::Invocation> = None;
623 for inv in resp.method_responses.iter().filter(|inv| inv.2 == call_id) {
624 if inv.0 == "error" {
625 let args = &inv.1;
626 let err_type = args
627 .get("type")
628 .and_then(|v| v.as_str())
629 .unwrap_or("serverError") // safe: fallback literal, not user input
630 .to_owned();
631 let description = args
632 .get("description")
633 .and_then(|v| v.as_str())
634 .map(str::to_owned);
635 return Err(ClientError::MethodError {
636 error_type: err_type,
637 description,
638 });
639 }
640 if first_success.is_none() {
641 first_success = Some(inv);
642 }
643 }
644 let inv = first_success.ok_or_else(|| ClientError::MethodNotFound(call_id.to_owned()))?;
645 <T as serde::Deserialize>::deserialize(&inv.1).map_err(ClientError::Parse)
646}
647
/// Decode as much valid UTF-8 as possible from `raw` into `buf`, draining
/// every consumed (or definitively invalid) byte from `raw`.
///
/// `raw` is processed from the front until one of these holds:
/// - Everything left is valid UTF-8: it is pushed to `buf`, and `raw`
///   ends up empty.
/// - What is left ends with an **incomplete** multi-byte sequence
///   (`error_len() == None`): the valid prefix is pushed to `buf`, and only
///   the incomplete head bytes stay in `raw` for the next chunk to complete.
///
/// Each **definitively invalid** sequence (`error_len() == Some(n)`) is
/// dropped and decoding continues after it. Text that follows a bad byte in
/// the same chunk is therefore delivered immediately, instead of waiting in
/// `raw` for a later chunk. As a result, `raw` holds at most an incomplete
/// codepoint head (≤ 3 bytes) on return.
///
/// `subscribe_events` still caps `raw.len()` with `ClientConfig::max_sse_frame`
/// as a guard against unbounded growth from a hostile server.
fn decode_utf8_chunk(raw: &mut Vec<u8>, buf: &mut String) {
    let mut consumed = 0;
    while consumed < raw.len() {
        match std::str::from_utf8(&raw[consumed..]) {
            Ok(valid) => {
                buf.push_str(valid);
                consumed = raw.len();
            }
            Err(e) => {
                let valid_end = consumed + e.valid_up_to();
                // Bytes before valid_up_to() are valid UTF-8 by definition.
                buf.push_str(
                    std::str::from_utf8(&raw[consumed..valid_end])
                        .expect("valid_up_to is a valid UTF-8 boundary"),
                );
                match e.error_len() {
                    // Definitively invalid: skip it and keep decoding.
                    Some(invalid_len) => consumed = valid_end + invalid_len,
                    // Incomplete tail: keep it for the next chunk.
                    None => {
                        consumed = valid_end;
                        break;
                    }
                }
            }
        }
    }
    raw.drain(..consumed);
}
694
/// Return the scheme of `url`: everything before the first `"://"`.
///
/// The slice is borrowed from `url` and keeps the input's original case.
/// RFC 3986 §3.1 says schemes are case-insensitive, so callers should
/// compare it with [`str::eq_ignore_ascii_case`]. Returns `None` when
/// `url` contains no `"://"`. The function only scans for a prefix and never
/// parses the full URL, so URI templates with `{variable}` placeholders work
/// too.
///
/// Borrowing instead of returning a lowercased `String` avoids an
/// allocation on every scheme check (bd:JMAP-6lsm.10).
fn url_scheme(url: &str) -> Option<&str> {
    let separator = url.find("://")?;
    Some(&url[..separator])
}
711
712/// `true` if `url`'s scheme prefix is `http` or `https` (case-insensitive,
713/// RFC 3986 §3.1).
714fn is_http_or_https(url: &str) -> bool {
715 url_scheme(url)
716 .is_some_and(|s| s.eq_ignore_ascii_case("http") || s.eq_ignore_ascii_case("https"))
717}
718
719/// Parse and validate a JMAP base URL.
720///
721/// The input must be:
722/// - non-empty
723/// - syntactically valid (parses with [`url::Url::parse`])
724/// - http or https scheme (case-insensitive per RFC 3986 §3.1)
725/// - origin-only: no explicit path component, no query string, no fragment.
726/// "Origin" here means scheme + host + optional port — the form a JMAP
727/// server is identified by. Trailing slashes are accepted (the URL
728/// parser normalizes them away).
729///
730/// Examples that PASS: `https://jmap.example.com`,
731/// `https://jmap.example.com/`, `https://10.0.0.1:8008`,
732/// `http://localhost:8080`.
733///
734/// Examples that FAIL: `""`, `https://example.com/api`,
735/// `https://example.com?query=1`, `https://example.com#fragment`,
736/// `ftp://example.com`.
737///
738/// Extracted from [`JmapClient::new`] so the constructor reads as
739/// 'parse-don't-validate' rather than a six-step inline procedure
740/// (bd:JMAP-6lsm.26).
741fn parse_base_url(base_url: &str) -> Result<url::Url, ClientError> {
742 if base_url.is_empty() {
743 return Err(ClientError::InvalidArgument(
744 "base_url may not be empty".into(),
745 ));
746 }
747 let parsed = url::Url::parse(base_url)
748 .map_err(|e| ClientError::InvalidArgument(format!("base_url is not a valid URL: {e}")))?;
749 let scheme = parsed.scheme();
750 if scheme != "http" && scheme != "https" {
751 return Err(ClientError::InvalidArgument(format!(
752 "base_url scheme must be http or https, got: {scheme:?}"
753 )));
754 }
755 let path = parsed.path();
756 // url::Url::path() returns "/" for root-only URLs (no path segments);
757 // any value other than "/" means the URL contains an explicit path component.
758 if path != "/" {
759 return Err(ClientError::InvalidArgument(format!(
760 "base_url must not have a path component, got: {path:?}"
761 )));
762 }
763 if parsed.query().is_some() {
764 return Err(ClientError::InvalidArgument(
765 "base_url must not have a query string".into(),
766 ));
767 }
768 if parsed.fragment().is_some() {
769 return Err(ClientError::InvalidArgument(
770 "base_url must not have a fragment".into(),
771 ));
772 }
773 Ok(parsed)
774}
775
776/// Validate that `url` uses an http or https scheme.
777///
778/// Called at the top of each public method that accepts a URL parameter
779/// (`call`, `subscribe_events`, `upload_blob`, `download_blob`). This is a
780/// defense-in-depth check: the primary protection is
781/// [`validate_session_url_schemes`] which rejects bad URLs in the Session
782/// document at fetch time. This check makes each individual call site
783/// self-defending against accidentally passing a non-http URL (e.g. from a
784/// test fixture or a misused API).
785///
786/// Returns [`ClientError::InvalidArgument`] if the scheme is not http/https.
787pub(crate) fn require_http_url(url: &str) -> Result<(), ClientError> {
788 if !is_http_or_https(url) {
789 return Err(ClientError::InvalidArgument(format!(
790 "URL must have http or https scheme, got: {url:?}"
791 )));
792 }
793 Ok(())
794}
795
796/// Validate the *schemes only* of the four session URL fields.
797///
798/// Three of the four (`upload_url`, `download_url`, `event_source_url`) are
799/// RFC 6570 URI templates carrying `{accountId}`, `{blobId}`, `{types}`,
800/// etc. They cannot be fully parsed as URLs without first being expanded
801/// with the relevant variables, but the scheme prefix is always concrete
802/// (templates put the scheme on the left of `://`). This function does the
803/// minimal check that the scheme prefix is `http://` or `https://` — it
804/// does NOT verify that the templates carry the required variables, that
805/// the host is well-formed, or that the path is reachable.
806///
807/// Renamed from `validate_session_urls` for accuracy (bd:JMAP-6lsm.23):
808/// the name implied stronger validation than the function actually
809/// performs.
810fn validate_session_url_schemes(session: &Session) -> Result<(), ClientError> {
811 for url in [
812 &session.api_url,
813 &session.upload_url,
814 &session.download_url,
815 &session.event_source_url,
816 ] {
817 if !is_http_or_https(url) {
818 return Err(ClientError::InvalidSession(format!(
819 "session URL has non-http/https scheme: {url:?}"
820 )));
821 }
822 }
823 Ok(())
824}
825
826// ---------------------------------------------------------------------------
827// Tests
828// ---------------------------------------------------------------------------
829
#[cfg(test)]
mod tests {
    use super::decode_utf8_chunk;

    /// Run one decode step over `input`, starting from an empty text buffer,
    /// and return `(decoded_text, undecoded_leftover)`.
    fn decode_once(input: &[u8]) -> (String, Vec<u8>) {
        let mut raw = input.to_vec();
        let mut text = String::new();
        decode_utf8_chunk(&mut raw, &mut text);
        (text, raw)
    }

    /// Oracle: all-ASCII bytes pushed to buf; raw cleared.
    #[test]
    fn decode_utf8_chunk_all_ascii() {
        let (text, leftover) = decode_once(b"hello");
        assert_eq!(text, "hello");
        assert!(leftover.is_empty());
    }

    /// Oracle: a complete multi-byte codepoint (U+00E9 in "café" = 0xC3 0xA9)
    /// is pushed in full.
    #[test]
    fn decode_utf8_chunk_complete_multibyte() {
        let (text, leftover) = decode_once("café".as_bytes());
        assert_eq!(text, "café");
        assert!(leftover.is_empty());
    }

    /// Oracle: the first byte of a 2-byte sequence (U+00E9 = 0xC3 0xA9)
    /// arrives alone. error_len == None (incomplete), so the byte must be
    /// RETAINED in raw, not dropped. Regression test for the bug where
    /// valid_up_to.max(1) discarded this byte.
    #[test]
    fn decode_utf8_chunk_incomplete_head_retained() {
        let (text, leftover) = decode_once(&[0xC3]);
        assert!(text.is_empty(), "no complete codepoints to push");
        assert_eq!(leftover, vec![0xC3u8], "incomplete head must stay in raw");
    }

    /// Oracle: a valid ASCII prefix, then the first byte of a 2-byte sequence.
    /// The prefix goes to buf; the incomplete head stays in raw.
    #[test]
    fn decode_utf8_chunk_prefix_then_incomplete_head() {
        let (text, leftover) = decode_once(&[b'a', b'b', 0xC3]);
        assert_eq!(text, "ab");
        assert_eq!(leftover, vec![0xC3u8], "incomplete head must stay in raw");
    }

    /// Oracle: two-call simulation in which an incomplete sequence is
    /// completed by the second call. This is the HTTP chunk-split scenario
    /// the fix targets.
    #[test]
    fn decode_utf8_chunk_split_sequence_completed() {
        // Chunk 1: only the first byte of U+00E9 (é = 0xC3 0xA9).
        let mut raw = vec![0xC3u8];
        let mut text = String::new();
        decode_utf8_chunk(&mut raw, &mut text);
        assert_eq!(raw, vec![0xC3u8], "incomplete head retained after chunk 1");

        // Chunk 2: the continuation byte.
        raw.push(0xA9);
        decode_utf8_chunk(&mut raw, &mut text);
        assert_eq!(text, "é", "character fully decoded after chunk 2");
        assert!(raw.is_empty());
    }

    /// Oracle: a definitively invalid byte (0xFF is never valid in UTF-8)
    /// is drained.
    #[test]
    fn decode_utf8_chunk_invalid_byte_drained() {
        let (text, leftover) = decode_once(&[0xFF]);
        assert!(text.is_empty());
        assert!(leftover.is_empty(), "definitively invalid byte must be drained");
    }

    /// Oracle: a valid prefix, then a definitively invalid byte. The prefix
    /// is pushed, and both are drained.
    #[test]
    fn decode_utf8_chunk_prefix_then_invalid_drained() {
        let (text, leftover) = decode_once(&[b'a', b'b', 0xFF]);
        assert_eq!(text, "ab");
        assert!(leftover.is_empty(), "prefix and invalid byte must be drained");
    }
}