Skip to main content

nexus_web/ws/
frame_reader.rs

1use super::error::ProtocolError;
2use super::frame::{RawOpcode, Role};
3use super::mask::apply_mask;
4use super::message::{CloseCode, CloseFrame, Message};
5use nexus_net::buf::ReadBuf;
6
/// Error from [`FrameReader::read`].
///
/// Returned when the caller-supplied bytes do not fit in the reader's
/// buffer even after already-consumed data has been compacted away.
/// Nothing is copied when this error is returned.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ReadError {
    /// ReadBuf cannot accept the incoming bytes.
    BufferFull {
        /// Bytes the caller tried to write.
        needed: usize,
        /// Bytes available in spare region (measured after compaction).
        available: usize,
    },
}
18
19impl std::fmt::Display for ReadError {
20    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
21        match self {
22            Self::BufferFull { needed, available } => {
23                write!(f, "buffer full: need {needed} bytes, {available} available")
24            }
25        }
26    }
27}
28
/// `ReadError` wraps no underlying cause, so the default `source()` (returning
/// `None`) is used.
impl std::error::Error for ReadError {}
30
31/// WebSocket frame reader — parses wire bytes into [`Message`]s.
32///
33/// Handles wire frame parsing, fragment assembly, control frame
34/// interleaving, masking, and UTF-8 validation. The user sees complete
35/// `Message` values — never raw frames or continuations.
36///
37/// # Usage
38///
39/// ```
40/// use nexus_web::ws::{FrameReader, Role, Message};
41///
42/// let mut reader = FrameReader::builder()
43///     .role(Role::Client)
44///     .buffer_capacity(65_536)
45///     .build();
46///
47/// // Feed wire bytes
48/// reader.read(&[0x81, 0x05, 0x48, 0x65, 0x6C, 0x6C, 0x6F]).unwrap();
49///
50/// // Parse messages
51/// match reader.next().unwrap().unwrap() {
52///     Message::Text(s) => assert_eq!(s, "Hello"),
53///     _ => panic!("expected text"),
54/// }
55/// ```
56pub struct FrameReader {
57    buf: ReadBuf,
58    msg_buf: Vec<u8>,
59    /// ReadBuf compaction trigger: compact when consumed bytes exceed this.
60    buf_compact_at: usize,
61
62    state: ParseState,
63    remaining_payload: usize,
64    mask_key: Option<[u8; 4]>,
65    mask_offset: u8,
66
67    assembling: bool,
68    assembly_opcode: Option<RawOpcode>,
69    utf8_valid_up_to: usize,
70
71    role: Role,
72    max_frame_size: u64,
73    max_message_size: usize,
74
75    /// Tracks what to clean up when next()/poll() is called again.
76    pending_cleanup: PendingCleanup,
77    /// Opcode of the pending message (set by poll, consumed by next).
78    pending_opcode: Option<RawOpcode>,
79    /// For control frames during assembly that span reads: the offset
80    /// in msg_buf where the control payload starts (after assembly data).
81    ctrl_payload_offset: usize,
82}
83
/// Deferred bookkeeping for the message most recently handed to the caller.
///
/// A returned `Message` borrows from the reader, so its storage cannot be
/// released immediately; the required work is recorded here and performed
/// at the start of the next `next()`/`poll()` call.
#[derive(Clone, Copy, Default)]
enum PendingCleanup {
    /// Payload was borrowed in place from the ReadBuf; advance past this
    /// many bytes.
    AdvanceReadBuf(usize),
    /// Payload lived in msg_buf; clear it (its capacity is retained).
    ClearMsgBuf,
    /// A control payload was appended after partial assembly data;
    /// truncate msg_buf back to this length.
    TruncateMsgBuf(usize),
    /// Nothing outstanding.
    #[default]
    None,
}
96
97#[derive(Clone, Copy, Default)]
98enum ParseState {
99    #[default]
100    Head,
101    /// Payload spans reads — always goes to msg_buf.
102    Payload { opcode: RawOpcode, fin: bool },
103}
104
105/// Builder for [`FrameReader`].
106pub struct FrameReaderBuilder {
107    buffer_capacity: usize,
108    pre_padding: usize,
109    post_padding: usize,
110    prealloc_capacity: usize,
111    compact_at: f64,
112    max_frame_size: u64,
113    max_message_size: usize,
114    role: Role,
115}
116
117impl FrameReader {
118    /// Create a builder.
119    #[must_use]
120    pub fn builder() -> FrameReaderBuilder {
121        FrameReaderBuilder {
122            buffer_capacity: 1024 * 1024,
123            pre_padding: 16,
124            post_padding: 4,
125            prealloc_capacity: 4096,
126            compact_at: 0.5,
127            max_frame_size: 16 * 1024 * 1024,
128            max_message_size: 16 * 1024 * 1024,
129            role: Role::Server,
130        }
131    }
132
133    /// Buffer wire bytes from a source.
134    pub fn read(&mut self, src: &[u8]) -> Result<(), ReadError> {
135        let mut spare = self.buf.spare();
136        if src.len() > spare.len() {
137            // Try compacting before giving up
138            self.buf.compact();
139            spare = self.buf.spare();
140            if src.len() > spare.len() {
141                return Err(ReadError::BufferFull {
142                    needed: src.len(),
143                    available: spare.len(),
144                });
145            }
146        }
147        spare[..src.len()].copy_from_slice(src);
148        self.buf.filled(src.len());
149        Ok(())
150    }
151
152    /// Read bytes from a source directly into the internal buffer.
153    ///
154    /// Convenience for `spare()` + `filled()`. Returns bytes read,
155    /// or 0 on EOF. Returns `Err` if the buffer is full after compaction
156    /// (indicates the buffer is undersized for the current message).
157    ///
158    /// ```ignore
159    /// let n = reader.read_from(&mut socket)?;
160    /// ```
161    pub fn read_from<R: std::io::Read>(&mut self, src: &mut R) -> std::io::Result<usize> {
162        let mut spare = self.buf.spare();
163        if spare.is_empty() {
164            // Reclaim consumed space (partial frame at end of buffer)
165            self.buf.compact();
166            spare = self.buf.spare();
167            if spare.is_empty() {
168                return Err(std::io::Error::other("frame reader buffer full"));
169            }
170        }
171        let n = src.read(spare)?;
172        self.buf.filled(n);
173        Ok(n)
174    }
175
176    /// Writable region for direct socket reads.
177    #[inline]
178    pub fn spare(&mut self) -> &mut [u8] {
179        self.buf.spare()
180    }
181
182    /// Commit bytes written into [`spare()`](Self::spare).
183    #[inline]
184    pub fn filled(&mut self, n: usize) {
185        self.buf.filled(n);
186    }
187
188    /// Reclaim consumed buffer space by moving unconsumed data to the front.
189    ///
190    /// Call when [`spare()`](Self::spare) is empty but there's still data to read.
191    /// This is O(n) in the amount of unconsumed data.
192    #[inline]
193    pub fn compact(&mut self) {
194        self.buf.compact();
195    }
196
197    /// Whether the ReadBuf should be compacted based on the configured threshold.
198    ///
199    /// Returns `true` when at least one byte has been consumed, consumed bytes
200    /// meet or exceed the threshold set by [`FrameReaderBuilder::compact_at`],
201    /// and there is unconsumed data to preserve.
202    /// Default threshold is 50% of buffer capacity.
203    #[inline]
204    pub fn should_compact(&self) -> bool {
205        let consumed = self.buf.consumed();
206        consumed > 0 && consumed >= self.buf_compact_at && !self.buf.is_empty()
207    }
208
209    /// Parse the next complete message.
210    #[inline]
211    #[allow(clippy::should_implement_trait)]
212    pub fn next(&mut self) -> Result<Option<Message<'_>>, ProtocolError> {
213        // If poll() already prepared a message, return it
214        if let Some(opcode) = self.pending_opcode.take() {
215            return self.make_message(opcode);
216        }
217
218        // Clean up from previously returned message
219        self.do_cleanup();
220
221        self.pump()?
222            .map_or(Ok(None), |opcode| self.make_message(opcode))
223    }
224
225    /// Advance the parser without constructing a Message.
226    /// Returns `true` if the next call to `next()` will return a message.
227    #[inline]
228    pub fn poll(&mut self) -> Result<bool, ProtocolError> {
229        if self.pending_opcode.is_some() {
230            return Ok(true);
231        }
232
233        self.do_cleanup();
234
235        match self.pump()? {
236            None => Ok(false),
237            Some(opcode) => {
238                self.pending_opcode = Some(opcode);
239                Ok(true)
240            }
241        }
242    }
243
244    /// Bytes of buffer space remaining.
245    #[inline]
246    pub fn remaining(&self) -> usize {
247        self.buf.remaining()
248    }
249
250    /// Bytes of unconsumed data in the buffer.
251    #[inline]
252    pub fn buffered(&self) -> usize {
253        self.buf.len()
254    }
255
256    /// Reset all state.
257    pub fn reset(&mut self) {
258        self.buf.clear();
259        self.msg_buf.clear();
260        self.state = ParseState::Head;
261        self.remaining_payload = 0;
262        self.mask_key = None;
263        self.mask_offset = 0;
264        self.assembling = false;
265        self.assembly_opcode = None;
266        self.utf8_valid_up_to = 0;
267        self.pending_cleanup = PendingCleanup::None;
268        self.pending_opcode = None;
269        self.ctrl_payload_offset = 0;
270    }
271
272    // =========================================================================
273    // Internals
274    // =========================================================================
275
276    /// Execute pending cleanup from the previously returned Message.
277    #[inline]
278    fn do_cleanup(&mut self) {
279        match self.pending_cleanup {
280            PendingCleanup::None => return,
281            PendingCleanup::AdvanceReadBuf(n) => {
282                self.buf.advance(n);
283            }
284            PendingCleanup::ClearMsgBuf => {
285                self.do_cleanup_msg_buf();
286            }
287            PendingCleanup::TruncateMsgBuf(len) => {
288                self.msg_buf.truncate(len);
289            }
290        }
291        self.pending_cleanup = PendingCleanup::None;
292    }
293
294    /// Cold path: clear msg_buf (multi-frame assembly buffer).
295    /// Capacity is retained — no allocation, no shrinking.
296    #[cold]
297    fn do_cleanup_msg_buf(&mut self) {
298        self.msg_buf.clear();
299    }
300
301    /// State machine: consume frames from ReadBuf.
302    /// Returns opcode of a completed message, or None if more bytes needed.
303    ///
304    /// For single-frame messages: payload stays in ReadBuf (zero-copy).
305    /// For assembled messages: payload accumulated in msg_buf.
306    #[inline]
307    fn pump(&mut self) -> Result<Option<RawOpcode>, ProtocolError> {
308        loop {
309            let state = self.state;
310            match state {
311                ParseState::Payload { opcode, fin } => {
312                    // Payload spans reads — goes to msg_buf
313                    let available = self.buf.len();
314                    if available == 0 {
315                        return Ok(None);
316                    }
317
318                    let take = available.min(self.remaining_payload);
319                    self.consume_partial_payload(take);
320
321                    if self.remaining_payload == 0 {
322                        self.state = ParseState::Head;
323                        if let Some(completed) = self.route_opcode(opcode, fin)? {
324                            if opcode.is_control() && self.assembling {
325                                // Control during assembly: payload appended after assembly data
326                                self.pending_cleanup =
327                                    PendingCleanup::TruncateMsgBuf(self.ctrl_payload_offset);
328                            } else {
329                                self.pending_cleanup = PendingCleanup::ClearMsgBuf;
330                            }
331                            return Ok(Some(completed));
332                        }
333                        continue;
334                    }
335                    return Ok(None);
336                }
337
338                ParseState::Head => {
339                    let data_len = self.buf.len();
340                    if data_len < 2 {
341                        return Ok(None);
342                    }
343
344                    let byte1 = self.buf.data()[1];
345                    let header_size = Self::header_size(byte1);
346                    if data_len < header_size {
347                        return Ok(None);
348                    }
349
350                    let parsed = {
351                        let data = self.buf.data();
352                        self.parse_header(&data[..header_size])?
353                    };
354
355                    let is_control = parsed.opcode.is_control();
356
357                    if !is_control {
358                        let total = self.msg_buf.len() + parsed.payload_len;
359                        if total > self.max_message_size {
360                            return Err(ProtocolError::MessageTooLarge {
361                                accumulated: total,
362                                max: self.max_message_size,
363                            });
364                        }
365                    }
366
367                    // Advance past header
368                    self.buf.advance(header_size);
369
370                    let available = self.buf.len();
371
372                    if available >= parsed.payload_len {
373                        // Full payload in ReadBuf
374                        let payload_len = parsed.payload_len;
375
376                        // Unmask in-place if needed
377                        if let Some(mask) = parsed.mask_key
378                            && payload_len > 0
379                        {
380                            let data = &mut self.buf.data_mut()[..payload_len];
381                            apply_mask(data, mask);
382                        }
383
384                        let is_single = parsed.fin && !self.assembling;
385
386                        if is_single || is_control {
387                            // ZERO-COPY: leave payload in ReadBuf, borrow directly.
388                            // Don't advance ReadBuf — cleanup will do it.
389                            if let Some(completed) = self.route_opcode(parsed.opcode, parsed.fin)? {
390                                self.pending_cleanup = PendingCleanup::AdvanceReadBuf(payload_len);
391                                return Ok(Some(completed));
392                            }
393                            // route_opcode returned None for a control frame? Shouldn't happen.
394                            // Advance and continue.
395                            self.buf.advance(payload_len);
396                            continue;
397                        }
398
399                        // Assembly path: copy to msg_buf, advance ReadBuf
400                        let data = &self.buf.data()[..payload_len];
401                        self.msg_buf.extend_from_slice(data);
402                        self.buf.advance(payload_len);
403
404                        if let Some(completed) = self.route_opcode(parsed.opcode, parsed.fin)? {
405                            self.pending_cleanup = PendingCleanup::ClearMsgBuf;
406                            return Ok(Some(completed));
407                        }
408                        continue;
409                    }
410
411                    // Partial payload — goes to msg_buf
412                    self.remaining_payload = parsed.payload_len;
413                    self.mask_key = parsed.mask_key;
414                    self.mask_offset = 0;
415
416                    // Track where control payload starts during assembly
417                    if parsed.opcode.is_control() && self.assembling {
418                        self.ctrl_payload_offset = self.msg_buf.len();
419                    }
420
421                    if available > 0 {
422                        self.consume_partial_payload(available);
423                    }
424
425                    self.state = ParseState::Payload {
426                        opcode: parsed.opcode,
427                        fin: parsed.fin,
428                    };
429                    return Ok(None);
430                }
431            }
432        }
433    }
434
435    /// Route a completed frame. Returns the opcode to surface as a
436    /// Message, or None if the frame was consumed internally (assembly).
437    #[inline(always)]
438    fn route_opcode(
439        &mut self,
440        opcode: RawOpcode,
441        fin: bool,
442    ) -> Result<Option<RawOpcode>, ProtocolError> {
443        if opcode.is_control() {
444            return Ok(Some(opcode));
445        }
446
447        match opcode {
448            RawOpcode::Text | RawOpcode::Binary => {
449                if self.assembling {
450                    return Err(ProtocolError::NewMessageDuringAssembly);
451                }
452                if fin {
453                    return Ok(Some(opcode));
454                }
455                // Start assembly — payload already in msg_buf
456                self.assembling = true;
457                self.assembly_opcode = Some(opcode);
458                self.utf8_valid_up_to = 0;
459                if opcode == RawOpcode::Text {
460                    let pending = validate_utf8_incremental(&self.msg_buf, false)?;
461                    self.utf8_valid_up_to = self.msg_buf.len() - pending as usize;
462                }
463                Ok(None)
464            }
465            RawOpcode::Continuation => {
466                if !self.assembling {
467                    return Err(ProtocolError::ContinuationWithoutStart);
468                }
469                if self.assembly_opcode == Some(RawOpcode::Text) {
470                    let to_check = &self.msg_buf[self.utf8_valid_up_to..];
471                    let pending = validate_utf8_incremental(to_check, fin)?;
472                    self.utf8_valid_up_to = self.msg_buf.len() - pending as usize;
473                }
474                if fin {
475                    self.assembling = false;
476                    let opcode = self
477                        .assembly_opcode
478                        .take()
479                        .expect("assembly_opcode must be Some when assembling is true");
480                    self.utf8_valid_up_to = 0;
481                    return Ok(Some(opcode));
482                }
483                Ok(None)
484            }
485            _ => unreachable!(),
486        }
487    }
488
489    /// Construct a Message. For zero-copy: borrows from ReadBuf.
490    /// For assembled: borrows from msg_buf.
491    #[inline(always)]
492    fn make_message(&self, opcode: RawOpcode) -> Result<Option<Message<'_>>, ProtocolError> {
493        let payload = match self.pending_cleanup {
494            PendingCleanup::AdvanceReadBuf(n) => &self.buf.data()[..n],
495            PendingCleanup::TruncateMsgBuf(offset) => &self.msg_buf[offset..],
496            PendingCleanup::ClearMsgBuf | PendingCleanup::None => &self.msg_buf[..],
497        };
498
499        match opcode {
500            RawOpcode::Ping => Ok(Some(Message::Ping(payload))),
501            RawOpcode::Pong => Ok(Some(Message::Pong(payload))),
502            RawOpcode::Close => Self::parse_close_from(payload),
503            RawOpcode::Text => {
504                let s = match self.pending_cleanup {
505                    PendingCleanup::ClearMsgBuf => {
506                        // SAFETY: Every byte in msg_buf was validated via
507                        // validate_utf8_incremental() in route_opcode():
508                        //   1. Initial text frame: validated on entry (line 435)
509                        //   2. Each continuation: validated on append (line 447)
510                        //   3. Final frame (fin=true): validated with is_final=true
511                        //      which rejects incomplete codepoints at the boundary
512                        // No bytes enter msg_buf without passing through this
513                        // validation chain. Re-validating here would waste cycles
514                        // on the hot path (~5-20 cycles for 128B via simdutf8).
515                        unsafe { std::str::from_utf8_unchecked(payload) }
516                    }
517                    _ => {
518                        // Single-frame zero-copy: first and only validation.
519                        simdutf8::basic::from_utf8(payload)
520                            .map_err(|_| ProtocolError::InvalidUtf8)?
521                    }
522                };
523                Ok(Some(Message::Text(s)))
524            }
525            RawOpcode::Binary => Ok(Some(Message::Binary(payload))),
526            RawOpcode::Continuation => unreachable!("pump never returns Continuation"),
527        }
528    }
529
530    #[inline]
531    fn header_size(byte1: u8) -> usize {
532        let masked = byte1 & 0x80 != 0;
533        let len_code = byte1 & 0x7F;
534        let base = match len_code {
535            0..=125 => 2,
536            126 => 4,
537            _ => 10,
538        };
539        if masked { base + 4 } else { base }
540    }
541
542    #[inline]
543    fn parse_header(&self, header: &[u8]) -> Result<ParsedHeader, ProtocolError> {
544        let byte0 = header[0];
545        let byte1 = header[1];
546        let fin = byte0 & 0x80 != 0;
547        let rsv = (byte0 >> 4) & 0x07;
548        let opcode_raw = byte0 & 0x0F;
549        let masked = byte1 & 0x80 != 0;
550        let len_code = byte1 & 0x7F;
551
552        if rsv != 0 {
553            return Err(ProtocolError::ReservedBitsSet { bits: rsv });
554        }
555
556        let opcode =
557            RawOpcode::from_u8(opcode_raw).ok_or(ProtocolError::InvalidOpcode(opcode_raw))?;
558
559        match self.role {
560            Role::Server if !masked => return Err(ProtocolError::UnmaskedFrameFromClient),
561            Role::Client if masked => return Err(ProtocolError::MaskedFrameFromServer),
562            _ => {}
563        }
564
565        let (payload_len, mask_offset) = match len_code {
566            0..=125 => (u64::from(len_code), 2),
567            126 => {
568                let len = u16::from_be_bytes([header[2], header[3]]);
569                (u64::from(len), 4)
570            }
571            _ => {
572                let len = u64::from_be_bytes(
573                    header[2..10]
574                        .try_into()
575                        .expect("64-bit length field is 8 bytes"),
576                );
577                (len, 10)
578            }
579        };
580
581        if opcode.is_control() {
582            if payload_len > 125 {
583                return Err(ProtocolError::ControlFrameTooLarge { size: payload_len });
584            }
585            if !fin {
586                return Err(ProtocolError::FragmentedControlFrame);
587            }
588        }
589
590        if payload_len > self.max_frame_size {
591            return Err(ProtocolError::PayloadTooLarge {
592                size: payload_len,
593                max: self.max_frame_size,
594            });
595        }
596
597        let mask_key = if masked {
598            Some([
599                header[mask_offset],
600                header[mask_offset + 1],
601                header[mask_offset + 2],
602                header[mask_offset + 3],
603            ])
604        } else {
605            None
606        };
607
608        let payload_len =
609            usize::try_from(payload_len).map_err(|_| ProtocolError::PayloadTooLarge {
610                size: payload_len,
611                max: self.max_frame_size,
612            })?;
613
614        Ok(ParsedHeader {
615            fin,
616            opcode,
617            mask_key,
618            payload_len,
619        })
620    }
621
622    /// Consume partial payload from ReadBuf → msg_buf (for frames spanning reads).
623    #[cold]
624    fn consume_partial_payload(&mut self, n: usize) {
625        if n == 0 {
626            return;
627        }
628        if let Some(key) = self.mask_key {
629            let data = &mut self.buf.data_mut()[..n];
630            let offset = self.mask_offset as usize;
631            let rotated = [
632                key[offset % 4],
633                key[(offset + 1) % 4],
634                key[(offset + 2) % 4],
635                key[(offset + 3) % 4],
636            ];
637            apply_mask(data, rotated);
638            self.mask_offset = ((offset + n) % 4) as u8;
639        }
640        let data = &self.buf.data()[..n];
641        self.msg_buf.extend_from_slice(data);
642        self.buf.advance(n);
643        self.remaining_payload -= n;
644    }
645
646    #[cold]
647    fn parse_close_from(buf: &[u8]) -> Result<Option<Message<'_>>, ProtocolError> {
648        if buf.is_empty() {
649            return Ok(Some(Message::Close(CloseFrame {
650                code: CloseCode::NoStatus,
651                reason: "",
652            })));
653        }
654        if buf.len() == 1 {
655            return Err(ProtocolError::CloseFrameTooShort);
656        }
657        let raw_code = u16::from_be_bytes([buf[0], buf[1]]);
658        let code = CloseCode::from_u16(raw_code)?;
659        let reason_bytes = &buf[2..];
660        let reason = simdutf8::compat::from_utf8(reason_bytes)
661            .map_err(|_| ProtocolError::InvalidUtf8InCloseReason)?;
662        Ok(Some(Message::Close(CloseFrame { code, reason })))
663    }
664}
665
666struct ParsedHeader {
667    fin: bool,
668    opcode: RawOpcode,
669    mask_key: Option<[u8; 4]>,
670    payload_len: usize,
671}
672
673impl std::fmt::Debug for FrameReader {
674    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
675        f.debug_struct("FrameReader")
676            .field("buffered", &self.buf.len())
677            .field("remaining", &self.buf.remaining())
678            .field("assembling", &self.assembling)
679            .field("role", &self.role)
680            .finish()
681    }
682}
683
684/// Validate UTF-8 incrementally. Returns the number of trailing bytes
685/// that might be an incomplete codepoint (0-3).
686///
687/// On `is_final=true`, no trailing bytes are allowed — the entire
688/// buffer must be valid UTF-8.
689fn validate_utf8_incremental(data: &[u8], is_final: bool) -> Result<u8, ProtocolError> {
690    if data.is_empty() {
691        return Ok(0);
692    }
693
694    if is_final {
695        simdutf8::compat::from_utf8(data).map_err(|_| ProtocolError::InvalidUtf8)?;
696        return Ok(0);
697    }
698
699    match simdutf8::compat::from_utf8(data) {
700        Ok(_) => Ok(0),
701        Err(e) => {
702            let valid_up_to = e.valid_up_to();
703            if e.error_len().is_some() {
704                // Definitively invalid byte sequence
705                return Err(ProtocolError::InvalidUtf8);
706            }
707            // error_len() is None → incomplete sequence at the end
708            let pending = data.len() - valid_up_to;
709            if pending > 3 {
710                return Err(ProtocolError::InvalidUtf8);
711            }
712            Ok(pending as u8)
713        }
714    }
715}
716
717/// Lets a [`WireStream`](crate::WireStream) feed bytes directly into
718/// the FrameReader's spare region — one fewer copy than going through
719/// a slice intermediary.
720impl crate::ParserSink for FrameReader {
721    #[inline]
722    fn spare(&mut self) -> &mut [u8] {
723        FrameReader::spare(self)
724    }
725
726    #[inline]
727    fn filled(&mut self, n: usize) {
728        FrameReader::filled(self, n);
729    }
730}
731
732impl FrameReaderBuilder {
733    /// ReadBuf capacity. Default: 1MB.
734    #[must_use]
735    pub fn buffer_capacity(mut self, n: usize) -> Self {
736        self.buffer_capacity = n;
737        self
738    }
739
740    /// ReadBuf pre-padding. Default: 16.
741    #[must_use]
742    pub fn pre_padding(mut self, n: usize) -> Self {
743        self.pre_padding = n;
744        self
745    }
746
747    /// ReadBuf post-padding. Default: 4.
748    #[must_use]
749    pub fn post_padding(mut self, n: usize) -> Self {
750        self.post_padding = n;
751        self
752    }
753
754    /// Pre-allocate message assembly buffer. Default: 4KB.
755    #[must_use]
756    pub fn message_capacity(mut self, n: usize) -> Self {
757        self.prealloc_capacity = n;
758        self
759    }
760
761    /// Fraction of buffer capacity consumed before proactive compaction.
762    ///
763    /// When the read head has advanced past this fraction of the buffer,
764    /// [`should_compact()`](FrameReader::should_compact) returns `true`.
765    /// This spreads compaction cost across messages instead of concentrating
766    /// it in a single stall when the buffer runs out of spare room.
767    ///
768    /// - `1.0`: never proactively compact — only when spare is empty.
769    /// - `0.5` (default): compact when half the buffer has been consumed.
770    /// - `0.0`: compact on every recv after the first byte is consumed
771    ///   (degenerate — not useful in practice).
772    ///
773    /// Lower values reduce tail latency at the cost of more frequent (but smaller)
774    /// memmoves.
775    #[must_use]
776    pub fn compact_at(mut self, fraction: f64) -> Self {
777        assert!(
778            (0.0..=1.0).contains(&fraction),
779            "compact_at fraction must be in 0.0..=1.0, got {fraction}"
780        );
781        self.compact_at = fraction;
782        self
783    }
784
785    /// Maximum single frame payload. Default: 16MB.
786    #[must_use]
787    pub fn max_frame_size(mut self, n: u64) -> Self {
788        self.max_frame_size = n;
789        self
790    }
791
792    /// Maximum assembled message size. Default: 16MB.
793    #[must_use]
794    pub fn max_message_size(mut self, n: usize) -> Self {
795        self.max_message_size = n;
796        self
797    }
798
799    /// Connection role. Default: Server.
800    #[must_use]
801    pub fn role(mut self, r: Role) -> Self {
802        self.role = r;
803        self
804    }
805
806    /// Build the reader.
807    #[must_use]
808    pub fn build(self) -> FrameReader {
809        let buf_compact_at = if self.compact_at >= 1.0 {
810            usize::MAX
811        } else if self.compact_at <= 0.0 {
812            0
813        } else {
814            (self.buffer_capacity as f64 * self.compact_at).ceil() as usize
815        };
816        FrameReader {
817            buf: ReadBuf::new(self.buffer_capacity, self.pre_padding, self.post_padding),
818            msg_buf: Vec::with_capacity(self.prealloc_capacity),
819            buf_compact_at,
820            state: ParseState::Head,
821            remaining_payload: 0,
822            mask_key: None,
823            mask_offset: 0,
824            assembling: false,
825            assembly_opcode: None,
826            utf8_valid_up_to: 0,
827            role: self.role,
828            max_frame_size: self.max_frame_size,
829            max_message_size: self.max_message_size,
830            pending_cleanup: PendingCleanup::None,
831            pending_opcode: None,
832            ctrl_payload_offset: 0,
833        }
834    }
835}
836
837#[cfg(test)]
838mod tests {
839    use super::*;
840
841    fn make_frame(fin: bool, opcode: u8, payload: &[u8]) -> Vec<u8> {
842        let mut frame = Vec::new();
843        let byte0 = if fin { 0x80 } else { 0x00 } | opcode;
844        frame.push(byte0);
845        if payload.len() <= 125 {
846            frame.push(payload.len() as u8);
847        } else if payload.len() <= 65535 {
848            frame.push(126);
849            frame.extend_from_slice(&(payload.len() as u16).to_be_bytes());
850        } else {
851            frame.push(127);
852            frame.extend_from_slice(&(payload.len() as u64).to_be_bytes());
853        }
854        frame.extend_from_slice(payload);
855        frame
856    }
857
858    fn make_masked_frame(fin: bool, opcode: u8, payload: &[u8], mask: [u8; 4]) -> Vec<u8> {
859        let mut frame = Vec::new();
860        let byte0 = if fin { 0x80 } else { 0x00 } | opcode;
861        frame.push(byte0);
862        let len_byte = if payload.len() <= 125 {
863            payload.len() as u8
864        } else if payload.len() <= 65535 {
865            126
866        } else {
867            127
868        };
869        frame.push(0x80 | len_byte);
870        if payload.len() > 125 && payload.len() <= 65535 {
871            frame.extend_from_slice(&(payload.len() as u16).to_be_bytes());
872        } else if payload.len() > 65535 {
873            frame.extend_from_slice(&(payload.len() as u64).to_be_bytes());
874        }
875        frame.extend_from_slice(&mask);
876        let mut masked = payload.to_vec();
877        apply_mask(&mut masked, mask);
878        frame.extend_from_slice(&masked);
879        frame
880    }
881
882    fn client_reader() -> FrameReader {
883        FrameReader::builder().role(Role::Client).build()
884    }
885
886    fn server_reader() -> FrameReader {
887        FrameReader::builder().role(Role::Server).build()
888    }
889
890    // === Single frame ===
891
892    #[test]
893    fn text_message() {
894        let mut r = client_reader();
895        r.read(&make_frame(true, 0x1, b"Hello")).unwrap();
896        match r.next().unwrap().unwrap() {
897            Message::Text(s) => assert_eq!(s, "Hello"),
898            other => panic!("expected Text, got {other:?}"),
899        }
900    }
901
902    #[test]
903    fn binary_message() {
904        let mut r = client_reader();
905        r.read(&make_frame(true, 0x2, &[0xDE, 0xAD])).unwrap();
906        match r.next().unwrap().unwrap() {
907            Message::Binary(b) => assert_eq!(b, &[0xDE, 0xAD]),
908            other => panic!("expected Binary, got {other:?}"),
909        }
910    }
911
912    #[test]
913    fn empty_text() {
914        let mut r = client_reader();
915        r.read(&make_frame(true, 0x1, b"")).unwrap();
916        match r.next().unwrap().unwrap() {
917            Message::Text(s) => assert_eq!(s, ""),
918            other => panic!("expected empty Text, got {other:?}"),
919        }
920    }
921
922    #[test]
923    fn masked_text() {
924        let mut r = server_reader();
925        let mask = [0x37, 0xFA, 0x21, 0x3D];
926        r.read(&make_masked_frame(true, 0x1, b"Hello", mask))
927            .unwrap();
928        match r.next().unwrap().unwrap() {
929            Message::Text(s) => assert_eq!(s, "Hello"),
930            other => panic!("expected Text, got {other:?}"),
931        }
932    }
933
934    // === Fragment assembly ===
935
936    #[test]
937    fn two_fragments() {
938        let mut r = client_reader();
939        r.read(&make_frame(false, 0x1, b"Hel")).unwrap();
940        r.read(&make_frame(true, 0x0, b"lo")).unwrap();
941        // Both frames buffered — pump assembles in one next() call
942        match r.next().unwrap().unwrap() {
943            Message::Text(s) => assert_eq!(s, "Hello"),
944            other => panic!("expected Text, got {other:?}"),
945        }
946    }
947
948    #[test]
949    fn three_binary_fragments() {
950        let mut r = client_reader();
951        r.read(&make_frame(false, 0x2, b"AB")).unwrap();
952        r.read(&make_frame(false, 0x0, b"CD")).unwrap();
953        r.read(&make_frame(true, 0x0, b"EF")).unwrap();
954        // All three frames buffered — assembles in one next()
955        match r.next().unwrap().unwrap() {
956            Message::Binary(b) => assert_eq!(b, b"ABCDEF"),
957            other => panic!("expected Binary, got {other:?}"),
958        }
959    }
960
961    // === Control frames during assembly ===
962
963    #[test]
964    fn ping_during_assembly() {
965        let mut r = client_reader();
966        r.read(&make_frame(false, 0x1, b"Hel")).unwrap();
967        r.read(&make_frame(true, 0x9, b"ping")).unwrap();
968        r.read(&make_frame(true, 0x0, b"lo")).unwrap();
969
970        // Ping is interleaved — returned first
971        match r.next().unwrap().unwrap() {
972            Message::Ping(p) => assert_eq!(p, b"ping"),
973            other => panic!("expected Ping, got {other:?}"),
974        }
975        // Then the assembled text
976        match r.next().unwrap().unwrap() {
977            Message::Text(s) => assert_eq!(s, "Hello"),
978            other => panic!("expected Text, got {other:?}"),
979        }
980    }
981
982    // === Close frames ===
983
984    #[test]
985    fn close_with_code_and_reason() {
986        let mut r = client_reader();
987        let mut payload = vec![];
988        payload.extend_from_slice(&1000u16.to_be_bytes());
989        payload.extend_from_slice(b"goodbye");
990        r.read(&make_frame(true, 0x8, &payload)).unwrap();
991        match r.next().unwrap().unwrap() {
992            Message::Close(cf) => {
993                assert_eq!(cf.code, CloseCode::Normal);
994                assert_eq!(cf.reason, "goodbye");
995            }
996            other => panic!("expected Close, got {other:?}"),
997        }
998    }
999
1000    #[test]
1001    fn close_no_body() {
1002        let mut r = client_reader();
1003        r.read(&make_frame(true, 0x8, b"")).unwrap();
1004        match r.next().unwrap().unwrap() {
1005            Message::Close(cf) => {
1006                assert_eq!(cf.code, CloseCode::NoStatus);
1007                assert_eq!(cf.reason, "");
1008            }
1009            other => panic!("expected Close, got {other:?}"),
1010        }
1011    }
1012
1013    #[test]
1014    fn close_code_only() {
1015        let mut r = client_reader();
1016        r.read(&make_frame(true, 0x8, &1001u16.to_be_bytes()))
1017            .unwrap();
1018        match r.next().unwrap().unwrap() {
1019            Message::Close(cf) => {
1020                assert_eq!(cf.code, CloseCode::GoingAway);
1021                assert_eq!(cf.reason, "");
1022            }
1023            other => panic!("expected Close, got {other:?}"),
1024        }
1025    }
1026
1027    #[test]
1028    fn close_invalid_code() {
1029        let mut r = client_reader();
1030        r.read(&make_frame(true, 0x8, &999u16.to_be_bytes()))
1031            .unwrap();
1032        assert!(matches!(
1033            r.next(),
1034            Err(ProtocolError::InvalidCloseCode(999))
1035        ));
1036    }
1037
1038    #[test]
1039    fn close_invalid_utf8_reason() {
1040        let mut r = client_reader();
1041        let mut payload = vec![];
1042        payload.extend_from_slice(&1000u16.to_be_bytes());
1043        payload.extend_from_slice(&[0xFF, 0xFE]); // invalid UTF-8
1044        r.read(&make_frame(true, 0x8, &payload)).unwrap();
1045        assert!(matches!(
1046            r.next(),
1047            Err(ProtocolError::InvalidUtf8InCloseReason)
1048        ));
1049    }
1050
1051    #[test]
1052    fn close_too_short() {
1053        let mut r = client_reader();
1054        r.read(&make_frame(true, 0x8, &[0x03])).unwrap(); // 1 byte
1055        assert!(matches!(r.next(), Err(ProtocolError::CloseFrameTooShort)));
1056    }
1057
1058    // === UTF-8 validation ===
1059
1060    #[test]
1061    fn invalid_utf8_text() {
1062        let mut r = client_reader();
1063        r.read(&make_frame(true, 0x1, &[0xFF, 0xFE])).unwrap();
1064        assert!(matches!(r.next(), Err(ProtocolError::InvalidUtf8)));
1065    }
1066
1067    #[test]
1068    fn multibyte_utf8_across_fragments() {
1069        let mut r = client_reader();
1070        // "é" is [0xC3, 0xA9] — split across two fragments
1071        r.read(&make_frame(false, 0x1, &[0xC3])).unwrap();
1072        r.read(&make_frame(true, 0x0, &[0xA9])).unwrap();
1073        // Both buffered — assembles in one next()
1074        match r.next().unwrap().unwrap() {
1075            Message::Text(s) => assert_eq!(s, "é"),
1076            other => panic!("expected Text, got {other:?}"),
1077        }
1078    }
1079
1080    // === Partial delivery ===
1081
1082    #[test]
1083    fn partial_header() {
1084        let mut r = client_reader();
1085        let frame = make_frame(true, 0x1, b"Hello");
1086        r.read(&frame[..1]).unwrap();
1087        assert!(r.next().unwrap().is_none());
1088        r.read(&frame[1..]).unwrap();
1089        assert!(matches!(r.next().unwrap().unwrap(), Message::Text("Hello")));
1090    }
1091
1092    #[test]
1093    fn payload_spans_reads() {
1094        let mut r = client_reader();
1095        let frame = make_frame(true, 0x1, b"Hello, World!");
1096        r.read(&frame[..7]).unwrap();
1097        assert!(r.next().unwrap().is_none());
1098        r.read(&frame[7..]).unwrap();
1099        assert!(matches!(
1100            r.next().unwrap().unwrap(),
1101            Message::Text("Hello, World!")
1102        ));
1103    }
1104
1105    // === Multiple messages ===
1106
1107    #[test]
1108    fn two_messages_one_read() {
1109        let mut r = client_reader();
1110        let mut data = make_frame(true, 0x1, b"one");
1111        data.extend_from_slice(&make_frame(true, 0x1, b"two"));
1112        r.read(&data).unwrap();
1113
1114        assert!(matches!(r.next().unwrap().unwrap(), Message::Text("one")));
1115        assert!(matches!(r.next().unwrap().unwrap(), Message::Text("two")));
1116    }
1117
1118    // === Protocol errors ===
1119
1120    #[test]
1121    fn invalid_opcode() {
1122        let mut r = client_reader();
1123        // Opcode 0x3 is undefined in RFC 6455
1124        r.read(&make_frame(true, 0x3, b"x")).unwrap();
1125        assert!(matches!(r.next(), Err(ProtocolError::InvalidOpcode(0x3))));
1126    }
1127
1128    #[test]
1129    fn invalid_opcode_0x0f() {
1130        let mut r = client_reader();
1131        // Opcode 0xF is also undefined
1132        r.read(&make_frame(true, 0xF, b"x")).unwrap();
1133        assert!(matches!(r.next(), Err(ProtocolError::InvalidOpcode(0xF))));
1134    }
1135
1136    #[test]
1137    fn payload_too_large() {
1138        let mut r = FrameReader::builder()
1139            .role(Role::Client)
1140            .max_frame_size(64)
1141            .buffer_capacity(256)
1142            .build();
1143        r.read(&make_frame(true, 0x1, &[b'x'; 100])).unwrap();
1144        assert!(matches!(
1145            r.next(),
1146            Err(ProtocolError::PayloadTooLarge { size: 100, max: 64 })
1147        ));
1148    }
1149
1150    #[test]
1151    fn masked_from_server() {
1152        let mut r = client_reader();
1153        r.read(&make_masked_frame(true, 0x1, b"x", [1, 2, 3, 4]))
1154            .unwrap();
1155        assert!(matches!(
1156            r.next(),
1157            Err(ProtocolError::MaskedFrameFromServer)
1158        ));
1159    }
1160
1161    #[test]
1162    fn unmasked_from_client() {
1163        let mut r = server_reader();
1164        r.read(&make_frame(true, 0x1, b"x")).unwrap();
1165        assert!(matches!(
1166            r.next(),
1167            Err(ProtocolError::UnmaskedFrameFromClient)
1168        ));
1169    }
1170
1171    #[test]
1172    fn reserved_bits() {
1173        let mut r = client_reader();
1174        let mut frame = make_frame(true, 0x1, b"x");
1175        frame[0] |= 0x40;
1176        r.read(&frame).unwrap();
1177        assert!(matches!(
1178            r.next(),
1179            Err(ProtocolError::ReservedBitsSet { .. })
1180        ));
1181    }
1182
1183    #[test]
1184    fn continuation_without_start() {
1185        let mut r = client_reader();
1186        r.read(&make_frame(true, 0x0, b"orphan")).unwrap();
1187        assert!(matches!(
1188            r.next(),
1189            Err(ProtocolError::ContinuationWithoutStart)
1190        ));
1191    }
1192
1193    #[test]
1194    fn new_message_during_assembly() {
1195        let mut r = client_reader();
1196        r.read(&make_frame(false, 0x1, b"start")).unwrap();
1197        r.read(&make_frame(true, 0x1, b"new")).unwrap();
1198        // pump() encounters the error during assembly
1199        assert!(matches!(
1200            r.next(),
1201            Err(ProtocolError::NewMessageDuringAssembly)
1202        ));
1203    }
1204
1205    #[test]
1206    fn message_too_large() {
1207        let mut r = FrameReader::builder()
1208            .role(Role::Client)
1209            .max_message_size(10)
1210            .build();
1211        r.read(&make_frame(true, 0x1, b"way too long!!")).unwrap();
1212        assert!(matches!(
1213            r.next(),
1214            Err(ProtocolError::MessageTooLarge { .. })
1215        ));
1216    }
1217
1218    #[test]
1219    fn control_frame_too_large() {
1220        let mut r = client_reader();
1221        r.read(&make_frame(true, 0x9, &[0; 126])).unwrap();
1222        assert!(matches!(
1223            r.next(),
1224            Err(ProtocolError::ControlFrameTooLarge { .. })
1225        ));
1226    }
1227
1228    #[test]
1229    fn fragmented_control() {
1230        let mut r = client_reader();
1231        r.read(&make_frame(false, 0x9, b"ping")).unwrap();
1232        assert!(matches!(
1233            r.next(),
1234            Err(ProtocolError::FragmentedControlFrame)
1235        ));
1236    }
1237
1238    // === into_owned ===
1239
1240    #[test]
1241    fn message_into_owned() {
1242        let mut r = client_reader();
1243        r.read(&make_frame(true, 0x1, b"owned")).unwrap();
1244        let msg = r.next().unwrap().unwrap();
1245        let owned = msg.into_owned();
1246        assert!(matches!(owned, super::super::message::OwnedMessage::Text(s) if s == "owned"));
1247    }
1248
1249    // === Buffer full ===
1250
1251    #[test]
1252    fn buffer_full() {
1253        let mut r = FrameReader::builder()
1254            .role(Role::Client)
1255            .buffer_capacity(16)
1256            .build();
1257        assert!(matches!(
1258            r.read(&[0; 32]),
1259            Err(ReadError::BufferFull { .. })
1260        ));
1261    }
1262
1263    // === Reset ===
1264
1265    #[test]
1266    fn reset_then_new_message() {
1267        let mut r = client_reader();
1268        r.read(&make_frame(false, 0x1, b"partial")).unwrap();
1269        let _ = r.next();
1270        r.reset();
1271        assert_eq!(r.buffered(), 0);
1272        // After reset, accepts new messages cleanly
1273        r.read(&make_frame(true, 0x1, b"fresh")).unwrap();
1274        assert!(matches!(r.next().unwrap().unwrap(), Message::Text("fresh")));
1275    }
1276
1277    // === spare/filled direct I/O ===
1278
1279    #[test]
1280    fn spare_filled_path() {
1281        let mut r = client_reader();
1282        let frame = make_frame(true, 0x1, b"direct");
1283        let spare = r.spare();
1284        spare[..frame.len()].copy_from_slice(&frame);
1285        r.filled(frame.len());
1286        assert!(matches!(
1287            r.next().unwrap().unwrap(),
1288            Message::Text("direct")
1289        ));
1290    }
1291
1292    // === Masked payload spanning reads (#8) ===
1293
1294    #[test]
1295    fn masked_payload_spans_reads() {
1296        let mut r = server_reader();
1297        let mask = [0x37, 0xFA, 0x21, 0x3D];
1298        let frame = make_masked_frame(true, 0x1, b"Hello, World!", mask);
1299        // Split mid-payload: 2 header + 4 mask + 4 payload bytes
1300        let split = 10;
1301        r.read(&frame[..split]).unwrap();
1302        assert!(r.next().unwrap().is_none());
1303        r.read(&frame[split..]).unwrap();
1304        assert!(matches!(
1305            r.next().unwrap().unwrap(),
1306            Message::Text("Hello, World!")
1307        ));
1308    }
1309
1310    // === Multiple control frames during assembly (#9) ===
1311
1312    #[test]
1313    fn multiple_controls_during_assembly() {
1314        let mut r = client_reader();
1315        r.read(&make_frame(false, 0x1, b"Hel")).unwrap();
1316        r.read(&make_frame(true, 0x9, b"ping1")).unwrap();
1317        r.read(&make_frame(true, 0xA, b"pong1")).unwrap();
1318        r.read(&make_frame(true, 0x0, b"lo")).unwrap();
1319
1320        match r.next().unwrap().unwrap() {
1321            Message::Ping(p) => assert_eq!(p, b"ping1"),
1322            other => panic!("expected Ping, got {other:?}"),
1323        }
1324        match r.next().unwrap().unwrap() {
1325            Message::Pong(p) => assert_eq!(p, b"pong1"),
1326            other => panic!("expected Pong, got {other:?}"),
1327        }
1328        match r.next().unwrap().unwrap() {
1329            Message::Text(s) => assert_eq!(s, "Hello"),
1330            other => panic!("expected Text, got {other:?}"),
1331        }
1332    }
1333
1334    // === msg_buf clear retains capacity (#10) ===
1335
1336    #[test]
1337    fn msg_buf_clear_retains_capacity() {
1338        let mut r = FrameReader::builder()
1339            .role(Role::Client)
1340            .message_capacity(64)
1341            .buffer_capacity(128 * 1024)
1342            .max_frame_size(128 * 1024)
1343            .max_message_size(128 * 1024)
1344            .build();
1345
1346        let big_payload = vec![0x42; 512];
1347        r.read(&make_frame(false, 0x2, &big_payload[..256]))
1348            .unwrap();
1349        r.read(&make_frame(true, 0x0, &big_payload[256..])).unwrap();
1350
1351        let msg = r.next().unwrap().unwrap();
1352        assert!(matches!(&msg, Message::Binary(b) if b.len() == 512));
1353        let _ = msg;
1354
1355        // Next call triggers cleanup — msg_buf cleared but capacity retained.
1356        // No reallocation: buffer stays warm for the next continuation set.
1357        assert!(r.next().unwrap().is_none());
1358        assert!(r.msg_buf.capacity() >= 512);
1359        assert!(r.msg_buf.is_empty());
1360    }
1361
1362    // === 64-bit payload length (#11) ===
1363
1364    #[test]
1365    fn extended_64bit_length() {
1366        let mut r = FrameReader::builder()
1367            .role(Role::Client)
1368            .buffer_capacity(128 * 1024)
1369            .max_frame_size(128 * 1024)
1370            .max_message_size(128 * 1024)
1371            .build();
1372
1373        let payload = vec![0x42; 70_000];
1374        let frame = make_frame(true, 0x2, &payload);
1375        r.read(&frame).unwrap();
1376        match r.next().unwrap().unwrap() {
1377            Message::Binary(b) => assert_eq!(b.len(), 70_000),
1378            other => panic!("expected Binary, got {other:?}"),
1379        }
1380    }
1381
1382    // === Buffer full with diagnostics (#5) ===
1383
1384    #[test]
1385    fn buffer_full_diagnostics() {
1386        let mut r = FrameReader::builder()
1387            .role(Role::Client)
1388            .buffer_capacity(16)
1389            .build();
1390        match r.read(&[0; 32]) {
1391            Err(ReadError::BufferFull { needed, available }) => {
1392                assert_eq!(needed, 32);
1393                assert_eq!(available, 16);
1394            }
1395            other => panic!("expected BufferFull, got {other:?}"),
1396        }
1397    }
1398
1399    // === Autobahn regression tests ===
1400
1401    /// Autobahn 7.9.4: Close code 1005 must be rejected on the wire.
1402    #[test]
1403    fn close_code_1005_rejected_on_wire() {
1404        let mut r = client_reader();
1405        r.read(&make_frame(true, 0x8, &1005u16.to_be_bytes()))
1406            .unwrap();
1407        assert!(matches!(
1408            r.next(),
1409            Err(ProtocolError::InvalidCloseCode(1005))
1410        ));
1411    }
1412
1413    /// Autobahn 6.4.1: Invalid UTF-8 split across fragments.
1414    #[test]
1415    fn invalid_utf8_across_fragments() {
1416        let mut r = client_reader();
1417        r.read(&make_frame(false, 0x1, b"valid")).unwrap();
1418        r.read(&make_frame(true, 0x0, &[0xFF])).unwrap();
1419        assert!(matches!(r.next(), Err(ProtocolError::InvalidUtf8)));
1420    }
1421
1422    /// Autobahn 6.4.2: Valid UTF-8 in first fragment, invalid continuation.
1423    #[test]
1424    fn invalid_utf8_in_continuation() {
1425        let mut r = client_reader();
1426        r.read(&make_frame(false, 0x1, &[0xCE, 0xBA])).unwrap(); // valid "κ"
1427        r.read(&make_frame(false, 0x0, &[0xE1, 0xBD])).unwrap(); // incomplete 3-byte
1428        r.read(&make_frame(true, 0x0, &[0xFF])).unwrap(); // invalid continuation byte
1429        assert!(matches!(r.next(), Err(ProtocolError::InvalidUtf8)));
1430    }
1431
1432    /// Autobahn 1.1.6: 65535-byte text (16-bit length boundary).
1433    #[test]
1434    fn text_65535_bytes() {
1435        let mut r = FrameReader::builder()
1436            .role(Role::Client)
1437            .buffer_capacity(128 * 1024)
1438            .max_message_size(128 * 1024)
1439            .build();
1440        let payload = vec![b'x'; 65535];
1441        r.read(&make_frame(true, 0x1, &payload)).unwrap();
1442        match r.next().unwrap().unwrap() {
1443            Message::Text(s) => assert_eq!(s.len(), 65535),
1444            other => panic!("expected Text, got {other:?}"),
1445        }
1446    }
1447
1448    /// Autobahn 1.1.7: 65536-byte text (crosses into 64-bit length encoding).
1449    #[test]
1450    fn text_65536_bytes() {
1451        let mut r = FrameReader::builder()
1452            .role(Role::Client)
1453            .buffer_capacity(128 * 1024)
1454            .max_message_size(128 * 1024)
1455            .build();
1456        let payload = vec![b'x'; 65536];
1457        r.read(&make_frame(true, 0x1, &payload)).unwrap();
1458        match r.next().unwrap().unwrap() {
1459            Message::Text(s) => assert_eq!(s.len(), 65536),
1460            other => panic!("expected Text, got {other:?}"),
1461        }
1462    }
1463
1464    // === Incremental UTF-8 validation ===
1465
1466    /// Invalid UTF-8 detected on first non-final Text fragment.
1467    #[test]
1468    fn invalid_utf8_detected_on_first_fragment() {
1469        let mut r = client_reader();
1470        r.read(&make_frame(false, 0x1, &[0xFF, 0xFE])).unwrap();
1471        assert!(matches!(r.next(), Err(ProtocolError::InvalidUtf8)));
1472    }
1473
1474    /// Invalid UTF-8 detected on continuation (before final).
1475    #[test]
1476    fn invalid_utf8_detected_mid_assembly() {
1477        let mut r = client_reader();
1478        r.read(&make_frame(false, 0x1, b"valid")).unwrap();
1479        r.read(&make_frame(false, 0x0, &[0xFF])).unwrap();
1480        // Should fail immediately, not wait for final fragment
1481        assert!(matches!(r.next(), Err(ProtocolError::InvalidUtf8)));
1482    }
1483
1484    /// Multi-byte codepoint split across two fragments is OK.
1485    #[test]
1486    fn split_codepoint_across_fragments() {
1487        let mut r = client_reader();
1488        // "é" = [0xC3, 0xA9]
1489        r.read(&make_frame(false, 0x1, &[0xC3])).unwrap();
1490        r.read(&make_frame(true, 0x0, &[0xA9])).unwrap();
1491        match r.next().unwrap().unwrap() {
1492            Message::Text(s) => assert_eq!(s, "é"),
1493            other => panic!("expected Text, got {other:?}"),
1494        }
1495    }
1496
1497    /// 4-byte codepoint split 1+3 across fragments.
1498    #[test]
1499    fn split_4byte_codepoint() {
1500        let mut r = client_reader();
1501        // U+1F600 (😀) = [0xF0, 0x9F, 0x98, 0x80]
1502        r.read(&make_frame(false, 0x1, &[0xF0])).unwrap();
1503        r.read(&make_frame(true, 0x0, &[0x9F, 0x98, 0x80])).unwrap();
1504        match r.next().unwrap().unwrap() {
1505            Message::Text(s) => assert_eq!(s, "😀"),
1506            other => panic!("expected Text, got {other:?}"),
1507        }
1508    }
1509
1510    /// Incomplete codepoint at end of final fragment is invalid.
1511    #[test]
1512    fn incomplete_codepoint_at_end() {
1513        let mut r = client_reader();
1514        // Start of 2-byte sequence [0xC3] but message ends
1515        r.read(&make_frame(true, 0x1, &[0xC3])).unwrap();
1516        assert!(matches!(r.next(), Err(ProtocolError::InvalidUtf8)));
1517    }
1518
1519    /// Binary fragments are NOT UTF-8 validated.
1520    #[test]
1521    fn binary_fragments_skip_utf8() {
1522        let mut r = client_reader();
1523        r.read(&make_frame(false, 0x2, &[0xFF, 0xFE])).unwrap();
1524        r.read(&make_frame(true, 0x0, &[0xFD])).unwrap();
1525        match r.next().unwrap().unwrap() {
1526            Message::Binary(b) => assert_eq!(b, &[0xFF, 0xFE, 0xFD]),
1527            other => panic!("expected Binary, got {other:?}"),
1528        }
1529    }
1530
1531    /// Three fragments with valid UTF-8 split at boundaries.
1532    #[test]
1533    fn three_fragments_valid_utf8() {
1534        let mut r = client_reader();
1535        // "Héllo" = [72, 0xC3, 0xA9, 108, 108, 111]
1536        // Split: "H" + [0xC3] | [0xA9] + "ll" | "o"
1537        r.read(&make_frame(false, 0x1, &[72, 0xC3])).unwrap();
1538        r.read(&make_frame(false, 0x0, &[0xA9, 108, 108])).unwrap();
1539        r.read(&make_frame(true, 0x0, &[111])).unwrap();
1540        match r.next().unwrap().unwrap() {
1541            Message::Text(s) => assert_eq!(s, "Héllo"),
1542            other => panic!("expected Text, got {other:?}"),
1543        }
1544    }
1545
1546    // === FIFO ordering tests ===
1547
1548    fn assert_text(result: Result<Option<Message<'_>>, ProtocolError>, expected: &str) {
1549        match result.unwrap().unwrap() {
1550            Message::Text(s) => assert_eq!(s, expected),
1551            other => panic!("expected Text({expected:?}), got {other:?}"),
1552        }
1553    }
1554
1555    fn assert_binary(result: Result<Option<Message<'_>>, ProtocolError>, expected: &[u8]) {
1556        match result.unwrap().unwrap() {
1557            Message::Binary(b) => assert_eq!(b, expected),
1558            other => panic!("expected Binary, got {other:?}"),
1559        }
1560    }
1561
1562    fn assert_ping(result: Result<Option<Message<'_>>, ProtocolError>, expected: &[u8]) {
1563        match result.unwrap().unwrap() {
1564            Message::Ping(b) => assert_eq!(b, expected),
1565            other => panic!("expected Ping, got {other:?}"),
1566        }
1567    }
1568
1569    fn assert_pong(result: Result<Option<Message<'_>>, ProtocolError>, expected: &[u8]) {
1570        match result.unwrap().unwrap() {
1571            Message::Pong(b) => assert_eq!(b, expected),
1572            other => panic!("expected Pong, got {other:?}"),
1573        }
1574    }
1575
1576    #[test]
1577    fn fifo_three_texts_one_read() {
1578        let mut r = client_reader();
1579        let mut data = make_frame(true, 0x1, b"first");
1580        data.extend(&make_frame(true, 0x1, b"second"));
1581        data.extend(&make_frame(true, 0x1, b"third"));
1582        r.read(&data).unwrap();
1583        assert_text(r.next(), "first");
1584        assert_text(r.next(), "second");
1585        assert_text(r.next(), "third");
1586    }
1587
1588    #[test]
1589    fn fifo_mixed_text_binary() {
1590        let mut r = client_reader();
1591        let mut data = make_frame(true, 0x1, b"text1");
1592        data.extend(&make_frame(true, 0x2, &[0x01]));
1593        data.extend(&make_frame(true, 0x1, b"text2"));
1594        data.extend(&make_frame(true, 0x2, &[0x02]));
1595        r.read(&data).unwrap();
1596        assert_text(r.next(), "text1");
1597        assert_binary(r.next(), &[0x01]);
1598        assert_text(r.next(), "text2");
1599        assert_binary(r.next(), &[0x02]);
1600    }
1601
1602    #[test]
1603    fn fifo_single_assembled_single() {
1604        let mut r = client_reader();
1605        let mut data = make_frame(true, 0x1, b"before");
1606        data.extend(&make_frame(false, 0x1, b"frag"));
1607        data.extend(&make_frame(true, 0x0, b"mented"));
1608        data.extend(&make_frame(true, 0x1, b"after"));
1609        r.read(&data).unwrap();
1610        assert_text(r.next(), "before");
1611        assert_text(r.next(), "fragmented");
1612        assert_text(r.next(), "after");
1613    }
1614
1615    #[test]
1616    fn fifo_assembled_then_single() {
1617        let mut r = client_reader();
1618        let mut data = make_frame(false, 0x2, &[0xAA]);
1619        data.extend(&make_frame(true, 0x0, &[0xBB]));
1620        data.extend(&make_frame(true, 0x1, b"after"));
1621        r.read(&data).unwrap();
1622        assert_binary(r.next(), &[0xAA, 0xBB]);
1623        assert_text(r.next(), "after");
1624    }
1625
1626    #[test]
1627    fn fifo_data_ping_data() {
1628        let mut r = client_reader();
1629        let mut data = make_frame(true, 0x1, b"msg1");
1630        data.extend(&make_frame(true, 0x9, b"ping"));
1631        data.extend(&make_frame(true, 0x1, b"msg2"));
1632        r.read(&data).unwrap();
1633        assert_text(r.next(), "msg1");
1634        assert_ping(r.next(), b"ping");
1635        assert_text(r.next(), "msg2");
1636    }
1637
1638    #[test]
1639    fn fifo_assembly_with_control_then_data() {
1640        let mut r = client_reader();
1641        let mut data = make_frame(false, 0x1, b"hel");
1642        data.extend(&make_frame(true, 0x9, b"ping"));
1643        data.extend(&make_frame(true, 0x0, b"lo"));
1644        data.extend(&make_frame(true, 0x1, b"next"));
1645        r.read(&data).unwrap();
1646        assert_ping(r.next(), b"ping");
1647        assert_text(r.next(), "hello");
1648        assert_text(r.next(), "next");
1649    }
1650
1651    #[test]
1652    fn fifo_assembly_with_multiple_controls() {
1653        let mut r = client_reader();
1654        let mut data = make_frame(false, 0x2, &[0x01]);
1655        data.extend(&make_frame(true, 0x9, b"p1"));
1656        data.extend(&make_frame(true, 0xA, b"p2"));
1657        data.extend(&make_frame(true, 0x0, &[0x02]));
1658        data.extend(&make_frame(true, 0x1, b"after"));
1659        r.read(&data).unwrap();
1660        assert_ping(r.next(), b"p1");
1661        assert_pong(r.next(), b"p2");
1662        assert_binary(r.next(), &[0x01, 0x02]);
1663        assert_text(r.next(), "after");
1664    }
1665
1666    #[test]
1667    fn fifo_across_reads() {
1668        let mut r = client_reader();
1669        let frame1 = make_frame(true, 0x1, b"first");
1670        let frame2 = make_frame(true, 0x1, b"second");
1671        r.read(&frame1).unwrap();
1672        assert_text(r.next(), "first");
1673        r.read(&frame2).unwrap();
1674        assert_text(r.next(), "second");
1675    }
1676
1677    #[test]
1678    fn fifo_partial_then_complete() {
1679        let mut r = client_reader();
1680        let frame1 = make_frame(true, 0x1, b"first");
1681        let frame2 = make_frame(true, 0x1, b"second");
1682        let mut all = frame1;
1683        all.extend(&frame2);
1684        r.read(&all[..3]).unwrap();
1685        assert!(r.next().unwrap().is_none());
1686        r.read(&all[3..]).unwrap();
1687        assert_text(r.next(), "first");
1688        assert_text(r.next(), "second");
1689    }
1690
1691    #[test]
1692    fn fifo_100_messages_one_read() {
1693        let mut r = FrameReader::builder()
1694            .role(Role::Client)
1695            .buffer_capacity(256 * 1024)
1696            .build();
1697
1698        let mut data = Vec::new();
1699        for i in 0u32..100 {
1700            let payload = i.to_be_bytes();
1701            data.extend(&make_frame(true, 0x2, &payload));
1702        }
1703        r.read(&data).unwrap();
1704
1705        for i in 0u32..100 {
1706            match r.next().unwrap().unwrap() {
1707                Message::Binary(b) => {
1708                    let val = u32::from_be_bytes(b.try_into().unwrap());
1709                    assert_eq!(val, i, "message {i} out of order");
1710                }
1711                other => panic!("expected Binary, got {other:?}"),
1712            }
1713        }
1714        assert!(r.next().unwrap().is_none());
1715    }
1716
1717    // =========================================================================
1718    // should_compact() edge cases
1719    // =========================================================================
1720
1721    #[test]
1722    fn should_compact_default_half() {
1723        let mut r = FrameReader::builder()
1724            .buffer_capacity(1024)
1725            .role(Role::Client)
1726            .build();
1727        // Nothing consumed yet — should not compact.
1728        assert!(!r.should_compact());
1729
1730        // Feed two frames. Consume the first, then call poll() to trigger
1731        // deferred cleanup (ReadBuf advance). The second frame keeps data
1732        // in the buffer so head doesn't auto-reset.
1733        let mut data = make_frame(true, 0x2, &[0xAA; 600]);
1734        data.extend_from_slice(&make_frame(true, 0x2, &[0xBB; 10]));
1735        r.read(&data).unwrap();
1736        assert!(r.next().unwrap().is_some());
1737        // Trigger deferred cleanup — advances head past first frame.
1738        let _ = r.poll().unwrap();
1739        // consumed ~604 > 512 (50% of 1024) → should compact.
1740        assert!(r.should_compact());
1741    }
1742
1743    #[test]
1744    fn should_compact_at_one_never_triggers() {
1745        let mut r = FrameReader::builder()
1746            .buffer_capacity(256)
1747            .compact_at(1.0)
1748            .role(Role::Client)
1749            .build();
1750        // Consume nearly all the buffer.
1751        let frame = make_frame(true, 0x2, &[0xBB; 200]);
1752        r.read(&frame).unwrap();
1753        let _ = r.next().unwrap();
1754        // compact_at(1.0) → buf_compact_at = usize::MAX, never triggers.
1755        assert!(!r.should_compact());
1756    }
1757
1758    #[test]
1759    fn should_compact_at_zero() {
1760        let mut r = FrameReader::builder()
1761            .buffer_capacity(256)
1762            .compact_at(0.0)
1763            .role(Role::Client)
1764            .build();
1765        // Nothing consumed — should NOT compact even with threshold 0.
1766        assert!(!r.should_compact());
1767
1768        // Feed two frames, consume the first, trigger deferred cleanup.
1769        let mut data = make_frame(true, 0x2, &[0xCC; 10]);
1770        data.extend_from_slice(&make_frame(true, 0x2, &[0xDD; 5]));
1771        r.read(&data).unwrap();
1772        assert!(r.next().unwrap().is_some());
1773        let _ = r.poll().unwrap(); // deferred advance
1774        // Now consumed > 0 and threshold is 0 — should compact.
1775        assert!(r.should_compact());
1776    }
1777
1778    #[test]
1779    fn should_compact_small_buffer_small_fraction() {
1780        // buffer_capacity=64, compact_at=0.1 → ceil(6.4) = 7
1781        let mut r = FrameReader::builder()
1782            .buffer_capacity(64)
1783            .compact_at(0.1)
1784            .role(Role::Client)
1785            .build();
1786        assert!(!r.should_compact());
1787
1788        // Feed two small frames, consume the first, trigger deferred cleanup.
1789        let mut data = make_frame(true, 0x2, &[0xDD; 10]);
1790        data.extend_from_slice(&make_frame(true, 0x2, &[0xEE; 5]));
1791        r.read(&data).unwrap();
1792        assert!(r.next().unwrap().is_some());
1793        let _ = r.poll().unwrap(); // deferred advance
1794        // consumed (12) >= 7 (ceil threshold) → should compact.
1795        assert!(r.should_compact());
1796    }
1797}