veilid_tools/
future_queue.rs1use super::*;
2use futures_util::{Stream, StreamExt as _};
3use stop_token::future::FutureExt as _;
4
5pub async fn process_batched_future_queue_result<I, C, E, R>(
6 future_queue: I,
7 batch_size: usize,
8 stop_token: StopToken,
9 result_callback: C,
10) -> Result<(), E>
11where
12 I: IntoIterator,
13 C: Fn(R) -> Result<(), E>,
14 <I as std::iter::IntoIterator>::Item: core::future::Future<Output = R>,
15{
16 let mut buffered_futures =
17 futures_util::stream::iter(future_queue).buffer_unordered(batch_size);
18 while let Ok(Some(res)) = buffered_futures.next().timeout_at(stop_token.clone()).await {
19 result_callback(res)?;
20 }
21
22 Ok(())
23}
24
25pub async fn process_batched_future_queue_void<I>(
26 future_queue: I,
27 batch_size: usize,
28 stop_token: StopToken,
29) where
30 I: IntoIterator,
31 <I as std::iter::IntoIterator>::Item: core::future::Future<Output = ()>,
32{
33 let mut buffered_futures =
34 futures_util::stream::iter(future_queue).buffer_unordered(batch_size);
35 while let Ok(Some(())) = buffered_futures.next().timeout_at(stop_token.clone()).await {}
36}
37
38pub async fn process_batched_future_stream_result<S, C, E, R>(
39 future_stream: S,
40 batch_size: usize,
41 stop_token: StopToken,
42 result_callback: C,
43) -> Result<(), E>
44where
45 S: Stream,
46 C: Fn(R) -> Result<(), E>,
47 <S as Stream>::Item: core::future::Future<Output = R>,
48{
49 let mut buffered_futures = Box::pin(future_stream.buffer_unordered(batch_size));
50 while let Ok(Some(res)) = buffered_futures.next().timeout_at(stop_token.clone()).await {
51 result_callback(res)?;
52 }
53
54 Ok(())
55}
56
57pub async fn process_batched_future_stream_void<S>(
58 future_stream: S,
59 batch_size: usize,
60 stop_token: StopToken,
61) where
62 S: Stream,
63 <S as Stream>::Item: core::future::Future<Output = ()>,
64{
65 let mut buffered_futures = Box::pin(future_stream.buffer_unordered(batch_size));
66 while let Ok(Some(())) = buffered_futures.next().timeout_at(stop_token.clone()).await {}
67}