use std::prelude::v1::*;
use std::cell::Cell;
use std::fmt;
use std::mem;
use std::sync::Arc;
use std::sync::atomic::{Ordering, AtomicBool, AtomicUsize, ATOMIC_USIZE_INIT};
use std::thread;
use {Poll, Future, Async, Stream, Sink, StartSend, AsyncSink};
use future::BoxFuture;
mod unpark_mutex;
use self::unpark_mutex::UnparkMutex;
mod task_rc;
mod data;
#[allow(deprecated)]
#[cfg(feature = "with-deprecated")]
pub use self::task_rc::TaskRc;
pub use self::data::LocalKey;
/// Borrowed view of the task that is being polled on the current thread.
///
/// Values of this type are built on the stack (see `Spawn::enter` and
/// `with_unpark_event`) and made reachable through `CURRENT_TASK` by `set`
/// for the duration of one closure call.
struct BorrowedTask<'a> {
// Identifier of the task; compared against `Task::id` by `Task::is_current`.
id: usize,
// Wake-up handle for the task; `park` clones it into the returned `Task`.
unpark: &'a Arc<Unpark>,
// The `data::LocalMap` borrowed from the owning `Spawn`.
// NOTE(review): presumably the task-local storage behind `LocalKey` — confirm in `data`.
map: &'a data::LocalMap,
// Unpark events that a `Task` created from this view triggers when unparked.
events: Events,
}
// Per-thread pointer to the `BorrowedTask` currently being polled; it starts
// out null, which `with` treats as "no task is running". The `'static`
// lifetime is only a placeholder: `set` stores pointers to shorter-lived
// values and puts the previous pointer back before returning.
thread_local!(static CURRENT_TASK: Cell<*const BorrowedTask<'static>> = {
Cell::new(0 as *const _)
});
/// Hands out the next task identifier from a process-wide counter.
///
/// # Panics
///
/// Panics once the counter has reached half of the `usize` range, so that the
/// counter cannot realistically wrap around and hand out a duplicate id.
fn fresh_task_id() -> usize {
    static NEXT_ID: AtomicUsize = ATOMIC_USIZE_INIT;

    let limit = usize::max_value() / 2;
    // Relaxed is enough: only the uniqueness of the returned value matters.
    let allocated = NEXT_ID.fetch_add(1, Ordering::Relaxed);
    if allocated >= limit {
        panic!("too many previous tasks have been allocated");
    }
    allocated
}
/// Runs `f` with `task` installed as the current task of this thread.
///
/// The pointer that was installed before the call is put back when `f`
/// returns or unwinds, so nested calls behave like a stack.
fn set<'a, F, R>(task: &BorrowedTask<'a>, f: F) -> R
    where F: FnOnce() -> R
{
    /// Drop guard that writes the remembered pointer back into `CURRENT_TASK`.
    struct Restore(*const BorrowedTask<'static>);

    impl Drop for Restore {
        fn drop(&mut self) {
            let previous = self.0;
            CURRENT_TASK.with(|slot| slot.set(previous));
        }
    }

    // SAFETY (as far as this file shows): the lifetime is erased only so the
    // pointer fits the thread-local's type. The pointer is dereferenced solely
    // by `with`, and the guard below removes it again before this function
    // returns, i.e. while the `task` borrow is still alive.
    let erased = unsafe {
        mem::transmute::<&BorrowedTask<'a>,
                         *const BorrowedTask<'static>>(task)
    };

    CURRENT_TASK.with(move |slot| {
        // Remember the old value first so that it is restored even if `f` panics.
        let _restore = Restore(slot.get());
        slot.set(erased);
        f()
    })
}
/// Calls `f` with the task currently installed on this thread.
///
/// # Panics
///
/// Panics if no task is installed, i.e. the thread-local pointer is null.
fn with<F: FnOnce(&BorrowedTask) -> R, R>(f: F) -> R {
    let current = CURRENT_TASK.with(|slot| slot.get());
    if current.is_null() {
        panic!("no Task is currently running");
    }
    // SAFETY: a non-null pointer is only ever stored by `set`, which removes
    // it again before the pointee goes out of scope.
    let task = unsafe { &*current };
    f(task)
}
/// Owned handle to a task, obtained from `park`.
///
/// Calling `unpark` on the handle triggers the recorded unpark events and
/// then invokes the task's `Unpark` implementation.
#[derive(Clone)]
pub struct Task {
// Identifier copied from the task that was current when `park` was called.
id: usize,
// Wake-up handle of that task.
unpark: Arc<Unpark>,
// Unpark events that were active when the handle was created.
events: Events,
}
/// Compile-time check only: fails to build if `Task` ever stops being `Send`.
fn _assert_kinds() {
    fn _require_send<T: Send>() {}

    _require_send::<Task>();
}
pub fn park() -> Task {
with(|task| {
Task {
id: task.id,
events: task.events.clone(),
unpark: task.unpark.clone(),
}
})
}
impl Task {
pub fn unpark(&self) {
self.events.trigger();
self.unpark.unpark();
}
pub fn is_current(&self) -> bool {
with(|current| current.id == self.id)
}
}
impl fmt::Debug for Task {
    /// Prints only the task id; the unpark handle and events are omitted.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut out = f.debug_struct("Task");
        out.field("id", &self.id);
        out.finish()
    }
}
/// Runs `f` with `event` added to the current task's unpark events.
///
/// While `f` runs, any `Task` obtained through `park` also carries `event`,
/// so unparking that task inserts the event's item into its `EventSet`.
/// The previous task view is restored once `f` returns.
///
/// # Panics
///
/// Panics if called while no task is running on this thread.
pub fn with_unpark_event<F, R>(event: UnparkEvent, f: F) -> R
    where F: FnOnce() -> R
{
    with(|outer| {
        // Same task, same wake-up handle and data; only the event list grows.
        let extended = outer.events.with_event(event);
        let scoped = BorrowedTask {
            id: outer.id,
            unpark: outer.unpark,
            map: outer.map,
            events: extended,
        };
        set(&scoped, f)
    })
}
/// An (`EventSet`, item) pair; when triggered, `item` is inserted into `set`.
#[derive(Clone)]
pub struct UnparkEvent {
// Destination that receives `item` when the event fires.
set: Arc<EventSet>,
// Value passed to `EventSet::insert`.
item: usize,
}
impl UnparkEvent {
    /// Creates an event that inserts `id` into `set` when it is triggered.
    pub fn new(set: Arc<EventSet>, id: usize) -> UnparkEvent {
        UnparkEvent { item: id, set: set }
    }
}
impl fmt::Debug for UnparkEvent {
    /// Prints the item; the event set is shown as a `"..."` placeholder
    /// because `EventSet` does not require `Debug`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut out = f.debug_struct("UnparkEvent");
        out.field("set", &"...");
        out.field("item", &self.item);
        out.finish()
    }
}
/// A thread-safe collection that records which unpark events have fired.
pub trait EventSet: Send + Sync + 'static {
/// Records `id`; called with `UnparkEvent::item` whenever a task carrying
/// that event is unparked.
fn insert(&self, id: usize);
}
// List of unpark events attached to a task, with dedicated variants for the
// empty and single-element cases so that those need no `Vec`.
#[derive(Clone)]
enum Events {
// No events registered.
Zero,
// Exactly one event.
One(UnparkEvent),
// Two or more events, as built by `with_event`.
Lots(Vec<UnparkEvent>),
}
impl Events {
    /// Creates an empty event list.
    fn new() -> Events {
        Events::Zero
    }

    /// Fires every event in the list, in order, by inserting its item into
    /// its event set.
    fn trigger(&self) {
        let fire = |ev: &UnparkEvent| ev.set.insert(ev.item);
        match *self {
            Events::Zero => {}
            Events::One(ref only) => fire(only),
            Events::Lots(ref many) => {
                for ev in many.iter() {
                    fire(ev);
                }
            }
        }
    }

    /// Returns a new list holding the existing events followed by `event`.
    /// `self` is left untouched.
    fn with_event(&self, event: UnparkEvent) -> Events {
        match *self {
            Events::Zero => Events::One(event),
            Events::One(ref existing) => {
                Events::Lots(vec![existing.clone(), event])
            }
            Events::Lots(ref existing) => {
                let mut combined = existing.clone();
                combined.push(event);
                Events::Lots(combined)
            }
        }
    }
}
/// An object (future, stream or sink) bundled with its own task identity, so
/// that it can be polled inside a task context via the `poll_*` / `wait_*`
/// methods.
pub struct Spawn<T> {
// The wrapped object.
obj: T,
// Task id assigned by `fresh_task_id` when `spawn` was called.
id: usize,
// The `data::LocalMap` created by `data::local_map()` for this task.
data: data::LocalMap,
}
/// Wraps `obj` in a new `Spawn`, giving it a fresh task id and a fresh local
/// data map.
pub fn spawn<T>(obj: T) -> Spawn<T> {
    let task_id = fresh_task_id();
    let local_data = data::local_map();
    Spawn {
        obj: obj,
        id: task_id,
        data: local_data,
    }
}
impl<T> Spawn<T> {
pub fn get_ref(&self) -> &T {
&self.obj
}
pub fn get_mut(&mut self) -> &mut T {
&mut self.obj
}
pub fn into_inner(self) -> T {
self.obj
}
}
impl<F: Future> Spawn<F> {
    /// Polls the wrapped future once inside this task's context.
    ///
    /// `unpark` is the handle that `Task`s created during the poll will use
    /// to request another poll.
    pub fn poll_future(&mut self, unpark: Arc<Unpark>) -> Poll<F::Item, F::Error> {
        let handle = unpark;
        self.enter(&handle, |future| future.poll())
    }

    /// Drives the wrapped future to completion, blocking the calling thread
    /// between polls, and returns its result.
    pub fn wait_future(&mut self) -> Result<F::Item, F::Error> {
        let parker = Arc::new(ThreadUnpark::new(thread::current()));
        loop {
            match self.poll_future(parker.clone()) {
                Ok(Async::Ready(item)) => return Ok(item),
                Ok(Async::NotReady) => parker.park(),
                Err(err) => return Err(err),
            }
        }
    }

    /// Boxes the future and hands it to `exec` as a `Run`.
    ///
    /// The `Run` keeps a handle to `exec`, which `Inner::unpark` uses to
    /// submit the task again when it is notified.
    pub fn execute(self, exec: Arc<Executor>)
        where F: Future<Item=(), Error=()> + Send + 'static,
    {
        // Keep one handle for the initial submission; the other is stored in
        // the shared `Inner`.
        let submitter = exec.clone();
        let boxed = Spawn {
            id: self.id,
            data: self.data,
            obj: self.obj.boxed(),
        };
        let shared = Arc::new(Inner {
            exec: exec,
            mutex: UnparkMutex::new(),
        });
        submitter.execute(Run {
            spawn: boxed,
            inner: shared,
        })
    }
}
impl<S: Stream> Spawn<S> {
    /// Polls the wrapped stream once inside this task's context, using
    /// `unpark` as the wake-up handle.
    pub fn poll_stream(&mut self, unpark: Arc<Unpark>)
                       -> Poll<Option<S::Item>, S::Error> {
        let handle = unpark;
        self.enter(&handle, |stream| stream.poll())
    }

    /// Blocks the calling thread until the stream yields its next result.
    ///
    /// Returns `Some(Ok(item))` for an item, `Some(Err(e))` for an error and
    /// `None` once the stream has ended.
    pub fn wait_stream(&mut self) -> Option<Result<S::Item, S::Error>> {
        let parker = Arc::new(ThreadUnpark::new(thread::current()));
        loop {
            match self.poll_stream(parker.clone()) {
                Ok(Async::Ready(next)) => return next.map(Ok),
                Ok(Async::NotReady) => parker.park(),
                Err(err) => return Some(Err(err)),
            }
        }
    }
}
impl<S: Sink> Spawn<S> {
pub fn start_send(&mut self, value: S::SinkItem, unpark: &Arc<Unpark>)
-> StartSend<S::SinkItem, S::SinkError> {
self.enter(unpark, |sink| sink.start_send(value))
}
pub fn poll_flush(&mut self, unpark: &Arc<Unpark>)
-> Poll<(), S::SinkError> {
self.enter(unpark, |sink| sink.poll_complete())
}
pub fn wait_send(&mut self, mut value: S::SinkItem)
-> Result<(), S::SinkError> {
let unpark = Arc::new(ThreadUnpark::new(thread::current()));
let unpark2 = unpark.clone() as Arc<Unpark>;
loop {
value = match try!(self.start_send(value, &unpark2)) {
AsyncSink::NotReady(v) => v,
AsyncSink::Ready => return Ok(()),
};
unpark.park();
}
}
pub fn wait_flush(&mut self) -> Result<(), S::SinkError> {
let unpark = Arc::new(ThreadUnpark::new(thread::current()));
let unpark2 = unpark.clone() as Arc<Unpark>;
loop {
if try!(self.poll_flush(&unpark2)).is_ready() {
return Ok(())
}
unpark.park();
}
}
}
impl<T> Spawn<T> {
fn enter<F, R>(&mut self, unpark: &Arc<Unpark>, f: F) -> R
where F: FnOnce(&mut T) -> R
{
let task = BorrowedTask {
id: self.id,
unpark: unpark,
events: Events::new(),
map: &self.data,
};
let obj = &mut self.obj;
set(&task, || f(obj))
}
}
impl<T: fmt::Debug> fmt::Debug for Spawn<T> {
    /// Prints the wrapped object and the task id; the local data map is
    /// omitted.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut out = f.debug_struct("Spawn");
        out.field("obj", &self.obj);
        out.field("id", &self.id);
        out.finish()
    }
}
/// Wake-up mechanism for a task. Implementations are supplied to the `poll_*`
/// methods of `Spawn` and are invoked through `Task::unpark`.
pub trait Unpark: Send + Sync {
/// Signals that the associated task should be polled again.
fn unpark(&self);
}
/// Something that can run `Run` units of work. Used by `Spawn::execute` for
/// the first submission and by `Inner::unpark` for re-submissions.
pub trait Executor: Send + Sync + 'static {
/// Accepts `r` for execution.
/// NOTE(review): the implementation is presumably expected to call
/// `r.run()` at some point — nothing in this file enforces it.
fn execute(&self, r: Run);
}
/// Wake-up state used by the blocking `wait_*` helpers: it remembers the
/// waiting thread plus a flag recording that a notification has arrived.
struct ThreadUnpark {
    /// The thread that blocks in `park` and is woken by the `Unpark` impl.
    thread: thread::Thread,
    /// `true` while a notification has been delivered but not yet consumed
    /// by `park`.
    ready: AtomicBool,
}

impl ThreadUnpark {
    /// Creates the state for `thread` with no notification pending.
    fn new(thread: thread::Thread) -> ThreadUnpark {
        ThreadUnpark {
            thread: thread,
            ready: AtomicBool::new(false),
        }
    }

    /// Blocks the calling thread until a notification has been delivered,
    /// then consumes it.
    ///
    /// Must be called on the thread that was passed to `new`, because
    /// `thread::park` always parks the current thread.
    ///
    /// `thread::park` may return spuriously or because of a stale park
    /// token, so the flag is re-checked in a loop. Consuming the flag on the
    /// way out also means a notification received while parked no longer
    /// makes the *next* call return immediately, which previously caused a
    /// redundant poll per wake-up.
    fn park(&self) {
        // The `Unpark` impl stores `true` before unparking the thread, so a
        // notification can never be missed between the swap and the park.
        while !self.ready.swap(false, Ordering::SeqCst) {
            thread::park();
        }
    }
}
impl Unpark for ThreadUnpark {
fn unpark(&self) {
self.ready.store(true, Ordering::SeqCst);
self.thread.unpark()
}
}
/// A unit of work handed to an `Executor`: a boxed unit future together with
/// the shared state used to reschedule it.
pub struct Run {
// The boxed future and its task identity.
spawn: Spawn<BoxFuture<(), ()>>,
// State shared with every wake-up handle of this task (see `Inner`).
inner: Arc<Inner>,
}
// State shared between a `Run` and the `Unpark` handles given out while it is
// polled.
struct Inner {
// Coordinates polling with notifications; `Run::run` gives the `Run` to it
// via `wait`, and `notify` may hand it back for re-execution.
// NOTE(review): exact state machine lives in `unpark_mutex` — not visible here.
mutex: UnparkMutex<Run>,
// Executor that receives the `Run` again when the task is notified.
exec: Arc<Executor>,
}
impl Run {
/// Polls the wrapped future until it either completes or has to wait for a
/// notification.
///
/// On completion (`Ready` or `Err`) the mutex is marked complete and the
/// future is dropped. When the future is not ready, the task is handed to
/// the `UnparkMutex`; if that reports a notification arrived in the
/// meantime, polling continues immediately in this call.
pub fn run(self) {
let Run { mut spawn, inner } = self;
// NOTE(review): the `UnparkMutex` methods below are unsafe; presumably
// owning the `Run` is what proves we are allowed to poll — confirm against
// the contract documented in `unpark_mutex`.
unsafe {
inner.mutex.start_poll();
loop {
// `inner` doubles as the `Unpark` handle for this poll, so a
// `Task::unpark` ends up in `Inner::unpark`.
match spawn.poll_future(inner.clone()) {
Ok(Async::NotReady) => {}
Ok(Async::Ready(())) |
Err(()) => return inner.mutex.complete(),
}
// Repackage the task so the mutex can hold on to it while waiting.
let run = Run { spawn: spawn, inner: inner.clone() };
// `Ok(())`: the mutex kept the task, nothing more to do here.
// `Err(r)`: the task is handed back (apparently because a notification
// raced with the poll), so take the future back and poll again.
match inner.mutex.wait(run) {
Ok(()) => return, Err(r) => spawn = r.spawn, }
}
}
}
}
impl fmt::Debug for Run {
    /// Prints an opaque placeholder; the boxed future cannot be formatted.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut out = f.debug_struct("Run");
        out.field("contents", &"...");
        out.finish()
    }
}
impl Unpark for Inner {
    /// Notifies the mutex; if it hands back the stored `Run`, that `Run` is
    /// submitted to the executor again. Otherwise nothing happens.
    fn unpark(&self) {
        if let Ok(run) = self.mutex.notify() {
            self.exec.execute(run)
        }
    }
}