Skip to main content

nexus_async_rt/
lib.rs

1//! Single-threaded async runtime.
2//!
3//! Two spawn strategies:
4//! - **`spawn_boxed()`** — Box-allocated. Default. No setup needed.
5//! - **`spawn_slab()`** — Slab-allocated. Pre-allocated, zero-alloc
6//!   hot path. Requires slab configured via [`RuntimeBuilder::slab_unbounded`] or [`RuntimeBuilder::slab_bounded`].
7//!
8//! ```ignore
9//! use nexus_async_rt::*;
10//! use nexus_slab::byte::unbounded::Slab;
11//! use nexus_rt::WorldBuilder;
12//!
13//! let mut world = WorldBuilder::new().build();
14//!
15//! // Simple — Box-allocated tasks, no slab setup
16//! let mut rt = Runtime::new(&mut world);
17//! rt.block_on(async {
18//!     spawn_boxed(async { /* Box-allocated */ });
19//! });
20//!
21//! // Power user — with slab for hot-path tasks
22//! // SAFETY: single-threaded runtime.
23//! let slab = unsafe { Slab::<256>::with_chunk_capacity(64) };
24//! let mut rt = Runtime::builder(&mut world)
25//!     .slab_unbounded(slab)
26//!     .build();
27//! rt.block_on(async {
28//!     spawn_boxed(async { /* Box-allocated, long-lived */ });
29//!     spawn_slab(async { /* slab-allocated, hot path */ });
30//! });
31//! ```
32
33// Single-threaded runtime — futures are intentionally !Send.
34#![allow(clippy::future_not_send)]
35#![cfg(unix)]
36#![warn(missing_docs)]
37
38mod alloc;
39mod backoff;
40mod cancel;
41pub mod channel;
42mod context;
43pub(crate) mod cross_wake;
44mod io;
45pub mod net;
46mod runtime;
47mod shutdown;
48mod task;
49mod timer;
50#[cfg(feature = "tokio-compat")]
51pub mod tokio_compat;
52#[cfg(feature = "tokio-compat")]
53pub use tokio_compat::{TokioJoinError, TokioJoinHandle, spawn_on_tokio};
54mod waker;
55mod world_ctx;
56
57// Re-export slab type for convenience — users create the slab and hand it to the builder.
58pub use alloc::SlabClaim;
59pub use backoff::{Backoff, BackoffBuilder, Exhausted};
60pub use cancel::{CancellationToken, DropGuard};
61pub use context::{
62    after, after_delay, event_time, interval, interval_at, sleep, sleep_until, timeout, timeout_at,
63    yield_now,
64};
65pub use io::IoHandle;
66pub use net::{
67    AsyncRead, AsyncWrite, OwnedReadHalf, OwnedWriteHalf, ReadHalf, TcpListener, TcpSocket,
68    TcpStream, UdpSocket, WriteHalf,
69};
70pub use nexus_slab::byte::unbounded::Slab as ByteSlab;
71pub use runtime::{
72    QuiesceTimeout, Runtime, RuntimeBuilder, claim_slab, spawn_boxed, spawn_slab, try_claim_slab,
73};
// `ShutdownStats` (the snapshot type users match on) and `ShutdownStatsAtomics`
// (the Arc-shared counters that survive Runtime drop) are defined further down in
// this file, not re-exported here. `Runtime::shutdown_stats` returns
// `Arc<ShutdownStatsAtomics>`; call `.snapshot()` on it to get a plain `ShutdownStats`.
78pub use shutdown::{ShutdownHandle, ShutdownSignal};
79pub use task::{JoinHandle, TASK_HEADER_SIZE};
80pub use timer::{Elapsed, Interval, MissedTickBehavior, Sleep, Timeout, TimerHandle, YieldNow};
81pub use world_ctx::WorldCtx;
82
83use std::future::Future;
84use std::task::{Context, Poll};
85
86use waker::set_poll_context;
87
/// Recommended minimum slab slot size, in bytes.
///
/// The exact requirement depends on the task: the task header
/// ([`TASK_HEADER_SIZE`]) plus `max(size_of::<F>(), size_of::<T>())`, where
/// `F` is the future and `T` its output. Only a task whose future *and*
/// output are both zero-sized fits in a bare header. 128 is a conservative
/// default that covers most small futures.
pub const MIN_SLOT_SIZE: usize = 128;
94
95// =============================================================================
96// Executor
97// =============================================================================
98
/// Single-threaded async executor.
///
/// Manages task lifecycle: spawn, poll, complete, free. Tasks are
/// allocated via Box (default) or slab (via `spawn_slab`). Each
/// task's header contains a `free_fn` that knows how to deallocate
/// its own storage — the executor doesn't know or care which
/// allocator was used.
///
/// # UnsafeCell on `incoming` and `deferred_free`
///
/// These fields are wrapped in `UnsafeCell` to prevent a provenance
/// aliasing violation. During `poll()`, raw pointers to these Vecs are
/// stored in TLS for wakers to push into. Later in the same `poll()`,
/// `complete_task(&mut self)` takes `&mut self` — which under Rust's
/// aliasing rules asserts exclusive access to ALL fields. Without
/// `UnsafeCell`, this invalidates the TLS pointers because two `&mut`
/// paths to the same memory exist. `UnsafeCell` opts these fields out
/// of `&mut`'s exclusivity guarantee, telling the compiler they may be
/// accessed through other paths (the TLS raw pointers).
///
/// This is NOT a performance concern. `UnsafeCell<T>` adds no space (it
/// has the same in-memory representation as `T`), and `get()` compiles
/// to a no-op pointer cast. The only effect is that the compiler won't
/// optimize based on exclusive access to these fields.
pub struct Executor {
    /// Incoming ready tasks. Wakers and spawn push here.
    /// Swapped with `draining` at the start of each poll cycle.
    ///
    /// Wrapped in `UnsafeCell` because raw pointers to this Vec are stored
    /// in TLS during `poll()`. Without `UnsafeCell`, `&mut self` on methods
    /// like `complete_task` would invalidate the TLS pointer's provenance
    /// (exclusive `&mut` covers all non-UnsafeCell fields).
    incoming: std::cell::UnsafeCell<Vec<*mut u8>>,

    /// Tasks being drained this cycle. Iterated linearly.
    /// Does NOT need UnsafeCell — only accessed through `&mut self`
    /// (in `poll()` and `cancel()`); no TLS pointer to it is ever handed out.
    draining: Vec<*mut u8>,

    /// All live task pointers. Slab-indexed for O(1) removal.
    /// Each task's header stores its key here as its `tracker_key`.
    all_tasks: slab::Slab<*mut u8>,

    /// Number of live tasks (spawned and not yet completed).
    live_count: usize,

    /// Maximum tasks to poll per cycle before yielding to IO.
    tasks_per_cycle: usize,

    /// Completed task slots awaiting deferred free.
    ///
    /// Same UnsafeCell rationale as `incoming` — TLS pointer stored during poll.
    deferred_free: std::cell::UnsafeCell<Vec<*mut u8>>,

    /// Atomic counters for abnormal-shutdown paths. Surfaced via
    /// [`Runtime::shutdown_stats`](crate::Runtime::shutdown_stats),
    /// which returns an `Arc` clone so users can read the counters AFTER
    /// Runtime drop. The counters fire DURING `Executor::drop`, so pre-drop
    /// snapshots always read zero. The counted paths do not log: no
    /// `eprintln!`/`tracing` is added for them. The pre-existing
    /// `eprintln!`s on the slab-unwinding abort path remain, as the only
    /// signal available at the moment of process abort.
    shutdown_stats: std::sync::Arc<ShutdownStatsAtomics>,

    /// Cross-wake context, set by Runtime via [`Executor::install_cross_wake_for_drop`]
    /// after construction. `Executor::drop` uses it to drain the
    /// cross-thread queue at shutdown and tally
    /// `cross_queue_undrained`. `None` for bare `Executor` use in
    /// tests (no Runtime, no cross-queue inspection at drop).
    cross_wake_for_drop: Option<std::sync::Arc<crate::cross_wake::CrossWakeContext>>,
}
167
168/// Atomic counters backing [`ShutdownStats`]. Written by `Executor`,
169/// readable via the handle returned by
170/// [`Runtime::shutdown_stats`](crate::Runtime::shutdown_stats).
171///
172/// Atomics are used (not `Cell`) so the user-facing handle can survive
173/// `Runtime::drop` and be read on the same thread post-drop. All
174/// updates use `Relaxed` ordering — the counters are observability,
175/// not synchronization.
176#[derive(Default, Debug)]
177pub struct ShutdownStatsAtomics {
178    aborted_unwinds: std::sync::atomic::AtomicU64,
179    leaked_box_tasks: std::sync::atomic::AtomicU64,
180    unbalanced_normal_shutdowns: std::sync::atomic::AtomicU64,
181    cross_queue_undrained: std::sync::atomic::AtomicU64,
182}
183
184impl ShutdownStatsAtomics {
185    /// Snapshot the current counter values into a plain
186    /// [`ShutdownStats`]. Loads are `Relaxed` — observability, not
187    /// synchronization.
188    pub fn snapshot(&self) -> ShutdownStats {
189        use std::sync::atomic::Ordering;
190        ShutdownStats {
191            aborted_unwinds: self.aborted_unwinds.load(Ordering::Relaxed),
192            leaked_box_tasks: self.leaked_box_tasks.load(Ordering::Relaxed),
193            unbalanced_normal_shutdowns: self.unbalanced_normal_shutdowns.load(Ordering::Relaxed),
194            cross_queue_undrained: self.cross_queue_undrained.load(Ordering::Relaxed),
195        }
196    }
197}
198
199/// Counters for abnormal-shutdown paths. Snapshot returned by
200/// [`Runtime::shutdown_stats`](crate::Runtime::shutdown_stats).
201///
202/// All counters are `0` for a clean shutdown. Any non-zero counter is a
203/// signal to investigate — the runtime hit a defensive code path that
204/// should be unreachable in normal operation. Users own their
205/// observability stack; the runtime emits no logs of its own (per
206/// PR 2's design — see `ShutdownStats` doc-comment for the user
207/// pattern).
208///
209/// # Example
210///
211/// ```ignore
212/// let handle = runtime.shutdown_stats();   // Arc<ShutdownStatsAtomics>
213/// drop(runtime);                            // counters fire during drop
214/// let stats = handle.snapshot();            // plain ShutdownStats for matching
215/// if stats.aborted_unwinds != 0
216///     || stats.leaked_box_tasks != 0
217///     || stats.unbalanced_normal_shutdowns != 0
218///     || stats.cross_queue_undrained != 0
219/// {
220///     // user's own observability — log to wherever they want
221///     my_logger::warn!("nexus runtime shutdown: {stats:?}");
222/// }
223/// ```
224#[derive(Default, Debug, Clone, Copy)]
225pub struct ShutdownStats {
226    /// `Executor::drop` hit the slab-unwinding 100ms-wait-then-abort
227    /// path. Indicates a producer thread held a slab task ref past
228    /// Runtime drop during a panic. **The process aborted before this
229    /// counter could be read** — non-zero means a previous run aborted
230    /// (the counter is preserved across the abort by being stored in
231    /// the executor's state, but reading it requires the runtime to
232    /// have survived; in practice this counter is set just before
233    /// abort and serves as a guarantee the abort path was hit if the
234    /// runtime somehow survived).
235    pub aborted_unwinds: u64,
236    /// Box-allocated tasks the executor couldn't free during shutdown
237    /// unwinding (outstanding cross-thread refs, leaked to avoid
238    /// double-panic). Memory leak, not UAF. Box memory is reclaimed
239    /// at process exit.
240    pub leaked_box_tasks: u64,
241    /// Normal shutdown (no panic in flight) found an `all_tasks` entry
242    /// with `rc > 0`. Debug builds panic. Release builds eprintln +
243    /// leak. Indicates a producer didn't release refs before Runtime
244    /// drop — call [`Runtime::shutdown_quiesce`](crate::Runtime::shutdown_quiesce)
245    /// before drop to surface this as an `Err` instead.
246    pub unbalanced_normal_shutdowns: u64,
247    /// Cross-thread queue entries that landed after Runtime drop and
248    /// were never drained (the leak path inherited from PR 1a's
249    /// dispose_terminal off-thread branch). Pure memory leak.
250    pub cross_queue_undrained: u64,
251}
252
253/// Default poll limit.
254const DEFAULT_TASKS_PER_CYCLE: usize = 64;
255
256impl Executor {
257    /// Create an executor.
258    pub fn new(initial_capacity: usize) -> Self {
259        Self {
260            incoming: std::cell::UnsafeCell::new(Vec::with_capacity(initial_capacity)),
261            draining: Vec::with_capacity(initial_capacity),
262            all_tasks: slab::Slab::with_capacity(initial_capacity),
263            live_count: 0,
264            tasks_per_cycle: DEFAULT_TASKS_PER_CYCLE,
265            shutdown_stats: std::sync::Arc::new(ShutdownStatsAtomics::default()),
266            cross_wake_for_drop: None,
267            deferred_free: std::cell::UnsafeCell::new(Vec::new()),
268        }
269    }
270
271    /// Reserve a tracker key for external allocation (slab spawn).
272    pub(crate) fn next_tracker_key(&self) -> u32 {
273        let key = self.all_tasks.vacant_key();
274        debug_assert!(
275            u32::try_from(key).is_ok(),
276            "more than 4 billion concurrent tasks — tracker_key overflow"
277        );
278        key as u32
279    }
280
281    /// Spawn an async task via Box allocation. Returns a [`JoinHandle`]
282    /// that can be awaited for the task's output.
283    pub fn spawn_boxed<F>(&mut self, future: F) -> task::JoinHandle<F::Output>
284    where
285        F: Future + 'static,
286        F::Output: 'static,
287    {
288        let tracker_key = self.all_tasks.vacant_key();
289        debug_assert!(
290            u32::try_from(tracker_key).is_ok(),
291            "more than 4 billion concurrent tasks — tracker_key overflow"
292        );
293        // Read the runtime's cross-wake context from TLS — installed at
294        // RuntimeBuilder::build, lifetime of Runtime. Null when no
295        // Runtime is alive (e.g., direct Executor use in tests); the
296        // task header's cross_wake_ctx becomes null and dispose_terminal
297        // routes those tasks via its null-ctx fallback.
298        let cross_wake_ctx = crate::cross_wake::current_runtime_ctx();
299        let ptr = task::box_spawn_joinable(future, tracker_key as u32, cross_wake_ctx);
300
301        self.enqueue(ptr);
302        task::JoinHandle::new(ptr)
303    }
304
305    /// Spawn a task with a pre-allocated pointer (from slab).
306    ///
307    /// The task at `ptr` must have been constructed with joinable or
308    /// fire-and-forget constructors and a valid `free_fn`.
309    pub(crate) fn spawn_raw(&mut self, ptr: *mut u8) {
310        self.enqueue(ptr);
311    }
312
313    /// Common enqueue logic for spawn and spawn_raw.
314    fn enqueue(&mut self, ptr: *mut u8) {
315        self.all_tasks.insert(ptr);
316        unsafe { task::set_queued(ptr, true) };
317        // SAFETY: single-threaded, no concurrent access during enqueue.
318        unsafe { &mut *self.incoming.get() }.push(ptr);
319        self.live_count += 1;
320    }
321
322    /// Drain the cross-thread wake inbox into the local ready queue.
323    ///
324    /// Called at the start of each poll cycle. Tasks pushed from other
325    /// threads via `CrossWakeQueue::push` are moved into `incoming`.
326    /// Completed tasks are routed to `deferred_free` instead — they
327    /// were pushed for cleanup (not re-polling) by `cross_task_drop`.
328    /// Drains at most `limit` tasks (remaining are picked up next cycle).
329    pub(crate) fn drain_cross_thread(
330        &mut self,
331        inbox: &crate::cross_wake::CrossWakeQueue,
332        limit: usize,
333    ) -> usize {
334        let mut drained = 0;
335        while drained < limit {
336            match inbox.pop() {
337                Some(task_ptr) => {
338                    // Clear QUEUED flag now that we've popped it.
339                    unsafe { task::clear_queued(task_ptr) };
340
341                    // Check if TERMINAL was reached (e.g., cross-thread waker
342                    // produced TERMINAL via ref_dec while the task was queued).
343                    // Only TERMINAL tasks go to deferred_free. Completed tasks
344                    // with outstanding refs must NOT be freed prematurely.
345                    if unsafe { task::is_terminal(task_ptr) } {
346                        unsafe { &mut *self.deferred_free.get() }.push(task_ptr);
347                    } else {
348                        unsafe { &mut *self.incoming.get() }.push(task_ptr);
349                    }
350                    drained += 1;
351                }
352                None => break,
353            }
354        }
355        drained
356    }
357
358    /// Poll all ready tasks once.
359    pub fn poll(&mut self) -> usize {
360        let mut completed = 0;
361
362        // Drain deferred frees from last cycle.
363        // SAFETY: single-threaded, TLS not yet set for this cycle.
364        for ptr in unsafe { &mut *self.deferred_free.get() }.drain(..) {
365            let key = unsafe { task::tracker_key(ptr) } as usize;
366            // SAFETY: free_fn was set at spawn time.
367            unsafe { task::free_task(ptr) };
368            if self.all_tasks.contains(key) {
369                self.all_tasks.remove(key);
370            }
371        }
372
373        // SAFETY: single-threaded, swapping before TLS is set.
374        std::mem::swap(unsafe { &mut *self.incoming.get() }, &mut self.draining);
375
376        // Derive TLS pointers from UnsafeCell — NOT from &mut self field borrows.
377        // This is critical: complete_task(&mut self) later in this function must
378        // not invalidate the TLS pointers. UnsafeCell fields are excluded from
379        // &mut self's exclusivity guarantee.
380        let _guard = set_poll_context(self.incoming.get(), self.deferred_free.get());
381
382        let limit = self.tasks_per_cycle.min(self.draining.len());
383        let draining_ptr: *const Vec<*mut u8> = &raw const self.draining;
384        let drain_slice = unsafe { &(&*draining_ptr)[..limit] };
385
386        for &ptr in drain_slice {
387            if unsafe { task::is_completed(ptr) } {
388                continue;
389            }
390
391            unsafe { task::set_queued(ptr, false) };
392
393            // SAFETY: ptr is a live task, ref_count >= 1 (executor holds a ref).
394            // task_waker increments ref_count; drop after poll decrements it.
395            let waker = unsafe { crate::waker::task_waker(ptr) };
396            let mut cx = Context::from_waker(&waker);
397
398            let poll_result = unsafe { task::poll_task(ptr, &mut cx) };
399
400            drop(waker);
401
402            match poll_result {
403                Poll::Pending => {}
404                Poll::Ready(()) => {
405                    self.complete_task(ptr);
406                    completed += 1;
407                }
408            }
409        }
410
411        if limit < self.draining.len() {
412            // SAFETY: single-threaded, TLS guard is about to drop.
413            unsafe { &mut *self.incoming.get() }.extend_from_slice(&self.draining[limit..]);
414        }
415        self.draining.clear();
416
417        completed
418    }
419
420    /// Number of live tasks.
421    pub fn task_count(&self) -> usize {
422        self.live_count
423    }
424
425    /// Number of tasks tracked in the executor's `all_tasks` slab.
426    /// Includes COMPLETED-but-still-referenced tasks (a `JoinHandle`
427    /// or cross-thread waker holds a ref) — distinguishing it from
428    /// `task_count()` which decrements `live_count` unconditionally on
429    /// completion.
430    ///
431    /// `shutdown_quiesce` uses this for its quiesce check: a task that
432    /// completed but has outstanding refs WILL fire one of the
433    /// abnormal-shutdown branches in `Executor::drop` (debug-panic
434    /// "outstanding references" or release-eprintln + counter
435    /// increment). Quiesce-as-`Ok` requires `all_tasks` to be empty,
436    /// not just `live_count == 0`. (PR2-John-review item 2.)
437    pub(crate) fn outstanding_tasks(&self) -> usize {
438        self.all_tasks.len()
439    }
440
441    /// Number of completed task slots awaiting deferred free.
442    #[cfg(test)]
443    pub fn deferred_free_count(&self) -> usize {
444        // SAFETY: single-threaded, read-only snapshot.
445        unsafe { &*self.deferred_free.get() }.len()
446    }
447
448    /// Returns an Arc handle to the shutdown counters. Callers can
449    /// hold it past Runtime drop to read final values via
450    /// [`ShutdownStatsAtomics::snapshot`].
451    pub(crate) fn shutdown_stats(&self) -> std::sync::Arc<ShutdownStatsAtomics> {
452        std::sync::Arc::clone(&self.shutdown_stats)
453    }
454
455    /// Counter increments for the abnormal-shutdown branches.
456    /// Per CALLOUT 5 of PR 2's plan: counter-only — no eprintln,
457    /// no tracing, no log calls. Users own their observability.
458    fn record_aborted_unwind(&self) {
459        self.shutdown_stats
460            .aborted_unwinds
461            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
462    }
463
464    fn record_leaked_box(&self) {
465        self.shutdown_stats
466            .leaked_box_tasks
467            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
468    }
469
470    fn record_unbalanced_normal(&self) {
471        self.shutdown_stats
472            .unbalanced_normal_shutdowns
473            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
474    }
475
476    /// Add `count` to the `cross_queue_undrained` counter. Called from
477    /// `Executor::drop` after the all_tasks loop, when the cross-thread
478    /// queue's tail-end is drained for the diagnostic count.
479    fn record_cross_queue_undrained(&self, count: u64) {
480        self.shutdown_stats
481            .cross_queue_undrained
482            .fetch_add(count, std::sync::atomic::Ordering::Relaxed);
483    }
484
485    /// Wire the runtime's cross-wake context into the executor so
486    /// `Executor::drop` can drain + count the cross-thread queue at
487    /// shutdown end. Called by `RuntimeBuilder::build` after both
488    /// `Executor::new` and `Arc::new(CrossWakeContext { ... })`.
489    pub(crate) fn install_cross_wake_for_drop(
490        &mut self,
491        cross_wake: std::sync::Arc<crate::cross_wake::CrossWakeContext>,
492    ) {
493        self.cross_wake_for_drop = Some(cross_wake);
494    }
495
496    /// Returns `true` if any tasks are queued for polling.
497    pub fn has_ready(&self) -> bool {
498        // SAFETY: single-threaded, read-only snapshot.
499        !unsafe { &*self.incoming.get() }.is_empty()
500    }
501
502    /// Set the maximum tasks to poll per cycle.
503    pub fn set_tasks_per_cycle(&mut self, limit: usize) {
504        self.tasks_per_cycle = limit;
505    }
506
507    /// Complete a task: handle joinable vs fire-and-forget paths.
508    ///
509    /// Uses `complete_and_unref` to atomically set COMPLETED and decrement
510    /// the executor's reference in a single atomic operation — eliminating
511    /// the race window that caused SIGABRT with cross-thread wakers.
512    ///
513    /// Three branches based on task state:
514    /// - **Aborted:** drop F (still live — poll_join short-circuited), notify joiner
515    /// - **Joinable (HAS_JOIN):** T is live in the union, don't touch it — JoinHandle owns it
516    /// - **Fire-and-forget / detached:** drop the value (F or T) and free
517    ///
518    /// # Safety invariants
519    ///
520    /// `ptr` must point to a task that just returned `Poll::Ready(())` from poll_task.
521    fn complete_task(&mut self, ptr: *mut u8) {
522        let aborted = unsafe { task::is_aborted(ptr) };
523
524        if aborted {
525            // Aborted: poll_join saw ABORTED and returned Ready without polling F.
526            // F is still live in the union. drop_fn still targets F.
527            unsafe { task::drop_task_future(ptr) };
528            self.live_count -= 1;
529
530            if unsafe { task::has_join(ptr) } {
531                let waker = unsafe { task::take_join_waker(ptr) };
532                if let Some(w) = waker {
533                    w.wake();
534                }
535            }
536
537            match unsafe { task::complete_and_unref(ptr) } {
538                task::FreeAction::Retain => {}
539                task::FreeAction::FreeBox | task::FreeAction::FreeSlab => {
540                    let key = unsafe { task::tracker_key(ptr) } as usize;
541                    unsafe { task::free_task(ptr) };
542                    self.all_tasks.remove(key);
543                }
544            }
545        } else if unsafe { task::has_join(ptr) } {
546            // Joinable: poll_join dropped F and wrote T. drop_fn = drop_output::<T>.
547            // Don't drop T — JoinHandle will read it or drop it on handle drop.
548            self.live_count -= 1;
549
550            // Wake the joiner so it can poll the JoinHandle and read T.
551            let waker = unsafe { task::take_join_waker(ptr) };
552            if let Some(w) = waker {
553                w.wake();
554            }
555
556            match unsafe { task::complete_and_unref(ptr) } {
557                task::FreeAction::Retain => {}
558                task::FreeAction::FreeBox | task::FreeAction::FreeSlab => {
559                    // Terminal — JoinHandle already dropped (detached). Drop output.
560                    unsafe { task::drop_task_future(ptr) };
561                    let key = unsafe { task::tracker_key(ptr) } as usize;
562                    unsafe { task::free_task(ptr) };
563                    self.all_tasks.remove(key);
564                }
565            }
566        } else {
567            // Fire-and-forget or detached (HAS_JOIN cleared by JoinHandle::Drop).
568            unsafe { task::drop_task_future(ptr) };
569            self.live_count -= 1;
570
571            match unsafe { task::complete_and_unref(ptr) } {
572                task::FreeAction::Retain => {}
573                task::FreeAction::FreeBox | task::FreeAction::FreeSlab => {
574                    let key = unsafe { task::tracker_key(ptr) } as usize;
575                    unsafe { task::free_task(ptr) };
576                    self.all_tasks.remove(key);
577                }
578            }
579        }
580    }
581
582    /// Returns raw pointers for TLS setup.
583    ///
584    /// Takes `&self` because `UnsafeCell::get()` only needs a shared reference.
585    /// The raw pointers carry write provenance from the `UnsafeCell`.
586    pub(crate) fn poll_context_ptrs(&self) -> (*mut Vec<*mut u8>, *mut Vec<*mut u8>) {
587        (self.incoming.get(), self.deferred_free.get())
588    }
589
590    /// Cancel a task by ID.
591    #[allow(dead_code)]
592    pub(crate) fn cancel(&mut self, id: task::TaskId) {
593        let ptr = id.0;
594        // Skip if already completed (e.g. double-cancel or cancel after poll).
595        if unsafe { task::is_completed(ptr) } {
596            return;
597        }
598        // SAFETY: single-threaded, no TLS active during cancel.
599        unsafe { &mut *self.incoming.get() }.retain(|p| *p != ptr);
600        self.draining.retain(|p| *p != ptr);
601        self.complete_task(ptr);
602    }
603}
604
605impl Drop for Executor {
606    fn drop(&mut self) {
607        // Step 1 (PR 2 §2.3, fixed in PR2-John-review item 1): drain
608        // the cross-thread queue FIRST, before walking `all_tasks`.
609        //
610        // **Why first.** An off-thread holder dropping a TaskRef
611        // terminal between the runtime's last drain and `Executor::drop`
612        // start enqueues a TERMINAL task pointer in `cross_queue`
613        // (`try_set_queued + push`). The task allocation is alive (we
614        // haven't freed it yet) but rc=0, COMPLETED set, QUEUED set.
615        //
616        // If we walked `all_tasks` BEFORE draining cross_queue:
617        //   - `is_terminal` returns false (QUEUED bit is set, mask
618        //     `INERT_MASK` doesn't clear it).
619        //   - Falls through to the rc=0 branch → `free_task(ptr)`.
620        //   - Step 3's pop then derefs `cross_next` at offset 32 of
621        //     the freed allocation. **UAF.**
622        //
623        // By draining cross_queue first, `drain_cross_thread` clears
624        // QUEUED and routes the terminal entry to `deferred_free`
625        // (state is now just COMPLETED → `is_terminal` returns true
626        // there). Step 2's deferred_free drain frees + removes from
627        // `all_tasks`. Step 3's all_tasks walk no longer sees it.
628        //
629        // Entries that arrive AFTER step 1 (off-thread holder pushes
630        // mid-drop) leave a stale pointer in cross_queue. No one pops
631        // it post-drop (no executor) so no UAF; the leak is bounded
632        // by the lifetime of `Arc<CrossWakeContext>` and the entry
633        // is freed-then-pointer-leaked when the last Arc clone drops.
634        let undrained = self.cross_wake_for_drop.take().map_or(0u64, |ctx| {
635            self.drain_cross_thread(&ctx.queue, usize::MAX) as u64
636        });
637        if undrained > 0 {
638            self.record_cross_queue_undrained(undrained);
639        }
640
641        // Step 2: drain deferred-free (now includes any terminals
642        // routed by step 1's cross-queue drain). Updates `all_tasks`
643        // bookkeeping in the right order (read tracker_key BEFORE
644        // free_task).
645        self.drop_drain_deferred_free();
646
647        // Step 3: walk surviving tasks. Each task hits one of four
648        // branches: TERMINAL (free directly), not-completed (try to
649        // complete + maybe free), outstanding-refs (route to unwinding
650        // or normal-shutdown handlers), or zero-refs (free).
651        for (_, &ptr) in &self.all_tasks {
652            if unsafe { task::is_terminal(ptr) } {
653                // TERMINAL: completed, zero refs, all flags cleared.
654                // Happens when a cross-thread waker produced TERMINAL
655                // via ref_dec but the executor hadn't scanned yet.
656                unsafe { task::free_task(ptr) };
657                continue;
658            }
659
660            if !unsafe { task::is_completed(ptr) } && Self::drop_complete_and_maybe_free(ptr) {
661                continue;
662            }
663
664            let rc = unsafe { task::ref_count(ptr) };
665            if rc > 0 {
666                if std::thread::panicking() {
667                    self.drop_outstanding_unwinding(ptr, rc);
668                } else {
669                    self.drop_outstanding_normal(ptr, rc);
670                }
671                continue;
672            }
673
674            unsafe { task::free_task(ptr) };
675        }
676    }
677}
678
impl Executor {
    /// Drop step 1: drain the deferred-free list.
    ///
    /// Each entry is a completed task whose final reference was dropped
    /// after the last poll cycle's own drain ran (or entries accumulated
    /// since one). The executor owns these and must free their storage
    /// and remove them from `all_tasks`.
    ///
    /// Order matters: `tracker_key` reads from the task header, so it must
    /// run *before* `free_task` releases the allocation. The `all_tasks`
    /// removal only uses the already-copied key, so it runs after the free.
    ///
    /// SAFETY: `&mut self` in Drop, no concurrent access.
    fn drop_drain_deferred_free(&mut self) {
        // SAFETY: exclusive `&mut self` — nothing else can hold a reference
        // into `deferred_free` while we drain it.
        // NOTE(review): if `deferred_free` is an `UnsafeCell`,
        // `self.deferred_free.get_mut()` gives the same access without
        // `unsafe` (disjoint field borrow from `all_tasks`).
        for ptr in unsafe { &mut *self.deferred_free.get() }.drain(..) {
            // Copy the key out while the task header is still allocated.
            let key = unsafe { task::tracker_key(ptr) } as usize;
            // SAFETY: deferred-free entries are owned by the executor alone
            // (see doc above); `ptr` is not dereferenced after this call.
            unsafe { task::free_task(ptr) };
            // Guarded so an untracked key never reaches `remove`.
            if self.all_tasks.contains(key) {
                self.all_tasks.remove(key);
            }
        }
    }

    /// Drop step 2 / branch B: the task has not completed yet.
    ///
    /// First drops the task's future, so every destructor it owns runs here
    /// (sockets, file handles, publishers, ...). Then calls
    /// `complete_and_unref` to mark the task COMPLETED and release the
    /// executor's reference in one step.
    ///
    /// Returns `true` when `complete_and_unref` reports `FreeBox` or
    /// `FreeSlab`. In that case the storage has been freed here and the
    /// caller `continue`s. Returns `false` (`Retain`) when the task still
    /// has cross-thread refs; the caller then falls through to its
    /// ref-count check.
    ///
    /// SAFETY: caller guarantees `ptr` references a not-yet-completed
    /// task with the executor's ref still held.
    /// NOTE(review): this is a safe `fn` whose correctness rests on a
    /// caller-upheld pointer precondition; consider `unsafe fn` with a
    /// `# Safety` section so the contract is enforced at call sites.
    fn drop_complete_and_maybe_free(ptr: *mut u8) -> bool {
        // SAFETY: caller contract — `ptr` is a live, not-yet-completed task.
        unsafe { task::drop_task_future(ptr) };
        // SAFETY: the future is gone; we still hold the executor's ref,
        // which `complete_and_unref` releases.
        match unsafe { task::complete_and_unref(ptr) } {
            // Other references are still alive — must not free.
            task::FreeAction::Retain => false,
            // Terminal state and ours was the last ref. Both allocation
            // kinds free through the same `free_task` entry point.
            task::FreeAction::FreeBox | task::FreeAction::FreeSlab => {
                // SAFETY: `complete_and_unref` reported the slot freeable,
                // and `ptr` is not used after this.
                unsafe { task::free_task(ptr) };
                true
            }
        }
    }

    /// Drop step 2 / branches C+D: the task has completed but still has
    /// outstanding cross-thread refs, and a panic is unwinding. Behavior
    /// splits by allocation type:
    ///
    /// - **Slab task**: wait up to 100ms for refs to settle (producer
    ///   threads may be racing to release them). If they settle, free the
    ///   slot cleanly. If not, abort the process. Leaking is not an option
    ///   here: `_slab_guard` releases the slab's backing storage after
    ///   `Executor::drop` returns, so a late `ref_dec` would be a
    ///   use-after-free.
    /// - **Box task**: leak it. The Box stays in process memory until exit,
    ///   so outstanding cross-thread refs that later run `ref_dec` still
    ///   see valid memory.
    ///
    /// The `eprintln!`s on these paths are intentional diagnostics and stay
    /// for now. They become removable once `shutdown_quiesce` makes this
    /// branch unreachable in normal operation. Each helper also increments
    /// its `ShutdownStats` counter (`aborted_unwinds` / `leaked_box_tasks`).
    ///
    /// SAFETY: caller guarantees `ptr` references a completed task
    /// with rc > 0, called during unwind.
    fn drop_outstanding_unwinding(&self, ptr: *mut u8, rc: usize) {
        // Slab memory cannot outlive the executor; Box memory can.
        if unsafe { task::is_slab_allocated(ptr) } {
            self.drop_outstanding_slab_unwinding(ptr);
        } else {
            self.drop_outstanding_box_unwinding(ptr, rc);
        }
    }

    /// Slab branch of `drop_outstanding_unwinding` (see it for context).
    ///
    /// Busy-waits, yielding the thread on each iteration, for up to 100ms
    /// until the task's ref count reaches zero. If it does, the slot is
    /// freed. If it does not, this records an aborted unwind (via
    /// `record_aborted_unwind`), prints a diagnostic, and calls
    /// `std::process::abort()`.
    ///
    /// The counter is recorded *before* the abort so the abort path cannot
    /// skip it.
    /// NOTE(review): an in-process counter is not observable after
    /// `abort()` unless `record_aborted_unwind` writes somewhere that
    /// outlives the process (shared memory, a mapped log, ...) — confirm.
    fn drop_outstanding_slab_unwinding(&self, ptr: *mut u8) {
        // The 100ms budget is also quoted in the abort message below —
        // keep the two in sync.
        let deadline = std::time::Instant::now() + std::time::Duration::from_millis(100);
        // Re-read the count each spin: other threads release refs concurrently.
        while unsafe { task::ref_count(ptr) } > 0 && std::time::Instant::now() < deadline {
            std::thread::yield_now();
        }
        // Re-check: the loop may have exited on the deadline, not on rc == 0.
        if unsafe { task::ref_count(ptr) } > 0 {
            // Record before the abort. The eprintln! stays: it is the only
            // signal emitted at the moment of the process abort.
            self.record_aborted_unwind();
            eprintln!(
                "nexus-async-rt: slab task {ptr:p} has \
                 outstanding refs after 100ms during unwinding \
                 — aborting to avoid UAF on slab memory \
                 release. Cross-thread waker producer thread \
                 may be deadlocked or starved."
            );
            std::process::abort();
        }
        // Refs settled — free cleanly. Avoid the panic path.
        // SAFETY: ref count observed at zero for a completed task.
        unsafe { task::free_task(ptr) };
    }

    /// Box branch of `drop_outstanding_unwinding` (see it for context).
    ///
    /// Intentionally leaks the task: `_ptr` is never freed, so the Box stays
    /// valid until process exit and outstanding refs never touch freed
    /// memory. Increments the leaked-box counter (via `record_leaked_box`)
    /// and prints a diagnostic. It does not panic, because panicking again
    /// while already unwinding would abort the process.
    fn drop_outstanding_box_unwinding(&self, _ptr: *mut u8, rc: usize) {
        self.record_leaked_box();
        eprintln!(
            "nexus-async-rt: executor dropped with {rc} outstanding \
             reference(s) during unwinding — suppressing panic to \
             avoid abort. Task resources were released via \
             drop_task_future; leaking box task allocation + waker \
             bookkeeping memory."
        );
    }

    /// Drop step 2 / branch E: the task has completed but still has
    /// outstanding cross-thread refs, and no panic is in flight.
    ///
    /// This indicates a lifetime-discipline violation in user code: wakers
    /// or JoinHandles were not dropped before the Runtime. The
    /// `unbalanced_normal_shutdowns` counter is incremented (via
    /// `record_unbalanced_normal`) first, then:
    ///
    /// - debug builds panic to surface the bug. The panic propagates out of
    ///   the caller's loop, so tasks it had not reached yet are presumably
    ///   left unfreed.
    /// - release builds print a diagnostic and leak the task (`_ptr` is
    ///   never freed) to avoid undefined behavior.
    ///
    /// SAFETY: caller guarantees `ptr` references a completed task
    /// with rc > 0, called outside any panic.
    fn drop_outstanding_normal(&self, _ptr: *mut u8, rc: usize) {
        self.record_unbalanced_normal();
        // Exactly one of the two statements below is compiled in.
        #[cfg(debug_assertions)]
        panic!(
            "executor dropped with {rc} outstanding reference(s) — \
             all wakers and JoinHandles must be dropped before the Runtime"
        );
        #[cfg(not(debug_assertions))]
        eprintln!(
            "nexus-async-rt: executor dropped with {rc} outstanding task \
             reference(s) — leaking to avoid UB"
        );
    }
}
814
815// =============================================================================
816// Tests
817// =============================================================================
818
#[cfg(test)]
mod tests {
    use super::*;
    use std::hint::black_box;
    use std::pin::Pin;
    use task::Task;

    /// Executor fixture shared by every test. The `16` is passed straight
    /// to `Executor::new`; its meaning is defined there.
    fn test_executor() -> Executor {
        Executor::new(16)
    }

    // =========================================================================
    // Basic spawn + poll
    // =========================================================================

    #[test]
    fn spawn_and_poll_single_task() {
        use std::cell::Cell;
        use std::rc::Rc;

        let mut exec = test_executor();
        // Shared flag via Rc<Cell<_>> (as the other tests do) instead of a
        // raw pointer to a stack local: no `unsafe`, and the flag stays valid
        // for as long as the task holds it.
        let done = Rc::new(Cell::new(false));
        let flag = Rc::clone(&done);

        exec.spawn_boxed(async move {
            flag.set(true);
        });

        assert_eq!(exec.task_count(), 1);
        let completed = exec.poll();
        assert_eq!(completed, 1);
        assert!(done.get());
        assert_eq!(exec.task_count(), 0);
    }

    #[test]
    fn spawn_multiple_tasks() {
        let mut exec = test_executor();

        for _ in 0..8 {
            exec.spawn_boxed(async {});
        }

        assert_eq!(exec.task_count(), 8);
        let completed = exec.poll();
        assert_eq!(completed, 8);
        assert_eq!(exec.task_count(), 0);
    }

    // =========================================================================
    // Pending tasks
    // =========================================================================

    #[test]
    fn pending_task_not_completed() {
        let mut exec = test_executor();

        // A future that is always pending.
        exec.spawn_boxed(std::future::pending::<()>());

        let completed = exec.poll();
        assert_eq!(completed, 0);
        assert_eq!(exec.task_count(), 1);
    }

    // =========================================================================
    // Immediately-ready task
    // =========================================================================

    #[test]
    fn immediate_task_completes() {
        let mut exec = test_executor();

        exec.spawn_boxed(async {
            // Immediately ready.
        });

        let completed = exec.poll();
        assert_eq!(completed, 1);
        assert_eq!(exec.task_count(), 0);
    }

    // =========================================================================
    // Self-waking task (re-queue via wake_by_ref)
    // =========================================================================

    #[test]
    fn self_waking_task_polled_again() {
        use std::cell::Cell;
        use std::rc::Rc;

        let mut exec = test_executor();

        let counter = Rc::new(Cell::new(0u32));
        let c = counter.clone();

        exec.spawn_boxed(async move {
            struct SelfWake {
                counter: Rc<Cell<u32>>,
            }
            impl Future for SelfWake {
                type Output = ();
                fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
                    let n = self.counter.get();
                    self.counter.set(n + 1);
                    if n < 3 {
                        cx.waker().wake_by_ref();
                        Poll::Pending
                    } else {
                        Poll::Ready(())
                    }
                }
            }
            SelfWake { counter: c }.await;
        });

        // Drain all polls (bounded so a regression fails instead of hanging).
        let mut total = 0;
        for _ in 0..10 {
            total += exec.poll();
            if exec.task_count() == 0 {
                break;
            }
        }
        assert_eq!(total, 1); // completed once
        assert_eq!(counter.get(), 4); // polled 4 times
        assert_eq!(exec.task_count(), 0); // executor fully drained
    }

    // =========================================================================
    // Cancel
    // =========================================================================

    #[test]
    fn abort_task() {
        let mut exec = test_executor();
        let handle = exec.spawn_boxed(std::future::pending::<()>());

        assert_eq!(exec.task_count(), 1);
        assert!(handle.abort()); // was running, handle consumed
        exec.poll(); // abort takes effect on next poll
        assert_eq!(exec.task_count(), 0);
    }

    #[test]
    fn abort_frees_slot_for_reuse() {
        let mut exec = test_executor();
        let handle = exec.spawn_boxed(std::future::pending::<()>());
        // Same precondition as `abort_task`: the task was still running.
        assert!(handle.abort()); // consumes handle

        exec.poll(); // process abort + deferred free

        // Should be able to spawn again.
        exec.spawn_boxed(async {});
        assert_eq!(exec.task_count(), 1);
        exec.poll();
        assert_eq!(exec.task_count(), 0);
    }

    // =========================================================================
    // Poll limit (tasks_per_cycle)
    // =========================================================================

    #[test]
    fn poll_limit_respected() {
        let mut exec = test_executor();
        exec.set_tasks_per_cycle(2);

        for _ in 0..5 {
            exec.spawn_boxed(async {});
        }

        // Only 2 polled per cycle.
        let completed = exec.poll();
        assert_eq!(completed, 2);
        assert_eq!(exec.task_count(), 3);

        let completed = exec.poll();
        assert_eq!(completed, 2);
        assert_eq!(exec.task_count(), 1);

        let completed = exec.poll();
        assert_eq!(completed, 1);
        assert_eq!(exec.task_count(), 0);
    }

    // =========================================================================
    // Stale ready entries after cancel
    // =========================================================================

    #[test]
    fn cancel_with_stale_ready_entry() {
        use std::cell::Cell;
        use std::rc::Rc;

        let mut exec = test_executor();

        let polled = Rc::new(Cell::new(false));
        let p = polled.clone();

        // Spawn a self-waking task.
        struct WakeOnce(bool);
        impl Future for WakeOnce {
            type Output = ();
            fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
                if !self.0 {
                    self.0 = true;
                    cx.waker().wake_by_ref();
                    Poll::Pending
                } else {
                    Poll::Ready(())
                }
            }
        }

        let handle = exec.spawn_boxed(WakeOnce(false));

        // First poll: sets is_queued again via wake_by_ref.
        exec.poll();

        // Abort while the task is in the ready queue (consumes handle).
        handle.abort();

        // Spawn a new task to prove we don't crash on the stale pointer.
        exec.spawn_boxed(async move {
            p.set(true);
        });

        exec.poll(); // processes abort + new task
        assert!(polled.get());
    }

    // =========================================================================
    // Refcount behavior
    // =========================================================================

    #[test]
    fn refcount_starts_at_one() {
        let task = Box::new(Task::new_boxed(async {}, 0));
        let ptr = Box::into_raw(task) as *mut u8;
        // SAFETY: `ptr` came from `Box::into_raw` on a freshly built task.
        assert_eq!(unsafe { task::ref_count(ptr) }, 1);
        // SAFETY: sole owner; `ptr` is not used after this.
        unsafe { task::free_task(ptr) };
    }

    #[test]
    fn executor_drop_cleans_up_queued_tasks() {
        let mut exec = test_executor();
        exec.spawn_boxed(std::future::pending::<()>());
        exec.spawn_boxed(std::future::pending::<()>());
        exec.poll(); // poll them once; both stay pending
        // Drop executor with pending tasks — should free all without panic.
        drop(exec);
    }

    // =========================================================================
    // Dispatch latency (rough, not controlled)
    // =========================================================================

    #[test]
    #[ignore]
    fn dispatch_latency() {
        use std::time::Instant;

        struct Noop;
        impl Future for Noop {
            type Output = ();
            fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }

        let mut exec = test_executor();
        exec.spawn_boxed(Noop);

        // Warmup.
        for _ in 0..10_000 {
            exec.poll();
        }

        let iters = 100_000;
        let start = Instant::now();
        for _ in 0..iters {
            exec.poll();
        }
        let elapsed = start.elapsed();
        let ns_per = elapsed.as_nanos() / iters;
        println!("dispatch: {ns_per} ns/poll (Box-allocated)");
        black_box(ns_per);
    }
}