asynk_hyper/
tcp.rs

1use futures::{AsyncRead, AsyncWrite, Stream, StreamExt};
2use hyper::rt::{Read, ReadBufCursor, Write};
3use std::{
4    io,
5    mem::MaybeUninit,
6    net::SocketAddr,
7    pin::Pin,
8    task::{ready, Context, Poll},
9};
10
11pub struct TcpListener(asynk::net::TcpListener);
12
13impl TcpListener {
14    pub fn bind(addr: SocketAddr) -> io::Result<Self> {
15        Ok(Self(asynk::net::TcpListener::bind(addr)?))
16    }
17
18    pub fn accept(self) -> io::Result<Accept> {
19        Ok(Accept(self.0.accept()?))
20    }
21}
22
23pub struct Accept(asynk::net::Accept);
24
25impl Stream for Accept {
26    type Item = io::Result<(TcpStream, SocketAddr)>;
27
28    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
29        let Some(res) = ready!(self.0.poll_next_unpin(cx)) else {
30            return Poll::Ready(None);
31        };
32
33        let (stream, addr) = res?;
34        let stream = stream.into();
35
36        Poll::Ready(Some(Ok((stream, addr))))
37    }
38}
39
40/// TcpStream adapter for `hyper`
41pub struct TcpStream(asynk::net::TcpStream);
42
43impl From<asynk::net::TcpStream> for TcpStream {
44    fn from(stream: asynk::net::TcpStream) -> Self {
45        Self(stream)
46    }
47}
48
49impl Read for TcpStream {
50    fn poll_read(
51        mut self: Pin<&mut Self>,
52        cx: &mut Context<'_>,
53        mut buf: ReadBufCursor<'_>,
54    ) -> Poll<io::Result<()>> {
55        unsafe {
56            let b = buf.as_mut();
57            let b = &mut *(b as *mut [MaybeUninit<u8>] as *mut [u8]);
58            let n = ready!(Pin::new(&mut self.0).poll_read(cx, b))?;
59            // TODO: safety?
60            buf.advance(n);
61        };
62
63        Poll::Ready(Ok(()))
64    }
65}
66
67impl Write for TcpStream {
68    fn poll_write(
69        mut self: Pin<&mut Self>,
70        cx: &mut Context<'_>,
71        buf: &[u8],
72    ) -> Poll<io::Result<usize>> {
73        Pin::new(&mut self.0).poll_write(cx, buf)
74    }
75
76    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
77        Pin::new(&mut self.0).poll_flush(cx)
78    }
79
80    fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
81        Pin::new(&mut self.0).poll_close(cx)
82    }
83}