futures_time/stream/
stream_ext.rs

1use crate::channel::Parker;
2use crate::future::{IntoFuture, Timer};
3
4use futures_core::Stream;
5
6use super::{Buffer, Debounce, Delay, IntoStream, Park, Sample, Throttle, Timeout};
7
8/// Extend `Stream` with time-based operations.
9pub trait StreamExt: Stream {
10    /// Yield the last item received at the end of each interval.
11    ///
12    /// If no items have been received during an interval, the stream will not
13    /// yield any items. In addition to using a time-based interval, this method can take any
14    /// stream as a source. This enables throttling based on alternative event
15    /// sources, such as variable-rate timers.
16    ///
17    /// See also [`throttle()`] and [`debounce()`].
18    ///
19    /// [`throttle()`]: StreamExt::throttle
20    /// [`debounce()`]: `StreamExt::debounce`
21    ///
22    /// # Data Loss
23    ///
24    /// This method will discard data between intervals. Though the
25    /// discarded items will have their destuctors run, __using this method
26    /// incorrectly may lead to unintended data loss__. This method is best used
27    /// to reduce the number of _duplicate_ items after the first has been
28    /// received, such as repeated mouse clicks or key presses. This method may
29    /// lead to unintended data loss when used to discard _unique_ items, such
30    /// as network request.
31    ///
32    /// # Example
33    ///
34    /// ```
35    /// use futures_lite::prelude::*;
36    /// use futures_time::prelude::*;
37    /// use futures_time::time::{Instant, Duration};
38    /// use futures_time::stream;
39    ///
40    /// async_io::block_on(async {
41    ///     let mut counter = 0;
42    ///     stream::interval(Duration::from_millis(100))
43    ///         .take(4)
44    ///         .sample(Duration::from_millis(200))
45    ///         .for_each(|_| counter += 1)
46    ///         .await;
47    ///
48    ///     assert_eq!(counter, 2);
49    /// })
50    /// ```
51    fn sample<I>(self, interval: I) -> Sample<Self, I::IntoStream>
52    where
53        Self: Sized,
54        I: IntoStream,
55    {
56        Sample::new(self, interval.into_stream())
57    }
58
59    /// Group items into vectors which are yielded at every interval.
60    ///
61    /// In addition to using a time source as a deadline, any stream can be used as a
62    /// deadline too. This enables more interesting buffer strategies to be
63    /// built on top of this primitive.
64    ///
65    /// # Future Improvements
66    ///
67    /// - Lending iterators would allow for internal reusing of the buffer. Though different from
68    ///   `Iterator::windows`, it could be more efficient.
69    /// - Contexts/capabilities would enable custom allocators to be used.
70    ///
71    /// # Example
72    ///
73    /// ```
74    /// use futures_lite::prelude::*;
75    /// use futures_time::prelude::*;
76    /// use futures_time::time::{Instant, Duration};
77    /// use futures_time::stream;
78    ///
79    /// async_io::block_on(async {
80    ///     let mut counter = 0;
81    ///     stream::interval(Duration::from_millis(5))
82    ///         .take(10)
83    ///         .buffer(Duration::from_millis(20))
84    ///         .for_each(|buf| counter += buf.len())
85    ///         .await;
86    ///
87    ///     assert_eq!(counter, 10);
88    /// })
89    /// ```
90    fn buffer<I>(self, interval: I) -> Buffer<Self, I::IntoStream>
91    where
92        Self: Sized,
93        I: IntoStream,
94    {
95        Buffer::new(self, interval.into_stream())
96    }
97
98    /// Yield the last item received at the end of a window which resets with
99    /// each item received.
100    ///
101    /// Every time an item is yielded by the underlying stream, the window is
102    /// reset. Once the window expires, the last item seen will be yielded. This
103    /// means that in order to yield an item, no items must be received for the
104    /// entire window, or else the window will reset.
105    ///
106    /// This method is useful to perform actions at the end of bursts of events,
107    /// where performing that same action on _every_ event might not be
108    /// economical.
109    ///
110    /// See also [`sample()`] and [`throttle()`].
111    ///
112    /// [`sample()`]: `StreamExt::sample`
113    /// [`throttle()`]: `StreamExt::throttle`
114    ///
115    /// # Example
116    ///
117    /// ```
118    /// use futures_lite::prelude::*;
119    /// use futures_time::prelude::*;
120    /// use futures_time::time::{Instant, Duration};
121    /// use futures_time::stream;
122    ///
123    /// async_io::block_on(async {
124    ///     let mut counter = 0;
125    ///     stream::interval(Duration::from_millis(10))
126    ///         .take(10)
127    ///         .debounce(Duration::from_millis(20)) // the window is greater than the interval
128    ///         .for_each(|_| counter += 1)
129    ///         .await;
130    ///
131    ///     assert_eq!(counter, 1); // so only the last item is received
132    /// })
133    /// ```
134    fn debounce<D>(self, window: D) -> Debounce<Self, D::IntoFuture>
135    where
136        Self: Sized,
137        D: IntoFuture,
138        D::IntoFuture: Timer,
139    {
140        Debounce::new(self, window.into_future())
141    }
142
143    /// Delay the yielding of items from the stream until the given deadline.
144    ///
145    /// The underlying stream will not be polled until the deadline has expired. In addition
146    /// to using a time source as a deadline, any future can be used as a
147    /// deadline too. When used in combination with a multi-consumer channel,
148    /// this method can be used to synchronize the start of multiple streams and futures.
149    ///
150    /// # Example
151    ///
152    /// ```
153    /// use futures_lite::prelude::*;
154    /// use futures_time::prelude::*;
155    /// use futures_time::time::{Instant, Duration};
156    /// use futures_lite::stream;
157    ///
158    /// async_io::block_on(async {
159    ///     let now = Instant::now();
160    ///     let delay = Duration::from_millis(100);
161    ///     let _ = stream::once("meow").delay(delay).next().await;
162    ///     assert!(now.elapsed() >= *delay);
163    /// });
164    /// ```
165    fn delay<D>(self, deadline: D) -> Delay<Self, D::IntoFuture>
166    where
167        Self: Sized,
168        D: IntoFuture,
169    {
170        Delay::new(self, deadline.into_future())
171    }
172
173    /// Suspend or resume execution of a stream.
174    ///
175    /// When this method is called the execution of the stream will be put into
176    /// a suspended state until the channel returns `Parker::Unpark` or the
177    /// channel's senders are dropped. The underlying stream will not be polled
178    /// while the it is paused.
179    fn park<I>(self, interval: I) -> Park<Self, I::IntoStream>
180    where
181        Self: Sized,
182        I: IntoStream<Item = Parker>,
183    {
184        Park::new(self, interval.into_stream())
185    }
186
187    /// Yield an item, then ignore subsequent items for a duration.
188    ///
189    /// In addition to using a time-based interval, this method can take any
190    /// stream as a source. This enables throttling based on alternative event
191    /// sources, such as variable-rate timers.
192    ///
193    /// See also [`sample()`] and [`debounce()`].
194    ///
195    /// [`sample()`]: `StreamExt::sample`
196    /// [`debounce()`]: `StreamExt::debounce`
197    ///
198    /// # Data Loss
199    ///
200    /// This method will discard data between intervals. Though the
201    /// discarded items will have their destuctors run, __using this method
202    /// incorrectly may lead to unintended data loss__. This method is best used
203    /// to reduce the number of _duplicate_ items after the first has been
204    /// received, such as repeated mouse clicks or key presses. This method may
205    /// lead to unintended data loss when used to discard _unique_ items, such
206    /// as network request.
207    ///
208    /// # Examples
209    ///
210    /// ```
211    /// use futures_lite::prelude::*;
212    /// use futures_time::prelude::*;
213    /// use futures_time::time::Duration;
214    /// use futures_time::stream;
215    ///
216    /// async_io::block_on(async {
217    ///     let mut counter = 0;
218    ///     stream::interval(Duration::from_millis(100))  // Yield an item every 100ms
219    ///         .take(4)                                  // Stop after 4 items
220    ///         .throttle(Duration::from_millis(300))     // Only let an item through every 300ms
221    ///         .for_each(|_| counter += 1)               // Increment a counter for each item
222    ///         .await;
223    ///
224    ///     assert_eq!(counter, 2);
225    /// })
226    /// ```
227    fn throttle<I>(self, interval: I) -> Throttle<Self, I::IntoStream>
228    where
229        Self: Sized,
230        I: IntoStream,
231    {
232        Throttle::new(self, interval.into_stream())
233    }
234
235    /// Return an error if a stream does not yield an item within a given time
236    /// span.
237    ///
238    /// Typically timeouts are, as the name implies, based on _time_. However
239    /// this method can time out based on any future. This can be useful in
240    /// combination with channels, as it allows (long-lived) streams to be
241    /// cancelled based on some external event.
242    ///
243    /// When a timeout is returned, the stream will be dropped and destructors
244    /// will be run.
245    ///
246    /// # Example
247    ///
248    /// ```
249    /// use futures_lite::prelude::*;
250    /// use futures_time::prelude::*;
251    /// use futures_time::time::{Instant, Duration};
252    /// use futures_lite::stream;
253    /// use std::io;
254    ///
255    /// async_io::block_on(async {
256    ///     let res = stream::once("meow")
257    ///         .delay(Duration::from_millis(100))  // longer delay
258    ///         .timeout(Duration::from_millis(50)) // shorter timeout
259    ///         .next()
260    ///         .await;
261    ///     assert_eq!(res.unwrap().unwrap_err().kind(), io::ErrorKind::TimedOut); // error
262    ///
263    ///     let res = stream::once("meow")
264    ///         .delay(Duration::from_millis(50))    // shorter delay
265    ///         .timeout(Duration::from_millis(100)) // longer timeout
266    ///         .next()
267    ///         .await;
268    ///     assert_eq!(res.unwrap().unwrap(), "meow"); // success
269    /// });
270    /// ```
271    fn timeout<D>(self, deadline: D) -> Timeout<Self, D::IntoFuture>
272    where
273        Self: Sized,
274        D: IntoFuture,
275        D::IntoFuture: Timer,
276    {
277        Timeout::new(self, deadline.into_future())
278    }
279}
280
281impl<S> StreamExt for S where S: Stream {}