Skip to main content

auralis_task/
executor.rs

1//! Single-threaded executor with priority scheduling, time-budget
2//! awareness, and deferred-signal support.
3//!
4//! ## Architecture
5//!
6//! The executor is stored via a pluggable [`ExecutorStorage`] strategy
7//! (defaulting to a per-thread slot).  Before polling a task the future
8//! is **temporarily removed** so that the poll never holds an executor
9//! borrow — this allows nested spawns, wakes, and `set_deferred` calls
10//! without `RefCell` panics.
11//!
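//! The waker carries only plain identifiers (task id, priority, executor
//! slot index, and slot generation) rather than an `Rc` to the executor,
//! which keeps it [`Send`] + [`Sync`] as [`Waker::from`] requires.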
14
15#![allow(clippy::cast_possible_truncation)]
16
17use std::cell::{Cell, RefCell};
18use std::collections::{BTreeMap, VecDeque};
19use std::future::Future;
20use std::pin::Pin;
21use std::rc::{Rc, Weak};
22use std::sync::Arc;
23use std::task::{Context, Poll, Wake, Waker};
24
25use auralis_signal::Signal;
26
27use crate::Priority;
28
29// ---------------------------------------------------------------------------
30// Types
31// ---------------------------------------------------------------------------
32
/// Identifier of a spawned task; doubles as the index of the task's entry
/// in the executor's task table.
type TaskId = u64;
34
35// ---------------------------------------------------------------------------
36// ScheduleFlush
37// ---------------------------------------------------------------------------
38
/// Hook through which the executor asks the host platform to run a
/// callback later.
pub trait ScheduleFlush {
    /// Arrange for `callback` to be invoked at the next microtask boundary.
    fn schedule(&self, callback: Box<dyn FnOnce()>);
}

/// A [`ScheduleFlush`] that invokes the callback immediately, before
/// `schedule` returns.
///
/// This lets unit tests drive the executor to completion without a
/// browser event loop.
#[cfg(test)]
pub struct TestScheduleFlush;

#[cfg(test)]
impl ScheduleFlush for TestScheduleFlush {
    fn schedule(&self, job: Box<dyn FnOnce()>) {
        // No queueing: run the job synchronously on the caller's stack.
        job();
    }
}
58
59// ---------------------------------------------------------------------------
60// TimeSource
61// ---------------------------------------------------------------------------
62
/// High-resolution time source for the executor's time-budget
/// accounting.
///
/// When registered via [`init_time_source`], the executor reads this
/// clock during a flush to decide whether it should yield control back
/// to the host event loop (default budget: 8 ms).
///
/// In Wasm environments the implementation typically delegates to
/// `performance.now()`.  If no [`TimeSource`] is registered the executor
/// treats the current time as 0, so the time-budget check is a no-op
/// and tasks run until the queues are drained.
pub trait TimeSource {
    /// Return the current time in milliseconds.
    fn now_ms(&self) -> u64;
}

/// A [`TimeSource`] whose value is explicitly controlled by the test.
///
/// Use [`set`](TestTimeSource::set) or [`advance`](TestTimeSource::advance)
/// to simulate the passage of time during a flush cycle.
#[cfg(test)]
pub struct TestTimeSource {
    /// Current simulated time in milliseconds.
    now: std::cell::Cell<u64>,
}

#[cfg(test)]
impl TestTimeSource {
    /// Create a new [`TestTimeSource`] with the given initial time.
    #[must_use]
    pub fn new(initial_ms: u64) -> Self {
        Self {
            now: std::cell::Cell::new(initial_ms),
        }
    }

    /// Set the current time to `ms` milliseconds.
    pub fn set(&self, ms: u64) {
        self.now.set(ms);
    }

    /// Advance the current time by `ms` milliseconds.
    ///
    /// The clock saturates at `u64::MAX` instead of overflowing, so a
    /// test that jumps "far into the future" neither panics (debug
    /// builds) nor wraps back to a small value (release builds).
    pub fn advance(&self, ms: u64) {
        self.now.set(self.now.get().saturating_add(ms));
    }
}

#[cfg(test)]
impl TimeSource for TestTimeSource {
    fn now_ms(&self) -> u64 {
        self.now.get()
    }
}
115
116// ---------------------------------------------------------------------------
117// TaskWaker — routes wakes to the correct executor via a slot table.
118//
119// Waker::from requires Send + Sync + 'static, so the waker cannot hold
120// an Rc<RefCell<Executor>>.  Instead it stores a slot index + generation
121// number.  The SLOTS thread_local maps (index, generation) → Weak<Executor>.
122// On wake, the generation is validated before the weak pointer is upgraded.
123// Dead slots are reclaimed when new executors are registered.
124// ---------------------------------------------------------------------------
125
126/// A registered executor slot.  The `generation` counter distinguishes
127/// between successive executors that occupy the same slot index (e.g.
128/// after the previous one was dropped and a new one recycles the slot).
129struct Slot {
130    weak: Weak<RefCell<Executor>>,
131    /// Incremented (wrapping) every time this slot is reused.
132    /// A [`TaskWaker`] must present the generation it was created with;
133    /// a mismatch means the waker is stale and is silently ignored.
134    generation: u64,
135}
136
137thread_local! {
138    /// Slot 0 is reserved for the global executor.  Instance executors
139    /// occupy subsequent slots.  Dead slots (Weak::upgrade returns None)
140    /// are recycled in [`register_executor`].
141    static SLOTS: RefCell<Vec<Slot>> = const { RefCell::new(Vec::new()) };
142}
143
144/// Register an executor in the slot table, returning the assigned
145/// (`slot_id`, `generation`) pair.  Dead slots are recycled in-place;
146/// if no dead slot is found a new entry is appended.
147fn register_executor(weak: Weak<RefCell<Executor>>) -> (u64, u64) {
148    SLOTS.with(|slots| {
149        let mut slots = slots.borrow_mut();
150        for (i, slot) in slots.iter_mut().enumerate() {
151            if slot.weak.upgrade().is_none() {
152                slot.weak = weak;
153                // Wrapping is safe: 2^64 reuses of a single slot
154                // would take ~10^14 years at 1 reuse/μs.
155                slot.generation = slot.generation.wrapping_add(1);
156                return (i as u64, slot.generation);
157            }
158        }
159        let gen = 0;
160        slots.push(Slot {
161            weak,
162            generation: gen,
163        });
164        ((slots.len() - 1) as u64, gen)
165    })
166}
167
168/// Look up an executor by slot id, validating the generation.
169fn lookup_executor(slot_id: u64, generation: u64) -> Option<Rc<RefCell<Executor>>> {
170    SLOTS.with(|slots| {
171        let slots = slots.borrow();
172        let slot = slots.get(slot_id as usize)?;
173        if slot.generation != generation {
174            return None;
175        }
176        slot.weak.upgrade()
177    })
178}
179
180struct TaskWaker {
181    task_id: TaskId,
182    priority: Priority,
183    slot_id: u64,
184    generation: u64,
185}
186
187impl Wake for TaskWaker {
188    fn wake(self: Arc<Self>) {
189        let Some(exec) = lookup_executor(self.slot_id, self.generation) else {
190            return;
191        };
192        let maybe_sched = if let Ok(mut ex) = exec.try_borrow_mut() {
193            match self.priority {
194                Priority::High => ex.high_queue.push_back(self.task_id),
195                Priority::Low => ex.low_queue.push_back(self.task_id),
196            }
197            if ex.in_flush {
198                None
199            } else {
200                ex.try_schedule_flush()
201            }
202        } else {
203            PENDING_WAKES.with(|pw| {
204                pw.borrow_mut()
205                    .push((self.task_id, self.slot_id, self.generation));
206            });
207            None
208        };
209        if let Some(sched) = maybe_sched {
210            let sid = self.slot_id;
211            let gen = self.generation;
212            sched.schedule(Box::new(move || {
213                if let Some(ex) = lookup_executor(sid, gen) {
214                    Executor::flush_instance(&ex);
215                }
216            }));
217        }
218    }
219}
220
221// ---------------------------------------------------------------------------
222// TaskState
223// ---------------------------------------------------------------------------
224
225struct TaskState {
226    future: Pin<Box<dyn Future<Output = ()> + 'static>>,
227    priority: Priority,
228    scope_id: u64,
229    /// Key in [`Executor::timers`] for this task's pending sleep,
230    /// or 0 if the task is not waiting on a timer.
231    timer_deadline: u64,
232    /// Number of times this task has been polled.
233    total_poll_count: u64,
234    /// Microseconds spent in the most recent poll.
235    last_poll_duration_us: u64,
236}
237
238// ---------------------------------------------------------------------------
239// Executor
240// ---------------------------------------------------------------------------
241
/// Details of a task panic, handed to the hook registered with
/// [`set_panic_hook`].
#[derive(Debug)]
pub struct PanicInfo {
    /// Id the executor assigned to the panicking task.
    pub task_id: u64,
    /// Scope that owned the task (0 for global tasks).
    pub scope_id: u64,
    /// The panic payload, boxed as caught from the unwinding task.
    pub payload: Box<dyn std::any::Any + Send>,
}
253
254/// A single-threaded async task executor with priority queues.
255///
256/// Each [`Executor`] manages its own task slots, ready queues, and
257/// deferred callback buffers.  Use [`Executor::new_instance`] to create
258/// an isolated executor (e.g. per SSR request), or use the global
259/// thread-local executor via [`spawn_global`](crate::spawn_global).
260pub struct Executor {
261    high_queue: VecDeque<TaskId>,
262    low_queue: VecDeque<TaskId>,
263    tasks: Vec<Option<TaskState>>,
264    free_slots: Vec<TaskId>,
265    next_task_id: TaskId,
266    is_flush_scheduled: bool,
267    in_flush: bool,
268    deferred_ops: Vec<DeferredOp>,
269    /// Callbacks pushed by `Signal::set` via the schedule hook.
270    /// Drained at the start of every flush before polling tasks.
271    ///
272    /// Unbounded by design — in a single-threaded Wasm context, a tight
273    /// loop of signal sets will block the UI thread anyway, so adding a
274    /// capacity limit wouldn't improve the situation.  SSR / multi-tenant
275    /// users should ensure that application code doesn't produce
276    /// unbounded signal churn within a single request.
277    deferred_callbacks: Vec<Box<dyn FnOnce()>>,
278    flush_scheduler: Option<Rc<dyn ScheduleFlush>>,
279    time_source: Option<Rc<dyn TimeSource>>,
280    /// Maximum milliseconds to spend inside a single flush before
281    /// yielding back to the host event loop.  Default: 8 ms.
282    time_budget_ms: u64,
283    /// Optional cap on the number of deferred signal callbacks that can
284    /// accumulate between two flushes.  Exceeding this limit triggers a
285    /// panic — useful as a safety net in SSR / multi-tenant deployments
286    /// where a runaway signal loop could OOM the process.
287    ///
288    /// Default: `None` (no limit).
289    max_deferred_callbacks: Option<usize>,
290    /// Optional hook invoked when a spawned task panics.
291    panic_hook: Option<Rc<dyn Fn(PanicInfo)>>,
292    /// Timer queue: map from deadline (ms) to task ids that should be
293    /// woken when that deadline expires.  Processed at the start of
294    /// every flush.
295    timers: BTreeMap<u64, Vec<TaskId>>,
296    /// Slot index and generation in [`SLOTS`] for routing wakes back
297    /// to this executor.  Set by [`new_instance`] or lazily for the
298    /// global executor.
299    slot_id: u64,
300    generation: u64,
301    /// Whether this executor has been registered in [`SLOTS`].
302    registered: bool,
303}
304
305// Set by the executor before polling a task, cleared afterward.
306// Lets futures discover their task id without threading it through
307// layers of combinators.
308thread_local! {
309    static CURRENT_POLLING_TASK: Cell<Option<TaskId>> = const { Cell::new(None) };
310}
311
312pub(crate) fn with_current_polling_task<R>(f: impl FnOnce(Option<TaskId>) -> R) -> R {
313    CURRENT_POLLING_TASK.with(|c| f(c.get()))
314}
315
/// A one-shot operation queued on the executor to be run later.
struct DeferredOp {
    /// The operation itself; consumed when invoked.
    f: Box<dyn FnOnce()>,
}
319
320impl Executor {
321    fn new() -> Self {
322        Self {
323            high_queue: VecDeque::new(),
324            low_queue: VecDeque::new(),
325            tasks: Vec::new(),
326            free_slots: Vec::new(),
327            next_task_id: 0,
328            is_flush_scheduled: false,
329            in_flush: false,
330            deferred_ops: Vec::new(),
331            deferred_callbacks: Vec::new(),
332            flush_scheduler: None,
333            time_source: None,
334            time_budget_ms: 8,
335            max_deferred_callbacks: None,
336            panic_hook: None,
337            timers: BTreeMap::new(),
338            slot_id: 0,
339            generation: 0,
340            registered: false,
341        }
342    }
343
344    fn allocate_id(&mut self) -> TaskId {
345        if let Some(id) = self.free_slots.pop() {
346            return id;
347        }
348        let id = self.next_task_id;
349        self.next_task_id += 1;
350        self.tasks.push(None);
351        id
352    }
353
354    /// Release a task slot back to the free list.
355    ///
356    /// **Caller must ensure** that `task_id` has not already been freed
357    /// (e.g. via [`cancel_task`] or [`cancel_scope_tasks_on`]).  This
358    /// method unconditionally pushes to `free_slots` — pushing the same
359    /// id twice would cause [`allocate_id`] to hand it out twice.
360    fn free_slot(&mut self, task_id: TaskId) {
361        // Clean up any pending timer for this task so a recycled
362        // task ID is not spuriously woken by an old deadline.
363        // This works when the slot is still occupied (scope cancel
364        // path).  For normal completion (Poll::Ready), the slot is
365        // already None and the caller must call cleanup_timer first.
366        if let Some(Some(ref t)) = self.tasks.get(task_id as usize) {
367            if t.timer_deadline != 0 {
368                self.cleanup_timer(task_id, t.timer_deadline);
369            }
370        }
371        self.tasks[task_id as usize] = None;
372        self.free_slots.push(task_id);
373    }
374
375    /// Remove a timer entry for `task_id` from the timer map.
376    fn cleanup_timer(&mut self, task_id: TaskId, deadline: u64) {
377        if let Some(tids) = self.timers.get_mut(&deadline) {
378            tids.retain(|id| *id != task_id);
379            if tids.is_empty() {
380                self.timers.remove(&deadline);
381            }
382        }
383    }
384
385    fn enqueue(&mut self, task_id: TaskId) {
386        let priority = match self.tasks.get(task_id as usize).and_then(Option::as_ref) {
387            Some(t) => t.priority,
388            None => return,
389        };
390        match priority {
391            Priority::High => self.high_queue.push_back(task_id),
392            Priority::Low => self.low_queue.push_back(task_id),
393        }
394    }
395
396    fn dequeue(&mut self) -> Option<TaskId> {
397        self.high_queue
398            .pop_front()
399            .or_else(|| self.low_queue.pop_front())
400    }
401
402    /// Mark that a flush is needed and return the scheduler if one is
403    /// registered.  The caller **must** invoke the scheduler **after**
404    /// releasing the executor borrow.
405    fn try_schedule_flush(&mut self) -> Option<Rc<dyn ScheduleFlush>> {
406        if self.is_flush_scheduled {
407            return None;
408        }
409        self.is_flush_scheduled = true;
410        self.flush_scheduler.clone()
411    }
412
413    /// Return the current time in ms, or 0 if no [`TimeSource`] is
414    /// registered.  When this returns 0 the time-budget check is
415    /// effectively a no-op.
416    pub(crate) fn now_ms(&self) -> u64 {
417        self.time_source.as_ref().map_or(0, |ts| ts.now_ms())
418    }
419
420    /// Return the number of currently active (not-yet-completed) tasks.
421    ///
422    /// Used by streaming SSR to determine whether the stream should
423    /// wait for more work or terminate.
424    #[must_use]
425    pub fn active_task_count(&self) -> usize {
426        self.tasks.iter().filter(|t| t.is_some()).count()
427    }
428}
429
430// ---------------------------------------------------------------------------
431// Thread-local globals (default storage)
432// ---------------------------------------------------------------------------
433
434thread_local! {
435    static EXECUTOR: Rc<RefCell<Executor>> = Rc::new(RefCell::new(Executor::new()));
436    static PENDING_WAKES: RefCell<Vec<(TaskId, u64, u64)>> =
437        const { RefCell::new(Vec::new()) };
438}
439
440/// Ensure the global executor is registered in slot 0 (lazy, idempotent).
441/// Returns (`slot_id`, `generation`) for the global executor.
442fn ensure_global_registered() -> (u64, u64) {
443    SLOTS.with(|slots| {
444        let mut slots = slots.borrow_mut();
445        if slots.is_empty() {
446            let weak = EXECUTOR.with(Rc::downgrade);
447            slots.push(Slot {
448                weak,
449                generation: 0,
450            });
451        } else {
452            // Verify slot 0 still holds the global executor.
453            let global = EXECUTOR.with(Rc::clone);
454            let is_global = slots[0]
455                .weak
456                .upgrade()
457                .is_some_and(|ex| Rc::ptr_eq(&ex, &global));
458            if !is_global {
459                slots[0] = Slot {
460                    weak: Rc::downgrade(&global),
461                    generation: slots[0].generation.wrapping_add(1),
462                };
463            }
464        }
465        // Mark the global executor as registered so flush_instance
466        // doesn't call this function again on every flush.
467        EXECUTOR.with(|ex| {
468            let mut e = ex.borrow_mut();
469            e.slot_id = 0;
470            e.generation = slots[0].generation;
471            e.registered = true;
472        });
473        let gen = slots[0].generation;
474        (0, gen)
475    })
476}
477
478// ---------------------------------------------------------------------------
479// Executor instance methods (for isolated executors, e.g. SSR)
480// ---------------------------------------------------------------------------
481
482impl Executor {
483    /// Create a new isolated executor, wrapped for shared access.
484    ///
485    /// The returned executor is independent of the global thread-local
486    /// executor.  Use [`with_executor`] to make it the current executor
487    /// for the duration of a closure, so that spawned tasks and signal
488    /// callbacks are routed to it.
489    #[must_use]
490    pub fn new_instance() -> Rc<RefCell<Executor>> {
491        let ex = Rc::new(RefCell::new(Executor::new()));
492        // Register in the slot table so TaskWaker can find this executor.
493        let (slot_id, generation) = register_executor(Rc::downgrade(&ex));
494        {
495            let mut e = ex.borrow_mut();
496            e.slot_id = slot_id;
497            e.generation = generation;
498            e.registered = true;
499        }
500        ex
501    }
502
503    /// Install a flush scheduler on this executor instance.
504    pub fn install_flush_scheduler(ex: &Rc<RefCell<Executor>>, sched: Rc<dyn ScheduleFlush>) {
505        ex.borrow_mut().flush_scheduler = Some(sched);
506    }
507
508    /// Install a time source on this executor instance.
509    pub fn install_time_source(ex: &Rc<RefCell<Executor>>, ts: Rc<dyn TimeSource>) {
510        ex.borrow_mut().time_source = Some(ts);
511    }
512
513    /// Set the maximum time (in milliseconds) a single flush may spend
514    /// before yielding back to the host event loop.
515    ///
516    /// The default is 8 ms (~120 fps frame budget, leaving time for the
517    /// browser to render between flushes).  Set to `u64::MAX` to disable
518    /// time-budget yielding (flush runs to completion).
519    ///
520    /// # Semantics
521    ///
522    /// The budget is checked **between** task polls — the currently
523    /// executing task is never interrupted.  When the budget is exhausted
524    /// the executor sets `in_flush = false` and schedules a follow-up
525    /// flush so the remaining ready tasks will be polled on the next
526    /// microtask tick.  This is cooperative (`.await`-bound) yielding,
527    /// not preemptive.
528    ///
529    /// This affects **this executor only**.  For the global thread-local
530    /// executor use [`set_global_time_budget`].
531    pub fn set_time_budget(ex: &Rc<RefCell<Executor>>, budget_ms: u64) {
532        ex.borrow_mut().time_budget_ms = budget_ms;
533    }
534
535    /// Set a safety cap on the deferred signal callback queue.
536    ///
537    /// When set to `Some(n)`, the executor will panic if more than `n`
538    /// deferred callbacks accumulate between two flush cycles.  This is a
539    /// safety net for SSR / multi-tenant servers where a runaway signal
540    /// loop could exhaust memory — in a single-threaded Wasm context,
541    /// unbounded accumulation is acceptable because it blocks the UI
542    /// thread anyway.
543    ///
544    /// Default: `None` (no limit).
545    pub fn set_max_deferred_callbacks(ex: &Rc<RefCell<Executor>>, limit: Option<usize>) {
546        ex.borrow_mut().max_deferred_callbacks = limit;
547    }
548
549    /// Register a callback invoked whenever a spawned task panics.
550    ///
551    /// The default is no hook — panicking tasks are silently removed
552    /// from the executor (the same behaviour as a task returning
553    /// `Poll::Ready(())`).
554    ///
555    /// # Example
556    ///
557    /// ```rust,ignore
558    /// Executor::set_panic_hook(&ex, Rc::new(|info| {
559    ///     eprintln!("task {} in scope {} panicked", info.task_id, info.scope_id);
560    /// }));
561    /// ```
562    pub fn set_panic_hook(ex: &Rc<RefCell<Executor>>, hook: Rc<dyn Fn(PanicInfo)>) {
563        ex.borrow_mut().panic_hook = Some(hook);
564    }
565
566    /// Register a timer: when `now_ms() >= deadline_ms`, enqueue
567    /// `task_id` so it gets polled on the next flush.
568    pub(crate) fn schedule_timer(ex: &Rc<RefCell<Executor>>, deadline_ms: u64, task_id: TaskId) {
569        let mut e = ex.borrow_mut();
570        // If this task already has a pending timer (e.g. previous SleepFuture
571        // was dropped via select!), clean up the old entry so the timer map
572        // doesn't accumulate stale deadlines.
573        let old_deadline = e
574            .tasks
575            .get(task_id as usize)
576            .and_then(Option::as_ref)
577            .map_or(0, |t| t.timer_deadline);
578        if old_deadline != 0 {
579            e.cleanup_timer(task_id, old_deadline);
580        }
581        e.timers.entry(deadline_ms).or_default().push(task_id);
582        // Set the reverse index so cancel_scope_tasks can find this entry.
583        if let Some(Some(ref mut t)) = e.tasks.get_mut(task_id as usize) {
584            t.timer_deadline = deadline_ms;
585        }
586        // Request a flush so the timer is checked.
587        e.is_flush_scheduled = false;
588        let maybe_sched = e.try_schedule_flush();
589        drop(e);
590        if let Some(sched) = maybe_sched {
591            let ex2 = Rc::clone(ex);
592            sched.schedule(Box::new(move || Self::flush_instance(&ex2)));
593        }
594    }
595
596    /// Spawn a future on this executor instance.
597    pub fn spawn(ex: &Rc<RefCell<Executor>>, future: impl Future<Output = ()> + 'static) {
598        let maybe_sched = {
599            let mut e = ex.borrow_mut();
600            let tid = e.allocate_id();
601            e.tasks[tid as usize] = Some(TaskState {
602                future: Box::pin(future),
603                priority: Priority::Low,
604                scope_id: 0,
605                timer_deadline: 0,
606                total_poll_count: 0,
607                last_poll_duration_us: 0,
608            });
609            e.enqueue(tid);
610            e.try_schedule_flush()
611        };
612        if let Some(sched) = maybe_sched {
613            let ex2 = Rc::clone(ex);
614            sched.schedule(Box::new(move || Self::flush_instance(&ex2)));
615        }
616    }
617
618    /// Run a full flush cycle on this executor instance.
619    ///
620    /// Mirrors the global flush cycle but operates on an
621    /// isolated executor (used for SSR).  Includes all the same
622    /// protections: `catch_unwind`, suspend checks, time-budget
623    /// yielding, and callback-drain budget.
624    #[allow(clippy::too_many_lines)]
625    pub fn flush_instance(ex: &Rc<RefCell<Executor>>) {
626        // Guard against re-entrant flushes.
627        {
628            let mut e = ex.borrow_mut();
629            if e.in_flush {
630                #[cfg(debug_assertions)]
631                {
632                    eprintln!(
633                        "[auralis-task] WARNING: Executor::flush_instance called \
634                         re-entrantly (already inside a flush). This is a no-op. \
635                         Check for nested flush() calls in signal callbacks or \
636                         ScheduleFlush implementations."
637                    );
638                }
639                return;
640            }
641            e.in_flush = true;
642        }
643
644        // Set this executor as the current one so that TaskWaker
645        // (which cannot hold an Rc) can discover it via thread-local.
646        // Restore on scope exit (including early returns for time-budget
647        // yielding and re-entrancy).
648        let prev_executor = CURRENT_EXECUTOR.with(|c| c.borrow_mut().replace(Rc::clone(ex)));
649        let _restore = RestoreExecutor(prev_executor);
650
651        // Step 0: drain expired timers.
652        {
653            let mut e = ex.borrow_mut();
654            let now = e.now_ms();
655            // When no TimeSource is registered (now == 0), expire all
656            // timers — they've already been woken via wake_by_ref and
657            // just need to be re-polled.
658            if now == 0 {
659                for (_, tasks) in std::mem::take(&mut e.timers) {
660                    for tid in tasks {
661                        // Clear the reverse index since the timer has fired.
662                        if let Some(Some(ref mut t)) = e.tasks.get_mut(tid as usize) {
663                            t.timer_deadline = 0;
664                        }
665                        e.enqueue(tid);
666                    }
667                }
668            } else {
669                let expired: Vec<u64> =
670                    e.timers.keys().copied().take_while(|&d| d <= now).collect();
671                for deadline in expired {
672                    if let Some(tasks) = e.timers.remove(&deadline) {
673                        for tid in tasks {
674                            if let Some(Some(ref mut t)) = e.tasks.get_mut(tid as usize) {
675                                t.timer_deadline = 0;
676                            }
677                            e.enqueue(tid);
678                        }
679                    }
680                }
681            }
682        }
683
684        // Step 1: deferred ops.
685        let deferred = std::mem::take(&mut ex.borrow_mut().deferred_ops);
686        for op in deferred {
687            (op.f)();
688        }
689
690        // Steps 2+3 may need to re-run if task polling queues new
691        // signal callbacks (re-entrant cross-scope propagation).
692        for _pass in 0..3_u8 {
693            {
694                let cb_start = ex.borrow().now_ms();
695                loop {
696                    let callbacks = std::mem::take(&mut ex.borrow_mut().deferred_callbacks);
697                    if callbacks.is_empty() {
698                        break;
699                    }
700                    for cb in callbacks {
701                        // Isolate each callback so a panic in one subscriber
702                        // doesn't block the remaining notifications or wedge
703                        // the executor (in_flush stays true on unwind).
704                        let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(cb));
705                    }
706                    if ex.borrow().now_ms().saturating_sub(cb_start) >= ex.borrow().time_budget_ms {
707                        if !ex.borrow().deferred_callbacks.is_empty() {
708                            let (sched, ex2) = {
709                                let mut e = ex.borrow_mut();
710                                e.in_flush = false;
711                                e.is_flush_scheduled = false;
712                                (e.try_schedule_flush(), Rc::clone(ex))
713                            };
714                            if let Some(sched) = sched {
715                                sched.schedule(Box::new(move || Self::flush_instance(&ex2)));
716                            }
717                            return;
718                        }
719                        break;
720                    }
721                }
722            }
723
724            // Step 3: main poll loop with time-budget check.
725            let poll_start = ex.borrow().now_ms();
726            loop {
727                let task_id = ex.borrow_mut().dequeue();
728                let Some(tid) = task_id else {
729                    let mut e = ex.borrow_mut();
730                    e.is_flush_scheduled = false;
731                    e.in_flush = false;
732                    break;
733                };
734
735                // Take the task out so the poll doesn't hold an executor borrow.
736                let maybe_state = ex.borrow_mut().tasks[tid as usize].take();
737                if let Some(mut state) = maybe_state {
738                    let priority = state.priority;
739                    let scope_id = state.scope_id;
740
741                    // Check if the owning scope is suspended.
742                    let scope = crate::scope::find_scope(scope_id);
743                    if let Some(ref s) = scope {
744                        if s.is_suspended() {
745                            let mut e = ex.borrow_mut();
746                            if e.tasks[tid as usize].is_none() {
747                                e.tasks[tid as usize] = Some(state);
748                            }
749                            continue;
750                        }
751                    }
752
753                    // Ensure the executor is registered in the slot table.
754                    // Must not call ensure_global_registered while holding
755                    // a borrow on ex (it borrows the global EXECUTOR).
756                    let (slot_id, gen) = {
757                        let e = ex.borrow();
758                        if e.registered {
759                            (e.slot_id, e.generation)
760                        } else {
761                            drop(e);
762                            ensure_global_registered()
763                        }
764                    };
765                    let waker = Waker::from(Arc::new(TaskWaker {
766                        task_id: tid,
767                        priority,
768                        slot_id,
769                        generation: gen,
770                    }));
771                    let mut cx = Context::from_waker(&waker);
772
773                    // Inject owning scope.
774                    let prev_scope = crate::scope::get_scope_direct();
775                    if scope.is_some() {
776                        crate::scope::set_scope_direct(scope);
777                    }
778
779                    // Let futures discover their task id (used by timer::sleep).
780                    // Save and restore so that a nested flush (sync scheduler)
781                    // doesn't leave the outer task without its id afterward.
782                    let prev_polling = CURRENT_POLLING_TASK.with(|c| c.replace(Some(tid)));
783
784                    // Task isolation + timing.
785                    state.total_poll_count = state.total_poll_count.wrapping_add(1);
786                    let t0 = auralis_signal::now_us();
787                    let result: Result<Poll<()>, Box<dyn std::any::Any + Send>> =
788                        std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
789                            state.future.as_mut().poll(&mut cx)
790                        }));
791                    let elapsed = auralis_signal::now_us().saturating_sub(t0);
792
793                    CURRENT_POLLING_TASK.with(|c| c.set(prev_polling));
794                    crate::scope::set_scope_direct(prev_scope);
795
796                    // Extract timer_deadline before state is dropped, so
797                    // we can clean up the timer entry (free_slot can't
798                    // read it because the slot is already None).
799                    let timer_dl = state.timer_deadline;
800
801                    state.last_poll_duration_us = elapsed;
802                    match result {
803                        Ok(Poll::Ready(())) => {
804                            if timer_dl != 0 {
805                                ex.borrow_mut().cleanup_timer(tid, timer_dl);
806                            }
807                            ex.borrow_mut().free_slot(tid);
808                        }
809                        Err(payload) => {
810                            if timer_dl != 0 {
811                                ex.borrow_mut().cleanup_timer(tid, timer_dl);
812                            }
813                            let hook = ex.borrow().panic_hook.clone();
814                            if let Some(h) = hook {
815                                h(PanicInfo {
816                                    task_id: tid,
817                                    scope_id,
818                                    payload,
819                                });
820                            }
821                            ex.borrow_mut().free_slot(tid);
822                        }
823                        Ok(Poll::Pending) => {
824                            let mut e = ex.borrow_mut();
825                            if e.tasks[tid as usize].is_none() {
826                                e.tasks[tid as usize] = Some(state);
827                            }
828                        }
829                    }
830                }
831
832                // Time budget check.
833                {
834                    let elapsed = ex.borrow().now_ms().saturating_sub(poll_start);
835                    if elapsed >= ex.borrow().time_budget_ms {
836                        let (maybe_sched, ex_clone) = {
837                            let mut e = ex.borrow_mut();
838                            e.is_flush_scheduled = false;
839                            e.in_flush = false;
840                            let sched = if !e.high_queue.is_empty() || !e.low_queue.is_empty() {
841                                e.try_schedule_flush()
842                            } else {
843                                None
844                            };
845                            (sched, Rc::clone(ex))
846                        };
847                        if let Some(sched) = maybe_sched {
848                            sched.schedule(Box::new(move || Self::flush_instance(&ex_clone)));
849                        }
850                        break;
851                    }
852                }
853            }
854
855            // Drain any wakes that were buffered while the executor RefCell
856            // was borrowed (PENDING_WAKES fallback in TaskWaker::wake).
857            drain_pending_wakes();
858
859            // Continue only if signal callbacks accumulated during
860            // polling and there are tasks to wake.
861            if ex.borrow().deferred_callbacks.is_empty() {
862                break;
863            }
864        } // end passes loop
865    }
866}
867
868// ---------------------------------------------------------------------------
869// Current-executor storage — injectable, defaults to thread-local
870// ---------------------------------------------------------------------------
871
/// Shared handle to an [`Executor`]: reference-counted through [`Rc`]
/// and mutated through [`RefCell`] borrows.
pub(crate) type ExecutorRef = Rc<RefCell<Executor>>;
873
874/// RAII guard that restores the previous executor when dropped.
875struct RestoreExecutor(Option<ExecutorRef>);
876
877impl Drop for RestoreExecutor {
878    fn drop(&mut self) {
879        CURRENT_EXECUTOR.with(|c| {
880            *c.borrow_mut() = self.0.take();
881        });
882    }
883}
884
thread_local! {
    /// Per-thread override for the active executor.  `None` means no
    /// override is installed, in which case lookups fall back to the
    /// global thread-local executor.
    static CURRENT_EXECUTOR: RefCell<Option<ExecutorRef>> = const { RefCell::new(None) };
}
888
889/// Run `f` with `ex` set as the current executor.
890///
891/// Signal callbacks and `spawn_global` calls inside `f` will be routed
892/// to `ex` instead of the global thread-local executor.  Restores the
893/// previous executor afterward.
894///
895/// # Signal routing constraints
896///
897/// Auralis uses a **single global schedule hook** (installed once by the
898/// first call to [`init_flush_scheduler`]) that decides where signal
899/// notifications land by checking the current executor **at the time the
900/// notification fires**, not at the time `Signal::set` is called.
901///
902/// This design implies two hard requirements for multi-instance users:
903///
904/// 1. **`init_flush_scheduler` must be called at least once** — without
905///    it, `Signal::set` falls back to synchronous callback execution,
906///    which breaks the deferred-notification model and can cause
907///    re-entrant borrow panics.
908/// 2. **The instance executor must still be "current" when the flush
909///    runs** — if `with_executor` has already exited, deferred callbacks
910///    from signals set inside `f` will be routed to the global executor
911///    (or synchronously if no global hook is installed).
912///
913/// For the typical single-threaded case (Wasm, game loop, CLI), both
914/// requirements are satisfied trivially: call `init_flush_scheduler`
915/// once at startup and never use `with_executor`.  For SSR / multi-tenant
916/// servers, ensure that `with_executor` wraps the entire request
917/// lifecycle — from signal creation through the final flush.
918///
919/// # Example
920///
921/// ```rust,ignore
922/// use auralis_task::Executor;
923///
924/// let ex = Executor::new_instance();
925/// Executor::install_flush_scheduler(&ex, my_scheduler);
926/// auralis_task::with_executor(&ex, || {
927///     // Signal notifications and task spawns here go to `ex`.
928/// });
929/// ```
930pub fn with_executor<R>(ex: &ExecutorRef, f: impl FnOnce() -> R) -> R {
931    CURRENT_EXECUTOR.with(|exec| {
932        let prev = exec.borrow_mut().replace(Rc::clone(ex));
933        let result = f();
934        *exec.borrow_mut() = prev;
935        result
936    })
937}
938
939/// Return the current executor, if any.
940///
941/// If no executor has been set via [`with_executor`], returns `None` —
942/// callers should fall back to the global thread-local executor.
943fn current_executor() -> Option<ExecutorRef> {
944    CURRENT_EXECUTOR.with(|exec| exec.borrow().clone())
945}
946
947/// Return the currently active executor instance.
948///
949/// If [`with_executor`] was used to set an instance executor, returns
950/// that; otherwise returns the global thread-local executor.
951pub(crate) fn current_executor_instance() -> ExecutorRef {
952    current_executor().unwrap_or_else(|| EXECUTOR.with(Rc::clone))
953}
954
955/// Return the current time in milliseconds from the active executor's
956/// [`TimeSource`], or 0 if none is installed.
957pub(crate) fn current_time_ms() -> u64 {
958    current_executor_instance().borrow().now_ms()
959}
960
961// ---------------------------------------------------------------------------
962// Helpers — use thread_local EXECUTOR
963// ---------------------------------------------------------------------------
964
965/// Drain wakes that were buffered into [`PENDING_WAKES`] because the
966/// executor's `RefCell` was borrowed at the time [`TaskWaker::wake`]
967/// fired.  Called at the end of every [`Executor::flush_instance`].
968fn drain_pending_wakes() {
969    PENDING_WAKES.with(|pw| {
970        let wakes = std::mem::take(&mut *pw.borrow_mut());
971        for (tid, slot_id, gen) in wakes {
972            let Some(exec) = lookup_executor(slot_id, gen) else {
973                continue;
974            };
975            // Use enqueue() for the stale-task-id safety check.
976            exec.borrow_mut().enqueue(tid);
977            let maybe_sched = exec.borrow_mut().try_schedule_flush();
978            if let Some(sched) = maybe_sched {
979                let sid = slot_id;
980                let g = gen;
981                sched.schedule(Box::new(move || {
982                    if let Some(ex) = lookup_executor(sid, g) {
983                        Executor::flush_instance(&ex);
984                    }
985                }));
986            }
987        }
988    });
989}
990
991// ---------------------------------------------------------------------------
992// Flush
993// ---------------------------------------------------------------------------
994
995fn flush() {
996    EXECUTOR.with(Executor::flush_instance);
997}
998
999// ---------------------------------------------------------------------------
1000// Public API
1001// ---------------------------------------------------------------------------
1002
1003/// Check the deferred callback limit before pushing.
1004fn check_callback_limit(ex: &Executor) {
1005    if let Some(limit) = ex.max_deferred_callbacks {
1006        assert!(
1007            ex.deferred_callbacks.len() < limit,
1008            "deferred callback limit exceeded ({limit}). \
1009             This usually indicates an unbounded signal-set loop. \
1010             Increase the limit via set_max_deferred_callbacks() \
1011             or disable it with None."
1012        );
1013    }
1014}
1015
1016/// Set the platform flush scheduler and install the signal deferred-
1017/// callback hook.
1018///
1019/// Idempotent — subsequent calls are no-ops (the hook is installed via
1020/// [`std::sync::OnceLock`], so it fires exactly once per process).
1021///
1022/// # Threading constraint
1023///
1024/// The hook is **per-process** and routes signal notifications to the
1025/// executor that is "current" when the notification fires (see
1026/// [`with_executor`]).  For single-threaded use (Wasm, CLI) this is
1027/// transparent.  For multi-threaded SSR, enable the `ssr-tokio` feature
1028/// and call [`init_scope_store_tokio`](crate::init_scope_store_tokio).
1029/// See [`with_executor`] for the full routing contract.
1030pub fn init_flush_scheduler(sched: Rc<dyn ScheduleFlush>) {
1031    EXECUTOR.with(|exec| exec.borrow_mut().flush_scheduler = Some(sched));
1032    install_signal_hook_once();
1033}
1034
1035/// Check whether a flush scheduler has already been installed on the
1036/// global executor.
1037///
1038/// Used by [`auralis_devtools::init`] to decide whether to auto-install
1039/// a [`DeferredScheduler`](crate::scheduler::DeferredScheduler).
1040#[must_use]
1041pub fn has_flush_scheduler() -> bool {
1042    EXECUTOR.with(|exec| exec.borrow().flush_scheduler.is_some())
1043}
1044
1045/// Drain pending deferred signal callbacks on the global executor.
1046///
1047/// Unlike [`Executor::flush_instance`], this does **not** poll tasks or
1048/// expire timers — it only processes callbacks that [`Signal::set`]
1049/// pushed before a scheduler was installed.  Snapshot and diagnostic
1050/// tools call this to ensure memo dirty state is consistent before
1051/// reading the registry.
1052///
1053/// Safe to call when no scheduler is installed (no-op).  Callbacks are
1054/// [`catch_unwind`]-isolated so a panic in one subscriber doesn't block
1055/// the rest.  Uses [`RefCell::try_borrow_mut`] so that calling this from
1056/// within an in-progress flush (re-entrant snapshot) is a safe no-op
1057/// instead of a panic.
1058///
1059/// [`RefCell::try_borrow_mut`]: std::cell::RefCell::try_borrow_mut
1060pub fn drain_deferred_signal_callbacks() {
1061    loop {
1062        let callbacks = EXECUTOR.with(|ex| {
1063            ex.try_borrow_mut()
1064                .map(|mut e| std::mem::take(&mut e.deferred_callbacks))
1065                .unwrap_or_default()
1066        });
1067        if callbacks.is_empty() {
1068            break;
1069        }
1070        for cb in callbacks {
1071            let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(cb));
1072        }
1073    }
1074}
1075
1076/// Install the hook that bridges `auralis_signal::Signal::set` to the
1077/// executor's deferred-callback queue.
1078///
1079/// Idempotent — safe to call multiple times.
1080fn install_signal_hook_once() {
1081    use std::sync::OnceLock;
1082    static INSTALLED: OnceLock<()> = OnceLock::new();
1083    INSTALLED.get_or_init(|| {
1084        auralis_signal::install_schedule_hook(Box::new(|cb: Box<dyn FnOnce()>| {
1085            // Prefer the current executor (set via `with_executor`) for
1086            // SSR multi-request isolation; fall back to the global one.
1087            if let Some(ex) = current_executor() {
1088                let maybe_sched = {
1089                    let mut e = ex.borrow_mut();
1090                    check_callback_limit(&e);
1091                    e.deferred_callbacks.push(cb);
1092                    if e.in_flush {
1093                        None
1094                    } else {
1095                        e.try_schedule_flush()
1096                    }
1097                };
1098                if let Some(sched) = maybe_sched {
1099                    let ex2 = Rc::clone(&ex);
1100                    sched.schedule(Box::new(move || Executor::flush_instance(&ex2)));
1101                }
1102            } else {
1103                EXECUTOR.with(|exec| {
1104                    let maybe_sched = {
1105                        let mut ex = exec.borrow_mut();
1106                        check_callback_limit(&ex);
1107                        ex.deferred_callbacks.push(cb);
1108                        if ex.in_flush {
1109                            None
1110                        } else {
1111                            ex.try_schedule_flush()
1112                        }
1113                    };
1114                    if let Some(sched) = maybe_sched {
1115                        sched.schedule(Box::new(flush));
1116                    }
1117                });
1118            }
1119        }));
1120    });
1121}
1122
1123/// Set the platform time source used for time-budget accounting.
1124///
1125/// If no [`TimeSource`] is registered the executor runs every flush to
1126/// completion without yielding, which is acceptable for short-running
1127/// workloads but may cause frame drops in the browser.
1128pub fn init_time_source(ts: Rc<dyn TimeSource>) {
1129    EXECUTOR.with(|exec| exec.borrow_mut().time_source = Some(ts));
1130}
1131
1132/// Set the per-flush time budget on the **global** thread-local executor.
1133///
1134/// This does **not** affect instance executors created via
1135/// [`Executor::new_instance`] — those carry their own budget (default
1136/// 8 ms) and must be configured via [`Executor::set_time_budget`].
1137///
1138/// See [`Executor::set_time_budget`] for the full semantics.
1139pub fn set_global_time_budget(budget_ms: u64) {
1140    EXECUTOR.with(|exec| exec.borrow_mut().time_budget_ms = budget_ms);
1141}
1142
1143/// Set the deferred callback safety cap on the global executor.
1144///
1145/// See [`Executor::set_max_deferred_callbacks`] for details.
1146pub fn set_global_max_deferred_callbacks(limit: Option<usize>) {
1147    EXECUTOR.with(|exec| exec.borrow_mut().max_deferred_callbacks = limit);
1148}
1149
1150/// Register a global panic hook called when any globally-spawned
1151/// task panics.
1152///
1153/// See [`Executor::set_panic_hook`] for details.
1154pub fn set_panic_hook(hook: Rc<dyn Fn(PanicInfo)>) {
1155    EXECUTOR.with(|exec| exec.borrow_mut().panic_hook = Some(hook));
1156}
1157
1158/// Remove the global panic hook, restoring the default silent
1159/// behaviour.
1160pub fn remove_panic_hook() {
1161    EXECUTOR.with(|exec| exec.borrow_mut().panic_hook = None);
1162}
1163
1164/// Spawn a future on the global executor at low priority.
1165///
1166/// **Important:** [`init_flush_scheduler`] must be called before spawning
1167/// any tasks.  Without a flush scheduler, spawned tasks will sit in the
1168/// queue indefinitely because the executor has no way to schedule a flush
1169/// cycle.
1170pub fn spawn_global(future: impl Future<Output = ()> + 'static) {
1171    spawn_global_with_priority(Priority::Low, future);
1172}
1173
1174/// Spawn a future on the global executor at the given priority.
1175pub fn spawn_global_with_priority(priority: Priority, future: impl Future<Output = ()> + 'static) {
1176    spawn_inner_on(&EXECUTOR.with(Rc::clone), Box::pin(future), priority, 0);
1177}
1178
1179/// Spawn a future on a specific executor and scope.
1180pub(crate) fn spawn_scoped_on(
1181    ex: &Rc<RefCell<Executor>>,
1182    priority: Priority,
1183    scope_id: u64,
1184    future: impl Future<Output = ()> + 'static,
1185) -> TaskId {
1186    spawn_inner_on(ex, Box::pin(future), priority, scope_id)
1187}
1188
1189fn spawn_inner_on(
1190    ex: &Rc<RefCell<Executor>>,
1191    future: Pin<Box<dyn Future<Output = ()> + 'static>>,
1192    priority: Priority,
1193    scope_id: u64,
1194) -> TaskId {
1195    let (task_id, maybe_sched) = {
1196        let mut e = ex.borrow_mut();
1197        let task_id = e.allocate_id();
1198        e.tasks[task_id as usize] = Some(TaskState {
1199            future,
1200            priority,
1201            scope_id,
1202            timer_deadline: 0,
1203            total_poll_count: 0,
1204            last_poll_duration_us: 0,
1205        });
1206        e.enqueue(task_id);
1207        let sched = e.try_schedule_flush();
1208        (task_id, sched)
1209    };
1210    if let Some(sched) = maybe_sched {
1211        let ex2 = Rc::clone(ex);
1212        sched.schedule(Box::new(move || Executor::flush_instance(&ex2)));
1213    }
1214    task_id
1215}
1216
1217/// Enqueue all tasks belonging to `scope_id` on a given executor.
1218///
1219/// Used by [`TaskScope::resume`] to restart tasks after a suspend.
1220pub(crate) fn enqueue_scope_tasks_on(ex: &ExecutorRef, task_ids: &[TaskId]) {
1221    if task_ids.is_empty() {
1222        return;
1223    }
1224    let maybe_sched = {
1225        let mut e = ex.borrow_mut();
1226        for tid in task_ids {
1227            e.enqueue(*tid);
1228        }
1229        if e.in_flush {
1230            None
1231        } else {
1232            e.try_schedule_flush()
1233        }
1234    };
1235    if let Some(sched) = maybe_sched {
1236        let ex2 = Rc::clone(ex);
1237        sched.schedule(Box::new(move || Executor::flush_instance(&ex2)));
1238    }
1239}
1240/// Cancel all tasks belonging to `scope_id` on a specific executor.
1241pub(crate) fn cancel_scope_tasks_on(
1242    ex: &Rc<RefCell<Executor>>,
1243    task_ids: &[TaskId],
1244) -> Vec<Pin<Box<dyn Future<Output = ()>>>> {
1245    if task_ids.is_empty() {
1246        return Vec::new();
1247    }
1248
1249    let mut e = ex.borrow_mut();
1250    let mut dropped = Vec::with_capacity(task_ids.len());
1251
1252    // Collect timer deadlines before mutating.
1253    let mut timer_deadlines: Vec<(u64, TaskId)> = Vec::new();
1254    for &tid in task_ids {
1255        let idx = tid as usize;
1256        if idx < e.tasks.len() {
1257            if let Some(ref t) = e.tasks[idx] {
1258                if t.timer_deadline != 0 {
1259                    timer_deadlines.push((t.timer_deadline, tid));
1260                }
1261            }
1262        }
1263    }
1264    for (dl, tid) in &timer_deadlines {
1265        e.cleanup_timer(*tid, *dl);
1266    }
1267
1268    // Cancel each task by id (direct lookup, no full-table scan).
1269    // Only push to free_slots for slots we actually took.
1270    for &tid in task_ids {
1271        let idx = tid as usize;
1272        if idx < e.tasks.len() {
1273            if let Some(state) = e.tasks[idx].take() {
1274                dropped.push(state.future);
1275                e.free_slots.push(tid);
1276            }
1277        }
1278    }
1279    e.free_slots.sort_unstable();
1280    e.free_slots.dedup();
1281
1282    // Filter queues to remove cancelled tasks.
1283    let high: Vec<TaskId> = e
1284        .high_queue
1285        .iter()
1286        .copied()
1287        .filter(|&id| {
1288            let idx = id as usize;
1289            idx < e.tasks.len() && e.tasks[idx].is_some()
1290        })
1291        .collect();
1292    e.high_queue.clear();
1293    e.high_queue.extend(high);
1294
1295    let low: Vec<TaskId> = e
1296        .low_queue
1297        .iter()
1298        .copied()
1299        .filter(|&id| {
1300            let idx = id as usize;
1301            idx < e.tasks.len() && e.tasks[idx].is_some()
1302        })
1303        .collect();
1304    e.low_queue.clear();
1305    e.low_queue.extend(low);
1306
1307    dropped
1308}
1309
1310/// Cancel a single task by its id, dropping its future and cleaning up
1311/// its timer if any.  No-op if the task has already completed.
1312pub(crate) fn cancel_task(ex: &Rc<RefCell<Executor>>, task_id: TaskId) {
1313    let mut e = ex.borrow_mut();
1314    let idx = task_id as usize;
1315    if idx >= e.tasks.len() {
1316        return;
1317    }
1318    let deadline = e.tasks[idx].as_ref().map_or(0, |t| t.timer_deadline);
1319    if deadline != 0 {
1320        e.cleanup_timer(task_id, deadline);
1321    }
1322    let slot = e.tasks[idx].take();
1323    if slot.is_some() {
1324        e.free_slots.push(task_id);
1325        e.high_queue.retain(|&id| id != task_id);
1326        e.low_queue.retain(|&id| id != task_id);
1327    }
1328}
1329
1330/// Check whether a task slot is empty (task completed or was cancelled).
1331pub(crate) fn is_task_finished(ex: &Rc<RefCell<Executor>>, task_id: TaskId) -> bool {
1332    let e = ex.borrow();
1333    let idx = task_id as usize;
1334    idx >= e.tasks.len() || e.tasks[idx].is_none()
1335}
1336
1337// ---------------------------------------------------------------------------
1338// yield_now
1339// ---------------------------------------------------------------------------
1340
/// Create a [`Future`] that hands control back to the executor exactly
/// once before completing.
#[must_use = "yield_now() does nothing unless awaited"]
pub fn yield_now() -> YieldNow {
    YieldNow { yielded: false }
}

/// Future returned by [`yield_now`].
///
/// The first poll wakes the task's own waker and reports
/// [`Poll::Pending`]; every later poll reports [`Poll::Ready`].
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct YieldNow {
    yielded: bool,
}

impl Future for YieldNow {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Mark the yield as done and learn whether it had already happened.
        let already_yielded = std::mem::replace(&mut self.yielded, true);
        if already_yielded {
            return Poll::Ready(());
        }
        // Request another poll right away so the task is queued again.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}
1367
1368// ---------------------------------------------------------------------------
1369// schedule_callback — hook for auralis-signal's deferred callback model
1370// ---------------------------------------------------------------------------
1371
1372/// Schedule a closure to run at the start of the next executor flush.
1373///
1374/// Used internally by `auralis_signal` to defer subscriber callback
1375/// execution.  The closure is drained before the main poll loop.
1376///
1377/// Routes to the current executor (via [`with_executor`]) when one is
1378/// active; falls back to the global thread-local executor.
1379pub fn schedule_callback(f: Box<dyn FnOnce()>) {
1380    let exec = current_executor_instance();
1381    let maybe_sched = {
1382        let mut ex = exec.borrow_mut();
1383        check_callback_limit(&ex);
1384        ex.deferred_callbacks.push(f);
1385        if ex.in_flush {
1386            None
1387        } else {
1388            ex.try_schedule_flush()
1389        }
1390    };
1391    if let Some(sched) = maybe_sched {
1392        let ex2 = Rc::clone(&exec);
1393        sched.schedule(Box::new(move || Executor::flush_instance(&ex2)));
1394    }
1395}
1396
1397// ---------------------------------------------------------------------------
1398// set_deferred
1399// ---------------------------------------------------------------------------
1400
1401/// Schedule a [`Signal::set`] call for the **next** executor flush.
1402///
1403/// Safe to call from inside [`Drop`] — the actual `signal.set(value)` is
1404/// deferred to a subsequent flush, avoiding re-entrant borrow panics.
1405///
1406/// Routes to the current executor (via [`with_executor`]) when one is
1407/// active; falls back to the global thread-local executor.
1408pub fn set_deferred<T: 'static>(signal: &Signal<T>, value: T) {
1409    let signal = signal.clone();
1410    let exec = current_executor_instance();
1411    let maybe_sched = {
1412        let mut ex = exec.borrow_mut();
1413        ex.deferred_ops.push(DeferredOp {
1414            f: Box::new(move || signal.set(value)),
1415        });
1416        ex.try_schedule_flush()
1417    };
1418    if let Some(sched) = maybe_sched {
1419        let ex2 = Rc::clone(&exec);
1420        sched.schedule(Box::new(move || Executor::flush_instance(&ex2)));
1421    }
1422}
1423
1424// ---------------------------------------------------------------------------
1425// Test / debug helpers
1426// ---------------------------------------------------------------------------
1427
/// Reset the global executor and its thread-local bookkeeping for a
/// fresh test.
///
/// Clears the buffered pending wakes, the executor slot table, the
/// current-executor override, all task slots, queues, deferred ops and
/// callbacks, the flush/scheduler flags, the registration fields, and
/// the injected [`ScheduleFlush`]/[`TimeSource`]; finally clears the
/// scope registry.  Call at the start of every test to prevent
/// cross-test state leakage.
///
/// The panic hook, the time budget, and the deferred-callback cap are
/// **not** touched here; tests that change them must restore them
/// themselves.
///
/// Note that the signal schedule hook (installed by
/// [`init_flush_scheduler`] via [`std::sync::OnceLock`]) **persists**
/// across resets — the hook references the global [`EXECUTOR`]
/// thread-local, and this function re-initialises that same executor
/// in place rather than replacing it.  This is correct behaviour:
/// after reset, signal notifications route to the freshly-cleared
/// global executor.
///
/// # Usage
///
/// This function is intended **only** for testing.  Calling it while
/// the executor is processing tasks will silently drop all live
/// futures and may cause panics or other misbehaviour in running
/// application code.
pub fn reset_executor_for_test() {
    PENDING_WAKES.with(|pw| pw.borrow_mut().clear());
    SLOTS.with(|s| s.borrow_mut().clear());
    CURRENT_EXECUTOR.with(|c| *c.borrow_mut() = None);
    EXECUTOR.with(|exec| {
        let mut ex = exec.borrow_mut();
        // Ready queues and task storage; clearing `tasks` drops every
        // stored future.
        ex.high_queue.clear();
        ex.low_queue.clear();
        ex.tasks.clear();
        ex.free_slots.clear();
        ex.next_task_id = 0;
        // Flush state and pending deferred work.
        ex.is_flush_scheduled = false;
        ex.in_flush = false;
        ex.deferred_ops.clear();
        ex.deferred_callbacks.clear();
        // Injected platform hooks.
        ex.flush_scheduler = None;
        ex.time_source = None;
        // Slot-table registration (the table itself was cleared above).
        ex.slot_id = 0;
        ex.generation = 0;
        ex.registered = false;
    });
    crate::scope::clear_scope_registry();
}
1471
/// Count the occupied task slots on the global executor.
#[cfg(any(test, feature = "debug"))]
pub(crate) fn debug_task_count() -> usize {
    EXECUTOR.with(|global| {
        let exec = global.borrow();
        // Flattening the slots skips the empty (`None`) ones.
        exec.tasks.iter().flatten().count()
    })
}
1476
/// Collect timing info for every occupied task slot on the global
/// executor: `task_id` → (`total_poll_count`, `last_poll_us`).
#[cfg(feature = "debug")]
pub(crate) fn debug_task_timing() -> std::collections::HashMap<TaskId, (u64, u64)> {
    EXECUTOR.with(|global| {
        let exec = global.borrow();
        exec.tasks
            .iter()
            .enumerate()
            .filter_map(|(idx, slot)| {
                // The slot index doubles as the task id.
                let task = slot.as_ref()?;
                Some((idx as u64, (task.total_poll_count, task.last_poll_duration_us)))
            })
            .collect()
    })
}
1491
/// Collect `(task_id, priority, scope_id)` for every occupied task slot
/// on the global executor, in slot order.
#[cfg(feature = "debug")]
pub(crate) fn debug_task_snapshot() -> Vec<(TaskId, Priority, u64)> {
    EXECUTOR.with(|global| {
        let exec = global.borrow();
        exec.tasks
            .iter()
            .enumerate()
            .filter_map(|(idx, slot)| {
                // The slot index doubles as the task id.
                let task = slot.as_ref()?;
                Some((idx as u64, task.priority, task.scope_id))
            })
            .collect()
    })
}
1506
/// List the task ids currently sitting in the global executor's ready
/// queues (high and low priority combined), sorted ascending with
/// duplicates removed.
#[cfg(feature = "debug")]
pub(crate) fn debug_queued_task_ids() -> Vec<TaskId> {
    EXECUTOR.with(|global| {
        let exec = global.borrow();
        let mut ids: Vec<TaskId> =
            Vec::with_capacity(exec.high_queue.len() + exec.low_queue.len());
        ids.extend(exec.high_queue.iter().copied());
        ids.extend(exec.low_queue.iter().copied());
        ids.sort_unstable();
        ids.dedup();
        ids
    })
}
1523
/// Store and enqueue a task on the global executor without requesting
/// a flush.
///
/// Used in tests to batch multiple spawns before executing them.  The
/// task is registered with scope id `0`; its id is returned.
#[cfg(test)]
pub(crate) fn spawn_no_auto_flush(
    priority: Priority,
    future: impl Future<Output = ()> + 'static,
) -> TaskId {
    // A new task starts with no timer armed and zeroed poll statistics.
    let state = TaskState {
        future: Box::pin(future),
        priority,
        scope_id: 0,
        timer_deadline: 0,
        total_poll_count: 0,
        last_poll_duration_us: 0,
    };
    EXECUTOR.with(|global| {
        let mut exec = global.borrow_mut();
        let task_id = exec.allocate_id();
        exec.tasks[task_id as usize] = Some(state);
        exec.enqueue(task_id);
        // Deliberately no `try_schedule_flush` here: the caller decides
        // when the flush happens.
        task_id
    })
}
1547
/// Run a manual flush cycle on the global executor (for tests that need
/// to control timing).
///
/// Thin test-only wrapper around the private `flush` helper.
#[cfg(test)]
pub(crate) fn flush_all() {
    flush();
}