auralis_task/executor.rs
1//! Single-threaded executor with priority scheduling, time-budget
2//! awareness, and deferred-signal support.
3//!
4//! ## Architecture
5//!
6//! The executor is stored via a pluggable [`ExecutorStorage`] strategy
7//! (defaulting to a per-thread slot). Before polling a task the future
8//! is **temporarily removed** so that the poll never holds an executor
9//! borrow — this allows nested spawns, wakes, and `set_deferred` calls
10//! without `RefCell` panics.
11//!
//! The waker carries only plain data (task id, priority, slot index and
//! slot generation), making it trivially [`Send`] + [`Sync`] for [`Waker::from`].
14
15#![allow(clippy::cast_possible_truncation)]
16
17use std::cell::{Cell, RefCell};
18use std::collections::{BTreeMap, VecDeque};
19use std::future::Future;
20use std::pin::Pin;
21use std::rc::{Rc, Weak};
22use std::sync::Arc;
23use std::task::{Context, Poll, Wake, Waker};
24
25use auralis_signal::Signal;
26
27use crate::Priority;
28
29// ---------------------------------------------------------------------------
30// Types
31// ---------------------------------------------------------------------------
32
33type TaskId = u64;
34
35// ---------------------------------------------------------------------------
36// ScheduleFlush
37// ---------------------------------------------------------------------------
38
/// Platform hook for scheduling a microtask callback.
///
/// The executor hands implementations a boxed closure that runs one flush
/// cycle; the implementation decides when that closure is invoked.
pub trait ScheduleFlush {
    /// Request that `callback` runs at the next microtask boundary.
    fn schedule(&self, callback: Box<dyn FnOnce()>);
}
44
/// A [`ScheduleFlush`] that fires the callback synchronously.
///
/// Makes the executor run-to-completion in unit tests without a browser
/// event loop. Only compiled for test builds.
#[cfg(test)]
pub struct TestScheduleFlush;
51
#[cfg(test)]
impl ScheduleFlush for TestScheduleFlush {
    /// Invoke `callback` immediately, before `schedule` returns.
    fn schedule(&self, callback: Box<dyn FnOnce()>) {
        let run_now = callback;
        run_now();
    }
}
58
59// ---------------------------------------------------------------------------
60// TimeSource
61// ---------------------------------------------------------------------------
62
/// High-resolution time source for the executor's time-budget
/// accounting.
///
/// When registered via [`init_time_source`], the executor queries this
/// around task polls to decide whether it should yield control back to
/// the host event loop (default budget: 8 ms).
///
/// In Wasm environments the implementation typically delegates to
/// `performance.now()`. If no [`TimeSource`] is registered the executor
/// reads the time as 0, so elapsed time never advances and the executor
/// runs tasks until the queues are drained.
pub trait TimeSource {
    /// Return the current time in milliseconds.
    fn now_ms(&self) -> u64;
}
78
/// A [`TimeSource`] whose value is explicitly controlled by the test.
///
/// Use [`set`](TestTimeSource::set) or [`advance`](TestTimeSource::advance)
/// to simulate the passage of time during a flush cycle.
#[cfg(test)]
pub struct TestTimeSource {
    /// Current simulated time in milliseconds.
    now: std::cell::Cell<u64>,
}

#[cfg(test)]
impl TestTimeSource {
    /// Create a new [`TestTimeSource`] with the given initial time.
    #[must_use]
    pub fn new(initial_ms: u64) -> Self {
        Self {
            now: std::cell::Cell::new(initial_ms),
        }
    }

    /// Set the current time to `ms` milliseconds.
    pub fn set(&self, ms: u64) {
        self.now.set(ms);
    }

    /// Advance the current time by `ms` milliseconds.
    ///
    /// The clock saturates at `u64::MAX` instead of overflowing, so a test
    /// that jumps "to the end of time" neither panics (debug builds) nor
    /// wraps back to a small value (release builds).
    pub fn advance(&self, ms: u64) {
        self.now.set(self.now.get().saturating_add(ms));
    }
}
108
#[cfg(test)]
impl TimeSource for TestTimeSource {
    /// Report whatever value the test last stored in the clock.
    fn now_ms(&self) -> u64 {
        let Self { now } = self;
        now.get()
    }
}
115
116// ---------------------------------------------------------------------------
117// TaskWaker — routes wakes to the correct executor via a slot table.
118//
119// Waker::from requires Send + Sync + 'static, so the waker cannot hold
120// an Rc<RefCell<Executor>>. Instead it stores a slot index + generation
121// number. The SLOTS thread_local maps (index, generation) → Weak<Executor>.
122// On wake, the generation is validated before the weak pointer is upgraded.
123// Dead slots are reclaimed when new executors are registered.
124// ---------------------------------------------------------------------------
125
/// A registered executor slot. The `generation` counter distinguishes
/// between successive executors that occupy the same slot index (e.g.
/// after the previous one was dropped and a new one recycles the slot).
struct Slot {
    /// Non-owning handle to the executor; upgrading fails once the
    /// executor has been dropped, which marks the slot as reusable.
    weak: Weak<RefCell<Executor>>,
    /// Incremented (wrapping) every time this slot is reused.
    /// A [`TaskWaker`] must present the generation it was created with;
    /// a mismatch means the waker is stale and is silently ignored.
    generation: u64,
}
136
thread_local! {
    /// Per-thread table of executor slots, indexed by the `slot_id`
    /// stored in each [`TaskWaker`].
    ///
    /// Slot 0 is reserved for the global executor. Instance executors
    /// occupy subsequent slots. Dead slots (Weak::upgrade returns None)
    /// are recycled in [`register_executor`].
    static SLOTS: RefCell<Vec<Slot>> = const { RefCell::new(Vec::new()) };
}
143
144/// Register an executor in the slot table, returning the assigned
145/// (`slot_id`, `generation`) pair. Dead slots are recycled in-place;
146/// if no dead slot is found a new entry is appended.
147fn register_executor(weak: Weak<RefCell<Executor>>) -> (u64, u64) {
148 SLOTS.with(|slots| {
149 let mut slots = slots.borrow_mut();
150 for (i, slot) in slots.iter_mut().enumerate() {
151 if slot.weak.upgrade().is_none() {
152 slot.weak = weak;
153 // Wrapping is safe: 2^64 reuses of a single slot
154 // would take ~10^14 years at 1 reuse/μs.
155 slot.generation = slot.generation.wrapping_add(1);
156 return (i as u64, slot.generation);
157 }
158 }
159 let gen = 0;
160 slots.push(Slot {
161 weak,
162 generation: gen,
163 });
164 ((slots.len() - 1) as u64, gen)
165 })
166}
167
168/// Look up an executor by slot id, validating the generation.
169fn lookup_executor(slot_id: u64, generation: u64) -> Option<Rc<RefCell<Executor>>> {
170 SLOTS.with(|slots| {
171 let slots = slots.borrow();
172 let slot = slots.get(slot_id as usize)?;
173 if slot.generation != generation {
174 return None;
175 }
176 slot.weak.upgrade()
177 })
178}
179
/// Waker payload: identifies one task on one executor using plain data
/// only, so it can be wrapped in an `Arc` and passed to `Waker::from`.
struct TaskWaker {
    /// Task to enqueue when the waker fires.
    task_id: TaskId,
    /// Which ready queue (high or low) the task is pushed onto.
    priority: Priority,
    /// Index into [`SLOTS`] of the executor that owns the task.
    slot_id: u64,
    /// Generation of that slot at the time the waker was created.
    generation: u64,
}
186
187impl Wake for TaskWaker {
188 fn wake(self: Arc<Self>) {
189 let Some(exec) = lookup_executor(self.slot_id, self.generation) else {
190 return;
191 };
192 let maybe_sched = if let Ok(mut ex) = exec.try_borrow_mut() {
193 match self.priority {
194 Priority::High => ex.high_queue.push_back(self.task_id),
195 Priority::Low => ex.low_queue.push_back(self.task_id),
196 }
197 if ex.in_flush {
198 None
199 } else {
200 ex.try_schedule_flush()
201 }
202 } else {
203 PENDING_WAKES.with(|pw| {
204 pw.borrow_mut()
205 .push((self.task_id, self.slot_id, self.generation));
206 });
207 None
208 };
209 if let Some(sched) = maybe_sched {
210 let sid = self.slot_id;
211 let gen = self.generation;
212 sched.schedule(Box::new(move || {
213 if let Some(ex) = lookup_executor(sid, gen) {
214 Executor::flush_instance(&ex);
215 }
216 }));
217 }
218 }
219}
220
221// ---------------------------------------------------------------------------
222// TaskState
223// ---------------------------------------------------------------------------
224
/// Everything the executor stores for one spawned task.
struct TaskState {
    /// The task body; polled by `Executor::flush_instance`.
    future: Pin<Box<dyn Future<Output = ()> + 'static>>,
    /// Ready queue this task is placed on when enqueued.
    priority: Priority,
    /// Id of the owning scope (0 for tasks spawned without a scope).
    scope_id: u64,
    /// Key in [`Executor::timers`] for this task's pending sleep,
    /// or 0 if the task is not waiting on a timer.
    timer_deadline: u64,
}
233
234// ---------------------------------------------------------------------------
235// Executor
236// ---------------------------------------------------------------------------
237
/// Information about a task panic, passed to the user-registered
/// [`set_panic_hook`].
///
/// Built by the flush loop from the payload that `catch_unwind` captured
/// while polling the task.
#[derive(Debug)]
pub struct PanicInfo {
    /// The executor-assigned task id.
    pub task_id: u64,
    /// The scope that owned the task (0 for global tasks).
    pub scope_id: u64,
    /// The boxed panic payload.
    pub payload: Box<dyn std::any::Any + Send>,
}
249
/// A single-threaded async task executor with priority queues.
///
/// Each [`Executor`] manages its own task slots, ready queues, and
/// deferred callback buffers. Use [`Executor::new_instance`] to create
/// an isolated executor (e.g. per SSR request), or use the global
/// thread-local executor via [`spawn_global`](crate::spawn_global).
pub struct Executor {
    /// Ready queue that is always drained before `low_queue`.
    high_queue: VecDeque<TaskId>,
    /// Ready queue consulted only when `high_queue` is empty.
    low_queue: VecDeque<TaskId>,
    /// Task storage indexed by task id; `None` marks a free slot or a
    /// task that is temporarily taken out while it is being polled.
    tasks: Vec<Option<TaskState>>,
    /// Task ids available for reuse by `allocate_id`.
    free_slots: Vec<TaskId>,
    /// Next never-used task id (equals the length of `tasks`).
    next_task_id: TaskId,
    /// True once a flush has been requested and not yet completed.
    is_flush_scheduled: bool,
    /// True while `flush_instance` is running on this executor.
    in_flush: bool,
    /// Operations run at the start of a flush, after timers are drained.
    deferred_ops: Vec<DeferredOp>,
    /// Callbacks pushed by `Signal::set` via the schedule hook.
    /// Drained at the start of every flush before polling tasks.
    ///
    /// Unbounded by design — in a single-threaded Wasm context, a tight
    /// loop of signal sets will block the UI thread anyway, so adding a
    /// capacity limit wouldn't improve the situation. SSR / multi-tenant
    /// users should ensure that application code doesn't produce
    /// unbounded signal churn within a single request.
    deferred_callbacks: Vec<Box<dyn FnOnce()>>,
    /// Platform hook used to request a flush; `None` means nothing is
    /// scheduled automatically.
    flush_scheduler: Option<Rc<dyn ScheduleFlush>>,
    /// Clock used for time-budget and timer checks; `None` reads as 0.
    time_source: Option<Rc<dyn TimeSource>>,
    /// Maximum milliseconds to spend inside a single flush before
    /// yielding back to the host event loop. Default: 8 ms.
    time_budget_ms: u64,
    /// Optional hook invoked when a spawned task panics.
    panic_hook: Option<Rc<dyn Fn(PanicInfo)>>,
    /// Timer queue: map from deadline (ms) to task ids that should be
    /// woken when that deadline expires. Processed at the start of
    /// every flush.
    timers: BTreeMap<u64, Vec<TaskId>>,
    /// Slot index and generation in [`SLOTS`] for routing wakes back
    /// to this executor. Set by [`new_instance`] or lazily for the
    /// global executor.
    slot_id: u64,
    generation: u64,
    /// Whether this executor has been registered in [`SLOTS`].
    registered: bool,
}
293
// Set by the executor before polling a task and restored to its previous
// value afterward. Lets futures discover their task id without threading
// it through layers of combinators.
thread_local! {
    /// Id of the task currently being polled on this thread, if any.
    static CURRENT_POLLING_TASK: Cell<Option<TaskId>> = const { Cell::new(None) };
}
300
301pub(crate) fn with_current_polling_task<R>(f: impl FnOnce(Option<TaskId>) -> R) -> R {
302 CURRENT_POLLING_TASK.with(|c| f(c.get()))
303}
304
/// A one-shot operation queued on the executor and run by the next flush
/// (after expired timers are drained, before signal callbacks).
struct DeferredOp {
    /// The operation to run; consumed when invoked.
    f: Box<dyn FnOnce()>,
}
308
309impl Executor {
310 fn new() -> Self {
311 Self {
312 high_queue: VecDeque::new(),
313 low_queue: VecDeque::new(),
314 tasks: Vec::new(),
315 free_slots: Vec::new(),
316 next_task_id: 0,
317 is_flush_scheduled: false,
318 in_flush: false,
319 deferred_ops: Vec::new(),
320 deferred_callbacks: Vec::new(),
321 flush_scheduler: None,
322 time_source: None,
323 time_budget_ms: 8,
324 panic_hook: None,
325 timers: BTreeMap::new(),
326 slot_id: 0,
327 generation: 0,
328 registered: false,
329 }
330 }
331
332 fn allocate_id(&mut self) -> TaskId {
333 if let Some(id) = self.free_slots.pop() {
334 return id;
335 }
336 let id = self.next_task_id;
337 self.next_task_id += 1;
338 self.tasks.push(None);
339 id
340 }
341
342 /// Release a task slot back to the free list.
343 ///
344 /// **Caller must ensure** that `task_id` has not already been freed
345 /// (e.g. via [`cancel_task`] or [`cancel_scope_tasks_on`]). This
346 /// method unconditionally pushes to `free_slots` — pushing the same
347 /// id twice would cause [`allocate_id`] to hand it out twice.
348 fn free_slot(&mut self, task_id: TaskId) {
349 // Clean up any pending timer for this task so a recycled
350 // task ID is not spuriously woken by an old deadline.
351 // This works when the slot is still occupied (scope cancel
352 // path). For normal completion (Poll::Ready), the slot is
353 // already None and the caller must call cleanup_timer first.
354 if let Some(Some(ref t)) = self.tasks.get(task_id as usize) {
355 if t.timer_deadline != 0 {
356 self.cleanup_timer(task_id, t.timer_deadline);
357 }
358 }
359 self.tasks[task_id as usize] = None;
360 self.free_slots.push(task_id);
361 }
362
363 /// Remove a timer entry for `task_id` from the timer map.
364 fn cleanup_timer(&mut self, task_id: TaskId, deadline: u64) {
365 if let Some(tids) = self.timers.get_mut(&deadline) {
366 tids.retain(|id| *id != task_id);
367 if tids.is_empty() {
368 self.timers.remove(&deadline);
369 }
370 }
371 }
372
373 fn enqueue(&mut self, task_id: TaskId) {
374 let priority = match self.tasks.get(task_id as usize).and_then(Option::as_ref) {
375 Some(t) => t.priority,
376 None => return,
377 };
378 match priority {
379 Priority::High => self.high_queue.push_back(task_id),
380 Priority::Low => self.low_queue.push_back(task_id),
381 }
382 }
383
384 fn dequeue(&mut self) -> Option<TaskId> {
385 self.high_queue
386 .pop_front()
387 .or_else(|| self.low_queue.pop_front())
388 }
389
390 /// Mark that a flush is needed and return the scheduler if one is
391 /// registered. The caller **must** invoke the scheduler **after**
392 /// releasing the executor borrow.
393 fn try_schedule_flush(&mut self) -> Option<Rc<dyn ScheduleFlush>> {
394 if self.is_flush_scheduled {
395 return None;
396 }
397 self.is_flush_scheduled = true;
398 self.flush_scheduler.clone()
399 }
400
401 /// Return the current time in ms, or 0 if no [`TimeSource`] is
402 /// registered. When this returns 0 the time-budget check is
403 /// effectively a no-op.
404 pub(crate) fn now_ms(&self) -> u64 {
405 self.time_source.as_ref().map_or(0, |ts| ts.now_ms())
406 }
407
408 /// Return the number of currently active (not-yet-completed) tasks.
409 ///
410 /// Used by streaming SSR to determine whether the stream should
411 /// wait for more work or terminate.
412 #[must_use]
413 pub fn active_task_count(&self) -> usize {
414 self.tasks.iter().filter(|t| t.is_some()).count()
415 }
416}
417
418// ---------------------------------------------------------------------------
419// Thread-local globals (default storage)
420// ---------------------------------------------------------------------------
421
thread_local! {
    /// The per-thread global executor, created lazily on first access.
    static EXECUTOR: Rc<RefCell<Executor>> = Rc::new(RefCell::new(Executor::new()));
    /// Wakes that could not be delivered because the target executor was
    /// mutably borrowed, stored as (`task_id`, `slot_id`, `generation`).
    static PENDING_WAKES: RefCell<Vec<(TaskId, u64, u64)>> =
        const { RefCell::new(Vec::new()) };
}
427
428/// Ensure the global executor is registered in slot 0 (lazy, idempotent).
429/// Returns (`slot_id`, `generation`) for the global executor.
430fn ensure_global_registered() -> (u64, u64) {
431 SLOTS.with(|slots| {
432 let mut slots = slots.borrow_mut();
433 if slots.is_empty() {
434 let weak = EXECUTOR.with(Rc::downgrade);
435 slots.push(Slot {
436 weak,
437 generation: 0,
438 });
439 } else {
440 // Verify slot 0 still holds the global executor.
441 let global = EXECUTOR.with(Rc::clone);
442 let is_global = slots[0]
443 .weak
444 .upgrade()
445 .is_some_and(|ex| Rc::ptr_eq(&ex, &global));
446 if !is_global {
447 slots[0] = Slot {
448 weak: Rc::downgrade(&global),
449 generation: slots[0].generation.wrapping_add(1),
450 };
451 }
452 }
453 // Mark the global executor as registered so flush_instance
454 // doesn't call this function again on every flush.
455 EXECUTOR.with(|ex| {
456 let mut e = ex.borrow_mut();
457 e.slot_id = 0;
458 e.generation = slots[0].generation;
459 e.registered = true;
460 });
461 let gen = slots[0].generation;
462 (0, gen)
463 })
464}
465
466// ---------------------------------------------------------------------------
467// Executor instance methods (for isolated executors, e.g. SSR)
468// ---------------------------------------------------------------------------
469
470impl Executor {
471 /// Create a new isolated executor, wrapped for shared access.
472 ///
473 /// The returned executor is independent of the global thread-local
474 /// executor. Use [`with_executor`] to make it the current executor
475 /// for the duration of a closure, so that spawned tasks and signal
476 /// callbacks are routed to it.
477 #[must_use]
478 pub fn new_instance() -> Rc<RefCell<Executor>> {
479 let ex = Rc::new(RefCell::new(Executor::new()));
480 // Register in the slot table so TaskWaker can find this executor.
481 let (slot_id, generation) = register_executor(Rc::downgrade(&ex));
482 {
483 let mut e = ex.borrow_mut();
484 e.slot_id = slot_id;
485 e.generation = generation;
486 e.registered = true;
487 }
488 ex
489 }
490
491 /// Install a flush scheduler on this executor instance.
492 pub fn install_flush_scheduler(ex: &Rc<RefCell<Executor>>, sched: Rc<dyn ScheduleFlush>) {
493 ex.borrow_mut().flush_scheduler = Some(sched);
494 }
495
496 /// Install a time source on this executor instance.
497 pub fn install_time_source(ex: &Rc<RefCell<Executor>>, ts: Rc<dyn TimeSource>) {
498 ex.borrow_mut().time_source = Some(ts);
499 }
500
501 /// Set the maximum time (in milliseconds) a single flush may spend
502 /// before yielding back to the host event loop.
503 ///
504 /// The default is 8 ms. Set to `u64::MAX` to disable time-budget
505 /// yielding (flush runs to completion).
506 pub fn set_time_budget(ex: &Rc<RefCell<Executor>>, budget_ms: u64) {
507 ex.borrow_mut().time_budget_ms = budget_ms;
508 }
509
510 /// Register a callback invoked whenever a spawned task panics.
511 ///
512 /// The default is no hook — panicking tasks are silently removed
513 /// from the executor (the same behaviour as a task returning
514 /// `Poll::Ready(())`).
515 ///
516 /// # Example
517 ///
518 /// ```rust,ignore
519 /// Executor::set_panic_hook(&ex, Rc::new(|info| {
520 /// eprintln!("task {} in scope {} panicked", info.task_id, info.scope_id);
521 /// }));
522 /// ```
523 pub fn set_panic_hook(ex: &Rc<RefCell<Executor>>, hook: Rc<dyn Fn(PanicInfo)>) {
524 ex.borrow_mut().panic_hook = Some(hook);
525 }
526
527 /// Register a timer: when `now_ms() >= deadline_ms`, enqueue
528 /// `task_id` so it gets polled on the next flush.
529 pub(crate) fn schedule_timer(ex: &Rc<RefCell<Executor>>, deadline_ms: u64, task_id: TaskId) {
530 let mut e = ex.borrow_mut();
531 // If this task already has a pending timer (e.g. previous SleepFuture
532 // was dropped via select!), clean up the old entry so the timer map
533 // doesn't accumulate stale deadlines.
534 let old_deadline = e
535 .tasks
536 .get(task_id as usize)
537 .and_then(Option::as_ref)
538 .map_or(0, |t| t.timer_deadline);
539 if old_deadline != 0 {
540 e.cleanup_timer(task_id, old_deadline);
541 }
542 e.timers.entry(deadline_ms).or_default().push(task_id);
543 // Set the reverse index so cancel_scope_tasks can find this entry.
544 if let Some(Some(ref mut t)) = e.tasks.get_mut(task_id as usize) {
545 t.timer_deadline = deadline_ms;
546 }
547 // Request a flush so the timer is checked.
548 e.is_flush_scheduled = false;
549 let maybe_sched = e.try_schedule_flush();
550 drop(e);
551 if let Some(sched) = maybe_sched {
552 let ex2 = Rc::clone(ex);
553 sched.schedule(Box::new(move || Self::flush_instance(&ex2)));
554 }
555 }
556
557 /// Spawn a future on this executor instance.
558 pub fn spawn(ex: &Rc<RefCell<Executor>>, future: impl Future<Output = ()> + 'static) {
559 let maybe_sched = {
560 let mut e = ex.borrow_mut();
561 let tid = e.allocate_id();
562 e.tasks[tid as usize] = Some(TaskState {
563 future: Box::pin(future),
564 priority: Priority::Low,
565 scope_id: 0,
566 timer_deadline: 0,
567 });
568 e.enqueue(tid);
569 e.try_schedule_flush()
570 };
571 if let Some(sched) = maybe_sched {
572 let ex2 = Rc::clone(ex);
573 sched.schedule(Box::new(move || Self::flush_instance(&ex2)));
574 }
575 }
576
    /// Run a full flush cycle on this executor instance.
    ///
    /// The global `flush` helper delegates here with the thread-local
    /// executor; isolated executors (used for SSR) call it directly.
    /// Protections: `catch_unwind` around callbacks and task polls,
    /// suspend checks, time-budget yielding, and a callback-drain budget.
    ///
    /// The cycle runs in order:
    ///
    /// 0. move tasks with expired timers onto the ready queues,
    /// 1. run deferred ops,
    /// 2. drain deferred signal callbacks (bounded by the time budget),
    /// 3. poll ready tasks until the queues are empty or the time budget
    ///    is exhausted,
    ///
    /// and finally replays wakes buffered in `PENDING_WAKES`.
    ///
    /// A re-entrant call (while `in_flush` is set) is a no-op; debug
    /// builds print a warning.
    #[allow(clippy::too_many_lines)]
    pub fn flush_instance(ex: &Rc<RefCell<Executor>>) {
        // Guard against re-entrant flushes.
        {
            let mut e = ex.borrow_mut();
            if e.in_flush {
                #[cfg(debug_assertions)]
                {
                    eprintln!(
                        "[auralis-task] WARNING: Executor::flush_instance called \
                         re-entrantly (already inside a flush). This is a no-op. \
                         Check for nested flush() calls in signal callbacks or \
                         ScheduleFlush implementations."
                    );
                }
                return;
            }
            e.in_flush = true;
        }

        // Set this executor as the current one so that code running
        // during the flush can discover it via thread-local.
        // The guard restores the previous value on every exit path after
        // this point (including the early return for callback-budget
        // yielding).
        let prev_executor = CURRENT_EXECUTOR.with(|c| c.borrow_mut().replace(Rc::clone(ex)));
        let _restore = RestoreExecutor(prev_executor);

        // Step 0: drain expired timers.
        {
            let mut e = ex.borrow_mut();
            let now = e.now_ms();
            // When no TimeSource is registered (now == 0), expire all
            // timers — they've already been woken via wake_by_ref and
            // just need to be re-polled.
            if now == 0 {
                for (_, tasks) in std::mem::take(&mut e.timers) {
                    for tid in tasks {
                        // Clear the reverse index since the timer has fired.
                        if let Some(Some(ref mut t)) = e.tasks.get_mut(tid as usize) {
                            t.timer_deadline = 0;
                        }
                        e.enqueue(tid);
                    }
                }
            } else {
                // BTreeMap keys iterate in ascending order, so the expired
                // deadlines form a prefix.
                let expired: Vec<u64> =
                    e.timers.keys().copied().take_while(|&d| d <= now).collect();
                for deadline in expired {
                    if let Some(tasks) = e.timers.remove(&deadline) {
                        for tid in tasks {
                            if let Some(Some(ref mut t)) = e.tasks.get_mut(tid as usize) {
                                t.timer_deadline = 0;
                            }
                            e.enqueue(tid);
                        }
                    }
                }
            }
        }

        // Step 1: deferred ops.
        // The list is taken out first so the ops run without an executor
        // borrow held.
        // NOTE(review): unlike the callbacks in step 2, these are not
        // wrapped in catch_unwind; a panicking op unwinds out of the flush
        // with in_flush still set — confirm this is intended.
        let deferred = std::mem::take(&mut ex.borrow_mut().deferred_ops);
        for op in deferred {
            (op.f)();
        }

        // Step 2: drain deferred signal callbacks with time budget.
        {
            let cb_start = ex.borrow().now_ms();
            loop {
                // Callbacks may push further callbacks; each pass takes
                // the current batch and loops until none are left.
                let callbacks = std::mem::take(&mut ex.borrow_mut().deferred_callbacks);
                if callbacks.is_empty() {
                    break;
                }
                for cb in callbacks {
                    // Isolate each callback so a panic in one subscriber
                    // doesn't block the remaining notifications or wedge
                    // the executor (in_flush stays true on unwind).
                    let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(cb));
                }
                if ex.borrow().now_ms().saturating_sub(cb_start) >= ex.borrow().time_budget_ms {
                    if !ex.borrow().deferred_callbacks.is_empty() {
                        // Budget exhausted with callbacks still pending:
                        // leave the flush and ask for another one.
                        let (sched, ex2) = {
                            let mut e = ex.borrow_mut();
                            e.in_flush = false;
                            e.is_flush_scheduled = false;
                            (e.try_schedule_flush(), Rc::clone(ex))
                        };
                        if let Some(sched) = sched {
                            sched.schedule(Box::new(move || Self::flush_instance(&ex2)));
                        }
                        return;
                    }
                    break;
                }
            }
        }

        // Step 3: main poll loop with time-budget check.
        let poll_start = ex.borrow().now_ms();
        loop {
            let task_id = ex.borrow_mut().dequeue();
            let Some(tid) = task_id else {
                // Queues drained: the flush is complete.
                let mut e = ex.borrow_mut();
                e.is_flush_scheduled = false;
                e.in_flush = false;
                break;
            };

            // Take the task out so the poll doesn't hold an executor borrow.
            // An empty slot (stale or duplicate queue entry) is skipped.
            let maybe_state = ex.borrow_mut().tasks[tid as usize].take();
            if let Some(mut state) = maybe_state {
                let priority = state.priority;
                let scope_id = state.scope_id;

                // Check if the owning scope is suspended.
                let scope = crate::scope::find_scope(scope_id);
                if let Some(ref s) = scope {
                    if s.is_suspended() {
                        // Put the task back without polling or re-queueing
                        // it (presumably it is enqueued again when the
                        // scope resumes — verify in the scope module).
                        // This path also skips the time-budget check.
                        let mut e = ex.borrow_mut();
                        if e.tasks[tid as usize].is_none() {
                            e.tasks[tid as usize] = Some(state);
                        }
                        continue;
                    }
                }

                // Ensure the executor is registered in the slot table.
                // Must not call ensure_global_registered while holding
                // a borrow on ex (it borrows the global EXECUTOR).
                let (slot_id, gen) = {
                    let e = ex.borrow();
                    if e.registered {
                        (e.slot_id, e.generation)
                    } else {
                        drop(e);
                        ensure_global_registered()
                    }
                };
                let waker = Waker::from(Arc::new(TaskWaker {
                    task_id: tid,
                    priority,
                    slot_id,
                    generation: gen,
                }));
                let mut cx = Context::from_waker(&waker);

                // Inject owning scope.
                let prev_scope = crate::scope::get_scope_direct();
                if scope.is_some() {
                    crate::scope::set_scope_direct(scope);
                }

                // Let futures discover their task id (used by timer::sleep).
                // Save and restore so that a nested flush (sync scheduler)
                // doesn't leave the outer task without its id afterward.
                let prev_polling = CURRENT_POLLING_TASK.with(|c| c.replace(Some(tid)));

                // Task isolation — prevents a panicking task from
                // unwinding through flush and leaving in_flush set.
                let result: Result<Poll<()>, Box<dyn std::any::Any + Send>> =
                    std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                        state.future.as_mut().poll(&mut cx)
                    }));

                CURRENT_POLLING_TASK.with(|c| c.set(prev_polling));
                crate::scope::set_scope_direct(prev_scope);

                // Extract timer_deadline before state is dropped, so
                // we can clean up the timer entry (free_slot can't
                // read it because the slot is already None).
                let timer_dl = state.timer_deadline;

                match result {
                    Ok(Poll::Ready(())) => {
                        if timer_dl != 0 {
                            ex.borrow_mut().cleanup_timer(tid, timer_dl);
                        }
                        ex.borrow_mut().free_slot(tid);
                    }
                    Err(payload) => {
                        if timer_dl != 0 {
                            ex.borrow_mut().cleanup_timer(tid, timer_dl);
                        }
                        // Notify the panic hook (if any) before freeing the slot.
                        // The hook is cloned out so it runs without an
                        // executor borrow held.
                        let hook = ex.borrow().panic_hook.clone();
                        if let Some(h) = hook {
                            h(PanicInfo {
                                task_id: tid,
                                scope_id,
                                payload,
                            });
                        }
                        ex.borrow_mut().free_slot(tid);
                    }
                    Ok(Poll::Pending) => {
                        // Park the task back in its slot unless the slot
                        // was filled while the task was out; in that case
                        // `state` is dropped here.
                        let mut e = ex.borrow_mut();
                        if e.tasks[tid as usize].is_none() {
                            e.tasks[tid as usize] = Some(state);
                        }
                    }
                }
            }

            // Time budget check.
            {
                let elapsed = ex.borrow().now_ms().saturating_sub(poll_start);
                if elapsed >= ex.borrow().time_budget_ms {
                    // Out of budget: end this flush and, if work remains,
                    // request a follow-up flush.
                    let (maybe_sched, ex_clone) = {
                        let mut e = ex.borrow_mut();
                        e.is_flush_scheduled = false;
                        e.in_flush = false;
                        let sched = if !e.high_queue.is_empty() || !e.low_queue.is_empty() {
                            e.try_schedule_flush()
                        } else {
                            None
                        };
                        (sched, Rc::clone(ex))
                    };
                    if let Some(sched) = maybe_sched {
                        sched.schedule(Box::new(move || Self::flush_instance(&ex_clone)));
                    }
                    break;
                }
            }
        }

        // Drain any wakes that were buffered while the executor RefCell
        // was borrowed (PENDING_WAKES fallback in TaskWaker::wake).
        drain_pending_wakes();
    }
814}
815
816// ---------------------------------------------------------------------------
817// Current-executor storage — injectable, defaults to thread-local
818// ---------------------------------------------------------------------------
819
/// Shared, single-threaded handle to an [`Executor`].
pub(crate) type ExecutorRef = Rc<RefCell<Executor>>;
821
822/// RAII guard that restores the previous executor when dropped.
823struct RestoreExecutor(Option<ExecutorRef>);
824
825impl Drop for RestoreExecutor {
826 fn drop(&mut self) {
827 CURRENT_EXECUTOR.with(|c| {
828 *c.borrow_mut() = self.0.take();
829 });
830 }
831}
832
thread_local! {
    /// Executor that spawns and signal callbacks on this thread are
    /// currently routed to; `None` means "use the global executor".
    static CURRENT_EXECUTOR: RefCell<Option<ExecutorRef>> = const { RefCell::new(None) };
}
836
837/// Run `f` with `ex` set as the current executor.
838///
839/// Signal callbacks and `spawn_global` calls inside `f` will be routed
840/// to `ex` instead of the global thread-local executor. Restores the
841/// previous executor afterward.
842///
843/// # Signal routing constraints
844///
845/// Auralis uses a **single global schedule hook** (installed once by the
846/// first call to [`init_flush_scheduler`]) that decides where signal
847/// notifications land by checking the current executor **at the time the
848/// notification fires**, not at the time `Signal::set` is called.
849///
850/// This design implies two hard requirements for multi-instance users:
851///
852/// 1. **`init_flush_scheduler` must be called at least once** — without
853/// it, `Signal::set` falls back to synchronous callback execution,
854/// which breaks the deferred-notification model and can cause
855/// re-entrant borrow panics.
856/// 2. **The instance executor must still be "current" when the flush
857/// runs** — if `with_executor` has already exited, deferred callbacks
858/// from signals set inside `f` will be routed to the global executor
859/// (or synchronously if no global hook is installed).
860///
861/// For the typical single-threaded case (Wasm, game loop, CLI), both
862/// requirements are satisfied trivially: call `init_flush_scheduler`
863/// once at startup and never use `with_executor`. For SSR / multi-tenant
864/// servers, ensure that `with_executor` wraps the entire request
865/// lifecycle — from signal creation through the final flush.
866///
867/// # Example
868///
869/// ```rust,ignore
870/// use auralis_task::Executor;
871///
872/// let ex = Executor::new_instance();
873/// Executor::install_flush_scheduler(&ex, my_scheduler);
874/// auralis_task::with_executor(&ex, || {
875/// // Signal notifications and task spawns here go to `ex`.
876/// });
877/// ```
878pub fn with_executor<R>(ex: &ExecutorRef, f: impl FnOnce() -> R) -> R {
879 CURRENT_EXECUTOR.with(|exec| {
880 let prev = exec.borrow_mut().replace(Rc::clone(ex));
881 let result = f();
882 *exec.borrow_mut() = prev;
883 result
884 })
885}
886
887/// Return the current executor, if any.
888///
889/// If no executor has been set via [`with_executor`], returns `None` —
890/// callers should fall back to the global thread-local executor.
891fn current_executor() -> Option<ExecutorRef> {
892 CURRENT_EXECUTOR.with(|exec| exec.borrow().clone())
893}
894
895/// Return the currently active executor instance.
896///
897/// If [`with_executor`] was used to set an instance executor, returns
898/// that; otherwise returns the global thread-local executor.
899pub(crate) fn current_executor_instance() -> ExecutorRef {
900 current_executor().unwrap_or_else(|| EXECUTOR.with(Rc::clone))
901}
902
903/// Return the current time in milliseconds from the active executor's
904/// [`TimeSource`], or 0 if none is installed.
905pub(crate) fn current_time_ms() -> u64 {
906 current_executor_instance().borrow().now_ms()
907}
908
909// ---------------------------------------------------------------------------
910// Helpers — use thread_local EXECUTOR
911// ---------------------------------------------------------------------------
912
913/// Drain wakes that were buffered into [`PENDING_WAKES`] because the
914/// executor's `RefCell` was borrowed at the time [`TaskWaker::wake`]
915/// fired. Called at the end of every [`Executor::flush_instance`].
916fn drain_pending_wakes() {
917 PENDING_WAKES.with(|pw| {
918 let wakes = std::mem::take(&mut *pw.borrow_mut());
919 for (tid, slot_id, gen) in wakes {
920 let Some(exec) = lookup_executor(slot_id, gen) else {
921 continue;
922 };
923 // Use enqueue() for the stale-task-id safety check.
924 exec.borrow_mut().enqueue(tid);
925 let maybe_sched = exec.borrow_mut().try_schedule_flush();
926 if let Some(sched) = maybe_sched {
927 let sid = slot_id;
928 let g = gen;
929 sched.schedule(Box::new(move || {
930 if let Some(ex) = lookup_executor(sid, g) {
931 Executor::flush_instance(&ex);
932 }
933 }));
934 }
935 }
936 });
937}
938
939// ---------------------------------------------------------------------------
940// Flush
941// ---------------------------------------------------------------------------
942
943fn flush() {
944 EXECUTOR.with(Executor::flush_instance);
945}
946
947// ---------------------------------------------------------------------------
948// Public API
949// ---------------------------------------------------------------------------
950
951/// Set the platform flush scheduler and install the signal deferred-
952/// callback hook (idempotent — subsequent calls are no-ops for the hook).
953pub fn init_flush_scheduler(sched: Rc<dyn ScheduleFlush>) {
954 EXECUTOR.with(|exec| exec.borrow_mut().flush_scheduler = Some(sched));
955 install_signal_hook_once();
956}
957
958/// Install the hook that bridges `auralis_signal::Signal::set` to the
959/// executor's deferred-callback queue.
960///
961/// Idempotent — safe to call multiple times.
962fn install_signal_hook_once() {
963 use std::sync::OnceLock;
964 static INSTALLED: OnceLock<()> = OnceLock::new();
965 INSTALLED.get_or_init(|| {
966 auralis_signal::install_schedule_hook(Box::new(|cb: Box<dyn FnOnce()>| {
967 // Prefer the current executor (set via `with_executor`) for
968 // SSR multi-request isolation; fall back to the global one.
969 if let Some(ex) = current_executor() {
970 let maybe_sched = {
971 let mut e = ex.borrow_mut();
972 e.deferred_callbacks.push(cb);
973 if e.in_flush {
974 None
975 } else {
976 e.try_schedule_flush()
977 }
978 };
979 if let Some(sched) = maybe_sched {
980 let ex2 = Rc::clone(&ex);
981 sched.schedule(Box::new(move || Executor::flush_instance(&ex2)));
982 }
983 } else {
984 EXECUTOR.with(|exec| {
985 let maybe_sched = {
986 let mut ex = exec.borrow_mut();
987 ex.deferred_callbacks.push(cb);
988 if ex.in_flush {
989 None
990 } else {
991 ex.try_schedule_flush()
992 }
993 };
994 if let Some(sched) = maybe_sched {
995 sched.schedule(Box::new(flush));
996 }
997 });
998 }
999 }));
1000 });
1001}
1002
1003/// Set the platform time source used for time-budget accounting.
1004///
1005/// If no [`TimeSource`] is registered the executor runs every flush to
1006/// completion without yielding, which is acceptable for short-running
1007/// workloads but may cause frame drops in the browser.
1008pub fn init_time_source(ts: Rc<dyn TimeSource>) {
1009 EXECUTOR.with(|exec| exec.borrow_mut().time_source = Some(ts));
1010}
1011
1012/// Set the per-flush time budget on the global executor.
1013///
1014/// See [`Executor::set_time_budget`] for details.
1015pub fn set_global_time_budget(budget_ms: u64) {
1016 EXECUTOR.with(|exec| exec.borrow_mut().time_budget_ms = budget_ms);
1017}
1018
1019/// Register a global panic hook called when any globally-spawned
1020/// task panics.
1021///
1022/// See [`Executor::set_panic_hook`] for details.
1023pub fn set_panic_hook(hook: Rc<dyn Fn(PanicInfo)>) {
1024 EXECUTOR.with(|exec| exec.borrow_mut().panic_hook = Some(hook));
1025}
1026
1027/// Remove the global panic hook, restoring the default silent
1028/// behaviour.
1029pub fn remove_panic_hook() {
1030 EXECUTOR.with(|exec| exec.borrow_mut().panic_hook = None);
1031}
1032
1033/// Spawn a future on the global executor at low priority.
1034///
1035/// **Important:** [`init_flush_scheduler`] must be called before spawning
1036/// any tasks. Without a flush scheduler, spawned tasks will sit in the
1037/// queue indefinitely because the executor has no way to schedule a flush
1038/// cycle.
1039pub fn spawn_global(future: impl Future<Output = ()> + 'static) {
1040 spawn_global_with_priority(Priority::Low, future);
1041}
1042
1043/// Spawn a future on the global executor at the given priority.
1044pub fn spawn_global_with_priority(priority: Priority, future: impl Future<Output = ()> + 'static) {
1045 spawn_inner_on(&EXECUTOR.with(Rc::clone), Box::pin(future), priority, 0);
1046}
1047
1048/// Spawn a future on a specific executor and scope.
1049pub(crate) fn spawn_scoped_on(
1050 ex: &Rc<RefCell<Executor>>,
1051 priority: Priority,
1052 scope_id: u64,
1053 future: impl Future<Output = ()> + 'static,
1054) -> TaskId {
1055 spawn_inner_on(ex, Box::pin(future), priority, scope_id)
1056}
1057
1058fn spawn_inner_on(
1059 ex: &Rc<RefCell<Executor>>,
1060 future: Pin<Box<dyn Future<Output = ()> + 'static>>,
1061 priority: Priority,
1062 scope_id: u64,
1063) -> TaskId {
1064 let (task_id, maybe_sched) = {
1065 let mut e = ex.borrow_mut();
1066 let task_id = e.allocate_id();
1067 e.tasks[task_id as usize] = Some(TaskState {
1068 future,
1069 priority,
1070 scope_id,
1071 timer_deadline: 0,
1072 });
1073 e.enqueue(task_id);
1074 let sched = e.try_schedule_flush();
1075 (task_id, sched)
1076 };
1077 if let Some(sched) = maybe_sched {
1078 let ex2 = Rc::clone(ex);
1079 sched.schedule(Box::new(move || Executor::flush_instance(&ex2)));
1080 }
1081 task_id
1082}
1083
1084/// Enqueue all tasks belonging to `scope_id` on a given executor.
1085///
1086/// Used by [`TaskScope::resume`] to restart tasks after a suspend.
1087pub(crate) fn enqueue_scope_tasks_on(ex: &ExecutorRef, task_ids: &[TaskId]) {
1088 if task_ids.is_empty() {
1089 return;
1090 }
1091 let maybe_sched = {
1092 let mut e = ex.borrow_mut();
1093 for tid in task_ids {
1094 e.enqueue(*tid);
1095 }
1096 if e.in_flush {
1097 None
1098 } else {
1099 e.try_schedule_flush()
1100 }
1101 };
1102 if let Some(sched) = maybe_sched {
1103 let ex2 = Rc::clone(ex);
1104 sched.schedule(Box::new(move || Executor::flush_instance(&ex2)));
1105 }
1106}
1107/// Cancel all tasks belonging to `scope_id` on a specific executor.
1108pub(crate) fn cancel_scope_tasks_on(
1109 ex: &Rc<RefCell<Executor>>,
1110 task_ids: &[TaskId],
1111) -> Vec<Pin<Box<dyn Future<Output = ()>>>> {
1112 if task_ids.is_empty() {
1113 return Vec::new();
1114 }
1115
1116 let mut e = ex.borrow_mut();
1117 let mut dropped = Vec::with_capacity(task_ids.len());
1118
1119 // Collect timer deadlines before mutating.
1120 let mut timer_deadlines: Vec<(u64, TaskId)> = Vec::new();
1121 for &tid in task_ids {
1122 let idx = tid as usize;
1123 if idx < e.tasks.len() {
1124 if let Some(ref t) = e.tasks[idx] {
1125 if t.timer_deadline != 0 {
1126 timer_deadlines.push((t.timer_deadline, tid));
1127 }
1128 }
1129 }
1130 }
1131 for (dl, tid) in &timer_deadlines {
1132 e.cleanup_timer(*tid, *dl);
1133 }
1134
1135 // Cancel each task by id (direct lookup, no full-table scan).
1136 // Cancel each task by id (direct lookup, no full-table scan).
1137 // Only push to free_slots for slots we actually took.
1138 for &tid in task_ids {
1139 let idx = tid as usize;
1140 if idx < e.tasks.len() {
1141 if let Some(state) = e.tasks[idx].take() {
1142 dropped.push(state.future);
1143 e.free_slots.push(tid);
1144 }
1145 }
1146 }
1147 e.free_slots.sort_unstable();
1148 e.free_slots.dedup();
1149
1150 // Filter queues to remove cancelled tasks.
1151 let high: Vec<TaskId> = e
1152 .high_queue
1153 .iter()
1154 .copied()
1155 .filter(|&id| {
1156 let idx = id as usize;
1157 idx < e.tasks.len() && e.tasks[idx].is_some()
1158 })
1159 .collect();
1160 e.high_queue.clear();
1161 e.high_queue.extend(high);
1162
1163 let low: Vec<TaskId> = e
1164 .low_queue
1165 .iter()
1166 .copied()
1167 .filter(|&id| {
1168 let idx = id as usize;
1169 idx < e.tasks.len() && e.tasks[idx].is_some()
1170 })
1171 .collect();
1172 e.low_queue.clear();
1173 e.low_queue.extend(low);
1174
1175 dropped
1176}
1177
1178/// Cancel a single task by its id, dropping its future and cleaning up
1179/// its timer if any. No-op if the task has already completed.
1180pub(crate) fn cancel_task(ex: &Rc<RefCell<Executor>>, task_id: TaskId) {
1181 let mut e = ex.borrow_mut();
1182 let idx = task_id as usize;
1183 if idx >= e.tasks.len() {
1184 return;
1185 }
1186 let deadline = e.tasks[idx].as_ref().map_or(0, |t| t.timer_deadline);
1187 if deadline != 0 {
1188 e.cleanup_timer(task_id, deadline);
1189 }
1190 let slot = e.tasks[idx].take();
1191 if slot.is_some() {
1192 e.free_slots.push(task_id);
1193 e.high_queue.retain(|&id| id != task_id);
1194 e.low_queue.retain(|&id| id != task_id);
1195 }
1196}
1197
1198/// Check whether a task slot is empty (task completed or was cancelled).
1199pub(crate) fn is_task_finished(ex: &Rc<RefCell<Executor>>, task_id: TaskId) -> bool {
1200 let e = ex.borrow();
1201 let idx = task_id as usize;
1202 idx >= e.tasks.len() || e.tasks[idx].is_none()
1203}
1204
1205// ---------------------------------------------------------------------------
1206// yield_now
1207// ---------------------------------------------------------------------------
1208
/// Create a [`Future`] that hands control back to the executor exactly
/// once before completing.
#[must_use = "yield_now() does nothing unless awaited"]
pub fn yield_now() -> YieldNow {
    YieldNow { yielded: false }
}

/// Future returned by [`yield_now`].
///
/// The first poll wakes the task's own waker and returns
/// [`Poll::Pending`]; every later poll returns [`Poll::Ready`].
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct YieldNow {
    /// Set once the single yield has already happened.
    yielded: bool,
}

impl Future for YieldNow {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Mark the yield as done and learn whether it had already happened.
        let already_yielded = std::mem::replace(&mut self.yielded, true);
        if already_yielded {
            return Poll::Ready(());
        }
        // Wake ourselves so the task is polled again after returning Pending.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}
1235
1236// ---------------------------------------------------------------------------
1237// schedule_callback — hook for auralis-signal's deferred callback model
1238// ---------------------------------------------------------------------------
1239
1240/// Schedule a closure to run at the start of the next executor flush.
1241///
1242/// Used internally by `auralis_signal` to defer subscriber callback
1243/// execution. The closure is drained before the main poll loop.
1244///
1245/// Routes to the current executor (via [`with_executor`]) when one is
1246/// active; falls back to the global thread-local executor.
1247pub fn schedule_callback(f: Box<dyn FnOnce()>) {
1248 let exec = current_executor_instance();
1249 let maybe_sched = {
1250 let mut ex = exec.borrow_mut();
1251 ex.deferred_callbacks.push(f);
1252 if ex.in_flush {
1253 None
1254 } else {
1255 ex.try_schedule_flush()
1256 }
1257 };
1258 if let Some(sched) = maybe_sched {
1259 let ex2 = Rc::clone(&exec);
1260 sched.schedule(Box::new(move || Executor::flush_instance(&ex2)));
1261 }
1262}
1263
1264// ---------------------------------------------------------------------------
1265// set_deferred
1266// ---------------------------------------------------------------------------
1267
1268/// Schedule a [`Signal::set`] call for the **next** executor flush.
1269///
1270/// Safe to call from inside [`Drop`] — the actual `signal.set(value)` is
1271/// deferred to a subsequent flush, avoiding re-entrant borrow panics.
1272///
1273/// Routes to the current executor (via [`with_executor`]) when one is
1274/// active; falls back to the global thread-local executor.
1275pub fn set_deferred<T: 'static>(signal: &Signal<T>, value: T) {
1276 let signal = signal.clone();
1277 let exec = current_executor_instance();
1278 let maybe_sched = {
1279 let mut ex = exec.borrow_mut();
1280 ex.deferred_ops.push(DeferredOp {
1281 f: Box::new(move || signal.set(value)),
1282 });
1283 ex.try_schedule_flush()
1284 };
1285 if let Some(sched) = maybe_sched {
1286 let ex2 = Rc::clone(&exec);
1287 sched.schedule(Box::new(move || Executor::flush_instance(&ex2)));
1288 }
1289}
1290
1291// ---------------------------------------------------------------------------
1292// Test / debug helpers
1293// ---------------------------------------------------------------------------
1294
1295/// Completely reset the global executor to a pristine state.
1296///
1297/// Clears all task slots, queues, deferred ops, flush/scheduler flags,
1298/// and injected [`ScheduleFlush`]/[`TimeSource`]. Call at the start
1299/// of every test to prevent cross-test state leakage.
1300///
1301/// # Safety / usage
1302///
1303/// This function is intended **only** for testing. Calling it while
1304/// the executor is processing tasks will silently drop all live
1305/// futures and may cause panics or undefined behavior in running
1306/// application code.
1307pub fn reset_executor_for_test() {
1308 PENDING_WAKES.with(|pw| pw.borrow_mut().clear());
1309 SLOTS.with(|s| s.borrow_mut().clear());
1310 CURRENT_EXECUTOR.with(|c| *c.borrow_mut() = None);
1311 EXECUTOR.with(|exec| {
1312 let mut ex = exec.borrow_mut();
1313 ex.high_queue.clear();
1314 ex.low_queue.clear();
1315 ex.tasks.clear();
1316 ex.free_slots.clear();
1317 ex.next_task_id = 0;
1318 ex.is_flush_scheduled = false;
1319 ex.in_flush = false;
1320 ex.deferred_ops.clear();
1321 ex.deferred_callbacks.clear();
1322 ex.flush_scheduler = None;
1323 ex.time_source = None;
1324 ex.slot_id = 0;
1325 ex.generation = 0;
1326 ex.registered = false;
1327 });
1328 crate::scope::clear_scope_registry();
1329}
1330
/// Count the occupied task slots on the global executor.
#[cfg(test)]
pub(crate) fn debug_task_count() -> usize {
    EXECUTOR.with(|global| {
        let guard = global.borrow();
        guard.tasks.iter().flatten().count()
    })
}
1335
/// Snapshot every occupied task slot on the global executor as
/// `(task_id, priority, scope_id)`, in slot order.
///
/// The task id is the slot's index in the task table.
#[cfg(feature = "debug")]
pub(crate) fn debug_task_snapshot() -> Vec<(TaskId, Priority, u64)> {
    EXECUTOR.with(|global| {
        let guard = global.borrow();
        guard
            .tasks
            .iter()
            .enumerate()
            .filter_map(|(idx, slot)| {
                slot.as_ref()
                    .map(|task| (idx as u64, task.priority, task.scope_id))
            })
            .collect()
    })
}
1350
/// List the task ids currently sitting in the global executor's ready
/// queues (high and low combined), sorted ascending with duplicates
/// removed.
#[cfg(feature = "debug")]
pub(crate) fn debug_queued_task_ids() -> Vec<TaskId> {
    EXECUTOR.with(|global| {
        let guard = global.borrow();
        let mut queued: Vec<TaskId> = Vec::new();
        queued.extend(guard.high_queue.iter().copied());
        queued.extend(guard.low_queue.iter().copied());
        queued.sort_unstable();
        queued.dedup();
        queued
    })
}
1367
/// Store and enqueue a task on the global executor without requesting a
/// flush.
///
/// Lets tests batch several spawns and then run them with an explicit
/// flush. The task is created with scope id 0 and no timer deadline.
/// Returns the allocated task id.
#[cfg(test)]
pub(crate) fn spawn_no_auto_flush(
    priority: Priority,
    future: impl Future<Output = ()> + 'static,
) -> TaskId {
    let state = TaskState {
        future: Box::pin(future),
        priority,
        scope_id: 0,
        timer_deadline: 0,
    };
    EXECUTOR.with(move |global| {
        let mut guard = global.borrow_mut();
        let task_id = guard.allocate_id();
        guard.tasks[task_id as usize] = Some(state);
        guard.enqueue(task_id);
        // Deliberately no `try_schedule_flush()` here.
        task_id
    })
}
1389
/// Drive one flush cycle of the global executor by hand.
///
/// Test-only entry point for tests that need to control exactly when
/// queued work runs; it simply delegates to the module's `flush` helper.
#[cfg(test)]
pub(crate) fn flush_all() {
    flush()
}