cyper_core/
stream.rs

1use std::{
2    io,
3    ops::DerefMut,
4    pin::Pin,
5    task::{Context, Poll, ready},
6};
7
8use compio::io::{AsyncRead, AsyncWrite, compat::AsyncStream};
9use send_wrapper::SendWrapper;
10
11/// A stream wrapper for hyper.
12pub struct HyperStream<S>(SendWrapper<AsyncStream<S>>);
13
14impl<S> HyperStream<S> {
15    /// Create a hyper stream wrapper.
16    pub fn new(s: S) -> Self {
17        Self(SendWrapper::new(AsyncStream::new(s)))
18    }
19
20    /// Get the reference of the inner stream.
21    pub fn get_ref(&self) -> &S {
22        self.0.get_ref()
23    }
24}
25
26impl<S> std::fmt::Debug for HyperStream<S> {
27    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28        f.debug_struct("HyperStream").finish_non_exhaustive()
29    }
30}
31
32impl<S: AsyncRead + Unpin + 'static> hyper::rt::Read for HyperStream<S> {
33    fn poll_read(
34        self: Pin<&mut Self>,
35        cx: &mut Context<'_>,
36        mut buf: hyper::rt::ReadBufCursor<'_>,
37    ) -> Poll<io::Result<()>> {
38        let stream = unsafe { self.map_unchecked_mut(|this| this.0.deref_mut()) };
39        let slice = unsafe { buf.as_mut() };
40        let len = ready!(stream.poll_read_uninit(cx, slice))?;
41        unsafe { buf.advance(len) };
42        Poll::Ready(Ok(()))
43    }
44}
45
46impl<S: AsyncWrite + Unpin + 'static> hyper::rt::Write for HyperStream<S> {
47    fn poll_write(
48        self: Pin<&mut Self>,
49        cx: &mut Context<'_>,
50        buf: &[u8],
51    ) -> Poll<io::Result<usize>> {
52        let stream = unsafe { self.map_unchecked_mut(|this| this.0.deref_mut()) };
53        futures_util::AsyncWrite::poll_write(stream, cx, buf)
54    }
55
56    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
57        let stream = unsafe { self.map_unchecked_mut(|this| this.0.deref_mut()) };
58        futures_util::AsyncWrite::poll_flush(stream, cx)
59    }
60
61    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
62        let stream = unsafe { self.map_unchecked_mut(|this| this.0.deref_mut()) };
63        futures_util::AsyncWrite::poll_close(stream, cx)
64    }
65}