use std::{
cell::RefCell,
future::Future,
marker::PhantomData,
rc::Rc,
sync::{Arc, Weak},
task::Waker,
};
use async_task::{Runnable, Task};
use crossbeam_queue::SegQueue;
use slab::Slab;
use crate::runtime::{
SendWrapper,
scheduler::{drop_hook::DropHook, local_queue::LocalQueue},
};
mod drop_hook;
mod local_queue;
/// Pair of run queues holding scheduled `Runnable`s.
///
/// Both queues live behind `Arc` so that `downgrade` can hand out `Weak`
/// handles to them.
struct TaskQueue {
// Queue wrapped in `SendWrapper`; in this file it is only reached through
// `SendWrapper::get_unchecked`.
local_queue: Arc<SendWrapper<LocalQueue<Runnable>>>,
// `SegQueue` that receives runnables when the weak handle's
// `local_thread.valid()` check fails.
sync_queue: Arc<SegQueue<Runnable>>,
}
impl TaskQueue {
    /// Builds a `TaskQueue` whose local and sync queues are both empty.
    fn new() -> Self {
        let local_queue = Arc::new(SendWrapper::new(LocalQueue::new()));
        let sync_queue = Arc::new(SegQueue::new());
        Self {
            local_queue,
            sync_queue,
        }
    }

    /// Takes at most one runnable from each queue and returns them as
    /// `(local_task, sync_task)`.
    ///
    /// # Safety
    ///
    /// The local queue is reached through `SendWrapper::get_unchecked`, so the
    /// caller must uphold that method's contract (presumably: call this on the
    /// thread that created the queue — confirm against `SendWrapper`).
    unsafe fn pop(&self) -> (Option<Runnable>, Option<Runnable>) {
        // SAFETY: forwarded to the caller, see `# Safety` above.
        let from_local = unsafe { self.local_queue.get_unchecked() }.pop();

        // `SegQueue::pop` is only attempted when the queue reports itself
        // non-empty.
        let from_sync = (!self.sync_queue.is_empty())
            .then(|| self.sync_queue.pop())
            .flatten();

        (from_local, from_sync)
    }

    /// Returns `true` when neither queue holds a runnable.
    ///
    /// # Safety
    ///
    /// Same requirement as [`TaskQueue::pop`]: the local queue is accessed via
    /// `SendWrapper::get_unchecked`.
    unsafe fn is_empty(&self) -> bool {
        // SAFETY: forwarded to the caller, see `# Safety` above.
        let local = unsafe { self.local_queue.get_unchecked() };
        if !local.is_empty() {
            return false;
        }
        self.sync_queue.is_empty()
    }

    /// Drains both queues, dropping every runnable that was still queued.
    ///
    /// The local queue is emptied first, then the sync queue.
    ///
    /// # Safety
    ///
    /// Same requirement as [`TaskQueue::pop`]: the local queue is accessed via
    /// `SendWrapper::get_unchecked`.
    unsafe fn clear(&self) {
        // SAFETY: forwarded to the caller, see `# Safety` above.
        let local = unsafe { self.local_queue.get_unchecked() };

        // Each popped runnable is dropped before the next one is popped.
        std::iter::from_fn(|| local.pop()).for_each(drop);
        std::iter::from_fn(|| self.sync_queue.pop()).for_each(drop);
    }

    /// Creates a `WeakTaskQueue` holding `Weak` references to both queues plus
    /// the `SendWrapper` tracker of the local queue.
    fn downgrade(&self) -> WeakTaskQueue {
        let local_queue = Arc::downgrade(&self.local_queue);
        let sync_queue = Arc::downgrade(&self.sync_queue);
        let local_thread = self.local_queue.tracker();
        WeakTaskQueue {
            local_queue,
            sync_queue,
            local_thread,
        }
    }
}
/// Non-owning counterpart of `TaskQueue`, produced by `TaskQueue::downgrade`.
///
/// It is captured by the schedule closure built in `Scheduler::spawn_unchecked`.
struct WeakTaskQueue {
// Weak handle to `TaskQueue::local_queue`.
local_queue: Weak<SendWrapper<LocalQueue<Runnable>>>,
// Weak handle to `TaskQueue::sync_queue`.
sync_queue: Weak<SegQueue<Runnable>>,
// Value returned by `local_queue.tracker()`; its `valid()` result decides
// which queue `upgrade_and_push` uses (presumably a same-thread check —
// confirm against `SendWrapper`).
local_thread: SendWrapper<()>,
}
impl WeakTaskQueue {
    /// Pushes `runnable` into whichever queue is appropriate, if that queue is
    /// still alive.
    ///
    /// * When `local_thread.valid()` holds, the runnable goes to the local
    ///   queue; `waker` is only woken there when the `notify-always` feature is
    ///   enabled. If the local queue is gone, the runnable is simply dropped.
    /// * Otherwise the runnable goes to the sync queue and `waker` is always
    ///   woken. If the sync queue is gone, the runnable is leaked.
    fn upgrade_and_push(&self, runnable: Runnable, waker: &Waker) {
        if !self.local_thread.valid() {
            match self.sync_queue.upgrade() {
                Some(shared) => {
                    shared.push(runnable);
                    waker.wake_by_ref();
                }
                // Leak rather than drop: presumably dropping the runnable here
                // would destroy a future that is not required to be `Send`
                // away from its home thread — confirm against `SendWrapper`.
                None => std::mem::forget(runnable),
            }
            return;
        }

        if let Some(local) = self.local_queue.upgrade() {
            // SAFETY: guarded by the `local_thread.valid()` check above
            // (assumed to be what `get_unchecked` requires).
            unsafe { local.get_unchecked() }.push(runnable);
            #[cfg(feature = "notify-always")]
            waker.wake_by_ref();
        }
        // If the upgrade failed, `runnable` is dropped here when it goes out
        // of scope.
    }
}
/// Task scheduler that spawns futures through `async_task` and runs the
/// resulting `Runnable`s from its `TaskQueue`.
pub(crate) struct Scheduler {
// Queues of runnables waiting to be run.
task_queue: TaskQueue,
// One `Waker` per spawned task, keyed by slab index; the entry is removed by
// the drop hook installed in `spawn_unchecked`.
active_tasks: Rc<RefCell<Slab<Waker>>>,
// Upper bound on the number of loop iterations performed by one `run` call.
event_interval: usize,
// `*const ()` is neither `Send` nor `Sync`, so this marker keeps the
// scheduler from being sent to or shared with other threads.
_local_marker: PhantomData<*const ()>,
}
impl Scheduler {
pub(crate) fn new(event_interval: usize) -> Self {
Self {
task_queue: TaskQueue::new(),
active_tasks: Rc::new(RefCell::new(Slab::new())),
event_interval,
_local_marker: PhantomData,
}
}
pub(crate) unsafe fn spawn_unchecked<F>(&self, future: F, waker: Waker) -> Task<F::Output>
where
F: Future,
{
let mut active_tasks = self.active_tasks.borrow_mut();
let task_entry = active_tasks.vacant_entry();
let future = {
let active_tasks = self.active_tasks.clone();
let index = task_entry.key();
DropHook::new(future, move || {
active_tasks.borrow_mut().remove(index);
})
};
let schedule = {
let task_queue = self.task_queue.downgrade();
move |runnable| task_queue.upgrade_and_push(runnable, &waker)
};
let (runnable, task) = unsafe { async_task::spawn_unchecked(future, schedule) };
task_entry.insert(runnable.waker());
runnable.schedule();
task
}
pub(crate) fn run(&self) -> bool {
for _ in 0..self.event_interval {
let tasks = unsafe { self.task_queue.pop() };
match tasks {
(Some(local), Some(sync)) => {
local.run();
sync.run();
}
(Some(local), None) => {
local.run();
}
(None, Some(sync)) => {
sync.run();
}
(None, None) => break,
}
}
!unsafe { self.task_queue.is_empty() }
}
pub(crate) fn clear(&self) {
self.active_tasks
.borrow()
.iter()
.for_each(|(_, waker)| waker.wake_by_ref());
unsafe { self.task_queue.clear() };
}
}