ws_tool/codec/binary/
blocking.rs1use http;
2use crate::{
3 codec::{
4 FrameCodec, FrameConfig, FrameReadState, FrameRecv, FrameSend, FrameWriteState, Split,
5 },
6 errors::WsError,
7 frame::OpCode,
8 protocol::standard_handshake_resp_check,
9 Message,
10};
11use bytes::Buf;
12use std::borrow::Cow;
13use std::io::{Read, Write};
14
15macro_rules! impl_recv {
16 () => {
17 pub fn receive(&mut self) -> Result<Message<Cow<[u8]>>, WsError> {
19 let (header, mut data) = self.frame_codec.receive()?;
20 let close_code = if header.code == OpCode::Close {
21 let code = if data.len() >= 2 {
22 data.get_u16()
23 } else {
24 1000
25 };
26 Some(code)
27 } else {
28 None
29 };
30 Ok(Message {
31 code: header.code,
32 data: Cow::Borrowed(data),
33 close_code,
34 })
35 }
36 };
37}
38
39macro_rules! impl_send {
40 () => {
41 pub fn ping<'a>(&mut self, msg: &'a [u8]) -> Result<(), WsError> {
43 self.send((OpCode::Ping, msg))
44 }
45
46 pub fn pong<'a>(&mut self, msg: &'a [u8]) -> Result<(), WsError> {
48 self.send((OpCode::Pong, msg))
49 }
50
51 pub fn close<'a>(&mut self, code: u16, msg: &'a [u8]) -> Result<(), WsError> {
53 self.send((code, msg))
54 }
55
56 pub fn send<'a, T: Into<Message<Cow<'a, [u8]>>>>(&mut self, msg: T) -> Result<(), WsError> {
58 let msg: Message<Cow<'a, [u8]>> = msg.into();
59 if let Some(close_code) = msg.close_code {
60 if msg.code == OpCode::Close {
61 let mut data = close_code.to_be_bytes().to_vec();
62 data.extend_from_slice(msg.data.as_ref());
63 self.frame_codec.send(msg.code, &data)
64 } else {
65 self.frame_codec.send(msg.code, msg.data.as_ref())
66 }
67 } else {
68 self.frame_codec.send(msg.code, msg.data.as_ref())
69 }
70 }
71
72 pub fn flush(&mut self) -> Result<(), WsError> {
74 self.frame_codec.flush()
75 }
76 };
77}
78
79pub struct BytesRecv<S: Read> {
81 frame_codec: FrameRecv<S>,
82}
83
84impl<S: Read> BytesRecv<S> {
85 pub fn new(stream: S, state: FrameReadState) -> Self {
87 Self {
88 frame_codec: FrameRecv::new(stream, state),
89 }
90 }
91
92 impl_recv! {}
93}
94
95pub struct BytesSend<S: Write> {
97 frame_codec: FrameSend<S>,
98}
99
100impl<S: Write> BytesSend<S> {
101 pub fn new(stream: S, state: FrameWriteState) -> Self {
103 Self {
104 frame_codec: FrameSend::new(stream, state),
105 }
106 }
107
108 impl_send! {}
109}
110
111pub struct BytesCodec<S: Read + Write> {
113 frame_codec: FrameCodec<S>,
114}
115
116impl<S: Read + Write> BytesCodec<S> {
117 pub fn new(stream: S) -> Self {
119 Self {
120 frame_codec: FrameCodec::new(stream),
121 }
122 }
123
124 pub fn new_with(stream: S, config: FrameConfig) -> Self {
126 Self {
127 frame_codec: FrameCodec::new_with(stream, config),
128 }
129 }
130
131 pub fn factory(_req: http::Request<()>, stream: S) -> Result<Self, WsError> {
133 let config = FrameConfig {
134 mask_send_frame: false,
135 ..Default::default()
136 };
137 Ok(Self::new_with(stream, config))
138 }
139
140 pub fn check_fn(key: String, resp: http::Response<()>, stream: S) -> Result<Self, WsError> {
142 standard_handshake_resp_check(key.as_bytes(), &resp)?;
143 Ok(Self::new_with(stream, FrameConfig::default()))
144 }
145
146 pub fn stream_mut(&mut self) -> &mut S {
148 self.frame_codec.stream_mut()
149 }
150
151 impl_recv! {}
152
153 impl_send! {}
154}
155
156impl<R, W, S> BytesCodec<S>
157where
158 R: Read,
159 W: Write,
160 S: Read + Write + Split<R = R, W = W>,
161{
162 pub fn split(self) -> (BytesRecv<R>, BytesSend<W>) {
164 let FrameCodec {
165 stream,
166 read_state,
167 write_state,
168 } = self.frame_codec;
169 let (read, write) = stream.split();
170 (
171 BytesRecv::new(read, read_state),
172 BytesSend::new(write, write_state),
173 )
174 }
175}