pipebuf_websocket/lib.rs
1//! [`PipeBuf`] wrapper for [embedded-websocket]
2//!
3//! This handles websocket protocol only, independent of the
4//! transport. So this can be combined with `pipebuf_mio` or
5//! `pipebuf_rustls` or other crates to meet different needs.
6//!
7//! This is efficient because [embedded-websocket] exposes a
8//! slice-based interface and works between buffers provided by the
9//! caller. So it is ideal to be wrapped by [`PipeBuf`]. Since
10//! websocket permits streaming of message data via fragments, a
11//! message is here handled as a pipe-buffer allowing the caller to
12//! also stream the data if they wish.
13//!
14//! On the sending side, a "push" is indicated after each message
15//! sent.
16//!
17//! TODO: Support client-side with a `WebsocketClient` wrapper.
18//! (Similar to existing code but would need testing.)
19//!
20//! TODO: Rewrite this as a native PipeBuf-based websocket
21//! implementation that for Ping/Pong/Close consumes only whole frames
22//! (with limits), to simplify things. The message content can still
23//! be streamed, though. Also see [Autobahn
24//! testsuite](https://github.com/crossbario/autobahn-testsuite).
25//!
26//! [embedded-websocket]: https://crates.io/crates/embedded-websocket
27//! [`PipeBuf`]: https://crates.io/crates/pipebuf
28
29use embedded_websocket as ws;
30use httparse::Status;
31use pipebuf::{PBufRdWr, PBufWr};
32use ws::WebSocketReceiveMessageType as RxMsgType;
33use ws::WebSocketSendMessageType as TxMsgType;
34use ws::{WebSocketSendMessageType, WebSocketServer, WebSocketSubProtocol};
35
36/// Wraps an [`embedded_websocket::WebSocketServer`]
37///
38/// [`embedded_websocket::WebSocketServer`]:
39/// https://docs.rs/embedded-websocket/0.8.0/embedded_websocket/type.WebSocketServer.html
40pub struct WebsocketServer {
41 ws: ws::WebSocketServer,
42 in_data: Vec<u8>,
43 max_msg_len: usize,
44 max_aux_len: usize,
45}
46
47impl WebsocketServer {
48 /// Attempt to interpret the initial data in the given pipe-buffer
49 /// stream as websocket HTTP headers and initialise the websocket
50 /// stream from them.
51 ///
52 /// Returns:
53 ///
54 /// - `Ok(None)` if more data is required
55 ///
56 /// - `Ok(Some(Self))` if valid HTTP websocket headers were found
57 /// and consumed and the websocket is now ready. A protocol reply
58 /// will have been sent back on `pb.wr`.
59 ///
60 /// - `Err(_)` if the HTTP headers are invalid, or contain invalid
61 /// data for a websocket stream. All the initial data will be
62 /// left unconsumed in the pipe buffer in case it can be
63 /// interpreted as another protocol
64 ///
65 /// `subprotocol` argument may be used to specify a subprotocol to
66 /// pass back to the client, if required. See
67 /// `embedded_websocket` documentation.
68 ///
69 /// `max_msg_len` puts a limit on the size of data that will be
70 /// allowed in the message buffer before failing the websocket, as
71 /// a protection against denial of service attacks. This is the
72 /// limit of how much unread data is allowed in that buffer. If
73 /// the caller streams the data out as it is read, then an
74 /// unlimited amount of data may still be received. In case of
75 /// exceeding this limit, `Error::WriteToBufferTooSmall` is
76 /// returned.
77 ///
78 /// `max_aux_len` puts a limit on the size of data associated with
79 /// `Ping` and `Close` messages before failing the websocket, as a
80 /// protection against denial of service attacks. In case of
81 /// exceeding this limit, `Error::WriteToBufferTooSmall` is
82 /// returned.
83 ///
84 /// `header_cb` is called for each HTTP header line as
85 /// `header_cb(field_name, field_value)` once the websocket
86 /// connection has been verified in order to allow the caller to
87 /// extract whatever details may be required, such as `Origin`.
88 pub fn from_http_scan(
89 mut pb: PBufRdWr,
90 subprotocol: Option<&WebSocketSubProtocol>,
91 max_msg_len: usize,
92 max_aux_len: usize,
93 mut header_cb: impl FnMut(&str, &[u8]),
94 ) -> Result<Option<Self>, ws::Error> {
95 // `Header` is 2 pointers, so this is 128 bytes (on 64-bit)
96 let mut headers = [httparse::EMPTY_HEADER; 32];
97 let mut request = httparse::Request::new(&mut headers);
98 match request.parse(pb.rd.data()) {
99 Err(e) => Err(ws::Error::HttpHeader(e)),
100 Ok(Status::Partial) => Ok(None), // Wait for more data
101 Ok(Status::Complete(count)) => {
102 let headers = request.headers.iter().map(|f| (f.name, f.value));
103 match ws::read_http_header(headers)? {
104 None => Err(ws::Error::Unknown), // Actually: not valid WS HTTP headers
105 Some(ws_context) => {
106 let mut ws = WebSocketServer::new_server();
107 let blen = ws.server_accept(
108 &ws_context.sec_websocket_key,
109 subprotocol,
110 pb.wr.space(1024),
111 )?;
112 for h in request.headers.iter() {
113 header_cb(h.name, h.value);
114 }
115 pb.wr.commit(blen);
116 pb.rd.consume(count);
117 Ok(Some(Self::from_wss(ws, max_msg_len, max_aux_len)))
118 }
119 }
120 }
121 }
122 }
123
124 /// Attempt to interpret the initial data in the given pipe-buffer
125 /// stream as websocket HTTP headers and initialise the websocket
126 /// stream from them.
127 ///
128 /// See [`WebsocketServer::from_http_scan`] for details of
129 /// arguments and returns.
130 pub fn from_http(
131 pb: PBufRdWr,
132 subprotocol: Option<&WebSocketSubProtocol>,
133 max_msg_len: usize,
134 max_aux_len: usize,
135 ) -> Result<Option<Self>, ws::Error> {
136 Self::from_http_scan(pb, subprotocol, max_msg_len, max_aux_len, |_, _| ())
137 }
138
139 /// Create from an already-initialised [`WebSocketServer`]
140 ///
141 /// `max_msg_len` puts a limit on the size of data that will be
142 /// allowed in the message buffer before failing the websocket, as
143 /// a protection against denial of service attacks. This is the
144 /// limit of how much unread data is allowed in that buffer. If
145 /// the caller streams the data out as it is read, then an
146 /// unlimited amount of data may still be received. In case of
147 /// exceeding this limit, `Error::WriteToBufferTooSmall` is
148 /// returned.
149 ///
150 /// `max_aux_len` puts a limit on the size of data associated with
151 /// `Ping` and `Close` messages before failing the websocket, as a
152 /// protection against denial of service attacks. In case of
153 /// exceeding this limit, `Error::WriteToBufferTooSmall` is
154 /// returned.
155 pub fn from_wss(ws: WebSocketServer, max_msg_len: usize, max_aux_len: usize) -> Self {
156 Self {
157 ws,
158 in_data: Vec::new(),
159 max_msg_len,
160 max_aux_len,
161 }
162 }
163
164 /// Send an unfragmented websocket text message
165 pub fn send_text(&mut self, pb: PBufRdWr, data: &str) -> Result<(), ws::Error> {
166 self.send(pb, WebSocketSendMessageType::Text, true, data.as_bytes())
167 }
168
169 /// Send an unfragmented websocket binary message
170 pub fn send_binary(&mut self, pb: PBufRdWr, data: &[u8]) -> Result<(), ws::Error> {
171 self.send(pb, WebSocketSendMessageType::Binary, true, data)
172 }
173
174 /// Send an arbitrary websocket message. This is a wrapper around
175 /// [`WebSocketServer::write`]. For an unfragmented message,
176 /// `eom` should be `true`. For a fragmented message, it should
177 /// be `true` only for the final fragment.
178 pub fn send(
179 &mut self,
180 mut pb: PBufRdWr,
181 msg: WebSocketSendMessageType,
182 eom: bool,
183 data: &[u8],
184 ) -> Result<(), ws::Error> {
185 if pb.wr.is_eof() {
186 Err(ws::Error::WebSocketNotOpen)
187 } else {
188 let reserve = 12 + data.len(); // Server frame header is max 10
189 let used = self.ws.write(msg, eom, data, pb.wr.space(reserve))?;
190 pb.wr.commit(used);
191 pb.wr.push();
192 Ok(())
193 }
194 }
195
196 /// Send a reply with the contents of `self.in_data`
197 fn send_reply(
198 &mut self,
199 mut pb: PBufRdWr,
200 msg: WebSocketSendMessageType,
201 ) -> Result<(), ws::Error> {
202 if pb.wr.is_eof() {
203 Err(ws::Error::WebSocketNotOpen)
204 } else {
205 let data = &self.in_data[..];
206 let reserve = 12 + data.len(); // Server frame header is max 10
207 let used = self.ws.write(msg, true, data, pb.wr.space(reserve))?;
208 pb.wr.commit(used);
209 Ok(())
210 }
211 }
212
213 /// Process as much data as possible from the stream. Whilst
214 /// processing, sends back `Pong` and `CloseReply` messages as
215 /// necessary according to protocol. If the stream is closed at a
216 /// websocket protocol level, closes the output stream `pb.wr`.
217 ///
218 /// Received message data is streamed into the `message`
219 /// pipe-buffer. When the end of the message is reached, EOF is
220 /// indicated on the pipe-buffer (with state `Closing`). This
221 /// takes care of websocket fragments being used to stream data.
222 /// The caller may wait for an EOF and process the entire message,
223 /// or else process the data as it comes in (streaming style).
224 /// Even partial fragments may result in data being added to the
225 /// pipe-buffer, so you can't count on seeing data with the
226 /// original fragment boundaries. `*is_text` will be set
227 /// according to the message type: `true` for text, `false` for
228 /// binary. When EOF is indicated on the `message` pipe-buffer,
229 /// the caller must process the contents and reset the buffer
230 /// (with `PipeBuf::reset()`) before calling this method again, so
231 /// that a new message can be read into it.
232 ///
233 /// Returns `Ok(true)` if there was activity, `Ok(false)` if it is
234 /// not possible to advance right now, or `Err(_)` in case of
235 /// protocol or limit errors. After each call check to see
236 /// whether a partial or complete message was received. In case
237 /// of EOF on a message, there may be more websocket frames still
238 /// to read, so call again.
239 pub fn receive(
240 &mut self,
241 mut pb: PBufRdWr,
242 mut message: PBufWr,
243 is_text: &mut bool,
244 ) -> Result<bool, ws::Error> {
245 assert!(!message.is_eof(), "Caller must .reset() buffer after EOF");
246 let mut activity = false;
247 while !pb.rd.is_empty() {
248 // Make sure there is space to read all available data
249 let space = message.space(pb.rd.len());
250 match self.ws.read(pb.rd.data(), space) {
251 Err(ws::Error::ReadFrameIncomplete) => break,
252 Err(e) => return Err(e),
253 Ok(rr) => {
254 pb.rd.consume(rr.len_from);
255 let to_commit = rr.len_to;
256 activity = true;
257 match rr.message_type {
258 RxMsgType::Text | RxMsgType::Binary => {
259 *is_text = rr.message_type == RxMsgType::Text;
260 message.commit(to_commit);
261 if message.exceeds_limit(self.max_msg_len) {
262 return Err(ws::Error::WriteToBufferTooSmall);
263 }
264 if rr.end_of_message {
265 message.close();
266 break;
267 }
268 }
269 RxMsgType::CloseCompleted => {
270 pb.wr.close();
271 }
272 RxMsgType::CloseMustReply | RxMsgType::Ping | RxMsgType::Pong => {
273 // Due to text/binary message
274 // fragmentation and embedded_websocket's
275 // behaviour regarding partial frames, we
276 // have to build up close/ping/pong data
277 // separately.
278 self.in_data.extend_from_slice(&space[..to_commit]);
279 if self.in_data.len() > self.max_aux_len {
280 return Err(ws::Error::WriteToBufferTooSmall);
281 }
282 if rr.end_of_message {
283 match rr.message_type {
284 RxMsgType::CloseMustReply => {
285 self.send_reply(pb.reborrow(), TxMsgType::CloseReply)?;
286 pb.wr.close();
287 }
288 RxMsgType::Ping => {
289 self.send_reply(pb.reborrow(), TxMsgType::Pong)?;
290 }
291 RxMsgType::Pong => (), // Ignore Pongs
292 _ => (),
293 }
294 self.in_data.clear();
295 }
296 }
297 }
298 }
299 }
300 }
301 Ok(activity)
302 }
303}