// nio 0.1.4
//
// Async runtime for Rust
// Documentation
use super::*;
use task::*;

/// Shared state of a runtime: the worker set that async tasks are dispatched
/// to and the thread pool that runs blocking jobs.
pub struct RuntimeContext {
    /// Worker set. Used to pick a target worker and to reach that worker's
    /// shared queue, task-queue state and notifier when a task is dispatched.
    pub(crate) workers: Workers,
    /// Pool that executes the `BlockingTask`s created by `spawn_blocking`.
    pub(crate) threadpool: ThreadPool<BlockingTask>,

    /// Metrics sink, present only with the `metrics` feature. It is informed
    /// (`queue_notified`) when dispatching a task leads to a worker wake-up.
    #[cfg(feature = "metrics")]
    pub(crate) measurement: Box<dyn metrics::Measurement>,
}

impl RuntimeContext {
    /// Runs `f` with a reference to the runtime context reachable from the
    /// current `NioContext`.
    ///
    /// Both a plain runtime context and a local context (through its
    /// `runtime_ctx` field) are accepted. When no context is set,
    /// `no_rt_found_panic()` is called instead of `f`.
    pub(crate) fn with<F, R>(f: F) -> R
    where
        F: FnOnce(&Arc<RuntimeContext>) -> R,
    {
        NioContext::get(|current| {
            let runtime: &Arc<RuntimeContext> = match current {
                NioContext::Runtime(rt) => rt,
                NioContext::Local(local) => &local.runtime_ctx,
                NioContext::None => no_rt_found_panic(),
            };
            f(runtime)
        })
    }

    /// Returns a new handle (`Arc` clone) to the runtime context found via
    /// [`RuntimeContext::with`].
    pub fn current() -> Arc<RuntimeContext> {
        Self::with(Arc::clone)
    }

    /// Wraps this context handle in a `metrics::RuntimeMetrics` value.
    pub fn metrics(self: Arc<Self>) -> metrics::RuntimeMetrics {
        let ctx = self;
        metrics::RuntimeMetrics { ctx }
    }

    /// Hands this context handle to `NioContext::enter`.
    // NOTE(review): presumably installs it as the current context for the
    // calling thread -- confirm against `NioContext::enter`.
    pub fn enter(self: Arc<Self>) {
        NioContext::enter(self)
    }

    /// Submits the closure `f` to the blocking thread pool and returns the
    /// `JoinHandle` produced alongside the blocking task.
    pub fn spawn_blocking<F, R>(&self, f: F) -> JoinHandle<R>
    where
        F: FnOnce() -> R + Send + 'static,
        R: Send + 'static,
    {
        let (job, handle) = BlockingTask::spawn(f);
        self.threadpool.execute(job);
        handle
    }

    /// Creates a task for `future` and dispatches it to the worker reported
    /// by `least_loaded_worker()`.
    ///
    /// Returns the `JoinHandle` produced alongside the task.
    pub fn spawn<F>(self: &Arc<Self>, future: F) -> JoinHandle<F::Output>
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        let (task, handle) = Scheduler::spawn(Arc::clone(self), future);
        // The task is created first; the target worker is chosen afterwards.
        self.send_task_to_least_loaded_worker(task);
        handle
    }

    /// Creates a local (non-`Send` future) task bound to the worker that the
    /// raw index `id` maps to, and dispatches it to that worker.
    ///
    /// The `future` factory is invoked here, on the calling thread, before
    /// the task is handed over.
    pub fn spawn_pinned_at<F, Fut>(self: &Arc<Self>, id: u8, future: F) -> JoinHandle<Fut::Output>
    where
        F: FnOnce() -> Fut + Send,
        Fut: Future + 'static,
        Fut::Output: Send + 'static,
    {
        let worker = self.workers.id(id);
        let ctx = Arc::clone(self);
        let fut = future();
        // SAFETY: NOTE(review) -- the contract of `LocalScheduler::spawn` is
        // defined outside this view; presumably the task must only ever run
        // on `worker`, which is the same worker it is sent to below. Confirm.
        let (task, handle) = unsafe { LocalScheduler::spawn(worker, ctx, fut) };
        self.send_task_at(worker, task);
        handle
    }

    /// Like [`RuntimeContext::spawn_pinned_at`], but the worker is the one
    /// reported by `least_loaded_worker()` at the time of the call.
    pub fn spawn_pinned<F, Fut>(self: &Arc<Self>, future: F) -> JoinHandle<Fut::Output>
    where
        F: FnOnce() -> Fut + Send,
        Fut: Future + 'static,
        Fut::Output: Send + 'static,
    {
        let worker = self.workers.least_loaded_worker();
        let ctx = Arc::clone(self);
        let fut = future();
        // SAFETY: NOTE(review) -- the contract of `LocalScheduler::spawn` is
        // defined outside this view; presumably the task must only ever run
        // on `worker`, which is the same worker it is sent to below. Confirm.
        let (task, handle) = unsafe { LocalScheduler::spawn(worker, ctx, fut) };
        self.send_task_at(worker, task);
        handle
    }

    /// Dispatches `task` to the worker reported by `least_loaded_worker()`.
    pub(crate) fn send_task_to_least_loaded_worker(&self, task: Task) {
        let target = self.workers.least_loaded_worker();
        self.send_task_at(target, task);
    }

    /// Pushes `task` onto the shared queue of worker `id` and wakes that
    /// worker unless its queue state already reported "notified".
    ///
    /// A failed wake-up is not propagated: the notified flag is cleared again
    /// and, in debug builds only, the error is printed to stderr.
    pub(crate) fn send_task_at(&self, id: WorkerId, task: Task) {
        self.workers.shared_queue(id).push(task);

        let queue = self.workers.task_queue(id);
        let observed = queue.increase_shared_and_mark_as_notified();
        if observed.is_notified() {
            // A notification was already recorded; no wake-up is sent.
            return;
        }

        #[cfg(feature = "metrics")]
        self.measurement.queue_notified(id.get());

        if let Err(_err) = self.workers.notifier(id).wake() {
            // Undo the flag set above so a later dispatch attempts the
            // wake-up again.
            queue.clear_notified_flag();
            #[cfg(debug_assertions)]
            eprintln!("notifier error: {_err}");
        }
    }
}