#![allow(clippy::cast_possible_truncation)]
use std::cell::RefCell;
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use auralis_signal::Signal;
use crate::Priority;
/// Identifier of a task; used (cast to `usize`) as an index into `Executor::tasks`.
type TaskId = u64;
/// Host hook through which the executor requests that a flush be run.
///
/// The executor hands over a one-shot `callback` that performs the flush;
/// the implementation decides when to invoke it.
pub trait ScheduleFlush {
/// Arranges for `callback` to be invoked.
fn schedule(&self, callback: Box<dyn FnOnce()>);
}
/// Test-only [`ScheduleFlush`] whose `schedule` runs the callback synchronously.
#[cfg(test)]
pub struct TestScheduleFlush;
#[cfg(test)]
impl ScheduleFlush for TestScheduleFlush {
    /// Invokes `callback` immediately, before returning to the caller.
    fn schedule(&self, callback: Box<dyn FnOnce()>) {
        callback()
    }
}
/// Clock abstraction the executor reads to enforce its per-flush time budget.
pub trait TimeSource {
/// Returns the current time in milliseconds.
fn now_ms(&self) -> u64;
}
/// Manually driven clock for tests: the reading only changes when a test
/// calls [`TestTimeSource::set`] or [`TestTimeSource::advance`].
#[cfg(test)]
pub struct TestTimeSource {
    /// Current reading in milliseconds; a `Cell` so it can change through `&self`.
    now: std::cell::Cell<u64>,
}
#[cfg(test)]
impl TestTimeSource {
    /// Creates a clock that initially reads `initial_ms`.
    #[must_use]
    pub fn new(initial_ms: u64) -> Self {
        Self {
            now: std::cell::Cell::new(initial_ms),
        }
    }

    /// Sets the clock to exactly `ms`.
    pub fn set(&self, ms: u64) {
        self.now.set(ms);
    }

    /// Moves the clock forward by `ms`.
    ///
    /// The reading saturates at `u64::MAX` instead of overflowing, so the
    /// clock never panics (debug builds) or jumps backwards (release builds)
    /// when a test drives it near the top of the range.
    pub fn advance(&self, ms: u64) {
        self.now.set(self.now.get().saturating_add(ms));
    }
}
#[cfg(test)]
impl TimeSource for TestTimeSource {
    /// Reports whatever value the test last stored in the clock.
    fn now_ms(&self) -> u64 {
        let Self { now } = self;
        now.get()
    }
}
/// Waker payload: identifies the task to re-queue and the queue it belongs to.
struct TaskWaker {
/// Id of the task to re-queue when woken.
task_id: TaskId,
/// Selects the high- or low-priority queue on wake.
priority: Priority,
}
impl Wake for TaskWaker {
    /// Re-queues the task on the thread-local `EXECUTOR` and, when no flush
    /// is running, asks the scheduler for one.
    ///
    /// If the executor is already mutably borrowed, the wake is parked in
    /// `PENDING_WAKES` instead so that it is not lost; `drain_pending_wakes`
    /// moves it into the queues later.
    ///
    /// NOTE(review): the wake always targets the thread-local `EXECUTOR`,
    /// including for tasks polled by an executor created with
    /// `Executor::new_instance` — confirm that is intended.
    fn wake(self: Arc<Self>) {
        let (task_id, priority) = (self.task_id, self.priority);
        let scheduler = EXECUTOR.with(|cell| {
            let Ok(mut ex) = cell.try_borrow_mut() else {
                // Executor busy: remember the wake for later.
                PENDING_WAKES.with(|pending| pending.borrow_mut().push((task_id, priority)));
                return None;
            };
            let queue = match priority {
                Priority::High => &mut ex.high_queue,
                Priority::Low => &mut ex.low_queue,
            };
            queue.push_back(task_id);
            if ex.in_flush {
                // The running flush will reach this task; no new flush needed.
                return None;
            }
            ex.try_schedule_flush()
        });
        // Call the scheduler only after the executor borrow has been released.
        if let Some(sched) = scheduler {
            sched.schedule(Box::new(flush));
        }
    }
}
/// Everything the executor stores for one live task.
struct TaskState {
/// The task body, polled by `Executor::flush_instance`.
future: Pin<Box<dyn Future<Output = ()> + 'static>>,
/// Queue the task is placed on when enqueued or woken.
priority: Priority,
/// Id of the owning scope; spawn paths without a scope pass 0.
scope_id: u64,
}
/// Details handed to the panic hook when polling a task panics.
#[derive(Debug)]
pub struct PanicInfo {
/// Id of the task whose poll panicked.
pub task_id: u64,
/// Scope id recorded for that task.
pub scope_id: u64,
/// The panic payload as returned by `std::panic::catch_unwind`.
pub payload: Box<dyn std::any::Any + Send>,
}
/// Single-threaded task executor state: task table, run queues, deferred
/// work and the host hooks (scheduler, clock, panic hook).
pub struct Executor {
/// Ids waiting to be polled; always drained before `low_queue`.
high_queue: VecDeque<TaskId>,
/// Ids waiting to be polled once `high_queue` is empty.
low_queue: VecDeque<TaskId>,
/// Task table indexed by `TaskId`. A `None` slot is either free or belongs
/// to a task whose state is temporarily taken out while it is being polled.
tasks: Vec<Option<TaskState>>,
/// Ids of released slots that `allocate_id` reuses before growing `tasks`.
free_slots: Vec<TaskId>,
/// Next never-used id; grows together with `tasks`.
next_task_id: TaskId,
/// Set by `try_schedule_flush`, cleared when a flush finishes or yields.
is_flush_scheduled: bool,
/// True while `flush_instance` is running; used to reject re-entrant flushes.
in_flush: bool,
/// One-shot operations run at the very start of the next flush.
deferred_ops: Vec<DeferredOp>,
/// One-shot callbacks run after `deferred_ops` and before tasks are polled.
deferred_callbacks: Vec<Box<dyn FnOnce()>>,
/// Host hook used to request a flush; `None` until one is installed.
flush_scheduler: Option<Rc<dyn ScheduleFlush>>,
/// Clock for the time budget; without one `now_ms` reports 0.
time_source: Option<Rc<dyn TimeSource>>,
/// Milliseconds a flush phase may run before yielding back to the scheduler.
time_budget_ms: u64,
/// Called with a `PanicInfo` when polling a task panics (non-wasm targets).
panic_hook: Option<Rc<dyn Fn(PanicInfo)>>,
}
/// A one-shot operation queued for the start of the next flush.
struct DeferredOp {
/// The operation itself; invoked exactly once by `flush_instance`.
f: Box<dyn FnOnce()>,
}
impl Executor {
    /// Builds an executor with empty queues and task table, no scheduler,
    /// clock or panic hook, and a time budget of 8 ms.
    fn new() -> Self {
        Self {
            tasks: Vec::new(),
            free_slots: Vec::new(),
            next_task_id: 0,
            high_queue: VecDeque::new(),
            low_queue: VecDeque::new(),
            deferred_ops: Vec::new(),
            deferred_callbacks: Vec::new(),
            is_flush_scheduled: false,
            in_flush: false,
            flush_scheduler: None,
            time_source: None,
            panic_hook: None,
            time_budget_ms: 8,
        }
    }

    /// Hands out a task id, reusing a released slot when one is available
    /// and otherwise appending an empty slot to the task table.
    fn allocate_id(&mut self) -> TaskId {
        match self.free_slots.pop() {
            Some(recycled) => recycled,
            None => {
                let fresh = self.next_task_id;
                self.next_task_id += 1;
                self.tasks.push(None);
                fresh
            }
        }
    }

    /// Empties the slot of `task_id` and records the id as reusable.
    ///
    /// Panics if `task_id` is outside the task table.
    fn free_slot(&mut self, task_id: TaskId) {
        let slot = &mut self.tasks[task_id as usize];
        *slot = None;
        self.free_slots.push(task_id);
    }

    /// Appends `task_id` to the queue matching its stored priority.
    /// Ids without a stored task are ignored.
    fn enqueue(&mut self, task_id: TaskId) {
        let Some(task) = self.tasks.get(task_id as usize).and_then(Option::as_ref) else {
            return;
        };
        let queue = match task.priority {
            Priority::High => &mut self.high_queue,
            Priority::Low => &mut self.low_queue,
        };
        queue.push_back(task_id);
    }

    /// Pops the next id to poll: high-priority entries first, then low.
    fn dequeue(&mut self) -> Option<TaskId> {
        if let Some(urgent) = self.high_queue.pop_front() {
            return Some(urgent);
        }
        self.low_queue.pop_front()
    }

    /// Marks a flush as scheduled and returns the scheduler to call, or
    /// `None` when a flush is already marked or no scheduler is installed.
    ///
    /// The flag is set even when no scheduler is installed.
    fn try_schedule_flush(&mut self) -> Option<Rc<dyn ScheduleFlush>> {
        if self.is_flush_scheduled {
            None
        } else {
            self.is_flush_scheduled = true;
            self.flush_scheduler.clone()
        }
    }

    /// Current time from the installed clock, or 0 when none is installed.
    pub(crate) fn now_ms(&self) -> u64 {
        match &self.time_source {
            Some(clock) => clock.now_ms(),
            None => 0,
        }
    }

    /// Number of occupied slots in the task table.
    #[must_use]
    pub fn active_task_count(&self) -> usize {
        self.tasks.iter().flatten().count()
    }
}
thread_local! {
// Per-thread global executor used by the free functions in this file.
static EXECUTOR: Rc<RefCell<Executor>> = Rc::new(RefCell::new(Executor::new()));
// Wakes that arrived while `EXECUTOR` was mutably borrowed; moved into the
// run queues by `drain_pending_wakes`.
static PENDING_WAKES: RefCell<Vec<(TaskId, Priority)>> = const { RefCell::new(Vec::new()) };
}
impl Executor {
#[must_use]
pub fn new_instance() -> Rc<RefCell<Executor>> {
Rc::new(RefCell::new(Executor::new()))
}
/// Installs (or replaces) the flush scheduler of `ex`.
///
/// Panics if `ex` is currently borrowed.
pub fn install_flush_scheduler(ex: &Rc<RefCell<Executor>>, sched: Rc<dyn ScheduleFlush>) {
    let mut inner = ex.borrow_mut();
    inner.flush_scheduler = Some(sched);
}
/// Installs (or replaces) the clock `ex` uses for its time budget.
///
/// Panics if `ex` is currently borrowed.
pub fn install_time_source(ex: &Rc<RefCell<Executor>>, ts: Rc<dyn TimeSource>) {
    let mut inner = ex.borrow_mut();
    inner.time_source = Some(ts);
}
/// Sets how many milliseconds a flush phase of `ex` may run before yielding.
///
/// Panics if `ex` is currently borrowed.
pub fn set_time_budget(ex: &Rc<RefCell<Executor>>, budget_ms: u64) {
    let mut inner = ex.borrow_mut();
    inner.time_budget_ms = budget_ms;
}
/// Installs (or replaces) the hook `ex` calls when polling a task panics.
///
/// Panics if `ex` is currently borrowed.
pub fn set_panic_hook(ex: &Rc<RefCell<Executor>>, hook: Rc<dyn Fn(PanicInfo)>) {
    let mut inner = ex.borrow_mut();
    inner.panic_hook = Some(hook);
}
/// Spawns `future` on `ex` as a low-priority task with scope id 0 and
/// requests a flush if none is pending.
///
/// Panics if `ex` is currently borrowed.
pub fn spawn(ex: &Rc<RefCell<Executor>>, future: impl Future<Output = ()> + 'static) {
    let task = TaskState {
        future: Box::pin(future),
        priority: Priority::Low,
        scope_id: 0,
    };
    let scheduler = {
        let mut inner = ex.borrow_mut();
        let id = inner.allocate_id();
        inner.tasks[id as usize] = Some(task);
        inner.enqueue(id);
        inner.try_schedule_flush()
    };
    // The borrow is released before the scheduler runs, because the
    // scheduler may invoke the flush synchronously.
    let Some(sched) = scheduler else {
        return;
    };
    let target = Rc::clone(ex);
    sched.schedule(Box::new(move || Self::flush_instance(&target)));
}
/// Runs one flush pass on `ex`.
///
/// Phases, in order:
/// 1. run every pending deferred operation;
/// 2. run deferred callbacks batch by batch until none remain or the time
///    budget is spent (in which case the pass yields and reschedules);
/// 3. poll queued tasks, high priority first, until the queues are empty or
///    the time budget is spent.
///
/// A call made while a flush is already running on `ex` is a no-op; debug
/// builds print a warning.
///
/// Deferred operations and callbacks queued *while tasks are being polled*
/// are pushed without scheduling a flush (the producers see `in_flush`, or
/// an already-set scheduled flag). To keep that work from being stranded,
/// the pass requests a follow-up flush before returning whenever such work
/// is pending and a flush scheduler is installed.
///
/// On non-wasm targets a panic inside a task's `poll` is caught, reported to
/// the panic hook (if any) and the task is dropped.
///
/// With no time source installed `now_ms` always reports 0, so the budget
/// only trips when it is set to 0.
///
/// # Panics
///
/// Panics if `ex` is mutably borrowed by the caller, or if a queued task id
/// lies outside the task table.
#[allow(clippy::too_many_lines)]
pub fn flush_instance(ex: &Rc<RefCell<Executor>>) {
    // Re-entrancy guard.
    {
        let mut e = ex.borrow_mut();
        if e.in_flush {
            #[cfg(debug_assertions)]
            {
                eprintln!(
                    "[auralis-task] WARNING: Executor::flush_instance called \
                     re-entrantly (already inside a flush). This is a no-op. \
                     Check for nested flush() calls in signal callbacks or \
                     ScheduleFlush implementations."
                );
            }
            return;
        }
        e.in_flush = true;
    }

    // Phase 1: deferred operations. They are taken out first so that each
    // one runs without the executor being borrowed.
    let deferred = std::mem::take(&mut ex.borrow_mut().deferred_ops);
    for op in deferred {
        (op.f)();
    }

    // Phase 2: deferred callbacks, including ones queued by other callbacks.
    {
        let cb_start = ex.borrow().now_ms();
        loop {
            let callbacks = std::mem::take(&mut ex.borrow_mut().deferred_callbacks);
            if callbacks.is_empty() {
                break;
            }
            for cb in callbacks {
                cb();
            }
            if ex.borrow().now_ms().saturating_sub(cb_start) >= ex.borrow().time_budget_ms {
                if !ex.borrow().deferred_callbacks.is_empty() {
                    // Out of budget with callbacks left: yield to the host
                    // and continue in a fresh flush.
                    let (sched, ex2) = {
                        let mut e = ex.borrow_mut();
                        e.in_flush = false;
                        e.is_flush_scheduled = false;
                        (e.try_schedule_flush(), Rc::clone(ex))
                    };
                    if let Some(sched) = sched {
                        sched.schedule(Box::new(move || Self::flush_instance(&ex2)));
                    }
                    return;
                }
                break;
            }
        }
    }

    // Phase 3: poll queued tasks.
    let poll_start = ex.borrow().now_ms();
    loop {
        let task_id = ex.borrow_mut().dequeue();
        let Some(tid) = task_id else {
            // Queues drained: the pass is over. Work deferred during the
            // poll phase was queued without scheduling, so request a
            // follow-up flush for it when a scheduler can honour it.
            let maybe_sched = {
                let mut e = ex.borrow_mut();
                e.is_flush_scheduled = false;
                e.in_flush = false;
                let has_deferred = !e.deferred_ops.is_empty() || !e.deferred_callbacks.is_empty();
                if has_deferred && e.flush_scheduler.is_some() {
                    e.try_schedule_flush()
                } else {
                    None
                }
            };
            if let Some(sched) = maybe_sched {
                let ex2 = Rc::clone(ex);
                sched.schedule(Box::new(move || Self::flush_instance(&ex2)));
            }
            break;
        };
        // Take the task out of its slot so the executor is not borrowed
        // while the future runs. A stale id (empty slot) is skipped.
        let maybe_state = ex.borrow_mut().tasks[tid as usize].take();
        if let Some(mut state) = maybe_state {
            let priority = state.priority;
            let scope_id = state.scope_id;
            let scope = crate::scope::find_scope(scope_id);
            if let Some(ref s) = scope {
                if s.is_suspended() {
                    // Suspended scope: put the task back without polling it.
                    let mut e = ex.borrow_mut();
                    if e.tasks[tid as usize].is_none() {
                        e.tasks[tid as usize] = Some(state);
                    }
                    continue;
                }
            }
            let waker = Waker::from(Arc::new(TaskWaker {
                task_id: tid,
                priority,
            }));
            let mut cx = Context::from_waker(&waker);
            // Make the task's scope current for the duration of the poll.
            let prev_scope = crate::scope::get_scope_direct();
            if scope.is_some() {
                crate::scope::set_scope_direct(scope);
            }
            #[cfg(not(target_arch = "wasm32"))]
            let result: Result<Poll<()>, Box<dyn std::any::Any + Send>> =
                std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                    state.future.as_mut().poll(&mut cx)
                }));
            #[cfg(target_arch = "wasm32")]
            let poll = state.future.as_mut().poll(&mut cx);
            crate::scope::set_scope_direct(prev_scope);
            #[cfg(not(target_arch = "wasm32"))]
            {
                match result {
                    Ok(Poll::Ready(())) => {
                        ex.borrow_mut().free_slot(tid);
                    }
                    Err(payload) => {
                        // Clone the hook out so it runs without a borrow held.
                        let hook = ex.borrow().panic_hook.clone();
                        if let Some(h) = hook {
                            h(PanicInfo {
                                task_id: tid,
                                scope_id,
                                payload,
                            });
                        }
                        ex.borrow_mut().free_slot(tid);
                    }
                    Ok(Poll::Pending) => {
                        // Put the task back unless the slot was refilled
                        // while the future was running.
                        let mut e = ex.borrow_mut();
                        if e.tasks[tid as usize].is_none() {
                            e.tasks[tid as usize] = Some(state);
                        }
                    }
                }
            }
            #[cfg(target_arch = "wasm32")]
            {
                match poll {
                    Poll::Ready(()) => {
                        ex.borrow_mut().free_slot(tid);
                    }
                    Poll::Pending => {
                        let mut e = ex.borrow_mut();
                        if e.tasks[tid as usize].is_none() {
                            e.tasks[tid as usize] = Some(state);
                        }
                    }
                }
            }
        }
        // Budget check after every dequeued id.
        {
            let elapsed = ex.borrow().now_ms().saturating_sub(poll_start);
            if elapsed >= ex.borrow().time_budget_ms {
                let (maybe_sched, ex_clone) = {
                    let mut e = ex.borrow_mut();
                    e.is_flush_scheduled = false;
                    e.in_flush = false;
                    let has_queued = !e.high_queue.is_empty() || !e.low_queue.is_empty();
                    // Deferred work only counts when a scheduler can run the
                    // follow-up; otherwise the scheduled flag stays clear.
                    let has_deferred = (!e.deferred_ops.is_empty()
                        || !e.deferred_callbacks.is_empty())
                        && e.flush_scheduler.is_some();
                    let sched = if has_queued || has_deferred {
                        e.try_schedule_flush()
                    } else {
                        None
                    };
                    (sched, Rc::clone(ex))
                };
                if let Some(sched) = maybe_sched {
                    sched.schedule(Box::new(move || Self::flush_instance(&ex_clone)));
                }
                break;
            }
        }
    }
}
}
/// Shared handle to an executor instance.
type ExecutorRef = Rc<RefCell<Executor>>;
thread_local! {
// Executor made current by `with_executor`; `None` outside such a call.
static CURRENT_EXECUTOR: RefCell<Option<ExecutorRef>> = const { RefCell::new(None) };
}
/// Runs `f` with `ex` installed as the current executor for this thread and
/// returns `f`'s result.
///
/// The previously current executor (if any) is put back when `f` finishes,
/// so calls may be nested. Restoration happens in a drop guard, which means
/// it also happens when `f` panics and the panic is caught further up the
/// stack; the thread is never left pointing at `ex` after this call.
pub fn with_executor<R>(ex: &ExecutorRef, f: impl FnOnce() -> R) -> R {
    /// Puts the saved executor back into `CURRENT_EXECUTOR` on drop.
    struct Restore(Option<ExecutorRef>);

    impl Drop for Restore {
        fn drop(&mut self) {
            let previous = self.0.take();
            // `try_with` so that unwinding during thread teardown, when the
            // thread-local may already be gone, cannot cause a second panic.
            let _ = CURRENT_EXECUTOR.try_with(|slot| *slot.borrow_mut() = previous);
        }
    }

    let previous = CURRENT_EXECUTOR.with(|slot| slot.borrow_mut().replace(Rc::clone(ex)));
    let _restore = Restore(previous);
    f()
}
fn current_executor() -> Option<ExecutorRef> {
CURRENT_EXECUTOR.with(|exec| exec.borrow().clone())
}
/// Moves wakes parked in `PENDING_WAKES` into the global executor's run
/// queues and requests a flush if none is pending. Does nothing when no
/// wakes are parked.
fn drain_pending_wakes() {
    let wakes = PENDING_WAKES.with(|pending| std::mem::take(&mut *pending.borrow_mut()));
    if wakes.is_empty() {
        return;
    }
    EXECUTOR.with(|cell| {
        let scheduler = {
            let mut ex = cell.borrow_mut();
            for (id, priority) in wakes {
                let queue = match priority {
                    Priority::High => &mut ex.high_queue,
                    Priority::Low => &mut ex.low_queue,
                };
                queue.push_back(id);
            }
            ex.try_schedule_flush()
        };
        // The executor borrow is released before the scheduler is called.
        if let Some(sched) = scheduler {
            sched.schedule(Box::new(flush));
        }
    });
}
/// Flushes the thread-local global executor, then re-queues any wakes that
/// were parked while it was borrowed.
fn flush() {
    EXECUTOR.with(|global| Executor::flush_instance(global));
    drain_pending_wakes();
}
/// Installs `sched` as the global executor's flush scheduler and makes sure
/// the signal schedule hook is registered.
pub fn init_flush_scheduler(sched: Rc<dyn ScheduleFlush>) {
    EXECUTOR.with(|cell| {
        let mut ex = cell.borrow_mut();
        ex.flush_scheduler = Some(sched);
    });
    install_signal_hook_once();
}
fn install_signal_hook_once() {
use std::sync::OnceLock;
static INSTALLED: OnceLock<()> = OnceLock::new();
INSTALLED.get_or_init(|| {
auralis_signal::install_schedule_hook(Box::new(|cb: Box<dyn FnOnce()>| {
if let Some(ex) = current_executor() {
let maybe_sched = {
let mut e = ex.borrow_mut();
e.deferred_callbacks.push(cb);
if e.in_flush {
None
} else {
e.try_schedule_flush()
}
};
if let Some(sched) = maybe_sched {
let ex2 = Rc::clone(&ex);
sched.schedule(Box::new(move || Executor::flush_instance(&ex2)));
}
} else {
EXECUTOR.with(|exec| {
let maybe_sched = {
let mut ex = exec.borrow_mut();
ex.deferred_callbacks.push(cb);
if ex.in_flush {
None
} else {
ex.try_schedule_flush()
}
};
if let Some(sched) = maybe_sched {
sched.schedule(Box::new(flush));
}
});
}
}));
});
}
/// Installs `ts` as the clock of the thread-local global executor.
pub fn init_time_source(ts: Rc<dyn TimeSource>) {
    EXECUTOR.with(|cell| {
        let mut ex = cell.borrow_mut();
        ex.time_source = Some(ts);
    });
}
/// Sets the per-flush time budget, in milliseconds, of the global executor.
pub fn set_global_time_budget(budget_ms: u64) {
    EXECUTOR.with(|cell| {
        let mut ex = cell.borrow_mut();
        ex.time_budget_ms = budget_ms;
    });
}
/// Installs `hook` as the global executor's task-panic hook, replacing any
/// previous one.
pub fn set_panic_hook(hook: Rc<dyn Fn(PanicInfo)>) {
    EXECUTOR.with(|cell| {
        let mut ex = cell.borrow_mut();
        ex.panic_hook = Some(hook);
    });
}
/// Removes the global executor's task-panic hook, if one is installed.
pub fn remove_panic_hook() {
    EXECUTOR.with(|cell| {
        let mut ex = cell.borrow_mut();
        ex.panic_hook = None;
    });
}
/// Spawns `future` on the global executor with low priority.
pub fn spawn_global(future: impl Future<Output = ()> + 'static) {
    let priority = Priority::Low;
    spawn_global_with_priority(priority, future);
}
pub fn spawn_global_with_priority(priority: Priority, future: impl Future<Output = ()> + 'static) {
spawn_inner(Box::pin(future), priority, 0);
}
pub(crate) fn spawn_scoped(
priority: Priority,
scope_id: u64,
future: impl Future<Output = ()> + 'static,
) -> TaskId {
spawn_inner(Box::pin(future), priority, scope_id)
}
/// Shared spawn path for the global executor: stores the task, queues it
/// and requests a flush if none is pending. Returns the new task's id.
fn spawn_inner(
    future: Pin<Box<dyn Future<Output = ()> + 'static>>,
    priority: Priority,
    scope_id: u64,
) -> TaskId {
    let task = TaskState {
        future,
        priority,
        scope_id,
    };
    EXECUTOR.with(|cell| {
        let mut ex = cell.borrow_mut();
        let id = ex.allocate_id();
        ex.tasks[id as usize] = Some(task);
        ex.enqueue(id);
        let scheduler = ex.try_schedule_flush();
        // Release the borrow before the scheduler runs; it may flush
        // synchronously.
        drop(ex);
        if let Some(sched) = scheduler {
            sched.schedule(Box::new(flush));
        }
        id
    })
}
/// Queues every stored task that belongs to `scope_id` on the global
/// executor and requests a flush unless one is running or pending.
pub(crate) fn enqueue_scope_tasks(scope_id: u64) {
    EXECUTOR.with(|cell| {
        let mut ex = cell.borrow_mut();
        // Collect first: `enqueue` needs `&mut` access to the executor.
        let matching: Vec<TaskId> = ex
            .tasks
            .iter()
            .enumerate()
            .filter_map(|(idx, slot)| match slot {
                Some(task) if task.scope_id == scope_id => Some(idx as TaskId),
                _ => None,
            })
            .collect();
        for id in matching {
            ex.enqueue(id);
        }
        let scheduler = if ex.in_flush {
            None
        } else {
            ex.try_schedule_flush()
        };
        drop(ex);
        if let Some(sched) = scheduler {
            sched.schedule(Box::new(flush));
        }
    });
}
/// Removes every stored task that belongs to `scope_id` from the global
/// executor and returns their futures.
///
/// The futures are returned rather than dropped here so that the caller can
/// drop them after the executor borrow has been released.
///
/// Only the slots that were actually cancelled are released and purged from
/// the run queues. A slot that is empty because its task is currently being
/// polled (`flush_instance` takes the state out for the duration of the
/// poll) is left alone: treating it as free would let a new task claim an id
/// that is still in use, or put the same id on the free list twice.
///
/// A task of `scope_id` that is being polled at the time of the call is
/// therefore not cancelled by this function.
pub(crate) fn cancel_scope_tasks(scope_id: u64) -> Vec<Pin<Box<dyn Future<Output = ()>>>> {
    EXECUTOR.with(|exec| {
        let mut guard = exec.borrow_mut();
        // Reborrow as a plain `&mut` so individual fields can be borrowed
        // independently below.
        let ex = &mut *guard;

        let mut dropped = Vec::new();
        // Ids are collected in ascending order, which `binary_search` needs.
        let mut cancelled: Vec<TaskId> = Vec::new();
        for (idx, slot) in ex.tasks.iter_mut().enumerate() {
            if slot.as_ref().is_some_and(|t| t.scope_id == scope_id) {
                if let Some(state) = slot.take() {
                    cancelled.push(idx as TaskId);
                    dropped.push(state.future);
                }
            }
        }

        // Drop pending wakes for the cancelled ids so a later occupant of
        // the slot is not polled on their behalf.
        ex.high_queue
            .retain(|id| cancelled.binary_search(id).is_err());
        ex.low_queue
            .retain(|id| cancelled.binary_search(id).is_err());

        // Release the cancelled slots; keep the free list sorted and unique.
        ex.free_slots.extend(cancelled.iter().copied());
        ex.free_slots.sort_unstable();
        ex.free_slots.dedup();

        dropped
    })
}
/// Returns a future that gives other queued tasks a chance to run: it is
/// pending on its first poll (after waking itself) and ready on the next.
#[must_use = "yield_now() does nothing unless awaited"]
pub fn yield_now() -> YieldNow {
    YieldNow { yielded: false }
}

/// Future returned by [`yield_now`].
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct YieldNow {
    /// Whether the single yield has already happened.
    yielded: bool,
}

impl Future for YieldNow {
    type Output = ();

    /// First poll: marks the yield as done, wakes the task and returns
    /// `Pending`. Every later poll returns `Ready`.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let already_yielded = std::mem::replace(&mut self.yielded, true);
        if already_yielded {
            return Poll::Ready(());
        }
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}
/// Queues `f` to run during a flush of the global executor (after deferred
/// operations, before tasks) and requests a flush unless one is running or
/// pending.
pub fn schedule_callback(f: Box<dyn FnOnce()>) {
    EXECUTOR.with(|cell| {
        let mut ex = cell.borrow_mut();
        ex.deferred_callbacks.push(f);
        let scheduler = if ex.in_flush {
            None
        } else {
            ex.try_schedule_flush()
        };
        // Release the borrow before handing control to the scheduler.
        drop(ex);
        let Some(sched) = scheduler else {
            return;
        };
        sched.schedule(Box::new(flush));
    });
}
/// Queues `signal.set(value)` to run at the start of the next flush of the
/// global executor and requests a flush if none is pending.
pub fn set_deferred<T: 'static>(signal: &Signal<T>, value: T) {
    let target = signal.clone();
    let op = DeferredOp {
        f: Box::new(move || target.set(value)),
    };
    EXECUTOR.with(|cell| {
        let mut ex = cell.borrow_mut();
        ex.deferred_ops.push(op);
        let scheduler = ex.try_schedule_flush();
        // Release the borrow before handing control to the scheduler.
        drop(ex);
        let Some(sched) = scheduler else {
            return;
        };
        sched.schedule(Box::new(flush));
    });
}
pub fn reset_executor_for_test() {
PENDING_WAKES.with(|pw| pw.borrow_mut().clear());
EXECUTOR.with(|exec| {
let mut ex = exec.borrow_mut();
ex.high_queue.clear();
ex.low_queue.clear();
ex.tasks.clear();
ex.free_slots.clear();
ex.next_task_id = 0;
ex.is_flush_scheduled = false;
ex.in_flush = false;
ex.deferred_ops.clear();
ex.deferred_callbacks.clear();
ex.flush_scheduler = None;
ex.time_source = None;
});
crate::scope::clear_scope_registry();
}
/// Number of occupied task slots in the global executor (test helper).
#[cfg(test)]
pub(crate) fn debug_task_count() -> usize {
    EXECUTOR.with(|cell| {
        let ex = cell.borrow();
        let occupied = ex.tasks.iter().flatten().count();
        occupied
    })
}
/// Lists `(task id, priority, scope id)` for every stored task of the
/// global executor, in slot order.
#[cfg(feature = "debug")]
pub(crate) fn debug_task_snapshot() -> Vec<(TaskId, Priority, u64)> {
    EXECUTOR.with(|cell| {
        let ex = cell.borrow();
        let snapshot: Vec<(TaskId, Priority, u64)> = ex
            .tasks
            .iter()
            .enumerate()
            .filter_map(|(idx, slot)| {
                slot.as_ref()
                    .map(|task| (idx as u64, task.priority, task.scope_id))
            })
            .collect();
        snapshot
    })
}
/// Returns the ids currently sitting in either run queue of the global
/// executor, sorted ascending with duplicates removed.
#[cfg(feature = "debug")]
pub(crate) fn debug_queued_task_ids() -> Vec<TaskId> {
    EXECUTOR.with(|cell| {
        let ex = cell.borrow();
        let mut queued: Vec<TaskId> = Vec::new();
        queued.extend(ex.high_queue.iter().copied());
        queued.extend(ex.low_queue.iter().copied());
        queued.sort_unstable();
        queued.dedup();
        queued
    })
}
/// Test helper: stores and queues `future` on the global executor with
/// scope id 0 but does not request a flush. Returns the new task's id.
#[cfg(test)]
pub(crate) fn spawn_no_auto_flush(
    priority: Priority,
    future: impl Future<Output = ()> + 'static,
) -> TaskId {
    let task = TaskState {
        future: Box::pin(future),
        priority,
        scope_id: 0,
    };
    EXECUTOR.with(|cell| {
        let mut ex = cell.borrow_mut();
        let id = ex.allocate_id();
        ex.tasks[id as usize] = Some(task);
        ex.enqueue(id);
        id
    })
}
/// Test helper: runs a flush of the global executor right now.
#[cfg(test)]
pub(crate) fn flush_all() {
    flush()
}