graphql_ws_client/
ws_stream_wasm.rs1use futures_lite::{FutureExt, StreamExt};
2use pharos::{Observable, ObserveConfig};
3use ws_stream_wasm::{WsEvent, WsMessage, WsMeta, WsStream};
4
5use crate::{sink_ext::SinkExt, Error};
6
7#[cfg_attr(docsrs, doc(cfg(feature = "ws_stream_wasm")))]
11pub struct Connection {
12 messages: WsStream,
13 event_stream: pharos::Events<WsEvent>,
14 meta: WsMeta,
15}
16
17impl Connection {
18 pub async fn new((mut meta, messages): (WsMeta, WsStream)) -> Self {
24 let event_stream = meta.observe(ObserveConfig::default()).await.unwrap();
25
26 Connection {
27 messages,
28 event_stream,
29 meta,
30 }
31 }
32}
33
34impl crate::next::Connection for Connection {
35 async fn receive(&mut self) -> Option<crate::next::Message> {
36 use crate::next::Message;
37 loop {
38 match self.next().await? {
39 EventOrMessage::Event(WsEvent::Closed(close)) => {
40 return Some(Message::Close {
41 code: Some(close.code),
42 reason: Some(close.reason),
43 });
44 }
45 EventOrMessage::Event(WsEvent::Error | WsEvent::WsErr(_)) => {
46 return None;
47 }
48 EventOrMessage::Event(WsEvent::Open | WsEvent::Closing) => {
49 continue;
50 }
51 EventOrMessage::Message(WsMessage::Text(text)) => return Some(Message::Text(text)),
52 EventOrMessage::Message(WsMessage::Binary(_)) => {
53 continue;
55 }
56 }
57 }
58 }
59
60 async fn send(&mut self, message: crate::next::Message) -> Result<(), Error> {
61 use crate::next::Message;
62
63 match message {
64 Message::Text(text) => self.messages.send(WsMessage::Text(text)).await,
65 Message::Close { code, reason } => match (code, reason) {
66 (Some(code), Some(reason)) => self.meta.close_reason(code, reason).await,
67 (Some(code), _) => self.meta.close_code(code).await,
68 _ => self.meta.close().await,
69 }
70 .map(|_| ()),
71 Message::Ping | Message::Pong => return Ok(()),
72 }
73 .map_err(|error| Error::Send(error.to_string()))
74 }
75}
76
77impl Connection {
78 async fn next(&mut self) -> Option<EventOrMessage> {
79 let event = async { self.event_stream.next().await.map(EventOrMessage::Event) };
80 let message = async { self.messages.next().await.map(EventOrMessage::Message) };
81
82 event.race(message).await
83 }
84}
85
86enum EventOrMessage {
87 Event(WsEvent),
88 Message(WsMessage),
89}