dcl_rpc/transports/web_sockets/
mod.rs1use std::{sync::Arc, time::Duration};
2
3use async_trait::async_trait;
4use log::debug;
5use tokio::time::interval;
6
7use super::{Transport, TransportError, TransportMessage};
8
9#[cfg(feature = "tungstenite")]
10pub mod tungstenite;
11
12#[cfg(feature = "warp")]
13pub mod warp;
14
15#[derive(Debug)]
16pub enum Error {
17 ConnectionClosed,
18 AlreadyClosed,
19 Other(Box<dyn std::error::Error + Sync + Send>),
20}
21
22#[derive(Debug)]
23pub enum Message {
24 Text(String),
25 Binary(Vec<u8>),
26 Ping,
27 Pong,
28 Close,
29}
30
31#[async_trait]
32pub trait WebSocket: Send + Sync + 'static {
33 async fn send(&self, message: Message) -> Result<(), Error>;
34
35 async fn receive(&self) -> Option<Result<Message, Error>>;
36
37 async fn close(&self) -> Result<(), Error>;
38
39 async fn ping_every(self: Arc<Self>, ping_interval: Duration) {
40 tokio::spawn(async move {
41 let mut ping_interval = interval(ping_interval);
42 loop {
43 ping_interval.tick().await;
44 _ = self.send(Message::Ping).await;
45 }
46 });
47 }
48}
49
50pub struct WebSocketTransport<WebSocket, Context> {
51 websocket: Arc<WebSocket>,
52 pub context: Context,
53}
54
55impl<WebSocket> WebSocketTransport<WebSocket, ()> {
56 pub fn new(websocket: Arc<WebSocket>) -> Self {
58 Self {
59 websocket,
60 context: (),
61 }
62 }
63}
64
65impl<WebSocket, Context> WebSocketTransport<WebSocket, Context> {
66 pub fn with_context(websocket: Arc<WebSocket>, context: Context) -> Self {
67 Self { websocket, context }
68 }
69}
70
71#[async_trait]
72impl<W: WebSocket, C: Send + Sync + 'static> Transport for WebSocketTransport<W, C> {
73 async fn receive(&self) -> Result<TransportMessage, TransportError> {
74 loop {
75 match self.websocket.receive().await {
76 Some(Ok(message)) => match message {
77 Message::Binary(data) => return Ok(data),
78 Message::Ping | Message::Pong => continue,
79 Message::Close => return Err(TransportError::Closed),
80 _ => return Err(TransportError::NotBinaryMessage),
81 },
82 Some(Err(err)) => {
83 debug!("> WebSocketTransport > Failed to receive message {:?}", err);
84 match err {
85 Error::ConnectionClosed | Error::AlreadyClosed => {
86 return Err(TransportError::Closed)
87 }
88 Error::Other(error) => return Err(TransportError::Internal(error)),
89 }
90 }
91 None => {
92 debug!("> WebSocketTransport > None received > Closing...");
93 return Err(TransportError::Closed);
94 }
95 }
96 }
97 }
98
99 async fn send(&self, message: Vec<u8>) -> Result<(), TransportError> {
100 let message = Message::Binary(message);
101 match self.websocket.send(message).await {
102 Err(err) => {
103 debug!(
104 "> WebSocketTransport > Error on sending in a ws connection {:?}",
105 err
106 );
107
108 let error = match err {
109 Error::ConnectionClosed | Error::AlreadyClosed => TransportError::Closed,
110 Error::Other(error) => TransportError::Internal(error),
111 };
112
113 Err(error)
114 }
115 Ok(_) => Ok(()),
116 }
117 }
118
119 async fn close(&self) {
120 match self.websocket.close().await {
121 Ok(_) => {
122 debug!("> WebSocketTransport > Closed successfully")
123 }
124 Err(err) => {
125 debug!("> WebSocketTransport > Error: Couldn't close tranport: {err:?}")
126 }
127 }
128 }
129}
130
131pub fn convert<M, E>(value: Result<M, E>) -> Result<Message, Error>
132where
133 M: Into<Message>,
134 E: Into<Error>,
135{
136 value.map(|m| m.into()).map_err(|e| e.into())
137}