use latch::{LatchProbe};
#[allow(warnings)]
use log::Event::*;
use futures::{Async, Poll};
use futures::executor;
use futures::future::CatchUnwind;
use futures::task::{self, Spawn, Task, Unpark};
use job::{Job, JobRef};
use registry::{Registry, WorkerThread};
use std::any::Any;
use std::panic::AssertUnwindSafe;
use std::mem;
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::*;
use std::sync::Mutex;
use unwind;
pub use futures::Future;
// Lifecycle states stored in `ScopeFuture::state`.

/// Neither scheduled nor running. This is the initial state, and the state
/// re-entered when a poll returns `NotReady` and no unpark arrived meanwhile.
const STATE_PARKED: usize = 0;
/// An unpark arrived while parked; the thread that made this transition
/// enqueues the future as a job on the registry.
const STATE_UNPARKED: usize = 1;
/// `Job::execute` is currently running the future.
const STATE_EXECUTING: usize = 2;
/// An unpark arrived while executing, so the future must be polled again
/// before it is allowed to park.
const STATE_EXECUTING_UNPARKED: usize = 3;
/// `complete` has stored the final result; later unpark requests are ignored.
const STATE_COMPLETE: usize = 4;
/// Handle to a future that was spawned through `new_rayon_future`.
///
/// Polling the handle yields the spawned future's item or error; a panic
/// captured while running the spawned future is resumed in the poller.
/// Dropping the handle requests cancellation of the spawned future.
pub struct RayonFuture<T, E> {
// Type-erased shared state. The inner `Result<T, E>` is the spawned future's
// own outcome; the outer `Box<Any + Send>` error is a captured panic payload.
inner: Arc<ScopeFutureTrait<Result<T, E>, Box<Any + Send + 'static>>>,
}
/// Scope-side hooks required to spawn a future with `new_rayon_future`.
///
/// `new_rayon_future` erases the `'scope` lifetime of the state it stores
/// (with `mem::transmute`), which is presumably why implementing this trait is
/// `unsafe`.
/// NOTE(review): the exact contract is not visible in this file -- likely the
/// implementor must keep everything borrowed for `'scope` alive until
/// `future_completed` or `future_panicked` has been called; confirm.
pub unsafe trait FutureScope<'scope> {
/// Returns the registry on which the future's jobs are enqueued.
fn registry(&self) -> Arc<Registry>;
/// Consumes the scope handle after the future reached its final state and
/// waking the task waiting on the result panicked; `err` is that panic's
/// payload.
fn future_panicked(self, err: Box<Any + Send>);
/// Consumes the scope handle after the future reached its final state
/// (ready, failed, panicked or canceled) without a panic while waking the
/// waiting task.
fn future_completed(self);
}
/// Spawns `future` on the registry of `scope` and returns a handle to its
/// eventual result.
///
/// The shared state created by `ScopeFuture::spawn` is bounded by `'scope`;
/// that bound is erased before the state is stored in the returned
/// `RayonFuture`, so the handle itself carries no lifetime parameter.
pub fn new_rayon_future<'scope, F, S>(future: F, scope: S) -> RayonFuture<F::Item, F::Error>
    where F: Future + Send + 'scope, S: FutureScope<'scope>,
{
    /// Drops the `'l` bound from the trait object without touching the
    /// pointer itself.
    unsafe fn erase_scope<'l, T, E>(shared: Arc<ScopeFutureTrait<T, E> + 'l>)
                                    -> Arc<ScopeFutureTrait<T, E>> {
        mem::transmute(shared)
    }

    let scoped = ScopeFuture::spawn(future, scope);
    RayonFuture { inner: unsafe { erase_scope(scoped) } }
}
impl<T, E> RayonFuture<T, E> {
    /// Blocks until the spawned future has finished and returns its result.
    ///
    /// When called outside a worker thread (no current `WorkerThread`) this
    /// simply defers to `wait()`. On a worker thread it calls
    /// `WorkerThread::wait_until` with the shared state as the latch and then
    /// polls once to extract the value.
    ///
    /// # Panics
    ///
    /// Panics if the poll after `wait_until` reports `NotReady`, and resumes
    /// any panic captured from the spawned future.
    pub fn rayon_wait(mut self) -> Result<T, E> {
        let worker_thread = WorkerThread::current();
        if worker_thread.is_null() {
            return self.wait();
        }

        // The pointer was just checked to be non-null, i.e. we are running
        // on the worker thread it refers to.
        unsafe {
            (*worker_thread).wait_until(&*self.inner);
        }
        debug_assert!(self.inner.probe());

        match self.poll() {
            Ok(Async::Ready(value)) => Ok(value),
            Ok(Async::NotReady) => panic!("probe() returned true but poll not ready"),
            Err(error) => Err(error),
        }
    }
}
impl<T, E> Future for RayonFuture<T, E> {
type Item = T;
type Error = E;
fn wait(self) -> Result<T, E> {
if WorkerThread::current().is_null() {
executor::spawn(self).wait_future()
} else {
panic!("using `wait()` in a Rayon thread is unwise; try `rayon_wait()`")
}
}
fn poll(&mut self) -> Poll<T, E> {
match self.inner.poll() {
Ok(Async::Ready(Ok(v))) => Ok(Async::Ready(v)),
Ok(Async::Ready(Err(e))) => Err(e),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(e) => unwind::resume_unwinding(e),
}
}
}
/// Dropping the handle asks the shared state to cancel the spawned future.
impl<T, E> Drop for RayonFuture<T, E> {
fn drop(&mut self) {
// `cancel` returns immediately when the future has already completed.
self.inner.cancel();
}
}
/// Shared state of a spawned future. It doubles as the job that polls the
/// future (`Job`), the wake-up handle handed to that future (`Unpark`) and
/// the type-erased result source behind a `RayonFuture` (`ScopeFutureTrait`).
struct ScopeFuture<'scope, F, S>
where F: Future + Send + 'scope, S: FutureScope<'scope>,
{
// One of the `STATE_*` constants.
state: AtomicUsize,
// Obtained from `scope.registry()` at spawn time; jobs are enqueued here.
registry: Arc<Registry>,
// Everything that is only touched while holding the lock.
contents: Mutex<ScopeFutureContents<'scope, F, S>>,
}
// The user's future wrapped so that a panic during polling is caught and
// reported as the wrapper's error instead of unwinding through the job.
type CU<F> = CatchUnwind<AssertUnwindSafe<F>>;
// Item type of the wrapper; `new_rayon_future` relies on this being
// `Result<F::Item, F::Error>`.
type CUItem<F> = <CU<F> as Future>::Item;
// Error type of the wrapper; `new_rayon_future` relies on this being the
// boxed panic payload `Box<Any + Send>`.
type CUError<F> = <CU<F> as Future>::Error;
/// Mutex-protected part of a `ScopeFuture`.
struct ScopeFutureContents<'scope, F, S>
where F: Future + Send + 'scope, S: FutureScope<'scope>,
{
// The wrapped future as a pollable task. `Some` from the end of
// `ScopeFuture::spawn` until `complete` takes and drops it.
spawn: Option<Spawn<CU<F>>>,
// Wake-up handle passed to every `poll_future` call; cleared by `complete`.
unpark: Option<Arc<Unpark>>,
// Strong self-reference, cloned whenever a job reference is created.
// This forms a reference cycle that `complete` breaks by taking it.
this: Option<Arc<ScopeFuture<'scope, F, S>>>,
// Scope handle, consumed by `complete` to report the outcome.
scope: Option<S>,
// Task recorded by the most recent `ScopeFutureTrait::poll` that found the
// future unfinished; unparked by `complete`.
waiting_task: Option<Task>,
// `Ok(Async::NotReady)` until `complete` stores the outcome; reset to that
// value again once `ScopeFutureTrait::poll` has handed the outcome out.
result: Poll<CUItem<F>, CUError<F>>,
// Set by `cancel`; checked by `Job::execute` before each poll.
canceled: bool,
}
// `S` carries no `Send`/`Sync` bound (and `F` is only `Send`), so thread
// safety of `ScopeFuture` is asserted manually here rather than derived.
// NOTE(review): the justification is not visible in this file -- presumably
// all access to the `F`/`S` values goes through `contents`' mutex; confirm.
unsafe impl<'scope, F, S> Send for ScopeFuture<'scope, F, S>
where F: Future + Send + 'scope, S: FutureScope<'scope>,
{}
unsafe impl<'scope, F, S> Sync for ScopeFuture<'scope, F, S>
where F: Future + Send + 'scope, S: FutureScope<'scope>,
{}
impl<'scope, F, S> ScopeFuture<'scope, F, S>
where F: Future + Send + 'scope, S: FutureScope<'scope>,
{
/// Wraps `future` so panics are caught, builds the shared state in the
/// parked state and immediately unparks it, which enqueues the first job on
/// the scope's registry.
fn spawn(future: F, scope: S) -> Arc<Self> {
let spawn = task::spawn(AssertUnwindSafe(future).catch_unwind());
let future: Arc<Self> = Arc::new(ScopeFuture::<F, S> {
state: AtomicUsize::new(STATE_PARKED),
registry: scope.registry(),
contents: Mutex::new(ScopeFutureContents {
spawn: None,
unpark: None,
this: None,
scope: Some(scope),
waiting_task: None,
result: Ok(Async::NotReady),
canceled: false,
}),
});
// Second initialisation phase: these fields need the `Arc` itself, so
// they can only be filled in once it exists. Nobody else has a reference
// yet, hence `try_lock` cannot fail.
{
let mut contents = future.contents.try_lock().unwrap();
contents.spawn = Some(spawn);
contents.unpark = Some(Self::make_unpark(&future));
contents.this = Some(future.clone());
}
// PARKED -> UNPARKED: schedules the first execution.
future.unpark();
future
}
/// Turns one strong reference into a `JobRef` by reinterpreting the `Arc` as
/// a raw pointer. `Job::execute` performs the reverse transmute, so the
/// reference count is only balanced if that job is executed.
/// NOTE(review): presumably each `JobRef` must be executed exactly once;
/// `JobRef` is defined elsewhere -- confirm.
unsafe fn into_job_ref(this: Arc<Self>) -> JobRef {
let this: *const Self = mem::transmute(this);
JobRef::new(this)
}
/// Clones `this` as an `Unpark` trait object with its lifetime bound erased,
/// so it can be stored in `ScopeFutureContents::unpark`.
fn make_unpark(this: &Arc<Self>) -> Arc<Unpark> {
unsafe {
return hide_lifetime(this.clone());
}
// Only the lifetime bound of the trait object changes.
unsafe fn hide_lifetime<'l>(x: Arc<Unpark + 'l>) -> Arc<Unpark> {
mem::transmute(x)
}
}
/// Handles a wake-up request (the body of `Unpark::unpark`).
///
/// * PARKED -> UNPARKED, and the future is enqueued as a job;
/// * EXECUTING -> EXECUTING_UNPARKED, so the running job polls again;
/// * any other state: nothing to do.
fn unpark_inherent(&self) {
// Retry loop: `compare_exchange_weak` may fail spuriously or because
// another thread changed the state; in both cases re-read and re-decide.
loop {
match self.state.load(Relaxed) {
STATE_PARKED => {
if {
self.state
.compare_exchange_weak(STATE_PARKED, STATE_UNPARKED, Release, Relaxed)
.is_ok()
} {
// We won the transition, so we are the one thread responsible
// for enqueuing the job. The lock is needed to read `this`.
let contents = self.contents.lock().unwrap();
unsafe {
// `this` is `Some` until `complete` runs, and `complete` only
// runs from an executing job -- not from the parked state.
let job_ref = Self::into_job_ref(contents.this.clone().unwrap());
self.registry.inject_or_push(job_ref);
}
return;
}
}
STATE_EXECUTING => {
if {
self.state
.compare_exchange_weak(STATE_EXECUTING,
STATE_EXECUTING_UNPARKED,
Release,
Relaxed)
.is_ok()
} {
// `end_execute_state` will observe this and request another poll.
return;
}
}
state => {
// Already scheduled, already flagged for a re-poll, or finished.
debug_assert!(state == STATE_UNPARKED || state == STATE_EXECUTING_UNPARKED ||
state == STATE_COMPLETE);
return;
}
}
}
}
/// UNPARKED -> EXECUTING, performed at the start of `Job::execute`.
/// The expected state is only verified by debug assertions; in release
/// builds the outcome of the compare-exchange is not inspected.
fn begin_execute_state(&self) {
let state = self.state.load(Acquire);
debug_assert_eq!(state, STATE_UNPARKED);
let result = self.state.compare_exchange(state, STATE_EXECUTING, Release, Relaxed);
debug_assert_eq!(result, Ok(STATE_UNPARKED));
}
/// Called after a poll returned `NotReady`.
///
/// Returns `true` if the future was parked (EXECUTING -> PARKED) and the job
/// should stop. Returns `false` if an unpark arrived during the poll
/// (EXECUTING_UNPARKED -> EXECUTING) and the future must be polled again.
fn end_execute_state(&self) -> bool {
loop {
match self.state.load(Relaxed) {
STATE_EXECUTING => {
if {
self.state
.compare_exchange_weak(STATE_EXECUTING, STATE_PARKED, Release, Relaxed)
.is_ok()
} {
return true;
}
}
state => {
// The only other state reachable while this job is executing.
debug_assert_eq!(state, STATE_EXECUTING_UNPARKED);
if {
self.state
.compare_exchange_weak(state, STATE_EXECUTING, Release, Relaxed)
.is_ok()
} {
return false;
}
}
}
}
}
}
impl<'scope, F, S> Unpark for ScopeFuture<'scope, F, S>
    where F: Future + Send + 'scope, S: FutureScope<'scope>,
{
    /// Wake-up entry point used by the futures library; forwards to the
    /// inherent state-machine method.
    fn unpark(&self) {
        Self::unpark_inherent(self)
    }
}
impl<'scope, F, S> Job for ScopeFuture<'scope, F, S>
    where F: Future + Send + 'scope, S: FutureScope<'scope>,
{
    /// Runs the future until it finishes, is canceled, or parks.
    ///
    /// `this` must be a pointer produced by `into_job_ref`: it is turned
    /// back into the `Arc` it was created from, so the strong reference is
    /// released when this function returns.
    unsafe fn execute(this: *const Self) {
        let this: Arc<Self> = mem::transmute(this);
        // Held for the whole execution; declared after `this` so that the
        // guard is released before the `Arc` is dropped.
        let mut guard = this.contents.lock().unwrap();

        log!(FutureExecute { state: this.state.load(Relaxed) });
        this.begin_execute_state();

        loop {
            // A cancellation request finishes the future without a value.
            if guard.canceled {
                return guard.complete(Ok(Async::NotReady));
            }

            match guard.poll() {
                Ok(Async::NotReady) => {
                    log!(FutureExecuteNotReady);
                    // `true`: parked, wait for the next unpark.
                    // `false`: an unpark raced with the poll, go around again.
                    if this.end_execute_state() {
                        return;
                    }
                }
                Ok(Async::Ready(value)) => {
                    log!(FutureExecuteReady);
                    return guard.complete(Ok(Async::Ready(value)));
                }
                Err(payload) => {
                    log!(FutureExecuteErr);
                    return guard.complete(Err(payload));
                }
            }
        }
    }
}
impl<'scope, F, S> ScopeFutureContents<'scope, F, S>
    where F: Future + Send + 'scope, S: FutureScope<'scope>,
{
    /// Polls the wrapped future once, passing the stored wake-up handle.
    ///
    /// Panics if called before initialisation or after `complete`, when
    /// `unpark`/`spawn` are `None`.
    fn poll(&mut self) -> Poll<CUItem<F>, CUError<F>> {
        let wake_handle = self.unpark.clone().unwrap();
        let task = self.spawn.as_mut().unwrap();
        task.poll_future(wake_handle)
    }

    /// Moves the future into its final state with `value` as the outcome.
    ///
    /// The steps are performed in this order: the wrapped future is dropped,
    /// the outcome is stored, the state becomes `STATE_COMPLETE`, the waiting
    /// task (if any) is woken, and finally the scope is told that the future
    /// is done. A panic raised while waking the waiting task is caught and
    /// reported through `future_panicked` instead of `future_completed`.
    fn complete(&mut self, value: Poll<CUItem<F>, CUError<F>>) {
        log!(FutureComplete);

        // Drop the wrapped future first, before anything else is released.
        let finished_task = self.spawn.take().unwrap();
        mem::drop(finished_task);

        self.unpark = None;
        self.result = value;

        // Taking `this` breaks the self-reference cycle.
        let owner = self.this.take().unwrap();
        if cfg!(debug_assertions) {
            let state = owner.state.load(Relaxed);
            debug_assert!(state == STATE_EXECUTING || state == STATE_EXECUTING_UNPARKED,
                          "cannot complete when not executing (state = {})",
                          state);
        }
        owner.state.store(STATE_COMPLETE, Release);

        // Waking the waiter may panic; capture the payload rather than
        // unwinding out of the job.
        let wake_panic = match self.waiting_task.take() {
            Some(waiter) => {
                log!(FutureUnparkWaitingTask);
                unwind::halt_unwinding(|| waiter.unpark()).err()
            }
            None => None,
        };

        let scope = self.scope.take().unwrap();
        match wake_panic {
            Some(payload) => scope.future_panicked(payload),
            None => scope.future_completed(),
        }
    }
}
impl<'scope, F, S> LatchProbe for ScopeFuture<'scope, F, S>
    where F: Future + Send, S: FutureScope<'scope>,
{
    /// The latch counts as set once the future has reached
    /// `STATE_COMPLETE`.
    fn probe(&self) -> bool {
        match self.state.load(Acquire) {
            STATE_COMPLETE => true,
            _ => false,
        }
    }
}
/// Type-erased view of a `ScopeFuture`, which is what a `RayonFuture` holds.
/// It hides the concrete future and scope types behind the outcome types
/// `T` and `E`, and can also be probed as a latch.
pub trait ScopeFutureTrait<T, E>: Send + Sync + LatchProbe {
/// Returns the stored outcome if the future has completed; otherwise
/// registers the current task to be woken on completion and returns
/// `Ok(Async::NotReady)`.
fn poll(&self) -> Poll<T, E>;
/// Requests cancellation; does nothing if the future already completed.
fn cancel(&self);
}
impl<'scope, F, S> ScopeFutureTrait<CUItem<F>, CUError<F>> for ScopeFuture<'scope, F, S>
    where F: Future + Send, S: FutureScope<'scope>,
{
    /// Hands out the stored outcome once the future has completed.
    ///
    /// Before completion the calling task is recorded as the one to wake and
    /// `Ok(Async::NotReady)` is returned. The outcome is moved out on the
    /// first poll after completion; the slot is reset to
    /// `Ok(Async::NotReady)` afterwards.
    fn poll(&self) -> Poll<CUItem<F>, CUError<F>> {
        let mut guard = self.contents.lock().unwrap();
        let state = self.state.load(Relaxed);

        if state != STATE_COMPLETE {
            log!(FutureInstallWaitingTask { state: state });
            guard.waiting_task = Some(task::park());
            return Ok(Async::NotReady);
        }

        // Bound to a local so the borrow of `guard` ends before it is dropped.
        let outcome = mem::replace(&mut guard.result, Ok(Async::NotReady));
        outcome
    }

    /// Flags the future as canceled and wakes it so that the next execution
    /// notices the flag.
    fn cancel(&self) {
        // Fast path: nothing to cancel.
        if self.state.load(Relaxed) == STATE_COMPLETE {
            return;
        }

        let mut guard = self.contents.lock().unwrap();
        guard.canceled = true;
        // `None` here means the future completed before we got the lock.
        let wake_handle = guard.unpark.clone();
        // Release the lock before waking: unparking may take it again.
        mem::drop(guard);

        match wake_handle {
            Some(handle) => handle.unpark(),
            None => {}
        }
    }
}
#[cfg(test)]
mod test;