// Copyright (c) The buffer-unordered-weighted Contributors
// SPDX-License-Identifier: MIT OR Apache-2.0

//! `buffer_unordered_weighted` is a variant of
//! [`buffer_unordered`](https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html#method.buffer_unordered),
//! where each future can be assigned a different weight.
//!
//! This crate is part of the [nextest organization](https://github.com/nextest-rs) on GitHub, and is
//! designed to serve the needs of [cargo-nextest](https://nexte.st).
//!
//! # Motivation
//!
//! Async programming in Rust often uses an adaptor called
//! [`buffer_unordered`](https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html#method.buffer_unordered):
//! this adaptor takes a stream of futures[^1] and runs them concurrently, up to a maximum number
//! of futures at a time.
//!
//! * Futures are started in the order the stream returns them in.
//! * Once started, futures are polled simultaneously, and completed future outputs are returned
//!   in arbitrary order (hence the `unordered`).
//!
//! Common use cases for `buffer_unordered` include:
//!
//! * Sending network requests concurrently, but limiting the amount of concurrency to avoid
//!   overwhelming the remote server.
//! * Running tests with a tool like [cargo-nextest](https://nexte.st).
//!
//! `buffer_unordered` works well for many use cases. However, one issue with it is that it treats
//! all futures as equally taxing: there's no way to say that some futures consume more resources
//! than others. For nextest in particular, some tests can be much heavier than others, and fewer of
//! those tests should be run simultaneously.
//!
//! [^1]: This adaptor takes a stream of futures for maximum generality. In practice this is often
//!     an *iterator* of futures, converted over using
//!     [`stream::iter`](https://docs.rs/futures/latest/futures/stream/fn.iter.html).
//!
//! # About this crate
//!
//! This crate provides an adaptor on streams called `buffer_unordered_weighted`, which can run
//! several futures simultaneously, limiting the concurrency to a maximum *weight*.
//!
//! Rather than taking a stream of futures, this adaptor takes a stream of `(usize, future)` pairs,
//! where the `usize` indicates the weight of each future. This adaptor will schedule and buffer
//! futures to be run until the current weight reaches or exceeds the maximum weight. Once that
//! happens, this adaptor will wait until some of the currently executing futures complete, and the
//! current weight of running futures drops below the maximum weight, before scheduling new futures.
//!
//! Note that in some cases, the current weight may exceed the maximum weight. For example:
//!
//! * Let's say the maximum weight is **24**, and the current weight is **20**.
//! * If the next future has weight **6**, then it will be scheduled and the current weight will become **26**.
//! * No new futures will be scheduled until the current weight falls to **23** or below.
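//!
//! The scenario above can be sketched as a doctest. This is a minimal illustration rather than a
//! definitive test of the crate: the channel names are made up for the example, the trailing
//! weight-1 future is an addition to the scenario, and `futures::poll!` is used only to drive the
//! adaptor a single step so that `current_weight()` can be inspected before anything completes.
//!
//! ```
//! # futures::executor::block_on(async {
//! use futures::{channel::oneshot, poll, stream, StreamExt as _};
//! use buffer_unordered_weighted::StreamExt as _;
//!
//! // Hypothetical channels: each receiver is a future that completes when its sender fires.
//! let (send_a, recv_a) = oneshot::channel();
//! let (send_b, recv_b) = oneshot::channel();
//! let (send_c, recv_c) = oneshot::channel();
//!
//! // Weights 20 and 6 against a maximum weight of 24, followed by a future of weight 1.
//! let stream_of_futures = stream::iter(vec![(20, recv_a), (6, recv_b), (1, recv_c)]);
//! let mut buffered = stream_of_futures.buffer_unordered_weighted(24);
//!
//! // The first poll schedules weight 20, then weight 6 (because 20 < 24), and stops there
//! // because 26 is not below 24. The weight-1 future stays in the stream.
//! assert!(poll!(buffered.next()).is_pending());
//! assert_eq!(buffered.current_weight(), 26);
//!
//! // Completing the weight-6 future brings the current weight down to 20.
//! send_b.send("b")?;
//! assert_eq!(buffered.next().await, Some(Ok("b")));
//! assert_eq!(buffered.current_weight(), 20);
//!
//! // 20 is below 24, so the next poll schedules the weight-1 future.
//! assert!(poll!(buffered.next()).is_pending());
//! assert_eq!(buffered.current_weight(), 21);
//!
//! send_a.send("a")?;
//! assert_eq!(buffered.next().await, Some(Ok("a")));
//! send_c.send("c")?;
//! assert_eq!(buffered.next().await, Some(Ok("c")));
//! assert_eq!(buffered.next().await, None);
//! assert_eq!(buffered.current_weight(), 0);
//! # Ok::<(), &'static str>(()) }).unwrap();
//! ```
//!
//! Scheduling only happens while the adaptor is being polled, so in this sketch the weight-1
//! future is picked up on the poll after the current weight drops to 20, not at the moment the
//! weight-6 future completes.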
//!
//! It is possible to have a variant of this adaptor which always stays below the limit and holds
//! the next future in abeyance; however, the implementation for that variant is a bit more
//! complicated, and is also not the behavior desired by nextest. This variant may be provided in
//! the future.
//!
//! The weight of a future can be zero, in which case it doesn't count towards the maximum weight.
//!
//! If all weights are 1, then `buffer_unordered_weighted` is exactly the same as `buffer_unordered`.
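//!
//! The last two points can be checked with a similar sketch. Here `future::pending` stands in for
//! work that never finishes, so the only thing that varies is how many futures get scheduled. The
//! stream contents and the limits are arbitrary choices made for illustration.
//!
//! ```
//! # futures::executor::block_on(async {
//! use futures::{future, poll, stream, Stream as _, StreamExt as _};
//! use buffer_unordered_weighted::StreamExt as _;
//!
//! // Zero-weight futures never add to the current weight, so all three are scheduled even
//! // though the maximum weight is only 1.
//! let zeros = stream::iter((0..3).map(|_| (0_usize, future::pending::<()>())));
//! let mut buffered = zeros.buffer_unordered_weighted(1);
//! assert!(poll!(buffered.next()).is_pending());
//! assert_eq!(buffered.current_weight(), 0);
//! // The source stream is exhausted, so the size hint counts only the three in-progress futures.
//! assert_eq!(buffered.size_hint(), (3, Some(3)));
//!
//! // With every weight set to 1, a maximum weight of 2 admits two futures at a time, which is
//! // what `buffer_unordered(2)` would do.
//! let ones = stream::iter((0..3).map(|_| (1_usize, future::pending::<()>())));
//! let mut buffered = ones.buffer_unordered_weighted(2);
//! assert!(poll!(buffered.next()).is_pending());
//! assert_eq!(buffered.current_weight(), 2);
//! # });
//! ```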
//!
//! # Examples
//!
//! ```
//! # futures::executor::block_on(async {
//! use futures::{channel::oneshot, stream, StreamExt as _};
//! use buffer_unordered_weighted::{StreamExt as _};
//!
//! let (send_one, recv_one) = oneshot::channel();
//! let (send_two, recv_two) = oneshot::channel();
//!
//! let stream_of_futures = stream::iter(vec![(1, recv_one), (2, recv_two)]);
//! let mut buffered = stream_of_futures.buffer_unordered_weighted(10);
//!
//! send_two.send("hello")?;
//! assert_eq!(buffered.next().await, Some(Ok("hello")));
//!
//! send_one.send("world")?;
//! assert_eq!(buffered.next().await, Some(Ok("world")));
//!
//! assert_eq!(buffered.next().await, None);
//! # Ok::<(), &'static str>(()) }).unwrap();
//! ```
//!
//! # Minimum supported Rust version (MSRV)
//!
//! The minimum supported Rust version is **Rust 1.56.**
//!
//! The MSRV will likely not change in the medium term, but while this crate is a pre-release
//! (0.x.x) it may have its MSRV bumped in a patch release. Once this crate has reached 1.x, any
//! MSRV bump will be accompanied by a new minor version.
//!

use futures_util::{
    stream::{Fuse, FuturesUnordered},
    Future, Stream, StreamExt as _,
};
use pin_project_lite::pin_project;
use private::WeightedFuture;
use std::{
    fmt,
    pin::Pin,
    task::{Context, Poll},
};

impl<T: ?Sized> StreamExt for T where T: Stream {}

/// An extension trait for `Stream`s that provides
/// [`buffer_unordered_weighted`](StreamExt::buffer_unordered_weighted).
pub trait StreamExt: Stream {
    /// An adaptor for creating a buffered list of pending futures (unordered), where
    /// each future can have a different weight.
    ///
    /// This stream must return values of type `(usize, impl Future)`, where the `usize` indicates
    /// the weight of each future. This adaptor will buffer futures up to weight `max_weight`, and
    /// then return the outputs in the order in which they complete.
    ///
    /// The weight may be exceeded if the last future to be queued has a weight greater than
    /// `max_weight` minus the total weight of currently executing futures. However, no further
    /// futures will be queued until the total weight of running futures falls below `max_weight`.
    ///
    /// The adaptor will schedule futures in the order they're returned by the stream, without doing
    /// any reordering based on weight.
    ///
    /// The weight of a future can be zero, in which case it will not count towards the total weight.
    ///
    /// The returned stream will be a stream of each future's output.
    ///
    /// # Examples
    ///
    /// See [the crate documentation](crate#examples) for an example.
    fn buffer_unordered_weighted<Fut>(self, max_weight: usize) -> BufferUnorderedWeighted<Self>
    where
        Self: Sized + Stream<Item = (usize, Fut)>,
        Fut: Future,
    {
        assert_stream::<Fut::Output, _>(BufferUnorderedWeighted::new(self, max_weight))
    }
}

pin_project! {
    /// Stream for the [`buffer_unordered_weighted`](StreamExt::buffer_unordered_weighted) method.
    #[must_use = "streams do nothing unless polled"]
    pub struct BufferUnorderedWeighted<St>
    where
        St: Stream,
        St::Item: WeightedFuture,
    {
        #[pin]
        stream: Fuse<St>,
        in_progress_queue: FuturesUnordered<FutureWithWeight<<St::Item as WeightedFuture>::Future>>,
        max_weight: usize,
        current_weight: usize,
    }
}

impl<St> fmt::Debug for BufferUnorderedWeighted<St>
where
    St: Stream + fmt::Debug,
    St::Item: WeightedFuture,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("BufferUnorderedWeighted")
            .field("stream", &self.stream)
            .field("in_progress_queue", &self.in_progress_queue)
            .field("max_weight", &self.max_weight)
            .field("current_weight", &self.current_weight)
            .finish()
    }
}

impl<St> BufferUnorderedWeighted<St>
where
    St: Stream,
    St::Item: WeightedFuture,
{
    pub(crate) fn new(stream: St, max_weight: usize) -> Self {
        Self {
            stream: stream.fuse(),
            in_progress_queue: FuturesUnordered::new(),
            max_weight,
            current_weight: 0,
        }
    }

    /// Returns the maximum weight of futures allowed to be run by this adaptor.
    pub fn max_weight(&self) -> usize {
        self.max_weight
    }

    /// Returns the total weight of the futures that are currently running.
    pub fn current_weight(&self) -> usize {
        self.current_weight
    }

    /// Acquires a reference to the underlying stream that this combinator is
    /// pulling from.
    pub fn get_ref(&self) -> &St {
        self.stream.get_ref()
    }

    /// Acquires a mutable reference to the underlying stream that this
    /// combinator is pulling from.
    ///
    /// Note that care must be taken to avoid tampering with the state of the
    /// stream which may otherwise confuse this combinator.
    pub fn get_mut(&mut self) -> &mut St {
        self.stream.get_mut()
    }

    /// Acquires a pinned mutable reference to the underlying stream that this
    /// combinator is pulling from.
    ///
    /// Note that care must be taken to avoid tampering with the state of the
    /// stream which may otherwise confuse this combinator.
    pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
        self.project().stream.get_pin_mut()
    }

    /// Consumes this combinator, returning the underlying stream.
    ///
    /// Note that this may discard intermediate state of this combinator, so
    /// care should be taken to avoid losing resources when this is called.
    pub fn into_inner(self) -> St {
        self.stream.into_inner()
    }
}

impl<St> Stream for BufferUnorderedWeighted<St>
where
    St: Stream,
    St::Item: WeightedFuture,
{
    type Item = <<St::Item as WeightedFuture>::Future as Future>::Output;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();

        // First up, try to spawn off as many futures as possible by filling up
        // our queue of futures. The weight check happens before a future is admitted,
        // so the last future admitted may push the current weight past the maximum.
        while *this.current_weight < *this.max_weight {
            match this.stream.as_mut().poll_next(cx) {
                Poll::Ready(Some(weighted_future)) => {
                    let (weight, future) = weighted_future.into_components();
                    *this.current_weight =
                        this.current_weight.checked_add(weight).unwrap_or_else(|| {
                            panic!(
                                "buffer_unordered_weighted: added weight {} to current {}, overflowed",
                                weight,
                                this.current_weight,
                            )
                        });
                    this.in_progress_queue
                        .push(FutureWithWeight::new(weight, future));
                }
                Poll::Ready(None) | Poll::Pending => break,
            }
        }

        // Attempt to pull the next value from the in_progress_queue
        match this.in_progress_queue.poll_next_unpin(cx) {
            Poll::Pending => return Poll::Pending,
            Poll::Ready(Some((weight, output))) => {
                *this.current_weight = this.current_weight.checked_sub(weight).unwrap_or_else(|| {
                    panic!(
                        "buffer_unordered_weighted: subtracted weight {} from current {}, underflowed",
                        weight,
                        this.current_weight,
                    )
                });
                return Poll::Ready(Some(output));
            }
            Poll::Ready(None) => {}
        }

        // If more values are still coming from the stream, we're not done yet
        if this.stream.is_done() {
            Poll::Ready(None)
        } else {
            Poll::Pending
        }
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        let queue_len = self.in_progress_queue.len();
        let (lower, upper) = self.stream.size_hint();
        let lower = lower.saturating_add(queue_len);
        let upper = match upper {
            Some(x) => x.checked_add(queue_len),
            None => None,
        };
        (lower, upper)
    }
}

mod private {
    use futures_util::Future;

    pub trait WeightedFuture {
        type Future: Future;

        fn into_components(self) -> (usize, Self::Future);
    }

    impl<Fut> WeightedFuture for (usize, Fut)
    where
        Fut: Future,
    {
        type Future = Fut;

        #[inline]
        fn into_components(self) -> (usize, Self::Future) {
            self
        }
    }
}

pin_project! {
    #[must_use = "futures do nothing unless polled"]
    struct FutureWithWeight<Fut> {
        #[pin]
        future: Fut,
        weight: usize,
    }
}

impl<Fut> FutureWithWeight<Fut> {
    pub fn new(weight: usize, future: Fut) -> Self {
        Self { future, weight }
    }
}

impl<Fut> Future for FutureWithWeight<Fut>
where
    Fut: Future,
{
    type Output = (usize, Fut::Output);
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();

        match this.future.poll(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(output) => Poll::Ready((*this.weight, output)),
        }
    }
}

pub(crate) fn assert_stream<T, S>(stream: S) -> S
where
    S: Stream<Item = T>,
{
    stream
}