Trait stream_more::StreamMore
source · pub trait StreamMore: Stream {
// Provided methods
fn kmerge_by<'a, F>(self, first: F) -> KMerge<'a, FnCmp<F>, Self::Item>
where Self: Sized + Send + 'a,
F: Fn(&Self::Item, &Self::Item) -> bool { ... }
fn kmerge_by_cmp<'a, C>(self, cmp: C) -> KMerge<'a, C, Self::Item>
where Self: Sized + Send + 'a,
C: Compare<Self::Item> { ... }
fn kmerge_max<'a>(self) -> KMerge<'a, Descending, Self::Item>
where Self: Sized + Send + 'a,
Self::Item: Ord { ... }
fn kmerge_min<'a>(self) -> KMerge<'a, Ascending, Self::Item>
where Self: Sized + Send + 'a,
Self::Item: Ord { ... }
fn coalesce<'a, F>(self, f: F) -> Coalesce<'a, Self::Item, F>
where Self: Sized + Send + 'a,
F: FnMut(Self::Item, Self::Item) -> Result<Self::Item, (Self::Item, Self::Item)> { ... }
}
Expand description
Provide more methods for [Stream
].
Provided Methods§
sourcefn kmerge_by<'a, F>(self, first: F) -> KMerge<'a, FnCmp<F>, Self::Item>where
Self: Sized + Send + 'a,
F: Fn(&Self::Item, &Self::Item) -> bool,
fn kmerge_by<'a, F>(self, first: F) -> KMerge<'a, FnCmp<F>, Self::Item>where Self: Sized + Send + 'a, F: Fn(&Self::Item, &Self::Item) -> bool,
Create a k-way merge Stream
that flattens Stream
s by merging them according
to the given closure first()
.
The closure first()
is called with two elements a
, b
and should return true
if a
is ordered before b
.
If all base Stream
s are sorted according to first()
, the result is sorted.
Example
Sort merge two streams in ascending order:
use futures::stream::iter;
use futures::executor::block_on;
let m = iter([1,3]).kmerge_by(|a,b| a < b).merge(iter([2,4]));
let got = block_on(m.collect::<Vec<u64>>());
assert_eq!(vec![1, 2, 3, 4], got);
sourcefn kmerge_by_cmp<'a, C>(self, cmp: C) -> KMerge<'a, C, Self::Item>where
Self: Sized + Send + 'a,
C: Compare<Self::Item>,
fn kmerge_by_cmp<'a, C>(self, cmp: C) -> KMerge<'a, C, Self::Item>where Self: Sized + Send + 'a, C: Compare<Self::Item>,
Create a k-way merge stream, which chooses the item from the streams by a comparator
Compare
.
If comparator::compare(a,b)
returns Ordering::Greater
, a
will be chosen first over
b
, where a
and b
are next item from different streams.
Example
Sort merge two streams in ascending order:
use futures::stream::iter;
use futures::executor::block_on;
use stream_more::comparators::Ascending;
let m = iter([1,3]).kmerge_by_cmp(Ascending).merge(iter([2,4]));
let got = block_on(m.collect::<Vec<u64>>());
assert_eq!(vec![1, 2, 3, 4], got);
sourcefn kmerge_max<'a>(self) -> KMerge<'a, Descending, Self::Item>where
Self: Sized + Send + 'a,
Self::Item: Ord,
fn kmerge_max<'a>(self) -> KMerge<'a, Descending, Self::Item>where Self: Sized + Send + 'a, Self::Item: Ord,
Convert this stream to a KMerge
streams which merge streams by choosing the maximum
item from the streams, behaving like a max-heap.
Example
use futures::stream::iter;
let m = iter([3,1]).kmerge_max().merge(iter([4,2])).merge(iter([5]));
let got = m.collect::<Vec<u64>>().await;
assert_eq!(vec![5, 4, 3, 2, 1], got);
sourcefn kmerge_min<'a>(self) -> KMerge<'a, Ascending, Self::Item>where
Self: Sized + Send + 'a,
Self::Item: Ord,
fn kmerge_min<'a>(self) -> KMerge<'a, Ascending, Self::Item>where Self: Sized + Send + 'a, Self::Item: Ord,
sourcefn coalesce<'a, F>(self, f: F) -> Coalesce<'a, Self::Item, F>where
Self: Sized + Send + 'a,
F: FnMut(Self::Item, Self::Item) -> Result<Self::Item, (Self::Item, Self::Item)>,
fn coalesce<'a, F>(self, f: F) -> Coalesce<'a, Self::Item, F>where Self: Sized + Send + 'a, F: FnMut(Self::Item, Self::Item) -> Result<Self::Item, (Self::Item, Self::Item)>,
Return a stream adaptor that uses the passed-in closure to optionally merge together consecutive items.
The closure f
is passed two items previous
and current
and may return either:
- (1)
Ok(combined)
to merge the two values or - (2)
Err((previous, current))
to indicate they can’t be merged.
In (2), the value previous
is emitted.
Either (1) combined
or (2) current
becomes the previous value when coalesce continues
with the next pair of items to merge. The value that remains at the end is also
emitted.
The stream item type is Self::Item
.
use futures::stream::iter;
// sum same-sign runs together
let got = iter(vec![-1, -2, -3, 3, 1, 0, -1])
.coalesce(|x, y|
if x * y >= 0 {
Ok(x + y)
} else {
Err((x, y))
})
.collect::<Vec<_>>().await;
assert_eq!(vec![-6, 4, -1], got);