use std::io::Read;
use std::io::Result as IoResult;
use hyper::buffer::BufReader;
use crate::dataframe::{DataFrame, Opcode};
use crate::message::OwnedMessage;
use crate::result::{WebSocketError, WebSocketResult};
pub use crate::stream::sync::Shutdown;
use crate::stream::sync::{AsTcpStream, Stream};
use crate::ws;
use crate::ws::receiver::Receiver as ReceiverTrait;
use crate::ws::receiver::{DataFrameIterator, MessageIterator};
/// Pairs a buffered input stream with the [`Receiver`] that parses it.
///
/// Both fields are public, so callers may drive the receiver against the
/// stream themselves instead of going through the convenience methods.
pub struct Reader<R: Read> {
    /// Buffered stream that frames are read from.
    pub stream: BufReader<R>,
    /// Parser state used to turn bytes from `stream` into data frames and
    /// messages.
    pub receiver: Receiver,
}
impl<R> Reader<R>
where
    R: Read,
{
    /// Reads one data frame by handing the buffered stream to the receiver.
    ///
    /// # Errors
    ///
    /// Propagates whatever error the receiver's `recv_dataframe` reports.
    pub fn recv_dataframe(&mut self) -> WebSocketResult<DataFrame> {
        let Self { stream, receiver } = self;
        receiver.recv_dataframe(stream)
    }

    /// Returns the receiver's data-frame iterator, bound to this reader's
    /// stream for as long as the iterator is alive.
    pub fn incoming_dataframes(&mut self) -> DataFrameIterator<Receiver, BufReader<R>> {
        let Self { stream, receiver } = self;
        receiver.incoming_dataframes(stream)
    }

    /// Reads one complete message by handing the buffered stream to the
    /// receiver.
    ///
    /// # Errors
    ///
    /// Propagates whatever error the receiver's `recv_message` reports.
    pub fn recv_message(&mut self) -> WebSocketResult<OwnedMessage> {
        let Self { stream, receiver } = self;
        receiver.recv_message(stream)
    }

    /// Returns the receiver's message iterator, bound to this reader's
    /// stream for the lifetime `'a` of the mutable borrow.
    pub fn incoming_messages<'a>(&'a mut self) -> MessageIterator<'a, Receiver, BufReader<R>> {
        let Self { stream, receiver } = self;
        receiver.incoming_messages(stream)
    }
}
impl<S> Reader<S>
where
    S: AsTcpStream + Stream + Read,
{
    /// Requests `Shutdown::Read` on the TCP stream underneath the buffered
    /// reader.
    ///
    /// NOTE(review): `Shutdown` is re-exported from `crate::stream::sync`;
    /// presumably it is `std::net::Shutdown` and `as_tcp` yields a
    /// `TcpStream` - confirm against that module.
    ///
    /// # Errors
    ///
    /// Returns the I/O error reported by the underlying `shutdown` call.
    pub fn shutdown(&self) -> IoResult<()> {
        let inner = self.stream.get_ref();
        let tcp = inner.as_tcp();
        tcp.shutdown(Shutdown::Read)
    }

    /// Requests `Shutdown::Both` on the TCP stream underneath the buffered
    /// reader, i.e. the same call as [`shutdown`](Self::shutdown) but for
    /// both directions.
    ///
    /// # Errors
    ///
    /// Returns the I/O error reported by the underlying `shutdown` call.
    pub fn shutdown_all(&self) -> IoResult<()> {
        let inner = self.stream.get_ref();
        let tcp = inner.as_tcp();
        tcp.shutdown(Shutdown::Both)
    }
}
/// Receiver state for reading data frames and assembling them into messages.
///
/// Implements `ws::Receiver` with `DataFrame` as the frame type and
/// `OwnedMessage` as the message type.
pub struct Receiver {
/// Frames of the message currently being assembled. `recv_message_dataframes`
/// pushes frames here and hands the whole vector out once a frame marked
/// `finished` has been read.
buffer: Vec<DataFrame>,
/// Flag forwarded unchanged to `DataFrame::read_dataframe` on every read.
/// NOTE(review): presumably "incoming frames are expected to be masked" -
/// confirm against `DataFrame::read_dataframe`.
mask: bool,
}
impl Receiver {
pub fn new(mask: bool) -> Receiver {
Receiver {
buffer: Vec::new(),
mask,
}
}
}
impl ws::Receiver for Receiver {
    type F = DataFrame;
    type M = OwnedMessage;

    /// Reads a single data frame from `reader`, forwarding this receiver's
    /// `mask` flag to `DataFrame::read_dataframe`.
    ///
    /// # Errors
    ///
    /// Returns whatever error `DataFrame::read_dataframe` produces.
    fn recv_dataframe<R>(&mut self, reader: &mut R) -> WebSocketResult<DataFrame>
    where
        R: Read,
    {
        let mask = self.mask;
        DataFrame::read_dataframe(reader, mask)
    }

    /// Reads frames from `reader` until the buffered frames form a complete
    /// message, then returns them and leaves the internal buffer empty.
    ///
    /// A frame whose opcode is in `8..=15` that arrives after the first frame
    /// is returned on its own immediately. The frames buffered so far stay in
    /// place, so the next call resumes the same message.
    ///
    /// # Errors
    ///
    /// * `WebSocketError::ProtocolError` if a message starts with a
    ///   continuation frame.
    /// * `WebSocketError::ProtocolError` if a frame following the first has
    ///   an opcode that is neither `0` nor in `8..=15`.
    /// * Any error returned by `recv_dataframe`.
    fn recv_message_dataframes<R>(&mut self, reader: &mut R) -> WebSocketResult<Vec<DataFrame>>
    where
        R: Read,
    {
        // A non-empty buffer means an earlier call returned before the
        // message was finished, so the message is still incomplete and no
        // new leading frame is read.
        let mut message_complete = false;

        if self.buffer.is_empty() {
            let head = self.recv_dataframe(reader)?;
            if head.opcode == Opcode::Continuation {
                return Err(WebSocketError::ProtocolError(
                    "Unexpected continuation data frame opcode",
                ));
            }
            message_complete = head.finished;
            self.buffer.push(head);
        }

        while !message_complete {
            let frame = self.recv_dataframe(reader)?;
            let code = frame.opcode as u8;
            match code {
                // Continuation: extend the pending message.
                0 => {
                    message_complete = frame.finished;
                    self.buffer.push(frame);
                }
                // Hand this frame back alone; buffered fragments are kept.
                8..=15 => return Ok(vec![frame]),
                // NOTE(review): the buffer is left untouched on this error
                // path, so previously buffered fragments remain.
                _ => {
                    return Err(WebSocketError::ProtocolError(
                        "Unexpected data frame opcode",
                    ));
                }
            }
        }

        // Swap in a fresh vector so the receiver is ready for the next message.
        let message = ::std::mem::replace(&mut self.buffer, Vec::new());
        Ok(message)
    }
}