1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
#![warn(missing_docs)]
#![crate_name = "streamtools"]
#![cfg_attr(all(doc, CHANNEL_NIGHTLY), feature(doc_auto_cfg))]

//! Additional stream combinators.
//!
//! ## Feature flags
//!
//! - `tokio-time`: Enables combinators which depend on the tokio crate and its time feature, in particular:
//!   - [`sample_by_duration`](crate::StreamTools::sample_by_duration)
//!   - [`sample_by_interval`](crate::StreamTools::sample_by_interval)
//! - `test-util`: Exposes utilities for testing streams, in particular:
//!   - [`delay_items`](crate::test_util::delay_items)
//!   - [`record_delay`](crate::StreamTools::record_delay)
#![doc(html_root_url = "https://docs.rs/streamtools/0.7.5/")]

use futures::{stream::Map, Stream};

mod fast_forward;
mod flatten_switch;
mod outer_waker;
mod sample;

#[cfg(feature = "tokio-time")]
mod throttle_last;

#[cfg(feature = "test-util")]
mod record_delay;

/// Utilities for testing streams
#[cfg(feature = "test-util")]
pub mod test_util;

pub use fast_forward::*;
pub use flatten_switch::*;
pub use sample::*;

#[cfg(feature = "tokio-time")]
pub use throttle_last::*;

#[cfg(feature = "test-util")]
pub use record_delay::*;

/// An extension trait for the [`Stream`] trait that provides a variety of
/// convenient combinator functions.
///
/// [`Stream`]: crate::Stream
/// [futures]: https://docs.rs/futures
/// [futures-StreamExt]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html
pub trait StreamTools: Stream {
    /// Fast-forwards to the latest value on the underlying stream by polling the underyling until it is [`Pending`].
    ///
    /// When the underlying stream terminates, this stream will yield the last value on the underlying
    /// stream, if it has not already been yielded.
    ///
    /// This behaves like a [`WatchStream`] but can be applied to arbitrary streams without requiring a channel.
    ///
    /// [`Pending`]: std::task::Poll#variant.Pending
    /// [`WatchStream`]: https://docs.rs/tokio-stream/latest/tokio_stream/wrappers/struct.WatchStream.html
    fn fast_forward(self) -> FastForward<Self>
    where
        Self: Sized,
    {
        let stream = FastForward::new(self);
        assert_stream::<Self::Item, _>(stream)
    }

    /// Flattens a stream of streams into just one continuous stream. Polls only the latest
    /// inner stream.
    ///
    /// The stream terminates when the outer stream terminates.
    ///
    /// This mirrors the behaviour of the [Switch](https://reactivex.io/documentation/operators/switch.html) operator in [ReactiveX](https://reactivex.io/).
    fn flatten_switch(self) -> FlattenSwitch<Self>
    where
        Self::Item: Stream,
        Self: Sized,
    {
        let stream = FlattenSwitch::new(self);
        assert_stream::<<Self::Item as Stream>::Item, _>(stream)
    }

    /// Maps a stream like [`StreamExt::map`] but flattens nested [Stream]s using [`flatten_switch`](Self::flatten_switch).
    ///
    /// [`StreamExt::map`]: futures::StreamExt
    fn flat_map_switch<U, F>(self, f: F) -> FlattenSwitch<Map<Self, F>>
    where
        F: FnMut(Self::Item) -> U,
        U: Stream,
        Self: Sized,
    {
        let stream = FlattenSwitch::new(futures::StreamExt::map(self, f));
        assert_stream::<U::Item, _>(stream)
    }

    /// Samples values from the stream when the sampler yields.
    ///
    /// The stream terminates when either the input stream or the sampler stream terminate.
    ///
    /// If no value was seen on the input stream when the sampler yields, nothing will be yielded.
    ///
    /// This mirrors the behaviour of the [Sample](https://reactivex.io/documentation/operators/sample.html) operator in [ReactiveX](https://reactivex.io/).
    fn sample<S: Stream>(self, sampler: S) -> Sample<Self, S>
    where
        Self: Sized,
    {
        let stream = Sample::new(self, sampler);
        assert_stream(stream)
    }

    /// Samples values from the stream at intervals of length `duration`. This is a convenience method which invokes [`sample_by_interval`](StreamTools::sample_by_interval).
    ///
    /// The stream terminates when the input stream terminates.
    ///
    /// Uses the default [`MissedTickBehavior`] to create an [`Interval`]. If another is needed, then configure it on an [`Interval`] and
    /// use [`sample_by_interval`](StreamTools::sample_by_interval) instead of this method.
    ///
    /// This mirrors the behaviour of the [Sample](https://reactivex.io/documentation/operators/sample.html) operator in [ReactiveX](https://reactivex.io/).
    ///
    /// [`MissedTickBehavior`]: https://docs.rs/tokio/latest/tokio/time/enum.MissedTickBehavior.html
    /// [`Interval`]: https://docs.rs/tokio/latest/tokio/time/struct.Interval.html
    #[cfg(feature = "tokio-time")]
    fn sample_by_duration(
        self,
        duration: std::time::Duration,
    ) -> Sample<Self, tokio_stream::wrappers::IntervalStream>
    where
        Self: Sized,
    {
        self.sample_by_interval(tokio::time::interval(duration))
    }

    /// Samples values from the stream at intervals. This is a convenience method which invokes [`sample`](StreamTools::sample).
    ///
    /// The stream terminates when the input stream terminates.
    ///
    /// This mirrors the behaviour of the [Sample](https://reactivex.io/documentation/operators/sample.html) operator in [ReactiveX](https://reactivex.io/).
    #[cfg(feature = "tokio-time")]
    fn sample_by_interval(
        self,
        interval: tokio::time::Interval,
    ) -> Sample<Self, tokio_stream::wrappers::IntervalStream>
    where
        Self: Sized,
    {
        let sampler = tokio_stream::wrappers::IntervalStream::new(interval);
        self.sample(sampler)
    }

    /// Throttles values from the stream at intervals of length `duration`, skipping all but the last value seen in each interval.
    ///
    /// Note that this behaves exactly the same as applying [`fast_forward`] followed by tokio's [`throttle`].
    ///
    /// The stream terminates after the input stream terminates and any pending timeout expires for the throttling.
    ///
    /// [`fast_forward`]: Self::fast_forward
    /// [`throttle`]: https://docs.rs/tokio-stream/latest/tokio_stream/trait.StreamExt.html#method.throttle
    #[cfg(feature = "tokio-time")]
    fn throttle_last<'a>(self, duration: std::time::Duration) -> ThrottleLast<Self>
    where
        Self: Sized + Send + 'a,
    {
        ThrottleLast::new(duration, self)
    }

    /// Records the duration relative to the time this method was called at which each
    /// item is yielded from the stream.
    #[cfg(feature = "test-util")]
    fn record_delay(self) -> RecordDelay<Self>
    where
        Self: Sized,
    {
        RecordDelay::new(self)
    }
}

impl<T: Stream> StreamTools for T {}

// Just a helper function to ensure the streams we're returning all have the
// right implementations.
pub(crate) fn assert_stream<T, S>(stream: S) -> S
where
    S: Stream<Item = T>,
{
    stream
}