Skip to main content

auralis_task/
executor.rs

1//! Single-threaded executor with priority scheduling, time-budget
2//! awareness, and deferred-signal support.
3//!
4//! ## Architecture
5//!
6//! The executor is stored via a pluggable [`ExecutorStorage`] strategy
7//! (defaulting to a per-thread slot).  Before polling a task the future
8//! is **temporarily removed** so that the poll never holds an executor
9//! borrow — this allows nested spawns, wakes, and `set_deferred` calls
10//! without `RefCell` panics.
11//!
//! The waker carries only a `task_id: u64` and the task's [`Priority`]
//! (no reference to the executor), making it trivially [`Send`] +
//! [`Sync`] for [`Waker::from`].
14
15#![allow(clippy::cast_possible_truncation)]
16
17use std::cell::RefCell;
18use std::collections::VecDeque;
19use std::future::Future;
20use std::pin::Pin;
21use std::rc::Rc;
22use std::sync::Arc;
23use std::task::{Context, Poll, Wake, Waker};
24
25use auralis_signal::Signal;
26
27use crate::Priority;
28
29// ---------------------------------------------------------------------------
30// Types
31// ---------------------------------------------------------------------------
32
33type TaskId = u64;
34
35// ---------------------------------------------------------------------------
36// ScheduleFlush
37// ---------------------------------------------------------------------------
38
/// Platform hook for scheduling a microtask callback.
///
/// The executor hands the implementation a boxed, one-shot closure (its
/// flush entry point).  Every call site in this file invokes `schedule`
/// only after the executor's `RefCell` borrow has been released, so an
/// implementation may run the callback synchronously.
pub trait ScheduleFlush {
    /// Request that `callback` runs at the next microtask boundary.
    fn schedule(&self, callback: Box<dyn FnOnce()>);
}
44
/// Synchronous [`ScheduleFlush`] used by unit tests.
///
/// The callback is invoked immediately from inside `schedule`, so the
/// executor runs to completion without a browser event loop.
#[cfg(test)]
pub struct TestScheduleFlush;

#[cfg(test)]
impl ScheduleFlush for TestScheduleFlush {
    fn schedule(&self, callback: Box<dyn FnOnce()>) {
        // No queueing: "the next microtask boundary" is right now.
        let run_now = callback;
        run_now();
    }
}
58
59// ---------------------------------------------------------------------------
60// TimeSource
61// ---------------------------------------------------------------------------
62
/// High-resolution time source for the executor's time-budget
/// accounting.
///
/// When registered via [`init_time_source`], the executor queries this
/// before and after each task poll to decide whether it should yield
/// control back to the host event loop (default budget: 8 ms).
///
/// In Wasm environments the implementation typically delegates to
/// `performance.now()`.  If no [`TimeSource`] is registered the executor
/// treats the current time as a constant 0, so with the default budget
/// the check never fires and the executor runs tasks until the queues
/// are drained.
pub trait TimeSource {
    /// Return the current time in milliseconds.
    ///
    /// The executor only ever looks at differences between two readings
    /// (computed with `saturating_sub`), so the origin is arbitrary.
    fn now_ms(&self) -> u64;
}
78
/// A [`TimeSource`] whose value is explicitly controlled by the test.
///
/// Use [`set`](TestTimeSource::set) or [`advance`](TestTimeSource::advance)
/// to simulate the passage of time during a flush cycle.
#[cfg(test)]
pub struct TestTimeSource {
    /// Current simulated time in milliseconds.  A `Cell` so the clock can
    /// be moved through a shared reference.
    now: std::cell::Cell<u64>,
}

#[cfg(test)]
impl TestTimeSource {
    /// Create a new [`TestTimeSource`] with the given initial time.
    #[must_use]
    pub fn new(initial_ms: u64) -> Self {
        Self {
            now: std::cell::Cell::new(initial_ms),
        }
    }

    /// Set the current time to `ms` milliseconds.
    pub fn set(&self, ms: u64) {
        self.now.set(ms);
    }

    /// Advance the current time by `ms` milliseconds.
    ///
    /// The clock saturates at `u64::MAX` instead of overflowing, so a test
    /// may advance by an arbitrarily large amount (e.g. `u64::MAX`, to blow
    /// any time budget) without tripping a debug-build overflow panic.
    pub fn advance(&self, ms: u64) {
        self.now.set(self.now.get().saturating_add(ms));
    }
}
108
#[cfg(test)]
impl TimeSource for TestTimeSource {
    /// Report the simulated time last stored by `new`, `set` or `advance`.
    fn now_ms(&self) -> u64 {
        let Self { now } = self;
        now.get()
    }
}
115
116// ---------------------------------------------------------------------------
117// TaskWaker
118// ---------------------------------------------------------------------------
119
120struct TaskWaker {
121    task_id: TaskId,
122    priority: Priority,
123}
124
125impl Wake for TaskWaker {
126    fn wake(self: Arc<Self>) {
127        let maybe_sched = EXECUTOR.with(|exec| {
128            if let Ok(mut ex) = exec.try_borrow_mut() {
129                match self.priority {
130                    Priority::High => ex.high_queue.push_back(self.task_id),
131                    Priority::Low => ex.low_queue.push_back(self.task_id),
132                }
133                // Only schedule a fresh flush if we're NOT already inside
134                // one (the running flush loop will pick up the task).
135                if ex.in_flush {
136                    None
137                } else {
138                    ex.try_schedule_flush()
139                }
140            } else {
141                PENDING_WAKES.with(|pw| {
142                    pw.borrow_mut().push((self.task_id, self.priority));
143                });
144                None
145            }
146        });
147        if let Some(sched) = maybe_sched {
148            sched.schedule(Box::new(flush));
149        }
150    }
151}
152
153// ---------------------------------------------------------------------------
154// TaskState
155// ---------------------------------------------------------------------------
156
/// Everything the executor stores for one spawned task.
struct TaskState {
    /// The task body.  Taken out of its slot while being polled so that
    /// the poll never holds an executor borrow.
    future: Pin<Box<dyn Future<Output = ()> + 'static>>,
    /// Selects the ready queue (high or low) the task is pushed onto.
    priority: Priority,
    /// Id of the owning scope; the global spawn functions pass 0.
    scope_id: u64,
}
162
163// ---------------------------------------------------------------------------
164// Executor
165// ---------------------------------------------------------------------------
166
/// Information about a task panic, passed to the user-registered
/// [`set_panic_hook`].
///
/// Panics are only intercepted on non-`wasm32` targets, where the task
/// poll is wrapped in `catch_unwind`; on `wasm32` no `PanicInfo` is ever
/// produced by this file.
#[derive(Debug)]
pub struct PanicInfo {
    /// The executor-assigned task id.
    pub task_id: u64,
    /// The scope that owned the task (0 for global tasks).
    pub scope_id: u64,
    /// The boxed panic payload.
    pub payload: Box<dyn std::any::Any + Send>,
}
178
/// A single-threaded async task executor with priority queues.
///
/// Each [`Executor`] manages its own task slots, ready queues, and
/// deferred callback buffers.  Use [`Executor::new_instance`] to create
/// an isolated executor (e.g. per SSR request), or use the global
/// thread-local executor via [`spawn_global`](crate::spawn_global).
pub struct Executor {
    /// Ready queue for high-priority tasks; always drained before
    /// `low_queue`.
    high_queue: VecDeque<TaskId>,
    /// Ready queue for low-priority tasks.
    low_queue: VecDeque<TaskId>,
    /// Slot table indexed by task id.  A slot is `None` when it is free
    /// or while its task is temporarily taken out for polling.
    tasks: Vec<Option<TaskState>>,
    /// Ids of slots that may be handed out again by `allocate_id`.
    free_slots: Vec<TaskId>,
    /// Next never-used id; grows in step with `tasks`.
    next_task_id: TaskId,
    /// Set when a flush has been requested, cleared by the flush itself.
    is_flush_scheduled: bool,
    /// `true` while `flush_instance` is running (re-entrancy guard).
    in_flush: bool,
    /// One-shot operations run first in every flush.
    deferred_ops: Vec<DeferredOp>,
    /// Callbacks pushed by `Signal::set` via the schedule hook.
    /// Drained at the start of every flush before polling tasks.
    deferred_callbacks: Vec<Box<dyn FnOnce()>>,
    /// Platform hook used to request a flush; `None` until installed.
    flush_scheduler: Option<Rc<dyn ScheduleFlush>>,
    /// Clock for time-budget accounting; `None` means time reads as 0.
    time_source: Option<Rc<dyn TimeSource>>,
    /// Maximum milliseconds to spend inside a single flush before
    /// yielding back to the host event loop.  Default: 8 ms.
    time_budget_ms: u64,
    /// Optional hook invoked when a spawned task panics.
    panic_hook: Option<Rc<dyn Fn(PanicInfo)>>,
}
205
/// A one-shot operation queued on the executor and run as the first
/// step of the next flush, before deferred signal callbacks.
// NOTE(review): the producer of these ops is not visible in this part of
// the file (presumably `set_deferred`) — confirm.
struct DeferredOp {
    /// The operation to run; consumed when invoked.
    f: Box<dyn FnOnce()>,
}
209
210impl Executor {
211    fn new() -> Self {
212        Self {
213            high_queue: VecDeque::new(),
214            low_queue: VecDeque::new(),
215            tasks: Vec::new(),
216            free_slots: Vec::new(),
217            next_task_id: 0,
218            is_flush_scheduled: false,
219            in_flush: false,
220            deferred_ops: Vec::new(),
221            deferred_callbacks: Vec::new(),
222            flush_scheduler: None,
223            time_source: None,
224            time_budget_ms: 8,
225            panic_hook: None,
226        }
227    }
228
229    fn allocate_id(&mut self) -> TaskId {
230        if let Some(id) = self.free_slots.pop() {
231            return id;
232        }
233        let id = self.next_task_id;
234        self.next_task_id += 1;
235        self.tasks.push(None);
236        id
237    }
238
239    fn free_slot(&mut self, task_id: TaskId) {
240        self.tasks[task_id as usize] = None;
241        self.free_slots.push(task_id);
242    }
243
244    fn enqueue(&mut self, task_id: TaskId) {
245        let priority = match self.tasks.get(task_id as usize).and_then(Option::as_ref) {
246            Some(t) => t.priority,
247            None => return,
248        };
249        match priority {
250            Priority::High => self.high_queue.push_back(task_id),
251            Priority::Low => self.low_queue.push_back(task_id),
252        }
253    }
254
255    fn dequeue(&mut self) -> Option<TaskId> {
256        self.high_queue
257            .pop_front()
258            .or_else(|| self.low_queue.pop_front())
259    }
260
261    /// Mark that a flush is needed and return the scheduler if one is
262    /// registered.  The caller **must** invoke the scheduler **after**
263    /// releasing the executor borrow.
264    fn try_schedule_flush(&mut self) -> Option<Rc<dyn ScheduleFlush>> {
265        if self.is_flush_scheduled {
266            return None;
267        }
268        self.is_flush_scheduled = true;
269        self.flush_scheduler.clone()
270    }
271
272    /// Return the current time in ms, or 0 if no [`TimeSource`] is
273    /// registered.  When this returns 0 the time-budget check is
274    /// effectively a no-op.
275    pub(crate) fn now_ms(&self) -> u64 {
276        self.time_source.as_ref().map_or(0, |ts| ts.now_ms())
277    }
278
279    /// Return the number of currently active (not-yet-completed) tasks.
280    ///
281    /// Used by streaming SSR to determine whether the stream should
282    /// wait for more work or terminate.
283    #[must_use]
284    pub fn active_task_count(&self) -> usize {
285        self.tasks.iter().filter(|t| t.is_some()).count()
286    }
287}
288
289// ---------------------------------------------------------------------------
290// Thread-local globals (default storage)
291// ---------------------------------------------------------------------------
292
thread_local! {
    // The default, per-thread executor used by the free functions in this
    // file (`spawn_global`, `flush`, the global setters, ...).
    static EXECUTOR: Rc<RefCell<Executor>> = Rc::new(RefCell::new(Executor::new()));
    // Wakes that arrived while `EXECUTOR` was mutably borrowed; they are
    // moved onto the ready queues by `drain_pending_wakes`.
    static PENDING_WAKES: RefCell<Vec<(TaskId, Priority)>> = const { RefCell::new(Vec::new()) };
}
297
298// ---------------------------------------------------------------------------
299// Executor instance methods (for isolated executors, e.g. SSR)
300// ---------------------------------------------------------------------------
301
302impl Executor {
303    /// Create a new isolated executor, wrapped for shared access.
304    ///
305    /// The returned executor is independent of the global thread-local
306    /// executor.  Use [`with_executor`] to make it the current executor
307    /// for the duration of a closure, so that spawned tasks and signal
308    /// callbacks are routed to it.
309    #[must_use]
310    pub fn new_instance() -> Rc<RefCell<Executor>> {
311        Rc::new(RefCell::new(Executor::new()))
312    }
313
314    /// Install a flush scheduler on this executor instance.
315    pub fn install_flush_scheduler(ex: &Rc<RefCell<Executor>>, sched: Rc<dyn ScheduleFlush>) {
316        ex.borrow_mut().flush_scheduler = Some(sched);
317    }
318
319    /// Install a time source on this executor instance.
320    pub fn install_time_source(ex: &Rc<RefCell<Executor>>, ts: Rc<dyn TimeSource>) {
321        ex.borrow_mut().time_source = Some(ts);
322    }
323
324    /// Set the maximum time (in milliseconds) a single flush may spend
325    /// before yielding back to the host event loop.
326    ///
327    /// The default is 8 ms.  Set to `u64::MAX` to disable time-budget
328    /// yielding (flush runs to completion).
329    pub fn set_time_budget(ex: &Rc<RefCell<Executor>>, budget_ms: u64) {
330        ex.borrow_mut().time_budget_ms = budget_ms;
331    }
332
333    /// Register a callback invoked whenever a spawned task panics.
334    ///
335    /// The default is no hook — panicking tasks are silently removed
336    /// from the executor (the same behaviour as a task returning
337    /// `Poll::Ready(())`).
338    ///
339    /// # Example
340    ///
341    /// ```rust,ignore
342    /// Executor::set_panic_hook(&ex, Rc::new(|info| {
343    ///     eprintln!("task {} in scope {} panicked", info.task_id, info.scope_id);
344    /// }));
345    /// ```
346    pub fn set_panic_hook(ex: &Rc<RefCell<Executor>>, hook: Rc<dyn Fn(PanicInfo)>) {
347        ex.borrow_mut().panic_hook = Some(hook);
348    }
349
350    /// Spawn a future on this executor instance.
351    pub fn spawn(ex: &Rc<RefCell<Executor>>, future: impl Future<Output = ()> + 'static) {
352        let maybe_sched = {
353            let mut e = ex.borrow_mut();
354            let tid = e.allocate_id();
355            e.tasks[tid as usize] = Some(TaskState {
356                future: Box::pin(future),
357                priority: Priority::Low,
358                scope_id: 0,
359            });
360            e.enqueue(tid);
361            e.try_schedule_flush()
362        };
363        if let Some(sched) = maybe_sched {
364            let ex2 = Rc::clone(ex);
365            sched.schedule(Box::new(move || Self::flush_instance(&ex2)));
366        }
367    }
368
369    /// Run a full flush cycle on this executor instance.
370    ///
371    /// Mirrors the global flush cycle but operates on an
372    /// isolated executor (used for SSR).  Includes all the same
373    /// protections: `catch_unwind`, suspend checks, time-budget
374    /// yielding, and callback-drain budget.
375    #[allow(clippy::too_many_lines)]
376    pub fn flush_instance(ex: &Rc<RefCell<Executor>>) {
377        // Guard against re-entrant flushes.
378        {
379            let mut e = ex.borrow_mut();
380            if e.in_flush {
381                #[cfg(debug_assertions)]
382                {
383                    eprintln!(
384                        "[auralis-task] WARNING: Executor::flush_instance called \
385                         re-entrantly (already inside a flush). This is a no-op. \
386                         Check for nested flush() calls in signal callbacks or \
387                         ScheduleFlush implementations."
388                    );
389                }
390                return;
391            }
392            e.in_flush = true;
393        }
394
395        // Step 1: deferred ops.
396        let deferred = std::mem::take(&mut ex.borrow_mut().deferred_ops);
397        for op in deferred {
398            (op.f)();
399        }
400
401        // Step 2: drain deferred signal callbacks with time budget.
402        {
403            let cb_start = ex.borrow().now_ms();
404            loop {
405                let callbacks = std::mem::take(&mut ex.borrow_mut().deferred_callbacks);
406                if callbacks.is_empty() {
407                    break;
408                }
409                for cb in callbacks {
410                    cb();
411                }
412                if ex.borrow().now_ms().saturating_sub(cb_start) >= ex.borrow().time_budget_ms {
413                    if !ex.borrow().deferred_callbacks.is_empty() {
414                        let (sched, ex2) = {
415                            let mut e = ex.borrow_mut();
416                            e.in_flush = false;
417                            e.is_flush_scheduled = false;
418                            (e.try_schedule_flush(), Rc::clone(ex))
419                        };
420                        if let Some(sched) = sched {
421                            sched.schedule(Box::new(move || Self::flush_instance(&ex2)));
422                        }
423                        return;
424                    }
425                    break;
426                }
427            }
428        }
429
430        // Step 3: main poll loop with time-budget check.
431        let poll_start = ex.borrow().now_ms();
432        loop {
433            let task_id = ex.borrow_mut().dequeue();
434            let Some(tid) = task_id else {
435                let mut e = ex.borrow_mut();
436                e.is_flush_scheduled = false;
437                e.in_flush = false;
438                break;
439            };
440
441            // Take the task out so the poll doesn't hold an executor borrow.
442            let maybe_state = ex.borrow_mut().tasks[tid as usize].take();
443            if let Some(mut state) = maybe_state {
444                let priority = state.priority;
445                let scope_id = state.scope_id;
446
447                // Check if the owning scope is suspended.
448                let scope = crate::scope::find_scope(scope_id);
449                if let Some(ref s) = scope {
450                    if s.is_suspended() {
451                        let mut e = ex.borrow_mut();
452                        if e.tasks[tid as usize].is_none() {
453                            e.tasks[tid as usize] = Some(state);
454                        }
455                        continue;
456                    }
457                }
458
459                let waker = Waker::from(Arc::new(TaskWaker {
460                    task_id: tid,
461                    priority,
462                }));
463                let mut cx = Context::from_waker(&waker);
464
465                // Inject owning scope.
466                let prev_scope = crate::scope::get_scope_direct();
467                if scope.is_some() {
468                    crate::scope::set_scope_direct(scope);
469                }
470
471                // Task isolation (non-Wasm).
472                #[cfg(not(target_arch = "wasm32"))]
473                let result: Result<Poll<()>, Box<dyn std::any::Any + Send>> =
474                    std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
475                        state.future.as_mut().poll(&mut cx)
476                    }));
477                #[cfg(target_arch = "wasm32")]
478                let poll = state.future.as_mut().poll(&mut cx);
479
480                crate::scope::set_scope_direct(prev_scope);
481
482                #[cfg(not(target_arch = "wasm32"))]
483                {
484                    match result {
485                        Ok(Poll::Ready(())) => {
486                            ex.borrow_mut().free_slot(tid);
487                        }
488                        Err(payload) => {
489                            // Notify the panic hook (if any) before freeing the slot.
490                            let hook = ex.borrow().panic_hook.clone();
491                            if let Some(h) = hook {
492                                h(PanicInfo {
493                                    task_id: tid,
494                                    scope_id,
495                                    payload,
496                                });
497                            }
498                            ex.borrow_mut().free_slot(tid);
499                        }
500                        Ok(Poll::Pending) => {
501                            let mut e = ex.borrow_mut();
502                            if e.tasks[tid as usize].is_none() {
503                                e.tasks[tid as usize] = Some(state);
504                            }
505                        }
506                    }
507                }
508                #[cfg(target_arch = "wasm32")]
509                {
510                    match poll {
511                        Poll::Ready(()) => {
512                            ex.borrow_mut().free_slot(tid);
513                        }
514                        Poll::Pending => {
515                            let mut e = ex.borrow_mut();
516                            if e.tasks[tid as usize].is_none() {
517                                e.tasks[tid as usize] = Some(state);
518                            }
519                        }
520                    }
521                }
522            }
523
524            // Time budget check.
525            {
526                let elapsed = ex.borrow().now_ms().saturating_sub(poll_start);
527                if elapsed >= ex.borrow().time_budget_ms {
528                    let (maybe_sched, ex_clone) = {
529                        let mut e = ex.borrow_mut();
530                        e.is_flush_scheduled = false;
531                        e.in_flush = false;
532                        let sched = if !e.high_queue.is_empty() || !e.low_queue.is_empty() {
533                            e.try_schedule_flush()
534                        } else {
535                            None
536                        };
537                        (sched, Rc::clone(ex))
538                    };
539                    if let Some(sched) = maybe_sched {
540                        sched.schedule(Box::new(move || Self::flush_instance(&ex_clone)));
541                    }
542                    break;
543                }
544            }
545        }
546    }
547}
548
549// ---------------------------------------------------------------------------
550// Current-executor storage — injectable, defaults to thread-local
551// ---------------------------------------------------------------------------
552
/// Shared, single-threaded handle to an [`Executor`].
type ExecutorRef = Rc<RefCell<Executor>>;

thread_local! {
    // The executor installed by `with_executor` for the duration of its
    // closure; `None` means "use the thread-local `EXECUTOR`".
    static CURRENT_EXECUTOR: RefCell<Option<ExecutorRef>> = const { RefCell::new(None) };
}
558
559/// Run `f` with `ex` set as the current executor.
560///
561/// Signal callbacks and `spawn_global` calls inside `f` will be routed
562/// to `ex` instead of the global thread-local executor.  Restores the
563/// previous executor afterward.
564///
565/// # Signal routing constraints
566///
567/// Auralis uses a **single global schedule hook** (installed once by the
568/// first call to [`init_flush_scheduler`]) that decides where signal
569/// notifications land by checking the current executor **at the time the
570/// notification fires**, not at the time `Signal::set` is called.
571///
572/// This design implies two hard requirements for multi-instance users:
573///
574/// 1. **`init_flush_scheduler` must be called at least once** — without
575///    it, `Signal::set` falls back to synchronous callback execution,
576///    which breaks the deferred-notification model and can cause
577///    re-entrant borrow panics.
578/// 2. **The instance executor must still be "current" when the flush
579///    runs** — if `with_executor` has already exited, deferred callbacks
580///    from signals set inside `f` will be routed to the global executor
581///    (or synchronously if no global hook is installed).
582///
583/// For the typical single-threaded case (Wasm, game loop, CLI), both
584/// requirements are satisfied trivially: call `init_flush_scheduler`
585/// once at startup and never use `with_executor`.  For SSR / multi-tenant
586/// servers, ensure that `with_executor` wraps the entire request
587/// lifecycle — from signal creation through the final flush.
588///
589/// # Example
590///
591/// ```rust,ignore
592/// use auralis_task::Executor;
593///
594/// let ex = Executor::new_instance();
595/// Executor::install_flush_scheduler(&ex, my_scheduler);
596/// auralis_task::with_executor(&ex, || {
597///     // Signal notifications and task spawns here go to `ex`.
598/// });
599/// ```
600pub fn with_executor<R>(ex: &ExecutorRef, f: impl FnOnce() -> R) -> R {
601    CURRENT_EXECUTOR.with(|exec| {
602        let prev = exec.borrow_mut().replace(Rc::clone(ex));
603        let result = f();
604        *exec.borrow_mut() = prev;
605        result
606    })
607}
608
609/// Return the current executor, if any.
610///
611/// If no executor has been set via [`with_executor`], returns `None` —
612/// callers should fall back to the global thread-local executor.
613fn current_executor() -> Option<ExecutorRef> {
614    CURRENT_EXECUTOR.with(|exec| exec.borrow().clone())
615}
616
617// ---------------------------------------------------------------------------
618// Helpers — use thread_local EXECUTOR
619// ---------------------------------------------------------------------------
620
621fn drain_pending_wakes() {
622    PENDING_WAKES.with(|pw| {
623        let wakes = std::mem::take(&mut *pw.borrow_mut());
624        if wakes.is_empty() {
625            return;
626        }
627        EXECUTOR.with(|exec| {
628            let maybe_sched = {
629                let mut ex = exec.borrow_mut();
630                for (id, priority) in wakes {
631                    match priority {
632                        Priority::High => ex.high_queue.push_back(id),
633                        Priority::Low => ex.low_queue.push_back(id),
634                    }
635                }
636                ex.try_schedule_flush()
637            };
638            if let Some(sched) = maybe_sched {
639                sched.schedule(Box::new(flush));
640            }
641        });
642    });
643}
644
645// ---------------------------------------------------------------------------
646// Flush
647// ---------------------------------------------------------------------------
648
649fn flush() {
650    EXECUTOR.with(Executor::flush_instance);
651    // Drain any wakes that landed in PENDING_WAKES because the executor
652    // RefCell was borrowed during a callback or task poll.
653    drain_pending_wakes();
654}
655
656// ---------------------------------------------------------------------------
657// Public API
658// ---------------------------------------------------------------------------
659
660/// Set the platform flush scheduler and install the signal deferred-
661/// callback hook (idempotent — subsequent calls are no-ops for the hook).
662pub fn init_flush_scheduler(sched: Rc<dyn ScheduleFlush>) {
663    EXECUTOR.with(|exec| exec.borrow_mut().flush_scheduler = Some(sched));
664    install_signal_hook_once();
665}
666
667/// Install the hook that bridges `auralis_signal::Signal::set` to the
668/// executor's deferred-callback queue.
669///
670/// Idempotent — safe to call multiple times.
671fn install_signal_hook_once() {
672    use std::sync::OnceLock;
673    static INSTALLED: OnceLock<()> = OnceLock::new();
674    INSTALLED.get_or_init(|| {
675        auralis_signal::install_schedule_hook(Box::new(|cb: Box<dyn FnOnce()>| {
676            // Prefer the current executor (set via `with_executor`) for
677            // SSR multi-request isolation; fall back to the global one.
678            if let Some(ex) = current_executor() {
679                let maybe_sched = {
680                    let mut e = ex.borrow_mut();
681                    e.deferred_callbacks.push(cb);
682                    if e.in_flush {
683                        None
684                    } else {
685                        e.try_schedule_flush()
686                    }
687                };
688                if let Some(sched) = maybe_sched {
689                    let ex2 = Rc::clone(&ex);
690                    sched.schedule(Box::new(move || Executor::flush_instance(&ex2)));
691                }
692            } else {
693                EXECUTOR.with(|exec| {
694                    let maybe_sched = {
695                        let mut ex = exec.borrow_mut();
696                        ex.deferred_callbacks.push(cb);
697                        if ex.in_flush {
698                            None
699                        } else {
700                            ex.try_schedule_flush()
701                        }
702                    };
703                    if let Some(sched) = maybe_sched {
704                        sched.schedule(Box::new(flush));
705                    }
706                });
707            }
708        }));
709    });
710}
711
712/// Set the platform time source used for time-budget accounting.
713///
714/// If no [`TimeSource`] is registered the executor runs every flush to
715/// completion without yielding, which is acceptable for short-running
716/// workloads but may cause frame drops in the browser.
717pub fn init_time_source(ts: Rc<dyn TimeSource>) {
718    EXECUTOR.with(|exec| exec.borrow_mut().time_source = Some(ts));
719}
720
721/// Set the per-flush time budget on the global executor.
722///
723/// See [`Executor::set_time_budget`] for details.
724pub fn set_global_time_budget(budget_ms: u64) {
725    EXECUTOR.with(|exec| exec.borrow_mut().time_budget_ms = budget_ms);
726}
727
728/// Register a global panic hook called when any globally-spawned
729/// task panics.
730///
731/// See [`Executor::set_panic_hook`] for details.
732pub fn set_panic_hook(hook: Rc<dyn Fn(PanicInfo)>) {
733    EXECUTOR.with(|exec| exec.borrow_mut().panic_hook = Some(hook));
734}
735
736/// Remove the global panic hook, restoring the default silent
737/// behaviour.
738pub fn remove_panic_hook() {
739    EXECUTOR.with(|exec| exec.borrow_mut().panic_hook = None);
740}
741
742/// Spawn a future on the global executor at low priority.
743pub fn spawn_global(future: impl Future<Output = ()> + 'static) {
744    spawn_global_with_priority(Priority::Low, future);
745}
746
747/// Spawn a future on the global executor at the given priority.
748pub fn spawn_global_with_priority(priority: Priority, future: impl Future<Output = ()> + 'static) {
749    spawn_inner(Box::pin(future), priority, 0);
750}
751
752pub(crate) fn spawn_scoped(
753    priority: Priority,
754    scope_id: u64,
755    future: impl Future<Output = ()> + 'static,
756) -> TaskId {
757    spawn_inner(Box::pin(future), priority, scope_id)
758}
759
760fn spawn_inner(
761    future: Pin<Box<dyn Future<Output = ()> + 'static>>,
762    priority: Priority,
763    scope_id: u64,
764) -> TaskId {
765    EXECUTOR.with(|exec| {
766        let (task_id, maybe_sched) = {
767            let mut ex = exec.borrow_mut();
768            let task_id = ex.allocate_id();
769            ex.tasks[task_id as usize] = Some(TaskState {
770                future,
771                priority,
772                scope_id,
773            });
774            ex.enqueue(task_id);
775            let sched = ex.try_schedule_flush();
776            (task_id, sched)
777        };
778        // Schedule outside the borrow.
779        if let Some(sched) = maybe_sched {
780            sched.schedule(Box::new(flush));
781        }
782        task_id
783    })
784}
785
786/// Enqueue all tasks belonging to `scope_id` and trigger a flush.
787///
788/// Used by [`TaskScope::resume`] to restart tasks after a suspend.
789pub(crate) fn enqueue_scope_tasks(scope_id: u64) {
790    EXECUTOR.with(|exec| {
791        let task_ids: Vec<TaskId> = {
792            let ex = exec.borrow();
793            ex.tasks
794                .iter()
795                .enumerate()
796                .filter(|(_, slot)| slot.as_ref().is_some_and(|t| t.scope_id == scope_id))
797                .map(|(idx, _)| idx as TaskId)
798                .collect()
799        };
800        let maybe_sched = {
801            let mut ex = exec.borrow_mut();
802            for tid in task_ids {
803                ex.enqueue(tid);
804            }
805            if ex.in_flush {
806                None
807            } else {
808                ex.try_schedule_flush()
809            }
810        };
811        if let Some(sched) = maybe_sched {
812            sched.schedule(Box::new(flush));
813        }
814    });
815}
816
817pub(crate) fn cancel_scope_tasks(scope_id: u64) -> Vec<Pin<Box<dyn Future<Output = ()>>>> {
818    EXECUTOR.with(|exec| {
819        let mut ex = exec.borrow_mut();
820        let mut dropped = Vec::new();
821
822        for slot in &mut ex.tasks {
823            if let Some(ref t) = slot {
824                if t.scope_id == scope_id {
825                    dropped.push(
826                        slot.take()
827                            .expect("task slot was None after is_some check")
828                            .future,
829                    );
830                }
831            }
832        }
833
834        // Filter queues.
835        let high: Vec<TaskId> = ex
836            .high_queue
837            .iter()
838            .filter(|id| ex.tasks[**id as usize].is_some())
839            .copied()
840            .collect();
841        ex.high_queue.clear();
842        ex.high_queue.extend(high);
843
844        let low: Vec<TaskId> = ex
845            .low_queue
846            .iter()
847            .filter(|id| ex.tasks[**id as usize].is_some())
848            .copied()
849            .collect();
850        ex.low_queue.clear();
851        ex.low_queue.extend(low);
852
853        let mut all_free: Vec<TaskId> = ex
854            .tasks
855            .iter()
856            .enumerate()
857            .filter(|(_, s)| s.is_none())
858            .map(|(i, _)| i as TaskId)
859            .chain(ex.free_slots.iter().copied())
860            .collect();
861        all_free.sort_unstable();
862        all_free.dedup();
863        ex.free_slots = all_free;
864
865        dropped
866    })
867}
868
869// ---------------------------------------------------------------------------
870// yield_now
871// ---------------------------------------------------------------------------
872
/// Create a [`Future`] that hands control back to the executor exactly once.
///
/// The first poll wakes the current task and returns [`Poll::Pending`];
/// every later poll returns [`Poll::Ready`].
#[must_use = "yield_now() does nothing unless awaited"]
pub fn yield_now() -> YieldNow {
    YieldNow { yielded: false }
}

/// Future returned by [`yield_now`].
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct YieldNow {
    /// `true` once the single yield has already happened.
    yielded: bool,
}

impl Future for YieldNow {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Mark the yield as done and learn whether it had happened before.
        let already_yielded = std::mem::replace(&mut self.yielded, true);
        if already_yielded {
            return Poll::Ready(());
        }
        // First poll: ask to be polled again, then step aside.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}
899
900// ---------------------------------------------------------------------------
901// schedule_callback — hook for auralis-signal's deferred callback model
902// ---------------------------------------------------------------------------
903
904/// Schedule a closure to run at the start of the next executor flush.
905///
906/// Used internally by `auralis_signal` to defer subscriber callback
907/// execution.  The closure is drained before the main poll loop.
908pub fn schedule_callback(f: Box<dyn FnOnce()>) {
909    EXECUTOR.with(|exec| {
910        let maybe_sched = {
911            let mut ex = exec.borrow_mut();
912            ex.deferred_callbacks.push(f);
913            if ex.in_flush {
914                None
915            } else {
916                ex.try_schedule_flush()
917            }
918        };
919        if let Some(sched) = maybe_sched {
920            sched.schedule(Box::new(flush));
921        }
922    });
923}
924
925// ---------------------------------------------------------------------------
926// set_deferred
927// ---------------------------------------------------------------------------
928
929/// Schedule a [`Signal::set`] call for the **next** executor flush.
930///
931/// Safe to call from inside [`Drop`] — the actual `signal.set(value)` is
932/// deferred to a subsequent flush, avoiding re-entrant borrow panics.
933pub fn set_deferred<T: 'static>(signal: &Signal<T>, value: T) {
934    let signal = signal.clone();
935    EXECUTOR.with(|exec| {
936        let maybe_sched = {
937            let mut ex = exec.borrow_mut();
938            ex.deferred_ops.push(DeferredOp {
939                f: Box::new(move || signal.set(value)),
940            });
941            ex.try_schedule_flush()
942        };
943        if let Some(sched) = maybe_sched {
944            sched.schedule(Box::new(flush));
945        }
946    });
947}
948
949// ---------------------------------------------------------------------------
950// Test / debug helpers
951// ---------------------------------------------------------------------------
952
953/// Completely reset the global executor to a pristine state.
954///
955/// Clears all task slots, queues, deferred ops, flush/scheduler flags,
956/// and injected [`ScheduleFlush`]/[`TimeSource`].  Call at the start
957/// of every test to prevent cross-test state leakage.
958///
959/// # Safety / usage
960///
961/// This function is intended **only** for testing.  Calling it while
962/// the executor is processing tasks will silently drop all live
963/// futures and may cause panics or undefined behavior in running
964/// application code.
965pub fn reset_executor_for_test() {
966    PENDING_WAKES.with(|pw| pw.borrow_mut().clear());
967    EXECUTOR.with(|exec| {
968        let mut ex = exec.borrow_mut();
969        ex.high_queue.clear();
970        ex.low_queue.clear();
971        ex.tasks.clear();
972        ex.free_slots.clear();
973        ex.next_task_id = 0;
974        ex.is_flush_scheduled = false;
975        ex.in_flush = false;
976        ex.deferred_ops.clear();
977        ex.deferred_callbacks.clear();
978        ex.flush_scheduler = None;
979        ex.time_source = None;
980    });
981    crate::scope::clear_scope_registry();
982}
983
/// Number of occupied task slots in the executor.
#[cfg(test)]
pub(crate) fn debug_task_count() -> usize {
    EXECUTOR.with(|exec| {
        let ex = exec.borrow();
        // Flattening skips the empty (`None`) slots.
        ex.tasks.iter().flatten().count()
    })
}
988
/// Snapshot every occupied task slot as `(task_id, priority, scope_id)`.
///
/// The task id is the slot's index in the task table; entries are listed
/// in ascending id order.
#[cfg(feature = "debug")]
pub(crate) fn debug_task_snapshot() -> Vec<(TaskId, Priority, u64)> {
    EXECUTOR.with(|exec| {
        let ex = exec.borrow();
        ex.tasks
            .iter()
            .enumerate()
            .filter_map(|(idx, slot)| {
                slot.as_ref()
                    .map(|task| (idx as TaskId, task.priority, task.scope_id))
            })
            .collect()
    })
}
1003
/// Return the distinct task IDs currently sitting in the ready queues.
///
/// Ids from the high- and low-priority queues are merged; the result is
/// sorted ascending and contains no duplicates.
#[cfg(feature = "debug")]
pub(crate) fn debug_queued_task_ids() -> Vec<TaskId> {
    EXECUTOR.with(|exec| {
        let ex = exec.borrow();
        // An ordered set sorts and de-duplicates in one step.
        let unique: std::collections::BTreeSet<TaskId> = ex
            .high_queue
            .iter()
            .chain(ex.low_queue.iter())
            .copied()
            .collect();
        unique.into_iter().collect()
    })
}
1020
/// Register and enqueue a task without requesting a flush.
///
/// Used in tests to batch multiple spawns before executing them.  The task
/// is recorded with `scope_id` 0; the id of the new task is returned.
#[cfg(test)]
pub(crate) fn spawn_no_auto_flush(
    priority: Priority,
    future: impl Future<Output = ()> + 'static,
) -> TaskId {
    let state = TaskState {
        future: Box::pin(future),
        priority,
        scope_id: 0,
    };
    EXECUTOR.with(|exec| {
        let mut ex = exec.borrow_mut();
        let id = ex.allocate_id();
        ex.tasks[id as usize] = Some(state);
        ex.enqueue(id);
        // Deliberately no flush request: the test drives execution itself.
        id
    })
}
1041
/// Drive one flush cycle by hand (for tests that need to control timing).
///
/// Thin test-only wrapper around `flush`.
#[cfg(test)]
pub(crate) fn flush_all() {
    flush()
}