use crate::future::poll_fn;
use crate::loom::sync::atomic::AtomicBool;
use crate::loom::sync::Mutex;
use crate::park::{Park, Unpark};
use crate::runtime::context::EnterGuard;
use crate::runtime::driver::Driver;
use crate::runtime::task::{self, JoinHandle, OwnedTasks, Schedule, Task};
use crate::runtime::Callback;
use crate::runtime::{MetricsBatch, SchedulerMetrics, WorkerMetrics};
use crate::sync::notify::Notify;
use crate::util::atomic_cell::AtomicCell;
use crate::util::{waker_ref, Wake, WakerRef};
use std::cell::RefCell;
use std::collections::VecDeque;
use std::fmt;
use std::future::Future;
use std::sync::atomic::Ordering::{AcqRel, Release};
use std::sync::Arc;
use std::task::Poll::{Pending, Ready};
use std::time::Duration;
/// Scheduler that runs tasks on whichever thread is currently inside
/// `block_on` and has managed to take ownership of the scheduler core.
pub(crate) struct BasicScheduler {
/// Holds the scheduler `Core` while no caller owns it. `take_core` removes
/// it and dropping the resulting `CoreGuard` puts it back.
core: AtomicCell<Core>,
/// Signalled with `notify_one` when a `CoreGuard` returns the core, so that
/// a `block_on` caller that could not take the core can try again.
notify: Notify,
/// Spawner returned by `spawner()` and cloned into every `Context`.
spawner: Spawner,
/// Guard stored through `set_context_guard`; `None` until that is called.
context_guard: Option<EnterGuard>,
}
/// Scheduler state that is owned by at most one `block_on` caller at a time.
struct Core {
/// Local run queue: `push_task` appends at the back, tasks are taken from
/// the front.
tasks: VecDeque<task::Notified<Arc<Shared>>>,
/// Handle to the shared state (remote queue, woken flag, metrics).
spawner: Spawner,
/// Counter incremented (wrapping) for every entry the run loop looks for;
/// compared against `REMOTE_FIRST_INTERVAL` to pick which queue is tried
/// first.
tick: u8,
/// Driver used for parking. It is taken out (leaving `None`) for the
/// duration of `Context::park` / `Context::park_yield` and then restored.
driver: Option<Driver>,
/// Locally batched metrics, submitted to `Shared::worker_metrics` before
/// parking and when the scheduler is dropped.
metrics: MetricsBatch,
}
/// Cloneable handle to the scheduler's `Shared` state. Used to spawn tasks,
/// pop remotely scheduled entries and manage the `woken` flag.
#[derive(Clone)]
pub(crate) struct Spawner {
/// State shared between the scheduler core and every spawner clone.
shared: Arc<Shared>,
}
/// Entry stored in the shared (remote) queue.
enum RemoteMsg {
/// A notified task that `schedule` could not push onto the local queue
/// because the caller was not running inside this scheduler's context.
Schedule(task::Notified<Arc<Shared>>),
}
// SAFETY: NOTE(review): the invariant that makes it sound to move a
// `task::Notified` between threads is not visible in this file — presumably
// the task header is safe to touch from any thread; confirm against
// `runtime::task` before relying on it.
unsafe impl Send for RemoteMsg {}
/// State shared between the scheduler core and all `Spawner` clones.
struct Shared {
/// Remote run queue. Set to `None` when the `BasicScheduler` is dropped;
/// from then on `schedule` no longer enqueues tasks.
queue: Mutex<Option<VecDeque<RemoteMsg>>>,
/// Collection of tasks bound to this scheduler; closed and shut down when
/// the `BasicScheduler` is dropped.
owned: OwnedTasks<Arc<Shared>>,
/// Unpark handle obtained from the driver; used to wake the thread that is
/// parked inside `block_on`.
unpark: <Driver as Park>::Unpark,
/// Set by the `block_on` waker (and by `Spawner::waker_ref`), cleared by
/// `Spawner::reset_woken`. The run loop polls the `block_on` future only
/// when this flag was set.
woken: AtomicBool,
/// Optional callback invoked by `Context::park` before parking.
before_park: Option<Callback>,
/// Optional callback invoked by `Context::park` after parking.
after_unpark: Option<Callback>,
/// Scheduler-level metrics; counts tasks scheduled through the remote queue.
scheduler_metrics: SchedulerMetrics,
/// Worker-level metrics; receives the local queue depth and the batched
/// metrics submitted from `Core::metrics`.
worker_metrics: WorkerMetrics,
}
/// Per-`CoreGuard` context that is installed in the `CURRENT` thread-local
/// while the scheduler runs.
struct Context {
/// Handle to the shared scheduler state; `schedule` compares it against the
/// scheduling `Arc<Shared>` to detect same-scheduler wakeups.
spawner: Spawner,
/// Slot for the scheduler core. `Context::enter` stores the core here while
/// a closure runs and takes it back afterwards; it is `None` while the run
/// loop holds the core directly.
core: RefCell<Option<Box<Core>>>,
}
// Initial capacity of both the local and the remote run queue.
const INITIAL_CAPACITY: usize = 64;
// Maximum number of queue entries the run loop processes before it yields to
// the driver via `park_yield`. A smaller value is used for loom builds.
#[cfg(loom)]
const MAX_TASKS_PER_TICK: usize = 4;
#[cfg(not(loom))]
const MAX_TASKS_PER_TICK: usize = 61;
// When `tick % REMOTE_FIRST_INTERVAL == 0` the remote queue is tried before
// the local queue; otherwise the local queue is tried first.
const REMOTE_FIRST_INTERVAL: u8 = 31;
// Scoped thread-local holding the `Context` of the scheduler running on this
// thread. It is set for the duration of `CoreGuard::enter` and read by
// `schedule` to decide between the local and the remote queue.
scoped_thread_local!(static CURRENT: Context);
impl BasicScheduler {
    /// Builds a scheduler around `driver`.
    ///
    /// `before_park` / `after_unpark` are optional callbacks stored in the
    /// shared state and invoked around parking. Both run queues start with
    /// `INITIAL_CAPACITY` slots and no context guard is set.
    pub(crate) fn new(
        driver: Driver,
        before_park: Option<Callback>,
        after_unpark: Option<Callback>,
    ) -> BasicScheduler {
        // The unpark handle must be obtained before the driver is moved into
        // the core below.
        let unpark = driver.unpark();

        let shared = Arc::new(Shared {
            queue: Mutex::new(Some(VecDeque::with_capacity(INITIAL_CAPACITY))),
            owned: OwnedTasks::new(),
            unpark,
            woken: AtomicBool::new(false),
            before_park,
            after_unpark,
            scheduler_metrics: SchedulerMetrics::new(),
            worker_metrics: WorkerMetrics::new(),
        });
        let spawner = Spawner { shared };

        let initial_core = Box::new(Core {
            tasks: VecDeque::with_capacity(INITIAL_CAPACITY),
            spawner: spawner.clone(),
            tick: 0,
            driver: Some(driver),
            metrics: MetricsBatch::new(),
        });

        BasicScheduler {
            core: AtomicCell::new(Some(initial_core)),
            notify: Notify::new(),
            spawner,
            context_guard: None,
        }
    }

    /// Returns the spawner associated with this scheduler.
    pub(crate) fn spawner(&self) -> &Spawner {
        &self.spawner
    }

    /// Blocks the current thread until `future` completes and returns its
    /// output.
    ///
    /// If the scheduler core is available this thread takes it and runs the
    /// scheduler loop. Otherwise it waits for either a notification that the
    /// core was handed back (then it retries) or the completion of `future`.
    pub(crate) fn block_on<F: Future>(&self, future: F) -> F::Output {
        pin!(future);

        loop {
            // Fast path: we own the core, so drive the scheduler ourselves.
            if let Some(guard) = self.take_core() {
                return guard.block_on(future);
            }

            // Somebody else owns the core: race the "core returned"
            // notification against the future itself.
            let mut enter = crate::runtime::enter(false);
            let notified = self.notify.notified();
            pin!(notified);

            let outcome = enter
                .block_on(poll_fn(|cx| {
                    if notified.as_mut().poll(cx).is_ready() {
                        // The core may be available again; go around the loop.
                        Ready(None)
                    } else {
                        match future.as_mut().poll(cx) {
                            Ready(out) => Ready(Some(out)),
                            Pending => Pending,
                        }
                    }
                }))
                .expect("Failed to `Enter::block_on`");

            if let Some(out) = outcome {
                return out;
            }
        }
    }

    /// Attempts to take the scheduler core. On success the core is wrapped in
    /// a `CoreGuard` carrying a fresh `Context`; `None` means another caller
    /// currently holds it.
    fn take_core(&self) -> Option<CoreGuard<'_>> {
        self.core.take().map(|core| CoreGuard {
            context: Context {
                spawner: self.spawner.clone(),
                core: RefCell::new(Some(core)),
            },
            basic_scheduler: self,
        })
    }

    /// Stores `guard` in the scheduler, replacing any previously stored one.
    pub(super) fn set_context_guard(&mut self, guard: EnterGuard) {
        self.context_guard = Some(guard);
    }
}
impl Drop for BasicScheduler {
    /// Shuts the scheduler down: closes the owned-task collection, drains the
    /// local and remote queues, and submits the remaining metrics.
    ///
    /// Panics if the core is missing, unless the thread is already panicking
    /// (in which case the drop silently returns to avoid a double panic).
    fn drop(&mut self) {
        let guard = match self.take_core() {
            Some(guard) => guard,
            None => {
                if std::thread::panicking() {
                    return;
                }
                panic!("Oh no! We never placed the Core back, this is a bug!");
            }
        };

        guard.enter(|mut core, context| {
            let owned = &context.spawner.shared.owned;

            // Close the collection and shut down every task it still holds.
            owned.close_and_shutdown_all();

            // Drain the local queue; each popped task is dropped right away.
            while core.pop_task().is_some() {}

            // Replace the remote queue with `None`. The lock is released at
            // the end of this statement, before any remote task is dropped.
            let remote_queue = core.spawner.shared.queue.lock().take();
            for RemoteMsg::Schedule(task) in remote_queue.into_iter().flatten() {
                drop(task);
            }

            assert!(owned.is_empty());

            core.metrics.submit(&core.spawner.shared.worker_metrics);
            (core, ())
        });
    }
}
impl fmt::Debug for BasicScheduler {
    /// Formats the scheduler as an opaque `BasicScheduler` struct with no
    /// fields.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("BasicScheduler");
        builder.finish()
    }
}
impl Core {
    /// Removes and returns the task at the front of the local queue, then
    /// publishes the new queue depth to the worker metrics.
    fn pop_task(&mut self) -> Option<task::Notified<Arc<Shared>>> {
        let popped = self.tasks.pop_front();
        self.publish_queue_depth();
        popped
    }

    /// Appends `task` to the local queue, counts it as a local schedule and
    /// publishes the new queue depth to the worker metrics.
    fn push_task(&mut self, task: task::Notified<Arc<Shared>>) {
        self.tasks.push_back(task);
        self.metrics.inc_local_schedule_count();
        self.publish_queue_depth();
    }

    /// Records the current length of the local queue in the worker metrics.
    fn publish_queue_depth(&self) {
        let depth = self.tasks.len();
        self.spawner.shared.worker_metrics.set_queue_depth(depth);
    }
}
impl Context {
    /// Counts one poll in the metrics, then runs `f` under a coop budget with
    /// `core` stored in this context. Returns the core together with `f`'s
    /// result.
    fn run_task<R>(&self, mut core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) {
        core.metrics.incr_poll_count();
        let budgeted = || crate::coop::budget(f);
        self.enter(core, budgeted)
    }

    /// Parks the thread on the driver, running the `before_park` and
    /// `after_unpark` callbacks around it.
    ///
    /// Parking is skipped when the local queue is non-empty after
    /// `before_park` ran. The driver is removed from the core for the
    /// duration of the call and restored before returning.
    fn park(&self, mut core: Box<Core>) -> Box<Core> {
        let shared = &self.spawner.shared;
        let mut driver = core.driver.take().expect("driver missing");

        if let Some(before_park) = shared.before_park.as_ref() {
            // The callback is reached through a reference, so it is wrapped
            // in a closure; the lint below would otherwise flag that.
            #[allow(clippy::redundant_closure)]
            let (returned, _) = self.enter(core, || before_park());
            core = returned;
        }

        // Checked only after `before_park` ran, so tasks queued by the
        // callback prevent parking.
        let queue_is_idle = core.tasks.is_empty();
        if queue_is_idle {
            core.metrics.about_to_park();
            core.metrics.submit(&core.spawner.shared.worker_metrics);

            let (returned, _) = self.enter(core, || {
                driver.park().expect("failed to park");
            });
            core = returned;

            core.metrics.returned_from_park();
        }

        if let Some(after_unpark) = shared.after_unpark.as_ref() {
            // Same closure wrapping as for `before_park`.
            #[allow(clippy::redundant_closure)]
            let (returned, _) = self.enter(core, || after_unpark());
            core = returned;
        }

        core.driver = Some(driver);
        core
    }

    /// Gives the driver a turn with a zero timeout, after submitting the
    /// batched metrics. The driver is removed from the core for the duration
    /// of the call and restored before returning.
    fn park_yield(&self, mut core: Box<Core>) -> Box<Core> {
        let mut driver = core.driver.take().expect("driver missing");
        core.metrics.submit(&core.spawner.shared.worker_metrics);

        let no_wait = Duration::from_millis(0);
        let (mut resumed, _) = self.enter(core, || {
            driver.park_timeout(no_wait).expect("failed to park");
        });

        resumed.driver = Some(driver);
        resumed
    }

    /// Stores `core` in this context's slot while `f` runs, then takes it
    /// back out and returns it together with `f`'s result.
    ///
    /// Panics with "core missing" if the slot is empty after `f` returns.
    fn enter<R>(&self, core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) {
        // Make the core reachable through the context (e.g. for `schedule`).
        *self.core.borrow_mut() = Some(core);

        let output = f();

        let reclaimed = self.core.borrow_mut().take().expect("core missing");
        (reclaimed, output)
    }
}
impl Spawner {
    /// Binds `future` to this scheduler's owned-task collection and, if the
    /// binding yields a notified task, schedules it. Returns the join handle.
    pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
    where
        F: crate::future::Future + Send + 'static,
        F::Output: Send + 'static,
    {
        let scheduler = self.shared.clone();
        let (join_handle, first_poll) = self.shared.owned.bind(future, scheduler);

        if let Some(task) = first_poll {
            self.shared.schedule(task);
        }

        join_handle
    }

    /// Pops the front entry of the remote queue. Returns `None` when the
    /// queue is empty or has already been taken during shutdown.
    fn pop(&self) -> Option<RemoteMsg> {
        self.shared
            .queue
            .lock()
            .as_mut()
            .and_then(|queue| queue.pop_front())
    }

    /// Marks the scheduler as woken and returns a waker backed by the shared
    /// state. Setting the flag first means the next `reset_woken` call
    /// reports `true`.
    fn waker_ref(&self) -> WakerRef<'_> {
        let shared = &self.shared;
        shared.woken.store(true, Release);
        waker_ref(shared)
    }

    /// Clears the woken flag and returns the value it held before.
    pub(crate) fn reset_woken(&self) -> bool {
        let was_woken = self.shared.woken.swap(false, AcqRel);
        was_woken
    }
}
cfg_metrics! {
    impl Spawner {
        /// Returns the scheduler-level metrics.
        pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
            let shared = &self.shared;
            &shared.scheduler_metrics
        }

        /// Returns the number of entries in the remote queue, or `0` if the
        /// queue has already been taken during shutdown.
        pub(crate) fn injection_queue_depth(&self) -> usize {
            let guard = self.shared.queue.lock();
            match guard.as_ref() {
                Some(queue) => queue.len(),
                None => 0,
            }
        }

        /// Returns the metrics of the single worker.
        ///
        /// Panics unless `worker` is `0`.
        pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
            assert_eq!(0, worker);
            let shared = &self.shared;
            &shared.worker_metrics
        }
    }
}
impl fmt::Debug for Spawner {
    /// Formats the spawner as an opaque `Spawner` struct with no fields.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("Spawner");
        builder.finish()
    }
}
impl Schedule for Arc<Shared> {
    /// Removes `task` from the owned-task collection and returns whatever the
    /// collection hands back.
    fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
        let owned = &self.owned;
        owned.remove(task)
    }

    /// Queues `task` for execution.
    ///
    /// When called from a thread whose `CURRENT` context belongs to this very
    /// scheduler, the task goes onto the local queue (and is dropped if the
    /// context currently holds no core). Otherwise it is counted as a remote
    /// schedule, pushed onto the remote queue and the driver is unparked; if
    /// the remote queue is already gone the task is dropped.
    fn schedule(&self, task: task::Notified<Self>) {
        CURRENT.with(|maybe_cx| {
            // Keep the context only if it belongs to this scheduler.
            let local_cx = maybe_cx.filter(|cx| Arc::ptr_eq(self, &cx.spawner.shared));

            match local_cx {
                Some(cx) => {
                    let mut slot = cx.core.borrow_mut();
                    if let Some(core) = slot.as_mut() {
                        core.push_task(task);
                    }
                }
                None => {
                    self.scheduler_metrics.inc_remote_schedule_count();

                    let mut remote = self.queue.lock();
                    if let Some(queue) = remote.as_mut() {
                        queue.push_back(RemoteMsg::Schedule(task));
                        // Release the lock before waking the scheduler thread.
                        drop(remote);
                        self.unpark.unpark();
                    }
                }
            }
        });
    }
}
impl Wake for Shared {
    /// Wakes by value; delegates to `wake_by_ref`.
    fn wake(self: Arc<Self>) {
        <Self as Wake>::wake_by_ref(&self)
    }

    /// Sets the woken flag and then unparks the driver, so the run loop polls
    /// the `block_on` future on its next iteration.
    fn wake_by_ref(arc_self: &Arc<Self>) {
        let shared: &Shared = arc_self;
        shared.woken.store(true, Release);
        shared.unpark.unpark();
    }
}
/// Holds the scheduler core on behalf of one caller. When dropped, any core
/// still stored in `context` is handed back to the `BasicScheduler` and one
/// waiter is notified.
struct CoreGuard<'a> {
/// Context whose `core` slot holds the core between `enter` calls.
context: Context,
/// Scheduler the core was taken from and is returned to on drop.
basic_scheduler: &'a BasicScheduler,
}
impl CoreGuard<'_> {
    /// Runs the scheduler loop on the current thread until `future` completes
    /// and returns its output.
    ///
    /// Each pass of the outer loop:
    /// 1. polls `future` (under a coop budget) if the woken flag was set,
    /// 2. processes up to `MAX_TASKS_PER_TICK` entries from the local and
    ///    remote queues, parking and restarting the pass as soon as both
    ///    queues are empty,
    /// 3. gives the driver a non-blocking turn via `park_yield`.
    fn block_on<F: Future>(self, future: F) -> F::Output {
        self.enter(|mut core, context| {
            let _enter = crate::runtime::enter(false);

            // `waker_ref` also sets the woken flag, so the first pass of the
            // loop below always polls `future`.
            let waker = context.spawner.waker_ref();
            let mut cx = std::task::Context::from_waker(&waker);

            pin!(future);

            'outer: loop {
                if core.spawner.reset_woken() {
                    let (c, res) = context.enter(core, || {
                        crate::coop::budget(|| future.as_mut().poll(&mut cx))
                    });
                    core = c;

                    if let Ready(v) = res {
                        return (core, v);
                    }
                }

                for _ in 0..MAX_TASKS_PER_TICK {
                    // Read the current tick, then advance it (wrapping).
                    let tick = core.tick;
                    core.tick = core.tick.wrapping_add(1);

                    // Local tasks are taken through `Core::pop_task` rather
                    // than `tasks.pop_front()` so that the queue-depth metric
                    // is refreshed whenever the loop consumes a task.
                    let entry = if tick % REMOTE_FIRST_INTERVAL == 0 {
                        // Periodically try the remote queue first.
                        core.spawner
                            .pop()
                            .or_else(|| core.pop_task().map(RemoteMsg::Schedule))
                    } else {
                        core.pop_task()
                            .map(RemoteMsg::Schedule)
                            .or_else(|| core.spawner.pop())
                    };

                    let entry = match entry {
                        Some(entry) => entry,
                        None => {
                            // Nothing to run: park, then go back to checking
                            // the `block_on` future.
                            core = context.park(core);
                            continue 'outer;
                        }
                    };

                    match entry {
                        RemoteMsg::Schedule(task) => {
                            let task = context.spawner.shared.owned.assert_owner(task);
                            let (c, _) = context.run_task(core, || {
                                task.run();
                            });
                            core = c;
                        }
                    }
                }

                // The per-tick quota is used up: let the driver make progress
                // without blocking.
                core = context.park_yield(core);
            }
        })
    }

    /// Takes the core out of the context, runs `f` with the context installed
    /// in the `CURRENT` thread-local, then stores the core returned by `f`
    /// back into the context so that dropping the guard can hand it back to
    /// the scheduler.
    ///
    /// Panics with "core missing" if the context holds no core.
    fn enter<F, R>(self, f: F) -> R
    where
        F: FnOnce(Box<Core>, &Context) -> (Box<Core>, R),
    {
        let core = self.context.core.borrow_mut().take().expect("core missing");
        let (core, ret) = CURRENT.set(&self.context, || f(core, &self.context));
        *self.context.core.borrow_mut() = Some(core);
        ret
    }
}
impl Drop for CoreGuard<'_> {
    /// Returns the core (if the context still holds one) to the scheduler and
    /// notifies one waiter that it is available again.
    fn drop(&mut self) {
        let scheduler = self.basic_scheduler;

        if let Some(returned_core) = self.context.core.borrow_mut().take() {
            scheduler.core.set(returned_core);
            scheduler.notify.notify_one();
        }
    }
}