Skip to main content

auralis_task/
executor.rs

1//! Single-threaded executor with priority scheduling, time-budget
2//! awareness, and deferred-signal support.
3//!
4//! ## Architecture
5//!
6//! The executor is stored via a pluggable [`ExecutorStorage`] strategy
7//! (defaulting to a per-thread slot).  Before polling a task the future
8//! is **temporarily removed** so that the poll never holds an executor
9//! borrow — this allows nested spawns, wakes, and `set_deferred` calls
10//! without `RefCell` panics.
11//!
12//! The waker carries only a `task_id: u64`, making it trivially
13//! [`Send`] + [`Sync`] for [`Waker::from`].
14
15#![allow(clippy::cast_possible_truncation)]
16
17use std::cell::{Cell, RefCell};
18use std::collections::{BTreeMap, VecDeque};
19use std::future::Future;
20use std::pin::Pin;
21use std::rc::Rc;
22use std::sync::Arc;
23use std::task::{Context, Poll, Wake, Waker};
24
25use auralis_signal::Signal;
26
27use crate::Priority;
28
29// ---------------------------------------------------------------------------
30// Types
31// ---------------------------------------------------------------------------
32
33type TaskId = u64;
34
35// ---------------------------------------------------------------------------
36// ScheduleFlush
37// ---------------------------------------------------------------------------
38
39/// Platform hook for scheduling a microtask callback.
40pub trait ScheduleFlush {
41    /// Request that `callback` runs at the next microtask boundary.
42    fn schedule(&self, callback: Box<dyn FnOnce()>);
43}
44
45/// A [`ScheduleFlush`] that fires the callback synchronously.
46///
47/// Makes the executor run-to-completion in unit tests without a browser
48/// event loop.
49#[cfg(test)]
50pub struct TestScheduleFlush;
51
52#[cfg(test)]
53impl ScheduleFlush for TestScheduleFlush {
54    fn schedule(&self, callback: Box<dyn FnOnce()>) {
55        callback();
56    }
57}
58
59// ---------------------------------------------------------------------------
60// TimeSource
61// ---------------------------------------------------------------------------
62
63/// High-resolution time source for the executor's time-budget
64/// accounting.
65///
66/// When registered via [`init_time_source`], the executor queries this
67/// before and after each task poll to decide whether it should yield
68/// control back to the host event loop (default budget: 8 ms).
69///
70/// In Wasm environments the implementation typically delegates to
71/// `performance.now()`.  If no [`TimeSource`] is registered the time
72/// budget check is a no-op and the executor runs tasks until the
73/// queues are drained.
74pub trait TimeSource {
75    /// Return the current time in milliseconds.
76    fn now_ms(&self) -> u64;
77}
78
79/// A [`TimeSource`] whose value is explicitly controlled by the test.
80///
81/// Use [`set`](TestTimeSource::set) or [`advance`](TestTimeSource::advance)
82/// to simulate the passage of time during a flush cycle.
83#[cfg(test)]
84pub struct TestTimeSource {
85    now: std::cell::Cell<u64>,
86}
87
88#[cfg(test)]
89impl TestTimeSource {
90    /// Create a new [`TestTimeSource`] with the given initial time.
91    #[must_use]
92    pub fn new(initial_ms: u64) -> Self {
93        Self {
94            now: std::cell::Cell::new(initial_ms),
95        }
96    }
97
98    /// Set the current time to `ms` milliseconds.
99    pub fn set(&self, ms: u64) {
100        self.now.set(ms);
101    }
102
103    /// Advance the current time by `ms` milliseconds.
104    pub fn advance(&self, ms: u64) {
105        self.now.set(self.now.get() + ms);
106    }
107}
108
109#[cfg(test)]
110impl TimeSource for TestTimeSource {
111    fn now_ms(&self) -> u64 {
112        self.now.get()
113    }
114}
115
116// ---------------------------------------------------------------------------
117// TaskWaker
118// ---------------------------------------------------------------------------
119
120struct TaskWaker {
121    task_id: TaskId,
122    priority: Priority,
123}
124
125impl Wake for TaskWaker {
126    fn wake(self: Arc<Self>) {
127        let maybe_sched = EXECUTOR.with(|exec| {
128            if let Ok(mut ex) = exec.try_borrow_mut() {
129                match self.priority {
130                    Priority::High => ex.high_queue.push_back(self.task_id),
131                    Priority::Low => ex.low_queue.push_back(self.task_id),
132                }
133                // Only schedule a fresh flush if we're NOT already inside
134                // one (the running flush loop will pick up the task).
135                if ex.in_flush {
136                    None
137                } else {
138                    ex.try_schedule_flush()
139                }
140            } else {
141                PENDING_WAKES.with(|pw| {
142                    pw.borrow_mut().push((self.task_id, self.priority));
143                });
144                None
145            }
146        });
147        if let Some(sched) = maybe_sched {
148            sched.schedule(Box::new(flush));
149        }
150    }
151}
152
153// ---------------------------------------------------------------------------
154// TaskState
155// ---------------------------------------------------------------------------
156
157struct TaskState {
158    future: Pin<Box<dyn Future<Output = ()> + 'static>>,
159    priority: Priority,
160    scope_id: u64,
161}
162
163// ---------------------------------------------------------------------------
164// Executor
165// ---------------------------------------------------------------------------
166
167/// Information about a task panic, passed to the user-registered
168/// [`set_panic_hook`].
169#[derive(Debug)]
170pub struct PanicInfo {
171    /// The executor-assigned task id.
172    pub task_id: u64,
173    /// The scope that owned the task (0 for global tasks).
174    pub scope_id: u64,
175    /// The boxed panic payload.
176    pub payload: Box<dyn std::any::Any + Send>,
177}
178
179/// A single-threaded async task executor with priority queues.
180///
181/// Each [`Executor`] manages its own task slots, ready queues, and
182/// deferred callback buffers.  Use [`Executor::new_instance`] to create
183/// an isolated executor (e.g. per SSR request), or use the global
184/// thread-local executor via [`spawn_global`](crate::spawn_global).
185pub struct Executor {
186    high_queue: VecDeque<TaskId>,
187    low_queue: VecDeque<TaskId>,
188    tasks: Vec<Option<TaskState>>,
189    free_slots: Vec<TaskId>,
190    next_task_id: TaskId,
191    is_flush_scheduled: bool,
192    in_flush: bool,
193    deferred_ops: Vec<DeferredOp>,
194    /// Callbacks pushed by `Signal::set` via the schedule hook.
195    /// Drained at the start of every flush before polling tasks.
196    deferred_callbacks: Vec<Box<dyn FnOnce()>>,
197    flush_scheduler: Option<Rc<dyn ScheduleFlush>>,
198    time_source: Option<Rc<dyn TimeSource>>,
199    /// Maximum milliseconds to spend inside a single flush before
200    /// yielding back to the host event loop.  Default: 8 ms.
201    time_budget_ms: u64,
202    /// Optional hook invoked when a spawned task panics.
203    panic_hook: Option<Rc<dyn Fn(PanicInfo)>>,
204    /// Timer queue: map from deadline (ms) to task ids that should be
205    /// woken when that deadline expires.  Processed at the start of
206    /// every flush.
207    timers: BTreeMap<u64, Vec<TaskId>>,
208}
209
210// Set by the executor before polling a task, cleared afterward.
211// Lets futures discover their task id without threading it through
212// layers of combinators.
213thread_local! {
214    static CURRENT_POLLING_TASK: Cell<Option<TaskId>> = const { Cell::new(None) };
215}
216
217pub(crate) fn with_current_polling_task<R>(f: impl FnOnce(Option<TaskId>) -> R) -> R {
218    CURRENT_POLLING_TASK.with(|c| f(c.get()))
219}
220
221struct DeferredOp {
222    f: Box<dyn FnOnce()>,
223}
224
225impl Executor {
226    fn new() -> Self {
227        Self {
228            high_queue: VecDeque::new(),
229            low_queue: VecDeque::new(),
230            tasks: Vec::new(),
231            free_slots: Vec::new(),
232            next_task_id: 0,
233            is_flush_scheduled: false,
234            in_flush: false,
235            deferred_ops: Vec::new(),
236            deferred_callbacks: Vec::new(),
237            flush_scheduler: None,
238            time_source: None,
239            time_budget_ms: 8,
240            panic_hook: None,
241            timers: BTreeMap::new(),
242        }
243    }
244
245    fn allocate_id(&mut self) -> TaskId {
246        if let Some(id) = self.free_slots.pop() {
247            return id;
248        }
249        let id = self.next_task_id;
250        self.next_task_id += 1;
251        self.tasks.push(None);
252        id
253    }
254
255    fn free_slot(&mut self, task_id: TaskId) {
256        self.tasks[task_id as usize] = None;
257        self.free_slots.push(task_id);
258    }
259
260    fn enqueue(&mut self, task_id: TaskId) {
261        let priority = match self.tasks.get(task_id as usize).and_then(Option::as_ref) {
262            Some(t) => t.priority,
263            None => return,
264        };
265        match priority {
266            Priority::High => self.high_queue.push_back(task_id),
267            Priority::Low => self.low_queue.push_back(task_id),
268        }
269    }
270
271    fn dequeue(&mut self) -> Option<TaskId> {
272        self.high_queue
273            .pop_front()
274            .or_else(|| self.low_queue.pop_front())
275    }
276
277    /// Mark that a flush is needed and return the scheduler if one is
278    /// registered.  The caller **must** invoke the scheduler **after**
279    /// releasing the executor borrow.
280    fn try_schedule_flush(&mut self) -> Option<Rc<dyn ScheduleFlush>> {
281        if self.is_flush_scheduled {
282            return None;
283        }
284        self.is_flush_scheduled = true;
285        self.flush_scheduler.clone()
286    }
287
288    /// Return the current time in ms, or 0 if no [`TimeSource`] is
289    /// registered.  When this returns 0 the time-budget check is
290    /// effectively a no-op.
291    pub(crate) fn now_ms(&self) -> u64 {
292        self.time_source.as_ref().map_or(0, |ts| ts.now_ms())
293    }
294
295    /// Return the number of currently active (not-yet-completed) tasks.
296    ///
297    /// Used by streaming SSR to determine whether the stream should
298    /// wait for more work or terminate.
299    #[must_use]
300    pub fn active_task_count(&self) -> usize {
301        self.tasks.iter().filter(|t| t.is_some()).count()
302    }
303}
304
305// ---------------------------------------------------------------------------
306// Thread-local globals (default storage)
307// ---------------------------------------------------------------------------
308
309thread_local! {
310    static EXECUTOR: Rc<RefCell<Executor>> = Rc::new(RefCell::new(Executor::new()));
311    static PENDING_WAKES: RefCell<Vec<(TaskId, Priority)>> = const { RefCell::new(Vec::new()) };
312}
313
314// ---------------------------------------------------------------------------
315// Executor instance methods (for isolated executors, e.g. SSR)
316// ---------------------------------------------------------------------------
317
318impl Executor {
319    /// Create a new isolated executor, wrapped for shared access.
320    ///
321    /// The returned executor is independent of the global thread-local
322    /// executor.  Use [`with_executor`] to make it the current executor
323    /// for the duration of a closure, so that spawned tasks and signal
324    /// callbacks are routed to it.
325    #[must_use]
326    pub fn new_instance() -> Rc<RefCell<Executor>> {
327        Rc::new(RefCell::new(Executor::new()))
328    }
329
330    /// Install a flush scheduler on this executor instance.
331    pub fn install_flush_scheduler(ex: &Rc<RefCell<Executor>>, sched: Rc<dyn ScheduleFlush>) {
332        ex.borrow_mut().flush_scheduler = Some(sched);
333    }
334
335    /// Install a time source on this executor instance.
336    pub fn install_time_source(ex: &Rc<RefCell<Executor>>, ts: Rc<dyn TimeSource>) {
337        ex.borrow_mut().time_source = Some(ts);
338    }
339
340    /// Set the maximum time (in milliseconds) a single flush may spend
341    /// before yielding back to the host event loop.
342    ///
343    /// The default is 8 ms.  Set to `u64::MAX` to disable time-budget
344    /// yielding (flush runs to completion).
345    pub fn set_time_budget(ex: &Rc<RefCell<Executor>>, budget_ms: u64) {
346        ex.borrow_mut().time_budget_ms = budget_ms;
347    }
348
349    /// Register a callback invoked whenever a spawned task panics.
350    ///
351    /// The default is no hook — panicking tasks are silently removed
352    /// from the executor (the same behaviour as a task returning
353    /// `Poll::Ready(())`).
354    ///
355    /// # Example
356    ///
357    /// ```rust,ignore
358    /// Executor::set_panic_hook(&ex, Rc::new(|info| {
359    ///     eprintln!("task {} in scope {} panicked", info.task_id, info.scope_id);
360    /// }));
361    /// ```
362    pub fn set_panic_hook(ex: &Rc<RefCell<Executor>>, hook: Rc<dyn Fn(PanicInfo)>) {
363        ex.borrow_mut().panic_hook = Some(hook);
364    }
365
366    /// Register a timer: when `now_ms() >= deadline_ms`, enqueue
367    /// `task_id` so it gets polled on the next flush.
368    pub(crate) fn schedule_timer(ex: &Rc<RefCell<Executor>>, deadline_ms: u64, task_id: TaskId) {
369        let mut e = ex.borrow_mut();
370        e.timers.entry(deadline_ms).or_default().push(task_id);
371        // Request a flush so the timer is checked.
372        e.is_flush_scheduled = false;
373        let maybe_sched = e.try_schedule_flush();
374        drop(e);
375        if let Some(sched) = maybe_sched {
376            let ex2 = Rc::clone(ex);
377            sched.schedule(Box::new(move || Self::flush_instance(&ex2)));
378        }
379    }
380
381    /// Spawn a future on this executor instance.
382    pub fn spawn(ex: &Rc<RefCell<Executor>>, future: impl Future<Output = ()> + 'static) {
383        let maybe_sched = {
384            let mut e = ex.borrow_mut();
385            let tid = e.allocate_id();
386            e.tasks[tid as usize] = Some(TaskState {
387                future: Box::pin(future),
388                priority: Priority::Low,
389                scope_id: 0,
390            });
391            e.enqueue(tid);
392            e.try_schedule_flush()
393        };
394        if let Some(sched) = maybe_sched {
395            let ex2 = Rc::clone(ex);
396            sched.schedule(Box::new(move || Self::flush_instance(&ex2)));
397        }
398    }
399
400    /// Run a full flush cycle on this executor instance.
401    ///
402    /// Mirrors the global flush cycle but operates on an
403    /// isolated executor (used for SSR).  Includes all the same
404    /// protections: `catch_unwind`, suspend checks, time-budget
405    /// yielding, and callback-drain budget.
406    #[allow(clippy::too_many_lines)]
407    pub fn flush_instance(ex: &Rc<RefCell<Executor>>) {
408        // Guard against re-entrant flushes.
409        {
410            let mut e = ex.borrow_mut();
411            if e.in_flush {
412                #[cfg(debug_assertions)]
413                {
414                    eprintln!(
415                        "[auralis-task] WARNING: Executor::flush_instance called \
416                         re-entrantly (already inside a flush). This is a no-op. \
417                         Check for nested flush() calls in signal callbacks or \
418                         ScheduleFlush implementations."
419                    );
420                }
421                return;
422            }
423            e.in_flush = true;
424        }
425
426        // Step 0: drain expired timers.
427        {
428            let mut e = ex.borrow_mut();
429            let now = e.now_ms();
430            if now > 0 {
431                let expired: Vec<u64> = e.timers.keys().copied().take_while(|&d| d <= now).collect();
432                for deadline in expired {
433                    if let Some(tasks) = e.timers.remove(&deadline) {
434                        for tid in tasks {
435                            e.enqueue(tid);
436                        }
437                    }
438                }
439            }
440        }
441
442        // Step 1: deferred ops.
443        let deferred = std::mem::take(&mut ex.borrow_mut().deferred_ops);
444        for op in deferred {
445            (op.f)();
446        }
447
448        // Step 2: drain deferred signal callbacks with time budget.
449        {
450            let cb_start = ex.borrow().now_ms();
451            loop {
452                let callbacks = std::mem::take(&mut ex.borrow_mut().deferred_callbacks);
453                if callbacks.is_empty() {
454                    break;
455                }
456                for cb in callbacks {
457                    cb();
458                }
459                if ex.borrow().now_ms().saturating_sub(cb_start) >= ex.borrow().time_budget_ms {
460                    if !ex.borrow().deferred_callbacks.is_empty() {
461                        let (sched, ex2) = {
462                            let mut e = ex.borrow_mut();
463                            e.in_flush = false;
464                            e.is_flush_scheduled = false;
465                            (e.try_schedule_flush(), Rc::clone(ex))
466                        };
467                        if let Some(sched) = sched {
468                            sched.schedule(Box::new(move || Self::flush_instance(&ex2)));
469                        }
470                        return;
471                    }
472                    break;
473                }
474            }
475        }
476
477        // Step 3: main poll loop with time-budget check.
478        let poll_start = ex.borrow().now_ms();
479        loop {
480            let task_id = ex.borrow_mut().dequeue();
481            let Some(tid) = task_id else {
482                let mut e = ex.borrow_mut();
483                e.is_flush_scheduled = false;
484                e.in_flush = false;
485                break;
486            };
487
488            // Take the task out so the poll doesn't hold an executor borrow.
489            let maybe_state = ex.borrow_mut().tasks[tid as usize].take();
490            if let Some(mut state) = maybe_state {
491                let priority = state.priority;
492                let scope_id = state.scope_id;
493
494                // Check if the owning scope is suspended.
495                let scope = crate::scope::find_scope(scope_id);
496                if let Some(ref s) = scope {
497                    if s.is_suspended() {
498                        let mut e = ex.borrow_mut();
499                        if e.tasks[tid as usize].is_none() {
500                            e.tasks[tid as usize] = Some(state);
501                        }
502                        continue;
503                    }
504                }
505
506                let waker = Waker::from(Arc::new(TaskWaker {
507                    task_id: tid,
508                    priority,
509                }));
510                let mut cx = Context::from_waker(&waker);
511
512                // Inject owning scope.
513                let prev_scope = crate::scope::get_scope_direct();
514                if scope.is_some() {
515                    crate::scope::set_scope_direct(scope);
516                }
517
518                // Let futures discover their task id (used by timer::sleep).
519                CURRENT_POLLING_TASK.with(|c| c.set(Some(tid)));
520
521                // Task isolation (non-Wasm).
522                #[cfg(not(target_arch = "wasm32"))]
523                let result: Result<Poll<()>, Box<dyn std::any::Any + Send>> =
524                    std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
525                        state.future.as_mut().poll(&mut cx)
526                    }));
527                #[cfg(target_arch = "wasm32")]
528                let poll = state.future.as_mut().poll(&mut cx);
529
530                CURRENT_POLLING_TASK.with(|c| c.set(None));
531                crate::scope::set_scope_direct(prev_scope);
532
533                #[cfg(not(target_arch = "wasm32"))]
534                {
535                    match result {
536                        Ok(Poll::Ready(())) => {
537                            ex.borrow_mut().free_slot(tid);
538                        }
539                        Err(payload) => {
540                            // Notify the panic hook (if any) before freeing the slot.
541                            let hook = ex.borrow().panic_hook.clone();
542                            if let Some(h) = hook {
543                                h(PanicInfo {
544                                    task_id: tid,
545                                    scope_id,
546                                    payload,
547                                });
548                            }
549                            ex.borrow_mut().free_slot(tid);
550                        }
551                        Ok(Poll::Pending) => {
552                            let mut e = ex.borrow_mut();
553                            if e.tasks[tid as usize].is_none() {
554                                e.tasks[tid as usize] = Some(state);
555                            }
556                        }
557                    }
558                }
559                #[cfg(target_arch = "wasm32")]
560                {
561                    match poll {
562                        Poll::Ready(()) => {
563                            ex.borrow_mut().free_slot(tid);
564                        }
565                        Poll::Pending => {
566                            let mut e = ex.borrow_mut();
567                            if e.tasks[tid as usize].is_none() {
568                                e.tasks[tid as usize] = Some(state);
569                            }
570                        }
571                    }
572                }
573            }
574
575            // Time budget check.
576            {
577                let elapsed = ex.borrow().now_ms().saturating_sub(poll_start);
578                if elapsed >= ex.borrow().time_budget_ms {
579                    let (maybe_sched, ex_clone) = {
580                        let mut e = ex.borrow_mut();
581                        e.is_flush_scheduled = false;
582                        e.in_flush = false;
583                        let sched = if !e.high_queue.is_empty() || !e.low_queue.is_empty() {
584                            e.try_schedule_flush()
585                        } else {
586                            None
587                        };
588                        (sched, Rc::clone(ex))
589                    };
590                    if let Some(sched) = maybe_sched {
591                        sched.schedule(Box::new(move || Self::flush_instance(&ex_clone)));
592                    }
593                    break;
594                }
595            }
596        }
597    }
598}
599
600// ---------------------------------------------------------------------------
601// Current-executor storage — injectable, defaults to thread-local
602// ---------------------------------------------------------------------------
603
604type ExecutorRef = Rc<RefCell<Executor>>;
605
606thread_local! {
607    static CURRENT_EXECUTOR: RefCell<Option<ExecutorRef>> = const { RefCell::new(None) };
608}
609
610/// Run `f` with `ex` set as the current executor.
611///
612/// Signal callbacks and `spawn_global` calls inside `f` will be routed
613/// to `ex` instead of the global thread-local executor.  Restores the
614/// previous executor afterward.
615///
616/// # Signal routing constraints
617///
618/// Auralis uses a **single global schedule hook** (installed once by the
619/// first call to [`init_flush_scheduler`]) that decides where signal
620/// notifications land by checking the current executor **at the time the
621/// notification fires**, not at the time `Signal::set` is called.
622///
623/// This design implies two hard requirements for multi-instance users:
624///
625/// 1. **`init_flush_scheduler` must be called at least once** — without
626///    it, `Signal::set` falls back to synchronous callback execution,
627///    which breaks the deferred-notification model and can cause
628///    re-entrant borrow panics.
629/// 2. **The instance executor must still be "current" when the flush
630///    runs** — if `with_executor` has already exited, deferred callbacks
631///    from signals set inside `f` will be routed to the global executor
632///    (or synchronously if no global hook is installed).
633///
634/// For the typical single-threaded case (Wasm, game loop, CLI), both
635/// requirements are satisfied trivially: call `init_flush_scheduler`
636/// once at startup and never use `with_executor`.  For SSR / multi-tenant
637/// servers, ensure that `with_executor` wraps the entire request
638/// lifecycle — from signal creation through the final flush.
639///
640/// # Example
641///
642/// ```rust,ignore
643/// use auralis_task::Executor;
644///
645/// let ex = Executor::new_instance();
646/// Executor::install_flush_scheduler(&ex, my_scheduler);
647/// auralis_task::with_executor(&ex, || {
648///     // Signal notifications and task spawns here go to `ex`.
649/// });
650/// ```
651pub fn with_executor<R>(ex: &ExecutorRef, f: impl FnOnce() -> R) -> R {
652    CURRENT_EXECUTOR.with(|exec| {
653        let prev = exec.borrow_mut().replace(Rc::clone(ex));
654        let result = f();
655        *exec.borrow_mut() = prev;
656        result
657    })
658}
659
660/// Return the current executor, if any.
661///
662/// If no executor has been set via [`with_executor`], returns `None` —
663/// callers should fall back to the global thread-local executor.
664fn current_executor() -> Option<ExecutorRef> {
665    CURRENT_EXECUTOR.with(|exec| exec.borrow().clone())
666}
667
668/// Return the currently active executor instance.
669///
670/// If [`with_executor`] was used to set an instance executor, returns
671/// that; otherwise returns the global thread-local executor.
672pub(crate) fn current_executor_instance() -> ExecutorRef {
673    current_executor().unwrap_or_else(|| EXECUTOR.with(Rc::clone))
674}
675
676/// Return the current time in milliseconds from the active executor's
677/// [`TimeSource`], or 0 if none is installed.
678pub(crate) fn current_time_ms() -> u64 {
679    current_executor_instance().borrow().now_ms()
680}
681
682// ---------------------------------------------------------------------------
683// Helpers — use thread_local EXECUTOR
684// ---------------------------------------------------------------------------
685
686fn drain_pending_wakes() {
687    PENDING_WAKES.with(|pw| {
688        let wakes = std::mem::take(&mut *pw.borrow_mut());
689        if wakes.is_empty() {
690            return;
691        }
692        EXECUTOR.with(|exec| {
693            let maybe_sched = {
694                let mut ex = exec.borrow_mut();
695                for (id, priority) in wakes {
696                    match priority {
697                        Priority::High => ex.high_queue.push_back(id),
698                        Priority::Low => ex.low_queue.push_back(id),
699                    }
700                }
701                ex.try_schedule_flush()
702            };
703            if let Some(sched) = maybe_sched {
704                sched.schedule(Box::new(flush));
705            }
706        });
707    });
708}
709
710// ---------------------------------------------------------------------------
711// Flush
712// ---------------------------------------------------------------------------
713
714fn flush() {
715    EXECUTOR.with(Executor::flush_instance);
716    // Drain any wakes that landed in PENDING_WAKES because the executor
717    // RefCell was borrowed during a callback or task poll.
718    drain_pending_wakes();
719}
720
721// ---------------------------------------------------------------------------
722// Public API
723// ---------------------------------------------------------------------------
724
725/// Set the platform flush scheduler and install the signal deferred-
726/// callback hook (idempotent — subsequent calls are no-ops for the hook).
727pub fn init_flush_scheduler(sched: Rc<dyn ScheduleFlush>) {
728    EXECUTOR.with(|exec| exec.borrow_mut().flush_scheduler = Some(sched));
729    install_signal_hook_once();
730}
731
732/// Install the hook that bridges `auralis_signal::Signal::set` to the
733/// executor's deferred-callback queue.
734///
735/// Idempotent — safe to call multiple times.
736fn install_signal_hook_once() {
737    use std::sync::OnceLock;
738    static INSTALLED: OnceLock<()> = OnceLock::new();
739    INSTALLED.get_or_init(|| {
740        auralis_signal::install_schedule_hook(Box::new(|cb: Box<dyn FnOnce()>| {
741            // Prefer the current executor (set via `with_executor`) for
742            // SSR multi-request isolation; fall back to the global one.
743            if let Some(ex) = current_executor() {
744                let maybe_sched = {
745                    let mut e = ex.borrow_mut();
746                    e.deferred_callbacks.push(cb);
747                    if e.in_flush {
748                        None
749                    } else {
750                        e.try_schedule_flush()
751                    }
752                };
753                if let Some(sched) = maybe_sched {
754                    let ex2 = Rc::clone(&ex);
755                    sched.schedule(Box::new(move || Executor::flush_instance(&ex2)));
756                }
757            } else {
758                EXECUTOR.with(|exec| {
759                    let maybe_sched = {
760                        let mut ex = exec.borrow_mut();
761                        ex.deferred_callbacks.push(cb);
762                        if ex.in_flush {
763                            None
764                        } else {
765                            ex.try_schedule_flush()
766                        }
767                    };
768                    if let Some(sched) = maybe_sched {
769                        sched.schedule(Box::new(flush));
770                    }
771                });
772            }
773        }));
774    });
775}
776
777/// Set the platform time source used for time-budget accounting.
778///
779/// If no [`TimeSource`] is registered the executor runs every flush to
780/// completion without yielding, which is acceptable for short-running
781/// workloads but may cause frame drops in the browser.
782pub fn init_time_source(ts: Rc<dyn TimeSource>) {
783    EXECUTOR.with(|exec| exec.borrow_mut().time_source = Some(ts));
784}
785
786/// Set the per-flush time budget on the global executor.
787///
788/// See [`Executor::set_time_budget`] for details.
789pub fn set_global_time_budget(budget_ms: u64) {
790    EXECUTOR.with(|exec| exec.borrow_mut().time_budget_ms = budget_ms);
791}
792
793/// Register a global panic hook called when any globally-spawned
794/// task panics.
795///
796/// See [`Executor::set_panic_hook`] for details.
797pub fn set_panic_hook(hook: Rc<dyn Fn(PanicInfo)>) {
798    EXECUTOR.with(|exec| exec.borrow_mut().panic_hook = Some(hook));
799}
800
801/// Remove the global panic hook, restoring the default silent
802/// behaviour.
803pub fn remove_panic_hook() {
804    EXECUTOR.with(|exec| exec.borrow_mut().panic_hook = None);
805}
806
807/// Spawn a future on the global executor at low priority.
808pub fn spawn_global(future: impl Future<Output = ()> + 'static) {
809    spawn_global_with_priority(Priority::Low, future);
810}
811
812/// Spawn a future on the global executor at the given priority.
813pub fn spawn_global_with_priority(priority: Priority, future: impl Future<Output = ()> + 'static) {
814    spawn_inner(Box::pin(future), priority, 0);
815}
816
817pub(crate) fn spawn_scoped(
818    priority: Priority,
819    scope_id: u64,
820    future: impl Future<Output = ()> + 'static,
821) -> TaskId {
822    spawn_inner(Box::pin(future), priority, scope_id)
823}
824
825fn spawn_inner(
826    future: Pin<Box<dyn Future<Output = ()> + 'static>>,
827    priority: Priority,
828    scope_id: u64,
829) -> TaskId {
830    EXECUTOR.with(|exec| {
831        let (task_id, maybe_sched) = {
832            let mut ex = exec.borrow_mut();
833            let task_id = ex.allocate_id();
834            ex.tasks[task_id as usize] = Some(TaskState {
835                future,
836                priority,
837                scope_id,
838            });
839            ex.enqueue(task_id);
840            let sched = ex.try_schedule_flush();
841            (task_id, sched)
842        };
843        // Schedule outside the borrow.
844        if let Some(sched) = maybe_sched {
845            sched.schedule(Box::new(flush));
846        }
847        task_id
848    })
849}
850
851/// Enqueue all tasks belonging to `scope_id` and trigger a flush.
852///
853/// Used by [`TaskScope::resume`] to restart tasks after a suspend.
854pub(crate) fn enqueue_scope_tasks(scope_id: u64) {
855    EXECUTOR.with(|exec| {
856        let task_ids: Vec<TaskId> = {
857            let ex = exec.borrow();
858            ex.tasks
859                .iter()
860                .enumerate()
861                .filter(|(_, slot)| slot.as_ref().is_some_and(|t| t.scope_id == scope_id))
862                .map(|(idx, _)| idx as TaskId)
863                .collect()
864        };
865        let maybe_sched = {
866            let mut ex = exec.borrow_mut();
867            for tid in task_ids {
868                ex.enqueue(tid);
869            }
870            if ex.in_flush {
871                None
872            } else {
873                ex.try_schedule_flush()
874            }
875        };
876        if let Some(sched) = maybe_sched {
877            sched.schedule(Box::new(flush));
878        }
879    });
880}
881
882pub(crate) fn cancel_scope_tasks(scope_id: u64) -> Vec<Pin<Box<dyn Future<Output = ()>>>> {
883    EXECUTOR.with(|exec| {
884        let mut ex = exec.borrow_mut();
885        let mut dropped = Vec::new();
886
887        for slot in &mut ex.tasks {
888            if let Some(ref t) = slot {
889                if t.scope_id == scope_id {
890                    dropped.push(
891                        slot.take()
892                            .expect("task slot was None after is_some check")
893                            .future,
894                    );
895                }
896            }
897        }
898
899        // Filter queues.
900        let high: Vec<TaskId> = ex
901            .high_queue
902            .iter()
903            .filter(|id| ex.tasks[**id as usize].is_some())
904            .copied()
905            .collect();
906        ex.high_queue.clear();
907        ex.high_queue.extend(high);
908
909        let low: Vec<TaskId> = ex
910            .low_queue
911            .iter()
912            .filter(|id| ex.tasks[**id as usize].is_some())
913            .copied()
914            .collect();
915        ex.low_queue.clear();
916        ex.low_queue.extend(low);
917
918        let mut all_free: Vec<TaskId> = ex
919            .tasks
920            .iter()
921            .enumerate()
922            .filter(|(_, s)| s.is_none())
923            .map(|(i, _)| i as TaskId)
924            .chain(ex.free_slots.iter().copied())
925            .collect();
926        all_free.sort_unstable();
927        all_free.dedup();
928        ex.free_slots = all_free;
929
930        dropped
931    })
932}
933
934// ---------------------------------------------------------------------------
935// yield_now
936// ---------------------------------------------------------------------------
937
938/// Return a [`Future`] that yields control back to the executor once.
939#[must_use = "yield_now() does nothing unless awaited"]
940pub fn yield_now() -> YieldNow {
941    YieldNow { yielded: false }
942}
943
944/// Future returned by [`yield_now`].
945#[derive(Debug)]
946#[must_use = "futures do nothing unless polled"]
947pub struct YieldNow {
948    yielded: bool,
949}
950
951impl Future for YieldNow {
952    type Output = ();
953
954    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
955        if self.yielded {
956            Poll::Ready(())
957        } else {
958            self.yielded = true;
959            cx.waker().wake_by_ref();
960            Poll::Pending
961        }
962    }
963}
964
965// ---------------------------------------------------------------------------
966// schedule_callback — hook for auralis-signal's deferred callback model
967// ---------------------------------------------------------------------------
968
969/// Schedule a closure to run at the start of the next executor flush.
970///
971/// Used internally by `auralis_signal` to defer subscriber callback
972/// execution.  The closure is drained before the main poll loop.
973pub fn schedule_callback(f: Box<dyn FnOnce()>) {
974    EXECUTOR.with(|exec| {
975        let maybe_sched = {
976            let mut ex = exec.borrow_mut();
977            ex.deferred_callbacks.push(f);
978            if ex.in_flush {
979                None
980            } else {
981                ex.try_schedule_flush()
982            }
983        };
984        if let Some(sched) = maybe_sched {
985            sched.schedule(Box::new(flush));
986        }
987    });
988}
989
990// ---------------------------------------------------------------------------
991// set_deferred
992// ---------------------------------------------------------------------------
993
994/// Schedule a [`Signal::set`] call for the **next** executor flush.
995///
996/// Safe to call from inside [`Drop`] — the actual `signal.set(value)` is
997/// deferred to a subsequent flush, avoiding re-entrant borrow panics.
998pub fn set_deferred<T: 'static>(signal: &Signal<T>, value: T) {
999    let signal = signal.clone();
1000    EXECUTOR.with(|exec| {
1001        let maybe_sched = {
1002            let mut ex = exec.borrow_mut();
1003            ex.deferred_ops.push(DeferredOp {
1004                f: Box::new(move || signal.set(value)),
1005            });
1006            ex.try_schedule_flush()
1007        };
1008        if let Some(sched) = maybe_sched {
1009            sched.schedule(Box::new(flush));
1010        }
1011    });
1012}
1013
1014// ---------------------------------------------------------------------------
1015// Test / debug helpers
1016// ---------------------------------------------------------------------------
1017
1018/// Completely reset the global executor to a pristine state.
1019///
1020/// Clears all task slots, queues, deferred ops, flush/scheduler flags,
1021/// and injected [`ScheduleFlush`]/[`TimeSource`].  Call at the start
1022/// of every test to prevent cross-test state leakage.
1023///
1024/// # Safety / usage
1025///
1026/// This function is intended **only** for testing.  Calling it while
1027/// the executor is processing tasks will silently drop all live
1028/// futures and may cause panics or undefined behavior in running
1029/// application code.
1030pub fn reset_executor_for_test() {
1031    PENDING_WAKES.with(|pw| pw.borrow_mut().clear());
1032    EXECUTOR.with(|exec| {
1033        let mut ex = exec.borrow_mut();
1034        ex.high_queue.clear();
1035        ex.low_queue.clear();
1036        ex.tasks.clear();
1037        ex.free_slots.clear();
1038        ex.next_task_id = 0;
1039        ex.is_flush_scheduled = false;
1040        ex.in_flush = false;
1041        ex.deferred_ops.clear();
1042        ex.deferred_callbacks.clear();
1043        ex.flush_scheduler = None;
1044        ex.time_source = None;
1045    });
1046    crate::scope::clear_scope_registry();
1047}
1048
1049#[cfg(test)]
1050pub(crate) fn debug_task_count() -> usize {
1051    EXECUTOR.with(|exec| exec.borrow().tasks.iter().filter(|t| t.is_some()).count())
1052}
1053
1054/// Return a snapshot of all active tasks: `(task_id, priority, scope_id)`.
1055#[cfg(feature = "debug")]
1056pub(crate) fn debug_task_snapshot() -> Vec<(TaskId, Priority, u64)> {
1057    EXECUTOR.with(|exec| {
1058        let ex = exec.borrow();
1059        let mut snap = Vec::new();
1060        for (idx, slot) in ex.tasks.iter().enumerate() {
1061            if let Some(ref t) = slot {
1062                snap.push((idx as u64, t.priority, t.scope_id));
1063            }
1064        }
1065        snap
1066    })
1067}
1068
1069/// Return the set of task IDs currently in the ready queues.
1070#[cfg(feature = "debug")]
1071pub(crate) fn debug_queued_task_ids() -> Vec<TaskId> {
1072    EXECUTOR.with(|exec| {
1073        let ex = exec.borrow();
1074        let mut ids: Vec<TaskId> = ex
1075            .high_queue
1076            .iter()
1077            .chain(ex.low_queue.iter())
1078            .copied()
1079            .collect();
1080        ids.sort_unstable();
1081        ids.dedup();
1082        ids
1083    })
1084}
1085
1086/// Spawn a task without triggering an automatic flush.
1087/// Used in tests to batch multiple spawns before executing them.
1088#[cfg(test)]
1089pub(crate) fn spawn_no_auto_flush(
1090    priority: Priority,
1091    future: impl Future<Output = ()> + 'static,
1092) -> TaskId {
1093    EXECUTOR.with(|exec| {
1094        let mut ex = exec.borrow_mut();
1095        let task_id = ex.allocate_id();
1096        ex.tasks[task_id as usize] = Some(TaskState {
1097            future: Box::pin(future),
1098            priority,
1099            scope_id: 0,
1100        });
1101        ex.enqueue(task_id);
1102        // Do NOT schedule flush.
1103        task_id
1104    })
1105}
1106
1107/// Run a manual flush cycle (for tests that need to control timing).
1108#[cfg(test)]
1109pub(crate) fn flush_all() {
1110    flush();
1111}