cross_ws/backend/switch/
tungstenite.rs1use crate::prelude::*;
4use crate::error::WebsocketResult;
5use tokio_tungstenite::{connect_async, WebSocketStream, MaybeTlsStream};
6
7pub type BackendError = tokio_tungstenite::tungstenite::Error;
9use tokio::net::TcpStream;
10use futures_util::SinkExt;
11use std::sync::{Arc, Mutex};
12
13use crate::message::Message;
14use futures_util::StreamExt;
15use futures_util::stream::{SplitSink, SplitStream};
18
19impl From<tokio_tungstenite::tungstenite::Message> for Message {
20 fn from(message: tokio_tungstenite::tungstenite::Message) -> Self {
21 match message {
22 tokio_tungstenite::tungstenite::Message::Binary(binary) => {
23 Message::Binary(binary.into())
24 },
25 tokio_tungstenite::tungstenite::Message::Text(text) => {
26 Message::Text(text.as_str().into())
27 },
28 tokio_tungstenite::tungstenite::Message::Close(close) => {
29 let close = close.map(|close| close.reason.to_string());
30 Message::Close(close)
31 },
32 _ => unimplemented!("Some message types aren't implemented yet: {:#?}", message)
33 }
34 }
35}
36
37#[derive(Derivative)]
39#[derivative(Debug)]
40pub struct WebSocket {}
41
42#[derive(Derivative, Clone)]
44#[derivative(Debug)]
45pub struct WebSocketSender {
46 #[derivative(Debug="ignore")]
47 sender: Arc<Mutex<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tokio_tungstenite::tungstenite::Message>>>
48}
49
50#[derive(Derivative)]
52#[derivative(Debug)]
53pub struct WebSocketReceiver {
54 #[derivative(Debug="ignore")]
55 receiver: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>
56}
57
58impl WebSocket {
59 pub async fn new(url: &str) -> WebsocketResult<(WebSocketSender, WebSocketReceiver)> {
62 connect_async(url)
63 .await
64 .map(|(socket, _response)| {
65 let (sender, receiver) = socket.split();
66 let sender = Arc::new(Mutex::new(sender));
67 (WebSocketSender { sender }, WebSocketReceiver { receiver })
68 })
69 .map_err(|error| crate::error::Error::ConnectionError(error))
70 }
71}
72
73impl WebSocketSender {
74 pub async fn send(&mut self, message: &Message) -> WebsocketResult<()> {
76 match message {
77 Message::Text(text) => {
78 let text = text.clone().into();
79 self.sender.lock().expect("Failed to lock sender.").send(text).await.map_err(|error| crate::error::Error::SendError(error))
80 },
81 Message::Binary(binary) => {
82 let binary = binary.clone().into();
83 self.sender.lock().expect("Failed to lock sender.").send(binary).await.map_err(|error| crate::error::Error::SendError(error))
84 },
85 Message::Close(_close) => {
86 self.sender.lock().expect("Failed to lock sender.").close().await.map_err(|error| crate::error::Error::SendError(error))
87 }
88 }
89 }
90
91 pub async fn close(&mut self, message: Option<String>) -> WebsocketResult<()> {
93 self.send(&Message::Close(message)).await
94 }
95}
96
97impl WebSocketReceiver {
98 pub async fn next(&mut self) -> Option<WebsocketResult<Message>> {
100 self.receiver.next().await.map(|result| {
101 result
102 .map(|result| {
103 result.into()
104 })
105 .map_err(|error| {
106 crate::error::Error::ReceiveError(error)
107 })
108 })
109 }
110}