1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
use std::pin::Pin;
use async_dup::{Arc, Mutex};
use async_std::task;
use async_tungstenite::WebSocketStream;
use futures_util::stream::{SplitSink, SplitStream, Stream};
use futures_util::{SinkExt, StreamExt};
use crate::Message;
use tide::http::upgrade::Connection;
/// A handle to a WebSocket connection built on top of a tide upgrade
/// [`Connection`].
///
/// The underlying [`WebSocketStream`] is split into an outbound and an
/// inbound half, and each half is stored behind its own `Arc<Mutex<_>>`.
#[derive(Clone, Debug)]
pub struct WebSocketConnection(
    /// Outbound (sink) half; the `send_*` methods lock it to write messages.
    Arc<Mutex<SplitSink<WebSocketStream<Connection>, Message>>>,
    /// Inbound (stream) half; the `Stream` implementation locks it to poll
    /// for the next message.
    Arc<Mutex<SplitStream<WebSocketStream<Connection>>>>,
);
impl WebSocketConnection {
pub async fn send_string(&self, s: String) -> tide::Result<()> {
self.0.lock().send(Message::Text(s)).await?;
Ok(())
}
pub async fn send_bytes(&self, bytes: Vec<u8>) -> tide::Result<()> {
self.0.lock().send(Message::Binary(bytes)).await?;
Ok(())
}
pub async fn send_json(&self, json: &impl serde::Serialize) -> tide::Result<()> {
self.send_string(serde_json::to_string(json)?).await
}
pub(crate) fn new(ws: WebSocketStream<Connection>) -> Self {
let (s, r) = ws.split();
Self(Arc::new(Mutex::new(s)), Arc::new(Mutex::new(r)))
}
}
impl Stream for WebSocketConnection {
type Item = Result<Message, async_tungstenite::tungstenite::Error>;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> task::Poll<Option<Self::Item>> {
Pin::new(&mut *self.1.lock()).poll_next(cx)
}
}
impl From<WebSocketStream<Connection>> for WebSocketConnection {
fn from(ws: WebSocketStream<Connection>) -> Self {
Self::new(ws)
}
}