buffer_unordered_weighted/
lib.rs

1// Copyright (c) The buffer-unordered-weighted Contributors
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! `buffer_unordered_weighted` is a variant of
5//! [`buffer_unordered`](https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html#method.buffer_unordered),
6//! where each future can be assigned a different weight.
7//!
8//! This crate is part of the [nextest organization](https://github.com/nextest-rs) on GitHub, and is
9//! designed to serve the needs of [cargo-nextest](https://nexte.st).
10//!
11//! # Motivation
12//!
13//! Async programming in Rust often uses an adaptor called
14//! [`buffer_unordered`](https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html#method.buffer_unordered):
15//! this adaptor takes a stream of futures[^1], and executes all the futures limited to a maximum
16//! amount of concurrency.
17//!
18//! * Futures are started in the order the stream returns them in.
19//! * Once started, futures are polled simultaneously, and completed future outputs are returned
20//!   in arbitrary order (hence the `unordered`).
21//!
22//! Common use cases for `buffer_unordered` include:
23//!
24//! * Sending network requests concurrently, but limiting the amount of concurrency to avoid
25//!   overwhelming the remote server.
26//! * Running tests with a tool like [cargo-nextest](https://nexte.st).
27//!
28//! `buffer_unordered` works well for many use cases. However, one issue with it is that it treats
29//! all futures as equally taxing: there's no way to say that some futures consume more resources
30//! than others. For nextest in particular, some tests can be much heavier than others, and fewer of
31//! those tests should be run simultaneously.
32//!
33//! [^1]: This adaptor takes a stream of futures for maximum generality. In practice this is often
34//!     an *iterator* of futures, converted over using
35//!     [`stream::iter`](https://docs.rs/futures/latest/futures/stream/fn.iter.html).
36//!
37//! # About this crate
38//!
39//! This crate provides an adaptor on streams called `buffer_unordered_weighted`, which can run
40//! several futures simultaneously, limiting the concurrency to a maximum *weight*.
41//!
42//! Rather than taking a stream of futures, this adaptor takes a stream of `(usize, future)` pairs,
43//! where the `usize` indicates the weight of each future. This adaptor will schedule and buffer
44//! futures to be run until the maximum weight is exceeded. Once that happens, this adaptor will
45//! wait until some of the currently executing futures complete, and the current weight of running
46//! futures drops below the maximum weight, before scheduling new futures.
47//!
48//! Note that in some cases, the current weight may exceed the maximum weight. For example:
49//!
50//! * Let's say the maximum weight is **24**, and the current weight is **20**.
51//! * If the next future has weight **6**, then it will be scheduled and the current weight will become **26**.
52//! * No new futures will be scheduled until the current weight falls to **23** or below.
53//!
54//! It is possible to have a variant of this adaptor which always stays below the limit and holds
55//! the next future in abeyance; however, the implementation for that variant is a bit more
56//! complicated, and is also not the behavior desired by nextest. This variant may be provided in
57//! the future.
58//!
59//! The weight of a future can be zero, in which case it doesn't count towards the maximum weight.
60//!
61//! If all weights are 1, then `buffer_unordered_weighted` is exactly the same as `buffer_unordered`.
62//!
63//! # Examples
64//!
65//! ```
66//! # futures::executor::block_on(async {
67//! use futures::{channel::oneshot, stream, StreamExt as _};
68//! use buffer_unordered_weighted::{StreamExt as _};
69//!
70//! let (send_one, recv_one) = oneshot::channel();
71//! let (send_two, recv_two) = oneshot::channel();
72//!
73//! let stream_of_futures = stream::iter(vec![(1, recv_one), (2, recv_two)]);
74//! let mut buffered = stream_of_futures.buffer_unordered_weighted(10);
75//!
76//! send_two.send("hello")?;
77//! assert_eq!(buffered.next().await, Some(Ok("hello")));
78//!
79//! send_one.send("world")?;
80//! assert_eq!(buffered.next().await, Some(Ok("world")));
81//!
82//! assert_eq!(buffered.next().await, None);
83//! # Ok::<(), &'static str>(()) }).unwrap();
84//! ```
85//!
86//! # Minimum supported Rust version (MSRV)
87//!
88//! The minimum supported Rust version is **Rust 1.56.**
89//!
90//! The MSRV will likely not change in the medium term, but while this crate is a pre-release
91//! (0.x.x) it may have its MSRV bumped in a patch release. Once this crate has reached 1.x, any
92//! MSRV bump will be accompanied with a new minor version.
93//!
94
95use futures_util::{
96    stream::{Fuse, FuturesUnordered},
97    Future, Stream, StreamExt as _,
98};
99use pin_project_lite::pin_project;
100use private::WeightedFuture;
101use std::{
102    fmt,
103    pin::Pin,
104    task::{Context, Poll},
105};
106
107impl<T: ?Sized> StreamExt for T where T: Stream {}
108
109/// An extension trait for `Stream`s that provides
110/// [`buffer_unordered_weighted`](StreamExt::buffer_unordered_weighted).
111pub trait StreamExt: Stream {
112    /// An adaptor for creating a buffered list of pending futures (unordered), where
113    /// each future has a different weight.
114    ///
115    /// This stream must return values of type `(usize, impl Future)`, where the `usize` indicates
116    /// the weight of each future. This adaptor will buffer futures up to weight `max_weight`, and
117    /// then return the outputs in the order in which they complete.
118    ///
119    /// The weight may be exceeded if the last future to be queued has a weight greater than
120    /// `max_weight` minus the total weight of currently executing futures. However, no further
121    /// futures will be queued until the total weights of running futures falls below `max_weight`.
122    ///
123    /// The adaptor will schedule futures in the order they're returned by the stream, without doing
124    /// any reordering based on weight.
125    ///
126    /// The weight of a future can be zero, in which case it will not count towards the total weight.
127    ///
128    /// The returned stream will be a stream of each future's output.
129    ///
130    /// # Examples
131    ///
132    /// See [the crate documentation](crate#examples) for an example.
133    fn buffer_unordered_weighted<Fut>(self, max_weight: usize) -> BufferUnorderedWeighted<Self>
134    where
135        Self: Sized + Stream<Item = (usize, Fut)>,
136        Fut: Future,
137    {
138        assert_stream::<Fut::Output, _>(BufferUnorderedWeighted::new(self, max_weight))
139    }
140}
141
142pin_project! {
143    /// Stream for the [`buffer_unordered_weighted`](StreamExt::buffer_unordered_weighted) method.
144    #[must_use = "streams do nothing unless polled"]
145    pub struct BufferUnorderedWeighted<St>
146    where
147        St: Stream,
148        St::Item: WeightedFuture,
149     {
150        #[pin]
151        stream: Fuse<St>,
152        in_progress_queue: FuturesUnordered<FutureWithWeight<<St::Item as WeightedFuture>::Future>>,
153        max_weight: usize,
154        current_weight: usize,
155    }
156}
157
158impl<St> fmt::Debug for BufferUnorderedWeighted<St>
159where
160    St: Stream + fmt::Debug,
161    St::Item: WeightedFuture,
162{
163    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
164        f.debug_struct("BufferUnorderedWeighted")
165            .field("stream", &self.stream)
166            .field("in_progress_queue", &self.in_progress_queue)
167            .field("max_weight", &self.max_weight)
168            .field("current_weight", &self.current_weight)
169            .finish()
170    }
171}
172
173impl<St> BufferUnorderedWeighted<St>
174where
175    St: Stream,
176    St::Item: WeightedFuture,
177{
178    pub(crate) fn new(stream: St, max_weight: usize) -> Self {
179        Self {
180            stream: stream.fuse(),
181            in_progress_queue: FuturesUnordered::new(),
182            max_weight,
183            current_weight: 0,
184        }
185    }
186
187    /// Returns the maximum weight of futures allowed to be run by this adaptor.
188    pub fn max_weight(&self) -> usize {
189        self.max_weight
190    }
191
192    /// Returns the currently running weight of futures.
193    pub fn current_weight(&self) -> usize {
194        self.current_weight
195    }
196
197    /// Acquires a reference to the underlying sink or stream that this combinator is
198    /// pulling from.
199    pub fn get_ref(&self) -> &St {
200        self.stream.get_ref()
201    }
202
203    /// Acquires a mutable reference to the underlying sink or stream that this
204    /// combinator is pulling from.
205    ///
206    /// Note that care must be taken to avoid tampering with the state of the
207    /// sink or stream which may otherwise confuse this combinator.
208    pub fn get_mut(&mut self) -> &mut St {
209        self.stream.get_mut()
210    }
211
212    /// Acquires a pinned mutable reference to the underlying sink or stream that this
213    /// combinator is pulling from.
214    ///
215    /// Note that care must be taken to avoid tampering with the state of the
216    /// sink or stream which may otherwise confuse this combinator.
217    pub fn get_pin_mut(self: Pin<&mut Self>) -> core::pin::Pin<&mut St> {
218        self.project().stream.get_pin_mut()
219    }
220
221    /// Consumes this combinator, returning the underlying sink or stream.
222    ///
223    /// Note that this may discard intermediate state of this combinator, so
224    /// care should be taken to avoid losing resources when this is called.
225    pub fn into_inner(self) -> St {
226        self.stream.into_inner()
227    }
228}
229
230impl<St> Stream for BufferUnorderedWeighted<St>
231where
232    St: Stream,
233    St::Item: WeightedFuture,
234{
235    type Item = <<St::Item as WeightedFuture>::Future as Future>::Output;
236
237    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
238        let mut this = self.project();
239
240        // First up, try to spawn off as many futures as possible by filling up
241        // our queue of futures.
242        while *this.current_weight < *this.max_weight {
243            match this.stream.as_mut().poll_next(cx) {
244                Poll::Ready(Some(weighted_future)) => {
245                    let (weight, future) = weighted_future.into_components();
246                    *this.current_weight =
247                        this.current_weight.checked_add(weight).unwrap_or_else(|| {
248                            panic!(
249                                "buffer_unordered_weighted: added weight {} to current {}, overflowed",
250                                weight,
251                                this.current_weight,
252                            )
253                        });
254                    this.in_progress_queue
255                        .push(FutureWithWeight::new(weight, future));
256                }
257                Poll::Ready(None) | Poll::Pending => break,
258            }
259        }
260
261        // Attempt to pull the next value from the in_progress_queue
262        match this.in_progress_queue.poll_next_unpin(cx) {
263            Poll::Pending => return Poll::Pending,
264            Poll::Ready(Some((weight, output))) => {
265                *this.current_weight = this.current_weight.checked_sub(weight).unwrap_or_else(|| {
266                    panic!(
267                        "buffer_unordered_weighted: subtracted weight {} from current {}, overflowed",
268                        weight,
269                        this.current_weight,
270                    )
271                });
272                return Poll::Ready(Some(output));
273            }
274            Poll::Ready(None) => {}
275        }
276
277        // If more values are still coming from the stream, we're not done yet
278        if this.stream.is_done() {
279            Poll::Ready(None)
280        } else {
281            Poll::Pending
282        }
283    }
284
285    fn size_hint(&self) -> (usize, Option<usize>) {
286        let queue_len = self.in_progress_queue.len();
287        let (lower, upper) = self.stream.size_hint();
288        let lower = lower.saturating_add(queue_len);
289        let upper = match upper {
290            Some(x) => x.checked_add(queue_len),
291            None => None,
292        };
293        (lower, upper)
294    }
295}
296
297mod private {
298    use futures_util::Future;
299
300    pub trait WeightedFuture {
301        type Future: Future;
302
303        fn into_components(self) -> (usize, Self::Future);
304    }
305
306    impl<Fut> WeightedFuture for (usize, Fut)
307    where
308        Fut: Future,
309    {
310        type Future = Fut;
311
312        #[inline]
313        fn into_components(self) -> (usize, Self::Future) {
314            self
315        }
316    }
317}
318
319pin_project! {
320    #[must_use = "futures do nothing unless polled"]
321    struct FutureWithWeight<Fut> {
322        #[pin]
323        future: Fut,
324        weight: usize,
325    }
326}
327
328impl<Fut> FutureWithWeight<Fut> {
329    pub fn new(weight: usize, future: Fut) -> Self {
330        Self { future, weight }
331    }
332}
333
334impl<Fut> Future for FutureWithWeight<Fut>
335where
336    Fut: Future,
337{
338    type Output = (usize, Fut::Output);
339    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
340        let this = self.project();
341
342        match this.future.poll(cx) {
343            Poll::Pending => Poll::Pending,
344            Poll::Ready(output) => Poll::Ready((*this.weight, output)),
345        }
346    }
347}
348
349pub(crate) fn assert_stream<T, S>(stream: S) -> S
350where
351    S: Stream<Item = T>,
352{
353    stream
354}