futures_buffered/buffered.rs
1use crate::FuturesOrderedBounded;
2use crate::FuturesUnorderedBounded;
3use core::future::Future;
4use futures_core::Stream;
5
6mod for_each;
7mod ordered;
8mod unordered;
9
10pub use for_each::ForEachConcurrent;
11pub use ordered::BufferedOrdered;
12pub use unordered::BufferUnordered;
13
14impl<T: ?Sized + Stream> BufferedStreamExt for T {}
15
16/// An extension trait for `Stream`s that provides a variety of convenient
17/// combinator functions.
18pub trait BufferedStreamExt: Stream {
19 /// An adaptor for creating a buffered list of pending futures.
20 ///
21 /// If this stream's item can be converted into a future, then this adaptor
22 /// will buffer up to at most `n` futures and then return the outputs in the
23 /// same order as the underlying stream. No more than `n` futures will be
24 /// buffered at any point in time, and less than `n` may also be buffered
25 /// depending on the state of each future.
26 ///
27 /// The returned stream will be a stream of each future's output.
28 fn buffered_ordered(self, n: usize) -> BufferedOrdered<Self>
29 where
30 Self::Item: Future,
31 Self: Sized,
32 {
33 BufferedOrdered {
34 stream: Some(self),
35 in_progress_queue: FuturesOrderedBounded::new(n),
36 }
37 }
38
39 /// An adaptor for creating a buffered list of pending futures (unordered).
40 ///
41 /// If this stream's item can be converted into a future, then this adaptor
42 /// will buffer up to `n` futures and then return the outputs in the order
43 /// in which they complete. No more than `n` futures will be buffered at
44 /// any point in time, and less than `n` may also be buffered depending on
45 /// the state of each future.
46 ///
47 /// The returned stream will be a stream of each future's output.
48 ///
49 /// # Examples
50 ///
51 /// ```
52 /// # futures::executor::block_on(async {
53 /// use futures::channel::oneshot;
54 /// use futures::stream::{self, StreamExt};
55 /// use futures_buffered::BufferedStreamExt;
56 ///
57 /// let (send_one, recv_one) = oneshot::channel();
58 /// let (send_two, recv_two) = oneshot::channel();
59 ///
60 /// let stream_of_futures = stream::iter(vec![recv_one, recv_two]);
61 /// let mut buffered = stream_of_futures.buffered_unordered(10);
62 ///
63 /// send_two.send(2i32)?;
64 /// assert_eq!(buffered.next().await, Some(Ok(2i32)));
65 ///
66 /// send_one.send(1i32)?;
67 /// assert_eq!(buffered.next().await, Some(Ok(1i32)));
68 ///
69 /// assert_eq!(buffered.next().await, None);
70 /// # Ok::<(), i32>(()) }).unwrap();
71 /// ```
72 ///
73 /// See [`BufferUnordered`] for performance details
74 fn buffered_unordered(self, n: usize) -> BufferUnordered<Self>
75 where
76 Self::Item: Future,
77 Self: Sized,
78 {
79 BufferUnordered {
80 stream: Some(self),
81 in_progress_queue: FuturesUnorderedBounded::new(n),
82 }
83 }
84
85 /// Runs this stream to completion, executing the provided asynchronous
86 /// closure for each element on the stream concurrently as elements become
87 /// available.
88 ///
89 /// This is similar to [`StreamExt::for_each`](futures_util::StreamExt::for_each), but the futures
90 /// produced by the closure are run concurrently (but not in parallel--
91 /// this combinator does not introduce any threads).
92 ///
93 /// The closure provided will be called for each item this stream produces,
94 /// yielding a future. That future will then be executed to completion
95 /// concurrently with the other futures produced by the closure.
96 ///
97 /// The first argument is an optional limit on the number of concurrent
98 /// futures. If this limit is not `None`, no more than `limit` futures
99 /// will be run concurrently. The `limit` argument is of type
100 /// `Into<Option<usize>>`, and so can be provided as either `None`,
101 /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as
102 /// no limit at all, and will have the same result as passing in `None`.
103 ///
104 /// This method is only available when the `std` or `alloc` feature of this
105 /// library is activated, and it is activated by default.
106 ///
107 /// # Examples
108 ///
109 /// ```
110 /// # futures::executor::block_on(async {
111 /// use futures::channel::oneshot;
112 /// use futures::stream::{self, StreamExt};
113 ///
114 /// let (tx1, rx1) = oneshot::channel();
115 /// let (tx2, rx2) = oneshot::channel();
116 /// let (tx3, rx3) = oneshot::channel();
117 ///
118 /// let fut = stream::iter(vec![rx1, rx2, rx3]).for_each_concurrent(
119 /// /* limit */ 2,
120 /// |rx| async move {
121 /// rx.await.unwrap();
122 /// }
123 /// );
124 /// tx1.send(()).unwrap();
125 /// tx2.send(()).unwrap();
126 /// tx3.send(()).unwrap();
127 /// fut.await;
128 /// # })
129 /// ```
130 fn for_each_concurrent<Fut, F>(self, limit: usize, f: F) -> ForEachConcurrent<Self, Fut, F>
131 where
132 F: FnMut(Self::Item) -> Fut,
133 Fut: Future<Output = ()>,
134 Self: Sized,
135 {
136 ForEachConcurrent::new(self, limit, f)
137 }
138}