1use std::marker::Unpin;
2
3use futures::Stream;
4
5#[allow(unreachable_pub)]
6pub use self::limiter::{IntoLimiter, Limiter};
7
8#[cfg(feature = "leaky-bucket")]
9mod rate_limiter;
10
11mod limiter;
12
13impl<T: ?Sized> LimiterExt for T where T: Stream {}
14
15pub trait LimiterExt: Stream {
16 #[inline]
17 fn limiter<L>(self, l: L) -> IntoLimiter<Self, L>
18 where
19 Self: Sized + Stream + Unpin,
20 L: Limiter + Unpin,
21 {
22 assert_stream::<Self::Item, _>(IntoLimiter::new(self, l))
23 }
24
25 #[cfg(feature = "leaky-bucket")]
26 #[inline]
27 fn leaky_bucket_limiter(
28 self,
29 rate_limiter: leaky_bucket::RateLimiter,
30 ) -> IntoLimiter<Self, rate_limiter::LeakyBucketRateLimiter>
31 where
32 Self: Sized + Stream + Unpin,
33 {
34 let l = rate_limiter::LeakyBucketRateLimiter::new(rate_limiter);
35 assert_stream::<Self::Item, _>(IntoLimiter::new(self, l))
36 }
37
38 #[cfg(feature = "governor")]
39 #[inline]
40 fn governor_limiter<D, C, MW>(
41 self,
42 rate_limiter: &governor::RateLimiter<governor::state::NotKeyed, D, C, MW>,
43 ) -> governor::RatelimitedStream<Self, D, C, MW>
44 where
45 D: governor::state::DirectStateStore,
46 C: governor::clock::Clock + governor::clock::ReasonablyRealtime,
47 MW: governor::middleware::RateLimitingMiddleware<
48 C::Instant,
49 NegativeOutcome = governor::NotUntil<<C as governor::clock::Clock>::Instant>,
50 >,
51 Self: Sized + Stream + Unpin,
52 Self::Item: Unpin,
53 {
54 use governor::state::StreamRateLimitExt;
55 assert_stream::<Self::Item, _>(self.ratelimit_stream(rate_limiter))
56 }
57}
58
59#[inline]
62pub(crate) fn assert_stream<T, S>(stream: S) -> S
63where
64 S: Stream<Item = T>,
65{
66 stream
67}