#![cfg_attr(docsrs, feature(doc_cfg))]
#![cfg_attr(feature = "current_thread_id", feature(current_thread_id))]
#![allow(unused_features)]
#![warn(missing_docs)]
#![deny(rustdoc::broken_intra_doc_links)]
#![doc(
html_logo_url = "https://github.com/compio-rs/compio-logo/raw/refs/heads/master/generated/colored-bold.svg"
)]
#![doc(
html_favicon_url = "https://github.com/compio-rs/compio-logo/raw/refs/heads/master/generated/colored-bold.svg"
)]
mod affinity;
mod attacher;
mod cancel;
mod future;
mod waker;
pub mod fd;
#[cfg(feature = "time")]
pub mod time;
use std::{
cell::RefCell,
collections::HashSet,
fmt::Debug,
future::Future,
io,
ops::Deref,
rc::Rc,
sync::Arc,
task::{Context, Poll, Waker},
time::Duration,
};
use compio_buf::{BufResult, IntoInner};
use compio_driver::{
AsRawFd, Cancel, DriverType, Extra, Key, OpCode, Proactor, ProactorBuilder, PushEntry, RawFd,
op::Asyncify,
};
pub use compio_driver::{BufferPool, ErrorExt};
use compio_executor::{Executor, ExecutorConfig};
pub use compio_executor::{JoinHandle, ResumeUnwind};
use compio_log::{debug, instrument};
use crate::affinity::bind_to_cpu_set;
#[cfg(feature = "time")]
use crate::time::{TimerFuture, TimerKey, TimerRuntime};
pub use crate::{attacher::*, cancel::CancelToken, future::*, waker::OptWaker};
// Scoped thread-local slot for the runtime that is currently entered on this
// thread. `Runtime::enter` sets it for the duration of a closure; the
// `current`/`with_current` family of accessors reads it.
scoped_tls::scoped_thread_local!(static CURRENT_RUNTIME: Runtime);
/// Panics with the message used whenever a runtime accessor is called while
/// no runtime has been entered on the current thread.
///
/// Marked `#[cold]` because it is only reached on the failure path of
/// `Runtime::with_current` and `Runtime::current`.
#[cold]
fn not_in_compio_runtime() -> ! {
panic!("not in a compio runtime")
}
/// Shared state behind a [`Runtime`] handle.
///
/// [`Runtime`] stores this inside an `Rc` and dereferences to it.
pub struct RuntimeInner {
/// Task executor used by `spawn` and ticked by `run`.
executor: Executor,
/// The proactor (I/O driver). Wrapped in a `RefCell` because it is mutated
/// through shared `&self` references.
driver: RefCell<Proactor>,
/// Timer bookkeeping; only compiled in with the `time` feature.
#[cfg(feature = "time")]
timer_runtime: RefCell<TimerRuntime>,
}
/// Handle to a runtime.
///
/// The handle is an `Rc` around [`RuntimeInner`], so cloning it only bumps a
/// reference count and every clone refers to the same executor and driver.
#[derive(Clone)]
pub struct Runtime(Rc<RuntimeInner>);
impl Debug for Runtime {
    /// Formats the runtime for diagnostics.
    ///
    /// Only the executor is printed in full. The driver and, with the `time`
    /// feature, the timer runtime are elided as `"..."`. The listed fields
    /// mirror the fields that `RuntimeInner` actually has.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut s = f.debug_struct("Runtime");
        s.field("executor", &self.0.executor)
            .field("driver", &"...");
        #[cfg(feature = "time")]
        s.field("timer_runtime", &"...");
        s.finish()
    }
}
/// Lets a [`Runtime`] handle be used wherever a `&RuntimeInner` is needed, so
/// methods can reach `self.driver`, `self.executor`, etc. directly.
impl Deref for Runtime {
type Target = RuntimeInner;
/// Returns the shared inner state behind the `Rc`.
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl Runtime {
/// Creates a runtime using the default [`RuntimeBuilder`] settings.
///
/// Equivalent to `Runtime::builder().build()`.
///
/// # Errors
///
/// Propagates any error returned by [`RuntimeBuilder::build`].
pub fn new() -> io::Result<Self> {
Self::builder().build()
}
/// Returns a fresh [`RuntimeBuilder`] for configuring a runtime before
/// building it.
pub fn builder() -> RuntimeBuilder {
RuntimeBuilder::new()
}
/// Returns the `DriverType` reported by the underlying driver.
pub fn driver_type(&self) -> DriverType {
self.driver.borrow().driver_type()
}
/// Runs `f` with the runtime currently entered on this thread, if any.
///
/// Returns `Ok` with the closure's result when a runtime is set. Otherwise
/// the closure is handed back untouched inside `Err` so the caller can
/// reuse it.
pub fn try_with_current<T, F: FnOnce(&Self) -> T>(f: F) -> Result<T, F> {
    if !CURRENT_RUNTIME.is_set() {
        return Err(f);
    }
    Ok(CURRENT_RUNTIME.with(f))
}
/// Runs `f` with the runtime currently entered on this thread and returns
/// its result.
///
/// # Panics
///
/// Panics with `"not in a compio runtime"` when no runtime is set.
pub fn with_current<T, F: FnOnce(&Self) -> T>(f: F) -> T {
    if !CURRENT_RUNTIME.is_set() {
        not_in_compio_runtime();
    }
    CURRENT_RUNTIME.with(f)
}
/// Returns a clone of the runtime handle currently entered on this thread,
/// or `None` when no runtime is set.
pub fn try_current() -> Option<Self> {
    CURRENT_RUNTIME
        .is_set()
        .then(|| CURRENT_RUNTIME.with(|rt| rt.clone()))
}
/// Returns a clone of the runtime handle currently entered on this thread.
///
/// # Panics
///
/// Panics with `"not in a compio runtime"` when no runtime is set.
pub fn current() -> Self {
    if !CURRENT_RUNTIME.is_set() {
        not_in_compio_runtime();
    }
    CURRENT_RUNTIME.with(|rt| rt.clone())
}
/// Runs `f` with this runtime installed as the current runtime of the
/// thread and returns whatever `f` returns.
///
/// While `f` executes, [`Runtime::current`], [`Runtime::with_current`] and
/// the related accessors resolve to `self`.
pub fn enter<T, F: FnOnce() -> T>(&self, f: F) -> T {
CURRENT_RUNTIME.set(self, f)
}
/// Ticks the executor once and returns the executor's result.
///
/// [`Runtime::block_on`] treats a `true` result as "work is still queued"
/// and then polls the driver with a zero timeout instead of blocking.
pub fn run(&self) -> bool {
self.executor.tick()
}
/// Returns a `Waker` obtained from the driver.
///
/// NOTE(review): presumably waking it interrupts a blocking driver poll;
/// confirm against the driver crate.
pub fn waker(&self) -> Waker {
self.driver.borrow().waker()
}
/// Wraps the driver waker from [`Runtime::waker`] in a new `OptWaker`.
pub fn opt_waker(&self) -> Arc<OptWaker> {
OptWaker::new(self.waker())
}
/// Drives `future` to completion on the current thread and returns its
/// output.
///
/// The runtime is entered for the whole call. Each iteration polls the
/// future; if it is still pending, the executor is ticked and the driver
/// is polled. The driver poll is non-blocking (zero timeout) when either
/// the executor tick or the waker reset reported `true`, and otherwise
/// uses the timeout chosen by [`Runtime::poll`].
pub fn block_on<F: Future>(&self, future: F) -> F::Output {
    self.enter(|| {
        let opt_waker = self.opt_waker();
        let waker = Waker::from(opt_waker.clone());
        let mut cx = Context::from_waker(&waker);
        let mut fut = std::pin::pin!(future);
        loop {
            match fut.as_mut().poll(&mut cx) {
                Poll::Ready(output) => {
                    // Give the executor one more tick before handing back
                    // the result.
                    self.run();
                    break output;
                }
                Poll::Pending => {
                    // Both calls must always run, in this order; neither
                    // may be short-circuited away.
                    let ticked = self.run();
                    let woken = opt_waker.reset();
                    if ticked | woken {
                        self.poll_with(Some(Duration::ZERO));
                    } else {
                        self.poll();
                    }
                }
            }
        }
    })
}
/// Spawns `future` onto this runtime's executor and returns the
/// `JoinHandle` the executor hands back.
pub fn spawn<F: Future + 'static>(&self, future: F) -> JoinHandle<F::Output> {
self.0.executor.spawn(future)
}
/// Runs the blocking closure `f` through the driver and returns a
/// `JoinHandle` for its result.
///
/// The closure is wrapped in an `Asyncify` operation and submitted to the
/// driver. A task spawned on the executor awaits that submission and
/// extracts the closure's return value from the completed operation.
pub fn spawn_blocking<T: Send + 'static>(
    &self,
    f: impl (FnOnce() -> T) + Send + 'static,
) -> JoinHandle<T> {
    // The operation always reports `Ok(0)`; the interesting payload is the
    // closure's return value.
    let op = Asyncify::new(move || BufResult(Ok(0), f()));
    let submit = self.submit(op);
    self.spawn(async move {
        let completed = submit.await;
        completed.1.into_inner()
    })
}
/// Attaches the raw file descriptor `fd` to the driver.
///
/// # Errors
///
/// Returns the error reported by the driver's `attach`.
pub fn attach(&self, fd: RawFd) -> io::Result<()> {
self.driver.borrow_mut().attach(fd)
}
/// Pushes `op` to the driver and returns the driver's `PushEntry`.
///
/// When `extra` is `Some`, the operation is pushed with that extra data
/// through `push_with_extra`; otherwise the plain `push` is used.
fn submit_raw<T: OpCode + 'static>(
    &self,
    op: T,
    extra: Option<Extra>,
) -> PushEntry<Key<T>, BufResult<usize, T>> {
    let mut driver = self.driver.borrow_mut();
    if let Some(extra) = extra {
        driver.push_with_extra(op, extra)
    } else {
        driver.push(op)
    }
}
/// Returns the driver's default `Extra` value.
fn default_extra(&self) -> Extra {
self.driver.borrow().default_extra()
}
/// Creates a `Submit` for `op`, bound to a clone of this runtime handle.
///
/// NOTE(review): when the operation is actually pushed to the driver is
/// decided by `Submit` and is not visible here.
pub fn submit<T: OpCode + 'static>(&self, op: T) -> Submit<T> {
Submit::new(self.clone(), op)
}
/// Creates a `SubmitMulti` for `op`, bound to a clone of this runtime
/// handle.
///
/// NOTE(review): presumably intended for multishot operations that yield
/// several results; confirm against `SubmitMulti`.
pub fn submit_multi<T: OpCode + 'static>(&self, op: T) -> SubmitMulti<T> {
SubmitMulti::new(self.clone(), op)
}
/// Asks the driver to cancel the operation identified by `key`, consuming
/// the key.
pub(crate) fn cancel<T: OpCode>(&self, key: Key<T>) {
self.driver.borrow_mut().cancel(key);
}
/// Registers `key` with the driver for cancellation and returns the
/// driver's `Cancel` token, which can later be passed to `cancel_token`.
pub(crate) fn register_cancel<T: OpCode>(&self, key: &Key<T>) -> Cancel {
self.driver.borrow_mut().register_cancel(key)
}
/// Forwards `token` to the driver's `cancel_token` and returns its result.
///
/// NOTE(review): the meaning of the returned `bool` is defined by the
/// driver and is not visible here.
pub(crate) fn cancel_token(&self, token: Cancel) -> bool {
self.driver.borrow_mut().cancel_token(token)
}
/// Cancels the timer identified by `key` in the timer runtime.
///
/// Only available with the `time` feature.
#[cfg(feature = "time")]
pub(crate) fn cancel_timer(&self, key: &TimerKey) {
self.timer_runtime.borrow_mut().cancel(key);
}
/// Tries to take the result of the operation identified by `key`.
///
/// Pops `key` from the driver. If the entry is still pending, `waker` is
/// registered for it through `update_waker` and the key is handed back
/// inside the returned `PushEntry`.
pub(crate) fn poll_task<T: OpCode>(
    &self,
    waker: &Waker,
    key: Key<T>,
) -> PushEntry<Key<T>, BufResult<usize, T>> {
    instrument!(compio_log::Level::DEBUG, "poll_task", ?key);
    let mut driver = self.driver.borrow_mut();
    let entry = driver.pop(key);
    entry.map_pending(|pending| {
        driver.update_waker(&pending, waker);
        pending
    })
}
/// Like `poll_task`, but a finished entry also carries the operation's
/// `Extra` data.
///
/// Pops `key` through the driver's `pop_with_extra`. If the entry is still
/// pending, `waker` is registered for it and the key is handed back inside
/// the returned `PushEntry`.
pub(crate) fn poll_task_with_extra<T: OpCode>(
    &self,
    waker: &Waker,
    key: Key<T>,
) -> PushEntry<Key<T>, (BufResult<usize, T>, Extra)> {
    instrument!(compio_log::Level::DEBUG, "poll_task_with_extra", ?key);
    let mut driver = self.driver.borrow_mut();
    let entry = driver.pop_with_extra(key);
    entry.map_pending(|pending| {
        driver.update_waker(&pending, waker);
        pending
    })
}
/// Takes the next available result of the multishot operation `key`.
///
/// Returns `Some` when the driver's `pop_multishot` has a result. When it
/// has none, `waker` is registered for `key` and `None` is returned.
pub(crate) fn poll_multishot<T: OpCode>(
    &self,
    waker: &Waker,
    key: &Key<T>,
) -> Option<BufResult<usize, Extra>> {
    instrument!(compio_log::Level::DEBUG, "poll_multishot", ?key);
    let mut driver = self.driver.borrow_mut();
    let next = driver.pop_multishot(key);
    if next.is_none() {
        // Nothing ready yet: make sure the caller is woken for the next result.
        driver.update_waker(key, waker);
    }
    next
}
/// Polls the timer identified by `key`.
///
/// Returns `Poll::Ready(())` once the timer runtime reports the timer as
/// completed. Otherwise the waker from `cx` is stored for the timer and
/// `Poll::Pending` is returned.
///
/// Only available with the `time` feature.
#[cfg(feature = "time")]
pub(crate) fn poll_timer(&self, cx: &mut Context, key: &TimerKey) -> Poll<()> {
    instrument!(compio_log::Level::DEBUG, "poll_timer", ?cx, ?key);
    let mut timers = self.timer_runtime.borrow_mut();
    if !timers.is_completed(key) {
        debug!("pending");
        timers.update_waker(key, cx.waker());
        return Poll::Pending;
    }
    debug!("ready");
    Poll::Ready(())
}
/// Returns the timeout that [`Runtime::poll`] passes to the driver.
///
/// With the `time` feature this is the timer runtime's `min_timeout()`;
/// without it the result is always `None`.
pub fn current_timeout(&self) -> Option<Duration> {
// Exactly one of the two cfg-gated bindings below is compiled in.
#[cfg(not(feature = "time"))]
let timeout = None;
#[cfg(feature = "time")]
let timeout = self.timer_runtime.borrow().min_timeout();
timeout
}
/// Polls the driver once, using [`Runtime::current_timeout`] as the
/// timeout.
///
/// See [`Runtime::poll_with`] for error handling and timer wake-up.
pub fn poll(&self) {
instrument!(compio_log::Level::DEBUG, "poll");
let timeout = self.current_timeout();
debug!("timeout: {:?}", timeout);
self.poll_with(timeout)
}
/// Polls the driver once with an explicit `timeout`, then wakes timers.
///
/// `TimedOut` and `Interrupted` errors from the driver are expected and
/// only logged. With the `time` feature enabled, the timer runtime's
/// `wake()` is called afterwards.
///
/// # Panics
///
/// Panics if the driver reports any other error kind.
pub fn poll_with(&self, timeout: Option<Duration>) {
    instrument!(compio_log::Level::DEBUG, "poll_with");
    // Confine the mutable driver borrow to the poll call itself. Timer
    // wakers run below, and a waker that touches the driver must not find
    // the `RefCell` still mutably borrowed.
    let polled = self.driver.borrow_mut().poll(timeout);
    if let Err(e) = polled {
        match e.kind() {
            io::ErrorKind::TimedOut | io::ErrorKind::Interrupted => {
                debug!("expected error: {e}");
            }
            _ => panic!("{e:?}"),
        }
    }
    #[cfg(feature = "time")]
    self.timer_runtime.borrow_mut().wake();
}
/// Returns the `BufferPool` provided by the driver.
///
/// # Errors
///
/// Returns the error reported by the driver's `buffer_pool`.
pub fn buffer_pool(&self) -> io::Result<BufferPool> {
self.driver.borrow_mut().buffer_pool()
}
/// Registers the file descriptors `fds` with the driver.
///
/// NOTE(review): support is presumably driver/platform dependent; confirm
/// against the driver crate.
///
/// # Errors
///
/// Returns the error reported by the driver's `register_files`.
pub fn register_files(&self, fds: &[RawFd]) -> io::Result<()> {
self.driver.borrow_mut().register_files(fds)
}
/// Asks the driver to unregister the files previously registered with it.
///
/// # Errors
///
/// Returns the error reported by the driver's `unregister_files`.
pub fn unregister_files(&self) -> io::Result<()> {
self.driver.borrow_mut().unregister_files()
}
/// Registers a personality with the driver and returns its `u16`
/// identifier, which can later be passed to
/// [`Runtime::unregister_personality`].
///
/// NOTE(review): personalities look like an io_uring concept; confirm
/// behavior on other drivers.
///
/// # Errors
///
/// Returns the error reported by the driver's `register_personality`.
pub fn register_personality(&self) -> io::Result<u16> {
self.driver.borrow_mut().register_personality()
}
/// Unregisters the personality identified by `personality` from the
/// driver.
///
/// # Errors
///
/// Returns the error reported by the driver's `unregister_personality`.
pub fn unregister_personality(&self, personality: u16) -> io::Result<()> {
self.driver.borrow_mut().unregister_personality(personality)
}
}
impl Drop for Runtime {
    /// Clears the executor when the last handle to the runtime goes away.
    ///
    /// Handles that still share the inner state with other clones do
    /// nothing. The final handle enters the runtime while clearing the
    /// executor, presumably so that tasks being dropped can still resolve
    /// the current runtime.
    fn drop(&mut self) {
        let is_last_handle = Rc::strong_count(&self.0) <= 1;
        if is_last_handle {
            self.enter(|| {
                self.executor.clear();
            });
        }
    }
}
/// Exposes the raw file descriptor of the underlying driver.
impl AsRawFd for Runtime {
/// Returns the driver's raw file descriptor.
fn as_raw_fd(&self) -> RawFd {
self.driver.borrow().as_raw_fd()
}
}
/// Lets Criterion run async benchmarks on an owned [`Runtime`].
///
/// Only compiled with the `criterion` feature.
#[cfg(feature = "criterion")]
impl criterion::async_executor::AsyncExecutor for Runtime {
/// Forwards to the inherent `Runtime::block_on`.
fn block_on<T>(&self, future: impl Future<Output = T>) -> T {
self.block_on(future)
}
}
/// Lets Criterion run async benchmarks on a borrowed [`Runtime`].
///
/// Only compiled with the `criterion` feature.
#[cfg(feature = "criterion")]
impl criterion::async_executor::AsyncExecutor for &Runtime {
/// Dereferences down to the `Runtime` and forwards to its inherent
/// `block_on`.
fn block_on<T>(&self, future: impl Future<Output = T>) -> T {
(**self).block_on(future)
}
}
/// Builder for [`Runtime`].
///
/// Collects the driver and executor settings that [`RuntimeBuilder::build`]
/// uses to construct a runtime.
#[derive(Debug, Clone)]
pub struct RuntimeBuilder {
/// Builder for the proactor (I/O driver).
proactor_builder: ProactorBuilder,
/// CPU set handed to `bind_to_cpu_set` by `build`; ignored when empty.
thread_affinity: HashSet<usize>,
/// Passed to the executor config as `sync_queue_size`.
sync_queue_size: usize,
/// Passed to the executor config as `local_queue_size`.
local_queue_size: usize,
/// Passed to the executor config as `max_interval`.
event_interval: u32,
}
/// The default builder is the same as [`RuntimeBuilder::new`].
impl Default for RuntimeBuilder {
/// Returns `RuntimeBuilder::new()`.
fn default() -> Self {
Self::new()
}
}
impl RuntimeBuilder {
/// Creates a builder with the default settings: a default
/// `ProactorBuilder`, an event interval of 61, sync and local queue sizes
/// of 64, and no thread affinity.
pub fn new() -> Self {
Self {
proactor_builder: ProactorBuilder::new(),
event_interval: 61,
sync_queue_size: 64,
local_queue_size: 64,
thread_affinity: HashSet::new(),
}
}
/// Replaces the `ProactorBuilder` used to create the driver.
///
/// Returns `&mut Self` so calls can be chained.
pub fn with_proactor(&mut self, builder: ProactorBuilder) -> &mut Self {
self.proactor_builder = builder;
self
}
/// Sets the CPU set used for thread affinity.
///
/// [`RuntimeBuilder::build`] passes a non-empty set to `bind_to_cpu_set`
/// on the thread that calls it; an empty set (the default) skips that
/// step. Returns `&mut Self` so calls can be chained.
pub fn thread_affinity(&mut self, cpus: HashSet<usize>) -> &mut Self {
self.thread_affinity = cpus;
self
}
/// Sets the value handed to the executor config as `max_interval`
/// (default: 61).
///
/// NOTE(review): presumably the number of executor ticks between checks
/// for external events; confirm against the executor crate.
///
/// The setting is stored as a `u32`. Values above `u32::MAX` saturate to
/// `u32::MAX` rather than wrapping around (a plain `as` cast would turn
/// `1 << 32` into `0`). Returns `&mut Self` so calls can be chained.
pub fn event_interval(&mut self, val: usize) -> &mut Self {
    // Clamp first so the narrowing cast below can never truncate.
    self.event_interval = val.min(u32::MAX as usize) as u32;
    self
}
/// Sets the value handed to the executor config as `sync_queue_size`
/// (default: 64).
///
/// Returns `&mut Self` so calls can be chained.
pub fn sync_queue_size(&mut self, val: usize) -> &mut Self {
self.sync_queue_size = val;
self
}
/// Sets the value handed to the executor config as `local_queue_size`
/// (default: 64).
///
/// Returns `&mut Self` so calls can be chained.
pub fn local_queue_size(&mut self, val: usize) -> &mut Self {
self.local_queue_size = val;
self
}
pub fn build(&self) -> io::Result<Runtime> {
let RuntimeBuilder {
proactor_builder,
thread_affinity,
sync_queue_size,
local_queue_size,
event_interval,
} = self;
if !thread_affinity.is_empty() {
bind_to_cpu_set(thread_affinity);
}
let driver = proactor_builder.build()?;
let executor = Executor::with_config(ExecutorConfig {
max_interval: *event_interval,
sync_queue_size: *sync_queue_size,
local_queue_size: *local_queue_size,
waker: Some(driver.waker()),
});
let inner = RuntimeInner {
executor,
driver: RefCell::new(driver),
#[cfg(feature = "time")]
timer_runtime: RefCell::new(TimerRuntime::new()),
};
Ok(Runtime(Rc::new(inner)))
}
}
/// Spawns `future` on the current runtime; see [`Runtime::spawn`].
///
/// # Panics
///
/// Panics if called outside a runtime.
pub fn spawn<F: Future + 'static>(future: F) -> JoinHandle<F::Output> {
Runtime::with_current(|r| r.spawn(future))
}
/// Runs the blocking closure `f` through the current runtime; see
/// [`Runtime::spawn_blocking`].
///
/// # Panics
///
/// Panics if called outside a runtime.
pub fn spawn_blocking<T: Send + 'static>(
f: impl (FnOnce() -> T) + Send + 'static,
) -> JoinHandle<T> {
Runtime::with_current(|r| r.spawn_blocking(f))
}
/// Creates a `Submit` for `op` on the current runtime; see
/// [`Runtime::submit`].
///
/// # Panics
///
/// Panics if called outside a runtime.
pub fn submit<T: OpCode + 'static>(op: T) -> Submit<T> {
Runtime::with_current(|r| r.submit(op))
}
/// Creates a `SubmitMulti` for `op` on the current runtime; see
/// [`Runtime::submit_multi`].
///
/// # Panics
///
/// Panics if called outside a runtime.
pub fn submit_multi<T: OpCode + 'static>(op: T) -> SubmitMulti<T> {
Runtime::with_current(|r| r.submit_multi(op))
}
/// Registers `fds` with the current runtime's driver; see
/// [`Runtime::register_files`].
///
/// # Errors
///
/// Returns the error reported by the driver.
///
/// # Panics
///
/// Panics if called outside a runtime.
pub fn register_files(fds: &[RawFd]) -> io::Result<()> {
Runtime::with_current(|r| r.register_files(fds))
}
/// Unregisters files from the current runtime's driver; see
/// [`Runtime::unregister_files`].
///
/// # Errors
///
/// Returns the error reported by the driver.
///
/// # Panics
///
/// Panics if called outside a runtime.
pub fn unregister_files() -> io::Result<()> {
Runtime::with_current(|r| r.unregister_files())
}
/// Waits on a timer registered for `instant` in the current runtime.
///
/// The instant is inserted into the timer runtime. If that yields a key,
/// the matching `TimerFuture` is awaited; if it yields `None`, the function
/// returns immediately (presumably because the instant has already
/// passed; confirm against `TimerRuntime::insert`).
///
/// # Panics
///
/// Panics if called outside a runtime.
#[cfg(feature = "time")]
pub(crate) async fn create_timer(instant: std::time::Instant) {
    // The `RefCell` borrow lives only inside the closure, so it is released
    // before the await point below.
    let Some(key) = Runtime::with_current(|rt| rt.timer_runtime.borrow_mut().insert(instant))
    else {
        return;
    };
    TimerFuture::new(key).await
}