futures_concurrency/concurrent_stream/
limit.rs

1use pin_project::pin_project;
2
3use super::{ConcurrentStream, Consumer};
4use core::future::Future;
5use core::num::NonZeroUsize;
6use core::pin::Pin;
7
8/// A concurrent iterator that limits the amount of concurrency applied.
9///
10/// This `struct` is created by the [`limit`] method on [`ConcurrentStream`]. See its
11/// documentation for more.
12///
13/// [`limit`]: ConcurrentStream::limit
14/// [`ConcurrentStream`]: trait.ConcurrentStream.html
15#[derive(Debug)]
16pub struct Limit<CS: ConcurrentStream> {
17    inner: CS,
18    limit: Option<NonZeroUsize>,
19}
20
21impl<CS: ConcurrentStream> Limit<CS> {
22    pub(crate) fn new(inner: CS, limit: Option<NonZeroUsize>) -> Self {
23        Self { inner, limit }
24    }
25}
26
27impl<CS: ConcurrentStream> ConcurrentStream for Limit<CS> {
28    type Item = CS::Item;
29    type Future = CS::Future;
30
31    async fn drive<C>(self, consumer: C) -> C::Output
32    where
33        C: Consumer<Self::Item, Self::Future>,
34    {
35        self.inner.drive(LimitConsumer { inner: consumer }).await
36    }
37
38    // NOTE: this is the only interesting bit in this module. When a limit is
39    // set, this now starts using it.
40    fn concurrency_limit(&self) -> Option<NonZeroUsize> {
41        self.limit
42    }
43
44    fn size_hint(&self) -> (usize, Option<usize>) {
45        self.inner.size_hint()
46    }
47}
48
49#[pin_project]
50struct LimitConsumer<C> {
51    #[pin]
52    inner: C,
53}
54impl<C, Item, Fut> Consumer<Item, Fut> for LimitConsumer<C>
55where
56    Fut: Future<Output = Item>,
57    C: Consumer<Item, Fut>,
58{
59    type Output = C::Output;
60
61    async fn send(self: Pin<&mut Self>, future: Fut) -> super::ConsumerState {
62        let this = self.project();
63        this.inner.send(future).await
64    }
65
66    async fn progress(self: Pin<&mut Self>) -> super::ConsumerState {
67        let this = self.project();
68        this.inner.progress().await
69    }
70
71    async fn flush(self: Pin<&mut Self>) -> Self::Output {
72        let this = self.project();
73        this.inner.flush().await
74    }
75}