1use futures::Future;
2use tokio::{select, sync::mpsc};
3
4async fn send_item_requests<Iter, T>(items: Iter, item_sender: async_channel::Sender<T>)
5where
6 Iter: ExactSizeIterator<Item = T>,
7{
8 for item in items {
9 let send_response = item_sender.send(item).await;
10 if send_response.is_err() {
11 return;
12 }
13 }
14}
15
16async fn process_item_requests<I, T, E, F, Fut>(
17 item_receiver: async_channel::Receiver<I>,
18 checker: F,
19 response_sender: mpsc::Sender<Result<T, E>>,
20) where
21 F: (FnOnce(I) -> Fut) + Copy,
22 Fut: Future<Output = Result<T, E>>,
23{
24 while let Ok(item) = item_receiver.recv().await {
25 select! {
26 result = checker(item) => {
27 let _ = response_sender.send(result).await;
28 }
29 _ = response_sender.closed() => {
30 return;
31 }
32 }
33 }
34}
35
36async fn process_item_responses<T, E>(
37 length: usize,
38 mut response_receiver: mpsc::Receiver<Result<T, E>>,
39) -> Option<Result<T, E>> {
40 let mut result: Option<Result<T, E>> = None;
41 for _ in 0..length {
42 let option_result_cur = response_receiver.recv().await;
43 match option_result_cur {
44 Some(result_cur) => match result_cur {
45 Ok(_) => {
46 result = Some(result_cur);
47 break;
48 }
49 Err(_) => {
50 result = Some(result_cur);
51 }
52 },
53 None => {
54 break;
55 }
56 }
57 }
58 result
59}
60
61pub async fn get_first_ok_bounded<Iter, I, T, E, F, Fut>(
67 items: Iter,
68 mut concurrent: usize,
69 checker: F,
70) -> Option<Result<T, E>>
71where
72 F: (FnOnce(I) -> Fut) + Send + Copy + 'static,
73 Fut: Future<Output = Result<T, E>> + Send,
74 I: Send + 'static,
75 T: Send + 'static,
76 E: Send + 'static,
77 Iter: ExactSizeIterator<Item = I> + Send + 'static,
78{
79 let length = items.len();
80 if concurrent == 0 {
81 concurrent = length;
82 }
83 let (item_sender, item_receiver) = async_channel::bounded::<I>(16);
84 let (response_sender, response_receiver) = mpsc::channel::<Result<T, E>>(16);
85 for _ in 0..concurrent {
86 let item_receiver = item_receiver.clone();
87 let response_sender = response_sender.clone();
88 tokio::task::spawn(async move {
89 process_item_requests(item_receiver, checker, response_sender).await;
90 });
91 }
92 tokio::task::spawn(send_item_requests(items, item_sender));
93 process_item_responses(length, response_receiver).await
94}