Skip to main content

auralis_task/
executor.rs

1//! Single-threaded executor with priority scheduling, time-budget
2//! awareness, and deferred-signal support.
3//!
4//! ## Architecture
5//!
6//! The executor is stored via a pluggable [`ExecutorStorage`] strategy
7//! (defaulting to a per-thread slot).  Before polling a task the future
8//! is **temporarily removed** so that the poll never holds an executor
9//! borrow — this allows nested spawns, wakes, and `set_deferred` calls
10//! without `RefCell` panics.
11//!
//! The waker carries only plain ids (task id, priority, and an executor
//! slot index + generation) rather than an `Rc`, making it trivially
//! [`Send`] + [`Sync`] for [`Waker::from`].
14
15#![allow(clippy::cast_possible_truncation)]
16
17use std::cell::{Cell, RefCell};
18use std::collections::{BTreeMap, VecDeque};
19use std::future::Future;
20use std::pin::Pin;
21use std::rc::{Rc, Weak};
22use std::sync::Arc;
23use std::task::{Context, Poll, Wake, Waker};
24
25use auralis_signal::Signal;
26
27use crate::Priority;
28
29// ---------------------------------------------------------------------------
30// Types
31// ---------------------------------------------------------------------------
32
33type TaskId = u64;
34
35// ---------------------------------------------------------------------------
36// ScheduleFlush
37// ---------------------------------------------------------------------------
38
/// Platform hook for scheduling a microtask callback.
///
/// The executor stores one implementation (see
/// `Executor::install_flush_scheduler`) and hands it a closure whenever
/// a flush cycle needs to run.
pub trait ScheduleFlush {
    /// Request that `callback` runs at the next microtask boundary.
    ///
    /// Callers in this file invoke `schedule` only after releasing
    /// their executor borrow, so an implementation may run `callback`
    /// synchronously.
    fn schedule(&self, callback: Box<dyn FnOnce()>);
}
44
/// Synchronous [`ScheduleFlush`] for unit tests.
///
/// Rather than deferring to a microtask, the callback is invoked right
/// inside `schedule`, so the executor runs to completion without a
/// browser event loop.
#[cfg(test)]
pub struct TestScheduleFlush;

#[cfg(test)]
impl ScheduleFlush for TestScheduleFlush {
    fn schedule(&self, callback: Box<dyn FnOnce()>) {
        // No queueing: run the job immediately on the caller's stack.
        (callback)();
    }
}
58
59// ---------------------------------------------------------------------------
60// TimeSource
61// ---------------------------------------------------------------------------
62
/// High-resolution time source for the executor's time-budget
/// accounting.
///
/// When registered via [`init_time_source`], the executor queries this
/// before and after each task poll to decide whether it should yield
/// control back to the host event loop (default budget: 8 ms).
///
/// In Wasm environments the implementation typically delegates to
/// `performance.now()`.  If no [`TimeSource`] is registered the time
/// budget check is a no-op and the executor runs tasks until the
/// queues are drained.
pub trait TimeSource {
    /// Return the current time in milliseconds.
    ///
    /// Readings are compared against timer deadlines and subtracted
    /// from one another (with saturating arithmetic) to measure how
    /// long a flush has been running, so a reading that goes backwards
    /// is treated as zero elapsed time.
    fn now_ms(&self) -> u64;
}
78
/// A [`TimeSource`] whose value is explicitly controlled by the test.
///
/// Use [`set`](TestTimeSource::set) or [`advance`](TestTimeSource::advance)
/// to simulate the passage of time during a flush cycle.
#[cfg(test)]
pub struct TestTimeSource {
    /// Current simulated time in milliseconds.  A `Cell` so the clock
    /// can be moved through a shared reference.
    now: std::cell::Cell<u64>,
}

#[cfg(test)]
impl TestTimeSource {
    /// Create a new [`TestTimeSource`] with the given initial time.
    #[must_use]
    pub fn new(initial_ms: u64) -> Self {
        Self {
            now: std::cell::Cell::new(initial_ms),
        }
    }

    /// Set the current time to `ms` milliseconds.
    pub fn set(&self, ms: u64) {
        self.now.set(ms);
    }

    /// Advance the current time by `ms` milliseconds.
    ///
    /// The clock saturates at `u64::MAX` instead of overflowing, so a
    /// large step can never panic (debug builds) or make time jump
    /// backwards (release builds).
    pub fn advance(&self, ms: u64) {
        self.now.set(self.now.get().saturating_add(ms));
    }
}
108
#[cfg(test)]
impl TimeSource for TestTimeSource {
    /// Report whatever value the test last stored in the clock.
    fn now_ms(&self) -> u64 {
        let Self { now } = self;
        now.get()
    }
}
115
116// ---------------------------------------------------------------------------
117// TaskWaker — routes wakes to the correct executor via a slot table.
118//
119// Waker::from requires Send + Sync + 'static, so the waker cannot hold
120// an Rc<RefCell<Executor>>.  Instead it stores a slot index + generation
121// number.  The SLOTS thread_local maps (index, generation) → Weak<Executor>.
122// On wake, the generation is validated before the weak pointer is upgraded.
123// Dead slots are reclaimed when new executors are registered.
124// ---------------------------------------------------------------------------
125
/// A registered executor slot.  The `generation` counter distinguishes
/// between successive executors that occupy the same slot index (e.g.
/// after the previous one was dropped and a new one recycles the slot).
struct Slot {
    /// Non-owning handle to the executor.  Upgrading fails once the
    /// executor has been dropped, which is how a slot is recognised as
    /// dead and eligible for reuse.
    weak: Weak<RefCell<Executor>>,
    /// Incremented (wrapping) every time this slot is reused.
    /// A [`TaskWaker`] must present the generation it was created with;
    /// a mismatch means the waker is stale and is silently ignored.
    generation: u64,
}
136
thread_local! {
    /// Per-thread table mapping a slot index to its registered
    /// executor; consulted by [`lookup_executor`] whenever a waker fires.
    ///
    /// Slot 0 is reserved for the global executor.  Instance executors
    /// occupy subsequent slots.  Dead slots (Weak::upgrade returns None)
    /// are recycled in [`register_executor`].
    static SLOTS: RefCell<Vec<Slot>> = const { RefCell::new(Vec::new()) };
}
143
144/// Register an executor in the slot table, returning the assigned
145/// (`slot_id`, `generation`) pair.  Dead slots are recycled in-place;
146/// if no dead slot is found a new entry is appended.
147fn register_executor(weak: Weak<RefCell<Executor>>) -> (u64, u64) {
148    SLOTS.with(|slots| {
149        let mut slots = slots.borrow_mut();
150        for (i, slot) in slots.iter_mut().enumerate() {
151            if slot.weak.upgrade().is_none() {
152                slot.weak = weak;
153                // Wrapping is safe: 2^64 reuses of a single slot
154                // would take ~10^14 years at 1 reuse/μs.
155                slot.generation = slot.generation.wrapping_add(1);
156                return (i as u64, slot.generation);
157            }
158        }
159        let gen = 0;
160        slots.push(Slot {
161            weak,
162            generation: gen,
163        });
164        ((slots.len() - 1) as u64, gen)
165    })
166}
167
168/// Look up an executor by slot id, validating the generation.
169fn lookup_executor(slot_id: u64, generation: u64) -> Option<Rc<RefCell<Executor>>> {
170    SLOTS.with(|slots| {
171        let slots = slots.borrow();
172        let slot = slots.get(slot_id as usize)?;
173        if slot.generation != generation {
174            return None;
175        }
176        slot.weak.upgrade()
177    })
178}
179
/// Waker payload for a single task.  It holds ids only (no `Rc`), and
/// finds its executor again through the [`SLOTS`] table at wake time.
struct TaskWaker {
    /// Id of the task to re-enqueue when woken.
    task_id: TaskId,
    /// Selects which ready queue (high or low) the task id is pushed onto.
    priority: Priority,
    /// Index into [`SLOTS`] of the executor that polled the task.
    slot_id: u64,
    /// Generation the slot had when this waker was created; a mismatch
    /// at wake time turns the wake into a no-op.
    generation: u64,
}
186
187impl Wake for TaskWaker {
188    fn wake(self: Arc<Self>) {
189        let Some(exec) = lookup_executor(self.slot_id, self.generation) else {
190            return;
191        };
192        let maybe_sched = if let Ok(mut ex) = exec.try_borrow_mut() {
193            match self.priority {
194                Priority::High => ex.high_queue.push_back(self.task_id),
195                Priority::Low => ex.low_queue.push_back(self.task_id),
196            }
197            if ex.in_flush {
198                None
199            } else {
200                ex.try_schedule_flush()
201            }
202        } else {
203            PENDING_WAKES.with(|pw| {
204                pw.borrow_mut()
205                    .push((self.task_id, self.slot_id, self.generation));
206            });
207            None
208        };
209        if let Some(sched) = maybe_sched {
210            let sid = self.slot_id;
211            let gen = self.generation;
212            sched.schedule(Box::new(move || {
213                if let Some(ex) = lookup_executor(sid, gen) {
214                    Executor::flush_instance(&ex);
215                }
216            }));
217        }
218    }
219}
220
221// ---------------------------------------------------------------------------
222// TaskState
223// ---------------------------------------------------------------------------
224
/// Everything the executor stores for one live task.
struct TaskState {
    /// The task body; polled by the flush loop until it returns
    /// `Poll::Ready` or panics.
    future: Pin<Box<dyn Future<Output = ()> + 'static>>,
    /// Ready queue this task is placed on when it is enqueued.
    priority: Priority,
    /// Id of the owning scope, looked up before each poll
    /// (`Executor::spawn` uses 0).
    scope_id: u64,
    /// Key in [`Executor::timers`] for this task's pending sleep,
    /// or 0 if the task is not waiting on a timer.
    timer_deadline: u64,
}
233
234// ---------------------------------------------------------------------------
235// Executor
236// ---------------------------------------------------------------------------
237
/// Information about a task panic, passed to the user-registered
/// [`set_panic_hook`].
///
/// Built by the flush loop when polling a task unwinds; the task is
/// removed from the executor after the hook returns.
#[derive(Debug)]
pub struct PanicInfo {
    /// The executor-assigned task id.
    pub task_id: u64,
    /// The scope that owned the task (0 for global tasks).
    pub scope_id: u64,
    /// The boxed panic payload, exactly as returned by
    /// `std::panic::catch_unwind`.
    pub payload: Box<dyn std::any::Any + Send>,
}
249
/// A single-threaded async task executor with priority queues.
///
/// Each [`Executor`] manages its own task slots, ready queues, and
/// deferred callback buffers.  Use [`Executor::new_instance`] to create
/// an isolated executor (e.g. per SSR request), or use the global
/// thread-local executor via [`spawn_global`](crate::spawn_global).
pub struct Executor {
    /// Ready queue drained first by `dequeue`.
    high_queue: VecDeque<TaskId>,
    /// Ready queue consulted only when `high_queue` is empty.
    low_queue: VecDeque<TaskId>,
    /// Task storage indexed by task id.  An entry is `None` when the id
    /// is free or while the task is taken out for polling.
    tasks: Vec<Option<TaskState>>,
    /// Task ids released by `free_slot`, reused by `allocate_id`.
    free_slots: Vec<TaskId>,
    /// Next never-used task id (equals the number of ids handed out
    /// without recycling).
    next_task_id: TaskId,
    /// Set by `try_schedule_flush` so only one flush is requested at a
    /// time; cleared when a flush finishes or yields.
    is_flush_scheduled: bool,
    /// True while `flush_instance` is running; makes nested flush calls
    /// a no-op.
    in_flush: bool,
    /// One-shot operations run at the start of every flush, after
    /// expired timers and before deferred callbacks.
    deferred_ops: Vec<DeferredOp>,
    /// Callbacks pushed by `Signal::set` via the schedule hook.
    /// Drained at the start of every flush before polling tasks.
    deferred_callbacks: Vec<Box<dyn FnOnce()>>,
    /// Platform hook used to request a flush; `None` means no flush is
    /// requested automatically.
    flush_scheduler: Option<Rc<dyn ScheduleFlush>>,
    /// Clock for timers and time-budget checks; `None` makes `now_ms`
    /// return 0.
    time_source: Option<Rc<dyn TimeSource>>,
    /// Maximum milliseconds to spend inside a single flush before
    /// yielding back to the host event loop.  Default: 8 ms.
    time_budget_ms: u64,
    /// Optional hook invoked when a spawned task panics.
    panic_hook: Option<Rc<dyn Fn(PanicInfo)>>,
    /// Timer queue: map from deadline (ms) to task ids that should be
    /// woken when that deadline expires.  Processed at the start of
    /// every flush.
    timers: BTreeMap<u64, Vec<TaskId>>,
    /// Slot index and generation in [`SLOTS`] for routing wakes back
    /// to this executor.  Set by [`new_instance`] or lazily for the
    /// global executor.
    slot_id: u64,
    generation: u64,
    /// Whether this executor has been registered in [`SLOTS`].
    registered: bool,
}
287
288// Set by the executor before polling a task, cleared afterward.
289// Lets futures discover their task id without threading it through
290// layers of combinators.
291thread_local! {
292    static CURRENT_POLLING_TASK: Cell<Option<TaskId>> = const { Cell::new(None) };
293}
294
295pub(crate) fn with_current_polling_task<R>(f: impl FnOnce(Option<TaskId>) -> R) -> R {
296    CURRENT_POLLING_TASK.with(|c| f(c.get()))
297}
298
/// A one-shot operation queued on the executor and run at the start of
/// the next flush (see `Executor::deferred_ops`).
struct DeferredOp {
    /// The work to perform; consumed when it runs.
    f: Box<dyn FnOnce()>,
}
302
303impl Executor {
304    fn new() -> Self {
305        Self {
306            high_queue: VecDeque::new(),
307            low_queue: VecDeque::new(),
308            tasks: Vec::new(),
309            free_slots: Vec::new(),
310            next_task_id: 0,
311            is_flush_scheduled: false,
312            in_flush: false,
313            deferred_ops: Vec::new(),
314            deferred_callbacks: Vec::new(),
315            flush_scheduler: None,
316            time_source: None,
317            time_budget_ms: 8,
318            panic_hook: None,
319            timers: BTreeMap::new(),
320            slot_id: 0,
321            generation: 0,
322            registered: false,
323        }
324    }
325
326    fn allocate_id(&mut self) -> TaskId {
327        if let Some(id) = self.free_slots.pop() {
328            return id;
329        }
330        let id = self.next_task_id;
331        self.next_task_id += 1;
332        self.tasks.push(None);
333        id
334    }
335
336    fn free_slot(&mut self, task_id: TaskId) {
337        // Clean up any pending timer for this task so a recycled
338        // task ID is not spuriously woken by an old deadline.
339        // This works when the slot is still occupied (scope cancel
340        // path).  For normal completion (Poll::Ready), the slot is
341        // already None and the caller must call cleanup_timer first.
342        if let Some(Some(ref t)) = self.tasks.get(task_id as usize) {
343            if t.timer_deadline != 0 {
344                self.cleanup_timer(task_id, t.timer_deadline);
345            }
346        }
347        self.tasks[task_id as usize] = None;
348        self.free_slots.push(task_id);
349    }
350
351    /// Remove a timer entry for `task_id` from the timer map.
352    fn cleanup_timer(&mut self, task_id: TaskId, deadline: u64) {
353        if let Some(tids) = self.timers.get_mut(&deadline) {
354            tids.retain(|id| *id != task_id);
355            if tids.is_empty() {
356                self.timers.remove(&deadline);
357            }
358        }
359    }
360
361    fn enqueue(&mut self, task_id: TaskId) {
362        let priority = match self.tasks.get(task_id as usize).and_then(Option::as_ref) {
363            Some(t) => t.priority,
364            None => return,
365        };
366        match priority {
367            Priority::High => self.high_queue.push_back(task_id),
368            Priority::Low => self.low_queue.push_back(task_id),
369        }
370    }
371
372    fn dequeue(&mut self) -> Option<TaskId> {
373        self.high_queue
374            .pop_front()
375            .or_else(|| self.low_queue.pop_front())
376    }
377
378    /// Mark that a flush is needed and return the scheduler if one is
379    /// registered.  The caller **must** invoke the scheduler **after**
380    /// releasing the executor borrow.
381    fn try_schedule_flush(&mut self) -> Option<Rc<dyn ScheduleFlush>> {
382        if self.is_flush_scheduled {
383            return None;
384        }
385        self.is_flush_scheduled = true;
386        self.flush_scheduler.clone()
387    }
388
389    /// Return the current time in ms, or 0 if no [`TimeSource`] is
390    /// registered.  When this returns 0 the time-budget check is
391    /// effectively a no-op.
392    pub(crate) fn now_ms(&self) -> u64 {
393        self.time_source.as_ref().map_or(0, |ts| ts.now_ms())
394    }
395
396    /// Return the number of currently active (not-yet-completed) tasks.
397    ///
398    /// Used by streaming SSR to determine whether the stream should
399    /// wait for more work or terminate.
400    #[must_use]
401    pub fn active_task_count(&self) -> usize {
402        self.tasks.iter().filter(|t| t.is_some()).count()
403    }
404}
405
406// ---------------------------------------------------------------------------
407// Thread-local globals (default storage)
408// ---------------------------------------------------------------------------
409
thread_local! {
    /// The global executor for this thread, created on first access.
    static EXECUTOR: Rc<RefCell<Executor>> = Rc::new(RefCell::new(Executor::new()));
    /// Wakes as (`task_id`, `slot_id`, `generation`) triples, buffered
    /// by `TaskWaker::wake` when the target executor was already
    /// mutably borrowed.
    static PENDING_WAKES: RefCell<Vec<(TaskId, u64, u64)>> =
        const { RefCell::new(Vec::new()) };
}
415
416/// Ensure the global executor is registered in slot 0 (lazy, idempotent).
417/// Returns (`slot_id`, `generation`) for the global executor.
418fn ensure_global_registered() -> (u64, u64) {
419    SLOTS.with(|slots| {
420        let mut slots = slots.borrow_mut();
421        if slots.is_empty() {
422            let weak = EXECUTOR.with(Rc::downgrade);
423            slots.push(Slot {
424                weak,
425                generation: 0,
426            });
427        } else {
428            // Verify slot 0 still holds the global executor.
429            let global = EXECUTOR.with(Rc::clone);
430            let is_global = slots[0]
431                .weak
432                .upgrade()
433                .is_some_and(|ex| Rc::ptr_eq(&ex, &global));
434            if !is_global {
435                slots[0] = Slot {
436                    weak: Rc::downgrade(&global),
437                    generation: slots[0].generation.wrapping_add(1),
438                };
439            }
440        }
441        // Mark the global executor as registered so flush_instance
442        // doesn't call this function again on every flush.
443        EXECUTOR.with(|ex| {
444            let mut e = ex.borrow_mut();
445            e.slot_id = 0;
446            e.generation = slots[0].generation;
447            e.registered = true;
448        });
449        let gen = slots[0].generation;
450        (0, gen)
451    })
452}
453
454// ---------------------------------------------------------------------------
455// Executor instance methods (for isolated executors, e.g. SSR)
456// ---------------------------------------------------------------------------
457
458impl Executor {
459    /// Create a new isolated executor, wrapped for shared access.
460    ///
461    /// The returned executor is independent of the global thread-local
462    /// executor.  Use [`with_executor`] to make it the current executor
463    /// for the duration of a closure, so that spawned tasks and signal
464    /// callbacks are routed to it.
465    #[must_use]
466    pub fn new_instance() -> Rc<RefCell<Executor>> {
467        let ex = Rc::new(RefCell::new(Executor::new()));
468        // Register in the slot table so TaskWaker can find this executor.
469        let (slot_id, generation) = register_executor(Rc::downgrade(&ex));
470        {
471            let mut e = ex.borrow_mut();
472            e.slot_id = slot_id;
473            e.generation = generation;
474            e.registered = true;
475        }
476        ex
477    }
478
479    /// Install a flush scheduler on this executor instance.
480    pub fn install_flush_scheduler(ex: &Rc<RefCell<Executor>>, sched: Rc<dyn ScheduleFlush>) {
481        ex.borrow_mut().flush_scheduler = Some(sched);
482    }
483
484    /// Install a time source on this executor instance.
485    pub fn install_time_source(ex: &Rc<RefCell<Executor>>, ts: Rc<dyn TimeSource>) {
486        ex.borrow_mut().time_source = Some(ts);
487    }
488
489    /// Set the maximum time (in milliseconds) a single flush may spend
490    /// before yielding back to the host event loop.
491    ///
492    /// The default is 8 ms.  Set to `u64::MAX` to disable time-budget
493    /// yielding (flush runs to completion).
494    pub fn set_time_budget(ex: &Rc<RefCell<Executor>>, budget_ms: u64) {
495        ex.borrow_mut().time_budget_ms = budget_ms;
496    }
497
498    /// Register a callback invoked whenever a spawned task panics.
499    ///
500    /// The default is no hook — panicking tasks are silently removed
501    /// from the executor (the same behaviour as a task returning
502    /// `Poll::Ready(())`).
503    ///
504    /// # Example
505    ///
506    /// ```rust,ignore
507    /// Executor::set_panic_hook(&ex, Rc::new(|info| {
508    ///     eprintln!("task {} in scope {} panicked", info.task_id, info.scope_id);
509    /// }));
510    /// ```
511    pub fn set_panic_hook(ex: &Rc<RefCell<Executor>>, hook: Rc<dyn Fn(PanicInfo)>) {
512        ex.borrow_mut().panic_hook = Some(hook);
513    }
514
515    /// Register a timer: when `now_ms() >= deadline_ms`, enqueue
516    /// `task_id` so it gets polled on the next flush.
517    pub(crate) fn schedule_timer(ex: &Rc<RefCell<Executor>>, deadline_ms: u64, task_id: TaskId) {
518        let mut e = ex.borrow_mut();
519        e.timers.entry(deadline_ms).or_default().push(task_id);
520        // Set the reverse index so cancel_scope_tasks can find this entry.
521        if let Some(Some(ref mut t)) = e.tasks.get_mut(task_id as usize) {
522            t.timer_deadline = deadline_ms;
523        }
524        // Request a flush so the timer is checked.
525        e.is_flush_scheduled = false;
526        let maybe_sched = e.try_schedule_flush();
527        drop(e);
528        if let Some(sched) = maybe_sched {
529            let ex2 = Rc::clone(ex);
530            sched.schedule(Box::new(move || Self::flush_instance(&ex2)));
531        }
532    }
533
534    /// Spawn a future on this executor instance.
535    pub fn spawn(ex: &Rc<RefCell<Executor>>, future: impl Future<Output = ()> + 'static) {
536        let maybe_sched = {
537            let mut e = ex.borrow_mut();
538            let tid = e.allocate_id();
539            e.tasks[tid as usize] = Some(TaskState {
540                future: Box::pin(future),
541                priority: Priority::Low,
542                scope_id: 0,
543                timer_deadline: 0,
544            });
545            e.enqueue(tid);
546            e.try_schedule_flush()
547        };
548        if let Some(sched) = maybe_sched {
549            let ex2 = Rc::clone(ex);
550            sched.schedule(Box::new(move || Self::flush_instance(&ex2)));
551        }
552    }
553
554    /// Run a full flush cycle on this executor instance.
555    ///
556    /// Mirrors the global flush cycle but operates on an
557    /// isolated executor (used for SSR).  Includes all the same
558    /// protections: `catch_unwind`, suspend checks, time-budget
559    /// yielding, and callback-drain budget.
560    #[allow(clippy::too_many_lines)]
561    pub fn flush_instance(ex: &Rc<RefCell<Executor>>) {
562        // Guard against re-entrant flushes.
563        {
564            let mut e = ex.borrow_mut();
565            if e.in_flush {
566                #[cfg(debug_assertions)]
567                {
568                    eprintln!(
569                        "[auralis-task] WARNING: Executor::flush_instance called \
570                         re-entrantly (already inside a flush). This is a no-op. \
571                         Check for nested flush() calls in signal callbacks or \
572                         ScheduleFlush implementations."
573                    );
574                }
575                return;
576            }
577            e.in_flush = true;
578        }
579
580        // Set this executor as the current one so that TaskWaker
581        // (which cannot hold an Rc) can discover it via thread-local.
582        // Restore on scope exit (including early returns for time-budget
583        // yielding and re-entrancy).
584        let prev_executor = CURRENT_EXECUTOR.with(|c| c.borrow_mut().replace(Rc::clone(ex)));
585        let _restore = RestoreExecutor(prev_executor);
586
587        // Step 0: drain expired timers.
588        {
589            let mut e = ex.borrow_mut();
590            let now = e.now_ms();
591            // When no TimeSource is registered (now == 0), expire all
592            // timers — they've already been woken via wake_by_ref and
593            // just need to be re-polled.
594            if now == 0 {
595                for (_, tasks) in std::mem::take(&mut e.timers) {
596                    for tid in tasks {
597                        // Clear the reverse index since the timer has fired.
598                        if let Some(Some(ref mut t)) = e.tasks.get_mut(tid as usize) {
599                            t.timer_deadline = 0;
600                        }
601                        e.enqueue(tid);
602                    }
603                }
604            } else {
605                let expired: Vec<u64> =
606                    e.timers.keys().copied().take_while(|&d| d <= now).collect();
607                for deadline in expired {
608                    if let Some(tasks) = e.timers.remove(&deadline) {
609                        for tid in tasks {
610                            if let Some(Some(ref mut t)) = e.tasks.get_mut(tid as usize) {
611                                t.timer_deadline = 0;
612                            }
613                            e.enqueue(tid);
614                        }
615                    }
616                }
617            }
618        }
619
620        // Step 1: deferred ops.
621        let deferred = std::mem::take(&mut ex.borrow_mut().deferred_ops);
622        for op in deferred {
623            (op.f)();
624        }
625
626        // Step 2: drain deferred signal callbacks with time budget.
627        {
628            let cb_start = ex.borrow().now_ms();
629            loop {
630                let callbacks = std::mem::take(&mut ex.borrow_mut().deferred_callbacks);
631                if callbacks.is_empty() {
632                    break;
633                }
634                for cb in callbacks {
635                    // Isolate each callback so a panic in one subscriber
636                    // doesn't block the remaining notifications or wedge
637                    // the executor (in_flush stays true on unwind).
638                    let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(cb));
639                }
640                if ex.borrow().now_ms().saturating_sub(cb_start) >= ex.borrow().time_budget_ms {
641                    if !ex.borrow().deferred_callbacks.is_empty() {
642                        let (sched, ex2) = {
643                            let mut e = ex.borrow_mut();
644                            e.in_flush = false;
645                            e.is_flush_scheduled = false;
646                            (e.try_schedule_flush(), Rc::clone(ex))
647                        };
648                        if let Some(sched) = sched {
649                            sched.schedule(Box::new(move || Self::flush_instance(&ex2)));
650                        }
651                        return;
652                    }
653                    break;
654                }
655            }
656        }
657
658        // Step 3: main poll loop with time-budget check.
659        let poll_start = ex.borrow().now_ms();
660        loop {
661            let task_id = ex.borrow_mut().dequeue();
662            let Some(tid) = task_id else {
663                let mut e = ex.borrow_mut();
664                e.is_flush_scheduled = false;
665                e.in_flush = false;
666                break;
667            };
668
669            // Take the task out so the poll doesn't hold an executor borrow.
670            let maybe_state = ex.borrow_mut().tasks[tid as usize].take();
671            if let Some(mut state) = maybe_state {
672                let priority = state.priority;
673                let scope_id = state.scope_id;
674
675                // Check if the owning scope is suspended.
676                let scope = crate::scope::find_scope(scope_id);
677                if let Some(ref s) = scope {
678                    if s.is_suspended() {
679                        let mut e = ex.borrow_mut();
680                        if e.tasks[tid as usize].is_none() {
681                            e.tasks[tid as usize] = Some(state);
682                        }
683                        continue;
684                    }
685                }
686
687                // Ensure the executor is registered in the slot table.
688                // Must not call ensure_global_registered while holding
689                // a borrow on ex (it borrows the global EXECUTOR).
690                let (slot_id, gen) = {
691                    let e = ex.borrow();
692                    if e.registered {
693                        (e.slot_id, e.generation)
694                    } else {
695                        drop(e);
696                        ensure_global_registered()
697                    }
698                };
699                let waker = Waker::from(Arc::new(TaskWaker {
700                    task_id: tid,
701                    priority,
702                    slot_id,
703                    generation: gen,
704                }));
705                let mut cx = Context::from_waker(&waker);
706
707                // Inject owning scope.
708                let prev_scope = crate::scope::get_scope_direct();
709                if scope.is_some() {
710                    crate::scope::set_scope_direct(scope);
711                }
712
713                // Let futures discover their task id (used by timer::sleep).
714                CURRENT_POLLING_TASK.with(|c| c.set(Some(tid)));
715
716                // Task isolation — prevents a panicking task from
717                // unwinding through flush and leaving in_flush set.
718                let result: Result<Poll<()>, Box<dyn std::any::Any + Send>> =
719                    std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
720                        state.future.as_mut().poll(&mut cx)
721                    }));
722
723                CURRENT_POLLING_TASK.with(|c| c.set(None));
724                crate::scope::set_scope_direct(prev_scope);
725
726                // Extract timer_deadline before state is dropped, so
727                // we can clean up the timer entry (free_slot can't
728                // read it because the slot is already None).
729                let timer_dl = state.timer_deadline;
730
731                match result {
732                    Ok(Poll::Ready(())) => {
733                        if timer_dl != 0 {
734                            ex.borrow_mut().cleanup_timer(tid, timer_dl);
735                        }
736                        ex.borrow_mut().free_slot(tid);
737                    }
738                    Err(payload) => {
739                        if timer_dl != 0 {
740                            ex.borrow_mut().cleanup_timer(tid, timer_dl);
741                        }
742                        // Notify the panic hook (if any) before freeing the slot.
743                        let hook = ex.borrow().panic_hook.clone();
744                        if let Some(h) = hook {
745                            h(PanicInfo {
746                                task_id: tid,
747                                scope_id,
748                                payload,
749                            });
750                        }
751                        ex.borrow_mut().free_slot(tid);
752                    }
753                    Ok(Poll::Pending) => {
754                        let mut e = ex.borrow_mut();
755                        if e.tasks[tid as usize].is_none() {
756                            e.tasks[tid as usize] = Some(state);
757                        }
758                    }
759                }
760            }
761
762            // Time budget check.
763            {
764                let elapsed = ex.borrow().now_ms().saturating_sub(poll_start);
765                if elapsed >= ex.borrow().time_budget_ms {
766                    let (maybe_sched, ex_clone) = {
767                        let mut e = ex.borrow_mut();
768                        e.is_flush_scheduled = false;
769                        e.in_flush = false;
770                        let sched = if !e.high_queue.is_empty() || !e.low_queue.is_empty() {
771                            e.try_schedule_flush()
772                        } else {
773                            None
774                        };
775                        (sched, Rc::clone(ex))
776                    };
777                    if let Some(sched) = maybe_sched {
778                        sched.schedule(Box::new(move || Self::flush_instance(&ex_clone)));
779                    }
780                    break;
781                }
782            }
783        }
784
785        // Drain any wakes that were buffered while the executor RefCell
786        // was borrowed (PENDING_WAKES fallback in TaskWaker::wake).
787        drain_pending_wakes();
788    }
789}
790
791// ---------------------------------------------------------------------------
792// Current-executor storage — injectable, defaults to thread-local
793// ---------------------------------------------------------------------------
794
/// Shared, single-threaded handle to an [`Executor`].
pub(crate) type ExecutorRef = Rc<RefCell<Executor>>;
796
797/// RAII guard that restores the previous executor when dropped.
798struct RestoreExecutor(Option<ExecutorRef>);
799
800impl Drop for RestoreExecutor {
801    fn drop(&mut self) {
802        CURRENT_EXECUTOR.with(|c| {
803            *c.borrow_mut() = self.0.take();
804        });
805    }
806}
807
// Per-thread override slot for the "current" executor.  `None` means no
// override is active; readers in this module then fall back to the global
// thread-local `EXECUTOR`.
thread_local! {
    static CURRENT_EXECUTOR: RefCell<Option<ExecutorRef>> = const { RefCell::new(None) };
}
811
812/// Run `f` with `ex` set as the current executor.
813///
814/// Signal callbacks and `spawn_global` calls inside `f` will be routed
815/// to `ex` instead of the global thread-local executor.  Restores the
816/// previous executor afterward.
817///
818/// # Signal routing constraints
819///
820/// Auralis uses a **single global schedule hook** (installed once by the
821/// first call to [`init_flush_scheduler`]) that decides where signal
822/// notifications land by checking the current executor **at the time the
823/// notification fires**, not at the time `Signal::set` is called.
824///
825/// This design implies two hard requirements for multi-instance users:
826///
827/// 1. **`init_flush_scheduler` must be called at least once** — without
828///    it, `Signal::set` falls back to synchronous callback execution,
829///    which breaks the deferred-notification model and can cause
830///    re-entrant borrow panics.
831/// 2. **The instance executor must still be "current" when the flush
832///    runs** — if `with_executor` has already exited, deferred callbacks
833///    from signals set inside `f` will be routed to the global executor
834///    (or synchronously if no global hook is installed).
835///
836/// For the typical single-threaded case (Wasm, game loop, CLI), both
837/// requirements are satisfied trivially: call `init_flush_scheduler`
838/// once at startup and never use `with_executor`.  For SSR / multi-tenant
839/// servers, ensure that `with_executor` wraps the entire request
840/// lifecycle — from signal creation through the final flush.
841///
842/// # Example
843///
844/// ```rust,ignore
845/// use auralis_task::Executor;
846///
847/// let ex = Executor::new_instance();
848/// Executor::install_flush_scheduler(&ex, my_scheduler);
849/// auralis_task::with_executor(&ex, || {
850///     // Signal notifications and task spawns here go to `ex`.
851/// });
852/// ```
853pub fn with_executor<R>(ex: &ExecutorRef, f: impl FnOnce() -> R) -> R {
854    CURRENT_EXECUTOR.with(|exec| {
855        let prev = exec.borrow_mut().replace(Rc::clone(ex));
856        let result = f();
857        *exec.borrow_mut() = prev;
858        result
859    })
860}
861
862/// Return the current executor, if any.
863///
864/// If no executor has been set via [`with_executor`], returns `None` —
865/// callers should fall back to the global thread-local executor.
866fn current_executor() -> Option<ExecutorRef> {
867    CURRENT_EXECUTOR.with(|exec| exec.borrow().clone())
868}
869
870/// Return the currently active executor instance.
871///
872/// If [`with_executor`] was used to set an instance executor, returns
873/// that; otherwise returns the global thread-local executor.
874pub(crate) fn current_executor_instance() -> ExecutorRef {
875    current_executor().unwrap_or_else(|| EXECUTOR.with(Rc::clone))
876}
877
878/// Return the current time in milliseconds from the active executor's
879/// [`TimeSource`], or 0 if none is installed.
880pub(crate) fn current_time_ms() -> u64 {
881    current_executor_instance().borrow().now_ms()
882}
883
884// ---------------------------------------------------------------------------
885// Helpers — use thread_local EXECUTOR
886// ---------------------------------------------------------------------------
887
888/// Drain wakes that were buffered into [`PENDING_WAKES`] because the
889/// executor's `RefCell` was borrowed at the time [`TaskWaker::wake`]
890/// fired.  Called at the end of every [`Executor::flush_instance`].
891fn drain_pending_wakes() {
892    PENDING_WAKES.with(|pw| {
893        let wakes = std::mem::take(&mut *pw.borrow_mut());
894        for (tid, slot_id, gen) in wakes {
895            let Some(exec) = lookup_executor(slot_id, gen) else {
896                continue;
897            };
898            // Use enqueue() for the stale-task-id safety check.
899            exec.borrow_mut().enqueue(tid);
900            let maybe_sched = exec.borrow_mut().try_schedule_flush();
901            if let Some(sched) = maybe_sched {
902                let sid = slot_id;
903                let g = gen;
904                sched.schedule(Box::new(move || {
905                    if let Some(ex) = lookup_executor(sid, g) {
906                        Executor::flush_instance(&ex);
907                    }
908                }));
909            }
910        }
911    });
912}
913
914// ---------------------------------------------------------------------------
915// Flush
916// ---------------------------------------------------------------------------
917
918fn flush() {
919    EXECUTOR.with(Executor::flush_instance);
920}
921
922// ---------------------------------------------------------------------------
923// Public API
924// ---------------------------------------------------------------------------
925
926/// Set the platform flush scheduler and install the signal deferred-
927/// callback hook (idempotent — subsequent calls are no-ops for the hook).
928pub fn init_flush_scheduler(sched: Rc<dyn ScheduleFlush>) {
929    EXECUTOR.with(|exec| exec.borrow_mut().flush_scheduler = Some(sched));
930    install_signal_hook_once();
931}
932
933/// Install the hook that bridges `auralis_signal::Signal::set` to the
934/// executor's deferred-callback queue.
935///
936/// Idempotent — safe to call multiple times.
937fn install_signal_hook_once() {
938    use std::sync::OnceLock;
939    static INSTALLED: OnceLock<()> = OnceLock::new();
940    INSTALLED.get_or_init(|| {
941        auralis_signal::install_schedule_hook(Box::new(|cb: Box<dyn FnOnce()>| {
942            // Prefer the current executor (set via `with_executor`) for
943            // SSR multi-request isolation; fall back to the global one.
944            if let Some(ex) = current_executor() {
945                let maybe_sched = {
946                    let mut e = ex.borrow_mut();
947                    e.deferred_callbacks.push(cb);
948                    if e.in_flush {
949                        None
950                    } else {
951                        e.try_schedule_flush()
952                    }
953                };
954                if let Some(sched) = maybe_sched {
955                    let ex2 = Rc::clone(&ex);
956                    sched.schedule(Box::new(move || Executor::flush_instance(&ex2)));
957                }
958            } else {
959                EXECUTOR.with(|exec| {
960                    let maybe_sched = {
961                        let mut ex = exec.borrow_mut();
962                        ex.deferred_callbacks.push(cb);
963                        if ex.in_flush {
964                            None
965                        } else {
966                            ex.try_schedule_flush()
967                        }
968                    };
969                    if let Some(sched) = maybe_sched {
970                        sched.schedule(Box::new(flush));
971                    }
972                });
973            }
974        }));
975    });
976}
977
978/// Set the platform time source used for time-budget accounting.
979///
980/// If no [`TimeSource`] is registered the executor runs every flush to
981/// completion without yielding, which is acceptable for short-running
982/// workloads but may cause frame drops in the browser.
983pub fn init_time_source(ts: Rc<dyn TimeSource>) {
984    EXECUTOR.with(|exec| exec.borrow_mut().time_source = Some(ts));
985}
986
987/// Set the per-flush time budget on the global executor.
988///
989/// See [`Executor::set_time_budget`] for details.
990pub fn set_global_time_budget(budget_ms: u64) {
991    EXECUTOR.with(|exec| exec.borrow_mut().time_budget_ms = budget_ms);
992}
993
994/// Register a global panic hook called when any globally-spawned
995/// task panics.
996///
997/// See [`Executor::set_panic_hook`] for details.
998pub fn set_panic_hook(hook: Rc<dyn Fn(PanicInfo)>) {
999    EXECUTOR.with(|exec| exec.borrow_mut().panic_hook = Some(hook));
1000}
1001
1002/// Remove the global panic hook, restoring the default silent
1003/// behaviour.
1004pub fn remove_panic_hook() {
1005    EXECUTOR.with(|exec| exec.borrow_mut().panic_hook = None);
1006}
1007
1008/// Spawn a future on the global executor at low priority.
1009pub fn spawn_global(future: impl Future<Output = ()> + 'static) {
1010    spawn_global_with_priority(Priority::Low, future);
1011}
1012
1013/// Spawn a future on the global executor at the given priority.
1014pub fn spawn_global_with_priority(priority: Priority, future: impl Future<Output = ()> + 'static) {
1015    spawn_inner_on(&EXECUTOR.with(Rc::clone), Box::pin(future), priority, 0);
1016}
1017
1018/// Spawn a future on a specific executor and scope.
1019pub(crate) fn spawn_scoped_on(
1020    ex: &Rc<RefCell<Executor>>,
1021    priority: Priority,
1022    scope_id: u64,
1023    future: impl Future<Output = ()> + 'static,
1024) -> TaskId {
1025    spawn_inner_on(ex, Box::pin(future), priority, scope_id)
1026}
1027
1028fn spawn_inner_on(
1029    ex: &Rc<RefCell<Executor>>,
1030    future: Pin<Box<dyn Future<Output = ()> + 'static>>,
1031    priority: Priority,
1032    scope_id: u64,
1033) -> TaskId {
1034    let (task_id, maybe_sched) = {
1035        let mut e = ex.borrow_mut();
1036        let task_id = e.allocate_id();
1037        e.tasks[task_id as usize] = Some(TaskState {
1038            future,
1039            priority,
1040            scope_id,
1041            timer_deadline: 0,
1042        });
1043        e.enqueue(task_id);
1044        let sched = e.try_schedule_flush();
1045        (task_id, sched)
1046    };
1047    if let Some(sched) = maybe_sched {
1048        let ex2 = Rc::clone(ex);
1049        sched.schedule(Box::new(move || Executor::flush_instance(&ex2)));
1050    }
1051    task_id
1052}
1053
1054/// Enqueue all tasks belonging to `scope_id` on a given executor.
1055///
1056/// Used by [`TaskScope::resume`] to restart tasks after a suspend.
1057pub(crate) fn enqueue_scope_tasks_on(ex: &ExecutorRef, scope_id: u64) {
1058    let task_ids: Vec<TaskId> = {
1059        let e = ex.borrow();
1060        e.tasks
1061            .iter()
1062            .enumerate()
1063            .filter(|(_, slot)| slot.as_ref().is_some_and(|t| t.scope_id == scope_id))
1064            .map(|(idx, _)| idx as TaskId)
1065            .collect()
1066    };
1067    let maybe_sched = {
1068        let mut e = ex.borrow_mut();
1069        for tid in &task_ids {
1070            e.enqueue(*tid);
1071        }
1072        if e.in_flush {
1073            None
1074        } else {
1075            e.try_schedule_flush()
1076        }
1077    };
1078    if let Some(sched) = maybe_sched {
1079        let ex2 = Rc::clone(ex);
1080        sched.schedule(Box::new(move || Executor::flush_instance(&ex2)));
1081    }
1082}
1083/// Cancel all tasks belonging to `scope_id` on a specific executor.
1084pub(crate) fn cancel_scope_tasks_on(
1085    ex: &Rc<RefCell<Executor>>,
1086    scope_id: u64,
1087) -> Vec<Pin<Box<dyn Future<Output = ()>>>> {
1088    let mut e = ex.borrow_mut();
1089    let mut dropped = Vec::new();
1090
1091    // Collect timer deadlines for scope tasks before mutating timers.
1092    let mut timer_deadlines: Vec<(u64, TaskId)> = Vec::new();
1093    for (tid, slot) in e.tasks.iter().enumerate() {
1094        if let Some(ref t) = slot {
1095            if t.scope_id == scope_id && t.timer_deadline != 0 {
1096                timer_deadlines.push((t.timer_deadline, tid as TaskId));
1097            }
1098        }
1099    }
1100    for (dl, tid) in &timer_deadlines {
1101        e.cleanup_timer(*tid, *dl);
1102    }
1103
1104    for slot in &mut e.tasks {
1105        if let Some(ref t) = slot {
1106            if t.scope_id == scope_id {
1107                if let Some(state) = slot.take() {
1108                    dropped.push(state.future);
1109                }
1110            }
1111        }
1112    }
1113
1114    // Filter queues.
1115    let high: Vec<TaskId> = e
1116        .high_queue
1117        .iter()
1118        .filter(|id| e.tasks[**id as usize].is_some())
1119        .copied()
1120        .collect();
1121    e.high_queue.clear();
1122    e.high_queue.extend(high);
1123
1124    let low: Vec<TaskId> = e
1125        .low_queue
1126        .iter()
1127        .filter(|id| e.tasks[**id as usize].is_some())
1128        .copied()
1129        .collect();
1130    e.low_queue.clear();
1131    e.low_queue.extend(low);
1132
1133    let mut all_free: Vec<TaskId> = e
1134        .tasks
1135        .iter()
1136        .enumerate()
1137        .filter(|(_, s)| s.is_none())
1138        .map(|(i, _)| i as TaskId)
1139        .chain(e.free_slots.iter().copied())
1140        .collect();
1141    all_free.sort_unstable();
1142    all_free.dedup();
1143    e.free_slots = all_free;
1144
1145    dropped
1146}
1147
1148// ---------------------------------------------------------------------------
1149// yield_now
1150// ---------------------------------------------------------------------------
1151
/// Create a [`Future`] that hands control back to the executor exactly once
/// before completing.
#[must_use = "yield_now() does nothing unless awaited"]
pub fn yield_now() -> YieldNow {
    YieldNow { yielded: false }
}

/// Future returned by [`yield_now`].
///
/// The first poll wakes the task's own waker and reports `Pending`; every
/// later poll reports `Ready`.
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct YieldNow {
    /// `true` once the single yield has happened.
    yielded: bool,
}

impl Future for YieldNow {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Mark the yield as done and learn whether it had already happened.
        let already_yielded = std::mem::replace(&mut self.yielded, true);
        if already_yielded {
            return Poll::Ready(());
        }
        // Wake ourselves right away so the executor queues the task again.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}
1178
1179// ---------------------------------------------------------------------------
1180// schedule_callback — hook for auralis-signal's deferred callback model
1181// ---------------------------------------------------------------------------
1182
1183/// Schedule a closure to run at the start of the next executor flush.
1184///
1185/// Used internally by `auralis_signal` to defer subscriber callback
1186/// execution.  The closure is drained before the main poll loop.
1187///
1188/// Routes to the current executor (via [`with_executor`]) when one is
1189/// active; falls back to the global thread-local executor.
1190pub fn schedule_callback(f: Box<dyn FnOnce()>) {
1191    let exec = current_executor_instance();
1192    let maybe_sched = {
1193        let mut ex = exec.borrow_mut();
1194        ex.deferred_callbacks.push(f);
1195        if ex.in_flush {
1196            None
1197        } else {
1198            ex.try_schedule_flush()
1199        }
1200    };
1201    if let Some(sched) = maybe_sched {
1202        let ex2 = Rc::clone(&exec);
1203        sched.schedule(Box::new(move || Executor::flush_instance(&ex2)));
1204    }
1205}
1206
1207// ---------------------------------------------------------------------------
1208// set_deferred
1209// ---------------------------------------------------------------------------
1210
1211/// Schedule a [`Signal::set`] call for the **next** executor flush.
1212///
1213/// Safe to call from inside [`Drop`] — the actual `signal.set(value)` is
1214/// deferred to a subsequent flush, avoiding re-entrant borrow panics.
1215///
1216/// Routes to the current executor (via [`with_executor`]) when one is
1217/// active; falls back to the global thread-local executor.
1218pub fn set_deferred<T: 'static>(signal: &Signal<T>, value: T) {
1219    let signal = signal.clone();
1220    let exec = current_executor_instance();
1221    let maybe_sched = {
1222        let mut ex = exec.borrow_mut();
1223        ex.deferred_ops.push(DeferredOp {
1224            f: Box::new(move || signal.set(value)),
1225        });
1226        ex.try_schedule_flush()
1227    };
1228    if let Some(sched) = maybe_sched {
1229        let ex2 = Rc::clone(&exec);
1230        sched.schedule(Box::new(move || Executor::flush_instance(&ex2)));
1231    }
1232}
1233
1234// ---------------------------------------------------------------------------
1235// Test / debug helpers
1236// ---------------------------------------------------------------------------
1237
1238/// Completely reset the global executor to a pristine state.
1239///
1240/// Clears all task slots, queues, deferred ops, flush/scheduler flags,
1241/// and injected [`ScheduleFlush`]/[`TimeSource`].  Call at the start
1242/// of every test to prevent cross-test state leakage.
1243///
1244/// # Safety / usage
1245///
1246/// This function is intended **only** for testing.  Calling it while
1247/// the executor is processing tasks will silently drop all live
1248/// futures and may cause panics or undefined behavior in running
1249/// application code.
1250pub fn reset_executor_for_test() {
1251    PENDING_WAKES.with(|pw| pw.borrow_mut().clear());
1252    SLOTS.with(|s| s.borrow_mut().clear());
1253    CURRENT_EXECUTOR.with(|c| *c.borrow_mut() = None);
1254    EXECUTOR.with(|exec| {
1255        let mut ex = exec.borrow_mut();
1256        ex.high_queue.clear();
1257        ex.low_queue.clear();
1258        ex.tasks.clear();
1259        ex.free_slots.clear();
1260        ex.next_task_id = 0;
1261        ex.is_flush_scheduled = false;
1262        ex.in_flush = false;
1263        ex.deferred_ops.clear();
1264        ex.deferred_callbacks.clear();
1265        ex.flush_scheduler = None;
1266        ex.time_source = None;
1267        ex.slot_id = 0;
1268        ex.generation = 0;
1269        ex.registered = false;
1270    });
1271    crate::scope::clear_scope_registry();
1272}
1273
/// Count the occupied task slots of the global executor.
#[cfg(test)]
pub(crate) fn debug_task_count() -> usize {
    EXECUTOR.with(|global| {
        let exec = global.borrow();
        exec.tasks.iter().flatten().count()
    })
}
1278
/// Snapshot every occupied task slot of the global executor as
/// `(task_id, priority, scope_id)`, in slot order.
#[cfg(feature = "debug")]
pub(crate) fn debug_task_snapshot() -> Vec<(TaskId, Priority, u64)> {
    EXECUTOR.with(|global| {
        let exec = global.borrow();
        exec.tasks
            .iter()
            .enumerate()
            .filter_map(|(idx, slot)| {
                slot.as_ref()
                    .map(|task| (idx as u64, task.priority, task.scope_id))
            })
            .collect::<Vec<_>>()
    })
}
1293
/// List the task ids waiting in either ready queue of the global executor,
/// sorted ascending with duplicates removed.
#[cfg(feature = "debug")]
pub(crate) fn debug_queued_task_ids() -> Vec<TaskId> {
    EXECUTOR.with(|global| {
        let exec = global.borrow();
        // High-priority ids first, then low-priority; order is normalised by
        // the sort below.
        let mut queued: Vec<TaskId> = exec.high_queue.iter().copied().collect();
        queued.extend(exec.low_queue.iter().copied());
        queued.sort_unstable();
        queued.dedup();
        queued
    })
}
1310
/// Store and enqueue `future` on the global executor **without** requesting
/// a flush.
///
/// Lets tests batch several spawns and then run them with an explicit
/// flush.  The task gets scope id 0 and no timer.
#[cfg(test)]
pub(crate) fn spawn_no_auto_flush(
    priority: Priority,
    future: impl Future<Output = ()> + 'static,
) -> TaskId {
    EXECUTOR.with(|global| {
        let mut exec = global.borrow_mut();
        let task_id = exec.allocate_id();
        let state = TaskState {
            future: Box::pin(future),
            priority,
            scope_id: 0,
            timer_deadline: 0,
        };
        exec.tasks[task_id as usize] = Some(state);
        exec.enqueue(task_id);
        // Deliberately no `try_schedule_flush()` here.
        task_id
    })
}
1332
/// Trigger one flush cycle of the global executor by hand, for tests that
/// need to control timing.
#[cfg(test)]
pub(crate) fn flush_all() {
    flush()
}