1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186
use crate::ops::*; use futures::stream::FusedStream; use futures::{Future, Stream, TryFuture, TryStream}; use std::hash::Hash; /// /// Extensions to the [`TryStream`](futures::TryStream) type which aren't already covered by the /// included [`TryStreamExt`](futures::TryStreamExt). /// /// This is implemented using a blanket impl for all `TryStream` implementors (which means any /// [`Stream`](futures::Stream) that emits a `Result`). /// pub trait JTryStreamExt: TryStream + Sized { /// /// Turn this [`TryStream`](futures::TryStream) into a [`TryFuture`](futures::TryFuture) which /// gives the first item emitted by this stream (in the form of an `Option`, because the stream /// doesn't necessarily have to emit anything). /// fn try_first(self) -> TryStreamNth<Self> { TryStreamNth::first(self) } /// /// Turn this [`TryStream`](futures::TryStream) into a [`TryFuture`](futures::TryFuture) which /// gives the nth item emitted by this stream (in the form of an `Option`, because the stream /// doesn't have to emit enough items to reach the index `n`). /// /// It will only emit exactly the `n`'th item. If the stream completes before the `n`th item /// is reached, then the future will emit a value of `None`. /// /// Any errors encountered while reaching `n`th item will be immediately returned. /// /// The future emits a value of type `Result<Option<Self::Ok>, Self::Error>` /// fn try_nth(self, n: usize) -> TryStreamNth<Self> { TryStreamNth::new(self, n) } /// /// filter+map on the `Self::Ok` value of this stream. /// /// This stream, with the item type `Result<Self::Ok, Self::Error>`, can be converted /// to a stream which skips some values of `Self::Ok` and otherwise transforms non-skipped /// values to a new type `T`, giving a new stream with item type `Result<T, Self::Error>`. /// /// If the current stream emits an `Ok(Self::Ok)` value, then the function passed to this /// method will be called to transform it to an `Ok(T)` message. /// /// If the current stream emits some error by emitting the message `Err(Self::Error)`, then /// this message will be passed straight-through. /// fn try_filter_map_ok<F, R>(self, predicate: F) -> TryFilterMapOk<Self, F, R> where F: FnMut(Self::Ok) -> Option<R>, { TryFilterMapOk::new(self, predicate) } /// /// Given some stream where the `Self::Ok` type is `Hash`, then this method will allow you /// to "de-duplicate" that stream. /// /// This is implemented by storing the hash (a `u64` value) of the `Self::Ok` messages in a /// `HashSet<u64>` internally. If an item is emitted, it's hash is computed, and if the hash /// has been seen before, then the item is skipped. /// /// Any error items will not be checked for duplication, and will simply be emitted by the /// modified "de-duplicated" stream. /// fn try_dedup(self) -> TryDedupStream<Self> where Self::Ok: Hash, { TryDedupStream::new(self) } /// /// If an `Err(Self::Error)` item is emitted from the stream, then panic on further calls to /// this stream's `try_poll_next` method, and also implement /// [FusedStream](futures::stream::FusedStream) for this stream (even if the current stream /// doesn't actually implement `FusedStream`). /// fn fuse_on_fail(self) -> FuseOnFail<Self> { FuseOnFail::new(self) } /// /// Given some initial value of a type `T`, and some function which accepts `&mut T` and /// `Self::Ok` and returns a `Future<Output=Result<(), Self::Error>>`, this stream can be /// converted into a `Future<Output=Result<T, Self::Error>>`. /// /// The purpose of this method is basically "fold, but with mutable references, instead of /// moving the value." /// /// Most implementations will not actually need to return a `Future` in the handler function, /// but it is required to be a `Future` to support rare use-cases where async code must be /// executed to update the `T`. In the common case, you can just use an `async move` block /// to turn your non-async code into a `Future`, or you can use `futures::future::ready` to /// construct a ready `Future` returning `Ok(())`. /// /// If the source stream ever emits an `Err(Self::Error)` item, then that causes this future /// to immediately emit that same message. Otherwise, the returned future completes when /// the stream completes. /// /// If the stream emits no items, then the initial value of `T` passed as the first parameter /// to this method is emitted as `Ok(T)`. /// fn try_fold_mut<T, F, Fut>(self, initial: T, handler: F) -> TryFoldMut<Self, T, F, Fut> where Self: FusedStream, F: FnMut(&mut T, Self::Ok) -> Fut, Fut: TryFuture<Ok = (), Error = Self::Error>, { TryFoldMut::new(self, initial, handler) } } impl<T> JTryStreamExt for T where T: TryStream + Sized {} /// /// Extensions to the [`Stream`](futures::Stream) type which aren't already covered in /// [`StreamExt`](futures::StreamExt). /// /// This is implemented using a blanket impl for all `Stream` implementors. /// pub trait JStreamExt: Stream + Sized { /// /// Given some stream where the item is `Hash`, return a stream which only emits the unique /// items emitted by the source stream. /// /// You can think of this as "de-duplicating" this stream. Only the first instance of every /// unique item will be emitted. /// /// This is implemented by computing and storing the hash (a `u64` value) in a `HashSet` for /// each item emitted by the stream. /// fn dedup(self) -> DedupStream<Self> where Self::Item: Hash, { DedupStream::new(self) } /// /// fold, but with mutable references. /// /// Turns this stream into a `Future<Output=T>`. You must provide some initial value of type `T`, /// and some function which accepts `&mut T` and `Self::Item` and returns `Future<Output=()>`. /// /// After all items are emitted by this stream, the current value of `T` is emitted by the returned /// future. /// /// If the stream emits no values, then the initial value of `T` is emitted. /// fn fold_mut<T, F, Fut>(self, initial: T, handler: F) -> FoldMut<Self, T, F, Fut> where Self: FusedStream, F: FnMut(&mut T, Self::Item) -> Fut, Fut: Future<Output = ()>, { FoldMut::new(self, initial, handler) } /// /// Turn this [`Stream`](futures::Stream) into a [`Future`](futures::Future) which gives the /// first item emitted by this stream (in the form of an `Option`, because the stream doesn't /// necessarily have to emit anything). /// fn first(self) -> StreamNth<Self> { StreamNth::first(self) } /// /// Turn this [`Stream`](futures::Stream) into a [`Future`](futures::Future) which gives the /// nth item emitted by this stream (in the form of an `Option`, because the stream doesn't /// have to emit enough items to reach the index `n`). /// /// It will only emit exactly the `n`'th item. If the stream completes before the `n`th item /// is reached, then the future will emit a value of `None`. /// fn nth(self, index: usize) -> StreamNth<Self> { StreamNth::new(self, index) } } impl<T> JStreamExt for T where T: Stream + Sized {}