auralis_task/executor.rs
1//! Single-threaded executor with priority scheduling, time-budget
2//! awareness, and deferred-signal support.
3//!
4//! ## Architecture
5//!
6//! The executor is stored via a pluggable [`ExecutorStorage`] strategy
7//! (defaulting to a per-thread slot). Before polling a task the future
8//! is **temporarily removed** so that the poll never holds an executor
9//! borrow — this allows nested spawns, wakes, and `set_deferred` calls
10//! without `RefCell` panics.
11//!
//! The waker carries only plain data — the task id, the task's priority,
//! and a `(slot_id, generation)` pair identifying its executor — making
//! it trivially [`Send`] + [`Sync`] for [`Waker::from`].
14
15#![allow(clippy::cast_possible_truncation)]
16
17use std::cell::{Cell, RefCell};
18use std::collections::{BTreeMap, VecDeque};
19use std::future::Future;
20use std::pin::Pin;
21use std::rc::{Rc, Weak};
22use std::sync::Arc;
23use std::task::{Context, Poll, Wake, Waker};
24
25use auralis_signal::Signal;
26
27use crate::Priority;
28
29// ---------------------------------------------------------------------------
30// Types
31// ---------------------------------------------------------------------------
32
/// Identifier of a task within one [`Executor`]; doubles as the index
/// into the executor's task slot table.
type TaskId = u64;
34
35// ---------------------------------------------------------------------------
36// ScheduleFlush
37// ---------------------------------------------------------------------------
38
/// Platform hook for scheduling a microtask callback.
///
/// The executor hands the hook a boxed closure that runs a flush of the
/// executor that requested it. The hook is always invoked after the
/// executor's `RefCell` borrow has been released.
pub trait ScheduleFlush {
    /// Request that `callback` runs at the next microtask boundary.
    fn schedule(&self, callback: Box<dyn FnOnce()>);
}
44
/// Synchronous [`ScheduleFlush`] for unit tests.
///
/// Instead of deferring to a host event loop, the scheduled job is run
/// on the spot, which makes the executor run-to-completion without a
/// browser.
#[cfg(test)]
pub struct TestScheduleFlush;

#[cfg(test)]
impl ScheduleFlush for TestScheduleFlush {
    /// Run `job` immediately rather than at a later microtask boundary.
    fn schedule(&self, job: Box<dyn FnOnce()>) {
        job();
    }
}
58
59// ---------------------------------------------------------------------------
60// TimeSource
61// ---------------------------------------------------------------------------
62
/// High-resolution time source for the executor's time-budget
/// accounting.
///
/// When a time source is installed, the executor samples it at the start
/// of the callback-drain and task-poll phases of a flush and again after
/// each batch of callbacks / each task poll, to decide whether it should
/// yield control back to the host event loop (default budget: 8 ms).
///
/// In Wasm environments the implementation typically delegates to
/// `performance.now()`. If no [`TimeSource`] is registered the executor
/// reads the time as 0, so the time budget check is a no-op and the
/// executor runs tasks until the queues are drained.
pub trait TimeSource {
    /// Return the current time in milliseconds.
    fn now_ms(&self) -> u64;
}
78
/// A [`TimeSource`] whose value is explicitly controlled by the test.
///
/// Use [`set`](TestTimeSource::set) or [`advance`](TestTimeSource::advance)
/// to simulate the passage of time during a flush cycle.
#[cfg(test)]
pub struct TestTimeSource {
    /// Current simulated time in milliseconds. Kept in a `Cell` so the
    /// clock can be moved through a shared reference.
    now: std::cell::Cell<u64>,
}

#[cfg(test)]
impl TestTimeSource {
    /// Create a new [`TestTimeSource`] with the given initial time.
    #[must_use]
    pub fn new(initial_ms: u64) -> Self {
        Self {
            now: std::cell::Cell::new(initial_ms),
        }
    }

    /// Set the current time to `ms` milliseconds.
    pub fn set(&self, ms: u64) {
        self.now.set(ms);
    }

    /// Advance the current time by `ms` milliseconds.
    ///
    /// The clock saturates at `u64::MAX` instead of overflowing, so a
    /// test that jumps "far into the future" never panics (debug builds)
    /// or wraps back to a small value (release builds).
    pub fn advance(&self, ms: u64) {
        self.now.set(self.now.get().saturating_add(ms));
    }
}
108
#[cfg(test)]
impl TimeSource for TestTimeSource {
    /// Report the simulated time last stored by `new`, `set` or `advance`.
    fn now_ms(&self) -> u64 {
        self.now.get()
    }
}
115
116// ---------------------------------------------------------------------------
117// TaskWaker — routes wakes to the correct executor via a slot table.
118//
119// Waker::from requires Send + Sync + 'static, so the waker cannot hold
120// an Rc<RefCell<Executor>>. Instead it stores a slot index + generation
121// number. The SLOTS thread_local maps (index, generation) → Weak<Executor>.
122// On wake, the generation is validated before the weak pointer is upgraded.
123// Dead slots are reclaimed when new executors are registered.
124// ---------------------------------------------------------------------------
125
/// A registered executor slot. The `generation` counter distinguishes
/// between successive executors that occupy the same slot index (e.g.
/// after the previous one was dropped and a new one recycles the slot).
struct Slot {
    /// Non-owning handle to the executor occupying this slot; the slot
    /// counts as dead once this can no longer be upgraded.
    weak: Weak<RefCell<Executor>>,
    /// Incremented (wrapping) every time this slot is reused.
    /// A [`TaskWaker`] must present the generation it was created with;
    /// a mismatch means the waker is stale and is silently ignored.
    generation: u64,
}
136
thread_local! {
    /// Per-thread table mapping a slot index to a registered executor.
    ///
    /// Slot 0 is intended for the global executor
    /// ([`ensure_global_registered`] installs it there). Instance
    /// executors occupy subsequent slots. Dead slots (Weak::upgrade
    /// returns None) are recycled in [`register_executor`].
    static SLOTS: RefCell<Vec<Slot>> = const { RefCell::new(Vec::new()) };
}
143
144/// Register an executor in the slot table, returning the assigned
145/// (`slot_id`, `generation`) pair. Dead slots are recycled in-place;
146/// if no dead slot is found a new entry is appended.
147fn register_executor(weak: Weak<RefCell<Executor>>) -> (u64, u64) {
148 SLOTS.with(|slots| {
149 let mut slots = slots.borrow_mut();
150 for (i, slot) in slots.iter_mut().enumerate() {
151 if slot.weak.upgrade().is_none() {
152 slot.weak = weak;
153 // Wrapping is safe: 2^64 reuses of a single slot
154 // would take ~10^14 years at 1 reuse/μs.
155 slot.generation = slot.generation.wrapping_add(1);
156 return (i as u64, slot.generation);
157 }
158 }
159 let gen = 0;
160 slots.push(Slot {
161 weak,
162 generation: gen,
163 });
164 ((slots.len() - 1) as u64, gen)
165 })
166}
167
168/// Look up an executor by slot id, validating the generation.
169fn lookup_executor(slot_id: u64, generation: u64) -> Option<Rc<RefCell<Executor>>> {
170 SLOTS.with(|slots| {
171 let slots = slots.borrow();
172 let slot = slots.get(slot_id as usize)?;
173 if slot.generation != generation {
174 return None;
175 }
176 slot.weak.upgrade()
177 })
178}
179
/// Waker payload for a single task. Holds only plain values so it can be
/// wrapped in an `Arc` and turned into a [`Waker`].
struct TaskWaker {
    /// Task to re-enqueue when woken.
    task_id: TaskId,
    /// Priority captured at poll time; selects the ready queue on wake.
    priority: Priority,
    /// Index of the owning executor in [`SLOTS`].
    slot_id: u64,
    /// Generation of that slot when the waker was created; checked on
    /// wake so a waker for a replaced executor is ignored.
    generation: u64,
}
186
187impl Wake for TaskWaker {
188 fn wake(self: Arc<Self>) {
189 let Some(exec) = lookup_executor(self.slot_id, self.generation) else {
190 return;
191 };
192 let maybe_sched = if let Ok(mut ex) = exec.try_borrow_mut() {
193 match self.priority {
194 Priority::High => ex.high_queue.push_back(self.task_id),
195 Priority::Low => ex.low_queue.push_back(self.task_id),
196 }
197 if ex.in_flush {
198 None
199 } else {
200 ex.try_schedule_flush()
201 }
202 } else {
203 PENDING_WAKES.with(|pw| {
204 pw.borrow_mut()
205 .push((self.task_id, self.slot_id, self.generation));
206 });
207 None
208 };
209 if let Some(sched) = maybe_sched {
210 let sid = self.slot_id;
211 let gen = self.generation;
212 sched.schedule(Box::new(move || {
213 if let Some(ex) = lookup_executor(sid, gen) {
214 Executor::flush_instance(&ex);
215 }
216 }));
217 }
218 }
219}
220
221// ---------------------------------------------------------------------------
222// TaskState
223// ---------------------------------------------------------------------------
224
/// Everything the executor stores for one spawned task.
struct TaskState {
    /// The task's future, pinned on the heap.
    future: Pin<Box<dyn Future<Output = ()> + 'static>>,
    /// Ready queue this task is placed on when enqueued or woken.
    priority: Priority,
    /// Id of the owning scope (0 for tasks spawned without a scope).
    scope_id: u64,
    /// Key in [`Executor::timers`] for this task's pending sleep,
    /// or 0 if the task is not waiting on a timer.
    timer_deadline: u64,
}
233
234// ---------------------------------------------------------------------------
235// Executor
236// ---------------------------------------------------------------------------
237
/// Information about a task panic, passed to the user-registered
/// [`set_panic_hook`].
///
/// Built by the flush loop when polling a task unwinds; the task's slot
/// is freed after the hook returns.
#[derive(Debug)]
pub struct PanicInfo {
    /// The executor-assigned task id.
    pub task_id: u64,
    /// The scope that owned the task (0 for global tasks).
    pub scope_id: u64,
    /// The boxed panic payload.
    pub payload: Box<dyn std::any::Any + Send>,
}
249
/// A single-threaded async task executor with priority queues.
///
/// Each [`Executor`] manages its own task slots, ready queues, and
/// deferred callback buffers. Use [`Executor::new_instance`] to create
/// an isolated executor (e.g. per SSR request), or use the global
/// thread-local executor via [`spawn_global`](crate::spawn_global).
pub struct Executor {
    /// Ready queue for high-priority tasks; always drained before
    /// `low_queue`.
    high_queue: VecDeque<TaskId>,
    /// Ready queue for low-priority tasks.
    low_queue: VecDeque<TaskId>,
    /// Task slot table indexed by task id. A slot is `None` when it is
    /// free, and also temporarily while its task is being polled.
    tasks: Vec<Option<TaskState>>,
    /// Task ids released by `free_slot`, reused by `allocate_id` before
    /// any new id is minted.
    free_slots: Vec<TaskId>,
    /// Next never-used task id.
    next_task_id: TaskId,
    /// Set when a flush has been requested and not yet completed.
    is_flush_scheduled: bool,
    /// True while a flush is running; used as the re-entrancy guard.
    in_flush: bool,
    /// One-shot operations run near the start of every flush, after
    /// expired timers are processed.
    deferred_ops: Vec<DeferredOp>,
    /// Callbacks pushed by `Signal::set` via the schedule hook.
    /// Drained at the start of every flush before polling tasks.
    ///
    /// Unbounded unless `max_deferred_callbacks` is set — in a
    /// single-threaded Wasm context, a tight loop of signal sets will
    /// block the UI thread anyway, so a capacity limit wouldn't improve
    /// the situation. SSR / multi-tenant users should ensure that
    /// application code doesn't produce unbounded signal churn within a
    /// single request.
    deferred_callbacks: Vec<Box<dyn FnOnce()>>,
    /// Hook used to request a flush from the host; `None` means flushes
    /// are never scheduled automatically.
    flush_scheduler: Option<Rc<dyn ScheduleFlush>>,
    /// Clock used for time budgets and timers; `None` means the time is
    /// read as 0.
    time_source: Option<Rc<dyn TimeSource>>,
    /// Maximum milliseconds to spend inside a single flush before
    /// yielding back to the host event loop. Default: 8 ms.
    time_budget_ms: u64,
    /// Optional cap on the number of deferred signal callbacks that can
    /// accumulate between two flushes. Exceeding this limit triggers a
    /// panic — useful as a safety net in SSR / multi-tenant deployments
    /// where a runaway signal loop could OOM the process.
    ///
    /// Default: `None` (no limit).
    max_deferred_callbacks: Option<usize>,
    /// Optional hook invoked when a spawned task panics.
    panic_hook: Option<Rc<dyn Fn(PanicInfo)>>,
    /// Timer queue: map from deadline (ms) to task ids that should be
    /// woken when that deadline expires. Processed at the start of
    /// every flush.
    timers: BTreeMap<u64, Vec<TaskId>>,
    /// Slot index and generation in [`SLOTS`] for routing wakes back
    /// to this executor. Set by [`new_instance`] or lazily for the
    /// global executor.
    slot_id: u64,
    generation: u64,
    /// Whether this executor has been registered in [`SLOTS`].
    registered: bool,
}
300
// Set by the executor before polling a task and restored to its previous
// value afterward. Lets futures discover their task id without threading
// it through layers of combinators. `None` means no task is being polled
// on this thread.
thread_local! {
    static CURRENT_POLLING_TASK: Cell<Option<TaskId>> = const { Cell::new(None) };
}
307
308pub(crate) fn with_current_polling_task<R>(f: impl FnOnce(Option<TaskId>) -> R) -> R {
309 CURRENT_POLLING_TASK.with(|c| f(c.get()))
310}
311
/// A one-shot operation queued on the executor and run during the next
/// flush.
struct DeferredOp {
    /// The closure to invoke.
    f: Box<dyn FnOnce()>,
}
315
316impl Executor {
317 fn new() -> Self {
318 Self {
319 high_queue: VecDeque::new(),
320 low_queue: VecDeque::new(),
321 tasks: Vec::new(),
322 free_slots: Vec::new(),
323 next_task_id: 0,
324 is_flush_scheduled: false,
325 in_flush: false,
326 deferred_ops: Vec::new(),
327 deferred_callbacks: Vec::new(),
328 flush_scheduler: None,
329 time_source: None,
330 time_budget_ms: 8,
331 max_deferred_callbacks: None,
332 panic_hook: None,
333 timers: BTreeMap::new(),
334 slot_id: 0,
335 generation: 0,
336 registered: false,
337 }
338 }
339
340 fn allocate_id(&mut self) -> TaskId {
341 if let Some(id) = self.free_slots.pop() {
342 return id;
343 }
344 let id = self.next_task_id;
345 self.next_task_id += 1;
346 self.tasks.push(None);
347 id
348 }
349
350 /// Release a task slot back to the free list.
351 ///
352 /// **Caller must ensure** that `task_id` has not already been freed
353 /// (e.g. via [`cancel_task`] or [`cancel_scope_tasks_on`]). This
354 /// method unconditionally pushes to `free_slots` — pushing the same
355 /// id twice would cause [`allocate_id`] to hand it out twice.
356 fn free_slot(&mut self, task_id: TaskId) {
357 // Clean up any pending timer for this task so a recycled
358 // task ID is not spuriously woken by an old deadline.
359 // This works when the slot is still occupied (scope cancel
360 // path). For normal completion (Poll::Ready), the slot is
361 // already None and the caller must call cleanup_timer first.
362 if let Some(Some(ref t)) = self.tasks.get(task_id as usize) {
363 if t.timer_deadline != 0 {
364 self.cleanup_timer(task_id, t.timer_deadline);
365 }
366 }
367 self.tasks[task_id as usize] = None;
368 self.free_slots.push(task_id);
369 }
370
371 /// Remove a timer entry for `task_id` from the timer map.
372 fn cleanup_timer(&mut self, task_id: TaskId, deadline: u64) {
373 if let Some(tids) = self.timers.get_mut(&deadline) {
374 tids.retain(|id| *id != task_id);
375 if tids.is_empty() {
376 self.timers.remove(&deadline);
377 }
378 }
379 }
380
381 fn enqueue(&mut self, task_id: TaskId) {
382 let priority = match self.tasks.get(task_id as usize).and_then(Option::as_ref) {
383 Some(t) => t.priority,
384 None => return,
385 };
386 match priority {
387 Priority::High => self.high_queue.push_back(task_id),
388 Priority::Low => self.low_queue.push_back(task_id),
389 }
390 }
391
392 fn dequeue(&mut self) -> Option<TaskId> {
393 self.high_queue
394 .pop_front()
395 .or_else(|| self.low_queue.pop_front())
396 }
397
398 /// Mark that a flush is needed and return the scheduler if one is
399 /// registered. The caller **must** invoke the scheduler **after**
400 /// releasing the executor borrow.
401 fn try_schedule_flush(&mut self) -> Option<Rc<dyn ScheduleFlush>> {
402 if self.is_flush_scheduled {
403 return None;
404 }
405 self.is_flush_scheduled = true;
406 self.flush_scheduler.clone()
407 }
408
409 /// Return the current time in ms, or 0 if no [`TimeSource`] is
410 /// registered. When this returns 0 the time-budget check is
411 /// effectively a no-op.
412 pub(crate) fn now_ms(&self) -> u64 {
413 self.time_source.as_ref().map_or(0, |ts| ts.now_ms())
414 }
415
416 /// Return the number of currently active (not-yet-completed) tasks.
417 ///
418 /// Used by streaming SSR to determine whether the stream should
419 /// wait for more work or terminate.
420 #[must_use]
421 pub fn active_task_count(&self) -> usize {
422 self.tasks.iter().filter(|t| t.is_some()).count()
423 }
424}
425
426// ---------------------------------------------------------------------------
427// Thread-local globals (default storage)
428// ---------------------------------------------------------------------------
429
thread_local! {
    /// The global executor for this thread.
    static EXECUTOR: Rc<RefCell<Executor>> = Rc::new(RefCell::new(Executor::new()));
    /// Wakes buffered by [`TaskWaker`] because the target executor's
    /// `RefCell` was already borrowed. Each entry is
    /// `(task_id, slot_id, generation)`; replayed by
    /// [`drain_pending_wakes`].
    static PENDING_WAKES: RefCell<Vec<(TaskId, u64, u64)>> =
        const { RefCell::new(Vec::new()) };
}
435
436/// Ensure the global executor is registered in slot 0 (lazy, idempotent).
437/// Returns (`slot_id`, `generation`) for the global executor.
438fn ensure_global_registered() -> (u64, u64) {
439 SLOTS.with(|slots| {
440 let mut slots = slots.borrow_mut();
441 if slots.is_empty() {
442 let weak = EXECUTOR.with(Rc::downgrade);
443 slots.push(Slot {
444 weak,
445 generation: 0,
446 });
447 } else {
448 // Verify slot 0 still holds the global executor.
449 let global = EXECUTOR.with(Rc::clone);
450 let is_global = slots[0]
451 .weak
452 .upgrade()
453 .is_some_and(|ex| Rc::ptr_eq(&ex, &global));
454 if !is_global {
455 slots[0] = Slot {
456 weak: Rc::downgrade(&global),
457 generation: slots[0].generation.wrapping_add(1),
458 };
459 }
460 }
461 // Mark the global executor as registered so flush_instance
462 // doesn't call this function again on every flush.
463 EXECUTOR.with(|ex| {
464 let mut e = ex.borrow_mut();
465 e.slot_id = 0;
466 e.generation = slots[0].generation;
467 e.registered = true;
468 });
469 let gen = slots[0].generation;
470 (0, gen)
471 })
472}
473
474// ---------------------------------------------------------------------------
475// Executor instance methods (for isolated executors, e.g. SSR)
476// ---------------------------------------------------------------------------
477
478impl Executor {
479 /// Create a new isolated executor, wrapped for shared access.
480 ///
481 /// The returned executor is independent of the global thread-local
482 /// executor. Use [`with_executor`] to make it the current executor
483 /// for the duration of a closure, so that spawned tasks and signal
484 /// callbacks are routed to it.
485 #[must_use]
486 pub fn new_instance() -> Rc<RefCell<Executor>> {
487 let ex = Rc::new(RefCell::new(Executor::new()));
488 // Register in the slot table so TaskWaker can find this executor.
489 let (slot_id, generation) = register_executor(Rc::downgrade(&ex));
490 {
491 let mut e = ex.borrow_mut();
492 e.slot_id = slot_id;
493 e.generation = generation;
494 e.registered = true;
495 }
496 ex
497 }
498
499 /// Install a flush scheduler on this executor instance.
500 pub fn install_flush_scheduler(ex: &Rc<RefCell<Executor>>, sched: Rc<dyn ScheduleFlush>) {
501 ex.borrow_mut().flush_scheduler = Some(sched);
502 }
503
504 /// Install a time source on this executor instance.
505 pub fn install_time_source(ex: &Rc<RefCell<Executor>>, ts: Rc<dyn TimeSource>) {
506 ex.borrow_mut().time_source = Some(ts);
507 }
508
509 /// Set the maximum time (in milliseconds) a single flush may spend
510 /// before yielding back to the host event loop.
511 ///
512 /// The default is 8 ms (~120 fps frame budget, leaving time for the
513 /// browser to render between flushes). Set to `u64::MAX` to disable
514 /// time-budget yielding (flush runs to completion).
515 ///
516 /// # Semantics
517 ///
518 /// The budget is checked **between** task polls — the currently
519 /// executing task is never interrupted. When the budget is exhausted
520 /// the executor sets `in_flush = false` and schedules a follow-up
521 /// flush so the remaining ready tasks will be polled on the next
522 /// microtask tick. This is cooperative (`.await`-bound) yielding,
523 /// not preemptive.
524 ///
525 /// This affects **this executor only**. For the global thread-local
526 /// executor use [`set_global_time_budget`].
527 pub fn set_time_budget(ex: &Rc<RefCell<Executor>>, budget_ms: u64) {
528 ex.borrow_mut().time_budget_ms = budget_ms;
529 }
530
531 /// Set a safety cap on the deferred signal callback queue.
532 ///
533 /// When set to `Some(n)`, the executor will panic if more than `n`
534 /// deferred callbacks accumulate between two flush cycles. This is a
535 /// safety net for SSR / multi-tenant servers where a runaway signal
536 /// loop could exhaust memory — in a single-threaded Wasm context,
537 /// unbounded accumulation is acceptable because it blocks the UI
538 /// thread anyway.
539 ///
540 /// Default: `None` (no limit).
541 pub fn set_max_deferred_callbacks(ex: &Rc<RefCell<Executor>>, limit: Option<usize>) {
542 ex.borrow_mut().max_deferred_callbacks = limit;
543 }
544
545 /// Register a callback invoked whenever a spawned task panics.
546 ///
547 /// The default is no hook — panicking tasks are silently removed
548 /// from the executor (the same behaviour as a task returning
549 /// `Poll::Ready(())`).
550 ///
551 /// # Example
552 ///
553 /// ```rust,ignore
554 /// Executor::set_panic_hook(&ex, Rc::new(|info| {
555 /// eprintln!("task {} in scope {} panicked", info.task_id, info.scope_id);
556 /// }));
557 /// ```
558 pub fn set_panic_hook(ex: &Rc<RefCell<Executor>>, hook: Rc<dyn Fn(PanicInfo)>) {
559 ex.borrow_mut().panic_hook = Some(hook);
560 }
561
562 /// Register a timer: when `now_ms() >= deadline_ms`, enqueue
563 /// `task_id` so it gets polled on the next flush.
564 pub(crate) fn schedule_timer(ex: &Rc<RefCell<Executor>>, deadline_ms: u64, task_id: TaskId) {
565 let mut e = ex.borrow_mut();
566 // If this task already has a pending timer (e.g. previous SleepFuture
567 // was dropped via select!), clean up the old entry so the timer map
568 // doesn't accumulate stale deadlines.
569 let old_deadline = e
570 .tasks
571 .get(task_id as usize)
572 .and_then(Option::as_ref)
573 .map_or(0, |t| t.timer_deadline);
574 if old_deadline != 0 {
575 e.cleanup_timer(task_id, old_deadline);
576 }
577 e.timers.entry(deadline_ms).or_default().push(task_id);
578 // Set the reverse index so cancel_scope_tasks can find this entry.
579 if let Some(Some(ref mut t)) = e.tasks.get_mut(task_id as usize) {
580 t.timer_deadline = deadline_ms;
581 }
582 // Request a flush so the timer is checked.
583 e.is_flush_scheduled = false;
584 let maybe_sched = e.try_schedule_flush();
585 drop(e);
586 if let Some(sched) = maybe_sched {
587 let ex2 = Rc::clone(ex);
588 sched.schedule(Box::new(move || Self::flush_instance(&ex2)));
589 }
590 }
591
592 /// Spawn a future on this executor instance.
593 pub fn spawn(ex: &Rc<RefCell<Executor>>, future: impl Future<Output = ()> + 'static) {
594 let maybe_sched = {
595 let mut e = ex.borrow_mut();
596 let tid = e.allocate_id();
597 e.tasks[tid as usize] = Some(TaskState {
598 future: Box::pin(future),
599 priority: Priority::Low,
600 scope_id: 0,
601 timer_deadline: 0,
602 });
603 e.enqueue(tid);
604 e.try_schedule_flush()
605 };
606 if let Some(sched) = maybe_sched {
607 let ex2 = Rc::clone(ex);
608 sched.schedule(Box::new(move || Self::flush_instance(&ex2)));
609 }
610 }
611
612 /// Run a full flush cycle on this executor instance.
613 ///
614 /// Mirrors the global flush cycle but operates on an
615 /// isolated executor (used for SSR). Includes all the same
616 /// protections: `catch_unwind`, suspend checks, time-budget
617 /// yielding, and callback-drain budget.
618 #[allow(clippy::too_many_lines)]
619 pub fn flush_instance(ex: &Rc<RefCell<Executor>>) {
620 // Guard against re-entrant flushes.
621 {
622 let mut e = ex.borrow_mut();
623 if e.in_flush {
624 #[cfg(debug_assertions)]
625 {
626 eprintln!(
627 "[auralis-task] WARNING: Executor::flush_instance called \
628 re-entrantly (already inside a flush). This is a no-op. \
629 Check for nested flush() calls in signal callbacks or \
630 ScheduleFlush implementations."
631 );
632 }
633 return;
634 }
635 e.in_flush = true;
636 }
637
638 // Set this executor as the current one so that TaskWaker
639 // (which cannot hold an Rc) can discover it via thread-local.
640 // Restore on scope exit (including early returns for time-budget
641 // yielding and re-entrancy).
642 let prev_executor = CURRENT_EXECUTOR.with(|c| c.borrow_mut().replace(Rc::clone(ex)));
643 let _restore = RestoreExecutor(prev_executor);
644
645 // Step 0: drain expired timers.
646 {
647 let mut e = ex.borrow_mut();
648 let now = e.now_ms();
649 // When no TimeSource is registered (now == 0), expire all
650 // timers — they've already been woken via wake_by_ref and
651 // just need to be re-polled.
652 if now == 0 {
653 for (_, tasks) in std::mem::take(&mut e.timers) {
654 for tid in tasks {
655 // Clear the reverse index since the timer has fired.
656 if let Some(Some(ref mut t)) = e.tasks.get_mut(tid as usize) {
657 t.timer_deadline = 0;
658 }
659 e.enqueue(tid);
660 }
661 }
662 } else {
663 let expired: Vec<u64> =
664 e.timers.keys().copied().take_while(|&d| d <= now).collect();
665 for deadline in expired {
666 if let Some(tasks) = e.timers.remove(&deadline) {
667 for tid in tasks {
668 if let Some(Some(ref mut t)) = e.tasks.get_mut(tid as usize) {
669 t.timer_deadline = 0;
670 }
671 e.enqueue(tid);
672 }
673 }
674 }
675 }
676 }
677
678 // Step 1: deferred ops.
679 let deferred = std::mem::take(&mut ex.borrow_mut().deferred_ops);
680 for op in deferred {
681 (op.f)();
682 }
683
684 // Step 2: drain deferred signal callbacks with time budget.
685 {
686 let cb_start = ex.borrow().now_ms();
687 loop {
688 let callbacks = std::mem::take(&mut ex.borrow_mut().deferred_callbacks);
689 if callbacks.is_empty() {
690 break;
691 }
692 for cb in callbacks {
693 // Isolate each callback so a panic in one subscriber
694 // doesn't block the remaining notifications or wedge
695 // the executor (in_flush stays true on unwind).
696 let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(cb));
697 }
698 if ex.borrow().now_ms().saturating_sub(cb_start) >= ex.borrow().time_budget_ms {
699 if !ex.borrow().deferred_callbacks.is_empty() {
700 let (sched, ex2) = {
701 let mut e = ex.borrow_mut();
702 e.in_flush = false;
703 e.is_flush_scheduled = false;
704 (e.try_schedule_flush(), Rc::clone(ex))
705 };
706 if let Some(sched) = sched {
707 sched.schedule(Box::new(move || Self::flush_instance(&ex2)));
708 }
709 return;
710 }
711 break;
712 }
713 }
714 }
715
716 // Step 3: main poll loop with time-budget check.
717 let poll_start = ex.borrow().now_ms();
718 loop {
719 let task_id = ex.borrow_mut().dequeue();
720 let Some(tid) = task_id else {
721 let mut e = ex.borrow_mut();
722 e.is_flush_scheduled = false;
723 e.in_flush = false;
724 break;
725 };
726
727 // Take the task out so the poll doesn't hold an executor borrow.
728 let maybe_state = ex.borrow_mut().tasks[tid as usize].take();
729 if let Some(mut state) = maybe_state {
730 let priority = state.priority;
731 let scope_id = state.scope_id;
732
733 // Check if the owning scope is suspended.
734 let scope = crate::scope::find_scope(scope_id);
735 if let Some(ref s) = scope {
736 if s.is_suspended() {
737 let mut e = ex.borrow_mut();
738 if e.tasks[tid as usize].is_none() {
739 e.tasks[tid as usize] = Some(state);
740 }
741 continue;
742 }
743 }
744
745 // Ensure the executor is registered in the slot table.
746 // Must not call ensure_global_registered while holding
747 // a borrow on ex (it borrows the global EXECUTOR).
748 let (slot_id, gen) = {
749 let e = ex.borrow();
750 if e.registered {
751 (e.slot_id, e.generation)
752 } else {
753 drop(e);
754 ensure_global_registered()
755 }
756 };
757 let waker = Waker::from(Arc::new(TaskWaker {
758 task_id: tid,
759 priority,
760 slot_id,
761 generation: gen,
762 }));
763 let mut cx = Context::from_waker(&waker);
764
765 // Inject owning scope.
766 let prev_scope = crate::scope::get_scope_direct();
767 if scope.is_some() {
768 crate::scope::set_scope_direct(scope);
769 }
770
771 // Let futures discover their task id (used by timer::sleep).
772 // Save and restore so that a nested flush (sync scheduler)
773 // doesn't leave the outer task without its id afterward.
774 let prev_polling = CURRENT_POLLING_TASK.with(|c| c.replace(Some(tid)));
775
776 // Task isolation — prevents a panicking task from
777 // unwinding through flush and leaving in_flush set.
778 let result: Result<Poll<()>, Box<dyn std::any::Any + Send>> =
779 std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
780 state.future.as_mut().poll(&mut cx)
781 }));
782
783 CURRENT_POLLING_TASK.with(|c| c.set(prev_polling));
784 crate::scope::set_scope_direct(prev_scope);
785
786 // Extract timer_deadline before state is dropped, so
787 // we can clean up the timer entry (free_slot can't
788 // read it because the slot is already None).
789 let timer_dl = state.timer_deadline;
790
791 match result {
792 Ok(Poll::Ready(())) => {
793 if timer_dl != 0 {
794 ex.borrow_mut().cleanup_timer(tid, timer_dl);
795 }
796 ex.borrow_mut().free_slot(tid);
797 }
798 Err(payload) => {
799 if timer_dl != 0 {
800 ex.borrow_mut().cleanup_timer(tid, timer_dl);
801 }
802 // Notify the panic hook (if any) before freeing the slot.
803 let hook = ex.borrow().panic_hook.clone();
804 if let Some(h) = hook {
805 h(PanicInfo {
806 task_id: tid,
807 scope_id,
808 payload,
809 });
810 }
811 ex.borrow_mut().free_slot(tid);
812 }
813 Ok(Poll::Pending) => {
814 let mut e = ex.borrow_mut();
815 if e.tasks[tid as usize].is_none() {
816 e.tasks[tid as usize] = Some(state);
817 }
818 }
819 }
820 }
821
822 // Time budget check.
823 {
824 let elapsed = ex.borrow().now_ms().saturating_sub(poll_start);
825 if elapsed >= ex.borrow().time_budget_ms {
826 let (maybe_sched, ex_clone) = {
827 let mut e = ex.borrow_mut();
828 e.is_flush_scheduled = false;
829 e.in_flush = false;
830 let sched = if !e.high_queue.is_empty() || !e.low_queue.is_empty() {
831 e.try_schedule_flush()
832 } else {
833 None
834 };
835 (sched, Rc::clone(ex))
836 };
837 if let Some(sched) = maybe_sched {
838 sched.schedule(Box::new(move || Self::flush_instance(&ex_clone)));
839 }
840 break;
841 }
842 }
843 }
844
845 // Drain any wakes that were buffered while the executor RefCell
846 // was borrowed (PENDING_WAKES fallback in TaskWaker::wake).
847 drain_pending_wakes();
848 }
849}
850
851// ---------------------------------------------------------------------------
852// Current-executor storage — injectable, defaults to thread-local
853// ---------------------------------------------------------------------------
854
/// Shared, single-threaded handle to an [`Executor`].
pub(crate) type ExecutorRef = Rc<RefCell<Executor>>;
856
857/// RAII guard that restores the previous executor when dropped.
858struct RestoreExecutor(Option<ExecutorRef>);
859
860impl Drop for RestoreExecutor {
861 fn drop(&mut self) {
862 CURRENT_EXECUTOR.with(|c| {
863 *c.borrow_mut() = self.0.take();
864 });
865 }
866}
867
thread_local! {
    /// The executor that is "current" on this thread, if any. Set for the
    /// duration of [`with_executor`] and of every flush; `None` means
    /// callers fall back to the global executor.
    static CURRENT_EXECUTOR: RefCell<Option<ExecutorRef>> = const { RefCell::new(None) };
}
871
872/// Run `f` with `ex` set as the current executor.
873///
874/// Signal callbacks and `spawn_global` calls inside `f` will be routed
875/// to `ex` instead of the global thread-local executor. Restores the
876/// previous executor afterward.
877///
878/// # Signal routing constraints
879///
880/// Auralis uses a **single global schedule hook** (installed once by the
881/// first call to [`init_flush_scheduler`]) that decides where signal
882/// notifications land by checking the current executor **at the time the
883/// notification fires**, not at the time `Signal::set` is called.
884///
885/// This design implies two hard requirements for multi-instance users:
886///
887/// 1. **`init_flush_scheduler` must be called at least once** — without
888/// it, `Signal::set` falls back to synchronous callback execution,
889/// which breaks the deferred-notification model and can cause
890/// re-entrant borrow panics.
891/// 2. **The instance executor must still be "current" when the flush
892/// runs** — if `with_executor` has already exited, deferred callbacks
893/// from signals set inside `f` will be routed to the global executor
894/// (or synchronously if no global hook is installed).
895///
896/// For the typical single-threaded case (Wasm, game loop, CLI), both
897/// requirements are satisfied trivially: call `init_flush_scheduler`
898/// once at startup and never use `with_executor`. For SSR / multi-tenant
899/// servers, ensure that `with_executor` wraps the entire request
900/// lifecycle — from signal creation through the final flush.
901///
902/// # Example
903///
904/// ```rust,ignore
905/// use auralis_task::Executor;
906///
907/// let ex = Executor::new_instance();
908/// Executor::install_flush_scheduler(&ex, my_scheduler);
909/// auralis_task::with_executor(&ex, || {
910/// // Signal notifications and task spawns here go to `ex`.
911/// });
912/// ```
913pub fn with_executor<R>(ex: &ExecutorRef, f: impl FnOnce() -> R) -> R {
914 CURRENT_EXECUTOR.with(|exec| {
915 let prev = exec.borrow_mut().replace(Rc::clone(ex));
916 let result = f();
917 *exec.borrow_mut() = prev;
918 result
919 })
920}
921
922/// Return the current executor, if any.
923///
924/// If no executor has been set via [`with_executor`], returns `None` —
925/// callers should fall back to the global thread-local executor.
926fn current_executor() -> Option<ExecutorRef> {
927 CURRENT_EXECUTOR.with(|exec| exec.borrow().clone())
928}
929
930/// Return the currently active executor instance.
931///
932/// If [`with_executor`] was used to set an instance executor, returns
933/// that; otherwise returns the global thread-local executor.
934pub(crate) fn current_executor_instance() -> ExecutorRef {
935 current_executor().unwrap_or_else(|| EXECUTOR.with(Rc::clone))
936}
937
938/// Return the current time in milliseconds from the active executor's
939/// [`TimeSource`], or 0 if none is installed.
940pub(crate) fn current_time_ms() -> u64 {
941 current_executor_instance().borrow().now_ms()
942}
943
944// ---------------------------------------------------------------------------
945// Helpers — use thread_local EXECUTOR
946// ---------------------------------------------------------------------------
947
948/// Drain wakes that were buffered into [`PENDING_WAKES`] because the
949/// executor's `RefCell` was borrowed at the time [`TaskWaker::wake`]
950/// fired. Called at the end of every [`Executor::flush_instance`].
951fn drain_pending_wakes() {
952 PENDING_WAKES.with(|pw| {
953 let wakes = std::mem::take(&mut *pw.borrow_mut());
954 for (tid, slot_id, gen) in wakes {
955 let Some(exec) = lookup_executor(slot_id, gen) else {
956 continue;
957 };
958 // Use enqueue() for the stale-task-id safety check.
959 exec.borrow_mut().enqueue(tid);
960 let maybe_sched = exec.borrow_mut().try_schedule_flush();
961 if let Some(sched) = maybe_sched {
962 let sid = slot_id;
963 let g = gen;
964 sched.schedule(Box::new(move || {
965 if let Some(ex) = lookup_executor(sid, g) {
966 Executor::flush_instance(&ex);
967 }
968 }));
969 }
970 }
971 });
972}
973
974// ---------------------------------------------------------------------------
975// Flush
976// ---------------------------------------------------------------------------
977
978fn flush() {
979 EXECUTOR.with(Executor::flush_instance);
980}
981
982// ---------------------------------------------------------------------------
983// Public API
984// ---------------------------------------------------------------------------
985
986/// Check the deferred callback limit before pushing.
987fn check_callback_limit(ex: &Executor) {
988 if let Some(limit) = ex.max_deferred_callbacks {
989 assert!(
990 ex.deferred_callbacks.len() < limit,
991 "deferred callback limit exceeded ({limit}). \
992 This usually indicates an unbounded signal-set loop. \
993 Increase the limit via set_max_deferred_callbacks() \
994 or disable it with None."
995 );
996 }
997}
998
999/// Set the platform flush scheduler and install the signal deferred-
1000/// callback hook.
1001///
1002/// Idempotent — subsequent calls are no-ops (the hook is installed via
1003/// [`std::sync::OnceLock`], so it fires exactly once per process).
1004///
1005/// # Threading constraint
1006///
1007/// The hook is **per-process** and routes signal notifications to the
1008/// executor that is "current" when the notification fires (see
1009/// [`with_executor`]). For single-threaded use (Wasm, CLI) this is
1010/// transparent. For multi-threaded SSR, enable the `ssr-tokio` feature
1011/// and call [`init_scope_store_tokio`](crate::init_scope_store_tokio).
1012/// See [`with_executor`] for the full routing contract.
1013pub fn init_flush_scheduler(sched: Rc<dyn ScheduleFlush>) {
1014 EXECUTOR.with(|exec| exec.borrow_mut().flush_scheduler = Some(sched));
1015 install_signal_hook_once();
1016}
1017
1018/// Install the hook that bridges `auralis_signal::Signal::set` to the
1019/// executor's deferred-callback queue.
1020///
1021/// Idempotent — safe to call multiple times.
1022fn install_signal_hook_once() {
1023 use std::sync::OnceLock;
1024 static INSTALLED: OnceLock<()> = OnceLock::new();
1025 INSTALLED.get_or_init(|| {
1026 auralis_signal::install_schedule_hook(Box::new(|cb: Box<dyn FnOnce()>| {
1027 // Prefer the current executor (set via `with_executor`) for
1028 // SSR multi-request isolation; fall back to the global one.
1029 if let Some(ex) = current_executor() {
1030 let maybe_sched = {
1031 let mut e = ex.borrow_mut();
1032 check_callback_limit(&e);
1033 e.deferred_callbacks.push(cb);
1034 if e.in_flush {
1035 None
1036 } else {
1037 e.try_schedule_flush()
1038 }
1039 };
1040 if let Some(sched) = maybe_sched {
1041 let ex2 = Rc::clone(&ex);
1042 sched.schedule(Box::new(move || Executor::flush_instance(&ex2)));
1043 }
1044 } else {
1045 EXECUTOR.with(|exec| {
1046 let maybe_sched = {
1047 let mut ex = exec.borrow_mut();
1048 check_callback_limit(&ex);
1049 ex.deferred_callbacks.push(cb);
1050 if ex.in_flush {
1051 None
1052 } else {
1053 ex.try_schedule_flush()
1054 }
1055 };
1056 if let Some(sched) = maybe_sched {
1057 sched.schedule(Box::new(flush));
1058 }
1059 });
1060 }
1061 }));
1062 });
1063}
1064
1065/// Set the platform time source used for time-budget accounting.
1066///
1067/// If no [`TimeSource`] is registered the executor runs every flush to
1068/// completion without yielding, which is acceptable for short-running
1069/// workloads but may cause frame drops in the browser.
1070pub fn init_time_source(ts: Rc<dyn TimeSource>) {
1071 EXECUTOR.with(|exec| exec.borrow_mut().time_source = Some(ts));
1072}
1073
1074/// Set the per-flush time budget on the **global** thread-local executor.
1075///
1076/// This does **not** affect instance executors created via
1077/// [`Executor::new_instance`] — those carry their own budget (default
1078/// 8 ms) and must be configured via [`Executor::set_time_budget`].
1079///
1080/// See [`Executor::set_time_budget`] for the full semantics.
1081pub fn set_global_time_budget(budget_ms: u64) {
1082 EXECUTOR.with(|exec| exec.borrow_mut().time_budget_ms = budget_ms);
1083}
1084
1085/// Set the deferred callback safety cap on the global executor.
1086///
1087/// See [`Executor::set_max_deferred_callbacks`] for details.
1088pub fn set_global_max_deferred_callbacks(limit: Option<usize>) {
1089 EXECUTOR.with(|exec| exec.borrow_mut().max_deferred_callbacks = limit);
1090}
1091
1092/// Register a global panic hook called when any globally-spawned
1093/// task panics.
1094///
1095/// See [`Executor::set_panic_hook`] for details.
1096pub fn set_panic_hook(hook: Rc<dyn Fn(PanicInfo)>) {
1097 EXECUTOR.with(|exec| exec.borrow_mut().panic_hook = Some(hook));
1098}
1099
1100/// Remove the global panic hook, restoring the default silent
1101/// behaviour.
1102pub fn remove_panic_hook() {
1103 EXECUTOR.with(|exec| exec.borrow_mut().panic_hook = None);
1104}
1105
1106/// Spawn a future on the global executor at low priority.
1107///
1108/// **Important:** [`init_flush_scheduler`] must be called before spawning
1109/// any tasks. Without a flush scheduler, spawned tasks will sit in the
1110/// queue indefinitely because the executor has no way to schedule a flush
1111/// cycle.
1112pub fn spawn_global(future: impl Future<Output = ()> + 'static) {
1113 spawn_global_with_priority(Priority::Low, future);
1114}
1115
1116/// Spawn a future on the global executor at the given priority.
1117pub fn spawn_global_with_priority(priority: Priority, future: impl Future<Output = ()> + 'static) {
1118 spawn_inner_on(&EXECUTOR.with(Rc::clone), Box::pin(future), priority, 0);
1119}
1120
1121/// Spawn a future on a specific executor and scope.
1122pub(crate) fn spawn_scoped_on(
1123 ex: &Rc<RefCell<Executor>>,
1124 priority: Priority,
1125 scope_id: u64,
1126 future: impl Future<Output = ()> + 'static,
1127) -> TaskId {
1128 spawn_inner_on(ex, Box::pin(future), priority, scope_id)
1129}
1130
1131fn spawn_inner_on(
1132 ex: &Rc<RefCell<Executor>>,
1133 future: Pin<Box<dyn Future<Output = ()> + 'static>>,
1134 priority: Priority,
1135 scope_id: u64,
1136) -> TaskId {
1137 let (task_id, maybe_sched) = {
1138 let mut e = ex.borrow_mut();
1139 let task_id = e.allocate_id();
1140 e.tasks[task_id as usize] = Some(TaskState {
1141 future,
1142 priority,
1143 scope_id,
1144 timer_deadline: 0,
1145 });
1146 e.enqueue(task_id);
1147 let sched = e.try_schedule_flush();
1148 (task_id, sched)
1149 };
1150 if let Some(sched) = maybe_sched {
1151 let ex2 = Rc::clone(ex);
1152 sched.schedule(Box::new(move || Executor::flush_instance(&ex2)));
1153 }
1154 task_id
1155}
1156
1157/// Enqueue all tasks belonging to `scope_id` on a given executor.
1158///
1159/// Used by [`TaskScope::resume`] to restart tasks after a suspend.
1160pub(crate) fn enqueue_scope_tasks_on(ex: &ExecutorRef, task_ids: &[TaskId]) {
1161 if task_ids.is_empty() {
1162 return;
1163 }
1164 let maybe_sched = {
1165 let mut e = ex.borrow_mut();
1166 for tid in task_ids {
1167 e.enqueue(*tid);
1168 }
1169 if e.in_flush {
1170 None
1171 } else {
1172 e.try_schedule_flush()
1173 }
1174 };
1175 if let Some(sched) = maybe_sched {
1176 let ex2 = Rc::clone(ex);
1177 sched.schedule(Box::new(move || Executor::flush_instance(&ex2)));
1178 }
1179}
1180/// Cancel all tasks belonging to `scope_id` on a specific executor.
1181pub(crate) fn cancel_scope_tasks_on(
1182 ex: &Rc<RefCell<Executor>>,
1183 task_ids: &[TaskId],
1184) -> Vec<Pin<Box<dyn Future<Output = ()>>>> {
1185 if task_ids.is_empty() {
1186 return Vec::new();
1187 }
1188
1189 let mut e = ex.borrow_mut();
1190 let mut dropped = Vec::with_capacity(task_ids.len());
1191
1192 // Collect timer deadlines before mutating.
1193 let mut timer_deadlines: Vec<(u64, TaskId)> = Vec::new();
1194 for &tid in task_ids {
1195 let idx = tid as usize;
1196 if idx < e.tasks.len() {
1197 if let Some(ref t) = e.tasks[idx] {
1198 if t.timer_deadline != 0 {
1199 timer_deadlines.push((t.timer_deadline, tid));
1200 }
1201 }
1202 }
1203 }
1204 for (dl, tid) in &timer_deadlines {
1205 e.cleanup_timer(*tid, *dl);
1206 }
1207
1208 // Cancel each task by id (direct lookup, no full-table scan).
1209 // Only push to free_slots for slots we actually took.
1210 for &tid in task_ids {
1211 let idx = tid as usize;
1212 if idx < e.tasks.len() {
1213 if let Some(state) = e.tasks[idx].take() {
1214 dropped.push(state.future);
1215 e.free_slots.push(tid);
1216 }
1217 }
1218 }
1219 e.free_slots.sort_unstable();
1220 e.free_slots.dedup();
1221
1222 // Filter queues to remove cancelled tasks.
1223 let high: Vec<TaskId> = e
1224 .high_queue
1225 .iter()
1226 .copied()
1227 .filter(|&id| {
1228 let idx = id as usize;
1229 idx < e.tasks.len() && e.tasks[idx].is_some()
1230 })
1231 .collect();
1232 e.high_queue.clear();
1233 e.high_queue.extend(high);
1234
1235 let low: Vec<TaskId> = e
1236 .low_queue
1237 .iter()
1238 .copied()
1239 .filter(|&id| {
1240 let idx = id as usize;
1241 idx < e.tasks.len() && e.tasks[idx].is_some()
1242 })
1243 .collect();
1244 e.low_queue.clear();
1245 e.low_queue.extend(low);
1246
1247 dropped
1248}
1249
1250/// Cancel a single task by its id, dropping its future and cleaning up
1251/// its timer if any. No-op if the task has already completed.
1252pub(crate) fn cancel_task(ex: &Rc<RefCell<Executor>>, task_id: TaskId) {
1253 let mut e = ex.borrow_mut();
1254 let idx = task_id as usize;
1255 if idx >= e.tasks.len() {
1256 return;
1257 }
1258 let deadline = e.tasks[idx].as_ref().map_or(0, |t| t.timer_deadline);
1259 if deadline != 0 {
1260 e.cleanup_timer(task_id, deadline);
1261 }
1262 let slot = e.tasks[idx].take();
1263 if slot.is_some() {
1264 e.free_slots.push(task_id);
1265 e.high_queue.retain(|&id| id != task_id);
1266 e.low_queue.retain(|&id| id != task_id);
1267 }
1268}
1269
1270/// Check whether a task slot is empty (task completed or was cancelled).
1271pub(crate) fn is_task_finished(ex: &Rc<RefCell<Executor>>, task_id: TaskId) -> bool {
1272 let e = ex.borrow();
1273 let idx = task_id as usize;
1274 idx >= e.tasks.len() || e.tasks[idx].is_none()
1275}
1276
1277// ---------------------------------------------------------------------------
1278// yield_now
1279// ---------------------------------------------------------------------------
1280
/// Create a [`Future`] that hands control back to the executor exactly
/// once before completing.
#[must_use = "yield_now() does nothing unless awaited"]
pub fn yield_now() -> YieldNow {
    YieldNow { yielded: false }
}

/// Future returned by [`yield_now`].
///
/// The first poll wakes the task and returns `Pending`; every later
/// poll returns `Ready`.
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct YieldNow {
    /// `true` once the single yield has happened.
    yielded: bool,
}

impl Future for YieldNow {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Mark the yield as done and learn whether it had already happened.
        let already_yielded = std::mem::replace(&mut self.yielded, true);
        if already_yielded {
            return Poll::Ready(());
        }
        // Wake ourselves before returning `Pending` so the executor
        // polls this task again.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}
1307
1308// ---------------------------------------------------------------------------
1309// schedule_callback — hook for auralis-signal's deferred callback model
1310// ---------------------------------------------------------------------------
1311
1312/// Schedule a closure to run at the start of the next executor flush.
1313///
1314/// Used internally by `auralis_signal` to defer subscriber callback
1315/// execution. The closure is drained before the main poll loop.
1316///
1317/// Routes to the current executor (via [`with_executor`]) when one is
1318/// active; falls back to the global thread-local executor.
1319pub fn schedule_callback(f: Box<dyn FnOnce()>) {
1320 let exec = current_executor_instance();
1321 let maybe_sched = {
1322 let mut ex = exec.borrow_mut();
1323 check_callback_limit(&ex);
1324 ex.deferred_callbacks.push(f);
1325 if ex.in_flush {
1326 None
1327 } else {
1328 ex.try_schedule_flush()
1329 }
1330 };
1331 if let Some(sched) = maybe_sched {
1332 let ex2 = Rc::clone(&exec);
1333 sched.schedule(Box::new(move || Executor::flush_instance(&ex2)));
1334 }
1335}
1336
1337// ---------------------------------------------------------------------------
1338// set_deferred
1339// ---------------------------------------------------------------------------
1340
1341/// Schedule a [`Signal::set`] call for the **next** executor flush.
1342///
1343/// Safe to call from inside [`Drop`] — the actual `signal.set(value)` is
1344/// deferred to a subsequent flush, avoiding re-entrant borrow panics.
1345///
1346/// Routes to the current executor (via [`with_executor`]) when one is
1347/// active; falls back to the global thread-local executor.
1348pub fn set_deferred<T: 'static>(signal: &Signal<T>, value: T) {
1349 let signal = signal.clone();
1350 let exec = current_executor_instance();
1351 let maybe_sched = {
1352 let mut ex = exec.borrow_mut();
1353 ex.deferred_ops.push(DeferredOp {
1354 f: Box::new(move || signal.set(value)),
1355 });
1356 ex.try_schedule_flush()
1357 };
1358 if let Some(sched) = maybe_sched {
1359 let ex2 = Rc::clone(&exec);
1360 sched.schedule(Box::new(move || Executor::flush_instance(&ex2)));
1361 }
1362}
1363
1364// ---------------------------------------------------------------------------
1365// Test / debug helpers
1366// ---------------------------------------------------------------------------
1367
1368/// Completely reset the global executor to a pristine state.
1369///
1370/// Clears all task slots, queues, deferred ops, flush/scheduler flags,
1371/// and injected [`ScheduleFlush`]/[`TimeSource`]. Call at the start
1372/// of every test to prevent cross-test state leakage.
1373///
1374/// Note that the signal schedule hook (installed by
1375/// [`init_flush_scheduler`] via [`std::sync::OnceLock`]) **persists**
1376/// across resets — the hook references the global [`EXECUTOR`]
1377/// thread-local, and this function re-initialises that same executor
1378/// in place rather than replacing it. This is correct behaviour:
1379/// after reset, signal notifications route to the freshly-cleared
1380/// global executor.
1381///
1382/// # Safety / usage
1383///
1384/// This function is intended **only** for testing. Calling it while
1385/// the executor is processing tasks will silently drop all live
1386/// futures and may cause panics or undefined behavior in running
1387/// application code.
1388pub fn reset_executor_for_test() {
1389 PENDING_WAKES.with(|pw| pw.borrow_mut().clear());
1390 SLOTS.with(|s| s.borrow_mut().clear());
1391 CURRENT_EXECUTOR.with(|c| *c.borrow_mut() = None);
1392 EXECUTOR.with(|exec| {
1393 let mut ex = exec.borrow_mut();
1394 ex.high_queue.clear();
1395 ex.low_queue.clear();
1396 ex.tasks.clear();
1397 ex.free_slots.clear();
1398 ex.next_task_id = 0;
1399 ex.is_flush_scheduled = false;
1400 ex.in_flush = false;
1401 ex.deferred_ops.clear();
1402 ex.deferred_callbacks.clear();
1403 ex.flush_scheduler = None;
1404 ex.time_source = None;
1405 ex.slot_id = 0;
1406 ex.generation = 0;
1407 ex.registered = false;
1408 });
1409 crate::scope::clear_scope_registry();
1410}
1411
/// Count the occupied task slots on the global executor.
#[cfg(any(test, feature = "debug"))]
pub(crate) fn debug_task_count() -> usize {
    EXECUTOR.with(|global| global.borrow().tasks.iter().flatten().count())
}
1416
/// Snapshot every active task on the global executor as
/// `(task_id, priority, scope_id)`, in slot order.
#[cfg(feature = "debug")]
pub(crate) fn debug_task_snapshot() -> Vec<(TaskId, Priority, u64)> {
    EXECUTOR.with(|global| {
        global
            .borrow()
            .tasks
            .iter()
            .enumerate()
            .filter_map(|(idx, slot)| {
                // The slot index doubles as the task id.
                slot.as_ref()
                    .map(|task| (idx as u64, task.priority, task.scope_id))
            })
            .collect()
    })
}
1431
/// List the task ids currently sitting in the global executor's ready
/// queues, sorted ascending with duplicates removed.
#[cfg(feature = "debug")]
pub(crate) fn debug_queued_task_ids() -> Vec<TaskId> {
    EXECUTOR.with(|global| {
        let state = global.borrow();
        let mut queued: Vec<TaskId> =
            Vec::with_capacity(state.high_queue.len() + state.low_queue.len());
        queued.extend(state.high_queue.iter().copied());
        queued.extend(state.low_queue.iter().copied());
        queued.sort_unstable();
        queued.dedup();
        queued
    })
}
1448
/// Register and enqueue a task on the global executor without
/// requesting a flush.
///
/// Lets tests batch several spawns before running them. The task gets
/// scope id `0` and no timer deadline. Returns the new task id.
#[cfg(test)]
pub(crate) fn spawn_no_auto_flush(
    priority: Priority,
    future: impl Future<Output = ()> + 'static,
) -> TaskId {
    let state = TaskState {
        future: Box::pin(future),
        priority,
        scope_id: 0,
        timer_deadline: 0,
    };
    EXECUTOR.with(|global| {
        let mut ex = global.borrow_mut();
        let id = ex.allocate_id();
        ex.tasks[id as usize] = Some(state);
        ex.enqueue(id);
        // Deliberately no `try_schedule_flush` here.
        id
    })
}
1470
/// Drive one flush cycle of the global executor by hand, for tests that
/// need to control timing.
#[cfg(test)]
pub(crate) fn flush_all() {
    flush()
}