use std::io;
use std::mem::replace;
use std::task::{self, Poll};
use crate::kqueue::fd::OpKind;
use crate::op::OpState;
use crate::{AsyncFd, OpPollResult, SubmissionQueue};
#[derive(Debug)]
pub(crate) enum DirectState<R, A> {
NotStarted { resources: R, args: A },
Complete,
}
impl<R, A> OpState for DirectState<R, A> {
type Resources = R;
type Args = A;
fn new(resources: Self::Resources, args: Self::Args) -> Self {
DirectState::NotStarted { resources, args }
}
fn resources_mut(&mut self) -> Option<&mut Self::Resources> {
if let DirectState::NotStarted { resources, .. } = self {
Some(resources)
} else {
None
}
}
fn args_mut(&mut self) -> Option<&mut Self::Args> {
if let DirectState::NotStarted { args, .. } = self {
Some(args)
} else {
None
}
}
unsafe fn drop(&mut self, _: &SubmissionQueue) {
}
fn reset(&mut self, resources: Self::Resources, args: Self::Args) {
assert!(matches!(self, DirectState::Complete));
*self = Self::new(resources, args);
}
}
pub(crate) trait DirectOp {
type Output;
type Resources;
type Args;
fn run(
sq: &SubmissionQueue,
resources: Self::Resources,
args: Self::Args,
) -> io::Result<Self::Output>;
}
impl<T: DirectOp> crate::op::Op for T {
type Output = io::Result<T::Output>;
type Resources = T::Resources;
type Args = T::Args;
type State = DirectState<T::Resources, T::Args>;
fn poll(
state: &mut Self::State,
_: &mut task::Context<'_>,
sq: &SubmissionQueue,
) -> Poll<Self::Output> {
match replace(state, DirectState::Complete) {
DirectState::NotStarted { resources, args } => Poll::Ready(T::run(sq, resources, args)),
DirectState::Complete => panic!("polled Future after completion"),
}
}
}
pub(crate) trait DirectOpExtract: DirectOp {
type ExtractOutput;
fn run_extract(
sq: &SubmissionQueue,
resources: Self::Resources,
args: Self::Args,
) -> io::Result<Self::ExtractOutput>;
}
impl<T: DirectOpExtract> crate::op::OpExtract for T {
type ExtractOutput = io::Result<<Self as DirectOpExtract>::ExtractOutput>;
fn poll_extract(
state: &mut Self::State,
_: &mut task::Context<'_>,
sq: &SubmissionQueue,
) -> Poll<Self::ExtractOutput> {
match replace(state, DirectState::Complete) {
DirectState::NotStarted { resources, args } => {
Poll::Ready(T::run_extract(sq, resources, args))
}
DirectState::Complete => panic!("polled Future after completion"),
}
}
}
pub(crate) trait DirectFdOp {
type Output;
type Resources;
type Args;
fn run(fd: &AsyncFd, resources: Self::Resources, args: Self::Args) -> io::Result<Self::Output>;
}
macro_rules! impl_fd_op {
( $( $T: ident $( < $( $gen: ident ),+ > )? ),* ) => {
$(
impl $( < $( $gen ),+ > )? crate::op::FdOp for $T $( < $( $gen ),* > )?
where Self: $crate::kqueue::op::DirectFdOp,
{
type Output = ::std::io::Result<<Self as $crate::kqueue::op::DirectFdOp>::Output>;
type Resources = <Self as $crate::kqueue::op::DirectFdOp>::Resources;
type Args = <Self as $crate::kqueue::op::DirectFdOp>::Args;
type State = $crate::kqueue::op::DirectState<<Self as $crate::kqueue::op::DirectFdOp>::Resources, <Self as $crate::kqueue::op::DirectFdOp>::Args>;
fn poll(
state: &mut Self::State,
_: &mut ::std::task::Context<'_>,
fd: &$crate::AsyncFd,
) -> ::std::task::Poll<Self::Output> {
match ::std::mem::replace(state, $crate::kqueue::op::DirectState::Complete) {
$crate::kqueue::op::DirectState::NotStarted { resources, args } => ::std::task::Poll::Ready(Self::run(fd, resources, args)),
$crate::kqueue::op::DirectState::Complete => ::std::panic!("polled Future after completion"),
}
}
}
)*
};
}
pub(super) use impl_fd_op;
#[derive(Debug)]
pub(crate) enum EventedState<R, A> {
NotStarted { resources: R, args: A },
ToSubmit { resources: R, args: A },
Waiting { resources: R, args: A },
Complete,
}
impl<R, A> OpState for EventedState<R, A> {
type Resources = R;
type Args = A;
fn new(resources: Self::Resources, args: Self::Args) -> Self {
EventedState::NotStarted { resources, args }
}
fn resources_mut(&mut self) -> Option<&mut Self::Resources> {
if let EventedState::NotStarted { resources, .. } = self {
Some(resources)
} else {
None
}
}
fn args_mut(&mut self) -> Option<&mut Self::Args> {
if let EventedState::NotStarted { args, .. } = self {
Some(args)
} else {
None
}
}
unsafe fn drop(&mut self, _: &SubmissionQueue) {
}
fn reset(&mut self, resources: Self::Resources, args: Self::Args) {
assert!(matches!(self, EventedState::Complete));
*self = Self::new(resources, args);
}
}
pub(crate) trait FdOp {
type Output;
type Resources;
type Args;
type OperationOutput;
fn setup(
fd: &AsyncFd,
resources: &mut Self::Resources,
args: &mut Self::Args,
) -> io::Result<()> {
_ = (fd, resources, args);
Ok(())
}
const OP_KIND: OpKind;
fn try_run(
fd: &AsyncFd,
resources: &mut Self::Resources,
args: &mut Self::Args,
) -> io::Result<Self::OperationOutput>;
fn map_ok(
fd: &AsyncFd,
resources: Self::Resources,
output: Self::OperationOutput,
) -> Self::Output;
}
impl<T: FdOp> crate::op::FdOp for T {
type Output = io::Result<T::Output>;
type Resources = T::Resources;
type Args = T::Args;
type State = EventedState<T::Resources, T::Args>;
fn poll(
state: &mut Self::State,
ctx: &mut task::Context<'_>,
fd: &AsyncFd,
) -> Poll<Self::Output> {
poll::<T, _>(state, ctx, fd, T::map_ok)
}
}
pub(crate) trait FdOpExtract: FdOp {
type ExtractOutput;
fn map_ok_extract(
fd: &AsyncFd,
resources: Self::Resources,
output: Self::OperationOutput,
) -> Self::ExtractOutput;
}
impl<T: FdOpExtract> crate::op::FdOpExtract for T {
type ExtractOutput = io::Result<T::ExtractOutput>;
fn poll_extract(
state: &mut Self::State,
ctx: &mut task::Context<'_>,
fd: &AsyncFd,
) -> Poll<Self::ExtractOutput> {
poll::<T, _>(state, ctx, fd, T::map_ok_extract)
}
}
pub(crate) trait FdIter {
type Output;
type Resources;
type Args;
type OperationOutput;
fn setup(
fd: &AsyncFd,
resources: &mut Self::Resources,
args: &mut Self::Args,
) -> io::Result<()> {
_ = (fd, resources, args);
Ok(())
}
const OP_KIND: OpKind;
fn try_run(
fd: &AsyncFd,
resources: &mut Self::Resources,
args: &mut Self::Args,
) -> io::Result<Self::OperationOutput>;
fn is_complete(output: &Self::OperationOutput) -> bool {
_ = output;
false
}
fn map_next(
fd: &AsyncFd,
resources: &Self::Resources,
output: Self::OperationOutput,
) -> Self::Output;
}
impl<T: FdIter> crate::op::FdIter for T {
type Output = io::Result<T::Output>;
type Resources = T::Resources;
type Args = T::Args;
type State = EventedState<T::Resources, T::Args>;
fn poll_next(
state: &mut Self::State,
ctx: &mut task::Context<'_>,
fd: &AsyncFd,
) -> Poll<Option<Self::Output>> {
let mut r = None;
poll_inner(
state,
ctx,
fd,
T::setup,
T::OP_KIND,
T::try_run,
|state, output| {
if let EventedState::Waiting { resources, args } =
replace(state, EventedState::Complete)
{
if T::is_complete(output) {
return r.insert(resources);
}
*state = EventedState::ToSubmit { resources, args };
if let EventedState::ToSubmit { resources, .. } = &*state {
return resources;
}
}
unreachable!()
},
T::map_next,
|| None,
)
}
}
fn poll<T: FdOp, Out>(
state: &mut EventedState<T::Resources, T::Args>,
ctx: &mut task::Context<'_>,
fd: &AsyncFd,
map_ok: impl FnOnce(&AsyncFd, T::Resources, T::OperationOutput) -> Out,
) -> Poll<io::Result<Out>> {
poll_inner(
state,
ctx,
fd,
T::setup,
T::OP_KIND,
T::try_run,
|state, _| {
if let EventedState::Waiting { resources, .. } = replace(state, EventedState::Complete)
{
return resources;
}
unreachable!()
},
map_ok,
|| panic!("polled Future after completion"),
)
}
#[allow(clippy::needless_pass_by_ref_mut)] #[allow(clippy::too_many_arguments)]
fn poll_inner<'s, R, A, OpOut, R2, Ok, Res>(
state: &'s mut EventedState<R, A>,
ctx: &mut task::Context<'_>,
fd: &AsyncFd,
setup: impl Fn(&AsyncFd, &mut R, &mut A) -> io::Result<()>,
op_kind: OpKind,
try_run: impl Fn(&AsyncFd, &mut R, &mut A) -> io::Result<OpOut>,
after_try_run: impl FnOnce(&'s mut EventedState<R, A>, &OpOut) -> R2,
map_ok: impl FnOnce(&AsyncFd, R2, OpOut) -> Ok,
poll_complete: impl FnOnce() -> Res,
) -> Poll<Res>
where
Res: OpPollResult<Ok>,
R2: 's,
{
loop {
match state {
EventedState::NotStarted { resources, args } => {
if let Err(err) = setup(fd, resources, args) {
return Poll::Ready(Res::from_err(err));
}
if let EventedState::NotStarted { resources, args } =
replace(state, EventedState::Complete)
{
*state = EventedState::ToSubmit { resources, args };
}
}
EventedState::ToSubmit { .. } => {
let fd_state = fd.state();
let needs_register = {
let mut fd_state = fd_state.lock();
let needs_register = !fd_state.has_waiting_op(op_kind);
fd_state.add(op_kind, ctx.waker().clone());
needs_register
};
if needs_register {
fd.sq.submissions().add(|event| {
event.0.filter = match op_kind {
OpKind::Read => libc::EVFILT_READ,
OpKind::Write => libc::EVFILT_WRITE,
};
event.0.ident = fd.fd().cast_unsigned() as _;
event.0.udata = fd_state.as_udata();
});
}
if let EventedState::ToSubmit { resources, args } =
replace(state, EventedState::Complete)
{
*state = EventedState::Waiting { resources, args };
}
return Poll::Pending;
}
EventedState::Waiting { resources, args } => {
match try_run(fd, resources, args) {
Ok(output) => {
let resources = after_try_run(state, &output);
return Poll::Ready(Res::from_ok(map_ok(fd, resources, output)));
}
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
if let EventedState::Waiting { resources, args } =
replace(state, EventedState::Complete)
{
*state = EventedState::ToSubmit { resources, args };
}
}
Err(err) => {
*state = EventedState::Complete;
return Poll::Ready(Res::from_err(err));
}
}
}
EventedState::Complete => return Poll::Ready(poll_complete()),
}
}
}