auralis_task/executor.rs
1//! Single-threaded executor with priority scheduling, time-budget
2//! awareness, and deferred-signal support.
3//!
4//! ## Architecture
5//!
6//! The executor is stored via a pluggable [`ExecutorStorage`] strategy
7//! (defaulting to a per-thread slot). Before polling a task the future
8//! is **temporarily removed** so that the poll never holds an executor
9//! borrow — this allows nested spawns, wakes, and `set_deferred` calls
10//! without `RefCell` panics.
11//!
12//! The waker carries only a `task_id: u64`, making it trivially
13//! [`Send`] + [`Sync`] for [`Waker::from`].
14
15#![allow(clippy::cast_possible_truncation)]
16
17use std::cell::{Cell, RefCell};
18use std::collections::{BTreeMap, VecDeque};
19use std::future::Future;
20use std::pin::Pin;
21use std::rc::Rc;
22use std::sync::Arc;
23use std::task::{Context, Poll, Wake, Waker};
24
25use auralis_signal::Signal;
26
27use crate::Priority;
28
29// ---------------------------------------------------------------------------
30// Types
31// ---------------------------------------------------------------------------
32
33type TaskId = u64;
34
35// ---------------------------------------------------------------------------
36// ScheduleFlush
37// ---------------------------------------------------------------------------
38
39/// Platform hook for scheduling a microtask callback.
pub trait ScheduleFlush {
    /// Request that `callback` runs at the next microtask boundary.
    ///
    /// The executor calls this only after releasing its own borrow, so
    /// an implementation may run `callback` synchronously (as the test
    /// implementation does) without triggering a re-entrant borrow.
    fn schedule(&self, callback: Box<dyn FnOnce()>);
}
44
45/// A [`ScheduleFlush`] that fires the callback synchronously.
46///
47/// Makes the executor run-to-completion in unit tests without a browser
48/// event loop.
#[cfg(test)]
pub struct TestScheduleFlush;

#[cfg(test)]
impl ScheduleFlush for TestScheduleFlush {
    // Run the callback immediately instead of deferring to a microtask,
    // so a test observes a fully-drained executor right after the call
    // that triggered the flush.
    fn schedule(&self, callback: Box<dyn FnOnce()>) {
        callback();
    }
}
58
59// ---------------------------------------------------------------------------
60// TimeSource
61// ---------------------------------------------------------------------------
62
63/// High-resolution time source for the executor's time-budget
64/// accounting.
65///
66/// When registered via [`init_time_source`], the executor queries this
67/// before and after each task poll to decide whether it should yield
68/// control back to the host event loop (default budget: 8 ms).
69///
70/// In Wasm environments the implementation typically delegates to
71/// `performance.now()`. If no [`TimeSource`] is registered the time
72/// budget check is a no-op and the executor runs tasks until the
73/// queues are drained.
pub trait TimeSource {
    /// Return the current time in milliseconds.
    fn now_ms(&self) -> u64;
}

/// A [`TimeSource`] backed by a manually-driven clock.
///
/// Tests move the clock explicitly with [`set`](TestTimeSource::set)
/// or [`advance`](TestTimeSource::advance) to simulate the passage of
/// time during a flush cycle.
#[cfg(test)]
pub struct TestTimeSource {
    // Interior mutability: the clock must move through `&self`.
    now: std::cell::Cell<u64>,
}

#[cfg(test)]
impl TestTimeSource {
    /// Create a [`TestTimeSource`] whose clock starts at `initial_ms`.
    #[must_use]
    pub fn new(initial_ms: u64) -> Self {
        let now = std::cell::Cell::new(initial_ms);
        Self { now }
    }

    /// Jump the clock to the absolute time `ms` milliseconds.
    pub fn set(&self, ms: u64) {
        self.now.set(ms);
    }

    /// Move the clock forward by `ms` milliseconds.
    pub fn advance(&self, ms: u64) {
        let next = self.now.get() + ms;
        self.now.set(next);
    }
}

#[cfg(test)]
impl TimeSource for TestTimeSource {
    fn now_ms(&self) -> u64 {
        self.now.get()
    }
}
115
116// ---------------------------------------------------------------------------
117// TaskWaker
118// ---------------------------------------------------------------------------
119
// Waker payload: carries only plain data (no pointers into the
// executor), which keeps it trivially Send + Sync as required for
// `Waker::from(Arc<TaskWaker>)`.
struct TaskWaker {
    // Id of the task to re-enqueue on wake.
    task_id: TaskId,
    // Which ready queue the task belongs on.
    priority: Priority,
}
124
// NOTE(review): wakes always target the global thread-local EXECUTOR,
// even when the task is being polled by an isolated instance created
// via `Executor::new_instance`. If instance tasks rely on wakers (e.g.
// `yield_now`), confirm their ids cannot be misrouted to the global
// executor's queues.
impl Wake for TaskWaker {
    fn wake(self: Arc<Self>) {
        let maybe_sched = EXECUTOR.with(|exec| {
            // `try_borrow_mut`: a wake may fire while the executor is
            // already borrowed (e.g. from inside a signal callback).
            if let Ok(mut ex) = exec.try_borrow_mut() {
                match self.priority {
                    Priority::High => ex.high_queue.push_back(self.task_id),
                    Priority::Low => ex.low_queue.push_back(self.task_id),
                }
                // Only schedule a fresh flush if we're NOT already inside
                // one (the running flush loop will pick up the task).
                if ex.in_flush {
                    None
                } else {
                    ex.try_schedule_flush()
                }
            } else {
                // Executor busy: park the wake. `flush()` drains
                // PENDING_WAKES after every cycle.
                PENDING_WAKES.with(|pw| {
                    pw.borrow_mut().push((self.task_id, self.priority));
                });
                None
            }
        });
        // Invoke the scheduler only after the executor borrow above has
        // been released, so a synchronous scheduler cannot re-enter it.
        if let Some(sched) = maybe_sched {
            sched.schedule(Box::new(flush));
        }
    }
}
152
153// ---------------------------------------------------------------------------
154// TaskState
155// ---------------------------------------------------------------------------
156
// A live task: the pinned future plus the metadata needed to requeue
// and scope it.
struct TaskState {
    future: Pin<Box<dyn Future<Output = ()> + 'static>>,
    // Ready queue the task is re-enqueued on when woken.
    priority: Priority,
    // Owning scope; 0 marks a global (scope-less) task.
    scope_id: u64,
}
162
163// ---------------------------------------------------------------------------
164// Executor
165// ---------------------------------------------------------------------------
166
167/// Information about a task panic, passed to the user-registered
168/// [`set_panic_hook`].
pub struct PanicInfo {
    /// The executor-assigned task id.
    pub task_id: u64,
    /// The scope that owned the task (0 for global tasks).
    pub scope_id: u64,
    /// The boxed panic payload, as captured by `catch_unwind` around
    /// the task poll (non-Wasm builds).
    pub payload: Box<dyn std::any::Any + Send>,
}

// `#[derive(Debug)]` does not compile here: `dyn Any + Send` has no
// `Debug` impl. Provide it manually, rendering the payload as a string
// when it is one (the common case for `panic!("...")`).
impl std::fmt::Debug for PanicInfo {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let payload: &dyn std::fmt::Debug = if let Some(s) = self.payload.downcast_ref::<&str>() {
            s
        } else if let Some(s) = self.payload.downcast_ref::<String>() {
            s
        } else {
            &"<non-string panic payload>"
        };
        f.debug_struct("PanicInfo")
            .field("task_id", &self.task_id)
            .field("scope_id", &self.scope_id)
            .field("payload", payload)
            .finish()
    }
}
178
179/// A single-threaded async task executor with priority queues.
180///
181/// Each [`Executor`] manages its own task slots, ready queues, and
182/// deferred callback buffers. Use [`Executor::new_instance`] to create
183/// an isolated executor (e.g. per SSR request), or use the global
184/// thread-local executor via [`spawn_global`](crate::spawn_global).
pub struct Executor {
    /// Ready queue for [`Priority::High`] tasks; drained before `low_queue`.
    high_queue: VecDeque<TaskId>,
    /// Ready queue for [`Priority::Low`] tasks.
    low_queue: VecDeque<TaskId>,
    /// Task slots indexed by `TaskId`; `None` marks a free slot.
    tasks: Vec<Option<TaskState>>,
    /// Slot indices available for reuse by `allocate_id`.
    free_slots: Vec<TaskId>,
    /// Next never-used task id (grows in lockstep with `tasks`).
    next_task_id: TaskId,
    /// True while a flush callback is pending with the scheduler.
    is_flush_scheduled: bool,
    /// True while `flush_instance` is running (re-entrancy guard).
    in_flush: bool,
    /// Operations queued by `set_deferred`; run at Step 1 of a flush.
    deferred_ops: Vec<DeferredOp>,
    /// Callbacks pushed by `Signal::set` via the schedule hook.
    /// Drained at the start of every flush before polling tasks.
    deferred_callbacks: Vec<Box<dyn FnOnce()>>,
    /// Platform hook used to schedule flush callbacks (microtasks).
    flush_scheduler: Option<Rc<dyn ScheduleFlush>>,
    /// Clock for time-budget accounting; `None` disables budgeting.
    time_source: Option<Rc<dyn TimeSource>>,
    /// Maximum milliseconds to spend inside a single flush before
    /// yielding back to the host event loop. Default: 8 ms.
    time_budget_ms: u64,
    /// Optional hook invoked when a spawned task panics.
    panic_hook: Option<Rc<dyn Fn(PanicInfo)>>,
    /// Timer queue: map from deadline (ms) to task ids that should be
    /// woken when that deadline expires. Processed at the start of
    /// every flush.
    timers: BTreeMap<u64, Vec<TaskId>>,
}
209
210// Set by the executor before polling a task, cleared afterward.
211// Lets futures discover their task id without threading it through
212// layers of combinators.
thread_local! {
    // Id of the task currently being polled; set by `flush_instance`
    // around each poll and `None` at all other times.
    static CURRENT_POLLING_TASK: Cell<Option<TaskId>> = const { Cell::new(None) };
}

/// Run `f` with the id of the task currently being polled (if any).
///
/// Outside of a task poll the closure receives `None`.
pub(crate) fn with_current_polling_task<R>(f: impl FnOnce(Option<TaskId>) -> R) -> R {
    CURRENT_POLLING_TASK.with(|c| f(c.get()))
}
220
// A deferred operation queued by `set_deferred`; executed at Step 1 of
// the next flush, before signal callbacks and task polls.
struct DeferredOp {
    f: Box<dyn FnOnce()>,
}
224
impl Executor {
    /// Create an empty executor: no tasks, no scheduler, time source,
    /// or panic hook installed, and the default 8 ms time budget.
    fn new() -> Self {
        Self {
            high_queue: VecDeque::new(),
            low_queue: VecDeque::new(),
            tasks: Vec::new(),
            free_slots: Vec::new(),
            next_task_id: 0,
            is_flush_scheduled: false,
            in_flush: false,
            deferred_ops: Vec::new(),
            deferred_callbacks: Vec::new(),
            flush_scheduler: None,
            time_source: None,
            time_budget_ms: 8,
            panic_hook: None,
            timers: BTreeMap::new(),
        }
    }

    /// Allocate a task id, preferring a recycled slot; otherwise grow
    /// `tasks` so the returned id always indexes a valid slot.
    fn allocate_id(&mut self) -> TaskId {
        if let Some(id) = self.free_slots.pop() {
            return id;
        }
        let id = self.next_task_id;
        self.next_task_id += 1;
        self.tasks.push(None);
        id
    }

    /// Drop whatever occupies `task_id`'s slot and recycle the slot.
    fn free_slot(&mut self, task_id: TaskId) {
        self.tasks[task_id as usize] = None;
        self.free_slots.push(task_id);
    }

    /// Push `task_id` onto the ready queue matching its priority; a
    /// no-op when the slot is empty (task already completed/cancelled).
    fn enqueue(&mut self, task_id: TaskId) {
        let priority = match self.tasks.get(task_id as usize).and_then(Option::as_ref) {
            Some(t) => t.priority,
            None => return,
        };
        match priority {
            Priority::High => self.high_queue.push_back(task_id),
            Priority::Low => self.low_queue.push_back(task_id),
        }
    }

    /// Pop the next ready task id, servicing the high queue first.
    fn dequeue(&mut self) -> Option<TaskId> {
        self.high_queue
            .pop_front()
            .or_else(|| self.low_queue.pop_front())
    }

    /// Mark that a flush is needed and return the scheduler if one is
    /// registered. The caller **must** invoke the scheduler **after**
    /// releasing the executor borrow.
    fn try_schedule_flush(&mut self) -> Option<Rc<dyn ScheduleFlush>> {
        if self.is_flush_scheduled {
            return None;
        }
        self.is_flush_scheduled = true;
        self.flush_scheduler.clone()
    }

    /// Return the current time in ms, or 0 if no [`TimeSource`] is
    /// registered. When this returns 0 the time-budget check is
    /// effectively a no-op.
    pub(crate) fn now_ms(&self) -> u64 {
        self.time_source.as_ref().map_or(0, |ts| ts.now_ms())
    }

    /// Return the number of currently active (not-yet-completed) tasks.
    ///
    /// Used by streaming SSR to determine whether the stream should
    /// wait for more work or terminate.
    #[must_use]
    pub fn active_task_count(&self) -> usize {
        self.tasks.iter().filter(|t| t.is_some()).count()
    }
}
304
305// ---------------------------------------------------------------------------
306// Thread-local globals (default storage)
307// ---------------------------------------------------------------------------
308
309thread_local! {
310 static EXECUTOR: Rc<RefCell<Executor>> = Rc::new(RefCell::new(Executor::new()));
311 static PENDING_WAKES: RefCell<Vec<(TaskId, Priority)>> = const { RefCell::new(Vec::new()) };
312}
313
314// ---------------------------------------------------------------------------
315// Executor instance methods (for isolated executors, e.g. SSR)
316// ---------------------------------------------------------------------------
317
318impl Executor {
319 /// Create a new isolated executor, wrapped for shared access.
320 ///
321 /// The returned executor is independent of the global thread-local
322 /// executor. Use [`with_executor`] to make it the current executor
323 /// for the duration of a closure, so that spawned tasks and signal
324 /// callbacks are routed to it.
325 #[must_use]
326 pub fn new_instance() -> Rc<RefCell<Executor>> {
327 Rc::new(RefCell::new(Executor::new()))
328 }
329
330 /// Install a flush scheduler on this executor instance.
331 pub fn install_flush_scheduler(ex: &Rc<RefCell<Executor>>, sched: Rc<dyn ScheduleFlush>) {
332 ex.borrow_mut().flush_scheduler = Some(sched);
333 }
334
335 /// Install a time source on this executor instance.
336 pub fn install_time_source(ex: &Rc<RefCell<Executor>>, ts: Rc<dyn TimeSource>) {
337 ex.borrow_mut().time_source = Some(ts);
338 }
339
340 /// Set the maximum time (in milliseconds) a single flush may spend
341 /// before yielding back to the host event loop.
342 ///
343 /// The default is 8 ms. Set to `u64::MAX` to disable time-budget
344 /// yielding (flush runs to completion).
345 pub fn set_time_budget(ex: &Rc<RefCell<Executor>>, budget_ms: u64) {
346 ex.borrow_mut().time_budget_ms = budget_ms;
347 }
348
349 /// Register a callback invoked whenever a spawned task panics.
350 ///
351 /// The default is no hook — panicking tasks are silently removed
352 /// from the executor (the same behaviour as a task returning
353 /// `Poll::Ready(())`).
354 ///
355 /// # Example
356 ///
357 /// ```rust,ignore
358 /// Executor::set_panic_hook(&ex, Rc::new(|info| {
359 /// eprintln!("task {} in scope {} panicked", info.task_id, info.scope_id);
360 /// }));
361 /// ```
362 pub fn set_panic_hook(ex: &Rc<RefCell<Executor>>, hook: Rc<dyn Fn(PanicInfo)>) {
363 ex.borrow_mut().panic_hook = Some(hook);
364 }
365
    /// Register a timer: when `now_ms() >= deadline_ms`, enqueue
    /// `task_id` so it gets polled on the next flush.
    pub(crate) fn schedule_timer(ex: &Rc<RefCell<Executor>>, deadline_ms: u64, task_id: TaskId) {
        let mut e = ex.borrow_mut();
        e.timers.entry(deadline_ms).or_default().push(task_id);
        // Request a flush so the timer is checked.
        // The flag is force-cleared so a flush gets scheduled even if
        // one is pending or currently running: a running flush has
        // already passed its timer-drain step (Step 0), so only a fresh
        // flush can re-check this deadline. The cost is an occasional
        // redundant no-op flush. NOTE(review): with a *synchronous*
        // scheduler and `in_flush == true` the nested flush is a no-op
        // and this deadline is not re-checked — confirm timers are only
        // used with asynchronous schedulers.
        e.is_flush_scheduled = false;
        let maybe_sched = e.try_schedule_flush();
        // Release the borrow before invoking the scheduler; a
        // synchronous scheduler would otherwise re-borrow and panic.
        drop(e);
        if let Some(sched) = maybe_sched {
            let ex2 = Rc::clone(ex);
            sched.schedule(Box::new(move || Self::flush_instance(&ex2)));
        }
    }
380
    /// Spawn a future on this executor instance.
    ///
    /// The task is stored at [`Priority::Low`] with no owning scope
    /// (`scope_id` 0), enqueued, and a flush is requested if none is
    /// already pending.
    pub fn spawn(ex: &Rc<RefCell<Executor>>, future: impl Future<Output = ()> + 'static) {
        let maybe_sched = {
            let mut e = ex.borrow_mut();
            let tid = e.allocate_id();
            e.tasks[tid as usize] = Some(TaskState {
                future: Box::pin(future),
                priority: Priority::Low,
                scope_id: 0,
            });
            e.enqueue(tid);
            e.try_schedule_flush()
        };
        // Invoke the scheduler only after the borrow above is released.
        if let Some(sched) = maybe_sched {
            let ex2 = Rc::clone(ex);
            sched.schedule(Box::new(move || Self::flush_instance(&ex2)));
        }
    }
399
400 /// Run a full flush cycle on this executor instance.
401 ///
402 /// Mirrors the global flush cycle but operates on an
403 /// isolated executor (used for SSR). Includes all the same
404 /// protections: `catch_unwind`, suspend checks, time-budget
405 /// yielding, and callback-drain budget.
    #[allow(clippy::too_many_lines)]
    pub fn flush_instance(ex: &Rc<RefCell<Executor>>) {
        // Guard against re-entrant flushes.
        {
            let mut e = ex.borrow_mut();
            if e.in_flush {
                #[cfg(debug_assertions)]
                {
                    eprintln!(
                        "[auralis-task] WARNING: Executor::flush_instance called \
                         re-entrantly (already inside a flush). This is a no-op. \
                         Check for nested flush() calls in signal callbacks or \
                         ScheduleFlush implementations."
                    );
                }
                return;
            }
            e.in_flush = true;
        }

        // Step 0: drain expired timers.
        // Deadlines are BTreeMap keys, so they come out sorted; stop at
        // the first deadline still in the future. Skipped entirely when
        // no TimeSource is installed (now == 0).
        {
            let mut e = ex.borrow_mut();
            let now = e.now_ms();
            if now > 0 {
                let expired: Vec<u64> = e.timers.keys().copied().take_while(|&d| d <= now).collect();
                for deadline in expired {
                    if let Some(tasks) = e.timers.remove(&deadline) {
                        for tid in tasks {
                            e.enqueue(tid);
                        }
                    }
                }
            }
        }

        // Step 1: deferred ops.
        // Take the vec first so the ops run without an executor borrow
        // held (they may call Signal::set, which re-enters).
        let deferred = std::mem::take(&mut ex.borrow_mut().deferred_ops);
        for op in deferred {
            (op.f)();
        }

        // Step 2: drain deferred signal callbacks with time budget.
        // Loop because a callback may push further callbacks; re-take
        // the queue until it comes back empty or the budget is spent.
        {
            let cb_start = ex.borrow().now_ms();
            loop {
                let callbacks = std::mem::take(&mut ex.borrow_mut().deferred_callbacks);
                if callbacks.is_empty() {
                    break;
                }
                for cb in callbacks {
                    cb();
                }
                if ex.borrow().now_ms().saturating_sub(cb_start) >= ex.borrow().time_budget_ms {
                    if !ex.borrow().deferred_callbacks.is_empty() {
                        // Budget spent with work remaining: end this
                        // flush and hand a continuation to the host
                        // scheduler (clear the flag so try_schedule
                        // actually fires).
                        let (sched, ex2) = {
                            let mut e = ex.borrow_mut();
                            e.in_flush = false;
                            e.is_flush_scheduled = false;
                            (e.try_schedule_flush(), Rc::clone(ex))
                        };
                        if let Some(sched) = sched {
                            sched.schedule(Box::new(move || Self::flush_instance(&ex2)));
                        }
                        return;
                    }
                    break;
                }
            }
        }

        // Step 3: main poll loop with time-budget check.
        let poll_start = ex.borrow().now_ms();
        loop {
            let task_id = ex.borrow_mut().dequeue();
            let Some(tid) = task_id else {
                // Queues drained: clear both flags and end the flush.
                let mut e = ex.borrow_mut();
                e.is_flush_scheduled = false;
                e.in_flush = false;
                break;
            };

            // Take the task out so the poll doesn't hold an executor borrow.
            let maybe_state = ex.borrow_mut().tasks[tid as usize].take();
            if let Some(mut state) = maybe_state {
                let priority = state.priority;
                let scope_id = state.scope_id;

                // Check if the owning scope is suspended.
                let scope = crate::scope::find_scope(scope_id);
                if let Some(ref s) = scope {
                    if s.is_suspended() {
                        // Park the task without polling; it is
                        // re-enqueued by `enqueue_scope_tasks` on resume.
                        let mut e = ex.borrow_mut();
                        if e.tasks[tid as usize].is_none() {
                            e.tasks[tid as usize] = Some(state);
                        }
                        continue;
                    }
                }

                let waker = Waker::from(Arc::new(TaskWaker {
                    task_id: tid,
                    priority,
                }));
                let mut cx = Context::from_waker(&waker);

                // Inject owning scope.
                let prev_scope = crate::scope::get_scope_direct();
                if scope.is_some() {
                    crate::scope::set_scope_direct(scope);
                }

                // Let futures discover their task id (used by timer::sleep).
                CURRENT_POLLING_TASK.with(|c| c.set(Some(tid)));

                // Task isolation (non-Wasm).
                #[cfg(not(target_arch = "wasm32"))]
                let result: Result<Poll<()>, Box<dyn std::any::Any + Send>> =
                    std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                        state.future.as_mut().poll(&mut cx)
                    }));
                #[cfg(target_arch = "wasm32")]
                let poll = state.future.as_mut().poll(&mut cx);

                // Restore poll-context state regardless of outcome.
                CURRENT_POLLING_TASK.with(|c| c.set(None));
                crate::scope::set_scope_direct(prev_scope);

                #[cfg(not(target_arch = "wasm32"))]
                {
                    match result {
                        Ok(Poll::Ready(())) => {
                            ex.borrow_mut().free_slot(tid);
                        }
                        Err(payload) => {
                            // Notify the panic hook (if any) before freeing the slot.
                            let hook = ex.borrow().panic_hook.clone();
                            if let Some(h) = hook {
                                h(PanicInfo {
                                    task_id: tid,
                                    scope_id,
                                    payload,
                                });
                            }
                            ex.borrow_mut().free_slot(tid);
                        }
                        Ok(Poll::Pending) => {
                            // Defensive: only restore the future if the
                            // slot was not re-populated during the poll.
                            let mut e = ex.borrow_mut();
                            if e.tasks[tid as usize].is_none() {
                                e.tasks[tid as usize] = Some(state);
                            }
                        }
                    }
                }
                #[cfg(target_arch = "wasm32")]
                {
                    match poll {
                        Poll::Ready(()) => {
                            ex.borrow_mut().free_slot(tid);
                        }
                        Poll::Pending => {
                            let mut e = ex.borrow_mut();
                            if e.tasks[tid as usize].is_none() {
                                e.tasks[tid as usize] = Some(state);
                            }
                        }
                    }
                }
            }

            // Time budget check.
            {
                let elapsed = ex.borrow().now_ms().saturating_sub(poll_start);
                if elapsed >= ex.borrow().time_budget_ms {
                    // Out of budget: end this flush and, if ready tasks
                    // remain, hand a continuation to the host scheduler.
                    let (maybe_sched, ex_clone) = {
                        let mut e = ex.borrow_mut();
                        e.is_flush_scheduled = false;
                        e.in_flush = false;
                        let sched = if !e.high_queue.is_empty() || !e.low_queue.is_empty() {
                            e.try_schedule_flush()
                        } else {
                            None
                        };
                        (sched, Rc::clone(ex))
                    };
                    if let Some(sched) = maybe_sched {
                        sched.schedule(Box::new(move || Self::flush_instance(&ex_clone)));
                    }
                    break;
                }
            }
        }
    }
598}
599
600// ---------------------------------------------------------------------------
601// Current-executor storage — injectable, defaults to thread-local
602// ---------------------------------------------------------------------------
603
// Shared handle to an executor (single-threaded sharing: Rc + RefCell).
type ExecutorRef = Rc<RefCell<Executor>>;

thread_local! {
    // Executor targeted while inside `with_executor`; `None` means
    // "use the global EXECUTOR".
    static CURRENT_EXECUTOR: RefCell<Option<ExecutorRef>> = const { RefCell::new(None) };
}
609
610/// Run `f` with `ex` set as the current executor.
611///
612/// Signal callbacks and `spawn_global` calls inside `f` will be routed
613/// to `ex` instead of the global thread-local executor. Restores the
614/// previous executor afterward.
615///
616/// # Signal routing constraints
617///
618/// Auralis uses a **single global schedule hook** (installed once by the
619/// first call to [`init_flush_scheduler`]) that decides where signal
620/// notifications land by checking the current executor **at the time the
621/// notification fires**, not at the time `Signal::set` is called.
622///
623/// This design implies two hard requirements for multi-instance users:
624///
625/// 1. **`init_flush_scheduler` must be called at least once** — without
626/// it, `Signal::set` falls back to synchronous callback execution,
627/// which breaks the deferred-notification model and can cause
628/// re-entrant borrow panics.
629/// 2. **The instance executor must still be "current" when the flush
630/// runs** — if `with_executor` has already exited, deferred callbacks
631/// from signals set inside `f` will be routed to the global executor
632/// (or synchronously if no global hook is installed).
633///
634/// For the typical single-threaded case (Wasm, game loop, CLI), both
635/// requirements are satisfied trivially: call `init_flush_scheduler`
636/// once at startup and never use `with_executor`. For SSR / multi-tenant
637/// servers, ensure that `with_executor` wraps the entire request
638/// lifecycle — from signal creation through the final flush.
639///
640/// # Example
641///
642/// ```rust,ignore
643/// use auralis_task::Executor;
644///
645/// let ex = Executor::new_instance();
646/// Executor::install_flush_scheduler(&ex, my_scheduler);
647/// auralis_task::with_executor(&ex, || {
648/// // Signal notifications and task spawns here go to `ex`.
649/// });
650/// ```
651pub fn with_executor<R>(ex: &ExecutorRef, f: impl FnOnce() -> R) -> R {
652 CURRENT_EXECUTOR.with(|exec| {
653 let prev = exec.borrow_mut().replace(Rc::clone(ex));
654 let result = f();
655 *exec.borrow_mut() = prev;
656 result
657 })
658}
659
660/// Return the current executor, if any.
661///
662/// If no executor has been set via [`with_executor`], returns `None` —
663/// callers should fall back to the global thread-local executor.
664fn current_executor() -> Option<ExecutorRef> {
665 CURRENT_EXECUTOR.with(|exec| exec.borrow().clone())
666}
667
668/// Return the currently active executor instance.
669///
670/// If [`with_executor`] was used to set an instance executor, returns
671/// that; otherwise returns the global thread-local executor.
672pub(crate) fn current_executor_instance() -> ExecutorRef {
673 current_executor().unwrap_or_else(|| EXECUTOR.with(Rc::clone))
674}
675
676/// Return the current time in milliseconds from the active executor's
677/// [`TimeSource`], or 0 if none is installed.
678pub(crate) fn current_time_ms() -> u64 {
679 current_executor_instance().borrow().now_ms()
680}
681
682// ---------------------------------------------------------------------------
683// Helpers — use thread_local EXECUTOR
684// ---------------------------------------------------------------------------
685
/// Move (task, priority) pairs parked in `PENDING_WAKES` into the
/// global executor's ready queues and schedule a flush if needed.
///
/// Wakes land in `PENDING_WAKES` only when `TaskWaker::wake` found the
/// executor `RefCell` already borrowed.
fn drain_pending_wakes() {
    PENDING_WAKES.with(|pw| {
        let wakes = std::mem::take(&mut *pw.borrow_mut());
        if wakes.is_empty() {
            return;
        }
        EXECUTOR.with(|exec| {
            let maybe_sched = {
                let mut ex = exec.borrow_mut();
                for (id, priority) in wakes {
                    match priority {
                        Priority::High => ex.high_queue.push_back(id),
                        Priority::Low => ex.low_queue.push_back(id),
                    }
                }
                ex.try_schedule_flush()
            };
            // Invoke the scheduler only after the borrow above drops.
            if let Some(sched) = maybe_sched {
                sched.schedule(Box::new(flush));
            }
        });
    });
}
709
710// ---------------------------------------------------------------------------
711// Flush
712// ---------------------------------------------------------------------------
713
/// Run one flush cycle on the global executor, then re-deliver any
/// wakes that were parked while the executor was borrowed.
fn flush() {
    EXECUTOR.with(Executor::flush_instance);
    // Drain any wakes that landed in PENDING_WAKES because the executor
    // RefCell was borrowed during a callback or task poll.
    drain_pending_wakes();
}
720
721// ---------------------------------------------------------------------------
722// Public API
723// ---------------------------------------------------------------------------
724
725/// Set the platform flush scheduler and install the signal deferred-
726/// callback hook (idempotent — subsequent calls are no-ops for the hook).
pub fn init_flush_scheduler(sched: Rc<dyn ScheduleFlush>) {
    // Install (or replace) the global executor's scheduler...
    EXECUTOR.with(|exec| exec.borrow_mut().flush_scheduler = Some(sched));
    // ...and wire Signal::set into the deferred-callback queue. The
    // signal hook is process-global and installed at most once.
    install_signal_hook_once();
}
731
732/// Install the hook that bridges `auralis_signal::Signal::set` to the
733/// executor's deferred-callback queue.
734///
735/// Idempotent — safe to call multiple times.
fn install_signal_hook_once() {
    use std::sync::OnceLock;
    // Process-wide guard: the signal crate's hook slot is global, so
    // installation must happen at most once.
    static INSTALLED: OnceLock<()> = OnceLock::new();
    INSTALLED.get_or_init(|| {
        auralis_signal::install_schedule_hook(Box::new(|cb: Box<dyn FnOnce()>| {
            // Prefer the current executor (set via `with_executor`) for
            // SSR multi-request isolation; fall back to the global one.
            if let Some(ex) = current_executor() {
                let maybe_sched = {
                    let mut e = ex.borrow_mut();
                    e.deferred_callbacks.push(cb);
                    // During a flush, Step 2's drain loop re-takes this
                    // queue, so no extra flush is requested here.
                    // NOTE(review): callbacks pushed during Step 3 (a
                    // task poll) wait for the next externally-triggered
                    // flush — confirm that is intended.
                    if e.in_flush {
                        None
                    } else {
                        e.try_schedule_flush()
                    }
                };
                if let Some(sched) = maybe_sched {
                    let ex2 = Rc::clone(&ex);
                    sched.schedule(Box::new(move || Executor::flush_instance(&ex2)));
                }
            } else {
                EXECUTOR.with(|exec| {
                    let maybe_sched = {
                        let mut ex = exec.borrow_mut();
                        ex.deferred_callbacks.push(cb);
                        if ex.in_flush {
                            None
                        } else {
                            ex.try_schedule_flush()
                        }
                    };
                    // Scheduler invoked after the borrow drops.
                    if let Some(sched) = maybe_sched {
                        sched.schedule(Box::new(flush));
                    }
                });
            }
        }));
    });
}
776
777/// Set the platform time source used for time-budget accounting.
778///
779/// If no [`TimeSource`] is registered the executor runs every flush to
780/// completion without yielding, which is acceptable for short-running
781/// workloads but may cause frame drops in the browser.
782pub fn init_time_source(ts: Rc<dyn TimeSource>) {
783 EXECUTOR.with(|exec| exec.borrow_mut().time_source = Some(ts));
784}
785
786/// Set the per-flush time budget on the global executor.
787///
788/// See [`Executor::set_time_budget`] for details.
789pub fn set_global_time_budget(budget_ms: u64) {
790 EXECUTOR.with(|exec| exec.borrow_mut().time_budget_ms = budget_ms);
791}
792
793/// Register a global panic hook called when any globally-spawned
794/// task panics.
795///
796/// See [`Executor::set_panic_hook`] for details.
797pub fn set_panic_hook(hook: Rc<dyn Fn(PanicInfo)>) {
798 EXECUTOR.with(|exec| exec.borrow_mut().panic_hook = Some(hook));
799}
800
801/// Remove the global panic hook, restoring the default silent
802/// behaviour.
803pub fn remove_panic_hook() {
804 EXECUTOR.with(|exec| exec.borrow_mut().panic_hook = None);
805}
806
/// Spawn a future on the global executor at low priority.
pub fn spawn_global(future: impl Future<Output = ()> + 'static) {
    spawn_global_with_priority(Priority::Low, future);
}

/// Spawn a future on the global executor at the given priority.
pub fn spawn_global_with_priority(priority: Priority, future: impl Future<Output = ()> + 'static) {
    // scope_id 0 marks a global (scope-less) task.
    spawn_inner(Box::pin(future), priority, 0);
}

/// Spawn a future owned by `scope_id`, returning the task id so the
/// scope can track or cancel it.
pub(crate) fn spawn_scoped(
    priority: Priority,
    scope_id: u64,
    future: impl Future<Output = ()> + 'static,
) -> TaskId {
    spawn_inner(Box::pin(future), priority, scope_id)
}
824
/// Common spawn path: allocate a slot, store the task, enqueue it, and
/// schedule a flush if none is pending. Returns the new task id.
fn spawn_inner(
    future: Pin<Box<dyn Future<Output = ()> + 'static>>,
    priority: Priority,
    scope_id: u64,
) -> TaskId {
    EXECUTOR.with(|exec| {
        let (task_id, maybe_sched) = {
            let mut ex = exec.borrow_mut();
            let task_id = ex.allocate_id();
            ex.tasks[task_id as usize] = Some(TaskState {
                future,
                priority,
                scope_id,
            });
            ex.enqueue(task_id);
            let sched = ex.try_schedule_flush();
            (task_id, sched)
        };
        // Schedule outside the borrow.
        if let Some(sched) = maybe_sched {
            sched.schedule(Box::new(flush));
        }
        task_id
    })
}
850
851/// Enqueue all tasks belonging to `scope_id` and trigger a flush.
852///
853/// Used by [`TaskScope::resume`] to restart tasks after a suspend.
pub(crate) fn enqueue_scope_tasks(scope_id: u64) {
    EXECUTOR.with(|exec| {
        // Collect ids first under an immutable borrow (slot index ==
        // task id), then re-borrow mutably to enqueue.
        let task_ids: Vec<TaskId> = {
            let ex = exec.borrow();
            ex.tasks
                .iter()
                .enumerate()
                .filter(|(_, slot)| slot.as_ref().is_some_and(|t| t.scope_id == scope_id))
                .map(|(idx, _)| idx as TaskId)
                .collect()
        };
        let maybe_sched = {
            let mut ex = exec.borrow_mut();
            for tid in task_ids {
                ex.enqueue(tid);
            }
            // A running flush will dequeue these itself; only schedule
            // a new flush when none is active.
            if ex.in_flush {
                None
            } else {
                ex.try_schedule_flush()
            }
        };
        if let Some(sched) = maybe_sched {
            sched.schedule(Box::new(flush));
        }
    });
}
881
/// Remove every task owned by `scope_id` and return their futures so
/// the caller can drop them after the executor borrow is released
/// (destructors may spawn, wake, or set signals).
pub(crate) fn cancel_scope_tasks(scope_id: u64) -> Vec<Pin<Box<dyn Future<Output = ()>>>> {
    EXECUTOR.with(|exec| {
        let mut ex = exec.borrow_mut();
        let mut dropped = Vec::new();

        // Pull matching futures out of their slots (leaving None).
        for slot in &mut ex.tasks {
            if let Some(ref t) = slot {
                if t.scope_id == scope_id {
                    dropped.push(
                        slot.take()
                            .expect("task slot was None after is_some check")
                            .future,
                    );
                }
            }
        }

        // Filter queues.
        // Drop queue entries whose slot is now empty (cancelled above).
        let high: Vec<TaskId> = ex
            .high_queue
            .iter()
            .filter(|id| ex.tasks[**id as usize].is_some())
            .copied()
            .collect();
        ex.high_queue.clear();
        ex.high_queue.extend(high);

        let low: Vec<TaskId> = ex
            .low_queue
            .iter()
            .filter(|id| ex.tasks[**id as usize].is_some())
            .copied()
            .collect();
        ex.low_queue.clear();
        ex.low_queue.extend(low);

        // Rebuild the free list from scratch: every empty slot (newly
        // cancelled or previously freed), deduplicated.
        let mut all_free: Vec<TaskId> = ex
            .tasks
            .iter()
            .enumerate()
            .filter(|(_, s)| s.is_none())
            .map(|(i, _)| i as TaskId)
            .chain(ex.free_slots.iter().copied())
            .collect();
        all_free.sort_unstable();
        all_free.dedup();
        ex.free_slots = all_free;

        dropped
    })
}
933
934// ---------------------------------------------------------------------------
935// yield_now
936// ---------------------------------------------------------------------------
937
/// Return a [`Future`] that yields control back to the executor once.
#[must_use = "yield_now() does nothing unless awaited"]
pub fn yield_now() -> YieldNow {
    YieldNow { yielded: false }
}

/// Future returned by [`yield_now`].
///
/// The first poll wakes its own waker and reports [`Poll::Pending`];
/// every subsequent poll completes immediately.
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct YieldNow {
    // Flips to `true` after the first poll.
    yielded: bool,
}

impl Future for YieldNow {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if !self.yielded {
            // First poll: request an immediate re-poll, then yield.
            self.yielded = true;
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        Poll::Ready(())
    }
}
964
965// ---------------------------------------------------------------------------
966// schedule_callback — hook for auralis-signal's deferred callback model
967// ---------------------------------------------------------------------------
968
969/// Schedule a closure to run at the start of the next executor flush.
970///
971/// Used internally by `auralis_signal` to defer subscriber callback
972/// execution. The closure is drained before the main poll loop.
pub fn schedule_callback(f: Box<dyn FnOnce()>) {
    EXECUTOR.with(|exec| {
        let maybe_sched = {
            let mut ex = exec.borrow_mut();
            ex.deferred_callbacks.push(f);
            // During a flush, Step 2's drain loop re-takes this queue,
            // so no extra flush is requested.
            // NOTE(review): if this runs during Step 3 (a task poll),
            // the callback waits for the next externally-triggered
            // flush — confirm that is intended.
            if ex.in_flush {
                None
            } else {
                ex.try_schedule_flush()
            }
        };
        // Scheduler invoked after the borrow above drops.
        if let Some(sched) = maybe_sched {
            sched.schedule(Box::new(flush));
        }
    });
}
989
990// ---------------------------------------------------------------------------
991// set_deferred
992// ---------------------------------------------------------------------------
993
994/// Schedule a [`Signal::set`] call for the **next** executor flush.
995///
996/// Safe to call from inside [`Drop`] — the actual `signal.set(value)` is
997/// deferred to a subsequent flush, avoiding re-entrant borrow panics.
998pub fn set_deferred<T: 'static>(signal: &Signal<T>, value: T) {
999 let signal = signal.clone();
1000 EXECUTOR.with(|exec| {
1001 let maybe_sched = {
1002 let mut ex = exec.borrow_mut();
1003 ex.deferred_ops.push(DeferredOp {
1004 f: Box::new(move || signal.set(value)),
1005 });
1006 ex.try_schedule_flush()
1007 };
1008 if let Some(sched) = maybe_sched {
1009 sched.schedule(Box::new(flush));
1010 }
1011 });
1012}
1013
1014// ---------------------------------------------------------------------------
1015// Test / debug helpers
1016// ---------------------------------------------------------------------------
1017
1018/// Completely reset the global executor to a pristine state.
1019///
1020/// Clears all task slots, queues, deferred ops, flush/scheduler flags,
1021/// and injected [`ScheduleFlush`]/[`TimeSource`]. Call at the start
1022/// of every test to prevent cross-test state leakage.
1023///
1024/// # Safety / usage
1025///
1026/// This function is intended **only** for testing. Calling it while
1027/// the executor is processing tasks will silently drop all live
1028/// futures and may cause panics or undefined behavior in running
1029/// application code.
1030pub fn reset_executor_for_test() {
1031 PENDING_WAKES.with(|pw| pw.borrow_mut().clear());
1032 EXECUTOR.with(|exec| {
1033 let mut ex = exec.borrow_mut();
1034 ex.high_queue.clear();
1035 ex.low_queue.clear();
1036 ex.tasks.clear();
1037 ex.free_slots.clear();
1038 ex.next_task_id = 0;
1039 ex.is_flush_scheduled = false;
1040 ex.in_flush = false;
1041 ex.deferred_ops.clear();
1042 ex.deferred_callbacks.clear();
1043 ex.flush_scheduler = None;
1044 ex.time_source = None;
1045 });
1046 crate::scope::clear_scope_registry();
1047}
1048
#[cfg(test)]
pub(crate) fn debug_task_count() -> usize {
    // Count occupied slots, i.e. tasks that have not yet completed.
    EXECUTOR.with(|exec| exec.borrow().tasks.iter().flatten().count())
}

/// Return a snapshot of all active tasks: `(task_id, priority, scope_id)`.
#[cfg(feature = "debug")]
pub(crate) fn debug_task_snapshot() -> Vec<(TaskId, Priority, u64)> {
    EXECUTOR.with(|exec| {
        exec.borrow()
            .tasks
            .iter()
            .enumerate()
            .filter_map(|(idx, slot)| {
                slot.as_ref().map(|t| (idx as u64, t.priority, t.scope_id))
            })
            .collect()
    })
}

/// Return the set of task IDs currently in the ready queues.
#[cfg(feature = "debug")]
pub(crate) fn debug_queued_task_ids() -> Vec<TaskId> {
    EXECUTOR.with(|exec| {
        let ex = exec.borrow();
        let mut ids: Vec<TaskId> = Vec::with_capacity(ex.high_queue.len() + ex.low_queue.len());
        ids.extend(ex.high_queue.iter().copied());
        ids.extend(ex.low_queue.iter().copied());
        ids.sort_unstable();
        ids.dedup();
        ids
    })
}
1085
1086/// Spawn a task without triggering an automatic flush.
1087/// Used in tests to batch multiple spawns before executing them.
#[cfg(test)]
pub(crate) fn spawn_no_auto_flush(
    priority: Priority,
    future: impl Future<Output = ()> + 'static,
) -> TaskId {
    EXECUTOR.with(|exec| {
        let mut ex = exec.borrow_mut();
        let task_id = ex.allocate_id();
        ex.tasks[task_id as usize] = Some(TaskState {
            future: Box::pin(future),
            priority,
            scope_id: 0,
        });
        ex.enqueue(task_id);
        // Do NOT schedule flush.
        // Tests drive execution explicitly via `flush_all`.
        task_id
    })
}
1106
1107/// Run a manual flush cycle (for tests that need to control timing).
#[cfg(test)]
pub(crate) fn flush_all() {
    // One full cycle: flush the global executor, then re-deliver any
    // wakes parked while it was borrowed.
    flush();
}