1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
//! Composable asynchronous iteration.

use crate::MergeStreams;
use futures_core::Stream;

/// Extend `Stream` with a `merge` method.
pub trait StreamExt: Stream {
    /// Combines multiple streams into a single stream of all their outputs.
    ///
    /// Items are yielded as soon as they're received, and the stream continues
    /// yield until both streams have been exhausted. The output ordering
    /// between streams is not guaranteed.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures::stream::StreamExt;
    /// use merge_streams::StreamExt as _;
    /// use futures_lite::future::block_on;
    /// use futures_lite::stream;
    ///
    /// fn main() {
    ///     block_on(async {
    ///         let a = stream::once(1u8);
    ///         let b = stream::once(2u8);
    ///         let mut s = a.merge(b);
    ///
    ///         let mut buf = vec![];
    ///         while let Some(n) = s.next().await {
    ///             buf.push(n);
    ///         }
    ///         buf.sort_unstable();
    ///         assert_eq!(&buf, &[1u8, 2u8]);
    ///     })
    /// }
    /// ```
    fn merge<S1>(self, other: S1) -> Box<dyn Stream<Item = Self::Item> + Unpin>
    where
        Self: Sized + 'static,
        S1: Stream<Item = Self::Item> + 'static,
    {
        Box::new((self, other).merge())
    }
}

impl<S> StreamExt for S where S: Stream {}

/// Helper trait for converting values into a `Stream`.
///
/// By implementing `IntoStream` for a type, you define how it will be
/// converted to an iterator. This is common for types which describe a
/// collection of some kind.
///
/// [`from_stream`]: #tymethod.from_stream
/// [`Stream`]: trait.Stream.html
pub trait IntoStream {
    /// The type of the elements being iterated over.
    type Item;

    /// Which kind of stream are we turning this into?
    type IntoStream: Stream<Item = Self::Item>;

    /// Creates a stream from a value.
    fn into_stream(self) -> Self::IntoStream;
}

impl<S: Stream> IntoStream for S {
    type Item = S::Item;
    type IntoStream = S;

    #[inline]
    fn into_stream(self) -> S {
        self
    }
}