dcl_rpc/transports/web_sockets/
mod.rs

1use std::{sync::Arc, time::Duration};
2
3use async_trait::async_trait;
4use log::debug;
5use tokio::time::interval;
6
7use super::{Transport, TransportError, TransportMessage};
8
9#[cfg(feature = "tungstenite")]
10pub mod tungstenite;
11
12#[cfg(feature = "warp")]
13pub mod warp;
14
/// Errors reported by a [`WebSocket`] implementation.
///
/// [`WebSocketTransport`] maps both `ConnectionClosed` and `AlreadyClosed` to
/// `TransportError::Closed`, and `Other` to `TransportError::Internal`.
#[derive(Debug)]
pub enum Error {
    /// The connection was closed.
    ConnectionClosed,
    /// The connection had already been closed when the operation was attempted.
    AlreadyClosed,
    /// Any other failure, carrying the underlying error as its cause.
    Other(Box<dyn std::error::Error + Sync + Send>),
}

impl std::fmt::Display for Error {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::ConnectionClosed => f.write_str("websocket connection closed"),
            Self::AlreadyClosed => f.write_str("websocket connection already closed"),
            Self::Other(source) => write!(f, "websocket error: {source}"),
        }
    }
}

impl std::error::Error for Error {
    /// Exposes the wrapped error of [`Error::Other`] as the cause; the closed
    /// variants have no underlying cause.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::Other(inner) => {
                // Explicit binding so the `Send + Sync` bounds are dropped via coercion.
                let source: &(dyn std::error::Error + 'static) = &**inner;
                Some(source)
            }
            Self::ConnectionClosed | Self::AlreadyClosed => None,
        }
    }
}
21
/// A websocket message as exchanged through the [`WebSocket`] trait.
///
/// Derives `Clone`, `PartialEq` and `Eq` so messages can be duplicated and
/// compared directly (for example in assertions).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Message {
    /// A text payload.
    Text(String),
    /// A binary payload.
    Binary(Vec<u8>),
    /// A ping message.
    Ping,
    /// A pong message.
    Pong,
    /// A close message.
    Close,
}
30
31#[async_trait]
32pub trait WebSocket: Send + Sync + 'static {
33    async fn send(&self, message: Message) -> Result<(), Error>;
34
35    async fn receive(&self) -> Option<Result<Message, Error>>;
36
37    async fn close(&self) -> Result<(), Error>;
38
39    async fn ping_every(self: Arc<Self>, ping_interval: Duration) {
40        tokio::spawn(async move {
41            let mut ping_interval = interval(ping_interval);
42            loop {
43                ping_interval.tick().await;
44                _ = self.send(Message::Ping).await;
45            }
46        });
47    }
48}
49
/// Pairs a shared websocket connection with a caller-supplied context value.
///
/// `W` is the websocket type and `C` the context type; neither is constrained
/// here, the bounds live on the impl blocks that need them.
pub struct WebSocketTransport<W, C> {
    /// Shared handle to the underlying websocket connection.
    websocket: Arc<W>,
    /// Arbitrary caller-defined data carried alongside the connection.
    pub context: C,
}

impl<W> WebSocketTransport<W, ()> {
    /// Creates a new [`WebSocketTransport`] without context (the context is `()`)
    /// from a websocket connection, e.g. one generated by `WebSocketServer` or
    /// `WebSocketClient`.
    pub fn new(websocket: Arc<W>) -> Self {
        Self::with_context(websocket, ())
    }
}

impl<W, C> WebSocketTransport<W, C> {
    /// Creates a new [`WebSocketTransport`] that stores `context` next to the
    /// websocket connection.
    pub fn with_context(websocket: Arc<W>, context: C) -> Self {
        Self { websocket, context }
    }
}
70
71#[async_trait]
72impl<W: WebSocket, C: Send + Sync + 'static> Transport for WebSocketTransport<W, C> {
73    async fn receive(&self) -> Result<TransportMessage, TransportError> {
74        loop {
75            match self.websocket.receive().await {
76                Some(Ok(message)) => match message {
77                    Message::Binary(data) => return Ok(data),
78                    Message::Ping | Message::Pong => continue,
79                    Message::Close => return Err(TransportError::Closed),
80                    _ => return Err(TransportError::NotBinaryMessage),
81                },
82                Some(Err(err)) => {
83                    debug!("> WebSocketTransport > Failed to receive message {:?}", err);
84                    match err {
85                        Error::ConnectionClosed | Error::AlreadyClosed => {
86                            return Err(TransportError::Closed)
87                        }
88                        Error::Other(error) => return Err(TransportError::Internal(error)),
89                    }
90                }
91                None => {
92                    debug!("> WebSocketTransport > None received > Closing...");
93                    return Err(TransportError::Closed);
94                }
95            }
96        }
97    }
98
99    async fn send(&self, message: Vec<u8>) -> Result<(), TransportError> {
100        let message = Message::Binary(message);
101        match self.websocket.send(message).await {
102            Err(err) => {
103                debug!(
104                    "> WebSocketTransport > Error on sending in a ws connection {:?}",
105                    err
106                );
107
108                let error = match err {
109                    Error::ConnectionClosed | Error::AlreadyClosed => TransportError::Closed,
110                    Error::Other(error) => TransportError::Internal(error),
111                };
112
113                Err(error)
114            }
115            Ok(_) => Ok(()),
116        }
117    }
118
119    async fn close(&self) {
120        match self.websocket.close().await {
121            Ok(_) => {
122                debug!("> WebSocketTransport > Closed successfully")
123            }
124            Err(err) => {
125                debug!("> WebSocketTransport > Error: Couldn't close tranport: {err:?}")
126            }
127        }
128    }
129}
130
131pub fn convert<M, E>(value: Result<M, E>) -> Result<Message, Error>
132where
133    M: Into<Message>,
134    E: Into<Error>,
135{
136    value.map(|m| m.into()).map_err(|e| e.into())
137}