use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, Wake, Waker};
use std::time::{Duration, Instant};
use crate::{Executor, TaskId, WorldCtx};
use crate::io::IoDriver;
use crate::timer::TimerDriver;
thread_local! {
static CURRENT: Cell<*mut Executor> = const { Cell::new(std::ptr::null_mut()) };
}
pub fn spawn_boxed<F>(future: F) -> TaskId
where
F: Future<Output = ()> + 'static,
{
CURRENT.with(|cell| {
let ptr = cell.get();
assert!(!ptr.is_null(), "spawn_boxed() called outside of Runtime::block_on");
let executor = unsafe { &mut *ptr };
executor.spawn_boxed(future)
})
}
pub fn spawn_slab<F>(future: F) -> TaskId
where
F: Future<Output = ()> + 'static,
{
CURRENT.with(|cell| {
let ptr = cell.get();
assert!(!ptr.is_null(), "spawn_slab() called outside of Runtime::block_on");
let executor = unsafe { &mut *ptr };
let tracker_key = executor.next_tracker_key();
let task_ptr = crate::alloc::slab_spawn(future, tracker_key);
executor.spawn_raw(task_ptr)
})
}
pub(crate) fn with_executor<R>(f: impl FnOnce(&mut Executor) -> R) -> R {
CURRENT.with(|cell| {
let ptr = cell.get();
assert!(!ptr.is_null(), "called outside of Runtime::block_on");
let executor = unsafe { &mut *ptr };
f(executor)
})
}
pub fn try_claim_slab() -> Option<crate::alloc::SlabClaim> {
CURRENT.with(|cell| {
assert!(!cell.get().is_null(), "try_claim_slab() called outside of Runtime::block_on");
});
crate::alloc::try_claim()
}
pub fn claim_slab() -> crate::alloc::SlabClaim {
CURRENT.with(|cell| {
assert!(!cell.get().is_null(), "claim_slab() called outside of Runtime::block_on");
});
crate::alloc::claim()
}
pub struct Runtime {
executor: Executor,
io: IoDriver,
timers: TimerDriver,
ctx: WorldCtx,
event_time: Cell<Instant>,
shutdown: crate::ShutdownHandle,
_slab: Option<Box<dyn std::any::Any>>,
slab_tls: Option<crate::alloc::SlabTlsConfig>,
}
impl Runtime {
pub fn new(world: &mut nexus_rt::World) -> Self {
RuntimeBuilder::new(world).build()
}
pub fn builder(world: &mut nexus_rt::World) -> RuntimeBuilder<'_> {
RuntimeBuilder::new(world)
}
pub fn shutdown_handle(&self) -> crate::ShutdownHandle {
self.shutdown.clone()
}
pub fn install_signal_handlers(&self) {
crate::shutdown::install_signal_handlers(
&self.shutdown.flag_ptr(),
&self.io.mio_waker(),
);
}
pub fn task_count(&self) -> usize {
self.executor.task_count()
}
}
type SlabInstaller = Box<dyn FnOnce() -> (Box<dyn std::any::Any>, crate::alloc::SlabTlsConfig)>;
pub struct RuntimeBuilder<'w> {
world: &'w mut nexus_rt::World,
tasks_per_cycle: usize,
queue_capacity: usize,
event_capacity: usize,
token_capacity: usize,
signal_handlers: bool,
slab_installer: Option<SlabInstaller>,
}
impl<'w> RuntimeBuilder<'w> {
fn new(world: &'w mut nexus_rt::World) -> Self {
Self {
world,
tasks_per_cycle: crate::DEFAULT_TASKS_PER_CYCLE,
queue_capacity: 64,
event_capacity: 1024,
token_capacity: 64,
signal_handlers: false,
slab_installer: None,
}
}
pub fn tasks_per_cycle(mut self, limit: usize) -> Self {
self.tasks_per_cycle = limit;
self
}
pub fn queue_capacity(mut self, cap: usize) -> Self {
self.queue_capacity = cap;
self
}
pub fn event_capacity(mut self, cap: usize) -> Self {
self.event_capacity = cap;
self
}
pub fn token_capacity(mut self, cap: usize) -> Self {
self.token_capacity = cap;
self
}
pub fn signal_handlers(mut self, enable: bool) -> Self {
self.signal_handlers = enable;
self
}
pub fn slab_unbounded<const S: usize>(
mut self,
slab: nexus_slab::byte::unbounded::Slab<S>,
) -> Self {
const {
assert!(S >= 64,
"slab slot size must be at least 64 bytes (32 for task header + 32 for future)");
}
self.slab_installer = Some(Box::new(move || {
let slab = Box::new(slab);
let slab_ptr = std::ptr::from_ref(slab.as_ref()).cast::<u8>();
let config = crate::alloc::make_unbounded_config::<S>(slab_ptr);
(slab as Box<dyn std::any::Any>, config)
}));
self
}
pub fn slab_bounded<const S: usize>(
mut self,
slab: nexus_slab::byte::bounded::Slab<S>,
) -> Self {
const {
assert!(S >= 64,
"slab slot size must be at least 64 bytes (32 for task header + 32 for future)");
}
self.slab_installer = Some(Box::new(move || {
let slab = Box::new(slab);
let slab_ptr = std::ptr::from_ref(slab.as_ref()).cast::<u8>();
let config = crate::alloc::make_bounded_config::<S>(slab_ptr);
(slab as Box<dyn std::any::Any>, config)
}));
self
}
pub fn build(self) -> Runtime {
let io = IoDriver::new(self.event_capacity, self.token_capacity)
.expect("failed to create mio::Poll");
let mut shutdown = crate::ShutdownHandle::new();
shutdown.set_mio_waker(io.mio_waker());
let mut executor = Executor::new(self.queue_capacity);
executor.set_tasks_per_cycle(self.tasks_per_cycle);
let ctx = WorldCtx::new(self.world);
let event_time = Cell::new(Instant::now());
let (slab, slab_tls) = self
.slab_installer
.map_or((None, None), |install| {
let (slab, config) = install();
(Some(slab), Some(config))
});
let rt = Runtime {
executor,
io,
timers: TimerDriver::new(64),
ctx,
event_time,
shutdown,
_slab: slab,
slab_tls,
};
if self.signal_handlers {
rt.install_signal_handlers();
}
rt
}
}
impl Runtime {
pub fn block_on<F>(&mut self, future: F) -> F::Output
where
F: Future + 'static,
{
self.run_loop(future, ParkMode::Park)
}
pub fn block_on_busy<F>(&mut self, future: F) -> F::Output
where
F: Future + 'static,
{
self.run_loop(future, ParkMode::Spin)
}
fn run_loop<F>(&mut self, future: F, mode: ParkMode) -> F::Output
where
F: Future + 'static,
{
let _ctx_guard = crate::context::install(
self.ctx.as_ptr(),
&raw mut self.io,
&raw mut self.timers,
&raw const self.event_time,
std::sync::Arc::as_ptr(&self.shutdown.flag_ptr()),
);
let _slab_guard = self.slab_tls.as_ref().map(crate::alloc::install_slab);
let mut root: Pin<Box<dyn Future<Output = F::Output>>> = Box::pin(future);
let woken = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(true));
let root_waker = Waker::from(std::sync::Arc::new(RootWake {
woken: std::sync::Arc::clone(&woken),
mio_waker: self.io.mio_waker(),
}));
let mut root_cx = Context::from_waker(&root_waker);
let _spawn_guard = RuntimeGuard::enter(&raw mut self.executor);
let (ready, deferred) = self.executor.poll_context_mut();
let _ready_guard = crate::waker::set_poll_context(ready, deferred);
self.event_time.set(Instant::now());
loop {
if woken.swap(false, std::sync::atomic::Ordering::Acquire)
|| self.shutdown.is_shutdown()
{
match root.as_mut().poll(&mut root_cx) {
Poll::Ready(output) => return output,
Poll::Pending => {}
}
}
self.executor.poll();
let now = Instant::now();
self.timers.fire_expired(now);
let has_work = self.executor.has_ready()
|| woken.load(std::sync::atomic::Ordering::Acquire);
let mio_timeout = match mode {
ParkMode::Park => {
if has_work {
Some(Duration::ZERO)
} else {
self.timers.next_deadline().map(|deadline| {
deadline.saturating_duration_since(Instant::now())
})
}
}
ParkMode::Spin => Some(Duration::ZERO),
};
if let Err(e) = self.io.poll_io(mio_timeout) {
assert!(
e.kind() == std::io::ErrorKind::Interrupted,
"mio::Poll::poll failed: {e}"
);
}
self.event_time.set(Instant::now());
}
}
}
#[derive(Clone, Copy)]
enum ParkMode {
Park,
Spin,
}
struct RootWake {
woken: std::sync::Arc<std::sync::atomic::AtomicBool>,
mio_waker: std::sync::Arc<mio::Waker>,
}
impl Wake for RootWake {
fn wake(self: std::sync::Arc<Self>) {
self.wake_by_ref();
}
fn wake_by_ref(self: &std::sync::Arc<Self>) {
let was_woken = self.woken.swap(true, std::sync::atomic::Ordering::Release);
if !was_woken {
let _ = self.mio_waker.wake();
}
}
}
struct RuntimeGuard {
prev: *mut Executor,
}
impl RuntimeGuard {
fn enter(executor: *mut Executor) -> Self {
let prev = CURRENT.with(|cell| cell.replace(executor));
Self { prev }
}
}
impl Drop for RuntimeGuard {
fn drop(&mut self) {
CURRENT.with(|cell| cell.set(self.prev));
}
}
#[cfg(test)]
mod tests {
use super::*;
use nexus_rt::{Handler, IntoHandler, Res, ResMut, WorldBuilder};
nexus_rt::new_resource!(Val(u64));
nexus_rt::new_resource!(Out(u64));
#[test]
fn block_on_returns_value() {
let mut wb = WorldBuilder::new();
wb.register(Val(42));
let mut world = wb.build();
let mut rt = Runtime::new(&mut world);
let result = rt.block_on(async { 42u64 });
assert_eq!(result, 42);
}
#[test]
fn block_on_with_world_access() {
let mut wb = WorldBuilder::new();
wb.register(Val(42));
wb.register(Out(0));
let mut world = wb.build();
let mut rt = Runtime::new(&mut world);
let result = rt.block_on(async move {
crate::context::with_world(|world| {
let v = world.resource::<Val>().0;
world.resource_mut::<Out>().0 = v + 10;
});
crate::context::with_world_ref(|world| world.resource::<Out>().0)
});
assert_eq!(result, 52);
}
#[test]
fn block_on_with_pre_resolved_handler() {
let mut wb = WorldBuilder::new();
wb.register(Val(42));
wb.register(Out(0));
let mut world = wb.build();
let mut rt = Runtime::new(&mut world);
let mut h = (|val: Res<Val>, mut out: ResMut<Out>, event: u64| {
out.0 = val.0 + event;
})
.into_handler(world.registry());
let result = rt.block_on(async move {
crate::context::with_world(|world| h.run(world, 10));
crate::context::with_world_ref(|world| world.resource::<Out>().0)
});
assert_eq!(result, 52);
}
#[test]
fn spawn_from_root_future() {
let mut wb = WorldBuilder::new();
wb.register(Out(0));
let mut world = wb.build();
let mut rt = Runtime::new(&mut world);
rt.block_on(async move {
for i in 1..=3u64 {
spawn_boxed(async move {
crate::context::with_world(|world| {
world.resource_mut::<Out>().0 += i;
});
});
}
YieldOnce(false).await;
});
assert_eq!(world.resource::<Out>().0, 6);
}
#[test]
fn block_on_busy_returns_value() {
let mut wb = WorldBuilder::new();
wb.register(Val(7));
let mut world = wb.build();
let mut rt = Runtime::new(&mut world);
let result = rt.block_on_busy(async { 6 * 7 });
assert_eq!(result, 42);
}
#[test]
fn block_on_busy_with_spawned_tasks() {
let mut wb = WorldBuilder::new();
wb.register(Out(0));
let mut world = wb.build();
let mut rt = Runtime::new(&mut world);
rt.block_on_busy(async move {
spawn_boxed(async move {
crate::context::with_world(|world| {
world.resource_mut::<Out>().0 = 99;
});
});
YieldOnce(false).await;
});
assert_eq!(world.resource::<Out>().0, 99);
}
#[test]
fn event_time_is_set() {
let mut wb = WorldBuilder::new();
wb.register(Val(0));
let mut world = wb.build();
let mut rt = Runtime::new(&mut world);
let before = Instant::now();
rt.block_on(async move {
let t = crate::context::event_time();
assert!(t >= before);
});
}
#[test]
#[should_panic(expected = "spawn_boxed() called outside of Runtime::block_on")]
fn spawn_outside_runtime_panics() {
spawn_boxed(async {});
}
fn test_slab() -> nexus_slab::byte::unbounded::Slab<256> {
unsafe { nexus_slab::byte::unbounded::Slab::with_chunk_capacity(16) }
}
#[test]
#[should_panic(expected = "spawn_slab() called without a slab")]
fn spawn_slab_without_slab_panics() {
let mut wb = WorldBuilder::new();
let mut world = wb.build();
let mut rt = Runtime::new(&mut world);
rt.block_on(async {
spawn_slab(async {});
});
}
#[test]
fn spawn_slab_with_slab() {
let mut wb = WorldBuilder::new();
wb.register(Out(0));
let mut world = wb.build();
let mut rt = Runtime::builder(&mut world)
.slab_unbounded(test_slab())
.build();
rt.block_on(async move {
spawn_slab(async move {
crate::context::with_world(|world| {
world.resource_mut::<Out>().0 = 77;
});
});
YieldOnce(false).await;
});
assert_eq!(world.resource::<Out>().0, 77);
}
#[test]
fn mixed_spawn_and_spawn_slab() {
let mut wb = WorldBuilder::new();
wb.register(Out(0));
let mut world = wb.build();
let mut rt = Runtime::builder(&mut world)
.slab_unbounded(test_slab())
.build();
rt.block_on(async move {
spawn_boxed(async move {
crate::context::with_world(|world| {
world.resource_mut::<Out>().0 += 10;
});
});
spawn_slab(async move {
crate::context::with_world(|world| {
world.resource_mut::<Out>().0 += 20;
});
});
YieldOnce(false).await;
});
assert_eq!(world.resource::<Out>().0, 30);
}
#[test]
fn claim_slab_spawn_executes() {
let mut wb = WorldBuilder::new();
wb.register(Out(0));
let mut world = wb.build();
let mut rt = Runtime::builder(&mut world)
.slab_unbounded(test_slab())
.build();
rt.block_on(async move {
let claim = claim_slab();
claim.spawn(async move {
crate::context::with_world(|world| {
world.resource_mut::<Out>().0 = 55;
});
});
YieldOnce(false).await;
});
assert_eq!(world.resource::<Out>().0, 55);
}
#[test]
fn claim_slab_drop_returns_slot() {
let mut wb = WorldBuilder::new();
let mut world = wb.build();
let bounded = unsafe { nexus_slab::byte::bounded::Slab::<256>::with_capacity(1) };
let mut rt = Runtime::builder(&mut world)
.slab_bounded(bounded)
.build();
rt.block_on(async {
let claim = claim_slab();
drop(claim);
let claim = claim_slab();
claim.spawn(async {});
YieldOnce(false).await;
});
}
#[test]
fn try_claim_slab_returns_none_when_full() {
let mut wb = WorldBuilder::new();
let mut world = wb.build();
let bounded = unsafe { nexus_slab::byte::bounded::Slab::<256>::with_capacity(1) };
let mut rt = Runtime::builder(&mut world)
.slab_bounded(bounded)
.build();
rt.block_on(async {
let _held = claim_slab(); assert!(try_claim_slab().is_none());
});
}
#[test]
fn mixed_spawn_boxed_and_claim_slab() {
let mut wb = WorldBuilder::new();
wb.register(Out(0));
let mut world = wb.build();
let mut rt = Runtime::builder(&mut world)
.slab_unbounded(test_slab())
.build();
rt.block_on(async move {
spawn_boxed(async move {
crate::context::with_world(|world| {
world.resource_mut::<Out>().0 += 10;
});
});
let claim = claim_slab();
claim.spawn(async move {
crate::context::with_world(|world| {
world.resource_mut::<Out>().0 += 20;
});
});
YieldOnce(false).await;
});
assert_eq!(world.resource::<Out>().0, 30);
}
#[test]
fn sleep_completes() {
let mut wb = WorldBuilder::new();
wb.register(Out(0));
let mut world = wb.build();
let mut rt = Runtime::new(&mut world);
let before = Instant::now();
rt.block_on(async move {
crate::context::sleep(Duration::from_millis(50)).await;
});
let elapsed = before.elapsed();
assert!(elapsed >= Duration::from_millis(40), "elapsed {elapsed:?} too short");
assert!(elapsed < Duration::from_millis(200), "elapsed {elapsed:?} too long");
}
#[test]
fn sleep_in_spawned_task() {
let mut wb = WorldBuilder::new();
wb.register(Out(0));
let mut world = wb.build();
let mut rt = Runtime::new(&mut world);
let before = Instant::now();
rt.block_on(async move {
spawn_boxed(async move {
crate::context::sleep(Duration::from_millis(50)).await;
crate::context::with_world(|world| {
world.resource_mut::<Out>().0 = 42;
});
});
crate::context::sleep(Duration::from_millis(100)).await;
});
let elapsed = before.elapsed();
assert!(elapsed >= Duration::from_millis(80));
assert_eq!(world.resource::<Out>().0, 42);
}
#[test]
fn sleep_zero_duration_ready_immediately() {
let mut wb = WorldBuilder::new();
let mut world = wb.build();
let mut rt = Runtime::new(&mut world);
let before = Instant::now();
rt.block_on(async move {
crate::context::sleep(Duration::ZERO).await;
});
assert!(before.elapsed() < Duration::from_millis(10));
}
#[test]
fn sleep_past_deadline_ready_immediately() {
let mut wb = WorldBuilder::new();
let mut world = wb.build();
let mut rt = Runtime::new(&mut world);
let past = Instant::now() - Duration::from_secs(1);
let before = Instant::now();
rt.block_on(async move {
crate::context::sleep_until(past).await;
});
assert!(before.elapsed() < Duration::from_millis(10));
}
#[test]
fn timeout_completes_before_deadline() {
let mut wb = WorldBuilder::new();
let mut world = wb.build();
let mut rt = Runtime::new(&mut world);
let result = rt.block_on(async {
crate::context::timeout(
Duration::from_millis(500),
async { 42u64 },
).await
});
assert_eq!(result.unwrap(), 42);
}
#[test]
fn timeout_expires() {
let mut wb = WorldBuilder::new();
let mut world = wb.build();
let mut rt = Runtime::new(&mut world);
let result = rt.block_on(async {
crate::context::timeout(
Duration::from_millis(10),
crate::context::sleep(Duration::from_secs(10)),
).await
});
assert!(result.is_err());
}
#[test]
fn interval_ticks() {
let mut wb = WorldBuilder::new();
wb.register(Out(0));
let mut world = wb.build();
let mut rt = Runtime::new(&mut world);
let before = Instant::now();
rt.block_on(async move {
let mut iv = crate::context::interval(Duration::from_millis(20));
iv.tick().await; iv.tick().await; iv.tick().await; });
let elapsed = before.elapsed();
assert!(elapsed >= Duration::from_millis(50), "too fast: {elapsed:?}");
assert!(elapsed < Duration::from_millis(200), "too slow: {elapsed:?}");
}
#[test]
fn yield_now_lets_other_tasks_run() {
let mut wb = WorldBuilder::new();
wb.register(Out(0));
let mut world = wb.build();
let mut rt = Runtime::new(&mut world);
rt.block_on(async move {
spawn_boxed(async move {
crate::context::with_world(|world| {
world.resource_mut::<Out>().0 = 99;
});
});
crate::context::yield_now().await;
let val = crate::context::with_world_ref(|world| world.resource::<Out>().0);
assert_eq!(val, 99);
});
}
struct YieldOnce(bool);
impl Future for YieldOnce {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
if self.0 {
Poll::Ready(())
} else {
self.0 = true;
cx.waker().wake_by_ref();
Poll::Pending
}
}
}
}