wta_hyper/
lib.rs

1#![forbid(unsafe_code)]
2#![warn(clippy::pedantic)]
3#![allow(clippy::missing_errors_doc)]
4
5use std::{
6    net::SocketAddr,
7    pin::Pin,
8    task::{Context, Poll},
9};
10
11use futures::{ready, AsyncRead, AsyncWrite, Future, StreamExt};
12use wta_reactor::net::{Accept, TcpListener, TcpStream};
13
14pub struct Incoming {
15    accept: Accept,
16}
17
18impl Incoming {
19    pub fn bind(addr: SocketAddr) -> std::io::Result<Self> {
20        Ok(Self::new(TcpListener::bind(addr)?))
21    }
22
23    pub fn new(listener: TcpListener) -> Self {
24        Self {
25            accept: listener.accept(),
26        }
27    }
28}
29
30impl Unpin for Incoming {}
31
32impl hyper::server::accept::Accept for Incoming {
33    type Conn = AddrStream;
34
35    type Error = std::io::Error;
36
37    fn poll_accept(
38        mut self: std::pin::Pin<&mut Self>,
39        cx: &mut Context<'_>,
40    ) -> Poll<Option<Result<Self::Conn, Self::Error>>> {
41        match self.accept.poll_next_unpin(cx) {
42            Poll::Ready(Some(Ok((stream, socket)))) => {
43                Poll::Ready(Some(Ok(AddrStream { stream, socket })))
44            }
45            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
46            Poll::Ready(None) => Poll::Ready(None),
47            Poll::Pending => Poll::Pending,
48        }
49    }
50}
51
52/// what-the-async executor
53#[derive(Clone)]
54pub struct Executor;
55impl<F> hyper::rt::Executor<F> for Executor
56where
57    F: Future + Send + Sync + 'static,
58    F::Output: Send,
59{
60    fn execute(&self, fut: F) {
61        wta_executor::spawn(fut);
62    }
63}
64
65pub struct AddrStream {
66    stream: TcpStream,
67    socket: SocketAddr,
68}
69
70impl AddrStream {
71    pub fn remote_addr(&self) -> SocketAddr {
72        self.socket
73    }
74}
75
76impl tokio::io::AsyncRead for AddrStream {
77    fn poll_read(
78        mut self: Pin<&mut Self>,
79        cx: &mut Context<'_>,
80        buf: &mut tokio::io::ReadBuf<'_>,
81    ) -> Poll<std::io::Result<()>> {
82        let pin = Pin::new(&mut self.stream);
83        let n = ready!(pin.poll_read(cx, buf.initialize_unfilled())?);
84        buf.advance(n);
85        Poll::Ready(Ok(()))
86    }
87}
88
89impl tokio::io::AsyncWrite for AddrStream {
90    fn poll_write(
91        mut self: Pin<&mut Self>,
92        cx: &mut Context<'_>,
93        buf: &[u8],
94    ) -> Poll<Result<usize, std::io::Error>> {
95        let pin = Pin::new(&mut self.stream);
96        pin.poll_write(cx, buf)
97    }
98
99    fn poll_flush(
100        mut self: Pin<&mut Self>,
101        cx: &mut Context<'_>,
102    ) -> Poll<Result<(), std::io::Error>> {
103        let pin = Pin::new(&mut self.stream);
104        pin.poll_flush(cx)
105    }
106
107    fn poll_shutdown(
108        mut self: Pin<&mut Self>,
109        cx: &mut Context<'_>,
110    ) -> Poll<Result<(), std::io::Error>> {
111        let pin = Pin::new(&mut self.stream);
112        pin.poll_close(cx)
113    }
114}