nexus_async_rt/runtime.rs
1//! Single-threaded async runtime.
2//!
3//! [`Runtime`] owns an [`Executor`](crate::Executor) for spawned tasks, a
4//! boxed root future, and an event-cycle timestamp. The root future is
5//! driven to completion by [`block_on`](Runtime::block_on) or
6//! [`block_on_busy`](Runtime::block_on_busy).
7//!
8//! Two spawn strategies:
9//! - **`spawn_boxed()`** — Box-allocated. Default. No setup needed.
10//! - **`spawn_slab()`** — Slab-allocated. Zero-alloc hot path.
11//! Requires slab configured via [`RuntimeBuilder::slab`].
12//!
13//! # Thread-local spawn
14//!
15//! [`spawn`] and [`spawn_slab`] are free functions that push tasks into
16//! the current runtime via thread-local pointers set during `block_on`.
17//! Calling them outside `block_on` panics.
18
19use std::cell::Cell;
20use std::future::Future;
21use std::marker::PhantomData;
22use std::pin::Pin;
23use std::task::{Context, Poll, Wake, Waker};
24use std::time::{Duration, Instant};
25
26use crate::io::IoDriver;
27use crate::task::JoinHandle;
28use crate::timer::TimerDriver;
29use crate::{Executor, WorldCtx};
30
31/// Default number of loop iterations between non-blocking IO polls.
32/// Matches tokio's heuristic (61, originally from Go's scheduler).
33const DEFAULT_EVENT_INTERVAL: u32 = 61;
34
35// =============================================================================
36// Thread-local spawn context
37// =============================================================================
38
39thread_local! {
40 /// Raw pointer to the active runtime's executor.
41 /// Set on `block_on` entry, cleared on exit.
42 static CURRENT: Cell<*mut Executor> = const { Cell::new(std::ptr::null_mut()) };
43}
44
45/// Spawn a Box-allocated task into the current runtime.
46///
47/// Returns a [`JoinHandle`] that can be awaited for the task's output.
48/// Drop the handle to detach the task.
49///
50/// Must be called from within [`Runtime::block_on`] or
51/// [`Runtime::block_on_busy`]. Panics otherwise.
52///
53/// # Panics
54///
55/// - If called outside a runtime context.
56pub fn spawn_boxed<F>(future: F) -> JoinHandle<F::Output>
57where
58 F: Future + 'static,
59 F::Output: 'static,
60{
61 CURRENT.with(|cell| {
62 let ptr = cell.get();
63 assert!(
64 !ptr.is_null(),
65 "spawn_boxed() called outside of Runtime::block_on"
66 );
67 // SAFETY: pointer valid for duration of block_on. Single-threaded.
68 let executor = unsafe { &mut *ptr };
69 executor.spawn_boxed(future)
70 })
71}
72
73/// Spawn a slab-allocated task into the current runtime.
74///
75/// Returns a [`JoinHandle`] that can be awaited for the task's output.
76/// Zero allocation — the task is placed directly into a pre-allocated
77/// slab slot via TLS.
78///
79/// # Panics
80///
81/// - If called outside a runtime context.
82/// - If no slab is configured.
83/// - If the slab is full (bounded slab).
84/// - If the task future exceeds the slab's slot capacity.
85pub fn spawn_slab<F>(future: F) -> JoinHandle<F::Output>
86where
87 F: Future + 'static,
88 F::Output: 'static,
89{
90 CURRENT.with(|cell| {
91 let ptr = cell.get();
92 assert!(
93 !ptr.is_null(),
94 "spawn_slab() called outside of Runtime::block_on"
95 );
96 let executor = unsafe { &mut *ptr };
97 let tracker_key = executor.next_tracker_key();
98 let task_ptr = crate::alloc::slab_spawn(future, tracker_key);
99 executor.spawn_raw(task_ptr);
100 JoinHandle::new(task_ptr)
101 })
102}
103
104/// Access the current executor via TLS. Panics if outside `block_on`.
105pub(crate) fn with_executor<R>(f: impl FnOnce(&mut Executor) -> R) -> R {
106 CURRENT.with(|cell| {
107 let ptr = cell.get();
108 assert!(!ptr.is_null(), "called outside of Runtime::block_on");
109 let executor = unsafe { &mut *ptr };
110 f(executor)
111 })
112}
113
114/// Try to reserve a slab slot. Returns `None` if the slab is full.
115///
116/// Call `.spawn(future)` on the returned [`SlabClaim`](crate::alloc::SlabClaim)
117/// to write a task and enqueue it. If dropped without spawning, the
118/// slot is returned to the freelist automatically.
119///
120/// # Panics
121///
122/// - If called outside a runtime context.
123/// - If no slab is configured.
124pub fn try_claim_slab() -> Option<crate::alloc::SlabClaim> {
125 CURRENT.with(|cell| {
126 assert!(
127 !cell.get().is_null(),
128 "try_claim_slab() called outside of Runtime::block_on"
129 );
130 });
131 crate::alloc::try_claim()
132}
133
134/// Reserve a slab slot. Panics if full or no slab configured.
135///
136/// Call `.spawn(future)` on the returned [`SlabClaim`](crate::alloc::SlabClaim)
137/// to write a task and enqueue it. If dropped without spawning, the
138/// slot is returned to the freelist automatically.
139///
140/// # Panics
141///
142/// - If called outside a runtime context.
143/// - If no slab is configured.
144/// - If the slab is full (bounded slab).
145pub fn claim_slab() -> crate::alloc::SlabClaim {
146 CURRENT.with(|cell| {
147 assert!(
148 !cell.get().is_null(),
149 "claim_slab() called outside of Runtime::block_on"
150 );
151 });
152 crate::alloc::claim()
153}
154
155// =============================================================================
156// Runtime
157// =============================================================================
158
159/// Single-threaded async runtime.
160///
161/// `Runtime` is intrinsically thread-bound — its slab TLS state is
162/// per-thread, so moving it to another thread would silently
163/// desynchronize allocation dispatch. The type is therefore both
164/// `!Send` and `!Sync`, enforced by a `PhantomData<*const ()>` marker.
165///
166/// ```compile_fail
167/// use nexus_async_rt::Runtime;
168/// fn assert_send<T: Send>() {}
169/// assert_send::<Runtime>();
170/// ```
171///
172/// ```compile_fail
173/// use nexus_async_rt::Runtime;
174/// fn assert_sync<T: Sync>() {}
175/// assert_sync::<Runtime>();
176/// ```
177///
178/// # Examples
179///
180/// ```ignore
181/// use nexus_async_rt::{Runtime, spawn_boxed, spawn_slab};
182/// use nexus_slab::byte::unbounded::Slab;
183/// use nexus_rt::WorldBuilder;
184///
185/// let mut world = WorldBuilder::new().build();
186///
187/// // Simple — Box-allocated tasks
188/// let mut rt = Runtime::new(&mut world);
189/// rt.block_on(async {
190/// spawn_boxed(async { /* Box-allocated */ });
191/// });
192///
193/// // With slab for hot-path tasks
194/// let slab = unsafe { Slab::<256>::with_chunk_capacity(64) };
195/// let mut rt = Runtime::builder(&mut world)
196/// .slab_unbounded(slab)
197/// .build();
198/// rt.block_on(async {
199/// spawn_boxed(async { /* Box-allocated */ });
200/// spawn_slab(async { /* slab-allocated */ });
201/// });
202/// ```
203//
204// `#[repr(C)]` is required for the `offset_of` assertion below to be
205// sound. Under `repr(Rust)` (the default), the compiler is free to
206// reorder fields for layout optimization, which would let an accidental
207// declaration-order swap silently re-introduce BUG-1 (#167) while the
208// offset comparison still happened to pass. `#[repr(C)]` guarantees
209// field offsets follow declaration order modulo alignment padding,
210// making the assertion enforce what it claims.
211//
212// This is NOT for FFI — `Runtime` has no foreign caller. It's purely
213// to back the BUG-1 invariant with a language-spec guarantee instead
214// of empirical rustc behavior.
215#[repr(C)]
216pub struct Runtime {
217 /// Spawned task storage.
218 ///
219 /// Drops first (declaration order). `Executor::drop` walks
220 /// `all_tasks` and frees any survivors via the slab TLS dispatch
221 /// path, which requires `_slab_guard` to still be alive — see the
222 /// field-order invariant on `_slab_guard`. Surviving tasks may
223 /// also trigger `TaskRef::Drop → dispose_terminal`, which reads
224 /// the runtime's cross-wake context from TLS — see the field-order
225 /// invariant on `_cross_wake_tls_guard` below.
226 executor: Executor,
227
228 /// Clears the runtime's `CURRENT_RUNTIME_CTX` TLS slot on drop.
229 ///
230 /// **MUST drop AFTER `executor`**: when `Executor::drop` walks
231 /// `all_tasks` and frees terminal tasks, any TaskRef::Drop fired
232 /// from cross-thread holders (or local wakers cleaned up during
233 /// teardown) reads `CURRENT_RUNTIME_CTX` via
234 /// `crate::cross_wake::on_owning_executor` to decide whether to
235 /// defer locally or queue cross-thread. If this guard drops before
236 /// `executor`, the on-thread fast path silently misroutes terminal
237 /// frees to the cross-queue. The `const _: ()` block below this
238 /// struct enforces the ordering at compile time.
239 ///
240 /// **FAILURE MODE: silent UAF in production for slab tasks.** If
241 /// this guard drops before `executor`, the on-thread fast path in
242 /// `dispose_terminal::on_owning_executor` silently misroutes
243 /// terminal `TaskRef::Drop` calls to the cross-queue. Nothing
244 /// drains the cross-queue at this point in shutdown. `_slab_guard`
245 /// then releases the slab backing storage. Any off-thread holder
246 /// still pointing into the freed slab memory dereferences a
247 /// dangling pointer. Do NOT modify the field declaration order
248 /// without re-running miri tree-borrows on the full test suite AND
249 /// the BUG-4 unwind regression tests.
250 _cross_wake_tls_guard: crate::cross_wake::RuntimeCrossWakeGuard,
251
252 /// IO driver (mio). Wrapped in `UnsafeCell` because a raw pointer
253 /// is stored in TLS during `block_on`. Task futures access the IO
254 /// driver through TLS (e.g., `TcpStream::poll_read`), while the
255 /// run loop accesses it through `&mut self` (e.g., `poll_io()`).
256 /// Without `UnsafeCell`, `&mut self` would invalidate the TLS
257 /// pointer's provenance — see `Executor` docs for the full
258 /// explanation.
259 io: std::cell::UnsafeCell<IoDriver>,
260
261 /// Timer driver. Same `UnsafeCell` rationale — `Sleep::poll` accesses
262 /// through a stored raw pointer, `run_loop` accesses through `&mut self`.
263 timers: std::cell::UnsafeCell<TimerDriver>,
264
265 /// World access handle.
266 ctx: WorldCtx,
267
268 /// Event-cycle timestamp.
269 event_time: Cell<Instant>,
270
271 /// Graceful shutdown handle.
272 shutdown: crate::ShutdownHandle,
273
274 /// Cross-thread wake context. Shared with cross-thread wakers via Arc.
275 /// Contains the intrusive MPSC inbox + mio::Waker for eventfd.
276 cross_wake: std::sync::Arc<crate::cross_wake::CrossWakeContext>,
277
278 /// Max cross-thread wakes drained per poll cycle.
279 cross_thread_drain_limit: usize,
280
281 /// Loop iterations between non-blocking IO polls.
282 event_interval: u32,
283
284 /// Slab allocator + TLS install. Owned via a single guard so that
285 /// TLS dispatch stays valid for the Runtime's entire lifetime.
286 ///
287 /// **MUST drop AFTER `executor`**: when `Executor::drop` frees
288 /// surviving slab tasks via TLS dispatch, the slab and its install
289 /// must still be alive. Reordering re-introduces BUG-1 (#167) — a
290 /// panic at `Runtime::drop` from surviving slab tasks calling into
291 /// a cleared TLS dispatch path. The `const _: ()` block below this
292 /// struct enforces the ordering at compile time.
293 _slab_guard: Option<crate::alloc::SlabGuard>,
294
295 /// Tracks Runtime presence on the thread. Installed at construction
296 /// (panics if another Runtime is already alive), cleared on drop.
297 /// Declared after `_slab_guard` so the "Runtime alive" flag stays
298 /// set throughout the entire drop sequence — defensive against any
299 /// inner Drop impl trying to construct another Runtime mid-teardown.
300 _runtime_presence: RuntimePresenceGuard,
301
302 /// Marker — `Runtime` is intrinsically thread-bound (per-thread TLS
303 /// state). `*const ()` is `!Send + !Sync`; the `PhantomData`
304 /// propagates that at the type level regardless of other field
305 /// changes. See the `compile_fail` doc-tests on `Runtime`.
306 _not_thread_safe: PhantomData<*const ()>,
307}
308
309// =============================================================================
310// Runtime field ordering — invariants
311// =============================================================================
312//
313// Field declaration order in `struct Runtime` is load-bearing. Each
314// field has a position relative to others enforced by the requirements
315// below. Rust's default layout doesn't guarantee declaration order,
316// but `#[repr(C)]` + the `const _: ()` asserts below catch any
317// reordering at compile time regardless of layout.
318//
319// Order (top → bottom = first → last drop):
320//
321// executor ← drops first; frees tasks
322// _cross_wake_tls_guard ← drops second; clears CURRENT_RUNTIME_CTX TLS
323// io ← UnsafeCell, no Drop concerns
324// timers ← UnsafeCell, no Drop concerns
325// ctx ← WorldCtx, holds raw pointer to user's World
326// event_time ← Cell<Instant>, trivial
327// shutdown ← clone of ShutdownHandle, has Arc inside
328// cross_wake ← Arc<CrossWakeContext>; off-thread holders may
329// still exist; Arc keeps it alive past Runtime drop
330// cross_thread_drain_limit, event_interval ← trivial
331// _slab_guard ← MUST drop AFTER executor (invariant 1)
332// _runtime_presence ← MUST drop AFTER everything (invariant 3)
333// _not_thread_safe ← PhantomData, no Drop
334//
335// Invariants:
336//
337// 1. `_slab_guard` after `executor`
338// Reason: BUG-1 (#167). When `Executor::drop` walks `all_tasks`
339// and encounters slab-allocated survivors, it dispatches their
340// `free_fn` through TLS. The TLS install lives on `_slab_guard`.
341// If `_slab_guard` drops first, slab tasks see the no-slab panic
342// stub.
343// Enforced: `const _:` offset assert below (added pre-PR-1a).
344//
345// 2. `_cross_wake_tls_guard` after `executor`
346// Reason: PR 1a. When `Executor::drop` walks `all_tasks` and
347// triggers `TaskRef::Drop`, the terminal-drop routing in
348// `dispose_terminal` reads `CURRENT_RUNTIME_CTX` to decide
349// on-thread (defer) vs off-thread (queue). If `_cross_wake_tls_guard`
350// drops first, the on-thread fast path silently misroutes terminal
351// frees to the cross-queue, where nothing drains them — leak,
352// OR for slab tasks, eventual UAF when `_slab_guard` releases
353// the slab backing storage.
354// FAILURE MODE: silent UAF in production for slab tasks.
355// Enforced: `const _:` offset assert below (added in PR 1a).
356//
357// 3. `_runtime_presence` after everything else
358// Reason: defensive. Some inner Drop impl might attempt to construct
359// another Runtime mid-teardown. With `_runtime_presence` dropping
360// last, the "Runtime alive" flag remains set, and that nested
361// construction panics rather than silently corrupting TLS.
362// Enforced: convention. No const_assert because there are too many
363// fields to assert against; relies on the doc-block + code review.
364//
365// 4. `cross_wake` outlives off-thread holders implicitly via Arc
366// Reason: cross-thread wakers (channel slots, tokio_compat) hold an
367// Arc<CrossWakeContext>. The Arc keeps the queue alive after Runtime
368// drops. When the LAST off-thread waker drops its Arc, the queue is
369// finally freed. Off-thread `dispose_terminal` calling
370// `try_set_queued + push` on a queue whose Runtime has been dropped
371// is safe: the queue is alive, we push to it, but no consumer
372// drains. The terminal task pointer leaks (memory, not UAF). PR 2
373// §2.3's `ShutdownStats` will surface this as a counter.
374//
375// Adding new fields:
376// - Place trivial fields anywhere; non-trivial fields go to the
377// bottom of the appropriate group.
378// - If your field has a Drop with cross-thread implications, document
379// the invariant here AND add an `offset_of` assert next to the
380// existing ones.
381// - When in doubt, add to the bottom and document.
382
383// BUG-1 (#167) invariant: `_slab_guard` MUST drop after `executor`.
384// Field drop order is declaration order, and offset is a proxy: a
385// later-declared field has a higher offset (modulo alignment padding,
386// which preserves order). If anyone reorders the fields above, this
387// fires at compile time.
388const _: () = assert!(
389 std::mem::offset_of!(Runtime, _slab_guard) > std::mem::offset_of!(Runtime, executor),
390 "BUG-1 (#167) invariant violated: Runtime::_slab_guard MUST be \
391 declared after Runtime::executor so it drops after the executor \
392 frees surviving slab tasks. Restore the declaration order or BUG-1 \
393 reappears as a panic at Runtime::drop."
394);
395
396// PR 1a (TaskRef + dispose_terminal) invariant: `_cross_wake_tls_guard`
397// MUST drop after `executor`. The executor's drop path runs TaskRef::Drop
398// for any cross-thread holder ref that lands during teardown; those
399// drops route through `dispose_terminal` which checks
400// `CURRENT_RUNTIME_CTX` to pick the on-thread (defer) vs off-thread
401// (queue) branch. If this guard clears the TLS before `executor`
402// finishes, the comparison silently fails and on-thread terminal frees
403// get misrouted to the cross-queue (where nothing drains them — leak,
404// or for slab tasks, eventual UAF when the slab backing storage drops
405// behind `_slab_guard`).
406const _: () = assert!(
407 std::mem::offset_of!(Runtime, _cross_wake_tls_guard) > std::mem::offset_of!(Runtime, executor),
408 "PR 1a invariant violated: Runtime::_cross_wake_tls_guard MUST be \
409 declared after Runtime::executor so the runtime's CURRENT_RUNTIME_CTX \
410 TLS stays installed while Executor::drop runs. Reordering \
411 misroutes terminal TaskRef::Drop calls during teardown."
412);
413
414impl Runtime {
415 /// Create a runtime with default settings. Box-allocated tasks only.
416 ///
417 /// For slab allocation or custom configuration, use [`Runtime::builder`].
418 pub fn new(world: &mut nexus_rt::World) -> Self {
419 RuntimeBuilder::new(world).build()
420 }
421
422 /// Create a runtime via the builder pattern.
423 pub fn builder(world: &mut nexus_rt::World) -> RuntimeBuilder<'_> {
424 RuntimeBuilder::new(world)
425 }
426
427 /// Returns a [`ShutdownHandle`](crate::ShutdownHandle) for triggering or observing shutdown.
428 pub fn shutdown_handle(&self) -> crate::ShutdownHandle {
429 self.shutdown.clone()
430 }
431
432 /// Install signal handlers for SIGTERM and SIGINT.
433 pub fn install_signal_handlers(&self) {
434 // SAFETY: single-threaded, called during setup before block_on.
435 crate::shutdown::install_signal_handlers(
436 &self.shutdown.flag_ptr(),
437 &unsafe { &*self.io.get() }.mio_waker(),
438 );
439 }
440
441 /// Number of live spawned tasks.
442 pub fn task_count(&self) -> usize {
443 self.executor.task_count()
444 }
445
446 /// Returns a handle to the abnormal-shutdown counter atomics.
447 /// **Hold the handle past `drop(runtime)`** to inspect final
448 /// values — the counters fire DURING `Executor::drop`, so a
449 /// snapshot taken before drop will show all zeros for the
450 /// shutdown-only counters.
451 ///
452 /// Useful as a signal — if any counter is non-zero, the shutdown
453 /// hit a defensive path that should be investigated.
454 ///
455 /// ```ignore
456 /// let stats_handle = runtime.shutdown_stats();
457 /// drop(runtime);
458 /// let stats = stats_handle.snapshot();
459 /// if stats.aborted_unwinds != 0
460 /// || stats.leaked_box_tasks != 0
461 /// || stats.unbalanced_normal_shutdowns != 0
462 /// || stats.cross_queue_undrained != 0
463 /// {
464 /// // user's own observability — log to wherever they want
465 /// my_logger::warn!("nexus runtime shutdown: {stats:?}");
466 /// }
467 /// ```
468 ///
469 /// Per PR 2's design (CALLOUT 5 of the plan), the runtime emits no
470 /// log events of its own when these counters fire — users own
471 /// their observability stack. The PR 1a `eprintln!` calls in the
472 /// slab-unwinding-abort path remain (the only signal at the
473 /// moment of process abort) but new abnormal paths added in PR 2
474 /// are pure counter increments.
475 ///
476 /// # Counters
477 ///
478 /// See [`ShutdownStats`](crate::ShutdownStats) for what each
479 /// counter signifies, and [`ShutdownStatsAtomics::snapshot`] for
480 /// the read API on the returned handle.
481 pub fn shutdown_stats(&self) -> std::sync::Arc<crate::ShutdownStatsAtomics> {
482 self.executor.shutdown_stats()
483 }
484}
485
486// =============================================================================
487// RuntimeBuilder
488// =============================================================================
489
490/// Type-erased closure that boxes the slab and returns (ownership, TLS config).
491type SlabInstaller = Box<dyn FnOnce() -> (Box<dyn std::any::Any>, crate::alloc::SlabTlsConfig)>;
492
493/// Builder for configuring a [`Runtime`].
494///
495/// # Examples
496///
497/// ```ignore
498/// use nexus_async_rt::*;
499/// use nexus_slab::byte::unbounded::Slab;
500///
501/// let mut world = nexus_rt::WorldBuilder::new().build();
502/// let slab = unsafe { Slab::<256>::with_chunk_capacity(64) };
503///
504/// let mut rt = Runtime::builder(&mut world)
505/// .tasks_per_cycle(128)
506/// .slab_unbounded(slab)
507/// .signal_handlers(true)
508/// .build();
509/// ```
510pub struct RuntimeBuilder<'w> {
511 world: &'w mut nexus_rt::World,
512 tasks_per_cycle: usize,
513 cross_thread_drain_limit: usize,
514 event_interval: u32,
515 queue_capacity: usize,
516 event_capacity: usize,
517 token_capacity: usize,
518 signal_handlers: bool,
519 /// Type-erased slab + guard installer. None = no slab (Box-only).
520 slab_installer: Option<SlabInstaller>,
521}
522
523impl<'w> RuntimeBuilder<'w> {
524 fn new(world: &'w mut nexus_rt::World) -> Self {
525 Self {
526 world,
527 tasks_per_cycle: crate::DEFAULT_TASKS_PER_CYCLE,
528 cross_thread_drain_limit: usize::MAX,
529 event_interval: DEFAULT_EVENT_INTERVAL,
530 queue_capacity: 64,
531 event_capacity: 1024,
532 token_capacity: 64,
533 signal_handlers: false,
534 slab_installer: None,
535 }
536 }
537
538 /// Maximum tasks polled per cycle before yielding to check IO.
539 /// Default: 64.
540 #[must_use]
541 pub fn tasks_per_cycle(mut self, limit: usize) -> Self {
542 self.tasks_per_cycle = limit;
543 self
544 }
545
546 /// Number of loop iterations between non-blocking IO driver polls.
547 /// Default: 61 (matches tokio's heuristic).
548 ///
549 /// Every `event_interval` iterations the runtime does a non-blocking
550 /// `epoll_wait(0)` to check for socket events, even if tasks are
551 /// ready. Lower values improve IO responsiveness at the cost of
552 /// more syscalls; higher values favor task throughput.
553 #[must_use]
554 pub fn event_interval(mut self, n: u32) -> Self {
555 assert!(n > 0, "event_interval must be > 0");
556 self.event_interval = n;
557 self
558 }
559
560 /// Maximum cross-thread wakes drained per poll cycle.
561 /// Default: unlimited.
562 ///
563 /// Caps how many tasks woken from other threads are moved into the
564 /// local ready queue per iteration. Prevents a firehose of
565 /// cross-thread wakes from starving local tasks and IO. Remaining
566 /// wakes are drained on the next iteration.
567 #[must_use]
568 pub fn cross_thread_drain_limit(mut self, limit: usize) -> Self {
569 self.cross_thread_drain_limit = limit;
570 self
571 }
572
573 /// Pre-allocated capacity for internal queues. Default: 64.
574 #[must_use]
575 pub fn queue_capacity(mut self, cap: usize) -> Self {
576 self.queue_capacity = cap;
577 self
578 }
579
580 /// Maximum IO events processed per epoll cycle. Default: 1024.
581 #[must_use]
582 pub fn event_capacity(mut self, cap: usize) -> Self {
583 self.event_capacity = cap;
584 self
585 }
586
587 /// Initial number of IO source slots. Default: 64.
588 #[must_use]
589 pub fn token_capacity(mut self, cap: usize) -> Self {
590 self.token_capacity = cap;
591 self
592 }
593
594 /// Install SIGTERM/SIGINT signal handlers. Default: false.
595 #[must_use]
596 pub fn signal_handlers(mut self, enable: bool) -> Self {
597 self.signal_handlers = enable;
598 self
599 }
600
601 /// Hand off a growable (unbounded) slab for [`spawn_slab`].
602 ///
603 /// `S` is the total slot size in bytes. The task header uses 72 bytes,
604 /// so `Slab<256>` gives 184 bytes for the future. Most async IO
605 /// futures are 128–256 bytes — `Slab<256>` or `Slab<512>` covers
606 /// the common cases.
607 ///
608 /// The slab grows by allocating new chunks when full. No task spawn
609 /// will ever fail due to capacity.
610 ///
611 /// # Examples
612 ///
613 /// ```ignore
614 /// use nexus_slab::byte::unbounded::Slab;
615 ///
616 /// // SAFETY: single-threaded runtime.
617 /// let slab = unsafe { Slab::<256>::with_chunk_capacity(64) };
618 ///
619 /// let mut rt = Runtime::builder(&mut world)
620 /// .slab_unbounded(slab)
621 /// .build();
622 /// ```
623 pub fn slab_unbounded<const S: usize>(
624 mut self,
625 slab: nexus_slab::byte::unbounded::Slab<S>,
626 ) -> Self {
627 const {
628 assert!(
629 S >= 72,
630 "slab slot size must be at least 72 bytes (TASK_HEADER_SIZE)"
631 );
632 }
633 self.slab_installer = Some(Box::new(move || {
634 let mut slab = Box::new(slab);
635 // Derive pointer via &mut to get write provenance. Using &ref
636 // gives read-only provenance under stacked borrows, but the
637 // allocator writes through this pointer.
638 let slab_ptr = std::ptr::from_mut(slab.as_mut()).cast::<u8>();
639 let config = crate::alloc::make_unbounded_config::<S>(slab_ptr);
640 (slab as Box<dyn std::any::Any>, config)
641 }));
642 self
643 }
644
645 /// Hand off a fixed-capacity (bounded) slab for [`spawn_slab`].
646 ///
647 /// `S` is the total slot size in bytes. The slab has a fixed number
648 /// of slots — `spawn_slab` panics if the slab is full. Use this
649 /// when you want deterministic memory usage and know the maximum
650 /// number of concurrent hot-path tasks.
651 ///
652 /// # Examples
653 ///
654 /// ```ignore
655 /// use nexus_slab::byte::bounded::Slab;
656 ///
657 /// // SAFETY: single-threaded runtime.
658 /// let slab = unsafe { Slab::<256>::with_capacity(64) };
659 ///
660 /// let mut rt = Runtime::builder(&mut world)
661 /// .slab_bounded(slab)
662 /// .build();
663 /// ```
664 pub fn slab_bounded<const S: usize>(
665 mut self,
666 slab: nexus_slab::byte::bounded::Slab<S>,
667 ) -> Self {
668 const {
669 assert!(
670 S >= 72,
671 "slab slot size must be at least 72 bytes (TASK_HEADER_SIZE)"
672 );
673 }
674 self.slab_installer = Some(Box::new(move || {
675 let mut slab = Box::new(slab);
676 // Derive pointer via &mut to get write provenance. Using &ref
677 // gives read-only provenance under stacked borrows, but the
678 // allocator writes through this pointer.
679 let slab_ptr = std::ptr::from_mut(slab.as_mut()).cast::<u8>();
680 let config = crate::alloc::make_bounded_config::<S>(slab_ptr);
681 (slab as Box<dyn std::any::Any>, config)
682 }));
683 self
684 }
685
686 /// Build the runtime.
687 pub fn build(self) -> Runtime {
688 // Fail-fast if another Runtime is already alive on this thread.
689 // Done before any resource allocation so we don't leak IoDriver,
690 // mio::Poll, etc. on the panic path.
691 let runtime_presence = RuntimePresenceGuard::install();
692
693 let io = IoDriver::new(self.event_capacity, self.token_capacity)
694 .expect("failed to create mio::Poll");
695 let mut shutdown = crate::ShutdownHandle::new();
696 shutdown.set_mio_waker(io.mio_waker());
697
698 let mut executor = Executor::new(self.queue_capacity);
699 executor.set_tasks_per_cycle(self.tasks_per_cycle);
700
701 let ctx = WorldCtx::new(self.world);
702 let event_time = Cell::new(Instant::now());
703
704 // Create slab if configured and install TLS immediately. The
705 // returned guard owns the slab and the TLS install; it lives
706 // on Runtime so it drops AFTER `executor` (which frees surviving
707 // slab tasks via TLS dispatch). This is the architectural fix
708 // for BUG-1 (#167) — TLS scope now matches Runtime lifetime
709 // instead of run_loop scope.
710 let slab_guard = self.slab_installer.map(|install| {
711 let (slab, config) = install();
712 crate::alloc::install_slab(slab, &config)
713 });
714
715 let cross_wake = std::sync::Arc::new(crate::cross_wake::CrossWakeContext {
716 queue: crate::cross_wake::CrossWakeQueue::new(),
717 mio_waker: io.mio_waker(),
718 parked: std::sync::atomic::AtomicBool::new(false),
719 });
720
721 // Wire the cross-wake context into the executor for the
722 // shutdown-time `cross_queue_undrained` tally (PR 2 §2.3).
723 // Bare Executor use in tests has no Runtime, no cross-wake
724 // context, no tally — we install it here for the Runtime
725 // path only.
726 executor.install_cross_wake_for_drop(std::sync::Arc::clone(&cross_wake));
727
728 // Install the runtime's cross-wake context as the current-thread
729 // owning-executor identity. Lives lifetime-of-Runtime via the
730 // guard field below — `dispose_terminal::on_owning_executor`
731 // reads this slot to decide local-defer vs cross-queue routing
732 // for TaskRef terminal drops.
733 let cross_wake_tls_guard = crate::cross_wake::install_runtime_cross_wake(&cross_wake);
734
735 let rt = Runtime {
736 executor,
737 _cross_wake_tls_guard: cross_wake_tls_guard,
738 io: std::cell::UnsafeCell::new(io),
739 timers: std::cell::UnsafeCell::new(TimerDriver::new(64)),
740 ctx,
741 event_time,
742 shutdown,
743 cross_wake,
744 cross_thread_drain_limit: self.cross_thread_drain_limit,
745 event_interval: self.event_interval,
746 _slab_guard: slab_guard,
747 _runtime_presence: runtime_presence,
748 _not_thread_safe: PhantomData,
749 };
750
751 if self.signal_handlers {
752 rt.install_signal_handlers();
753 }
754
755 rt
756 }
757}
758
759// =============================================================================
760// block_on / run_loop
761// =============================================================================
762
763impl Runtime {
764 /// Drive the root future to completion. CPU-friendly.
765 ///
766 /// Parks the thread when no work is available.
767 pub fn block_on<F>(&mut self, future: F) -> F::Output
768 where
769 F: Future + 'static,
770 {
771 self.run_loop(future, ParkMode::Park)
772 }
773
774 /// Drive the root future to completion. Busy-wait.
775 ///
776 /// Never parks. Minimum wake latency at 100% CPU.
777 pub fn block_on_busy<F>(&mut self, future: F) -> F::Output
778 where
779 F: Future + 'static,
780 {
781 self.run_loop(future, ParkMode::Spin)
782 }
783
784 /// Drive the executor until pending cross-thread work has settled,
785 /// before shutdown. The canonical "quiesce" step before
786 /// `drop(runtime)` — see `docs/SHUTDOWN.md` for the full pattern.
787 ///
788 /// Loops while:
789 /// 1. The cross-thread queue has entries (drains them).
790 /// 2. The local ready queue has entries (polls them).
791 ///
792 /// Returns `Ok(())` once both are empty (or detected to no longer
793 /// receive new entries). Returns `Err(QuiesceTimeout)` if `timeout`
794 /// elapses first; the error contains diagnostic counts useful for
795 /// determining which producer didn't release its refs.
796 ///
797 /// **This is for clean shutdown, not panic-during-shutdown.** The
798 /// 100ms unwinding-wait in `Executor::drop` remains as
799 /// defense-in-depth for the panic case (where this method can't
800 /// be called).
801 ///
802 /// **The `timeout` parameter has no default — callers must pick a
803 /// budget deliberately.** PR 2 §2.4 open-item 4 evaluated `100ms`
804 /// (matches the unwinding defense), `500ms` (forgiving), and
805 /// "parameter-only" — chose parameter-only to force the user to
806 /// pick a budget appropriate for their producer landscape (a
807 /// trading-system shutdown sequence with multiple Aeron drivers
808 /// plus tokio futures plus channel senders has very different
809 /// settling characteristics than a unit test).
810 ///
811 /// # Canonical shutdown sequence
812 ///
813 /// ```ignore
814 /// // 1. Stop producers of cross-thread refs:
815 /// // - Drop tokio runtime (or shutdown_timeout)
816 /// // - Stop Aeron driver thread
817 /// // - Drop external channel senders
818 ///
819 /// // 2. Quiesce.
820 /// runtime.shutdown_quiesce(Duration::from_millis(500))?;
821 ///
822 /// // 3. Drop the Runtime. Outstanding-ref panic paths in
823 /// // Executor::drop should be unreachable in normal operation.
824 /// drop(runtime);
825 /// ```
826 ///
827 /// If step 2 returns `QuiesceTimeout`, a producer hasn't released
828 /// its refs. Investigate before letting Runtime drop — the
829 /// unwind-abort path in `Executor::drop` is defensive, not
830 /// desired.
831 pub fn shutdown_quiesce(&mut self, timeout: Duration) -> Result<(), QuiesceTimeout> {
832 // Install the same TLS context block_on uses, so any cross-thread
833 // wakes that fire during quiesce still find a runtime.
834 let _ctx_guard = crate::context::install(
835 self.ctx.as_ptr(),
836 self.io.get(),
837 self.timers.get(),
838 &raw const self.event_time,
839 std::sync::Arc::as_ptr(&self.shutdown.flag_ptr()),
840 std::ptr::from_ref(&self.shutdown.task_waker),
841 );
842 let _cross_wake_guard = crate::cross_wake::install_cross_wake(&self.cross_wake);
843 let _spawn_guard = RuntimeGuard::enter(&raw mut self.executor);
844 let (ready, deferred) = self.executor.poll_context_ptrs();
845 let _ready_guard = crate::waker::set_poll_context(ready, deferred);
846
847 let cross_queue = &*self.cross_wake;
848 let start = Instant::now();
849
850 loop {
851 // Drain whatever's in the cross-thread queue. The returned
852 // count tells us if anything was pending (a non-consuming
853 // "is empty" check on the Vyukov queue would race the
854 // producer; drain-and-count is the right primitive).
855 let drained_this_pass = self
856 .executor
857 .drain_cross_thread(&cross_queue.queue, self.cross_thread_drain_limit);
858
859 // Poll any ready tasks (drains the local ready queue).
860 self.executor.poll();
861
862 // Quiesced means: `all_tasks` is empty (no tasks with
863 // outstanding refs — completed-but-held tasks would still
864 // fire the abnormal-shutdown branches in `Executor::drop`),
865 // no ready work pending, no cross-queue entries drained
866 // THIS pass. Live or parked tasks that are still in
867 // `all_tasks` count as not-quiesced — they're holding
868 // refs that prevent a clean Runtime drop, even if they're
869 // not making progress.
870 //
871 // Use `outstanding_tasks` (`all_tasks.len()`), NOT
872 // `task_count` (`live_count`). `live_count` decrements
873 // unconditionally on completion; `all_tasks` only loses an
874 // entry when its refcount actually hits zero. A completed
875 // task with a held `JoinHandle` has `live_count -= 1` but
876 // is still in `all_tasks` — quiesce-as-Ok with
877 // `task_count == 0` would mis-claim quiesced and the
878 // user's subsequent drop would fire the
879 // `unbalanced_normal_shutdowns` branch
880 // (PR2-John-review item 2).
881 let has_ready = self.executor.has_ready();
882 let all_tasks_empty = self.executor.outstanding_tasks() == 0;
883 if !has_ready && drained_this_pass == 0 && all_tasks_empty {
884 return Ok(());
885 }
886
887 if start.elapsed() >= timeout {
888 // Final drain to count what's left in the cross-queue
889 // for the diagnostic. Anything after this drain is a
890 // post-timeout race — not counted (and at this point
891 // the user is about to investigate or drop).
892 let remaining_cross_queue = self
893 .executor
894 .drain_cross_thread(&cross_queue.queue, usize::MAX)
895 as u64;
896 return Err(QuiesceTimeout {
897 remaining_cross_queue,
898 remaining_outstanding_refs: self.executor.outstanding_tasks() as u64,
899 elapsed: start.elapsed(),
900 });
901 }
902
903 // Avoid a tight spin on transient "queue popped a stub"
904 // states. yield_now is a hint to the scheduler.
905 std::thread::yield_now();
906 }
907 }
908
909 fn run_loop<F>(&mut self, future: F, mode: ParkMode) -> F::Output
910 where
911 F: Future + 'static,
912 {
913 // Install TLS context.
914 let _ctx_guard = crate::context::install(
915 self.ctx.as_ptr(),
916 self.io.get(),
917 self.timers.get(),
918 &raw const self.event_time,
919 std::sync::Arc::as_ptr(&self.shutdown.flag_ptr()),
920 std::ptr::from_ref(&self.shutdown.task_waker),
921 );
922
923 // Slab TLS is installed at Runtime construction (BUG-1 #167 fix)
924 // and torn down when the Runtime drops — no longer scoped to
925 // run_loop, so nothing to install here.
926
927 // Install cross-thread wake context in TLS.
928 let _cross_wake_guard = crate::cross_wake::install_cross_wake(&self.cross_wake);
929
930 let mut root: Pin<Box<dyn Future<Output = F::Output>>> = Box::pin(future);
931
932 let woken = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(true));
933 let root_waker = Waker::from(std::sync::Arc::new(RootWake {
934 woken: std::sync::Arc::clone(&woken),
935 // SAFETY: single-threaded, called during block_on setup.
936 mio_waker: unsafe { &*self.io.get() }.mio_waker(),
937 }));
938 let mut root_cx = Context::from_waker(&root_waker);
939
940 // Install spawn TLS.
941 let _spawn_guard = RuntimeGuard::enter(&raw mut self.executor);
942
943 // Install waker TLS: ready queue + deferred free list.
944 // Uses UnsafeCell::get() to derive pointers that survive &mut self reborrows.
945 let (ready, deferred) = self.executor.poll_context_ptrs();
946 let _ready_guard = crate::waker::set_poll_context(ready, deferred);
947
948 self.event_time.set(Instant::now());
949
950 // The cross-thread queue uses interior mutability (UnsafeCell)
951 // for the consumer head. pop() takes &self, so a shared ref
952 // from the Arc is sufficient. No unsafe cast needed.
953 let cross_queue = &*self.cross_wake;
954
955 let mut tick: u32 = 0;
956
957 loop {
958 // 1. Poll root future if woken or shutdown requested.
959 if woken.swap(false, std::sync::atomic::Ordering::Acquire)
960 || self.shutdown.is_shutdown()
961 {
962 match root.as_mut().poll(&mut root_cx) {
963 Poll::Ready(output) => return output,
964 Poll::Pending => {}
965 }
966 }
967
968 // 2. Drain cross-thread inbox.
969 self.executor
970 .drain_cross_thread(&cross_queue.queue, self.cross_thread_drain_limit);
971
972 // 3. Poll ready tasks (up to tasks_per_cycle).
973 self.executor.poll();
974
975 // 4. Fire expired timers.
976 // SAFETY: single-threaded runtime, no concurrent access.
977 unsafe { &mut *self.timers.get() }.fire_expired(Instant::now());
978
979 // 4.5. Set parked early (park mode only) so cross-thread
980 // wakers arriving from here on will poke the eventfd.
981 if matches!(mode, ParkMode::Park) {
982 cross_queue
983 .parked
984 .store(true, std::sync::atomic::Ordering::Release);
985 }
986
987 // 5. Drain cross-thread inbox again (wakes during step 3/4).
988 self.executor
989 .drain_cross_thread(&cross_queue.queue, self.cross_thread_drain_limit);
990
991 tick = tick.wrapping_add(1);
992
993 // 6. Periodic non-blocking IO check every event_interval ticks.
994 // Prevents IO starvation under sustained task load.
995 if tick % self.event_interval == 0 {
996 if let Err(e) = unsafe { &mut *self.io.get() }.poll_io(Some(Duration::ZERO)) {
997 assert!(
998 e.kind() == std::io::ErrorKind::Interrupted,
999 "mio::Poll::poll failed: {e}"
1000 );
1001 }
1002 self.event_time.set(Instant::now());
1003 }
1004
1005 // 7. If work remains, loop immediately.
1006 let has_work =
1007 self.executor.has_ready() || woken.load(std::sync::atomic::Ordering::Acquire);
1008
1009 if has_work {
1010 if matches!(mode, ParkMode::Park) {
1011 cross_queue
1012 .parked
1013 .store(false, std::sync::atomic::Ordering::Release);
1014 }
1015 continue;
1016 }
1017
1018 // 8. No work. Spin mode loops; park mode sleeps in epoll.
1019 match mode {
1020 ParkMode::Spin => {
1021 // Non-blocking IO check before spinning again.
1022 if let Err(e) = unsafe { &mut *self.io.get() }.poll_io(Some(Duration::ZERO)) {
1023 assert!(
1024 e.kind() == std::io::ErrorKind::Interrupted,
1025 "mio::Poll::poll failed: {e}"
1026 );
1027 }
1028 self.event_time.set(Instant::now());
1029 }
1030 ParkMode::Park => {
1031 // parked is already true (set at step 4.5).
1032 // Park in epoll_wait until IO, timer, or cross-thread
1033 // eventfd wakes us.
1034 // SAFETY: single-threaded, no concurrent timer access.
1035 let timeout = unsafe { &*self.timers.get() }
1036 .next_deadline()
1037 .map(|d| d.saturating_duration_since(Instant::now()));
1038
1039 if let Err(e) = unsafe { &mut *self.io.get() }.poll_io(timeout) {
1040 assert!(
1041 e.kind() == std::io::ErrorKind::Interrupted,
1042 "mio::Poll::poll failed: {e}"
1043 );
1044 }
1045
1046 cross_queue
1047 .parked
1048 .store(false, std::sync::atomic::Ordering::Release);
1049 self.event_time.set(Instant::now());
1050 }
1051 }
1052 }
1053 }
1054}
1055
1056// =============================================================================
1057// QuiesceTimeout — error type for `Runtime::shutdown_quiesce`
1058// =============================================================================
1059
1060/// Returned by [`Runtime::shutdown_quiesce`] when the timeout elapses
1061/// before the executor reaches a quiesced state.
1062///
1063/// The diagnostic fields help identify which producer didn't release
1064/// its refs:
1065///
1066/// - `remaining_cross_queue`: number of cross-thread queue entries
1067/// still pending at the moment of timeout. Non-zero indicates a
1068/// producer thread is still pushing wakes faster than quiesce can
1069/// drain them, OR a final-drain wake landed after the last drain
1070/// pass — investigate which off-thread producer is still active.
1071/// - `remaining_outstanding_refs`: number of tasks still in
1072/// `Executor::all_tasks` at the moment of timeout. Each represents a
1073/// task with outstanding cross-thread refs (or a held JoinHandle).
1074/// - `elapsed`: how long quiesce ran before timing out.
1075///
1076/// PR 2 §2.4 open-item 5 noted that finer-grained diagnostics
1077/// ("which task ID had the outstanding ref") could be added if
1078/// implementation revealed them as cheap to surface. The implementation
1079/// uses `Executor::task_count()` which doesn't enumerate tasks; adding
1080/// per-task data here would require new accessors. Out of scope for
1081/// initial PR 2; future enhancement.
1082#[derive(Debug)]
1083pub struct QuiesceTimeout {
1084 /// Number of cross-thread queue entries still pending at timeout.
1085 /// Non-zero means a producer is racing the drain loop.
1086 pub remaining_cross_queue: u64,
1087 /// Number of tasks still alive at the moment of timeout. Each is
1088 /// a candidate for "producer hasn't released its refs."
1089 pub remaining_outstanding_refs: u64,
1090 /// Time elapsed inside `shutdown_quiesce` before returning timeout.
1091 /// Approximately equal to the input `timeout`.
1092 pub elapsed: Duration,
1093}
1094
1095impl std::fmt::Display for QuiesceTimeout {
1096 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1097 write!(
1098 f,
1099 "Runtime::shutdown_quiesce timed out after {:?} with {} outstanding tasks, \
1100 {} cross-queue entries pending",
1101 self.elapsed, self.remaining_outstanding_refs, self.remaining_cross_queue
1102 )
1103 }
1104}
1105
1106impl std::error::Error for QuiesceTimeout {}
1107
1108// =============================================================================
1109// Park mode
1110// =============================================================================
1111
1112#[derive(Clone, Copy)]
1113enum ParkMode {
1114 Park,
1115 Spin,
1116}
1117
1118// =============================================================================
1119// Root future waker
1120// =============================================================================
1121
1122struct RootWake {
1123 woken: std::sync::Arc<std::sync::atomic::AtomicBool>,
1124 mio_waker: std::sync::Arc<mio::Waker>,
1125}
1126
1127impl Wake for RootWake {
1128 fn wake(self: std::sync::Arc<Self>) {
1129 self.wake_by_ref();
1130 }
1131
1132 fn wake_by_ref(self: &std::sync::Arc<Self>) {
1133 let was_woken = self.woken.swap(true, std::sync::atomic::Ordering::Release);
1134 if !was_woken {
1135 let _ = self.mio_waker.wake();
1136 }
1137 }
1138}
1139
1140// =============================================================================
1141// RAII guard for spawn TLS
1142// =============================================================================
1143
1144struct RuntimeGuard {
1145 prev: *mut Executor,
1146}
1147
1148impl RuntimeGuard {
1149 fn enter(executor: *mut Executor) -> Self {
1150 let prev = CURRENT.with(|cell| cell.replace(executor));
1151 Self { prev }
1152 }
1153}
1154
1155impl Drop for RuntimeGuard {
1156 fn drop(&mut self) {
1157 CURRENT.with(|cell| cell.set(self.prev));
1158 }
1159}
1160
1161// =============================================================================
1162// RAII guard for Runtime presence on this thread
1163// =============================================================================
1164//
1165// Enforces "at most one Runtime alive per thread" at construction time. This
1166// is the right scope because:
1167//
1168// - Slab TLS is installed at construction (post BUG-1 fix). A second
1169// construction would silently overwrite the first's slab dispatch state,
1170// corrupting allocator routing for the first Runtime's surviving tasks.
1171// - !Send + !Sync prevents cross-thread coexistence at the type level.
1172// This guard prevents same-thread coexistence at runtime.
1173//
1174// Different from `RuntimeGuard` above: that one is per-`block_on` for spawn
1175// TLS, this one is per-Runtime for existence tracking.
1176
1177thread_local! {
1178 static RUNTIME_PRESENT: Cell<bool> = const { Cell::new(false) };
1179}
1180
1181pub(crate) struct RuntimePresenceGuard;
1182
1183impl RuntimePresenceGuard {
1184 /// Install the Runtime-presence flag. Panics if another Runtime is
1185 /// already alive on this thread.
1186 fn install() -> Self {
1187 assert!(
1188 !RUNTIME_PRESENT.with(Cell::get),
1189 "nexus-async-rt: another Runtime is already alive on this \
1190 thread. Only one Runtime is supported per thread because \
1191 thread-local state (slab dispatch, IO/timer drivers, \
1192 cross-thread wake context) cannot be shared between \
1193 Runtimes. Drop the existing Runtime first."
1194 );
1195 RUNTIME_PRESENT.with(|c| c.set(true));
1196 Self
1197 }
1198}
1199
1200impl Drop for RuntimePresenceGuard {
1201 fn drop(&mut self) {
1202 RUNTIME_PRESENT.with(|c| c.set(false));
1203 }
1204}
1205
1206// =============================================================================
1207// Tests
1208// =============================================================================
1209
1210#[cfg(test)]
1211mod tests {
1212 use super::*;
1213 use nexus_rt::{Handler, IntoHandler, Res, ResMut, WorldBuilder};
1214
1215 nexus_rt::new_resource!(Val(u64));
1216 nexus_rt::new_resource!(Out(u64));
1217
1218 #[test]
1219 fn block_on_returns_value() {
1220 let mut wb = WorldBuilder::new();
1221 wb.register(Val(42));
1222 let mut world = wb.build();
1223
1224 let mut rt = Runtime::new(&mut world);
1225 let result = rt.block_on(async { 42u64 });
1226 assert_eq!(result, 42);
1227 }
1228
1229 #[test]
1230 fn block_on_with_world_access() {
1231 let mut wb = WorldBuilder::new();
1232 wb.register(Val(42));
1233 wb.register(Out(0));
1234 let mut world = wb.build();
1235
1236 let mut rt = Runtime::new(&mut world);
1237
1238 let result = rt.block_on(async move {
1239 crate::WorldCtx::current().with_world(|world| {
1240 let v = world.resource::<Val>().0;
1241 world.resource_mut::<Out>().0 = v + 10;
1242 });
1243 crate::WorldCtx::current().with_world_ref(|world| world.resource::<Out>().0)
1244 });
1245
1246 assert_eq!(result, 52);
1247 }
1248
1249 #[test]
1250 fn block_on_with_pre_resolved_handler() {
1251 let mut wb = WorldBuilder::new();
1252 wb.register(Val(42));
1253 wb.register(Out(0));
1254 let mut world = wb.build();
1255
1256 let mut rt = Runtime::new(&mut world);
1257
1258 let mut h = (|val: Res<Val>, mut out: ResMut<Out>, event: u64| {
1259 out.0 = val.0 + event;
1260 })
1261 .into_handler(world.registry());
1262
1263 let result = rt.block_on(async move {
1264 crate::WorldCtx::current().with_world(|world| h.run(world, 10));
1265 crate::WorldCtx::current().with_world_ref(|world| world.resource::<Out>().0)
1266 });
1267
1268 assert_eq!(result, 52);
1269 }
1270
1271 #[test]
1272 fn spawn_from_root_future() {
1273 let mut wb = WorldBuilder::new();
1274 wb.register(Out(0));
1275 let mut world = wb.build();
1276
1277 let mut rt = Runtime::new(&mut world);
1278
1279 rt.block_on(async move {
1280 for i in 1..=3u64 {
1281 spawn_boxed(async move {
1282 crate::WorldCtx::current().with_world(|world| {
1283 world.resource_mut::<Out>().0 += i;
1284 });
1285 });
1286 }
1287
1288 YieldOnce(false).await;
1289 });
1290
1291 assert_eq!(world.resource::<Out>().0, 6);
1292 }
1293
1294 #[test]
1295 fn block_on_busy_returns_value() {
1296 let mut wb = WorldBuilder::new();
1297 wb.register(Val(7));
1298 let mut world = wb.build();
1299
1300 let mut rt = Runtime::new(&mut world);
1301 let result = rt.block_on_busy(async { 6 * 7 });
1302 assert_eq!(result, 42);
1303 }
1304
1305 #[test]
1306 fn block_on_busy_with_spawned_tasks() {
1307 let mut wb = WorldBuilder::new();
1308 wb.register(Out(0));
1309 let mut world = wb.build();
1310
1311 let mut rt = Runtime::new(&mut world);
1312
1313 rt.block_on_busy(async move {
1314 spawn_boxed(async move {
1315 crate::WorldCtx::current().with_world(|world| {
1316 world.resource_mut::<Out>().0 = 99;
1317 });
1318 });
1319
1320 YieldOnce(false).await;
1321 });
1322
1323 assert_eq!(world.resource::<Out>().0, 99);
1324 }
1325
1326 #[test]
1327 fn event_time_is_set() {
1328 let mut wb = WorldBuilder::new();
1329 wb.register(Val(0));
1330 let mut world = wb.build();
1331
1332 let mut rt = Runtime::new(&mut world);
1333
1334 let before = Instant::now();
1335 rt.block_on(async move {
1336 let t = crate::context::event_time();
1337 assert!(t >= before);
1338 });
1339 }
1340
1341 #[test]
1342 #[should_panic(expected = "spawn_boxed() called outside of Runtime::block_on")]
1343 fn spawn_outside_runtime_panics() {
1344 spawn_boxed(async {});
1345 }
1346
1347 fn test_slab() -> nexus_slab::byte::unbounded::Slab<256> {
1348 // SAFETY: single-threaded test.
1349 unsafe { nexus_slab::byte::unbounded::Slab::with_chunk_capacity(16) }
1350 }
1351
1352 #[test]
1353 #[should_panic(expected = "spawn_slab() called without a slab")]
1354 fn spawn_slab_without_slab_panics() {
1355 let mut wb = WorldBuilder::new();
1356 let mut world = wb.build();
1357 let mut rt = Runtime::new(&mut world);
1358
1359 rt.block_on(async {
1360 spawn_slab(async {});
1361 });
1362 }
1363
1364 #[test]
1365 fn spawn_slab_with_slab() {
1366 let mut wb = WorldBuilder::new();
1367 wb.register(Out(0));
1368 let mut world = wb.build();
1369
1370 let mut rt = Runtime::builder(&mut world)
1371 .slab_unbounded(test_slab())
1372 .build();
1373
1374 rt.block_on(async move {
1375 spawn_slab(async move {
1376 crate::WorldCtx::current().with_world(|world| {
1377 world.resource_mut::<Out>().0 = 77;
1378 });
1379 });
1380
1381 YieldOnce(false).await;
1382 });
1383
1384 assert_eq!(world.resource::<Out>().0, 77);
1385 }
1386
1387 #[test]
1388 fn mixed_spawn_and_spawn_slab() {
1389 let mut wb = WorldBuilder::new();
1390 wb.register(Out(0));
1391 let mut world = wb.build();
1392
1393 let mut rt = Runtime::builder(&mut world)
1394 .slab_unbounded(test_slab())
1395 .build();
1396
1397 rt.block_on(async move {
1398 // Box-allocated
1399 spawn_boxed(async move {
1400 crate::WorldCtx::current().with_world(|world| {
1401 world.resource_mut::<Out>().0 += 10;
1402 });
1403 });
1404 // Slab-allocated
1405 spawn_slab(async move {
1406 crate::WorldCtx::current().with_world(|world| {
1407 world.resource_mut::<Out>().0 += 20;
1408 });
1409 });
1410
1411 YieldOnce(false).await;
1412 });
1413
1414 assert_eq!(world.resource::<Out>().0, 30);
1415 }
1416
1417 // =========================================================================
1418 // Claim API tests
1419 // =========================================================================
1420
1421 #[test]
1422 fn claim_slab_spawn_executes() {
1423 let mut wb = WorldBuilder::new();
1424 wb.register(Out(0));
1425 let mut world = wb.build();
1426
1427 let mut rt = Runtime::builder(&mut world)
1428 .slab_unbounded(test_slab())
1429 .build();
1430
1431 rt.block_on(async move {
1432 let claim = claim_slab();
1433 claim.spawn(async move {
1434 crate::WorldCtx::current().with_world(|world| {
1435 world.resource_mut::<Out>().0 = 55;
1436 });
1437 });
1438
1439 YieldOnce(false).await;
1440 });
1441
1442 assert_eq!(world.resource::<Out>().0, 55);
1443 }
1444
1445 #[test]
1446 fn claim_slab_drop_returns_slot() {
1447 let mut wb = WorldBuilder::new();
1448 let mut world = wb.build();
1449
1450 let bounded = unsafe { nexus_slab::byte::bounded::Slab::<256>::with_capacity(1) };
1451 let mut rt = Runtime::builder(&mut world).slab_bounded(bounded).build();
1452
1453 rt.block_on(async {
1454 // Claim the only slot, then drop without spawning.
1455 let claim = claim_slab();
1456 drop(claim);
1457
1458 // Slot should be back — can claim again.
1459 let claim = claim_slab();
1460 claim.spawn(async {});
1461
1462 YieldOnce(false).await;
1463 });
1464 }
1465
1466 #[test]
1467 fn try_claim_slab_returns_none_when_full() {
1468 let mut wb = WorldBuilder::new();
1469 let mut world = wb.build();
1470
1471 let bounded = unsafe { nexus_slab::byte::bounded::Slab::<256>::with_capacity(1) };
1472 let mut rt = Runtime::builder(&mut world).slab_bounded(bounded).build();
1473
1474 rt.block_on(async {
1475 let _held = claim_slab(); // hold the only slot
1476 assert!(try_claim_slab().is_none());
1477 });
1478 }
1479
1480 #[test]
1481 fn mixed_spawn_boxed_and_claim_slab() {
1482 let mut wb = WorldBuilder::new();
1483 wb.register(Out(0));
1484 let mut world = wb.build();
1485
1486 let mut rt = Runtime::builder(&mut world)
1487 .slab_unbounded(test_slab())
1488 .build();
1489
1490 rt.block_on(async move {
1491 spawn_boxed(async move {
1492 crate::WorldCtx::current().with_world(|world| {
1493 world.resource_mut::<Out>().0 += 10;
1494 });
1495 });
1496
1497 let claim = claim_slab();
1498 claim.spawn(async move {
1499 crate::WorldCtx::current().with_world(|world| {
1500 world.resource_mut::<Out>().0 += 20;
1501 });
1502 });
1503
1504 YieldOnce(false).await;
1505 });
1506
1507 assert_eq!(world.resource::<Out>().0, 30);
1508 }
1509
1510 // =========================================================================
1511 // Timer tests
1512 // =========================================================================
1513
1514 #[test]
1515 fn sleep_completes() {
1516 let mut wb = WorldBuilder::new();
1517 wb.register(Out(0));
1518 let mut world = wb.build();
1519
1520 let mut rt = Runtime::new(&mut world);
1521
1522 let before = Instant::now();
1523 rt.block_on(async move {
1524 crate::context::sleep(Duration::from_millis(50)).await;
1525 });
1526 let elapsed = before.elapsed();
1527
1528 assert!(
1529 elapsed >= Duration::from_millis(40),
1530 "elapsed {elapsed:?} too short"
1531 );
1532 assert!(
1533 elapsed < Duration::from_millis(200),
1534 "elapsed {elapsed:?} too long"
1535 );
1536 }
1537
1538 #[test]
1539 fn sleep_in_spawned_task() {
1540 let mut wb = WorldBuilder::new();
1541 wb.register(Out(0));
1542 let mut world = wb.build();
1543
1544 let mut rt = Runtime::new(&mut world);
1545
1546 let before = Instant::now();
1547 rt.block_on(async move {
1548 spawn_boxed(async move {
1549 crate::context::sleep(Duration::from_millis(50)).await;
1550 crate::WorldCtx::current().with_world(|world| {
1551 world.resource_mut::<Out>().0 = 42;
1552 });
1553 });
1554
1555 crate::context::sleep(Duration::from_millis(100)).await;
1556 });
1557
1558 let elapsed = before.elapsed();
1559 assert!(elapsed >= Duration::from_millis(80));
1560 assert_eq!(world.resource::<Out>().0, 42);
1561 }
1562
1563 #[test]
1564 fn sleep_zero_duration_ready_immediately() {
1565 let mut wb = WorldBuilder::new();
1566 let mut world = wb.build();
1567 let mut rt = Runtime::new(&mut world);
1568
1569 let before = Instant::now();
1570 rt.block_on(async move {
1571 crate::context::sleep(Duration::ZERO).await;
1572 });
1573 assert!(before.elapsed() < Duration::from_millis(10));
1574 }
1575
1576 #[test]
1577 fn sleep_past_deadline_ready_immediately() {
1578 let mut wb = WorldBuilder::new();
1579 let mut world = wb.build();
1580 let mut rt = Runtime::new(&mut world);
1581
1582 let past = Instant::now() - Duration::from_secs(1);
1583 let before = Instant::now();
1584 rt.block_on(async move {
1585 crate::context::sleep_until(past).await;
1586 });
1587 assert!(before.elapsed() < Duration::from_millis(10));
1588 }
1589
1590 // =========================================================================
1591 // Timeout tests
1592 // =========================================================================
1593
1594 #[test]
1595 fn timeout_completes_before_deadline() {
1596 let mut wb = WorldBuilder::new();
1597 let mut world = wb.build();
1598 let mut rt = Runtime::new(&mut world);
1599
1600 let result = rt.block_on(async {
1601 crate::context::timeout(Duration::from_millis(500), async { 42u64 }).await
1602 });
1603
1604 assert_eq!(result.unwrap(), 42);
1605 }
1606
1607 #[test]
1608 fn timeout_expires() {
1609 let mut wb = WorldBuilder::new();
1610 let mut world = wb.build();
1611 let mut rt = Runtime::new(&mut world);
1612
1613 let result = rt.block_on(async {
1614 crate::context::timeout(
1615 Duration::from_millis(10),
1616 crate::context::sleep(Duration::from_secs(10)),
1617 )
1618 .await
1619 });
1620
1621 assert!(result.is_err());
1622 }
1623
1624 // =========================================================================
1625 // Interval tests
1626 // =========================================================================
1627
1628 #[test]
1629 fn interval_ticks() {
1630 let mut wb = WorldBuilder::new();
1631 wb.register(Out(0));
1632 let mut world = wb.build();
1633 let mut rt = Runtime::new(&mut world);
1634
1635 let before = Instant::now();
1636 rt.block_on(async move {
1637 let mut iv = crate::context::interval(Duration::from_millis(20));
1638 iv.tick().await; // ~20ms
1639 iv.tick().await; // ~40ms
1640 iv.tick().await; // ~60ms
1641 });
1642 let elapsed = before.elapsed();
1643
1644 assert!(
1645 elapsed >= Duration::from_millis(50),
1646 "too fast: {elapsed:?}"
1647 );
1648 assert!(
1649 elapsed < Duration::from_millis(200),
1650 "too slow: {elapsed:?}"
1651 );
1652 }
1653
1654 // =========================================================================
1655 // yield_now tests
1656 // =========================================================================
1657
1658 #[test]
1659 fn yield_now_lets_other_tasks_run() {
1660 let mut wb = WorldBuilder::new();
1661 wb.register(Out(0));
1662 let mut world = wb.build();
1663 let mut rt = Runtime::new(&mut world);
1664
1665 rt.block_on(async move {
1666 spawn_boxed(async move {
1667 crate::WorldCtx::current().with_world(|world| {
1668 world.resource_mut::<Out>().0 = 99;
1669 });
1670 });
1671
1672 // Yield so the spawned task gets a turn.
1673 crate::context::yield_now().await;
1674
1675 let val = crate::WorldCtx::current().with_world_ref(|world| world.resource::<Out>().0);
1676 assert_eq!(val, 99);
1677 });
1678 }
1679
1680 // =========================================================================
1681 // Test helpers
1682 // =========================================================================
1683
1684 struct YieldOnce(bool);
1685
1686 impl Future for YieldOnce {
1687 type Output = ();
1688 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
1689 if self.0 {
1690 Poll::Ready(())
1691 } else {
1692 self.0 = true;
1693 cx.waker().wake_by_ref();
1694 Poll::Pending
1695 }
1696 }
1697 }
1698
1699 // =========================================================================
1700 // JoinHandle tests
1701 // =========================================================================
1702
1703 #[test]
1704 fn join_handle_await_gets_value() {
1705 let wb = WorldBuilder::new();
1706 let mut world = wb.build();
1707 let mut rt = Runtime::new(&mut world);
1708
1709 rt.block_on(async {
1710 let handle = spawn_boxed(async { 42u64 });
1711 let result = handle.await;
1712 assert_eq!(result, 42);
1713 });
1714 }
1715
1716 #[test]
1717 fn join_handle_await_string() {
1718 let wb = WorldBuilder::new();
1719 let mut world = wb.build();
1720 let mut rt = Runtime::new(&mut world);
1721
1722 rt.block_on(async {
1723 let handle = spawn_boxed(async { String::from("hello world") });
1724 let result = handle.await;
1725 assert_eq!(result, "hello world");
1726 });
1727 }
1728
1729 #[test]
1730 fn join_handle_detach() {
1731 use std::cell::Cell;
1732 use std::rc::Rc;
1733
1734 let wb = WorldBuilder::new();
1735 let mut world = wb.build();
1736 let mut rt = Runtime::new(&mut world);
1737
1738 let ran = Rc::new(Cell::new(false));
1739 let r = ran.clone();
1740
1741 rt.block_on(async move {
1742 // Spawn and immediately drop handle (detach).
1743 drop(spawn_boxed(async move {
1744 r.set(true);
1745 }));
1746 // Yield to let the spawned task run.
1747 crate::context::yield_now().await;
1748 });
1749
1750 assert!(ran.get());
1751 }
1752
1753 #[test]
1754 fn join_handle_is_finished() {
1755 let wb = WorldBuilder::new();
1756 let mut world = wb.build();
1757 let mut rt = Runtime::new(&mut world);
1758
1759 rt.block_on(async {
1760 let handle = spawn_boxed(async { 1 });
1761 // The task hasn't been polled yet.
1762 assert!(!handle.is_finished());
1763 // Yield to let the task run.
1764 crate::context::yield_now().await;
1765 assert!(handle.is_finished());
1766 let val = handle.await;
1767 assert_eq!(val, 1);
1768 });
1769 }
1770
1771 #[test]
1772 fn join_handle_abort_returns_true() {
1773 let wb = WorldBuilder::new();
1774 let mut world = wb.build();
1775 let mut rt = Runtime::new(&mut world);
1776
1777 rt.block_on(async {
1778 let handle = spawn_boxed(std::future::pending::<()>());
1779 assert!(handle.abort()); // was running, handle consumed
1780 });
1781 }
1782
1783 #[test]
1784 fn join_handle_abort_completed_returns_false() {
1785 let wb = WorldBuilder::new();
1786 let mut world = wb.build();
1787 let mut rt = Runtime::new(&mut world);
1788
1789 rt.block_on(async {
1790 let handle = spawn_boxed(async { 42 });
1791 crate::context::yield_now().await;
1792 assert!(handle.is_finished());
1793 assert!(!handle.abort()); // already done, handle consumed
1794 });
1795 }
1796
1797 #[test]
1798 fn join_handle_drop_after_completion_drops_output() {
1799 use std::cell::Cell;
1800 use std::rc::Rc;
1801
1802 let wb = WorldBuilder::new();
1803 let mut world = wb.build();
1804 let mut rt = Runtime::new(&mut world);
1805
1806 let drop_count = Rc::new(Cell::new(0u32));
1807 let dc = drop_count.clone();
1808
1809 struct DropCounter(Rc<Cell<u32>>);
1810 impl Drop for DropCounter {
1811 fn drop(&mut self) {
1812 self.0.set(self.0.get() + 1);
1813 }
1814 }
1815
1816 rt.block_on(async move {
1817 let handle = spawn_boxed(async move { DropCounter(dc) });
1818 // Let it complete.
1819 crate::context::yield_now().await;
1820 assert!(handle.is_finished());
1821 // Drop handle without reading — output should be dropped.
1822 drop(handle);
1823 });
1824
1825 assert_eq!(drop_count.get(), 1, "output should be dropped exactly once");
1826 }
1827
1828 #[test]
1829 fn join_handle_multiple_concurrent() {
1830 let wb = WorldBuilder::new();
1831 let mut world = wb.build();
1832 let mut rt = Runtime::new(&mut world);
1833
1834 rt.block_on(async {
1835 let h1 = spawn_boxed(async { 10u64 });
1836 let h2 = spawn_boxed(async { 20u64 });
1837 let h3 = spawn_boxed(async { 30u64 });
1838
1839 let r3 = h3.await;
1840 let r1 = h1.await;
1841 let r2 = h2.await;
1842
1843 assert_eq!(r1, 10);
1844 assert_eq!(r2, 20);
1845 assert_eq!(r3, 30);
1846 });
1847 }
1848
1849 #[test]
1850 fn join_handle_output_larger_than_future() {
1851 let wb = WorldBuilder::new();
1852 let mut world = wb.build();
1853 let mut rt = Runtime::new(&mut world);
1854
1855 rt.block_on(async {
1856 // The future is tiny, the output is large.
1857 let handle = spawn_boxed(async { [42u64; 32] });
1858 let result = handle.await;
1859 assert_eq!(result[0], 42);
1860 assert_eq!(result[31], 42);
1861 });
1862 }
1863}