1use super::error::ProtocolError;
2use super::frame::{RawOpcode, Role};
3use super::mask::apply_mask;
4use super::message::{CloseCode, CloseFrame, Message};
5use nexus_net::buf::ReadBuf;
6
7#[derive(Debug, Clone, PartialEq, Eq)]
9pub enum ReadError {
10 BufferFull {
12 needed: usize,
14 available: usize,
16 },
17}
18
19impl std::fmt::Display for ReadError {
20 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
21 match self {
22 Self::BufferFull { needed, available } => {
23 write!(f, "buffer full: need {needed} bytes, {available} available")
24 }
25 }
26 }
27}
28
29impl std::error::Error for ReadError {}
30
31pub struct FrameReader {
57 buf: ReadBuf,
58 msg_buf: Vec<u8>,
59 buf_compact_at: usize,
61
62 state: ParseState,
63 remaining_payload: usize,
64 mask_key: Option<[u8; 4]>,
65 mask_offset: u8,
66
67 assembling: bool,
68 assembly_opcode: Option<RawOpcode>,
69 utf8_valid_up_to: usize,
70
71 role: Role,
72 max_frame_size: u64,
73 max_message_size: usize,
74
75 pending_cleanup: PendingCleanup,
77 pending_opcode: Option<RawOpcode>,
79 ctrl_payload_offset: usize,
82}
83
84#[derive(Clone, Copy, Default)]
86enum PendingCleanup {
87 #[default]
88 None,
89 AdvanceReadBuf(usize),
91 ClearMsgBuf,
93 TruncateMsgBuf(usize),
95}
96
97#[derive(Clone, Copy, Default)]
98enum ParseState {
99 #[default]
100 Head,
101 Payload { opcode: RawOpcode, fin: bool },
103}
104
105pub struct FrameReaderBuilder {
107 buffer_capacity: usize,
108 pre_padding: usize,
109 post_padding: usize,
110 prealloc_capacity: usize,
111 compact_at: f64,
112 max_frame_size: u64,
113 max_message_size: usize,
114 role: Role,
115}
116
117impl FrameReader {
118 #[must_use]
120 pub fn builder() -> FrameReaderBuilder {
121 FrameReaderBuilder {
122 buffer_capacity: 1024 * 1024,
123 pre_padding: 16,
124 post_padding: 4,
125 prealloc_capacity: 4096,
126 compact_at: 0.5,
127 max_frame_size: 16 * 1024 * 1024,
128 max_message_size: 16 * 1024 * 1024,
129 role: Role::Server,
130 }
131 }
132
133 pub fn read(&mut self, src: &[u8]) -> Result<(), ReadError> {
135 let mut spare = self.buf.spare();
136 if src.len() > spare.len() {
137 self.buf.compact();
139 spare = self.buf.spare();
140 if src.len() > spare.len() {
141 return Err(ReadError::BufferFull {
142 needed: src.len(),
143 available: spare.len(),
144 });
145 }
146 }
147 spare[..src.len()].copy_from_slice(src);
148 self.buf.filled(src.len());
149 Ok(())
150 }
151
152 pub fn read_from<R: std::io::Read>(&mut self, src: &mut R) -> std::io::Result<usize> {
162 let mut spare = self.buf.spare();
163 if spare.is_empty() {
164 self.buf.compact();
166 spare = self.buf.spare();
167 if spare.is_empty() {
168 return Err(std::io::Error::other("frame reader buffer full"));
169 }
170 }
171 let n = src.read(spare)?;
172 self.buf.filled(n);
173 Ok(n)
174 }
175
176 #[inline]
178 pub fn spare(&mut self) -> &mut [u8] {
179 self.buf.spare()
180 }
181
182 #[inline]
184 pub fn filled(&mut self, n: usize) {
185 self.buf.filled(n);
186 }
187
188 #[inline]
193 pub fn compact(&mut self) {
194 self.buf.compact();
195 }
196
197 #[inline]
204 pub fn should_compact(&self) -> bool {
205 let consumed = self.buf.consumed();
206 consumed > 0 && consumed >= self.buf_compact_at && !self.buf.is_empty()
207 }
208
209 #[inline]
211 #[allow(clippy::should_implement_trait)]
212 pub fn next(&mut self) -> Result<Option<Message<'_>>, ProtocolError> {
213 if let Some(opcode) = self.pending_opcode.take() {
215 return self.make_message(opcode);
216 }
217
218 self.do_cleanup();
220
221 self.pump()?
222 .map_or(Ok(None), |opcode| self.make_message(opcode))
223 }
224
225 #[inline]
228 pub fn poll(&mut self) -> Result<bool, ProtocolError> {
229 if self.pending_opcode.is_some() {
230 return Ok(true);
231 }
232
233 self.do_cleanup();
234
235 match self.pump()? {
236 None => Ok(false),
237 Some(opcode) => {
238 self.pending_opcode = Some(opcode);
239 Ok(true)
240 }
241 }
242 }
243
244 #[inline]
246 pub fn remaining(&self) -> usize {
247 self.buf.remaining()
248 }
249
250 #[inline]
252 pub fn buffered(&self) -> usize {
253 self.buf.len()
254 }
255
256 pub fn reset(&mut self) {
258 self.buf.clear();
259 self.msg_buf.clear();
260 self.state = ParseState::Head;
261 self.remaining_payload = 0;
262 self.mask_key = None;
263 self.mask_offset = 0;
264 self.assembling = false;
265 self.assembly_opcode = None;
266 self.utf8_valid_up_to = 0;
267 self.pending_cleanup = PendingCleanup::None;
268 self.pending_opcode = None;
269 self.ctrl_payload_offset = 0;
270 }
271
272 #[inline]
278 fn do_cleanup(&mut self) {
279 match self.pending_cleanup {
280 PendingCleanup::None => return,
281 PendingCleanup::AdvanceReadBuf(n) => {
282 self.buf.advance(n);
283 }
284 PendingCleanup::ClearMsgBuf => {
285 self.do_cleanup_msg_buf();
286 }
287 PendingCleanup::TruncateMsgBuf(len) => {
288 self.msg_buf.truncate(len);
289 }
290 }
291 self.pending_cleanup = PendingCleanup::None;
292 }
293
294 #[cold]
297 fn do_cleanup_msg_buf(&mut self) {
298 self.msg_buf.clear();
299 }
300
301 #[inline]
307 fn pump(&mut self) -> Result<Option<RawOpcode>, ProtocolError> {
308 loop {
309 let state = self.state;
310 match state {
311 ParseState::Payload { opcode, fin } => {
312 let available = self.buf.len();
314 if available == 0 {
315 return Ok(None);
316 }
317
318 let take = available.min(self.remaining_payload);
319 self.consume_partial_payload(take);
320
321 if self.remaining_payload == 0 {
322 self.state = ParseState::Head;
323 if let Some(completed) = self.route_opcode(opcode, fin)? {
324 if opcode.is_control() && self.assembling {
325 self.pending_cleanup =
327 PendingCleanup::TruncateMsgBuf(self.ctrl_payload_offset);
328 } else {
329 self.pending_cleanup = PendingCleanup::ClearMsgBuf;
330 }
331 return Ok(Some(completed));
332 }
333 continue;
334 }
335 return Ok(None);
336 }
337
338 ParseState::Head => {
339 let data_len = self.buf.len();
340 if data_len < 2 {
341 return Ok(None);
342 }
343
344 let byte1 = self.buf.data()[1];
345 let header_size = Self::header_size(byte1);
346 if data_len < header_size {
347 return Ok(None);
348 }
349
350 let parsed = {
351 let data = self.buf.data();
352 self.parse_header(&data[..header_size])?
353 };
354
355 let is_control = parsed.opcode.is_control();
356
357 if !is_control {
358 let total = self.msg_buf.len() + parsed.payload_len;
359 if total > self.max_message_size {
360 return Err(ProtocolError::MessageTooLarge {
361 accumulated: total,
362 max: self.max_message_size,
363 });
364 }
365 }
366
367 self.buf.advance(header_size);
369
370 let available = self.buf.len();
371
372 if available >= parsed.payload_len {
373 let payload_len = parsed.payload_len;
375
376 if let Some(mask) = parsed.mask_key
378 && payload_len > 0
379 {
380 let data = &mut self.buf.data_mut()[..payload_len];
381 apply_mask(data, mask);
382 }
383
384 let is_single = parsed.fin && !self.assembling;
385
386 if is_single || is_control {
387 if let Some(completed) = self.route_opcode(parsed.opcode, parsed.fin)? {
390 self.pending_cleanup = PendingCleanup::AdvanceReadBuf(payload_len);
391 return Ok(Some(completed));
392 }
393 self.buf.advance(payload_len);
396 continue;
397 }
398
399 let data = &self.buf.data()[..payload_len];
401 self.msg_buf.extend_from_slice(data);
402 self.buf.advance(payload_len);
403
404 if let Some(completed) = self.route_opcode(parsed.opcode, parsed.fin)? {
405 self.pending_cleanup = PendingCleanup::ClearMsgBuf;
406 return Ok(Some(completed));
407 }
408 continue;
409 }
410
411 self.remaining_payload = parsed.payload_len;
413 self.mask_key = parsed.mask_key;
414 self.mask_offset = 0;
415
416 if parsed.opcode.is_control() && self.assembling {
418 self.ctrl_payload_offset = self.msg_buf.len();
419 }
420
421 if available > 0 {
422 self.consume_partial_payload(available);
423 }
424
425 self.state = ParseState::Payload {
426 opcode: parsed.opcode,
427 fin: parsed.fin,
428 };
429 return Ok(None);
430 }
431 }
432 }
433 }
434
435 #[inline(always)]
438 fn route_opcode(
439 &mut self,
440 opcode: RawOpcode,
441 fin: bool,
442 ) -> Result<Option<RawOpcode>, ProtocolError> {
443 if opcode.is_control() {
444 return Ok(Some(opcode));
445 }
446
447 match opcode {
448 RawOpcode::Text | RawOpcode::Binary => {
449 if self.assembling {
450 return Err(ProtocolError::NewMessageDuringAssembly);
451 }
452 if fin {
453 return Ok(Some(opcode));
454 }
455 self.assembling = true;
457 self.assembly_opcode = Some(opcode);
458 self.utf8_valid_up_to = 0;
459 if opcode == RawOpcode::Text {
460 let pending = validate_utf8_incremental(&self.msg_buf, false)?;
461 self.utf8_valid_up_to = self.msg_buf.len() - pending as usize;
462 }
463 Ok(None)
464 }
465 RawOpcode::Continuation => {
466 if !self.assembling {
467 return Err(ProtocolError::ContinuationWithoutStart);
468 }
469 if self.assembly_opcode == Some(RawOpcode::Text) {
470 let to_check = &self.msg_buf[self.utf8_valid_up_to..];
471 let pending = validate_utf8_incremental(to_check, fin)?;
472 self.utf8_valid_up_to = self.msg_buf.len() - pending as usize;
473 }
474 if fin {
475 self.assembling = false;
476 let opcode = self
477 .assembly_opcode
478 .take()
479 .expect("assembly_opcode must be Some when assembling is true");
480 self.utf8_valid_up_to = 0;
481 return Ok(Some(opcode));
482 }
483 Ok(None)
484 }
485 _ => unreachable!(),
486 }
487 }
488
489 #[inline(always)]
492 fn make_message(&self, opcode: RawOpcode) -> Result<Option<Message<'_>>, ProtocolError> {
493 let payload = match self.pending_cleanup {
494 PendingCleanup::AdvanceReadBuf(n) => &self.buf.data()[..n],
495 PendingCleanup::TruncateMsgBuf(offset) => &self.msg_buf[offset..],
496 PendingCleanup::ClearMsgBuf | PendingCleanup::None => &self.msg_buf[..],
497 };
498
499 match opcode {
500 RawOpcode::Ping => Ok(Some(Message::Ping(payload))),
501 RawOpcode::Pong => Ok(Some(Message::Pong(payload))),
502 RawOpcode::Close => Self::parse_close_from(payload),
503 RawOpcode::Text => {
504 let s = match self.pending_cleanup {
505 PendingCleanup::ClearMsgBuf => {
506 unsafe { std::str::from_utf8_unchecked(payload) }
516 }
517 _ => {
518 simdutf8::basic::from_utf8(payload)
520 .map_err(|_| ProtocolError::InvalidUtf8)?
521 }
522 };
523 Ok(Some(Message::Text(s)))
524 }
525 RawOpcode::Binary => Ok(Some(Message::Binary(payload))),
526 RawOpcode::Continuation => unreachable!("pump never returns Continuation"),
527 }
528 }
529
530 #[inline]
531 fn header_size(byte1: u8) -> usize {
532 let masked = byte1 & 0x80 != 0;
533 let len_code = byte1 & 0x7F;
534 let base = match len_code {
535 0..=125 => 2,
536 126 => 4,
537 _ => 10,
538 };
539 if masked { base + 4 } else { base }
540 }
541
542 #[inline]
543 fn parse_header(&self, header: &[u8]) -> Result<ParsedHeader, ProtocolError> {
544 let byte0 = header[0];
545 let byte1 = header[1];
546 let fin = byte0 & 0x80 != 0;
547 let rsv = (byte0 >> 4) & 0x07;
548 let opcode_raw = byte0 & 0x0F;
549 let masked = byte1 & 0x80 != 0;
550 let len_code = byte1 & 0x7F;
551
552 if rsv != 0 {
553 return Err(ProtocolError::ReservedBitsSet { bits: rsv });
554 }
555
556 let opcode =
557 RawOpcode::from_u8(opcode_raw).ok_or(ProtocolError::InvalidOpcode(opcode_raw))?;
558
559 match self.role {
560 Role::Server if !masked => return Err(ProtocolError::UnmaskedFrameFromClient),
561 Role::Client if masked => return Err(ProtocolError::MaskedFrameFromServer),
562 _ => {}
563 }
564
565 let (payload_len, mask_offset) = match len_code {
566 0..=125 => (u64::from(len_code), 2),
567 126 => {
568 let len = u16::from_be_bytes([header[2], header[3]]);
569 (u64::from(len), 4)
570 }
571 _ => {
572 let len = u64::from_be_bytes(
573 header[2..10]
574 .try_into()
575 .expect("64-bit length field is 8 bytes"),
576 );
577 (len, 10)
578 }
579 };
580
581 if opcode.is_control() {
582 if payload_len > 125 {
583 return Err(ProtocolError::ControlFrameTooLarge { size: payload_len });
584 }
585 if !fin {
586 return Err(ProtocolError::FragmentedControlFrame);
587 }
588 }
589
590 if payload_len > self.max_frame_size {
591 return Err(ProtocolError::PayloadTooLarge {
592 size: payload_len,
593 max: self.max_frame_size,
594 });
595 }
596
597 let mask_key = if masked {
598 Some([
599 header[mask_offset],
600 header[mask_offset + 1],
601 header[mask_offset + 2],
602 header[mask_offset + 3],
603 ])
604 } else {
605 None
606 };
607
608 let payload_len =
609 usize::try_from(payload_len).map_err(|_| ProtocolError::PayloadTooLarge {
610 size: payload_len,
611 max: self.max_frame_size,
612 })?;
613
614 Ok(ParsedHeader {
615 fin,
616 opcode,
617 mask_key,
618 payload_len,
619 })
620 }
621
622 #[cold]
624 fn consume_partial_payload(&mut self, n: usize) {
625 if n == 0 {
626 return;
627 }
628 if let Some(key) = self.mask_key {
629 let data = &mut self.buf.data_mut()[..n];
630 let offset = self.mask_offset as usize;
631 let rotated = [
632 key[offset % 4],
633 key[(offset + 1) % 4],
634 key[(offset + 2) % 4],
635 key[(offset + 3) % 4],
636 ];
637 apply_mask(data, rotated);
638 self.mask_offset = ((offset + n) % 4) as u8;
639 }
640 let data = &self.buf.data()[..n];
641 self.msg_buf.extend_from_slice(data);
642 self.buf.advance(n);
643 self.remaining_payload -= n;
644 }
645
646 #[cold]
647 fn parse_close_from(buf: &[u8]) -> Result<Option<Message<'_>>, ProtocolError> {
648 if buf.is_empty() {
649 return Ok(Some(Message::Close(CloseFrame {
650 code: CloseCode::NoStatus,
651 reason: "",
652 })));
653 }
654 if buf.len() == 1 {
655 return Err(ProtocolError::CloseFrameTooShort);
656 }
657 let raw_code = u16::from_be_bytes([buf[0], buf[1]]);
658 let code = CloseCode::from_u16(raw_code)?;
659 let reason_bytes = &buf[2..];
660 let reason = simdutf8::compat::from_utf8(reason_bytes)
661 .map_err(|_| ProtocolError::InvalidUtf8InCloseReason)?;
662 Ok(Some(Message::Close(CloseFrame { code, reason })))
663 }
664}
665
666struct ParsedHeader {
667 fin: bool,
668 opcode: RawOpcode,
669 mask_key: Option<[u8; 4]>,
670 payload_len: usize,
671}
672
673impl std::fmt::Debug for FrameReader {
674 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
675 f.debug_struct("FrameReader")
676 .field("buffered", &self.buf.len())
677 .field("remaining", &self.buf.remaining())
678 .field("assembling", &self.assembling)
679 .field("role", &self.role)
680 .finish()
681 }
682}
683
684fn validate_utf8_incremental(data: &[u8], is_final: bool) -> Result<u8, ProtocolError> {
690 if data.is_empty() {
691 return Ok(0);
692 }
693
694 if is_final {
695 simdutf8::compat::from_utf8(data).map_err(|_| ProtocolError::InvalidUtf8)?;
696 return Ok(0);
697 }
698
699 match simdutf8::compat::from_utf8(data) {
700 Ok(_) => Ok(0),
701 Err(e) => {
702 let valid_up_to = e.valid_up_to();
703 if e.error_len().is_some() {
704 return Err(ProtocolError::InvalidUtf8);
706 }
707 let pending = data.len() - valid_up_to;
709 if pending > 3 {
710 return Err(ProtocolError::InvalidUtf8);
711 }
712 Ok(pending as u8)
713 }
714 }
715}
716
717impl crate::ParserSink for FrameReader {
721 #[inline]
722 fn spare(&mut self) -> &mut [u8] {
723 FrameReader::spare(self)
724 }
725
726 #[inline]
727 fn filled(&mut self, n: usize) {
728 FrameReader::filled(self, n);
729 }
730}
731
732impl FrameReaderBuilder {
733 #[must_use]
735 pub fn buffer_capacity(mut self, n: usize) -> Self {
736 self.buffer_capacity = n;
737 self
738 }
739
740 #[must_use]
742 pub fn pre_padding(mut self, n: usize) -> Self {
743 self.pre_padding = n;
744 self
745 }
746
747 #[must_use]
749 pub fn post_padding(mut self, n: usize) -> Self {
750 self.post_padding = n;
751 self
752 }
753
754 #[must_use]
756 pub fn message_capacity(mut self, n: usize) -> Self {
757 self.prealloc_capacity = n;
758 self
759 }
760
761 #[must_use]
776 pub fn compact_at(mut self, fraction: f64) -> Self {
777 assert!(
778 (0.0..=1.0).contains(&fraction),
779 "compact_at fraction must be in 0.0..=1.0, got {fraction}"
780 );
781 self.compact_at = fraction;
782 self
783 }
784
785 #[must_use]
787 pub fn max_frame_size(mut self, n: u64) -> Self {
788 self.max_frame_size = n;
789 self
790 }
791
792 #[must_use]
794 pub fn max_message_size(mut self, n: usize) -> Self {
795 self.max_message_size = n;
796 self
797 }
798
799 #[must_use]
801 pub fn role(mut self, r: Role) -> Self {
802 self.role = r;
803 self
804 }
805
806 #[must_use]
808 pub fn build(self) -> FrameReader {
809 let buf_compact_at = if self.compact_at >= 1.0 {
810 usize::MAX
811 } else if self.compact_at <= 0.0 {
812 0
813 } else {
814 (self.buffer_capacity as f64 * self.compact_at).ceil() as usize
815 };
816 FrameReader {
817 buf: ReadBuf::new(self.buffer_capacity, self.pre_padding, self.post_padding),
818 msg_buf: Vec::with_capacity(self.prealloc_capacity),
819 buf_compact_at,
820 state: ParseState::Head,
821 remaining_payload: 0,
822 mask_key: None,
823 mask_offset: 0,
824 assembling: false,
825 assembly_opcode: None,
826 utf8_valid_up_to: 0,
827 role: self.role,
828 max_frame_size: self.max_frame_size,
829 max_message_size: self.max_message_size,
830 pending_cleanup: PendingCleanup::None,
831 pending_opcode: None,
832 ctrl_payload_offset: 0,
833 }
834 }
835}
836
837#[cfg(test)]
838mod tests {
839 use super::*;
840
841 fn make_frame(fin: bool, opcode: u8, payload: &[u8]) -> Vec<u8> {
842 let mut frame = Vec::new();
843 let byte0 = if fin { 0x80 } else { 0x00 } | opcode;
844 frame.push(byte0);
845 if payload.len() <= 125 {
846 frame.push(payload.len() as u8);
847 } else if payload.len() <= 65535 {
848 frame.push(126);
849 frame.extend_from_slice(&(payload.len() as u16).to_be_bytes());
850 } else {
851 frame.push(127);
852 frame.extend_from_slice(&(payload.len() as u64).to_be_bytes());
853 }
854 frame.extend_from_slice(payload);
855 frame
856 }
857
858 fn make_masked_frame(fin: bool, opcode: u8, payload: &[u8], mask: [u8; 4]) -> Vec<u8> {
859 let mut frame = Vec::new();
860 let byte0 = if fin { 0x80 } else { 0x00 } | opcode;
861 frame.push(byte0);
862 let len_byte = if payload.len() <= 125 {
863 payload.len() as u8
864 } else if payload.len() <= 65535 {
865 126
866 } else {
867 127
868 };
869 frame.push(0x80 | len_byte);
870 if payload.len() > 125 && payload.len() <= 65535 {
871 frame.extend_from_slice(&(payload.len() as u16).to_be_bytes());
872 } else if payload.len() > 65535 {
873 frame.extend_from_slice(&(payload.len() as u64).to_be_bytes());
874 }
875 frame.extend_from_slice(&mask);
876 let mut masked = payload.to_vec();
877 apply_mask(&mut masked, mask);
878 frame.extend_from_slice(&masked);
879 frame
880 }
881
882 fn client_reader() -> FrameReader {
883 FrameReader::builder().role(Role::Client).build()
884 }
885
886 fn server_reader() -> FrameReader {
887 FrameReader::builder().role(Role::Server).build()
888 }
889
890 #[test]
893 fn text_message() {
894 let mut r = client_reader();
895 r.read(&make_frame(true, 0x1, b"Hello")).unwrap();
896 match r.next().unwrap().unwrap() {
897 Message::Text(s) => assert_eq!(s, "Hello"),
898 other => panic!("expected Text, got {other:?}"),
899 }
900 }
901
902 #[test]
903 fn binary_message() {
904 let mut r = client_reader();
905 r.read(&make_frame(true, 0x2, &[0xDE, 0xAD])).unwrap();
906 match r.next().unwrap().unwrap() {
907 Message::Binary(b) => assert_eq!(b, &[0xDE, 0xAD]),
908 other => panic!("expected Binary, got {other:?}"),
909 }
910 }
911
912 #[test]
913 fn empty_text() {
914 let mut r = client_reader();
915 r.read(&make_frame(true, 0x1, b"")).unwrap();
916 match r.next().unwrap().unwrap() {
917 Message::Text(s) => assert_eq!(s, ""),
918 other => panic!("expected empty Text, got {other:?}"),
919 }
920 }
921
922 #[test]
923 fn masked_text() {
924 let mut r = server_reader();
925 let mask = [0x37, 0xFA, 0x21, 0x3D];
926 r.read(&make_masked_frame(true, 0x1, b"Hello", mask))
927 .unwrap();
928 match r.next().unwrap().unwrap() {
929 Message::Text(s) => assert_eq!(s, "Hello"),
930 other => panic!("expected Text, got {other:?}"),
931 }
932 }
933
934 #[test]
937 fn two_fragments() {
938 let mut r = client_reader();
939 r.read(&make_frame(false, 0x1, b"Hel")).unwrap();
940 r.read(&make_frame(true, 0x0, b"lo")).unwrap();
941 match r.next().unwrap().unwrap() {
943 Message::Text(s) => assert_eq!(s, "Hello"),
944 other => panic!("expected Text, got {other:?}"),
945 }
946 }
947
948 #[test]
949 fn three_binary_fragments() {
950 let mut r = client_reader();
951 r.read(&make_frame(false, 0x2, b"AB")).unwrap();
952 r.read(&make_frame(false, 0x0, b"CD")).unwrap();
953 r.read(&make_frame(true, 0x0, b"EF")).unwrap();
954 match r.next().unwrap().unwrap() {
956 Message::Binary(b) => assert_eq!(b, b"ABCDEF"),
957 other => panic!("expected Binary, got {other:?}"),
958 }
959 }
960
961 #[test]
964 fn ping_during_assembly() {
965 let mut r = client_reader();
966 r.read(&make_frame(false, 0x1, b"Hel")).unwrap();
967 r.read(&make_frame(true, 0x9, b"ping")).unwrap();
968 r.read(&make_frame(true, 0x0, b"lo")).unwrap();
969
970 match r.next().unwrap().unwrap() {
972 Message::Ping(p) => assert_eq!(p, b"ping"),
973 other => panic!("expected Ping, got {other:?}"),
974 }
975 match r.next().unwrap().unwrap() {
977 Message::Text(s) => assert_eq!(s, "Hello"),
978 other => panic!("expected Text, got {other:?}"),
979 }
980 }
981
982 #[test]
985 fn close_with_code_and_reason() {
986 let mut r = client_reader();
987 let mut payload = vec![];
988 payload.extend_from_slice(&1000u16.to_be_bytes());
989 payload.extend_from_slice(b"goodbye");
990 r.read(&make_frame(true, 0x8, &payload)).unwrap();
991 match r.next().unwrap().unwrap() {
992 Message::Close(cf) => {
993 assert_eq!(cf.code, CloseCode::Normal);
994 assert_eq!(cf.reason, "goodbye");
995 }
996 other => panic!("expected Close, got {other:?}"),
997 }
998 }
999
1000 #[test]
1001 fn close_no_body() {
1002 let mut r = client_reader();
1003 r.read(&make_frame(true, 0x8, b"")).unwrap();
1004 match r.next().unwrap().unwrap() {
1005 Message::Close(cf) => {
1006 assert_eq!(cf.code, CloseCode::NoStatus);
1007 assert_eq!(cf.reason, "");
1008 }
1009 other => panic!("expected Close, got {other:?}"),
1010 }
1011 }
1012
1013 #[test]
1014 fn close_code_only() {
1015 let mut r = client_reader();
1016 r.read(&make_frame(true, 0x8, &1001u16.to_be_bytes()))
1017 .unwrap();
1018 match r.next().unwrap().unwrap() {
1019 Message::Close(cf) => {
1020 assert_eq!(cf.code, CloseCode::GoingAway);
1021 assert_eq!(cf.reason, "");
1022 }
1023 other => panic!("expected Close, got {other:?}"),
1024 }
1025 }
1026
1027 #[test]
1028 fn close_invalid_code() {
1029 let mut r = client_reader();
1030 r.read(&make_frame(true, 0x8, &999u16.to_be_bytes()))
1031 .unwrap();
1032 assert!(matches!(
1033 r.next(),
1034 Err(ProtocolError::InvalidCloseCode(999))
1035 ));
1036 }
1037
1038 #[test]
1039 fn close_invalid_utf8_reason() {
1040 let mut r = client_reader();
1041 let mut payload = vec![];
1042 payload.extend_from_slice(&1000u16.to_be_bytes());
1043 payload.extend_from_slice(&[0xFF, 0xFE]); r.read(&make_frame(true, 0x8, &payload)).unwrap();
1045 assert!(matches!(
1046 r.next(),
1047 Err(ProtocolError::InvalidUtf8InCloseReason)
1048 ));
1049 }
1050
1051 #[test]
1052 fn close_too_short() {
1053 let mut r = client_reader();
1054 r.read(&make_frame(true, 0x8, &[0x03])).unwrap(); assert!(matches!(r.next(), Err(ProtocolError::CloseFrameTooShort)));
1056 }
1057
1058 #[test]
1061 fn invalid_utf8_text() {
1062 let mut r = client_reader();
1063 r.read(&make_frame(true, 0x1, &[0xFF, 0xFE])).unwrap();
1064 assert!(matches!(r.next(), Err(ProtocolError::InvalidUtf8)));
1065 }
1066
1067 #[test]
1068 fn multibyte_utf8_across_fragments() {
1069 let mut r = client_reader();
1070 r.read(&make_frame(false, 0x1, &[0xC3])).unwrap();
1072 r.read(&make_frame(true, 0x0, &[0xA9])).unwrap();
1073 match r.next().unwrap().unwrap() {
1075 Message::Text(s) => assert_eq!(s, "é"),
1076 other => panic!("expected Text, got {other:?}"),
1077 }
1078 }
1079
1080 #[test]
1083 fn partial_header() {
1084 let mut r = client_reader();
1085 let frame = make_frame(true, 0x1, b"Hello");
1086 r.read(&frame[..1]).unwrap();
1087 assert!(r.next().unwrap().is_none());
1088 r.read(&frame[1..]).unwrap();
1089 assert!(matches!(r.next().unwrap().unwrap(), Message::Text("Hello")));
1090 }
1091
1092 #[test]
1093 fn payload_spans_reads() {
1094 let mut r = client_reader();
1095 let frame = make_frame(true, 0x1, b"Hello, World!");
1096 r.read(&frame[..7]).unwrap();
1097 assert!(r.next().unwrap().is_none());
1098 r.read(&frame[7..]).unwrap();
1099 assert!(matches!(
1100 r.next().unwrap().unwrap(),
1101 Message::Text("Hello, World!")
1102 ));
1103 }
1104
1105 #[test]
1108 fn two_messages_one_read() {
1109 let mut r = client_reader();
1110 let mut data = make_frame(true, 0x1, b"one");
1111 data.extend_from_slice(&make_frame(true, 0x1, b"two"));
1112 r.read(&data).unwrap();
1113
1114 assert!(matches!(r.next().unwrap().unwrap(), Message::Text("one")));
1115 assert!(matches!(r.next().unwrap().unwrap(), Message::Text("two")));
1116 }
1117
1118 #[test]
1121 fn invalid_opcode() {
1122 let mut r = client_reader();
1123 r.read(&make_frame(true, 0x3, b"x")).unwrap();
1125 assert!(matches!(r.next(), Err(ProtocolError::InvalidOpcode(0x3))));
1126 }
1127
1128 #[test]
1129 fn invalid_opcode_0x0f() {
1130 let mut r = client_reader();
1131 r.read(&make_frame(true, 0xF, b"x")).unwrap();
1133 assert!(matches!(r.next(), Err(ProtocolError::InvalidOpcode(0xF))));
1134 }
1135
1136 #[test]
1137 fn payload_too_large() {
1138 let mut r = FrameReader::builder()
1139 .role(Role::Client)
1140 .max_frame_size(64)
1141 .buffer_capacity(256)
1142 .build();
1143 r.read(&make_frame(true, 0x1, &[b'x'; 100])).unwrap();
1144 assert!(matches!(
1145 r.next(),
1146 Err(ProtocolError::PayloadTooLarge { size: 100, max: 64 })
1147 ));
1148 }
1149
1150 #[test]
1151 fn masked_from_server() {
1152 let mut r = client_reader();
1153 r.read(&make_masked_frame(true, 0x1, b"x", [1, 2, 3, 4]))
1154 .unwrap();
1155 assert!(matches!(
1156 r.next(),
1157 Err(ProtocolError::MaskedFrameFromServer)
1158 ));
1159 }
1160
1161 #[test]
1162 fn unmasked_from_client() {
1163 let mut r = server_reader();
1164 r.read(&make_frame(true, 0x1, b"x")).unwrap();
1165 assert!(matches!(
1166 r.next(),
1167 Err(ProtocolError::UnmaskedFrameFromClient)
1168 ));
1169 }
1170
1171 #[test]
1172 fn reserved_bits() {
1173 let mut r = client_reader();
1174 let mut frame = make_frame(true, 0x1, b"x");
1175 frame[0] |= 0x40;
1176 r.read(&frame).unwrap();
1177 assert!(matches!(
1178 r.next(),
1179 Err(ProtocolError::ReservedBitsSet { .. })
1180 ));
1181 }
1182
1183 #[test]
1184 fn continuation_without_start() {
1185 let mut r = client_reader();
1186 r.read(&make_frame(true, 0x0, b"orphan")).unwrap();
1187 assert!(matches!(
1188 r.next(),
1189 Err(ProtocolError::ContinuationWithoutStart)
1190 ));
1191 }
1192
1193 #[test]
1194 fn new_message_during_assembly() {
1195 let mut r = client_reader();
1196 r.read(&make_frame(false, 0x1, b"start")).unwrap();
1197 r.read(&make_frame(true, 0x1, b"new")).unwrap();
1198 assert!(matches!(
1200 r.next(),
1201 Err(ProtocolError::NewMessageDuringAssembly)
1202 ));
1203 }
1204
1205 #[test]
1206 fn message_too_large() {
1207 let mut r = FrameReader::builder()
1208 .role(Role::Client)
1209 .max_message_size(10)
1210 .build();
1211 r.read(&make_frame(true, 0x1, b"way too long!!")).unwrap();
1212 assert!(matches!(
1213 r.next(),
1214 Err(ProtocolError::MessageTooLarge { .. })
1215 ));
1216 }
1217
1218 #[test]
1219 fn control_frame_too_large() {
1220 let mut r = client_reader();
1221 r.read(&make_frame(true, 0x9, &[0; 126])).unwrap();
1222 assert!(matches!(
1223 r.next(),
1224 Err(ProtocolError::ControlFrameTooLarge { .. })
1225 ));
1226 }
1227
1228 #[test]
1229 fn fragmented_control() {
1230 let mut r = client_reader();
1231 r.read(&make_frame(false, 0x9, b"ping")).unwrap();
1232 assert!(matches!(
1233 r.next(),
1234 Err(ProtocolError::FragmentedControlFrame)
1235 ));
1236 }
1237
1238 #[test]
1241 fn message_into_owned() {
1242 let mut r = client_reader();
1243 r.read(&make_frame(true, 0x1, b"owned")).unwrap();
1244 let msg = r.next().unwrap().unwrap();
1245 let owned = msg.into_owned();
1246 assert!(matches!(owned, super::super::message::OwnedMessage::Text(s) if s == "owned"));
1247 }
1248
1249 #[test]
1252 fn buffer_full() {
1253 let mut r = FrameReader::builder()
1254 .role(Role::Client)
1255 .buffer_capacity(16)
1256 .build();
1257 assert!(matches!(
1258 r.read(&[0; 32]),
1259 Err(ReadError::BufferFull { .. })
1260 ));
1261 }
1262
1263 #[test]
1266 fn reset_then_new_message() {
1267 let mut r = client_reader();
1268 r.read(&make_frame(false, 0x1, b"partial")).unwrap();
1269 let _ = r.next();
1270 r.reset();
1271 assert_eq!(r.buffered(), 0);
1272 r.read(&make_frame(true, 0x1, b"fresh")).unwrap();
1274 assert!(matches!(r.next().unwrap().unwrap(), Message::Text("fresh")));
1275 }
1276
1277 #[test]
1280 fn spare_filled_path() {
1281 let mut r = client_reader();
1282 let frame = make_frame(true, 0x1, b"direct");
1283 let spare = r.spare();
1284 spare[..frame.len()].copy_from_slice(&frame);
1285 r.filled(frame.len());
1286 assert!(matches!(
1287 r.next().unwrap().unwrap(),
1288 Message::Text("direct")
1289 ));
1290 }
1291
1292 #[test]
1295 fn masked_payload_spans_reads() {
1296 let mut r = server_reader();
1297 let mask = [0x37, 0xFA, 0x21, 0x3D];
1298 let frame = make_masked_frame(true, 0x1, b"Hello, World!", mask);
1299 let split = 10;
1301 r.read(&frame[..split]).unwrap();
1302 assert!(r.next().unwrap().is_none());
1303 r.read(&frame[split..]).unwrap();
1304 assert!(matches!(
1305 r.next().unwrap().unwrap(),
1306 Message::Text("Hello, World!")
1307 ));
1308 }
1309
1310 #[test]
1313 fn multiple_controls_during_assembly() {
1314 let mut r = client_reader();
1315 r.read(&make_frame(false, 0x1, b"Hel")).unwrap();
1316 r.read(&make_frame(true, 0x9, b"ping1")).unwrap();
1317 r.read(&make_frame(true, 0xA, b"pong1")).unwrap();
1318 r.read(&make_frame(true, 0x0, b"lo")).unwrap();
1319
1320 match r.next().unwrap().unwrap() {
1321 Message::Ping(p) => assert_eq!(p, b"ping1"),
1322 other => panic!("expected Ping, got {other:?}"),
1323 }
1324 match r.next().unwrap().unwrap() {
1325 Message::Pong(p) => assert_eq!(p, b"pong1"),
1326 other => panic!("expected Pong, got {other:?}"),
1327 }
1328 match r.next().unwrap().unwrap() {
1329 Message::Text(s) => assert_eq!(s, "Hello"),
1330 other => panic!("expected Text, got {other:?}"),
1331 }
1332 }
1333
1334 #[test]
1337 fn msg_buf_clear_retains_capacity() {
1338 let mut r = FrameReader::builder()
1339 .role(Role::Client)
1340 .message_capacity(64)
1341 .buffer_capacity(128 * 1024)
1342 .max_frame_size(128 * 1024)
1343 .max_message_size(128 * 1024)
1344 .build();
1345
1346 let big_payload = vec![0x42; 512];
1347 r.read(&make_frame(false, 0x2, &big_payload[..256]))
1348 .unwrap();
1349 r.read(&make_frame(true, 0x0, &big_payload[256..])).unwrap();
1350
1351 let msg = r.next().unwrap().unwrap();
1352 assert!(matches!(&msg, Message::Binary(b) if b.len() == 512));
1353 let _ = msg;
1354
1355 assert!(r.next().unwrap().is_none());
1358 assert!(r.msg_buf.capacity() >= 512);
1359 assert!(r.msg_buf.is_empty());
1360 }
1361
1362 #[test]
1365 fn extended_64bit_length() {
1366 let mut r = FrameReader::builder()
1367 .role(Role::Client)
1368 .buffer_capacity(128 * 1024)
1369 .max_frame_size(128 * 1024)
1370 .max_message_size(128 * 1024)
1371 .build();
1372
1373 let payload = vec![0x42; 70_000];
1374 let frame = make_frame(true, 0x2, &payload);
1375 r.read(&frame).unwrap();
1376 match r.next().unwrap().unwrap() {
1377 Message::Binary(b) => assert_eq!(b.len(), 70_000),
1378 other => panic!("expected Binary, got {other:?}"),
1379 }
1380 }
1381
1382 #[test]
1385 fn buffer_full_diagnostics() {
1386 let mut r = FrameReader::builder()
1387 .role(Role::Client)
1388 .buffer_capacity(16)
1389 .build();
1390 match r.read(&[0; 32]) {
1391 Err(ReadError::BufferFull { needed, available }) => {
1392 assert_eq!(needed, 32);
1393 assert_eq!(available, 16);
1394 }
1395 other => panic!("expected BufferFull, got {other:?}"),
1396 }
1397 }
1398
1399 #[test]
1403 fn close_code_1005_rejected_on_wire() {
1404 let mut r = client_reader();
1405 r.read(&make_frame(true, 0x8, &1005u16.to_be_bytes()))
1406 .unwrap();
1407 assert!(matches!(
1408 r.next(),
1409 Err(ProtocolError::InvalidCloseCode(1005))
1410 ));
1411 }
1412
1413 #[test]
1415 fn invalid_utf8_across_fragments() {
1416 let mut r = client_reader();
1417 r.read(&make_frame(false, 0x1, b"valid")).unwrap();
1418 r.read(&make_frame(true, 0x0, &[0xFF])).unwrap();
1419 assert!(matches!(r.next(), Err(ProtocolError::InvalidUtf8)));
1420 }
1421
1422 #[test]
1424 fn invalid_utf8_in_continuation() {
1425 let mut r = client_reader();
1426 r.read(&make_frame(false, 0x1, &[0xCE, 0xBA])).unwrap(); r.read(&make_frame(false, 0x0, &[0xE1, 0xBD])).unwrap(); r.read(&make_frame(true, 0x0, &[0xFF])).unwrap(); assert!(matches!(r.next(), Err(ProtocolError::InvalidUtf8)));
1430 }
1431
1432 #[test]
1434 fn text_65535_bytes() {
1435 let mut r = FrameReader::builder()
1436 .role(Role::Client)
1437 .buffer_capacity(128 * 1024)
1438 .max_message_size(128 * 1024)
1439 .build();
1440 let payload = vec![b'x'; 65535];
1441 r.read(&make_frame(true, 0x1, &payload)).unwrap();
1442 match r.next().unwrap().unwrap() {
1443 Message::Text(s) => assert_eq!(s.len(), 65535),
1444 other => panic!("expected Text, got {other:?}"),
1445 }
1446 }
1447
1448 #[test]
1450 fn text_65536_bytes() {
1451 let mut r = FrameReader::builder()
1452 .role(Role::Client)
1453 .buffer_capacity(128 * 1024)
1454 .max_message_size(128 * 1024)
1455 .build();
1456 let payload = vec![b'x'; 65536];
1457 r.read(&make_frame(true, 0x1, &payload)).unwrap();
1458 match r.next().unwrap().unwrap() {
1459 Message::Text(s) => assert_eq!(s.len(), 65536),
1460 other => panic!("expected Text, got {other:?}"),
1461 }
1462 }
1463
1464 #[test]
1468 fn invalid_utf8_detected_on_first_fragment() {
1469 let mut r = client_reader();
1470 r.read(&make_frame(false, 0x1, &[0xFF, 0xFE])).unwrap();
1471 assert!(matches!(r.next(), Err(ProtocolError::InvalidUtf8)));
1472 }
1473
1474 #[test]
1476 fn invalid_utf8_detected_mid_assembly() {
1477 let mut r = client_reader();
1478 r.read(&make_frame(false, 0x1, b"valid")).unwrap();
1479 r.read(&make_frame(false, 0x0, &[0xFF])).unwrap();
1480 assert!(matches!(r.next(), Err(ProtocolError::InvalidUtf8)));
1482 }
1483
1484 #[test]
1486 fn split_codepoint_across_fragments() {
1487 let mut r = client_reader();
1488 r.read(&make_frame(false, 0x1, &[0xC3])).unwrap();
1490 r.read(&make_frame(true, 0x0, &[0xA9])).unwrap();
1491 match r.next().unwrap().unwrap() {
1492 Message::Text(s) => assert_eq!(s, "é"),
1493 other => panic!("expected Text, got {other:?}"),
1494 }
1495 }
1496
1497 #[test]
1499 fn split_4byte_codepoint() {
1500 let mut r = client_reader();
1501 r.read(&make_frame(false, 0x1, &[0xF0])).unwrap();
1503 r.read(&make_frame(true, 0x0, &[0x9F, 0x98, 0x80])).unwrap();
1504 match r.next().unwrap().unwrap() {
1505 Message::Text(s) => assert_eq!(s, "😀"),
1506 other => panic!("expected Text, got {other:?}"),
1507 }
1508 }
1509
1510 #[test]
1512 fn incomplete_codepoint_at_end() {
1513 let mut r = client_reader();
1514 r.read(&make_frame(true, 0x1, &[0xC3])).unwrap();
1516 assert!(matches!(r.next(), Err(ProtocolError::InvalidUtf8)));
1517 }
1518
1519 #[test]
1521 fn binary_fragments_skip_utf8() {
1522 let mut r = client_reader();
1523 r.read(&make_frame(false, 0x2, &[0xFF, 0xFE])).unwrap();
1524 r.read(&make_frame(true, 0x0, &[0xFD])).unwrap();
1525 match r.next().unwrap().unwrap() {
1526 Message::Binary(b) => assert_eq!(b, &[0xFF, 0xFE, 0xFD]),
1527 other => panic!("expected Binary, got {other:?}"),
1528 }
1529 }
1530
1531 #[test]
1533 fn three_fragments_valid_utf8() {
1534 let mut r = client_reader();
1535 r.read(&make_frame(false, 0x1, &[72, 0xC3])).unwrap();
1538 r.read(&make_frame(false, 0x0, &[0xA9, 108, 108])).unwrap();
1539 r.read(&make_frame(true, 0x0, &[111])).unwrap();
1540 match r.next().unwrap().unwrap() {
1541 Message::Text(s) => assert_eq!(s, "Héllo"),
1542 other => panic!("expected Text, got {other:?}"),
1543 }
1544 }
1545
1546 fn assert_text(result: Result<Option<Message<'_>>, ProtocolError>, expected: &str) {
1549 match result.unwrap().unwrap() {
1550 Message::Text(s) => assert_eq!(s, expected),
1551 other => panic!("expected Text({expected:?}), got {other:?}"),
1552 }
1553 }
1554
1555 fn assert_binary(result: Result<Option<Message<'_>>, ProtocolError>, expected: &[u8]) {
1556 match result.unwrap().unwrap() {
1557 Message::Binary(b) => assert_eq!(b, expected),
1558 other => panic!("expected Binary, got {other:?}"),
1559 }
1560 }
1561
1562 fn assert_ping(result: Result<Option<Message<'_>>, ProtocolError>, expected: &[u8]) {
1563 match result.unwrap().unwrap() {
1564 Message::Ping(b) => assert_eq!(b, expected),
1565 other => panic!("expected Ping, got {other:?}"),
1566 }
1567 }
1568
1569 fn assert_pong(result: Result<Option<Message<'_>>, ProtocolError>, expected: &[u8]) {
1570 match result.unwrap().unwrap() {
1571 Message::Pong(b) => assert_eq!(b, expected),
1572 other => panic!("expected Pong, got {other:?}"),
1573 }
1574 }
1575
1576 #[test]
1577 fn fifo_three_texts_one_read() {
1578 let mut r = client_reader();
1579 let mut data = make_frame(true, 0x1, b"first");
1580 data.extend(&make_frame(true, 0x1, b"second"));
1581 data.extend(&make_frame(true, 0x1, b"third"));
1582 r.read(&data).unwrap();
1583 assert_text(r.next(), "first");
1584 assert_text(r.next(), "second");
1585 assert_text(r.next(), "third");
1586 }
1587
1588 #[test]
1589 fn fifo_mixed_text_binary() {
1590 let mut r = client_reader();
1591 let mut data = make_frame(true, 0x1, b"text1");
1592 data.extend(&make_frame(true, 0x2, &[0x01]));
1593 data.extend(&make_frame(true, 0x1, b"text2"));
1594 data.extend(&make_frame(true, 0x2, &[0x02]));
1595 r.read(&data).unwrap();
1596 assert_text(r.next(), "text1");
1597 assert_binary(r.next(), &[0x01]);
1598 assert_text(r.next(), "text2");
1599 assert_binary(r.next(), &[0x02]);
1600 }
1601
1602 #[test]
1603 fn fifo_single_assembled_single() {
1604 let mut r = client_reader();
1605 let mut data = make_frame(true, 0x1, b"before");
1606 data.extend(&make_frame(false, 0x1, b"frag"));
1607 data.extend(&make_frame(true, 0x0, b"mented"));
1608 data.extend(&make_frame(true, 0x1, b"after"));
1609 r.read(&data).unwrap();
1610 assert_text(r.next(), "before");
1611 assert_text(r.next(), "fragmented");
1612 assert_text(r.next(), "after");
1613 }
1614
1615 #[test]
1616 fn fifo_assembled_then_single() {
1617 let mut r = client_reader();
1618 let mut data = make_frame(false, 0x2, &[0xAA]);
1619 data.extend(&make_frame(true, 0x0, &[0xBB]));
1620 data.extend(&make_frame(true, 0x1, b"after"));
1621 r.read(&data).unwrap();
1622 assert_binary(r.next(), &[0xAA, 0xBB]);
1623 assert_text(r.next(), "after");
1624 }
1625
1626 #[test]
1627 fn fifo_data_ping_data() {
1628 let mut r = client_reader();
1629 let mut data = make_frame(true, 0x1, b"msg1");
1630 data.extend(&make_frame(true, 0x9, b"ping"));
1631 data.extend(&make_frame(true, 0x1, b"msg2"));
1632 r.read(&data).unwrap();
1633 assert_text(r.next(), "msg1");
1634 assert_ping(r.next(), b"ping");
1635 assert_text(r.next(), "msg2");
1636 }
1637
1638 #[test]
1639 fn fifo_assembly_with_control_then_data() {
1640 let mut r = client_reader();
1641 let mut data = make_frame(false, 0x1, b"hel");
1642 data.extend(&make_frame(true, 0x9, b"ping"));
1643 data.extend(&make_frame(true, 0x0, b"lo"));
1644 data.extend(&make_frame(true, 0x1, b"next"));
1645 r.read(&data).unwrap();
1646 assert_ping(r.next(), b"ping");
1647 assert_text(r.next(), "hello");
1648 assert_text(r.next(), "next");
1649 }
1650
1651 #[test]
1652 fn fifo_assembly_with_multiple_controls() {
1653 let mut r = client_reader();
1654 let mut data = make_frame(false, 0x2, &[0x01]);
1655 data.extend(&make_frame(true, 0x9, b"p1"));
1656 data.extend(&make_frame(true, 0xA, b"p2"));
1657 data.extend(&make_frame(true, 0x0, &[0x02]));
1658 data.extend(&make_frame(true, 0x1, b"after"));
1659 r.read(&data).unwrap();
1660 assert_ping(r.next(), b"p1");
1661 assert_pong(r.next(), b"p2");
1662 assert_binary(r.next(), &[0x01, 0x02]);
1663 assert_text(r.next(), "after");
1664 }
1665
1666 #[test]
1667 fn fifo_across_reads() {
1668 let mut r = client_reader();
1669 let frame1 = make_frame(true, 0x1, b"first");
1670 let frame2 = make_frame(true, 0x1, b"second");
1671 r.read(&frame1).unwrap();
1672 assert_text(r.next(), "first");
1673 r.read(&frame2).unwrap();
1674 assert_text(r.next(), "second");
1675 }
1676
1677 #[test]
1678 fn fifo_partial_then_complete() {
1679 let mut r = client_reader();
1680 let frame1 = make_frame(true, 0x1, b"first");
1681 let frame2 = make_frame(true, 0x1, b"second");
1682 let mut all = frame1;
1683 all.extend(&frame2);
1684 r.read(&all[..3]).unwrap();
1685 assert!(r.next().unwrap().is_none());
1686 r.read(&all[3..]).unwrap();
1687 assert_text(r.next(), "first");
1688 assert_text(r.next(), "second");
1689 }
1690
1691 #[test]
1692 fn fifo_100_messages_one_read() {
1693 let mut r = FrameReader::builder()
1694 .role(Role::Client)
1695 .buffer_capacity(256 * 1024)
1696 .build();
1697
1698 let mut data = Vec::new();
1699 for i in 0u32..100 {
1700 let payload = i.to_be_bytes();
1701 data.extend(&make_frame(true, 0x2, &payload));
1702 }
1703 r.read(&data).unwrap();
1704
1705 for i in 0u32..100 {
1706 match r.next().unwrap().unwrap() {
1707 Message::Binary(b) => {
1708 let val = u32::from_be_bytes(b.try_into().unwrap());
1709 assert_eq!(val, i, "message {i} out of order");
1710 }
1711 other => panic!("expected Binary, got {other:?}"),
1712 }
1713 }
1714 assert!(r.next().unwrap().is_none());
1715 }
1716
1717 #[test]
1722 fn should_compact_default_half() {
1723 let mut r = FrameReader::builder()
1724 .buffer_capacity(1024)
1725 .role(Role::Client)
1726 .build();
1727 assert!(!r.should_compact());
1729
1730 let mut data = make_frame(true, 0x2, &[0xAA; 600]);
1734 data.extend_from_slice(&make_frame(true, 0x2, &[0xBB; 10]));
1735 r.read(&data).unwrap();
1736 assert!(r.next().unwrap().is_some());
1737 let _ = r.poll().unwrap();
1739 assert!(r.should_compact());
1741 }
1742
1743 #[test]
1744 fn should_compact_at_one_never_triggers() {
1745 let mut r = FrameReader::builder()
1746 .buffer_capacity(256)
1747 .compact_at(1.0)
1748 .role(Role::Client)
1749 .build();
1750 let frame = make_frame(true, 0x2, &[0xBB; 200]);
1752 r.read(&frame).unwrap();
1753 let _ = r.next().unwrap();
1754 assert!(!r.should_compact());
1756 }
1757
1758 #[test]
1759 fn should_compact_at_zero() {
1760 let mut r = FrameReader::builder()
1761 .buffer_capacity(256)
1762 .compact_at(0.0)
1763 .role(Role::Client)
1764 .build();
1765 assert!(!r.should_compact());
1767
1768 let mut data = make_frame(true, 0x2, &[0xCC; 10]);
1770 data.extend_from_slice(&make_frame(true, 0x2, &[0xDD; 5]));
1771 r.read(&data).unwrap();
1772 assert!(r.next().unwrap().is_some());
1773 let _ = r.poll().unwrap(); assert!(r.should_compact());
1776 }
1777
1778 #[test]
1779 fn should_compact_small_buffer_small_fraction() {
1780 let mut r = FrameReader::builder()
1782 .buffer_capacity(64)
1783 .compact_at(0.1)
1784 .role(Role::Client)
1785 .build();
1786 assert!(!r.should_compact());
1787
1788 let mut data = make_frame(true, 0x2, &[0xDD; 10]);
1790 data.extend_from_slice(&make_frame(true, 0x2, &[0xEE; 5]));
1791 r.read(&data).unwrap();
1792 assert!(r.next().unwrap().is_some());
1793 let _ = r.poll().unwrap(); assert!(r.should_compact());
1796 }
1797}