jmap-base-client 0.1.2

RFC 8620 JMAP base client — auth-agnostic, session fetch, blob, SSE, WebSocket
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
//! Auth-agnostic base JMAP HTTP client (RFC 8620).
//!
//! Provides [`JmapClient`] for session fetch, API calls, blob transfer,
//! SSE event streaming, and [`extract_response`] for parsing method results.

use std::sync::Arc;

use futures::StreamExt;

use crate::auth::{AuthProvider, DefaultTransport, TransportConfig};
use crate::error::ClientError;
use crate::request::Session;
use crate::sse::{parse_sse_block, SseFrame};

/// Internal state threaded through the `subscribe_events` unfold loop.
///
/// Generic over `S`, the type of the wrapped stream; the struct itself
/// places no trait bounds on `S`.
struct SseStreamState<S> {
    /// The wrapped stream the loop pulls from.
    /// NOTE(review): presumably the HTTP response byte stream — confirm
    /// against `subscribe_events`.
    stream: S,
    /// Accumulates raw bytes from the HTTP stream before UTF-8 decoding.
    /// Incomplete multi-byte sequences remain here until the next chunk
    /// completes them, preventing stream termination when a codepoint is
    /// split across adjacent chunks.
    raw_buf: Vec<u8>,
    /// Text buffer that is scanned for frame delimiters (see `scan_from`).
    /// NOTE(review): presumably holds the text decoded out of `raw_buf` —
    /// confirm against the decode step in `subscribe_events`.
    buf: String,
    /// Byte offset from which the next delimiter scan begins.
    /// Must always be a valid UTF-8 char boundary of `buf`.
    scan_from: usize,
}

/// Per-client configuration for timeouts and body size limits.
///
/// Use [`ClientConfig::default()`] for production defaults (30s timeout, RFC-safe caps).
///
/// This type is `#[non_exhaustive]`, so new fields can be added in minor
/// versions without breaking callers. Code outside this crate cannot build
/// it with a struct expression — not even with the
/// `..ClientConfig::default()` functional-update syntax, which the compiler
/// also rejects for non-exhaustive structs. Start from
/// [`ClientConfig::default()`] and assign the public fields you want to
/// change:
///
/// ```rust,ignore
/// let mut config = ClientConfig::default();
/// config.request_timeout = std::time::Duration::from_secs(10);
/// config.max_call_body = 16 * 1024 * 1024;
/// ```
///
/// # Field-type rationale (bd:JMAP-6r7c.21)
///
/// The fields mix `u64` and `usize` integer types. The split is deliberate
/// and tracks the underlying transport's expectations:
///
/// - **HTTP body caps are `u64`** (`max_session_body`, `max_call_body`,
///   `max_download_body`, `max_upload_response_body`). `reqwest` reports
///   `Content-Length` as `u64`, and an HTTP body can in principle exceed
///   `usize::MAX` on a 32-bit target (4 GiB) without exceeding 64-bit
///   limits; the cap comparison happens before accumulation.
/// - **In-memory frame/message caps are `usize`** (`max_sse_frame`,
///   `max_ws_message`). These bound a `Vec<u8>` or `String` that must
///   fit in process memory; `usize` is the address-space-native type
///   and matches what `tokio_tungstenite::tungstenite::protocol::WebSocketConfig`
///   expects.
///
/// A future minor release MAY consolidate on `u64` with internal
/// `usize::try_from` casts at the call sites that need it. The mixed-int
/// API is a small ergonomic cost (callers cannot pass the same untyped
/// integer literal to both kinds of field without a `_u64` or `_usize`
/// suffix or `as usize` cast) traded for transparency to the underlying
/// transport's contract.
///
/// # No cross-field invariants are enforced
///
/// `validate()` checks each field independently. There is intentionally
/// no constraint that `max_sse_frame <= max_call_body` or similar — they
/// govern different threats (per-frame buffer cap vs whole-body cap)
/// and a deployment may rationally configure them in either order.
/// Document the chosen values in your own deployment notes.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ClientConfig {
    /// Timeout for HTTP request/response cycles (fetch_session, call, upload_blob, download_blob).
    /// Does NOT apply to SSE or WebSocket streams (which are indefinite by nature).
    /// Must be > 0. Use `Duration::from_secs(30)` for a 30-second timeout.
    /// Default: 30 seconds.
    ///
    /// `Duration::ZERO` is forbidden because `reqwest::RequestBuilder::timeout`
    /// treats `Duration::ZERO` as "no per-request timeout" (only the
    /// client-level `connect_timeout` applies), not "instant fail". A future
    /// maintainer who thinks "zero means disable timeout, why not allow it?"
    /// is reading reqwest's semantics correctly but missing that **this
    /// crate intentionally forbids the no-timeout configuration**: a
    /// caller-visible 30-second-by-default timeout protects against
    /// indefinite hangs on stalled servers and against a slowloris-style
    /// resource leak in JMAP clients that hold one outstanding request per
    /// account. `validate()` enforces the positive-timeout invariant
    /// (bd:JMAP-6r7c.29). Do not relax this without re-deriving the
    /// no-timeout DoS argument.
    pub request_timeout: std::time::Duration,
    /// Maximum response body, in bytes, for fetch_session. Must be > 0
    /// (enforced by `validate()`). Default: 1 MiB.
    pub max_session_body: u64,
    /// Maximum response body, in bytes, for call(). Must be > 0
    /// (enforced by `validate()`). Default: 8 MiB.
    pub max_call_body: u64,
    /// Maximum response body, in bytes, for download_blob(). Must be > 0
    /// (enforced by `validate()`). Default: 64 MiB.
    pub max_download_body: u64,
    /// Maximum size in bytes of the JSON response body returned by the
    /// server in reply to `upload_blob()`. Does NOT cap the size of the
    /// blob being uploaded — the JMAP server enforces that via its
    /// `maxSizeUpload` capability. This field caps only the small JSON
    /// envelope the server returns describing the stored blob.
    /// Must be > 0 (enforced by `validate()`).
    /// Default: 1 MiB.
    pub max_upload_response_body: u64,
    /// Maximum byte length of a single SSE frame; applied independently to
    /// the raw incoming bytes (pre-UTF-8 decode) and the decoded text.
    /// Protects against memory exhaustion from a hostile or misbehaving
    /// server that sends a single very large frame. Must be > 0.
    /// Default: 1 MiB.
    ///
    /// **Memory residency note (bd:JMAP-6lsm.7):** because the raw byte
    /// buffer and the decoded text buffer are tracked separately, the
    /// in-flight footprint while parsing a single frame can momentarily
    /// reach ~2 × `max_sse_frame` (raw bytes accumulated up to the limit,
    /// plus decoded text not yet drained). If you tune this value for a
    /// tight memory budget, plan for that 2× peak. The independent
    /// tracking is correct for the streaming UTF-8 decoder (which needs
    /// the raw buffer to be at least one full frame to handle split
    /// multi-byte sequences across HTTP chunks); see `decode_utf8_chunk`
    /// for the rationale.
    pub max_sse_frame: usize,
    /// Maximum byte length of a single WebSocket message (and frame). Mirrors
    /// `max_sse_frame` for the WebSocket transport. Used by
    /// [`JmapClient::connect_ws_session`] and threaded through to
    /// `tokio_tungstenite::tungstenite::protocol::WebSocketConfig`'s
    /// `max_message_size` and `max_frame_size`. Must be > 0.
    /// Default: 1 MiB. (bd:JMAP-6lsm.5)
    pub max_ws_message: usize,
}

impl Default for ClientConfig {
    fn default() -> Self {
        ClientConfig {
            request_timeout: std::time::Duration::from_secs(30),
            max_session_body: 1024 * 1024,
            max_call_body: 8 * 1024 * 1024,
            max_download_body: 64 * 1024 * 1024,
            max_upload_response_body: 1024 * 1024,
            max_sse_frame: 1024 * 1024,
            max_ws_message: 1024 * 1024,
        }
    }
}

impl ClientConfig {
    /// Check every field against its individual constraint.
    ///
    /// [`JmapClient::new`] runs this automatically; call it yourself to
    /// vet a config before handing it to a constructor.
    ///
    /// Fields are checked in a fixed order — the four HTTP body caps, then
    /// `request_timeout`, then `max_sse_frame` and `max_ws_message` — and
    /// the first violation found is the one reported.
    ///
    /// # Errors
    ///
    /// Returns [`ClientError::InvalidArgument`] naming the first field that
    /// is zero.
    pub fn validate(&self) -> Result<(), ClientError> {
        // Every rejection is the same variant wrapping a fixed message.
        let reject = |msg: &'static str| -> Result<(), ClientError> {
            Err(ClientError::InvalidArgument(msg.into()))
        };

        // HTTP body caps (`u64`), listed in the order they are reported.
        let body_caps: [(u64, &'static str); 4] = [
            (
                self.max_session_body,
                "ClientConfig.max_session_body must be > 0",
            ),
            (
                self.max_call_body,
                "ClientConfig.max_call_body must be > 0",
            ),
            (
                self.max_download_body,
                "ClientConfig.max_download_body must be > 0",
            ),
            (
                self.max_upload_response_body,
                "ClientConfig.max_upload_response_body must be > 0",
            ),
        ];
        if let Some((_, msg)) = body_caps.iter().copied().find(|&(cap, _)| cap == 0) {
            return reject(msg);
        }

        // A zero duration would let reqwest run requests without a
        // per-request timeout (reqwest reads ZERO as "no timeout"), so it
        // is refused outright — see the `request_timeout` field rustdoc
        // for the DoS-resistance rationale (bd:JMAP-6r7c.29).
        if self.request_timeout.is_zero() {
            return reject(
                "ClientConfig.request_timeout must be > 0; Duration::ZERO would disable the per-request timeout in reqwest, not 'fail immediately'. Use Duration::from_secs(30) or similar.",
            );
        }

        // In-memory frame/message caps (`usize`).
        let frame_caps: [(usize, &'static str); 2] = [
            (
                self.max_sse_frame,
                "ClientConfig.max_sse_frame must be > 0",
            ),
            (
                self.max_ws_message,
                "ClientConfig.max_ws_message must be > 0",
            ),
        ];
        if let Some((_, msg)) = frame_caps.iter().copied().find(|&(cap, _)| cap == 0) {
            return reject(msg);
        }

        Ok(())
    }
}

/// Auth-agnostic JMAP base HTTP client.
///
/// Construct with [`JmapClient::new`] or [`JmapClient::new_plain`].
/// Extension-specific clients (`jmap-chat-client`, `jmap-mail-client`) depend
/// on this crate and add their method implementations via `impl JmapClient`.
///
/// # Thread-safety (bd:JMAP-6r7c.25)
///
/// `JmapClient` is `Send + Sync + Clone`. Share by clone across threads or
/// `tokio::spawn` tasks; the underlying `reqwest::Client` is reference-counted
/// (Arc-backed) and the [`AuthProvider`] trait requires `Send + Sync` on every
/// implementation. A compile-time assertion in this crate's test suite pins
/// the `Send + Sync` contract: a future refactor that adds a non-`Sync` field
/// (e.g. `Rc<T>`, `RefCell<T>`, `Cell<T>`) would break the assertion in CI
/// before any downstream consumer is exposed.
#[derive(Clone)]
pub struct JmapClient {
    /// Server origin, as returned by `parse_base_url` in the constructors.
    /// `fetch_session` joins `.well-known/jmap` onto it.
    pub(crate) base_url: url::Url,
    /// Credential source consulted by `inject_auth`. Held behind an `Arc`,
    /// so cloning the client shares the provider rather than copying it.
    pub(crate) auth: Arc<dyn AuthProvider>,
    /// HTTP client obtained from the `TransportConfig` at construction time
    /// (`transport.build_client()?.into_inner()`).
    pub(crate) http: reqwest::Client,
    /// Timeouts and size caps; checked with `ClientConfig::validate` by the
    /// constructors before the client is built.
    pub(crate) config: ClientConfig,
}

impl std::fmt::Debug for JmapClient {
    /// Formats the client as `JmapClient { base_url, config, .. }`.
    ///
    /// Only `base_url` and `config` are printed; the `auth` and `http`
    /// fields are left out and stand behind the trailing `..`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("JmapClient");
        out.field("base_url", &self.base_url);
        out.field("config", &self.config);
        out.finish_non_exhaustive()
    }
}

impl JmapClient {
    /// Create a new client.
    ///
    /// `transport` configures the underlying HTTP client (TLS trust roots,
    /// client certificates, timeouts). `auth` injects per-request credentials
    /// (Bearer token, Basic credentials, or none). The two are independent so
    /// any transport can be paired with any credential scheme — for example,
    /// `CustomCaTransport` with `BearerAuth`. `base_url` must be the server
    /// origin (scheme, host, optional port) with no path, query, or fragment
    /// — e.g. `"https://100.64.1.1:8008"`. Trailing slashes are normalized
    /// away by the URL parser and are therefore accepted.
    ///
    /// `auth` is wrapped in a fresh `Arc` and the rest of construction is
    /// delegated to [`JmapClient::new_with_shared_auth`], so both
    /// constructors run the exact same parse / validate / build sequence.
    ///
    /// # Errors
    ///
    /// Returns an error when `base_url` is rejected by `parse_base_url`,
    /// when `config` fails [`ClientConfig::validate`], or when `transport`
    /// cannot build its HTTP client.
    pub fn new(
        transport: impl TransportConfig,
        auth: impl AuthProvider + 'static,
        base_url: &str,
        config: ClientConfig,
    ) -> Result<Self, ClientError> {
        Self::new_with_shared_auth(transport, Arc::new(auth), base_url, config)
    }

    /// Convenience constructor for servers with publicly-trusted TLS.
    ///
    /// Equivalent to `JmapClient::new(DefaultTransport, auth, base_url, config)`.
    /// Use [`JmapClient::new`] when you need a custom transport (e.g.
    /// `CustomCaTransport` for a private-CA server).
    ///
    /// # Errors
    ///
    /// Same as [`JmapClient::new`].
    pub fn new_plain(
        auth: impl AuthProvider + 'static,
        base_url: &str,
        config: ClientConfig,
    ) -> Result<Self, ClientError> {
        Self::new(DefaultTransport, auth, base_url, config)
    }

    /// Create a new client sharing an existing `Arc<dyn AuthProvider>`
    /// (bd:JMAP-6r7c.27).
    ///
    /// `JmapClient::new` and `new_plain` take `auth` by value and wrap it
    /// in a fresh `Arc` internally. That is the ergonomic case for a
    /// caller constructing one client with one auth provider. It is the
    /// wrong shape for a caller who:
    ///
    /// - Operates multiple `JmapClient` instances against different
    ///   shards or accounts but uses the **same** credential holder
    ///   (e.g. a shared OAuth token-refresh state machine, a shared
    ///   service-account principal).
    /// - Wants a credential refresh in one client to be visible to all
    ///   sibling clients without rebuilding each one.
    ///
    /// This constructor takes a pre-built `Arc<dyn AuthProvider>` so
    /// callers can clone the Arc and pass clones to multiple
    /// `JmapClient::new_with_shared_auth` calls. The auth provider is
    /// then shared by reference-count, and any interior-mutable state
    /// (e.g. an `RwLock<TokenState>` inside a custom `OAuthAuth`
    /// implementation that holds a refreshable bearer) is genuinely
    /// shared across all sibling clients.
    ///
    /// Arguments mirror [`JmapClient::new`] otherwise.
    ///
    /// ```rust,ignore
    /// use std::sync::Arc;
    /// use jmap_base_client::{auth::{AuthProvider, BearerAuth, DefaultTransport}, client::{JmapClient, ClientConfig}};
    ///
    /// let auth: Arc<dyn AuthProvider> = Arc::new(BearerAuth::new("token")?);
    /// let shard_a = JmapClient::new_with_shared_auth(
    ///     DefaultTransport,
    ///     auth.clone(),
    ///     "https://a.example.com",
    ///     ClientConfig::default(),
    /// )?;
    /// let shard_b = JmapClient::new_with_shared_auth(
    ///     DefaultTransport,
    ///     auth,
    ///     "https://b.example.com",
    ///     ClientConfig::default(),
    /// )?;
    /// ```
    ///
    /// # Errors
    ///
    /// Returns an error when `base_url` is rejected by `parse_base_url`,
    /// when `config` fails [`ClientConfig::validate`], or when `transport`
    /// cannot build its HTTP client. The checks run in that order.
    pub fn new_with_shared_auth(
        transport: impl TransportConfig,
        auth: Arc<dyn AuthProvider>,
        base_url: &str,
        config: ClientConfig,
    ) -> Result<Self, ClientError> {
        // Parse-don't-validate: parse_base_url returns a fully-validated
        // url::Url so the constructor body can read as four facts (parse,
        // validate config, build transport, construct Self) rather than
        // a procedure (bd:JMAP-6lsm.26).
        let parsed = parse_base_url(base_url)?;
        config.validate()?;
        // Unwrap the opaque HttpClient at construction time (bd:JMAP-6r7c.36).
        // The wrapper exists to keep reqwest::Client out of the
        // TransportConfig trait signature; once the client is owned by
        // JmapClient, the inner reqwest::Client is the working type for
        // internal request building. into_inner is pub(crate) so external
        // code cannot reach inside the opaque wrapper.
        let http = transport.build_client()?.into_inner();
        Ok(Self {
            base_url: parsed,
            auth,
            http,
            config,
        })
    }

    /// Apply the auth header (if any) to a request builder.
    ///
    /// Centralises the repeated `if let Some(...) = self.auth.auth_header()` pattern
    /// that every HTTP method uses. Callers: `fetch_session`, `call`,
    /// `subscribe_events`, `upload_blob`, `download_blob`.
    ///
    /// Returns the builder unchanged when the provider yields no header.
    pub(crate) fn inject_auth(&self, builder: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
        match self.auth.auth_header() {
            Some(header) => builder.header(header.name(), header.expose_value()),
            None => builder,
        }
    }

    /// Returns `Err(ClientError::AuthFailed)` when the HTTP status indicates an
    /// authentication or authorization failure.
    ///
    /// Specifically handles:
    /// - 401 Unauthorized (RFC 7235 §3.1) — missing or invalid credentials
    /// - 403 Forbidden (RFC 7231 §6.5.3; RFC 9110 §15.5.4) — credentials
    ///   present but insufficient
    ///
    /// Every other status yields `Ok(())`. The error carries the numeric
    /// status code (401 or 403).
    ///
    /// Called before reading the response body so callers can distinguish
    /// permanent auth failures from transient errors without consuming the body.
    pub(crate) fn check_auth_status(status: reqwest::StatusCode) -> Result<(), ClientError> {
        let is_auth_failure =
            status == reqwest::StatusCode::UNAUTHORIZED || status == reqwest::StatusCode::FORBIDDEN;
        if is_auth_failure {
            return Err(ClientError::AuthFailed(status.as_u16()));
        }
        Ok(())
    }
}

/// Buffer a response body in memory without ever holding more than `limit`
/// bytes.
///
/// Two guards are applied:
///
/// 1. **Content-Length fast path.** When the server advertises a length
///    larger than `limit`, the response is rejected before any body byte is
///    read. This only catches honest oversized responses.
/// 2. **Per-chunk cap.** The body is then streamed chunk-by-chunk and the
///    running total is checked *before* each chunk is appended. This covers
///    responses with no Content-Length (chunked Transfer-Encoding) and
///    servers that under-report it. A plain `.bytes().await` would buffer
///    the whole response before any post-buffer length check could fire
///    (bd:JMAP-6r7c.1).
///
/// Threat model: relevant when talking to untrusted servers (federation
/// peers, third-party JMAP services, DNS-hijacked endpoints, MITM
/// scenarios). On low-memory devices the peak allocation from naive
/// buffering can OOM-kill the client even if a post-buffer check would
/// otherwise reject.
///
/// # Errors
///
/// Returns `ClientError::ResponseTooLarge` when either the advertised
/// Content-Length or the accumulated chunk total exceeds `limit`. Transport
/// errors raised while streaming are converted with
/// `ClientError::from_reqwest`.
pub(crate) async fn read_capped_body(
    resp: reqwest::Response,
    limit: u64,
) -> Result<Vec<u8>, ClientError> {
    // Guard 1: trust the header only far enough to reject early.
    match resp.content_length() {
        Some(advertised) if advertised > limit => {
            return Err(ClientError::ResponseTooLarge {
                actual: advertised,
                limit,
            });
        }
        _ => {}
    }

    // Guard 2: enforce the cap on what actually arrives.
    let mut collected: Vec<u8> = Vec::new();
    let mut chunks = resp.bytes_stream();
    while let Some(next) = chunks.next().await {
        let bytes = next.map_err(ClientError::from_reqwest)?;
        // Size the body would have *after* appending this chunk; checked
        // first so an over-limit chunk is never copied into the buffer.
        let projected = collected.len() as u64 + bytes.len() as u64;
        if projected > limit {
            return Err(ClientError::ResponseTooLarge {
                actual: projected,
                limit,
            });
        }
        collected.extend_from_slice(&bytes);
    }
    Ok(collected)
}

impl JmapClient {
    /// Retrieve and parse the JMAP Session object served at
    /// `{base_url}/.well-known/jmap` (RFC 8620 §2).
    ///
    /// The request carries this client's auth (via `inject_auth`) and the
    /// configured `request_timeout`. The body is read through
    /// [`read_capped_body`] with the configured `max_session_body` cap
    /// (nominally 1 MiB), so an oversized response yields
    /// `ClientError::ResponseTooLarge`. After parsing, the session's URL
    /// fields (`apiUrl`, `uploadUrl`, `downloadUrl`, `eventSourceUrl`) are
    /// checked by `validate_session_url_schemes`; a non-http(s) scheme is
    /// reported as `ClientError::InvalidSession`.
    ///
    /// # Errors
    ///
    /// - `ClientError::AuthFailed` on HTTP 401 or 403.
    /// - A `ClientError::from_reqwest`-mapped error for transport failures
    ///   and any other non-success HTTP status.
    /// - `ClientError::ResponseTooLarge` when the body exceeds the cap.
    /// - `ClientError::Parse` when the body is not a valid Session document.
    /// - `ClientError::InvalidSession` when URL validation fails.
    ///
    /// # Charset
    ///
    /// The body must be UTF-8 JSON (RFC 8259 §8.1). UTF-16 or UTF-32 JSON —
    /// even when announced with a matching `charset` parameter — surfaces
    /// as a generic `ClientError::Parse` without any hint about the
    /// charset mismatch. Shipped JMAP servers use UTF-8, so this only
    /// affects non-conformant servers (bd:JMAP-6r7c.28).
    pub async fn fetch_session(&self) -> Result<Session, ClientError> {
        // base_url was pre-validated in JmapClient::new, so the join is not
        // expected to fail; the error arm exists for completeness.
        let session_url = match self.base_url.join(".well-known/jmap") {
            Ok(joined) => joined,
            Err(e) => {
                return Err(ClientError::InvalidSession(format!(
                    "cannot construct session URL: {e}"
                )));
            }
        };

        let request = self
            .http
            .get(session_url)
            .timeout(self.config.request_timeout);
        let response = self
            .inject_auth(request)
            .send()
            .await
            .map_err(ClientError::from_reqwest)?;

        // 401/403 get a dedicated error before the generic status check.
        Self::check_auth_status(response.status())?;
        let response = response
            .error_for_status()
            .map_err(ClientError::from_reqwest)?;

        // The cap is enforced while streaming, not after buffering: the
        // Content-Length pre-check inside read_capped_body only rejects
        // honest oversized responses, while the per-chunk check is the real
        // DOS guard against a server that under-reports or omits
        // Content-Length (bd:JMAP-6r7c.1).
        let raw = read_capped_body(response, self.config.max_session_body).await?;

        let session =
            serde_json::from_slice::<Session>(&raw).map_err(ClientError::from_parse)?;
        validate_session_url_schemes(&session)?;
        Ok(session)
    }

    /// Send a [`jmap_types::JmapRequest`] as a JSON POST to `api_url` and
    /// parse the reply as a [`jmap_types::JmapResponse`] (RFC 8620 §3.3).
    ///
    /// `api_url` is an explicit argument rather than client state: the
    /// caller owns the [`Session`] and picks the URL from it. The URL must
    /// pass `require_http_url` before any request is made.
    ///
    /// The request carries this client's auth (via `inject_auth`) and the
    /// configured `request_timeout`. The response body is read through
    /// [`read_capped_body`] with the configured `max_call_body` cap
    /// (nominally 8 MiB).
    ///
    /// # Errors
    ///
    /// - Whatever `require_http_url` returns for an unacceptable `api_url`.
    /// - `ClientError::AuthFailed` on HTTP 401 or 403.
    /// - A `ClientError::from_reqwest`-mapped error for transport failures
    ///   and any other non-success HTTP status.
    /// - `ClientError::ResponseTooLarge` when the body exceeds the cap.
    /// - `ClientError::Parse` when the body is not a valid JMAP response.
    ///
    /// # Charset
    ///
    /// The body must be UTF-8 JSON (RFC 8259 §8.1). UTF-16 or UTF-32 JSON —
    /// even when announced with a matching `charset` parameter — surfaces
    /// as a generic `ClientError::Parse` without any hint about the
    /// charset mismatch. Shipped JMAP servers use UTF-8, so this only
    /// affects non-conformant servers (bd:JMAP-6r7c.28).
    ///
    /// # See also
    ///
    /// [`JmapClient::call_session`] is the safer entry point when a
    /// [`Session`] is at hand: it reads `api_url` itself, so passing
    /// `session.upload_url` by mistake is not possible (bd:JMAP-6r7c.39).
    pub async fn call(
        &self,
        api_url: &str,
        req: &jmap_types::JmapRequest,
    ) -> Result<jmap_types::JmapResponse, ClientError> {
        require_http_url(api_url)?;

        let request = self
            .http
            .post(api_url)
            .json(req)
            .timeout(self.config.request_timeout);
        let response = self
            .inject_auth(request)
            .send()
            .await
            .map_err(ClientError::from_reqwest)?;

        // 401/403 get a dedicated error before the generic status check.
        Self::check_auth_status(response.status())?;
        let response = response
            .error_for_status()
            .map_err(ClientError::from_reqwest)?;

        // Cap enforced per chunk while streaming (bd:JMAP-6r7c.1); see
        // `read_capped_body` for why a post-buffer check is not enough when
        // Content-Length is missing or under-reported.
        let payload = read_capped_body(response, self.config.max_call_body).await?;

        serde_json::from_slice::<jmap_types::JmapResponse>(&payload)
            .map_err(ClientError::from_parse)
    }

    /// Send a [`jmap_types::JmapRequest`] to the API endpoint named by
    /// `session` and return the parsed response (bd:JMAP-6r7c.39).
    ///
    /// This is [`JmapClient::call`] with the URL choice taken out of the
    /// caller's hands: only `session.api_url` is ever read, so the
    /// "passed `session.upload_url` instead of `session.api_url`" mistake
    /// cannot be made at the call site.
    ///
    /// Body cap, authentication, and error behaviour are exactly those of
    /// [`JmapClient::call`], to which this method delegates.
    pub async fn call_session(
        &self,
        session: &Session,
        req: &jmap_types::JmapRequest,
    ) -> Result<jmap_types::JmapResponse, ClientError> {
        let api_url = session.api_url.as_str();
        self.call(api_url, req).await
    }

    /// Open an SSE connection to `event_source_url` and return an async stream
    /// of parsed [`SseFrame`]s (RFC 8620 §7.3).
    ///
    /// # URI template expansion
    ///
    /// `Session.event_source_url` is a URI template (RFC 6570 Level-1) with
    /// variables `types`, `closeafter`, and `ping`. You **must** expand it
    /// before passing it to this function, or the server will receive the
    /// literal text `{types}` in the URL and return an error. Use
    /// [`expand_url_template`](crate::expand_url_template):
    ///
    /// ```rust,ignore
    /// let url = jmap_base_client::expand_url_template(
    ///     &session.event_source_url,
    ///     &[("types", "*"), ("closeafter", "no"), ("ping", "0")],
    /// )?;
    /// let stream = client.subscribe_events(&url, None).await?;
    /// ```
    ///
    /// If `last_event_id` is `Some`, sends a `Last-Event-ID` header so the
    /// server can resume from where the previous stream left off.
    ///
    /// Buffer growth is capped at [`ClientConfig::max_sse_frame`] bytes per
    /// frame (default: 1 MiB). If a single SSE frame exceeds this limit the
    /// stream yields `ClientError::SseFrameTooLarge` and terminates. The cap
    /// is checked against the buffered bytes right after each network chunk
    /// is appended, before frames are split out of the buffer.
    ///
    /// No timeout is applied to this call or to the resulting stream.  The
    /// connect timeout (10 s, TCP only) is the only deadline enforced.  If the
    /// server stalls before sending HTTP response headers, or later goes silent
    /// on the open connection, this call or the stream will hang indefinitely.
    /// Wrap the entire call and/or stream iteration in [`tokio::time::timeout`]
    /// if you need to bound either phase.
    ///
    /// # Errors
    ///
    /// Before the stream starts, this call returns:
    ///
    /// - whatever `require_http_url` reports for an unacceptable URL;
    /// - `ClientError::AuthFailed` on HTTP 401 or 403;
    /// - a `ClientError::from_reqwest`-mapped error for transport failures
    ///   and other non-success statuses;
    /// - `ClientError::UnexpectedResponse` when the response Content-Type
    ///   essence is not exactly `text/event-stream`.
    ///
    /// Once streaming, a transport error or an over-limit frame is yielded
    /// as a single `Err` item, after which the stream ends.
    ///
    /// # End of stream
    ///
    /// When the server closes the connection the stream ends. Bytes that
    /// were buffered but never followed by a blank-line frame delimiter are
    /// dropped; no partial frame is emitted.
    ///
    /// # Stream drop and cancellation (bd:JMAP-6r7c.24)
    ///
    /// The returned `BoxStream` may be dropped at any point — mid-frame,
    /// while awaiting `StreamExt::next`, or from inside a `tokio::select!`
    /// losing-branch cancellation. Dropping is always safe and always
    /// synchronous:
    ///
    /// - **Partial frame bytes are discarded.** Any bytes accumulated in
    ///   the internal `raw_buf` or `buf` that have not yet been parsed
    ///   into an [`SseFrame`] are lost. There is no buffering or replay
    ///   inside the client — the server is the source of truth for what
    ///   was emitted vs what was acknowledged.
    /// - **The underlying HTTP connection is released.** The
    ///   `reqwest::Response::bytes_stream` held inside the stream is
    ///   dropped along with the stream; reqwest returns the connection
    ///   to its pool (or closes it) per its own pool policy.
    /// - **Resumption is the caller's job.** If you want to resume from
    ///   the last successfully-parsed frame, capture the most recent
    ///   `SseFrame::id` (if the server sets one) and pass it as
    ///   `last_event_id` on the next `subscribe_events` call. The
    ///   server will replay events from that point per RFC 8895 §9.
    ///
    /// `tokio::select!` cancellation is the canonical use case: a caller
    /// racing the SSE stream against a shutdown signal can drop the
    /// stream by selecting the shutdown branch without leaking the HTTP
    /// connection or memory.
    pub async fn subscribe_events(
        &self,
        event_source_url: &str,
        last_event_id: Option<&str>,
    ) -> Result<futures::stream::BoxStream<'static, Result<SseFrame, ClientError>>, ClientError>
    {
        require_http_url(event_source_url)?;
        // Note: unlike the request/response methods, no `.timeout(...)` is
        // set on this builder — the stream is meant to stay open.
        let mut req = self
            .http
            .get(event_source_url)
            .header("Accept", "text/event-stream");
        if let Some(id) = last_event_id {
            req = req.header("Last-Event-ID", id);
        }
        let req = self.inject_auth(req);

        let resp = req.send().await.map_err(ClientError::from_reqwest)?;
        // 401/403 get a dedicated error before the generic status check.
        Self::check_auth_status(resp.status())?;
        let resp = resp.error_for_status().map_err(ClientError::from_reqwest)?;

        // Verify Content-Type before streaming. A misconfigured server returning
        // application/json would silently produce no events (no SSE delimiter found).
        {
            // to_ascii_lowercase(): media types are case-insensitive per RFC 7231 §3.1.1.1.
            // A missing or non-ASCII header value is treated as "" and
            // therefore rejected by the essence comparison below.
            let ct = resp
                .headers()
                .get(reqwest::header::CONTENT_TYPE)
                .and_then(|v| v.to_str().ok())
                .unwrap_or("")
                .to_ascii_lowercase();
            // RFC 7231 §3.1.1.1 / RFC 9110 §8.3: the media-type "essence" (type
            // + "/" + subtype) is bounded by ';', SP, HTAB, or end-of-string.
            // A naive starts_with("text/event-stream") would accept
            // "text/event-streamish" or "text/event-stream2" — exactly the bug
            // JMAP-6lsm.2 flagged. Split off the parameter list / trailing
            // whitespace and compare the essence exactly.
            let essence = ct
                .split(|c: char| c == ';' || c.is_whitespace())
                .next()
                .unwrap_or("");
            if essence != "text/event-stream" {
                return Err(ClientError::UnexpectedResponse(format!(
                    "subscribe_events: expected Content-Type text/event-stream, got: {ct:?}"
                )));
            }
        }

        let byte_stream = resp.bytes_stream();
        // Copied out of `self` so the returned stream does not borrow the
        // client (the stream is `'static`).
        let sse_frame_limit = self.config.max_sse_frame;

        // The unfold state is `Option<SseStreamState>`: `Some` while the
        // stream is live, `None` once an error item has been yielded, so the
        // following poll ends the stream via `state?`.
        Ok(futures::stream::unfold(
            Some(SseStreamState {
                stream: byte_stream,
                raw_buf: Vec::new(),
                buf: String::new(),
                scan_from: 0, // invariant: valid UTF-8 char boundary of buf; 0 always satisfies this
            }),
            move |state| async move {
                // raw_buf: undecoded bytes (at most an incomplete UTF-8 tail
                //          between chunks); buf: decoded text not yet emitted
                //          as a frame; scan_from: byte offset in buf where the
                //          next delimiter search starts.
                let SseStreamState {
                    mut stream,
                    mut raw_buf,
                    mut buf,
                    mut scan_from,
                } = state?;
                loop {
                    // Search for any double-newline delimiter (LF/CRLF/CR variants).
                    // scan_from is set to old_len.saturating_sub(3) after each append
                    // so we only re-scan the overlap region.  3 bytes back is the
                    // minimum that covers all delimiter prefixes that can straddle a
                    // chunk boundary:
                    //   - `\r\n\r\n` (4 bytes): longest prefix that fits in one chunk
                    //     but is incomplete is `\r\n\r` (3 bytes) — exactly covered.
                    //   - `\n\r\n` (3 bytes): longest incomplete prefix is `\n\r` (2
                    //     bytes) — covered by the 3-byte overlap.
                    //   - `\n\n` and `\r\r` (2 bytes each): longest incomplete prefix
                    //     is 1 byte — covered by the 3-byte overlap.
                    // `old_len - 3` is a raw byte offset, so it can land inside a
                    // multi-byte codepoint of previously decoded text (e.g. when
                    // buf ended with a 3- or 4-byte character). The assignment
                    // site below therefore walks scan_from back to a char
                    // boundary before it is used for slicing here.
                    //
                    // Do not simplify to "rescan the whole buffer" (bd:JMAP-6r7c.19).
                    // The four-delimiter min_by_key + 3-byte overlap pattern is
                    // load-bearing for four independent reasons:
                    //
                    //   1. Overlap-rescan is O(chunk_size) per chunk; whole-buffer
                    //      rescan is O(n) per chunk, i.e. O(n²) over the lifetime
                    //      of a long-lived SSE stream. JMAP push streams can run
                    //      for hours with thousands of small events; the
                    //      difference is measurable.
                    //   2. All four delimiter forms are necessary. The WHATWG
                    //      EventSource spec + RFC 8895 line-terminator rules let
                    //      a single server emit mixed line endings within one
                    //      frame. min_by_key over all four catches the mixed
                    //      `\n\r\n` case that neither `\n\n` nor `\r\n\r\n`
                    //      alone catches. Removing forms produces silent
                    //      frame-misalignment on those servers.
                    //   3. The walk-back to a UTF-8 char boundary on the
                    //      `scan_from = old_len.saturating_sub(3)` assignment
                    //      below is needed because that offset might fall in
                    //      the middle of a multi-byte codepoint;
                    //      `buf[scan_from..]` would panic on the next `.find()`
                    //      without it.
                    //   4. Tests in tests/client_tests.rs pin all three line-
                    //      ending variants against hand-crafted server bodies:
                    //      `test_subscribe_events_crlf_line_endings`,
                    //      `test_subscribe_events_lf_crlf_frame_delimiter`,
                    //      and `test_subscribe_events_cr_line_endings`. The
                    //      mixed `\n\r\n` case (the second test) is the
                    //      trickiest and is the canonical example of why all
                    //      four delimiter searches are present.
                    //
                    // Resist the "just rescan the whole buffer" refactor.
                    //
                    // Each entry is (absolute start offset in buf, delimiter
                    // length); the earliest match wins.
                    let frame_end = [
                        buf[scan_from..]
                            .find("\r\n\r\n")
                            .map(|p| (scan_from + p, 4usize)),
                        buf[scan_from..]
                            .find("\n\n")
                            .map(|p| (scan_from + p, 2usize)),
                        buf[scan_from..]
                            .find("\r\r")
                            .map(|p| (scan_from + p, 2usize)),
                        // Mixed: LF-terminated last field line followed by a
                        // CRLF-terminated blank line.  Not detected by \n\n (the
                        // LFs are separated by a CR) or \r\n\r\n (no leading \r\n).
                        buf[scan_from..]
                            .find("\n\r\n")
                            .map(|p| (scan_from + p, 3usize)),
                    ]
                    .into_iter()
                    .flatten()
                    .min_by_key(|&(pos, _)| pos);

                    if let Some((pos, delim_len)) = frame_end {
                        // Normalize CRLF and bare CR to LF so the block handed
                        // to parse_sse_block contains LF line endings only; the
                        // `contains` check skips the rewrite for LF-only frames.
                        let frame = {
                            let slice = &buf[..pos];
                            if slice.contains('\r') {
                                slice.replace("\r\n", "\n").replace('\r', "\n")
                            } else {
                                slice.to_owned()
                            }
                        };
                        // Remove the frame and its delimiter; anything after it
                        // has not been scanned as a frame start yet, so the
                        // next search restarts from the beginning of buf.
                        buf.drain(..pos + delim_len);
                        scan_from = 0; // 0 satisfies the UTF-8 char boundary invariant
                        let sse_frame = parse_sse_block(&frame);
                        return Some((
                            Ok(sse_frame),
                            Some(SseStreamState {
                                stream,
                                raw_buf,
                                buf,
                                scan_from,
                            }),
                        ));
                    }

                    // No complete frame buffered: pull the next network chunk.
                    match stream.next().await {
                        // Upstream closed. Whatever is still in buf/raw_buf
                        // never reached a frame delimiter and is dropped.
                        None => return None,
                        Some(Err(e)) => {
                            // Yield the error, then end the stream (state = None).
                            return Some((Err(ClientError::from_reqwest(e)), None));
                        }
                        Some(Ok(bytes)) => {
                            // Accumulate raw bytes first. A multi-byte UTF-8 codepoint
                            // may be split across adjacent HTTP chunks; decode only the
                            // valid prefix and leave the remainder in raw_buf until the
                            // next chunk completes the sequence.
                            raw_buf.extend_from_slice(&bytes);
                            // Cap raw_buf to prevent OOM on persistent invalid UTF-8 input.
                            // Use the same limit as the decoded buf cap.
                            if raw_buf.len() > sse_frame_limit {
                                return Some((
                                    Err(ClientError::SseFrameTooLarge {
                                        limit: sse_frame_limit,
                                    }),
                                    None,
                                ));
                            }
                            // Remember where the newly decoded text starts so
                            // the next search only covers the new text plus the
                            // 3-byte overlap.
                            let old_len = buf.len();
                            decode_utf8_chunk(&mut raw_buf, &mut buf);
                            scan_from = old_len.saturating_sub(3);
                            // Walk backward to a valid UTF-8 char boundary so that
                            // buf[scan_from..] never panics on multibyte characters.
                            while scan_from > 0 && !buf.is_char_boundary(scan_from) {
                                scan_from -= 1;
                            }
                            // Guard against unbounded buffer growth from a hostile server.
                            // Yield the error and terminate (state = None).
                            if buf.len() > sse_frame_limit {
                                return Some((
                                    Err(ClientError::SseFrameTooLarge {
                                        limit: sse_frame_limit,
                                    }),
                                    None,
                                ));
                            }
                        }
                    }
                }
            },
        )
        .boxed())
    }

    /// Connect a WebSocket to `ws_url`, bounding message size with this
    /// client's [`ClientConfig::max_ws_message`].
    ///
    /// Thin delegate to [`crate::ws::connect_ws_with_limit`]: the free
    /// function takes the byte cap explicitly, this method supplies it from
    /// the client configuration — the same split used by
    /// [`Self::subscribe_events`].
    ///
    /// # Arguments
    ///
    /// - `ws_url` — expected to be the WebSocket capability URL from the
    ///   session document.
    /// - `auth_header` — optional header for the upgrade request. The
    ///   client's own auth provider is deliberately **not** applied here,
    ///   since some servers authenticate WebSockets via cookie or a session
    ///   header rather than the scheme used for plain HTTP requests.
    ///
    /// # Security
    ///
    /// `auth_header` carries a credential. Do not log it or forward it to
    /// other systems; handle it like a [`crate::auth::BearerAuth`] token.
    /// Transport errors from this method are documented as being built
    /// without the credential bytes, but code inspecting [`ClientError`]
    /// should still never print or persist the header itself.
    ///
    /// # Errors
    ///
    /// Error semantics are those of [`crate::ws::connect_ws_with_limit`],
    /// which is documented to return [`ClientError::InvalidArgument`] for
    /// URLs that are not `ws://` or `wss://`.
    pub async fn connect_ws_session(
        &self,
        ws_url: &str,
        auth_header: Option<crate::auth::AuthHeader<'_>>,
    ) -> Result<crate::ws::WsSession, ClientError> {
        let message_cap = self.config.max_ws_message;
        crate::ws::connect_ws_with_limit(ws_url, auth_header, message_cap).await
    }

    /// Open an SSE connection using the event-source URL template carried
    /// by `session` (bd:JMAP-6r7c.64).
    ///
    /// Convenience layer over [`Self::subscribe_events`]: the template in
    /// `session.event_source_url` is expanded here from the values in
    /// [`SubscribeEventsSessionParams`], and the result is handed to
    /// `subscribe_events` together with `params.last_event_id`. Because no
    /// URL appears at the call site, the caller cannot pass `session.api_url`
    /// (or any other URL field) by mistake.
    ///
    /// Every template variable left as `None` is substituted with an empty
    /// string. Many servers treat empty `types`, `closeafter`, and `ping`
    /// as their defaults; if yours insists on explicit values, set them in
    /// the params struct.
    ///
    /// # Errors
    ///
    /// Returns the error from `expand_url_template` if expansion fails,
    /// otherwise whatever [`Self::subscribe_events`] returns.
    pub async fn subscribe_events_session(
        &self,
        session: &Session,
        params: SubscribeEventsSessionParams<'_>,
    ) -> Result<futures::stream::BoxStream<'static, Result<SseFrame, ClientError>>, ClientError>
    {
        // `ping` is numeric; it is rendered to text only when present, so
        // the all-defaults path performs no allocation for it.
        let ping_text = params.ping.map(|secs| secs.to_string());

        // RFC 8620 §7.3 template variables: types, closeafter, ping.
        // The default of `&str` is "", which is the empty substitution.
        let template_vars = [
            ("types", params.types.unwrap_or_default()),
            ("closeafter", params.close_after.unwrap_or_default()),
            ("ping", ping_text.as_deref().unwrap_or_default()),
        ];

        let url = crate::blob::expand_url_template(
            session.event_source_url.as_str(),
            &template_vars,
        )?;
        self.subscribe_events(&url, params.last_event_id).await
    }
}

/// Input bundle for [`JmapClient::subscribe_events_session`]
/// (bd:JMAP-6r7c.64).
///
/// Holds the three RFC 8620 §7.3 `eventSourceUrl` template variables
/// (`types`, `closeafter`, `ping`) and an optional `Last-Event-ID` value
/// (RFC 8895 §9). A template field left as `None` is expanded as an empty
/// string. All fields are `None` in the [`Default`] value, and the struct
/// is `Copy`, so it can be passed by value freely.
///
/// Build it with a struct literal (use `..Default::default()` to fill in
/// the fields you do not care about):
///
/// ```rust,ignore
/// client.subscribe_events_session(&session, SubscribeEventsSessionParams {
///     types: Some("Email,Mailbox"),
///     close_after: Some("state"),
///     ping: Some(60),
///     last_event_id: Some("evt-1234"),
/// }).await?;
/// ```
#[derive(Clone, Copy, Debug, Default)]
pub struct SubscribeEventsSessionParams<'a> {
    /// Value for the `types` template variable: a comma-separated list of
    /// JMAP data-type names (for example `"Email,Mailbox"`), or `"*"` for
    /// every type the server supports. `None` becomes an empty string.
    pub types: Option<&'a str>,
    /// Value for the `closeafter` template variable: `"state"` closes the
    /// connection after the first state-change push, `"no"` keeps it open.
    /// `None` becomes an empty string.
    pub close_after: Option<&'a str>,
    /// Value for the `ping` template variable: the server-ping interval in
    /// seconds, with `0` meaning no pings. `None` becomes an empty string.
    pub ping: Option<u32>,
    /// Value for the `Last-Event-ID` request header, used to resume a
    /// previous stream (RFC 8895 §9). `None` sends no header.
    pub last_event_id: Option<&'a str>,
}

/// Locate the invocation answering `call_id` inside `resp` and deserialize
/// its arguments as `T`.
///
/// # Errors
///
/// - [`ClientError::MethodError`] if **any** invocation carrying `call_id`
///   has the method name `"error"` (RFC 8620 §3.6.1). `error_type` is the
///   invocation's string `type` argument, or `"serverError"` when that
///   argument is missing or not a string; `description` is the optional
///   string `description` argument.
/// - [`ClientError::MethodNotFound`] if no invocation carries `call_id`.
/// - A `ClientError::from_parse`-mapped error if the arguments do not
///   deserialize as `T`.
///
/// # Several invocations with one call_id
///
/// RFC 8620 §3.2 allows one method call to yield more than one response
/// invocation — e.g. `Foo/copy` with `onSuccessDestroyOriginal: true`
/// yields a `Foo/copy` invocation plus an implicit `Foo/set`, both tagged
/// with the same call_id (RFC 8620 §5.8). The rules applied here are:
///
/// 1. **An error wins.** If any matching invocation is an `"error"`, the
///    first such error is returned, even when a matching success precedes
///    it. Handing back a success while the server also reported a failure
///    under the same call_id would hide that failure from the caller.
/// 2. **Otherwise the first match is used.** The earliest non-error
///    invocation in `resp.method_responses` order is deserialized as `T`.
///
/// # Contract: `call_id` is the only matcher (bd:JMAP-6r7c.23)
///
/// The invocation's method name is **not** compared against anything
/// derived from `T`; only the `"error"` name is special-cased. Callers
/// should keep two consequences in mind:
///
/// - **A wrong `T` can still deserialize.** If the matching invocation is
///   for a different method than the caller assumed, deserialization
///   succeeds whenever the argument shapes overlap enough for `T`, and the
///   caller receives a plausibly-shaped but wrong value. This function has
///   no knowledge of which method name was expected.
/// - **Server ordering is trusted.** With several non-error matches, the
///   first in response order is returned. RFC 8620 §5.8 suggests the
///   primary response comes first but does not require it, so a server
///   that lists the implicit invocation first will have that one
///   deserialized as `T`.
///
/// Callers needing method-name checks, or a different choice among
/// multiple matches, should walk `resp.method_responses` themselves; each
/// [`jmap_types::Invocation`] is `(method, args, call_id)`.
///
/// # Panics
///
/// Panics raised by `T`'s [`serde::Deserialize`] implementation are not
/// caught and propagate to the caller. Derived `Deserialize` impls in the
/// workspace type crates (`jmap-types`, `jmap-mail-types`, etc.) are not
/// expected to panic; the caveat concerns hand-written impls elsewhere
/// (bd:JMAP-6r7c.44).
///
/// The function is `pub` so that extension crates (`jmap-chat-client`,
/// `jmap-mail-client`) can pull typed results out of a
/// [`jmap_types::JmapResponse`] without relying on crate internals.
pub fn extract_response<T: serde::de::DeserializeOwned>(
    resp: &jmap_types::JmapResponse,
    call_id: &str,
) -> Result<T, ClientError> {
    // Invocation fields by position: .0 = method name, .1 = arguments,
    // .2 = call id.
    //
    // Single pass: remember the first non-error match, but bail out as soon
    // as a matching error invocation is seen. Because the whole list is
    // walked before the remembered success is used, an error that appears
    // after a success still takes precedence.
    let mut first_success = None;
    for inv in resp.method_responses.iter() {
        if inv.2 != call_id {
            continue;
        }

        if inv.0 == "error" {
            // Reads a string-valued argument of the error invocation.
            let text_arg = |key: &str| {
                inv.1
                    .get(key)
                    .and_then(|v| v.as_str())
                    .map(str::to_owned)
            };
            return Err(ClientError::MethodError {
                // Fallback is a fixed literal, not server-provided data.
                error_type: text_arg("type").unwrap_or_else(|| "serverError".to_owned()),
                description: text_arg("description"),
            });
        }

        if first_success.is_none() {
            first_success = Some(inv);
        }
    }

    // No error among the matches: use the earliest one, if there was any.
    let inv = first_success.ok_or_else(|| ClientError::MethodNotFound(call_id.to_owned()))?;
    <T as serde::Deserialize>::deserialize(&inv.1).map_err(ClientError::from_parse)
}

/// Decode as much valid UTF-8 as possible from `raw` into `buf`, draining
/// every byte that was consumed (decoded or discarded as invalid) from `raw`.
///
/// The whole of `raw` is processed in one call:
///
/// - Every run of valid UTF-8 is appended to `buf`.
/// - Every **definitively invalid** sequence (`error_len == Some(n)`) is
///   skipped and dropped; decoding then continues with the bytes that
///   follow it. Valid text after a bad byte — including an SSE frame
///   delimiter — therefore reaches `buf` immediately instead of waiting in
///   `raw` for the next network chunk.
/// - A trailing **incomplete** multi-byte sequence (`error_len == None`)
///   is the only thing that may remain in `raw` afterwards, so the next
///   chunk can complete it.
///
/// On return `raw` is either empty or holds just that incomplete tail.
///
/// Invalid bytes are dropped silently (no U+FFFD replacement character is
/// inserted).
///
/// The caller is responsible for capping `raw.len()` before calling this
/// function; `subscribe_events` does so with its per-frame limit
/// (`max_sse_frame`).
fn decode_utf8_chunk(raw: &mut Vec<u8>, buf: &mut String) {
    // Bytes of `raw` handled so far. Advanced in place and drained once at
    // the end, so repeated invalid sequences do not cause repeated
    // front-of-Vec shifts.
    let mut consumed = 0;
    loop {
        match std::str::from_utf8(&raw[consumed..]) {
            Ok(s) => {
                buf.push_str(s);
                consumed = raw.len();
                break;
            }
            Err(e) => {
                let valid_end = consumed + e.valid_up_to();
                // valid_up_to is the documented length of the prefix that IS
                // valid UTF-8 (std::str::Utf8Error contract). The crate-root
                // #![forbid(unsafe_code)] rules out from_utf8_unchecked, so
                // re-validating with .expect() is the cheapest legal option.
                // Do not replace it with unwrap_or_default(): that would
                // silently drop valid text on a path that cannot happen
                // (bd:JMAP-6r7c.54).
                buf.push_str(
                    std::str::from_utf8(&raw[consumed..valid_end])
                        .expect("valid_up_to is a valid UTF-8 boundary"),
                );
                match e.error_len() {
                    Some(n) => {
                        // Definitively invalid: skip the bad bytes and keep
                        // decoding what follows. `n` is at least 1, so the
                        // loop always makes progress.
                        consumed = (valid_end + n).min(raw.len());
                    }
                    None => {
                        // Incomplete multi-byte sequence at the very end:
                        // leave it in raw until the next chunk supplies the
                        // missing continuation bytes.
                        consumed = valid_end;
                        break;
                    }
                }
            }
        }
    }
    raw.drain(..consumed);
}

/// Borrow the scheme portion of `url`, i.e. everything before the first
/// `"://"`.
///
/// The slice is returned exactly as written in the input — no case folding
/// is applied. URL schemes are case-insensitive (RFC 3986 §3.1), so callers
/// should compare the result with [`str::eq_ignore_ascii_case`].
///
/// Returns `None` when `url` contains no `"://"` separator.
///
/// Because this only scans for the separator and never parses the rest of
/// the string, URI templates with `{variable}` placeholders are fine as
/// input.
///
/// A borrowed slice is returned on purpose: an earlier version lowercased
/// into a freshly allocated `String` on every call, which was wasted work
/// for a simple scheme comparison (bd:JMAP-6lsm.10).
fn url_scheme(url: &str) -> Option<&str> {
    let separator_at = url.find("://")?;
    Some(&url[..separator_at])
}

/// Report whether the scheme prefix of `url` is `http` or `https`.
///
/// The comparison ignores ASCII case (schemes are case-insensitive per
/// RFC 3986 §3.1). A string with no `"://"` separator has no scheme prefix
/// and yields `false`.
fn is_http_or_https(url: &str) -> bool {
    match url_scheme(url) {
        Some(scheme) => ["http", "https"]
            .iter()
            .any(|allowed| scheme.eq_ignore_ascii_case(allowed)),
        None => false,
    }
}

/// Turn a caller-supplied JMAP base URL string into a validated
/// [`url::Url`].
///
/// The checks run in the order below; the first one that fails is reported
/// as [`ClientError::InvalidArgument`]:
///
/// 1. the string is non-empty;
/// 2. it parses with [`url::Url::parse`];
/// 3. it carries no user-info (`user:password@host`);
/// 4. its scheme is `http` or `https` (the parsed scheme is compared, and
///    the `url` crate reports schemes lower-cased, so input case does not
///    matter);
/// 5. it has no explicit path component — the parsed path must be exactly
///    `/`, which is what a root-only URL yields with or without a single
///    trailing slash;
/// 6. it has no query string;
/// 7. it has no fragment.
///
/// In other words the input must be origin-only: scheme + host + optional
/// port, the form a JMAP server is identified by.
///
/// Accepted: `https://jmap.example.com`, `https://jmap.example.com/`,
/// `https://10.0.0.1:8008`, `http://localhost:8080`.
///
/// Rejected: `""`, `https://example.com/api`,
/// `https://example.com?query=1`, `https://example.com#fragment`,
/// `ftp://example.com`.
///
/// This lives outside [`JmapClient::new`] so the constructor reads as
/// 'parse-don't-validate' instead of a multi-step inline procedure
/// (bd:JMAP-6lsm.26).
fn parse_base_url(base_url: &str) -> Result<url::Url, ClientError> {
    // Every rejection below is an InvalidArgument; this keeps each guard to
    // a single line of intent.
    let invalid = |reason: String| ClientError::InvalidArgument(reason);

    if base_url.is_empty() {
        return Err(invalid("base_url may not be empty".to_owned()));
    }

    let candidate = match url::Url::parse(base_url) {
        Ok(url) => url,
        Err(e) => return Err(invalid(format!("base_url is not a valid URL: {e}"))),
    };

    // User-info (`user:password@host`, RFC 3986) is refused before anything
    // else is inspected. The url crate's Display impl reproduces user-info
    // verbatim (lossless round-trip, not safe display — see RFC 3986 §7.5),
    // so letting it through would carry the password into later error
    // messages, reqwest::Error's url() field, and any tracing output that
    // captures the base URL or a derived ClientError. JMAP credentials
    // travel in the Authorization header via the AuthProvider trait, so
    // user-info has no legitimate role here (bd:JMAP-6r7c.58).
    //
    // The message deliberately does NOT include `base_url`: echoing it
    // would put the password into exactly the error chain this check is
    // meant to protect.
    let has_user_info = !candidate.username().is_empty() || candidate.password().is_some();
    if has_user_info {
        return Err(invalid(
            "base_url must not contain user-info (`user:password@host`); pass credentials via \
             AuthProvider instead"
                .to_owned(),
        ));
    }

    match candidate.scheme() {
        "http" | "https" => {}
        scheme => {
            return Err(invalid(format!(
                "base_url scheme must be http or https, got: {scheme:?}"
            )));
        }
    }

    // url::Url::path() yields "/" for a root-only URL (no path segments),
    // so anything else means the caller supplied an explicit path.
    let path = candidate.path();
    if path != "/" {
        return Err(invalid(format!(
            "base_url must not have a path component, got: {path:?}"
        )));
    }

    if candidate.query().is_some() {
        return Err(invalid("base_url must not have a query string".to_owned()));
    }

    if candidate.fragment().is_some() {
        return Err(invalid("base_url must not have a fragment".to_owned()));
    }

    Ok(candidate)
}

/// Ensure `url` has an `http` or `https` scheme.
///
/// Intended as the first statement of each public method that takes a URL
/// argument (`call`, `subscribe_events`, `upload_blob`, `download_blob`).
/// It is a defense-in-depth measure: the primary guard is
/// [`validate_session_url_schemes`], which refuses bad URLs in the Session
/// document when it is fetched. Repeating the check per call keeps every
/// entry point safe on its own if a non-http URL is handed in some other
/// way (for instance from a test fixture or through API misuse).
///
/// # Errors
///
/// Returns [`ClientError::InvalidArgument`] when the scheme is anything
/// other than http/https, including when `url` has no `://` at all.
pub(crate) fn require_http_url(url: &str) -> Result<(), ClientError> {
    if is_http_or_https(url) {
        Ok(())
    } else {
        // NOTE(review): the message echoes `url` in full. If a caller could
        // pass a URL carrying secrets (user-info, tokens in the query), they
        // would surface in this error — confirm that cannot happen.
        Err(ClientError::InvalidArgument(format!(
            "URL must have http or https scheme, got: {url:?}"
        )))
    }
}

/// Check that all four session URL fields start with an http(s) scheme —
/// and nothing more.
///
/// `upload_url`, `download_url` and `event_source_url` are RFC 6570 URI
/// templates containing placeholders such as `{accountId}`, `{blobId}` or
/// `{types}`. They cannot be parsed as full URLs until those variables are
/// expanded, but the scheme always sits concretely to the left of `://`,
/// so a prefix check is possible. That prefix check is all this function
/// does: it does NOT confirm that required template variables are present,
/// that the host is well-formed, or that the path is reachable.
///
/// The fields are checked in the order `api_url`, `upload_url`,
/// `download_url`, `event_source_url`; the first offending value is
/// reported in a [`ClientError::InvalidSession`].
///
/// Formerly named `validate_session_urls`; renamed because that name
/// promised stronger validation than is actually performed
/// (bd:JMAP-6lsm.23).
fn validate_session_url_schemes(session: &Session) -> Result<(), ClientError> {
    // The fields have different typed-URL wrappers (JmapUrl,
    // JmapUrlTemplate) but each exposes .as_str(). Working on plain &str
    // keeps this scheme check independent of that distinction
    // (bd:JMAP-6r7c.40).
    let candidates = [
        session.api_url.as_str(),
        session.upload_url.as_str(),
        session.download_url.as_str(),
        session.event_source_url.as_str(),
    ];

    let first_offender = candidates
        .iter()
        .copied()
        .find(|candidate| !is_http_or_https(candidate));

    match first_offender {
        None => Ok(()),
        Some(url) => Err(ClientError::InvalidSession(format!(
            "session URL has non-http/https scheme: {url:?}"
        ))),
    }
}

// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------

#[cfg(test)]
mod tests {
    use super::decode_utf8_chunk;

    /// Run a single `decode_utf8_chunk` call over `pending` with an empty
    /// text buffer and return `(decoded text, bytes still pending)`.
    fn decode_once(pending: &[u8]) -> (String, Vec<u8>) {
        let mut raw = pending.to_vec();
        let mut buf = String::new();
        decode_utf8_chunk(&mut raw, &mut buf);
        (buf, raw)
    }

    /// Pure ASCII input is decoded in full and nothing stays pending.
    #[test]
    fn decode_utf8_chunk_all_ascii() {
        let (buf, raw) = decode_once(b"hello");
        assert_eq!(buf, "hello");
        assert!(raw.is_empty());
    }

    /// A complete multi-byte codepoint (the `é` in "café", U+00E9, encoded
    /// as 0xC3 0xA9) is decoded in full.
    #[test]
    fn decode_utf8_chunk_complete_multibyte() {
        let (buf, raw) = decode_once("café".as_bytes());
        assert_eq!(buf, "café");
        assert!(raw.is_empty());
    }

    /// Only the lead byte of a 2-byte sequence (U+00E9 = 0xC3 0xA9) has
    /// arrived. `error_len` is `None` (incomplete), so the byte must be
    /// KEPT in the pending bytes rather than dropped.
    ///
    /// Regression test: an earlier `valid_up_to.max(1)` discarded this byte.
    #[test]
    fn decode_utf8_chunk_incomplete_head_retained() {
        let (buf, raw) = decode_once(&[0xC3]);
        assert_eq!(buf, "", "no complete codepoints to push");
        assert_eq!(raw, vec![0xC3u8], "incomplete head must stay in raw");
    }

    /// ASCII followed by the lead byte of a 2-byte sequence: the ASCII is
    /// decoded, the lead byte stays pending.
    #[test]
    fn decode_utf8_chunk_prefix_then_incomplete_head() {
        let (buf, raw) = decode_once(&[b'a', b'b', 0xC3]);
        assert_eq!(buf, "ab");
        assert_eq!(raw, vec![0xC3u8], "incomplete head must stay in raw");
    }

    /// Two consecutive calls: a sequence left incomplete by the first chunk
    /// is completed by the second. This is the HTTP chunk-split case the
    /// retention behaviour exists for.
    #[test]
    fn decode_utf8_chunk_split_sequence_completed() {
        let mut raw: Vec<u8> = Vec::new();
        let mut buf = String::new();

        // First chunk carries only the lead byte of U+00E9 (é = 0xC3 0xA9).
        raw.extend_from_slice(&[0xC3]);
        decode_utf8_chunk(&mut raw, &mut buf);
        assert_eq!(raw, vec![0xC3u8], "incomplete head retained after chunk 1");

        // Second chunk supplies the continuation byte.
        raw.extend_from_slice(&[0xA9]);
        decode_utf8_chunk(&mut raw, &mut buf);
        assert_eq!(buf, "é", "character fully decoded after chunk 2");
        assert!(raw.is_empty());
    }

    /// A byte that can never occur in UTF-8 (0xFF) is discarded.
    #[test]
    fn decode_utf8_chunk_invalid_byte_drained() {
        let (buf, raw) = decode_once(&[0xFF]);
        assert_eq!(buf, "");
        assert!(raw.is_empty(), "definitively invalid byte must be drained");
    }

    /// ASCII followed by a definitively invalid byte: the ASCII is decoded
    /// and both it and the invalid byte leave the pending bytes.
    #[test]
    fn decode_utf8_chunk_prefix_then_invalid_drained() {
        let (buf, raw) = decode_once(&[b'a', b'b', 0xFF]);
        assert_eq!(buf, "ab");
        assert!(raw.is_empty(), "prefix and invalid byte must be drained");
    }

    /// Compile-time check that [`JmapClient`] is `Send + Sync + Clone`
    /// (bd:JMAP-6r7c.25).
    ///
    /// If a later refactor introduces a field that is not `Sync` (such as
    /// `Rc<T>`, `RefCell<T>` or `Cell<T>`), this stops compiling in CI
    /// before any downstream user sharing a `JmapClient` across tokio tasks
    /// is affected. The helper has an empty body and does nothing at run
    /// time; the only thing that matters is that its trait bounds are
    /// satisfied for `JmapClient`.
    #[test]
    fn jmap_client_is_send_sync_clone() {
        fn assert_send_sync_clone<T: Send + Sync + Clone>() {}
        assert_send_sync_clone::<super::JmapClient>();
    }
}