merge_streams/
stream.rs

1//! Composable asynchronous iteration.
2
3use crate::MergeStreams;
4use futures_core::Stream;
5
6/// Extend `Stream` with a `merge` method.
7pub trait StreamExt: Stream {
8    /// Combines multiple streams into a single stream of all their outputs.
9    ///
10    /// Items are yielded as soon as they're received, and the stream continues
11    /// yield until both streams have been exhausted. The output ordering
12    /// between streams is not guaranteed.
13    ///
14    /// # Examples
15    ///
16    /// ```
17    /// use futures::stream::StreamExt;
18    /// use merge_streams::StreamExt as _;
19    /// use futures_lite::future::block_on;
20    /// use futures_lite::stream;
21    ///
22    /// fn main() {
23    ///     block_on(async {
24    ///         let a = stream::once(1u8);
25    ///         let b = stream::once(2u8);
26    ///         let mut s = a.merge(b);
27    ///
28    ///         let mut buf = vec![];
29    ///         while let Some(n) = s.next().await {
30    ///             buf.push(n);
31    ///         }
32    ///         buf.sort_unstable();
33    ///         assert_eq!(&buf, &[1u8, 2u8]);
34    ///     })
35    /// }
36    /// ```
37    fn merge<S1>(self, other: S1) -> Box<dyn Stream<Item = Self::Item> + Unpin>
38    where
39        Self: Sized + 'static,
40        S1: Stream<Item = Self::Item> + 'static,
41    {
42        Box::new((self, other).merge())
43    }
44}
45
46impl<S> StreamExt for S where S: Stream {}
47
48/// Helper trait for converting values into a `Stream`.
49///
50/// By implementing `IntoStream` for a type, you define how it will be
51/// converted to an iterator. This is common for types which describe a
52/// collection of some kind.
53///
54/// [`from_stream`]: #tymethod.from_stream
55/// [`Stream`]: trait.Stream.html
56pub trait IntoStream {
57    /// The type of the elements being iterated over.
58    type Item;
59
60    /// Which kind of stream are we turning this into?
61    type IntoStream: Stream<Item = Self::Item>;
62
63    /// Creates a stream from a value.
64    fn into_stream(self) -> Self::IntoStream;
65}
66
67impl<S: Stream> IntoStream for S {
68    type Item = S::Item;
69    type IntoStream = S;
70
71    #[inline]
72    fn into_stream(self) -> S {
73        self
74    }
75}