orb/io/
mod.rs

1//! Asynchronous I/O traits and utilities.
2//!
3//! This module provides traits for performing asynchronous I/O operations
4//! in a runtime-agnostic way. It includes functionality for connecting to
5//! network services, working with file descriptors, and performing async
6//! read/write operations.
7//!
8//! Further more, we have abstract buffered I/O  with [AsyncBufRead], [AsyncBufWrite], and [AsyncBufStream]
9//!
10//! # Design Notes
11//!
12//! We choose to provide `async fn` style IO function instead of `poll_xxx` style functions, because:
13//!
14//! - `async-io` crate don't have `poll_xxx` interfaces
15//! - `poll_xxx` functions is pre-async-await stuff and difficult to use.
16//! - you can always make an async fn with `poll_xxx`
17//!
18//! We choose to abstract [AsyncFd] instead of stream, because:
19//! - All async stream can be converted between std version of stream
20//! - All types of files/streams and be converted between OS raw fd.
21//! - There's slight difference between tokio stream and async-io counterparts.
22//! - What we do here is just wrap any std blocking function with async poller when they are
23//! readable or writeable, similar with `async-io`, as a light-weight implementation.
24
25use std::future::Future;
26use std::io;
27use std::net::SocketAddr;
28use std::net::TcpStream;
29use std::ops::Deref;
30use std::os::fd::{AsFd, AsRawFd};
31use std::os::unix::net::UnixStream;
32use std::path::PathBuf;
33
34mod buf_io;
35pub use buf_io::{AsyncBufRead, AsyncBufStream, AsyncBufWrite, AsyncRead, AsyncWrite};
36
37/// Helper macro to convert timeout errors to IO errors.
38///
39/// This macro is used internally to convert the `()` error returned by
40/// timeout functions into a proper `io::Error` with `TimedOut` kind.
41#[macro_export]
42macro_rules! io_with_timeout {
43    ($IO: path, $timeout: expr, $f: expr) => {{
44        if $timeout == Duration::from_secs(0) {
45            $f.await
46        } else {
47            // rust 2018 macro will replace crate name after export
48            match <$IO as crate::time::AsyncTime>::timeout($timeout, $f).await {
49                Ok(Ok(r)) => Ok(r),
50                Ok(Err(e)) => Err(e),
51                Err(_) => Err(io::ErrorKind::TimedOut.into()),
52            }
53        }
54    }};
55}
56
57/// Trait for async I/O operations.
58///
59/// This trait defines the interface for performing asynchronous I/O operations
60/// such as connecting to network services and converting file descriptors to
61/// async handles.
62///
63/// # Associated Types
64///
65/// * `AsyncFd` - The type used to represent async file descriptors
66pub trait AsyncIO: Send + Sync + 'static {
67    /// The type used to represent async file descriptors.
68    ///
69    /// This associated type represents a wrapper around a file descriptor
70    /// that provides async read/write operations.
71    type AsyncFd<T: AsRawFd + AsFd + Send + Sync + 'static>: AsyncFd<T>;
72
73    /// Connect to a TCP address asynchronously.
74    ///
75    /// # NOTE
76    ///
77    /// This is for runtime implementation, for user should use [`TcpStream::<IO>::connect()`](crate::net::TcpStream) instead**.
78    ///
79    /// This method attempts to establish a TCP connection to the specified
80    /// address, returning an async file descriptor that can be used for
81    /// communication.
82    ///
83    /// # Parameters
84    ///
85    /// * `addr` - The socket address to connect to
86    ///
87    /// # Returns
88    ///
89    /// A future that resolves to a `Result` containing either the connected
90    /// async file descriptor or an I/O error.
91    fn connect_tcp(
92        addr: &SocketAddr,
93    ) -> impl Future<Output = io::Result<Self::AsyncFd<TcpStream>>> + Send;
94
95    /// Connect to a Unix socket address asynchronously.
96    ///
97    /// # NOTE
98    ///
99    /// This is for runtime implementation, for user should use [`UnixStream::<IO>::connect()`](crate::net::UnixStream) instead**.
100    ///
101    /// This method attempts to establish a Unix socket connection to the
102    /// specified path, returning an async file descriptor that can be used
103    /// for communication.
104    ///
105    /// # Parameters
106    ///
107    /// * `addr` - The path to the Unix socket
108    ///
109    /// # Returns
110    ///
111    /// A future that resolves to a `Result` containing either the connected
112    /// async file descriptor or an I/O error.
113    fn connect_unix(
114        addr: &PathBuf,
115    ) -> impl Future<Output = io::Result<Self::AsyncFd<UnixStream>>> + Send;
116
117    /// Wrap a readable file object as an async handle
118    ///
119    /// The file descriptor will subscribe for read
120    /// to the runtime poller
121    ///
122    /// # Parameters
123    ///
124    /// * `fd` - The file descriptor to wrap
125    ///
126    /// # Returns
127    ///
128    /// A `Result` containing either the async file descriptor handle or
129    /// an I/O error.
130    ///
131    /// # Safety
132    ///
133    /// The file descriptor must be set to non-blocking mode before calling
134    /// this method.
135    fn to_async_fd_rd<T: AsRawFd + AsFd + Send + Sync + 'static>(
136        fd: T,
137    ) -> io::Result<Self::AsyncFd<T>>;
138
139    /// Wrap a readable/writable file object as an async handle.
140    ///
141    /// The file descriptor will subscribe for read + write
142    /// to the runtime poller
143    ///
144    /// # Parameters
145    ///
146    /// * `fd` - The file descriptor to wrap
147    ///
148    /// # Returns
149    ///
150    /// A `Result` containing either the async file descriptor handle or
151    /// an I/O error.
152    ///
153    /// # Safety
154    ///
155    /// The file descriptor must be set to non-blocking mode before calling
156    /// this method.
157    fn to_async_fd_rw<T: AsRawFd + AsFd + Send + Sync + 'static>(
158        fd: T,
159    ) -> io::Result<Self::AsyncFd<T>>;
160}
161
162/// Trait for async file descriptor operations.
163///
164/// This trait provides methods for performing async read and write operations
165/// on file descriptors.
166///
167/// # Type Parameters
168///
169/// * `T` - The underlying file descriptor type
170pub trait AsyncFd<T: AsRawFd + AsFd + Send + Sync + 'static>:
171    Send + Sync + 'static + Deref<Target = T>
172{
173    /// Perform an async read operation.
174    ///
175    /// This method executes the provided closure asynchronously, allowing
176    /// it to perform read operations on the underlying file descriptor.
177    ///
178    /// # Parameters
179    ///
180    /// * `f` - A closure that performs the actual read operation
181    ///
182    /// # Returns
183    ///
184    /// A future that resolves to the result of the read operation.
185    fn async_read<R>(
186        &self, f: impl FnMut(&T) -> io::Result<R> + Send,
187    ) -> impl Future<Output = io::Result<R>> + Send;
188
189    /// Perform an async write operation.
190    ///
191    /// This method executes the provided closure asynchronously, allowing
192    /// it to perform write operations on the underlying file descriptor.
193    ///
194    /// # Parameters
195    ///
196    /// * `f` - A closure that performs the actual write operation
197    ///
198    /// # Returns
199    ///
200    /// A future that resolves to the result of the write operation.
201    fn async_write<R>(
202        &self, f: impl FnMut(&T) -> io::Result<R> + Send,
203    ) -> impl Future<Output = io::Result<R>> + Send;
204}
205
206impl<F: std::ops::Deref<Target = IO> + Send + Sync + 'static, IO: AsyncIO> AsyncIO for F {
207    type AsyncFd<T: AsRawFd + AsFd + Send + Sync + 'static> = IO::AsyncFd<T>;
208
209    fn connect_tcp(
210        addr: &SocketAddr,
211    ) -> impl Future<Output = io::Result<Self::AsyncFd<TcpStream>>> + Send {
212        IO::connect_tcp(addr)
213    }
214
215    fn connect_unix(
216        addr: &PathBuf,
217    ) -> impl Future<Output = io::Result<Self::AsyncFd<UnixStream>>> + Send {
218        IO::connect_unix(addr)
219    }
220
221    fn to_async_fd_rd<T: AsRawFd + AsFd + Send + Sync + 'static>(
222        fd: T,
223    ) -> io::Result<Self::AsyncFd<T>> {
224        IO::to_async_fd_rd(fd)
225    }
226
227    fn to_async_fd_rw<T: AsRawFd + AsFd + Send + Sync + 'static>(
228        fd: T,
229    ) -> io::Result<Self::AsyncFd<T>> {
230        IO::to_async_fd_rw(fd)
231    }
232}