use std::cell::UnsafeCell;
use std::collections::VecDeque;
use std::io;
use std::mem::{self, MaybeUninit, drop as unlock, replace};
use std::panic::RefUnwindSafe;
use std::ptr::NonNull;
use std::sync::Mutex;
use std::task::{self, Poll};

use crate::asan;
use crate::io::BufId;
use crate::io_uring::cq::{Completion, MULTISHOT_TAG, SINGLESHOT_TAG};
use crate::io_uring::libc;
use crate::io_uring::sq::{QueueFull, Submission};
use crate::op::OpState;
use crate::{AsyncFd, OpPollResult, SubmissionQueue, debug_detail, get_mut, lock};
/// State of an io_uring operation: an owning pointer to its heap-allocated
/// [`Data`].
///
/// The allocation is created in `OpState::new` and freed by `drop_state`.
/// That happens either directly from `OpState::drop`, or, if the operation
/// was still running, after its final completion (`StatusUpdate::Drop`). The
/// pointer's address is also used as the submission's `user_data`.
#[derive(Debug)]
pub(crate) struct State<T, R, A> {
    data: NonNull<Data<T, R, A>>,
}
/// Heap allocation behind a [`State`].
///
/// `#[repr(C)]` keeps `shared` as the first field. Presumably this is so a
/// pointer to `Data` can be treated as a pointer to
/// `SingleShared`/`MultiShared` by code that doesn't know `R` and `A`.
/// TODO confirm.
#[derive(Debug)]
#[repr(C)]
struct Data<T, R, A> {
    /// Status and waker, modified both by the polling future and (through
    /// `Shared::update`) when completions arrive.
    shared: Mutex<Shared<T>>,
    /// Resources and arguments of the operation.
    tail: Tail<R, A>,
}
/// Mutex-protected [`Shared`] part of a singleshot operation.
///
/// Presumably used by the completion side, which tells the two kinds apart
/// via `SINGLESHOT_TAG`/`MULTISHOT_TAG` in the `user_data`. TODO confirm.
pub(super) type SingleShared = Mutex<Shared<Singleshot>>;
/// Mutex-protected [`Shared`] part of a multishot operation.
pub(super) type MultiShared = Mutex<Shared<Multishot>>;
/// Part of an operation's data that lives behind the mutex.
#[derive(Debug)]
pub(super) struct Shared<T> {
    /// Where the operation is in its lifecycle, including received results.
    status: Status<T>,
    /// Waker of the task polling the operation.
    waker: Option<task::Waker>,
    /// Type-erased destructor for the whole allocation
    /// (`drop_state::<T, R, A>`), so it can be freed without knowing `R`/`A`.
    drop: unsafe fn(*mut ()),
}
/// Lifecycle of an operation.
#[derive(Debug)]
enum Status<T> {
    /// Not submitted yet. Also set again to resubmit after `EINTR`/`ECANCELED`,
    /// and by `reset`.
    NotStarted,
    /// Submitted; completions are accumulated in `results`.
    Running { results: T },
    /// Final completion received; `results` still have to be handed out.
    Done { results: T },
    /// The operation's state was dropped while it was running. It is freed
    /// once the final completion arrives.
    Dropped,
    /// All results handed out; the resources were moved out or dropped.
    Complete,
}
/// Part of an operation's data that is not behind the mutex.
#[derive(Debug)]
struct Tail<R, A> {
    /// Resources used by the operation.
    ///
    /// `MaybeUninit` because they are moved out (or dropped in place) once
    /// the operation is `Complete`. `UnsafeCell` because they are reached
    /// through raw pointers (`UnsafeCell::get`).
    resources: UnsafeCell<MaybeUninit<R>>,
    /// Arguments used to fill the submission.
    args: A,
}
impl<T, R, A> OpState for State<T, R, A> {
    type Resources = R;
    type Args = A;

    /// Heap-allocates the operation's data in the `NotStarted` state, taking
    /// ownership of `resources` and `args`.
    fn new(resources: R, args: A) -> State<T, R, A> {
        let data = Box::new(Data {
            shared: Mutex::new(Shared {
                status: Status::<T>::NotStarted,
                waker: None,
                drop: drop_state::<T, R, A>,
            }),
            tail: Tail {
                resources: UnsafeCell::new(MaybeUninit::new(resources)),
                args,
            },
        });
        // SAFETY: `Box::into_raw` never returns a null pointer.
        let data = unsafe { NonNull::new_unchecked(Box::into_raw(data)) };
        State { data }
    }

    /// Returns the resources if the operation hasn't been submitted yet.
    fn resources_mut(&mut self) -> Option<&mut Self::Resources> {
        // The lock is released inside `is_not_started`. This way the mutable
        // reference below never overlaps the guard's borrow of the mutex.
        if !is_not_started(self) {
            return None;
        }
        // SAFETY: in the `NotStarted` state no submission referencing this
        // data is in flight, and `&mut self` gives exclusive access. The
        // resources are initialised in this state: they are written in
        // `new`/`reset` and only moved out or dropped once `Complete`.
        Some(unsafe {
            self.data
                .as_mut()
                .tail
                .resources
                .get_mut()
                .assume_init_mut()
        })
    }

    /// Returns the arguments if the operation hasn't been submitted yet.
    fn args_mut(&mut self) -> Option<&mut Self::Args> {
        if !is_not_started(self) {
            return None;
        }
        // SAFETY: see `resources_mut`. No submission is in flight and we have
        // exclusive access.
        Some(unsafe { &mut self.data.as_mut().tail.args })
    }

    /// Drops the operation's state.
    ///
    /// If the operation is still running, the kernel may still use the
    /// resources. In that case the operation is cancelled and marked
    /// `Dropped`, and the state is freed once its final completion arrives.
    /// Otherwise the state is freed immediately.
    unsafe fn drop(&mut self, sq: &SubmissionQueue) {
        {
            // SAFETY: `self.data` stays valid until `drop_state` frees it.
            let mut shared = unsafe { lock(&self.data.as_ref().shared) };
            if matches!(&shared.status, Status::Running { .. }) {
                // NOTE(review): `poll_inner` ORs `SINGLESHOT_TAG`/`MULTISHOT_TAG`
                // into the submission's `user_data`, but the untagged pointer is
                // passed here. Confirm that `cancel` accounts for the tag,
                // otherwise the cancellation may not match the operation.
                let user_data = self.data.expose_provenance().get() as u64;
                if let Err(err) = sq.submissions().cancel(user_data) {
                    log::debug!("failed to cancel operation, will wait on result: {err}");
                }
                shared.status = Status::Dropped;
                unlock(shared);
                return;
            }
            unlock(shared);
        }
        // SAFETY: the operation isn't running, so nothing else references the
        // data; it was allocated in `new`.
        unsafe { drop_state::<T, R, A>(self.data.as_ptr().cast()) };
    }

    /// Reuses the allocation for a new operation with `resources` and `args`.
    ///
    /// # Panics
    ///
    /// Panics if the previous operation isn't `Complete`.
    fn reset(&mut self, resources: Self::Resources, args: Self::Args) {
        // SAFETY: `&mut self` gives exclusive access to the live allocation.
        let data = unsafe { &mut *self.data.as_ptr() };
        let mut shared = lock(&data.shared);
        assert!(matches!(shared.status, Status::Complete));
        shared.status = Status::NotStarted;
        // The old resources were already moved out or dropped (the status was
        // `Complete`), so overwriting the `MaybeUninit` without dropping is
        // correct.
        data.tail.resources = UnsafeCell::new(MaybeUninit::new(resources));
        data.tail.args = args;
        drop(shared);
    }
}

/// Returns `true` if `state`'s operation hasn't been submitted yet.
///
/// The lock guard is dropped before this returns. Callers may therefore take
/// a mutable reference into the data afterwards without it overlapping a
/// live borrow of the mutex.
fn is_not_started<T, R, A>(state: &State<T, R, A>) -> bool {
    // SAFETY: `state.data` points to the live allocation created in `new`.
    let data = unsafe { state.data.as_ref() };
    matches!(lock(&data.shared).status, Status::NotStarted)
}
/// Type-erased destructor for the allocation behind a [`State`].
///
/// Drops the resources unless the operation is `Complete` (in which case
/// they were already moved out or dropped), then frees the allocation.
///
/// # Safety
///
/// `ptr` must point to a live `Data<T, R, A>` allocated by `OpState::new`,
/// with no other live references to it. It must not be used afterwards.
unsafe fn drop_state<T, R, A>(ptr: *mut ()) {
    let data_ptr: *mut Data<T, R, A> = ptr.cast();
    {
        // SAFETY: the caller guarantees `data_ptr` is valid and unaliased.
        let data: &mut Data<T, R, A> = unsafe { &mut *data_ptr };
        let resources_live = !matches!(get_mut(&mut data.shared).status, Status::Complete);
        if resources_live {
            asan::unpoison(data.tail.resources.get());
            // SAFETY: the resources are only uninitialised once `Complete`.
            unsafe { data.tail.resources.get_mut().assume_init_drop() }
        }
    }
    // SAFETY: `data_ptr` came from `Box::into_raw` in `OpState::new`.
    let boxed = unsafe { Box::<Data<T, R, A>>::from_raw(data_ptr) };
    mem::drop(boxed);
}
// `OpResult` is a private trait while `update` is `pub(super)`, which trips
// the `private_bounds` lint.
#[allow(private_bounds)]
impl<T: OpResult> Shared<T> {
    /// Records `completion` in the operation's status.
    ///
    /// On the final completion a running operation moves to `Done`. The
    /// return value tells the caller what to do next:
    /// - nothing;
    /// - wake the stored waker (on the final completion, or on every
    ///   completion for multishot operations);
    /// - call the type-erased destructor, because the operation was dropped
    ///   and this was its final completion.
    ///
    /// # Panics
    ///
    /// Panics if the operation is `NotStarted` or `Complete`.
    pub(super) fn update(&mut self, completion: &Completion) -> StatusUpdate {
        let results = match &mut self.status {
            Status::Running { results } | Status::Done { results } => results,
            Status::Dropped if completion.complete() => {
                return StatusUpdate::Drop { drop: self.drop };
            }
            Status::Dropped => return StatusUpdate::Ok,
            Status::NotStarted | Status::Complete => unreachable!(),
        };

        let raw_flags = completion.0.flags;
        let entry = CompletionResult {
            result: completion.0.res,
            flags: CompletionFlags(raw_flags),
        };
        results.update(entry, raw_flags);

        // On the final completion, move the results into `Done` so the future
        // can hand them out.
        let done = completion.complete()
            && match replace(&mut self.status, Status::Complete) {
                Status::Running { results } | Status::Done { results } => {
                    self.status = Status::Done { results };
                    true
                }
                Status::NotStarted | Status::Dropped | Status::Complete => false,
            };

        let waker = if done || T::IS_MULTISHOT {
            self.waker.take()
        } else {
            None
        };
        waker.map_or(StatusUpdate::Ok, StatusUpdate::Wake)
    }
}
/// Action the caller of `Shared::update` should take after a completion.
#[derive(Debug)]
pub(super) enum StatusUpdate {
    /// Nothing to do.
    Ok,
    /// Wake this waker (the task polling the operation).
    Wake(task::Waker),
    /// The operation was dropped and this was its final completion. Free the
    /// state by calling `drop`, presumably with the data pointer taken from
    /// the completion's `user_data`. TODO confirm.
    Drop { drop: unsafe fn(*mut ()) },
}
// `State` is only a pointer to the `Data` it owns (allocated in
// `OpState::new`, freed by `drop_state`). It is therefore `Send`/`Sync`
// exactly when `Data` is.
unsafe impl<T, R, A> Send for State<T, R, A> where Data<T, R, A>: Send {}
unsafe impl<T, R, A> Sync for State<T, R, A> where Data<T, R, A>: Sync {}
// `UnsafeCell` makes `Tail` `!Sync` by default. In this file the resources are
// only mutated through exclusive (`&mut`) access to the data, which is
// presumably why sharing `&Tail` is considered sound. TODO confirm.
unsafe impl<R: Sync, A: Sync> Sync for Tail<R, A> {}
// `UnsafeCell` is not `RefUnwindSafe`; `Tail` opts back in when its contents are.
impl<R: RefUnwindSafe, A: RefUnwindSafe> RefUnwindSafe for Tail<R, A> {}
/// Storage for the completion results of an operation.
trait OpResult {
    /// Whether the operation produces a stream of results (multishot)
    /// rather than a single one.
    const IS_MULTISHOT: bool;

    /// Returns storage that holds no results yet.
    fn empty() -> Self;

    /// Records a completion; `completion_flags` are the raw CQE flags.
    fn update(&mut self, result: CompletionResult, completion_flags: u32);

    /// Takes the next result to hand out, if any.
    fn next(&mut self) -> Option<CompletionResult>;

    /// Returns `true` if more results are queued.
    ///
    /// Only meaningful for multishot storage; the default returns `false`.
    fn has_next(&self) -> bool {
        false
    }
}
/// Result of a single completion event.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub(crate) struct CompletionResult {
    /// The completion's flags.
    flags: CompletionFlags,
    /// The completion's result: non-negative on success, a negated errno
    /// value on failure (see `check_result`).
    result: i32,
}
impl CompletionResult {
    /// Converts the raw completion result into an [`io::Result`].
    ///
    /// Non-negative results are returned as is. Negative results are treated
    /// as negated errno values and turned into an OS error.
    pub(crate) fn check_result(self) -> io::Result<u32> {
        u32::try_from(self.result).map_err(|_| io::Error::from_raw_os_error(-self.result))
    }
}
/// Raw flags of a completion queue event.
///
/// `Debug` is not derived; it is provided by the `debug_detail!` invocation
/// for this type.
#[derive(Copy, Clone, Eq, PartialEq)]
pub(crate) struct CompletionFlags(u32);
impl CompletionFlags {
pub(super) const fn empty() -> CompletionFlags {
CompletionFlags(0)
}
pub(super) const fn buf_id(self) -> Option<BufId> {
if self.0 & libc::IORING_CQE_F_BUFFER != 0 {
Some(BufId((self.0 >> libc::IORING_CQE_BUFFER_SHIFT) as u16))
} else {
None
}
}
}
// Provides the `Debug` impl for `CompletionFlags`, which derives none even
// though `CompletionResult` derives `Debug`. Presumably it prints the names of
// the listed flags that are set.
debug_detail!(
    impl bitset for CompletionFlags(u32),
    libc::IORING_CQE_F_BUFFER,
    libc::IORING_CQE_F_MORE,
    libc::IORING_CQE_F_SOCK_NONEMPTY,
    libc::IORING_CQE_F_NOTIF,
    libc::IORING_CQE_F_BUF_MORE,
);
/// Flags and (non-negative) result of a successful completion, as passed to
/// `map_ok`/`map_next`.
pub(super) type OpReturn = (CompletionFlags, u32);
/// Result storage of a singleshot operation: the most recent completion
/// that isn't a notification.
#[derive(Debug)]
pub(crate) struct Singleshot(CompletionResult);
impl OpResult for Singleshot {
    const IS_MULTISHOT: bool = false;

    fn empty() -> Singleshot {
        let placeholder = CompletionResult {
            flags: CompletionFlags::empty(),
            result: 0,
        };
        Singleshot(placeholder)
    }

    fn update(&mut self, result: CompletionResult, completion_flags: u32) {
        // Notification completions (`IORING_CQE_F_NOTIF`) are ignored so
        // they don't overwrite the stored result.
        let is_notification = completion_flags & libc::IORING_CQE_F_NOTIF != 0;
        if !is_notification {
            self.0 = result;
        }
    }

    /// Always returns the stored result.
    fn next(&mut self) -> Option<CompletionResult> {
        Some(self.0)
    }
}
#[derive(Debug)]
pub(crate) struct Multishot(Vec<CompletionResult>);
impl OpResult for Multishot {
fn empty() -> Multishot {
Multishot(Vec::new())
}
fn update(&mut self, result: CompletionResult, _: u32) {
self.0.push(result);
}
const IS_MULTISHOT: bool = true;
fn next(&mut self) -> Option<CompletionResult> {
if self.0.is_empty() {
return None;
}
Some(self.0.remove(0))
}
fn has_next(&self) -> bool {
!self.0.is_empty()
}
}
/// Singleshot io_uring operation whose target is the [`SubmissionQueue`]
/// itself rather than an [`AsyncFd`].
pub(crate) trait Op {
    /// Value produced on success.
    type Output;
    /// Resources kept in the operation's state while it runs and handed to
    /// `map_ok`/`fallback` afterwards.
    type Resources;
    /// Extra arguments used to fill the submission.
    type Args;

    /// Fills `submission` for this operation.
    fn fill_submission(
        resources: &mut Self::Resources,
        args: &mut Self::Args,
        submission: &mut Submission,
    );

    /// Maps a successful completion into the operation's output.
    fn map_ok(
        sq: &SubmissionQueue,
        resources: Self::Resources,
        op_return: OpReturn,
    ) -> Self::Output;

    /// Handles an error result other than `EINTR`/`ECANCELED`.
    ///
    /// The default ignores `sq` and `resources` and returns the error, with
    /// `EINVAL` translated into an "unsupported" error.
    fn fallback(
        sq: &SubmissionQueue,
        resources: Self::Resources,
        err: io::Error,
    ) -> io::Result<Self::Output> {
        let _ = (sq, &resources);
        Err(fallback(err))
    }
}

impl<T: Op> crate::op::Op for T {
    type Output = io::Result<T::Output>;
    type Resources = T::Resources;
    type Args = T::Args;
    type State = State<Singleshot, T::Resources, T::Args>;

    fn poll(
        state: &mut Self::State,
        ctx: &mut task::Context<'_>,
        sq: &SubmissionQueue,
    ) -> Poll<Self::Output> {
        // The queue is the target; `Op::fill_submission` doesn't receive it.
        poll(
            sq,
            state,
            ctx,
            |_target, resources, args, submission| {
                <T as Op>::fill_submission(resources, args, submission)
            },
            <T as Op>::map_ok,
            <T as Op>::fallback,
        )
    }
}
/// Extension of [`Op`] that maps a successful result into an alternative
/// output via `map_ok_extract`.
pub(crate) trait OpExtract: Op {
    /// Value produced on success by the extract variant.
    type ExtractOutput;

    /// Maps a successful completion into the extract output.
    fn map_ok_extract(
        sq: &SubmissionQueue,
        resources: Self::Resources,
        op_return: OpReturn,
    ) -> Self::ExtractOutput;
}

impl<T: Op + OpExtract> crate::op::OpExtract for T {
    type ExtractOutput = io::Result<T::ExtractOutput>;

    fn poll_extract(
        state: &mut Self::State,
        ctx: &mut task::Context<'_>,
        sq: &SubmissionQueue,
    ) -> Poll<Self::ExtractOutput> {
        // `Op::fallback` produces `Op::Output`, not the extract output, so
        // errors only go through the generic `fallback` mapping.
        poll(
            sq,
            state,
            ctx,
            |_target, resources, args, submission| {
                <T as Op>::fill_submission(resources, args, submission)
            },
            <T as OpExtract>::map_ok_extract,
            |_, _, err| Err(fallback(err)),
        )
    }
}
/// Singleshot io_uring operation on an [`AsyncFd`].
pub(crate) trait FdOp {
    /// Value produced on success.
    type Output;
    /// Resources kept in the operation's state while it runs.
    type Resources;
    /// Extra arguments used to fill the submission.
    type Args;

    /// Fills `submission` for this operation on `fd`.
    fn fill_submission(
        fd: &AsyncFd,
        resources: &mut Self::Resources,
        args: &mut Self::Args,
        submission: &mut Submission,
    );

    /// Maps a successful completion into the operation's output.
    fn map_ok(fd: &AsyncFd, resources: Self::Resources, op_return: OpReturn) -> Self::Output;
}

impl<T: FdOp> crate::op::FdOp for T {
    type Output = io::Result<T::Output>;
    type Resources = T::Resources;
    type Args = T::Args;
    type State = State<Singleshot, T::Resources, T::Args>;

    fn poll(
        state: &mut Self::State,
        ctx: &mut task::Context<'_>,
        fd: &AsyncFd,
    ) -> Poll<Self::Output> {
        // Errors only go through the generic `fallback` mapping.
        poll(
            fd,
            state,
            ctx,
            <T as FdOp>::fill_submission,
            <T as FdOp>::map_ok,
            |_, _, err| Err(fallback(err)),
        )
    }
}
/// Extension of [`FdOp`] that maps a successful result into an alternative
/// output via `map_ok_extract`.
pub(crate) trait FdOpExtract: FdOp {
    /// Value produced on success by the extract variant.
    type ExtractOutput;

    /// Maps a successful completion into the extract output.
    fn map_ok_extract(
        fd: &AsyncFd,
        resources: Self::Resources,
        op_return: OpReturn,
    ) -> Self::ExtractOutput;
}

impl<T: FdOp + FdOpExtract> crate::op::FdOpExtract for T {
    type ExtractOutput = io::Result<T::ExtractOutput>;

    fn poll_extract(
        state: &mut Self::State,
        ctx: &mut task::Context<'_>,
        fd: &AsyncFd,
    ) -> Poll<Self::ExtractOutput> {
        poll(
            fd,
            state,
            ctx,
            <T as FdOp>::fill_submission,
            <T as FdOpExtract>::map_ok_extract,
            |_, _, err| Err(fallback(err)),
        )
    }
}
/// Multishot io_uring operation on an [`AsyncFd`] that yields a stream of
/// results.
pub(crate) trait FdIter {
    /// Value produced for each successful result.
    type Output;
    /// Resources kept in the operation's state; borrowed for every result.
    type Resources;
    /// Extra arguments used to fill the submission.
    type Args;

    /// Fills `submission` for this operation on `fd`.
    fn fill_submission(
        fd: &AsyncFd,
        resources: &mut Self::Resources,
        args: &mut Self::Args,
        submission: &mut Submission,
    );

    /// Maps one successful completion into an item of the stream.
    fn map_next(fd: &AsyncFd, resources: &Self::Resources, op_return: OpReturn) -> Self::Output;
}

impl<T: FdIter> crate::op::FdIter for T {
    type Output = io::Result<T::Output>;
    type Resources = T::Resources;
    type Args = T::Args;
    type State = State<Multishot, T::Resources, T::Args>;

    fn poll_next(
        state: &mut Self::State,
        ctx: &mut task::Context<'_>,
        fd: &AsyncFd,
    ) -> Poll<Option<Self::Output>> {
        poll_next(
            fd,
            state,
            ctx,
            <T as FdIter>::fill_submission,
            <T as FdIter>::map_next,
            |_, _, err| Err(fallback(err)),
        )
    }
}
/// Polls an operation whose result takes ownership of the resources.
///
/// The resources are moved out of the state and into `map_ok`/`fallback`.
fn poll<T, O, R, A, Out>(
    target: &T,
    state: &mut State<O, R, A>,
    ctx: &mut task::Context<'_>,
    fill_submission: impl Fn(&T, &mut R, &mut A, &mut Submission),
    map_ok: impl Fn(&T, R, OpReturn) -> Out,
    fallback: impl Fn(&T, R, io::Error) -> io::Result<Out>,
) -> Poll<io::Result<Out>>
where
    T: OpTarget,
    O: OpResult,
{
    poll_inner(
        target,
        state,
        ctx,
        fill_submission,
        // SAFETY: callers use singleshot state. There `poll_inner` marks the
        // state `Complete` before handing out the resources, so they are read
        // at most once and `drop_state` won't drop them again.
        |ptr: *mut R| unsafe { ptr.read() },
        map_ok,
        fallback,
    )
}
/// Polls the next result of a multishot operation.
///
/// The resources stay in the state and are only borrowed by
/// `map_next`/`fallback`.
fn poll_next<T, O, R, A, Out>(
    target: &T,
    state: &mut State<O, R, A>,
    ctx: &mut task::Context<'_>,
    fill_submission: impl Fn(&T, &mut R, &mut A, &mut Submission),
    map_next: impl Fn(&T, &R, OpReturn) -> Out,
    fallback: impl Fn(&T, &R, io::Error) -> io::Result<Out>,
) -> Poll<Option<io::Result<Out>>>
where
    T: OpTarget,
    O: OpResult,
{
    poll_inner(
        target,
        state,
        ctx,
        fill_submission,
        // SAFETY: the pointer comes from the state's initialised resources,
        // which stay in place while results are handed out.
        |ptr: *mut R| unsafe { &*ptr },
        map_next,
        fallback,
    )
}
/// Shared state machine behind [`poll`] and [`poll_next`].
///
/// What it does depends on the operation's status:
/// - `NotStarted`: submits the operation.
/// - `Running`: waits for completions. Multishot operations also hand out
///   results here, since they yield results while still running.
/// - `Done`: hands out a result.
///
/// A result of `EINTR` or `ECANCELED` resets the status to `NotStarted`, so
/// the operation is submitted again.
///
/// * `fill_submission` fills the submission from the resources and args.
/// * `get_resources` turns the pointer to the resources into the value passed
///   to `map_ok`/`fallback` (`poll` reads them out, `poll_next` borrows them).
/// * `map_ok` maps a successful result.
/// * `fallback` handles any other error.
///
/// # Panics
///
/// Panics if polled again once the operation is `Complete`, and if one of
/// the internal assertions below fails.
// Clippy only sees `state` used through `state.data`. Presumably the `&mut`
// is kept on purpose to guarantee exclusive access to the state. TODO confirm.
#[allow(clippy::needless_pass_by_ref_mut)]
// The whole state machine is kept in one `match`.
#[allow(clippy::too_many_lines)]
fn poll_inner<T, O, R, R2, A, Ok, Res>(
    target: &T,
    state: &mut State<O, R, A>,
    ctx: &mut task::Context<'_>,
    fill_submission: impl Fn(&T, &mut R, &mut A, &mut Submission),
    get_resources: impl Fn(*mut R) -> R2,
    map_ok: impl Fn(&T, R2, OpReturn) -> Ok,
    fallback: impl Fn(&T, R2, io::Error) -> io::Result<Ok>,
) -> Poll<Res>
where
    T: OpTarget,
    O: OpResult,
    Res: OpPollResult<Ok>,
{
    let data = unsafe { state.data.as_mut() };
    let mut shared = lock(&data.shared);
    // Every arm returns, except the `EINTR`/`ECANCELED` arm. That arm resets
    // the status to `NotStarted` and loops to resubmit.
    loop {
        match &mut shared.status {
            Status::NotStarted => {
                let submissions = target.sq().submissions();
                let result = submissions.add(|submission| {
                    let resources = unsafe { data.tail.resources.get_mut().assume_init_mut() };
                    let args = &mut data.tail.args;
                    fill_submission(target, resources, args, submission);
                    target.set_flags(submission);
                    // The operation is identified in its completions by the
                    // data pointer, tagged with whether it's single- or
                    // multishot.
                    submission.0.user_data = state.data.expose_provenance().get() as u64;
                    if O::IS_MULTISHOT {
                        submission.0.user_data |= MULTISHOT_TAG as u64;
                    } else {
                        // Unpoisoned again in the `Done` arm below and in
                        // `drop_state`.
                        asan::poison(resources);
                        submission.0.user_data |= SINGLESHOT_TAG as u64;
                    }
                });
                match result {
                    Ok(()) => {
                        shared.waker = Some(ctx.waker().clone());
                        shared.status = Status::Running {
                            results: O::empty(),
                        };
                        unlock(shared);
                    }
                    Err(QueueFull) => {
                        // The status stays `NotStarted`. Presumably the waker
                        // is woken once there is room to submit again.
                        unlock(shared);
                        submissions.wait_for_submission(ctx.waker().clone());
                    }
                }
                return Poll::Pending;
            }
            // Multishot operations hand out each result as soon as it arrives.
            Status::Running { results } if O::IS_MULTISHOT => {
                let Some(result) = results.next() else {
                    set_waker(&mut shared.waker, ctx.waker());
                    unlock(shared);
                    return Poll::Pending;
                };
                unlock(shared);
                let res = match result.check_result() {
                    Ok(res) => res,
                    Err(err) => return Poll::Ready(Res::from_err(err)),
                };
                let resources = get_resources(data.tail.resources.get().cast::<R>());
                let op_return = (result.flags, res);
                return Poll::Ready(Res::from_ok(map_ok(target, resources, op_return)));
            }
            // Singleshot operations wait for the final completion.
            Status::Running { .. } => {
                set_waker(&mut shared.waker, ctx.waker());
                unlock(shared);
                return Poll::Pending;
            }
            Status::Done { results } => {
                // Only multishot results can run out (`Singleshot::next`
                // always returns `Some`). Once drained, the resources are
                // dropped in place and the stream ends.
                let Some(result) = results.next() else {
                    assert!(O::IS_MULTISHOT);
                    shared.status = Status::Complete;
                    unlock(shared);
                    unsafe { data.tail.resources.get().cast::<R>().drop_in_place() }
                    return Poll::Ready(Res::done());
                };
                // A singleshot operation has exactly one result. It is marked
                // `Complete` before its resources are handed out below.
                if !O::IS_MULTISHOT {
                    shared.status = Status::Complete;
                    asan::unpoison(data.tail.resources.get());
                }
                match result.check_result() {
                    Ok(res) => {
                        unlock(shared);
                        let resources = get_resources(data.tail.resources.get().cast::<R>());
                        let op_return = (result.flags, res);
                        return Poll::Ready(Res::from_ok(map_ok(target, resources, op_return)));
                    }
                    // Interrupted or cancelled: submit the operation again
                    // with the same resources.
                    Err(ref err)
                        if matches!(err.raw_os_error(), Some(libc::EINTR | libc::ECANCELED)) =>
                    {
                        if O::IS_MULTISHOT {
                            assert!(
                                matches!(&shared.status, Status::Done { results } if !results.has_next())
                            );
                        } else {
                            assert!(matches!(shared.status, Status::Complete));
                        }
                        shared.status = Status::NotStarted;
                    }
                    Err(err) => {
                        unlock(shared);
                        let resources = get_resources(data.tail.resources.get().cast::<R>());
                        return Poll::Ready(Res::from_res(fallback(target, resources, err)));
                    }
                }
            }
            // Only set by `OpState::drop`, after which the state is
            // presumably never polled again.
            Status::Dropped => {
                unlock(shared);
                unreachable!()
            }
            Status::Complete => {
                unlock(shared);
                panic!("polled Future after completion")
            }
        }
    }
}
/// Stores `w` in `waker`.
///
/// The existing waker is kept if it would already wake the same task.
/// Otherwise the stored waker is updated in place (or inserted if there was
/// none).
fn set_waker(waker: &mut Option<task::Waker>, w: &task::Waker) {
    if let Some(current) = waker {
        if !current.will_wake(w) {
            current.clone_from(w);
        }
    } else {
        *waker = Some(w.clone());
    }
}
/// Target an operation is submitted against.
trait OpTarget {
    /// Returns the submission queue the operation is added to.
    fn sq(&self) -> &SubmissionQueue;
    /// Applies target-specific flags to `submission`.
    fn set_flags(&self, submission: &mut Submission);
}

impl OpTarget for AsyncFd {
    fn sq(&self) -> &SubmissionQueue {
        // Resolves to the inherent `AsyncFd::sq`, not this trait method.
        self.sq()
    }

    fn set_flags(&self, submission: &mut Submission) {
        // The flags are determined by the descriptor's kind.
        let kind = self.kind();
        kind.use_flags(submission);
    }
}

impl OpTarget for SubmissionQueue {
    fn sq(&self) -> &SubmissionQueue {
        self
    }

    /// The queue itself adds no submission flags.
    fn set_flags(&self, _submission: &mut Submission) {}
}
/// Maps `EINVAL` to an `Unsupported` error that suggests updating the kernel.
///
/// This presumably exists because older kernels reject operations they don't
/// know with `EINVAL`. All other errors are returned unchanged.
fn fallback(err: io::Error) -> io::Error {
    if err.raw_os_error() != Some(libc::EINVAL) {
        return err;
    }
    io::Error::new(
        io::ErrorKind::Unsupported,
        "operation not supported, please update your Linux kernel version",
    )
}