use std::future::Future;
#[cfg(any(all(target_os = "linux", feature = "iouring"), feature = "legacy"))]
use crate::time::TimeDriver;
#[cfg(all(target_os = "linux", feature = "iouring"))]
use crate::IoUringDriver;
#[cfg(feature = "legacy")]
use crate::LegacyDriver;
use crate::{
driver::Driver,
scheduler::{LocalScheduler, TaskQueue},
task::{
new_task,
waker_fn::{dummy_waker, set_poll, should_poll},
JoinHandle,
},
time::driver::Handle as TimeHandle,
};
#[cfg(feature = "sync")]
thread_local! {
    /// Per-thread `Context` that is not tied to a `Runtime`.
    ///
    /// It carries `DEFAULT_THREAD_ID`, an empty task queue, empty handle
    /// caches, no time handle, and an `Empty` blocking handle configured with
    /// the `Panic` strategy.
    // NOTE(review): presumably used as a fallback when code runs outside of
    // `Runtime::block_on` -- confirm against the call sites.
    pub(crate) static DEFAULT_CTX: Context = Context {
        tasks: TaskQueue::default(),
        thread_id: crate::utils::thread_id::DEFAULT_THREAD_ID,
        unpark_cache: std::cell::RefCell::default(),
        waker_sender_cache: std::cell::RefCell::default(),
        time_handle: None,
        blocking_handle: crate::blocking::BlockingHandle::Empty(
            crate::blocking::BlockingStrategy::Panic,
        ),
    };
}
// Scoped thread-local pointing at the `Context` of the runtime that is
// currently inside `Runtime::block_on` on this thread. `block_on` installs it
// via `CURRENT.set(..)` for the duration of the call and asserts that it is
// not already set, which is how nested runtimes are rejected.
scoped_thread_local!(pub(crate) static CURRENT: Context);
/// Per-runtime state that is reachable through the `CURRENT` scoped
/// thread-local while a runtime is running.
pub(crate) struct Context {
/// Local task queue: `spawn` pushes onto it and `Runtime::block_on` pops
/// tasks from it and runs them.
pub(crate) tasks: TaskQueue,
/// Identifier of the owning thread; `Context::new` reads it from
/// `crate::builder::BUILD_THREAD_ID`.
pub(crate) thread_id: usize,
/// Cache of unpark handles keyed by the id passed to `unpark_thread`,
/// filled lazily on the first lookup for that id.
#[cfg(feature = "sync")]
pub(crate) unpark_cache:
std::cell::RefCell<rustc_hash::FxHashMap<usize, crate::driver::UnparkHandle>>,
/// Cache of waker senders keyed by the id passed to `send_waker`,
/// filled lazily on the first lookup for that id.
#[cfg(feature = "sync")]
pub(crate) waker_sender_cache:
std::cell::RefCell<rustc_hash::FxHashMap<usize, flume::Sender<std::task::Waker>>>,
/// Optional handle to the time driver; the constructors in this file
/// initialise it to `None`.
// NOTE(review): presumably filled in by the runtime builder when the timer
// is enabled -- confirm.
pub(crate) time_handle: Option<TimeHandle>,
/// Handle used for blocking work, supplied by the caller of `Context::new`.
#[cfg(feature = "sync")]
pub(crate) blocking_handle: crate::blocking::BlockingHandle,
}
impl Context {
    /// Creates a context for the thread being built, using the given blocking
    /// handle.
    ///
    /// The thread id is read from `crate::builder::BUILD_THREAD_ID`; the task
    /// queue and both handle caches start empty and no time handle is set.
    #[cfg(feature = "sync")]
    pub(crate) fn new(blocking_handle: crate::blocking::BlockingHandle) -> Self {
        Self {
            thread_id: crate::builder::BUILD_THREAD_ID.with(|id| *id),
            unpark_cache: Default::default(),
            waker_sender_cache: Default::default(),
            tasks: Default::default(),
            time_handle: None,
            blocking_handle,
        }
    }

    /// Creates a context for the thread being built.
    ///
    /// The thread id is read from `crate::builder::BUILD_THREAD_ID`; the task
    /// queue starts empty and no time handle is set.
    #[cfg(not(feature = "sync"))]
    pub(crate) fn new() -> Self {
        Self {
            thread_id: crate::builder::BUILD_THREAD_ID.with(|id| *id),
            tasks: Default::default(),
            time_handle: None,
        }
    }

    /// Unparks the thread registered under `id`, if one is known.
    ///
    /// The local cache is consulted first. On a miss the handle is fetched
    /// with `get_unpark_handle`, a clone is stored in the cache, and the
    /// thread is unparked. Nothing happens when no handle exists for `id`.
    // Not every feature combination calls this helper.
    #[allow(unused)]
    #[cfg(feature = "sync")]
    pub(crate) fn unpark_thread(&self, id: usize) {
        use crate::driver::{thread::get_unpark_handle, unpark::Unpark};

        {
            // Fast path: the handle for this id is already cached.
            let cache = self.unpark_cache.borrow();
            if let Some(cached) = cache.get(&id) {
                cached.unpark();
                return;
            }
        }

        let Some(handle) = get_unpark_handle(id) else {
            return;
        };
        // Remember the handle so later calls take the fast path.
        self.unpark_cache.borrow_mut().insert(id, handle.clone());
        handle.unpark();
    }

    /// Sends waker `w` to the channel registered under `id`, if one is known.
    ///
    /// The local cache is consulted first. On a miss the sender is fetched
    /// with `get_waker_sender`, the waker is sent, and the sender is then
    /// cached. Send errors are ignored, and the waker is simply dropped when
    /// no sender exists for `id`.
    // Not every feature combination calls this helper.
    #[allow(unused)]
    #[cfg(feature = "sync")]
    pub(crate) fn send_waker(&self, id: usize, w: std::task::Waker) {
        use crate::driver::thread::get_waker_sender;

        {
            // Fast path: the sender for this id is already cached.
            let cache = self.waker_sender_cache.borrow();
            if let Some(cached) = cache.get(&id) {
                let _ = cached.send(w);
                return;
            }
        }

        let Some(sender) = get_waker_sender(id) else {
            return;
        };
        let _ = sender.send(w);
        // Remember the sender so later calls take the fast path.
        self.waker_sender_cache.borrow_mut().insert(id, sender);
    }
}
/// A runtime made of a per-thread `Context` and a driver `D`.
///
/// Futures are driven to completion with `block_on`, which requires
/// `D: Driver`.
pub struct Runtime<D> {
/// State installed into the `CURRENT` scoped thread-local during `block_on`.
pub(crate) context: Context,
/// Driver that `block_on` uses to submit and to park.
pub(crate) driver: D,
}
impl<D> Runtime<D> {
/// Builds a runtime from an already constructed context and driver.
pub(crate) fn new(context: Context, driver: D) -> Self {
Self { context, driver }
}
/// Runs `future` to completion on the current thread and returns its output.
///
/// While the future is pending the loop alternates between running queued
/// tasks, polling the main future, and parking on the driver.
///
/// # Panics
///
/// Panics if another runtime is already running on this thread, i.e. when
/// the `CURRENT` scoped thread-local is already set.
pub fn block_on<F>(&mut self, future: F) -> F::Output
where
F: Future,
D: Driver,
{
assert!(
!CURRENT.is_set(),
"Can not start a runtime inside a runtime"
);
// The main future is polled with the waker returned by `dummy_waker`.
// NOTE(review): presumably that waker only flips the flag read by
// `should_poll` -- confirm in `task::waker_fn`.
let waker = dummy_waker();
let cx = &mut std::task::Context::from_waker(&waker);
self.driver.with(|| {
CURRENT.set(&self.context, || {
// With "sync" the future is queued as a task and its `JoinHandle` is what
// gets polled below; without it the future itself is polled directly.
// SAFETY (NOTE(review)): `future` is not required to be 'static. The only
// way out of the loop below is `join` resolving, which appears to be what
// keeps the borrow valid for the task's lifetime -- confirm, including the
// unwinding path.
#[cfg(feature = "sync")]
let join = unsafe { spawn_without_static(future) };
#[cfg(not(feature = "sync"))]
let join = future;
let mut join = std::pin::pin!(join);
// Called once before the loop.
// NOTE(review): presumably arms the flag so the main future gets its
// first poll -- confirm.
set_poll();
loop {
loop {
// Bound the number of tasks run in this round (relative to the queue
// length at the start of the round) so that the main-future check and the
// driver calls below are reached even if tasks keep re-queueing.
let mut max_round = self.context.tasks.len() * 2;
while let Some(t) = self.context.tasks.pop() {
t.run();
if max_round == 0 {
break;
} else {
max_round -= 1;
}
}
// Poll the main future for as long as `should_poll` reports true; return
// as soon as it is ready.
while should_poll() {
if let std::task::Poll::Ready(t) = join.as_mut().poll(cx) {
return t;
}
}
// Nothing left to run: leave the inner loop and park on the driver.
if self.context.tasks.is_empty() {
break;
}
// Tasks are still queued: submit to the driver (result ignored) and go
// around again instead of parking.
let _ = self.driver.submit();
}
// Park on the driver. The error is ignored unless this is a debug build
// with the "debug" feature, in which case it is traced.
#[cfg(not(all(debug_assertions, feature = "debug")))]
let _ = self.driver.park();
#[cfg(all(debug_assertions, feature = "debug"))]
if let Err(e) = self.driver.park() {
trace!("park error: {:?}", e);
}
}
})
})
}
}
/// Runtime that wraps either an io_uring-backed or a legacy-driver-backed
/// `Runtime`.
///
/// This definition is compiled when the "legacy" feature is enabled. The `L`
/// type parameter and the `Uring` variant exist only on Linux with the
/// "iouring" feature; otherwise the enum has the single `Legacy` variant.
#[cfg(feature = "legacy")]
pub enum FusionRuntime<#[cfg(all(target_os = "linux", feature = "iouring"))] L, R> {
/// Runtime running on driver `L`.
#[cfg(all(target_os = "linux", feature = "iouring"))]
Uring(Runtime<L>),
/// Runtime running on driver `R`.
Legacy(Runtime<R>),
}
/// Runtime wrapper for builds on Linux with the "iouring" feature and without
/// the "legacy" feature; only the `Uring` variant exists.
#[cfg(all(target_os = "linux", feature = "iouring", not(feature = "legacy")))]
pub enum FusionRuntime<L> {
/// Runtime running on driver `L`.
Uring(Runtime<L>),
}
#[cfg(all(target_os = "linux", feature = "iouring", feature = "legacy"))]
impl<L, R> FusionRuntime<L, R>
where
    L: Driver,
    R: Driver,
{
    /// Runs `future` to completion on whichever runtime this value wraps and
    /// returns its output.
    ///
    /// An info-level message naming the selected driver is logged before the
    /// call is forwarded to `Runtime::block_on`.
    pub fn block_on<F>(&mut self, future: F) -> F::Output
    where
        F: Future,
    {
        match self {
            Self::Uring(runtime) => {
                info!("Monoio is running with io_uring driver");
                runtime.block_on(future)
            }
            Self::Legacy(runtime) => {
                info!("Monoio is running with legacy driver");
                runtime.block_on(future)
            }
        }
    }
}
#[cfg(all(feature = "legacy", not(all(target_os = "linux", feature = "iouring"))))]
impl<R> FusionRuntime<R>
where
    R: Driver,
{
    /// Runs `future` to completion on the wrapped legacy runtime and returns
    /// its output.
    pub fn block_on<F>(&mut self, future: F) -> F::Output
    where
        F: Future,
    {
        // `Legacy` is the only variant in this configuration, so the pattern
        // is irrefutable.
        let Self::Legacy(runtime) = self;
        runtime.block_on(future)
    }
}
#[cfg(all(not(feature = "legacy"), all(target_os = "linux", feature = "iouring")))]
impl<R> FusionRuntime<R>
where
    R: Driver,
{
    /// Runs `future` to completion on the wrapped io_uring runtime and
    /// returns its output.
    pub fn block_on<F>(&mut self, future: F) -> F::Output
    where
        F: Future,
    {
        // `Uring` is the only variant in this configuration, so the pattern
        // is irrefutable.
        let Self::Uring(runtime) = self;
        runtime.block_on(future)
    }
}
/// Wraps a plain io_uring runtime in the `Uring` variant.
#[cfg(all(target_os = "linux", feature = "iouring", feature = "legacy"))]
impl From<Runtime<IoUringDriver>> for FusionRuntime<IoUringDriver, LegacyDriver> {
    fn from(runtime: Runtime<IoUringDriver>) -> Self {
        FusionRuntime::Uring(runtime)
    }
}
/// Wraps a timer-enabled io_uring runtime in the `Uring` variant.
#[cfg(all(target_os = "linux", feature = "iouring", feature = "legacy"))]
impl From<Runtime<TimeDriver<IoUringDriver>>>
    for FusionRuntime<TimeDriver<IoUringDriver>, TimeDriver<LegacyDriver>>
{
    fn from(runtime: Runtime<TimeDriver<IoUringDriver>>) -> Self {
        FusionRuntime::Uring(runtime)
    }
}
/// Wraps a plain legacy runtime in the `Legacy` variant.
#[cfg(all(target_os = "linux", feature = "iouring", feature = "legacy"))]
impl From<Runtime<LegacyDriver>> for FusionRuntime<IoUringDriver, LegacyDriver> {
    fn from(runtime: Runtime<LegacyDriver>) -> Self {
        FusionRuntime::Legacy(runtime)
    }
}
/// Wraps a timer-enabled legacy runtime in the `Legacy` variant.
#[cfg(all(target_os = "linux", feature = "iouring", feature = "legacy"))]
impl From<Runtime<TimeDriver<LegacyDriver>>>
    for FusionRuntime<TimeDriver<IoUringDriver>, TimeDriver<LegacyDriver>>
{
    fn from(runtime: Runtime<TimeDriver<LegacyDriver>>) -> Self {
        FusionRuntime::Legacy(runtime)
    }
}
/// Wraps a plain legacy runtime in the `Legacy` variant (legacy-only build).
#[cfg(all(feature = "legacy", not(all(target_os = "linux", feature = "iouring"))))]
impl From<Runtime<LegacyDriver>> for FusionRuntime<LegacyDriver> {
    fn from(runtime: Runtime<LegacyDriver>) -> Self {
        FusionRuntime::Legacy(runtime)
    }
}
/// Wraps a timer-enabled legacy runtime in the `Legacy` variant (legacy-only
/// build).
#[cfg(all(feature = "legacy", not(all(target_os = "linux", feature = "iouring"))))]
impl From<Runtime<TimeDriver<LegacyDriver>>> for FusionRuntime<TimeDriver<LegacyDriver>> {
    fn from(runtime: Runtime<TimeDriver<LegacyDriver>>) -> Self {
        FusionRuntime::Legacy(runtime)
    }
}
/// Wraps a plain io_uring runtime in the `Uring` variant (io_uring-only build).
#[cfg(all(target_os = "linux", feature = "iouring", not(feature = "legacy")))]
impl From<Runtime<IoUringDriver>> for FusionRuntime<IoUringDriver> {
    fn from(runtime: Runtime<IoUringDriver>) -> Self {
        FusionRuntime::Uring(runtime)
    }
}
/// Wraps a timer-enabled io_uring runtime in the `Uring` variant
/// (io_uring-only build).
#[cfg(all(target_os = "linux", feature = "iouring", not(feature = "legacy")))]
impl From<Runtime<TimeDriver<IoUringDriver>>> for FusionRuntime<TimeDriver<IoUringDriver>> {
    fn from(runtime: Runtime<TimeDriver<IoUringDriver>>) -> Self {
        FusionRuntime::Uring(runtime)
    }
}
/// Spawns `future` as a task on the runtime running on the current thread.
///
/// The task is created with the current thread id and the `LocalScheduler`,
/// then pushed onto the task queue of the `CURRENT` context. The returned
/// `JoinHandle` can be used to obtain the task's output.
// NOTE(review): presumably panics when called outside of a running runtime,
// because `CURRENT` is unset there -- confirm against `scoped_thread_local!`.
pub fn spawn<T>(future: T) -> JoinHandle<T::Output>
where
    T: Future + 'static,
    T::Output: 'static,
{
    let owner_id = crate::utils::thread_id::get_current_thread_id();
    let (task, handle) = new_task(owner_id, future, LocalScheduler);
    CURRENT.with(|current| {
        current.tasks.push(task);
    });
    handle
}
/// Spawns `future` as a task on the runtime running on the current thread
/// without requiring the future or its output to be `'static`.
///
/// The task is created with `new_task_holding` using the current thread id and
/// the `LocalScheduler`, then pushed onto the task queue of the `CURRENT`
/// context.
///
/// This function is `unsafe` because the queued task may hold borrows that
/// the compiler can no longer track. `Runtime::block_on` already calls it
/// inside an `unsafe` block.
///
/// # Safety
///
/// The caller must guarantee that everything borrowed by `future` (and by its
/// output) stays valid until the spawned task has finished and has been
/// dropped. In practice this means driving the returned `JoinHandle` to
/// completion before any of the borrowed data goes out of scope.
pub unsafe fn spawn_without_static<T>(future: T) -> JoinHandle<T::Output>
where
    T: Future,
{
    use crate::task::new_task_holding;

    let (task, join) = new_task_holding(
        crate::utils::thread_id::get_current_thread_id(),
        future,
        LocalScheduler,
    );
    CURRENT.with(|ctx| {
        ctx.tasks.push(task);
    });
    join
}
#[cfg(test)]
mod tests {
    /// Two runtimes on two threads exchange a value through oneshot channels:
    /// the main runtime sends 24, the spawned runtime echoes it back.
    #[cfg(all(feature = "sync", target_os = "linux", feature = "iouring"))]
    #[test]
    fn across_thread() {
        use futures::channel::oneshot;

        use crate::driver::IoUringDriver;

        let (tx1, rx1) = oneshot::channel::<u8>();
        let (tx2, rx2) = oneshot::channel::<u8>();

        std::thread::spawn(move || {
            let mut rt = crate::RuntimeBuilder::<IoUringDriver>::new()
                .build()
                .unwrap();
            rt.block_on(async move {
                let n = rx1.await.expect("unable to receive rx1");
                assert!(tx2.send(n).is_ok());
            });
        });

        let mut rt = crate::RuntimeBuilder::<IoUringDriver>::new()
            .build()
            .unwrap();
        rt.block_on(async move {
            assert!(tx1.send(24).is_ok());
            assert_eq!(rx2.await.expect("unable to receive rx2"), 24);
        });
    }

    /// A 200 ms sleep on a timer-enabled runtime takes 200 ms, give or take
    /// less than 50 ms.
    #[cfg(all(target_os = "linux", feature = "iouring"))]
    #[test]
    fn timer() {
        use crate::driver::IoUringDriver;

        let mut rt = crate::RuntimeBuilder::<IoUringDriver>::new()
            .enable_timer()
            .build()
            .unwrap();
        let instant = std::time::Instant::now();
        rt.block_on(async {
            crate::time::sleep(std::time::Duration::from_millis(200)).await;
        });
        // Compare the total elapsed time. `subsec_millis` would drop whole
        // seconds, so a sleep of e.g. 1.2 s would wrongly pass the check.
        let elapsed_ms = instant.elapsed().as_millis();
        assert!(
            elapsed_ms.abs_diff(200) < 50,
            "expected roughly 200 ms, slept for {elapsed_ms} ms"
        );
    }
}