tokio_stream_util/try_stream/ext/mod.rs
1//! Streams
2//!
//! This module contains a number of functions for working with `Stream`s
4//! that return `Result`s, allowing for short-circuiting computations.
5
6use core::future::Future;
7use core::pin::Pin;
8use core::task::{Context, Poll};
9use futures_core::future::TryFuture;
10
11use super::TryStream;
12use crate::FusedStream;
13
14mod and_then;
15pub use and_then::AndThen;
16
17mod err_into;
18pub use err_into::ErrInto;
19
20mod inspect;
21pub use inspect::InspectOk;
22
23pub use inspect::InspectErr;
24
25mod into_stream;
26
27#[cfg(feature = "alloc")]
28#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
29pub(crate) use into_stream::IntoFuseStream;
30
31pub use into_stream::IntoStream;
32
33mod map_ok;
34pub use map_ok::MapOk;
35
36mod map_err;
37pub use map_err::MapErr;
38
39mod or_else;
40pub use or_else::OrElse;
41
42mod try_next;
43pub use try_next::TryNext;
44
45mod try_filter;
46pub use try_filter::TryFilter;
47
48mod try_filter_map;
49pub use try_filter_map::TryFilterMap;
50
51mod try_flatten;
52pub use try_flatten::TryFlatten;
53
54#[cfg(all(feature = "alloc", feature = "std"))]
55#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
56mod try_flatten_unordered;
57#[cfg(all(feature = "alloc", feature = "std"))]
58#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
59pub use try_flatten_unordered::TryFlattenUnordered;
60
61mod try_collect;
62pub use try_collect::TryCollect;
63
64mod try_concat;
65pub use try_concat::TryConcat;
66
67#[cfg(feature = "alloc")]
68mod try_chunks;
69#[cfg(feature = "alloc")]
70pub use try_chunks::{TryChunks, TryChunksError};
71
72#[cfg(feature = "alloc")]
73mod try_ready_chunks;
74#[cfg(feature = "alloc")]
75pub use try_ready_chunks::{TryReadyChunks, TryReadyChunksError};
76
77mod try_unfold;
78pub use try_unfold::{try_unfold, TryUnfold};
79
80mod try_skip_while;
81pub use try_skip_while::TrySkipWhile;
82
83mod try_take_while;
84pub use try_take_while::TryTakeWhile;
85
86#[cfg(feature = "alloc")]
87#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
88mod try_buffer_unordered;
89#[cfg(feature = "alloc")]
90#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
91pub use try_buffer_unordered::TryBufferUnordered;
92
93#[cfg(feature = "alloc")]
94#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
95mod try_buffered;
96#[cfg(feature = "alloc")]
97#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
98pub use try_buffered::TryBuffered;
99
100#[cfg(all(feature = "io", feature = "std"))]
101mod into_async_read;
102#[cfg(all(feature = "io", feature = "std"))]
103#[cfg_attr(docsrs, doc(cfg(feature = "io")))]
104pub use into_async_read::IntoAsyncRead;
105
106mod try_all;
107pub use try_all::TryAll;
108
109mod try_any;
110pub use try_any::TryAny;
111
112impl<S: ?Sized + TryStream> TryStreamExt for S {}
113
114/// Adapters specific to `Result`-returning streams
115pub trait TryStreamExt: TryStream {
116 /// Wraps the current stream in a new stream which converts the error type
117 /// into the one provided.
118 ///
119 /// # Examples
120 ///
121 /// ```
122 /// use tokio_stream::StreamExt;
123 /// use tokio_stream_util::TryStreamExt;
124 ///
125 /// #[tokio::main]
126 /// async fn main() {
127 /// let stream =
128 /// tokio_stream::iter(vec![Ok::<(), i32>(()), Err::<(), i32>(5)])
129 /// .err_into::<i64>();
130 ///
131 /// let collected = stream.into_stream().collect::<Vec<_>>().await;
132 /// assert_eq!(collected, vec![Ok(()), Err(5i64)]);
133 /// }
134 /// ```
135 fn err_into<E>(self) -> ErrInto<Self, E>
136 where
137 Self: Sized,
138 Self::Error: Into<E>,
139 {
140 ErrInto::new(self)
141 }
142
143 /// Wraps the current stream in a new stream which maps the success value
144 /// using the provided closure.
145 ///
146 /// # Examples
147 ///
148 /// ```
149 /// use tokio_stream::StreamExt;
150 /// use tokio_stream_util::TryStreamExt;
151 ///
152 /// #[tokio::main]
153 /// async fn main() {
154 /// let stream =
155 /// tokio_stream::iter(vec![Ok::<i32, i32>(5), Err::<i32, i32>(0)])
156 /// .map_ok(|x| x + 2);
157 ///
158 /// let out = stream.into_stream().collect::<Vec<_>>().await;
159 /// assert_eq!(out, vec![Ok(7), Err(0)]);
160 /// }
161 /// ```
162 fn map_ok<V, F>(self, f: F) -> MapOk<Self, V, F>
163 where
164 Self: TryStream + Unpin + Sized,
165 F: FnMut(Self::Ok) -> V,
166 {
167 MapOk::new(self, f)
168 }
169
170 /// Wraps the current stream in a new stream which maps the error value
171 /// using the provided closure.
172 ///
173 /// # Examples
174 ///
175 /// ```
176 /// use tokio_stream::StreamExt;
177 /// use tokio_stream_util::TryStreamExt;
178 ///
179 /// #[derive(Debug)]
180 /// struct MyErr(String);
181 /// impl core::fmt::Display for MyErr { fn fmt(&self, f:&mut core::fmt::Formatter<'_>)->core::fmt::Result{ self.0.fmt(f)} }
182 /// impl core::error::Error for MyErr {}
183 ///
184 /// let stream =
185 /// tokio_stream::iter(vec![Ok::<i32, i32>(5), Err::<i32, i32>(0)])
186 /// .map_err(|x| MyErr(format!("e{}", x)));
187 ///
188 /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
189 /// let out = rt.block_on(async { stream.into_stream().collect::<Vec<_>>().await });
190 /// assert_eq!(format!("{:?}", out), format!("{:?}", vec![Ok(5), Err(MyErr(String::from("e0")))]));
191 /// ```
192 fn map_err<E, F>(self, f: F) -> MapErr<Self, E, F>
193 where
194 Self: TryStream + Unpin + Sized,
195 E: core::error::Error,
196 F: FnMut(Self::Error) -> E,
197 {
198 MapErr::new(self, f)
199 }
200
201 /// Chain on a computation for when a value is ready, passing the successful
202 /// results to the provided closure `f`.
203 ///
204 /// This function can be used to run a unit of work when the next successful
205 /// value on a stream is ready. The closure provided will be yielded a value
206 /// when ready, and the returned future will then be run to completion to
207 /// produce the next value on this stream.
208 ///
209 /// Any errors produced by this stream will not be passed to the closure,
210 /// and will be passed through.
211 ///
212 /// The returned value of the closure must implement the `TryFuture` trait
213 /// and can represent some more work to be done before the composed stream
214 /// is finished.
215 ///
216 /// Note that this function consumes the receiving stream and returns a
217 /// wrapped version of it.
218 ///
219 /// To process the entire stream and return a single future representing
220 /// success or error, use `try_for_each` instead.
221 ///
222 /// # Examples
223 ///
224 /// ```
225 /// use tokio_stream::StreamExt;
226 /// use tokio_stream_util::TryStreamExt;
227 ///
228 /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
229 /// rt.block_on(async {
230 /// let stream = tokio_stream::iter(vec![Ok::<i32, ()>(1), Ok(2)])
231 /// .and_then(|result| async move {
232 /// Ok(if result % 2 == 0 { Some(result) } else { None })
233 /// });
234 ///
235 /// let out = stream.into_stream().collect::<Vec<_>>().await;
236 /// assert_eq!(out, vec![Ok(None), Ok(Some(2))]);
237 /// });
238 /// ```
239 fn and_then<Fut, F>(self, f: F) -> AndThen<Self, Fut, F>
240 where
241 F: FnMut(Self::Ok) -> Fut,
242 Fut: TryFuture<Error = Self::Error>,
243 Self: Sized,
244 {
245 AndThen::new(self, f)
246 }
247
248 /// Chain on a computation for when an error happens, passing the
249 /// erroneous result to the provided closure `f`.
250 ///
251 /// This function can be used to run a unit of work and attempt to recover from
252 /// an error if one happens. The closure provided will be yielded an error
253 /// when one appears, and the returned future will then be run to completion
254 /// to produce the next value on this stream.
255 ///
256 /// Any successful values produced by this stream will not be passed to the
257 /// closure, and will be passed through.
258 ///
259 /// The returned value of the closure must implement the [`TryFuture`](futures_core::future::TryFuture) trait
260 /// and can represent some more work to be done before the composed stream
261 /// is finished.
262 ///
263 /// Note that this function consumes the receiving stream and returns a
264 /// wrapped version of it.
265 fn or_else<Fut, F>(self, f: F) -> OrElse<Self, Fut, F>
266 where
267 F: FnMut(Self::Error) -> Fut,
268 Fut: TryFuture<Ok = Self::Ok>,
269 Self: Sized,
270 {
271 OrElse::new(self, f)
272 }
273
274 /// Do something with the success value of this stream, afterwards passing
275 /// it on.
276 ///
277 /// This is similar to the `StreamExt::inspect` method where it allows
278 /// easily inspecting the success value as it passes through the stream, for
279 /// example to debug what's going on.
280 fn inspect_ok<F>(self, f: F) -> InspectOk<Self, F>
281 where
282 F: FnMut(&Self::Ok),
283 Self: Sized,
284 {
285 InspectOk::new(self, f)
286 }
287
288 /// Do something with the error value of this stream, afterwards passing it on.
289 ///
290 /// This is similar to the `StreamExt::inspect` method where it allows
291 /// easily inspecting the error value as it passes through the stream, for
292 /// example to debug what's going on.
293 fn inspect_err<F>(self, f: F) -> InspectErr<Self, F>
294 where
295 F: FnMut(&Self::Error),
296 Self: Sized,
297 {
298 InspectErr::new(self, f)
299 }
300
301 /// Wraps a [`TryStream`] into a type that implements
302 /// [`Stream`](tokio_stream::Stream)
303 ///
304 /// [`TryStream`]s currently do not implement the
305 /// [`Stream`](tokio_stream::Stream) trait because of limitations
306 /// of the compiler.
307 ///
308 /// # Examples
309 ///
310 /// ```
311 /// use tokio_stream::StreamExt;
312 /// use tokio_stream_util::{TryStream, TryStreamExt};
313 ///
314 /// type T = i32;
315 /// type E = ();
316 ///
317 /// fn make_try_stream() -> impl TryStream<Ok = T, Error = E> {
318 /// tokio_stream::iter(vec![Ok::<T, E>(1), Ok(2), Err(())])
319 /// }
320 ///
321 /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
322 /// rt.block_on(async {
323 /// let out = make_try_stream().into_stream().collect::<Vec<_>>().await;
324 /// assert_eq!(out, vec![Ok(1), Ok(2), Err(())]);
325 /// });
326 /// ```
327 fn into_stream(self) -> IntoStream<Self>
328 where
329 Self: Sized,
330 {
331 IntoStream::new(self)
332 }
333
334 /// Creates a future that attempts to resolve the next item in the stream.
335 /// If an error is encountered before the next item, the error is returned
336 /// instead.
337 ///
338 /// This is similar to the `Stream::next` combinator, but returns a
339 /// `Result<Option<T>, E>` rather than an `Option<Result<T, E>>`, making
340 /// for easy use with the `?` operator.
341 ///
342 /// # Examples
343 ///
344 /// ```
345 /// use tokio_stream_util::TryStreamExt;
346 ///
347 /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
348 /// rt.block_on(async {
349 /// let mut stream = tokio_stream::iter(vec![Ok::<(), ()>(()), Err::<(), ()>(())]);
350 /// assert_eq!(stream.try_next().await, Ok(Some(())));
351 /// });
352 /// ```
353 fn try_next(&mut self) -> TryNext<'_, Self>
354 where
355 Self: Unpin,
356 {
357 TryNext::new(self)
358 }
359
360 /// Skip elements on this stream while the provided asynchronous predicate
361 /// resolves to `true`.
362 ///
363 /// This function is similar to
364 /// [`StreamExt::skip_while`](tokio_stream::stream::StreamExt::skip_while) but exits
365 /// early if an error occurs.
366 ///
367 /// # Examples
368 ///
369 /// ```
370 /// use tokio_stream::StreamExt;
371 /// use tokio_stream_util::TryStreamExt;
372 ///
373 /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
374 /// rt.block_on(async {
375 /// let stream = tokio_stream::iter(vec![Ok::<i32, i32>(1), Ok(3), Ok(2)])
376 /// .try_skip_while(|x| {
377 /// let v = *x;
378 /// async move { Ok(v < 3) }
379 /// });
380 ///
381 /// let out = stream.into_stream().collect::<Vec<_>>().await;
382 /// assert_eq!(out, vec![Ok(3), Ok(2)]);
383 /// });
384 /// ```
385 fn try_skip_while<Fut, F>(self, f: F) -> TrySkipWhile<Self, Fut, F>
386 where
387 F: FnMut(&Self::Ok) -> Fut,
388 Fut: TryFuture<Ok = bool, Error = Self::Error>,
389 Self: Sized,
390 {
391 TrySkipWhile::new(self, f)
392 }
393
394 /// Take elements on this stream while the provided asynchronous predicate
395 /// resolves to `true`.
396 ///
397 /// This function is similar to
398 /// [`StreamExt::take_while`](tokio_stream::stream::StreamExt::take_while) but exits
399 /// early if an error occurs.
400 ///
401 /// # Examples
402 ///
403 /// ```
404 /// use tokio_stream::StreamExt;
405 /// use tokio_stream_util::TryStreamExt;
406 ///
407 /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
408 /// rt.block_on(async {
409 /// let stream = tokio_stream::iter(vec![Ok::<i32, i32>(1), Ok(2), Ok(3), Ok(2)])
410 /// .try_take_while(|x| {
411 /// let v = *x;
412 /// async move { Ok(v < 3) }
413 /// });
414 ///
415 /// let out = stream.into_stream().collect::<Vec<_>>().await;
416 /// assert_eq!(out, vec![Ok(1), Ok(2)]);
417 /// });
418 /// ```
419 fn try_take_while<Fut, F>(self, f: F) -> TryTakeWhile<Self, Fut, F>
420 where
421 F: FnMut(&Self::Ok) -> Fut,
422 Fut: TryFuture<Ok = bool, Error = Self::Error>,
423 Self: Sized,
424 {
425 TryTakeWhile::new(self, f)
426 }
427
428 /// Attempt to transform a stream into a collection,
429 /// returning a future representing the result of that computation.
430 ///
431 /// This combinator will collect all successful results of this stream and
432 /// collect them into the specified collection type. If an error happens then all
433 /// collected elements will be dropped and the error will be returned.
434 ///
435 /// The returned future will be resolved when the stream terminates.
436 ///
437 /// # Examples
438 ///
439 /// ```
440 /// use tokio_stream_util::TryStreamExt;
441 ///
442 /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
443 /// rt.block_on(async {
444 /// let future = tokio_stream::iter(vec![Ok::<i32, i32>(1), Ok(2), Err(3)])
445 /// .try_collect::<Vec<_>>();
446 ///
447 /// assert_eq!(future.await, Err(3));
448 /// });
449 /// ```
450 fn try_collect<C: Default + Extend<Self::Ok>>(self) -> TryCollect<Self, C>
451 where
452 Self: Sized,
453 {
454 TryCollect::new(self)
455 }
456
457 /// An adaptor for chunking up successful items of the stream inside a vector.
458 ///
459 /// This combinator will attempt to pull successful items from this stream and buffer
460 /// them into a local vector. At most `capacity` items will get buffered
461 /// before they're yielded from the returned stream.
462 ///
463 /// Note that the vectors returned from this iterator may not always have
464 /// `capacity` elements. If the underlying stream ended and only a partial
465 /// vector was created, it'll be returned. Additionally if an error happens
466 /// from the underlying stream then the currently buffered items will be
467 /// yielded.
468 ///
469 /// This method is only available when the `std` or `alloc` feature of this
470 /// library is activated, and it is activated by default.
471 ///
472 /// This function is similar to
473 /// [`StreamExt::chunks`](tokio_stream::stream::StreamExt::chunks) but exits
474 /// early if an error occurs.
475 ///
476 /// # Examples
477 ///
478 /// ```
479 /// use tokio_stream::StreamExt;
480 /// use tokio_stream_util::TryStreamExt;
481 /// use tokio_stream_util::try_stream::TryChunksError;
482 ///
483 /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
484 /// rt.block_on(async {
485 /// let stream = tokio_stream::iter(vec![
486 /// Ok::<i32, i32>(1), Ok(2), Ok(3), Err(4), Ok(5), Ok(6)
487 /// ]).try_chunks(2);
488 ///
489 /// let out = stream.into_stream().collect::<Vec<_>>().await;
490 /// assert_eq!(out, vec![
491 /// Ok(vec![1, 2]),
492 /// Ok(vec![3]),
493 /// Err(TryChunksError::<i32, i32>(vec![], 4)),
494 /// Ok(vec![5, 6]),
495 /// ]);
496 /// });
497 /// ```
498 ///
499 /// # Panics
500 ///
501 /// This method will panic if `capacity` is zero.
502 #[cfg(feature = "alloc")]
503 fn try_chunks(self, capacity: usize) -> TryChunks<Self>
504 where
505 <IntoFuseStream<Self> as tokio_stream::Stream>::Item: core::fmt::Debug,
506 Self: Sized,
507 {
508 TryChunks::new(self, capacity)
509 }
510
511 /// An adaptor for chunking up successful, ready items of the stream inside a vector.
512 ///
513 /// This combinator will attempt to pull successful items from this stream and buffer
514 /// them into a local vector. At most `capacity` items will get buffered
515 /// before they're yielded from the returned stream. If the underlying stream
516 /// returns `Poll::Pending`, and the collected chunk is not empty, it will
517 /// be immediately returned.
518 ///
519 /// Note that the vectors returned from this iterator may not always have
520 /// `capacity` elements. If the underlying stream ended and only a partial
521 /// vector was created, it'll be returned. Additionally if an error happens
522 /// from the underlying stream then the currently buffered items will be
523 /// yielded.
524 ///
525 /// This method is only available when the `std` or `alloc` feature of this
526 /// library is activated, and it is activated by default.
527 ///
528 /// This function is similar to
529 /// [`StreamExt::ready_chunks`](tokio_stream::stream::StreamExt::ready_chunks) but exits
530 /// early if an error occurs.
531 ///
532 /// # Examples
533 ///
534 /// ```
535 /// use tokio_stream::StreamExt;
536 /// use tokio_stream_util::TryStreamExt;
537 /// use tokio_stream_util::try_stream::TryReadyChunksError;
538 ///
539 /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
540 /// rt.block_on(async {
541 /// let stream = tokio_stream::iter(vec![
542 /// Ok::<i32, i32>(1), Ok(2), Ok(3), Err(4), Ok(5), Ok(6)
543 /// ]).try_ready_chunks(2);
544 ///
545 /// let out = stream.into_stream().collect::<Vec<_>>().await;
546 /// assert_eq!(out, vec![
547 /// Ok(vec![1, 2]),
548 /// Ok(vec![3]),
549 /// Err(TryReadyChunksError::<i32, i32>(vec![], 4)),
550 /// Ok(vec![5, 6]),
551 /// ]);
552 /// });
553 /// ```
554 ///
555 /// # Panics
556 ///
557 /// This method will panic if `capacity` is zero.
558 #[cfg(feature = "alloc")]
559 fn try_ready_chunks(self, capacity: usize) -> TryReadyChunks<Self>
560 where
561 Self: Sized,
562 {
563 TryReadyChunks::new(self, capacity)
564 }
565
566 /// Attempt to filter the values produced by this stream according to the
567 /// provided asynchronous closure.
568 ///
569 /// As values of this stream are made available, the provided predicate `f`
570 /// will be run on them. If the predicate returns a `Future` which resolves
571 /// to `true`, then the stream will yield the value, but if the predicate
572 /// return a `Future` which resolves to `false`, then the value will be
573 /// discarded and the next value will be produced.
574 ///
575 /// All errors are passed through without filtering in this combinator.
576 ///
577 /// Note that this function consumes the stream passed into it and returns a
578 /// wrapped version of it, similar to the existing `filter` methods in
579 /// the standard library.
580 ///
581 /// # Examples
582 /// ```
583 /// use tokio_stream::StreamExt;
584 /// use tokio_stream_util::TryStreamExt;
585 ///
586 /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
587 /// rt.block_on(async {
588 /// let events = tokio_stream::iter(vec![Ok::<i32, &str>(1), Ok(2), Ok(3), Err("error")])
589 /// .try_filter(|x| {
590 /// let v = *x;
591 /// async move { v >= 2 }
592 /// });
593 ///
594 /// let out = events.into_stream().collect::<Vec<_>>().await;
595 /// assert_eq!(out, vec![Ok(2), Ok(3), Err("error")]);
596 /// });
597 /// ```
598 fn try_filter<Fut, F>(self, f: F) -> TryFilter<Self, Fut, F>
599 where
600 Fut: Future<Output = bool>,
601 F: FnMut(&Self::Ok) -> Fut,
602 Self: Sized,
603 {
604 TryFilter::new(self, f)
605 }
606
607 /// Attempt to filter the values produced by this stream while
608 /// simultaneously mapping them to a different type according to the
609 /// provided asynchronous closure.
610 ///
611 /// As values of this stream are made available, the provided function will
612 /// be run on them. If the future returned by the predicate `f` resolves to
613 /// [`Some(item)`](Some) then the stream will yield the value `item`, but if
614 /// it resolves to [`None`] then the next value will be produced.
615 ///
616 /// All errors are passed through without filtering in this combinator.
617 ///
618 /// Note that this function consumes the stream passed into it and returns a
619 /// wrapped version of it, similar to the existing `filter_map` methods in
620 /// the standard library.
621 ///
622 /// # Examples
623 /// ```
624 /// use tokio_stream::StreamExt;
625 /// use tokio_stream_util::TryStreamExt;
626 ///
627 /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
628 /// rt.block_on(async {
629 /// let halves = tokio_stream::iter(vec![Ok::<i32, &str>(1), Ok(6), Err("error")])
630 /// .try_filter_map(|x| async move {
631 /// let ret = if x % 2 == 0 { Some(x / 2) } else { None };
632 /// Ok(ret)
633 /// });
634 ///
635 /// let out = halves.into_stream().collect::<Vec<_>>().await;
636 /// assert_eq!(out, vec![Ok(3), Err("error")]);
637 /// });
638 /// ```
639 fn try_filter_map<Fut, F, T>(self, f: F) -> TryFilterMap<Self, Fut, F>
640 where
641 Fut: TryFuture<Ok = Option<T>, Error = Self::Error>,
642 F: FnMut(Self::Ok) -> Fut,
643 Self: Sized,
644 {
645 TryFilterMap::new(self, f)
646 }
647
648 /// Flattens a stream of streams into just one continuous stream. Produced streams
649 /// will be polled concurrently and any errors will be passed through without looking at them.
650 /// If the underlying base stream returns an error, it will be **immediately** propagated.
651 ///
652 /// The only argument is an optional limit on the number of concurrently
653 /// polled streams. If this limit is not `None`, no more than `limit` streams
654 /// will be polled at the same time. The `limit` argument is of type
655 /// `Into<Option<usize>>`, and so can be provided as either `None`,
656 /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as
657 /// no limit at all, and will have the same result as passing in `None`.
658 ///
659 /// # Examples
660 ///
661 /// ```
662 /// use tokio_stream::StreamExt;
663 /// use tokio_stream_util::TryStreamExt;
664 ///
665 /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
666 /// rt.block_on(async {
667 /// let inner1 = tokio_stream::iter(vec![Ok::<i32, i32>(1), Ok(2)]);
668 /// let inner2 = tokio_stream::iter(vec![Ok::<i32, i32>(3), Ok(4)]);
669 /// let base = tokio_stream::iter(vec![Ok::<_, i32>(inner1), Ok::<_, i32>(inner2)]);
670 /// let stream = base.try_flatten_unordered(None);
671 ///
672 /// let mut out = stream.into_stream().collect::<Vec<_>>().await;
673 /// out.sort_by_key(|r| r.clone().unwrap_or_default());
674 /// assert_eq!(out, vec![Ok(1), Ok(2), Ok(3), Ok(4)]);
675 /// });
676 /// ```
677 #[cfg(all(feature = "alloc", feature = "std"))]
678 #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
679 fn try_flatten_unordered(self, limit: impl Into<Option<usize>>) -> TryFlattenUnordered<Self>
680 where
681 Self: Sized,
682 Self::Ok: TryStream + Unpin,
683 <Self::Ok as TryStream>::Error: From<Self::Error>,
684 {
685 TryFlattenUnordered::new(self, limit.into())
686 }
687
688 /// Flattens a stream of streams into just one continuous stream.
689 ///
690 /// If this stream's elements are themselves streams then this combinator
691 /// will flatten out the entire stream to one long chain of elements. Any
692 /// errors are passed through without looking at them, but otherwise each
693 /// individual stream will get exhausted before moving on to the next.
694 ///
695 /// # Examples
696 ///
697 /// ```
698 /// use tokio_stream::StreamExt;
699 /// use tokio_stream_util::TryStreamExt;
700 ///
701 /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
702 /// rt.block_on(async {
703 /// let inner1 = tokio_stream::iter(vec![Ok::<i32, i32>(1), Ok(2)]);
704 /// let inner2 = tokio_stream::iter(vec![Ok::<i32, i32>(3), Ok(4)]);
705 /// let base = tokio_stream::iter(vec![Ok::<_, i32>(inner1), Ok::<_, i32>(inner2)]);
706 /// let stream = base.try_flatten();
707 ///
708 /// let out = stream.into_stream().collect::<Vec<_>>().await;
709 /// assert_eq!(out, vec![Ok(1), Ok(2), Ok(3), Ok(4)]);
710 /// });
711 /// ```
712 fn try_flatten(self) -> TryFlatten<Self>
713 where
714 Self::Ok: TryStream,
715 <Self::Ok as TryStream>::Error: From<Self::Error>,
716 Self: Sized,
717 {
718 TryFlatten::new(self)
719 }
720
721 /// Attempt to concatenate all items of a stream into a single
722 /// extendable destination, returning a future representing the end result.
723 ///
724 /// This combinator will extend the first item with the contents of all
725 /// the subsequent successful results of the stream. If the stream is empty,
726 /// the default value will be returned.
727 ///
728 /// Works with all collections that implement the [`Extend`](core::iter::Extend) trait.
729 ///
730 /// This method is similar to [`concat`](tokio_stream::stream::StreamExt::concat), but will
731 /// exit early if an error is encountered in the stream.
732 ///
733 /// # Examples
734 ///
735 /// ```
736 /// use tokio_stream_util::TryStreamExt;
737 ///
738 /// #[tokio::main]
739 /// async fn main() {
740 /// let fut = tokio_stream::iter(vec![
741 /// Ok::<Vec<i32>, ()>(vec![1, 2, 3]),
742 /// Ok(vec![4, 5, 6]),
743 /// Ok(vec![7, 8, 9]),
744 /// ]).try_concat();
745 ///
746 /// assert_eq!(fut.await, Ok(vec![1, 2, 3, 4, 5, 6, 7, 8, 9]));
747 /// }
748 /// ```
749 fn try_concat(self) -> TryConcat<Self>
750 where
751 Self: Sized,
752 Self::Ok: Extend<<<Self as TryStream>::Ok as IntoIterator>::Item> + IntoIterator + Default,
753 {
754 TryConcat::new(self)
755 }
756
757 /// Attempt to execute several futures from a stream concurrently (unordered).
758 ///
759 /// This stream's `Ok` type must be a [`TryFuture`](futures_core::future::TryFuture) with an `Error` type
760 /// that matches the stream's `Error` type.
761 ///
762 /// This adaptor will buffer up to `n` futures and then return their
763 /// outputs in the order in which they complete. If the underlying stream
764 /// returns an error, it will be immediately propagated.
765 ///
766 /// The limit argument is of type `Into<Option<usize>>`, and so can be
767 /// provided as either `None`, `Some(10)`, or just `10`. Note: a limit of zero is
768 /// interpreted as no limit at all, and will have the same result as passing in `None`.
769 ///
770 /// The returned stream will be a stream of results, each containing either
771 /// an error or a future's output. An error can be produced either by the
772 /// underlying stream itself or by one of the futures it yielded.
773 ///
774 /// This method is only available when the `std` or `alloc` feature of this
775 /// library is activated, and it is activated by default.
776 ///
777 /// # Examples
778 ///
779 /// ```
780 /// use tokio_stream::StreamExt;
781 /// use tokio_stream_util::TryStreamExt;
782 ///
783 /// #[tokio::main]
784 /// async fn main() {
785 /// let unordered = tokio_stream::iter(vec![Ok::<i32, &str>(3), Ok(1), Ok(2)])
786 /// .map_ok(async |i| if i == 2 { Err("error") } else { Ok(i) })
787 /// .try_buffer_unordered(None);
788 ///
789 /// let mut out = unordered.into_stream().collect::<Vec<_>>().await;
790 /// out.sort();
791 /// assert_eq!(out, vec![Ok(1), Ok(3), Err("error")]);
792 /// }
793 /// ```
794 #[cfg(feature = "alloc")]
795 #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
796 fn try_buffer_unordered(self, n: impl Into<Option<usize>>) -> TryBufferUnordered<Self>
797 where
798 Self::Ok: TryFuture<Error = Self::Error>,
799 Self: Sized,
800 {
801 TryBufferUnordered::new(self, n.into())
802 }
803
804 /// Attempt to execute several futures from a stream concurrently.
805 ///
806 /// This stream's `Ok` type must be a [`TryFuture`](futures_core::future::TryFuture) with an `Error` type
807 /// that matches the stream's `Error` type.
808 ///
809 /// This adaptor will buffer up to `n` futures and then return their
810 /// outputs in the same order as the underlying stream. If the underlying stream returns an error, it will
811 /// be immediately propagated.
812 ///
813 /// The limit argument is of type `Into<Option<usize>>`, and so can be
814 /// provided as either `None`, `Some(10)`, or just `10`. Note: a limit of zero is
815 /// interpreted as no limit at all, and will have the same result as passing in `None`.
816 ///
817 /// The returned stream will be a stream of results, each containing either
818 /// an error or a future's output. An error can be produced either by the
819 /// underlying stream itself or by one of the futures it yielded.
820 ///
821 /// This method is only available when the `std` or `alloc` feature of this
822 /// library is activated, and it is activated by default.
823 ///
824 /// # Examples
825 ///
826 /// ```
827 /// use tokio_stream::StreamExt;
828 /// use tokio_stream_util::TryStreamExt;
829 ///
830 /// #[tokio::main]
831 /// async fn main() {
832 /// let buffered = tokio_stream::iter(vec![Ok::<i32, &str>(3), Ok(1), Ok(2)])
833 /// .map_ok(|i| async move { if i == 2 { Err("error") } else { Ok(i) } })
834 /// .try_buffered(None);
835 ///
836 /// let out = buffered.into_stream().collect::<Vec<_>>().await;
837 /// assert_eq!(out, vec![Ok(3), Ok(1), Err("error")]);
838 /// }
839 /// ```
840 #[cfg(feature = "alloc")]
841 #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
842 fn try_buffered(self, n: impl Into<Option<usize>>) -> TryBuffered<Self>
843 where
844 Self::Ok: TryFuture<Error = Self::Error>,
845 Self: Sized,
846 {
847 TryBuffered::new(self, n.into())
848 }
849
850 /// A convenience method for calling [`TryStream::try_poll_next`] on [`Unpin`]
851 /// stream types.
852 fn try_poll_next_unpin(
853 &mut self,
854 cx: &mut Context<'_>,
855 ) -> Poll<Option<Result<Self::Ok, Self::Error>>>
856 where
857 Self: Unpin,
858 {
859 Pin::new(self).try_poll_next(cx)
860 }
861
862 /// Adapter that converts this stream into an [`AsyncBufRead`](crate::io::AsyncBufRead).
863 ///
864 /// This method is only available when the `std` feature of this
865 /// library is activated, and it is activated by default.
866 ///
867 /// # Examples
868 ///
869 /// ```
870 /// use tokio_stream_util::TryStreamExt;
871 ///
872 /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
873 /// rt.block_on(async {
874 /// let reader = tokio_stream::iter([Ok::<_, std::io::Error>(vec![1, 2, 3])]).into_async_read();
875 /// let mut buf_reader = tokio::io::BufReader::new(reader);
876 /// let mut buf = Vec::new();
877 /// use tokio::io::AsyncReadExt;
878 /// buf_reader.read_to_end(&mut buf).await.unwrap();
879 /// assert_eq!(buf, vec![1, 2, 3]);
880 /// });
881 /// ```
882 #[cfg(feature = "io")]
883 #[cfg(feature = "std")]
884 #[cfg_attr(docsrs, doc(cfg(feature = "io")))]
885 fn into_async_read(self) -> IntoAsyncRead<Self>
886 where
887 Self: Sized + TryStreamExt<Error = std::io::Error>,
888 Self::Ok: AsRef<[u8]>,
889 {
890 IntoAsyncRead::new(self)
891 }
892
893 /// Attempt to execute a predicate over an asynchronous stream and evaluate if all items
894 /// satisfy the predicate. Exits early if an `Err` is encountered or if an `Ok` item is found
895 /// that does not satisfy the predicate.
896 ///
897 /// # Examples
898 ///
899 /// ```
900 /// use tokio_stream::StreamExt;
901 /// use tokio_stream_util::TryStreamExt;
902 ///
903 /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
904 /// rt.block_on(async {
905 /// let future_true = tokio_stream::iter(vec![1, 2, 3]).map(|i| Ok::<i32, &str>(i))
906 /// .try_all(|i| async move { i > 0 });
907 /// assert!(future_true.await.unwrap());
908 ///
909 /// let future_err = tokio_stream::iter(vec![Ok::<i32, &str>(1), Err("err"), Ok(3)])
910 /// .try_all(|i| async move { i > 0 });
911 /// assert!(future_err.await.is_err());
912 /// });
913 /// ```
914 fn try_all<Fut, F>(self, f: F) -> TryAll<Self, Fut, F>
915 where
916 Self: Sized,
917 F: FnMut(Self::Ok) -> Fut,
918 Fut: Future<Output = bool>,
919 {
920 TryAll::new(self, f)
921 }
922
923 /// Attempt to execute a predicate over an asynchronous stream and evaluate if any items
924 /// satisfy the predicate. Exits early if an `Err` is encountered or if an `Ok` item is found
925 /// that satisfies the predicate.
926 ///
927 /// # Examples
928 ///
929 /// ```
930 /// use tokio_stream::StreamExt;
931 /// use tokio_stream_util::TryStreamExt;
932 ///
933 /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
934 /// rt.block_on(async {
935 /// let future_true = tokio_stream::iter(0..10).map(|i| Ok::<i32, &str>(i))
936 /// .try_any(|i| async move { i == 3 });
937 /// assert!(future_true.await.unwrap());
938 ///
939 /// let future_err = tokio_stream::iter(vec![Ok::<i32, &str>(1), Err("err"), Ok(3)])
940 /// .try_any(|i| async move { i == 3 });
941 /// assert!(future_err.await.is_err());
942 /// });
943 /// ```
944 fn try_any<Fut, F>(self, f: F) -> TryAny<Self, Fut, F>
945 where
946 Self: Sized,
947 F: FnMut(Self::Ok) -> Fut,
948 Fut: Future<Output = bool>,
949 {
950 TryAny::new(self, f)
951 }
952}