first_ok/
lib.rs

1use futures::Future;
2use tokio::{select, sync::mpsc};
3
4async fn send_item_requests<Iter, T>(items: Iter, item_sender: async_channel::Sender<T>)
5where
6    Iter: ExactSizeIterator<Item = T>,
7{
8    for item in items {
9        let send_response = item_sender.send(item).await;
10        if send_response.is_err() {
11            return;
12        }
13    }
14}
15
16async fn process_item_requests<I, T, E, F, Fut>(
17    item_receiver: async_channel::Receiver<I>,
18    checker: F,
19    response_sender: mpsc::Sender<Result<T, E>>,
20) where
21    F: (FnOnce(I) -> Fut) + Copy,
22    Fut: Future<Output = Result<T, E>>,
23{
24    while let Ok(item) = item_receiver.recv().await {
25        select! {
26            result = checker(item) => {
27                let _ = response_sender.send(result).await;
28            }
29            _ = response_sender.closed() => {
30                return;
31            }
32        }
33    }
34}
35
36async fn process_item_responses<T, E>(
37    length: usize,
38    mut response_receiver: mpsc::Receiver<Result<T, E>>,
39) -> Option<Result<T, E>> {
40    let mut result: Option<Result<T, E>> = None;
41    for _ in 0..length {
42        let option_result_cur = response_receiver.recv().await;
43        match option_result_cur {
44            Some(result_cur) => match result_cur {
45                Ok(_) => {
46                    result = Some(result_cur);
47                    break;
48                }
49                Err(_) => {
50                    result = Some(result_cur);
51                }
52            },
53            None => {
54                break;
55            }
56        }
57    }
58    result
59}
60
61/// Returns the first non-error result from a function `checker` applied to each entry in a list of `items`.
62/// If the list of items is empty, it returns `None`.
63/// If all of the results are errors, it returns the last error.
64/// There are `concurrent` workers to apply the `checker` function.
65/// If `concurrent` is 0, then it will create `len(items)` workers.
66pub async fn get_first_ok_bounded<Iter, I, T, E, F, Fut>(
67    items: Iter,
68    mut concurrent: usize,
69    checker: F,
70) -> Option<Result<T, E>>
71where
72    F: (FnOnce(I) -> Fut) + Send + Copy + 'static,
73    Fut: Future<Output = Result<T, E>> + Send,
74    I: Send + 'static,
75    T: Send + 'static,
76    E: Send + 'static,
77    Iter: ExactSizeIterator<Item = I> + Send + 'static,
78{
79    let length = items.len();
80    if concurrent == 0 {
81        concurrent = length;
82    }
83    let (item_sender, item_receiver) = async_channel::bounded::<I>(16);
84    let (response_sender, response_receiver) = mpsc::channel::<Result<T, E>>(16);
85    for _ in 0..concurrent {
86        let item_receiver = item_receiver.clone();
87        let response_sender = response_sender.clone();
88        tokio::task::spawn(async move {
89            process_item_requests(item_receiver, checker, response_sender).await;
90        });
91    }
92    tokio::task::spawn(send_item_requests(items, item_sender));
93    process_item_responses(length, response_receiver).await
94}