futures_concurrency/stream/stream_ext.rs
1use core::future::IntoFuture;
2
3use crate::stream::{IntoStream, Merge};
4use futures_core::Stream;
5
6#[cfg(feature = "alloc")]
7use crate::concurrent_stream::FromStream;
8
9use super::{chain::tuple::Chain2, merge::tuple::Merge2, zip::tuple::Zip2, Chain, WaitUntil, Zip};
10
11/// An extension trait for the `Stream` trait.
12pub trait StreamExt: Stream {
13 /// Combines two streams into a single stream of all their outputs.
14 fn merge<T, S2>(self, other: S2) -> Merge2<T, Self, S2::IntoStream>
15 where
16 Self: Stream<Item = T> + Sized,
17 S2: IntoStream<Item = T>;
18
19 /// Takes two streams and creates a new stream over all in sequence
20 fn chain<T, S2>(self, other: S2) -> Chain2<Self, S2::IntoStream>
21 where
22 Self: Stream<Item = T> + Sized,
23 S2: IntoStream<Item = T>;
24
25 /// ‘Zips up’ multiple streams into a single stream of pairs.
26 fn zip<T, S2>(self, other: S2) -> Zip2<Self, S2::IntoStream>
27 where
28 Self: Stream<Item = T> + Sized,
29 S2: IntoStream<Item = T>;
30
31 /// Convert into a concurrent stream.
32 #[cfg(feature = "alloc")]
33 fn co(self) -> FromStream<Self>
34 where
35 Self: Sized,
36 {
37 FromStream::new(self)
38 }
39
40 /// Delay the yielding of items from the stream until the given deadline.
41 ///
42 /// The underlying stream will not be polled until the deadline has expired. In addition
43 /// to using a time source as a deadline, any future can be used as a
44 /// deadline too. When used in combination with a multi-consumer channel,
45 /// this method can be used to synchronize the start of multiple streams and futures.
46 ///
47 /// # Example
48 /// ```
49 /// # #[cfg(miri)] fn main() {}
50 /// # #[cfg(not(miri))]
51 /// # fn main() {
52 /// use async_io::Timer;
53 /// use futures_concurrency::prelude::*;
54 /// use futures_lite::{future::block_on, stream};
55 /// use futures_lite::prelude::*;
56 /// use std::time::{Duration, Instant};
57 ///
58 /// block_on(async {
59 /// let now = Instant::now();
60 /// let duration = Duration::from_millis(100);
61 ///
62 /// stream::once("meow")
63 /// .wait_until(Timer::after(duration))
64 /// .next()
65 /// .await;
66 ///
67 /// assert!(now.elapsed() >= duration);
68 /// });
69 /// # }
70 /// ```
71 fn wait_until<D>(self, deadline: D) -> WaitUntil<Self, D::IntoFuture>
72 where
73 Self: Sized,
74 D: IntoFuture,
75 {
76 WaitUntil::new(self, deadline.into_future())
77 }
78}
79
80impl<S1> StreamExt for S1
81where
82 S1: Stream,
83{
84 fn merge<T, S2>(self, other: S2) -> Merge2<T, S1, S2::IntoStream>
85 where
86 S1: Stream<Item = T>,
87 S2: IntoStream<Item = T>,
88 {
89 Merge::merge((self, other))
90 }
91
92 fn chain<T, S2>(self, other: S2) -> Chain2<Self, S2::IntoStream>
93 where
94 Self: Stream<Item = T> + Sized,
95 S2: IntoStream<Item = T>,
96 {
97 // TODO(yosh): fix the bounds on the tuple impl
98 Chain::chain((self, other.into_stream()))
99 }
100
101 fn zip<T, S2>(self, other: S2) -> Zip2<Self, S2::IntoStream>
102 where
103 Self: Stream<Item = T> + Sized,
104 S2: IntoStream<Item = T>,
105 {
106 // TODO(yosh): fix the bounds on the tuple impl
107 Zip::zip((self, other.into_stream()))
108 }
109}