orb/io/
mod.rs

1//! Asynchronous I/O traits and utilities.
2//!
3//! This module provides traits for performing asynchronous I/O operations
4//! in a runtime-agnostic way. It includes functionality for connecting to
5//! network services, working with file descriptors, and performing async
6//! read/write operations.
7//!
8//! Further more, we have abstract buffered I/O  with [AsyncBufRead], [AsyncBufWrite], and [AsyncBufStream]
9//!
10//! # Design Notes
11//!
12//! We choose to provide `async fn` style IO function instead of `poll_xxx` style functions, because:
13//!
14//! - `async-io` crate don't have `poll_xxx` interfaces
15//! - `poll_xxx` functions is pre-async-await stuff and difficult to use.
16//! - you can always make an async fn with `poll_xxx`
17//!
18//! We choose to abstract [AsyncFd] instead of stream, because:
19//! - All async stream can be converted between std version of stream
20//! - All types of files/streams and be converted between OS raw fd.
21//! - There's slight difference between tokio stream and async-io counterparts.
22//! - What we do here is just wrap any std blocking function with async poller when they are
23//! readable or writeable, similar with `async-io`, as a light-weight implementation.
24
25use std::future::Future;
26use std::io;
27use std::net::SocketAddr;
28use std::net::TcpStream;
29use std::ops::Deref;
30use std::os::fd::{AsFd, AsRawFd};
31use std::os::unix::net::UnixStream;
32use std::path::PathBuf;
33
34mod buf_io;
35pub use buf_io::{AsyncBufRead, AsyncBufStream, AsyncBufWrite};
36
37/// Helper macro to convert timeout errors to IO errors.
38///
39/// This macro is used internally to convert the `()` error returned by
40/// timeout functions into a proper `io::Error` with `TimedOut` kind.
41macro_rules! io_with_timeout {
42    ($IO: path, $timeout: expr, $f: expr) => {{
43        if $timeout == Duration::from_secs(0) {
44            $f.await
45        } else {
46            // the crate reference make this macro not exportable
47            match <$IO as crate::time::AsyncTime>::timeout($timeout, $f).await {
48                Ok(Ok(r)) => Ok(r),
49                Ok(Err(e)) => Err(e),
50                Err(_) => Err(io::ErrorKind::TimedOut.into()),
51            }
52        }
53    }};
54}
55pub(super) use io_with_timeout;
56
57/// Trait for async I/O operations.
58///
59/// This trait defines the interface for performing asynchronous I/O operations
60/// such as connecting to network services and converting file descriptors to
61/// async handles.
62///
63/// # Associated Types
64///
65/// * `AsyncFd` - The type used to represent async file descriptors
66pub trait AsyncIO {
67    /// The type used to represent async file descriptors.
68    ///
69    /// This associated type represents a wrapper around a file descriptor
70    /// that provides async read/write operations.
71    type AsyncFd<T: AsRawFd + AsFd + Send + Sync + 'static>: AsyncFd<T>;
72
73    /// Connect to a TCP address asynchronously.
74    ///
75    /// # NOTE
76    ///
77    /// This is for runtime implementation, for user should use [`TcpStream::<IO>::connect()`](crate::net::TcpStream) instead**.
78    ///
79    /// This method attempts to establish a TCP connection to the specified
80    /// address, returning an async file descriptor that can be used for
81    /// communication.
82    ///
83    /// # Parameters
84    ///
85    /// * `addr` - The socket address to connect to
86    ///
87    /// # Returns
88    ///
89    /// A future that resolves to a `Result` containing either the connected
90    /// async file descriptor or an I/O error.
91    fn connect_tcp(
92        addr: &SocketAddr,
93    ) -> impl Future<Output = io::Result<Self::AsyncFd<TcpStream>>> + Send;
94
95    /// Connect to a Unix socket address asynchronously.
96    ///
97    /// # NOTE
98    ///
99    /// This is for runtime implementation, for user should use [`UnixStream::<IO>::connect()`](crate::net::UnixStream) instead**.
100    ///
101    /// This method attempts to establish a Unix socket connection to the
102    /// specified path, returning an async file descriptor that can be used
103    /// for communication.
104    ///
105    /// # Parameters
106    ///
107    /// * `addr` - The path to the Unix socket
108    ///
109    /// # Returns
110    ///
111    /// A future that resolves to a `Result` containing either the connected
112    /// async file descriptor or an I/O error.
113    fn connect_unix(
114        addr: &PathBuf,
115    ) -> impl Future<Output = io::Result<Self::AsyncFd<UnixStream>>> + Send;
116
117    /// Wrap a readable file object as an async handle
118    ///
119    /// The file descriptor will subscribe for read
120    /// to the runtime poller
121    ///
122    /// # Parameters
123    ///
124    /// * `fd` - The file descriptor to wrap
125    ///
126    /// # Returns
127    ///
128    /// A `Result` containing either the async file descriptor handle or
129    /// an I/O error.
130    ///
131    /// # Safety
132    ///
133    /// The file descriptor must be set to non-blocking mode before calling
134    /// this method.
135    fn to_async_fd_rd<T: AsRawFd + AsFd + Send + Sync + 'static>(
136        fd: T,
137    ) -> io::Result<Self::AsyncFd<T>>;
138
139    /// Wrap a readable/writable file object as an async handle.
140    ///
141    /// The file descriptor will subscribe for read + write
142    /// to the runtime poller
143    ///
144    /// # Parameters
145    ///
146    /// * `fd` - The file descriptor to wrap
147    ///
148    /// # Returns
149    ///
150    /// A `Result` containing either the async file descriptor handle or
151    /// an I/O error.
152    ///
153    /// # Safety
154    ///
155    /// The file descriptor must be set to non-blocking mode before calling
156    /// this method.
157    fn to_async_fd_rw<T: AsRawFd + AsFd + Send + Sync + 'static>(
158        fd: T,
159    ) -> io::Result<Self::AsyncFd<T>>;
160}
161
162/// Trait for async file descriptor operations.
163///
164/// This trait provides methods for performing async read and write operations
165/// on file descriptors.
166///
167/// # Type Parameters
168///
169/// * `T` - The underlying file descriptor type
170pub trait AsyncFd<T: AsRawFd + AsFd + Send + Sync + 'static>:
171    Send + Sync + 'static + Deref<Target = T>
172{
173    /// Perform an async read operation.
174    ///
175    /// This method executes the provided closure asynchronously, allowing
176    /// it to perform read operations on the underlying file descriptor.
177    ///
178    /// # Parameters
179    ///
180    /// * `f` - A closure that performs the actual read operation
181    ///
182    /// # Returns
183    ///
184    /// A future that resolves to the result of the read operation.
185    fn async_read<R>(
186        &self, f: impl FnMut(&T) -> io::Result<R> + Send,
187    ) -> impl Future<Output = io::Result<R>> + Send;
188
189    /// Perform an async write operation.
190    ///
191    /// This method executes the provided closure asynchronously, allowing
192    /// it to perform write operations on the underlying file descriptor.
193    ///
194    /// # Parameters
195    ///
196    /// * `f` - A closure that performs the actual write operation
197    ///
198    /// # Returns
199    ///
200    /// A future that resolves to the result of the write operation.
201    fn async_write<R>(
202        &self, f: impl FnMut(&T) -> io::Result<R> + Send,
203    ) -> impl Future<Output = io::Result<R>> + Send;
204}
205
206impl<F: std::ops::Deref<Target = IO>, IO: AsyncIO> AsyncIO for F {
207    type AsyncFd<T: AsRawFd + AsFd + Send + Sync + 'static> = IO::AsyncFd<T>;
208
209    fn connect_tcp(
210        addr: &SocketAddr,
211    ) -> impl Future<Output = io::Result<Self::AsyncFd<TcpStream>>> + Send {
212        IO::connect_tcp(addr)
213    }
214
215    fn connect_unix(
216        addr: &PathBuf,
217    ) -> impl Future<Output = io::Result<Self::AsyncFd<UnixStream>>> + Send {
218        IO::connect_unix(addr)
219    }
220
221    fn to_async_fd_rd<T: AsRawFd + AsFd + Send + Sync + 'static>(
222        fd: T,
223    ) -> io::Result<Self::AsyncFd<T>> {
224        IO::to_async_fd_rd(fd)
225    }
226
227    fn to_async_fd_rw<T: AsRawFd + AsFd + Send + Sync + 'static>(
228        fd: T,
229    ) -> io::Result<Self::AsyncFd<T>> {
230        IO::to_async_fd_rw(fd)
231    }
232}
233
234/// AsyncRead trait for runtime adapter
235pub trait AsyncRead: Send {
236    /// Async version of read function
237    ///
238    /// On ok, return the bytes read
239    fn read(&mut self, buf: &mut [u8]) -> impl Future<Output = io::Result<usize>> + Send;
240
241    /// Read the exact number of bytes required to fill `buf`.
242    ///
243    /// This function repeatedly calls `read` until the buffer is completely filled.
244    ///
245    /// # Errors
246    ///
247    /// This function will return an error if the stream is closed before the
248    /// buffer is filled.
249    fn read_exact<'a>(
250        &'a mut self, mut buf: &'a mut [u8],
251    ) -> impl Future<Output = io::Result<()>> + Send + 'a {
252        async move {
253            while !buf.is_empty() {
254                match self.read(buf).await {
255                    Ok(0) => break,
256                    Ok(n) => {
257                        let tmp = buf;
258                        buf = &mut tmp[n..];
259                    }
260                    Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
261                    Err(e) => return Err(e),
262                }
263            }
264            if !buf.is_empty() {
265                Err(io::Error::new(io::ErrorKind::UnexpectedEof, "failed to fill whole buffer"))
266            } else {
267                Ok(())
268            }
269        }
270    }
271
272    /// Reads at least `min_len` bytes into `buf`.
273    ///
274    /// This function repeatedly calls `read` until at least `min_len` bytes have been
275    /// read. It is allowed to read more than `min_len` bytes, but not more than
276    /// the length of `buf`.
277    ///
278    /// # Returns
279    ///
280    /// On success, returns the total number of bytes read. This will be at least
281    /// `min_len`, and could be more, up to the length of `buf`.
282    ///
283    /// # Errors
284    ///
285    /// It will return an `UnexpectedEof` error if the stream is closed before at least `min_len` bytes have been read.
286    fn read_at_least<'a>(
287        &'a mut self, buf: &'a mut [u8], min_len: usize,
288    ) -> impl Future<Output = io::Result<usize>> + Send + 'a {
289        async move {
290            let mut total_read = 0;
291            while total_read < min_len && total_read < buf.len() {
292                match self.read(&mut buf[total_read..]).await {
293                    Ok(0) => {
294                        return Err(io::Error::new(
295                            io::ErrorKind::UnexpectedEof,
296                            "failed to read minimum number of bytes",
297                        ));
298                    }
299                    Ok(n) => total_read += n,
300                    Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
301                    Err(e) => return Err(e),
302                };
303            }
304            Ok(total_read)
305        }
306    }
307}
308
309/// AsyncWrite trait for runtime adapter
310pub trait AsyncWrite: Send {
311    /// Async version of write function
312    ///
313    /// On ok, return the bytes written
314    fn write(&mut self, buf: &[u8]) -> impl Future<Output = io::Result<usize>> + Send;
315
316    /// Write the entire buffer `buf`.
317    ///
318    /// This function repeatedly calls `write` until the entire buffer is written.
319    ///
320    /// # Errors
321    ///
322    /// This function will return an error if the stream is closed before the
323    /// entire buffer is written.
324    fn write_all<'a>(
325        &'a mut self, mut buf: &'a [u8],
326    ) -> impl Future<Output = io::Result<()>> + Send + 'a {
327        async move {
328            while !buf.is_empty() {
329                match self.write(buf).await {
330                    Ok(0) => {
331                        return Err(io::Error::new(
332                            io::ErrorKind::WriteZero,
333                            "failed to write whole buffer",
334                        ));
335                    }
336                    Ok(n) => {
337                        buf = &buf[n..];
338                    }
339                    Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
340                    Err(e) => return Err(e),
341                }
342            }
343            Ok(())
344        }
345    }
346}