1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
use futures_core::stream::Stream;
use futures_util::stream::{iter, Iter};

/// Conversion into an [`Stream`].
///
/// By implementing IntoStream for a type, you define how it will be converted to a [`Stream`].\
/// This is the [`Stream`] trait parallel to [`IntoIterator`] for [`Iterator`](std::iter::Iterator)s.
///
/// A note of caution is to not implement generic methods for this like you'd for [`IntoIterator`] because without [specialization](https://github.com/rust-lang/rust/issues/31844) it's not currently possible to blanket implement this for all [`Stream`]s
///
/// [`Stream`]: futures_core::stream::Stream
/// [`IntoIterator`]: std::iter::IntoIterator
pub trait IntoStream {
    /// The type of the elements being streamed.
    type Item;

    /// Which kind of stream are we turning this into?
    type IntoStream: Stream<Item = Self::Item>;

    /// Creates a stream from a value.
    ///
    /// # Examples
    ///
    /// Basic usage:
    ///
    /// ```
    /// use flood::IntoStream;
    /// use futures_util::stream::StreamExt;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let v = vec![1, 2, 3];
    ///     let mut stream = v.into_stream();
    ///
    ///     assert_eq!(Some(1), stream.next().await);
    ///     assert_eq!(Some(2), stream.next().await);
    ///     assert_eq!(Some(3), stream.next().await);
    ///     assert_eq!(None, stream.next().await);
    /// }
    /// ```
    fn into_stream(self) -> Self::IntoStream;
}

impl<T> IntoStream for T where T: IntoIterator {
    type Item = T::Item;
    type IntoStream = Iter<T::IntoIter>;

    fn into_stream(self) -> Self::IntoStream {
        iter(self)
    }
}

// TODO: Specialization
// impl<St: Stream> IntoStream for St {
//     type Item = St::Item;
//     type IntoStream = St;

//     fn into_stream(self) -> Self::IntoStream {
//         self
//     }
// }

#[cfg(test)]
mod tests {
    use super::IntoStream;
    // Thanks karroffel for the protip
    use futures_util::stream::StreamExt as _;

    #[tokio::test]
    async fn into_stream_vec() {
        let v = vec![1, 2, 3];
        let mut stream = v.into_stream();

        assert_eq!(Some(1), stream.next().await);
        assert_eq!(Some(2), stream.next().await);
        assert_eq!(Some(3), stream.next().await);
        assert_eq!(None, stream.next().await);
    }

    // I am not putting this test here to avoid breaking crater runs.
    // #[tokio::test]
    // async fn into_stream_array() {
    //     let arr = [1, 2, 3];
    //     let mut stream = arr.into_stream();

    //     assert_eq!(Some(1), stream.next().await);
    //     assert_eq!(Some(2), stream.next().await);
    //     assert_eq!(Some(3), stream.next().await);
    //     assert_eq!(None, stream.next().await);
    // }

    #[tokio::test]
    async fn into_stream_slice() {
        let v = vec![1, 2, 3];
        let slice = &v[..];
        let mut stream = slice.into_stream();

        assert_eq!(Some(&1), stream.next().await);
        assert_eq!(Some(&2), stream.next().await);
        assert_eq!(Some(&3), stream.next().await);
        assert_eq!(None, stream.next().await);
    }
}