jmap_base_client/client.rs
1//! Auth-agnostic base JMAP HTTP client (RFC 8620).
2//!
3//! Provides [`JmapClient`] for session fetch, API calls, blob transfer,
4//! SSE event streaming, and [`extract_response`] for parsing method results.
5
6use std::sync::Arc;
7
8use futures::StreamExt;
9
10use crate::auth::{AuthProvider, DefaultTransport, TransportConfig};
11use crate::error::ClientError;
12use crate::request::Session;
13use crate::sse::{parse_sse_block, SseFrame};
14
/// Per-poll state of the stream returned by `JmapClient::subscribe_events`.
///
/// Data moves `stream` → `raw_buf` (undecoded bytes) → `buf` (decoded text)
/// → parsed SSE frames.
struct SseStreamState<S> {
    /// Source of raw HTTP body chunks.
    stream: S,
    /// Bytes received but not yet decoded. When a multi-byte UTF-8 codepoint
    /// is split across two HTTP chunks, its leading bytes wait here for the
    /// continuation bytes instead of terminating the stream.
    raw_buf: Vec<u8>,
    /// Decoded text that has not yet been emitted as a frame.
    buf: String,
    /// Byte offset in `buf` where the next delimiter search starts. It is used
    /// as a slice index, so it must always be a UTF-8 char boundary of `buf`.
    scan_from: usize,
}
28
/// Timeouts and response-size limits applied by a [`JmapClient`].
///
/// Start from [`ClientConfig::default()`] for production values (30 s
/// request timeout and conservative body caps), then override individual
/// fields as needed.
///
/// The struct is `#[non_exhaustive]`, so code outside this crate must build it
/// with `..ClientConfig::default()`. This lets new fields be added in minor
/// releases without breaking callers.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ClientConfig {
    /// Deadline for a complete request/response cycle in `fetch_session`,
    /// `call`, `upload_blob` and `download_blob`. SSE and WebSocket streams
    /// are long-lived and are not subject to it. Must be non-zero, e.g.
    /// `Duration::from_secs(30)`. Default: 30 seconds.
    pub request_timeout: std::time::Duration,
    /// Largest accepted `fetch_session` response body, in bytes. Default: 1 MiB.
    pub max_session_body: u64,
    /// Largest accepted `call()` response body, in bytes. Default: 8 MiB.
    pub max_call_body: u64,
    /// Largest accepted `download_blob()` response body, in bytes. Default: 64 MiB.
    pub max_download_body: u64,
    /// Largest accepted `upload_blob()` response body (the JSON reply, not the
    /// uploaded data), in bytes. Default: 1 MiB.
    pub max_upload_body: u64,
    /// Largest single SSE frame, counted both as raw bytes and as decoded text.
    /// Bounds memory use when a hostile or broken server sends one huge frame.
    /// Must be non-zero. Default: 1 MiB.
    pub max_sse_frame: usize,
}
57
58impl Default for ClientConfig {
59 fn default() -> Self {
60 ClientConfig {
61 request_timeout: std::time::Duration::from_secs(30),
62 max_session_body: 1024 * 1024,
63 max_call_body: 8 * 1024 * 1024,
64 max_download_body: 64 * 1024 * 1024,
65 max_upload_body: 1024 * 1024,
66 max_sse_frame: 1024 * 1024,
67 }
68 }
69}
70
71impl ClientConfig {
72 /// Validate that all config fields satisfy their constraints.
73 ///
74 /// Called automatically by [`JmapClient::new`]. Callers may also call
75 /// this directly to pre-validate a config before passing it to the
76 /// constructor.
77 ///
78 /// # Errors
79 ///
80 /// Returns [`ClientError::InvalidArgument`] when any field is zero or
81 /// out-of-range.
82 pub fn validate(&self) -> Result<(), ClientError> {
83 if self.max_session_body == 0 {
84 return Err(ClientError::InvalidArgument(
85 "ClientConfig.max_session_body must be > 0".into(),
86 ));
87 }
88 if self.max_call_body == 0 {
89 return Err(ClientError::InvalidArgument(
90 "ClientConfig.max_call_body must be > 0".into(),
91 ));
92 }
93 if self.max_download_body == 0 {
94 return Err(ClientError::InvalidArgument(
95 "ClientConfig.max_download_body must be > 0".into(),
96 ));
97 }
98 if self.max_upload_body == 0 {
99 return Err(ClientError::InvalidArgument(
100 "ClientConfig.max_upload_body must be > 0".into(),
101 ));
102 }
103 if self.request_timeout == std::time::Duration::ZERO {
104 return Err(ClientError::InvalidArgument(
105 "ClientConfig.request_timeout must be > 0; use Duration::from_secs(30) or similar"
106 .into(),
107 ));
108 }
109 if self.max_sse_frame == 0 {
110 return Err(ClientError::InvalidArgument(
111 "ClientConfig.max_sse_frame must be > 0".into(),
112 ));
113 }
114 Ok(())
115 }
116}
117
118/// Auth-agnostic JMAP base HTTP client.
119///
120/// Construct with [`JmapClient::new`] or [`JmapClient::new_plain`].
121/// Extension-specific clients (`jmap-chat-client`, `jmap-mail-client`) depend
122/// on this crate and add their method implementations via `impl JmapClient`.
123#[derive(Clone)]
124pub struct JmapClient {
125 pub(crate) base_url: url::Url,
126 pub(crate) auth: Arc<dyn AuthProvider>,
127 pub(crate) http: reqwest::Client,
128 pub(crate) config: ClientConfig,
129}
130
131impl std::fmt::Debug for JmapClient {
132 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
133 f.debug_struct("JmapClient")
134 .field("base_url", &self.base_url)
135 .field("config", &self.config)
136 .finish_non_exhaustive()
137 }
138}
139
140impl JmapClient {
141 /// Create a new client.
142 ///
143 /// `transport` configures the underlying HTTP client (TLS trust roots,
144 /// client certificates, timeouts). `auth` injects per-request credentials
145 /// (Bearer token, Basic credentials, or none). The two are independent so
146 /// any transport can be paired with any credential scheme — for example,
147 /// `CustomCaTransport` with `BearerAuth`. `base_url` must be the server
148 /// origin (scheme, host, optional port) with no path, query, or fragment
149 /// — e.g. `"https://100.64.1.1:8008"`. Trailing slashes are normalized
150 /// away by the URL parser and are therefore accepted.
151 pub fn new(
152 transport: impl TransportConfig,
153 auth: impl AuthProvider + 'static,
154 base_url: &str,
155 config: ClientConfig,
156 ) -> Result<Self, ClientError> {
157 if base_url.is_empty() {
158 return Err(ClientError::InvalidArgument(
159 "base_url may not be empty".into(),
160 ));
161 }
162 let parsed = url::Url::parse(base_url).map_err(|e| {
163 ClientError::InvalidArgument(format!("base_url is not a valid URL: {e}"))
164 })?;
165 let scheme = parsed.scheme();
166 if scheme != "http" && scheme != "https" {
167 return Err(ClientError::InvalidArgument(format!(
168 "base_url scheme must be http or https, got: {scheme:?}"
169 )));
170 }
171 let path = parsed.path();
172 // url::Url::path() returns "/" for root-only URLs (no path segments);
173 // any value other than "/" means the URL contains an explicit path component.
174 if path != "/" {
175 return Err(ClientError::InvalidArgument(format!(
176 "base_url must not have a path component, got: {path:?}"
177 )));
178 }
179 if parsed.query().is_some() {
180 return Err(ClientError::InvalidArgument(
181 "base_url must not have a query string".into(),
182 ));
183 }
184 if parsed.fragment().is_some() {
185 return Err(ClientError::InvalidArgument(
186 "base_url must not have a fragment".into(),
187 ));
188 }
189 config.validate()?;
190 let http = transport.build_client()?;
191 Ok(Self {
192 base_url: parsed,
193 auth: Arc::new(auth),
194 http,
195 config,
196 })
197 }
198
199 /// Convenience constructor for servers with publicly-trusted TLS.
200 ///
201 /// Equivalent to `JmapClient::new(DefaultTransport, auth, base_url, config)`.
202 /// Use [`JmapClient::new`] when you need a custom transport (e.g.
203 /// `CustomCaTransport` for a private-CA server).
204 pub fn new_plain(
205 auth: impl AuthProvider + 'static,
206 base_url: &str,
207 config: ClientConfig,
208 ) -> Result<Self, ClientError> {
209 Self::new(DefaultTransport, auth, base_url, config)
210 }
211
212 /// Apply the auth header (if any) to a request builder.
213 ///
214 /// Centralises the repeated `if let Some(...) = self.auth.auth_header()` pattern
215 /// that every HTTP method uses. Callers: `fetch_session`, `call`,
216 /// `subscribe_events`, `upload_blob`, `download_blob`.
217 pub(crate) fn inject_auth(&self, builder: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
218 if let Some((name, value)) = self.auth.auth_header() {
219 builder.header(name, value)
220 } else {
221 builder
222 }
223 }
224
225 /// Returns `Err(ClientError::AuthFailed)` when the HTTP status indicates an
226 /// authentication or authorization failure.
227 ///
228 /// Specifically handles:
229 /// - 401 Unauthorized (RFC 7235 §3.1) — missing or invalid credentials
230 /// - 403 Forbidden (RFC 7235 §3.2) — credentials present but insufficient
231 ///
232 /// Called before reading the response body so callers can distinguish
233 /// permanent auth failures from transient errors without consuming the body.
234 pub(crate) fn check_auth_status(status: reqwest::StatusCode) -> Result<(), ClientError> {
235 if status == reqwest::StatusCode::UNAUTHORIZED || status == reqwest::StatusCode::FORBIDDEN {
236 Err(ClientError::AuthFailed(status.as_u16()))
237 } else {
238 Ok(())
239 }
240 }
241
242 /// Fetch the JMAP Session object from `{base_url}/.well-known/jmap` (RFC 8620 §2).
243 ///
244 /// The response body is capped at 1 MiB. Returns `ClientError::ResponseTooLarge`
245 /// if the server sends more. Session URL fields (`apiUrl`, `uploadUrl`,
246 /// `downloadUrl`, `eventSourceUrl`) are validated to have http/https scheme;
247 /// a non-http scheme returns `ClientError::InvalidSession`.
248 ///
249 /// Returns `ClientError::AuthFailed` on HTTP 401 or 403.
250 pub async fn fetch_session(&self) -> Result<Session, ClientError> {
251 let limit = self.config.max_session_body;
252 let url = self.base_url.join(".well-known/jmap").map_err(|e| {
253 // base_url was pre-validated in JmapClient::new; this path is
254 // unreachable in practice but must be handled for completeness.
255 ClientError::InvalidSession(format!("cannot construct session URL: {e}"))
256 })?;
257
258 let req = self.inject_auth(self.http.get(url).timeout(self.config.request_timeout));
259
260 let resp = {
261 let raw_resp = req.send().await.map_err(ClientError::Http)?;
262 Self::check_auth_status(raw_resp.status())?;
263 raw_resp.error_for_status().map_err(ClientError::Http)?
264 };
265
266 // Enforce size cap before reading. Content-Length can lie, so we check
267 // both the header and the actual read size.
268 if let Some(len) = resp.content_length() {
269 if len > limit {
270 return Err(ClientError::ResponseTooLarge { actual: len, limit });
271 }
272 }
273 let bytes = resp.bytes().await.map_err(ClientError::Http)?;
274 if bytes.len() as u64 > limit {
275 return Err(ClientError::ResponseTooLarge {
276 actual: bytes.len() as u64,
277 limit,
278 });
279 }
280
281 let session: Session = serde_json::from_slice(&bytes).map_err(ClientError::Parse)?;
282
283 validate_session_urls(&session)?;
284
285 Ok(session)
286 }
287
288 /// POST a [`jmap_types::JmapRequest`] to `api_url` and return the parsed [`jmap_types::JmapResponse`]
289 /// (RFC 8620 §3.3).
290 ///
291 /// `api_url` is taken as an explicit parameter (not from `self`) because the
292 /// caller holds a [`Session`] and selects the correct URL from it.
293 ///
294 /// The response body is capped at 8 MiB. Returns `ClientError::ResponseTooLarge`
295 /// if the server sends more.
296 ///
297 /// Returns `ClientError::AuthFailed` on HTTP 401 or 403.
298 pub async fn call(
299 &self,
300 api_url: &str,
301 req: &jmap_types::JmapRequest,
302 ) -> Result<jmap_types::JmapResponse, ClientError> {
303 require_http_url(api_url)?;
304 let limit = self.config.max_call_body;
305
306 let builder = self.inject_auth(
307 self.http
308 .post(api_url)
309 .json(req)
310 .timeout(self.config.request_timeout),
311 );
312
313 let resp = {
314 let raw_resp = builder.send().await.map_err(ClientError::Http)?;
315 Self::check_auth_status(raw_resp.status())?;
316 raw_resp.error_for_status().map_err(ClientError::Http)?
317 };
318
319 // Enforce size cap before reading.
320 if let Some(len) = resp.content_length() {
321 if len > limit {
322 return Err(ClientError::ResponseTooLarge { actual: len, limit });
323 }
324 }
325 let bytes = resp.bytes().await.map_err(ClientError::Http)?;
326 if bytes.len() as u64 > limit {
327 return Err(ClientError::ResponseTooLarge {
328 actual: bytes.len() as u64,
329 limit,
330 });
331 }
332
333 let jmap_resp: jmap_types::JmapResponse =
334 serde_json::from_slice(&bytes).map_err(ClientError::Parse)?;
335
336 Ok(jmap_resp)
337 }
338
339 /// Open an SSE connection to `event_source_url` and return an async stream
340 /// of parsed [`SseFrame`]s (RFC 8620 §7.3).
341 ///
342 /// # URI template expansion
343 ///
344 /// `Session.event_source_url` is a URI template (RFC 6570 Level-1) with
345 /// variables `types`, `closeafter`, and `ping`. You **must** expand it
346 /// before passing it to this function, or the server will receive the
347 /// literal text `{types}` in the URL and return an error. Use
348 /// [`expand_url_template`](crate::expand_url_template):
349 ///
350 /// ```rust,ignore
351 /// let url = jmap_base_client::expand_url_template(
352 /// &session.event_source_url,
353 /// &[("types", "*"), ("closeafter", "no"), ("ping", "0")],
354 /// )?;
355 /// let stream = client.subscribe_events(&url, None).await?;
356 /// ```
357 ///
358 /// If `last_event_id` is `Some`, sends a `Last-Event-ID` header so the
359 /// server can resume from where the previous stream left off.
360 ///
361 /// Buffer growth is capped at [`ClientConfig::max_sse_frame`] bytes per
362 /// frame (default: 1 MiB). If a single SSE frame exceeds this limit the
363 /// stream yields `ClientError::SseFrameTooLarge` and terminates.
364 ///
365 /// No timeout is applied to this call or to the resulting stream. The
366 /// connect timeout (10 s, TCP only) is the only deadline enforced. If the
367 /// server stalls before sending HTTP response headers, or later goes silent
368 /// on the open connection, this call or the stream will hang indefinitely.
369 /// Wrap the entire call and/or stream iteration in [`tokio::time::timeout`]
370 /// if you need to bound either phase.
371 ///
372 /// Returns `ClientError::AuthFailed` on HTTP 401 or 403 before the stream
373 /// starts.
374 pub async fn subscribe_events(
375 &self,
376 event_source_url: &str,
377 last_event_id: Option<&str>,
378 ) -> Result<futures::stream::BoxStream<'static, Result<SseFrame, ClientError>>, ClientError>
379 {
380 require_http_url(event_source_url)?;
381 let mut req = self
382 .http
383 .get(event_source_url)
384 .header("Accept", "text/event-stream");
385 if let Some(id) = last_event_id {
386 req = req.header("Last-Event-ID", id);
387 }
388 let req = self.inject_auth(req);
389
390 let resp = req.send().await.map_err(ClientError::Http)?;
391 Self::check_auth_status(resp.status())?;
392 let resp = resp.error_for_status().map_err(ClientError::Http)?;
393
394 // Verify Content-Type before streaming. A misconfigured server returning
395 // application/json would silently produce no events (no SSE delimiter found).
396 {
397 // to_ascii_lowercase(): media types are case-insensitive per RFC 7231 §3.1.1.1.
398 let ct = resp
399 .headers()
400 .get(reqwest::header::CONTENT_TYPE)
401 .and_then(|v| v.to_str().ok())
402 .unwrap_or("")
403 .to_ascii_lowercase();
404 if !ct.starts_with("text/event-stream") {
405 return Err(ClientError::UnexpectedResponse(format!(
406 "subscribe_events: expected Content-Type text/event-stream, got: {ct:?}"
407 )));
408 }
409 }
410
411 let byte_stream = resp.bytes_stream();
412 let sse_frame_limit = self.config.max_sse_frame;
413
414 Ok(futures::stream::unfold(
415 Some(SseStreamState {
416 stream: byte_stream,
417 raw_buf: Vec::new(),
418 buf: String::new(),
419 scan_from: 0, // invariant: valid UTF-8 char boundary of buf; 0 always satisfies this
420 }),
421 move |state| async move {
422 let SseStreamState {
423 mut stream,
424 mut raw_buf,
425 mut buf,
426 mut scan_from,
427 } = state?;
428 loop {
429 // Search for any double-newline delimiter (LF/CRLF/CR variants).
430 // scan_from is set to old_len.saturating_sub(3) after each append
431 // so we only re-scan the overlap region. 3 bytes back is the
432 // minimum that covers all delimiter prefixes that can straddle a
433 // chunk boundary:
434 // - `\r\n\r\n` (4 bytes): longest prefix that fits in one chunk
435 // but is incomplete is `\r\n\r` (3 bytes) — exactly covered.
436 // - `\n\r\n` (3 bytes): longest incomplete prefix is `\n\r` (2
437 // bytes) — covered by the 3-byte overlap.
438 // - `\n\n` and `\r\r` (2 bytes each): longest incomplete prefix
439 // is 1 byte — covered by the 3-byte overlap.
440 // Since \r and \n are single-byte UTF-8 codepoints, 3 bytes back
441 // is always a valid char boundary — no adjustment needed.
442 let frame_end = [
443 buf[scan_from..]
444 .find("\r\n\r\n")
445 .map(|p| (scan_from + p, 4usize)),
446 buf[scan_from..]
447 .find("\n\n")
448 .map(|p| (scan_from + p, 2usize)),
449 buf[scan_from..]
450 .find("\r\r")
451 .map(|p| (scan_from + p, 2usize)),
452 // Mixed: LF-terminated last field line followed by a
453 // CRLF-terminated blank line. Not detected by \n\n (the
454 // LFs are separated by a CR) or \r\n\r\n (no leading \r\n).
455 buf[scan_from..]
456 .find("\n\r\n")
457 .map(|p| (scan_from + p, 3usize)),
458 ]
459 .into_iter()
460 .flatten()
461 .min_by_key(|&(pos, _)| pos);
462
463 if let Some((pos, delim_len)) = frame_end {
464 let frame = {
465 let slice = &buf[..pos];
466 if slice.contains('\r') {
467 slice.replace("\r\n", "\n").replace('\r', "\n")
468 } else {
469 slice.to_owned()
470 }
471 };
472 buf.drain(..pos + delim_len);
473 scan_from = 0; // 0 satisfies the UTF-8 char boundary invariant
474 let sse_frame = parse_sse_block(&frame);
475 return Some((
476 Ok(sse_frame),
477 Some(SseStreamState {
478 stream,
479 raw_buf,
480 buf,
481 scan_from,
482 }),
483 ));
484 }
485
486 match stream.next().await {
487 None => return None,
488 Some(Err(e)) => {
489 return Some((Err(ClientError::Http(e)), None));
490 }
491 Some(Ok(bytes)) => {
492 // Accumulate raw bytes first. A multi-byte UTF-8 codepoint
493 // may be split across adjacent HTTP chunks; decode only the
494 // valid prefix and leave the remainder in raw_buf until the
495 // next chunk completes the sequence.
496 raw_buf.extend_from_slice(&bytes);
497 // Cap raw_buf to prevent OOM on persistent invalid UTF-8 input.
498 // Use the same limit as the decoded buf cap.
499 if raw_buf.len() > sse_frame_limit {
500 return Some((
501 Err(ClientError::SseFrameTooLarge {
502 limit: sse_frame_limit,
503 }),
504 None,
505 ));
506 }
507 let old_len = buf.len();
508 decode_utf8_chunk(&mut raw_buf, &mut buf);
509 scan_from = old_len.saturating_sub(3);
510 // Walk backward to a valid UTF-8 char boundary so that
511 // buf[scan_from..] never panics on multibyte characters.
512 while scan_from > 0 && !buf.is_char_boundary(scan_from) {
513 scan_from -= 1;
514 }
515 // Guard against unbounded buffer growth from a hostile server.
516 // Yield the error and terminate (state = None).
517 if buf.len() > sse_frame_limit {
518 return Some((
519 Err(ClientError::SseFrameTooLarge {
520 limit: sse_frame_limit,
521 }),
522 None,
523 ));
524 }
525 }
526 }
527 }
528 },
529 )
530 .boxed())
531 }
532}
533
534/// Find the method response matching `call_id` in `resp` and deserialize its
535/// arguments into `T`.
536///
537/// Returns [`ClientError::MethodNotFound`] if no invocation with the given
538/// call_id exists. Returns [`ClientError::MethodError`] if the matched
539/// invocation is a JMAP `"error"` response (RFC 8620 §3.6.1).
540///
541/// This function is `pub` so extension crates (`jmap-chat-client`,
542/// `jmap-mail-client`) can use it to extract typed results from a
543/// [`jmap_types::JmapResponse`] without depending on internal details.
544pub fn extract_response<T: serde::de::DeserializeOwned>(
545 resp: &jmap_types::JmapResponse,
546 call_id: &str,
547) -> Result<T, ClientError> {
548 // Invocation is a type alias (String, Value, String) = (method, args, call_id)
549 let inv = resp
550 .method_responses
551 .iter()
552 .find(|inv| inv.2 == call_id)
553 .ok_or_else(|| ClientError::MethodNotFound(call_id.to_owned()))?;
554 let (method_name, args, _) = inv;
555
556 // RFC 8620 §3.6.1: a method name of "error" signals a protocol-level error.
557 if method_name == "error" {
558 let err_type = args
559 .get("type")
560 .and_then(|v| v.as_str())
561 .unwrap_or("serverError") // safe: fallback literal, not user input
562 .to_owned();
563 let description = args
564 .get("description")
565 .and_then(|v| v.as_str())
566 .map(str::to_owned);
567 return Err(ClientError::MethodError {
568 error_type: err_type,
569 description,
570 });
571 }
572
573 <T as serde::Deserialize>::deserialize(args).map_err(ClientError::Parse)
574}
575
/// Decode as much valid UTF-8 as possible from `raw` into `buf`, draining
/// consumed (or definitively invalid) bytes from `raw`.
///
/// `raw` is processed from front to back:
/// - valid text is appended to `buf`;
/// - every **definitively invalid** byte sequence (`error_len == Some(n)`) is
///   dropped, and decoding resumes right after it, so text following an
///   invalid byte is never held back;
/// - decoding stops at the end of `raw`, or at a trailing **incomplete**
///   multi-byte sequence (`error_len == None`).
///
/// On return, `raw` is empty or holds only that incomplete trailing sequence
/// (at most 3 bytes), waiting for the next chunk to supply the continuation
/// bytes. `raw` therefore cannot accumulate invalid input across calls.
/// Callers should still cap `raw.len()` defensively; `subscribe_events` does so
/// with `ClientConfig::max_sse_frame`.
fn decode_utf8_chunk(raw: &mut Vec<u8>, buf: &mut String) {
    // Number of leading bytes of `raw` already decoded or discarded.
    let mut consumed = 0;
    loop {
        match std::str::from_utf8(&raw[consumed..]) {
            Ok(rest) => {
                buf.push_str(rest);
                consumed = raw.len();
                break;
            }
            Err(e) => {
                let valid_up_to = e.valid_up_to();
                let valid = &raw[consumed..consumed + valid_up_to];
                buf.push_str(
                    std::str::from_utf8(valid).expect("valid_up_to is a valid UTF-8 boundary"),
                );
                match e.error_len() {
                    // Definitively invalid: skip it and keep decoding. Each
                    // pass advances by at least one byte, so the loop ends.
                    Some(invalid_len) => consumed += valid_up_to + invalid_len,
                    // Incomplete trailing sequence: keep it for the next chunk.
                    None => {
                        consumed += valid_up_to;
                        break;
                    }
                }
            }
        }
    }
    raw.drain(..consumed);
}
622
/// Return the scheme of `url`, lowercased (schemes are case-insensitive per
/// RFC 3986 §3.1).
///
/// The scheme is taken to be everything before the first `"://"`. If there is
/// no `"://"`, an empty string is returned. Because this is a plain prefix
/// scan rather than a full URL parse, URI templates containing `{variable}`
/// placeholders are handled without error.
fn url_scheme(url: &str) -> String {
    match url.find("://") {
        Some(end) => url[..end].to_ascii_lowercase(),
        None => String::new(),
    }
}
634
635/// Validate that `url` uses an http or https scheme.
636///
637/// Called at the top of each public method that accepts a URL parameter
638/// (`call`, `subscribe_events`, `upload_blob`, `download_blob`). This is a
639/// defense-in-depth check: the primary protection is [`validate_session_urls`]
640/// which rejects bad URLs in the Session document at fetch time. This check
641/// makes each individual call site self-defending against accidentally passing
642/// a non-http URL (e.g. from a test fixture or a misused API).
643///
644/// Returns [`ClientError::InvalidArgument`] if the scheme is not http/https.
645pub(crate) fn require_http_url(url: &str) -> Result<(), ClientError> {
646 let scheme = url_scheme(url);
647 if scheme != "http" && scheme != "https" {
648 return Err(ClientError::InvalidArgument(format!(
649 "URL must have http or https scheme, got: {url:?}"
650 )));
651 }
652 Ok(())
653}
654
655/// Validate that all URL fields in `session` use an http or https scheme.
656///
657/// Returns `ClientError::InvalidSession` if any URL has a non-http/https scheme.
658/// This prevents a malicious server from injecting non-HTTP URLs into subsequent
659/// requests (e.g. `file://`, `ftp://`).
660///
661/// Scheme comparison is case-insensitive per RFC 3986 §3.1: both `http://` and
662/// `HTTP://` are accepted. Session URL templates may contain `{variable}`
663/// syntax that prevents full URL parsing, so only the scheme prefix is checked.
664fn validate_session_urls(session: &Session) -> Result<(), ClientError> {
665 for url in [
666 &session.api_url,
667 &session.upload_url,
668 &session.download_url,
669 &session.event_source_url,
670 ] {
671 let scheme = url_scheme(url);
672 if scheme != "http" && scheme != "https" {
673 return Err(ClientError::InvalidSession(format!(
674 "session URL has non-http/https scheme: {:?}",
675 url
676 )));
677 }
678 }
679 Ok(())
680}
681
682// ---------------------------------------------------------------------------
683// Tests
684// ---------------------------------------------------------------------------
685
#[cfg(test)]
mod tests {
    use super::decode_utf8_chunk;

    /// Run `decode_utf8_chunk` once on `bytes` with an empty output buffer and
    /// return `(decoded_text, leftover_raw_bytes)`.
    fn decode_once(bytes: &[u8]) -> (String, Vec<u8>) {
        let mut raw = bytes.to_vec();
        let mut buf = String::new();
        decode_utf8_chunk(&mut raw, &mut buf);
        (buf, raw)
    }

    /// Oracle: all-ASCII bytes pushed to buf; raw cleared.
    #[test]
    fn decode_utf8_chunk_all_ascii() {
        let (buf, raw) = decode_once(b"hello");
        assert_eq!(buf, "hello");
        assert!(raw.is_empty());
    }

    /// Oracle: a complete two-byte codepoint (U+00E9 = 0xC3 0xA9) is decoded fully.
    #[test]
    fn decode_utf8_chunk_complete_multibyte() {
        let (buf, raw) = decode_once("café".as_bytes());
        assert_eq!(buf, "café");
        assert!(raw.is_empty());
    }

    /// Oracle: a lone lead byte (0xC3) is incomplete (`error_len == None`) and
    /// must be RETAINED in raw, not dropped. Regression test for the bug where
    /// `valid_up_to.max(1)` discarded this byte.
    #[test]
    fn decode_utf8_chunk_incomplete_head_retained() {
        let (buf, raw) = decode_once(&[0xC3u8]);
        assert_eq!(buf, "", "no complete codepoints to push");
        assert_eq!(raw, vec![0xC3u8], "incomplete head must stay in raw");
    }

    /// Oracle: the ASCII prefix goes to buf; the incomplete lead byte stays in raw.
    #[test]
    fn decode_utf8_chunk_prefix_then_incomplete_head() {
        let (buf, raw) = decode_once(&[b'a', b'b', 0xC3u8]);
        assert_eq!(buf, "ab");
        assert_eq!(raw, vec![0xC3u8], "incomplete head must stay in raw");
    }

    /// Oracle: a codepoint split across two HTTP chunks is completed by the
    /// second call, which is the scenario the raw-buffer design targets.
    #[test]
    fn decode_utf8_chunk_split_sequence_completed() {
        let mut raw = vec![0xC3u8]; // chunk 1: lead byte of U+00E9 only
        let mut buf = String::new();
        decode_utf8_chunk(&mut raw, &mut buf);
        assert_eq!(raw, vec![0xC3u8], "incomplete head retained after chunk 1");

        raw.push(0xA9u8); // chunk 2: the continuation byte
        decode_utf8_chunk(&mut raw, &mut buf);
        assert_eq!(buf, "é", "character fully decoded after chunk 2");
        assert!(raw.is_empty());
    }

    /// Oracle: 0xFF can never appear in UTF-8, so it is drained, not retained.
    #[test]
    fn decode_utf8_chunk_invalid_byte_drained() {
        let (buf, raw) = decode_once(&[0xFFu8]);
        assert_eq!(buf, "");
        assert!(raw.is_empty(), "definitively invalid byte must be drained");
    }

    /// Oracle: the valid prefix is pushed; prefix and invalid byte are both drained.
    #[test]
    fn decode_utf8_chunk_prefix_then_invalid_drained() {
        let (buf, raw) = decode_once(&[b'a', b'b', 0xFFu8]);
        assert_eq!(buf, "ab");
        assert!(raw.is_empty(), "prefix and invalid byte must be drained");
    }
}