futures_concurrency/stream/
stream_ext.rs

1use core::future::IntoFuture;
2
3use crate::stream::{IntoStream, Merge};
4use futures_core::Stream;
5
6#[cfg(feature = "alloc")]
7use crate::concurrent_stream::FromStream;
8
9use super::{chain::tuple::Chain2, merge::tuple::Merge2, zip::tuple::Zip2, Chain, WaitUntil, Zip};
10
11/// An extension trait for the `Stream` trait.
12pub trait StreamExt: Stream {
13    /// Combines two streams into a single stream of all their outputs.
14    fn merge<T, S2>(self, other: S2) -> Merge2<T, Self, S2::IntoStream>
15    where
16        Self: Stream<Item = T> + Sized,
17        S2: IntoStream<Item = T>;
18
19    /// Takes two streams and creates a new stream over all in sequence
20    fn chain<T, S2>(self, other: S2) -> Chain2<Self, S2::IntoStream>
21    where
22        Self: Stream<Item = T> + Sized,
23        S2: IntoStream<Item = T>;
24
25    /// ‘Zips up’ multiple streams into a single stream of pairs.
26    fn zip<T, S2>(self, other: S2) -> Zip2<Self, S2::IntoStream>
27    where
28        Self: Stream<Item = T> + Sized,
29        S2: IntoStream<Item = T>;
30
31    /// Convert into a concurrent stream.
32    #[cfg(feature = "alloc")]
33    fn co(self) -> FromStream<Self>
34    where
35        Self: Sized,
36    {
37        FromStream::new(self)
38    }
39
40    /// Delay the yielding of items from the stream until the given deadline.
41    ///
42    /// The underlying stream will not be polled until the deadline has expired. In addition
43    /// to using a time source as a deadline, any future can be used as a
44    /// deadline too. When used in combination with a multi-consumer channel,
45    /// this method can be used to synchronize the start of multiple streams and futures.
46    ///
47    /// # Example
48    /// ```
49    /// # #[cfg(miri)] fn main() {}
50    /// # #[cfg(not(miri))]
51    /// # fn main() {
52    /// use async_io::Timer;
53    /// use futures_concurrency::prelude::*;
54    /// use futures_lite::{future::block_on, stream};
55    /// use futures_lite::prelude::*;
56    /// use std::time::{Duration, Instant};
57    ///
58    /// block_on(async {
59    ///     let now = Instant::now();
60    ///     let duration = Duration::from_millis(100);
61    ///
62    ///     stream::once("meow")
63    ///         .wait_until(Timer::after(duration))
64    ///         .next()
65    ///         .await;
66    ///
67    ///     assert!(now.elapsed() >= duration);
68    /// });
69    /// # }
70    /// ```
71    fn wait_until<D>(self, deadline: D) -> WaitUntil<Self, D::IntoFuture>
72    where
73        Self: Sized,
74        D: IntoFuture,
75    {
76        WaitUntil::new(self, deadline.into_future())
77    }
78}
79
80impl<S1> StreamExt for S1
81where
82    S1: Stream,
83{
84    fn merge<T, S2>(self, other: S2) -> Merge2<T, S1, S2::IntoStream>
85    where
86        S1: Stream<Item = T>,
87        S2: IntoStream<Item = T>,
88    {
89        Merge::merge((self, other))
90    }
91
92    fn chain<T, S2>(self, other: S2) -> Chain2<Self, S2::IntoStream>
93    where
94        Self: Stream<Item = T> + Sized,
95        S2: IntoStream<Item = T>,
96    {
97        // TODO(yosh): fix the bounds on the tuple impl
98        Chain::chain((self, other.into_stream()))
99    }
100
101    fn zip<T, S2>(self, other: S2) -> Zip2<Self, S2::IntoStream>
102    where
103        Self: Stream<Item = T> + Sized,
104        S2: IntoStream<Item = T>,
105    {
106        // TODO(yosh): fix the bounds on the tuple impl
107        Zip::zip((self, other.into_stream()))
108    }
109}