nexus_async_rt/lib.rs
1//! Single-threaded async runtime.
2//!
3//! Two spawn strategies:
4//! - **`spawn_boxed()`** — Box-allocated. Default. No setup needed.
5//! - **`spawn_slab()`** — Slab-allocated. Pre-allocated, zero-alloc
6//! hot path. Requires slab configured via [`RuntimeBuilder::slab_unbounded`] or [`RuntimeBuilder::slab_bounded`].
7//!
8//! ```ignore
9//! use nexus_async_rt::*;
10//! use nexus_slab::byte::unbounded::Slab;
11//! use nexus_rt::WorldBuilder;
12//!
13//! let mut world = WorldBuilder::new().build();
14//!
15//! // Simple — Box-allocated tasks, no slab setup
16//! let mut rt = Runtime::new(&mut world);
17//! rt.block_on(async {
18//! spawn_boxed(async { /* Box-allocated */ });
19//! });
20//!
21//! // Power user — with slab for hot-path tasks
22//! // SAFETY: single-threaded runtime.
23//! let slab = unsafe { Slab::<256>::with_chunk_capacity(64) };
24//! let mut rt = Runtime::builder(&mut world)
25//! .slab_unbounded(slab)
26//! .build();
27//! rt.block_on(async {
28//! spawn_boxed(async { /* Box-allocated, long-lived */ });
29//! spawn_slab(async { /* slab-allocated, hot path */ });
30//! });
31//! ```
32
33// Single-threaded runtime — futures are intentionally !Send.
34#![allow(clippy::future_not_send)]
35#![cfg(unix)]
36
37mod alloc;
38mod backoff;
39mod cancel;
40pub mod channel;
41mod context;
42pub(crate) mod cross_wake;
43mod io;
44pub mod net;
45mod runtime;
46mod shutdown;
47mod task;
48mod timer;
49#[cfg(feature = "tokio-compat")]
50pub mod tokio_compat;
51#[cfg(feature = "tokio-compat")]
52pub use tokio_compat::{TokioJoinError, TokioJoinHandle, spawn_on_tokio};
53mod waker;
54mod world_ctx;
55
56// Re-export slab type for convenience — users create the slab and hand it to the builder.
57pub use alloc::SlabClaim;
58pub use backoff::{Backoff, BackoffBuilder, Exhausted};
59pub use cancel::{CancellationToken, DropGuard};
60pub use context::{
61 after, after_delay, event_time, interval, interval_at, io, shutdown_signal, sleep, sleep_until,
62 timeout, timeout_at, with_world, with_world_ref, yield_now,
63};
64pub use io::IoHandle;
65pub use net::{
66 AsyncRead, AsyncWrite, OwnedReadHalf, OwnedWriteHalf, ReadHalf, TcpListener, TcpSocket,
67 TcpStream, UdpSocket, WriteHalf,
68};
69pub use nexus_slab::byte::unbounded::Slab as ByteSlab;
70pub use runtime::{Runtime, RuntimeBuilder, claim_slab, spawn_boxed, spawn_slab, try_claim_slab};
71pub use shutdown::{ShutdownHandle, ShutdownSignal};
72pub use task::{JoinHandle, TASK_HEADER_SIZE};
73pub use timer::{Elapsed, Interval, MissedTickBehavior, Sleep, Timeout, TimerHandle, YieldNow};
74pub use world_ctx::WorldCtx;
75
76use std::future::Future;
77use std::task::{Context, Poll};
78
79use waker::set_poll_context;
80
81/// Recommended minimum slab slot size.
82///
83/// The actual minimum depends on the task: header (64 bytes) + `max(size_of::<F>(),
84/// size_of::<T>())`. ZST futures need only 64 bytes. 128 is a conservative default
85/// that covers most small futures.
86pub const MIN_SLOT_SIZE: usize = 128;
87
88// =============================================================================
89// Executor
90// =============================================================================
91
92/// Single-threaded async executor.
93///
94/// Manages task lifecycle: spawn, poll, complete, free. Tasks are
95/// allocated via Box (default) or slab (via `spawn_slab`). Each
96/// task's header contains a `free_fn` that knows how to deallocate
97/// its own storage — the executor doesn't know or care which
98/// allocator was used.
99pub struct Executor {
100 /// Incoming ready tasks. Wakers and spawn push here.
101 /// Swapped with `draining` at the start of each poll cycle.
102 incoming: Vec<*mut u8>,
103
104 /// Tasks being drained this cycle. Iterated linearly.
105 draining: Vec<*mut u8>,
106
107 /// All live task pointers. Slab-indexed for O(1) removal.
108 all_tasks: slab::Slab<*mut u8>,
109
110 /// Number of live tasks.
111 live_count: usize,
112
113 /// Maximum tasks to poll per cycle before yielding to IO.
114 tasks_per_cycle: usize,
115
116 /// Completed task slots awaiting deferred free.
117 deferred_free: Vec<*mut u8>,
118}
119
/// Default cap on the number of tasks polled in a single poll cycle.
121const DEFAULT_TASKS_PER_CYCLE: usize = 64;
122
123impl Executor {
124 /// Create an executor.
125 pub fn new(initial_capacity: usize) -> Self {
126 Self {
127 incoming: Vec::with_capacity(initial_capacity),
128 draining: Vec::with_capacity(initial_capacity),
129 all_tasks: slab::Slab::with_capacity(initial_capacity),
130 live_count: 0,
131 tasks_per_cycle: DEFAULT_TASKS_PER_CYCLE,
132 deferred_free: Vec::new(),
133 }
134 }
135
136 /// Reserve a tracker key for external allocation (slab spawn).
137 pub(crate) fn next_tracker_key(&self) -> u32 {
138 let key = self.all_tasks.vacant_key();
139 debug_assert!(
140 u32::try_from(key).is_ok(),
141 "more than 4 billion concurrent tasks — tracker_key overflow"
142 );
143 key as u32
144 }
145
146 /// Spawn an async task via Box allocation. Returns a [`JoinHandle`]
147 /// that can be awaited for the task's output.
148 pub fn spawn_boxed<F>(&mut self, future: F) -> task::JoinHandle<F::Output>
149 where
150 F: Future + 'static,
151 F::Output: 'static,
152 {
153 let tracker_key = self.all_tasks.vacant_key();
154 debug_assert!(
155 u32::try_from(tracker_key).is_ok(),
156 "more than 4 billion concurrent tasks — tracker_key overflow"
157 );
158 let ptr = task::box_spawn_joinable(future, tracker_key as u32);
159
160 self.enqueue(ptr);
161 task::JoinHandle::new(ptr)
162 }
163
164 /// Spawn a task with a pre-allocated pointer (from slab).
165 ///
166 /// The task at `ptr` must have been constructed with joinable or
167 /// fire-and-forget constructors and a valid `free_fn`.
168 pub(crate) fn spawn_raw(&mut self, ptr: *mut u8) {
169 self.enqueue(ptr);
170 }
171
172 /// Common enqueue logic for spawn and spawn_raw.
173 fn enqueue(&mut self, ptr: *mut u8) {
174 self.all_tasks.insert(ptr);
175 unsafe { task::set_queued(ptr, true) };
176 self.incoming.push(ptr);
177 self.live_count += 1;
178 }
179
180 /// Drain the cross-thread wake inbox into the local ready queue.
181 ///
182 /// Called at the start of each poll cycle. Tasks pushed from other
183 /// threads via `CrossWakeQueue::push` are moved into `incoming`.
184 /// Completed tasks are routed to `deferred_free` instead — they
185 /// were pushed for cleanup (not re-polling) by `cross_task_drop`.
186 /// Drains at most `limit` tasks (remaining are picked up next cycle).
187 pub(crate) fn drain_cross_thread(
188 &mut self,
189 inbox: &crate::cross_wake::CrossWakeQueue,
190 limit: usize,
191 ) {
192 let mut drained = 0;
193 while drained < limit {
194 match inbox.pop() {
195 Some(task_ptr) => {
196 if unsafe { task::is_completed(task_ptr) } {
197 self.deferred_free.push(task_ptr);
198 } else {
199 self.incoming.push(task_ptr);
200 }
201 drained += 1;
202 }
203 None => break,
204 }
205 }
206 }
207
208 /// Poll all ready tasks once.
209 pub fn poll(&mut self) -> usize {
210 let mut completed = 0;
211
212 // Drain deferred frees from last cycle.
213 for ptr in self.deferred_free.drain(..) {
214 let key = unsafe { task::tracker_key(ptr) } as usize;
215 // SAFETY: free_fn was set at spawn time.
216 unsafe { task::free_task(ptr) };
217 if self.all_tasks.contains(key) {
218 self.all_tasks.remove(key);
219 }
220 }
221
222 std::mem::swap(&mut self.incoming, &mut self.draining);
223
224 let _guard = set_poll_context(&mut self.incoming, &mut self.deferred_free);
225
226 let limit = self.tasks_per_cycle.min(self.draining.len());
227 let draining_ptr: *const Vec<*mut u8> = &raw const self.draining;
228 let drain_slice = unsafe { &(&*draining_ptr)[..limit] };
229
230 for &ptr in drain_slice {
231 if unsafe { task::is_completed(ptr) } {
232 continue;
233 }
234
235 unsafe { task::set_queued(ptr, false) };
236
237 // SAFETY: ptr is a live task, ref_count >= 1 (executor holds a ref).
238 // task_waker increments ref_count; drop after poll decrements it.
239 let waker = unsafe { crate::waker::task_waker(ptr) };
240 let mut cx = Context::from_waker(&waker);
241
242 let poll_result = unsafe { task::poll_task(ptr, &mut cx) };
243
244 drop(waker);
245
246 match poll_result {
247 Poll::Pending => {}
248 Poll::Ready(()) => {
249 self.complete_task(ptr);
250 completed += 1;
251 }
252 }
253 }
254
255 if limit < self.draining.len() {
256 self.incoming.extend_from_slice(&self.draining[limit..]);
257 }
258 self.draining.clear();
259
260 completed
261 }
262
263 /// Number of live tasks.
264 pub fn task_count(&self) -> usize {
265 self.live_count
266 }
267
268 /// Returns `true` if any tasks are queued for polling.
269 pub fn has_ready(&self) -> bool {
270 !self.incoming.is_empty()
271 }
272
273 /// Set the maximum tasks to poll per cycle.
274 pub fn set_tasks_per_cycle(&mut self, limit: usize) {
275 self.tasks_per_cycle = limit;
276 }
277
278 /// Complete a task: handle joinable vs fire-and-forget paths.
279 ///
280 /// Three branches based on task state:
281 /// - **Aborted:** drop F (still live — poll_join short-circuited), notify joiner
282 /// - **Joinable (HAS_JOIN):** T is live in the union, don't touch it — JoinHandle owns it
283 /// - **Fire-and-forget / detached:** drop the value (F or T) and free
284 ///
285 /// # Safety invariants
286 ///
287 /// `ptr` must point to a task that just returned `Poll::Ready(())` from poll_task.
288 /// All accessor calls are safe because the task is live and single-threaded.
289 fn complete_task(&mut self, ptr: *mut u8) {
290 let aborted = unsafe { task::is_aborted(ptr) };
291
292 if aborted {
293 // Aborted: poll_join saw ABORTED and returned Ready without polling F.
294 // F is still live in the union. drop_fn still targets F.
295 // SAFETY: drop_fn = drop_future_in_union::<F>, F is live.
296 unsafe { task::drop_task_future(ptr) };
297 unsafe { task::set_completed(ptr) };
298 self.live_count -= 1;
299
300 if unsafe { task::has_join(ptr) } {
301 // JoinHandle still alive — wake it. It will see ABORTED and panic.
302 // (In practice, abort() consumes the handle, so has_join is false.
303 // This branch exists for defensive correctness.)
304 let waker = unsafe { task::take_join_waker(ptr) };
305 if let Some(w) = waker {
306 w.wake();
307 }
308 }
309
310 // Release executor's reference.
311 let should_free = unsafe { task::ref_dec(ptr) };
312 if should_free {
313 let key = unsafe { task::tracker_key(ptr) } as usize;
314 // SAFETY: future already dropped above, refcount 0.
315 unsafe { task::free_task(ptr) };
316 self.all_tasks.remove(key);
317 }
318 } else if unsafe { task::has_join(ptr) } {
319 // Joinable: poll_join dropped F and wrote T. drop_fn = drop_output::<T>.
320 // Don't drop T — JoinHandle will read it (ptr::read) or drop it (on handle drop).
321 unsafe { task::set_completed(ptr) };
322 self.live_count -= 1;
323
324 // Wake the joiner so it can poll the JoinHandle and read T.
325 let waker = unsafe { task::take_join_waker(ptr) };
326 if let Some(w) = waker {
327 w.wake();
328 }
329
330 // Release executor's reference. JoinHandle still holds one (refcount >= 1).
331 let should_free = unsafe { task::ref_dec(ptr) };
332 if should_free {
333 // Refcount hit 0 — JoinHandle was already dropped (detached).
334 // HAS_JOIN was cleared by JoinHandle::Drop, but we checked it before
335 // the flag was cleared (this branch). Output T was never read.
336 // SAFETY: drop_fn = drop_output::<T>, T is live.
337 unsafe { task::drop_task_future(ptr) };
338 let key = unsafe { task::tracker_key(ptr) } as usize;
339 unsafe { task::free_task(ptr) };
340 self.all_tasks.remove(key);
341 }
342 } else {
343 // Fire-and-forget or detached (HAS_JOIN cleared by JoinHandle::Drop).
344 // SAFETY: poll_join returned Ready(()), so the F→T transition completed.
345 // drop_fn = drop_output::<T>. This drops T, which is the correct live value.
346 unsafe { task::drop_task_future(ptr) };
347 unsafe { task::set_completed(ptr) };
348 self.live_count -= 1;
349
350 let should_free = unsafe { task::ref_dec(ptr) };
351 if should_free {
352 let key = unsafe { task::tracker_key(ptr) } as usize;
353 unsafe { task::free_task(ptr) };
354 self.all_tasks.remove(key);
355 }
356 }
357 }
358
359 /// Returns mutable references for TLS setup.
360 pub(crate) fn poll_context_mut(&mut self) -> (&mut Vec<*mut u8>, &mut Vec<*mut u8>) {
361 (&mut self.incoming, &mut self.deferred_free)
362 }
363
364 /// Run until all tasks complete. Only drives task polling — does NOT
365 /// drive IO, timers, or cross-thread wakes. For test/internal use only.
366 ///
367 /// Tasks awaiting IO or timers will hang. Use `Runtime::block_on` for
368 /// full runtime driving.
369 #[allow(dead_code)]
370 pub(crate) fn drain(&mut self) {
371 while self.task_count() > 0 {
372 if self.has_ready() {
373 self.poll();
374 } else {
375 std::thread::yield_now();
376 }
377 }
378 }
379
380 /// Cancel a task by ID.
381 #[allow(dead_code)]
382 pub(crate) fn cancel(&mut self, id: task::TaskId) {
383 let ptr = id.0;
384 // Skip if already completed (e.g. double-cancel or cancel after poll).
385 if unsafe { task::is_completed(ptr) } {
386 return;
387 }
388 self.incoming.retain(|p| *p != ptr);
389 self.draining.retain(|p| *p != ptr);
390 self.complete_task(ptr);
391 }
392}
393
394impl Drop for Executor {
395 fn drop(&mut self) {
396 // Free deferred slots first (completed tasks whose last ref dropped).
397 for ptr in self.deferred_free.drain(..) {
398 let key = unsafe { task::tracker_key(ptr) } as usize;
399 unsafe { task::free_task(ptr) };
400 if self.all_tasks.contains(key) {
401 self.all_tasks.remove(key);
402 }
403 }
404
405 for (_, &ptr) in &self.all_tasks {
406 // Drop the future if not already dropped.
407 if !unsafe { task::is_completed(ptr) } {
408 unsafe { task::drop_task_future(ptr) };
409 unsafe { task::set_completed(ptr) };
410 unsafe { task::ref_dec(ptr) };
411 }
412
413 let rc = unsafe { task::ref_count(ptr) };
414 if rc > 0 {
415 // Outstanding references (wakers or JoinHandles) still alive.
416 // Freeing would create dangling pointers — leak instead.
417 // This is a bug in the caller, but leak > UB.
418 #[cfg(debug_assertions)]
419 panic!(
420 "executor dropped with {rc} outstanding reference(s) — \
421 all wakers and JoinHandles must be dropped before the Runtime"
422 );
423 #[cfg(not(debug_assertions))]
424 eprintln!(
425 "nexus-async-rt: executor dropped with {rc} outstanding task \
426 reference(s) — leaking to avoid UB"
427 );
428 #[allow(unreachable_code)]
429 {
430 continue;
431 }
432 }
433
434 unsafe { task::free_task(ptr) };
435 }
436 }
437}
438
439// =============================================================================
440// Tests
441// =============================================================================
442
443#[cfg(test)]
444mod tests {
445 use super::*;
446 use std::hint::black_box;
447 use std::pin::Pin;
448 use task::Task;
449
450 fn test_executor() -> Executor {
451 Executor::new(16)
452 }
453
454 // =========================================================================
455 // Basic spawn + poll
456 // =========================================================================
457
/// One spawned task runs to completion within a single poll cycle.
#[test]
fn spawn_and_poll_single_task() {
    use std::cell::Cell;
    use std::rc::Rc;

    let mut exec = test_executor();
    let done = Rc::new(Cell::new(false));
    let flag = Rc::clone(&done);

    exec.spawn_boxed(async move {
        flag.set(true);
    });
    assert_eq!(exec.task_count(), 1);

    assert_eq!(exec.poll(), 1);
    assert!(done.get());
    assert_eq!(exec.task_count(), 0);
}
475
/// Several immediately-ready tasks all complete in one poll cycle.
#[test]
fn spawn_multiple_tasks() {
    const TASKS: usize = 8;

    let mut exec = test_executor();
    (0..TASKS).for_each(|_| {
        exec.spawn_boxed(async {});
    });
    assert_eq!(exec.task_count(), TASKS);

    assert_eq!(exec.poll(), TASKS);
    assert_eq!(exec.task_count(), 0);
}
489
490 // =========================================================================
491 // Pending tasks
492 // =========================================================================
493
/// A future that never resolves stays live after being polled.
#[test]
fn pending_task_not_completed() {
    let mut exec = test_executor();
    let never_ready = std::future::pending::<()>();
    exec.spawn_boxed(never_ready);

    assert_eq!(exec.poll(), 0);
    assert_eq!(exec.task_count(), 1);
}
505
506 // =========================================================================
    // Immediately-ready task
508 // =========================================================================
509
/// An empty async block is ready on its first poll.
#[test]
fn immediate_task_completes() {
    let mut exec = test_executor();
    let ready_at_once = async {};
    exec.spawn_boxed(ready_at_once);

    assert_eq!(exec.poll(), 1);
    assert_eq!(exec.task_count(), 0);
}
522
523 // =========================================================================
524 // Self-waking task
525 // =========================================================================
526
/// A task that wakes itself is polled again on later cycles until it
/// finally reports ready.
#[test]
fn self_waking_task_polled_again() {
    use std::cell::Cell;
    use std::rc::Rc;

    /// Returns `Pending` (after waking itself) on its first three polls,
    /// counting every poll along the way.
    struct SelfWake {
        counter: Rc<Cell<u32>>,
    }

    impl Future for SelfWake {
        type Output = ();

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
            let polls_so_far = self.counter.get();
            self.counter.set(polls_so_far + 1);
            if polls_so_far >= 3 {
                return Poll::Ready(());
            }
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }

    let mut exec = test_executor();
    let counter = Rc::new(Cell::new(0u32));
    let inner = SelfWake {
        counter: Rc::clone(&counter),
    };

    exec.spawn_boxed(async move {
        inner.await;
    });

    // Keep cycling until the task is gone (bounded to avoid hanging).
    let mut total = 0;
    for _ in 0..10 {
        total += exec.poll();
        if exec.task_count() == 0 {
            break;
        }
    }

    assert_eq!(total, 1); // completed once
    assert_eq!(counter.get(), 4); // polled 4 times
}
568
569 // =========================================================================
570 // Cancel
571 // =========================================================================
572
/// Aborting a pending task removes it once the executor polls again.
#[test]
fn abort_task() {
    let mut exec = test_executor();
    let handle = exec.spawn_boxed(std::future::pending::<()>());
    assert_eq!(exec.task_count(), 1);

    // The task was still running; `abort` consumes the handle.
    let was_running = handle.abort();
    assert!(was_running);

    // The abort only takes effect on the next poll cycle.
    exec.poll();
    assert_eq!(exec.task_count(), 0);
}
583
/// After an aborted task has been processed, the executor accepts and
/// runs new tasks as usual.
#[test]
fn abort_frees_slot_for_reuse() {
    let mut exec = test_executor();

    // `abort` consumes the handle; the poll processes the abort and any
    // deferred free.
    exec.spawn_boxed(std::future::pending::<()>()).abort();
    exec.poll();

    // A fresh spawn must work and complete.
    exec.spawn_boxed(async {});
    assert_eq!(exec.task_count(), 1);

    exec.poll();
    assert_eq!(exec.task_count(), 0);
}
598
599 // =========================================================================
600 // Poll limit (tasks_per_cycle)
601 // =========================================================================
602
/// With a per-cycle limit of 2, five ready tasks complete over three
/// cycles: 2, then 2, then 1.
#[test]
fn poll_limit_respected() {
    let mut exec = test_executor();
    exec.set_tasks_per_cycle(2);

    for _ in 0..5 {
        exec.spawn_boxed(async {});
    }

    // (completed this cycle, still live afterwards)
    let expected_cycles = [(2, 3), (2, 1), (1, 0)];
    for (expected_completed, expected_live) in expected_cycles {
        assert_eq!(exec.poll(), expected_completed);
        assert_eq!(exec.task_count(), expected_live);
    }
}
625
626 // =========================================================================
627 // Stale ready entries after cancel
628 // =========================================================================
629
/// Aborting a task that is already sitting in the ready queue must not
/// disturb tasks spawned afterwards.
#[test]
fn cancel_with_stale_ready_entry() {
    use std::cell::Cell;
    use std::rc::Rc;

    /// Wakes itself and returns `Pending` once, then completes.
    struct WakeOnce(bool);

    impl Future for WakeOnce {
        type Output = ();

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
            if self.0 {
                return Poll::Ready(());
            }
            self.0 = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }

    let mut exec = test_executor();
    let polled = Rc::new(Cell::new(false));
    let marker = Rc::clone(&polled);

    let handle = exec.spawn_boxed(WakeOnce(false));

    // First cycle: the task re-queues itself through wake_by_ref.
    exec.poll();

    // Abort while the task sits in the ready queue (consumes the handle).
    handle.abort();

    // A second task proves the stale pointer does not crash the executor.
    exec.spawn_boxed(async move {
        marker.set(true);
    });

    // This cycle processes both the abort and the new task.
    exec.poll();
    assert!(polled.get());
}
671
672 // =========================================================================
673 // Refcount behavior
674 // =========================================================================
675
/// A freshly constructed task starts with a reference count of one.
#[test]
fn refcount_starts_at_one() {
    let boxed = Box::new(Task::new_boxed(async {}, 0));
    let raw = Box::into_raw(boxed).cast::<u8>();

    let initial = unsafe { task::ref_count(raw) };
    assert_eq!(initial, 1);

    // Release the allocation through the task's own free path.
    unsafe { task::free_task(raw) };
}
683
/// Dropping an executor that still holds pending tasks frees them
/// without panicking.
#[test]
fn executor_drop_cleans_up_queued_tasks() {
    let mut exec = test_executor();
    for _ in 0..2 {
        exec.spawn_boxed(std::future::pending::<()>());
    }

    // Poll once so both tasks have been observed as pending.
    exec.poll();

    drop(exec);
}
693
694 // =========================================================================
695 // Dispatch latency (rough, not controlled)
696 // =========================================================================
697
/// Rough, uncontrolled measurement of the cost of one poll cycle for a
/// single always-pending, self-waking Box-allocated task. Ignored by
/// default.
#[test]
#[ignore]
fn dispatch_latency() {
    use std::time::Instant;

    /// Never completes; re-queues itself on every poll.
    struct Noop;

    impl Future for Noop {
        type Output = ();

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }

    const WARMUP_POLLS: u32 = 10_000;
    const MEASURED_POLLS: u128 = 100_000;

    let mut exec = test_executor();
    exec.spawn_boxed(Noop);

    for _ in 0..WARMUP_POLLS {
        exec.poll();
    }

    let started = Instant::now();
    for _ in 0..MEASURED_POLLS {
        exec.poll();
    }
    let ns_per = started.elapsed().as_nanos() / MEASURED_POLLS;

    println!("dispatch: {ns_per} ns/poll (Box-allocated)");
    black_box(ns_per);
}
730}