Skip to main content

nexus_net/ws/
frame_reader.rs

1use super::error::ProtocolError;
2use super::frame::{RawOpcode, Role};
3use super::mask::apply_mask;
4use super::message::{CloseCode, CloseFrame, Message};
5use crate::buf::ReadBuf;
6
/// Failure reported by [`FrameReader::read`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ReadError {
    /// The internal read buffer has too little spare room for the input.
    BufferFull {
        /// Number of bytes the caller attempted to buffer.
        needed: usize,
        /// Size of the spare region when the request was rejected.
        available: usize,
    },
}

impl std::fmt::Display for ReadError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Single-variant enum: an irrefutable destructure replaces the match.
        let Self::BufferFull { needed, available } = self;
        write!(f, "buffer full: need {needed} bytes, {available} available")
    }
}

impl std::error::Error for ReadError {}
30
31/// WebSocket frame reader — parses wire bytes into [`Message`]s.
32///
33/// Handles wire frame parsing, fragment assembly, control frame
34/// interleaving, masking, and UTF-8 validation. The user sees complete
35/// `Message` values — never raw frames or continuations.
36///
37/// # Usage
38///
39/// ```
40/// use nexus_net::ws::{FrameReader, Role, Message};
41///
42/// let mut reader = FrameReader::builder()
43///     .role(Role::Client)
44///     .buffer_capacity(65_536)
45///     .build();
46///
47/// // Feed wire bytes
48/// reader.read(&[0x81, 0x05, 0x48, 0x65, 0x6C, 0x6C, 0x6F]).unwrap();
49///
50/// // Parse messages
51/// match reader.next().unwrap().unwrap() {
52///     Message::Text(s) => assert_eq!(s, "Hello"),
53///     _ => panic!("expected text"),
54/// }
55/// ```
56pub struct FrameReader {
57    buf: ReadBuf,
58    msg_buf: Vec<u8>,
59    /// ReadBuf compaction trigger: compact when consumed bytes exceed this.
60    buf_compact_at: usize,
61
62    state: ParseState,
63    remaining_payload: usize,
64    mask_key: Option<[u8; 4]>,
65    mask_offset: u8,
66
67    assembling: bool,
68    assembly_opcode: Option<RawOpcode>,
69    utf8_valid_up_to: usize,
70
71    role: Role,
72    max_frame_size: u64,
73    max_message_size: usize,
74
75    /// Tracks what to clean up when next()/poll() is called again.
76    pending_cleanup: PendingCleanup,
77    /// Opcode of the pending message (set by poll, consumed by next).
78    pending_opcode: Option<RawOpcode>,
79    /// For control frames during assembly that span reads: the offset
80    /// in msg_buf where the control payload starts (after assembly data).
81    ctrl_payload_offset: usize,
82}
83
/// Deferred release of the storage backing the last returned `Message`.
///
/// The message borrows its payload from the reader, so the storage can only
/// be reclaimed once the caller asks for the next message.
#[derive(Default, Clone, Copy)]
enum PendingCleanup {
    /// Nothing is outstanding.
    #[default]
    None,
    /// The payload was borrowed from the ReadBuf: skip this many bytes.
    AdvanceReadBuf(usize),
    /// The payload was the whole of msg_buf: empty it.
    ClearMsgBuf,
    /// The payload was a control frame appended behind assembly data:
    /// shorten msg_buf back to this length.
    TruncateMsgBuf(usize),
}
96
97#[derive(Clone, Copy, Default)]
98enum ParseState {
99    #[default]
100    Head,
101    /// Payload spans reads — always goes to msg_buf.
102    Payload { opcode: RawOpcode, fin: bool },
103}
104
105/// Builder for [`FrameReader`].
106pub struct FrameReaderBuilder {
107    buffer_capacity: usize,
108    pre_padding: usize,
109    post_padding: usize,
110    prealloc_capacity: usize,
111    compact_at: f64,
112    max_frame_size: u64,
113    max_message_size: usize,
114    role: Role,
115}
116
117impl FrameReader {
118    /// Create a builder.
119    #[must_use]
120    pub fn builder() -> FrameReaderBuilder {
121        FrameReaderBuilder {
122            buffer_capacity: 1024 * 1024,
123            pre_padding: 16,
124            post_padding: 4,
125            prealloc_capacity: 4096,
126            compact_at: 0.5,
127            max_frame_size: 16 * 1024 * 1024,
128            max_message_size: 16 * 1024 * 1024,
129            role: Role::Server,
130        }
131    }
132
133    /// Buffer wire bytes from a source.
134    pub fn read(&mut self, src: &[u8]) -> Result<(), ReadError> {
135        let mut spare = self.buf.spare();
136        if src.len() > spare.len() {
137            // Try compacting before giving up
138            self.buf.compact();
139            spare = self.buf.spare();
140            if src.len() > spare.len() {
141                return Err(ReadError::BufferFull {
142                    needed: src.len(),
143                    available: spare.len(),
144                });
145            }
146        }
147        spare[..src.len()].copy_from_slice(src);
148        self.buf.filled(src.len());
149        Ok(())
150    }
151
152    /// Read bytes from a source directly into the internal buffer.
153    ///
154    /// Convenience for `spare()` + `filled()`. Returns bytes read,
155    /// or 0 on EOF. Returns `Err` if the buffer is full after compaction
156    /// (indicates the buffer is undersized for the current message).
157    ///
158    /// ```ignore
159    /// let n = reader.read_from(&mut socket)?;
160    /// ```
161    pub fn read_from<R: std::io::Read>(&mut self, src: &mut R) -> std::io::Result<usize> {
162        let mut spare = self.buf.spare();
163        if spare.is_empty() {
164            // Reclaim consumed space (partial frame at end of buffer)
165            self.buf.compact();
166            spare = self.buf.spare();
167            if spare.is_empty() {
168                return Err(std::io::Error::other("frame reader buffer full"));
169            }
170        }
171        let n = src.read(spare)?;
172        self.buf.filled(n);
173        Ok(n)
174    }
175
176    /// Writable region for direct socket reads.
177    #[inline]
178    pub fn spare(&mut self) -> &mut [u8] {
179        self.buf.spare()
180    }
181
182    /// Commit bytes written into [`spare()`](Self::spare).
183    #[inline]
184    pub fn filled(&mut self, n: usize) {
185        self.buf.filled(n);
186    }
187
188    /// Reclaim consumed buffer space by moving unconsumed data to the front.
189    ///
190    /// Call when [`spare()`](Self::spare) is empty but there's still data to read.
191    /// This is O(n) in the amount of unconsumed data.
192    #[inline]
193    pub fn compact(&mut self) {
194        self.buf.compact();
195    }
196
197    /// Whether the ReadBuf should be compacted based on the configured threshold.
198    ///
199    /// Returns `true` when at least one byte has been consumed, consumed bytes
200    /// meet or exceed the threshold set by [`FrameReaderBuilder::compact_at`],
201    /// and there is unconsumed data to preserve.
202    /// Default threshold is 50% of buffer capacity.
203    #[inline]
204    pub fn should_compact(&self) -> bool {
205        let consumed = self.buf.consumed();
206        consumed > 0 && consumed >= self.buf_compact_at && !self.buf.is_empty()
207    }
208
209    /// Parse the next complete message.
210    #[inline]
211    #[allow(clippy::should_implement_trait)]
212    pub fn next(&mut self) -> Result<Option<Message<'_>>, ProtocolError> {
213        // If poll() already prepared a message, return it
214        if let Some(opcode) = self.pending_opcode.take() {
215            return self.make_message(opcode);
216        }
217
218        // Clean up from previously returned message
219        self.do_cleanup();
220
221        self.pump()?
222            .map_or(Ok(None), |opcode| self.make_message(opcode))
223    }
224
225    /// Advance the parser without constructing a Message.
226    /// Returns `true` if the next call to `next()` will return a message.
227    #[inline]
228    pub fn poll(&mut self) -> Result<bool, ProtocolError> {
229        if self.pending_opcode.is_some() {
230            return Ok(true);
231        }
232
233        self.do_cleanup();
234
235        match self.pump()? {
236            None => Ok(false),
237            Some(opcode) => {
238                self.pending_opcode = Some(opcode);
239                Ok(true)
240            }
241        }
242    }
243
244    /// Bytes of buffer space remaining.
245    #[inline]
246    pub fn remaining(&self) -> usize {
247        self.buf.remaining()
248    }
249
250    /// Bytes of unconsumed data in the buffer.
251    #[inline]
252    pub fn buffered(&self) -> usize {
253        self.buf.len()
254    }
255
256    /// Reset all state.
257    pub fn reset(&mut self) {
258        self.buf.clear();
259        self.msg_buf.clear();
260        self.state = ParseState::Head;
261        self.remaining_payload = 0;
262        self.mask_key = None;
263        self.mask_offset = 0;
264        self.assembling = false;
265        self.assembly_opcode = None;
266        self.utf8_valid_up_to = 0;
267        self.pending_cleanup = PendingCleanup::None;
268        self.pending_opcode = None;
269        self.ctrl_payload_offset = 0;
270    }
271
272    // =========================================================================
273    // Internals
274    // =========================================================================
275
276    /// Execute pending cleanup from the previously returned Message.
277    #[inline]
278    fn do_cleanup(&mut self) {
279        match self.pending_cleanup {
280            PendingCleanup::None => return,
281            PendingCleanup::AdvanceReadBuf(n) => {
282                self.buf.advance(n);
283            }
284            PendingCleanup::ClearMsgBuf => {
285                self.do_cleanup_msg_buf();
286            }
287            PendingCleanup::TruncateMsgBuf(len) => {
288                self.msg_buf.truncate(len);
289            }
290        }
291        self.pending_cleanup = PendingCleanup::None;
292    }
293
294    /// Cold path: clear msg_buf (multi-frame assembly buffer).
295    /// Capacity is retained — no allocation, no shrinking.
296    #[cold]
297    fn do_cleanup_msg_buf(&mut self) {
298        self.msg_buf.clear();
299    }
300
301    /// State machine: consume frames from ReadBuf.
302    /// Returns opcode of a completed message, or None if more bytes needed.
303    ///
304    /// For single-frame messages: payload stays in ReadBuf (zero-copy).
305    /// For assembled messages: payload accumulated in msg_buf.
306    #[inline]
307    fn pump(&mut self) -> Result<Option<RawOpcode>, ProtocolError> {
308        loop {
309            let state = self.state;
310            match state {
311                ParseState::Payload { opcode, fin } => {
312                    // Payload spans reads — goes to msg_buf
313                    let available = self.buf.len();
314                    if available == 0 {
315                        return Ok(None);
316                    }
317
318                    let take = available.min(self.remaining_payload);
319                    self.consume_partial_payload(take);
320
321                    if self.remaining_payload == 0 {
322                        self.state = ParseState::Head;
323                        if let Some(completed) = self.route_opcode(opcode, fin)? {
324                            if opcode.is_control() && self.assembling {
325                                // Control during assembly: payload appended after assembly data
326                                self.pending_cleanup =
327                                    PendingCleanup::TruncateMsgBuf(self.ctrl_payload_offset);
328                            } else {
329                                self.pending_cleanup = PendingCleanup::ClearMsgBuf;
330                            }
331                            return Ok(Some(completed));
332                        }
333                        continue;
334                    }
335                    return Ok(None);
336                }
337
338                ParseState::Head => {
339                    let data_len = self.buf.len();
340                    if data_len < 2 {
341                        return Ok(None);
342                    }
343
344                    let byte1 = self.buf.data()[1];
345                    let header_size = Self::header_size(byte1);
346                    if data_len < header_size {
347                        return Ok(None);
348                    }
349
350                    let parsed = {
351                        let data = self.buf.data();
352                        self.parse_header(&data[..header_size])?
353                    };
354
355                    let is_control = parsed.opcode.is_control();
356
357                    if !is_control {
358                        let total = self.msg_buf.len() + parsed.payload_len;
359                        if total > self.max_message_size {
360                            return Err(ProtocolError::MessageTooLarge {
361                                accumulated: total,
362                                max: self.max_message_size,
363                            });
364                        }
365                    }
366
367                    // Advance past header
368                    self.buf.advance(header_size);
369
370                    let available = self.buf.len();
371
372                    if available >= parsed.payload_len {
373                        // Full payload in ReadBuf
374                        let payload_len = parsed.payload_len;
375
376                        // Unmask in-place if needed
377                        if let Some(mask) = parsed.mask_key {
378                            if payload_len > 0 {
379                                let data = &mut self.buf.data_mut()[..payload_len];
380                                apply_mask(data, mask);
381                            }
382                        }
383
384                        let is_single = parsed.fin && !self.assembling;
385
386                        if is_single || is_control {
387                            // ZERO-COPY: leave payload in ReadBuf, borrow directly.
388                            // Don't advance ReadBuf — cleanup will do it.
389                            if let Some(completed) = self.route_opcode(parsed.opcode, parsed.fin)? {
390                                self.pending_cleanup = PendingCleanup::AdvanceReadBuf(payload_len);
391                                return Ok(Some(completed));
392                            }
393                            // route_opcode returned None for a control frame? Shouldn't happen.
394                            // Advance and continue.
395                            self.buf.advance(payload_len);
396                            continue;
397                        }
398
399                        // Assembly path: copy to msg_buf, advance ReadBuf
400                        let data = &self.buf.data()[..payload_len];
401                        self.msg_buf.extend_from_slice(data);
402                        self.buf.advance(payload_len);
403
404                        if let Some(completed) = self.route_opcode(parsed.opcode, parsed.fin)? {
405                            self.pending_cleanup = PendingCleanup::ClearMsgBuf;
406                            return Ok(Some(completed));
407                        }
408                        continue;
409                    }
410
411                    // Partial payload — goes to msg_buf
412                    self.remaining_payload = parsed.payload_len;
413                    self.mask_key = parsed.mask_key;
414                    self.mask_offset = 0;
415
416                    // Track where control payload starts during assembly
417                    if parsed.opcode.is_control() && self.assembling {
418                        self.ctrl_payload_offset = self.msg_buf.len();
419                    }
420
421                    if available > 0 {
422                        self.consume_partial_payload(available);
423                    }
424
425                    self.state = ParseState::Payload {
426                        opcode: parsed.opcode,
427                        fin: parsed.fin,
428                    };
429                    return Ok(None);
430                }
431            }
432        }
433    }
434
435    /// Route a completed frame. Returns the opcode to surface as a
436    /// Message, or None if the frame was consumed internally (assembly).
437    #[inline(always)]
438    fn route_opcode(
439        &mut self,
440        opcode: RawOpcode,
441        fin: bool,
442    ) -> Result<Option<RawOpcode>, ProtocolError> {
443        if opcode.is_control() {
444            return Ok(Some(opcode));
445        }
446
447        match opcode {
448            RawOpcode::Text | RawOpcode::Binary => {
449                if self.assembling {
450                    return Err(ProtocolError::NewMessageDuringAssembly);
451                }
452                if fin {
453                    return Ok(Some(opcode));
454                }
455                // Start assembly — payload already in msg_buf
456                self.assembling = true;
457                self.assembly_opcode = Some(opcode);
458                self.utf8_valid_up_to = 0;
459                if opcode == RawOpcode::Text {
460                    let pending = validate_utf8_incremental(&self.msg_buf, false)?;
461                    self.utf8_valid_up_to = self.msg_buf.len() - pending as usize;
462                }
463                Ok(None)
464            }
465            RawOpcode::Continuation => {
466                if !self.assembling {
467                    return Err(ProtocolError::ContinuationWithoutStart);
468                }
469                if self.assembly_opcode == Some(RawOpcode::Text) {
470                    let to_check = &self.msg_buf[self.utf8_valid_up_to..];
471                    let pending = validate_utf8_incremental(to_check, fin)?;
472                    self.utf8_valid_up_to = self.msg_buf.len() - pending as usize;
473                }
474                if fin {
475                    self.assembling = false;
476                    let opcode = self
477                        .assembly_opcode
478                        .take()
479                        .expect("assembly_opcode must be Some when assembling is true");
480                    self.utf8_valid_up_to = 0;
481                    return Ok(Some(opcode));
482                }
483                Ok(None)
484            }
485            _ => unreachable!(),
486        }
487    }
488
489    /// Construct a Message. For zero-copy: borrows from ReadBuf.
490    /// For assembled: borrows from msg_buf.
491    #[inline(always)]
492    fn make_message(&self, opcode: RawOpcode) -> Result<Option<Message<'_>>, ProtocolError> {
493        let payload = match self.pending_cleanup {
494            PendingCleanup::AdvanceReadBuf(n) => &self.buf.data()[..n],
495            PendingCleanup::TruncateMsgBuf(offset) => &self.msg_buf[offset..],
496            PendingCleanup::ClearMsgBuf | PendingCleanup::None => &self.msg_buf[..],
497        };
498
499        match opcode {
500            RawOpcode::Ping => Ok(Some(Message::Ping(payload))),
501            RawOpcode::Pong => Ok(Some(Message::Pong(payload))),
502            RawOpcode::Close => Self::parse_close_from(payload),
503            RawOpcode::Text => {
504                let s = match self.pending_cleanup {
505                    PendingCleanup::ClearMsgBuf => {
506                        // SAFETY: Every byte in msg_buf was validated via
507                        // validate_utf8_incremental() in route_opcode():
508                        //   1. Initial text frame: validated on entry (line 435)
509                        //   2. Each continuation: validated on append (line 447)
510                        //   3. Final frame (fin=true): validated with is_final=true
511                        //      which rejects incomplete codepoints at the boundary
512                        // No bytes enter msg_buf without passing through this
513                        // validation chain. Re-validating here would waste cycles
514                        // on the hot path (~5-20 cycles for 128B via simdutf8).
515                        unsafe { std::str::from_utf8_unchecked(payload) }
516                    }
517                    _ => {
518                        // Single-frame zero-copy: first and only validation.
519                        simdutf8::basic::from_utf8(payload)
520                            .map_err(|_| ProtocolError::InvalidUtf8)?
521                    }
522                };
523                Ok(Some(Message::Text(s)))
524            }
525            RawOpcode::Binary => Ok(Some(Message::Binary(payload))),
526            RawOpcode::Continuation => unreachable!("pump never returns Continuation"),
527        }
528    }
529
530    #[inline]
531    fn header_size(byte1: u8) -> usize {
532        let masked = byte1 & 0x80 != 0;
533        let len_code = byte1 & 0x7F;
534        let base = match len_code {
535            0..=125 => 2,
536            126 => 4,
537            _ => 10,
538        };
539        if masked { base + 4 } else { base }
540    }
541
542    #[inline]
543    fn parse_header(&self, header: &[u8]) -> Result<ParsedHeader, ProtocolError> {
544        let byte0 = header[0];
545        let byte1 = header[1];
546        let fin = byte0 & 0x80 != 0;
547        let rsv = (byte0 >> 4) & 0x07;
548        let opcode_raw = byte0 & 0x0F;
549        let masked = byte1 & 0x80 != 0;
550        let len_code = byte1 & 0x7F;
551
552        if rsv != 0 {
553            return Err(ProtocolError::ReservedBitsSet { bits: rsv });
554        }
555
556        let opcode =
557            RawOpcode::from_u8(opcode_raw).ok_or(ProtocolError::InvalidOpcode(opcode_raw))?;
558
559        match self.role {
560            Role::Server if !masked => return Err(ProtocolError::UnmaskedFrameFromClient),
561            Role::Client if masked => return Err(ProtocolError::MaskedFrameFromServer),
562            _ => {}
563        }
564
565        let (payload_len, mask_offset) = match len_code {
566            0..=125 => (u64::from(len_code), 2),
567            126 => {
568                let len = u16::from_be_bytes([header[2], header[3]]);
569                (u64::from(len), 4)
570            }
571            _ => {
572                let len = u64::from_be_bytes(
573                    header[2..10]
574                        .try_into()
575                        .expect("64-bit length field is 8 bytes"),
576                );
577                (len, 10)
578            }
579        };
580
581        if opcode.is_control() {
582            if payload_len > 125 {
583                return Err(ProtocolError::ControlFrameTooLarge { size: payload_len });
584            }
585            if !fin {
586                return Err(ProtocolError::FragmentedControlFrame);
587            }
588        }
589
590        if payload_len > self.max_frame_size {
591            return Err(ProtocolError::PayloadTooLarge {
592                size: payload_len,
593                max: self.max_frame_size,
594            });
595        }
596
597        let mask_key = if masked {
598            Some([
599                header[mask_offset],
600                header[mask_offset + 1],
601                header[mask_offset + 2],
602                header[mask_offset + 3],
603            ])
604        } else {
605            None
606        };
607
608        let payload_len =
609            usize::try_from(payload_len).map_err(|_| ProtocolError::PayloadTooLarge {
610                size: payload_len,
611                max: self.max_frame_size,
612            })?;
613
614        Ok(ParsedHeader {
615            fin,
616            opcode,
617            mask_key,
618            payload_len,
619        })
620    }
621
622    /// Consume partial payload from ReadBuf → msg_buf (for frames spanning reads).
623    #[cold]
624    fn consume_partial_payload(&mut self, n: usize) {
625        if n == 0 {
626            return;
627        }
628        if let Some(key) = self.mask_key {
629            let data = &mut self.buf.data_mut()[..n];
630            let offset = self.mask_offset as usize;
631            let rotated = [
632                key[offset % 4],
633                key[(offset + 1) % 4],
634                key[(offset + 2) % 4],
635                key[(offset + 3) % 4],
636            ];
637            apply_mask(data, rotated);
638            self.mask_offset = ((offset + n) % 4) as u8;
639        }
640        let data = &self.buf.data()[..n];
641        self.msg_buf.extend_from_slice(data);
642        self.buf.advance(n);
643        self.remaining_payload -= n;
644    }
645
646    #[cold]
647    fn parse_close_from(buf: &[u8]) -> Result<Option<Message<'_>>, ProtocolError> {
648        if buf.is_empty() {
649            return Ok(Some(Message::Close(CloseFrame {
650                code: CloseCode::NoStatus,
651                reason: "",
652            })));
653        }
654        if buf.len() == 1 {
655            return Err(ProtocolError::CloseFrameTooShort);
656        }
657        let raw_code = u16::from_be_bytes([buf[0], buf[1]]);
658        let code = CloseCode::from_u16(raw_code)?;
659        let reason_bytes = &buf[2..];
660        let reason = simdutf8::compat::from_utf8(reason_bytes)
661            .map_err(|_| ProtocolError::InvalidUtf8InCloseReason)?;
662        Ok(Some(Message::Close(CloseFrame { code, reason })))
663    }
664}
665
666struct ParsedHeader {
667    fin: bool,
668    opcode: RawOpcode,
669    mask_key: Option<[u8; 4]>,
670    payload_len: usize,
671}
672
673impl std::fmt::Debug for FrameReader {
674    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
675        f.debug_struct("FrameReader")
676            .field("buffered", &self.buf.len())
677            .field("remaining", &self.buf.remaining())
678            .field("assembling", &self.assembling)
679            .field("role", &self.role)
680            .finish()
681    }
682}
683
684/// Validate UTF-8 incrementally. Returns the number of trailing bytes
685/// that might be an incomplete codepoint (0-3).
686///
687/// On `is_final=true`, no trailing bytes are allowed — the entire
688/// buffer must be valid UTF-8.
689fn validate_utf8_incremental(data: &[u8], is_final: bool) -> Result<u8, ProtocolError> {
690    if data.is_empty() {
691        return Ok(0);
692    }
693
694    if is_final {
695        simdutf8::compat::from_utf8(data).map_err(|_| ProtocolError::InvalidUtf8)?;
696        return Ok(0);
697    }
698
699    match simdutf8::compat::from_utf8(data) {
700        Ok(_) => Ok(0),
701        Err(e) => {
702            let valid_up_to = e.valid_up_to();
703            if e.error_len().is_some() {
704                // Definitively invalid byte sequence
705                return Err(ProtocolError::InvalidUtf8);
706            }
707            // error_len() is None → incomplete sequence at the end
708            let pending = data.len() - valid_up_to;
709            if pending > 3 {
710                return Err(ProtocolError::InvalidUtf8);
711            }
712            Ok(pending as u8)
713        }
714    }
715}
716
717/// Lets a [`WireStream`](crate::WireStream) feed bytes directly into
718/// the FrameReader's spare region — one fewer copy than going through
719/// a slice intermediary.
720impl crate::ParserSink for FrameReader {
721    #[inline]
722    fn spare(&mut self) -> &mut [u8] {
723        FrameReader::spare(self)
724    }
725
726    #[inline]
727    fn filled(&mut self, n: usize) {
728        FrameReader::filled(self, n);
729    }
730}
731
732impl FrameReaderBuilder {
733    /// ReadBuf capacity. Default: 1MB.
734    #[must_use]
735    pub fn buffer_capacity(mut self, n: usize) -> Self {
736        self.buffer_capacity = n;
737        self
738    }
739
740    /// ReadBuf pre-padding. Default: 16.
741    #[must_use]
742    pub fn pre_padding(mut self, n: usize) -> Self {
743        self.pre_padding = n;
744        self
745    }
746
747    /// ReadBuf post-padding. Default: 4.
748    #[must_use]
749    pub fn post_padding(mut self, n: usize) -> Self {
750        self.post_padding = n;
751        self
752    }
753
754    /// Pre-allocate message assembly buffer. Default: 4KB.
755    #[must_use]
756    pub fn message_capacity(mut self, n: usize) -> Self {
757        self.prealloc_capacity = n;
758        self
759    }
760
761    /// Fraction of buffer capacity consumed before proactive compaction.
762    ///
763    /// When the read head has advanced past this fraction of the buffer,
764    /// [`should_compact()`](FrameReader::should_compact) returns `true`.
765    /// This spreads compaction cost across messages instead of concentrating
766    /// it in a single stall when the buffer runs out of spare room.
767    ///
768    /// - `1.0`: never proactively compact — only when spare is empty.
769    /// - `0.5` (default): compact when half the buffer has been consumed.
770    /// - `0.0`: compact on every recv after the first byte is consumed
771    ///   (degenerate — not useful in practice).
772    ///
773    /// Lower values reduce tail latency at the cost of more frequent (but smaller)
774    /// memmoves.
775    #[must_use]
776    pub fn compact_at(mut self, fraction: f64) -> Self {
777        assert!(
778            (0.0..=1.0).contains(&fraction),
779            "compact_at fraction must be in 0.0..=1.0, got {fraction}"
780        );
781        self.compact_at = fraction;
782        self
783    }
784
785    /// Maximum single frame payload. Default: 16MB.
786    #[must_use]
787    pub fn max_frame_size(mut self, n: u64) -> Self {
788        self.max_frame_size = n;
789        self
790    }
791
792    /// Maximum assembled message size. Default: 16MB.
793    #[must_use]
794    pub fn max_message_size(mut self, n: usize) -> Self {
795        self.max_message_size = n;
796        self
797    }
798
799    /// Connection role. Default: Server.
800    #[must_use]
801    pub fn role(mut self, r: Role) -> Self {
802        self.role = r;
803        self
804    }
805
806    /// Build the reader.
807    #[must_use]
808    pub fn build(self) -> FrameReader {
809        let buf_compact_at = if self.compact_at >= 1.0 {
810            usize::MAX
811        } else if self.compact_at <= 0.0 {
812            0
813        } else {
814            (self.buffer_capacity as f64 * self.compact_at).ceil() as usize
815        };
816        FrameReader {
817            buf: ReadBuf::new(self.buffer_capacity, self.pre_padding, self.post_padding),
818            msg_buf: Vec::with_capacity(self.prealloc_capacity),
819            buf_compact_at,
820            state: ParseState::Head,
821            remaining_payload: 0,
822            mask_key: None,
823            mask_offset: 0,
824            assembling: false,
825            assembly_opcode: None,
826            utf8_valid_up_to: 0,
827            role: self.role,
828            max_frame_size: self.max_frame_size,
829            max_message_size: self.max_message_size,
830            pending_cleanup: PendingCleanup::None,
831            pending_opcode: None,
832            ctrl_payload_offset: 0,
833        }
834    }
835}
836
837#[cfg(test)]
838mod tests {
839    use super::*;
840
/// Encode an unmasked frame: FIN bit and opcode, then the shortest length
/// encoding that fits the payload, then the payload itself.
fn make_frame(fin: bool, opcode: u8, payload: &[u8]) -> Vec<u8> {
    let len = payload.len();
    let mut out = Vec::new();
    out.push((u8::from(fin) << 7) | opcode);
    match len {
        0..=125 => out.push(len as u8),
        126..=65535 => {
            out.push(126);
            out.extend_from_slice(&(len as u16).to_be_bytes());
        }
        _ => {
            out.push(127);
            out.extend_from_slice(&(len as u64).to_be_bytes());
        }
    }
    out.extend_from_slice(payload);
    out
}
857
858    fn make_masked_frame(fin: bool, opcode: u8, payload: &[u8], mask: [u8; 4]) -> Vec<u8> {
859        let mut frame = Vec::new();
860        let byte0 = if fin { 0x80 } else { 0x00 } | opcode;
861        frame.push(byte0);
862        let len_byte = if payload.len() <= 125 {
863            payload.len() as u8
864        } else if payload.len() <= 65535 {
865            126
866        } else {
867            127
868        };
869        frame.push(0x80 | len_byte);
870        if payload.len() > 125 && payload.len() <= 65535 {
871            frame.extend_from_slice(&(payload.len() as u16).to_be_bytes());
872        } else if payload.len() > 65535 {
873            frame.extend_from_slice(&(payload.len() as u64).to_be_bytes());
874        }
875        frame.extend_from_slice(&mask);
876        let mut masked = payload.to_vec();
877        apply_mask(&mut masked, mask);
878        frame.extend_from_slice(&masked);
879        frame
880    }
881
882    fn client_reader() -> FrameReader {
883        FrameReader::builder().role(Role::Client).build()
884    }
885
886    fn server_reader() -> FrameReader {
887        FrameReader::builder().role(Role::Server).build()
888    }
889
890    // === Single frame ===
891
892    #[test]
893    fn text_message() {
894        let mut r = client_reader();
895        r.read(&make_frame(true, 0x1, b"Hello")).unwrap();
896        match r.next().unwrap().unwrap() {
897            Message::Text(s) => assert_eq!(s, "Hello"),
898            other => panic!("expected Text, got {other:?}"),
899        }
900    }
901
902    #[test]
903    fn binary_message() {
904        let mut r = client_reader();
905        r.read(&make_frame(true, 0x2, &[0xDE, 0xAD])).unwrap();
906        match r.next().unwrap().unwrap() {
907            Message::Binary(b) => assert_eq!(b, &[0xDE, 0xAD]),
908            other => panic!("expected Binary, got {other:?}"),
909        }
910    }
911
912    #[test]
913    fn empty_text() {
914        let mut r = client_reader();
915        r.read(&make_frame(true, 0x1, b"")).unwrap();
916        match r.next().unwrap().unwrap() {
917            Message::Text(s) => assert_eq!(s, ""),
918            other => panic!("expected empty Text, got {other:?}"),
919        }
920    }
921
922    #[test]
923    fn masked_text() {
924        let mut r = server_reader();
925        let mask = [0x37, 0xFA, 0x21, 0x3D];
926        r.read(&make_masked_frame(true, 0x1, b"Hello", mask))
927            .unwrap();
928        match r.next().unwrap().unwrap() {
929            Message::Text(s) => assert_eq!(s, "Hello"),
930            other => panic!("expected Text, got {other:?}"),
931        }
932    }
933
934    // === Fragment assembly ===
935
936    #[test]
937    fn two_fragments() {
938        let mut r = client_reader();
939        r.read(&make_frame(false, 0x1, b"Hel")).unwrap();
940        r.read(&make_frame(true, 0x0, b"lo")).unwrap();
941        // Both frames buffered — pump assembles in one next() call
942        match r.next().unwrap().unwrap() {
943            Message::Text(s) => assert_eq!(s, "Hello"),
944            other => panic!("expected Text, got {other:?}"),
945        }
946    }
947
948    #[test]
949    fn three_binary_fragments() {
950        let mut r = client_reader();
951        r.read(&make_frame(false, 0x2, b"AB")).unwrap();
952        r.read(&make_frame(false, 0x0, b"CD")).unwrap();
953        r.read(&make_frame(true, 0x0, b"EF")).unwrap();
954        // All three frames buffered — assembles in one next()
955        match r.next().unwrap().unwrap() {
956            Message::Binary(b) => assert_eq!(b, b"ABCDEF"),
957            other => panic!("expected Binary, got {other:?}"),
958        }
959    }
960
961    // === Control frames during assembly ===
962
963    #[test]
964    fn ping_during_assembly() {
965        let mut r = client_reader();
966        r.read(&make_frame(false, 0x1, b"Hel")).unwrap();
967        r.read(&make_frame(true, 0x9, b"ping")).unwrap();
968        r.read(&make_frame(true, 0x0, b"lo")).unwrap();
969
970        // Ping is interleaved — returned first
971        match r.next().unwrap().unwrap() {
972            Message::Ping(p) => assert_eq!(p, b"ping"),
973            other => panic!("expected Ping, got {other:?}"),
974        }
975        // Then the assembled text
976        match r.next().unwrap().unwrap() {
977            Message::Text(s) => assert_eq!(s, "Hello"),
978            other => panic!("expected Text, got {other:?}"),
979        }
980    }
981
982    // === Close frames ===
983
984    #[test]
985    fn close_with_code_and_reason() {
986        let mut r = client_reader();
987        let mut payload = vec![];
988        payload.extend_from_slice(&1000u16.to_be_bytes());
989        payload.extend_from_slice(b"goodbye");
990        r.read(&make_frame(true, 0x8, &payload)).unwrap();
991        match r.next().unwrap().unwrap() {
992            Message::Close(cf) => {
993                assert_eq!(cf.code, CloseCode::Normal);
994                assert_eq!(cf.reason, "goodbye");
995            }
996            other => panic!("expected Close, got {other:?}"),
997        }
998    }
999
1000    #[test]
1001    fn close_no_body() {
1002        let mut r = client_reader();
1003        r.read(&make_frame(true, 0x8, b"")).unwrap();
1004        match r.next().unwrap().unwrap() {
1005            Message::Close(cf) => {
1006                assert_eq!(cf.code, CloseCode::NoStatus);
1007                assert_eq!(cf.reason, "");
1008            }
1009            other => panic!("expected Close, got {other:?}"),
1010        }
1011    }
1012
1013    #[test]
1014    fn close_code_only() {
1015        let mut r = client_reader();
1016        r.read(&make_frame(true, 0x8, &1001u16.to_be_bytes()))
1017            .unwrap();
1018        match r.next().unwrap().unwrap() {
1019            Message::Close(cf) => {
1020                assert_eq!(cf.code, CloseCode::GoingAway);
1021                assert_eq!(cf.reason, "");
1022            }
1023            other => panic!("expected Close, got {other:?}"),
1024        }
1025    }
1026
1027    #[test]
1028    fn close_invalid_code() {
1029        let mut r = client_reader();
1030        r.read(&make_frame(true, 0x8, &999u16.to_be_bytes()))
1031            .unwrap();
1032        assert!(matches!(
1033            r.next(),
1034            Err(ProtocolError::InvalidCloseCode(999))
1035        ));
1036    }
1037
1038    #[test]
1039    fn close_invalid_utf8_reason() {
1040        let mut r = client_reader();
1041        let mut payload = vec![];
1042        payload.extend_from_slice(&1000u16.to_be_bytes());
1043        payload.extend_from_slice(&[0xFF, 0xFE]); // invalid UTF-8
1044        r.read(&make_frame(true, 0x8, &payload)).unwrap();
1045        assert!(matches!(
1046            r.next(),
1047            Err(ProtocolError::InvalidUtf8InCloseReason)
1048        ));
1049    }
1050
1051    #[test]
1052    fn close_too_short() {
1053        let mut r = client_reader();
1054        r.read(&make_frame(true, 0x8, &[0x03])).unwrap(); // 1 byte
1055        assert!(matches!(r.next(), Err(ProtocolError::CloseFrameTooShort)));
1056    }
1057
1058    // === UTF-8 validation ===
1059
1060    #[test]
1061    fn invalid_utf8_text() {
1062        let mut r = client_reader();
1063        r.read(&make_frame(true, 0x1, &[0xFF, 0xFE])).unwrap();
1064        assert!(matches!(r.next(), Err(ProtocolError::InvalidUtf8)));
1065    }
1066
1067    #[test]
1068    fn multibyte_utf8_across_fragments() {
1069        let mut r = client_reader();
1070        // "é" is [0xC3, 0xA9] — split across two fragments
1071        r.read(&make_frame(false, 0x1, &[0xC3])).unwrap();
1072        r.read(&make_frame(true, 0x0, &[0xA9])).unwrap();
1073        // Both buffered — assembles in one next()
1074        match r.next().unwrap().unwrap() {
1075            Message::Text(s) => assert_eq!(s, "é"),
1076            other => panic!("expected Text, got {other:?}"),
1077        }
1078    }
1079
1080    // === Partial delivery ===
1081
1082    #[test]
1083    fn partial_header() {
1084        let mut r = client_reader();
1085        let frame = make_frame(true, 0x1, b"Hello");
1086        r.read(&frame[..1]).unwrap();
1087        assert!(r.next().unwrap().is_none());
1088        r.read(&frame[1..]).unwrap();
1089        assert!(matches!(r.next().unwrap().unwrap(), Message::Text("Hello")));
1090    }
1091
1092    #[test]
1093    fn payload_spans_reads() {
1094        let mut r = client_reader();
1095        let frame = make_frame(true, 0x1, b"Hello, World!");
1096        r.read(&frame[..7]).unwrap();
1097        assert!(r.next().unwrap().is_none());
1098        r.read(&frame[7..]).unwrap();
1099        assert!(matches!(
1100            r.next().unwrap().unwrap(),
1101            Message::Text("Hello, World!")
1102        ));
1103    }
1104
1105    // === Multiple messages ===
1106
1107    #[test]
1108    fn two_messages_one_read() {
1109        let mut r = client_reader();
1110        let mut data = make_frame(true, 0x1, b"one");
1111        data.extend_from_slice(&make_frame(true, 0x1, b"two"));
1112        r.read(&data).unwrap();
1113
1114        assert!(matches!(r.next().unwrap().unwrap(), Message::Text("one")));
1115        assert!(matches!(r.next().unwrap().unwrap(), Message::Text("two")));
1116    }
1117
1118    // === Protocol errors ===
1119
1120    #[test]
1121    fn masked_from_server() {
1122        let mut r = client_reader();
1123        r.read(&make_masked_frame(true, 0x1, b"x", [1, 2, 3, 4]))
1124            .unwrap();
1125        assert!(matches!(
1126            r.next(),
1127            Err(ProtocolError::MaskedFrameFromServer)
1128        ));
1129    }
1130
1131    #[test]
1132    fn unmasked_from_client() {
1133        let mut r = server_reader();
1134        r.read(&make_frame(true, 0x1, b"x")).unwrap();
1135        assert!(matches!(
1136            r.next(),
1137            Err(ProtocolError::UnmaskedFrameFromClient)
1138        ));
1139    }
1140
1141    #[test]
1142    fn reserved_bits() {
1143        let mut r = client_reader();
1144        let mut frame = make_frame(true, 0x1, b"x");
1145        frame[0] |= 0x40;
1146        r.read(&frame).unwrap();
1147        assert!(matches!(
1148            r.next(),
1149            Err(ProtocolError::ReservedBitsSet { .. })
1150        ));
1151    }
1152
1153    #[test]
1154    fn continuation_without_start() {
1155        let mut r = client_reader();
1156        r.read(&make_frame(true, 0x0, b"orphan")).unwrap();
1157        assert!(matches!(
1158            r.next(),
1159            Err(ProtocolError::ContinuationWithoutStart)
1160        ));
1161    }
1162
1163    #[test]
1164    fn new_message_during_assembly() {
1165        let mut r = client_reader();
1166        r.read(&make_frame(false, 0x1, b"start")).unwrap();
1167        r.read(&make_frame(true, 0x1, b"new")).unwrap();
1168        // pump() encounters the error during assembly
1169        assert!(matches!(
1170            r.next(),
1171            Err(ProtocolError::NewMessageDuringAssembly)
1172        ));
1173    }
1174
1175    #[test]
1176    fn message_too_large() {
1177        let mut r = FrameReader::builder()
1178            .role(Role::Client)
1179            .max_message_size(10)
1180            .build();
1181        r.read(&make_frame(true, 0x1, b"way too long!!")).unwrap();
1182        assert!(matches!(
1183            r.next(),
1184            Err(ProtocolError::MessageTooLarge { .. })
1185        ));
1186    }
1187
1188    #[test]
1189    fn control_frame_too_large() {
1190        let mut r = client_reader();
1191        r.read(&make_frame(true, 0x9, &[0; 126])).unwrap();
1192        assert!(matches!(
1193            r.next(),
1194            Err(ProtocolError::ControlFrameTooLarge { .. })
1195        ));
1196    }
1197
1198    #[test]
1199    fn fragmented_control() {
1200        let mut r = client_reader();
1201        r.read(&make_frame(false, 0x9, b"ping")).unwrap();
1202        assert!(matches!(
1203            r.next(),
1204            Err(ProtocolError::FragmentedControlFrame)
1205        ));
1206    }
1207
1208    // === into_owned ===
1209
1210    #[test]
1211    fn message_into_owned() {
1212        let mut r = client_reader();
1213        r.read(&make_frame(true, 0x1, b"owned")).unwrap();
1214        let msg = r.next().unwrap().unwrap();
1215        let owned = msg.into_owned();
1216        assert!(matches!(owned, super::super::message::OwnedMessage::Text(s) if s == "owned"));
1217    }
1218
1219    // === Buffer full ===
1220
1221    #[test]
1222    fn buffer_full() {
1223        let mut r = FrameReader::builder()
1224            .role(Role::Client)
1225            .buffer_capacity(16)
1226            .build();
1227        assert!(matches!(
1228            r.read(&[0; 32]),
1229            Err(ReadError::BufferFull { .. })
1230        ));
1231    }
1232
1233    // === Reset ===
1234
1235    #[test]
1236    fn reset_then_new_message() {
1237        let mut r = client_reader();
1238        r.read(&make_frame(false, 0x1, b"partial")).unwrap();
1239        let _ = r.next();
1240        r.reset();
1241        assert_eq!(r.buffered(), 0);
1242        // After reset, accepts new messages cleanly
1243        r.read(&make_frame(true, 0x1, b"fresh")).unwrap();
1244        assert!(matches!(r.next().unwrap().unwrap(), Message::Text("fresh")));
1245    }
1246
1247    // === spare/filled direct I/O ===
1248
1249    #[test]
1250    fn spare_filled_path() {
1251        let mut r = client_reader();
1252        let frame = make_frame(true, 0x1, b"direct");
1253        let spare = r.spare();
1254        spare[..frame.len()].copy_from_slice(&frame);
1255        r.filled(frame.len());
1256        assert!(matches!(
1257            r.next().unwrap().unwrap(),
1258            Message::Text("direct")
1259        ));
1260    }
1261
1262    // === Masked payload spanning reads (#8) ===
1263
1264    #[test]
1265    fn masked_payload_spans_reads() {
1266        let mut r = server_reader();
1267        let mask = [0x37, 0xFA, 0x21, 0x3D];
1268        let frame = make_masked_frame(true, 0x1, b"Hello, World!", mask);
1269        // Split mid-payload: 2 header + 4 mask + 4 payload bytes
1270        let split = 10;
1271        r.read(&frame[..split]).unwrap();
1272        assert!(r.next().unwrap().is_none());
1273        r.read(&frame[split..]).unwrap();
1274        assert!(matches!(
1275            r.next().unwrap().unwrap(),
1276            Message::Text("Hello, World!")
1277        ));
1278    }
1279
1280    // === Multiple control frames during assembly (#9) ===
1281
1282    #[test]
1283    fn multiple_controls_during_assembly() {
1284        let mut r = client_reader();
1285        r.read(&make_frame(false, 0x1, b"Hel")).unwrap();
1286        r.read(&make_frame(true, 0x9, b"ping1")).unwrap();
1287        r.read(&make_frame(true, 0xA, b"pong1")).unwrap();
1288        r.read(&make_frame(true, 0x0, b"lo")).unwrap();
1289
1290        match r.next().unwrap().unwrap() {
1291            Message::Ping(p) => assert_eq!(p, b"ping1"),
1292            other => panic!("expected Ping, got {other:?}"),
1293        }
1294        match r.next().unwrap().unwrap() {
1295            Message::Pong(p) => assert_eq!(p, b"pong1"),
1296            other => panic!("expected Pong, got {other:?}"),
1297        }
1298        match r.next().unwrap().unwrap() {
1299            Message::Text(s) => assert_eq!(s, "Hello"),
1300            other => panic!("expected Text, got {other:?}"),
1301        }
1302    }
1303
1304    // === msg_buf clear retains capacity (#10) ===
1305
1306    #[test]
1307    fn msg_buf_clear_retains_capacity() {
1308        let mut r = FrameReader::builder()
1309            .role(Role::Client)
1310            .message_capacity(64)
1311            .buffer_capacity(128 * 1024)
1312            .max_frame_size(128 * 1024)
1313            .max_message_size(128 * 1024)
1314            .build();
1315
1316        let big_payload = vec![0x42; 512];
1317        r.read(&make_frame(false, 0x2, &big_payload[..256]))
1318            .unwrap();
1319        r.read(&make_frame(true, 0x0, &big_payload[256..])).unwrap();
1320
1321        let msg = r.next().unwrap().unwrap();
1322        assert!(matches!(&msg, Message::Binary(b) if b.len() == 512));
1323        let _ = msg;
1324
1325        // Next call triggers cleanup — msg_buf cleared but capacity retained.
1326        // No reallocation: buffer stays warm for the next continuation set.
1327        assert!(r.next().unwrap().is_none());
1328        assert!(r.msg_buf.capacity() >= 512);
1329        assert!(r.msg_buf.is_empty());
1330    }
1331
1332    // === 64-bit payload length (#11) ===
1333
1334    #[test]
1335    fn extended_64bit_length() {
1336        let mut r = FrameReader::builder()
1337            .role(Role::Client)
1338            .buffer_capacity(128 * 1024)
1339            .max_frame_size(128 * 1024)
1340            .max_message_size(128 * 1024)
1341            .build();
1342
1343        let payload = vec![0x42; 70_000];
1344        let frame = make_frame(true, 0x2, &payload);
1345        r.read(&frame).unwrap();
1346        match r.next().unwrap().unwrap() {
1347            Message::Binary(b) => assert_eq!(b.len(), 70_000),
1348            other => panic!("expected Binary, got {other:?}"),
1349        }
1350    }
1351
1352    // === Buffer full with diagnostics (#5) ===
1353
1354    #[test]
1355    fn buffer_full_diagnostics() {
1356        let mut r = FrameReader::builder()
1357            .role(Role::Client)
1358            .buffer_capacity(16)
1359            .build();
1360        match r.read(&[0; 32]) {
1361            Err(ReadError::BufferFull { needed, available }) => {
1362                assert_eq!(needed, 32);
1363                assert_eq!(available, 16);
1364            }
1365            other => panic!("expected BufferFull, got {other:?}"),
1366        }
1367    }
1368
1369    // === Autobahn regression tests ===
1370
1371    /// Autobahn 7.9.4: Close code 1005 must be rejected on the wire.
1372    #[test]
1373    fn close_code_1005_rejected_on_wire() {
1374        let mut r = client_reader();
1375        r.read(&make_frame(true, 0x8, &1005u16.to_be_bytes()))
1376            .unwrap();
1377        assert!(matches!(
1378            r.next(),
1379            Err(ProtocolError::InvalidCloseCode(1005))
1380        ));
1381    }
1382
1383    /// Autobahn 6.4.1: Invalid UTF-8 split across fragments.
1384    #[test]
1385    fn invalid_utf8_across_fragments() {
1386        let mut r = client_reader();
1387        r.read(&make_frame(false, 0x1, b"valid")).unwrap();
1388        r.read(&make_frame(true, 0x0, &[0xFF])).unwrap();
1389        assert!(matches!(r.next(), Err(ProtocolError::InvalidUtf8)));
1390    }
1391
1392    /// Autobahn 6.4.2: Valid UTF-8 in first fragment, invalid continuation.
1393    #[test]
1394    fn invalid_utf8_in_continuation() {
1395        let mut r = client_reader();
1396        r.read(&make_frame(false, 0x1, &[0xCE, 0xBA])).unwrap(); // valid "κ"
1397        r.read(&make_frame(false, 0x0, &[0xE1, 0xBD])).unwrap(); // incomplete 3-byte
1398        r.read(&make_frame(true, 0x0, &[0xFF])).unwrap(); // invalid continuation byte
1399        assert!(matches!(r.next(), Err(ProtocolError::InvalidUtf8)));
1400    }
1401
1402    /// Autobahn 1.1.6: 65535-byte text (16-bit length boundary).
1403    #[test]
1404    fn text_65535_bytes() {
1405        let mut r = FrameReader::builder()
1406            .role(Role::Client)
1407            .buffer_capacity(128 * 1024)
1408            .max_message_size(128 * 1024)
1409            .build();
1410        let payload = vec![b'x'; 65535];
1411        r.read(&make_frame(true, 0x1, &payload)).unwrap();
1412        match r.next().unwrap().unwrap() {
1413            Message::Text(s) => assert_eq!(s.len(), 65535),
1414            other => panic!("expected Text, got {other:?}"),
1415        }
1416    }
1417
1418    /// Autobahn 1.1.7: 65536-byte text (crosses into 64-bit length encoding).
1419    #[test]
1420    fn text_65536_bytes() {
1421        let mut r = FrameReader::builder()
1422            .role(Role::Client)
1423            .buffer_capacity(128 * 1024)
1424            .max_message_size(128 * 1024)
1425            .build();
1426        let payload = vec![b'x'; 65536];
1427        r.read(&make_frame(true, 0x1, &payload)).unwrap();
1428        match r.next().unwrap().unwrap() {
1429            Message::Text(s) => assert_eq!(s.len(), 65536),
1430            other => panic!("expected Text, got {other:?}"),
1431        }
1432    }
1433
1434    // === Incremental UTF-8 validation ===
1435
1436    /// Invalid UTF-8 detected on first non-final Text fragment.
1437    #[test]
1438    fn invalid_utf8_detected_on_first_fragment() {
1439        let mut r = client_reader();
1440        r.read(&make_frame(false, 0x1, &[0xFF, 0xFE])).unwrap();
1441        assert!(matches!(r.next(), Err(ProtocolError::InvalidUtf8)));
1442    }
1443
1444    /// Invalid UTF-8 detected on continuation (before final).
1445    #[test]
1446    fn invalid_utf8_detected_mid_assembly() {
1447        let mut r = client_reader();
1448        r.read(&make_frame(false, 0x1, b"valid")).unwrap();
1449        r.read(&make_frame(false, 0x0, &[0xFF])).unwrap();
1450        // Should fail immediately, not wait for final fragment
1451        assert!(matches!(r.next(), Err(ProtocolError::InvalidUtf8)));
1452    }
1453
1454    /// Multi-byte codepoint split across two fragments is OK.
1455    #[test]
1456    fn split_codepoint_across_fragments() {
1457        let mut r = client_reader();
1458        // "é" = [0xC3, 0xA9]
1459        r.read(&make_frame(false, 0x1, &[0xC3])).unwrap();
1460        r.read(&make_frame(true, 0x0, &[0xA9])).unwrap();
1461        match r.next().unwrap().unwrap() {
1462            Message::Text(s) => assert_eq!(s, "é"),
1463            other => panic!("expected Text, got {other:?}"),
1464        }
1465    }
1466
1467    /// 4-byte codepoint split 1+3 across fragments.
1468    #[test]
1469    fn split_4byte_codepoint() {
1470        let mut r = client_reader();
1471        // U+1F600 (😀) = [0xF0, 0x9F, 0x98, 0x80]
1472        r.read(&make_frame(false, 0x1, &[0xF0])).unwrap();
1473        r.read(&make_frame(true, 0x0, &[0x9F, 0x98, 0x80])).unwrap();
1474        match r.next().unwrap().unwrap() {
1475            Message::Text(s) => assert_eq!(s, "😀"),
1476            other => panic!("expected Text, got {other:?}"),
1477        }
1478    }
1479
1480    /// Incomplete codepoint at end of final fragment is invalid.
1481    #[test]
1482    fn incomplete_codepoint_at_end() {
1483        let mut r = client_reader();
1484        // Start of 2-byte sequence [0xC3] but message ends
1485        r.read(&make_frame(true, 0x1, &[0xC3])).unwrap();
1486        assert!(matches!(r.next(), Err(ProtocolError::InvalidUtf8)));
1487    }
1488
1489    /// Binary fragments are NOT UTF-8 validated.
1490    #[test]
1491    fn binary_fragments_skip_utf8() {
1492        let mut r = client_reader();
1493        r.read(&make_frame(false, 0x2, &[0xFF, 0xFE])).unwrap();
1494        r.read(&make_frame(true, 0x0, &[0xFD])).unwrap();
1495        match r.next().unwrap().unwrap() {
1496            Message::Binary(b) => assert_eq!(b, &[0xFF, 0xFE, 0xFD]),
1497            other => panic!("expected Binary, got {other:?}"),
1498        }
1499    }
1500
1501    /// Three fragments with valid UTF-8 split at boundaries.
1502    #[test]
1503    fn three_fragments_valid_utf8() {
1504        let mut r = client_reader();
1505        // "Héllo" = [72, 0xC3, 0xA9, 108, 108, 111]
1506        // Split: "H" + [0xC3] | [0xA9] + "ll" | "o"
1507        r.read(&make_frame(false, 0x1, &[72, 0xC3])).unwrap();
1508        r.read(&make_frame(false, 0x0, &[0xA9, 108, 108])).unwrap();
1509        r.read(&make_frame(true, 0x0, &[111])).unwrap();
1510        match r.next().unwrap().unwrap() {
1511            Message::Text(s) => assert_eq!(s, "Héllo"),
1512            other => panic!("expected Text, got {other:?}"),
1513        }
1514    }
1515
1516    // === FIFO ordering tests ===
1517
1518    fn assert_text(result: Result<Option<Message<'_>>, ProtocolError>, expected: &str) {
1519        match result.unwrap().unwrap() {
1520            Message::Text(s) => assert_eq!(s, expected),
1521            other => panic!("expected Text({expected:?}), got {other:?}"),
1522        }
1523    }
1524
1525    fn assert_binary(result: Result<Option<Message<'_>>, ProtocolError>, expected: &[u8]) {
1526        match result.unwrap().unwrap() {
1527            Message::Binary(b) => assert_eq!(b, expected),
1528            other => panic!("expected Binary, got {other:?}"),
1529        }
1530    }
1531
1532    fn assert_ping(result: Result<Option<Message<'_>>, ProtocolError>, expected: &[u8]) {
1533        match result.unwrap().unwrap() {
1534            Message::Ping(b) => assert_eq!(b, expected),
1535            other => panic!("expected Ping, got {other:?}"),
1536        }
1537    }
1538
1539    fn assert_pong(result: Result<Option<Message<'_>>, ProtocolError>, expected: &[u8]) {
1540        match result.unwrap().unwrap() {
1541            Message::Pong(b) => assert_eq!(b, expected),
1542            other => panic!("expected Pong, got {other:?}"),
1543        }
1544    }
1545
1546    #[test]
1547    fn fifo_three_texts_one_read() {
1548        let mut r = client_reader();
1549        let mut data = make_frame(true, 0x1, b"first");
1550        data.extend(&make_frame(true, 0x1, b"second"));
1551        data.extend(&make_frame(true, 0x1, b"third"));
1552        r.read(&data).unwrap();
1553        assert_text(r.next(), "first");
1554        assert_text(r.next(), "second");
1555        assert_text(r.next(), "third");
1556    }
1557
1558    #[test]
1559    fn fifo_mixed_text_binary() {
1560        let mut r = client_reader();
1561        let mut data = make_frame(true, 0x1, b"text1");
1562        data.extend(&make_frame(true, 0x2, &[0x01]));
1563        data.extend(&make_frame(true, 0x1, b"text2"));
1564        data.extend(&make_frame(true, 0x2, &[0x02]));
1565        r.read(&data).unwrap();
1566        assert_text(r.next(), "text1");
1567        assert_binary(r.next(), &[0x01]);
1568        assert_text(r.next(), "text2");
1569        assert_binary(r.next(), &[0x02]);
1570    }
1571
1572    #[test]
1573    fn fifo_single_assembled_single() {
1574        let mut r = client_reader();
1575        let mut data = make_frame(true, 0x1, b"before");
1576        data.extend(&make_frame(false, 0x1, b"frag"));
1577        data.extend(&make_frame(true, 0x0, b"mented"));
1578        data.extend(&make_frame(true, 0x1, b"after"));
1579        r.read(&data).unwrap();
1580        assert_text(r.next(), "before");
1581        assert_text(r.next(), "fragmented");
1582        assert_text(r.next(), "after");
1583    }
1584
1585    #[test]
1586    fn fifo_assembled_then_single() {
1587        let mut r = client_reader();
1588        let mut data = make_frame(false, 0x2, &[0xAA]);
1589        data.extend(&make_frame(true, 0x0, &[0xBB]));
1590        data.extend(&make_frame(true, 0x1, b"after"));
1591        r.read(&data).unwrap();
1592        assert_binary(r.next(), &[0xAA, 0xBB]);
1593        assert_text(r.next(), "after");
1594    }
1595
1596    #[test]
1597    fn fifo_data_ping_data() {
1598        let mut r = client_reader();
1599        let mut data = make_frame(true, 0x1, b"msg1");
1600        data.extend(&make_frame(true, 0x9, b"ping"));
1601        data.extend(&make_frame(true, 0x1, b"msg2"));
1602        r.read(&data).unwrap();
1603        assert_text(r.next(), "msg1");
1604        assert_ping(r.next(), b"ping");
1605        assert_text(r.next(), "msg2");
1606    }
1607
1608    #[test]
1609    fn fifo_assembly_with_control_then_data() {
1610        let mut r = client_reader();
1611        let mut data = make_frame(false, 0x1, b"hel");
1612        data.extend(&make_frame(true, 0x9, b"ping"));
1613        data.extend(&make_frame(true, 0x0, b"lo"));
1614        data.extend(&make_frame(true, 0x1, b"next"));
1615        r.read(&data).unwrap();
1616        assert_ping(r.next(), b"ping");
1617        assert_text(r.next(), "hello");
1618        assert_text(r.next(), "next");
1619    }
1620
1621    #[test]
1622    fn fifo_assembly_with_multiple_controls() {
1623        let mut r = client_reader();
1624        let mut data = make_frame(false, 0x2, &[0x01]);
1625        data.extend(&make_frame(true, 0x9, b"p1"));
1626        data.extend(&make_frame(true, 0xA, b"p2"));
1627        data.extend(&make_frame(true, 0x0, &[0x02]));
1628        data.extend(&make_frame(true, 0x1, b"after"));
1629        r.read(&data).unwrap();
1630        assert_ping(r.next(), b"p1");
1631        assert_pong(r.next(), b"p2");
1632        assert_binary(r.next(), &[0x01, 0x02]);
1633        assert_text(r.next(), "after");
1634    }
1635
1636    #[test]
1637    fn fifo_across_reads() {
1638        let mut r = client_reader();
1639        let frame1 = make_frame(true, 0x1, b"first");
1640        let frame2 = make_frame(true, 0x1, b"second");
1641        r.read(&frame1).unwrap();
1642        assert_text(r.next(), "first");
1643        r.read(&frame2).unwrap();
1644        assert_text(r.next(), "second");
1645    }
1646
1647    #[test]
1648    fn fifo_partial_then_complete() {
1649        let mut r = client_reader();
1650        let frame1 = make_frame(true, 0x1, b"first");
1651        let frame2 = make_frame(true, 0x1, b"second");
1652        let mut all = frame1;
1653        all.extend(&frame2);
1654        r.read(&all[..3]).unwrap();
1655        assert!(r.next().unwrap().is_none());
1656        r.read(&all[3..]).unwrap();
1657        assert_text(r.next(), "first");
1658        assert_text(r.next(), "second");
1659    }
1660
1661    #[test]
1662    fn fifo_100_messages_one_read() {
1663        let mut r = FrameReader::builder()
1664            .role(Role::Client)
1665            .buffer_capacity(256 * 1024)
1666            .build();
1667
1668        let mut data = Vec::new();
1669        for i in 0u32..100 {
1670            let payload = i.to_be_bytes();
1671            data.extend(&make_frame(true, 0x2, &payload));
1672        }
1673        r.read(&data).unwrap();
1674
1675        for i in 0u32..100 {
1676            match r.next().unwrap().unwrap() {
1677                Message::Binary(b) => {
1678                    let val = u32::from_be_bytes(b.try_into().unwrap());
1679                    assert_eq!(val, i, "message {i} out of order");
1680                }
1681                other => panic!("expected Binary, got {other:?}"),
1682            }
1683        }
1684        assert!(r.next().unwrap().is_none());
1685    }
1686
1687    // =========================================================================
1688    // should_compact() edge cases
1689    // =========================================================================
1690
1691    #[test]
1692    fn should_compact_default_half() {
1693        let mut r = FrameReader::builder()
1694            .buffer_capacity(1024)
1695            .role(Role::Client)
1696            .build();
1697        // Nothing consumed yet — should not compact.
1698        assert!(!r.should_compact());
1699
1700        // Feed two frames. Consume the first, then call poll() to trigger
1701        // deferred cleanup (ReadBuf advance). The second frame keeps data
1702        // in the buffer so head doesn't auto-reset.
1703        let mut data = make_frame(true, 0x2, &[0xAA; 600]);
1704        data.extend_from_slice(&make_frame(true, 0x2, &[0xBB; 10]));
1705        r.read(&data).unwrap();
1706        assert!(r.next().unwrap().is_some());
1707        // Trigger deferred cleanup — advances head past first frame.
1708        let _ = r.poll().unwrap();
1709        // consumed ~604 > 512 (50% of 1024) → should compact.
1710        assert!(r.should_compact());
1711    }
1712
1713    #[test]
1714    fn should_compact_at_one_never_triggers() {
1715        let mut r = FrameReader::builder()
1716            .buffer_capacity(256)
1717            .compact_at(1.0)
1718            .role(Role::Client)
1719            .build();
1720        // Consume nearly all the buffer.
1721        let frame = make_frame(true, 0x2, &[0xBB; 200]);
1722        r.read(&frame).unwrap();
1723        let _ = r.next().unwrap();
1724        // compact_at(1.0) → buf_compact_at = usize::MAX, never triggers.
1725        assert!(!r.should_compact());
1726    }
1727
1728    #[test]
1729    fn should_compact_at_zero() {
1730        let mut r = FrameReader::builder()
1731            .buffer_capacity(256)
1732            .compact_at(0.0)
1733            .role(Role::Client)
1734            .build();
1735        // Nothing consumed — should NOT compact even with threshold 0.
1736        assert!(!r.should_compact());
1737
1738        // Feed two frames, consume the first, trigger deferred cleanup.
1739        let mut data = make_frame(true, 0x2, &[0xCC; 10]);
1740        data.extend_from_slice(&make_frame(true, 0x2, &[0xDD; 5]));
1741        r.read(&data).unwrap();
1742        assert!(r.next().unwrap().is_some());
1743        let _ = r.poll().unwrap(); // deferred advance
1744        // Now consumed > 0 and threshold is 0 — should compact.
1745        assert!(r.should_compact());
1746    }
1747
1748    #[test]
1749    fn should_compact_small_buffer_small_fraction() {
1750        // buffer_capacity=64, compact_at=0.1 → ceil(6.4) = 7
1751        let mut r = FrameReader::builder()
1752            .buffer_capacity(64)
1753            .compact_at(0.1)
1754            .role(Role::Client)
1755            .build();
1756        assert!(!r.should_compact());
1757
1758        // Feed two small frames, consume the first, trigger deferred cleanup.
1759        let mut data = make_frame(true, 0x2, &[0xDD; 10]);
1760        data.extend_from_slice(&make_frame(true, 0x2, &[0xEE; 5]));
1761        r.read(&data).unwrap();
1762        assert!(r.next().unwrap().is_some());
1763        let _ = r.poll().unwrap(); // deferred advance
1764        // consumed (12) >= 7 (ceil threshold) → should compact.
1765        assert!(r.should_compact());
1766    }
1767}