use futures::{Future, Canceled, Complete, Oneshot, Async, Poll};
use futures::executor::{self, Unpark, Spawn};
use ::{Result, Error};
use std::mem;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use error::RequestPanicked;
use super::request::RequestHead;
#[must_use = "Result of request is unknown unless polled for"]
pub struct Call<T> {
state: CallState<T>,
notify: Arc<Notify>,
}
enum CallState<T> {
Waiting(CallFuture<T>),
Immediate(Result<T>),
Taken
}
type CallFuture<T> = Spawn<Oneshot<Result<T>>>;
impl<T> Call<T> {
pub fn ignore(self) {}
pub fn ignore_ok(self) -> Result<()> { Ok(()) }
pub fn block(self) -> Result<T> {
self.wait()
}
pub fn check(&mut self) -> Option<Result<T>> {
match self.poll_no_task() {
Ok(Async::Ready(val)) => Some(Ok(val)),
Ok(Async::NotReady) | Err(Error::ResultTaken) => None,
Err(e) => Some(Err(e))
}
}
pub fn is_available(&self) -> bool {
if let CallState::Immediate(_) = self.state {
true
} else {
self.notify.check()
}
}
pub fn result_taken(&self) -> bool {
if let CallState::Taken = self.state {
true
} else {
false
}
}
pub fn poll_no_task(&mut self) -> Poll<T, Error> {
let notify = self.notify.clone();
self.poll_by(move |fut| fut.poll_future(notify))
}
fn poll_by<F>(&mut self, poll: F) -> Poll<T, Error>
where F: FnOnce(&mut CallFuture<T>) -> Poll<Result<T>, Canceled> {
match self.state {
CallState::Waiting(ref mut future) => return map_poll(poll(future)),
CallState::Taken => return Err(Error::ResultTaken),
_ => (),
}
if let CallState::Immediate(res) = mem::replace(&mut self.state, CallState::Taken) {
res.map(Async::Ready)
} else {
unreachable!();
}
}
}
impl<T> Future for Call<T> {
type Item = T;
type Error = Error;
fn poll(&mut self) -> Poll<T, Error> {
self.poll_by(|fut| fut.get_mut().poll())
}
}
#[derive(Default)]
struct Notify(AtomicBool);
impl Notify {
fn check(&self) -> bool {
self.0.load(Ordering::Relaxed)
}
}
impl Unpark for Notify {
fn unpark(&self) {
self.0.store(true, Ordering::Relaxed);
}
}
pub fn oneshot<T>(head: Option<RequestHead>) -> (PanicGuard<T>, Call<T>) {
let (tx, rx) = ::futures::oneshot();
let guard = PanicGuard {
head: head,
tx: Some(tx)
};
(guard, Call {
state: CallState::Waiting(executor::spawn(rx)),
notify: Default::default(),
})
}
pub fn immediate<T>(res: Result<T>) -> Call<T> {
Call {
state: CallState::Immediate(res),
notify: Default::default(),
}
}
pub struct PanicGuard<T> {
head: Option<RequestHead>,
tx: Option<Complete<Result<T>>>,
}
impl<T> PanicGuard<T> {
pub fn head_mut(&mut self) -> &mut RequestHead {
self.head.as_mut().expect("PanicGuard::head was None")
}
pub fn complete(&mut self, res: Result<T>) {
if let Some(tx) = self.tx.take() {
let _ = tx.send(res);
}
}
}
impl<T> Drop for PanicGuard<T> {
fn drop(&mut self) {
if let Some(head) = self.head.take() {
self.complete(Err(RequestPanicked(head).into()));
}
}
}
fn map_poll<T>(poll: Poll<Result<T>, Canceled>) -> Poll<T, Error> {
let ret = match try!(poll) {
Async::Ready(val) => Async::Ready(try!(val)),
Async::NotReady => Async::NotReady,
};
Ok(ret)
}