futures_time/stream/stream_ext.rs
1use crate::channel::Parker;
2use crate::future::{IntoFuture, Timer};
3
4use futures_core::Stream;
5
6use super::{Buffer, Debounce, Delay, IntoStream, Park, Sample, Throttle, Timeout};
7
8/// Extend `Stream` with time-based operations.
9pub trait StreamExt: Stream {
10 /// Yield the last item received at the end of each interval.
11 ///
12 /// If no items have been received during an interval, the stream will not
13 /// yield any items. In addition to using a time-based interval, this method can take any
14 /// stream as a source. This enables throttling based on alternative event
15 /// sources, such as variable-rate timers.
16 ///
17 /// See also [`throttle()`] and [`debounce()`].
18 ///
19 /// [`throttle()`]: StreamExt::throttle
20 /// [`debounce()`]: `StreamExt::debounce`
21 ///
22 /// # Data Loss
23 ///
24 /// This method will discard data between intervals. Though the
25 /// discarded items will have their destuctors run, __using this method
26 /// incorrectly may lead to unintended data loss__. This method is best used
27 /// to reduce the number of _duplicate_ items after the first has been
28 /// received, such as repeated mouse clicks or key presses. This method may
29 /// lead to unintended data loss when used to discard _unique_ items, such
30 /// as network request.
31 ///
32 /// # Example
33 ///
34 /// ```
35 /// use futures_lite::prelude::*;
36 /// use futures_time::prelude::*;
37 /// use futures_time::time::{Instant, Duration};
38 /// use futures_time::stream;
39 ///
40 /// async_io::block_on(async {
41 /// let mut counter = 0;
42 /// stream::interval(Duration::from_millis(100))
43 /// .take(4)
44 /// .sample(Duration::from_millis(200))
45 /// .for_each(|_| counter += 1)
46 /// .await;
47 ///
48 /// assert_eq!(counter, 2);
49 /// })
50 /// ```
51 fn sample<I>(self, interval: I) -> Sample<Self, I::IntoStream>
52 where
53 Self: Sized,
54 I: IntoStream,
55 {
56 Sample::new(self, interval.into_stream())
57 }
58
59 /// Group items into vectors which are yielded at every interval.
60 ///
61 /// In addition to using a time source as a deadline, any stream can be used as a
62 /// deadline too. This enables more interesting buffer strategies to be
63 /// built on top of this primitive.
64 ///
65 /// # Future Improvements
66 ///
67 /// - Lending iterators would allow for internal reusing of the buffer. Though different from
68 /// `Iterator::windows`, it could be more efficient.
69 /// - Contexts/capabilities would enable custom allocators to be used.
70 ///
71 /// # Example
72 ///
73 /// ```
74 /// use futures_lite::prelude::*;
75 /// use futures_time::prelude::*;
76 /// use futures_time::time::{Instant, Duration};
77 /// use futures_time::stream;
78 ///
79 /// async_io::block_on(async {
80 /// let mut counter = 0;
81 /// stream::interval(Duration::from_millis(5))
82 /// .take(10)
83 /// .buffer(Duration::from_millis(20))
84 /// .for_each(|buf| counter += buf.len())
85 /// .await;
86 ///
87 /// assert_eq!(counter, 10);
88 /// })
89 /// ```
90 fn buffer<I>(self, interval: I) -> Buffer<Self, I::IntoStream>
91 where
92 Self: Sized,
93 I: IntoStream,
94 {
95 Buffer::new(self, interval.into_stream())
96 }
97
98 /// Yield the last item received at the end of a window which resets with
99 /// each item received.
100 ///
101 /// Every time an item is yielded by the underlying stream, the window is
102 /// reset. Once the window expires, the last item seen will be yielded. This
103 /// means that in order to yield an item, no items must be received for the
104 /// entire window, or else the window will reset.
105 ///
106 /// This method is useful to perform actions at the end of bursts of events,
107 /// where performing that same action on _every_ event might not be
108 /// economical.
109 ///
110 /// See also [`sample()`] and [`throttle()`].
111 ///
112 /// [`sample()`]: `StreamExt::sample`
113 /// [`throttle()`]: `StreamExt::throttle`
114 ///
115 /// # Example
116 ///
117 /// ```
118 /// use futures_lite::prelude::*;
119 /// use futures_time::prelude::*;
120 /// use futures_time::time::{Instant, Duration};
121 /// use futures_time::stream;
122 ///
123 /// async_io::block_on(async {
124 /// let mut counter = 0;
125 /// stream::interval(Duration::from_millis(10))
126 /// .take(10)
127 /// .debounce(Duration::from_millis(20)) // the window is greater than the interval
128 /// .for_each(|_| counter += 1)
129 /// .await;
130 ///
131 /// assert_eq!(counter, 1); // so only the last item is received
132 /// })
133 /// ```
134 fn debounce<D>(self, window: D) -> Debounce<Self, D::IntoFuture>
135 where
136 Self: Sized,
137 D: IntoFuture,
138 D::IntoFuture: Timer,
139 {
140 Debounce::new(self, window.into_future())
141 }
142
143 /// Delay the yielding of items from the stream until the given deadline.
144 ///
145 /// The underlying stream will not be polled until the deadline has expired. In addition
146 /// to using a time source as a deadline, any future can be used as a
147 /// deadline too. When used in combination with a multi-consumer channel,
148 /// this method can be used to synchronize the start of multiple streams and futures.
149 ///
150 /// # Example
151 ///
152 /// ```
153 /// use futures_lite::prelude::*;
154 /// use futures_time::prelude::*;
155 /// use futures_time::time::{Instant, Duration};
156 /// use futures_lite::stream;
157 ///
158 /// async_io::block_on(async {
159 /// let now = Instant::now();
160 /// let delay = Duration::from_millis(100);
161 /// let _ = stream::once("meow").delay(delay).next().await;
162 /// assert!(now.elapsed() >= *delay);
163 /// });
164 /// ```
165 fn delay<D>(self, deadline: D) -> Delay<Self, D::IntoFuture>
166 where
167 Self: Sized,
168 D: IntoFuture,
169 {
170 Delay::new(self, deadline.into_future())
171 }
172
173 /// Suspend or resume execution of a stream.
174 ///
175 /// When this method is called the execution of the stream will be put into
176 /// a suspended state until the channel returns `Parker::Unpark` or the
177 /// channel's senders are dropped. The underlying stream will not be polled
178 /// while the it is paused.
179 fn park<I>(self, interval: I) -> Park<Self, I::IntoStream>
180 where
181 Self: Sized,
182 I: IntoStream<Item = Parker>,
183 {
184 Park::new(self, interval.into_stream())
185 }
186
187 /// Yield an item, then ignore subsequent items for a duration.
188 ///
189 /// In addition to using a time-based interval, this method can take any
190 /// stream as a source. This enables throttling based on alternative event
191 /// sources, such as variable-rate timers.
192 ///
193 /// See also [`sample()`] and [`debounce()`].
194 ///
195 /// [`sample()`]: `StreamExt::sample`
196 /// [`debounce()`]: `StreamExt::debounce`
197 ///
198 /// # Data Loss
199 ///
200 /// This method will discard data between intervals. Though the
201 /// discarded items will have their destuctors run, __using this method
202 /// incorrectly may lead to unintended data loss__. This method is best used
203 /// to reduce the number of _duplicate_ items after the first has been
204 /// received, such as repeated mouse clicks or key presses. This method may
205 /// lead to unintended data loss when used to discard _unique_ items, such
206 /// as network request.
207 ///
208 /// # Examples
209 ///
210 /// ```
211 /// use futures_lite::prelude::*;
212 /// use futures_time::prelude::*;
213 /// use futures_time::time::Duration;
214 /// use futures_time::stream;
215 ///
216 /// async_io::block_on(async {
217 /// let mut counter = 0;
218 /// stream::interval(Duration::from_millis(100)) // Yield an item every 100ms
219 /// .take(4) // Stop after 4 items
220 /// .throttle(Duration::from_millis(300)) // Only let an item through every 300ms
221 /// .for_each(|_| counter += 1) // Increment a counter for each item
222 /// .await;
223 ///
224 /// assert_eq!(counter, 2);
225 /// })
226 /// ```
227 fn throttle<I>(self, interval: I) -> Throttle<Self, I::IntoStream>
228 where
229 Self: Sized,
230 I: IntoStream,
231 {
232 Throttle::new(self, interval.into_stream())
233 }
234
235 /// Return an error if a stream does not yield an item within a given time
236 /// span.
237 ///
238 /// Typically timeouts are, as the name implies, based on _time_. However
239 /// this method can time out based on any future. This can be useful in
240 /// combination with channels, as it allows (long-lived) streams to be
241 /// cancelled based on some external event.
242 ///
243 /// When a timeout is returned, the stream will be dropped and destructors
244 /// will be run.
245 ///
246 /// # Example
247 ///
248 /// ```
249 /// use futures_lite::prelude::*;
250 /// use futures_time::prelude::*;
251 /// use futures_time::time::{Instant, Duration};
252 /// use futures_lite::stream;
253 /// use std::io;
254 ///
255 /// async_io::block_on(async {
256 /// let res = stream::once("meow")
257 /// .delay(Duration::from_millis(100)) // longer delay
258 /// .timeout(Duration::from_millis(50)) // shorter timeout
259 /// .next()
260 /// .await;
261 /// assert_eq!(res.unwrap().unwrap_err().kind(), io::ErrorKind::TimedOut); // error
262 ///
263 /// let res = stream::once("meow")
264 /// .delay(Duration::from_millis(50)) // shorter delay
265 /// .timeout(Duration::from_millis(100)) // longer timeout
266 /// .next()
267 /// .await;
268 /// assert_eq!(res.unwrap().unwrap(), "meow"); // success
269 /// });
270 /// ```
271 fn timeout<D>(self, deadline: D) -> Timeout<Self, D::IntoFuture>
272 where
273 Self: Sized,
274 D: IntoFuture,
275 D::IntoFuture: Timer,
276 {
277 Timeout::new(self, deadline.into_future())
278 }
279}
280
281impl<S> StreamExt for S where S: Stream {}