#![allow(clippy::future_not_send)]
#![cfg(unix)]
#![warn(missing_docs)]
mod alloc;
mod backoff;
mod cancel;
pub mod channel;
mod context;
pub(crate) mod cross_wake;
mod io;
pub mod net;
mod runtime;
mod shutdown;
mod task;
mod timer;
#[cfg(feature = "tokio-compat")]
pub mod tokio_compat;
#[cfg(feature = "tokio-compat")]
pub use tokio_compat::{TokioJoinError, TokioJoinHandle, spawn_on_tokio};
mod waker;
mod world_ctx;
pub use alloc::SlabClaim;
pub use backoff::{Backoff, BackoffBuilder, Exhausted};
pub use cancel::{CancellationToken, DropGuard};
pub use context::{
after, after_delay, event_time, interval, interval_at, sleep, sleep_until, timeout, timeout_at,
yield_now,
};
pub use io::IoHandle;
pub use net::{
AsyncRead, AsyncWrite, OwnedReadHalf, OwnedWriteHalf, ReadHalf, TcpListener, TcpSocket,
TcpStream, UdpSocket, WriteHalf,
};
pub use nexus_slab::byte::unbounded::Slab as ByteSlab;
pub use runtime::{
QuiesceTimeout, Runtime, RuntimeBuilder, claim_slab, spawn_boxed, spawn_slab, try_claim_slab,
};
pub use shutdown::{ShutdownHandle, ShutdownSignal};
pub use task::{JoinHandle, TASK_HEADER_SIZE};
pub use timer::{Elapsed, Interval, MissedTickBehavior, Sleep, Timeout, TimerHandle, YieldNow};
pub use world_ctx::WorldCtx;
use std::future::Future;
use std::task::{Context, Poll};
use waker::set_poll_context;
pub const MIN_SLOT_SIZE: usize = 128;
pub struct Executor {
incoming: std::cell::UnsafeCell<Vec<*mut u8>>,
draining: Vec<*mut u8>,
all_tasks: slab::Slab<*mut u8>,
live_count: usize,
tasks_per_cycle: usize,
deferred_free: std::cell::UnsafeCell<Vec<*mut u8>>,
shutdown_stats: std::sync::Arc<ShutdownStatsAtomics>,
cross_wake_for_drop: Option<std::sync::Arc<crate::cross_wake::CrossWakeContext>>,
}
#[derive(Default, Debug)]
pub struct ShutdownStatsAtomics {
aborted_unwinds: std::sync::atomic::AtomicU64,
leaked_box_tasks: std::sync::atomic::AtomicU64,
unbalanced_normal_shutdowns: std::sync::atomic::AtomicU64,
cross_queue_undrained: std::sync::atomic::AtomicU64,
}
impl ShutdownStatsAtomics {
pub fn snapshot(&self) -> ShutdownStats {
use std::sync::atomic::Ordering;
ShutdownStats {
aborted_unwinds: self.aborted_unwinds.load(Ordering::Relaxed),
leaked_box_tasks: self.leaked_box_tasks.load(Ordering::Relaxed),
unbalanced_normal_shutdowns: self.unbalanced_normal_shutdowns.load(Ordering::Relaxed),
cross_queue_undrained: self.cross_queue_undrained.load(Ordering::Relaxed),
}
}
}
#[derive(Default, Debug, Clone, Copy)]
pub struct ShutdownStats {
pub aborted_unwinds: u64,
pub leaked_box_tasks: u64,
pub unbalanced_normal_shutdowns: u64,
pub cross_queue_undrained: u64,
}
const DEFAULT_TASKS_PER_CYCLE: usize = 64;
impl Executor {
pub fn new(initial_capacity: usize) -> Self {
Self {
incoming: std::cell::UnsafeCell::new(Vec::with_capacity(initial_capacity)),
draining: Vec::with_capacity(initial_capacity),
all_tasks: slab::Slab::with_capacity(initial_capacity),
live_count: 0,
tasks_per_cycle: DEFAULT_TASKS_PER_CYCLE,
shutdown_stats: std::sync::Arc::new(ShutdownStatsAtomics::default()),
cross_wake_for_drop: None,
deferred_free: std::cell::UnsafeCell::new(Vec::new()),
}
}
pub(crate) fn next_tracker_key(&self) -> u32 {
let key = self.all_tasks.vacant_key();
debug_assert!(
u32::try_from(key).is_ok(),
"more than 4 billion concurrent tasks — tracker_key overflow"
);
key as u32
}
pub fn spawn_boxed<F>(&mut self, future: F) -> task::JoinHandle<F::Output>
where
F: Future + 'static,
F::Output: 'static,
{
let tracker_key = self.all_tasks.vacant_key();
debug_assert!(
u32::try_from(tracker_key).is_ok(),
"more than 4 billion concurrent tasks — tracker_key overflow"
);
let cross_wake_ctx = crate::cross_wake::current_runtime_ctx();
let ptr = task::box_spawn_joinable(future, tracker_key as u32, cross_wake_ctx);
self.enqueue(ptr);
task::JoinHandle::new(ptr)
}
pub(crate) fn spawn_raw(&mut self, ptr: *mut u8) {
self.enqueue(ptr);
}
fn enqueue(&mut self, ptr: *mut u8) {
self.all_tasks.insert(ptr);
unsafe { task::set_queued(ptr, true) };
unsafe { &mut *self.incoming.get() }.push(ptr);
self.live_count += 1;
}
pub(crate) fn drain_cross_thread(
&mut self,
inbox: &crate::cross_wake::CrossWakeQueue,
limit: usize,
) -> usize {
let mut drained = 0;
while drained < limit {
match inbox.pop() {
Some(task_ptr) => {
unsafe { task::clear_queued(task_ptr) };
if unsafe { task::is_terminal(task_ptr) } {
unsafe { &mut *self.deferred_free.get() }.push(task_ptr);
} else {
unsafe { &mut *self.incoming.get() }.push(task_ptr);
}
drained += 1;
}
None => break,
}
}
drained
}
pub fn poll(&mut self) -> usize {
let mut completed = 0;
for ptr in unsafe { &mut *self.deferred_free.get() }.drain(..) {
let key = unsafe { task::tracker_key(ptr) } as usize;
unsafe { task::free_task(ptr) };
if self.all_tasks.contains(key) {
self.all_tasks.remove(key);
}
}
std::mem::swap(unsafe { &mut *self.incoming.get() }, &mut self.draining);
let _guard = set_poll_context(self.incoming.get(), self.deferred_free.get());
let limit = self.tasks_per_cycle.min(self.draining.len());
let draining_ptr: *const Vec<*mut u8> = &raw const self.draining;
let drain_slice = unsafe { &(&*draining_ptr)[..limit] };
for &ptr in drain_slice {
if unsafe { task::is_completed(ptr) } {
continue;
}
unsafe { task::set_queued(ptr, false) };
let waker = unsafe { crate::waker::task_waker(ptr) };
let mut cx = Context::from_waker(&waker);
let poll_result = unsafe { task::poll_task(ptr, &mut cx) };
drop(waker);
match poll_result {
Poll::Pending => {}
Poll::Ready(()) => {
self.complete_task(ptr);
completed += 1;
}
}
}
if limit < self.draining.len() {
unsafe { &mut *self.incoming.get() }.extend_from_slice(&self.draining[limit..]);
}
self.draining.clear();
completed
}
pub fn task_count(&self) -> usize {
self.live_count
}
pub(crate) fn outstanding_tasks(&self) -> usize {
self.all_tasks.len()
}
#[cfg(test)]
pub fn deferred_free_count(&self) -> usize {
unsafe { &*self.deferred_free.get() }.len()
}
pub(crate) fn shutdown_stats(&self) -> std::sync::Arc<ShutdownStatsAtomics> {
std::sync::Arc::clone(&self.shutdown_stats)
}
fn record_aborted_unwind(&self) {
self.shutdown_stats
.aborted_unwinds
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
fn record_leaked_box(&self) {
self.shutdown_stats
.leaked_box_tasks
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
fn record_unbalanced_normal(&self) {
self.shutdown_stats
.unbalanced_normal_shutdowns
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
fn record_cross_queue_undrained(&self, count: u64) {
self.shutdown_stats
.cross_queue_undrained
.fetch_add(count, std::sync::atomic::Ordering::Relaxed);
}
pub(crate) fn install_cross_wake_for_drop(
&mut self,
cross_wake: std::sync::Arc<crate::cross_wake::CrossWakeContext>,
) {
self.cross_wake_for_drop = Some(cross_wake);
}
pub fn has_ready(&self) -> bool {
!unsafe { &*self.incoming.get() }.is_empty()
}
pub fn set_tasks_per_cycle(&mut self, limit: usize) {
self.tasks_per_cycle = limit;
}
fn complete_task(&mut self, ptr: *mut u8) {
let aborted = unsafe { task::is_aborted(ptr) };
if aborted {
unsafe { task::drop_task_future(ptr) };
self.live_count -= 1;
if unsafe { task::has_join(ptr) } {
let waker = unsafe { task::take_join_waker(ptr) };
if let Some(w) = waker {
w.wake();
}
}
match unsafe { task::complete_and_unref(ptr) } {
task::FreeAction::Retain => {}
task::FreeAction::FreeBox | task::FreeAction::FreeSlab => {
let key = unsafe { task::tracker_key(ptr) } as usize;
unsafe { task::free_task(ptr) };
self.all_tasks.remove(key);
}
}
} else if unsafe { task::has_join(ptr) } {
self.live_count -= 1;
let waker = unsafe { task::take_join_waker(ptr) };
if let Some(w) = waker {
w.wake();
}
match unsafe { task::complete_and_unref(ptr) } {
task::FreeAction::Retain => {}
task::FreeAction::FreeBox | task::FreeAction::FreeSlab => {
unsafe { task::drop_task_future(ptr) };
let key = unsafe { task::tracker_key(ptr) } as usize;
unsafe { task::free_task(ptr) };
self.all_tasks.remove(key);
}
}
} else {
unsafe { task::drop_task_future(ptr) };
self.live_count -= 1;
match unsafe { task::complete_and_unref(ptr) } {
task::FreeAction::Retain => {}
task::FreeAction::FreeBox | task::FreeAction::FreeSlab => {
let key = unsafe { task::tracker_key(ptr) } as usize;
unsafe { task::free_task(ptr) };
self.all_tasks.remove(key);
}
}
}
}
pub(crate) fn poll_context_ptrs(&self) -> (*mut Vec<*mut u8>, *mut Vec<*mut u8>) {
(self.incoming.get(), self.deferred_free.get())
}
#[allow(dead_code)]
pub(crate) fn cancel(&mut self, id: task::TaskId) {
let ptr = id.0;
if unsafe { task::is_completed(ptr) } {
return;
}
unsafe { &mut *self.incoming.get() }.retain(|p| *p != ptr);
self.draining.retain(|p| *p != ptr);
self.complete_task(ptr);
}
}
impl Drop for Executor {
fn drop(&mut self) {
let undrained = self.cross_wake_for_drop.take().map_or(0u64, |ctx| {
self.drain_cross_thread(&ctx.queue, usize::MAX) as u64
});
if undrained > 0 {
self.record_cross_queue_undrained(undrained);
}
self.drop_drain_deferred_free();
for (_, &ptr) in &self.all_tasks {
if unsafe { task::is_terminal(ptr) } {
unsafe { task::free_task(ptr) };
continue;
}
if !unsafe { task::is_completed(ptr) } && Self::drop_complete_and_maybe_free(ptr) {
continue;
}
let rc = unsafe { task::ref_count(ptr) };
if rc > 0 {
if std::thread::panicking() {
self.drop_outstanding_unwinding(ptr, rc);
} else {
self.drop_outstanding_normal(ptr, rc);
}
continue;
}
unsafe { task::free_task(ptr) };
}
}
}
impl Executor {
fn drop_drain_deferred_free(&mut self) {
for ptr in unsafe { &mut *self.deferred_free.get() }.drain(..) {
let key = unsafe { task::tracker_key(ptr) } as usize;
unsafe { task::free_task(ptr) };
if self.all_tasks.contains(key) {
self.all_tasks.remove(key);
}
}
}
fn drop_complete_and_maybe_free(ptr: *mut u8) -> bool {
unsafe { task::drop_task_future(ptr) };
match unsafe { task::complete_and_unref(ptr) } {
task::FreeAction::Retain => false,
task::FreeAction::FreeBox | task::FreeAction::FreeSlab => {
unsafe { task::free_task(ptr) };
true
}
}
}
fn drop_outstanding_unwinding(&self, ptr: *mut u8, rc: usize) {
if unsafe { task::is_slab_allocated(ptr) } {
self.drop_outstanding_slab_unwinding(ptr);
} else {
self.drop_outstanding_box_unwinding(ptr, rc);
}
}
fn drop_outstanding_slab_unwinding(&self, ptr: *mut u8) {
let deadline = std::time::Instant::now() + std::time::Duration::from_millis(100);
while unsafe { task::ref_count(ptr) } > 0 && std::time::Instant::now() < deadline {
std::thread::yield_now();
}
if unsafe { task::ref_count(ptr) } > 0 {
self.record_aborted_unwind();
eprintln!(
"nexus-async-rt: slab task {ptr:p} has \
outstanding refs after 100ms during unwinding \
— aborting to avoid UAF on slab memory \
release. Cross-thread waker producer thread \
may be deadlocked or starved."
);
std::process::abort();
}
unsafe { task::free_task(ptr) };
}
fn drop_outstanding_box_unwinding(&self, _ptr: *mut u8, rc: usize) {
self.record_leaked_box();
eprintln!(
"nexus-async-rt: executor dropped with {rc} outstanding \
reference(s) during unwinding — suppressing panic to \
avoid abort. Task resources were released via \
drop_task_future; leaking box task allocation + waker \
bookkeeping memory."
);
}
fn drop_outstanding_normal(&self, _ptr: *mut u8, rc: usize) {
self.record_unbalanced_normal();
#[cfg(debug_assertions)]
panic!(
"executor dropped with {rc} outstanding reference(s) — \
all wakers and JoinHandles must be dropped before the Runtime"
);
#[cfg(not(debug_assertions))]
eprintln!(
"nexus-async-rt: executor dropped with {rc} outstanding task \
reference(s) — leaking to avoid UB"
);
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::hint::black_box;
use std::pin::Pin;
use task::Task;
fn test_executor() -> Executor {
Executor::new(16)
}
#[test]
fn spawn_and_poll_single_task() {
let mut exec = test_executor();
let mut done = false;
let flag = &raw mut done;
exec.spawn_boxed(async move {
unsafe { *flag = true };
});
assert_eq!(exec.task_count(), 1);
let completed = exec.poll();
assert_eq!(completed, 1);
assert!(done);
assert_eq!(exec.task_count(), 0);
}
#[test]
fn spawn_multiple_tasks() {
let mut exec = test_executor();
for _ in 0..8 {
exec.spawn_boxed(async {});
}
assert_eq!(exec.task_count(), 8);
let completed = exec.poll();
assert_eq!(completed, 8);
assert_eq!(exec.task_count(), 0);
}
#[test]
fn pending_task_not_completed() {
let mut exec = test_executor();
exec.spawn_boxed(std::future::pending::<()>());
let completed = exec.poll();
assert_eq!(completed, 0);
assert_eq!(exec.task_count(), 1);
}
#[test]
fn immediate_task_completes() {
let mut exec = test_executor();
exec.spawn_boxed(async {
});
let completed = exec.poll();
assert_eq!(completed, 1);
assert_eq!(exec.task_count(), 0);
}
#[test]
fn self_waking_task_polled_again() {
use std::cell::Cell;
use std::rc::Rc;
let mut exec = test_executor();
let counter = Rc::new(Cell::new(0u32));
let c = counter.clone();
exec.spawn_boxed(async move {
struct SelfWake {
counter: Rc<Cell<u32>>,
}
impl Future for SelfWake {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
let n = self.counter.get();
self.counter.set(n + 1);
if n < 3 {
cx.waker().wake_by_ref();
Poll::Pending
} else {
Poll::Ready(())
}
}
}
SelfWake { counter: c }.await;
});
let mut total = 0;
for _ in 0..10 {
total += exec.poll();
if exec.task_count() == 0 {
break;
}
}
assert_eq!(total, 1); assert_eq!(counter.get(), 4); }
#[test]
fn abort_task() {
let mut exec = test_executor();
let handle = exec.spawn_boxed(std::future::pending::<()>());
assert_eq!(exec.task_count(), 1);
assert!(handle.abort()); exec.poll(); assert_eq!(exec.task_count(), 0);
}
#[test]
fn abort_frees_slot_for_reuse() {
let mut exec = test_executor();
let handle = exec.spawn_boxed(std::future::pending::<()>());
handle.abort();
exec.poll();
exec.spawn_boxed(async {});
assert_eq!(exec.task_count(), 1);
exec.poll();
assert_eq!(exec.task_count(), 0);
}
#[test]
fn poll_limit_respected() {
let mut exec = test_executor();
exec.set_tasks_per_cycle(2);
for _ in 0..5 {
exec.spawn_boxed(async {});
}
let completed = exec.poll();
assert_eq!(completed, 2);
assert_eq!(exec.task_count(), 3);
let completed = exec.poll();
assert_eq!(completed, 2);
assert_eq!(exec.task_count(), 1);
let completed = exec.poll();
assert_eq!(completed, 1);
assert_eq!(exec.task_count(), 0);
}
#[test]
fn cancel_with_stale_ready_entry() {
use std::cell::Cell;
use std::rc::Rc;
let mut exec = test_executor();
let polled = Rc::new(Cell::new(false));
let p = polled.clone();
struct WakeOnce(bool);
impl Future for WakeOnce {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
if !self.0 {
self.0 = true;
cx.waker().wake_by_ref();
Poll::Pending
} else {
Poll::Ready(())
}
}
}
let handle = exec.spawn_boxed(WakeOnce(false));
exec.poll();
handle.abort();
exec.spawn_boxed(async move {
p.set(true);
});
exec.poll(); assert!(polled.get());
}
#[test]
fn refcount_starts_at_one() {
let task = Box::new(Task::new_boxed(async {}, 0));
let ptr = Box::into_raw(task) as *mut u8;
assert_eq!(unsafe { task::ref_count(ptr) }, 1);
unsafe { task::free_task(ptr) };
}
#[test]
fn executor_drop_cleans_up_queued_tasks() {
let mut exec = test_executor();
exec.spawn_boxed(std::future::pending::<()>());
exec.spawn_boxed(std::future::pending::<()>());
exec.poll(); drop(exec);
}
#[test]
#[ignore]
fn dispatch_latency() {
use std::time::Instant;
struct Noop;
impl Future for Noop {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
cx.waker().wake_by_ref();
Poll::Pending
}
}
let mut exec = test_executor();
exec.spawn_boxed(Noop);
for _ in 0..10_000 {
exec.poll();
}
let iters = 100_000;
let start = Instant::now();
for _ in 0..iters {
exec.poll();
}
let elapsed = start.elapsed();
let ns_per = elapsed.as_nanos() / iters;
println!("dispatch: {ns_per} ns/poll (Box-allocated)");
black_box(ns_per);
}
}