Skip to main content

actix_ws/
stream.rs

1use std::{
2    collections::VecDeque,
3    future::{poll_fn, Future},
4    io, mem,
5    pin::Pin,
6    task::{Context, Poll},
7};
8
9use actix_codec::{Decoder, Encoder};
10use actix_http::{
11    ws::{Codec, Frame, Message, ProtocolError},
12    Payload,
13};
14use actix_web::{
15    web::{Bytes, BytesMut},
16    Error,
17};
18use bytestring::ByteString;
19use futures_core::stream::Stream;
20use tokio::sync::{mpsc::Receiver, oneshot};
21
22use crate::AggregatedMessageStream;
23
// RFC 6455 §5.5: the payload of any control frame (Close, Ping, Pong) MUST NOT exceed 125 bytes.
const MAX_CONTROL_PAYLOAD_BYTES: usize = 125;
// RFC 6455 §5.5.1: a Close payload begins with a 2-byte (u16) status code, so the optional UTF-8
// reason may use whatever remains of the control-frame budget (123 bytes).
const MAX_CLOSE_REASON_BYTES: usize = MAX_CONTROL_PAYLOAD_BYTES - mem::size_of::<u16>();
28
/// Response body for a WebSocket.
///
/// Its [`Stream`] implementation takes messages from `session_rx`, encodes them with `codec` and
/// yields the encoded bytes.
pub struct StreamingBody {
    /// Receiving half of the channel that delivers the messages to send.
    session_rx: Receiver<Message>,
    /// Sender fired with `()` when this body is dropped; `None` when no signal is attached.
    connection_closed: Option<oneshot::Sender<()>>,
    /// Messages taken from `session_rx` that have not been encoded yet.
    messages: VecDeque<Message>,
    /// Encoded frame bytes waiting to be yielded.
    buf: BytesMut,
    /// Codec that turns messages into WebSocket frames.
    codec: Codec,
    /// Set once the channel has closed or a Close message has been encoded.
    closing: bool,
}
38
39impl StreamingBody {
40    pub(super) fn new(session_rx: Receiver<Message>) -> Self {
41        StreamingBody {
42            session_rx,
43            connection_closed: None,
44            messages: VecDeque::new(),
45            buf: BytesMut::new(),
46            codec: Codec::new(),
47            closing: false,
48        }
49    }
50
51    pub(super) fn with_connection_close_signal(
52        mut self,
53        connection_closed: oneshot::Sender<()>,
54    ) -> Self {
55        self.connection_closed = Some(connection_closed);
56        self
57    }
58}
59
/// Stream of messages from a WebSocket client.
///
/// Its [`Stream`] implementation reads bytes from `payload` and decodes them with `codec`.
pub struct MessageStream {
    /// Request payload that supplies the raw bytes.
    payload: Payload,
    /// Receiver polled while reading; once it resolves the stream is marked as closing and the
    /// receiver is cleared. `None` when no signal is attached.
    connection_closed: Option<oneshot::Receiver<()>>,

    /// Queue of already-decoded messages; drained before anything else when polling.
    messages: VecDeque<Message>,
    /// Bytes read from `payload` that have not been decoded yet.
    buf: BytesMut,
    /// Codec that parses WebSocket frames out of `buf`.
    codec: Codec,
    /// Set once the payload ended or failed, or the close signal was observed.
    closing: bool,
}
70
71impl MessageStream {
72    pub(super) fn new(payload: Payload) -> Self {
73        MessageStream {
74            payload,
75            connection_closed: None,
76            messages: VecDeque::new(),
77            buf: BytesMut::new(),
78            codec: Codec::new(),
79            closing: false,
80        }
81    }
82
83    pub(super) fn with_connection_close_signal(
84        mut self,
85        connection_closed: oneshot::Receiver<()>,
86    ) -> Self {
87        self.connection_closed = Some(connection_closed);
88        self
89    }
90
91    /// Sets the maximum permitted size for received WebSocket frames, in bytes.
92    ///
93    /// By default, up to 64KiB is allowed.
94    ///
95    /// Any received frames larger than the permitted value will return
96    /// `Err(ProtocolError::Overflow)` instead.
97    ///
98    /// ```no_run
99    /// # use actix_ws::MessageStream;
100    /// # fn test(stream: MessageStream) {
101    /// // increase permitted frame size from 64KB to 1MB
102    /// let stream = stream.max_frame_size(1024 * 1024);
103    /// # }
104    /// ```
105    #[must_use]
106    pub fn max_frame_size(mut self, max_size: usize) -> Self {
107        self.codec = self.codec.max_size(max_size);
108        self
109    }
110
111    /// Returns a stream wrapper that collects continuation frames into their equivalent aggregated
112    /// forms, i.e., binary or text.
113    ///
114    /// By default, continuations will be aggregated up to 1MiB in size (customizable with
115    /// [`AggregatedMessageStream::max_continuation_size()`]). The stream implementation returns an
116    /// error if this size is exceeded.
117    #[must_use]
118    pub fn aggregate_continuations(self) -> AggregatedMessageStream {
119        AggregatedMessageStream::new(self)
120    }
121
122    /// Waits for the next item from the message stream
123    ///
124    /// This is a convenience for calling the [`Stream`](Stream::poll_next()) implementation.
125    ///
126    /// ```no_run
127    /// # use actix_ws::MessageStream;
128    /// # async fn test(mut stream: MessageStream) {
129    /// while let Some(Ok(msg)) = stream.recv().await {
130    ///     // handle message
131    /// }
132    /// # }
133    /// ```
134    #[must_use]
135    pub async fn recv(&mut self) -> Option<Result<Message, ProtocolError>> {
136        poll_fn(|cx| Pin::new(&mut *self).poll_next(cx)).await
137    }
138}
139
140impl Stream for StreamingBody {
141    type Item = Result<Bytes, Error>;
142
143    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
144        let this = self.get_mut();
145
146        // If close has been initiated and there is no pending buffered data, end the stream.
147        if this.closing && this.buf.is_empty() {
148            return Poll::Ready(None);
149        }
150
151        if !this.closing {
152            loop {
153                match Pin::new(&mut this.session_rx).poll_recv(cx) {
154                    Poll::Ready(Some(msg)) => {
155                        this.messages.push_back(msg);
156                    }
157                    Poll::Ready(None) => {
158                        this.closing = true;
159                        break;
160                    }
161                    Poll::Pending => break,
162                }
163            }
164        }
165
166        while let Some(mut msg) = this.messages.pop_front() {
167            let is_close = matches!(msg, Message::Close(_));
168
169            // Avoid emitting invalid control frames. Some clients (e.g., Chromium-based browsers)
170            // are strict about RFC 6455 requirements and will treat oversized control frames as
171            // malformed.
172            match &mut msg {
173                Message::Close(Some(reason)) => {
174                    if let Some(desc) = reason.description.as_mut() {
175                        if desc.len() > MAX_CLOSE_REASON_BYTES {
176                            let mut end = MAX_CLOSE_REASON_BYTES;
177                            while end > 0 && !desc.is_char_boundary(end) {
178                                end -= 1;
179                            }
180                            desc.truncate(end);
181                        }
182                    }
183                }
184                Message::Ping(bytes) | Message::Pong(bytes) => {
185                    if bytes.len() > MAX_CONTROL_PAYLOAD_BYTES {
186                        *bytes = bytes.slice(..MAX_CONTROL_PAYLOAD_BYTES);
187                    }
188                }
189                _ => {}
190            }
191
192            if let Err(err) = this.codec.encode(msg, &mut this.buf) {
193                return Poll::Ready(Some(Err(err.into())));
194            }
195
196            if is_close {
197                // A WebSocket Close frame is terminal. End the response body after flushing this
198                // frame, even if there are still `Session` clones holding the sender.
199                this.closing = true;
200                this.session_rx.close();
201                this.messages.clear();
202                break;
203            }
204        }
205
206        if !this.buf.is_empty() {
207            // Avoid retaining an ever-growing buffer after large payloads:
208            // https://github.com/actix/actix-extras/commit/81954844158c27de3aa034d1b727d1c13753f325
209            return Poll::Ready(Some(Ok(mem::take(&mut this.buf).freeze())));
210        }
211
212        if this.closing {
213            return Poll::Ready(None);
214        }
215
216        Poll::Pending
217    }
218}
219
220impl Drop for StreamingBody {
221    fn drop(&mut self) {
222        if let Some(connection_closed) = self.connection_closed.take() {
223            let _ = connection_closed.send(());
224        }
225    }
226}
227
228impl Stream for MessageStream {
229    type Item = Result<Message, ProtocolError>;
230
231    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
232        let this = self.get_mut();
233
234        // Return the first message in the queue if one exists
235        //
236        // This is faster than polling and parsing
237        if let Some(msg) = this.messages.pop_front() {
238            return Poll::Ready(Some(Ok(msg)));
239        }
240
241        if !this.closing {
242            let connection_closed = this
243                .connection_closed
244                .as_mut()
245                .is_some_and(|connection_closed| Pin::new(connection_closed).poll(cx).is_ready());
246
247            if connection_closed {
248                this.closing = true;
249                this.connection_closed = None;
250            }
251
252            // Read in bytes until there's nothing left to read
253            loop {
254                match Pin::new(&mut this.payload).poll_next(cx) {
255                    Poll::Ready(Some(Ok(bytes))) => {
256                        if this.buf.is_empty() {
257                            // Avoid a copy when there is no buffered data.
258                            this.buf = BytesMut::from(bytes);
259                        } else {
260                            this.buf.extend_from_slice(&bytes);
261                        }
262                    }
263                    Poll::Ready(Some(Err(err))) => {
264                        this.closing = true;
265                        return Poll::Ready(Some(Err(ProtocolError::Io(io::Error::other(err)))));
266                    }
267                    Poll::Ready(None) => {
268                        this.closing = true;
269                        break;
270                    }
271                    Poll::Pending => break,
272                }
273            }
274        }
275
276        // Create messages until there's no more bytes left
277        while let Some(frame) = this.codec.decode(&mut this.buf)? {
278            let message = match frame {
279                Frame::Text(bytes) => {
280                    ByteString::try_from(bytes)
281                        .map(Message::Text)
282                        .map_err(|err| {
283                            ProtocolError::Io(io::Error::new(io::ErrorKind::InvalidData, err))
284                        })?
285                }
286                Frame::Binary(bytes) => Message::Binary(bytes),
287                Frame::Ping(bytes) => Message::Ping(bytes),
288                Frame::Pong(bytes) => Message::Pong(bytes),
289                Frame::Close(reason) => Message::Close(reason),
290                Frame::Continuation(item) => Message::Continuation(item),
291            };
292
293            this.messages.push_back(message);
294        }
295
296        // Return the first message in the queue
297        if let Some(msg) = this.messages.pop_front() {
298            return Poll::Ready(Some(Ok(msg)));
299        }
300
301        // If we've exhausted our message queue and we're closing, close the stream
302        if this.closing {
303            return Poll::Ready(None);
304        }
305
306        Poll::Pending
307    }
308}
309
310#[cfg(test)]
311pub(crate) mod tests {
312    use std::{
313        future::Future,
314        io,
315        pin::Pin,
316        task::{ready, Context, Poll},
317    };
318
319    use actix_http::{error::PayloadError, ws::ProtocolError};
320    use futures_core::Stream;
321    use tokio::sync::{
322        mpsc::{Receiver, Sender},
323        oneshot,
324    };
325
326    use super::{
327        Bytes, BytesMut, Codec, Decoder, Encoder, Message, MessageStream, Payload, StreamingBody,
328    };
329
    /// Test payload source: a [`Stream`] that yields every chunk received on `rx` as `Ok`.
    pub(crate) struct PayloadReceiver {
        rx: Receiver<Bytes>,
    }
    /// Test helper that encodes messages with `codec` and sends the bytes over `tx`.
    pub(crate) struct PayloadSender {
        codec: Codec,
        tx: Sender<Bytes>,
    }
337    impl PayloadSender {
338        pub(crate) async fn send(&mut self, message: Message) {
339            self.send_many(vec![message]).await
340        }
341        pub(crate) async fn send_many(&mut self, messages: Vec<Message>) {
342            let mut buf = BytesMut::new();
343
344            for message in messages {
345                self.codec.encode(message, &mut buf).unwrap();
346            }
347
348            self.tx.send(buf.freeze()).await.unwrap()
349        }
350    }
351    impl Stream for PayloadReceiver {
352        type Item = Result<Bytes, PayloadError>;
353
354        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
355            let opt = ready!(self.get_mut().rx.poll_recv(cx));
356
357            Poll::Ready(opt.map(Ok))
358        }
359    }
360    pub(crate) fn payload_pair(capacity: usize) -> (PayloadSender, Payload) {
361        let (tx, rx) = tokio::sync::mpsc::channel(capacity);
362
363        (
364            PayloadSender {
365                codec: Codec::new().client_mode(),
366                tx,
367            },
368            Payload::Stream {
369                payload: Box::pin(PayloadReceiver { rx }),
370            },
371        )
372    }
373
    /// Messages sent one at a time are yielded one at a time, with the stream pending in between.
    #[tokio::test]
    async fn message_stream_yields_messages() {
        // Driven by hand inside `poll_fn` so each individual poll result can be asserted.
        std::future::poll_fn(move |cx| {
            let (mut tx, rx) = payload_pair(8);
            let message_stream = MessageStream::new(rx);
            let mut stream = std::pin::pin!(message_stream);

            let messages = [
                Message::Binary(Bytes::from(vec![0, 1, 2, 3])),
                Message::Ping(Bytes::from(vec![3, 2, 1, 0])),
                Message::Close(None),
            ];

            for msg in messages {
                // Nothing has been sent yet for this round.
                let poll = stream.as_mut().poll_next(cx);
                assert!(
                    poll.is_pending(),
                    "Stream should be pending when no messages are present {poll:?}"
                );

                let fut = tx.send(msg);
                let fut = std::pin::pin!(fut);

                assert!(fut.poll(cx).is_ready(), "Sending should not yield");
                assert!(
                    stream.as_mut().poll_next(cx).is_ready(),
                    "Stream should be ready"
                );
            }

            // The sender is still alive, so the stream waits instead of ending.
            assert!(
                stream.as_mut().poll_next(cx).is_pending(),
                "Stream should be pending after processing messages"
            );

            Poll::Ready(())
        })
        .await
    }
413
    /// Several messages delivered in a single payload chunk are yielded by consecutive polls.
    #[tokio::test]
    async fn message_stream_yields_consecutive_messages() {
        std::future::poll_fn(move |cx| {
            let (mut tx, rx) = payload_pair(8);
            let message_stream = MessageStream::new(rx);
            let mut stream = std::pin::pin!(message_stream);

            let messages = vec![
                Message::Binary(Bytes::from(vec![0, 1, 2, 3])),
                Message::Ping(Bytes::from(vec![3, 2, 1, 0])),
                Message::Close(None),
            ];

            let size = messages.len();

            // `send_many` encodes all messages into one chunk.
            let fut = tx.send_many(messages);
            let fut = std::pin::pin!(fut);
            assert!(fut.poll(cx).is_ready(), "Sending should not yield");

            for _ in 0..size {
                assert!(
                    stream.as_mut().poll_next(cx).is_ready(),
                    "Stream should be ready"
                );
            }

            assert!(
                stream.as_mut().poll_next(cx).is_pending(),
                "Stream should be pending after processing messages"
            );

            Poll::Ready(())
        })
        .await
    }
449
    /// A payload whose sender is already gone makes the stream end on the first poll.
    #[tokio::test]
    async fn message_stream_closes() {
        std::future::poll_fn(move |cx| {
            let (tx, rx) = payload_pair(8);
            // Dropping the sender ends the payload stream.
            drop(tx);
            let message_stream = MessageStream::new(rx);
            let mut stream = std::pin::pin!(message_stream);

            let poll = stream.as_mut().poll_next(cx);
            assert!(
                matches!(poll, Poll::Ready(None)),
                "Stream should be ready when closing {poll:?}"
            );

            Poll::Ready(())
        })
        .await
    }
468
    /// A payload error is surfaced once as `ProtocolError::Io`, after which the stream ends.
    #[tokio::test]
    async fn message_stream_closes_after_payload_error() {
        /// Payload that yields one I/O error and then reports end of stream.
        struct ErrorPayload {
            yielded_error: bool,
        }

        impl Stream for ErrorPayload {
            type Item = Result<Bytes, PayloadError>;

            fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
                let this = self.get_mut();

                if this.yielded_error {
                    Poll::Ready(None)
                } else {
                    this.yielded_error = true;

                    Poll::Ready(Some(Err(PayloadError::Io(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        "simulated abrupt disconnect",
                    )))))
                }
            }
        }

        std::future::poll_fn(move |cx| {
            let payload = Payload::Stream {
                payload: Box::pin(ErrorPayload {
                    yielded_error: false,
                })
                    as Pin<Box<dyn Stream<Item = Result<Bytes, PayloadError>>>>,
            };
            let message_stream = MessageStream::new(payload);
            let mut stream = std::pin::pin!(message_stream);

            // First poll: the payload error is reported.
            let poll = stream.as_mut().poll_next(cx);
            assert!(
                matches!(poll, Poll::Ready(Some(Err(ProtocolError::Io(_))))),
                "stream should surface payload error: {poll:?}"
            );

            // Second poll: the stream is finished.
            let poll = stream.as_mut().poll_next(cx);
            assert!(
                matches!(poll, Poll::Ready(None)),
                "stream should terminate after payload error: {poll:?}"
            );

            Poll::Ready(())
        })
        .await
    }
520
    /// Dropping the response body ends the message stream even though the payload never
    /// produces anything.
    #[tokio::test]
    async fn message_stream_closes_when_response_body_drops() {
        /// Payload that is pending forever.
        struct PendingPayload;

        impl Stream for PendingPayload {
            type Item = Result<Bytes, PayloadError>;

            fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
                Poll::Pending
            }
        }

        std::future::poll_fn(move |cx| {
            let (_tx, rx) = tokio::sync::mpsc::channel(1);
            let (connection_closed_tx, connection_closed_rx) = oneshot::channel();

            // The body owns the sending half of the close signal...
            let response_body =
                StreamingBody::new(rx).with_connection_close_signal(connection_closed_tx);

            let payload = Payload::Stream {
                payload: Box::pin(PendingPayload)
                    as Pin<Box<dyn Stream<Item = Result<Bytes, PayloadError>>>>,
            };
            // ...and the message stream owns the receiving half.
            let message_stream =
                MessageStream::new(payload).with_connection_close_signal(connection_closed_rx);
            let mut stream = std::pin::pin!(message_stream);

            let poll = stream.as_mut().poll_next(cx);
            assert!(
                poll.is_pending(),
                "stream should be pending before close signal: {poll:?}"
            );

            // `StreamingBody::drop` fires the signal.
            drop(response_body);

            let poll = stream.as_mut().poll_next(cx);
            assert!(
                matches!(poll, Poll::Ready(None)),
                "stream should terminate when response body drops: {poll:?}"
            );

            Poll::Ready(())
        })
        .await
    }
566
    /// Each message sent to the body produces a chunk of bytes; after the Close message the body
    /// ends.
    #[tokio::test]
    async fn stream_produces_bytes_from_messages() {
        std::future::poll_fn(move |cx| {
            let (tx, rx) = tokio::sync::mpsc::channel(1);

            let stream = StreamingBody::new(rx);

            let messages = [
                Message::Binary(Bytes::from(vec![0, 1, 2, 3])),
                Message::Ping(Bytes::from(vec![3, 2, 1, 0])),
                Message::Close(None),
            ];

            let mut stream = std::pin::pin!(stream);

            for msg in messages {
                assert!(
                    stream.as_mut().poll_next(cx).is_pending(),
                    "Stream should be pending when no messages are present"
                );

                let fut = tx.send(msg);
                let fut = std::pin::pin!(fut);

                assert!(fut.poll(cx).is_ready(), "Sending should not yield");
                assert!(
                    stream.as_mut().poll_next(cx).is_ready(),
                    "Stream should be ready"
                );
            }

            // `tx` is still alive here; the Close message alone ends the body.
            assert!(
                matches!(stream.as_mut().poll_next(cx), Poll::Ready(None)),
                "stream should close after processing close message"
            );

            Poll::Ready(())
        })
        .await;
    }
607
    /// Messages queued before a poll are all encoded by that one poll; the Close among them ends
    /// the body on the following poll.
    #[tokio::test]
    async fn stream_processes_many_consecutive_messages() {
        std::future::poll_fn(move |cx| {
            // Capacity 3 lets all three sends complete without the body being polled in between.
            let (tx, rx) = tokio::sync::mpsc::channel(3);

            let stream = StreamingBody::new(rx);

            let messages = [
                Message::Binary(Bytes::from(vec![0, 1, 2, 3])),
                Message::Ping(Bytes::from(vec![3, 2, 1, 0])),
                Message::Close(None),
            ];

            let mut stream = std::pin::pin!(stream);

            assert!(stream.as_mut().poll_next(cx).is_pending());

            for msg in messages {
                let fut = tx.send(msg);
                let fut = std::pin::pin!(fut);
                assert!(fut.poll(cx).is_ready(), "Sending should not yield");
            }

            assert!(
                stream.as_mut().poll_next(cx).is_ready(),
                "Stream should be ready"
            );
            assert!(
                matches!(stream.as_mut().poll_next(cx), Poll::Ready(None)),
                "stream should close after processing close message"
            );

            Poll::Ready(())
        })
        .await;
    }
644
    /// A Close message ends the body even though the channel sender has not been dropped.
    #[tokio::test]
    async fn stream_closes_after_close_message_even_if_sender_alive() {
        std::future::poll_fn(move |cx| {
            let (tx, rx) = tokio::sync::mpsc::channel(1);

            let stream = StreamingBody::new(rx);
            let mut stream = std::pin::pin!(stream);

            assert!(
                stream.as_mut().poll_next(cx).is_pending(),
                "stream should start pending"
            );

            // Send a Close frame but keep the sender alive (e.g. a `Session` clone held elsewhere).
            {
                let fut = tx.send(Message::Close(None));
                let fut = std::pin::pin!(fut);
                assert!(fut.poll(cx).is_ready(), "Sending should not yield");
            }

            assert!(
                stream.as_mut().poll_next(cx).is_ready(),
                "stream should yield close frame bytes"
            );

            let poll = stream.as_mut().poll_next(cx);
            assert!(
                matches!(poll, Poll::Ready(None)),
                "stream should close after close frame even if sender is still alive"
            );

            Poll::Ready(())
        })
        .await;
    }
680
    /// A body whose channel sender is already dropped ends on the first poll.
    #[tokio::test]
    async fn stream_closes() {
        std::future::poll_fn(move |cx| {
            let (tx, rx) = tokio::sync::mpsc::channel(3);

            // With the only sender gone the channel is closed and empty.
            drop(tx);
            let stream = StreamingBody::new(rx);

            let mut stream = std::pin::pin!(stream);

            let poll = stream.as_mut().poll_next(cx);

            assert!(
                matches!(poll, Poll::Ready(None)),
                "stream should close after dropped tx"
            );

            Poll::Ready(())
        })
        .await;
    }
702
703    #[tokio::test]
704    async fn stream_truncates_oversized_close_reason() {
705        use actix_http::ws::{CloseCode, CloseReason, Frame};
706        use futures_util::StreamExt as _;
707
708        let (tx, rx) = tokio::sync::mpsc::channel(1);
709        let mut stream = StreamingBody::new(rx);
710
711        let reason = CloseReason {
712            code: CloseCode::Normal,
713            description: Some("a".repeat(200)),
714        };
715
716        tx.send(Message::Close(Some(reason))).await.unwrap();
717
718        let bytes = stream.next().await.unwrap().unwrap();
719
720        assert_eq!(bytes[0], 0x88, "FIN + Close opcode");
721        assert_eq!(bytes[1] & 0x80, 0, "server frames must not be masked");
722        assert_eq!(
723            (bytes[1] & 0x7F) as usize,
724            2 + super::MAX_CLOSE_REASON_BYTES,
725            "Close payload must be limited to 125 bytes (2-byte code + 123-byte reason)"
726        );
727
728        let mut buf = BytesMut::from(&bytes[..]);
729        let mut codec = Codec::new().client_mode();
730
731        let frame = codec.decode(&mut buf).unwrap().unwrap();
732        match frame {
733            Frame::Close(Some(reason)) => {
734                assert_eq!(reason.code, CloseCode::Normal);
735                let desc = reason.description.unwrap();
736                assert_eq!(desc.len(), super::MAX_CLOSE_REASON_BYTES);
737                assert_eq!(desc, "a".repeat(super::MAX_CLOSE_REASON_BYTES));
738            }
739            other => panic!("expected Close frame, got: {other:?}"),
740        }
741
742        assert!(buf.is_empty(), "should decode entire buffer");
743    }
744
745    #[tokio::test]
746    async fn stream_truncates_close_reason_at_utf8_boundary() {
747        use actix_http::ws::{CloseCode, CloseReason, Frame};
748        use futures_util::StreamExt as _;
749
750        let (tx, rx) = tokio::sync::mpsc::channel(1);
751        let mut stream = StreamingBody::new(rx);
752
753        // Create a reason where truncating at 123 bytes would split a multi-byte UTF-8 character.
754        let description = format!("{}{}b", "a".repeat(122), "\u{00E9}");
755        assert!(description.len() > super::MAX_CLOSE_REASON_BYTES);
756
757        let reason = CloseReason {
758            code: CloseCode::Normal,
759            description: Some(description),
760        };
761
762        tx.send(Message::Close(Some(reason))).await.unwrap();
763
764        let bytes = stream.next().await.unwrap().unwrap();
765
766        assert_eq!(bytes[0], 0x88, "FIN + Close opcode");
767        assert_eq!(bytes[1] & 0x80, 0, "server frames must not be masked");
768        assert!(
769            (bytes[1] & 0x7F) <= 125,
770            "control frames must have payload length <= 125"
771        );
772
773        let mut buf = BytesMut::from(&bytes[..]);
774        let mut codec = Codec::new().client_mode();
775
776        let frame = codec.decode(&mut buf).unwrap().unwrap();
777        match frame {
778            Frame::Close(Some(reason)) => {
779                assert_eq!(reason.code, CloseCode::Normal);
780                let desc = reason.description.unwrap();
781                assert_eq!(desc, "a".repeat(122));
782            }
783            other => panic!("expected Close frame, got: {other:?}"),
784        }
785
786        assert!(buf.is_empty(), "should decode entire buffer");
787    }
788
789    #[tokio::test]
790    async fn stream_truncates_oversized_ping_payload() {
791        use actix_http::ws::Frame;
792        use futures_util::StreamExt as _;
793
794        let (tx, rx) = tokio::sync::mpsc::channel(1);
795        let mut stream = StreamingBody::new(rx);
796
797        let payload = Bytes::from(vec![0xAA; 200]);
798        let expected = payload.slice(..super::MAX_CONTROL_PAYLOAD_BYTES);
799
800        tx.send(Message::Ping(payload)).await.unwrap();
801
802        let bytes = stream.next().await.unwrap().unwrap();
803
804        assert_eq!(bytes[0], 0x89, "FIN + Ping opcode");
805        assert_eq!(bytes[1] & 0x80, 0, "server frames must not be masked");
806        assert_eq!(
807            (bytes[1] & 0x7F) as usize,
808            super::MAX_CONTROL_PAYLOAD_BYTES,
809            "Ping payload must be <= 125 bytes"
810        );
811
812        let mut buf = BytesMut::from(&bytes[..]);
813        let mut codec = Codec::new().client_mode();
814
815        let frame = codec.decode(&mut buf).unwrap().unwrap();
816        match frame {
817            Frame::Ping(pl) => {
818                assert_eq!(pl, expected);
819            }
820            other => panic!("expected Ping frame, got: {other:?}"),
821        }
822
823        assert!(buf.is_empty(), "should decode entire buffer");
824    }
825
826    #[tokio::test]
827    async fn stream_truncates_oversized_pong_payload() {
828        use actix_http::ws::Frame;
829        use futures_util::StreamExt as _;
830
831        let (tx, rx) = tokio::sync::mpsc::channel(1);
832        let mut stream = StreamingBody::new(rx);
833
834        let payload = Bytes::from(vec![0xBB; 200]);
835        let expected = payload.slice(..super::MAX_CONTROL_PAYLOAD_BYTES);
836
837        tx.send(Message::Pong(payload)).await.unwrap();
838
839        let bytes = stream.next().await.unwrap().unwrap();
840
841        assert_eq!(bytes[0], 0x8A, "FIN + Pong opcode");
842        assert_eq!(bytes[1] & 0x80, 0, "server frames must not be masked");
843        assert_eq!(
844            (bytes[1] & 0x7F) as usize,
845            super::MAX_CONTROL_PAYLOAD_BYTES,
846            "Pong payload must be <= 125 bytes"
847        );
848
849        let mut buf = BytesMut::from(&bytes[..]);
850        let mut codec = Codec::new().client_mode();
851
852        let frame = codec.decode(&mut buf).unwrap().unwrap();
853        match frame {
854            Frame::Pong(pl) => {
855                assert_eq!(pl, expected);
856            }
857            other => panic!("expected Pong frame, got: {other:?}"),
858        }
859
860        assert!(buf.is_empty(), "should decode entire buffer");
861    }
862}