graphql_ws_client/
ws_stream_wasm.rs

1use futures_lite::{FutureExt, StreamExt};
2use pharos::{Observable, ObserveConfig};
3use ws_stream_wasm::{WsEvent, WsMessage, WsMeta, WsStream};
4
5use crate::{sink_ext::SinkExt, Error};
6
7/// A websocket connection for [ws_stream_wasm][1]
8///
9/// [1]: https://docs.rs/ws_stream/latest/ws_stream
10#[cfg_attr(docsrs, doc(cfg(feature = "ws_stream_wasm")))]
11pub struct Connection {
12    messages: WsStream,
13    event_stream: pharos::Events<WsEvent>,
14    meta: WsMeta,
15}
16
17impl Connection {
18    /// Creates a new Connection from a [`WsMeta`] and [`WsStream`] combo
19    ///
20    /// # Panics
21    ///
22    /// Will panic if `meta.observe` fails.
23    pub async fn new((mut meta, messages): (WsMeta, WsStream)) -> Self {
24        let event_stream = meta.observe(ObserveConfig::default()).await.unwrap();
25
26        Connection {
27            messages,
28            event_stream,
29            meta,
30        }
31    }
32}
33
34impl crate::next::Connection for Connection {
35    async fn receive(&mut self) -> Option<crate::next::Message> {
36        use crate::next::Message;
37        loop {
38            match self.next().await? {
39                EventOrMessage::Event(WsEvent::Closed(close)) => {
40                    return Some(Message::Close {
41                        code: Some(close.code),
42                        reason: Some(close.reason),
43                    });
44                }
45                EventOrMessage::Event(WsEvent::Error | WsEvent::WsErr(_)) => {
46                    return None;
47                }
48                EventOrMessage::Event(WsEvent::Open | WsEvent::Closing) => {
49                    continue;
50                }
51                EventOrMessage::Message(WsMessage::Text(text)) => return Some(Message::Text(text)),
52                EventOrMessage::Message(WsMessage::Binary(_)) => {
53                    // We shouldn't receive binary messages, but ignore them if we do
54                    continue;
55                }
56            }
57        }
58    }
59
60    async fn send(&mut self, message: crate::next::Message) -> Result<(), Error> {
61        use crate::next::Message;
62
63        match message {
64            Message::Text(text) => self.messages.send(WsMessage::Text(text)).await,
65            Message::Close { code, reason } => match (code, reason) {
66                (Some(code), Some(reason)) => self.meta.close_reason(code, reason).await,
67                (Some(code), _) => self.meta.close_code(code).await,
68                _ => self.meta.close().await,
69            }
70            .map(|_| ()),
71            Message::Ping | Message::Pong => return Ok(()),
72        }
73        .map_err(|error| Error::Send(error.to_string()))
74    }
75}
76
77impl Connection {
78    async fn next(&mut self) -> Option<EventOrMessage> {
79        let event = async { self.event_stream.next().await.map(EventOrMessage::Event) };
80        let message = async { self.messages.next().await.map(EventOrMessage::Message) };
81
82        event.race(message).await
83    }
84}
85
86enum EventOrMessage {
87    Event(WsEvent),
88    Message(WsMessage),
89}