websocket_util/
wrap.rs

1// Copyright (C) 2019-2024 Daniel Mueller <deso@posteo.net>
2// SPDX-License-Identifier: GPL-3.0-or-later
3
4use std::fmt::Debug;
5use std::fmt::Formatter;
6use std::fmt::Result as FmtResult;
7use std::io;
8use std::marker::PhantomData;
9use std::ops::Deref as _;
10use std::pin::Pin;
11use std::str::from_utf8 as str_from_utf8;
12use std::task::Poll;
13use std::time::Duration;
14
15use futures::task::Context;
16use futures::Sink;
17use futures::SinkExt as _;
18use futures::Stream;
19use futures::StreamExt as _;
20
21use tokio::time::interval;
22use tokio::time::Interval;
23use tokio::time::MissedTickBehavior;
24use tokio_tungstenite::tungstenite::Bytes;
25use tokio_tungstenite::tungstenite::Error as WebSocketError;
26use tokio_tungstenite::tungstenite::Message as WebSocketMessage;
27use tokio_tungstenite::tungstenite::Utf8Bytes;
28
29use tracing::debug;
30use tracing::error;
31use tracing::field::debug;
32use tracing::field::DebugValue;
33use tracing::trace;
34
35
36/// An enum encapsulating the state machine to handle pings to the
37/// server.
38#[derive(Clone, Copy, Debug)]
39enum Ping {
40  /// No ping is needed because we know the connection is still alive.
41  NotNeeded,
42  /// We haven't heard back from the server in a while and will issue a
43  /// ping next.
44  Needed,
45  /// A ping has been issued and is pending. If we subsequently get
46  /// woken up as part of our interval that means no pong was received
47  /// and the connection to the server is broken.
48  Pending,
49}
50
51
52/// A message received over a [`Wrapper`].
53#[derive(Debug, PartialEq)]
54pub enum Message {
55  /// A text WebSocket message.
56  Text(String),
57  /// A binary WebSocket message.
58  Binary(Vec<u8>),
59}
60
61impl From<Message> for WebSocketMessage {
62  fn from(message: Message) -> Self {
63    match message {
64      Message::Text(data) => WebSocketMessage::Text(Utf8Bytes::from(data)),
65      Message::Binary(data) => WebSocketMessage::Binary(Bytes::from(data)),
66    }
67  }
68}
69
70
71/// The state we maintain to track the sending of control messages.
72#[derive(Debug)]
73enum SendMessageState<M> {
74  /// The message slot is not in use currently.
75  Unused,
76  /// A message is pending to be sent.
77  ///
78  /// Note that the `Option` part is an implementation detail allowing
79  /// us to `take` ownership of the message from a `&mut self` context
80  /// without requiring that `M: Default` or making similar assumptions.
81  Pending(Option<M>),
82  /// A message has been sent but not yet flushed.
83  Flush,
84}
85
86impl<M> SendMessageState<M> {
87  /// Attempt to advance the message state.
88  fn advance<S>(&mut self, sink: &mut S, ctx: &mut Context<'_>) -> Result<(), S::Error>
89  where
90    S: Sink<M> + Unpin,
91    M: Debug,
92  {
93    loop {
94      match self {
95        Self::Unused => break Ok(()),
96        Self::Pending(message) => {
97          match sink.poll_ready_unpin(ctx) {
98            Poll::Pending => return Ok(()),
99            Poll::Ready(Ok(())) => (),
100            Poll::Ready(Err(err)) => {
101              *self = Self::Unused;
102              return Err(err)
103            },
104          }
105
106          let message = message.take();
107          *self = Self::Unused;
108          debug!(
109            channel = debug(sink as *const _),
110            send_msg = debug(&message)
111          );
112
113          if let Some(message) = message {
114            sink.start_send_unpin(message)?;
115            *self = Self::Flush;
116          }
117        },
118        Self::Flush => {
119          trace!(channel = debug(sink as *const _), msg = "flushing");
120          match sink.poll_flush_unpin(ctx) {
121            Poll::Pending => break Ok(()),
122            Poll::Ready(Ok(())) => {
123              *self = Self::Unused;
124            },
125            Poll::Ready(Err(err)) => {
126              *self = Self::Unused;
127              break Err(err)
128            },
129          }
130        },
131      }
132    }
133  }
134
135  /// Set a message to be sent
136  fn set(&mut self, message: M) {
137    *self = Self::Pending(Some(message))
138  }
139}
140
141/// A helper function for changing some message state that logs
142/// issues caused by a message "overrun", i.e., a message not having
143/// been sent or flushed yet but an overwrite being requested already.
144/// In all likelihood, such errors are caused by a misconfiguration on
145/// the user side, i.e., when an extremely small ping interval is set.
146fn set_message<S, M>(channel: &S, message_state: &mut SendMessageState<M>, message: M)
147where
148  M: Debug,
149{
150  match message_state {
151    SendMessageState::Unused => (),
152    SendMessageState::Pending(old_message) => {
153      debug!(
154        channel = debug(channel as *const _),
155        send_msg_old = debug(&old_message),
156        send_msg_new = debug(&message),
157        msg = "message overrun; last message has not been sent"
158      );
159    },
160    SendMessageState::Flush => {
161      debug!(
162        channel = debug(channel as *const _),
163        msg = "message overrun; last message has not been flushed"
164      );
165    },
166  }
167
168  message_state.set(message);
169}
170
171
172/// A type for displaying debug information for a WebSocket message.
173struct DebugMessage<'m> {
174  message: &'m WebSocketMessage,
175}
176
177impl Debug for DebugMessage<'_> {
178  fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
179    match self.message {
180      // We could consider also attempting to decode the data passed to
181      // Pings/Pongs and Close messages.
182      WebSocketMessage::Binary(data) => {
183        if let Ok(s) = str_from_utf8(data) {
184          f.debug_tuple("Binary").field(&s).finish()
185        } else {
186          f.debug_tuple("Binary").field(&data.deref()).finish()
187        }
188      },
189      WebSocketMessage::Ping(data) => f.debug_tuple("Ping").field(&data.deref()).finish(),
190      WebSocketMessage::Pong(data) => f.debug_tuple("Pong").field(&data.deref()).finish(),
191      _ => Debug::fmt(self.message, f),
192    }
193  }
194}
195
196/// Emit a debug representation of a WebSocket message that takes care
197/// of converting binary messages to string.
198fn debug_message(message: &WebSocketMessage) -> DebugValue<DebugMessage<'_>> {
199  debug(DebugMessage { message })
200}
201
202
203/// An internally used type encapsulating the logic of sending pings at
204/// regular intervals (if needed).
205#[derive(Debug)]
206struct Pinger {
207  /// The state we maintain for sending pings.
208  ping: SendMessageState<WebSocketMessage>,
209  /// An object keeping track of when we should be sending the next
210  /// ping to the server.
211  next_ping: Interval,
212  /// State helping us keep track of pings that we want to send to the
213  /// server.
214  ping_state: Ping,
215}
216
217impl Pinger {
218  /// Create a new `Pinger` object for sending pings spaced by
219  /// `ping_interval`.
220  fn new(ping_interval: Duration) -> Self {
221    let mut next_ping = interval(ping_interval);
222    // If we were not polled in time to send pings we want to just delay
223    // sending instead of sending a burst of them, as would be the
224    // default.
225    let () = next_ping.set_missed_tick_behavior(MissedTickBehavior::Delay);
226
227    Self {
228      ping: SendMessageState::Unused,
229      next_ping,
230      ping_state: Ping::NotNeeded,
231    }
232  }
233
234  /// Attempt to advance the ping state by one step.
235  #[allow(clippy::result_large_err)]
236  fn advance<S>(&mut self, sink: &mut S, ctx: &mut Context<'_>) -> Result<(), S::Error>
237  where
238    S: Sink<WebSocketMessage, Error = WebSocketError> + Unpin,
239  {
240    let () = self.ping.advance(sink, ctx)?;
241    let mut result = Ok(());
242
243    // We always loop until `next_ping` indicates `Pending`, to be sure
244    // the next wake up is scheduled properly.
245    loop {
246      match self.next_ping.poll_tick(ctx) {
247        Poll::Ready(_instant) => {
248          // We are due sending a ping according to the user's specified
249          // ping interval. Check the existing ping state to decide what
250          // to actually do. We may not need to send a ping if we can
251          // infer that we had activity in said interval already.
252          self.ping_state = match self.ping_state {
253            Ping::NotNeeded => {
254              trace!(
255                channel = debug(sink as *const _),
256                msg = "skipping ping due to activity"
257              );
258              // If anything caused our ping state to change to
259              // `NotNeeded` (from the last time we set it to `Needed`)
260              // then just change it back to `Needed`.
261              Ping::Needed
262            },
263            Ping::Needed => {
264              // The ping state is still `Needed`, which is what we set it
265              // to at the last interval. We need to make sure to actually
266              // send a ping over the wire now to check whether our
267              // connection is still alive.
268              let message = WebSocketMessage::Ping(Bytes::new());
269              let () = set_message(sink, &mut self.ping, message);
270
271              self.ping.advance(sink, ctx)?;
272              Ping::Pending
273            },
274            Ping::Pending => {
275              error!(
276                channel = debug(sink as *const _),
277                msg = "server failed to respond to pings"
278              );
279
280              let err = WebSocketError::Io(io::Error::new(
281                io::ErrorKind::TimedOut,
282                "server failed to respond to pings",
283              ));
284              result = Err(err);
285
286              // We leave it up to clients to decide how to handle missed
287              // pings. But in order to not end up in an endless error
288              // cycle in case the client does not care, clear our ping
289              // state.
290              Ping::Needed
291            },
292          };
293        },
294        Poll::Pending => break result,
295      }
296    }
297  }
298
299  /// Register the fact that activity occurred over the associated
300  /// WebSocket channel, meaning that there is no need to ping the
301  /// server this interval.
302  fn activity(&mut self) {
303    self.ping_state = Ping::NotNeeded;
304  }
305}
306
307
308/// A type helping with the construction of [`Wrapper`] objects.
309#[derive(Debug)]
310pub struct Builder<S> {
311  /// The interval at which to send pings. A value of `None` disables
312  /// sending of pings.
313  ping_interval: Option<Duration>,
314  /// Phantom data for the WebSocket type.
315  _phantom: PhantomData<S>,
316}
317
318impl<S> Builder<S> {
319  /// Overwrite the default ping interval of 30s with a custom one.
320  pub fn set_ping_interval(mut self, interval: Option<Duration>) -> Builder<S> {
321    self.ping_interval = interval;
322    self
323  }
324
325  /// Build the final [`Wrapper`] wrapping the provided WebSocket channel.
326  pub fn build(self, channel: S) -> Wrapper<S> {
327    Wrapper {
328      inner: channel,
329      ping: self.ping_interval.map(Pinger::new),
330    }
331  }
332}
333
334impl<S> Default for Builder<S> {
335  fn default() -> Self {
336    Self {
337      ping_interval: Some(Duration::from_secs(30)),
338      _phantom: PhantomData,
339    }
340  }
341}
342
343
344/// A wrapped WebSocket stream that handles responding to pings with
345/// pongs, sending of pings to check for liveness of server, and
346/// filtering out of WebSocket control messages in the process.
347#[derive(Debug)]
348#[must_use = "streams do nothing unless polled"]
349pub struct Wrapper<S> {
350  /// The wrapped stream & sink.
351  inner: S,
352  /// The state we maintain for sending pings over our internal sink.
353  ping: Option<Pinger>,
354}
355
356impl<S> Wrapper<S> {
357  /// Create a [`Builder`] for creating a customized `Wrapper`.
358  pub fn builder() -> Builder<S> {
359    Builder::default()
360  }
361}
362
363impl<S> Stream for Wrapper<S>
364where
365  S: Sink<WebSocketMessage, Error = WebSocketError>
366    + Stream<Item = Result<WebSocketMessage, WebSocketError>>
367    + Unpin,
368{
369  type Item = Result<Message, S::Error>;
370
371  fn poll_next(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
372    let this = Pin::get_mut(self);
373
374    if let Some(ping) = &mut this.ping {
375      if let Err(err) = ping.advance(&mut this.inner, ctx) {
376        return Poll::Ready(Some(Err(err)))
377      }
378    }
379
380    loop {
381      match this.inner.poll_next_unpin(ctx) {
382        Poll::Pending => {
383          // No new data is available yet. There is nothing to do for us
384          // except bubble up this result.
385          break Poll::Pending
386        },
387        Poll::Ready(None) => {
388          // The stream is exhausted. Bubble up the result and be done.
389          break Poll::Ready(None)
390        },
391        Poll::Ready(Some(Err(err))) => break Poll::Ready(Some(Err(err))),
392        Poll::Ready(Some(Ok(message))) => {
393          debug!(
394            channel = debug(&this.inner as *const _),
395            recv_msg = debug_message(&message)
396          );
397          let () = this.ping.as_mut().map(Pinger::activity).unwrap_or(());
398
399          match message {
400            WebSocketMessage::Text(data) => {
401              break Poll::Ready(Some(Ok(Message::Text(data.to_string()))))
402            },
403            WebSocketMessage::Binary(data) => {
404              break Poll::Ready(Some(Ok(Message::Binary(data.to_vec()))))
405            },
406            WebSocketMessage::Ping(_) => {
407              // Ping messages are automatically and transparently
408              // responded to by tungstenite.
409            },
410            WebSocketMessage::Pong(_) => {
411              // We don't handle pongs any specifically. We already
412              // registered that we received a message above.
413            },
414            WebSocketMessage::Close(_) => {
415              // Once we received a close message from the remote, we
416              // can safely stop anything we've been doing. We do not
417              // expose those to higher layers and instead just indicate
418              // stream exhaustion.
419              break Poll::Ready(None)
420            },
421            WebSocketMessage::Frame(_) => {
422              // We should never receive such a value while reading
423              // messages.
424            },
425          }
426        },
427      }
428    }
429  }
430}
431
432impl<S> Sink<Message> for Wrapper<S>
433where
434  S: Sink<WebSocketMessage, Error = WebSocketError> + Unpin,
435{
436  type Error = S::Error;
437
438  fn poll_ready(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
439    self.inner.poll_ready_unpin(ctx)
440  }
441
442  fn start_send(mut self: Pin<&mut Self>, message: Message) -> Result<(), Self::Error> {
443    let message = message.into();
444    debug!(
445      channel = debug(&self.inner as *const _),
446      send_msg = debug_message(&message)
447    );
448    self.inner.start_send_unpin(message)
449  }
450
451  fn poll_flush(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
452    trace!(channel = debug(&self.inner as *const _), msg = "flushing");
453    self.inner.poll_flush_unpin(ctx)
454  }
455
456  fn poll_close(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
457    self.inner.poll_close_unpin(ctx)
458  }
459}
460
461
462#[cfg(test)]
463mod tests {
464  use super::*;
465
466  use std::future::Future;
467  use std::sync::atomic::AtomicUsize;
468  use std::sync::atomic::Ordering;
469  use std::sync::Arc;
470
471  use futures::future::ready;
472  use futures::TryStreamExt as _;
473
474  use rand::seq::IteratorRandom as _;
475  use rand::thread_rng;
476  use rand::Rng as _;
477
478  use test_log::test;
479
480  use tokio::time::pause;
481  use tokio::time::sleep;
482  use tokio::time::timeout;
483
484  use tokio_tungstenite::connect_async;
485  use tokio_tungstenite::tungstenite::error::ProtocolError;
486
487  use url::Url;
488
489  use crate::test::mock_server;
490  use crate::test::WebSocketStream;
491
492
493  /// Check that we can show proper debug representation of WebSocket
494  /// messages.
495  #[test]
496  fn debug_websocket_message() {
497    let message = WebSocketMessage::Binary(Bytes::from(b"this is a test".as_slice()));
498    let expected = r#"Binary("this is a test")"#;
499    assert_eq!(format!("{:?}", debug_message(&message)), expected);
500
501    // Also try with some invalid Unicode.
502    let message = WebSocketMessage::Binary(Bytes::from([0xf0, 0x90, 0x80].as_slice()));
503    let expected = r#"Binary([240, 144, 128])"#;
504    assert_eq!(format!("{:?}", debug_message(&message)), expected);
505
506    let message = WebSocketMessage::Ping(Bytes::new());
507    let expected = r#"Ping([])"#;
508    assert_eq!(format!("{:?}", debug_message(&message)), expected);
509  }
510
511
512  /// Instantiate a WebSocket server serving data provided by the
513  /// given function, connect to said server, and return the resulting
514  /// wrapped stream.
515  async fn serve_and_connect_with_builder<F, R>(
516    builder: Builder<WebSocketStream>,
517    f: F,
518  ) -> Wrapper<WebSocketStream>
519  where
520    F: FnOnce(WebSocketStream) -> R + Send + Sync + 'static,
521    R: Future<Output = Result<(), WebSocketError>> + Send + Sync + 'static,
522  {
523    let addr = mock_server(f).await;
524    let url = Url::parse(&format!("ws://{}", addr)).unwrap();
525
526    let (stream, _) = connect_async(url).await.unwrap();
527    builder.build(stream)
528  }
529
530  /// Create a WebSocket server and connect to it, similar to
531  /// `serve_and_connect_with_builder`, but use a pre-defined builder
532  /// with a 10ms ping interval.
533  async fn serve_and_connect<F, R>(f: F) -> Wrapper<WebSocketStream>
534  where
535    F: FnOnce(WebSocketStream) -> R + Send + Sync + 'static,
536    R: Future<Output = Result<(), WebSocketError>> + Send + Sync + 'static,
537  {
538    let ping = Some(Duration::from_millis(10));
539    let builder = Wrapper::builder().set_ping_interval(ping);
540    serve_and_connect_with_builder(builder, f).await
541  }
542
543  /// Check that our `Wrapper` behaves correctly if no messages are sent
544  /// at all.
545  #[test(tokio::test)]
546  async fn no_messages() {
547    async fn test(_stream: WebSocketStream) -> Result<(), WebSocketError> {
548      Ok(())
549    }
550
551    let err = serve_and_connect(test)
552      .await
553      .try_for_each(|_| ready(Ok(())))
554      .await
555      .unwrap_err();
556
557    match err {
558      WebSocketError::Protocol(ProtocolError::ResetWithoutClosingHandshake) => (),
559      e => panic!("received unexpected error: {}", e),
560    }
561  }
562
563  /// Check that our `Wrapper` handles a straight close without other
564  /// messages correctly.
565  #[test(tokio::test)]
566  async fn direct_close() {
567    async fn test(mut stream: WebSocketStream) -> Result<(), WebSocketError> {
568      // Just respond with a Close.
569      stream.send(WebSocketMessage::Close(None)).await?;
570      Ok(())
571    }
572
573    serve_and_connect(test)
574      .await
575      .try_for_each(|_| ready(Ok(())))
576      .await
577      .unwrap();
578  }
579
580  /// Verify that ping requests are acknowledged by pongs by the
581  /// underlying server.
582  #[test(tokio::test)]
583  async fn ping_pong() {
584    async fn test(stream: WebSocketStream) -> Result<(), WebSocketError> {
585      let mut stream = stream.fuse();
586
587      // Ping.
588      stream.send(WebSocketMessage::Ping(Bytes::new())).await?;
589      // Expect Pong.
590      assert_eq!(
591        stream.next().await.unwrap()?,
592        WebSocketMessage::Pong(Bytes::new()),
593      );
594
595      let future = stream.select_next_some();
596      assert!(timeout(Duration::from_millis(20), future).await.is_err());
597
598      stream.send(WebSocketMessage::Close(None)).await?;
599      Ok(())
600    }
601
602    let builder = Wrapper::builder().set_ping_interval(None);
603    serve_and_connect_with_builder(builder, test)
604      .await
605      .try_for_each(|_| ready(Ok(())))
606      .await
607      .unwrap();
608  }
609
610  /// Verify that pings are being sent by our `Wrapper`.
611  #[test(tokio::test)]
612  async fn pings_are_sent() {
613    async fn test(mut stream: WebSocketStream) -> Result<(), WebSocketError> {
614      // Check that we receive Ping messages. The server will take care
615      // of responding to them behind our back already, so there is
616      // nothing else for us to be done.
617      for _ in 0..2 {
618        assert!(matches!(
619          stream.next().await.unwrap()?,
620          WebSocketMessage::Ping(_)
621        ));
622      }
623
624      stream.send(WebSocketMessage::Close(None)).await?;
625      Ok(())
626    }
627
628    serve_and_connect(test)
629      .await
630      .try_for_each(|_| ready(Ok(())))
631      .await
632      .unwrap();
633  }
634
635  /// Make sure that we do not get any ping bursts or erroneous ping
636  /// timeouts when polling is delayed.
637  #[test(tokio::test)]
638  async fn no_ping_bursts() {
639    let counter = Arc::new(AtomicUsize::new(0));
640    let clone = counter.clone();
641
642    let test = |stream: WebSocketStream| async move {
643      let mut stream = stream.fuse();
644
645      loop {
646        let msg = stream.next().await.unwrap().unwrap();
647        if let WebSocketMessage::Ping(_) = msg {
648          let _ = clone.fetch_add(1, Ordering::Relaxed);
649
650          // We don't need to respond with a `Pong` here as that's
651          // already done by the underlying infrastructure.
652        } else {
653          panic!("received unexpected message: {msg:?}")
654        }
655      }
656    };
657
658    // Pause time on this thread; this way we have full control over how
659    // much time passes while the test runs.
660    let () = pause();
661
662    let wrapper = serve_and_connect(test).await;
663    // Sleep for 10s (in virtual time). That would have caused roughly
664    // 10s / 10ms = 1000 ping intervals to have been missed back when
665    // wake ups were accumulated.
666    let () = sleep(Duration::from_secs(10)).await;
667
668    let future = wrapper.for_each(|result| {
669      assert!(result.is_ok(), "{result:?}");
670      ready(())
671    });
672
673    // Drain the stream for 15ms, which is enough to trigger a single
674    // ping.
675    assert!(timeout(Duration::from_millis(15), future).await.is_err());
676
677    // We expect to have seen one ping.
678    assert_eq!(counter.load(Ordering::Relaxed), 1);
679  }
680
681  /// Verify that no pings are being sent by our `Wrapper` when the
682  /// feature is disabled.
683  #[test(tokio::test)]
684  async fn no_pings_are_sent_when_disabled() {
685    async fn test(stream: WebSocketStream) -> Result<(), WebSocketError> {
686      let mut stream = stream.fuse();
687      let future = stream.select_next_some();
688      assert!(timeout(Duration::from_millis(20), future).await.is_err());
689
690      stream.send(WebSocketMessage::Close(None)).await?;
691      Ok(())
692    }
693
694    let builder = Wrapper::builder().set_ping_interval(None);
695    serve_and_connect_with_builder(builder, test)
696      .await
697      .try_for_each(|_| ready(Ok(())))
698      .await
699      .unwrap();
700  }
701
702  /// Check that we report an error when the server fails to respond to
703  /// pings.
704  #[test(tokio::test)]
705  async fn no_pong_response() {
706    async fn test(_stream: WebSocketStream) -> Result<(), WebSocketError> {
707      sleep(Duration::from_secs(10)).await;
708      Ok(())
709    }
710
711    let mut stream = serve_and_connect(test).await;
712
713    // We should see a timeout error. We test repeatedly to make sure
714    // that we continue pinging if errors are ignored by clients.
715    for _ in 0..5 {
716      let err = stream.next().await.unwrap().unwrap_err();
717      match err {
718        WebSocketError::Io(err) => {
719          assert_eq!(err.kind(), io::ErrorKind::TimedOut);
720          assert_eq!(err.to_string(), "server failed to respond to pings");
721        },
722        _ => panic!("Received unexpected error: {err:?}"),
723      }
724    }
725  }
726
727  /// Check that messages sent by the server are transported correctly.
728  #[test(tokio::test)]
729  async fn send_messages() {
730    async fn test(mut stream: WebSocketStream) -> Result<(), WebSocketError> {
731      stream
732        .send(WebSocketMessage::Text(Utf8Bytes::from_static("42")))
733        .await?;
734      stream.send(WebSocketMessage::Pong(Bytes::new())).await?;
735      stream
736        .send(WebSocketMessage::Text(Utf8Bytes::from_static("43")))
737        .await?;
738      stream.send(WebSocketMessage::Close(None)).await?;
739      Ok(())
740    }
741
742    let stream = serve_and_connect(test).await;
743    let messages = stream.try_collect::<Vec<_>>().await.unwrap();
744    assert_eq!(
745      messages,
746      vec![
747        Message::Text("42".to_string()),
748        Message::Text("43".to_string())
749      ]
750    );
751  }
752
753  /// Stress test our `Wrapper` type's `Stream` part by sending
754  /// excessive amount of messages through it.
755  #[test(tokio::test)]
756  #[ignore = "stress test; test takes a long time"]
757  async fn stress_stream() {
758    async fn test(mut stream: WebSocketStream) -> Result<(), WebSocketError> {
759      fn random_buf() -> Bytes {
760        let len = (0..32).choose(&mut thread_rng()).unwrap();
761        let mut vec = Vec::new();
762        vec.extend((0..len).map(|_| thread_rng().gen::<u8>()));
763        Bytes::from(vec)
764      }
765
766      fn random_str() -> Utf8Bytes {
767        let len = (0..32).choose(&mut thread_rng()).unwrap();
768        let mut string = String::new();
769        string.extend((0..len).map(|_| thread_rng().gen::<char>()));
770        Utf8Bytes::from(string)
771      }
772
773      for _ in 0..50000 {
774        let message = match (0..5).choose(&mut thread_rng()).unwrap() {
775          0 => WebSocketMessage::Pong(random_buf()),
776          // Note that we can't really spam `Ping` messages here. The
777          // server may actually send them itself and so if the server
778          // sends one, then "we" send one immediately after, we may
779          // drop the first pong response on the floor and just send the
780          // second one. However, because the payloads do not match the
781          // server may conclude that something was amiss and terminate
782          // the connection.
783          i => {
784            if i & 0x1 == 0 {
785              WebSocketMessage::Text(random_str())
786            } else {
787              WebSocketMessage::Binary(random_buf())
788            }
789          },
790        };
791
792        stream.send(message).await?;
793      }
794
795      stream.send(WebSocketMessage::Close(None)).await?;
796      Ok(())
797    }
798
799    serve_and_connect(test)
800      .await
801      .try_for_each(|_| ready(Ok(())))
802      .await
803      .unwrap();
804  }
805}