1use std::fmt;
2use std::marker::PhantomData;
3
4use crate::codec::{DecodeError, EncodeError, WsCodec, WsMessage};
5
6#[derive(Debug)]
8pub enum SendError {
9 Encode(EncodeError),
10 Closed,
11}
12
13impl fmt::Display for SendError {
14 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
15 match self {
16 SendError::Encode(e) => write!(f, "send error: {e}"),
17 SendError::Closed => write!(f, "connection closed"),
18 }
19 }
20}
21
22impl std::error::Error for SendError {}
23
24impl From<EncodeError> for SendError {
25 fn from(e: EncodeError) -> Self {
26 SendError::Encode(e)
27 }
28}
29
30#[derive(Debug)]
32pub enum RecvError {
33 Decode(DecodeError),
34 Closed,
35}
36
37impl fmt::Display for RecvError {
38 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
39 match self {
40 RecvError::Decode(e) => write!(f, "recv error: {e}"),
41 RecvError::Closed => write!(f, "connection closed"),
42 }
43 }
44}
45
46impl std::error::Error for RecvError {}
47
48impl From<DecodeError> for RecvError {
49 fn from(e: DecodeError) -> Self {
50 RecvError::Decode(e)
51 }
52}
53
54pub struct WsSender<S: WsCodec> {
58 pub(crate) sink: Box<dyn ErasedSink>,
59 pub(crate) _send: PhantomData<S>,
60}
61
62impl<S: WsCodec> WsSender<S> {
63 pub async fn send(&mut self, msg: S) -> Result<(), SendError> {
64 let ws_msg = msg.encode()?;
65 self.sink.send(ws_msg).await.map_err(|_| SendError::Closed)
66 }
67
68 pub async fn close(&mut self) -> Result<(), SendError> {
69 self.sink.close().await.map_err(|_| SendError::Closed)
70 }
71}
72
73pub struct WsReceiver<R: WsCodec> {
77 pub(crate) stream: Box<dyn ErasedStream>,
78 pub(crate) _recv: PhantomData<R>,
79}
80
81impl<R: WsCodec> WsReceiver<R> {
82 pub async fn recv(&mut self) -> Option<Result<R, RecvError>> {
83 match self.stream.next().await {
84 None => None,
85 Some(Err(_)) => Some(Err(RecvError::Closed)),
86 Some(Ok(ws_msg)) => Some(R::decode(ws_msg).map_err(RecvError::Decode)),
87 }
88 }
89}
90
91pub struct WsConnection<S: WsCodec, R: WsCodec> {
96 pub(crate) sink: Box<dyn ErasedSink>,
97 pub(crate) stream: Box<dyn ErasedStream>,
98 pub(crate) _types: PhantomData<(S, R)>,
99}
100
101impl<S: WsCodec, R: WsCodec> WsConnection<S, R> {
102 pub async fn send(&mut self, msg: S) -> Result<(), SendError> {
103 let ws_msg = msg.encode()?;
104 self.sink.send(ws_msg).await.map_err(|_| SendError::Closed)
105 }
106
107 pub async fn recv(&mut self) -> Option<Result<R, RecvError>> {
108 match self.stream.next().await {
109 None => None,
110 Some(Err(_)) => Some(Err(RecvError::Closed)),
111 Some(Ok(ws_msg)) => Some(R::decode(ws_msg).map_err(RecvError::Decode)),
112 }
113 }
114
115 pub fn split(self) -> (WsSender<S>, WsReceiver<R>) {
116 (
117 WsSender {
118 sink: self.sink,
119 _send: PhantomData,
120 },
121 WsReceiver {
122 stream: self.stream,
123 _recv: PhantomData,
124 },
125 )
126 }
127}
128
/// Pinned, heap-allocated, `Send` future producing `T`, valid for lifetime `'a`.
///
/// Used as the return type of the type-erased sink and stream trait methods.
pub(crate) type BoxFuture<'a, T> = std::pin::Pin<Box<dyn std::future::Future<Output = T> + Send + 'a>>;
133
134pub(crate) trait ErasedSink: Send {
136 fn send(&mut self, msg: WsMessage) -> BoxFuture<'_, Result<(), ()>>;
137 fn close(&mut self) -> BoxFuture<'_, Result<(), ()>>;
138}
139
140pub(crate) trait ErasedStream: Send {
142 fn next(&mut self) -> BoxFuture<'_, Option<Result<WsMessage, ()>>>;
143}