mdo_future/
lib.rs

1extern crate futures;
2#[cfg(test)] #[macro_use] extern crate mdo;
3#[cfg(test)] extern crate futures_cpupool;
4
5pub mod future {
6    use futures::future::*;
7
8    pub fn bind<T, E, TF: Future<Item=T, Error=E> + Sized, IFU: IntoFuture<Error = E>, F: FnOnce(T) -> IFU>(m: TF, f: F) -> AndThen<TF, IFU, F> {
9        m.and_then(f)
10    }
11
12    pub fn ret<T, E>(x: T) -> FutureResult<T, E> {
13        ok::<T, E>(x)
14    }
15
16}
17
18pub mod stream {
19    use futures::{Async, Poll};
20    use futures::future::{Future, FutureResult, ok};
21    use futures::stream::*;
22
23    /// bind for Stream, equivalent to `m.map(f).flatten()`
24    pub fn bind<E, I, U, F>(m: I, f: F) -> Flatten<Map<I, F>>
25        where I: Stream<Error = E> + Sized,
26              U: Stream<Error = E> + Sized,
27              F: FnMut(<I as Stream>::Item) -> U
28    {
29        m.map(f).flatten()
30    }
31
32    pub fn ret<T, E>(x: T) -> WrappedStream<FutureResult<T, E>> {
33        new(Some(ok::<T, E>(x)))
34    }
35
36    pub fn mzero<T, E>() -> WrappedStream<FutureResult<T, E>> {
37        new(None)
38    }
39
40    /// A stream that wraps a single future or nothing (representing an empty stream)
41    pub struct WrappedStream<F: Future> {
42        future: Option<F>,
43    }
44
45    pub fn new<F: Future>(future: Option<F>) -> WrappedStream<F> {
46        WrappedStream { future: future }
47    }
48
49    impl<F: Future> Stream for WrappedStream<F> {
50        type Item = F::Item;
51        type Error = F::Error;
52
53        fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
54            let ret = match self.future {
55                None => return Ok(Async::Ready(None)),
56                Some(ref mut future) => {
57                    match future.poll() {
58                        Ok(Async::NotReady) => return Ok(Async::NotReady),
59                        Err(e) => Err(e),
60                        Ok(Async::Ready(r)) => Ok(r),
61                    }
62                }
63            };
64            self.future = None;
65            ret.map(|r| Async::Ready(Some(r)))
66        }
67    }
68
69}
70
71#[cfg(test)]
72mod tests {
73    use futures_cpupool::CpuPool;
74    use std::vec::Vec;
75    use futures::{Future, stream, Stream};
76
77    #[test]
78    fn future_mdo() {
79        use futures::future::ok;
80        use super::future::{bind, ret};
81        let pool = CpuPool::new_num_cpus();
82
83        let get_num = ok::<u32, String>(42);
84        let get_factor = ok::<u32, String>(2);
85
86        let res = mdo! {
87            arg =<< get_num;
88            fact =<< get_factor;
89            ret ret(arg * fact)
90        };
91
92        let val = pool.spawn(res);
93
94        assert_eq!(val.wait().unwrap(), 84);
95    }
96
97    #[test]
98    fn stream_bind() {
99        use super::stream::{bind, ret, mzero};
100
101        let l = bind(stream_range(0, 3), move |x| stream_range(x, 3));
102        assert_eq!(execute(l), vec![0, 1, 2, 1, 2, 2]);
103
104        let l = bind(stream_range(0, 3),
105                     move |x| bind(stream_range(0, 3), move |y| ret(x + y)));
106        assert_eq!(execute(l), vec![0, 1, 2, 1, 2, 3, 2, 3, 4]);
107
108        let l = bind(stream_range(1, 11), move |z| {
109            bind(stream_range(1, z + 1), move |y| {
110                bind(stream_range(1, y + 1), move |x| {
111                    bind(if x * x + y * y == z * z {
112                             ret(())
113                         } else {
114                             mzero()
115                         },
116                         move |_| ret((x, y, z)))
117                })
118            })
119        });
120        assert_eq!(execute(l), vec![(3, 4, 5), (6, 8, 10)]);
121    }
122
123    #[test]
124    fn stream_mdo() {
125        use super::stream::{bind, ret, mzero};
126        let l = mdo! {
127            x =<< stream_range(0, 3);
128            ret stream_range(x, 3)
129        };
130        assert_eq!(execute(l), vec![0, 1, 2, 1, 2, 2]);
131        let l = mdo! {
132            x =<< stream_range(0, 3);
133            y =<< stream_range(0, 3);
134            ret ret(x + y)
135        };
136        assert_eq!(execute(l), vec![0, 1, 2, 1, 2, 3, 2, 3, 4]);
137        let l = mdo! {
138            z =<< stream_range(1, 11);
139            y =<< stream_range(1, z);
140            x =<< stream_range(1, y + 1);
141            let test = x * x + y * y == z * z;
142            when test;
143            let res = (x, y, z);
144            ret ret(res)
145        };
146        assert_eq!(execute(l), vec![(3, 4, 5), (6, 8, 10)]);
147    }
148
149    #[test]
150    fn stream_ignore() {
151        use super::stream::{bind, ret};
152        let l = mdo! {
153            x =<< stream_range(0, 5);
154            ign stream_range(0, 2);
155            ret ret(x)
156        };
157        assert_eq!(execute(l), vec![0, 0, 1, 1, 2, 2, 3, 3, 4, 4]);
158    }
159
160    // Generate a stream from start (inclusive) to end (exclusive)
161    fn stream_range(start: u32, end: u32) -> stream::BoxStream<u32, String> {
162        stream::iter((start..end).map(Ok::<u32, String>)).boxed()
163    }
164
165    // execute the stream on a CpuPool and return a future of the
166    // collected result
167    fn execute<T, S>(s: S) -> Vec<T>
168        where T: Send + 'static,
169              S: Stream<Item = T, Error = String> + Send + 'static
170    {
171        let pool = CpuPool::new_num_cpus();
172        let v = pool.spawn(s.collect());
173        v.wait().unwrap()
174    }
175
176}