//! Scheduler for the current-thread runtime flavor. All tasks run on the
//! thread currently inside `block_on`; tasks scheduled from other threads
//! land in a shared injection queue.

use crate::loom::sync::atomic::AtomicBool;
use crate::loom::sync::Arc;
use crate::runtime::driver::{self, Driver};
use crate::runtime::scheduler::{self, Defer, Inject};
use crate::runtime::task::{
self, JoinHandle, OwnedTasks, Schedule, SpawnLocation, Task, TaskHarnessScheduleHooks,
};
use crate::runtime::{
blocking, context, Config, MetricsBatch, SchedulerMetrics, TaskHooks, TaskMeta, WorkerMetrics,
};
use crate::sync::notify::Notify;
use crate::util::atomic_cell::AtomicCell;
use crate::util::{waker_ref, RngSeedGenerator, Wake, WakerRef};
use std::cell::RefCell;
use std::collections::VecDeque;
use std::future::{poll_fn, Future};
use std::num::NonZeroU64;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
use std::task::Poll::{Pending, Ready};
use std::task::Waker;
use std::thread::ThreadId;
use std::time::Duration;
use std::{fmt, thread};
/// Executes tasks on the current thread.
pub(crate) struct CurrentThread {
    /// Core scheduler data; acquired by the thread entering `block_on`.
    core: AtomicCell<Core>,
    /// Notifier for waking up other threads waiting to steal the core.
    notify: Notify,
}
/// Handle to the current-thread scheduler.
pub(crate) struct Handle {
    /// Name assigned to this runtime, if any.
    name: Option<String>,
    /// Scheduler state shared across threads.
    shared: Shared,
    /// Resource driver handles.
    pub(crate) driver: driver::Handle,
    /// Blocking pool spawner.
    pub(crate) blocking_spawner: blocking::Spawner,
    /// Current random number generator seed.
    pub(crate) seed_generator: RngSeedGenerator,
    /// User-supplied task hooks.
    pub(crate) task_hooks: TaskHooks,
    /// If this is a `LocalRuntime`, the ID of its owning thread.
    pub(crate) local_tid: Option<ThreadId>,
}
/// Data required for executing the scheduler; passed around as a capability token.
struct Core {
    /// Scheduler run queue.
    tasks: VecDeque<Notified>,
    /// Current tick.
    tick: u32,
    /// Runtime driver. The driver is removed before parking the thread.
    driver: Option<Driver>,
    /// Metrics batch.
    metrics: MetricsBatch,
    /// How often to check the global queue.
    global_queue_interval: u32,
    /// True if a task panicked without being handled and the runtime is
    /// configured to shut down on unhandled panics.
    unhandled_panic: bool,
}
/// Scheduler state shared between threads.
struct Shared {
    /// Remote run queue.
    inject: Inject<Arc<Handle>>,
    /// Collection of all active tasks spawned onto this executor.
    owned: OwnedTasks<Arc<Handle>>,
    /// Indicates whether the blocked-on thread has been woken.
    woken: AtomicBool,
    /// Scheduler configuration options.
    config: Config,
    /// Keeps track of various runtime metrics.
    scheduler_metrics: SchedulerMetrics,
    /// This scheduler only has one worker.
    worker_metrics: WorkerMetrics,
}
/// Thread-local context.
pub(crate) struct Context {
    /// Scheduler handle.
    handle: Arc<Handle>,
    /// Scheduler core, enabling the holder of `Context` to execute the scheduler.
    core: RefCell<Option<Box<Core>>>,
    /// Deferred tasks, usually ones that called `task::yield_now()`.
    pub(crate) defer: Defer,
}
type Notified = task::Notified<Arc<Handle>>;
/// Initial capacity of the local run queue.
const INITIAL_CAPACITY: usize = 64;
/// Used when no `global_queue_interval` is specified in the config.
const DEFAULT_GLOBAL_QUEUE_INTERVAL: u32 = 31;
impl CurrentThread {
pub(crate) fn new(
driver: Driver,
driver_handle: driver::Handle,
blocking_spawner: blocking::Spawner,
seed_generator: RngSeedGenerator,
config: Config,
local_tid: Option<ThreadId>,
name: Option<String>,
) -> (CurrentThread, Arc<Handle>) {
let worker_metrics = WorkerMetrics::from_config(&config);
worker_metrics.set_thread_id(thread::current().id());
let global_queue_interval = config
.global_queue_interval
.unwrap_or(DEFAULT_GLOBAL_QUEUE_INTERVAL);
let handle = Arc::new(Handle {
name,
task_hooks: TaskHooks {
task_spawn_callback: config.before_spawn.clone(),
task_terminate_callback: config.after_termination.clone(),
#[cfg(tokio_unstable)]
before_poll_callback: config.before_poll.clone(),
#[cfg(tokio_unstable)]
after_poll_callback: config.after_poll.clone(),
},
shared: Shared {
inject: Inject::new(),
owned: OwnedTasks::new(1),
woken: AtomicBool::new(false),
config,
scheduler_metrics: SchedulerMetrics::new(),
worker_metrics,
},
driver: driver_handle,
blocking_spawner,
seed_generator,
local_tid,
});
let core = AtomicCell::new(Some(Box::new(Core {
tasks: VecDeque::with_capacity(INITIAL_CAPACITY),
tick: 0,
driver: Some(driver),
metrics: MetricsBatch::new(&handle.shared.worker_metrics),
global_queue_interval,
unhandled_panic: false,
})));
let scheduler = CurrentThread {
core,
notify: Notify::new(),
};
(scheduler, handle)
}
#[track_caller]
pub(crate) fn block_on<F: Future>(&self, handle: &scheduler::Handle, future: F) -> F::Output {
pin!(future);
crate::runtime::context::enter_runtime(handle, false, |blocking| {
let handle = handle.as_current_thread();
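            // Attempt to steal the scheduler core and drive the future on
            // this thread. If another thread holds the core, select over a
            // notification that the core is available and completion of the
            // future itself.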
loop {
if let Some(core) = self.take_core(handle) {
handle
.shared
.worker_metrics
.set_thread_id(thread::current().id());
return core.block_on(future);
} else {
let notified = self.notify.notified();
pin!(notified);
if let Some(out) = blocking
.block_on(poll_fn(|cx| {
if notified.as_mut().poll(cx).is_ready() {
return Ready(None);
}
if let Ready(out) = future.as_mut().poll(cx) {
return Ready(Some(out));
}
Pending
}))
.expect("Failed to `Enter::block_on`")
{
return out;
}
}
}
})
}
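    /// Attempts to take ownership of the scheduler core, returning a guard
    /// that places it back on drop. Returns `None` if another thread
    /// currently holds the core.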
fn take_core(&self, handle: &Arc<Handle>) -> Option<CoreGuard<'_>> {
let core = self.core.take()?;
Some(CoreGuard {
context: scheduler::Context::CurrentThread(Context {
handle: handle.clone(),
core: RefCell::new(Some(core)),
defer: Defer::new(),
}),
scheduler: self,
})
}
pub(crate) fn shutdown(&mut self, handle: &scheduler::Handle) {
let handle = handle.as_current_thread();
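        // If the core is gone while the thread is already panicking, return
        // instead of panicking again; a double panic would abort the process.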
let core = match self.take_core(handle) {
Some(core) => core,
None if std::thread::panicking() => return,
None => panic!("Oh no! We never placed the Core back, this is a bug!"),
};
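        // Check whether the thread-local context is still available; it may
        // already be gone if shutdown runs during thread destruction.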
let tls_available = context::with_current(|_| ()).is_ok();
if tls_available {
core.enter(|core, _context| {
let core = shutdown2(core, handle);
(core, ())
});
} else {
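            // Shut down without setting the thread-local context.
            // `tokio::spawn` calls will fail, but they would fail anyway
            // because the thread-local is no longer available.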
let context = core.context.expect_current_thread();
let core = context.core.borrow_mut().take().unwrap();
let core = shutdown2(core, handle);
*context.core.borrow_mut() = Some(core);
}
}
}
fn shutdown2(mut core: Box<Core>, handle: &Handle) -> Box<Core> {
    // Drain the OwnedTasks collection. This call also closes the collection,
    // ensuring that no tasks are ever pushed after this call returns.
    handle.shared.owned.close_and_shutdown_all(0);

    // Drain the local queue. Every task has already been shut down, so the
    // notified handles only need to be dropped.
    while let Some(task) = core.next_local_task(handle) {
        drop(task);
    }

    // Close and drain the injection queue.
    handle.shared.inject.close();
    while let Some(task) = handle.shared.inject.pop() {
        drop(task);
    }

    assert!(handle.shared.owned.is_empty());

    // Submit the final metrics batch.
    core.submit_metrics(handle);

    // Shut down the resource drivers.
    if let Some(driver) = core.driver.as_mut() {
        driver.shutdown(&handle.driver);
    }

    core
}
impl fmt::Debug for CurrentThread {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("CurrentThread").finish()
}
}
impl Core {
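    /// Increments the current tick, wrapping on overflow.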
fn tick(&mut self) {
self.tick = self.tick.wrapping_add(1);
}
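    /// Picks the next task, checking the remote (injection) queue first every
    /// `global_queue_interval` ticks so remote tasks cannot be starved.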
fn next_task(&mut self, handle: &Handle) -> Option<Notified> {
if self.tick % self.global_queue_interval == 0 {
handle
.next_remote_task()
.or_else(|| self.next_local_task(handle))
} else {
self.next_local_task(handle)
.or_else(|| handle.next_remote_task())
}
}
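    /// Pops the next task off the local run queue, updating the queue-depth
    /// metric.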
fn next_local_task(&mut self, handle: &Handle) -> Option<Notified> {
let ret = self.tasks.pop_front();
handle
.shared
.worker_metrics
.set_queue_depth(self.tasks.len());
ret
}
fn push_task(&mut self, handle: &Handle, task: Notified) {
self.tasks.push_back(task);
self.metrics.inc_local_schedule_count();
handle
.shared
.worker_metrics
.set_queue_depth(self.tasks.len());
}
fn submit_metrics(&mut self, handle: &Handle) {
self.metrics.submit(&handle.shared.worker_metrics, 0);
}
}
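/// Wakes every deferred waker, draining the deferred list.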
#[cfg(feature = "taskdump")]
fn wake_deferred_tasks_and_free(context: &Context) {
let wakers = context.defer.take_deferred();
for waker in wakers {
waker.wake();
}
}
impl Context {
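    /// Runs `f` (typically a task poll) with the scheduler core stored in the
    /// thread-local context, wrapped in the cooperative budget and bracketed
    /// by poll metrics.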
fn run_task<R>(&self, mut core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) {
core.metrics.start_poll();
let mut ret = self.enter(core, || crate::task::coop::budget(f));
ret.0.metrics.end_poll();
ret
}
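    /// Parks the driver, blocking the current thread until an event arrives
    /// (I/O, timer, or a remote wakeup). Runs the `before_park` and
    /// `after_unpark` callbacks around the park.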
fn park(&self, mut core: Box<Core>, handle: &Handle) -> Box<Core> {
let mut driver = core.driver.take().expect("driver missing");
if let Some(f) = &handle.shared.config.before_park {
let (c, ()) = self.enter(core, || f());
core = c;
}
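        // Skip parking if there is already work to do; this also covers tasks
        // spawned by the `before_park` callback.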
if !self.has_pending_work(&core) {
core.metrics.about_to_park();
core.submit_metrics(handle);
core = self.park_internal(core, handle, &mut driver, None);
core.metrics.unparked();
core.submit_metrics(handle);
}
if let Some(f) = &handle.shared.config.after_unpark {
let (c, ()) = self.enter(core, || f());
core = c;
}
core.driver = Some(driver);
core
}
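    /// Polls the driver for new events without letting the thread sleep.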
fn park_yield(&self, mut core: Box<Core>, handle: &Handle) -> Box<Core> {
let mut driver = core.driver.take().expect("driver missing");
core.submit_metrics(handle);
core = self.park_internal(core, handle, &mut driver, Some(Duration::from_millis(0)));
core.driver = Some(driver);
core
}
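    /// Returns `true` if there are queued tasks, deferred wakers, or a
    /// pending wakeup for the `block_on` future.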
fn has_pending_work(&self, core: &Core) -> bool {
!core.tasks.is_empty() || !self.defer.is_empty() || self.handle.shared.woken.load(Acquire)
}
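    /// Parks the driver, either blocking (`duration == None`) or returning
    /// right after polling for events, then wakes any deferred tasks.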
fn park_internal(
&self,
core: Box<Core>,
handle: &Handle,
driver: &mut Driver,
duration: Option<Duration>,
) -> Box<Core> {
let (core, ()) = self.enter(core, || {
match duration {
Some(dur) => driver.park_timeout(&handle.driver, dur),
None => driver.park(&handle.driver),
}
self.defer.wake();
});
core
}
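    /// Stores `core` in the thread-local context while running `f`, then
    /// takes it back out.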
fn enter<R>(&self, core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) {
*self.core.borrow_mut() = Some(core);
let ret = f();
let core = self.core.borrow_mut().take().expect("core missing");
(core, ret)
}
pub(crate) fn defer(&self, waker: &Waker) {
self.defer.defer(waker);
}
}
impl Handle {
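    /// Spawns a future onto the `CurrentThread` scheduler.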
#[track_caller]
pub(crate) fn spawn<F>(
me: &Arc<Self>,
future: F,
id: crate::runtime::task::Id,
spawned_at: SpawnLocation,
) -> JoinHandle<F::Output>
where
F: crate::future::Future + Send + 'static,
F::Output: Send + 'static,
{
let (handle, notified) = me.shared.owned.bind(future, me.clone(), id, spawned_at);
me.task_hooks.spawn(&TaskMeta {
id,
spawned_at,
_phantom: Default::default(),
});
if let Some(notified) = notified {
me.schedule(notified);
}
handle
}
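    /// Spawns a task that isn't safe to send across thread boundaries onto
    /// the runtime.
    ///
    /// # Safety
    ///
    /// This may only be used when the runtime provably cannot be driven from,
    /// or moved to, a thread other than the one the task is spawned from.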
#[track_caller]
pub(crate) unsafe fn spawn_local<F>(
me: &Arc<Self>,
future: F,
id: crate::runtime::task::Id,
spawned_at: SpawnLocation,
) -> JoinHandle<F::Output>
where
F: crate::future::Future + 'static,
F::Output: 'static,
{
let (handle, notified) = unsafe {
me.shared
.owned
.bind_local(future, me.clone(), id, spawned_at)
};
me.task_hooks.spawn(&TaskMeta {
id,
spawned_at,
_phantom: Default::default(),
});
if let Some(notified) = notified {
me.schedule(notified);
}
handle
}
#[cfg(all(
tokio_unstable,
feature = "taskdump",
target_os = "linux",
any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
))]
pub(crate) fn dump(&self) -> crate::runtime::Dump {
use crate::runtime::dump;
use task::trace::trace_current_thread;
let mut traces = vec![];
context::with_scheduler(|maybe_context| {
let context = if let Some(context) = maybe_context {
context.expect_current_thread()
} else {
return;
};
let mut maybe_core = context.core.borrow_mut();
let core = if let Some(core) = maybe_core.as_mut() {
core
} else {
return;
};
let local = &mut core.tasks;
if self.shared.inject.is_closed() {
return;
}
traces = trace_current_thread(&self.shared.owned, local, &self.shared.inject)
.into_iter()
.map(|(id, trace)| dump::Task::new(id, trace))
.collect();
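            // Release the borrow of the core before waking deferred tasks;
            // waking may schedule onto the core and re-borrow it.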
drop(maybe_core);
wake_deferred_tasks_and_free(context);
});
dump::Dump::new(traces)
}
fn next_remote_task(&self) -> Option<Notified> {
self.shared.inject.pop()
}
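    /// Returns a waker for the scheduler. `woken` is pre-set so that the
    /// outer future is polled at least once when entering the `block_on` loop.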
fn waker_ref(me: &Arc<Self>) -> WakerRef<'_> {
me.shared.woken.store(true, Release);
waker_ref(me)
}
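    /// Resets `woken` to `false`, returning the previous value.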
pub(crate) fn reset_woken(&self) -> bool {
self.shared.woken.swap(false, AcqRel)
}
pub(crate) fn num_alive_tasks(&self) -> usize {
self.shared.owned.num_alive_tasks()
}
pub(crate) fn injection_queue_depth(&self) -> usize {
self.shared.inject.len()
}
pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
assert_eq!(0, worker);
&self.shared.worker_metrics
}
}
cfg_unstable_metrics! {
impl Handle {
pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
&self.shared.scheduler_metrics
}
pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
self.worker_metrics(worker).queue_depth()
}
pub(crate) fn num_blocking_threads(&self) -> usize {
self.blocking_spawner.num_threads()
}
pub(crate) fn num_idle_blocking_threads(&self) -> usize {
self.blocking_spawner.num_idle_threads()
}
pub(crate) fn blocking_queue_depth(&self) -> usize {
self.blocking_spawner.queue_depth()
}
cfg_64bit_metrics! {
pub(crate) fn spawned_tasks_count(&self) -> u64 {
self.shared.owned.spawned_tasks_count()
}
}
}
}
impl Handle {
pub(crate) fn owned_id(&self) -> NonZeroU64 {
self.shared.owned.id
}
pub(crate) fn name(&self) -> Option<&str> {
self.name.as_deref()
}
}
impl fmt::Debug for Handle {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("current_thread::Handle { ... }").finish()
}
}
impl Schedule for Arc<Handle> {
fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
self.shared.owned.remove(task)
}
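    /// Schedules the task onto the local run queue when called from the
    /// scheduler thread, and onto the injection queue (unparking the driver)
    /// otherwise.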
fn schedule(&self, task: task::Notified<Self>) {
use scheduler::Context::CurrentThread;
context::with_scheduler(|maybe_cx| match maybe_cx {
Some(CurrentThread(cx)) if Arc::ptr_eq(self, &cx.handle) => {
let mut core = cx.core.borrow_mut();
if let Some(core) = core.as_mut() {
core.push_task(self, task);
}
}
_ => {
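                // The task was scheduled from outside the runtime; track it
                // and push it onto the injection queue.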
self.shared.scheduler_metrics.inc_remote_schedule_count();
self.shared.inject.push(task);
self.driver.unpark();
}
});
}
fn hooks(&self) -> TaskHarnessScheduleHooks {
TaskHarnessScheduleHooks {
task_terminate_callback: self.task_hooks.task_terminate_callback.clone(),
}
}
cfg_unstable! {
fn unhandled_panic(&self) {
use crate::runtime::UnhandledPanic;
match self.shared.config.unhandled_panic {
                UnhandledPanic::Ignore => {}
UnhandledPanic::ShutdownRuntime => {
use scheduler::Context::CurrentThread;
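                    // This hook is only called from within the runtime, so
                    // the thread-local scheduler context must match `self`;
                    // a nested scheduler is impossible here.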
context::with_scheduler(|maybe_cx| match maybe_cx {
Some(CurrentThread(cx)) if Arc::ptr_eq(self, &cx.handle) => {
let mut core = cx.core.borrow_mut();
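                            // If the core is `None`, the runtime is already
                            // shutting down and no extra signal is needed.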
if let Some(core) = core.as_mut() {
core.unhandled_panic = true;
self.shared.owned.close_and_shutdown_all(0);
}
}
_ => unreachable!("runtime core not set in CURRENT thread-local"),
})
}
}
}
}
}
impl Wake for Handle {
fn wake(arc_self: Arc<Self>) {
Wake::wake_by_ref(&arc_self);
}
fn wake_by_ref(arc_self: &Arc<Self>) {
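        // Only act on the first wake since the last `reset_woken`. The driver
        // needs an unpark only when the wake comes from outside the scheduler
        // thread; the scheduler cannot be parked while it is running.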
let already_woken = arc_self.shared.woken.swap(true, Release);
if !already_woken {
use scheduler::Context::CurrentThread;
context::with_scheduler(|maybe_cx| match maybe_cx {
Some(CurrentThread(cx)) if Arc::ptr_eq(arc_self, &cx.handle) => {}
_ => {
arc_self.driver.unpark();
}
});
}
}
}
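/// Ensures the `Core` value is always placed back into its slot in
/// `CurrentThread`, even if the future panics.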
struct CoreGuard<'a> {
context: scheduler::Context,
scheduler: &'a CurrentThread,
}
impl CoreGuard<'_> {
#[track_caller]
fn block_on<F: Future>(self, future: F) -> F::Output {
let ret = self.enter(|mut core, context| {
let waker = Handle::waker_ref(&context.handle);
let mut cx = std::task::Context::from_waker(&waker);
pin!(future);
core.metrics.start_processing_scheduled_tasks();
'outer: loop {
let handle = &context.handle;
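                // The `block_on` future was woken: poll it under the
                // cooperative budget before running queued tasks.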
if handle.reset_woken() {
let (c, res) = context.enter(core, || {
crate::task::coop::budget(|| future.as_mut().poll(&mut cx))
});
core = c;
if let Ready(v) = res {
return (core, Some(v));
}
}
for _ in 0..handle.shared.config.event_interval {
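                    // Bail out if a task panicked and the runtime is
                    // configured to shut down on unhandled panics.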
if core.unhandled_panic {
return (core, None);
}
core.tick();
let entry = core.next_task(handle);
let task = match entry {
Some(entry) => entry,
None => {
core.metrics.end_processing_scheduled_tasks();
core = if context.has_pending_work(&core) {
context.park_yield(core, handle)
} else {
context.park(core, handle)
};
core.metrics.start_processing_scheduled_tasks();
continue 'outer;
}
};
let task = context.handle.shared.owned.assert_owner(task);
#[cfg(tokio_unstable)]
let task_meta = task.task_meta();
let (c, ()) = context.run_task(core, || {
#[cfg(tokio_unstable)]
context.handle.task_hooks.poll_start_callback(&task_meta);
task.run();
#[cfg(tokio_unstable)]
context.handle.task_hooks.poll_stop_callback(&task_meta);
});
core = c;
}
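                // Yield to the driver: this advances the timer and pulls any
                // pending I/O events without blocking.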
core.metrics.end_processing_scheduled_tasks();
core = context.park_yield(core, handle);
core.metrics.start_processing_scheduled_tasks();
}
});
match ret {
Some(ret) => ret,
None => {
panic!("a spawned task panicked and the runtime is configured to shut down on unhandled panic");
}
}
}
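    /// Enters the scheduler context, setting the thread-local state needed
    /// while the scheduler core is executing.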
fn enter<F, R>(self, f: F) -> R
where
F: FnOnce(Box<Core>, &Context) -> (Box<Core>, R),
{
let context = self.context.expect_current_thread();
let core = context.core.borrow_mut().take().expect("core missing");
let (core, ret) = context::set_scheduler(&self.context, || f(core, context));
*context.core.borrow_mut() = Some(core);
ret
}
}
impl Drop for CoreGuard<'_> {
    fn drop(&mut self) {
        let context = self.context.expect_current_thread();
        if let Some(core) = context.core.borrow_mut().take() {
            // Place the core back into the scheduler slot so another thread
            // can pick it up and drive it.
            self.scheduler.core.set(core);

            // Wake a thread that may be waiting to steal the core.
            self.scheduler.notify.notify_one();
        }
    }
}