pgm_extra/index/external/
static.rs

1//! Multi-level recursive PGM-Index.
2//!
3//! This is the primary external-keys index with recursive segment levels
4//! for optimal query performance on large datasets.
5
6use alloc::vec;
7use alloc::vec::Vec;
8use core::ops::RangeBounds;
9
10use crate::error::Error;
11
12use crate::index::Segment;
13use crate::index::key::Indexable;
14use crate::index::model::build_segments;
15
16use crate::util::ApproxPos;
17use crate::util::range::range_to_indices;
18use crate::util::search::{pgm_add_eps, pgm_sub_eps};
19
/// Widest search window (counted in segments) that `search_segment` scans
/// linearly; wider windows are searched with `partition_point` instead.
const LINEAR_SEARCH_THRESHOLD_SEGMENTS: usize = 32;
21
/// A multi-level recursive PGM-Index.
///
/// This index builds multiple levels of linear models for efficient lookups.
/// It does not own the data; the keys must be stored separately and passed
/// to query methods.
///
/// # Type Parameters
///
/// - `T`: The value type that implements [`Indexable`]. The index internally
///   stores segments of `T::Key` for the linear models.
///
/// # Example
///
/// ```
/// use pgm_extra::index::external::Static;
///
/// let keys: Vec<u64> = (0..10000).collect();
/// let index = Static::new(&keys, 64, 4).unwrap();
///
/// assert!(index.contains(&keys, &5000));
/// assert_eq!(index.lower_bound(&keys, &5000), 5000);
/// ```
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(
    feature = "serde",
    serde(bound = "T::Key: serde::Serialize + serde::de::DeserializeOwned")
)]
pub struct Static<T: Indexable> {
    /// Error bound used when building and querying the bottom level.
    epsilon: usize,
    /// Error bound used when building and querying the upper levels.
    epsilon_recursive: usize,
    /// Number of keys the index was built over.
    len: usize,
    /// Boundaries of each level inside `segments`: level `i` occupies
    /// `segments[levels_offsets[i]..levels_offsets[i + 1]]`. Level 0 is the
    /// topmost level and the last level is the bottom one.
    levels_offsets: Vec<usize>,
    /// Segments of every level, stored contiguously from the topmost level
    /// down to the bottom level.
    segments: Vec<Segment<T::Key>>,
}
57
58impl<T: Indexable> Static<T>
59where
60    T::Key: Ord,
61{
62    /// Build a new multi-level PGM-Index from sorted data.
63    ///
64    /// # Parameters
65    ///
66    /// - `data`: A sorted slice of values
67    /// - `epsilon`: Error bound for the bottom level (larger = fewer segments)
68    /// - `epsilon_recursive`: Error bound for upper levels
69    ///
70    /// # Errors
71    ///
72    /// Returns an error if `data` is empty or `epsilon` is 0.
73    pub fn new(data: &[T], epsilon: usize, epsilon_recursive: usize) -> Result<Self, Error> {
74        if data.is_empty() {
75            return Err(Error::EmptyInput);
76        }
77        if epsilon == 0 {
78            return Err(Error::InvalidEpsilon);
79        }
80
81        debug_assert!(
82            data.windows(2)
83                .all(|w| w[0].index_key() <= w[1].index_key()),
84            "data must be sorted by index key"
85        );
86
87        let keys: Vec<T::Key> = data.iter().map(|v| v.index_key()).collect();
88        Self::build_from_keys(&keys, epsilon, epsilon_recursive)
89    }
90
91    /// Build from pre-extracted keys (internal use).
92    pub(crate) fn build_from_keys(
93        keys: &[T::Key],
94        epsilon: usize,
95        epsilon_recursive: usize,
96    ) -> Result<Self, Error> {
97        let bottom_segments = build_segments(keys, epsilon);
98
99        if bottom_segments.is_empty() {
100            return Err(Error::EmptyInput);
101        }
102
103        let mut levels: Vec<Vec<Segment<T::Key>>> = vec![bottom_segments];
104
105        while epsilon_recursive > 0 && levels.last().unwrap().len() > 1 {
106            let prev_level = levels.last().unwrap();
107            let super_keys: Vec<T::Key> = prev_level.iter().map(|s| s.key).collect();
108            let upper_segments = build_segments(&super_keys, epsilon_recursive);
109
110            if upper_segments.len() >= prev_level.len() {
111                break;
112            }
113
114            levels.push(upper_segments);
115        }
116
117        let total_segments: usize = levels.iter().map(|l| l.len()).sum();
118        let mut segments = Vec::with_capacity(total_segments);
119        let mut levels_offsets = Vec::with_capacity(levels.len() + 1);
120
121        levels_offsets.push(0);
122        for level in levels.iter().rev() {
123            segments.extend_from_slice(level);
124            levels_offsets.push(segments.len());
125        }
126
127        Ok(Self {
128            segments,
129            levels_offsets,
130            epsilon,
131            epsilon_recursive,
132            len: keys.len(),
133        })
134    }
135
136    #[cfg(feature = "parallel")]
137    pub fn new_parallel(
138        data: &[T],
139        epsilon: usize,
140        epsilon_recursive: usize,
141    ) -> Result<Self, Error> {
142        use crate::index::model::build_segments_parallel;
143
144        if data.is_empty() {
145            return Err(Error::EmptyInput);
146        }
147        if epsilon == 0 {
148            return Err(Error::InvalidEpsilon);
149        }
150
151        debug_assert!(
152            data.windows(2)
153                .all(|w| w[0].index_key() <= w[1].index_key()),
154            "data must be sorted by index key"
155        );
156
157        let keys: Vec<T::Key> = data.iter().map(|v| v.index_key()).collect();
158
159        let bottom_segments = build_segments_parallel(&keys, epsilon);
160
161        if bottom_segments.is_empty() {
162            return Err(Error::EmptyInput);
163        }
164
165        let mut levels: Vec<Vec<Segment<T::Key>>> = vec![bottom_segments];
166
167        while epsilon_recursive > 0 && levels.last().unwrap().len() > 1 {
168            let prev_level = levels.last().unwrap();
169            let super_keys: Vec<T::Key> = prev_level.iter().map(|s| s.key).collect();
170            let upper_segments = build_segments(&super_keys, epsilon_recursive);
171
172            if upper_segments.len() >= prev_level.len() {
173                break;
174            }
175
176            levels.push(upper_segments);
177        }
178
179        let total_segments: usize = levels.iter().map(|l| l.len()).sum();
180        let mut segments = Vec::with_capacity(total_segments);
181        let mut levels_offsets = Vec::with_capacity(levels.len() + 1);
182
183        levels_offsets.push(0);
184        for level in levels.iter().rev() {
185            segments.extend_from_slice(level);
186            levels_offsets.push(segments.len());
187        }
188
189        Ok(Self {
190            segments,
191            levels_offsets,
192            epsilon,
193            epsilon_recursive,
194            len: keys.len(),
195        })
196    }
197
198    #[inline]
199    fn search_segment(&self, level: usize, key: &T::Key, lo: usize, hi: usize) -> usize {
200        let level_start = self.levels_offsets[level];
201        let level_end = self.levels_offsets[level + 1];
202        let level_size = level_end - level_start;
203
204        let lo = lo.min(level_size);
205        let hi = hi.min(level_size);
206
207        if hi <= lo {
208            return lo;
209        }
210
211        let abs_lo = level_start + lo;
212        let abs_hi = level_start + hi;
213
214        if abs_hi - abs_lo <= LINEAR_SEARCH_THRESHOLD_SEGMENTS {
215            let mut idx = abs_lo;
216            while idx + 1 < abs_hi && self.segments[idx + 1].key <= *key {
217                idx += 1;
218            }
219            idx - level_start
220        } else {
221            let slice = &self.segments[abs_lo..abs_hi];
222            let pos = slice.partition_point(|s| s.key <= *key);
223            let pos = pos.saturating_sub(1);
224            lo + pos
225        }
226    }
227
228    /// Get an approximate position for the given value.
229    #[inline]
230    pub fn search(&self, value: &T) -> ApproxPos {
231        let key = value.index_key();
232        self.search_by_key(&key)
233    }
234
235    /// Get an approximate position for the given key.
236    #[inline]
237    pub fn search_by_key(&self, key: &T::Key) -> ApproxPos {
238        let num_levels = self.levels_offsets.len() - 1;
239
240        if num_levels == 0 {
241            return ApproxPos::new(0, 0, self.len);
242        }
243
244        let mut seg_lo = 0usize;
245        let mut seg_hi = self.levels_offsets[1];
246
247        for level in 0..num_levels - 1 {
248            let level_start = self.levels_offsets[level];
249            let level_size = self.levels_offsets[level + 1] - level_start;
250
251            let local_idx = self.search_segment(level, key, seg_lo, seg_hi.min(level_size));
252            let segment = &self.segments[level_start + local_idx];
253
254            let next_level_start = self.levels_offsets[level + 1];
255            let next_level_size = self.levels_offsets[level + 2] - next_level_start;
256            let predicted = segment.predict(*key).min(next_level_size.saturating_sub(1));
257
258            seg_lo = pgm_sub_eps(predicted, self.epsilon_recursive + 1);
259            seg_hi = pgm_add_eps(predicted, self.epsilon_recursive, next_level_size);
260        }
261
262        let bottom_level = num_levels - 1;
263        let bottom_start = self.levels_offsets[bottom_level];
264        let bottom_size = self.levels_offsets[bottom_level + 1] - bottom_start;
265
266        let local_idx = self.search_segment(bottom_level, key, seg_lo, seg_hi.min(bottom_size));
267        let segment = &self.segments[bottom_start + local_idx];
268
269        let pos = segment.predict(*key).min(self.len.saturating_sub(1));
270        let lo = pgm_sub_eps(pos, self.epsilon);
271        let hi = pgm_add_eps(pos, self.epsilon, self.len);
272
273        ApproxPos::new(pos, lo, hi)
274    }
275
276    /// Find the first position where `data[pos] >= value`.
277    #[inline]
278    pub fn lower_bound(&self, data: &[T], value: &T) -> usize
279    where
280        T: Ord,
281    {
282        let key = value.index_key();
283        let approx = self.search_by_key(&key);
284        let len = data.len();
285        if len == 0 {
286            return 0;
287        }
288
289        let pos = approx.pos.min(len - 1);
290        if data[pos] == *value {
291            let mut i = pos;
292            while i > 0 && data[i - 1] == *value {
293                i -= 1;
294            }
295            return i;
296        }
297
298        if data[pos] < *value {
299            if pos + 1 < len && data[pos + 1] >= *value {
300                return pos + 1;
301            }
302        } else if pos > 0 && data[pos - 1] < *value {
303            return pos;
304        }
305
306        let lo = approx.lo;
307        let hi = approx.hi.min(len);
308        let slice = &data[lo..hi];
309        lo + slice.partition_point(|x| x < value)
310    }
311
312    /// Find the first position where `data[pos] > value`.
313    #[inline]
314    pub fn upper_bound(&self, data: &[T], value: &T) -> usize
315    where
316        T: Ord,
317    {
318        let idx = self.lower_bound(data, value);
319        let mut i = idx;
320        while i < data.len() && data[i] == *value {
321            i += 1;
322        }
323        i
324    }
325
326    /// Check if the value exists in the data.
327    #[inline]
328    pub fn contains(&self, data: &[T], value: &T) -> bool
329    where
330        T: Ord,
331    {
332        let key = value.index_key();
333        let approx = self.search_by_key(&key);
334        let len = data.len();
335
336        if len == 0 {
337            return false;
338        }
339
340        let pos = approx.pos.min(len - 1);
341        if data[pos] == *value {
342            return true;
343        }
344
345        let lo = approx.lo;
346        let hi = approx.hi.min(len);
347        data[lo..hi].binary_search(value).is_ok()
348    }
349
    /// Number of elements the index was built for.
    ///
    /// This is the length recorded at build time, not the length of any
    /// slice later passed to a query method.
    #[inline]
    pub fn len(&self) -> usize {
        self.len
    }
355
    /// Returns `true` if the recorded element count is zero.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }
360
    /// Number of segments across all levels.
    ///
    /// Counts the bottom level and every upper level together.
    #[inline]
    pub fn segments_count(&self) -> usize {
        self.segments.len()
    }
366
    /// Number of levels in the index.
    ///
    /// Derived from the offsets table, which holds one more entry than there
    /// are levels; the subtraction saturates, so an empty table yields 0.
    #[inline]
    pub fn height(&self) -> usize {
        self.levels_offsets.len().saturating_sub(1)
    }
372
    /// Error bound the bottom level was built with.
    #[inline]
    pub fn epsilon(&self) -> usize {
        self.epsilon
    }
377
    /// Error bound the upper levels were built with.
    #[inline]
    pub fn epsilon_recursive(&self) -> usize {
        self.epsilon_recursive
    }
382
383    /// Approximate memory usage in bytes.
384    pub fn size_in_bytes(&self) -> usize {
385        core::mem::size_of::<Self>()
386            + self.segments.capacity() * core::mem::size_of::<Segment<T::Key>>()
387            + self.levels_offsets.capacity() * core::mem::size_of::<usize>()
388    }
389
390    /// Returns the (start, end) indices for iterating over data in the given range.
391    #[inline]
392    pub fn range_indices<R>(&self, data: &[T], range: R) -> (usize, usize)
393    where
394        T: Ord,
395        R: RangeBounds<T>,
396    {
397        range_to_indices(
398            range,
399            data.len(),
400            |v| self.lower_bound(data, v),
401            |v| self.upper_bound(data, v),
402        )
403    }
404
405    /// Returns an iterator over data in the given range.
406    #[inline]
407    pub fn range<'a, R>(&self, data: &'a [T], range: R) -> impl DoubleEndedIterator<Item = &'a T>
408    where
409        T: Ord,
410        R: RangeBounds<T>,
411    {
412        let (start, end) = self.range_indices(data, range);
413        data[start..end].iter()
414    }
415}
416
// `External` trait implementation for `Static`.
//
// Every method forwards to the inherent method of the same name. Inherent
// methods take precedence over trait methods in method resolution, so these
// `self.method(..)` calls do not recurse into the trait implementation.
impl<T: Indexable> crate::index::External<T> for Static<T>
where
    T::Key: Ord,
{
    /// Forwards to the inherent [`Static::search`].
    #[inline]
    fn search(&self, value: &T) -> ApproxPos {
        self.search(value)
    }

    /// Forwards to the inherent [`Static::lower_bound`].
    #[inline]
    fn lower_bound(&self, data: &[T], value: &T) -> usize
    where
        T: Ord,
    {
        self.lower_bound(data, value)
    }

    /// Forwards to the inherent [`Static::upper_bound`].
    #[inline]
    fn upper_bound(&self, data: &[T], value: &T) -> usize
    where
        T: Ord,
    {
        self.upper_bound(data, value)
    }

    /// Forwards to the inherent [`Static::contains`].
    #[inline]
    fn contains(&self, data: &[T], value: &T) -> bool
    where
        T: Ord,
    {
        self.contains(data, value)
    }

    /// Forwards to the inherent [`Static::len`].
    #[inline]
    fn len(&self) -> usize {
        self.len()
    }

    /// Forwards to the inherent [`Static::segments_count`].
    #[inline]
    fn segments_count(&self) -> usize {
        self.segments_count()
    }

    /// Forwards to the inherent [`Static::epsilon`].
    #[inline]
    fn epsilon(&self) -> usize {
        self.epsilon()
    }

    /// Forwards to the inherent [`Static::size_in_bytes`].
    #[inline]
    fn size_in_bytes(&self) -> usize {
        self.size_in_bytes()
    }
}
470
// Unit tests for `Static`: construction, error cases, point lookups,
// bounds, range queries and size reporting.
#[cfg(test)]
mod tests {
    use super::*;
    use alloc::vec::Vec;

    // Construction over a dense range reports the right length and at least
    // one level.
    #[test]
    fn test_pgm_index_basic() {
        let keys: Vec<u64> = (0..10000).collect();
        let index = Static::new(&keys, 64, 4).unwrap();

        assert_eq!(index.len(), 10000);
        assert!(!index.is_empty());
        assert!(index.height() >= 1);
    }

    // On dense keys 0..10000 the lower bound of a key equals the key itself.
    #[test]
    fn test_pgm_index_search() {
        let keys: Vec<u64> = (0..10000).collect();
        let index = Static::new(&keys, 64, 4).unwrap();

        for &key in &[0u64, 100, 5000, 9999] {
            let idx = index.lower_bound(&keys, &key);
            assert_eq!(idx, key as usize, "Failed for key {}", key);
        }
    }

    // Keys spaced 1000 apart: every key must be found at its own position.
    #[test]
    fn test_pgm_index_sparse() {
        let keys: Vec<u64> = (0..1000).map(|i| i * 1000).collect();
        let index = Static::new(&keys, 16, 4).unwrap();

        for (i, &key) in keys.iter().enumerate() {
            let idx = index.lower_bound(&keys, &key);
            assert_eq!(idx, i, "Failed for key {} at index {}", key, i);
        }
    }

    // Even keys only: present keys are found, odd keys are not.
    #[test]
    fn test_pgm_index_contains() {
        let keys: Vec<u64> = (0..100).map(|i| i * 2).collect();
        let index = Static::new(&keys, 8, 4).unwrap();

        assert!(index.contains(&keys, &0));
        assert!(index.contains(&keys, &100));
        assert!(!index.contains(&keys, &1));
        assert!(!index.contains(&keys, &99));
    }

    // Signed keys spanning negative and positive values.
    #[test]
    fn test_pgm_index_signed() {
        let keys: Vec<i64> = (-500..500).collect();
        let index = Static::new(&keys, 16, 4).unwrap();

        for &key in &[-500i64, -100, 0, 100, 499] {
            let expected = (key + 500) as usize;
            let idx = index.lower_bound(&keys, &key);
            assert_eq!(idx, expected, "Failed for key {}", key);
        }
    }

    // With repeated keys, lower_bound returns the first occurrence.
    #[test]
    fn test_pgm_index_duplicates() {
        let keys: Vec<u64> = vec![1, 1, 2, 2, 2, 3, 3, 4, 5, 5, 5, 5];
        let index = Static::new(&keys, 4, 2).unwrap();

        assert_eq!(index.lower_bound(&keys, &1), 0);
        assert_eq!(index.lower_bound(&keys, &2), 2);
        assert_eq!(index.lower_bound(&keys, &5), 8);
    }

    // Empty input is rejected with `Error::EmptyInput`.
    #[test]
    fn test_empty_input_error() {
        let keys: Vec<u64> = vec![];
        let result = Static::new(&keys, 64, 4);
        assert_eq!(result.unwrap_err(), Error::EmptyInput);
    }

    // An epsilon of 0 is rejected with `Error::InvalidEpsilon`.
    #[test]
    fn test_invalid_epsilon_error() {
        let keys: Vec<u64> = vec![1, 2, 3];
        let result = Static::new(&keys, 0, 4);
        assert_eq!(result.unwrap_err(), Error::InvalidEpsilon);
    }

    // A one-element index: single level, lookups below, at and above the key.
    #[test]
    fn test_single_element() {
        let keys: Vec<u64> = vec![42];
        let index = Static::new(&keys, 64, 4).unwrap();

        assert_eq!(index.len(), 1);
        assert_eq!(index.height(), 1);
        assert!(index.contains(&keys, &42));
        assert!(!index.contains(&keys, &0));
        assert!(!index.contains(&keys, &100));
        assert_eq!(index.lower_bound(&keys, &42), 0);
        assert_eq!(index.lower_bound(&keys, &0), 0);
        assert_eq!(index.lower_bound(&keys, &100), 1);
    }

    // `epsilon_recursive == 0` disables upper levels, leaving one level.
    #[test]
    fn test_epsilon_recursive_zero() {
        let keys: Vec<u64> = (0..1000).collect();
        let index = Static::new(&keys, 64, 0).unwrap();

        assert_eq!(index.height(), 1);
        assert!(index.contains(&keys, &500));
        assert_eq!(index.lower_bound(&keys, &500), 500);
    }

    // Smallest accepted error bounds still give correct lookups.
    #[test]
    fn test_very_small_epsilon() {
        let keys: Vec<u64> = (0..100).collect();
        let index = Static::new(&keys, 1, 1).unwrap();

        for &key in &[0u64, 50, 99] {
            assert!(index.contains(&keys, &key));
            assert_eq!(index.lower_bound(&keys, &key), key as usize);
        }
    }

    // An error bound larger than the data collapses the index to one segment.
    #[test]
    fn test_very_large_epsilon() {
        let keys: Vec<u64> = (0..100).collect();
        let index = Static::new(&keys, 1000, 1000).unwrap();

        assert_eq!(index.segments_count(), 1);
        for &key in &[0u64, 50, 99] {
            assert!(index.contains(&keys, &key));
        }
    }

    // upper_bound lands one past the last duplicate, and handles values
    // below and above the data.
    #[test]
    fn test_upper_bound() {
        let keys: Vec<u64> = vec![1, 1, 2, 2, 2, 3, 3, 4, 5, 5, 5, 5];
        let index = Static::new(&keys, 4, 2).unwrap();

        assert_eq!(index.upper_bound(&keys, &1), 2);
        assert_eq!(index.upper_bound(&keys, &2), 5);
        assert_eq!(index.upper_bound(&keys, &5), 12);
        assert_eq!(index.upper_bound(&keys, &0), 0);
        assert_eq!(index.upper_bound(&keys, &6), 12);
    }

    // Each `RangeBounds` flavour: full, from, to, to-inclusive, bounded and
    // bounded-inclusive.
    #[test]
    fn test_range_all_variants() {
        let keys: Vec<u64> = (0..100).collect();
        let index = Static::new(&keys, 16, 4).unwrap();

        let range_full: Vec<_> = index.range(&keys, ..).copied().collect();
        assert_eq!(range_full.len(), 100);

        let range_from: Vec<_> = index.range(&keys, 90..).copied().collect();
        assert_eq!(range_from, (90..100).collect::<Vec<_>>());

        let range_to: Vec<_> = index.range(&keys, ..10).copied().collect();
        assert_eq!(range_to, (0..10).collect::<Vec<_>>());

        let range_to_inclusive: Vec<_> = index.range(&keys, ..=10).copied().collect();
        assert_eq!(range_to_inclusive, (0..=10).collect::<Vec<_>>());

        let range_bounded: Vec<_> = index.range(&keys, 10..20).copied().collect();
        assert_eq!(range_bounded, (10..20).collect::<Vec<_>>());

        let range_bounded_inclusive: Vec<_> = index.range(&keys, 10..=20).copied().collect();
        assert_eq!(range_bounded_inclusive, (10..=20).collect::<Vec<_>>());
    }

    // A range entirely above the data yields nothing.
    #[test]
    fn test_range_empty() {
        let keys: Vec<u64> = (0..100).collect();
        let index = Static::new(&keys, 16, 4).unwrap();

        let empty: Vec<_> = index.range(&keys, 200..300).copied().collect();
        assert!(empty.is_empty());
    }

    // The reported size is non-zero for a built index.
    #[test]
    fn test_size_in_bytes() {
        let keys: Vec<u64> = (0..1000).collect();
        let index = Static::new(&keys, 64, 4).unwrap();

        assert!(index.size_in_bytes() > 0);
    }
}