use super::BOX_FUTURE_THRESHOLD;
use crate::runtime::blocking::BlockingPool;
use crate::runtime::scheduler::CurrentThread;
use crate::runtime::{context, EnterGuard, Handle};
use crate::task::JoinHandle;
use crate::util::trace::SpawnMeta;
use std::future::Future;
use std::mem;
use std::time::Duration;
cfg_rt_multi_thread! {
use crate::runtime::Builder;
use crate::runtime::scheduler::MultiThread;
}
#[derive(Debug)]
pub struct Runtime {
scheduler: Scheduler,
handle: Handle,
blocking_pool: BlockingPool,
}
#[derive(Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum RuntimeFlavor {
CurrentThread,
MultiThread,
}
#[derive(Debug)]
pub(super) enum Scheduler {
CurrentThread(CurrentThread),
#[cfg(feature = "rt-multi-thread")]
MultiThread(MultiThread),
}
impl Runtime {
pub(super) fn from_parts(
scheduler: Scheduler,
handle: Handle,
blocking_pool: BlockingPool,
) -> Runtime {
Runtime {
scheduler,
handle,
blocking_pool,
}
}
#[cfg(feature = "rt-multi-thread")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
pub fn new() -> std::io::Result<Runtime> {
Builder::new_multi_thread().enable_all().build()
}
pub fn handle(&self) -> &Handle {
&self.handle
}
#[track_caller]
pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
let fut_size = mem::size_of::<F>();
if fut_size > BOX_FUTURE_THRESHOLD {
self.handle
.spawn_named(Box::pin(future), SpawnMeta::new_unnamed(fut_size))
} else {
self.handle
.spawn_named(future, SpawnMeta::new_unnamed(fut_size))
}
}
#[track_caller]
pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
self.handle.spawn_blocking(func)
}
#[track_caller]
pub fn block_on<F: Future>(&self, future: F) -> F::Output {
let fut_size = mem::size_of::<F>();
if fut_size > BOX_FUTURE_THRESHOLD {
self.block_on_inner(Box::pin(future), SpawnMeta::new_unnamed(fut_size))
} else {
self.block_on_inner(future, SpawnMeta::new_unnamed(fut_size))
}
}
#[track_caller]
fn block_on_inner<F: Future>(&self, future: F, _meta: SpawnMeta<'_>) -> F::Output {
#[cfg(all(
tokio_unstable,
tokio_taskdump,
feature = "rt",
target_os = "linux",
any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
))]
let future = super::task::trace::Trace::root(future);
#[cfg(all(tokio_unstable, feature = "tracing"))]
let future = crate::util::trace::task(
future,
"block_on",
_meta,
crate::runtime::task::Id::next().as_u64(),
);
let _enter = self.enter();
match &self.scheduler {
Scheduler::CurrentThread(exec) => exec.block_on(&self.handle.inner, future),
#[cfg(feature = "rt-multi-thread")]
Scheduler::MultiThread(exec) => exec.block_on(&self.handle.inner, future),
}
}
pub fn enter(&self) -> EnterGuard<'_> {
self.handle.enter()
}
pub fn shutdown_timeout(mut self, duration: Duration) {
self.handle.inner.shutdown();
self.blocking_pool.shutdown(Some(duration));
}
pub fn shutdown_background(self) {
self.shutdown_timeout(Duration::from_nanos(0));
}
pub fn metrics(&self) -> crate::runtime::RuntimeMetrics {
self.handle.metrics()
}
}
impl Drop for Runtime {
fn drop(&mut self) {
match &mut self.scheduler {
Scheduler::CurrentThread(current_thread) => {
let _guard = context::try_set_current(&self.handle.inner);
current_thread.shutdown(&self.handle.inner);
}
#[cfg(feature = "rt-multi-thread")]
Scheduler::MultiThread(multi_thread) => {
multi_thread.shutdown(&self.handle.inner);
}
}
}
}
impl std::panic::UnwindSafe for Runtime {}
impl std::panic::RefUnwindSafe for Runtime {}