1#![allow(unsafe_code)]
5#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
6
7use async_trait::async_trait;
8use executor_trait::BlockingExecutor;
9use futures_core::Stream;
10use futures_io::{AsyncRead, AsyncWrite};
11use std::{
12 fmt,
13 io::{self, IoSlice, IoSliceMut, Read, Write},
14 net::{SocketAddr, ToSocketAddrs},
15 ops::Deref,
16 time::{Duration, Instant},
17};
18use sys::IO;
19
20pub struct IOHandle(Box<dyn IO + Send>);
22
23impl IOHandle {
24 pub fn new<H: IO + Send + 'static>(io: H) -> Self {
26 Self(Box::new(io))
27 }
28}
29
30impl Read for IOHandle {
31 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
32 self.0.read(buf)
33 }
34
35 fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
36 self.0.read_vectored(bufs)
37 }
38
39 fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
40 self.0.read_to_end(buf)
41 }
42
43 fn read_to_string(&mut self, buf: &mut String) -> io::Result<usize> {
44 self.0.read_to_string(buf)
45 }
46
47 fn read_exact(&mut self, buf: &mut [u8]) -> io::Result<()> {
48 self.0.read_exact(buf)
49 }
50}
51
52impl Write for IOHandle {
53 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
54 self.0.write(buf)
55 }
56
57 fn flush(&mut self) -> io::Result<()> {
58 self.0.flush()
59 }
60
61 fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
62 self.0.write_vectored(bufs)
63 }
64
65 fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
66 self.0.write_all(buf)
67 }
68
69 fn write_fmt(&mut self, fmt: fmt::Arguments<'_>) -> io::Result<()> {
70 self.0.write_fmt(fmt)
71 }
72}
73
74impl fmt::Debug for IOHandle {
75 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
76 f.debug_tuple("IOHandle").finish()
77 }
78}
79
// Opt-in marker impl, only compiled when the `async_io_safe` feature is enabled.
//
// SAFETY: NOTE(review) — `IOHandle`'s `Read`/`Write` impls only forward to the
// boxed handle, so whether the `IoSafe` contract actually holds depends on the
// wrapped type's own `Read`/`Write` behaviour, which `IOHandle::new` does not
// constrain. Confirm against the `async_io::IoSafe` documentation.
#[cfg(feature = "async_io_safe")]
unsafe impl async_io::IoSafe for IOHandle {}
82
83pub trait AsyncIOHandle: AsyncRead + AsyncWrite {}
85impl<IO: AsyncRead + AsyncWrite> AsyncIOHandle for IO {}
86
/// Interface for turning a synchronous [`IOHandle`] into an asynchronous one.
pub trait Reactor {
    /// Takes ownership of `socket` and returns a boxed [`AsyncIOHandle`] for it.
    ///
    /// # Errors
    ///
    /// Returns an [`io::Error`] when the implementation fails to register the
    /// handle; the exact failure conditions are implementation-defined.
    fn register(&self, socket: IOHandle) -> io::Result<Box<dyn AsyncIOHandle + Send>>;
}
92
/// Interface for time-related asynchronous primitives.
#[async_trait]
pub trait TimeReactor {
    /// Asynchronously waits, parameterised by `dur`.
    ///
    /// Going by the name and signature, this is intended to complete once
    /// `dur` has elapsed — confirm with the concrete implementation.
    async fn sleep(&self, dur: Duration);
    /// Returns a boxed [`Stream`] of [`Instant`]s, parameterised by `dur`.
    ///
    /// Presumably one item is produced every `dur` — confirm with the
    /// concrete implementation.
    ///
    /// NOTE(review): the boxed stream carries no `Send` bound, unlike the
    /// boxed handles returned by the other traits in this file.
    fn interval(&self, dur: Duration) -> Box<dyn Stream<Item = Instant>>;
}
101
/// Interface for asynchronously establishing TCP connections.
#[async_trait]
pub trait TcpReactor {
    /// Asynchronously connects to `addr`, yielding a boxed [`AsyncIOHandle`].
    ///
    /// # Errors
    ///
    /// Returns an [`io::Error`] when the implementation cannot produce a
    /// handle for `addr`.
    async fn connect(&self, addr: SocketAddr) -> io::Result<Box<dyn AsyncIOHandle + Send>>;
}
108
/// Asynchronous counterpart of [`ToSocketAddrs`].
#[async_trait]
pub trait AsyncToSocketAddrs {
    /// Asynchronously resolves `self` into a boxed iterator of [`SocketAddr`]s.
    ///
    /// # Errors
    ///
    /// Returns an [`io::Error`] when the resolution fails.
    async fn to_socket_addrs(
        &self,
    ) -> io::Result<Box<dyn Iterator<Item = SocketAddr> + Send + Sync>>;
}
117
118#[async_trait]
119impl<E: Deref + Send + Sync, A: ToSocketAddrs + Clone + Send + Sync + 'static> AsyncToSocketAddrs
120 for (E, A)
121where
122 E::Target: BlockingExecutor + Send + Sync,
123{
124 async fn to_socket_addrs(
127 &self,
128 ) -> io::Result<Box<dyn Iterator<Item = SocketAddr> + Send + Sync>> {
129 let (executor, addrs) = self;
130 let addrs = addrs.clone();
131 let (sender, receiver) = flume::bounded(1);
132
133 executor
134 .spawn_blocking(Box::new(move || {
135 sender
136 .send(
137 addrs
138 .to_socket_addrs()
139 .map(|addrs| addrs.collect::<Vec<_>>()),
140 )
141 .unwrap();
142 }))
143 .await;
144 let addrs = receiver.recv_async().await.map_err(io::Error::other)??;
145 Ok(Box::new(addrs.into_iter()))
146 }
147}
148
149#[cfg(unix)]
150mod sys {
151 use crate::IOHandle;
152 use std::{
153 io::{Read, Write},
154 os::unix::io::{AsFd, AsRawFd, BorrowedFd, RawFd},
155 };
156
157 pub trait IO: Read + Write + AsFd {}
158 impl<H: Read + Write + AsFd> IO for H {}
159
160 impl AsFd for IOHandle {
161 fn as_fd(&self) -> BorrowedFd<'_> {
162 self.0.as_fd()
163 }
164 }
165
166 impl AsRawFd for IOHandle {
167 fn as_raw_fd(&self) -> RawFd {
168 self.as_fd().as_raw_fd()
169 }
170 }
171}
172
/// Windows flavour of the platform glue: handles are identified by sockets.
#[cfg(windows)]
mod sys {
    use crate::IOHandle;
    use std::{
        io::{Read, Write},
        os::windows::io::{AsRawSocket, AsSocket, BorrowedSocket, RawSocket},
    };

    /// Bound required of anything stored in an [`IOHandle`] on Windows:
    /// readable, writable, and able to lend out its socket.
    pub trait IO: Read + Write + AsSocket {}

    impl<T> IO for T where T: Read + Write + AsSocket {}

    impl AsSocket for IOHandle {
        fn as_socket(&self) -> BorrowedSocket<'_> {
            // Borrow the socket straight from the boxed handle.
            AsSocket::as_socket(&*self.0)
        }
    }

    impl AsRawSocket for IOHandle {
        fn as_raw_socket(&self) -> RawSocket {
            // Derived from `as_socket` so both views always agree.
            let borrowed = AsSocket::as_socket(self);
            borrowed.as_raw_socket()
        }
    }
}