Skip to main content

websocket_web/
lib.rs

1//! # WebSockets on the web 🕸️
2//!
3//! This crate provides WebSocket support in a JavaScript runtime environment, usually a web browser.
4//!
5//! If available it uses the experimental [WebSocketStream API](https://developer.mozilla.org/en-US/docs/Web/API/WebSocketStream),
6//! otherwise the standardized [WebSocket API](https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API)
7//! is used.
8//!
9//! The WebSocketStream API provides backpressure in both transmit and receive directions.
10//! The standardized WebSocket API provides backpressure only in the transmit direction;
11//! if the receive buffer overflows, the WebSocket is closed and an error is returned.
12//!
13//! ## Sending WebSocket messages
14//!
15//! WebSocket is a message oriented protocol, i.e. it deals with sending and receiving discrete
16//! messages rather than streams of data. A message can be either text (UTF-8 string) or binary.
17//!
18//! [WebSocket] and [WebSocketSender] implement the [Sink] trait for sending messages.
19//! A Rust [String] or [`&str`](str) is sent as a text message and
20//! a [`Vec<u8>`] or `&[u8]` is transmitted as a binary message.
21//!
22//! Additionally, both types implement [AsyncWrite]. When using this trait, each write
//! is sent as a binary message containing the whole buffer.
24//!
25//! ## Receiving WebSocket messages
26//!
27//! [WebSocket] and [WebSocketReceiver] implement the [Stream] trait for receiving messages.
28//! The received data type is [`Msg`], which can either be [text](Msg::Text) or [binary](Msg::Binary).
29//!
30//! Additionally, both types implement [AsyncRead]. When using this trait, each received
31//! message is converted to binary format and buffered to support partial reads, i.e.
32//! a read using a buffer with a size smaller than the received message.
33//!
34//! ## Example
35//!
36//! The following example establishes a WebSocket connection to `localhost` on port `8765`.
//! It then sends the text message `Test123` and then receives one incoming message.
38//! Finally, it explicitly closes the WebSocket with the reason `Goodbye!`.
39//!
40//! ```no_run
41//! use websocket_web::{WebSocket, CloseCode};
42//! use futures_util::{SinkExt, StreamExt};
43//!
44//! # async fn example() {
45//! // Connect to WebSocket echo server running on localhost.
46//! let mut socket = WebSocket::connect("ws://127.0.0.1:8765").await.unwrap();
47//!
48//! // Send WebSocket text message.
49//! socket.send("Test123").await.unwrap();
50//!
51//! // Receive WebSocket message.
52//! let msg = socket.next().await.unwrap().unwrap();
53//! assert_eq!(msg.to_string(), "Test123");
54//!
55//! // Explicitly close WebSocket with close code and reason (optional).
56//! socket.close_with_reason(CloseCode::NormalClosure, "Goodbye!");
57//! # }
58//! ```
59
60#![warn(missing_docs)]
61#[cfg(not(target_family = "wasm"))]
62compile_error!("websocket-web requires a WebAssembly target");
63
64mod closed;
65mod standard;
66mod stream;
67mod util;
68
69use futures_core::Stream;
70use futures_sink::Sink;
71use futures_util::{SinkExt, StreamExt};
72use js_sys::{Reflect, Uint8Array};
73use std::{
74    fmt, io,
75    io::ErrorKind,
76    pin::Pin,
77    rc::Rc,
78    task::{ready, Context, Poll},
79};
80use tokio::io::{AsyncRead, AsyncWrite};
81use util::uint8_array_for_api;
82use wasm_bindgen::prelude::*;
83
84pub use closed::{CloseCode, Closed, ClosedReason};
85
/// The WebSocket API used to interact with the JavaScript runtime.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Interface {
    /// Experimental [WebSocketStream](https://developer.mozilla.org/en-US/docs/Web/API/WebSocketStream) interface.
    ///
    /// This provides backpressure in both directions and is recommended if supported by the browser.
    Stream,
    /// Standardized [WebSocket](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket) interface.
    ///
    /// This provides backpressure only in the transmit direction and should only be used as a fallback.
    Standard,
}
98
99impl Interface {
100    /// Whether the interface is supported by the current runtime.
101    pub fn is_supported(&self) -> bool {
102        let global = js_sys::global();
103        match self {
104            Self::Stream => Reflect::has(&global, &JsValue::from_str("WebSocketStream")).unwrap_or_default(),
105            Self::Standard => Reflect::has(&global, &JsValue::from_str("WebSocket")).unwrap_or_default(),
106        }
107    }
108}
109
/// A single WebSocket message, either textual or binary.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Msg {
    /// Text message.
    Text(String),
    /// Binary message.
    Binary(Vec<u8>),
}

impl Msg {
    /// Returns `true` for a [`Msg::Text`] message.
    pub const fn is_text(&self) -> bool {
        match self {
            Self::Text(_) => true,
            Self::Binary(_) => false,
        }
    }

    /// Returns `true` for a [`Msg::Binary`] message.
    pub const fn is_binary(&self) -> bool {
        match self {
            Self::Binary(_) => true,
            Self::Text(_) => false,
        }
    }

    /// Consumes the message and returns its payload as bytes.
    ///
    /// A text message yields its UTF-8 encoding; a binary message yields its
    /// bytes unchanged.
    pub fn to_vec(self) -> Vec<u8> {
        match self {
            Self::Binary(bytes) => bytes,
            Self::Text(string) => string.into_bytes(),
        }
    }

    /// Payload length in bytes (for text: the length of the UTF-8 encoding).
    pub fn len(&self) -> usize {
        AsRef::<[u8]>::as_ref(self).len()
    }

    /// Returns `true` if the payload contains no bytes.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}

impl fmt::Display for Msg {
    /// Writes text messages verbatim and binary messages as lossily decoded UTF-8.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Self::Text(string) => f.write_str(string),
            Self::Binary(bytes) => f.write_str(&String::from_utf8_lossy(bytes)),
        }
    }
}

impl From<Msg> for Vec<u8> {
    /// Equivalent to [`Msg::to_vec`].
    fn from(msg: Msg) -> Self {
        Msg::to_vec(msg)
    }
}

impl AsRef<[u8]> for Msg {
    /// Borrows the payload bytes without copying.
    fn as_ref(&self) -> &[u8] {
        match self {
            Self::Binary(bytes) => bytes.as_slice(),
            Self::Text(string) => string.as_bytes(),
        }
    }
}
178
/// Builder for connecting a WebSocket.
///
/// Created with [`WebSocketBuilder::new`], optionally adjusted through the
/// `set_*` methods, and consumed by [`WebSocketBuilder::connect`].
#[derive(Debug, Clone)]
pub struct WebSocketBuilder {
    // URL to connect to.
    url: String,
    // Requested sub-protocols; empty when none were set.
    protocols: Vec<String>,
    // Explicitly requested interface; `None` lets `connect` pick one.
    interface: Option<Interface>,
    // Maximum send buffer size in bytes, if configured.
    send_buffer_size: Option<usize>,
    // Maximum receive buffer size in bytes, if configured.
    receive_buffer_size: Option<usize>,
}
188
189impl WebSocketBuilder {
190    /// Creates a new WebSocket builder that will connect to the specified URL.
191    pub fn new(url: impl AsRef<str>) -> Self {
192        Self {
193            url: url.as_ref().to_string(),
194            protocols: Vec::new(),
195            interface: None,
196            send_buffer_size: None,
197            receive_buffer_size: None,
198        }
199    }
200
201    /// Sets the WebSocket browser interface to use.
202    ///
203    /// If unset, the stream-based interface is preferred when available.
204    pub fn set_interface(&mut self, interface: Interface) {
205        self.interface = Some(interface);
206    }
207
208    /// Sets the sub-protocol(s) that the client would like to use.
209    ///
210    /// Subprotocols may be selected from the [IANA WebSocket Subprotocol Name Registry]
211    /// or may be custom names jointly understood by the client and the server.
212    /// A single server can implement multiple WebSocket sub-protocols, and
213    /// handle different types of interactions depending on the specified value.
214    ///
215    /// If protocols is included, the connection will only be established if the server
216    /// reports that it has selected one of these sub-protocols.
217    ///
218    /// [IANA WebSocket Subprotocol Name Registry]: https://www.iana.org/assignments/websocket/websocket.xml#subprotocol-name
219    pub fn set_protocols<P>(&mut self, protocols: impl IntoIterator<Item = P>)
220    where
221        P: AsRef<str>,
222    {
223        self.protocols = protocols.into_iter().map(|s| s.as_ref().to_string()).collect();
224    }
225
226    /// Sets the maximum send buffer size in bytes.
227    ///
228    /// The behavior depends on which [WebSocket interface](Interface) is used:
229    ///
230    ///   * For the [standard WebSocket interface](Interface::Standard):
231    ///     when the maximum send buffer size is reached, all sending function stop
232    ///     accepting data until the send buffer size falls below the specified size.
233    ///
234    ///   * For the [stream-baed WebSocket interface](Interface::Stream):
235    ///     when the maximum send buffer size is reach, the application yields to
236    ///     the browser, which decides whether more data can be buffered or not.
237    pub fn set_send_buffer_size(&mut self, send_buffer_size: usize) {
238        self.send_buffer_size = Some(send_buffer_size);
239    }
240
241    /// Sets the maximum receive buffer size in bytes.
242    ///
243    /// This only affects the [standard WebSocket interface](Interface::Standard).
244    ///
245    /// If the maximum receive buffer size is reached, the WebSocket is closed and an
246    /// error is returned when trying to read from it.
247    ///
248    /// When using the [stream-baed WebSocket interface](Interface::Stream), the receive
249    /// buffer size is fully managed by the browser.
250    pub fn set_receive_buffer_size(&mut self, receive_buffer_size: usize) {
251        self.receive_buffer_size = Some(receive_buffer_size);
252    }
253
254    /// Establishes the WebSocket connection.
255    pub async fn connect(self) -> io::Result<WebSocket> {
256        let interface = match self.interface {
257            Some(interface) => interface,
258            None if Interface::Stream.is_supported() => Interface::Stream,
259            None => Interface::Standard,
260        };
261
262        if !interface.is_supported() {
263            match interface {
264                Interface::Stream => {
265                    return Err(io::Error::new(ErrorKind::Unsupported, "WebSocketStream not supported"))
266                }
267                Interface::Standard => {
268                    return Err(io::Error::new(ErrorKind::Unsupported, "WebSocket not supported"))
269                }
270            }
271        }
272
273        match interface {
274            Interface::Stream => {
275                let (stream, info) = stream::Inner::new(self).await?;
276                Ok(WebSocket { inner: Inner::Stream(stream), info: Rc::new(info), read_buf: Vec::new() })
277            }
278            Interface::Standard => {
279                let (standard, info) = standard::Inner::new(self).await?;
280                Ok(WebSocket { inner: Inner::Standard(standard), info: Rc::new(info), read_buf: Vec::new() })
281            }
282        }
283    }
284}
285
// Connection metadata; held behind an `Rc` by `WebSocket`, `WebSocketSender`
// and `WebSocketReceiver`.
struct Info {
    // Returned by the `url()` accessors.
    url: String,
    // Returned by the `protocol()` accessors.
    protocol: String,
    // Returned by the `interface()` accessors.
    interface: Interface,
}
291
/// A WebSocket provided by the JavaScript runtime (usually the web browser).
///
/// The WebSocket is closed when dropped.
pub struct WebSocket {
    // Interface-specific implementation (stream-based or standard).
    inner: Inner,
    // Connection metadata; the `Rc` is cloned for the sender in `into_split`.
    info: Rc<Info>,
    // Bytes of a received message not yet handed out by `AsyncRead::poll_read`.
    read_buf: Vec<u8>,
}
300
// Implementation behind a `WebSocket`, one variant per `Interface`.
enum Inner {
    // Backed by the `stream` module.
    Stream(stream::Inner),
    // Backed by the `standard` module.
    Standard(standard::Inner),
}
305
306impl fmt::Debug for WebSocket {
307    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
308        f.debug_struct("WebSocket")
309            .field("url", &self.info.url)
310            .field("protocol", &self.protocol())
311            .field("interface", &self.interface())
312            .finish()
313    }
314}
315
316impl WebSocket {
317    /// Connect to the specified WebSocket URL using default options.
318    pub async fn connect(url: impl AsRef<str>) -> io::Result<Self> {
319        WebSocketBuilder::new(url).connect().await
320    }
321
322    /// The URL of the WebSocket server.
323    pub fn url(&self) -> &str {
324        &self.info.url
325    }
326
327    /// A string representing the sub-protocol used to open the current WebSocket connection
328    /// (chosen from the options specified in the [WebSocketBuilder]).
329    ///
330    /// Returns an empty string if no sub-protocol has been used to open the connection
331    /// (i.e. no sub-protocol options were specified in the [WebSocketBuilder]).
332    pub fn protocol(&self) -> &str {
333        &self.info.protocol
334    }
335
336    /// The used WebSocket browser interface.
337    pub fn interface(&self) -> Interface {
338        self.info.interface
339    }
340
341    /// Splits this WebSocket into a sender and receiver.
342    pub fn into_split(self) -> (WebSocketSender, WebSocketReceiver) {
343        let Self { inner, info, read_buf } = self;
344        match inner {
345            Inner::Stream(inner) => {
346                let (sender, receiver) = inner.into_split();
347                let sender = WebSocketSender { inner: SenderInner::Stream(sender), info: info.clone() };
348                let receiver = WebSocketReceiver { inner: ReceiverInner::Stream(receiver), info, read_buf };
349                (sender, receiver)
350            }
351            Inner::Standard(inner) => {
352                let (sender, receiver) = inner.into_split();
353                let sender = WebSocketSender { inner: SenderInner::Standard(sender), info: info.clone() };
354                let receiver =
355                    WebSocketReceiver { inner: ReceiverInner::Standard(receiver), info, read_buf: Vec::new() };
356                (sender, receiver)
357            }
358        }
359    }
360
361    /// Closes the WebSocket.
362    pub fn close(self) {
363        self.into_split().0.close();
364    }
365
366    /// Closes the WebSocket with the specified close code and reason.
367    ///
368    /// ## Panics
369    /// Panics if the close code is neither [CloseCode::NormalClosure] nor
370    /// [CloseCode::Other] with a value between 3000 and 4999.
371    #[track_caller]
372    pub fn close_with_reason(self, code: CloseCode, reason: &str) {
373        self.into_split().0.close_with_reason(code, reason);
374    }
375
376    /// Returns a future that resolves when the WebSocket is closed remotely.
377    pub fn closed(&self) -> Closed {
378        match &self.inner {
379            Inner::Stream(inner) => inner.closed(),
380            Inner::Standard(inner) => inner.closed(),
381        }
382    }
383
384    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
385        match &mut self.inner {
386            Inner::Stream(inner) => inner.sender.poll_ready_unpin(cx),
387            Inner::Standard(inner) => inner.sender.poll_ready_unpin(cx),
388        }
389    }
390
391    fn start_send(mut self: Pin<&mut Self>, item: &JsValue, len: usize) -> Result<(), io::Error> {
392        match &mut self.inner {
393            Inner::Stream(inner) => inner.sender.start_send_unpin((item, len)),
394            Inner::Standard(inner) => inner.sender.start_send_unpin(item),
395        }
396    }
397
398    fn start_send_binary(mut self: Pin<&mut Self>, data: &[u8]) -> Result<(), io::Error> {
399        match &mut self.inner {
400            Inner::Stream(inner) => {
401                let array: JsValue = Uint8Array::from(data).into();
402                inner.sender.start_send_unpin((&array, data.len()))
403            }
404            Inner::Standard(inner) => {
405                // SAFETY: WebSocket.send() copies data synchronously.
406                let array: JsValue = unsafe { uint8_array_for_api(data) }.into();
407                inner.sender.start_send_unpin(&array)
408            }
409        }
410    }
411
412    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
413        match &mut self.inner {
414            Inner::Stream(inner) => inner.sender.poll_flush_unpin(cx),
415            Inner::Standard(inner) => inner.sender.poll_flush_unpin(cx),
416        }
417    }
418
419    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
420        match &mut self.inner {
421            Inner::Stream(inner) => inner.sender.poll_close_unpin(cx),
422            Inner::Standard(inner) => inner.sender.poll_close_unpin(cx),
423        }
424    }
425}
426
427impl Sink<&str> for WebSocket {
428    type Error = io::Error;
429
430    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
431        self.poll_ready(cx)
432    }
433
434    fn start_send(self: Pin<&mut Self>, item: &str) -> Result<(), Self::Error> {
435        self.start_send(&JsValue::from_str(item), item.len())
436    }
437
438    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
439        self.poll_flush(cx)
440    }
441
442    fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
443        self.poll_close(cx)
444    }
445}
446
447impl Sink<String> for WebSocket {
448    type Error = io::Error;
449
450    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
451        self.poll_ready(cx)
452    }
453
454    fn start_send(self: Pin<&mut Self>, item: String) -> Result<(), Self::Error> {
455        self.start_send(&JsValue::from_str(&item), item.len())
456    }
457
458    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
459        self.poll_flush(cx)
460    }
461
462    fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
463        self.poll_close(cx)
464    }
465}
466
467impl Sink<&[u8]> for WebSocket {
468    type Error = io::Error;
469
470    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
471        self.poll_ready(cx)
472    }
473
474    fn start_send(self: Pin<&mut Self>, item: &[u8]) -> Result<(), Self::Error> {
475        self.start_send_binary(item)
476    }
477
478    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
479        self.poll_flush(cx)
480    }
481
482    fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
483        self.poll_close(cx)
484    }
485}
486
487impl Sink<Vec<u8>> for WebSocket {
488    type Error = io::Error;
489
490    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
491        self.poll_ready(cx)
492    }
493
494    fn start_send(self: Pin<&mut Self>, item: Vec<u8>) -> Result<(), Self::Error> {
495        self.start_send_binary(&item)
496    }
497
498    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
499        self.poll_flush(cx)
500    }
501
502    fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
503        self.poll_close(cx)
504    }
505}
506
507impl Sink<Msg> for WebSocket {
508    type Error = io::Error;
509
510    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
511        self.poll_ready(cx)
512    }
513
514    fn start_send(self: Pin<&mut Self>, item: Msg) -> Result<(), Self::Error> {
515        match item {
516            Msg::Text(text) => self.start_send(&JsValue::from_str(&text), text.len()),
517            Msg::Binary(vec) => self.start_send_binary(&vec),
518        }
519    }
520
521    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
522        self.poll_flush(cx)
523    }
524
525    fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
526        self.poll_close(cx)
527    }
528}
529
530impl AsyncWrite for WebSocket {
531    fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<Result<usize, io::Error>> {
532        ready!(self.as_mut().poll_ready(cx))?;
533        self.start_send_binary(buf)?;
534        Poll::Ready(Ok(buf.len()))
535    }
536
537    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
538        self.poll_flush(cx)
539    }
540
541    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
542        self.poll_close(cx)
543    }
544}
545
546impl Stream for WebSocket {
547    type Item = io::Result<Msg>;
548
549    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
550        match &mut self.inner {
551            Inner::Stream(inner) => inner.receiver.poll_next_unpin(cx),
552            Inner::Standard(inner) => inner.receiver.poll_next_unpin(cx),
553        }
554    }
555}
556
557impl AsyncRead for WebSocket {
558    fn poll_read(
559        mut self: Pin<&mut Self>, cx: &mut Context, buf: &mut tokio::io::ReadBuf,
560    ) -> Poll<io::Result<()>> {
561        while self.read_buf.is_empty() {
562            let Some(msg) = ready!(self.as_mut().poll_next(cx)?) else { return Poll::Ready(Ok(())) };
563            self.read_buf = msg.to_vec();
564        }
565
566        let to_copy = buf.remaining().min(self.read_buf.len());
567        buf.put_slice(&self.read_buf[..to_copy]);
568        self.read_buf.drain(..to_copy);
569
570        Poll::Ready(Ok(()))
571    }
572}
573
/// Sending part of a [WebSocket].
///
/// The WebSocket is closed when both the [WebSocketSender] and [WebSocketReceiver]
/// are dropped.
pub struct WebSocketSender {
    // Interface-specific sending half.
    inner: SenderInner,
    // Connection metadata; `WebSocket::into_split` gives both halves the same `Rc`.
    info: Rc<Info>,
}
582
// Sending half behind a `WebSocketSender`, one variant per `Interface`.
enum SenderInner {
    // Sender from the `stream` module.
    Stream(stream::Sender),
    // Sender from the `standard` module.
    Standard(standard::Sender),
}
587
588impl fmt::Debug for WebSocketSender {
589    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
590        f.debug_struct("WebSocketSender")
591            .field("url", &self.info.url)
592            .field("protocol", &self.protocol())
593            .field("interface", &self.interface())
594            .finish()
595    }
596}
597
598impl WebSocketSender {
599    /// The URL of the WebSocket server.
600    pub fn url(&self) -> &str {
601        &self.info.url
602    }
603
604    /// A string representing the sub-protocol used to open the current WebSocket connection.
605    pub fn protocol(&self) -> &str {
606        &self.info.protocol
607    }
608
609    /// The used WebSocket browser interface.
610    pub fn interface(&self) -> Interface {
611        self.info.interface
612    }
613
614    /// Closes the WebSocket.
615    ///
616    /// This also closes the corresponding [WebSocketReceiver].
617    pub fn close(self) {
618        self.close_with_reason(CloseCode::NormalClosure, "");
619    }
620
621    /// Closes the WebSocket with the specified close code and reason.
622    ///
623    /// This also closes the corresponding [WebSocketReceiver].
624    ///
625    /// ## Panics
626    /// Panics if the close code is neither [CloseCode::NormalClosure] nor
627    /// [CloseCode::Other] with a value between 3000 and 4999.
628    #[track_caller]
629    pub fn close_with_reason(self, code: CloseCode, reason: &str) {
630        if !code.is_valid() {
631            panic!("WebSocket close code {code} is invalid");
632        }
633
634        match self.inner {
635            SenderInner::Stream(sender) => sender.close(code.into(), reason),
636            SenderInner::Standard(sender) => sender.close(code.into(), reason),
637        }
638    }
639
640    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
641        match &mut self.inner {
642            SenderInner::Stream(inner) => inner.poll_ready_unpin(cx),
643            SenderInner::Standard(inner) => inner.poll_ready_unpin(cx),
644        }
645    }
646
647    fn start_send(mut self: Pin<&mut Self>, item: &JsValue, len: usize) -> Result<(), io::Error> {
648        match &mut self.inner {
649            SenderInner::Stream(inner) => inner.start_send_unpin((item, len)),
650            SenderInner::Standard(inner) => inner.start_send_unpin(item),
651        }
652    }
653
654    fn start_send_binary(mut self: Pin<&mut Self>, data: &[u8]) -> Result<(), io::Error> {
655        match &mut self.inner {
656            SenderInner::Stream(inner) => {
657                let array: JsValue = Uint8Array::from(data).into();
658                inner.start_send_unpin((&array, data.len()))
659            }
660            SenderInner::Standard(inner) => {
661                // SAFETY: WebSocket.send() copies data synchronously.
662                let array: JsValue = unsafe { uint8_array_for_api(data) }.into();
663                inner.start_send_unpin(&array)
664            }
665        }
666    }
667
668    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
669        match &mut self.inner {
670            SenderInner::Stream(inner) => inner.poll_flush_unpin(cx),
671            SenderInner::Standard(inner) => inner.poll_flush_unpin(cx),
672        }
673    }
674
675    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
676        match &mut self.inner {
677            SenderInner::Stream(inner) => inner.poll_close_unpin(cx),
678            SenderInner::Standard(inner) => inner.poll_close_unpin(cx),
679        }
680    }
681}
682
683impl Sink<&str> for WebSocketSender {
684    type Error = io::Error;
685
686    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
687        self.poll_ready(cx)
688    }
689
690    fn start_send(self: Pin<&mut Self>, item: &str) -> Result<(), Self::Error> {
691        self.start_send(&JsValue::from_str(item), item.len())
692    }
693
694    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
695        self.poll_flush(cx)
696    }
697
698    fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
699        self.poll_close(cx)
700    }
701}
702
703impl Sink<String> for WebSocketSender {
704    type Error = io::Error;
705
706    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
707        self.poll_ready(cx)
708    }
709
710    fn start_send(self: Pin<&mut Self>, item: String) -> Result<(), Self::Error> {
711        self.start_send(&JsValue::from_str(&item), item.len())
712    }
713
714    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
715        self.poll_flush(cx)
716    }
717
718    fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
719        self.poll_close(cx)
720    }
721}
722
723impl Sink<&[u8]> for WebSocketSender {
724    type Error = io::Error;
725
726    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
727        self.poll_ready(cx)
728    }
729
730    fn start_send(self: Pin<&mut Self>, item: &[u8]) -> Result<(), Self::Error> {
731        self.start_send_binary(item)
732    }
733
734    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
735        self.poll_flush(cx)
736    }
737
738    fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
739        self.poll_close(cx)
740    }
741}
742
743impl Sink<Vec<u8>> for WebSocketSender {
744    type Error = io::Error;
745
746    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
747        self.poll_ready(cx)
748    }
749
750    fn start_send(self: Pin<&mut Self>, item: Vec<u8>) -> Result<(), Self::Error> {
751        self.start_send_binary(&item)
752    }
753
754    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
755        self.poll_flush(cx)
756    }
757
758    fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
759        self.poll_close(cx)
760    }
761}
762
763impl Sink<Msg> for WebSocketSender {
764    type Error = io::Error;
765
766    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
767        self.poll_ready(cx)
768    }
769
770    fn start_send(self: Pin<&mut Self>, item: Msg) -> Result<(), Self::Error> {
771        match item {
772            Msg::Text(text) => self.start_send(&JsValue::from_str(&text), text.len()),
773            Msg::Binary(vec) => self.start_send_binary(&vec),
774        }
775    }
776
777    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
778        self.poll_flush(cx)
779    }
780
781    fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
782        self.poll_close(cx)
783    }
784}
785
786impl AsyncWrite for WebSocketSender {
787    fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<Result<usize, io::Error>> {
788        ready!(self.as_mut().poll_ready(cx))?;
789        self.start_send_binary(buf)?;
790        Poll::Ready(Ok(buf.len()))
791    }
792
793    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
794        self.poll_flush(cx)
795    }
796
797    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
798        self.poll_close(cx)
799    }
800}
801
/// Receiving part of a [WebSocket].
///
/// The WebSocket is closed when both the [WebSocketSender] and [WebSocketReceiver]
/// are dropped.
pub struct WebSocketReceiver {
    // Interface-specific receiving half.
    inner: ReceiverInner,
    // Connection metadata; `WebSocket::into_split` gives both halves the same `Rc`.
    info: Rc<Info>,
    // Bytes of a received message not yet handed out by `AsyncRead::poll_read`.
    read_buf: Vec<u8>,
}
811
// Receiving half behind a `WebSocketReceiver`, one variant per `Interface`.
enum ReceiverInner {
    // Receiver from the `stream` module.
    Stream(stream::Receiver),
    // Receiver from the `standard` module.
    Standard(standard::Receiver),
}
816
817impl fmt::Debug for WebSocketReceiver {
818    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
819        f.debug_struct("WebSocketReceiver")
820            .field("url", &self.info.url)
821            .field("protocol", &self.protocol())
822            .field("interface", &self.interface())
823            .finish()
824    }
825}
826
827impl WebSocketReceiver {
828    /// The URL of the WebSocket server.
829    pub fn url(&self) -> &str {
830        &self.info.url
831    }
832
833    /// A string representing the sub-protocol used to open the current WebSocket connection.
834    pub fn protocol(&self) -> &str {
835        &self.info.protocol
836    }
837
838    /// The used WebSocket browser interface.
839    pub fn interface(&self) -> Interface {
840        self.info.interface
841    }
842}
843
844impl Stream for WebSocketReceiver {
845    type Item = io::Result<Msg>;
846
847    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
848        match &mut self.inner {
849            ReceiverInner::Stream(inner) => inner.poll_next_unpin(cx),
850            ReceiverInner::Standard(inner) => inner.poll_next_unpin(cx),
851        }
852    }
853}
854
855impl AsyncRead for WebSocketReceiver {
856    fn poll_read(
857        mut self: Pin<&mut Self>, cx: &mut Context, buf: &mut tokio::io::ReadBuf,
858    ) -> Poll<io::Result<()>> {
859        while self.read_buf.is_empty() {
860            let Some(msg) = ready!(self.as_mut().poll_next(cx)?) else { return Poll::Ready(Ok(())) };
861            self.read_buf = msg.to_vec();
862        }
863
864        let to_copy = buf.remaining().min(self.read_buf.len());
865        buf.put_slice(&self.read_buf[..to_copy]);
866        self.read_buf.drain(..to_copy);
867
868        Poll::Ready(Ok(()))
869    }
870}