futures_stream_ext/
lib.rs

1// SPDX-FileCopyrightText: The futures-stream-ext authors
2// SPDX-License-Identifier: MPL-2.0
3
4//! Extensions of the [`Stream`] trait.
5
6// Repetitions of module/type names occur frequently when using many
7// modules for keeping the size of the source files handy. Often
8// types have the same name as their parent module.
9#![allow(clippy::module_name_repetitions)]
10// Repeating the type name in `Default::default()` expressions is not needed
11// as long as the context is obvious.
12#![allow(clippy::default_trait_access)]
13
14use std::num::NonZeroUsize;
15
16use futures_core::Stream;
17
18mod throttle;
19#[cfg(feature = "tokio")]
20pub use self::throttle::IntervalThrottler;
21pub use self::throttle::{Throttle, ThrottleIntervalConfig, Throttler};
22
23/// Interval edge trigger variants
24#[derive(Debug, Clone, Copy, PartialEq, Eq)]
25pub enum IntervalEdge {
26    Leading,
27    Trailing,
28}
29
30/// Extension trait for [`Stream`]
31pub trait StreamExt: Stream {
32    /// Throttle an input stream
33    ///
34    /// The throttler defines the throttling strategy.
35    ///
36    /// The `poll_next_max_ready_count` argument controls how many _Ready_ items
37    /// are polled at once from the input stream during an invocation of
38    /// [`Stream::poll_next()`] before polling the throttler. This limit
39    /// ensures that input streams which are always ready are not polled forever.
40    /// A value of [`NonZeroUsize::MIN`] will poll the input stream only once and
41    /// could be used as a save default. Using a greater value to skip multiple
42    /// items at once will reduce the number of calls to the throttler and
43    /// improves the performance and efficiency.
44    fn throttle<T>(self, throttler: T, poll_next_max_ready_count: NonZeroUsize) -> Throttle<Self, T>
45    where
46        Self: Stream + Sized,
47        T: Throttler<Self::Item>,
48    {
49        Throttle::new(self, throttler, poll_next_max_ready_count)
50    }
51
52    /// Throttle an input stream by using a fixed interval
53    #[cfg(feature = "tokio")]
54    fn throttle_interval(
55        self,
56        config: ThrottleIntervalConfig,
57        poll_next_max_ready_count: std::num::NonZeroUsize,
58    ) -> Throttle<Self, IntervalThrottler<Self::Item>>
59    where
60        Self: Stream + Sized,
61    {
62        let throttler = IntervalThrottler::new(config);
63        self.throttle(throttler, poll_next_max_ready_count)
64    }
65}
66
67impl<S: Stream> StreamExt for S {}