use std::io;
use std::mem::MaybeUninit;
use std::task::{self, Poll};
use crate::kqueue::fd::OpKind;
use crate::op::OpState;
use crate::{AsyncFd, OpPollResult, SubmissionQueue};
#[derive(Debug)]
#[allow(private_bounds)] pub(crate) struct State<T: StateStatus, R, A> {
pub(super) status: T,
pub(super) resources: MaybeUninit<R>,
pub(super) args: A,
}
impl<T: StateStatus, R, A> OpState for State<T, R, A> {
type Resources = R;
type Args = A;
fn new(resources: Self::Resources, args: Self::Args) -> Self {
State {
status: T::new(),
resources: MaybeUninit::new(resources),
args,
}
}
fn resources_mut(&mut self) -> Option<&mut Self::Resources> {
if self.status.not_started() {
Some(unsafe { self.resources.assume_init_mut() })
} else {
None
}
}
fn args(&self) -> &Self::Args {
&self.args
}
fn args_mut(&mut self) -> Option<&mut Self::Args> {
if self.status.not_started() {
Some(&mut self.args)
} else {
None
}
}
unsafe fn drop(&mut self, _: &SubmissionQueue) {
}
fn reset(&mut self, resources: Self::Resources, args: Self::Args) {
assert!(self.status.complete());
*self = State::new(resources, args);
}
}
impl<T: StateStatus, R, A> Drop for State<T, R, A> {
fn drop(&mut self) {
if !self.status.complete() {
unsafe { self.resources.assume_init_drop() }
}
}
}
trait StateStatus {
fn new() -> Self;
fn not_started(&self) -> bool;
fn complete(&self) -> bool;
}
#[derive(Copy, Clone, Debug)]
pub(crate) enum Direct {
NotStarted,
Complete,
}
impl StateStatus for Direct {
fn new() -> Self {
Direct::NotStarted
}
fn not_started(&self) -> bool {
matches!(self, Direct::NotStarted)
}
fn complete(&self) -> bool {
matches!(self, Direct::Complete)
}
}
pub(crate) trait DirectOp {
type Output;
type Resources;
type Args: Copy;
fn run(
sq: &SubmissionQueue,
resources: Self::Resources,
args: Self::Args,
) -> io::Result<Self::Output>;
}
impl<T: DirectOp> crate::op::Op for T {
type Output = io::Result<T::Output>;
type Resources = T::Resources;
type Args = T::Args;
type State = State<Direct, T::Resources, T::Args>;
fn poll(
state: &mut Self::State,
_: &mut task::Context<'_>,
sq: &SubmissionQueue,
) -> Poll<Self::Output> {
match state.status {
Direct::NotStarted => {
state.status = Direct::Complete;
let resources = unsafe { state.resources.assume_init_read() };
Poll::Ready(T::run(sq, resources, state.args))
}
Direct::Complete => panic!("polled Future after completion"),
}
}
}
pub(crate) trait DirectOpExtract: DirectOp {
type ExtractOutput;
fn run_extract(
sq: &SubmissionQueue,
resources: Self::Resources,
args: Self::Args,
) -> io::Result<Self::ExtractOutput>;
}
impl<T: DirectOpExtract> crate::op::OpExtract for T {
type ExtractOutput = io::Result<<Self as DirectOpExtract>::ExtractOutput>;
fn poll_extract(
state: &mut Self::State,
_: &mut task::Context<'_>,
sq: &SubmissionQueue,
) -> Poll<Self::ExtractOutput> {
match state.status {
Direct::NotStarted => {
state.status = Direct::Complete;
let resources = unsafe { state.resources.assume_init_read() };
Poll::Ready(T::run_extract(sq, resources, state.args))
}
Direct::Complete => panic!("polled Future after completion"),
}
}
}
pub(crate) trait DirectFdOp {
type Output;
type Resources;
type Args: Copy;
fn run(fd: &AsyncFd, resources: Self::Resources, args: Self::Args) -> io::Result<Self::Output>;
}
macro_rules! impl_fd_op {
( $( $T: ident $( < $( $gen: ident ),+ > )? ),* ) => {
$(
impl $( < $( $gen ),+ > )? crate::op::FdOp for $T $( < $( $gen ),* > )?
where Self: $crate::kqueue::op::DirectFdOp,
{
type Output = ::std::io::Result<<Self as $crate::kqueue::op::DirectFdOp>::Output>;
type Resources = <Self as $crate::kqueue::op::DirectFdOp>::Resources;
type Args = <Self as $crate::kqueue::op::DirectFdOp>::Args;
type State = $crate::kqueue::op::State<$crate::kqueue::op::Direct, <Self as $crate::kqueue::op::DirectFdOp>::Resources, <Self as $crate::kqueue::op::DirectFdOp>::Args>;
fn poll(
state: &mut Self::State,
_: &mut ::std::task::Context<'_>,
fd: &$crate::AsyncFd,
) -> ::std::task::Poll<Self::Output> {
match state.status {
$crate::kqueue::op::Direct::NotStarted => {
state.status = $crate::kqueue::op::Direct::Complete;
let resources = unsafe { state.resources.assume_init_read() };
::std::task::Poll::Ready(Self::run(fd, resources, state.args))
},
$crate::kqueue::op::Direct::Complete => ::std::panic!("polled Future after completion"),
}
}
}
)*
};
}
pub(super) use impl_fd_op;
#[derive(Copy, Clone, Debug)]
pub(crate) enum Evented {
NotStarted,
ToSubmit,
Waiting,
Complete,
}
impl StateStatus for Evented {
fn new() -> Self {
Evented::NotStarted
}
fn not_started(&self) -> bool {
matches!(self, Evented::NotStarted)
}
fn complete(&self) -> bool {
matches!(self, Evented::Complete)
}
}
pub(crate) trait FdOp {
type Output;
type Resources;
type Args;
type OperationOutput;
fn setup(
fd: &AsyncFd,
resources: &mut Self::Resources,
args: &mut Self::Args,
) -> io::Result<()> {
_ = (fd, resources, args);
Ok(())
}
const OP_KIND: OpKind;
fn try_run(
fd: &AsyncFd,
resources: &mut Self::Resources,
args: &mut Self::Args,
) -> io::Result<Self::OperationOutput>;
fn map_ok(
fd: &AsyncFd,
resources: Self::Resources,
output: Self::OperationOutput,
) -> Self::Output;
}
impl<T: FdOp> crate::op::FdOp for T {
type Output = io::Result<T::Output>;
type Resources = T::Resources;
type Args = T::Args;
type State = State<Evented, T::Resources, T::Args>;
fn poll(
state: &mut Self::State,
ctx: &mut task::Context<'_>,
fd: &AsyncFd,
) -> Poll<Self::Output> {
poll::<T, _>(state, ctx, fd, T::map_ok)
}
}
pub(crate) trait FdOpExtract: FdOp {
type ExtractOutput;
fn map_ok_extract(
fd: &AsyncFd,
resources: Self::Resources,
output: Self::OperationOutput,
) -> Self::ExtractOutput;
}
impl<T: FdOpExtract> crate::op::FdOpExtract for T {
type ExtractOutput = io::Result<T::ExtractOutput>;
fn poll_extract(
state: &mut Self::State,
ctx: &mut task::Context<'_>,
fd: &AsyncFd,
) -> Poll<Self::ExtractOutput> {
poll::<T, _>(state, ctx, fd, T::map_ok_extract)
}
}
pub(crate) trait FdIter {
type Output;
type Resources;
type Args;
type OperationOutput;
fn setup(
fd: &AsyncFd,
resources: &mut Self::Resources,
args: &mut Self::Args,
) -> io::Result<()> {
_ = (fd, resources, args);
Ok(())
}
const OP_KIND: OpKind;
fn try_run(
fd: &AsyncFd,
resources: &mut Self::Resources,
args: &mut Self::Args,
) -> io::Result<Self::OperationOutput>;
fn next(resources: &Self::Resources, output: &Self::OperationOutput) -> Next;
fn map_next(
fd: &AsyncFd,
resources: &Self::Resources,
output: Self::OperationOutput,
) -> Self::Output;
}
pub(crate) enum Next {
TryRun,
Submit,
Complete,
}
impl<T: FdIter> crate::op::FdIter for T {
type Output = io::Result<T::Output>;
type Resources = T::Resources;
type Args = T::Args;
type State = State<Evented, T::Resources, T::Args>;
fn poll_next(
state: &mut Self::State,
ctx: &mut task::Context<'_>,
fd: &AsyncFd,
) -> Poll<Option<Self::Output>> {
let mut r = None;
poll_inner(
state,
ctx,
fd,
T::setup,
T::OP_KIND,
T::try_run,
|state, output| {
debug_assert!(!matches!(state.status, Evented::Complete));
let resources = unsafe { state.resources.assume_init_ref() };
match T::next(resources, output) {
Next::TryRun => {
state.status = Evented::Waiting;
resources
}
Next::Submit => {
state.status = Evented::ToSubmit;
resources
}
Next::Complete => {
state.status = Evented::Complete;
let resources = unsafe { state.resources.assume_init_read() };
r.insert(resources)
}
}
},
T::map_next,
|| None,
)
}
}
fn poll<T: FdOp, Out>(
state: &mut State<Evented, T::Resources, T::Args>,
ctx: &mut task::Context<'_>,
fd: &AsyncFd,
map_ok: impl FnOnce(&AsyncFd, T::Resources, T::OperationOutput) -> Out,
) -> Poll<io::Result<Out>> {
poll_inner(
state,
ctx,
fd,
T::setup,
T::OP_KIND,
T::try_run,
|state, _| {
debug_assert!(!matches!(state.status, Evented::Complete));
state.status = Evented::Complete;
unsafe { state.resources.assume_init_read() }
},
map_ok,
|| panic!("polled Future after completion"),
)
}
#[allow(clippy::needless_pass_by_ref_mut)] #[allow(clippy::too_many_arguments)]
fn poll_inner<'s, R, A, OpOut, R2, Ok, Res>(
state: &'s mut State<Evented, R, A>,
ctx: &mut task::Context<'_>,
fd: &AsyncFd,
setup: impl Fn(&AsyncFd, &mut R, &mut A) -> io::Result<()>,
op_kind: OpKind,
try_run: impl Fn(&AsyncFd, &mut R, &mut A) -> io::Result<OpOut>,
after_try_run: impl FnOnce(&'s mut State<Evented, R, A>, &OpOut) -> R2,
map_ok: impl FnOnce(&AsyncFd, R2, OpOut) -> Ok,
poll_complete: impl FnOnce() -> Res,
) -> Poll<Res>
where
Res: OpPollResult<Ok>,
R2: 's,
{
loop {
match &mut state.status {
Evented::NotStarted => {
let resources = unsafe { state.resources.assume_init_mut() };
if let Err(err) = setup(fd, resources, &mut state.args) {
return Poll::Ready(Res::from_err(err));
}
state.status = Evented::ToSubmit;
}
Evented::ToSubmit => {
let fd_state = fd.state();
let needs_register = {
let mut fd_state = fd_state.lock();
let needs_register = !fd_state.has_waiting_op(op_kind);
fd_state.add(op_kind, ctx.waker().clone());
needs_register
};
if needs_register {
fd.sq.submissions().add(|event| {
event.0.filter = match op_kind {
OpKind::Read => libc::EVFILT_READ,
OpKind::Write => libc::EVFILT_WRITE,
};
event.0.ident = fd.fd().cast_unsigned() as _;
event.0.udata = fd_state.as_udata();
});
}
state.status = Evented::Waiting;
return Poll::Pending;
}
Evented::Waiting => {
let resources = unsafe { state.resources.assume_init_mut() };
match try_run(fd, resources, &mut state.args) {
Ok(output) => {
let resources = after_try_run(state, &output);
return Poll::Ready(Res::from_ok(map_ok(fd, resources, output)));
}
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
state.status = Evented::ToSubmit;
}
Err(err) => {
state.status = Evented::Complete;
return Poll::Ready(Res::from_err(err));
}
}
}
Evented::Complete => return Poll::Ready(poll_complete()),
}
}
}