futures_ratelimit/
unordered.rs

1//! Bounded replacements for `FuturesUnordered`.
2
3use std::borrow::BorrowMut;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7use futures::stream::{FuturesUnordered, Stream};
8use futures::Future;
9use pin_project::pin_project;
10
11use crate::common::Passthrough;
12
13/// Bounded FuturesUnordered from an iterator.
14///
15/// This struct consumes an iterator where each item
16/// is a future. By using an iterator, we need no additional state,
17/// or additional queues to store the excess of futures.
18///
19/// This should be always preferred over `FuturesUnorderedBounded`,
20/// as it's more performant and adds basically ZERO overhead on top
21/// of the normal `FuturesUnordered`.
22#[pin_project]
23pub struct FuturesUnorderedIter<T, F>
24where
25    F: Future,
26    T: Iterator<Item = F>,
27{
28    max_concurrent: usize,
29    tasks: T,
30    #[pin]
31    running_tasks: FuturesUnordered<F>,
32}
33
34/// Drop-in replacement for `FuturesUnordered`.
35///
36/// This struct behaves exactly like the normal `FuturesUnordered`,
37/// and allows a 100.0% drop-in replacement for `FuturesUnordered`.
38///
39/// Unlike `FuturesUnorderedIter`, this struct allows you to continuously
40/// push futures into it, but it'll only poll N futures at any given time.
41/// This struct internally uses a `Vec` to store the excess of futures.
42#[pin_project]
43pub struct FuturesUnorderedBounded<F>
44where
45    F: Future,
46{
47    max_concurrent: usize,
48    queued_tasks: Vec<F>,
49    #[pin]
50    running_tasks: FuturesUnordered<F>,
51}
52
53impl<T, F> Passthrough<F> for FuturesUnorderedIter<T, F>
54where
55    T: Iterator<Item = F>,
56    F: Future,
57{
58    type FuturesHolder = FuturesUnordered<F>;
59
60    fn set_max_concurrent(&mut self, max_concurrent: usize) {
61        self.max_concurrent = max_concurrent;
62    }
63
64    fn borrow_inner(&self) -> &FuturesUnordered<F> {
65        &self.running_tasks
66    }
67
68    fn borrow_mut_inner(&mut self) -> &mut FuturesUnordered<F> {
69        self.running_tasks.borrow_mut()
70    }
71
72    fn into_inner(self) -> FuturesUnordered<F>
73    where
74        Self: Sized,
75    {
76        self.running_tasks
77    }
78}
79
80impl<F> Passthrough<F> for FuturesUnorderedBounded<F>
81where
82    F: Future,
83{
84    type FuturesHolder = FuturesUnordered<F>;
85
86    fn set_max_concurrent(&mut self, max_concurrent: usize) {
87        self.max_concurrent = max_concurrent;
88    }
89
90    fn borrow_inner(&self) -> &FuturesUnordered<F> {
91        &self.running_tasks
92    }
93
94    fn borrow_mut_inner(&mut self) -> &mut FuturesUnordered<F> {
95        self.running_tasks.borrow_mut()
96    }
97
98    fn into_inner(self) -> FuturesUnordered<F>
99    where
100        Self: Sized,
101    {
102        self.running_tasks
103    }
104}
105
106impl<T, F> Stream for FuturesUnorderedIter<T, F>
107where
108    T: Iterator<Item = F>,
109    F: Future,
110{
111    type Item = F::Output;
112
113    /// Call the `poll()` method on the underlying `FuturesUnordered`.
114    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
115        let mut this = self.project();
116        match this.running_tasks.as_mut().poll_next(cx) {
117            Poll::Ready(Some(value)) => {
118                while this.running_tasks.len() < *this.max_concurrent {
119                    match this.tasks.next() {
120                        Some(future) => this.running_tasks.push(future),
121                        _ => break,
122                    }
123                }
124                Poll::Ready(Some(value))
125            }
126            Poll::Ready(None) => Poll::Ready(None),
127            Poll::Pending => Poll::Pending,
128        }
129    }
130}
131
132impl<F> futures::stream::Stream for FuturesUnorderedBounded<F>
133where
134    F: Future,
135{
136    type Item = F::Output;
137
138    /// Calls the `poll()` method on the underlying `FuturesUnordered`.
139    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
140        let mut this = self.project();
141        match this.running_tasks.as_mut().poll_next(cx) {
142            Poll::Ready(Some(value)) => {
143                while this.running_tasks.len() < *this.max_concurrent {
144                    match this.queued_tasks.pop() {
145                        Some(future) => this.running_tasks.push(future),
146                        _ => break,
147                    }
148                }
149                Poll::Ready(Some(value))
150            }
151            Poll::Ready(None) => Poll::Ready(None),
152            Poll::Pending => Poll::Pending,
153        }
154    }
155}
156
157impl<T, F> FuturesUnorderedIter<T, F>
158where
159    F: Future,
160    T: Iterator<Item = F>,
161{
162    /// Creates a new bounded `FuturesUnordered` from an iterator.
163    /// This option is as efficient as it gets since it completely avoids
164    /// allocating a new vector to store the excess of futures.
165    ///
166    /// Since iterators are lazily evaluated, futures will be created on the
167    /// fly as well.
168    ///
169    /// Panics if `max_concurrent` is 0.
170    /// ```rust
171    /// use futures_ratelimit::unordered::FuturesUnorderedIter;
172    /// use futures_ratelimit::common::Passthrough;
173    /// use futures::StreamExt;
174    ///
175    /// async fn dummy() -> u64{
176    ///     42
177    /// }
178    ///
179    /// let tasks = (0..100).into_iter().map(|_| dummy());
180    /// let mut fut_unordered = FuturesUnorderedIter::new(5, tasks);
181    ///
182    /// // The internal FuturesUnorderedBounded will have 5 futures at most.
183    /// // The rest of futures will be pulled from the iterator as needed.
184    ///
185    /// tokio_test::block_on(async move{
186    ///     assert_eq!(fut_unordered.borrow_inner().len(), 5);
187    ///
188    ///     while let Some(value) = fut_unordered.next().await {
189    ///         println!("{}", value);
190    ///         assert!(fut_unordered.borrow_inner().len() <= 5);
191    ///     }
192    ///
193    ///     assert_eq!(fut_unordered.borrow_inner().len(), 0);
194    ///     assert!(fut_unordered.borrow_inner().is_empty());
195    ///
196    /// });
197    ///
198    /// ```
199    pub fn new<I: IntoIterator<IntoIter = T>>(max_concurrent: usize, tasks: I) -> Self {
200        let running_tasks = FuturesUnordered::new();
201        assert!(max_concurrent > 0, "max_concurrent must be greater than 0");
202        let mut tasks = tasks.into_iter();
203        // Immediately put futures in the queue
204        tasks
205            .borrow_mut()
206            .take(max_concurrent)
207            .for_each(|future| running_tasks.push(future));
208
209        Self {
210            max_concurrent,
211            tasks,
212            running_tasks,
213        }
214    }
215}
216
217impl<F> FuturesUnorderedBounded<F>
218where
219    F: Future,
220{
221    /// Creates a new bounded `FuturesUnordered`. This struct
222    /// is 100% compatible with the normal `FuturesUnordered`.
223    ///
224    /// Panics if `max_concurrent` is 0.
225    /// ```rust
226    /// use futures_ratelimit::unordered::FuturesUnorderedBounded;
227    /// use futures_ratelimit::common::Passthrough;
228    /// use futures::StreamExt;
229    ///
230    /// async fn dummy() -> u64{
231    ///     42
232    /// }
233    ///
234    /// let mut fut_unordered = FuturesUnorderedBounded::new(5);
235    /// for _ in 0..10 {
236    ///     fut_unordered.push(dummy());
237    /// }
238    /// // The internal FuturesUnorderedBounded will have 5 futures at most.
239    /// // All other remaining futures will be stored in the internal queue,
240    /// // and will be consumed as needed.
241    ///
242    /// tokio_test::block_on(async move{
243    ///     assert_eq!(fut_unordered.borrow_inner().len(), 5);
244    ///
245    ///     while let Some(value) = fut_unordered.next().await {
246    ///         println!("{}", value);
247    ///         assert!(fut_unordered.borrow_inner().len() <= 5);
248    ///     }
249    ///
250    ///     assert_eq!(fut_unordered.borrow_inner().len(), 0);
251    ///     assert!(fut_unordered.borrow_inner().is_empty());
252    ///
253    /// });
254    ///
255    /// ```
256    pub fn new(max_concurrent: usize) -> Self {
257        let running_tasks = FuturesUnordered::new();
258        assert!(max_concurrent > 0, "max_concurrent must be greater than 0");
259        Self {
260            max_concurrent,
261            queued_tasks: Vec::new(),
262            running_tasks,
263        }
264    }
265
266    /// Enqueues a new future into the `FuturesUnorderedBounded`.
267    ///
268    /// If more than `max_concurrent` futures are already in the
269    /// internal queue, the excess of futures will be stored in another
270    /// queue and will only get pulled as needed.
271    pub fn push(&mut self, fut: F) {
272        match self.borrow_inner().len() < self.max_concurrent {
273            true => self.borrow_mut_inner().push(fut),
274            false => self.queued_tasks.push(fut),
275        }
276    }
277
278    /// Mutably borrow the internal queue task list.
279    pub fn borrow_mut_queue(&mut self) -> &mut Vec<F> {
280        &mut self.queued_tasks
281    }
282
283    /// Borrow the internal queue task list.
284    pub fn borrow_queue(&self) -> &Vec<F> {
285        &self.queued_tasks
286    }
287
288    /// Clear the internal queue.
289    /// This will clear both the queue used to store the
290    /// excess of futures and the internal queue as well.
291    pub fn clear_queue(&mut self) {
292        self.queued_tasks.clear();
293        self.borrow_mut_inner().clear();
294    }
295
296    /// Return the length of the inner queue.
297    pub fn len_queue(&self) -> usize {
298        self.queued_tasks.len()
299    }
300}
301
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicU8, Ordering};
    use std::sync::Arc;
    use std::time::Duration;

    use futures::StreamExt;

    use super::*;

    /// Builds 100 futures resolving to `0, 1, ..., 99`, in that order.
    fn create_tasks() -> Vec<impl Future<Output = u64>> {
        const MAX_FUTURES: u64 = 100;
        (0..MAX_FUTURES).map(dummy).collect::<Vec<_>>()
    }

    /// Resolves immediately to the value it was given.
    async fn dummy(val: u64) -> u64 {
        val
    }

    /// Bumps the shared counter, records what it reads right afterwards,
    /// sleeps briefly, then decrements the counter again.
    ///
    /// The returned reading tells how many of these futures were in flight
    /// when this one started, which lets the tests check the limit.
    async fn dummy_checked(val: Arc<AtomicU8>) -> u8 {
        val.fetch_add(1, Ordering::Acquire);
        let in_flight = val.load(Ordering::Relaxed);
        tokio::time::sleep(Duration::from_nanos(100)).await;
        val.fetch_sub(1, Ordering::Release);
        in_flight
    }

    #[tokio::test]
    async fn test_futures_unordered_iter() {
        let max_concurrent: u64 = 10;
        let mut fut_iter = FuturesUnorderedIter::new(max_concurrent as usize, create_tasks());

        let mut expected = 0;
        while let Some(result) = fut_iter.next().await {
            assert_eq!(result, expected);
            expected += 1;
            // The inner set must never hold more than `max_concurrent` futures.
            assert!(fut_iter.borrow_inner().len() <= max_concurrent as usize);
        }
    }

    #[tokio::test]
    async fn test_futures_unordered() {
        let mut fut_iter = FuturesUnorderedBounded::new(10);
        for i in 0..100u64 {
            fut_iter.push(dummy(i));
        }

        // Poll exactly once.
        let first = fut_iter.next().await;
        assert!(first.is_some());
        // One future finished, so one queued future was moved over.
        assert_eq!(fut_iter.len_queue(), 89);
        // The running set is back at the limit.
        assert_eq!(fut_iter.borrow_inner().len(), 10);

        let _second = fut_iter.next().await;
        // Two futures have finished by now.
        assert_eq!(fut_iter.len_queue(), 88);
        // The running set is (still) at the limit.
        assert_eq!(fut_iter.borrow_inner().len(), 10);

        // Clearing drops both the queued and the running futures.
        fut_iter.clear_queue();
        assert_eq!(fut_iter.len_queue(), 0);
        assert!(fut_iter.borrow_inner().is_empty());
        assert_eq!(fut_iter.borrow_inner().len(), 0);

        let after_clear = fut_iter.next().await;
        assert!(after_clear.is_none());

        // Pushing after the stream ended fills the running set first.
        for i in 0..100u64 {
            fut_iter.push(dummy(i));
        }
        assert_eq!(fut_iter.len_queue(), 90);
        assert_eq!(fut_iter.borrow_inner().len(), 10);

        let resumed = fut_iter.next().await;
        assert!(resumed.is_some());
        // One future finished, so one queued future was moved over.
        assert_eq!(fut_iter.len_queue(), 89);
        assert_eq!(fut_iter.borrow_inner().len(), 10);

        // Queue accessors.
        assert_eq!(fut_iter.borrow_queue().len(), 89);
        assert_eq!(fut_iter.borrow_mut_queue().len(), 89);

        assert_eq!(fut_iter.into_inner().len(), 10);
    }

    #[tokio::test]
    async fn test_max_concurrent() {
        // Iterator-backed variant.
        let test_value = Arc::new(AtomicU8::new(0));
        let tasks = (0..50)
            .map(|_| dummy_checked(Arc::clone(&test_value)))
            .collect::<Vec<_>>();

        let mut fut_iter = FuturesUnorderedIter::new(10, tasks);
        let mut max_so_far = 0;
        let mut count = 0;
        while let Some(max_res) = fut_iter.next().await {
            max_so_far = max_so_far.max(max_res);
            // Stop before all futures are consumed: the limit is changed
            // while the stream is still running.
            if count > 10 {
                fut_iter.set_max_concurrent(20);
                break;
            }
            count += 1;
        }
        assert_eq!(max_so_far, 10);
        // The raised limit must be reached by the remaining futures.
        let max = fut_iter.collect::<Vec<_>>().await.into_iter().max();
        assert_eq!(max, Some(20));

        // Push-based variant.
        let test_value = Arc::new(AtomicU8::new(0));
        let mut fut_iter = FuturesUnorderedBounded::new(10);
        for _ in 0..50 {
            fut_iter.push(dummy_checked(Arc::clone(&test_value)));
        }
        let mut max_so_far = 0;
        while let Some(max_res) = fut_iter.next().await {
            max_so_far = max_so_far.max(max_res);
        }
        assert_eq!(max_so_far, 10);

        // Raise the limit and run a second batch.
        fut_iter.set_max_concurrent(20);
        for _ in 0..50 {
            fut_iter.push(dummy_checked(Arc::clone(&test_value)));
        }
        while let Some(max_res) = fut_iter.next().await {
            max_so_far = max_so_far.max(max_res);
        }
        assert_eq!(max_so_far, 20);
    }

    #[tokio::test]
    async fn test_iter_inner() {
        let max_concurrent: u64 = 10;
        let tasks = (0..100).map(dummy).collect::<Vec<_>>();

        let mut fut_iter = FuturesUnorderedIter::new(max_concurrent as usize, tasks);

        assert_eq!(fut_iter.borrow_inner().len(), 10);
        assert!(!fut_iter.borrow_inner().is_empty());
        {
            let inner = fut_iter.borrow_inner();
            assert_eq!(inner.len(), 10);
        }
        {
            let inner = fut_iter.borrow_mut_inner();
            assert_eq!(inner.len(), 10);
        }

        // Pushing into the inner set directly bypasses the limit.
        fut_iter.borrow_mut_inner().push(dummy(123));
        assert_eq!(fut_iter.borrow_inner().len(), 11);

        // Clearing the inner set directly.
        fut_iter.borrow_mut_inner().clear();
        assert_eq!(fut_iter.borrow_inner().len(), 0);

        let inner = fut_iter.into_inner();
        assert_eq!(inner.len(), 0);
    }
}
474}