use std::{
io,
ops::DerefMut,
pin::Pin,
task::{Context, Poll, ready},
};
use compio::io::{AsyncRead, AsyncWrite, compat::AsyncStream};
use send_wrapper::SendWrapper;
pub struct HyperStream<S>(SendWrapper<AsyncStream<S>>);
impl<S> HyperStream<S> {
pub fn new(s: S) -> Self {
Self(SendWrapper::new(AsyncStream::new(s)))
}
pub fn get_ref(&self) -> &S {
self.0.get_ref()
}
}
impl<S> std::fmt::Debug for HyperStream<S> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("HyperStream").finish_non_exhaustive()
}
}
impl<S: AsyncRead + Unpin + 'static> hyper::rt::Read for HyperStream<S> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
mut buf: hyper::rt::ReadBufCursor<'_>,
) -> Poll<io::Result<()>> {
let stream = unsafe { self.map_unchecked_mut(|this| this.0.deref_mut()) };
let slice = unsafe { buf.as_mut() };
let len = ready!(stream.poll_read_uninit(cx, slice))?;
unsafe { buf.advance(len) };
Poll::Ready(Ok(()))
}
}
impl<S: AsyncWrite + Unpin + 'static> hyper::rt::Write for HyperStream<S> {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
let stream = unsafe { self.map_unchecked_mut(|this| this.0.deref_mut()) };
futures_util::AsyncWrite::poll_write(stream, cx, buf)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let stream = unsafe { self.map_unchecked_mut(|this| this.0.deref_mut()) };
futures_util::AsyncWrite::poll_flush(stream, cx)
}
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let stream = unsafe { self.map_unchecked_mut(|this| this.0.deref_mut()) };
futures_util::AsyncWrite::poll_close(stream, cx)
}
}