use std::future::Future;
use std::io;
use std::net::SocketAddr;
use std::net::TcpStream;
use std::ops::Deref;
use std::os::fd::{AsFd, AsRawFd};
use std::os::unix::net::UnixStream;
use std::path::Path;
mod buf_io;
pub use buf_io::{AsyncBufRead, AsyncBufStream, AsyncBufWrite};
/// Awaits the I/O future `$f`, optionally bounded by `$timeout`.
///
/// * `$IO` - a type implementing `crate::time::AsyncTime`; its `timeout` function is used to
///   bound the operation.
/// * `$timeout` - the time limit. A zero duration disables the limit and `$f` is awaited directly.
/// * `$f` - an expression producing a future whose output is a `Result`.
///
/// Evaluates to the `Result` produced by `$f`, or to an error converted from
/// `io::ErrorKind::TimedOut` when `AsyncTime::timeout` reports an error.
///
/// Must be expanded inside an async context because the expansion uses `.await`.
macro_rules! io_with_timeout {
    ($IO: path, $timeout: expr, $f: expr) => {{
        // Bind once: `$timeout` may be an arbitrary expression, and expanding it in both the
        // comparison and the call below would evaluate it twice.
        let timeout = $timeout;
        // Absolute paths keep the macro independent of what the call site has imported.
        if timeout == ::std::time::Duration::ZERO {
            $f.await
        } else {
            match <$IO as crate::time::AsyncTime>::timeout(timeout, $f).await {
                // The wrapped future finished in time: forward its own result untouched.
                Ok(inner) => inner,
                Err(_) => Err(::std::io::ErrorKind::TimedOut.into()),
            }
        }
    }};
}
pub(super) use io_with_timeout;
/// Factory interface for obtaining [`AsyncFd`] wrappers around file-descriptor-backed objects.
///
/// All functions are associated functions (no `self`), so the implementing type acts as a
/// selector for a concrete backend.
pub trait AsyncIO {
/// Wrapper type produced by this backend for an underlying object of type `T`.
type AsyncFd<T: AsRawFd + AsFd + Send + Sync + 'static>: AsyncFd<T>;
/// Returns a future resolving to a wrapped [`TcpStream`] for `addr`, or an I/O error.
// NOTE(review): presumably performs a non-blocking connect to `addr` — confirm with implementors.
fn connect_tcp(
addr: &SocketAddr,
) -> impl Future<Output = io::Result<Self::AsyncFd<TcpStream>>> + Send;
/// Returns a future resolving to a wrapped [`UnixStream`] for the path `addr`, or an I/O error.
// NOTE(review): presumably connects to a Unix domain socket at `addr` — confirm with implementors.
fn connect_unix(
addr: &Path,
) -> impl Future<Output = io::Result<Self::AsyncFd<UnixStream>>> + Send;
/// Takes ownership of `fd` and wraps it in this backend's [`AsyncFd`] type.
// NOTE(review): the `_rd` suffix suggests read-only interest registration — confirm.
fn to_async_fd_rd<T: AsRawFd + AsFd + Send + Sync + 'static>(
fd: T,
) -> io::Result<Self::AsyncFd<T>>;
/// Takes ownership of `fd` and wraps it in this backend's [`AsyncFd`] type.
// NOTE(review): the `_rw` suffix suggests read+write interest registration — confirm.
fn to_async_fd_rw<T: AsRawFd + AsFd + Send + Sync + 'static>(
fd: T,
) -> io::Result<Self::AsyncFd<T>>;
}
/// Async wrapper around an object `T` that exposes a file descriptor.
///
/// The wrapper dereferences to `T` (via `Deref<Target = T>`), so the inner object's own methods
/// remain reachable through it.
pub trait AsyncFd<T: AsRawFd + AsFd + Send + Sync + 'static>:
Send + Sync + 'static + Deref<Target = T>
{
/// Runs the read-side operation `f` against the inner object.
///
/// `f` receives `&T` and returns an `io::Result<R>`; the returned future resolves to an
/// `io::Result<R>`. `f` is `FnMut`, so it may be invoked more than once.
// NOTE(review): presumably retries `f` on `WouldBlock` after waiting for read readiness —
// confirm with implementors.
fn async_read<R>(
&self, f: impl FnMut(&T) -> io::Result<R> + Send,
) -> impl Future<Output = io::Result<R>> + Send;
/// Runs the write-side operation `f` against the inner object.
///
/// `f` receives `&T` and returns an `io::Result<R>`; the returned future resolves to an
/// `io::Result<R>`. `f` is `FnMut`, so it may be invoked more than once.
// NOTE(review): presumably retries `f` on `WouldBlock` after waiting for write readiness —
// confirm with implementors.
fn async_write<R>(
&self, f: impl FnMut(&T) -> io::Result<R> + Send,
) -> impl Future<Output = io::Result<R>> + Send;
}
/// Asynchronous byte source.
///
/// Implementors provide [`AsyncRead::read`]; the other methods are built on top of it.
pub trait AsyncRead: Send {
    /// Reads into `buf`, resolving to the number of bytes stored.
    ///
    /// The provided helpers interpret `Ok(0)` as end of stream.
    fn read(&mut self, buf: &mut [u8]) -> impl Future<Output = io::Result<usize>> + Send;

    /// Keeps reading until `buf` is completely filled.
    ///
    /// # Errors
    ///
    /// * `UnexpectedEof` if `read` yields `Ok(0)` while part of `buf` is still unfilled.
    /// * Any error from `read` other than `Interrupted`, which is retried.
    fn read_exact<'a>(
        &'a mut self, mut buf: &'a mut [u8],
    ) -> impl Future<Output = io::Result<()>> + Send + 'a {
        async move {
            loop {
                if buf.is_empty() {
                    return Ok(());
                }
                match self.read(buf).await {
                    Ok(0) => {
                        return Err(io::Error::new(
                            io::ErrorKind::UnexpectedEof,
                            "failed to fill whole buffer",
                        ));
                    }
                    Ok(filled) => {
                        // Move the slice out before re-slicing so the full `'a` lifetime is
                        // kept on the remaining tail.
                        let whole = buf;
                        buf = &mut whole[filled..];
                    }
                    Err(err) if err.kind() == io::ErrorKind::Interrupted => continue,
                    Err(err) => return Err(err),
                }
            }
        }
    }

    /// Reads until at least `min_len` bytes are in `buf`, resolving to the total stored.
    ///
    /// Reading also stops once `buf` is full, so when `min_len > buf.len()` the returned count
    /// is `buf.len()`, which is smaller than `min_len`.
    ///
    /// # Errors
    ///
    /// * `UnexpectedEof` if `read` yields `Ok(0)` before the target is reached.
    /// * Any error from `read` other than `Interrupted`, which is retried.
    fn read_at_least<'a>(
        &'a mut self, buf: &'a mut [u8], min_len: usize,
    ) -> impl Future<Output = io::Result<usize>> + Send + 'a {
        async move {
            // The buffer length never changes, so the stop point can be computed up front.
            let target = min_len.min(buf.len());
            let mut filled = 0;
            loop {
                if filled >= target {
                    return Ok(filled);
                }
                match self.read(&mut buf[filled..]).await {
                    Ok(0) => {
                        return Err(io::Error::new(
                            io::ErrorKind::UnexpectedEof,
                            "failed to read minimum number of bytes",
                        ));
                    }
                    Ok(count) => filled += count,
                    Err(err) if err.kind() == io::ErrorKind::Interrupted => {}
                    Err(err) => return Err(err),
                }
            }
        }
    }
}
/// Asynchronous byte sink.
///
/// Implementors provide [`AsyncWrite::write`]; [`AsyncWrite::write_all`] is built on top of it.
pub trait AsyncWrite: Send {
    /// Writes from `buf`, resolving to the number of bytes accepted.
    fn write(&mut self, buf: &[u8]) -> impl Future<Output = io::Result<usize>> + Send;

    /// Keeps writing until every byte of `buf` has been accepted.
    ///
    /// # Errors
    ///
    /// * `WriteZero` if `write` yields `Ok(0)` while bytes remain.
    /// * Any error from `write` other than `Interrupted`, which is retried.
    fn write_all<'a>(
        &'a mut self, mut buf: &'a [u8],
    ) -> impl Future<Output = io::Result<()>> + Send + 'a {
        async move {
            loop {
                if buf.is_empty() {
                    return Ok(());
                }
                match self.write(buf).await {
                    Ok(0) => {
                        return Err(io::Error::new(
                            io::ErrorKind::WriteZero,
                            "failed to write whole buffer",
                        ));
                    }
                    // Drop the accepted prefix and go around again with the tail.
                    Ok(accepted) => buf = &buf[accepted..],
                    Err(err) if err.kind() == io::ErrorKind::Interrupted => continue,
                    Err(err) => return Err(err),
                }
            }
        }
    }
}