futures 0.1.26

An implementation of futures and streams featuring zero allocations, composability, and iterator-like interfaces.
Documentation
extern crate futures;

use futures::prelude::*;
use futures::future;
use futures::unsync::oneshot::{channel, Canceled, spawn};

mod support;
use support::local_executor;

#[test]
fn smoke() {
    let (tx, rx) = channel();
    tx.send(33).unwrap();
    assert_eq!(rx.wait().unwrap(), 33);
}

#[test]
fn canceled() {
    let (_, rx) = channel::<()>();
    assert_eq!(rx.wait().unwrap_err(), Canceled);
}

#[test]
fn poll_cancel() {
    let (mut tx, _) = channel::<()>();
    assert!(tx.poll_cancel().unwrap().is_ready());
}

#[test]
fn tx_complete_rx_unparked() {
    let (tx, rx) = channel();

    let res = rx.join(future::lazy(move || {
        tx.send(55).unwrap();
        Ok(11)
    }));
    assert_eq!(res.wait().unwrap(), (55, 11));
}

#[test]
fn tx_dropped_rx_unparked() {
    let (tx, rx) = channel::<i32>();

    let res = rx.join(future::lazy(move || {
        let _tx = tx;
        Ok(11)
    }));
    assert_eq!(res.wait().unwrap_err(), Canceled);
}


#[test]
fn is_canceled() {
    let (tx, rx) = channel::<u32>();
    assert!(!tx.is_canceled());
    drop(rx);
    assert!(tx.is_canceled());
}

#[test]
fn spawn_sends_items() {
    let core = local_executor::Core::new();
    let future = future::ok::<_, ()>(1);
    let rx = spawn(future, &core);
    assert_eq!(core.run(rx).unwrap(), 1);
}

#[test]
fn spawn_kill_dead_stream() {
    use std::thread;
    use std::time::Duration;
    use futures::future::Either;
    use futures::sync::oneshot;

    // a future which never returns anything (forever accepting incoming
    // connections), but dropping it leads to observable side effects
    // (like closing listening sockets, releasing limited resources,
    // ...)
    #[derive(Debug)]
    struct Dead {
        // when dropped you should get Err(oneshot::Canceled) on the
        // receiving end
        done: oneshot::Sender<()>,
    }
    impl Future for Dead {
        type Item = ();
        type Error = ();

        fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
            Ok(Async::NotReady)
        }
    }

    // need to implement a timeout for the test, as it would hang
    // forever right now
    let (timeout_tx, timeout_rx) = oneshot::channel();
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(1000));
        let _ = timeout_tx.send(());
    });

    let core = local_executor::Core::new();
    let (done_tx, done_rx) = oneshot::channel();
    let future = Dead{done: done_tx};
    let rx = spawn(future, &core);
    let res = core.run(
        Ok::<_, ()>(())
        .into_future()
        .then(move |_| {
            // now drop the spawned future: maybe some timeout exceeded,
            // or some connection on this end was closed by the remote
            // end.
            drop(rx);
            // and wait for the spawned future to release its resources
            done_rx
        })
        .select2(timeout_rx)
    );
    match res {
        Err(Either::A((oneshot::Canceled, _))) => (),
        Ok(Either::B(((), _))) => {
            panic!("dead future wasn't canceled (timeout)");
        },
        _ => {
            panic!("dead future wasn't canceled (unexpected result)");
        },
    }
}

#[test]
fn spawn_dont_kill_forgot_dead_stream() {
    use std::thread;
    use std::time::Duration;
    use futures::future::Either;
    use futures::sync::oneshot;

    // a future which never returns anything (forever accepting incoming
    // connections), but dropping it leads to observable side effects
    // (like closing listening sockets, releasing limited resources,
    // ...)
    #[derive(Debug)]
    struct Dead {
        // when dropped you should get Err(oneshot::Canceled) on the
        // receiving end
        done: oneshot::Sender<()>,
    }
    impl Future for Dead {
        type Item = ();
        type Error = ();

        fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
            Ok(Async::NotReady)
        }
    }

    // need to implement a timeout for the test, as it would hang
    // forever right now
    let (timeout_tx, timeout_rx) = oneshot::channel();
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(1000));
        let _ = timeout_tx.send(());
    });

    let core = local_executor::Core::new();
    let (done_tx, done_rx) = oneshot::channel();
    let future = Dead{done: done_tx};
    let rx = spawn(future, &core);
    let res = core.run(
        Ok::<_, ()>(())
        .into_future()
        .then(move |_| {
            // forget the spawned future: should keep running, i.e. hit
            // the timeout below.
            rx.forget();
            // and wait for the spawned future to release its resources
            done_rx
        })
        .select2(timeout_rx)
    );
    match res {
        Err(Either::A((oneshot::Canceled, _))) => {
            panic!("forgotten dead future was canceled");
        },
        Ok(Either::B(((), _))) => (), // reached timeout
        _ => {
            panic!("forgotten dead future was canceled (unexpected result)");
        },
    }
}