ws_tool/codec/binary/
blocking.rs

1use http;
2use crate::{
3    codec::{
4        FrameCodec, FrameConfig, FrameReadState, FrameRecv, FrameSend, FrameWriteState, Split,
5    },
6    errors::WsError,
7    frame::OpCode,
8    protocol::standard_handshake_resp_check,
9    Message,
10};
11use bytes::Buf;
12use std::borrow::Cow;
13use std::io::{Read, Write};
14
macro_rules! impl_recv {
    () => {
        /// Receive the next frame from the frame codec and wrap it in a [`Message`].
        ///
        /// When the frame's opcode is `Close`, a close code is always reported:
        /// if the payload holds at least two bytes they are read with
        /// `Buf::get_u16` (which advances past them, so they are not part of
        /// the returned `data`); otherwise the code defaults to `1000`.
        /// For every other opcode `close_code` is `None`.
        ///
        /// # Errors
        ///
        /// Returns whatever [`WsError`] the underlying frame codec reports.
        pub fn receive(&mut self) -> Result<Message<Cow<[u8]>>, WsError> {
            let (header, mut payload) = self.frame_codec.receive()?;
            let is_close = header.code == OpCode::Close;
            // Only close frames carry a status code; short payloads fall back to 1000.
            let close_code = is_close.then(|| {
                if payload.len() >= 2 {
                    payload.get_u16()
                } else {
                    1000
                }
            });
            let message = Message {
                code: header.code,
                data: Cow::Borrowed(payload),
                close_code,
            };
            Ok(message)
        }
    };
}
38
macro_rules! impl_send {
    () => {
        /// Convenience wrapper: send `msg` as the payload of a ping message.
        pub fn ping<'a>(&mut self, msg: &'a [u8]) -> Result<(), WsError> {
            let frame = (OpCode::Ping, msg);
            self.send(frame)
        }

        /// Convenience wrapper: send `msg` as the payload of a pong message.
        pub fn pong<'a>(&mut self, msg: &'a [u8]) -> Result<(), WsError> {
            let frame = (OpCode::Pong, msg);
            self.send(frame)
        }

        /// Convenience wrapper: send a close message built from `code` and `msg`.
        pub fn close<'a>(&mut self, code: u16, msg: &'a [u8]) -> Result<(), WsError> {
            let frame = (code, msg);
            self.send(frame)
        }

        /// Send anything convertible into a bytes [`Message`].
        ///
        /// If the message is a `Close` message that carries a close code, the
        /// code is written as two big-endian bytes in front of the message
        /// data. In every other case the message data is sent unchanged.
        ///
        /// # Errors
        ///
        /// Returns whatever [`WsError`] the underlying frame codec reports.
        pub fn send<'a, T>(&mut self, msg: T) -> Result<(), WsError>
        where
            T: Into<Message<Cow<'a, [u8]>>>,
        {
            let message: Message<Cow<'a, [u8]>> = msg.into();
            let body: &[u8] = message.data.as_ref();
            match message.close_code {
                Some(close_code) if message.code == OpCode::Close => {
                    // Close payload layout: 2-byte status code followed by the data.
                    let mut payload = Vec::with_capacity(2 + body.len());
                    payload.extend_from_slice(&close_code.to_be_bytes());
                    payload.extend_from_slice(body);
                    self.frame_codec.send(message.code, &payload)
                }
                _ => self.frame_codec.send(message.code, body),
            }
        }

        /// Flush the underlying stream through the frame codec.
        pub fn flush(&mut self) -> Result<(), WsError> {
            self.frame_codec.flush()
        }
    };
}
78
79/// recv part of bytes message
80pub struct BytesRecv<S: Read> {
81    frame_codec: FrameRecv<S>,
82}
83
84impl<S: Read> BytesRecv<S> {
85    /// construct method
86    pub fn new(stream: S, state: FrameReadState) -> Self {
87        Self {
88            frame_codec: FrameRecv::new(stream, state),
89        }
90    }
91
92    impl_recv! {}
93}
94
95/// send part of bytes message
96pub struct BytesSend<S: Write> {
97    frame_codec: FrameSend<S>,
98}
99
100impl<S: Write> BytesSend<S> {
101    /// construct method
102    pub fn new(stream: S, state: FrameWriteState) -> Self {
103        Self {
104            frame_codec: FrameSend::new(stream, state),
105        }
106    }
107
108    impl_send! {}
109}
110
111/// recv/send bytes message
112pub struct BytesCodec<S: Read + Write> {
113    frame_codec: FrameCodec<S>,
114}
115
116impl<S: Read + Write> BytesCodec<S> {
117    /// construct method
118    pub fn new(stream: S) -> Self {
119        Self {
120            frame_codec: FrameCodec::new(stream),
121        }
122    }
123
124    /// construct with stream & config
125    pub fn new_with(stream: S, config: FrameConfig) -> Self {
126        Self {
127            frame_codec: FrameCodec::new_with(stream, config),
128        }
129    }
130
131    /// used for server side to construct a new server
132    pub fn factory(_req: http::Request<()>, stream: S) -> Result<Self, WsError> {
133        let config = FrameConfig {
134            mask_send_frame: false,
135            ..Default::default()
136        };
137        Ok(Self::new_with(stream, config))
138    }
139
140    /// used to client side to construct a new client
141    pub fn check_fn(key: String, resp: http::Response<()>, stream: S) -> Result<Self, WsError> {
142        standard_handshake_resp_check(key.as_bytes(), &resp)?;
143        Ok(Self::new_with(stream, FrameConfig::default()))
144    }
145
146    /// get mutable underlying stream
147    pub fn stream_mut(&mut self) -> &mut S {
148        self.frame_codec.stream_mut()
149    }
150
151    impl_recv! {}
152
153    impl_send! {}
154}
155
156impl<R, W, S> BytesCodec<S>
157where
158    R: Read,
159    W: Write,
160    S: Read + Write + Split<R = R, W = W>,
161{
162    /// split codec to recv and send parts
163    pub fn split(self) -> (BytesRecv<R>, BytesSend<W>) {
164        let FrameCodec {
165            stream,
166            read_state,
167            write_state,
168        } = self.frame_codec;
169        let (read, write) = stream.split();
170        (
171            BytesRecv::new(read, read_state),
172            BytesSend::new(write, write_state),
173        )
174    }
175}