pipebuf_websocket/
lib.rs

1//! [`PipeBuf`] wrapper for [embedded-websocket]
2//!
3//! This handles websocket protocol only, independent of the
4//! transport.  So this can be combined with `pipebuf_mio` or
5//! `pipebuf_rustls` or other crates to meet different needs.
6//!
7//! This is efficient because [embedded-websocket] exposes a
8//! slice-based interface and works between buffers provided by the
9//! caller.  So it is ideal to be wrapped by [`PipeBuf`].  Since
10//! websocket permits streaming of message data via fragments, a
11//! message is here handled as a pipe-buffer allowing the caller to
12//! also stream the data if they wish.
13//!
14//! On the sending side, a "push" is indicated after each message
15//! sent.
16//!
17//! TODO: Support client-side with a `WebsocketClient` wrapper.
18//! (Similar to existing code but would need testing.)
19//!
20//! TODO: Rewrite this as a native PipeBuf-based websocket
21//! implementation that for Ping/Pong/Close consumes only whole frames
22//! (with limits), to simplify things.  The message content can still
23//! be streamed, though.  Also see [Autobahn
24//! testsuite](https://github.com/crossbario/autobahn-testsuite).
25//!
26//! [embedded-websocket]: https://crates.io/crates/embedded-websocket
27//! [`PipeBuf`]: https://crates.io/crates/pipebuf
28
29use embedded_websocket as ws;
30use httparse::Status;
31use pipebuf::{PBufRdWr, PBufWr};
32use ws::WebSocketReceiveMessageType as RxMsgType;
33use ws::WebSocketSendMessageType as TxMsgType;
34use ws::{WebSocketSendMessageType, WebSocketServer, WebSocketSubProtocol};
35
36/// Wraps an [`embedded_websocket::WebSocketServer`]
37///
38/// [`embedded_websocket::WebSocketServer`]:
39/// https://docs.rs/embedded-websocket/0.8.0/embedded_websocket/type.WebSocketServer.html
40pub struct WebsocketServer {
41    ws: ws::WebSocketServer,
42    in_data: Vec<u8>,
43    max_msg_len: usize,
44    max_aux_len: usize,
45}
46
47impl WebsocketServer {
48    /// Attempt to interpret the initial data in the given pipe-buffer
49    /// stream as websocket HTTP headers and initialise the websocket
50    /// stream from them.
51    ///
52    /// Returns:
53    ///
54    /// - `Ok(None)` if more data is required
55    ///
56    /// - `Ok(Some(Self))` if valid HTTP websocket headers were found
57    /// and consumed and the websocket is now ready.  A protocol reply
58    /// will have been sent back on `pb.wr`.
59    ///
60    /// - `Err(_)` if the HTTP headers are invalid, or contain invalid
61    /// data for a websocket stream.  All the initial data will be
62    /// left unconsumed in the pipe buffer in case it can be
63    /// interpreted as another protocol
64    ///
65    /// `subprotocol` argument may be used to specify a subprotocol to
66    /// pass back to the client, if required.  See
67    /// `embedded_websocket` documentation.
68    ///
69    /// `max_msg_len` puts a limit on the size of data that will be
70    /// allowed in the message buffer before failing the websocket, as
71    /// a protection against denial of service attacks.  This is the
72    /// limit of how much unread data is allowed in that buffer.  If
73    /// the caller streams the data out as it is read, then an
74    /// unlimited amount of data may still be received.  In case of
75    /// exceeding this limit, `Error::WriteToBufferTooSmall` is
76    /// returned.
77    ///
78    /// `max_aux_len` puts a limit on the size of data associated with
79    /// `Ping` and `Close` messages before failing the websocket, as a
80    /// protection against denial of service attacks.  In case of
81    /// exceeding this limit, `Error::WriteToBufferTooSmall` is
82    /// returned.
83    ///
84    /// `header_cb` is called for each HTTP header line as
85    /// `header_cb(field_name, field_value)` once the websocket
86    /// connection has been verified in order to allow the caller to
87    /// extract whatever details may be required, such as `Origin`.
88    pub fn from_http_scan(
89        mut pb: PBufRdWr,
90        subprotocol: Option<&WebSocketSubProtocol>,
91        max_msg_len: usize,
92        max_aux_len: usize,
93        mut header_cb: impl FnMut(&str, &[u8]),
94    ) -> Result<Option<Self>, ws::Error> {
95        // `Header` is 2 pointers, so this is 128 bytes (on 64-bit)
96        let mut headers = [httparse::EMPTY_HEADER; 32];
97        let mut request = httparse::Request::new(&mut headers);
98        match request.parse(pb.rd.data()) {
99            Err(e) => Err(ws::Error::HttpHeader(e)),
100            Ok(Status::Partial) => Ok(None), // Wait for more data
101            Ok(Status::Complete(count)) => {
102                let headers = request.headers.iter().map(|f| (f.name, f.value));
103                match ws::read_http_header(headers)? {
104                    None => Err(ws::Error::Unknown), // Actually: not valid WS HTTP headers
105                    Some(ws_context) => {
106                        let mut ws = WebSocketServer::new_server();
107                        let blen = ws.server_accept(
108                            &ws_context.sec_websocket_key,
109                            subprotocol,
110                            pb.wr.space(1024),
111                        )?;
112                        for h in request.headers.iter() {
113                            header_cb(h.name, h.value);
114                        }
115                        pb.wr.commit(blen);
116                        pb.rd.consume(count);
117                        Ok(Some(Self::from_wss(ws, max_msg_len, max_aux_len)))
118                    }
119                }
120            }
121        }
122    }
123
124    /// Attempt to interpret the initial data in the given pipe-buffer
125    /// stream as websocket HTTP headers and initialise the websocket
126    /// stream from them.
127    ///
128    /// See [`WebsocketServer::from_http_scan`] for details of
129    /// arguments and returns.
130    pub fn from_http(
131        pb: PBufRdWr,
132        subprotocol: Option<&WebSocketSubProtocol>,
133        max_msg_len: usize,
134        max_aux_len: usize,
135    ) -> Result<Option<Self>, ws::Error> {
136        Self::from_http_scan(pb, subprotocol, max_msg_len, max_aux_len, |_, _| ())
137    }
138
139    /// Create from an already-initialised [`WebSocketServer`]
140    ///
141    /// `max_msg_len` puts a limit on the size of data that will be
142    /// allowed in the message buffer before failing the websocket, as
143    /// a protection against denial of service attacks.  This is the
144    /// limit of how much unread data is allowed in that buffer.  If
145    /// the caller streams the data out as it is read, then an
146    /// unlimited amount of data may still be received.  In case of
147    /// exceeding this limit, `Error::WriteToBufferTooSmall` is
148    /// returned.
149    ///
150    /// `max_aux_len` puts a limit on the size of data associated with
151    /// `Ping` and `Close` messages before failing the websocket, as a
152    /// protection against denial of service attacks.  In case of
153    /// exceeding this limit, `Error::WriteToBufferTooSmall` is
154    /// returned.
155    pub fn from_wss(ws: WebSocketServer, max_msg_len: usize, max_aux_len: usize) -> Self {
156        Self {
157            ws,
158            in_data: Vec::new(),
159            max_msg_len,
160            max_aux_len,
161        }
162    }
163
164    /// Send an unfragmented websocket text message
165    pub fn send_text(&mut self, pb: PBufRdWr, data: &str) -> Result<(), ws::Error> {
166        self.send(pb, WebSocketSendMessageType::Text, true, data.as_bytes())
167    }
168
169    /// Send an unfragmented websocket binary message
170    pub fn send_binary(&mut self, pb: PBufRdWr, data: &[u8]) -> Result<(), ws::Error> {
171        self.send(pb, WebSocketSendMessageType::Binary, true, data)
172    }
173
174    /// Send an arbitrary websocket message.  This is a wrapper around
175    /// [`WebSocketServer::write`].  For an unfragmented message,
176    /// `eom` should be `true`.  For a fragmented message, it should
177    /// be `true` only for the final fragment.
178    pub fn send(
179        &mut self,
180        mut pb: PBufRdWr,
181        msg: WebSocketSendMessageType,
182        eom: bool,
183        data: &[u8],
184    ) -> Result<(), ws::Error> {
185        if pb.wr.is_eof() {
186            Err(ws::Error::WebSocketNotOpen)
187        } else {
188            let reserve = 12 + data.len(); // Server frame header is max 10
189            let used = self.ws.write(msg, eom, data, pb.wr.space(reserve))?;
190            pb.wr.commit(used);
191            pb.wr.push();
192            Ok(())
193        }
194    }
195
196    /// Send a reply with the contents of `self.in_data`
197    fn send_reply(
198        &mut self,
199        mut pb: PBufRdWr,
200        msg: WebSocketSendMessageType,
201    ) -> Result<(), ws::Error> {
202        if pb.wr.is_eof() {
203            Err(ws::Error::WebSocketNotOpen)
204        } else {
205            let data = &self.in_data[..];
206            let reserve = 12 + data.len(); // Server frame header is max 10
207            let used = self.ws.write(msg, true, data, pb.wr.space(reserve))?;
208            pb.wr.commit(used);
209            Ok(())
210        }
211    }
212
213    /// Process as much data as possible from the stream.  Whilst
214    /// processing, sends back `Pong` and `CloseReply` messages as
215    /// necessary according to protocol.  If the stream is closed at a
216    /// websocket protocol level, closes the output stream `pb.wr`.
217    ///
218    /// Received message data is streamed into the `message`
219    /// pipe-buffer.  When the end of the message is reached, EOF is
220    /// indicated on the pipe-buffer (with state `Closing`).  This
221    /// takes care of websocket fragments being used to stream data.
222    /// The caller may wait for an EOF and process the entire message,
223    /// or else process the data as it comes in (streaming style).
224    /// Even partial fragments may result in data being added to the
225    /// pipe-buffer, so you can't count on seeing data with the
226    /// original fragment boundaries.  `*is_text` will be set
227    /// according to the message type: `true` for text, `false` for
228    /// binary.  When EOF is indicated on the `message` pipe-buffer,
229    /// the caller must process the contents and reset the buffer
230    /// (with `PipeBuf::reset()`) before calling this method again, so
231    /// that a new message can be read into it.
232    ///
233    /// Returns `Ok(true)` if there was activity, `Ok(false)` if it is
234    /// not possible to advance right now, or `Err(_)` in case of
235    /// protocol or limit errors.  After each call check to see
236    /// whether a partial or complete message was received.  In case
237    /// of EOF on a message, there may be more websocket frames still
238    /// to read, so call again.
239    pub fn receive(
240        &mut self,
241        mut pb: PBufRdWr,
242        mut message: PBufWr,
243        is_text: &mut bool,
244    ) -> Result<bool, ws::Error> {
245        assert!(!message.is_eof(), "Caller must .reset() buffer after EOF");
246        let mut activity = false;
247        while !pb.rd.is_empty() {
248            // Make sure there is space to read all available data
249            let space = message.space(pb.rd.len());
250            match self.ws.read(pb.rd.data(), space) {
251                Err(ws::Error::ReadFrameIncomplete) => break,
252                Err(e) => return Err(e),
253                Ok(rr) => {
254                    pb.rd.consume(rr.len_from);
255                    let to_commit = rr.len_to;
256                    activity = true;
257                    match rr.message_type {
258                        RxMsgType::Text | RxMsgType::Binary => {
259                            *is_text = rr.message_type == RxMsgType::Text;
260                            message.commit(to_commit);
261                            if message.exceeds_limit(self.max_msg_len) {
262                                return Err(ws::Error::WriteToBufferTooSmall);
263                            }
264                            if rr.end_of_message {
265                                message.close();
266                                break;
267                            }
268                        }
269                        RxMsgType::CloseCompleted => {
270                            pb.wr.close();
271                        }
272                        RxMsgType::CloseMustReply | RxMsgType::Ping | RxMsgType::Pong => {
273                            // Due to text/binary message
274                            // fragmentation and embedded_websocket's
275                            // behaviour regarding partial frames, we
276                            // have to build up close/ping/pong data
277                            // separately.
278                            self.in_data.extend_from_slice(&space[..to_commit]);
279                            if self.in_data.len() > self.max_aux_len {
280                                return Err(ws::Error::WriteToBufferTooSmall);
281                            }
282                            if rr.end_of_message {
283                                match rr.message_type {
284                                    RxMsgType::CloseMustReply => {
285                                        self.send_reply(pb.reborrow(), TxMsgType::CloseReply)?;
286                                        pb.wr.close();
287                                    }
288                                    RxMsgType::Ping => {
289                                        self.send_reply(pb.reborrow(), TxMsgType::Pong)?;
290                                    }
291                                    RxMsgType::Pong => (), // Ignore Pongs
292                                    _ => (),
293                                }
294                                self.in_data.clear();
295                            }
296                        }
297                    }
298                }
299            }
300        }
301        Ok(activity)
302    }
303}