use super::{validate, HyperIntoWsError, Request, WsUpgrade};
use bytes::BytesMut;
use client::async::ClientNew;
use codec::http::HttpServerCodec;
use codec::ws::{Context, MessageCodec};
use futures::sink::Send as SinkSend;
use futures::Stream as StreamTrait;
use futures::{Future, Sink};
use hyper::header::Headers;
use hyper::http::h1::Incoming;
use hyper::status::StatusCode;
use std::io::{self, ErrorKind};
use stream::async::Stream;
use tokio_codec::{Decoder, Framed, FramedParts};
use ws::util::update_framed_codec;
/// Shorthand for a `WsUpgrade` over the stream `S` whose second type
/// parameter is fixed to `BytesMut`, the buffer type used by the methods in
/// this module when rebuilding the framed transport.
pub type Upgrade<S> = WsUpgrade<S, BytesMut>;
/// Handshake responses (accept / reject) for an upgrade whose buffered bytes
/// are held in a `BytesMut`.
impl<S> WsUpgrade<S, BytesMut>
where
    S: Stream + Send + 'static,
{
    /// Accepts the upgrade without supplying any custom headers.
    ///
    /// The returned future writes the handshake response and then yields the
    /// websocket client together with the headers that were sent.
    pub fn accept(self) -> ClientNew<S> {
        Self::internal_accept(self, None)
    }

    /// Accepts the upgrade, passing `custom_headers` on to `prepare_headers`
    /// before the response is written.
    ///
    /// The returned future yields the websocket client together with the
    /// headers that were sent.
    pub fn accept_with(self, custom_headers: &Headers) -> ClientNew<S> {
        Self::internal_accept(self, Some(custom_headers))
    }

    /// Shared implementation behind `accept` and `accept_with`.
    fn internal_accept(mut self, custom_headers: Option<&Headers>) -> ClientNew<S> {
        // Must run while `self` is still whole; its result is the status
        // placed in the response below.
        let status = self.prepare_headers(custom_headers);
        let WsUpgrade {
            headers,
            stream,
            request,
            buffer,
        } = self;

        // One copy of the headers goes onto the wire; the original is handed
        // back to the caller alongside the client.
        let response = Incoming {
            version: request.version,
            subject: status,
            headers: headers.clone(),
        };

        let handshake = Self::http_transport(stream, buffer)
            .send(response)
            .map(move |transport| {
                // Once the response is sent, swap the HTTP codec for the
                // server-side websocket message codec.
                let client =
                    update_framed_codec(transport, MessageCodec::default(Context::Server));
                (client, headers)
            })
            .map_err(Into::into);
        Box::new(handshake)
    }

    /// Rejects the upgrade by sending a `BadRequest` response carrying the
    /// headers already stored on this upgrade.
    pub fn reject(self) -> SinkSend<Framed<S, HttpServerCodec>> {
        Self::internal_reject(self, None)
    }

    /// Rejects the upgrade by sending a `BadRequest` response, adding
    /// `headers` to the ones already stored on this upgrade.
    pub fn reject_with(self, headers: &Headers) -> SinkSend<Framed<S, HttpServerCodec>> {
        Self::internal_reject(self, Some(headers))
    }

    /// Shared implementation behind `reject` and `reject_with`.
    fn internal_reject(
        self,
        headers: Option<&Headers>,
    ) -> SinkSend<Framed<S, HttpServerCodec>> {
        let WsUpgrade {
            headers: mut response_headers,
            stream,
            request,
            buffer,
        } = self;

        if let Some(extra) = headers {
            response_headers.extend(extra.iter());
        }

        let refusal = Incoming {
            version: request.version,
            subject: StatusCode::BadRequest,
            headers: response_headers,
        };
        Self::http_transport(stream, buffer).send(refusal)
    }

    /// Wraps `stream` in the HTTP server codec, installing `buffer` as the
    /// transport's read buffer.
    fn http_transport(stream: S, buffer: BytesMut) -> Framed<S, HttpServerCodec> {
        let mut parts = FramedParts::new(stream, HttpServerCodec);
        parts.read_buf = buffer;
        Framed::from_parts(parts)
    }
}
/// Conversion of a value into a future that resolves to a websocket
/// `Upgrade`.
pub trait IntoWs {
/// The stream type carried by the resulting `Upgrade`.
type Stream: Stream;
/// The error produced when the conversion fails.
type Error;
/// Consumes `self` and returns a boxed, `Send` future that resolves to an
/// `Upgrade` over `Self::Stream`, or fails with `Self::Error`.
fn into_ws(self) -> Box<Future<Item = Upgrade<Self::Stream>, Error = Self::Error> + Send>;
}
/// Any sendable, `'static` async stream can be turned into a websocket
/// upgrade by reading and validating one HTTP request from it.
impl<S> IntoWs for S
where
    S: Stream + Send + 'static,
{
    type Stream = S;
    /// On failure the caller gets back the stream, the request if one was
    /// decoded, the bytes left in the read buffer, and the error itself.
    type Error = (S, Option<Request>, BytesMut, HyperIntoWsError);

    /// Reads the first message from the stream through the HTTP server codec,
    /// validates it, and resolves to a `WsUpgrade` with an empty header set.
    fn into_ws(self) -> Box<Future<Item = Upgrade<Self::Stream>, Error = Self::Error> + Send> {
        // Only the first decoded message is needed.
        let first_message = HttpServerCodec.framed(self).into_future();

        let upgrade = first_message
            .map_err(|(cause, framed)| {
                // Decoding failed: return the raw stream and buffered bytes.
                let FramedParts { io, read_buf, .. } = framed.into_parts();
                (io, None, read_buf, cause.into())
            })
            .and_then(|(message, framed)| {
                let FramedParts { io, read_buf, .. } = framed.into_parts();

                // The stream ended before any request was decoded.
                let request = match message {
                    Some(request) => request,
                    None => {
                        let dropped = io::Error::new(
                            ErrorKind::ConnectionReset,
                            "Connection dropped before handshake could be read",
                        );
                        return Err((io, None, read_buf, HyperIntoWsError::Io(dropped)));
                    }
                };

                match validate(&request.subject.0, request.version, &request.headers) {
                    Ok(()) => Ok((request, io, read_buf)),
                    // Return the offending request along with the error.
                    Err(reason) => Err((io, Some(request), read_buf, reason)),
                }
            })
            .map(|(request, stream, buffer)| WsUpgrade {
                headers: Headers::new(),
                stream,
                request,
                buffer,
            });
        Box::new(upgrade)
    }
}