#![crate_name = "eventual"]
#![deny(warnings)]
extern crate syncbox;
extern crate time;
#[macro_use]
extern crate log;
pub use self::future::{Future, Complete};
pub use self::join::{join, Join};
pub use self::receipt::Receipt;
pub use self::run::{background, defer};
pub use self::select::{select, Select};
pub use self::sequence::sequence;
pub use self::stream::{Stream, StreamIter, Sender, BusySender};
pub use self::timer::Timer;
use std::error::Error;
use std::fmt;
mod core;
mod future;
mod join;
mod process;
mod receipt;
mod run;
mod select;
mod sequence;
mod stream;
mod timer;
pub trait Async : Send + 'static + Sized {
type Value: Send + 'static;
type Error: Send + 'static;
type Cancel: Cancel<Self>;
fn is_ready(&self) -> bool;
fn is_err(&self) -> bool;
fn poll(self) -> Result<AsyncResult<Self::Value, Self::Error>, Self>;
fn expect(self) -> AsyncResult<Self::Value, Self::Error> {
if let Ok(v) = self.poll() {
return v;
}
panic!("the async value is not ready");
}
fn ready<F>(self, f: F) -> Self::Cancel where F: FnOnce(Self) + Send + 'static;
fn receive<F>(self, f: F)
where F: FnOnce(AsyncResult<Self::Value, Self::Error>) + Send + 'static {
self.ready(move |async| {
match async.poll() {
Ok(res) => f(res),
Err(_) => panic!("ready callback invoked but is not actually ready"),
}
});
}
fn await(self) -> AsyncResult<Self::Value, Self::Error> {
use std::sync::mpsc::channel;
let (tx, rx) = channel();
self.receive(move |res| tx.send(res).ok().expect("receiver thread died"));
rx.recv().ok().expect("async disappeared without a trace")
}
fn fire(self) {
self.receive(drop)
}
fn and<U: Async<Error=Self::Error>>(self, next: U) -> Future<U::Value, Self::Error> {
self.and_then(move |_| next)
}
fn and_then<F, U: Async<Error=Self::Error>>(self, f: F) -> Future<U::Value, Self::Error>
where F: FnOnce(Self::Value) -> U + Send + 'static,
U::Value: Send + 'static {
let (complete, ret) = Future::pair();
complete.receive(move |c| {
if let Ok(complete) = c {
self.receive(move |res| {
match res {
Ok(v) => {
f(v).receive(move |res| {
match res {
Ok(u) => complete.complete(u),
Err(AsyncError::Failed(e)) => complete.fail(e),
_ => {}
}
});
}
Err(AsyncError::Failed(e)) => complete.fail(e),
_ => {}
}
});
}
});
ret
}
fn or<A>(self, alt: A) -> Future<Self::Value, A::Error>
where A: Async<Value=Self::Value> {
self.or_else(move |_| alt)
}
fn or_else<F, A>(self, f: F) -> Future<Self::Value, A::Error>
where F: FnOnce(Self::Error) -> A + Send + 'static,
A: Async<Value=Self::Value> {
let (complete, ret) = Future::pair();
complete.receive(move |c| {
if let Ok(complete) = c {
self.receive(move |res| {
match res {
Ok(v) => complete.complete(v),
Err(AsyncError::Failed(e)) => {
f(e).receive(move |res| {
match res {
Ok(v) => complete.complete(v),
Err(AsyncError::Failed(e)) => complete.fail(e),
_ => {}
}
});
}
Err(AsyncError::Aborted) => drop(complete),
}
});
}
});
ret
}
}
pub trait Pair {
type Tx;
fn pair() -> (Self::Tx, Self);
}
pub trait Cancel<A: Send + 'static> : Send + 'static {
fn cancel(self) -> Option<A>;
}
impl<T: Send + 'static, E: Send + 'static> Async for Result<T, E> {
type Value = T;
type Error = E;
type Cancel = Option<Result<T, E>>;
fn is_ready(&self) -> bool {
true
}
fn is_err(&self) -> bool {
self.is_err()
}
fn poll(self) -> Result<AsyncResult<T, E>, Result<T, E>> {
Ok(self.await())
}
fn ready<F: FnOnce(Result<T, E>) + Send + 'static>(self, f: F) -> Option<Result<T, E>> {
f(self);
None
}
fn await(self) -> AsyncResult<T, E> {
self.map_err(|e| AsyncError::Failed(e))
}
}
impl<A: Send + 'static> Cancel<A> for Option<A> {
fn cancel(self) -> Option<A> {
self
}
}
impl Async for () {
type Value = ();
type Error = ();
type Cancel = Option<()>;
fn is_ready(&self) -> bool {
true
}
fn is_err(&self) -> bool {
false
}
fn poll(self) -> Result<AsyncResult<(), ()>, ()> {
Ok(Ok(self))
}
fn ready<F: FnOnce(()) + Send + 'static>(self, f: F) -> Option<()> {
f(self);
None
}
fn await(self) -> AsyncResult<(), ()> {
Ok(self)
}
}
pub type AsyncResult<T, E> = Result<T, AsyncError<E>>;
#[derive(Eq, PartialEq)]
pub enum AsyncError<E: Send + 'static> {
Failed(E),
Aborted,
}
impl<E: Send + 'static> AsyncError<E> {
pub fn failed(err: E) -> AsyncError<E> {
AsyncError::Failed(err)
}
pub fn aborted() -> AsyncError<E> {
AsyncError::Aborted
}
pub fn is_aborted(&self) -> bool {
match *self {
AsyncError::Aborted => true,
_ => false,
}
}
pub fn is_failed(&self) -> bool {
match *self {
AsyncError::Failed(..) => true,
_ => false,
}
}
pub fn unwrap(self) -> E {
match self {
AsyncError::Failed(err) => err,
AsyncError::Aborted => panic!("unwrapping a cancellation error"),
}
}
pub fn take(self) -> Option<E> {
match self {
AsyncError::Failed(err) => Some(err),
_ => None,
}
}
}
impl<E: Send + Error + 'static> Error for AsyncError<E> {
fn description(&self) -> &str {
match *self {
AsyncError::Failed(ref e) => e.description(),
AsyncError::Aborted => "aborted",
}
}
fn cause(&self) -> Option<&Error> {
match *self {
AsyncError::Failed(ref e) => e.cause(),
AsyncError::Aborted => None,
}
}
}
impl<E: Send + 'static + fmt::Debug> fmt::Debug for AsyncError<E> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
match *self {
AsyncError::Failed(ref e) => write!(fmt, "AsyncError::Failed({:?})", e),
AsyncError::Aborted => write!(fmt, "AsyncError::Aborted"),
}
}
}
impl<E: Send + 'static + fmt::Display> fmt::Display for AsyncError<E> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
match *self {
AsyncError::Failed(ref e) => write!(fmt, "{}", e),
AsyncError::Aborted => write!(fmt, "[aborted]"),
}
}
}
trait BoxedReceive<T> : Send + 'static {
fn receive_boxed(self: Box<Self>, val: T);
}
impl<F: FnOnce(T) + Send + 'static, T> BoxedReceive<T> for F {
fn receive_boxed(self: Box<F>, val: T) {
(*self)(val)
}
}