stream_more/stream_more/mod.rs
1use comparators::Ascending;
2use comparators::Descending;
3use comparators::FnCmp;
4use compare::Compare;
5use futures::Stream;
6
7use crate::stream_more::coalesce::Coalesce;
8use crate::stream_more::kmerge::KMerge;
9
10pub mod coalesce;
11pub mod comparators;
12pub mod kmerge;
13pub mod peeked;
14
15/// Provide more methods for [`Stream`].
16pub trait StreamMore: Stream {
17 /// Create a k-way merge `Stream` that flattens `Stream`s by merging them according
18 /// to the given closure `first()`.
19 ///
20 /// The closure `first()` is called with two elements `a`, `b` and should return `true` if `a`
21 /// is ordered before `b`.
22 ///
23 /// If all base `Stream`s are sorted according to `first()`, the result is sorted.
24 ///
25 /// # Example
26 ///
27 /// Sort merge two streams in ascending order:
28 /// ```
29 /// use futures::stream::iter;
30 /// use futures::executor::block_on;
31 /// # use futures::StreamExt;
32 /// # use crate::stream_more::StreamMore;
33 ///
34 /// let m = iter([1,3]).kmerge_by(|a,b| a < b).merge(iter([2,4]));
35 /// let got = block_on(m.collect::<Vec<u64>>());
36 /// assert_eq!(vec![1, 2, 3, 4], got);
37 /// ```
38 fn kmerge_by<'a, F>(self, first: F) -> KMerge<'a, FnCmp<F>, Self::Item>
39 where
40 Self: Sized + Send + 'a,
41 F: Fn(&Self::Item, &Self::Item) -> bool,
42 {
43 KMerge::by(first).merge(self)
44 }
45
46 /// Create a k-way merge stream, which chooses the item from the streams by a comparator
47 /// [`Compare`].
48 ///
49 /// If `comparator::compare(a,b)` returns [`Ordering::Greater`], `a` will be chosen first over
50 /// `b`, where `a` and `b` are next item from different streams.
51 ///
52 /// # Example
53 ///
54 /// Sort merge two streams in ascending order:
55 /// ```
56 /// use futures::stream::iter;
57 /// use futures::executor::block_on;
58 /// use stream_more::comparators::Ascending;
59 /// # use futures::StreamExt;
60 /// # use crate::stream_more::StreamMore;
61 ///
62 /// let m = iter([1,3]).kmerge_by_cmp(Ascending).merge(iter([2,4]));
63 /// let got = block_on(m.collect::<Vec<u64>>());
64 /// assert_eq!(vec![1, 2, 3, 4], got);
65 /// ```
66 ///
67 /// [`Ordering::Greater`]: `std::cmp::Ordering`
68 fn kmerge_by_cmp<'a, C>(self, cmp: C) -> KMerge<'a, C, Self::Item>
69 where
70 Self: Sized + Send + 'a,
71 C: Compare<Self::Item>,
72 {
73 KMerge::by_cmp(cmp).merge(self)
74 }
75
76 /// Convert this stream to a [`KMerge`] streams which merge streams by choosing the maximum
77 /// item from the streams, behaving like a max-heap.
78 ///
79 /// # Example
80 ///
81 /// ```
82 /// use futures::stream::iter;
83 /// # use futures::StreamExt;
84 /// # use crate::stream_more::StreamMore;
85 /// # futures::executor::block_on(async {
86 /// let m = iter([3,1]).kmerge_max().merge(iter([4,2])).merge(iter([5]));
87 /// let got = m.collect::<Vec<u64>>().await;
88 /// assert_eq!(vec![5, 4, 3, 2, 1], got);
89 /// # });
90 /// ```
91 fn kmerge_max<'a>(self) -> KMerge<'a, Descending, Self::Item>
92 where
93 Self: Sized + Send + 'a,
94 Self::Item: Ord,
95 {
96 KMerge::max().merge(self)
97 }
98
99 /// Convert this stream to a [`KMerge`] streams which merge streams by choosing the minimum
100 /// item from the streams, behaving like a min-heap.
101 ///
102 /// # Example
103 ///
104 /// ```
105 /// use futures::stream::iter;
106 /// # use futures::StreamExt;
107 /// # use crate::stream_more::StreamMore;
108 /// # futures::executor::block_on(async {
109 /// let m = iter([3,1]).kmerge_min().merge(iter([4,2]));
110 /// let got = m.collect::<Vec<u64>>().await;
111 /// assert_eq!(vec![3,1,4,2], got);
112 /// # });
113 /// ```
114 fn kmerge_min<'a>(self) -> KMerge<'a, Ascending, Self::Item>
115 where
116 Self: Sized + Send + 'a,
117 Self::Item: Ord,
118 {
119 KMerge::min().merge(self)
120 }
121
122 /// Return a stream adaptor that uses the passed-in closure to optionally merge together
123 /// consecutive items.
124 ///
125 /// The closure `f` is passed two items `previous` and `current` and may return either:
126 /// - (1) `Ok(combined)` to merge the two values or
127 /// - (2) `Err((previous, current))` to indicate they can’t be merged.
128 ///
129 /// In (2), the value `previous` is emitted.
130 /// Either (1) `combined` or (2) `current` becomes the previous value when coalesce continues
131 /// with the next pair of items to merge. The value that remains at the end is also
132 /// emitted.
133 ///
134 /// The stream item type is `Self::Item`.
135 ///
136 /// ```
137 /// use futures::stream::iter;
138 /// # use futures::StreamExt;
139 /// # use crate::stream_more::StreamMore;
140 /// # futures::executor::block_on(async {
141 ///
142 /// // sum same-sign runs together
143 /// let got = iter(vec![-1, -2, -3, 3, 1, 0, -1])
144 /// .coalesce(|x, y|
145 /// if x * y >= 0 {
146 /// Ok(x + y)
147 /// } else {
148 /// Err((x, y))
149 /// })
150 /// .collect::<Vec<_>>().await;
151 /// assert_eq!(vec![-6, 4, -1], got);
152 /// # });
153 /// ```
154 fn coalesce<'a, F>(self, f: F) -> Coalesce<'a, Self::Item, F>
155 where
156 Self: Sized + Send + 'a,
157 F: FnMut(Self::Item, Self::Item) -> Result<Self::Item, (Self::Item, Self::Item)>,
158 {
159 Coalesce::new(Box::pin(self), f)
160 }
161}
162
163impl<T: ?Sized> StreamMore for T where T: Stream {}