1#![forbid(unsafe_code)]
2#![warn(clippy::pedantic)]
3#![allow(clippy::missing_errors_doc)]
4
5use std::{
6 net::SocketAddr,
7 pin::Pin,
8 task::{Context, Poll},
9};
10
11use futures::{ready, AsyncRead, AsyncWrite, Future, StreamExt};
12use wta_reactor::net::{Accept, TcpListener, TcpStream};
13
/// Source of incoming TCP connections for a hyper server.
///
/// Wraps the value returned by `TcpListener::accept()`, which `poll_accept`
/// polls as a stream of `(stream, socket)` results.
pub struct Incoming {
    /// Accept stream obtained from the listener in [`Incoming::new`].
    accept: Accept,
}
17
18impl Incoming {
19 pub fn bind(addr: SocketAddr) -> std::io::Result<Self> {
20 Ok(Self::new(TcpListener::bind(addr)?))
21 }
22
23 pub fn new(listener: TcpListener) -> Self {
24 Self {
25 accept: listener.accept(),
26 }
27 }
28}
29
// `poll_accept` mutably accesses `self.accept` through `Pin<&mut Self>`, which
// is only permitted when `Incoming` is `Unpin`, so state that explicitly.
// NOTE(review): `poll_next_unpin` already requires the `Accept` field to be
// `Unpin`, so this impl may be redundant with the auto trait — confirm.
impl Unpin for Incoming {}
31
32impl hyper::server::accept::Accept for Incoming {
33 type Conn = AddrStream;
34
35 type Error = std::io::Error;
36
37 fn poll_accept(
38 mut self: std::pin::Pin<&mut Self>,
39 cx: &mut Context<'_>,
40 ) -> Poll<Option<Result<Self::Conn, Self::Error>>> {
41 match self.accept.poll_next_unpin(cx) {
42 Poll::Ready(Some(Ok((stream, socket)))) => {
43 Poll::Ready(Some(Ok(AddrStream { stream, socket })))
44 }
45 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
46 Poll::Ready(None) => Poll::Ready(None),
47 Poll::Pending => Poll::Pending,
48 }
49 }
50}
51
/// Stateless executor handle for hyper.
///
/// Its `hyper::rt::Executor` implementation forwards every future to
/// `wta_executor::spawn`.
#[derive(Clone)]
pub struct Executor;
55impl<F> hyper::rt::Executor<F> for Executor
56where
57 F: Future + Send + Sync + 'static,
58 F::Output: Send,
59{
60 fn execute(&self, fut: F) {
61 wta_executor::spawn(fut);
62 }
63}
64
/// An accepted TCP stream together with the socket address that was yielded
/// alongside it by the accept stream.
pub struct AddrStream {
    /// The accepted connection; all reads and writes are delegated to it.
    stream: TcpStream,
    /// Address reported with the connection — presumably the peer's address
    /// (exposed via `remote_addr`); confirm against `wta_reactor`.
    socket: SocketAddr,
}
69
70impl AddrStream {
71 pub fn remote_addr(&self) -> SocketAddr {
72 self.socket
73 }
74}
75
76impl tokio::io::AsyncRead for AddrStream {
77 fn poll_read(
78 mut self: Pin<&mut Self>,
79 cx: &mut Context<'_>,
80 buf: &mut tokio::io::ReadBuf<'_>,
81 ) -> Poll<std::io::Result<()>> {
82 let pin = Pin::new(&mut self.stream);
83 let n = ready!(pin.poll_read(cx, buf.initialize_unfilled())?);
84 buf.advance(n);
85 Poll::Ready(Ok(()))
86 }
87}
88
89impl tokio::io::AsyncWrite for AddrStream {
90 fn poll_write(
91 mut self: Pin<&mut Self>,
92 cx: &mut Context<'_>,
93 buf: &[u8],
94 ) -> Poll<Result<usize, std::io::Error>> {
95 let pin = Pin::new(&mut self.stream);
96 pin.poll_write(cx, buf)
97 }
98
99 fn poll_flush(
100 mut self: Pin<&mut Self>,
101 cx: &mut Context<'_>,
102 ) -> Poll<Result<(), std::io::Error>> {
103 let pin = Pin::new(&mut self.stream);
104 pin.poll_flush(cx)
105 }
106
107 fn poll_shutdown(
108 mut self: Pin<&mut Self>,
109 cx: &mut Context<'_>,
110 ) -> Poll<Result<(), std::io::Error>> {
111 let pin = Pin::new(&mut self.stream);
112 pin.poll_close(cx)
113 }
114}