use std::mem;
use std::vec::Vec;
use std::sync::{Arc, RwLock};
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::SeqCst;
use std::ops::Deref;
use {Future, Poll, Async};
use task::{self, Task};
use lock::Lock;
#[must_use = "futures do nothing unless polled"]
pub struct Shared<F>
where F: Future
{
inner: Arc<Inner<F>>,
}
struct Inner<F>
where F: Future
{
original_future: Lock<Option<F>>,
result_ready: AtomicBool,
state: RwLock<State<F::Item, F::Error>>,
}
enum State<T, E> {
Waiting(Vec<Task>),
Done(Result<Arc<T>, Arc<E>>),
}
impl<F> Shared<F>
where F: Future
{
pub fn new(future: F) -> Self {
Shared {
inner: Arc::new(Inner {
original_future: Lock::new(Some(future)),
result_ready: AtomicBool::new(false),
state: RwLock::new(State::Waiting(vec![])),
}),
}
}
fn park(&self) -> Poll<SharedItem<F::Item>, SharedError<F::Error>> {
let me = task::park();
match *self.inner.state.write().unwrap() {
State::Waiting(ref mut list) => {
list.push(me);
Ok(Async::NotReady)
}
State::Done(Ok(ref e)) => Ok(SharedItem { item: e.clone() }.into()),
State::Done(Err(ref e)) => Err(SharedError { error: e.clone() }),
}
}
}
impl<F> Future for Shared<F>
where F: Future
{
type Item = SharedItem<F::Item>;
type Error = SharedError<F::Error>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if !self.inner.result_ready.load(SeqCst) {
match self.inner.original_future.try_lock() {
Some(_) if self.inner.result_ready.load(SeqCst) => {}
Some(mut future) => {
let result = match future.as_mut().unwrap().poll() {
Ok(Async::NotReady) => {
drop(future);
return self.park()
}
Ok(Async::Ready(item)) => Ok(Arc::new(item)),
Err(error) => Err(Arc::new(error)),
};
*future = None;
let waiters = {
let mut state = self.inner.state.write().unwrap();
self.inner.result_ready.store(true, SeqCst);
match mem::replace(&mut *state, State::Done(result)) {
State::Waiting(waiters) => waiters,
State::Done(_) => panic!("store_result() was called twice"),
}
};
for task in waiters {
task.unpark();
}
}
None => return self.park(),
}
}
let result = match *self.inner.state.read().unwrap() {
State::Done(ref result) => result.clone(),
State::Waiting(_) => panic!("still waiting, not done yet"),
};
match result {
Ok(e) => Ok(SharedItem { item: e }.into()),
Err(e) => Err(SharedError { error: e }),
}
}
}
impl<F> Clone for Shared<F>
where F: Future
{
fn clone(&self) -> Self {
Shared { inner: self.inner.clone() }
}
}
#[derive(Debug)]
pub struct SharedItem<T> {
item: Arc<T>,
}
impl<T> Deref for SharedItem<T> {
type Target = T;
fn deref(&self) -> &T {
&self.item.as_ref()
}
}
#[derive(Debug)]
pub struct SharedError<E> {
error: Arc<E>,
}
impl<E> Deref for SharedError<E> {
type Target = E;
fn deref(&self) -> &E {
&self.error.as_ref()
}
}