tide_websockets_sink/
websocket_connection.rs1use std::pin::Pin;
2
3use async_dup::{Arc, Mutex};
4use async_std::task;
5use async_tungstenite::WebSocketStream;
6use futures_util::sink::Sink;
7use futures_util::stream::{SplitSink, SplitStream, Stream};
8use futures_util::{SinkExt, StreamExt};
9
10use crate::Message;
11use tide::http::upgrade::Connection;
12
13#[derive(Clone, Debug)]
18pub struct WebSocketConnection(
19 Arc<Mutex<SplitSink<WebSocketStream<Connection>, Message>>>,
20 Arc<Mutex<SplitStream<WebSocketStream<Connection>>>>,
21);
22
23impl WebSocketConnection {
24 pub async fn send_string(&self, string: String) -> async_tungstenite::tungstenite::Result<()> {
26 self.send(Message::Text(string)).await
27 }
28
29 pub async fn send_bytes(&self, bytes: Vec<u8>) -> async_tungstenite::tungstenite::Result<()> {
31 self.send(Message::Binary(bytes)).await
32 }
33
34 pub async fn send(&self, message: Message) -> async_tungstenite::tungstenite::Result<()> {
36 self.0.lock().send(message).await?;
37 Ok(())
38 }
39
40 pub async fn send_json(&self, json: &impl serde::Serialize) -> tide::Result<()> {
42 self.send_string(serde_json::to_string(json)?).await?;
43 Ok(())
44 }
45
46 pub(crate) fn new(ws: WebSocketStream<Connection>) -> Self {
47 let (s, r) = ws.split();
48 Self(Arc::new(Mutex::new(s)), Arc::new(Mutex::new(r)))
49 }
50}
51
52impl Stream for WebSocketConnection {
53 type Item = Result<Message, async_tungstenite::tungstenite::Error>;
54
55 fn poll_next(
56 self: Pin<&mut Self>,
57 cx: &mut task::Context<'_>,
58 ) -> task::Poll<Option<Self::Item>> {
59 Pin::new(&mut *self.1.lock()).poll_next(cx)
60 }
61}
62
63impl Sink<Message> for WebSocketConnection {
64 type Error = async_tungstenite::tungstenite::Error;
65
66 fn poll_ready(
67 self: Pin<&mut Self>,
68 cx: &mut task::Context<'_>,
69 ) -> task::Poll<Result<(), Self::Error>> {
70 Pin::new(&mut *self.0.lock()).poll_ready(cx)
71 }
72
73 fn start_send(self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> {
74 Pin::new(&mut *self.0.lock()).start_send(item)
75 }
76
77 fn poll_flush(
78 self: Pin<&mut Self>,
79 cx: &mut task::Context<'_>,
80 ) -> task::Poll<Result<(), Self::Error>> {
81 Pin::new(&mut *self.0.lock()).poll_flush(cx)
82 }
83
84 fn poll_close(
85 self: Pin<&mut Self>,
86 cx: &mut task::Context<'_>,
87 ) -> task::Poll<Result<(), Self::Error>> {
88 Pin::new(&mut *self.0.lock()).poll_close(cx)
89 }
90}
91
92impl From<WebSocketStream<Connection>> for WebSocketConnection {
93 fn from(ws: WebSocketStream<Connection>) -> Self {
94 Self::new(ws)
95 }
96}