use std::{
cell::RefCell,
future::{ready, Future},
io,
rc::{Rc, Weak},
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
task::{Context, Poll},
};
use async_task::{Runnable, Task};
use compio_driver::{AsRawFd, Entry, OpCode, Proactor, ProactorBuilder, PushEntry, RawFd};
use compio_log::{debug, instrument};
use crossbeam_queue::SegQueue;
use futures_util::future::Either;
use smallvec::SmallVec;
pub(crate) mod op;
#[cfg(feature = "time")]
pub(crate) mod time;
#[cfg(feature = "time")]
use crate::runtime::time::{TimerFuture, TimerRuntime};
use crate::{
runtime::op::{OpFuture, OpRuntime},
BufResult, Key,
};
static RUNTIME_COUNTER: AtomicUsize = AtomicUsize::new(0);
pub(crate) struct RuntimeInner {
id: usize,
driver: RefCell<Proactor>,
runnables: Arc<SegQueue<Runnable>>,
op_runtime: RefCell<OpRuntime>,
#[cfg(feature = "time")]
timer_runtime: RefCell<TimerRuntime>,
}
impl RuntimeInner {
pub fn new(builder: &ProactorBuilder) -> io::Result<Self> {
Ok(Self {
id: RUNTIME_COUNTER.fetch_add(1, Ordering::AcqRel),
driver: RefCell::new(builder.build()?),
runnables: Arc::new(SegQueue::new()),
op_runtime: RefCell::default(),
#[cfg(feature = "time")]
timer_runtime: RefCell::new(TimerRuntime::new()),
})
}
pub fn id(&self) -> usize {
self.id
}
#[cfg(all(windows, feature = "event"))]
pub unsafe fn handle_for(&self, user_data: usize) -> io::Result<compio_driver::NotifyHandle> {
self.driver.borrow().handle_for(user_data)
}
unsafe fn spawn_unchecked<F: Future>(&self, future: F) -> Task<F::Output> {
let runnables = self.runnables.clone();
let handle = self
.driver
.borrow()
.handle()
.expect("cannot create notify handle of the proactor");
let schedule = move |runnable| {
runnables.push(runnable);
handle.notify().ok();
};
let (runnable, task) = async_task::spawn_unchecked(future, schedule);
runnable.schedule();
task
}
pub fn block_on<F: Future>(&self, future: F) -> F::Output {
let mut result = None;
unsafe { self.spawn_unchecked(async { result = Some(future.await) }) }.detach();
loop {
loop {
let next_task = self.runnables.pop();
if let Some(task) = next_task {
task.run();
} else {
break;
}
}
if let Some(result) = result.take() {
return result;
}
self.poll();
}
}
pub fn spawn<F: Future + 'static>(&self, future: F) -> Task<F::Output> {
unsafe { self.spawn_unchecked(future) }
}
pub fn attach(&self, fd: RawFd) -> io::Result<()> {
self.driver.borrow_mut().attach(fd)
}
pub fn submit_raw<T: OpCode + 'static>(&self, op: T) -> PushEntry<Key<T>, BufResult<usize, T>> {
self.driver
.borrow_mut()
.push(op)
.map_pending(|user_data| unsafe { Key::<T>::new(user_data) })
}
pub fn submit<T: OpCode + 'static>(&self, op: T) -> impl Future<Output = BufResult<usize, T>> {
match self.submit_raw(op) {
PushEntry::Pending(user_data) => Either::Left(OpFuture::new(user_data)),
PushEntry::Ready(res) => Either::Right(ready(res)),
}
}
#[cfg(feature = "time")]
pub fn create_timer(&self, delay: std::time::Duration) -> impl Future<Output = ()> {
let mut timer_runtime = self.timer_runtime.borrow_mut();
if let Some(key) = timer_runtime.insert(delay) {
Either::Left(TimerFuture::new(key))
} else {
Either::Right(std::future::ready(()))
}
}
pub fn cancel_op<T>(&self, user_data: Key<T>) {
self.driver.borrow_mut().cancel(*user_data);
self.op_runtime.borrow_mut().cancel(*user_data);
}
#[cfg(feature = "time")]
pub fn cancel_timer(&self, key: usize) {
self.timer_runtime.borrow_mut().cancel(key);
}
pub fn poll_task<T: OpCode>(
&self,
cx: &mut Context,
user_data: Key<T>,
) -> Poll<BufResult<usize, T>> {
instrument!(compio_log::Level::DEBUG, "poll_task", ?user_data,);
let mut op_runtime = self.op_runtime.borrow_mut();
if op_runtime.has_result(*user_data) {
debug!("has result");
let op = op_runtime.remove(*user_data);
let res = self
.driver
.borrow_mut()
.pop(&mut op.entry.into_iter())
.next()
.expect("the result should have come");
Poll::Ready(res.map_buffer(|op| unsafe { op.into_op::<T>() }))
} else {
debug!("update waker");
op_runtime.update_waker(*user_data, cx.waker().clone());
Poll::Pending
}
}
#[cfg(feature = "time")]
pub fn poll_timer(&self, cx: &mut Context, key: usize) -> Poll<()> {
instrument!(compio_log::Level::DEBUG, "poll_timer", ?cx, ?key);
let mut timer_runtime = self.timer_runtime.borrow_mut();
if timer_runtime.contains(key) {
debug!("pending");
timer_runtime.update_waker(key, cx.waker().clone());
Poll::Pending
} else {
debug!("ready");
Poll::Ready(())
}
}
fn poll(&self) {
instrument!(compio_log::Level::DEBUG, "poll");
#[cfg(not(feature = "time"))]
let timeout = None;
#[cfg(feature = "time")]
let timeout = self.timer_runtime.borrow().min_timeout();
debug!("timeout: {:?}", timeout);
let mut entries = SmallVec::<[Entry; 1024]>::new();
let mut driver = self.driver.borrow_mut();
match driver.poll(timeout, &mut entries) {
Ok(_) => {
debug!("poll driver ok, entries: {}", entries.len());
for entry in entries {
self.op_runtime
.borrow_mut()
.update_result(entry.user_data(), entry);
}
}
Err(e) => match e.kind() {
io::ErrorKind::TimedOut | io::ErrorKind::Interrupted => {
debug!("expected error: {e}");
}
_ => panic!("{:?}", e),
},
}
#[cfg(feature = "time")]
self.timer_runtime.borrow_mut().wake();
}
}
impl AsRawFd for RuntimeInner {
fn as_raw_fd(&self) -> RawFd {
self.driver.borrow().as_raw_fd()
}
}
struct RuntimeContext {
depth: usize,
ptr: Weak<RuntimeInner>,
}
impl RuntimeContext {
pub fn new() -> Self {
Self {
depth: 0,
ptr: Weak::new(),
}
}
pub fn inc_depth(&mut self) -> usize {
let depth = self.depth;
self.depth += 1;
depth
}
pub fn dec_depth(&mut self) -> usize {
self.depth -= 1;
self.depth
}
pub fn set_runtime(&mut self, ptr: Weak<RuntimeInner>) -> Weak<RuntimeInner> {
std::mem::replace(&mut self.ptr, ptr)
}
pub fn upgrade_runtime(&self) -> Option<Runtime> {
self.ptr.upgrade().map(|inner| Runtime { inner })
}
}
thread_local! {
static CURRENT_RUNTIME: RefCell<RuntimeContext> = RefCell::new(RuntimeContext::new());
}
#[derive(Clone)]
pub struct Runtime {
inner: Rc<RuntimeInner>,
}
impl Runtime {
pub fn new() -> io::Result<Self> {
Self::builder().build()
}
pub fn builder() -> RuntimeBuilder {
RuntimeBuilder::new()
}
pub fn try_current() -> Option<Self> {
CURRENT_RUNTIME.with_borrow(|r| r.upgrade_runtime())
}
pub fn current() -> Self {
Self::try_current().expect("not in a compio runtime")
}
pub(crate) fn inner(&self) -> &RuntimeInner {
&self.inner
}
pub fn enter(&self) -> EnterGuard {
EnterGuard::new(self)
}
pub fn block_on<F: Future>(&self, future: F) -> F::Output {
let guard = self.enter();
guard.block_on(future)
}
pub fn spawn<F: Future + 'static>(&self, future: F) -> Task<F::Output> {
self.inner.spawn(future)
}
pub fn attach(&self, fd: RawFd) -> io::Result<()> {
self.inner.attach(fd)
}
pub fn submit<T: OpCode + 'static>(&self, op: T) -> impl Future<Output = BufResult<usize, T>> {
self.inner.submit(op)
}
}
impl AsRawFd for Runtime {
fn as_raw_fd(&self) -> RawFd {
self.inner.as_raw_fd()
}
}
#[cfg(feature = "criterion")]
impl criterion::async_executor::AsyncExecutor for Runtime {
fn block_on<T>(&self, future: impl Future<Output = T>) -> T {
self.block_on(future)
}
}
#[cfg(feature = "criterion")]
impl criterion::async_executor::AsyncExecutor for &Runtime {
fn block_on<T>(&self, future: impl Future<Output = T>) -> T {
(**self).block_on(future)
}
}
#[derive(Debug, Clone)]
pub struct RuntimeBuilder {
proactor_builder: ProactorBuilder,
}
impl Default for RuntimeBuilder {
fn default() -> Self {
Self::new()
}
}
impl RuntimeBuilder {
pub fn new() -> Self {
Self {
proactor_builder: ProactorBuilder::new(),
}
}
pub fn with_proactor(&mut self, builder: ProactorBuilder) -> &mut Self {
self.proactor_builder = builder;
self
}
pub fn build(&self) -> io::Result<Runtime> {
Ok(Runtime {
inner: Rc::new(RuntimeInner::new(&self.proactor_builder)?),
})
}
}
#[must_use]
pub struct EnterGuard<'a> {
runtime: &'a Runtime,
old_ptr: Weak<RuntimeInner>,
depth: usize,
}
impl<'a> EnterGuard<'a> {
fn new(runtime: &'a Runtime) -> Self {
let (old_ptr, depth) = CURRENT_RUNTIME.with_borrow_mut(|ctx| {
(
ctx.set_runtime(Rc::downgrade(&runtime.inner)),
ctx.inc_depth(),
)
});
Self {
runtime,
old_ptr,
depth,
}
}
pub fn block_on<F: Future>(&self, future: F) -> F::Output {
self.runtime.inner.block_on(future)
}
}
#[cold]
fn panic_incorrect_drop_order() {
if !std::thread::panicking() {
panic!(
"`EnterGuard` values dropped out of order. Guards returned by `Runtime::enter()` must \
be dropped in the reverse order as they were acquired."
)
}
}
impl Drop for EnterGuard<'_> {
fn drop(&mut self) {
let depth = CURRENT_RUNTIME.with_borrow_mut(|ctx| {
ctx.set_runtime(std::mem::take(&mut self.old_ptr));
ctx.dec_depth()
});
if depth != self.depth {
panic_incorrect_drop_order()
}
}
}
pub fn spawn<F: Future + 'static>(future: F) -> Task<F::Output> {
Runtime::current().spawn(future)
}