tokio_stream/
stream_ext.rs

1use core::future::Future;
2use futures_core::Stream;
3
4mod all;
5use all::AllFuture;
6
7mod any;
8use any::AnyFuture;
9
10mod chain;
11pub use chain::Chain;
12
13pub(crate) mod collect;
14use collect::{Collect, FromStream};
15
16mod filter;
17pub use filter::Filter;
18
19mod filter_map;
20pub use filter_map::FilterMap;
21
22mod fold;
23use fold::FoldFuture;
24
25mod fuse;
26pub use fuse::Fuse;
27
28mod map;
29pub use map::Map;
30
31mod map_while;
32pub use map_while::MapWhile;
33
34mod merge;
35pub use merge::Merge;
36
37mod next;
38use next::Next;
39
40mod skip;
41pub use skip::Skip;
42
43mod skip_while;
44pub use skip_while::SkipWhile;
45
46mod take;
47pub use take::Take;
48
49mod take_while;
50pub use take_while::TakeWhile;
51
52mod then;
53pub use then::Then;
54
55mod try_next;
56use try_next::TryNext;
57
58mod peekable;
59pub use peekable::Peekable;
60
61cfg_time! {
62    pub(crate) mod timeout;
63    pub(crate) mod timeout_repeating;
64    pub use timeout::Timeout;
65    pub use timeout_repeating::TimeoutRepeating;
66    use tokio::time::{Duration, Interval};
67    mod throttle;
68    use throttle::{throttle, Throttle};
69    mod chunks_timeout;
70    pub use chunks_timeout::ChunksTimeout;
71}
72
73/// An extension trait for the [`Stream`] trait that provides a variety of
74/// convenient combinator functions.
75///
76/// Be aware that the `Stream` trait in Tokio is a re-export of the trait found
77/// in the [futures] crate, however both Tokio and futures provide separate
78/// `StreamExt` utility traits, and some utilities are only available on one of
79/// these traits. Click [here][futures-StreamExt] to see the other `StreamExt`
80/// trait in the futures crate.
81///
82/// If you need utilities from both `StreamExt` traits, you should prefer to
83/// import one of them, and use the other through the fully qualified call
84/// syntax. For example:
85/// ```
86/// // import one of the traits:
87/// use futures::stream::StreamExt;
88/// # #[tokio::main(flavor = "current_thread")]
89/// # async fn main() {
90///
91/// let a = tokio_stream::iter(vec![1, 3, 5]);
92/// let b = tokio_stream::iter(vec![2, 4, 6]);
93///
94/// // use the fully qualified call syntax for the other trait:
95/// let merged = tokio_stream::StreamExt::merge(a, b);
96///
97/// // use normal call notation for futures::stream::StreamExt::collect
98/// let output: Vec<_> = merged.collect().await;
99/// assert_eq!(output, vec![1, 2, 3, 4, 5, 6]);
100/// # }
101/// ```
102///
103/// [`Stream`]: crate::Stream
104/// [futures]: https://docs.rs/futures
105/// [futures-StreamExt]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html
106pub trait StreamExt: Stream {
107    /// Consumes and returns the next value in the stream or `None` if the
108    /// stream is finished.
109    ///
110    /// Equivalent to:
111    ///
112    /// ```ignore
113    /// async fn next(&mut self) -> Option<Self::Item>;
114    /// ```
115    ///
116    /// Note that because `next` doesn't take ownership over the stream,
117    /// the [`Stream`] type must be [`Unpin`]. If you want to use `next` with a
118    /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can
119    /// be done by boxing the stream using [`Box::pin`] or
120    /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils`
121    /// crate.
122    ///
123    /// # Cancel safety
124    ///
125    /// This method is cancel safe. The returned future only
126    /// holds onto a reference to the underlying stream,
127    /// so dropping it will never lose a value.
128    ///
129    /// # Examples
130    ///
131    /// ```
132    /// # #[tokio::main(flavor = "current_thread")]
133    /// # async fn main() {
134    /// use tokio_stream::{self as stream, StreamExt};
135    ///
136    /// let mut stream = stream::iter(1..=3);
137    ///
138    /// assert_eq!(stream.next().await, Some(1));
139    /// assert_eq!(stream.next().await, Some(2));
140    /// assert_eq!(stream.next().await, Some(3));
141    /// assert_eq!(stream.next().await, None);
142    /// # }
143    /// ```
144    fn next(&mut self) -> Next<'_, Self>
145    where
146        Self: Unpin,
147    {
148        Next::new(self)
149    }
150
151    /// Consumes and returns the next item in the stream. If an error is
152    /// encountered before the next item, the error is returned instead.
153    ///
154    /// Equivalent to:
155    ///
156    /// ```ignore
157    /// async fn try_next(&mut self) -> Result<Option<T>, E>;
158    /// ```
159    ///
160    /// This is similar to the [`next`](StreamExt::next) combinator,
161    /// but returns a [`Result<Option<T>, E>`](Result) rather than
162    /// an [`Option<Result<T, E>>`](Option), making for easy use
163    /// with the [`?`](std::ops::Try) operator.
164    ///
165    /// # Cancel safety
166    ///
167    /// This method is cancel safe. The returned future only
168    /// holds onto a reference to the underlying stream,
169    /// so dropping it will never lose a value.
170    ///
171    /// # Examples
172    ///
173    /// ```
174    /// # #[tokio::main(flavor = "current_thread")]
175    /// # async fn main() {
176    ///
177    /// use tokio_stream::{self as stream, StreamExt};
178    ///
179    /// let mut stream = stream::iter(vec![Ok(1), Ok(2), Err("nope")]);
180    ///
181    /// assert_eq!(stream.try_next().await, Ok(Some(1)));
182    /// assert_eq!(stream.try_next().await, Ok(Some(2)));
183    /// assert_eq!(stream.try_next().await, Err("nope"));
184    /// # }
185    /// ```
186    fn try_next<T, E>(&mut self) -> TryNext<'_, Self>
187    where
188        Self: Stream<Item = Result<T, E>> + Unpin,
189    {
190        TryNext::new(self)
191    }
192
193    /// Maps this stream's items to a different type, returning a new stream of
194    /// the resulting type.
195    ///
196    /// The provided closure is executed over all elements of this stream as
197    /// they are made available. It is executed inline with calls to
198    /// [`poll_next`](Stream::poll_next).
199    ///
200    /// Note that this function consumes the stream passed into it and returns a
201    /// wrapped version of it, similar to the existing `map` methods in the
202    /// standard library.
203    ///
204    /// # Examples
205    ///
206    /// ```
207    /// # #[tokio::main(flavor = "current_thread")]
208    /// # async fn main() {
209    /// use tokio_stream::{self as stream, StreamExt};
210    ///
211    /// let stream = stream::iter(1..=3);
212    /// let mut stream = stream.map(|x| x + 3);
213    ///
214    /// assert_eq!(stream.next().await, Some(4));
215    /// assert_eq!(stream.next().await, Some(5));
216    /// assert_eq!(stream.next().await, Some(6));
217    /// # }
218    /// ```
219    fn map<T, F>(self, f: F) -> Map<Self, F>
220    where
221        F: FnMut(Self::Item) -> T,
222        Self: Sized,
223    {
224        Map::new(self, f)
225    }
226
227    /// Map this stream's items to a different type for as long as determined by
228    /// the provided closure. A stream of the target type will be returned,
229    /// which will yield elements until the closure returns `None`.
230    ///
231    /// The provided closure is executed over all elements of this stream as
232    /// they are made available, until it returns `None`. It is executed inline
233    /// with calls to [`poll_next`](Stream::poll_next). Once `None` is returned,
234    /// the underlying stream will not be polled again.
235    ///
236    /// Note that this function consumes the stream passed into it and returns a
237    /// wrapped version of it, similar to the [`Iterator::map_while`] method in the
238    /// standard library.
239    ///
240    /// # Examples
241    ///
242    /// ```
243    /// # #[tokio::main(flavor = "current_thread")]
244    /// # async fn main() {
245    /// use tokio_stream::{self as stream, StreamExt};
246    ///
247    /// let stream = stream::iter(1..=10);
248    /// let mut stream = stream.map_while(|x| {
249    ///     if x < 4 {
250    ///         Some(x + 3)
251    ///     } else {
252    ///         None
253    ///     }
254    /// });
255    /// assert_eq!(stream.next().await, Some(4));
256    /// assert_eq!(stream.next().await, Some(5));
257    /// assert_eq!(stream.next().await, Some(6));
258    /// assert_eq!(stream.next().await, None);
259    /// # }
260    /// ```
261    fn map_while<T, F>(self, f: F) -> MapWhile<Self, F>
262    where
263        F: FnMut(Self::Item) -> Option<T>,
264        Self: Sized,
265    {
266        MapWhile::new(self, f)
267    }
268
269    /// Maps this stream's items asynchronously to a different type, returning a
270    /// new stream of the resulting type.
271    ///
272    /// The provided closure is executed over all elements of this stream as
273    /// they are made available, and the returned future is executed. Only one
274    /// future is executed at the time.
275    ///
276    /// Note that this function consumes the stream passed into it and returns a
277    /// wrapped version of it, similar to the existing `then` methods in the
278    /// standard library.
279    ///
280    /// Be aware that if the future is not `Unpin`, then neither is the `Stream`
281    /// returned by this method. To handle this, you can use `tokio::pin!` as in
282    /// the example below or put the stream in a `Box` with `Box::pin(stream)`.
283    ///
284    /// # Examples
285    ///
286    /// ```
287    /// # #[tokio::main(flavor = "current_thread")]
288    /// # async fn main() {
289    /// use tokio_stream::{self as stream, StreamExt};
290    ///
291    /// async fn do_async_work(value: i32) -> i32 {
292    ///     value + 3
293    /// }
294    ///
295    /// let stream = stream::iter(1..=3);
296    /// let stream = stream.then(do_async_work);
297    ///
298    /// tokio::pin!(stream);
299    ///
300    /// assert_eq!(stream.next().await, Some(4));
301    /// assert_eq!(stream.next().await, Some(5));
302    /// assert_eq!(stream.next().await, Some(6));
303    /// # }
304    /// ```
305    fn then<F, Fut>(self, f: F) -> Then<Self, Fut, F>
306    where
307        F: FnMut(Self::Item) -> Fut,
308        Fut: Future,
309        Self: Sized,
310    {
311        Then::new(self, f)
312    }
313
314    /// Combine two streams into one by interleaving the output of both as it
315    /// is produced.
316    ///
317    /// Values are produced from the merged stream in the order they arrive from
318    /// the two source streams. If both source streams provide values
319    /// simultaneously, the merge stream alternates between them. This provides
320    /// some level of fairness. You should not chain calls to `merge`, as this
321    /// will break the fairness of the merging.
322    ///
323    /// The merged stream completes once **both** source streams complete. When
324    /// one source stream completes before the other, the merge stream
325    /// exclusively polls the remaining stream.
326    ///
327    /// For merging multiple streams, consider using [`StreamMap`] instead.
328    ///
329    /// [`StreamMap`]: crate::StreamMap
330    ///
331    /// # Examples
332    ///
333    /// ```
334    /// use tokio_stream::{StreamExt, Stream};
335    /// use tokio::sync::mpsc;
336    /// use tokio::time;
337    ///
338    /// use std::time::Duration;
339    /// use std::pin::Pin;
340    ///
341    /// # /*
342    /// #[tokio::main]
343    /// # */
344    /// # #[tokio::main(flavor = "current_thread")]
345    /// async fn main() {
346    /// # time::pause();
347    ///     let (tx1, mut rx1) = mpsc::channel::<usize>(10);
348    ///     let (tx2, mut rx2) = mpsc::channel::<usize>(10);
349    ///
350    ///     // Convert the channels to a `Stream`.
351    ///     let rx1 = Box::pin(async_stream::stream! {
352    ///           while let Some(item) = rx1.recv().await {
353    ///               yield item;
354    ///           }
355    ///     }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
356    ///
357    ///     let rx2 = Box::pin(async_stream::stream! {
358    ///           while let Some(item) = rx2.recv().await {
359    ///               yield item;
360    ///           }
361    ///     }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
362    ///
363    ///     let mut rx = rx1.merge(rx2);
364    ///
365    ///     tokio::spawn(async move {
366    ///         // Send some values immediately
367    ///         tx1.send(1).await.unwrap();
368    ///         tx1.send(2).await.unwrap();
369    ///
370    ///         // Let the other task send values
371    ///         time::sleep(Duration::from_millis(20)).await;
372    ///
373    ///         tx1.send(4).await.unwrap();
374    ///     });
375    ///
376    ///     tokio::spawn(async move {
377    ///         // Wait for the first task to send values
378    ///         time::sleep(Duration::from_millis(5)).await;
379    ///
380    ///         tx2.send(3).await.unwrap();
381    ///
382    ///         time::sleep(Duration::from_millis(25)).await;
383    ///
384    ///         // Send the final value
385    ///         tx2.send(5).await.unwrap();
386    ///     });
387    ///
388    ///    assert_eq!(1, rx.next().await.unwrap());
389    ///    assert_eq!(2, rx.next().await.unwrap());
390    ///    assert_eq!(3, rx.next().await.unwrap());
391    ///    assert_eq!(4, rx.next().await.unwrap());
392    ///    assert_eq!(5, rx.next().await.unwrap());
393    ///
394    ///    // The merged stream is consumed
395    ///    assert!(rx.next().await.is_none());
396    /// }
397    /// ```
398    fn merge<U>(self, other: U) -> Merge<Self, U>
399    where
400        U: Stream<Item = Self::Item>,
401        Self: Sized,
402    {
403        Merge::new(self, other)
404    }
405
406    /// Filters the values produced by this stream according to the provided
407    /// predicate.
408    ///
409    /// As values of this stream are made available, the provided predicate `f`
410    /// will be run against them. If the predicate
411    /// resolves to `true`, then the stream will yield the value, but if the
412    /// predicate resolves to `false`, then the value
413    /// will be discarded and the next value will be produced.
414    ///
415    /// Note that this function consumes the stream passed into it and returns a
416    /// wrapped version of it, similar to [`Iterator::filter`] method in the
417    /// standard library.
418    ///
419    /// # Examples
420    ///
421    /// ```
422    /// # #[tokio::main(flavor = "current_thread")]
423    /// # async fn main() {
424    /// use tokio_stream::{self as stream, StreamExt};
425    ///
426    /// let stream = stream::iter(1..=8);
427    /// let mut evens = stream.filter(|x| x % 2 == 0);
428    ///
429    /// assert_eq!(Some(2), evens.next().await);
430    /// assert_eq!(Some(4), evens.next().await);
431    /// assert_eq!(Some(6), evens.next().await);
432    /// assert_eq!(Some(8), evens.next().await);
433    /// assert_eq!(None, evens.next().await);
434    /// # }
435    /// ```
436    fn filter<F>(self, f: F) -> Filter<Self, F>
437    where
438        F: FnMut(&Self::Item) -> bool,
439        Self: Sized,
440    {
441        Filter::new(self, f)
442    }
443
444    /// Filters the values produced by this stream while simultaneously mapping
445    /// them to a different type according to the provided closure.
446    ///
447    /// As values of this stream are made available, the provided function will
448    /// be run on them. If the predicate `f` resolves to
449    /// [`Some(item)`](Some) then the stream will yield the value `item`, but if
450    /// it resolves to [`None`], then the value will be skipped.
451    ///
452    /// Note that this function consumes the stream passed into it and returns a
453    /// wrapped version of it, similar to [`Iterator::filter_map`] method in the
454    /// standard library.
455    ///
456    /// # Examples
457    /// ```
458    /// # #[tokio::main(flavor = "current_thread")]
459    /// # async fn main() {
460    /// use tokio_stream::{self as stream, StreamExt};
461    ///
462    /// let stream = stream::iter(1..=8);
463    /// let mut evens = stream.filter_map(|x| {
464    ///     if x % 2 == 0 { Some(x + 1) } else { None }
465    /// });
466    ///
467    /// assert_eq!(Some(3), evens.next().await);
468    /// assert_eq!(Some(5), evens.next().await);
469    /// assert_eq!(Some(7), evens.next().await);
470    /// assert_eq!(Some(9), evens.next().await);
471    /// assert_eq!(None, evens.next().await);
472    /// # }
473    /// ```
474    fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F>
475    where
476        F: FnMut(Self::Item) -> Option<T>,
477        Self: Sized,
478    {
479        FilterMap::new(self, f)
480    }
481
482    /// Creates a stream which ends after the first `None`.
483    ///
484    /// After a stream returns `None`, behavior is undefined. Future calls to
485    /// `poll_next` may or may not return `Some(T)` again or they may panic.
486    /// `fuse()` adapts a stream, ensuring that after `None` is given, it will
487    /// return `None` forever.
488    ///
489    /// # Examples
490    ///
491    /// ```
492    /// use tokio_stream::{Stream, StreamExt};
493    ///
494    /// use std::pin::Pin;
495    /// use std::task::{Context, Poll};
496    ///
497    /// // a stream which alternates between Some and None
498    /// struct Alternate {
499    ///     state: i32,
500    /// }
501    ///
502    /// impl Stream for Alternate {
503    ///     type Item = i32;
504    ///
505    ///     fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<i32>> {
506    ///         let val = self.state;
507    ///         self.state = self.state + 1;
508    ///
509    ///         // if it's even, Some(i32), else None
510    ///         if val % 2 == 0 {
511    ///             Poll::Ready(Some(val))
512    ///         } else {
513    ///             Poll::Ready(None)
514    ///         }
515    ///     }
516    /// }
517    ///
518    /// # /*
519    /// #[tokio::main]
520    /// # */
521    /// # #[tokio::main(flavor = "current_thread")]
522    /// async fn main() {
523    ///     let mut stream = Alternate { state: 0 };
524    ///
525    ///     // the stream goes back and forth
526    ///     assert_eq!(stream.next().await, Some(0));
527    ///     assert_eq!(stream.next().await, None);
528    ///     assert_eq!(stream.next().await, Some(2));
529    ///     assert_eq!(stream.next().await, None);
530    ///
531    ///     // however, once it is fused
532    ///     let mut stream = stream.fuse();
533    ///
534    ///     assert_eq!(stream.next().await, Some(4));
535    ///     assert_eq!(stream.next().await, None);
536    ///
537    ///     // it will always return `None` after the first time.
538    ///     assert_eq!(stream.next().await, None);
539    ///     assert_eq!(stream.next().await, None);
540    ///     assert_eq!(stream.next().await, None);
541    /// }
542    /// ```
543    fn fuse(self) -> Fuse<Self>
544    where
545        Self: Sized,
546    {
547        Fuse::new(self)
548    }
549
550    /// Creates a new stream of at most `n` items of the underlying stream.
551    ///
552    /// Once `n` items have been yielded from this stream then it will always
553    /// return that the stream is done.
554    ///
555    /// # Examples
556    ///
557    /// ```
558    /// # #[tokio::main(flavor = "current_thread")]
559    /// # async fn main() {
560    /// use tokio_stream::{self as stream, StreamExt};
561    ///
562    /// let mut stream = stream::iter(1..=10).take(3);
563    ///
564    /// assert_eq!(Some(1), stream.next().await);
565    /// assert_eq!(Some(2), stream.next().await);
566    /// assert_eq!(Some(3), stream.next().await);
567    /// assert_eq!(None, stream.next().await);
568    /// # }
569    /// ```
570    fn take(self, n: usize) -> Take<Self>
571    where
572        Self: Sized,
573    {
574        Take::new(self, n)
575    }
576
577    /// Take elements from this stream while the provided predicate
578    /// resolves to `true`.
579    ///
580    /// This function, like `Iterator::take_while`, will take elements from the
581    /// stream until the predicate `f` resolves to `false`. Once one element
582    /// returns false it will always return that the stream is done.
583    ///
584    /// # Examples
585    ///
586    /// ```
587    /// # #[tokio::main(flavor = "current_thread")]
588    /// # async fn main() {
589    /// use tokio_stream::{self as stream, StreamExt};
590    ///
591    /// let mut stream = stream::iter(1..=10).take_while(|x| *x <= 3);
592    ///
593    /// assert_eq!(Some(1), stream.next().await);
594    /// assert_eq!(Some(2), stream.next().await);
595    /// assert_eq!(Some(3), stream.next().await);
596    /// assert_eq!(None, stream.next().await);
597    /// # }
598    /// ```
599    fn take_while<F>(self, f: F) -> TakeWhile<Self, F>
600    where
601        F: FnMut(&Self::Item) -> bool,
602        Self: Sized,
603    {
604        TakeWhile::new(self, f)
605    }
606
607    /// Creates a new stream that will skip the `n` first items of the
608    /// underlying stream.
609    ///
610    /// # Examples
611    ///
612    /// ```
613    /// # #[tokio::main(flavor = "current_thread")]
614    /// # async fn main() {
615    /// use tokio_stream::{self as stream, StreamExt};
616    ///
617    /// let mut stream = stream::iter(1..=10).skip(7);
618    ///
619    /// assert_eq!(Some(8), stream.next().await);
620    /// assert_eq!(Some(9), stream.next().await);
621    /// assert_eq!(Some(10), stream.next().await);
622    /// assert_eq!(None, stream.next().await);
623    /// # }
624    /// ```
625    fn skip(self, n: usize) -> Skip<Self>
626    where
627        Self: Sized,
628    {
629        Skip::new(self, n)
630    }
631
632    /// Skip elements from the underlying stream while the provided predicate
633    /// resolves to `true`.
634    ///
635    /// This function, like [`Iterator::skip_while`], will ignore elements from the
636    /// stream until the predicate `f` resolves to `false`. Once one element
637    /// returns false, the rest of the elements will be yielded.
638    ///
639    /// [`Iterator::skip_while`]: std::iter::Iterator::skip_while()
640    ///
641    /// # Examples
642    ///
643    /// ```
644    /// # #[tokio::main(flavor = "current_thread")]
645    /// # async fn main() {
646    /// use tokio_stream::{self as stream, StreamExt};
647    /// let mut stream = stream::iter(vec![1,2,3,4,1]).skip_while(|x| *x < 3);
648    ///
649    /// assert_eq!(Some(3), stream.next().await);
650    /// assert_eq!(Some(4), stream.next().await);
651    /// assert_eq!(Some(1), stream.next().await);
652    /// assert_eq!(None, stream.next().await);
653    /// # }
654    /// ```
655    fn skip_while<F>(self, f: F) -> SkipWhile<Self, F>
656    where
657        F: FnMut(&Self::Item) -> bool,
658        Self: Sized,
659    {
660        SkipWhile::new(self, f)
661    }
662
663    /// Tests if every element of the stream matches a predicate.
664    ///
665    /// Equivalent to:
666    ///
667    /// ```ignore
668    /// async fn all<F>(&mut self, f: F) -> bool;
669    /// ```
670    ///
671    /// `all()` takes a closure that returns `true` or `false`. It applies
672    /// this closure to each element of the stream, and if they all return
673    /// `true`, then so does `all`. If any of them return `false`, it
674    /// returns `false`. An empty stream returns `true`.
675    ///
676    /// `all()` is short-circuiting; in other words, it will stop processing
677    /// as soon as it finds a `false`, given that no matter what else happens,
678    /// the result will also be `false`.
679    ///
680    /// An empty stream returns `true`.
681    ///
682    /// # Examples
683    ///
684    /// Basic usage:
685    ///
686    /// ```
687    /// # #[tokio::main(flavor = "current_thread")]
688    /// # async fn main() {
689    /// use tokio_stream::{self as stream, StreamExt};
690    ///
691    /// let a = [1, 2, 3];
692    ///
693    /// assert!(stream::iter(&a).all(|&x| x > 0).await);
694    ///
695    /// assert!(!stream::iter(&a).all(|&x| x > 2).await);
696    /// # }
697    /// ```
698    ///
699    /// Stopping at the first `false`:
700    ///
701    /// ```
702    /// # #[tokio::main(flavor = "current_thread")]
703    /// # async fn main() {
704    /// use tokio_stream::{self as stream, StreamExt};
705    ///
706    /// let a = [1, 2, 3];
707    ///
708    /// let mut iter = stream::iter(&a);
709    ///
710    /// assert!(!iter.all(|&x| x != 2).await);
711    ///
712    /// // we can still use `iter`, as there are more elements.
713    /// assert_eq!(iter.next().await, Some(&3));
714    /// # }
715    /// ```
716    fn all<F>(&mut self, f: F) -> AllFuture<'_, Self, F>
717    where
718        Self: Unpin,
719        F: FnMut(Self::Item) -> bool,
720    {
721        AllFuture::new(self, f)
722    }
723
724    /// Tests if any element of the stream matches a predicate.
725    ///
726    /// Equivalent to:
727    ///
728    /// ```ignore
729    /// async fn any<F>(&mut self, f: F) -> bool;
730    /// ```
731    ///
732    /// `any()` takes a closure that returns `true` or `false`. It applies
733    /// this closure to each element of the stream, and if any of them return
734    /// `true`, then so does `any()`. If they all return `false`, it
735    /// returns `false`.
736    ///
737    /// `any()` is short-circuiting; in other words, it will stop processing
738    /// as soon as it finds a `true`, given that no matter what else happens,
739    /// the result will also be `true`.
740    ///
741    /// An empty stream returns `false`.
742    ///
743    /// Basic usage:
744    ///
745    /// ```
746    /// # #[tokio::main(flavor = "current_thread")]
747    /// # async fn main() {
748    /// use tokio_stream::{self as stream, StreamExt};
749    ///
750    /// let a = [1, 2, 3];
751    ///
752    /// assert!(stream::iter(&a).any(|&x| x > 0).await);
753    ///
754    /// assert!(!stream::iter(&a).any(|&x| x > 5).await);
755    /// # }
756    /// ```
757    ///
758    /// Stopping at the first `true`:
759    ///
760    /// ```
761    /// # #[tokio::main(flavor = "current_thread")]
762    /// # async fn main() {
763    /// use tokio_stream::{self as stream, StreamExt};
764    ///
765    /// let a = [1, 2, 3];
766    ///
767    /// let mut iter = stream::iter(&a);
768    ///
769    /// assert!(iter.any(|&x| x != 2).await);
770    ///
771    /// // we can still use `iter`, as there are more elements.
772    /// assert_eq!(iter.next().await, Some(&2));
773    /// # }
774    /// ```
775    fn any<F>(&mut self, f: F) -> AnyFuture<'_, Self, F>
776    where
777        Self: Unpin,
778        F: FnMut(Self::Item) -> bool,
779    {
780        AnyFuture::new(self, f)
781    }
782
783    /// Combine two streams into one by first returning all values from the
784    /// first stream then all values from the second stream.
785    ///
786    /// As long as `self` still has values to emit, no values from `other` are
787    /// emitted, even if some are ready.
788    ///
789    /// # Examples
790    ///
791    /// ```
792    /// use tokio_stream::{self as stream, StreamExt};
793    ///
794    /// # #[tokio::main(flavor = "current_thread")]
795    /// # async fn main() {
796    /// let one = stream::iter(vec![1, 2, 3]);
797    /// let two = stream::iter(vec![4, 5, 6]);
798    ///
799    /// let mut stream = one.chain(two);
800    ///
801    /// assert_eq!(stream.next().await, Some(1));
802    /// assert_eq!(stream.next().await, Some(2));
803    /// assert_eq!(stream.next().await, Some(3));
804    /// assert_eq!(stream.next().await, Some(4));
805    /// assert_eq!(stream.next().await, Some(5));
806    /// assert_eq!(stream.next().await, Some(6));
807    /// assert_eq!(stream.next().await, None);
808    /// # }
809    /// ```
810    fn chain<U>(self, other: U) -> Chain<Self, U>
811    where
812        U: Stream<Item = Self::Item>,
813        Self: Sized,
814    {
815        Chain::new(self, other)
816    }
817
818    /// A combinator that applies a function to every element in a stream
819    /// producing a single, final value.
820    ///
821    /// Equivalent to:
822    ///
823    /// ```ignore
824    /// async fn fold<B, F>(self, init: B, f: F) -> B;
825    /// ```
826    ///
827    /// # Examples
828    /// Basic usage:
829    /// ```
830    /// # #[tokio::main(flavor = "current_thread")]
831    /// # async fn main() {
832    /// use tokio_stream::{self as stream, *};
833    ///
834    /// let s = stream::iter(vec![1u8, 2, 3]);
835    /// let sum = s.fold(0, |acc, x| acc + x).await;
836    ///
837    /// assert_eq!(sum, 6);
838    /// # }
839    /// ```
840    fn fold<B, F>(self, init: B, f: F) -> FoldFuture<Self, B, F>
841    where
842        Self: Sized,
843        F: FnMut(B, Self::Item) -> B,
844    {
845        FoldFuture::new(self, init, f)
846    }
847
848    /// Drain stream pushing all emitted values into a collection.
849    ///
850    /// Equivalent to:
851    ///
852    /// ```ignore
853    /// async fn collect<T>(self) -> T;
854    /// ```
855    ///
856    /// `collect` streams all values, awaiting as needed. Values are pushed into
857    /// a collection. A number of different target collection types are
858    /// supported, including [`Vec`], [`String`], and [`Bytes`].
859    ///
860    /// [`Bytes`]: https://docs.rs/bytes/0.6.0/bytes/struct.Bytes.html
861    ///
862    /// # `Result`
863    ///
864    /// `collect()` can also be used with streams of type `Result<T, E>` where
865    /// `T: FromStream<_>`. In this case, `collect()` will stream as long as
866    /// values yielded from the stream are `Ok(_)`. If `Err(_)` is encountered,
867    /// streaming is terminated and `collect()` returns the `Err`.
868    ///
869    /// # Notes
870    ///
871    /// `FromStream` is currently a sealed trait. Stabilization is pending
872    /// enhancements to the Rust language.
873    ///
874    /// # Examples
875    ///
876    /// Basic usage:
877    ///
878    /// ```
879    /// use tokio_stream::{self as stream, StreamExt};
880    ///
881    /// # #[tokio::main(flavor = "current_thread")]
882    /// # async fn main() {
883    /// let doubled: Vec<i32> =
884    ///     stream::iter(vec![1, 2, 3])
885    ///         .map(|x| x * 2)
886    ///         .collect()
887    ///         .await;
888    ///
889    /// assert_eq!(vec![2, 4, 6], doubled);
890    /// # }
891    /// ```
892    ///
893    /// Collecting a stream of `Result` values
894    ///
895    /// ```
896    /// use tokio_stream::{self as stream, StreamExt};
897    ///
898    /// # #[tokio::main(flavor = "current_thread")]
899    /// # async fn main() {
900    /// // A stream containing only `Ok` values will be collected
901    /// let values: Result<Vec<i32>, &str> =
902    ///     stream::iter(vec![Ok(1), Ok(2), Ok(3)])
903    ///         .collect()
904    ///         .await;
905    ///
906    /// assert_eq!(Ok(vec![1, 2, 3]), values);
907    ///
908    /// // A stream containing `Err` values will return the first error.
909    /// let results = vec![Ok(1), Err("no"), Ok(2), Ok(3), Err("nein")];
910    ///
911    /// let values: Result<Vec<i32>, &str> =
912    ///     stream::iter(results)
913    ///         .collect()
914    ///         .await;
915    ///
916    /// assert_eq!(Err("no"), values);
917    /// # }
918    /// ```
919    fn collect<T>(self) -> Collect<Self, T, T::InternalCollection>
920    where
921        T: FromStream<Self::Item>,
922        Self: Sized,
923    {
924        Collect::new(self)
925    }
926
927    /// Applies a per-item timeout to the passed stream.
928    ///
929    /// `timeout()` takes a `Duration` that represents the maximum amount of
930    /// time each element of the stream has to complete before timing out.
931    ///
932    /// If the wrapped stream yields a value before the deadline is reached, the
933    /// value is returned. Otherwise, an error is returned. The caller may decide
934    /// to continue consuming the stream and will eventually get the next source
935    /// stream value once it becomes available. See
936    /// [`timeout_repeating`](StreamExt::timeout_repeating) for an alternative
937    /// where the timeouts will repeat.
938    ///
939    /// # Notes
940    ///
941    /// This function consumes the stream passed into it and returns a
942    /// wrapped version of it.
943    ///
944    /// Polling the returned stream will continue to poll the inner stream even
945    /// if one or more items time out.
946    ///
947    /// # Examples
948    ///
949    /// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3):
950    ///
951    /// ```
952    /// # #[tokio::main(flavor = "current_thread")]
953    /// # async fn main() {
954    /// use tokio_stream::{self as stream, StreamExt};
955    /// use std::time::Duration;
956    /// # let int_stream = stream::iter(1..=3);
957    ///
958    /// let int_stream = int_stream.timeout(Duration::from_secs(1));
959    /// tokio::pin!(int_stream);
960    ///
961    /// // When no items time out, we get the 3 elements in succession:
962    /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
963    /// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
964    /// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
965    /// assert_eq!(int_stream.try_next().await, Ok(None));
966    ///
967    /// // If the second item times out, we get an error and continue polling the stream:
968    /// # let mut int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
969    /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
970    /// assert!(int_stream.try_next().await.is_err());
971    /// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
972    /// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
973    /// assert_eq!(int_stream.try_next().await, Ok(None));
974    ///
975    /// // If we want to stop consuming the source stream the first time an
976    /// // element times out, we can use the `take_while` operator:
977    /// # let int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
978    /// let mut int_stream = int_stream.take_while(Result::is_ok);
979    ///
980    /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
981    /// assert_eq!(int_stream.try_next().await, Ok(None));
982    /// # }
983    /// ```
984    ///
985    /// Once a timeout error is received, no further events will be received
986    /// unless the wrapped stream yields a value (timeouts do not repeat).
987    ///
988    /// ```
989    /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
990    /// # async fn main() {
991    /// use tokio_stream::{StreamExt, wrappers::IntervalStream};
992    /// use std::time::Duration;
993    /// let interval_stream = IntervalStream::new(tokio::time::interval(Duration::from_millis(100)));
994    /// let timeout_stream = interval_stream.timeout(Duration::from_millis(10));
995    /// tokio::pin!(timeout_stream);
996    ///
997    /// // Only one timeout will be received between values in the source stream.
998    /// assert!(timeout_stream.try_next().await.is_ok());
999    /// assert!(timeout_stream.try_next().await.is_err(), "expected one timeout");
1000    /// assert!(timeout_stream.try_next().await.is_ok(), "expected no more timeouts");
1001    /// # }
1002    /// ```
1003    #[cfg(feature = "time")]
1004    #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
1005    fn timeout(self, duration: Duration) -> Timeout<Self>
1006    where
1007        Self: Sized,
1008    {
1009        Timeout::new(self, duration)
1010    }
1011
1012    /// Applies a per-item timeout to the passed stream.
1013    ///
1014    /// `timeout_repeating()` takes an [`Interval`] that controls the time each
1015    /// element of the stream has to complete before timing out.
1016    ///
1017    /// If the wrapped stream yields a value before the deadline is reached, the
1018    /// value is returned. Otherwise, an error is returned. The caller may decide
1019    /// to continue consuming the stream and will eventually get the next source
1020    /// stream value once it becomes available. Unlike `timeout()`, if no value
1021    /// becomes available before the deadline is reached, additional errors are
1022    /// returned at the specified interval. See [`timeout`](StreamExt::timeout)
1023    /// for an alternative where the timeouts do not repeat.
1024    ///
1025    /// # Notes
1026    ///
1027    /// This function consumes the stream passed into it and returns a
1028    /// wrapped version of it.
1029    ///
1030    /// Polling the returned stream will continue to poll the inner stream even
1031    /// if one or more items time out.
1032    ///
1033    /// # Examples
1034    ///
1035    /// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3):
1036    ///
1037    /// ```
1038    /// # #[tokio::main(flavor = "current_thread")]
1039    /// # async fn main() {
1040    /// use tokio_stream::{self as stream, StreamExt};
1041    /// use std::time::Duration;
1042    /// # let int_stream = stream::iter(1..=3);
1043    ///
1044    /// let int_stream = int_stream.timeout_repeating(tokio::time::interval(Duration::from_secs(1)));
1045    /// tokio::pin!(int_stream);
1046    ///
1047    /// // When no items time out, we get the 3 elements in succession:
1048    /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
1049    /// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
1050    /// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
1051    /// assert_eq!(int_stream.try_next().await, Ok(None));
1052    ///
1053    /// // If the second item times out, we get an error and continue polling the stream:
1054    /// # let mut int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
1055    /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
1056    /// assert!(int_stream.try_next().await.is_err());
1057    /// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
1058    /// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
1059    /// assert_eq!(int_stream.try_next().await, Ok(None));
1060    ///
1061    /// // If we want to stop consuming the source stream the first time an
1062    /// // element times out, we can use the `take_while` operator:
1063    /// # let int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
1064    /// let mut int_stream = int_stream.take_while(Result::is_ok);
1065    ///
1066    /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
1067    /// assert_eq!(int_stream.try_next().await, Ok(None));
1068    /// # }
1069    /// ```
1070    ///
1071    /// Timeout errors will be continuously produced at the specified interval
1072    /// until the wrapped stream yields a value.
1073    ///
1074    /// ```
1075    /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
1076    /// # async fn main() {
1077    /// use tokio_stream::{StreamExt, wrappers::IntervalStream};
1078    /// use std::time::Duration;
1079    /// let interval_stream = IntervalStream::new(tokio::time::interval(Duration::from_millis(23)));
1080    /// let timeout_stream = interval_stream.timeout_repeating(tokio::time::interval(Duration::from_millis(9)));
1081    /// tokio::pin!(timeout_stream);
1082    ///
1083    /// // Multiple timeouts will be received between values in the source stream.
1084    /// assert!(timeout_stream.try_next().await.is_ok());
1085    /// assert!(timeout_stream.try_next().await.is_err(), "expected one timeout");
1086    /// assert!(timeout_stream.try_next().await.is_err(), "expected a second timeout");
1087    /// // Will eventually receive another value from the source stream...
1088    /// assert!(timeout_stream.try_next().await.is_ok(), "expected non-timeout");
1089    /// # }
1090    /// ```
1091    #[cfg(feature = "time")]
1092    #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
1093    fn timeout_repeating(self, interval: Interval) -> TimeoutRepeating<Self>
1094    where
1095        Self: Sized,
1096    {
1097        TimeoutRepeating::new(self, interval)
1098    }
1099
1100    /// Slows down a stream by enforcing a delay between items.
1101    ///
1102    /// The underlying timer behind this utility has a granularity of one millisecond.
1103    ///
1104    /// # Example
1105    ///
1106    /// Create a throttled stream.
1107    /// ```rust,no_run
1108    /// use std::time::Duration;
1109    /// use tokio_stream::StreamExt;
1110    ///
1111    /// # async fn dox() {
1112    /// let item_stream = futures::stream::repeat("one").throttle(Duration::from_secs(2));
1113    /// tokio::pin!(item_stream);
1114    ///
1115    /// loop {
1116    ///     // The string will be produced at most every 2 seconds
1117    ///     println!("{:?}", item_stream.next().await);
1118    /// }
1119    /// # }
1120    /// ```
1121    #[cfg(feature = "time")]
1122    #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
1123    fn throttle(self, duration: Duration) -> Throttle<Self>
1124    where
1125        Self: Sized,
1126    {
1127        throttle(duration, self)
1128    }
1129
1130    /// Batches the items in the given stream using a maximum duration and size for each batch.
1131    ///
1132    /// This stream returns the next batch of items in the following situations:
1133    ///  1. The inner stream has returned at least `max_size` many items since the last batch.
1134    ///  2. The time since the first item of a batch is greater than the given duration.
1135    ///  3. The end of the stream is reached.
1136    ///
1137    /// The length of the returned vector is never empty or greater than the maximum size. Empty batches
1138    /// will not be emitted if no items are received upstream.
1139    ///
1140    /// # Panics
1141    ///
1142    /// This function panics if `max_size` is zero
1143    ///
1144    /// # Example
1145    ///
1146    /// ```rust
1147    /// use std::time::Duration;
1148    /// use tokio::time;
1149    /// use tokio_stream::{self as stream, StreamExt};
1150    /// use futures::FutureExt;
1151    ///
1152    /// #[tokio::main]
1153    /// # async fn _unused() {}
1154    /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
1155    /// async fn main() {
1156    ///     let iter = vec![1, 2, 3, 4].into_iter();
1157    ///     let stream0 = stream::iter(iter);
1158    ///
1159    ///     let iter = vec![5].into_iter();
1160    ///     let stream1 = stream::iter(iter)
1161    ///          .then(move |n| time::sleep(Duration::from_secs(5)).map(move |_| n));
1162    ///
1163    ///     let chunk_stream = stream0
1164    ///         .chain(stream1)
1165    ///         .chunks_timeout(3, Duration::from_secs(2));
1166    ///     tokio::pin!(chunk_stream);
1167    ///
1168    ///     // a full batch was received
1169    ///     assert_eq!(chunk_stream.next().await, Some(vec![1,2,3]));
1170    ///     // deadline was reached before max_size was reached
1171    ///     assert_eq!(chunk_stream.next().await, Some(vec![4]));
1172    ///     // last element in the stream
1173    ///     assert_eq!(chunk_stream.next().await, Some(vec![5]));
1174    /// }
1175    /// ```
1176    #[cfg(feature = "time")]
1177    #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
1178    #[track_caller]
1179    fn chunks_timeout(self, max_size: usize, duration: Duration) -> ChunksTimeout<Self>
1180    where
1181        Self: Sized,
1182    {
1183        assert!(max_size > 0, "`max_size` must be non-zero.");
1184        ChunksTimeout::new(self, max_size, duration)
1185    }
1186
1187    /// Turns the stream into a peekable stream, whose next element can be peeked at without being
1188    /// consumed.
1189    /// ```rust
1190    /// use tokio_stream::{self as stream, StreamExt};
1191    ///
1192    /// #[tokio::main]
1193    /// # async fn _unused() {}
1194    /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
1195    /// async fn main() {
1196    ///     let iter = vec![1, 2, 3, 4].into_iter();
1197    ///     let mut stream = stream::iter(iter).peekable();
1198    ///
1199    ///     assert_eq!(*stream.peek().await.unwrap(), 1);
1200    ///     assert_eq!(*stream.peek().await.unwrap(), 1);
1201    ///     assert_eq!(stream.next().await.unwrap(), 1);
1202    ///     assert_eq!(*stream.peek().await.unwrap(), 2);
1203    /// }
1204    /// ```
1205    fn peekable(self) -> Peekable<Self>
1206    where
1207        Self: Sized,
1208    {
1209        Peekable::new(self)
1210    }
1211}
1212
1213impl<St: ?Sized> StreamExt for St where St: Stream {}
1214
1215/// Merge the size hints from two streams.
1216fn merge_size_hints(
1217    (left_low, left_high): (usize, Option<usize>),
1218    (right_low, right_high): (usize, Option<usize>),
1219) -> (usize, Option<usize>) {
1220    let low = left_low.saturating_add(right_low);
1221    let high = match (left_high, right_high) {
1222        (Some(h1), Some(h2)) => h1.checked_add(h2),
1223        _ => None,
1224    };
1225    (low, high)
1226}