1use std::{
2 collections::VecDeque,
3 future::{poll_fn, Future},
4 io, mem,
5 pin::Pin,
6 task::{Context, Poll},
7};
8
9use actix_codec::{Decoder, Encoder};
10use actix_http::{
11 ws::{Codec, Frame, Message, ProtocolError},
12 Payload,
13};
14use actix_web::{
15 web::{Bytes, BytesMut},
16 Error,
17};
18use bytestring::ByteString;
19use futures_core::stream::Stream;
20use tokio::sync::{mpsc::Receiver, oneshot};
21
22use crate::AggregatedMessageStream;
23
24const MAX_CONTROL_PAYLOAD_BYTES: usize = 125;
27const MAX_CLOSE_REASON_BYTES: usize = MAX_CONTROL_PAYLOAD_BYTES - 2;
28
29pub struct StreamingBody {
31 session_rx: Receiver<Message>,
32 connection_closed: Option<oneshot::Sender<()>>,
33 messages: VecDeque<Message>,
34 buf: BytesMut,
35 codec: Codec,
36 closing: bool,
37}
38
39impl StreamingBody {
40 pub(super) fn new(session_rx: Receiver<Message>) -> Self {
41 StreamingBody {
42 session_rx,
43 connection_closed: None,
44 messages: VecDeque::new(),
45 buf: BytesMut::new(),
46 codec: Codec::new(),
47 closing: false,
48 }
49 }
50
51 pub(super) fn with_connection_close_signal(
52 mut self,
53 connection_closed: oneshot::Sender<()>,
54 ) -> Self {
55 self.connection_closed = Some(connection_closed);
56 self
57 }
58}
59
60pub struct MessageStream {
62 payload: Payload,
63 connection_closed: Option<oneshot::Receiver<()>>,
64
65 messages: VecDeque<Message>,
66 buf: BytesMut,
67 codec: Codec,
68 closing: bool,
69}
70
71impl MessageStream {
72 pub(super) fn new(payload: Payload) -> Self {
73 MessageStream {
74 payload,
75 connection_closed: None,
76 messages: VecDeque::new(),
77 buf: BytesMut::new(),
78 codec: Codec::new(),
79 closing: false,
80 }
81 }
82
83 pub(super) fn with_connection_close_signal(
84 mut self,
85 connection_closed: oneshot::Receiver<()>,
86 ) -> Self {
87 self.connection_closed = Some(connection_closed);
88 self
89 }
90
91 #[must_use]
106 pub fn max_frame_size(mut self, max_size: usize) -> Self {
107 self.codec = self.codec.max_size(max_size);
108 self
109 }
110
111 #[must_use]
118 pub fn aggregate_continuations(self) -> AggregatedMessageStream {
119 AggregatedMessageStream::new(self)
120 }
121
122 #[must_use]
135 pub async fn recv(&mut self) -> Option<Result<Message, ProtocolError>> {
136 poll_fn(|cx| Pin::new(&mut *self).poll_next(cx)).await
137 }
138}
139
140impl Stream for StreamingBody {
141 type Item = Result<Bytes, Error>;
142
143 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
144 let this = self.get_mut();
145
146 if this.closing && this.buf.is_empty() {
148 return Poll::Ready(None);
149 }
150
151 if !this.closing {
152 loop {
153 match Pin::new(&mut this.session_rx).poll_recv(cx) {
154 Poll::Ready(Some(msg)) => {
155 this.messages.push_back(msg);
156 }
157 Poll::Ready(None) => {
158 this.closing = true;
159 break;
160 }
161 Poll::Pending => break,
162 }
163 }
164 }
165
166 while let Some(mut msg) = this.messages.pop_front() {
167 let is_close = matches!(msg, Message::Close(_));
168
169 match &mut msg {
173 Message::Close(Some(reason)) => {
174 if let Some(desc) = reason.description.as_mut() {
175 if desc.len() > MAX_CLOSE_REASON_BYTES {
176 let mut end = MAX_CLOSE_REASON_BYTES;
177 while end > 0 && !desc.is_char_boundary(end) {
178 end -= 1;
179 }
180 desc.truncate(end);
181 }
182 }
183 }
184 Message::Ping(bytes) | Message::Pong(bytes) => {
185 if bytes.len() > MAX_CONTROL_PAYLOAD_BYTES {
186 *bytes = bytes.slice(..MAX_CONTROL_PAYLOAD_BYTES);
187 }
188 }
189 _ => {}
190 }
191
192 if let Err(err) = this.codec.encode(msg, &mut this.buf) {
193 return Poll::Ready(Some(Err(err.into())));
194 }
195
196 if is_close {
197 this.closing = true;
200 this.session_rx.close();
201 this.messages.clear();
202 break;
203 }
204 }
205
206 if !this.buf.is_empty() {
207 return Poll::Ready(Some(Ok(mem::take(&mut this.buf).freeze())));
210 }
211
212 if this.closing {
213 return Poll::Ready(None);
214 }
215
216 Poll::Pending
217 }
218}
219
220impl Drop for StreamingBody {
221 fn drop(&mut self) {
222 if let Some(connection_closed) = self.connection_closed.take() {
223 let _ = connection_closed.send(());
224 }
225 }
226}
227
228impl Stream for MessageStream {
229 type Item = Result<Message, ProtocolError>;
230
231 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
232 let this = self.get_mut();
233
234 if let Some(msg) = this.messages.pop_front() {
238 return Poll::Ready(Some(Ok(msg)));
239 }
240
241 if !this.closing {
242 let connection_closed = this
243 .connection_closed
244 .as_mut()
245 .is_some_and(|connection_closed| Pin::new(connection_closed).poll(cx).is_ready());
246
247 if connection_closed {
248 this.closing = true;
249 this.connection_closed = None;
250 }
251
252 loop {
254 match Pin::new(&mut this.payload).poll_next(cx) {
255 Poll::Ready(Some(Ok(bytes))) => {
256 if this.buf.is_empty() {
257 this.buf = BytesMut::from(bytes);
259 } else {
260 this.buf.extend_from_slice(&bytes);
261 }
262 }
263 Poll::Ready(Some(Err(err))) => {
264 this.closing = true;
265 return Poll::Ready(Some(Err(ProtocolError::Io(io::Error::other(err)))));
266 }
267 Poll::Ready(None) => {
268 this.closing = true;
269 break;
270 }
271 Poll::Pending => break,
272 }
273 }
274 }
275
276 while let Some(frame) = this.codec.decode(&mut this.buf)? {
278 let message = match frame {
279 Frame::Text(bytes) => {
280 ByteString::try_from(bytes)
281 .map(Message::Text)
282 .map_err(|err| {
283 ProtocolError::Io(io::Error::new(io::ErrorKind::InvalidData, err))
284 })?
285 }
286 Frame::Binary(bytes) => Message::Binary(bytes),
287 Frame::Ping(bytes) => Message::Ping(bytes),
288 Frame::Pong(bytes) => Message::Pong(bytes),
289 Frame::Close(reason) => Message::Close(reason),
290 Frame::Continuation(item) => Message::Continuation(item),
291 };
292
293 this.messages.push_back(message);
294 }
295
296 if let Some(msg) = this.messages.pop_front() {
298 return Poll::Ready(Some(Ok(msg)));
299 }
300
301 if this.closing {
303 return Poll::Ready(None);
304 }
305
306 Poll::Pending
307 }
308}
309
310#[cfg(test)]
311pub(crate) mod tests {
312 use std::{
313 future::Future,
314 io,
315 pin::Pin,
316 task::{ready, Context, Poll},
317 };
318
319 use actix_http::{error::PayloadError, ws::ProtocolError};
320 use futures_core::Stream;
321 use tokio::sync::{
322 mpsc::{Receiver, Sender},
323 oneshot,
324 };
325
326 use super::{
327 Bytes, BytesMut, Codec, Decoder, Encoder, Message, MessageStream, Payload, StreamingBody,
328 };
329
330 pub(crate) struct PayloadReceiver {
331 rx: Receiver<Bytes>,
332 }
333 pub(crate) struct PayloadSender {
334 codec: Codec,
335 tx: Sender<Bytes>,
336 }
337 impl PayloadSender {
338 pub(crate) async fn send(&mut self, message: Message) {
339 self.send_many(vec![message]).await
340 }
341 pub(crate) async fn send_many(&mut self, messages: Vec<Message>) {
342 let mut buf = BytesMut::new();
343
344 for message in messages {
345 self.codec.encode(message, &mut buf).unwrap();
346 }
347
348 self.tx.send(buf.freeze()).await.unwrap()
349 }
350 }
351 impl Stream for PayloadReceiver {
352 type Item = Result<Bytes, PayloadError>;
353
354 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
355 let opt = ready!(self.get_mut().rx.poll_recv(cx));
356
357 Poll::Ready(opt.map(Ok))
358 }
359 }
360 pub(crate) fn payload_pair(capacity: usize) -> (PayloadSender, Payload) {
361 let (tx, rx) = tokio::sync::mpsc::channel(capacity);
362
363 (
364 PayloadSender {
365 codec: Codec::new().client_mode(),
366 tx,
367 },
368 Payload::Stream {
369 payload: Box::pin(PayloadReceiver { rx }),
370 },
371 )
372 }
373
374 #[tokio::test]
375 async fn message_stream_yields_messages() {
376 std::future::poll_fn(move |cx| {
377 let (mut tx, rx) = payload_pair(8);
378 let message_stream = MessageStream::new(rx);
379 let mut stream = std::pin::pin!(message_stream);
380
381 let messages = [
382 Message::Binary(Bytes::from(vec![0, 1, 2, 3])),
383 Message::Ping(Bytes::from(vec![3, 2, 1, 0])),
384 Message::Close(None),
385 ];
386
387 for msg in messages {
388 let poll = stream.as_mut().poll_next(cx);
389 assert!(
390 poll.is_pending(),
391 "Stream should be pending when no messages are present {poll:?}"
392 );
393
394 let fut = tx.send(msg);
395 let fut = std::pin::pin!(fut);
396
397 assert!(fut.poll(cx).is_ready(), "Sending should not yield");
398 assert!(
399 stream.as_mut().poll_next(cx).is_ready(),
400 "Stream should be ready"
401 );
402 }
403
404 assert!(
405 stream.as_mut().poll_next(cx).is_pending(),
406 "Stream should be pending after processing messages"
407 );
408
409 Poll::Ready(())
410 })
411 .await
412 }
413
414 #[tokio::test]
415 async fn message_stream_yields_consecutive_messages() {
416 std::future::poll_fn(move |cx| {
417 let (mut tx, rx) = payload_pair(8);
418 let message_stream = MessageStream::new(rx);
419 let mut stream = std::pin::pin!(message_stream);
420
421 let messages = vec![
422 Message::Binary(Bytes::from(vec![0, 1, 2, 3])),
423 Message::Ping(Bytes::from(vec![3, 2, 1, 0])),
424 Message::Close(None),
425 ];
426
427 let size = messages.len();
428
429 let fut = tx.send_many(messages);
430 let fut = std::pin::pin!(fut);
431 assert!(fut.poll(cx).is_ready(), "Sending should not yield");
432
433 for _ in 0..size {
434 assert!(
435 stream.as_mut().poll_next(cx).is_ready(),
436 "Stream should be ready"
437 );
438 }
439
440 assert!(
441 stream.as_mut().poll_next(cx).is_pending(),
442 "Stream should be pending after processing messages"
443 );
444
445 Poll::Ready(())
446 })
447 .await
448 }
449
450 #[tokio::test]
451 async fn message_stream_closes() {
452 std::future::poll_fn(move |cx| {
453 let (tx, rx) = payload_pair(8);
454 drop(tx);
455 let message_stream = MessageStream::new(rx);
456 let mut stream = std::pin::pin!(message_stream);
457
458 let poll = stream.as_mut().poll_next(cx);
459 assert!(
460 matches!(poll, Poll::Ready(None)),
461 "Stream should be ready when closing {poll:?}"
462 );
463
464 Poll::Ready(())
465 })
466 .await
467 }
468
469 #[tokio::test]
470 async fn message_stream_closes_after_payload_error() {
471 struct ErrorPayload {
472 yielded_error: bool,
473 }
474
475 impl Stream for ErrorPayload {
476 type Item = Result<Bytes, PayloadError>;
477
478 fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
479 let this = self.get_mut();
480
481 if this.yielded_error {
482 Poll::Ready(None)
483 } else {
484 this.yielded_error = true;
485
486 Poll::Ready(Some(Err(PayloadError::Io(io::Error::new(
487 io::ErrorKind::UnexpectedEof,
488 "simulated abrupt disconnect",
489 )))))
490 }
491 }
492 }
493
494 std::future::poll_fn(move |cx| {
495 let payload = Payload::Stream {
496 payload: Box::pin(ErrorPayload {
497 yielded_error: false,
498 })
499 as Pin<Box<dyn Stream<Item = Result<Bytes, PayloadError>>>>,
500 };
501 let message_stream = MessageStream::new(payload);
502 let mut stream = std::pin::pin!(message_stream);
503
504 let poll = stream.as_mut().poll_next(cx);
505 assert!(
506 matches!(poll, Poll::Ready(Some(Err(ProtocolError::Io(_))))),
507 "stream should surface payload error: {poll:?}"
508 );
509
510 let poll = stream.as_mut().poll_next(cx);
511 assert!(
512 matches!(poll, Poll::Ready(None)),
513 "stream should terminate after payload error: {poll:?}"
514 );
515
516 Poll::Ready(())
517 })
518 .await
519 }
520
521 #[tokio::test]
522 async fn message_stream_closes_when_response_body_drops() {
523 struct PendingPayload;
524
525 impl Stream for PendingPayload {
526 type Item = Result<Bytes, PayloadError>;
527
528 fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
529 Poll::Pending
530 }
531 }
532
533 std::future::poll_fn(move |cx| {
534 let (_tx, rx) = tokio::sync::mpsc::channel(1);
535 let (connection_closed_tx, connection_closed_rx) = oneshot::channel();
536
537 let response_body =
538 StreamingBody::new(rx).with_connection_close_signal(connection_closed_tx);
539
540 let payload = Payload::Stream {
541 payload: Box::pin(PendingPayload)
542 as Pin<Box<dyn Stream<Item = Result<Bytes, PayloadError>>>>,
543 };
544 let message_stream =
545 MessageStream::new(payload).with_connection_close_signal(connection_closed_rx);
546 let mut stream = std::pin::pin!(message_stream);
547
548 let poll = stream.as_mut().poll_next(cx);
549 assert!(
550 poll.is_pending(),
551 "stream should be pending before close signal: {poll:?}"
552 );
553
554 drop(response_body);
555
556 let poll = stream.as_mut().poll_next(cx);
557 assert!(
558 matches!(poll, Poll::Ready(None)),
559 "stream should terminate when response body drops: {poll:?}"
560 );
561
562 Poll::Ready(())
563 })
564 .await
565 }
566
567 #[tokio::test]
568 async fn stream_produces_bytes_from_messages() {
569 std::future::poll_fn(move |cx| {
570 let (tx, rx) = tokio::sync::mpsc::channel(1);
571
572 let stream = StreamingBody::new(rx);
573
574 let messages = [
575 Message::Binary(Bytes::from(vec![0, 1, 2, 3])),
576 Message::Ping(Bytes::from(vec![3, 2, 1, 0])),
577 Message::Close(None),
578 ];
579
580 let mut stream = std::pin::pin!(stream);
581
582 for msg in messages {
583 assert!(
584 stream.as_mut().poll_next(cx).is_pending(),
585 "Stream should be pending when no messages are present"
586 );
587
588 let fut = tx.send(msg);
589 let fut = std::pin::pin!(fut);
590
591 assert!(fut.poll(cx).is_ready(), "Sending should not yield");
592 assert!(
593 stream.as_mut().poll_next(cx).is_ready(),
594 "Stream should be ready"
595 );
596 }
597
598 assert!(
599 matches!(stream.as_mut().poll_next(cx), Poll::Ready(None)),
600 "stream should close after processing close message"
601 );
602
603 Poll::Ready(())
604 })
605 .await;
606 }
607
608 #[tokio::test]
609 async fn stream_processes_many_consecutive_messages() {
610 std::future::poll_fn(move |cx| {
611 let (tx, rx) = tokio::sync::mpsc::channel(3);
612
613 let stream = StreamingBody::new(rx);
614
615 let messages = [
616 Message::Binary(Bytes::from(vec![0, 1, 2, 3])),
617 Message::Ping(Bytes::from(vec![3, 2, 1, 0])),
618 Message::Close(None),
619 ];
620
621 let mut stream = std::pin::pin!(stream);
622
623 assert!(stream.as_mut().poll_next(cx).is_pending());
624
625 for msg in messages {
626 let fut = tx.send(msg);
627 let fut = std::pin::pin!(fut);
628 assert!(fut.poll(cx).is_ready(), "Sending should not yield");
629 }
630
631 assert!(
632 stream.as_mut().poll_next(cx).is_ready(),
633 "Stream should be ready"
634 );
635 assert!(
636 matches!(stream.as_mut().poll_next(cx), Poll::Ready(None)),
637 "stream should close after processing close message"
638 );
639
640 Poll::Ready(())
641 })
642 .await;
643 }
644
645 #[tokio::test]
646 async fn stream_closes_after_close_message_even_if_sender_alive() {
647 std::future::poll_fn(move |cx| {
648 let (tx, rx) = tokio::sync::mpsc::channel(1);
649
650 let stream = StreamingBody::new(rx);
651 let mut stream = std::pin::pin!(stream);
652
653 assert!(
654 stream.as_mut().poll_next(cx).is_pending(),
655 "stream should start pending"
656 );
657
658 {
660 let fut = tx.send(Message::Close(None));
661 let fut = std::pin::pin!(fut);
662 assert!(fut.poll(cx).is_ready(), "Sending should not yield");
663 }
664
665 assert!(
666 stream.as_mut().poll_next(cx).is_ready(),
667 "stream should yield close frame bytes"
668 );
669
670 let poll = stream.as_mut().poll_next(cx);
671 assert!(
672 matches!(poll, Poll::Ready(None)),
673 "stream should close after close frame even if sender is still alive"
674 );
675
676 Poll::Ready(())
677 })
678 .await;
679 }
680
681 #[tokio::test]
682 async fn stream_closes() {
683 std::future::poll_fn(move |cx| {
684 let (tx, rx) = tokio::sync::mpsc::channel(3);
685
686 drop(tx);
687 let stream = StreamingBody::new(rx);
688
689 let mut stream = std::pin::pin!(stream);
690
691 let poll = stream.as_mut().poll_next(cx);
692
693 assert!(
694 matches!(poll, Poll::Ready(None)),
695 "stream should close after dropped tx"
696 );
697
698 Poll::Ready(())
699 })
700 .await;
701 }
702
703 #[tokio::test]
704 async fn stream_truncates_oversized_close_reason() {
705 use actix_http::ws::{CloseCode, CloseReason, Frame};
706 use futures_util::StreamExt as _;
707
708 let (tx, rx) = tokio::sync::mpsc::channel(1);
709 let mut stream = StreamingBody::new(rx);
710
711 let reason = CloseReason {
712 code: CloseCode::Normal,
713 description: Some("a".repeat(200)),
714 };
715
716 tx.send(Message::Close(Some(reason))).await.unwrap();
717
718 let bytes = stream.next().await.unwrap().unwrap();
719
720 assert_eq!(bytes[0], 0x88, "FIN + Close opcode");
721 assert_eq!(bytes[1] & 0x80, 0, "server frames must not be masked");
722 assert_eq!(
723 (bytes[1] & 0x7F) as usize,
724 2 + super::MAX_CLOSE_REASON_BYTES,
725 "Close payload must be limited to 125 bytes (2-byte code + 123-byte reason)"
726 );
727
728 let mut buf = BytesMut::from(&bytes[..]);
729 let mut codec = Codec::new().client_mode();
730
731 let frame = codec.decode(&mut buf).unwrap().unwrap();
732 match frame {
733 Frame::Close(Some(reason)) => {
734 assert_eq!(reason.code, CloseCode::Normal);
735 let desc = reason.description.unwrap();
736 assert_eq!(desc.len(), super::MAX_CLOSE_REASON_BYTES);
737 assert_eq!(desc, "a".repeat(super::MAX_CLOSE_REASON_BYTES));
738 }
739 other => panic!("expected Close frame, got: {other:?}"),
740 }
741
742 assert!(buf.is_empty(), "should decode entire buffer");
743 }
744
745 #[tokio::test]
746 async fn stream_truncates_close_reason_at_utf8_boundary() {
747 use actix_http::ws::{CloseCode, CloseReason, Frame};
748 use futures_util::StreamExt as _;
749
750 let (tx, rx) = tokio::sync::mpsc::channel(1);
751 let mut stream = StreamingBody::new(rx);
752
753 let description = format!("{}{}b", "a".repeat(122), "\u{00E9}");
755 assert!(description.len() > super::MAX_CLOSE_REASON_BYTES);
756
757 let reason = CloseReason {
758 code: CloseCode::Normal,
759 description: Some(description),
760 };
761
762 tx.send(Message::Close(Some(reason))).await.unwrap();
763
764 let bytes = stream.next().await.unwrap().unwrap();
765
766 assert_eq!(bytes[0], 0x88, "FIN + Close opcode");
767 assert_eq!(bytes[1] & 0x80, 0, "server frames must not be masked");
768 assert!(
769 (bytes[1] & 0x7F) <= 125,
770 "control frames must have payload length <= 125"
771 );
772
773 let mut buf = BytesMut::from(&bytes[..]);
774 let mut codec = Codec::new().client_mode();
775
776 let frame = codec.decode(&mut buf).unwrap().unwrap();
777 match frame {
778 Frame::Close(Some(reason)) => {
779 assert_eq!(reason.code, CloseCode::Normal);
780 let desc = reason.description.unwrap();
781 assert_eq!(desc, "a".repeat(122));
782 }
783 other => panic!("expected Close frame, got: {other:?}"),
784 }
785
786 assert!(buf.is_empty(), "should decode entire buffer");
787 }
788
789 #[tokio::test]
790 async fn stream_truncates_oversized_ping_payload() {
791 use actix_http::ws::Frame;
792 use futures_util::StreamExt as _;
793
794 let (tx, rx) = tokio::sync::mpsc::channel(1);
795 let mut stream = StreamingBody::new(rx);
796
797 let payload = Bytes::from(vec![0xAA; 200]);
798 let expected = payload.slice(..super::MAX_CONTROL_PAYLOAD_BYTES);
799
800 tx.send(Message::Ping(payload)).await.unwrap();
801
802 let bytes = stream.next().await.unwrap().unwrap();
803
804 assert_eq!(bytes[0], 0x89, "FIN + Ping opcode");
805 assert_eq!(bytes[1] & 0x80, 0, "server frames must not be masked");
806 assert_eq!(
807 (bytes[1] & 0x7F) as usize,
808 super::MAX_CONTROL_PAYLOAD_BYTES,
809 "Ping payload must be <= 125 bytes"
810 );
811
812 let mut buf = BytesMut::from(&bytes[..]);
813 let mut codec = Codec::new().client_mode();
814
815 let frame = codec.decode(&mut buf).unwrap().unwrap();
816 match frame {
817 Frame::Ping(pl) => {
818 assert_eq!(pl, expected);
819 }
820 other => panic!("expected Ping frame, got: {other:?}"),
821 }
822
823 assert!(buf.is_empty(), "should decode entire buffer");
824 }
825
826 #[tokio::test]
827 async fn stream_truncates_oversized_pong_payload() {
828 use actix_http::ws::Frame;
829 use futures_util::StreamExt as _;
830
831 let (tx, rx) = tokio::sync::mpsc::channel(1);
832 let mut stream = StreamingBody::new(rx);
833
834 let payload = Bytes::from(vec![0xBB; 200]);
835 let expected = payload.slice(..super::MAX_CONTROL_PAYLOAD_BYTES);
836
837 tx.send(Message::Pong(payload)).await.unwrap();
838
839 let bytes = stream.next().await.unwrap().unwrap();
840
841 assert_eq!(bytes[0], 0x8A, "FIN + Pong opcode");
842 assert_eq!(bytes[1] & 0x80, 0, "server frames must not be masked");
843 assert_eq!(
844 (bytes[1] & 0x7F) as usize,
845 super::MAX_CONTROL_PAYLOAD_BYTES,
846 "Pong payload must be <= 125 bytes"
847 );
848
849 let mut buf = BytesMut::from(&bytes[..]);
850 let mut codec = Codec::new().client_mode();
851
852 let frame = codec.decode(&mut buf).unwrap().unwrap();
853 match frame {
854 Frame::Pong(pl) => {
855 assert_eq!(pl, expected);
856 }
857 other => panic!("expected Pong frame, got: {other:?}"),
858 }
859
860 assert!(buf.is_empty(), "should decode entire buffer");
861 }
862}