tokio_stream_util/try_stream/ext/
mod.rs

//! Streams
//!
//! This module contains a number of functions for working with `Stream`s
//! that return `Result`s, allowing for short-circuiting computations.
5
6use core::future::Future;
7use core::pin::Pin;
8use core::task::{Context, Poll};
9use futures_core::future::TryFuture;
10
11use super::TryStream;
12use crate::FusedStream;
13
14mod and_then;
15pub use and_then::AndThen;
16
17mod err_into;
18pub use err_into::ErrInto;
19
20mod inspect;
21pub use inspect::InspectOk;
22
23pub use inspect::InspectErr;
24
25mod into_stream;
26
27#[cfg(feature = "alloc")]
28#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
29pub(crate) use into_stream::IntoFuseStream;
30
31pub use into_stream::IntoStream;
32
33mod map_ok;
34pub use map_ok::MapOk;
35
36mod map_err;
37pub use map_err::MapErr;
38
39mod or_else;
40pub use or_else::OrElse;
41
42mod try_next;
43pub use try_next::TryNext;
44
45mod try_filter;
46pub use try_filter::TryFilter;
47
48mod try_filter_map;
49pub use try_filter_map::TryFilterMap;
50
51mod try_flatten;
52pub use try_flatten::TryFlatten;
53
54#[cfg(all(feature = "alloc", feature = "std"))]
55#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
56mod try_flatten_unordered;
57#[cfg(all(feature = "alloc", feature = "std"))]
58#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
59pub use try_flatten_unordered::TryFlattenUnordered;
60
61mod try_collect;
62pub use try_collect::TryCollect;
63
64mod try_concat;
65pub use try_concat::TryConcat;
66
67#[cfg(feature = "alloc")]
68mod try_chunks;
69#[cfg(feature = "alloc")]
70pub use try_chunks::{TryChunks, TryChunksError};
71
72#[cfg(feature = "alloc")]
73mod try_ready_chunks;
74#[cfg(feature = "alloc")]
75pub use try_ready_chunks::{TryReadyChunks, TryReadyChunksError};
76
77mod try_unfold;
78pub use try_unfold::{try_unfold, TryUnfold};
79
80mod try_skip_while;
81pub use try_skip_while::TrySkipWhile;
82
83mod try_take_while;
84pub use try_take_while::TryTakeWhile;
85
86#[cfg(feature = "alloc")]
87#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
88mod try_buffer_unordered;
89#[cfg(feature = "alloc")]
90#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
91pub use try_buffer_unordered::TryBufferUnordered;
92
93#[cfg(feature = "alloc")]
94#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
95mod try_buffered;
96#[cfg(feature = "alloc")]
97#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
98pub use try_buffered::TryBuffered;
99
100#[cfg(all(feature = "io", feature = "std"))]
101mod into_async_read;
102#[cfg(all(feature = "io", feature = "std"))]
103#[cfg_attr(docsrs, doc(cfg(feature = "io")))]
104pub use into_async_read::IntoAsyncRead;
105
106mod try_all;
107pub use try_all::TryAll;
108
109mod try_any;
110pub use try_any::TryAny;
111
// Blanket implementation: every `TryStream` (sized or not) automatically gets
// the `TryStreamExt` adapter methods; the trait's provided methods are used as-is.
impl<S: ?Sized + TryStream> TryStreamExt for S {}
113
114/// Adapters specific to `Result`-returning streams
115pub trait TryStreamExt: TryStream {
    /// Wraps the current stream in a new stream which converts the error type
    /// into the one provided.
    ///
    /// The target error type `E` is chosen by the caller; this stream's own
    /// error type must implement `Into<E>`.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio_stream::StreamExt;
    /// use tokio_stream_util::TryStreamExt;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let stream =
    ///         tokio_stream::iter(vec![Ok::<(), i32>(()), Err::<(), i32>(5)])
    ///             .err_into::<i64>();
    ///
    ///     let collected = stream.into_stream().collect::<Vec<_>>().await;
    ///     assert_eq!(collected, vec![Ok(()), Err(5i64)]);
    /// }
    /// ```
    fn err_into<E>(self) -> ErrInto<Self, E>
    where
        Self: Sized,
        Self::Error: Into<E>,
    {
        ErrInto::new(self)
    }
142
143    /// Wraps the current stream in a new stream which maps the success value
144    /// using the provided closure.
145    ///
146    /// # Examples
147    ///
148    /// ```
149    /// use tokio_stream::StreamExt;
150    /// use tokio_stream_util::TryStreamExt;
151    ///
152    /// #[tokio::main]
153    /// async fn main() {
154    /// let stream =
155    ///     tokio_stream::iter(vec![Ok::<i32, i32>(5), Err::<i32, i32>(0)])
156    ///         .map_ok(|x| x + 2);
157    ///
158    /// let out =  stream.into_stream().collect::<Vec<_>>().await;
159    /// assert_eq!(out, vec![Ok(7), Err(0)]);
160    /// }
161    /// ```
162    fn map_ok<V, F>(self, f: F) -> MapOk<Self, V, F>
163    where
164        Self: TryStream + Unpin + Sized,
165        F: FnMut(Self::Ok) -> V,
166    {
167        MapOk::new(self, f)
168    }
169
170    /// Wraps the current stream in a new stream which maps the error value
171    /// using the provided closure.
172    ///
173    /// # Examples
174    ///
175    /// ```
176    /// use tokio_stream::StreamExt;
177    /// use tokio_stream_util::TryStreamExt;
178    ///
179    /// #[derive(Debug)]
180    /// struct MyErr(String);
181    /// impl core::fmt::Display for MyErr { fn fmt(&self, f:&mut core::fmt::Formatter<'_>)->core::fmt::Result{ self.0.fmt(f)} }
182    /// impl core::error::Error for MyErr {}
183    ///
184    /// let stream =
185    ///     tokio_stream::iter(vec![Ok::<i32, i32>(5), Err::<i32, i32>(0)])
186    ///         .map_err(|x| MyErr(format!("e{}", x)));
187    ///
188    /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
189    /// let out = rt.block_on(async { stream.into_stream().collect::<Vec<_>>().await });
190    /// assert_eq!(format!("{:?}", out), format!("{:?}", vec![Ok(5), Err(MyErr(String::from("e0")))]));
191    /// ```
192    fn map_err<E, F>(self, f: F) -> MapErr<Self, E, F>
193    where
194        Self: TryStream + Unpin + Sized,
195        E: core::error::Error,
196        F: FnMut(Self::Error) -> E,
197    {
198        MapErr::new(self, f)
199    }
200
    /// Chain on a computation for when a value is ready, passing the successful
    /// results to the provided closure `f`.
    ///
    /// This function can be used to run a unit of work when the next successful
    /// value on a stream is ready. The closure provided will be yielded a value
    /// when ready, and the returned future will then be run to completion to
    /// produce the next value on this stream.
    ///
    /// Any errors produced by this stream will not be passed to the closure,
    /// and will be passed through.
    ///
    /// The returned value of the closure must implement the `TryFuture` trait,
    /// with the same error type as this stream, and can represent some more
    /// work to be done before the composed stream is finished.
    ///
    /// Note that this function consumes the receiving stream and returns a
    /// wrapped version of it.
    ///
    /// To process the entire stream and return a single future representing
    /// success or error, use `try_for_each` instead.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio_stream::StreamExt;
    /// use tokio_stream_util::TryStreamExt;
    ///
    /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
    /// rt.block_on(async {
    ///     let stream = tokio_stream::iter(vec![Ok::<i32, ()>(1), Ok(2)])
    ///         .and_then(|result| async move {
    ///             Ok(if result % 2 == 0 { Some(result) } else { None })
    ///         });
    ///
    ///     let out = stream.into_stream().collect::<Vec<_>>().await;
    ///     assert_eq!(out, vec![Ok(None), Ok(Some(2))]);
    /// });
    /// ```
    fn and_then<Fut, F>(self, f: F) -> AndThen<Self, Fut, F>
    where
        F: FnMut(Self::Ok) -> Fut,
        Fut: TryFuture<Error = Self::Error>,
        Self: Sized,
    {
        AndThen::new(self, f)
    }
247
    /// Chain on a computation for when an error happens, passing the
    /// erroneous result to the provided closure `f`.
    ///
    /// This function can be used to run a unit of work and attempt to recover from
    /// an error if one happens. The closure provided will be yielded an error
    /// when one appears, and the returned future will then be run to completion
    /// to produce the next value on this stream.
    ///
    /// Any successful values produced by this stream will not be passed to the
    /// closure, and will be passed through.
    ///
    /// The returned value of the closure must implement the
    /// [`TryFuture`](futures_core::future::TryFuture) trait, with the same
    /// success type as this stream, and can represent some more work to be
    /// done before the composed stream is finished.
    ///
    /// Note that this function consumes the receiving stream and returns a
    /// wrapped version of it.
    fn or_else<Fut, F>(self, f: F) -> OrElse<Self, Fut, F>
    where
        F: FnMut(Self::Error) -> Fut,
        Fut: TryFuture<Ok = Self::Ok>,
        Self: Sized,
    {
        OrElse::new(self, f)
    }
273
    /// Do something with the success value of this stream, afterwards passing
    /// it on.
    ///
    /// This is similar to the `StreamExt::inspect` method where it allows
    /// easily inspecting the success value as it passes through the stream, for
    /// example to debug what's going on.
    ///
    /// The closure only receives a shared reference (`&Self::Ok`) to each
    /// successful item.
    fn inspect_ok<F>(self, f: F) -> InspectOk<Self, F>
    where
        F: FnMut(&Self::Ok),
        Self: Sized,
    {
        InspectOk::new(self, f)
    }
287
    /// Do something with the error value of this stream, afterwards passing it on.
    ///
    /// This is similar to the `StreamExt::inspect` method where it allows
    /// easily inspecting the error value as it passes through the stream, for
    /// example to debug what's going on.
    ///
    /// The closure only receives a shared reference (`&Self::Error`) to each
    /// error.
    fn inspect_err<F>(self, f: F) -> InspectErr<Self, F>
    where
        F: FnMut(&Self::Error),
        Self: Sized,
    {
        InspectErr::new(self, f)
    }
300
    /// Wraps a [`TryStream`] into a type that implements
    /// [`Stream`](tokio_stream::Stream).
    ///
    /// [`TryStream`]s currently do not implement the
    /// [`Stream`](tokio_stream::Stream) trait because of limitations
    /// of the compiler.
    ///
    /// This is the adapter the other examples on this trait use to reach
    /// `StreamExt` methods such as `collect`.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio_stream::StreamExt;
    /// use tokio_stream_util::{TryStream, TryStreamExt};
    ///
    /// type T = i32;
    /// type E = ();
    ///
    /// fn make_try_stream() -> impl TryStream<Ok = T, Error = E> {
    ///     tokio_stream::iter(vec![Ok::<T, E>(1), Ok(2), Err(())])
    /// }
    ///
    /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
    /// rt.block_on(async {
    ///     let out = make_try_stream().into_stream().collect::<Vec<_>>().await;
    ///     assert_eq!(out, vec![Ok(1), Ok(2), Err(())]);
    /// });
    /// ```
    fn into_stream(self) -> IntoStream<Self>
    where
        Self: Sized,
    {
        IntoStream::new(self)
    }
333
    /// Creates a future that attempts to resolve the next item in the stream.
    /// If an error is encountered before the next item, the error is returned
    /// instead.
    ///
    /// This is similar to the `StreamExt::next` combinator, but returns a
    /// `Result<Option<T>, E>` rather than an `Option<Result<T, E>>`, making
    /// for easy use with the `?` operator.
    ///
    /// The stream is only borrowed mutably, so it can be polled again after
    /// the returned future completes. The stream must be [`Unpin`].
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio_stream_util::TryStreamExt;
    ///
    /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
    /// rt.block_on(async {
    ///     let mut stream = tokio_stream::iter(vec![Ok::<(), ()>(()), Err::<(), ()>(())]);
    ///     assert_eq!(stream.try_next().await, Ok(Some(())));
    /// });
    /// ```
    fn try_next(&mut self) -> TryNext<'_, Self>
    where
        Self: Unpin,
    {
        TryNext::new(self)
    }
359
    /// Skip elements on this stream while the provided asynchronous predicate
    /// resolves to `true`.
    ///
    /// This function is similar to
    /// [`StreamExt::skip_while`](tokio_stream::StreamExt::skip_while) but exits
    /// early if an error occurs.
    ///
    /// The predicate receives a reference to each successful item and returns
    /// a `TryFuture` resolving to `bool`, with the same error type as this
    /// stream.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio_stream::StreamExt;
    /// use tokio_stream_util::TryStreamExt;
    ///
    /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
    /// rt.block_on(async {
    ///     let stream = tokio_stream::iter(vec![Ok::<i32, i32>(1), Ok(3), Ok(2)])
    ///         .try_skip_while(|x| {
    ///             let v = *x;
    ///             async move { Ok(v < 3) }
    ///         });
    ///
    ///     let out = stream.into_stream().collect::<Vec<_>>().await;
    ///     assert_eq!(out, vec![Ok(3), Ok(2)]);
    /// });
    /// ```
    fn try_skip_while<Fut, F>(self, f: F) -> TrySkipWhile<Self, Fut, F>
    where
        F: FnMut(&Self::Ok) -> Fut,
        Fut: TryFuture<Ok = bool, Error = Self::Error>,
        Self: Sized,
    {
        TrySkipWhile::new(self, f)
    }
393
    /// Take elements on this stream while the provided asynchronous predicate
    /// resolves to `true`.
    ///
    /// This function is similar to
    /// [`StreamExt::take_while`](tokio_stream::StreamExt::take_while) but exits
    /// early if an error occurs.
    ///
    /// The predicate receives a reference to each successful item and returns
    /// a `TryFuture` resolving to `bool`, with the same error type as this
    /// stream.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio_stream::StreamExt;
    /// use tokio_stream_util::TryStreamExt;
    ///
    /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
    /// rt.block_on(async {
    ///     let stream = tokio_stream::iter(vec![Ok::<i32, i32>(1), Ok(2), Ok(3), Ok(2)])
    ///         .try_take_while(|x| {
    ///             let v = *x;
    ///             async move { Ok(v < 3) }
    ///         });
    ///
    ///     let out = stream.into_stream().collect::<Vec<_>>().await;
    ///     assert_eq!(out, vec![Ok(1), Ok(2)]);
    /// });
    /// ```
    fn try_take_while<Fut, F>(self, f: F) -> TryTakeWhile<Self, Fut, F>
    where
        F: FnMut(&Self::Ok) -> Fut,
        Fut: TryFuture<Ok = bool, Error = Self::Error>,
        Self: Sized,
    {
        TryTakeWhile::new(self, f)
    }
427
    /// Attempt to transform a stream into a collection,
    /// returning a future representing the result of that computation.
    ///
    /// This combinator will collect all successful results of this stream and
    /// collect them into the specified collection type. If an error happens then all
    /// collected elements will be dropped and the error will be returned.
    ///
    /// The returned future will be resolved when the stream terminates.
    ///
    /// The collection type `C` must implement `Default` and
    /// `Extend<Self::Ok>`.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio_stream_util::TryStreamExt;
    ///
    /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
    /// rt.block_on(async {
    ///     let future = tokio_stream::iter(vec![Ok::<i32, i32>(1), Ok(2), Err(3)])
    ///         .try_collect::<Vec<_>>();
    ///
    ///     assert_eq!(future.await, Err(3));
    /// });
    /// ```
    fn try_collect<C: Default + Extend<Self::Ok>>(self) -> TryCollect<Self, C>
    where
        Self: Sized,
    {
        TryCollect::new(self)
    }
456
457    /// An adaptor for chunking up successful items of the stream inside a vector.
458    ///
459    /// This combinator will attempt to pull successful items from this stream and buffer
460    /// them into a local vector. At most `capacity` items will get buffered
461    /// before they're yielded from the returned stream.
462    ///
463    /// Note that the vectors returned from this iterator may not always have
464    /// `capacity` elements. If the underlying stream ended and only a partial
465    /// vector was created, it'll be returned. Additionally if an error happens
466    /// from the underlying stream then the currently buffered items will be
467    /// yielded.
468    ///
469    /// This method is only available when the `std` or `alloc` feature of this
470    /// library is activated, and it is activated by default.
471    ///
472    /// This function is similar to
473    /// [`StreamExt::chunks`](tokio_stream::stream::StreamExt::chunks) but exits
474    /// early if an error occurs.
475    ///
476    /// # Examples
477    ///
478    /// ```
479    /// use tokio_stream::StreamExt;
480    /// use tokio_stream_util::TryStreamExt;
481    /// use tokio_stream_util::try_stream::TryChunksError;
482    ///
483    /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
484    /// rt.block_on(async {
485    ///     let stream = tokio_stream::iter(vec![
486    ///         Ok::<i32, i32>(1), Ok(2), Ok(3), Err(4), Ok(5), Ok(6)
487    ///     ]).try_chunks(2);
488    ///
489    ///     let out = stream.into_stream().collect::<Vec<_>>().await;
490    ///     assert_eq!(out, vec![
491    ///         Ok(vec![1, 2]),
492    ///         Ok(vec![3]),
493    ///         Err(TryChunksError::<i32, i32>(vec![], 4)),
494    ///         Ok(vec![5, 6]),
495    ///     ]);
496    /// });
497    /// ```
498    ///
499    /// # Panics
500    ///
501    /// This method will panic if `capacity` is zero.
502    #[cfg(feature = "alloc")]
503    fn try_chunks(self, capacity: usize) -> TryChunks<Self>
504    where
505        <IntoFuseStream<Self> as tokio_stream::Stream>::Item: core::fmt::Debug,
506        Self: Sized,
507    {
508        TryChunks::new(self, capacity)
509    }
510
    /// An adaptor for chunking up successful, ready items of the stream inside a vector.
    ///
    /// This combinator will attempt to pull successful items from this stream and buffer
    /// them into a local vector. At most `capacity` items will get buffered
    /// before they're yielded from the returned stream. If the underlying stream
    /// returns `Poll::Pending`, and the collected chunk is not empty, it will
    /// be immediately returned.
    ///
    /// Note that the vectors returned from this iterator may not always have
    /// `capacity` elements. If the underlying stream ended and only a partial
    /// vector was created, it'll be returned. Additionally if an error happens
    /// from the underlying stream then the currently buffered items will be
    /// yielded.
    ///
    /// This method is only available when the `std` or `alloc` feature of this
    /// library is activated, and it is activated by default.
    ///
    /// This function is similar to the `ready_chunks` adapter on `StreamExt`
    /// from the `futures` crate, but exits early if an error occurs.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio_stream::StreamExt;
    /// use tokio_stream_util::TryStreamExt;
    /// use tokio_stream_util::try_stream::TryReadyChunksError;
    ///
    /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
    /// rt.block_on(async {
    ///     let stream = tokio_stream::iter(vec![
    ///         Ok::<i32, i32>(1), Ok(2), Ok(3), Err(4), Ok(5), Ok(6)
    ///     ]).try_ready_chunks(2);
    ///
    ///     let out = stream.into_stream().collect::<Vec<_>>().await;
    ///     assert_eq!(out, vec![
    ///         Ok(vec![1, 2]),
    ///         Ok(vec![3]),
    ///         Err(TryReadyChunksError::<i32, i32>(vec![], 4)),
    ///         Ok(vec![5, 6]),
    ///     ]);
    /// });
    /// ```
    ///
    /// # Panics
    ///
    /// This method will panic if `capacity` is zero.
    #[cfg(feature = "alloc")]
    fn try_ready_chunks(self, capacity: usize) -> TryReadyChunks<Self>
    where
        Self: Sized,
    {
        TryReadyChunks::new(self, capacity)
    }
565
    /// Attempt to filter the values produced by this stream according to the
    /// provided asynchronous closure.
    ///
    /// As values of this stream are made available, the provided predicate `f`
    /// will be run on them. If the predicate returns a `Future` which resolves
    /// to `true`, then the stream will yield the value, but if the predicate
    /// returns a `Future` which resolves to `false`, then the value will be
    /// discarded and the next value will be produced.
    ///
    /// All errors are passed through without filtering in this combinator.
    ///
    /// Note that this function consumes the stream passed into it and returns a
    /// wrapped version of it, similar to the existing `filter` methods in
    /// the standard library.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio_stream::StreamExt;
    /// use tokio_stream_util::TryStreamExt;
    ///
    /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
    /// rt.block_on(async {
    ///     let events = tokio_stream::iter(vec![Ok::<i32, &str>(1), Ok(2), Ok(3), Err("error")])
    ///         .try_filter(|x| {
    ///             let v = *x;
    ///             async move { v >= 2 }
    ///         });
    ///
    ///     let out = events.into_stream().collect::<Vec<_>>().await;
    ///     assert_eq!(out, vec![Ok(2), Ok(3), Err("error")]);
    /// });
    /// ```
    fn try_filter<Fut, F>(self, f: F) -> TryFilter<Self, Fut, F>
    where
        Fut: Future<Output = bool>,
        F: FnMut(&Self::Ok) -> Fut,
        Self: Sized,
    {
        TryFilter::new(self, f)
    }
606
    /// Attempt to filter the values produced by this stream while
    /// simultaneously mapping them to a different type according to the
    /// provided asynchronous closure.
    ///
    /// As values of this stream are made available, the provided function will
    /// be run on them. If the future returned by the closure `f` resolves to
    /// [`Some(item)`](Some) then the stream will yield the value `item`, but if
    /// it resolves to [`None`] then the next value will be produced.
    ///
    /// All errors are passed through without filtering in this combinator.
    /// The future returned by `f` is itself fallible and must use the same
    /// error type as this stream.
    ///
    /// Note that this function consumes the stream passed into it and returns a
    /// wrapped version of it, similar to the existing `filter_map` methods in
    /// the standard library.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio_stream::StreamExt;
    /// use tokio_stream_util::TryStreamExt;
    ///
    /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
    /// rt.block_on(async {
    ///     let halves = tokio_stream::iter(vec![Ok::<i32, &str>(1), Ok(6), Err("error")])
    ///         .try_filter_map(|x| async move {
    ///             let ret = if x % 2 == 0 { Some(x / 2) } else { None };
    ///             Ok(ret)
    ///         });
    ///
    ///     let out = halves.into_stream().collect::<Vec<_>>().await;
    ///     assert_eq!(out, vec![Ok(3), Err("error")]);
    /// });
    /// ```
    fn try_filter_map<Fut, F, T>(self, f: F) -> TryFilterMap<Self, Fut, F>
    where
        Fut: TryFuture<Ok = Option<T>, Error = Self::Error>,
        F: FnMut(Self::Ok) -> Fut,
        Self: Sized,
    {
        TryFilterMap::new(self, f)
    }
647
    /// Flattens a stream of streams into just one continuous stream. Produced streams
    /// will be polled concurrently and any errors will be passed through without looking at them.
    /// If the underlying base stream returns an error, it will be **immediately** propagated.
    ///
    /// The only argument is an optional limit on the number of concurrently
    /// polled streams. If this limit is not `None`, no more than `limit` streams
    /// will be polled at the same time. The `limit` argument is of type
    /// `Into<Option<usize>>`, and so can be provided as either `None`,
    /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as
    /// no limit at all, and will have the same result as passing in `None`.
    ///
    /// The inner streams must be [`Unpin`], and their error type must be
    /// constructible from this stream's error type via `From`.
    ///
    /// This method is only available when both the `alloc` and `std` features
    /// of this library are activated.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio_stream::StreamExt;
    /// use tokio_stream_util::TryStreamExt;
    ///
    /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
    /// rt.block_on(async {
    ///     let inner1 = tokio_stream::iter(vec![Ok::<i32, i32>(1), Ok(2)]);
    ///     let inner2 = tokio_stream::iter(vec![Ok::<i32, i32>(3), Ok(4)]);
    ///     let base = tokio_stream::iter(vec![Ok::<_, i32>(inner1), Ok::<_, i32>(inner2)]);
    ///     let stream = base.try_flatten_unordered(None);
    ///
    ///     let mut out = stream.into_stream().collect::<Vec<_>>().await;
    ///     out.sort_by_key(|r| r.clone().unwrap_or_default());
    ///     assert_eq!(out, vec![Ok(1), Ok(2), Ok(3), Ok(4)]);
    /// });
    /// ```
    #[cfg(all(feature = "alloc", feature = "std"))]
    #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
    fn try_flatten_unordered(self, limit: impl Into<Option<usize>>) -> TryFlattenUnordered<Self>
    where
        Self: Sized,
        Self::Ok: TryStream + Unpin,
        <Self::Ok as TryStream>::Error: From<Self::Error>,
    {
        TryFlattenUnordered::new(self, limit.into())
    }
687
    /// Flattens a stream of streams into just one continuous stream.
    ///
    /// If this stream's elements are themselves streams then this combinator
    /// will flatten out the entire stream to one long chain of elements. Any
    /// errors are passed through without looking at them, but otherwise each
    /// individual stream will get exhausted before moving on to the next.
    ///
    /// The inner streams' error type must be constructible from this stream's
    /// error type via `From`.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio_stream::StreamExt;
    /// use tokio_stream_util::TryStreamExt;
    ///
    /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
    /// rt.block_on(async {
    ///     let inner1 = tokio_stream::iter(vec![Ok::<i32, i32>(1), Ok(2)]);
    ///     let inner2 = tokio_stream::iter(vec![Ok::<i32, i32>(3), Ok(4)]);
    ///     let base = tokio_stream::iter(vec![Ok::<_, i32>(inner1), Ok::<_, i32>(inner2)]);
    ///     let stream = base.try_flatten();
    ///
    ///     let out = stream.into_stream().collect::<Vec<_>>().await;
    ///     assert_eq!(out, vec![Ok(1), Ok(2), Ok(3), Ok(4)]);
    /// });
    /// ```
    fn try_flatten(self) -> TryFlatten<Self>
    where
        Self::Ok: TryStream,
        <Self::Ok as TryStream>::Error: From<Self::Error>,
        Self: Sized,
    {
        TryFlatten::new(self)
    }
720
    /// Attempt to concatenate all items of a stream into a single
    /// extendable destination, returning a future representing the end result.
    ///
    /// This combinator will extend the first item with the contents of all
    /// the subsequent successful results of the stream. If the stream is empty,
    /// the default value will be returned.
    ///
    /// Works with all collections that implement the [`Extend`](core::iter::Extend) trait.
    ///
    /// This method is similar to the `concat` adapter on `StreamExt` from the
    /// `futures` crate, but will exit early if an error is encountered in the
    /// stream.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio_stream_util::TryStreamExt;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let fut = tokio_stream::iter(vec![
    ///         Ok::<Vec<i32>, ()>(vec![1, 2, 3]),
    ///         Ok(vec![4, 5, 6]),
    ///         Ok(vec![7, 8, 9]),
    ///     ]).try_concat();
    ///
    ///     assert_eq!(fut.await, Ok(vec![1, 2, 3, 4, 5, 6, 7, 8, 9]));
    /// }
    /// ```
    fn try_concat(self) -> TryConcat<Self>
    where
        Self: Sized,
        Self::Ok: Extend<<<Self as TryStream>::Ok as IntoIterator>::Item> + IntoIterator + Default,
    {
        TryConcat::new(self)
    }
756
    /// Attempt to execute several futures from a stream concurrently (unordered).
    ///
    /// This stream's `Ok` type must be a [`TryFuture`](futures_core::future::TryFuture) with an `Error` type
    /// that matches the stream's `Error` type.
    ///
    /// This adaptor will buffer up to `n` futures and then return their
    /// outputs in the order in which they complete. If the underlying stream
    /// returns an error, it will be immediately propagated.
    ///
    /// The limit argument is of type `Into<Option<usize>>`, and so can be
    /// provided as either `None`, `Some(10)`, or just `10`. Note: a limit of zero is
    /// interpreted as no limit at all, and will have the same result as passing in `None`.
    ///
    /// The returned stream will be a stream of results, each containing either
    /// an error or a future's output. An error can be produced either by the
    /// underlying stream itself or by one of the futures it yielded.
    ///
    /// This method is only available when the `std` or `alloc` feature of this
    /// library is activated, and it is activated by default.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio_stream::StreamExt;
    /// use tokio_stream_util::TryStreamExt;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let unordered = tokio_stream::iter(vec![Ok::<i32, &str>(3), Ok(1), Ok(2)])
    ///         .map_ok(async |i| if i == 2 { Err("error") } else { Ok(i) })
    ///         .try_buffer_unordered(None);
    ///
    ///     // Completion order is not guaranteed, so sort before comparing.
    ///     let mut out = unordered.into_stream().collect::<Vec<_>>().await;
    ///     out.sort();
    ///     assert_eq!(out, vec![Ok(1), Ok(3), Err("error")]);
    /// }
    /// ```
    #[cfg(feature = "alloc")]
    #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
    fn try_buffer_unordered(self, n: impl Into<Option<usize>>) -> TryBufferUnordered<Self>
    where
        Self::Ok: TryFuture<Error = Self::Error>,
        Self: Sized,
    {
        TryBufferUnordered::new(self, n.into())
    }
803
    /// Attempt to execute several futures from a stream concurrently.
    ///
    /// This stream's `Ok` type must be a [`TryFuture`](futures_core::future::TryFuture) with an `Error` type
    /// that matches the stream's `Error` type.
    ///
    /// This adaptor will buffer up to `n` futures and then return their
    /// outputs in the same order as the underlying stream. If the underlying
    /// stream returns an error, it will be immediately propagated.
    ///
    /// The limit argument is of type `Into<Option<usize>>`, and so can be
    /// provided as either `None`, `Some(10)`, or just `10`. Note: a limit of zero is
    /// interpreted as no limit at all, and will have the same result as passing in `None`.
    ///
    /// The returned stream will be a stream of results, each containing either
    /// an error or a future's output. An error can be produced either by the
    /// underlying stream itself or by one of the futures it yielded.
    ///
    /// This method is only available when the `std` or `alloc` feature of this
    /// library is activated, and it is activated by default.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio_stream::StreamExt;
    /// use tokio_stream_util::TryStreamExt;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let buffered = tokio_stream::iter(vec![Ok::<i32, &str>(3), Ok(1), Ok(2)])
    ///         .map_ok(|i| async move { if i == 2 { Err("error") } else { Ok(i) } })
    ///         .try_buffered(None);
    ///
    ///     let out = buffered.into_stream().collect::<Vec<_>>().await;
    ///     assert_eq!(out, vec![Ok(3), Ok(1), Err("error")]);
    /// }
    /// ```
    #[cfg(feature = "alloc")]
    #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
    fn try_buffered(self, n: impl Into<Option<usize>>) -> TryBuffered<Self>
    where
        Self::Ok: TryFuture<Error = Self::Error>,
        Self: Sized,
    {
        TryBuffered::new(self, n.into())
    }
849
850    /// A convenience method for calling [`TryStream::try_poll_next`] on [`Unpin`]
851    /// stream types.
852    fn try_poll_next_unpin(
853        &mut self,
854        cx: &mut Context<'_>,
855    ) -> Poll<Option<Result<Self::Ok, Self::Error>>>
856    where
857        Self: Unpin,
858    {
859        Pin::new(self).try_poll_next(cx)
860    }
861
862    /// Adapter that converts this stream into an [`AsyncBufRead`](crate::io::AsyncBufRead).
863    ///
864    /// This method is only available when the `std` feature of this
865    /// library is activated, and it is activated by default.
866    ///
867    /// # Examples
868    ///
869    /// ```
870    /// use tokio_stream_util::TryStreamExt;
871    ///
872    /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
873    /// rt.block_on(async {
874    ///     let reader = tokio_stream::iter([Ok::<_, std::io::Error>(vec![1, 2, 3])]).into_async_read();
875    ///     let mut buf_reader = tokio::io::BufReader::new(reader);
876    ///     let mut buf = Vec::new();
877    ///     use tokio::io::AsyncReadExt;
878    ///     buf_reader.read_to_end(&mut buf).await.unwrap();
879    ///     assert_eq!(buf, vec![1, 2, 3]);
880    /// });
881    /// ```
882    #[cfg(feature = "io")]
883    #[cfg(feature = "std")]
884    #[cfg_attr(docsrs, doc(cfg(feature = "io")))]
885    fn into_async_read(self) -> IntoAsyncRead<Self>
886    where
887        Self: Sized + TryStreamExt<Error = std::io::Error>,
888        Self::Ok: AsRef<[u8]>,
889    {
890        IntoAsyncRead::new(self)
891    }
892
893    /// Attempt to execute a predicate over an asynchronous stream and evaluate if all items
894    /// satisfy the predicate. Exits early if an `Err` is encountered or if an `Ok` item is found
895    /// that does not satisfy the predicate.
896    ///
897    /// # Examples
898    ///
899    /// ```
900    /// use tokio_stream::StreamExt;
901    /// use tokio_stream_util::TryStreamExt;
902    ///
903    /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
904    /// rt.block_on(async {
905    ///     let future_true = tokio_stream::iter(vec![1, 2, 3]).map(|i| Ok::<i32, &str>(i))
906    ///         .try_all(|i| async move { i > 0 });
907    ///     assert!(future_true.await.unwrap());
908    ///
909    ///     let future_err = tokio_stream::iter(vec![Ok::<i32, &str>(1), Err("err"), Ok(3)])
910    ///         .try_all(|i| async move { i > 0 });
911    ///     assert!(future_err.await.is_err());
912    /// });
913    /// ```
914    fn try_all<Fut, F>(self, f: F) -> TryAll<Self, Fut, F>
915    where
916        Self: Sized,
917        F: FnMut(Self::Ok) -> Fut,
918        Fut: Future<Output = bool>,
919    {
920        TryAll::new(self, f)
921    }
922
923    /// Attempt to execute a predicate over an asynchronous stream and evaluate if any items
924    /// satisfy the predicate. Exits early if an `Err` is encountered or if an `Ok` item is found
925    /// that satisfies the predicate.
926    ///
927    /// # Examples
928    ///
929    /// ```
930    /// use tokio_stream::StreamExt;
931    /// use tokio_stream_util::TryStreamExt;
932    ///
933    /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
934    /// rt.block_on(async {
935    ///     let future_true = tokio_stream::iter(0..10).map(|i| Ok::<i32, &str>(i))
936    ///         .try_any(|i| async move { i == 3 });
937    ///     assert!(future_true.await.unwrap());
938    ///
939    ///     let future_err = tokio_stream::iter(vec![Ok::<i32, &str>(1), Err("err"), Ok(3)])
940    ///         .try_any(|i| async move { i == 3 });
941    ///     assert!(future_err.await.is_err());
942    /// });
943    /// ```
944    fn try_any<Fut, F>(self, f: F) -> TryAny<Self, Fut, F>
945    where
946        Self: Sized,
947        F: FnMut(Self::Ok) -> Fut,
948        Fut: Future<Output = bool>,
949    {
950        TryAny::new(self, f)
951    }
952}