Skip to main content

veilid_tools/
future_queue.rs

1use super::*;
2use futures_util::{Stream, StreamExt as _};
3use stop_token::future::FutureExt as _;
4
5pub async fn process_batched_future_queue_result<I, C, E, R>(
6    future_queue: I,
7    batch_size: usize,
8    stop_token: StopToken,
9    result_callback: C,
10) -> Result<(), E>
11where
12    I: IntoIterator,
13    C: Fn(R) -> Result<(), E>,
14    <I as std::iter::IntoIterator>::Item: core::future::Future<Output = R>,
15{
16    let mut buffered_futures =
17        futures_util::stream::iter(future_queue).buffer_unordered(batch_size);
18    while let Ok(Some(res)) = buffered_futures.next().timeout_at(stop_token.clone()).await {
19        result_callback(res)?;
20    }
21
22    Ok(())
23}
24
25pub async fn process_batched_future_queue_void<I>(
26    future_queue: I,
27    batch_size: usize,
28    stop_token: StopToken,
29) where
30    I: IntoIterator,
31    <I as std::iter::IntoIterator>::Item: core::future::Future<Output = ()>,
32{
33    let mut buffered_futures =
34        futures_util::stream::iter(future_queue).buffer_unordered(batch_size);
35    while let Ok(Some(())) = buffered_futures.next().timeout_at(stop_token.clone()).await {}
36}
37
38pub async fn process_batched_future_stream_result<S, C, E, R>(
39    future_stream: S,
40    batch_size: usize,
41    stop_token: StopToken,
42    result_callback: C,
43) -> Result<(), E>
44where
45    S: Stream,
46    C: Fn(R) -> Result<(), E>,
47    <S as Stream>::Item: core::future::Future<Output = R>,
48{
49    let mut buffered_futures = Box::pin(future_stream.buffer_unordered(batch_size));
50    while let Ok(Some(res)) = buffered_futures.next().timeout_at(stop_token.clone()).await {
51        result_callback(res)?;
52    }
53
54    Ok(())
55}
56
57pub async fn process_batched_future_stream_void<S>(
58    future_stream: S,
59    batch_size: usize,
60    stop_token: StopToken,
61) where
62    S: Stream,
63    <S as Stream>::Item: core::future::Future<Output = ()>,
64{
65    let mut buffered_futures = Box::pin(future_stream.buffer_unordered(batch_size));
66    while let Ok(Some(())) = buffered_futures.next().timeout_at(stop_token.clone()).await {}
67}