tide_websockets/
websocket_connection.rs1use std::pin::Pin;
2
3use async_dup::{Arc, Mutex};
4use async_std::task;
5use async_tungstenite::WebSocketStream;
6use futures_util::stream::{SplitSink, SplitStream, Stream};
7use futures_util::{SinkExt, StreamExt};
8
9use crate::Message;
10use tide::http::upgrade::Connection;
11
12#[derive(Clone, Debug)]
17pub struct WebSocketConnection(
18 Arc<Mutex<SplitSink<WebSocketStream<Connection>, Message>>>,
19 Arc<Mutex<SplitStream<WebSocketStream<Connection>>>>,
20);
21
22impl WebSocketConnection {
23 pub async fn send_string(&self, string: String) -> async_tungstenite::tungstenite::Result<()> {
25 self.send(Message::Text(string)).await
26 }
27
28 pub async fn send_bytes(&self, bytes: Vec<u8>) -> async_tungstenite::tungstenite::Result<()> {
30 self.send(Message::Binary(bytes)).await
31 }
32
33 pub async fn send(&self, message: Message) -> async_tungstenite::tungstenite::Result<()> {
35 self.0.lock().send(message).await?;
36 Ok(())
37 }
38
39 pub async fn send_json(&self, json: &impl serde::Serialize) -> tide::Result<()> {
41 self.send_string(serde_json::to_string(json)?).await?;
42 Ok(())
43 }
44
45 pub(crate) fn new(ws: WebSocketStream<Connection>) -> Self {
46 let (s, r) = ws.split();
47 Self(Arc::new(Mutex::new(s)), Arc::new(Mutex::new(r)))
48 }
49}
50
51impl Stream for WebSocketConnection {
52 type Item = Result<Message, async_tungstenite::tungstenite::Error>;
53
54 fn poll_next(
55 self: Pin<&mut Self>,
56 cx: &mut task::Context<'_>,
57 ) -> task::Poll<Option<Self::Item>> {
58 Pin::new(&mut *self.1.lock()).poll_next(cx)
59 }
60}
61
62impl From<WebSocketStream<Connection>> for WebSocketConnection {
63 fn from(ws: WebSocketStream<Connection>) -> Self {
64 Self::new(ws)
65 }
66}