merge_streams/stream.rs
1//! Composable asynchronous iteration.
2
3use crate::MergeStreams;
4use futures_core::Stream;
5
6/// Extend `Stream` with a `merge` method.
7pub trait StreamExt: Stream {
8 /// Combines multiple streams into a single stream of all their outputs.
9 ///
10 /// Items are yielded as soon as they're received, and the stream continues
11 /// yield until both streams have been exhausted. The output ordering
12 /// between streams is not guaranteed.
13 ///
14 /// # Examples
15 ///
16 /// ```
17 /// use futures::stream::StreamExt;
18 /// use merge_streams::StreamExt as _;
19 /// use futures_lite::future::block_on;
20 /// use futures_lite::stream;
21 ///
22 /// fn main() {
23 /// block_on(async {
24 /// let a = stream::once(1u8);
25 /// let b = stream::once(2u8);
26 /// let mut s = a.merge(b);
27 ///
28 /// let mut buf = vec![];
29 /// while let Some(n) = s.next().await {
30 /// buf.push(n);
31 /// }
32 /// buf.sort_unstable();
33 /// assert_eq!(&buf, &[1u8, 2u8]);
34 /// })
35 /// }
36 /// ```
37 fn merge<S1>(self, other: S1) -> Box<dyn Stream<Item = Self::Item> + Unpin>
38 where
39 Self: Sized + 'static,
40 S1: Stream<Item = Self::Item> + 'static,
41 {
42 Box::new((self, other).merge())
43 }
44}
45
46impl<S> StreamExt for S where S: Stream {}
47
48/// Helper trait for converting values into a `Stream`.
49///
50/// By implementing `IntoStream` for a type, you define how it will be
51/// converted to an iterator. This is common for types which describe a
52/// collection of some kind.
53///
54/// [`from_stream`]: #tymethod.from_stream
55/// [`Stream`]: trait.Stream.html
56pub trait IntoStream {
57 /// The type of the elements being iterated over.
58 type Item;
59
60 /// Which kind of stream are we turning this into?
61 type IntoStream: Stream<Item = Self::Item>;
62
63 /// Creates a stream from a value.
64 fn into_stream(self) -> Self::IntoStream;
65}
66
67impl<S: Stream> IntoStream for S {
68 type Item = S::Item;
69 type IntoStream = S;
70
71 #[inline]
72 fn into_stream(self) -> S {
73 self
74 }
75}