async_std/stream/stream/
mod.rs

1//! Asynchronous iteration.
2//!
3//! This module is an async version of [`std::iter`].
4//!
5//! [`std::iter`]: https://doc.rust-lang.org/std/iter/index.html
6//!
7//! # Examples
8//!
9//! ```
10//! # async_std::task::block_on(async {
11//! #
12//! use async_std::prelude::*;
13//! use async_std::stream;
14//!
15//! let mut s = stream::repeat(9).take(3);
16//!
17//! while let Some(v) = s.next().await {
18//!     assert_eq!(v, 9);
19//! }
20//! #
21//! # })
22//! ```
23
24mod all;
25mod any;
26mod chain;
27mod cloned;
28mod cmp;
29mod copied;
30mod cycle;
31mod enumerate;
32mod eq;
33mod filter;
34mod filter_map;
35mod find;
36mod find_map;
37mod fold;
38mod for_each;
39mod fuse;
40mod ge;
41mod gt;
42mod inspect;
43mod last;
44mod le;
45mod lt;
46mod map;
47mod max;
48mod max_by;
49mod max_by_key;
50mod min;
51mod min_by;
52mod min_by_key;
53mod ne;
54mod next;
55mod nth;
56mod partial_cmp;
57mod position;
58mod scan;
59mod skip;
60mod skip_while;
61mod step_by;
62mod take;
63mod take_while;
64mod try_fold;
65mod try_for_each;
66mod zip;
67
68use all::AllFuture;
69use any::AnyFuture;
70use cmp::CmpFuture;
71use cycle::Cycle;
72use enumerate::Enumerate;
73use eq::EqFuture;
74use filter_map::FilterMap;
75use find::FindFuture;
76use find_map::FindMapFuture;
77use fold::FoldFuture;
78use for_each::ForEachFuture;
79use ge::GeFuture;
80use gt::GtFuture;
81use last::LastFuture;
82use le::LeFuture;
83use lt::LtFuture;
84use max::MaxFuture;
85use max_by::MaxByFuture;
86use max_by_key::MaxByKeyFuture;
87use min::MinFuture;
88use min_by::MinByFuture;
89use min_by_key::MinByKeyFuture;
90use ne::NeFuture;
91use next::NextFuture;
92use nth::NthFuture;
93use partial_cmp::PartialCmpFuture;
94use position::PositionFuture;
95use try_fold::TryFoldFuture;
96use try_for_each::TryForEachFuture;
97
98pub use chain::Chain;
99pub use cloned::Cloned;
100pub use copied::Copied;
101pub use filter::Filter;
102pub use fuse::Fuse;
103pub use inspect::Inspect;
104pub use map::Map;
105pub use scan::Scan;
106pub use skip::Skip;
107pub use skip_while::SkipWhile;
108pub use step_by::StepBy;
109pub use take::Take;
110pub use take_while::TakeWhile;
111pub use zip::Zip;
112
113use std::cmp::Ordering;
114
115cfg_unstable! {
116    use std::future::Future;
117    use std::pin::Pin;
118    use std::time::Duration;
119
120    use crate::stream::into_stream::IntoStream;
121    use crate::stream::{FromStream, Product, Sum};
122    use crate::stream::Extend;
123
124    use count::CountFuture;
125    use partition::PartitionFuture;
126    use unzip::UnzipFuture;
127
128    pub use merge::Merge;
129    pub use flatten::Flatten;
130    pub use flat_map::FlatMap;
131    pub use timeout::{TimeoutError, Timeout};
132    pub use throttle::Throttle;
133    pub use delay::Delay;
134
135    mod count;
136    mod merge;
137    mod flatten;
138    mod flat_map;
139    mod partition;
140    mod timeout;
141    mod throttle;
142    mod delay;
143    mod unzip;
144}
145
146extension_trait! {
147    use std::ops::{Deref, DerefMut};
148
149    use crate::task::{Context, Poll};
150
151    #[doc = r#"
152        An asynchronous stream of values.
153
154        This trait is a re-export of [`futures::stream::Stream`] and is an async version of
155        [`std::iter::Iterator`].
156
157        The [provided methods] do not really exist in the trait itself, but they become
158        available when [`StreamExt`] from the [prelude] is imported:
159
160        ```
161        # #[allow(unused_imports)]
162        use async_std::prelude::*;
163        ```
164
165        [`std::iter::Iterator`]: https://doc.rust-lang.org/std/iter/trait.Iterator.html
166        [`futures::stream::Stream`]:
167        https://docs.rs/futures/0.3/futures/stream/trait.Stream.html
168        [provided methods]: #provided-methods
169        [`StreamExt`]: ../prelude/trait.StreamExt.html
170        [prelude]: ../prelude/index.html
171    "#]
172    pub trait Stream {
173        #[doc = r#"
174            The type of items yielded by this stream.
175        "#]
176        type Item;
177
178        #[doc = r#"
179            Attempts to receive the next item from the stream.
180
181            There are several possible return values:
182
183            * `Poll::Pending` means this stream's next value is not ready yet.
184            * `Poll::Ready(None)` means this stream has been exhausted.
185            * `Poll::Ready(Some(item))` means `item` was received out of the stream.
186
187            # Examples
188
189            ```
190            # fn main() { async_std::task::block_on(async {
191            #
192            use std::pin::Pin;
193
194            use async_std::prelude::*;
195            use async_std::stream;
196            use async_std::task::{Context, Poll};
197
198            fn increment(
199                s: impl Stream<Item = i32> + Unpin,
200            ) -> impl Stream<Item = i32> + Unpin {
201                struct Increment<S>(S);
202
203                impl<S: Stream<Item = i32> + Unpin> Stream for Increment<S> {
204                    type Item = S::Item;
205
206                    fn poll_next(
207                        mut self: Pin<&mut Self>,
208                        cx: &mut Context<'_>,
209                    ) -> Poll<Option<Self::Item>> {
210                        match Pin::new(&mut self.0).poll_next(cx) {
211                            Poll::Pending => Poll::Pending,
212                            Poll::Ready(None) => Poll::Ready(None),
213                            Poll::Ready(Some(item)) => Poll::Ready(Some(item + 1)),
214                        }
215                    }
216                }
217
218                Increment(s)
219            }
220
221            let mut s = increment(stream::once(7));
222
223            assert_eq!(s.next().await, Some(8));
224            assert_eq!(s.next().await, None);
225            #
226            # }) }
227            ```
228        "#]
229        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
230    }
231
232    #[doc = r#"
233        Extension methods for [`Stream`].
234
235        [`Stream`]: ../stream/trait.Stream.html
236    "#]
237    pub trait StreamExt: futures_core::stream::Stream {
238        #[doc = r#"
239            Advances the stream and returns the next value.
240
241            Returns [`None`] when iteration is finished. Individual stream implementations may
242            choose to resume iteration, and so calling `next()` again may or may not eventually
243            start returning more values.
244
245            [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None
246
247            # Examples
248
249            ```
250            # fn main() { async_std::task::block_on(async {
251            #
252            use async_std::prelude::*;
253            use async_std::stream;
254
255            let mut s = stream::once(7);
256
257            assert_eq!(s.next().await, Some(7));
258            assert_eq!(s.next().await, None);
259            #
260            # }) }
261            ```
262        "#]
263        fn next(&mut self) -> impl Future<Output = Option<Self::Item>> + '_ [NextFuture<'_, Self>]
264        where
265            Self: Unpin,
266        {
267            NextFuture { stream: self }
268        }
269
270        #[doc = r#"
271            Creates a stream that yields its first `n` elements.
272
273            # Examples
274
275            ```
276            # fn main() { async_std::task::block_on(async {
277            #
278            use async_std::prelude::*;
279            use async_std::stream;
280
281            let mut s = stream::repeat(9).take(3);
282
283            while let Some(v) = s.next().await {
284                assert_eq!(v, 9);
285            }
286            #
287            # }) }
288            ```
289        "#]
290        fn take(self, n: usize) -> Take<Self>
291        where
292            Self: Sized,
293        {
294            Take::new(self, n)
295        }
296
297        #[doc = r#"
298            Creates a stream that yields elements based on a predicate.
299
300            # Examples
301
302            ```
303            # fn main() { async_std::task::block_on(async {
304            #
305            use async_std::prelude::*;
306            use async_std::stream;
307
308            let s = stream::from_iter(vec![1, 2, 3, 4]);
309            let mut s = s.take_while(|x| x < &3 );
310
311            assert_eq!(s.next().await, Some(1));
312            assert_eq!(s.next().await, Some(2));
313            assert_eq!(s.next().await, None);
314            #
315            # }) }
316            ```
317        "#]
318        fn take_while<P>(self, predicate: P) -> TakeWhile<Self, P>
319        where
320            Self: Sized,
321            P: FnMut(&Self::Item) -> bool,
322        {
323            TakeWhile::new(self, predicate)
324        }
325
326        #[doc = r#"
327            Limit the amount of items yielded per timeslice in a stream.
328
329            This stream does not drop any items, but will only limit the rate at which items pass through.
330            # Examples
331            ```
332            # fn main() { async_std::task::block_on(async {
333            #
334            use async_std::prelude::*;
335            use async_std::stream;
336            use std::time::{Duration, Instant};
337
338            let start = Instant::now();
339
340            // emit value every 5 milliseconds
341            let s = stream::interval(Duration::from_millis(5)).take(2);
342
343            // throttle for 10 milliseconds
344            let mut s = s.throttle(Duration::from_millis(10));
345
346            s.next().await;
347            assert!(start.elapsed().as_millis() >= 5);
348
349            s.next().await;
350            assert!(start.elapsed().as_millis() >= 15);
351
352            s.next().await;
353            assert!(start.elapsed().as_millis() >= 25);
354            #
355            # }) }
356            ```
357        "#]
358        #[cfg(feature = "unstable")]
359        #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
360        fn throttle(self, d: Duration) -> Throttle<Self>
361        where
362            Self: Sized,
363        {
364            Throttle::new(self, d)
365        }
366
367        #[doc = r#"
368            Creates a stream that yields each `step`th element.
369
370            # Panics
371
372            This method will panic if the given step is `0`.
373
374            # Examples
375
376            Basic usage:
377
378            ```
379            # fn main() { async_std::task::block_on(async {
380            #
381            use async_std::prelude::*;
382            use async_std::stream;
383
384            let s = stream::from_iter(vec![0u8, 1, 2, 3, 4]);
385            let mut stepped = s.step_by(2);
386
387            assert_eq!(stepped.next().await, Some(0));
388            assert_eq!(stepped.next().await, Some(2));
389            assert_eq!(stepped.next().await, Some(4));
390            assert_eq!(stepped.next().await, None);
391
392            #
393            # }) }
394            ```
395        "#]
396        fn step_by(self, step: usize) -> StepBy<Self>
397        where
398            Self: Sized,
399        {
400            StepBy::new(self, step)
401        }
402
403        #[doc = r#"
404            Takes two streams and creates a new stream over both in sequence.
405
406            # Examples
407
408            Basic usage:
409
410            ```
411            # fn main() { async_std::task::block_on(async {
412            #
413            use async_std::prelude::*;
414            use async_std::stream;
415
416            let first = stream::from_iter(vec![0u8, 1]);
417            let second = stream::from_iter(vec![2, 3]);
418            let mut c = first.chain(second);
419
420            assert_eq!(c.next().await, Some(0));
421            assert_eq!(c.next().await, Some(1));
422            assert_eq!(c.next().await, Some(2));
423            assert_eq!(c.next().await, Some(3));
424            assert_eq!(c.next().await, None);
425
426            #
427            # }) }
428            ```
429        "#]
430        fn chain<U>(self, other: U) -> Chain<Self, U>
431        where
432            Self: Sized,
433            U: Stream<Item = Self::Item> + Sized,
434        {
435            Chain::new(self, other)
436        }
437
438            #[doc = r#"
439            Creates an stream which copies all of its elements.
440
441            # Examples
442
443            Basic usage:
444
445            ```
446            # fn main() { async_std::task::block_on(async {
447            #
448            use async_std::prelude::*;
449            use async_std::stream;
450
451            let v = stream::from_iter(vec![&1, &2, &3]);
452
453            let mut v_cloned = v.cloned();
454
455            assert_eq!(v_cloned.next().await, Some(1));
456            assert_eq!(v_cloned.next().await, Some(2));
457            assert_eq!(v_cloned.next().await, Some(3));
458            assert_eq!(v_cloned.next().await, None);
459
460            #
461            # }) }
462            ```
463        "#]
464        fn cloned<'a, T>(self) -> Cloned<Self>
465        where
466            Self: Sized + Stream<Item = &'a T>,
467            T: Clone + 'a,
468        {
469            Cloned::new(self)
470        }
471
472
473        #[doc = r#"
474            Creates an stream which copies all of its elements.
475
476            # Examples
477
478            Basic usage:
479
480            ```
481            # fn main() { async_std::task::block_on(async {
482            #
483            use async_std::prelude::*;
484            use async_std::stream;
485
486            let s = stream::from_iter(vec![&1, &2, &3]);
487            let mut s_copied  = s.copied();
488
489            assert_eq!(s_copied.next().await, Some(1));
490            assert_eq!(s_copied.next().await, Some(2));
491            assert_eq!(s_copied.next().await, Some(3));
492            assert_eq!(s_copied.next().await, None);
493            #
494            # }) }
495            ```
496        "#]
497        fn copied<'a, T>(self) -> Copied<Self>
498        where
499            Self: Sized + Stream<Item = &'a T>,
500            T: Copy + 'a,
501        {
502            Copied::new(self)
503        }
504
505        #[doc = r#"
506            Creates a stream that yields the provided values infinitely and in order.
507
508            # Examples
509
510            Basic usage:
511
512            ```
513            # async_std::task::block_on(async {
514            #
515            use async_std::prelude::*;
516            use async_std::stream;
517
518            let mut s = stream::once(7).cycle();
519
520            assert_eq!(s.next().await, Some(7));
521            assert_eq!(s.next().await, Some(7));
522            assert_eq!(s.next().await, Some(7));
523            assert_eq!(s.next().await, Some(7));
524            assert_eq!(s.next().await, Some(7));
525            #
526            # })
527            ```
528        "#]
529        fn cycle(self) -> Cycle<Self>
530        where
531            Self: Clone + Sized,
532        {
533            Cycle::new(self)
534        }
535
536        #[doc = r#"
537            Creates a stream that gives the current element's count as well as the next value.
538
539            # Overflow behaviour.
540
541            This combinator does no guarding against overflows.
542
543            # Examples
544
545            ```
546            # fn main() { async_std::task::block_on(async {
547            #
548            use async_std::prelude::*;
549            use async_std::stream;
550
551            let s = stream::from_iter(vec!['a', 'b', 'c']);
552            let mut s = s.enumerate();
553
554            assert_eq!(s.next().await, Some((0, 'a')));
555            assert_eq!(s.next().await, Some((1, 'b')));
556            assert_eq!(s.next().await, Some((2, 'c')));
557            assert_eq!(s.next().await, None);
558            #
559            # }) }
560            ```
561        "#]
562        fn enumerate(self) -> Enumerate<Self>
563        where
564            Self: Sized,
565        {
566            Enumerate::new(self)
567        }
568
569        #[doc = r#"
570            Creates a stream that is delayed before it starts yielding items.
571
572            # Examples
573
574            ```
575            # fn main() { async_std::task::block_on(async {
576            #
577            use async_std::prelude::*;
578            use async_std::stream;
579            use std::time::{Duration, Instant};
580
581            let start = Instant::now();
582            let mut s = stream::from_iter(vec![0u8, 1, 2]).delay(Duration::from_millis(200));
583
584            assert_eq!(s.next().await, Some(0));
585            // The first time will take more than 200ms due to delay.
586            assert!(start.elapsed().as_millis() >= 200);
587
588            assert_eq!(s.next().await, Some(1));
589            // There will be no delay after the first time.
590            assert!(start.elapsed().as_millis() < 400);
591
592            assert_eq!(s.next().await, Some(2));
593            assert!(start.elapsed().as_millis() < 400);
594
595            assert_eq!(s.next().await, None);
596            assert!(start.elapsed().as_millis() < 400);
597            #
598            # }) }
599            ```
600        "#]
601        #[cfg(any(feature = "unstable", feature = "docs"))]
602        #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
603        fn delay(self, dur: std::time::Duration) -> Delay<Self>
604        where
605            Self: Sized,
606        {
607            Delay::new(self, dur)
608        }
609
610        #[doc = r#"
611            Takes a closure and creates a stream that calls that closure on every element of this stream.
612
613            # Examples
614
615            ```
616            # fn main() { async_std::task::block_on(async {
617            #
618            use async_std::prelude::*;
619            use async_std::stream;
620
621            let s = stream::from_iter(vec![1, 2, 3]);
622            let mut s = s.map(|x| 2 * x);
623
624            assert_eq!(s.next().await, Some(2));
625            assert_eq!(s.next().await, Some(4));
626            assert_eq!(s.next().await, Some(6));
627            assert_eq!(s.next().await, None);
628
629            #
630            # }) }
631            ```
632        "#]
633        fn map<B, F>(self, f: F) -> Map<Self, F>
634        where
635            Self: Sized,
636            F: FnMut(Self::Item) -> B,
637        {
638            Map::new(self, f)
639        }
640
641        #[doc = r#"
642            A combinator that does something with each element in the stream, passing the value
643            on.
644
645            # Examples
646
647            Basic usage:
648
649            ```
650            # fn main() { async_std::task::block_on(async {
651            #
652            use async_std::prelude::*;
653            use async_std::stream;
654
655            let s = stream::from_iter(vec![1, 2, 3, 4, 5]);
656
657            let sum = s
658               .inspect(|x| println!("about to filter {}", x))
659               .filter(|x| x % 2 == 0)
660               .inspect(|x| println!("made it through filter: {}", x))
661               .fold(0, |sum, i| sum + i)
662               .await;
663
664            assert_eq!(sum, 6);
665            #
666            # }) }
667            ```
668        "#]
669        fn inspect<F>(self, f: F) -> Inspect<Self, F>
670        where
671            Self: Sized,
672            F: FnMut(&Self::Item),
673        {
674            Inspect::new(self, f)
675        }
676
677        #[doc = r#"
678            Returns the last element of the stream.
679
680            # Examples
681
682            Basic usage:
683
684            ```
685            # fn main() { async_std::task::block_on(async {
686            #
687            use async_std::prelude::*;
688            use async_std::stream;
689
690            let s = stream::from_iter(vec![1, 2, 3]);
691
692            let last  = s.last().await;
693            assert_eq!(last, Some(3));
694            #
695            # }) }
696            ```
697
698            An empty stream will return `None`:
699            ```
700            # fn main() { async_std::task::block_on(async {
701            #
702            use async_std::stream;
703            use crate::async_std::prelude::*;
704
705            let s = stream::empty::<()>();
706
707            let last  = s.last().await;
708            assert_eq!(last, None);
709            #
710            # }) }
711            ```
712        "#]
713        fn last(
714            self,
715        ) -> impl Future<Output = Option<Self::Item>> [LastFuture<Self, Self::Item>]
716        where
717            Self: Sized,
718        {
719            LastFuture::new(self)
720        }
721
722        #[doc = r#"
723            Creates a stream which ends after the first `None`.
724
725            After a stream returns `None`, future calls may or may not yield `Some(T)` again.
726            `fuse()` adapts an iterator, ensuring that after a `None` is given, it will always
727            return `None` forever.
728
729            # Examples
730
731            ```
732            # fn main() { async_std::task::block_on(async {
733            #
734            use async_std::prelude::*;
735            use async_std::stream;
736
737            let mut s = stream::once(1).fuse();
738            assert_eq!(s.next().await, Some(1));
739            assert_eq!(s.next().await, None);
740            assert_eq!(s.next().await, None);
741            #
742            # }) }
743            ```
744        "#]
745        fn fuse(self) -> Fuse<Self>
746        where
747            Self: Sized,
748        {
749            Fuse::new(self)
750        }
751
752        #[doc = r#"
753            Creates a stream that uses a predicate to determine if an element should be yielded.
754
755            # Examples
756
757            Basic usage:
758
759            ```
760            # fn main() { async_std::task::block_on(async {
761            #
762            use async_std::prelude::*;
763            use async_std::stream;
764
765            let s = stream::from_iter(vec![1, 2, 3, 4]);
766            let mut s = s.filter(|i| i % 2 == 0);
767
768            assert_eq!(s.next().await, Some(2));
769            assert_eq!(s.next().await, Some(4));
770            assert_eq!(s.next().await, None);
771            #
772            # }) }
773            ```
774        "#]
775        fn filter<P>(self, predicate: P) -> Filter<Self, P>
776        where
777            Self: Sized,
778            P: FnMut(&Self::Item) -> bool,
779        {
780            Filter::new(self, predicate)
781        }
782
783        #[doc= r#"
784            Creates an stream that works like map, but flattens nested structure.
785
786            # Examples
787
788            Basic usage:
789
790            ```
791            # async_std::task::block_on(async {
792
793            use async_std::prelude::*;
794            use async_std::stream;
795
796            let words = stream::from_iter(&["alpha", "beta", "gamma"]);
797
798            let merged: String = words
799                .flat_map(|s| stream::from_iter(s.chars()))
800                .collect().await;
801                assert_eq!(merged, "alphabetagamma");
802
803            let d3 = stream::from_iter(&[[[1, 2], [3, 4]], [[5, 6], [7, 8]]]);
804            let d1: Vec<_> = d3
805                .flat_map(|item| stream::from_iter(item))
806                .flat_map(|item| stream::from_iter(item))
807                .collect().await;
808
809            assert_eq!(d1, [&1, &2, &3, &4, &5, &6, &7, &8]);
810            # });
811            ```
812        "#]
813        #[cfg(feature = "unstable")]
814        #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
815        fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F>
816            where
817                Self: Sized,
818                U: IntoStream,
819                F: FnMut(Self::Item) -> U,
820        {
821            FlatMap::new(self, f)
822        }
823
824        #[doc = r#"
825            Creates an stream that flattens nested structure.
826
827            # Examples
828
829            Basic usage:
830
831            ```
832            # async_std::task::block_on(async {
833
834            use async_std::prelude::*;
835            use async_std::stream;
836
837            let inner1 = stream::from_iter(vec![1u8,2,3]);
838            let inner2 = stream::from_iter(vec![4u8,5,6]);
839            let s  = stream::from_iter(vec![inner1, inner2]);
840
841            let v: Vec<_> = s.flatten().collect().await;
842
843            assert_eq!(v, vec![1,2,3,4,5,6]);
844
845            # });
846        "#]
847        #[cfg(feature = "unstable")]
848        #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
849        fn flatten(self) -> Flatten<Self>
850        where
851            Self: Sized,
852            Self::Item: IntoStream,
853        {
854            Flatten::new(self)
855        }
856
857        #[doc = r#"
858            Both filters and maps a stream.
859
860            # Examples
861
862            Basic usage:
863
864            ```
865            # fn main() { async_std::task::block_on(async {
866            #
867
868            use async_std::prelude::*;
869            use async_std::stream;
870
871            let s = stream::from_iter(vec!["1", "lol", "3", "NaN", "5"]);
872
873            let mut parsed = s.filter_map(|a| a.parse::<u32>().ok());
874
875            let one = parsed.next().await;
876            assert_eq!(one, Some(1));
877
878            let three = parsed.next().await;
879            assert_eq!(three, Some(3));
880
881            let five = parsed.next().await;
882            assert_eq!(five, Some(5));
883
884            let end = parsed.next().await;
885            assert_eq!(end, None);
886            #
887            # }) }
888            ```
889        "#]
890        fn filter_map<B, F>(self, f: F) -> FilterMap<Self, F>
891        where
892            Self: Sized,
893            F: FnMut(Self::Item) -> Option<B>,
894        {
895            FilterMap::new(self, f)
896        }
897
898        #[doc = r#"
899            Returns the element that gives the minimum value with respect to the
900            specified key function. If several elements are equally minimum,
901            the first element is returned. If the stream is empty, `None` is returned.
902
903            # Examples
904
905            ```
906            # fn main() { async_std::task::block_on(async {
907            #
908            use async_std::prelude::*;
909            use async_std::stream;
910
911            let s = stream::from_iter(vec![-1isize, 2, -3]);
912
913            let min = s.clone().min_by_key(|x| x.abs()).await;
914            assert_eq!(min, Some(-1));
915
916            let min = stream::empty::<isize>().min_by_key(|x| x.abs()).await;
917            assert_eq!(min, None);
918            #
919            # }) }
920            ```
921        "#]
922        fn min_by_key<B, F>(
923            self,
924            key_by: F,
925        ) -> impl Future<Output = Option<Self::Item>> [MinByKeyFuture<Self, Self::Item, F>]
926        where
927            Self: Sized,
928            B: Ord,
929            F: FnMut(&Self::Item) -> B,
930        {
931            MinByKeyFuture::new(self, key_by)
932        }
933
934        #[doc = r#"
935            Returns the element that gives the maximum value with respect to the
936            specified key function. If several elements are equally maximum,
937            the first element is returned. If the stream is empty, `None` is returned.
938
939            # Examples
940
941            ```
942            # fn main() { async_std::task::block_on(async {
943            #
944            use async_std::prelude::*;
945            use async_std::stream;
946
947            let s = stream::from_iter(vec![-3_i32, 0, 1, 5, -10]);
948
949            let max = s.clone().max_by_key(|x| x.abs()).await;
950            assert_eq!(max, Some(-10));
951
952            let max = stream::empty::<isize>().max_by_key(|x| x.abs()).await;
953            assert_eq!(max, None);
954            #
955            # }) }
956            ```
957        "#]
958        fn max_by_key<B, F>(
959            self,
960            key_by: F,
961        ) -> impl Future<Output = Option<Self::Item>> [MaxByKeyFuture<Self, Self::Item, F>]
962        where
963            Self: Sized,
964            B: Ord,
965            F: FnMut(&Self::Item) -> B,
966        {
967            MaxByKeyFuture::new(self, key_by)
968        }
969
970        #[doc = r#"
971            Returns the element that gives the minimum value with respect to the
972            specified comparison function. If several elements are equally minimum,
973            the first element is returned. If the stream is empty, `None` is returned.
974
975            # Examples
976
977            ```
978            # fn main() { async_std::task::block_on(async {
979            #
980            use async_std::prelude::*;
981            use async_std::stream;
982
983            let s = stream::from_iter(vec![1u8, 2, 3]);
984
985            let min = s.clone().min_by(|x, y| x.cmp(y)).await;
986            assert_eq!(min, Some(1));
987
988            let min = s.min_by(|x, y| y.cmp(x)).await;
989            assert_eq!(min, Some(3));
990
991            let min = stream::empty::<u8>().min_by(|x, y| x.cmp(y)).await;
992            assert_eq!(min, None);
993            #
994            # }) }
995            ```
996        "#]
997        fn min_by<F>(
998            self,
999            compare: F,
1000        ) -> impl Future<Output = Option<Self::Item>> [MinByFuture<Self, F, Self::Item>]
1001        where
1002            Self: Sized,
1003            F: FnMut(&Self::Item, &Self::Item) -> Ordering,
1004        {
1005            MinByFuture::new(self, compare)
1006        }
1007
1008        #[doc = r#"
1009            Returns the element that gives the maximum value. If several elements are equally maximum,
1010            the first element is returned. If the stream is empty, `None` is returned.
1011
1012            # Examples
1013
1014            ```ignore
1015            # fn main() { async_std::task::block_on(async {
1016            #
1017            use async_std::prelude::*;
1018            use async_std::stream;
1019
1020            let s = stream::from_iter(vec![1usize, 2, 3]);
1021
1022            let max = s.clone().max().await;
1023            assert_eq!(max, Some(3));
1024
1025            let max = stream::empty::<usize>().max().await;
1026            assert_eq!(max, None);
1027            #
1028            # }) }
1029            ```
1030        "#]
1031        fn max<F>(
1032            self,
1033        ) -> impl Future<Output = Option<Self::Item>> [MaxFuture<Self, F, Self::Item>]
1034        where
1035            Self: Sized,
1036            F: FnMut(&Self::Item, &Self::Item) -> Ordering,
1037        {
1038            MaxFuture::new(self)
1039        }
1040
1041                #[doc = r#"
1042            Returns the element that gives the minimum value. If several elements are equally minimum,
1043            the first element is returned. If the stream is empty, `None` is returned.
1044
1045            # Examples
1046
1047            ```ignore
1048            # fn main() { async_std::task::block_on(async {
1049            #
1050            use async_std::prelude::*;
1051            use async_std::stream;
1052
1053            let s = stream::from_iter(vec![1usize, 2, 3]);
1054
1055            let min = s.clone().min().await;
1056            assert_eq!(min, Some(1));
1057
1058            let min = stream::empty::<usize>().min().await;
1059            assert_eq!(min, None);
1060            #
1061            # }) }
1062            ```
1063        "#]
1064        fn min<F>(
1065            self,
1066        ) -> impl Future<Output = Option<Self::Item>> [MinFuture<Self, F, Self::Item>]
1067        where
1068            Self: Sized,
                // NOTE(review): `F` is bounded here and named in the bracketed return type, but
                // no argument of type `F` is accepted, so it cannot be inferred from the call's
                // arguments. Confirm whether this bound is intentional.
1069            F: FnMut(&Self::Item, &Self::Item) -> Ordering,
1070        {
                // Takes the stream by value and hands it, with no comparator, to `MinFuture::new`.
1071            MinFuture::new(self)
1072        }
1073
1074         #[doc = r#"
1075            Returns the element that gives the maximum value with respect to the
1076            specified comparison function. If several elements are equally maximum,
1077            the first element is returned. If the stream is empty, `None` is returned.
1078
1079            # Examples
1080
1081            ```
1082            # fn main() { async_std::task::block_on(async {
1083            #
1084            use async_std::prelude::*;
1085            use async_std::stream;
1086
1087            let s = stream::from_iter(vec![1u8, 2, 3]);
1088
1089            let max = s.clone().max_by(|x, y| x.cmp(y)).await;
1090            assert_eq!(max, Some(3));
1091
1092            let max = s.max_by(|x, y| y.cmp(x)).await;
1093            assert_eq!(max, Some(1));
1094
1095            let max = stream::empty::<usize>().max_by(|x, y| x.cmp(y)).await;
1096            assert_eq!(max, None);
1097            #
1098            # }) }
1099            ```
1100        "#]
1101        fn max_by<F>(
1102            self,
1103            compare: F,
1104        ) -> impl Future<Output = Option<Self::Item>> [MaxByFuture<Self, F, Self::Item>]
1105        where
1106            Self: Sized,
                // The comparator receives two items by reference and reports their `Ordering`.
1107            F: FnMut(&Self::Item, &Self::Item) -> Ordering,
1108        {
                // Moves both the stream and the caller's comparator into `MaxByFuture::new`.
1109            MaxByFuture::new(self, compare)
1110        }
1111
1112        #[doc = r#"
1113            Returns the nth element of the stream.
1114
1115            # Examples
1116
1117            Basic usage:
1118
1119            ```
1120            # fn main() { async_std::task::block_on(async {
1121            #
1122            use async_std::prelude::*;
1123            use async_std::stream;
1124
1125            let mut s = stream::from_iter(vec![1u8, 2, 3]);
1126
1127            let second = s.nth(1).await;
1128            assert_eq!(second, Some(2));
1129            #
1130            # }) }
1131            ```
1132            Calling `nth()` multiple times:
1133
1134            ```
1135            # fn main() { async_std::task::block_on(async {
1136            #
1137            use async_std::stream;
1138            use async_std::prelude::*;
1139
1140            let mut s = stream::from_iter(vec![1u8, 2, 3]);
1141
1142            let second = s.nth(0).await;
1143            assert_eq!(second, Some(1));
1144
1145            let second = s.nth(0).await;
1146            assert_eq!(second, Some(2));
1147            #
1148            # }) }
1149            ```
1150            Returning `None` if the stream finished before returning `n` elements:
1151            ```
1152            # fn main() { async_std::task::block_on(async {
1153            #
1154            use async_std::prelude::*;
1155            use async_std::stream;
1156
1157            let mut s  = stream::from_iter(vec![1u8, 2, 3]);
1158
1159            let fourth = s.nth(4).await;
1160            assert_eq!(fourth, None);
1161            #
1162            # }) }
1163            ```
1164        "#]
1165        fn nth(
1166            &mut self,
1167            n: usize,
1168        ) -> impl Future<Output = Option<Self::Item>> + '_ [NthFuture<'_, Self>]
1169        where
1170            Self: Unpin + Sized,
1171        {
                // Only borrows the stream (`&mut self`), so the returned future carries that
                // borrow (`'_`) and the caller keeps ownership of the stream.
1172            NthFuture::new(self, n)
1173        }
1174
1175        #[doc = r#"
1176            Tests if every element of the stream matches a predicate.
1177
1178            `all()` takes a closure that returns `true` or `false`. It applies
1179            this closure to each element of the stream, and if they all return
1180            `true`, then so does `all()`. If any of them return `false`, it
1181            returns `false`.
1182
1183            `all()` is short-circuiting; in other words, it will stop processing
1184            as soon as it finds a `false`, given that no matter what else happens,
1185            the result will also be `false`.
1186
1187            An empty stream returns `true`.
1188
1189            # Examples
1190
1191            Basic usage:
1192
1193            ```
1194            # fn main() { async_std::task::block_on(async {
1195            #
1196            use async_std::prelude::*;
1197            use async_std::stream;
1198
1199            let mut s = stream::repeat::<u32>(42).take(3);
1200            assert!(s.all(|x| x ==  42).await);
1201
1202            #
1203            # }) }
1204            ```
1205
1206            Empty stream:
1207
1208            ```
1209            # fn main() { async_std::task::block_on(async {
1210            #
1211            use async_std::prelude::*;
1212            use async_std::stream;
1213
1214            let mut s = stream::empty::<u32>();
1215            assert!(s.all(|_| false).await);
1216            #
1217            # }) }
1218            ```
1219        "#]
1220        #[inline]
1221        fn all<F>(
1222            &mut self,
1223            f: F,
1224        ) -> impl Future<Output = bool> + '_ [AllFuture<'_, Self, F, Self::Item>]
1225        where
1226            Self: Unpin + Sized,
                // The predicate receives each item by value.
1227            F: FnMut(Self::Item) -> bool,
1228        {
                // Borrows the stream for the lifetime of the returned future and moves the
                // predicate `f` into `AllFuture::new`.
1229            AllFuture::new(self, f)
1230        }
1231
1232        #[doc = r#"
1233            Searches for an element in a stream that satisfies a predicate.
1234
1235            # Examples
1236
1237            Basic usage:
1238
1239            ```
1240            # fn main() { async_std::task::block_on(async {
1241            #
1242            use async_std::prelude::*;
1243            use async_std::stream;
1244
1245            let mut s = stream::from_iter(vec![1u8, 2, 3]);
1246            let res = s.find(|x| *x == 2).await;
1247            assert_eq!(res, Some(2));
1248            #
1249            # }) }
1250            ```
1251
1252            Resuming after a first find:
1253
1254            ```
1255            # fn main() { async_std::task::block_on(async {
1256            #
1257            use async_std::prelude::*;
1258            use async_std::stream;
1259
1260            let mut s= stream::from_iter(vec![1, 2, 3]);
1261            let res = s.find(|x| *x == 2).await;
1262            assert_eq!(res, Some(2));
1263
1264            let next = s.next().await;
1265            assert_eq!(next, Some(3));
1266            #
1267            # }) }
1268            ```
1269        "#]
1270        fn find<P>(
1271            &mut self,
1272            p: P,
1273        ) -> impl Future<Output = Option<Self::Item>> + '_ [FindFuture<'_, Self, P>]
1274        where
1275            Self: Unpin + Sized,
                // The predicate only inspects items through a shared reference.
1276            P: FnMut(&Self::Item) -> bool,
1277        {
                // Borrows the stream (it is not consumed) and moves the predicate `p` into
                // `FindFuture::new`.
1278            FindFuture::new(self, p)
1279        }
1280
1281        #[doc = r#"
1282            Applies a function to the elements of the stream and returns the first non-`None` result.
1283
1284            ```
1285            # fn main() { async_std::task::block_on(async {
1286            #
1287            use async_std::prelude::*;
1288            use async_std::stream;
1289
1290            let mut s = stream::from_iter(vec!["lol", "NaN", "2", "5"]);
1291            let first_number = s.find_map(|s| s.parse().ok()).await;
1292
1293            assert_eq!(first_number, Some(2));
1294            #
1295            # }) }
1296            ```
1297        "#]
1298        fn find_map<F, B>(
1299            &mut self,
1300            f: F,
1301        ) -> impl Future<Output = Option<B>> + '_ [FindMapFuture<'_, Self, F>]
1302        where
1303            Self: Unpin + Sized,
                // `B` is the payload of the `Option` the closure returns; it is also the payload
                // of the advertised output type.
1304            F: FnMut(Self::Item) -> Option<B>,
1305        {
                // Borrows the stream and moves the mapping closure into `FindMapFuture::new`.
1306            FindMapFuture::new(self, f)
1307        }
1308
1309        #[doc = r#"
1310            A combinator that applies a function to every element in a stream
1311            producing a single, final value.
1312
1313            # Examples
1314
1315            Basic usage:
1316
1317            ```
1318            # fn main() { async_std::task::block_on(async {
1319            #
1320            use async_std::prelude::*;
1321            use async_std::stream;
1322
1323            let s = stream::from_iter(vec![1u8, 2, 3]);
1324            let sum = s.fold(0, |acc, x| acc + x).await;
1325
1326            assert_eq!(sum, 6);
1327            #
1328            # }) }
1329            ```
1330        "#]
1331        fn fold<B, F>(
1332            self,
1333            init: B,
1334            f: F,
1335        ) -> impl Future<Output = B> [FoldFuture<Self, F, B>]
1336        where
1337            Self: Sized,
                // The closure takes the accumulator and one item by value and returns the next
                // accumulator.
1338            F: FnMut(B, Self::Item) -> B,
1339        {
                // Takes the stream by value; `init` seeds the accumulator of type `B`.
1340            FoldFuture::new(self, init, f)
1341        }
1342
1343        #[doc = r#"
1344            A combinator that applies a function to every element in a stream
1345            creating two collections from it.
1346
1347            # Examples
1348
1349            Basic usage:
1350
1351            ```
1352            # fn main() { async_std::task::block_on(async {
1353            #
1354            use async_std::prelude::*;
1355            use async_std::stream;
1356
1357            let (even, odd): (Vec<i32>, Vec<i32>) = stream::from_iter(vec![1, 2, 3])
1358                .partition(|&n| n % 2 == 0).await;
1359
1360            assert_eq!(even, vec![2]);
1361            assert_eq!(odd, vec![1, 3]);
1362
1363            #
1364            # }) }
1365            ```
1366        "#]
1367        #[cfg(feature = "unstable")]
1368        #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
1369        fn partition<B, F>(
1370            self,
1371            f: F,
1372        ) -> impl Future<Output = (B, B)> [PartitionFuture<Self, F, B>]
1373        where
1374            Self: Sized,
1375            F: FnMut(&Self::Item) -> bool,
                // Both halves of the output tuple share the single collection type `B`.
                // Presumably `Default` + `Extend` let the future start from two empty collections
                // and grow them item by item. Verify against `PartitionFuture`.
1376            B: Default + Extend<Self::Item>,
1377        {
                // Takes the stream by value and moves the predicate into `PartitionFuture::new`.
1378            PartitionFuture::new(self, f)
1379        }
1380
1381        #[doc = r#"
1382            Call a closure on each element of the stream.
1383
1384            # Examples
1385
1386            ```
1387            # fn main() { async_std::task::block_on(async {
1388            #
1389            use async_std::prelude::*;
1390            use async_std::stream;
1391            use std::sync::mpsc::channel;
1392
1393            let (tx, rx) = channel();
1394
1395            let s = stream::from_iter(vec![1usize, 2, 3]);
1396            let sum = s.for_each(move |x| tx.clone().send(x).unwrap()).await;
1397
1398            let v: Vec<_> = rx.iter().collect();
1399
1400            assert_eq!(v, vec![1, 2, 3]);
1401            #
1402            # }) }
1403            ```
1404        "#]
1405        fn for_each<F>(
1406            self,
1407            f: F,
1408        ) -> impl Future<Output = ()> [ForEachFuture<Self, F>]
1409        where
1410            Self: Sized,
                // The closure receives each item by value and returns nothing.
1411            F: FnMut(Self::Item),
1412        {
                // Takes the stream by value and moves the closure into `ForEachFuture::new`.
1413            ForEachFuture::new(self, f)
1414        }
1415
1416        #[doc = r#"
1417            Tests if any element of the stream matches a predicate.
1418
1419            `any()` takes a closure that returns `true` or `false`. It applies
1420            this closure to each element of the stream, and if any of them return
1421            `true`, then so does `any()`. If they all return `false`, it
1422            returns `false`.
1423
1424            `any()` is short-circuiting; in other words, it will stop processing
1425            as soon as it finds a `true`, given that no matter what else happens,
1426            the result will also be `true`.
1427
1428            An empty stream returns `false`.
1429
1430            # Examples
1431
1432            Basic usage:
1433
1434            ```
1435            # fn main() { async_std::task::block_on(async {
1436            #
1437            use async_std::prelude::*;
1438            use async_std::stream;
1439
1440            let mut s = stream::repeat::<u32>(42).take(3);
1441            assert!(s.any(|x| x ==  42).await);
1442            #
1443            # }) }
1444            ```
1445
1446            Empty stream:
1447
1448            ```
1449            # fn main() { async_std::task::block_on(async {
1450            #
1451            use async_std::prelude::*;
1452            use async_std::stream;
1453
1454            let mut s = stream::empty::<u32>();
1455            assert!(!s.any(|_| false).await);
1456            #
1457            # }) }
1458            ```
1459        "#]
1460        #[inline]
1461        fn any<F>(
1462            &mut self,
1463            f: F,
1464        ) -> impl Future<Output = bool> + '_ [AnyFuture<'_, Self, F, Self::Item>]
1465        where
1466            Self: Unpin + Sized,
                // The predicate receives each item by value.
1467            F: FnMut(Self::Item) -> bool,
1468        {
                // Borrows the stream for the lifetime of the returned future and moves the
                // predicate `f` into `AnyFuture::new`.
1469            AnyFuture::new(self, f)
1470        }
1471
1472        #[doc = r#"
1473            Borrows a stream, rather than consuming it.
1474
1475            This is useful to allow applying stream adaptors while still retaining ownership of the original stream.
1476
1477            # Examples
1478
1479            ```
1480            # fn main() { async_std::task::block_on(async {
1481            #
1482            use async_std::prelude::*;
1483            use async_std::stream;
1484
1485            let a = vec![1isize, 2, 3];
1486
1487            let stream = stream::from_iter(a);
1488
1489            let sum: isize = stream.take(5).sum().await;
1490
1491            assert_eq!(sum, 6);
1492
1493            // if we try to use stream again, it won't work. The following line
1494            // gives error: use of moved value: `stream`
1495            // assert_eq!(stream.next(), None);
1496
1497            // let's try that again
1498            let a = vec![1isize, 2, 3];
1499
1500            let mut stream = stream::from_iter(a);
1501
1502            // instead, we add in a .by_ref()
1503            let sum: isize = stream.by_ref().take(2).sum().await;
1504
1505            assert_eq!(sum, 3);
1506
1507            // now this is just fine:
1508            assert_eq!(stream.next().await, Some(3));
1509            assert_eq!(stream.next().await, None);
1510            #
1511            # }) }
1512            ```
1513        "#]
1514        #[cfg(feature = "unstable")]
1515        #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
1516        fn by_ref(&mut self) -> &mut Self {
                // Returns the very same mutable borrow that was passed in; no new value is built.
1517            self
1518        }
1519
1520        #[doc = r#"
1521            A stream adaptor similar to [`fold`] that holds internal state and produces a new
1522            stream.
1523
1524            [`fold`]: #method.fold
1525
1526            `scan()` takes two arguments: an initial value which seeds the internal state, and
1527            a closure with two arguments, the first being a mutable reference to the internal
1528            state and the second a stream element. The closure can assign to the internal state
1529            to share state between iterations.
1530
1531            On iteration, the closure will be applied to each element of the stream and the
1532            return value from the closure, an `Option`, is yielded by the stream.
1533
1534            ## Examples
1535
1536            ```
1537            # fn main() { async_std::task::block_on(async {
1538            #
1539            use async_std::prelude::*;
1540            use async_std::stream;
1541
1542            let s = stream::from_iter(vec![1isize, 2, 3]);
1543            let mut s = s.scan(1, |state, x| {
1544                *state = *state * x;
1545                Some(-*state)
1546            });
1547
1548            assert_eq!(s.next().await, Some(-1));
1549            assert_eq!(s.next().await, Some(-2));
1550            assert_eq!(s.next().await, Some(-6));
1551            assert_eq!(s.next().await, None);
1552            #
1553            # }) }
1554            ```
1555        "#]
1556        #[inline]
1557        fn scan<St, B, F>(self, initial_state: St, f: F) -> Scan<Self, St, F>
1558        where
1559            Self: Sized,
                // The closure gets mutable access to the state plus one item by value, and
                // returns an `Option<B>`.
1560            F: FnMut(&mut St, Self::Item) -> Option<B>,
1561        {
                // Moves the stream, the seed state and the closure into a `Scan` adaptor.
1562            Scan::new(self, initial_state, f)
1563        }
1564
1565        #[doc = r#"
1566            Combinator that `skip`s elements based on a predicate.
1567
1568            Takes a closure argument. It will call this closure on every element in
1569            the stream and ignore elements until it returns `false`.
1570
1571            After `false` is returned, `SkipWhile`'s job is over and all further
1572            elements in the stream are yielded.
1573
1574            ## Examples
1575
1576            ```
1577            # fn main() { async_std::task::block_on(async {
1578            #
1579            use async_std::prelude::*;
1580            use async_std::stream;
1581
1582            let a = stream::from_iter(vec![-1i32, 0, 1]);
1583            let mut s = a.skip_while(|x| x.is_negative());
1584
1585            assert_eq!(s.next().await, Some(0));
1586            assert_eq!(s.next().await, Some(1));
1587            assert_eq!(s.next().await, None);
1588            #
1589            # }) }
1590            ```
1591        "#]
1592        fn skip_while<P>(self, predicate: P) -> SkipWhile<Self, P>
1593        where
1594            Self: Sized,
                // The predicate only inspects items through a shared reference.
1595            P: FnMut(&Self::Item) -> bool,
1596        {
                // Moves the stream and the predicate into a `SkipWhile` adaptor; nothing is
                // awaited in this method itself.
1597            SkipWhile::new(self, predicate)
1598        }
1599
1600        #[doc = r#"
1601            Creates a combinator that skips the first `n` elements.
1602
1603            ## Examples
1604
1605            ```
1606            # fn main() { async_std::task::block_on(async {
1607            #
1608            use async_std::prelude::*;
1609            use async_std::stream;
1610
1611            let s = stream::from_iter(vec![1u8, 2, 3]);
1612            let mut skipped = s.skip(2);
1613
1614            assert_eq!(skipped.next().await, Some(3));
1615            assert_eq!(skipped.next().await, None);
1616            #
1617            # }) }
1618            ```
1619        "#]
1620        fn skip(self, n: usize) -> Skip<Self>
1621        where
1622            Self: Sized,
1623        {
                // Moves the stream into a `Skip` adaptor together with the count `n`.
1624            Skip::new(self, n)
1625        }
1626
1627        #[doc=r#"
1628            Awaits a stream or times out after a duration of time.
1629
1630            If you want to await an I/O future consider using
1631            [`io::timeout`](../io/fn.timeout.html) instead.
1632
1633            # Examples
1634
1635            ```
1636            # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
1637            #
1638            use std::time::Duration;
1639
1640            use async_std::stream;
1641            use async_std::prelude::*;
1642
1643            let mut s = stream::repeat(1).take(3).timeout(Duration::from_secs(1));
1644
1645            while let Some(v) = s.next().await {
1646                assert_eq!(v, Ok(1));
1647            }
1648
1649            // when timeout
1650            let mut s = stream::pending::<()>().timeout(Duration::from_millis(10));
1651            match s.next().await {
1652                Some(item) => assert!(item.is_err()),
1653                None => panic!()
1654            };
1655            #
1656            # Ok(()) }) }
1657            ```
1658        "#]
1659        #[cfg(any(feature = "unstable", feature = "docs"))]
1660        #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
1661        fn timeout(self, dur: Duration) -> Timeout<Self>
1662        where
1663            Self: Stream + Sized,
1664        {
                // Moves the stream into a `Timeout` adaptor configured with the duration `dur`.
1665            Timeout::new(self, dur)
1666        }
1667
1668        #[doc = r#"
1669            A combinator that applies a function as long as it returns successfully, producing a single, final value.
1670            Immediately returns the error when the function returns unsuccessfully.
1671
1672            # Examples
1673
1674            Basic usage:
1675
1676            ```
1677            # fn main() { async_std::task::block_on(async {
1678            #
1679            use async_std::prelude::*;
1680            use async_std::stream;
1681
1682            let mut s = stream::from_iter(vec![1usize, 2, 3]);
1683            let sum = s.try_fold(0, |acc, v| {
1684                if (acc+v) % 2 == 1 {
1685                    Ok(v+3)
1686                } else {
1687                    Err("fail")
1688                }
1689            }).await;
1690
1691            assert_eq!(sum, Err("fail"));
1692            #
1693            # }) }
1694            ```
1695        "#]
1696        fn try_fold<B, F, T, E>(
1697            &mut self,
1698            init: T,
1699            f: F,
1700        ) -> impl Future<Output = Result<T, E>> + '_ [TryFoldFuture<'_, Self, F, T>]
1701        where
1702            Self: Unpin + Sized,
                // NOTE(review): the closure's accumulator argument is typed `B`, while `init` and
                // the success value are typed `T`. Confirm whether `B` and `T` are really meant
                // to be distinct type parameters.
1703            F: FnMut(B, Self::Item) -> Result<T, E>,
1704        {
                // Borrows the stream (it is not consumed) and moves the seed and the closure into
                // `TryFoldFuture::new`.
1705            TryFoldFuture::new(self, init, f)
1706        }
1707
1708        #[doc = r#"
1709            Applies a fallible function to each element in a stream, stopping at the first error and returning it.
1710
1711            # Examples
1712
1713            ```
1714            # fn main() { async_std::task::block_on(async {
1715            #
1716            use std::sync::mpsc::channel;
1717            use async_std::prelude::*;
1718            use async_std::stream;
1719
1720            let (tx, rx) = channel();
1721
1722            let mut s = stream::from_iter(vec![1u8, 2, 3]);
1723            let s = s.try_for_each(|v| {
1724                if v % 2 == 1 {
1725                    tx.clone().send(v).unwrap();
1726                    Ok(())
1727                } else {
1728                    Err("even")
1729                }
1730            });
1731
1732            let res = s.await;
1733            drop(tx);
1734            let values: Vec<_> = rx.iter().collect();
1735
1736            assert_eq!(values, vec![1]);
1737            assert_eq!(res, Err("even"));
1738            #
1739            # }) }
1740            ```
1741        "#]
1742        fn try_for_each<F, E>(
1743            &mut self,
1744            f: F,
1745        ) -> impl Future<Output = Result<(), E>> + '_ [TryForEachFuture<'_, Self, F>]
1746        where
1747            Self: Unpin + Sized,
                // The closure receives each item by value and reports success or an error `E`.
1748            F: FnMut(Self::Item) -> Result<(), E>,
1749        {
                // The advertised output is `Result<(), E>`, matching what the closure returns and
                // what the doc example asserts (`res == Err("even")`). The future borrows the
                // stream through `&mut self`, hence the anonymous `'_` lifetime.
1750            TryForEachFuture::new(self, f)
1751        }
1752
1753        #[doc = r#"
1754            'Zips up' two streams into a single stream of pairs.
1755
1756            `zip()` returns a new stream that will iterate over two other streams, returning a
1757            tuple where the first element comes from the first stream, and the second element
1758            comes from the second stream.
1759
1760            In other words, it zips two streams together, into a single one.
1761
1762            If either stream returns [`None`], [`poll_next`] from the zipped stream will return
1763            [`None`]. If the first stream returns [`None`], `zip` will short-circuit and
1764            `poll_next` will not be called on the second stream.
1765
1766            [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None
1767            [`poll_next`]: #tymethod.poll_next
1768
1769            ## Examples
1770
1771            ```
1772            # fn main() { async_std::task::block_on(async {
1773            #
1774            use async_std::prelude::*;
1775            use async_std::stream;
1776
1777            let l = stream::from_iter(vec![1u8, 2, 3]);
1778            let r = stream::from_iter(vec![4u8, 5, 6, 7]);
1779            let mut s = l.zip(r);
1780
1781            assert_eq!(s.next().await, Some((1, 4)));
1782            assert_eq!(s.next().await, Some((2, 5)));
1783            assert_eq!(s.next().await, Some((3, 6)));
1784            assert_eq!(s.next().await, None);
1785            #
1786            # }) }
1787            ```
1788        "#]
1789        #[inline]
1790        fn zip<U>(self, other: U) -> Zip<Self, U>
1791        where
1792            Self: Sized,
                // The second stream may yield any item type; no relation to `Self::Item` is
                // required by this signature.
1793            U: Stream,
1794        {
                // Moves both streams into a `Zip` adaptor; `self` is the first, `other` the second.
1795            Zip::new(self, other)
1796        }
1797
1798        #[doc = r#"
1799            Converts a stream of pairs into a pair of containers.
1800
1801            `unzip()` consumes an entire stream of pairs, producing two collections: one from the left elements of the pairs, and one from the right elements.
1802
1803            This function is, in some sense, the opposite of [`zip`].
1804
1805            [`zip`]: trait.Stream.html#method.zip
1806
1807            # Example
1808
1809            ```
1810            # fn main() { async_std::task::block_on(async {
1811            #
1812            use async_std::prelude::*;
1813            use async_std::stream;
1814
1815            let s  = stream::from_iter(vec![(1,2), (3,4)]);
1816
1817            let (left, right): (Vec<_>, Vec<_>) = s.unzip().await;
1818
1819            assert_eq!(left, [1, 3]);
1820            assert_eq!(right, [2, 4]);
1821            #
1822            # }) }
1823            ```
1824        "#]
1825        #[cfg(feature = "unstable")]
1826        #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
1827        fn unzip<A, B, FromA, FromB>(self) -> impl Future<Output = (FromA, FromB)> [UnzipFuture<Self, FromA, FromB>]
1828        where
            // Each output collection is bounded by `Default` and by `Extend` over its side of the
            // pair.
1829        FromA: Default + Extend<A>,
1830        FromB: Default + Extend<B>,
            // Only streams whose items are two-element tuples `(A, B)` can call this method.
1831        Self: Stream<Item = (A, B)> + Sized,
1832        {
                // Takes the stream by value; the two collection types are chosen by the caller
                // through the output type.
1833            UnzipFuture::new(self)
1834        }
1835
1836        #[doc = r#"
1837            Transforms a stream into a collection.
1838
1839            `collect()` can take anything streamable, and turn it into a relevant
1840            collection. This is one of the more powerful methods in the async
1841            standard library, used in a variety of contexts.
1842
1843            The most basic pattern in which `collect()` is used is to turn one
1844            collection into another. You take a collection, call [`into_stream`] on it,
1845            do a bunch of transformations, and then `collect()` at the end.
1846
1847            Because `collect()` is so general, it can cause problems with type
1848            inference. As such, `collect()` is one of the few times you'll see
1849            the syntax affectionately known as the 'turbofish': `::<>`. This
1850            helps the inference algorithm understand specifically which collection
1851            you're trying to collect into.
1852
1853            # Examples
1854
1855            ```
1856            # fn main() { async_std::task::block_on(async {
1857            #
1858            use async_std::prelude::*;
1859            use async_std::stream;
1860
1861            let s = stream::repeat(9u8).take(3);
1862            let buf: Vec<u8> = s.collect().await;
1863
1864            assert_eq!(buf, vec![9; 3]);
1865
1866            // You can also collect streams of Result values
1867            // into any collection that implements FromStream
1868            let s = stream::repeat(Ok(9)).take(3);
1869            // We are using Vec here, but other collections
1870            // are supported as well
1871            let buf: Result<Vec<u8>, ()> = s.collect().await;
1872
1873            assert_eq!(buf, Ok(vec![9; 3]));
1874
1875            // The stream will stop on the first Err and
1876            // return that instead
1877            let s = stream::repeat(Err(5)).take(3);
1878            let buf: Result<Vec<u8>, u8> = s.collect().await;
1879
1880            assert_eq!(buf, Err(5));
1881            #
1882            # }) }
1883            ```
1884
1885            [`into_stream`]: trait.IntoStream.html#tymethod.into_stream
1886        "#]
1887        #[cfg(feature = "unstable")]
1888        #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
1889        fn collect<'a, B>(
1890            self,
1891        ) -> impl Future<Output = B> + 'a [Pin<Box<dyn Future<Output = B> + 'a>>]
1892        where
                // The stream must be valid for `'a`, the lifetime attached to the returned future.
1893            Self: Sized + 'a,
1894            B: FromStream<Self::Item>,
1895        {
                // Delegates to the `FromStream` implementation of the requested collection `B`.
                // Per the bracketed signature the result is a pinned, boxed `dyn Future`.
1896            FromStream::from_stream(self)
1897        }
1898
1899        #[doc = r#"
1900            Combines multiple streams into a single stream of all their outputs.
1901
1902            Items are yielded as soon as they're received, and the stream continues yielding until
1903            both streams have been exhausted. The output ordering between streams is not guaranteed.
1904
1905            # Examples
1906
1907            ```
1908            # async_std::task::block_on(async {
1909            use async_std::prelude::*;
1910            use async_std::stream::{self, FromStream};
1911
1912            let a = stream::once(1u8);
1913            let b = stream::once(2u8);
1914            let c = stream::once(3u8);
1915
1916            let s = a.merge(b).merge(c);
1917            let mut lst = Vec::from_stream(s).await;
1918
1919            lst.sort_unstable();
1920            assert_eq!(&lst, &[1u8, 2u8, 3u8]);
1921            # });
1922            ```
1923        "#]
1924        #[cfg(feature = "unstable")]
1925        #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
1926        fn merge<U>(self, other: U) -> Merge<Self, U>
1927        where
1928            Self: Sized,
                // The other stream must yield exactly the same item type as this one.
1929            U: Stream<Item = Self::Item> + Sized,
1930        {
                // Moves both streams into a `Merge` adaptor.
1931            Merge::new(self, other)
1932        }
1933
1934        #[doc = r#"
1935            Lexicographically compares the elements of this `Stream` with those
1936            of another.
1937
1938            # Examples
1939
1940            ```
1941            # fn main() { async_std::task::block_on(async {
1942            #
1943            use async_std::prelude::*;
1944            use async_std::stream;
1945
1946            use std::cmp::Ordering;
1947
1948            let s1 = stream::from_iter(vec![1]);
1949            let s2 = stream::from_iter(vec![1, 2]);
1950            let s3 = stream::from_iter(vec![1, 2, 3]);
1951            let s4 = stream::from_iter(vec![1, 2, 4]);
1952            assert_eq!(s1.clone().partial_cmp(s1.clone()).await, Some(Ordering::Equal));
1953            assert_eq!(s1.clone().partial_cmp(s2.clone()).await, Some(Ordering::Less));
1954            assert_eq!(s2.clone().partial_cmp(s1.clone()).await, Some(Ordering::Greater));
1955            assert_eq!(s3.clone().partial_cmp(s4.clone()).await, Some(Ordering::Less));
1956            assert_eq!(s4.clone().partial_cmp(s3.clone()).await, Some(Ordering::Greater));
1957            #
1958            # }) }
1959            ```
1960        "#]
1961        fn partial_cmp<S>(
1962           self,
1963           other: S
1964        ) -> impl Future<Output = Option<Ordering>>  [PartialCmpFuture<Self, S>]
1965        where
1966            Self: Sized + Stream,
1967            S: Stream,
                // Items of this stream must be partially comparable against items of `other`; the
                // two item types need not be identical.
1968            <Self as Stream>::Item: PartialOrd<S::Item>,
1969        {
                // Takes both streams by value and moves them into `PartialCmpFuture::new`.
1970            PartialCmpFuture::new(self, other)
1971        }
1972
1973        #[doc = r#"
1974            Searches for an element in a Stream that satisfies a predicate, returning
1975            its index.
1976
1977            # Examples
1978
1979            ```
1980            # fn main() { async_std::task::block_on(async {
1981            #
1982            use async_std::prelude::*;
1983            use async_std::stream;
1984
1985            let s = stream::from_iter(vec![1usize, 2, 3]);
1986            let res = s.clone().position(|x| x == 1).await;
1987            assert_eq!(res, Some(0));
1988
1989            let res = s.clone().position(|x| x == 2).await;
1990            assert_eq!(res, Some(1));
1991
1992            let res = s.clone().position(|x| x == 3).await;
1993            assert_eq!(res, Some(2));
1994
1995            let res = s.clone().position(|x| x == 4).await;
1996            assert_eq!(res, None);
1997            #
1998            # }) }
1999            ```
2000        "#]
2001        fn position<P>(
2002           &mut self,
2003           predicate: P,
2004        ) -> impl Future<Output = Option<usize>> + '_ [PositionFuture<'_, Self, P>]
            // Unlike the comparison methods in this file, this one borrows the stream mutably
            // instead of consuming it; the `'_` on the return type ties the future to that borrow.
2005        where
2006            Self: Unpin + Sized,
                // The predicate may keep mutable state (`FnMut`) and receives each item by value.
2007            P: FnMut(Self::Item) -> bool,
2008        {
                // Wraps the borrowed stream and the predicate in a `PositionFuture`; the future's
                // output is `Option<usize>`.
2009            PositionFuture::new(self, predicate)
2010        }
2011
2012        #[doc = r#"
2013            Lexicographically compares the elements of this `Stream` with those
2014            of another using `Ord`.
2015
2016            # Examples
2017
2018            ```
2019            # fn main() { async_std::task::block_on(async {
2020            #
2021            use async_std::prelude::*;
2022            use async_std::stream;
2023            use std::cmp::Ordering;
2024
2025            let s1 = stream::from_iter(vec![1]);
2026            let s2 = stream::from_iter(vec![1, 2]);
2027            let s3 = stream::from_iter(vec![1, 2, 3]);
2028            let s4 = stream::from_iter(vec![1, 2, 4]);
2029
2030            assert_eq!(s1.clone().cmp(s1.clone()).await, Ordering::Equal);
2031            assert_eq!(s1.clone().cmp(s2.clone()).await, Ordering::Less);
2032            assert_eq!(s2.clone().cmp(s1.clone()).await, Ordering::Greater);
2033            assert_eq!(s3.clone().cmp(s4.clone()).await, Ordering::Less);
2034            assert_eq!(s4.clone().cmp(s3.clone()).await, Ordering::Greater);
2035            #
2036            # }) }
2037            ```
2038        "#]
2039        fn cmp<S>(
2040           self,
2041           other: S
2042        ) -> impl Future<Output = Ordering> [CmpFuture<Self, S>]
2043        where
2044            Self: Sized + Stream,
2045            S: Stream,
                // Only the items of `self` are bounded here; nothing in this signature relates
                // `S::Item` to them.
                // NOTE(review): presumably `CmpFuture` itself constrains `S::Item` - confirm.
2046            <Self as Stream>::Item: Ord
2047        {
                // Consumes both streams and returns a `CmpFuture` whose output is a plain
                // `Ordering` (not wrapped in `Option`).
2048            CmpFuture::new(self, other)
2049        }
2050
2051        #[doc = r#"
2052            Counts the number of elements in the stream.
2053
2054            # Examples
2055
2056            ```
2057            # fn main() { async_std::task::block_on(async {
2058            #
2059            use async_std::prelude::*;
2060            use async_std::stream;
2061
2062            let s1 = stream::from_iter(vec![0]);
2063            let s2 = stream::from_iter(vec![1, 2, 3]);
2064
2065            assert_eq!(s1.count().await, 1);
2066            assert_eq!(s2.count().await, 3);
2067            #
2068            # }) }
2069            ```
2070        "#]
2071        #[cfg(feature = "unstable")]
            // Compiled only when the `unstable` feature is enabled; when the `docs` feature is on,
            // the item is additionally tagged with `doc(cfg(unstable))`.
2072        #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
2073        fn count(self) -> impl Future<Output = usize> [CountFuture<Self>]
2074        where
2075            Self: Sized,
2076        {
                // Takes the stream by value and wraps it in a `CountFuture`, whose output is a
                // `usize`.
2077            CountFuture::new(self)
2078        }
2079
2080        #[doc = r#"
2081            Determines if the elements of this `Stream` are lexicographically
2082            not equal to those of another.
2083
2084            # Examples
2085
2086            ```
2087            # fn main() { async_std::task::block_on(async {
2088            #
2089            use async_std::prelude::*;
2090            use async_std::stream;
2091
2092            let single     = stream::from_iter(vec![1usize]);
2093            let single_ne  = stream::from_iter(vec![10usize]);
2094            let multi      = stream::from_iter(vec![1usize,2]);
2095            let multi_ne   = stream::from_iter(vec![1usize,5]);
2096
2097            assert_eq!(single.clone().ne(single.clone()).await, false);
2098            assert_eq!(single_ne.clone().ne(single.clone()).await, true);
2099            assert_eq!(multi.clone().ne(single_ne.clone()).await, true);
2100            assert_eq!(multi_ne.clone().ne(multi.clone()).await, true);
2101            #
2102            # }) }
2103            ```
2104        "#]
2105        fn ne<S>(
2106           self,
2107           other: S
2108        ) -> impl Future<Output = bool> [NeFuture<Self, S>]
2109        where
                // Spelled `Sized + Stream` to match the sibling comparison methods (`eq`, `ge`,
                // `gt`, `le`, `lt`). The extra bound adds no new requirement: the item bound
                // below already projects `<Self as Stream>::Item`.
2110            Self: Sized + Stream,
2111            S: Sized + Stream,
                // Items of `self` are the left-hand side of `PartialEq`; items of `other` are the
                // right-hand side, so the two item types may differ.
2112            <Self as Stream>::Item: PartialEq<S::Item>,
2113        {
                // Consumes both streams and returns a `NeFuture` whose output is a `bool`.
2114            NeFuture::new(self, other)
2115        }
2116
2117        #[doc = r#"
2118            Determines if the elements of this `Stream` are lexicographically
2119            greater than or equal to those of another.
2120
2121            # Examples
2122
2123            ```
2124            # fn main() { async_std::task::block_on(async {
2125            #
2126            use async_std::prelude::*;
2127            use async_std::stream;
2128
2129            let single    = stream::from_iter(vec![1]);
2130            let single_gt = stream::from_iter(vec![10]);
2131            let multi     = stream::from_iter(vec![1,2]);
2132            let multi_gt  = stream::from_iter(vec![1,5]);
2133
2134            assert_eq!(single.clone().ge(single.clone()).await, true);
2135            assert_eq!(single_gt.clone().ge(single.clone()).await, true);
2136            assert_eq!(multi.clone().ge(single_gt.clone()).await, false);
2137            assert_eq!(multi_gt.clone().ge(multi.clone()).await, true);
2138            #
2139            # }) }
2140            ```
2141        "#]
2142        fn ge<S>(
2143           self,
2144           other: S
2145        ) -> impl Future<Output = bool> [GeFuture<Self, S>]
2146        where
2147            Self: Sized + Stream,
2148            S: Stream,
                // `PartialOrd` is required from the items of `self` against the items of `other`.
2149            <Self as Stream>::Item: PartialOrd<S::Item>,
2150        {
                // Consumes both streams; the comparison itself is left to the returned `GeFuture`.
2151            GeFuture::new(self, other)
2152        }
2153
2154        #[doc = r#"
2155            Determines if the elements of this `Stream` are lexicographically
2156            equal to those of another.
2157
2158            # Examples
2159
2160            ```
2161            # fn main() { async_std::task::block_on(async {
2162            #
2163            use async_std::prelude::*;
2164            use async_std::stream;
2165
2166            let single     = stream::from_iter(vec![1]);
2167            let single_eq  = stream::from_iter(vec![10]);
2168            let multi      = stream::from_iter(vec![1,2]);
2169            let multi_eq   = stream::from_iter(vec![1,5]);
2170
2171            assert_eq!(single.clone().eq(single.clone()).await, true);
2172            assert_eq!(single_eq.clone().eq(single.clone()).await, false);
2173            assert_eq!(multi.clone().eq(single_eq.clone()).await, false);
2174            assert_eq!(multi_eq.clone().eq(multi.clone()).await, false);
2175            #
2176            # }) }
2177            ```
2178        "#]
2179        fn eq<S>(
2180           self,
2181           other: S
2182        ) -> impl Future<Output = bool> [EqFuture<Self, S>]
2183        where
2184            Self: Sized + Stream,
                // `Sized` is written out for `S` here, whereas the ordering methods in this file
                // (`ge`, `gt`, `le`, `lt`) write only `S: Stream`.
2185            S: Sized + Stream,
                // Equality is asked of the items of `self` against the items of `other`.
2186            <Self as Stream>::Item: PartialEq<S::Item>,
2187        {
                // Consumes both streams and returns an `EqFuture` whose output is a `bool`.
2188            EqFuture::new(self, other)
2189        }
2190
2191        #[doc = r#"
2192            Determines if the elements of this `Stream` are lexicographically
2193            greater than those of another.
2194
2195            # Examples
2196
2197            ```
2198            # fn main() { async_std::task::block_on(async {
2199            #
2200            use async_std::prelude::*;
2201            use async_std::stream;
2202
2203            let single = stream::from_iter(vec![1]);
2204            let single_gt = stream::from_iter(vec![10]);
2205            let multi = stream::from_iter(vec![1,2]);
2206            let multi_gt = stream::from_iter(vec![1,5]);
2207
2208            assert_eq!(single.clone().gt(single.clone()).await, false);
2209            assert_eq!(single_gt.clone().gt(single.clone()).await, true);
2210            assert_eq!(multi.clone().gt(single_gt.clone()).await, false);
2211            assert_eq!(multi_gt.clone().gt(multi.clone()).await, true);
2212            #
2213            # }) }
2214            ```
2215        "#]
2216        fn gt<S>(
2217           self,
2218           other: S
2219        ) -> impl Future<Output = bool> [GtFuture<Self, S>]
2220        where
2221            Self: Sized + Stream,
2222            S: Stream,
                // `PartialOrd` is required from the items of `self` against the items of `other`.
2223            <Self as Stream>::Item: PartialOrd<S::Item>,
2224        {
                // Consumes both streams; the comparison itself is left to the returned `GtFuture`.
2225            GtFuture::new(self, other)
2226        }
2227
2228        #[doc = r#"
2229            Determines if the elements of this `Stream` are lexicographically
2230            less than or equal to those of another.
2231
2232            # Examples
2233
2234            ```
2235            # fn main() { async_std::task::block_on(async {
2236            #
2237            use async_std::prelude::*;
2238            use async_std::stream;
2239
2240            let single = stream::from_iter(vec![1]);
2241            let single_gt = stream::from_iter(vec![10]);
2242            let multi = stream::from_iter(vec![1,2]);
2243            let multi_gt = stream::from_iter(vec![1,5]);
2244
2245            assert_eq!(single.clone().le(single.clone()).await, true);
2246            assert_eq!(single.clone().le(single_gt.clone()).await, true);
2247            assert_eq!(multi.clone().le(single_gt.clone()).await, true);
2248            assert_eq!(multi_gt.clone().le(multi.clone()).await, false);
2249            #
2250            # }) }
2251            ```
2252        "#]
2253        fn le<S>(
2254           self,
2255           other: S
2256        ) -> impl Future<Output = bool> [LeFuture<Self, S>]
2257        where
2258            Self: Sized + Stream,
2259            S: Stream,
                // `PartialOrd` is required from the items of `self` against the items of `other`.
2260            <Self as Stream>::Item: PartialOrd<S::Item>,
2261        {
                // Consumes both streams; the comparison itself is left to the returned `LeFuture`.
2262            LeFuture::new(self, other)
2263        }
2264
2265        #[doc = r#"
2266            Determines if the elements of this `Stream` are lexicographically
2267            less than those of another.
2268
2269            # Examples
2270
2271            ```
2272            # fn main() { async_std::task::block_on(async {
2273            #
2274            use async_std::prelude::*;
2275            use async_std::stream;
2276
2277            let single = stream::from_iter(vec![1]);
2278            let single_gt = stream::from_iter(vec![10]);
2279            let multi = stream::from_iter(vec![1,2]);
2280            let multi_gt = stream::from_iter(vec![1,5]);
2281
2282            assert_eq!(single.clone().lt(single.clone()).await, false);
2283            assert_eq!(single.clone().lt(single_gt.clone()).await, true);
2284            assert_eq!(multi.clone().lt(single_gt.clone()).await, true);
2285            assert_eq!(multi_gt.clone().lt(multi.clone()).await, false);
2286            #
2287            # }) }
2288            ```
2289        "#]
2290        fn lt<S>(
2291           self,
2292           other: S
2293        ) -> impl Future<Output = bool> [LtFuture<Self, S>]
2294        where
2295            Self: Sized + Stream,
2296            S: Stream,
                // `PartialOrd` is required from the items of `self` against the items of `other`.
2297            <Self as Stream>::Item: PartialOrd<S::Item>,
2298        {
                // Consumes both streams; the comparison itself is left to the returned `LtFuture`.
2299            LtFuture::new(self, other)
2300        }
2301
2302        #[doc = r#"
2303            Sums the elements of a stream.
2304
2305            Takes each element, adds them together, and returns the result.
2306
2307            An empty stream returns the zero value of the type.
2308
2309            # Panics
2310
2311            When calling `sum()` and a primitive integer type is being returned, this
2312            method will panic if the computation overflows and debug assertions are
2313            enabled.
2314
2315            # Examples
2316
2317            Basic usage:
2318
2319            ```
2320            # fn main() { async_std::task::block_on(async {
2321            #
2322            use async_std::prelude::*;
2323            use async_std::stream;
2324
2325            let s = stream::from_iter(vec![0u8, 1, 2, 3, 4]);
2326            let sum: u8 = s.sum().await;
2327
2328            assert_eq!(sum, 10);
2329            #
2330            # }) }
2331            ```
2332        "#]
2333        #[cfg(feature = "unstable")]
            // Compiled only when the `unstable` feature is enabled; when the `docs` feature is on,
            // the item is additionally tagged with `doc(cfg(unstable))`.
2334        #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
2335        fn sum<'a, S>(
2336            self,
2337        ) -> impl Future<Output = S> + 'a [Pin<Box<dyn Future<Output = S> + 'a>>]
            // The bracketed type is a pinned, boxed trait object rather than a named future
            // struct; both it and `Self` are bounded by the caller-chosen lifetime `'a`.
2338        where
2339            Self: Sized + Stream<Item = S> + 'a,
                // Because `Item = S` above, this bound amounts to `S: Sum<S>`.
2340            S: Sum<Self::Item>,
2341        {
                // Hands the whole stream to the `Sum` trait's `sum` function; no addition happens
                // in this method itself.
2342            Sum::sum(self)
2343        }
2344
2345        #[doc = r#"
2346            Multiplies all elements of the stream.
2347
2348            An empty stream returns the one value of the type.
2349
2350            # Panics
2351
2352            When calling `product()` and a primitive integer type is being returned,
2353            this method will panic if the computation overflows and debug assertions are
2354            enabled.
2355
2356            # Examples
2357
2358            This example calculates the factorial of n (i.e. the product of the numbers from 1 to
2359            n, inclusive):
2360
2361            ```
2362            # fn main() { async_std::task::block_on(async {
2363            #
2364            async fn factorial(n: u32) -> u32 {
2365                use async_std::prelude::*;
2366                use async_std::stream;
2367
2368                let s = stream::from_iter(1..=n);
2369                s.product().await
2370            }
2371
2372            assert_eq!(factorial(0).await, 1);
2373            assert_eq!(factorial(1).await, 1);
2374            assert_eq!(factorial(5).await, 120);
2375            #
2376            # }) }
2377            ```
2378        "#]
2379        #[cfg(feature = "unstable")]
            // Compiled only when the `unstable` feature is enabled; when the `docs` feature is on,
            // the item is additionally tagged with `doc(cfg(unstable))`.
2380        #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
2381        fn product<'a, P>(
2382            self,
2383        ) -> impl Future<Output = P> + 'a [Pin<Box<dyn Future<Output = P> + 'a>>]
            // The bracketed type is a pinned, boxed trait object rather than a named future
            // struct; both it and `Self` are bounded by the caller-chosen lifetime `'a`.
2384        where
2385            Self: Sized + Stream<Item = P> + 'a,
                // `Product` is written without an explicit type argument here, whereas `sum`
                // spells out `Sum<Self::Item>`.
2386            P: Product,
2387        {
                // Hands the whole stream to the `Product` trait's `product` function; no
                // multiplication happens in this method itself.
2388            Product::product(self)
2389        }
2390    }
2391
2392    impl<S: Stream + Unpin + ?Sized> Stream for Box<S> {
            // `?Sized` lets `S` be an unsized type; the boxed stream's item type is passed
            // through unchanged.
2393        type Item = S::Item;
2394
            // Placeholder body: it never reads `cx` and panics via `unreachable!` if executed.
            // Per its own message, this impl exists only for the rendered documentation.
2395        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2396            unreachable!("this impl only appears in the rendered docs")
2397        }
2398    }
2399
2400    impl<S: Stream + Unpin + ?Sized> Stream for &mut S {
            // A mutable reference to a stream yields the same item type as the stream it
            // points to.
2401        type Item = S::Item;
2402
            // Placeholder body: it never reads `cx` and panics via `unreachable!` if executed.
            // Per its own message, this impl exists only for the rendered documentation.
2403        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2404            unreachable!("this impl only appears in the rendered docs")
2405        }
2406    }
2407
2408    impl<P> Stream for Pin<P>
2409    where
            // `P` is a mutable-dereferenceable pointer type whose target is itself a stream.
2410        P: DerefMut + Unpin,
2411        <P as Deref>::Target: Stream,
2412    {
            // The item type is taken from the pointer's target stream.
2413        type Item = <<P as Deref>::Target as Stream>::Item;
2414
            // Placeholder body: it never reads `cx` and panics via `unreachable!` if executed.
            // Per its own message, this impl exists only for the rendered documentation.
2415        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2416            unreachable!("this impl only appears in the rendered docs")
2417        }
2418    }
2419
2420    impl<S: Stream> Stream for std::panic::AssertUnwindSafe<S> {
            // The wrapper yields the same item type as the stream it wraps.
2421        type Item = S::Item;
2422
            // Placeholder body: it never reads `cx` and panics via `unreachable!` if executed.
            // Per its own message, this impl exists only for the rendered documentation.
2423        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2424            unreachable!("this impl only appears in the rendered docs")
2425        }
2426    }
2427}