use core::{
cell::{
LazyCell,
UnsafeCell,
},
future::Future,
mem,
panic,
ptr::{
self,
NonNull,
},
task::{
Context,
Poll,
Waker,
},
};
use crate::{
future::pending_state,
io_uring::{
self,
QueueFullError,
},
op::Op,
task::{
Task,
TaskHeader,
},
};
pub(crate) struct Reactor {
io_uring: io_uring::Instance,
to_submit: u32,
tasks: usize,
}
#[thread_local]
pub(crate) static REACTOR: LazyCell<UnsafeCell<Reactor>> =
LazyCell::new(|| UnsafeCell::new(Reactor::new().unwrap()));
#[thread_local]
pub(crate) static TASK: UnsafeCell<Option<NonNull<TaskHeader>>> = UnsafeCell::new(None);
impl Reactor {
pub(crate) fn new() -> lx::Result<Self> {
let mut io_uring = io_uring::Instance::new(32768)?;
unsafe { io_uring.set_sq_mapping(&io_uring::IdentityMapping) };
Ok(Self {
io_uring,
to_submit: 0,
tasks: 0,
})
}
pub(crate) unsafe fn submit<O: Op>(
this: &UnsafeCell<Self>,
op: &mut O,
pending_state: *mut pending_state::Packed,
) -> lx::Result<()> {
loop {
let sqe = (*this.get()).io_uring.try_push(&io_uring::IdentityMapping);
match sqe {
Ok(sqe) => {
*sqe = unsafe { mem::zeroed() };
sqe.user_data = pending_state as u64;
op.fill_sqe(sqe);
break;
}
Err(QueueFullError) => (*this.get()).io_uring.start_push(),
}
}
(*this.get()).to_submit += 1;
Ok(())
}
pub(crate) fn cancel(
this: &UnsafeCell<Reactor>,
pending_state: *mut pending_state::Packed,
) -> lx::Result<()> {
loop {
let sqe = unsafe { (*this.get()).io_uring.try_push(&io_uring::IdentityMapping) };
match sqe {
Ok(sqe) => {
*sqe = unsafe { mem::zeroed() };
sqe.opcode = lx::IORING_OP_ASYNC_CANCEL;
sqe.user_data = 0;
sqe.addr = pending_state as u64;
break;
}
Err(QueueFullError) => unsafe { (*this.get()).io_uring.start_push() },
}
}
unsafe {
let this = &mut (*this.get());
this.io_uring.end_push();
let to_submit = mem::replace(&mut this.to_submit, 0) + 1;
this.io_uring.get_events(to_submit, 0)?;
this.io_uring.start_pop();
}
let mut cancelled = None;
while let Some(cqe) = unsafe { (*this.get()).io_uring.pop() } {
if cqe.user_data == 0 {
assert!(
cancelled.is_none(),
"multiple completion queue entries for cancellation operation"
);
let ret = cqe.ret;
if ret == 0 || ret == -i32::from(lx::ENOENT) {
cancelled = Some(true);
} else if ret == -i32::from(lx::EALREADY) {
cancelled = Some(false);
} else {
panic!("invalid return value for cancellation operation");
}
} else if cqe.user_data != pending_state as u64 {
Reactor::handle_cqe(this, cqe);
}
}
unsafe {
(*this.get()).io_uring.end_pop();
};
match cancelled {
Some(true) => return Ok(()),
Some(false) => {}
None => panic!("missing completion queue entry for cancellation operation"),
}
let mut done = false;
while !done {
unsafe {
let this = &mut (*this.get());
let to_submit = mem::replace(&mut this.to_submit, 0);
this.io_uring.get_events(to_submit, 1)?;
this.io_uring.start_pop();
}
while let Some(cqe) = unsafe { (*this.get()).io_uring.pop() } {
if cqe.user_data == pending_state as u64 {
done = true;
} else {
Reactor::handle_cqe(this, cqe);
}
}
unsafe {
(*this.get()).io_uring.end_pop();
};
}
Ok(())
}
pub(crate) fn react(this: &UnsafeCell<Reactor>) -> usize {
let mut count = 0;
unsafe {
(*this.get()).io_uring.start_pop();
}
while let Some(cqe) = unsafe { (*this.get()).io_uring.pop() } {
Reactor::handle_cqe(this, cqe);
count += 1;
}
unsafe {
(*this.get()).io_uring.end_pop();
}
count
}
pub(crate) fn enter(this: &UnsafeCell<Reactor>, min_complete: u32) -> lx::Result<()> {
unsafe {
let this = &mut (*this.get());
this.io_uring.end_push();
let to_submit = mem::replace(&mut this.to_submit, 0);
this.io_uring.get_events(to_submit, min_complete)?;
}
Ok(())
}
pub(crate) fn run(this: &UnsafeCell<Reactor>) -> lx::Result<()> {
loop {
while Reactor::react(this) > 0 {}
if unsafe { (*this.get()).tasks == 0 } {
break;
}
Reactor::enter(this, 1)?;
}
Ok(())
}
fn handle_cqe(this: &UnsafeCell<Reactor>, cqe: &lx::io_uring_cqe) {
let state = unsafe {
ptr::replace(
cqe.user_data as *mut pending_state::Packed,
pending_state::Packed::from(pending_state::Unpacked::Completed { ret: cqe.ret }),
)
};
let task = match pending_state::Unpacked::from(state) {
pending_state::Unpacked::Submitted { task, .. } => task,
_ => panic!("unexpected operation state"),
};
unsafe {
Reactor::poll_task(this, task);
}
}
unsafe fn poll_task(this: &UnsafeCell<Reactor>, task: NonNull<TaskHeader>) {
let old = mem::replace(unsafe { &mut *TASK.get() }, Some(task));
let waker = Waker::noop();
let mut cx = Context::from_waker(&waker);
let poll = TaskHeader::poll(task, &mut cx);
unsafe {
*TASK.get() = old;
}
match poll {
Poll::Ready(()) => {
(*this.get()).tasks -= 1;
}
Poll::Pending => {}
}
TaskHeader::decrement_ref_count(task);
}
pub(crate) fn spawn<F: Future<Output = ()> + 'static>(this: &UnsafeCell<Reactor>, future: F) {
unsafe { Self::spawn_unchecked(this, future) }
}
pub(crate) unsafe fn spawn_unchecked<F: Future<Output = ()>>(
this: &UnsafeCell<Reactor>,
future: F,
) {
unsafe {
(*this.get()).tasks += 1;
}
let ptr = Task::new(future);
unsafe { Reactor::poll_task(this, Task::header(ptr)) }
}
}