use core::cell::{Cell, Ref, RefCell};
use core::iter;
use core::ops::Deref;
use core::sync::atomic::{self, AtomicBool, Ordering};
use futures::future::{FutureExt, FutureObj, LocalFutureObj};
use futures::task::{LocalSpawn, LocalWaker, Poll, Spawn, SpawnError, UnsafeWake, Waker};
/// Executor that keeps its tasks in caller-provided storage.
///
/// `T` is any container that can be viewed as a slice of `Task` slots. All
/// bookkeeping lives in `Cell`s, so the executor is driven through `&self`.
#[derive(Debug)]
pub struct CachedExec<T: AsRef<[Task]>> {
/// Backing slots that hold the spawned tasks.
storage: T,
/// Number of entries a slot scan skips before it starts yielding slots.
next: Cell<usize>,
/// Counter consulted by `run`: the run loop stops when it reads zero and
/// decrements it each time a task finishes.
count: Cell<usize>,
}
/// Contents of an occupied task slot: the future plus its wake-up flag.
#[derive(Debug)]
struct TaskInner {
/// Wake-up flag; the executor tests-and-clears it before polling, and waking
/// the task sets it again.
ready: Flag,
/// The spawned future, wrapped in a `RefCell` so it can be polled through a
/// shared reference to the slot.
task: RefCell<LocalFutureObj<'static, ()>>,
}
/// Wake-up flag of a task: a thin wrapper around an `AtomicBool`.
#[derive(Debug)]
struct Flag(AtomicBool);
/// One storage slot of the executor: either empty (`None`) or holding a
/// spawned task.
#[derive(Default, Debug)]
pub struct Task(RefCell<Option<TaskInner>>);
impl Task {
    /// Creates an empty task slot.
    ///
    /// This is a `const fn`, so slots can be built in constant contexts.
    pub const fn new() -> Self {
        Task(RefCell::new(None))
    }

    /// Returns an endless iterator that produces a fresh empty slot on every
    /// step; callers bound it themselves (for example with `take`).
    pub fn init() -> impl Iterator<Item = Self> {
        iter::repeat_with(Self::new)
    }
}
impl<T: AsRef<[Task]>> CachedExec<T> {
pub fn new(cache: T) -> CachedExec<T> {
CachedExec {
storage: cache,
next: Cell::new(0),
count: Cell::new(0),
}
}
pub fn run(&self) {
for (_, cell) in self.task_iter() {
if self.count.get() == 0 {
return;
}
match cell.0.try_borrow() {
Err(_) => continue,
Ok(task_) => {
let task = match task_.as_ref() {
Some(t) => t,
None => continue,
};
if self.run_once(task) {
drop(task_);
cell.0.replace(None);
self.count.update(|v| v - 1);
}
}
}
atomic::spin_loop_hint();
}
}
#[inline]
fn task_iter(&self) -> impl Iterator<Item = (usize, &Task)> {
self.storage
.as_ref()
.iter()
.enumerate()
.cycle()
.skip(self.next.get())
}
#[inline]
fn set_next(&self, current: usize) {
self.next.set(current + 1 % self.storage.as_ref().len())
}
#[inline]
fn get_inner(&self, idx: usize) -> Ref<TaskInner> {
Ref::map(self.storage.as_ref()[idx].0.borrow(), |t| {
t.as_ref().unwrap()
})
}
fn spawn_raw(&self, future: LocalFutureObj<'static,()>) -> Ref<TaskInner> {
let new_task = Some(TaskInner {
ready: Flag::true_(),
task: RefCell::new(future),
});
for (idx, cell) in self.task_iter().take(self.storage.as_ref().len()) {
match cell.0.try_borrow() {
Err(_) => continue,
Ok(task) => {
if task.is_none() {
drop(task);
cell.0.replace(new_task);
self.set_next(idx);
return self.get_inner(idx);
}
}
}
}
for (idx, cell) in self.task_iter() {
match cell.0.try_borrow() {
Err(_) => continue,
Ok(task_) => {
let task = task_.as_ref().unwrap();
if self.run_once(task) {
drop(task_);
cell.0.replace(new_task);
self.set_next(idx);
return self.get_inner(idx);
}
}
}
atomic::spin_loop_hint();
}
unreachable!(); }
fn run_once(&self, task: &TaskInner) -> bool {
if task.ready.compare_and_swap(true, false, Ordering::Acquire) {
let mut future = match task.task.try_borrow_mut() {
Err(_) => return false,
Ok(inner) => inner,
};
let waker = unsafe { LocalWaker::new((&task.ready as &UnsafeWake).into()) };
match future.poll_unpin(&waker) {
Poll::Pending => false,
Poll::Ready(()) => true,
}
} else {
false
}
}
}
/// Spawning of `Send` futures through an exclusively borrowed executor.
impl<T: AsRef<[Task]>> Spawn for CachedExec<T> {
    /// Converts `future` into its local form and stores it in a task slot.
    ///
    /// This implementation always reports success.
    #[inline]
    fn spawn_obj(&mut self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
        let local = future.into();
        // The handle to the new slot is not needed here; release it at once.
        drop(self.spawn_raw(local));
        Ok(())
    }
}
/// Spawning of `Send` futures through a shared reference to the executor.
impl<'a, T: AsRef<[Task]>> Spawn for &'a CachedExec<T> {
    /// Converts `future` into its local form and stores it in a task slot.
    ///
    /// This implementation always reports success.
    fn spawn_obj(&mut self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
        let local = future.into();
        // The handle to the new slot is not needed here; release it at once.
        drop(self.spawn_raw(local));
        Ok(())
    }
}
/// Spawning of non-`Send` futures through an exclusively borrowed executor.
impl<T: AsRef<[Task]>> LocalSpawn for CachedExec<T> {
    /// Stores `future` in a task slot.
    ///
    /// This implementation always reports success.
    #[inline]
    fn spawn_local_obj(&mut self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> {
        // The handle to the new slot is not needed here; release it at once.
        drop(self.spawn_raw(future));
        Ok(())
    }
}
/// Spawning of non-`Send` futures through a shared reference to the executor.
impl<'a, T: AsRef<[Task]>> LocalSpawn for &'a CachedExec<T> {
    /// Stores `future` in a task slot.
    ///
    /// This implementation always reports success.
    fn spawn_local_obj(&mut self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> {
        // The handle to the new slot is not needed here; release it at once.
        drop(self.spawn_raw(future));
        Ok(())
    }
}
impl Flag {
    /// Creates a flag that starts out set, so a freshly spawned task is
    /// considered ready on its first visit.
    const fn true_() -> Self {
        Flag(AtomicBool::new(true))
    }
}
impl Deref for Flag {
type Target = AtomicBool;
fn deref(&self) -> &AtomicBool {
&self.0
}
}
// Waker plumbing: waking a task only sets its flag.
// NOTE(review): wakers built here are plain pointers to the flag with no
// reference counting, so they presumably must not outlive the task slot that
// owns the flag — confirm how callers guarantee this.
unsafe impl UnsafeWake for Flag {
// Produces another waker that points at this same flag; nothing is allocated
// or counted.
unsafe fn clone_raw(&self) -> Waker {
Waker::new((self as &UnsafeWake).into())
}
// Nothing to release: the flag is owned by its task, not by the waker.
unsafe fn drop_raw(&self) {}
// Marks the task as ready; the `Release` store pairs with the `Acquire`
// test-and-clear performed before the task is polled.
unsafe fn wake(&self) {
self.store(true, Ordering::Release)
}
}