Skip to main content

nexus_async_rt/
runtime.rs

1//! Single-threaded async runtime.
2//!
3//! [`Runtime`] owns an [`Executor`](crate::Executor) for spawned tasks, a
4//! boxed root future, and an event-cycle timestamp. The root future is
5//! driven to completion by [`block_on`](Runtime::block_on) or
6//! [`block_on_busy`](Runtime::block_on_busy).
7//!
8//! Two spawn strategies:
9//! - **`spawn_boxed()`** — Box-allocated. Default. No setup needed.
//! - **`spawn_slab()`** — Slab-allocated. Zero-alloc hot path.
//!   Requires a slab configured via [`RuntimeBuilder::slab_unbounded`]
//!   or [`RuntimeBuilder::slab_bounded`].
12//!
13//! # Thread-local spawn
14//!
//! [`spawn_boxed`] and [`spawn_slab`] are free functions that push tasks into
16//! the current runtime via thread-local pointers set during `block_on`.
17//! Calling them outside `block_on` panics.
18
19use std::cell::Cell;
20use std::future::Future;
21use std::marker::PhantomData;
22use std::pin::Pin;
23use std::task::{Context, Poll, Wake, Waker};
24use std::time::{Duration, Instant};
25
26use crate::io::IoDriver;
27use crate::task::JoinHandle;
28use crate::timer::TimerDriver;
29use crate::{Executor, WorldCtx};
30
/// Default number of loop iterations between non-blocking IO polls.
/// Matches tokio's heuristic (61, originally from Go's scheduler).
///
/// Used as the initial `event_interval` of a [`RuntimeBuilder`]; override
/// it per runtime with [`RuntimeBuilder::event_interval`].
const DEFAULT_EVENT_INTERVAL: u32 = 61;
34
35// =============================================================================
36// Thread-local spawn context
37// =============================================================================
38
thread_local! {
    /// Raw pointer to the active runtime's executor.
    /// Set on `block_on` entry, cleared on exit.
    ///
    /// Starts out null. The free functions in this module treat a null
    /// pointer as "not inside `block_on`" and panic with a message naming
    /// the offending call.
    static CURRENT: Cell<*mut Executor> = const { Cell::new(std::ptr::null_mut()) };
}
44
45/// Spawn a Box-allocated task into the current runtime.
46///
47/// Returns a [`JoinHandle`] that can be awaited for the task's output.
48/// Drop the handle to detach the task.
49///
50/// Must be called from within [`Runtime::block_on`] or
51/// [`Runtime::block_on_busy`]. Panics otherwise.
52///
53/// # Panics
54///
55/// - If called outside a runtime context.
56pub fn spawn_boxed<F>(future: F) -> JoinHandle<F::Output>
57where
58    F: Future + 'static,
59    F::Output: 'static,
60{
61    CURRENT.with(|cell| {
62        let ptr = cell.get();
63        assert!(
64            !ptr.is_null(),
65            "spawn_boxed() called outside of Runtime::block_on"
66        );
67        // SAFETY: pointer valid for duration of block_on. Single-threaded.
68        let executor = unsafe { &mut *ptr };
69        executor.spawn_boxed(future)
70    })
71}
72
73/// Spawn a slab-allocated task into the current runtime.
74///
75/// Returns a [`JoinHandle`] that can be awaited for the task's output.
76/// Zero allocation — the task is placed directly into a pre-allocated
77/// slab slot via TLS.
78///
79/// # Panics
80///
81/// - If called outside a runtime context.
82/// - If no slab is configured.
83/// - If the slab is full (bounded slab).
84/// - If the task future exceeds the slab's slot capacity.
85pub fn spawn_slab<F>(future: F) -> JoinHandle<F::Output>
86where
87    F: Future + 'static,
88    F::Output: 'static,
89{
90    CURRENT.with(|cell| {
91        let ptr = cell.get();
92        assert!(
93            !ptr.is_null(),
94            "spawn_slab() called outside of Runtime::block_on"
95        );
96        let executor = unsafe { &mut *ptr };
97        let tracker_key = executor.next_tracker_key();
98        let task_ptr = crate::alloc::slab_spawn(future, tracker_key);
99        executor.spawn_raw(task_ptr);
100        JoinHandle::new(task_ptr)
101    })
102}
103
104/// Access the current executor via TLS. Panics if outside `block_on`.
105pub(crate) fn with_executor<R>(f: impl FnOnce(&mut Executor) -> R) -> R {
106    CURRENT.with(|cell| {
107        let ptr = cell.get();
108        assert!(!ptr.is_null(), "called outside of Runtime::block_on");
109        let executor = unsafe { &mut *ptr };
110        f(executor)
111    })
112}
113
114/// Try to reserve a slab slot. Returns `None` if the slab is full.
115///
116/// Call `.spawn(future)` on the returned [`SlabClaim`](crate::alloc::SlabClaim)
117/// to write a task and enqueue it. If dropped without spawning, the
118/// slot is returned to the freelist automatically.
119///
120/// # Panics
121///
122/// - If called outside a runtime context.
123/// - If no slab is configured.
124pub fn try_claim_slab() -> Option<crate::alloc::SlabClaim> {
125    CURRENT.with(|cell| {
126        assert!(
127            !cell.get().is_null(),
128            "try_claim_slab() called outside of Runtime::block_on"
129        );
130    });
131    crate::alloc::try_claim()
132}
133
134/// Reserve a slab slot. Panics if full or no slab configured.
135///
136/// Call `.spawn(future)` on the returned [`SlabClaim`](crate::alloc::SlabClaim)
137/// to write a task and enqueue it. If dropped without spawning, the
138/// slot is returned to the freelist automatically.
139///
140/// # Panics
141///
142/// - If called outside a runtime context.
143/// - If no slab is configured.
144/// - If the slab is full (bounded slab).
145pub fn claim_slab() -> crate::alloc::SlabClaim {
146    CURRENT.with(|cell| {
147        assert!(
148            !cell.get().is_null(),
149            "claim_slab() called outside of Runtime::block_on"
150        );
151    });
152    crate::alloc::claim()
153}
154
155// =============================================================================
156// Runtime
157// =============================================================================
158
/// Single-threaded async runtime.
///
/// `Runtime` is intrinsically thread-bound — its slab TLS state is
/// per-thread, so moving it to another thread would silently
/// desynchronize allocation dispatch. The type is therefore both
/// `!Send` and `!Sync`, enforced by a `PhantomData<*const ()>` marker.
///
/// ```compile_fail
/// use nexus_async_rt::Runtime;
/// fn assert_send<T: Send>() {}
/// assert_send::<Runtime>();
/// ```
///
/// ```compile_fail
/// use nexus_async_rt::Runtime;
/// fn assert_sync<T: Sync>() {}
/// assert_sync::<Runtime>();
/// ```
///
/// # Examples
///
/// ```ignore
/// use nexus_async_rt::{Runtime, spawn_boxed, spawn_slab};
/// use nexus_slab::byte::unbounded::Slab;
/// use nexus_rt::WorldBuilder;
///
/// let mut world = WorldBuilder::new().build();
///
/// // Simple — Box-allocated tasks
/// let mut rt = Runtime::new(&mut world);
/// rt.block_on(async {
///     spawn_boxed(async { /* Box-allocated */ });
/// });
///
/// // With slab for hot-path tasks
/// let slab = unsafe { Slab::<256>::with_chunk_capacity(64) };
/// let mut rt = Runtime::builder(&mut world)
///     .slab_unbounded(slab)
///     .build();
/// rt.block_on(async {
///     spawn_boxed(async { /* Box-allocated */ });
///     spawn_slab(async { /* slab-allocated */ });
/// });
/// ```
//
// `#[repr(C)]` is required for the `offset_of` assertions below to be
// sound. Under `repr(Rust)` (the default), the compiler is free to
// reorder fields for layout optimization, which would let an accidental
// declaration-order swap silently re-introduce BUG-1 (#167) while the
// offset comparison still happened to pass. `#[repr(C)]` guarantees
// field offsets follow declaration order modulo alignment padding,
// making the assertions enforce what they claim.
//
// This is NOT for FFI — `Runtime` has no foreign caller. It's purely
// to back the field-order invariants with a language-spec guarantee
// instead of empirical rustc behavior.
#[repr(C)]
pub struct Runtime {
    /// Spawned task storage.
    ///
    /// Drops first (declaration order). `Executor::drop` walks
    /// `all_tasks` and frees any survivors via the slab TLS dispatch
    /// path, which requires `_slab_guard` to still be alive — see the
    /// field-order invariant on `_slab_guard`. Surviving tasks may
    /// also trigger `TaskRef::Drop → dispose_terminal`, which reads
    /// the runtime's cross-wake context from TLS — see the field-order
    /// invariant on `_cross_wake_tls_guard` below.
    executor: Executor,

    /// Clears the runtime's `CURRENT_RUNTIME_CTX` TLS slot on drop.
    ///
    /// **MUST drop AFTER `executor`**: when `Executor::drop` walks
    /// `all_tasks` and frees terminal tasks, any TaskRef::Drop fired
    /// from cross-thread holders (or local wakers cleaned up during
    /// teardown) reads `CURRENT_RUNTIME_CTX` via
    /// `crate::cross_wake::on_owning_executor` to decide whether to
    /// defer locally or queue cross-thread. If this guard drops before
    /// `executor`, the on-thread fast path silently misroutes terminal
    /// frees to the cross-queue. The `const _: ()` block below this
    /// struct enforces the ordering at compile time.
    ///
    /// **FAILURE MODE: silent UAF in production for slab tasks.** If
    /// this guard drops before `executor`, the on-thread fast path in
    /// `dispose_terminal::on_owning_executor` silently misroutes
    /// terminal `TaskRef::Drop` calls to the cross-queue. Nothing
    /// drains the cross-queue at this point in shutdown. `_slab_guard`
    /// then releases the slab backing storage. Any off-thread holder
    /// still pointing into the freed slab memory dereferences a
    /// dangling pointer. Do NOT modify the field declaration order
    /// without re-running miri tree-borrows on the full test suite AND
    /// the BUG-4 unwind regression tests.
    _cross_wake_tls_guard: crate::cross_wake::RuntimeCrossWakeGuard,

    /// IO driver (mio). Wrapped in `UnsafeCell` because a raw pointer
    /// is stored in TLS during `block_on`. Task futures access the IO
    /// driver through TLS (e.g., `TcpStream::poll_read`), while the
    /// run loop accesses it through `&mut self` (e.g., `poll_io()`).
    /// Without `UnsafeCell`, `&mut self` would invalidate the TLS
    /// pointer's provenance — see `Executor` docs for the full
    /// explanation.
    io: std::cell::UnsafeCell<IoDriver>,

    /// Timer driver. Same `UnsafeCell` rationale — `Sleep::poll` accesses
    /// through a stored raw pointer, `run_loop` accesses through `&mut self`.
    timers: std::cell::UnsafeCell<TimerDriver>,

    /// World access handle, created from the `&mut World` handed to the
    /// builder.
    ctx: WorldCtx,

    /// Event-cycle timestamp. Initialised to `Instant::now()` when the
    /// runtime is built.
    event_time: Cell<Instant>,

    /// Graceful shutdown handle. Clones are handed out by
    /// `Runtime::shutdown_handle`.
    shutdown: crate::ShutdownHandle,

    /// Cross-thread wake context. Shared with cross-thread wakers via Arc.
    /// Contains the intrusive MPSC inbox + mio::Waker for eventfd.
    cross_wake: std::sync::Arc<crate::cross_wake::CrossWakeContext>,

    /// Max cross-thread wakes drained per poll cycle. Set through
    /// `RuntimeBuilder::cross_thread_drain_limit`; the builder default is
    /// `usize::MAX` (unlimited).
    cross_thread_drain_limit: usize,

    /// Loop iterations between non-blocking IO polls. Set through
    /// `RuntimeBuilder::event_interval`; the builder default is
    /// `DEFAULT_EVENT_INTERVAL`.
    event_interval: u32,

    /// Slab allocator + TLS install. Owned via a single guard so that
    /// TLS dispatch stays valid for the Runtime's entire lifetime.
    /// `None` when the runtime was built without a slab (Box-only).
    ///
    /// **MUST drop AFTER `executor`**: when `Executor::drop` frees
    /// surviving slab tasks via TLS dispatch, the slab and its install
    /// must still be alive. Reordering re-introduces BUG-1 (#167) — a
    /// panic at `Runtime::drop` from surviving slab tasks calling into
    /// a cleared TLS dispatch path. The `const _: ()` block below this
    /// struct enforces the ordering at compile time.
    _slab_guard: Option<crate::alloc::SlabGuard>,

    /// Tracks Runtime presence on the thread. Installed at construction
    /// (panics if another Runtime is already alive), cleared on drop.
    /// Declared after `_slab_guard` so the "Runtime alive" flag stays
    /// set throughout the entire drop sequence — defensive against any
    /// inner Drop impl trying to construct another Runtime mid-teardown.
    _runtime_presence: RuntimePresenceGuard,

    /// Marker — `Runtime` is intrinsically thread-bound (per-thread TLS
    /// state). `*const ()` is `!Send + !Sync`; the `PhantomData`
    /// propagates that at the type level regardless of other field
    /// changes. See the `compile_fail` doc-tests on `Runtime`.
    _not_thread_safe: PhantomData<*const ()>,
}
308
309// =============================================================================
310// Runtime field ordering — invariants
311// =============================================================================
312//
// Field declaration order in `struct Runtime` is load-bearing: fields
// drop in declaration order, and the invariants below constrain which
// fields must outlive which. Drop order always follows declaration
// order, but the default `repr(Rust)` memory layout does not, so an
// offset comparison on its own would prove nothing. `#[repr(C)]` pins
// the layout to declaration order, which lets the `const _: ()` asserts
// below act as a compile-time check on drop order.
318//
319// Order (top → bottom = first → last drop):
320//
321//   executor              ← drops first; frees tasks
322//   _cross_wake_tls_guard ← drops second; clears CURRENT_RUNTIME_CTX TLS
323//   io                    ← UnsafeCell, no Drop concerns
324//   timers                ← UnsafeCell, no Drop concerns
325//   ctx                   ← WorldCtx, holds raw pointer to user's World
326//   event_time            ← Cell<Instant>, trivial
327//   shutdown              ← clone of ShutdownHandle, has Arc inside
328//   cross_wake            ← Arc<CrossWakeContext>; off-thread holders may
329//                           still exist; Arc keeps it alive past Runtime drop
330//   cross_thread_drain_limit, event_interval ← trivial
331//   _slab_guard           ← MUST drop AFTER executor (invariant 1)
332//   _runtime_presence     ← MUST drop AFTER everything (invariant 3)
333//   _not_thread_safe      ← PhantomData, no Drop
334//
335// Invariants:
336//
337// 1. `_slab_guard` after `executor`
338//    Reason: BUG-1 (#167). When `Executor::drop` walks `all_tasks`
339//    and encounters slab-allocated survivors, it dispatches their
340//    `free_fn` through TLS. The TLS install lives on `_slab_guard`.
341//    If `_slab_guard` drops first, slab tasks see the no-slab panic
342//    stub.
343//    Enforced: `const _:` offset assert below (added pre-PR-1a).
344//
345// 2. `_cross_wake_tls_guard` after `executor`
346//    Reason: PR 1a. When `Executor::drop` walks `all_tasks` and
347//    triggers `TaskRef::Drop`, the terminal-drop routing in
348//    `dispose_terminal` reads `CURRENT_RUNTIME_CTX` to decide
349//    on-thread (defer) vs off-thread (queue). If `_cross_wake_tls_guard`
350//    drops first, the on-thread fast path silently misroutes terminal
351//    frees to the cross-queue, where nothing drains them — leak,
352//    OR for slab tasks, eventual UAF when `_slab_guard` releases
353//    the slab backing storage.
354//    FAILURE MODE: silent UAF in production for slab tasks.
355//    Enforced: `const _:` offset assert below (added in PR 1a).
356//
357// 3. `_runtime_presence` after everything else
358//    Reason: defensive. Some inner Drop impl might attempt to construct
359//    another Runtime mid-teardown. With `_runtime_presence` dropping
360//    last, the "Runtime alive" flag remains set, and that nested
361//    construction panics rather than silently corrupting TLS.
362//    Enforced: convention. No const_assert because there are too many
363//    fields to assert against; relies on the doc-block + code review.
364//
365// 4. `cross_wake` outlives off-thread holders implicitly via Arc
366//    Reason: cross-thread wakers (channel slots, tokio_compat) hold an
367//    Arc<CrossWakeContext>. The Arc keeps the queue alive after Runtime
368//    drops. When the LAST off-thread waker drops its Arc, the queue is
369//    finally freed. Off-thread `dispose_terminal` calling
370//    `try_set_queued + push` on a queue whose Runtime has been dropped
371//    is safe: the queue is alive, we push to it, but no consumer
372//    drains. The terminal task pointer leaks (memory, not UAF). PR 2
373//    §2.3's `ShutdownStats` will surface this as a counter.
374//
375// Adding new fields:
376// - Place trivial fields anywhere; non-trivial fields go to the
377//   bottom of the appropriate group.
378// - If your field has a Drop with cross-thread implications, document
379//   the invariant here AND add an `offset_of` assert next to the
380//   existing ones.
381// - When in doubt, add to the bottom and document.
382
383// BUG-1 (#167) invariant: `_slab_guard` MUST drop after `executor`.
384// Field drop order is declaration order, and offset is a proxy: a
385// later-declared field has a higher offset (modulo alignment padding,
386// which preserves order). If anyone reorders the fields above, this
387// fires at compile time.
388const _: () = assert!(
389    std::mem::offset_of!(Runtime, _slab_guard) > std::mem::offset_of!(Runtime, executor),
390    "BUG-1 (#167) invariant violated: Runtime::_slab_guard MUST be \
391     declared after Runtime::executor so it drops after the executor \
392     frees surviving slab tasks. Restore the declaration order or BUG-1 \
393     reappears as a panic at Runtime::drop."
394);
395
396// PR 1a (TaskRef + dispose_terminal) invariant: `_cross_wake_tls_guard`
397// MUST drop after `executor`. The executor's drop path runs TaskRef::Drop
398// for any cross-thread holder ref that lands during teardown; those
399// drops route through `dispose_terminal` which checks
400// `CURRENT_RUNTIME_CTX` to pick the on-thread (defer) vs off-thread
401// (queue) branch. If this guard clears the TLS before `executor`
402// finishes, the comparison silently fails and on-thread terminal frees
403// get misrouted to the cross-queue (where nothing drains them — leak,
404// or for slab tasks, eventual UAF when the slab backing storage drops
405// behind `_slab_guard`).
406const _: () = assert!(
407    std::mem::offset_of!(Runtime, _cross_wake_tls_guard) > std::mem::offset_of!(Runtime, executor),
408    "PR 1a invariant violated: Runtime::_cross_wake_tls_guard MUST be \
409     declared after Runtime::executor so the runtime's CURRENT_RUNTIME_CTX \
410     TLS stays installed while Executor::drop runs. Reordering \
411     misroutes terminal TaskRef::Drop calls during teardown."
412);
413
impl Runtime {
    /// Create a runtime with default settings. Box-allocated tasks only.
    ///
    /// Equivalent to `Runtime::builder(world).build()`. For slab
    /// allocation or custom configuration, use [`Runtime::builder`].
    ///
    /// # Panics
    ///
    /// - If another `Runtime` is already alive on this thread.
    /// - If the IO driver (`mio::Poll`) cannot be created.
    pub fn new(world: &mut nexus_rt::World) -> Self {
        RuntimeBuilder::new(world).build()
    }

    /// Create a runtime via the builder pattern.
    ///
    /// The returned builder borrows `world` until `build` is called.
    pub fn builder(world: &mut nexus_rt::World) -> RuntimeBuilder<'_> {
        RuntimeBuilder::new(world)
    }

    /// Returns a [`ShutdownHandle`](crate::ShutdownHandle) for triggering or observing shutdown.
    ///
    /// The handle is a clone of the one held by the runtime.
    pub fn shutdown_handle(&self) -> crate::ShutdownHandle {
        self.shutdown.clone()
    }

    /// Install signal handlers for SIGTERM and SIGINT.
    ///
    /// Passes the shutdown flag pointer and the IO driver's mio waker to
    /// `crate::shutdown::install_signal_handlers`.
    pub fn install_signal_handlers(&self) {
        // SAFETY: single-threaded, called during setup before block_on.
        // `block_on` takes `&mut self`, so safe code cannot reach this
        // `&self` method while the run loop holds the IO driver. Only a
        // temporary shared reference to the `IoDriver` is created here.
        crate::shutdown::install_signal_handlers(
            &self.shutdown.flag_ptr(),
            &unsafe { &*self.io.get() }.mio_waker(),
        );
    }

    /// Number of live spawned tasks, as reported by the executor.
    pub fn task_count(&self) -> usize {
        self.executor.task_count()
    }

    /// Returns a handle to the abnormal-shutdown counter atomics.
    /// **Hold the handle past `drop(runtime)`** to inspect final
    /// values — the counters fire DURING `Executor::drop`, so a
    /// snapshot taken before drop will show all zeros for the
    /// shutdown-only counters.
    ///
    /// Useful as a signal — if any counter is non-zero, the shutdown
    /// hit a defensive path that should be investigated.
    ///
    /// ```ignore
    /// let stats_handle = runtime.shutdown_stats();
    /// drop(runtime);
    /// let stats = stats_handle.snapshot();
    /// if stats.aborted_unwinds != 0
    ///     || stats.leaked_box_tasks != 0
    ///     || stats.unbalanced_normal_shutdowns != 0
    ///     || stats.cross_queue_undrained != 0
    /// {
    ///     // user's own observability — log to wherever they want
    ///     my_logger::warn!("nexus runtime shutdown: {stats:?}");
    /// }
    /// ```
    ///
    /// Per PR 2's design (CALLOUT 5 of the plan), the runtime emits no
    /// log events of its own when these counters fire — users own
    /// their observability stack. The PR 1a `eprintln!` calls in the
    /// slab-unwinding-abort path remain (the only signal at the
    /// moment of process abort) but new abnormal paths added in PR 2
    /// are pure counter increments.
    ///
    /// # Counters
    ///
    /// See [`ShutdownStats`](crate::ShutdownStats) for what each
    /// counter signifies, and
    /// [`ShutdownStatsAtomics::snapshot`](crate::ShutdownStatsAtomics::snapshot)
    /// for the read API on the returned handle.
    pub fn shutdown_stats(&self) -> std::sync::Arc<crate::ShutdownStatsAtomics> {
        self.executor.shutdown_stats()
    }
}
485
486// =============================================================================
487// RuntimeBuilder
488// =============================================================================
489
/// Type-erased closure that boxes the slab and returns (ownership, TLS config).
///
/// Stored on [`RuntimeBuilder`] by `slab_unbounded` / `slab_bounded` and
/// invoked once by `build`. The `Box<dyn Any>` owns the slab without naming
/// its concrete type; both values are handed to `crate::alloc::install_slab`.
type SlabInstaller = Box<dyn FnOnce() -> (Box<dyn std::any::Any>, crate::alloc::SlabTlsConfig)>;
492
/// Builder for configuring a [`Runtime`].
///
/// Obtained from [`Runtime::builder`]. Each setter consumes and returns the
/// builder; `build` consumes it and produces the runtime.
///
/// # Examples
///
/// ```ignore
/// use nexus_async_rt::*;
/// use nexus_slab::byte::unbounded::Slab;
///
/// let mut world = nexus_rt::WorldBuilder::new().build();
/// let slab = unsafe { Slab::<256>::with_chunk_capacity(64) };
///
/// let mut rt = Runtime::builder(&mut world)
///     .tasks_per_cycle(128)
///     .slab_unbounded(slab)
///     .signal_handlers(true)
///     .build();
/// ```
pub struct RuntimeBuilder<'w> {
    /// World the runtime's `WorldCtx` is created from in `build`.
    world: &'w mut nexus_rt::World,
    /// Per-cycle task poll limit, forwarded to the executor in `build`.
    /// Starts at `crate::DEFAULT_TASKS_PER_CYCLE`.
    tasks_per_cycle: usize,
    /// Max cross-thread wakes drained per poll cycle. Starts at
    /// `usize::MAX` (unlimited).
    cross_thread_drain_limit: usize,
    /// Loop iterations between non-blocking IO polls. Starts at
    /// `DEFAULT_EVENT_INTERVAL`.
    event_interval: u32,
    /// Capacity passed to `Executor::new`. Starts at 64.
    queue_capacity: usize,
    /// Event capacity passed to `IoDriver::new`. Starts at 1024.
    event_capacity: usize,
    /// Token capacity passed to `IoDriver::new`. Starts at 64.
    token_capacity: usize,
    /// Whether `build` installs SIGTERM/SIGINT handlers. Starts at `false`.
    signal_handlers: bool,
    /// Type-erased slab + guard installer. None = no slab (Box-only).
    slab_installer: Option<SlabInstaller>,
}
522
523impl<'w> RuntimeBuilder<'w> {
524    fn new(world: &'w mut nexus_rt::World) -> Self {
525        Self {
526            world,
527            tasks_per_cycle: crate::DEFAULT_TASKS_PER_CYCLE,
528            cross_thread_drain_limit: usize::MAX,
529            event_interval: DEFAULT_EVENT_INTERVAL,
530            queue_capacity: 64,
531            event_capacity: 1024,
532            token_capacity: 64,
533            signal_handlers: false,
534            slab_installer: None,
535        }
536    }
537
538    /// Maximum tasks polled per cycle before yielding to check IO.
539    /// Default: 64.
540    #[must_use]
541    pub fn tasks_per_cycle(mut self, limit: usize) -> Self {
542        self.tasks_per_cycle = limit;
543        self
544    }
545
546    /// Number of loop iterations between non-blocking IO driver polls.
547    /// Default: 61 (matches tokio's heuristic).
548    ///
549    /// Every `event_interval` iterations the runtime does a non-blocking
550    /// `epoll_wait(0)` to check for socket events, even if tasks are
551    /// ready. Lower values improve IO responsiveness at the cost of
552    /// more syscalls; higher values favor task throughput.
553    #[must_use]
554    pub fn event_interval(mut self, n: u32) -> Self {
555        assert!(n > 0, "event_interval must be > 0");
556        self.event_interval = n;
557        self
558    }
559
560    /// Maximum cross-thread wakes drained per poll cycle.
561    /// Default: unlimited.
562    ///
563    /// Caps how many tasks woken from other threads are moved into the
564    /// local ready queue per iteration. Prevents a firehose of
565    /// cross-thread wakes from starving local tasks and IO. Remaining
566    /// wakes are drained on the next iteration.
567    #[must_use]
568    pub fn cross_thread_drain_limit(mut self, limit: usize) -> Self {
569        self.cross_thread_drain_limit = limit;
570        self
571    }
572
573    /// Pre-allocated capacity for internal queues. Default: 64.
574    #[must_use]
575    pub fn queue_capacity(mut self, cap: usize) -> Self {
576        self.queue_capacity = cap;
577        self
578    }
579
580    /// Maximum IO events processed per epoll cycle. Default: 1024.
581    #[must_use]
582    pub fn event_capacity(mut self, cap: usize) -> Self {
583        self.event_capacity = cap;
584        self
585    }
586
587    /// Initial number of IO source slots. Default: 64.
588    #[must_use]
589    pub fn token_capacity(mut self, cap: usize) -> Self {
590        self.token_capacity = cap;
591        self
592    }
593
594    /// Install SIGTERM/SIGINT signal handlers. Default: false.
595    #[must_use]
596    pub fn signal_handlers(mut self, enable: bool) -> Self {
597        self.signal_handlers = enable;
598        self
599    }
600
601    /// Hand off a growable (unbounded) slab for [`spawn_slab`].
602    ///
603    /// `S` is the total slot size in bytes. The task header uses 72 bytes,
604    /// so `Slab<256>` gives 184 bytes for the future. Most async IO
605    /// futures are 128–256 bytes — `Slab<256>` or `Slab<512>` covers
606    /// the common cases.
607    ///
608    /// The slab grows by allocating new chunks when full. No task spawn
609    /// will ever fail due to capacity.
610    ///
611    /// # Examples
612    ///
613    /// ```ignore
614    /// use nexus_slab::byte::unbounded::Slab;
615    ///
616    /// // SAFETY: single-threaded runtime.
617    /// let slab = unsafe { Slab::<256>::with_chunk_capacity(64) };
618    ///
619    /// let mut rt = Runtime::builder(&mut world)
620    ///     .slab_unbounded(slab)
621    ///     .build();
622    /// ```
623    pub fn slab_unbounded<const S: usize>(
624        mut self,
625        slab: nexus_slab::byte::unbounded::Slab<S>,
626    ) -> Self {
627        const {
628            assert!(
629                S >= 72,
630                "slab slot size must be at least 72 bytes (TASK_HEADER_SIZE)"
631            );
632        }
633        self.slab_installer = Some(Box::new(move || {
634            let mut slab = Box::new(slab);
635            // Derive pointer via &mut to get write provenance. Using &ref
636            // gives read-only provenance under stacked borrows, but the
637            // allocator writes through this pointer.
638            let slab_ptr = std::ptr::from_mut(slab.as_mut()).cast::<u8>();
639            let config = crate::alloc::make_unbounded_config::<S>(slab_ptr);
640            (slab as Box<dyn std::any::Any>, config)
641        }));
642        self
643    }
644
645    /// Hand off a fixed-capacity (bounded) slab for [`spawn_slab`].
646    ///
647    /// `S` is the total slot size in bytes. The slab has a fixed number
648    /// of slots — `spawn_slab` panics if the slab is full. Use this
649    /// when you want deterministic memory usage and know the maximum
650    /// number of concurrent hot-path tasks.
651    ///
652    /// # Examples
653    ///
654    /// ```ignore
655    /// use nexus_slab::byte::bounded::Slab;
656    ///
657    /// // SAFETY: single-threaded runtime.
658    /// let slab = unsafe { Slab::<256>::with_capacity(64) };
659    ///
660    /// let mut rt = Runtime::builder(&mut world)
661    ///     .slab_bounded(slab)
662    ///     .build();
663    /// ```
664    pub fn slab_bounded<const S: usize>(
665        mut self,
666        slab: nexus_slab::byte::bounded::Slab<S>,
667    ) -> Self {
668        const {
669            assert!(
670                S >= 72,
671                "slab slot size must be at least 72 bytes (TASK_HEADER_SIZE)"
672            );
673        }
674        self.slab_installer = Some(Box::new(move || {
675            let mut slab = Box::new(slab);
676            // Derive pointer via &mut to get write provenance. Using &ref
677            // gives read-only provenance under stacked borrows, but the
678            // allocator writes through this pointer.
679            let slab_ptr = std::ptr::from_mut(slab.as_mut()).cast::<u8>();
680            let config = crate::alloc::make_bounded_config::<S>(slab_ptr);
681            (slab as Box<dyn std::any::Any>, config)
682        }));
683        self
684    }
685
686    /// Build the runtime.
687    pub fn build(self) -> Runtime {
688        // Fail-fast if another Runtime is already alive on this thread.
689        // Done before any resource allocation so we don't leak IoDriver,
690        // mio::Poll, etc. on the panic path.
691        let runtime_presence = RuntimePresenceGuard::install();
692
693        let io = IoDriver::new(self.event_capacity, self.token_capacity)
694            .expect("failed to create mio::Poll");
695        let mut shutdown = crate::ShutdownHandle::new();
696        shutdown.set_mio_waker(io.mio_waker());
697
698        let mut executor = Executor::new(self.queue_capacity);
699        executor.set_tasks_per_cycle(self.tasks_per_cycle);
700
701        let ctx = WorldCtx::new(self.world);
702        let event_time = Cell::new(Instant::now());
703
704        // Create slab if configured and install TLS immediately. The
705        // returned guard owns the slab and the TLS install; it lives
706        // on Runtime so it drops AFTER `executor` (which frees surviving
707        // slab tasks via TLS dispatch). This is the architectural fix
708        // for BUG-1 (#167) — TLS scope now matches Runtime lifetime
709        // instead of run_loop scope.
710        let slab_guard = self.slab_installer.map(|install| {
711            let (slab, config) = install();
712            crate::alloc::install_slab(slab, &config)
713        });
714
715        let cross_wake = std::sync::Arc::new(crate::cross_wake::CrossWakeContext {
716            queue: crate::cross_wake::CrossWakeQueue::new(),
717            mio_waker: io.mio_waker(),
718            parked: std::sync::atomic::AtomicBool::new(false),
719        });
720
721        // Wire the cross-wake context into the executor for the
722        // shutdown-time `cross_queue_undrained` tally (PR 2 §2.3).
723        // Bare Executor use in tests has no Runtime, no cross-wake
724        // context, no tally — we install it here for the Runtime
725        // path only.
726        executor.install_cross_wake_for_drop(std::sync::Arc::clone(&cross_wake));
727
728        // Install the runtime's cross-wake context as the current-thread
729        // owning-executor identity. Lives lifetime-of-Runtime via the
730        // guard field below — `dispose_terminal::on_owning_executor`
731        // reads this slot to decide local-defer vs cross-queue routing
732        // for TaskRef terminal drops.
733        let cross_wake_tls_guard = crate::cross_wake::install_runtime_cross_wake(&cross_wake);
734
735        let rt = Runtime {
736            executor,
737            _cross_wake_tls_guard: cross_wake_tls_guard,
738            io: std::cell::UnsafeCell::new(io),
739            timers: std::cell::UnsafeCell::new(TimerDriver::new(64)),
740            ctx,
741            event_time,
742            shutdown,
743            cross_wake,
744            cross_thread_drain_limit: self.cross_thread_drain_limit,
745            event_interval: self.event_interval,
746            _slab_guard: slab_guard,
747            _runtime_presence: runtime_presence,
748            _not_thread_safe: PhantomData,
749        };
750
751        if self.signal_handlers {
752            rt.install_signal_handlers();
753        }
754
755        rt
756    }
757}
758
759// =============================================================================
760// block_on / run_loop
761// =============================================================================
762
763impl Runtime {
764    /// Drive the root future to completion. CPU-friendly.
765    ///
766    /// Parks the thread when no work is available.
767    pub fn block_on<F>(&mut self, future: F) -> F::Output
768    where
769        F: Future + 'static,
770    {
771        self.run_loop(future, ParkMode::Park)
772    }
773
774    /// Drive the root future to completion. Busy-wait.
775    ///
776    /// Never parks. Minimum wake latency at 100% CPU.
777    pub fn block_on_busy<F>(&mut self, future: F) -> F::Output
778    where
779        F: Future + 'static,
780    {
781        self.run_loop(future, ParkMode::Spin)
782    }
783
784    /// Drive the executor until pending cross-thread work has settled,
785    /// before shutdown. The canonical "quiesce" step before
786    /// `drop(runtime)` — see `docs/SHUTDOWN.md` for the full pattern.
787    ///
788    /// Loops while:
789    /// 1. The cross-thread queue has entries (drains them).
790    /// 2. The local ready queue has entries (polls them).
791    ///
792    /// Returns `Ok(())` once both are empty (or detected to no longer
793    /// receive new entries). Returns `Err(QuiesceTimeout)` if `timeout`
794    /// elapses first; the error contains diagnostic counts useful for
795    /// determining which producer didn't release its refs.
796    ///
797    /// **This is for clean shutdown, not panic-during-shutdown.** The
798    /// 100ms unwinding-wait in `Executor::drop` remains as
799    /// defense-in-depth for the panic case (where this method can't
800    /// be called).
801    ///
802    /// **The `timeout` parameter has no default — callers must pick a
803    /// budget deliberately.** PR 2 §2.4 open-item 4 evaluated `100ms`
804    /// (matches the unwinding defense), `500ms` (forgiving), and
805    /// "parameter-only" — chose parameter-only to force the user to
806    /// pick a budget appropriate for their producer landscape (a
807    /// trading-system shutdown sequence with multiple Aeron drivers
808    /// plus tokio futures plus channel senders has very different
809    /// settling characteristics than a unit test).
810    ///
811    /// # Canonical shutdown sequence
812    ///
813    /// ```ignore
814    /// // 1. Stop producers of cross-thread refs:
815    /// //    - Drop tokio runtime (or shutdown_timeout)
816    /// //    - Stop Aeron driver thread
817    /// //    - Drop external channel senders
818    ///
819    /// // 2. Quiesce.
820    /// runtime.shutdown_quiesce(Duration::from_millis(500))?;
821    ///
822    /// // 3. Drop the Runtime. Outstanding-ref panic paths in
823    /// //    Executor::drop should be unreachable in normal operation.
824    /// drop(runtime);
825    /// ```
826    ///
827    /// If step 2 returns `QuiesceTimeout`, a producer hasn't released
828    /// its refs. Investigate before letting Runtime drop — the
829    /// unwind-abort path in `Executor::drop` is defensive, not
830    /// desired.
831    pub fn shutdown_quiesce(&mut self, timeout: Duration) -> Result<(), QuiesceTimeout> {
832        // Install the same TLS context block_on uses, so any cross-thread
833        // wakes that fire during quiesce still find a runtime.
834        let _ctx_guard = crate::context::install(
835            self.ctx.as_ptr(),
836            self.io.get(),
837            self.timers.get(),
838            &raw const self.event_time,
839            std::sync::Arc::as_ptr(&self.shutdown.flag_ptr()),
840            std::ptr::from_ref(&self.shutdown.task_waker),
841        );
842        let _cross_wake_guard = crate::cross_wake::install_cross_wake(&self.cross_wake);
843        let _spawn_guard = RuntimeGuard::enter(&raw mut self.executor);
844        let (ready, deferred) = self.executor.poll_context_ptrs();
845        let _ready_guard = crate::waker::set_poll_context(ready, deferred);
846
847        let cross_queue = &*self.cross_wake;
848        let start = Instant::now();
849
850        loop {
851            // Drain whatever's in the cross-thread queue. The returned
852            // count tells us if anything was pending (a non-consuming
853            // "is empty" check on the Vyukov queue would race the
854            // producer; drain-and-count is the right primitive).
855            let drained_this_pass = self
856                .executor
857                .drain_cross_thread(&cross_queue.queue, self.cross_thread_drain_limit);
858
859            // Poll any ready tasks (drains the local ready queue).
860            self.executor.poll();
861
862            // Quiesced means: `all_tasks` is empty (no tasks with
863            // outstanding refs — completed-but-held tasks would still
864            // fire the abnormal-shutdown branches in `Executor::drop`),
865            // no ready work pending, no cross-queue entries drained
866            // THIS pass. Live or parked tasks that are still in
867            // `all_tasks` count as not-quiesced — they're holding
868            // refs that prevent a clean Runtime drop, even if they're
869            // not making progress.
870            //
871            // Use `outstanding_tasks` (`all_tasks.len()`), NOT
872            // `task_count` (`live_count`). `live_count` decrements
873            // unconditionally on completion; `all_tasks` only loses an
874            // entry when its refcount actually hits zero. A completed
875            // task with a held `JoinHandle` has `live_count -= 1` but
876            // is still in `all_tasks` — quiesce-as-Ok with
877            // `task_count == 0` would mis-claim quiesced and the
878            // user's subsequent drop would fire the
879            // `unbalanced_normal_shutdowns` branch
880            // (PR2-John-review item 2).
881            let has_ready = self.executor.has_ready();
882            let all_tasks_empty = self.executor.outstanding_tasks() == 0;
883            if !has_ready && drained_this_pass == 0 && all_tasks_empty {
884                return Ok(());
885            }
886
887            if start.elapsed() >= timeout {
888                // Final drain to count what's left in the cross-queue
889                // for the diagnostic. Anything after this drain is a
890                // post-timeout race — not counted (and at this point
891                // the user is about to investigate or drop).
892                let remaining_cross_queue = self
893                    .executor
894                    .drain_cross_thread(&cross_queue.queue, usize::MAX)
895                    as u64;
896                return Err(QuiesceTimeout {
897                    remaining_cross_queue,
898                    remaining_outstanding_refs: self.executor.outstanding_tasks() as u64,
899                    elapsed: start.elapsed(),
900                });
901            }
902
903            // Avoid a tight spin on transient "queue popped a stub"
904            // states. yield_now is a hint to the scheduler.
905            std::thread::yield_now();
906        }
907    }
908
909    fn run_loop<F>(&mut self, future: F, mode: ParkMode) -> F::Output
910    where
911        F: Future + 'static,
912    {
913        // Install TLS context.
914        let _ctx_guard = crate::context::install(
915            self.ctx.as_ptr(),
916            self.io.get(),
917            self.timers.get(),
918            &raw const self.event_time,
919            std::sync::Arc::as_ptr(&self.shutdown.flag_ptr()),
920            std::ptr::from_ref(&self.shutdown.task_waker),
921        );
922
923        // Slab TLS is installed at Runtime construction (BUG-1 #167 fix)
924        // and torn down when the Runtime drops — no longer scoped to
925        // run_loop, so nothing to install here.
926
927        // Install cross-thread wake context in TLS.
928        let _cross_wake_guard = crate::cross_wake::install_cross_wake(&self.cross_wake);
929
930        let mut root: Pin<Box<dyn Future<Output = F::Output>>> = Box::pin(future);
931
932        let woken = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(true));
933        let root_waker = Waker::from(std::sync::Arc::new(RootWake {
934            woken: std::sync::Arc::clone(&woken),
935            // SAFETY: single-threaded, called during block_on setup.
936            mio_waker: unsafe { &*self.io.get() }.mio_waker(),
937        }));
938        let mut root_cx = Context::from_waker(&root_waker);
939
940        // Install spawn TLS.
941        let _spawn_guard = RuntimeGuard::enter(&raw mut self.executor);
942
943        // Install waker TLS: ready queue + deferred free list.
944        // Uses UnsafeCell::get() to derive pointers that survive &mut self reborrows.
945        let (ready, deferred) = self.executor.poll_context_ptrs();
946        let _ready_guard = crate::waker::set_poll_context(ready, deferred);
947
948        self.event_time.set(Instant::now());
949
950        // The cross-thread queue uses interior mutability (UnsafeCell)
951        // for the consumer head. pop() takes &self, so a shared ref
952        // from the Arc is sufficient. No unsafe cast needed.
953        let cross_queue = &*self.cross_wake;
954
955        let mut tick: u32 = 0;
956
957        loop {
958            // 1. Poll root future if woken or shutdown requested.
959            if woken.swap(false, std::sync::atomic::Ordering::Acquire)
960                || self.shutdown.is_shutdown()
961            {
962                match root.as_mut().poll(&mut root_cx) {
963                    Poll::Ready(output) => return output,
964                    Poll::Pending => {}
965                }
966            }
967
968            // 2. Drain cross-thread inbox.
969            self.executor
970                .drain_cross_thread(&cross_queue.queue, self.cross_thread_drain_limit);
971
972            // 3. Poll ready tasks (up to tasks_per_cycle).
973            self.executor.poll();
974
975            // 4. Fire expired timers.
976            // SAFETY: single-threaded runtime, no concurrent access.
977            unsafe { &mut *self.timers.get() }.fire_expired(Instant::now());
978
979            // 4.5. Set parked early (park mode only) so cross-thread
980            // wakers arriving from here on will poke the eventfd.
981            if matches!(mode, ParkMode::Park) {
982                cross_queue
983                    .parked
984                    .store(true, std::sync::atomic::Ordering::Release);
985            }
986
987            // 5. Drain cross-thread inbox again (wakes during step 3/4).
988            self.executor
989                .drain_cross_thread(&cross_queue.queue, self.cross_thread_drain_limit);
990
991            tick = tick.wrapping_add(1);
992
993            // 6. Periodic non-blocking IO check every event_interval ticks.
994            //    Prevents IO starvation under sustained task load.
995            if tick % self.event_interval == 0 {
996                if let Err(e) = unsafe { &mut *self.io.get() }.poll_io(Some(Duration::ZERO)) {
997                    assert!(
998                        e.kind() == std::io::ErrorKind::Interrupted,
999                        "mio::Poll::poll failed: {e}"
1000                    );
1001                }
1002                self.event_time.set(Instant::now());
1003            }
1004
1005            // 7. If work remains, loop immediately.
1006            let has_work =
1007                self.executor.has_ready() || woken.load(std::sync::atomic::Ordering::Acquire);
1008
1009            if has_work {
1010                if matches!(mode, ParkMode::Park) {
1011                    cross_queue
1012                        .parked
1013                        .store(false, std::sync::atomic::Ordering::Release);
1014                }
1015                continue;
1016            }
1017
1018            // 8. No work. Spin mode loops; park mode sleeps in epoll.
1019            match mode {
1020                ParkMode::Spin => {
1021                    // Non-blocking IO check before spinning again.
1022                    if let Err(e) = unsafe { &mut *self.io.get() }.poll_io(Some(Duration::ZERO)) {
1023                        assert!(
1024                            e.kind() == std::io::ErrorKind::Interrupted,
1025                            "mio::Poll::poll failed: {e}"
1026                        );
1027                    }
1028                    self.event_time.set(Instant::now());
1029                }
1030                ParkMode::Park => {
1031                    // parked is already true (set at step 4.5).
1032                    // Park in epoll_wait until IO, timer, or cross-thread
1033                    // eventfd wakes us.
1034                    // SAFETY: single-threaded, no concurrent timer access.
1035                    let timeout = unsafe { &*self.timers.get() }
1036                        .next_deadline()
1037                        .map(|d| d.saturating_duration_since(Instant::now()));
1038
1039                    if let Err(e) = unsafe { &mut *self.io.get() }.poll_io(timeout) {
1040                        assert!(
1041                            e.kind() == std::io::ErrorKind::Interrupted,
1042                            "mio::Poll::poll failed: {e}"
1043                        );
1044                    }
1045
1046                    cross_queue
1047                        .parked
1048                        .store(false, std::sync::atomic::Ordering::Release);
1049                    self.event_time.set(Instant::now());
1050                }
1051            }
1052        }
1053    }
1054}
1055
1056// =============================================================================
1057// QuiesceTimeout — error type for `Runtime::shutdown_quiesce`
1058// =============================================================================
1059
/// Returned by [`Runtime::shutdown_quiesce`] when the timeout elapses
/// before the executor reaches a quiesced state.
///
/// The diagnostic fields help identify which producer didn't release
/// its refs:
///
/// - `remaining_cross_queue`: number of cross-thread queue entries
///   pulled out by the final, unbounded drain performed once the
///   timeout was detected. Non-zero indicates a producer thread is
///   still pushing wakes faster than quiesce can drain them, OR a wake
///   landed after the last regular drain pass — investigate which
///   off-thread producer is still active.
/// - `remaining_outstanding_refs`: value of
///   `Executor::outstanding_tasks()` (the size of `all_tasks`) at the
///   moment of timeout. Each entry is a task that still has
///   outstanding refs; this includes tasks that already completed but
///   whose `JoinHandle` is still held.
/// - `elapsed`: how long quiesce ran before timing out.
///
/// PR 2 §2.4 open-item 5 noted that finer-grained diagnostics
/// ("which task ID had the outstanding ref") could be added if
/// implementation revealed them as cheap to surface. The implementation
/// only reads the `Executor::outstanding_tasks()` count, which doesn't
/// enumerate tasks; adding per-task data here would require new
/// accessors. Out of scope for initial PR 2; future enhancement.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct QuiesceTimeout {
    /// Number of cross-thread queue entries still pending at timeout.
    /// Non-zero means a producer is racing the drain loop.
    pub remaining_cross_queue: u64,
    /// Number of tasks still tracked by the executor at the moment of
    /// timeout (finished-but-still-referenced tasks included). Each is
    /// a candidate for "producer hasn't released its refs."
    pub remaining_outstanding_refs: u64,
    /// Time elapsed inside `shutdown_quiesce` before returning timeout.
    /// Measured after the final drain, so it is at least the requested
    /// `timeout`.
    pub elapsed: Duration,
}
1094
1095impl std::fmt::Display for QuiesceTimeout {
1096    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1097        write!(
1098            f,
1099            "Runtime::shutdown_quiesce timed out after {:?} with {} outstanding tasks, \
1100             {} cross-queue entries pending",
1101            self.elapsed, self.remaining_outstanding_refs, self.remaining_cross_queue
1102        )
1103    }
1104}
1105
1106impl std::error::Error for QuiesceTimeout {}
1107
1108// =============================================================================
1109// Park mode
1110// =============================================================================
1111
/// What `Runtime::run_loop` does when a cycle ends with no ready work.
#[derive(Copy, Clone)]
enum ParkMode {
    /// Block in the IO driver until IO, the next timer deadline, or a
    /// cross-thread wake. Selected by `Runtime::block_on`.
    Park,
    /// Never block: do a zero-timeout IO poll and loop again. Selected
    /// by `Runtime::block_on_busy`.
    Spin,
}
1117
1118// =============================================================================
1119// Root future waker
1120// =============================================================================
1121
1122struct RootWake {
1123    woken: std::sync::Arc<std::sync::atomic::AtomicBool>,
1124    mio_waker: std::sync::Arc<mio::Waker>,
1125}
1126
1127impl Wake for RootWake {
1128    fn wake(self: std::sync::Arc<Self>) {
1129        self.wake_by_ref();
1130    }
1131
1132    fn wake_by_ref(self: &std::sync::Arc<Self>) {
1133        let was_woken = self.woken.swap(true, std::sync::atomic::Ordering::Release);
1134        if !was_woken {
1135            let _ = self.mio_waker.wake();
1136        }
1137    }
1138}
1139
1140// =============================================================================
1141// RAII guard for spawn TLS
1142// =============================================================================
1143
1144struct RuntimeGuard {
1145    prev: *mut Executor,
1146}
1147
1148impl RuntimeGuard {
1149    fn enter(executor: *mut Executor) -> Self {
1150        let prev = CURRENT.with(|cell| cell.replace(executor));
1151        Self { prev }
1152    }
1153}
1154
1155impl Drop for RuntimeGuard {
1156    fn drop(&mut self) {
1157        CURRENT.with(|cell| cell.set(self.prev));
1158    }
1159}
1160
1161// =============================================================================
1162// RAII guard for Runtime presence on this thread
1163// =============================================================================
1164//
1165// Enforces "at most one Runtime alive per thread" at construction time. This
1166// is the right scope because:
1167//
1168//  - Slab TLS is installed at construction (post BUG-1 fix). A second
1169//    construction would silently overwrite the first's slab dispatch state,
1170//    corrupting allocator routing for the first Runtime's surviving tasks.
1171//  - !Send + !Sync prevents cross-thread coexistence at the type level.
1172//    This guard prevents same-thread coexistence at runtime.
1173//
1174// Different from `RuntimeGuard` above: that one is per-`block_on` for spawn
1175// TLS, this one is per-Runtime for existence tracking.
1176
thread_local! {
    /// `true` while a `RuntimePresenceGuard` installed on this thread
    /// is alive; `false` otherwise.
    static RUNTIME_PRESENT: Cell<bool> = const { Cell::new(false) };
}
1180
/// Zero-sized token recording that a `Runtime` exists on the current
/// thread. The thread-local presence flag is set by `install` and
/// cleared again when this value is dropped.
pub(crate) struct RuntimePresenceGuard;
1182
1183impl RuntimePresenceGuard {
1184    /// Install the Runtime-presence flag. Panics if another Runtime is
1185    /// already alive on this thread.
1186    fn install() -> Self {
1187        assert!(
1188            !RUNTIME_PRESENT.with(Cell::get),
1189            "nexus-async-rt: another Runtime is already alive on this \
1190             thread. Only one Runtime is supported per thread because \
1191             thread-local state (slab dispatch, IO/timer drivers, \
1192             cross-thread wake context) cannot be shared between \
1193             Runtimes. Drop the existing Runtime first."
1194        );
1195        RUNTIME_PRESENT.with(|c| c.set(true));
1196        Self
1197    }
1198}
1199
1200impl Drop for RuntimePresenceGuard {
1201    fn drop(&mut self) {
1202        RUNTIME_PRESENT.with(|c| c.set(false));
1203    }
1204}
1205
1206// =============================================================================
1207// Tests
1208// =============================================================================
1209
1210#[cfg(test)]
1211mod tests {
1212    use super::*;
1213    use nexus_rt::{Handler, IntoHandler, Res, ResMut, WorldBuilder};
1214
1215    nexus_rt::new_resource!(Val(u64));
1216    nexus_rt::new_resource!(Out(u64));
1217
1218    #[test]
1219    fn block_on_returns_value() {
1220        let mut wb = WorldBuilder::new();
1221        wb.register(Val(42));
1222        let mut world = wb.build();
1223
1224        let mut rt = Runtime::new(&mut world);
1225        let result = rt.block_on(async { 42u64 });
1226        assert_eq!(result, 42);
1227    }
1228
1229    #[test]
1230    fn block_on_with_world_access() {
1231        let mut wb = WorldBuilder::new();
1232        wb.register(Val(42));
1233        wb.register(Out(0));
1234        let mut world = wb.build();
1235
1236        let mut rt = Runtime::new(&mut world);
1237
1238        let result = rt.block_on(async move {
1239            crate::WorldCtx::current().with_world(|world| {
1240                let v = world.resource::<Val>().0;
1241                world.resource_mut::<Out>().0 = v + 10;
1242            });
1243            crate::WorldCtx::current().with_world_ref(|world| world.resource::<Out>().0)
1244        });
1245
1246        assert_eq!(result, 52);
1247    }
1248
1249    #[test]
1250    fn block_on_with_pre_resolved_handler() {
1251        let mut wb = WorldBuilder::new();
1252        wb.register(Val(42));
1253        wb.register(Out(0));
1254        let mut world = wb.build();
1255
1256        let mut rt = Runtime::new(&mut world);
1257
1258        let mut h = (|val: Res<Val>, mut out: ResMut<Out>, event: u64| {
1259            out.0 = val.0 + event;
1260        })
1261        .into_handler(world.registry());
1262
1263        let result = rt.block_on(async move {
1264            crate::WorldCtx::current().with_world(|world| h.run(world, 10));
1265            crate::WorldCtx::current().with_world_ref(|world| world.resource::<Out>().0)
1266        });
1267
1268        assert_eq!(result, 52);
1269    }
1270
1271    #[test]
1272    fn spawn_from_root_future() {
1273        let mut wb = WorldBuilder::new();
1274        wb.register(Out(0));
1275        let mut world = wb.build();
1276
1277        let mut rt = Runtime::new(&mut world);
1278
1279        rt.block_on(async move {
1280            for i in 1..=3u64 {
1281                spawn_boxed(async move {
1282                    crate::WorldCtx::current().with_world(|world| {
1283                        world.resource_mut::<Out>().0 += i;
1284                    });
1285                });
1286            }
1287
1288            YieldOnce(false).await;
1289        });
1290
1291        assert_eq!(world.resource::<Out>().0, 6);
1292    }
1293
1294    #[test]
1295    fn block_on_busy_returns_value() {
1296        let mut wb = WorldBuilder::new();
1297        wb.register(Val(7));
1298        let mut world = wb.build();
1299
1300        let mut rt = Runtime::new(&mut world);
1301        let result = rt.block_on_busy(async { 6 * 7 });
1302        assert_eq!(result, 42);
1303    }
1304
1305    #[test]
1306    fn block_on_busy_with_spawned_tasks() {
1307        let mut wb = WorldBuilder::new();
1308        wb.register(Out(0));
1309        let mut world = wb.build();
1310
1311        let mut rt = Runtime::new(&mut world);
1312
1313        rt.block_on_busy(async move {
1314            spawn_boxed(async move {
1315                crate::WorldCtx::current().with_world(|world| {
1316                    world.resource_mut::<Out>().0 = 99;
1317                });
1318            });
1319
1320            YieldOnce(false).await;
1321        });
1322
1323        assert_eq!(world.resource::<Out>().0, 99);
1324    }
1325
1326    #[test]
1327    fn event_time_is_set() {
1328        let mut wb = WorldBuilder::new();
1329        wb.register(Val(0));
1330        let mut world = wb.build();
1331
1332        let mut rt = Runtime::new(&mut world);
1333
1334        let before = Instant::now();
1335        rt.block_on(async move {
1336            let t = crate::context::event_time();
1337            assert!(t >= before);
1338        });
1339    }
1340
1341    #[test]
1342    #[should_panic(expected = "spawn_boxed() called outside of Runtime::block_on")]
1343    fn spawn_outside_runtime_panics() {
1344        spawn_boxed(async {});
1345    }
1346
1347    fn test_slab() -> nexus_slab::byte::unbounded::Slab<256> {
1348        // SAFETY: single-threaded test.
1349        unsafe { nexus_slab::byte::unbounded::Slab::with_chunk_capacity(16) }
1350    }
1351
1352    #[test]
1353    #[should_panic(expected = "spawn_slab() called without a slab")]
1354    fn spawn_slab_without_slab_panics() {
1355        let mut wb = WorldBuilder::new();
1356        let mut world = wb.build();
1357        let mut rt = Runtime::new(&mut world);
1358
1359        rt.block_on(async {
1360            spawn_slab(async {});
1361        });
1362    }
1363
1364    #[test]
1365    fn spawn_slab_with_slab() {
1366        let mut wb = WorldBuilder::new();
1367        wb.register(Out(0));
1368        let mut world = wb.build();
1369
1370        let mut rt = Runtime::builder(&mut world)
1371            .slab_unbounded(test_slab())
1372            .build();
1373
1374        rt.block_on(async move {
1375            spawn_slab(async move {
1376                crate::WorldCtx::current().with_world(|world| {
1377                    world.resource_mut::<Out>().0 = 77;
1378                });
1379            });
1380
1381            YieldOnce(false).await;
1382        });
1383
1384        assert_eq!(world.resource::<Out>().0, 77);
1385    }
1386
1387    #[test]
1388    fn mixed_spawn_and_spawn_slab() {
1389        let mut wb = WorldBuilder::new();
1390        wb.register(Out(0));
1391        let mut world = wb.build();
1392
1393        let mut rt = Runtime::builder(&mut world)
1394            .slab_unbounded(test_slab())
1395            .build();
1396
1397        rt.block_on(async move {
1398            // Box-allocated
1399            spawn_boxed(async move {
1400                crate::WorldCtx::current().with_world(|world| {
1401                    world.resource_mut::<Out>().0 += 10;
1402                });
1403            });
1404            // Slab-allocated
1405            spawn_slab(async move {
1406                crate::WorldCtx::current().with_world(|world| {
1407                    world.resource_mut::<Out>().0 += 20;
1408                });
1409            });
1410
1411            YieldOnce(false).await;
1412        });
1413
1414        assert_eq!(world.resource::<Out>().0, 30);
1415    }
1416
1417    // =========================================================================
1418    // Claim API tests
1419    // =========================================================================
1420
1421    #[test]
1422    fn claim_slab_spawn_executes() {
1423        let mut wb = WorldBuilder::new();
1424        wb.register(Out(0));
1425        let mut world = wb.build();
1426
1427        let mut rt = Runtime::builder(&mut world)
1428            .slab_unbounded(test_slab())
1429            .build();
1430
1431        rt.block_on(async move {
1432            let claim = claim_slab();
1433            claim.spawn(async move {
1434                crate::WorldCtx::current().with_world(|world| {
1435                    world.resource_mut::<Out>().0 = 55;
1436                });
1437            });
1438
1439            YieldOnce(false).await;
1440        });
1441
1442        assert_eq!(world.resource::<Out>().0, 55);
1443    }
1444
1445    #[test]
1446    fn claim_slab_drop_returns_slot() {
1447        let mut wb = WorldBuilder::new();
1448        let mut world = wb.build();
1449
1450        let bounded = unsafe { nexus_slab::byte::bounded::Slab::<256>::with_capacity(1) };
1451        let mut rt = Runtime::builder(&mut world).slab_bounded(bounded).build();
1452
1453        rt.block_on(async {
1454            // Claim the only slot, then drop without spawning.
1455            let claim = claim_slab();
1456            drop(claim);
1457
1458            // Slot should be back — can claim again.
1459            let claim = claim_slab();
1460            claim.spawn(async {});
1461
1462            YieldOnce(false).await;
1463        });
1464    }
1465
1466    #[test]
1467    fn try_claim_slab_returns_none_when_full() {
1468        let mut wb = WorldBuilder::new();
1469        let mut world = wb.build();
1470
1471        let bounded = unsafe { nexus_slab::byte::bounded::Slab::<256>::with_capacity(1) };
1472        let mut rt = Runtime::builder(&mut world).slab_bounded(bounded).build();
1473
1474        rt.block_on(async {
1475            let _held = claim_slab(); // hold the only slot
1476            assert!(try_claim_slab().is_none());
1477        });
1478    }
1479
1480    #[test]
1481    fn mixed_spawn_boxed_and_claim_slab() {
1482        let mut wb = WorldBuilder::new();
1483        wb.register(Out(0));
1484        let mut world = wb.build();
1485
1486        let mut rt = Runtime::builder(&mut world)
1487            .slab_unbounded(test_slab())
1488            .build();
1489
1490        rt.block_on(async move {
1491            spawn_boxed(async move {
1492                crate::WorldCtx::current().with_world(|world| {
1493                    world.resource_mut::<Out>().0 += 10;
1494                });
1495            });
1496
1497            let claim = claim_slab();
1498            claim.spawn(async move {
1499                crate::WorldCtx::current().with_world(|world| {
1500                    world.resource_mut::<Out>().0 += 20;
1501                });
1502            });
1503
1504            YieldOnce(false).await;
1505        });
1506
1507        assert_eq!(world.resource::<Out>().0, 30);
1508    }
1509
1510    // =========================================================================
1511    // Timer tests
1512    // =========================================================================
1513
1514    #[test]
1515    fn sleep_completes() {
1516        let mut wb = WorldBuilder::new();
1517        wb.register(Out(0));
1518        let mut world = wb.build();
1519
1520        let mut rt = Runtime::new(&mut world);
1521
1522        let before = Instant::now();
1523        rt.block_on(async move {
1524            crate::context::sleep(Duration::from_millis(50)).await;
1525        });
1526        let elapsed = before.elapsed();
1527
1528        assert!(
1529            elapsed >= Duration::from_millis(40),
1530            "elapsed {elapsed:?} too short"
1531        );
1532        assert!(
1533            elapsed < Duration::from_millis(200),
1534            "elapsed {elapsed:?} too long"
1535        );
1536    }
1537
1538    #[test]
1539    fn sleep_in_spawned_task() {
1540        let mut wb = WorldBuilder::new();
1541        wb.register(Out(0));
1542        let mut world = wb.build();
1543
1544        let mut rt = Runtime::new(&mut world);
1545
1546        let before = Instant::now();
1547        rt.block_on(async move {
1548            spawn_boxed(async move {
1549                crate::context::sleep(Duration::from_millis(50)).await;
1550                crate::WorldCtx::current().with_world(|world| {
1551                    world.resource_mut::<Out>().0 = 42;
1552                });
1553            });
1554
1555            crate::context::sleep(Duration::from_millis(100)).await;
1556        });
1557
1558        let elapsed = before.elapsed();
1559        assert!(elapsed >= Duration::from_millis(80));
1560        assert_eq!(world.resource::<Out>().0, 42);
1561    }
1562
1563    #[test]
1564    fn sleep_zero_duration_ready_immediately() {
1565        let mut wb = WorldBuilder::new();
1566        let mut world = wb.build();
1567        let mut rt = Runtime::new(&mut world);
1568
1569        let before = Instant::now();
1570        rt.block_on(async move {
1571            crate::context::sleep(Duration::ZERO).await;
1572        });
1573        assert!(before.elapsed() < Duration::from_millis(10));
1574    }
1575
1576    #[test]
1577    fn sleep_past_deadline_ready_immediately() {
1578        let mut wb = WorldBuilder::new();
1579        let mut world = wb.build();
1580        let mut rt = Runtime::new(&mut world);
1581
1582        let past = Instant::now() - Duration::from_secs(1);
1583        let before = Instant::now();
1584        rt.block_on(async move {
1585            crate::context::sleep_until(past).await;
1586        });
1587        assert!(before.elapsed() < Duration::from_millis(10));
1588    }
1589
1590    // =========================================================================
1591    // Timeout tests
1592    // =========================================================================
1593
1594    #[test]
1595    fn timeout_completes_before_deadline() {
1596        let mut wb = WorldBuilder::new();
1597        let mut world = wb.build();
1598        let mut rt = Runtime::new(&mut world);
1599
1600        let result = rt.block_on(async {
1601            crate::context::timeout(Duration::from_millis(500), async { 42u64 }).await
1602        });
1603
1604        assert_eq!(result.unwrap(), 42);
1605    }
1606
1607    #[test]
1608    fn timeout_expires() {
1609        let mut wb = WorldBuilder::new();
1610        let mut world = wb.build();
1611        let mut rt = Runtime::new(&mut world);
1612
1613        let result = rt.block_on(async {
1614            crate::context::timeout(
1615                Duration::from_millis(10),
1616                crate::context::sleep(Duration::from_secs(10)),
1617            )
1618            .await
1619        });
1620
1621        assert!(result.is_err());
1622    }
1623
1624    // =========================================================================
1625    // Interval tests
1626    // =========================================================================
1627
1628    #[test]
1629    fn interval_ticks() {
1630        let mut wb = WorldBuilder::new();
1631        wb.register(Out(0));
1632        let mut world = wb.build();
1633        let mut rt = Runtime::new(&mut world);
1634
1635        let before = Instant::now();
1636        rt.block_on(async move {
1637            let mut iv = crate::context::interval(Duration::from_millis(20));
1638            iv.tick().await; // ~20ms
1639            iv.tick().await; // ~40ms
1640            iv.tick().await; // ~60ms
1641        });
1642        let elapsed = before.elapsed();
1643
1644        assert!(
1645            elapsed >= Duration::from_millis(50),
1646            "too fast: {elapsed:?}"
1647        );
1648        assert!(
1649            elapsed < Duration::from_millis(200),
1650            "too slow: {elapsed:?}"
1651        );
1652    }
1653
1654    // =========================================================================
1655    // yield_now tests
1656    // =========================================================================
1657
1658    #[test]
1659    fn yield_now_lets_other_tasks_run() {
1660        let mut wb = WorldBuilder::new();
1661        wb.register(Out(0));
1662        let mut world = wb.build();
1663        let mut rt = Runtime::new(&mut world);
1664
1665        rt.block_on(async move {
1666            spawn_boxed(async move {
1667                crate::WorldCtx::current().with_world(|world| {
1668                    world.resource_mut::<Out>().0 = 99;
1669                });
1670            });
1671
1672            // Yield so the spawned task gets a turn.
1673            crate::context::yield_now().await;
1674
1675            let val = crate::WorldCtx::current().with_world_ref(|world| world.resource::<Out>().0);
1676            assert_eq!(val, 99);
1677        });
1678    }
1679
1680    // =========================================================================
1681    // Test helpers
1682    // =========================================================================
1683
1684    struct YieldOnce(bool);
1685
1686    impl Future for YieldOnce {
1687        type Output = ();
1688        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
1689            if self.0 {
1690                Poll::Ready(())
1691            } else {
1692                self.0 = true;
1693                cx.waker().wake_by_ref();
1694                Poll::Pending
1695            }
1696        }
1697    }
1698
1699    // =========================================================================
1700    // JoinHandle tests
1701    // =========================================================================
1702
1703    #[test]
1704    fn join_handle_await_gets_value() {
1705        let wb = WorldBuilder::new();
1706        let mut world = wb.build();
1707        let mut rt = Runtime::new(&mut world);
1708
1709        rt.block_on(async {
1710            let handle = spawn_boxed(async { 42u64 });
1711            let result = handle.await;
1712            assert_eq!(result, 42);
1713        });
1714    }
1715
1716    #[test]
1717    fn join_handle_await_string() {
1718        let wb = WorldBuilder::new();
1719        let mut world = wb.build();
1720        let mut rt = Runtime::new(&mut world);
1721
1722        rt.block_on(async {
1723            let handle = spawn_boxed(async { String::from("hello world") });
1724            let result = handle.await;
1725            assert_eq!(result, "hello world");
1726        });
1727    }
1728
1729    #[test]
1730    fn join_handle_detach() {
1731        use std::cell::Cell;
1732        use std::rc::Rc;
1733
1734        let wb = WorldBuilder::new();
1735        let mut world = wb.build();
1736        let mut rt = Runtime::new(&mut world);
1737
1738        let ran = Rc::new(Cell::new(false));
1739        let r = ran.clone();
1740
1741        rt.block_on(async move {
1742            // Spawn and immediately drop handle (detach).
1743            drop(spawn_boxed(async move {
1744                r.set(true);
1745            }));
1746            // Yield to let the spawned task run.
1747            crate::context::yield_now().await;
1748        });
1749
1750        assert!(ran.get());
1751    }
1752
1753    #[test]
1754    fn join_handle_is_finished() {
1755        let wb = WorldBuilder::new();
1756        let mut world = wb.build();
1757        let mut rt = Runtime::new(&mut world);
1758
1759        rt.block_on(async {
1760            let handle = spawn_boxed(async { 1 });
1761            // The task hasn't been polled yet.
1762            assert!(!handle.is_finished());
1763            // Yield to let the task run.
1764            crate::context::yield_now().await;
1765            assert!(handle.is_finished());
1766            let val = handle.await;
1767            assert_eq!(val, 1);
1768        });
1769    }
1770
1771    #[test]
1772    fn join_handle_abort_returns_true() {
1773        let wb = WorldBuilder::new();
1774        let mut world = wb.build();
1775        let mut rt = Runtime::new(&mut world);
1776
1777        rt.block_on(async {
1778            let handle = spawn_boxed(std::future::pending::<()>());
1779            assert!(handle.abort()); // was running, handle consumed
1780        });
1781    }
1782
1783    #[test]
1784    fn join_handle_abort_completed_returns_false() {
1785        let wb = WorldBuilder::new();
1786        let mut world = wb.build();
1787        let mut rt = Runtime::new(&mut world);
1788
1789        rt.block_on(async {
1790            let handle = spawn_boxed(async { 42 });
1791            crate::context::yield_now().await;
1792            assert!(handle.is_finished());
1793            assert!(!handle.abort()); // already done, handle consumed
1794        });
1795    }
1796
1797    #[test]
1798    fn join_handle_drop_after_completion_drops_output() {
1799        use std::cell::Cell;
1800        use std::rc::Rc;
1801
1802        let wb = WorldBuilder::new();
1803        let mut world = wb.build();
1804        let mut rt = Runtime::new(&mut world);
1805
1806        let drop_count = Rc::new(Cell::new(0u32));
1807        let dc = drop_count.clone();
1808
1809        struct DropCounter(Rc<Cell<u32>>);
1810        impl Drop for DropCounter {
1811            fn drop(&mut self) {
1812                self.0.set(self.0.get() + 1);
1813            }
1814        }
1815
1816        rt.block_on(async move {
1817            let handle = spawn_boxed(async move { DropCounter(dc) });
1818            // Let it complete.
1819            crate::context::yield_now().await;
1820            assert!(handle.is_finished());
1821            // Drop handle without reading — output should be dropped.
1822            drop(handle);
1823        });
1824
1825        assert_eq!(drop_count.get(), 1, "output should be dropped exactly once");
1826    }
1827
1828    #[test]
1829    fn join_handle_multiple_concurrent() {
1830        let wb = WorldBuilder::new();
1831        let mut world = wb.build();
1832        let mut rt = Runtime::new(&mut world);
1833
1834        rt.block_on(async {
1835            let h1 = spawn_boxed(async { 10u64 });
1836            let h2 = spawn_boxed(async { 20u64 });
1837            let h3 = spawn_boxed(async { 30u64 });
1838
1839            let r3 = h3.await;
1840            let r1 = h1.await;
1841            let r2 = h2.await;
1842
1843            assert_eq!(r1, 10);
1844            assert_eq!(r2, 20);
1845            assert_eq!(r3, 30);
1846        });
1847    }
1848
1849    #[test]
1850    fn join_handle_output_larger_than_future() {
1851        let wb = WorldBuilder::new();
1852        let mut world = wb.build();
1853        let mut rt = Runtime::new(&mut world);
1854
1855        rt.block_on(async {
1856            // The future is tiny, the output is large.
1857            let handle = spawn_boxed(async { [42u64; 32] });
1858            let result = handle.await;
1859            assert_eq!(result[0], 42);
1860            assert_eq!(result[31], 42);
1861        });
1862    }
1863}