1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
use comparators::Ascending;
use comparators::Descending;
use comparators::FnCmp;
use compare::Compare;
use futures::Stream;
use crate::stream_more::coalesce::Coalesce;
use crate::stream_more::kmerge::KMerge;
pub mod coalesce;
pub mod comparators;
pub mod kmerge;
pub mod peeked;
/// Provide more methods for [`Stream`].
pub trait StreamMore: Stream {
/// Create a k-way merge `Stream` that flattens `Stream`s by merging them according
/// to the given closure `first()`.
///
/// The closure `first()` is called with two elements `a`, `b` and should return `true` if `a`
/// is ordered before `b`.
///
/// If all base `Stream`s are sorted according to `first()`, the result is sorted.
///
/// # Example
///
/// Sort merge two streams in ascending order:
/// ```
/// use futures::stream::iter;
/// use futures::executor::block_on;
/// # use futures::StreamExt;
/// # use crate::stream_more::StreamMore;
///
/// let m = iter([1,3]).kmerge_by(|a,b| a < b).merge(iter([2,4]));
/// let got = block_on(m.collect::<Vec<u64>>());
/// assert_eq!(vec![1, 2, 3, 4], got);
/// ```
fn kmerge_by<'a, F>(self, first: F) -> KMerge<'a, FnCmp<F>, Self::Item>
where
Self: Sized + Send + 'a,
F: Fn(&Self::Item, &Self::Item) -> bool,
{
KMerge::by(first).merge(self)
}
/// Create a k-way merge stream, which chooses the item from the streams by a comparator
/// [`Compare`].
///
/// If `comparator::compare(a,b)` returns [`Ordering::Greater`], `a` will be chosen first over
/// `b`, where `a` and `b` are next item from different streams.
///
/// # Example
///
/// Sort merge two streams in ascending order:
/// ```
/// use futures::stream::iter;
/// use futures::executor::block_on;
/// use stream_more::comparators::Ascending;
/// # use futures::StreamExt;
/// # use crate::stream_more::StreamMore;
///
/// let m = iter([1,3]).kmerge_by_cmp(Ascending).merge(iter([2,4]));
/// let got = block_on(m.collect::<Vec<u64>>());
/// assert_eq!(vec![1, 2, 3, 4], got);
/// ```
///
/// [`Ordering::Greater`]: `std::cmp::Ordering`
fn kmerge_by_cmp<'a, C>(self, cmp: C) -> KMerge<'a, C, Self::Item>
where
Self: Sized + Send + 'a,
C: Compare<Self::Item>,
{
KMerge::by_cmp(cmp).merge(self)
}
/// Convert this stream to a [`KMerge`] streams which merge streams by choosing the maximum
/// item from the streams, behaving like a max-heap.
///
/// # Example
///
/// ```
/// use futures::stream::iter;
/// # use futures::StreamExt;
/// # use crate::stream_more::StreamMore;
/// # futures::executor::block_on(async {
/// let m = iter([3,1]).kmerge_max().merge(iter([4,2])).merge(iter([5]));
/// let got = m.collect::<Vec<u64>>().await;
/// assert_eq!(vec![5, 4, 3, 2, 1], got);
/// # });
/// ```
fn kmerge_max<'a>(self) -> KMerge<'a, Descending, Self::Item>
where
Self: Sized + Send + 'a,
Self::Item: Ord,
{
KMerge::max().merge(self)
}
/// Convert this stream to a [`KMerge`] streams which merge streams by choosing the minimum
/// item from the streams, behaving like a min-heap.
///
/// # Example
///
/// ```
/// use futures::stream::iter;
/// # use futures::StreamExt;
/// # use crate::stream_more::StreamMore;
/// # futures::executor::block_on(async {
/// let m = iter([3,1]).kmerge_min().merge(iter([4,2]));
/// let got = m.collect::<Vec<u64>>().await;
/// assert_eq!(vec![3,1,4,2], got);
/// # });
/// ```
fn kmerge_min<'a>(self) -> KMerge<'a, Ascending, Self::Item>
where
Self: Sized + Send + 'a,
Self::Item: Ord,
{
KMerge::min().merge(self)
}
/// Return a stream adaptor that uses the passed-in closure to optionally merge together
/// consecutive items.
///
/// The closure `f` is passed two items `previous` and `current` and may return either:
/// - (1) `Ok(combined)` to merge the two values or
/// - (2) `Err((previous, current))` to indicate they can’t be merged.
///
/// In (2), the value `previous` is emitted.
/// Either (1) `combined` or (2) `current` becomes the previous value when coalesce continues
/// with the next pair of items to merge. The value that remains at the end is also
/// emitted.
///
/// The stream item type is `Self::Item`.
///
/// ```
/// use futures::stream::iter;
/// # use futures::StreamExt;
/// # use crate::stream_more::StreamMore;
/// # futures::executor::block_on(async {
///
/// // sum same-sign runs together
/// let got = iter(vec![-1, -2, -3, 3, 1, 0, -1])
/// .coalesce(|x, y|
/// if x * y >= 0 {
/// Ok(x + y)
/// } else {
/// Err((x, y))
/// })
/// .collect::<Vec<_>>().await;
/// assert_eq!(vec![-6, 4, -1], got);
/// # });
/// ```
fn coalesce<'a, F>(self, f: F) -> Coalesce<'a, Self::Item, F>
where
Self: Sized + Send + 'a,
F: FnMut(Self::Item, Self::Item) -> Result<Self::Item, (Self::Item, Self::Item)>,
{
Coalesce::new(Box::pin(self), f)
}
}
impl<T: ?Sized> StreamMore for T where T: Stream {}