#![allow(clippy::cast_possible_truncation)]
use std::cell::{Cell, RefCell};
use std::collections::{BTreeMap, VecDeque};
use std::future::Future;
use std::pin::Pin;
use std::rc::{Rc, Weak};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use auralis_signal::Signal;
use crate::Priority;
/// Identifier of a task. Used throughout this file as an index into
/// `Executor::tasks` (`tasks[task_id as usize]`).
type TaskId = u64;
/// Hook through which the executor asks for a flush to be run.
///
/// In this file every callback handed to `schedule` ends up calling
/// `Executor::flush_instance`; the implementation decides when to invoke it.
pub trait ScheduleFlush {
/// Arranges for `callback` to be invoked.
fn schedule(&self, callback: Box<dyn FnOnce()>);
}
/// Test-only scheduler that runs the flush callback immediately.
#[cfg(test)]
pub struct TestScheduleFlush;
#[cfg(test)]
impl ScheduleFlush for TestScheduleFlush {
/// Invokes `callback` synchronously, before `schedule` returns.
fn schedule(&self, callback: Box<dyn FnOnce()>) {
callback();
}
}
/// Clock abstraction used by the executor for timers and time budgets.
pub trait TimeSource {
/// Returns the current time in milliseconds.
///
/// NOTE(review): `Executor::flush_instance` treats a reading of `0` the same
/// as "no time source installed" and fires every pending timer.
fn now_ms(&self) -> u64;
}
/// Manually driven clock for tests; the time only changes via `set`/`advance`.
#[cfg(test)]
pub struct TestTimeSource {
    /// Current time in milliseconds.
    now: std::cell::Cell<u64>,
}

#[cfg(test)]
impl TestTimeSource {
    /// Creates a clock that initially reads `initial_ms`.
    #[must_use]
    pub fn new(initial_ms: u64) -> Self {
        Self {
            now: std::cell::Cell::new(initial_ms),
        }
    }

    /// Sets the clock to the absolute value `ms`.
    pub fn set(&self, ms: u64) {
        self.now.set(ms);
    }

    /// Moves the clock forward by `ms` milliseconds.
    ///
    /// The addition saturates at `u64::MAX` instead of panicking (debug) or
    /// wrapping back to a small value (release), which would make time appear
    /// to run backwards.
    pub fn advance(&self, ms: u64) {
        self.now.set(self.now.get().saturating_add(ms));
    }
}
#[cfg(test)]
impl TimeSource for TestTimeSource {
/// Returns the value last stored by `new`, `set` or `advance`.
fn now_ms(&self) -> u64 {
self.now.get()
}
}
/// One entry of the thread-local executor registry (`SLOTS`).
struct Slot {
// Non-owning handle; a dead `Weak` marks the slot as reusable.
weak: Weak<RefCell<Executor>>,
// Bumped whenever the slot is handed to a different executor, so that
// wakers created for the previous occupant no longer match.
generation: u64,
}
thread_local! {
// Per-thread registry mapping a slot index to an executor; wakers resolve
// their executor through it via `lookup_executor`.
static SLOTS: RefCell<Vec<Slot>> = const { RefCell::new(Vec::new()) };
}
/// Registers an executor instance in the thread-local slot table and returns
/// its `(slot_id, generation)` pair.
///
/// Slot index 0 is reserved for the thread-global executor:
/// `ensure_global_registered` unconditionally installs the global executor at
/// index 0 and bumps that slot's generation. If an instance were allowed to
/// occupy index 0 first, that overwrite would invalidate every waker of the
/// instance (their generation would no longer match) and its tasks would never
/// be woken again. To avoid that, a placeholder is created for index 0 when
/// the table is empty, and index 0 is never handed out or recycled here.
///
/// Dead slots at index >= 1 are recycled with an incremented generation so
/// stale wakers of the previous occupant are rejected by `lookup_executor`.
fn register_executor(weak: Weak<RefCell<Executor>>) -> (u64, u64) {
    SLOTS.with(|slots| {
        let mut slots = slots.borrow_mut();

        // Reserve index 0 for the global executor with a dead placeholder.
        if slots.is_empty() {
            slots.push(Slot {
                weak: Weak::new(),
                generation: 0,
            });
        }

        // Recycle the first dead slot, skipping the reserved index 0.
        let reusable = slots
            .iter_mut()
            .enumerate()
            .skip(1)
            .find(|(_, slot)| slot.weak.strong_count() == 0);
        if let Some((index, slot)) = reusable {
            slot.weak = weak;
            slot.generation = slot.generation.wrapping_add(1);
            return (index as u64, slot.generation);
        }

        // No dead slot available: append a fresh one.
        slots.push(Slot {
            weak,
            generation: 0,
        });
        ((slots.len() - 1) as u64, 0)
    })
}
/// Resolves a `(slot_id, generation)` pair to a live executor.
///
/// Returns `None` when the slot index is out of range, when the slot has been
/// handed to another executor since (generation mismatch), or when the
/// executor has already been dropped.
fn lookup_executor(slot_id: u64, generation: u64) -> Option<Rc<RefCell<Executor>>> {
    SLOTS.with(|table| {
        table
            .borrow()
            .get(slot_id as usize)
            .filter(|slot| slot.generation == generation)
            .and_then(|slot| slot.weak.upgrade())
    })
}
/// Waker payload: identifies a task and the executor (by registry slot) that
/// owns it.
struct TaskWaker {
task_id: TaskId,
// Selects which run queue the task id is pushed onto when woken.
priority: Priority,
// Registry coordinates of the owning executor; see `lookup_executor`.
slot_id: u64,
generation: u64,
}
impl Wake for TaskWaker {
    /// Re-queues the task on its executor and, if needed, requests a flush.
    ///
    /// If the executor is gone or its slot was reassigned, the wake is dropped.
    /// If the executor is currently mutably borrowed, the wake is parked in
    /// `PENDING_WAKES` for `drain_pending_wakes` to deliver later.
    fn wake(self: Arc<Self>) {
        let task_id = self.task_id;
        let slot_id = self.slot_id;
        let generation = self.generation;

        let Some(exec) = lookup_executor(slot_id, generation) else {
            return;
        };

        // The borrow guard must be released before the scheduler is invoked,
        // because the scheduler may run the flush synchronously.
        let scheduler = match exec.try_borrow_mut() {
            Ok(mut guard) => {
                let queue = match self.priority {
                    Priority::High => &mut guard.high_queue,
                    Priority::Low => &mut guard.low_queue,
                };
                queue.push_back(task_id);
                // A running flush will pick the task up by itself.
                if guard.in_flush {
                    None
                } else {
                    guard.try_schedule_flush()
                }
            }
            Err(_) => {
                PENDING_WAKES.with(|pending| {
                    pending.borrow_mut().push((task_id, slot_id, generation));
                });
                None
            }
        };

        if let Some(scheduler) = scheduler {
            scheduler.schedule(Box::new(move || {
                // Look the executor up again: it may have died in the meantime.
                if let Some(target) = lookup_executor(slot_id, generation) {
                    Executor::flush_instance(&target);
                }
            }));
        }
    }
}
/// Everything the executor stores for one spawned task.
struct TaskState {
future: Pin<Box<dyn Future<Output = ()> + 'static>>,
priority: Priority,
// Scope the task belongs to; every spawn path without a scope passes `0`.
scope_id: u64,
// Deadline of the timer the task is registered under in `Executor::timers`;
// `0` is used as the "no timer" sentinel.
timer_deadline: u64,
}
/// Information passed to the panic hook when polling a task panics.
#[derive(Debug)]
pub struct PanicInfo {
/// Id of the task whose poll panicked.
pub task_id: u64,
/// Scope id recorded for that task.
pub scope_id: u64,
/// Panic payload as returned by `std::panic::catch_unwind`.
pub payload: Box<dyn std::any::Any + Send>,
}
/// Single-threaded task executor with two priority queues, timers and
/// deferred work, driven by `Executor::flush_instance`.
pub struct Executor {
// Run queues of task ids; `dequeue` drains `high_queue` before `low_queue`.
high_queue: VecDeque<TaskId>,
low_queue: VecDeque<TaskId>,
// Task table indexed by `TaskId`. A `None` entry is either a free slot or a
// task whose state has been taken out while it is being polled.
tasks: Vec<Option<TaskState>>,
// Ids available for reuse by `allocate_id`.
free_slots: Vec<TaskId>,
// Next never-used id, handed out when `free_slots` is empty.
next_task_id: TaskId,
// Set by `try_schedule_flush`; cleared when a flush finishes or yields.
is_flush_scheduled: bool,
// True while `flush_instance` is running; guards against re-entrancy.
in_flush: bool,
// Closures run once at the start of the next flush (see `set_deferred`).
deferred_ops: Vec<DeferredOp>,
// Callbacks run during a flush under `catch_unwind`.
deferred_callbacks: Vec<Box<dyn FnOnce()>>,
flush_scheduler: Option<Rc<dyn ScheduleFlush>>,
time_source: Option<Rc<dyn TimeSource>>,
// Budget per flush phase in milliseconds; `Executor::new` sets it to 8.
time_budget_ms: u64,
panic_hook: Option<Rc<dyn Fn(PanicInfo)>>,
// Deadline (ms) -> ids of tasks waiting for that deadline.
timers: BTreeMap<u64, Vec<TaskId>>,
// Coordinates of this executor in `SLOTS`; only meaningful once
// `registered` is true.
slot_id: u64,
generation: u64,
registered: bool,
}
thread_local! {
// Id of the task currently being polled by `flush_instance` on this thread;
// `None` outside of a poll.
static CURRENT_POLLING_TASK: Cell<Option<TaskId>> = const { Cell::new(None) };
}
/// Calls `f` with the id of the task currently being polled on this thread
/// (`None` when no poll is in progress) and returns its result.
pub(crate) fn with_current_polling_task<R>(f: impl FnOnce(Option<TaskId>) -> R) -> R {
    CURRENT_POLLING_TASK.with(|current| {
        let polling = current.get();
        f(polling)
    })
}
/// A one-shot operation queued by `set_deferred` and run at the start of a
/// flush.
struct DeferredOp {
f: Box<dyn FnOnce()>,
}
impl Executor {
    /// Builds an empty, unregistered executor with an 8 ms time budget and no
    /// scheduler, time source or panic hook.
    fn new() -> Self {
        Self {
            tasks: Vec::new(),
            free_slots: Vec::new(),
            next_task_id: 0,
            high_queue: VecDeque::new(),
            low_queue: VecDeque::new(),
            timers: BTreeMap::new(),
            deferred_ops: Vec::new(),
            deferred_callbacks: Vec::new(),
            is_flush_scheduled: false,
            in_flush: false,
            flush_scheduler: None,
            time_source: None,
            panic_hook: None,
            time_budget_ms: 8,
            slot_id: 0,
            generation: 0,
            registered: false,
        }
    }

    /// Returns a task id whose slot in `tasks` exists: a recycled id if one is
    /// available, otherwise a fresh id for which an empty slot is appended.
    fn allocate_id(&mut self) -> TaskId {
        match self.free_slots.pop() {
            Some(recycled) => recycled,
            None => {
                let fresh = self.next_task_id;
                self.next_task_id += 1;
                self.tasks.push(None);
                fresh
            }
        }
    }

    /// Empties the slot of `task_id`, removing its timer registration first
    /// if the stored state has one, and marks the id as reusable.
    fn free_slot(&mut self, task_id: TaskId) {
        let index = task_id as usize;
        let armed_deadline = self
            .tasks
            .get(index)
            .and_then(Option::as_ref)
            .map(|task| task.timer_deadline)
            .filter(|&deadline| deadline != 0);
        if let Some(deadline) = armed_deadline {
            self.cleanup_timer(task_id, deadline);
        }
        self.tasks[index] = None;
        self.free_slots.push(task_id);
    }

    /// Removes `task_id` from the timer bucket at `deadline`, dropping the
    /// bucket once it becomes empty.
    fn cleanup_timer(&mut self, task_id: TaskId, deadline: u64) {
        let Some(waiting) = self.timers.get_mut(&deadline) else {
            return;
        };
        waiting.retain(|&id| id != task_id);
        if waiting.is_empty() {
            self.timers.remove(&deadline);
        }
    }

    /// Pushes `task_id` onto the queue matching its stored priority. Ids
    /// without stored state are ignored.
    fn enqueue(&mut self, task_id: TaskId) {
        let Some(Some(task)) = self.tasks.get(task_id as usize) else {
            return;
        };
        let queue = match task.priority {
            Priority::High => &mut self.high_queue,
            Priority::Low => &mut self.low_queue,
        };
        queue.push_back(task_id);
    }

    /// Pops the next runnable task id; high priority is served first.
    fn dequeue(&mut self) -> Option<TaskId> {
        if let Some(urgent) = self.high_queue.pop_front() {
            return Some(urgent);
        }
        self.low_queue.pop_front()
    }

    /// Marks a flush as scheduled and returns the scheduler the caller must
    /// invoke. Returns `None` if a flush is already scheduled or no scheduler
    /// is installed (the flag is still set in the latter case).
    fn try_schedule_flush(&mut self) -> Option<Rc<dyn ScheduleFlush>> {
        if self.is_flush_scheduled {
            None
        } else {
            self.is_flush_scheduled = true;
            self.flush_scheduler.clone()
        }
    }

    /// Current time in milliseconds, or `0` when no time source is installed.
    pub(crate) fn now_ms(&self) -> u64 {
        match &self.time_source {
            Some(clock) => clock.now_ms(),
            None => 0,
        }
    }

    /// Number of task slots that currently hold a task state.
    #[must_use]
    pub fn active_task_count(&self) -> usize {
        self.tasks.iter().flatten().count()
    }
}
thread_local! {
// The thread-global executor used when no explicit instance is current.
static EXECUTOR: Rc<RefCell<Executor>> = Rc::new(RefCell::new(Executor::new()));
// Wakes `(task_id, slot_id, generation)` that arrived while the target
// executor was mutably borrowed; delivered by `drain_pending_wakes`.
static PENDING_WAKES: RefCell<Vec<(TaskId, u64, u64)>> =
const { RefCell::new(Vec::new()) };
}
/// Makes sure registry slot 0 points at the thread-global executor and
/// records the slot coordinates on the executor itself.
///
/// If slot 0 exists but refers to something else (or to nothing), it is
/// re-pointed at the global executor and its generation is incremented.
/// Returns `(0, generation_of_slot_0)`.
fn ensure_global_registered() -> (u64, u64) {
    let global = EXECUTOR.with(Rc::clone);

    let generation = SLOTS.with(|table| {
        let mut table = table.borrow_mut();
        if table.is_empty() {
            table.push(Slot {
                weak: Rc::downgrade(&global),
                generation: 0,
            });
            return 0;
        }

        let first = &mut table[0];
        let points_at_global = first
            .weak
            .upgrade()
            .is_some_and(|live| Rc::ptr_eq(&live, &global));
        if !points_at_global {
            first.generation = first.generation.wrapping_add(1);
            first.weak = Rc::downgrade(&global);
        }
        first.generation
    });

    {
        let mut exec = global.borrow_mut();
        exec.slot_id = 0;
        exec.generation = generation;
        exec.registered = true;
    }
    (0, generation)
}
impl Executor {
/// Creates a standalone executor and registers it in the thread-local slot
/// table so that task wakers can locate it.
#[must_use]
pub fn new_instance() -> Rc<RefCell<Executor>> {
let ex = Rc::new(RefCell::new(Executor::new()));
let (slot_id, generation) = register_executor(Rc::downgrade(&ex));
{
let mut e = ex.borrow_mut();
e.slot_id = slot_id;
e.generation = generation;
e.registered = true;
}
ex
}
/// Installs the scheduler used to request flushes of `ex`.
pub fn install_flush_scheduler(ex: &Rc<RefCell<Executor>>, sched: Rc<dyn ScheduleFlush>) {
ex.borrow_mut().flush_scheduler = Some(sched);
}
/// Installs the clock used by `ex` for timers and time budgets.
pub fn install_time_source(ex: &Rc<RefCell<Executor>>, ts: Rc<dyn TimeSource>) {
ex.borrow_mut().time_source = Some(ts);
}
/// Sets the time budget (milliseconds) of `ex`; a flush yields once the
/// callback phase or the polling phase has used at least this much time.
pub fn set_time_budget(ex: &Rc<RefCell<Executor>>, budget_ms: u64) {
ex.borrow_mut().time_budget_ms = budget_ms;
}
/// Installs the hook called with a `PanicInfo` when polling a task panics.
pub fn set_panic_hook(ex: &Rc<RefCell<Executor>>, hook: Rc<dyn Fn(PanicInfo)>) {
ex.borrow_mut().panic_hook = Some(hook);
}
/// Registers `task_id` to be enqueued once `deadline_ms` has passed and
/// requests a flush.
///
/// NOTE(review): while a task is being polled its state is taken out of
/// `tasks`, so if this is called for the task currently being polled the
/// `timer_deadline` assignment below is skipped — confirm that is intended.
pub(crate) fn schedule_timer(ex: &Rc<RefCell<Executor>>, deadline_ms: u64, task_id: TaskId) {
let mut e = ex.borrow_mut();
e.timers.entry(deadline_ms).or_default().push(task_id);
if let Some(Some(ref mut t)) = e.tasks.get_mut(task_id as usize) {
t.timer_deadline = deadline_ms;
}
// Clear the flag so that `try_schedule_flush` always hands back the
// scheduler here, even if a flush was already marked as scheduled.
e.is_flush_scheduled = false;
let maybe_sched = e.try_schedule_flush();
// Release the borrow before invoking the scheduler: it may flush synchronously.
drop(e);
if let Some(sched) = maybe_sched {
let ex2 = Rc::clone(ex);
sched.schedule(Box::new(move || Self::flush_instance(&ex2)));
}
}
/// Spawns `future` on `ex` as a low-priority task with scope id 0 and
/// requests a flush if none is scheduled.
pub fn spawn(ex: &Rc<RefCell<Executor>>, future: impl Future<Output = ()> + 'static) {
let maybe_sched = {
let mut e = ex.borrow_mut();
let tid = e.allocate_id();
e.tasks[tid as usize] = Some(TaskState {
future: Box::pin(future),
priority: Priority::Low,
scope_id: 0,
timer_deadline: 0,
});
e.enqueue(tid);
e.try_schedule_flush()
};
if let Some(sched) = maybe_sched {
let ex2 = Rc::clone(ex);
sched.schedule(Box::new(move || Self::flush_instance(&ex2)));
}
}
/// Runs one flush of `ex`:
///
/// 1. enqueue tasks whose timers are due,
/// 2. run deferred ops,
/// 3. run deferred callbacks (panics caught) within the time budget,
/// 4. poll queued tasks until the queues are empty or the budget is used up,
/// 5. deliver wakes parked in `PENDING_WAKES`.
///
/// A re-entrant call (while a flush of the same executor is running) is a
/// no-op. The executor is never borrowed while user code (ops, callbacks,
/// futures, the panic hook) runs.
///
/// NOTE(review): deferred ops and the panic hook are not wrapped in
/// `catch_unwind`; if one of them panics, `in_flush` stays `true`.
#[allow(clippy::too_many_lines)]
pub fn flush_instance(ex: &Rc<RefCell<Executor>>) {
{
let mut e = ex.borrow_mut();
if e.in_flush {
#[cfg(debug_assertions)]
{
eprintln!(
"[auralis-task] WARNING: Executor::flush_instance called \
re-entrantly (already inside a flush). This is a no-op. \
Check for nested flush() calls in signal callbacks or \
ScheduleFlush implementations."
);
}
return;
}
e.in_flush = true;
}
// Make `ex` the current executor for the duration of the flush; the guard
// restores the previous one on every exit path, including unwinding.
let prev_executor = CURRENT_EXECUTOR.with(|c| c.borrow_mut().replace(Rc::clone(ex)));
let _restore = RestoreExecutor(prev_executor);
// Phase 1: timers.
{
let mut e = ex.borrow_mut();
let now = e.now_ms();
if now == 0 {
// No usable clock reading: fire every pending timer.
for (_, tasks) in std::mem::take(&mut e.timers) {
for tid in tasks {
if let Some(Some(ref mut t)) = e.tasks.get_mut(tid as usize) {
t.timer_deadline = 0;
}
e.enqueue(tid);
}
}
} else {
// `timers` is ordered by deadline, so the due ones form a prefix.
let expired: Vec<u64> =
e.timers.keys().copied().take_while(|&d| d <= now).collect();
for deadline in expired {
if let Some(tasks) = e.timers.remove(&deadline) {
for tid in tasks {
if let Some(Some(ref mut t)) = e.tasks.get_mut(tid as usize) {
t.timer_deadline = 0;
}
e.enqueue(tid);
}
}
}
}
}
// Phase 2: deferred ops, taken out first so they run without a borrow held.
let deferred = std::mem::take(&mut ex.borrow_mut().deferred_ops);
for op in deferred {
(op.f)();
}
// Phase 3: deferred callbacks; callbacks may queue further callbacks, hence
// the loop.
{
let cb_start = ex.borrow().now_ms();
loop {
let callbacks = std::mem::take(&mut ex.borrow_mut().deferred_callbacks);
if callbacks.is_empty() {
break;
}
for cb in callbacks {
// A panicking callback is swallowed so the remaining ones still run.
let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(cb));
}
if ex.borrow().now_ms().saturating_sub(cb_start) >= ex.borrow().time_budget_ms {
if !ex.borrow().deferred_callbacks.is_empty() {
// Budget used up with callbacks left: end this flush and request
// another one. Note that this path returns without polling tasks
// and without draining `PENDING_WAKES`.
let (sched, ex2) = {
let mut e = ex.borrow_mut();
e.in_flush = false;
e.is_flush_scheduled = false;
(e.try_schedule_flush(), Rc::clone(ex))
};
if let Some(sched) = sched {
sched.schedule(Box::new(move || Self::flush_instance(&ex2)));
}
return;
}
break;
}
}
}
// Phase 4: poll queued tasks.
let poll_start = ex.borrow().now_ms();
loop {
let task_id = ex.borrow_mut().dequeue();
let Some(tid) = task_id else {
// Queues drained: the flush is complete.
let mut e = ex.borrow_mut();
e.is_flush_scheduled = false;
e.in_flush = false;
break;
};
// Take the state out of its slot so the executor is not borrowed while
// the future runs; the slot reads as `None` during the poll.
let maybe_state = ex.borrow_mut().tasks[tid as usize].take();
if let Some(mut state) = maybe_state {
let priority = state.priority;
let scope_id = state.scope_id;
let scope = crate::scope::find_scope(scope_id);
if let Some(ref s) = scope {
if s.is_suspended() {
// Suspended scope: put the task back unpolled. It is not re-queued
// here; `enqueue_scope_tasks_on` can queue it again later.
let mut e = ex.borrow_mut();
if e.tasks[tid as usize].is_none() {
e.tasks[tid as usize] = Some(state);
}
continue;
}
}
// Registry coordinates for the waker; an unregistered executor is
// registered as the thread-global one on first use.
let (slot_id, gen) = {
let e = ex.borrow();
if e.registered {
(e.slot_id, e.generation)
} else {
drop(e);
ensure_global_registered()
}
};
let waker = Waker::from(Arc::new(TaskWaker {
task_id: tid,
priority,
slot_id,
generation: gen,
}));
let mut cx = Context::from_waker(&waker);
let prev_scope = crate::scope::get_scope_direct();
if scope.is_some() {
crate::scope::set_scope_direct(scope);
}
CURRENT_POLLING_TASK.with(|c| c.set(Some(tid)));
// Poll under `catch_unwind` so one panicking task cannot abort the flush.
let result: Result<Poll<()>, Box<dyn std::any::Any + Send>> =
std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
state.future.as_mut().poll(&mut cx)
}));
CURRENT_POLLING_TASK.with(|c| c.set(None));
crate::scope::set_scope_direct(prev_scope);
let timer_dl = state.timer_deadline;
match result {
Ok(Poll::Ready(())) => {
if timer_dl != 0 {
ex.borrow_mut().cleanup_timer(tid, timer_dl);
}
ex.borrow_mut().free_slot(tid);
}
Err(payload) => {
if timer_dl != 0 {
ex.borrow_mut().cleanup_timer(tid, timer_dl);
}
// Clone the hook out so it runs without the executor borrowed.
let hook = ex.borrow().panic_hook.clone();
if let Some(h) = hook {
h(PanicInfo {
task_id: tid,
scope_id,
payload,
});
}
ex.borrow_mut().free_slot(tid);
}
Ok(Poll::Pending) => {
// Put the state back unless the slot was filled during the poll; in
// that case `state` is dropped here.
let mut e = ex.borrow_mut();
if e.tasks[tid as usize].is_none() {
e.tasks[tid as usize] = Some(state);
}
}
}
}
{
// Budget check after each dequeued id: yield and request another flush
// if work remains.
let elapsed = ex.borrow().now_ms().saturating_sub(poll_start);
if elapsed >= ex.borrow().time_budget_ms {
let (maybe_sched, ex_clone) = {
let mut e = ex.borrow_mut();
e.is_flush_scheduled = false;
e.in_flush = false;
let sched = if !e.high_queue.is_empty() || !e.low_queue.is_empty() {
e.try_schedule_flush()
} else {
None
};
(sched, Rc::clone(ex))
};
if let Some(sched) = maybe_sched {
sched.schedule(Box::new(move || Self::flush_instance(&ex_clone)));
}
break;
}
}
}
// Phase 5: deliver wakes that could not reach a borrowed executor.
drain_pending_wakes();
}
}
/// Shared, single-threaded handle to an `Executor`.
pub(crate) type ExecutorRef = Rc<RefCell<Executor>>;
/// Drop guard that writes the wrapped value back into `CURRENT_EXECUTOR`,
/// restoring whichever executor was current before the guard was created.
struct RestoreExecutor(Option<ExecutorRef>);

impl Drop for RestoreExecutor {
    fn drop(&mut self) {
        let previous = self.0.take();
        CURRENT_EXECUTOR.with(|current| {
            let mut slot = current.borrow_mut();
            *slot = previous;
        });
    }
}
thread_local! {
// Executor considered "current" on this thread: set for the duration of a
// flush and inside `with_executor`; `None` otherwise.
static CURRENT_EXECUTOR: RefCell<Option<ExecutorRef>> = const { RefCell::new(None) };
}
pub fn with_executor<R>(ex: &ExecutorRef, f: impl FnOnce() -> R) -> R {
CURRENT_EXECUTOR.with(|exec| {
let prev = exec.borrow_mut().replace(Rc::clone(ex));
let result = f();
*exec.borrow_mut() = prev;
result
})
}
fn current_executor() -> Option<ExecutorRef> {
CURRENT_EXECUTOR.with(|exec| exec.borrow().clone())
}
/// Returns the current executor, falling back to the thread-global one when
/// none is installed.
pub(crate) fn current_executor_instance() -> ExecutorRef {
    match current_executor() {
        Some(active) => active,
        None => EXECUTOR.with(Rc::clone),
    }
}
/// Current time in milliseconds as seen by the current (or global) executor;
/// `0` when that executor has no time source.
pub(crate) fn current_time_ms() -> u64 {
    let exec = current_executor_instance();
    let guard = exec.borrow();
    guard.now_ms()
}
/// Delivers the wakes parked in `PENDING_WAKES`.
///
/// Each parked task is enqueued on its executor (if that executor still
/// exists) and a flush is requested when none is scheduled yet.
fn drain_pending_wakes() {
    PENDING_WAKES.with(|pending| {
        // Take the list first so no borrow of it is held while wakes are delivered.
        let parked = std::mem::take(&mut *pending.borrow_mut());
        for (task_id, slot_id, generation) in parked {
            let Some(exec) = lookup_executor(slot_id, generation) else {
                continue;
            };
            exec.borrow_mut().enqueue(task_id);
            let scheduler = exec.borrow_mut().try_schedule_flush();
            let Some(scheduler) = scheduler else {
                continue;
            };
            scheduler.schedule(Box::new(move || {
                // The executor may have died before the callback runs.
                if let Some(target) = lookup_executor(slot_id, generation) {
                    Executor::flush_instance(&target);
                }
            }));
        }
    });
}
/// Flushes the thread-global executor.
fn flush() {
    EXECUTOR.with(|global| Executor::flush_instance(global));
}
/// Installs `sched` as the flush scheduler of the thread-global executor and
/// makes sure the signal schedule hook is installed.
pub fn init_flush_scheduler(sched: Rc<dyn ScheduleFlush>) {
    EXECUTOR.with(|global| {
        let mut guard = global.borrow_mut();
        guard.flush_scheduler = Some(sched);
    });
    install_signal_hook_once();
}
/// Installs, at most once per process, the `auralis_signal` schedule hook.
///
/// The hook queues each callback on the current executor (or the thread-global
/// one when none is current) and requests a flush unless one is running or
/// already scheduled.
fn install_signal_hook_once() {
    use std::sync::OnceLock;
    static INSTALLED: OnceLock<()> = OnceLock::new();
    INSTALLED.get_or_init(|| {
        auralis_signal::install_schedule_hook(Box::new(|cb: Box<dyn FnOnce()>| {
            match current_executor() {
                Some(active) => {
                    let scheduler = {
                        let mut guard = active.borrow_mut();
                        guard.deferred_callbacks.push(cb);
                        if guard.in_flush {
                            None
                        } else {
                            guard.try_schedule_flush()
                        }
                    };
                    if let Some(scheduler) = scheduler {
                        let target = Rc::clone(&active);
                        scheduler.schedule(Box::new(move || Executor::flush_instance(&target)));
                    }
                }
                None => {
                    EXECUTOR.with(|global| {
                        let scheduler = {
                            let mut guard = global.borrow_mut();
                            guard.deferred_callbacks.push(cb);
                            if guard.in_flush {
                                None
                            } else {
                                guard.try_schedule_flush()
                            }
                        };
                        if let Some(scheduler) = scheduler {
                            scheduler.schedule(Box::new(flush));
                        }
                    });
                }
            }
        }));
    });
}
/// Installs `ts` as the time source of the thread-global executor.
pub fn init_time_source(ts: Rc<dyn TimeSource>) {
    EXECUTOR.with(|global| {
        let mut guard = global.borrow_mut();
        guard.time_source = Some(ts);
    });
}
/// Sets the time budget (milliseconds) of the thread-global executor.
pub fn set_global_time_budget(budget_ms: u64) {
    EXECUTOR.with(|global| {
        let mut guard = global.borrow_mut();
        guard.time_budget_ms = budget_ms;
    });
}
/// Installs `hook` as the panic hook of the thread-global executor; it is
/// called with a `PanicInfo` whenever polling a task panics.
pub fn set_panic_hook(hook: Rc<dyn Fn(PanicInfo)>) {
    EXECUTOR.with(|global| {
        let mut guard = global.borrow_mut();
        guard.panic_hook = Some(hook);
    });
}
/// Removes the panic hook of the thread-global executor, if any.
pub fn remove_panic_hook() {
    EXECUTOR.with(|global| {
        let mut guard = global.borrow_mut();
        guard.panic_hook = None;
    });
}
pub fn spawn_global(future: impl Future<Output = ()> + 'static) {
spawn_global_with_priority(Priority::Low, future);
}
pub fn spawn_global_with_priority(priority: Priority, future: impl Future<Output = ()> + 'static) {
spawn_inner_on(&EXECUTOR.with(Rc::clone), Box::pin(future), priority, 0);
}
pub(crate) fn spawn_scoped_on(
ex: &Rc<RefCell<Executor>>,
priority: Priority,
scope_id: u64,
future: impl Future<Output = ()> + 'static,
) -> TaskId {
spawn_inner_on(ex, Box::pin(future), priority, scope_id)
}
/// Shared spawn path: stores the task, queues it, and requests a flush when
/// none is scheduled yet. Returns the id of the new task.
fn spawn_inner_on(
    ex: &Rc<RefCell<Executor>>,
    future: Pin<Box<dyn Future<Output = ()> + 'static>>,
    priority: Priority,
    scope_id: u64,
) -> TaskId {
    let mut guard = ex.borrow_mut();
    let task_id = guard.allocate_id();
    guard.tasks[task_id as usize] = Some(TaskState {
        future,
        priority,
        scope_id,
        timer_deadline: 0,
    });
    guard.enqueue(task_id);
    let scheduler = guard.try_schedule_flush();
    // Release the borrow first: the scheduler may flush synchronously.
    drop(guard);

    if let Some(scheduler) = scheduler {
        let target = Rc::clone(ex);
        scheduler.schedule(Box::new(move || Executor::flush_instance(&target)));
    }
    task_id
}
/// Queues every stored task of scope `scope_id` on `ex` and requests a flush
/// unless one is running or already scheduled.
pub(crate) fn enqueue_scope_tasks_on(ex: &ExecutorRef, scope_id: u64) {
    let scoped_ids: Vec<TaskId> = ex
        .borrow()
        .tasks
        .iter()
        .enumerate()
        .filter_map(|(index, slot)| match slot {
            Some(task) if task.scope_id == scope_id => Some(index as TaskId),
            _ => None,
        })
        .collect();

    let scheduler = {
        let mut guard = ex.borrow_mut();
        for &task_id in &scoped_ids {
            guard.enqueue(task_id);
        }
        if guard.in_flush {
            None
        } else {
            guard.try_schedule_flush()
        }
    };

    if let Some(scheduler) = scheduler {
        let target = Rc::clone(ex);
        scheduler.schedule(Box::new(move || Executor::flush_instance(&target)));
    }
}
pub(crate) fn cancel_scope_tasks_on(
ex: &Rc<RefCell<Executor>>,
scope_id: u64,
) -> Vec<Pin<Box<dyn Future<Output = ()>>>> {
let mut e = ex.borrow_mut();
let mut dropped = Vec::new();
let mut timer_deadlines: Vec<(u64, TaskId)> = Vec::new();
for (tid, slot) in e.tasks.iter().enumerate() {
if let Some(ref t) = slot {
if t.scope_id == scope_id && t.timer_deadline != 0 {
timer_deadlines.push((t.timer_deadline, tid as TaskId));
}
}
}
for (dl, tid) in &timer_deadlines {
e.cleanup_timer(*tid, *dl);
}
for slot in &mut e.tasks {
if let Some(ref t) = slot {
if t.scope_id == scope_id {
if let Some(state) = slot.take() {
dropped.push(state.future);
}
}
}
}
let high: Vec<TaskId> = e
.high_queue
.iter()
.filter(|id| e.tasks[**id as usize].is_some())
.copied()
.collect();
e.high_queue.clear();
e.high_queue.extend(high);
let low: Vec<TaskId> = e
.low_queue
.iter()
.filter(|id| e.tasks[**id as usize].is_some())
.copied()
.collect();
e.low_queue.clear();
e.low_queue.extend(low);
let mut all_free: Vec<TaskId> = e
.tasks
.iter()
.enumerate()
.filter(|(_, s)| s.is_none())
.map(|(i, _)| i as TaskId)
.chain(e.free_slots.iter().copied())
.collect();
all_free.sort_unstable();
all_free.dedup();
e.free_slots = all_free;
dropped
}
/// Returns a future that, when awaited, yields once: its first poll wakes the
/// task and returns `Pending`, its second poll returns `Ready`.
#[must_use = "yield_now() does nothing unless awaited"]
pub fn yield_now() -> YieldNow {
YieldNow { yielded: false }
}
/// Future returned by `yield_now`; completes on its second poll.
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct YieldNow {
    /// Whether the single yield has already happened.
    yielded: bool,
}

impl Future for YieldNow {
    type Output = ();

    /// First poll: requests a re-poll through the waker and returns `Pending`.
    /// Every later poll returns `Ready`.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let already_yielded = std::mem::replace(&mut self.yielded, true);
        if already_yielded {
            return Poll::Ready(());
        }
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}
/// Queues `f` as a deferred callback on the current (or global) executor and
/// requests a flush unless one is running or already scheduled.
pub fn schedule_callback(f: Box<dyn FnOnce()>) {
    let exec = current_executor_instance();
    let mut guard = exec.borrow_mut();
    guard.deferred_callbacks.push(f);
    let scheduler = if guard.in_flush {
        None
    } else {
        guard.try_schedule_flush()
    };
    // Release the borrow first: the scheduler may flush synchronously.
    drop(guard);

    if let Some(scheduler) = scheduler {
        let target = Rc::clone(&exec);
        scheduler.schedule(Box::new(move || Executor::flush_instance(&target)));
    }
}
/// Queues `signal.set(value)` as a deferred op on the current (or global)
/// executor, to be applied at the start of a flush, and requests a flush when
/// none is scheduled yet.
pub fn set_deferred<T: 'static>(signal: &Signal<T>, value: T) {
    let target_signal = signal.clone();
    let apply = DeferredOp {
        f: Box::new(move || target_signal.set(value)),
    };

    let exec = current_executor_instance();
    let mut guard = exec.borrow_mut();
    guard.deferred_ops.push(apply);
    let scheduler = guard.try_schedule_flush();
    // Release the borrow first: the scheduler may flush synchronously.
    drop(guard);

    if let Some(scheduler) = scheduler {
        let target = Rc::clone(&exec);
        scheduler.schedule(Box::new(move || Executor::flush_instance(&target)));
    }
}
/// Resets the thread-local executor state so a test starts from a clean slate.
///
/// Clears parked wakes, the slot registry, the current-executor marker and
/// the global executor's tasks, queues, timers, deferred work, scheduler and
/// time source, then clears the scope registry.
///
/// The timer map is cleared together with the task table: its entries refer
/// to task ids, and stale entries would otherwise enqueue unrelated tasks
/// that are later assigned the same ids.
///
/// NOTE(review): `panic_hook` and `time_budget_ms` are left untouched, as in
/// the original; confirm whether they should be reset as well.
pub fn reset_executor_for_test() {
    PENDING_WAKES.with(|pw| pw.borrow_mut().clear());
    SLOTS.with(|s| s.borrow_mut().clear());
    CURRENT_EXECUTOR.with(|c| *c.borrow_mut() = None);
    EXECUTOR.with(|exec| {
        let mut ex = exec.borrow_mut();
        ex.high_queue.clear();
        ex.low_queue.clear();
        ex.tasks.clear();
        ex.free_slots.clear();
        ex.next_task_id = 0;
        ex.timers.clear();
        ex.is_flush_scheduled = false;
        ex.in_flush = false;
        ex.deferred_ops.clear();
        ex.deferred_callbacks.clear();
        ex.flush_scheduler = None;
        ex.time_source = None;
        ex.slot_id = 0;
        ex.generation = 0;
        ex.registered = false;
    });
    crate::scope::clear_scope_registry();
}
/// Number of tasks currently stored in the thread-global executor.
#[cfg(test)]
pub(crate) fn debug_task_count() -> usize {
    EXECUTOR.with(|global| global.borrow().tasks.iter().flatten().count())
}
/// Snapshot of the thread-global executor's stored tasks as
/// `(task id, priority, scope id)` tuples, in slot order.
#[cfg(feature = "debug")]
pub(crate) fn debug_task_snapshot() -> Vec<(TaskId, Priority, u64)> {
    EXECUTOR.with(|global| {
        global
            .borrow()
            .tasks
            .iter()
            .enumerate()
            .filter_map(|(index, slot)| {
                slot.as_ref()
                    .map(|task| (index as u64, task.priority, task.scope_id))
            })
            .collect()
    })
}
/// Sorted, de-duplicated ids currently sitting in either run queue of the
/// thread-global executor.
#[cfg(feature = "debug")]
pub(crate) fn debug_queued_task_ids() -> Vec<TaskId> {
    EXECUTOR.with(|global| {
        let exec = global.borrow();
        let mut queued: Vec<TaskId> =
            Vec::with_capacity(exec.high_queue.len() + exec.low_queue.len());
        queued.extend(exec.high_queue.iter().copied());
        queued.extend(exec.low_queue.iter().copied());
        queued.sort_unstable();
        queued.dedup();
        queued
    })
}
/// Test helper: stores and queues `future` on the thread-global executor
/// without requesting a flush. Returns the id of the new task.
#[cfg(test)]
pub(crate) fn spawn_no_auto_flush(
    priority: Priority,
    future: impl Future<Output = ()> + 'static,
) -> TaskId {
    let state = TaskState {
        future: Box::pin(future),
        priority,
        scope_id: 0,
        timer_deadline: 0,
    };
    EXECUTOR.with(|global| {
        let mut guard = global.borrow_mut();
        let task_id = guard.allocate_id();
        guard.tasks[task_id as usize] = Some(state);
        guard.enqueue(task_id);
        task_id
    })
}
/// Test helper: runs one flush of the thread-global executor.
#[cfg(test)]
pub(crate) fn flush_all() {
    EXECUTOR.with(|global| Executor::flush_instance(global));
}