jstream_ext/
ext.rs

1use crate::ops::*;
2use futures::stream::FusedStream;
3use futures::{Future, Stream, TryFuture, TryStream};
4use std::hash::Hash;
5
6///
7/// Extensions to the [`TryStream`](futures::TryStream) type which aren't already covered by the
8/// included [`TryStreamExt`](futures::TryStreamExt).
9///
10/// This is implemented using a blanket impl for all `TryStream` implementors (which means any
11/// [`Stream`](futures::Stream) that emits a `Result`).
12///
13pub trait JTryStreamExt: TryStream + Sized {
14    ///
15    /// Turn this [`TryStream`](futures::TryStream) into a [`TryFuture`](futures::TryFuture) which
16    /// gives the first item emitted by this stream (in the form of an `Option`, because the stream
17    /// doesn't necessarily have to emit anything).
18    ///
19    fn try_first(self) -> TryStreamNth<Self> {
20        TryStreamNth::first(self)
21    }
22
23    ///
24    /// Turn this [`TryStream`](futures::TryStream) into a [`TryFuture`](futures::TryFuture) which
25    /// gives the nth item emitted by this stream (in the form of an `Option`, because the stream
26    /// doesn't have to emit enough items to reach the index `n`).
27    ///
28    /// It will only emit exactly the `n`'th item. If the stream completes before the `n`th item
29    /// is reached, then the future will emit a value of `None`.
30    ///
31    /// Any errors encountered while reaching `n`th item will be immediately returned.
32    ///
33    /// The future emits a value of type `Result<Option<Self::Ok>, Self::Error>`
34    ///
35    fn try_nth(self, n: usize) -> TryStreamNth<Self> {
36        TryStreamNth::new(self, n)
37    }
38
39    ///
40    /// filter+map on the `Self::Ok` value of this stream.
41    ///
42    /// This stream, with the item type `Result<Self::Ok, Self::Error>`, can be converted
43    /// to a stream which skips some values of `Self::Ok` and otherwise transforms non-skipped
44    /// values to a new type `T`, giving a new stream with item type `Result<T, Self::Error>`.
45    ///
46    /// If the current stream emits an `Ok(Self::Ok)` value, then the function passed to this
47    /// method will be called to transform it to an `Ok(T)` message.
48    ///
49    /// If the current stream emits some error by emitting the message `Err(Self::Error)`, then
50    /// this message will be passed straight-through.
51    ///
52    fn try_filter_map_ok<F, R>(self, predicate: F) -> TryFilterMapOk<Self, F, R>
53    where
54        F: FnMut(Self::Ok) -> Option<R>,
55    {
56        TryFilterMapOk::new(self, predicate)
57    }
58
59    ///
60    /// Given some stream where the `Self::Ok` type is `Hash`, then this method will allow you
61    /// to "de-duplicate" that stream.
62    ///
63    /// This is implemented by storing the hash (a `u64` value) of the `Self::Ok` messages in a
64    /// `HashSet<u64>` internally. If an item is emitted, it's hash is computed, and if the hash
65    /// has been seen before, then the item is skipped.
66    ///
67    /// Any error items will not be checked for duplication, and will simply be emitted by the
68    /// modified "de-duplicated" stream.
69    ///
70    fn try_dedup(self) -> TryDedupStream<Self>
71    where
72        Self::Ok: Hash,
73    {
74        TryDedupStream::new(self)
75    }
76
77    ///
78    /// If an `Err(Self::Error)` item is emitted from the stream, then panic on further calls to
79    /// this stream's `try_poll_next` method, and also implement
80    /// [FusedStream](futures::stream::FusedStream) for this stream (even if the current stream
81    /// doesn't actually implement `FusedStream`).
82    ///
83    fn fuse_on_fail(self) -> FuseOnFail<Self> {
84        FuseOnFail::new(self)
85    }
86
87    ///
88    /// Given some initial value of a type `T`, and some function which accepts `&mut T` and
89    /// `Self::Ok` and returns a `Future<Output=Result<(), Self::Error>>`, this stream can be
90    /// converted into a `Future<Output=Result<T, Self::Error>>`.
91    ///
92    /// The purpose of this method is basically "fold, but with mutable references, instead of
93    /// moving the value."
94    ///
95    /// Most implementations will not actually need to return a `Future` in the handler function,
96    /// but it is required to be a `Future` to support rare use-cases where async code must be
97    /// executed to update the `T`. In the common case, you can just use an `async move` block
98    /// to turn your non-async code into a `Future`, or you can use `futures::future::ready` to
99    /// construct a ready `Future` returning `Ok(())`.
100    ///
101    /// If the source stream ever emits an `Err(Self::Error)` item, then that causes this future
102    /// to immediately emit that same message. Otherwise, the returned future completes when
103    /// the stream completes.
104    ///
105    /// If the stream emits no items, then the initial value of `T` passed as the first parameter
106    /// to this method is emitted as `Ok(T)`.
107    ///
108    fn try_fold_mut<T, F, Fut>(self, initial: T, handler: F) -> TryFoldMut<Self, T, F, Fut>
109    where
110        Self: FusedStream,
111        F: FnMut(&mut T, Self::Ok) -> Fut,
112        Fut: TryFuture<Ok = (), Error = Self::Error>,
113    {
114        TryFoldMut::new(self, initial, handler)
115    }
116}
117
118impl<T> JTryStreamExt for T where T: TryStream + Sized {}
119
120///
121/// Extensions to the [`Stream`](futures::Stream) type which aren't already covered in
122/// [`StreamExt`](futures::StreamExt).
123///
124/// This is implemented using a blanket impl for all `Stream` implementors.
125///
126pub trait JStreamExt: Stream + Sized {
127    ///
128    /// Given some stream where the item is `Hash`, return a stream which only emits the unique
129    /// items emitted by the source stream.
130    ///
131    /// You can think of this as "de-duplicating" this stream. Only the first instance of every
132    /// unique item will be emitted.
133    ///
134    /// This is implemented by computing and storing the hash (a `u64` value) in a `HashSet` for
135    /// each item emitted by the stream.
136    ///
137    fn dedup(self) -> DedupStream<Self>
138    where
139        Self::Item: Hash,
140    {
141        DedupStream::new(self)
142    }
143
144    ///
145    /// fold, but with mutable references.
146    ///
147    /// Turns this stream into a `Future<Output=T>`. You must provide some initial value of type `T`,
148    /// and some function which accepts `&mut T` and `Self::Item` and returns `Future<Output=()>`.
149    ///
150    /// After all items are emitted by this stream, the current value of `T` is emitted by the returned
151    /// future.
152    ///
153    /// If the stream emits no values, then the initial value of `T` is emitted.
154    ///
155    fn fold_mut<T, F, Fut>(self, initial: T, handler: F) -> FoldMut<Self, T, F, Fut>
156    where
157        Self: FusedStream,
158        F: FnMut(&mut T, Self::Item) -> Fut,
159        Fut: Future<Output = ()>,
160    {
161        FoldMut::new(self, initial, handler)
162    }
163
164    ///
165    /// Turn this [`Stream`](futures::Stream) into a [`Future`](futures::Future) which gives the
166    /// first item emitted by this stream (in the form of an `Option`, because the stream doesn't
167    /// necessarily have to emit anything).
168    ///
169    fn first(self) -> StreamNth<Self> {
170        StreamNth::first(self)
171    }
172
173    ///
174    /// Turn this [`Stream`](futures::Stream) into a [`Future`](futures::Future) which gives the
175    /// nth item emitted by this stream (in the form of an `Option`, because the stream doesn't
176    /// have to emit enough items to reach the index `n`).
177    ///
178    /// It will only emit exactly the `n`'th item. If the stream completes before the `n`th item
179    /// is reached, then the future will emit a value of `None`.
180    ///
181    fn nth(self, index: usize) -> StreamNth<Self> {
182        StreamNth::new(self, index)
183    }
184}
185
186impl<T> JStreamExt for T where T: Stream + Sized {}