1use super::error::ProtocolError;
2use super::frame::{RawOpcode, Role};
3use super::mask::apply_mask;
4use super::message::{CloseCode, CloseFrame, Message};
5use crate::buf::ReadBuf;
6
/// Error returned by [`FrameReader::read`] when the input cannot be buffered.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ReadError {
    /// The source slice is larger than the spare space left even after compacting.
    BufferFull {
        /// Number of bytes the caller tried to append.
        needed: usize,
        /// Spare bytes that were available after compaction.
        available: usize,
    },
}
18
19impl std::fmt::Display for ReadError {
20 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
21 match self {
22 Self::BufferFull { needed, available } => {
23 write!(f, "buffer full: need {needed} bytes, {available} available")
24 }
25 }
26 }
27}
28
// The default `Error` methods suffice: `ReadError` wraps no underlying source error.
impl std::error::Error for ReadError {}
30
/// Incremental WebSocket frame parser that turns buffered wire bytes into [`Message`]s.
///
/// Bytes are appended with [`read`](Self::read), [`read_from`](Self::read_from) or the
/// [`spare`](Self::spare) / [`filled`](Self::filled) pair; parsed messages are pulled with
/// [`next`](Self::next).
pub struct FrameReader {
    /// Wire bytes received but not yet consumed by the parser.
    buf: ReadBuf,
    /// Payload bytes copied out of `buf`: fragments under assembly and frames whose payload
    /// arrived across several reads.
    msg_buf: Vec<u8>,
    /// Consumed-byte count at which [`should_compact`](Self::should_compact) may return `true`.
    buf_compact_at: usize,

    /// Whether the parser is waiting for a header or for more payload bytes.
    state: ParseState,
    /// Payload bytes of the current frame that have not arrived yet.
    remaining_payload: usize,
    /// Mask key of the frame whose payload is being streamed, if it is masked.
    mask_key: Option<[u8; 4]>,
    /// Position within the 4-byte mask key (mod 4) that applies to the next payload byte.
    mask_offset: u8,

    /// `true` while a fragmented data message is open.
    assembling: bool,
    /// Opcode of the frame that started the message being assembled.
    assembly_opcode: Option<RawOpcode>,
    /// Length of the `msg_buf` prefix already accepted as UTF-8 during text assembly.
    utf8_valid_up_to: usize,

    /// Which side of the connection this reader is; decides whether frames must be masked.
    role: Role,
    /// Largest payload length a single frame may declare.
    max_frame_size: u64,
    /// Largest total payload a data message may accumulate.
    max_message_size: usize,

    /// Cleanup deferred until the next `next`/`poll` call so the returned message can keep
    /// borrowing its payload.
    pending_cleanup: PendingCleanup,
    /// Opcode found by `poll` and not yet handed out by `next`.
    pending_opcode: Option<RawOpcode>,
    /// Offset in `msg_buf` where a control frame's payload starts when it was streamed in
    /// during assembly.
    ctrl_payload_offset: usize,
}
83
/// Records where the most recently returned payload lives and how to release it.
#[derive(Clone, Copy, Default)]
enum PendingCleanup {
    /// Nothing to release.
    #[default]
    None,
    /// The payload is the first `n` bytes of the read buffer; advance past them.
    AdvanceReadBuf(usize),
    /// The payload is the whole of `msg_buf`; clear it.
    ClearMsgBuf,
    /// The payload is the tail of `msg_buf` from this offset; truncate back to it.
    TruncateMsgBuf(usize),
}
96
/// Position of the parser within the frame stream.
#[derive(Clone, Copy, Default)]
enum ParseState {
    /// Waiting for a complete frame header.
    #[default]
    Head,
    /// Header parsed; the payload of a frame with this opcode and FIN flag is still arriving.
    Payload { opcode: RawOpcode, fin: bool },
}
104
/// Configuration collected by [`FrameReader::builder`] and consumed by `build`.
pub struct FrameReaderBuilder {
    /// First argument handed to `ReadBuf::new`.
    buffer_capacity: usize,
    /// Second argument handed to `ReadBuf::new`.
    // NOTE(review): presumably padding reserved before the data region — confirm in `ReadBuf`.
    pre_padding: usize,
    /// Third argument handed to `ReadBuf::new`.
    // NOTE(review): presumably padding reserved after the data region — confirm in `ReadBuf`.
    post_padding: usize,
    /// Initial capacity of the message assembly buffer.
    prealloc_capacity: usize,
    /// Fraction of `buffer_capacity` used to derive the compaction threshold.
    compact_at: f64,
    /// Largest payload length a single frame may declare.
    max_frame_size: u64,
    /// Largest total payload a data message may accumulate.
    max_message_size: usize,
    /// Connection side the reader acts as.
    role: Role,
}
116
117impl FrameReader {
118 #[must_use]
120 pub fn builder() -> FrameReaderBuilder {
121 FrameReaderBuilder {
122 buffer_capacity: 1024 * 1024,
123 pre_padding: 16,
124 post_padding: 4,
125 prealloc_capacity: 4096,
126 compact_at: 0.5,
127 max_frame_size: 16 * 1024 * 1024,
128 max_message_size: 16 * 1024 * 1024,
129 role: Role::Server,
130 }
131 }
132
133 pub fn read(&mut self, src: &[u8]) -> Result<(), ReadError> {
135 let mut spare = self.buf.spare();
136 if src.len() > spare.len() {
137 self.buf.compact();
139 spare = self.buf.spare();
140 if src.len() > spare.len() {
141 return Err(ReadError::BufferFull {
142 needed: src.len(),
143 available: spare.len(),
144 });
145 }
146 }
147 spare[..src.len()].copy_from_slice(src);
148 self.buf.filled(src.len());
149 Ok(())
150 }
151
152 pub fn read_from<R: std::io::Read>(&mut self, src: &mut R) -> std::io::Result<usize> {
162 let mut spare = self.buf.spare();
163 if spare.is_empty() {
164 self.buf.compact();
166 spare = self.buf.spare();
167 if spare.is_empty() {
168 return Err(std::io::Error::other("frame reader buffer full"));
169 }
170 }
171 let n = src.read(spare)?;
172 self.buf.filled(n);
173 Ok(n)
174 }
175
    /// Returns the writable spare region of the read buffer for callers that fill it directly.
    ///
    /// Follow up with [`filled`](Self::filled) to report how many bytes were written.
    #[inline]
    pub fn spare(&mut self) -> &mut [u8] {
        self.buf.spare()
    }
181
    /// Reports that the first `n` bytes of the spare region now hold received data.
    #[inline]
    pub fn filled(&mut self, n: usize) {
        self.buf.filled(n);
    }
187
    /// Asks the read buffer to compact itself.
    // NOTE(review): presumably this reclaims already-consumed space so `spare` grows — confirm
    // against `ReadBuf::compact`.
    #[inline]
    pub fn compact(&mut self) {
        self.buf.compact();
    }
196
197 #[inline]
204 pub fn should_compact(&self) -> bool {
205 let consumed = self.buf.consumed();
206 consumed > 0 && consumed >= self.buf_compact_at && !self.buf.is_empty()
207 }
208
209 #[inline]
211 #[allow(clippy::should_implement_trait)]
212 pub fn next(&mut self) -> Result<Option<Message<'_>>, ProtocolError> {
213 if let Some(opcode) = self.pending_opcode.take() {
215 return self.make_message(opcode);
216 }
217
218 self.do_cleanup();
220
221 self.pump()?
222 .map_or(Ok(None), |opcode| self.make_message(opcode))
223 }
224
225 #[inline]
228 pub fn poll(&mut self) -> Result<bool, ProtocolError> {
229 if self.pending_opcode.is_some() {
230 return Ok(true);
231 }
232
233 self.do_cleanup();
234
235 match self.pump()? {
236 None => Ok(false),
237 Some(opcode) => {
238 self.pending_opcode = Some(opcode);
239 Ok(true)
240 }
241 }
242 }
243
    /// Forwards to `ReadBuf::remaining`.
    // NOTE(review): presumably the number of bytes that can still be appended — confirm
    // against `ReadBuf`.
    #[inline]
    pub fn remaining(&self) -> usize {
        self.buf.remaining()
    }
249
    /// Number of bytes in the read buffer that the parser has not consumed yet.
    #[inline]
    pub fn buffered(&self) -> usize {
        self.buf.len()
    }
255
256 pub fn reset(&mut self) {
258 self.buf.clear();
259 self.msg_buf.clear();
260 self.state = ParseState::Head;
261 self.remaining_payload = 0;
262 self.mask_key = None;
263 self.mask_offset = 0;
264 self.assembling = false;
265 self.assembly_opcode = None;
266 self.utf8_valid_up_to = 0;
267 self.pending_cleanup = PendingCleanup::None;
268 self.pending_opcode = None;
269 self.ctrl_payload_offset = 0;
270 }
271
272 #[inline]
278 fn do_cleanup(&mut self) {
279 match self.pending_cleanup {
280 PendingCleanup::None => return,
281 PendingCleanup::AdvanceReadBuf(n) => {
282 self.buf.advance(n);
283 }
284 PendingCleanup::ClearMsgBuf => {
285 self.do_cleanup_msg_buf();
286 }
287 PendingCleanup::TruncateMsgBuf(len) => {
288 self.msg_buf.truncate(len);
289 }
290 }
291 self.pending_cleanup = PendingCleanup::None;
292 }
293
    /// Empties the message buffer; kept out of line so `do_cleanup` stays small.
    #[cold]
    fn do_cleanup_msg_buf(&mut self) {
        self.msg_buf.clear();
    }
300
301 #[inline]
307 fn pump(&mut self) -> Result<Option<RawOpcode>, ProtocolError> {
308 loop {
309 let state = self.state;
310 match state {
311 ParseState::Payload { opcode, fin } => {
312 let available = self.buf.len();
314 if available == 0 {
315 return Ok(None);
316 }
317
318 let take = available.min(self.remaining_payload);
319 self.consume_partial_payload(take);
320
321 if self.remaining_payload == 0 {
322 self.state = ParseState::Head;
323 if let Some(completed) = self.route_opcode(opcode, fin)? {
324 if opcode.is_control() && self.assembling {
325 self.pending_cleanup =
327 PendingCleanup::TruncateMsgBuf(self.ctrl_payload_offset);
328 } else {
329 self.pending_cleanup = PendingCleanup::ClearMsgBuf;
330 }
331 return Ok(Some(completed));
332 }
333 continue;
334 }
335 return Ok(None);
336 }
337
338 ParseState::Head => {
339 let data_len = self.buf.len();
340 if data_len < 2 {
341 return Ok(None);
342 }
343
344 let byte1 = self.buf.data()[1];
345 let header_size = Self::header_size(byte1);
346 if data_len < header_size {
347 return Ok(None);
348 }
349
350 let parsed = {
351 let data = self.buf.data();
352 self.parse_header(&data[..header_size])?
353 };
354
355 let is_control = parsed.opcode.is_control();
356
357 if !is_control {
358 let total = self.msg_buf.len() + parsed.payload_len;
359 if total > self.max_message_size {
360 return Err(ProtocolError::MessageTooLarge {
361 accumulated: total,
362 max: self.max_message_size,
363 });
364 }
365 }
366
367 self.buf.advance(header_size);
369
370 let available = self.buf.len();
371
372 if available >= parsed.payload_len {
373 let payload_len = parsed.payload_len;
375
376 if let Some(mask) = parsed.mask_key {
378 if payload_len > 0 {
379 let data = &mut self.buf.data_mut()[..payload_len];
380 apply_mask(data, mask);
381 }
382 }
383
384 let is_single = parsed.fin && !self.assembling;
385
386 if is_single || is_control {
387 if let Some(completed) = self.route_opcode(parsed.opcode, parsed.fin)? {
390 self.pending_cleanup = PendingCleanup::AdvanceReadBuf(payload_len);
391 return Ok(Some(completed));
392 }
393 self.buf.advance(payload_len);
396 continue;
397 }
398
399 let data = &self.buf.data()[..payload_len];
401 self.msg_buf.extend_from_slice(data);
402 self.buf.advance(payload_len);
403
404 if let Some(completed) = self.route_opcode(parsed.opcode, parsed.fin)? {
405 self.pending_cleanup = PendingCleanup::ClearMsgBuf;
406 return Ok(Some(completed));
407 }
408 continue;
409 }
410
411 self.remaining_payload = parsed.payload_len;
413 self.mask_key = parsed.mask_key;
414 self.mask_offset = 0;
415
416 if parsed.opcode.is_control() && self.assembling {
418 self.ctrl_payload_offset = self.msg_buf.len();
419 }
420
421 if available > 0 {
422 self.consume_partial_payload(available);
423 }
424
425 self.state = ParseState::Payload {
426 opcode: parsed.opcode,
427 fin: parsed.fin,
428 };
429 return Ok(None);
430 }
431 }
432 }
433 }
434
435 #[inline(always)]
438 fn route_opcode(
439 &mut self,
440 opcode: RawOpcode,
441 fin: bool,
442 ) -> Result<Option<RawOpcode>, ProtocolError> {
443 if opcode.is_control() {
444 return Ok(Some(opcode));
445 }
446
447 match opcode {
448 RawOpcode::Text | RawOpcode::Binary => {
449 if self.assembling {
450 return Err(ProtocolError::NewMessageDuringAssembly);
451 }
452 if fin {
453 return Ok(Some(opcode));
454 }
455 self.assembling = true;
457 self.assembly_opcode = Some(opcode);
458 self.utf8_valid_up_to = 0;
459 if opcode == RawOpcode::Text {
460 let pending = validate_utf8_incremental(&self.msg_buf, false)?;
461 self.utf8_valid_up_to = self.msg_buf.len() - pending as usize;
462 }
463 Ok(None)
464 }
465 RawOpcode::Continuation => {
466 if !self.assembling {
467 return Err(ProtocolError::ContinuationWithoutStart);
468 }
469 if self.assembly_opcode == Some(RawOpcode::Text) {
470 let to_check = &self.msg_buf[self.utf8_valid_up_to..];
471 let pending = validate_utf8_incremental(to_check, fin)?;
472 self.utf8_valid_up_to = self.msg_buf.len() - pending as usize;
473 }
474 if fin {
475 self.assembling = false;
476 let opcode = self
477 .assembly_opcode
478 .take()
479 .expect("assembly_opcode must be Some when assembling is true");
480 self.utf8_valid_up_to = 0;
481 return Ok(Some(opcode));
482 }
483 Ok(None)
484 }
485 _ => unreachable!(),
486 }
487 }
488
489 #[inline(always)]
492 fn make_message(&self, opcode: RawOpcode) -> Result<Option<Message<'_>>, ProtocolError> {
493 let payload = match self.pending_cleanup {
494 PendingCleanup::AdvanceReadBuf(n) => &self.buf.data()[..n],
495 PendingCleanup::TruncateMsgBuf(offset) => &self.msg_buf[offset..],
496 PendingCleanup::ClearMsgBuf | PendingCleanup::None => &self.msg_buf[..],
497 };
498
499 match opcode {
500 RawOpcode::Ping => Ok(Some(Message::Ping(payload))),
501 RawOpcode::Pong => Ok(Some(Message::Pong(payload))),
502 RawOpcode::Close => Self::parse_close_from(payload),
503 RawOpcode::Text => {
504 let s = match self.pending_cleanup {
505 PendingCleanup::ClearMsgBuf => {
506 unsafe { std::str::from_utf8_unchecked(payload) }
516 }
517 _ => {
518 simdutf8::basic::from_utf8(payload)
520 .map_err(|_| ProtocolError::InvalidUtf8)?
521 }
522 };
523 Ok(Some(Message::Text(s)))
524 }
525 RawOpcode::Binary => Ok(Some(Message::Binary(payload))),
526 RawOpcode::Continuation => unreachable!("pump never returns Continuation"),
527 }
528 }
529
530 #[inline]
531 fn header_size(byte1: u8) -> usize {
532 let masked = byte1 & 0x80 != 0;
533 let len_code = byte1 & 0x7F;
534 let base = match len_code {
535 0..=125 => 2,
536 126 => 4,
537 _ => 10,
538 };
539 if masked { base + 4 } else { base }
540 }
541
542 #[inline]
543 fn parse_header(&self, header: &[u8]) -> Result<ParsedHeader, ProtocolError> {
544 let byte0 = header[0];
545 let byte1 = header[1];
546 let fin = byte0 & 0x80 != 0;
547 let rsv = (byte0 >> 4) & 0x07;
548 let opcode_raw = byte0 & 0x0F;
549 let masked = byte1 & 0x80 != 0;
550 let len_code = byte1 & 0x7F;
551
552 if rsv != 0 {
553 return Err(ProtocolError::ReservedBitsSet { bits: rsv });
554 }
555
556 let opcode =
557 RawOpcode::from_u8(opcode_raw).ok_or(ProtocolError::InvalidOpcode(opcode_raw))?;
558
559 match self.role {
560 Role::Server if !masked => return Err(ProtocolError::UnmaskedFrameFromClient),
561 Role::Client if masked => return Err(ProtocolError::MaskedFrameFromServer),
562 _ => {}
563 }
564
565 let (payload_len, mask_offset) = match len_code {
566 0..=125 => (u64::from(len_code), 2),
567 126 => {
568 let len = u16::from_be_bytes([header[2], header[3]]);
569 (u64::from(len), 4)
570 }
571 _ => {
572 let len = u64::from_be_bytes(
573 header[2..10]
574 .try_into()
575 .expect("64-bit length field is 8 bytes"),
576 );
577 (len, 10)
578 }
579 };
580
581 if opcode.is_control() {
582 if payload_len > 125 {
583 return Err(ProtocolError::ControlFrameTooLarge { size: payload_len });
584 }
585 if !fin {
586 return Err(ProtocolError::FragmentedControlFrame);
587 }
588 }
589
590 if payload_len > self.max_frame_size {
591 return Err(ProtocolError::PayloadTooLarge {
592 size: payload_len,
593 max: self.max_frame_size,
594 });
595 }
596
597 let mask_key = if masked {
598 Some([
599 header[mask_offset],
600 header[mask_offset + 1],
601 header[mask_offset + 2],
602 header[mask_offset + 3],
603 ])
604 } else {
605 None
606 };
607
608 let payload_len =
609 usize::try_from(payload_len).map_err(|_| ProtocolError::PayloadTooLarge {
610 size: payload_len,
611 max: self.max_frame_size,
612 })?;
613
614 Ok(ParsedHeader {
615 fin,
616 opcode,
617 mask_key,
618 payload_len,
619 })
620 }
621
622 #[cold]
624 fn consume_partial_payload(&mut self, n: usize) {
625 if n == 0 {
626 return;
627 }
628 if let Some(key) = self.mask_key {
629 let data = &mut self.buf.data_mut()[..n];
630 let offset = self.mask_offset as usize;
631 let rotated = [
632 key[offset % 4],
633 key[(offset + 1) % 4],
634 key[(offset + 2) % 4],
635 key[(offset + 3) % 4],
636 ];
637 apply_mask(data, rotated);
638 self.mask_offset = ((offset + n) % 4) as u8;
639 }
640 let data = &self.buf.data()[..n];
641 self.msg_buf.extend_from_slice(data);
642 self.buf.advance(n);
643 self.remaining_payload -= n;
644 }
645
646 #[cold]
647 fn parse_close_from(buf: &[u8]) -> Result<Option<Message<'_>>, ProtocolError> {
648 if buf.is_empty() {
649 return Ok(Some(Message::Close(CloseFrame {
650 code: CloseCode::NoStatus,
651 reason: "",
652 })));
653 }
654 if buf.len() == 1 {
655 return Err(ProtocolError::CloseFrameTooShort);
656 }
657 let raw_code = u16::from_be_bytes([buf[0], buf[1]]);
658 let code = CloseCode::from_u16(raw_code)?;
659 let reason_bytes = &buf[2..];
660 let reason = simdutf8::compat::from_utf8(reason_bytes)
661 .map_err(|_| ProtocolError::InvalidUtf8InCloseReason)?;
662 Ok(Some(Message::Close(CloseFrame { code, reason })))
663 }
664}
665
/// Fields decoded from one frame header by `FrameReader::parse_header`.
struct ParsedHeader {
    /// FIN bit: this frame ends its message.
    fin: bool,
    /// Opcode carried by the frame.
    opcode: RawOpcode,
    /// Mask key, present when the mask bit was set.
    mask_key: Option<[u8; 4]>,
    /// Declared payload length in bytes.
    payload_len: usize,
}
672
673impl std::fmt::Debug for FrameReader {
674 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
675 f.debug_struct("FrameReader")
676 .field("buffered", &self.buf.len())
677 .field("remaining", &self.buf.remaining())
678 .field("assembling", &self.assembling)
679 .field("role", &self.role)
680 .finish()
681 }
682}
683
684fn validate_utf8_incremental(data: &[u8], is_final: bool) -> Result<u8, ProtocolError> {
690 if data.is_empty() {
691 return Ok(0);
692 }
693
694 if is_final {
695 simdutf8::compat::from_utf8(data).map_err(|_| ProtocolError::InvalidUtf8)?;
696 return Ok(0);
697 }
698
699 match simdutf8::compat::from_utf8(data) {
700 Ok(_) => Ok(0),
701 Err(e) => {
702 let valid_up_to = e.valid_up_to();
703 if e.error_len().is_some() {
704 return Err(ProtocolError::InvalidUtf8);
706 }
707 let pending = data.len() - valid_up_to;
709 if pending > 3 {
710 return Err(ProtocolError::InvalidUtf8);
711 }
712 Ok(pending as u8)
713 }
714 }
715}
716
717impl crate::ParserSink for FrameReader {
721 #[inline]
722 fn spare(&mut self) -> &mut [u8] {
723 FrameReader::spare(self)
724 }
725
726 #[inline]
727 fn filled(&mut self, n: usize) {
728 FrameReader::filled(self, n);
729 }
730}
731
732impl FrameReaderBuilder {
733 #[must_use]
735 pub fn buffer_capacity(mut self, n: usize) -> Self {
736 self.buffer_capacity = n;
737 self
738 }
739
740 #[must_use]
742 pub fn pre_padding(mut self, n: usize) -> Self {
743 self.pre_padding = n;
744 self
745 }
746
747 #[must_use]
749 pub fn post_padding(mut self, n: usize) -> Self {
750 self.post_padding = n;
751 self
752 }
753
754 #[must_use]
756 pub fn message_capacity(mut self, n: usize) -> Self {
757 self.prealloc_capacity = n;
758 self
759 }
760
761 #[must_use]
776 pub fn compact_at(mut self, fraction: f64) -> Self {
777 assert!(
778 (0.0..=1.0).contains(&fraction),
779 "compact_at fraction must be in 0.0..=1.0, got {fraction}"
780 );
781 self.compact_at = fraction;
782 self
783 }
784
785 #[must_use]
787 pub fn max_frame_size(mut self, n: u64) -> Self {
788 self.max_frame_size = n;
789 self
790 }
791
792 #[must_use]
794 pub fn max_message_size(mut self, n: usize) -> Self {
795 self.max_message_size = n;
796 self
797 }
798
799 #[must_use]
801 pub fn role(mut self, r: Role) -> Self {
802 self.role = r;
803 self
804 }
805
806 #[must_use]
808 pub fn build(self) -> FrameReader {
809 let buf_compact_at = if self.compact_at >= 1.0 {
810 usize::MAX
811 } else if self.compact_at <= 0.0 {
812 0
813 } else {
814 (self.buffer_capacity as f64 * self.compact_at).ceil() as usize
815 };
816 FrameReader {
817 buf: ReadBuf::new(self.buffer_capacity, self.pre_padding, self.post_padding),
818 msg_buf: Vec::with_capacity(self.prealloc_capacity),
819 buf_compact_at,
820 state: ParseState::Head,
821 remaining_payload: 0,
822 mask_key: None,
823 mask_offset: 0,
824 assembling: false,
825 assembly_opcode: None,
826 utf8_valid_up_to: 0,
827 role: self.role,
828 max_frame_size: self.max_frame_size,
829 max_message_size: self.max_message_size,
830 pending_cleanup: PendingCleanup::None,
831 pending_opcode: None,
832 ctrl_payload_offset: 0,
833 }
834 }
835}
836
837#[cfg(test)]
838mod tests {
839 use super::*;
840
841 fn make_frame(fin: bool, opcode: u8, payload: &[u8]) -> Vec<u8> {
842 let mut frame = Vec::new();
843 let byte0 = if fin { 0x80 } else { 0x00 } | opcode;
844 frame.push(byte0);
845 if payload.len() <= 125 {
846 frame.push(payload.len() as u8);
847 } else if payload.len() <= 65535 {
848 frame.push(126);
849 frame.extend_from_slice(&(payload.len() as u16).to_be_bytes());
850 } else {
851 frame.push(127);
852 frame.extend_from_slice(&(payload.len() as u64).to_be_bytes());
853 }
854 frame.extend_from_slice(payload);
855 frame
856 }
857
858 fn make_masked_frame(fin: bool, opcode: u8, payload: &[u8], mask: [u8; 4]) -> Vec<u8> {
859 let mut frame = Vec::new();
860 let byte0 = if fin { 0x80 } else { 0x00 } | opcode;
861 frame.push(byte0);
862 let len_byte = if payload.len() <= 125 {
863 payload.len() as u8
864 } else if payload.len() <= 65535 {
865 126
866 } else {
867 127
868 };
869 frame.push(0x80 | len_byte);
870 if payload.len() > 125 && payload.len() <= 65535 {
871 frame.extend_from_slice(&(payload.len() as u16).to_be_bytes());
872 } else if payload.len() > 65535 {
873 frame.extend_from_slice(&(payload.len() as u64).to_be_bytes());
874 }
875 frame.extend_from_slice(&mask);
876 let mut masked = payload.to_vec();
877 apply_mask(&mut masked, mask);
878 frame.extend_from_slice(&masked);
879 frame
880 }
881
882 fn client_reader() -> FrameReader {
883 FrameReader::builder().role(Role::Client).build()
884 }
885
886 fn server_reader() -> FrameReader {
887 FrameReader::builder().role(Role::Server).build()
888 }
889
890 #[test]
893 fn text_message() {
894 let mut r = client_reader();
895 r.read(&make_frame(true, 0x1, b"Hello")).unwrap();
896 match r.next().unwrap().unwrap() {
897 Message::Text(s) => assert_eq!(s, "Hello"),
898 other => panic!("expected Text, got {other:?}"),
899 }
900 }
901
902 #[test]
903 fn binary_message() {
904 let mut r = client_reader();
905 r.read(&make_frame(true, 0x2, &[0xDE, 0xAD])).unwrap();
906 match r.next().unwrap().unwrap() {
907 Message::Binary(b) => assert_eq!(b, &[0xDE, 0xAD]),
908 other => panic!("expected Binary, got {other:?}"),
909 }
910 }
911
912 #[test]
913 fn empty_text() {
914 let mut r = client_reader();
915 r.read(&make_frame(true, 0x1, b"")).unwrap();
916 match r.next().unwrap().unwrap() {
917 Message::Text(s) => assert_eq!(s, ""),
918 other => panic!("expected empty Text, got {other:?}"),
919 }
920 }
921
922 #[test]
923 fn masked_text() {
924 let mut r = server_reader();
925 let mask = [0x37, 0xFA, 0x21, 0x3D];
926 r.read(&make_masked_frame(true, 0x1, b"Hello", mask))
927 .unwrap();
928 match r.next().unwrap().unwrap() {
929 Message::Text(s) => assert_eq!(s, "Hello"),
930 other => panic!("expected Text, got {other:?}"),
931 }
932 }
933
934 #[test]
937 fn two_fragments() {
938 let mut r = client_reader();
939 r.read(&make_frame(false, 0x1, b"Hel")).unwrap();
940 r.read(&make_frame(true, 0x0, b"lo")).unwrap();
941 match r.next().unwrap().unwrap() {
943 Message::Text(s) => assert_eq!(s, "Hello"),
944 other => panic!("expected Text, got {other:?}"),
945 }
946 }
947
948 #[test]
949 fn three_binary_fragments() {
950 let mut r = client_reader();
951 r.read(&make_frame(false, 0x2, b"AB")).unwrap();
952 r.read(&make_frame(false, 0x0, b"CD")).unwrap();
953 r.read(&make_frame(true, 0x0, b"EF")).unwrap();
954 match r.next().unwrap().unwrap() {
956 Message::Binary(b) => assert_eq!(b, b"ABCDEF"),
957 other => panic!("expected Binary, got {other:?}"),
958 }
959 }
960
961 #[test]
964 fn ping_during_assembly() {
965 let mut r = client_reader();
966 r.read(&make_frame(false, 0x1, b"Hel")).unwrap();
967 r.read(&make_frame(true, 0x9, b"ping")).unwrap();
968 r.read(&make_frame(true, 0x0, b"lo")).unwrap();
969
970 match r.next().unwrap().unwrap() {
972 Message::Ping(p) => assert_eq!(p, b"ping"),
973 other => panic!("expected Ping, got {other:?}"),
974 }
975 match r.next().unwrap().unwrap() {
977 Message::Text(s) => assert_eq!(s, "Hello"),
978 other => panic!("expected Text, got {other:?}"),
979 }
980 }
981
982 #[test]
985 fn close_with_code_and_reason() {
986 let mut r = client_reader();
987 let mut payload = vec![];
988 payload.extend_from_slice(&1000u16.to_be_bytes());
989 payload.extend_from_slice(b"goodbye");
990 r.read(&make_frame(true, 0x8, &payload)).unwrap();
991 match r.next().unwrap().unwrap() {
992 Message::Close(cf) => {
993 assert_eq!(cf.code, CloseCode::Normal);
994 assert_eq!(cf.reason, "goodbye");
995 }
996 other => panic!("expected Close, got {other:?}"),
997 }
998 }
999
1000 #[test]
1001 fn close_no_body() {
1002 let mut r = client_reader();
1003 r.read(&make_frame(true, 0x8, b"")).unwrap();
1004 match r.next().unwrap().unwrap() {
1005 Message::Close(cf) => {
1006 assert_eq!(cf.code, CloseCode::NoStatus);
1007 assert_eq!(cf.reason, "");
1008 }
1009 other => panic!("expected Close, got {other:?}"),
1010 }
1011 }
1012
1013 #[test]
1014 fn close_code_only() {
1015 let mut r = client_reader();
1016 r.read(&make_frame(true, 0x8, &1001u16.to_be_bytes()))
1017 .unwrap();
1018 match r.next().unwrap().unwrap() {
1019 Message::Close(cf) => {
1020 assert_eq!(cf.code, CloseCode::GoingAway);
1021 assert_eq!(cf.reason, "");
1022 }
1023 other => panic!("expected Close, got {other:?}"),
1024 }
1025 }
1026
1027 #[test]
1028 fn close_invalid_code() {
1029 let mut r = client_reader();
1030 r.read(&make_frame(true, 0x8, &999u16.to_be_bytes()))
1031 .unwrap();
1032 assert!(matches!(
1033 r.next(),
1034 Err(ProtocolError::InvalidCloseCode(999))
1035 ));
1036 }
1037
1038 #[test]
1039 fn close_invalid_utf8_reason() {
1040 let mut r = client_reader();
1041 let mut payload = vec![];
1042 payload.extend_from_slice(&1000u16.to_be_bytes());
1043 payload.extend_from_slice(&[0xFF, 0xFE]); r.read(&make_frame(true, 0x8, &payload)).unwrap();
1045 assert!(matches!(
1046 r.next(),
1047 Err(ProtocolError::InvalidUtf8InCloseReason)
1048 ));
1049 }
1050
1051 #[test]
1052 fn close_too_short() {
1053 let mut r = client_reader();
1054 r.read(&make_frame(true, 0x8, &[0x03])).unwrap(); assert!(matches!(r.next(), Err(ProtocolError::CloseFrameTooShort)));
1056 }
1057
1058 #[test]
1061 fn invalid_utf8_text() {
1062 let mut r = client_reader();
1063 r.read(&make_frame(true, 0x1, &[0xFF, 0xFE])).unwrap();
1064 assert!(matches!(r.next(), Err(ProtocolError::InvalidUtf8)));
1065 }
1066
1067 #[test]
1068 fn multibyte_utf8_across_fragments() {
1069 let mut r = client_reader();
1070 r.read(&make_frame(false, 0x1, &[0xC3])).unwrap();
1072 r.read(&make_frame(true, 0x0, &[0xA9])).unwrap();
1073 match r.next().unwrap().unwrap() {
1075 Message::Text(s) => assert_eq!(s, "é"),
1076 other => panic!("expected Text, got {other:?}"),
1077 }
1078 }
1079
1080 #[test]
1083 fn partial_header() {
1084 let mut r = client_reader();
1085 let frame = make_frame(true, 0x1, b"Hello");
1086 r.read(&frame[..1]).unwrap();
1087 assert!(r.next().unwrap().is_none());
1088 r.read(&frame[1..]).unwrap();
1089 assert!(matches!(r.next().unwrap().unwrap(), Message::Text("Hello")));
1090 }
1091
1092 #[test]
1093 fn payload_spans_reads() {
1094 let mut r = client_reader();
1095 let frame = make_frame(true, 0x1, b"Hello, World!");
1096 r.read(&frame[..7]).unwrap();
1097 assert!(r.next().unwrap().is_none());
1098 r.read(&frame[7..]).unwrap();
1099 assert!(matches!(
1100 r.next().unwrap().unwrap(),
1101 Message::Text("Hello, World!")
1102 ));
1103 }
1104
1105 #[test]
1108 fn two_messages_one_read() {
1109 let mut r = client_reader();
1110 let mut data = make_frame(true, 0x1, b"one");
1111 data.extend_from_slice(&make_frame(true, 0x1, b"two"));
1112 r.read(&data).unwrap();
1113
1114 assert!(matches!(r.next().unwrap().unwrap(), Message::Text("one")));
1115 assert!(matches!(r.next().unwrap().unwrap(), Message::Text("two")));
1116 }
1117
1118 #[test]
1121 fn masked_from_server() {
1122 let mut r = client_reader();
1123 r.read(&make_masked_frame(true, 0x1, b"x", [1, 2, 3, 4]))
1124 .unwrap();
1125 assert!(matches!(
1126 r.next(),
1127 Err(ProtocolError::MaskedFrameFromServer)
1128 ));
1129 }
1130
1131 #[test]
1132 fn unmasked_from_client() {
1133 let mut r = server_reader();
1134 r.read(&make_frame(true, 0x1, b"x")).unwrap();
1135 assert!(matches!(
1136 r.next(),
1137 Err(ProtocolError::UnmaskedFrameFromClient)
1138 ));
1139 }
1140
1141 #[test]
1142 fn reserved_bits() {
1143 let mut r = client_reader();
1144 let mut frame = make_frame(true, 0x1, b"x");
1145 frame[0] |= 0x40;
1146 r.read(&frame).unwrap();
1147 assert!(matches!(
1148 r.next(),
1149 Err(ProtocolError::ReservedBitsSet { .. })
1150 ));
1151 }
1152
1153 #[test]
1154 fn continuation_without_start() {
1155 let mut r = client_reader();
1156 r.read(&make_frame(true, 0x0, b"orphan")).unwrap();
1157 assert!(matches!(
1158 r.next(),
1159 Err(ProtocolError::ContinuationWithoutStart)
1160 ));
1161 }
1162
1163 #[test]
1164 fn new_message_during_assembly() {
1165 let mut r = client_reader();
1166 r.read(&make_frame(false, 0x1, b"start")).unwrap();
1167 r.read(&make_frame(true, 0x1, b"new")).unwrap();
1168 assert!(matches!(
1170 r.next(),
1171 Err(ProtocolError::NewMessageDuringAssembly)
1172 ));
1173 }
1174
1175 #[test]
1176 fn message_too_large() {
1177 let mut r = FrameReader::builder()
1178 .role(Role::Client)
1179 .max_message_size(10)
1180 .build();
1181 r.read(&make_frame(true, 0x1, b"way too long!!")).unwrap();
1182 assert!(matches!(
1183 r.next(),
1184 Err(ProtocolError::MessageTooLarge { .. })
1185 ));
1186 }
1187
1188 #[test]
1189 fn control_frame_too_large() {
1190 let mut r = client_reader();
1191 r.read(&make_frame(true, 0x9, &[0; 126])).unwrap();
1192 assert!(matches!(
1193 r.next(),
1194 Err(ProtocolError::ControlFrameTooLarge { .. })
1195 ));
1196 }
1197
1198 #[test]
1199 fn fragmented_control() {
1200 let mut r = client_reader();
1201 r.read(&make_frame(false, 0x9, b"ping")).unwrap();
1202 assert!(matches!(
1203 r.next(),
1204 Err(ProtocolError::FragmentedControlFrame)
1205 ));
1206 }
1207
1208 #[test]
1211 fn message_into_owned() {
1212 let mut r = client_reader();
1213 r.read(&make_frame(true, 0x1, b"owned")).unwrap();
1214 let msg = r.next().unwrap().unwrap();
1215 let owned = msg.into_owned();
1216 assert!(matches!(owned, super::super::message::OwnedMessage::Text(s) if s == "owned"));
1217 }
1218
1219 #[test]
1222 fn buffer_full() {
1223 let mut r = FrameReader::builder()
1224 .role(Role::Client)
1225 .buffer_capacity(16)
1226 .build();
1227 assert!(matches!(
1228 r.read(&[0; 32]),
1229 Err(ReadError::BufferFull { .. })
1230 ));
1231 }
1232
1233 #[test]
1236 fn reset_then_new_message() {
1237 let mut r = client_reader();
1238 r.read(&make_frame(false, 0x1, b"partial")).unwrap();
1239 let _ = r.next();
1240 r.reset();
1241 assert_eq!(r.buffered(), 0);
1242 r.read(&make_frame(true, 0x1, b"fresh")).unwrap();
1244 assert!(matches!(r.next().unwrap().unwrap(), Message::Text("fresh")));
1245 }
1246
1247 #[test]
1250 fn spare_filled_path() {
1251 let mut r = client_reader();
1252 let frame = make_frame(true, 0x1, b"direct");
1253 let spare = r.spare();
1254 spare[..frame.len()].copy_from_slice(&frame);
1255 r.filled(frame.len());
1256 assert!(matches!(
1257 r.next().unwrap().unwrap(),
1258 Message::Text("direct")
1259 ));
1260 }
1261
1262 #[test]
1265 fn masked_payload_spans_reads() {
1266 let mut r = server_reader();
1267 let mask = [0x37, 0xFA, 0x21, 0x3D];
1268 let frame = make_masked_frame(true, 0x1, b"Hello, World!", mask);
1269 let split = 10;
1271 r.read(&frame[..split]).unwrap();
1272 assert!(r.next().unwrap().is_none());
1273 r.read(&frame[split..]).unwrap();
1274 assert!(matches!(
1275 r.next().unwrap().unwrap(),
1276 Message::Text("Hello, World!")
1277 ));
1278 }
1279
1280 #[test]
1283 fn multiple_controls_during_assembly() {
1284 let mut r = client_reader();
1285 r.read(&make_frame(false, 0x1, b"Hel")).unwrap();
1286 r.read(&make_frame(true, 0x9, b"ping1")).unwrap();
1287 r.read(&make_frame(true, 0xA, b"pong1")).unwrap();
1288 r.read(&make_frame(true, 0x0, b"lo")).unwrap();
1289
1290 match r.next().unwrap().unwrap() {
1291 Message::Ping(p) => assert_eq!(p, b"ping1"),
1292 other => panic!("expected Ping, got {other:?}"),
1293 }
1294 match r.next().unwrap().unwrap() {
1295 Message::Pong(p) => assert_eq!(p, b"pong1"),
1296 other => panic!("expected Pong, got {other:?}"),
1297 }
1298 match r.next().unwrap().unwrap() {
1299 Message::Text(s) => assert_eq!(s, "Hello"),
1300 other => panic!("expected Text, got {other:?}"),
1301 }
1302 }
1303
1304 #[test]
1307 fn msg_buf_clear_retains_capacity() {
1308 let mut r = FrameReader::builder()
1309 .role(Role::Client)
1310 .message_capacity(64)
1311 .buffer_capacity(128 * 1024)
1312 .max_frame_size(128 * 1024)
1313 .max_message_size(128 * 1024)
1314 .build();
1315
1316 let big_payload = vec![0x42; 512];
1317 r.read(&make_frame(false, 0x2, &big_payload[..256]))
1318 .unwrap();
1319 r.read(&make_frame(true, 0x0, &big_payload[256..])).unwrap();
1320
1321 let msg = r.next().unwrap().unwrap();
1322 assert!(matches!(&msg, Message::Binary(b) if b.len() == 512));
1323 let _ = msg;
1324
1325 assert!(r.next().unwrap().is_none());
1328 assert!(r.msg_buf.capacity() >= 512);
1329 assert!(r.msg_buf.is_empty());
1330 }
1331
1332 #[test]
1335 fn extended_64bit_length() {
1336 let mut r = FrameReader::builder()
1337 .role(Role::Client)
1338 .buffer_capacity(128 * 1024)
1339 .max_frame_size(128 * 1024)
1340 .max_message_size(128 * 1024)
1341 .build();
1342
1343 let payload = vec![0x42; 70_000];
1344 let frame = make_frame(true, 0x2, &payload);
1345 r.read(&frame).unwrap();
1346 match r.next().unwrap().unwrap() {
1347 Message::Binary(b) => assert_eq!(b.len(), 70_000),
1348 other => panic!("expected Binary, got {other:?}"),
1349 }
1350 }
1351
1352 #[test]
1355 fn buffer_full_diagnostics() {
1356 let mut r = FrameReader::builder()
1357 .role(Role::Client)
1358 .buffer_capacity(16)
1359 .build();
1360 match r.read(&[0; 32]) {
1361 Err(ReadError::BufferFull { needed, available }) => {
1362 assert_eq!(needed, 32);
1363 assert_eq!(available, 16);
1364 }
1365 other => panic!("expected BufferFull, got {other:?}"),
1366 }
1367 }
1368
1369 #[test]
1373 fn close_code_1005_rejected_on_wire() {
1374 let mut r = client_reader();
1375 r.read(&make_frame(true, 0x8, &1005u16.to_be_bytes()))
1376 .unwrap();
1377 assert!(matches!(
1378 r.next(),
1379 Err(ProtocolError::InvalidCloseCode(1005))
1380 ));
1381 }
1382
1383 #[test]
1385 fn invalid_utf8_across_fragments() {
1386 let mut r = client_reader();
1387 r.read(&make_frame(false, 0x1, b"valid")).unwrap();
1388 r.read(&make_frame(true, 0x0, &[0xFF])).unwrap();
1389 assert!(matches!(r.next(), Err(ProtocolError::InvalidUtf8)));
1390 }
1391
1392 #[test]
1394 fn invalid_utf8_in_continuation() {
1395 let mut r = client_reader();
1396 r.read(&make_frame(false, 0x1, &[0xCE, 0xBA])).unwrap(); r.read(&make_frame(false, 0x0, &[0xE1, 0xBD])).unwrap(); r.read(&make_frame(true, 0x0, &[0xFF])).unwrap(); assert!(matches!(r.next(), Err(ProtocolError::InvalidUtf8)));
1400 }
1401
/// A text frame of 65,535 ASCII bytes (the largest value a 16-bit length can
/// hold) must be delivered as one `Message::Text` of that length.
#[test]
fn text_65535_bytes() {
    let mut reader = FrameReader::builder()
        .role(Role::Client)
        .buffer_capacity(128 * 1024)
        .max_message_size(128 * 1024)
        .build();

    let body = "x".repeat(65535);
    let wire = make_frame(true, 0x1, body.as_bytes());
    reader.read(&wire).unwrap();

    let message = reader.next().unwrap().unwrap();
    match message {
        Message::Text(text) => assert_eq!(text.len(), 65535),
        other => panic!("expected Text, got {other:?}"),
    }
}
1417
/// A text frame of 65,536 ASCII bytes (one more than a 16-bit length can
/// hold) must be delivered as one `Message::Text` of that length.
#[test]
fn text_65536_bytes() {
    let mut reader = FrameReader::builder()
        .role(Role::Client)
        .buffer_capacity(128 * 1024)
        .max_message_size(128 * 1024)
        .build();

    let body = "x".repeat(65536);
    let wire = make_frame(true, 0x1, body.as_bytes());
    reader.read(&wire).unwrap();

    let message = reader.next().unwrap().unwrap();
    match message {
        Message::Text(text) => assert_eq!(text.len(), 65536),
        other => panic!("expected Text, got {other:?}"),
    }
}
1433
/// Invalid bytes (0xFF 0xFE) in the opening frame of a text message must be
/// reported as `InvalidUtf8` by `next()` even though no closing frame has
/// been supplied.
#[test]
fn invalid_utf8_detected_on_first_fragment() {
    let mut reader = client_reader();

    let opening = make_frame(false, 0x1, b"\xFF\xFE");
    reader.read(&opening).unwrap();

    let outcome = reader.next();
    assert!(matches!(outcome, Err(ProtocolError::InvalidUtf8)));
}
1443
/// A valid opening text frame followed by a continuation frame carrying 0xFF
/// must produce `InvalidUtf8`; neither frame is created with the `true`
/// flag, so the error surfaces before any closing frame is seen.
#[test]
fn invalid_utf8_detected_mid_assembly() {
    let mut reader = client_reader();

    let opening = make_frame(false, 0x1, b"valid");
    let broken = make_frame(false, 0x0, b"\xFF");
    for frame in [opening, broken] {
        reader.read(&frame).unwrap();
    }

    let outcome = reader.next();
    assert!(matches!(outcome, Err(ProtocolError::InvalidUtf8)));
}
1453
/// The two bytes of "é" (0xC3 0xA9) arrive in separate frames of one text
/// message; the assembled message must still decode to "é".
#[test]
fn split_codepoint_across_fragments() {
    let mut reader = client_reader();

    let lead = make_frame(false, 0x1, b"\xC3");
    let trail = make_frame(true, 0x0, b"\xA9");
    for frame in [lead, trail] {
        reader.read(&frame).unwrap();
    }

    let message = reader.next().unwrap().unwrap();
    match message {
        Message::Text(text) => assert_eq!(text, "é"),
        other => panic!("expected Text, got {other:?}"),
    }
}
1466
/// The four-byte encoding of "😀" (0xF0 0x9F 0x98 0x80) is split one byte /
/// three bytes across two frames; the assembled text must equal "😀".
#[test]
fn split_4byte_codepoint() {
    let mut reader = client_reader();

    let lead = make_frame(false, 0x1, b"\xF0");
    let rest = make_frame(true, 0x0, b"\x9F\x98\x80");
    for frame in [lead, rest] {
        reader.read(&frame).unwrap();
    }

    let message = reader.next().unwrap().unwrap();
    match message {
        Message::Text(text) => assert_eq!(text, "😀"),
        other => panic!("expected Text, got {other:?}"),
    }
}
1479
/// A single complete text frame whose only byte is 0xC3 (the lead byte of a
/// two-byte sequence with nothing after it) must be rejected as
/// `InvalidUtf8`.
#[test]
fn incomplete_codepoint_at_end() {
    let mut reader = client_reader();

    let truncated = make_frame(true, 0x1, b"\xC3");
    reader.read(&truncated).unwrap();

    let outcome = reader.next();
    assert!(matches!(outcome, Err(ProtocolError::InvalidUtf8)));
}
1488
/// Bytes that are not valid UTF-8 (0xFF 0xFE, then 0xFD) sent as a two-frame
/// binary message must be delivered untouched as `Message::Binary`.
#[test]
fn binary_fragments_skip_utf8() {
    let mut reader = client_reader();

    let opening = make_frame(false, 0x2, b"\xFF\xFE");
    let closing = make_frame(true, 0x0, b"\xFD");
    for frame in [opening, closing] {
        reader.read(&frame).unwrap();
    }

    let message = reader.next().unwrap().unwrap();
    match message {
        Message::Binary(bytes) => assert_eq!(bytes, &[0xFF, 0xFE, 0xFD]),
        other => panic!("expected Binary, got {other:?}"),
    }
}
1500
/// "Héllo" is sent as three frames, with the two bytes of "é" straddling the
/// first and second frame; the assembled text must equal "Héllo".
#[test]
fn three_fragments_valid_utf8() {
    let mut reader = client_reader();

    let frames = [
        // 'H' plus the lead byte of "é".
        make_frame(false, 0x1, b"H\xC3"),
        // Trailing byte of "é", then "ll".
        make_frame(false, 0x0, b"\xA9ll"),
        // "o"
        make_frame(true, 0x0, b"o"),
    ];
    for frame in &frames {
        reader.read(frame).unwrap();
    }

    let message = reader.next().unwrap().unwrap();
    match message {
        Message::Text(text) => assert_eq!(text, "Héllo"),
        other => panic!("expected Text, got {other:?}"),
    }
}
1515
1516 fn assert_text(result: Result<Option<Message<'_>>, ProtocolError>, expected: &str) {
1519 match result.unwrap().unwrap() {
1520 Message::Text(s) => assert_eq!(s, expected),
1521 other => panic!("expected Text({expected:?}), got {other:?}"),
1522 }
1523 }
1524
1525 fn assert_binary(result: Result<Option<Message<'_>>, ProtocolError>, expected: &[u8]) {
1526 match result.unwrap().unwrap() {
1527 Message::Binary(b) => assert_eq!(b, expected),
1528 other => panic!("expected Binary, got {other:?}"),
1529 }
1530 }
1531
1532 fn assert_ping(result: Result<Option<Message<'_>>, ProtocolError>, expected: &[u8]) {
1533 match result.unwrap().unwrap() {
1534 Message::Ping(b) => assert_eq!(b, expected),
1535 other => panic!("expected Ping, got {other:?}"),
1536 }
1537 }
1538
1539 fn assert_pong(result: Result<Option<Message<'_>>, ProtocolError>, expected: &[u8]) {
1540 match result.unwrap().unwrap() {
1541 Message::Pong(b) => assert_eq!(b, expected),
1542 other => panic!("expected Pong, got {other:?}"),
1543 }
1544 }
1545
/// Three complete text frames handed over in one `read` call must come out
/// of `next()` in the order they were written.
#[test]
fn fifo_three_texts_one_read() {
    let mut reader = client_reader();

    let wire = [
        make_frame(true, 0x1, b"first"),
        make_frame(true, 0x1, b"second"),
        make_frame(true, 0x1, b"third"),
    ]
    .concat();
    reader.read(&wire).unwrap();

    for expected in ["first", "second", "third"] {
        assert_text(reader.next(), expected);
    }
}
1557
/// Alternating text and binary frames delivered in one `read` call must be
/// returned in wire order with their kinds intact.
#[test]
fn fifo_mixed_text_binary() {
    let mut reader = client_reader();

    let text1 = make_frame(true, 0x1, b"text1");
    let bin1 = make_frame(true, 0x2, &[0x01]);
    let text2 = make_frame(true, 0x1, b"text2");
    let bin2 = make_frame(true, 0x2, &[0x02]);
    let wire = [text1, bin1, text2, bin2].concat();
    reader.read(&wire).unwrap();

    assert_text(reader.next(), "text1");
    assert_binary(reader.next(), &[0x01]);
    assert_text(reader.next(), "text2");
    assert_binary(reader.next(), &[0x02]);
}
1571
/// A two-frame text message sandwiched between two single-frame text
/// messages: all three must be delivered in order, with the middle one
/// joined into "fragmented".
#[test]
fn fifo_single_assembled_single() {
    let mut reader = client_reader();

    let mut wire = make_frame(true, 0x1, b"before");
    let trailing = [
        make_frame(false, 0x1, b"frag"),
        make_frame(true, 0x0, b"mented"),
        make_frame(true, 0x1, b"after"),
    ];
    for frame in &trailing {
        wire.extend_from_slice(frame);
    }
    reader.read(&wire).unwrap();

    for expected in ["before", "fragmented", "after"] {
        assert_text(reader.next(), expected);
    }
}
1584
/// A two-frame binary message followed by a single text frame, all in one
/// `read`: the joined binary payload comes first, then the text.
#[test]
fn fifo_assembled_then_single() {
    let mut reader = client_reader();

    let mut wire = make_frame(false, 0x2, &[0xAA]);
    for frame in [
        make_frame(true, 0x0, &[0xBB]),
        make_frame(true, 0x1, b"after"),
    ] {
        wire.extend_from_slice(&frame);
    }
    reader.read(&wire).unwrap();

    assert_binary(reader.next(), &[0xAA, 0xBB]);
    assert_text(reader.next(), "after");
}
1595
/// A ping frame (opcode 0x9) sitting between two text frames must be
/// delivered between them, not ahead of or behind the data.
#[test]
fn fifo_data_ping_data() {
    let mut reader = client_reader();

    let wire = [
        make_frame(true, 0x1, b"msg1"),
        make_frame(true, 0x9, b"ping"),
        make_frame(true, 0x1, b"msg2"),
    ]
    .concat();
    reader.read(&wire).unwrap();

    assert_text(reader.next(), "msg1");
    assert_ping(reader.next(), b"ping");
    assert_text(reader.next(), "msg2");
}
1607
/// A ping interleaved between the two frames of a text message is delivered
/// first (the text is not complete yet), then the joined "hello", then the
/// following standalone text.
#[test]
fn fifo_assembly_with_control_then_data() {
    let mut reader = client_reader();

    let opening = make_frame(false, 0x1, b"hel");
    let ping = make_frame(true, 0x9, b"ping");
    let closing = make_frame(true, 0x0, b"lo");
    let following = make_frame(true, 0x1, b"next");
    let wire = [opening, ping, closing, following].concat();
    reader.read(&wire).unwrap();

    assert_ping(reader.next(), b"ping");
    assert_text(reader.next(), "hello");
    assert_text(reader.next(), "next");
}
1620
/// A ping and a pong interleaved inside a two-frame binary message are
/// delivered in wire order ahead of the joined binary payload, which in turn
/// precedes the trailing text message.
#[test]
fn fifo_assembly_with_multiple_controls() {
    let mut reader = client_reader();

    let wire = [
        make_frame(false, 0x2, &[0x01]),
        make_frame(true, 0x9, b"p1"),
        make_frame(true, 0xA, b"p2"),
        make_frame(true, 0x0, &[0x02]),
        make_frame(true, 0x1, b"after"),
    ]
    .concat();
    reader.read(&wire).unwrap();

    assert_ping(reader.next(), b"p1");
    assert_pong(reader.next(), b"p2");
    assert_binary(reader.next(), &[0x01, 0x02]);
    assert_text(reader.next(), "after");
}
1635
/// Two text frames supplied through separate `read` calls, each drained
/// with `next()` before the following one is fed in, come out in order.
#[test]
fn fifo_across_reads() {
    let mut reader = client_reader();

    for text in ["first", "second"] {
        let frame = make_frame(true, 0x1, text.as_bytes());
        reader.read(&frame).unwrap();
        assert_text(reader.next(), text);
    }
}
1646
/// When only the first three bytes of a two-frame stream have arrived,
/// `next()` yields `None`; once the rest is supplied both texts come out in
/// order.
#[test]
fn fifo_partial_then_complete() {
    let mut reader = client_reader();

    let wire = [
        make_frame(true, 0x1, b"first"),
        make_frame(true, 0x1, b"second"),
    ]
    .concat();
    let (head, tail) = wire.split_at(3);

    reader.read(head).unwrap();
    assert!(reader.next().unwrap().is_none());

    reader.read(tail).unwrap();
    assert_text(reader.next(), "first");
    assert_text(reader.next(), "second");
}
1660
/// One hundred binary frames, each carrying its own index as a big-endian
/// `u32`, are fed in a single `read`; they must be returned in ascending
/// order and nothing may remain afterwards.
#[test]
fn fifo_100_messages_one_read() {
    let mut reader = FrameReader::builder()
        .role(Role::Client)
        .buffer_capacity(256 * 1024)
        .build();

    let wire: Vec<u8> = (0u32..100)
        .flat_map(|index| make_frame(true, 0x2, &index.to_be_bytes()))
        .collect();
    reader.read(&wire).unwrap();

    for i in 0u32..100 {
        let message = reader.next().unwrap().unwrap();
        match message {
            Message::Binary(bytes) => {
                let val = u32::from_be_bytes(bytes.try_into().unwrap());
                assert_eq!(val, i, "message {i} out of order");
            }
            other => panic!("expected Binary, got {other:?}"),
        }
    }

    // The stream must be fully drained.
    assert!(reader.next().unwrap().is_none());
}
1686
/// With a 1024-byte buffer and the builder's default compaction setting,
/// `should_compact()` is false on a fresh reader and true after a frame with
/// a 600-byte payload has been consumed while a second, small frame is still
/// buffered.
#[test]
fn should_compact_default_half() {
    let mut reader = FrameReader::builder()
        .buffer_capacity(1024)
        .role(Role::Client)
        .build();
    assert!(!reader.should_compact());

    let large = make_frame(true, 0x2, &[0xAA; 600]);
    let small = make_frame(true, 0x2, &[0xBB; 10]);
    let wire = [large, small].concat();
    reader.read(&wire).unwrap();

    // Take the large message, then poll once more before checking.
    // NOTE(review): `poll` presumably applies the cleanup left pending by the
    // previous message — confirm against its definition.
    assert!(reader.next().unwrap().is_some());
    let _ = reader.poll().unwrap();

    assert!(reader.should_compact());
}
1712
/// With `compact_at(1.0)` on a 256-byte buffer, consuming a frame with a
/// 200-byte payload must not make `should_compact()` return true.
#[test]
fn should_compact_at_one_never_triggers() {
    let mut reader = FrameReader::builder()
        .buffer_capacity(256)
        .compact_at(1.0)
        .role(Role::Client)
        .build();

    let wire = make_frame(true, 0x2, &[0xBB; 200]);
    reader.read(&wire).unwrap();
    let _ = reader.next().unwrap();

    assert!(!reader.should_compact());
}
1727
/// With `compact_at(0.0)` on a 256-byte buffer, `should_compact()` is false
/// on a fresh reader and true once one of two buffered frames has been
/// consumed.
#[test]
fn should_compact_at_zero() {
    let mut reader = FrameReader::builder()
        .buffer_capacity(256)
        .compact_at(0.0)
        .role(Role::Client)
        .build();
    assert!(!reader.should_compact());

    let first = make_frame(true, 0x2, &[0xCC; 10]);
    let second = make_frame(true, 0x2, &[0xDD; 5]);
    let wire = [first, second].concat();
    reader.read(&wire).unwrap();

    // Take the first message, then poll once more before checking.
    assert!(reader.next().unwrap().is_some());
    let _ = reader.poll().unwrap();

    assert!(reader.should_compact());
}
1747
/// With `compact_at(0.1)` on a 64-byte buffer, `should_compact()` is false
/// on a fresh reader and true once one of two buffered frames has been
/// consumed.
#[test]
fn should_compact_small_buffer_small_fraction() {
    let mut reader = FrameReader::builder()
        .buffer_capacity(64)
        .compact_at(0.1)
        .role(Role::Client)
        .build();
    assert!(!reader.should_compact());

    let first = make_frame(true, 0x2, &[0xDD; 10]);
    let second = make_frame(true, 0x2, &[0xEE; 5]);
    let wire = [first, second].concat();
    reader.read(&wire).unwrap();

    // Take the first message, then poll once more before checking.
    assert!(reader.next().unwrap().is_some());
    let _ = reader.poll().unwrap();

    assert!(reader.should_compact());
}
1767}