nexus_async_rt/lib.rs
1//! Single-threaded async runtime.
2//!
3//! Two spawn strategies:
4//! - **`spawn_boxed()`** — Box-allocated. Default. No setup needed.
5//! - **`spawn_slab()`** — Slab-allocated. Pre-allocated, zero-alloc
6//! hot path. Requires slab configured via [`RuntimeBuilder::slab_unbounded`] or [`RuntimeBuilder::slab_bounded`].
7//!
8//! ```ignore
9//! use nexus_async_rt::*;
10//! use nexus_slab::byte::unbounded::Slab;
11//! use nexus_rt::WorldBuilder;
12//!
13//! let mut world = WorldBuilder::new().build();
14//!
15//! // Simple — Box-allocated tasks, no slab setup
16//! let mut rt = Runtime::new(&mut world);
17//! rt.block_on(async {
18//! spawn_boxed(async { /* Box-allocated */ });
19//! });
20//!
21//! // Power user — with slab for hot-path tasks
22//! // SAFETY: single-threaded runtime.
23//! let slab = unsafe { Slab::<256>::with_chunk_capacity(64) };
24//! let mut rt = Runtime::builder(&mut world)
25//! .slab_unbounded(slab)
26//! .build();
27//! rt.block_on(async {
28//! spawn_boxed(async { /* Box-allocated, long-lived */ });
29//! spawn_slab(async { /* slab-allocated, hot path */ });
30//! });
31//! ```
32
33// Single-threaded runtime — futures are intentionally !Send.
34#![allow(clippy::future_not_send)]
35#![cfg(unix)]
36#![warn(missing_docs)]
37
38mod alloc;
39mod backoff;
40mod cancel;
41pub mod channel;
42mod context;
43pub(crate) mod cross_wake;
44mod io;
45pub mod net;
46mod runtime;
47mod shutdown;
48mod task;
49mod timer;
50#[cfg(feature = "tokio-compat")]
51pub mod tokio_compat;
52#[cfg(feature = "tokio-compat")]
53pub use tokio_compat::{TokioJoinError, TokioJoinHandle, spawn_on_tokio};
54mod waker;
55mod world_ctx;
56
// Public re-exports. The slab type itself is re-exported further down as `ByteSlab`
// for convenience — users create the slab and hand it to the builder.
58pub use alloc::SlabClaim;
59pub use backoff::{Backoff, BackoffBuilder, Exhausted};
60pub use cancel::{CancellationToken, DropGuard};
61pub use context::{
62 after, after_delay, event_time, interval, interval_at, sleep, sleep_until, timeout, timeout_at,
63 yield_now,
64};
65pub use io::IoHandle;
66pub use net::{
67 AsyncRead, AsyncWrite, OwnedReadHalf, OwnedWriteHalf, ReadHalf, TcpListener, TcpSocket,
68 TcpStream, UdpSocket, WriteHalf,
69};
70pub use nexus_slab::byte::unbounded::Slab as ByteSlab;
71pub use runtime::{
72 QuiesceTimeout, Runtime, RuntimeBuilder, claim_slab, spawn_boxed, spawn_slab, try_claim_slab,
73};
// Note: `ShutdownStats` and `ShutdownStatsAtomics` are defined later in this file,
// not in the `shutdown` module re-exported here. `ShutdownStats` is the snapshot
// type users match on. `ShutdownStatsAtomics` is the Arc-shared inner that survives
// Runtime drop — `Runtime::shutdown_stats` returns `Arc<ShutdownStatsAtomics>` and
// users call `.snapshot()` to get a plain `ShutdownStats`.
78pub use shutdown::{ShutdownHandle, ShutdownSignal};
79pub use task::{JoinHandle, TASK_HEADER_SIZE};
80pub use timer::{Elapsed, Interval, MissedTickBehavior, Sleep, Timeout, TimerHandle, YieldNow};
81pub use world_ctx::WorldCtx;
82
83use std::future::Future;
84use std::task::{Context, Poll};
85
86use waker::set_poll_context;
87
/// Recommended minimum slab slot size, in bytes.
///
/// The actual minimum depends on the task: header (72 bytes) + `max(size_of::<F>(),
/// size_of::<T>())`. ZST futures need only 72 bytes. 128 is a conservative default
/// that covers most small futures.
// NOTE(review): the 72-byte header figure presumably mirrors `TASK_HEADER_SIZE`
// re-exported from `task` — confirm the two stay in sync.
pub const MIN_SLOT_SIZE: usize = 128;
94
95// =============================================================================
96// Executor
97// =============================================================================
98
/// Single-threaded async executor.
///
/// Manages task lifecycle: spawn, poll, complete, free. Tasks are
/// allocated via Box (default) or slab (via `spawn_slab`). Each
/// task's header contains a `free_fn` that knows how to deallocate
/// its own storage — the executor doesn't know or care which
/// allocator was used.
///
/// # UnsafeCell on `incoming` and `deferred_free`
///
/// These fields are wrapped in `UnsafeCell` to prevent a provenance
/// aliasing violation. During `poll()`, raw pointers to these Vecs are
/// stored in TLS for wakers to push into. Later in the same `poll()`,
/// `complete_task(&mut self)` takes `&mut self` — which under Rust's
/// aliasing rules asserts exclusive access to ALL fields. Without
/// `UnsafeCell`, this invalidates the TLS pointers because two `&mut`
/// paths to the same memory exist. `UnsafeCell` opts these fields out
/// of `&mut`'s exclusivity guarantee, telling the compiler they may be
/// accessed through other paths (the TLS raw pointers).
///
/// This is NOT a performance concern — `UnsafeCell<T>` has the same
/// size and layout as `T` (zero overhead), and `get()` compiles to a
/// no-op pointer cast. The only effect is that the compiler won't
/// optimize based on exclusive access to these fields.
pub struct Executor {
    /// Incoming ready tasks. Wakers and spawn push here.
    /// Swapped with `draining` at the start of each poll cycle.
    ///
    /// Wrapped in `UnsafeCell` because raw pointers to this Vec are stored
    /// in TLS during `poll()`. Without `UnsafeCell`, `&mut self` on methods
    /// like `complete_task` would invalidate the TLS pointer's provenance
    /// (exclusive `&mut` covers all non-UnsafeCell fields).
    incoming: std::cell::UnsafeCell<Vec<*mut u8>>,

    /// Tasks being drained this cycle. Iterated linearly.
    /// Does NOT need UnsafeCell — only accessed through `&mut self`
    /// (in `poll()` and `cancel()`), never through the TLS pointers.
    draining: Vec<*mut u8>,

    /// All tracked task pointers. Slab-indexed for O(1) removal; the
    /// index is the `tracker_key` stored in each task's header.
    all_tasks: slab::Slab<*mut u8>,

    /// Number of live tasks: incremented on enqueue, decremented when a
    /// task completes (even if the task stays in `all_tasks` because
    /// references to it are still outstanding).
    live_count: usize,

    /// Maximum tasks to poll per cycle before yielding to IO.
    tasks_per_cycle: usize,

    /// Completed task slots awaiting deferred free.
    ///
    /// Same UnsafeCell rationale as `incoming` — TLS pointer stored during poll.
    deferred_free: std::cell::UnsafeCell<Vec<*mut u8>>,

    /// Atomic counters for abnormal-shutdown paths. Surfaced via
    /// [`Runtime::shutdown_stats`](crate::Runtime::shutdown_stats),
    /// which returns an `Arc` clone so users can read AFTER Runtime
    /// drop (the counters fire DURING `Executor::drop`; pre-drop
    /// snapshots always read zero). The `record_*` helpers only bump
    /// counters; the drop helpers that call them additionally print
    /// to stderr on the unwinding and release-build shutdown paths.
    shutdown_stats: std::sync::Arc<ShutdownStatsAtomics>,

    /// Cross-wake context, set by Runtime via [`Executor::install_cross_wake_for_drop`]
    /// after construction. `Executor::drop` takes it to drain the
    /// cross-thread queue at the start of shutdown and tally
    /// `cross_queue_undrained`. `None` for bare `Executor` use in
    /// tests (no Runtime, no cross-queue inspection at drop).
    cross_wake_for_drop: Option<std::sync::Arc<crate::cross_wake::CrossWakeContext>>,
}
167
168/// Atomic counters backing [`ShutdownStats`]. Written by `Executor`,
169/// readable via the handle returned by
170/// [`Runtime::shutdown_stats`](crate::Runtime::shutdown_stats).
171///
172/// Atomics are used (not `Cell`) so the user-facing handle can survive
173/// `Runtime::drop` and be read on the same thread post-drop. All
174/// updates use `Relaxed` ordering — the counters are observability,
175/// not synchronization.
176#[derive(Default, Debug)]
177pub struct ShutdownStatsAtomics {
178 aborted_unwinds: std::sync::atomic::AtomicU64,
179 leaked_box_tasks: std::sync::atomic::AtomicU64,
180 unbalanced_normal_shutdowns: std::sync::atomic::AtomicU64,
181 cross_queue_undrained: std::sync::atomic::AtomicU64,
182}
183
184impl ShutdownStatsAtomics {
185 /// Snapshot the current counter values into a plain
186 /// [`ShutdownStats`]. Loads are `Relaxed` — observability, not
187 /// synchronization.
188 pub fn snapshot(&self) -> ShutdownStats {
189 use std::sync::atomic::Ordering;
190 ShutdownStats {
191 aborted_unwinds: self.aborted_unwinds.load(Ordering::Relaxed),
192 leaked_box_tasks: self.leaked_box_tasks.load(Ordering::Relaxed),
193 unbalanced_normal_shutdowns: self.unbalanced_normal_shutdowns.load(Ordering::Relaxed),
194 cross_queue_undrained: self.cross_queue_undrained.load(Ordering::Relaxed),
195 }
196 }
197}
198
/// Counters for abnormal-shutdown paths.
///
/// This is the plain-value snapshot produced by
/// [`ShutdownStatsAtomics::snapshot`]; the `Arc<ShutdownStatsAtomics>`
/// handle itself comes from
/// [`Runtime::shutdown_stats`](crate::Runtime::shutdown_stats).
///
/// All counters are `0` for a clean shutdown. Any non-zero counter is a
/// signal to investigate — the runtime hit a defensive code path that
/// should be unreachable in normal operation. Some of those paths also
/// print a line to stderr (see the individual fields); the counters are
/// the machine-readable signal users can feed into their own
/// observability stack.
///
/// The type is `PartialEq + Eq`, so a clean shutdown can be checked with
/// `stats == ShutdownStats::default()`.
///
/// # Example
///
/// ```ignore
/// let handle = runtime.shutdown_stats();   // Arc<ShutdownStatsAtomics>
/// drop(runtime);                           // counters fire during drop
/// let stats = handle.snapshot();           // plain ShutdownStats for matching
/// if stats != ShutdownStats::default() {
///     // user's own observability — log to wherever they want
///     my_logger::warn!("nexus runtime shutdown: {stats:?}");
/// }
/// ```
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
pub struct ShutdownStats {
    /// `Executor::drop` hit the slab-unwinding 100ms-wait-then-abort
    /// path: a slab task still had outstanding references 100ms into a
    /// panic-driven drop. The counter is incremented immediately before
    /// `std::process::abort()`, so code in the same process will not
    /// normally get the chance to observe a non-zero value.
    pub aborted_unwinds: u64,
    /// Box-allocated tasks the executor did not free while dropping
    /// during a panic because references to them were still
    /// outstanding. The allocation is leaked rather than freed (memory
    /// leak, not use-after-free) and a message is printed to stderr.
    pub leaked_box_tasks: u64,
    /// Normal shutdown (no panic in flight) found an `all_tasks` entry
    /// with `rc > 0`. Debug builds panic. Release builds eprintln +
    /// leak. Indicates a producer didn't release refs before Runtime
    /// drop — call [`Runtime::shutdown_quiesce`](crate::Runtime::shutdown_quiesce)
    /// before drop to surface this as an `Err` instead.
    pub unbalanced_normal_shutdowns: u64,
    /// Number of cross-thread queue entries that were still pending
    /// when `Executor::drop` started and were drained by the drop path
    /// itself rather than by a regular poll cycle.
    pub cross_queue_undrained: u64,
}
252
/// Default poll limit: the initial value of `Executor::tasks_per_cycle`,
/// i.e. the maximum number of tasks `Executor::poll` polls in one cycle
/// unless overridden with `Executor::set_tasks_per_cycle`.
const DEFAULT_TASKS_PER_CYCLE: usize = 64;
255
256impl Executor {
257 /// Create an executor.
258 pub fn new(initial_capacity: usize) -> Self {
259 Self {
260 incoming: std::cell::UnsafeCell::new(Vec::with_capacity(initial_capacity)),
261 draining: Vec::with_capacity(initial_capacity),
262 all_tasks: slab::Slab::with_capacity(initial_capacity),
263 live_count: 0,
264 tasks_per_cycle: DEFAULT_TASKS_PER_CYCLE,
265 shutdown_stats: std::sync::Arc::new(ShutdownStatsAtomics::default()),
266 cross_wake_for_drop: None,
267 deferred_free: std::cell::UnsafeCell::new(Vec::new()),
268 }
269 }
270
271 /// Reserve a tracker key for external allocation (slab spawn).
272 pub(crate) fn next_tracker_key(&self) -> u32 {
273 let key = self.all_tasks.vacant_key();
274 debug_assert!(
275 u32::try_from(key).is_ok(),
276 "more than 4 billion concurrent tasks — tracker_key overflow"
277 );
278 key as u32
279 }
280
281 /// Spawn an async task via Box allocation. Returns a [`JoinHandle`]
282 /// that can be awaited for the task's output.
283 pub fn spawn_boxed<F>(&mut self, future: F) -> task::JoinHandle<F::Output>
284 where
285 F: Future + 'static,
286 F::Output: 'static,
287 {
288 let tracker_key = self.all_tasks.vacant_key();
289 debug_assert!(
290 u32::try_from(tracker_key).is_ok(),
291 "more than 4 billion concurrent tasks — tracker_key overflow"
292 );
293 // Read the runtime's cross-wake context from TLS — installed at
294 // RuntimeBuilder::build, lifetime of Runtime. Null when no
295 // Runtime is alive (e.g., direct Executor use in tests); the
296 // task header's cross_wake_ctx becomes null and dispose_terminal
297 // routes those tasks via its null-ctx fallback.
298 let cross_wake_ctx = crate::cross_wake::current_runtime_ctx();
299 let ptr = task::box_spawn_joinable(future, tracker_key as u32, cross_wake_ctx);
300
301 self.enqueue(ptr);
302 task::JoinHandle::new(ptr)
303 }
304
    /// Spawn a task with a pre-allocated pointer (from slab).
    ///
    /// The task at `ptr` must have been constructed with joinable or
    /// fire-and-forget constructors and a valid `free_fn`.
    ///
    /// The task is tracked and queued exactly like a boxed spawn; this
    /// method only forwards to `enqueue`.
    // NOTE(review): presumably the header's tracker key must come from
    // `next_tracker_key` with no other spawn in between — confirm at call sites.
    pub(crate) fn spawn_raw(&mut self, ptr: *mut u8) {
        self.enqueue(ptr);
    }
312
313 /// Common enqueue logic for spawn and spawn_raw.
314 fn enqueue(&mut self, ptr: *mut u8) {
315 self.all_tasks.insert(ptr);
316 unsafe { task::set_queued(ptr, true) };
317 // SAFETY: single-threaded, no concurrent access during enqueue.
318 unsafe { &mut *self.incoming.get() }.push(ptr);
319 self.live_count += 1;
320 }
321
322 /// Drain the cross-thread wake inbox into the local ready queue.
323 ///
324 /// Called at the start of each poll cycle. Tasks pushed from other
325 /// threads via `CrossWakeQueue::push` are moved into `incoming`.
326 /// Completed tasks are routed to `deferred_free` instead — they
327 /// were pushed for cleanup (not re-polling) by `cross_task_drop`.
328 /// Drains at most `limit` tasks (remaining are picked up next cycle).
329 pub(crate) fn drain_cross_thread(
330 &mut self,
331 inbox: &crate::cross_wake::CrossWakeQueue,
332 limit: usize,
333 ) -> usize {
334 let mut drained = 0;
335 while drained < limit {
336 match inbox.pop() {
337 Some(task_ptr) => {
338 // Clear QUEUED flag now that we've popped it.
339 unsafe { task::clear_queued(task_ptr) };
340
341 // Check if TERMINAL was reached (e.g., cross-thread waker
342 // produced TERMINAL via ref_dec while the task was queued).
343 // Only TERMINAL tasks go to deferred_free. Completed tasks
344 // with outstanding refs must NOT be freed prematurely.
345 if unsafe { task::is_terminal(task_ptr) } {
346 unsafe { &mut *self.deferred_free.get() }.push(task_ptr);
347 } else {
348 unsafe { &mut *self.incoming.get() }.push(task_ptr);
349 }
350 drained += 1;
351 }
352 None => break,
353 }
354 }
355 drained
356 }
357
358 /// Poll all ready tasks once.
359 pub fn poll(&mut self) -> usize {
360 let mut completed = 0;
361
362 // Drain deferred frees from last cycle.
363 // SAFETY: single-threaded, TLS not yet set for this cycle.
364 for ptr in unsafe { &mut *self.deferred_free.get() }.drain(..) {
365 let key = unsafe { task::tracker_key(ptr) } as usize;
366 // SAFETY: free_fn was set at spawn time.
367 unsafe { task::free_task(ptr) };
368 if self.all_tasks.contains(key) {
369 self.all_tasks.remove(key);
370 }
371 }
372
373 // SAFETY: single-threaded, swapping before TLS is set.
374 std::mem::swap(unsafe { &mut *self.incoming.get() }, &mut self.draining);
375
376 // Derive TLS pointers from UnsafeCell — NOT from &mut self field borrows.
377 // This is critical: complete_task(&mut self) later in this function must
378 // not invalidate the TLS pointers. UnsafeCell fields are excluded from
379 // &mut self's exclusivity guarantee.
380 let _guard = set_poll_context(self.incoming.get(), self.deferred_free.get());
381
382 let limit = self.tasks_per_cycle.min(self.draining.len());
383 let draining_ptr: *const Vec<*mut u8> = &raw const self.draining;
384 let drain_slice = unsafe { &(&*draining_ptr)[..limit] };
385
386 for &ptr in drain_slice {
387 if unsafe { task::is_completed(ptr) } {
388 continue;
389 }
390
391 unsafe { task::set_queued(ptr, false) };
392
393 // SAFETY: ptr is a live task, ref_count >= 1 (executor holds a ref).
394 // task_waker increments ref_count; drop after poll decrements it.
395 let waker = unsafe { crate::waker::task_waker(ptr) };
396 let mut cx = Context::from_waker(&waker);
397
398 let poll_result = unsafe { task::poll_task(ptr, &mut cx) };
399
400 drop(waker);
401
402 match poll_result {
403 Poll::Pending => {}
404 Poll::Ready(()) => {
405 self.complete_task(ptr);
406 completed += 1;
407 }
408 }
409 }
410
411 if limit < self.draining.len() {
412 // SAFETY: single-threaded, TLS guard is about to drop.
413 unsafe { &mut *self.incoming.get() }.extend_from_slice(&self.draining[limit..]);
414 }
415 self.draining.clear();
416
417 completed
418 }
419
    /// Number of live tasks.
    ///
    /// Counts tasks that have been enqueued and have not yet completed.
    /// A completed task stops counting here even while it is still
    /// tracked in `all_tasks` (see `outstanding_tasks`).
    pub fn task_count(&self) -> usize {
        self.live_count
    }
424
    /// Number of tasks tracked in the executor's `all_tasks` slab.
    /// Includes COMPLETED-but-still-referenced tasks (a `JoinHandle`
    /// or cross-thread waker holds a ref) — distinguishing it from
    /// `task_count()`, which reports `live_count` and that is
    /// decremented unconditionally on completion.
    ///
    /// `shutdown_quiesce` uses this for its quiesce check: a task that
    /// completed but has outstanding refs WILL fire one of the
    /// abnormal-shutdown branches in `Executor::drop` (debug-panic
    /// "outstanding references" or release-eprintln + counter
    /// increment). Quiesce-as-`Ok` requires `all_tasks` to be empty,
    /// not just `live_count == 0`.
    pub(crate) fn outstanding_tasks(&self) -> usize {
        self.all_tasks.len()
    }
440
    /// Number of completed task slots awaiting deferred free, i.e. the
    /// current length of the `deferred_free` list. Test-only helper.
    #[cfg(test)]
    pub fn deferred_free_count(&self) -> usize {
        // SAFETY: single-threaded, read-only snapshot.
        unsafe { &*self.deferred_free.get() }.len()
    }
447
    /// Returns an Arc handle to the shutdown counters. Callers can
    /// hold it past Runtime drop to read final values via
    /// [`ShutdownStatsAtomics::snapshot`].
    ///
    /// The handle shares the same counters the executor writes to; it is
    /// a clone of the `Arc`, not a copy of the values.
    pub(crate) fn shutdown_stats(&self) -> std::sync::Arc<ShutdownStatsAtomics> {
        std::sync::Arc::clone(&self.shutdown_stats)
    }
454
455 /// Counter increments for the abnormal-shutdown branches.
456 /// Per CALLOUT 5 of PR 2's plan: counter-only — no eprintln,
457 /// no tracing, no log calls. Users own their observability.
458 fn record_aborted_unwind(&self) {
459 self.shutdown_stats
460 .aborted_unwinds
461 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
462 }
463
464 fn record_leaked_box(&self) {
465 self.shutdown_stats
466 .leaked_box_tasks
467 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
468 }
469
470 fn record_unbalanced_normal(&self) {
471 self.shutdown_stats
472 .unbalanced_normal_shutdowns
473 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
474 }
475
476 /// Add `count` to the `cross_queue_undrained` counter. Called from
477 /// `Executor::drop` after the all_tasks loop, when the cross-thread
478 /// queue's tail-end is drained for the diagnostic count.
479 fn record_cross_queue_undrained(&self, count: u64) {
480 self.shutdown_stats
481 .cross_queue_undrained
482 .fetch_add(count, std::sync::atomic::Ordering::Relaxed);
483 }
484
    /// Wire the runtime's cross-wake context into the executor so
    /// `Executor::drop` can drain + count the cross-thread queue during
    /// shutdown. Called by `RuntimeBuilder::build` after both
    /// `Executor::new` and `Arc::new(CrossWakeContext { ... })`.
    ///
    /// Replaces any previously installed context.
    pub(crate) fn install_cross_wake_for_drop(
        &mut self,
        cross_wake: std::sync::Arc<crate::cross_wake::CrossWakeContext>,
    ) {
        self.cross_wake_for_drop = Some(cross_wake);
    }
495
    /// Returns `true` if any tasks are queued for polling.
    ///
    /// Only the local `incoming` queue is inspected; entries still
    /// sitting in the cross-thread inbox are not visible here until
    /// `drain_cross_thread` moves them over.
    pub fn has_ready(&self) -> bool {
        // SAFETY: single-threaded, read-only snapshot.
        !unsafe { &*self.incoming.get() }.is_empty()
    }
501
    /// Set the maximum tasks to poll per cycle.
    ///
    /// `poll()` polls at most `limit` tasks per call and re-queues the
    /// rest. A `limit` of `0` therefore makes `poll()` re-queue every
    /// ready task without polling any of them.
    pub fn set_tasks_per_cycle(&mut self, limit: usize) {
        self.tasks_per_cycle = limit;
    }
506
507 /// Complete a task: handle joinable vs fire-and-forget paths.
508 ///
509 /// Uses `complete_and_unref` to atomically set COMPLETED and decrement
510 /// the executor's reference in a single atomic operation — eliminating
511 /// the race window that caused SIGABRT with cross-thread wakers.
512 ///
513 /// Three branches based on task state:
514 /// - **Aborted:** drop F (still live — poll_join short-circuited), notify joiner
515 /// - **Joinable (HAS_JOIN):** T is live in the union, don't touch it — JoinHandle owns it
516 /// - **Fire-and-forget / detached:** drop the value (F or T) and free
517 ///
518 /// # Safety invariants
519 ///
520 /// `ptr` must point to a task that just returned `Poll::Ready(())` from poll_task.
521 fn complete_task(&mut self, ptr: *mut u8) {
522 let aborted = unsafe { task::is_aborted(ptr) };
523
524 if aborted {
525 // Aborted: poll_join saw ABORTED and returned Ready without polling F.
526 // F is still live in the union. drop_fn still targets F.
527 unsafe { task::drop_task_future(ptr) };
528 self.live_count -= 1;
529
530 if unsafe { task::has_join(ptr) } {
531 let waker = unsafe { task::take_join_waker(ptr) };
532 if let Some(w) = waker {
533 w.wake();
534 }
535 }
536
537 match unsafe { task::complete_and_unref(ptr) } {
538 task::FreeAction::Retain => {}
539 task::FreeAction::FreeBox | task::FreeAction::FreeSlab => {
540 let key = unsafe { task::tracker_key(ptr) } as usize;
541 unsafe { task::free_task(ptr) };
542 self.all_tasks.remove(key);
543 }
544 }
545 } else if unsafe { task::has_join(ptr) } {
546 // Joinable: poll_join dropped F and wrote T. drop_fn = drop_output::<T>.
547 // Don't drop T — JoinHandle will read it or drop it on handle drop.
548 self.live_count -= 1;
549
550 // Wake the joiner so it can poll the JoinHandle and read T.
551 let waker = unsafe { task::take_join_waker(ptr) };
552 if let Some(w) = waker {
553 w.wake();
554 }
555
556 match unsafe { task::complete_and_unref(ptr) } {
557 task::FreeAction::Retain => {}
558 task::FreeAction::FreeBox | task::FreeAction::FreeSlab => {
559 // Terminal — JoinHandle already dropped (detached). Drop output.
560 unsafe { task::drop_task_future(ptr) };
561 let key = unsafe { task::tracker_key(ptr) } as usize;
562 unsafe { task::free_task(ptr) };
563 self.all_tasks.remove(key);
564 }
565 }
566 } else {
567 // Fire-and-forget or detached (HAS_JOIN cleared by JoinHandle::Drop).
568 unsafe { task::drop_task_future(ptr) };
569 self.live_count -= 1;
570
571 match unsafe { task::complete_and_unref(ptr) } {
572 task::FreeAction::Retain => {}
573 task::FreeAction::FreeBox | task::FreeAction::FreeSlab => {
574 let key = unsafe { task::tracker_key(ptr) } as usize;
575 unsafe { task::free_task(ptr) };
576 self.all_tasks.remove(key);
577 }
578 }
579 }
580 }
581
    /// Returns raw pointers for TLS setup, as `(incoming, deferred_free)`.
    ///
    /// Takes `&self` because `UnsafeCell::get()` only needs a shared reference.
    /// The raw pointers carry write provenance from the `UnsafeCell`.
    pub(crate) fn poll_context_ptrs(&self) -> (*mut Vec<*mut u8>, *mut Vec<*mut u8>) {
        (self.incoming.get(), self.deferred_free.get())
    }
589
    /// Cancel a task by ID.
    ///
    /// No-op if the task is already completed. Otherwise the task is
    /// removed from both ready queues and run through `complete_task`.
    // NOTE(review): `complete_task` documents that `ptr` must have just
    // returned `Poll::Ready(())` from `poll_task`; a cancelled task has not.
    // For a joinable, non-aborted task that path assumes the output was
    // already written — confirm against `task` before removing
    // `#[allow(dead_code)]` and using this.
    // NOTE(review): the QUEUED flag is not cleared here — verify whether
    // that matters for a task that is retained after completion.
    #[allow(dead_code)]
    pub(crate) fn cancel(&mut self, id: task::TaskId) {
        let ptr = id.0;
        // Skip if already completed (e.g. double-cancel or cancel after poll).
        if unsafe { task::is_completed(ptr) } {
            return;
        }
        // SAFETY: single-threaded, no TLS active during cancel.
        unsafe { &mut *self.incoming.get() }.retain(|p| *p != ptr);
        self.draining.retain(|p| *p != ptr);
        self.complete_task(ptr);
    }
603}
604
605impl Drop for Executor {
606 fn drop(&mut self) {
607 // Step 1 (PR 2 §2.3, fixed in PR2-John-review item 1): drain
608 // the cross-thread queue FIRST, before walking `all_tasks`.
609 //
610 // **Why first.** An off-thread holder dropping a TaskRef
611 // terminal between the runtime's last drain and `Executor::drop`
612 // start enqueues a TERMINAL task pointer in `cross_queue`
613 // (`try_set_queued + push`). The task allocation is alive (we
614 // haven't freed it yet) but rc=0, COMPLETED set, QUEUED set.
615 //
616 // If we walked `all_tasks` BEFORE draining cross_queue:
617 // - `is_terminal` returns false (QUEUED bit is set, mask
618 // `INERT_MASK` doesn't clear it).
619 // - Falls through to the rc=0 branch → `free_task(ptr)`.
620 // - Step 3's pop then derefs `cross_next` at offset 32 of
621 // the freed allocation. **UAF.**
622 //
623 // By draining cross_queue first, `drain_cross_thread` clears
624 // QUEUED and routes the terminal entry to `deferred_free`
625 // (state is now just COMPLETED → `is_terminal` returns true
626 // there). Step 2's deferred_free drain frees + removes from
627 // `all_tasks`. Step 3's all_tasks walk no longer sees it.
628 //
629 // Entries that arrive AFTER step 1 (off-thread holder pushes
630 // mid-drop) leave a stale pointer in cross_queue. No one pops
631 // it post-drop (no executor) so no UAF; the leak is bounded
632 // by the lifetime of `Arc<CrossWakeContext>` and the entry
633 // is freed-then-pointer-leaked when the last Arc clone drops.
634 let undrained = self.cross_wake_for_drop.take().map_or(0u64, |ctx| {
635 self.drain_cross_thread(&ctx.queue, usize::MAX) as u64
636 });
637 if undrained > 0 {
638 self.record_cross_queue_undrained(undrained);
639 }
640
641 // Step 2: drain deferred-free (now includes any terminals
642 // routed by step 1's cross-queue drain). Updates `all_tasks`
643 // bookkeeping in the right order (read tracker_key BEFORE
644 // free_task).
645 self.drop_drain_deferred_free();
646
647 // Step 3: walk surviving tasks. Each task hits one of four
648 // branches: TERMINAL (free directly), not-completed (try to
649 // complete + maybe free), outstanding-refs (route to unwinding
650 // or normal-shutdown handlers), or zero-refs (free).
651 for (_, &ptr) in &self.all_tasks {
652 if unsafe { task::is_terminal(ptr) } {
653 // TERMINAL: completed, zero refs, all flags cleared.
654 // Happens when a cross-thread waker produced TERMINAL
655 // via ref_dec but the executor hadn't scanned yet.
656 unsafe { task::free_task(ptr) };
657 continue;
658 }
659
660 if !unsafe { task::is_completed(ptr) } && Self::drop_complete_and_maybe_free(ptr) {
661 continue;
662 }
663
664 let rc = unsafe { task::ref_count(ptr) };
665 if rc > 0 {
666 if std::thread::panicking() {
667 self.drop_outstanding_unwinding(ptr, rc);
668 } else {
669 self.drop_outstanding_normal(ptr, rc);
670 }
671 continue;
672 }
673
674 unsafe { task::free_task(ptr) };
675 }
676 }
677}
678
679impl Executor {
680 /// Drop step 1: drain deferred-free entries from the last poll
681 /// cycle (or accumulated since one). Each entry is a completed
682 /// task whose final ref dropped after the last poll cycle's drain
683 /// ran; we own them and must free the storage + remove from
684 /// `all_tasks`. The order (read tracker_key, then free_task, then
685 /// remove key) matters because tracker_key reads from the task
686 /// header — must happen before the allocation is freed.
687 ///
688 /// SAFETY: `&mut self` in Drop, no concurrent access.
689 fn drop_drain_deferred_free(&mut self) {
690 for ptr in unsafe { &mut *self.deferred_free.get() }.drain(..) {
691 let key = unsafe { task::tracker_key(ptr) } as usize;
692 unsafe { task::free_task(ptr) };
693 if self.all_tasks.contains(key) {
694 self.all_tasks.remove(key);
695 }
696 }
697 }
698
699 /// Drop step 2 / branch B: task hasn't completed yet. Drop its
700 /// future (running its destructors — Aeron publishers, sockets,
701 /// file handles all release here), then atomically set COMPLETED +
702 /// decrement the executor's ref. Returns true if the resulting
703 /// state is terminal (we freed the slot) — caller `continue`s.
704 /// Returns false when the task still has cross-thread refs and
705 /// the caller falls through to the rc-check.
706 ///
707 /// SAFETY: caller guarantees `ptr` references a not-yet-completed
708 /// task with the executor's ref still held.
709 fn drop_complete_and_maybe_free(ptr: *mut u8) -> bool {
710 unsafe { task::drop_task_future(ptr) };
711 match unsafe { task::complete_and_unref(ptr) } {
712 task::FreeAction::Retain => false,
713 task::FreeAction::FreeBox | task::FreeAction::FreeSlab => {
714 unsafe { task::free_task(ptr) };
715 true
716 }
717 }
718 }
719
720 /// Drop step 2 / branch C+D: task completed but has outstanding
721 /// cross-thread refs, and we're mid-unwind. Behavior splits by
722 /// allocation type:
723 ///
724 /// - **Slab task**: wait up to 100ms for refs to settle (producer
725 /// threads may be racing to release). If settled, free cleanly.
726 /// If not, abort — leaking would UAF when `_slab_guard` releases
727 /// the slab backing storage after `Executor::drop` returns.
728 /// - **Box task**: leak. The Box sits in process memory until
729 /// process exit; outstanding cross-thread refs that later run
730 /// `ref_dec` see valid memory.
731 ///
732 /// The eprintln!s in this branch are PR 1a's existing signals —
733 /// they stay (per CALLOUT 5 of PR 2's plan, removable post-§2.4
734 /// once `shutdown_quiesce` makes this branch unreachable in
735 /// normal operation). The slab and box helpers each increment
736 /// the relevant `ShutdownStats` counter (`aborted_unwinds` /
737 /// `leaked_box_tasks`).
738 ///
739 /// SAFETY: caller guarantees `ptr` references a completed task
740 /// with rc > 0, called during unwind.
741 fn drop_outstanding_unwinding(&self, ptr: *mut u8, rc: usize) {
742 if unsafe { task::is_slab_allocated(ptr) } {
743 self.drop_outstanding_slab_unwinding(ptr);
744 } else {
745 self.drop_outstanding_box_unwinding(ptr, rc);
746 }
747 }
748
749 /// Slab branch of the unwinding path. See `drop_outstanding_unwinding`
750 /// for context. Increments `aborted_unwinds` counter on the
751 /// abort path (PR 2 §2.3) BEFORE calling `std::process::abort()`
752 /// so a parent process inspecting the runtime's state can see
753 /// the counter via shared memory or memory-mapped logging.
754 fn drop_outstanding_slab_unwinding(&self, ptr: *mut u8) {
755 let deadline = std::time::Instant::now() + std::time::Duration::from_millis(100);
756 while unsafe { task::ref_count(ptr) } > 0 && std::time::Instant::now() < deadline {
757 std::thread::yield_now();
758 }
759 if unsafe { task::ref_count(ptr) } > 0 {
760 // Record before the abort — the eprintln stays per CALLOUT 5
761 // (only signal at moment of process abort).
762 self.record_aborted_unwind();
763 eprintln!(
764 "nexus-async-rt: slab task {ptr:p} has \
765 outstanding refs after 100ms during unwinding \
766 — aborting to avoid UAF on slab memory \
767 release. Cross-thread waker producer thread \
768 may be deadlocked or starved."
769 );
770 std::process::abort();
771 }
772 // Refs settled — free cleanly. Avoid the panic path.
773 unsafe { task::free_task(ptr) };
774 }
775
776 /// Box branch of the unwinding path. See `drop_outstanding_unwinding`
777 /// for context. Leaks the box; safe — outstanding refs see valid
778 /// memory until process exit. Increments `leaked_box_tasks` (PR 2 §2.3).
779 fn drop_outstanding_box_unwinding(&self, _ptr: *mut u8, rc: usize) {
780 self.record_leaked_box();
781 eprintln!(
782 "nexus-async-rt: executor dropped with {rc} outstanding \
783 reference(s) during unwinding — suppressing panic to \
784 avoid abort. Task resources were released via \
785 drop_task_future; leaking box task allocation + waker \
786 bookkeeping memory."
787 );
788 }
789
790 /// Drop step 2 / branch E: task completed but has outstanding
791 /// cross-thread refs, normal shutdown (no panic in flight). This
792 /// indicates a user-side lifetime discipline violation — wakers
793 /// or JoinHandles weren't dropped before the Runtime. Debug builds
794 /// panic to surface the bug; release builds eprintln + leak to
795 /// avoid UB. Increments `unbalanced_normal_shutdowns` (PR 2 §2.3)
796 /// before either path.
797 ///
798 /// SAFETY: caller guarantees `ptr` references a completed task
799 /// with rc > 0, called outside any panic.
800 fn drop_outstanding_normal(&self, _ptr: *mut u8, rc: usize) {
801 self.record_unbalanced_normal();
802 #[cfg(debug_assertions)]
803 panic!(
804 "executor dropped with {rc} outstanding reference(s) — \
805 all wakers and JoinHandles must be dropped before the Runtime"
806 );
807 #[cfg(not(debug_assertions))]
808 eprintln!(
809 "nexus-async-rt: executor dropped with {rc} outstanding task \
810 reference(s) — leaking to avoid UB"
811 );
812 }
813}
814
815// =============================================================================
816// Tests
817// =============================================================================
818
#[cfg(test)]
mod tests {
    use super::*;
    use std::hint::black_box;
    use std::pin::Pin;
    use task::Task;

    /// Builds the executor used by every test (`Executor::new(16)`).
    fn test_executor() -> Executor {
        Executor::new(16)
    }

    // =========================================================================
    // Basic spawn + poll
    // =========================================================================

    /// One spawned task runs to completion on the first poll and is then
    /// no longer counted by the executor.
    #[test]
    fn spawn_and_poll_single_task() {
        let mut exec = test_executor();
        let mut ran = false;
        let ran_ptr = &raw mut ran;

        exec.spawn_boxed(async move {
            // SAFETY: single-threaded, and `ran` lives on this test's stack
            // for the whole duration of the poll below.
            unsafe { *ran_ptr = true };
        });

        assert_eq!(exec.task_count(), 1);
        assert_eq!(exec.poll(), 1);
        assert!(ran);
        assert_eq!(exec.task_count(), 0);
    }

    /// Eight trivially-ready tasks all complete in a single poll cycle.
    #[test]
    fn spawn_multiple_tasks() {
        let mut exec = test_executor();

        (0..8).for_each(|_| {
            exec.spawn_boxed(async {});
        });

        assert_eq!(exec.task_count(), 8);
        assert_eq!(exec.poll(), 8);
        assert_eq!(exec.task_count(), 0);
    }

    // =========================================================================
    // Pending tasks
    // =========================================================================

    /// A future that never resolves stays registered after being polled.
    #[test]
    fn pending_task_not_completed() {
        let mut exec = test_executor();

        exec.spawn_boxed(std::future::pending::<()>());

        assert_eq!(exec.poll(), 0);
        assert_eq!(exec.task_count(), 1);
    }

    // =========================================================================
    // Immediately-ready task
    // =========================================================================

    /// A task with an empty body is reported as completed by the first poll.
    #[test]
    fn immediate_task_completes() {
        let mut exec = test_executor();

        // The body is empty, so the future is ready on its first poll.
        exec.spawn_boxed(async {});

        assert_eq!(exec.poll(), 1);
        assert_eq!(exec.task_count(), 0);
    }

    // =========================================================================
    // Self-waking task (re-queue via wake_by_ref)
    // =========================================================================

    /// A task that wakes itself is polled again until it finally completes.
    #[test]
    fn self_waking_task_polled_again() {
        use std::cell::Cell;
        use std::rc::Rc;

        /// Wakes itself on its first three polls and completes on the
        /// fourth, bumping `counter` on every poll.
        struct SelfWake {
            counter: Rc<Cell<u32>>,
        }

        impl Future for SelfWake {
            type Output = ();

            fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
                let polls_before = self.counter.get();
                self.counter.set(polls_before + 1);
                if polls_before >= 3 {
                    return Poll::Ready(());
                }
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }

        let mut exec = test_executor();

        let counter = Rc::new(Cell::new(0u32));
        let task_counter = Rc::clone(&counter);

        exec.spawn_boxed(async move {
            SelfWake {
                counter: task_counter,
            }
            .await;
        });

        // Run poll cycles (at most ten) until the executor is empty.
        let mut completions = 0;
        for _cycle in 0..10 {
            completions += exec.poll();
            if exec.task_count() == 0 {
                break;
            }
        }

        assert_eq!(completions, 1); // one task, completed exactly once
        assert_eq!(counter.get(), 4); // three self-wakes + the final poll
    }

    // =========================================================================
    // Abort
    // =========================================================================

    /// Aborting a still-pending task removes it on the next poll.
    #[test]
    fn abort_task() {
        let mut exec = test_executor();
        let handle = exec.spawn_boxed(std::future::pending::<()>());
        assert_eq!(exec.task_count(), 1);

        // `abort` consumes the handle; the test expects `true` for a task
        // that had not finished yet.
        let aborted = handle.abort();
        assert!(aborted);

        // The abort only takes effect once the executor polls again.
        exec.poll();
        assert_eq!(exec.task_count(), 0);
    }

    /// After an aborted task has been processed, spawning works as usual.
    #[test]
    fn abort_frees_slot_for_reuse() {
        let mut exec = test_executor();

        let handle = exec.spawn_boxed(std::future::pending::<()>());
        handle.abort(); // consumes the handle
        exec.poll(); // processes the abort and the deferred free

        // A fresh task can be spawned and driven to completion afterwards.
        exec.spawn_boxed(async {});
        assert_eq!(exec.task_count(), 1);
        exec.poll();
        assert_eq!(exec.task_count(), 0);
    }

    // =========================================================================
    // Poll limit (tasks_per_cycle)
    // =========================================================================

    /// With `tasks_per_cycle` set to 2, five ready tasks need three cycles.
    #[test]
    fn poll_limit_respected() {
        let mut exec = test_executor();
        exec.set_tasks_per_cycle(2);

        for _ in 0..5 {
            exec.spawn_boxed(async {});
        }

        // (completed in this cycle, tasks left afterwards)
        for (expected_completed, expected_left) in [(2, 3), (2, 1), (1, 0)] {
            assert_eq!(exec.poll(), expected_completed);
            assert_eq!(exec.task_count(), expected_left);
        }
    }

    // =========================================================================
    // Stale ready entries after abort
    // =========================================================================

    /// Aborting a task that is sitting in the ready queue must not disturb
    /// tasks spawned afterwards.
    #[test]
    fn cancel_with_stale_ready_entry() {
        use std::cell::Cell;
        use std::rc::Rc;

        /// Wakes itself and returns `Pending` on the first poll, then
        /// completes on the second. The flag records whether the first
        /// poll has already happened.
        struct WakeOnce(bool);

        impl Future for WakeOnce {
            type Output = ();

            fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
                if self.0 {
                    return Poll::Ready(());
                }
                self.0 = true;
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }

        let mut exec = test_executor();

        let polled = Rc::new(Cell::new(false));
        let polled_in_task = Rc::clone(&polled);

        let handle = exec.spawn_boxed(WakeOnce(false));

        // First poll: the task re-queues itself through `wake_by_ref`.
        exec.poll();

        // Abort while the task is waiting in the ready queue (consumes the
        // handle).
        handle.abort();

        // A second task proves the stale queue entry does not crash the
        // executor or starve new work.
        exec.spawn_boxed(async move {
            polled_in_task.set(true);
        });

        exec.poll(); // handles the abort and runs the new task
        assert!(polled.get());
    }

    // =========================================================================
    // Refcount behavior
    // =========================================================================

    /// A freshly constructed boxed task reports a reference count of one.
    #[test]
    fn refcount_starts_at_one() {
        let raw = Box::into_raw(Box::new(Task::new_boxed(async {}, 0))).cast::<u8>();

        // SAFETY: `raw` comes straight from `Box::into_raw` above and has
        // not been freed yet.
        let initial = unsafe { task::ref_count(raw) };
        assert_eq!(initial, 1);

        // SAFETY: `raw` is the task allocated above and is released here
        // exactly once.
        unsafe { task::free_task(raw) };
    }

    /// Dropping an executor that still holds pending tasks must not panic.
    #[test]
    fn executor_drop_cleans_up_queued_tasks() {
        let mut exec = test_executor();
        for _ in 0..2 {
            exec.spawn_boxed(std::future::pending::<()>());
        }
        exec.poll(); // poll each of them once

        // Dropping the executor should free all tasks without panicking.
        drop(exec);
    }

    // =========================================================================
    // Dispatch latency (rough, not controlled)
    // =========================================================================

    /// Rough per-poll cost of a single always-pending, self-waking task.
    /// Ignored by default; the result is only printed.
    #[test]
    #[ignore]
    fn dispatch_latency() {
        use std::time::Instant;

        const WARMUP_POLLS: u32 = 10_000;
        const MEASURED_POLLS: u128 = 100_000;

        /// Never completes; re-wakes itself on every poll.
        struct Noop;

        impl Future for Noop {
            type Output = ();

            fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }

        let mut exec = test_executor();
        exec.spawn_boxed(Noop);

        // Warm up before timing.
        for _ in 0..WARMUP_POLLS {
            exec.poll();
        }

        let start = Instant::now();
        for _ in 0..MEASURED_POLLS {
            exec.poll();
        }
        let elapsed = start.elapsed();

        let ns_per = elapsed.as_nanos() / MEASURED_POLLS;
        println!("dispatch: {ns_per} ns/poll (Box-allocated)");
        black_box(ns_per);
    }
}