use super::{validate, HyperIntoWsError, Request, WsUpgrade};
use crate::client::r#async::ClientNew;
use crate::codec::http::HttpServerCodec;
use crate::codec::ws::{Context, MessageCodec};
use crate::stream::r#async::Stream;
use crate::ws::util::update_framed_codec;
use bytes::BytesMut;
use futures::sink::Send as SinkSend;
use futures::Stream as StreamTrait;
use futures::{Future, Sink};
use hyper::header::Headers;
use hyper::http::h1::Incoming;
use hyper::status::StatusCode;
use std::io::{self, ErrorKind};
use tokio_codec::{Decoder, Framed, FramedParts};
/// Maximum dataframe size, in bytes, handed to the message codec when the
/// caller accepts an upgrade without specifying limits (100 MiB).
const DEFAULT_MAX_DATAFRAME_SIZE: usize = 100 * 1024 * 1024;
/// Maximum message size, in bytes, handed to the message codec when the
/// caller accepts an upgrade without specifying limits (200 MiB).
const DEFAULT_MAX_MESSAGE_SIZE: usize = 200 * 1024 * 1024;
/// A [`WsUpgrade`] over stream `S` whose second type parameter (the buffer
/// carried alongside the stream) is fixed to `BytesMut`. This is the type
/// produced by [`IntoWs::into_ws`] in this module.
pub type Upgrade<S> = WsUpgrade<S, BytesMut>;
impl<S> WsUpgrade<S, BytesMut>
where
    S: Stream + Send + 'static,
{
    /// Accepts the upgrade with no extra headers and the default size limits.
    ///
    /// The returned boxed future sends the HTTP response and then yields the
    /// WebSocket client together with the headers that were sent.
    pub fn accept(self) -> ClientNew<S> {
        self.internal_accept(None)
    }

    /// Like [`accept`](Self::accept), but passes `custom_headers` along to the
    /// header preparation step so they can be included in the response.
    pub fn accept_with(self, custom_headers: &Headers) -> ClientNew<S> {
        self.internal_accept(Some(custom_headers))
    }

    /// Accepts the upgrade with no extra headers, configuring the message
    /// codec with the given `max_dataframe_size` and `max_message_size`
    /// instead of the defaults.
    pub fn accept_with_limits(
        self,
        max_dataframe_size: usize,
        max_message_size: usize,
    ) -> ClientNew<S> {
        self.internal_accept_with_limits(None, max_dataframe_size, max_message_size)
    }

    /// Accepts the upgrade with both `custom_headers` and explicit codec
    /// limits (`max_dataframe_size`, `max_message_size`).
    pub fn accept_with_headers_and_limits(
        self,
        custom_headers: &Headers,
        max_dataframe_size: usize,
        max_message_size: usize,
    ) -> ClientNew<S> {
        self.internal_accept_with_limits(
            Some(custom_headers),
            max_dataframe_size,
            max_message_size,
        )
    }

    /// Shared entry point for the `accept*` variants that use the default
    /// dataframe and message size limits.
    fn internal_accept(self, custom_headers: Option<&Headers>) -> ClientNew<S> {
        self.internal_accept_with_limits(
            custom_headers,
            DEFAULT_MAX_DATAFRAME_SIZE,
            DEFAULT_MAX_MESSAGE_SIZE,
        )
    }

    /// Core of every `accept*` method.
    ///
    /// Prepares the response headers, writes the HTTP response over the
    /// stream using the HTTP server codec, and on success swaps that codec
    /// for a server-side `MessageCodec` built with the supplied limits.
    fn internal_accept_with_limits(
        mut self,
        custom_headers: Option<&Headers>,
        max_dataframe_size: usize,
        max_message_size: usize,
    ) -> ClientNew<S> {
        // Must run before `self` is taken apart: it needs `&mut self` and
        // produces the status used as the response subject.
        let status = self.prepare_headers(custom_headers);
        let WsUpgrade {
            headers,
            stream,
            request,
            buffer,
        } = self;

        // One copy of the headers goes out on the wire; the original is
        // handed back to the caller once the response has been sent.
        let response = Incoming {
            version: request.version,
            subject: status,
            headers: headers.clone(),
        };

        // Seed the framed transport's read buffer with the buffer carried
        // by this upgrade so bytes already held there are not dropped.
        let mut parts = FramedParts::new(stream, HttpServerCodec);
        parts.read_buf = buffer;

        let handshake = Framed::from_parts(parts)
            .send(response)
            .map(move |framed| {
                let ws_codec = MessageCodec::new_with_limits(
                    Context::Server,
                    max_dataframe_size,
                    max_message_size,
                );
                (update_framed_codec(framed, ws_codec), headers)
            })
            .map_err(Into::into);
        Box::new(handshake)
    }

    /// Rejects the upgrade by sending a `BadRequest` response carrying the
    /// headers currently stored on this upgrade.
    pub fn reject(self) -> SinkSend<Framed<S, HttpServerCodec>> {
        self.internal_reject(None)
    }

    /// Like [`reject`](Self::reject), but first extends the response headers
    /// with the entries of `headers`.
    pub fn reject_with(self, headers: &Headers) -> SinkSend<Framed<S, HttpServerCodec>> {
        self.internal_reject(Some(headers))
    }

    /// Core of `reject` / `reject_with`: optionally extends the stored
    /// headers, then sends a `BadRequest` response through a framed HTTP
    /// transport built from the stream and buffer of this upgrade.
    fn internal_reject(
        self,
        headers: Option<&Headers>,
    ) -> SinkSend<Framed<S, HttpServerCodec>> {
        let WsUpgrade {
            headers: mut response_headers,
            stream,
            request,
            buffer,
        } = self;

        if let Some(extra) = headers {
            response_headers.extend(extra.iter());
        }

        let mut parts = FramedParts::new(stream, HttpServerCodec);
        parts.read_buf = buffer;

        let rejection = Incoming {
            version: request.version,
            subject: StatusCode::BadRequest,
            headers: response_headers,
        };
        Framed::from_parts(parts).send(rejection)
    }
}
/// Conversion of a value into a future that resolves to an [`Upgrade`],
/// i.e. a pending WebSocket upgrade that can then be accepted or rejected.
pub trait IntoWs {
/// The stream type carried by the resulting [`Upgrade`].
type Stream: Stream;
/// The error produced by the returned future when the conversion fails.
type Error;
/// Consumes `self` and returns a boxed, `Send` future yielding the
/// [`Upgrade`] on success or `Self::Error` on failure.
fn into_ws(self) -> Box<dyn Future<Item = Upgrade<Self::Stream>, Error = Self::Error> + Send>;
}
impl<S> IntoWs for S
where
S: Stream + Send + 'static,
{
type Stream = S;
type Error = (S, Option<Request>, BytesMut, HyperIntoWsError);
fn into_ws(self) -> Box<dyn Future<Item = Upgrade<Self::Stream>, Error = Self::Error> + Send> {
let future = HttpServerCodec
.framed(self)
.into_future()
.map_err(|(e, s)| {
let FramedParts { io, read_buf, .. } = s.into_parts();
(io, None, read_buf, e.into())
})
.and_then(|(m, s)| {
let FramedParts { io, read_buf, .. } = s.into_parts();
if let Some(msg) = m {
match validate(&msg.subject.0, msg.version, &msg.headers) {
Ok(()) => Ok((msg, io, read_buf)),
Err(e) => Err((io, Some(msg), read_buf, e)),
}
} else {
let err = HyperIntoWsError::Io(io::Error::new(
ErrorKind::ConnectionReset,
"Connection dropped before handshake could be read",
));
Err((io, None, read_buf, err))
}
})
.map(|(m, stream, buffer)| WsUpgrade {
headers: Headers::new(),
stream,
request: m,
buffer,
});
Box::new(future)
}
}