futures_rx/
stream_ext.rs

1use std::{collections::VecDeque, future::Future, hash::Hash, vec::IntoIter};
2
3use buffer::Buffer;
4use debounce::Debounce;
5use delay_every::DelayEvery;
6use dematerialize::Dematerialize;
7use distinct::Distinct;
8use distinct_until_changed::DistinctUntilChanged;
9use futures::{stream::Iter, Stream};
10use inspect_done::InspectDone;
11use materialize::Materialize;
12use pairwise::Pairwise;
13use race::Race;
14use sample::Sample;
15use share::Shared;
16use start_with::StartWith;
17use switch_map::SwitchMap;
18use timing::{Timed, Timing};
19use window::Window;
20
21use crate::{
22    BehaviorSubject, CombineLatest2, Event, EventLite, Notification, PublishSubject, ReplaySubject,
23};
24
25use self::{delay::Delay, end_with::EndWith, throttle::Throttle};
26
27pub mod buffer;
28pub mod debounce;
29pub mod delay;
30pub mod delay_every;
31pub mod dematerialize;
32pub mod distinct;
33pub mod distinct_until_changed;
34pub mod end_with;
35pub mod inspect_done;
36pub mod materialize;
37pub mod pairwise;
38pub mod race;
39pub mod sample;
40pub mod share;
41pub mod start_with;
42pub mod switch_map;
43pub mod throttle;
44pub mod timing;
45pub mod window;
46
47impl<T: ?Sized> RxExt for T where T: Stream {}
48pub trait RxExt: Stream {
49    /// Starts polling itself as well as the provided other `Stream`.
50    /// The first one to emit an event "wins" and proceeds to emit all its next events.
51    /// The "loser" is discarded and will not be polled further.
52    ///
53    /// Note that this function consumes the stream passed into it and returns a
54    /// wrapped version of it.
55    ///
56    /// # Examples
57    ///
58    /// ```
59    /// # futures::executor::block_on(async {
60    /// use futures::stream::{self, StreamExt};
61    /// use futures_rx::{Notification, RxExt};
62    ///
63    /// let stream = stream::iter(0..=3);
64    /// let slower_stream = stream::iter(4..=6).delay(|| async { /* return delayed over time */ });
65    /// let stream = stream.race(slower_stream);
66    ///
67    /// assert_eq!(vec![0, 1, 2, 3], stream.collect::<Vec<_>>().await);
68    /// # });
69    ///
70    /// #
71    /// ```
72    fn race<S: Stream<Item = Self::Item>>(self, other: S) -> Race<Self, S, Self::Item>
73    where
74        Self: Sized,
75    {
76        assert_stream::<Self::Item, _>(Race::new(self, other))
77    }
78
79    /// Precedes all emitted events with the items of an iter.
80    ///
81    /// Note that this function consumes the stream passed into it and returns a
82    /// wrapped version of it.
83    ///
84    /// # Examples
85    ///
86    /// ```
87    /// # futures::executor::block_on(async {
88    /// use futures::stream::{self, StreamExt};
89    /// use futures_rx::{Notification, RxExt};
90    ///
91    /// let stream = stream::iter(4..=6);
92    /// let stream = stream.start_with(0..=3);
93    ///
94    /// assert_eq!(vec![0, 1, 2, 3, 4, 5, 6], stream.collect::<Vec<_>>().await);
95    /// # });
96    ///
97    /// #
98    /// ```
99    fn start_with<I: IntoIterator<Item = Self::Item>>(self, iter: I) -> StartWith<Self>
100    where
101        Self: Sized,
102    {
103        assert_stream::<Self::Item, _>(StartWith::new(self, iter))
104    }
105
106    /// Follows all emitted events with the items of an iter.
107    ///
108    /// Note that this function consumes the stream passed into it and returns a
109    /// wrapped version of it.
110    ///
111    /// # Examples
112    ///
113    /// ```
114    /// # futures::executor::block_on(async {
115    /// use futures::stream::{self, StreamExt};
116    /// use futures_rx::{Notification, RxExt};
117    ///
118    /// let stream = stream::iter(0..=3);
119    /// let stream = stream.end_with(4..=6);
120    ///
121    /// assert_eq!(vec![0, 1, 2, 3, 4, 5, 6], stream.collect::<Vec<_>>().await);
122    /// # });
123    ///
124    /// #
125    /// ```
126    fn end_with<I: IntoIterator<Item = Self::Item>>(self, iter: I) -> EndWith<Self>
127    where
128        Self: Sized,
129    {
130        assert_stream::<Self::Item, _>(EndWith::new(self, iter))
131    }
132
133    /// Transforms a `Stream` into a broadcast one, which can be subscribed to more than once, after cloning the shared version.
134    ///
135    /// Behavior is exactly like a `PublishSubject`, every new subscription will produce a unique `Stream` which only emits `Event` objects.
136    /// An `Event` is a helper object which wraps a ref counted value.
137    ///
138    /// Note that this function consumes the stream passed into it and returns a
139    /// wrapped version of it.
140    ///
141    /// # Examples
142    ///
143    /// ```
144    /// # futures::executor::block_on(async {
145    /// use futures::{stream::{StreamExt, self}, future::join};
146    /// use futures_rx::{Notification, RxExt};
147    ///
148    /// let stream = stream::iter(0..=3);
149    /// let stream = stream.share();
150    /// let sub_stream_a = stream.clone().map(|event| *event); // an event is Event here, wrapping a ref counted value
151    /// let sub_stream_b = stream.clone().map(|event| *event); // which we can just deref in this case to i32
152    ///
153    /// assert_eq!((vec![0, 1, 2, 3], vec![0, 1, 2, 3]), join(sub_stream_a.collect::<Vec<_>>(), sub_stream_b.collect::<Vec<_>>()).await);
154    /// # });
155    ///
156    /// #
157    /// ```
158    fn share(self) -> Shared<Self, PublishSubject<Self::Item>>
159    where
160        Self: Sized,
161    {
162        assert_stream::<Event<Self::Item>, _>(Shared::new(self, PublishSubject::new()))
163    }
164
165    /// Transforms a `Stream` into a broadcast one, which can be subscribed to more than once, after cloning the shared version.
166    ///
167    /// Behavior is exactly like a `BehaviorSubject`, where every new subscription will always receive the last emitted event
168    /// from the parent `Stream` first.
169    /// Every new subscription will produce a unique `Stream` which only emits `Event` objects.
170    /// An `Event` is a helper object which wraps a ref counted value.
171    ///
172    /// Note that this function consumes the stream passed into it and returns a
173    /// wrapped version of it.
174    ///
175    /// # Examples
176    ///
177    /// ```
178    /// # futures::executor::block_on(async {
179    /// use futures::{stream::{StreamExt, self}, future::join};
180    /// use futures_rx::{Notification, RxExt};
181    ///
182    /// let stream = stream::iter(1..=3);
183    /// let stream = stream.share_behavior(0);
184    ///
185    /// stream.clone().collect::<Vec<_>>().await; // consume all events beforehand
186    ///
187    /// let sub_stream_a = stream.clone().map(|event| *event); // an event is Event here, wrapping a ref counted value
188    /// let sub_stream_b = stream.clone().map(|event| *event); // which we can just deref in this case to i32
189    ///
190    /// assert_eq!(
191    ///     (vec![3], vec![3]),
192    ///     join(
193    ///         sub_stream_a.collect::<Vec<_>>(),
194    ///         sub_stream_b.collect::<Vec<_>>()
195    ///     )
196    ///     .await
197    /// );
198    /// # });
199    ///
200    /// #
201    /// ```
202    fn share_behavior(self, initial_value: Self::Item) -> Shared<Self, BehaviorSubject<Self::Item>>
203    where
204        Self: Sized,
205    {
206        assert_stream::<Event<Self::Item>, _>(Shared::new(
207            self,
208            BehaviorSubject::new(initial_value),
209        ))
210    }
211
212    /// Transforms a `Stream` into a broadcast one, which can be subscribed to more than once, after cloning the shared version.
213    ///
214    /// Behavior is exactly like a `ReplaySubject`, where every new subscription will always receive all previously emitted events
215    /// from the parent `Stream` first.
216    /// Every new subscription will produce a unique `Stream` which only emits `Event` objects.
217    /// An `Event` is a helper object which wraps a ref counted value.
218    ///
219    /// Note that this function consumes the stream passed into it and returns a
220    /// wrapped version of it.
221    ///
222    /// # Examples
223    ///
224    /// ```
225    /// # futures::executor::block_on(async {
226    /// use futures::{stream::{StreamExt, self}, future::join};
227    /// use futures_rx::{Notification, RxExt};
228    ///
229    /// let stream = stream::iter(0..=3);
230    /// let stream = stream.share_replay();
231    ///
232    /// stream.clone().collect::<Vec<_>>().await; // consume all events beforehand
233    ///
234    /// let sub_stream_a = stream.clone().map(|event| *event); // an event is Event here, wrapping a ref counted value
235    /// let sub_stream_b = stream.clone().map(|event| *event); // which we can just deref in this case to i32
236    ///
237    /// assert_eq!(
238    ///     (vec![0, 1, 2, 3], vec![0, 1, 2, 3]),
239    ///     join(
240    ///         sub_stream_a.collect::<Vec<_>>(),
241    ///         sub_stream_b.collect::<Vec<_>>()
242    ///     )
243    ///     .await
244    /// );
245    /// # });
246    ///
247    /// #
248    /// ```
249    fn share_replay(self) -> Shared<Self, ReplaySubject<Self::Item>>
250    where
251        Self: Sized,
252    {
253        assert_stream::<Event<Self::Item>, _>(Shared::new(self, ReplaySubject::new()))
254    }
255
256    /// Like `flat_map`, except that switched `Stream` is interrupted when the parent `Stream` emits a next event.
257    ///
258    /// Note that this function consumes the stream passed into it and returns a
259    /// wrapped version of it.
260    ///
261    /// # Examples
262    ///
263    /// ```
264    /// # futures::executor::block_on(async {
265    /// use futures::stream::{self, StreamExt};
266    /// use futures_rx::{Notification, RxExt};
267    ///
268    /// let stream = stream::iter(0..=3);
269    /// let stream = stream.switch_map(|event| stream::iter([event + 10, event - 10]));
270    ///
271    /// assert_eq!(vec![10, 11, 12, 13, -7], stream.collect::<Vec<_>>().await);
272    /// # });
273    ///
274    /// #
275    /// ```
276    fn switch_map<S: Stream, F: FnMut(Self::Item) -> S>(self, f: F) -> SwitchMap<Self, S, F>
277    where
278        Self: Sized,
279    {
280        assert_stream::<<F::Output as Stream>::Item, _>(SwitchMap::new(self, f))
281    }
282
283    /// Emits pairs of the previous and next events as a tuple.
284    ///
285    /// Note that this function consumes the stream passed into it and returns a
286    /// wrapped version of it.
287    ///
288    /// The next value in the tuple is a value reference, and therefore wrapped inside an `Event` struct.
289    /// An `Event` is a helper object for ref counted events.
290    /// As the next event will also need to be emitted as the previous event in the next pair,
291    /// it is first made available as next using a ref count - `Event`.
292    ///
293    /// # Examples
294    ///
295    /// ```
296    /// # futures::executor::block_on(async {
297    /// use futures::stream::{self, StreamExt};
298    /// use futures_rx::{Notification, RxExt};
299    ///
300    /// let stream = stream::iter(0..=3);
301    /// let stream = stream.pairwise();
302    /// let stream = stream.map(|(prev, next)| (prev, *next)); // we can deref here to i32
303    ///
304    /// assert_eq!(vec![(0, 1), (1, 2), (2, 3)], stream.collect::<Vec<_>>().await);
305    /// # });
306    ///
307    /// #
308    /// ```
309    fn pairwise(self) -> Pairwise<Self>
310    where
311        Self: Sized,
312    {
313        assert_stream::<(Self::Item, EventLite<Self::Item>), _>(Pairwise::new(self))
314    }
315
316    /// Delays events using a debounce time window.
317    /// The event will emit when this window closes and when no other event
318    /// was emitted while this window was open.
319    ///
320    /// The provided closure is executed over all elements of this stream as
321    /// they are made available. It is executed inline with calls to
322    /// [`poll_next`](Stream::poll_next).
323    ///
324    /// The debounce window resets on every newly emitted event.
325    /// On next, the closure is invoked and a reference to the event is passed.
326    /// The closure needs to return a `Future`, which represents the next debounce window over time.
327    ///
328    /// Note that this function consumes the stream passed into it and returns a
329    /// wrapped version of it.
330    fn debounce<Fut: Future, F: FnMut(&Self::Item) -> Fut>(self, f: F) -> Debounce<Self, Fut, F>
331    where
332        Self: Sized,
333    {
334        assert_stream::<Self::Item, _>(Debounce::new(self, f))
335    }
336
337    /// Creates a new interval from the closure, whenever a new event is emitted from the parent `Stream`.
338    /// This event is immediately emitted, however for as long as the interval is now open, no
339    /// subsequent events will be emitted.
340    ///
341    /// When the interval closes and the parent `Stream` emits a new event, this
342    /// process repeats.
343    ///
344    /// The provided closure is executed over all elements of this stream as
345    /// they are made available. It is executed inline with calls to
346    /// [`poll_next`](Stream::poll_next).
347    ///
348    /// Note that this function consumes the stream passed into it and returns a
349    /// wrapped version of it.
350    ///
351    /// See also `sample`
352    fn throttle<Fut: Future, F: FnMut(&Self::Item) -> Fut>(self, f: F) -> Throttle<Self, Fut, F>
353    where
354        Self: Sized,
355    {
356        assert_stream::<Self::Item, _>(Throttle::new(self, f, throttle::ThrottleConfig::Leading))
357    }
358
359    /// Like `throttle`, but only emitting trailing items.
360    fn throttle_trailing<Fut: Future, F: FnMut(&Self::Item) -> Fut>(
361        self,
362        f: F,
363    ) -> Throttle<Self, Fut, F>
364    where
365        Self: Sized,
366    {
367        assert_stream::<Self::Item, _>(Throttle::new(self, f, throttle::ThrottleConfig::Trailing))
368    }
369
370    /// Like `throttle`, but emitting both leading and trailing items.
371    fn throttle_all<Fut: Future, F: FnMut(&Self::Item) -> Fut>(self, f: F) -> Throttle<Self, Fut, F>
372    where
373        Self: Sized,
374    {
375        assert_stream::<Self::Item, _>(Throttle::new(self, f, throttle::ThrottleConfig::All))
376    }
377
378    /// Creates chunks of buffered data.
379    ///
380    /// The provided closure is executed over all elements of this stream as
381    /// they are made available. It is executed inline with calls to
382    /// [`poll_next`](Stream::poll_next).
383    ///
384    /// You can use a reference to the current event, or the count of the current buffer
385    /// to determine when a chunk should close and emit next.
386    ///
387    /// Note that this function consumes the stream passed into it and returns a
388    /// wrapped version of it.
389    ///
390    /// # Examples
391    ///
392    /// ```
393    /// # futures::executor::block_on(async {
394    /// use std::collections::VecDeque;
395    ///
396    /// use futures::stream::{self, StreamExt};
397    /// use futures_rx::RxExt;
398    ///
399    /// let stream = stream::iter(0..9);
400    /// let stream = stream.buffer(|_, count| async move { count == 3 });
401    ///
402    /// assert_eq!(
403    ///     vec![
404    ///         VecDeque::from_iter([0, 1, 2]),
405    ///         VecDeque::from_iter([3, 4, 5]),
406    ///         VecDeque::from_iter([6, 7, 8])
407    ///     ],
408    ///     stream.collect::<Vec<_>>().await
409    /// );
410    /// # });
411    ///
412    /// #
413    /// ```
414    fn buffer<Fut: Future<Output = bool>, F: FnMut(&Self::Item, usize) -> Fut>(
415        self,
416        f: F,
417    ) -> Buffer<Self, Fut, F>
418    where
419        Self: Sized,
420    {
421        assert_stream::<VecDeque<Self::Item>, _>(Buffer::new(self, f))
422    }
423
424    /// Creates chunks of buffered data as new `Stream`s.
425    ///
426    /// The provided closure is executed over all elements of this stream as
427    /// they are made available. It is executed inline with calls to
428    /// [`poll_next`](Stream::poll_next).
429    ///
430    /// You can use a reference to the current event, or the count of the current buffer
431    /// to determine when a chunk should close and emit next.
432    ///
433    /// Note that this function consumes the stream passed into it and returns a
434    /// wrapped version of it.
435    ///
436    /// # Examples
437    ///
438    /// ```
439    /// # futures::executor::block_on(async {
440    /// use futures::stream::{self, StreamExt};
441    /// use futures_rx::RxExt;
442    ///
443    /// let stream = stream::iter(0..9);
444    /// let stream = stream.window(|_, count| async move { count == 3 }).flat_map(|it| it);
445    ///
446    /// assert_eq!(vec![0, 1, 2, 3, 4, 5, 6, 7, 8], stream.collect::<Vec<_>>().await);
447    /// # });
448    ///
449    /// #
450    /// ```
451    fn window<Fut: Future<Output = bool>, F: FnMut(&Self::Item, usize) -> Fut>(
452        self,
453        f: F,
454    ) -> Window<Self, Fut, F>
455    where
456        Self: Sized,
457    {
458        assert_stream::<Iter<IntoIter<Self::Item>>, _>(Window::new(self, f))
459    }
460
461    /// Ensures that all emitted events are unique.
462    /// Events are required to implement `Hash`.
463    ///
464    /// Note that this function consumes the stream passed into it and returns a
465    /// wrapped version of it.
466    ///
467    /// # Examples
468    ///
469    /// ```
470    /// # futures::executor::block_on(async {
471    /// use futures::stream::{self, StreamExt};
472    /// use futures_rx::RxExt;
473    ///
474    /// let stream = stream::iter([1, 2, 1, 3, 2, 2, 1, 4]);
475    /// let stream = stream.distinct();
476    ///
477    /// assert_eq!(vec![1, 2, 3, 4], stream.collect::<Vec<_>>().await);
478    /// # });
479    ///
480    /// #
481    /// ```
482    fn distinct(self) -> Distinct<Self>
483    where
484        Self: Sized,
485        Self::Item: Hash,
486    {
487        assert_stream::<Self::Item, _>(Distinct::new(self))
488    }
489
490    /// Ensures that all emitted events are unique within immediate sequence.
491    /// Events are required to implement `Hash`.
492    ///
493    /// Note that this function consumes the stream passed into it and returns a
494    /// wrapped version of it.
495    ///
496    /// # Examples
497    ///
498    /// ```
499    /// # futures::executor::block_on(async {
500    /// use futures::stream::{self, StreamExt};
501    /// use futures_rx::RxExt;
502    ///
503    /// let stream = stream::iter([1, 1, 1, 2, 2, 2, 3, 1, 1]);
504    /// let stream = stream.distinct_until_changed();
505    ///
506    /// assert_eq!(vec![1, 2, 3, 1], stream.collect::<Vec<_>>().await);
507    /// # });
508    ///
509    /// #
510    /// ```
511    fn distinct_until_changed(self) -> DistinctUntilChanged<Self>
512    where
513        Self: Sized,
514        Self::Item: Hash,
515    {
516        assert_stream::<Self::Item, _>(DistinctUntilChanged::new(self))
517    }
518
519    /// Converts all events of a `Stream` into `Notification` events.
520    /// When the `Stream` is done, it will first emit a final `Notification::Complete` event.
521    ///
522    /// Note that this function consumes the stream passed into it and returns a
523    /// wrapped version of it.
524    ///
525    /// # Examples
526    ///
527    /// ```
528    /// # futures::executor::block_on(async {
529    /// use futures::stream::{self, StreamExt};
530    /// use futures_rx::{Notification, RxExt};
531    ///
532    /// let stream = stream::iter(0..=3);
533    /// let stream = stream.materialize();
534    ///
535    /// assert_eq!(
536    ///     vec![
537    ///         Notification::Next(0),
538    ///         Notification::Next(1),
539    ///         Notification::Next(2),
540    ///         Notification::Next(3),
541    ///         Notification::Complete
542    ///     ],
543    ///     stream.collect::<Vec<_>>().await
544    /// );
545    /// # });
546    ///
547    /// #
548    /// ```
549    fn materialize(self) -> Materialize<Self>
550    where
551        Self: Sized,
552    {
553        assert_stream::<Notification<Self::Item>, _>(Materialize::new(self))
554    }
555
556    /// The inverse of materialize.
557    /// Use this transformer to translate a `Stream` emitting `Notification` events back
558    /// into a `Stream` emitting original events.
559    ///
560    /// Note that this function consumes the stream passed into it and returns a
561    /// wrapped version of it.
562    ///
563    /// # Examples
564    ///
565    /// ```
566    /// # futures::executor::block_on(async {
567    /// use futures::stream::{self, StreamExt};
568    /// use futures_rx::RxExt;
569    ///
570    /// let stream = stream::iter(0..=3);
571    /// let stream = stream.materialize().dematerialize();
572    ///
573    /// assert_eq!(vec![0, 1, 2, 3], stream.collect::<Vec<_>>().await);
574    /// # });
575    ///
576    /// #
577    /// ```
578    fn dematerialize<T>(self) -> Dematerialize<Self, T>
579    where
580        Self: Stream<Item = Notification<T>> + Sized,
581    {
582        assert_stream::<T, _>(Dematerialize::new(self))
583    }
584
585    /// Delays emitting events using an initial time window, provided by a closure.
586    ///
587    /// Note that this function consumes the stream passed into it and returns a
588    /// wrapped version of it.
589    ///
590    /// # Examples
591    ///
592    /// ```
593    /// # futures::executor::block_on(async {
594    /// use futures::stream::{self, StreamExt};
595    /// use futures_rx::RxExt;
596    ///
597    /// let stream = stream::iter(0..=3);
598    /// let stream = stream.delay(|| async { /* return delayed over time */ });
599    ///
600    /// assert_eq!(vec![0, 1, 2, 3], stream.collect::<Vec<_>>().await);
601    /// # });
602    ///
603    /// #
604    /// ```
605    fn delay<Fut: Future, F: FnMut() -> Fut>(self, f: F) -> Delay<Self, Fut, F>
606    where
607        Self: Sized,
608    {
609        assert_stream::<Self::Item, _>(Delay::new(self, f))
610    }
611
612    /// Delays every event using a time window, provided by a closure.
613    ///
614    /// Use max_buffer_size to limit the amount of buffered items that are awaiting
615    /// time window(s) to complete.
616    ///
617    /// Note that this function consumes the stream passed into it and returns a
618    /// wrapped version of it.
619    ///
620    /// # Examples
621    ///
622    /// ```
623    /// # futures::executor::block_on(async {
624    /// use futures::stream::{self, StreamExt};
625    /// use futures_rx::RxExt;
626    ///
627    /// let stream = stream::iter(0..=3);
628    /// let stream = stream.delay_every(|_| async { /* return delayed over time */ }, None);
629    ///
630    /// assert_eq!(vec![0, 1, 2, 3], stream.collect::<Vec<_>>().await);
631    /// # });
632    ///
633    /// #
634    /// ```
635    fn delay_every<Fut: Future, F: FnMut(&Self::Item) -> Fut>(
636        self,
637        f: F,
638        max_buffer_size: Option<usize>,
639    ) -> DelayEvery<Self, Fut, F>
640    where
641        Self: Sized,
642    {
643        assert_stream::<Self::Item, _>(DelayEvery::new(self, f, max_buffer_size))
644    }
645
646    /// Acts just like a `CombineLatest2`, where every next event is a tuple pair
647    /// containing the last emitted events from both `Stream`s.
648    ///
649    /// Note that this function consumes the stream passed into it and returns a
650    /// wrapped version of it.
651    ///
652    /// # Examples
653    ///
654    /// ```
655    /// # futures::executor::block_on(async {
656    /// use futures::stream::{self, StreamExt};
657    /// use futures_rx::RxExt;
658    ///
659    /// let stream = stream::iter(0..=3);
660    /// let stream = stream.with_latest_from(stream::iter(0..=3));
661    ///
662    /// assert_eq!(vec![(0, 0), (1, 1), (2, 2), (3, 3)], stream.collect::<Vec<_>>().await);
663    /// # });
664    ///
665    /// #
666    /// ```
667    fn with_latest_from<S: Stream>(self, stream: S) -> CombineLatest2<Self, S, Self::Item, S::Item>
668    where
669        Self: Sized,
670        Self::Item: ToOwned<Owned = Self::Item>,
671        S::Item: ToOwned<Owned = S::Item>,
672    {
673        assert_stream::<(Self::Item, S::Item), _>(CombineLatest2::new(self, stream))
674    }
675
676    /// Wraps each item into a `Timed` struct.
677    /// This structs hold the actual event, as well as
678    /// a timestamp containing an `Instant` and an elapsed interval
679    /// as `Duration`, relative to the second to last emitted event.
680    ///
681    /// Note that this function consumes the stream passed into it and returns a
682    /// wrapped version of it.
683    fn timing(self) -> Timing<Self>
684    where
685        Self: Sized,
686    {
687        assert_stream::<Timed<Self::Item>, _>(Timing::new(self))
688    }
689
690    /// Similar to `inspect`, except that the closure provided is only ever
691    /// triggered when the `Stream` is done.
692    ///
693    /// Note that this function consumes the stream passed into it and returns a
694    /// wrapped version of it.
695    ///
696    /// # Examples
697    ///
698    /// ```
699    /// # futures::executor::block_on(async {
700    /// use futures::stream::{self, StreamExt};
701    /// use futures_rx::RxExt;
702    ///
703    /// let mut is_done = false;
704    /// stream::iter(0..=8)
705    ///     .inspect_done(|| is_done = true)
706    ///     .collect::<Vec<_>>()
707    ///     .await;
708    ///
709    /// assert!(is_done);
710    /// # });
711    ///
712    /// #
713    /// ```
714    fn inspect_done<F: FnMut()>(self, f: F) -> InspectDone<Self, F>
715    where
716        Self: Sized,
717    {
718        assert_stream::<Self::Item, _>(InspectDone::new(self, f))
719    }
720
721    /// Only emits events whenever the `sampler` emits an event.
722    /// The event emitted is then the last emitted event from the
723    /// source `Stream`.
724    ///
725    /// If the `sampler` triggers before the source `Stream` was
726    /// able to produce a new event, then no event is emitted.
727    ///
728    /// Note that this function consumes the stream passed into it and returns a
729    /// wrapped version of it.
730    ///
731    /// See also `throttle`.
732    fn sample<S: Stream>(self, sampler: S) -> Sample<Self, S>
733    where
734        Self: Sized,
735    {
736        assert_stream::<Self::Item, _>(Sample::new(self, sampler))
737    }
738}
739
740pub(crate) fn assert_stream<T, S>(stream: S) -> S
741where
742    S: Stream<Item = T>,
743{
744    stream
745}