futures_01_ext/
futures_ordered.rs1use std::fmt;
14
15use futures::Async;
16use futures::Future;
17use futures::IntoFuture;
18use futures::Poll;
19use futures::Stream;
20
21#[must_use = "streams do nothing unless polled"]
26pub struct FuturesOrdered<I>
27where
28 I: IntoIterator,
29 I::Item: IntoFuture,
30{
31 elems: I::IntoIter,
32 current: Option<<I::Item as IntoFuture>::Future>,
33}
34
35impl<I> fmt::Debug for FuturesOrdered<I>
36where
37 I: IntoIterator,
38 I::Item: IntoFuture,
39 <<I as IntoIterator>::Item as IntoFuture>::Future: fmt::Debug,
40 <<I as IntoIterator>::Item as IntoFuture>::Item: fmt::Debug,
41{
42 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
43 fmt.debug_struct("FuturesOrdered")
44 .field("current", &self.current)
45 .finish()
46 }
47}
48
49pub fn futures_ordered<I>(iter: I) -> FuturesOrdered<I>
55where
56 I: IntoIterator,
57 I::Item: IntoFuture,
58{
59 let mut elems = iter.into_iter();
60 let current = next_future(&mut elems);
61 FuturesOrdered { elems, current }
62}
63
64impl<I> Stream for FuturesOrdered<I>
65where
66 I: IntoIterator,
67 I::Item: IntoFuture,
68{
69 type Item = <I::Item as IntoFuture>::Item;
70 type Error = <I::Item as IntoFuture>::Error;
71
72 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
73 match self.current.take() {
74 Some(mut fut) => {
75 match fut.poll() {
76 Ok(Async::Ready(v)) => {
77 self.current = next_future(&mut self.elems);
78 Ok(Async::Ready(Some(v)))
79 }
80 Ok(Async::NotReady) => {
81 self.current = Some(fut);
82 Ok(Async::NotReady)
83 }
84 Err(e) => {
85 self.current = next_future(&mut self.elems);
88 Err(e)
89 }
90 }
91 }
92 None => {
93 Ok(Async::Ready(None))
95 }
96 }
97 }
98}
99
100#[inline]
101fn next_future<I>(elems: &mut I) -> Option<<I::Item as IntoFuture>::Future>
102where
103 I: Iterator,
104 I::Item: IntoFuture,
105{
106 elems.next().map(IntoFuture::into_future)
107}
108
#[cfg(test)]
mod test {
    use std::result;

    use futures::sync::mpsc;
    use futures::task;
    use futures::Future;
    use futures::Sink;
    use futures::Stream;
    use futures03::compat::Future01CompatExt;

    use super::*;

    /// Results come back in input order, and an error surfaces through
    /// `collect()`.
    #[test]
    fn test_basic() {
        let all_ok = futures_ordered(vec![ok(1), ok(2)]).collect().wait();
        assert_eq!(all_ok, Ok(vec![1, 2]));

        let with_failure = futures_ordered(vec![ok(1), err(2), ok(3)])
            .collect()
            .wait();
        assert_eq!(with_failure, Err(2));
    }

    /// The first future needs more polls (4) than the second (2). Receiving
    /// `[10, 20]` on the channel shows the second one did not send before the
    /// first had finished.
    #[test]
    fn test_serial() {
        let (sender, receiver) = mpsc::channel(2);
        let slow = delayed_future(10, sender.clone(), 4);
        let fast = delayed_future(20, sender, 2);
        let futs = vec![slow, fast];

        let runtime = tokio::runtime::Runtime::new().unwrap();
        let stream_done = runtime.block_on(futures_ordered(futs).collect().compat());
        stream_done.unwrap();

        let results = runtime.block_on(receiver.collect().compat());
        assert_eq!(results, Ok(vec![10, 20]));
    }

    /// Creates a [`DelayedFuture`] that sends `v` on `tx` on its `count`-th
    /// poll.
    fn delayed_future<T>(v: T, tx: mpsc::Sender<T>, count: usize) -> DelayedFuture<T> {
        DelayedFuture {
            count,
            send: Some((v, tx)),
        }
    }

    /// Test future that stays pending for a fixed number of polls and then
    /// sends its value over a channel.
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    struct DelayedFuture<T> {
        /// Value and sender, taken out when the value is sent.
        send: Option<(T, mpsc::Sender<T>)>,
        /// Polls left before completion. Must start at 1 or more: the
        /// decrement in `poll` would underflow from 0.
        count: usize,
    }

    impl<T> Future for DelayedFuture<T> {
        type Item = ();
        type Error = !;

        fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
            self.count -= 1;
            if self.count != 0 {
                // Not done yet: ask to be polled again right away.
                task::current().notify();
                return Ok(Async::NotReady);
            }

            let (v, tx) = self.send.take().unwrap();
            tx.send(v).wait().unwrap();
            Ok(Async::Ready(()))
        }
    }

    /// `Ok(v)` with the error type pinned to `i32`.
    fn ok(v: i32) -> result::Result<i32, i32> {
        result::Result::Ok(v)
    }

    /// `Err(v)` with the success type pinned to `i32`.
    fn err(v: i32) -> result::Result<i32, i32> {
        result::Result::Err(v)
    }
}