nexus-async-rt 0.4.1

Single-threaded async executor with pre-allocated task storage
Documentation
//! Thread-local runtime context.
//!
//! All runtime state is accessible via free functions that read from
//! thread-local storage. The TLS slots are set by [`Runtime::block_on`]
//! and cleared on exit. All const-initialized for zero first-access cost.
//!
//! ```ignore
//! use nexus_async_rt::{spawn, with_world, sleep, io, shutdown_signal};
//!
//! rt.block_on(async {
//!     spawn(async {
//!         with_world(|world| { /* ... */ });
//!         sleep(Duration::from_secs(1)).await;
//!         let listener = TcpListener::bind(addr, io());
//!     });
//!     shutdown_signal().await;
//! });
//! ```

use std::cell::Cell;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::time::{Duration, Instant};

use crate::io::{IoDriver, IoHandle};
use crate::timer::{TimerDriver, TimerHandle};

// =============================================================================
// TLS slots — const-initialized, zero first-access cost
// =============================================================================

// Per-thread raw pointers to the runtime's state. Every slot starts out null;
// within this file they are written only by `install()` and restored by
// `ContextGuard::drop`. The accessor functions below treat a null slot as
// "no runtime context on this thread" and panic.
thread_local! {
    // World handed to `with_world` / `with_world_ref`; also the slot that
    // `assert_in_runtime` inspects.
    static CTX_WORLD: Cell<*mut nexus_rt::World> =
        const { Cell::new(std::ptr::null_mut()) };
    // IO driver wrapped by `io()`.
    static CTX_IO: Cell<*mut IoDriver> =
        const { Cell::new(std::ptr::null_mut()) };
    // Timer driver used by `sleep()` / `sleep_until()`.
    static CTX_TIMER: Cell<*mut TimerDriver> =
        const { Cell::new(std::ptr::null_mut()) };
    // Cell holding the timestamp returned by `event_time()`.
    static CTX_EVENT_TIME: Cell<*const Cell<Instant>> =
        const { Cell::new(std::ptr::null()) };
    // Shutdown flag passed through to `ShutdownSignal` by `shutdown_signal()`.
    static CTX_SHUTDOWN: Cell<*const AtomicBool> =
        const { Cell::new(std::ptr::null()) };
    // Shared waker slot that `shutdown_signal()` clones into each signal.
    static CTX_SHUTDOWN_WAKER: Cell<*const Arc<std::sync::Mutex<Option<std::task::Waker>>>> =
        const { Cell::new(std::ptr::null()) };
}

// =============================================================================
// Install / clear (called by Runtime::block_on)
// =============================================================================

/// Swap a new runtime context into the thread-local slots.
///
/// Each slot is overwritten with the matching argument, and the value it
/// held beforehand is captured in the returned [`ContextGuard`]. Dropping
/// the guard writes those earlier values back, so nested installs unwind
/// in the reverse order they were made.
///
/// NOTE(review): this doc previously named `RuntimeBuilder::build()` as the
/// caller (with the guard released in `Runtime::drop`), whereas the module
/// docs and the accessor panic messages refer to `Runtime::block_on` —
/// confirm which call site is current.
pub(crate) fn install(
    world: *mut nexus_rt::World,
    io: *mut IoDriver,
    timer: *mut TimerDriver,
    event_time: *const Cell<Instant>,
    shutdown_flag: *const AtomicBool,
    shutdown_waker: *const Arc<std::sync::Mutex<Option<std::task::Waker>>>,
) -> ContextGuard {
    // Each binding shadows the incoming pointer with the slot's old value.
    let world = CTX_WORLD.with(|slot| slot.replace(world));
    let io = CTX_IO.with(|slot| slot.replace(io));
    let timer = CTX_TIMER.with(|slot| slot.replace(timer));
    let event_time = CTX_EVENT_TIME.with(|slot| slot.replace(event_time));
    let shutdown = CTX_SHUTDOWN.with(|slot| slot.replace(shutdown_flag));
    let shutdown_waker = CTX_SHUTDOWN_WAKER.with(|slot| slot.replace(shutdown_waker));

    ContextGuard {
        prev: PrevContext {
            world,
            io,
            timer,
            event_time,
            shutdown,
            shutdown_waker,
        },
    }
}

/// Snapshot of the TLS slot values that were in place before an `install()`
/// call, kept so `ContextGuard` can write them back on drop. Any of the
/// pointers may be null (the slots' initial value).
struct PrevContext {
    /// Prior value of `CTX_WORLD`.
    world: *mut nexus_rt::World,
    /// Prior value of `CTX_IO`.
    io: *mut IoDriver,
    /// Prior value of `CTX_TIMER`.
    timer: *mut TimerDriver,
    /// Prior value of `CTX_EVENT_TIME`.
    event_time: *const Cell<Instant>,
    /// Prior value of `CTX_SHUTDOWN`.
    shutdown: *const AtomicBool,
    /// Prior value of `CTX_SHUTDOWN_WAKER`.
    shutdown_waker: *const Arc<std::sync::Mutex<Option<std::task::Waker>>>,
}

/// Guard returned by `install()`.
///
/// Holds the TLS slot values that were present before the install and
/// writes them back when dropped. It must therefore be kept alive for as
/// long as the installed context is supposed to be visible; discarding it
/// right away would undo the install immediately, hence `#[must_use]`.
#[must_use = "dropping the guard immediately restores the previous runtime context"]
pub(crate) struct ContextGuard {
    /// Slot values to restore on drop.
    prev: PrevContext,
}

impl Drop for ContextGuard {
    fn drop(&mut self) {
        CTX_WORLD.with(|c| c.set(self.prev.world));
        CTX_IO.with(|c| c.set(self.prev.io));
        CTX_TIMER.with(|c| c.set(self.prev.timer));
        CTX_EVENT_TIME.with(|c| c.set(self.prev.event_time));
        CTX_SHUTDOWN.with(|c| c.set(self.prev.shutdown));
        CTX_SHUTDOWN_WAKER.with(|c| c.set(self.prev.shutdown_waker));
    }
}

/// Assert that we're inside a runtime context. Panics with `msg` if not.
pub(crate) fn assert_in_runtime(msg: &str) {
    let ptr = CTX_WORLD.with(Cell::get);
    assert!(!ptr.is_null(), "{msg}");
}

// =============================================================================
// Public free functions — the user-facing API
// =============================================================================

/// Access the [`World`](nexus_rt::World) with exclusive access.
///
/// Runs the closure synchronously inline. Must be called from within
/// [`crate::Runtime::block_on`].
///
/// # Panics
///
/// Panics if called outside a runtime context.
pub fn with_world<R>(f: impl FnOnce(&mut nexus_rt::World) -> R) -> R {
    let ptr = CTX_WORLD.with(Cell::get);
    assert!(
        !ptr.is_null(),
        "with_world() called outside Runtime::block_on"
    );
    // SAFETY: ptr set by install(), valid for Runtime lifetime.
    // Single-threaded — exclusive access.
    let world = unsafe { &mut *ptr };
    f(world)
}

/// Access the [`World`](nexus_rt::World) with shared access.
///
/// # Panics
///
/// Panics if called outside a runtime context.
pub fn with_world_ref<R>(f: impl FnOnce(&nexus_rt::World) -> R) -> R {
    let ptr = CTX_WORLD.with(Cell::get);
    assert!(
        !ptr.is_null(),
        "with_world_ref() called outside Runtime::block_on"
    );
    let world = unsafe { &*ptr };
    f(world)
}

/// Returns the IO handle for registering mio sources.
///
/// # Panics
///
/// Panics if called outside a runtime context.
pub fn io() -> IoHandle {
    let ptr = CTX_IO.with(Cell::get);
    assert!(!ptr.is_null(), "io() called outside Runtime::block_on");
    // SAFETY: ptr valid for Runtime lifetime.
    IoHandle::new(unsafe { &mut *ptr })
}

/// Create a [`Sleep`](crate::Sleep) future that completes after `duration`.
///
/// # Panics
///
/// Panics if called outside a runtime context.
pub fn sleep(duration: Duration) -> crate::Sleep {
    let ptr = CTX_TIMER.with(Cell::get);
    assert!(!ptr.is_null(), "sleep() called outside Runtime::block_on");
    // SAFETY: ptr valid for Runtime lifetime.
    let handle = TimerHandle::new(unsafe { &mut *ptr });
    handle.sleep(duration)
}

/// Create a [`Sleep`](crate::Sleep) future that completes at `deadline`.
pub fn sleep_until(deadline: Instant) -> crate::Sleep {
    let ptr = CTX_TIMER.with(Cell::get);
    assert!(
        !ptr.is_null(),
        "sleep_until() called outside Runtime::block_on"
    );
    let handle = TimerHandle::new(unsafe { &mut *ptr });
    handle.sleep_until(deadline)
}

/// Timestamp taken after the most recent IO poll cycle.
///
/// All events dispatched within the same cycle share this timestamp.
/// One clock read per cycle, not per event.
pub fn event_time() -> Instant {
    let ptr = CTX_EVENT_TIME.with(Cell::get);
    assert!(
        !ptr.is_null(),
        "event_time() called outside Runtime::block_on"
    );
    // SAFETY: ptr valid for Runtime lifetime.
    unsafe { &*ptr }.get()
}

/// Bound `future` by a relative deadline.
///
/// Pairs the future with a `sleep(duration)` timer inside a
/// [`Timeout`](crate::timer::Timeout). The result is `Err(Elapsed)` if the
/// deadline expires before the future completes.
///
/// # Panics
///
/// Panics if called outside a runtime context (the timer is created
/// eagerly, when this function is called).
pub fn timeout<F>(duration: Duration, future: F) -> crate::timer::Timeout<F>
where
    F: std::future::Future,
{
    let expiry = sleep(duration);
    crate::timer::Timeout::new(future, expiry)
}

/// Create an interval that ticks at a fixed period.
///
/// The first tick completes after `period`. Subsequent ticks are
/// spaced `period` apart. If processing takes longer than `period`,
/// behavior is controlled by [`MissedTickBehavior`](crate::MissedTickBehavior).
///
/// This is a thin wrapper over `crate::timer::Interval::new`. Unlike
/// [`sleep`], it does not read the runtime's thread-local slots itself,
/// so constructing an interval does not by itself require a runtime.
///
/// # Panics
///
/// Panics if `period` is zero. Polling the interval (via `tick().await`)
/// requires an active runtime context and will panic otherwise.
pub fn interval(period: Duration) -> crate::timer::Interval {
    crate::timer::Interval::new(period)
}

/// Run a future no earlier than `deadline`.
///
/// Awaits a `sleep_until(deadline)` timer first and only then polls
/// `future`, returning its output. Useful for scheduling deferred work at
/// a specific time.
///
/// The timer is created on first poll, so polling (not calling) requires
/// an active runtime context.
pub async fn after<F>(deadline: Instant, future: F) -> F::Output
where
    F: std::future::Future,
{
    let wait = sleep_until(deadline);
    wait.await;
    future.await
}

/// Run a future after `duration` elapses.
///
/// Awaits a `sleep(duration)` timer first and only then polls `future`,
/// returning its output.
///
/// The timer is created on first poll, so polling (not calling) requires
/// an active runtime context.
pub async fn after_delay<F>(duration: Duration, future: F) -> F::Output
where
    F: std::future::Future,
{
    let wait = sleep(duration);
    wait.await;
    future.await
}

/// Bound `future` by an absolute deadline.
///
/// Pairs the future with a `sleep_until(deadline)` timer inside a
/// [`Timeout`](crate::timer::Timeout). The result is `Err(Elapsed)` if the
/// deadline passes before the future completes.
///
/// Like [`timeout`] but takes an [`Instant`] instead of a [`Duration`].
///
/// # Panics
///
/// Panics if called outside a runtime context (the timer is created
/// eagerly, when this function is called).
pub fn timeout_at<F>(deadline: Instant, future: F) -> crate::timer::Timeout<F>
where
    F: std::future::Future,
{
    let expiry = sleep_until(deadline);
    crate::timer::Timeout::new(future, expiry)
}

/// Create an interval that starts ticking at `start`, then every `period`.
///
/// If `start` is in the past, the first tick fires immediately.
///
/// This is a thin wrapper over `crate::timer::Interval::new_at`. It does
/// not read the runtime's thread-local slots itself, so constructing an
/// interval does not by itself require a runtime.
///
/// # Panics
///
/// Panics if `period` is zero. Polling the interval requires an active
/// runtime context.
pub fn interval_at(start: Instant, period: Duration) -> crate::timer::Interval {
    crate::timer::Interval::new_at(start, period)
}

/// Cooperatively yield the current task.
///
/// Returns `Pending` once, wakes itself, then completes on the next
/// poll. Other ready tasks get a turn before this task resumes.
///
/// Does not touch the runtime's thread-local slots; it only constructs
/// the future.
pub fn yield_now() -> crate::timer::YieldNow {
    // The `false` presumably means "has not yielded yet" — confirm against
    // the `Future` impl of `YieldNow`.
    crate::timer::YieldNow(false)
}

/// Returns a future that resolves when shutdown is triggered.
pub fn shutdown_signal() -> crate::ShutdownSignal {
    let ptr = CTX_SHUTDOWN.with(Cell::get);
    assert!(
        !ptr.is_null(),
        "shutdown_signal() called outside Runtime::block_on"
    );
    let waker_ptr = CTX_SHUTDOWN_WAKER.with(Cell::get);
    // SAFETY: waker_ptr was set by install() and is valid for Runtime lifetime.
    let task_waker = unsafe { (*waker_ptr).clone() };
    crate::ShutdownSignal {
        flag: ptr,
        task_waker,
    }
}