auralis_task/executor.rs
1//! Single-threaded executor with priority scheduling, time-budget
2//! awareness, and deferred-signal support.
3//!
4//! ## Architecture
5//!
6//! The executor is stored via a pluggable [`ExecutorStorage`] strategy
7//! (defaulting to a per-thread slot). Before polling a task the future
8//! is **temporarily removed** so that the poll never holds an executor
9//! borrow — this allows nested spawns, wakes, and `set_deferred` calls
10//! without `RefCell` panics.
11//!
12//! The waker carries only a `task_id: u64`, making it trivially
13//! [`Send`] + [`Sync`] for [`Waker::from`].
14
15#![allow(clippy::cast_possible_truncation)]
16
17use std::cell::RefCell;
18use std::collections::VecDeque;
19use std::future::Future;
20use std::pin::Pin;
21use std::rc::Rc;
22use std::sync::Arc;
23use std::task::{Context, Poll, Wake, Waker};
24
25use auralis_signal::Signal;
26
27use crate::Priority;
28
29// ---------------------------------------------------------------------------
30// Types
31// ---------------------------------------------------------------------------
32
33type TaskId = u64;
34
35// ---------------------------------------------------------------------------
36// ScheduleFlush
37// ---------------------------------------------------------------------------
38
/// Platform hook for scheduling a microtask callback.
///
/// The executor hands its flush entry point to this hook whenever a
/// flush needs to run; the host environment decides when the callback
/// actually fires (per the module docs, typically a microtask boundary
/// in the browser).
pub trait ScheduleFlush {
    /// Request that `callback` runs at the next microtask boundary.
    fn schedule(&self, callback: Box<dyn FnOnce()>);
}

/// A [`ScheduleFlush`] that fires the callback synchronously.
///
/// Makes the executor run-to-completion in unit tests without a browser
/// event loop.
#[cfg(test)]
pub struct TestScheduleFlush;

#[cfg(test)]
impl ScheduleFlush for TestScheduleFlush {
    fn schedule(&self, callback: Box<dyn FnOnce()>) {
        // Synchronous: the "scheduled" flush runs immediately, inside
        // the caller's stack frame.
        callback();
    }
}
58
59// ---------------------------------------------------------------------------
60// TimeSource
61// ---------------------------------------------------------------------------
62
/// High-resolution time source for the executor's time-budget
/// accounting.
///
/// When registered via [`init_time_source`], the executor queries this
/// before and after each task poll to decide whether it should yield
/// control back to the host event loop (default budget: 8 ms).
///
/// In Wasm environments the implementation typically delegates to
/// `performance.now()`. If no [`TimeSource`] is registered the time
/// budget check is a no-op and the executor runs tasks until the
/// queues are drained.
pub trait TimeSource {
    /// Return the current time in milliseconds.
    ///
    /// Should be non-decreasing: the executor subtracts timestamps
    /// (a clock regression is absorbed by `saturating_sub` and reads
    /// as zero elapsed time).
    fn now_ms(&self) -> u64;
}
78
/// A [`TimeSource`] whose value is explicitly controlled by the test.
///
/// Use [`set`](TestTimeSource::set) or [`advance`](TestTimeSource::advance)
/// to simulate the passage of time during a flush cycle.
#[cfg(test)]
pub struct TestTimeSource {
    // Interior-mutable so tests can move time through a shared `&self`.
    clock: std::cell::Cell<u64>,
}

#[cfg(test)]
impl TestTimeSource {
    /// Create a new [`TestTimeSource`] starting at `initial_ms`.
    #[must_use]
    pub fn new(initial_ms: u64) -> Self {
        let clock = std::cell::Cell::new(initial_ms);
        Self { clock }
    }

    /// Jump the clock to the absolute time `ms` (milliseconds).
    pub fn set(&self, ms: u64) {
        self.clock.set(ms);
    }

    /// Move the clock forward by `ms` milliseconds.
    pub fn advance(&self, ms: u64) {
        let next = self.clock.get() + ms;
        self.clock.set(next);
    }
}

#[cfg(test)]
impl TimeSource for TestTimeSource {
    fn now_ms(&self) -> u64 {
        self.clock.get()
    }
}
115
116// ---------------------------------------------------------------------------
117// TaskWaker
118// ---------------------------------------------------------------------------
119
/// Waker payload: identifies a task by slot id so the waker stays
/// trivially `Send + Sync` (see module docs).
struct TaskWaker {
    /// Slot index of the task to re-enqueue on wake.
    task_id: TaskId,
    /// Queue to re-enqueue on; copied from the task at poll time.
    priority: Priority,
}
124
impl Wake for TaskWaker {
    // Re-enqueue the task on the thread-local executor. If the executor
    // RefCell is already mutably borrowed (the wake fired from inside a
    // poll or a callback), the wake is parked in PENDING_WAKES and
    // drained by `flush` afterwards.
    //
    // NOTE(review): this always targets the thread-local EXECUTOR, even
    // though `Executor::flush_instance` builds the same TaskWaker for
    // tasks polled on an isolated instance executor — confirm whether
    // instance tasks re-waking on the global executor is intended.
    fn wake(self: Arc<Self>) {
        let maybe_sched = EXECUTOR.with(|exec| {
            if let Ok(mut ex) = exec.try_borrow_mut() {
                match self.priority {
                    Priority::High => ex.high_queue.push_back(self.task_id),
                    Priority::Low => ex.low_queue.push_back(self.task_id),
                }
                // Only schedule a fresh flush if we're NOT already inside
                // one (the running flush loop will pick up the task).
                if ex.in_flush {
                    None
                } else {
                    ex.try_schedule_flush()
                }
            } else {
                // Executor busy: record the wake for `drain_pending_wakes`.
                PENDING_WAKES.with(|pw| {
                    pw.borrow_mut().push((self.task_id, self.priority));
                });
                None
            }
        });
        // Invoke the scheduler only after the executor borrow is released.
        if let Some(sched) = maybe_sched {
            sched.schedule(Box::new(flush));
        }
    }
}
152
153// ---------------------------------------------------------------------------
154// TaskState
155// ---------------------------------------------------------------------------
156
/// A spawned task: its future plus scheduling metadata.
struct TaskState {
    /// The task body; polled with a [`TaskWaker`]-backed waker.
    future: Pin<Box<dyn Future<Output = ()> + 'static>>,
    /// Queue this task is placed on when ready.
    priority: Priority,
    /// Owning scope id (0 for global tasks).
    scope_id: u64,
}
162
163// ---------------------------------------------------------------------------
164// Executor
165// ---------------------------------------------------------------------------
166
/// Information about a task panic, passed to the user-registered
/// [`set_panic_hook`].
///
/// Built by the flush loop after `catch_unwind` traps a panic in a
/// task poll (non-Wasm builds only).
#[derive(Debug)]
pub struct PanicInfo {
    /// The executor-assigned task id.
    pub task_id: u64,
    /// The scope that owned the task (0 for global tasks).
    pub scope_id: u64,
    /// The boxed panic payload.
    pub payload: Box<dyn std::any::Any + Send>,
}
178
/// A single-threaded async task executor with priority queues.
///
/// Each [`Executor`] manages its own task slots, ready queues, and
/// deferred callback buffers. Use [`Executor::new_instance`] to create
/// an isolated executor (e.g. per SSR request), or use the global
/// thread-local executor via [`spawn_global`](crate::spawn_global).
pub struct Executor {
    /// Ready queue drained before `low_queue` (see `dequeue`).
    high_queue: VecDeque<TaskId>,
    /// Ready queue for low-priority tasks.
    low_queue: VecDeque<TaskId>,
    /// Task slots indexed by `TaskId`; `None` means free or
    /// temporarily taken out for polling.
    tasks: Vec<Option<TaskState>>,
    /// Recycled slot indices handed out before `tasks` grows.
    free_slots: Vec<TaskId>,
    /// Next never-used slot index.
    next_task_id: TaskId,
    /// True while a flush has been handed to the scheduler and has not
    /// yet completed; suppresses duplicate scheduling.
    is_flush_scheduled: bool,
    /// True while `flush_instance` is running (re-entrancy guard).
    in_flush: bool,
    /// Operations queued by `set_deferred`; run first in each flush.
    deferred_ops: Vec<DeferredOp>,
    /// Callbacks pushed by `Signal::set` via the schedule hook.
    /// Drained at the start of every flush before polling tasks.
    deferred_callbacks: Vec<Box<dyn FnOnce()>>,
    /// Host hook used to schedule flushes; `None` means flushes must
    /// be driven manually.
    flush_scheduler: Option<Rc<dyn ScheduleFlush>>,
    /// Clock for time-budget checks; `None` disables them.
    time_source: Option<Rc<dyn TimeSource>>,
    /// Maximum milliseconds to spend inside a single flush before
    /// yielding back to the host event loop. Default: 8 ms.
    time_budget_ms: u64,
    /// Optional hook invoked when a spawned task panics.
    panic_hook: Option<Rc<dyn Fn(PanicInfo)>>,
}
205
/// A deferred operation queued by [`set_deferred`]; executed in Step 1
/// of a flush, before signal callbacks and task polls.
struct DeferredOp {
    /// The deferred action (e.g. a captured `signal.set(value)` call).
    f: Box<dyn FnOnce()>,
}
209
210impl Executor {
211 fn new() -> Self {
212 Self {
213 high_queue: VecDeque::new(),
214 low_queue: VecDeque::new(),
215 tasks: Vec::new(),
216 free_slots: Vec::new(),
217 next_task_id: 0,
218 is_flush_scheduled: false,
219 in_flush: false,
220 deferred_ops: Vec::new(),
221 deferred_callbacks: Vec::new(),
222 flush_scheduler: None,
223 time_source: None,
224 time_budget_ms: 8,
225 panic_hook: None,
226 }
227 }
228
229 fn allocate_id(&mut self) -> TaskId {
230 if let Some(id) = self.free_slots.pop() {
231 return id;
232 }
233 let id = self.next_task_id;
234 self.next_task_id += 1;
235 self.tasks.push(None);
236 id
237 }
238
239 fn free_slot(&mut self, task_id: TaskId) {
240 self.tasks[task_id as usize] = None;
241 self.free_slots.push(task_id);
242 }
243
244 fn enqueue(&mut self, task_id: TaskId) {
245 let priority = match self.tasks.get(task_id as usize).and_then(Option::as_ref) {
246 Some(t) => t.priority,
247 None => return,
248 };
249 match priority {
250 Priority::High => self.high_queue.push_back(task_id),
251 Priority::Low => self.low_queue.push_back(task_id),
252 }
253 }
254
255 fn dequeue(&mut self) -> Option<TaskId> {
256 self.high_queue
257 .pop_front()
258 .or_else(|| self.low_queue.pop_front())
259 }
260
261 /// Mark that a flush is needed and return the scheduler if one is
262 /// registered. The caller **must** invoke the scheduler **after**
263 /// releasing the executor borrow.
264 fn try_schedule_flush(&mut self) -> Option<Rc<dyn ScheduleFlush>> {
265 if self.is_flush_scheduled {
266 return None;
267 }
268 self.is_flush_scheduled = true;
269 self.flush_scheduler.clone()
270 }
271
272 /// Return the current time in ms, or 0 if no [`TimeSource`] is
273 /// registered. When this returns 0 the time-budget check is
274 /// effectively a no-op.
275 pub(crate) fn now_ms(&self) -> u64 {
276 self.time_source.as_ref().map_or(0, |ts| ts.now_ms())
277 }
278
279 /// Return the number of currently active (not-yet-completed) tasks.
280 ///
281 /// Used by streaming SSR to determine whether the stream should
282 /// wait for more work or terminate.
283 #[must_use]
284 pub fn active_task_count(&self) -> usize {
285 self.tasks.iter().filter(|t| t.is_some()).count()
286 }
287}
288
289// ---------------------------------------------------------------------------
290// Thread-local globals (default storage)
291// ---------------------------------------------------------------------------
292
thread_local! {
    // The default (global) executor for this thread.
    static EXECUTOR: Rc<RefCell<Executor>> = Rc::new(RefCell::new(Executor::new()));
    // Wakes recorded while EXECUTOR was mutably borrowed (a wake fired
    // from inside a poll or callback); drained after each flush.
    static PENDING_WAKES: RefCell<Vec<(TaskId, Priority)>> = const { RefCell::new(Vec::new()) };
}
297
298// ---------------------------------------------------------------------------
299// Executor instance methods (for isolated executors, e.g. SSR)
300// ---------------------------------------------------------------------------
301
302impl Executor {
303 /// Create a new isolated executor, wrapped for shared access.
304 ///
305 /// The returned executor is independent of the global thread-local
306 /// executor. Use [`with_executor`] to make it the current executor
307 /// for the duration of a closure, so that spawned tasks and signal
308 /// callbacks are routed to it.
309 #[must_use]
310 pub fn new_instance() -> Rc<RefCell<Executor>> {
311 Rc::new(RefCell::new(Executor::new()))
312 }
313
314 /// Install a flush scheduler on this executor instance.
315 pub fn install_flush_scheduler(ex: &Rc<RefCell<Executor>>, sched: Rc<dyn ScheduleFlush>) {
316 ex.borrow_mut().flush_scheduler = Some(sched);
317 }
318
319 /// Install a time source on this executor instance.
320 pub fn install_time_source(ex: &Rc<RefCell<Executor>>, ts: Rc<dyn TimeSource>) {
321 ex.borrow_mut().time_source = Some(ts);
322 }
323
324 /// Set the maximum time (in milliseconds) a single flush may spend
325 /// before yielding back to the host event loop.
326 ///
327 /// The default is 8 ms. Set to `u64::MAX` to disable time-budget
328 /// yielding (flush runs to completion).
329 pub fn set_time_budget(ex: &Rc<RefCell<Executor>>, budget_ms: u64) {
330 ex.borrow_mut().time_budget_ms = budget_ms;
331 }
332
333 /// Register a callback invoked whenever a spawned task panics.
334 ///
335 /// The default is no hook — panicking tasks are silently removed
336 /// from the executor (the same behaviour as a task returning
337 /// `Poll::Ready(())`).
338 ///
339 /// # Example
340 ///
341 /// ```rust,ignore
342 /// Executor::set_panic_hook(&ex, Rc::new(|info| {
343 /// eprintln!("task {} in scope {} panicked", info.task_id, info.scope_id);
344 /// }));
345 /// ```
346 pub fn set_panic_hook(ex: &Rc<RefCell<Executor>>, hook: Rc<dyn Fn(PanicInfo)>) {
347 ex.borrow_mut().panic_hook = Some(hook);
348 }
349
350 /// Spawn a future on this executor instance.
351 pub fn spawn(ex: &Rc<RefCell<Executor>>, future: impl Future<Output = ()> + 'static) {
352 let maybe_sched = {
353 let mut e = ex.borrow_mut();
354 let tid = e.allocate_id();
355 e.tasks[tid as usize] = Some(TaskState {
356 future: Box::pin(future),
357 priority: Priority::Low,
358 scope_id: 0,
359 });
360 e.enqueue(tid);
361 e.try_schedule_flush()
362 };
363 if let Some(sched) = maybe_sched {
364 let ex2 = Rc::clone(ex);
365 sched.schedule(Box::new(move || Self::flush_instance(&ex2)));
366 }
367 }
368
369 /// Run a full flush cycle on this executor instance.
370 ///
371 /// Mirrors the global flush cycle but operates on an
372 /// isolated executor (used for SSR). Includes all the same
373 /// protections: `catch_unwind`, suspend checks, time-budget
374 /// yielding, and callback-drain budget.
375 #[allow(clippy::too_many_lines)]
376 pub fn flush_instance(ex: &Rc<RefCell<Executor>>) {
377 // Guard against re-entrant flushes.
378 {
379 let mut e = ex.borrow_mut();
380 if e.in_flush {
381 return;
382 }
383 e.in_flush = true;
384 }
385
386 // Step 1: deferred ops.
387 let deferred = std::mem::take(&mut ex.borrow_mut().deferred_ops);
388 for op in deferred {
389 (op.f)();
390 }
391
392 // Step 2: drain deferred signal callbacks with time budget.
393 {
394 let cb_start = ex.borrow().now_ms();
395 loop {
396 let callbacks = std::mem::take(&mut ex.borrow_mut().deferred_callbacks);
397 if callbacks.is_empty() {
398 break;
399 }
400 for cb in callbacks {
401 cb();
402 }
403 if ex.borrow().now_ms().saturating_sub(cb_start) >= ex.borrow().time_budget_ms {
404 if !ex.borrow().deferred_callbacks.is_empty() {
405 let (sched, ex2) = {
406 let mut e = ex.borrow_mut();
407 e.in_flush = false;
408 e.is_flush_scheduled = false;
409 (e.try_schedule_flush(), Rc::clone(ex))
410 };
411 if let Some(sched) = sched {
412 sched.schedule(Box::new(move || Self::flush_instance(&ex2)));
413 }
414 return;
415 }
416 break;
417 }
418 }
419 }
420
421 // Step 3: main poll loop with time-budget check.
422 let poll_start = ex.borrow().now_ms();
423 loop {
424 let task_id = ex.borrow_mut().dequeue();
425 let Some(tid) = task_id else {
426 let mut e = ex.borrow_mut();
427 e.is_flush_scheduled = false;
428 e.in_flush = false;
429 break;
430 };
431
432 // Take the task out so the poll doesn't hold an executor borrow.
433 let maybe_state = ex.borrow_mut().tasks[tid as usize].take();
434 if let Some(mut state) = maybe_state {
435 let priority = state.priority;
436 let scope_id = state.scope_id;
437
438 // Check if the owning scope is suspended.
439 let scope = crate::scope::find_scope(scope_id);
440 if let Some(ref s) = scope {
441 if s.is_suspended() {
442 let mut e = ex.borrow_mut();
443 if e.tasks[tid as usize].is_none() {
444 e.tasks[tid as usize] = Some(state);
445 }
446 continue;
447 }
448 }
449
450 let waker = Waker::from(Arc::new(TaskWaker {
451 task_id: tid,
452 priority,
453 }));
454 let mut cx = Context::from_waker(&waker);
455
456 // Inject owning scope.
457 let prev_scope = crate::scope::get_scope_direct();
458 if scope.is_some() {
459 crate::scope::set_scope_direct(scope);
460 }
461
462 // Task isolation (non-Wasm).
463 #[cfg(not(target_arch = "wasm32"))]
464 let result: Result<Poll<()>, Box<dyn std::any::Any + Send>> =
465 std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
466 state.future.as_mut().poll(&mut cx)
467 }));
468 #[cfg(target_arch = "wasm32")]
469 let poll = state.future.as_mut().poll(&mut cx);
470
471 crate::scope::set_scope_direct(prev_scope);
472
473 #[cfg(not(target_arch = "wasm32"))]
474 {
475 match result {
476 Ok(Poll::Ready(())) => {
477 ex.borrow_mut().free_slot(tid);
478 }
479 Err(payload) => {
480 // Notify the panic hook (if any) before freeing the slot.
481 let hook = ex.borrow().panic_hook.clone();
482 if let Some(h) = hook {
483 h(PanicInfo {
484 task_id: tid,
485 scope_id,
486 payload,
487 });
488 }
489 ex.borrow_mut().free_slot(tid);
490 }
491 Ok(Poll::Pending) => {
492 let mut e = ex.borrow_mut();
493 if e.tasks[tid as usize].is_none() {
494 e.tasks[tid as usize] = Some(state);
495 }
496 }
497 }
498 }
499 #[cfg(target_arch = "wasm32")]
500 {
501 match poll {
502 Poll::Ready(()) => {
503 ex.borrow_mut().free_slot(tid);
504 }
505 Poll::Pending => {
506 let mut e = ex.borrow_mut();
507 if e.tasks[tid as usize].is_none() {
508 e.tasks[tid as usize] = Some(state);
509 }
510 }
511 }
512 }
513 }
514
515 // Time budget check.
516 {
517 let elapsed = ex.borrow().now_ms().saturating_sub(poll_start);
518 if elapsed >= ex.borrow().time_budget_ms {
519 let (maybe_sched, ex_clone) = {
520 let mut e = ex.borrow_mut();
521 e.is_flush_scheduled = false;
522 e.in_flush = false;
523 let sched = if !e.high_queue.is_empty() || !e.low_queue.is_empty() {
524 e.try_schedule_flush()
525 } else {
526 None
527 };
528 (sched, Rc::clone(ex))
529 };
530 if let Some(sched) = maybe_sched {
531 sched.schedule(Box::new(move || Self::flush_instance(&ex_clone)));
532 }
533 break;
534 }
535 }
536 }
537 }
538}
539
540// ---------------------------------------------------------------------------
541// Current-executor storage — injectable, defaults to thread-local
542// ---------------------------------------------------------------------------
543
/// Shared handle type for executors.
type ExecutorRef = Rc<RefCell<Executor>>;

thread_local! {
    // Executor override installed by `with_executor`; `None` means the
    // global thread-local EXECUTOR is current.
    static CURRENT_EXECUTOR: RefCell<Option<ExecutorRef>> = const { RefCell::new(None) };
}
549
550/// Run `f` with `ex` set as the current executor.
551///
552/// Signal callbacks and `spawn_global` calls inside `f` will be routed
553/// to `ex` instead of the global thread-local executor. Restores the
554/// previous executor afterward.
555///
556/// # Signal routing constraints
557///
558/// Auralis uses a **single global schedule hook** (installed once by the
559/// first call to [`init_flush_scheduler`]) that decides where signal
560/// notifications land by checking the current executor **at the time the
561/// notification fires**, not at the time `Signal::set` is called.
562///
563/// This design implies two hard requirements for multi-instance users:
564///
565/// 1. **`init_flush_scheduler` must be called at least once** — without
566/// it, `Signal::set` falls back to synchronous callback execution,
567/// which breaks the deferred-notification model and can cause
568/// re-entrant borrow panics.
569/// 2. **The instance executor must still be "current" when the flush
570/// runs** — if `with_executor` has already exited, deferred callbacks
571/// from signals set inside `f` will be routed to the global executor
572/// (or synchronously if no global hook is installed).
573///
574/// For the typical single-threaded case (Wasm, game loop, CLI), both
575/// requirements are satisfied trivially: call `init_flush_scheduler`
576/// once at startup and never use `with_executor`. For SSR / multi-tenant
577/// servers, ensure that `with_executor` wraps the entire request
578/// lifecycle — from signal creation through the final flush.
579///
580/// # Example
581///
582/// ```rust,ignore
583/// use auralis_task::Executor;
584///
585/// let ex = Executor::new_instance();
586/// Executor::install_flush_scheduler(&ex, my_scheduler);
587/// auralis_task::with_executor(&ex, || {
588/// // Signal notifications and task spawns here go to `ex`.
589/// });
590/// ```
591pub fn with_executor<R>(ex: &ExecutorRef, f: impl FnOnce() -> R) -> R {
592 CURRENT_EXECUTOR.with(|exec| {
593 let prev = exec.borrow_mut().replace(Rc::clone(ex));
594 let result = f();
595 *exec.borrow_mut() = prev;
596 result
597 })
598}
599
600/// Return the current executor, if any.
601///
602/// If no executor has been set via [`with_executor`], returns `None` —
603/// callers should fall back to the global thread-local executor.
604fn current_executor() -> Option<ExecutorRef> {
605 CURRENT_EXECUTOR.with(|exec| exec.borrow().clone())
606}
607
608// ---------------------------------------------------------------------------
609// Helpers — use thread_local EXECUTOR
610// ---------------------------------------------------------------------------
611
/// Move wakes parked in `PENDING_WAKES` onto the global executor's
/// ready queues and schedule a flush for them if needed.
///
/// Wakes land in `PENDING_WAKES` only when `TaskWaker::wake` could not
/// mutably borrow the executor (see `impl Wake for TaskWaker`).
fn drain_pending_wakes() {
    PENDING_WAKES.with(|pw| {
        let wakes = std::mem::take(&mut *pw.borrow_mut());
        if wakes.is_empty() {
            return;
        }
        EXECUTOR.with(|exec| {
            let maybe_sched = {
                let mut ex = exec.borrow_mut();
                for (id, priority) in wakes {
                    match priority {
                        Priority::High => ex.high_queue.push_back(id),
                        Priority::Low => ex.low_queue.push_back(id),
                    }
                }
                ex.try_schedule_flush()
            };
            // Scheduler call happens only after the borrow is dropped.
            if let Some(sched) = maybe_sched {
                sched.schedule(Box::new(flush));
            }
        });
    });
}
635
636// ---------------------------------------------------------------------------
637// Flush
638// ---------------------------------------------------------------------------
639
/// Run one full flush cycle on the global thread-local executor.
fn flush() {
    EXECUTOR.with(Executor::flush_instance);
    // Drain any wakes that landed in PENDING_WAKES because the executor
    // RefCell was borrowed during a callback or task poll.
    drain_pending_wakes();
}
646
647// ---------------------------------------------------------------------------
648// Public API
649// ---------------------------------------------------------------------------
650
651/// Set the platform flush scheduler and install the signal deferred-
652/// callback hook (idempotent — subsequent calls are no-ops for the hook).
653pub fn init_flush_scheduler(sched: Rc<dyn ScheduleFlush>) {
654 EXECUTOR.with(|exec| exec.borrow_mut().flush_scheduler = Some(sched));
655 install_signal_hook_once();
656}
657
/// Install the hook that bridges `auralis_signal::Signal::set` to the
/// executor's deferred-callback queue.
///
/// Idempotent — safe to call multiple times. The routing decision
/// (current instance executor vs. global) is made each time a
/// notification fires, not when the hook is installed.
fn install_signal_hook_once() {
    use std::sync::OnceLock;
    static INSTALLED: OnceLock<()> = OnceLock::new();
    INSTALLED.get_or_init(|| {
        auralis_signal::install_schedule_hook(Box::new(|cb: Box<dyn FnOnce()>| {
            // Prefer the current executor (set via `with_executor`) for
            // SSR multi-request isolation; fall back to the global one.
            if let Some(ex) = current_executor() {
                let maybe_sched = {
                    let mut e = ex.borrow_mut();
                    e.deferred_callbacks.push(cb);
                    // Never schedule a fresh flush from inside a
                    // running flush.
                    if e.in_flush {
                        None
                    } else {
                        e.try_schedule_flush()
                    }
                };
                // Schedule only after the borrow is released.
                if let Some(sched) = maybe_sched {
                    let ex2 = Rc::clone(&ex);
                    sched.schedule(Box::new(move || Executor::flush_instance(&ex2)));
                }
            } else {
                EXECUTOR.with(|exec| {
                    let maybe_sched = {
                        let mut ex = exec.borrow_mut();
                        ex.deferred_callbacks.push(cb);
                        if ex.in_flush {
                            None
                        } else {
                            ex.try_schedule_flush()
                        }
                    };
                    if let Some(sched) = maybe_sched {
                        sched.schedule(Box::new(flush));
                    }
                });
            }
        }));
    });
}
702
703/// Set the platform time source used for time-budget accounting.
704///
705/// If no [`TimeSource`] is registered the executor runs every flush to
706/// completion without yielding, which is acceptable for short-running
707/// workloads but may cause frame drops in the browser.
708pub fn init_time_source(ts: Rc<dyn TimeSource>) {
709 EXECUTOR.with(|exec| exec.borrow_mut().time_source = Some(ts));
710}
711
712/// Set the per-flush time budget on the global executor.
713///
714/// See [`Executor::set_time_budget`] for details.
715pub fn set_global_time_budget(budget_ms: u64) {
716 EXECUTOR.with(|exec| exec.borrow_mut().time_budget_ms = budget_ms);
717}
718
719/// Register a global panic hook called when any globally-spawned
720/// task panics.
721///
722/// See [`Executor::set_panic_hook`] for details.
723pub fn set_panic_hook(hook: Rc<dyn Fn(PanicInfo)>) {
724 EXECUTOR.with(|exec| exec.borrow_mut().panic_hook = Some(hook));
725}
726
727/// Remove the global panic hook, restoring the default silent
728/// behaviour.
729pub fn remove_panic_hook() {
730 EXECUTOR.with(|exec| exec.borrow_mut().panic_hook = None);
731}
732
/// Spawn a future on the global executor at low priority.
///
/// Shorthand for [`spawn_global_with_priority`] with [`Priority::Low`].
pub fn spawn_global(future: impl Future<Output = ()> + 'static) {
    spawn_global_with_priority(Priority::Low, future);
}
737
738/// Spawn a future on the global executor at the given priority.
739pub fn spawn_global_with_priority(priority: Priority, future: impl Future<Output = ()> + 'static) {
740 spawn_inner(Box::pin(future), priority, 0);
741}
742
743pub(crate) fn spawn_scoped(
744 priority: Priority,
745 scope_id: u64,
746 future: impl Future<Output = ()> + 'static,
747) -> TaskId {
748 spawn_inner(Box::pin(future), priority, scope_id)
749}
750
751fn spawn_inner(
752 future: Pin<Box<dyn Future<Output = ()> + 'static>>,
753 priority: Priority,
754 scope_id: u64,
755) -> TaskId {
756 EXECUTOR.with(|exec| {
757 let (task_id, maybe_sched) = {
758 let mut ex = exec.borrow_mut();
759 let task_id = ex.allocate_id();
760 ex.tasks[task_id as usize] = Some(TaskState {
761 future,
762 priority,
763 scope_id,
764 });
765 ex.enqueue(task_id);
766 let sched = ex.try_schedule_flush();
767 (task_id, sched)
768 };
769 // Schedule outside the borrow.
770 if let Some(sched) = maybe_sched {
771 sched.schedule(Box::new(flush));
772 }
773 task_id
774 })
775}
776
/// Enqueue all tasks belonging to `scope_id` and trigger a flush.
///
/// Used by [`TaskScope::resume`] to restart tasks after a suspend.
pub(crate) fn enqueue_scope_tasks(scope_id: u64) {
    EXECUTOR.with(|exec| {
        // Collect ids under an immutable borrow first; `enqueue` below
        // needs a mutable borrow.
        let task_ids: Vec<TaskId> = {
            let ex = exec.borrow();
            ex.tasks
                .iter()
                .enumerate()
                .filter(|(_, slot)| slot.as_ref().is_some_and(|t| t.scope_id == scope_id))
                .map(|(idx, _)| idx as TaskId)
                .collect()
        };
        let maybe_sched = {
            let mut ex = exec.borrow_mut();
            for tid in task_ids {
                ex.enqueue(tid);
            }
            // Inside a flush the running poll loop picks the tasks up;
            // don't schedule a second flush.
            if ex.in_flush {
                None
            } else {
                ex.try_schedule_flush()
            }
        };
        // Call the scheduler only after the borrow is released.
        if let Some(sched) = maybe_sched {
            sched.schedule(Box::new(flush));
        }
    });
}
807
/// Remove every task owned by `scope_id` from the global executor and
/// return their futures, so the caller controls when they are dropped.
///
/// Also purges the cancelled ids from both ready queues and rebuilds
/// the free-slot list.
pub(crate) fn cancel_scope_tasks(scope_id: u64) -> Vec<Pin<Box<dyn Future<Output = ()>>>> {
    EXECUTOR.with(|exec| {
        let mut ex = exec.borrow_mut();
        let mut dropped = Vec::new();

        // Pull matching futures out of their slots (slot becomes None).
        for slot in &mut ex.tasks {
            if let Some(ref t) = slot {
                if t.scope_id == scope_id {
                    dropped.push(
                        slot.take()
                            .expect("task slot was None after is_some check")
                            .future,
                    );
                }
            }
        }

        // Filter queues: keep only ids whose slots are still occupied.
        let high: Vec<TaskId> = ex
            .high_queue
            .iter()
            .filter(|id| ex.tasks[**id as usize].is_some())
            .copied()
            .collect();
        ex.high_queue.clear();
        ex.high_queue.extend(high);

        let low: Vec<TaskId> = ex
            .low_queue
            .iter()
            .filter(|id| ex.tasks[**id as usize].is_some())
            .copied()
            .collect();
        ex.low_queue.clear();
        ex.low_queue.extend(low);

        // Rebuild free_slots as (every empty slot) ∪ (previous free
        // list), sorted and deduplicated.
        let mut all_free: Vec<TaskId> = ex
            .tasks
            .iter()
            .enumerate()
            .filter(|(_, s)| s.is_none())
            .map(|(i, _)| i as TaskId)
            .chain(ex.free_slots.iter().copied())
            .collect();
        all_free.sort_unstable();
        all_free.dedup();
        ex.free_slots = all_free;

        dropped
    })
}
859
860// ---------------------------------------------------------------------------
861// yield_now
862// ---------------------------------------------------------------------------
863
/// Return a [`Future`] that yields control back to the executor once.
#[must_use = "yield_now() does nothing unless awaited"]
pub fn yield_now() -> YieldNow {
    YieldNow { yielded: false }
}

/// Future returned by [`yield_now`].
///
/// Resolves on its second poll; the first poll wakes itself and
/// returns `Poll::Pending`.
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct YieldNow {
    // Flips to `true` after the first poll.
    yielded: bool,
}

impl Future for YieldNow {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if !self.yielded {
            // First poll: remember we yielded, request an immediate
            // re-poll, and hand control back to the executor.
            self.yielded = true;
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        Poll::Ready(())
    }
}
890
891// ---------------------------------------------------------------------------
892// schedule_callback — hook for auralis-signal's deferred callback model
893// ---------------------------------------------------------------------------
894
/// Schedule a closure to run at the start of the next executor flush.
///
/// Used internally by `auralis_signal` to defer subscriber callback
/// execution. The closure is drained before the main poll loop.
pub fn schedule_callback(f: Box<dyn FnOnce()>) {
    EXECUTOR.with(|exec| {
        let maybe_sched = {
            let mut ex = exec.borrow_mut();
            ex.deferred_callbacks.push(f);
            // Never schedule a fresh flush from inside a running flush.
            if ex.in_flush {
                None
            } else {
                ex.try_schedule_flush()
            }
        };
        // Invoke the scheduler only once the borrow is released.
        if let Some(sched) = maybe_sched {
            sched.schedule(Box::new(flush));
        }
    });
}
915
916// ---------------------------------------------------------------------------
917// set_deferred
918// ---------------------------------------------------------------------------
919
/// Schedule a [`Signal::set`] call for the **next** executor flush.
///
/// Safe to call from inside [`Drop`] — the actual `signal.set(value)` is
/// deferred to a subsequent flush, avoiding re-entrant borrow panics.
pub fn set_deferred<T: 'static>(signal: &Signal<T>, value: T) {
    // Clone so the deferred closure owns its signal handle.
    let signal = signal.clone();
    EXECUTOR.with(|exec| {
        let maybe_sched = {
            let mut ex = exec.borrow_mut();
            ex.deferred_ops.push(DeferredOp {
                f: Box::new(move || signal.set(value)),
            });
            ex.try_schedule_flush()
        };
        // Scheduler call happens after the borrow is released.
        if let Some(sched) = maybe_sched {
            sched.schedule(Box::new(flush));
        }
    });
}
939
940// ---------------------------------------------------------------------------
941// Test / debug helpers
942// ---------------------------------------------------------------------------
943
944/// Completely reset the global executor to a pristine state.
945///
946/// Clears all task slots, queues, deferred ops, flush/scheduler flags,
947/// and injected [`ScheduleFlush`]/[`TimeSource`]. Call at the start
948/// of every test to prevent cross-test state leakage.
949///
950/// # Safety / usage
951///
952/// This function is intended **only** for testing. Calling it while
953/// the executor is processing tasks will silently drop all live
954/// futures and may cause panics or undefined behavior in running
955/// application code.
956pub fn reset_executor_for_test() {
957 PENDING_WAKES.with(|pw| pw.borrow_mut().clear());
958 EXECUTOR.with(|exec| {
959 let mut ex = exec.borrow_mut();
960 ex.high_queue.clear();
961 ex.low_queue.clear();
962 ex.tasks.clear();
963 ex.free_slots.clear();
964 ex.next_task_id = 0;
965 ex.is_flush_scheduled = false;
966 ex.in_flush = false;
967 ex.deferred_ops.clear();
968 ex.deferred_callbacks.clear();
969 ex.flush_scheduler = None;
970 ex.time_source = None;
971 });
972 crate::scope::clear_scope_registry();
973}
974
#[cfg(test)]
pub(crate) fn debug_task_count() -> usize {
    // Number of occupied task slots on the global executor.
    EXECUTOR.with(|exec| exec.borrow().tasks.iter().flatten().count())
}
979
/// Return a snapshot of all active tasks: `(task_id, priority, scope_id)`.
#[cfg(feature = "debug")]
pub(crate) fn debug_task_snapshot() -> Vec<(TaskId, Priority, u64)> {
    EXECUTOR.with(|exec| {
        exec.borrow()
            .tasks
            .iter()
            .enumerate()
            .filter_map(|(idx, slot)| slot.as_ref().map(|t| (idx as u64, t.priority, t.scope_id)))
            .collect()
    })
}
994
/// Return the set of task IDs currently in the ready queues.
#[cfg(feature = "debug")]
pub(crate) fn debug_queued_task_ids() -> Vec<TaskId> {
    EXECUTOR.with(|exec| {
        let ex = exec.borrow();
        let mut ids: Vec<TaskId> = Vec::with_capacity(ex.high_queue.len() + ex.low_queue.len());
        ids.extend(ex.high_queue.iter().copied());
        ids.extend(ex.low_queue.iter().copied());
        ids.sort_unstable();
        ids.dedup();
        ids
    })
}
1011
/// Spawn a task without triggering an automatic flush.
/// Used in tests to batch multiple spawns before executing them.
#[cfg(test)]
pub(crate) fn spawn_no_auto_flush(
    priority: Priority,
    future: impl Future<Output = ()> + 'static,
) -> TaskId {
    EXECUTOR.with(|exec| {
        let mut ex = exec.borrow_mut();
        let tid = ex.allocate_id();
        let state = TaskState {
            future: Box::pin(future),
            priority,
            scope_id: 0,
        };
        ex.tasks[tid as usize] = Some(state);
        ex.enqueue(tid);
        // Deliberately no try_schedule_flush(): the test drives the flush.
        tid
    })
}
1032
/// Run a manual flush cycle (for tests that need to control timing).
#[cfg(test)]
pub(crate) fn flush_all() {
    // Same path as a scheduler-triggered flush: flush + drain wakes.
    flush();
}