Skip to main content

jmap_base_client/
client.rs

//! Auth-agnostic base JMAP HTTP client (RFC 8620).
//!
//! Provides [`JmapClient`] for session fetch, API calls, blob transfer,
//! SSE event streaming, and [`extract_response`] for parsing method results.
5
6use std::sync::Arc;
7
8use futures::StreamExt;
9
10use crate::auth::{AuthProvider, DefaultTransport, TransportConfig};
11use crate::error::ClientError;
12use crate::request::Session;
13use crate::sse::{parse_sse_block, SseFrame};
14
/// Per-connection parser state carried from one iteration of the
/// `futures::stream::unfold` loop in `JmapClient::subscribe_events` to the next.
struct SseStreamState<S> {
    /// The HTTP body stream that frames are read from.
    stream: S,
    /// Bytes received from `stream` but not yet decoded into `buf`.
    ///
    /// A multi-byte UTF-8 codepoint may be split across two HTTP chunks; its
    /// leading bytes wait here until the following chunk supplies the rest,
    /// so such a split does not terminate the stream.
    raw_buf: Vec<u8>,
    /// Decoded text that has not yet been emitted as a frame.
    buf: String,
    /// Byte offset in `buf` where the next delimiter search begins.
    ///
    /// Invariant: always a UTF-8 character boundary of `buf`, because it is
    /// used to slice `buf`.
    scan_from: usize,
}
28
/// Per-client limits: how long an HTTP exchange may take, and how large a
/// response body or SSE frame may grow.
///
/// [`ClientConfig::default()`] yields the production values (a 30 s request
/// timeout and conservative body caps); each field documents its default.
///
/// The struct is `#[non_exhaustive]`, so code outside this crate must build it
/// with struct-update syntax (`..ClientConfig::default()`). That lets new
/// fields be added in a minor release without breaking existing callers.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ClientConfig {
    /// Deadline for a complete request/response exchange in `fetch_session`,
    /// `call`, `upload_blob`, and `download_blob`.
    ///
    /// Not applied to SSE or WebSocket streams, which stay open indefinitely.
    /// Must be non-zero, e.g. `Duration::from_secs(30)`. Default: 30 seconds.
    pub request_timeout: std::time::Duration,
    /// Largest accepted `fetch_session` response body, in bytes. Default: 1 MiB.
    pub max_session_body: u64,
    /// Largest accepted `call()` response body, in bytes. Default: 8 MiB.
    pub max_call_body: u64,
    /// Largest accepted `download_blob()` body, in bytes. Default: 64 MiB.
    pub max_download_body: u64,
    /// Largest `upload_blob()` response body that will be parsed, in bytes.
    /// Default: 1 MiB.
    pub max_upload_body: u64,
    /// Largest single SSE frame, in bytes.
    ///
    /// Bounds memory use when a hostile or misbehaving server sends one
    /// enormous frame. Must be non-zero. Default: 1 MiB.
    pub max_sse_frame: usize,
}
57
58impl Default for ClientConfig {
59    fn default() -> Self {
60        ClientConfig {
61            request_timeout: std::time::Duration::from_secs(30),
62            max_session_body: 1024 * 1024,
63            max_call_body: 8 * 1024 * 1024,
64            max_download_body: 64 * 1024 * 1024,
65            max_upload_body: 1024 * 1024,
66            max_sse_frame: 1024 * 1024,
67        }
68    }
69}
70
71impl ClientConfig {
72    /// Validate that all config fields satisfy their constraints.
73    ///
74    /// Called automatically by [`JmapClient::new`].  Callers may also call
75    /// this directly to pre-validate a config before passing it to the
76    /// constructor.
77    ///
78    /// # Errors
79    ///
80    /// Returns [`ClientError::InvalidArgument`] when any field is zero or
81    /// out-of-range.
82    pub fn validate(&self) -> Result<(), ClientError> {
83        if self.max_session_body == 0 {
84            return Err(ClientError::InvalidArgument(
85                "ClientConfig.max_session_body must be > 0".into(),
86            ));
87        }
88        if self.max_call_body == 0 {
89            return Err(ClientError::InvalidArgument(
90                "ClientConfig.max_call_body must be > 0".into(),
91            ));
92        }
93        if self.max_download_body == 0 {
94            return Err(ClientError::InvalidArgument(
95                "ClientConfig.max_download_body must be > 0".into(),
96            ));
97        }
98        if self.max_upload_body == 0 {
99            return Err(ClientError::InvalidArgument(
100                "ClientConfig.max_upload_body must be > 0".into(),
101            ));
102        }
103        if self.request_timeout == std::time::Duration::ZERO {
104            return Err(ClientError::InvalidArgument(
105                "ClientConfig.request_timeout must be > 0; use Duration::from_secs(30) or similar"
106                    .into(),
107            ));
108        }
109        if self.max_sse_frame == 0 {
110            return Err(ClientError::InvalidArgument(
111                "ClientConfig.max_sse_frame must be > 0".into(),
112            ));
113        }
114        Ok(())
115    }
116}
117
118/// Auth-agnostic JMAP base HTTP client.
119///
120/// Construct with [`JmapClient::new`] or [`JmapClient::new_plain`].
121/// Extension-specific clients (`jmap-chat-client`, `jmap-mail-client`) depend
122/// on this crate and add their method implementations via `impl JmapClient`.
123#[derive(Clone)]
124pub struct JmapClient {
125    pub(crate) base_url: url::Url,
126    pub(crate) auth: Arc<dyn AuthProvider>,
127    pub(crate) http: reqwest::Client,
128    pub(crate) config: ClientConfig,
129}
130
131impl std::fmt::Debug for JmapClient {
132    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
133        f.debug_struct("JmapClient")
134            .field("base_url", &self.base_url)
135            .field("config", &self.config)
136            .finish_non_exhaustive()
137    }
138}
139
140impl JmapClient {
141    /// Create a new client.
142    ///
143    /// `transport` configures the underlying HTTP client (TLS trust roots,
144    /// client certificates, timeouts). `auth` injects per-request credentials
145    /// (Bearer token, Basic credentials, or none). The two are independent so
146    /// any transport can be paired with any credential scheme — for example,
147    /// `CustomCaTransport` with `BearerAuth`. `base_url` must be the server
148    /// origin (scheme, host, optional port) with no path, query, or fragment
149    /// — e.g. `"https://100.64.1.1:8008"`. Trailing slashes are normalized
150    /// away by the URL parser and are therefore accepted.
151    pub fn new(
152        transport: impl TransportConfig,
153        auth: impl AuthProvider + 'static,
154        base_url: &str,
155        config: ClientConfig,
156    ) -> Result<Self, ClientError> {
157        if base_url.is_empty() {
158            return Err(ClientError::InvalidArgument(
159                "base_url may not be empty".into(),
160            ));
161        }
162        let parsed = url::Url::parse(base_url).map_err(|e| {
163            ClientError::InvalidArgument(format!("base_url is not a valid URL: {e}"))
164        })?;
165        let scheme = parsed.scheme();
166        if scheme != "http" && scheme != "https" {
167            return Err(ClientError::InvalidArgument(format!(
168                "base_url scheme must be http or https, got: {scheme:?}"
169            )));
170        }
171        let path = parsed.path();
172        // url::Url::path() returns "/" for root-only URLs (no path segments);
173        // any value other than "/" means the URL contains an explicit path component.
174        if path != "/" {
175            return Err(ClientError::InvalidArgument(format!(
176                "base_url must not have a path component, got: {path:?}"
177            )));
178        }
179        if parsed.query().is_some() {
180            return Err(ClientError::InvalidArgument(
181                "base_url must not have a query string".into(),
182            ));
183        }
184        if parsed.fragment().is_some() {
185            return Err(ClientError::InvalidArgument(
186                "base_url must not have a fragment".into(),
187            ));
188        }
189        config.validate()?;
190        let http = transport.build_client()?;
191        Ok(Self {
192            base_url: parsed,
193            auth: Arc::new(auth),
194            http,
195            config,
196        })
197    }
198
199    /// Convenience constructor for servers with publicly-trusted TLS.
200    ///
201    /// Equivalent to `JmapClient::new(DefaultTransport, auth, base_url, config)`.
202    /// Use [`JmapClient::new`] when you need a custom transport (e.g.
203    /// `CustomCaTransport` for a private-CA server).
204    pub fn new_plain(
205        auth: impl AuthProvider + 'static,
206        base_url: &str,
207        config: ClientConfig,
208    ) -> Result<Self, ClientError> {
209        Self::new(DefaultTransport, auth, base_url, config)
210    }
211
212    /// Apply the auth header (if any) to a request builder.
213    ///
214    /// Centralises the repeated `if let Some(...) = self.auth.auth_header()` pattern
215    /// that every HTTP method uses. Callers: `fetch_session`, `call`,
216    /// `subscribe_events`, `upload_blob`, `download_blob`.
217    pub(crate) fn inject_auth(&self, builder: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
218        if let Some((name, value)) = self.auth.auth_header() {
219            builder.header(name, value)
220        } else {
221            builder
222        }
223    }
224
225    /// Returns `Err(ClientError::AuthFailed)` when the HTTP status indicates an
226    /// authentication or authorization failure.
227    ///
228    /// Specifically handles:
229    /// - 401 Unauthorized (RFC 7235 §3.1) — missing or invalid credentials
230    /// - 403 Forbidden (RFC 7235 §3.2) — credentials present but insufficient
231    ///
232    /// Called before reading the response body so callers can distinguish
233    /// permanent auth failures from transient errors without consuming the body.
234    pub(crate) fn check_auth_status(status: reqwest::StatusCode) -> Result<(), ClientError> {
235        if status == reqwest::StatusCode::UNAUTHORIZED || status == reqwest::StatusCode::FORBIDDEN {
236            Err(ClientError::AuthFailed(status.as_u16()))
237        } else {
238            Ok(())
239        }
240    }
241
242    /// Fetch the JMAP Session object from `{base_url}/.well-known/jmap` (RFC 8620 §2).
243    ///
244    /// The response body is capped at 1 MiB. Returns `ClientError::ResponseTooLarge`
245    /// if the server sends more. Session URL fields (`apiUrl`, `uploadUrl`,
246    /// `downloadUrl`, `eventSourceUrl`) are validated to have http/https scheme;
247    /// a non-http scheme returns `ClientError::InvalidSession`.
248    ///
249    /// Returns `ClientError::AuthFailed` on HTTP 401 or 403.
250    pub async fn fetch_session(&self) -> Result<Session, ClientError> {
251        let limit = self.config.max_session_body;
252        let url = self.base_url.join(".well-known/jmap").map_err(|e| {
253            // base_url was pre-validated in JmapClient::new; this path is
254            // unreachable in practice but must be handled for completeness.
255            ClientError::InvalidSession(format!("cannot construct session URL: {e}"))
256        })?;
257
258        let req = self.inject_auth(self.http.get(url).timeout(self.config.request_timeout));
259
260        let resp = {
261            let raw_resp = req.send().await.map_err(ClientError::Http)?;
262            Self::check_auth_status(raw_resp.status())?;
263            raw_resp.error_for_status().map_err(ClientError::Http)?
264        };
265
266        // Enforce size cap before reading. Content-Length can lie, so we check
267        // both the header and the actual read size.
268        if let Some(len) = resp.content_length() {
269            if len > limit {
270                return Err(ClientError::ResponseTooLarge { actual: len, limit });
271            }
272        }
273        let bytes = resp.bytes().await.map_err(ClientError::Http)?;
274        if bytes.len() as u64 > limit {
275            return Err(ClientError::ResponseTooLarge {
276                actual: bytes.len() as u64,
277                limit,
278            });
279        }
280
281        let session: Session = serde_json::from_slice(&bytes).map_err(ClientError::Parse)?;
282
283        validate_session_urls(&session)?;
284
285        Ok(session)
286    }
287
288    /// POST a [`jmap_types::JmapRequest`] to `api_url` and return the parsed [`jmap_types::JmapResponse`]
289    /// (RFC 8620 §3.3).
290    ///
291    /// `api_url` is taken as an explicit parameter (not from `self`) because the
292    /// caller holds a [`Session`] and selects the correct URL from it.
293    ///
294    /// The response body is capped at 8 MiB. Returns `ClientError::ResponseTooLarge`
295    /// if the server sends more.
296    ///
297    /// Returns `ClientError::AuthFailed` on HTTP 401 or 403.
298    pub async fn call(
299        &self,
300        api_url: &str,
301        req: &jmap_types::JmapRequest,
302    ) -> Result<jmap_types::JmapResponse, ClientError> {
303        require_http_url(api_url)?;
304        let limit = self.config.max_call_body;
305
306        let builder = self.inject_auth(
307            self.http
308                .post(api_url)
309                .json(req)
310                .timeout(self.config.request_timeout),
311        );
312
313        let resp = {
314            let raw_resp = builder.send().await.map_err(ClientError::Http)?;
315            Self::check_auth_status(raw_resp.status())?;
316            raw_resp.error_for_status().map_err(ClientError::Http)?
317        };
318
319        // Enforce size cap before reading.
320        if let Some(len) = resp.content_length() {
321            if len > limit {
322                return Err(ClientError::ResponseTooLarge { actual: len, limit });
323            }
324        }
325        let bytes = resp.bytes().await.map_err(ClientError::Http)?;
326        if bytes.len() as u64 > limit {
327            return Err(ClientError::ResponseTooLarge {
328                actual: bytes.len() as u64,
329                limit,
330            });
331        }
332
333        let jmap_resp: jmap_types::JmapResponse =
334            serde_json::from_slice(&bytes).map_err(ClientError::Parse)?;
335
336        Ok(jmap_resp)
337    }
338
339    /// Open an SSE connection to `event_source_url` and return an async stream
340    /// of parsed [`SseFrame`]s (RFC 8620 §7.3).
341    ///
342    /// # URI template expansion
343    ///
344    /// `Session.event_source_url` is a URI template (RFC 6570 Level-1) with
345    /// variables `types`, `closeafter`, and `ping`. You **must** expand it
346    /// before passing it to this function, or the server will receive the
347    /// literal text `{types}` in the URL and return an error. Use
348    /// [`expand_url_template`](crate::expand_url_template):
349    ///
350    /// ```rust,ignore
351    /// let url = jmap_base_client::expand_url_template(
352    ///     &session.event_source_url,
353    ///     &[("types", "*"), ("closeafter", "no"), ("ping", "0")],
354    /// )?;
355    /// let stream = client.subscribe_events(&url, None).await?;
356    /// ```
357    ///
358    /// If `last_event_id` is `Some`, sends a `Last-Event-ID` header so the
359    /// server can resume from where the previous stream left off.
360    ///
361    /// Buffer growth is capped at [`ClientConfig::max_sse_frame`] bytes per
362    /// frame (default: 1 MiB). If a single SSE frame exceeds this limit the
363    /// stream yields `ClientError::SseFrameTooLarge` and terminates.
364    ///
365    /// No timeout is applied to this call or to the resulting stream.  The
366    /// connect timeout (10 s, TCP only) is the only deadline enforced.  If the
367    /// server stalls before sending HTTP response headers, or later goes silent
368    /// on the open connection, this call or the stream will hang indefinitely.
369    /// Wrap the entire call and/or stream iteration in [`tokio::time::timeout`]
370    /// if you need to bound either phase.
371    ///
372    /// Returns `ClientError::AuthFailed` on HTTP 401 or 403 before the stream
373    /// starts.
374    pub async fn subscribe_events(
375        &self,
376        event_source_url: &str,
377        last_event_id: Option<&str>,
378    ) -> Result<futures::stream::BoxStream<'static, Result<SseFrame, ClientError>>, ClientError>
379    {
380        require_http_url(event_source_url)?;
381        let mut req = self
382            .http
383            .get(event_source_url)
384            .header("Accept", "text/event-stream");
385        if let Some(id) = last_event_id {
386            req = req.header("Last-Event-ID", id);
387        }
388        let req = self.inject_auth(req);
389
390        let resp = req.send().await.map_err(ClientError::Http)?;
391        Self::check_auth_status(resp.status())?;
392        let resp = resp.error_for_status().map_err(ClientError::Http)?;
393
394        // Verify Content-Type before streaming. A misconfigured server returning
395        // application/json would silently produce no events (no SSE delimiter found).
396        {
397            // to_ascii_lowercase(): media types are case-insensitive per RFC 7231 §3.1.1.1.
398            let ct = resp
399                .headers()
400                .get(reqwest::header::CONTENT_TYPE)
401                .and_then(|v| v.to_str().ok())
402                .unwrap_or("")
403                .to_ascii_lowercase();
404            if !ct.starts_with("text/event-stream") {
405                return Err(ClientError::UnexpectedResponse(format!(
406                    "subscribe_events: expected Content-Type text/event-stream, got: {ct:?}"
407                )));
408            }
409        }
410
411        let byte_stream = resp.bytes_stream();
412        let sse_frame_limit = self.config.max_sse_frame;
413
414        Ok(futures::stream::unfold(
415            Some(SseStreamState {
416                stream: byte_stream,
417                raw_buf: Vec::new(),
418                buf: String::new(),
419                scan_from: 0, // invariant: valid UTF-8 char boundary of buf; 0 always satisfies this
420            }),
421            move |state| async move {
422                let SseStreamState {
423                    mut stream,
424                    mut raw_buf,
425                    mut buf,
426                    mut scan_from,
427                } = state?;
428                loop {
429                    // Search for any double-newline delimiter (LF/CRLF/CR variants).
430                    // scan_from is set to old_len.saturating_sub(3) after each append
431                    // so we only re-scan the overlap region.  3 bytes back is the
432                    // minimum that covers all delimiter prefixes that can straddle a
433                    // chunk boundary:
434                    //   - `\r\n\r\n` (4 bytes): longest prefix that fits in one chunk
435                    //     but is incomplete is `\r\n\r` (3 bytes) — exactly covered.
436                    //   - `\n\r\n` (3 bytes): longest incomplete prefix is `\n\r` (2
437                    //     bytes) — covered by the 3-byte overlap.
438                    //   - `\n\n` and `\r\r` (2 bytes each): longest incomplete prefix
439                    //     is 1 byte — covered by the 3-byte overlap.
440                    // Since \r and \n are single-byte UTF-8 codepoints, 3 bytes back
441                    // is always a valid char boundary — no adjustment needed.
442                    let frame_end = [
443                        buf[scan_from..]
444                            .find("\r\n\r\n")
445                            .map(|p| (scan_from + p, 4usize)),
446                        buf[scan_from..]
447                            .find("\n\n")
448                            .map(|p| (scan_from + p, 2usize)),
449                        buf[scan_from..]
450                            .find("\r\r")
451                            .map(|p| (scan_from + p, 2usize)),
452                        // Mixed: LF-terminated last field line followed by a
453                        // CRLF-terminated blank line.  Not detected by \n\n (the
454                        // LFs are separated by a CR) or \r\n\r\n (no leading \r\n).
455                        buf[scan_from..]
456                            .find("\n\r\n")
457                            .map(|p| (scan_from + p, 3usize)),
458                    ]
459                    .into_iter()
460                    .flatten()
461                    .min_by_key(|&(pos, _)| pos);
462
463                    if let Some((pos, delim_len)) = frame_end {
464                        let frame = {
465                            let slice = &buf[..pos];
466                            if slice.contains('\r') {
467                                slice.replace("\r\n", "\n").replace('\r', "\n")
468                            } else {
469                                slice.to_owned()
470                            }
471                        };
472                        buf.drain(..pos + delim_len);
473                        scan_from = 0; // 0 satisfies the UTF-8 char boundary invariant
474                        let sse_frame = parse_sse_block(&frame);
475                        return Some((
476                            Ok(sse_frame),
477                            Some(SseStreamState {
478                                stream,
479                                raw_buf,
480                                buf,
481                                scan_from,
482                            }),
483                        ));
484                    }
485
486                    match stream.next().await {
487                        None => return None,
488                        Some(Err(e)) => {
489                            return Some((Err(ClientError::Http(e)), None));
490                        }
491                        Some(Ok(bytes)) => {
492                            // Accumulate raw bytes first. A multi-byte UTF-8 codepoint
493                            // may be split across adjacent HTTP chunks; decode only the
494                            // valid prefix and leave the remainder in raw_buf until the
495                            // next chunk completes the sequence.
496                            raw_buf.extend_from_slice(&bytes);
497                            // Cap raw_buf to prevent OOM on persistent invalid UTF-8 input.
498                            // Use the same limit as the decoded buf cap.
499                            if raw_buf.len() > sse_frame_limit {
500                                return Some((
501                                    Err(ClientError::SseFrameTooLarge {
502                                        limit: sse_frame_limit,
503                                    }),
504                                    None,
505                                ));
506                            }
507                            let old_len = buf.len();
508                            decode_utf8_chunk(&mut raw_buf, &mut buf);
509                            scan_from = old_len.saturating_sub(3);
510                            // Walk backward to a valid UTF-8 char boundary so that
511                            // buf[scan_from..] never panics on multibyte characters.
512                            while scan_from > 0 && !buf.is_char_boundary(scan_from) {
513                                scan_from -= 1;
514                            }
515                            // Guard against unbounded buffer growth from a hostile server.
516                            // Yield the error and terminate (state = None).
517                            if buf.len() > sse_frame_limit {
518                                return Some((
519                                    Err(ClientError::SseFrameTooLarge {
520                                        limit: sse_frame_limit,
521                                    }),
522                                    None,
523                                ));
524                            }
525                        }
526                    }
527                }
528            },
529        )
530        .boxed())
531    }
532}
533
534/// Find the method response matching `call_id` in `resp` and deserialize its
535/// arguments into `T`.
536///
537/// Returns [`ClientError::MethodNotFound`] if no invocation with the given
538/// call_id exists. Returns [`ClientError::MethodError`] if the matched
539/// invocation is a JMAP `"error"` response (RFC 8620 §3.6.1).
540///
541/// This function is `pub` so extension crates (`jmap-chat-client`,
542/// `jmap-mail-client`) can use it to extract typed results from a
543/// [`jmap_types::JmapResponse`] without depending on internal details.
544pub fn extract_response<T: serde::de::DeserializeOwned>(
545    resp: &jmap_types::JmapResponse,
546    call_id: &str,
547) -> Result<T, ClientError> {
548    // Invocation is a type alias (String, Value, String) = (method, args, call_id)
549    let inv = resp
550        .method_responses
551        .iter()
552        .find(|inv| inv.2 == call_id)
553        .ok_or_else(|| ClientError::MethodNotFound(call_id.to_owned()))?;
554    let (method_name, args, _) = inv;
555
556    // RFC 8620 §3.6.1: a method name of "error" signals a protocol-level error.
557    if method_name == "error" {
558        let err_type = args
559            .get("type")
560            .and_then(|v| v.as_str())
561            .unwrap_or("serverError") // safe: fallback literal, not user input
562            .to_owned();
563        let description = args
564            .get("description")
565            .and_then(|v| v.as_str())
566            .map(str::to_owned);
567        return Err(ClientError::MethodError {
568            error_type: err_type,
569            description,
570        });
571    }
572
573    <T as serde::Deserialize>::deserialize(args).map_err(ClientError::Parse)
574}
575
/// Decode as much valid UTF-8 as possible from `raw` into `buf`, draining
/// consumed (or definitively-invalid) bytes from `raw`.
///
/// `raw` is processed from the front until it is exhausted or only an
/// incomplete trailing sequence remains:
/// - valid runs are appended to `buf`;
/// - each **definitively invalid** byte sequence (`error_len == Some(n)`) is
///   dropped, and decoding resumes right after it. Valid text that follows an
///   invalid byte is therefore not held back waiting for another chunk;
/// - an **incomplete** multi-byte sequence at the end (`error_len == None`)
///   stays in `raw`, so the next chunk can complete it.
///
/// On return `raw` is either empty or holds only the prefix of one multi-byte
/// codepoint (at most 3 bytes), so repeated calls cannot make it grow.
fn decode_utf8_chunk(raw: &mut Vec<u8>, buf: &mut String) {
    // Number of leading bytes of `raw` already decoded or discarded.
    let mut consumed = 0;
    while consumed < raw.len() {
        match std::str::from_utf8(&raw[consumed..]) {
            Ok(valid) => {
                buf.push_str(valid);
                consumed = raw.len();
            }
            Err(e) => {
                let valid_up_to = e.valid_up_to();
                // valid_up_to is always a char boundary by definition.
                buf.push_str(
                    std::str::from_utf8(&raw[consumed..consumed + valid_up_to])
                        .expect("valid_up_to is a valid UTF-8 boundary"),
                );
                match e.error_len() {
                    // Definitively invalid: skip it and keep decoding what follows.
                    Some(n) => consumed += valid_up_to + n,
                    // Truncated sequence at the end: wait for more bytes.
                    None => {
                        consumed += valid_up_to;
                        break;
                    }
                }
            }
        }
    }
    raw.drain(..consumed);
}
622
/// Return the scheme of `url`, lowercased (schemes are case-insensitive per
/// RFC 3986 §3.1).
///
/// The scheme is everything before the first `"://"`. If `"://"` does not
/// occur, the result is an empty string. Only a prefix split is performed, not
/// a full URL parse, so URL templates containing `{variable}` placeholders
/// are handled as well.
fn url_scheme(url: &str) -> String {
    match url.split_once("://") {
        Some((scheme, _rest)) => scheme.to_ascii_lowercase(),
        None => String::new(),
    }
}
634
635/// Validate that `url` uses an http or https scheme.
636///
637/// Called at the top of each public method that accepts a URL parameter
638/// (`call`, `subscribe_events`, `upload_blob`, `download_blob`).  This is a
639/// defense-in-depth check: the primary protection is [`validate_session_urls`]
640/// which rejects bad URLs in the Session document at fetch time.  This check
641/// makes each individual call site self-defending against accidentally passing
642/// a non-http URL (e.g. from a test fixture or a misused API).
643///
644/// Returns [`ClientError::InvalidArgument`] if the scheme is not http/https.
645pub(crate) fn require_http_url(url: &str) -> Result<(), ClientError> {
646    let scheme = url_scheme(url);
647    if scheme != "http" && scheme != "https" {
648        return Err(ClientError::InvalidArgument(format!(
649            "URL must have http or https scheme, got: {url:?}"
650        )));
651    }
652    Ok(())
653}
654
655/// Validate that all URL fields in `session` use an http or https scheme.
656///
657/// Returns `ClientError::InvalidSession` if any URL has a non-http/https scheme.
658/// This prevents a malicious server from injecting non-HTTP URLs into subsequent
659/// requests (e.g. `file://`, `ftp://`).
660///
661/// Scheme comparison is case-insensitive per RFC 3986 §3.1: both `http://` and
662/// `HTTP://` are accepted.  Session URL templates may contain `{variable}`
663/// syntax that prevents full URL parsing, so only the scheme prefix is checked.
664fn validate_session_urls(session: &Session) -> Result<(), ClientError> {
665    for url in [
666        &session.api_url,
667        &session.upload_url,
668        &session.download_url,
669        &session.event_source_url,
670    ] {
671        let scheme = url_scheme(url);
672        if scheme != "http" && scheme != "https" {
673            return Err(ClientError::InvalidSession(format!(
674                "session URL has non-http/https scheme: {:?}",
675                url
676            )));
677        }
678    }
679    Ok(())
680}
681
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
685
#[cfg(test)]
mod tests {
    use super::decode_utf8_chunk;

    /// Run one decode pass over `bytes` into an empty output buffer.
    ///
    /// Returns `(decoded text, bytes left in raw)`.
    fn decode_once(bytes: &[u8]) -> (String, Vec<u8>) {
        let mut raw = bytes.to_vec();
        let mut text = String::new();
        decode_utf8_chunk(&mut raw, &mut text);
        (text, raw)
    }

    /// Pure ASCII decodes completely and leaves nothing behind.
    #[test]
    fn decode_utf8_chunk_all_ascii() {
        let (text, rest) = decode_once(b"hello");
        assert_eq!(text, "hello");
        assert!(rest.is_empty());
    }

    /// A complete two-byte codepoint (U+00E9, 0xC3 0xA9) is decoded in full.
    #[test]
    fn decode_utf8_chunk_complete_multibyte() {
        let (text, rest) = decode_once("café".as_bytes());
        assert_eq!(text, "café");
        assert!(rest.is_empty());
    }

    /// A lone lead byte (0xC3) is incomplete (error_len == None) and must be
    /// kept, not dropped. Regression test for `valid_up_to.max(1)` discarding it.
    #[test]
    fn decode_utf8_chunk_incomplete_head_retained() {
        let (text, rest) = decode_once(&[0xC3]);
        assert_eq!(text, "", "no complete codepoints to push");
        assert_eq!(rest, vec![0xC3u8], "incomplete head must stay in raw");
    }

    /// An ASCII prefix is decoded while the trailing incomplete lead byte waits.
    #[test]
    fn decode_utf8_chunk_prefix_then_incomplete_head() {
        let (text, rest) = decode_once(&[b'a', b'b', 0xC3]);
        assert_eq!(text, "ab");
        assert_eq!(rest, vec![0xC3u8], "incomplete head must stay in raw");
    }

    /// A codepoint split across two HTTP chunks is reassembled on the second
    /// call — the exact scenario the retention logic exists for.
    #[test]
    fn decode_utf8_chunk_split_sequence_completed() {
        // Chunk 1: only the lead byte of U+00E9 (é = 0xC3 0xA9).
        let (mut text, mut rest) = decode_once(&[0xC3]);
        assert_eq!(rest, vec![0xC3u8], "incomplete head retained after chunk 1");

        // Chunk 2: the continuation byte.
        rest.push(0xA9);
        decode_utf8_chunk(&mut rest, &mut text);
        assert_eq!(text, "é", "character fully decoded after chunk 2");
        assert!(rest.is_empty());
    }

    /// 0xFF never appears in valid UTF-8, so it is discarded outright.
    #[test]
    fn decode_utf8_chunk_invalid_byte_drained() {
        let (text, rest) = decode_once(&[0xFF]);
        assert_eq!(text, "");
        assert!(rest.is_empty(), "definitively invalid byte must be drained");
    }

    /// A valid prefix is kept while the invalid byte after it is discarded.
    #[test]
    fn decode_utf8_chunk_prefix_then_invalid_drained() {
        let (text, rest) = decode_once(&[b'a', b'b', 0xFF]);
        assert_eq!(text, "ab");
        assert!(rest.is_empty(), "prefix and invalid byte must be drained");
    }
}