1extern crate futures;
2#[cfg(test)] #[macro_use] extern crate mdo;
3#[cfg(test)] extern crate futures_cpupool;
4
5pub mod future {
6 use futures::future::*;
7
8 pub fn bind<T, E, TF: Future<Item=T, Error=E> + Sized, IFU: IntoFuture<Error = E>, F: FnOnce(T) -> IFU>(m: TF, f: F) -> AndThen<TF, IFU, F> {
9 m.and_then(f)
10 }
11
12 pub fn ret<T, E>(x: T) -> FutureResult<T, E> {
13 ok::<T, E>(x)
14 }
15
16}
17
18pub mod stream {
19 use futures::{Async, Poll};
20 use futures::future::{Future, FutureResult, ok};
21 use futures::stream::*;
22
23 pub fn bind<E, I, U, F>(m: I, f: F) -> Flatten<Map<I, F>>
25 where I: Stream<Error = E> + Sized,
26 U: Stream<Error = E> + Sized,
27 F: FnMut(<I as Stream>::Item) -> U
28 {
29 m.map(f).flatten()
30 }
31
32 pub fn ret<T, E>(x: T) -> WrappedStream<FutureResult<T, E>> {
33 new(Some(ok::<T, E>(x)))
34 }
35
36 pub fn mzero<T, E>() -> WrappedStream<FutureResult<T, E>> {
37 new(None)
38 }
39
40 pub struct WrappedStream<F: Future> {
42 future: Option<F>,
43 }
44
45 pub fn new<F: Future>(future: Option<F>) -> WrappedStream<F> {
46 WrappedStream { future: future }
47 }
48
49 impl<F: Future> Stream for WrappedStream<F> {
50 type Item = F::Item;
51 type Error = F::Error;
52
53 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
54 let ret = match self.future {
55 None => return Ok(Async::Ready(None)),
56 Some(ref mut future) => {
57 match future.poll() {
58 Ok(Async::NotReady) => return Ok(Async::NotReady),
59 Err(e) => Err(e),
60 Ok(Async::Ready(r)) => Ok(r),
61 }
62 }
63 };
64 self.future = None;
65 ret.map(|r| Async::Ready(Some(r)))
66 }
67 }
68
69}
70
#[cfg(test)]
mod tests {
    use futures_cpupool::CpuPool;
    use std::vec::Vec;
    use futures::{Future, stream, Stream};

    /// Two futures combined through `mdo!` resolve to the product of their values.
    #[test]
    fn future_mdo() {
        use futures::future::ok;
        use super::future::{bind, ret};

        let pool = CpuPool::new_num_cpus();
        let get_num = ok::<u32, String>(42);
        let get_factor = ok::<u32, String>(2);

        let computation = mdo! {
            num =<< get_num;
            factor =<< get_factor;
            ret ret(num * factor)
        };

        let product = pool.spawn(computation).wait().unwrap();
        assert_eq!(product, 84);
    }

    /// Exercises `stream::bind` directly, without the macro sugar.
    #[test]
    fn stream_bind() {
        use super::stream::{bind, ret, mzero};

        // Inner stream depends on the outer item.
        let suffixes = bind(stream_range(0, 3), move |lo| stream_range(lo, 3));
        assert_eq!(execute(suffixes), vec![0, 1, 2, 1, 2, 2]);

        // Cartesian product of two ranges, summed pairwise.
        let sums = bind(stream_range(0, 3), move |a| {
            bind(stream_range(0, 3), move |b| ret(a + b))
        });
        assert_eq!(execute(sums), vec![0, 1, 2, 1, 2, 3, 2, 3, 4]);

        // Pythagorean triples with hypotenuse up to 10; the guard stream is
        // either a single unit item (keep) or empty (discard).
        let triples = bind(stream_range(1, 11), move |c| {
            bind(stream_range(1, c + 1), move |b| {
                bind(stream_range(1, b + 1), move |a| {
                    let guard = if a * a + b * b == c * c {
                        ret(())
                    } else {
                        mzero()
                    };
                    bind(guard, move |_| ret((a, b, c)))
                })
            })
        });
        assert_eq!(execute(triples), vec![(3, 4, 5), (6, 8, 10)]);
    }

    /// Same scenarios as `stream_bind`, expressed with `mdo!` notation.
    #[test]
    fn stream_mdo() {
        use super::stream::{bind, ret, mzero};

        let suffixes = mdo! {
            lo =<< stream_range(0, 3);
            ret stream_range(lo, 3)
        };
        assert_eq!(execute(suffixes), vec![0, 1, 2, 1, 2, 2]);

        let sums = mdo! {
            a =<< stream_range(0, 3);
            b =<< stream_range(0, 3);
            ret ret(a + b)
        };
        assert_eq!(execute(sums), vec![0, 1, 2, 1, 2, 3, 2, 3, 4]);

        let triples = mdo! {
            c =<< stream_range(1, 11);
            b =<< stream_range(1, c);
            a =<< stream_range(1, b + 1);
            let is_triple = a * a + b * b == c * c;
            when is_triple;
            let triple = (a, b, c);
            ret ret(triple)
        };
        assert_eq!(execute(triples), vec![(3, 4, 5), (6, 8, 10)]);
    }

    /// An `ign` step repeats the outer item once per element of the ignored stream.
    #[test]
    fn stream_ignore() {
        use super::stream::{bind, ret};

        let doubled = mdo! {
            n =<< stream_range(0, 5);
            ign stream_range(0, 2);
            ret ret(n)
        };
        assert_eq!(execute(doubled), vec![0, 0, 1, 1, 2, 2, 3, 3, 4, 4]);
    }

    /// Boxed stream yielding the integers of `start..end`, each wrapped in `Ok`.
    fn stream_range(start: u32, end: u32) -> stream::BoxStream<u32, String> {
        let items = (start..end).map(Ok::<u32, String>);
        stream::iter(items).boxed()
    }

    /// Collects the whole stream on a CPU pool and returns its items,
    /// panicking if the stream produced an error.
    fn execute<T, S>(s: S) -> Vec<T>
    where
        T: Send + 'static,
        S: Stream<Item = T, Error = String> + Send + 'static,
    {
        let pool = CpuPool::new_num_cpus();
        pool.spawn(s.collect()).wait().unwrap()
    }
}