tokio_stream_util/try_stream/ext/
mod.rs

1//! Streams
2//!
3//! This module contains a number of functions for working with `Streams`s
4//! that return `Result`s, allowing for short-circuiting computations.
5
6use core::future::Future;
7use core::pin::Pin;
8use core::task::{Context, Poll};
9use futures_core::future::TryFuture;
10
11use super::TryStream;
12use crate::FusedStream;
13
14mod and_then;
15pub use and_then::AndThen;
16
17mod err_into;
18pub use err_into::ErrInto;
19
20mod inspect;
21pub use inspect::InspectOk;
22
23pub use inspect::InspectErr;
24
25mod into_stream;
26
27#[cfg(feature = "alloc")]
28#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
29pub(crate) use into_stream::IntoFuseStream;
30
31pub use into_stream::IntoStream;
32
33mod map_ok;
34pub use map_ok::MapOk;
35
36mod map_err;
37pub use map_err::MapErr;
38
39mod or_else;
40pub use or_else::OrElse;
41
42mod try_next;
43pub use try_next::TryNext;
44
45mod try_filter;
46pub use try_filter::TryFilter;
47
48#[cfg(all(feature = "sink", feature = "alloc"))]
49mod try_forward;
50#[cfg(all(feature = "sink", feature = "alloc"))]
51pub use try_forward::TryForward;
52
53mod try_filter_map;
54pub use try_filter_map::TryFilterMap;
55
56mod try_flatten;
57pub use try_flatten::TryFlatten;
58
59#[cfg(all(feature = "alloc", feature = "std"))]
60#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
61mod try_flatten_unordered;
62#[cfg(all(feature = "alloc", feature = "std"))]
63#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
64pub use try_flatten_unordered::TryFlattenUnordered;
65
66mod try_collect;
67pub use try_collect::TryCollect;
68
69mod try_concat;
70pub use try_concat::TryConcat;
71
72#[cfg(feature = "alloc")]
73mod try_chunks;
74#[cfg(feature = "alloc")]
75pub use try_chunks::{TryChunks, TryChunksError};
76
77#[cfg(feature = "alloc")]
78mod try_ready_chunks;
79#[cfg(feature = "alloc")]
80pub use try_ready_chunks::{TryReadyChunks, TryReadyChunksError};
81
82mod try_unfold;
83pub use try_unfold::{try_unfold, TryUnfold};
84
85mod try_skip_while;
86pub use try_skip_while::TrySkipWhile;
87
88mod try_take_while;
89pub use try_take_while::TryTakeWhile;
90
91#[cfg(feature = "alloc")]
92#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
93mod try_buffer_unordered;
94#[cfg(feature = "alloc")]
95#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
96pub use try_buffer_unordered::TryBufferUnordered;
97
98#[cfg(feature = "alloc")]
99#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
100mod try_buffered;
101#[cfg(feature = "alloc")]
102#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
103pub use try_buffered::TryBuffered;
104
105#[cfg(all(feature = "io", feature = "std"))]
106mod into_async_read;
107#[cfg(all(feature = "io", feature = "std"))]
108pub use into_async_read::IntoAsyncRead;
109
110mod try_all;
111pub use try_all::TryAll;
112
113mod try_any;
114pub use try_any::TryAny;
115
116impl<S: ?Sized + TryStream> TryStreamExt for S {}
117
118/// Adapters specific to `Result`-returning streams
119pub trait TryStreamExt: TryStream {
120    /// Wraps the current stream in a new stream which converts the error type
121    /// into the one provided.
122    ///
123    /// # Examples
124    ///
125    /// ```
126    /// use tokio_stream::StreamExt;
127    /// use tokio_stream_util::TryStreamExt;
128    ///
129    /// #[tokio::main]
130    /// async fn main() {
131    /// let stream =
132    ///     tokio_stream::iter(vec![Ok::<(), i32>(()), Err::<(), i32>(5)])
133    ///         .err_into::<i64>();
134    ///
135    /// let collected =  stream.into_stream().collect::<Vec<_>>().await;
136    /// assert_eq!(collected, vec![Ok(()), Err(5i64)]);
137    /// }
138    /// ```
139    fn err_into<E>(self) -> ErrInto<Self, E>
140    where
141        Self: Sized,
142        Self::Error: Into<E>,
143    {
144        ErrInto::new(self)
145    }
146
147    /// Wraps the current stream in a new stream which maps the success value
148    /// using the provided closure.
149    ///
150    /// # Examples
151    ///
152    /// ```
153    /// use tokio_stream::StreamExt;
154    /// use tokio_stream_util::TryStreamExt;
155    ///
156    /// #[tokio::main]
157    /// async fn main() {
158    /// let stream =
159    ///     tokio_stream::iter(vec![Ok::<i32, i32>(5), Err::<i32, i32>(0)])
160    ///         .map_ok(|x| x + 2);
161    ///
162    /// let out =  stream.into_stream().collect::<Vec<_>>().await;
163    /// assert_eq!(out, vec![Ok(7), Err(0)]);
164    /// }
165    /// ```
166    fn map_ok<V, F>(self, f: F) -> MapOk<Self, V, F>
167    where
168        Self: TryStream + Unpin + Sized,
169        F: FnMut(Self::Ok) -> V,
170    {
171        MapOk::new(self, f)
172    }
173
174    /// Wraps the current stream in a new stream which maps the error value
175    /// using the provided closure.
176    ///
177    /// # Examples
178    ///
179    /// ```
180    /// use tokio_stream::StreamExt;
181    /// use tokio_stream_util::TryStreamExt;
182    ///
183    /// #[derive(Debug)]
184    /// struct MyErr(String);
185    /// impl core::fmt::Display for MyErr { fn fmt(&self, f:&mut core::fmt::Formatter<'_>)->core::fmt::Result{ self.0.fmt(f)} }
186    /// impl core::error::Error for MyErr {}
187    ///
188    /// let stream =
189    ///     tokio_stream::iter(vec![Ok::<i32, i32>(5), Err::<i32, i32>(0)])
190    ///         .map_err(|x| MyErr(format!("e{}", x)));
191    ///
192    /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
193    /// let out = rt.block_on(async { stream.into_stream().collect::<Vec<_>>().await });
194    /// assert_eq!(format!("{:?}", out), format!("{:?}", vec![Ok(5), Err(MyErr(String::from("e0")))]));
195    /// ```
196    fn map_err<E, F>(self, f: F) -> MapErr<Self, E, F>
197    where
198        Self: TryStream + Unpin + Sized,
199        E: core::error::Error,
200        F: FnMut(Self::Error) -> E,
201    {
202        MapErr::new(self, f)
203    }
204
205    /// Chain on a computation for when a value is ready, passing the successful
206    /// results to the provided closure `f`.
207    ///
208    /// This function can be used to run a unit of work when the next successful
209    /// value on a stream is ready. The closure provided will be yielded a value
210    /// when ready, and the returned future will then be run to completion to
211    /// produce the next value on this stream.
212    ///
213    /// Any errors produced by this stream will not be passed to the closure,
214    /// and will be passed through.
215    ///
216    /// The returned value of the closure must implement the `TryFuture` trait
217    /// and can represent some more work to be done before the composed stream
218    /// is finished.
219    ///
220    /// Note that this function consumes the receiving stream and returns a
221    /// wrapped version of it.
222    ///
223    /// To process the entire stream and return a single future representing
224    /// success or error, use `try_for_each` instead.
225    ///
226    /// # Examples
227    ///
228    /// ```
229    /// use tokio_stream::StreamExt;
230    /// use tokio_stream_util::TryStreamExt;
231    ///
232    /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
233    /// rt.block_on(async {
234    ///     let stream = tokio_stream::iter(vec![Ok::<i32, ()>(1), Ok(2)])
235    ///         .and_then(|result| async move {
236    ///             Ok(if result % 2 == 0 { Some(result) } else { None })
237    ///         });
238    ///
239    ///     let out = stream.into_stream().collect::<Vec<_>>().await;
240    ///     assert_eq!(out, vec![Ok(None), Ok(Some(2))]);
241    /// });
242    /// ```
243    fn and_then<Fut, F>(self, f: F) -> AndThen<Self, Fut, F>
244    where
245        F: FnMut(Self::Ok) -> Fut,
246        Fut: TryFuture<Error = Self::Error>,
247        Self: Sized,
248    {
249        AndThen::new(self, f)
250    }
251
252    /// Chain on a computation for when an error happens, passing the
253    /// erroneous result to the provided closure `f`.
254    ///
255    /// This function can be used to run a unit of work and attempt to recover from
256    /// an error if one happens. The closure provided will be yielded an error
257    /// when one appears, and the returned future will then be run to completion
258    /// to produce the next value on this stream.
259    ///
260    /// Any successful values produced by this stream will not be passed to the
261    /// closure, and will be passed through.
262    ///
263    /// The returned value of the closure must implement the [`TryFuture`](futures_core::future::TryFuture) trait
264    /// and can represent some more work to be done before the composed stream
265    /// is finished.
266    ///
267    /// Note that this function consumes the receiving stream and returns a
268    /// wrapped version of it.
269    fn or_else<Fut, F>(self, f: F) -> OrElse<Self, Fut, F>
270    where
271        F: FnMut(Self::Error) -> Fut,
272        Fut: TryFuture<Ok = Self::Ok>,
273        Self: Sized,
274    {
275        OrElse::new(self, f)
276    }
277
278    #[cfg(all(feature = "sink", feature = "alloc"))]
279    /// A future that completes after the given stream has been fully processed
280    /// into the sink and the sink has been flushed and closed.
281    ///
282    /// This future will drive the stream to keep producing items until it is
283    /// exhausted, sending each item to the sink. It will complete once the
284    /// stream is exhausted, the sink has received and flushed all items, and
285    /// the sink is closed. Note that neither the original stream nor provided
286    /// sink will be output by this future. Pass the sink by `Pin<&mut S>`
287    /// (for example, via `try_forward(&mut sink)` inside an `async` fn/block) in
288    /// order to preserve access to the `Sink`. If the stream produces an error,
289    /// that error will be returned by this future without flushing/closing the sink.
290    fn try_forward<S>(self, sink: S) -> TryForward<Self, S>
291    where
292        S: async_sink::Sink<Self::Ok, Error = Self::Error>,
293        Self: Sized,
294    {
295        TryForward::new(self, sink)
296    }
297
298    /// Do something with the success value of this stream, afterwards passing
299    /// it on.
300    ///
301    /// This is similar to the `StreamExt::inspect` method where it allows
302    /// easily inspecting the success value as it passes through the stream, for
303    /// example to debug what's going on.
304    fn inspect_ok<F>(self, f: F) -> InspectOk<Self, F>
305    where
306        F: FnMut(&Self::Ok),
307        Self: Sized,
308    {
309        InspectOk::new(self, f)
310    }
311
312    /// Do something with the error value of this stream, afterwards passing it on.
313    ///
314    /// This is similar to the `StreamExt::inspect` method where it allows
315    /// easily inspecting the error value as it passes through the stream, for
316    /// example to debug what's going on.
317    fn inspect_err<F>(self, f: F) -> InspectErr<Self, F>
318    where
319        F: FnMut(&Self::Error),
320        Self: Sized,
321    {
322        InspectErr::new(self, f)
323    }
324
325    /// Wraps a [`TryStream`] into a type that implements
326    /// [`Stream`](tokio_stream::Stream)
327    ///
328    /// [`TryStream`]s currently do not implement the
329    /// [`Stream`](tokio_stream::Stream) trait because of limitations
330    /// of the compiler.
331    ///
332    /// # Examples
333    ///
334    /// ```
335    /// use tokio_stream::StreamExt;
336    /// use tokio_stream_util::{TryStream, TryStreamExt};
337    ///
338    /// type T = i32;
339    /// type E = ();
340    ///
341    /// fn make_try_stream() -> impl TryStream<Ok = T, Error = E> {
342    ///     tokio_stream::iter(vec![Ok::<T, E>(1), Ok(2), Err(())])
343    /// }
344    ///
345    /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
346    /// rt.block_on(async {
347    ///     let out = make_try_stream().into_stream().collect::<Vec<_>>().await;
348    ///     assert_eq!(out, vec![Ok(1), Ok(2), Err(())]);
349    /// });
350    /// ```
351    fn into_stream(self) -> IntoStream<Self>
352    where
353        Self: Sized,
354    {
355        IntoStream::new(self)
356    }
357
358    /// Creates a future that attempts to resolve the next item in the stream.
359    /// If an error is encountered before the next item, the error is returned
360    /// instead.
361    ///
362    /// This is similar to the `Stream::next` combinator, but returns a
363    /// `Result<Option<T>, E>` rather than an `Option<Result<T, E>>`, making
364    /// for easy use with the `?` operator.
365    ///
366    /// # Examples
367    ///
368    /// ```
369    /// use tokio_stream_util::TryStreamExt;
370    ///
371    /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
372    /// rt.block_on(async {
373    ///     let mut stream = tokio_stream::iter(vec![Ok::<(), ()>(()), Err::<(), ()>(())]);
374    ///     assert_eq!(stream.try_next().await, Ok(Some(())));
375    /// });
376    /// ```
377    fn try_next(&mut self) -> TryNext<'_, Self>
378    where
379        Self: Unpin,
380    {
381        TryNext::new(self)
382    }
383
384    /// Skip elements on this stream while the provided asynchronous predicate
385    /// resolves to `true`.
386    ///
387    /// This function is similar to
388    /// [`StreamExt::skip_while`](futures_util::stream::StreamExt::skip_while) but exits
389    /// early if an error occurs.
390    ///
391    /// # Examples
392    ///
393    /// ```
394    /// use tokio_stream::StreamExt;
395    /// use tokio_stream_util::TryStreamExt;
396    ///
397    /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
398    /// rt.block_on(async {
399    ///     let stream = tokio_stream::iter(vec![Ok::<i32, i32>(1), Ok(3), Ok(2)])
400    ///         .try_skip_while(|x| {
401    ///             let v = *x;
402    ///             async move { Ok(v < 3) }
403    ///         });
404    ///
405    ///     let out = stream.into_stream().collect::<Vec<_>>().await;
406    ///     assert_eq!(out, vec![Ok(3), Ok(2)]);
407    /// });
408    /// ```
409    fn try_skip_while<Fut, F>(self, f: F) -> TrySkipWhile<Self, Fut, F>
410    where
411        F: FnMut(&Self::Ok) -> Fut,
412        Fut: TryFuture<Ok = bool, Error = Self::Error>,
413        Self: Sized,
414    {
415        TrySkipWhile::new(self, f)
416    }
417
418    /// Take elements on this stream while the provided asynchronous predicate
419    /// resolves to `true`.
420    ///
421    /// This function is similar to
422    /// [`StreamExt::take_while`](futures_util::stream::StreamExt::take_while) but exits
423    /// early if an error occurs.
424    ///
425    /// # Examples
426    ///
427    /// ```
428    /// use tokio_stream::StreamExt;
429    /// use tokio_stream_util::TryStreamExt;
430    ///
431    /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
432    /// rt.block_on(async {
433    ///     let stream = tokio_stream::iter(vec![Ok::<i32, i32>(1), Ok(2), Ok(3), Ok(2)])
434    ///         .try_take_while(|x| {
435    ///             let v = *x;
436    ///             async move { Ok(v < 3) }
437    ///         });
438    ///
439    ///     let out = stream.into_stream().collect::<Vec<_>>().await;
440    ///     assert_eq!(out, vec![Ok(1), Ok(2)]);
441    /// });
442    /// ```
443    fn try_take_while<Fut, F>(self, f: F) -> TryTakeWhile<Self, Fut, F>
444    where
445        F: FnMut(&Self::Ok) -> Fut,
446        Fut: TryFuture<Ok = bool, Error = Self::Error>,
447        Self: Sized,
448    {
449        TryTakeWhile::new(self, f)
450    }
451
452    /// Attempt to transform a stream into a collection,
453    /// returning a future representing the result of that computation.
454    ///
455    /// This combinator will collect all successful results of this stream and
456    /// collect them into the specified collection type. If an error happens then all
457    /// collected elements will be dropped and the error will be returned.
458    ///
459    /// The returned future will be resolved when the stream terminates.
460    ///
461    /// # Examples
462    ///
463    /// ```
464    /// use tokio_stream_util::TryStreamExt;
465    ///
466    /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
467    /// rt.block_on(async {
468    ///     let future = tokio_stream::iter(vec![Ok::<i32, i32>(1), Ok(2), Err(3)])
469    ///         .try_collect::<Vec<_>>();
470    ///
471    ///     assert_eq!(future.await, Err(3));
472    /// });
473    /// ```
474    fn try_collect<C: Default + Extend<Self::Ok>>(self) -> TryCollect<Self, C>
475    where
476        Self: Sized,
477    {
478        TryCollect::new(self)
479    }
480
481    /// An adaptor for chunking up successful items of the stream inside a vector.
482    ///
483    /// This combinator will attempt to pull successful items from this stream and buffer
484    /// them into a local vector. At most `capacity` items will get buffered
485    /// before they're yielded from the returned stream.
486    ///
487    /// Note that the vectors returned from this iterator may not always have
488    /// `capacity` elements. If the underlying stream ended and only a partial
489    /// vector was created, it'll be returned. Additionally if an error happens
490    /// from the underlying stream then the currently buffered items will be
491    /// yielded.
492    ///
493    /// This method is only available when the `std` or `alloc` feature of this
494    /// library is activated, and it is activated by default.
495    ///
496    /// This function is similar to
497    /// [`StreamExt::chunks`](futures_util::stream::StreamExt::chunks) but exits
498    /// early if an error occurs.
499    ///
500    /// # Examples
501    ///
502    /// ```
503    /// use tokio_stream::StreamExt;
504    /// use tokio_stream_util::TryStreamExt;
505    /// use tokio_stream_util::try_stream::TryChunksError;
506    ///
507    /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
508    /// rt.block_on(async {
509    ///     let stream = tokio_stream::iter(vec![
510    ///         Ok::<i32, i32>(1), Ok(2), Ok(3), Err(4), Ok(5), Ok(6)
511    ///     ]).try_chunks(2);
512    ///
513    ///     let out = stream.into_stream().collect::<Vec<_>>().await;
514    ///     assert_eq!(out, vec![
515    ///         Ok(vec![1, 2]),
516    ///         Ok(vec![3]),
517    ///         Err(TryChunksError::<i32, i32>(vec![], 4)),
518    ///         Ok(vec![5, 6]),
519    ///     ]);
520    /// });
521    /// ```
522    ///
523    /// # Panics
524    ///
525    /// This method will panic if `capacity` is zero.
526    #[cfg(feature = "alloc")]
527    fn try_chunks(self, capacity: usize) -> TryChunks<Self>
528    where
529        <IntoFuseStream<Self> as tokio_stream::Stream>::Item: core::fmt::Debug,
530        Self: Sized,
531    {
532        TryChunks::new(self, capacity)
533    }
534
535    /// An adaptor for chunking up successful, ready items of the stream inside a vector.
536    ///
537    /// This combinator will attempt to pull successful items from this stream and buffer
538    /// them into a local vector. At most `capacity` items will get buffered
539    /// before they're yielded from the returned stream. If the underlying stream
540    /// returns `Poll::Pending`, and the collected chunk is not empty, it will
541    /// be immediately returned.
542    ///
543    /// Note that the vectors returned from this iterator may not always have
544    /// `capacity` elements. If the underlying stream ended and only a partial
545    /// vector was created, it'll be returned. Additionally if an error happens
546    /// from the underlying stream then the currently buffered items will be
547    /// yielded.
548    ///
549    /// This method is only available when the `std` or `alloc` feature of this
550    /// library is activated, and it is activated by default.
551    ///
552    /// This function is similar to
553    /// [`StreamExt::ready_chunks`](futures_util::stream::StreamExt::ready_chunks) but exits
554    /// early if an error occurs.
555    ///
556    /// # Examples
557    ///
558    /// ```
559    /// use tokio_stream::StreamExt;
560    /// use tokio_stream_util::TryStreamExt;
561    /// use tokio_stream_util::try_stream::TryReadyChunksError;
562    ///
563    /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
564    /// rt.block_on(async {
565    ///     let stream = tokio_stream::iter(vec![
566    ///         Ok::<i32, i32>(1), Ok(2), Ok(3), Err(4), Ok(5), Ok(6)
567    ///     ]).try_ready_chunks(2);
568    ///
569    ///     let out = stream.into_stream().collect::<Vec<_>>().await;
570    ///     assert_eq!(out, vec![
571    ///         Ok(vec![1, 2]),
572    ///         Ok(vec![3]),
573    ///         Err(TryReadyChunksError::<i32, i32>(vec![], 4)),
574    ///         Ok(vec![5, 6]),
575    ///     ]);
576    /// });
577    /// ```
578    ///
579    /// # Panics
580    ///
581    /// This method will panic if `capacity` is zero.
582    #[cfg(feature = "alloc")]
583    fn try_ready_chunks(self, capacity: usize) -> TryReadyChunks<Self>
584    where
585        Self: Sized,
586    {
587        TryReadyChunks::new(self, capacity)
588    }
589
590    /// Attempt to filter the values produced by this stream according to the
591    /// provided asynchronous closure.
592    ///
593    /// As values of this stream are made available, the provided predicate `f`
594    /// will be run on them. If the predicate returns a `Future` which resolves
595    /// to `true`, then the stream will yield the value, but if the predicate
596    /// return a `Future` which resolves to `false`, then the value will be
597    /// discarded and the next value will be produced.
598    ///
599    /// All errors are passed through without filtering in this combinator.
600    ///
601    /// Note that this function consumes the stream passed into it and returns a
602    /// wrapped version of it, similar to the existing `filter` methods in
603    /// the standard library.
604    ///
605    /// # Examples
606    /// ```
607    /// use tokio_stream::StreamExt;
608    /// use tokio_stream_util::TryStreamExt;
609    ///
610    /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
611    /// rt.block_on(async {
612    ///     let events = tokio_stream::iter(vec![Ok::<i32, &str>(1), Ok(2), Ok(3), Err("error")])
613    ///         .try_filter(|x| {
614    ///             let v = *x;
615    ///             async move { v >= 2 }
616    ///         });
617    ///
618    ///     let out = events.into_stream().collect::<Vec<_>>().await;
619    ///     assert_eq!(out, vec![Ok(2), Ok(3), Err("error")]);
620    /// });
621    /// ```
622    fn try_filter<Fut, F>(self, f: F) -> TryFilter<Self, Fut, F>
623    where
624        Fut: Future<Output = bool>,
625        F: FnMut(&Self::Ok) -> Fut,
626        Self: Sized,
627    {
628        TryFilter::new(self, f)
629    }
630
631    /// Attempt to filter the values produced by this stream while
632    /// simultaneously mapping them to a different type according to the
633    /// provided asynchronous closure.
634    ///
635    /// As values of this stream are made available, the provided function will
636    /// be run on them. If the future returned by the predicate `f` resolves to
637    /// [`Some(item)`](Some) then the stream will yield the value `item`, but if
638    /// it resolves to [`None`] then the next value will be produced.
639    ///
640    /// All errors are passed through without filtering in this combinator.
641    ///
642    /// Note that this function consumes the stream passed into it and returns a
643    /// wrapped version of it, similar to the existing `filter_map` methods in
644    /// the standard library.
645    ///
646    /// # Examples
647    /// ```
648    /// use tokio_stream::StreamExt;
649    /// use tokio_stream_util::TryStreamExt;
650    ///
651    /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
652    /// rt.block_on(async {
653    ///     let halves = tokio_stream::iter(vec![Ok::<i32, &str>(1), Ok(6), Err("error")])
654    ///         .try_filter_map(|x| async move {
655    ///             let ret = if x % 2 == 0 { Some(x / 2) } else { None };
656    ///             Ok(ret)
657    ///         });
658    ///
659    ///     let out = halves.into_stream().collect::<Vec<_>>().await;
660    ///     assert_eq!(out, vec![Ok(3), Err("error")]);
661    /// });
662    /// ```
663    fn try_filter_map<Fut, F, T>(self, f: F) -> TryFilterMap<Self, Fut, F>
664    where
665        Fut: TryFuture<Ok = Option<T>, Error = Self::Error>,
666        F: FnMut(Self::Ok) -> Fut,
667        Self: Sized,
668    {
669        TryFilterMap::new(self, f)
670    }
671
672    /// Flattens a stream of streams into just one continuous stream. Produced streams
673    /// will be polled concurrently and any errors will be passed through without looking at them.
674    /// If the underlying base stream returns an error, it will be **immediately** propagated.
675    ///
676    /// The only argument is an optional limit on the number of concurrently
677    /// polled streams. If this limit is not `None`, no more than `limit` streams
678    /// will be polled at the same time. The `limit` argument is of type
679    /// `Into<Option<usize>>`, and so can be provided as either `None`,
680    /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as
681    /// no limit at all, and will have the same result as passing in `None`.
682    ///
683    /// # Examples
684    ///
685    /// ```
686    /// use tokio_stream::StreamExt;
687    /// use tokio_stream_util::TryStreamExt;
688    ///
689    /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
690    /// rt.block_on(async {
691    ///     let inner1 = tokio_stream::iter(vec![Ok::<i32, i32>(1), Ok(2)]);
692    ///     let inner2 = tokio_stream::iter(vec![Ok::<i32, i32>(3), Ok(4)]);
693    ///     let base = tokio_stream::iter(vec![Ok::<_, i32>(inner1), Ok::<_, i32>(inner2)]);
694    ///     let stream = base.try_flatten_unordered(None);
695    ///
696    ///     let mut out = stream.into_stream().collect::<Vec<_>>().await;
697    ///     out.sort_by_key(|r| r.clone().unwrap_or_default());
698    ///     assert_eq!(out, vec![Ok(1), Ok(2), Ok(3), Ok(4)]);
699    /// });
700    /// ```
701    #[cfg(all(feature = "alloc", feature = "std"))]
702    #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
703    fn try_flatten_unordered(self, limit: impl Into<Option<usize>>) -> TryFlattenUnordered<Self>
704    where
705        Self: Sized,
706        Self::Ok: TryStream + Unpin,
707        <Self::Ok as TryStream>::Error: From<Self::Error>,
708    {
709        TryFlattenUnordered::new(self, limit.into())
710    }
711
712    /// Flattens a stream of streams into just one continuous stream.
713    ///
714    /// If this stream's elements are themselves streams then this combinator
715    /// will flatten out the entire stream to one long chain of elements. Any
716    /// errors are passed through without looking at them, but otherwise each
717    /// individual stream will get exhausted before moving on to the next.
718    ///
719    /// # Examples
720    ///
721    /// ```
722    /// use tokio_stream::StreamExt;
723    /// use tokio_stream_util::TryStreamExt;
724    ///
725    /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
726    /// rt.block_on(async {
727    ///     let inner1 = tokio_stream::iter(vec![Ok::<i32, i32>(1), Ok(2)]);
728    ///     let inner2 = tokio_stream::iter(vec![Ok::<i32, i32>(3), Ok(4)]);
729    ///     let base = tokio_stream::iter(vec![Ok::<_, i32>(inner1), Ok::<_, i32>(inner2)]);
730    ///     let stream = base.try_flatten();
731    ///
732    ///     let out = stream.into_stream().collect::<Vec<_>>().await;
733    ///     assert_eq!(out, vec![Ok(1), Ok(2), Ok(3), Ok(4)]);
734    /// });
735    /// ```
736    fn try_flatten(self) -> TryFlatten<Self>
737    where
738        Self::Ok: TryStream,
739        <Self::Ok as TryStream>::Error: From<Self::Error>,
740        Self: Sized,
741    {
742        TryFlatten::new(self)
743    }
744
745    /// Attempt to concatenate all items of a stream into a single
746    /// extendable destination, returning a future representing the end result.
747    ///
748    /// This combinator will extend the first item with the contents of all
749    /// the subsequent successful results of the stream. If the stream is empty,
750    /// the default value will be returned.
751    ///
752    /// Works with all collections that implement the [`Extend`](core::iter::Extend) trait.
753    ///
754    /// This method is similar to [`concat`](futures_util::stream::StreamExt::concat), but will
755    /// exit early if an error is encountered in the stream.
756    ///
757    /// # Examples
758    ///
759    /// ```
760    /// use tokio_stream_util::TryStreamExt;
761    ///
762    /// #[tokio::main]
763    /// async fn main() {
764    ///     let fut = tokio_stream::iter(vec![
765    ///         Ok::<Vec<i32>, ()>(vec![1, 2, 3]),
766    ///         Ok(vec![4, 5, 6]),
767    ///         Ok(vec![7, 8, 9]),
768    ///     ]).try_concat();
769    ///
770    ///     assert_eq!(fut.await, Ok(vec![1, 2, 3, 4, 5, 6, 7, 8, 9]));
771    /// }
772    /// ```
773    fn try_concat(self) -> TryConcat<Self>
774    where
775        Self: Sized,
776        Self::Ok: Extend<<<Self as TryStream>::Ok as IntoIterator>::Item> + IntoIterator + Default,
777    {
778        TryConcat::new(self)
779    }
780
781    /// Attempt to execute several futures from a stream concurrently (unordered).
782    ///
783    /// This stream's `Ok` type must be a [`TryFuture`](futures_core::future::TryFuture) with an `Error` type
784    /// that matches the stream's `Error` type.
785    ///
786    /// This adaptor will buffer up to `n` futures and then return their
787    /// outputs in the order in which they complete. If the underlying stream
788    /// returns an error, it will be immediately propagated.
789    ///
790    /// The limit argument is of type `Into<Option<usize>>`, and so can be
791    /// provided as either `None`, `Some(10)`, or just `10`. Note: a limit of zero is
792    /// interpreted as no limit at all, and will have the same result as passing in `None`.
793    ///
794    /// The returned stream will be a stream of results, each containing either
795    /// an error or a future's output. An error can be produced either by the
796    /// underlying stream itself or by one of the futures it yielded.
797    ///
798    /// This method is only available when the `std` or `alloc` feature of this
799    /// library is activated, and it is activated by default.
800    ///
801    /// # Examples
802    ///
803    /// ```
804    /// use tokio_stream::StreamExt;
805    /// use tokio_stream_util::TryStreamExt;
806    ///
807    ///   #[tokio::main]
808    ///   async fn main() {
809    ///     let unordered = tokio_stream::iter(vec![Ok::<i32, &str>(3), Ok(1), Ok(2)])
810    ///         .map_ok(async |i| if i == 2 { Err("error") } else { Ok(i) })
811    ///         .try_buffer_unordered(None);
812    ///
813    ///     let mut out = unordered.into_stream().collect::<Vec<_>>().await;
814    ///     out.sort();
815    ///     assert_eq!(out, vec![Ok(1), Ok(3), Err("error")]);
816    /// }
817    /// ```
818    #[cfg(feature = "alloc")]
819    #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
820    fn try_buffer_unordered(self, n: impl Into<Option<usize>>) -> TryBufferUnordered<Self>
821    where
822        Self::Ok: TryFuture<Error = Self::Error>,
823        Self: Sized,
824    {
825        TryBufferUnordered::new(self, n.into())
826    }
827
828    /// Attempt to execute several futures from a stream concurrently.
829    ///
830    /// This stream's `Ok` type must be a [`TryFuture`](futures_core::future::TryFuture) with an `Error` type
831    /// that matches the stream's `Error` type.
832    ///
833    /// This adaptor will buffer up to `n` futures and then return their
834    /// outputs in the same order as the underlying stream. If the underlying stream returns an error, it will
835    /// be immediately propagated.
836    ///
837    /// The limit argument is of type `Into<Option<usize>>`, and so can be
838    /// provided as either `None`, `Some(10)`, or just `10`. Note: a limit of zero is
839    /// interpreted as no limit at all, and will have the same result as passing in `None`.
840    ///
841    /// The returned stream will be a stream of results, each containing either
842    /// an error or a future's output. An error can be produced either by the
843    /// underlying stream itself or by one of the futures it yielded.
844    ///
845    /// This method is only available when the `std` or `alloc` feature of this
846    /// library is activated, and it is activated by default.
847    ///
848    /// # Examples
849    ///
850    /// ```
851    /// use tokio_stream::StreamExt;
852    /// use tokio_stream_util::TryStreamExt;
853    ///
854    /// #[tokio::main]
855    /// async fn main() {
856    ///     let buffered = tokio_stream::iter(vec![Ok::<i32, &str>(3), Ok(1), Ok(2)])
857    ///         .map_ok(|i| async move { if i == 2 { Err("error") } else { Ok(i) } })
858    ///         .try_buffered(None);
859    ///
860    ///     let out = buffered.into_stream().collect::<Vec<_>>().await;
861    ///     assert_eq!(out, vec![Ok(3), Ok(1), Err("error")]);
862    /// }
863    /// ```
864    #[cfg(feature = "alloc")]
865    #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
866    fn try_buffered(self, n: impl Into<Option<usize>>) -> TryBuffered<Self>
867    where
868        Self::Ok: TryFuture<Error = Self::Error>,
869        Self: Sized,
870    {
871        TryBuffered::new(self, n.into())
872    }
873
874    /// A convenience method for calling [`TryStream::try_poll_next`] on [`Unpin`]
875    /// stream types.
876    fn try_poll_next_unpin(
877        &mut self,
878        cx: &mut Context<'_>,
879    ) -> Poll<Option<Result<Self::Ok, Self::Error>>>
880    where
881        Self: Unpin,
882    {
883        Pin::new(self).try_poll_next(cx)
884    }
885
886    #[cfg(all(feature = "io", feature = "std"))]
887    /// Adapter that converts this stream into an [`AsyncBufRead`](crate::io::AsyncBufRead).
888    ///
889    /// This method is only available when the `std` feature of this
890    /// library is activated, and it is activated by default.
891    ///
892    /// # Examples
893    ///
894    /// ```
895    /// use tokio_stream_util::TryStreamExt;
896    ///
897    /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
898    /// rt.block_on(async {
899    ///     let reader = tokio_stream::iter([Ok::<_, std::io::Error>(vec![1, 2, 3])]).into_async_read();
900    ///     let mut buf_reader = tokio::io::BufReader::new(reader);
901    ///     let mut buf = Vec::new();
902    ///     use tokio::io::AsyncReadExt;
903    ///     buf_reader.read_to_end(&mut buf).await.unwrap();
904    ///     assert_eq!(buf, vec![1, 2, 3]);
905    /// });
906    /// ```
907    fn into_async_read(self) -> IntoAsyncRead<Self>
908    where
909        Self: Sized + TryStreamExt<Error = std::io::Error>,
910        Self::Ok: AsRef<[u8]>,
911    {
912        IntoAsyncRead::new(self)
913    }
914
915    /// Attempt to execute a predicate over an asynchronous stream and evaluate if all items
916    /// satisfy the predicate. Exits early if an `Err` is encountered or if an `Ok` item is found
917    /// that does not satisfy the predicate.
918    ///
919    /// # Examples
920    ///
921    /// ```
922    /// use tokio_stream::StreamExt;
923    /// use tokio_stream_util::TryStreamExt;
924    ///
925    /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
926    /// rt.block_on(async {
927    ///     let future_true = tokio_stream::iter(vec![1, 2, 3]).map(|i| Ok::<i32, &str>(i))
928    ///         .try_all(|i| async move { i > 0 });
929    ///     assert!(future_true.await.unwrap());
930    ///
931    ///     let future_err = tokio_stream::iter(vec![Ok::<i32, &str>(1), Err("err"), Ok(3)])
932    ///         .try_all(|i| async move { i > 0 });
933    ///     assert!(future_err.await.is_err());
934    /// });
935    /// ```
936    fn try_all<Fut, F>(self, f: F) -> TryAll<Self, Fut, F>
937    where
938        Self: Sized,
939        F: FnMut(Self::Ok) -> Fut,
940        Fut: Future<Output = bool>,
941    {
942        TryAll::new(self, f)
943    }
944
945    /// Attempt to execute a predicate over an asynchronous stream and evaluate if any items
946    /// satisfy the predicate. Exits early if an `Err` is encountered or if an `Ok` item is found
947    /// that satisfies the predicate.
948    ///
949    /// # Examples
950    ///
951    /// ```
952    /// use tokio_stream::StreamExt;
953    /// use tokio_stream_util::TryStreamExt;
954    ///
955    /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
956    /// rt.block_on(async {
957    ///     let future_true = tokio_stream::iter(0..10).map(|i| Ok::<i32, &str>(i))
958    ///         .try_any(|i| async move { i == 3 });
959    ///     assert!(future_true.await.unwrap());
960    ///
961    ///     let future_err = tokio_stream::iter(vec![Ok::<i32, &str>(1), Err("err"), Ok(3)])
962    ///         .try_any(|i| async move { i == 3 });
963    ///     assert!(future_err.await.is_err());
964    /// });
965    /// ```
966    fn try_any<Fut, F>(self, f: F) -> TryAny<Self, Fut, F>
967    where
968        Self: Sized,
969        F: FnMut(Self::Ok) -> Fut,
970        Fut: Future<Output = bool>,
971    {
972        TryAny::new(self, f)
973    }
974}