jstream_ext/ext.rs
1use crate::ops::*;
2use futures::stream::FusedStream;
3use futures::{Future, Stream, TryFuture, TryStream};
4use std::hash::Hash;
5
6///
7/// Extensions to the [`TryStream`](futures::TryStream) type which aren't already covered by the
8/// included [`TryStreamExt`](futures::TryStreamExt).
9///
10/// This is implemented using a blanket impl for all `TryStream` implementors (which means any
11/// [`Stream`](futures::Stream) that emits a `Result`).
12///
13pub trait JTryStreamExt: TryStream + Sized {
14 ///
15 /// Turn this [`TryStream`](futures::TryStream) into a [`TryFuture`](futures::TryFuture) which
16 /// gives the first item emitted by this stream (in the form of an `Option`, because the stream
17 /// doesn't necessarily have to emit anything).
18 ///
19 fn try_first(self) -> TryStreamNth<Self> {
20 TryStreamNth::first(self)
21 }
22
23 ///
24 /// Turn this [`TryStream`](futures::TryStream) into a [`TryFuture`](futures::TryFuture) which
25 /// gives the nth item emitted by this stream (in the form of an `Option`, because the stream
26 /// doesn't have to emit enough items to reach the index `n`).
27 ///
28 /// It will only emit exactly the `n`'th item. If the stream completes before the `n`th item
29 /// is reached, then the future will emit a value of `None`.
30 ///
31 /// Any errors encountered while reaching `n`th item will be immediately returned.
32 ///
33 /// The future emits a value of type `Result<Option<Self::Ok>, Self::Error>`
34 ///
35 fn try_nth(self, n: usize) -> TryStreamNth<Self> {
36 TryStreamNth::new(self, n)
37 }
38
39 ///
40 /// filter+map on the `Self::Ok` value of this stream.
41 ///
42 /// This stream, with the item type `Result<Self::Ok, Self::Error>`, can be converted
43 /// to a stream which skips some values of `Self::Ok` and otherwise transforms non-skipped
44 /// values to a new type `T`, giving a new stream with item type `Result<T, Self::Error>`.
45 ///
46 /// If the current stream emits an `Ok(Self::Ok)` value, then the function passed to this
47 /// method will be called to transform it to an `Ok(T)` message.
48 ///
49 /// If the current stream emits some error by emitting the message `Err(Self::Error)`, then
50 /// this message will be passed straight-through.
51 ///
52 fn try_filter_map_ok<F, R>(self, predicate: F) -> TryFilterMapOk<Self, F, R>
53 where
54 F: FnMut(Self::Ok) -> Option<R>,
55 {
56 TryFilterMapOk::new(self, predicate)
57 }
58
59 ///
60 /// Given some stream where the `Self::Ok` type is `Hash`, then this method will allow you
61 /// to "de-duplicate" that stream.
62 ///
63 /// This is implemented by storing the hash (a `u64` value) of the `Self::Ok` messages in a
64 /// `HashSet<u64>` internally. If an item is emitted, it's hash is computed, and if the hash
65 /// has been seen before, then the item is skipped.
66 ///
67 /// Any error items will not be checked for duplication, and will simply be emitted by the
68 /// modified "de-duplicated" stream.
69 ///
70 fn try_dedup(self) -> TryDedupStream<Self>
71 where
72 Self::Ok: Hash,
73 {
74 TryDedupStream::new(self)
75 }
76
77 ///
78 /// If an `Err(Self::Error)` item is emitted from the stream, then panic on further calls to
79 /// this stream's `try_poll_next` method, and also implement
80 /// [FusedStream](futures::stream::FusedStream) for this stream (even if the current stream
81 /// doesn't actually implement `FusedStream`).
82 ///
83 fn fuse_on_fail(self) -> FuseOnFail<Self> {
84 FuseOnFail::new(self)
85 }
86
87 ///
88 /// Given some initial value of a type `T`, and some function which accepts `&mut T` and
89 /// `Self::Ok` and returns a `Future<Output=Result<(), Self::Error>>`, this stream can be
90 /// converted into a `Future<Output=Result<T, Self::Error>>`.
91 ///
92 /// The purpose of this method is basically "fold, but with mutable references, instead of
93 /// moving the value."
94 ///
95 /// Most implementations will not actually need to return a `Future` in the handler function,
96 /// but it is required to be a `Future` to support rare use-cases where async code must be
97 /// executed to update the `T`. In the common case, you can just use an `async move` block
98 /// to turn your non-async code into a `Future`, or you can use `futures::future::ready` to
99 /// construct a ready `Future` returning `Ok(())`.
100 ///
101 /// If the source stream ever emits an `Err(Self::Error)` item, then that causes this future
102 /// to immediately emit that same message. Otherwise, the returned future completes when
103 /// the stream completes.
104 ///
105 /// If the stream emits no items, then the initial value of `T` passed as the first parameter
106 /// to this method is emitted as `Ok(T)`.
107 ///
108 fn try_fold_mut<T, F, Fut>(self, initial: T, handler: F) -> TryFoldMut<Self, T, F, Fut>
109 where
110 Self: FusedStream,
111 F: FnMut(&mut T, Self::Ok) -> Fut,
112 Fut: TryFuture<Ok = (), Error = Self::Error>,
113 {
114 TryFoldMut::new(self, initial, handler)
115 }
116}
117
118impl<T> JTryStreamExt for T where T: TryStream + Sized {}
119
120///
121/// Extensions to the [`Stream`](futures::Stream) type which aren't already covered in
122/// [`StreamExt`](futures::StreamExt).
123///
124/// This is implemented using a blanket impl for all `Stream` implementors.
125///
126pub trait JStreamExt: Stream + Sized {
127 ///
128 /// Given some stream where the item is `Hash`, return a stream which only emits the unique
129 /// items emitted by the source stream.
130 ///
131 /// You can think of this as "de-duplicating" this stream. Only the first instance of every
132 /// unique item will be emitted.
133 ///
134 /// This is implemented by computing and storing the hash (a `u64` value) in a `HashSet` for
135 /// each item emitted by the stream.
136 ///
137 fn dedup(self) -> DedupStream<Self>
138 where
139 Self::Item: Hash,
140 {
141 DedupStream::new(self)
142 }
143
144 ///
145 /// fold, but with mutable references.
146 ///
147 /// Turns this stream into a `Future<Output=T>`. You must provide some initial value of type `T`,
148 /// and some function which accepts `&mut T` and `Self::Item` and returns `Future<Output=()>`.
149 ///
150 /// After all items are emitted by this stream, the current value of `T` is emitted by the returned
151 /// future.
152 ///
153 /// If the stream emits no values, then the initial value of `T` is emitted.
154 ///
155 fn fold_mut<T, F, Fut>(self, initial: T, handler: F) -> FoldMut<Self, T, F, Fut>
156 where
157 Self: FusedStream,
158 F: FnMut(&mut T, Self::Item) -> Fut,
159 Fut: Future<Output = ()>,
160 {
161 FoldMut::new(self, initial, handler)
162 }
163
164 ///
165 /// Turn this [`Stream`](futures::Stream) into a [`Future`](futures::Future) which gives the
166 /// first item emitted by this stream (in the form of an `Option`, because the stream doesn't
167 /// necessarily have to emit anything).
168 ///
169 fn first(self) -> StreamNth<Self> {
170 StreamNth::first(self)
171 }
172
173 ///
174 /// Turn this [`Stream`](futures::Stream) into a [`Future`](futures::Future) which gives the
175 /// nth item emitted by this stream (in the form of an `Option`, because the stream doesn't
176 /// have to emit enough items to reach the index `n`).
177 ///
178 /// It will only emit exactly the `n`'th item. If the stream completes before the `n`th item
179 /// is reached, then the future will emit a value of `None`.
180 ///
181 fn nth(self, index: usize) -> StreamNth<Self> {
182 StreamNth::new(self, index)
183 }
184}
185
186impl<T> JStreamExt for T where T: Stream + Sized {}