futures_rx/stream_ext.rs
1use std::{collections::VecDeque, future::Future, hash::Hash, vec::IntoIter};
2
3use buffer::Buffer;
4use debounce::Debounce;
5use delay_every::DelayEvery;
6use dematerialize::Dematerialize;
7use distinct::Distinct;
8use distinct_until_changed::DistinctUntilChanged;
9use futures::{stream::Iter, Stream};
10use inspect_done::InspectDone;
11use materialize::Materialize;
12use pairwise::Pairwise;
13use race::Race;
14use sample::Sample;
15use share::Shared;
16use start_with::StartWith;
17use switch_map::SwitchMap;
18use timing::{Timed, Timing};
19use window::Window;
20
21use crate::{
22 BehaviorSubject, CombineLatest2, Event, EventLite, Notification, PublishSubject, ReplaySubject,
23};
24
25use self::{delay::Delay, end_with::EndWith, throttle::Throttle};
26
27pub mod buffer;
28pub mod debounce;
29pub mod delay;
30pub mod delay_every;
31pub mod dematerialize;
32pub mod distinct;
33pub mod distinct_until_changed;
34pub mod end_with;
35pub mod inspect_done;
36pub mod materialize;
37pub mod pairwise;
38pub mod race;
39pub mod sample;
40pub mod share;
41pub mod start_with;
42pub mod switch_map;
43pub mod throttle;
44pub mod timing;
45pub mod window;
46
47impl<T: ?Sized> RxExt for T where T: Stream {}
48pub trait RxExt: Stream {
49 /// Starts polling itself as well as the provided other `Stream`.
50 /// The first one to emit an event "wins" and proceeds to emit all its next events.
51 /// The "loser" is discarded and will not be polled further.
52 ///
53 /// Note that this function consumes the stream passed into it and returns a
54 /// wrapped version of it.
55 ///
56 /// # Examples
57 ///
58 /// ```
59 /// # futures::executor::block_on(async {
60 /// use futures::stream::{self, StreamExt};
61 /// use futures_rx::{Notification, RxExt};
62 ///
63 /// let stream = stream::iter(0..=3);
64 /// let slower_stream = stream::iter(4..=6).delay(|| async { /* return delayed over time */ });
65 /// let stream = stream.race(slower_stream);
66 ///
67 /// assert_eq!(vec![0, 1, 2, 3], stream.collect::<Vec<_>>().await);
68 /// # });
69 ///
70 /// #
71 /// ```
72 fn race<S: Stream<Item = Self::Item>>(self, other: S) -> Race<Self, S, Self::Item>
73 where
74 Self: Sized,
75 {
76 assert_stream::<Self::Item, _>(Race::new(self, other))
77 }
78
79 /// Precedes all emitted events with the items of an iter.
80 ///
81 /// Note that this function consumes the stream passed into it and returns a
82 /// wrapped version of it.
83 ///
84 /// # Examples
85 ///
86 /// ```
87 /// # futures::executor::block_on(async {
88 /// use futures::stream::{self, StreamExt};
89 /// use futures_rx::{Notification, RxExt};
90 ///
91 /// let stream = stream::iter(4..=6);
92 /// let stream = stream.start_with(0..=3);
93 ///
94 /// assert_eq!(vec![0, 1, 2, 3, 4, 5, 6], stream.collect::<Vec<_>>().await);
95 /// # });
96 ///
97 /// #
98 /// ```
99 fn start_with<I: IntoIterator<Item = Self::Item>>(self, iter: I) -> StartWith<Self>
100 where
101 Self: Sized,
102 {
103 assert_stream::<Self::Item, _>(StartWith::new(self, iter))
104 }
105
106 /// Follows all emitted events with the items of an iter.
107 ///
108 /// Note that this function consumes the stream passed into it and returns a
109 /// wrapped version of it.
110 ///
111 /// # Examples
112 ///
113 /// ```
114 /// # futures::executor::block_on(async {
115 /// use futures::stream::{self, StreamExt};
116 /// use futures_rx::{Notification, RxExt};
117 ///
118 /// let stream = stream::iter(0..=3);
119 /// let stream = stream.end_with(4..=6);
120 ///
121 /// assert_eq!(vec![0, 1, 2, 3, 4, 5, 6], stream.collect::<Vec<_>>().await);
122 /// # });
123 ///
124 /// #
125 /// ```
126 fn end_with<I: IntoIterator<Item = Self::Item>>(self, iter: I) -> EndWith<Self>
127 where
128 Self: Sized,
129 {
130 assert_stream::<Self::Item, _>(EndWith::new(self, iter))
131 }
132
133 /// Transforms a `Stream` into a broadcast one, which can be subscribed to more than once, after cloning the shared version.
134 ///
135 /// Behavior is exactly like a `PublishSubject`, every new subscription will produce a unique `Stream` which only emits `Event` objects.
136 /// An `Event` is a helper object which wraps a ref counted value.
137 ///
138 /// Note that this function consumes the stream passed into it and returns a
139 /// wrapped version of it.
140 ///
141 /// # Examples
142 ///
143 /// ```
144 /// # futures::executor::block_on(async {
145 /// use futures::{stream::{StreamExt, self}, future::join};
146 /// use futures_rx::{Notification, RxExt};
147 ///
148 /// let stream = stream::iter(0..=3);
149 /// let stream = stream.share();
150 /// let sub_stream_a = stream.clone().map(|event| *event); // an event is Event here, wrapping a ref counted value
151 /// let sub_stream_b = stream.clone().map(|event| *event); // which we can just deref in this case to i32
152 ///
153 /// assert_eq!((vec![0, 1, 2, 3], vec![0, 1, 2, 3]), join(sub_stream_a.collect::<Vec<_>>(), sub_stream_b.collect::<Vec<_>>()).await);
154 /// # });
155 ///
156 /// #
157 /// ```
158 fn share(self) -> Shared<Self, PublishSubject<Self::Item>>
159 where
160 Self: Sized,
161 {
162 assert_stream::<Event<Self::Item>, _>(Shared::new(self, PublishSubject::new()))
163 }
164
165 /// Transforms a `Stream` into a broadcast one, which can be subscribed to more than once, after cloning the shared version.
166 ///
167 /// Behavior is exactly like a `BehaviorSubject`, where every new subscription will always receive the last emitted event
168 /// from the parent `Stream` first.
169 /// Every new subscription will produce a unique `Stream` which only emits `Event` objects.
170 /// An `Event` is a helper object which wraps a ref counted value.
171 ///
172 /// Note that this function consumes the stream passed into it and returns a
173 /// wrapped version of it.
174 ///
175 /// # Examples
176 ///
177 /// ```
178 /// # futures::executor::block_on(async {
179 /// use futures::{stream::{StreamExt, self}, future::join};
180 /// use futures_rx::{Notification, RxExt};
181 ///
182 /// let stream = stream::iter(1..=3);
183 /// let stream = stream.share_behavior(0);
184 ///
185 /// stream.clone().collect::<Vec<_>>().await; // consume all events beforehand
186 ///
187 /// let sub_stream_a = stream.clone().map(|event| *event); // an event is Event here, wrapping a ref counted value
188 /// let sub_stream_b = stream.clone().map(|event| *event); // which we can just deref in this case to i32
189 ///
190 /// assert_eq!(
191 /// (vec![3], vec![3]),
192 /// join(
193 /// sub_stream_a.collect::<Vec<_>>(),
194 /// sub_stream_b.collect::<Vec<_>>()
195 /// )
196 /// .await
197 /// );
198 /// # });
199 ///
200 /// #
201 /// ```
202 fn share_behavior(self, initial_value: Self::Item) -> Shared<Self, BehaviorSubject<Self::Item>>
203 where
204 Self: Sized,
205 {
206 assert_stream::<Event<Self::Item>, _>(Shared::new(
207 self,
208 BehaviorSubject::new(initial_value),
209 ))
210 }
211
212 /// Transforms a `Stream` into a broadcast one, which can be subscribed to more than once, after cloning the shared version.
213 ///
214 /// Behavior is exactly like a `ReplaySubject`, where every new subscription will always receive all previously emitted events
215 /// from the parent `Stream` first.
216 /// Every new subscription will produce a unique `Stream` which only emits `Event` objects.
217 /// An `Event` is a helper object which wraps a ref counted value.
218 ///
219 /// Note that this function consumes the stream passed into it and returns a
220 /// wrapped version of it.
221 ///
222 /// # Examples
223 ///
224 /// ```
225 /// # futures::executor::block_on(async {
226 /// use futures::{stream::{StreamExt, self}, future::join};
227 /// use futures_rx::{Notification, RxExt};
228 ///
229 /// let stream = stream::iter(0..=3);
230 /// let stream = stream.share_replay();
231 ///
232 /// stream.clone().collect::<Vec<_>>().await; // consume all events beforehand
233 ///
234 /// let sub_stream_a = stream.clone().map(|event| *event); // an event is Event here, wrapping a ref counted value
235 /// let sub_stream_b = stream.clone().map(|event| *event); // which we can just deref in this case to i32
236 ///
237 /// assert_eq!(
238 /// (vec![0, 1, 2, 3], vec![0, 1, 2, 3]),
239 /// join(
240 /// sub_stream_a.collect::<Vec<_>>(),
241 /// sub_stream_b.collect::<Vec<_>>()
242 /// )
243 /// .await
244 /// );
245 /// # });
246 ///
247 /// #
248 /// ```
249 fn share_replay(self) -> Shared<Self, ReplaySubject<Self::Item>>
250 where
251 Self: Sized,
252 {
253 assert_stream::<Event<Self::Item>, _>(Shared::new(self, ReplaySubject::new()))
254 }
255
256 /// Like `flat_map`, except that switched `Stream` is interrupted when the parent `Stream` emits a next event.
257 ///
258 /// Note that this function consumes the stream passed into it and returns a
259 /// wrapped version of it.
260 ///
261 /// # Examples
262 ///
263 /// ```
264 /// # futures::executor::block_on(async {
265 /// use futures::stream::{self, StreamExt};
266 /// use futures_rx::{Notification, RxExt};
267 ///
268 /// let stream = stream::iter(0..=3);
269 /// let stream = stream.switch_map(|event| stream::iter([event + 10, event - 10]));
270 ///
271 /// assert_eq!(vec![10, 11, 12, 13, -7], stream.collect::<Vec<_>>().await);
272 /// # });
273 ///
274 /// #
275 /// ```
276 fn switch_map<S: Stream, F: FnMut(Self::Item) -> S>(self, f: F) -> SwitchMap<Self, S, F>
277 where
278 Self: Sized,
279 {
280 assert_stream::<<F::Output as Stream>::Item, _>(SwitchMap::new(self, f))
281 }
282
283 /// Emits pairs of the previous and next events as a tuple.
284 ///
285 /// Note that this function consumes the stream passed into it and returns a
286 /// wrapped version of it.
287 ///
288 /// The next value in the tuple is a value reference, and therefore wrapped inside an `Event` struct.
289 /// An `Event` is a helper object for ref counted events.
290 /// As the next event will also need to be emitted as the previous event in the next pair,
291 /// it is first made available as next using a ref count - `Event`.
292 ///
293 /// # Examples
294 ///
295 /// ```
296 /// # futures::executor::block_on(async {
297 /// use futures::stream::{self, StreamExt};
298 /// use futures_rx::{Notification, RxExt};
299 ///
300 /// let stream = stream::iter(0..=3);
301 /// let stream = stream.pairwise();
302 /// let stream = stream.map(|(prev, next)| (prev, *next)); // we can deref here to i32
303 ///
304 /// assert_eq!(vec![(0, 1), (1, 2), (2, 3)], stream.collect::<Vec<_>>().await);
305 /// # });
306 ///
307 /// #
308 /// ```
309 fn pairwise(self) -> Pairwise<Self>
310 where
311 Self: Sized,
312 {
313 assert_stream::<(Self::Item, EventLite<Self::Item>), _>(Pairwise::new(self))
314 }
315
316 /// Delays events using a debounce time window.
317 /// The event will emit when this window closes and when no other event
318 /// was emitted while this window was open.
319 ///
320 /// The provided closure is executed over all elements of this stream as
321 /// they are made available. It is executed inline with calls to
322 /// [`poll_next`](Stream::poll_next).
323 ///
324 /// The debounce window resets on every newly emitted event.
325 /// On next, the closure is invoked and a reference to the event is passed.
326 /// The closure needs to return a `Future`, which represents the next debounce window over time.
327 ///
328 /// Note that this function consumes the stream passed into it and returns a
329 /// wrapped version of it.
330 fn debounce<Fut: Future, F: FnMut(&Self::Item) -> Fut>(self, f: F) -> Debounce<Self, Fut, F>
331 where
332 Self: Sized,
333 {
334 assert_stream::<Self::Item, _>(Debounce::new(self, f))
335 }
336
337 /// Creates a new interval from the closure, whenever a new event is emitted from the parent `Stream`.
338 /// This event is immediately emitted, however for as long as the interval is now open, no
339 /// subsequent events will be emitted.
340 ///
341 /// When the interval closes and the parent `Stream` emits a new event, this
342 /// process repeats.
343 ///
344 /// The provided closure is executed over all elements of this stream as
345 /// they are made available. It is executed inline with calls to
346 /// [`poll_next`](Stream::poll_next).
347 ///
348 /// Note that this function consumes the stream passed into it and returns a
349 /// wrapped version of it.
350 ///
351 /// See also `sample`
352 fn throttle<Fut: Future, F: FnMut(&Self::Item) -> Fut>(self, f: F) -> Throttle<Self, Fut, F>
353 where
354 Self: Sized,
355 {
356 assert_stream::<Self::Item, _>(Throttle::new(self, f, throttle::ThrottleConfig::Leading))
357 }
358
359 /// Like `throttle`, but only emitting trailing items.
360 fn throttle_trailing<Fut: Future, F: FnMut(&Self::Item) -> Fut>(
361 self,
362 f: F,
363 ) -> Throttle<Self, Fut, F>
364 where
365 Self: Sized,
366 {
367 assert_stream::<Self::Item, _>(Throttle::new(self, f, throttle::ThrottleConfig::Trailing))
368 }
369
370 /// Like `throttle`, but emitting both leading and trailing items.
371 fn throttle_all<Fut: Future, F: FnMut(&Self::Item) -> Fut>(self, f: F) -> Throttle<Self, Fut, F>
372 where
373 Self: Sized,
374 {
375 assert_stream::<Self::Item, _>(Throttle::new(self, f, throttle::ThrottleConfig::All))
376 }
377
378 /// Creates chunks of buffered data.
379 ///
380 /// The provided closure is executed over all elements of this stream as
381 /// they are made available. It is executed inline with calls to
382 /// [`poll_next`](Stream::poll_next).
383 ///
384 /// You can use a reference to the current event, or the count of the current buffer
385 /// to determine when a chunk should close and emit next.
386 ///
387 /// Note that this function consumes the stream passed into it and returns a
388 /// wrapped version of it.
389 ///
390 /// # Examples
391 ///
392 /// ```
393 /// # futures::executor::block_on(async {
394 /// use std::collections::VecDeque;
395 ///
396 /// use futures::stream::{self, StreamExt};
397 /// use futures_rx::RxExt;
398 ///
399 /// let stream = stream::iter(0..9);
400 /// let stream = stream.buffer(|_, count| async move { count == 3 });
401 ///
402 /// assert_eq!(
403 /// vec![
404 /// VecDeque::from_iter([0, 1, 2]),
405 /// VecDeque::from_iter([3, 4, 5]),
406 /// VecDeque::from_iter([6, 7, 8])
407 /// ],
408 /// stream.collect::<Vec<_>>().await
409 /// );
410 /// # });
411 ///
412 /// #
413 /// ```
414 fn buffer<Fut: Future<Output = bool>, F: FnMut(&Self::Item, usize) -> Fut>(
415 self,
416 f: F,
417 ) -> Buffer<Self, Fut, F>
418 where
419 Self: Sized,
420 {
421 assert_stream::<VecDeque<Self::Item>, _>(Buffer::new(self, f))
422 }
423
424 /// Creates chunks of buffered data as new `Stream`s.
425 ///
426 /// The provided closure is executed over all elements of this stream as
427 /// they are made available. It is executed inline with calls to
428 /// [`poll_next`](Stream::poll_next).
429 ///
430 /// You can use a reference to the current event, or the count of the current buffer
431 /// to determine when a chunk should close and emit next.
432 ///
433 /// Note that this function consumes the stream passed into it and returns a
434 /// wrapped version of it.
435 ///
436 /// # Examples
437 ///
438 /// ```
439 /// # futures::executor::block_on(async {
440 /// use futures::stream::{self, StreamExt};
441 /// use futures_rx::RxExt;
442 ///
443 /// let stream = stream::iter(0..9);
444 /// let stream = stream.window(|_, count| async move { count == 3 }).flat_map(|it| it);
445 ///
446 /// assert_eq!(vec![0, 1, 2, 3, 4, 5, 6, 7, 8], stream.collect::<Vec<_>>().await);
447 /// # });
448 ///
449 /// #
450 /// ```
451 fn window<Fut: Future<Output = bool>, F: FnMut(&Self::Item, usize) -> Fut>(
452 self,
453 f: F,
454 ) -> Window<Self, Fut, F>
455 where
456 Self: Sized,
457 {
458 assert_stream::<Iter<IntoIter<Self::Item>>, _>(Window::new(self, f))
459 }
460
461 /// Ensures that all emitted events are unique.
462 /// Events are required to implement `Hash`.
463 ///
464 /// Note that this function consumes the stream passed into it and returns a
465 /// wrapped version of it.
466 ///
467 /// # Examples
468 ///
469 /// ```
470 /// # futures::executor::block_on(async {
471 /// use futures::stream::{self, StreamExt};
472 /// use futures_rx::RxExt;
473 ///
474 /// let stream = stream::iter([1, 2, 1, 3, 2, 2, 1, 4]);
475 /// let stream = stream.distinct();
476 ///
477 /// assert_eq!(vec![1, 2, 3, 4], stream.collect::<Vec<_>>().await);
478 /// # });
479 ///
480 /// #
481 /// ```
482 fn distinct(self) -> Distinct<Self>
483 where
484 Self: Sized,
485 Self::Item: Hash,
486 {
487 assert_stream::<Self::Item, _>(Distinct::new(self))
488 }
489
490 /// Ensures that all emitted events are unique within immediate sequence.
491 /// Events are required to implement `Hash`.
492 ///
493 /// Note that this function consumes the stream passed into it and returns a
494 /// wrapped version of it.
495 ///
496 /// # Examples
497 ///
498 /// ```
499 /// # futures::executor::block_on(async {
500 /// use futures::stream::{self, StreamExt};
501 /// use futures_rx::RxExt;
502 ///
503 /// let stream = stream::iter([1, 1, 1, 2, 2, 2, 3, 1, 1]);
504 /// let stream = stream.distinct_until_changed();
505 ///
506 /// assert_eq!(vec![1, 2, 3, 1], stream.collect::<Vec<_>>().await);
507 /// # });
508 ///
509 /// #
510 /// ```
511 fn distinct_until_changed(self) -> DistinctUntilChanged<Self>
512 where
513 Self: Sized,
514 Self::Item: Hash,
515 {
516 assert_stream::<Self::Item, _>(DistinctUntilChanged::new(self))
517 }
518
519 /// Converts all events of a `Stream` into `Notification` events.
520 /// When the `Stream` is done, it will first emit a final `Notification::Complete` event.
521 ///
522 /// Note that this function consumes the stream passed into it and returns a
523 /// wrapped version of it.
524 ///
525 /// # Examples
526 ///
527 /// ```
528 /// # futures::executor::block_on(async {
529 /// use futures::stream::{self, StreamExt};
530 /// use futures_rx::{Notification, RxExt};
531 ///
532 /// let stream = stream::iter(0..=3);
533 /// let stream = stream.materialize();
534 ///
535 /// assert_eq!(
536 /// vec![
537 /// Notification::Next(0),
538 /// Notification::Next(1),
539 /// Notification::Next(2),
540 /// Notification::Next(3),
541 /// Notification::Complete
542 /// ],
543 /// stream.collect::<Vec<_>>().await
544 /// );
545 /// # });
546 ///
547 /// #
548 /// ```
549 fn materialize(self) -> Materialize<Self>
550 where
551 Self: Sized,
552 {
553 assert_stream::<Notification<Self::Item>, _>(Materialize::new(self))
554 }
555
556 /// The inverse of materialize.
557 /// Use this transformer to translate a `Stream` emitting `Notification` events back
558 /// into a `Stream` emitting original events.
559 ///
560 /// Note that this function consumes the stream passed into it and returns a
561 /// wrapped version of it.
562 ///
563 /// # Examples
564 ///
565 /// ```
566 /// # futures::executor::block_on(async {
567 /// use futures::stream::{self, StreamExt};
568 /// use futures_rx::RxExt;
569 ///
570 /// let stream = stream::iter(0..=3);
571 /// let stream = stream.materialize().dematerialize();
572 ///
573 /// assert_eq!(vec![0, 1, 2, 3], stream.collect::<Vec<_>>().await);
574 /// # });
575 ///
576 /// #
577 /// ```
578 fn dematerialize<T>(self) -> Dematerialize<Self, T>
579 where
580 Self: Stream<Item = Notification<T>> + Sized,
581 {
582 assert_stream::<T, _>(Dematerialize::new(self))
583 }
584
585 /// Delays emitting events using an initial time window, provided by a closure.
586 ///
587 /// Note that this function consumes the stream passed into it and returns a
588 /// wrapped version of it.
589 ///
590 /// # Examples
591 ///
592 /// ```
593 /// # futures::executor::block_on(async {
594 /// use futures::stream::{self, StreamExt};
595 /// use futures_rx::RxExt;
596 ///
597 /// let stream = stream::iter(0..=3);
598 /// let stream = stream.delay(|| async { /* return delayed over time */ });
599 ///
600 /// assert_eq!(vec![0, 1, 2, 3], stream.collect::<Vec<_>>().await);
601 /// # });
602 ///
603 /// #
604 /// ```
605 fn delay<Fut: Future, F: FnMut() -> Fut>(self, f: F) -> Delay<Self, Fut, F>
606 where
607 Self: Sized,
608 {
609 assert_stream::<Self::Item, _>(Delay::new(self, f))
610 }
611
612 /// Delays every event using a time window, provided by a closure.
613 ///
614 /// Use max_buffer_size to limit the amount of buffered items that are awaiting
615 /// time window(s) to complete.
616 ///
617 /// Note that this function consumes the stream passed into it and returns a
618 /// wrapped version of it.
619 ///
620 /// # Examples
621 ///
622 /// ```
623 /// # futures::executor::block_on(async {
624 /// use futures::stream::{self, StreamExt};
625 /// use futures_rx::RxExt;
626 ///
627 /// let stream = stream::iter(0..=3);
628 /// let stream = stream.delay_every(|_| async { /* return delayed over time */ }, None);
629 ///
630 /// assert_eq!(vec![0, 1, 2, 3], stream.collect::<Vec<_>>().await);
631 /// # });
632 ///
633 /// #
634 /// ```
635 fn delay_every<Fut: Future, F: FnMut(&Self::Item) -> Fut>(
636 self,
637 f: F,
638 max_buffer_size: Option<usize>,
639 ) -> DelayEvery<Self, Fut, F>
640 where
641 Self: Sized,
642 {
643 assert_stream::<Self::Item, _>(DelayEvery::new(self, f, max_buffer_size))
644 }
645
646 /// Acts just like a `CombineLatest2`, where every next event is a tuple pair
647 /// containing the last emitted events from both `Stream`s.
648 ///
649 /// Note that this function consumes the stream passed into it and returns a
650 /// wrapped version of it.
651 ///
652 /// # Examples
653 ///
654 /// ```
655 /// # futures::executor::block_on(async {
656 /// use futures::stream::{self, StreamExt};
657 /// use futures_rx::RxExt;
658 ///
659 /// let stream = stream::iter(0..=3);
660 /// let stream = stream.with_latest_from(stream::iter(0..=3));
661 ///
662 /// assert_eq!(vec![(0, 0), (1, 1), (2, 2), (3, 3)], stream.collect::<Vec<_>>().await);
663 /// # });
664 ///
665 /// #
666 /// ```
667 fn with_latest_from<S: Stream>(self, stream: S) -> CombineLatest2<Self, S, Self::Item, S::Item>
668 where
669 Self: Sized,
670 Self::Item: ToOwned<Owned = Self::Item>,
671 S::Item: ToOwned<Owned = S::Item>,
672 {
673 assert_stream::<(Self::Item, S::Item), _>(CombineLatest2::new(self, stream))
674 }
675
676 /// Wraps each item into a `Timed` struct.
677 /// This structs hold the actual event, as well as
678 /// a timestamp containing an `Instant` and an elapsed interval
679 /// as `Duration`, relative to the second to last emitted event.
680 ///
681 /// Note that this function consumes the stream passed into it and returns a
682 /// wrapped version of it.
683 fn timing(self) -> Timing<Self>
684 where
685 Self: Sized,
686 {
687 assert_stream::<Timed<Self::Item>, _>(Timing::new(self))
688 }
689
690 /// Similar to `inspect`, except that the closure provided is only ever
691 /// triggered when the `Stream` is done.
692 ///
693 /// Note that this function consumes the stream passed into it and returns a
694 /// wrapped version of it.
695 ///
696 /// # Examples
697 ///
698 /// ```
699 /// # futures::executor::block_on(async {
700 /// use futures::stream::{self, StreamExt};
701 /// use futures_rx::RxExt;
702 ///
703 /// let mut is_done = false;
704 /// stream::iter(0..=8)
705 /// .inspect_done(|| is_done = true)
706 /// .collect::<Vec<_>>()
707 /// .await;
708 ///
709 /// assert!(is_done);
710 /// # });
711 ///
712 /// #
713 /// ```
714 fn inspect_done<F: FnMut()>(self, f: F) -> InspectDone<Self, F>
715 where
716 Self: Sized,
717 {
718 assert_stream::<Self::Item, _>(InspectDone::new(self, f))
719 }
720
721 /// Only emits events whenever the `sampler` emits an event.
722 /// The event emitted is then the last emitted event from the
723 /// source `Stream`.
724 ///
725 /// If the `sampler` triggers before the source `Stream` was
726 /// able to produce a new event, then no event is emitted.
727 ///
728 /// Note that this function consumes the stream passed into it and returns a
729 /// wrapped version of it.
730 ///
731 /// See also `throttle`.
732 fn sample<S: Stream>(self, sampler: S) -> Sample<Self, S>
733 where
734 Self: Sized,
735 {
736 assert_stream::<Self::Item, _>(Sample::new(self, sampler))
737 }
738}
739
740pub(crate) fn assert_stream<T, S>(stream: S) -> S
741where
742 S: Stream<Item = T>,
743{
744 stream
745}