1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
use crate::ops::*;
use futures::stream::FusedStream;
use futures::{Future, Stream, TryFuture, TryStream};
use std::hash::Hash;

/// Additional combinators for [`TryStream`](futures::TryStream) that the stock
/// [`TryStreamExt`](futures::TryStreamExt) does not offer.
///
/// A blanket impl hands these methods to every `TryStream` implementor, i.e. to any
/// [`Stream`](futures::Stream) whose items are `Result`s.
pub trait JTryStreamExt: TryStream + Sized {
    /// Collapse this stream into a [`TryFuture`](futures::TryFuture) that yields the first item.
    ///
    /// The item is delivered inside an `Option`, since the stream is free to finish without
    /// emitting anything at all.
    fn try_first(self) -> TryStreamNth<Self> {
        TryStreamNth::first(self)
    }

    /// Collapse this stream into a [`TryFuture`](futures::TryFuture) that yields only the item
    /// at index `n`.
    ///
    /// The future's output type is `Result<Option<Self::Ok>, Self::Error>`. Exactly one item,
    /// the `n`th, is ever produced; when the stream ends before reaching index `n` the
    /// future yields `None` instead.
    ///
    /// # Errors
    ///
    /// An error emitted by the stream on the way to the `n`th item is returned immediately.
    fn try_nth(self, n: usize) -> TryStreamNth<Self> {
        TryStreamNth::new(self, n)
    }

    /// Combined filter and map over the `Self::Ok` values of this stream.
    ///
    /// The source stream has items of type `Result<Self::Ok, Self::Error>`; the returned stream
    /// has items of type `Result<R, Self::Error>`:
    ///
    /// * each `Ok(Self::Ok)` item is handed to `predicate`, which either transforms it into a
    ///   value of the new type or returns `None` to have it skipped;
    /// * each `Err(Self::Error)` item is passed straight through, untouched.
    fn try_filter_map_ok<F: FnMut(Self::Ok) -> Option<R>, R>(
        self,
        predicate: F,
    ) -> TryFilterMapOk<Self, F, R> {
        TryFilterMapOk::new(self, predicate)
    }

    /// "De-duplicate" a stream whose `Self::Ok` type is `Hash`.
    ///
    /// Internally the hash (a `u64` value) of every `Self::Ok` message is kept in a
    /// `HashSet<u64>`. When an item arrives its hash is computed, and the item is skipped if
    /// that hash has been seen before.
    ///
    /// Error items take no part in the duplicate check; the de-duplicated stream simply emits
    /// them.
    fn try_dedup(self) -> TryDedupStream<Self>
    where
        Self::Ok: Hash,
    {
        // NOTE(review): since only hashes are described as being stored, distinct items with
        // colliding hashes would presumably be skipped too; confirm in `TryDedupStream`.
        TryDedupStream::new(self)
    }

    /// Wrap this stream so that it stops being pollable after its first failure.
    ///
    /// The wrapper implements [FusedStream](futures::stream::FusedStream) even when the
    /// underlying stream does not.
    ///
    /// # Panics
    ///
    /// Once an `Err(Self::Error)` item has been emitted, further calls to the wrapper's
    /// `try_poll_next` method panic.
    fn fuse_on_fail(self) -> FuseOnFail<Self> {
        FuseOnFail::new(self)
    }

    /// Fold, but updating the accumulator through a mutable reference instead of moving it.
    ///
    /// Starting from `initial` (a value of some type `T`), `handler` is given `&mut T` together
    /// with each `Self::Ok` item and returns a `Future<Output=Result<(), Self::Error>>`. The
    /// whole stream is thereby converted into a `Future<Output=Result<T, Self::Error>>`, which
    /// completes when the stream completes.
    ///
    /// The handler has to return a `Future` so that the rare case of needing async code to
    /// update the `T` is supported. Most handlers are synchronous; wrap such code in an
    /// `async move` block, or use `futures::future::ready` to build a ready `Future` returning
    /// `Ok(())`.
    ///
    /// A stream that emits no items results in `Ok(T)` carrying the untouched `initial` value.
    ///
    /// # Errors
    ///
    /// If the source stream ever emits an `Err(Self::Error)` item, the returned future
    /// immediately emits that same message.
    fn try_fold_mut<T, F, Fut>(self, initial: T, handler: F) -> TryFoldMut<Self, T, F, Fut>
    where
        Fut: TryFuture<Ok = (), Error = Self::Error>,
        F: FnMut(&mut T, Self::Ok) -> Fut,
        Self: FusedStream,
    {
        TryFoldMut::new(self, initial, handler)
    }
}

// Blanket impl: every `TryStream` gets the extension methods (`Sized` is implied for `S`).
impl<S: TryStream> JTryStreamExt for S {}

/// Additional combinators for [`Stream`](futures::Stream) that the stock
/// [`StreamExt`](futures::StreamExt) does not offer.
///
/// A blanket impl hands these methods to every `Stream` implementor.
pub trait JStreamExt: Stream + Sized {
    /// "De-duplicate" a stream whose items are `Hash`.
    ///
    /// The returned stream emits only the unique items of the source stream: the first
    /// instance of every unique item is emitted and later instances are not.
    ///
    /// Internally the hash (a `u64` value) of each item emitted by the stream is computed and
    /// stored in a `HashSet`.
    fn dedup(self) -> DedupStream<Self>
    where
        Self::Item: Hash,
    {
        // NOTE(review): since only hashes are described as being stored, distinct items with
        // colliding hashes would presumably be dropped too; confirm in `DedupStream`.
        DedupStream::new(self)
    }

    /// Fold, but updating the accumulator through a mutable reference.
    ///
    /// Converts this stream into a `Future<Output=T>`. The caller supplies `initial`, a value of
    /// type `T`, and `handler`, a function which accepts `&mut T` and `Self::Item` and returns
    /// `Future<Output=()>`.
    ///
    /// Once the stream has emitted all of its items, the returned future emits the current
    /// value of `T`. A stream that emits no values results in the initial value of `T`.
    fn fold_mut<T, F, Fut>(self, initial: T, handler: F) -> FoldMut<Self, T, F, Fut>
    where
        Fut: Future<Output = ()>,
        F: FnMut(&mut T, Self::Item) -> Fut,
        Self: FusedStream,
    {
        FoldMut::new(self, initial, handler)
    }

    /// Collapse this [`Stream`](futures::Stream) into a [`Future`](futures::Future) that yields
    /// the first item.
    ///
    /// The item is delivered inside an `Option`, since the stream is free to finish without
    /// emitting anything at all.
    fn first(self) -> StreamNth<Self> {
        StreamNth::first(self)
    }

    /// Collapse this [`Stream`](futures::Stream) into a [`Future`](futures::Future) that yields
    /// only the item at position `index`.
    ///
    /// Exactly one item, the `index`th, is ever produced, and it is delivered inside an
    /// `Option`: when the stream completes before that item is reached, the future yields
    /// `None`.
    fn nth(self, index: usize) -> StreamNth<Self> {
        StreamNth::new(self, index)
    }
}

// Blanket impl: every `Stream` gets the extension methods (`Sized` is implied for `S`).
impl<S: Stream> JStreamExt for S {}