mod blocking;
mod blocking_state;
mod state;
pub(crate) use self::blocking::{Blocking, CanBlock};
use self::blocking_state::BlockingState;
use self::state::State;
use notifier::Notifier;
use pool::Pool;
use futures::executor::{self, Spawn};
use futures::{self, Async, Future};
use std::cell::{Cell, UnsafeCell};
use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
use std::sync::atomic::{AtomicPtr, AtomicUsize};
use std::sync::Arc;
use std::{fmt, panic, ptr};
pub(crate) struct Task {
state: AtomicUsize,
blocking: AtomicUsize,
next_blocking: AtomicPtr<Task>,
pub reg_worker: Cell<Option<u32>>,
pub reg_index: Cell<usize>,
future: UnsafeCell<Option<Spawn<BoxFuture>>>,
}
#[derive(Debug)]
pub(crate) enum Run {
Idle,
Schedule,
Complete,
}
type BoxFuture = Box<dyn Future<Item = (), Error = ()> + Send + 'static>;
impl Task {
pub fn new(future: BoxFuture) -> Task {
let task_fut = executor::spawn(future);
Task {
state: AtomicUsize::new(State::new().into()),
blocking: AtomicUsize::new(BlockingState::new().into()),
next_blocking: AtomicPtr::new(ptr::null_mut()),
reg_worker: Cell::new(None),
reg_index: Cell::new(0),
future: UnsafeCell::new(Some(task_fut)),
}
}
fn stub() -> Task {
let future = Box::new(futures::empty()) as BoxFuture;
let task_fut = executor::spawn(future);
Task {
state: AtomicUsize::new(State::stub().into()),
blocking: AtomicUsize::new(BlockingState::new().into()),
next_blocking: AtomicPtr::new(ptr::null_mut()),
reg_worker: Cell::new(None),
reg_index: Cell::new(0),
future: UnsafeCell::new(Some(task_fut)),
}
}
pub fn run(&self, unpark: &Arc<Notifier>) -> Run {
use self::State::*;
let actual: State = self
.state
.compare_and_swap(Scheduled.into(), Running.into(), AcqRel)
.into();
match actual {
Scheduled => {}
_ => panic!("unexpected task state; {:?}", actual),
}
trace!(
"Task::run; state={:?}",
State::from(self.state.load(Relaxed))
);
let fut = unsafe { &mut (*self.future.get()) };
let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
struct Guard<'a>(&'a mut Option<Spawn<BoxFuture>>, bool);
impl<'a> Drop for Guard<'a> {
fn drop(&mut self) {
if self.1 {
let _ = self.0.take();
}
}
}
let mut g = Guard(fut, true);
let ret =
g.0.as_mut()
.unwrap()
.poll_future_notify(unpark, self as *const _ as usize);
g.1 = false;
ret
}));
match res {
Ok(Ok(Async::Ready(_))) | Ok(Err(_)) | Err(_) => {
trace!(" -> task complete");
self.drop_future();
self.state.store(State::Complete.into(), Release);
if let Err(panic_err) = res {
if let Some(ref f) = unpark.pool.config.panic_handler {
f(panic_err);
}
}
Run::Complete
}
Ok(Ok(Async::NotReady)) => {
trace!(" -> not ready");
let prev: State = self
.state
.compare_and_swap(Running.into(), Idle.into(), AcqRel)
.into();
match prev {
Running => Run::Idle,
Notified => {
self.state.store(Scheduled.into(), Release);
Run::Schedule
}
_ => unreachable!(),
}
}
}
}
pub fn abort(&self) {
use self::State::*;
let mut state = self.state.load(Acquire).into();
loop {
match state {
Idle | Scheduled => {}
Running | Notified | Complete | Aborted => {
panic!("unexpected state while aborting task: {:?}", state);
}
}
let actual = self
.state
.compare_and_swap(state.into(), Aborted.into(), AcqRel)
.into();
if actual == state {
self.drop_future();
break;
}
state = actual;
}
}
pub fn notify(me: Arc<Task>, pool: &Arc<Pool>) {
if me.schedule() {
let _ = pool.submit(me, pool);
}
}
pub fn notify_blocking(me: Arc<Task>, pool: &Arc<Pool>) {
BlockingState::notify_blocking(&me.blocking, AcqRel);
Task::notify(me, pool);
}
pub fn schedule(&self) -> bool {
use self::State::*;
loop {
let actual = self
.state
.compare_and_swap(Idle.into(), Scheduled.into(), AcqRel)
.into();
match actual {
Idle => return true,
Running => {
let actual = self
.state
.compare_and_swap(Running.into(), Notified.into(), AcqRel)
.into();
match actual {
Idle => continue,
_ => return false,
}
}
Complete | Aborted | Notified | Scheduled => return false,
}
}
}
pub fn consume_blocking_allocation(&self) -> CanBlock {
BlockingState::consume_allocation(&self.blocking, AcqRel)
}
fn drop_future(&self) {
let _ = unsafe { (*self.future.get()).take() };
}
}
impl fmt::Debug for Task {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("Task")
.field("state", &self.state)
.field("future", &"Spawn<BoxFuture>")
.finish()
}
}