pub trait StreamMore: Stream {
// Provided methods
fn kmerge_by<'a, F>(self, first: F) -> KMerge<'a, FnCmp<F>, Self::Item>
where Self: Sized + Send + 'a,
F: Fn(&Self::Item, &Self::Item) -> bool { ... }
fn kmerge_by_cmp<'a, C>(self, cmp: C) -> KMerge<'a, C, Self::Item>
where Self: Sized + Send + 'a,
C: Compare<Self::Item> { ... }
fn kmerge_max<'a>(self) -> KMerge<'a, Descending, Self::Item>
where Self: Sized + Send + 'a,
Self::Item: Ord { ... }
fn kmerge_min<'a>(self) -> KMerge<'a, Ascending, Self::Item>
where Self: Sized + Send + 'a,
Self::Item: Ord { ... }
fn coalesce<'a, F>(self, f: F) -> Coalesce<'a, Self::Item, F>
where Self: Sized + Send + 'a,
F: FnMut(Self::Item, Self::Item) -> Result<Self::Item, (Self::Item, Self::Item)> { ... }
}
Expand description
Provide more methods for `Stream`.
Provided Methods§
Sourcefn kmerge_by<'a, F>(self, first: F) -> KMerge<'a, FnCmp<F>, Self::Item>
fn kmerge_by<'a, F>(self, first: F) -> KMerge<'a, FnCmp<F>, Self::Item>
Create a k-way merge `Stream` that flattens `Stream`s by merging them according to the given closure `first()`.
The closure `first()` is called with two elements `a`, `b` and should return `true` if `a` is ordered before `b`.
If all base `Stream`s are sorted according to `first()`, the result is sorted.
§Example
Sort merge two streams in ascending order:
use futures::stream::iter;
use futures::executor::block_on;
let m = iter([1,3]).kmerge_by(|a,b| a < b).merge(iter([2,4]));
let got = block_on(m.collect::<Vec<u64>>());
assert_eq!(vec![1, 2, 3, 4], got);
Sourcefn kmerge_by_cmp<'a, C>(self, cmp: C) -> KMerge<'a, C, Self::Item>
fn kmerge_by_cmp<'a, C>(self, cmp: C) -> KMerge<'a, C, Self::Item>
Create a k-way merge stream, which chooses the next item from the streams using a comparator implementing `Compare`.
If `comparator::compare(a, b)` returns `Ordering::Greater`, `a` will be chosen first over `b`, where `a` and `b` are the next items from different streams.
§Example
Sort merge two streams in ascending order:
use futures::stream::iter;
use futures::executor::block_on;
use stream_more::comparators::Ascending;
let m = iter([1,3]).kmerge_by_cmp(Ascending).merge(iter([2,4]));
let got = block_on(m.collect::<Vec<u64>>());
assert_eq!(vec![1, 2, 3, 4], got);
Sourcefn kmerge_max<'a>(self) -> KMerge<'a, Descending, Self::Item>
fn kmerge_max<'a>(self) -> KMerge<'a, Descending, Self::Item>
Convert this stream to a `KMerge` stream which merges streams by choosing the maximum item from the streams, behaving like a max-heap.
§Example
use futures::stream::iter;
let m = iter([3,1]).kmerge_max().merge(iter([4,2])).merge(iter([5]));
let got = m.collect::<Vec<u64>>().await;
assert_eq!(vec![5, 4, 3, 2, 1], got);
Sourcefn kmerge_min<'a>(self) -> KMerge<'a, Ascending, Self::Item>
fn kmerge_min<'a>(self) -> KMerge<'a, Ascending, Self::Item>
Sourcefn coalesce<'a, F>(self, f: F) -> Coalesce<'a, Self::Item, F>
fn coalesce<'a, F>(self, f: F) -> Coalesce<'a, Self::Item, F>
Return a stream adaptor that uses the passed-in closure to optionally merge together consecutive items.
The closure `f` is passed two items, `previous` and `current`, and may return either:
- (1) `Ok(combined)` to merge the two values, or
- (2) `Err((previous, current))` to indicate they can’t be merged.
In (2), the value `previous` is emitted.
Either (1) `combined` or (2) `current` becomes the previous value when `coalesce` continues with the next pair of items to merge. The value that remains at the end is also emitted.
The stream item type is `Self::Item`.
use futures::stream::iter;
// sum same-sign runs together
let got = iter(vec![-1, -2, -3, 3, 1, 0, -1])
.coalesce(|x, y|
if x * y >= 0 {
Ok(x + y)
} else {
Err((x, y))
})
.collect::<Vec<_>>().await;
assert_eq!(vec![-6, 4, -1], got);