use std::any::TypeId;
use std::error::Error as StdError;
use std::fmt;
use std::io::{self, Read, Write};
use bytes::{Buf, BufMut, Bytes};
use futures::{Async, Future, Poll};
use futures::sync::oneshot;
use tokio_io::{AsyncRead, AsyncWrite};
use common::io::Rewind;
/// An upgraded connection.
///
/// Wraps a boxed, type-erased IO object and implements `Read`, `Write`,
/// `AsyncRead` and `AsyncWrite` by delegating to it. The concrete IO type
/// can be recovered with `Upgraded::downcast`.
pub struct Upgraded {
// The type-erased IO object together with the bytes passed to
// `Upgraded::new` as `read_buf`.
// NOTE(review): presumably `Rewind` replays those bytes before reading
// from the underlying IO -- confirm against `common::io::Rewind`.
io: Rewind<Box<Io + Send>>,
}
/// A future that resolves to an `Upgraded` connection.
///
/// Polling yields an error if no upgrade was ever expected (`rx` is `None`),
/// if the sending side reported an error, or if the sending side was
/// dropped without sending anything.
pub struct OnUpgrade {
// Receiving half of the channel created by `pending()`; `None` when the
// value was built with `OnUpgrade::none()`.
rx: Option<oneshot::Receiver<::Result<Upgraded>>>,
}
/// The deconstructed parts of an `Upgraded` value, as returned by
/// `Upgraded::downcast`.
#[derive(Debug)]
pub struct Parts<T> {
/// The original IO object, recovered as its concrete type.
pub io: T,
/// The buffer that was stored alongside the IO object inside `Upgraded`.
pub read_buf: Bytes,
// Private zero-sized field: code outside this module cannot build a
// `Parts` with a struct literal.
// NOTE(review): presumably here so fields can be added later without
// breaking callers -- confirm intent.
_inner: (),
}
/// The sending side of a not-yet-completed upgrade.
///
/// Consumed by `fulfill` (sends the upgraded connection) or `manual`
/// (sends an error saying the upgrade is handled manually).
pub(crate) struct Pending {
// Sending half of the channel created by `pending()`.
tx: oneshot::Sender<::Result<Upgraded>>
}
/// Error value attached as the cause of the "canceled" error that
/// `OnUpgrade::poll` produces when the channel is closed without a value.
///
/// The private unit field keeps the type from being constructed as a bare
/// unit struct.
#[derive(Debug)]
struct UpgradeExpected(());
/// Creates the two linked halves of an upgrade that has not happened yet.
///
/// The returned `Pending` owns the sending end of a fresh oneshot channel
/// and the returned `OnUpgrade` owns the receiving end, so whatever is
/// sent through the `Pending` is what the `OnUpgrade` future resolves to.
pub(crate) fn pending() -> (Pending, OnUpgrade) {
    let (tx, rx) = oneshot::channel();

    let sender_half = Pending { tx };
    let receiver_half = OnUpgrade { rx: Some(rx) };

    (sender_half, receiver_half)
}
/// Object-safe marker trait for IO types that can be stored, type-erased,
/// inside an `Upgraded`.
///
/// Requires `AsyncRead + AsyncWrite + 'static`; the `'static` bound is what
/// allows `TypeId::of::<Self>()` to be used below.
pub(crate) trait Io: AsyncRead + AsyncWrite + 'static {
/// Returns the `TypeId` of the concrete implementing type.
///
/// Used by the downcasting helpers on `Io + Send` to check what type is
/// behind a trait object.
fn __hyper_type_id(&self) -> TypeId {
TypeId::of::<Self>()
}
}
/// Downcasting helpers for the type-erased `Io + Send` trait object.
impl Io + Send {
    /// Returns `true` when the concrete type behind this trait object is
    /// exactly `T`.
    fn __hyper_is<T: Io>(&self) -> bool {
        TypeId::of::<T>() == self.__hyper_type_id()
    }

    /// Attempts to turn the boxed trait object back into a `Box<T>`.
    ///
    /// Returns `Ok` with the concrete box when the stored type is `T`;
    /// otherwise hands the untouched trait object back in `Err`.
    fn __hyper_downcast<T: Io>(self: Box<Self>) -> Result<Box<T>, Box<Self>> {
        // Wrong type requested: give the caller their box back unchanged.
        if !self.__hyper_is::<T>() {
            return Err(self);
        }

        let raw: *mut Io = Box::into_raw(self);
        // SAFETY: `raw` was just produced by `Box::into_raw`, so it owns a
        // live heap allocation, and the `TypeId` check above established
        // that the value stored there has concrete type `T`. Casting the
        // fat pointer to `*mut T` only discards the vtable, so rebuilding
        // the box as `Box<T>` refers to the same allocation with the
        // correct type.
        Ok(unsafe { Box::from_raw(raw as *mut T) })
    }
}
// Blanket implementation: every `'static` type that is both `AsyncRead` and
// `AsyncWrite` is an `Io`. The body is empty, so every implementor uses the
// default `__hyper_type_id`.
impl<T: AsyncRead + AsyncWrite + 'static> Io for T {}
impl Upgraded {
    /// Builds an `Upgraded` from a type-erased IO object and a buffer.
    ///
    /// Both are handed to `Rewind::new_buffered`, and the resulting wrapper
    /// is what all reads and writes on the `Upgraded` are delegated to.
    pub(crate) fn new(io: Box<Io + Send>, read_buf: Bytes) -> Self {
        let io = Rewind::new_buffered(io, read_buf);
        Upgraded { io }
    }

    /// Tries to recover the original IO object as the concrete type `T`.
    ///
    /// On success returns `Parts` holding the unboxed IO object and the
    /// buffer taken out of the internal wrapper. If the stored object is
    /// not a `T`, the IO object and buffer are re-wrapped and the
    /// `Upgraded` is returned in `Err` so the caller can keep using it.
    pub fn downcast<T: AsyncRead + AsyncWrite + 'static>(self) -> Result<Parts<T>, Self> {
        let (erased, read_buf) = self.io.into_inner();

        let concrete = match erased.__hyper_downcast::<T>() {
            Ok(boxed) => boxed,
            Err(erased) => {
                // Type mismatch: rebuild the wrapper from the same pieces.
                return Err(Upgraded {
                    io: Rewind::new_buffered(erased, read_buf),
                });
            }
        };

        Ok(Parts {
            io: *concrete,
            read_buf,
            _inner: (),
        })
    }
}
// `std::io::Read` for `Upgraded`: a straight pass-through to the wrapped
// IO object.
impl Read for Upgraded {
/// Reads into `buf` by calling `read` on the inner `Rewind` wrapper and
/// returning its result unchanged.
#[inline]
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.io.read(buf)
}
}
// `std::io::Write` for `Upgraded`: a straight pass-through to the wrapped
// IO object.
impl Write for Upgraded {
/// Writes `buf` by calling `write` on the inner `Rewind` wrapper and
/// returning its result unchanged.
#[inline]
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.io.write(buf)
}
/// Flushes by calling `flush` on the inner `Rewind` wrapper.
#[inline]
fn flush(&mut self) -> io::Result<()> {
self.io.flush()
}
}
// `tokio_io::AsyncRead` for `Upgraded`: every method forwards to the
// wrapped IO object.
impl AsyncRead for Upgraded {
/// Forwards to the inner wrapper's `prepare_uninitialized_buffer` and
/// returns its answer unchanged.
///
/// # Safety
///
/// This adds no requirements of its own; the caller must uphold whatever
/// contract `AsyncRead::prepare_uninitialized_buffer` specifies.
#[inline]
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
self.io.prepare_uninitialized_buffer(buf)
}
/// Forwards to the inner wrapper's `read_buf`.
#[inline]
fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
self.io.read_buf(buf)
}
}
// `tokio_io::AsyncWrite` for `Upgraded`: every method forwards to the
// wrapped IO object.
impl AsyncWrite for Upgraded {
/// Forwards to the inner wrapper's `AsyncWrite::shutdown`.
///
/// The call is written in fully-qualified form so it names the
/// `AsyncWrite` trait method explicitly.
#[inline]
fn shutdown(&mut self) -> Poll<(), io::Error> {
AsyncWrite::shutdown(&mut self.io)
}
/// Forwards to the inner wrapper's `write_buf`.
#[inline]
fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
self.io.write_buf(buf)
}
}
impl fmt::Debug for Upgraded {
    /// Formats as the bare struct name `Upgraded`; no fields are printed
    /// because the inner IO object is type-erased.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut builder = f.debug_struct("Upgraded");
        builder.finish()
    }
}
impl OnUpgrade {
    /// Creates an `OnUpgrade` that has no channel attached.
    ///
    /// Polling such a value always fails with the "no upgrade" user error.
    pub(crate) fn none() -> Self {
        OnUpgrade { rx: None }
    }

    /// Returns `true` when this value has no channel attached, i.e. it was
    /// created by `none()`.
    pub(crate) fn is_none(&self) -> bool {
        match self.rx {
            None => true,
            Some(_) => false,
        }
    }
}
impl Future for OnUpgrade {
type Item = Upgraded;
type Error = ::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.rx {
Some(ref mut rx) => match rx.poll() {
Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(Ok(upgraded))) => Ok(Async::Ready(upgraded)),
Ok(Async::Ready(Err(err))) => Err(err),
Err(_oneshot_canceled) => Err(
::Error::new_canceled(Some(UpgradeExpected(())))
),
},
None => Err(::Error::new_user_no_upgrade()),
}
}
}
impl fmt::Debug for OnUpgrade {
    /// Formats as the bare struct name `OnUpgrade`; the channel state is
    /// not printed.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut builder = f.debug_struct("OnUpgrade");
        builder.finish()
    }
}
impl Pending {
pub(crate) fn fulfill(self, upgraded: Upgraded) {
trace!("pending upgrade fulfill");
let _ = self.tx.send(Ok(upgraded));
}
pub(crate) fn manual(self) {
trace!("pending upgrade handled manually");
let _ = self.tx.send(Err(::Error::new_user_manual_upgrade()));
}
}
impl fmt::Display for UpgradeExpected {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.write_str(self.description())
}
}
impl StdError for UpgradeExpected {
    /// Returns the fixed human-readable message for this error.
    fn description(&self) -> &str {
        const MESSAGE: &str = "upgrade expected but not completed";
        MESSAGE
    }
}