#![allow(clippy::future_not_send)]
#![cfg(unix)]
mod alloc;
mod backoff;
mod cancel;
pub mod channel;
mod context;
pub(crate) mod cross_wake;
mod io;
pub mod net;
mod runtime;
mod shutdown;
mod task;
mod timer;
#[cfg(feature = "tokio-compat")]
pub mod tokio_compat;
mod waker;
mod world_ctx;
pub use alloc::SlabClaim;
pub use backoff::{Backoff, BackoffBuilder, Exhausted};
pub use cancel::{CancellationToken, DropGuard};
pub use context::{
after, after_delay, event_time, interval, interval_at, io, shutdown_signal, sleep, sleep_until,
timeout, timeout_at, with_world, with_world_ref, yield_now,
};
pub use io::IoHandle;
pub use net::{
AsyncRead, AsyncWrite, OwnedReadHalf, OwnedWriteHalf, ReadHalf, TcpListener, TcpSocket,
TcpStream, UdpSocket, WriteHalf,
};
pub use nexus_slab::byte::unbounded::Slab as ByteSlab;
pub use runtime::{Runtime, RuntimeBuilder, claim_slab, spawn_boxed, spawn_slab, try_claim_slab};
pub use shutdown::{ShutdownHandle, ShutdownSignal};
pub use task::{JoinHandle, TASK_HEADER_SIZE};
pub use timer::{Elapsed, Interval, MissedTickBehavior, Sleep, Timeout, TimerHandle, YieldNow};
pub use world_ctx::WorldCtx;
use std::future::Future;
use std::task::{Context, Poll};
use waker::set_poll_context;
/// Minimum slot size exported by this crate.
// NOTE(review): not referenced anywhere in this file; presumably the smallest
// per-task slot size (in bytes) accepted by the slab-backed spawn path —
// confirm against `alloc` / `runtime`.
pub const MIN_SLOT_SIZE: usize = 128;
/// Task executor that tracks spawned tasks as raw pointers and polls the ones
/// that have been queued as ready.
///
/// All task state (queued/completed/aborted flags, reference count, tracker
/// key) lives behind the raw pointers and is accessed through the `task`
/// module; this struct only holds the queues and bookkeeping.
pub struct Executor {
/// Tasks queued for the next poll cycle. New spawns and cross-thread wake-ups
/// are pushed here; it is swapped with `draining` at the start of `poll`.
incoming: Vec<*mut u8>,
/// The batch being polled during the current cycle; cleared at the end of
/// `poll`.
draining: Vec<*mut u8>,
/// Every task registered with the executor. The slab key is the task's
/// tracker key, used to remove the entry when the task is freed.
all_tasks: slab::Slab<*mut u8>,
/// Number of spawned tasks that have not completed yet.
live_count: usize,
/// Upper bound on how many queued tasks a single `poll` call processes.
tasks_per_cycle: usize,
/// Task pointers whose memory is released at the start of the next `poll`
/// (or when the executor is dropped).
// NOTE(review): nothing in this file pushes here; presumably filled through
// the poll context installed by `set_poll_context` — confirm in `waker`.
deferred_free: Vec<*mut u8>,
}
/// Initial per-`poll` task budget used by `Executor::new`; it can be changed
/// afterwards with `Executor::set_tasks_per_cycle`.
const DEFAULT_TASKS_PER_CYCLE: usize = 64;
impl Executor {
    /// Creates an executor whose ready queues and task tracker are pre-sized
    /// for `initial_capacity` tasks.
    ///
    /// The per-cycle poll budget starts at `DEFAULT_TASKS_PER_CYCLE`.
    pub fn new(initial_capacity: usize) -> Self {
        Self {
            incoming: Vec::with_capacity(initial_capacity),
            draining: Vec::with_capacity(initial_capacity),
            all_tasks: slab::Slab::with_capacity(initial_capacity),
            live_count: 0,
            tasks_per_cycle: DEFAULT_TASKS_PER_CYCLE,
            deferred_free: Vec::new(),
        }
    }

    /// Returns the tracker key the next registered task will receive.
    ///
    /// This is the slab's next vacant key, so the task must be registered
    /// (via `spawn_raw`) before any other task is, or the key stored in the
    /// task and the key in the tracker will disagree.
    pub(crate) fn next_tracker_key(&self) -> u32 {
        let key = self.all_tasks.vacant_key();
        debug_assert!(
            u32::try_from(key).is_ok(),
            "more than 4 billion concurrent tasks — tracker_key overflow"
        );
        // Truncating cast: only lossy with more than `u32::MAX` concurrent
        // tasks, which the debug assertion above reports.
        key as u32
    }

    /// Spawns `future` as a heap-allocated task, queues it for the next poll
    /// cycle and returns a handle to its output.
    pub fn spawn_boxed<F>(&mut self, future: F) -> task::JoinHandle<F::Output>
    where
        F: Future + 'static,
        F::Output: 'static,
    {
        let tracker_key = self.next_tracker_key();
        let ptr = task::box_spawn_joinable(future, tracker_key);
        self.enqueue(ptr);
        task::JoinHandle::new(ptr)
    }

    /// Registers an already-constructed task and queues it for polling.
    // NOTE(review): assumes `ptr` was built with the key returned by
    // `next_tracker_key` immediately beforehand — verify at the call sites.
    pub(crate) fn spawn_raw(&mut self, ptr: *mut u8) {
        self.enqueue(ptr);
    }

    /// Tracks `ptr`, marks it queued, pushes it onto the incoming queue and
    /// counts it as live.
    fn enqueue(&mut self, ptr: *mut u8) {
        self.all_tasks.insert(ptr);
        // SAFETY: callers hand in a pointer to a live task that this executor
        // now tracks.
        unsafe { task::set_queued(ptr, true) };
        self.incoming.push(ptr);
        self.live_count += 1;
    }

    /// Moves at most `limit` task pointers from the cross-thread wake queue
    /// onto the incoming queue.
    ///
    /// The tasks are only queued; the tracker and the live count are left
    /// untouched.
    pub(crate) fn drain_cross_thread(
        &mut self,
        inbox: &crate::cross_wake::CrossWakeQueue,
        limit: usize,
    ) {
        for _ in 0..limit {
            let Some(task_ptr) = inbox.pop() else { break };
            self.incoming.push(task_ptr);
        }
    }

    /// Runs one poll cycle and returns how many tasks completed during it.
    ///
    /// The cycle frees tasks parked in `deferred_free`, takes the incoming
    /// queue as this cycle's batch, polls at most `tasks_per_cycle` entries of
    /// it, and carries the unpolled remainder over to the next cycle.
    pub fn poll(&mut self) -> usize {
        let mut completed = 0;

        // Release tasks whose free was deferred, dropping their tracker entry
        // when one is still present.
        for ptr in self.deferred_free.drain(..) {
            let key = unsafe { task::tracker_key(ptr) } as usize;
            unsafe { task::free_task(ptr) };
            if self.all_tasks.contains(key) {
                self.all_tasks.remove(key);
            }
        }

        // `draining` is empty here (cleared at the end of the previous cycle),
        // so after the swap `incoming` is empty and collects this cycle's
        // wake-ups.
        std::mem::swap(&mut self.incoming, &mut self.draining);
        // NOTE(review): presumably lets wakers reach `incoming` and
        // `deferred_free` while tasks are being polled — see `waker`.
        let _guard = set_poll_context(&mut self.incoming, &mut self.deferred_free);

        let limit = self.tasks_per_cycle.min(self.draining.len());
        // The batch is viewed through a raw pointer so that `complete_task`
        // (which needs `&mut self`) can be called while iterating it.
        let draining_ptr: *const Vec<*mut u8> = &raw const self.draining;
        // SAFETY: nothing called inside the loop in this file touches
        // `self.draining`; it is only modified again after the loop ends.
        let drain_slice = unsafe { &(&*draining_ptr)[..limit] };
        for &ptr in drain_slice {
            // Entries for tasks that already completed are stale; skip them.
            if unsafe { task::is_completed(ptr) } {
                continue;
            }
            // Clear the queued flag before polling.
            unsafe { task::set_queued(ptr, false) };
            let waker = unsafe { crate::waker::task_waker(ptr) };
            let mut cx = Context::from_waker(&waker);
            let poll_result = unsafe { task::poll_task(ptr, &mut cx) };
            // Drop the waker before completion handling.
            drop(waker);
            match poll_result {
                Poll::Pending => {}
                Poll::Ready(()) => {
                    self.complete_task(ptr);
                    completed += 1;
                }
            }
        }

        // Anything beyond the budget stays queued for the next cycle.
        if limit < self.draining.len() {
            self.incoming.extend_from_slice(&self.draining[limit..]);
        }
        self.draining.clear();
        completed
    }

    /// Number of spawned tasks that have not completed yet.
    pub fn task_count(&self) -> usize {
        self.live_count
    }

    /// Returns `true` when at least one task is queued for the next cycle.
    pub fn has_ready(&self) -> bool {
        !self.incoming.is_empty()
    }

    /// Sets the maximum number of tasks polled per [`poll`](Self::poll) call.
    ///
    /// A `limit` of zero is raised to one: with a zero budget `poll` could
    /// never run a task and [`drain`](Self::drain) would spin forever.
    pub fn set_tasks_per_cycle(&mut self, limit: usize) {
        self.tasks_per_cycle = limit.max(1);
    }

    /// Finishes a task that returned `Ready` or is being cancelled.
    ///
    /// Every path marks the task completed, decrements the live count and
    /// drops the executor's reference. The paths differ in when the task's
    /// future storage is dropped:
    ///
    /// * aborted, or no join handle: dropped immediately;
    /// * join handle present: dropped only when the last reference goes away.
    // NOTE(review): presumably the storage holds the output a `JoinHandle`
    // still has to read in the second case — confirm in `task`.
    fn complete_task(&mut self, ptr: *mut u8) {
        if unsafe { task::is_aborted(ptr) } {
            unsafe { task::drop_task_future(ptr) };
            self.mark_completed(ptr);
            if unsafe { task::has_join(ptr) } {
                Self::wake_joiner(ptr);
            }
            self.release_executor_ref(ptr, false);
        } else if unsafe { task::has_join(ptr) } {
            self.mark_completed(ptr);
            Self::wake_joiner(ptr);
            self.release_executor_ref(ptr, true);
        } else {
            unsafe { task::drop_task_future(ptr) };
            self.mark_completed(ptr);
            self.release_executor_ref(ptr, false);
        }
    }

    /// Sets the task's completed flag and removes it from the live count.
    fn mark_completed(&mut self, ptr: *mut u8) {
        unsafe { task::set_completed(ptr) };
        self.live_count -= 1;
    }

    /// Takes the waker registered for the task's join handle, if any, and
    /// wakes it.
    fn wake_joiner(ptr: *mut u8) {
        let waker = unsafe { task::take_join_waker(ptr) };
        if let Some(w) = waker {
            w.wake();
        }
    }

    /// Drops one reference to the task; if it was the last one, frees the
    /// task and removes its tracker entry.
    ///
    /// With `drop_future_on_free` set, the task's future storage is dropped
    /// right before the memory is freed.
    fn release_executor_ref(&mut self, ptr: *mut u8, drop_future_on_free: bool) {
        let should_free = unsafe { task::ref_dec(ptr) };
        if !should_free {
            return;
        }
        if drop_future_on_free {
            unsafe { task::drop_task_future(ptr) };
        }
        // Read the key before the task memory is released.
        let key = unsafe { task::tracker_key(ptr) } as usize;
        unsafe { task::free_task(ptr) };
        self.all_tasks.remove(key);
    }

    /// Hands out the two queues that `poll` passes to `set_poll_context`:
    /// `(incoming, deferred_free)`.
    pub(crate) fn poll_context_mut(&mut self) -> (&mut Vec<*mut u8>, &mut Vec<*mut u8>) {
        (&mut self.incoming, &mut self.deferred_free)
    }

    /// Polls until no live task remains.
    ///
    /// When tasks are live but none is queued, the thread yields and checks
    /// again, so this only returns once every task has completed.
    pub fn drain(&mut self) {
        while self.task_count() > 0 {
            if self.has_ready() {
                self.poll();
            } else {
                std::thread::yield_now();
            }
        }
    }

    /// Cancels a task that has not completed: removes it from both queues and
    /// runs the normal completion path. A task that already completed is left
    /// alone.
    #[allow(dead_code)]
    pub(crate) fn cancel(&mut self, id: task::TaskId) {
        let ptr = id.0;
        if unsafe { task::is_completed(ptr) } {
            return;
        }
        self.incoming.retain(|p| *p != ptr);
        self.draining.retain(|p| *p != ptr);
        self.complete_task(ptr);
    }
}
impl Drop for Executor {
    /// Releases every task the executor still tracks.
    ///
    /// Tasks parked in `deferred_free` are freed first. Each remaining tracked
    /// task that has not completed has its future dropped, is marked completed
    /// and loses the executor's reference. A task that still has outstanding
    /// references afterwards is left allocated (leaked) rather than freed
    /// under its holders; debug builds report this as a bug.
    fn drop(&mut self) {
        for ptr in self.deferred_free.drain(..) {
            // Read the key before the task memory is released.
            let key = unsafe { task::tracker_key(ptr) } as usize;
            unsafe { task::free_task(ptr) };
            if self.all_tasks.contains(key) {
                self.all_tasks.remove(key);
            }
        }

        for (_, &ptr) in &self.all_tasks {
            if !unsafe { task::is_completed(ptr) } {
                unsafe { task::drop_task_future(ptr) };
                unsafe { task::set_completed(ptr) };
                unsafe { task::ref_dec(ptr) };
            }
            let rc = unsafe { task::ref_count(ptr) };
            if rc > 0 {
                // Only raise the diagnostic when the thread is not already
                // unwinding: a second panic from inside `drop` would abort
                // the process and hide the original failure.
                debug_assert!(
                    std::thread::panicking(),
                    "executor dropped with {rc} outstanding reference(s) — \
                     all wakers and JoinHandles must be dropped before the Runtime",
                );
                continue;
            }
            unsafe { task::free_task(ptr) };
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::hint::black_box;
    use std::pin::Pin;
    use task::Task;

    /// Small executor shared by every test in this module.
    fn test_executor() -> Executor {
        Executor::new(16)
    }

    /// A spawned task runs on the first poll and is then no longer live.
    #[test]
    fn spawn_and_poll_single_task() {
        let mut executor = test_executor();
        let mut done = false;
        let done_ptr = &raw mut done;
        executor.spawn_boxed(async move {
            unsafe { *done_ptr = true };
        });
        assert_eq!(executor.task_count(), 1);

        assert_eq!(executor.poll(), 1);
        assert!(done);
        assert_eq!(executor.task_count(), 0);
    }

    /// Several tasks below the per-cycle budget all finish in one poll.
    #[test]
    fn spawn_multiple_tasks() {
        let mut executor = test_executor();
        (0..8).for_each(|_| {
            executor.spawn_boxed(async {});
        });
        assert_eq!(executor.task_count(), 8);

        assert_eq!(executor.poll(), 8);
        assert_eq!(executor.task_count(), 0);
    }

    /// A task that never resolves stays live after being polled.
    #[test]
    fn pending_task_not_completed() {
        let mut executor = test_executor();
        executor.spawn_boxed(std::future::pending::<()>());

        assert_eq!(executor.poll(), 0);
        assert_eq!(executor.task_count(), 1);
    }

    /// An empty async block completes on its first poll.
    #[test]
    fn immediate_task_completes() {
        let mut executor = test_executor();
        executor.spawn_boxed(async {});

        assert_eq!(executor.poll(), 1);
        assert_eq!(executor.task_count(), 0);
    }

    /// A task that wakes itself is polled again on later cycles until ready.
    #[test]
    fn self_waking_task_polled_again() {
        use std::cell::Cell;
        use std::rc::Rc;

        /// Wakes itself and stays pending for its first three polls.
        struct SelfWake {
            counter: Rc<Cell<u32>>,
        }

        impl Future for SelfWake {
            type Output = ();

            fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
                let seen = self.counter.get();
                self.counter.set(seen + 1);
                if seen >= 3 {
                    return Poll::Ready(());
                }
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }

        let mut executor = test_executor();
        let counter = Rc::new(Cell::new(0u32));
        let shared = Rc::clone(&counter);
        executor.spawn_boxed(async move {
            SelfWake { counter: shared }.await;
        });

        let mut total = 0;
        for _ in 0..10 {
            total += executor.poll();
            if executor.task_count() == 0 {
                break;
            }
        }
        assert_eq!(total, 1);
        assert_eq!(counter.get(), 4);
    }

    /// Aborting a pending task removes it from the live count on the next poll.
    #[test]
    fn abort_task() {
        let mut executor = test_executor();
        let join = executor.spawn_boxed(std::future::pending::<()>());
        assert_eq!(executor.task_count(), 1);

        assert!(join.abort());
        executor.poll();
        assert_eq!(executor.task_count(), 0);
    }

    /// After an aborted task is reaped, a new task can be spawned and run.
    #[test]
    fn abort_frees_slot_for_reuse() {
        let mut executor = test_executor();
        let join = executor.spawn_boxed(std::future::pending::<()>());
        join.abort();
        executor.poll();

        executor.spawn_boxed(async {});
        assert_eq!(executor.task_count(), 1);
        executor.poll();
        assert_eq!(executor.task_count(), 0);
    }

    /// With a budget of two, five ready tasks take three cycles to finish.
    #[test]
    fn poll_limit_respected() {
        let mut executor = test_executor();
        executor.set_tasks_per_cycle(2);
        (0..5).for_each(|_| {
            executor.spawn_boxed(async {});
        });

        assert_eq!(executor.poll(), 2);
        assert_eq!(executor.task_count(), 3);

        assert_eq!(executor.poll(), 2);
        assert_eq!(executor.task_count(), 1);

        assert_eq!(executor.poll(), 1);
        assert_eq!(executor.task_count(), 0);
    }

    /// Aborting a task that is still queued does not stop later tasks running.
    #[test]
    fn cancel_with_stale_ready_entry() {
        use std::cell::Cell;
        use std::rc::Rc;

        /// Pending (with a self-wake) on the first poll, ready on the second.
        struct WakeOnce(bool);

        impl Future for WakeOnce {
            type Output = ();

            fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
                if self.0 {
                    return Poll::Ready(());
                }
                self.0 = true;
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }

        let mut executor = test_executor();
        let polled = Rc::new(Cell::new(false));
        let polled_in_task = Rc::clone(&polled);

        let join = executor.spawn_boxed(WakeOnce(false));
        executor.poll();
        join.abort();

        executor.spawn_boxed(async move {
            polled_in_task.set(true);
        });
        executor.poll();
        assert!(polled.get());
    }

    /// A freshly built boxed task reports a reference count of one.
    #[test]
    fn refcount_starts_at_one() {
        let boxed = Box::new(Task::new_boxed(async {}, 0));
        let raw = Box::into_raw(boxed).cast::<u8>();
        assert_eq!(unsafe { task::ref_count(raw) }, 1);
        unsafe { task::free_task(raw) };
    }

    /// Dropping an executor that still holds pending tasks must not crash.
    #[test]
    fn executor_drop_cleans_up_queued_tasks() {
        let mut executor = test_executor();
        for _ in 0..2 {
            executor.spawn_boxed(std::future::pending::<()>());
        }
        executor.poll();
        drop(executor);
    }

    /// Manual benchmark: average cost of one poll cycle for a self-waking task.
    #[test]
    #[ignore]
    fn dispatch_latency() {
        use std::time::Instant;

        /// Always pending and always re-wakes itself.
        struct Noop;

        impl Future for Noop {
            type Output = ();

            fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }

        let mut executor = test_executor();
        executor.spawn_boxed(Noop);

        // Warm-up.
        for _ in 0..10_000 {
            executor.poll();
        }

        let iters: u128 = 100_000;
        let started = Instant::now();
        for _ in 0..iters {
            executor.poll();
        }
        let ns_per = started.elapsed().as_nanos() / iters;
        println!("dispatch: {ns_per} ns/poll (Box-allocated)");
        black_box(ns_per);
    }
}