Skip to main content

nexus_async_rt/
runtime.rs

//! Single-threaded async runtime.
//!
//! [`Runtime`] owns an [`Executor`](crate::Executor) for spawned tasks, a
//! boxed root future, and an event-cycle timestamp. The root future is
//! driven to completion by [`block_on`](Runtime::block_on) or
//! [`block_on_busy`](Runtime::block_on_busy).
//!
//! Two spawn strategies:
//! - **`spawn_boxed()`** — Box-allocated. Default. No setup needed.
//! - **`spawn_slab()`** — Slab-allocated. Zero-alloc hot path.
//!   Requires a slab configured via [`RuntimeBuilder::slab_unbounded`] or
//!   [`RuntimeBuilder::slab_bounded`].
//!
//! # Thread-local spawn
//!
//! [`spawn_boxed`] and [`spawn_slab`] are free functions that push tasks into
//! the current runtime via thread-local pointers set during `block_on`.
//! Calling them outside `block_on` panics.
18
19use std::cell::Cell;
20use std::future::Future;
21use std::pin::Pin;
22use std::task::{Context, Poll, Wake, Waker};
23use std::time::{Duration, Instant};
24
25use crate::io::IoDriver;
26use crate::task::JoinHandle;
27use crate::timer::TimerDriver;
28use crate::{Executor, WorldCtx};
29
/// Default number of loop iterations between non-blocking IO polls.
/// Matches tokio's heuristic (61, originally from Go's scheduler).
///
/// This is the initial `event_interval` of a [`RuntimeBuilder`]; override it
/// with [`RuntimeBuilder::event_interval`].
const DEFAULT_EVENT_INTERVAL: u32 = 61;
33
34// =============================================================================
35// Thread-local spawn context
36// =============================================================================
37
thread_local! {
    /// Raw pointer to the active runtime's executor, or null when no runtime
    /// is running on this thread.
    ///
    /// `RuntimeGuard::enter` stores the pointer and `RuntimeGuard`'s `Drop`
    /// puts the previous value back, so the slot is non-null only while
    /// `Runtime::run_loop` (reached via `block_on` / `block_on_busy`) is
    /// executing on this thread.
    static CURRENT: Cell<*mut Executor> = const { Cell::new(std::ptr::null_mut()) };
}
43
44/// Spawn a Box-allocated task into the current runtime.
45///
46/// Returns a [`JoinHandle`] that can be awaited for the task's output.
47/// Drop the handle to detach the task.
48///
49/// Must be called from within [`Runtime::block_on`] or
50/// [`Runtime::block_on_busy`]. Panics otherwise.
51///
52/// # Panics
53///
54/// - If called outside a runtime context.
55pub fn spawn_boxed<F>(future: F) -> JoinHandle<F::Output>
56where
57    F: Future + 'static,
58    F::Output: 'static,
59{
60    CURRENT.with(|cell| {
61        let ptr = cell.get();
62        assert!(
63            !ptr.is_null(),
64            "spawn_boxed() called outside of Runtime::block_on"
65        );
66        // SAFETY: pointer valid for duration of block_on. Single-threaded.
67        let executor = unsafe { &mut *ptr };
68        executor.spawn_boxed(future)
69    })
70}
71
72/// Spawn a slab-allocated task into the current runtime.
73///
74/// Returns a [`JoinHandle`] that can be awaited for the task's output.
75/// Zero allocation — the task is placed directly into a pre-allocated
76/// slab slot via TLS.
77///
78/// # Panics
79///
80/// - If called outside a runtime context.
81/// - If no slab is configured.
82/// - If the slab is full (bounded slab).
83/// - If the task future exceeds the slab's slot capacity.
84pub fn spawn_slab<F>(future: F) -> JoinHandle<F::Output>
85where
86    F: Future + 'static,
87    F::Output: 'static,
88{
89    CURRENT.with(|cell| {
90        let ptr = cell.get();
91        assert!(
92            !ptr.is_null(),
93            "spawn_slab() called outside of Runtime::block_on"
94        );
95        let executor = unsafe { &mut *ptr };
96        let tracker_key = executor.next_tracker_key();
97        let task_ptr = crate::alloc::slab_spawn(future, tracker_key);
98        executor.spawn_raw(task_ptr);
99        JoinHandle::new(task_ptr)
100    })
101}
102
103/// Access the current executor via TLS. Panics if outside `block_on`.
104pub(crate) fn with_executor<R>(f: impl FnOnce(&mut Executor) -> R) -> R {
105    CURRENT.with(|cell| {
106        let ptr = cell.get();
107        assert!(!ptr.is_null(), "called outside of Runtime::block_on");
108        let executor = unsafe { &mut *ptr };
109        f(executor)
110    })
111}
112
113/// Try to reserve a slab slot. Returns `None` if the slab is full.
114///
115/// Call `.spawn(future)` on the returned [`SlabClaim`](crate::alloc::SlabClaim)
116/// to write a task and enqueue it. If dropped without spawning, the
117/// slot is returned to the freelist automatically.
118///
119/// # Panics
120///
121/// - If called outside a runtime context.
122/// - If no slab is configured.
123pub fn try_claim_slab() -> Option<crate::alloc::SlabClaim> {
124    CURRENT.with(|cell| {
125        assert!(
126            !cell.get().is_null(),
127            "try_claim_slab() called outside of Runtime::block_on"
128        );
129    });
130    crate::alloc::try_claim()
131}
132
133/// Reserve a slab slot. Panics if full or no slab configured.
134///
135/// Call `.spawn(future)` on the returned [`SlabClaim`](crate::alloc::SlabClaim)
136/// to write a task and enqueue it. If dropped without spawning, the
137/// slot is returned to the freelist automatically.
138///
139/// # Panics
140///
141/// - If called outside a runtime context.
142/// - If no slab is configured.
143/// - If the slab is full (bounded slab).
144pub fn claim_slab() -> crate::alloc::SlabClaim {
145    CURRENT.with(|cell| {
146        assert!(
147            !cell.get().is_null(),
148            "claim_slab() called outside of Runtime::block_on"
149        );
150    });
151    crate::alloc::claim()
152}
153
154// =============================================================================
155// Runtime
156// =============================================================================
157
/// Single-threaded async runtime.
///
/// Bundles the task executor, the IO and timer drivers, the world handle,
/// and the shutdown / cross-thread wake state that `run_loop` drives.
///
/// # Examples
///
/// ```ignore
/// use nexus_async_rt::{Runtime, spawn_boxed, spawn_slab};
/// use nexus_slab::byte::unbounded::Slab;
/// use nexus_rt::WorldBuilder;
///
/// let mut world = WorldBuilder::new().build();
///
/// // Simple — Box-allocated tasks
/// let mut rt = Runtime::new(&mut world);
/// rt.block_on(async {
///     spawn_boxed(async { /* Box-allocated */ });
/// });
///
/// // With slab for hot-path tasks
/// let slab = unsafe { Slab::<256>::with_chunk_capacity(64) };
/// let mut rt = Runtime::builder(&mut world)
///     .slab_unbounded(slab)
///     .build();
/// rt.block_on(async {
///     spawn_boxed(async { /* Box-allocated */ });
///     spawn_slab(async { /* slab-allocated */ });
/// });
/// ```
pub struct Runtime {
    /// Spawned task storage.
    executor: Executor,

    /// IO driver (mio).
    io: IoDriver,

    /// Timer driver.
    timers: TimerDriver,

    /// World access handle.
    ctx: WorldCtx,

    /// Event-cycle timestamp. Refreshed by `run_loop` on entry and after
    /// each IO poll.
    event_time: Cell<Instant>,

    /// Graceful shutdown handle.
    shutdown: crate::ShutdownHandle,

    /// Cross-thread wake context. Shared with cross-thread wakers via Arc.
    /// Contains the intrusive MPSC inbox + mio::Waker for eventfd.
    cross_wake: std::sync::Arc<crate::cross_wake::CrossWakeContext>,

    /// Max cross-thread wakes drained per poll cycle.
    cross_thread_drain_limit: usize,

    /// Loop iterations between non-blocking IO polls.
    event_interval: u32,

    /// Optional slab allocator. Stored as a boxed trait object for
    /// type erasure (the const generic lives inside). The slab itself
    /// is accessed via TLS fn pointers — this field just owns the memory.
    ///
    /// NOTE(review): fields drop in declaration order, so `executor` is
    /// dropped before this slab. That ordering looks intentional (tasks may
    /// live in slab slots) — confirm before reordering fields.
    _slab: Option<Box<dyn std::any::Any>>,

    /// Slab TLS config for deferred installation in run_loop.
    /// None = no slab (Box-only).
    slab_tls: Option<crate::alloc::SlabTlsConfig>,
}
223
224impl Runtime {
225    /// Create a runtime with default settings. Box-allocated tasks only.
226    ///
227    /// For slab allocation or custom configuration, use [`Runtime::builder`].
228    pub fn new(world: &mut nexus_rt::World) -> Self {
229        RuntimeBuilder::new(world).build()
230    }
231
232    /// Create a runtime via the builder pattern.
233    pub fn builder(world: &mut nexus_rt::World) -> RuntimeBuilder<'_> {
234        RuntimeBuilder::new(world)
235    }
236
237    /// Returns a [`ShutdownHandle`](crate::ShutdownHandle) for triggering or observing shutdown.
238    pub fn shutdown_handle(&self) -> crate::ShutdownHandle {
239        self.shutdown.clone()
240    }
241
242    /// Install signal handlers for SIGTERM and SIGINT.
243    pub fn install_signal_handlers(&self) {
244        crate::shutdown::install_signal_handlers(&self.shutdown.flag_ptr(), &self.io.mio_waker());
245    }
246
247    /// Number of live spawned tasks.
248    pub fn task_count(&self) -> usize {
249        self.executor.task_count()
250    }
251}
252
253// =============================================================================
254// RuntimeBuilder
255// =============================================================================
256
/// Type-erased closure that boxes the slab and returns (ownership, TLS config).
///
/// Erasing the closure keeps the slab's const-generic slot size out of
/// [`RuntimeBuilder`]'s type. `RuntimeBuilder::build` calls it at most once
/// and stores both halves of the result in the [`Runtime`].
type SlabInstaller = Box<dyn FnOnce() -> (Box<dyn std::any::Any>, crate::alloc::SlabTlsConfig)>;
259
/// Builder for configuring a [`Runtime`].
///
/// Obtained from [`Runtime::builder`]; every setter consumes and returns the
/// builder so calls can be chained, and [`RuntimeBuilder::build`] produces
/// the runtime.
///
/// # Examples
///
/// ```ignore
/// use nexus_async_rt::*;
/// use nexus_slab::byte::unbounded::Slab;
///
/// let mut world = nexus_rt::WorldBuilder::new().build();
/// let slab = unsafe { Slab::<256>::with_chunk_capacity(64) };
///
/// let mut rt = Runtime::builder(&mut world)
///     .tasks_per_cycle(128)
///     .slab_unbounded(slab)
///     .signal_handlers(true)
///     .build();
/// ```
pub struct RuntimeBuilder<'w> {
    /// World the runtime will operate on; handed to `WorldCtx::new` in `build`.
    world: &'w mut nexus_rt::World,
    /// Forwarded to `Executor::set_tasks_per_cycle` in `build`.
    /// Defaults to `crate::DEFAULT_TASKS_PER_CYCLE`.
    tasks_per_cycle: usize,
    /// Copied into the runtime's `cross_thread_drain_limit`.
    /// Defaults to `usize::MAX`.
    cross_thread_drain_limit: usize,
    /// Loop iterations between non-blocking IO polls.
    /// Defaults to `DEFAULT_EVENT_INTERVAL`.
    event_interval: u32,
    /// Passed to `Executor::new` in `build`. Defaults to 64.
    queue_capacity: usize,
    /// First argument to `IoDriver::new` in `build`. Defaults to 1024.
    event_capacity: usize,
    /// Second argument to `IoDriver::new` in `build`. Defaults to 64.
    token_capacity: usize,
    /// Whether `build` calls `Runtime::install_signal_handlers`.
    /// Defaults to `false`.
    signal_handlers: bool,
    /// Type-erased slab + guard installer. None = no slab (Box-only).
    slab_installer: Option<SlabInstaller>,
}
289
290impl<'w> RuntimeBuilder<'w> {
291    fn new(world: &'w mut nexus_rt::World) -> Self {
292        Self {
293            world,
294            tasks_per_cycle: crate::DEFAULT_TASKS_PER_CYCLE,
295            cross_thread_drain_limit: usize::MAX,
296            event_interval: DEFAULT_EVENT_INTERVAL,
297            queue_capacity: 64,
298            event_capacity: 1024,
299            token_capacity: 64,
300            signal_handlers: false,
301            slab_installer: None,
302        }
303    }
304
305    /// Maximum tasks polled per cycle before yielding to check IO.
306    /// Default: 64.
307    pub fn tasks_per_cycle(mut self, limit: usize) -> Self {
308        self.tasks_per_cycle = limit;
309        self
310    }
311
312    /// Number of loop iterations between non-blocking IO driver polls.
313    /// Default: 61 (matches tokio's heuristic).
314    ///
315    /// Every `event_interval` iterations the runtime does a non-blocking
316    /// `epoll_wait(0)` to check for socket events, even if tasks are
317    /// ready. Lower values improve IO responsiveness at the cost of
318    /// more syscalls; higher values favor task throughput.
319    pub fn event_interval(mut self, n: u32) -> Self {
320        assert!(n > 0, "event_interval must be > 0");
321        self.event_interval = n;
322        self
323    }
324
325    /// Maximum cross-thread wakes drained per poll cycle.
326    /// Default: unlimited.
327    ///
328    /// Caps how many tasks woken from other threads are moved into the
329    /// local ready queue per iteration. Prevents a firehose of
330    /// cross-thread wakes from starving local tasks and IO. Remaining
331    /// wakes are drained on the next iteration.
332    pub fn cross_thread_drain_limit(mut self, limit: usize) -> Self {
333        self.cross_thread_drain_limit = limit;
334        self
335    }
336
337    /// Pre-allocated capacity for internal queues. Default: 64.
338    pub fn queue_capacity(mut self, cap: usize) -> Self {
339        self.queue_capacity = cap;
340        self
341    }
342
343    /// Maximum IO events processed per epoll cycle. Default: 1024.
344    pub fn event_capacity(mut self, cap: usize) -> Self {
345        self.event_capacity = cap;
346        self
347    }
348
349    /// Initial number of IO source slots. Default: 64.
350    pub fn token_capacity(mut self, cap: usize) -> Self {
351        self.token_capacity = cap;
352        self
353    }
354
355    /// Install SIGTERM/SIGINT signal handlers. Default: false.
356    pub fn signal_handlers(mut self, enable: bool) -> Self {
357        self.signal_handlers = enable;
358        self
359    }
360
361    /// Hand off a growable (unbounded) slab for [`spawn_slab`].
362    ///
363    /// `S` is the total slot size in bytes. The task header uses 64 bytes,
364    /// so `Slab<256>` gives 192 bytes for the future. Most async IO
365    /// futures are 128–256 bytes — `Slab<256>` or `Slab<512>` covers
366    /// the common cases.
367    ///
368    /// The slab grows by allocating new chunks when full. No task spawn
369    /// will ever fail due to capacity.
370    ///
371    /// # Examples
372    ///
373    /// ```ignore
374    /// use nexus_slab::byte::unbounded::Slab;
375    ///
376    /// // SAFETY: single-threaded runtime.
377    /// let slab = unsafe { Slab::<256>::with_chunk_capacity(64) };
378    ///
379    /// let mut rt = Runtime::builder(&mut world)
380    ///     .slab_unbounded(slab)
381    ///     .build();
382    /// ```
383    pub fn slab_unbounded<const S: usize>(
384        mut self,
385        slab: nexus_slab::byte::unbounded::Slab<S>,
386    ) -> Self {
387        const {
388            assert!(
389                S >= 64,
390                "slab slot size must be at least 64 bytes (TASK_HEADER_SIZE)"
391            );
392        }
393        self.slab_installer = Some(Box::new(move || {
394            let mut slab = Box::new(slab);
395            // Derive pointer via &mut to get write provenance. Using &ref
396            // gives read-only provenance under stacked borrows, but the
397            // allocator writes through this pointer.
398            let slab_ptr = std::ptr::from_mut(slab.as_mut()).cast::<u8>();
399            let config = crate::alloc::make_unbounded_config::<S>(slab_ptr);
400            (slab as Box<dyn std::any::Any>, config)
401        }));
402        self
403    }
404
405    /// Hand off a fixed-capacity (bounded) slab for [`spawn_slab`].
406    ///
407    /// `S` is the total slot size in bytes. The slab has a fixed number
408    /// of slots — `spawn_slab` panics if the slab is full. Use this
409    /// when you want deterministic memory usage and know the maximum
410    /// number of concurrent hot-path tasks.
411    ///
412    /// # Examples
413    ///
414    /// ```ignore
415    /// use nexus_slab::byte::bounded::Slab;
416    ///
417    /// // SAFETY: single-threaded runtime.
418    /// let slab = unsafe { Slab::<256>::with_capacity(64) };
419    ///
420    /// let mut rt = Runtime::builder(&mut world)
421    ///     .slab_bounded(slab)
422    ///     .build();
423    /// ```
424    pub fn slab_bounded<const S: usize>(
425        mut self,
426        slab: nexus_slab::byte::bounded::Slab<S>,
427    ) -> Self {
428        const {
429            assert!(
430                S >= 64,
431                "slab slot size must be at least 64 bytes (TASK_HEADER_SIZE)"
432            );
433        }
434        self.slab_installer = Some(Box::new(move || {
435            let mut slab = Box::new(slab);
436            // Derive pointer via &mut to get write provenance. Using &ref
437            // gives read-only provenance under stacked borrows, but the
438            // allocator writes through this pointer.
439            let slab_ptr = std::ptr::from_mut(slab.as_mut()).cast::<u8>();
440            let config = crate::alloc::make_bounded_config::<S>(slab_ptr);
441            (slab as Box<dyn std::any::Any>, config)
442        }));
443        self
444    }
445
446    /// Build the runtime.
447    pub fn build(self) -> Runtime {
448        let io = IoDriver::new(self.event_capacity, self.token_capacity)
449            .expect("failed to create mio::Poll");
450        let mut shutdown = crate::ShutdownHandle::new();
451        shutdown.set_mio_waker(io.mio_waker());
452
453        let mut executor = Executor::new(self.queue_capacity);
454        executor.set_tasks_per_cycle(self.tasks_per_cycle);
455
456        let ctx = WorldCtx::new(self.world);
457        let event_time = Cell::new(Instant::now());
458
459        // Create slab if configured. TLS is installed later in run_loop.
460        let (slab, slab_tls) = self.slab_installer.map_or((None, None), |install| {
461            let (slab, config) = install();
462            (Some(slab), Some(config))
463        });
464
465        let cross_wake = std::sync::Arc::new(crate::cross_wake::CrossWakeContext {
466            queue: crate::cross_wake::CrossWakeQueue::new(),
467            mio_waker: io.mio_waker(),
468            parked: std::sync::atomic::AtomicBool::new(false),
469        });
470
471        let rt = Runtime {
472            executor,
473            io,
474            timers: TimerDriver::new(64),
475            ctx,
476            event_time,
477            shutdown,
478            cross_wake,
479            cross_thread_drain_limit: self.cross_thread_drain_limit,
480            event_interval: self.event_interval,
481            _slab: slab,
482            slab_tls,
483        };
484
485        if self.signal_handlers {
486            rt.install_signal_handlers();
487        }
488
489        rt
490    }
491}
492
493// =============================================================================
494// block_on / run_loop
495// =============================================================================
496
impl Runtime {
    /// Drive the root future to completion. CPU-friendly.
    ///
    /// Parks the thread when no work is available.
    ///
    /// While this call is running, the thread-local spawn functions
    /// ([`spawn_boxed`], [`spawn_slab`]) target this runtime.
    pub fn block_on<F>(&mut self, future: F) -> F::Output
    where
        F: Future + 'static,
    {
        self.run_loop(future, ParkMode::Park)
    }

    /// Drive the root future to completion. Busy-wait.
    ///
    /// Never parks. Minimum wake latency at 100% CPU.
    ///
    /// While this call is running, the thread-local spawn functions
    /// ([`spawn_boxed`], [`spawn_slab`]) target this runtime.
    pub fn block_on_busy<F>(&mut self, future: F) -> F::Output
    where
        F: Future + 'static,
    {
        self.run_loop(future, ParkMode::Spin)
    }

    /// Shared event loop behind `block_on` and `block_on_busy`.
    ///
    /// Installs the thread-local contexts, boxes the root future, then
    /// repeats: poll root → drain cross-thread wakes → poll ready tasks →
    /// fire timers → drain again → periodic IO check → park or spin.
    /// Returns as soon as the root future yields `Poll::Ready`.
    ///
    /// The TLS guards are plain locals, so they are dropped when this
    /// function returns or unwinds.
    ///
    /// # Panics
    ///
    /// - If the IO driver's poll fails with anything other than
    ///   `ErrorKind::Interrupted`.
    fn run_loop<F>(&mut self, future: F, mode: ParkMode) -> F::Output
    where
        F: Future + 'static,
    {
        // Install TLS context.
        let _ctx_guard = crate::context::install(
            self.ctx.as_ptr(),
            &raw mut self.io,
            &raw mut self.timers,
            &raw const self.event_time,
            std::sync::Arc::as_ptr(&self.shutdown.flag_ptr()),
            std::ptr::from_ref(&self.shutdown.task_waker),
        );

        // Install slab TLS if configured (scoped to run_loop).
        let _slab_guard = self.slab_tls.as_ref().map(crate::alloc::install_slab);

        // Install cross-thread wake context in TLS.
        let _cross_wake_guard = crate::cross_wake::install_cross_wake(&self.cross_wake);

        let mut root: Pin<Box<dyn Future<Output = F::Output>>> = Box::pin(future);

        // `woken` starts out true so the root future is polled on the very
        // first loop iteration.
        let woken = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(true));
        let root_waker = Waker::from(std::sync::Arc::new(RootWake {
            woken: std::sync::Arc::clone(&woken),
            mio_waker: self.io.mio_waker(),
        }));
        let mut root_cx = Context::from_waker(&root_waker);

        // Install spawn TLS.
        let _spawn_guard = RuntimeGuard::enter(&raw mut self.executor);

        // Install waker TLS: ready queue + deferred free list.
        let (ready, deferred) = self.executor.poll_context_mut();
        let _ready_guard = crate::waker::set_poll_context(ready, deferred);

        self.event_time.set(Instant::now());

        // The cross-thread queue uses interior mutability (UnsafeCell)
        // for the consumer head. pop() takes &self, so a shared ref
        // from the Arc is sufficient. No unsafe cast needed.
        let cross_queue = &*self.cross_wake;

        // Iteration counter for the periodic IO check; wraps on overflow.
        let mut tick: u32 = 0;

        loop {
            // 1. Poll root future if woken or shutdown requested.
            if woken.swap(false, std::sync::atomic::Ordering::Acquire)
                || self.shutdown.is_shutdown()
            {
                match root.as_mut().poll(&mut root_cx) {
                    Poll::Ready(output) => return output,
                    Poll::Pending => {}
                }
            }

            // 2. Drain cross-thread inbox.
            self.executor
                .drain_cross_thread(&cross_queue.queue, self.cross_thread_drain_limit);

            // 3. Poll ready tasks (up to tasks_per_cycle).
            self.executor.poll();

            // 4. Fire expired timers.
            self.timers.fire_expired(Instant::now());

            // 4.5. Set parked early (park mode only) so cross-thread
            // wakers arriving from here on will poke the eventfd.
            //
            // NOTE(review): this is a Release store followed by the queue
            // reads in step 5. Whether that rules out a missed wake depends
            // on the ordering used by the producer side and inside the
            // queue, neither of which is visible here — confirm.
            if matches!(mode, ParkMode::Park) {
                cross_queue
                    .parked
                    .store(true, std::sync::atomic::Ordering::Release);
            }

            // 5. Drain cross-thread inbox again (wakes during step 3/4).
            self.executor
                .drain_cross_thread(&cross_queue.queue, self.cross_thread_drain_limit);

            tick = tick.wrapping_add(1);

            // 6. Periodic non-blocking IO check every event_interval ticks.
            //    Prevents IO starvation under sustained task load.
            //    (`RuntimeBuilder::event_interval` rejects zero, so the
            //    modulo cannot divide by zero for builder-made runtimes.)
            if tick % self.event_interval == 0 {
                if let Err(e) = self.io.poll_io(Some(Duration::ZERO)) {
                    assert!(
                        e.kind() == std::io::ErrorKind::Interrupted,
                        "mio::Poll::poll failed: {e}"
                    );
                }
                self.event_time.set(Instant::now());
            }

            // 7. If work remains, loop immediately.
            let has_work =
                self.executor.has_ready() || woken.load(std::sync::atomic::Ordering::Acquire);

            if has_work {
                // Not going to sleep after all: clear the flag set in 4.5.
                if matches!(mode, ParkMode::Park) {
                    cross_queue
                        .parked
                        .store(false, std::sync::atomic::Ordering::Release);
                }
                continue;
            }

            // 8. No work. Spin mode loops; park mode sleeps in epoll.
            match mode {
                ParkMode::Spin => {
                    // Non-blocking IO check before spinning again.
                    if let Err(e) = self.io.poll_io(Some(Duration::ZERO)) {
                        assert!(
                            e.kind() == std::io::ErrorKind::Interrupted,
                            "mio::Poll::poll failed: {e}"
                        );
                    }
                    self.event_time.set(Instant::now());
                }
                ParkMode::Park => {
                    // parked is already true (set at step 4.5).
                    // Park in epoll_wait until IO, timer, or cross-thread
                    // eventfd wakes us.
                    //
                    // `timeout` is `None` when no timer deadline is pending;
                    // an already-passed deadline saturates to zero.
                    let timeout = self
                        .timers
                        .next_deadline()
                        .map(|d| d.saturating_duration_since(Instant::now()));

                    if let Err(e) = self.io.poll_io(timeout) {
                        assert!(
                            e.kind() == std::io::ErrorKind::Interrupted,
                            "mio::Poll::poll failed: {e}"
                        );
                    }

                    cross_queue
                        .parked
                        .store(false, std::sync::atomic::Ordering::Release);
                    self.event_time.set(Instant::now());
                }
            }
        }
    }
}
660
661// =============================================================================
662// Park mode
663// =============================================================================
664
/// What `Runtime::run_loop` does when an iteration finds no ready work.
#[derive(Clone, Copy)]
enum ParkMode {
    /// Block in the IO driver until IO, a timer deadline, or a wake arrives.
    /// Selected by `Runtime::block_on`.
    Park,
    /// Do a non-blocking IO poll and loop again without sleeping.
    /// Selected by `Runtime::block_on_busy`.
    Spin,
}
670
671// =============================================================================
672// Root future waker
673// =============================================================================
674
675struct RootWake {
676    woken: std::sync::Arc<std::sync::atomic::AtomicBool>,
677    mio_waker: std::sync::Arc<mio::Waker>,
678}
679
680impl Wake for RootWake {
681    fn wake(self: std::sync::Arc<Self>) {
682        self.wake_by_ref();
683    }
684
685    fn wake_by_ref(self: &std::sync::Arc<Self>) {
686        let was_woken = self.woken.swap(true, std::sync::atomic::Ordering::Release);
687        if !was_woken {
688            let _ = self.mio_waker.wake();
689        }
690    }
691}
692
693// =============================================================================
694// RAII guard for spawn TLS
695// =============================================================================
696
697struct RuntimeGuard {
698    prev: *mut Executor,
699}
700
701impl RuntimeGuard {
702    fn enter(executor: *mut Executor) -> Self {
703        let prev = CURRENT.with(|cell| cell.replace(executor));
704        Self { prev }
705    }
706}
707
708impl Drop for RuntimeGuard {
709    fn drop(&mut self) {
710        CURRENT.with(|cell| cell.set(self.prev));
711    }
712}
713
714// =============================================================================
715// Tests
716// =============================================================================
717
718#[cfg(test)]
719mod tests {
720    use super::*;
721    use nexus_rt::{Handler, IntoHandler, Res, ResMut, WorldBuilder};
722
723    nexus_rt::new_resource!(Val(u64));
724    nexus_rt::new_resource!(Out(u64));
725
726    #[test]
727    fn block_on_returns_value() {
728        let mut wb = WorldBuilder::new();
729        wb.register(Val(42));
730        let mut world = wb.build();
731
732        let mut rt = Runtime::new(&mut world);
733        let result = rt.block_on(async { 42u64 });
734        assert_eq!(result, 42);
735    }
736
737    #[test]
738    fn block_on_with_world_access() {
739        let mut wb = WorldBuilder::new();
740        wb.register(Val(42));
741        wb.register(Out(0));
742        let mut world = wb.build();
743
744        let mut rt = Runtime::new(&mut world);
745
746        let result = rt.block_on(async move {
747            crate::context::with_world(|world| {
748                let v = world.resource::<Val>().0;
749                world.resource_mut::<Out>().0 = v + 10;
750            });
751            crate::context::with_world_ref(|world| world.resource::<Out>().0)
752        });
753
754        assert_eq!(result, 52);
755    }
756
757    #[test]
758    fn block_on_with_pre_resolved_handler() {
759        let mut wb = WorldBuilder::new();
760        wb.register(Val(42));
761        wb.register(Out(0));
762        let mut world = wb.build();
763
764        let mut rt = Runtime::new(&mut world);
765
766        let mut h = (|val: Res<Val>, mut out: ResMut<Out>, event: u64| {
767            out.0 = val.0 + event;
768        })
769        .into_handler(world.registry());
770
771        let result = rt.block_on(async move {
772            crate::context::with_world(|world| h.run(world, 10));
773            crate::context::with_world_ref(|world| world.resource::<Out>().0)
774        });
775
776        assert_eq!(result, 52);
777    }
778
779    #[test]
780    fn spawn_from_root_future() {
781        let mut wb = WorldBuilder::new();
782        wb.register(Out(0));
783        let mut world = wb.build();
784
785        let mut rt = Runtime::new(&mut world);
786
787        rt.block_on(async move {
788            for i in 1..=3u64 {
789                spawn_boxed(async move {
790                    crate::context::with_world(|world| {
791                        world.resource_mut::<Out>().0 += i;
792                    });
793                });
794            }
795
796            YieldOnce(false).await;
797        });
798
799        assert_eq!(world.resource::<Out>().0, 6);
800    }
801
802    #[test]
803    fn block_on_busy_returns_value() {
804        let mut wb = WorldBuilder::new();
805        wb.register(Val(7));
806        let mut world = wb.build();
807
808        let mut rt = Runtime::new(&mut world);
809        let result = rt.block_on_busy(async { 6 * 7 });
810        assert_eq!(result, 42);
811    }
812
813    #[test]
814    fn block_on_busy_with_spawned_tasks() {
815        let mut wb = WorldBuilder::new();
816        wb.register(Out(0));
817        let mut world = wb.build();
818
819        let mut rt = Runtime::new(&mut world);
820
821        rt.block_on_busy(async move {
822            spawn_boxed(async move {
823                crate::context::with_world(|world| {
824                    world.resource_mut::<Out>().0 = 99;
825                });
826            });
827
828            YieldOnce(false).await;
829        });
830
831        assert_eq!(world.resource::<Out>().0, 99);
832    }
833
834    #[test]
835    fn event_time_is_set() {
836        let mut wb = WorldBuilder::new();
837        wb.register(Val(0));
838        let mut world = wb.build();
839
840        let mut rt = Runtime::new(&mut world);
841
842        let before = Instant::now();
843        rt.block_on(async move {
844            let t = crate::context::event_time();
845            assert!(t >= before);
846        });
847    }
848
849    #[test]
850    #[should_panic(expected = "spawn_boxed() called outside of Runtime::block_on")]
851    fn spawn_outside_runtime_panics() {
852        spawn_boxed(async {});
853    }
854
855    fn test_slab() -> nexus_slab::byte::unbounded::Slab<256> {
856        // SAFETY: single-threaded test.
857        unsafe { nexus_slab::byte::unbounded::Slab::with_chunk_capacity(16) }
858    }
859
860    #[test]
861    #[should_panic(expected = "spawn_slab() called without a slab")]
862    fn spawn_slab_without_slab_panics() {
863        let mut wb = WorldBuilder::new();
864        let mut world = wb.build();
865        let mut rt = Runtime::new(&mut world);
866
867        rt.block_on(async {
868            spawn_slab(async {});
869        });
870    }
871
872    #[test]
873    fn spawn_slab_with_slab() {
874        let mut wb = WorldBuilder::new();
875        wb.register(Out(0));
876        let mut world = wb.build();
877
878        let mut rt = Runtime::builder(&mut world)
879            .slab_unbounded(test_slab())
880            .build();
881
882        rt.block_on(async move {
883            spawn_slab(async move {
884                crate::context::with_world(|world| {
885                    world.resource_mut::<Out>().0 = 77;
886                });
887            });
888
889            YieldOnce(false).await;
890        });
891
892        assert_eq!(world.resource::<Out>().0, 77);
893    }
894
895    #[test]
896    fn mixed_spawn_and_spawn_slab() {
897        let mut wb = WorldBuilder::new();
898        wb.register(Out(0));
899        let mut world = wb.build();
900
901        let mut rt = Runtime::builder(&mut world)
902            .slab_unbounded(test_slab())
903            .build();
904
905        rt.block_on(async move {
906            // Box-allocated
907            spawn_boxed(async move {
908                crate::context::with_world(|world| {
909                    world.resource_mut::<Out>().0 += 10;
910                });
911            });
912            // Slab-allocated
913            spawn_slab(async move {
914                crate::context::with_world(|world| {
915                    world.resource_mut::<Out>().0 += 20;
916                });
917            });
918
919            YieldOnce(false).await;
920        });
921
922        assert_eq!(world.resource::<Out>().0, 30);
923    }
924
925    // =========================================================================
926    // Claim API tests
927    // =========================================================================
928
929    #[test]
930    fn claim_slab_spawn_executes() {
931        let mut wb = WorldBuilder::new();
932        wb.register(Out(0));
933        let mut world = wb.build();
934
935        let mut rt = Runtime::builder(&mut world)
936            .slab_unbounded(test_slab())
937            .build();
938
939        rt.block_on(async move {
940            let claim = claim_slab();
941            claim.spawn(async move {
942                crate::context::with_world(|world| {
943                    world.resource_mut::<Out>().0 = 55;
944                });
945            });
946
947            YieldOnce(false).await;
948        });
949
950        assert_eq!(world.resource::<Out>().0, 55);
951    }
952
953    #[test]
954    fn claim_slab_drop_returns_slot() {
955        let mut wb = WorldBuilder::new();
956        let mut world = wb.build();
957
958        let bounded = unsafe { nexus_slab::byte::bounded::Slab::<256>::with_capacity(1) };
959        let mut rt = Runtime::builder(&mut world).slab_bounded(bounded).build();
960
961        rt.block_on(async {
962            // Claim the only slot, then drop without spawning.
963            let claim = claim_slab();
964            drop(claim);
965
966            // Slot should be back — can claim again.
967            let claim = claim_slab();
968            claim.spawn(async {});
969
970            YieldOnce(false).await;
971        });
972    }
973
974    #[test]
975    fn try_claim_slab_returns_none_when_full() {
976        let mut wb = WorldBuilder::new();
977        let mut world = wb.build();
978
979        let bounded = unsafe { nexus_slab::byte::bounded::Slab::<256>::with_capacity(1) };
980        let mut rt = Runtime::builder(&mut world).slab_bounded(bounded).build();
981
982        rt.block_on(async {
983            let _held = claim_slab(); // hold the only slot
984            assert!(try_claim_slab().is_none());
985        });
986    }
987
988    #[test]
989    fn mixed_spawn_boxed_and_claim_slab() {
990        let mut wb = WorldBuilder::new();
991        wb.register(Out(0));
992        let mut world = wb.build();
993
994        let mut rt = Runtime::builder(&mut world)
995            .slab_unbounded(test_slab())
996            .build();
997
998        rt.block_on(async move {
999            spawn_boxed(async move {
1000                crate::context::with_world(|world| {
1001                    world.resource_mut::<Out>().0 += 10;
1002                });
1003            });
1004
1005            let claim = claim_slab();
1006            claim.spawn(async move {
1007                crate::context::with_world(|world| {
1008                    world.resource_mut::<Out>().0 += 20;
1009                });
1010            });
1011
1012            YieldOnce(false).await;
1013        });
1014
1015        assert_eq!(world.resource::<Out>().0, 30);
1016    }
1017
1018    // =========================================================================
1019    // Timer tests
1020    // =========================================================================
1021
1022    #[test]
1023    fn sleep_completes() {
1024        let mut wb = WorldBuilder::new();
1025        wb.register(Out(0));
1026        let mut world = wb.build();
1027
1028        let mut rt = Runtime::new(&mut world);
1029
1030        let before = Instant::now();
1031        rt.block_on(async move {
1032            crate::context::sleep(Duration::from_millis(50)).await;
1033        });
1034        let elapsed = before.elapsed();
1035
1036        assert!(
1037            elapsed >= Duration::from_millis(40),
1038            "elapsed {elapsed:?} too short"
1039        );
1040        assert!(
1041            elapsed < Duration::from_millis(200),
1042            "elapsed {elapsed:?} too long"
1043        );
1044    }
1045
1046    #[test]
1047    fn sleep_in_spawned_task() {
1048        let mut wb = WorldBuilder::new();
1049        wb.register(Out(0));
1050        let mut world = wb.build();
1051
1052        let mut rt = Runtime::new(&mut world);
1053
1054        let before = Instant::now();
1055        rt.block_on(async move {
1056            spawn_boxed(async move {
1057                crate::context::sleep(Duration::from_millis(50)).await;
1058                crate::context::with_world(|world| {
1059                    world.resource_mut::<Out>().0 = 42;
1060                });
1061            });
1062
1063            crate::context::sleep(Duration::from_millis(100)).await;
1064        });
1065
1066        let elapsed = before.elapsed();
1067        assert!(elapsed >= Duration::from_millis(80));
1068        assert_eq!(world.resource::<Out>().0, 42);
1069    }
1070
1071    #[test]
1072    fn sleep_zero_duration_ready_immediately() {
1073        let mut wb = WorldBuilder::new();
1074        let mut world = wb.build();
1075        let mut rt = Runtime::new(&mut world);
1076
1077        let before = Instant::now();
1078        rt.block_on(async move {
1079            crate::context::sleep(Duration::ZERO).await;
1080        });
1081        assert!(before.elapsed() < Duration::from_millis(10));
1082    }
1083
1084    #[test]
1085    fn sleep_past_deadline_ready_immediately() {
1086        let mut wb = WorldBuilder::new();
1087        let mut world = wb.build();
1088        let mut rt = Runtime::new(&mut world);
1089
1090        let past = Instant::now() - Duration::from_secs(1);
1091        let before = Instant::now();
1092        rt.block_on(async move {
1093            crate::context::sleep_until(past).await;
1094        });
1095        assert!(before.elapsed() < Duration::from_millis(10));
1096    }
1097
1098    // =========================================================================
1099    // Timeout tests
1100    // =========================================================================
1101
1102    #[test]
1103    fn timeout_completes_before_deadline() {
1104        let mut wb = WorldBuilder::new();
1105        let mut world = wb.build();
1106        let mut rt = Runtime::new(&mut world);
1107
1108        let result = rt.block_on(async {
1109            crate::context::timeout(Duration::from_millis(500), async { 42u64 }).await
1110        });
1111
1112        assert_eq!(result.unwrap(), 42);
1113    }
1114
1115    #[test]
1116    fn timeout_expires() {
1117        let mut wb = WorldBuilder::new();
1118        let mut world = wb.build();
1119        let mut rt = Runtime::new(&mut world);
1120
1121        let result = rt.block_on(async {
1122            crate::context::timeout(
1123                Duration::from_millis(10),
1124                crate::context::sleep(Duration::from_secs(10)),
1125            )
1126            .await
1127        });
1128
1129        assert!(result.is_err());
1130    }
1131
1132    // =========================================================================
1133    // Interval tests
1134    // =========================================================================
1135
1136    #[test]
1137    fn interval_ticks() {
1138        let mut wb = WorldBuilder::new();
1139        wb.register(Out(0));
1140        let mut world = wb.build();
1141        let mut rt = Runtime::new(&mut world);
1142
1143        let before = Instant::now();
1144        rt.block_on(async move {
1145            let mut iv = crate::context::interval(Duration::from_millis(20));
1146            iv.tick().await; // ~20ms
1147            iv.tick().await; // ~40ms
1148            iv.tick().await; // ~60ms
1149        });
1150        let elapsed = before.elapsed();
1151
1152        assert!(
1153            elapsed >= Duration::from_millis(50),
1154            "too fast: {elapsed:?}"
1155        );
1156        assert!(
1157            elapsed < Duration::from_millis(200),
1158            "too slow: {elapsed:?}"
1159        );
1160    }
1161
1162    // =========================================================================
1163    // yield_now tests
1164    // =========================================================================
1165
1166    #[test]
1167    fn yield_now_lets_other_tasks_run() {
1168        let mut wb = WorldBuilder::new();
1169        wb.register(Out(0));
1170        let mut world = wb.build();
1171        let mut rt = Runtime::new(&mut world);
1172
1173        rt.block_on(async move {
1174            spawn_boxed(async move {
1175                crate::context::with_world(|world| {
1176                    world.resource_mut::<Out>().0 = 99;
1177                });
1178            });
1179
1180            // Yield so the spawned task gets a turn.
1181            crate::context::yield_now().await;
1182
1183            let val = crate::context::with_world_ref(|world| world.resource::<Out>().0);
1184            assert_eq!(val, 99);
1185        });
1186    }
1187
1188    // =========================================================================
1189    // Test helpers
1190    // =========================================================================
1191
1192    struct YieldOnce(bool);
1193
1194    impl Future for YieldOnce {
1195        type Output = ();
1196        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
1197            if self.0 {
1198                Poll::Ready(())
1199            } else {
1200                self.0 = true;
1201                cx.waker().wake_by_ref();
1202                Poll::Pending
1203            }
1204        }
1205    }
1206
1207    // =========================================================================
1208    // JoinHandle tests
1209    // =========================================================================
1210
1211    #[test]
1212    fn join_handle_await_gets_value() {
1213        let wb = WorldBuilder::new();
1214        let mut world = wb.build();
1215        let mut rt = Runtime::new(&mut world);
1216
1217        rt.block_on(async {
1218            let handle = spawn_boxed(async { 42u64 });
1219            let result = handle.await;
1220            assert_eq!(result, 42);
1221        });
1222    }
1223
1224    #[test]
1225    fn join_handle_await_string() {
1226        let wb = WorldBuilder::new();
1227        let mut world = wb.build();
1228        let mut rt = Runtime::new(&mut world);
1229
1230        rt.block_on(async {
1231            let handle = spawn_boxed(async { String::from("hello world") });
1232            let result = handle.await;
1233            assert_eq!(result, "hello world");
1234        });
1235    }
1236
1237    #[test]
1238    fn join_handle_detach() {
1239        use std::cell::Cell;
1240        use std::rc::Rc;
1241
1242        let wb = WorldBuilder::new();
1243        let mut world = wb.build();
1244        let mut rt = Runtime::new(&mut world);
1245
1246        let ran = Rc::new(Cell::new(false));
1247        let r = ran.clone();
1248
1249        rt.block_on(async move {
1250            // Spawn and immediately drop handle (detach).
1251            drop(spawn_boxed(async move {
1252                r.set(true);
1253            }));
1254            // Yield to let the spawned task run.
1255            crate::context::yield_now().await;
1256        });
1257
1258        assert!(ran.get());
1259    }
1260
1261    #[test]
1262    fn join_handle_is_finished() {
1263        let wb = WorldBuilder::new();
1264        let mut world = wb.build();
1265        let mut rt = Runtime::new(&mut world);
1266
1267        rt.block_on(async {
1268            let handle = spawn_boxed(async { 1 });
1269            // The task hasn't been polled yet.
1270            assert!(!handle.is_finished());
1271            // Yield to let the task run.
1272            crate::context::yield_now().await;
1273            assert!(handle.is_finished());
1274            let val = handle.await;
1275            assert_eq!(val, 1);
1276        });
1277    }
1278
1279    #[test]
1280    fn join_handle_abort_returns_true() {
1281        let wb = WorldBuilder::new();
1282        let mut world = wb.build();
1283        let mut rt = Runtime::new(&mut world);
1284
1285        rt.block_on(async {
1286            let handle = spawn_boxed(std::future::pending::<()>());
1287            assert!(handle.abort()); // was running, handle consumed
1288        });
1289    }
1290
1291    #[test]
1292    fn join_handle_abort_completed_returns_false() {
1293        let wb = WorldBuilder::new();
1294        let mut world = wb.build();
1295        let mut rt = Runtime::new(&mut world);
1296
1297        rt.block_on(async {
1298            let handle = spawn_boxed(async { 42 });
1299            crate::context::yield_now().await;
1300            assert!(handle.is_finished());
1301            assert!(!handle.abort()); // already done, handle consumed
1302        });
1303    }
1304
1305    #[test]
1306    fn join_handle_drop_after_completion_drops_output() {
1307        use std::cell::Cell;
1308        use std::rc::Rc;
1309
1310        let wb = WorldBuilder::new();
1311        let mut world = wb.build();
1312        let mut rt = Runtime::new(&mut world);
1313
1314        let drop_count = Rc::new(Cell::new(0u32));
1315        let dc = drop_count.clone();
1316
1317        struct DropCounter(Rc<Cell<u32>>);
1318        impl Drop for DropCounter {
1319            fn drop(&mut self) {
1320                self.0.set(self.0.get() + 1);
1321            }
1322        }
1323
1324        rt.block_on(async move {
1325            let handle = spawn_boxed(async move { DropCounter(dc) });
1326            // Let it complete.
1327            crate::context::yield_now().await;
1328            assert!(handle.is_finished());
1329            // Drop handle without reading — output should be dropped.
1330            drop(handle);
1331        });
1332
1333        assert_eq!(drop_count.get(), 1, "output should be dropped exactly once");
1334    }
1335
1336    #[test]
1337    fn join_handle_multiple_concurrent() {
1338        let wb = WorldBuilder::new();
1339        let mut world = wb.build();
1340        let mut rt = Runtime::new(&mut world);
1341
1342        rt.block_on(async {
1343            let h1 = spawn_boxed(async { 10u64 });
1344            let h2 = spawn_boxed(async { 20u64 });
1345            let h3 = spawn_boxed(async { 30u64 });
1346
1347            let r3 = h3.await;
1348            let r1 = h1.await;
1349            let r2 = h2.await;
1350
1351            assert_eq!(r1, 10);
1352            assert_eq!(r2, 20);
1353            assert_eq!(r3, 30);
1354        });
1355    }
1356
1357    #[test]
1358    fn join_handle_output_larger_than_future() {
1359        let wb = WorldBuilder::new();
1360        let mut world = wb.build();
1361        let mut rt = Runtime::new(&mut world);
1362
1363        rt.block_on(async {
1364            // The future is tiny, the output is large.
1365            let handle = spawn_boxed(async { [42u64; 32] });
1366            let result = handle.await;
1367            assert_eq!(result[0], 42);
1368            assert_eq!(result[31], 42);
1369        });
1370    }
1371}