pub(crate) mod clock;
pub(crate) mod task;
pub(crate) mod waker;
#[cfg(feature = "send")]
pub(crate) mod send;
use crate::time::Instant;
use crate::time::timer::{PendingTimer, PendingTimerHandlerEnum, Timer};
use clock::{AdvanceClock, AdvanceToNextWake, RuntimeClock};
use parking_lot::Mutex;
use std::cell::{Cell, RefCell};
use std::collections::{BinaryHeap, HashMap, VecDeque};
use std::fmt::{Debug, Formatter};
use std::pin::{Pin, pin};
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Waker};
use std::time::{Duration, Instant as StdInstant};
use task::{Task, WakeTask};
thread_local! {
static ACTIVE_RT: RefCell<Option<Runtime >> = const { RefCell::new(None) };
}
#[derive(Clone)]
pub struct Runtime {
inner: Rc<RuntimeInner>,
}
pub(crate) struct RuntimeInner {
pub(crate) clock: RuntimeClock,
pub(crate) advance_clock: Box<dyn AdvanceClock>,
next_task_id: Cell<u64>,
task_wakes_since_last_advance_clock: Arc<Mutex<Vec<WakeTask>>>,
pub(crate) pending_timers_since_last_advance_clock: Arc<Mutex<Vec<Arc<PendingTimer>>>>,
ready_to_poll_tasks: RefCell<VecDeque<Task>>,
pending_timers: RefCell<BinaryHeap<Arc<PendingTimer>>>,
wakers_by_timer_id: RefCell<HashMap<u64, Vec<Waker>>>,
blocked_tasks_by_id: RefCell<HashMap<u64, Task>>,
}
impl RuntimeInner {
pub(crate) fn get_next_id(&self) -> u64 {
let next_id = self.next_task_id.get();
self.next_task_id.set(next_id + 1);
next_id
}
}
impl Debug for Runtime {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "Rt")
}
}
impl Default for Runtime {
fn default() -> Self {
Self::new(Box::new(AdvanceToNextWake))
}
}
impl Runtime {
pub fn new(advance_clock: Box<dyn AdvanceClock>) -> Self {
let now = StdInstant::now();
Self {
inner: Rc::new(RuntimeInner {
clock: RuntimeClock {
now: Arc::new(Mutex::new(now)),
},
advance_clock,
next_task_id: Default::default(),
task_wakes_since_last_advance_clock: Arc::default(),
pending_timers_since_last_advance_clock: Arc::default(),
ready_to_poll_tasks: Default::default(),
pending_timers: Default::default(),
wakers_by_timer_id: Default::default(),
blocked_tasks_by_id: Default::default(),
}),
}
}
pub fn new_timer(&self, deadline: StdInstant) -> Timer {
Timer::new(&self.inner, deadline)
}
pub fn sleep(&self, duration: Duration) -> Timer {
let deadline = self.now() + duration;
self.new_timer(deadline)
}
pub fn sleep_until(&self, deadline: Instant) -> Timer {
self.new_timer(deadline.0)
}
pub fn spawn<T: Future<Output = ()> + 'static>(&self, future: T) {
self.spawn_boxed(Box::pin(future))
}
pub fn spawn_boxed(&self, future: Pin<Box<dyn Future<Output = ()> + 'static>>) {
self.inner
.ready_to_poll_tasks
.borrow_mut()
.push_back(Task { future });
}
pub fn now(&self) -> std::time::Instant {
self.inner.clock.now()
}
pub fn active() -> Runtime {
let maybe_rt = ACTIVE_RT.with_borrow(Option::clone);
match maybe_rt {
Some(rt) => rt,
None => panic!("async runtime is not active in the current thread"),
}
}
fn register_active(&self) {
let previously_registered = ACTIVE_RT.replace(Some(self.clone()));
if previously_registered.is_some() {
panic!("Called `Rt::block_on` inside an active `Rt::block_on`");
}
}
fn unregister_active(&self) {
ACTIVE_RT.set(None);
}
pub fn block_on<T>(&self, f: impl Future<Output = T>) -> T {
self.register_active();
let mut f = pin!(f);
let ready = loop {
self.wake_tasks_since_last_advance_clock();
self.poll_tasks_until_all_blocked();
self.wake_tasks_since_last_advance_clock();
let mut cx = Context::from_waker(Waker::noop());
if let Poll::Ready(value) = f.as_mut().poll(&mut cx) {
break value;
}
self.set_timer_wakers_since_last_advance_clock();
if self.inner.ready_to_poll_tasks.borrow().is_empty() {
self.advance_clock();
}
};
self.unregister_active();
ready
}
fn wake_tasks_since_last_advance_clock(&self) {
let mut task_wakes = self.inner.task_wakes_since_last_advance_clock.lock();
for wake in task_wakes.drain(..) {
let Some(blocked) = self
.inner
.blocked_tasks_by_id
.borrow_mut()
.remove(&wake.task_id)
else {
continue;
};
self.inner
.ready_to_poll_tasks
.borrow_mut()
.push_back(blocked);
}
}
fn poll_tasks_until_all_blocked(&self) {
loop {
let Some(mut task) = self.inner.ready_to_poll_tasks.borrow_mut().pop_front() else {
break;
};
let task_id = self.inner.get_next_id();
let waker = Arc::new(waker::TaskWaker::new(
self.inner.task_wakes_since_last_advance_clock.clone(),
task_id,
));
let cx_waker = waker.clone().into_waker();
let mut cx = Context::from_waker(&cx_waker);
match task.future.as_mut().poll(&mut cx) {
Poll::Ready(()) => {
}
Poll::Pending => {
self.inner
.blocked_tasks_by_id
.borrow_mut()
.insert(task_id, task);
}
}
}
}
fn set_timer_wakers_since_last_advance_clock(&self) {
for pending in self
.inner
.pending_timers_since_last_advance_clock
.lock()
.drain(..)
{
self.inner
.wakers_by_timer_id
.borrow_mut()
.entry(pending.timer_id)
.or_default()
.push(pending.waker.clone());
self.inner.pending_timers.borrow_mut().push(pending);
}
}
fn advance_clock(&self) {
loop {
let blocked_tasks = self.inner.blocked_tasks_by_id.borrow().len();
let timer = match self.inner.pending_timers.borrow_mut().pop() {
Some(timer) => timer,
None => panic!(
"`block_on` is stuck: the task's future is pending and there is nothing else to do, nor timers waiting to advance. There are {blocked_tasks} blocked tasks."
),
};
let at_least_one_task_unblocked = self.handle_timer_elapsed(timer);
if at_least_one_task_unblocked {
let next_timer_ready = self
.inner
.pending_timers
.borrow()
.peek()
.is_some_and(|t| t.elapsed_at <= self.inner.clock.now());
if !next_timer_ready {
break;
}
}
}
}
fn handle_timer_elapsed(&self, timer: Arc<PendingTimer>) -> bool {
match timer.handler.as_enum() {
PendingTimerHandlerEnum::WakeWaitingTasks => {
let now = self.inner.clock.now();
if now < timer.elapsed_at {
let new_time = self
.inner
.advance_clock
.advance_clock(now, timer.elapsed_at);
if new_time < timer.elapsed_at {
panic!(
"`AdvanceClock` implementation returned an instant before the next wake is reached"
);
}
self.inner.clock.set_now(new_time);
}
let wakers = self
.inner
.wakers_by_timer_id
.borrow_mut()
.remove(&timer.timer_id)
.unwrap_or_default();
for waker in wakers {
waker.wake();
}
true
}
PendingTimerHandlerEnum::Ignore => {
false
}
PendingTimerHandlerEnum::CancelWaitingTasks => {
let wakers = self
.inner
.wakers_by_timer_id
.borrow_mut()
.remove(&timer.timer_id)
.unwrap_or_default();
for waker in wakers {
let is_noop_waker = waker.data().is_null();
if is_noop_waker {
continue;
}
let waker: Arc<waker::TaskWaker> = unsafe { Arc::from_raw(waker.data() as _) };
let task_id = waker.task_id();
std::mem::forget(waker);
self.inner.blocked_tasks_by_id.borrow_mut().remove(&task_id);
}
false
}
}
}
}