use crate::park::{Park, Unpark};
use crate::task::{self, JoinHandle, Schedule, Task};
use std::cell::UnsafeCell;
use std::collections::VecDeque;
use std::fmt;
use std::future::Future;
use std::mem::ManuallyDrop;
use std::sync::{Arc, Mutex};
use std::task::{RawWaker, RawWakerVTable, Waker};
use std::time::Duration;
#[derive(Debug)]
pub(crate) struct BasicScheduler<P>
where
P: Park,
{
scheduler: Arc<SchedulerPriv>,
local: LocalState<P>,
}
#[derive(Debug, Clone)]
pub(crate) struct Spawner {
scheduler: Arc<SchedulerPriv>,
}
pub(super) struct SchedulerPriv {
owned_tasks: UnsafeCell<task::OwnedList<Self>>,
local_queue: UnsafeCell<VecDeque<Task<SchedulerPriv>>>,
remote_queue: Mutex<RemoteQueue>,
pending_drop: task::TransferStack<Self>,
unpark: Box<dyn Unpark>,
}
unsafe impl Send for SchedulerPriv {}
unsafe impl Sync for SchedulerPriv {}
#[derive(Debug)]
struct LocalState<P> {
tick: u8,
park: P,
}
#[derive(Debug)]
struct RemoteQueue {
queue: VecDeque<Task<SchedulerPriv>>,
open: bool,
}
const MAX_TASKS_PER_TICK: usize = 61;
const CHECK_REMOTE_INTERVAL: u8 = 13;
impl<P> BasicScheduler<P>
where
P: Park,
{
pub(crate) fn new(park: P) -> BasicScheduler<P> {
let unpark = park.unpark();
BasicScheduler {
scheduler: Arc::new(SchedulerPriv {
owned_tasks: UnsafeCell::new(task::OwnedList::new()),
local_queue: UnsafeCell::new(VecDeque::with_capacity(64)),
remote_queue: Mutex::new(RemoteQueue {
queue: VecDeque::with_capacity(64),
open: true,
}),
pending_drop: task::TransferStack::new(),
unpark: Box::new(unpark),
}),
local: LocalState { tick: 0, park },
}
}
pub(crate) fn spawner(&self) -> Spawner {
Spawner {
scheduler: self.scheduler.clone(),
}
}
pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
let (task, handle) = task::joinable(future);
self.scheduler.schedule(task);
handle
}
pub(crate) fn block_on<F>(&mut self, mut future: F) -> F::Output
where
F: Future,
{
use crate::runtime;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll::Ready;
let local = &mut self.local;
let scheduler = &*self.scheduler;
runtime::global::with_basic_scheduler(scheduler, || {
let mut _enter = runtime::enter();
let raw_waker = RawWaker::new(
scheduler as *const SchedulerPriv as *const (),
&RawWakerVTable::new(sched_clone_waker, sched_noop, sched_wake_by_ref, sched_noop),
);
let waker = ManuallyDrop::new(unsafe { Waker::from_raw(raw_waker) });
let mut cx = Context::from_waker(&waker);
let mut future = unsafe { Pin::new_unchecked(&mut future) };
loop {
if let Ready(v) = future.as_mut().poll(&mut cx) {
return v;
}
scheduler.tick(local);
scheduler.drain_pending_drop();
}
})
}
}
impl Spawner {
pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
let (task, handle) = task::joinable(future);
self.scheduler.schedule(task);
handle
}
pub(crate) fn enter<F, R>(&self, f: F) -> R
where
F: FnOnce() -> R,
{
use crate::runtime::global;
global::with_basic_scheduler(&*self.scheduler, f)
}
}
impl SchedulerPriv {
fn tick(&self, local: &mut LocalState<impl Park>) {
for _ in 0..MAX_TASKS_PER_TICK {
let tick = local.tick;
local.tick = tick.wrapping_add(1);
let task = match self.next_task(tick) {
Some(task) => task,
None => {
local.park.park().ok().expect("failed to park");
return;
}
};
if let Some(task) = task.run(&mut || Some(self.into())) {
unsafe {
self.schedule_local(task);
}
}
}
local
.park
.park_timeout(Duration::from_millis(0))
.ok()
.expect("failed to park");
}
fn drain_pending_drop(&self) {
for task in self.pending_drop.drain() {
unsafe {
(*self.owned_tasks.get()).remove(&task);
}
drop(task);
}
}
pub(super) unsafe fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
let (task, handle) = task::joinable(future);
self.schedule_local(task);
handle
}
unsafe fn schedule_local(&self, task: Task<Self>) {
(*self.local_queue.get()).push_back(task);
}
fn next_task(&self, tick: u8) -> Option<Task<Self>> {
if 0 == tick % CHECK_REMOTE_INTERVAL {
self.next_remote_task().or_else(|| self.next_local_task())
} else {
self.next_local_task().or_else(|| self.next_remote_task())
}
}
fn next_local_task(&self) -> Option<Task<Self>> {
unsafe { (*self.local_queue.get()).pop_front() }
}
fn next_remote_task(&self) -> Option<Task<Self>> {
self.remote_queue.lock().unwrap().queue.pop_front()
}
}
impl Schedule for SchedulerPriv {
fn bind(&self, task: &Task<Self>) {
unsafe {
(*self.owned_tasks.get()).insert(task);
}
}
fn release(&self, task: Task<Self>) {
self.pending_drop.push(task);
}
fn release_local(&self, task: &Task<Self>) {
unsafe {
(*self.owned_tasks.get()).remove(task);
}
}
fn schedule(&self, task: Task<Self>) {
use crate::runtime::global;
if global::basic_scheduler_is_current(self) {
unsafe { self.schedule_local(task) };
} else {
let mut lock = self.remote_queue.lock().unwrap();
if lock.open {
lock.queue.push_back(task);
} else {
task.shutdown();
}
self.unpark.unpark();
drop(lock);
}
}
}
impl<P> Drop for BasicScheduler<P>
where
P: Park,
{
fn drop(&mut self) {
let mut lock = self.scheduler.remote_queue.lock().unwrap();
lock.open = false;
while let Some(task) = lock.queue.pop_front() {
task.shutdown();
}
drop(lock);
while let Some(task) = self.scheduler.next_local_task() {
task.shutdown();
}
unsafe {
(*self.scheduler.owned_tasks.get()).shutdown();
}
self.scheduler.drain_pending_drop();
while unsafe { !(*self.scheduler.owned_tasks.get()).is_empty() } {
self.local.park.park().ok().expect("park failed");
self.scheduler.drain_pending_drop();
}
}
}
impl fmt::Debug for SchedulerPriv {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("Scheduler").finish()
}
}
unsafe fn sched_clone_waker(ptr: *const ()) -> RawWaker {
let s1 = ManuallyDrop::new(Arc::from_raw(ptr as *const SchedulerPriv));
#[allow(clippy::redundant_clone)]
let s2 = s1.clone();
RawWaker::new(
&**s2 as *const SchedulerPriv as *const (),
&RawWakerVTable::new(sched_clone_waker, sched_wake, sched_wake_by_ref, sched_drop),
)
}
unsafe fn sched_wake(ptr: *const ()) {
let scheduler = Arc::from_raw(ptr as *const SchedulerPriv);
scheduler.unpark.unpark();
}
unsafe fn sched_wake_by_ref(ptr: *const ()) {
let scheduler = ManuallyDrop::new(Arc::from_raw(ptr as *const SchedulerPriv));
scheduler.unpark.unpark();
}
unsafe fn sched_drop(ptr: *const ()) {
let _ = Arc::from_raw(ptr as *const SchedulerPriv);
}
unsafe fn sched_noop(_ptr: *const ()) {
unreachable!();
}