cross_ws/websocket/backend/switch/tungstenite/
mod.rs1mod sender;
4mod receiver;
5
6use futures_util::StreamExt;
7pub use sender::*;
8pub use receiver::*;
9
10use crate::{prelude::*, traits::WebSocketTrait};
11use crate::websocket::Result;
12use tokio_tungstenite::connect_async;
13
14pub type BackendError = tokio_tungstenite::tungstenite::Error;
16use std::future::Future;
17use std::sync::{Arc, Mutex};
18
19use crate::message::Message;
20use std::convert::TryFrom;
21
22impl TryFrom<tokio_tungstenite::tungstenite::Message> for Message {
23 type Error = crate::error::Error;
24
25 fn try_from(message: tokio_tungstenite::tungstenite::Message) -> std::result::Result<Self, Self::Error> {
26 match message {
27 tokio_tungstenite::tungstenite::Message::Binary(binary) => {
28 Ok(Message::Binary(binary.into()))
29 },
30 tokio_tungstenite::tungstenite::Message::Text(text) => {
31 Ok(Message::Text(text.as_str().into()))
32 },
33 tokio_tungstenite::tungstenite::Message::Close(close) => {
34 let close = close.map(|close| (close.code.into(), close.reason.to_string()));
35 Ok(Message::Close(close))
36 },
37 other => {
38 Err(crate::error::Error::UnsupportedMessageType(format!("Unsupported message type: {:#?}", other)))
39 }
40 }
41 }
42}
43
44#[derive(Derivative)]
46#[derivative(Debug)]
47pub struct WebSocket;
48
49impl WebSocketTrait for WebSocket {
50 type Sender = WebSocketSender;
51 type Receiver = WebSocketReceiver;
52
53 fn new(url: &str) -> impl Future<Output = Result<(WebSocketSender, WebSocketReceiver)>> {
56 async move {
57 connect_async(url)
58 .await
59 .map(|(socket, _response)| {
60 let (sender, receiver) = socket.split();
61 let sender = Arc::new(Mutex::new(sender));
62 let sender_clone = sender.clone();
63 (WebSocketSender::from(sender), WebSocketReceiver::from((receiver, sender_clone)))
64 })
65 .map_err(|error| crate::error::Error::ConnectionError(error))
66 }
67 }
68}
69