use crate::loom::sync::{Arc, Mutex};
use crate::runtime;
use crate::runtime::scheduler::multi_thread::{
idle, park, queue, Counters, Handle, Idle, Overflow, Parker, Stats, TraceStatus, Unparker,
};
use crate::runtime::scheduler::{inject, Defer, Lock};
use crate::runtime::task::OwnedTasks;
use crate::runtime::{
blocking, driver, scheduler, task, Config, SchedulerMetrics, TimerFlavor, WorkerMetrics,
};
use crate::runtime::{context, TaskHooks};
use crate::task::coop;
use crate::util::atomic_cell::AtomicCell;
use crate::util::rand::{FastRand, RngSeedGenerator};
use std::cell::RefCell;
use std::task::Waker;
use std::thread;
use std::time::Duration;
mod metrics;
cfg_taskdump! {
mod taskdump;
}
cfg_not_taskdump! {
mod taskdump_mock;
}
#[cfg(all(tokio_unstable, feature = "time"))]
use crate::loom::sync::atomic::AtomicBool;
#[cfg(all(tokio_unstable, feature = "time"))]
use crate::runtime::time_alt;
#[cfg(all(tokio_unstable, feature = "time"))]
use crate::runtime::scheduler::util;
pub(super) struct Worker {
handle: Arc<Handle>,
index: usize,
core: AtomicCell<Core>,
}
struct Core {
tick: u32,
lifo_enabled: bool,
run_queue: queue::Local<Arc<Handle>>,
#[cfg(all(tokio_unstable, feature = "time"))]
time_context: time_alt::LocalContext,
is_searching: bool,
is_shutdown: bool,
is_traced: bool,
had_driver: park::HadDriver,
enable_eager_driver_handoff: bool,
park: Option<Parker>,
stats: Stats,
global_queue_interval: u32,
rand: FastRand,
}
pub(crate) struct Shared {
remotes: Box<[Remote]>,
pub(super) inject: inject::Shared<Arc<Handle>>,
idle: Idle,
pub(crate) owned: OwnedTasks<Arc<Handle>>,
pub(super) synced: Mutex<Synced>,
#[allow(clippy::vec_box)] shutdown_cores: Mutex<Vec<Box<Core>>>,
pub(super) trace_status: TraceStatus,
config: Config,
pub(super) scheduler_metrics: SchedulerMetrics,
pub(super) worker_metrics: Box<[WorkerMetrics]>,
_counters: Counters,
}
pub(crate) struct Synced {
pub(super) idle: idle::Synced,
pub(crate) inject: inject::Synced,
#[cfg(all(tokio_unstable, feature = "time"))]
inject_timers: Vec<time_alt::EntryHandle>,
}
struct Remote {
pub(super) steal: queue::Steal<Arc<Handle>>,
unpark: Unparker,
}
pub(crate) struct Context {
worker: Arc<Worker>,
core: RefCell<Option<Box<Core>>>,
pub(crate) defer: Defer,
}
pub(crate) struct Launch(Vec<Arc<Worker>>);
type RunResult = Result<Box<Core>, ()>;
type Notified = task::Notified<Arc<Handle>>;
const MAX_LIFO_POLLS_PER_TICK: usize = 3;
#[allow(clippy::too_many_arguments)]
pub(super) fn create(
size: usize,
park: Parker,
driver_handle: driver::Handle,
blocking_spawner: blocking::Spawner,
seed_generator: RngSeedGenerator,
config: Config,
timer_flavor: TimerFlavor,
name: Option<String>,
) -> (Arc<Handle>, Launch) {
let mut cores = Vec::with_capacity(size);
let mut remotes = Vec::with_capacity(size);
let mut worker_metrics = Vec::with_capacity(size);
for _ in 0..size {
let (steal, run_queue) = queue::local();
let park = park.clone();
let unpark = park.unpark();
let metrics = WorkerMetrics::from_config(&config);
let stats = Stats::new(&metrics);
cores.push(Box::new(Core {
tick: 0,
lifo_enabled: !config.disable_lifo_slot,
run_queue,
#[cfg(all(tokio_unstable, feature = "time"))]
time_context: time_alt::LocalContext::new(),
is_searching: false,
is_shutdown: false,
is_traced: false,
enable_eager_driver_handoff: config.enable_eager_driver_handoff,
had_driver: park::HadDriver::No,
park: Some(park),
global_queue_interval: stats.tuned_global_queue_interval(&config),
stats,
rand: FastRand::from_seed(config.seed_generator.next_seed()),
}));
remotes.push(Remote { steal, unpark });
worker_metrics.push(metrics);
}
let (idle, idle_synced) = Idle::new(size);
let (inject, inject_synced) = inject::Shared::new();
let remotes_len = remotes.len();
let handle = Arc::new(Handle {
name,
task_hooks: TaskHooks::from_config(&config),
shared: Shared {
remotes: remotes.into_boxed_slice(),
inject,
idle,
owned: OwnedTasks::new(size),
synced: Mutex::new(Synced {
idle: idle_synced,
inject: inject_synced,
#[cfg(all(tokio_unstable, feature = "time"))]
inject_timers: Vec::new(),
}),
shutdown_cores: Mutex::new(vec![]),
trace_status: TraceStatus::new(remotes_len),
config,
scheduler_metrics: SchedulerMetrics::new(),
worker_metrics: worker_metrics.into_boxed_slice(),
_counters: Counters,
},
driver: driver_handle,
blocking_spawner,
seed_generator,
timer_flavor,
#[cfg(all(tokio_unstable, feature = "time"))]
is_shutdown: AtomicBool::new(false),
});
let mut launch = Launch(vec![]);
for (index, core) in cores.drain(..).enumerate() {
launch.0.push(Arc::new(Worker {
handle: handle.clone(),
index,
core: AtomicCell::new(Some(core)),
}));
}
(handle, launch)
}
#[track_caller]
pub(crate) fn block_in_place<F, R>(f: F) -> R
where
F: FnOnce() -> R,
{
struct Reset {
take_core: bool,
budget: coop::Budget,
}
impl Drop for Reset {
fn drop(&mut self) {
with_current(|maybe_cx| {
if let Some(cx) = maybe_cx {
if self.take_core {
let core = cx.worker.core.take();
if core.is_some() {
cx.worker.handle.shared.worker_metrics[cx.worker.index]
.set_thread_id(thread::current().id());
}
let mut cx_core = cx.core.borrow_mut();
assert!(cx_core.is_none());
*cx_core = core;
}
coop::set(self.budget);
}
});
}
}
let mut had_entered = false;
let mut take_core = false;
let setup_result = with_current(|maybe_cx| {
match (
crate::runtime::context::current_enter_context(),
maybe_cx.is_some(),
) {
(context::EnterRuntime::Entered { .. }, true) => {
had_entered = true;
}
(
context::EnterRuntime::Entered {
allow_block_in_place,
},
false,
) => {
if allow_block_in_place {
had_entered = true;
return Ok(());
} else {
return Err(
"can call blocking only when running on the multi-threaded runtime",
);
}
}
(context::EnterRuntime::NotEntered, true) => {
return Ok(());
}
(context::EnterRuntime::NotEntered, false) => {
return Ok(());
}
}
let cx = maybe_cx.expect("no .is_some() == false cases above should lead here");
cx.defer.wake();
let mut core = match cx.core.borrow_mut().take() {
Some(core) => core,
None => return Ok(()),
};
if let Some(task) = core.run_queue.pop_lifo() {
core.run_queue
.push_back_or_overflow(task, &*cx.worker.handle, &mut core.stats);
}
take_core = true;
assert!(core.park.is_some());
cx.worker.core.set(core);
let worker = cx.worker.clone();
runtime::spawn_blocking(move || run(worker));
Ok(())
});
if let Err(panic_message) = setup_result {
panic!("{}", panic_message);
}
if had_entered {
let _reset = Reset {
take_core,
budget: coop::stop(),
};
crate::runtime::context::exit_runtime(f)
} else {
f()
}
}
impl Launch {
pub(crate) fn launch(mut self) {
for worker in self.0.drain(..) {
runtime::spawn_blocking(move || run(worker));
}
}
}
fn run(worker: Arc<Worker>) {
#[allow(dead_code)]
struct AbortOnPanic;
impl Drop for AbortOnPanic {
fn drop(&mut self) {
if std::thread::panicking() {
eprintln!("worker thread panicking; aborting process");
std::process::abort();
}
}
}
#[cfg(debug_assertions)]
let _abort_on_panic = AbortOnPanic;
let core = match worker.core.take() {
Some(core) => core,
None => return,
};
worker.handle.shared.worker_metrics[worker.index].set_thread_id(thread::current().id());
let handle = scheduler::Handle::MultiThread(worker.handle.clone());
crate::runtime::context::enter_runtime(&handle, true, |_| {
let cx = scheduler::Context::MultiThread(Context {
worker,
core: RefCell::new(None),
defer: Defer::new(),
});
context::set_scheduler(&cx, || {
let cx = cx.expect_multi_thread();
assert!(cx.run(core).is_err());
cx.defer.wake();
});
});
}
impl Context {
fn run(&self, mut core: Box<Core>) -> RunResult {
self.reset_lifo_enabled(&mut core);
core.stats.start_processing_scheduled_tasks();
while !core.is_shutdown {
self.assert_lifo_enabled_is_correct(&core);
if core.is_traced {
core = self.worker.handle.trace_core(core);
}
core.tick();
core = self.maintenance(core);
if let Some(task) = core.next_task(&self.worker) {
core = self.run_task(task, core)?;
continue;
}
core.stats.end_processing_scheduled_tasks();
if let Some(task) = core.steal_work(&self.worker) {
core.stats.start_processing_scheduled_tasks();
core = self.run_task(task, core)?;
} else {
core = if !self.defer.is_empty() {
self.park_yield(core)
} else {
self.park(core)
};
core.stats.start_processing_scheduled_tasks();
}
}
#[cfg(all(tokio_unstable, feature = "time"))]
{
match self.worker.handle.timer_flavor {
TimerFlavor::Traditional => {}
TimerFlavor::Alternative => {
util::time_alt::shutdown_local_timers(
&mut core.time_context.wheel,
&mut core.time_context.canc_rx,
self.worker.handle.take_remote_timers(),
&self.worker.handle.driver,
);
}
}
}
core.pre_shutdown(&self.worker);
self.worker.handle.shutdown_core(core);
Err(())
}
fn run_task(&self, task: Notified, mut core: Box<Core>) -> RunResult {
#[cfg(tokio_unstable)]
let task_meta = task.task_meta();
let task = self.worker.handle.shared.owned.assert_owner(task);
let notified_parked_worker = core.transition_from_searching(&self.worker);
if cfg!(tokio_unstable)
&& core.enable_eager_driver_handoff
&& core.had_driver == park::HadDriver::Yes
&& !notified_parked_worker
{
core.had_driver = park::HadDriver::No;
self.worker.handle.notify_parked_local();
}
self.assert_lifo_enabled_is_correct(&core);
core.stats.start_poll();
*self.core.borrow_mut() = Some(core);
coop::budget(|| {
#[cfg(tokio_unstable)]
self.worker
.handle
.task_hooks
.poll_start_callback(&task_meta);
task.run();
#[cfg(tokio_unstable)]
self.worker.handle.task_hooks.poll_stop_callback(&task_meta);
let mut lifo_polls = 0;
loop {
let mut core = match self.core.borrow_mut().take() {
Some(core) => core,
None => {
return Err(());
}
};
let task = match core.run_queue.pop_lifo() {
Some(task) => task,
None => {
self.reset_lifo_enabled(&mut core);
core.stats.end_poll();
return Ok(core);
}
};
if !coop::has_budget_remaining() {
core.stats.end_poll();
core.run_queue.push_back_or_overflow(
task,
&*self.worker.handle,
&mut core.stats,
);
debug_assert!(core.lifo_enabled);
return Ok(core);
}
lifo_polls += 1;
super::counters::inc_lifo_schedules();
if lifo_polls >= MAX_LIFO_POLLS_PER_TICK {
core.lifo_enabled = false;
super::counters::inc_lifo_capped();
}
*self.core.borrow_mut() = Some(core);
let task = self.worker.handle.shared.owned.assert_owner(task);
#[cfg(tokio_unstable)]
let task_meta = task.task_meta();
#[cfg(tokio_unstable)]
self.worker
.handle
.task_hooks
.poll_start_callback(&task_meta);
task.run();
#[cfg(tokio_unstable)]
self.worker.handle.task_hooks.poll_stop_callback(&task_meta);
}
})
}
fn reset_lifo_enabled(&self, core: &mut Core) {
core.lifo_enabled = !self.worker.handle.shared.config.disable_lifo_slot;
}
fn assert_lifo_enabled_is_correct(&self, core: &Core) {
debug_assert_eq!(
core.lifo_enabled,
!self.worker.handle.shared.config.disable_lifo_slot
);
}
fn maintenance(&self, mut core: Box<Core>) -> Box<Core> {
if core.tick % self.worker.handle.shared.config.event_interval == 0 {
super::counters::inc_num_maintenance();
core.stats.end_processing_scheduled_tasks();
core = self.park_yield(core);
core.maintenance(&self.worker);
core.stats.start_processing_scheduled_tasks();
}
core
}
fn park(&self, mut core: Box<Core>) -> Box<Core> {
if let Some(f) = &self.worker.handle.shared.config.before_park {
f();
}
if core.transition_to_parked(&self.worker) {
while !core.is_shutdown && !core.is_traced {
core.stats.about_to_park();
core.stats
.submit(&self.worker.handle.shared.worker_metrics[self.worker.index]);
core = self.park_internal(core, None);
core.stats.unparked();
core.maintenance(&self.worker);
if core.transition_from_parked(&self.worker) {
break;
}
}
}
if let Some(f) = &self.worker.handle.shared.config.after_unpark {
f();
}
core
}
fn park_yield(&self, core: Box<Core>) -> Box<Core> {
self.park_internal(core, Some(Duration::from_millis(0)))
}
fn park_internal(&self, mut core: Box<Core>, duration: Option<Duration>) -> Box<Core> {
self.assert_lifo_enabled_is_correct(&core);
let mut park = core.park.take().expect("park missing");
*self.core.borrow_mut() = Some(core);
#[cfg(feature = "time")]
let (duration, auto_advance_duration) = match self.worker.handle.timer_flavor {
TimerFlavor::Traditional => (duration, None::<Duration>),
#[cfg(tokio_unstable)]
TimerFlavor::Alternative => {
let MaintainLocalTimer {
park_duration: duration,
auto_advance_duration,
} = self.maintain_local_timers_before_parking(duration);
(duration, auto_advance_duration)
}
};
let had_driver = if let Some(timeout) = duration {
park.park_timeout(&self.worker.handle.driver, timeout)
} else {
park.park(&self.worker.handle.driver)
};
self.defer.wake();
#[cfg(feature = "time")]
match self.worker.handle.timer_flavor {
TimerFlavor::Traditional => {
let _ = auto_advance_duration;
}
#[cfg(tokio_unstable)]
TimerFlavor::Alternative => {
self.maintain_local_timers_after_parking(auto_advance_duration);
}
}
core = self.core.borrow_mut().take().expect("core missing");
core.park = Some(park);
core.had_driver = had_driver;
if core.should_notify_others() {
self.worker.handle.notify_parked_local();
}
core
}
pub(crate) fn defer(&self, waker: &Waker) {
if self.core.borrow().is_none() {
waker.wake_by_ref();
} else {
self.defer.defer(waker);
}
}
#[cfg(all(tokio_unstable, feature = "time"))]
fn maintain_local_timers_before_parking(
&self,
park_duration: Option<Duration>,
) -> MaintainLocalTimer {
let handle = &self.worker.handle;
let mut wake_queue = time_alt::WakeQueue::new();
let (should_yield, next_timer) = with_current(|maybe_cx| {
let cx = maybe_cx.expect("function should be called when core is present");
assert_eq!(
Arc::as_ptr(&cx.worker.handle),
Arc::as_ptr(&self.worker.handle),
"function should be called on the exact same worker"
);
let mut maybe_core = cx.core.borrow_mut();
let core = maybe_core.as_mut().expect("core missing");
let time_cx = &mut core.time_context;
util::time_alt::process_registration_queue(
&mut time_cx.registration_queue,
&mut time_cx.wheel,
&time_cx.canc_tx,
&mut wake_queue,
);
util::time_alt::insert_inject_timers(
&mut time_cx.wheel,
&time_cx.canc_tx,
handle.take_remote_timers(),
&mut wake_queue,
);
util::time_alt::remove_cancelled_timers(&mut time_cx.wheel, &mut time_cx.canc_rx);
let should_yield = !wake_queue.is_empty();
let next_timer = util::time_alt::next_expiration_time(&time_cx.wheel, &handle.driver);
(should_yield, next_timer)
});
wake_queue.wake_all();
if should_yield {
MaintainLocalTimer {
park_duration: Some(Duration::from_millis(0)),
auto_advance_duration: None,
}
} else {
let dur = util::time_alt::min_duration(park_duration, next_timer);
if util::time_alt::pre_auto_advance(&handle.driver, dur) {
MaintainLocalTimer {
park_duration: Some(Duration::ZERO),
auto_advance_duration: dur,
}
} else {
MaintainLocalTimer {
park_duration: dur,
auto_advance_duration: None,
}
}
}
}
#[cfg(all(tokio_unstable, feature = "time"))]
fn maintain_local_timers_after_parking(&self, auto_advance_duration: Option<Duration>) {
let handle = &self.worker.handle;
let mut wake_queue = time_alt::WakeQueue::new();
with_current(|maybe_cx| {
let cx = maybe_cx.expect("function should be called when core is present");
assert_eq!(
Arc::as_ptr(&cx.worker.handle),
Arc::as_ptr(&self.worker.handle),
"function should be called on the exact same worker"
);
let mut maybe_core = cx.core.borrow_mut();
let core = maybe_core.as_mut().expect("core missing");
let time_cx = &mut core.time_context;
util::time_alt::post_auto_advance(&handle.driver, auto_advance_duration);
util::time_alt::process_expired_timers(
&mut time_cx.wheel,
&handle.driver,
&mut wake_queue,
);
});
wake_queue.wake_all();
}
#[cfg(all(tokio_unstable, feature = "time"))]
fn with_core<F, R>(&self, f: F) -> R
where
F: FnOnce(Option<&mut Core>) -> R,
{
match self.core.borrow_mut().as_mut() {
Some(core) => f(Some(core)),
None => f(None),
}
}
#[cfg(all(tokio_unstable, feature = "time"))]
pub(crate) fn with_time_temp_local_context<F, R>(&self, f: F) -> R
where
F: FnOnce(Option<time_alt::TempLocalContext<'_>>) -> R,
{
self.with_core(|maybe_core| match maybe_core {
Some(core) if core.is_shutdown => f(Some(time_alt::TempLocalContext::new_shutdown())),
Some(core) => f(Some(time_alt::TempLocalContext::new_running(
&mut core.time_context,
))),
None => f(None),
})
}
#[cfg(tokio_unstable)]
pub(crate) fn worker_index(&self) -> usize {
self.worker.index
}
}
impl Core {
fn tick(&mut self) {
self.tick = self.tick.wrapping_add(1);
}
fn next_task(&mut self, worker: &Worker) -> Option<Notified> {
if self.tick % self.global_queue_interval == 0 {
self.tune_global_queue_interval(worker);
worker
.handle
.next_remote_task()
.or_else(|| self.next_local_task())
} else {
let maybe_task = self.next_local_task();
if maybe_task.is_some() {
return maybe_task;
}
if worker.inject().is_empty() {
return None;
}
let cap = usize::min(
self.run_queue.remaining_slots(),
self.run_queue.max_capacity() / 2,
);
let n = usize::min(
worker.inject().len() / worker.handle.shared.remotes.len() + 1,
cap,
);
let n = usize::max(1, n);
let mut synced = worker.handle.shared.synced.lock();
let mut tasks = unsafe { worker.inject().pop_n(&mut synced.inject, n) };
let ret = tasks.next();
self.run_queue.push_back(tasks);
ret
}
}
fn next_local_task(&mut self) -> Option<Notified> {
self.run_queue.pop_lifo().or_else(|| self.run_queue.pop())
}
fn steal_work(&mut self, worker: &Worker) -> Option<Notified> {
if !self.transition_to_searching(worker) {
return None;
}
let num = worker.handle.shared.remotes.len();
let start = self.rand.fastrand_n(num as u32) as usize;
for i in 0..num {
let i = (start + i) % num;
if i == worker.index {
continue;
}
let target = &worker.handle.shared.remotes[i];
if let Some(task) = target
.steal
.steal_into(&mut self.run_queue, &mut self.stats)
{
return Some(task);
}
}
worker.handle.next_remote_task()
}
fn transition_to_searching(&mut self, worker: &Worker) -> bool {
if !self.is_searching {
self.is_searching = worker.handle.shared.idle.transition_worker_to_searching();
}
self.is_searching
}
fn transition_from_searching(&mut self, worker: &Worker) -> bool {
if !self.is_searching {
return false;
}
self.is_searching = false;
worker.handle.transition_worker_from_searching()
}
fn has_tasks(&self) -> bool {
self.run_queue.has_tasks()
}
fn should_notify_others(&self) -> bool {
if self.is_searching {
return false;
}
self.run_queue.len() > 1
}
fn transition_to_parked(&mut self, worker: &Worker) -> bool {
if self.has_tasks() || self.is_traced {
return false;
}
let is_last_searcher = worker.handle.shared.idle.transition_worker_to_parked(
&worker.handle.shared,
worker.index,
self.is_searching,
);
self.is_searching = false;
if is_last_searcher {
worker.handle.notify_if_work_pending();
}
true
}
fn transition_from_parked(&mut self, worker: &Worker) -> bool {
if self.has_tasks() {
self.is_searching = !worker
.handle
.shared
.idle
.unpark_worker_by_id(&worker.handle.shared, worker.index);
return true;
}
if worker
.handle
.shared
.idle
.is_parked(&worker.handle.shared, worker.index)
{
return false;
}
self.is_searching = true;
true
}
fn maintenance(&mut self, worker: &Worker) {
self.stats
.submit(&worker.handle.shared.worker_metrics[worker.index]);
if !self.is_shutdown {
let synced = worker.handle.shared.synced.lock();
self.is_shutdown = worker.inject().is_closed(&synced.inject);
}
if !self.is_traced {
self.is_traced = worker.handle.shared.trace_status.trace_requested();
}
}
fn pre_shutdown(&mut self, worker: &Worker) {
let start = self
.rand
.fastrand_n(worker.handle.shared.owned.get_shard_size() as u32);
worker
.handle
.shared
.owned
.close_and_shutdown_all(start as usize);
self.stats
.submit(&worker.handle.shared.worker_metrics[worker.index]);
}
fn shutdown(&mut self, handle: &Handle) {
let mut park = self.park.take().expect("park missing");
while self.next_local_task().is_some() {}
park.shutdown(&handle.driver);
}
fn tune_global_queue_interval(&mut self, worker: &Worker) {
let next = self
.stats
.tuned_global_queue_interval(&worker.handle.shared.config);
if u32::abs_diff(self.global_queue_interval, next) > 2 {
self.global_queue_interval = next;
}
}
}
impl Worker {
fn inject(&self) -> &inject::Shared<Arc<Handle>> {
&self.handle.shared.inject
}
}
impl Handle {
pub(super) fn schedule_task(&self, task: Notified, is_yield: bool) {
with_current(|maybe_cx| {
if let Some(cx) = maybe_cx {
if self.ptr_eq(&cx.worker.handle) {
if let Some(core) = cx.core.borrow_mut().as_mut() {
self.schedule_local(core, task, is_yield);
return;
}
}
}
self.push_remote_task(task);
self.notify_parked_remote();
});
}
pub(super) fn schedule_option_task_without_yield(&self, task: Option<Notified>) {
if let Some(task) = task {
self.schedule_task(task, false);
}
}
fn schedule_local(&self, core: &mut Core, task: Notified, is_yield: bool) {
core.stats.inc_local_schedule_count();
if is_yield || !core.lifo_enabled {
core.run_queue
.push_back_or_overflow(task, self, &mut core.stats);
} else {
if let Some(prev) = core.run_queue.push_lifo(task) {
core.run_queue
.push_back_or_overflow(prev, self, &mut core.stats);
}
};
if core.park.is_some() {
self.notify_parked_local();
}
}
fn next_remote_task(&self) -> Option<Notified> {
if self.shared.inject.is_empty() {
return None;
}
let mut synced = self.shared.synced.lock();
unsafe { self.shared.inject.pop(&mut synced.inject) }
}
fn push_remote_task(&self, task: Notified) {
self.shared.scheduler_metrics.inc_remote_schedule_count();
let mut synced = self.shared.synced.lock();
unsafe {
self.shared.inject.push(&mut synced.inject, task);
}
}
#[cfg(all(tokio_unstable, feature = "time"))]
pub(crate) fn push_remote_timer(&self, hdl: time_alt::EntryHandle) {
assert_eq!(self.timer_flavor, TimerFlavor::Alternative);
{
let mut synced = self.shared.synced.lock();
synced.inject_timers.push(hdl);
}
self.notify_parked_remote();
}
#[cfg(all(tokio_unstable, feature = "time"))]
pub(crate) fn take_remote_timers(&self) -> Vec<time_alt::EntryHandle> {
assert_eq!(self.timer_flavor, TimerFlavor::Alternative);
match self.shared.synced.try_lock() {
Some(mut synced) => std::mem::take(&mut synced.inject_timers),
None => Vec::new(),
}
}
pub(super) fn close(&self) {
if self
.shared
.inject
.close(&mut self.shared.synced.lock().inject)
{
self.notify_all();
}
}
fn notify_parked_local(&self) -> bool {
super::counters::inc_num_inc_notify_local();
if let Some(index) = self.shared.idle.worker_to_notify(&self.shared) {
super::counters::inc_num_unparks_local();
self.shared.remotes[index].unpark.unpark(&self.driver);
true
} else {
false
}
}
fn notify_parked_remote(&self) {
if let Some(index) = self.shared.idle.worker_to_notify(&self.shared) {
self.shared.remotes[index].unpark.unpark(&self.driver);
}
}
pub(super) fn notify_all(&self) {
for remote in &self.shared.remotes[..] {
remote.unpark.unpark(&self.driver);
}
}
fn notify_if_work_pending(&self) {
for remote in &self.shared.remotes[..] {
if !remote.steal.is_empty() {
self.notify_parked_local();
return;
}
}
if !self.shared.inject.is_empty() {
self.notify_parked_local();
}
}
fn transition_worker_from_searching(&self) -> bool {
if self.shared.idle.transition_worker_from_searching() {
self.notify_parked_local()
} else {
false
}
}
fn shutdown_core(&self, core: Box<Core>) {
let mut cores = self.shared.shutdown_cores.lock();
cores.push(core);
if cores.len() != self.shared.remotes.len() {
return;
}
debug_assert!(self.shared.owned.is_empty());
for mut core in cores.drain(..) {
core.shutdown(self);
}
while let Some(task) = self.next_remote_task() {
drop(task);
}
}
fn ptr_eq(&self, other: &Handle) -> bool {
std::ptr::eq(self, other)
}
}
impl Overflow<Arc<Handle>> for Handle {
fn push(&self, task: task::Notified<Arc<Handle>>) {
self.push_remote_task(task);
}
fn push_batch<I>(&self, iter: I)
where
I: Iterator<Item = task::Notified<Arc<Handle>>>,
{
unsafe {
self.shared.inject.push_batch(self, iter);
}
}
}
pub(crate) struct InjectGuard<'a> {
lock: crate::loom::sync::MutexGuard<'a, Synced>,
}
impl<'a> AsMut<inject::Synced> for InjectGuard<'a> {
fn as_mut(&mut self) -> &mut inject::Synced {
&mut self.lock.inject
}
}
impl<'a> Lock<inject::Synced> for &'a Handle {
type Handle = InjectGuard<'a>;
fn lock(self) -> Self::Handle {
InjectGuard {
lock: self.shared.synced.lock(),
}
}
}
#[cfg(all(tokio_unstable, feature = "time"))]
struct MaintainLocalTimer {
park_duration: Option<Duration>,
auto_advance_duration: Option<Duration>,
}
#[track_caller]
fn with_current<R>(f: impl FnOnce(Option<&Context>) -> R) -> R {
use scheduler::Context::MultiThread;
context::with_scheduler(|ctx| match ctx {
Some(MultiThread(ctx)) => f(Some(ctx)),
_ => f(None),
})
}