use std::os::unix::io::{AsRawFd, RawFd};
use std::{task::Context, task::Poll};
use std::io;
use mio::unix::SourceFd;
use crate::io::driver::{Direction, Handle, ReadyEvent, ScheduledIo};
use crate::util::slab;
pub struct AsyncFd<T: AsRawFd> {
handle: Handle,
shared: slab::Ref<ScheduledIo>,
inner: Option<T>,
}
impl<T: AsRawFd> AsRawFd for AsyncFd<T> {
fn as_raw_fd(&self) -> RawFd {
self.inner.as_ref().unwrap().as_raw_fd()
}
}
impl<T: std::fmt::Debug + AsRawFd> std::fmt::Debug for AsyncFd<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("AsyncFd")
.field("inner", &self.inner)
.finish()
}
}
const ALL_INTEREST: mio::Interest = mio::Interest::READABLE.add(mio::Interest::WRITABLE);
#[must_use = "You must explicitly choose whether to clear the readiness state by calling a method on ReadyGuard"]
pub struct AsyncFdReadyGuard<'a, T: AsRawFd> {
async_fd: &'a AsyncFd<T>,
event: Option<ReadyEvent>,
}
impl<'a, T: std::fmt::Debug + AsRawFd> std::fmt::Debug for AsyncFdReadyGuard<'a, T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ReadyGuard")
.field("async_fd", &self.async_fd)
.finish()
}
}
impl<'a, Inner: AsRawFd> AsyncFdReadyGuard<'a, Inner> {
pub fn clear_ready(&mut self) {
if let Some(event) = self.event.take() {
self.async_fd.shared.clear_readiness(event);
}
}
pub fn retain_ready(&mut self) {
}
pub fn with_io<R>(&mut self, f: impl FnOnce() -> io::Result<R>) -> io::Result<R> {
let result = f();
if let Err(e) = result.as_ref() {
if e.kind() == io::ErrorKind::WouldBlock {
self.clear_ready();
}
}
result
}
pub fn with_poll<R>(&mut self, f: impl FnOnce() -> std::task::Poll<R>) -> std::task::Poll<R> {
let result = f();
if result.is_pending() {
self.clear_ready();
}
result
}
}
impl<T: AsRawFd> Drop for AsyncFd<T> {
fn drop(&mut self) {
if let Some(driver) = self.handle.inner() {
if let Some(inner) = self.inner.as_ref() {
let fd = inner.as_raw_fd();
let _ = driver.deregister_source(&mut SourceFd(&fd));
}
}
}
}
impl<T: AsRawFd> AsyncFd<T> {
pub fn new(inner: T) -> io::Result<Self>
where
T: AsRawFd,
{
Self::new_with_handle(inner, Handle::current())
}
pub(crate) fn new_with_handle(inner: T, handle: Handle) -> io::Result<Self> {
let fd = inner.as_raw_fd();
let shared = if let Some(inner) = handle.inner() {
inner.add_source(&mut SourceFd(&fd), ALL_INTEREST)?
} else {
return Err(io::Error::new(
io::ErrorKind::Other,
"failed to find event loop",
));
};
Ok(AsyncFd {
handle,
shared,
inner: Some(inner),
})
}
#[inline]
pub fn get_ref(&self) -> &T {
self.inner.as_ref().unwrap()
}
#[inline]
pub fn get_mut(&mut self) -> &mut T {
self.inner.as_mut().unwrap()
}
pub fn into_inner(mut self) -> T {
self.inner.take().unwrap()
}
pub fn poll_read_ready<'a>(
&'a self,
cx: &mut Context<'_>,
) -> Poll<io::Result<AsyncFdReadyGuard<'a, T>>> {
let event = ready!(self.shared.poll_readiness(cx, Direction::Read));
if !self.handle.is_alive() {
return Err(io::Error::new(
io::ErrorKind::Other,
"IO driver has terminated",
))
.into();
}
Ok(AsyncFdReadyGuard {
async_fd: self,
event: Some(event),
})
.into()
}
pub fn poll_write_ready<'a>(
&'a self,
cx: &mut Context<'_>,
) -> Poll<io::Result<AsyncFdReadyGuard<'a, T>>> {
let event = ready!(self.shared.poll_readiness(cx, Direction::Write));
if !self.handle.is_alive() {
return Err(io::Error::new(
io::ErrorKind::Other,
"IO driver has terminated",
))
.into();
}
Ok(AsyncFdReadyGuard {
async_fd: self,
event: Some(event),
})
.into()
}
async fn readiness(&self, interest: mio::Interest) -> io::Result<AsyncFdReadyGuard<'_, T>> {
let event = self.shared.readiness(interest);
if !self.handle.is_alive() {
return Err(io::Error::new(
io::ErrorKind::Other,
"IO driver has terminated",
));
}
let event = event.await;
Ok(AsyncFdReadyGuard {
async_fd: self,
event: Some(event),
})
}
pub async fn readable(&self) -> io::Result<AsyncFdReadyGuard<'_, T>> {
self.readiness(mio::Interest::READABLE).await
}
pub async fn writable(&self) -> io::Result<AsyncFdReadyGuard<'_, T>> {
self.readiness(mio::Interest::WRITABLE).await
}
}