use std::cell::Cell;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll, Wake, Waker};
use std::time::{Duration, Instant};
use crate::io::IoDriver;
use crate::task::JoinHandle;
use crate::timer::TimerDriver;
use crate::{Executor, WorldCtx};
// Default value of the builder's `event_interval`: the run loop performs a
// non-blocking IO poll whenever its iteration counter is a multiple of this.
const DEFAULT_EVENT_INTERVAL: u32 = 61;
thread_local! {
// Raw pointer to the executor of the runtime that is currently inside
// `run_loop` on this thread; null when there is none. It is written through
// `RuntimeGuard` (set on enter, previous value restored on drop).
static CURRENT: Cell<*mut Executor> = const { Cell::new(std::ptr::null_mut()) };
}
pub fn spawn_boxed<F>(future: F) -> JoinHandle<F::Output>
where
F: Future + 'static,
F::Output: 'static,
{
CURRENT.with(|cell| {
let ptr = cell.get();
assert!(
!ptr.is_null(),
"spawn_boxed() called outside of Runtime::block_on"
);
let executor = unsafe { &mut *ptr };
executor.spawn_boxed(future)
})
}
pub fn spawn_slab<F>(future: F) -> JoinHandle<F::Output>
where
F: Future + 'static,
F::Output: 'static,
{
CURRENT.with(|cell| {
let ptr = cell.get();
assert!(
!ptr.is_null(),
"spawn_slab() called outside of Runtime::block_on"
);
let executor = unsafe { &mut *ptr };
let tracker_key = executor.next_tracker_key();
let task_ptr = crate::alloc::slab_spawn(future, tracker_key);
executor.spawn_raw(task_ptr);
JoinHandle::new(task_ptr)
})
}
pub(crate) fn with_executor<R>(f: impl FnOnce(&mut Executor) -> R) -> R {
CURRENT.with(|cell| {
let ptr = cell.get();
assert!(!ptr.is_null(), "called outside of Runtime::block_on");
let executor = unsafe { &mut *ptr };
f(executor)
})
}
/// Attempts to claim a slab slot via `crate::alloc::try_claim`.
///
/// Returns whatever `try_claim` returns; the test-suite shows `None` when a
/// bounded slab has no free slot left.
///
/// # Panics
///
/// Panics if called outside of `Runtime::block_on` / `block_on_busy`.
pub fn try_claim_slab() -> Option<crate::alloc::SlabClaim> {
    let inside_runtime = CURRENT.with(|slot| !slot.get().is_null());
    assert!(
        inside_runtime,
        "try_claim_slab() called outside of Runtime::block_on"
    );
    crate::alloc::try_claim()
}
/// Claims a slab slot via `crate::alloc::claim` and returns the claim.
///
/// # Panics
///
/// Panics if called outside of `Runtime::block_on` / `block_on_busy`.
/// NOTE(review): `crate::alloc::claim` presumably panics when no slot is
/// available (the fallible variant is `try_claim_slab`) — confirm.
pub fn claim_slab() -> crate::alloc::SlabClaim {
    let inside_runtime = CURRENT.with(|slot| !slot.get().is_null());
    assert!(
        inside_runtime,
        "claim_slab() called outside of Runtime::block_on"
    );
    crate::alloc::claim()
}
/// Single-threaded async runtime: owns the task executor, the IO and timer
/// drivers, and the handles needed to drive a root future to completion.
///
/// Field order matters: `#[repr(C)]` keeps fields in declaration order so the
/// `offset_of!` assertion below this struct can check that `_slab_guard`
/// comes after `executor`, and Rust drops fields in declaration order.
#[repr(C)]
pub struct Runtime {
/// Task executor; its address is published in the `CURRENT` thread-local
/// while `run_loop` is running.
executor: Executor,
/// IO driver; `run_loop` hands out a raw pointer to it via
/// `crate::context::install`, hence the `UnsafeCell`.
io: std::cell::UnsafeCell<IoDriver>,
/// Timer driver; shared with the context the same way as `io`.
timers: std::cell::UnsafeCell<TimerDriver>,
/// Context built from the `World` passed to the builder.
ctx: WorldCtx,
/// Timestamp refreshed by `run_loop` after each IO poll.
event_time: Cell<Instant>,
/// Shutdown handle; clones are returned by `shutdown_handle()`.
shutdown: crate::ShutdownHandle,
/// Shared cross-thread wake state (queue, mio waker, `parked` flag).
cross_wake: std::sync::Arc<crate::cross_wake::CrossWakeContext>,
/// Limit passed to `Executor::drain_cross_thread` on every drain.
cross_thread_drain_limit: usize,
/// A non-blocking IO poll runs every `event_interval` loop iterations.
event_interval: u32,
/// Guard returned by `crate::alloc::install_slab`, if a slab was configured.
/// MUST stay declared after `executor` so it is dropped after the executor.
_slab_guard: Option<crate::alloc::SlabGuard>,
/// Marks this thread as hosting a live `Runtime`; cleared on drop.
_runtime_presence: RuntimePresenceGuard,
/// Raw-pointer marker: makes `Runtime` neither `Send` nor `Sync`.
_not_thread_safe: PhantomData<*const ()>,
}
// Compile-time layout check: `_slab_guard` must sit after `executor` in the
// `#[repr(C)]` layout, which mirrors declaration (and therefore drop) order.
// Reordering the fields of `Runtime` makes this fail to compile.
const _: () = assert!(
std::mem::offset_of!(Runtime, _slab_guard) > std::mem::offset_of!(Runtime, executor),
"BUG-1 (#167) invariant violated: Runtime::_slab_guard MUST be \
declared after Runtime::executor so it drops after the executor \
frees surviving slab tasks. Restore the declaration order or BUG-1 \
reappears as a panic at Runtime::drop."
);
impl Runtime {
    /// Builds a runtime for `world` using the builder's default settings.
    ///
    /// # Panics
    ///
    /// Panics under the same conditions as [`RuntimeBuilder::build`].
    pub fn new(world: &mut nexus_rt::World) -> Self {
        let builder = RuntimeBuilder::new(world);
        builder.build()
    }

    /// Returns a [`RuntimeBuilder`] for `world` so defaults can be overridden
    /// before calling `build()`.
    pub fn builder(world: &mut nexus_rt::World) -> RuntimeBuilder<'_> {
        RuntimeBuilder::new(world)
    }

    /// Returns a clone of this runtime's shutdown handle.
    pub fn shutdown_handle(&self) -> crate::ShutdownHandle {
        let handle = &self.shutdown;
        handle.clone()
    }

    /// Registers signal handlers through `crate::shutdown`, passing them the
    /// shutdown flag and the IO driver's mio waker.
    pub fn install_signal_handlers(&self) {
        let flag = self.shutdown.flag_ptr();
        // SAFETY: only a shared reference to the IO driver is created, and only
        // for the duration of the `mio_waker()` call.
        // NOTE(review): assumes no `&mut IoDriver` is live at this point —
        // `Runtime` is single-threaded, confirm no re-entrant caller.
        let waker = unsafe { &*self.io.get() }.mio_waker();
        crate::shutdown::install_signal_handlers(&flag, &waker);
    }

    /// Number of tasks as reported by the executor.
    pub fn task_count(&self) -> usize {
        let executor = &self.executor;
        executor.task_count()
    }
}
// Deferred slab setup stored by the builder: when called (from `build`) it
// yields the type-erased boxed slab together with the config that
// `crate::alloc::install_slab` consumes.
type SlabInstaller = Box<dyn FnOnce() -> (Box<dyn std::any::Any>, crate::alloc::SlabTlsConfig)>;
/// Builder for [`Runtime`]; obtained from `Runtime::builder` and consumed by
/// `build()`.
pub struct RuntimeBuilder<'w> {
/// World the runtime's `WorldCtx` is created from.
world: &'w mut nexus_rt::World,
/// Forwarded to `Executor::set_tasks_per_cycle`.
tasks_per_cycle: usize,
/// Limit used for each cross-thread wake-queue drain (default `usize::MAX`).
cross_thread_drain_limit: usize,
/// Loop iterations between non-blocking IO polls; must be > 0.
event_interval: u32,
/// Capacity passed to `Executor::new` (default 64).
queue_capacity: usize,
/// First argument to `IoDriver::new` (default 1024).
event_capacity: usize,
/// Second argument to `IoDriver::new` (default 64).
token_capacity: usize,
/// Whether `build()` installs signal handlers (default `false`).
signal_handlers: bool,
/// Optional deferred slab setup, run during `build()`.
slab_installer: Option<SlabInstaller>,
}
impl<'w> RuntimeBuilder<'w> {
fn new(world: &'w mut nexus_rt::World) -> Self {
Self {
world,
tasks_per_cycle: crate::DEFAULT_TASKS_PER_CYCLE,
cross_thread_drain_limit: usize::MAX,
event_interval: DEFAULT_EVENT_INTERVAL,
queue_capacity: 64,
event_capacity: 1024,
token_capacity: 64,
signal_handlers: false,
slab_installer: None,
}
}
pub fn tasks_per_cycle(mut self, limit: usize) -> Self {
self.tasks_per_cycle = limit;
self
}
pub fn event_interval(mut self, n: u32) -> Self {
assert!(n > 0, "event_interval must be > 0");
self.event_interval = n;
self
}
pub fn cross_thread_drain_limit(mut self, limit: usize) -> Self {
self.cross_thread_drain_limit = limit;
self
}
pub fn queue_capacity(mut self, cap: usize) -> Self {
self.queue_capacity = cap;
self
}
pub fn event_capacity(mut self, cap: usize) -> Self {
self.event_capacity = cap;
self
}
pub fn token_capacity(mut self, cap: usize) -> Self {
self.token_capacity = cap;
self
}
pub fn signal_handlers(mut self, enable: bool) -> Self {
self.signal_handlers = enable;
self
}
pub fn slab_unbounded<const S: usize>(
mut self,
slab: nexus_slab::byte::unbounded::Slab<S>,
) -> Self {
const {
assert!(
S >= 64,
"slab slot size must be at least 64 bytes (TASK_HEADER_SIZE)"
);
}
self.slab_installer = Some(Box::new(move || {
let mut slab = Box::new(slab);
let slab_ptr = std::ptr::from_mut(slab.as_mut()).cast::<u8>();
let config = crate::alloc::make_unbounded_config::<S>(slab_ptr);
(slab as Box<dyn std::any::Any>, config)
}));
self
}
pub fn slab_bounded<const S: usize>(
mut self,
slab: nexus_slab::byte::bounded::Slab<S>,
) -> Self {
const {
assert!(
S >= 64,
"slab slot size must be at least 64 bytes (TASK_HEADER_SIZE)"
);
}
self.slab_installer = Some(Box::new(move || {
let mut slab = Box::new(slab);
let slab_ptr = std::ptr::from_mut(slab.as_mut()).cast::<u8>();
let config = crate::alloc::make_bounded_config::<S>(slab_ptr);
(slab as Box<dyn std::any::Any>, config)
}));
self
}
pub fn build(self) -> Runtime {
let runtime_presence = RuntimePresenceGuard::install();
let io = IoDriver::new(self.event_capacity, self.token_capacity)
.expect("failed to create mio::Poll");
let mut shutdown = crate::ShutdownHandle::new();
shutdown.set_mio_waker(io.mio_waker());
let mut executor = Executor::new(self.queue_capacity);
executor.set_tasks_per_cycle(self.tasks_per_cycle);
let ctx = WorldCtx::new(self.world);
let event_time = Cell::new(Instant::now());
let slab_guard = self.slab_installer.map(|install| {
let (slab, config) = install();
crate::alloc::install_slab(slab, &config)
});
let cross_wake = std::sync::Arc::new(crate::cross_wake::CrossWakeContext {
queue: crate::cross_wake::CrossWakeQueue::new(),
mio_waker: io.mio_waker(),
parked: std::sync::atomic::AtomicBool::new(false),
});
let rt = Runtime {
executor,
io: std::cell::UnsafeCell::new(io),
timers: std::cell::UnsafeCell::new(TimerDriver::new(64)),
ctx,
event_time,
shutdown,
cross_wake,
cross_thread_drain_limit: self.cross_thread_drain_limit,
event_interval: self.event_interval,
_slab_guard: slab_guard,
_runtime_presence: runtime_presence,
_not_thread_safe: PhantomData,
};
if self.signal_handlers {
rt.install_signal_handlers();
}
rt
}
}
impl Runtime {
/// Drives `future` to completion on this thread and returns its output.
///
/// When there is no ready work the loop blocks in the IO driver, with a
/// timeout derived from the next timer deadline (if any).
///
/// # Panics
///
/// Panics if the IO poll fails with anything other than
/// `ErrorKind::Interrupted`.
pub fn block_on<F>(&mut self, future: F) -> F::Output
where
F: Future + 'static,
{
self.run_loop(future, ParkMode::Park)
}
/// Like [`block_on`](Self::block_on), but never blocks: when idle it polls
/// the IO driver with a zero timeout and immediately loops again.
///
/// # Panics
///
/// Panics if the IO poll fails with anything other than
/// `ErrorKind::Interrupted`.
pub fn block_on_busy<F>(&mut self, future: F) -> F::Output
where
F: Future + 'static,
{
self.run_loop(future, ParkMode::Spin)
}
/// Shared event loop behind `block_on` / `block_on_busy`.
///
/// Installs the thread-local context, then repeats: poll the root future
/// when woken (or when shutdown was requested), drain cross-thread wakes,
/// poll spawned tasks, fire expired timers, and finally poll IO according
/// to `mode`. Returns as soon as the root future yields `Poll::Ready`.
fn run_loop<F>(&mut self, future: F, mode: ParkMode) -> F::Output
where
F: Future + 'static,
{
// Publish raw pointers to the world context, drivers, event time and
// shutdown state; the returned guard lives until this function returns.
// NOTE(review): `flag_ptr()` presumably returns an `Arc` clone, so the
// pointer taken from the temporary stays valid while `self.shutdown`
// holds its own reference — confirm.
let _ctx_guard = crate::context::install(
self.ctx.as_ptr(),
self.io.get(),
self.timers.get(),
&raw const self.event_time,
std::sync::Arc::as_ptr(&self.shutdown.flag_ptr()),
std::ptr::from_ref(&self.shutdown.task_waker),
);
let _cross_wake_guard = crate::cross_wake::install_cross_wake(&self.cross_wake);
let mut root: Pin<Box<dyn Future<Output = F::Output>>> = Box::pin(future);
// Starts as `true` so the root future is polled on the first iteration.
let woken = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(true));
let root_waker = Waker::from(std::sync::Arc::new(RootWake {
woken: std::sync::Arc::clone(&woken),
// SAFETY (review): shared access to the IO driver just to fetch its waker.
mio_waker: unsafe { &*self.io.get() }.mio_waker(),
}));
let mut root_cx = Context::from_waker(&root_waker);
// Make the executor reachable from `spawn_boxed` / `spawn_slab` /
// `with_executor` through the `CURRENT` thread-local.
let _spawn_guard = RuntimeGuard::enter(&raw mut self.executor);
let (ready, deferred) = self.executor.poll_context_ptrs();
let _ready_guard = crate::waker::set_poll_context(ready, deferred);
self.event_time.set(Instant::now());
let cross_queue = &*self.cross_wake;
let mut tick: u32 = 0;
loop {
// Poll the root future only if it was woken since the last poll, or on
// every iteration once shutdown has been requested.
if woken.swap(false, std::sync::atomic::Ordering::Acquire)
|| self.shutdown.is_shutdown()
{
match root.as_mut().poll(&mut root_cx) {
Poll::Ready(output) => return output,
Poll::Pending => {}
}
}
self.executor
.drain_cross_thread(&cross_queue.queue, self.cross_thread_drain_limit);
self.executor.poll();
// SAFETY (review): exclusive access to the timer driver for this call.
unsafe { &mut *self.timers.get() }.fire_expired(Instant::now());
// In Park mode, raise `parked` BEFORE the final drain below.
// NOTE(review): presumably so a cross-thread wake enqueued after that
// drain sees the flag and wakes mio — confirm in `crate::cross_wake`.
if matches!(mode, ParkMode::Park) {
cross_queue
.parked
.store(true, std::sync::atomic::Ordering::Release);
}
self.executor
.drain_cross_thread(&cross_queue.queue, self.cross_thread_drain_limit);
tick = tick.wrapping_add(1);
// Every `event_interval` iterations, poll IO without blocking so IO
// readiness is still observed while tasks keep the loop busy.
if tick % self.event_interval == 0 {
if let Err(e) = unsafe { &mut *self.io.get() }.poll_io(Some(Duration::ZERO)) {
assert!(
e.kind() == std::io::ErrorKind::Interrupted,
"mio::Poll::poll failed: {e}"
);
}
self.event_time.set(Instant::now());
}
let has_work =
self.executor.has_ready() || woken.load(std::sync::atomic::Ordering::Acquire);
if has_work {
// Not going to block after all: lower the flag raised above.
if matches!(mode, ParkMode::Park) {
cross_queue
.parked
.store(false, std::sync::atomic::Ordering::Release);
}
continue;
}
match mode {
ParkMode::Spin => {
// Idle in Spin mode: non-blocking IO poll, then loop again.
if let Err(e) = unsafe { &mut *self.io.get() }.poll_io(Some(Duration::ZERO)) {
assert!(
e.kind() == std::io::ErrorKind::Interrupted,
"mio::Poll::poll failed: {e}"
);
}
self.event_time.set(Instant::now());
}
ParkMode::Park => {
// Idle in Park mode: block in the IO driver until the next timer
// deadline; `None` (no timers) is passed through as the timeout.
let timeout = unsafe { &*self.timers.get() }
.next_deadline()
.map(|d| d.saturating_duration_since(Instant::now()));
if let Err(e) = unsafe { &mut *self.io.get() }.poll_io(timeout) {
assert!(
e.kind() == std::io::ErrorKind::Interrupted,
"mio::Poll::poll failed: {e}"
);
}
cross_queue
.parked
.store(false, std::sync::atomic::Ordering::Release);
self.event_time.set(Instant::now());
}
}
}
}
}
/// How `run_loop` behaves when it finds no ready work.
#[derive(Clone, Copy)]
enum ParkMode {
/// Block in the IO driver until the next timer deadline (used by `block_on`).
Park,
/// Poll IO with a zero timeout and keep looping (used by `block_on_busy`).
Spin,
}
/// Waker state for the root future passed to `block_on`.
struct RootWake {
    /// Set when the root future has been woken; the run loop swaps it back to
    /// `false` before polling the root future.
    woken: std::sync::Arc<std::sync::atomic::AtomicBool>,
    /// Waker of the IO driver, used to interrupt a blocking IO poll.
    mio_waker: std::sync::Arc<mio::Waker>,
}

impl Wake for RootWake {
    /// Consuming wake; identical to [`wake_by_ref`](Wake::wake_by_ref).
    fn wake(self: std::sync::Arc<Self>) {
        Wake::wake_by_ref(&self);
    }

    /// Marks the root future as woken and, only on the `false -> true`
    /// transition, pokes the mio waker. A failure to wake mio is ignored.
    fn wake_by_ref(self: &std::sync::Arc<Self>) {
        let already_flagged = self
            .woken
            .swap(true, std::sync::atomic::Ordering::Release);
        if already_flagged {
            // A wake is already pending; skip the redundant mio wake.
            return;
        }
        let _ = self.mio_waker.wake();
    }
}
/// Scope guard that publishes an executor pointer in the `CURRENT`
/// thread-local and puts the previous value back when dropped.
struct RuntimeGuard {
    /// Value `CURRENT` held before this guard was entered.
    prev: *mut Executor,
}

impl RuntimeGuard {
    /// Stores `executor` in `CURRENT`, remembering the value it replaces.
    fn enter(executor: *mut Executor) -> Self {
        CURRENT.with(|slot| Self {
            prev: slot.replace(executor),
        })
    }
}

impl Drop for RuntimeGuard {
    /// Restores the pointer that was current before `enter`.
    fn drop(&mut self) {
        let previous = self.prev;
        CURRENT.with(|slot| slot.set(previous));
    }
}
thread_local! {
    // `true` while a `RuntimePresenceGuard` is alive on this thread.
    static RUNTIME_PRESENT: Cell<bool> = const { Cell::new(false) };
}

/// Marker proving that its owner is the only live `Runtime` on this thread.
/// Dropping it clears the per-thread flag again.
pub(crate) struct RuntimePresenceGuard;

impl RuntimePresenceGuard {
    /// Claims the per-thread runtime slot.
    ///
    /// # Panics
    ///
    /// Panics if the slot is already claimed on this thread; the flag is left
    /// set in that case.
    fn install() -> Self {
        // Test-and-set in one step; if the flag was already `true` it simply
        // stays `true` and the assertion below fires.
        let already_present = RUNTIME_PRESENT.with(|flag| flag.replace(true));
        assert!(
            !already_present,
            "nexus-async-rt: another Runtime is already alive on this \
             thread. Only one Runtime is supported per thread because \
             thread-local state (slab dispatch, IO/timer drivers, \
             cross-thread wake context) cannot be shared between \
             Runtimes. Drop the existing Runtime first."
        );
        Self
    }
}

impl Drop for RuntimePresenceGuard {
    /// Releases the per-thread runtime slot.
    fn drop(&mut self) {
        RUNTIME_PRESENT.with(|flag| flag.set(false));
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use nexus_rt::{Handler, IntoHandler, Res, ResMut, WorldBuilder};

    nexus_rt::new_resource!(Val(u64));
    nexus_rt::new_resource!(Out(u64));

    /// `block_on` returns the root future's output.
    #[test]
    fn block_on_returns_value() {
        let mut wb = WorldBuilder::new();
        wb.register(Val(42));
        let mut world = wb.build();
        let mut rt = Runtime::new(&mut world);
        let result = rt.block_on(async { 42u64 });
        assert_eq!(result, 42);
    }

    /// The root future can read and mutate world resources via the context.
    #[test]
    fn block_on_with_world_access() {
        let mut wb = WorldBuilder::new();
        wb.register(Val(42));
        wb.register(Out(0));
        let mut world = wb.build();
        let mut rt = Runtime::new(&mut world);
        let result = rt.block_on(async move {
            crate::context::with_world(|world| {
                let v = world.resource::<Val>().0;
                world.resource_mut::<Out>().0 = v + 10;
            });
            crate::context::with_world_ref(|world| world.resource::<Out>().0)
        });
        assert_eq!(result, 52);
    }

    /// A handler resolved against the registry up front can be run from
    /// inside the root future.
    #[test]
    fn block_on_with_pre_resolved_handler() {
        let mut wb = WorldBuilder::new();
        wb.register(Val(42));
        wb.register(Out(0));
        let mut world = wb.build();
        let mut rt = Runtime::new(&mut world);
        let mut h = (|val: Res<Val>, mut out: ResMut<Out>, event: u64| {
            out.0 = val.0 + event;
        })
        .into_handler(world.registry());
        let result = rt.block_on(async move {
            crate::context::with_world(|world| h.run(world, 10));
            crate::context::with_world_ref(|world| world.resource::<Out>().0)
        });
        assert_eq!(result, 52);
    }

    /// Tasks spawned from the root future run once the root yields.
    #[test]
    fn spawn_from_root_future() {
        let mut wb = WorldBuilder::new();
        wb.register(Out(0));
        let mut world = wb.build();
        let mut rt = Runtime::new(&mut world);
        rt.block_on(async move {
            for i in 1..=3u64 {
                spawn_boxed(async move {
                    crate::context::with_world(|world| {
                        world.resource_mut::<Out>().0 += i;
                    });
                });
            }
            // Yield once so the spawned tasks get polled before we finish.
            YieldOnce(false).await;
        });
        assert_eq!(world.resource::<Out>().0, 6);
    }

    /// `block_on_busy` returns the root future's output.
    #[test]
    fn block_on_busy_returns_value() {
        let mut wb = WorldBuilder::new();
        wb.register(Val(7));
        let mut world = wb.build();
        let mut rt = Runtime::new(&mut world);
        let result = rt.block_on_busy(async { 6 * 7 });
        assert_eq!(result, 42);
    }

    /// Spawned tasks also run under the busy-spin loop.
    #[test]
    fn block_on_busy_with_spawned_tasks() {
        let mut wb = WorldBuilder::new();
        wb.register(Out(0));
        let mut world = wb.build();
        let mut rt = Runtime::new(&mut world);
        rt.block_on_busy(async move {
            spawn_boxed(async move {
                crate::context::with_world(|world| {
                    world.resource_mut::<Out>().0 = 99;
                });
            });
            YieldOnce(false).await;
        });
        assert_eq!(world.resource::<Out>().0, 99);
    }

    /// The event time visible inside the runtime is not older than the
    /// instant taken just before `block_on`.
    #[test]
    fn event_time_is_set() {
        let mut wb = WorldBuilder::new();
        wb.register(Val(0));
        let mut world = wb.build();
        let mut rt = Runtime::new(&mut world);
        let before = Instant::now();
        rt.block_on(async move {
            let t = crate::context::event_time();
            assert!(t >= before);
        });
    }

    /// Spawning without a running runtime panics with a clear message.
    #[test]
    #[should_panic(expected = "spawn_boxed() called outside of Runtime::block_on")]
    fn spawn_outside_runtime_panics() {
        spawn_boxed(async {});
    }

    /// Small unbounded slab (256-byte slots) shared by the slab tests.
    fn test_slab() -> nexus_slab::byte::unbounded::Slab<256> {
        unsafe { nexus_slab::byte::unbounded::Slab::with_chunk_capacity(16) }
    }

    /// `spawn_slab` panics when the runtime was built without a slab.
    #[test]
    #[should_panic(expected = "spawn_slab() called without a slab")]
    fn spawn_slab_without_slab_panics() {
        let wb = WorldBuilder::new();
        let mut world = wb.build();
        let mut rt = Runtime::new(&mut world);
        rt.block_on(async {
            spawn_slab(async {});
        });
    }

    /// A slab-allocated task runs to completion.
    #[test]
    fn spawn_slab_with_slab() {
        let mut wb = WorldBuilder::new();
        wb.register(Out(0));
        let mut world = wb.build();
        let mut rt = Runtime::builder(&mut world)
            .slab_unbounded(test_slab())
            .build();
        rt.block_on(async move {
            spawn_slab(async move {
                crate::context::with_world(|world| {
                    world.resource_mut::<Out>().0 = 77;
                });
            });
            YieldOnce(false).await;
        });
        assert_eq!(world.resource::<Out>().0, 77);
    }

    /// Boxed and slab-allocated tasks can coexist in one runtime.
    #[test]
    fn mixed_spawn_and_spawn_slab() {
        let mut wb = WorldBuilder::new();
        wb.register(Out(0));
        let mut world = wb.build();
        let mut rt = Runtime::builder(&mut world)
            .slab_unbounded(test_slab())
            .build();
        rt.block_on(async move {
            spawn_boxed(async move {
                crate::context::with_world(|world| {
                    world.resource_mut::<Out>().0 += 10;
                });
            });
            spawn_slab(async move {
                crate::context::with_world(|world| {
                    world.resource_mut::<Out>().0 += 20;
                });
            });
            YieldOnce(false).await;
        });
        assert_eq!(world.resource::<Out>().0, 30);
    }

    /// A task spawned through a slab claim runs to completion.
    #[test]
    fn claim_slab_spawn_executes() {
        let mut wb = WorldBuilder::new();
        wb.register(Out(0));
        let mut world = wb.build();
        let mut rt = Runtime::builder(&mut world)
            .slab_unbounded(test_slab())
            .build();
        rt.block_on(async move {
            let claim = claim_slab();
            claim.spawn(async move {
                crate::context::with_world(|world| {
                    world.resource_mut::<Out>().0 = 55;
                });
            });
            YieldOnce(false).await;
        });
        assert_eq!(world.resource::<Out>().0, 55);
    }

    /// Dropping an unused claim frees its slot: with a capacity-1 slab the
    /// second `claim_slab()` must succeed.
    #[test]
    fn claim_slab_drop_returns_slot() {
        let wb = WorldBuilder::new();
        let mut world = wb.build();
        let bounded = unsafe { nexus_slab::byte::bounded::Slab::<256>::with_capacity(1) };
        let mut rt = Runtime::builder(&mut world).slab_bounded(bounded).build();
        rt.block_on(async {
            let claim = claim_slab();
            drop(claim);
            let claim = claim_slab();
            claim.spawn(async {});
            YieldOnce(false).await;
        });
    }

    /// `try_claim_slab` yields `None` while the only slot is held.
    #[test]
    fn try_claim_slab_returns_none_when_full() {
        let wb = WorldBuilder::new();
        let mut world = wb.build();
        let bounded = unsafe { nexus_slab::byte::bounded::Slab::<256>::with_capacity(1) };
        let mut rt = Runtime::builder(&mut world).slab_bounded(bounded).build();
        rt.block_on(async {
            let _held = claim_slab();
            assert!(try_claim_slab().is_none());
        });
    }

    /// Boxed tasks and claim-spawned tasks can coexist in one runtime.
    #[test]
    fn mixed_spawn_boxed_and_claim_slab() {
        let mut wb = WorldBuilder::new();
        wb.register(Out(0));
        let mut world = wb.build();
        let mut rt = Runtime::builder(&mut world)
            .slab_unbounded(test_slab())
            .build();
        rt.block_on(async move {
            spawn_boxed(async move {
                crate::context::with_world(|world| {
                    world.resource_mut::<Out>().0 += 10;
                });
            });
            let claim = claim_slab();
            claim.spawn(async move {
                crate::context::with_world(|world| {
                    world.resource_mut::<Out>().0 += 20;
                });
            });
            YieldOnce(false).await;
        });
        assert_eq!(world.resource::<Out>().0, 30);
    }

    /// A 50 ms sleep in the root future takes roughly 50 ms of wall time.
    #[test]
    fn sleep_completes() {
        let mut wb = WorldBuilder::new();
        wb.register(Out(0));
        let mut world = wb.build();
        let mut rt = Runtime::new(&mut world);
        let before = Instant::now();
        rt.block_on(async move {
            crate::context::sleep(Duration::from_millis(50)).await;
        });
        let elapsed = before.elapsed();
        assert!(
            elapsed >= Duration::from_millis(40),
            "elapsed {elapsed:?} too short"
        );
        assert!(
            elapsed < Duration::from_millis(200),
            "elapsed {elapsed:?} too long"
        );
    }

    /// Timers also fire for spawned tasks while the root future sleeps.
    #[test]
    fn sleep_in_spawned_task() {
        let mut wb = WorldBuilder::new();
        wb.register(Out(0));
        let mut world = wb.build();
        let mut rt = Runtime::new(&mut world);
        let before = Instant::now();
        rt.block_on(async move {
            spawn_boxed(async move {
                crate::context::sleep(Duration::from_millis(50)).await;
                crate::context::with_world(|world| {
                    world.resource_mut::<Out>().0 = 42;
                });
            });
            crate::context::sleep(Duration::from_millis(100)).await;
        });
        let elapsed = before.elapsed();
        assert!(elapsed >= Duration::from_millis(80));
        assert_eq!(world.resource::<Out>().0, 42);
    }

    /// A zero-length sleep completes without noticeable delay.
    #[test]
    fn sleep_zero_duration_ready_immediately() {
        let wb = WorldBuilder::new();
        let mut world = wb.build();
        let mut rt = Runtime::new(&mut world);
        let before = Instant::now();
        rt.block_on(async move {
            crate::context::sleep(Duration::ZERO).await;
        });
        assert!(before.elapsed() < Duration::from_millis(10));
    }

    /// Sleeping until an instant in the past completes without delay.
    #[test]
    fn sleep_past_deadline_ready_immediately() {
        let wb = WorldBuilder::new();
        let mut world = wb.build();
        let mut rt = Runtime::new(&mut world);
        let past = Instant::now() - Duration::from_secs(1);
        let before = Instant::now();
        rt.block_on(async move {
            crate::context::sleep_until(past).await;
        });
        assert!(before.elapsed() < Duration::from_millis(10));
    }

    /// `timeout` yields `Ok` when the inner future finishes in time.
    #[test]
    fn timeout_completes_before_deadline() {
        let wb = WorldBuilder::new();
        let mut world = wb.build();
        let mut rt = Runtime::new(&mut world);
        let result = rt.block_on(async {
            crate::context::timeout(Duration::from_millis(500), async { 42u64 }).await
        });
        assert_eq!(result.unwrap(), 42);
    }

    /// `timeout` yields `Err` when the deadline passes first.
    #[test]
    fn timeout_expires() {
        let wb = WorldBuilder::new();
        let mut world = wb.build();
        let mut rt = Runtime::new(&mut world);
        let result = rt.block_on(async {
            crate::context::timeout(
                Duration::from_millis(10),
                crate::context::sleep(Duration::from_secs(10)),
            )
            .await
        });
        assert!(result.is_err());
    }

    /// Three ticks of a 20 ms interval take between 50 ms and 200 ms.
    #[test]
    fn interval_ticks() {
        let mut wb = WorldBuilder::new();
        wb.register(Out(0));
        let mut world = wb.build();
        let mut rt = Runtime::new(&mut world);
        let before = Instant::now();
        rt.block_on(async move {
            let mut iv = crate::context::interval(Duration::from_millis(20));
            iv.tick().await;
            iv.tick().await;
            iv.tick().await;
        });
        let elapsed = before.elapsed();
        assert!(
            elapsed >= Duration::from_millis(50),
            "too fast: {elapsed:?}"
        );
        assert!(
            elapsed < Duration::from_millis(200),
            "too slow: {elapsed:?}"
        );
    }

    /// `yield_now` gives a previously spawned task a chance to run.
    #[test]
    fn yield_now_lets_other_tasks_run() {
        let mut wb = WorldBuilder::new();
        wb.register(Out(0));
        let mut world = wb.build();
        let mut rt = Runtime::new(&mut world);
        rt.block_on(async move {
            spawn_boxed(async move {
                crate::context::with_world(|world| {
                    world.resource_mut::<Out>().0 = 99;
                });
            });
            crate::context::yield_now().await;
            let val = crate::context::with_world_ref(|world| world.resource::<Out>().0);
            assert_eq!(val, 99);
        });
    }

    /// Future that returns `Pending` exactly once (after waking itself) and
    /// `Ready` on the next poll. The flag records whether it already yielded.
    struct YieldOnce(bool);

    impl Future for YieldOnce {
        type Output = ();

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
            if self.0 {
                Poll::Ready(())
            } else {
                self.0 = true;
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }
    }

    /// Awaiting a `JoinHandle` yields the task's output.
    #[test]
    fn join_handle_await_gets_value() {
        let wb = WorldBuilder::new();
        let mut world = wb.build();
        let mut rt = Runtime::new(&mut world);
        rt.block_on(async {
            let handle = spawn_boxed(async { 42u64 });
            let result = handle.await;
            assert_eq!(result, 42);
        });
    }

    /// Heap-owning outputs survive the trip through a `JoinHandle`.
    #[test]
    fn join_handle_await_string() {
        let wb = WorldBuilder::new();
        let mut world = wb.build();
        let mut rt = Runtime::new(&mut world);
        rt.block_on(async {
            let handle = spawn_boxed(async { String::from("hello world") });
            let result = handle.await;
            assert_eq!(result, "hello world");
        });
    }

    /// Dropping a `JoinHandle` detaches the task; it still runs.
    #[test]
    fn join_handle_detach() {
        use std::cell::Cell;
        use std::rc::Rc;
        let wb = WorldBuilder::new();
        let mut world = wb.build();
        let mut rt = Runtime::new(&mut world);
        let ran = Rc::new(Cell::new(false));
        let r = ran.clone();
        rt.block_on(async move {
            drop(spawn_boxed(async move {
                r.set(true);
            }));
            crate::context::yield_now().await;
        });
        assert!(ran.get());
    }

    /// `is_finished` flips from `false` to `true` once the task has run.
    #[test]
    fn join_handle_is_finished() {
        let wb = WorldBuilder::new();
        let mut world = wb.build();
        let mut rt = Runtime::new(&mut world);
        rt.block_on(async {
            let handle = spawn_boxed(async { 1 });
            assert!(!handle.is_finished());
            crate::context::yield_now().await;
            assert!(handle.is_finished());
            let val = handle.await;
            assert_eq!(val, 1);
        });
    }

    /// Aborting a task that has not completed reports `true`.
    #[test]
    fn join_handle_abort_returns_true() {
        let wb = WorldBuilder::new();
        let mut world = wb.build();
        let mut rt = Runtime::new(&mut world);
        rt.block_on(async {
            let handle = spawn_boxed(std::future::pending::<()>());
            assert!(handle.abort());
        });
    }

    /// Aborting an already completed task reports `false`.
    #[test]
    fn join_handle_abort_completed_returns_false() {
        let wb = WorldBuilder::new();
        let mut world = wb.build();
        let mut rt = Runtime::new(&mut world);
        rt.block_on(async {
            let handle = spawn_boxed(async { 42 });
            crate::context::yield_now().await;
            assert!(handle.is_finished());
            assert!(!handle.abort());
        });
    }

    /// An unread output is dropped exactly once when its handle is dropped
    /// after the task completed.
    #[test]
    fn join_handle_drop_after_completion_drops_output() {
        use std::cell::Cell;
        use std::rc::Rc;
        let wb = WorldBuilder::new();
        let mut world = wb.build();
        let mut rt = Runtime::new(&mut world);
        let drop_count = Rc::new(Cell::new(0u32));
        let dc = drop_count.clone();
        struct DropCounter(Rc<Cell<u32>>);
        impl Drop for DropCounter {
            fn drop(&mut self) {
                self.0.set(self.0.get() + 1);
            }
        }
        rt.block_on(async move {
            let handle = spawn_boxed(async move { DropCounter(dc) });
            crate::context::yield_now().await;
            assert!(handle.is_finished());
            drop(handle);
        });
        assert_eq!(drop_count.get(), 1, "output should be dropped exactly once");
    }

    /// Handles can be awaited in any order, independent of spawn order.
    #[test]
    fn join_handle_multiple_concurrent() {
        let wb = WorldBuilder::new();
        let mut world = wb.build();
        let mut rt = Runtime::new(&mut world);
        rt.block_on(async {
            let h1 = spawn_boxed(async { 10u64 });
            let h2 = spawn_boxed(async { 20u64 });
            let h3 = spawn_boxed(async { 30u64 });
            let r3 = h3.await;
            let r1 = h1.await;
            let r2 = h2.await;
            assert_eq!(r1, 10);
            assert_eq!(r2, 20);
            assert_eq!(r3, 30);
        });
    }

    /// An output much larger than the future that produces it is returned
    /// intact.
    #[test]
    fn join_handle_output_larger_than_future() {
        let wb = WorldBuilder::new();
        let mut world = wb.build();
        let mut rt = Runtime::new(&mut world);
        rt.block_on(async {
            let handle = spawn_boxed(async { [42u64; 32] });
            let result = handle.await;
            assert_eq!(result[0], 42);
            assert_eq!(result[31], 42);
        });
    }
}