stream_more/stream_more/
mod.rs

1use comparators::Ascending;
2use comparators::Descending;
3use comparators::FnCmp;
4use compare::Compare;
5use futures::Stream;
6
7use crate::stream_more::coalesce::Coalesce;
8use crate::stream_more::kmerge::KMerge;
9
10pub mod coalesce;
11pub mod comparators;
12pub mod kmerge;
13pub mod peeked;
14
15/// Provide more methods for [`Stream`].
16pub trait StreamMore: Stream {
17    /// Create a k-way merge `Stream` that flattens `Stream`s by merging them according
18    /// to the given closure `first()`.
19    ///
20    /// The closure `first()` is called with two elements `a`, `b` and should return `true` if `a`
21    /// is ordered before `b`.
22    ///
23    /// If all base `Stream`s are sorted according to `first()`, the result is sorted.
24    ///
25    /// # Example
26    ///
27    /// Sort merge two streams in ascending order:
28    /// ```
29    /// use futures::stream::iter;
30    /// use futures::executor::block_on;
31    /// # use futures::StreamExt;
32    /// # use crate::stream_more::StreamMore;
33    ///
34    /// let m = iter([1,3]).kmerge_by(|a,b| a < b).merge(iter([2,4]));
35    /// let got = block_on(m.collect::<Vec<u64>>());
36    /// assert_eq!(vec![1, 2, 3, 4], got);
37    /// ```
38    fn kmerge_by<'a, F>(self, first: F) -> KMerge<'a, FnCmp<F>, Self::Item>
39    where
40        Self: Sized + Send + 'a,
41        F: Fn(&Self::Item, &Self::Item) -> bool,
42    {
43        KMerge::by(first).merge(self)
44    }
45
46    /// Create a k-way merge stream, which chooses the item from the streams by a comparator
47    /// [`Compare`].
48    ///
49    /// If `comparator::compare(a,b)` returns [`Ordering::Greater`], `a` will be chosen first over
50    /// `b`, where `a` and `b` are next item from different streams.
51    ///
52    /// # Example
53    ///
54    /// Sort merge two streams in ascending order:
55    /// ```
56    /// use futures::stream::iter;
57    /// use futures::executor::block_on;
58    /// use stream_more::comparators::Ascending;
59    /// # use futures::StreamExt;
60    /// # use crate::stream_more::StreamMore;
61    ///
62    /// let m = iter([1,3]).kmerge_by_cmp(Ascending).merge(iter([2,4]));
63    /// let got = block_on(m.collect::<Vec<u64>>());
64    /// assert_eq!(vec![1, 2, 3, 4], got);
65    /// ```
66    ///
67    /// [`Ordering::Greater`]: `std::cmp::Ordering`
68    fn kmerge_by_cmp<'a, C>(self, cmp: C) -> KMerge<'a, C, Self::Item>
69    where
70        Self: Sized + Send + 'a,
71        C: Compare<Self::Item>,
72    {
73        KMerge::by_cmp(cmp).merge(self)
74    }
75
76    /// Convert this stream to a [`KMerge`] streams which merge streams by choosing the maximum
77    /// item from the streams, behaving like a max-heap.
78    ///
79    /// # Example
80    ///
81    /// ```
82    /// use futures::stream::iter;
83    /// # use futures::StreamExt;
84    /// # use crate::stream_more::StreamMore;
85    /// # futures::executor::block_on(async {
86    /// let m = iter([3,1]).kmerge_max().merge(iter([4,2])).merge(iter([5]));
87    /// let got = m.collect::<Vec<u64>>().await;
88    /// assert_eq!(vec![5, 4, 3, 2, 1], got);
89    /// # });
90    /// ```
91    fn kmerge_max<'a>(self) -> KMerge<'a, Descending, Self::Item>
92    where
93        Self: Sized + Send + 'a,
94        Self::Item: Ord,
95    {
96        KMerge::max().merge(self)
97    }
98
99    /// Convert this stream to a [`KMerge`] streams which merge streams by choosing the minimum
100    /// item from the streams, behaving like a min-heap.
101    ///
102    /// # Example
103    ///
104    /// ```
105    /// use futures::stream::iter;
106    /// # use futures::StreamExt;
107    /// # use crate::stream_more::StreamMore;
108    /// # futures::executor::block_on(async {
109    /// let m = iter([3,1]).kmerge_min().merge(iter([4,2]));
110    /// let got = m.collect::<Vec<u64>>().await;
111    /// assert_eq!(vec![3,1,4,2], got);
112    /// # });
113    /// ```
114    fn kmerge_min<'a>(self) -> KMerge<'a, Ascending, Self::Item>
115    where
116        Self: Sized + Send + 'a,
117        Self::Item: Ord,
118    {
119        KMerge::min().merge(self)
120    }
121
122    /// Return a stream adaptor that uses the passed-in closure to optionally merge together
123    /// consecutive items.
124    ///
125    /// The closure `f` is passed two items `previous` and `current` and may return either:
126    /// - (1) `Ok(combined)` to merge the two values or
127    /// - (2) `Err((previous, current))` to indicate they can’t be merged.
128    ///
129    /// In (2), the value `previous` is emitted.
130    /// Either (1) `combined` or (2) `current` becomes the previous value when coalesce continues
131    /// with the next pair of items to merge. The value that remains at the end is also
132    /// emitted.
133    ///
134    /// The stream item type is `Self::Item`.
135    ///
136    /// ```
137    /// use futures::stream::iter;
138    /// # use futures::StreamExt;
139    /// # use crate::stream_more::StreamMore;
140    /// # futures::executor::block_on(async {
141    ///
142    /// // sum same-sign runs together
143    /// let got = iter(vec![-1, -2, -3, 3, 1, 0, -1])
144    ///             .coalesce(|x, y|
145    ///                 if x * y >= 0 {
146    ///                     Ok(x + y)
147    ///                 } else {
148    ///                     Err((x, y))
149    ///                 })
150    ///             .collect::<Vec<_>>().await;
151    /// assert_eq!(vec![-6, 4, -1], got);
152    /// # });
153    /// ```
154    fn coalesce<'a, F>(self, f: F) -> Coalesce<'a, Self::Item, F>
155    where
156        Self: Sized + Send + 'a,
157        F: FnMut(Self::Item, Self::Item) -> Result<Self::Item, (Self::Item, Self::Item)>,
158    {
159        Coalesce::new(Box::pin(self), f)
160    }
161}
162
163impl<T: ?Sized> StreamMore for T where T: Stream {}