tide_websockets_sink/
websocket_connection.rs

1use std::pin::Pin;
2
3use async_dup::{Arc, Mutex};
4use async_std::task;
5use async_tungstenite::WebSocketStream;
6use futures_util::sink::Sink;
7use futures_util::stream::{SplitSink, SplitStream, Stream};
8use futures_util::{SinkExt, StreamExt};
9
10use crate::Message;
11use tide::http::upgrade::Connection;
12
13/// # WebSocket connection
14///
15/// This is the type that the handler passed to [`WebSocket::new`]
16/// receives. It represents a bidirectional stream of websocket data.
17#[derive(Clone, Debug)]
18pub struct WebSocketConnection(
19    Arc<Mutex<SplitSink<WebSocketStream<Connection>, Message>>>,
20    Arc<Mutex<SplitStream<WebSocketStream<Connection>>>>,
21);
22
23impl WebSocketConnection {
24    /// Sends a string message to the connected websocket client. This is equivalent to `.send(Message::Text(string))`
25    pub async fn send_string(&self, string: String) -> async_tungstenite::tungstenite::Result<()> {
26        self.send(Message::Text(string)).await
27    }
28
29    /// Sends a binary message to the connected websocket client. This is equivalent to `.send(Message::Binary(bytes))`
30    pub async fn send_bytes(&self, bytes: Vec<u8>) -> async_tungstenite::tungstenite::Result<()> {
31        self.send(Message::Binary(bytes)).await
32    }
33
34    /// Sends a [`Message`] to the client
35    pub async fn send(&self, message: Message) -> async_tungstenite::tungstenite::Result<()> {
36        self.0.lock().send(message).await?;
37        Ok(())
38    }
39
40    /// Sends the serde_json serialization of the provided type as a string to the connected websocket client
41    pub async fn send_json(&self, json: &impl serde::Serialize) -> tide::Result<()> {
42        self.send_string(serde_json::to_string(json)?).await?;
43        Ok(())
44    }
45
46    pub(crate) fn new(ws: WebSocketStream<Connection>) -> Self {
47        let (s, r) = ws.split();
48        Self(Arc::new(Mutex::new(s)), Arc::new(Mutex::new(r)))
49    }
50}
51
52impl Stream for WebSocketConnection {
53    type Item = Result<Message, async_tungstenite::tungstenite::Error>;
54
55    fn poll_next(
56        self: Pin<&mut Self>,
57        cx: &mut task::Context<'_>,
58    ) -> task::Poll<Option<Self::Item>> {
59        Pin::new(&mut *self.1.lock()).poll_next(cx)
60    }
61}
62
63impl Sink<Message> for WebSocketConnection {
64    type Error = async_tungstenite::tungstenite::Error;
65
66    fn poll_ready(
67        self: Pin<&mut Self>,
68        cx: &mut task::Context<'_>,
69    ) -> task::Poll<Result<(), Self::Error>> {
70        Pin::new(&mut *self.0.lock()).poll_ready(cx)
71    }
72
73    fn start_send(self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> {
74        Pin::new(&mut *self.0.lock()).start_send(item)
75    }
76
77    fn poll_flush(
78        self: Pin<&mut Self>,
79        cx: &mut task::Context<'_>,
80    ) -> task::Poll<Result<(), Self::Error>> {
81        Pin::new(&mut *self.0.lock()).poll_flush(cx)
82    }
83
84    fn poll_close(
85        self: Pin<&mut Self>,
86        cx: &mut task::Context<'_>,
87    ) -> task::Poll<Result<(), Self::Error>> {
88        Pin::new(&mut *self.0.lock()).poll_close(cx)
89    }
90}
91
92impl From<WebSocketStream<Connection>> for WebSocketConnection {
93    fn from(ws: WebSocketStream<Connection>) -> Self {
94        Self::new(ws)
95    }
96}