graphql_ws_client/
native.rs

1use futures_lite::{Stream, StreamExt};
2use futures_sink::Sink;
3use tungstenite::{self, protocol::CloseFrame};
4
5use crate::{sink_ext::SinkExt, Error, Message};
6
7#[cfg_attr(docsrs, doc(cfg(feature = "tungstenite")))]
8impl<T> crate::next::Connection for T
9where
10    T: Stream<Item = Result<tungstenite::Message, tungstenite::Error>>
11        + Sink<tungstenite::Message>
12        + Send
13        + Sync
14        + Unpin,
15    <T as Sink<tungstenite::Message>>::Error: std::fmt::Display,
16{
17    async fn receive(&mut self) -> Option<Message> {
18        loop {
19            match self.next().await? {
20                Ok(tungstenite::Message::Text(text)) => {
21                    return Some(crate::next::Message::Text(text))
22                }
23                Ok(tungstenite::Message::Ping(_)) => return Some(crate::next::Message::Ping),
24                Ok(tungstenite::Message::Pong(_)) => return Some(crate::next::Message::Pong),
25                Ok(tungstenite::Message::Close(frame)) => {
26                    return Some(crate::next::Message::Close {
27                        code: frame.as_ref().map(|frame| frame.code.into()),
28                        reason: frame.map(|frame| frame.reason.to_string()),
29                    })
30                }
31                Ok(tungstenite::Message::Frame(_) | tungstenite::Message::Binary(_)) => continue,
32                Err(error) => {
33                    #[allow(unused)]
34                    let error = error;
35                    crate::logging::warning!("error receiving message: {error:?}");
36                    return None;
37                }
38            }
39        }
40    }
41
42    async fn send(&mut self, message: crate::next::Message) -> Result<(), Error> {
43        <Self as SinkExt<tungstenite::Message>>::send(
44            self,
45            match message {
46                crate::next::Message::Text(text) => tungstenite::Message::Text(text),
47                crate::next::Message::Close { code, reason } => {
48                    tungstenite::Message::Close(code.zip(reason).map(|(code, reason)| CloseFrame {
49                        code: code.into(),
50                        reason: reason.into(),
51                    }))
52                }
53                crate::next::Message::Ping => tungstenite::Message::Ping(vec![]),
54                crate::next::Message::Pong => tungstenite::Message::Pong(vec![]),
55            },
56        )
57        .await
58        .map_err(|error| Error::Send(error.to_string()))
59    }
60}