auralis_task/executor.rs
1//! Single-threaded executor with priority scheduling, time-budget
2//! awareness, and deferred-signal support.
3//!
4//! ## Architecture
5//!
6//! The executor is stored via a pluggable [`ExecutorStorage`] strategy
7//! (defaulting to a per-thread slot). Before polling a task the future
8//! is **temporarily removed** so that the poll never holds an executor
9//! borrow — this allows nested spawns, wakes, and `set_deferred` calls
10//! without `RefCell` panics.
11//!
12//! The waker carries only a `task_id: u64`, making it trivially
13//! [`Send`] + [`Sync`] for [`Waker::from`].
14
15#![allow(clippy::cast_possible_truncation)]
16
17use std::cell::{Cell, RefCell};
18use std::collections::{BTreeMap, VecDeque};
19use std::future::Future;
20use std::pin::Pin;
21use std::rc::{Rc, Weak};
22use std::sync::Arc;
23use std::task::{Context, Poll, Wake, Waker};
24
25use auralis_signal::Signal;
26
27use crate::Priority;
28
29// ---------------------------------------------------------------------------
30// Types
31// ---------------------------------------------------------------------------
32
33type TaskId = u64;
34
35// ---------------------------------------------------------------------------
36// ScheduleFlush
37// ---------------------------------------------------------------------------
38
39/// Platform hook for scheduling a microtask callback.
40pub trait ScheduleFlush {
41 /// Request that `callback` runs at the next microtask boundary.
42 fn schedule(&self, callback: Box<dyn FnOnce()>);
43}
44
45/// A [`ScheduleFlush`] that fires the callback synchronously.
46///
47/// Makes the executor run-to-completion in unit tests without a browser
48/// event loop.
49#[cfg(test)]
50pub struct TestScheduleFlush;
51
52#[cfg(test)]
53impl ScheduleFlush for TestScheduleFlush {
54 fn schedule(&self, callback: Box<dyn FnOnce()>) {
55 callback();
56 }
57}
58
59// ---------------------------------------------------------------------------
60// TimeSource
61// ---------------------------------------------------------------------------
62
63/// High-resolution time source for the executor's time-budget
64/// accounting.
65///
66/// When registered via [`init_time_source`], the executor queries this
67/// before and after each task poll to decide whether it should yield
68/// control back to the host event loop (default budget: 8 ms).
69///
70/// In Wasm environments the implementation typically delegates to
71/// `performance.now()`. If no [`TimeSource`] is registered the time
72/// budget check is a no-op and the executor runs tasks until the
73/// queues are drained.
74pub trait TimeSource {
75 /// Return the current time in milliseconds.
76 fn now_ms(&self) -> u64;
77}
78
79/// A [`TimeSource`] whose value is explicitly controlled by the test.
80///
81/// Use [`set`](TestTimeSource::set) or [`advance`](TestTimeSource::advance)
82/// to simulate the passage of time during a flush cycle.
83#[cfg(test)]
84pub struct TestTimeSource {
85 now: std::cell::Cell<u64>,
86}
87
88#[cfg(test)]
89impl TestTimeSource {
90 /// Create a new [`TestTimeSource`] with the given initial time.
91 #[must_use]
92 pub fn new(initial_ms: u64) -> Self {
93 Self {
94 now: std::cell::Cell::new(initial_ms),
95 }
96 }
97
98 /// Set the current time to `ms` milliseconds.
99 pub fn set(&self, ms: u64) {
100 self.now.set(ms);
101 }
102
103 /// Advance the current time by `ms` milliseconds.
104 pub fn advance(&self, ms: u64) {
105 self.now.set(self.now.get() + ms);
106 }
107}
108
109#[cfg(test)]
110impl TimeSource for TestTimeSource {
111 fn now_ms(&self) -> u64 {
112 self.now.get()
113 }
114}
115
116// ---------------------------------------------------------------------------
117// TaskWaker — routes wakes to the correct executor via a slot table.
118//
119// Waker::from requires Send + Sync + 'static, so the waker cannot hold
120// an Rc<RefCell<Executor>>. Instead it stores a slot index + generation
121// number. The SLOTS thread_local maps (index, generation) → Weak<Executor>.
122// On wake, the generation is validated before the weak pointer is upgraded.
123// Dead slots are reclaimed when new executors are registered.
124// ---------------------------------------------------------------------------
125
126/// A registered executor slot. The `generation` counter distinguishes
127/// between successive executors that occupy the same slot index (e.g.
128/// after the previous one was dropped and a new one recycles the slot).
129struct Slot {
130 weak: Weak<RefCell<Executor>>,
131 /// Incremented (wrapping) every time this slot is reused.
132 /// A [`TaskWaker`] must present the generation it was created with;
133 /// a mismatch means the waker is stale and is silently ignored.
134 generation: u64,
135}
136
137thread_local! {
138 /// Slot 0 is reserved for the global executor. Instance executors
139 /// occupy subsequent slots. Dead slots (Weak::upgrade returns None)
140 /// are recycled in [`register_executor`].
141 static SLOTS: RefCell<Vec<Slot>> = const { RefCell::new(Vec::new()) };
142}
143
144/// Register an executor in the slot table, returning the assigned
145/// (`slot_id`, `generation`) pair. Dead slots are recycled in-place;
146/// if no dead slot is found a new entry is appended.
147fn register_executor(weak: Weak<RefCell<Executor>>) -> (u64, u64) {
148 SLOTS.with(|slots| {
149 let mut slots = slots.borrow_mut();
150 for (i, slot) in slots.iter_mut().enumerate() {
151 if slot.weak.upgrade().is_none() {
152 slot.weak = weak;
153 // Wrapping is safe: 2^64 reuses of a single slot
154 // would take ~10^14 years at 1 reuse/μs.
155 slot.generation = slot.generation.wrapping_add(1);
156 return (i as u64, slot.generation);
157 }
158 }
159 let gen = 0;
160 slots.push(Slot {
161 weak,
162 generation: gen,
163 });
164 ((slots.len() - 1) as u64, gen)
165 })
166}
167
168/// Look up an executor by slot id, validating the generation.
169fn lookup_executor(slot_id: u64, generation: u64) -> Option<Rc<RefCell<Executor>>> {
170 SLOTS.with(|slots| {
171 let slots = slots.borrow();
172 let slot = slots.get(slot_id as usize)?;
173 if slot.generation != generation {
174 return None;
175 }
176 slot.weak.upgrade()
177 })
178}
179
180struct TaskWaker {
181 task_id: TaskId,
182 priority: Priority,
183 slot_id: u64,
184 generation: u64,
185}
186
187impl Wake for TaskWaker {
188 fn wake(self: Arc<Self>) {
189 let Some(exec) = lookup_executor(self.slot_id, self.generation) else {
190 return;
191 };
192 let maybe_sched = if let Ok(mut ex) = exec.try_borrow_mut() {
193 match self.priority {
194 Priority::High => ex.high_queue.push_back(self.task_id),
195 Priority::Low => ex.low_queue.push_back(self.task_id),
196 }
197 if ex.in_flush {
198 None
199 } else {
200 ex.try_schedule_flush()
201 }
202 } else {
203 PENDING_WAKES.with(|pw| {
204 pw.borrow_mut()
205 .push((self.task_id, self.slot_id, self.generation));
206 });
207 None
208 };
209 if let Some(sched) = maybe_sched {
210 let sid = self.slot_id;
211 let gen = self.generation;
212 sched.schedule(Box::new(move || {
213 if let Some(ex) = lookup_executor(sid, gen) {
214 Executor::flush_instance(&ex);
215 }
216 }));
217 }
218 }
219}
220
221// ---------------------------------------------------------------------------
222// TaskState
223// ---------------------------------------------------------------------------
224
225struct TaskState {
226 future: Pin<Box<dyn Future<Output = ()> + 'static>>,
227 priority: Priority,
228 scope_id: u64,
229 /// Key in [`Executor::timers`] for this task's pending sleep,
230 /// or 0 if the task is not waiting on a timer.
231 timer_deadline: u64,
232 /// Number of times this task has been polled.
233 total_poll_count: u64,
234 /// Microseconds spent in the most recent poll.
235 last_poll_duration_us: u64,
236}
237
238// ---------------------------------------------------------------------------
239// Executor
240// ---------------------------------------------------------------------------
241
242/// Information about a task panic, passed to the user-registered
243/// [`set_panic_hook`].
244#[derive(Debug)]
245pub struct PanicInfo {
246 /// The executor-assigned task id.
247 pub task_id: u64,
248 /// The scope that owned the task (0 for global tasks).
249 pub scope_id: u64,
250 /// The boxed panic payload.
251 pub payload: Box<dyn std::any::Any + Send>,
252}
253
254/// A single-threaded async task executor with priority queues.
255///
256/// Each [`Executor`] manages its own task slots, ready queues, and
257/// deferred callback buffers. Use [`Executor::new_instance`] to create
258/// an isolated executor (e.g. per SSR request), or use the global
259/// thread-local executor via [`spawn_global`](crate::spawn_global).
260pub struct Executor {
261 high_queue: VecDeque<TaskId>,
262 low_queue: VecDeque<TaskId>,
263 tasks: Vec<Option<TaskState>>,
264 free_slots: Vec<TaskId>,
265 next_task_id: TaskId,
266 is_flush_scheduled: bool,
267 in_flush: bool,
268 deferred_ops: Vec<DeferredOp>,
269 /// Callbacks pushed by `Signal::set` via the schedule hook.
270 /// Drained at the start of every flush before polling tasks.
271 ///
272 /// Unbounded by design — in a single-threaded Wasm context, a tight
273 /// loop of signal sets will block the UI thread anyway, so adding a
274 /// capacity limit wouldn't improve the situation. SSR / multi-tenant
275 /// users should ensure that application code doesn't produce
276 /// unbounded signal churn within a single request.
277 deferred_callbacks: Vec<Box<dyn FnOnce()>>,
278 flush_scheduler: Option<Rc<dyn ScheduleFlush>>,
279 time_source: Option<Rc<dyn TimeSource>>,
280 /// Maximum milliseconds to spend inside a single flush before
281 /// yielding back to the host event loop. Default: 8 ms.
282 time_budget_ms: u64,
283 /// Optional cap on the number of deferred signal callbacks that can
284 /// accumulate between two flushes. Exceeding this limit triggers a
285 /// panic — useful as a safety net in SSR / multi-tenant deployments
286 /// where a runaway signal loop could OOM the process.
287 ///
288 /// Default: `None` (no limit).
289 max_deferred_callbacks: Option<usize>,
290 /// Optional hook invoked when a spawned task panics.
291 panic_hook: Option<Rc<dyn Fn(PanicInfo)>>,
292 /// Timer queue: map from deadline (ms) to task ids that should be
293 /// woken when that deadline expires. Processed at the start of
294 /// every flush.
295 timers: BTreeMap<u64, Vec<TaskId>>,
296 /// Slot index and generation in [`SLOTS`] for routing wakes back
297 /// to this executor. Set by [`new_instance`] or lazily for the
298 /// global executor.
299 slot_id: u64,
300 generation: u64,
301 /// Whether this executor has been registered in [`SLOTS`].
302 registered: bool,
303}
304
305// Set by the executor before polling a task, cleared afterward.
306// Lets futures discover their task id without threading it through
307// layers of combinators.
308thread_local! {
309 static CURRENT_POLLING_TASK: Cell<Option<TaskId>> = const { Cell::new(None) };
310}
311
312pub(crate) fn with_current_polling_task<R>(f: impl FnOnce(Option<TaskId>) -> R) -> R {
313 CURRENT_POLLING_TASK.with(|c| f(c.get()))
314}
315
316struct DeferredOp {
317 f: Box<dyn FnOnce()>,
318}
319
320impl Executor {
321 fn new() -> Self {
322 Self {
323 high_queue: VecDeque::new(),
324 low_queue: VecDeque::new(),
325 tasks: Vec::new(),
326 free_slots: Vec::new(),
327 next_task_id: 0,
328 is_flush_scheduled: false,
329 in_flush: false,
330 deferred_ops: Vec::new(),
331 deferred_callbacks: Vec::new(),
332 flush_scheduler: None,
333 time_source: None,
334 time_budget_ms: 8,
335 max_deferred_callbacks: None,
336 panic_hook: None,
337 timers: BTreeMap::new(),
338 slot_id: 0,
339 generation: 0,
340 registered: false,
341 }
342 }
343
344 fn allocate_id(&mut self) -> TaskId {
345 if let Some(id) = self.free_slots.pop() {
346 return id;
347 }
348 let id = self.next_task_id;
349 self.next_task_id += 1;
350 self.tasks.push(None);
351 id
352 }
353
354 /// Release a task slot back to the free list.
355 ///
356 /// **Caller must ensure** that `task_id` has not already been freed
357 /// (e.g. via [`cancel_task`] or [`cancel_scope_tasks_on`]). This
358 /// method unconditionally pushes to `free_slots` — pushing the same
359 /// id twice would cause [`allocate_id`] to hand it out twice.
360 fn free_slot(&mut self, task_id: TaskId) {
361 // Clean up any pending timer for this task so a recycled
362 // task ID is not spuriously woken by an old deadline.
363 // This works when the slot is still occupied (scope cancel
364 // path). For normal completion (Poll::Ready), the slot is
365 // already None and the caller must call cleanup_timer first.
366 if let Some(Some(ref t)) = self.tasks.get(task_id as usize) {
367 if t.timer_deadline != 0 {
368 self.cleanup_timer(task_id, t.timer_deadline);
369 }
370 }
371 self.tasks[task_id as usize] = None;
372 self.free_slots.push(task_id);
373 }
374
375 /// Remove a timer entry for `task_id` from the timer map.
376 fn cleanup_timer(&mut self, task_id: TaskId, deadline: u64) {
377 if let Some(tids) = self.timers.get_mut(&deadline) {
378 tids.retain(|id| *id != task_id);
379 if tids.is_empty() {
380 self.timers.remove(&deadline);
381 }
382 }
383 }
384
385 fn enqueue(&mut self, task_id: TaskId) {
386 let priority = match self.tasks.get(task_id as usize).and_then(Option::as_ref) {
387 Some(t) => t.priority,
388 None => return,
389 };
390 match priority {
391 Priority::High => self.high_queue.push_back(task_id),
392 Priority::Low => self.low_queue.push_back(task_id),
393 }
394 }
395
396 fn dequeue(&mut self) -> Option<TaskId> {
397 self.high_queue
398 .pop_front()
399 .or_else(|| self.low_queue.pop_front())
400 }
401
402 /// Mark that a flush is needed and return the scheduler if one is
403 /// registered. The caller **must** invoke the scheduler **after**
404 /// releasing the executor borrow.
405 fn try_schedule_flush(&mut self) -> Option<Rc<dyn ScheduleFlush>> {
406 if self.is_flush_scheduled {
407 return None;
408 }
409 self.is_flush_scheduled = true;
410 self.flush_scheduler.clone()
411 }
412
413 /// Return the current time in ms, or 0 if no [`TimeSource`] is
414 /// registered. When this returns 0 the time-budget check is
415 /// effectively a no-op.
416 pub(crate) fn now_ms(&self) -> u64 {
417 self.time_source.as_ref().map_or(0, |ts| ts.now_ms())
418 }
419
420 /// Return the number of currently active (not-yet-completed) tasks.
421 ///
422 /// Used by streaming SSR to determine whether the stream should
423 /// wait for more work or terminate.
424 #[must_use]
425 pub fn active_task_count(&self) -> usize {
426 self.tasks.iter().filter(|t| t.is_some()).count()
427 }
428}
429
430// ---------------------------------------------------------------------------
431// Thread-local globals (default storage)
432// ---------------------------------------------------------------------------
433
434thread_local! {
435 static EXECUTOR: Rc<RefCell<Executor>> = Rc::new(RefCell::new(Executor::new()));
436 static PENDING_WAKES: RefCell<Vec<(TaskId, u64, u64)>> =
437 const { RefCell::new(Vec::new()) };
438}
439
440/// Ensure the global executor is registered in slot 0 (lazy, idempotent).
441/// Returns (`slot_id`, `generation`) for the global executor.
442fn ensure_global_registered() -> (u64, u64) {
443 SLOTS.with(|slots| {
444 let mut slots = slots.borrow_mut();
445 if slots.is_empty() {
446 let weak = EXECUTOR.with(Rc::downgrade);
447 slots.push(Slot {
448 weak,
449 generation: 0,
450 });
451 } else {
452 // Verify slot 0 still holds the global executor.
453 let global = EXECUTOR.with(Rc::clone);
454 let is_global = slots[0]
455 .weak
456 .upgrade()
457 .is_some_and(|ex| Rc::ptr_eq(&ex, &global));
458 if !is_global {
459 slots[0] = Slot {
460 weak: Rc::downgrade(&global),
461 generation: slots[0].generation.wrapping_add(1),
462 };
463 }
464 }
465 // Mark the global executor as registered so flush_instance
466 // doesn't call this function again on every flush.
467 EXECUTOR.with(|ex| {
468 let mut e = ex.borrow_mut();
469 e.slot_id = 0;
470 e.generation = slots[0].generation;
471 e.registered = true;
472 });
473 let gen = slots[0].generation;
474 (0, gen)
475 })
476}
477
478// ---------------------------------------------------------------------------
479// Executor instance methods (for isolated executors, e.g. SSR)
480// ---------------------------------------------------------------------------
481
482impl Executor {
483 /// Create a new isolated executor, wrapped for shared access.
484 ///
485 /// The returned executor is independent of the global thread-local
486 /// executor. Use [`with_executor`] to make it the current executor
487 /// for the duration of a closure, so that spawned tasks and signal
488 /// callbacks are routed to it.
489 #[must_use]
490 pub fn new_instance() -> Rc<RefCell<Executor>> {
491 let ex = Rc::new(RefCell::new(Executor::new()));
492 // Register in the slot table so TaskWaker can find this executor.
493 let (slot_id, generation) = register_executor(Rc::downgrade(&ex));
494 {
495 let mut e = ex.borrow_mut();
496 e.slot_id = slot_id;
497 e.generation = generation;
498 e.registered = true;
499 }
500 ex
501 }
502
503 /// Install a flush scheduler on this executor instance.
504 pub fn install_flush_scheduler(ex: &Rc<RefCell<Executor>>, sched: Rc<dyn ScheduleFlush>) {
505 ex.borrow_mut().flush_scheduler = Some(sched);
506 }
507
508 /// Install a time source on this executor instance.
509 pub fn install_time_source(ex: &Rc<RefCell<Executor>>, ts: Rc<dyn TimeSource>) {
510 ex.borrow_mut().time_source = Some(ts);
511 }
512
513 /// Set the maximum time (in milliseconds) a single flush may spend
514 /// before yielding back to the host event loop.
515 ///
516 /// The default is 8 ms (~120 fps frame budget, leaving time for the
517 /// browser to render between flushes). Set to `u64::MAX` to disable
518 /// time-budget yielding (flush runs to completion).
519 ///
520 /// # Semantics
521 ///
522 /// The budget is checked **between** task polls — the currently
523 /// executing task is never interrupted. When the budget is exhausted
524 /// the executor sets `in_flush = false` and schedules a follow-up
525 /// flush so the remaining ready tasks will be polled on the next
526 /// microtask tick. This is cooperative (`.await`-bound) yielding,
527 /// not preemptive.
528 ///
529 /// This affects **this executor only**. For the global thread-local
530 /// executor use [`set_global_time_budget`].
531 pub fn set_time_budget(ex: &Rc<RefCell<Executor>>, budget_ms: u64) {
532 ex.borrow_mut().time_budget_ms = budget_ms;
533 }
534
535 /// Set a safety cap on the deferred signal callback queue.
536 ///
537 /// When set to `Some(n)`, the executor will panic if more than `n`
538 /// deferred callbacks accumulate between two flush cycles. This is a
539 /// safety net for SSR / multi-tenant servers where a runaway signal
540 /// loop could exhaust memory — in a single-threaded Wasm context,
541 /// unbounded accumulation is acceptable because it blocks the UI
542 /// thread anyway.
543 ///
544 /// Default: `None` (no limit).
545 pub fn set_max_deferred_callbacks(ex: &Rc<RefCell<Executor>>, limit: Option<usize>) {
546 ex.borrow_mut().max_deferred_callbacks = limit;
547 }
548
549 /// Register a callback invoked whenever a spawned task panics.
550 ///
551 /// The default is no hook — panicking tasks are silently removed
552 /// from the executor (the same behaviour as a task returning
553 /// `Poll::Ready(())`).
554 ///
555 /// # Example
556 ///
557 /// ```rust,ignore
558 /// Executor::set_panic_hook(&ex, Rc::new(|info| {
559 /// eprintln!("task {} in scope {} panicked", info.task_id, info.scope_id);
560 /// }));
561 /// ```
562 pub fn set_panic_hook(ex: &Rc<RefCell<Executor>>, hook: Rc<dyn Fn(PanicInfo)>) {
563 ex.borrow_mut().panic_hook = Some(hook);
564 }
565
566 /// Register a timer: when `now_ms() >= deadline_ms`, enqueue
567 /// `task_id` so it gets polled on the next flush.
568 pub(crate) fn schedule_timer(ex: &Rc<RefCell<Executor>>, deadline_ms: u64, task_id: TaskId) {
569 let mut e = ex.borrow_mut();
570 // If this task already has a pending timer (e.g. previous SleepFuture
571 // was dropped via select!), clean up the old entry so the timer map
572 // doesn't accumulate stale deadlines.
573 let old_deadline = e
574 .tasks
575 .get(task_id as usize)
576 .and_then(Option::as_ref)
577 .map_or(0, |t| t.timer_deadline);
578 if old_deadline != 0 {
579 e.cleanup_timer(task_id, old_deadline);
580 }
581 e.timers.entry(deadline_ms).or_default().push(task_id);
582 // Set the reverse index so cancel_scope_tasks can find this entry.
583 if let Some(Some(ref mut t)) = e.tasks.get_mut(task_id as usize) {
584 t.timer_deadline = deadline_ms;
585 }
586 // Request a flush so the timer is checked.
587 e.is_flush_scheduled = false;
588 let maybe_sched = e.try_schedule_flush();
589 drop(e);
590 if let Some(sched) = maybe_sched {
591 let ex2 = Rc::clone(ex);
592 sched.schedule(Box::new(move || Self::flush_instance(&ex2)));
593 }
594 }
595
596 /// Spawn a future on this executor instance.
597 pub fn spawn(ex: &Rc<RefCell<Executor>>, future: impl Future<Output = ()> + 'static) {
598 let maybe_sched = {
599 let mut e = ex.borrow_mut();
600 let tid = e.allocate_id();
601 e.tasks[tid as usize] = Some(TaskState {
602 future: Box::pin(future),
603 priority: Priority::Low,
604 scope_id: 0,
605 timer_deadline: 0,
606 total_poll_count: 0,
607 last_poll_duration_us: 0,
608 });
609 e.enqueue(tid);
610 e.try_schedule_flush()
611 };
612 if let Some(sched) = maybe_sched {
613 let ex2 = Rc::clone(ex);
614 sched.schedule(Box::new(move || Self::flush_instance(&ex2)));
615 }
616 }
617
618 /// Run a full flush cycle on this executor instance.
619 ///
620 /// Mirrors the global flush cycle but operates on an
621 /// isolated executor (used for SSR). Includes all the same
622 /// protections: `catch_unwind`, suspend checks, time-budget
623 /// yielding, and callback-drain budget.
624 #[allow(clippy::too_many_lines)]
625 pub fn flush_instance(ex: &Rc<RefCell<Executor>>) {
626 // Guard against re-entrant flushes.
627 {
628 let mut e = ex.borrow_mut();
629 if e.in_flush {
630 #[cfg(debug_assertions)]
631 {
632 eprintln!(
633 "[auralis-task] WARNING: Executor::flush_instance called \
634 re-entrantly (already inside a flush). This is a no-op. \
635 Check for nested flush() calls in signal callbacks or \
636 ScheduleFlush implementations."
637 );
638 }
639 return;
640 }
641 e.in_flush = true;
642 }
643
644 // Set this executor as the current one so that TaskWaker
645 // (which cannot hold an Rc) can discover it via thread-local.
646 // Restore on scope exit (including early returns for time-budget
647 // yielding and re-entrancy).
648 let prev_executor = CURRENT_EXECUTOR.with(|c| c.borrow_mut().replace(Rc::clone(ex)));
649 let _restore = RestoreExecutor(prev_executor);
650
651 // Step 0: drain expired timers.
652 {
653 let mut e = ex.borrow_mut();
654 let now = e.now_ms();
655 // When no TimeSource is registered (now == 0), expire all
656 // timers — they've already been woken via wake_by_ref and
657 // just need to be re-polled.
658 if now == 0 {
659 for (_, tasks) in std::mem::take(&mut e.timers) {
660 for tid in tasks {
661 // Clear the reverse index since the timer has fired.
662 if let Some(Some(ref mut t)) = e.tasks.get_mut(tid as usize) {
663 t.timer_deadline = 0;
664 }
665 e.enqueue(tid);
666 }
667 }
668 } else {
669 let expired: Vec<u64> =
670 e.timers.keys().copied().take_while(|&d| d <= now).collect();
671 for deadline in expired {
672 if let Some(tasks) = e.timers.remove(&deadline) {
673 for tid in tasks {
674 if let Some(Some(ref mut t)) = e.tasks.get_mut(tid as usize) {
675 t.timer_deadline = 0;
676 }
677 e.enqueue(tid);
678 }
679 }
680 }
681 }
682 }
683
684 // Step 1: deferred ops.
685 let deferred = std::mem::take(&mut ex.borrow_mut().deferred_ops);
686 for op in deferred {
687 (op.f)();
688 }
689
690 // Steps 2+3 may need to re-run if task polling queues new
691 // signal callbacks (re-entrant cross-scope propagation).
692 for _pass in 0..3_u8 {
693 {
694 let cb_start = ex.borrow().now_ms();
695 loop {
696 let callbacks = std::mem::take(&mut ex.borrow_mut().deferred_callbacks);
697 if callbacks.is_empty() {
698 break;
699 }
700 for cb in callbacks {
701 // Isolate each callback so a panic in one subscriber
702 // doesn't block the remaining notifications or wedge
703 // the executor (in_flush stays true on unwind).
704 let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(cb));
705 }
706 if ex.borrow().now_ms().saturating_sub(cb_start) >= ex.borrow().time_budget_ms {
707 if !ex.borrow().deferred_callbacks.is_empty() {
708 let (sched, ex2) = {
709 let mut e = ex.borrow_mut();
710 e.in_flush = false;
711 e.is_flush_scheduled = false;
712 (e.try_schedule_flush(), Rc::clone(ex))
713 };
714 if let Some(sched) = sched {
715 sched.schedule(Box::new(move || Self::flush_instance(&ex2)));
716 }
717 return;
718 }
719 break;
720 }
721 }
722 }
723
724 // Step 3: main poll loop with time-budget check.
725 let poll_start = ex.borrow().now_ms();
726 loop {
727 let task_id = ex.borrow_mut().dequeue();
728 let Some(tid) = task_id else {
729 let mut e = ex.borrow_mut();
730 e.is_flush_scheduled = false;
731 e.in_flush = false;
732 break;
733 };
734
735 // Take the task out so the poll doesn't hold an executor borrow.
736 let maybe_state = ex.borrow_mut().tasks[tid as usize].take();
737 if let Some(mut state) = maybe_state {
738 let priority = state.priority;
739 let scope_id = state.scope_id;
740
741 // Check if the owning scope is suspended.
742 let scope = crate::scope::find_scope(scope_id);
743 if let Some(ref s) = scope {
744 if s.is_suspended() {
745 let mut e = ex.borrow_mut();
746 if e.tasks[tid as usize].is_none() {
747 e.tasks[tid as usize] = Some(state);
748 }
749 continue;
750 }
751 }
752
753 // Ensure the executor is registered in the slot table.
754 // Must not call ensure_global_registered while holding
755 // a borrow on ex (it borrows the global EXECUTOR).
756 let (slot_id, gen) = {
757 let e = ex.borrow();
758 if e.registered {
759 (e.slot_id, e.generation)
760 } else {
761 drop(e);
762 ensure_global_registered()
763 }
764 };
765 let waker = Waker::from(Arc::new(TaskWaker {
766 task_id: tid,
767 priority,
768 slot_id,
769 generation: gen,
770 }));
771 let mut cx = Context::from_waker(&waker);
772
773 // Inject owning scope.
774 let prev_scope = crate::scope::get_scope_direct();
775 if scope.is_some() {
776 crate::scope::set_scope_direct(scope);
777 }
778
779 // Let futures discover their task id (used by timer::sleep).
780 // Save and restore so that a nested flush (sync scheduler)
781 // doesn't leave the outer task without its id afterward.
782 let prev_polling = CURRENT_POLLING_TASK.with(|c| c.replace(Some(tid)));
783
784 // Task isolation + timing.
785 state.total_poll_count = state.total_poll_count.wrapping_add(1);
786 let t0 = auralis_signal::now_us();
787 let result: Result<Poll<()>, Box<dyn std::any::Any + Send>> =
788 std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
789 state.future.as_mut().poll(&mut cx)
790 }));
791 let elapsed = auralis_signal::now_us().saturating_sub(t0);
792
793 CURRENT_POLLING_TASK.with(|c| c.set(prev_polling));
794 crate::scope::set_scope_direct(prev_scope);
795
796 // Extract timer_deadline before state is dropped, so
797 // we can clean up the timer entry (free_slot can't
798 // read it because the slot is already None).
799 let timer_dl = state.timer_deadline;
800
801 state.last_poll_duration_us = elapsed;
802 match result {
803 Ok(Poll::Ready(())) => {
804 if timer_dl != 0 {
805 ex.borrow_mut().cleanup_timer(tid, timer_dl);
806 }
807 ex.borrow_mut().free_slot(tid);
808 }
809 Err(payload) => {
810 if timer_dl != 0 {
811 ex.borrow_mut().cleanup_timer(tid, timer_dl);
812 }
813 let hook = ex.borrow().panic_hook.clone();
814 if let Some(h) = hook {
815 h(PanicInfo {
816 task_id: tid,
817 scope_id,
818 payload,
819 });
820 }
821 ex.borrow_mut().free_slot(tid);
822 }
823 Ok(Poll::Pending) => {
824 let mut e = ex.borrow_mut();
825 if e.tasks[tid as usize].is_none() {
826 e.tasks[tid as usize] = Some(state);
827 }
828 }
829 }
830 }
831
832 // Time budget check.
833 {
834 let elapsed = ex.borrow().now_ms().saturating_sub(poll_start);
835 if elapsed >= ex.borrow().time_budget_ms {
836 let (maybe_sched, ex_clone) = {
837 let mut e = ex.borrow_mut();
838 e.is_flush_scheduled = false;
839 e.in_flush = false;
840 let sched = if !e.high_queue.is_empty() || !e.low_queue.is_empty() {
841 e.try_schedule_flush()
842 } else {
843 None
844 };
845 (sched, Rc::clone(ex))
846 };
847 if let Some(sched) = maybe_sched {
848 sched.schedule(Box::new(move || Self::flush_instance(&ex_clone)));
849 }
850 break;
851 }
852 }
853 }
854
855 // Drain any wakes that were buffered while the executor RefCell
856 // was borrowed (PENDING_WAKES fallback in TaskWaker::wake).
857 drain_pending_wakes();
858
859 // Continue only if signal callbacks accumulated during
860 // polling and there are tasks to wake.
861 if ex.borrow().deferred_callbacks.is_empty() {
862 break;
863 }
864 } // end passes loop
865 }
866}
867
868// ---------------------------------------------------------------------------
869// Current-executor storage — injectable, defaults to thread-local
870// ---------------------------------------------------------------------------
871
872pub(crate) type ExecutorRef = Rc<RefCell<Executor>>;
873
874/// RAII guard that restores the previous executor when dropped.
875struct RestoreExecutor(Option<ExecutorRef>);
876
877impl Drop for RestoreExecutor {
878 fn drop(&mut self) {
879 CURRENT_EXECUTOR.with(|c| {
880 *c.borrow_mut() = self.0.take();
881 });
882 }
883}
884
885thread_local! {
886 static CURRENT_EXECUTOR: RefCell<Option<ExecutorRef>> = const { RefCell::new(None) };
887}
888
889/// Run `f` with `ex` set as the current executor.
890///
891/// Signal callbacks and `spawn_global` calls inside `f` will be routed
892/// to `ex` instead of the global thread-local executor. Restores the
893/// previous executor afterward.
894///
895/// # Signal routing constraints
896///
897/// Auralis uses a **single global schedule hook** (installed once by the
898/// first call to [`init_flush_scheduler`]) that decides where signal
899/// notifications land by checking the current executor **at the time the
900/// notification fires**, not at the time `Signal::set` is called.
901///
902/// This design implies two hard requirements for multi-instance users:
903///
904/// 1. **`init_flush_scheduler` must be called at least once** — without
905/// it, `Signal::set` falls back to synchronous callback execution,
906/// which breaks the deferred-notification model and can cause
907/// re-entrant borrow panics.
908/// 2. **The instance executor must still be "current" when the flush
909/// runs** — if `with_executor` has already exited, deferred callbacks
910/// from signals set inside `f` will be routed to the global executor
911/// (or synchronously if no global hook is installed).
912///
913/// For the typical single-threaded case (Wasm, game loop, CLI), both
914/// requirements are satisfied trivially: call `init_flush_scheduler`
915/// once at startup and never use `with_executor`. For SSR / multi-tenant
916/// servers, ensure that `with_executor` wraps the entire request
917/// lifecycle — from signal creation through the final flush.
918///
919/// # Example
920///
921/// ```rust,ignore
922/// use auralis_task::Executor;
923///
924/// let ex = Executor::new_instance();
925/// Executor::install_flush_scheduler(&ex, my_scheduler);
926/// auralis_task::with_executor(&ex, || {
927/// // Signal notifications and task spawns here go to `ex`.
928/// });
929/// ```
930pub fn with_executor<R>(ex: &ExecutorRef, f: impl FnOnce() -> R) -> R {
931 CURRENT_EXECUTOR.with(|exec| {
932 let prev = exec.borrow_mut().replace(Rc::clone(ex));
933 let result = f();
934 *exec.borrow_mut() = prev;
935 result
936 })
937}
938
939/// Return the current executor, if any.
940///
941/// If no executor has been set via [`with_executor`], returns `None` —
942/// callers should fall back to the global thread-local executor.
943fn current_executor() -> Option<ExecutorRef> {
944 CURRENT_EXECUTOR.with(|exec| exec.borrow().clone())
945}
946
947/// Return the currently active executor instance.
948///
949/// If [`with_executor`] was used to set an instance executor, returns
950/// that; otherwise returns the global thread-local executor.
951pub(crate) fn current_executor_instance() -> ExecutorRef {
952 current_executor().unwrap_or_else(|| EXECUTOR.with(Rc::clone))
953}
954
955/// Return the current time in milliseconds from the active executor's
956/// [`TimeSource`], or 0 if none is installed.
957pub(crate) fn current_time_ms() -> u64 {
958 current_executor_instance().borrow().now_ms()
959}
960
961// ---------------------------------------------------------------------------
962// Helpers — use thread_local EXECUTOR
963// ---------------------------------------------------------------------------
964
965/// Drain wakes that were buffered into [`PENDING_WAKES`] because the
966/// executor's `RefCell` was borrowed at the time [`TaskWaker::wake`]
967/// fired. Called at the end of every [`Executor::flush_instance`].
968fn drain_pending_wakes() {
969 PENDING_WAKES.with(|pw| {
970 let wakes = std::mem::take(&mut *pw.borrow_mut());
971 for (tid, slot_id, gen) in wakes {
972 let Some(exec) = lookup_executor(slot_id, gen) else {
973 continue;
974 };
975 // Use enqueue() for the stale-task-id safety check.
976 exec.borrow_mut().enqueue(tid);
977 let maybe_sched = exec.borrow_mut().try_schedule_flush();
978 if let Some(sched) = maybe_sched {
979 let sid = slot_id;
980 let g = gen;
981 sched.schedule(Box::new(move || {
982 if let Some(ex) = lookup_executor(sid, g) {
983 Executor::flush_instance(&ex);
984 }
985 }));
986 }
987 }
988 });
989}
990
991// ---------------------------------------------------------------------------
992// Flush
993// ---------------------------------------------------------------------------
994
995fn flush() {
996 EXECUTOR.with(Executor::flush_instance);
997}
998
999// ---------------------------------------------------------------------------
1000// Public API
1001// ---------------------------------------------------------------------------
1002
1003/// Check the deferred callback limit before pushing.
1004fn check_callback_limit(ex: &Executor) {
1005 if let Some(limit) = ex.max_deferred_callbacks {
1006 assert!(
1007 ex.deferred_callbacks.len() < limit,
1008 "deferred callback limit exceeded ({limit}). \
1009 This usually indicates an unbounded signal-set loop. \
1010 Increase the limit via set_max_deferred_callbacks() \
1011 or disable it with None."
1012 );
1013 }
1014}
1015
1016/// Set the platform flush scheduler and install the signal deferred-
1017/// callback hook.
1018///
1019/// Idempotent — subsequent calls are no-ops (the hook is installed via
1020/// [`std::sync::OnceLock`], so it fires exactly once per process).
1021///
1022/// # Threading constraint
1023///
1024/// The hook is **per-process** and routes signal notifications to the
1025/// executor that is "current" when the notification fires (see
1026/// [`with_executor`]). For single-threaded use (Wasm, CLI) this is
1027/// transparent. For multi-threaded SSR, enable the `ssr-tokio` feature
1028/// and call [`init_scope_store_tokio`](crate::init_scope_store_tokio).
1029/// See [`with_executor`] for the full routing contract.
1030pub fn init_flush_scheduler(sched: Rc<dyn ScheduleFlush>) {
1031 EXECUTOR.with(|exec| exec.borrow_mut().flush_scheduler = Some(sched));
1032 install_signal_hook_once();
1033}
1034
1035/// Install the hook that bridges `auralis_signal::Signal::set` to the
1036/// executor's deferred-callback queue.
1037///
1038/// Idempotent — safe to call multiple times.
1039fn install_signal_hook_once() {
1040 use std::sync::OnceLock;
1041 static INSTALLED: OnceLock<()> = OnceLock::new();
1042 INSTALLED.get_or_init(|| {
1043 auralis_signal::install_schedule_hook(Box::new(|cb: Box<dyn FnOnce()>| {
1044 // Prefer the current executor (set via `with_executor`) for
1045 // SSR multi-request isolation; fall back to the global one.
1046 if let Some(ex) = current_executor() {
1047 let maybe_sched = {
1048 let mut e = ex.borrow_mut();
1049 check_callback_limit(&e);
1050 e.deferred_callbacks.push(cb);
1051 if e.in_flush {
1052 None
1053 } else {
1054 e.try_schedule_flush()
1055 }
1056 };
1057 if let Some(sched) = maybe_sched {
1058 let ex2 = Rc::clone(&ex);
1059 sched.schedule(Box::new(move || Executor::flush_instance(&ex2)));
1060 }
1061 } else {
1062 EXECUTOR.with(|exec| {
1063 let maybe_sched = {
1064 let mut ex = exec.borrow_mut();
1065 check_callback_limit(&ex);
1066 ex.deferred_callbacks.push(cb);
1067 if ex.in_flush {
1068 None
1069 } else {
1070 ex.try_schedule_flush()
1071 }
1072 };
1073 if let Some(sched) = maybe_sched {
1074 sched.schedule(Box::new(flush));
1075 }
1076 });
1077 }
1078 }));
1079 });
1080}
1081
1082/// Set the platform time source used for time-budget accounting.
1083///
1084/// If no [`TimeSource`] is registered the executor runs every flush to
1085/// completion without yielding, which is acceptable for short-running
1086/// workloads but may cause frame drops in the browser.
1087pub fn init_time_source(ts: Rc<dyn TimeSource>) {
1088 EXECUTOR.with(|exec| exec.borrow_mut().time_source = Some(ts));
1089}
1090
1091/// Set the per-flush time budget on the **global** thread-local executor.
1092///
1093/// This does **not** affect instance executors created via
1094/// [`Executor::new_instance`] — those carry their own budget (default
1095/// 8 ms) and must be configured via [`Executor::set_time_budget`].
1096///
1097/// See [`Executor::set_time_budget`] for the full semantics.
1098pub fn set_global_time_budget(budget_ms: u64) {
1099 EXECUTOR.with(|exec| exec.borrow_mut().time_budget_ms = budget_ms);
1100}
1101
1102/// Set the deferred callback safety cap on the global executor.
1103///
1104/// See [`Executor::set_max_deferred_callbacks`] for details.
1105pub fn set_global_max_deferred_callbacks(limit: Option<usize>) {
1106 EXECUTOR.with(|exec| exec.borrow_mut().max_deferred_callbacks = limit);
1107}
1108
1109/// Register a global panic hook called when any globally-spawned
1110/// task panics.
1111///
1112/// See [`Executor::set_panic_hook`] for details.
1113pub fn set_panic_hook(hook: Rc<dyn Fn(PanicInfo)>) {
1114 EXECUTOR.with(|exec| exec.borrow_mut().panic_hook = Some(hook));
1115}
1116
1117/// Remove the global panic hook, restoring the default silent
1118/// behaviour.
1119pub fn remove_panic_hook() {
1120 EXECUTOR.with(|exec| exec.borrow_mut().panic_hook = None);
1121}
1122
1123/// Spawn a future on the global executor at low priority.
1124///
1125/// **Important:** [`init_flush_scheduler`] must be called before spawning
1126/// any tasks. Without a flush scheduler, spawned tasks will sit in the
1127/// queue indefinitely because the executor has no way to schedule a flush
1128/// cycle.
1129pub fn spawn_global(future: impl Future<Output = ()> + 'static) {
1130 spawn_global_with_priority(Priority::Low, future);
1131}
1132
1133/// Spawn a future on the global executor at the given priority.
1134pub fn spawn_global_with_priority(priority: Priority, future: impl Future<Output = ()> + 'static) {
1135 spawn_inner_on(&EXECUTOR.with(Rc::clone), Box::pin(future), priority, 0);
1136}
1137
1138/// Spawn a future on a specific executor and scope.
1139pub(crate) fn spawn_scoped_on(
1140 ex: &Rc<RefCell<Executor>>,
1141 priority: Priority,
1142 scope_id: u64,
1143 future: impl Future<Output = ()> + 'static,
1144) -> TaskId {
1145 spawn_inner_on(ex, Box::pin(future), priority, scope_id)
1146}
1147
1148fn spawn_inner_on(
1149 ex: &Rc<RefCell<Executor>>,
1150 future: Pin<Box<dyn Future<Output = ()> + 'static>>,
1151 priority: Priority,
1152 scope_id: u64,
1153) -> TaskId {
1154 let (task_id, maybe_sched) = {
1155 let mut e = ex.borrow_mut();
1156 let task_id = e.allocate_id();
1157 e.tasks[task_id as usize] = Some(TaskState {
1158 future,
1159 priority,
1160 scope_id,
1161 timer_deadline: 0,
1162 total_poll_count: 0,
1163 last_poll_duration_us: 0,
1164 });
1165 e.enqueue(task_id);
1166 let sched = e.try_schedule_flush();
1167 (task_id, sched)
1168 };
1169 if let Some(sched) = maybe_sched {
1170 let ex2 = Rc::clone(ex);
1171 sched.schedule(Box::new(move || Executor::flush_instance(&ex2)));
1172 }
1173 task_id
1174}
1175
1176/// Enqueue all tasks belonging to `scope_id` on a given executor.
1177///
1178/// Used by [`TaskScope::resume`] to restart tasks after a suspend.
1179pub(crate) fn enqueue_scope_tasks_on(ex: &ExecutorRef, task_ids: &[TaskId]) {
1180 if task_ids.is_empty() {
1181 return;
1182 }
1183 let maybe_sched = {
1184 let mut e = ex.borrow_mut();
1185 for tid in task_ids {
1186 e.enqueue(*tid);
1187 }
1188 if e.in_flush {
1189 None
1190 } else {
1191 e.try_schedule_flush()
1192 }
1193 };
1194 if let Some(sched) = maybe_sched {
1195 let ex2 = Rc::clone(ex);
1196 sched.schedule(Box::new(move || Executor::flush_instance(&ex2)));
1197 }
1198}
1199/// Cancel all tasks belonging to `scope_id` on a specific executor.
1200pub(crate) fn cancel_scope_tasks_on(
1201 ex: &Rc<RefCell<Executor>>,
1202 task_ids: &[TaskId],
1203) -> Vec<Pin<Box<dyn Future<Output = ()>>>> {
1204 if task_ids.is_empty() {
1205 return Vec::new();
1206 }
1207
1208 let mut e = ex.borrow_mut();
1209 let mut dropped = Vec::with_capacity(task_ids.len());
1210
1211 // Collect timer deadlines before mutating.
1212 let mut timer_deadlines: Vec<(u64, TaskId)> = Vec::new();
1213 for &tid in task_ids {
1214 let idx = tid as usize;
1215 if idx < e.tasks.len() {
1216 if let Some(ref t) = e.tasks[idx] {
1217 if t.timer_deadline != 0 {
1218 timer_deadlines.push((t.timer_deadline, tid));
1219 }
1220 }
1221 }
1222 }
1223 for (dl, tid) in &timer_deadlines {
1224 e.cleanup_timer(*tid, *dl);
1225 }
1226
1227 // Cancel each task by id (direct lookup, no full-table scan).
1228 // Only push to free_slots for slots we actually took.
1229 for &tid in task_ids {
1230 let idx = tid as usize;
1231 if idx < e.tasks.len() {
1232 if let Some(state) = e.tasks[idx].take() {
1233 dropped.push(state.future);
1234 e.free_slots.push(tid);
1235 }
1236 }
1237 }
1238 e.free_slots.sort_unstable();
1239 e.free_slots.dedup();
1240
1241 // Filter queues to remove cancelled tasks.
1242 let high: Vec<TaskId> = e
1243 .high_queue
1244 .iter()
1245 .copied()
1246 .filter(|&id| {
1247 let idx = id as usize;
1248 idx < e.tasks.len() && e.tasks[idx].is_some()
1249 })
1250 .collect();
1251 e.high_queue.clear();
1252 e.high_queue.extend(high);
1253
1254 let low: Vec<TaskId> = e
1255 .low_queue
1256 .iter()
1257 .copied()
1258 .filter(|&id| {
1259 let idx = id as usize;
1260 idx < e.tasks.len() && e.tasks[idx].is_some()
1261 })
1262 .collect();
1263 e.low_queue.clear();
1264 e.low_queue.extend(low);
1265
1266 dropped
1267}
1268
1269/// Cancel a single task by its id, dropping its future and cleaning up
1270/// its timer if any. No-op if the task has already completed.
1271pub(crate) fn cancel_task(ex: &Rc<RefCell<Executor>>, task_id: TaskId) {
1272 let mut e = ex.borrow_mut();
1273 let idx = task_id as usize;
1274 if idx >= e.tasks.len() {
1275 return;
1276 }
1277 let deadline = e.tasks[idx].as_ref().map_or(0, |t| t.timer_deadline);
1278 if deadline != 0 {
1279 e.cleanup_timer(task_id, deadline);
1280 }
1281 let slot = e.tasks[idx].take();
1282 if slot.is_some() {
1283 e.free_slots.push(task_id);
1284 e.high_queue.retain(|&id| id != task_id);
1285 e.low_queue.retain(|&id| id != task_id);
1286 }
1287}
1288
1289/// Check whether a task slot is empty (task completed or was cancelled).
1290pub(crate) fn is_task_finished(ex: &Rc<RefCell<Executor>>, task_id: TaskId) -> bool {
1291 let e = ex.borrow();
1292 let idx = task_id as usize;
1293 idx >= e.tasks.len() || e.tasks[idx].is_none()
1294}
1295
1296// ---------------------------------------------------------------------------
1297// yield_now
1298// ---------------------------------------------------------------------------
1299
1300/// Return a [`Future`] that yields control back to the executor once.
1301#[must_use = "yield_now() does nothing unless awaited"]
1302pub fn yield_now() -> YieldNow {
1303 YieldNow { yielded: false }
1304}
1305
1306/// Future returned by [`yield_now`].
1307#[derive(Debug)]
1308#[must_use = "futures do nothing unless polled"]
1309pub struct YieldNow {
1310 yielded: bool,
1311}
1312
1313impl Future for YieldNow {
1314 type Output = ();
1315
1316 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1317 if self.yielded {
1318 Poll::Ready(())
1319 } else {
1320 self.yielded = true;
1321 cx.waker().wake_by_ref();
1322 Poll::Pending
1323 }
1324 }
1325}
1326
1327// ---------------------------------------------------------------------------
1328// schedule_callback — hook for auralis-signal's deferred callback model
1329// ---------------------------------------------------------------------------
1330
1331/// Schedule a closure to run at the start of the next executor flush.
1332///
1333/// Used internally by `auralis_signal` to defer subscriber callback
1334/// execution. The closure is drained before the main poll loop.
1335///
1336/// Routes to the current executor (via [`with_executor`]) when one is
1337/// active; falls back to the global thread-local executor.
1338pub fn schedule_callback(f: Box<dyn FnOnce()>) {
1339 let exec = current_executor_instance();
1340 let maybe_sched = {
1341 let mut ex = exec.borrow_mut();
1342 check_callback_limit(&ex);
1343 ex.deferred_callbacks.push(f);
1344 if ex.in_flush {
1345 None
1346 } else {
1347 ex.try_schedule_flush()
1348 }
1349 };
1350 if let Some(sched) = maybe_sched {
1351 let ex2 = Rc::clone(&exec);
1352 sched.schedule(Box::new(move || Executor::flush_instance(&ex2)));
1353 }
1354}
1355
1356// ---------------------------------------------------------------------------
1357// set_deferred
1358// ---------------------------------------------------------------------------
1359
1360/// Schedule a [`Signal::set`] call for the **next** executor flush.
1361///
1362/// Safe to call from inside [`Drop`] — the actual `signal.set(value)` is
1363/// deferred to a subsequent flush, avoiding re-entrant borrow panics.
1364///
1365/// Routes to the current executor (via [`with_executor`]) when one is
1366/// active; falls back to the global thread-local executor.
1367pub fn set_deferred<T: 'static>(signal: &Signal<T>, value: T) {
1368 let signal = signal.clone();
1369 let exec = current_executor_instance();
1370 let maybe_sched = {
1371 let mut ex = exec.borrow_mut();
1372 ex.deferred_ops.push(DeferredOp {
1373 f: Box::new(move || signal.set(value)),
1374 });
1375 ex.try_schedule_flush()
1376 };
1377 if let Some(sched) = maybe_sched {
1378 let ex2 = Rc::clone(&exec);
1379 sched.schedule(Box::new(move || Executor::flush_instance(&ex2)));
1380 }
1381}
1382
1383// ---------------------------------------------------------------------------
1384// Test / debug helpers
1385// ---------------------------------------------------------------------------
1386
1387/// Completely reset the global executor to a pristine state.
1388///
1389/// Clears all task slots, queues, deferred ops, flush/scheduler flags,
1390/// and injected [`ScheduleFlush`]/[`TimeSource`]. Call at the start
1391/// of every test to prevent cross-test state leakage.
1392///
1393/// Note that the signal schedule hook (installed by
1394/// [`init_flush_scheduler`] via [`std::sync::OnceLock`]) **persists**
1395/// across resets — the hook references the global [`EXECUTOR`]
1396/// thread-local, and this function re-initialises that same executor
1397/// in place rather than replacing it. This is correct behaviour:
1398/// after reset, signal notifications route to the freshly-cleared
1399/// global executor.
1400///
1401/// # Safety / usage
1402///
1403/// This function is intended **only** for testing. Calling it while
1404/// the executor is processing tasks will silently drop all live
1405/// futures and may cause panics or undefined behavior in running
1406/// application code.
1407pub fn reset_executor_for_test() {
1408 PENDING_WAKES.with(|pw| pw.borrow_mut().clear());
1409 SLOTS.with(|s| s.borrow_mut().clear());
1410 CURRENT_EXECUTOR.with(|c| *c.borrow_mut() = None);
1411 EXECUTOR.with(|exec| {
1412 let mut ex = exec.borrow_mut();
1413 ex.high_queue.clear();
1414 ex.low_queue.clear();
1415 ex.tasks.clear();
1416 ex.free_slots.clear();
1417 ex.next_task_id = 0;
1418 ex.is_flush_scheduled = false;
1419 ex.in_flush = false;
1420 ex.deferred_ops.clear();
1421 ex.deferred_callbacks.clear();
1422 ex.flush_scheduler = None;
1423 ex.time_source = None;
1424 ex.slot_id = 0;
1425 ex.generation = 0;
1426 ex.registered = false;
1427 });
1428 crate::scope::clear_scope_registry();
1429}
1430
1431#[cfg(any(test, feature = "debug"))]
1432pub(crate) fn debug_task_count() -> usize {
1433 EXECUTOR.with(|exec| exec.borrow().tasks.iter().filter(|t| t.is_some()).count())
1434}
1435
1436/// Return timing info for all active tasks: `task_id` → (`total_poll_count`, `last_poll_us`).
1437#[cfg(feature = "debug")]
1438pub(crate) fn debug_task_timing() -> std::collections::HashMap<TaskId, (u64, u64)> {
1439 EXECUTOR.with(|exec| {
1440 let ex = exec.borrow();
1441 let mut map = std::collections::HashMap::new();
1442 for (idx, slot) in ex.tasks.iter().enumerate() {
1443 if let Some(ref t) = slot {
1444 map.insert(idx as u64, (t.total_poll_count, t.last_poll_duration_us));
1445 }
1446 }
1447 map
1448 })
1449}
1450
1451/// Return a snapshot of all active tasks: `(task_id, priority, scope_id)`.
1452#[cfg(feature = "debug")]
1453pub(crate) fn debug_task_snapshot() -> Vec<(TaskId, Priority, u64)> {
1454 EXECUTOR.with(|exec| {
1455 let ex = exec.borrow();
1456 let mut snap = Vec::new();
1457 for (idx, slot) in ex.tasks.iter().enumerate() {
1458 if let Some(ref t) = slot {
1459 snap.push((idx as u64, t.priority, t.scope_id));
1460 }
1461 }
1462 snap
1463 })
1464}
1465
1466/// Return the set of task IDs currently in the ready queues.
1467#[cfg(feature = "debug")]
1468pub(crate) fn debug_queued_task_ids() -> Vec<TaskId> {
1469 EXECUTOR.with(|exec| {
1470 let ex = exec.borrow();
1471 let mut ids: Vec<TaskId> = ex
1472 .high_queue
1473 .iter()
1474 .chain(ex.low_queue.iter())
1475 .copied()
1476 .collect();
1477 ids.sort_unstable();
1478 ids.dedup();
1479 ids
1480 })
1481}
1482
1483/// Spawn a task without triggering an automatic flush.
1484/// Used in tests to batch multiple spawns before executing them.
1485#[cfg(test)]
1486pub(crate) fn spawn_no_auto_flush(
1487 priority: Priority,
1488 future: impl Future<Output = ()> + 'static,
1489) -> TaskId {
1490 EXECUTOR.with(|exec| {
1491 let mut ex = exec.borrow_mut();
1492 let task_id = ex.allocate_id();
1493 ex.tasks[task_id as usize] = Some(TaskState {
1494 future: Box::pin(future),
1495 priority,
1496 scope_id: 0,
1497 timer_deadline: 0,
1498 total_poll_count: 0,
1499 last_poll_duration_us: 0,
1500 });
1501 ex.enqueue(task_id);
1502 // Do NOT schedule flush.
1503 task_id
1504 })
1505}
1506
1507/// Run a manual flush cycle (for tests that need to control timing).
1508#[cfg(test)]
1509pub(crate) fn flush_all() {
1510 flush();
1511}