1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
//! A sequence of asynchronous values. #[cfg(feature = "timer")] use std::time::Duration; #[cfg(feature = "timer")] use tokio_timer::{throttle::Throttle, Timeout}; #[doc(inline)] pub use futures_core::Stream; #[doc(inline)] pub use futures_util::stream::{empty, iter, once, pending, poll_fn, repeat, unfold}; /// An extension trait for `Stream` that provides a variety of convenient /// combinator functions. /// /// Currently, there are only [`timeout`] and [`throttle`] functions, but /// this will increase over time. /// /// Users are not expected to implement this trait. All types that implement /// `Stream` already implement `StreamExt`. /// /// This trait can be imported directly or via the Tokio prelude: `use /// tokio::prelude::*`. /// /// [`timeout`]: #method.timeout pub trait StreamExt: Stream { /// Throttle down the stream by enforcing a fixed delay between items. /// /// Errors are also delayed. #[cfg(feature = "timer")] fn throttle(self, duration: Duration) -> Throttle<Self> where Self: Sized, { Throttle::new(self, duration) } /// Creates a new stream which allows `self` until `timeout`. /// /// This combinator creates a new stream which wraps the receiving stream /// with a timeout. For each item, the returned stream is allowed to execute /// until it completes or `timeout` has elapsed, whichever happens first. /// /// If an item completes before `timeout` then the stream will yield /// with that item. Otherwise the stream will yield to an error. /// /// # Examples /// /// ``` /// #![feature(async_await)] /// /// use tokio::prelude::*; /// /// use std::time::Duration; /// /// # fn slow_stream() -> impl Stream<Item = ()> { /// # tokio::stream::empty() /// # } /// # /// # async fn dox() { /// let mut stream = slow_stream() /// .timeout(Duration::from_secs(1)); /// /// while let Some(value) = stream.next().await { /// println!("value = {:?}", value); /// } /// # } /// ``` #[cfg(feature = "timer")] fn timeout(self, timeout: Duration) -> Timeout<Self> where Self: Sized, { Timeout::new(self, timeout) } } impl<T: ?Sized> StreamExt for T where T: Stream {}