use hyper::buffer::BufReader;
use hyper::header::Headers;
use std::io::Result as IoResult;
use std::io::{Read, Write};
use std::net::SocketAddr;
use std::net::TcpStream;
use crate::dataframe::DataFrame;
use crate::header::extensions::Extension;
use crate::header::{WebSocketExtensions, WebSocketProtocol};
use crate::message::OwnedMessage;
use crate::result::WebSocketResult;
use crate::stream::sync::{AsTcpStream, Shutdown, Splittable, Stream};
use crate::ws;
use crate::ws::receiver::Receiver as ReceiverTrait;
use crate::ws::receiver::{DataFrameIterator, MessageIterator};
use crate::ws::sender::Sender as SenderTrait;
pub use crate::receiver::Reader;
use crate::receiver::Receiver;
use crate::sender::Sender;
pub use crate::sender::Writer;
use crate::ws::dataframe::DataFrame as DataFrameable;
pub struct Client<S>
where
S: Stream,
{
stream: BufReader<S>,
headers: Headers,
sender: Sender,
receiver: Receiver,
}
impl Client<TcpStream> {
pub fn shutdown_sender(&self) -> IoResult<()> {
self.stream.get_ref().as_tcp().shutdown(Shutdown::Write)
}
pub fn shutdown_receiver(&self) -> IoResult<()> {
self.stream.get_ref().as_tcp().shutdown(Shutdown::Read)
}
}
impl<S> Client<S>
where
S: AsTcpStream + Stream,
{
pub fn shutdown(&self) -> IoResult<()> {
self.stream.get_ref().as_tcp().shutdown(Shutdown::Both)
}
pub fn peer_addr(&self) -> IoResult<SocketAddr> {
self.stream.get_ref().as_tcp().peer_addr()
}
pub fn local_addr(&self) -> IoResult<SocketAddr> {
self.stream.get_ref().as_tcp().local_addr()
}
pub fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> {
self.stream.get_ref().as_tcp().set_nodelay(nodelay)
}
pub fn set_nonblocking(&self, nonblocking: bool) -> IoResult<()> {
self.stream.get_ref().as_tcp().set_nonblocking(nonblocking)
}
}
impl<S> Client<S>
where
S: Stream,
{
#[doc(hidden)]
pub fn unchecked(
stream: BufReader<S>,
headers: Headers,
out_mask: bool,
in_mask: bool,
) -> Self {
Client {
headers,
stream,
sender: Sender::new(out_mask), receiver: Receiver::new(in_mask), }
}
#[doc(hidden)]
pub fn unchecked_with_limits(
stream: BufReader<S>,
headers: Headers,
out_mask: bool,
in_mask: bool,
max_dataframe_size: usize,
max_message_size: usize,
) -> Self {
Client {
headers,
stream,
sender: Sender::new(out_mask), receiver: Receiver::new_with_limits(in_mask, max_dataframe_size, max_message_size), }
}
pub fn send_dataframe<D>(&mut self, dataframe: &D) -> WebSocketResult<()>
where
D: DataFrameable,
{
self.sender.send_dataframe(self.stream.get_mut(), dataframe)
}
pub fn send_message<M>(&mut self, message: &M) -> WebSocketResult<()>
where
M: ws::Message,
{
self.sender.send_message(self.stream.get_mut(), message)
}
pub fn recv_dataframe(&mut self) -> WebSocketResult<DataFrame> {
self.receiver.recv_dataframe(&mut self.stream)
}
pub fn incoming_dataframes(&mut self) -> DataFrameIterator<Receiver, BufReader<S>> {
self.receiver.incoming_dataframes(&mut self.stream)
}
pub fn recv_message(&mut self) -> WebSocketResult<OwnedMessage> {
self.receiver.recv_message(&mut self.stream)
}
pub fn headers(&self) -> &Headers {
&self.headers
}
pub fn protocols(&self) -> &[String] {
self.headers
.get::<WebSocketProtocol>()
.map(|p| p.0.as_slice())
.unwrap_or(&[])
}
pub fn extensions(&self) -> &[Extension] {
self.headers
.get::<WebSocketExtensions>()
.map(|e| e.0.as_slice())
.unwrap_or(&[])
}
pub fn stream_ref(&self) -> &S {
self.stream.get_ref()
}
pub fn writer_mut(&mut self) -> &mut dyn Write {
self.stream.get_mut()
}
pub fn reader_mut(&mut self) -> &mut dyn Read {
&mut self.stream
}
pub fn into_stream(self) -> (S, Option<(Vec<u8>, usize, usize)>) {
let (stream, buf, pos, cap) = self.stream.into_parts();
(stream, Some((buf, pos, cap)))
}
pub fn incoming_messages<'a>(&'a mut self) -> MessageIterator<'a, Receiver, BufReader<S>> {
self.receiver.incoming_messages(&mut self.stream)
}
}
impl<S> Client<S>
where
S: Splittable + Stream,
{
pub fn split(
self,
) -> IoResult<(
Reader<<S as Splittable>::Reader>,
Writer<<S as Splittable>::Writer>,
)> {
let (stream, buf, pos, cap) = self.stream.into_parts();
let (read, write) = stream.split()?;
Ok((
Reader {
stream: BufReader::from_parts(read, buf, pos, cap),
receiver: self.receiver,
},
Writer {
stream: write,
sender: self.sender,
},
))
}
}