tokio_stream/stream_ext.rs
1use core::future::Future;
2use futures_core::Stream;
3
4mod all;
5use all::AllFuture;
6
7mod any;
8use any::AnyFuture;
9
10mod chain;
11pub use chain::Chain;
12
13pub(crate) mod collect;
14use collect::{Collect, FromStream};
15
16mod filter;
17pub use filter::Filter;
18
19mod filter_map;
20pub use filter_map::FilterMap;
21
22mod fold;
23use fold::FoldFuture;
24
25mod fuse;
26pub use fuse::Fuse;
27
28mod map;
29pub use map::Map;
30
31mod map_while;
32pub use map_while::MapWhile;
33
34mod merge;
35pub use merge::Merge;
36
37mod next;
38use next::Next;
39
40mod skip;
41pub use skip::Skip;
42
43mod skip_while;
44pub use skip_while::SkipWhile;
45
46mod take;
47pub use take::Take;
48
49mod take_while;
50pub use take_while::TakeWhile;
51
52mod then;
53pub use then::Then;
54
55mod try_next;
56use try_next::TryNext;
57
58mod peekable;
59pub use peekable::Peekable;
60
61cfg_time! {
62 pub(crate) mod timeout;
63 pub(crate) mod timeout_repeating;
64 pub use timeout::Timeout;
65 pub use timeout_repeating::TimeoutRepeating;
66 use tokio::time::{Duration, Interval};
67 mod throttle;
68 use throttle::{throttle, Throttle};
69 mod chunks_timeout;
70 pub use chunks_timeout::ChunksTimeout;
71}
72
73/// An extension trait for the [`Stream`] trait that provides a variety of
74/// convenient combinator functions.
75///
76/// Be aware that the `Stream` trait in Tokio is a re-export of the trait found
77/// in the [futures] crate, however both Tokio and futures provide separate
78/// `StreamExt` utility traits, and some utilities are only available on one of
79/// these traits. Click [here][futures-StreamExt] to see the other `StreamExt`
80/// trait in the futures crate.
81///
82/// If you need utilities from both `StreamExt` traits, you should prefer to
83/// import one of them, and use the other through the fully qualified call
84/// syntax. For example:
85/// ```
86/// // import one of the traits:
87/// use futures::stream::StreamExt;
88/// # #[tokio::main(flavor = "current_thread")]
89/// # async fn main() {
90///
91/// let a = tokio_stream::iter(vec![1, 3, 5]);
92/// let b = tokio_stream::iter(vec![2, 4, 6]);
93///
94/// // use the fully qualified call syntax for the other trait:
95/// let merged = tokio_stream::StreamExt::merge(a, b);
96///
97/// // use normal call notation for futures::stream::StreamExt::collect
98/// let output: Vec<_> = merged.collect().await;
99/// assert_eq!(output, vec![1, 2, 3, 4, 5, 6]);
100/// # }
101/// ```
102///
103/// [`Stream`]: crate::Stream
104/// [futures]: https://docs.rs/futures
105/// [futures-StreamExt]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html
106pub trait StreamExt: Stream {
107 /// Consumes and returns the next value in the stream or `None` if the
108 /// stream is finished.
109 ///
110 /// Equivalent to:
111 ///
112 /// ```ignore
113 /// async fn next(&mut self) -> Option<Self::Item>;
114 /// ```
115 ///
116 /// Note that because `next` doesn't take ownership over the stream,
117 /// the [`Stream`] type must be [`Unpin`]. If you want to use `next` with a
118 /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can
119 /// be done by boxing the stream using [`Box::pin`] or
120 /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils`
121 /// crate.
122 ///
123 /// # Cancel safety
124 ///
125 /// This method is cancel safe. The returned future only
126 /// holds onto a reference to the underlying stream,
127 /// so dropping it will never lose a value.
128 ///
129 /// # Examples
130 ///
131 /// ```
132 /// # #[tokio::main(flavor = "current_thread")]
133 /// # async fn main() {
134 /// use tokio_stream::{self as stream, StreamExt};
135 ///
136 /// let mut stream = stream::iter(1..=3);
137 ///
138 /// assert_eq!(stream.next().await, Some(1));
139 /// assert_eq!(stream.next().await, Some(2));
140 /// assert_eq!(stream.next().await, Some(3));
141 /// assert_eq!(stream.next().await, None);
142 /// # }
143 /// ```
144 fn next(&mut self) -> Next<'_, Self>
145 where
146 Self: Unpin,
147 {
148 Next::new(self)
149 }
150
151 /// Consumes and returns the next item in the stream. If an error is
152 /// encountered before the next item, the error is returned instead.
153 ///
154 /// Equivalent to:
155 ///
156 /// ```ignore
157 /// async fn try_next(&mut self) -> Result<Option<T>, E>;
158 /// ```
159 ///
160 /// This is similar to the [`next`](StreamExt::next) combinator,
161 /// but returns a [`Result<Option<T>, E>`](Result) rather than
162 /// an [`Option<Result<T, E>>`](Option), making for easy use
163 /// with the [`?`](std::ops::Try) operator.
164 ///
165 /// # Cancel safety
166 ///
167 /// This method is cancel safe. The returned future only
168 /// holds onto a reference to the underlying stream,
169 /// so dropping it will never lose a value.
170 ///
171 /// # Examples
172 ///
173 /// ```
174 /// # #[tokio::main(flavor = "current_thread")]
175 /// # async fn main() {
176 ///
177 /// use tokio_stream::{self as stream, StreamExt};
178 ///
179 /// let mut stream = stream::iter(vec![Ok(1), Ok(2), Err("nope")]);
180 ///
181 /// assert_eq!(stream.try_next().await, Ok(Some(1)));
182 /// assert_eq!(stream.try_next().await, Ok(Some(2)));
183 /// assert_eq!(stream.try_next().await, Err("nope"));
184 /// # }
185 /// ```
186 fn try_next<T, E>(&mut self) -> TryNext<'_, Self>
187 where
188 Self: Stream<Item = Result<T, E>> + Unpin,
189 {
190 TryNext::new(self)
191 }
192
193 /// Maps this stream's items to a different type, returning a new stream of
194 /// the resulting type.
195 ///
196 /// The provided closure is executed over all elements of this stream as
197 /// they are made available. It is executed inline with calls to
198 /// [`poll_next`](Stream::poll_next).
199 ///
200 /// Note that this function consumes the stream passed into it and returns a
201 /// wrapped version of it, similar to the existing `map` methods in the
202 /// standard library.
203 ///
204 /// # Examples
205 ///
206 /// ```
207 /// # #[tokio::main(flavor = "current_thread")]
208 /// # async fn main() {
209 /// use tokio_stream::{self as stream, StreamExt};
210 ///
211 /// let stream = stream::iter(1..=3);
212 /// let mut stream = stream.map(|x| x + 3);
213 ///
214 /// assert_eq!(stream.next().await, Some(4));
215 /// assert_eq!(stream.next().await, Some(5));
216 /// assert_eq!(stream.next().await, Some(6));
217 /// # }
218 /// ```
219 fn map<T, F>(self, f: F) -> Map<Self, F>
220 where
221 F: FnMut(Self::Item) -> T,
222 Self: Sized,
223 {
224 Map::new(self, f)
225 }
226
227 /// Map this stream's items to a different type for as long as determined by
228 /// the provided closure. A stream of the target type will be returned,
229 /// which will yield elements until the closure returns `None`.
230 ///
231 /// The provided closure is executed over all elements of this stream as
232 /// they are made available, until it returns `None`. It is executed inline
233 /// with calls to [`poll_next`](Stream::poll_next). Once `None` is returned,
234 /// the underlying stream will not be polled again.
235 ///
236 /// Note that this function consumes the stream passed into it and returns a
237 /// wrapped version of it, similar to the [`Iterator::map_while`] method in the
238 /// standard library.
239 ///
240 /// # Examples
241 ///
242 /// ```
243 /// # #[tokio::main(flavor = "current_thread")]
244 /// # async fn main() {
245 /// use tokio_stream::{self as stream, StreamExt};
246 ///
247 /// let stream = stream::iter(1..=10);
248 /// let mut stream = stream.map_while(|x| {
249 /// if x < 4 {
250 /// Some(x + 3)
251 /// } else {
252 /// None
253 /// }
254 /// });
255 /// assert_eq!(stream.next().await, Some(4));
256 /// assert_eq!(stream.next().await, Some(5));
257 /// assert_eq!(stream.next().await, Some(6));
258 /// assert_eq!(stream.next().await, None);
259 /// # }
260 /// ```
261 fn map_while<T, F>(self, f: F) -> MapWhile<Self, F>
262 where
263 F: FnMut(Self::Item) -> Option<T>,
264 Self: Sized,
265 {
266 MapWhile::new(self, f)
267 }
268
269 /// Maps this stream's items asynchronously to a different type, returning a
270 /// new stream of the resulting type.
271 ///
272 /// The provided closure is executed over all elements of this stream as
273 /// they are made available, and the returned future is executed. Only one
274 /// future is executed at the time.
275 ///
276 /// Note that this function consumes the stream passed into it and returns a
277 /// wrapped version of it, similar to the existing `then` methods in the
278 /// standard library.
279 ///
280 /// Be aware that if the future is not `Unpin`, then neither is the `Stream`
281 /// returned by this method. To handle this, you can use `tokio::pin!` as in
282 /// the example below or put the stream in a `Box` with `Box::pin(stream)`.
283 ///
284 /// # Examples
285 ///
286 /// ```
287 /// # #[tokio::main(flavor = "current_thread")]
288 /// # async fn main() {
289 /// use tokio_stream::{self as stream, StreamExt};
290 ///
291 /// async fn do_async_work(value: i32) -> i32 {
292 /// value + 3
293 /// }
294 ///
295 /// let stream = stream::iter(1..=3);
296 /// let stream = stream.then(do_async_work);
297 ///
298 /// tokio::pin!(stream);
299 ///
300 /// assert_eq!(stream.next().await, Some(4));
301 /// assert_eq!(stream.next().await, Some(5));
302 /// assert_eq!(stream.next().await, Some(6));
303 /// # }
304 /// ```
305 fn then<F, Fut>(self, f: F) -> Then<Self, Fut, F>
306 where
307 F: FnMut(Self::Item) -> Fut,
308 Fut: Future,
309 Self: Sized,
310 {
311 Then::new(self, f)
312 }
313
314 /// Combine two streams into one by interleaving the output of both as it
315 /// is produced.
316 ///
317 /// Values are produced from the merged stream in the order they arrive from
318 /// the two source streams. If both source streams provide values
319 /// simultaneously, the merge stream alternates between them. This provides
320 /// some level of fairness. You should not chain calls to `merge`, as this
321 /// will break the fairness of the merging.
322 ///
323 /// The merged stream completes once **both** source streams complete. When
324 /// one source stream completes before the other, the merge stream
325 /// exclusively polls the remaining stream.
326 ///
327 /// For merging multiple streams, consider using [`StreamMap`] instead.
328 ///
329 /// [`StreamMap`]: crate::StreamMap
330 ///
331 /// # Examples
332 ///
333 /// ```
334 /// use tokio_stream::{StreamExt, Stream};
335 /// use tokio::sync::mpsc;
336 /// use tokio::time;
337 ///
338 /// use std::time::Duration;
339 /// use std::pin::Pin;
340 ///
341 /// # /*
342 /// #[tokio::main]
343 /// # */
344 /// # #[tokio::main(flavor = "current_thread")]
345 /// async fn main() {
346 /// # time::pause();
347 /// let (tx1, mut rx1) = mpsc::channel::<usize>(10);
348 /// let (tx2, mut rx2) = mpsc::channel::<usize>(10);
349 ///
350 /// // Convert the channels to a `Stream`.
351 /// let rx1 = Box::pin(async_stream::stream! {
352 /// while let Some(item) = rx1.recv().await {
353 /// yield item;
354 /// }
355 /// }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
356 ///
357 /// let rx2 = Box::pin(async_stream::stream! {
358 /// while let Some(item) = rx2.recv().await {
359 /// yield item;
360 /// }
361 /// }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
362 ///
363 /// let mut rx = rx1.merge(rx2);
364 ///
365 /// tokio::spawn(async move {
366 /// // Send some values immediately
367 /// tx1.send(1).await.unwrap();
368 /// tx1.send(2).await.unwrap();
369 ///
370 /// // Let the other task send values
371 /// time::sleep(Duration::from_millis(20)).await;
372 ///
373 /// tx1.send(4).await.unwrap();
374 /// });
375 ///
376 /// tokio::spawn(async move {
377 /// // Wait for the first task to send values
378 /// time::sleep(Duration::from_millis(5)).await;
379 ///
380 /// tx2.send(3).await.unwrap();
381 ///
382 /// time::sleep(Duration::from_millis(25)).await;
383 ///
384 /// // Send the final value
385 /// tx2.send(5).await.unwrap();
386 /// });
387 ///
388 /// assert_eq!(1, rx.next().await.unwrap());
389 /// assert_eq!(2, rx.next().await.unwrap());
390 /// assert_eq!(3, rx.next().await.unwrap());
391 /// assert_eq!(4, rx.next().await.unwrap());
392 /// assert_eq!(5, rx.next().await.unwrap());
393 ///
394 /// // The merged stream is consumed
395 /// assert!(rx.next().await.is_none());
396 /// }
397 /// ```
398 fn merge<U>(self, other: U) -> Merge<Self, U>
399 where
400 U: Stream<Item = Self::Item>,
401 Self: Sized,
402 {
403 Merge::new(self, other)
404 }
405
406 /// Filters the values produced by this stream according to the provided
407 /// predicate.
408 ///
409 /// As values of this stream are made available, the provided predicate `f`
410 /// will be run against them. If the predicate
411 /// resolves to `true`, then the stream will yield the value, but if the
412 /// predicate resolves to `false`, then the value
413 /// will be discarded and the next value will be produced.
414 ///
415 /// Note that this function consumes the stream passed into it and returns a
416 /// wrapped version of it, similar to [`Iterator::filter`] method in the
417 /// standard library.
418 ///
419 /// # Examples
420 ///
421 /// ```
422 /// # #[tokio::main(flavor = "current_thread")]
423 /// # async fn main() {
424 /// use tokio_stream::{self as stream, StreamExt};
425 ///
426 /// let stream = stream::iter(1..=8);
427 /// let mut evens = stream.filter(|x| x % 2 == 0);
428 ///
429 /// assert_eq!(Some(2), evens.next().await);
430 /// assert_eq!(Some(4), evens.next().await);
431 /// assert_eq!(Some(6), evens.next().await);
432 /// assert_eq!(Some(8), evens.next().await);
433 /// assert_eq!(None, evens.next().await);
434 /// # }
435 /// ```
436 fn filter<F>(self, f: F) -> Filter<Self, F>
437 where
438 F: FnMut(&Self::Item) -> bool,
439 Self: Sized,
440 {
441 Filter::new(self, f)
442 }
443
444 /// Filters the values produced by this stream while simultaneously mapping
445 /// them to a different type according to the provided closure.
446 ///
447 /// As values of this stream are made available, the provided function will
448 /// be run on them. If the predicate `f` resolves to
449 /// [`Some(item)`](Some) then the stream will yield the value `item`, but if
450 /// it resolves to [`None`], then the value will be skipped.
451 ///
452 /// Note that this function consumes the stream passed into it and returns a
453 /// wrapped version of it, similar to [`Iterator::filter_map`] method in the
454 /// standard library.
455 ///
456 /// # Examples
457 /// ```
458 /// # #[tokio::main(flavor = "current_thread")]
459 /// # async fn main() {
460 /// use tokio_stream::{self as stream, StreamExt};
461 ///
462 /// let stream = stream::iter(1..=8);
463 /// let mut evens = stream.filter_map(|x| {
464 /// if x % 2 == 0 { Some(x + 1) } else { None }
465 /// });
466 ///
467 /// assert_eq!(Some(3), evens.next().await);
468 /// assert_eq!(Some(5), evens.next().await);
469 /// assert_eq!(Some(7), evens.next().await);
470 /// assert_eq!(Some(9), evens.next().await);
471 /// assert_eq!(None, evens.next().await);
472 /// # }
473 /// ```
474 fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F>
475 where
476 F: FnMut(Self::Item) -> Option<T>,
477 Self: Sized,
478 {
479 FilterMap::new(self, f)
480 }
481
482 /// Creates a stream which ends after the first `None`.
483 ///
484 /// After a stream returns `None`, behavior is undefined. Future calls to
485 /// `poll_next` may or may not return `Some(T)` again or they may panic.
486 /// `fuse()` adapts a stream, ensuring that after `None` is given, it will
487 /// return `None` forever.
488 ///
489 /// # Examples
490 ///
491 /// ```
492 /// use tokio_stream::{Stream, StreamExt};
493 ///
494 /// use std::pin::Pin;
495 /// use std::task::{Context, Poll};
496 ///
497 /// // a stream which alternates between Some and None
498 /// struct Alternate {
499 /// state: i32,
500 /// }
501 ///
502 /// impl Stream for Alternate {
503 /// type Item = i32;
504 ///
505 /// fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<i32>> {
506 /// let val = self.state;
507 /// self.state = self.state + 1;
508 ///
509 /// // if it's even, Some(i32), else None
510 /// if val % 2 == 0 {
511 /// Poll::Ready(Some(val))
512 /// } else {
513 /// Poll::Ready(None)
514 /// }
515 /// }
516 /// }
517 ///
518 /// # /*
519 /// #[tokio::main]
520 /// # */
521 /// # #[tokio::main(flavor = "current_thread")]
522 /// async fn main() {
523 /// let mut stream = Alternate { state: 0 };
524 ///
525 /// // the stream goes back and forth
526 /// assert_eq!(stream.next().await, Some(0));
527 /// assert_eq!(stream.next().await, None);
528 /// assert_eq!(stream.next().await, Some(2));
529 /// assert_eq!(stream.next().await, None);
530 ///
531 /// // however, once it is fused
532 /// let mut stream = stream.fuse();
533 ///
534 /// assert_eq!(stream.next().await, Some(4));
535 /// assert_eq!(stream.next().await, None);
536 ///
537 /// // it will always return `None` after the first time.
538 /// assert_eq!(stream.next().await, None);
539 /// assert_eq!(stream.next().await, None);
540 /// assert_eq!(stream.next().await, None);
541 /// }
542 /// ```
543 fn fuse(self) -> Fuse<Self>
544 where
545 Self: Sized,
546 {
547 Fuse::new(self)
548 }
549
550 /// Creates a new stream of at most `n` items of the underlying stream.
551 ///
552 /// Once `n` items have been yielded from this stream then it will always
553 /// return that the stream is done.
554 ///
555 /// # Examples
556 ///
557 /// ```
558 /// # #[tokio::main(flavor = "current_thread")]
559 /// # async fn main() {
560 /// use tokio_stream::{self as stream, StreamExt};
561 ///
562 /// let mut stream = stream::iter(1..=10).take(3);
563 ///
564 /// assert_eq!(Some(1), stream.next().await);
565 /// assert_eq!(Some(2), stream.next().await);
566 /// assert_eq!(Some(3), stream.next().await);
567 /// assert_eq!(None, stream.next().await);
568 /// # }
569 /// ```
570 fn take(self, n: usize) -> Take<Self>
571 where
572 Self: Sized,
573 {
574 Take::new(self, n)
575 }
576
577 /// Take elements from this stream while the provided predicate
578 /// resolves to `true`.
579 ///
580 /// This function, like `Iterator::take_while`, will take elements from the
581 /// stream until the predicate `f` resolves to `false`. Once one element
582 /// returns false it will always return that the stream is done.
583 ///
584 /// # Examples
585 ///
586 /// ```
587 /// # #[tokio::main(flavor = "current_thread")]
588 /// # async fn main() {
589 /// use tokio_stream::{self as stream, StreamExt};
590 ///
591 /// let mut stream = stream::iter(1..=10).take_while(|x| *x <= 3);
592 ///
593 /// assert_eq!(Some(1), stream.next().await);
594 /// assert_eq!(Some(2), stream.next().await);
595 /// assert_eq!(Some(3), stream.next().await);
596 /// assert_eq!(None, stream.next().await);
597 /// # }
598 /// ```
599 fn take_while<F>(self, f: F) -> TakeWhile<Self, F>
600 where
601 F: FnMut(&Self::Item) -> bool,
602 Self: Sized,
603 {
604 TakeWhile::new(self, f)
605 }
606
607 /// Creates a new stream that will skip the `n` first items of the
608 /// underlying stream.
609 ///
610 /// # Examples
611 ///
612 /// ```
613 /// # #[tokio::main(flavor = "current_thread")]
614 /// # async fn main() {
615 /// use tokio_stream::{self as stream, StreamExt};
616 ///
617 /// let mut stream = stream::iter(1..=10).skip(7);
618 ///
619 /// assert_eq!(Some(8), stream.next().await);
620 /// assert_eq!(Some(9), stream.next().await);
621 /// assert_eq!(Some(10), stream.next().await);
622 /// assert_eq!(None, stream.next().await);
623 /// # }
624 /// ```
625 fn skip(self, n: usize) -> Skip<Self>
626 where
627 Self: Sized,
628 {
629 Skip::new(self, n)
630 }
631
632 /// Skip elements from the underlying stream while the provided predicate
633 /// resolves to `true`.
634 ///
635 /// This function, like [`Iterator::skip_while`], will ignore elements from the
636 /// stream until the predicate `f` resolves to `false`. Once one element
637 /// returns false, the rest of the elements will be yielded.
638 ///
639 /// [`Iterator::skip_while`]: std::iter::Iterator::skip_while()
640 ///
641 /// # Examples
642 ///
643 /// ```
644 /// # #[tokio::main(flavor = "current_thread")]
645 /// # async fn main() {
646 /// use tokio_stream::{self as stream, StreamExt};
647 /// let mut stream = stream::iter(vec![1,2,3,4,1]).skip_while(|x| *x < 3);
648 ///
649 /// assert_eq!(Some(3), stream.next().await);
650 /// assert_eq!(Some(4), stream.next().await);
651 /// assert_eq!(Some(1), stream.next().await);
652 /// assert_eq!(None, stream.next().await);
653 /// # }
654 /// ```
655 fn skip_while<F>(self, f: F) -> SkipWhile<Self, F>
656 where
657 F: FnMut(&Self::Item) -> bool,
658 Self: Sized,
659 {
660 SkipWhile::new(self, f)
661 }
662
663 /// Tests if every element of the stream matches a predicate.
664 ///
665 /// Equivalent to:
666 ///
667 /// ```ignore
668 /// async fn all<F>(&mut self, f: F) -> bool;
669 /// ```
670 ///
671 /// `all()` takes a closure that returns `true` or `false`. It applies
672 /// this closure to each element of the stream, and if they all return
673 /// `true`, then so does `all`. If any of them return `false`, it
674 /// returns `false`. An empty stream returns `true`.
675 ///
676 /// `all()` is short-circuiting; in other words, it will stop processing
677 /// as soon as it finds a `false`, given that no matter what else happens,
678 /// the result will also be `false`.
679 ///
680 /// An empty stream returns `true`.
681 ///
682 /// # Examples
683 ///
684 /// Basic usage:
685 ///
686 /// ```
687 /// # #[tokio::main(flavor = "current_thread")]
688 /// # async fn main() {
689 /// use tokio_stream::{self as stream, StreamExt};
690 ///
691 /// let a = [1, 2, 3];
692 ///
693 /// assert!(stream::iter(&a).all(|&x| x > 0).await);
694 ///
695 /// assert!(!stream::iter(&a).all(|&x| x > 2).await);
696 /// # }
697 /// ```
698 ///
699 /// Stopping at the first `false`:
700 ///
701 /// ```
702 /// # #[tokio::main(flavor = "current_thread")]
703 /// # async fn main() {
704 /// use tokio_stream::{self as stream, StreamExt};
705 ///
706 /// let a = [1, 2, 3];
707 ///
708 /// let mut iter = stream::iter(&a);
709 ///
710 /// assert!(!iter.all(|&x| x != 2).await);
711 ///
712 /// // we can still use `iter`, as there are more elements.
713 /// assert_eq!(iter.next().await, Some(&3));
714 /// # }
715 /// ```
716 fn all<F>(&mut self, f: F) -> AllFuture<'_, Self, F>
717 where
718 Self: Unpin,
719 F: FnMut(Self::Item) -> bool,
720 {
721 AllFuture::new(self, f)
722 }
723
724 /// Tests if any element of the stream matches a predicate.
725 ///
726 /// Equivalent to:
727 ///
728 /// ```ignore
729 /// async fn any<F>(&mut self, f: F) -> bool;
730 /// ```
731 ///
732 /// `any()` takes a closure that returns `true` or `false`. It applies
733 /// this closure to each element of the stream, and if any of them return
734 /// `true`, then so does `any()`. If they all return `false`, it
735 /// returns `false`.
736 ///
737 /// `any()` is short-circuiting; in other words, it will stop processing
738 /// as soon as it finds a `true`, given that no matter what else happens,
739 /// the result will also be `true`.
740 ///
741 /// An empty stream returns `false`.
742 ///
743 /// Basic usage:
744 ///
745 /// ```
746 /// # #[tokio::main(flavor = "current_thread")]
747 /// # async fn main() {
748 /// use tokio_stream::{self as stream, StreamExt};
749 ///
750 /// let a = [1, 2, 3];
751 ///
752 /// assert!(stream::iter(&a).any(|&x| x > 0).await);
753 ///
754 /// assert!(!stream::iter(&a).any(|&x| x > 5).await);
755 /// # }
756 /// ```
757 ///
758 /// Stopping at the first `true`:
759 ///
760 /// ```
761 /// # #[tokio::main(flavor = "current_thread")]
762 /// # async fn main() {
763 /// use tokio_stream::{self as stream, StreamExt};
764 ///
765 /// let a = [1, 2, 3];
766 ///
767 /// let mut iter = stream::iter(&a);
768 ///
769 /// assert!(iter.any(|&x| x != 2).await);
770 ///
771 /// // we can still use `iter`, as there are more elements.
772 /// assert_eq!(iter.next().await, Some(&2));
773 /// # }
774 /// ```
775 fn any<F>(&mut self, f: F) -> AnyFuture<'_, Self, F>
776 where
777 Self: Unpin,
778 F: FnMut(Self::Item) -> bool,
779 {
780 AnyFuture::new(self, f)
781 }
782
783 /// Combine two streams into one by first returning all values from the
784 /// first stream then all values from the second stream.
785 ///
786 /// As long as `self` still has values to emit, no values from `other` are
787 /// emitted, even if some are ready.
788 ///
789 /// # Examples
790 ///
791 /// ```
792 /// use tokio_stream::{self as stream, StreamExt};
793 ///
794 /// # #[tokio::main(flavor = "current_thread")]
795 /// # async fn main() {
796 /// let one = stream::iter(vec![1, 2, 3]);
797 /// let two = stream::iter(vec![4, 5, 6]);
798 ///
799 /// let mut stream = one.chain(two);
800 ///
801 /// assert_eq!(stream.next().await, Some(1));
802 /// assert_eq!(stream.next().await, Some(2));
803 /// assert_eq!(stream.next().await, Some(3));
804 /// assert_eq!(stream.next().await, Some(4));
805 /// assert_eq!(stream.next().await, Some(5));
806 /// assert_eq!(stream.next().await, Some(6));
807 /// assert_eq!(stream.next().await, None);
808 /// # }
809 /// ```
810 fn chain<U>(self, other: U) -> Chain<Self, U>
811 where
812 U: Stream<Item = Self::Item>,
813 Self: Sized,
814 {
815 Chain::new(self, other)
816 }
817
818 /// A combinator that applies a function to every element in a stream
819 /// producing a single, final value.
820 ///
821 /// Equivalent to:
822 ///
823 /// ```ignore
824 /// async fn fold<B, F>(self, init: B, f: F) -> B;
825 /// ```
826 ///
827 /// # Examples
828 /// Basic usage:
829 /// ```
830 /// # #[tokio::main(flavor = "current_thread")]
831 /// # async fn main() {
832 /// use tokio_stream::{self as stream, *};
833 ///
834 /// let s = stream::iter(vec![1u8, 2, 3]);
835 /// let sum = s.fold(0, |acc, x| acc + x).await;
836 ///
837 /// assert_eq!(sum, 6);
838 /// # }
839 /// ```
840 fn fold<B, F>(self, init: B, f: F) -> FoldFuture<Self, B, F>
841 where
842 Self: Sized,
843 F: FnMut(B, Self::Item) -> B,
844 {
845 FoldFuture::new(self, init, f)
846 }
847
848 /// Drain stream pushing all emitted values into a collection.
849 ///
850 /// Equivalent to:
851 ///
852 /// ```ignore
853 /// async fn collect<T>(self) -> T;
854 /// ```
855 ///
856 /// `collect` streams all values, awaiting as needed. Values are pushed into
857 /// a collection. A number of different target collection types are
858 /// supported, including [`Vec`], [`String`], and [`Bytes`].
859 ///
860 /// [`Bytes`]: https://docs.rs/bytes/0.6.0/bytes/struct.Bytes.html
861 ///
862 /// # `Result`
863 ///
864 /// `collect()` can also be used with streams of type `Result<T, E>` where
865 /// `T: FromStream<_>`. In this case, `collect()` will stream as long as
866 /// values yielded from the stream are `Ok(_)`. If `Err(_)` is encountered,
867 /// streaming is terminated and `collect()` returns the `Err`.
868 ///
869 /// # Notes
870 ///
871 /// `FromStream` is currently a sealed trait. Stabilization is pending
872 /// enhancements to the Rust language.
873 ///
874 /// # Examples
875 ///
876 /// Basic usage:
877 ///
878 /// ```
879 /// use tokio_stream::{self as stream, StreamExt};
880 ///
881 /// # #[tokio::main(flavor = "current_thread")]
882 /// # async fn main() {
883 /// let doubled: Vec<i32> =
884 /// stream::iter(vec![1, 2, 3])
885 /// .map(|x| x * 2)
886 /// .collect()
887 /// .await;
888 ///
889 /// assert_eq!(vec![2, 4, 6], doubled);
890 /// # }
891 /// ```
892 ///
893 /// Collecting a stream of `Result` values
894 ///
895 /// ```
896 /// use tokio_stream::{self as stream, StreamExt};
897 ///
898 /// # #[tokio::main(flavor = "current_thread")]
899 /// # async fn main() {
900 /// // A stream containing only `Ok` values will be collected
901 /// let values: Result<Vec<i32>, &str> =
902 /// stream::iter(vec![Ok(1), Ok(2), Ok(3)])
903 /// .collect()
904 /// .await;
905 ///
906 /// assert_eq!(Ok(vec![1, 2, 3]), values);
907 ///
908 /// // A stream containing `Err` values will return the first error.
909 /// let results = vec![Ok(1), Err("no"), Ok(2), Ok(3), Err("nein")];
910 ///
911 /// let values: Result<Vec<i32>, &str> =
912 /// stream::iter(results)
913 /// .collect()
914 /// .await;
915 ///
916 /// assert_eq!(Err("no"), values);
917 /// # }
918 /// ```
919 fn collect<T>(self) -> Collect<Self, T, T::InternalCollection>
920 where
921 T: FromStream<Self::Item>,
922 Self: Sized,
923 {
924 Collect::new(self)
925 }
926
927 /// Applies a per-item timeout to the passed stream.
928 ///
929 /// `timeout()` takes a `Duration` that represents the maximum amount of
930 /// time each element of the stream has to complete before timing out.
931 ///
932 /// If the wrapped stream yields a value before the deadline is reached, the
933 /// value is returned. Otherwise, an error is returned. The caller may decide
934 /// to continue consuming the stream and will eventually get the next source
935 /// stream value once it becomes available. See
936 /// [`timeout_repeating`](StreamExt::timeout_repeating) for an alternative
937 /// where the timeouts will repeat.
938 ///
939 /// # Notes
940 ///
941 /// This function consumes the stream passed into it and returns a
942 /// wrapped version of it.
943 ///
944 /// Polling the returned stream will continue to poll the inner stream even
945 /// if one or more items time out.
946 ///
947 /// # Examples
948 ///
949 /// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3):
950 ///
951 /// ```
952 /// # #[tokio::main(flavor = "current_thread")]
953 /// # async fn main() {
954 /// use tokio_stream::{self as stream, StreamExt};
955 /// use std::time::Duration;
956 /// # let int_stream = stream::iter(1..=3);
957 ///
958 /// let int_stream = int_stream.timeout(Duration::from_secs(1));
959 /// tokio::pin!(int_stream);
960 ///
961 /// // When no items time out, we get the 3 elements in succession:
962 /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
963 /// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
964 /// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
965 /// assert_eq!(int_stream.try_next().await, Ok(None));
966 ///
967 /// // If the second item times out, we get an error and continue polling the stream:
968 /// # let mut int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
969 /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
970 /// assert!(int_stream.try_next().await.is_err());
971 /// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
972 /// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
973 /// assert_eq!(int_stream.try_next().await, Ok(None));
974 ///
975 /// // If we want to stop consuming the source stream the first time an
976 /// // element times out, we can use the `take_while` operator:
977 /// # let int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
978 /// let mut int_stream = int_stream.take_while(Result::is_ok);
979 ///
980 /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
981 /// assert_eq!(int_stream.try_next().await, Ok(None));
982 /// # }
983 /// ```
984 ///
985 /// Once a timeout error is received, no further events will be received
986 /// unless the wrapped stream yields a value (timeouts do not repeat).
987 ///
988 /// ```
989 /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
990 /// # async fn main() {
991 /// use tokio_stream::{StreamExt, wrappers::IntervalStream};
992 /// use std::time::Duration;
993 /// let interval_stream = IntervalStream::new(tokio::time::interval(Duration::from_millis(100)));
994 /// let timeout_stream = interval_stream.timeout(Duration::from_millis(10));
995 /// tokio::pin!(timeout_stream);
996 ///
997 /// // Only one timeout will be received between values in the source stream.
998 /// assert!(timeout_stream.try_next().await.is_ok());
999 /// assert!(timeout_stream.try_next().await.is_err(), "expected one timeout");
1000 /// assert!(timeout_stream.try_next().await.is_ok(), "expected no more timeouts");
1001 /// # }
1002 /// ```
1003 #[cfg(feature = "time")]
1004 #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
1005 fn timeout(self, duration: Duration) -> Timeout<Self>
1006 where
1007 Self: Sized,
1008 {
1009 Timeout::new(self, duration)
1010 }
1011
1012 /// Applies a per-item timeout to the passed stream.
1013 ///
1014 /// `timeout_repeating()` takes an [`Interval`] that controls the time each
1015 /// element of the stream has to complete before timing out.
1016 ///
1017 /// If the wrapped stream yields a value before the deadline is reached, the
1018 /// value is returned. Otherwise, an error is returned. The caller may decide
1019 /// to continue consuming the stream and will eventually get the next source
1020 /// stream value once it becomes available. Unlike `timeout()`, if no value
1021 /// becomes available before the deadline is reached, additional errors are
1022 /// returned at the specified interval. See [`timeout`](StreamExt::timeout)
1023 /// for an alternative where the timeouts do not repeat.
1024 ///
1025 /// # Notes
1026 ///
1027 /// This function consumes the stream passed into it and returns a
1028 /// wrapped version of it.
1029 ///
1030 /// Polling the returned stream will continue to poll the inner stream even
1031 /// if one or more items time out.
1032 ///
1033 /// # Examples
1034 ///
1035 /// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3):
1036 ///
1037 /// ```
1038 /// # #[tokio::main(flavor = "current_thread")]
1039 /// # async fn main() {
1040 /// use tokio_stream::{self as stream, StreamExt};
1041 /// use std::time::Duration;
1042 /// # let int_stream = stream::iter(1..=3);
1043 ///
1044 /// let int_stream = int_stream.timeout_repeating(tokio::time::interval(Duration::from_secs(1)));
1045 /// tokio::pin!(int_stream);
1046 ///
1047 /// // When no items time out, we get the 3 elements in succession:
1048 /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
1049 /// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
1050 /// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
1051 /// assert_eq!(int_stream.try_next().await, Ok(None));
1052 ///
1053 /// // If the second item times out, we get an error and continue polling the stream:
1054 /// # let mut int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
1055 /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
1056 /// assert!(int_stream.try_next().await.is_err());
1057 /// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
1058 /// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
1059 /// assert_eq!(int_stream.try_next().await, Ok(None));
1060 ///
1061 /// // If we want to stop consuming the source stream the first time an
1062 /// // element times out, we can use the `take_while` operator:
1063 /// # let int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
1064 /// let mut int_stream = int_stream.take_while(Result::is_ok);
1065 ///
1066 /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
1067 /// assert_eq!(int_stream.try_next().await, Ok(None));
1068 /// # }
1069 /// ```
1070 ///
1071 /// Timeout errors will be continuously produced at the specified interval
1072 /// until the wrapped stream yields a value.
1073 ///
1074 /// ```
1075 /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
1076 /// # async fn main() {
1077 /// use tokio_stream::{StreamExt, wrappers::IntervalStream};
1078 /// use std::time::Duration;
1079 /// let interval_stream = IntervalStream::new(tokio::time::interval(Duration::from_millis(23)));
1080 /// let timeout_stream = interval_stream.timeout_repeating(tokio::time::interval(Duration::from_millis(9)));
1081 /// tokio::pin!(timeout_stream);
1082 ///
1083 /// // Multiple timeouts will be received between values in the source stream.
1084 /// assert!(timeout_stream.try_next().await.is_ok());
1085 /// assert!(timeout_stream.try_next().await.is_err(), "expected one timeout");
1086 /// assert!(timeout_stream.try_next().await.is_err(), "expected a second timeout");
1087 /// // Will eventually receive another value from the source stream...
1088 /// assert!(timeout_stream.try_next().await.is_ok(), "expected non-timeout");
1089 /// # }
1090 /// ```
1091 #[cfg(feature = "time")]
1092 #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
1093 fn timeout_repeating(self, interval: Interval) -> TimeoutRepeating<Self>
1094 where
1095 Self: Sized,
1096 {
1097 TimeoutRepeating::new(self, interval)
1098 }
1099
1100 /// Slows down a stream by enforcing a delay between items.
1101 ///
1102 /// The underlying timer behind this utility has a granularity of one millisecond.
1103 ///
1104 /// # Example
1105 ///
1106 /// Create a throttled stream.
1107 /// ```rust,no_run
1108 /// use std::time::Duration;
1109 /// use tokio_stream::StreamExt;
1110 ///
1111 /// # async fn dox() {
1112 /// let item_stream = futures::stream::repeat("one").throttle(Duration::from_secs(2));
1113 /// tokio::pin!(item_stream);
1114 ///
1115 /// loop {
1116 /// // The string will be produced at most every 2 seconds
1117 /// println!("{:?}", item_stream.next().await);
1118 /// }
1119 /// # }
1120 /// ```
1121 #[cfg(feature = "time")]
1122 #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
1123 fn throttle(self, duration: Duration) -> Throttle<Self>
1124 where
1125 Self: Sized,
1126 {
1127 throttle(duration, self)
1128 }
1129
1130 /// Batches the items in the given stream using a maximum duration and size for each batch.
1131 ///
1132 /// This stream returns the next batch of items in the following situations:
1133 /// 1. The inner stream has returned at least `max_size` many items since the last batch.
1134 /// 2. The time since the first item of a batch is greater than the given duration.
1135 /// 3. The end of the stream is reached.
1136 ///
1137 /// The length of the returned vector is never empty or greater than the maximum size. Empty batches
1138 /// will not be emitted if no items are received upstream.
1139 ///
1140 /// # Panics
1141 ///
1142 /// This function panics if `max_size` is zero
1143 ///
1144 /// # Example
1145 ///
1146 /// ```rust
1147 /// use std::time::Duration;
1148 /// use tokio::time;
1149 /// use tokio_stream::{self as stream, StreamExt};
1150 /// use futures::FutureExt;
1151 ///
1152 /// #[tokio::main]
1153 /// # async fn _unused() {}
1154 /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
1155 /// async fn main() {
1156 /// let iter = vec![1, 2, 3, 4].into_iter();
1157 /// let stream0 = stream::iter(iter);
1158 ///
1159 /// let iter = vec![5].into_iter();
1160 /// let stream1 = stream::iter(iter)
1161 /// .then(move |n| time::sleep(Duration::from_secs(5)).map(move |_| n));
1162 ///
1163 /// let chunk_stream = stream0
1164 /// .chain(stream1)
1165 /// .chunks_timeout(3, Duration::from_secs(2));
1166 /// tokio::pin!(chunk_stream);
1167 ///
1168 /// // a full batch was received
1169 /// assert_eq!(chunk_stream.next().await, Some(vec![1,2,3]));
1170 /// // deadline was reached before max_size was reached
1171 /// assert_eq!(chunk_stream.next().await, Some(vec![4]));
1172 /// // last element in the stream
1173 /// assert_eq!(chunk_stream.next().await, Some(vec![5]));
1174 /// }
1175 /// ```
1176 #[cfg(feature = "time")]
1177 #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
1178 #[track_caller]
1179 fn chunks_timeout(self, max_size: usize, duration: Duration) -> ChunksTimeout<Self>
1180 where
1181 Self: Sized,
1182 {
1183 assert!(max_size > 0, "`max_size` must be non-zero.");
1184 ChunksTimeout::new(self, max_size, duration)
1185 }
1186
1187 /// Turns the stream into a peekable stream, whose next element can be peeked at without being
1188 /// consumed.
1189 /// ```rust
1190 /// use tokio_stream::{self as stream, StreamExt};
1191 ///
1192 /// #[tokio::main]
1193 /// # async fn _unused() {}
1194 /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
1195 /// async fn main() {
1196 /// let iter = vec![1, 2, 3, 4].into_iter();
1197 /// let mut stream = stream::iter(iter).peekable();
1198 ///
1199 /// assert_eq!(*stream.peek().await.unwrap(), 1);
1200 /// assert_eq!(*stream.peek().await.unwrap(), 1);
1201 /// assert_eq!(stream.next().await.unwrap(), 1);
1202 /// assert_eq!(*stream.peek().await.unwrap(), 2);
1203 /// }
1204 /// ```
1205 fn peekable(self) -> Peekable<Self>
1206 where
1207 Self: Sized,
1208 {
1209 Peekable::new(self)
1210 }
1211}
1212
1213impl<St: ?Sized> StreamExt for St where St: Stream {}
1214
1215/// Merge the size hints from two streams.
1216fn merge_size_hints(
1217 (left_low, left_high): (usize, Option<usize>),
1218 (right_low, right_high): (usize, Option<usize>),
1219) -> (usize, Option<usize>) {
1220 let low = left_low.saturating_add(right_low);
1221 let high = match (left_high, right_high) {
1222 (Some(h1), Some(h2)) => h1.checked_add(h2),
1223 _ => None,
1224 };
1225 (low, high)
1226}