Skip to main content

nexus_async_rt/
lib.rs

//! Single-threaded async runtime.
//!
//! Two spawn strategies:
//! - **`spawn_boxed()`** — Box-allocated. Default. No setup needed.
//! - **`spawn_slab()`** — Slab-allocated. Pre-allocated, zero-alloc
//!   hot path. Requires a slab configured via [`RuntimeBuilder::slab_unbounded`] or [`RuntimeBuilder::slab_bounded`].
//!
//! ```ignore
//! use nexus_async_rt::*;
//! use nexus_slab::byte::unbounded::Slab;
//! use nexus_rt::WorldBuilder;
//!
//! let mut world = WorldBuilder::new().build();
//!
//! // Simple — Box-allocated tasks, no slab setup
//! let mut rt = Runtime::new(&mut world);
//! rt.block_on(async {
//!     spawn_boxed(async { /* Box-allocated */ });
//! });
//!
//! // Power user — with slab for hot-path tasks
//! // SAFETY: single-threaded runtime.
//! let slab = unsafe { Slab::<256>::with_chunk_capacity(64) };
//! let mut rt = Runtime::builder(&mut world)
//!     .slab_unbounded(slab)
//!     .build();
//! rt.block_on(async {
//!     spawn_boxed(async { /* Box-allocated, long-lived */ });
//!     spawn_slab(async { /* slab-allocated, hot path */ });
//! });
//! ```
32
33// Single-threaded runtime — futures are intentionally !Send.
34#![allow(clippy::future_not_send)]
35#![cfg(unix)]
36
37mod alloc;
38mod backoff;
39mod cancel;
40pub mod channel;
41mod context;
42pub(crate) mod cross_wake;
43mod io;
44pub mod net;
45mod runtime;
46mod shutdown;
47mod task;
48mod timer;
49#[cfg(feature = "tokio-compat")]
50pub mod tokio_compat;
51#[cfg(feature = "tokio-compat")]
52pub use tokio_compat::{TokioJoinError, TokioJoinHandle, spawn_on_tokio};
53mod waker;
54mod world_ctx;
55
// Public re-exports. The slab type itself is re-exported further down as
// `ByteSlab` for convenience — users create the slab and hand it to the builder.
57pub use alloc::SlabClaim;
58pub use backoff::{Backoff, BackoffBuilder, Exhausted};
59pub use cancel::{CancellationToken, DropGuard};
60pub use context::{
61    after, after_delay, event_time, interval, interval_at, io, shutdown_signal, sleep, sleep_until,
62    timeout, timeout_at, with_world, with_world_ref, yield_now,
63};
64pub use io::IoHandle;
65pub use net::{
66    AsyncRead, AsyncWrite, OwnedReadHalf, OwnedWriteHalf, ReadHalf, TcpListener, TcpSocket,
67    TcpStream, UdpSocket, WriteHalf,
68};
69pub use nexus_slab::byte::unbounded::Slab as ByteSlab;
70pub use runtime::{Runtime, RuntimeBuilder, claim_slab, spawn_boxed, spawn_slab, try_claim_slab};
71pub use shutdown::{ShutdownHandle, ShutdownSignal};
72pub use task::{JoinHandle, TASK_HEADER_SIZE};
73pub use timer::{Elapsed, Interval, MissedTickBehavior, Sleep, Timeout, TimerHandle, YieldNow};
74pub use world_ctx::WorldCtx;
75
76use std::future::Future;
77use std::task::{Context, Poll};
78
79use waker::set_poll_context;
80
/// Suggested lower bound for the size of a slab slot, in bytes.
///
/// How much a task really needs depends on the task itself: the header
/// (64 bytes) plus `max(size_of::<F>(), size_of::<T>())`. A zero-sized
/// future therefore fits in 64 bytes; 128 is a conservative default with
/// room for most small futures.
pub const MIN_SLOT_SIZE: usize = 128;
87
88// =============================================================================
89// Executor
90// =============================================================================
91
92/// Single-threaded async executor.
93///
94/// Manages task lifecycle: spawn, poll, complete, free. Tasks are
95/// allocated via Box (default) or slab (via `spawn_slab`). Each
96/// task's header contains a `free_fn` that knows how to deallocate
97/// its own storage — the executor doesn't know or care which
98/// allocator was used.
99pub struct Executor {
100    /// Incoming ready tasks. Wakers and spawn push here.
101    /// Swapped with `draining` at the start of each poll cycle.
102    incoming: Vec<*mut u8>,
103
104    /// Tasks being drained this cycle. Iterated linearly.
105    draining: Vec<*mut u8>,
106
107    /// All live task pointers. Slab-indexed for O(1) removal.
108    all_tasks: slab::Slab<*mut u8>,
109
110    /// Number of live tasks.
111    live_count: usize,
112
113    /// Maximum tasks to poll per cycle before yielding to IO.
114    tasks_per_cycle: usize,
115
116    /// Completed task slots awaiting deferred free.
117    deferred_free: Vec<*mut u8>,
118}
119
/// Per-cycle poll limit a new `Executor` starts with (see
/// `Executor::set_tasks_per_cycle` to change it).
const DEFAULT_TASKS_PER_CYCLE: usize = 64;
122
123impl Executor {
124    /// Create an executor.
125    pub fn new(initial_capacity: usize) -> Self {
126        Self {
127            incoming: Vec::with_capacity(initial_capacity),
128            draining: Vec::with_capacity(initial_capacity),
129            all_tasks: slab::Slab::with_capacity(initial_capacity),
130            live_count: 0,
131            tasks_per_cycle: DEFAULT_TASKS_PER_CYCLE,
132            deferred_free: Vec::new(),
133        }
134    }
135
136    /// Reserve a tracker key for external allocation (slab spawn).
137    pub(crate) fn next_tracker_key(&self) -> u32 {
138        let key = self.all_tasks.vacant_key();
139        debug_assert!(
140            u32::try_from(key).is_ok(),
141            "more than 4 billion concurrent tasks — tracker_key overflow"
142        );
143        key as u32
144    }
145
146    /// Spawn an async task via Box allocation. Returns a [`JoinHandle`]
147    /// that can be awaited for the task's output.
148    pub fn spawn_boxed<F>(&mut self, future: F) -> task::JoinHandle<F::Output>
149    where
150        F: Future + 'static,
151        F::Output: 'static,
152    {
153        let tracker_key = self.all_tasks.vacant_key();
154        debug_assert!(
155            u32::try_from(tracker_key).is_ok(),
156            "more than 4 billion concurrent tasks — tracker_key overflow"
157        );
158        let ptr = task::box_spawn_joinable(future, tracker_key as u32);
159
160        self.enqueue(ptr);
161        task::JoinHandle::new(ptr)
162    }
163
164    /// Spawn a task with a pre-allocated pointer (from slab).
165    ///
166    /// The task at `ptr` must have been constructed with joinable or
167    /// fire-and-forget constructors and a valid `free_fn`.
168    pub(crate) fn spawn_raw(&mut self, ptr: *mut u8) {
169        self.enqueue(ptr);
170    }
171
172    /// Common enqueue logic for spawn and spawn_raw.
173    fn enqueue(&mut self, ptr: *mut u8) {
174        self.all_tasks.insert(ptr);
175        unsafe { task::set_queued(ptr, true) };
176        self.incoming.push(ptr);
177        self.live_count += 1;
178    }
179
180    /// Drain the cross-thread wake inbox into the local ready queue.
181    ///
182    /// Called at the start of each poll cycle. Tasks pushed from other
183    /// threads via `CrossWakeQueue::push` are moved into `incoming`.
184    /// Completed tasks are routed to `deferred_free` instead — they
185    /// were pushed for cleanup (not re-polling) by `cross_task_drop`.
186    /// Drains at most `limit` tasks (remaining are picked up next cycle).
187    pub(crate) fn drain_cross_thread(
188        &mut self,
189        inbox: &crate::cross_wake::CrossWakeQueue,
190        limit: usize,
191    ) {
192        let mut drained = 0;
193        while drained < limit {
194            match inbox.pop() {
195                Some(task_ptr) => {
196                    if unsafe { task::is_completed(task_ptr) } {
197                        self.deferred_free.push(task_ptr);
198                    } else {
199                        self.incoming.push(task_ptr);
200                    }
201                    drained += 1;
202                }
203                None => break,
204            }
205        }
206    }
207
208    /// Poll all ready tasks once.
209    pub fn poll(&mut self) -> usize {
210        let mut completed = 0;
211
212        // Drain deferred frees from last cycle.
213        for ptr in self.deferred_free.drain(..) {
214            let key = unsafe { task::tracker_key(ptr) } as usize;
215            // SAFETY: free_fn was set at spawn time.
216            unsafe { task::free_task(ptr) };
217            if self.all_tasks.contains(key) {
218                self.all_tasks.remove(key);
219            }
220        }
221
222        std::mem::swap(&mut self.incoming, &mut self.draining);
223
224        let _guard = set_poll_context(&mut self.incoming, &mut self.deferred_free);
225
226        let limit = self.tasks_per_cycle.min(self.draining.len());
227        let draining_ptr: *const Vec<*mut u8> = &raw const self.draining;
228        let drain_slice = unsafe { &(&*draining_ptr)[..limit] };
229
230        for &ptr in drain_slice {
231            if unsafe { task::is_completed(ptr) } {
232                continue;
233            }
234
235            unsafe { task::set_queued(ptr, false) };
236
237            // SAFETY: ptr is a live task, ref_count >= 1 (executor holds a ref).
238            // task_waker increments ref_count; drop after poll decrements it.
239            let waker = unsafe { crate::waker::task_waker(ptr) };
240            let mut cx = Context::from_waker(&waker);
241
242            let poll_result = unsafe { task::poll_task(ptr, &mut cx) };
243
244            drop(waker);
245
246            match poll_result {
247                Poll::Pending => {}
248                Poll::Ready(()) => {
249                    self.complete_task(ptr);
250                    completed += 1;
251                }
252            }
253        }
254
255        if limit < self.draining.len() {
256            self.incoming.extend_from_slice(&self.draining[limit..]);
257        }
258        self.draining.clear();
259
260        completed
261    }
262
263    /// Number of live tasks.
264    pub fn task_count(&self) -> usize {
265        self.live_count
266    }
267
268    /// Returns `true` if any tasks are queued for polling.
269    pub fn has_ready(&self) -> bool {
270        !self.incoming.is_empty()
271    }
272
273    /// Set the maximum tasks to poll per cycle.
274    pub fn set_tasks_per_cycle(&mut self, limit: usize) {
275        self.tasks_per_cycle = limit;
276    }
277
278    /// Complete a task: handle joinable vs fire-and-forget paths.
279    ///
280    /// Three branches based on task state:
281    /// - **Aborted:** drop F (still live — poll_join short-circuited), notify joiner
282    /// - **Joinable (HAS_JOIN):** T is live in the union, don't touch it — JoinHandle owns it
283    /// - **Fire-and-forget / detached:** drop the value (F or T) and free
284    ///
285    /// # Safety invariants
286    ///
287    /// `ptr` must point to a task that just returned `Poll::Ready(())` from poll_task.
288    /// All accessor calls are safe because the task is live and single-threaded.
289    fn complete_task(&mut self, ptr: *mut u8) {
290        let aborted = unsafe { task::is_aborted(ptr) };
291
292        if aborted {
293            // Aborted: poll_join saw ABORTED and returned Ready without polling F.
294            // F is still live in the union. drop_fn still targets F.
295            // SAFETY: drop_fn = drop_future_in_union::<F>, F is live.
296            unsafe { task::drop_task_future(ptr) };
297            unsafe { task::set_completed(ptr) };
298            self.live_count -= 1;
299
300            if unsafe { task::has_join(ptr) } {
301                // JoinHandle still alive — wake it. It will see ABORTED and panic.
302                // (In practice, abort() consumes the handle, so has_join is false.
303                // This branch exists for defensive correctness.)
304                let waker = unsafe { task::take_join_waker(ptr) };
305                if let Some(w) = waker {
306                    w.wake();
307                }
308            }
309
310            // Release executor's reference.
311            let should_free = unsafe { task::ref_dec(ptr) };
312            if should_free {
313                let key = unsafe { task::tracker_key(ptr) } as usize;
314                // SAFETY: future already dropped above, refcount 0.
315                unsafe { task::free_task(ptr) };
316                self.all_tasks.remove(key);
317            }
318        } else if unsafe { task::has_join(ptr) } {
319            // Joinable: poll_join dropped F and wrote T. drop_fn = drop_output::<T>.
320            // Don't drop T — JoinHandle will read it (ptr::read) or drop it (on handle drop).
321            unsafe { task::set_completed(ptr) };
322            self.live_count -= 1;
323
324            // Wake the joiner so it can poll the JoinHandle and read T.
325            let waker = unsafe { task::take_join_waker(ptr) };
326            if let Some(w) = waker {
327                w.wake();
328            }
329
330            // Release executor's reference. JoinHandle still holds one (refcount >= 1).
331            let should_free = unsafe { task::ref_dec(ptr) };
332            if should_free {
333                // Refcount hit 0 — JoinHandle was already dropped (detached).
334                // HAS_JOIN was cleared by JoinHandle::Drop, but we checked it before
335                // the flag was cleared (this branch). Output T was never read.
336                // SAFETY: drop_fn = drop_output::<T>, T is live.
337                unsafe { task::drop_task_future(ptr) };
338                let key = unsafe { task::tracker_key(ptr) } as usize;
339                unsafe { task::free_task(ptr) };
340                self.all_tasks.remove(key);
341            }
342        } else {
343            // Fire-and-forget or detached (HAS_JOIN cleared by JoinHandle::Drop).
344            // SAFETY: poll_join returned Ready(()), so the F→T transition completed.
345            // drop_fn = drop_output::<T>. This drops T, which is the correct live value.
346            unsafe { task::drop_task_future(ptr) };
347            unsafe { task::set_completed(ptr) };
348            self.live_count -= 1;
349
350            let should_free = unsafe { task::ref_dec(ptr) };
351            if should_free {
352                let key = unsafe { task::tracker_key(ptr) } as usize;
353                unsafe { task::free_task(ptr) };
354                self.all_tasks.remove(key);
355            }
356        }
357    }
358
359    /// Returns mutable references for TLS setup.
360    pub(crate) fn poll_context_mut(&mut self) -> (&mut Vec<*mut u8>, &mut Vec<*mut u8>) {
361        (&mut self.incoming, &mut self.deferred_free)
362    }
363
364    /// Run until all tasks complete. Only drives task polling — does NOT
365    /// drive IO, timers, or cross-thread wakes. For test/internal use only.
366    ///
367    /// Tasks awaiting IO or timers will hang. Use `Runtime::block_on` for
368    /// full runtime driving.
369    #[allow(dead_code)]
370    pub(crate) fn drain(&mut self) {
371        while self.task_count() > 0 {
372            if self.has_ready() {
373                self.poll();
374            } else {
375                std::thread::yield_now();
376            }
377        }
378    }
379
380    /// Cancel a task by ID.
381    #[allow(dead_code)]
382    pub(crate) fn cancel(&mut self, id: task::TaskId) {
383        let ptr = id.0;
384        // Skip if already completed (e.g. double-cancel or cancel after poll).
385        if unsafe { task::is_completed(ptr) } {
386            return;
387        }
388        self.incoming.retain(|p| *p != ptr);
389        self.draining.retain(|p| *p != ptr);
390        self.complete_task(ptr);
391    }
392}
393
394impl Drop for Executor {
395    fn drop(&mut self) {
396        // Free deferred slots first (completed tasks whose last ref dropped).
397        for ptr in self.deferred_free.drain(..) {
398            let key = unsafe { task::tracker_key(ptr) } as usize;
399            unsafe { task::free_task(ptr) };
400            if self.all_tasks.contains(key) {
401                self.all_tasks.remove(key);
402            }
403        }
404
405        for (_, &ptr) in &self.all_tasks {
406            // Drop the future if not already dropped.
407            if !unsafe { task::is_completed(ptr) } {
408                unsafe { task::drop_task_future(ptr) };
409                unsafe { task::set_completed(ptr) };
410                unsafe { task::ref_dec(ptr) };
411            }
412
413            let rc = unsafe { task::ref_count(ptr) };
414            if rc > 0 {
415                // Outstanding references (wakers or JoinHandles) still alive.
416                // Freeing would create dangling pointers — leak instead.
417                // This is a bug in the caller, but leak > UB.
418                #[cfg(debug_assertions)]
419                panic!(
420                    "executor dropped with {rc} outstanding reference(s) — \
421                     all wakers and JoinHandles must be dropped before the Runtime"
422                );
423                #[cfg(not(debug_assertions))]
424                eprintln!(
425                    "nexus-async-rt: executor dropped with {rc} outstanding task \
426                     reference(s) — leaking to avoid UB"
427                );
428                #[allow(unreachable_code)]
429                {
430                    continue;
431                }
432            }
433
434            unsafe { task::free_task(ptr) };
435        }
436    }
437}
438
439// =============================================================================
440// Tests
441// =============================================================================
442
#[cfg(test)]
mod tests {
    use super::*;
    use std::hint::black_box;
    use std::pin::Pin;
    use task::Task;

    /// Small executor shared by every test below.
    fn test_executor() -> Executor {
        Executor::new(16)
    }

    // -------------------------------------------------------------------------
    // Spawning and polling
    // -------------------------------------------------------------------------

    #[test]
    fn spawn_and_poll_single_task() {
        let mut exec = test_executor();
        let mut ran = false;
        let ran_ptr = &raw mut ran;

        exec.spawn_boxed(async move {
            // SAFETY: single-threaded; `ran` lives on this test's stack.
            unsafe { *ran_ptr = true };
        });

        assert_eq!(exec.task_count(), 1);
        assert_eq!(exec.poll(), 1);
        assert!(ran);
        assert_eq!(exec.task_count(), 0);
    }

    #[test]
    fn spawn_multiple_tasks() {
        const TASKS: usize = 8;
        let mut exec = test_executor();

        for _ in 0..TASKS {
            exec.spawn_boxed(async {});
        }

        assert_eq!(exec.task_count(), TASKS);
        assert_eq!(exec.poll(), TASKS);
        assert_eq!(exec.task_count(), 0);
    }

    // -------------------------------------------------------------------------
    // Tasks that stay pending
    // -------------------------------------------------------------------------

    #[test]
    fn pending_task_not_completed() {
        let mut exec = test_executor();

        // This future never resolves.
        exec.spawn_boxed(std::future::pending::<()>());

        assert_eq!(exec.poll(), 0);
        assert_eq!(exec.task_count(), 1);
    }

    // -------------------------------------------------------------------------
    // Tasks that are ready on first poll
    // -------------------------------------------------------------------------

    #[test]
    fn immediate_task_completes() {
        let mut exec = test_executor();

        // Empty body: resolves the first time it is polled.
        exec.spawn_boxed(async {});

        assert_eq!(exec.poll(), 1);
        assert_eq!(exec.task_count(), 0);
    }

    // -------------------------------------------------------------------------
    // Waker: re-queue via wake_by_ref
    // -------------------------------------------------------------------------

    #[test]
    fn self_waking_task_polled_again() {
        use std::cell::Cell;
        use std::rc::Rc;

        /// Wakes itself on its first three polls, resolves on the fourth.
        struct SelfWake {
            counter: Rc<Cell<u32>>,
        }
        impl Future for SelfWake {
            type Output = ();
            fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
                let seen = self.counter.get();
                self.counter.set(seen + 1);
                if seen >= 3 {
                    return Poll::Ready(());
                }
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }

        let mut exec = test_executor();
        let polls = Rc::new(Cell::new(0u32));
        let inner = SelfWake {
            counter: Rc::clone(&polls),
        };

        exec.spawn_boxed(async move {
            inner.await;
        });

        // Keep cycling until the task is gone (bounded, just in case).
        let mut finished = 0;
        for _ in 0..10 {
            finished += exec.poll();
            if exec.task_count() == 0 {
                break;
            }
        }
        assert_eq!(finished, 1); // exactly one completion
        assert_eq!(polls.get(), 4); // three wakes + the final poll
    }

    // -------------------------------------------------------------------------
    // Abort
    // -------------------------------------------------------------------------

    #[test]
    fn abort_task() {
        let mut exec = test_executor();
        let join = exec.spawn_boxed(std::future::pending::<()>());
        assert_eq!(exec.task_count(), 1);

        // The task was still running; `abort` consumes the handle.
        assert!(join.abort());

        // The abort is processed by the next poll.
        exec.poll();
        assert_eq!(exec.task_count(), 0);
    }

    #[test]
    fn abort_frees_slot_for_reuse() {
        let mut exec = test_executor();
        let join = exec.spawn_boxed(std::future::pending::<()>());
        join.abort(); // consumes the handle

        exec.poll(); // handles the abort and the deferred free

        // Spawning afterwards must still work.
        exec.spawn_boxed(async {});
        assert_eq!(exec.task_count(), 1);
        exec.poll();
        assert_eq!(exec.task_count(), 0);
    }

    // -------------------------------------------------------------------------
    // Poll limit (tasks_per_cycle)
    // -------------------------------------------------------------------------

    #[test]
    fn poll_limit_respected() {
        let mut exec = test_executor();
        exec.set_tasks_per_cycle(2);

        for _ in 0..5 {
            exec.spawn_boxed(async {});
        }

        // At most two tasks run per cycle:
        // (completed this cycle, still live afterwards).
        for (expected_done, expected_live) in [(2, 3), (2, 1), (1, 0)] {
            assert_eq!(exec.poll(), expected_done);
            assert_eq!(exec.task_count(), expected_live);
        }
    }

    // -------------------------------------------------------------------------
    // Stale ready entries after abort
    // -------------------------------------------------------------------------

    #[test]
    fn cancel_with_stale_ready_entry() {
        use std::cell::Cell;
        use std::rc::Rc;

        /// Wakes itself once, then resolves on the following poll.
        struct WakeOnce(bool);
        impl Future for WakeOnce {
            type Output = ();
            fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
                if self.0 {
                    return Poll::Ready(());
                }
                self.0 = true;
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }

        let mut exec = test_executor();
        let second_ran = Rc::new(Cell::new(false));
        let second_flag = Rc::clone(&second_ran);

        let join = exec.spawn_boxed(WakeOnce(false));

        // First cycle: the task re-queues itself through wake_by_ref.
        exec.poll();

        // Abort while it sits in the ready queue (consumes the handle).
        join.abort();

        // A second task proves the stale pointer does not crash the cycle.
        exec.spawn_boxed(async move {
            second_flag.set(true);
        });

        exec.poll(); // handles the abort and runs the new task
        assert!(second_ran.get());
    }

    // -------------------------------------------------------------------------
    // Refcount behavior
    // -------------------------------------------------------------------------

    #[test]
    fn refcount_starts_at_one() {
        let boxed = Box::new(Task::new_boxed(async {}, 0));
        let raw = Box::into_raw(boxed).cast::<u8>();
        assert_eq!(unsafe { task::ref_count(raw) }, 1);
        unsafe { task::free_task(raw) };
    }

    #[test]
    fn executor_drop_cleans_up_queued_tasks() {
        let mut exec = test_executor();
        for _ in 0..2 {
            exec.spawn_boxed(std::future::pending::<()>());
        }
        exec.poll(); // each task gets polled once
        // Dropping the executor must free every task without panicking.
        drop(exec);
    }

    // -------------------------------------------------------------------------
    // Dispatch latency (rough, not controlled)
    // -------------------------------------------------------------------------

    #[test]
    #[ignore]
    fn dispatch_latency() {
        use std::time::Instant;

        /// Always pending, always re-queues itself.
        struct Noop;
        impl Future for Noop {
            type Output = ();
            fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }

        const WARMUP: u32 = 10_000;
        const ITERS: u128 = 100_000;

        let mut exec = test_executor();
        exec.spawn_boxed(Noop);

        for _ in 0..WARMUP {
            exec.poll();
        }

        let started = Instant::now();
        for _ in 0..ITERS {
            exec.poll();
        }
        let ns_per = started.elapsed().as_nanos() / ITERS;
        println!("dispatch: {ns_per} ns/poll (Box-allocated)");
        black_box(ns_per);
    }
}