1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
/*
 * Copyright (c) Facebook, Inc. and its affiliates.
 *
 * This source code is licensed under both the MIT license found in the
 * LICENSE-MIT file in the root directory of this source tree and the Apache
 * License, Version 2.0 found in the LICENSE-APACHE file in the root directory
 * of this source tree.
 */

use futures::{
    stream::{self, FuturesUnordered},
    try_ready, Async, IntoFuture, Stream,
};
use std::collections::VecDeque;
use std::iter::FromIterator;

/// `bounded_traversal_stream` traverses the implicit asynchronous tree specified by the
/// `init` and `unfold` arguments. All `unfold` operations are executed in parallel if they
/// do not depend on each other (not related by an ancestor-descendant relation in the
/// implicit tree), with the amount of concurrency constrained by `scheduled_max`. The main
/// difference from `bounded_traversal` is that this one is not structure preserving, and
/// returns a stream.
///
/// ## `scheduled_max: usize`
/// Upper bound on the number of `unfold` futures in flight at any one time. A value of `0`
/// is treated as `1`: with no capacity nothing could ever be scheduled, and polling the
/// stream would spin forever without making progress.
///
/// ## `init: InsInit`
/// The root(s) of the implicit tree to be traversed.
///
/// ## `unfold: FnMut(In) -> impl IntoFuture<Item = (Out, impl IntoIterator<Item = In>)>`
/// Asynchronous function which, given an input value, produces its output value and the
/// list of its children.
///
/// ## return value `impl Stream<Item = Out>`
/// Stream of all `Out` values, yielded in the order in which their `unfold` futures
/// complete. An error produced by any `unfold` future is surfaced as the stream's error.
///
pub fn bounded_traversal_stream<In, InsInit, Ins, Out, Unfold, UFut>(
    scheduled_max: usize,
    init: InsInit,
    mut unfold: Unfold,
) -> impl Stream<Item = Out, Error = UFut::Error>
where
    Unfold: FnMut(In) -> UFut,
    UFut: IntoFuture<Item = (Out, Ins)>,
    InsInit: IntoIterator<Item = In>,
    Ins: IntoIterator<Item = In>,
{
    // A limit of zero would leave `scheduled` permanently empty while `unscheduled` is
    // non-empty; the empty `FuturesUnordered` then reports `Ready(None)` on every poll and
    // the loop below would never terminate. Guarantee room for at least one future.
    let scheduled_max = std::cmp::max(scheduled_max, 1);

    // Nodes discovered but not yet handed to `unfold`.
    let mut unscheduled = VecDeque::from_iter(init);
    // `unfold` futures currently in flight; never holds more than `scheduled_max` entries.
    let mut scheduled = FuturesUnordered::new();

    stream::poll_fn(move || {
        loop {
            // Nothing pending and nothing in flight: the traversal is complete.
            if scheduled.is_empty() && unscheduled.is_empty() {
                return Ok(Async::Ready(None));
            }

            // Top up the in-flight set from the front of the queue, up to the limit.
            let capacity = scheduled_max.saturating_sub(scheduled.len());
            let batch = std::cmp::min(unscheduled.len(), capacity);
            for item in unscheduled.drain(..batch) {
                scheduled.push(unfold(item).into_future());
            }

            // `try_ready!` returns early on `NotReady` and on errors.
            if let Some((out, children)) = try_ready!(scheduled.poll()) {
                // Children go to the front of the queue, so freshly discovered nodes are
                // scheduled ahead of older pending ones.
                for child in children {
                    unscheduled.push_front(child);
                }
                return Ok(Async::Ready(Some(out)));
            }
        }
    })
}