use std::cell::{Cell, RefCell};
use std::future::Future;
use std::pin::pin;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, OnceLock};
use std::task::{Context, Poll, Waker};
use std::thread;
use std::time::{Duration, Instant};
use parking::Parker;
use crate::reactor::Reactor;
/// Number of `block_on()` calls currently in progress.
///
/// `block_on` increments this on entry and its drop guard decrements it on exit. The
/// "async-io" thread only sleeps between its reactor passes while this is non-zero.
static BLOCK_ON_COUNT: AtomicUsize = AtomicUsize::new(0);
/// Returns the process-wide handle used to unpark the "async-io" thread.
///
/// The first call spawns that thread (named "async-io", running `main_loop`) and stores the
/// unparker paired with the thread's parker. Later calls return the stored handle.
///
/// # Panics
///
/// Panics with "cannot spawn async-io thread" if the thread cannot be spawned.
fn unparker() -> &'static parking::Unparker {
    static UNPARKER: OnceLock<parking::Unparker> = OnceLock::new();

    /// Spawns the helper thread and hands back the unparker paired with its parker.
    fn spawn_driver_thread() -> parking::Unparker {
        let (thread_parker, handle) = parking::pair();
        let builder = thread::Builder::new().name(String::from("async-io"));

        // The join handle is discarded: nothing ever joins this thread.
        builder
            .spawn(move || main_loop(thread_parker))
            .expect("cannot spawn async-io thread");

        handle
    }

    UNPARKER.get_or_init(spawn_driver_thread)
}
pub(crate) fn init() {
let _ = unparker();
}
/// Body of the "async-io" thread: drives the reactor in an endless loop.
///
/// Each iteration compares the reactor's tick counter with the last value this thread saw.
/// If the counter has not moved, the thread tries to take the reactor lock and runs
/// `react(None)` itself; after ten consecutive timed-out sleeps it stops using `try_lock`
/// and blocks on `lock()` instead. While at least one `block_on` call is active, the thread
/// then sleeps on `parker` with a back-off growing from 50 µs to 10 ms, so it stays out of
/// the way of threads that drive the reactor themselves.
fn main_loop(parker: parking::Parker) {
    #[cfg(feature = "tracing")]
    let span = tracing::trace_span!("async_io::main_loop");
    #[cfg(feature = "tracing")]
    let _enter = span.enter();

    /// Sleep durations in microseconds, indexed by the number of consecutive timed-out sleeps.
    const BACKOFF_US: [u64; 9] = [50, 75, 100, 250, 500, 750, 1000, 2500, 5000];
    /// Sleep duration in microseconds once the schedule above is exhausted.
    const MAX_BACKOFF_US: u64 = 10_000;
    /// Number of timed-out sleeps after which the reactor lock is acquired blockingly.
    const BLOCKING_LOCK_THRESHOLD: u64 = 10;

    // Reactor tick most recently observed by this thread.
    let mut seen_tick = 0;
    // Timed-out sleeps since this thread last ran `react()` or was notified.
    let mut idle_sleeps = 0u64;

    loop {
        let current_tick = Reactor::get().ticker();

        if current_tick != seen_tick {
            // The tick counter moved since the last look; just record the new value.
            seen_tick = current_tick;
        } else {
            let guard = if idle_sleeps < BLOCKING_LOCK_THRESHOLD {
                Reactor::get().try_lock()
            } else {
                // Nothing has happened for a while: wait for the lock instead of spinning.
                Some(Reactor::get().lock())
            };

            if let Some(mut guard) = guard {
                #[cfg(feature = "tracing")]
                tracing::trace!("waiting on I/O");
                guard.react(None).ok();
                seen_tick = Reactor::get().ticker();
                idle_sleeps = 0;
            }
        }

        // Without any active `block_on` call there is no reason to sleep.
        if BLOCK_ON_COUNT.load(Ordering::SeqCst) == 0 {
            continue;
        }

        let delay_us = BACKOFF_US
            .get(idle_sleeps as usize)
            .copied()
            .unwrap_or(MAX_BACKOFF_US);

        #[cfg(feature = "tracing")]
        tracing::trace!("sleeping for {} us", delay_us);
        let notified = parker.park_timeout(Duration::from_micros(delay_us));

        if !notified {
            // Timed out: lengthen the next sleep.
            idle_sleeps += 1;
            continue;
        }

        #[cfg(feature = "tracing")]
        tracing::trace!("notified");

        // Woken before the timeout: resynchronise with the reactor and restart the back-off.
        seen_tick = Reactor::get().ticker();
        idle_sleeps = 0;
    }
}
/// Blocks the current thread until `future` completes and returns its output.
///
/// Between polls, the calling thread checks its parker for a wakeup and, whenever
/// `Reactor::get().try_lock()` succeeds, drives the reactor itself with `react`. When the
/// lock is unavailable it parks until the waker handed to the future is invoked.
///
/// `BLOCK_ON_COUNT` stays incremented for the duration of the call. A drop guard decrements
/// it and unparks the "async-io" thread when the call ends.
///
/// The parker, waker and `io_blocked` flag are cached per thread. If the cache is already
/// borrowed (a nested `block_on` on the same thread), a fresh set is created for that call.
pub fn block_on<T>(future: impl Future<Output = T>) -> T {
#[cfg(feature = "tracing")]
let span = tracing::trace_span!("async_io::block_on");
#[cfg(feature = "tracing")]
let _enter = span.enter();
// Register this call. While the count is non-zero, `main_loop` sleeps between its
// reactor passes instead of looping straight back into the reactor.
BLOCK_ON_COUNT.fetch_add(1, Ordering::SeqCst);
// Undo the registration when this function exits and unpark the "async-io" thread.
let _guard = CallOnDrop(|| {
BLOCK_ON_COUNT.fetch_sub(1, Ordering::SeqCst);
unparker().unpark();
});
// Builds a parker, a waker that unparks it, and the `io_blocked` flag shared by both.
fn parker_and_waker() -> (Parker, Waker, Arc<AtomicBool>) {
let (p, u) = parking::pair();
// Set to `true` by the `block_on` loop around its `react(None)` call (see below).
let io_blocked = Arc::new(AtomicBool::new(false));
let waker = BlockOnWaker::create(io_blocked.clone(), u);
(p, waker, io_blocked)
}
thread_local! {
// Per-thread parker/waker/flag, reused across `block_on` calls on this thread.
static CACHE: RefCell<(Parker, Waker, Arc<AtomicBool>)> = RefCell::new(parker_and_waker());
// `true` while this thread is inside one of the `react` sections below, whether the
// call is the zero-timeout one or the blocking one.
static IO_POLLING: Cell<bool> = const { Cell::new(false) };
}
// Waker state: the flag of the `block_on` call it belongs to, plus that call's unparker.
struct BlockOnWaker {
io_blocked: Arc<AtomicBool>,
unparker: parking::Unparker,
}
impl BlockOnWaker {
// Wraps the state in an `Arc` and converts it into a `Waker` via the `Wake` impl.
fn create(io_blocked: Arc<AtomicBool>, unparker: parking::Unparker) -> Waker {
Waker::from(Arc::new(BlockOnWaker {
io_blocked,
unparker,
}))
}
}
impl std::task::Wake for BlockOnWaker {
fn wake_by_ref(self: &Arc<Self>) {
// NOTE(review): relies on `unpark()` returning `true` only when this call newly
// notified the parker, as the `parking` docs describe — confirm.
if self.unparker.unpark() {
// `IO_POLLING` is read on the thread calling `wake`, while `io_blocked` belongs to
// the `block_on` call that owns this waker. The reactor is notified only when the
// waking thread is not itself inside a `react` section and the owner has flagged
// itself as blocked in the reactor.
if !IO_POLLING.with(Cell::get) && self.io_blocked.load(Ordering::SeqCst) {
Reactor::get().notify();
}
}
}
fn wake(self: Arc<Self>) {
self.wake_by_ref()
}
}
CACHE.with(|cache| {
// Exactly one of these two is initialised; they exist so that the borrowed tuple
// outlives the `match` below.
let tmp_cached;
let tmp_fresh;
let (p, waker, io_blocked) = match cache.try_borrow_mut() {
Ok(cache) => {
// Cache is free: use it. The `RefMut` is held for the rest of this closure.
tmp_cached = cache;
&*tmp_cached
}
Err(_) => {
// Cache is already borrowed, i.e. this is a nested `block_on` on the same
// thread. Use a fresh parker/waker pair for this call only.
tmp_fresh = parker_and_waker();
&tmp_fresh
}
};
let mut future = pin!(future);
let cx = &mut Context::from_waker(waker);
loop {
if let Poll::Ready(t) = future.as_mut().poll(cx) {
// Zero-timeout park whose result is discarded. Presumably this consumes a
// notification left over from a wake during the final poll, so the cached parker
// starts clean for the next `block_on` — confirm against `parking` semantics.
p.park_timeout(Duration::from_secs(0));
#[cfg(feature = "tracing")]
tracing::trace!("completed");
return t;
}
// Non-blocking check: was the waker invoked during or since the poll?
if p.park_timeout(Duration::from_secs(0)) {
#[cfg(feature = "tracing")]
tracing::trace!("notified");
// Already woken, so the future will be polled again right away. If the reactor
// lock happens to be free, first run one zero-timeout `react` pass.
if let Some(mut reactor_lock) = Reactor::get().try_lock() {
// Only `IO_POLLING` is set here; `io_blocked` stays `false` because this pass
// uses a zero timeout.
IO_POLLING.with(|io| io.set(true));
let _guard = CallOnDrop(|| {
IO_POLLING.with(|io| io.set(false));
});
reactor_lock.react(Some(Duration::from_secs(0))).ok();
}
continue;
}
// No wakeup pending. Either drive the reactor ourselves or park.
if let Some(mut reactor_lock) = Reactor::get().try_lock() {
// When the lock was taken; used to bound how long this thread keeps it.
let start = Instant::now();
loop {
// Mark this thread as inside the reactor so wakers on other threads call
// `Reactor::get().notify()`. The guard clears both flags at the end of every
// iteration and on `break`.
IO_POLLING.with(|io| io.set(true));
io_blocked.store(true, Ordering::SeqCst);
let _guard = CallOnDrop(|| {
IO_POLLING.with(|io| io.set(false));
io_blocked.store(false, Ordering::SeqCst);
});
// Re-check for a wakeup that arrived before `io_blocked` was set: such a wake
// would have skipped `notify()`.
if p.park_timeout(Duration::from_secs(0)) {
#[cfg(feature = "tracing")]
tracing::trace!("notified");
break;
}
#[cfg(feature = "tracing")]
tracing::trace!("waiting on I/O");
// `react(None)`: no timeout is passed.
reactor_lock.react(None).ok();
// Back from the reactor; leave the inner loop if our waker fired.
if p.park_timeout(Duration::from_secs(0)) {
#[cfg(feature = "tracing")]
tracing::trace!("notified");
break;
}
// Still no wakeup for this thread after more than 500 µs of holding the lock:
// release it, unpark the "async-io" thread, and park until woken.
if start.elapsed() > Duration::from_micros(500) {
#[cfg(feature = "tracing")]
tracing::trace!("stops hogging the reactor");
drop(reactor_lock);
unparker().unpark();
// NOTE(review): `_guard` is still alive during this park, so `io_blocked` is
// still `true` and a cross-thread wake will also call `notify()` — confirm this
// is intended.
p.park();
break;
}
}
} else {
// Reactor lock is held elsewhere: just wait for our waker.
#[cfg(feature = "tracing")]
tracing::trace!("sleep until notification");
p.park();
}
}
})
}
/// Guard that invokes the wrapped closure when it goes out of scope.
///
/// Bind it to a named local (for example `_guard`) so it lives until the end of the scope.
struct CallOnDrop<F>(F)
where
    F: Fn();

impl<F> Drop for CallOnDrop<F>
where
    F: Fn(),
{
    /// Runs the stored closure exactly once, at drop time.
    fn drop(&mut self) {
        let on_drop = &self.0;
        on_drop();
    }
}