1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
use futures::Future;
use tokio::{select, sync::mpsc};

async fn send_item_requests<Iter, T>(items: Iter, item_sender: async_channel::Sender<T>)
where
    Iter: ExactSizeIterator<Item = T>,
{
    for item in items {
        let send_response = item_sender.send(item).await;
        if send_response.is_err() {
            return;
        }
    }
}

async fn process_item_requests<I, T, E, F, Fut>(
    item_receiver: async_channel::Receiver<I>,
    checker: F,
    response_sender: mpsc::Sender<Result<T, E>>,
) where
    F: (FnOnce(I) -> Fut) + Copy,
    Fut: Future<Output = Result<T, E>>,
{
    while let Ok(item) = item_receiver.recv().await {
        select! {
            result = checker(item) => {
                let _ = response_sender.send(result).await;
            }
            _ = response_sender.closed() => {
                return;
            }
        }
    }
}

async fn process_item_responses<T, E>(
    length: usize,
    mut response_receiver: mpsc::Receiver<Result<T, E>>,
) -> Option<Result<T, E>> {
    let mut result: Option<Result<T, E>> = None;
    for _ in 0..length {
        let option_result_cur = response_receiver.recv().await;
        match option_result_cur {
            Some(result_cur) => match result_cur {
                Ok(_) => {
                    result = Some(result_cur);
                    break;
                }
                Err(_) => {
                    result = Some(result_cur);
                }
            },
            None => {
                break;
            }
        }
    }
    result
}

/// Returns the first non-error result from a function `checker` applied to each entry in a list of `items`.
/// If the list of items is empty, it returns `None`.
/// If all of the results are errors, it returns the last error.
/// There are `concurrent` workers to apply the `checker` function.
/// If `concurrent` is 0, then it will create `len(items)` workers.
pub async fn get_first_ok_bounded<Iter, I, T, E, F, Fut>(
    items: Iter,
    mut concurrent: usize,
    checker: F,
) -> Option<Result<T, E>>
where
    F: (FnOnce(I) -> Fut) + Send + Copy + 'static,
    Fut: Future<Output = Result<T, E>> + Send,
    I: Send + 'static,
    T: Send + 'static,
    E: Send + 'static,
    Iter: ExactSizeIterator<Item = I> + Send + 'static,
{
    let length = items.len();
    if concurrent == 0 {
        concurrent = length;
    }
    let (item_sender, item_receiver) = async_channel::bounded::<I>(16);
    let (response_sender, response_receiver) = mpsc::channel::<Result<T, E>>(16);
    for _ in 0..concurrent {
        let item_receiver = item_receiver.clone();
        let response_sender = response_sender.clone();
        tokio::task::spawn(async move {
            process_item_requests(item_receiver, checker, response_sender).await;
        });
    }
    tokio::task::spawn(send_item_requests(items, item_sender));
    process_item_responses(length, response_receiver).await
}