1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
use futures::{
stream::{self, FuturesUnordered},
try_ready, Async, IntoFuture, Stream,
};
use std::collections::VecDeque;
use std::iter::FromIterator;
/// Lazily walks an implicitly defined tree/graph and exposes the results as a
/// [`Stream`], keeping at most `scheduled_max` unfold futures in flight.
///
/// # Parameters
///
/// * `scheduled_max` - upper bound on the number of futures polled
///   concurrently. A value of `0` is treated as `1`, because a limit of zero
///   could never schedule any work and the stream would spin forever inside a
///   single `poll` call.
/// * `init` - the initial set of nodes to visit.
/// * `unfold` - called once per node; resolves to `(Out, Ins)`, where `Out` is
///   the value emitted on the stream and `Ins` are further nodes to visit.
///
/// # Returns
///
/// A stream that yields one `Out` per visited node, in the order the
/// underlying futures complete, and terminates once there is neither pending
/// nor in-flight work left.
///
/// # Errors
///
/// The first error produced by any unfold future is returned from the stream
/// as-is.
pub fn bounded_traversal_stream<In, InsInit, Ins, Out, Unfold, UFut>(
    scheduled_max: usize,
    init: InsInit,
    mut unfold: Unfold,
) -> impl Stream<Item = Out, Error = UFut::Error>
where
    Unfold: FnMut(In) -> UFut,
    UFut: IntoFuture<Item = (Out, Ins)>,
    InsInit: IntoIterator<Item = In>,
    Ins: IntoIterator<Item = In>,
{
    // With a limit of zero nothing would ever be moved from `unscheduled` to
    // `scheduled`, and polling the empty `scheduled` set reports "no items"
    // rather than "not ready", so the loop below would never exit. Clamp to
    // one to guarantee progress.
    let scheduled_max = scheduled_max.max(1);

    // Nodes waiting for a free slot.
    let mut unscheduled = VecDeque::from_iter(init);
    // Unfold futures currently being driven.
    let mut scheduled = FuturesUnordered::new();

    stream::poll_fn(move || {
        loop {
            // Nothing queued and nothing in flight: the traversal is complete.
            if scheduled.is_empty() && unscheduled.is_empty() {
                return Ok(Async::Ready(None));
            }

            // Top up the in-flight set from the front of the queue, never
            // exceeding the concurrency limit.
            let free_slots = scheduled_max.saturating_sub(scheduled.len());
            let batch = free_slots.min(unscheduled.len());
            for item in unscheduled.drain(..batch) {
                scheduled.push(unfold(item).into_future());
            }

            // `try_ready!` returns early on `NotReady` and on errors.
            if let Some((out, children)) = try_ready!(scheduled.poll()) {
                // Children are pushed to the front one at a time, so they are
                // scheduled before previously queued nodes and the last child
                // ends up at the head of the queue.
                for child in children {
                    unscheduled.push_front(child);
                }
                return Ok(Async::Ready(Some(out)));
            }
            // `scheduled` reported no items; go around again to re-check the
            // termination condition and schedule more work.
        }
    })
}