graphql_ws_client/
native.rs1use futures_lite::{Stream, StreamExt};
2use futures_sink::Sink;
3use tungstenite::{self, protocol::CloseFrame};
4
5use crate::{sink_ext::SinkExt, Error, Message};
6
7#[cfg_attr(docsrs, doc(cfg(feature = "tungstenite")))]
8impl<T> crate::next::Connection for T
9where
10 T: Stream<Item = Result<tungstenite::Message, tungstenite::Error>>
11 + Sink<tungstenite::Message>
12 + Send
13 + Sync
14 + Unpin,
15 <T as Sink<tungstenite::Message>>::Error: std::fmt::Display,
16{
17 async fn receive(&mut self) -> Option<Message> {
18 loop {
19 match self.next().await? {
20 Ok(tungstenite::Message::Text(text)) => {
21 return Some(crate::next::Message::Text(text))
22 }
23 Ok(tungstenite::Message::Ping(_)) => return Some(crate::next::Message::Ping),
24 Ok(tungstenite::Message::Pong(_)) => return Some(crate::next::Message::Pong),
25 Ok(tungstenite::Message::Close(frame)) => {
26 return Some(crate::next::Message::Close {
27 code: frame.as_ref().map(|frame| frame.code.into()),
28 reason: frame.map(|frame| frame.reason.to_string()),
29 })
30 }
31 Ok(tungstenite::Message::Frame(_) | tungstenite::Message::Binary(_)) => continue,
32 Err(error) => {
33 #[allow(unused)]
34 let error = error;
35 crate::logging::warning!("error receiving message: {error:?}");
36 return None;
37 }
38 }
39 }
40 }
41
42 async fn send(&mut self, message: crate::next::Message) -> Result<(), Error> {
43 <Self as SinkExt<tungstenite::Message>>::send(
44 self,
45 match message {
46 crate::next::Message::Text(text) => tungstenite::Message::Text(text),
47 crate::next::Message::Close { code, reason } => {
48 tungstenite::Message::Close(code.zip(reason).map(|(code, reason)| CloseFrame {
49 code: code.into(),
50 reason: reason.into(),
51 }))
52 }
53 crate::next::Message::Ping => tungstenite::Message::Ping(vec![]),
54 crate::next::Message::Pong => tungstenite::Message::Pong(vec![]),
55 },
56 )
57 .await
58 .map_err(|error| Error::Send(error.to_string()))
59 }
60}