use std::{
error::Error as StdError,
fmt,
future::Future,
io,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use bytes::Bytes;
use tokio::{
io::{AsyncRead, AsyncWrite, ReadBuf},
sync::oneshot,
};
use super::{Error, Result, common::rewind::Rewind};
use crate::sync::Mutex;
pub struct Upgraded {
io: Rewind<Box<dyn Io + Send>>,
}
#[derive(Clone)]
pub struct OnUpgrade {
rx: Option<Arc<Mutex<oneshot::Receiver<Result<Upgraded>>>>>,
}
#[inline]
pub fn on<T: sealed::CanUpgrade>(msg: T) -> OnUpgrade {
msg.on_upgrade()
}
pub(super) struct Pending {
tx: oneshot::Sender<Result<Upgraded>>,
}
pub(super) fn pending() -> (Pending, OnUpgrade) {
let (tx, rx) = oneshot::channel();
(
Pending { tx },
OnUpgrade {
rx: Some(Arc::new(Mutex::new(rx))),
},
)
}
impl Upgraded {
#[inline]
pub(super) fn new<T>(io: T, read_buf: Bytes) -> Self
where
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
Upgraded {
io: Rewind::new_buffered(Box::new(io), read_buf),
}
}
}
impl AsyncRead for Upgraded {
#[inline]
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
Pin::new(&mut self.io).poll_read(cx, buf)
}
}
impl AsyncWrite for Upgraded {
#[inline]
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.io).poll_write(cx, buf)
}
#[inline]
fn poll_write_vectored(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[io::IoSlice<'_>],
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.io).poll_write_vectored(cx, bufs)
}
#[inline]
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.io).poll_flush(cx)
}
#[inline]
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.io).poll_shutdown(cx)
}
#[inline]
fn is_write_vectored(&self) -> bool {
self.io.is_write_vectored()
}
}
impl fmt::Debug for Upgraded {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Upgraded").finish()
}
}
impl OnUpgrade {
#[inline]
pub(super) fn none() -> Self {
OnUpgrade { rx: None }
}
#[inline]
pub(super) fn is_none(&self) -> bool {
self.rx.is_none()
}
}
impl Future for OnUpgrade {
type Output = std::result::Result<Upgraded, Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.rx {
Some(ref rx) => Pin::new(&mut *rx.lock()).poll(cx).map(|res| match res {
Ok(Ok(upgraded)) => Ok(upgraded),
Ok(Err(err)) => Err(err),
Err(_oneshot_canceled) => Err(Error::new_canceled().with(UpgradeExpected)),
}),
None => Poll::Ready(Err(Error::new_user_no_upgrade())),
}
}
}
impl Pending {
#[inline]
pub(super) fn fulfill(self, upgraded: Upgraded) {
trace!("pending upgrade fulfill");
let _ = self.tx.send(Ok(upgraded));
}
#[inline]
pub(super) fn manual(self) {
trace!("pending upgrade handled manually");
let _ = self.tx.send(Err(Error::new_user_manual_upgrade()));
}
}
#[derive(Debug)]
struct UpgradeExpected;
impl fmt::Display for UpgradeExpected {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("upgrade expected but not completed")
}
}
impl StdError for UpgradeExpected {}
trait Io: AsyncRead + AsyncWrite + Unpin + 'static {}
impl<T: AsyncRead + AsyncWrite + Unpin + 'static> Io for T {}
impl dyn Io + Send {}
mod sealed {
use super::OnUpgrade;
pub trait CanUpgrade {
fn on_upgrade(self) -> OnUpgrade;
}
impl<B> CanUpgrade for http::Request<B> {
fn on_upgrade(mut self) -> OnUpgrade {
self.extensions_mut()
.remove::<OnUpgrade>()
.unwrap_or_else(OnUpgrade::none)
}
}
impl<B> CanUpgrade for &'_ mut http::Request<B> {
fn on_upgrade(self) -> OnUpgrade {
self.extensions_mut()
.remove::<OnUpgrade>()
.unwrap_or_else(OnUpgrade::none)
}
}
impl<B> CanUpgrade for http::Response<B> {
fn on_upgrade(mut self) -> OnUpgrade {
self.extensions_mut()
.remove::<OnUpgrade>()
.unwrap_or_else(OnUpgrade::none)
}
}
impl<B> CanUpgrade for &'_ mut http::Response<B> {
fn on_upgrade(self) -> OnUpgrade {
self.extensions_mut()
.remove::<OnUpgrade>()
.unwrap_or_else(OnUpgrade::none)
}
}
}