1#![allow(unsafe_code)]
5#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
6
7use async_trait::async_trait;
8use executor_trait::BlockingExecutor;
9use futures_core::Stream;
10use futures_io::{AsyncRead, AsyncWrite};
11use std::{
12 fmt,
13 io::{self, IoSlice, IoSliceMut, Read, Write},
14 net::{SocketAddr, ToSocketAddrs},
15 ops::Deref,
16 time::{Duration, Instant},
17};
18use sys::IO;
19
20pub struct IOHandle(Box<dyn IO + Send>);
22
23impl IOHandle {
24 pub fn new<H: IO + Send + 'static>(io: H) -> Self {
26 Self(Box::new(io))
27 }
28}
29
30impl Read for IOHandle {
31 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
32 self.0.read(buf)
33 }
34
35 fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
36 self.0.read_vectored(bufs)
37 }
38
39 fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
40 self.0.read_to_end(buf)
41 }
42
43 fn read_to_string(&mut self, buf: &mut String) -> io::Result<usize> {
44 self.0.read_to_string(buf)
45 }
46
47 fn read_exact(&mut self, buf: &mut [u8]) -> io::Result<()> {
48 self.0.read_exact(buf)
49 }
50}
51
52impl Write for IOHandle {
53 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
54 self.0.write(buf)
55 }
56
57 fn flush(&mut self) -> io::Result<()> {
58 self.0.flush()
59 }
60
61 fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
62 self.0.write_vectored(bufs)
63 }
64
65 fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
66 self.0.write_all(buf)
67 }
68
69 fn write_fmt(&mut self, fmt: fmt::Arguments<'_>) -> io::Result<()> {
70 self.0.write_fmt(fmt)
71 }
72}
73
74impl fmt::Debug for IOHandle {
75 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
76 f.debug_tuple("IOHandle").finish()
77 }
78}
79
// Opt-in marker impl, compiled only when the `async_io_safe` feature is enabled.
//
// SAFETY: NOTE(review) - the `Read`/`Write` impls for `IOHandle` in this file only
// forward to the boxed value and never drop or replace it. Whether that satisfies
// the full `IoSafe` contract also depends on the wrapped type, which is not visible
// here - confirm against the async-io documentation.
#[cfg(feature = "async_io_safe")]
unsafe impl async_io::IoSafe for IOHandle {}
82
83pub trait AsyncIOHandle: AsyncRead + AsyncWrite {}
85impl<IO: AsyncRead + AsyncWrite> AsyncIOHandle for IO {}
86
/// A common interface for turning a synchronous [`IOHandle`] into an
/// asynchronous one.
pub trait Reactor {
    /// Takes ownership of `socket` and returns a boxed, `Send`
    /// [`AsyncIOHandle`], or an [`io::Error`] on failure.
    ///
    /// NOTE(review): presumably the returned handle drives `socket` through the
    /// implementing reactor; that contract lives with implementors - confirm there.
    fn register(&self, socket: IOHandle) -> io::Result<Box<dyn AsyncIOHandle + Send>>;
}
92
/// A common interface for the time-related primitives of a reactor.
#[async_trait]
pub trait TimeReactor {
    /// Asynchronous sleep parameterised by `dur`.
    ///
    /// NOTE(review): by its name, expected to complete once `dur` has elapsed;
    /// the trait itself does not enforce this - confirm with implementors.
    async fn sleep(&self, dur: Duration);
    /// Returns a boxed [`Stream`] of [`Instant`]s parameterised by `dur`.
    ///
    /// The returned trait object carries no `Send` or `Unpin` bound.
    fn interval(&self, dur: Duration) -> Box<dyn Stream<Item = Instant>>;
}
101
/// A common interface for obtaining an asynchronous TCP handle from a reactor.
#[async_trait]
pub trait TcpReactor {
    /// Asynchronously produces a boxed, `Send` [`AsyncIOHandle`] for `addr`
    /// (anything convertible into a [`SocketAddr`]), or an [`io::Error`].
    /// By its name, implementors are expected to establish a TCP connection.
    ///
    /// NOTE(review): unlike `Reactor::register` and `TimeReactor::sleep`, this is
    /// an associated function with no `&self` receiver, so an implementation
    /// cannot reach per-instance reactor state and callers must write
    /// `T::connect(addr)`. Confirm this is intentional; adding a receiver would be
    /// a breaking change for implementors.
    async fn connect<A: Into<SocketAddr> + Send>(
        addr: A,
    ) -> io::Result<Box<dyn AsyncIOHandle + Send>>;
}
111
/// Asynchronous counterpart of [`ToSocketAddrs`]: resolves `self` to socket
/// addresses from an `async` context.
#[async_trait]
pub trait AsyncToSocketAddrs {
    /// Asynchronously yields a boxed iterator of [`SocketAddr`]s, or an
    /// [`io::Error`] if they could not be obtained.
    async fn to_socket_addrs(&self) -> io::Result<Box<dyn Iterator<Item = SocketAddr> + Send + Sync>>;
}
118
119#[async_trait]
120impl<E: Deref + Send + Sync, A: ToSocketAddrs + Clone + Send + Sync + 'static> AsyncToSocketAddrs
121 for (E, A)
122where
123 E::Target: BlockingExecutor + Send + Sync,
124{
125 async fn to_socket_addrs(&self) -> io::Result<Box<dyn Iterator<Item = SocketAddr> + Send + Sync>> {
128 let (executor, addrs) = self;
129 let addrs = addrs.clone();
130 let (sender, receiver) = flume::bounded(1);
131
132 executor
133 .spawn_blocking(Box::new(move || {
134 sender
135 .send(
136 addrs
137 .to_socket_addrs()
138 .map(|addrs| addrs.collect::<Vec<_>>()),
139 )
140 .unwrap();
141 }))
142 .await;
143 let addrs = receiver.recv_async().await.map_err(io::Error::other)??;
144 Ok(Box::new(addrs.into_iter()))
145 }
146}
147
148#[cfg(unix)]
149mod sys {
150 use crate::IOHandle;
151 use std::{
152 io::{Read, Write},
153 os::unix::io::{AsFd, AsRawFd, BorrowedFd, RawFd},
154 };
155
156 pub trait IO: Read + Write + AsFd {}
157 impl<H: Read + Write + AsFd> IO for H {}
158
159 impl AsFd for IOHandle {
160 fn as_fd(&self) -> BorrowedFd<'_> {
161 self.0.as_fd()
162 }
163 }
164
165 impl AsRawFd for IOHandle {
166 fn as_raw_fd(&self) -> RawFd {
167 self.as_fd().as_raw_fd()
168 }
169 }
170}
171
/// Windows platform layer: defines what an `IOHandle` may wrap and exposes its
/// socket.
#[cfg(windows)]
mod sys {
    use crate::IOHandle;
    use std::{
        io::{Read, Write},
        os::windows::io::{AsRawSocket, AsSocket, BorrowedSocket, RawSocket},
    };

    /// Everything an `IOHandle` needs from its inner value on Windows:
    /// synchronous reads and writes plus access to a socket.
    pub trait IO: Read + Write + AsSocket {}
    impl<T> IO for T where T: Read + Write + AsSocket {}

    impl AsSocket for IOHandle {
        /// Borrows the socket of the wrapped value.
        fn as_socket(&self) -> BorrowedSocket<'_> {
            AsSocket::as_socket(&*self.0)
        }
    }

    impl AsRawSocket for IOHandle {
        /// Raw form of the socket returned by `as_socket`.
        fn as_raw_socket(&self) -> RawSocket {
            let borrowed = self.as_socket();
            borrowed.as_raw_socket()
        }
    }
}