tokio_stream_util/try_stream/ext/mod.rs
//! Streams
//!
//! This module contains a number of functions for working with `Stream`s
//! that return `Result`s, allowing for short-circuiting computations.
5
6use core::future::Future;
7use core::pin::Pin;
8use core::task::{Context, Poll};
9use futures_core::future::TryFuture;
10
11use super::TryStream;
12use crate::FusedStream;
13
14mod and_then;
15pub use and_then::AndThen;
16
17mod err_into;
18pub use err_into::ErrInto;
19
20mod inspect;
21pub use inspect::InspectOk;
22
23pub use inspect::InspectErr;
24
25mod into_stream;
26
27#[cfg(feature = "alloc")]
28#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
29pub(crate) use into_stream::IntoFuseStream;
30
31pub use into_stream::IntoStream;
32
33mod map_ok;
34pub use map_ok::MapOk;
35
36mod map_err;
37pub use map_err::MapErr;
38
39mod or_else;
40pub use or_else::OrElse;
41
42mod try_next;
43pub use try_next::TryNext;
44
45mod try_filter;
46pub use try_filter::TryFilter;
47
48#[cfg(all(feature = "sink", feature = "alloc"))]
49mod try_forward;
50#[cfg(all(feature = "sink", feature = "alloc"))]
51pub use try_forward::TryForward;
52
53mod try_filter_map;
54pub use try_filter_map::TryFilterMap;
55
56mod try_flatten;
57pub use try_flatten::TryFlatten;
58
59#[cfg(all(feature = "alloc", feature = "std"))]
60#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
61mod try_flatten_unordered;
62#[cfg(all(feature = "alloc", feature = "std"))]
63#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
64pub use try_flatten_unordered::TryFlattenUnordered;
65
66mod try_collect;
67pub use try_collect::TryCollect;
68
69mod try_concat;
70pub use try_concat::TryConcat;
71
72#[cfg(feature = "alloc")]
73mod try_chunks;
74#[cfg(feature = "alloc")]
75pub use try_chunks::{TryChunks, TryChunksError};
76
77#[cfg(feature = "alloc")]
78mod try_ready_chunks;
79#[cfg(feature = "alloc")]
80pub use try_ready_chunks::{TryReadyChunks, TryReadyChunksError};
81
82mod try_unfold;
83pub use try_unfold::{try_unfold, TryUnfold};
84
85mod try_skip_while;
86pub use try_skip_while::TrySkipWhile;
87
88mod try_take_while;
89pub use try_take_while::TryTakeWhile;
90
91#[cfg(feature = "alloc")]
92#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
93mod try_buffer_unordered;
94#[cfg(feature = "alloc")]
95#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
96pub use try_buffer_unordered::TryBufferUnordered;
97
98#[cfg(feature = "alloc")]
99#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
100mod try_buffered;
101#[cfg(feature = "alloc")]
102#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
103pub use try_buffered::TryBuffered;
104
105#[cfg(all(feature = "io", feature = "std"))]
106mod into_async_read;
107#[cfg(all(feature = "io", feature = "std"))]
108pub use into_async_read::IntoAsyncRead;
109
110mod try_all;
111pub use try_all::TryAll;
112
113mod try_any;
114pub use try_any::TryAny;
115
116impl<S: ?Sized + TryStream> TryStreamExt for S {}
117
118/// Adapters specific to `Result`-returning streams
119pub trait TryStreamExt: TryStream {
120 /// Wraps the current stream in a new stream which converts the error type
121 /// into the one provided.
122 ///
123 /// # Examples
124 ///
125 /// ```
126 /// use tokio_stream::StreamExt;
127 /// use tokio_stream_util::TryStreamExt;
128 ///
129 /// #[tokio::main]
130 /// async fn main() {
131 /// let stream =
132 /// tokio_stream::iter(vec![Ok::<(), i32>(()), Err::<(), i32>(5)])
133 /// .err_into::<i64>();
134 ///
135 /// let collected = stream.into_stream().collect::<Vec<_>>().await;
136 /// assert_eq!(collected, vec![Ok(()), Err(5i64)]);
137 /// }
138 /// ```
139 fn err_into<E>(self) -> ErrInto<Self, E>
140 where
141 Self: Sized,
142 Self::Error: Into<E>,
143 {
144 ErrInto::new(self)
145 }
146
147 /// Wraps the current stream in a new stream which maps the success value
148 /// using the provided closure.
149 ///
150 /// # Examples
151 ///
152 /// ```
153 /// use tokio_stream::StreamExt;
154 /// use tokio_stream_util::TryStreamExt;
155 ///
156 /// #[tokio::main]
157 /// async fn main() {
158 /// let stream =
159 /// tokio_stream::iter(vec![Ok::<i32, i32>(5), Err::<i32, i32>(0)])
160 /// .map_ok(|x| x + 2);
161 ///
162 /// let out = stream.into_stream().collect::<Vec<_>>().await;
163 /// assert_eq!(out, vec![Ok(7), Err(0)]);
164 /// }
165 /// ```
166 fn map_ok<V, F>(self, f: F) -> MapOk<Self, V, F>
167 where
168 Self: TryStream + Unpin + Sized,
169 F: FnMut(Self::Ok) -> V,
170 {
171 MapOk::new(self, f)
172 }
173
174 /// Wraps the current stream in a new stream which maps the error value
175 /// using the provided closure.
176 ///
177 /// # Examples
178 ///
179 /// ```
180 /// use tokio_stream::StreamExt;
181 /// use tokio_stream_util::TryStreamExt;
182 ///
183 /// #[derive(Debug)]
184 /// struct MyErr(String);
185 /// impl core::fmt::Display for MyErr { fn fmt(&self, f:&mut core::fmt::Formatter<'_>)->core::fmt::Result{ self.0.fmt(f)} }
186 /// impl core::error::Error for MyErr {}
187 ///
188 /// let stream =
189 /// tokio_stream::iter(vec![Ok::<i32, i32>(5), Err::<i32, i32>(0)])
190 /// .map_err(|x| MyErr(format!("e{}", x)));
191 ///
192 /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
193 /// let out = rt.block_on(async { stream.into_stream().collect::<Vec<_>>().await });
194 /// assert_eq!(format!("{:?}", out), format!("{:?}", vec![Ok(5), Err(MyErr(String::from("e0")))]));
195 /// ```
196 fn map_err<E, F>(self, f: F) -> MapErr<Self, E, F>
197 where
198 Self: TryStream + Unpin + Sized,
199 E: core::error::Error,
200 F: FnMut(Self::Error) -> E,
201 {
202 MapErr::new(self, f)
203 }
204
205 /// Chain on a computation for when a value is ready, passing the successful
206 /// results to the provided closure `f`.
207 ///
208 /// This function can be used to run a unit of work when the next successful
209 /// value on a stream is ready. The closure provided will be yielded a value
210 /// when ready, and the returned future will then be run to completion to
211 /// produce the next value on this stream.
212 ///
213 /// Any errors produced by this stream will not be passed to the closure,
214 /// and will be passed through.
215 ///
216 /// The returned value of the closure must implement the `TryFuture` trait
217 /// and can represent some more work to be done before the composed stream
218 /// is finished.
219 ///
220 /// Note that this function consumes the receiving stream and returns a
221 /// wrapped version of it.
222 ///
223 /// To process the entire stream and return a single future representing
224 /// success or error, use `try_for_each` instead.
225 ///
226 /// # Examples
227 ///
228 /// ```
229 /// use tokio_stream::StreamExt;
230 /// use tokio_stream_util::TryStreamExt;
231 ///
232 /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
233 /// rt.block_on(async {
234 /// let stream = tokio_stream::iter(vec![Ok::<i32, ()>(1), Ok(2)])
235 /// .and_then(|result| async move {
236 /// Ok(if result % 2 == 0 { Some(result) } else { None })
237 /// });
238 ///
239 /// let out = stream.into_stream().collect::<Vec<_>>().await;
240 /// assert_eq!(out, vec![Ok(None), Ok(Some(2))]);
241 /// });
242 /// ```
243 fn and_then<Fut, F>(self, f: F) -> AndThen<Self, Fut, F>
244 where
245 F: FnMut(Self::Ok) -> Fut,
246 Fut: TryFuture<Error = Self::Error>,
247 Self: Sized,
248 {
249 AndThen::new(self, f)
250 }
251
252 /// Chain on a computation for when an error happens, passing the
253 /// erroneous result to the provided closure `f`.
254 ///
255 /// This function can be used to run a unit of work and attempt to recover from
256 /// an error if one happens. The closure provided will be yielded an error
257 /// when one appears, and the returned future will then be run to completion
258 /// to produce the next value on this stream.
259 ///
260 /// Any successful values produced by this stream will not be passed to the
261 /// closure, and will be passed through.
262 ///
263 /// The returned value of the closure must implement the [`TryFuture`](futures_core::future::TryFuture) trait
264 /// and can represent some more work to be done before the composed stream
265 /// is finished.
266 ///
267 /// Note that this function consumes the receiving stream and returns a
268 /// wrapped version of it.
269 fn or_else<Fut, F>(self, f: F) -> OrElse<Self, Fut, F>
270 where
271 F: FnMut(Self::Error) -> Fut,
272 Fut: TryFuture<Ok = Self::Ok>,
273 Self: Sized,
274 {
275 OrElse::new(self, f)
276 }
277
278 #[cfg(all(feature = "sink", feature = "alloc"))]
279 /// A future that completes after the given stream has been fully processed
280 /// into the sink and the sink has been flushed and closed.
281 ///
282 /// This future will drive the stream to keep producing items until it is
283 /// exhausted, sending each item to the sink. It will complete once the
284 /// stream is exhausted, the sink has received and flushed all items, and
285 /// the sink is closed. Note that neither the original stream nor provided
286 /// sink will be output by this future. Pass the sink by `Pin<&mut S>`
287 /// (for example, via `try_forward(&mut sink)` inside an `async` fn/block) in
288 /// order to preserve access to the `Sink`. If the stream produces an error,
289 /// that error will be returned by this future without flushing/closing the sink.
290 fn try_forward<S>(self, sink: S) -> TryForward<Self, S>
291 where
292 S: async_sink::Sink<Self::Ok, Error = Self::Error>,
293 Self: Sized,
294 {
295 TryForward::new(self, sink)
296 }
297
298 /// Do something with the success value of this stream, afterwards passing
299 /// it on.
300 ///
301 /// This is similar to the `StreamExt::inspect` method where it allows
302 /// easily inspecting the success value as it passes through the stream, for
303 /// example to debug what's going on.
304 fn inspect_ok<F>(self, f: F) -> InspectOk<Self, F>
305 where
306 F: FnMut(&Self::Ok),
307 Self: Sized,
308 {
309 InspectOk::new(self, f)
310 }
311
312 /// Do something with the error value of this stream, afterwards passing it on.
313 ///
314 /// This is similar to the `StreamExt::inspect` method where it allows
315 /// easily inspecting the error value as it passes through the stream, for
316 /// example to debug what's going on.
317 fn inspect_err<F>(self, f: F) -> InspectErr<Self, F>
318 where
319 F: FnMut(&Self::Error),
320 Self: Sized,
321 {
322 InspectErr::new(self, f)
323 }
324
325 /// Wraps a [`TryStream`] into a type that implements
326 /// [`Stream`](tokio_stream::Stream)
327 ///
328 /// [`TryStream`]s currently do not implement the
329 /// [`Stream`](tokio_stream::Stream) trait because of limitations
330 /// of the compiler.
331 ///
332 /// # Examples
333 ///
334 /// ```
335 /// use tokio_stream::StreamExt;
336 /// use tokio_stream_util::{TryStream, TryStreamExt};
337 ///
338 /// type T = i32;
339 /// type E = ();
340 ///
341 /// fn make_try_stream() -> impl TryStream<Ok = T, Error = E> {
342 /// tokio_stream::iter(vec![Ok::<T, E>(1), Ok(2), Err(())])
343 /// }
344 ///
345 /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
346 /// rt.block_on(async {
347 /// let out = make_try_stream().into_stream().collect::<Vec<_>>().await;
348 /// assert_eq!(out, vec![Ok(1), Ok(2), Err(())]);
349 /// });
350 /// ```
351 fn into_stream(self) -> IntoStream<Self>
352 where
353 Self: Sized,
354 {
355 IntoStream::new(self)
356 }
357
358 /// Creates a future that attempts to resolve the next item in the stream.
359 /// If an error is encountered before the next item, the error is returned
360 /// instead.
361 ///
362 /// This is similar to the `Stream::next` combinator, but returns a
363 /// `Result<Option<T>, E>` rather than an `Option<Result<T, E>>`, making
364 /// for easy use with the `?` operator.
365 ///
366 /// # Examples
367 ///
368 /// ```
369 /// use tokio_stream_util::TryStreamExt;
370 ///
371 /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
372 /// rt.block_on(async {
373 /// let mut stream = tokio_stream::iter(vec![Ok::<(), ()>(()), Err::<(), ()>(())]);
374 /// assert_eq!(stream.try_next().await, Ok(Some(())));
375 /// });
376 /// ```
377 fn try_next(&mut self) -> TryNext<'_, Self>
378 where
379 Self: Unpin,
380 {
381 TryNext::new(self)
382 }
383
384 /// Skip elements on this stream while the provided asynchronous predicate
385 /// resolves to `true`.
386 ///
387 /// This function is similar to
388 /// [`StreamExt::skip_while`](futures_util::stream::StreamExt::skip_while) but exits
389 /// early if an error occurs.
390 ///
391 /// # Examples
392 ///
393 /// ```
394 /// use tokio_stream::StreamExt;
395 /// use tokio_stream_util::TryStreamExt;
396 ///
397 /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
398 /// rt.block_on(async {
399 /// let stream = tokio_stream::iter(vec![Ok::<i32, i32>(1), Ok(3), Ok(2)])
400 /// .try_skip_while(|x| {
401 /// let v = *x;
402 /// async move { Ok(v < 3) }
403 /// });
404 ///
405 /// let out = stream.into_stream().collect::<Vec<_>>().await;
406 /// assert_eq!(out, vec![Ok(3), Ok(2)]);
407 /// });
408 /// ```
409 fn try_skip_while<Fut, F>(self, f: F) -> TrySkipWhile<Self, Fut, F>
410 where
411 F: FnMut(&Self::Ok) -> Fut,
412 Fut: TryFuture<Ok = bool, Error = Self::Error>,
413 Self: Sized,
414 {
415 TrySkipWhile::new(self, f)
416 }
417
418 /// Take elements on this stream while the provided asynchronous predicate
419 /// resolves to `true`.
420 ///
421 /// This function is similar to
422 /// [`StreamExt::take_while`](futures_util::stream::StreamExt::take_while) but exits
423 /// early if an error occurs.
424 ///
425 /// # Examples
426 ///
427 /// ```
428 /// use tokio_stream::StreamExt;
429 /// use tokio_stream_util::TryStreamExt;
430 ///
431 /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
432 /// rt.block_on(async {
433 /// let stream = tokio_stream::iter(vec![Ok::<i32, i32>(1), Ok(2), Ok(3), Ok(2)])
434 /// .try_take_while(|x| {
435 /// let v = *x;
436 /// async move { Ok(v < 3) }
437 /// });
438 ///
439 /// let out = stream.into_stream().collect::<Vec<_>>().await;
440 /// assert_eq!(out, vec![Ok(1), Ok(2)]);
441 /// });
442 /// ```
443 fn try_take_while<Fut, F>(self, f: F) -> TryTakeWhile<Self, Fut, F>
444 where
445 F: FnMut(&Self::Ok) -> Fut,
446 Fut: TryFuture<Ok = bool, Error = Self::Error>,
447 Self: Sized,
448 {
449 TryTakeWhile::new(self, f)
450 }
451
452 /// Attempt to transform a stream into a collection,
453 /// returning a future representing the result of that computation.
454 ///
455 /// This combinator will collect all successful results of this stream and
456 /// collect them into the specified collection type. If an error happens then all
457 /// collected elements will be dropped and the error will be returned.
458 ///
459 /// The returned future will be resolved when the stream terminates.
460 ///
461 /// # Examples
462 ///
463 /// ```
464 /// use tokio_stream_util::TryStreamExt;
465 ///
466 /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
467 /// rt.block_on(async {
468 /// let future = tokio_stream::iter(vec![Ok::<i32, i32>(1), Ok(2), Err(3)])
469 /// .try_collect::<Vec<_>>();
470 ///
471 /// assert_eq!(future.await, Err(3));
472 /// });
473 /// ```
474 fn try_collect<C: Default + Extend<Self::Ok>>(self) -> TryCollect<Self, C>
475 where
476 Self: Sized,
477 {
478 TryCollect::new(self)
479 }
480
481 /// An adaptor for chunking up successful items of the stream inside a vector.
482 ///
483 /// This combinator will attempt to pull successful items from this stream and buffer
484 /// them into a local vector. At most `capacity` items will get buffered
485 /// before they're yielded from the returned stream.
486 ///
487 /// Note that the vectors returned from this iterator may not always have
488 /// `capacity` elements. If the underlying stream ended and only a partial
489 /// vector was created, it'll be returned. Additionally if an error happens
490 /// from the underlying stream then the currently buffered items will be
491 /// yielded.
492 ///
493 /// This method is only available when the `std` or `alloc` feature of this
494 /// library is activated, and it is activated by default.
495 ///
496 /// This function is similar to
497 /// [`StreamExt::chunks`](futures_util::stream::StreamExt::chunks) but exits
498 /// early if an error occurs.
499 ///
500 /// # Examples
501 ///
502 /// ```
503 /// use tokio_stream::StreamExt;
504 /// use tokio_stream_util::TryStreamExt;
505 /// use tokio_stream_util::try_stream::TryChunksError;
506 ///
507 /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
508 /// rt.block_on(async {
509 /// let stream = tokio_stream::iter(vec![
510 /// Ok::<i32, i32>(1), Ok(2), Ok(3), Err(4), Ok(5), Ok(6)
511 /// ]).try_chunks(2);
512 ///
513 /// let out = stream.into_stream().collect::<Vec<_>>().await;
514 /// assert_eq!(out, vec![
515 /// Ok(vec![1, 2]),
516 /// Ok(vec![3]),
517 /// Err(TryChunksError::<i32, i32>(vec![], 4)),
518 /// Ok(vec![5, 6]),
519 /// ]);
520 /// });
521 /// ```
522 ///
523 /// # Panics
524 ///
525 /// This method will panic if `capacity` is zero.
526 #[cfg(feature = "alloc")]
527 fn try_chunks(self, capacity: usize) -> TryChunks<Self>
528 where
529 <IntoFuseStream<Self> as tokio_stream::Stream>::Item: core::fmt::Debug,
530 Self: Sized,
531 {
532 TryChunks::new(self, capacity)
533 }
534
535 /// An adaptor for chunking up successful, ready items of the stream inside a vector.
536 ///
537 /// This combinator will attempt to pull successful items from this stream and buffer
538 /// them into a local vector. At most `capacity` items will get buffered
539 /// before they're yielded from the returned stream. If the underlying stream
540 /// returns `Poll::Pending`, and the collected chunk is not empty, it will
541 /// be immediately returned.
542 ///
543 /// Note that the vectors returned from this iterator may not always have
544 /// `capacity` elements. If the underlying stream ended and only a partial
545 /// vector was created, it'll be returned. Additionally if an error happens
546 /// from the underlying stream then the currently buffered items will be
547 /// yielded.
548 ///
549 /// This method is only available when the `std` or `alloc` feature of this
550 /// library is activated, and it is activated by default.
551 ///
552 /// This function is similar to
553 /// [`StreamExt::ready_chunks`](futures_util::stream::StreamExt::ready_chunks) but exits
554 /// early if an error occurs.
555 ///
556 /// # Examples
557 ///
558 /// ```
559 /// use tokio_stream::StreamExt;
560 /// use tokio_stream_util::TryStreamExt;
561 /// use tokio_stream_util::try_stream::TryReadyChunksError;
562 ///
563 /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
564 /// rt.block_on(async {
565 /// let stream = tokio_stream::iter(vec![
566 /// Ok::<i32, i32>(1), Ok(2), Ok(3), Err(4), Ok(5), Ok(6)
567 /// ]).try_ready_chunks(2);
568 ///
569 /// let out = stream.into_stream().collect::<Vec<_>>().await;
570 /// assert_eq!(out, vec![
571 /// Ok(vec![1, 2]),
572 /// Ok(vec![3]),
573 /// Err(TryReadyChunksError::<i32, i32>(vec![], 4)),
574 /// Ok(vec![5, 6]),
575 /// ]);
576 /// });
577 /// ```
578 ///
579 /// # Panics
580 ///
581 /// This method will panic if `capacity` is zero.
582 #[cfg(feature = "alloc")]
583 fn try_ready_chunks(self, capacity: usize) -> TryReadyChunks<Self>
584 where
585 Self: Sized,
586 {
587 TryReadyChunks::new(self, capacity)
588 }
589
590 /// Attempt to filter the values produced by this stream according to the
591 /// provided asynchronous closure.
592 ///
593 /// As values of this stream are made available, the provided predicate `f`
594 /// will be run on them. If the predicate returns a `Future` which resolves
595 /// to `true`, then the stream will yield the value, but if the predicate
596 /// return a `Future` which resolves to `false`, then the value will be
597 /// discarded and the next value will be produced.
598 ///
599 /// All errors are passed through without filtering in this combinator.
600 ///
601 /// Note that this function consumes the stream passed into it and returns a
602 /// wrapped version of it, similar to the existing `filter` methods in
603 /// the standard library.
604 ///
605 /// # Examples
606 /// ```
607 /// use tokio_stream::StreamExt;
608 /// use tokio_stream_util::TryStreamExt;
609 ///
610 /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
611 /// rt.block_on(async {
612 /// let events = tokio_stream::iter(vec![Ok::<i32, &str>(1), Ok(2), Ok(3), Err("error")])
613 /// .try_filter(|x| {
614 /// let v = *x;
615 /// async move { v >= 2 }
616 /// });
617 ///
618 /// let out = events.into_stream().collect::<Vec<_>>().await;
619 /// assert_eq!(out, vec![Ok(2), Ok(3), Err("error")]);
620 /// });
621 /// ```
622 fn try_filter<Fut, F>(self, f: F) -> TryFilter<Self, Fut, F>
623 where
624 Fut: Future<Output = bool>,
625 F: FnMut(&Self::Ok) -> Fut,
626 Self: Sized,
627 {
628 TryFilter::new(self, f)
629 }
630
631 /// Attempt to filter the values produced by this stream while
632 /// simultaneously mapping them to a different type according to the
633 /// provided asynchronous closure.
634 ///
635 /// As values of this stream are made available, the provided function will
636 /// be run on them. If the future returned by the predicate `f` resolves to
637 /// [`Some(item)`](Some) then the stream will yield the value `item`, but if
638 /// it resolves to [`None`] then the next value will be produced.
639 ///
640 /// All errors are passed through without filtering in this combinator.
641 ///
642 /// Note that this function consumes the stream passed into it and returns a
643 /// wrapped version of it, similar to the existing `filter_map` methods in
644 /// the standard library.
645 ///
646 /// # Examples
647 /// ```
648 /// use tokio_stream::StreamExt;
649 /// use tokio_stream_util::TryStreamExt;
650 ///
651 /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
652 /// rt.block_on(async {
653 /// let halves = tokio_stream::iter(vec![Ok::<i32, &str>(1), Ok(6), Err("error")])
654 /// .try_filter_map(|x| async move {
655 /// let ret = if x % 2 == 0 { Some(x / 2) } else { None };
656 /// Ok(ret)
657 /// });
658 ///
659 /// let out = halves.into_stream().collect::<Vec<_>>().await;
660 /// assert_eq!(out, vec![Ok(3), Err("error")]);
661 /// });
662 /// ```
663 fn try_filter_map<Fut, F, T>(self, f: F) -> TryFilterMap<Self, Fut, F>
664 where
665 Fut: TryFuture<Ok = Option<T>, Error = Self::Error>,
666 F: FnMut(Self::Ok) -> Fut,
667 Self: Sized,
668 {
669 TryFilterMap::new(self, f)
670 }
671
672 /// Flattens a stream of streams into just one continuous stream. Produced streams
673 /// will be polled concurrently and any errors will be passed through without looking at them.
674 /// If the underlying base stream returns an error, it will be **immediately** propagated.
675 ///
676 /// The only argument is an optional limit on the number of concurrently
677 /// polled streams. If this limit is not `None`, no more than `limit` streams
678 /// will be polled at the same time. The `limit` argument is of type
679 /// `Into<Option<usize>>`, and so can be provided as either `None`,
680 /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as
681 /// no limit at all, and will have the same result as passing in `None`.
682 ///
683 /// # Examples
684 ///
685 /// ```
686 /// use tokio_stream::StreamExt;
687 /// use tokio_stream_util::TryStreamExt;
688 ///
689 /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
690 /// rt.block_on(async {
691 /// let inner1 = tokio_stream::iter(vec![Ok::<i32, i32>(1), Ok(2)]);
692 /// let inner2 = tokio_stream::iter(vec![Ok::<i32, i32>(3), Ok(4)]);
693 /// let base = tokio_stream::iter(vec![Ok::<_, i32>(inner1), Ok::<_, i32>(inner2)]);
694 /// let stream = base.try_flatten_unordered(None);
695 ///
696 /// let mut out = stream.into_stream().collect::<Vec<_>>().await;
697 /// out.sort_by_key(|r| r.clone().unwrap_or_default());
698 /// assert_eq!(out, vec![Ok(1), Ok(2), Ok(3), Ok(4)]);
699 /// });
700 /// ```
701 #[cfg(all(feature = "alloc", feature = "std"))]
702 #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
703 fn try_flatten_unordered(self, limit: impl Into<Option<usize>>) -> TryFlattenUnordered<Self>
704 where
705 Self: Sized,
706 Self::Ok: TryStream + Unpin,
707 <Self::Ok as TryStream>::Error: From<Self::Error>,
708 {
709 TryFlattenUnordered::new(self, limit.into())
710 }
711
712 /// Flattens a stream of streams into just one continuous stream.
713 ///
714 /// If this stream's elements are themselves streams then this combinator
715 /// will flatten out the entire stream to one long chain of elements. Any
716 /// errors are passed through without looking at them, but otherwise each
717 /// individual stream will get exhausted before moving on to the next.
718 ///
719 /// # Examples
720 ///
721 /// ```
722 /// use tokio_stream::StreamExt;
723 /// use tokio_stream_util::TryStreamExt;
724 ///
725 /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
726 /// rt.block_on(async {
727 /// let inner1 = tokio_stream::iter(vec![Ok::<i32, i32>(1), Ok(2)]);
728 /// let inner2 = tokio_stream::iter(vec![Ok::<i32, i32>(3), Ok(4)]);
729 /// let base = tokio_stream::iter(vec![Ok::<_, i32>(inner1), Ok::<_, i32>(inner2)]);
730 /// let stream = base.try_flatten();
731 ///
732 /// let out = stream.into_stream().collect::<Vec<_>>().await;
733 /// assert_eq!(out, vec![Ok(1), Ok(2), Ok(3), Ok(4)]);
734 /// });
735 /// ```
736 fn try_flatten(self) -> TryFlatten<Self>
737 where
738 Self::Ok: TryStream,
739 <Self::Ok as TryStream>::Error: From<Self::Error>,
740 Self: Sized,
741 {
742 TryFlatten::new(self)
743 }
744
745 /// Attempt to concatenate all items of a stream into a single
746 /// extendable destination, returning a future representing the end result.
747 ///
748 /// This combinator will extend the first item with the contents of all
749 /// the subsequent successful results of the stream. If the stream is empty,
750 /// the default value will be returned.
751 ///
752 /// Works with all collections that implement the [`Extend`](core::iter::Extend) trait.
753 ///
754 /// This method is similar to [`concat`](futures_util::stream::StreamExt::concat), but will
755 /// exit early if an error is encountered in the stream.
756 ///
757 /// # Examples
758 ///
759 /// ```
760 /// use tokio_stream_util::TryStreamExt;
761 ///
762 /// #[tokio::main]
763 /// async fn main() {
764 /// let fut = tokio_stream::iter(vec![
765 /// Ok::<Vec<i32>, ()>(vec![1, 2, 3]),
766 /// Ok(vec![4, 5, 6]),
767 /// Ok(vec![7, 8, 9]),
768 /// ]).try_concat();
769 ///
770 /// assert_eq!(fut.await, Ok(vec![1, 2, 3, 4, 5, 6, 7, 8, 9]));
771 /// }
772 /// ```
773 fn try_concat(self) -> TryConcat<Self>
774 where
775 Self: Sized,
776 Self::Ok: Extend<<<Self as TryStream>::Ok as IntoIterator>::Item> + IntoIterator + Default,
777 {
778 TryConcat::new(self)
779 }
780
781 /// Attempt to execute several futures from a stream concurrently (unordered).
782 ///
783 /// This stream's `Ok` type must be a [`TryFuture`](futures_core::future::TryFuture) with an `Error` type
784 /// that matches the stream's `Error` type.
785 ///
786 /// This adaptor will buffer up to `n` futures and then return their
787 /// outputs in the order in which they complete. If the underlying stream
788 /// returns an error, it will be immediately propagated.
789 ///
790 /// The limit argument is of type `Into<Option<usize>>`, and so can be
791 /// provided as either `None`, `Some(10)`, or just `10`. Note: a limit of zero is
792 /// interpreted as no limit at all, and will have the same result as passing in `None`.
793 ///
794 /// The returned stream will be a stream of results, each containing either
795 /// an error or a future's output. An error can be produced either by the
796 /// underlying stream itself or by one of the futures it yielded.
797 ///
798 /// This method is only available when the `std` or `alloc` feature of this
799 /// library is activated, and it is activated by default.
800 ///
801 /// # Examples
802 ///
803 /// ```
804 /// use tokio_stream::StreamExt;
805 /// use tokio_stream_util::TryStreamExt;
806 ///
807 /// #[tokio::main]
808 /// async fn main() {
809 /// let unordered = tokio_stream::iter(vec![Ok::<i32, &str>(3), Ok(1), Ok(2)])
810 /// .map_ok(async |i| if i == 2 { Err("error") } else { Ok(i) })
811 /// .try_buffer_unordered(None);
812 ///
813 /// let mut out = unordered.into_stream().collect::<Vec<_>>().await;
814 /// out.sort();
815 /// assert_eq!(out, vec![Ok(1), Ok(3), Err("error")]);
816 /// }
817 /// ```
818 #[cfg(feature = "alloc")]
819 #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
820 fn try_buffer_unordered(self, n: impl Into<Option<usize>>) -> TryBufferUnordered<Self>
821 where
822 Self::Ok: TryFuture<Error = Self::Error>,
823 Self: Sized,
824 {
825 TryBufferUnordered::new(self, n.into())
826 }
827
828 /// Attempt to execute several futures from a stream concurrently.
829 ///
830 /// This stream's `Ok` type must be a [`TryFuture`](futures_core::future::TryFuture) with an `Error` type
831 /// that matches the stream's `Error` type.
832 ///
833 /// This adaptor will buffer up to `n` futures and then return their
834 /// outputs in the same order as the underlying stream. If the underlying stream returns an error, it will
835 /// be immediately propagated.
836 ///
837 /// The limit argument is of type `Into<Option<usize>>`, and so can be
838 /// provided as either `None`, `Some(10)`, or just `10`. Note: a limit of zero is
839 /// interpreted as no limit at all, and will have the same result as passing in `None`.
840 ///
841 /// The returned stream will be a stream of results, each containing either
842 /// an error or a future's output. An error can be produced either by the
843 /// underlying stream itself or by one of the futures it yielded.
844 ///
845 /// This method is only available when the `std` or `alloc` feature of this
846 /// library is activated, and it is activated by default.
847 ///
848 /// # Examples
849 ///
850 /// ```
851 /// use tokio_stream::StreamExt;
852 /// use tokio_stream_util::TryStreamExt;
853 ///
854 /// #[tokio::main]
855 /// async fn main() {
856 /// let buffered = tokio_stream::iter(vec![Ok::<i32, &str>(3), Ok(1), Ok(2)])
857 /// .map_ok(|i| async move { if i == 2 { Err("error") } else { Ok(i) } })
858 /// .try_buffered(None);
859 ///
860 /// let out = buffered.into_stream().collect::<Vec<_>>().await;
861 /// assert_eq!(out, vec![Ok(3), Ok(1), Err("error")]);
862 /// }
863 /// ```
864 #[cfg(feature = "alloc")]
865 #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
866 fn try_buffered(self, n: impl Into<Option<usize>>) -> TryBuffered<Self>
867 where
868 Self::Ok: TryFuture<Error = Self::Error>,
869 Self: Sized,
870 {
871 TryBuffered::new(self, n.into())
872 }
873
874 /// A convenience method for calling [`TryStream::try_poll_next`] on [`Unpin`]
875 /// stream types.
876 fn try_poll_next_unpin(
877 &mut self,
878 cx: &mut Context<'_>,
879 ) -> Poll<Option<Result<Self::Ok, Self::Error>>>
880 where
881 Self: Unpin,
882 {
883 Pin::new(self).try_poll_next(cx)
884 }
885
886 #[cfg(all(feature = "io", feature = "std"))]
887 /// Adapter that converts this stream into an [`AsyncBufRead`](crate::io::AsyncBufRead).
888 ///
889 /// This method is only available when the `std` feature of this
890 /// library is activated, and it is activated by default.
891 ///
892 /// # Examples
893 ///
894 /// ```
895 /// use tokio_stream_util::TryStreamExt;
896 ///
897 /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
898 /// rt.block_on(async {
899 /// let reader = tokio_stream::iter([Ok::<_, std::io::Error>(vec![1, 2, 3])]).into_async_read();
900 /// let mut buf_reader = tokio::io::BufReader::new(reader);
901 /// let mut buf = Vec::new();
902 /// use tokio::io::AsyncReadExt;
903 /// buf_reader.read_to_end(&mut buf).await.unwrap();
904 /// assert_eq!(buf, vec![1, 2, 3]);
905 /// });
906 /// ```
907 fn into_async_read(self) -> IntoAsyncRead<Self>
908 where
909 Self: Sized + TryStreamExt<Error = std::io::Error>,
910 Self::Ok: AsRef<[u8]>,
911 {
912 IntoAsyncRead::new(self)
913 }
914
915 /// Attempt to execute a predicate over an asynchronous stream and evaluate if all items
916 /// satisfy the predicate. Exits early if an `Err` is encountered or if an `Ok` item is found
917 /// that does not satisfy the predicate.
918 ///
919 /// # Examples
920 ///
921 /// ```
922 /// use tokio_stream::StreamExt;
923 /// use tokio_stream_util::TryStreamExt;
924 ///
925 /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
926 /// rt.block_on(async {
927 /// let future_true = tokio_stream::iter(vec![1, 2, 3]).map(|i| Ok::<i32, &str>(i))
928 /// .try_all(|i| async move { i > 0 });
929 /// assert!(future_true.await.unwrap());
930 ///
931 /// let future_err = tokio_stream::iter(vec![Ok::<i32, &str>(1), Err("err"), Ok(3)])
932 /// .try_all(|i| async move { i > 0 });
933 /// assert!(future_err.await.is_err());
934 /// });
935 /// ```
936 fn try_all<Fut, F>(self, f: F) -> TryAll<Self, Fut, F>
937 where
938 Self: Sized,
939 F: FnMut(Self::Ok) -> Fut,
940 Fut: Future<Output = bool>,
941 {
942 TryAll::new(self, f)
943 }
944
945 /// Attempt to execute a predicate over an asynchronous stream and evaluate if any items
946 /// satisfy the predicate. Exits early if an `Err` is encountered or if an `Ok` item is found
947 /// that satisfies the predicate.
948 ///
949 /// # Examples
950 ///
951 /// ```
952 /// use tokio_stream::StreamExt;
953 /// use tokio_stream_util::TryStreamExt;
954 ///
955 /// let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
956 /// rt.block_on(async {
957 /// let future_true = tokio_stream::iter(0..10).map(|i| Ok::<i32, &str>(i))
958 /// .try_any(|i| async move { i == 3 });
959 /// assert!(future_true.await.unwrap());
960 ///
961 /// let future_err = tokio_stream::iter(vec![Ok::<i32, &str>(1), Err("err"), Ok(3)])
962 /// .try_any(|i| async move { i == 3 });
963 /// assert!(future_err.await.is_err());
964 /// });
965 /// ```
966 fn try_any<Fut, F>(self, f: F) -> TryAny<Self, Fut, F>
967 where
968 Self: Sized,
969 F: FnMut(Self::Ok) -> Fut,
970 Fut: Future<Output = bool>,
971 {
972 TryAny::new(self, f)
973 }
974}