futures_stream_ext/lib.rs
1// SPDX-FileCopyrightText: The futures-stream-ext authors
2// SPDX-License-Identifier: MPL-2.0
3
4//! Extensions of the [`Stream`] trait.
5
// Module and type names are repeated frequently because the code is split
// into many modules to keep the source files small. Types often share
// their name with their parent module.
9#![allow(clippy::module_name_repetitions)]
10// Repeating the type name in `Default::default()` expressions is not needed
11// as long as the context is obvious.
12#![allow(clippy::default_trait_access)]
13
14use std::num::NonZeroUsize;
15
16use futures_core::Stream;
17
18mod throttle;
19#[cfg(feature = "tokio")]
20pub use self::throttle::IntervalThrottler;
21pub use self::throttle::{Throttle, ThrottleIntervalConfig, Throttler};
22
/// Selects which edge of an interval acts as the trigger.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum IntervalEdge {
    /// The leading edge of the interval.
    Leading,
    /// The trailing edge of the interval.
    Trailing,
}
29
30/// Extension trait for [`Stream`]
31pub trait StreamExt: Stream {
32 /// Throttle an input stream
33 ///
34 /// The throttler defines the throttling strategy.
35 ///
36 /// The `poll_next_max_ready_count` argument controls how many _Ready_ items
37 /// are polled at once from the input stream during an invocation of
38 /// [`Stream::poll_next()`] before polling the throttler. This limit
39 /// ensures that input streams which are always ready are not polled forever.
40 /// A value of [`NonZeroUsize::MIN`] will poll the input stream only once and
41 /// could be used as a save default. Using a greater value to skip multiple
42 /// items at once will reduce the number of calls to the throttler and
43 /// improves the performance and efficiency.
44 fn throttle<T>(self, throttler: T, poll_next_max_ready_count: NonZeroUsize) -> Throttle<Self, T>
45 where
46 Self: Stream + Sized,
47 T: Throttler<Self::Item>,
48 {
49 Throttle::new(self, throttler, poll_next_max_ready_count)
50 }
51
52 /// Throttle an input stream by using a fixed interval
53 #[cfg(feature = "tokio")]
54 fn throttle_interval(
55 self,
56 config: ThrottleIntervalConfig,
57 poll_next_max_ready_count: std::num::NonZeroUsize,
58 ) -> Throttle<Self, IntervalThrottler<Self::Item>>
59 where
60 Self: Stream + Sized,
61 {
62 let throttler = IntervalThrottler::new(config);
63 self.throttle(throttler, poll_next_max_ready_count)
64 }
65}
66
67impl<S: Stream> StreamExt for S {}