jmap_base_client/client.rs
1//! Auth-agnostic base JMAP HTTP client (RFC 8620).
2//!
3//! Provides [`JmapClient`] for session fetch, API calls, blob transfer,
4//! SSE event streaming, and [`extract_response`] for parsing method results.
5
6use std::sync::Arc;
7
8use futures::StreamExt;
9
10use crate::auth::{AuthProvider, DefaultTransport, TransportConfig};
11use crate::error::ClientError;
12use crate::request::Session;
13use crate::sse::{parse_sse_block, SseFrame};
14
/// Mutable state handed from one step of the `subscribe_events` unfold loop
/// to the next.
struct SseStreamState<S> {
    /// The HTTP byte stream the frames are read from.
    stream: S,
    /// Bytes received from `stream` that have not been decoded to UTF-8 yet.
    /// An incomplete multi-byte sequence stays here until a later chunk
    /// supplies the missing bytes, so a codepoint split across two adjacent
    /// chunks does not terminate the stream.
    raw_buf: Vec<u8>,
    /// Decoded text that has not been emitted as a frame yet; frame
    /// delimiters are searched for in this buffer.
    buf: String,
    /// Byte index into `buf` at which the next delimiter search starts.
    /// Invariant: always lies on a UTF-8 char boundary of `buf`.
    scan_from: usize,
}
28
/// Timeout and size-limit settings applied by one [`JmapClient`].
///
/// [`ClientConfig::default()`] yields the production defaults: a 30-second
/// request timeout plus the body/frame caps listed on each field.
///
/// # Constructing a customised config
///
/// The struct is `#[non_exhaustive]` so that new fields can be added in
/// minor versions without breaking callers. A consequence is that code
/// outside this crate cannot write a struct expression for it at all — not
/// even with `..ClientConfig::default()`. Start from the default and assign
/// the public fields you want to change:
///
/// ```rust,ignore
/// let mut config = ClientConfig::default();
/// config.max_call_body = 16 * 1024 * 1024;
/// ```
///
/// Inside this crate, `ClientConfig { field: value, ..ClientConfig::default() }`
/// works as usual.
///
/// # Why the fields mix `u64` and `usize` (bd:JMAP-6r7c.21)
///
/// The split is deliberate and follows what the underlying transport
/// expects:
///
/// - **HTTP body caps are `u64`** (`max_session_body`, `max_call_body`,
///   `max_download_body`, `max_upload_response_body`). `reqwest` reports
///   `Content-Length` as `u64`, and on a 32-bit target an HTTP body can
///   exceed `usize::MAX` (4 GiB) while still fitting in 64 bits; the cap is
///   compared before anything is accumulated.
/// - **In-memory frame/message caps are `usize`** (`max_sse_frame`,
///   `max_ws_message`). They bound a `Vec<u8>` or `String` that has to fit
///   in process memory, `usize` is the address-space-native type, and it is
///   what `tokio_tungstenite::tungstenite::protocol::WebSocketConfig`
///   expects.
///
/// A future minor release MAY consolidate on `u64` and convert with
/// `usize::try_from` at the call sites that need it. Until then the mixed
/// integer types cost a little ergonomics (the same untyped literal cannot
/// be passed to both kinds of field without a `_u64`/`_usize` suffix or a
/// cast) in exchange for matching the transport's contract directly.
///
/// # No cross-field invariants
///
/// `validate()` looks at each field on its own. There is intentionally no
/// rule such as `max_sse_frame <= max_call_body`: the two guard against
/// different threats (a per-frame buffer cap versus a whole-body cap) and a
/// deployment may reasonably order them either way. Record the values you
/// choose in your own deployment notes.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ClientConfig {
    /// Deadline for one HTTP request/response cycle (`fetch_session`,
    /// `call`, `upload_blob`, `download_blob`). It is NOT applied to SSE or
    /// WebSocket streams, which are open-ended by nature.
    /// Must be > 0. Default: 30 seconds (`Duration::from_secs(30)`).
    ///
    /// `Duration::ZERO` is rejected by `validate()` (bd:JMAP-6r7c.29). The
    /// recorded rationale is that `reqwest::RequestBuilder::timeout` treats
    /// `Duration::ZERO` as "no per-request timeout" (leaving only the
    /// client-level `connect_timeout`) rather than "fail immediately", and
    /// **this crate intentionally forbids the no-timeout configuration**:
    /// a caller-visible timeout protects against indefinite hangs on
    /// stalled servers and against a slowloris-style resource leak in JMAP
    /// clients that keep one outstanding request per account. Do not relax
    /// the check without re-deriving that no-timeout DoS argument.
    ///
    /// NOTE(review): the statement about reqwest's handling of
    /// `Duration::ZERO` is not verifiable from this file — confirm it
    /// against the reqwest version in use.
    pub request_timeout: std::time::Duration,
    /// Largest response body accepted by `fetch_session`. Default: 1 MiB.
    pub max_session_body: u64,
    /// Largest response body accepted by `call()`. Default: 8 MiB.
    pub max_call_body: u64,
    /// Largest response body accepted by `download_blob()`. Default: 64 MiB.
    pub max_download_body: u64,
    /// Largest JSON response body accepted from the server in reply to
    /// `upload_blob()`. This does NOT limit the blob being uploaded — the
    /// JMAP server enforces that through its `maxSizeUpload` capability.
    /// Only the small JSON envelope describing the stored blob is capped.
    /// Default: 1 MiB.
    pub max_upload_response_body: u64,
    /// Largest single SSE frame, in bytes. The limit is applied separately
    /// to the raw incoming bytes (before UTF-8 decoding) and to the decoded
    /// text. It guards against memory exhaustion by a hostile or
    /// misbehaving server that sends one very large frame. Must be > 0.
    /// Default: 1 MiB.
    ///
    /// **Memory residency note (bd:JMAP-6lsm.7):** the raw byte buffer and
    /// the decoded text buffer are tracked independently, so while a single
    /// frame is being parsed the in-flight footprint can briefly approach
    /// ~2 × `max_sse_frame` (raw bytes accumulated up to the limit plus
    /// decoded text not yet drained). Budget for that 2× peak when tuning
    /// this value for tight memory. Independent tracking is what lets the
    /// streaming UTF-8 decoder cope with multi-byte sequences split across
    /// HTTP chunks; see `decode_utf8_chunk` for the rationale.
    pub max_sse_frame: usize,
    /// Largest single WebSocket message (and frame), in bytes — the
    /// WebSocket counterpart of `max_sse_frame`. Used by
    /// [`JmapClient::connect_ws_session`] and passed through to
    /// `tokio_tungstenite::tungstenite::protocol::WebSocketConfig`'s
    /// `max_message_size` and `max_frame_size`. Must be > 0.
    /// Default: 1 MiB. (bd:JMAP-6lsm.5)
    pub max_ws_message: usize,
}
126
127impl Default for ClientConfig {
128 fn default() -> Self {
129 ClientConfig {
130 request_timeout: std::time::Duration::from_secs(30),
131 max_session_body: 1024 * 1024,
132 max_call_body: 8 * 1024 * 1024,
133 max_download_body: 64 * 1024 * 1024,
134 max_upload_response_body: 1024 * 1024,
135 max_sse_frame: 1024 * 1024,
136 max_ws_message: 1024 * 1024,
137 }
138 }
139}
140
141impl ClientConfig {
142 /// Validate that all config fields satisfy their constraints.
143 ///
144 /// Called automatically by [`JmapClient::new`]. Callers may also call
145 /// this directly to pre-validate a config before passing it to the
146 /// constructor.
147 ///
148 /// # Errors
149 ///
150 /// Returns [`ClientError::InvalidArgument`] when any field is zero or
151 /// out-of-range.
152 pub fn validate(&self) -> Result<(), ClientError> {
153 if self.max_session_body == 0 {
154 return Err(ClientError::InvalidArgument(
155 "ClientConfig.max_session_body must be > 0".into(),
156 ));
157 }
158 if self.max_call_body == 0 {
159 return Err(ClientError::InvalidArgument(
160 "ClientConfig.max_call_body must be > 0".into(),
161 ));
162 }
163 if self.max_download_body == 0 {
164 return Err(ClientError::InvalidArgument(
165 "ClientConfig.max_download_body must be > 0".into(),
166 ));
167 }
168 if self.max_upload_response_body == 0 {
169 return Err(ClientError::InvalidArgument(
170 "ClientConfig.max_upload_response_body must be > 0".into(),
171 ));
172 }
173 if self.request_timeout == std::time::Duration::ZERO {
174 // Duration::ZERO would let reqwest run requests without a
175 // per-request timeout (reqwest treats ZERO as "no timeout").
176 // Reject explicitly — see ClientConfig::request_timeout
177 // rustdoc for the DoS-resistance rationale (bd:JMAP-6r7c.29).
178 return Err(ClientError::InvalidArgument(
179 "ClientConfig.request_timeout must be > 0; Duration::ZERO would disable the per-request timeout in reqwest, not 'fail immediately'. Use Duration::from_secs(30) or similar."
180 .into(),
181 ));
182 }
183 if self.max_sse_frame == 0 {
184 return Err(ClientError::InvalidArgument(
185 "ClientConfig.max_sse_frame must be > 0".into(),
186 ));
187 }
188 if self.max_ws_message == 0 {
189 return Err(ClientError::InvalidArgument(
190 "ClientConfig.max_ws_message must be > 0".into(),
191 ));
192 }
193 Ok(())
194 }
195}
196
197/// Auth-agnostic JMAP base HTTP client.
198///
199/// Construct with [`JmapClient::new`] or [`JmapClient::new_plain`].
200/// Extension-specific clients (`jmap-chat-client`, `jmap-mail-client`) depend
201/// on this crate and add their method implementations via `impl JmapClient`.
202///
203/// # Thread-safety (bd:JMAP-6r7c.25)
204///
205/// `JmapClient` is `Send + Sync + Clone`. Share by clone across threads or
206/// `tokio::spawn` tasks; the underlying `reqwest::Client` is reference-counted
207/// (Arc-backed) and the [`AuthProvider`] trait requires `Send + Sync` on every
208/// implementation. A compile-time assertion in this crate's test suite pins
209/// the `Send + Sync` contract: a future refactor that adds a non-`Sync` field
210/// (e.g. `Rc<T>`, `RefCell<T>`, `Cell<T>`) would break the assertion in CI
211/// before any downstream consumer is exposed.
212#[derive(Clone)]
213pub struct JmapClient {
214 pub(crate) base_url: url::Url,
215 pub(crate) auth: Arc<dyn AuthProvider>,
216 pub(crate) http: reqwest::Client,
217 pub(crate) config: ClientConfig,
218}
219
220impl std::fmt::Debug for JmapClient {
221 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
222 f.debug_struct("JmapClient")
223 .field("base_url", &self.base_url)
224 .field("config", &self.config)
225 .finish_non_exhaustive()
226 }
227}
228
229impl JmapClient {
230 /// Create a new client.
231 ///
232 /// `transport` configures the underlying HTTP client (TLS trust roots,
233 /// client certificates, timeouts). `auth` injects per-request credentials
234 /// (Bearer token, Basic credentials, or none). The two are independent so
235 /// any transport can be paired with any credential scheme — for example,
236 /// `CustomCaTransport` with `BearerAuth`. `base_url` must be the server
237 /// origin (scheme, host, optional port) with no path, query, or fragment
238 /// — e.g. `"https://100.64.1.1:8008"`. Trailing slashes are normalized
239 /// away by the URL parser and are therefore accepted.
240 pub fn new(
241 transport: impl TransportConfig,
242 auth: impl AuthProvider + 'static,
243 base_url: &str,
244 config: ClientConfig,
245 ) -> Result<Self, ClientError> {
246 // Parse-don't-validate: parse_base_url returns a fully-validated
247 // url::Url so the constructor body can read as four facts (parse,
248 // validate config, build transport, construct Self) rather than
249 // a procedure (bd:JMAP-6lsm.26).
250 let parsed = parse_base_url(base_url)?;
251 config.validate()?;
252 // Unwrap the opaque HttpClient at construction time (bd:JMAP-6r7c.36).
253 // The wrapper exists to keep reqwest::Client out of the
254 // TransportConfig trait signature; once the client is owned by
255 // JmapClient, the inner reqwest::Client is the working type for
256 // internal request building. into_inner is pub(crate) so external
257 // code cannot reach inside the opaque wrapper.
258 let http = transport.build_client()?.into_inner();
259 Ok(Self {
260 base_url: parsed,
261 auth: Arc::new(auth),
262 http,
263 config,
264 })
265 }
266
267 /// Convenience constructor for servers with publicly-trusted TLS.
268 ///
269 /// Equivalent to `JmapClient::new(DefaultTransport, auth, base_url, config)`.
270 /// Use [`JmapClient::new`] when you need a custom transport (e.g.
271 /// `CustomCaTransport` for a private-CA server).
272 pub fn new_plain(
273 auth: impl AuthProvider + 'static,
274 base_url: &str,
275 config: ClientConfig,
276 ) -> Result<Self, ClientError> {
277 Self::new(DefaultTransport, auth, base_url, config)
278 }
279
280 /// Create a new client sharing an existing `Arc<dyn AuthProvider>`
281 /// (bd:JMAP-6r7c.27).
282 ///
283 /// `JmapClient::new` and `new_plain` take `auth` by value and wrap it
284 /// in a fresh `Arc` internally. That is the ergonomic case for a
285 /// caller constructing one client with one auth provider. It is the
286 /// wrong shape for a caller who:
287 ///
288 /// - Operates multiple `JmapClient` instances against different
289 /// shards or accounts but uses the **same** credential holder
290 /// (e.g. a shared OAuth token-refresh state machine, a shared
291 /// service-account principal).
292 /// - Wants a credential refresh in one client to be visible to all
293 /// sibling clients without rebuilding each one.
294 ///
295 /// This constructor takes a pre-built `Arc<dyn AuthProvider>` so
296 /// callers can clone the Arc and pass clones to multiple
297 /// `JmapClient::new_with_shared_auth` calls. The auth provider is
298 /// then shared by reference-count, and any interior-mutable state
299 /// (e.g. an `RwLock<TokenState>` inside a custom `OAuthAuth`
300 /// implementation that holds a refreshable bearer) is genuinely
301 /// shared across all sibling clients.
302 ///
303 /// Arguments mirror [`JmapClient::new`] otherwise.
304 ///
305 /// ```rust,ignore
306 /// use std::sync::Arc;
307 /// use jmap_base_client::{auth::{AuthProvider, BearerAuth, DefaultTransport}, client::{JmapClient, ClientConfig}};
308 ///
309 /// let auth: Arc<dyn AuthProvider> = Arc::new(BearerAuth::new("token")?);
310 /// let shard_a = JmapClient::new_with_shared_auth(
311 /// DefaultTransport,
312 /// auth.clone(),
313 /// "https://a.example.com",
314 /// ClientConfig::default(),
315 /// )?;
316 /// let shard_b = JmapClient::new_with_shared_auth(
317 /// DefaultTransport,
318 /// auth,
319 /// "https://b.example.com",
320 /// ClientConfig::default(),
321 /// )?;
322 /// ```
323 pub fn new_with_shared_auth(
324 transport: impl TransportConfig,
325 auth: Arc<dyn AuthProvider>,
326 base_url: &str,
327 config: ClientConfig,
328 ) -> Result<Self, ClientError> {
329 let parsed = parse_base_url(base_url)?;
330 config.validate()?;
331 // Unwrap the opaque HttpClient at construction time (bd:JMAP-6r7c.36).
332 let http = transport.build_client()?.into_inner();
333 Ok(Self {
334 base_url: parsed,
335 auth,
336 http,
337 config,
338 })
339 }
340
341 /// Apply the auth header (if any) to a request builder.
342 ///
343 /// Centralises the repeated `if let Some(...) = self.auth.auth_header()` pattern
344 /// that every HTTP method uses. Callers: `fetch_session`, `call`,
345 /// `subscribe_events`, `upload_blob`, `download_blob`.
346 pub(crate) fn inject_auth(&self, builder: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
347 if let Some(header) = self.auth.auth_header() {
348 builder.header(header.name(), header.expose_value())
349 } else {
350 builder
351 }
352 }
353
354 /// Returns `Err(ClientError::AuthFailed)` when the HTTP status indicates an
355 /// authentication or authorization failure.
356 ///
357 /// Specifically handles:
358 /// - 401 Unauthorized (RFC 7235 §3.1) — missing or invalid credentials
359 /// - 403 Forbidden (RFC 7235 §3.2) — credentials present but insufficient
360 ///
361 /// Called before reading the response body so callers can distinguish
362 /// permanent auth failures from transient errors without consuming the body.
363 pub(crate) fn check_auth_status(status: reqwest::StatusCode) -> Result<(), ClientError> {
364 if status == reqwest::StatusCode::UNAUTHORIZED || status == reqwest::StatusCode::FORBIDDEN {
365 Err(ClientError::AuthFailed(status.as_u16()))
366 } else {
367 Ok(())
368 }
369 }
370}
371
372/// Read a response body into memory, capped at `limit` bytes.
373///
374/// Performs the Content-Length pre-check as a fast-path rejection for honest
375/// oversized responses, then streams the body chunk-by-chunk and enforces the
376/// cap before each accumulation. This prevents buffering a response that
377/// exceeds the limit when Content-Length is absent (chunked Transfer-Encoding)
378/// or under-reported by a hostile server — without per-chunk streaming, a
379/// `.bytes().await` call would buffer the entire response before the
380/// post-buffer length check could fire (bd:JMAP-6r7c.1).
381///
382/// Threat model: matters when connecting to untrusted servers (federation
383/// peers, third-party JMAP services, DNS-hijacked endpoints, MITM scenarios).
384/// On low-memory devices the peak allocation from naive buffering can OOM-kill
385/// the client even if the post-buffer check would otherwise reject.
386///
387/// Returns `ClientError::ResponseTooLarge` when either the advertised
388/// Content-Length or the accumulated chunk total exceeds `limit`.
389pub(crate) async fn read_capped_body(
390 resp: reqwest::Response,
391 limit: u64,
392) -> Result<Vec<u8>, ClientError> {
393 if let Some(len) = resp.content_length() {
394 if len > limit {
395 return Err(ClientError::ResponseTooLarge { actual: len, limit });
396 }
397 }
398
399 let mut stream = resp.bytes_stream();
400 let mut body: Vec<u8> = Vec::new();
401 while let Some(chunk) = stream.next().await {
402 let chunk = chunk.map_err(ClientError::from_reqwest)?;
403 let new_len = body.len() as u64 + chunk.len() as u64;
404 if new_len > limit {
405 return Err(ClientError::ResponseTooLarge {
406 actual: new_len,
407 limit,
408 });
409 }
410 body.extend_from_slice(&chunk);
411 }
412 Ok(body)
413}
414
415impl JmapClient {
416 /// Fetch the JMAP Session object from `{base_url}/.well-known/jmap` (RFC 8620 §2).
417 ///
418 /// The response body is capped at 1 MiB. Returns `ClientError::ResponseTooLarge`
419 /// if the server sends more. Session URL fields (`apiUrl`, `uploadUrl`,
420 /// `downloadUrl`, `eventSourceUrl`) are validated to have http/https scheme;
421 /// a non-http scheme returns `ClientError::InvalidSession`.
422 ///
423 /// Returns `ClientError::AuthFailed` on HTTP 401 or 403.
424 ///
425 /// # Charset
426 ///
427 /// The response body MUST be UTF-8-encoded JSON (RFC 8259 §8.1). A server
428 /// that sends UTF-16 or UTF-32 JSON — even with a matching
429 /// `charset=utf-16` `Content-Type` parameter — will fail to parse as
430 /// `ClientError::Parse`; the error does not specifically call out the
431 /// charset mismatch. Every shipped JMAP server uses UTF-8, but a
432 /// non-conformant server can produce a confusing parse error
433 /// (bd:JMAP-6r7c.28).
434 pub async fn fetch_session(&self) -> Result<Session, ClientError> {
435 let limit = self.config.max_session_body;
436 let url = self.base_url.join(".well-known/jmap").map_err(|e| {
437 // base_url was pre-validated in JmapClient::new; this path is
438 // unreachable in practice but must be handled for completeness.
439 ClientError::InvalidSession(format!("cannot construct session URL: {e}"))
440 })?;
441
442 let req = self.inject_auth(self.http.get(url).timeout(self.config.request_timeout));
443
444 let resp = {
445 let raw_resp = req.send().await.map_err(ClientError::from_reqwest)?;
446 Self::check_auth_status(raw_resp.status())?;
447 raw_resp
448 .error_for_status()
449 .map_err(ClientError::from_reqwest)?
450 };
451
452 // Stream the body chunk-by-chunk with the cap enforced before each
453 // accumulation. The Content-Length pre-check inside read_capped_body
454 // is a fast-path rejection for honest oversized responses; the
455 // streaming cap is the actual DOS guard against a hostile server
456 // that under-reports Content-Length or omits it entirely under
457 // chunked Transfer-Encoding (bd:JMAP-6r7c.1).
458 let body = read_capped_body(resp, limit).await?;
459
460 let session: Session = serde_json::from_slice(&body).map_err(ClientError::from_parse)?;
461
462 validate_session_url_schemes(&session)?;
463
464 Ok(session)
465 }
466
467 /// POST a [`jmap_types::JmapRequest`] to `api_url` and return the parsed [`jmap_types::JmapResponse`]
468 /// (RFC 8620 §3.3).
469 ///
470 /// `api_url` is taken as an explicit parameter (not from `self`) because the
471 /// caller holds a [`Session`] and selects the correct URL from it.
472 ///
473 /// The response body is capped at 8 MiB. Returns `ClientError::ResponseTooLarge`
474 /// if the server sends more.
475 ///
476 /// Returns `ClientError::AuthFailed` on HTTP 401 or 403.
477 ///
478 /// # Charset
479 ///
480 /// The response body MUST be UTF-8-encoded JSON (RFC 8259 §8.1). A server
481 /// that sends UTF-16 or UTF-32 JSON — even with a matching
482 /// `charset=utf-16` `Content-Type` parameter — will fail to parse as
483 /// `ClientError::Parse`; the error does not specifically call out the
484 /// charset mismatch. Every shipped JMAP server uses UTF-8, but a
485 /// non-conformant server can produce a confusing parse error
486 /// (bd:JMAP-6r7c.28).
487 ///
488 /// # See also
489 ///
490 /// Prefer [`JmapClient::call_session`] when you have a [`Session`] —
491 /// it picks the correct URL field automatically and prevents the
492 /// "I passed `session.upload_url` instead of `session.api_url`"
493 /// confusion (bd:JMAP-6r7c.39).
494 pub async fn call(
495 &self,
496 api_url: &str,
497 req: &jmap_types::JmapRequest,
498 ) -> Result<jmap_types::JmapResponse, ClientError> {
499 require_http_url(api_url)?;
500 let limit = self.config.max_call_body;
501
502 let builder = self.inject_auth(
503 self.http
504 .post(api_url)
505 .json(req)
506 .timeout(self.config.request_timeout),
507 );
508
509 let resp = {
510 let raw_resp = builder.send().await.map_err(ClientError::from_reqwest)?;
511 Self::check_auth_status(raw_resp.status())?;
512 raw_resp
513 .error_for_status()
514 .map_err(ClientError::from_reqwest)?
515 };
516
517 // Stream the body chunk-by-chunk with the cap enforced before each
518 // accumulation (bd:JMAP-6r7c.1). See `read_capped_body` for the DOS
519 // rationale; without per-chunk streaming, a server that under-reports
520 // or omits Content-Length can force unbounded allocation here.
521 let body = read_capped_body(resp, limit).await?;
522
523 let jmap_resp: jmap_types::JmapResponse =
524 serde_json::from_slice(&body).map_err(ClientError::from_parse)?;
525
526 Ok(jmap_resp)
527 }
528
529 /// POST a [`jmap_types::JmapRequest`] to the `api_url` field of `session`
530 /// and return the parsed response (bd:JMAP-6r7c.39).
531 ///
532 /// Type-safe alternative to [`JmapClient::call`]: takes a [`Session`]
533 /// reference and reads `session.api_url` internally. The
534 /// "I passed `session.upload_url` instead of `session.api_url`"
535 /// confusion is impossible at the call site because the caller does
536 /// not select a URL — only `Session::api_url` is used.
537 ///
538 /// Same body cap, auth, and error semantics as [`JmapClient::call`].
539 pub async fn call_session(
540 &self,
541 session: &Session,
542 req: &jmap_types::JmapRequest,
543 ) -> Result<jmap_types::JmapResponse, ClientError> {
544 self.call(session.api_url.as_str(), req).await
545 }
546
547 /// Open an SSE connection to `event_source_url` and return an async stream
548 /// of parsed [`SseFrame`]s (RFC 8620 §7.3).
549 ///
550 /// # URI template expansion
551 ///
552 /// `Session.event_source_url` is a URI template (RFC 6570 Level-1) with
553 /// variables `types`, `closeafter`, and `ping`. You **must** expand it
554 /// before passing it to this function, or the server will receive the
555 /// literal text `{types}` in the URL and return an error. Use
556 /// [`expand_url_template`](crate::expand_url_template):
557 ///
558 /// ```rust,ignore
559 /// let url = jmap_base_client::expand_url_template(
560 /// &session.event_source_url,
561 /// &[("types", "*"), ("closeafter", "no"), ("ping", "0")],
562 /// )?;
563 /// let stream = client.subscribe_events(&url, None).await?;
564 /// ```
565 ///
566 /// If `last_event_id` is `Some`, sends a `Last-Event-ID` header so the
567 /// server can resume from where the previous stream left off.
568 ///
569 /// Buffer growth is capped at [`ClientConfig::max_sse_frame`] bytes per
570 /// frame (default: 1 MiB). If a single SSE frame exceeds this limit the
571 /// stream yields `ClientError::SseFrameTooLarge` and terminates.
572 ///
573 /// No timeout is applied to this call or to the resulting stream. The
574 /// connect timeout (10 s, TCP only) is the only deadline enforced. If the
575 /// server stalls before sending HTTP response headers, or later goes silent
576 /// on the open connection, this call or the stream will hang indefinitely.
577 /// Wrap the entire call and/or stream iteration in [`tokio::time::timeout`]
578 /// if you need to bound either phase.
579 ///
580 /// Returns `ClientError::AuthFailed` on HTTP 401 or 403 before the stream
581 /// starts.
582 ///
583 /// # Stream drop and cancellation (bd:JMAP-6r7c.24)
584 ///
585 /// The returned `BoxStream` may be dropped at any point — mid-frame,
586 /// while awaiting `StreamExt::next`, or from inside a `tokio::select!`
587 /// losing-branch cancellation. Dropping is always safe and always
588 /// synchronous:
589 ///
590 /// - **Partial frame bytes are discarded.** Any bytes accumulated in
591 /// the internal `raw_buf` or `buf` that have not yet been parsed
592 /// into an [`SseFrame`] are lost. There is no buffering or replay
593 /// inside the client — the server is the source of truth for what
594 /// was emitted vs what was acknowledged.
595 /// - **The underlying HTTP connection is released.** The
596 /// `reqwest::Response::bytes_stream` held inside the stream is
597 /// dropped along with the stream; reqwest returns the connection
598 /// to its pool (or closes it) per its own pool policy.
599 /// - **Resumption is the caller's job.** If you want to resume from
600 /// the last successfully-parsed frame, capture the most recent
601 /// `SseFrame::id` (if the server sets one) and pass it as
602 /// `last_event_id` on the next `subscribe_events` call. The
603 /// server will replay events from that point per RFC 8895 §9.
604 ///
605 /// `tokio::select!` cancellation is the canonical use case: a caller
606 /// racing the SSE stream against a shutdown signal can drop the
607 /// stream by selecting the shutdown branch without leaking the HTTP
608 /// connection or memory.
609 pub async fn subscribe_events(
610 &self,
611 event_source_url: &str,
612 last_event_id: Option<&str>,
613 ) -> Result<futures::stream::BoxStream<'static, Result<SseFrame, ClientError>>, ClientError>
614 {
615 require_http_url(event_source_url)?;
616 let mut req = self
617 .http
618 .get(event_source_url)
619 .header("Accept", "text/event-stream");
620 if let Some(id) = last_event_id {
621 req = req.header("Last-Event-ID", id);
622 }
623 let req = self.inject_auth(req);
624
625 let resp = req.send().await.map_err(ClientError::from_reqwest)?;
626 Self::check_auth_status(resp.status())?;
627 let resp = resp.error_for_status().map_err(ClientError::from_reqwest)?;
628
629 // Verify Content-Type before streaming. A misconfigured server returning
630 // application/json would silently produce no events (no SSE delimiter found).
631 {
632 // to_ascii_lowercase(): media types are case-insensitive per RFC 7231 §3.1.1.1.
633 let ct = resp
634 .headers()
635 .get(reqwest::header::CONTENT_TYPE)
636 .and_then(|v| v.to_str().ok())
637 .unwrap_or("")
638 .to_ascii_lowercase();
639 // RFC 7231 §3.1.1.1 / RFC 9110 §8.3: the media-type "essence" (type
640 // + "/" + subtype) is bounded by ';', SP, HTAB, or end-of-string.
641 // A naive starts_with("text/event-stream") would accept
642 // "text/event-streamish" or "text/event-stream2" — exactly the bug
643 // JMAP-6lsm.2 flagged. Split off the parameter list / trailing
644 // whitespace and compare the essence exactly.
645 let essence = ct
646 .split(|c: char| c == ';' || c.is_whitespace())
647 .next()
648 .unwrap_or("");
649 if essence != "text/event-stream" {
650 return Err(ClientError::UnexpectedResponse(format!(
651 "subscribe_events: expected Content-Type text/event-stream, got: {ct:?}"
652 )));
653 }
654 }
655
656 let byte_stream = resp.bytes_stream();
657 let sse_frame_limit = self.config.max_sse_frame;
658
659 Ok(futures::stream::unfold(
660 Some(SseStreamState {
661 stream: byte_stream,
662 raw_buf: Vec::new(),
663 buf: String::new(),
664 scan_from: 0, // invariant: valid UTF-8 char boundary of buf; 0 always satisfies this
665 }),
666 move |state| async move {
667 let SseStreamState {
668 mut stream,
669 mut raw_buf,
670 mut buf,
671 mut scan_from,
672 } = state?;
673 loop {
674 // Search for any double-newline delimiter (LF/CRLF/CR variants).
675 // scan_from is set to old_len.saturating_sub(3) after each append
676 // so we only re-scan the overlap region. 3 bytes back is the
677 // minimum that covers all delimiter prefixes that can straddle a
678 // chunk boundary:
679 // - `\r\n\r\n` (4 bytes): longest prefix that fits in one chunk
680 // but is incomplete is `\r\n\r` (3 bytes) — exactly covered.
681 // - `\n\r\n` (3 bytes): longest incomplete prefix is `\n\r` (2
682 // bytes) — covered by the 3-byte overlap.
683 // - `\n\n` and `\r\r` (2 bytes each): longest incomplete prefix
684 // is 1 byte — covered by the 3-byte overlap.
685 // Since \r and \n are single-byte UTF-8 codepoints, 3 bytes back
686 // is always a valid char boundary — no adjustment needed.
687 //
688 // Do not simplify to "rescan the whole buffer" (bd:JMAP-6r7c.19).
689 // The four-delimiter min_by_key + 3-byte overlap pattern is
690 // load-bearing for four independent reasons:
691 //
692 // 1. Overlap-rescan is O(chunk_size) per chunk; whole-buffer
693 // rescan is O(n) per chunk, i.e. O(n²) over the lifetime
694 // of a long-lived SSE stream. JMAP push streams can run
695 // for hours with thousands of small events; the
696 // difference is measurable.
697 // 2. All four delimiter forms are necessary. The WHATWG
698 // EventSource spec + RFC 8895 line-terminator rules let
699 // a single server emit mixed line endings within one
700 // frame. min_by_key over all four catches the mixed
701 // `\n\r\n` case that neither `\n\n` nor `\r\n\r\n`
702 // alone catches. Removing forms produces silent
703 // frame-misalignment on those servers.
704 // 3. The walk-back to a UTF-8 char boundary on the
705 // `scan_from = old_len.saturating_sub(3)` assignment
706 // below is needed because the byte preceding scan_from
707 // might be in the middle of a multi-byte codepoint;
708 // `buf[scan_from..]` would panic on the next `.find()`
709 // without it.
710 // 4. Tests in tests/client_tests.rs pin all three line-
711 // ending variants against hand-crafted server bodies:
712 // `test_subscribe_events_crlf_line_endings`,
713 // `test_subscribe_events_lf_crlf_frame_delimiter`,
714 // and `test_subscribe_events_cr_line_endings`. The
715 // mixed `\n\r\n` case (the second test) is the
716 // trickiest and is the canonical example of why all
717 // four delimiter searches are present.
718 //
719 // Resist the "just rescan the whole buffer" refactor.
720 let frame_end = [
721 buf[scan_from..]
722 .find("\r\n\r\n")
723 .map(|p| (scan_from + p, 4usize)),
724 buf[scan_from..]
725 .find("\n\n")
726 .map(|p| (scan_from + p, 2usize)),
727 buf[scan_from..]
728 .find("\r\r")
729 .map(|p| (scan_from + p, 2usize)),
730 // Mixed: LF-terminated last field line followed by a
731 // CRLF-terminated blank line. Not detected by \n\n (the
732 // LFs are separated by a CR) or \r\n\r\n (no leading \r\n).
733 buf[scan_from..]
734 .find("\n\r\n")
735 .map(|p| (scan_from + p, 3usize)),
736 ]
737 .into_iter()
738 .flatten()
739 .min_by_key(|&(pos, _)| pos);
740
741 if let Some((pos, delim_len)) = frame_end {
742 let frame = {
743 let slice = &buf[..pos];
744 if slice.contains('\r') {
745 slice.replace("\r\n", "\n").replace('\r', "\n")
746 } else {
747 slice.to_owned()
748 }
749 };
750 buf.drain(..pos + delim_len);
751 scan_from = 0; // 0 satisfies the UTF-8 char boundary invariant
752 let sse_frame = parse_sse_block(&frame);
753 return Some((
754 Ok(sse_frame),
755 Some(SseStreamState {
756 stream,
757 raw_buf,
758 buf,
759 scan_from,
760 }),
761 ));
762 }
763
764 match stream.next().await {
765 None => return None,
766 Some(Err(e)) => {
767 return Some((Err(ClientError::from_reqwest(e)), None));
768 }
769 Some(Ok(bytes)) => {
770 // Accumulate raw bytes first. A multi-byte UTF-8 codepoint
771 // may be split across adjacent HTTP chunks; decode only the
772 // valid prefix and leave the remainder in raw_buf until the
773 // next chunk completes the sequence.
774 raw_buf.extend_from_slice(&bytes);
775 // Cap raw_buf to prevent OOM on persistent invalid UTF-8 input.
776 // Use the same limit as the decoded buf cap.
777 if raw_buf.len() > sse_frame_limit {
778 return Some((
779 Err(ClientError::SseFrameTooLarge {
780 limit: sse_frame_limit,
781 }),
782 None,
783 ));
784 }
785 let old_len = buf.len();
786 decode_utf8_chunk(&mut raw_buf, &mut buf);
787 scan_from = old_len.saturating_sub(3);
788 // Walk backward to a valid UTF-8 char boundary so that
789 // buf[scan_from..] never panics on multibyte characters.
790 while scan_from > 0 && !buf.is_char_boundary(scan_from) {
791 scan_from -= 1;
792 }
793 // Guard against unbounded buffer growth from a hostile server.
794 // Yield the error and terminate (state = None).
795 if buf.len() > sse_frame_limit {
796 return Some((
797 Err(ClientError::SseFrameTooLarge {
798 limit: sse_frame_limit,
799 }),
800 None,
801 ));
802 }
803 }
804 }
805 }
806 },
807 )
808 .boxed())
809 }
810
811 /// Open a WebSocket connection to `ws_url` using this client's
812 /// configured `max_ws_message` byte cap.
813 ///
814 /// Convenience wrapper around [`crate::ws::connect_ws_with_limit`] that
815 /// passes [`ClientConfig::max_ws_message`] as the per-message /
816 /// per-frame byte cap. Mirrors the [`Self::subscribe_events`]
817 /// pattern of "the JmapClient method uses ClientConfig; the free
818 /// function takes an explicit value".
819 ///
820 /// `ws_url` must come from the session document's WebSocket capability
821 /// URL. `auth_header` is an optional `(name, value)` pair for the
822 /// upgrade request; the auth provider on this client is NOT used here
823 /// because some servers attach WebSocket auth via cookie or session
824 /// header rather than the same scheme as HTTP requests.
825 ///
826 /// # Security
827 ///
828 /// The `auth_header` value is a credential and must not be logged or
829 /// echoed back to other systems. Treat it with the same care as a
830 /// [`crate::auth::BearerAuth`] token. Transport errors raised by this
831 /// method are constructed without the original credential bytes, but
832 /// downstream code that inspects [`ClientError`] should still avoid
833 /// printing or storing the `auth_header` itself.
834 ///
835 /// Returns [`ClientError::InvalidArgument`] for non-`ws://`/`wss://` URLs.
836 /// See [`crate::ws::connect_ws_with_limit`] for full error semantics.
837 pub async fn connect_ws_session(
838 &self,
839 ws_url: &str,
840 auth_header: Option<crate::auth::AuthHeader<'_>>,
841 ) -> Result<crate::ws::WsSession, ClientError> {
842 crate::ws::connect_ws_with_limit(ws_url, auth_header, self.config.max_ws_message).await
843 }
844
845 /// Open an SSE connection via a [`Session`]-supplied
846 /// URL template (bd:JMAP-6r7c.64).
847 ///
848 /// Type-safe convenience wrapper over [`Self::subscribe_events`] —
849 /// expands `session.event_source_url` internally using the
850 /// caller-supplied [`SubscribeEventsSessionParams`] template
851 /// variables. The caller cannot accidentally pass `session.api_url`
852 /// or any other URL field because no URL is exposed at the call
853 /// site.
854 ///
855 /// Template variables that are `None` expand to an empty string,
856 /// per the RFC 8620 §7.3 default-omission semantics. Most JMAP
857 /// servers accept `types=` (subscribe to all types),
858 /// `closeafter=` (stay-open), and `ping=` (no server pings) as
859 /// defaults; if your server requires explicit values, supply them
860 /// via the params struct.
861 pub async fn subscribe_events_session(
862 &self,
863 session: &Session,
864 params: SubscribeEventsSessionParams<'_>,
865 ) -> Result<futures::stream::BoxStream<'static, Result<SseFrame, ClientError>>, ClientError>
866 {
867 let SubscribeEventsSessionParams {
868 types,
869 close_after,
870 ping,
871 last_event_id,
872 } = params;
873
874 // RFC 8620 §7.3 event_source_url variables: types, closeafter, ping.
875 // ping is u32; format to a String only when needed so the empty-default
876 // path stays allocation-free.
877 let ping_owned = ping.map(|p| p.to_string());
878 let vars = [
879 ("types", types.unwrap_or("")),
880 ("closeafter", close_after.unwrap_or("")),
881 ("ping", ping_owned.as_deref().unwrap_or("")),
882 ];
883 let expanded = crate::blob::expand_url_template(session.event_source_url.as_str(), &vars)?;
884 self.subscribe_events(&expanded, last_event_id).await
885 }
886}
887
/// Inputs to [`JmapClient::subscribe_events_session`] (bd:JMAP-6r7c.64).
///
/// Groups the three `event_source_url` template variables from RFC 8620 §7.3
/// (`types`, `closeafter`, `ping`) with the optional `Last-Event-ID` request
/// header used for stream resumption. Any template variable left as `None`
/// is expanded to an empty string.
///
/// Every field is optional, so `Default::default()` describes a subscription
/// with all variables empty and no resumption point. Build one with a struct
/// literal:
///
/// ```rust,ignore
/// client.subscribe_events_session(&session, SubscribeEventsSessionParams {
///     types: Some("Email,Mailbox"),
///     close_after: Some("state"),
///     ping: Some(60),
///     last_event_id: Some("evt-1234"),
/// }).await?;
/// ```
#[derive(Clone, Copy, Debug, Default)]
pub struct SubscribeEventsSessionParams<'a> {
    /// Value for the `types` template variable: a comma-separated list of
    /// JMAP data-type names (`"Email,Mailbox"`), or `"*"` for every type the
    /// server supports. `None` expands to an empty string.
    pub types: Option<&'a str>,
    /// Value for the `closeafter` template variable: `"state"` closes the
    /// stream after the first state-change push, `"no"` keeps it open.
    /// `None` expands to an empty string.
    pub close_after: Option<&'a str>,
    /// Value for the `ping` template variable: the server-ping interval in
    /// seconds, with `0` disabling pings. `None` expands to an empty string.
    pub ping: Option<u32>,
    /// `Last-Event-ID` header value to resume from, if any.
    pub last_event_id: Option<&'a str>,
}
922
923/// Find the method response matching `call_id` in `resp` and deserialize its
924/// arguments into `T`.
925///
926/// Returns [`ClientError::MethodNotFound`] if no invocation with the given
927/// call_id exists. Returns [`ClientError::MethodError`] if any invocation
928/// with the matching call_id is a JMAP `"error"` response (RFC 8620 §3.6.1).
929///
930/// # Multiple invocations sharing a call_id
931///
932/// Per RFC 8620 §3.2, a single method call may produce multiple invocations
933/// in the response — for example, `Foo/copy` with `onSuccessDestroyOriginal:
934/// true` produces both a `Foo/copy` and an implicit `Foo/set` invocation,
935/// both stamped with the same call_id (RFC 8620 §5.8 example, lines 3158–
936/// 3180). This function handles that case by:
937///
938/// 1. **Errors take precedence.** If any invocation matching `call_id`
939/// has method name `"error"`, this function returns that error. A
940/// success response cannot mask a sibling error response with the same
941/// call_id — silently returning the success while the server reported
942/// failure would be data loss for the caller.
943/// 2. Otherwise, the **first** non-error invocation matching `call_id`
944/// is deserialized into `T`. In the §5.8 implicit-method case both
945/// invocations are successes; the first is the primary response and
946/// is what the caller wants.
947///
948/// # Contract: `call_id` is the only matcher (bd:JMAP-6r7c.23)
949///
950/// The method-name field of the matching invocation is **not** checked
951/// against `T`. The function trusts the caller's choice of `call_id` to
952/// identify the invocation and trusts the server's choice of method name
953/// to be consistent with the request it was answering. Two consequences
954/// callers should be aware of:
955///
956/// - **Wrong `T` may still parse.** If a caller asks for
957/// `extract_response::<EmailGetResponse>(&resp, "r1")` but the matching
958/// invocation is actually a `Mailbox/get` response, `serde_json::from_value`
959/// will succeed on any structural overlap between the two shapes (both
960/// carry `accountId`, an `ids`/`list` field, etc.) and return a
961/// default-shaped `EmailGetResponse`. The function cannot detect this
962/// because it has no view of what method name the caller expected.
963/// - **Server ordering is trusted.** When multiple non-error invocations
964/// share a `call_id`, the function returns the first one in
965/// `resp.method_responses` order. RFC 8620 §5.8 implies the primary
966/// response is first, but the spec does not normatively require it; a
967/// non-conformant server that returns the implicit method first would
968/// silently get the wrong invocation deserialized as `T`.
969///
970/// Callers that need method-name verification or want to disambiguate
971/// among multiple non-error matches should iterate
972/// `resp.method_responses` directly. The field is public and the
973/// [`jmap_types::Invocation`] type is `(method, args, call_id)`.
974///
975/// # Panics
976///
977/// This function does not catch panics from `T`'s [`serde::Deserialize`]
978/// implementation. If a custom `T` type's `deserialize` impl panics — e.g.
979/// because of an `.unwrap()` on a sub-field — the panic propagates through
980/// `extract_response` to the caller's await point. The standard derived
981/// `Deserialize` impls in the workspace type crates (`jmap-types`,
982/// `jmap-mail-types`, etc.) do not panic; this caveat only affects
983/// hand-rolled `Deserialize` impls outside the workspace
984/// (bd:JMAP-6r7c.44).
985///
986/// This function is `pub` so extension crates (`jmap-chat-client`,
987/// `jmap-mail-client`) can use it to extract typed results from a
988/// [`jmap_types::JmapResponse`] without depending on internal details.
989pub fn extract_response<T: serde::de::DeserializeOwned>(
990 resp: &jmap_types::JmapResponse,
991 call_id: &str,
992) -> Result<T, ClientError> {
993 // Invocation is a type alias (String, Value, String) = (method, args, call_id).
994 // Two-pass scan: errors take precedence per §3.6.1, so look for an error
995 // invocation first; otherwise return the first non-error invocation.
996 // The underlying iterator is slice::Iter, so .clone() is a cheap pointer
997 // copy — no allocation.
998 let mut candidates = resp.method_responses.iter().filter(|inv| inv.2 == call_id);
999
1000 if let Some(err_inv) = candidates.clone().find(|inv| inv.0 == "error") {
1001 let args = &err_inv.1;
1002 let err_type = args
1003 .get("type")
1004 .and_then(|v| v.as_str())
1005 .unwrap_or("serverError") // safe: fallback literal, not user input
1006 .to_owned();
1007 let description = args
1008 .get("description")
1009 .and_then(|v| v.as_str())
1010 .map(str::to_owned);
1011 return Err(ClientError::MethodError {
1012 error_type: err_type,
1013 description,
1014 });
1015 }
1016
1017 // The early return above already handled the error case; every remaining
1018 // candidate is a success invocation, so we just take the first one.
1019 let inv = candidates
1020 .next()
1021 .ok_or_else(|| ClientError::MethodNotFound(call_id.to_owned()))?;
1022 <T as serde::Deserialize>::deserialize(&inv.1).map_err(ClientError::from_parse)
1023}
1024
/// Decode as much valid UTF-8 as possible from `raw` into `buf`, draining
/// every consumed (or definitively-invalid) byte from `raw`.
///
/// The whole of `raw` is processed in one call:
/// - Valid UTF-8 runs are appended to `buf`.
/// - A **definitively invalid** byte sequence (`error_len == Some(n)`) is
///   dropped and decoding **continues with the bytes that follow it**, so
///   valid data behind a bad byte (for example a frame delimiter) becomes
///   visible immediately instead of waiting in `raw` for another chunk.
/// - An **incomplete** multi-byte sequence at the very end
///   (`error_len == None`) is the only thing left in `raw`; the next chunk
///   is expected to complete it.
///
/// On return `raw` is therefore either empty or holds just the incomplete
/// head of one codepoint.
///
/// The caller is responsible for capping `raw.len()` before calling this
/// function; `subscribe_events` does so with its `sse_frame_limit` check.
fn decode_utf8_chunk(raw: &mut Vec<u8>, buf: &mut String) {
    // Number of leading bytes of `raw` that are fully dealt with (decoded or
    // discarded). Drained in a single pass at the end so the work stays
    // linear in `raw.len()` even when many invalid bytes are present.
    let mut consumed = 0;

    loop {
        match std::str::from_utf8(&raw[consumed..]) {
            Ok(text) => {
                buf.push_str(text);
                consumed = raw.len();
                break;
            }
            Err(e) => {
                let valid_end = consumed + e.valid_up_to();
                // `valid_up_to` is, by the `Utf8Error` contract, the length
                // of a prefix that IS valid UTF-8. The crate forbids unsafe
                // code, which rules out `from_utf8_unchecked`, so `.expect()`
                // is the cheapest legal way to re-borrow it as `&str`. Do not
                // swap in `unwrap_or_default()`: that would silently drop
                // valid bytes on a hypothetical impossible failure path
                // (bd:JMAP-6r7c.54).
                buf.push_str(
                    std::str::from_utf8(&raw[consumed..valid_end])
                        .expect("valid_up_to is a valid UTF-8 boundary"),
                );
                match e.error_len() {
                    Some(invalid_len) => {
                        // Definitively invalid: skip the offending bytes and
                        // keep decoding whatever follows them.
                        consumed = (valid_end + invalid_len).min(raw.len());
                    }
                    None => {
                        // Incomplete trailing sequence: keep its head in
                        // `raw` until the continuation bytes arrive.
                        consumed = valid_end;
                        break;
                    }
                }
            }
        }
    }

    raw.drain(..consumed);
}
1080
/// Borrow the scheme portion of `url`: everything before the first `"://"`.
///
/// The slice keeps the input's *original* case. RFC 3986 §3.1 makes schemes
/// case-insensitive, so callers must compare it with
/// [`str::eq_ignore_ascii_case`]. `None` is returned when `url` contains no
/// `"://"` at all.
///
/// This is a plain prefix scan rather than a full URL parse, which is why
/// URL templates containing `{variable}` placeholders are handled correctly.
///
/// A borrowed slice is returned instead of a lowercased `String` so that a
/// scheme check never allocates (bd:JMAP-6lsm.10).
fn url_scheme(url: &str) -> Option<&str> {
    let separator_at = url.find("://")?;
    Some(&url[..separator_at])
}
1097
1098/// `true` if `url`'s scheme prefix is `http` or `https` (case-insensitive,
1099/// RFC 3986 §3.1).
1100fn is_http_or_https(url: &str) -> bool {
1101 url_scheme(url)
1102 .is_some_and(|s| s.eq_ignore_ascii_case("http") || s.eq_ignore_ascii_case("https"))
1103}
1104
1105/// Parse and validate a JMAP base URL.
1106///
1107/// The input must be:
1108/// - non-empty
1109/// - syntactically valid (parses with [`url::Url::parse`])
1110/// - http or https scheme (case-insensitive per RFC 3986 §3.1)
1111/// - origin-only: no explicit path component, no query string, no fragment.
1112/// "Origin" here means scheme + host + optional port — the form a JMAP
1113/// server is identified by. Trailing slashes are accepted (the URL
1114/// parser normalizes them away).
1115///
1116/// Examples that PASS: `https://jmap.example.com`,
1117/// `https://jmap.example.com/`, `https://10.0.0.1:8008`,
1118/// `http://localhost:8080`.
1119///
1120/// Examples that FAIL: `""`, `https://example.com/api`,
1121/// `https://example.com?query=1`, `https://example.com#fragment`,
1122/// `ftp://example.com`.
1123///
1124/// Extracted from [`JmapClient::new`] so the constructor reads as
1125/// 'parse-don't-validate' rather than a six-step inline procedure
1126/// (bd:JMAP-6lsm.26).
1127fn parse_base_url(base_url: &str) -> Result<url::Url, ClientError> {
1128 if base_url.is_empty() {
1129 return Err(ClientError::InvalidArgument(
1130 "base_url may not be empty".into(),
1131 ));
1132 }
1133 let parsed = url::Url::parse(base_url)
1134 .map_err(|e| ClientError::InvalidArgument(format!("base_url is not a valid URL: {e}")))?;
1135 // Reject RFC 3986 user-info (`user:password@host`) before any further
1136 // validation. The url crate's Display impl echoes user-info verbatim
1137 // (lossless-round-trip, NOT safe-display — RFC 3986 §7.5 warns
1138 // against the latter), so an unrejected user-info value would surface
1139 // through every subsequent error message, every reqwest::Error's
1140 // url() field, and any tracing instrumentation that captures the
1141 // base URL or a derived ClientError. JMAP authenticates via the
1142 // Authorization header (AuthProvider trait), not URL user-info; the
1143 // user-info component has no legitimate use here (bd:JMAP-6r7c.58).
1144 //
1145 // The error message DOES NOT echo `base_url` back — doing so would
1146 // route the password into the error chain we are trying to keep it
1147 // out of.
1148 if !parsed.username().is_empty() || parsed.password().is_some() {
1149 return Err(ClientError::InvalidArgument(
1150 "base_url must not contain user-info (`user:password@host`); pass credentials via \
1151 AuthProvider instead"
1152 .into(),
1153 ));
1154 }
1155 let scheme = parsed.scheme();
1156 if scheme != "http" && scheme != "https" {
1157 return Err(ClientError::InvalidArgument(format!(
1158 "base_url scheme must be http or https, got: {scheme:?}"
1159 )));
1160 }
1161 let path = parsed.path();
1162 // url::Url::path() returns "/" for root-only URLs (no path segments);
1163 // any value other than "/" means the URL contains an explicit path component.
1164 if path != "/" {
1165 return Err(ClientError::InvalidArgument(format!(
1166 "base_url must not have a path component, got: {path:?}"
1167 )));
1168 }
1169 if parsed.query().is_some() {
1170 return Err(ClientError::InvalidArgument(
1171 "base_url must not have a query string".into(),
1172 ));
1173 }
1174 if parsed.fragment().is_some() {
1175 return Err(ClientError::InvalidArgument(
1176 "base_url must not have a fragment".into(),
1177 ));
1178 }
1179 Ok(parsed)
1180}
1181
1182/// Validate that `url` uses an http or https scheme.
1183///
1184/// Called at the top of each public method that accepts a URL parameter
1185/// (`call`, `subscribe_events`, `upload_blob`, `download_blob`). This is a
1186/// defense-in-depth check: the primary protection is
1187/// [`validate_session_url_schemes`] which rejects bad URLs in the Session
1188/// document at fetch time. This check makes each individual call site
1189/// self-defending against accidentally passing a non-http URL (e.g. from a
1190/// test fixture or a misused API).
1191///
1192/// Returns [`ClientError::InvalidArgument`] if the scheme is not http/https.
1193pub(crate) fn require_http_url(url: &str) -> Result<(), ClientError> {
1194 if !is_http_or_https(url) {
1195 return Err(ClientError::InvalidArgument(format!(
1196 "URL must have http or https scheme, got: {url:?}"
1197 )));
1198 }
1199 Ok(())
1200}
1201
1202/// Validate the *schemes only* of the four session URL fields.
1203///
1204/// Three of the four (`upload_url`, `download_url`, `event_source_url`) are
1205/// RFC 6570 URI templates carrying `{accountId}`, `{blobId}`, `{types}`,
1206/// etc. They cannot be fully parsed as URLs without first being expanded
1207/// with the relevant variables, but the scheme prefix is always concrete
1208/// (templates put the scheme on the left of `://`). This function does the
1209/// minimal check that the scheme prefix is `http://` or `https://` — it
1210/// does NOT verify that the templates carry the required variables, that
1211/// the host is well-formed, or that the path is reachable.
1212///
1213/// Renamed from `validate_session_urls` for accuracy (bd:JMAP-6lsm.23):
1214/// the name implied stronger validation than the function actually
1215/// performs.
1216fn validate_session_url_schemes(session: &Session) -> Result<(), ClientError> {
1217 // Mixed types (JmapUrl, JmapUrlTemplate) deliberately — both expose
1218 // .as_str() identically. Iterating &str values keeps the validation
1219 // loop oblivious to the typed-URL distinction (bd:JMAP-6r7c.40),
1220 // which is the right scope for a scheme check.
1221 for url in [
1222 session.api_url.as_str(),
1223 session.upload_url.as_str(),
1224 session.download_url.as_str(),
1225 session.event_source_url.as_str(),
1226 ] {
1227 if !is_http_or_https(url) {
1228 return Err(ClientError::InvalidSession(format!(
1229 "session URL has non-http/https scheme: {url:?}"
1230 )));
1231 }
1232 }
1233 Ok(())
1234}
1235
1236// ---------------------------------------------------------------------------
1237// Tests
1238// ---------------------------------------------------------------------------
1239
#[cfg(test)]
mod tests {
    use super::decode_utf8_chunk;

    /// Run a single `decode_utf8_chunk` pass over `input`, starting from an
    /// empty output buffer. Returns `(decoded text, bytes left in raw)`.
    fn decode_once(input: &[u8]) -> (String, Vec<u8>) {
        let mut raw = input.to_vec();
        let mut buf = String::new();
        decode_utf8_chunk(&mut raw, &mut buf);
        (buf, raw)
    }

    /// Oracle: pure ASCII input is moved to buf in full and raw ends up empty.
    #[test]
    fn decode_utf8_chunk_all_ascii() {
        let (buf, raw) = decode_once(b"hello");
        assert_eq!(buf, "hello");
        assert!(raw.is_empty());
    }

    /// Oracle: a complete multi-byte codepoint (the é in "café", U+00E9 =
    /// 0xC3 0xA9) is decoded in full.
    #[test]
    fn decode_utf8_chunk_complete_multibyte() {
        let (buf, raw) = decode_once("café".as_bytes());
        assert_eq!(buf, "café");
        assert!(raw.is_empty());
    }

    /// Oracle: the lead byte of a 2-byte sequence (U+00E9 = 0xC3 0xA9) arrives
    /// by itself. That is the incomplete case (error_len == None), so the
    /// byte must be KEPT in raw rather than dropped.
    /// Regression test for the bug where valid_up_to.max(1) discarded it.
    #[test]
    fn decode_utf8_chunk_incomplete_head_retained() {
        // Lead byte of a 2-byte sequence.
        let (buf, raw) = decode_once(&[0xC3u8]);
        assert_eq!(buf, "", "no complete codepoints to push");
        assert_eq!(raw, vec![0xC3u8], "incomplete head must stay in raw");
    }

    /// Oracle: an ASCII prefix followed by the lead byte of a 2-byte sequence.
    /// The prefix reaches buf; the incomplete head remains in raw.
    #[test]
    fn decode_utf8_chunk_prefix_then_incomplete_head() {
        let (buf, raw) = decode_once(&[b'a', b'b', 0xC3u8]);
        assert_eq!(buf, "ab");
        assert_eq!(raw, vec![0xC3u8], "incomplete head must stay in raw");
    }

    /// Oracle: two consecutive calls — the sequence left incomplete by the
    /// first is completed by the second. This is precisely the HTTP
    /// chunk-split situation the fix was written for.
    #[test]
    fn decode_utf8_chunk_split_sequence_completed() {
        let mut buf = String::new();

        // Chunk 1 carries only the lead byte of U+00E9 (é = 0xC3 0xA9).
        let mut raw = vec![0xC3u8];
        decode_utf8_chunk(&mut raw, &mut buf);
        assert_eq!(raw, vec![0xC3u8], "incomplete head retained after chunk 1");

        // Chunk 2 delivers the continuation byte.
        raw.push(0xA9u8);
        decode_utf8_chunk(&mut raw, &mut buf);
        assert_eq!(buf, "é", "character fully decoded after chunk 2");
        assert!(raw.is_empty());
    }

    /// Oracle: a definitively invalid byte (0xFF never occurs in UTF-8) is
    /// removed from raw.
    #[test]
    fn decode_utf8_chunk_invalid_byte_drained() {
        let (buf, raw) = decode_once(&[0xFFu8]);
        assert_eq!(buf, "");
        assert!(raw.is_empty(), "definitively invalid byte must be drained");
    }

    /// Oracle: a valid prefix followed by a definitively invalid byte — the
    /// prefix is decoded and both are removed from raw.
    #[test]
    fn decode_utf8_chunk_prefix_then_invalid_drained() {
        let (buf, raw) = decode_once(&[b'a', b'b', 0xFFu8]);
        assert_eq!(buf, "ab");
        assert!(raw.is_empty(), "prefix and invalid byte must be drained");
    }

    /// Compile-time proof that [`JmapClient`] is `Send + Sync + Clone`
    /// (bd:JMAP-6r7c.25). Should a later refactor introduce a non-`Sync`
    /// field (e.g. `Rc<T>`, `RefCell<T>`, `Cell<T>`), this test stops
    /// compiling in CI before any downstream consumer sharing a `JmapClient`
    /// across tokio tasks is affected. The helper has an empty body; all
    /// that matters is that its trait bounds are satisfied for `JmapClient`.
    #[test]
    fn jmap_client_is_send_sync_clone() {
        fn assert_send_sync_clone<T: Send + Sync + Clone>() {}
        assert_send_sync_clone::<super::JmapClient>();
    }
}