// ws_bridge/connection.rs

use std::fmt;
use std::marker::PhantomData;

use crate::codec::{DecodeError, EncodeError, WsCodec, WsMessage};

6/// Error when sending a message.
7#[derive(Debug)]
8pub enum SendError {
9    Encode(EncodeError),
10    Closed,
11}
12
13impl fmt::Display for SendError {
14    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
15        match self {
16            SendError::Encode(e) => write!(f, "send error: {e}"),
17            SendError::Closed => write!(f, "connection closed"),
18        }
19    }
20}
21
22impl std::error::Error for SendError {}
23
24impl From<EncodeError> for SendError {
25    fn from(e: EncodeError) -> Self {
26        SendError::Encode(e)
27    }
28}
29
30/// Error when receiving a message.
31#[derive(Debug)]
32pub enum RecvError {
33    Decode(DecodeError),
34    Closed,
35}
36
37impl fmt::Display for RecvError {
38    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
39        match self {
40            RecvError::Decode(e) => write!(f, "recv error: {e}"),
41            RecvError::Closed => write!(f, "connection closed"),
42        }
43    }
44}
45
46impl std::error::Error for RecvError {}
47
48impl From<DecodeError> for RecvError {
49    fn from(e: DecodeError) -> Self {
50        RecvError::Decode(e)
51    }
52}
53
54/// The sending half of a typed WebSocket connection.
55///
56/// Obtained by calling [`WsConnection::split`].
57pub struct WsSender<S: WsCodec> {
58    pub(crate) sink: Box<dyn ErasedSink>,
59    pub(crate) _send: PhantomData<S>,
60}
61
62impl<S: WsCodec> WsSender<S> {
63    pub async fn send(&mut self, msg: S) -> Result<(), SendError> {
64        let ws_msg = msg.encode()?;
65        self.sink.send(ws_msg).await.map_err(|_| SendError::Closed)
66    }
67
68    pub async fn close(&mut self) -> Result<(), SendError> {
69        self.sink.close().await.map_err(|_| SendError::Closed)
70    }
71}
72
73/// The receiving half of a typed WebSocket connection.
74///
75/// Obtained by calling [`WsConnection::split`].
76pub struct WsReceiver<R: WsCodec> {
77    pub(crate) stream: Box<dyn ErasedStream>,
78    pub(crate) _recv: PhantomData<R>,
79}
80
81impl<R: WsCodec> WsReceiver<R> {
82    pub async fn recv(&mut self) -> Option<Result<R, RecvError>> {
83        match self.stream.next().await {
84            None => None,
85            Some(Err(_)) => Some(Err(RecvError::Closed)),
86            Some(Ok(ws_msg)) => Some(R::decode(ws_msg).map_err(RecvError::Decode)),
87        }
88    }
89}
90
91/// A typed WebSocket connection parameterized by send and receive types.
92///
93/// On the server side: `Send = E::ServerMsg`, `Recv = E::ClientMsg`.
94/// On the client side: `Send = E::ClientMsg`, `Recv = E::ServerMsg`.
95pub struct WsConnection<S: WsCodec, R: WsCodec> {
96    pub(crate) sink: Box<dyn ErasedSink>,
97    pub(crate) stream: Box<dyn ErasedStream>,
98    pub(crate) _types: PhantomData<(S, R)>,
99}
100
101impl<S: WsCodec, R: WsCodec> WsConnection<S, R> {
102    pub async fn send(&mut self, msg: S) -> Result<(), SendError> {
103        let ws_msg = msg.encode()?;
104        self.sink.send(ws_msg).await.map_err(|_| SendError::Closed)
105    }
106
107    pub async fn recv(&mut self) -> Option<Result<R, RecvError>> {
108        match self.stream.next().await {
109            None => None,
110            Some(Err(_)) => Some(Err(RecvError::Closed)),
111            Some(Ok(ws_msg)) => Some(R::decode(ws_msg).map_err(RecvError::Decode)),
112        }
113    }
114
115    pub fn split(self) -> (WsSender<S>, WsReceiver<R>) {
116        (
117            WsSender {
118                sink: self.sink,
119                _send: PhantomData,
120            },
121            WsReceiver {
122                stream: self.stream,
123                _recv: PhantomData,
124            },
125        )
126    }
127}
128
// -- Erased trait objects for transport independence --

/// A pinned, heap-allocated, `Send` future that may borrow for `'a` and
/// resolves to `T`. Used as the return type of the erased traits below, since
/// trait-object methods cannot return `impl Future`.
pub(crate) type BoxFuture<'a, T> =
    std::pin::Pin<Box<dyn std::future::Future<Output = T> + Send + 'a>>;

134/// Type-erased sink that can send `WsMessage`s.
135pub(crate) trait ErasedSink: Send {
136    fn send(&mut self, msg: WsMessage) -> BoxFuture<'_, Result<(), ()>>;
137    fn close(&mut self) -> BoxFuture<'_, Result<(), ()>>;
138}
139
140/// Type-erased stream that yields `WsMessage`s.
141pub(crate) trait ErasedStream: Send {
142    fn next(&mut self) -> BoxFuture<'_, Option<Result<WsMessage, ()>>>;
143}