alternate-future 0.1.4

Simple promise/future library with continuations that don't block.


#![feature(fnbox)]

extern crate spin;
extern crate num_cpus;
#[macro_use]
extern crate lazy_static;

use std::sync::Arc;
use std::thread::{self, Thread};
use std::boxed::FnBox;
use std::mem;
use std::error::Error;
use std::fmt::{self, Display};
use spin::Mutex as SpinMutex;

use threadpool::ThreadPool;


mod threadpool;


lazy_static!
{
    static ref POOL : SpinMutex<ThreadPool> = SpinMutex::new(ThreadPool::new(num_cpus::get()));
}


pub type AwaitResult <T> = Result<T, AwaitError>;


pub fn promise_future <T> () -> (Promise<T>, Future<T>)
    where T : Send + 'static
{
    let store = Arc::new(SpinMutex::new((PromiseValue::Impending, None, None)));

    (Promise(Some(store.clone()), false), Future(FutureInner::Impending(store)))
}
fn continue_future <T> () -> (Promise<T>, Future<T>)
    where T : Send + 'static
{
    let store = Arc::new(SpinMutex::new((PromiseValue::Impending, None, None)));

    (Promise(Some(store.clone()), true), Future(FutureInner::Impending(store)))
}

pub fn spawn_future <F, T> (func : F) -> Future<T>
    where F : FnOnce() -> T + Send + 'static, T : Send + 'static
{
    let (p, f) = promise_future();
    POOL.lock().execute(move|| p.fulfill(func()));
    f
}


#[must_use]
pub struct Promise <T> (Option<Arc<SpinMutex<(PromiseValue<T>, Option<Box<FnBox(T) + Send + 'static>>, Option<Thread>)>>>, bool)
    where T : Send + 'static;
impl <T> Promise<T>
    where T : Send + 'static
{
    pub fn fulfill (mut self, val : T)
    {
        let store = self.0.take().unwrap();
        let mut lock = store.lock();

        if let Some(cont) = lock.1.take() {
            if self.1 {
                cont.call_box((val,));
            } else {
                POOL.lock().execute(move|| cont.call_box((val,)));
            }
        } else {
            lock.0 = PromiseValue::Present(val);
            if let Some(ref thread) = lock.2 {
                thread.unpark();
            }
        }
    }
}
impl <T, E> Promise<Result<T, E>>
    where T : Send + 'static, E : Send + 'static
{
    #[inline]
    pub fn ok (self, val : T)
    {
        self.fulfill(Ok(val));
    }
    #[inline]
    pub fn err (self, err : E)
    {
        self.fulfill(Err(err));
    }
}
impl <T> Drop for Promise<T>
    where T : Send + 'static
{
    fn drop (&mut self)
    {
        if let Some(store) = self.0.take() {
            let mut lock = store.lock();
            lock.0 = PromiseValue::Broken;
            if let Some(ref thread) = lock.2 {
                thread.unpark();
            }
        }
    }
}

pub enum PromiseValue <T>
    where T : Send + 'static
{
    Impending,
    Present (T),
    Broken,
}
impl <T> PromiseValue<T>
    where T : Send + 'static
{
    pub fn take (&mut self) -> PromiseValue<T>
    {
        let mut pv = PromiseValue::Impending;
        mem::swap(self, &mut pv);
        pv
    }
}


pub struct Future <T> (FutureInner<T>)
    where T : Send + 'static;
enum FutureInner <T>
    where T : Send + 'static
{
    Impending (Arc<SpinMutex<(PromiseValue<T>, Option<Box<FnBox(T) + Send + 'static>>, Option<Thread>)>>),
    Present   (T),
    Broken,
}
impl <T> Future<T>
    where T : Send + 'static
{
    #[inline]
    pub fn present (val : T) -> Future<T> { Future(FutureInner::Present(val)) }
    

    pub fn poll (&mut self) -> FutureState
    {
        let inner = {
            let store = match self.0 {
                FutureInner::Impending(ref mut store) => store,
                FutureInner::Present(..) => return FutureState::Present,
                FutureInner::Broken => return FutureState::Broken,
            };

            match store.lock().0.take() {
                PromiseValue::Impending => return FutureState::Impending,
                PromiseValue::Present(val) => FutureInner::Present(val),
                PromiseValue::Broken => FutureInner::Broken,
            }
        };
        self.0 = inner;

        match self.0 {
            FutureInner::Impending(..) => FutureState::Impending,
            FutureInner::Present(..) => FutureState::Present,
            FutureInner::Broken => FutureState::Broken,
        }
    }


    pub fn await (self) -> AwaitResult<T>
    {
        match self.0 {
            FutureInner::Impending(store) => {
                loop {
                    {
                        let mut lock = store.lock();
                        match lock.0.take() {
                            PromiseValue::Impending => (),
                            PromiseValue::Present(val) => return Ok(val),
                            PromiseValue::Broken => return Err(AwaitError::Broken),
                        }
                        if let None = lock.2 {
                            lock.2 = Some(thread::current());
                        }
                    }
                    thread::park();
                }
            },
            FutureInner::Present(val) => Ok(val),
            FutureInner::Broken => Err(AwaitError::Broken),
        }
    }

    pub fn then <F, U> (self, cont : F) -> Future<U>
        where F : FnOnce(T) -> U + Send + 'static, U : Send + 'static
    {
        match self.0 {
            FutureInner::Impending(store) => {
                let mut lock = store.lock();

                match lock.0.take() {
                    PromiseValue::Impending => {
                        let (p, f) = continue_future();
                        lock.1 = Some(Box::new(move|val| p.fulfill(cont(val))));
                        f
                    },
                    PromiseValue::Present(val) => {
                        let (p, f) = continue_future();
                        thread::spawn(move|| p.fulfill(cont(val)));
                        f
                    },
                    PromiseValue::Broken => Future(FutureInner::Broken),
                }
            },
            FutureInner::Present(val) => {
                let (p, f) = continue_future();
                thread::spawn(move|| p.fulfill(cont(val)));
                f
            },
            FutureInner::Broken => Future(FutureInner::Broken),
        }
    }
}
impl <T, E> Future<Result<T, E>>
    where T : Send + 'static, E : Send + From<AwaitError> + 'static
{
    pub fn result (self) -> Result<T, E>
    {
        match self.await() {
            Ok(res) => res,
            Err(e) => Err(e.into()),
        }
    }

    pub fn and_then <F, U, G> (self, cont : F) -> Future<Result<U, G>>
        where F : FnOnce(T) -> Result<U, G> + Send + 'static, U : Send + 'static, G : Send + From<E> + 'static
    {
        self.then(|res| match res {
            Ok(val) => cont(val),
            Err(e) => Err(e.into()),
        })
    }
}


#[derive(Copy, Clone, PartialEq, Eq, Debug)]
pub enum FutureState
{
    Impending,
    Present,
    Broken,
}


#[derive(Copy, Clone, PartialEq, Eq, Debug)]
pub enum AwaitError
{
    Broken,
}
impl Error for AwaitError
{
    fn description (&self) -> &str
    {
        match *self {
            AwaitError::Broken => "Broken future",
        }

    }

    fn cause (&self) -> Option<&Error> { None }
}
impl Display for AwaitError
{
    fn fmt (&self, fmt : &mut fmt::Formatter) -> ::std::result::Result<(), ::std::fmt::Error>
    {
        write!(fmt, "AwaitError: {}", self.description())
    }
}