pub trait StreamMore: Stream {
// Provided methods
fn kmerge_by<'a, F>(self, first: F) -> KMerge<'a, FnCmp<F>, Self::Item>
where Self: Sized + Send + 'a,
F: Fn(&Self::Item, &Self::Item) -> bool { ... }
fn kmerge_by_cmp<'a, C>(self, cmp: C) -> KMerge<'a, C, Self::Item>
where Self: Sized + Send + 'a,
C: Compare<Self::Item> { ... }
fn kmerge_max<'a>(self) -> KMerge<'a, Descending, Self::Item>
where Self: Sized + Send + 'a,
Self::Item: Ord { ... }
fn kmerge_min<'a>(self) -> KMerge<'a, Ascending, Self::Item>
where Self: Sized + Send + 'a,
Self::Item: Ord { ... }
fn coalesce<'a, F>(self, f: F) -> Coalesce<'a, Self::Item, F>
where Self: Sized + Send + 'a,
F: FnMut(Self::Item, Self::Item) -> Result<Self::Item, (Self::Item, Self::Item)> { ... }
}Expand description
Provide more methods for Stream.
Provided Methods§
Sourcefn kmerge_by<'a, F>(self, first: F) -> KMerge<'a, FnCmp<F>, Self::Item>
fn kmerge_by<'a, F>(self, first: F) -> KMerge<'a, FnCmp<F>, Self::Item>
Create a k-way merge Stream that flattens Streams by merging them according
to the given closure first().
The closure first() is called with two elements a, b and should return true if a
is ordered before b.
If all base Streams are sorted according to first(), the result is sorted.
§Example
Sort merge two streams in ascending order:
use futures::stream::iter;
use futures::executor::block_on;
let m = iter([1,3]).kmerge_by(|a,b| a < b).merge(iter([2,4]));
let got = block_on(m.collect::<Vec<u64>>());
assert_eq!(vec![1, 2, 3, 4], got);Sourcefn kmerge_by_cmp<'a, C>(self, cmp: C) -> KMerge<'a, C, Self::Item>
fn kmerge_by_cmp<'a, C>(self, cmp: C) -> KMerge<'a, C, Self::Item>
Create a k-way merge stream, which chooses the item from the streams by a comparator
Compare.
If comparator::compare(a,b) returns Ordering::Greater, a will be chosen first over
b, where a and b are next item from different streams.
§Example
Sort merge two streams in ascending order:
use futures::stream::iter;
use futures::executor::block_on;
use stream_more::comparators::Ascending;
let m = iter([1,3]).kmerge_by_cmp(Ascending).merge(iter([2,4]));
let got = block_on(m.collect::<Vec<u64>>());
assert_eq!(vec![1, 2, 3, 4], got);Sourcefn kmerge_max<'a>(self) -> KMerge<'a, Descending, Self::Item>
fn kmerge_max<'a>(self) -> KMerge<'a, Descending, Self::Item>
Convert this stream to a KMerge streams which merge streams by choosing the maximum
item from the streams, behaving like a max-heap.
§Example
use futures::stream::iter;
let m = iter([3,1]).kmerge_max().merge(iter([4,2])).merge(iter([5]));
let got = m.collect::<Vec<u64>>().await;
assert_eq!(vec![5, 4, 3, 2, 1], got);Sourcefn kmerge_min<'a>(self) -> KMerge<'a, Ascending, Self::Item>
fn kmerge_min<'a>(self) -> KMerge<'a, Ascending, Self::Item>
Sourcefn coalesce<'a, F>(self, f: F) -> Coalesce<'a, Self::Item, F>
fn coalesce<'a, F>(self, f: F) -> Coalesce<'a, Self::Item, F>
Return a stream adaptor that uses the passed-in closure to optionally merge together consecutive items.
The closure f is passed two items previous and current and may return either:
- (1)
Ok(combined)to merge the two values or - (2)
Err((previous, current))to indicate they can’t be merged.
In (2), the value previous is emitted.
Either (1) combined or (2) current becomes the previous value when coalesce continues
with the next pair of items to merge. The value that remains at the end is also
emitted.
The stream item type is Self::Item.
use futures::stream::iter;
// sum same-sign runs together
let got = iter(vec![-1, -2, -3, 3, 1, 0, -1])
.coalesce(|x, y|
if x * y >= 0 {
Ok(x + y)
} else {
Err((x, y))
})
.collect::<Vec<_>>().await;
assert_eq!(vec![-6, 4, -1], got);