vortex_array/
search_sorted.rs

1use std::cmp::Ordering;
2use std::cmp::Ordering::{Equal, Greater, Less};
3use std::fmt::{Debug, Display, Formatter};
4use std::hint;
5
6use vortex_scalar::Scalar;
7
8use crate::Array;
9
10#[derive(Debug, Copy, Clone, Eq, PartialEq)]
11pub enum SearchSortedSide {
12    Left,
13    Right,
14}
15
16impl Display for SearchSortedSide {
17    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
18        match self {
19            SearchSortedSide::Left => write!(f, "left"),
20            SearchSortedSide::Right => write!(f, "right"),
21        }
22    }
23}
24
25/// Result of performing search_sorted on an Array
26#[derive(Debug, Copy, Clone, PartialEq, Eq)]
27pub enum SearchResult {
28    /// Result for a found element was found in the array and another one could be inserted at the given position
29    /// in the sorted order
30    Found(usize),
31
32    /// Result for an element not found, but that could be inserted at the given position
33    /// in the sorted order.
34    NotFound(usize),
35}
36
37impl SearchResult {
38    /// Convert search result to an index only if the value have been found
39    pub fn to_found(self) -> Option<usize> {
40        match self {
41            Self::Found(i) => Some(i),
42            Self::NotFound(_) => None,
43        }
44    }
45
46    /// Extract index out of search result regardless of whether the value have been found or not
47    pub fn to_index(self) -> usize {
48        match self {
49            Self::Found(i) => i,
50            Self::NotFound(i) => i,
51        }
52    }
53
54    /// Convert search result into an index suitable for searching array of offset indices, i.e. first element starts at 0.
55    ///
56    /// For example for a ChunkedArray with chunk offsets array [0, 3, 8, 10] you can use this method to
57    /// obtain index suitable for indexing into it after performing a search
58    pub fn to_offsets_index(self, len: usize, side: SearchSortedSide) -> usize {
59        match self {
60            SearchResult::Found(i) => {
61                if side == SearchSortedSide::Right || i == len {
62                    i.saturating_sub(1)
63                } else {
64                    i
65                }
66            }
67            SearchResult::NotFound(i) => i.saturating_sub(1),
68        }
69    }
70
71    /// Convert search result into an index suitable for searching array of end indices without 0 offset,
72    /// i.e. first element implicitly covers 0..0th-element range.
73    ///
74    /// For example for a RunEndArray with ends array [3, 8, 10], you can use this method to obtain index suitable for
75    /// indexing into it after performing a search
76    pub fn to_ends_index(self, len: usize) -> usize {
77        let idx = self.to_index();
78        if idx == len { idx - 1 } else { idx }
79    }
80}
81
82impl Display for SearchResult {
83    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
84        match self {
85            SearchResult::Found(i) => write!(f, "Found({i})"),
86            SearchResult::NotFound(i) => write!(f, "NotFound({i})"),
87        }
88    }
89}
90
91pub trait IndexOrd<V> {
92    /// PartialOrd of the value at index `idx` with `elem`.
93    /// For example, if self\[idx\] > elem, return Some(Greater).
94    fn index_cmp(&self, idx: usize, elem: &V) -> Option<Ordering>;
95
96    fn index_lt(&self, idx: usize, elem: &V) -> bool {
97        matches!(self.index_cmp(idx, elem), Some(Less))
98    }
99
100    fn index_le(&self, idx: usize, elem: &V) -> bool {
101        matches!(self.index_cmp(idx, elem), Some(Less | Equal))
102    }
103
104    fn index_gt(&self, idx: usize, elem: &V) -> bool {
105        matches!(self.index_cmp(idx, elem), Some(Greater))
106    }
107
108    fn index_ge(&self, idx: usize, elem: &V) -> bool {
109        matches!(self.index_cmp(idx, elem), Some(Greater | Equal))
110    }
111
112    /// Get the length of the underlying ordered collection
113    fn index_len(&self) -> usize;
114}
115
116/// Searches for value assuming the array is sorted.
117///
118/// Returned indices satisfy following condition if the search for value was to be inserted into the array at found positions
119///
120/// |side |result satisfies|
121/// |-----|----------------|
122/// |left |array\[i-1\] < value <= array\[i\]|
123/// |right|array\[i-1\] <= value < array\[i\]|
124pub trait SearchSorted<T> {
125    fn search_sorted_many<I: IntoIterator<Item = T>>(
126        &self,
127        values: I,
128        side: SearchSortedSide,
129    ) -> impl Iterator<Item = SearchResult>
130    where
131        Self: IndexOrd<T>,
132    {
133        values
134            .into_iter()
135            .map(move |value| self.search_sorted(&value, side))
136    }
137
138    fn search_sorted(&self, value: &T, side: SearchSortedSide) -> SearchResult
139    where
140        Self: IndexOrd<T>,
141    {
142        match side {
143            SearchSortedSide::Left => self.search_sorted_by(
144                |idx| self.index_cmp(idx, value).unwrap_or(Less),
145                |idx| {
146                    if self.index_lt(idx, value) {
147                        Less
148                    } else {
149                        Greater
150                    }
151                },
152                side,
153            ),
154            SearchSortedSide::Right => self.search_sorted_by(
155                |idx| self.index_cmp(idx, value).unwrap_or(Less),
156                |idx| {
157                    if self.index_le(idx, value) {
158                        Less
159                    } else {
160                        Greater
161                    }
162                },
163                side,
164            ),
165        }
166    }
167
168    /// find function is used to find the element if it exists, if element exists side_find will be
169    /// used to find desired index amongst equal values
170    fn search_sorted_by<F: FnMut(usize) -> Ordering, N: FnMut(usize) -> Ordering>(
171        &self,
172        find: F,
173        side_find: N,
174        side: SearchSortedSide,
175    ) -> SearchResult;
176}
177
178// Default implementation for types that implement IndexOrd.
179impl<S, T> SearchSorted<T> for S
180where
181    S: IndexOrd<T> + ?Sized,
182{
183    fn search_sorted_by<F: FnMut(usize) -> Ordering, N: FnMut(usize) -> Ordering>(
184        &self,
185        find: F,
186        side_find: N,
187        side: SearchSortedSide,
188    ) -> SearchResult {
189        match search_sorted_side_idx(find, 0, self.index_len()) {
190            SearchResult::Found(found) => {
191                let idx_search = match side {
192                    SearchSortedSide::Left => search_sorted_side_idx(side_find, 0, found),
193                    SearchSortedSide::Right => {
194                        search_sorted_side_idx(side_find, found, self.index_len())
195                    }
196                };
197                match idx_search {
198                    SearchResult::NotFound(i) => SearchResult::Found(i),
199                    _ => unreachable!(
200                        "searching amongst equal values should never return Found result"
201                    ),
202                }
203            }
204            s => s,
205        }
206    }
207}
208
209// Code adapted from Rust standard library slice::binary_search_by
210fn search_sorted_side_idx<F: FnMut(usize) -> Ordering>(
211    mut find: F,
212    from: usize,
213    to: usize,
214) -> SearchResult {
215    let mut size = to - from;
216    if size == 0 {
217        return SearchResult::NotFound(0);
218    }
219    let mut base = from;
220
221    // This loop intentionally doesn't have an early exit if the comparison
222    // returns Equal. We want the number of loop iterations to depend *only*
223    // on the size of the input slice so that the CPU can reliably predict
224    // the loop count.
225    while size > 1 {
226        let half = size / 2;
227        let mid = base + half;
228
229        // SAFETY: the call is made safe by the following inconstants:
230        // - `mid >= 0`: by definition
231        // - `mid < size`: `mid = size / 2 + size / 4 + size / 8 ...`
232        let cmp = find(mid);
233
234        // Binary search interacts poorly with branch prediction, so force
235        // the compiler to use conditional moves if supported by the target
236        // architecture.
237        base = if cmp == Greater { base } else { mid };
238
239        // This is imprecise in the case where `size` is odd and the
240        // comparison returns Greater: the mid element still gets included
241        // by `size` even though it's known to be larger than the element
242        // being searched for.
243        //
244        // This is fine though: we gain more performance by keeping the
245        // loop iteration count invariant (and thus predictable) than we
246        // lose from considering one additional element.
247        size -= half;
248    }
249
250    // SAFETY: base is always in [0, size) because base <= mid.
251    let cmp = find(base);
252    if cmp == Equal {
253        // SAFETY: same as the call to `find` above.
254        unsafe { hint::assert_unchecked(base < to) };
255        SearchResult::Found(base)
256    } else {
257        let result = base + (cmp == Less) as usize;
258        // SAFETY: same as the call to `find` above.
259        // Note that this is `<=`, unlike the assert in the `Found` path.
260        unsafe { hint::assert_unchecked(result <= to) };
261        SearchResult::NotFound(result)
262    }
263}
264
265impl IndexOrd<Scalar> for dyn Array + '_ {
266    fn index_cmp(&self, idx: usize, elem: &Scalar) -> Option<Ordering> {
267        let scalar_a = self.scalar_at(idx).ok()?;
268        scalar_a.partial_cmp(elem)
269    }
270
271    fn index_len(&self) -> usize {
272        Self::len(self)
273    }
274}
275
276impl<T: PartialOrd> IndexOrd<T> for [T] {
277    fn index_cmp(&self, idx: usize, elem: &T) -> Option<Ordering> {
278        // SAFETY: Used in search_sorted_by same as the standard library. The search_sorted ensures idx is in bounds
279        unsafe { self.get_unchecked(idx) }.partial_cmp(elem)
280    }
281
282    fn index_len(&self) -> usize {
283        self.len()
284    }
285}
286
287#[cfg(test)]
288mod test {
289    use crate::search_sorted::{SearchResult, SearchSorted, SearchSortedSide};
290
291    #[test]
292    fn left_side_equal() {
293        let arr = [0, 1, 2, 2, 2, 2, 3, 4, 5, 6, 7, 8, 9];
294        let res = arr.search_sorted(&2, SearchSortedSide::Left);
295        assert_eq!(arr[res.to_index()], 2);
296        assert_eq!(res, SearchResult::Found(2));
297    }
298
299    #[test]
300    fn right_side_equal() {
301        let arr = [0, 1, 2, 2, 2, 2, 3, 4, 5, 6, 7, 8, 9];
302        let res = arr.search_sorted(&2, SearchSortedSide::Right);
303        assert_eq!(arr[res.to_index() - 1], 2);
304        assert_eq!(res, SearchResult::Found(6));
305    }
306
307    #[test]
308    fn left_side_equal_beginning() {
309        let arr = [0, 0, 0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
310        let res = arr.search_sorted(&0, SearchSortedSide::Left);
311        assert_eq!(arr[res.to_index()], 0);
312        assert_eq!(res, SearchResult::Found(0));
313    }
314
315    #[test]
316    fn right_side_equal_beginning() {
317        let arr = [0, 0, 0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
318        let res = arr.search_sorted(&0, SearchSortedSide::Right);
319        assert_eq!(arr[res.to_index() - 1], 0);
320        assert_eq!(res, SearchResult::Found(4));
321    }
322
323    #[test]
324    fn left_side_equal_end() {
325        let arr = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 9, 9, 9];
326        let res = arr.search_sorted(&9, SearchSortedSide::Left);
327        assert_eq!(arr[res.to_index()], 9);
328        assert_eq!(res, SearchResult::Found(9));
329    }
330
331    #[test]
332    fn right_side_equal_end() {
333        let arr = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 9, 9, 9];
334        let res = arr.search_sorted(&9, SearchSortedSide::Right);
335        assert_eq!(arr[res.to_index() - 1], 9);
336        assert_eq!(res, SearchResult::Found(13));
337    }
338}