1mod all;
25mod any;
26mod chain;
27mod cloned;
28mod cmp;
29mod copied;
30mod cycle;
31mod enumerate;
32mod eq;
33mod filter;
34mod filter_map;
35mod find;
36mod find_map;
37mod fold;
38mod for_each;
39mod fuse;
40mod ge;
41mod gt;
42mod inspect;
43mod last;
44mod le;
45mod lt;
46mod map;
47mod max;
48mod max_by;
49mod max_by_key;
50mod min;
51mod min_by;
52mod min_by_key;
53mod ne;
54mod next;
55mod nth;
56mod partial_cmp;
57mod position;
58mod scan;
59mod skip;
60mod skip_while;
61mod step_by;
62mod take;
63mod take_while;
64mod try_fold;
65mod try_for_each;
66mod zip;
67
68use all::AllFuture;
69use any::AnyFuture;
70use cmp::CmpFuture;
71use cycle::Cycle;
72use enumerate::Enumerate;
73use eq::EqFuture;
74use filter_map::FilterMap;
75use find::FindFuture;
76use find_map::FindMapFuture;
77use fold::FoldFuture;
78use for_each::ForEachFuture;
79use ge::GeFuture;
80use gt::GtFuture;
81use last::LastFuture;
82use le::LeFuture;
83use lt::LtFuture;
84use max::MaxFuture;
85use max_by::MaxByFuture;
86use max_by_key::MaxByKeyFuture;
87use min::MinFuture;
88use min_by::MinByFuture;
89use min_by_key::MinByKeyFuture;
90use ne::NeFuture;
91use next::NextFuture;
92use nth::NthFuture;
93use partial_cmp::PartialCmpFuture;
94use position::PositionFuture;
95use try_fold::TryFoldFuture;
96use try_for_each::TryForEachFuture;
97
98pub use chain::Chain;
99pub use cloned::Cloned;
100pub use copied::Copied;
101pub use filter::Filter;
102pub use fuse::Fuse;
103pub use inspect::Inspect;
104pub use map::Map;
105pub use scan::Scan;
106pub use skip::Skip;
107pub use skip_while::SkipWhile;
108pub use step_by::StepBy;
109pub use take::Take;
110pub use take_while::TakeWhile;
111pub use zip::Zip;
112
113use std::cmp::Ordering;
114
115cfg_unstable! {
116 use std::future::Future;
117 use std::pin::Pin;
118 use std::time::Duration;
119
120 use crate::stream::into_stream::IntoStream;
121 use crate::stream::{FromStream, Product, Sum};
122 use crate::stream::Extend;
123
124 use count::CountFuture;
125 use partition::PartitionFuture;
126 use unzip::UnzipFuture;
127
128 pub use merge::Merge;
129 pub use flatten::Flatten;
130 pub use flat_map::FlatMap;
131 pub use timeout::{TimeoutError, Timeout};
132 pub use throttle::Throttle;
133 pub use delay::Delay;
134
135 mod count;
136 mod merge;
137 mod flatten;
138 mod flat_map;
139 mod partition;
140 mod timeout;
141 mod throttle;
142 mod delay;
143 mod unzip;
144}
145
146extension_trait! {
147 use std::ops::{Deref, DerefMut};
148
149 use crate::task::{Context, Poll};
150
151 #[doc = r#"
152 An asynchronous stream of values.
153
154 This trait is a re-export of [`futures::stream::Stream`] and is an async version of
155 [`std::iter::Iterator`].
156
157 The [provided methods] do not really exist in the trait itself, but they become
158 available when [`StreamExt`] from the [prelude] is imported:
159
160 ```
161 # #[allow(unused_imports)]
162 use async_std::prelude::*;
163 ```
164
165 [`std::iter::Iterator`]: https://doc.rust-lang.org/std/iter/trait.Iterator.html
166 [`futures::stream::Stream`]:
167 https://docs.rs/futures/0.3/futures/stream/trait.Stream.html
168 [provided methods]: #provided-methods
169 [`StreamExt`]: ../prelude/trait.StreamExt.html
170 [prelude]: ../prelude/index.html
171 "#]
172 pub trait Stream {
173 #[doc = r#"
174 The type of items yielded by this stream.
175 "#]
176 type Item;
177
178 #[doc = r#"
179 Attempts to receive the next item from the stream.
180
181 There are several possible return values:
182
183 * `Poll::Pending` means this stream's next value is not ready yet.
184 * `Poll::Ready(None)` means this stream has been exhausted.
185 * `Poll::Ready(Some(item))` means `item` was received out of the stream.
186
187 # Examples
188
189 ```
190 # fn main() { async_std::task::block_on(async {
191 #
192 use std::pin::Pin;
193
194 use async_std::prelude::*;
195 use async_std::stream;
196 use async_std::task::{Context, Poll};
197
198 fn increment(
199 s: impl Stream<Item = i32> + Unpin,
200 ) -> impl Stream<Item = i32> + Unpin {
201 struct Increment<S>(S);
202
203 impl<S: Stream<Item = i32> + Unpin> Stream for Increment<S> {
204 type Item = S::Item;
205
206 fn poll_next(
207 mut self: Pin<&mut Self>,
208 cx: &mut Context<'_>,
209 ) -> Poll<Option<Self::Item>> {
210 match Pin::new(&mut self.0).poll_next(cx) {
211 Poll::Pending => Poll::Pending,
212 Poll::Ready(None) => Poll::Ready(None),
213 Poll::Ready(Some(item)) => Poll::Ready(Some(item + 1)),
214 }
215 }
216 }
217
218 Increment(s)
219 }
220
221 let mut s = increment(stream::once(7));
222
223 assert_eq!(s.next().await, Some(8));
224 assert_eq!(s.next().await, None);
225 #
226 # }) }
227 ```
228 "#]
229 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
230 }
231
232 #[doc = r#"
233 Extension methods for [`Stream`].
234
235 [`Stream`]: ../stream/trait.Stream.html
236 "#]
237 pub trait StreamExt: futures_core::stream::Stream {
238 #[doc = r#"
239 Advances the stream and returns the next value.
240
241 Returns [`None`] when iteration is finished. Individual stream implementations may
242 choose to resume iteration, and so calling `next()` again may or may not eventually
243 start returning more values.
244
245 [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None
246
247 # Examples
248
249 ```
250 # fn main() { async_std::task::block_on(async {
251 #
252 use async_std::prelude::*;
253 use async_std::stream;
254
255 let mut s = stream::once(7);
256
257 assert_eq!(s.next().await, Some(7));
258 assert_eq!(s.next().await, None);
259 #
260 # }) }
261 ```
262 "#]
263 fn next(&mut self) -> impl Future<Output = Option<Self::Item>> + '_ [NextFuture<'_, Self>]
264 where
265 Self: Unpin,
266 {
267 NextFuture { stream: self }
268 }
269
270 #[doc = r#"
271 Creates a stream that yields its first `n` elements.
272
273 # Examples
274
275 ```
276 # fn main() { async_std::task::block_on(async {
277 #
278 use async_std::prelude::*;
279 use async_std::stream;
280
281 let mut s = stream::repeat(9).take(3);
282
283 while let Some(v) = s.next().await {
284 assert_eq!(v, 9);
285 }
286 #
287 # }) }
288 ```
289 "#]
290 fn take(self, n: usize) -> Take<Self>
291 where
292 Self: Sized,
293 {
294 Take::new(self, n)
295 }
296
297 #[doc = r#"
298 Creates a stream that yields elements based on a predicate.
299
300 # Examples
301
302 ```
303 # fn main() { async_std::task::block_on(async {
304 #
305 use async_std::prelude::*;
306 use async_std::stream;
307
308 let s = stream::from_iter(vec![1, 2, 3, 4]);
309 let mut s = s.take_while(|x| x < &3 );
310
311 assert_eq!(s.next().await, Some(1));
312 assert_eq!(s.next().await, Some(2));
313 assert_eq!(s.next().await, None);
314 #
315 # }) }
316 ```
317 "#]
318 fn take_while<P>(self, predicate: P) -> TakeWhile<Self, P>
319 where
320 Self: Sized,
321 P: FnMut(&Self::Item) -> bool,
322 {
323 TakeWhile::new(self, predicate)
324 }
325
326 #[doc = r#"
327 Limit the amount of items yielded per timeslice in a stream.
328
329 This stream does not drop any items, but will only limit the rate at which items pass through.
330 # Examples
331 ```
332 # fn main() { async_std::task::block_on(async {
333 #
334 use async_std::prelude::*;
335 use async_std::stream;
336 use std::time::{Duration, Instant};
337
338 let start = Instant::now();
339
340 // emit value every 5 milliseconds
341 let s = stream::interval(Duration::from_millis(5)).take(2);
342
343 // throttle for 10 milliseconds
344 let mut s = s.throttle(Duration::from_millis(10));
345
346 s.next().await;
347 assert!(start.elapsed().as_millis() >= 5);
348
349 s.next().await;
350 assert!(start.elapsed().as_millis() >= 15);
351
352 s.next().await;
353 assert!(start.elapsed().as_millis() >= 25);
354 #
355 # }) }
356 ```
357 "#]
358 #[cfg(feature = "unstable")]
359 #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
360 fn throttle(self, d: Duration) -> Throttle<Self>
361 where
362 Self: Sized,
363 {
364 Throttle::new(self, d)
365 }
366
367 #[doc = r#"
368 Creates a stream that yields each `step`th element.
369
370 # Panics
371
372 This method will panic if the given step is `0`.
373
374 # Examples
375
376 Basic usage:
377
378 ```
379 # fn main() { async_std::task::block_on(async {
380 #
381 use async_std::prelude::*;
382 use async_std::stream;
383
384 let s = stream::from_iter(vec![0u8, 1, 2, 3, 4]);
385 let mut stepped = s.step_by(2);
386
387 assert_eq!(stepped.next().await, Some(0));
388 assert_eq!(stepped.next().await, Some(2));
389 assert_eq!(stepped.next().await, Some(4));
390 assert_eq!(stepped.next().await, None);
391
392 #
393 # }) }
394 ```
395 "#]
396 fn step_by(self, step: usize) -> StepBy<Self>
397 where
398 Self: Sized,
399 {
400 StepBy::new(self, step)
401 }
402
403 #[doc = r#"
404 Takes two streams and creates a new stream over both in sequence.
405
406 # Examples
407
408 Basic usage:
409
410 ```
411 # fn main() { async_std::task::block_on(async {
412 #
413 use async_std::prelude::*;
414 use async_std::stream;
415
416 let first = stream::from_iter(vec![0u8, 1]);
417 let second = stream::from_iter(vec![2, 3]);
418 let mut c = first.chain(second);
419
420 assert_eq!(c.next().await, Some(0));
421 assert_eq!(c.next().await, Some(1));
422 assert_eq!(c.next().await, Some(2));
423 assert_eq!(c.next().await, Some(3));
424 assert_eq!(c.next().await, None);
425
426 #
427 # }) }
428 ```
429 "#]
430 fn chain<U>(self, other: U) -> Chain<Self, U>
431 where
432 Self: Sized,
433 U: Stream<Item = Self::Item> + Sized,
434 {
435 Chain::new(self, other)
436 }
437
438 #[doc = r#"
439 Creates an stream which copies all of its elements.
440
441 # Examples
442
443 Basic usage:
444
445 ```
446 # fn main() { async_std::task::block_on(async {
447 #
448 use async_std::prelude::*;
449 use async_std::stream;
450
451 let v = stream::from_iter(vec![&1, &2, &3]);
452
453 let mut v_cloned = v.cloned();
454
455 assert_eq!(v_cloned.next().await, Some(1));
456 assert_eq!(v_cloned.next().await, Some(2));
457 assert_eq!(v_cloned.next().await, Some(3));
458 assert_eq!(v_cloned.next().await, None);
459
460 #
461 # }) }
462 ```
463 "#]
464 fn cloned<'a, T>(self) -> Cloned<Self>
465 where
466 Self: Sized + Stream<Item = &'a T>,
467 T: Clone + 'a,
468 {
469 Cloned::new(self)
470 }
471
472
473 #[doc = r#"
474 Creates an stream which copies all of its elements.
475
476 # Examples
477
478 Basic usage:
479
480 ```
481 # fn main() { async_std::task::block_on(async {
482 #
483 use async_std::prelude::*;
484 use async_std::stream;
485
486 let s = stream::from_iter(vec![&1, &2, &3]);
487 let mut s_copied = s.copied();
488
489 assert_eq!(s_copied.next().await, Some(1));
490 assert_eq!(s_copied.next().await, Some(2));
491 assert_eq!(s_copied.next().await, Some(3));
492 assert_eq!(s_copied.next().await, None);
493 #
494 # }) }
495 ```
496 "#]
497 fn copied<'a, T>(self) -> Copied<Self>
498 where
499 Self: Sized + Stream<Item = &'a T>,
500 T: Copy + 'a,
501 {
502 Copied::new(self)
503 }
504
505 #[doc = r#"
506 Creates a stream that yields the provided values infinitely and in order.
507
508 # Examples
509
510 Basic usage:
511
512 ```
513 # async_std::task::block_on(async {
514 #
515 use async_std::prelude::*;
516 use async_std::stream;
517
518 let mut s = stream::once(7).cycle();
519
520 assert_eq!(s.next().await, Some(7));
521 assert_eq!(s.next().await, Some(7));
522 assert_eq!(s.next().await, Some(7));
523 assert_eq!(s.next().await, Some(7));
524 assert_eq!(s.next().await, Some(7));
525 #
526 # })
527 ```
528 "#]
529 fn cycle(self) -> Cycle<Self>
530 where
531 Self: Clone + Sized,
532 {
533 Cycle::new(self)
534 }
535
536 #[doc = r#"
537 Creates a stream that gives the current element's count as well as the next value.
538
539 # Overflow behaviour.
540
541 This combinator does no guarding against overflows.
542
543 # Examples
544
545 ```
546 # fn main() { async_std::task::block_on(async {
547 #
548 use async_std::prelude::*;
549 use async_std::stream;
550
551 let s = stream::from_iter(vec!['a', 'b', 'c']);
552 let mut s = s.enumerate();
553
554 assert_eq!(s.next().await, Some((0, 'a')));
555 assert_eq!(s.next().await, Some((1, 'b')));
556 assert_eq!(s.next().await, Some((2, 'c')));
557 assert_eq!(s.next().await, None);
558 #
559 # }) }
560 ```
561 "#]
562 fn enumerate(self) -> Enumerate<Self>
563 where
564 Self: Sized,
565 {
566 Enumerate::new(self)
567 }
568
569 #[doc = r#"
570 Creates a stream that is delayed before it starts yielding items.
571
572 # Examples
573
574 ```
575 # fn main() { async_std::task::block_on(async {
576 #
577 use async_std::prelude::*;
578 use async_std::stream;
579 use std::time::{Duration, Instant};
580
581 let start = Instant::now();
582 let mut s = stream::from_iter(vec![0u8, 1, 2]).delay(Duration::from_millis(200));
583
584 assert_eq!(s.next().await, Some(0));
585 // The first time will take more than 200ms due to delay.
586 assert!(start.elapsed().as_millis() >= 200);
587
588 assert_eq!(s.next().await, Some(1));
589 // There will be no delay after the first time.
590 assert!(start.elapsed().as_millis() < 400);
591
592 assert_eq!(s.next().await, Some(2));
593 assert!(start.elapsed().as_millis() < 400);
594
595 assert_eq!(s.next().await, None);
596 assert!(start.elapsed().as_millis() < 400);
597 #
598 # }) }
599 ```
600 "#]
601 #[cfg(any(feature = "unstable", feature = "docs"))]
602 #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
603 fn delay(self, dur: std::time::Duration) -> Delay<Self>
604 where
605 Self: Sized,
606 {
607 Delay::new(self, dur)
608 }
609
610 #[doc = r#"
611 Takes a closure and creates a stream that calls that closure on every element of this stream.
612
613 # Examples
614
615 ```
616 # fn main() { async_std::task::block_on(async {
617 #
618 use async_std::prelude::*;
619 use async_std::stream;
620
621 let s = stream::from_iter(vec![1, 2, 3]);
622 let mut s = s.map(|x| 2 * x);
623
624 assert_eq!(s.next().await, Some(2));
625 assert_eq!(s.next().await, Some(4));
626 assert_eq!(s.next().await, Some(6));
627 assert_eq!(s.next().await, None);
628
629 #
630 # }) }
631 ```
632 "#]
633 fn map<B, F>(self, f: F) -> Map<Self, F>
634 where
635 Self: Sized,
636 F: FnMut(Self::Item) -> B,
637 {
638 Map::new(self, f)
639 }
640
641 #[doc = r#"
642 A combinator that does something with each element in the stream, passing the value
643 on.
644
645 # Examples
646
647 Basic usage:
648
649 ```
650 # fn main() { async_std::task::block_on(async {
651 #
652 use async_std::prelude::*;
653 use async_std::stream;
654
655 let s = stream::from_iter(vec![1, 2, 3, 4, 5]);
656
657 let sum = s
658 .inspect(|x| println!("about to filter {}", x))
659 .filter(|x| x % 2 == 0)
660 .inspect(|x| println!("made it through filter: {}", x))
661 .fold(0, |sum, i| sum + i)
662 .await;
663
664 assert_eq!(sum, 6);
665 #
666 # }) }
667 ```
668 "#]
669 fn inspect<F>(self, f: F) -> Inspect<Self, F>
670 where
671 Self: Sized,
672 F: FnMut(&Self::Item),
673 {
674 Inspect::new(self, f)
675 }
676
677 #[doc = r#"
678 Returns the last element of the stream.
679
680 # Examples
681
682 Basic usage:
683
684 ```
685 # fn main() { async_std::task::block_on(async {
686 #
687 use async_std::prelude::*;
688 use async_std::stream;
689
690 let s = stream::from_iter(vec![1, 2, 3]);
691
692 let last = s.last().await;
693 assert_eq!(last, Some(3));
694 #
695 # }) }
696 ```
697
698 An empty stream will return `None`:
699 ```
700 # fn main() { async_std::task::block_on(async {
701 #
702 use async_std::stream;
703 use crate::async_std::prelude::*;
704
705 let s = stream::empty::<()>();
706
707 let last = s.last().await;
708 assert_eq!(last, None);
709 #
710 # }) }
711 ```
712 "#]
713 fn last(
714 self,
715 ) -> impl Future<Output = Option<Self::Item>> [LastFuture<Self, Self::Item>]
716 where
717 Self: Sized,
718 {
719 LastFuture::new(self)
720 }
721
722 #[doc = r#"
723 Creates a stream which ends after the first `None`.
724
725 After a stream returns `None`, future calls may or may not yield `Some(T)` again.
726 `fuse()` adapts an iterator, ensuring that after a `None` is given, it will always
727 return `None` forever.
728
729 # Examples
730
731 ```
732 # fn main() { async_std::task::block_on(async {
733 #
734 use async_std::prelude::*;
735 use async_std::stream;
736
737 let mut s = stream::once(1).fuse();
738 assert_eq!(s.next().await, Some(1));
739 assert_eq!(s.next().await, None);
740 assert_eq!(s.next().await, None);
741 #
742 # }) }
743 ```
744 "#]
745 fn fuse(self) -> Fuse<Self>
746 where
747 Self: Sized,
748 {
749 Fuse::new(self)
750 }
751
752 #[doc = r#"
753 Creates a stream that uses a predicate to determine if an element should be yielded.
754
755 # Examples
756
757 Basic usage:
758
759 ```
760 # fn main() { async_std::task::block_on(async {
761 #
762 use async_std::prelude::*;
763 use async_std::stream;
764
765 let s = stream::from_iter(vec![1, 2, 3, 4]);
766 let mut s = s.filter(|i| i % 2 == 0);
767
768 assert_eq!(s.next().await, Some(2));
769 assert_eq!(s.next().await, Some(4));
770 assert_eq!(s.next().await, None);
771 #
772 # }) }
773 ```
774 "#]
775 fn filter<P>(self, predicate: P) -> Filter<Self, P>
776 where
777 Self: Sized,
778 P: FnMut(&Self::Item) -> bool,
779 {
780 Filter::new(self, predicate)
781 }
782
783 #[doc= r#"
784 Creates an stream that works like map, but flattens nested structure.
785
786 # Examples
787
788 Basic usage:
789
790 ```
791 # async_std::task::block_on(async {
792
793 use async_std::prelude::*;
794 use async_std::stream;
795
796 let words = stream::from_iter(&["alpha", "beta", "gamma"]);
797
798 let merged: String = words
799 .flat_map(|s| stream::from_iter(s.chars()))
800 .collect().await;
801 assert_eq!(merged, "alphabetagamma");
802
803 let d3 = stream::from_iter(&[[[1, 2], [3, 4]], [[5, 6], [7, 8]]]);
804 let d1: Vec<_> = d3
805 .flat_map(|item| stream::from_iter(item))
806 .flat_map(|item| stream::from_iter(item))
807 .collect().await;
808
809 assert_eq!(d1, [&1, &2, &3, &4, &5, &6, &7, &8]);
810 # });
811 ```
812 "#]
813 #[cfg(feature = "unstable")]
814 #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
815 fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F>
816 where
817 Self: Sized,
818 U: IntoStream,
819 F: FnMut(Self::Item) -> U,
820 {
821 FlatMap::new(self, f)
822 }
823
824 #[doc = r#"
825 Creates an stream that flattens nested structure.
826
827 # Examples
828
829 Basic usage:
830
831 ```
832 # async_std::task::block_on(async {
833
834 use async_std::prelude::*;
835 use async_std::stream;
836
837 let inner1 = stream::from_iter(vec![1u8,2,3]);
838 let inner2 = stream::from_iter(vec![4u8,5,6]);
839 let s = stream::from_iter(vec![inner1, inner2]);
840
841 let v: Vec<_> = s.flatten().collect().await;
842
843 assert_eq!(v, vec![1,2,3,4,5,6]);
844
845 # });
846 "#]
847 #[cfg(feature = "unstable")]
848 #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
849 fn flatten(self) -> Flatten<Self>
850 where
851 Self: Sized,
852 Self::Item: IntoStream,
853 {
854 Flatten::new(self)
855 }
856
857 #[doc = r#"
858 Both filters and maps a stream.
859
860 # Examples
861
862 Basic usage:
863
864 ```
865 # fn main() { async_std::task::block_on(async {
866 #
867
868 use async_std::prelude::*;
869 use async_std::stream;
870
871 let s = stream::from_iter(vec!["1", "lol", "3", "NaN", "5"]);
872
873 let mut parsed = s.filter_map(|a| a.parse::<u32>().ok());
874
875 let one = parsed.next().await;
876 assert_eq!(one, Some(1));
877
878 let three = parsed.next().await;
879 assert_eq!(three, Some(3));
880
881 let five = parsed.next().await;
882 assert_eq!(five, Some(5));
883
884 let end = parsed.next().await;
885 assert_eq!(end, None);
886 #
887 # }) }
888 ```
889 "#]
890 fn filter_map<B, F>(self, f: F) -> FilterMap<Self, F>
891 where
892 Self: Sized,
893 F: FnMut(Self::Item) -> Option<B>,
894 {
895 FilterMap::new(self, f)
896 }
897
898 #[doc = r#"
899 Returns the element that gives the minimum value with respect to the
900 specified key function. If several elements are equally minimum,
901 the first element is returned. If the stream is empty, `None` is returned.
902
903 # Examples
904
905 ```
906 # fn main() { async_std::task::block_on(async {
907 #
908 use async_std::prelude::*;
909 use async_std::stream;
910
911 let s = stream::from_iter(vec![-1isize, 2, -3]);
912
913 let min = s.clone().min_by_key(|x| x.abs()).await;
914 assert_eq!(min, Some(-1));
915
916 let min = stream::empty::<isize>().min_by_key(|x| x.abs()).await;
917 assert_eq!(min, None);
918 #
919 # }) }
920 ```
921 "#]
922 fn min_by_key<B, F>(
923 self,
924 key_by: F,
925 ) -> impl Future<Output = Option<Self::Item>> [MinByKeyFuture<Self, Self::Item, F>]
926 where
927 Self: Sized,
928 B: Ord,
929 F: FnMut(&Self::Item) -> B,
930 {
931 MinByKeyFuture::new(self, key_by)
932 }
933
934 #[doc = r#"
935 Returns the element that gives the maximum value with respect to the
936 specified key function. If several elements are equally maximum,
937 the first element is returned. If the stream is empty, `None` is returned.
938
939 # Examples
940
941 ```
942 # fn main() { async_std::task::block_on(async {
943 #
944 use async_std::prelude::*;
945 use async_std::stream;
946
947 let s = stream::from_iter(vec![-3_i32, 0, 1, 5, -10]);
948
949 let max = s.clone().max_by_key(|x| x.abs()).await;
950 assert_eq!(max, Some(-10));
951
952 let max = stream::empty::<isize>().max_by_key(|x| x.abs()).await;
953 assert_eq!(max, None);
954 #
955 # }) }
956 ```
957 "#]
958 fn max_by_key<B, F>(
959 self,
960 key_by: F,
961 ) -> impl Future<Output = Option<Self::Item>> [MaxByKeyFuture<Self, Self::Item, F>]
962 where
963 Self: Sized,
964 B: Ord,
965 F: FnMut(&Self::Item) -> B,
966 {
967 MaxByKeyFuture::new(self, key_by)
968 }
969
970 #[doc = r#"
971 Returns the element that gives the minimum value with respect to the
972 specified comparison function. If several elements are equally minimum,
973 the first element is returned. If the stream is empty, `None` is returned.
974
975 # Examples
976
977 ```
978 # fn main() { async_std::task::block_on(async {
979 #
980 use async_std::prelude::*;
981 use async_std::stream;
982
983 let s = stream::from_iter(vec![1u8, 2, 3]);
984
985 let min = s.clone().min_by(|x, y| x.cmp(y)).await;
986 assert_eq!(min, Some(1));
987
988 let min = s.min_by(|x, y| y.cmp(x)).await;
989 assert_eq!(min, Some(3));
990
991 let min = stream::empty::<u8>().min_by(|x, y| x.cmp(y)).await;
992 assert_eq!(min, None);
993 #
994 # }) }
995 ```
996 "#]
997 fn min_by<F>(
998 self,
999 compare: F,
1000 ) -> impl Future<Output = Option<Self::Item>> [MinByFuture<Self, F, Self::Item>]
1001 where
1002 Self: Sized,
1003 F: FnMut(&Self::Item, &Self::Item) -> Ordering,
1004 {
1005 MinByFuture::new(self, compare)
1006 }
1007
1008 #[doc = r#"
1009 Returns the element that gives the maximum value. If several elements are equally maximum,
1010 the first element is returned. If the stream is empty, `None` is returned.
1011
1012 # Examples
1013
1014 ```ignore
1015 # fn main() { async_std::task::block_on(async {
1016 #
1017 use async_std::prelude::*;
1018 use async_std::stream;
1019
1020 let s = stream::from_iter(vec![1usize, 2, 3]);
1021
1022 let max = s.clone().max().await;
1023 assert_eq!(max, Some(3));
1024
1025 let max = stream::empty::<usize>().max().await;
1026 assert_eq!(max, None);
1027 #
1028 # }) }
1029 ```
1030 "#]
1031 fn max<F>(
1032 self,
1033 ) -> impl Future<Output = Option<Self::Item>> [MaxFuture<Self, F, Self::Item>]
1034 where
1035 Self: Sized,
1036 F: FnMut(&Self::Item, &Self::Item) -> Ordering,
1037 {
1038 MaxFuture::new(self)
1039 }
1040
1041 #[doc = r#"
1042 Returns the element that gives the minimum value. If several elements are equally minimum,
1043 the first element is returned. If the stream is empty, `None` is returned.
1044
1045 # Examples
1046
1047 ```ignore
1048 # fn main() { async_std::task::block_on(async {
1049 #
1050 use async_std::prelude::*;
1051 use async_std::stream;
1052
1053 let s = stream::from_iter(vec![1usize, 2, 3]);
1054
1055 let min = s.clone().min().await;
1056 assert_eq!(min, Some(1));
1057
1058 let min = stream::empty::<usize>().min().await;
1059 assert_eq!(min, None);
1060 #
1061 # }) }
1062 ```
1063 "#]
1064 fn min<F>(
1065 self,
1066 ) -> impl Future<Output = Option<Self::Item>> [MinFuture<Self, F, Self::Item>]
1067 where
1068 Self: Sized,
1069 F: FnMut(&Self::Item, &Self::Item) -> Ordering,
1070 {
1071 MinFuture::new(self)
1072 }
1073
1074 #[doc = r#"
1075 Returns the element that gives the maximum value with respect to the
1076 specified comparison function. If several elements are equally maximum,
1077 the first element is returned. If the stream is empty, `None` is returned.
1078
1079 # Examples
1080
1081 ```
1082 # fn main() { async_std::task::block_on(async {
1083 #
1084 use async_std::prelude::*;
1085 use async_std::stream;
1086
1087 let s = stream::from_iter(vec![1u8, 2, 3]);
1088
1089 let max = s.clone().max_by(|x, y| x.cmp(y)).await;
1090 assert_eq!(max, Some(3));
1091
1092 let max = s.max_by(|x, y| y.cmp(x)).await;
1093 assert_eq!(max, Some(1));
1094
1095 let max = stream::empty::<usize>().max_by(|x, y| x.cmp(y)).await;
1096 assert_eq!(max, None);
1097 #
1098 # }) }
1099 ```
1100 "#]
1101 fn max_by<F>(
1102 self,
1103 compare: F,
1104 ) -> impl Future<Output = Option<Self::Item>> [MaxByFuture<Self, F, Self::Item>]
1105 where
1106 Self: Sized,
1107 F: FnMut(&Self::Item, &Self::Item) -> Ordering,
1108 {
1109 MaxByFuture::new(self, compare)
1110 }
1111
1112 #[doc = r#"
1113 Returns the nth element of the stream.
1114
1115 # Examples
1116
1117 Basic usage:
1118
1119 ```
1120 # fn main() { async_std::task::block_on(async {
1121 #
1122 use async_std::prelude::*;
1123 use async_std::stream;
1124
1125 let mut s = stream::from_iter(vec![1u8, 2, 3]);
1126
1127 let second = s.nth(1).await;
1128 assert_eq!(second, Some(2));
1129 #
1130 # }) }
1131 ```
1132 Calling `nth()` multiple times:
1133
1134 ```
1135 # fn main() { async_std::task::block_on(async {
1136 #
1137 use async_std::stream;
1138 use async_std::prelude::*;
1139
1140 let mut s = stream::from_iter(vec![1u8, 2, 3]);
1141
1142 let second = s.nth(0).await;
1143 assert_eq!(second, Some(1));
1144
1145 let second = s.nth(0).await;
1146 assert_eq!(second, Some(2));
1147 #
1148 # }) }
1149 ```
1150 Returning `None` if the stream finished before returning `n` elements:
1151 ```
1152 # fn main() { async_std::task::block_on(async {
1153 #
1154 use async_std::prelude::*;
1155 use async_std::stream;
1156
1157 let mut s = stream::from_iter(vec![1u8, 2, 3]);
1158
1159 let fourth = s.nth(4).await;
1160 assert_eq!(fourth, None);
1161 #
1162 # }) }
1163 ```
1164 "#]
1165 fn nth(
1166 &mut self,
1167 n: usize,
1168 ) -> impl Future<Output = Option<Self::Item>> + '_ [NthFuture<'_, Self>]
1169 where
1170 Self: Unpin + Sized,
1171 {
1172 NthFuture::new(self, n)
1173 }
1174
1175 #[doc = r#"
1176 Tests if every element of the stream matches a predicate.
1177
1178 `all()` takes a closure that returns `true` or `false`. It applies
1179 this closure to each element of the stream, and if they all return
1180 `true`, then so does `all()`. If any of them return `false`, it
1181 returns `false`.
1182
1183 `all()` is short-circuiting; in other words, it will stop processing
1184 as soon as it finds a `false`, given that no matter what else happens,
1185 the result will also be `false`.
1186
1187 An empty stream returns `true`.
1188
1189 # Examples
1190
1191 Basic usage:
1192
1193 ```
1194 # fn main() { async_std::task::block_on(async {
1195 #
1196 use async_std::prelude::*;
1197 use async_std::stream;
1198
1199 let mut s = stream::repeat::<u32>(42).take(3);
1200 assert!(s.all(|x| x == 42).await);
1201
1202 #
1203 # }) }
1204 ```
1205
1206 Empty stream:
1207
1208 ```
1209 # fn main() { async_std::task::block_on(async {
1210 #
1211 use async_std::prelude::*;
1212 use async_std::stream;
1213
1214 let mut s = stream::empty::<u32>();
1215 assert!(s.all(|_| false).await);
1216 #
1217 # }) }
1218 ```
1219 "#]
1220 #[inline]
1221 fn all<F>(
1222 &mut self,
1223 f: F,
1224 ) -> impl Future<Output = bool> + '_ [AllFuture<'_, Self, F, Self::Item>]
1225 where
1226 Self: Unpin + Sized,
1227 F: FnMut(Self::Item) -> bool,
1228 {
1229 AllFuture::new(self, f)
1230 }
1231
1232 #[doc = r#"
1233 Searches for an element in a stream that satisfies a predicate.
1234
1235 # Examples
1236
1237 Basic usage:
1238
1239 ```
1240 # fn main() { async_std::task::block_on(async {
1241 #
1242 use async_std::prelude::*;
1243 use async_std::stream;
1244
1245 let mut s = stream::from_iter(vec![1u8, 2, 3]);
1246 let res = s.find(|x| *x == 2).await;
1247 assert_eq!(res, Some(2));
1248 #
1249 # }) }
1250 ```
1251
1252 Resuming after a first find:
1253
1254 ```
1255 # fn main() { async_std::task::block_on(async {
1256 #
1257 use async_std::prelude::*;
1258 use async_std::stream;
1259
1260 let mut s= stream::from_iter(vec![1, 2, 3]);
1261 let res = s.find(|x| *x == 2).await;
1262 assert_eq!(res, Some(2));
1263
1264 let next = s.next().await;
1265 assert_eq!(next, Some(3));
1266 #
1267 # }) }
1268 ```
1269 "#]
1270 fn find<P>(
1271 &mut self,
1272 p: P,
1273 ) -> impl Future<Output = Option<Self::Item>> + '_ [FindFuture<'_, Self, P>]
1274 where
1275 Self: Unpin + Sized,
1276 P: FnMut(&Self::Item) -> bool,
1277 {
1278 FindFuture::new(self, p)
1279 }
1280
1281 #[doc = r#"
1282 Applies function to the elements of stream and returns the first non-none result.
1283
1284 ```
1285 # fn main() { async_std::task::block_on(async {
1286 #
1287 use async_std::prelude::*;
1288 use async_std::stream;
1289
1290 let mut s = stream::from_iter(vec!["lol", "NaN", "2", "5"]);
1291 let first_number = s.find_map(|s| s.parse().ok()).await;
1292
1293 assert_eq!(first_number, Some(2));
1294 #
1295 # }) }
1296 ```
1297 "#]
1298 fn find_map<F, B>(
1299 &mut self,
1300 f: F,
1301 ) -> impl Future<Output = Option<B>> + '_ [FindMapFuture<'_, Self, F>]
1302 where
1303 Self: Unpin + Sized,
1304 F: FnMut(Self::Item) -> Option<B>,
1305 {
1306 FindMapFuture::new(self, f)
1307 }
1308
1309 #[doc = r#"
1310 A combinator that applies a function to every element in a stream
1311 producing a single, final value.
1312
1313 # Examples
1314
1315 Basic usage:
1316
1317 ```
1318 # fn main() { async_std::task::block_on(async {
1319 #
1320 use async_std::prelude::*;
1321 use async_std::stream;
1322
1323 let s = stream::from_iter(vec![1u8, 2, 3]);
1324 let sum = s.fold(0, |acc, x| acc + x).await;
1325
1326 assert_eq!(sum, 6);
1327 #
1328 # }) }
1329 ```
1330 "#]
1331 fn fold<B, F>(
1332 self,
1333 init: B,
1334 f: F,
1335 ) -> impl Future<Output = B> [FoldFuture<Self, F, B>]
1336 where
1337 Self: Sized,
1338 F: FnMut(B, Self::Item) -> B,
1339 {
1340 FoldFuture::new(self, init, f)
1341 }
1342
1343 #[doc = r#"
1344 A combinator that applies a function to every element in a stream
1345 creating two collections from it.
1346
1347 # Examples
1348
1349 Basic usage:
1350
1351 ```
1352 # fn main() { async_std::task::block_on(async {
1353 #
1354 use async_std::prelude::*;
1355 use async_std::stream;
1356
1357 let (even, odd): (Vec<i32>, Vec<i32>) = stream::from_iter(vec![1, 2, 3])
1358 .partition(|&n| n % 2 == 0).await;
1359
1360 assert_eq!(even, vec![2]);
1361 assert_eq!(odd, vec![1, 3]);
1362
1363 #
1364 # }) }
1365 ```
1366 "#]
1367 #[cfg(feature = "unstable")]
1368 #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
1369 fn partition<B, F>(
1370 self,
1371 f: F,
1372 ) -> impl Future<Output = (B, B)> [PartitionFuture<Self, F, B>]
1373 where
1374 Self: Sized,
1375 F: FnMut(&Self::Item) -> bool,
1376 B: Default + Extend<Self::Item>,
1377 {
1378 PartitionFuture::new(self, f)
1379 }
1380
1381 #[doc = r#"
1382 Call a closure on each element of the stream.
1383
1384 # Examples
1385
1386 ```
1387 # fn main() { async_std::task::block_on(async {
1388 #
1389 use async_std::prelude::*;
1390 use async_std::stream;
1391 use std::sync::mpsc::channel;
1392
1393 let (tx, rx) = channel();
1394
1395 let s = stream::from_iter(vec![1usize, 2, 3]);
1396 let sum = s.for_each(move |x| tx.clone().send(x).unwrap()).await;
1397
1398 let v: Vec<_> = rx.iter().collect();
1399
1400 assert_eq!(v, vec![1, 2, 3]);
1401 #
1402 # }) }
1403 ```
1404 "#]
1405 fn for_each<F>(
1406 self,
1407 f: F,
1408 ) -> impl Future<Output = ()> [ForEachFuture<Self, F>]
1409 where
1410 Self: Sized,
1411 F: FnMut(Self::Item),
1412 {
1413 ForEachFuture::new(self, f)
1414 }
1415
1416 #[doc = r#"
1417 Tests if any element of the stream matches a predicate.
1418
1419 `any()` takes a closure that returns `true` or `false`. It applies
1420 this closure to each element of the stream, and if any of them return
1421 `true`, then so does `any()`. If they all return `false`, it
1422 returns `false`.
1423
1424 `any()` is short-circuiting; in other words, it will stop processing
1425 as soon as it finds a `true`, given that no matter what else happens,
1426 the result will also be `true`.
1427
1428 An empty stream returns `false`.
1429
1430 # Examples
1431
1432 Basic usage:
1433
1434 ```
1435 # fn main() { async_std::task::block_on(async {
1436 #
1437 use async_std::prelude::*;
1438 use async_std::stream;
1439
1440 let mut s = stream::repeat::<u32>(42).take(3);
1441 assert!(s.any(|x| x == 42).await);
1442 #
1443 # }) }
1444 ```
1445
1446 Empty stream:
1447
1448 ```
1449 # fn main() { async_std::task::block_on(async {
1450 #
1451 use async_std::prelude::*;
1452 use async_std::stream;
1453
1454 let mut s = stream::empty::<u32>();
1455 assert!(!s.any(|_| false).await);
1456 #
1457 # }) }
1458 ```
1459 "#]
1460 #[inline]
1461 fn any<F>(
1462 &mut self,
1463 f: F,
1464 ) -> impl Future<Output = bool> + '_ [AnyFuture<'_, Self, F, Self::Item>]
1465 where
1466 Self: Unpin + Sized,
1467 F: FnMut(Self::Item) -> bool,
1468 {
1469 AnyFuture::new(self, f)
1470 }
1471
1472 #[doc = r#"
1473 Borrows an stream, rather than consuming it.
1474
1475 This is useful to allow applying stream adaptors while still retaining ownership of the original stream.
1476
1477 # Examples
1478
1479 ```
1480 # fn main() { async_std::task::block_on(async {
1481 #
1482 use async_std::prelude::*;
1483 use async_std::stream;
1484
1485 let a = vec![1isize, 2, 3];
1486
1487 let stream = stream::from_iter(a);
1488
1489 let sum: isize = stream.take(5).sum().await;
1490
1491 assert_eq!(sum, 6);
1492
1493 // if we try to use stream again, it won't work. The following line
1494 // gives error: use of moved value: `stream`
1495 // assert_eq!(stream.next(), None);
1496
1497 // let's try that again
1498 let a = vec![1isize, 2, 3];
1499
1500 let mut stream = stream::from_iter(a);
1501
1502 // instead, we add in a .by_ref()
1503 let sum: isize = stream.by_ref().take(2).sum().await;
1504
1505 assert_eq!(sum, 3);
1506
1507 // now this is just fine:
1508 assert_eq!(stream.next().await, Some(3));
1509 assert_eq!(stream.next().await, None);
1510 #
1511 # }) }
1512 ```
1513 "#]
1514 #[cfg(feature = "unstable")]
1515 #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
1516 fn by_ref(&mut self) -> &mut Self {
1517 self
1518 }
1519
1520 #[doc = r#"
1521 A stream adaptor similar to [`fold`] that holds internal state and produces a new
1522 stream.
1523
1524 [`fold`]: #method.fold
1525
1526 `scan()` takes two arguments: an initial value which seeds the internal state, and
1527 a closure with two arguments, the first being a mutable reference to the internal
1528 state and the second a stream element. The closure can assign to the internal state
1529 to share state between iterations.
1530
1531 On iteration, the closure will be applied to each element of the stream and the
1532 return value from the closure, an `Option`, is yielded by the stream.
1533
1534 ## Examples
1535
1536 ```
1537 # fn main() { async_std::task::block_on(async {
1538 #
1539 use async_std::prelude::*;
1540 use async_std::stream;
1541
1542 let s = stream::from_iter(vec![1isize, 2, 3]);
1543 let mut s = s.scan(1, |state, x| {
1544 *state = *state * x;
1545 Some(-*state)
1546 });
1547
1548 assert_eq!(s.next().await, Some(-1));
1549 assert_eq!(s.next().await, Some(-2));
1550 assert_eq!(s.next().await, Some(-6));
1551 assert_eq!(s.next().await, None);
1552 #
1553 # }) }
1554 ```
1555 "#]
1556 #[inline]
1557 fn scan<St, B, F>(self, initial_state: St, f: F) -> Scan<Self, St, F>
1558 where
1559 Self: Sized,
1560 F: FnMut(&mut St, Self::Item) -> Option<B>,
1561 {
1562 Scan::new(self, initial_state, f)
1563 }
1564
1565 #[doc = r#"
1566 Combinator that `skip`s elements based on a predicate.
1567
1568 Takes a closure argument. It will call this closure on every element in
1569 the stream and ignore elements until it returns `false`.
1570
1571 After `false` is returned, `SkipWhile`'s job is over and all further
1572 elements in the strem are yielded.
1573
1574 ## Examples
1575
1576 ```
1577 # fn main() { async_std::task::block_on(async {
1578 #
1579 use async_std::prelude::*;
1580 use async_std::stream;
1581
1582 let a = stream::from_iter(vec![-1i32, 0, 1]);
1583 let mut s = a.skip_while(|x| x.is_negative());
1584
1585 assert_eq!(s.next().await, Some(0));
1586 assert_eq!(s.next().await, Some(1));
1587 assert_eq!(s.next().await, None);
1588 #
1589 # }) }
1590 ```
1591 "#]
1592 fn skip_while<P>(self, predicate: P) -> SkipWhile<Self, P>
1593 where
1594 Self: Sized,
1595 P: FnMut(&Self::Item) -> bool,
1596 {
1597 SkipWhile::new(self, predicate)
1598 }
1599
1600 #[doc = r#"
1601 Creates a combinator that skips the first `n` elements.
1602
1603 ## Examples
1604
1605 ```
1606 # fn main() { async_std::task::block_on(async {
1607 #
1608 use async_std::prelude::*;
1609 use async_std::stream;
1610
1611 let s = stream::from_iter(vec![1u8, 2, 3]);
1612 let mut skipped = s.skip(2);
1613
1614 assert_eq!(skipped.next().await, Some(3));
1615 assert_eq!(skipped.next().await, None);
1616 #
1617 # }) }
1618 ```
1619 "#]
1620 fn skip(self, n: usize) -> Skip<Self>
1621 where
1622 Self: Sized,
1623 {
1624 Skip::new(self, n)
1625 }
1626
1627 #[doc=r#"
1628 Await a stream or times out after a duration of time.
1629
1630 If you want to await an I/O future consider using
1631 [`io::timeout`](../io/fn.timeout.html) instead.
1632
1633 # Examples
1634
1635 ```
1636 # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
1637 #
1638 use std::time::Duration;
1639
1640 use async_std::stream;
1641 use async_std::prelude::*;
1642
1643 let mut s = stream::repeat(1).take(3).timeout(Duration::from_secs(1));
1644
1645 while let Some(v) = s.next().await {
1646 assert_eq!(v, Ok(1));
1647 }
1648
1649 // when timeout
1650 let mut s = stream::pending::<()>().timeout(Duration::from_millis(10));
1651 match s.next().await {
1652 Some(item) => assert!(item.is_err()),
1653 None => panic!()
1654 };
1655 #
1656 # Ok(()) }) }
1657 ```
1658 "#]
1659 #[cfg(any(feature = "unstable", feature = "docs"))]
1660 #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
1661 fn timeout(self, dur: Duration) -> Timeout<Self>
1662 where
1663 Self: Stream + Sized,
1664 {
1665 Timeout::new(self, dur)
1666 }
1667
1668 #[doc = r#"
1669 A combinator that applies a function as long as it returns successfully, producing a single, final value.
1670 Immediately returns the error when the function returns unsuccessfully.
1671
1672 # Examples
1673
1674 Basic usage:
1675
1676 ```
1677 # fn main() { async_std::task::block_on(async {
1678 #
1679 use async_std::prelude::*;
1680 use async_std::stream;
1681
1682 let mut s = stream::from_iter(vec![1usize, 2, 3]);
1683 let sum = s.try_fold(0, |acc, v| {
1684 if (acc+v) % 2 == 1 {
1685 Ok(v+3)
1686 } else {
1687 Err("fail")
1688 }
1689 }).await;
1690
1691 assert_eq!(sum, Err("fail"));
1692 #
1693 # }) }
1694 ```
1695 "#]
1696 fn try_fold<B, F, T, E>(
1697 &mut self,
1698 init: T,
1699 f: F,
1700 ) -> impl Future<Output = Result<T, E>> + '_ [TryFoldFuture<'_, Self, F, T>]
1701 where
1702 Self: Unpin + Sized,
1703 F: FnMut(B, Self::Item) -> Result<T, E>,
1704 {
1705 TryFoldFuture::new(self, init, f)
1706 }
1707
1708 #[doc = r#"
1709 Applies a falliable function to each element in a stream, stopping at first error and returning it.
1710
1711 # Examples
1712
1713 ```
1714 # fn main() { async_std::task::block_on(async {
1715 #
1716 use std::sync::mpsc::channel;
1717 use async_std::prelude::*;
1718 use async_std::stream;
1719
1720 let (tx, rx) = channel();
1721
1722 let mut s = stream::from_iter(vec![1u8, 2, 3]);
1723 let s = s.try_for_each(|v| {
1724 if v % 2 == 1 {
1725 tx.clone().send(v).unwrap();
1726 Ok(())
1727 } else {
1728 Err("even")
1729 }
1730 });
1731
1732 let res = s.await;
1733 drop(tx);
1734 let values: Vec<_> = rx.iter().collect();
1735
1736 assert_eq!(values, vec![1]);
1737 assert_eq!(res, Err("even"));
1738 #
1739 # }) }
1740 ```
1741 "#]
1742 fn try_for_each<F, E>(
1743 &mut self,
1744 f: F,
1745 ) -> impl Future<Output = E> + 'a [TryForEachFuture<'_, Self, F>]
1746 where
1747 Self: Unpin + Sized,
1748 F: FnMut(Self::Item) -> Result<(), E>,
1749 {
1750 TryForEachFuture::new(self, f)
1751 }
1752
1753 #[doc = r#"
1754 'Zips up' two streams into a single stream of pairs.
1755
1756 `zip()` returns a new stream that will iterate over two other streams, returning a
1757 tuple where the first element comes from the first stream, and the second element
1758 comes from the second stream.
1759
1760 In other words, it zips two streams together, into a single one.
1761
1762 If either stream returns [`None`], [`poll_next`] from the zipped stream will return
1763 [`None`]. If the first stream returns [`None`], `zip` will short-circuit and
1764 `poll_next` will not be called on the second stream.
1765
1766 [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None
1767 [`poll_next`]: #tymethod.poll_next
1768
1769 ## Examples
1770
1771 ```
1772 # fn main() { async_std::task::block_on(async {
1773 #
1774 use async_std::prelude::*;
1775 use async_std::stream;
1776
1777 let l = stream::from_iter(vec![1u8, 2, 3]);
1778 let r = stream::from_iter(vec![4u8, 5, 6, 7]);
1779 let mut s = l.zip(r);
1780
1781 assert_eq!(s.next().await, Some((1, 4)));
1782 assert_eq!(s.next().await, Some((2, 5)));
1783 assert_eq!(s.next().await, Some((3, 6)));
1784 assert_eq!(s.next().await, None);
1785 #
1786 # }) }
1787 ```
1788 "#]
1789 #[inline]
1790 fn zip<U>(self, other: U) -> Zip<Self, U>
1791 where
1792 Self: Sized,
1793 U: Stream,
1794 {
1795 Zip::new(self, other)
1796 }
1797
1798 #[doc = r#"
1799 Converts an stream of pairs into a pair of containers.
1800
1801 `unzip()` consumes an entire stream of pairs, producing two collections: one from the left elements of the pairs, and one from the right elements.
1802
1803 This function is, in some sense, the opposite of [`zip`].
1804
1805 [`zip`]: trait.Stream.html#method.zip
1806
1807 # Example
1808
1809 ```
1810 # fn main() { async_std::task::block_on(async {
1811 #
1812 use async_std::prelude::*;
1813 use async_std::stream;
1814
1815 let s = stream::from_iter(vec![(1,2), (3,4)]);
1816
1817 let (left, right): (Vec<_>, Vec<_>) = s.unzip().await;
1818
1819 assert_eq!(left, [1, 3]);
1820 assert_eq!(right, [2, 4]);
1821 #
1822 # }) }
1823 ```
1824 "#]
1825 #[cfg(feature = "unstable")]
1826 #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
1827 fn unzip<A, B, FromA, FromB>(self) -> impl Future<Output = (FromA, FromB)> [UnzipFuture<Self, FromA, FromB>]
1828 where
1829 FromA: Default + Extend<A>,
1830 FromB: Default + Extend<B>,
1831 Self: Stream<Item = (A, B)> + Sized,
1832 {
1833 UnzipFuture::new(self)
1834 }
1835
1836 #[doc = r#"
1837 Transforms a stream into a collection.
1838
1839 `collect()` can take anything streamable, and turn it into a relevant
1840 collection. This is one of the more powerful methods in the async
1841 standard library, used in a variety of contexts.
1842
1843 The most basic pattern in which `collect()` is used is to turn one
1844 collection into another. You take a collection, call [`into_stream`] on it,
1845 do a bunch of transformations, and then `collect()` at the end.
1846
1847 Because `collect()` is so general, it can cause problems with type
1848 inference. As such, `collect()` is one of the few times you'll see
1849 the syntax affectionately known as the 'turbofish': `::<>`. This
1850 helps the inference algorithm understand specifically which collection
1851 you're trying to collect into.
1852
1853 # Examples
1854
1855 ```
1856 # fn main() { async_std::task::block_on(async {
1857 #
1858 use async_std::prelude::*;
1859 use async_std::stream;
1860
1861 let s = stream::repeat(9u8).take(3);
1862 let buf: Vec<u8> = s.collect().await;
1863
1864 assert_eq!(buf, vec![9; 3]);
1865
1866 // You can also collect streams of Result values
1867 // into any collection that implements FromStream
1868 let s = stream::repeat(Ok(9)).take(3);
1869 // We are using Vec here, but other collections
1870 // are supported as well
1871 let buf: Result<Vec<u8>, ()> = s.collect().await;
1872
1873 assert_eq!(buf, Ok(vec![9; 3]));
1874
1875 // The stream will stop on the first Err and
1876 // return that instead
1877 let s = stream::repeat(Err(5)).take(3);
1878 let buf: Result<Vec<u8>, u8> = s.collect().await;
1879
1880 assert_eq!(buf, Err(5));
1881 #
1882 # }) }
1883 ```
1884
1885 [`into_stream`]: trait.IntoStream.html#tymethod.into_stream
1886 "#]
1887 #[cfg(feature = "unstable")]
1888 #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
1889 fn collect<'a, B>(
1890 self,
1891 ) -> impl Future<Output = B> + 'a [Pin<Box<dyn Future<Output = B> + 'a>>]
1892 where
1893 Self: Sized + 'a,
1894 B: FromStream<Self::Item>,
1895 {
1896 FromStream::from_stream(self)
1897 }
1898
1899 #[doc = r#"
1900 Combines multiple streams into a single stream of all their outputs.
1901
1902 Items are yielded as soon as they're received, and the stream continues yield until
1903 both streams have been exhausted. The output ordering between streams is not guaranteed.
1904
1905 # Examples
1906
1907 ```
1908 # async_std::task::block_on(async {
1909 use async_std::prelude::*;
1910 use async_std::stream::{self, FromStream};
1911
1912 let a = stream::once(1u8);
1913 let b = stream::once(2u8);
1914 let c = stream::once(3u8);
1915
1916 let s = a.merge(b).merge(c);
1917 let mut lst = Vec::from_stream(s).await;
1918
1919 lst.sort_unstable();
1920 assert_eq!(&lst, &[1u8, 2u8, 3u8]);
1921 # });
1922 ```
1923 "#]
1924 #[cfg(feature = "unstable")]
1925 #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
1926 fn merge<U>(self, other: U) -> Merge<Self, U>
1927 where
1928 Self: Sized,
1929 U: Stream<Item = Self::Item> + Sized,
1930 {
1931 Merge::new(self, other)
1932 }
1933
1934 #[doc = r#"
1935 Lexicographically compares the elements of this `Stream` with those
1936 of another.
1937
1938 # Examples
1939
1940 ```
1941 # fn main() { async_std::task::block_on(async {
1942 #
1943 use async_std::prelude::*;
1944 use async_std::stream;
1945
1946 use std::cmp::Ordering;
1947
1948 let s1 = stream::from_iter(vec![1]);
1949 let s2 = stream::from_iter(vec![1, 2]);
1950 let s3 = stream::from_iter(vec![1, 2, 3]);
1951 let s4 = stream::from_iter(vec![1, 2, 4]);
1952 assert_eq!(s1.clone().partial_cmp(s1.clone()).await, Some(Ordering::Equal));
1953 assert_eq!(s1.clone().partial_cmp(s2.clone()).await, Some(Ordering::Less));
1954 assert_eq!(s2.clone().partial_cmp(s1.clone()).await, Some(Ordering::Greater));
1955 assert_eq!(s3.clone().partial_cmp(s4.clone()).await, Some(Ordering::Less));
1956 assert_eq!(s4.clone().partial_cmp(s3.clone()).await, Some(Ordering::Greater));
1957 #
1958 # }) }
1959 ```
1960 "#]
1961 fn partial_cmp<S>(
1962 self,
1963 other: S
1964 ) -> impl Future<Output = Option<Ordering>> [PartialCmpFuture<Self, S>]
1965 where
1966 Self: Sized + Stream,
1967 S: Stream,
1968 <Self as Stream>::Item: PartialOrd<S::Item>,
1969 {
1970 PartialCmpFuture::new(self, other)
1971 }
1972
1973 #[doc = r#"
1974 Searches for an element in a Stream that satisfies a predicate, returning
1975 its index.
1976
1977 # Examples
1978
1979 ```
1980 # fn main() { async_std::task::block_on(async {
1981 #
1982 use async_std::prelude::*;
1983 use async_std::stream;
1984
1985 let s = stream::from_iter(vec![1usize, 2, 3]);
1986 let res = s.clone().position(|x| x == 1).await;
1987 assert_eq!(res, Some(0));
1988
1989 let res = s.clone().position(|x| x == 2).await;
1990 assert_eq!(res, Some(1));
1991
1992 let res = s.clone().position(|x| x == 3).await;
1993 assert_eq!(res, Some(2));
1994
1995 let res = s.clone().position(|x| x == 4).await;
1996 assert_eq!(res, None);
1997 #
1998 # }) }
1999 ```
2000 "#]
2001 fn position<P>(
2002 &mut self,
2003 predicate: P,
2004 ) -> impl Future<Output = Option<usize>> + '_ [PositionFuture<'_, Self, P>]
2005 where
2006 Self: Unpin + Sized,
2007 P: FnMut(Self::Item) -> bool,
2008 {
2009 PositionFuture::new(self, predicate)
2010 }
2011
2012 #[doc = r#"
2013 Lexicographically compares the elements of this `Stream` with those
2014 of another using 'Ord'.
2015
2016 # Examples
2017
2018 ```
2019 # fn main() { async_std::task::block_on(async {
2020 #
2021 use async_std::prelude::*;
2022 use async_std::stream;
2023 use std::cmp::Ordering;
2024
2025 let s1 = stream::from_iter(vec![1]);
2026 let s2 = stream::from_iter(vec![1, 2]);
2027 let s3 = stream::from_iter(vec![1, 2, 3]);
2028 let s4 = stream::from_iter(vec![1, 2, 4]);
2029
2030 assert_eq!(s1.clone().cmp(s1.clone()).await, Ordering::Equal);
2031 assert_eq!(s1.clone().cmp(s2.clone()).await, Ordering::Less);
2032 assert_eq!(s2.clone().cmp(s1.clone()).await, Ordering::Greater);
2033 assert_eq!(s3.clone().cmp(s4.clone()).await, Ordering::Less);
2034 assert_eq!(s4.clone().cmp(s3.clone()).await, Ordering::Greater);
2035 #
2036 # }) }
2037 ```
2038 "#]
2039 fn cmp<S>(
2040 self,
2041 other: S
2042 ) -> impl Future<Output = Ordering> [CmpFuture<Self, S>]
2043 where
2044 Self: Sized + Stream,
2045 S: Stream,
2046 <Self as Stream>::Item: Ord
2047 {
2048 CmpFuture::new(self, other)
2049 }
2050
2051 #[doc = r#"
2052 Counts the number of elements in the stream.
2053
2054 # Examples
2055
2056 ```
2057 # fn main() { async_std::task::block_on(async {
2058 #
2059 use async_std::prelude::*;
2060 use async_std::stream;
2061
2062 let s1 = stream::from_iter(vec![0]);
2063 let s2 = stream::from_iter(vec![1, 2, 3]);
2064
2065 assert_eq!(s1.count().await, 1);
2066 assert_eq!(s2.count().await, 3);
2067 #
2068 # }) }
2069 ```
2070 "#]
2071 #[cfg(feature = "unstable")]
2072 #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
2073 fn count(self) -> impl Future<Output = usize> [CountFuture<Self>]
2074 where
2075 Self: Sized,
2076 {
2077 CountFuture::new(self)
2078 }
2079
2080 #[doc = r#"
2081 Determines if the elements of this `Stream` are lexicographically
2082 not equal to those of another.
2083
2084 # Examples
2085
2086 ```
2087 # fn main() { async_std::task::block_on(async {
2088 #
2089 use async_std::prelude::*;
2090 use async_std::stream;
2091
2092 let single = stream::from_iter(vec![1usize]);
2093 let single_ne = stream::from_iter(vec![10usize]);
2094 let multi = stream::from_iter(vec![1usize,2]);
2095 let multi_ne = stream::from_iter(vec![1usize,5]);
2096
2097 assert_eq!(single.clone().ne(single.clone()).await, false);
2098 assert_eq!(single_ne.clone().ne(single.clone()).await, true);
2099 assert_eq!(multi.clone().ne(single_ne.clone()).await, true);
2100 assert_eq!(multi_ne.clone().ne(multi.clone()).await, true);
2101 #
2102 # }) }
2103 ```
2104 "#]
2105 fn ne<S>(
2106 self,
2107 other: S
2108 ) -> impl Future<Output = bool> [NeFuture<Self, S>]
2109 where
2110 Self: Sized,
2111 S: Sized + Stream,
2112 <Self as Stream>::Item: PartialEq<S::Item>,
2113 {
2114 NeFuture::new(self, other)
2115 }
2116
2117 #[doc = r#"
2118 Determines if the elements of this `Stream` are lexicographically
2119 greater than or equal to those of another.
2120
2121 # Examples
2122
2123 ```
2124 # fn main() { async_std::task::block_on(async {
2125 #
2126 use async_std::prelude::*;
2127 use async_std::stream;
2128
2129 let single = stream::from_iter(vec![1]);
2130 let single_gt = stream::from_iter(vec![10]);
2131 let multi = stream::from_iter(vec![1,2]);
2132 let multi_gt = stream::from_iter(vec![1,5]);
2133
2134 assert_eq!(single.clone().ge(single.clone()).await, true);
2135 assert_eq!(single_gt.clone().ge(single.clone()).await, true);
2136 assert_eq!(multi.clone().ge(single_gt.clone()).await, false);
2137 assert_eq!(multi_gt.clone().ge(multi.clone()).await, true);
2138 #
2139 # }) }
2140 ```
2141 "#]
2142 fn ge<S>(
2143 self,
2144 other: S
2145 ) -> impl Future<Output = bool> [GeFuture<Self, S>]
2146 where
2147 Self: Sized + Stream,
2148 S: Stream,
2149 <Self as Stream>::Item: PartialOrd<S::Item>,
2150 {
2151 GeFuture::new(self, other)
2152 }
2153
2154 #[doc = r#"
2155 Determines if the elements of this `Stream` are lexicographically
2156 equal to those of another.
2157
2158 # Examples
2159
2160 ```
2161 # fn main() { async_std::task::block_on(async {
2162 #
2163 use async_std::prelude::*;
2164 use async_std::stream;
2165
2166 let single = stream::from_iter(vec![1]);
2167 let single_eq = stream::from_iter(vec![10]);
2168 let multi = stream::from_iter(vec![1,2]);
2169 let multi_eq = stream::from_iter(vec![1,5]);
2170
2171 assert_eq!(single.clone().eq(single.clone()).await, true);
2172 assert_eq!(single_eq.clone().eq(single.clone()).await, false);
2173 assert_eq!(multi.clone().eq(single_eq.clone()).await, false);
2174 assert_eq!(multi_eq.clone().eq(multi.clone()).await, false);
2175 #
2176 # }) }
2177 ```
2178 "#]
2179 fn eq<S>(
2180 self,
2181 other: S
2182 ) -> impl Future<Output = bool> [EqFuture<Self, S>]
2183 where
2184 Self: Sized + Stream,
2185 S: Sized + Stream,
2186 <Self as Stream>::Item: PartialEq<S::Item>,
2187 {
2188 EqFuture::new(self, other)
2189 }
2190
2191 #[doc = r#"
2192 Determines if the elements of this `Stream` are lexicographically
2193 greater than those of another.
2194
2195 # Examples
2196
2197 ```
2198 # fn main() { async_std::task::block_on(async {
2199 #
2200 use async_std::prelude::*;
2201 use async_std::stream;
2202
2203 let single = stream::from_iter(vec![1]);
2204 let single_gt = stream::from_iter(vec![10]);
2205 let multi = stream::from_iter(vec![1,2]);
2206 let multi_gt = stream::from_iter(vec![1,5]);
2207
2208 assert_eq!(single.clone().gt(single.clone()).await, false);
2209 assert_eq!(single_gt.clone().gt(single.clone()).await, true);
2210 assert_eq!(multi.clone().gt(single_gt.clone()).await, false);
2211 assert_eq!(multi_gt.clone().gt(multi.clone()).await, true);
2212 #
2213 # }) }
2214 ```
2215 "#]
2216 fn gt<S>(
2217 self,
2218 other: S
2219 ) -> impl Future<Output = bool> [GtFuture<Self, S>]
2220 where
2221 Self: Sized + Stream,
2222 S: Stream,
2223 <Self as Stream>::Item: PartialOrd<S::Item>,
2224 {
2225 GtFuture::new(self, other)
2226 }
2227
2228 #[doc = r#"
2229 Determines if the elements of this `Stream` are lexicographically
2230 less or equal to those of another.
2231
2232 # Examples
2233
2234 ```
2235 # fn main() { async_std::task::block_on(async {
2236 #
2237 use async_std::prelude::*;
2238 use async_std::stream;
2239
2240 let single = stream::from_iter(vec![1]);
2241 let single_gt = stream::from_iter(vec![10]);
2242 let multi = stream::from_iter(vec![1,2]);
2243 let multi_gt = stream::from_iter(vec![1,5]);
2244
2245 assert_eq!(single.clone().le(single.clone()).await, true);
2246 assert_eq!(single.clone().le(single_gt.clone()).await, true);
2247 assert_eq!(multi.clone().le(single_gt.clone()).await, true);
2248 assert_eq!(multi_gt.clone().le(multi.clone()).await, false);
2249 #
2250 # }) }
2251 ```
2252 "#]
2253 fn le<S>(
2254 self,
2255 other: S
2256 ) -> impl Future<Output = bool> [LeFuture<Self, S>]
2257 where
2258 Self: Sized + Stream,
2259 S: Stream,
2260 <Self as Stream>::Item: PartialOrd<S::Item>,
2261 {
2262 LeFuture::new(self, other)
2263 }
2264
2265 #[doc = r#"
2266 Determines if the elements of this `Stream` are lexicographically
2267 less than those of another.
2268
2269 # Examples
2270
2271 ```
2272 # fn main() { async_std::task::block_on(async {
2273 #
2274 use async_std::prelude::*;
2275 use async_std::stream;
2276
2277 let single = stream::from_iter(vec![1]);
2278 let single_gt = stream::from_iter(vec![10]);
2279 let multi = stream::from_iter(vec![1,2]);
2280 let multi_gt = stream::from_iter(vec![1,5]);
2281
2282 assert_eq!(single.clone().lt(single.clone()).await, false);
2283 assert_eq!(single.clone().lt(single_gt.clone()).await, true);
2284 assert_eq!(multi.clone().lt(single_gt.clone()).await, true);
2285 assert_eq!(multi_gt.clone().lt(multi.clone()).await, false);
2286 #
2287 # }) }
2288 ```
2289 "#]
2290 fn lt<S>(
2291 self,
2292 other: S
2293 ) -> impl Future<Output = bool> [LtFuture<Self, S>]
2294 where
2295 Self: Sized + Stream,
2296 S: Stream,
2297 <Self as Stream>::Item: PartialOrd<S::Item>,
2298 {
2299 LtFuture::new(self, other)
2300 }
2301
2302 #[doc = r#"
2303 Sums the elements of a stream.
2304
2305 Takes each element, adds them together, and returns the result.
2306
2307 An empty streams returns the zero value of the type.
2308
2309 # Panics
2310
2311 When calling `sum()` and a primitive integer type is being returned, this
2312 method will panic if the computation overflows and debug assertions are
2313 enabled.
2314
2315 # Examples
2316
2317 Basic usage:
2318
2319 ```
2320 # fn main() { async_std::task::block_on(async {
2321 #
2322 use async_std::prelude::*;
2323 use async_std::stream;
2324
2325 let s = stream::from_iter(vec![0u8, 1, 2, 3, 4]);
2326 let sum: u8 = s.sum().await;
2327
2328 assert_eq!(sum, 10);
2329 #
2330 # }) }
2331 ```
2332 "#]
2333 #[cfg(feature = "unstable")]
2334 #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
2335 fn sum<'a, S>(
2336 self,
2337 ) -> impl Future<Output = S> + 'a [Pin<Box<dyn Future<Output = S> + 'a>>]
2338 where
2339 Self: Sized + Stream<Item = S> + 'a,
2340 S: Sum<Self::Item>,
2341 {
2342 Sum::sum(self)
2343 }
2344
2345 #[doc = r#"
2346 Multiplies all elements of the stream.
2347
2348 An empty stream returns the one value of the type.
2349
2350 # Panics
2351
2352 When calling `product()` and a primitive integer type is being returned,
2353 method will panic if the computation overflows and debug assertions are
2354 enabled.
2355
2356 # Examples
2357
2358 This example calculates the factorial of n (i.e. the product of the numbers from 1 to
2359 n, inclusive):
2360
2361 ```
2362 # fn main() { async_std::task::block_on(async {
2363 #
2364 async fn factorial(n: u32) -> u32 {
2365 use async_std::prelude::*;
2366 use async_std::stream;
2367
2368 let s = stream::from_iter(1..=n);
2369 s.product().await
2370 }
2371
2372 assert_eq!(factorial(0).await, 1);
2373 assert_eq!(factorial(1).await, 1);
2374 assert_eq!(factorial(5).await, 120);
2375 #
2376 # }) }
2377 ```
2378 "#]
2379 #[cfg(feature = "unstable")]
2380 #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
2381 fn product<'a, P>(
2382 self,
2383 ) -> impl Future<Output = P> + 'a [Pin<Box<dyn Future<Output = P> + 'a>>]
2384 where
2385 Self: Sized + Stream<Item = P> + 'a,
2386 P: Product,
2387 {
2388 Product::product(self)
2389 }
2390 }
2391
2392 impl<S: Stream + Unpin + ?Sized> Stream for Box<S> {
2393 type Item = S::Item;
2394
2395 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2396 unreachable!("this impl only appears in the rendered docs")
2397 }
2398 }
2399
2400 impl<S: Stream + Unpin + ?Sized> Stream for &mut S {
2401 type Item = S::Item;
2402
2403 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2404 unreachable!("this impl only appears in the rendered docs")
2405 }
2406 }
2407
2408 impl<P> Stream for Pin<P>
2409 where
2410 P: DerefMut + Unpin,
2411 <P as Deref>::Target: Stream,
2412 {
2413 type Item = <<P as Deref>::Target as Stream>::Item;
2414
2415 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2416 unreachable!("this impl only appears in the rendered docs")
2417 }
2418 }
2419
2420 impl<S: Stream> Stream for std::panic::AssertUnwindSafe<S> {
2421 type Item = S::Item;
2422
2423 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2424 unreachable!("this impl only appears in the rendered docs")
2425 }
2426 }
2427}