1use std::{
2 io,
3 ops::DerefMut,
4 pin::Pin,
5 task::{Context, Poll, ready},
6};
7
8use compio::io::{AsyncRead, AsyncWrite, compat::AsyncStream};
9use send_wrapper::SendWrapper;
10
11pub struct HyperStream<S>(SendWrapper<AsyncStream<S>>);
13
14impl<S> HyperStream<S> {
15 pub fn new(s: S) -> Self {
17 Self(SendWrapper::new(AsyncStream::new(s)))
18 }
19
20 pub fn get_ref(&self) -> &S {
22 self.0.get_ref()
23 }
24}
25
26impl<S> std::fmt::Debug for HyperStream<S> {
27 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28 f.debug_struct("HyperStream").finish_non_exhaustive()
29 }
30}
31
32impl<S: AsyncRead + Unpin + 'static> hyper::rt::Read for HyperStream<S> {
33 fn poll_read(
34 self: Pin<&mut Self>,
35 cx: &mut Context<'_>,
36 mut buf: hyper::rt::ReadBufCursor<'_>,
37 ) -> Poll<io::Result<()>> {
38 let stream = unsafe { self.map_unchecked_mut(|this| this.0.deref_mut()) };
39 let slice = unsafe { buf.as_mut() };
40 let len = ready!(stream.poll_read_uninit(cx, slice))?;
41 unsafe { buf.advance(len) };
42 Poll::Ready(Ok(()))
43 }
44}
45
46impl<S: AsyncWrite + Unpin + 'static> hyper::rt::Write for HyperStream<S> {
47 fn poll_write(
48 self: Pin<&mut Self>,
49 cx: &mut Context<'_>,
50 buf: &[u8],
51 ) -> Poll<io::Result<usize>> {
52 let stream = unsafe { self.map_unchecked_mut(|this| this.0.deref_mut()) };
53 futures_util::AsyncWrite::poll_write(stream, cx, buf)
54 }
55
56 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
57 let stream = unsafe { self.map_unchecked_mut(|this| this.0.deref_mut()) };
58 futures_util::AsyncWrite::poll_flush(stream, cx)
59 }
60
61 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
62 let stream = unsafe { self.map_unchecked_mut(|this| this.0.deref_mut()) };
63 futures_util::AsyncWrite::poll_close(stream, cx)
64 }
65}