auralis_task/executor.rs
1//! Single-threaded executor with priority scheduling, time-budget
2//! awareness, and deferred-signal support.
3//!
4//! ## Architecture
5//!
6//! The executor is stored via a pluggable [`ExecutorStorage`] strategy
7//! (defaulting to a per-thread slot). Before polling a task the future
8//! is **temporarily removed** so that the poll never holds an executor
9//! borrow — this allows nested spawns, wakes, and `set_deferred` calls
10//! without `RefCell` panics.
11//!
12//! The waker carries only a `task_id: u64`, making it trivially
13//! [`Send`] + [`Sync`] for [`Waker::from`].
14
15#![allow(clippy::cast_possible_truncation)]
16
17use std::cell::RefCell;
18use std::collections::VecDeque;
19use std::future::Future;
20use std::pin::Pin;
21use std::rc::Rc;
22use std::sync::Arc;
23use std::task::{Context, Poll, Wake, Waker};
24
25use auralis_signal::Signal;
26
27use crate::Priority;
28
29// ---------------------------------------------------------------------------
30// Types
31// ---------------------------------------------------------------------------
32
/// Identifier for a task; doubles as the index of the task's slot in
/// `Executor::tasks`. Freed ids are recycled via `Executor::free_slots`.
type TaskId = u64;
34
35// ---------------------------------------------------------------------------
36// ScheduleFlush
37// ---------------------------------------------------------------------------
38
/// Platform hook for scheduling a microtask callback.
///
/// The executor never talks to the host event loop directly: when a
/// flush is needed, `Executor::try_schedule_flush` returns the
/// registered implementation and the caller hands it a boxed `flush`
/// closure. Each platform decides how to defer that closure.
pub trait ScheduleFlush {
    /// Request that `callback` runs at the next microtask boundary.
    fn schedule(&self, callback: Box<dyn FnOnce()>);
}

/// A [`ScheduleFlush`] that fires the callback synchronously.
///
/// Makes the executor run-to-completion in unit tests without a browser
/// event loop.
#[cfg(test)]
pub struct TestScheduleFlush;

#[cfg(test)]
impl ScheduleFlush for TestScheduleFlush {
    // Runs immediately on the caller's stack instead of deferring.
    fn schedule(&self, callback: Box<dyn FnOnce()>) {
        callback();
    }
}
58
59// ---------------------------------------------------------------------------
60// TimeSource
61// ---------------------------------------------------------------------------
62
/// High-resolution time source for the executor's time-budget
/// accounting.
///
/// When registered via [`init_time_source`], the executor samples this
/// before and after each task poll to decide whether it should yield
/// control back to the host event loop (default budget: 8 ms).
///
/// In Wasm environments the implementation typically delegates to
/// `performance.now()`. With no [`TimeSource`] registered the budget
/// check is a no-op and the executor drains its queues in one flush.
pub trait TimeSource {
    /// Return the current time in milliseconds.
    fn now_ms(&self) -> u64;
}

/// A [`TimeSource`] whose value is explicitly controlled by the test.
///
/// Drive simulated time with [`set`](TestTimeSource::set) or
/// [`advance`](TestTimeSource::advance) during a flush cycle.
#[cfg(test)]
pub struct TestTimeSource {
    now: std::cell::Cell<u64>,
}

#[cfg(test)]
impl TestTimeSource {
    /// Create a new [`TestTimeSource`] with the given initial time.
    #[must_use]
    pub fn new(initial_ms: u64) -> Self {
        Self {
            now: std::cell::Cell::new(initial_ms),
        }
    }

    /// Set the current time to `ms` milliseconds.
    pub fn set(&self, ms: u64) {
        self.now.set(ms);
    }

    /// Advance the current time by `ms` milliseconds.
    pub fn advance(&self, ms: u64) {
        let next = self.now.get() + ms;
        self.now.set(next);
    }
}

#[cfg(test)]
impl TimeSource for TestTimeSource {
    fn now_ms(&self) -> u64 {
        self.now.get()
    }
}
115
116// ---------------------------------------------------------------------------
117// TaskWaker
118// ---------------------------------------------------------------------------
119
/// Waker payload: just the task id and its queue priority.
///
/// Carrying no executor reference keeps the waker trivially
/// `Send + Sync`, as required by [`Waker::from`].
struct TaskWaker {
    task_id: TaskId,
    priority: Priority,
}
124
125impl Wake for TaskWaker {
126 fn wake(self: Arc<Self>) {
127 let maybe_sched = EXECUTOR.with(|exec| {
128 if let Ok(mut ex) = exec.try_borrow_mut() {
129 match self.priority {
130 Priority::High => ex.high_queue.push_back(self.task_id),
131 Priority::Low => ex.low_queue.push_back(self.task_id),
132 }
133 // Only schedule a fresh flush if we're NOT already inside
134 // one (the running flush loop will pick up the task).
135 if ex.in_flush {
136 None
137 } else {
138 ex.try_schedule_flush()
139 }
140 } else {
141 PENDING_WAKES.with(|pw| {
142 pw.borrow_mut().push((self.task_id, self.priority));
143 });
144 None
145 }
146 });
147 if let Some(sched) = maybe_sched {
148 sched.schedule(Box::new(flush));
149 }
150 }
151}
152
153// ---------------------------------------------------------------------------
154// TaskState
155// ---------------------------------------------------------------------------
156
/// A live task: the pinned future plus the metadata needed to requeue
/// it (`priority`) and to associate it with its owning scope
/// (`scope_id`; 0 is used for global/unscoped tasks).
struct TaskState {
    future: Pin<Box<dyn Future<Output = ()> + 'static>>,
    priority: Priority,
    scope_id: u64,
}
162
163// ---------------------------------------------------------------------------
164// Executor
165// ---------------------------------------------------------------------------
166
/// Information about a task panic, passed to the user-registered
/// [`set_panic_hook`].
///
/// Only constructed on non-Wasm targets, where each task poll is
/// wrapped in `catch_unwind`.
pub struct PanicInfo {
    /// The executor-assigned task id.
    pub task_id: u64,
    /// The scope that owned the task (0 for global tasks).
    pub scope_id: u64,
    /// The boxed panic payload.
    pub payload: Box<dyn std::any::Any + Send>,
}

// `#[derive(Debug)]` does not compile here: `Box<dyn Any + Send>` has
// no `Debug` impl. Hand-write the impl and render the payload as an
// opaque placeholder instead.
impl std::fmt::Debug for PanicInfo {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("PanicInfo")
            .field("task_id", &self.task_id)
            .field("scope_id", &self.scope_id)
            .field("payload", &"<panic payload>")
            .finish()
    }
}
178
179/// A single-threaded async task executor with priority queues.
180///
181/// Each [`Executor`] manages its own task slots, ready queues, and
182/// deferred callback buffers. Use [`Executor::new_instance`] to create
183/// an isolated executor (e.g. per SSR request), or use the global
184/// thread-local executor via [`spawn_global`](crate::spawn_global).
pub struct Executor {
    /// Ready queue drained before anything in `low_queue`.
    high_queue: VecDeque<TaskId>,
    /// Ready queue for low-priority tasks.
    low_queue: VecDeque<TaskId>,
    /// Slot table indexed by `TaskId`; `None` marks a free slot.
    tasks: Vec<Option<TaskState>>,
    /// Ids of vacated `tasks` slots available for reuse.
    free_slots: Vec<TaskId>,
    /// Next never-used id, handed out when `free_slots` is empty.
    next_task_id: TaskId,
    /// True once a flush has been handed to the scheduler and not yet
    /// run; coalesces duplicate schedule requests.
    is_flush_scheduled: bool,
    /// True while `flush_instance` is running (re-entrancy guard).
    in_flush: bool,
    /// Ops queued by `set_deferred`; run in Step 1 of the next flush.
    deferred_ops: Vec<DeferredOp>,
    /// Callbacks pushed by `Signal::set` via the schedule hook.
    /// Drained at the start of every flush before polling tasks.
    deferred_callbacks: Vec<Box<dyn FnOnce()>>,
    /// Platform hook used to defer `flush` to a microtask boundary.
    flush_scheduler: Option<Rc<dyn ScheduleFlush>>,
    /// Clock for time-budget accounting; `None` disables the budget.
    time_source: Option<Rc<dyn TimeSource>>,
    /// Maximum milliseconds to spend inside a single flush before
    /// yielding back to the host event loop. Default: 8 ms.
    time_budget_ms: u64,
    /// Optional hook invoked when a spawned task panics.
    panic_hook: Option<Rc<dyn Fn(PanicInfo)>>,
}

/// A deferred closure (e.g. a `Signal::set`) queued by [`set_deferred`]
/// and executed at the very start of the next flush.
struct DeferredOp {
    f: Box<dyn FnOnce()>,
}
209
210impl Executor {
211 fn new() -> Self {
212 Self {
213 high_queue: VecDeque::new(),
214 low_queue: VecDeque::new(),
215 tasks: Vec::new(),
216 free_slots: Vec::new(),
217 next_task_id: 0,
218 is_flush_scheduled: false,
219 in_flush: false,
220 deferred_ops: Vec::new(),
221 deferred_callbacks: Vec::new(),
222 flush_scheduler: None,
223 time_source: None,
224 time_budget_ms: 8,
225 panic_hook: None,
226 }
227 }
228
229 fn allocate_id(&mut self) -> TaskId {
230 if let Some(id) = self.free_slots.pop() {
231 return id;
232 }
233 let id = self.next_task_id;
234 self.next_task_id += 1;
235 self.tasks.push(None);
236 id
237 }
238
239 fn free_slot(&mut self, task_id: TaskId) {
240 self.tasks[task_id as usize] = None;
241 self.free_slots.push(task_id);
242 }
243
244 fn enqueue(&mut self, task_id: TaskId) {
245 let priority = match self.tasks.get(task_id as usize).and_then(Option::as_ref) {
246 Some(t) => t.priority,
247 None => return,
248 };
249 match priority {
250 Priority::High => self.high_queue.push_back(task_id),
251 Priority::Low => self.low_queue.push_back(task_id),
252 }
253 }
254
255 fn dequeue(&mut self) -> Option<TaskId> {
256 self.high_queue
257 .pop_front()
258 .or_else(|| self.low_queue.pop_front())
259 }
260
261 /// Mark that a flush is needed and return the scheduler if one is
262 /// registered. The caller **must** invoke the scheduler **after**
263 /// releasing the executor borrow.
264 fn try_schedule_flush(&mut self) -> Option<Rc<dyn ScheduleFlush>> {
265 if self.is_flush_scheduled {
266 return None;
267 }
268 self.is_flush_scheduled = true;
269 self.flush_scheduler.clone()
270 }
271
272 /// Return the current time in ms, or 0 if no [`TimeSource`] is
273 /// registered. When this returns 0 the time-budget check is
274 /// effectively a no-op.
275 pub(crate) fn now_ms(&self) -> u64 {
276 self.time_source.as_ref().map_or(0, |ts| ts.now_ms())
277 }
278
279 /// Return the number of currently active (not-yet-completed) tasks.
280 ///
281 /// Used by streaming SSR to determine whether the stream should
282 /// wait for more work or terminate.
283 #[must_use]
284 pub fn active_task_count(&self) -> usize {
285 self.tasks.iter().filter(|t| t.is_some()).count()
286 }
287}
288
289// ---------------------------------------------------------------------------
290// Thread-local globals (default storage)
291// ---------------------------------------------------------------------------
292
thread_local! {
    /// The default (global) executor for this thread.
    static EXECUTOR: Rc<RefCell<Executor>> = Rc::new(RefCell::new(Executor::new()));
    /// Wakes that arrived while `EXECUTOR` was mutably borrowed (see
    /// `TaskWaker::wake`); drained by `drain_pending_wakes` after each
    /// global `flush`.
    static PENDING_WAKES: RefCell<Vec<(TaskId, Priority)>> = const { RefCell::new(Vec::new()) };
}
297
298// ---------------------------------------------------------------------------
299// Executor instance methods (for isolated executors, e.g. SSR)
300// ---------------------------------------------------------------------------
301
impl Executor {
    /// Create a new isolated executor, wrapped for shared access.
    ///
    /// The returned executor is independent of the global thread-local
    /// executor. Use [`with_executor`] to make it the current executor
    /// for the duration of a closure, so that spawned tasks and signal
    /// callbacks are routed to it.
    #[must_use]
    pub fn new_instance() -> Rc<RefCell<Executor>> {
        Rc::new(RefCell::new(Executor::new()))
    }

    /// Install a flush scheduler on this executor instance.
    pub fn install_flush_scheduler(ex: &Rc<RefCell<Executor>>, sched: Rc<dyn ScheduleFlush>) {
        ex.borrow_mut().flush_scheduler = Some(sched);
    }

    /// Install a time source on this executor instance.
    pub fn install_time_source(ex: &Rc<RefCell<Executor>>, ts: Rc<dyn TimeSource>) {
        ex.borrow_mut().time_source = Some(ts);
    }

    /// Set the maximum time (in milliseconds) a single flush may spend
    /// before yielding back to the host event loop.
    ///
    /// The default is 8 ms. Set to `u64::MAX` to disable time-budget
    /// yielding (flush runs to completion).
    pub fn set_time_budget(ex: &Rc<RefCell<Executor>>, budget_ms: u64) {
        ex.borrow_mut().time_budget_ms = budget_ms;
    }

    /// Register a callback invoked whenever a spawned task panics.
    ///
    /// The default is no hook — panicking tasks are silently removed
    /// from the executor (the same behaviour as a task returning
    /// `Poll::Ready(())`).
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// Executor::set_panic_hook(&ex, Rc::new(|info| {
    ///     eprintln!("task {} in scope {} panicked", info.task_id, info.scope_id);
    /// }));
    /// ```
    pub fn set_panic_hook(ex: &Rc<RefCell<Executor>>, hook: Rc<dyn Fn(PanicInfo)>) {
        ex.borrow_mut().panic_hook = Some(hook);
    }

    /// Spawn a future on this executor instance.
    ///
    /// Instance spawns are always `Priority::Low` and unscoped
    /// (`scope_id` 0). The scheduler is invoked only after the borrow
    /// scope closes, so a synchronous scheduler cannot re-enter the
    /// `RefCell`.
    pub fn spawn(ex: &Rc<RefCell<Executor>>, future: impl Future<Output = ()> + 'static) {
        let maybe_sched = {
            let mut e = ex.borrow_mut();
            let tid = e.allocate_id();
            e.tasks[tid as usize] = Some(TaskState {
                future: Box::pin(future),
                priority: Priority::Low,
                scope_id: 0,
            });
            e.enqueue(tid);
            e.try_schedule_flush()
        };
        if let Some(sched) = maybe_sched {
            let ex2 = Rc::clone(ex);
            sched.schedule(Box::new(move || Self::flush_instance(&ex2)));
        }
    }

    /// Run a full flush cycle on this executor instance.
    ///
    /// Mirrors the global flush cycle but operates on an
    /// isolated executor (used for SSR). Includes all the same
    /// protections: `catch_unwind`, suspend checks, time-budget
    /// yielding, and callback-drain budget.
    #[allow(clippy::too_many_lines)]
    pub fn flush_instance(ex: &Rc<RefCell<Executor>>) {
        // Guard against re-entrant flushes.
        {
            let mut e = ex.borrow_mut();
            if e.in_flush {
                #[cfg(debug_assertions)]
                {
                    eprintln!(
                        "[auralis-task] WARNING: Executor::flush_instance called \
                         re-entrantly (already inside a flush). This is a no-op. \
                         Check for nested flush() calls in signal callbacks or \
                         ScheduleFlush implementations."
                    );
                }
                return;
            }
            e.in_flush = true;
        }

        // Step 1: deferred ops.
        // Take the whole batch first so each op can itself call
        // `set_deferred` without a re-entrant borrow; such new ops run
        // on the NEXT flush.
        let deferred = std::mem::take(&mut ex.borrow_mut().deferred_ops);
        for op in deferred {
            (op.f)();
        }

        // Step 2: drain deferred signal callbacks with time budget.
        // The loop re-takes the buffer until it stays empty, so
        // callbacks queued *by* callbacks are drained in this same
        // flush (budget permitting).
        {
            let cb_start = ex.borrow().now_ms();
            loop {
                let callbacks = std::mem::take(&mut ex.borrow_mut().deferred_callbacks);
                if callbacks.is_empty() {
                    break;
                }
                for cb in callbacks {
                    cb();
                }
                if ex.borrow().now_ms().saturating_sub(cb_start) >= ex.borrow().time_budget_ms {
                    if !ex.borrow().deferred_callbacks.is_empty() {
                        // Budget blown with work left: yield to the host
                        // and reschedule a continuation flush.
                        let (sched, ex2) = {
                            let mut e = ex.borrow_mut();
                            e.in_flush = false;
                            e.is_flush_scheduled = false;
                            (e.try_schedule_flush(), Rc::clone(ex))
                        };
                        if let Some(sched) = sched {
                            sched.schedule(Box::new(move || Self::flush_instance(&ex2)));
                        }
                        return;
                    }
                    break;
                }
            }
        }

        // Step 3: main poll loop with time-budget check.
        let poll_start = ex.borrow().now_ms();
        loop {
            let task_id = ex.borrow_mut().dequeue();
            let Some(tid) = task_id else {
                // Queues drained — normal exit.
                // NOTE(review): callbacks pushed into
                // `deferred_callbacks` while Step 3 was polling (the
                // signal hook sees `in_flush == true` and does not
                // schedule) are NOT drained or rescheduled here; they
                // wait until some later event triggers a flush.
                // Confirm this is intended.
                let mut e = ex.borrow_mut();
                e.is_flush_scheduled = false;
                e.in_flush = false;
                break;
            };

            // Take the task out so the poll doesn't hold an executor borrow.
            let maybe_state = ex.borrow_mut().tasks[tid as usize].take();
            if let Some(mut state) = maybe_state {
                let priority = state.priority;
                let scope_id = state.scope_id;

                // Check if the owning scope is suspended.
                let scope = crate::scope::find_scope(scope_id);
                if let Some(ref s) = scope {
                    if s.is_suspended() {
                        // Park the task back in its slot WITHOUT
                        // re-enqueueing; `enqueue_scope_tasks` re-queues
                        // it when the scope resumes.
                        let mut e = ex.borrow_mut();
                        if e.tasks[tid as usize].is_none() {
                            e.tasks[tid as usize] = Some(state);
                        }
                        continue;
                    }
                }

                // NOTE(review): `TaskWaker::wake` always re-queues into
                // the global thread-local `EXECUTOR`, not into this
                // `ex` instance — wakes for tasks polled on an isolated
                // executor appear to land on the global one. Confirm
                // whether instance executors are expected to be the
                // thread's EXECUTOR, or whether the waker should carry
                // an executor reference.
                let waker = Waker::from(Arc::new(TaskWaker {
                    task_id: tid,
                    priority,
                }));
                let mut cx = Context::from_waker(&waker);

                // Inject owning scope.
                let prev_scope = crate::scope::get_scope_direct();
                if scope.is_some() {
                    crate::scope::set_scope_direct(scope);
                }

                // Task isolation (non-Wasm).
                #[cfg(not(target_arch = "wasm32"))]
                let result: Result<Poll<()>, Box<dyn std::any::Any + Send>> =
                    std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                        state.future.as_mut().poll(&mut cx)
                    }));
                #[cfg(target_arch = "wasm32")]
                let poll = state.future.as_mut().poll(&mut cx);

                // Always restore the previous scope, even after a panic.
                crate::scope::set_scope_direct(prev_scope);

                #[cfg(not(target_arch = "wasm32"))]
                {
                    match result {
                        Ok(Poll::Ready(())) => {
                            ex.borrow_mut().free_slot(tid);
                        }
                        Err(payload) => {
                            // Notify the panic hook (if any) before freeing the slot.
                            let hook = ex.borrow().panic_hook.clone();
                            if let Some(h) = hook {
                                h(PanicInfo {
                                    task_id: tid,
                                    scope_id,
                                    payload,
                                });
                            }
                            ex.borrow_mut().free_slot(tid);
                        }
                        Ok(Poll::Pending) => {
                            // Restore the future, unless the poll itself
                            // already re-spawned into this slot.
                            let mut e = ex.borrow_mut();
                            if e.tasks[tid as usize].is_none() {
                                e.tasks[tid as usize] = Some(state);
                            }
                        }
                    }
                }
                #[cfg(target_arch = "wasm32")]
                {
                    match poll {
                        Poll::Ready(()) => {
                            ex.borrow_mut().free_slot(tid);
                        }
                        Poll::Pending => {
                            let mut e = ex.borrow_mut();
                            if e.tasks[tid as usize].is_none() {
                                e.tasks[tid as usize] = Some(state);
                            }
                        }
                    }
                }
            }

            // Time budget check.
            {
                let elapsed = ex.borrow().now_ms().saturating_sub(poll_start);
                if elapsed >= ex.borrow().time_budget_ms {
                    // Yield to the host; reschedule only if ready tasks
                    // remain in either queue.
                    let (maybe_sched, ex_clone) = {
                        let mut e = ex.borrow_mut();
                        e.is_flush_scheduled = false;
                        e.in_flush = false;
                        let sched = if !e.high_queue.is_empty() || !e.low_queue.is_empty() {
                            e.try_schedule_flush()
                        } else {
                            None
                        };
                        (sched, Rc::clone(ex))
                    };
                    if let Some(sched) = maybe_sched {
                        sched.schedule(Box::new(move || Self::flush_instance(&ex_clone)));
                    }
                    break;
                }
            }
        }
    }
}
548
549// ---------------------------------------------------------------------------
550// Current-executor storage — injectable, defaults to thread-local
551// ---------------------------------------------------------------------------
552
/// Shared handle to an [`Executor`] instance.
type ExecutorRef = Rc<RefCell<Executor>>;

thread_local! {
    /// The executor installed by [`with_executor`], if any. Consulted
    /// by the signal hook before falling back to the global `EXECUTOR`.
    static CURRENT_EXECUTOR: RefCell<Option<ExecutorRef>> = const { RefCell::new(None) };
}
558
559/// Run `f` with `ex` set as the current executor.
560///
561/// Signal callbacks and `spawn_global` calls inside `f` will be routed
562/// to `ex` instead of the global thread-local executor. Restores the
563/// previous executor afterward.
564///
565/// # Signal routing constraints
566///
567/// Auralis uses a **single global schedule hook** (installed once by the
568/// first call to [`init_flush_scheduler`]) that decides where signal
569/// notifications land by checking the current executor **at the time the
570/// notification fires**, not at the time `Signal::set` is called.
571///
572/// This design implies two hard requirements for multi-instance users:
573///
574/// 1. **`init_flush_scheduler` must be called at least once** — without
575/// it, `Signal::set` falls back to synchronous callback execution,
576/// which breaks the deferred-notification model and can cause
577/// re-entrant borrow panics.
578/// 2. **The instance executor must still be "current" when the flush
579/// runs** — if `with_executor` has already exited, deferred callbacks
580/// from signals set inside `f` will be routed to the global executor
581/// (or synchronously if no global hook is installed).
582///
583/// For the typical single-threaded case (Wasm, game loop, CLI), both
584/// requirements are satisfied trivially: call `init_flush_scheduler`
585/// once at startup and never use `with_executor`. For SSR / multi-tenant
586/// servers, ensure that `with_executor` wraps the entire request
587/// lifecycle — from signal creation through the final flush.
588///
589/// # Example
590///
591/// ```rust,ignore
592/// use auralis_task::Executor;
593///
594/// let ex = Executor::new_instance();
595/// Executor::install_flush_scheduler(&ex, my_scheduler);
596/// auralis_task::with_executor(&ex, || {
597/// // Signal notifications and task spawns here go to `ex`.
598/// });
599/// ```
600pub fn with_executor<R>(ex: &ExecutorRef, f: impl FnOnce() -> R) -> R {
601 CURRENT_EXECUTOR.with(|exec| {
602 let prev = exec.borrow_mut().replace(Rc::clone(ex));
603 let result = f();
604 *exec.borrow_mut() = prev;
605 result
606 })
607}
608
609/// Return the current executor, if any.
610///
611/// If no executor has been set via [`with_executor`], returns `None` —
612/// callers should fall back to the global thread-local executor.
613fn current_executor() -> Option<ExecutorRef> {
614 CURRENT_EXECUTOR.with(|exec| exec.borrow().clone())
615}
616
617// ---------------------------------------------------------------------------
618// Helpers — use thread_local EXECUTOR
619// ---------------------------------------------------------------------------
620
621fn drain_pending_wakes() {
622 PENDING_WAKES.with(|pw| {
623 let wakes = std::mem::take(&mut *pw.borrow_mut());
624 if wakes.is_empty() {
625 return;
626 }
627 EXECUTOR.with(|exec| {
628 let maybe_sched = {
629 let mut ex = exec.borrow_mut();
630 for (id, priority) in wakes {
631 match priority {
632 Priority::High => ex.high_queue.push_back(id),
633 Priority::Low => ex.low_queue.push_back(id),
634 }
635 }
636 ex.try_schedule_flush()
637 };
638 if let Some(sched) = maybe_sched {
639 sched.schedule(Box::new(flush));
640 }
641 });
642 });
643}
644
645// ---------------------------------------------------------------------------
646// Flush
647// ---------------------------------------------------------------------------
648
649fn flush() {
650 EXECUTOR.with(Executor::flush_instance);
651 // Drain any wakes that landed in PENDING_WAKES because the executor
652 // RefCell was borrowed during a callback or task poll.
653 drain_pending_wakes();
654}
655
656// ---------------------------------------------------------------------------
657// Public API
658// ---------------------------------------------------------------------------
659
660/// Set the platform flush scheduler and install the signal deferred-
661/// callback hook (idempotent — subsequent calls are no-ops for the hook).
662pub fn init_flush_scheduler(sched: Rc<dyn ScheduleFlush>) {
663 EXECUTOR.with(|exec| exec.borrow_mut().flush_scheduler = Some(sched));
664 install_signal_hook_once();
665}
666
667/// Install the hook that bridges `auralis_signal::Signal::set` to the
668/// executor's deferred-callback queue.
669///
670/// Idempotent — safe to call multiple times.
671fn install_signal_hook_once() {
672 use std::sync::OnceLock;
673 static INSTALLED: OnceLock<()> = OnceLock::new();
674 INSTALLED.get_or_init(|| {
675 auralis_signal::install_schedule_hook(Box::new(|cb: Box<dyn FnOnce()>| {
676 // Prefer the current executor (set via `with_executor`) for
677 // SSR multi-request isolation; fall back to the global one.
678 if let Some(ex) = current_executor() {
679 let maybe_sched = {
680 let mut e = ex.borrow_mut();
681 e.deferred_callbacks.push(cb);
682 if e.in_flush {
683 None
684 } else {
685 e.try_schedule_flush()
686 }
687 };
688 if let Some(sched) = maybe_sched {
689 let ex2 = Rc::clone(&ex);
690 sched.schedule(Box::new(move || Executor::flush_instance(&ex2)));
691 }
692 } else {
693 EXECUTOR.with(|exec| {
694 let maybe_sched = {
695 let mut ex = exec.borrow_mut();
696 ex.deferred_callbacks.push(cb);
697 if ex.in_flush {
698 None
699 } else {
700 ex.try_schedule_flush()
701 }
702 };
703 if let Some(sched) = maybe_sched {
704 sched.schedule(Box::new(flush));
705 }
706 });
707 }
708 }));
709 });
710}
711
712/// Set the platform time source used for time-budget accounting.
713///
714/// If no [`TimeSource`] is registered the executor runs every flush to
715/// completion without yielding, which is acceptable for short-running
716/// workloads but may cause frame drops in the browser.
717pub fn init_time_source(ts: Rc<dyn TimeSource>) {
718 EXECUTOR.with(|exec| exec.borrow_mut().time_source = Some(ts));
719}
720
721/// Set the per-flush time budget on the global executor.
722///
723/// See [`Executor::set_time_budget`] for details.
724pub fn set_global_time_budget(budget_ms: u64) {
725 EXECUTOR.with(|exec| exec.borrow_mut().time_budget_ms = budget_ms);
726}
727
728/// Register a global panic hook called when any globally-spawned
729/// task panics.
730///
731/// See [`Executor::set_panic_hook`] for details.
732pub fn set_panic_hook(hook: Rc<dyn Fn(PanicInfo)>) {
733 EXECUTOR.with(|exec| exec.borrow_mut().panic_hook = Some(hook));
734}
735
736/// Remove the global panic hook, restoring the default silent
737/// behaviour.
738pub fn remove_panic_hook() {
739 EXECUTOR.with(|exec| exec.borrow_mut().panic_hook = None);
740}
741
/// Spawn a future on the global executor at low priority.
///
/// Equivalent to [`spawn_global_with_priority`] with [`Priority::Low`];
/// the task is unscoped (`scope_id` 0).
pub fn spawn_global(future: impl Future<Output = ()> + 'static) {
    spawn_global_with_priority(Priority::Low, future);
}
746
/// Spawn a future on the global executor at the given priority.
///
/// The task is unscoped (`scope_id` 0) and its id is discarded.
pub fn spawn_global_with_priority(priority: Priority, future: impl Future<Output = ()> + 'static) {
    spawn_inner(Box::pin(future), priority, 0);
}
751
/// Spawn a future owned by scope `scope_id` on the global executor,
/// returning the new task's id.
pub(crate) fn spawn_scoped(
    priority: Priority,
    scope_id: u64,
    future: impl Future<Output = ()> + 'static,
) -> TaskId {
    spawn_inner(Box::pin(future), priority, scope_id)
}
759
760fn spawn_inner(
761 future: Pin<Box<dyn Future<Output = ()> + 'static>>,
762 priority: Priority,
763 scope_id: u64,
764) -> TaskId {
765 EXECUTOR.with(|exec| {
766 let (task_id, maybe_sched) = {
767 let mut ex = exec.borrow_mut();
768 let task_id = ex.allocate_id();
769 ex.tasks[task_id as usize] = Some(TaskState {
770 future,
771 priority,
772 scope_id,
773 });
774 ex.enqueue(task_id);
775 let sched = ex.try_schedule_flush();
776 (task_id, sched)
777 };
778 // Schedule outside the borrow.
779 if let Some(sched) = maybe_sched {
780 sched.schedule(Box::new(flush));
781 }
782 task_id
783 })
784}
785
786/// Enqueue all tasks belonging to `scope_id` and trigger a flush.
787///
788/// Used by [`TaskScope::resume`] to restart tasks after a suspend.
789pub(crate) fn enqueue_scope_tasks(scope_id: u64) {
790 EXECUTOR.with(|exec| {
791 let task_ids: Vec<TaskId> = {
792 let ex = exec.borrow();
793 ex.tasks
794 .iter()
795 .enumerate()
796 .filter(|(_, slot)| slot.as_ref().is_some_and(|t| t.scope_id == scope_id))
797 .map(|(idx, _)| idx as TaskId)
798 .collect()
799 };
800 let maybe_sched = {
801 let mut ex = exec.borrow_mut();
802 for tid in task_ids {
803 ex.enqueue(tid);
804 }
805 if ex.in_flush {
806 None
807 } else {
808 ex.try_schedule_flush()
809 }
810 };
811 if let Some(sched) = maybe_sched {
812 sched.schedule(Box::new(flush));
813 }
814 });
815}
816
817pub(crate) fn cancel_scope_tasks(scope_id: u64) -> Vec<Pin<Box<dyn Future<Output = ()>>>> {
818 EXECUTOR.with(|exec| {
819 let mut ex = exec.borrow_mut();
820 let mut dropped = Vec::new();
821
822 for slot in &mut ex.tasks {
823 if let Some(ref t) = slot {
824 if t.scope_id == scope_id {
825 dropped.push(
826 slot.take()
827 .expect("task slot was None after is_some check")
828 .future,
829 );
830 }
831 }
832 }
833
834 // Filter queues.
835 let high: Vec<TaskId> = ex
836 .high_queue
837 .iter()
838 .filter(|id| ex.tasks[**id as usize].is_some())
839 .copied()
840 .collect();
841 ex.high_queue.clear();
842 ex.high_queue.extend(high);
843
844 let low: Vec<TaskId> = ex
845 .low_queue
846 .iter()
847 .filter(|id| ex.tasks[**id as usize].is_some())
848 .copied()
849 .collect();
850 ex.low_queue.clear();
851 ex.low_queue.extend(low);
852
853 let mut all_free: Vec<TaskId> = ex
854 .tasks
855 .iter()
856 .enumerate()
857 .filter(|(_, s)| s.is_none())
858 .map(|(i, _)| i as TaskId)
859 .chain(ex.free_slots.iter().copied())
860 .collect();
861 all_free.sort_unstable();
862 all_free.dedup();
863 ex.free_slots = all_free;
864
865 dropped
866 })
867}
868
869// ---------------------------------------------------------------------------
870// yield_now
871// ---------------------------------------------------------------------------
872
/// Return a [`Future`] that yields control back to the executor once.
///
/// The first poll requests an immediate re-wake and reports
/// [`Poll::Pending`]; every poll after that completes.
#[must_use = "yield_now() does nothing unless awaited"]
pub fn yield_now() -> YieldNow {
    YieldNow { yielded: false }
}

/// Future returned by [`yield_now`].
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct YieldNow {
    yielded: bool,
}

impl Future for YieldNow {
    type Output = ();

    // Pending exactly once: flip the flag, self-wake, then complete on
    // the next poll.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if !self.yielded {
            self.yielded = true;
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        Poll::Ready(())
    }
}
899
900// ---------------------------------------------------------------------------
901// schedule_callback — hook for auralis-signal's deferred callback model
902// ---------------------------------------------------------------------------
903
904/// Schedule a closure to run at the start of the next executor flush.
905///
906/// Used internally by `auralis_signal` to defer subscriber callback
907/// execution. The closure is drained before the main poll loop.
908pub fn schedule_callback(f: Box<dyn FnOnce()>) {
909 EXECUTOR.with(|exec| {
910 let maybe_sched = {
911 let mut ex = exec.borrow_mut();
912 ex.deferred_callbacks.push(f);
913 if ex.in_flush {
914 None
915 } else {
916 ex.try_schedule_flush()
917 }
918 };
919 if let Some(sched) = maybe_sched {
920 sched.schedule(Box::new(flush));
921 }
922 });
923}
924
925// ---------------------------------------------------------------------------
926// set_deferred
927// ---------------------------------------------------------------------------
928
929/// Schedule a [`Signal::set`] call for the **next** executor flush.
930///
931/// Safe to call from inside [`Drop`] — the actual `signal.set(value)` is
932/// deferred to a subsequent flush, avoiding re-entrant borrow panics.
933pub fn set_deferred<T: 'static>(signal: &Signal<T>, value: T) {
934 let signal = signal.clone();
935 EXECUTOR.with(|exec| {
936 let maybe_sched = {
937 let mut ex = exec.borrow_mut();
938 ex.deferred_ops.push(DeferredOp {
939 f: Box::new(move || signal.set(value)),
940 });
941 ex.try_schedule_flush()
942 };
943 if let Some(sched) = maybe_sched {
944 sched.schedule(Box::new(flush));
945 }
946 });
947}
948
949// ---------------------------------------------------------------------------
950// Test / debug helpers
951// ---------------------------------------------------------------------------
952
953/// Completely reset the global executor to a pristine state.
954///
955/// Clears all task slots, queues, deferred ops, flush/scheduler flags,
956/// and injected [`ScheduleFlush`]/[`TimeSource`]. Call at the start
957/// of every test to prevent cross-test state leakage.
958///
959/// # Safety / usage
960///
961/// This function is intended **only** for testing. Calling it while
962/// the executor is processing tasks will silently drop all live
963/// futures and may cause panics or undefined behavior in running
964/// application code.
965pub fn reset_executor_for_test() {
966 PENDING_WAKES.with(|pw| pw.borrow_mut().clear());
967 EXECUTOR.with(|exec| {
968 let mut ex = exec.borrow_mut();
969 ex.high_queue.clear();
970 ex.low_queue.clear();
971 ex.tasks.clear();
972 ex.free_slots.clear();
973 ex.next_task_id = 0;
974 ex.is_flush_scheduled = false;
975 ex.in_flush = false;
976 ex.deferred_ops.clear();
977 ex.deferred_callbacks.clear();
978 ex.flush_scheduler = None;
979 ex.time_source = None;
980 });
981 crate::scope::clear_scope_registry();
982}
983
/// Number of live (not-yet-completed) tasks on the global executor.
#[cfg(test)]
pub(crate) fn debug_task_count() -> usize {
    EXECUTOR.with(|exec| exec.borrow().active_task_count())
}
988
/// Return a snapshot of all active tasks: `(task_id, priority, scope_id)`.
#[cfg(feature = "debug")]
pub(crate) fn debug_task_snapshot() -> Vec<(TaskId, Priority, u64)> {
    EXECUTOR.with(|exec| {
        exec.borrow()
            .tasks
            .iter()
            .enumerate()
            .filter_map(|(idx, slot)| {
                slot.as_ref().map(|t| (idx as u64, t.priority, t.scope_id))
            })
            .collect()
    })
}
1003
/// Return the set of task IDs currently in the ready queues.
#[cfg(feature = "debug")]
pub(crate) fn debug_queued_task_ids() -> Vec<TaskId> {
    EXECUTOR.with(|exec| {
        let ex = exec.borrow();
        let mut ids: Vec<TaskId> = Vec::with_capacity(ex.high_queue.len() + ex.low_queue.len());
        ids.extend(ex.high_queue.iter().copied());
        ids.extend(ex.low_queue.iter().copied());
        // Deduplicate: a task may appear in a queue more than once.
        ids.sort_unstable();
        ids.dedup();
        ids
    })
}
1020
/// Spawn a task without triggering an automatic flush.
/// Lets tests batch several spawns and then drive execution manually.
#[cfg(test)]
pub(crate) fn spawn_no_auto_flush(
    priority: Priority,
    future: impl Future<Output = ()> + 'static,
) -> TaskId {
    EXECUTOR.with(|exec| {
        let mut ex = exec.borrow_mut();
        let tid = ex.allocate_id();
        let state = TaskState {
            future: Box::pin(future),
            priority,
            scope_id: 0,
        };
        ex.tasks[tid as usize] = Some(state);
        ex.enqueue(tid);
        // Deliberately no try_schedule_flush(): the test drives flushes.
        tid
    })
}
1041
/// Run a manual flush cycle (for tests that need to control timing).
///
/// Delegates to the module-private `flush`, which also drains
/// `PENDING_WAKES` after the flush completes.
#[cfg(test)]
pub(crate) fn flush_all() {
    flush();
}