rsurl 0.0.4

A pure-Rust implementation of curl. Library, C FFI, and CLI for HTTP/HTTPS/FTP/FTPS.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
//! WebSocket support (RFC 6455).
//!
//! WS handshakes are HTTP/1.1 `Upgrade: websocket` requests followed by
//! binary/text frames. We perform the handshake by hand (so we can sit on the
//! raw stream without buffered-reader leftovers eating into the frame
//! channel), then drive a proper frame loop. For `wss://`, the TCP stream is
//! wrapped with [`crate::tls::connect_over`] before sending the upgrade.
//!
//! What this module does:
//!   * Send-side data frames: `send_message` writes a masked client
//!     text/binary frame (client frames MUST be masked, RFC 6455 §5.3).
//!   * Receive-side reassembly: `read_message` runs a frame loop that
//!     stitches an initial data frame (FIN=0) and its CONTINUATION frames
//!     (opcode 0x0) back into one message, enforcing the
//!     `MAX_PAYLOAD_BYTES` cap on the *cumulative* reassembled size so a
//!     fragmented bomb can't slip past it.
//!   * Control frames inline: a PING is answered with a PONG echoing its
//!     application data, an unsolicited PONG is ignored, and a CLOSE is
//!     answered with a CLOSE before returning cleanly. Control frames are
//!     handled both while waiting for the first data frame and in between
//!     fragments. Per §5.4/§5.5 control frames must not be fragmented and
//!     carry at most 125 bytes; violations are rejected as protocol errors.
//!   * permessage-deflate (RFC 7692): the upgrade request offers the
//!     extension (with `client_no_context_takeover` /
//!     `server_no_context_takeover` and `client_max_window_bits`). If the
//!     server agrees, data messages whose first frame has RSV1 set are
//!     per-message DEFLATE-compressed (RFC 1951 raw deflate) — we append the
//!     `00 00 FF FF` empty-block terminator the sender strips and inflate,
//!     bounding the inflated size against `MAX_PAYLOAD_BYTES` so a
//!     compression bomb can't bypass the cap. Outgoing messages are deflated
//!     and flagged with RSV1 when compression is negotiated. RSV1 is rejected
//!     when compression was *not* negotiated, RSV2/RSV3 are always rejected,
//!     and RSV1 on a control frame is rejected. See `Pmd` for the
//!     context-takeover decision.
//!
//! Limitations of this scaffold (intentionally deferred):
//!   * Streaming/large payloads — the whole message is buffered in memory.
//!   * Ping *intervals* / timer-driven keepalive; we react to peer pings but
//!     do not proactively send our own on a schedule.

use std::io::{Read, Write};
use std::net::TcpStream;
use std::time::Duration;

use compcol::deflate::Deflate;
use compcol::limit::LimitedDecoder;
use compcol::vec::compress_to_vec;
use compcol::{Algorithm, Decoder, Status};
use purecrypto::hash::{Digest, Sha1};

use crate::error::{Error, Result};
use crate::tls::TlsStream;
use crate::url::Url;

/// The fixed GUID from RFC 6455 §1.3. Presumably concatenated with the
/// `Sec-WebSocket-Key` when deriving the expected `Sec-WebSocket-Accept`
/// (see `derive_accept`, which is outside this view) — confirm there.
const WS_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

/// The empty-block terminator a permessage-deflate sender strips from the
/// tail of each deflated message (RFC 7692 §7.2.1) and the receiver appends
/// back before inflating (§7.2.2).
const DEFLATE_TAIL: [u8; 4] = [0x00, 0x00, 0xFF, 0xFF];

// Frame opcodes (RFC 6455 §5.2). The receive loop treats every opcode
// >= 0x8 as a control frame.

/// Continuation of a fragmented data message.
const OPCODE_CONT: u8 = 0x0;
/// Text data frame.
const OPCODE_TEXT: u8 = 0x1;
/// Binary data frame.
const OPCODE_BINARY: u8 = 0x2;
/// Connection close (control).
const OPCODE_CLOSE: u8 = 0x8;
/// Ping (control); answered with a pong.
const OPCODE_PING: u8 = 0x9;
/// Pong (control).
const OPCODE_PONG: u8 = 0xA;

/// Cap, in bytes (64 MiB), applied to the cumulative size of a reassembled
/// message (`accumulate`) and to the inflated size of a compressed one
/// (`Pmd::inflate_message`).
const MAX_PAYLOAD_BYTES: u64 = 64 * 1024 * 1024;

/// The permessage-deflate offer we put in the upgrade request. We advertise
/// `client_no_context_takeover` and `server_no_context_takeover` so each
/// message inflates/deflates independently (no sliding window carried across
/// messages), which keeps state bounded and the implementation simple. We
/// also offer `client_max_window_bits` so a server that wants to shrink our
/// (notional) window can; we don't carry context anyway, so any value it
/// echoes is fine.
// NOTE(review): RFC 7692 §7.1.2.2 makes an echoed `client_max_window_bits=N`
// binding on the LZ77 window we use *within* a message, not only across
// messages. The send path does not appear to configure the encoder's window,
// so a server that answers with N < 15 may be unable to inflate large
// messages from us — confirm before wiring `send_message` into a real path.
const PMD_OFFER: &str =
    "permessage-deflate; client_no_context_takeover; server_no_context_takeover; client_max_window_bits";

/// Negotiated permessage-deflate (RFC 7692) state for a connection.
///
/// Built by `parse_pmd_response` when the server's handshake response selects
/// the extension; its presence (`Some`) is what enables RSV1 handling on the
/// receive path and compression on the send path.
///
/// ## Context-takeover decision
///
/// We always operate our **send** side in no-context-takeover mode: every
/// outgoing message is an independent raw-DEFLATE stream produced by a fresh
/// encoder. That is why we offer `client_no_context_takeover` — it lets the
/// peer reset its inflater per message to match.
///
/// On the **receive** side we honour what the server negotiated. The
/// underlying `compcol` raw-deflate decoder keeps a 32 KiB sliding window and
/// can carry it across separate `decode` calls, so a persistent inflate
/// context across messages is supported: we keep one decoder and only
/// [`reset`](Decoder::reset) it per message when `server_no_context_takeover`
/// was agreed. Since our offer always includes `server_no_context_takeover`,
/// a compliant server typically agrees to it and we reset each message; but
/// if it declines, the persistent-window path keeps us correct.
struct Pmd {
    /// `client_no_context_takeover` was agreed: our encoder is fresh per
    /// message (always true for us — we never carry send context). Recorded
    /// for completeness/observability; our send path is unconditionally
    /// no-context-takeover, so we don't branch on it.
    #[allow(dead_code)]
    client_no_context_takeover: bool,
    /// `server_no_context_takeover` was agreed: reset the inflate decoder
    /// before each incoming message rather than carrying its window.
    server_no_context_takeover: bool,
    /// Persistent inflate context, reused across messages when the server
    /// did not agree `server_no_context_takeover`. `inflate_message`
    /// temporarily moves it into a `LimitedDecoder` (leaving a fresh
    /// placeholder here) and puts it back afterwards, whether or not the
    /// inflate succeeded.
    decoder: <Deflate as Algorithm>::Decoder,
}

impl Pmd {
    /// Inflate a complete, already-reassembled compressed message.
    ///
    /// Steps, in order:
    ///   1. If `server_no_context_takeover` was negotiated, reset the decoder
    ///      so no window is carried in from the previous message.
    ///   2. Re-append the `00 00 FF FF` tail the sender removed
    ///      (RFC 7692 §7.2.2).
    ///   3. Run raw DEFLATE through a `LimitedDecoder` capped at
    ///      `MAX_PAYLOAD_BYTES`, so a compression bomb errors out instead of
    ///      expanding without bound.
    ///
    /// Returns the inflated bytes, or `Error::BadResponse` if decoding fails.
    fn inflate_message(&mut self, compressed: &[u8]) -> Result<Vec<u8>> {
        if self.server_no_context_takeover {
            self.decoder.reset();
        }

        // Compressed payload followed by the terminator the sender stripped.
        let input = [compressed, &DEFLATE_TAIL[..]].concat();

        // `LimitedDecoder` owns the decoder it wraps, so lend ours out (a
        // fresh decoder stands in meanwhile) and take it back once the run is
        // over. Getting the same decoder back is what keeps its window alive
        // for the next message on the context-takeover path.
        let lent = std::mem::replace(&mut self.decoder, Deflate::decoder());
        let mut bounded = LimitedDecoder::new(lent, MAX_PAYLOAD_BYTES);
        let outcome = Self::run_inflate(&mut bounded, &input);
        // Restored on success and on failure alike.
        self.decoder = bounded.into_inner();
        outcome
    }

    /// Pump `input` through the bounded decoder and gather everything it
    /// emits.
    ///
    /// The loop ends when the decoder reports the end of the stream, or when
    /// it asks for more input and either all of `input` has been consumed or
    /// the last call made no progress at all (nothing consumed, nothing
    /// written). A decoder that merely filled the scratch buffer is called
    /// again. Decode errors become `Error::BadResponse`.
    fn run_inflate(
        limited: &mut LimitedDecoder<<Deflate as Algorithm>::Decoder>,
        input: &[u8],
    ) -> Result<Vec<u8>> {
        let mut inflated: Vec<u8> = Vec::new();
        let mut chunk = vec![0u8; 64 * 1024];
        let mut offset = 0usize;
        loop {
            let (progress, status) =
                limited
                    .decode(&input[offset..], &mut chunk)
                    .map_err(|e| {
                        Error::BadResponse(format!("permessage-deflate inflate failed: {e}"))
                    })?;
            inflated.extend_from_slice(&chunk[..progress.written]);
            offset += progress.consumed;

            // No bytes in and no bytes out: the decoder is parked.
            let stalled = progress.consumed == 0 && progress.written == 0;
            match status {
                Status::StreamEnd => return Ok(inflated),
                Status::OutputFull => {}
                Status::InputEmpty if offset >= input.len() || stalled => {
                    return Ok(inflated);
                }
                Status::InputEmpty => {}
            }
        }
    }
}

/// Deflate one application message for sending (RFC 7692 §7.2.1). Produces a
/// fresh raw-DEFLATE stream (no context carried — we always operate the send
/// side in no-context-takeover mode). `compcol`'s encoder terminates the
/// stream with a BFINAL=1 block rather than a `Z_SYNC_FLUSH`, so there is no
/// trailing `00 00 FF FF` to strip; the receiver appends its own terminator
/// and, on hitting our final block, recovers the exact payload (the appended
/// tail is then harmlessly ignored). This is spec-legal and, paired with our
/// `client_no_context_takeover` offer, lets the peer reset its inflater each
/// message.
fn deflate_message(payload: &[u8]) -> Result<Vec<u8>> {
    let mut out = compress_to_vec::<Deflate>(payload)
        .map_err(|e| Error::BadResponse(format!("permessage-deflate deflate failed: {e}")))?;
    // If the encoder ever did emit the sync-flush terminator, strip it per
    // §7.2.1. compcol terminates with a final block today, so this is a
    // forward-compatible no-op, but it keeps us correct if that changes.
    if out.ends_with(&DEFLATE_TAIL) {
        out.truncate(out.len() - DEFLATE_TAIL.len());
    }
    Ok(out)
}

/// Open a WS connection, read one full text or binary message (reassembling
/// fragments and answering any interleaved ping/close control frames), send a
/// close, and return that message's payload.
pub fn fetch(url: &Url) -> Result<Vec<u8>> {
    match url.scheme.as_str() {
        "ws" => {
            let mut sock = tcp_connect(url)?;
            let mut pmd = handshake(&mut sock, url)?;
            read_data_and_close(&mut sock, pmd.as_mut())
        }
        "wss" => {
            let tcp = tcp_connect(url)?;
            let mut tls = crate::tls::connect_over(tcp, &url.host)?;
            let mut pmd = handshake(&mut tls, url)?;
            read_data_and_close(&mut tls, pmd.as_mut())
        }
        other => Err(Error::UnsupportedScheme(other.to_string())),
    }
}

fn tcp_connect(url: &Url) -> Result<TcpStream> {
    let addr = format!("{}:{}", url.host, url.port);
    let addrs: Vec<_> = std::net::ToSocketAddrs::to_socket_addrs(&addr)?.collect();
    let first = addrs
        .into_iter()
        .next()
        .ok_or_else(|| Error::InvalidUrl(url.host.clone()))?;
    let stream = TcpStream::connect_timeout(&first, Duration::from_secs(30))?;
    stream.set_read_timeout(Some(Duration::from_secs(60)))?;
    stream.set_write_timeout(Some(Duration::from_secs(60)))?;
    Ok(stream)
}

/// Perform the client side of the HTTP/1.1 WebSocket upgrade on `stream`.
///
/// Sends a `GET` carrying `Upgrade: websocket`, a fresh random
/// `Sec-WebSocket-Key` and our permessage-deflate offer, then reads the
/// response head one byte at a time so nothing past the terminating blank
/// line is consumed: on success the stream sits at the first byte of the
/// first WS frame.
///
/// The response must be a `101` with `Upgrade: websocket`, a `Connection`
/// header listing `upgrade`, and a `Sec-WebSocket-Accept` equal to the value
/// derived from our key; otherwise `Error::BadResponse` is returned. A peer
/// that hangs up before the head is complete yields `Error::UnexpectedEof`,
/// and a head longer than 64 KiB is rejected.
///
/// Returns `Some(Pmd)` if the server agreed to permessage-deflate (RFC 7692),
/// `None` otherwise (the connection then operates uncompressed).
fn handshake<S: Read + Write>(stream: &mut S, url: &Url) -> Result<Option<Pmd>> {
    let nonce: [u8; 16] = random_16()?;
    let key_b64 = base64_encode(&nonce);

    // The scheme's default port is left out of the Host header.
    let on_default_port = match url.scheme.as_str() {
        "ws" => url.port == 80,
        "wss" => url.port == 443,
        _ => false,
    };
    let host_header = if on_default_port {
        url.host.clone()
    } else {
        format!("{}:{}", url.host, url.port)
    };

    let path = match url.path.as_str() {
        "" => "/",
        given => given,
    };

    let req = format!(
        "GET {path} HTTP/1.1\r\n\
         Host: {host_header}\r\n\
         Upgrade: websocket\r\n\
         Connection: Upgrade\r\n\
         Sec-WebSocket-Key: {key_b64}\r\n\
         Sec-WebSocket-Version: 13\r\n\
         Sec-WebSocket-Extensions: {PMD_OFFER}\r\n\
         \r\n"
    );
    stream.write_all(req.as_bytes())?;
    stream.flush()?;

    // Single-byte reads: a buffered reader could swallow the start of the
    // frame stream that follows the blank line.
    let mut raw: Vec<u8> = Vec::with_capacity(512);
    let mut byte = [0u8; 1];
    loop {
        if stream.read(&mut byte)? == 0 {
            return Err(Error::UnexpectedEof);
        }
        raw.push(byte[0]);
        if raw.ends_with(b"\r\n\r\n") {
            break;
        }
        if raw.len() > 64 * 1024 {
            return Err(Error::BadResponse("handshake response too large".into()));
        }
    }

    let head = std::str::from_utf8(&raw)
        .map_err(|_| Error::BadResponse("non-utf8 handshake response".into()))?;
    let mut lines = head.split("\r\n");
    let status_line = lines
        .next()
        .ok_or_else(|| Error::BadResponse("empty handshake response".into()))?;
    let switching = ["HTTP/1.1 101", "HTTP/1.0 101"]
        .iter()
        .any(|prefix| status_line.starts_with(*prefix));
    if !switching {
        return Err(Error::BadResponse(format!(
            "expected 101 Switching Protocols, got: {status_line:?}"
        )));
    }

    let mut upgrade_ok = false;
    let mut connection_ok = false;
    let mut accept_value: Option<String> = None;
    let mut extensions_value: Option<String> = None;
    // Header lines run up to the first empty line; lines without a colon are
    // skipped.
    for line in lines.take_while(|line| !line.is_empty()) {
        let (name, value) = match line.split_once(':') {
            Some((name, value)) => (name.trim(), value.trim()),
            None => continue,
        };
        match name.to_ascii_lowercase().as_str() {
            "upgrade" => upgrade_ok |= value.eq_ignore_ascii_case("websocket"),
            // Connection may be a comma-separated token list.
            "connection" => {
                connection_ok |= value
                    .split(',')
                    .any(|token| token.trim().eq_ignore_ascii_case("upgrade"));
            }
            // If the header repeats, the last occurrence wins.
            "sec-websocket-accept" => accept_value = Some(value.to_string()),
            // Repeated extension headers are joined into one comma-separated
            // list.
            "sec-websocket-extensions" => {
                extensions_value = Some(match extensions_value.take() {
                    Some(mut joined) => {
                        joined.push_str(", ");
                        joined.push_str(value);
                        joined
                    }
                    None => value.to_string(),
                });
            }
            _ => {}
        }
    }

    if !upgrade_ok {
        return Err(Error::BadResponse(
            "missing or wrong Upgrade header in handshake response".into(),
        ));
    }
    if !connection_ok {
        return Err(Error::BadResponse(
            "missing or wrong Connection header in handshake response".into(),
        ));
    }
    let accept = accept_value
        .ok_or_else(|| Error::BadResponse("missing Sec-WebSocket-Accept header".into()))?;
    let expected = derive_accept(&key_b64);
    if accept != expected {
        return Err(Error::BadResponse(format!(
            "Sec-WebSocket-Accept mismatch: got {accept:?}, expected {expected:?}"
        )));
    }

    // Compression is enabled only when the response selects
    // permessage-deflate; with no extensions header we stay uncompressed.
    Ok(extensions_value.as_deref().and_then(parse_pmd_response))
}

/// Extract the negotiated permessage-deflate state from a
/// `Sec-WebSocket-Extensions` response value.
///
/// The value is a comma-separated list of extensions, each one a
/// semicolon-separated name followed by `token[=value]` parameters
/// (RFC 6455 §9.1, RFC 7692 §7). Only the first entry named
/// `permessage-deflate` (case-insensitive) is considered. From its
/// parameters we record whether `client_no_context_takeover` and
/// `server_no_context_takeover` appear; every other parameter, including the
/// `*_max_window_bits` ones, is accepted and ignored.
///
/// Returns `None` when no `permessage-deflate` entry is present, otherwise a
/// `Pmd` with a fresh inflate decoder.
fn parse_pmd_response(value: &str) -> Option<Pmd> {
    let selected = value.split(',').find(|ext| {
        ext.split(';')
            .next()
            .map_or(false, |name| name.trim().eq_ignore_ascii_case("permessage-deflate"))
    })?;

    // True when some parameter of the selected entry has this token name,
    // with or without an `=value` part.
    let has_param = |wanted: &str| {
        selected.split(';').skip(1).any(|param| {
            param
                .split('=')
                .next()
                .unwrap_or(param)
                .trim()
                .eq_ignore_ascii_case(wanted)
        })
    };

    Some(Pmd {
        client_no_context_takeover: has_param("client_no_context_takeover"),
        server_no_context_takeover: has_param("server_no_context_takeover"),
        decoder: Deflate::decoder(),
    })
}

/// What `read_message` produced for the caller.
#[derive(Debug, Clone, PartialEq, Eq)]
enum Message {
    /// A reassembled text or binary message. `opcode` is the opcode of the
    /// message's first frame (`OPCODE_TEXT` or `OPCODE_BINARY`); `payload` is
    /// the concatenation of all fragments, already inflated if the message
    /// was permessage-deflate compressed.
    Data { opcode: u8, payload: Vec<u8> },
    /// The peer initiated a close; we have already answered it.
    Closed,
}

/// Max payload of a control frame (RFC 6455 §5.5). `read_message` rejects a
/// ping, pong or close frame whose payload is longer than this.
const MAX_CONTROL_PAYLOAD: usize = 125;

/// Run the receive frame loop until one full data message is reassembled or
/// the peer closes. Control frames (ping/pong/close) are handled inline:
///
///   * PING → reply with a PONG echoing the application data.
///   * PONG → ignored (unsolicited keepalive response).
///   * CLOSE → reply with a CLOSE and return [`Message::Closed`].
///
/// Control frames are honored both before the first data frame and in between
/// fragments. Data fragments (initial frame with FIN=0 followed by
/// CONTINUATION frames until FIN=1) are stitched together, with the
/// cumulative size enforced against `MAX_PAYLOAD_BYTES` so a fragmented
/// payload cannot exceed the cap that a single frame would be held to.
///
/// permessage-deflate (RFC 7692 §7.2.2): if `pmd` is `Some` (the extension
/// was negotiated) and the FIRST frame of a data message has RSV1 set, the
/// reassembled payload is raw-DEFLATE-compressed and is inflated before
/// returning, with the cumulative cap enforced on the *inflated* size. RSV1 is
/// rejected when compression was not negotiated; RSV1 on a control frame is
/// rejected; RSV1 only carries meaning on the first frame of a message (it
/// must be 0 on continuation frames).
///
/// The function returns as soon as one message completes, so the reassembly
/// state below is per call and never needs resetting. Protocol violations are
/// reported as `Error::BadResponse`; errors from `read_frame` and from
/// writing replies propagate unchanged, except while answering a CLOSE, where
/// write failures are ignored.
fn read_message<S: Read + Write>(stream: &mut S, mut pmd: Option<&mut Pmd>) -> Result<Message> {
    // State for an in-progress fragmented data message. `None` means we are
    // not currently inside a fragmentation chain.
    let mut frag_opcode: Option<u8> = None;
    // Whether the in-progress message is permessage-deflate compressed (RSV1
    // was set on its first frame).
    let mut compressed = false;
    // Raw (still compressed, if applicable) payload bytes collected so far.
    let mut buf: Vec<u8> = Vec::new();

    loop {
        let frame = read_frame(stream)?;

        // Control frames (opcode >= 0x8) may be interleaved between fragments
        // but MUST NOT themselves be fragmented and MUST have payload <= 125.
        if frame.opcode >= 0x8 {
            if frame.rsv1 {
                return Err(Error::BadResponse("RSV1 set on a WS control frame".into()));
            }
            if !frame.fin {
                return Err(Error::BadResponse(
                    "fragmented control frame (FIN=0 on a control opcode)".into(),
                ));
            }
            if frame.payload.len() > MAX_CONTROL_PAYLOAD {
                return Err(Error::BadResponse(format!(
                    "control frame payload too large: {} bytes (max {MAX_CONTROL_PAYLOAD})",
                    frame.payload.len()
                )));
            }
            match frame.opcode {
                OPCODE_PING => {
                    // The pong carries the ping's payload back unchanged.
                    let pong = build_client_frame(OPCODE_PONG, &frame.payload)?;
                    stream.write_all(&pong)?;
                    stream.flush()?;
                    continue;
                }
                OPCODE_PONG => continue,
                OPCODE_CLOSE => {
                    // Echo the close (masked, per §5.3). Best-effort: if we
                    // can't build it we still report a clean close. The reply
                    // has an empty payload; the peer's status code and reason
                    // (if any) are not echoed back.
                    if let Ok(close) = build_client_frame(OPCODE_CLOSE, &[]) {
                        let _ = stream.write_all(&close);
                        let _ = stream.flush();
                    }
                    return Ok(Message::Closed);
                }
                other => {
                    return Err(Error::BadResponse(format!(
                        "unknown WS control opcode 0x{other:x}"
                    )));
                }
            }
        }

        // Data / continuation frames.
        match frame.opcode {
            OPCODE_TEXT | OPCODE_BINARY => {
                // A text/binary opcode always starts a new message, so it is
                // illegal while an earlier one is still waiting for its
                // final fragment.
                if frag_opcode.is_some() {
                    return Err(Error::BadResponse(
                        "new data frame began while a fragmented message was in progress".into(),
                    ));
                }
                // RSV1 on the first data frame means a compressed message — but
                // only if permessage-deflate was negotiated. Reject otherwise.
                if frame.rsv1 {
                    if pmd.is_none() {
                        return Err(Error::BadResponse(
                            "RSV1 set on a WS frame but permessage-deflate was not negotiated"
                                .into(),
                        ));
                    }
                    compressed = true;
                }
                accumulate(&mut buf, &frame.payload)?;
                if frame.fin {
                    // Unfragmented message: done after a single frame.
                    return finish_data_message(frame.opcode, buf, compressed, pmd.as_deref_mut());
                }
                // FIN=0: remember the message type for its continuations.
                frag_opcode = Some(frame.opcode);
            }
            OPCODE_CONT => {
                let opcode = frag_opcode.ok_or_else(|| {
                    Error::BadResponse("continuation frame with no message in progress".into())
                })?;
                // RSV1 is only meaningful on the first frame of a message; a
                // continuation frame must clear it (RFC 7692 §7.2.2).
                if frame.rsv1 {
                    return Err(Error::BadResponse(
                        "RSV1 set on a WS continuation frame".into(),
                    ));
                }
                accumulate(&mut buf, &frame.payload)?;
                if frame.fin {
                    // Last fragment: report the opcode of the first frame.
                    return finish_data_message(opcode, buf, compressed, pmd.as_deref_mut());
                }
            }
            other => {
                return Err(Error::BadResponse(format!("unknown WS opcode 0x{other:x}")));
            }
        }
    }
}

/// Finalise a reassembled data message: inflate it if it was permessage-
/// deflate compressed, then hand it back as a [`Message::Data`]. When
/// `compressed` is set, `pmd` must be `Some` (the read loop only sets
/// `compressed` after confirming negotiation).
fn finish_data_message(
    opcode: u8,
    payload: Vec<u8>,
    compressed: bool,
    pmd: Option<&mut Pmd>,
) -> Result<Message> {
    if compressed {
        let pmd = pmd.ok_or_else(|| {
            Error::BadResponse("compressed WS message without negotiated permessage-deflate".into())
        })?;
        let inflated = pmd.inflate_message(&payload)?;
        Ok(Message::Data {
            opcode,
            payload: inflated,
        })
    } else {
        Ok(Message::Data { opcode, payload })
    }
}

/// Append `chunk` to the reassembly buffer, enforcing the cumulative cap.
/// `read_frame` already bounds a single frame; this guards against many
/// small fragments adding up past `MAX_PAYLOAD_BYTES`.
fn accumulate(buf: &mut Vec<u8>, chunk: &[u8]) -> Result<()> {
    let total = buf.len() as u64 + chunk.len() as u64;
    if total > MAX_PAYLOAD_BYTES {
        return Err(Error::BadResponse(format!(
            "reassembled WS message too large: {total} bytes (max {MAX_PAYLOAD_BYTES})"
        )));
    }
    buf.extend_from_slice(chunk);
    Ok(())
}

/// Write one unfragmented, masked data message to `stream` and flush it.
///
/// `opcode` has to be [`OPCODE_TEXT`] or [`OPCODE_BINARY`]; anything else is
/// refused with [`Error::BadResponse`] before a single byte is written.
/// Masking (RFC 6455 §5.3) is handled by the frame builders.
///
/// `pmd` only acts as an on/off switch here: when it is `Some`, the payload
/// is passed through `deflate_message` (RFC 7692 §7.2.1) and sent in a frame
/// with RSV1 set; when it is `None` the payload goes out as-is. Every message
/// is compressed independently, in line with the
/// `client_no_context_takeover` offer.
///
/// Not yet called from the one-shot `fetch` path, hence `dead_code`.
#[allow(dead_code)]
fn send_message<S: Write>(
    stream: &mut S,
    opcode: u8,
    payload: &[u8],
    pmd: Option<&mut Pmd>,
) -> Result<()> {
    let is_data_opcode = opcode == OPCODE_TEXT || opcode == OPCODE_BINARY;
    if !is_data_opcode {
        return Err(Error::BadResponse(format!(
            "send_message expects a data opcode (text/binary), got 0x{opcode:x}"
        )));
    }
    let wire = match pmd {
        Some(_) => {
            let deflated = deflate_message(payload)?;
            build_client_frame_rsv1(opcode, &deflated)?
        }
        None => build_client_frame(opcode, payload)?,
    };
    stream.write_all(&wire)?;
    stream.flush()?;
    Ok(())
}

/// Receive exactly one data message, then say goodbye.
///
/// Delegates to `read_message` for reassembly and control-frame handling.
/// If the peer closes before any data message completes, an empty `Vec` is
/// returned (the close reply is `read_message`'s job). Otherwise a close
/// frame is sent on a best-effort basis and the message payload is returned.
fn read_data_and_close<S: Read + Write>(stream: &mut S, pmd: Option<&mut Pmd>) -> Result<Vec<u8>> {
    match read_message(stream, pmd)? {
        // Peer closed before sending data; read_message already replied.
        Message::Closed => Ok(Vec::new()),
        Message::Data { payload, .. } => {
            // Polite close. Close frames are client→server frames like any
            // other and must be masked (RFC 6455 §5.3), so the regular
            // builder is used even though the payload is empty. Nothing past
            // this point may fail the call: the payload is already in hand,
            // so a missing entropy source or a write error merely means the
            // close frame is skipped.
            let farewell = build_client_frame(OPCODE_CLOSE, &[]);
            if let Ok(bytes) = farewell {
                let _ = stream.write_all(&bytes);
                let _ = stream.flush();
            }
            Ok(payload)
        }
    }
}

/// One WebSocket frame as parsed off the wire by `read_frame`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct Frame {
    /// FIN bit (0x80 of the first header byte). The read loop treats a data
    /// or continuation frame with this set as the end of the message.
    fin: bool,
    /// RSV1 bit. For data messages this signals a permessage-deflate
    /// compressed payload (RFC 7692 §7.2.2) when set on the first frame.
    rsv1: bool,
    /// Opcode: the low four bits of the first header byte.
    opcode: u8,
    /// Payload bytes exactly as received. `read_frame` rejects masked
    /// frames, so no unmasking is ever applied to these bytes.
    payload: Vec<u8>,
}

/// Read and decode exactly one server-to-client frame from `stream`.
///
/// Checks are applied in wire order:
///
/// 1. RSV2/RSV3 must be clear — no extension that uses them is negotiated.
/// 2. The MASK bit must be clear — servers never mask (RFC 6455 §5.1).
/// 3. The payload length (7-bit, 16-bit or 64-bit form) must not exceed
///    `MAX_PAYLOAD_BYTES`; this is checked before the payload is allocated.
///
/// Each violation yields [`Error::BadResponse`]; a short read surfaces
/// whatever the `read_exact` helper reports. RSV1 is *not* judged here: it is
/// reported via [`Frame::rsv1`] and the caller (`read_message`) decides
/// whether it is legal given the negotiated extensions and frame type.
fn read_frame<S: Read>(stream: &mut S) -> Result<Frame> {
    let mut head = [0u8; 2];
    read_exact(stream, &mut head)?;
    let [b0, b1] = head;

    // RSV2/RSV3 must always be zero — no extension using them is negotiated.
    if (b0 & 0x30) != 0 {
        return Err(Error::BadResponse(
            "non-zero RSV2/RSV3 bits on incoming WS frame".into(),
        ));
    }
    if (b1 & 0x80) != 0 {
        return Err(Error::BadResponse(
            "server-to-client frame is masked".into(),
        ));
    }

    // 126 and 127 are escape values announcing a 2- or 8-byte big-endian
    // extended length; anything smaller is the length itself.
    let payload_len: u64 = match b1 & 0x7F {
        126 => {
            let mut ext = [0u8; 2];
            read_exact(stream, &mut ext)?;
            u64::from(u16::from_be_bytes(ext))
        }
        127 => {
            let mut ext = [0u8; 8];
            read_exact(stream, &mut ext)?;
            u64::from_be_bytes(ext)
        }
        short => u64::from(short),
    };
    if payload_len > MAX_PAYLOAD_BYTES {
        return Err(Error::BadResponse(format!(
            "WS payload too large: {payload_len} bytes"
        )));
    }

    let mut payload = vec![0u8; payload_len as usize];
    if !payload.is_empty() {
        read_exact(stream, &mut payload)?;
    }
    Ok(Frame {
        fin: (b0 & 0x80) != 0,
        rsv1: (b0 & 0x40) != 0,
        opcode: b0 & 0x0F,
        payload,
    })
}

/// Serialise a single-frame (FIN set) client-to-server message with RSV1
/// clear.
///
/// Client frames are always masked (RFC 6455 §5.3) with an unpredictable
/// key, so this returns an error when no secure entropy source is available.
fn build_client_frame(opcode: u8, payload: &[u8]) -> Result<Vec<u8>> {
    let rsv1 = false;
    build_client_frame_inner(opcode, payload, rsv1)
}

/// Variant of [`build_client_frame`] that sets RSV1, flagging `payload` as
/// permessage-deflate compressed (RFC 7692 §7.2.1).
///
/// RSV1 is only meaningful on data frames; this function does not check the
/// opcode, so callers are responsible for passing a data opcode.
fn build_client_frame_rsv1(opcode: u8, payload: &[u8]) -> Result<Vec<u8>> {
    let rsv1 = true;
    build_client_frame_inner(opcode, payload, rsv1)
}

/// Shared frame serialiser behind [`build_client_frame`] and
/// [`build_client_frame_rsv1`].
///
/// Layout produced: one byte of FIN (+ optional RSV1) and the low four bits
/// of `opcode`; the MASK bit plus the payload length in its 7-bit, 16-bit or
/// 64-bit form; a 4-byte masking key; then the payload XOR-ed with that key.
///
/// The key is the first four bytes of a `random_16` draw, so the function
/// fails before building anything if that draw fails.
fn build_client_frame_inner(opcode: u8, payload: &[u8], rsv1: bool) -> Result<Vec<u8>> {
    let entropy = random_16()?;
    let key = [entropy[0], entropy[1], entropy[2], entropy[3]];

    let len = payload.len();
    // Worst-case header: 2 fixed bytes + 8 length bytes + 4 key bytes.
    let mut frame = Vec::with_capacity(2 + 8 + 4 + len);

    // FIN=1 (+ RSV1) + opcode
    let mut first = 0x80 | (opcode & 0x0F);
    if rsv1 {
        first |= 0x40;
    }
    frame.push(first);

    // Every length form carries the MASK bit (0x80) in the second byte.
    match len {
        0..=125 => frame.push(0x80 | (len as u8)),
        126..=0xFFFF => {
            frame.push(0x80 | 126);
            frame.extend_from_slice(&(len as u16).to_be_bytes());
        }
        _ => {
            frame.push(0x80 | 127);
            frame.extend_from_slice(&(len as u64).to_be_bytes());
        }
    }

    frame.extend_from_slice(&key);
    // Payload byte i is XOR-ed with key byte i mod 4.
    frame.extend(
        payload
            .iter()
            .zip(key.iter().cycle())
            .map(|(byte, k)| byte ^ k),
    );
    Ok(frame)
}

fn read_exact<R: Read>(r: &mut R, buf: &mut [u8]) -> Result<()> {
    let mut got = 0;
    while got < buf.len() {
        let n = r.read(&mut buf[got..])?;
        if n == 0 {
            return Err(Error::UnexpectedEof);
        }
        got += n;
    }
    Ok(())
}

/// Compute the `Sec-WebSocket-Accept` value for a given key:
/// `base64(sha1(key + WS_GUID))`.
///
/// `key_b64` is hashed in its base64 text form exactly as given, followed by
/// the `WS_GUID` constant. Both handshake sides derive this value, which
/// ties the response to this specific request.
fn derive_accept(key_b64: &str) -> String {
    let mut hasher = Sha1::new();
    for part in [key_b64.as_bytes(), WS_GUID.as_bytes()] {
        hasher.update(part);
    }
    base64_encode(hasher.finalize().as_ref())
}

/// 16 cryptographically-random bytes for the `Sec-WebSocket-Key` and frame
/// masks, sourced from the crate's vetted CSPRNG ([`purecrypto::rng::OsRng`],
/// the same source used by `mqtt.rs`).
///
/// `OsRng::fill_bytes` panics if it cannot read OS entropy (e.g. a missing
/// `/dev/urandom` in a locked-down sandbox). We catch that and surface it as
/// a connection error rather than either crashing the process or — worse —
/// falling back to predictable time/PID entropy: a guessable mask weakens the
/// masking the spec relies on, so failing closed is the secure choice.
fn random_16() -> Result<[u8; 16]> {
    use purecrypto::rng::{OsRng, RngCore};
    let mut out = [0u8; 16];
    std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
        OsRng.fill_bytes(&mut out);
    }))
    .map_err(|_| Error::BadResponse("websocket: no secure entropy source available".into()))?;
    Ok(out)
}

/// Encode `input` as standard base64 (RFC 4648 §4), including `=` padding.
///
/// Kept in-crate to avoid a dependency for a few dozen lines of code; it is
/// also used for HTTP Basic auth in `crate::http`.
///
/// Input is consumed in groups of up to three bytes. A full group yields
/// four characters; a trailing group of two bytes yields three characters
/// plus one `=`, and a trailing single byte yields two characters plus `==`.
pub(crate) fn base64_encode(input: &[u8]) -> String {
    const TABLE: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
    // Look up the character for the low six bits of `v`. Masking here lets
    // the callers use plain shifts without pre-masking the source bytes.
    let sextet = |v: u8| TABLE[(v & 0x3F) as usize] as char;

    let mut encoded = String::with_capacity(input.len().div_ceil(3) * 4);
    for group in input.chunks(3) {
        match group {
            &[a, b, c] => {
                encoded.push(sextet(a >> 2));
                encoded.push(sextet((a << 4) | (b >> 4)));
                encoded.push(sextet((b << 2) | (c >> 6)));
                encoded.push(sextet(c));
            }
            &[a, b] => {
                encoded.push(sextet(a >> 2));
                encoded.push(sextet((a << 4) | (b >> 4)));
                encoded.push(sextet(b << 2));
                encoded.push('=');
            }
            &[a] => {
                encoded.push(sextet(a >> 2));
                encoded.push(sextet(a << 4));
                encoded.push_str("==");
            }
            _ => unreachable!(),
        }
    }
    encoded
}

// Silences the unused-import warning in builds that take only the `ws://`
// path: `TlsStream` is referenced in docs but not used directly in this
// module (we call `crate::tls::connect_over` instead). This never-called,
// empty function exists solely to mention the type in a signature.
#[allow(dead_code)]
fn _tlsstream_in_scope_for_docs<S: Read + Write>(_: TlsStream<S>) {}

#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Socket stand-in for tests. Reads are served from `inbound` (the bytes
    /// the "server" sends us); everything written is appended to `sent` so a
    /// test can inspect the client's replies afterwards.
    struct MockStream {
        inbound: Cursor<Vec<u8>>,
        sent: Vec<u8>,
    }

    impl MockStream {
        /// Create a stream that will yield `inbound` and has sent nothing yet.
        fn new(inbound: Vec<u8>) -> Self {
            let inbound = Cursor::new(inbound);
            Self {
                inbound,
                sent: Vec::new(),
            }
        }
    }

    impl Read for MockStream {
        fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
            let server_bytes = &mut self.inbound;
            server_bytes.read(buf)
        }
    }

    impl Write for MockStream {
        /// Accepts the whole buffer every time; never fails.
        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
            let accepted = buf.len();
            self.sent.extend_from_slice(buf);
            Ok(accepted)
        }

        /// Nothing is buffered, so flushing is a no-op.
        fn flush(&mut self) -> std::io::Result<()> {
            Ok(())
        }
    }

    /// Serialise a server-to-client frame for feeding into the mock stream.
    /// The MASK bit is never set (servers must not mask) and all RSV bits
    /// are clear; only the low four bits of `opcode` are used.
    fn server_frame(fin: bool, opcode: u8, payload: &[u8]) -> Vec<u8> {
        let fin_bit: u8 = if fin { 0x80 } else { 0x00 };
        let len = payload.len();
        let mut frame = vec![fin_bit | (opcode & 0x0F)];
        match len {
            0..=125 => frame.push(len as u8),
            126..=0xFFFF => {
                frame.push(126);
                frame.extend_from_slice(&(len as u16).to_be_bytes());
            }
            _ => {
                frame.push(127);
                frame.extend_from_slice(&(len as u64).to_be_bytes());
            }
        }
        frame.extend_from_slice(payload);
        frame
    }

    /// Same as [`server_frame`], then flips on RSV1 in the first header
    /// byte. Used to feed permessage-deflate compressed frames into the mock
    /// stream.
    fn server_frame_rsv1(fin: bool, opcode: u8, payload: &[u8]) -> Vec<u8> {
        const RSV1: u8 = 0x40;
        let mut frame = server_frame(fin, opcode, payload);
        frame[0] |= RSV1;
        frame
    }

    /// Produce a permessage-deflate message payload (RFC 7692 §7.2.1) from
    /// `data`: compress it with compcol's `Deflate` codec and, if the output
    /// ends with `DEFLATE_TAIL`, cut that tail off.
    fn pmd_compress(data: &[u8]) -> Vec<u8> {
        let mut deflated = compress_to_vec::<Deflate>(data).expect("deflate encode");
        let has_tail = deflated.ends_with(&DEFLATE_TAIL);
        if has_tail {
            let keep = deflated.len() - DEFLATE_TAIL.len();
            deflated.truncate(keep);
        }
        deflated
    }

    /// Build a `Pmd` as if negotiation had succeeded with both
    /// `client_no_context_takeover` and `server_no_context_takeover` set (the
    /// common case for our offer) and a fresh decoder, for driving the
    /// receive path in tests.
    fn test_pmd() -> Pmd {
        let decoder = Deflate::decoder();
        Pmd {
            client_no_context_takeover: true,
            server_no_context_takeover: true,
            decoder,
        }
    }

    /// Walk the bytes the client wrote and split them into frames, returning
    /// one `(opcode, unmasked_payload)` pair per frame in order. Panics if a
    /// frame lacks the MASK bit, since client frames must be masked
    /// (RFC 6455 §5.3), or if the byte stream is truncated.
    fn decode_sent(sent: &[u8]) -> Vec<(u8, Vec<u8>)> {
        let mut decoded = Vec::new();
        let mut rest = sent;
        while !rest.is_empty() {
            let opcode = rest[0] & 0x0F;
            assert!((rest[1] & 0x80) != 0, "client frame must be masked");
            // `header_len` is where the 4-byte masking key starts.
            let (len, header_len) = match rest[1] & 0x7F {
                126 => (u16::from_be_bytes([rest[2], rest[3]]) as usize, 4),
                127 => {
                    let mut wide = [0u8; 8];
                    wide.copy_from_slice(&rest[2..10]);
                    (u64::from_be_bytes(wide) as usize, 10)
                }
                short => (short as usize, 2),
            };
            let (key, body) = rest[header_len..].split_at(4);
            let payload: Vec<u8> = body[..len]
                .iter()
                .zip(key.iter().cycle())
                .map(|(byte, k)| byte ^ k)
                .collect();
            decoded.push((opcode, payload));
            rest = &body[len..];
        }
        decoded
    }

    #[test]
    fn base64_encode_hello() {
        // Five input bytes: one full triplet plus a two-byte tail, so the
        // output ends in exactly one `=`.
        let encoded = base64_encode(b"hello");
        assert_eq!(encoded, "aGVsbG8=");
    }

    #[test]
    fn base64_encode_rfc4648_vectors() {
        // Classic RFC 4648 §10 vectors, as (input, expected) pairs.
        const VECTORS: [(&[u8], &str); 7] = [
            (b"", ""),
            (b"f", "Zg=="),
            (b"fo", "Zm8="),
            (b"foo", "Zm9v"),
            (b"foob", "Zm9vYg=="),
            (b"fooba", "Zm9vYmE="),
            (b"foobar", "Zm9vYmFy"),
        ];
        for (input, expected) in VECTORS {
            assert_eq!(base64_encode(input), expected);
        }
    }

    #[test]
    fn rfc6455_accept_derivation() {
        // Key/accept pair taken from the example in RFC 6455 §1.3.
        let accept = derive_accept("dGhlIHNhbXBsZSBub25jZQ==");
        assert_eq!(accept, "s3pPLMBiTxaQ9kYGzzhZRbK+xOo=");
    }

    #[test]
    fn parse_short_text_frame() {
        // Header 0x81 = FIN + opcode 1 (text); 0x05 = unmasked, length 5;
        // followed by the five payload bytes "hello".
        let wire = [0x81, 0x05, b'h', b'e', b'l', b'l', b'o'];
        let frame = read_frame(&mut Cursor::new(&wire[..])).expect("frame parses");
        assert!(frame.fin);
        assert_eq!(frame.opcode, OPCODE_TEXT);
        assert_eq!(frame.payload, b"hello");
    }

    #[test]
    fn parse_16bit_length_frame() {
        // Binary frame whose length uses the 126 escape + big-endian u16
        // form: 200 bytes, all 0x41 ('A').
        let header = [0x82u8, 126, 0x00, 200];
        let wire: Vec<u8> = header
            .iter()
            .copied()
            .chain(std::iter::repeat_n(b'A', 200))
            .collect();
        let frame = read_frame(&mut Cursor::new(wire)).expect("frame parses");
        assert_eq!(frame.opcode, OPCODE_BINARY);
        assert_eq!(frame.payload.len(), 200);
        assert!(frame.payload.iter().all(|&byte| byte == b'A'));
    }

    #[test]
    fn reject_masked_server_frame() {
        // Second byte 0x80: MASK bit set with length 0, then a 4-byte key.
        // Servers are not allowed to mask, so parsing must fail.
        let wire = [0x81, 0x80, 0, 0, 0, 0];
        let err = read_frame(&mut Cursor::new(&wire[..]))
            .expect_err("masked server frame must be rejected");
        assert!(matches!(err, Error::BadResponse(_)), "wrong error: {err:?}");
    }

    #[test]
    fn build_close_frame_short_payload() {
        // An empty close frame is 6 bytes: 0x88 (FIN + opcode 8), a length
        // byte with only the MASK bit set, and the 4-byte masking key.
        let frame = build_client_frame(OPCODE_CLOSE, &[]).unwrap();
        assert_eq!(frame.len(), 6);
        let (first, second) = (frame[0], frame[1]);
        assert_eq!(first, 0x88);
        assert_eq!(second, 0x80); // mask flag set, length 0
    }

    #[test]
    fn build_text_frame_masks_payload() {
        let payload = b"hi";
        let frame = build_client_frame(OPCODE_TEXT, payload).unwrap();
        // Header (2) + mask (4) + payload (2) = 8.
        assert_eq!(frame.len(), 8);
        assert_eq!(frame[0], 0x81);
        assert_eq!(frame[1], 0x82);
        // Undo the masking with the key carried in the frame and check that
        // the original bytes come back.
        let (key, masked) = frame[2..].split_at(4);
        let unmasked: Vec<u8> = masked
            .iter()
            .zip(key.iter().cycle())
            .map(|(byte, k)| byte ^ k)
            .collect();
        assert_eq!(unmasked, payload);
    }

    #[test]
    fn build_frame_uses_16bit_length_for_medium_payload() {
        // 200 bytes does not fit the 7-bit form, so the 126 escape plus a
        // big-endian u16 is expected.
        let frame = build_client_frame(OPCODE_BINARY, &[0u8; 200]).unwrap();
        assert_eq!(frame[0], 0x82);
        assert_eq!(frame[1], 0x80 | 126);
        let declared = u16::from_be_bytes([frame[2], frame[3]]);
        assert_eq!(declared, 200);
        // 2 (header) + 2 (ext len) + 4 (mask) + 200 (payload).
        assert_eq!(frame.len(), 208);
    }

    #[test]
    fn build_frame_uses_64bit_length_for_large_payload() {
        // 70 000 bytes exceeds u16::MAX, so the 127 escape plus a big-endian
        // u64 is expected.
        let payload = vec![0u8; 70_000];
        let frame = build_client_frame(OPCODE_BINARY, &payload).unwrap();
        assert_eq!(frame[1], 0x80 | 127);
        let mut len_bytes = [0u8; 8];
        len_bytes.copy_from_slice(&frame[2..10]);
        assert_eq!(u64::from_be_bytes(len_bytes), 70_000);
    }

    #[test]
    fn random_16_is_nonzero() {
        // Sixteen zero bytes from a CSPRNG is astronomically unlikely.
        let all_zero = [0u8; 16];
        let drawn = random_16().expect("OS entropy available in the test environment");
        assert_ne!(drawn, all_zero);
    }

    #[test]
    fn random_16_is_not_constant() {
        // Consecutive draws must differ: a cheap sanity check that fresh
        // entropy is pulled each time rather than a fixed seed.
        let first = random_16().unwrap();
        let second = random_16().unwrap();
        assert_ne!(first, second);
    }

    #[test]
    fn reassembles_fragmented_text_message() {
        // Three fragments: text "Hel" (FIN=0), continuation "lo " (FIN=0),
        // continuation "world" (FIN=1).
        let inbound = [
            server_frame(false, OPCODE_TEXT, b"Hel"),
            server_frame(false, OPCODE_CONT, b"lo "),
            server_frame(true, OPCODE_CONT, b"world"),
        ]
        .concat();
        let mut stream = MockStream::new(inbound);
        let msg = read_message(&mut stream, None).expect("reassembles");
        let expected = Message::Data {
            opcode: OPCODE_TEXT,
            payload: b"Hello world".to_vec(),
        };
        assert_eq!(msg, expected);
    }

    #[test]
    fn ping_between_fragments_gets_pong_and_message_completes() {
        // A PING carrying "pingdata" arrives between the two fragments of a
        // text message ("foo" + "bar"). The message must still reassemble,
        // and exactly one PONG echoing the ping data must have been sent.
        let inbound = [
            server_frame(false, OPCODE_TEXT, b"foo"),
            server_frame(true, OPCODE_PING, b"pingdata"),
            server_frame(true, OPCODE_CONT, b"bar"),
        ]
        .concat();
        let mut stream = MockStream::new(inbound);
        let msg = read_message(&mut stream, None).expect("completes despite ping");
        let expected = Message::Data {
            opcode: OPCODE_TEXT,
            payload: b"foobar".to_vec(),
        };
        assert_eq!(msg, expected);

        let replies = decode_sent(&stream.sent);
        assert_eq!(replies.len(), 1, "exactly one pong expected");
        assert_eq!(replies[0].0, OPCODE_PONG);
        assert_eq!(replies[0].1, b"pingdata");
    }

    #[test]
    fn close_is_answered_and_returns_closed() {
        // A bare close from the server must yield `Message::Closed` and
        // trigger exactly one close frame in reply.
        let mut stream = MockStream::new(server_frame(true, OPCODE_CLOSE, &[]));
        let msg = read_message(&mut stream, None).expect("handles close");
        assert_eq!(msg, Message::Closed);
        let replies = decode_sent(&stream.sent);
        assert_eq!(replies.len(), 1, "exactly one close reply expected");
        assert_eq!(replies[0].0, OPCODE_CLOSE);
    }

    #[test]
    fn unsolicited_pong_is_ignored_then_data_returns() {
        // A stray PONG precedes the data frame; it must be skipped silently.
        let inbound = [
            server_frame(true, OPCODE_PONG, b"x"),
            server_frame(true, OPCODE_TEXT, b"hi"),
        ]
        .concat();
        let mut stream = MockStream::new(inbound);
        let msg = read_message(&mut stream, None).expect("ignores pong");
        let expected = Message::Data {
            opcode: OPCODE_TEXT,
            payload: b"hi".to_vec(),
        };
        assert_eq!(msg, expected);
        // Nothing should have been written for the pong.
        assert!(
            stream.sent.is_empty(),
            "unsolicited pong must not be answered"
        );
    }

    #[test]
    fn send_message_produces_masked_frame() {
        let mut stream = MockStream::new(Vec::new());
        send_message(&mut stream, OPCODE_TEXT, b"hello", None).expect("sends");
        // Expected wire layout: FIN+text, MASK+len, 4-byte mask, 5 masked
        // payload bytes.
        let wire = &stream.sent;
        assert_eq!(wire[0], 0x81);
        assert_eq!(wire[1], 0x80 | 5);
        assert_eq!(wire.len(), 2 + 4 + 5);
        let frames = decode_sent(wire);
        assert_eq!(frames, vec![(OPCODE_TEXT, b"hello".to_vec())]);
    }

    #[test]
    fn send_message_rejects_control_opcode() {
        // PING is a control opcode; send_message only accepts text/binary
        // and must refuse before writing anything.
        let mut stream = MockStream::new(Vec::new());
        let err = send_message(&mut stream, OPCODE_PING, b"x", None)
            .expect_err("control opcode must be rejected for send_message");
        assert!(matches!(err, Error::BadResponse(_)), "wrong error: {err:?}");
        assert!(stream.sent.is_empty());
    }

    #[test]
    fn oversized_cumulative_fragmented_payload_is_rejected() {
        // The scenario under test is many fragments that are individually
        // fine but together exceed MAX_PAYLOAD_BYTES. Driving that through
        // real frames would mean feeding the full cap's worth of bytes, and
        // a forged header claiming a huge length would only trip
        // read_frame's per-frame cap instead. So the cumulative check is
        // exercised directly via `accumulate`, right at the cap boundary.
        let one_below_cap = (MAX_PAYLOAD_BYTES - 1) as usize;
        let mut buf = vec![0u8; one_below_cap];
        // One more byte is exactly at the cap: allowed.
        accumulate(&mut buf, &[0u8]).expect("exactly at the cap is allowed");
        // The next byte pushes over the cap: rejected.
        let err = accumulate(&mut buf, &[0u8]).expect_err("over the cap must be rejected");
        assert!(matches!(err, Error::BadResponse(_)), "wrong error: {err:?}");
    }

    #[test]
    fn fragmented_control_frame_is_rejected() {
        // Control frames must not be fragmented, so a PING with FIN=0 is a
        // protocol error.
        let mut stream = MockStream::new(server_frame(false, OPCODE_PING, b"x"));
        let err =
            read_message(&mut stream, None).expect_err("fragmented control must be rejected");
        assert!(matches!(err, Error::BadResponse(_)), "wrong error: {err:?}");
    }

    #[test]
    fn oversized_control_frame_is_rejected() {
        // Control payloads are capped at 125 bytes; a 126-byte PING is one
        // byte over.
        let mut stream = MockStream::new(server_frame(true, OPCODE_PING, &[0u8; 126]));
        let err =
            read_message(&mut stream, None).expect_err("oversized control must be rejected");
        assert!(matches!(err, Error::BadResponse(_)), "wrong error: {err:?}");
    }

    #[test]
    fn new_data_frame_during_fragmentation_is_rejected() {
        // An unfinished text message (FIN=0) followed by another text frame
        // instead of a continuation: the peer must use opcode 0x0 to
        // continue, so this is a protocol error.
        let inbound = [
            server_frame(false, OPCODE_TEXT, b"a"),
            server_frame(true, OPCODE_TEXT, b"b"),
        ]
        .concat();
        let mut stream = MockStream::new(inbound);
        let err = read_message(&mut stream, None)
            .expect_err("interleaved new data frame must be rejected");
        assert!(matches!(err, Error::BadResponse(_)), "wrong error: {err:?}");
    }

    #[test]
    fn lone_continuation_frame_is_rejected() {
        // There is no message in progress, so a continuation frame has
        // nothing to continue and must be refused.
        let mut stream = MockStream::new(server_frame(true, OPCODE_CONT, b"x"));
        let err =
            read_message(&mut stream, None).expect_err("lone continuation must be rejected");
        assert!(matches!(err, Error::BadResponse(_)), "wrong error: {err:?}");
    }

    #[test]
    fn read_data_and_close_returns_reassembled_payload_and_sends_close() {
        // Binary message split across two fragments: [1, 2, 3] + [4, 5].
        let inbound = [
            server_frame(false, OPCODE_BINARY, &[1, 2, 3]),
            server_frame(true, OPCODE_CONT, &[4, 5]),
        ]
        .concat();
        let mut stream = MockStream::new(inbound);
        let payload = read_data_and_close(&mut stream, None).expect("reads message");
        assert_eq!(payload, vec![1, 2, 3, 4, 5]);
        // The last frame written must be the polite close.
        let last_opcode = decode_sent(&stream.sent).last().map(|frame| frame.0);
        assert_eq!(last_opcode, Some(OPCODE_CLOSE));
    }

    // ─── permessage-deflate (RFC 7692) ──────────────────────────────────────

    /// Decode only the first client frame in `sent`, returning
    /// `(opcode, rsv1, unmasked_payload)`. Bytes after that frame are
    /// ignored. Panics if the frame is not masked or `sent` is truncated.
    fn decode_first_frame_with_rsv1(sent: &[u8]) -> (u8, bool, Vec<u8>) {
        let (first, second) = (sent[0], sent[1]);
        assert!((second & 0x80) != 0, "client frame must be masked");
        // `key_at` is the offset of the 4-byte masking key, which depends on
        // which length form the header uses.
        let (len, key_at) = match second & 0x7F {
            126 => (u16::from_be_bytes([sent[2], sent[3]]) as usize, 4),
            127 => {
                let mut wide = [0u8; 8];
                wide.copy_from_slice(&sent[2..10]);
                (u64::from_be_bytes(wide) as usize, 10)
            }
            short => (short as usize, 2),
        };
        let body_at = key_at + 4;
        let key = &sent[key_at..body_at];
        let payload: Vec<u8> = sent[body_at..body_at + len]
            .iter()
            .zip(key.iter().cycle())
            .map(|(byte, k)| byte ^ k)
            .collect();
        (first & 0x0F, (first & 0x40) != 0, payload)
    }

    /// Inflate a permessage-deflate payload the way a server would: append the
    /// stripped terminator and run raw deflate.
    ///
    /// Test-only reference decoder, independent of `Pmd`. Panics (via
    /// `expect`) if the decoder reports an error.
    fn pmd_inflate(compressed: &[u8]) -> Vec<u8> {
        // Restore the terminator before decoding.
        let mut input = compressed.to_vec();
        input.extend_from_slice(&DEFLATE_TAIL);
        let mut dec = Deflate::decoder();
        let mut out = Vec::new();
        // Fixed-size staging buffer; whatever each decode call writes into it
        // is copied to `out` below.
        let mut scratch = vec![0u8; 32 * 1024];
        // Number of input bytes handed to the decoder so far.
        let mut consumed = 0usize;
        loop {
            // Snapshot progress so a call that neither consumes nor produces
            // anything can be detected.
            let before_c = consumed;
            let before_w = out.len();
            let (p, status) = dec
                .decode(&input[consumed..], &mut scratch)
                .expect("inflate");
            out.extend_from_slice(&scratch[..p.written]);
            consumed += p.consumed;
            match status {
                // Decoder reports the stream is complete.
                Status::StreamEnd => break,
                // Output was drained above; call again for more.
                Status::OutputFull => continue,
                // Stop once all input has been handed over, or when the call
                // made no progress at all (guards against spinning forever);
                // otherwise loop and feed the remaining input.
                Status::InputEmpty => {
                    if consumed >= input.len() || (consumed == before_c && out.len() == before_w) {
                        break;
                    }
                }
            }
        }
        out
    }

    #[test]
    fn handshake_offers_permessage_deflate() {
        // The upgrade request must advertise a permessage-deflate offer.
        //
        // The request carries a random key, so a matching Accept value
        // cannot be precomputed for the canned response. That does not
        // matter here: the request is written before the response is
        // checked, so we answer with a deliberately wrong Accept, let the
        // handshake fail, and inspect the bytes that were sent.
        let canned = b"HTTP/1.1 101 Switching Protocols\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Accept: wrong\r\n\r\n".to_vec();
        let mut stream = MockStream::new(canned);
        let url = Url::parse("ws://example.com/chat").expect("url");
        let _ = handshake(&mut stream, &url); // Accept mismatch is fine here.

        let req = String::from_utf8(stream.sent).expect("utf8 request");
        let offers_pmd = req.contains("Sec-WebSocket-Extensions: permessage-deflate");
        assert!(
            offers_pmd,
            "request must offer permessage-deflate, got:\n{req}"
        );
        assert!(req.contains("client_no_context_takeover"));
        assert!(req.contains("server_no_context_takeover"));
    }

    #[test]
    fn parse_pmd_response_enables_compression() {
        // Server accepted with only its own no-context-takeover flag.
        let server_only = parse_pmd_response("permessage-deflate; server_no_context_takeover")
            .expect("permessage-deflate accepted");
        assert!(server_only.server_no_context_takeover);
        assert!(!server_only.client_no_context_takeover);

        // Server accepted with both flags.
        let both = parse_pmd_response(
            "permessage-deflate; client_no_context_takeover; server_no_context_takeover",
        )
        .expect("accepted with both flags");
        assert!(both.client_no_context_takeover);
        assert!(both.server_no_context_takeover);
    }

    #[test]
    fn parse_pmd_response_without_extension_is_none() {
        // None of these header values contains a permessage-deflate token,
        // so compression must stay off: an unrelated extension, an empty
        // value, and an unrelated extension with a parameter.
        for header in ["some-other-extension", "", "foo; bar=1"] {
            assert!(parse_pmd_response(header).is_none());
        }
    }

    #[test]
    fn inflate_compressed_message_decodes_to_original() {
        // Compress a repetitive sentence with the test helper (terminator
        // stripped), frame it with RSV1 set, and check the receive path
        // restores the original text.
        let original = b"the quick brown fox jumps over the lazy dog, the quick brown fox";
        let compressed = pmd_compress(original);
        assert!(
            compressed.len() < original.len(),
            "fixture should actually compress"
        );

        let mut stream = MockStream::new(server_frame_rsv1(true, OPCODE_TEXT, &compressed));
        let mut pmd = test_pmd();
        let msg = read_message(&mut stream, Some(&mut pmd)).expect("decodes compressed message");
        let expected = Message::Data {
            opcode: OPCODE_TEXT,
            payload: original.to_vec(),
        };
        assert_eq!(msg, expected);
    }

    #[test]
    fn send_message_compressed_round_trips() {
        // With compression on, send_message must emit an RSV1 frame whose
        // payload differs from the input yet inflates back to it once the
        // terminator is re-appended.
        let input = b"hello hello hello permessage-deflate round trip";
        let mut stream = MockStream::new(Vec::new());
        let mut pmd = test_pmd();
        send_message(&mut stream, OPCODE_TEXT, input, Some(&mut pmd)).expect("sends compressed");

        let (opcode, rsv1, wire_payload) = decode_first_frame_with_rsv1(&stream.sent);
        assert_eq!(opcode, OPCODE_TEXT);
        assert!(rsv1, "compressed frame must have RSV1 set");
        assert_ne!(wire_payload, input, "payload should be compressed, not raw");
        let restored = pmd_inflate(&wire_payload);
        assert_eq!(restored, input);
    }

    #[test]
    fn rsv1_without_negotiation_is_rejected() {
        // No permessage-deflate state is supplied (pmd is None), so a data
        // frame arriving with RSV1 set has to surface as a protocol error.
        let mut stream = MockStream::new(server_frame_rsv1(true, OPCODE_TEXT, b"whatever"));
        let rejection = read_message(&mut stream, None)
            .expect_err("RSV1 without PMD must be rejected");

        // BadResponse is the only acceptable variant; anything else fails the
        // test with the offending error in the panic message.
        let other = match rejection {
            Error::BadResponse(_) => return,
            other => other,
        };
        panic!("wrong error: {other:?}");
    }

    #[test]
    fn rsv2_or_rsv3_always_rejected() {
        // RSV2 (0x20) and RSV3 (0x10) are always illegal, even when
        // permessage-deflate has been negotiated. Each case takes an otherwise
        // valid TEXT frame, flips one reserved bit in the first header byte,
        // and expects a BadResponse.
        let cases = [
            (0x20, "RSV2 must be rejected"),
            (0x10, "RSV3 must be rejected"),
        ];
        for (reserved_bit, expectation) in cases {
            let mut wire = server_frame(true, OPCODE_TEXT, b"x");
            wire[0] |= reserved_bit;
            let mut stream = MockStream::new(wire);
            let mut pmd = test_pmd();
            let rejection = read_message(&mut stream, Some(&mut pmd)).expect_err(expectation);
            assert!(matches!(rejection, Error::BadResponse(_)));
        }
    }

    #[test]
    fn rsv1_on_control_frame_is_rejected() {
        // Compression covers data messages only, so a PING carrying RSV1 is
        // illegal even though permessage-deflate is negotiated here.
        let mut stream = MockStream::new(server_frame_rsv1(true, OPCODE_PING, b"x"));
        let mut pmd = test_pmd();
        let outcome = read_message(&mut stream, Some(&mut pmd));
        let rejection = outcome.expect_err("RSV1 on control must be rejected");
        assert!(matches!(rejection, Error::BadResponse(_)));
    }

    #[test]
    fn compressed_bomb_exceeding_cap_is_rejected() {
        // Decompression-bomb guard: an all-zero buffer one MiB larger than
        // MAX_PAYLOAD_BYTES compresses to a small frame, and reading that
        // frame must fail instead of yielding the inflated payload.
        let inflated_len = (MAX_PAYLOAD_BYTES + (1 << 20)) as usize;
        let zeros = vec![0u8; inflated_len];
        let bomb = pmd_compress(&zeros);
        assert!(
            (bomb.len() as u64) < MAX_PAYLOAD_BYTES,
            "fixture must be much smaller than the cap"
        );

        let mut stream = MockStream::new(server_frame_rsv1(true, OPCODE_BINARY, &bomb));
        let mut pmd = test_pmd();
        let rejection = read_message(&mut stream, Some(&mut pmd))
            .expect_err("compression bomb must be rejected");

        // The failure must be a BadResponse whose text names the extension.
        let msg = match rejection {
            Error::BadResponse(msg) => msg,
            other => panic!("wrong error: {other:?}"),
        };
        assert!(msg.contains("permessage-deflate"), "got {msg:?}");
    }

    #[test]
    fn fragmented_compressed_message_reassembles_and_inflates() {
        // The compressed blob is cut in half and sent as TEXT (FIN=0, RSV1
        // set) followed by CONTINUATION (FIN=1, RSV1 clear). The reader must
        // stitch both halves together and inflate the result as one unit.
        let plaintext = b"fragmented compressed payload, split across two frames on the wire";
        let deflated = pmd_compress(plaintext);
        assert!(deflated.len() >= 4, "need enough bytes to split");
        let (head, tail) = deflated.split_at(deflated.len() / 2);

        let mut wire = server_frame_rsv1(false, OPCODE_TEXT, head);
        wire.extend(server_frame(true, OPCODE_CONT, tail));

        let mut stream = MockStream::new(wire);
        let mut pmd = test_pmd();
        let received = read_message(&mut stream, Some(&mut pmd)).expect("reassembles + inflates");

        let expected = Message::Data {
            opcode: OPCODE_TEXT,
            payload: plaintext.to_vec(),
        };
        assert_eq!(received, expected);
    }

    #[test]
    fn rsv1_on_continuation_frame_is_rejected() {
        // RSV1 belongs on the first frame of a message only. Here the
        // CONTINUATION frame carries it too, which must be treated as a
        // protocol error.
        let plaintext = b"continuation rsv1 should be rejected here";
        let deflated = pmd_compress(plaintext);
        let (head, tail) = deflated.split_at(deflated.len() / 2);

        let mut wire = server_frame_rsv1(false, OPCODE_TEXT, head);
        wire.extend(server_frame_rsv1(true, OPCODE_CONT, tail));

        let mut stream = MockStream::new(wire);
        let mut pmd = test_pmd();
        let outcome = read_message(&mut stream, Some(&mut pmd));
        let rejection = outcome.expect_err("RSV1 on continuation must be rejected");
        assert!(matches!(rejection, Error::BadResponse(_)));
    }

    #[test]
    fn uncompressed_message_passes_through_when_pmd_negotiated() {
        // Negotiating permessage-deflate does not oblige the server to
        // compress: a frame with RSV1 clear must come back byte-for-byte,
        // without being run through inflate.
        let text = b"plain text, no rsv1";
        let mut stream = MockStream::new(server_frame(true, OPCODE_TEXT, text));
        let mut pmd = test_pmd();
        let received = read_message(&mut stream, Some(&mut pmd)).expect("plain message");

        let expected = Message::Data {
            opcode: OPCODE_TEXT,
            payload: text.to_vec(),
        };
        assert_eq!(received, expected);
    }
}