use crate::future::poll_fn;
use crate::loom::sync::atomic::AtomicBool;
use crate::loom::sync::{Arc, Mutex};
use crate::runtime::driver::{self, Driver};
use crate::runtime::task::{self, JoinHandle, OwnedTasks, Schedule, Task};
use crate::runtime::{blocking, context, scheduler, Config};
use crate::runtime::{MetricsBatch, SchedulerMetrics, WorkerMetrics};
use crate::sync::notify::Notify;
use crate::util::atomic_cell::AtomicCell;
use crate::util::{waker_ref, RngSeedGenerator, Wake, WakerRef};
use std::cell::RefCell;
use std::collections::VecDeque;
use std::fmt;
use std::future::Future;
use std::sync::atomic::Ordering::{AcqRel, Release};
use std::task::Poll::{Pending, Ready};
use std::time::Duration;
/// Executes tasks on the current thread.
pub(crate) struct CurrentThread {
    /// Core scheduler data; taken by the thread that enters `block_on` and
    /// returned when that thread releases the scheduler (see `CoreGuard`).
    core: AtomicCell<Core>,

    /// Notifies a waiting `block_on` caller that the core has been returned
    /// and can be taken again.
    notify: Notify,
}
/// Handle to the current-thread scheduler, shared via `Arc`.
pub(crate) struct Handle {
    /// State shared between the scheduler thread and remote threads.
    shared: Shared,

    /// Handle to the runtime driver (used here to unpark the scheduler).
    pub(crate) driver: driver::Handle,

    /// Spawner for the blocking thread pool.
    pub(crate) blocking_spawner: blocking::Spawner,

    /// Generator of RNG seeds.
    pub(crate) seed_generator: RngSeedGenerator,
}
/// Data required for executing the scheduler. Owned by whichever thread is
/// currently driving the scheduler loop.
struct Core {
    /// Local run queue of notified tasks.
    tasks: VecDeque<task::Notified<Arc<Handle>>>,

    /// Current tick; incremented per scheduler iteration and used to decide
    /// when the remote queue is checked before the local one (see
    /// `CoreGuard::block_on`).
    tick: u32,

    /// The driver is removed (`take`n) before parking and restored after
    /// (see `Context::park` / `Context::park_yield`).
    driver: Option<Driver>,

    /// Batched metrics, periodically submitted to `Shared::worker_metrics`.
    metrics: MetricsBatch,

    /// Set by `Schedule::unhandled_panic` when a task panic requires the
    /// runtime to shut down; checked in the scheduler loop.
    unhandled_panic: bool,
}
/// Scheduler state shared between threads.
struct Shared {
    /// Remote run queue. Becomes `None` when the scheduler shuts down
    /// (`shutdown` takes it), which tells remote `schedule` calls to drop
    /// the task instead of queuing it.
    queue: Mutex<Option<VecDeque<task::Notified<Arc<Handle>>>>>,

    /// Collection of all active tasks spawned onto this scheduler.
    owned: OwnedTasks<Arc<Handle>>,

    /// Indicates whether the blocked-on thread was woken
    /// (see `Handle::waker_ref` / `Handle::reset_woken`).
    woken: AtomicBool,

    /// Scheduler configuration options.
    config: Config,

    /// Scheduler-level metrics.
    scheduler_metrics: SchedulerMetrics,

    /// Metrics for the single worker.
    worker_metrics: WorkerMetrics,
}
/// Thread-local context installed in `CURRENT` while the scheduler runs.
struct Context {
    /// Scheduler handle.
    handle: Arc<Handle>,

    /// Owned scheduler core; lent in and out via `Context::enter`, so it is
    /// `None` while a caller (e.g. `CoreGuard::enter`) has taken it.
    core: RefCell<Option<Box<Core>>>,
}
/// Initial capacity of the local and remote run queues.
const INITIAL_CAPACITY: usize = 64;

// Tracks the scheduler context for the thread currently driving the scheduler.
scoped_thread_local!(static CURRENT: Context);
impl CurrentThread {
    /// Creates a new current-thread scheduler together with its shared handle.
    pub(crate) fn new(
        driver: Driver,
        driver_handle: driver::Handle,
        blocking_spawner: blocking::Spawner,
        seed_generator: RngSeedGenerator,
        config: Config,
    ) -> (CurrentThread, Arc<Handle>) {
        let handle = Arc::new(Handle {
            shared: Shared {
                queue: Mutex::new(Some(VecDeque::with_capacity(INITIAL_CAPACITY))),
                owned: OwnedTasks::new(),
                woken: AtomicBool::new(false),
                config,
                scheduler_metrics: SchedulerMetrics::new(),
                worker_metrics: WorkerMetrics::new(),
            },
            driver: driver_handle,
            blocking_spawner,
            seed_generator,
        });

        let core = AtomicCell::new(Some(Box::new(Core {
            tasks: VecDeque::with_capacity(INITIAL_CAPACITY),
            tick: 0,
            driver: Some(driver),
            metrics: MetricsBatch::new(),
            unhandled_panic: false,
        })));

        let scheduler = CurrentThread {
            core,
            notify: Notify::new(),
        };

        (scheduler, handle)
    }

    /// Runs `future` to completion on the current thread.
    ///
    /// Only one thread may hold the scheduler `Core` at a time. If another
    /// thread currently holds it, this thread blocks while still polling
    /// `future` itself, and retries taking the core once notified that it
    /// has been released.
    #[track_caller]
    pub(crate) fn block_on<F: Future>(&self, handle: &scheduler::Handle, future: F) -> F::Output {
        pin!(future);

        // Enter the runtime context for the duration of the call.
        // NOTE(review): second argument is presumably an "allow block-in-place"
        // style flag — confirm against `context::enter_runtime`.
        let mut enter = crate::runtime::context::enter_runtime(handle, false);
        let handle = handle.as_current_thread();

        loop {
            if let Some(core) = self.take_core(handle) {
                // This thread acquired the core: drive the scheduler loop.
                return core.block_on(future);
            } else {
                // Another thread holds the core. Wait for its release while
                // still making progress on `future` directly.
                let notified = self.notify.notified();
                pin!(notified);

                if let Some(out) = enter
                    .blocking
                    .block_on(poll_fn(|cx| {
                        // Core released: return to the outer loop and retry
                        // `take_core`.
                        if notified.as_mut().poll(cx).is_ready() {
                            return Ready(None);
                        }

                        // The future may complete without ever holding the
                        // scheduler core.
                        if let Ready(out) = future.as_mut().poll(cx) {
                            return Ready(Some(out));
                        }

                        Pending
                    }))
                    .expect("Failed to `Enter::block_on`")
                {
                    return out;
                }
            }
        }
    }

    /// Attempts to take ownership of the scheduler core. Returns `None` when
    /// another thread currently owns it.
    fn take_core(&self, handle: &Arc<Handle>) -> Option<CoreGuard<'_>> {
        let core = self.core.take()?;

        Some(CoreGuard {
            context: Context {
                handle: handle.clone(),
                core: RefCell::new(Some(core)),
            },
            scheduler: self,
        })
    }

    /// Shuts the scheduler down: closes the task collection, drops all queued
    /// tasks (local and remote), flushes metrics, and shuts down the driver.
    pub(crate) fn shutdown(&mut self, handle: &scheduler::Handle) {
        let handle = handle.as_current_thread();

        // Avoid a double panic if the core is missing while already unwinding.
        let core = match self.take_core(handle) {
            Some(core) => core,
            None if std::thread::panicking() => return,
            None => panic!("Oh no! We never placed the Core back, this is a bug!"),
        };

        core.enter(|mut core, _context| {
            // Close the owned-task collection and shut down all active tasks.
            handle.shared.owned.close_and_shutdown_all();

            // Drain and drop everything still in the local queue.
            while let Some(task) = core.pop_task(handle) {
                drop(task);
            }

            // Take the remote queue, leaving `None` so remote `schedule`
            // calls see the shutdown, and drop its tasks.
            let remote_queue = handle.shared.queue.lock().take();
            if let Some(remote_queue) = remote_queue {
                for task in remote_queue {
                    drop(task);
                }
            }

            assert!(handle.shared.owned.is_empty());

            // Flush any buffered metrics before the driver goes away.
            core.metrics.submit(&handle.shared.worker_metrics);

            if let Some(driver) = core.driver.as_mut() {
                driver.shutdown(&handle.driver);
            }

            (core, ())
        });
    }
}
impl fmt::Debug for CurrentThread {
    /// Renders the scheduler as an opaque, field-less struct.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // No internal state is exposed; emit only the type name.
        f.debug_struct("CurrentThread").finish()
    }
}
impl Core {
    /// Removes and returns the next task from the local queue, then reports
    /// the new queue depth to the worker metrics.
    fn pop_task(&mut self, handle: &Handle) -> Option<task::Notified<Arc<Handle>>> {
        let task = self.tasks.pop_front();
        let depth = self.tasks.len();
        handle.shared.worker_metrics.set_queue_depth(depth);
        task
    }

    /// Appends a task to the local queue, records the schedule in the metrics
    /// batch, then reports the new queue depth to the worker metrics.
    fn push_task(&mut self, handle: &Handle, task: task::Notified<Arc<Handle>>) {
        self.tasks.push_back(task);
        self.metrics.inc_local_schedule_count();
        let depth = self.tasks.len();
        handle.shared.worker_metrics.set_queue_depth(depth);
    }
}
/// Returns `true` when the current context holds deferred tasks.
///
/// Panics (via `unwrap`) if no deferred-task context is set — callers invoke
/// this only from within the scheduler loop where the context exists.
fn did_defer_tasks() -> bool {
    let has_deferred = context::with_defer(|deferred| !deferred.is_empty());
    has_deferred.unwrap()
}
/// Wakes any tasks deferred in the current context; a no-op when no
/// deferred-task context is set (the `Option` result is discarded).
fn wake_deferred_tasks() {
    let _ = context::with_defer(|deferred| deferred.wake());
}
impl Context {
    /// Runs `f` with `core` stored in this context, counting the poll in the
    /// metrics batch and executing `f` under a coop budget.
    fn run_task<R>(&self, mut core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) {
        core.metrics.incr_poll_count();
        self.enter(core, || crate::runtime::coop::budget(f))
    }

    /// Blocks the current thread on the driver until woken.
    ///
    /// The driver is removed from the core for the duration and restored
    /// before returning. The configured `before_park` / `after_unpark`
    /// callbacks run around the park; the park itself is skipped if the
    /// local queue is non-empty by the time we would park.
    fn park(&self, mut core: Box<Core>, handle: &Handle) -> Box<Core> {
        let mut driver = core.driver.take().expect("driver missing");

        if let Some(f) = &handle.shared.config.before_park {
            // The closure wrapper is kept (lint allowed) so `core` can be
            // lent to `enter` while `f` is borrowed from the config.
            #[allow(clippy::redundant_closure)]
            let (c, _) = self.enter(core, || f());
            core = c;
        }

        // `before_park` (or anything else) may have made tasks runnable;
        // only park while the local queue is still empty.
        if core.tasks.is_empty() {
            core.metrics.about_to_park();
            core.metrics.submit(&handle.shared.worker_metrics);

            let (c, _) = self.enter(core, || {
                driver.park(&handle.driver);
                wake_deferred_tasks();
            });

            core = c;
            core.metrics.returned_from_park();
        }

        if let Some(f) = &handle.shared.config.after_unpark {
            #[allow(clippy::redundant_closure)]
            let (c, _) = self.enter(core, || f());
            core = c;
        }

        // Put the driver back so the next park can take it again.
        core.driver = Some(driver);
        core
    }

    /// Checks the driver for new events without blocking the thread
    /// (parks with a zero-duration timeout).
    fn park_yield(&self, mut core: Box<Core>, handle: &Handle) -> Box<Core> {
        let mut driver = core.driver.take().expect("driver missing");

        core.metrics.submit(&handle.shared.worker_metrics);
        let (mut core, _) = self.enter(core, || {
            driver.park_timeout(&handle.driver, Duration::from_millis(0));
            wake_deferred_tasks();
        });
        core.driver = Some(driver);
        core
    }

    /// Lends `core` to the thread-local context for the duration of `f`,
    /// reclaiming it afterwards. Panics if the core was not put back.
    fn enter<R>(&self, core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) {
        // Store the scheduler core in the context.
        *self.core.borrow_mut() = Some(core);

        // Execute the closure while the context tracks the core.
        let ret = f();

        // Take the scheduler core back.
        let core = self.core.borrow_mut().take().expect("core missing");
        (core, ret)
    }
}
impl Handle {
    /// Spawns a future onto the scheduler, binding it into the owned-task
    /// collection and scheduling its initial notification, if any.
    pub(crate) fn spawn<F>(
        me: &Arc<Self>,
        future: F,
        id: crate::runtime::task::Id,
    ) -> JoinHandle<F::Output>
    where
        F: crate::future::Future + Send + 'static,
        F::Output: Send + 'static,
    {
        let (handle, notified) = me.shared.owned.bind(future, me.clone(), id);

        // `notified` may be `None` (nothing to schedule); only schedule when
        // an initial notification was produced.
        if let Some(notified) = notified {
            me.schedule(notified);
        }

        handle
    }

    /// Pops a task from the remote queue; returns `None` when the queue is
    /// empty or has been taken by shutdown.
    fn pop(&self) -> Option<task::Notified<Arc<Handle>>> {
        match self.shared.queue.lock().as_mut() {
            Some(queue) => queue.pop_front(),
            None => None,
        }
    }

    /// Returns a waker for the root `block_on` future.
    ///
    /// `woken` is pre-set to `true` so the scheduler loop polls the root
    /// future at least once after entering (see `reset_woken` and its use
    /// in `CoreGuard::block_on`).
    fn waker_ref(me: &Arc<Self>) -> WakerRef<'_> {
        me.shared.woken.store(true, Release);
        waker_ref(me)
    }

    /// Resets `woken` to `false`, returning whether it was set.
    pub(crate) fn reset_woken(&self) -> bool {
        self.shared.woken.swap(false, AcqRel)
    }
}
cfg_metrics! {
    impl Handle {
        /// Returns the scheduler-level metrics.
        pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
            &self.shared.scheduler_metrics
        }

        /// Number of tasks currently in the remote (injection) queue;
        /// `0` once shutdown has taken the queue.
        pub(crate) fn injection_queue_depth(&self) -> usize {
            let guard = self.shared.queue.lock();
            match guard.as_ref() {
                Some(queue) => queue.len(),
                None => 0,
            }
        }

        /// Returns metrics for the single worker; `worker` must be `0`.
        pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
            assert_eq!(0, worker);
            &self.shared.worker_metrics
        }

        /// Total number of blocking-pool threads.
        pub(crate) fn num_blocking_threads(&self) -> usize {
            self.blocking_spawner.num_threads()
        }

        /// Number of idle blocking-pool threads.
        pub(crate) fn num_idle_blocking_threads(&self) -> usize {
            self.blocking_spawner.num_idle_threads()
        }

        /// Current depth of the blocking-pool queue.
        pub(crate) fn blocking_queue_depth(&self) -> usize {
            self.blocking_spawner.queue_depth()
        }
    }
}
impl fmt::Debug for Handle {
    /// Renders the handle as an opaque struct; internal state is hidden.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("current_thread::Handle { ... }").finish()
    }
}
impl Schedule for Arc<Handle> {
    /// Removes a task from the owned-task collection.
    fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
        self.shared.owned.remove(task)
    }

    /// Schedules a notified task for execution.
    ///
    /// From the scheduler's own thread (this handle matches `CURRENT`) the
    /// task is pushed onto the local queue; otherwise it goes onto the
    /// remote queue and the driver is unparked so the scheduler thread
    /// notices it.
    fn schedule(&self, task: task::Notified<Self>) {
        CURRENT.with(|maybe_cx| match maybe_cx {
            Some(cx) if Arc::ptr_eq(self, &cx.handle) => {
                let mut core = cx.core.borrow_mut();

                // `None` means the core is not held here (e.g. shutting
                // down); the task is simply dropped in that case.
                if let Some(core) = core.as_mut() {
                    core.push_task(self, task);
                }
            }
            _ => {
                // Track that the task was scheduled from outside the runtime.
                self.shared.scheduler_metrics.inc_remote_schedule_count();

                // The remote queue is `None` once shutdown has taken it, in
                // which case the task is dropped instead of queued.
                let mut guard = self.shared.queue.lock();
                if let Some(queue) = guard.as_mut() {
                    queue.push_back(task);
                    // Release the lock before unparking the driver.
                    drop(guard);
                    self.driver.unpark();
                }
            }
        });
    }

    cfg_unstable! {
        /// Applies the configured policy for a panic that escaped a task.
        fn unhandled_panic(&self) {
            use crate::runtime::UnhandledPanic;

            match self.shared.config.unhandled_panic {
                UnhandledPanic::Ignore => {
                    // Do nothing.
                }
                UnhandledPanic::ShutdownRuntime => {
                    // This is only reached from within the runtime, so the
                    // `CURRENT` context must be set (hence the unreachable!).
                    CURRENT.with(|maybe_cx| match maybe_cx {
                        Some(cx) if Arc::ptr_eq(self, &cx.handle) => {
                            let mut core = cx.core.borrow_mut();

                            // `None` core: nothing to flag; already shutting down.
                            if let Some(core) = core.as_mut() {
                                // Flag the scheduler loop to exit (checked in
                                // `CoreGuard::block_on`) and shut all tasks down.
                                core.unhandled_panic = true;
                                self.shared.owned.close_and_shutdown_all();
                            }
                        }
                        _ => unreachable!("runtime core not set in CURRENT thread-local"),
                    })
                }
            }
        }
    }
}
impl Wake for Handle {
    /// Wake by value: delegates to the by-reference implementation.
    fn wake(this: Arc<Self>) {
        Wake::wake_by_ref(&this)
    }

    /// Wake by reference: records the wakeup in the `woken` flag, then
    /// unparks the driver so the scheduler thread observes it.
    fn wake_by_ref(this: &Arc<Self>) {
        this.shared.woken.store(true, Release);
        this.driver.unpark();
    }
}
/// Guard that returns the `Core` to its slot in `CurrentThread` on drop,
/// even if the driven future panics (see the `Drop` impl).
struct CoreGuard<'a> {
    /// Context installed in `CURRENT` while this guard runs the scheduler.
    context: Context,

    /// Scheduler to hand the core back to.
    scheduler: &'a CurrentThread,
}
impl CoreGuard<'_> {
    /// Drives the scheduler loop while polling `future` to completion.
    #[track_caller]
    fn block_on<F: Future>(self, future: F) -> F::Output {
        let ret = self.enter(|mut core, context| {
            let waker = Handle::waker_ref(&context.handle);
            let mut cx = std::task::Context::from_waker(&waker);

            pin!(future);

            'outer: loop {
                let handle = &context.handle;

                // Poll the root future only when the waker was used since
                // the last poll. `waker_ref` pre-sets the flag, so the first
                // iteration always polls.
                if handle.reset_woken() {
                    // Poll under the coop budget with the core in context.
                    let (c, res) = context.enter(core, || {
                        crate::runtime::coop::budget(|| future.as_mut().poll(&mut cx))
                    });

                    core = c;

                    if let Ready(v) = res {
                        return (core, Some(v));
                    }
                }

                // Run up to `event_interval` tasks before yielding to the
                // driver again.
                for _ in 0..handle.shared.config.event_interval {
                    // An unhandled task panic requested shutdown; `None` is
                    // turned into a panic by the caller below.
                    if core.unhandled_panic {
                        return (core, None);
                    }

                    let tick = core.tick;
                    core.tick = core.tick.wrapping_add(1);

                    // Every `global_queue_interval` ticks the remote queue is
                    // checked before the local one, so remote tasks are
                    // periodically preferred.
                    let entry = if tick % handle.shared.config.global_queue_interval == 0 {
                        handle.pop().or_else(|| core.tasks.pop_front())
                    } else {
                        core.tasks.pop_front().or_else(|| handle.pop())
                    };

                    let task = match entry {
                        Some(entry) => entry,
                        None => {
                            // No runnable tasks. With deferred tasks pending,
                            // only yield-park (zero timeout); otherwise block
                            // on the driver.
                            core = if did_defer_tasks() {
                                context.park_yield(core, handle)
                            } else {
                                context.park(core, handle)
                            };

                            // Restart to re-check the root future first.
                            continue 'outer;
                        }
                    };

                    let task = context.handle.shared.owned.assert_owner(task);

                    let (c, _) = context.run_task(core, || {
                        task.run();
                    });

                    core = c;
                }

                // Event interval elapsed: yield to the driver so pending
                // events are processed before running more tasks.
                core = context.park_yield(core, handle);
            }
        });

        match ret {
            Some(ret) => ret,
            None => {
                // Only reachable via the `unhandled_panic` shutdown path.
                panic!("a spawned task panicked and the runtime is configured to shut down on unhandled panic");
            }
        }
    }

    /// Enters the scheduler context (`CURRENT`) and lends the core to `f`.
    ///
    /// `f` must hand the core back so it can be restored into the guard for
    /// the `Drop` impl to return to the `CurrentThread` slot.
    fn enter<F, R>(self, f: F) -> R
    where
        F: FnOnce(Box<Core>, &Context) -> (Box<Core>, R),
    {
        // Remove `core` from `context` to pass into the closure.
        let core = self.context.core.borrow_mut().take().expect("core missing");

        // Call the closure, then place `core` back.
        let (core, ret) = CURRENT.set(&self.context, || f(core, &self.context));
        *self.context.core.borrow_mut() = Some(core);

        ret
    }
}
impl Drop for CoreGuard<'_> {
    /// Returns the core to the scheduler's slot (when this guard still
    /// holds it) and notifies one waiter that the core is available again.
    fn drop(&mut self) {
        let core = self.context.core.borrow_mut().take();
        if let Some(core) = core {
            self.scheduler.core.set(core);
            self.scheduler.notify.notify_one()
        }
    }
}