use embedded_websocket as ws;
use httparse::Status;
use pipebuf::{PBufRdWr, PBufWr};
use ws::WebSocketReceiveMessageType as RxMsgType;
use ws::WebSocketSendMessageType as TxMsgType;
use ws::{WebSocketSendMessageType, WebSocketServer, WebSocketSubProtocol};
pub struct WebsocketServer {
ws: ws::WebSocketServer,
in_data: Vec<u8>,
max_msg_len: usize,
max_aux_len: usize,
}
impl WebsocketServer {
pub fn from_http_scan(
mut pb: PBufRdWr,
subprotocol: Option<&WebSocketSubProtocol>,
max_msg_len: usize,
max_aux_len: usize,
mut header_cb: impl FnMut(&str, &[u8]),
) -> Result<Option<Self>, ws::Error> {
let mut headers = [httparse::EMPTY_HEADER; 32];
let mut request = httparse::Request::new(&mut headers);
match request.parse(pb.rd.data()) {
Err(e) => Err(ws::Error::HttpHeader(e)),
Ok(Status::Partial) => Ok(None), Ok(Status::Complete(count)) => {
let headers = request.headers.iter().map(|f| (f.name, f.value));
match ws::read_http_header(headers)? {
None => Err(ws::Error::Unknown), Some(ws_context) => {
let mut ws = WebSocketServer::new_server();
let blen = ws.server_accept(
&ws_context.sec_websocket_key,
subprotocol,
pb.wr.space(1024),
)?;
for h in request.headers.iter() {
header_cb(h.name, h.value);
}
pb.wr.commit(blen);
pb.rd.consume(count);
Ok(Some(Self::from_wss(ws, max_msg_len, max_aux_len)))
}
}
}
}
}
pub fn from_http(
pb: PBufRdWr,
subprotocol: Option<&WebSocketSubProtocol>,
max_msg_len: usize,
max_aux_len: usize,
) -> Result<Option<Self>, ws::Error> {
Self::from_http_scan(pb, subprotocol, max_msg_len, max_aux_len, |_, _| ())
}
pub fn from_wss(ws: WebSocketServer, max_msg_len: usize, max_aux_len: usize) -> Self {
Self {
ws,
in_data: Vec::new(),
max_msg_len,
max_aux_len,
}
}
pub fn send_text(&mut self, pb: PBufRdWr, data: &str) -> Result<(), ws::Error> {
self.send(pb, WebSocketSendMessageType::Text, true, data.as_bytes())
}
pub fn send_binary(&mut self, pb: PBufRdWr, data: &[u8]) -> Result<(), ws::Error> {
self.send(pb, WebSocketSendMessageType::Binary, true, data)
}
pub fn send(
&mut self,
mut pb: PBufRdWr,
msg: WebSocketSendMessageType,
eom: bool,
data: &[u8],
) -> Result<(), ws::Error> {
if pb.wr.is_eof() {
Err(ws::Error::WebSocketNotOpen)
} else {
let reserve = 12 + data.len(); let used = self.ws.write(msg, eom, data, pb.wr.space(reserve))?;
pb.wr.commit(used);
pb.wr.push();
Ok(())
}
}
fn send_reply(
&mut self,
mut pb: PBufRdWr,
msg: WebSocketSendMessageType,
) -> Result<(), ws::Error> {
if pb.wr.is_eof() {
Err(ws::Error::WebSocketNotOpen)
} else {
let data = &self.in_data[..];
let reserve = 12 + data.len(); let used = self.ws.write(msg, true, data, pb.wr.space(reserve))?;
pb.wr.commit(used);
Ok(())
}
}
pub fn receive(
&mut self,
mut pb: PBufRdWr,
mut message: PBufWr,
is_text: &mut bool,
) -> Result<bool, ws::Error> {
assert!(!message.is_eof(), "Caller must .reset() buffer after EOF");
let mut activity = false;
while !pb.rd.is_empty() {
let space = message.space(pb.rd.len());
match self.ws.read(pb.rd.data(), space) {
Err(ws::Error::ReadFrameIncomplete) => break,
Err(e) => return Err(e),
Ok(rr) => {
pb.rd.consume(rr.len_from);
let to_commit = rr.len_to;
activity = true;
match rr.message_type {
RxMsgType::Text | RxMsgType::Binary => {
*is_text = rr.message_type == RxMsgType::Text;
message.commit(to_commit);
if message.exceeds_limit(self.max_msg_len) {
return Err(ws::Error::WriteToBufferTooSmall);
}
if rr.end_of_message {
message.close();
break;
}
}
RxMsgType::CloseCompleted => {
pb.wr.close();
}
RxMsgType::CloseMustReply | RxMsgType::Ping | RxMsgType::Pong => {
self.in_data.extend_from_slice(&space[..to_commit]);
if self.in_data.len() > self.max_aux_len {
return Err(ws::Error::WriteToBufferTooSmall);
}
if rr.end_of_message {
match rr.message_type {
RxMsgType::CloseMustReply => {
self.send_reply(pb.reborrow(), TxMsgType::CloseReply)?;
pb.wr.close();
}
RxMsgType::Ping => {
self.send_reply(pb.reborrow(), TxMsgType::Pong)?;
}
RxMsgType::Pong => (), _ => (),
}
self.in_data.clear();
}
}
}
}
}
}
Ok(activity)
}
}