use super::Borrow;
use tokio_executor::park::Unpark;
use tokio_executor::Enter;
use futures::executor::{self, NotifyHandle, Spawn, UnsafeNotify};
use futures::{Async, Future};
use std::cell::UnsafeCell;
use std::fmt::{self, Debug};
use std::marker::PhantomData;
use std::mem;
use std::ptr;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release, SeqCst};
use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicUsize};
use std::sync::{Arc, Weak};
use std::thread;
use std::usize;
pub struct Scheduler<U> {
inner: Arc<Inner<U>>,
nodes: List<U>,
}
pub struct Notify<'a, U: 'a>(&'a Arc<Node<U>>);
struct List<U> {
len: usize,
head: *const Node<U>,
tail: *const Node<U>,
}
struct Inner<U> {
unpark: U,
tick_num: AtomicUsize,
head_readiness: AtomicPtr<Node<U>>,
tail_readiness: UnsafeCell<*const Node<U>>,
stub: Arc<Node<U>>,
}
unsafe impl<U: Sync + Send> Send for Inner<U> {}
unsafe impl<U: Sync + Send> Sync for Inner<U> {}
impl<U: Unpark> executor::Notify for Inner<U> {
fn notify(&self, _: usize) {
self.unpark.unpark();
}
}
struct Node<U> {
item: UnsafeCell<Option<Task>>,
notified_at: AtomicUsize,
next_all: UnsafeCell<*const Node<U>>,
prev_all: UnsafeCell<*const Node<U>>,
next_readiness: AtomicPtr<Node<U>>,
queued: AtomicBool,
queue: Weak<Inner<U>>,
}
enum Dequeue<U> {
Data(*const Node<U>),
Empty,
Yield,
Inconsistent,
}
struct Task(Spawn<Box<dyn Future<Item = (), Error = ()>>>);
pub struct Scheduled<'a, U: 'a> {
task: &'a mut Task,
notify: &'a Notify<'a, U>,
done: &'a mut bool,
}
impl<U> Scheduler<U>
where
U: Unpark,
{
pub fn new(unpark: U) -> Self {
let stub = Arc::new(Node {
item: UnsafeCell::new(None),
notified_at: AtomicUsize::new(0),
next_all: UnsafeCell::new(ptr::null()),
prev_all: UnsafeCell::new(ptr::null()),
next_readiness: AtomicPtr::new(ptr::null_mut()),
queued: AtomicBool::new(true),
queue: Weak::new(),
});
let stub_ptr = &*stub as *const Node<U>;
let inner = Arc::new(Inner {
unpark,
tick_num: AtomicUsize::new(0),
head_readiness: AtomicPtr::new(stub_ptr as *mut _),
tail_readiness: UnsafeCell::new(stub_ptr),
stub: stub,
});
Scheduler {
inner: inner,
nodes: List::new(),
}
}
pub fn notify(&self) -> NotifyHandle {
self.inner.clone().into()
}
pub fn schedule(&mut self, item: Box<dyn Future<Item = (), Error = ()>>) {
let tick_num = self.inner.tick_num.load(SeqCst);
let node = Arc::new(Node {
item: UnsafeCell::new(Some(Task::new(item))),
notified_at: AtomicUsize::new(tick_num),
next_all: UnsafeCell::new(ptr::null_mut()),
prev_all: UnsafeCell::new(ptr::null_mut()),
next_readiness: AtomicPtr::new(ptr::null_mut()),
queued: AtomicBool::new(true),
queue: Arc::downgrade(&self.inner),
});
let ptr = self.nodes.push_back(node);
self.inner.enqueue(ptr);
}
pub fn has_pending_futures(&mut self) -> bool {
unsafe { self.inner.has_pending_futures() }
}
pub fn tick(&mut self, eid: u64, enter: &mut Enter, num_futures: &AtomicUsize) -> bool {
let mut ret = false;
let tick = self.inner.tick_num.fetch_add(1, SeqCst).wrapping_add(1);
loop {
let node = match unsafe { self.inner.dequeue(Some(tick)) } {
Dequeue::Empty => {
return ret;
}
Dequeue::Yield => {
self.inner.unpark.unpark();
return ret;
}
Dequeue::Inconsistent => {
thread::yield_now();
continue;
}
Dequeue::Data(node) => node,
};
ret = true;
debug_assert!(node != self.inner.stub());
unsafe {
if (*(*node).item.get()).is_none() {
let node = ptr2arc(node);
assert!((*node.next_all.get()).is_null());
assert!((*node.prev_all.get()).is_null());
continue;
};
struct Bomb<'a, U: Unpark + 'a> {
borrow: &'a mut Borrow<'a, U>,
enter: &'a mut Enter,
node: Option<Arc<Node<U>>>,
}
impl<'a, U: Unpark> Drop for Bomb<'a, U> {
fn drop(&mut self) {
if let Some(node) = self.node.take() {
self.borrow.enter(self.enter, || release_node(node))
}
}
}
let node = self.nodes.remove(node);
let mut borrow = Borrow {
id: eid,
scheduler: self,
num_futures,
};
let mut bomb = Bomb {
node: Some(node),
enter: enter,
borrow: &mut borrow,
};
let mut done = false;
{
let node = bomb.node.as_ref().unwrap();
let item = (*node.item.get()).as_mut().unwrap();
let prev = (*node).queued.swap(false, SeqCst);
assert!(prev);
let borrow = &mut *bomb.borrow;
let enter = &mut *bomb.enter;
let notify = Notify(bomb.node.as_ref().unwrap());
let mut scheduled = Scheduled {
task: item,
notify: ¬ify,
done: &mut done,
};
if borrow.enter(enter, || scheduled.tick()) {
borrow.num_futures.fetch_sub(2, SeqCst);
}
}
if !done {
let node = bomb.node.take().unwrap();
bomb.borrow.scheduler.nodes.push_back(node);
}
}
}
}
}
impl<'a, U: Unpark> Scheduled<'a, U> {
pub fn tick(&mut self) -> bool {
let ret = match self.task.0.poll_future_notify(self.notify, 0) {
Ok(Async::Ready(_)) | Err(_) => true,
Ok(Async::NotReady) => false,
};
*self.done = ret;
ret
}
}
impl Task {
pub fn new(future: Box<dyn Future<Item = (), Error = ()> + 'static>) -> Self {
Task(executor::spawn(future))
}
}
impl fmt::Debug for Task {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("Task").finish()
}
}
fn release_node<U>(node: Arc<Node<U>>) {
let prev = node.queued.swap(true, SeqCst);
unsafe {
drop((*node.item.get()).take());
}
if prev {
mem::forget(node);
}
}
impl<U> Debug for Scheduler<U> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "Scheduler {{ ... }}")
}
}
impl<U> Drop for Scheduler<U> {
fn drop(&mut self) {
while let Some(node) = self.nodes.pop_front() {
release_node(node);
}
}
}
impl<U> Inner<U> {
fn enqueue(&self, node: *const Node<U>) {
unsafe {
debug_assert!((*node).queued.load(Relaxed));
(*node).next_readiness.store(ptr::null_mut(), Relaxed);
let node = node as *mut _;
let prev = self.head_readiness.swap(node, AcqRel);
(*prev).next_readiness.store(node, Release);
}
}
unsafe fn has_pending_futures(&self) -> bool {
let tail = *self.tail_readiness.get();
let next = (*tail).next_readiness.load(Acquire);
if tail == self.stub() {
if next.is_null() {
return false;
}
}
true
}
unsafe fn dequeue(&self, tick: Option<usize>) -> Dequeue<U> {
let mut tail = *self.tail_readiness.get();
let mut next = (*tail).next_readiness.load(Acquire);
if tail == self.stub() {
if next.is_null() {
return Dequeue::Empty;
}
*self.tail_readiness.get() = next;
tail = next;
next = (*next).next_readiness.load(Acquire);
}
if let Some(tick) = tick {
let actual = (*tail).notified_at.load(SeqCst);
if actual == tick {
return Dequeue::Yield;
}
}
if !next.is_null() {
*self.tail_readiness.get() = next;
debug_assert!(tail != self.stub());
return Dequeue::Data(tail);
}
if self.head_readiness.load(Acquire) as *const _ != tail {
return Dequeue::Inconsistent;
}
self.enqueue(self.stub());
next = (*tail).next_readiness.load(Acquire);
if !next.is_null() {
*self.tail_readiness.get() = next;
return Dequeue::Data(tail);
}
Dequeue::Inconsistent
}
fn stub(&self) -> *const Node<U> {
&*self.stub
}
}
impl<U> Drop for Inner<U> {
fn drop(&mut self) {
unsafe {
loop {
match self.dequeue(None) {
Dequeue::Empty => break,
Dequeue::Yield => unreachable!(),
Dequeue::Inconsistent => abort("inconsistent in drop"),
Dequeue::Data(ptr) => drop(ptr2arc(ptr)),
}
}
}
}
}
impl<U> List<U> {
fn new() -> Self {
List {
len: 0,
head: ptr::null_mut(),
tail: ptr::null_mut(),
}
}
fn push_back(&mut self, node: Arc<Node<U>>) -> *const Node<U> {
let ptr = arc2ptr(node);
unsafe {
*(*ptr).prev_all.get() = self.tail;
*(*ptr).next_all.get() = ptr::null_mut();
if !self.tail.is_null() {
*(*self.tail).next_all.get() = ptr;
self.tail = ptr;
} else {
self.tail = ptr;
self.head = ptr;
}
}
self.len += 1;
return ptr;
}
fn pop_front(&mut self) -> Option<Arc<Node<U>>> {
if self.head.is_null() {
return None;
}
self.len -= 1;
unsafe {
let node = ptr2arc(self.head);
self.head = *node.next_all.get();
if self.head.is_null() {
self.tail = ptr::null_mut();
} else {
*(*self.head).prev_all.get() = ptr::null_mut();
}
Some(node)
}
}
unsafe fn remove(&mut self, node: *const Node<U>) -> Arc<Node<U>> {
let node = ptr2arc(node);
let next = *node.next_all.get();
let prev = *node.prev_all.get();
*node.next_all.get() = ptr::null_mut();
*node.prev_all.get() = ptr::null_mut();
if !next.is_null() {
*(*next).prev_all.get() = prev;
} else {
self.tail = prev;
}
if !prev.is_null() {
*(*prev).next_all.get() = next;
} else {
self.head = next;
}
self.len -= 1;
return node;
}
}
impl<'a, U> Clone for Notify<'a, U> {
fn clone(&self) -> Self {
Notify(self.0)
}
}
impl<'a, U> fmt::Debug for Notify<'a, U> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("Notify").finish()
}
}
impl<'a, U: Unpark> From<Notify<'a, U>> for NotifyHandle {
fn from(handle: Notify<'a, U>) -> NotifyHandle {
unsafe {
let ptr = handle.0.clone();
let ptr = mem::transmute::<Arc<Node<U>>, *mut ArcNode<U>>(ptr);
NotifyHandle::new(hide_lt(ptr))
}
}
}
struct ArcNode<U>(PhantomData<U>);
unsafe impl<U: Sync + Send> Send for ArcNode<U> {}
unsafe impl<U: Sync + Send> Sync for ArcNode<U> {}
impl<U: Unpark> executor::Notify for ArcNode<U> {
fn notify(&self, _id: usize) {
unsafe {
let me: *const ArcNode<U> = self;
let me: *const *const ArcNode<U> = &me;
let me = me as *const Arc<Node<U>>;
Node::notify(&*me)
}
}
}
unsafe impl<U: Unpark> UnsafeNotify for ArcNode<U> {
unsafe fn clone_raw(&self) -> NotifyHandle {
let me: *const ArcNode<U> = self;
let me: *const *const ArcNode<U> = &me;
let me = &*(me as *const Arc<Node<U>>);
Notify(me).into()
}
unsafe fn drop_raw(&self) {
let mut me: *const ArcNode<U> = self;
let me = &mut me as *mut *const ArcNode<U> as *mut Arc<Node<U>>;
ptr::drop_in_place(me);
}
}
unsafe fn hide_lt<U: Unpark>(p: *mut ArcNode<U>) -> *mut dyn UnsafeNotify {
mem::transmute(p as *mut dyn UnsafeNotify)
}
impl<U: Unpark> Node<U> {
fn notify(me: &Arc<Node<U>>) {
let inner = match me.queue.upgrade() {
Some(inner) => inner,
None => return,
};
let prev = me.queued.swap(true, SeqCst);
if !prev {
let tick_num = inner.tick_num.load(SeqCst);
me.notified_at.store(tick_num, SeqCst);
inner.enqueue(&**me);
inner.unpark.unpark();
}
}
}
impl<U> Drop for Node<U> {
fn drop(&mut self) {
unsafe {
if (*self.item.get()).is_some() {
abort("item still here when dropping");
}
}
}
}
fn arc2ptr<T>(ptr: Arc<T>) -> *const T {
let addr = &*ptr as *const T;
mem::forget(ptr);
return addr;
}
unsafe fn ptr2arc<T>(ptr: *const T) -> Arc<T> {
let anchor = mem::transmute::<usize, Arc<T>>(0x10);
let addr = &*anchor as *const T;
mem::forget(anchor);
let offset = addr as isize - 0x10;
mem::transmute::<isize, Arc<T>>(ptr as isize - offset)
}
fn abort(s: &str) -> ! {
struct DoublePanic;
impl Drop for DoublePanic {
fn drop(&mut self) {
panic!("panicking twice to abort the program");
}
}
let _bomb = DoublePanic;
panic!("{}", s);
}