stream_ext/
lib.rs

1use std::marker::Unpin;
2
3use futures::Stream;
4
5#[allow(unreachable_pub)]
6pub use self::limiter::{IntoLimiter, Limiter};
7
8#[cfg(feature = "leaky-bucket")]
9mod rate_limiter;
10
11mod limiter;
12
13impl<T: ?Sized> LimiterExt for T where T: Stream {}
14
15pub trait LimiterExt: Stream {
16    #[inline]
17    fn limiter<L>(self, l: L) -> IntoLimiter<Self, L>
18    where
19        Self: Sized + Stream + Unpin,
20        L: Limiter + Unpin,
21    {
22        assert_stream::<Self::Item, _>(IntoLimiter::new(self, l))
23    }
24
25    #[cfg(feature = "leaky-bucket")]
26    #[inline]
27    fn leaky_bucket_limiter(
28        self,
29        rate_limiter: leaky_bucket::RateLimiter,
30    ) -> IntoLimiter<Self, rate_limiter::LeakyBucketRateLimiter>
31    where
32        Self: Sized + Stream + Unpin,
33    {
34        let l = rate_limiter::LeakyBucketRateLimiter::new(rate_limiter);
35        assert_stream::<Self::Item, _>(IntoLimiter::new(self, l))
36    }
37
38    #[cfg(feature = "governor")]
39    #[inline]
40    fn governor_limiter<D, C, MW>(
41        self,
42        rate_limiter: &governor::RateLimiter<governor::state::NotKeyed, D, C, MW>,
43    ) -> governor::RatelimitedStream<Self, D, C, MW>
44    where
45        D: governor::state::DirectStateStore,
46        C: governor::clock::Clock + governor::clock::ReasonablyRealtime,
47        MW: governor::middleware::RateLimitingMiddleware<
48            C::Instant,
49            NegativeOutcome = governor::NotUntil<<C as governor::clock::Clock>::Instant>,
50        >,
51        Self: Sized + Stream + Unpin,
52        Self::Item: Unpin,
53    {
54        use governor::state::StreamRateLimitExt;
55        assert_stream::<Self::Item, _>(self.ratelimit_stream(rate_limiter))
56    }
57}
58
59// Just a helper function to ensure the streams we're returning all have the
60// right implementations.
61#[inline]
62pub(crate) fn assert_stream<T, S>(stream: S) -> S
63where
64    S: Stream<Item = T>,
65{
66    stream
67}