Skip to main content

futures_util/stream/try_stream/
mod.rs

1//! Streams
2//!
//! This module contains a number of functions for working with `Stream`s
//! that return `Result`s, allowing for short-circuiting computations.
5
6#[cfg(feature = "compat")]
7use crate::compat::Compat;
8use crate::fns::{
9    inspect_err_fn, inspect_ok_fn, into_fn, map_err_fn, map_ok_fn, InspectErrFn, InspectOkFn,
10    IntoFn, MapErrFn, MapOkFn,
11};
12use crate::future::assert_future;
13use crate::stream::assert_stream;
14use crate::stream::{Inspect, Map};
15#[cfg(feature = "alloc")]
16use alloc::vec::Vec;
17use core::pin::Pin;
18
19use futures_core::{
20    future::{Future, TryFuture},
21    stream::TryStream,
22    task::{Context, Poll},
23};
24
25mod and_then;
26#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
27pub use self::and_then::AndThen;
28
29delegate_all!(
30    /// Stream for the [`err_into`](super::TryStreamExt::err_into) method.
31    ErrInto<St, E>(
32        MapErr<St, IntoFn<E>>
33    ): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St| MapErr::new(x, into_fn())]
34);
35
36delegate_all!(
37    /// Stream for the [`inspect_ok`](super::TryStreamExt::inspect_ok) method.
38    InspectOk<St, F>(
39        Inspect<IntoStream<St>, InspectOkFn<F>>
40    ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Inspect::new(IntoStream::new(x), inspect_ok_fn(f))]
41);
42
43delegate_all!(
44    /// Stream for the [`inspect_err`](super::TryStreamExt::inspect_err) method.
45    InspectErr<St, F>(
46        Inspect<IntoStream<St>, InspectErrFn<F>>
47    ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Inspect::new(IntoStream::new(x), inspect_err_fn(f))]
48);
49
50mod into_stream;
51#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
52pub use self::into_stream::IntoStream;
53
54delegate_all!(
55    /// Stream for the [`map_ok`](super::TryStreamExt::map_ok) method.
56    MapOk<St, F>(
57        Map<IntoStream<St>, MapOkFn<F>>
58    ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Map::new(IntoStream::new(x), map_ok_fn(f))]
59);
60
61delegate_all!(
62    /// Stream for the [`map_err`](super::TryStreamExt::map_err) method.
63    MapErr<St, F>(
64        Map<IntoStream<St>, MapErrFn<F>>
65    ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Map::new(IntoStream::new(x), map_err_fn(f))]
66);
67
68mod or_else;
69#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
70pub use self::or_else::OrElse;
71
72mod try_next;
73#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
74pub use self::try_next::TryNext;
75
76mod try_for_each;
77#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
78pub use self::try_for_each::TryForEach;
79
80mod try_filter;
81#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
82pub use self::try_filter::TryFilter;
83
84mod try_filter_map;
85#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
86pub use self::try_filter_map::TryFilterMap;
87
88mod try_flatten;
89#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
90pub use self::try_flatten::TryFlatten;
91
92#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
93#[cfg(feature = "alloc")]
94mod try_flatten_unordered;
95#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
96#[cfg(feature = "alloc")]
97#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
98pub use self::try_flatten_unordered::TryFlattenUnordered;
99
100mod try_collect;
101#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
102pub use self::try_collect::TryCollect;
103
104mod try_concat;
105#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
106pub use self::try_concat::TryConcat;
107
108#[cfg(feature = "alloc")]
109mod try_chunks;
110#[cfg(feature = "alloc")]
111#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
112pub use self::try_chunks::{TryChunks, TryChunksError};
113
114#[cfg(feature = "alloc")]
115mod try_ready_chunks;
116#[cfg(feature = "alloc")]
117#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
118pub use self::try_ready_chunks::{TryReadyChunks, TryReadyChunksError};
119
120mod try_fold;
121#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
122pub use self::try_fold::TryFold;
123
124mod try_unfold;
125#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
126pub use self::try_unfold::{try_unfold, TryUnfold};
127
128mod try_skip_while;
129#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
130pub use self::try_skip_while::TrySkipWhile;
131
132mod try_take_while;
133#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
134pub use self::try_take_while::TryTakeWhile;
135
136#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
137#[cfg(feature = "alloc")]
138mod try_buffer_unordered;
139#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
140#[cfg(feature = "alloc")]
141#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
142pub use self::try_buffer_unordered::TryBufferUnordered;
143
144#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
145#[cfg(feature = "alloc")]
146mod try_buffered;
147#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
148#[cfg(feature = "alloc")]
149#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
150pub use self::try_buffered::TryBuffered;
151
152#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
153#[cfg(feature = "alloc")]
154mod try_for_each_concurrent;
155#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
156#[cfg(feature = "alloc")]
157#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
158pub use self::try_for_each_concurrent::TryForEachConcurrent;
159
160#[cfg(feature = "io")]
161#[cfg(feature = "std")]
162mod into_async_read;
163#[cfg(feature = "io")]
164#[cfg_attr(docsrs, doc(cfg(feature = "io")))]
165#[cfg(feature = "std")]
166#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
167pub use self::into_async_read::IntoAsyncRead;
168
169mod try_all;
170#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
171pub use self::try_all::TryAll;
172
173mod try_any;
174#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
175pub use self::try_any::TryAny;
176
177impl<S: ?Sized + TryStream> TryStreamExt for S {}
178
179/// Adapters specific to `Result`-returning streams
180pub trait TryStreamExt: TryStream {
181    /// Wraps the current stream in a new stream which converts the error type
182    /// into the one provided.
183    ///
184    /// # Examples
185    ///
186    /// ```
187    /// # futures::executor::block_on(async {
188    /// use futures::stream::{self, TryStreamExt};
189    ///
190    /// let mut stream =
191    ///     stream::iter(vec![Ok(()), Err(5i32)])
192    ///         .err_into::<i64>();
193    ///
194    /// assert_eq!(stream.try_next().await, Ok(Some(())));
195    /// assert_eq!(stream.try_next().await, Err(5i64));
196    /// # })
197    /// ```
198    fn err_into<E>(self) -> ErrInto<Self, E>
199    where
200        Self: Sized,
201        Self::Error: Into<E>,
202    {
203        assert_stream::<Result<Self::Ok, E>, _>(ErrInto::new(self))
204    }
205
206    /// Wraps the current stream in a new stream which maps the success value
207    /// using the provided closure.
208    ///
209    /// # Examples
210    ///
211    /// ```
212    /// # futures::executor::block_on(async {
213    /// use futures::stream::{self, TryStreamExt};
214    ///
215    /// let mut stream =
216    ///     stream::iter(vec![Ok(5), Err(0)])
217    ///         .map_ok(|x| x + 2);
218    ///
219    /// assert_eq!(stream.try_next().await, Ok(Some(7)));
220    /// assert_eq!(stream.try_next().await, Err(0));
221    /// # })
222    /// ```
223    fn map_ok<T, F>(self, f: F) -> MapOk<Self, F>
224    where
225        Self: Sized,
226        F: FnMut(Self::Ok) -> T,
227    {
228        assert_stream::<Result<T, Self::Error>, _>(MapOk::new(self, f))
229    }
230
231    /// Wraps the current stream in a new stream which maps the error value
232    /// using the provided closure.
233    ///
234    /// # Examples
235    ///
236    /// ```
237    /// # futures::executor::block_on(async {
238    /// use futures::stream::{self, TryStreamExt};
239    ///
240    /// let mut stream =
241    ///     stream::iter(vec![Ok(5), Err(0)])
242    ///         .map_err(|x| x + 2);
243    ///
244    /// assert_eq!(stream.try_next().await, Ok(Some(5)));
245    /// assert_eq!(stream.try_next().await, Err(2));
246    /// # })
247    /// ```
248    fn map_err<E, F>(self, f: F) -> MapErr<Self, F>
249    where
250        Self: Sized,
251        F: FnMut(Self::Error) -> E,
252    {
253        assert_stream::<Result<Self::Ok, E>, _>(MapErr::new(self, f))
254    }
255
256    /// Chain on a computation for when a value is ready, passing the successful
257    /// results to the provided closure `f`.
258    ///
259    /// This function can be used to run a unit of work when the next successful
260    /// value on a stream is ready. The closure provided will be yielded a value
261    /// when ready, and the returned future will then be run to completion to
262    /// produce the next value on this stream.
263    ///
264    /// Any errors produced by this stream will not be passed to the closure,
265    /// and will be passed through.
266    ///
267    /// The returned value of the closure must implement the `TryFuture` trait
268    /// and can represent some more work to be done before the composed stream
269    /// is finished.
270    ///
271    /// Note that this function consumes the receiving stream and returns a
272    /// wrapped version of it.
273    ///
274    /// To process the entire stream and return a single future representing
275    /// success or error, use `try_for_each` instead.
276    ///
277    /// # Examples
278    ///
279    /// ```
280    /// use futures::channel::mpsc;
281    /// use futures::future;
282    /// use futures::stream::TryStreamExt;
283    ///
284    /// let (_tx, rx) = mpsc::channel::<Result<i32, ()>>(1);
285    ///
286    /// let rx = rx.and_then(|result| {
287    ///     future::ok(if result % 2 == 0 {
288    ///         Some(result)
289    ///     } else {
290    ///         None
291    ///     })
292    /// });
293    /// ```
294    fn and_then<Fut, F>(self, f: F) -> AndThen<Self, Fut, F>
295    where
296        F: FnMut(Self::Ok) -> Fut,
297        Fut: TryFuture<Error = Self::Error>,
298        Self: Sized,
299    {
300        assert_stream::<Result<Fut::Ok, Fut::Error>, _>(AndThen::new(self, f))
301    }
302
303    /// Chain on a computation for when an error happens, passing the
304    /// erroneous result to the provided closure `f`.
305    ///
306    /// This function can be used to run a unit of work and attempt to recover from
307    /// an error if one happens. The closure provided will be yielded an error
308    /// when one appears, and the returned future will then be run to completion
309    /// to produce the next value on this stream.
310    ///
311    /// Any successful values produced by this stream will not be passed to the
312    /// closure, and will be passed through.
313    ///
314    /// The returned value of the closure must implement the [`TryFuture`](futures_core::future::TryFuture) trait
315    /// and can represent some more work to be done before the composed stream
316    /// is finished.
317    ///
318    /// Note that this function consumes the receiving stream and returns a
319    /// wrapped version of it.
320    fn or_else<Fut, F>(self, f: F) -> OrElse<Self, Fut, F>
321    where
322        F: FnMut(Self::Error) -> Fut,
323        Fut: TryFuture<Ok = Self::Ok>,
324        Self: Sized,
325    {
326        assert_stream::<Result<Self::Ok, Fut::Error>, _>(OrElse::new(self, f))
327    }
328
329    /// Do something with the success value of this stream, afterwards passing
330    /// it on.
331    ///
332    /// This is similar to the `StreamExt::inspect` method where it allows
333    /// easily inspecting the success value as it passes through the stream, for
334    /// example to debug what's going on.
335    fn inspect_ok<F>(self, f: F) -> InspectOk<Self, F>
336    where
337        F: FnMut(&Self::Ok),
338        Self: Sized,
339    {
340        assert_stream::<Result<Self::Ok, Self::Error>, _>(InspectOk::new(self, f))
341    }
342
343    /// Do something with the error value of this stream, afterwards passing it on.
344    ///
345    /// This is similar to the `StreamExt::inspect` method where it allows
346    /// easily inspecting the error value as it passes through the stream, for
347    /// example to debug what's going on.
348    fn inspect_err<F>(self, f: F) -> InspectErr<Self, F>
349    where
350        F: FnMut(&Self::Error),
351        Self: Sized,
352    {
353        assert_stream::<Result<Self::Ok, Self::Error>, _>(InspectErr::new(self, f))
354    }
355
356    /// Wraps a [`TryStream`] into a type that implements
357    /// [`Stream`](futures_core::stream::Stream)
358    ///
359    /// [`TryStream`]s currently do not implement the
360    /// [`Stream`](futures_core::stream::Stream) trait because of limitations
361    /// of the compiler.
362    ///
363    /// # Examples
364    ///
365    /// ```
366    /// use futures::stream::{Stream, TryStream, TryStreamExt};
367    ///
368    /// # type T = i32;
369    /// # type E = ();
370    /// fn make_try_stream() -> impl TryStream<Ok = T, Error = E> { // ... }
371    /// # futures::stream::empty()
372    /// # }
373    /// fn take_stream(stream: impl Stream<Item = Result<T, E>>) { /* ... */ }
374    ///
375    /// take_stream(make_try_stream().into_stream());
376    /// ```
377    fn into_stream(self) -> IntoStream<Self>
378    where
379        Self: Sized,
380    {
381        assert_stream::<Result<Self::Ok, Self::Error>, _>(IntoStream::new(self))
382    }
383
384    /// Creates a future that attempts to resolve the next item in the stream.
385    /// If an error is encountered before the next item, the error is returned
386    /// instead.
387    ///
    /// This is similar to the `StreamExt::next` combinator, but returns a
    /// `Result<Option<T>, E>` rather than an `Option<Result<T, E>>`, making
    /// for easy use with the `?` operator.
391    ///
392    /// # Examples
393    ///
394    /// ```
395    /// # futures::executor::block_on(async {
396    /// use futures::stream::{self, TryStreamExt};
397    ///
398    /// let mut stream = stream::iter(vec![Ok(()), Err(())]);
399    ///
400    /// assert_eq!(stream.try_next().await, Ok(Some(())));
401    /// assert_eq!(stream.try_next().await, Err(()));
402    /// # })
403    /// ```
404    fn try_next(&mut self) -> TryNext<'_, Self>
405    where
406        Self: Unpin,
407    {
408        assert_future::<Result<Option<Self::Ok>, Self::Error>, _>(TryNext::new(self))
409    }
410
411    /// Attempts to run this stream to completion, executing the provided
412    /// asynchronous closure for each element on the stream.
413    ///
414    /// The provided closure will be called for each item this stream produces,
415    /// yielding a future. That future will then be executed to completion
416    /// before moving on to the next item.
417    ///
418    /// The returned value is a [`Future`](futures_core::future::Future) where the
419    /// [`Output`](futures_core::future::Future::Output) type is
420    /// `Result<(), Self::Error>`. If any of the intermediate
421    /// futures or the stream returns an error, this future will return
422    /// immediately with an error.
423    ///
424    /// # Examples
425    ///
426    /// ```
427    /// # futures::executor::block_on(async {
428    /// use futures::future;
429    /// use futures::stream::{self, TryStreamExt};
430    ///
431    /// let mut x = 0i32;
432    ///
433    /// {
434    ///     let fut = stream::repeat(Ok(1)).try_for_each(|item| {
435    ///         x += item;
436    ///         future::ready(if x == 3 { Err(()) } else { Ok(()) })
437    ///     });
438    ///     assert_eq!(fut.await, Err(()));
439    /// }
440    ///
441    /// assert_eq!(x, 3);
442    /// # })
443    /// ```
444    fn try_for_each<Fut, F>(self, f: F) -> TryForEach<Self, Fut, F>
445    where
446        F: FnMut(Self::Ok) -> Fut,
447        Fut: TryFuture<Ok = (), Error = Self::Error>,
448        Self: Sized,
449    {
450        assert_future::<Result<(), Self::Error>, _>(TryForEach::new(self, f))
451    }
452
453    /// Skip elements on this stream while the provided asynchronous predicate
454    /// resolves to `true`.
455    ///
456    /// This function is similar to
457    /// [`StreamExt::skip_while`](crate::stream::StreamExt::skip_while) but exits
458    /// early if an error occurs.
459    ///
460    /// # Examples
461    ///
462    /// ```
463    /// # futures::executor::block_on(async {
464    /// use futures::future;
465    /// use futures::stream::{self, TryStreamExt};
466    ///
467    /// let stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(3), Ok(2)]);
468    /// let stream = stream.try_skip_while(|x| future::ready(Ok(*x < 3)));
469    ///
470    /// let output: Result<Vec<i32>, i32> = stream.try_collect().await;
471    /// assert_eq!(output, Ok(vec![3, 2]));
472    /// # })
473    /// ```
474    fn try_skip_while<Fut, F>(self, f: F) -> TrySkipWhile<Self, Fut, F>
475    where
476        F: FnMut(&Self::Ok) -> Fut,
477        Fut: TryFuture<Ok = bool, Error = Self::Error>,
478        Self: Sized,
479    {
480        assert_stream::<Result<Self::Ok, Self::Error>, _>(TrySkipWhile::new(self, f))
481    }
482
483    /// Take elements on this stream while the provided asynchronous predicate
484    /// resolves to `true`.
485    ///
486    /// This function is similar to
487    /// [`StreamExt::take_while`](crate::stream::StreamExt::take_while) but exits
488    /// early if an error occurs.
489    ///
490    /// # Examples
491    ///
492    /// ```
493    /// # futures::executor::block_on(async {
494    /// use futures::future;
495    /// use futures::stream::{self, TryStreamExt};
496    ///
497    /// let stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(2), Ok(3), Ok(2)]);
498    /// let stream = stream.try_take_while(|x| future::ready(Ok(*x < 3)));
499    ///
500    /// let output: Result<Vec<i32>, i32> = stream.try_collect().await;
501    /// assert_eq!(output, Ok(vec![1, 2]));
502    /// # })
503    /// ```
504    fn try_take_while<Fut, F>(self, f: F) -> TryTakeWhile<Self, Fut, F>
505    where
506        F: FnMut(&Self::Ok) -> Fut,
507        Fut: TryFuture<Ok = bool, Error = Self::Error>,
508        Self: Sized,
509    {
510        assert_stream::<Result<Self::Ok, Self::Error>, _>(TryTakeWhile::new(self, f))
511    }
512
513    /// Attempts to run this stream to completion, executing the provided asynchronous
514    /// closure for each element on the stream concurrently as elements become
515    /// available, exiting as soon as an error occurs.
516    ///
    /// This is similar to
    /// [`StreamExt::for_each_concurrent`](crate::stream::StreamExt::for_each_concurrent),
    /// but will resolve to an error immediately if the underlying stream or the provided
    /// closure returns an error.
521    ///
522    /// This method is only available when the `std` or `alloc` feature of this
523    /// library is activated, and it is activated by default.
524    ///
525    /// # Examples
526    ///
527    /// ```
528    /// # futures::executor::block_on(async {
529    /// use futures::channel::oneshot;
530    /// use futures::stream::{self, StreamExt, TryStreamExt};
531    ///
532    /// let (tx1, rx1) = oneshot::channel();
533    /// let (tx2, rx2) = oneshot::channel();
534    /// let (_tx3, rx3) = oneshot::channel();
535    ///
536    /// let stream = stream::iter(vec![rx1, rx2, rx3]);
537    /// let fut = stream.map(Ok).try_for_each_concurrent(
538    ///     /* limit */ 2,
539    ///     |rx| async move {
540    ///         let res: Result<(), oneshot::Canceled> = rx.await;
541    ///         res
542    ///     }
543    /// );
544    ///
545    /// tx1.send(()).unwrap();
546    /// // Drop the second sender so that `rx2` resolves to `Canceled`.
547    /// drop(tx2);
548    ///
549    /// // The final result is an error because the second future
550    /// // resulted in an error.
551    /// assert_eq!(Err(oneshot::Canceled), fut.await);
552    /// # })
553    /// ```
554    #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
555    #[cfg(feature = "alloc")]
556    fn try_for_each_concurrent<Fut, F>(
557        self,
558        limit: impl Into<Option<usize>>,
559        f: F,
560    ) -> TryForEachConcurrent<Self, Fut, F>
561    where
562        F: FnMut(Self::Ok) -> Fut,
563        Fut: Future<Output = Result<(), Self::Error>>,
564        Self: Sized,
565    {
566        assert_future::<Result<(), Self::Error>, _>(TryForEachConcurrent::new(
567            self,
568            limit.into(),
569            f,
570        ))
571    }
572
573    /// Attempt to transform a stream into a collection,
574    /// returning a future representing the result of that computation.
575    ///
576    /// This combinator will collect all successful results of this stream and
577    /// collect them into the specified collection type. If an error happens then all
578    /// collected elements will be dropped and the error will be returned.
579    ///
580    /// The returned future will be resolved when the stream terminates.
581    ///
582    /// # Examples
583    ///
584    /// ```
585    /// # futures::executor::block_on(async {
586    /// use futures::channel::mpsc;
587    /// use futures::stream::TryStreamExt;
588    /// use std::thread;
589    ///
590    /// let (tx, rx) = mpsc::unbounded();
591    ///
592    /// thread::spawn(move || {
593    ///     for i in 1..=5 {
594    ///         tx.unbounded_send(Ok(i)).unwrap();
595    ///     }
596    ///     tx.unbounded_send(Err(6)).unwrap();
597    /// });
598    ///
599    /// let output: Result<Vec<i32>, i32> = rx.try_collect().await;
600    /// assert_eq!(output, Err(6));
601    /// # })
602    /// ```
603    fn try_collect<C: Default + Extend<Self::Ok>>(self) -> TryCollect<Self, C>
604    where
605        Self: Sized,
606    {
607        assert_future::<Result<C, Self::Error>, _>(TryCollect::new(self))
608    }
609
610    /// An adaptor for chunking up successful items of the stream inside a vector.
611    ///
612    /// This combinator will attempt to pull successful items from this stream and buffer
613    /// them into a local vector. At most `capacity` items will get buffered
614    /// before they're yielded from the returned stream.
615    ///
    /// Note that the vectors returned from this stream may not always have
    /// `capacity` elements. If the underlying stream ended and only a partial
    /// vector was created, it'll be returned. Additionally if an error happens
    /// from the underlying stream then the currently buffered items will be
    /// yielded together with the error, inside a [`TryChunksError`].
621    ///
622    /// This method is only available when the `std` or `alloc` feature of this
623    /// library is activated, and it is activated by default.
624    ///
625    /// This function is similar to
626    /// [`StreamExt::chunks`](crate::stream::StreamExt::chunks) but exits
627    /// early if an error occurs.
628    ///
629    /// # Examples
630    ///
631    /// ```
632    /// # futures::executor::block_on(async {
633    /// use futures::stream::{self, TryChunksError, TryStreamExt};
634    ///
635    /// let stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(2), Ok(3), Err(4), Ok(5), Ok(6)]);
636    /// let mut stream = stream.try_chunks(2);
637    ///
638    /// assert_eq!(stream.try_next().await, Ok(Some(vec![1, 2])));
639    /// assert_eq!(stream.try_next().await, Err(TryChunksError(vec![3], 4)));
640    /// assert_eq!(stream.try_next().await, Ok(Some(vec![5, 6])));
641    /// # })
642    /// ```
643    ///
644    /// # Panics
645    ///
646    /// This method will panic if `capacity` is zero.
647    #[cfg(feature = "alloc")]
648    fn try_chunks(self, capacity: usize) -> TryChunks<Self>
649    where
650        Self: Sized,
651    {
652        assert_stream::<Result<Vec<Self::Ok>, TryChunksError<Self::Ok, Self::Error>>, _>(
653            TryChunks::new(self, capacity),
654        )
655    }
656
657    /// An adaptor for chunking up successful, ready items of the stream inside a vector.
658    ///
659    /// This combinator will attempt to pull successful items from this stream and buffer
660    /// them into a local vector. At most `capacity` items will get buffered
661    /// before they're yielded from the returned stream. If the underlying stream
662    /// returns `Poll::Pending`, and the collected chunk is not empty, it will
663    /// be immediately returned.
664    ///
    /// Note that the vectors returned from this stream may not always have
    /// `capacity` elements. If the underlying stream ended and only a partial
    /// vector was created, it'll be returned. Additionally if an error happens
    /// from the underlying stream then the currently buffered items will be
    /// yielded together with the error, inside a [`TryReadyChunksError`].
670    ///
671    /// This method is only available when the `std` or `alloc` feature of this
672    /// library is activated, and it is activated by default.
673    ///
674    /// This function is similar to
675    /// [`StreamExt::ready_chunks`](crate::stream::StreamExt::ready_chunks) but exits
676    /// early if an error occurs.
677    ///
678    /// # Examples
679    ///
680    /// ```
681    /// # futures::executor::block_on(async {
682    /// use futures::stream::{self, TryReadyChunksError, TryStreamExt};
683    ///
684    /// let stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(2), Ok(3), Err(4), Ok(5), Ok(6)]);
685    /// let mut stream = stream.try_ready_chunks(2);
686    ///
687    /// assert_eq!(stream.try_next().await, Ok(Some(vec![1, 2])));
688    /// assert_eq!(stream.try_next().await, Err(TryReadyChunksError(vec![3], 4)));
689    /// assert_eq!(stream.try_next().await, Ok(Some(vec![5, 6])));
690    /// # })
691    /// ```
692    ///
693    /// # Panics
694    ///
695    /// This method will panic if `capacity` is zero.
696    #[cfg(feature = "alloc")]
697    fn try_ready_chunks(self, capacity: usize) -> TryReadyChunks<Self>
698    where
699        Self: Sized,
700    {
701        assert_stream::<Result<Vec<Self::Ok>, TryReadyChunksError<Self::Ok, Self::Error>>, _>(
702            TryReadyChunks::new(self, capacity),
703        )
704    }
705
706    /// Attempt to filter the values produced by this stream according to the
707    /// provided asynchronous closure.
708    ///
    /// As values of this stream are made available, the provided predicate `f`
    /// will be run on them. If the predicate returns a `Future` which resolves
    /// to `true`, then the stream will yield the value, but if the predicate
    /// returns a `Future` which resolves to `false`, then the value will be
    /// discarded and the next value will be produced.
714    ///
715    /// All errors are passed through without filtering in this combinator.
716    ///
717    /// Note that this function consumes the stream passed into it and returns a
718    /// wrapped version of it, similar to the existing `filter` methods in
719    /// the standard library.
720    ///
721    /// # Examples
722    /// ```
723    /// # futures::executor::block_on(async {
724    /// use futures::future;
725    /// use futures::stream::{self, StreamExt, TryStreamExt};
726    ///
727    /// let stream = stream::iter(vec![Ok(1i32), Ok(2i32), Ok(3i32), Err("error")]);
728    /// let mut evens = stream.try_filter(|x| {
729    ///     future::ready(x % 2 == 0)
730    /// });
731    ///
732    /// assert_eq!(evens.next().await, Some(Ok(2)));
733    /// assert_eq!(evens.next().await, Some(Err("error")));
734    /// # })
735    /// ```
736    fn try_filter<Fut, F>(self, f: F) -> TryFilter<Self, Fut, F>
737    where
738        Fut: Future<Output = bool>,
739        F: FnMut(&Self::Ok) -> Fut,
740        Self: Sized,
741    {
742        assert_stream::<Result<Self::Ok, Self::Error>, _>(TryFilter::new(self, f))
743    }
744
745    /// Attempt to filter the values produced by this stream while
746    /// simultaneously mapping them to a different type according to the
747    /// provided asynchronous closure.
748    ///
749    /// As values of this stream are made available, the provided function will
750    /// be run on them. If the future returned by the predicate `f` resolves to
751    /// [`Some(item)`](Some) then the stream will yield the value `item`, but if
752    /// it resolves to [`None`] then the next value will be produced.
753    ///
754    /// All errors are passed through without filtering in this combinator.
755    ///
756    /// Note that this function consumes the stream passed into it and returns a
757    /// wrapped version of it, similar to the existing `filter_map` methods in
758    /// the standard library.
759    ///
760    /// # Examples
761    /// ```
762    /// # futures::executor::block_on(async {
763    /// use core::pin::pin;
764    ///
765    /// use futures::stream;
766    /// use futures::stream::StreamExt;
767    /// use futures::stream::TryStreamExt;
768    ///
769    /// let stream = stream::iter(vec![Ok(1i32), Ok(6i32), Err("error")]);
770    /// let halves = stream.try_filter_map(|x| async move {
771    ///     let ret = if x % 2 == 0 { Some(x / 2) } else { None };
772    ///     Ok(ret)
773    /// });
774    ///
775    /// let mut halves = pin!(halves);
776    /// assert_eq!(halves.next().await, Some(Ok(3)));
777    /// assert_eq!(halves.next().await, Some(Err("error")));
778    /// # })
779    /// ```
780    fn try_filter_map<Fut, F, T>(self, f: F) -> TryFilterMap<Self, Fut, F>
781    where
782        Fut: TryFuture<Ok = Option<T>, Error = Self::Error>,
783        F: FnMut(Self::Ok) -> Fut,
784        Self: Sized,
785    {
786        assert_stream::<Result<T, Self::Error>, _>(TryFilterMap::new(self, f))
787    }
788
789    /// Flattens a stream of streams into just one continuous stream. Produced streams
790    /// will be polled concurrently and any errors will be passed through without looking at them.
791    /// If the underlying base stream returns an error, it will be **immediately** propagated.
792    ///
793    /// The only argument is an optional limit on the number of concurrently
794    /// polled streams. If this limit is not `None`, no more than `limit` streams
795    /// will be polled at the same time. The `limit` argument is of type
796    /// `Into<Option<usize>>`, and so can be provided as either `None`,
797    /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as
798    /// no limit at all, and will have the same result as passing in `None`.
799    ///
800    /// # Examples
801    ///
802    /// ```
803    /// # futures::executor::block_on(async {
804    /// use futures::channel::mpsc;
805    /// use futures::stream::{StreamExt, TryStreamExt};
806    /// use std::thread;
807    ///
808    /// let (tx1, rx1) = mpsc::unbounded();
809    /// let (tx2, rx2) = mpsc::unbounded();
810    /// let (tx3, rx3) = mpsc::unbounded();
811    ///
812    /// thread::spawn(move || {
813    ///     tx1.unbounded_send(Ok(1)).unwrap();
814    /// });
815    /// thread::spawn(move || {
816    ///     tx2.unbounded_send(Ok(2)).unwrap();
817    ///     tx2.unbounded_send(Err(3)).unwrap();
818    ///     tx2.unbounded_send(Ok(4)).unwrap();
819    /// });
820    /// thread::spawn(move || {
821    ///     tx3.unbounded_send(Ok(rx1)).unwrap();
822    ///     tx3.unbounded_send(Ok(rx2)).unwrap();
823    ///     tx3.unbounded_send(Err(5)).unwrap();
824    /// });
825    ///
826    /// let stream = rx3.try_flatten_unordered(None);
827    /// let mut values: Vec<_> = stream.collect().await;
828    /// values.sort();
829    ///
830    /// assert_eq!(values, vec![Ok(1), Ok(2), Ok(4), Err(3), Err(5)]);
831    /// # });
832    /// ```
833    #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
834    #[cfg(feature = "alloc")]
835    fn try_flatten_unordered(self, limit: impl Into<Option<usize>>) -> TryFlattenUnordered<Self>
836    where
837        Self::Ok: TryStream + Unpin,
838        <Self::Ok as TryStream>::Error: From<Self::Error>,
839        Self: Sized,
840    {
841        assert_stream::<Result<<Self::Ok as TryStream>::Ok, <Self::Ok as TryStream>::Error>, _>(
842            TryFlattenUnordered::new(self, limit),
843        )
844    }
845
846    /// Flattens a stream of streams into just one continuous stream.
847    ///
848    /// If this stream's elements are themselves streams then this combinator
849    /// will flatten out the entire stream to one long chain of elements. Any
850    /// errors are passed through without looking at them, but otherwise each
851    /// individual stream will get exhausted before moving on to the next.
852    ///
853    /// # Examples
854    ///
855    /// ```
856    /// # futures::executor::block_on(async {
857    /// use futures::channel::mpsc;
858    /// use futures::stream::{StreamExt, TryStreamExt};
859    /// use std::thread;
860    ///
861    /// let (tx1, rx1) = mpsc::unbounded();
862    /// let (tx2, rx2) = mpsc::unbounded();
863    /// let (tx3, rx3) = mpsc::unbounded();
864    ///
865    /// thread::spawn(move || {
866    ///     tx1.unbounded_send(Ok(1)).unwrap();
867    /// });
868    /// thread::spawn(move || {
869    ///     tx2.unbounded_send(Ok(2)).unwrap();
870    ///     tx2.unbounded_send(Err(3)).unwrap();
871    ///     tx2.unbounded_send(Ok(4)).unwrap();
872    /// });
873    /// thread::spawn(move || {
874    ///     tx3.unbounded_send(Ok(rx1)).unwrap();
875    ///     tx3.unbounded_send(Ok(rx2)).unwrap();
876    ///     tx3.unbounded_send(Err(5)).unwrap();
877    /// });
878    ///
879    /// let mut stream = rx3.try_flatten();
880    /// assert_eq!(stream.next().await, Some(Ok(1)));
881    /// assert_eq!(stream.next().await, Some(Ok(2)));
882    /// assert_eq!(stream.next().await, Some(Err(3)));
883    /// assert_eq!(stream.next().await, Some(Ok(4)));
884    /// assert_eq!(stream.next().await, Some(Err(5)));
885    /// assert_eq!(stream.next().await, None);
886    /// # });
887    /// ```
888    fn try_flatten(self) -> TryFlatten<Self>
889    where
890        Self::Ok: TryStream,
891        <Self::Ok as TryStream>::Error: From<Self::Error>,
892        Self: Sized,
893    {
894        assert_stream::<Result<<Self::Ok as TryStream>::Ok, <Self::Ok as TryStream>::Error>, _>(
895            TryFlatten::new(self),
896        )
897    }
898
899    /// Attempt to execute an accumulating asynchronous computation over a
900    /// stream, collecting all the values into one final result.
901    ///
902    /// This combinator will accumulate all values returned by this stream
903    /// according to the closure provided. The initial state is also provided to
904    /// this method and then is returned again by each execution of the closure.
905    /// Once the entire stream has been exhausted the returned future will
906    /// resolve to this value.
907    ///
908    /// This method is similar to [`fold`](crate::stream::StreamExt::fold), but will
909    /// exit early if an error is encountered in either the stream or the
910    /// provided closure.
911    ///
912    /// # Examples
913    ///
914    /// ```
915    /// # futures::executor::block_on(async {
916    /// use futures::stream::{self, TryStreamExt};
917    ///
918    /// let number_stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(2)]);
919    /// let sum = number_stream.try_fold(0, |acc, x| async move { Ok(acc + x) });
920    /// assert_eq!(sum.await, Ok(3));
921    ///
922    /// let number_stream_with_err = stream::iter(vec![Ok::<i32, i32>(1), Err(2), Ok(1)]);
923    /// let sum = number_stream_with_err.try_fold(0, |acc, x| async move { Ok(acc + x) });
924    /// assert_eq!(sum.await, Err(2));
925    /// # })
926    /// ```
927    fn try_fold<T, Fut, F>(self, init: T, f: F) -> TryFold<Self, Fut, T, F>
928    where
929        F: FnMut(T, Self::Ok) -> Fut,
930        Fut: TryFuture<Ok = T, Error = Self::Error>,
931        Self: Sized,
932    {
933        assert_future::<Result<T, Self::Error>, _>(TryFold::new(self, f, init))
934    }
935
936    /// Attempt to concatenate all items of a stream into a single
937    /// extendable destination, returning a future representing the end result.
938    ///
939    /// This combinator will extend the first item with the contents of all
940    /// the subsequent successful results of the stream. If the stream is empty,
941    /// the default value will be returned.
942    ///
943    /// Works with all collections that implement the [`Extend`](std::iter::Extend) trait.
944    ///
945    /// This method is similar to [`concat`](crate::stream::StreamExt::concat), but will
946    /// exit early if an error is encountered in the stream.
947    ///
948    /// # Examples
949    ///
950    /// ```
951    /// # futures::executor::block_on(async {
952    /// use futures::channel::mpsc;
953    /// use futures::stream::TryStreamExt;
954    /// use std::thread;
955    ///
956    /// let (tx, rx) = mpsc::unbounded::<Result<Vec<i32>, ()>>();
957    ///
958    /// thread::spawn(move || {
959    ///     for i in (0..3).rev() {
960    ///         let n = i * 3;
961    ///         tx.unbounded_send(Ok(vec![n + 1, n + 2, n + 3])).unwrap();
962    ///     }
963    /// });
964    ///
965    /// let result = rx.try_concat().await;
966    ///
967    /// assert_eq!(result, Ok(vec![7, 8, 9, 4, 5, 6, 1, 2, 3]));
968    /// # });
969    /// ```
970    fn try_concat(self) -> TryConcat<Self>
971    where
972        Self: Sized,
973        Self::Ok: Extend<<<Self as TryStream>::Ok as IntoIterator>::Item> + IntoIterator + Default,
974    {
975        assert_future::<Result<Self::Ok, Self::Error>, _>(TryConcat::new(self))
976    }
977
978    /// Attempt to execute several futures from a stream concurrently (unordered).
979    ///
980    /// This stream's `Ok` type must be a [`TryFuture`](futures_core::future::TryFuture) with an `Error` type
981    /// that matches the stream's `Error` type.
982    ///
983    /// This adaptor will buffer up to `n` futures and then return their
984    /// outputs in the order in which they complete. If the underlying stream
985    /// returns an error, it will be immediately propagated.
986    ///
987    /// The returned stream will be a stream of results, each containing either
988    /// an error or a future's output. An error can be produced either by the
989    /// underlying stream itself or by one of the futures it yielded.
990    ///
991    /// This method is only available when the `std` or `alloc` feature of this
992    /// library is activated, and it is activated by default.
993    ///
994    /// # Examples
995    ///
996    /// Results are returned in the order of completion:
997    /// ```
998    /// # futures::executor::block_on(async {
999    /// use futures::channel::oneshot;
1000    /// use futures::stream::{self, StreamExt, TryStreamExt};
1001    ///
1002    /// let (send_one, recv_one) = oneshot::channel();
1003    /// let (send_two, recv_two) = oneshot::channel();
1004    ///
1005    /// let stream_of_futures = stream::iter(vec![Ok(recv_one), Ok(recv_two)]);
1006    ///
1007    /// let mut buffered = stream_of_futures.try_buffer_unordered(10);
1008    ///
1009    /// send_two.send(2i32)?;
1010    /// assert_eq!(buffered.next().await, Some(Ok(2i32)));
1011    ///
1012    /// send_one.send(1i32)?;
1013    /// assert_eq!(buffered.next().await, Some(Ok(1i32)));
1014    ///
1015    /// assert_eq!(buffered.next().await, None);
1016    /// # Ok::<(), i32>(()) }).unwrap();
1017    /// ```
1018    ///
1019    /// Errors from the underlying stream itself are propagated:
1020    /// ```
1021    /// # futures::executor::block_on(async {
1022    /// use futures::channel::mpsc;
1023    /// use futures::stream::{StreamExt, TryStreamExt};
1024    ///
1025    /// let (sink, stream_of_futures) = mpsc::unbounded();
1026    /// let mut buffered = stream_of_futures.try_buffer_unordered(10);
1027    ///
1028    /// sink.unbounded_send(Ok(async { Ok(7i32) }))?;
1029    /// assert_eq!(buffered.next().await, Some(Ok(7i32)));
1030    ///
1031    /// sink.unbounded_send(Err("error in the stream"))?;
1032    /// assert_eq!(buffered.next().await, Some(Err("error in the stream")));
1033    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
1034    /// ```
1035    #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
1036    #[cfg(feature = "alloc")]
1037    fn try_buffer_unordered(self, n: usize) -> TryBufferUnordered<Self>
1038    where
1039        Self::Ok: TryFuture<Error = Self::Error>,
1040        Self: Sized,
1041    {
1042        assert_stream::<Result<<Self::Ok as TryFuture>::Ok, Self::Error>, _>(
1043            TryBufferUnordered::new(self, n),
1044        )
1045    }
1046
1047    /// Attempt to execute several futures from a stream concurrently.
1048    ///
1049    /// This stream's `Ok` type must be a [`TryFuture`](futures_core::future::TryFuture) with an `Error` type
1050    /// that matches the stream's `Error` type.
1051    ///
1052    /// This adaptor will buffer up to `n` futures and then return their
1053    /// outputs in the same order as the underlying stream. If the underlying stream returns an error, it will
1054    /// be immediately propagated.
1055    ///
1056    /// The returned stream will be a stream of results, each containing either
1057    /// an error or a future's output. An error can be produced either by the
1058    /// underlying stream itself or by one of the futures it yielded.
1059    ///
1060    /// This method is only available when the `std` or `alloc` feature of this
1061    /// library is activated, and it is activated by default.
1062    ///
1063    /// # Examples
1064    ///
1065    /// Results are returned in the order of addition:
1066    /// ```
1067    /// # futures::executor::block_on(async {
1068    /// use futures::channel::oneshot;
1069    /// use futures::future::lazy;
1070    /// use futures::stream::{self, StreamExt, TryStreamExt};
1071    ///
1072    /// let (send_one, recv_one) = oneshot::channel();
1073    /// let (send_two, recv_two) = oneshot::channel();
1074    ///
1075    /// let mut buffered = lazy(move |cx| {
1076    ///     let stream_of_futures = stream::iter(vec![Ok(recv_one), Ok(recv_two)]);
1077    ///
1078    ///     let mut buffered = stream_of_futures.try_buffered(10);
1079    ///
1080    ///     assert!(buffered.try_poll_next_unpin(cx).is_pending());
1081    ///
1082    ///     send_two.send(2i32)?;
1083    ///     assert!(buffered.try_poll_next_unpin(cx).is_pending());
1084    ///     Ok::<_, i32>(buffered)
1085    /// }).await?;
1086    ///
1087    /// send_one.send(1i32)?;
1088    /// assert_eq!(buffered.next().await, Some(Ok(1i32)));
1089    /// assert_eq!(buffered.next().await, Some(Ok(2i32)));
1090    ///
1091    /// assert_eq!(buffered.next().await, None);
1092    /// # Ok::<(), i32>(()) }).unwrap();
1093    /// ```
1094    ///
1095    /// Errors from the underlying stream itself are propagated:
1096    /// ```
1097    /// # futures::executor::block_on(async {
1098    /// use futures::channel::mpsc;
1099    /// use futures::stream::{StreamExt, TryStreamExt};
1100    ///
1101    /// let (sink, stream_of_futures) = mpsc::unbounded();
1102    /// let mut buffered = stream_of_futures.try_buffered(10);
1103    ///
1104    /// sink.unbounded_send(Ok(async { Ok(7i32) }))?;
1105    /// assert_eq!(buffered.next().await, Some(Ok(7i32)));
1106    ///
1107    /// sink.unbounded_send(Err("error in the stream"))?;
1108    /// assert_eq!(buffered.next().await, Some(Err("error in the stream")));
1109    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
1110    /// ```
1111    #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
1112    #[cfg(feature = "alloc")]
1113    fn try_buffered(self, n: usize) -> TryBuffered<Self>
1114    where
1115        Self::Ok: TryFuture<Error = Self::Error>,
1116        Self: Sized,
1117    {
1118        assert_stream::<Result<<Self::Ok as TryFuture>::Ok, Self::Error>, _>(TryBuffered::new(
1119            self, n,
1120        ))
1121    }
1122
1123    // TODO: false positive warning from rustdoc. Verify once #43466 settles
1124    //
1125    /// A convenience method for calling [`TryStream::try_poll_next`] on [`Unpin`]
1126    /// stream types.
1127    fn try_poll_next_unpin(
1128        &mut self,
1129        cx: &mut Context<'_>,
1130    ) -> Poll<Option<Result<Self::Ok, Self::Error>>>
1131    where
1132        Self: Unpin,
1133    {
1134        Pin::new(self).try_poll_next(cx)
1135    }
1136
1137    /// Wraps a [`TryStream`] into a stream compatible with libraries using
1138    /// futures 0.1 `Stream`. Requires the `compat` feature to be enabled.
1139    /// ```
1140    /// # if cfg!(miri) { return; } // Miri does not support epoll
1141    /// use futures::future::{FutureExt, TryFutureExt};
1142    /// # let (tx, rx) = futures::channel::oneshot::channel();
1143    ///
1144    /// let future03 = async {
1145    ///     println!("Running on the pool");
1146    ///     tx.send(42).unwrap();
1147    /// };
1148    ///
1149    /// let future01 = future03
1150    ///     .unit_error() // Make it a TryFuture
1151    ///     .boxed()  // Make it Unpin
1152    ///     .compat();
1153    ///
1154    /// tokio::run(future01);
1155    /// # assert_eq!(42, futures::executor::block_on(rx).unwrap());
1156    /// ```
1157    #[cfg(feature = "compat")]
1158    #[cfg_attr(docsrs, doc(cfg(feature = "compat")))]
1159    fn compat(self) -> Compat<Self>
1160    where
1161        Self: Sized + Unpin,
1162    {
1163        Compat::new(self)
1164    }
1165
1166    /// Adapter that converts this stream into an [`AsyncBufRead`](crate::io::AsyncBufRead).
1167    ///
    /// This method is only available when both the `io` and `std` features of
    /// this library are activated; `std` is activated by default.
1170    ///
1171    /// # Examples
1172    ///
1173    /// ```
1174    /// # futures::executor::block_on(async {
1175    /// use futures::stream::{self, TryStreamExt};
1176    /// use futures::io::AsyncReadExt;
1177    ///
1178    /// let stream = stream::iter([Ok(vec![1, 2, 3]), Ok(vec![4, 5])]);
1179    /// let mut reader = stream.into_async_read();
1180    ///
1181    /// let mut buf = Vec::new();
1182    /// reader.read_to_end(&mut buf).await.unwrap();
1183    /// assert_eq!(buf, [1, 2, 3, 4, 5]);
1184    /// # })
1185    /// ```
1186    #[cfg(feature = "io")]
1187    #[cfg_attr(docsrs, doc(cfg(feature = "io")))]
1188    #[cfg(feature = "std")]
1189    fn into_async_read(self) -> IntoAsyncRead<Self>
1190    where
1191        Self: Sized + TryStreamExt<Error = std::io::Error>,
1192        Self::Ok: AsRef<[u8]>,
1193    {
1194        crate::io::assert_read(IntoAsyncRead::new(self))
1195    }
1196
1197    /// Attempt to execute a predicate over an asynchronous stream and evaluate if all items
1198    /// satisfy the predicate. Exits early if an `Err` is encountered or if an `Ok` item is found
1199    /// that does not satisfy the predicate.
1200    ///
1201    /// # Examples
1202    ///
1203    /// ```
1204    /// # futures::executor::block_on(async {
1205    /// use futures::stream::{self, StreamExt, TryStreamExt};
1206    /// use std::convert::Infallible;
1207    ///
1208    /// let number_stream = stream::iter(1..10).map(Ok::<_, Infallible>);
1209    /// let positive = number_stream.try_all(|i| async move { i > 0 });
1210    /// assert_eq!(positive.await, Ok(true));
1211    ///
1212    /// let stream_with_errors = stream::iter([Ok(1), Err("err"), Ok(3)]);
1213    /// let positive = stream_with_errors.try_all(|i| async move { i > 0 });
1214    /// assert_eq!(positive.await, Err("err"));
1215    /// # });
1216    /// ```
1217    fn try_all<Fut, F>(self, f: F) -> TryAll<Self, Fut, F>
1218    where
1219        Self: Sized,
1220        F: FnMut(Self::Ok) -> Fut,
1221        Fut: Future<Output = bool>,
1222    {
1223        assert_future::<Result<bool, Self::Error>, _>(TryAll::new(self, f))
1224    }
1225
1226    /// Attempt to execute a predicate over an asynchronous stream and evaluate if any items
1227    /// satisfy the predicate. Exits early if an `Err` is encountered or if an `Ok` item is found
1228    /// that satisfies the predicate.
1229    ///
1230    /// # Examples
1231    ///
1232    /// ```
1233    /// # futures::executor::block_on(async {
1234    /// use futures::stream::{self, StreamExt, TryStreamExt};
1235    /// use std::convert::Infallible;
1236    ///
1237    /// let number_stream = stream::iter(0..10).map(Ok::<_, Infallible>);
1238    /// let contain_three = number_stream.try_any(|i| async move { i == 3 });
1239    /// assert_eq!(contain_three.await, Ok(true));
1240    ///
1241    /// let stream_with_errors = stream::iter([Ok(1), Err("err"), Ok(3)]);
1242    /// let contain_three = stream_with_errors.try_any(|i| async move { i == 3 });
1243    /// assert_eq!(contain_three.await, Err("err"));
1244    /// # });
1245    /// ```
1246    fn try_any<Fut, F>(self, f: F) -> TryAny<Self, Fut, F>
1247    where
1248        Self: Sized,
1249        F: FnMut(Self::Ok) -> Fut,
1250        Fut: Future<Output = bool>,
1251    {
1252        assert_future::<Result<bool, Self::Error>, _>(TryAny::new(self, f))
1253    }
1254}