jmap-base-client 0.1.0

RFC 8620 JMAP base client — auth-agnostic, session fetch, blob, SSE, WebSocket
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
//! Auth-agnostic base JMAP HTTP client (RFC 8620).
//!
//! Provides [`JmapClient`] for session fetch, API calls, blob transfer,
//! SSE event streaming, and [`extract_response`] for parsing method results.

use std::sync::Arc;

use futures::StreamExt;

use crate::auth::{AuthProvider, DefaultTransport, TransportConfig};
use crate::error::ClientError;
use crate::request::Session;
use crate::sse::{parse_sse_block, SseFrame};

/// Internal state threaded through the `subscribe_events` unfold loop.
struct SseStreamState<S> {
    stream: S,
    /// Accumulates raw bytes from the HTTP stream before UTF-8 decoding.
    /// Incomplete multi-byte sequences remain here until the next chunk
    /// completes them, preventing stream termination when a codepoint is
    /// split across adjacent chunks.
    raw_buf: Vec<u8>,
    buf: String,
    /// Byte offset from which the next delimiter scan begins.
    /// Must always be a valid UTF-8 char boundary of `buf`.
    scan_from: usize,
}

/// Per-client configuration for timeouts and body size limits.
///
/// Use [`ClientConfig::default()`] for production defaults (30s timeout, RFC-safe caps).
///
/// This type is `#[non_exhaustive]`: callers outside this crate must use
/// `..ClientConfig::default()` when constructing it, allowing new fields to
/// be added in minor versions without breaking callers.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ClientConfig {
    /// Timeout for HTTP request/response cycles (fetch_session, call, upload_blob, download_blob).
    /// Does NOT apply to SSE or WebSocket streams (which are indefinite by nature).
    /// Must be > 0. Use `Duration::from_secs(30)` for a 30-second timeout.
    /// Default: 30 seconds.
    pub request_timeout: std::time::Duration,
    /// Maximum response body for fetch_session. Default: 1 MiB.
    pub max_session_body: u64,
    /// Maximum response body for call(). Default: 8 MiB.
    pub max_call_body: u64,
    /// Maximum response body for download_blob(). Default: 64 MiB.
    pub max_download_body: u64,
    /// Maximum response body for upload_blob() response parsing. Default: 1 MiB.
    pub max_upload_body: u64,
    /// Maximum byte length of a single SSE frame (raw bytes and decoded text).
    /// Protects against memory exhaustion from a hostile or misbehaving server
    /// that sends a single very large frame. Must be > 0. Default: 1 MiB.
    pub max_sse_frame: usize,
}

impl Default for ClientConfig {
    fn default() -> Self {
        ClientConfig {
            request_timeout: std::time::Duration::from_secs(30),
            max_session_body: 1024 * 1024,
            max_call_body: 8 * 1024 * 1024,
            max_download_body: 64 * 1024 * 1024,
            max_upload_body: 1024 * 1024,
            max_sse_frame: 1024 * 1024,
        }
    }
}

impl ClientConfig {
    /// Validate that all config fields satisfy their constraints.
    ///
    /// Called automatically by [`JmapClient::new`].  Callers may also call
    /// this directly to pre-validate a config before passing it to the
    /// constructor.
    ///
    /// # Errors
    ///
    /// Returns [`ClientError::InvalidArgument`] when any field is zero or
    /// out-of-range.
    pub fn validate(&self) -> Result<(), ClientError> {
        if self.max_session_body == 0 {
            return Err(ClientError::InvalidArgument(
                "ClientConfig.max_session_body must be > 0".into(),
            ));
        }
        if self.max_call_body == 0 {
            return Err(ClientError::InvalidArgument(
                "ClientConfig.max_call_body must be > 0".into(),
            ));
        }
        if self.max_download_body == 0 {
            return Err(ClientError::InvalidArgument(
                "ClientConfig.max_download_body must be > 0".into(),
            ));
        }
        if self.max_upload_body == 0 {
            return Err(ClientError::InvalidArgument(
                "ClientConfig.max_upload_body must be > 0".into(),
            ));
        }
        if self.request_timeout == std::time::Duration::ZERO {
            return Err(ClientError::InvalidArgument(
                "ClientConfig.request_timeout must be > 0; use Duration::from_secs(30) or similar"
                    .into(),
            ));
        }
        if self.max_sse_frame == 0 {
            return Err(ClientError::InvalidArgument(
                "ClientConfig.max_sse_frame must be > 0".into(),
            ));
        }
        Ok(())
    }
}

/// Auth-agnostic JMAP base HTTP client.
///
/// Construct with [`JmapClient::new`] or [`JmapClient::new_plain`].
/// Extension-specific clients (`jmap-chat-client`, `jmap-mail-client`) depend
/// on this crate and add their method implementations via `impl JmapClient`.
#[derive(Clone)]
pub struct JmapClient {
    pub(crate) base_url: url::Url,
    pub(crate) auth: Arc<dyn AuthProvider>,
    pub(crate) http: reqwest::Client,
    pub(crate) config: ClientConfig,
}

impl std::fmt::Debug for JmapClient {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("JmapClient")
            .field("base_url", &self.base_url)
            .field("config", &self.config)
            .finish_non_exhaustive()
    }
}

impl JmapClient {
    /// Create a new client.
    ///
    /// `transport` configures the underlying HTTP client (TLS trust roots,
    /// client certificates, timeouts). `auth` injects per-request credentials
    /// (Bearer token, Basic credentials, or none). The two are independent so
    /// any transport can be paired with any credential scheme — for example,
    /// `CustomCaTransport` with `BearerAuth`. `base_url` must be the server
    /// origin (scheme, host, optional port) with no path, query, or fragment
    /// — e.g. `"https://100.64.1.1:8008"`. Trailing slashes are normalized
    /// away by the URL parser and are therefore accepted.
    pub fn new(
        transport: impl TransportConfig,
        auth: impl AuthProvider + 'static,
        base_url: &str,
        config: ClientConfig,
    ) -> Result<Self, ClientError> {
        if base_url.is_empty() {
            return Err(ClientError::InvalidArgument(
                "base_url may not be empty".into(),
            ));
        }
        let parsed = url::Url::parse(base_url).map_err(|e| {
            ClientError::InvalidArgument(format!("base_url is not a valid URL: {e}"))
        })?;
        let scheme = parsed.scheme();
        if scheme != "http" && scheme != "https" {
            return Err(ClientError::InvalidArgument(format!(
                "base_url scheme must be http or https, got: {scheme:?}"
            )));
        }
        let path = parsed.path();
        // url::Url::path() returns "/" for root-only URLs (no path segments);
        // any value other than "/" means the URL contains an explicit path component.
        if path != "/" {
            return Err(ClientError::InvalidArgument(format!(
                "base_url must not have a path component, got: {path:?}"
            )));
        }
        if parsed.query().is_some() {
            return Err(ClientError::InvalidArgument(
                "base_url must not have a query string".into(),
            ));
        }
        if parsed.fragment().is_some() {
            return Err(ClientError::InvalidArgument(
                "base_url must not have a fragment".into(),
            ));
        }
        config.validate()?;
        let http = transport.build_client()?;
        Ok(Self {
            base_url: parsed,
            auth: Arc::new(auth),
            http,
            config,
        })
    }

    /// Convenience constructor for servers with publicly-trusted TLS.
    ///
    /// Equivalent to `JmapClient::new(DefaultTransport, auth, base_url, config)`.
    /// Use [`JmapClient::new`] when you need a custom transport (e.g.
    /// `CustomCaTransport` for a private-CA server).
    pub fn new_plain(
        auth: impl AuthProvider + 'static,
        base_url: &str,
        config: ClientConfig,
    ) -> Result<Self, ClientError> {
        Self::new(DefaultTransport, auth, base_url, config)
    }

    /// Apply the auth header (if any) to a request builder.
    ///
    /// Centralises the repeated `if let Some(...) = self.auth.auth_header()` pattern
    /// that every HTTP method uses. Callers: `fetch_session`, `call`,
    /// `subscribe_events`, `upload_blob`, `download_blob`.
    pub(crate) fn inject_auth(&self, builder: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
        if let Some((name, value)) = self.auth.auth_header() {
            builder.header(name, value)
        } else {
            builder
        }
    }

    /// Returns `Err(ClientError::AuthFailed)` when the HTTP status indicates an
    /// authentication or authorization failure.
    ///
    /// Specifically handles:
    /// - 401 Unauthorized (RFC 7235 §3.1) — missing or invalid credentials
    /// - 403 Forbidden (RFC 7235 §3.2) — credentials present but insufficient
    ///
    /// Called before reading the response body so callers can distinguish
    /// permanent auth failures from transient errors without consuming the body.
    pub(crate) fn check_auth_status(status: reqwest::StatusCode) -> Result<(), ClientError> {
        if status == reqwest::StatusCode::UNAUTHORIZED || status == reqwest::StatusCode::FORBIDDEN {
            Err(ClientError::AuthFailed(status.as_u16()))
        } else {
            Ok(())
        }
    }

    /// Fetch the JMAP Session object from `{base_url}/.well-known/jmap` (RFC 8620 §2).
    ///
    /// The response body is capped at 1 MiB. Returns `ClientError::ResponseTooLarge`
    /// if the server sends more. Session URL fields (`apiUrl`, `uploadUrl`,
    /// `downloadUrl`, `eventSourceUrl`) are validated to have http/https scheme;
    /// a non-http scheme returns `ClientError::InvalidSession`.
    ///
    /// Returns `ClientError::AuthFailed` on HTTP 401 or 403.
    pub async fn fetch_session(&self) -> Result<Session, ClientError> {
        let limit = self.config.max_session_body;
        let url = self.base_url.join(".well-known/jmap").map_err(|e| {
            // base_url was pre-validated in JmapClient::new; this path is
            // unreachable in practice but must be handled for completeness.
            ClientError::InvalidSession(format!("cannot construct session URL: {e}"))
        })?;

        let req = self.inject_auth(self.http.get(url).timeout(self.config.request_timeout));

        let resp = {
            let raw_resp = req.send().await.map_err(ClientError::Http)?;
            Self::check_auth_status(raw_resp.status())?;
            raw_resp.error_for_status().map_err(ClientError::Http)?
        };

        // Enforce size cap before reading. Content-Length can lie, so we check
        // both the header and the actual read size.
        if let Some(len) = resp.content_length() {
            if len > limit {
                return Err(ClientError::ResponseTooLarge { actual: len, limit });
            }
        }
        let bytes = resp.bytes().await.map_err(ClientError::Http)?;
        if bytes.len() as u64 > limit {
            return Err(ClientError::ResponseTooLarge {
                actual: bytes.len() as u64,
                limit,
            });
        }

        let session: Session = serde_json::from_slice(&bytes).map_err(ClientError::Parse)?;

        validate_session_urls(&session)?;

        Ok(session)
    }

    /// POST a [`jmap_types::JmapRequest`] to `api_url` and return the parsed [`jmap_types::JmapResponse`]
    /// (RFC 8620 §3.3).
    ///
    /// `api_url` is taken as an explicit parameter (not from `self`) because the
    /// caller holds a [`Session`] and selects the correct URL from it.
    ///
    /// The response body is capped at 8 MiB. Returns `ClientError::ResponseTooLarge`
    /// if the server sends more.
    ///
    /// Returns `ClientError::AuthFailed` on HTTP 401 or 403.
    pub async fn call(
        &self,
        api_url: &str,
        req: &jmap_types::JmapRequest,
    ) -> Result<jmap_types::JmapResponse, ClientError> {
        require_http_url(api_url)?;
        let limit = self.config.max_call_body;

        let builder = self.inject_auth(
            self.http
                .post(api_url)
                .json(req)
                .timeout(self.config.request_timeout),
        );

        let resp = {
            let raw_resp = builder.send().await.map_err(ClientError::Http)?;
            Self::check_auth_status(raw_resp.status())?;
            raw_resp.error_for_status().map_err(ClientError::Http)?
        };

        // Enforce size cap before reading.
        if let Some(len) = resp.content_length() {
            if len > limit {
                return Err(ClientError::ResponseTooLarge { actual: len, limit });
            }
        }
        let bytes = resp.bytes().await.map_err(ClientError::Http)?;
        if bytes.len() as u64 > limit {
            return Err(ClientError::ResponseTooLarge {
                actual: bytes.len() as u64,
                limit,
            });
        }

        let jmap_resp: jmap_types::JmapResponse =
            serde_json::from_slice(&bytes).map_err(ClientError::Parse)?;

        Ok(jmap_resp)
    }

    /// Open an SSE connection to `event_source_url` and return an async stream
    /// of parsed [`SseFrame`]s (RFC 8620 §7.3).
    ///
    /// # URI template expansion
    ///
    /// `Session.event_source_url` is a URI template (RFC 6570 Level-1) with
    /// variables `types`, `closeafter`, and `ping`. You **must** expand it
    /// before passing it to this function, or the server will receive the
    /// literal text `{types}` in the URL and return an error. Use
    /// [`expand_url_template`](crate::expand_url_template):
    ///
    /// ```rust,ignore
    /// let url = jmap_base_client::expand_url_template(
    ///     &session.event_source_url,
    ///     &[("types", "*"), ("closeafter", "no"), ("ping", "0")],
    /// )?;
    /// let stream = client.subscribe_events(&url, None).await?;
    /// ```
    ///
    /// If `last_event_id` is `Some`, sends a `Last-Event-ID` header so the
    /// server can resume from where the previous stream left off.
    ///
    /// Buffer growth is capped at [`ClientConfig::max_sse_frame`] bytes per
    /// frame (default: 1 MiB). If a single SSE frame exceeds this limit the
    /// stream yields `ClientError::SseFrameTooLarge` and terminates.
    ///
    /// No timeout is applied to this call or to the resulting stream.  The
    /// connect timeout (10 s, TCP only) is the only deadline enforced.  If the
    /// server stalls before sending HTTP response headers, or later goes silent
    /// on the open connection, this call or the stream will hang indefinitely.
    /// Wrap the entire call and/or stream iteration in [`tokio::time::timeout`]
    /// if you need to bound either phase.
    ///
    /// Returns `ClientError::AuthFailed` on HTTP 401 or 403 before the stream
    /// starts.
    pub async fn subscribe_events(
        &self,
        event_source_url: &str,
        last_event_id: Option<&str>,
    ) -> Result<futures::stream::BoxStream<'static, Result<SseFrame, ClientError>>, ClientError>
    {
        require_http_url(event_source_url)?;
        let mut req = self
            .http
            .get(event_source_url)
            .header("Accept", "text/event-stream");
        if let Some(id) = last_event_id {
            req = req.header("Last-Event-ID", id);
        }
        let req = self.inject_auth(req);

        let resp = req.send().await.map_err(ClientError::Http)?;
        Self::check_auth_status(resp.status())?;
        let resp = resp.error_for_status().map_err(ClientError::Http)?;

        // Verify Content-Type before streaming. A misconfigured server returning
        // application/json would silently produce no events (no SSE delimiter found).
        {
            // to_ascii_lowercase(): media types are case-insensitive per RFC 7231 §3.1.1.1.
            let ct = resp
                .headers()
                .get(reqwest::header::CONTENT_TYPE)
                .and_then(|v| v.to_str().ok())
                .unwrap_or("")
                .to_ascii_lowercase();
            if !ct.starts_with("text/event-stream") {
                return Err(ClientError::UnexpectedResponse(format!(
                    "subscribe_events: expected Content-Type text/event-stream, got: {ct:?}"
                )));
            }
        }

        let byte_stream = resp.bytes_stream();
        let sse_frame_limit = self.config.max_sse_frame;

        Ok(futures::stream::unfold(
            Some(SseStreamState {
                stream: byte_stream,
                raw_buf: Vec::new(),
                buf: String::new(),
                scan_from: 0, // invariant: valid UTF-8 char boundary of buf; 0 always satisfies this
            }),
            move |state| async move {
                let SseStreamState {
                    mut stream,
                    mut raw_buf,
                    mut buf,
                    mut scan_from,
                } = state?;
                loop {
                    // Search for any double-newline delimiter (LF/CRLF/CR variants).
                    // scan_from is set to old_len.saturating_sub(3) after each append
                    // so we only re-scan the overlap region.  3 bytes back is the
                    // minimum that covers all delimiter prefixes that can straddle a
                    // chunk boundary:
                    //   - `\r\n\r\n` (4 bytes): longest prefix that fits in one chunk
                    //     but is incomplete is `\r\n\r` (3 bytes) — exactly covered.
                    //   - `\n\r\n` (3 bytes): longest incomplete prefix is `\n\r` (2
                    //     bytes) — covered by the 3-byte overlap.
                    //   - `\n\n` and `\r\r` (2 bytes each): longest incomplete prefix
                    //     is 1 byte — covered by the 3-byte overlap.
                    // Since \r and \n are single-byte UTF-8 codepoints, 3 bytes back
                    // is always a valid char boundary — no adjustment needed.
                    let frame_end = [
                        buf[scan_from..]
                            .find("\r\n\r\n")
                            .map(|p| (scan_from + p, 4usize)),
                        buf[scan_from..]
                            .find("\n\n")
                            .map(|p| (scan_from + p, 2usize)),
                        buf[scan_from..]
                            .find("\r\r")
                            .map(|p| (scan_from + p, 2usize)),
                        // Mixed: LF-terminated last field line followed by a
                        // CRLF-terminated blank line.  Not detected by \n\n (the
                        // LFs are separated by a CR) or \r\n\r\n (no leading \r\n).
                        buf[scan_from..]
                            .find("\n\r\n")
                            .map(|p| (scan_from + p, 3usize)),
                    ]
                    .into_iter()
                    .flatten()
                    .min_by_key(|&(pos, _)| pos);

                    if let Some((pos, delim_len)) = frame_end {
                        let frame = {
                            let slice = &buf[..pos];
                            if slice.contains('\r') {
                                slice.replace("\r\n", "\n").replace('\r', "\n")
                            } else {
                                slice.to_owned()
                            }
                        };
                        buf.drain(..pos + delim_len);
                        scan_from = 0; // 0 satisfies the UTF-8 char boundary invariant
                        let sse_frame = parse_sse_block(&frame);
                        return Some((
                            Ok(sse_frame),
                            Some(SseStreamState {
                                stream,
                                raw_buf,
                                buf,
                                scan_from,
                            }),
                        ));
                    }

                    match stream.next().await {
                        None => return None,
                        Some(Err(e)) => {
                            return Some((Err(ClientError::Http(e)), None));
                        }
                        Some(Ok(bytes)) => {
                            // Accumulate raw bytes first. A multi-byte UTF-8 codepoint
                            // may be split across adjacent HTTP chunks; decode only the
                            // valid prefix and leave the remainder in raw_buf until the
                            // next chunk completes the sequence.
                            raw_buf.extend_from_slice(&bytes);
                            // Cap raw_buf to prevent OOM on persistent invalid UTF-8 input.
                            // Use the same limit as the decoded buf cap.
                            if raw_buf.len() > sse_frame_limit {
                                return Some((
                                    Err(ClientError::SseFrameTooLarge {
                                        limit: sse_frame_limit,
                                    }),
                                    None,
                                ));
                            }
                            let old_len = buf.len();
                            decode_utf8_chunk(&mut raw_buf, &mut buf);
                            scan_from = old_len.saturating_sub(3);
                            // Walk backward to a valid UTF-8 char boundary so that
                            // buf[scan_from..] never panics on multibyte characters.
                            while scan_from > 0 && !buf.is_char_boundary(scan_from) {
                                scan_from -= 1;
                            }
                            // Guard against unbounded buffer growth from a hostile server.
                            // Yield the error and terminate (state = None).
                            if buf.len() > sse_frame_limit {
                                return Some((
                                    Err(ClientError::SseFrameTooLarge {
                                        limit: sse_frame_limit,
                                    }),
                                    None,
                                ));
                            }
                        }
                    }
                }
            },
        )
        .boxed())
    }
}

/// Find the method response matching `call_id` in `resp` and deserialize its
/// arguments into `T`.
///
/// Returns [`ClientError::MethodNotFound`] if no invocation with the given
/// call_id exists. Returns [`ClientError::MethodError`] if the matched
/// invocation is a JMAP `"error"` response (RFC 8620 §3.6.1).
///
/// This function is `pub` so extension crates (`jmap-chat-client`,
/// `jmap-mail-client`) can use it to extract typed results from a
/// [`jmap_types::JmapResponse`] without depending on internal details.
pub fn extract_response<T: serde::de::DeserializeOwned>(
    resp: &jmap_types::JmapResponse,
    call_id: &str,
) -> Result<T, ClientError> {
    // Invocation is a type alias (String, Value, String) = (method, args, call_id)
    let inv = resp
        .method_responses
        .iter()
        .find(|inv| inv.2 == call_id)
        .ok_or_else(|| ClientError::MethodNotFound(call_id.to_owned()))?;
    let (method_name, args, _) = inv;

    // RFC 8620 §3.6.1: a method name of "error" signals a protocol-level error.
    if method_name == "error" {
        let err_type = args
            .get("type")
            .and_then(|v| v.as_str())
            .unwrap_or("serverError") // safe: fallback literal, not user input
            .to_owned();
        let description = args
            .get("description")
            .and_then(|v| v.as_str())
            .map(str::to_owned);
        return Err(ClientError::MethodError {
            error_type: err_type,
            description,
        });
    }

    <T as serde::Deserialize>::deserialize(args).map_err(ClientError::Parse)
}

/// Decode as much valid UTF-8 as possible from `raw` into `buf`, draining
/// consumed (or definitively-invalid) bytes from `raw`.
///
/// Three cases:
/// - `raw` is fully valid UTF-8: pushed entirely to `buf`, `raw` cleared.
/// - `raw` ends with an **incomplete** multi-byte sequence (`error_len == None`):
///   the valid prefix is pushed to `buf` and drained from `raw`; the incomplete
///   head bytes stay in `raw` for the next chunk to complete them.
/// - `raw` contains a **definitively invalid** byte sequence (`error_len == Some(n)`):
///   the valid prefix is pushed to `buf`; the valid prefix AND the `n` invalid
///   bytes are drained from `raw` so they do not accumulate.
///
/// The caller is responsible for capping `raw.len()` before calling this
/// function; unbounded growth from a hostile server that never completes a
/// sequence is prevented by the 1 MiB `SSE_BUF_SIZE_LIMIT` check in
/// `subscribe_events`.
fn decode_utf8_chunk(raw: &mut Vec<u8>, buf: &mut String) {
    match std::str::from_utf8(raw) {
        Ok(s) => {
            buf.push_str(s);
            raw.clear();
        }
        Err(e) => {
            let valid_up_to = e.valid_up_to();
            // valid_up_to is always a char boundary by definition.
            buf.push_str(
                std::str::from_utf8(&raw[..valid_up_to])
                    .expect("valid_up_to is a valid UTF-8 boundary"),
            );
            match e.error_len() {
                Some(n) => {
                    // Definitively invalid: drain the valid prefix AND the
                    // invalid bytes so they do not accumulate in raw.
                    let drain_end = (valid_up_to + n).min(raw.len());
                    raw.drain(..drain_end);
                }
                None => {
                    // Incomplete multi-byte sequence: drain only the valid
                    // prefix.  The incomplete head stays in raw until the next
                    // chunk arrives with the missing continuation bytes.
                    raw.drain(..valid_up_to);
                }
            }
        }
    }
}

/// Extract the URL scheme as a lowercase ASCII string.
///
/// Returns the prefix before `"://"`, lowercased per RFC 3986 §3.1 (schemes
/// are case-insensitive). Returns an empty string if `"://"` is not present.
/// URL templates containing `{variable}` syntax are handled correctly because
/// the extraction is a prefix scan, not a full URL parse.
fn url_scheme(url: &str) -> String {
    url.find("://")
        .map(|i| url[..i].to_ascii_lowercase())
        .unwrap_or_default()
}

/// Validate that `url` uses an http or https scheme.
///
/// Called at the top of each public method that accepts a URL parameter
/// (`call`, `subscribe_events`, `upload_blob`, `download_blob`).  This is a
/// defense-in-depth check: the primary protection is [`validate_session_urls`]
/// which rejects bad URLs in the Session document at fetch time.  This check
/// makes each individual call site self-defending against accidentally passing
/// a non-http URL (e.g. from a test fixture or a misused API).
///
/// Returns [`ClientError::InvalidArgument`] if the scheme is not http/https.
pub(crate) fn require_http_url(url: &str) -> Result<(), ClientError> {
    let scheme = url_scheme(url);
    if scheme != "http" && scheme != "https" {
        return Err(ClientError::InvalidArgument(format!(
            "URL must have http or https scheme, got: {url:?}"
        )));
    }
    Ok(())
}

/// Validate that all URL fields in `session` use an http or https scheme.
///
/// Returns `ClientError::InvalidSession` if any URL has a non-http/https scheme.
/// This prevents a malicious server from injecting non-HTTP URLs into subsequent
/// requests (e.g. `file://`, `ftp://`).
///
/// Scheme comparison is case-insensitive per RFC 3986 §3.1: both `http://` and
/// `HTTP://` are accepted.  Session URL templates may contain `{variable}`
/// syntax that prevents full URL parsing, so only the scheme prefix is checked.
fn validate_session_urls(session: &Session) -> Result<(), ClientError> {
    for url in [
        &session.api_url,
        &session.upload_url,
        &session.download_url,
        &session.event_source_url,
    ] {
        let scheme = url_scheme(url);
        if scheme != "http" && scheme != "https" {
            return Err(ClientError::InvalidSession(format!(
                "session URL has non-http/https scheme: {:?}",
                url
            )));
        }
    }
    Ok(())
}

// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------

#[cfg(test)]
mod tests {
    use super::decode_utf8_chunk;

    /// Oracle: all-ASCII bytes pushed to buf; raw cleared.
    #[test]
    fn decode_utf8_chunk_all_ascii() {
        let mut raw = b"hello".to_vec();
        let mut buf = String::new();
        decode_utf8_chunk(&mut raw, &mut buf);
        assert_eq!(buf, "hello");
        assert!(raw.is_empty());
    }

    /// Oracle: complete multi-byte codepoint (U+00E9 café = 0xC3 0xA9) pushed fully.
    #[test]
    fn decode_utf8_chunk_complete_multibyte() {
        let mut raw = "café".as_bytes().to_vec();
        let mut buf = String::new();
        decode_utf8_chunk(&mut raw, &mut buf);
        assert_eq!(buf, "café");
        assert!(raw.is_empty());
    }

    /// Oracle: first byte of a 2-byte sequence (U+00E9 = 0xC3 0xA9) arrives alone.
    /// error_len == None (incomplete) — the byte must be RETAINED in raw, not dropped.
    /// Regression test for the bug where valid_up_to.max(1) discarded this byte.
    #[test]
    fn decode_utf8_chunk_incomplete_head_retained() {
        let mut raw = vec![0xC3u8]; // first byte of 2-byte sequence
        let mut buf = String::new();
        decode_utf8_chunk(&mut raw, &mut buf);
        assert_eq!(buf, "", "no complete codepoints to push");
        assert_eq!(raw, vec![0xC3u8], "incomplete head must stay in raw");
    }

    /// Oracle: valid ASCII prefix then first byte of a 2-byte sequence.
    /// Valid prefix goes to buf; incomplete head stays in raw.
    #[test]
    fn decode_utf8_chunk_prefix_then_incomplete_head() {
        let mut raw = vec![b'a', b'b', 0xC3u8];
        let mut buf = String::new();
        decode_utf8_chunk(&mut raw, &mut buf);
        assert_eq!(buf, "ab");
        assert_eq!(raw, vec![0xC3u8], "incomplete head must stay in raw");
    }

    /// Oracle: two-call simulation — incomplete sequence completed on second call.
    /// This is the exact HTTP chunk-split scenario the fix targets.
    #[test]
    fn decode_utf8_chunk_split_sequence_completed() {
        // Chunk 1: only the first byte of U+00E9 (é = 0xC3 0xA9)
        let mut raw = vec![0xC3u8];
        let mut buf = String::new();
        decode_utf8_chunk(&mut raw, &mut buf);
        assert_eq!(raw, vec![0xC3u8], "incomplete head retained after chunk 1");

        // Chunk 2: completion byte
        raw.push(0xA9u8);
        decode_utf8_chunk(&mut raw, &mut buf);
        assert_eq!(buf, "é", "character fully decoded after chunk 2");
        assert!(raw.is_empty());
    }

    /// Oracle: definitively invalid byte (0xFF is never valid in UTF-8) is drained.
    #[test]
    fn decode_utf8_chunk_invalid_byte_drained() {
        let mut raw = vec![0xFFu8];
        let mut buf = String::new();
        decode_utf8_chunk(&mut raw, &mut buf);
        assert_eq!(buf, "");
        assert!(raw.is_empty(), "definitively invalid byte must be drained");
    }

    /// Oracle: valid prefix then definitively invalid byte — prefix pushed and both drained.
    #[test]
    fn decode_utf8_chunk_prefix_then_invalid_drained() {
        let mut raw = vec![b'a', b'b', 0xFFu8];
        let mut buf = String::new();
        decode_utf8_chunk(&mut raw, &mut buf);
        assert_eq!(buf, "ab");
        assert!(raw.is_empty(), "prefix and invalid byte must be drained");
    }
}