orb/io/mod.rs
1//! Asynchronous I/O traits and utilities.
2//!
3//! This module provides traits for performing asynchronous I/O operations
4//! in a runtime-agnostic way. It includes functionality for connecting to
5//! network services, working with file descriptors, and performing async
6//! read/write operations.
7//!
8//! Further more, we have abstract buffered I/O with [AsyncBufRead], [AsyncBufWrite], and [AsyncBufStream]
9//!
10//! # Design Notes
11//!
12//! We choose to provide `async fn` style IO function instead of `poll_xxx` style functions, because:
13//!
14//! - `async-io` crate don't have `poll_xxx` interfaces
15//! - `poll_xxx` functions is pre-async-await stuff and difficult to use.
16//! - you can always make an async fn with `poll_xxx`
17//!
18//! We choose to abstract [AsyncFd] instead of stream, because:
19//! - All async stream can be converted between std version of stream
20//! - All types of files/streams and be converted between OS raw fd.
21//! - There's slight difference between tokio stream and async-io counterparts.
22//! - What we do here is just wrap any std blocking function with async poller when they are
23//! readable or writeable, similar with `async-io`, as a light-weight implementation.
24
25use std::future::Future;
26use std::io;
27use std::net::SocketAddr;
28use std::net::TcpStream;
29use std::ops::Deref;
30use std::os::fd::{AsFd, AsRawFd};
31use std::os::unix::net::UnixStream;
32use std::path::PathBuf;
33
34mod buf_io;
35pub use buf_io::{AsyncBufRead, AsyncBufStream, AsyncBufWrite};
36
37/// Helper macro to convert timeout errors to IO errors.
38///
39/// This macro is used internally to convert the `()` error returned by
40/// timeout functions into a proper `io::Error` with `TimedOut` kind.
41macro_rules! io_with_timeout {
42 ($IO: path, $timeout: expr, $f: expr) => {{
43 if $timeout == Duration::from_secs(0) {
44 $f.await
45 } else {
46 // the crate reference make this macro not exportable
47 match <$IO as crate::time::AsyncTime>::timeout($timeout, $f).await {
48 Ok(Ok(r)) => Ok(r),
49 Ok(Err(e)) => Err(e),
50 Err(_) => Err(io::ErrorKind::TimedOut.into()),
51 }
52 }
53 }};
54}
55pub(super) use io_with_timeout;
56
57/// Trait for async I/O operations.
58///
59/// This trait defines the interface for performing asynchronous I/O operations
60/// such as connecting to network services and converting file descriptors to
61/// async handles.
62///
63/// # Associated Types
64///
65/// * `AsyncFd` - The type used to represent async file descriptors
66pub trait AsyncIO {
67 /// The type used to represent async file descriptors.
68 ///
69 /// This associated type represents a wrapper around a file descriptor
70 /// that provides async read/write operations.
71 type AsyncFd<T: AsRawFd + AsFd + Send + Sync + 'static>: AsyncFd<T>;
72
73 /// Connect to a TCP address asynchronously.
74 ///
75 /// # NOTE
76 ///
77 /// This is for runtime implementation, for user should use [`TcpStream::<IO>::connect()`](crate::net::TcpStream) instead**.
78 ///
79 /// This method attempts to establish a TCP connection to the specified
80 /// address, returning an async file descriptor that can be used for
81 /// communication.
82 ///
83 /// # Parameters
84 ///
85 /// * `addr` - The socket address to connect to
86 ///
87 /// # Returns
88 ///
89 /// A future that resolves to a `Result` containing either the connected
90 /// async file descriptor or an I/O error.
91 fn connect_tcp(
92 addr: &SocketAddr,
93 ) -> impl Future<Output = io::Result<Self::AsyncFd<TcpStream>>> + Send;
94
95 /// Connect to a Unix socket address asynchronously.
96 ///
97 /// # NOTE
98 ///
99 /// This is for runtime implementation, for user should use [`UnixStream::<IO>::connect()`](crate::net::UnixStream) instead**.
100 ///
101 /// This method attempts to establish a Unix socket connection to the
102 /// specified path, returning an async file descriptor that can be used
103 /// for communication.
104 ///
105 /// # Parameters
106 ///
107 /// * `addr` - The path to the Unix socket
108 ///
109 /// # Returns
110 ///
111 /// A future that resolves to a `Result` containing either the connected
112 /// async file descriptor or an I/O error.
113 fn connect_unix(
114 addr: &PathBuf,
115 ) -> impl Future<Output = io::Result<Self::AsyncFd<UnixStream>>> + Send;
116
117 /// Wrap a readable file object as an async handle
118 ///
119 /// The file descriptor will subscribe for read
120 /// to the runtime poller
121 ///
122 /// # Parameters
123 ///
124 /// * `fd` - The file descriptor to wrap
125 ///
126 /// # Returns
127 ///
128 /// A `Result` containing either the async file descriptor handle or
129 /// an I/O error.
130 ///
131 /// # Safety
132 ///
133 /// The file descriptor must be set to non-blocking mode before calling
134 /// this method.
135 fn to_async_fd_rd<T: AsRawFd + AsFd + Send + Sync + 'static>(
136 fd: T,
137 ) -> io::Result<Self::AsyncFd<T>>;
138
139 /// Wrap a readable/writable file object as an async handle.
140 ///
141 /// The file descriptor will subscribe for read + write
142 /// to the runtime poller
143 ///
144 /// # Parameters
145 ///
146 /// * `fd` - The file descriptor to wrap
147 ///
148 /// # Returns
149 ///
150 /// A `Result` containing either the async file descriptor handle or
151 /// an I/O error.
152 ///
153 /// # Safety
154 ///
155 /// The file descriptor must be set to non-blocking mode before calling
156 /// this method.
157 fn to_async_fd_rw<T: AsRawFd + AsFd + Send + Sync + 'static>(
158 fd: T,
159 ) -> io::Result<Self::AsyncFd<T>>;
160}
161
162/// Trait for async file descriptor operations.
163///
164/// This trait provides methods for performing async read and write operations
165/// on file descriptors.
166///
167/// # Type Parameters
168///
169/// * `T` - The underlying file descriptor type
170pub trait AsyncFd<T: AsRawFd + AsFd + Send + Sync + 'static>:
171 Send + Sync + 'static + Deref<Target = T>
172{
173 /// Perform an async read operation.
174 ///
175 /// This method executes the provided closure asynchronously, allowing
176 /// it to perform read operations on the underlying file descriptor.
177 ///
178 /// # Parameters
179 ///
180 /// * `f` - A closure that performs the actual read operation
181 ///
182 /// # Returns
183 ///
184 /// A future that resolves to the result of the read operation.
185 fn async_read<R>(
186 &self, f: impl FnMut(&T) -> io::Result<R> + Send,
187 ) -> impl Future<Output = io::Result<R>> + Send;
188
189 /// Perform an async write operation.
190 ///
191 /// This method executes the provided closure asynchronously, allowing
192 /// it to perform write operations on the underlying file descriptor.
193 ///
194 /// # Parameters
195 ///
196 /// * `f` - A closure that performs the actual write operation
197 ///
198 /// # Returns
199 ///
200 /// A future that resolves to the result of the write operation.
201 fn async_write<R>(
202 &self, f: impl FnMut(&T) -> io::Result<R> + Send,
203 ) -> impl Future<Output = io::Result<R>> + Send;
204}
205
206impl<F: std::ops::Deref<Target = IO>, IO: AsyncIO> AsyncIO for F {
207 type AsyncFd<T: AsRawFd + AsFd + Send + Sync + 'static> = IO::AsyncFd<T>;
208
209 fn connect_tcp(
210 addr: &SocketAddr,
211 ) -> impl Future<Output = io::Result<Self::AsyncFd<TcpStream>>> + Send {
212 IO::connect_tcp(addr)
213 }
214
215 fn connect_unix(
216 addr: &PathBuf,
217 ) -> impl Future<Output = io::Result<Self::AsyncFd<UnixStream>>> + Send {
218 IO::connect_unix(addr)
219 }
220
221 fn to_async_fd_rd<T: AsRawFd + AsFd + Send + Sync + 'static>(
222 fd: T,
223 ) -> io::Result<Self::AsyncFd<T>> {
224 IO::to_async_fd_rd(fd)
225 }
226
227 fn to_async_fd_rw<T: AsRawFd + AsFd + Send + Sync + 'static>(
228 fd: T,
229 ) -> io::Result<Self::AsyncFd<T>> {
230 IO::to_async_fd_rw(fd)
231 }
232}
233
234/// AsyncRead trait for runtime adapter
235pub trait AsyncRead: Send {
236 /// Async version of read function
237 ///
238 /// On ok, return the bytes read
239 fn read(&mut self, buf: &mut [u8]) -> impl Future<Output = io::Result<usize>> + Send;
240
241 /// Read the exact number of bytes required to fill `buf`.
242 ///
243 /// This function repeatedly calls `read` until the buffer is completely filled.
244 ///
245 /// # Errors
246 ///
247 /// This function will return an error if the stream is closed before the
248 /// buffer is filled.
249 fn read_exact<'a>(
250 &'a mut self, mut buf: &'a mut [u8],
251 ) -> impl Future<Output = io::Result<()>> + Send + 'a {
252 async move {
253 while !buf.is_empty() {
254 match self.read(buf).await {
255 Ok(0) => break,
256 Ok(n) => {
257 let tmp = buf;
258 buf = &mut tmp[n..];
259 }
260 Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
261 Err(e) => return Err(e),
262 }
263 }
264 if !buf.is_empty() {
265 Err(io::Error::new(io::ErrorKind::UnexpectedEof, "failed to fill whole buffer"))
266 } else {
267 Ok(())
268 }
269 }
270 }
271
272 /// Reads at least `min_len` bytes into `buf`.
273 ///
274 /// This function repeatedly calls `read` until at least `min_len` bytes have been
275 /// read. It is allowed to read more than `min_len` bytes, but not more than
276 /// the length of `buf`.
277 ///
278 /// # Returns
279 ///
280 /// On success, returns the total number of bytes read. This will be at least
281 /// `min_len`, and could be more, up to the length of `buf`.
282 ///
283 /// # Errors
284 ///
285 /// It will return an `UnexpectedEof` error if the stream is closed before at least `min_len` bytes have been read.
286 fn read_at_least<'a>(
287 &'a mut self, buf: &'a mut [u8], min_len: usize,
288 ) -> impl Future<Output = io::Result<usize>> + Send + 'a {
289 async move {
290 let mut total_read = 0;
291 while total_read < min_len && total_read < buf.len() {
292 match self.read(&mut buf[total_read..]).await {
293 Ok(0) => {
294 return Err(io::Error::new(
295 io::ErrorKind::UnexpectedEof,
296 "failed to read minimum number of bytes",
297 ));
298 }
299 Ok(n) => total_read += n,
300 Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
301 Err(e) => return Err(e),
302 };
303 }
304 Ok(total_read)
305 }
306 }
307}
308
309/// AsyncWrite trait for runtime adapter
310pub trait AsyncWrite: Send {
311 /// Async version of write function
312 ///
313 /// On ok, return the bytes written
314 fn write(&mut self, buf: &[u8]) -> impl Future<Output = io::Result<usize>> + Send;
315
316 /// Write the entire buffer `buf`.
317 ///
318 /// This function repeatedly calls `write` until the entire buffer is written.
319 ///
320 /// # Errors
321 ///
322 /// This function will return an error if the stream is closed before the
323 /// entire buffer is written.
324 fn write_all<'a>(
325 &'a mut self, mut buf: &'a [u8],
326 ) -> impl Future<Output = io::Result<()>> + Send + 'a {
327 async move {
328 while !buf.is_empty() {
329 match self.write(buf).await {
330 Ok(0) => {
331 return Err(io::Error::new(
332 io::ErrorKind::WriteZero,
333 "failed to write whole buffer",
334 ));
335 }
336 Ok(n) => {
337 buf = &buf[n..];
338 }
339 Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
340 Err(e) => return Err(e),
341 }
342 }
343 Ok(())
344 }
345 }
346}