futures_concurrency/concurrent_stream/
limit.rs1use pin_project::pin_project;
2
3use super::{ConcurrentStream, Consumer};
4use core::future::Future;
5use core::num::NonZeroUsize;
6use core::pin::Pin;
7
8#[derive(Debug)]
16pub struct Limit<CS: ConcurrentStream> {
17 inner: CS,
18 limit: Option<NonZeroUsize>,
19}
20
21impl<CS: ConcurrentStream> Limit<CS> {
22 pub(crate) fn new(inner: CS, limit: Option<NonZeroUsize>) -> Self {
23 Self { inner, limit }
24 }
25}
26
27impl<CS: ConcurrentStream> ConcurrentStream for Limit<CS> {
28 type Item = CS::Item;
29 type Future = CS::Future;
30
31 async fn drive<C>(self, consumer: C) -> C::Output
32 where
33 C: Consumer<Self::Item, Self::Future>,
34 {
35 self.inner.drive(LimitConsumer { inner: consumer }).await
36 }
37
38 fn concurrency_limit(&self) -> Option<NonZeroUsize> {
41 self.limit
42 }
43
44 fn size_hint(&self) -> (usize, Option<usize>) {
45 self.inner.size_hint()
46 }
47}
48
49#[pin_project]
50struct LimitConsumer<C> {
51 #[pin]
52 inner: C,
53}
54impl<C, Item, Fut> Consumer<Item, Fut> for LimitConsumer<C>
55where
56 Fut: Future<Output = Item>,
57 C: Consumer<Item, Fut>,
58{
59 type Output = C::Output;
60
61 async fn send(self: Pin<&mut Self>, future: Fut) -> super::ConsumerState {
62 let this = self.project();
63 this.inner.send(future).await
64 }
65
66 async fn progress(self: Pin<&mut Self>) -> super::ConsumerState {
67 let this = self.project();
68 this.inner.progress().await
69 }
70
71 async fn flush(self: Pin<&mut Self>) -> Self::Output {
72 let this = self.project();
73 this.inner.flush().await
74 }
75}