pgm_extra/index/external/
static.rs

1//! Multi-level recursive PGM-Index.
2//!
3//! This is the primary external-keys index with recursive segment levels
4//! for optimal query performance on large datasets.
5
6use alloc::vec;
7use alloc::vec::Vec;
8use core::ops::RangeBounds;
9
10use crate::error::Error;
11
12use crate::index::Segment;
13use crate::index::key::Indexable;
14use crate::index::model::build_segments;
15
16use crate::util::ApproxPos;
17use crate::util::range::range_to_indices;
18use crate::util::search::{pgm_add_eps, pgm_sub_eps};
19
20const LINEAR_SEARCH_THRESHOLD_SEGMENTS: usize = 32;
21
22/// A multi-level recursive PGM-Index.
23///
24/// This index builds multiple levels of linear models for efficient lookups.
25/// It does not own the data; the keys must be stored separately and passed
26/// to query methods.
27///
28/// # Type Parameters
29///
30/// - `T`: The value type that implements [`Indexable`]. The index internally
31///   stores segments of `T::Key` for the linear models.
32///
33/// # Example
34///
35/// ```
36/// use pgm_extra::index::external::Static;
37///
38/// let keys: Vec<u64> = (0..10000).collect();
39/// let index = Static::new(&keys, 64, 4).unwrap();
40///
41/// assert!(index.contains(&keys, &5000));
42/// assert_eq!(index.lower_bound(&keys, &5000), 5000);
43/// ```
44#[derive(Clone, Debug, PartialEq)]
45#[cfg_attr(
46    feature = "rkyv",
47    derive(rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)
48)]
49#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
50#[cfg_attr(
51    feature = "serde",
52    serde(bound = "T::Key: serde::Serialize + serde::de::DeserializeOwned")
53)]
54pub struct Static<T: Indexable> {
55    epsilon: usize,
56    epsilon_recursive: usize,
57    len: usize,
58    levels_offsets: Vec<usize>,
59    segments: Vec<Segment<T::Key>>,
60}
61
62impl<T: Indexable> Static<T>
63where
64    T::Key: Ord,
65{
66    /// Build a new multi-level PGM-Index from sorted data.
67    ///
68    /// # Parameters
69    ///
70    /// - `data`: A sorted slice of values
71    /// - `epsilon`: Error bound for the bottom level (larger = fewer segments)
72    /// - `epsilon_recursive`: Error bound for upper levels
73    ///
74    /// # Errors
75    ///
76    /// Returns an error if `data` is empty or `epsilon` is 0.
77    pub fn new(data: &[T], epsilon: usize, epsilon_recursive: usize) -> Result<Self, Error> {
78        if data.is_empty() {
79            return Err(Error::EmptyInput);
80        }
81        if epsilon == 0 {
82            return Err(Error::InvalidEpsilon);
83        }
84
85        debug_assert!(
86            data.windows(2)
87                .all(|w| w[0].index_key() <= w[1].index_key()),
88            "data must be sorted by index key"
89        );
90
91        let keys: Vec<T::Key> = data.iter().map(|v| v.index_key()).collect();
92        Self::build_from_keys(&keys, epsilon, epsilon_recursive)
93    }
94
95    /// Build from pre-extracted keys (internal use).
96    pub(crate) fn build_from_keys(
97        keys: &[T::Key],
98        epsilon: usize,
99        epsilon_recursive: usize,
100    ) -> Result<Self, Error> {
101        let bottom_segments = build_segments(keys, epsilon);
102
103        if bottom_segments.is_empty() {
104            return Err(Error::EmptyInput);
105        }
106
107        let mut levels: Vec<Vec<Segment<T::Key>>> = vec![bottom_segments];
108
109        while epsilon_recursive > 0 && levels.last().unwrap().len() > 1 {
110            let prev_level = levels.last().unwrap();
111            let super_keys: Vec<T::Key> = prev_level.iter().map(|s| s.key).collect();
112            let upper_segments = build_segments(&super_keys, epsilon_recursive);
113
114            if upper_segments.len() >= prev_level.len() {
115                break;
116            }
117
118            levels.push(upper_segments);
119        }
120
121        let total_segments: usize = levels.iter().map(|l| l.len()).sum();
122        let mut segments = Vec::with_capacity(total_segments);
123        let mut levels_offsets = Vec::with_capacity(levels.len() + 1);
124
125        levels_offsets.push(0);
126        for level in levels.iter().rev() {
127            segments.extend_from_slice(level);
128            levels_offsets.push(segments.len());
129        }
130
131        Ok(Self {
132            segments,
133            levels_offsets,
134            epsilon,
135            epsilon_recursive,
136            len: keys.len(),
137        })
138    }
139
140    #[cfg(feature = "parallel")]
141    pub fn new_parallel(
142        data: &[T],
143        epsilon: usize,
144        epsilon_recursive: usize,
145    ) -> Result<Self, Error> {
146        use crate::index::model::build_segments_parallel;
147
148        if data.is_empty() {
149            return Err(Error::EmptyInput);
150        }
151        if epsilon == 0 {
152            return Err(Error::InvalidEpsilon);
153        }
154
155        debug_assert!(
156            data.windows(2)
157                .all(|w| w[0].index_key() <= w[1].index_key()),
158            "data must be sorted by index key"
159        );
160
161        let keys: Vec<T::Key> = data.iter().map(|v| v.index_key()).collect();
162
163        let bottom_segments = build_segments_parallel(&keys, epsilon);
164
165        if bottom_segments.is_empty() {
166            return Err(Error::EmptyInput);
167        }
168
169        let mut levels: Vec<Vec<Segment<T::Key>>> = vec![bottom_segments];
170
171        while epsilon_recursive > 0 && levels.last().unwrap().len() > 1 {
172            let prev_level = levels.last().unwrap();
173            let super_keys: Vec<T::Key> = prev_level.iter().map(|s| s.key).collect();
174            let upper_segments = build_segments(&super_keys, epsilon_recursive);
175
176            if upper_segments.len() >= prev_level.len() {
177                break;
178            }
179
180            levels.push(upper_segments);
181        }
182
183        let total_segments: usize = levels.iter().map(|l| l.len()).sum();
184        let mut segments = Vec::with_capacity(total_segments);
185        let mut levels_offsets = Vec::with_capacity(levels.len() + 1);
186
187        levels_offsets.push(0);
188        for level in levels.iter().rev() {
189            segments.extend_from_slice(level);
190            levels_offsets.push(segments.len());
191        }
192
193        Ok(Self {
194            segments,
195            levels_offsets,
196            epsilon,
197            epsilon_recursive,
198            len: keys.len(),
199        })
200    }
201
202    #[inline]
203    fn search_segment(&self, level: usize, key: &T::Key, lo: usize, hi: usize) -> usize {
204        let level_start = self.levels_offsets[level];
205        let level_end = self.levels_offsets[level + 1];
206        let level_size = level_end - level_start;
207
208        let lo = lo.min(level_size);
209        let hi = hi.min(level_size);
210
211        if hi <= lo {
212            return lo;
213        }
214
215        let abs_lo = level_start + lo;
216        let abs_hi = level_start + hi;
217
218        if abs_hi - abs_lo <= LINEAR_SEARCH_THRESHOLD_SEGMENTS {
219            let mut idx = abs_lo;
220            while idx + 1 < abs_hi && self.segments[idx + 1].key <= *key {
221                idx += 1;
222            }
223            idx - level_start
224        } else {
225            let slice = &self.segments[abs_lo..abs_hi];
226            let pos = slice.partition_point(|s| s.key <= *key);
227            let pos = pos.saturating_sub(1);
228            lo + pos
229        }
230    }
231
232    /// Get an approximate position for the given value.
233    #[inline]
234    pub fn search(&self, value: &T) -> ApproxPos {
235        let key = value.index_key();
236        self.search_by_key(&key)
237    }
238
239    /// Get an approximate position for the given key.
240    #[inline]
241    pub fn search_by_key(&self, key: &T::Key) -> ApproxPos {
242        let num_levels = self.levels_offsets.len() - 1;
243
244        if num_levels == 0 {
245            return ApproxPos::new(0, 0, self.len);
246        }
247
248        let mut seg_lo = 0usize;
249        let mut seg_hi = self.levels_offsets[1];
250
251        for level in 0..num_levels - 1 {
252            let level_start = self.levels_offsets[level];
253            let level_size = self.levels_offsets[level + 1] - level_start;
254
255            let local_idx = self.search_segment(level, key, seg_lo, seg_hi.min(level_size));
256            let segment = &self.segments[level_start + local_idx];
257
258            let next_level_start = self.levels_offsets[level + 1];
259            let next_level_size = self.levels_offsets[level + 2] - next_level_start;
260            let predicted = segment.predict(*key).min(next_level_size.saturating_sub(1));
261
262            seg_lo = pgm_sub_eps(predicted, self.epsilon_recursive + 1);
263            seg_hi = pgm_add_eps(predicted, self.epsilon_recursive, next_level_size);
264        }
265
266        let bottom_level = num_levels - 1;
267        let bottom_start = self.levels_offsets[bottom_level];
268        let bottom_size = self.levels_offsets[bottom_level + 1] - bottom_start;
269
270        let local_idx = self.search_segment(bottom_level, key, seg_lo, seg_hi.min(bottom_size));
271        let segment = &self.segments[bottom_start + local_idx];
272
273        let pos = segment.predict(*key).min(self.len.saturating_sub(1));
274        let lo = pgm_sub_eps(pos, self.epsilon);
275        let hi = pgm_add_eps(pos, self.epsilon, self.len);
276
277        ApproxPos::new(pos, lo, hi)
278    }
279
280    /// Find the first position where `data[pos] >= value`.
281    #[inline]
282    pub fn lower_bound(&self, data: &[T], value: &T) -> usize
283    where
284        T: Ord,
285    {
286        let key = value.index_key();
287        let approx = self.search_by_key(&key);
288        let len = data.len();
289        if len == 0 {
290            return 0;
291        }
292
293        let pos = approx.pos.min(len - 1);
294        if data[pos] == *value {
295            let mut i = pos;
296            while i > 0 && data[i - 1] == *value {
297                i -= 1;
298            }
299            return i;
300        }
301
302        if data[pos] < *value {
303            if pos + 1 < len && data[pos + 1] >= *value {
304                return pos + 1;
305            }
306        } else if pos > 0 && data[pos - 1] < *value {
307            return pos;
308        }
309
310        let lo = approx.lo;
311        let hi = approx.hi.min(len);
312        let slice = &data[lo..hi];
313        lo + slice.partition_point(|x| x < value)
314    }
315
316    /// Find the first position where `data[pos] > value`.
317    #[inline]
318    pub fn upper_bound(&self, data: &[T], value: &T) -> usize
319    where
320        T: Ord,
321    {
322        let idx = self.lower_bound(data, value);
323        let mut i = idx;
324        while i < data.len() && data[i] == *value {
325            i += 1;
326        }
327        i
328    }
329
330    /// Check if the value exists in the data.
331    #[inline]
332    pub fn contains(&self, data: &[T], value: &T) -> bool
333    where
334        T: Ord,
335    {
336        let key = value.index_key();
337        let approx = self.search_by_key(&key);
338        let len = data.len();
339
340        if len == 0 {
341            return false;
342        }
343
344        let pos = approx.pos.min(len - 1);
345        if data[pos] == *value {
346            return true;
347        }
348
349        let lo = approx.lo;
350        let hi = approx.hi.min(len);
351        data[lo..hi].binary_search(value).is_ok()
352    }
353
354    /// Number of elements the index was built for.
355    #[inline]
356    pub fn len(&self) -> usize {
357        self.len
358    }
359
360    #[inline]
361    pub fn is_empty(&self) -> bool {
362        self.len == 0
363    }
364
365    /// Number of segments across all levels.
366    #[inline]
367    pub fn segments_count(&self) -> usize {
368        self.segments.len()
369    }
370
371    /// Number of levels in the index.
372    #[inline]
373    pub fn height(&self) -> usize {
374        self.levels_offsets.len().saturating_sub(1)
375    }
376
377    #[inline]
378    pub fn epsilon(&self) -> usize {
379        self.epsilon
380    }
381
382    #[inline]
383    pub fn epsilon_recursive(&self) -> usize {
384        self.epsilon_recursive
385    }
386
387    /// Approximate memory usage in bytes.
388    pub fn size_in_bytes(&self) -> usize {
389        core::mem::size_of::<Self>()
390            + self.segments.capacity() * core::mem::size_of::<Segment<T::Key>>()
391            + self.levels_offsets.capacity() * core::mem::size_of::<usize>()
392    }
393
394    /// Returns the (start, end) indices for iterating over data in the given range.
395    #[inline]
396    pub fn range_indices<R>(&self, data: &[T], range: R) -> (usize, usize)
397    where
398        T: Ord,
399        R: RangeBounds<T>,
400    {
401        range_to_indices(
402            range,
403            data.len(),
404            |v| self.lower_bound(data, v),
405            |v| self.upper_bound(data, v),
406        )
407    }
408
409    /// Returns an iterator over data in the given range.
410    #[inline]
411    pub fn range<'a, R>(&self, data: &'a [T], range: R) -> impl DoubleEndedIterator<Item = &'a T>
412    where
413        T: Ord,
414        R: RangeBounds<T>,
415    {
416        let (start, end) = self.range_indices(data, range);
417        data[start..end].iter()
418    }
419}
420
421impl<T: Indexable> crate::index::External<T> for Static<T>
422where
423    T::Key: Ord,
424{
425    #[inline]
426    fn search(&self, value: &T) -> ApproxPos {
427        self.search(value)
428    }
429
430    #[inline]
431    fn lower_bound(&self, data: &[T], value: &T) -> usize
432    where
433        T: Ord,
434    {
435        self.lower_bound(data, value)
436    }
437
438    #[inline]
439    fn upper_bound(&self, data: &[T], value: &T) -> usize
440    where
441        T: Ord,
442    {
443        self.upper_bound(data, value)
444    }
445
446    #[inline]
447    fn contains(&self, data: &[T], value: &T) -> bool
448    where
449        T: Ord,
450    {
451        self.contains(data, value)
452    }
453
454    #[inline]
455    fn len(&self) -> usize {
456        self.len()
457    }
458
459    #[inline]
460    fn segments_count(&self) -> usize {
461        self.segments_count()
462    }
463
464    #[inline]
465    fn epsilon(&self) -> usize {
466        self.epsilon()
467    }
468
469    #[inline]
470    fn size_in_bytes(&self) -> usize {
471        self.size_in_bytes()
472    }
473}
474
475#[cfg(test)]
476mod tests {
477    use super::*;
478    use alloc::vec::Vec;
479
480    #[test]
481    fn test_pgm_index_basic() {
482        let keys: Vec<u64> = (0..10000).collect();
483        let index = Static::new(&keys, 64, 4).unwrap();
484
485        assert_eq!(index.len(), 10000);
486        assert!(!index.is_empty());
487        assert!(index.height() >= 1);
488    }
489
490    #[test]
491    fn test_pgm_index_search() {
492        let keys: Vec<u64> = (0..10000).collect();
493        let index = Static::new(&keys, 64, 4).unwrap();
494
495        for &key in &[0u64, 100, 5000, 9999] {
496            let idx = index.lower_bound(&keys, &key);
497            assert_eq!(idx, key as usize, "Failed for key {}", key);
498        }
499    }
500
501    #[test]
502    fn test_pgm_index_sparse() {
503        let keys: Vec<u64> = (0..1000).map(|i| i * 1000).collect();
504        let index = Static::new(&keys, 16, 4).unwrap();
505
506        for (i, &key) in keys.iter().enumerate() {
507            let idx = index.lower_bound(&keys, &key);
508            assert_eq!(idx, i, "Failed for key {} at index {}", key, i);
509        }
510    }
511
512    #[test]
513    fn test_pgm_index_contains() {
514        let keys: Vec<u64> = (0..100).map(|i| i * 2).collect();
515        let index = Static::new(&keys, 8, 4).unwrap();
516
517        assert!(index.contains(&keys, &0));
518        assert!(index.contains(&keys, &100));
519        assert!(!index.contains(&keys, &1));
520        assert!(!index.contains(&keys, &99));
521    }
522
523    #[test]
524    fn test_pgm_index_signed() {
525        let keys: Vec<i64> = (-500..500).collect();
526        let index = Static::new(&keys, 16, 4).unwrap();
527
528        for &key in &[-500i64, -100, 0, 100, 499] {
529            let expected = (key + 500) as usize;
530            let idx = index.lower_bound(&keys, &key);
531            assert_eq!(idx, expected, "Failed for key {}", key);
532        }
533    }
534
535    #[test]
536    fn test_pgm_index_duplicates() {
537        let keys: Vec<u64> = vec![1, 1, 2, 2, 2, 3, 3, 4, 5, 5, 5, 5];
538        let index = Static::new(&keys, 4, 2).unwrap();
539
540        assert_eq!(index.lower_bound(&keys, &1), 0);
541        assert_eq!(index.lower_bound(&keys, &2), 2);
542        assert_eq!(index.lower_bound(&keys, &5), 8);
543    }
544
545    #[test]
546    fn test_empty_input_error() {
547        let keys: Vec<u64> = vec![];
548        let result = Static::new(&keys, 64, 4);
549        assert_eq!(result.unwrap_err(), Error::EmptyInput);
550    }
551
552    #[test]
553    fn test_invalid_epsilon_error() {
554        let keys: Vec<u64> = vec![1, 2, 3];
555        let result = Static::new(&keys, 0, 4);
556        assert_eq!(result.unwrap_err(), Error::InvalidEpsilon);
557    }
558
559    #[test]
560    fn test_single_element() {
561        let keys: Vec<u64> = vec![42];
562        let index = Static::new(&keys, 64, 4).unwrap();
563
564        assert_eq!(index.len(), 1);
565        assert_eq!(index.height(), 1);
566        assert!(index.contains(&keys, &42));
567        assert!(!index.contains(&keys, &0));
568        assert!(!index.contains(&keys, &100));
569        assert_eq!(index.lower_bound(&keys, &42), 0);
570        assert_eq!(index.lower_bound(&keys, &0), 0);
571        assert_eq!(index.lower_bound(&keys, &100), 1);
572    }
573
574    #[test]
575    fn test_epsilon_recursive_zero() {
576        let keys: Vec<u64> = (0..1000).collect();
577        let index = Static::new(&keys, 64, 0).unwrap();
578
579        assert_eq!(index.height(), 1);
580        assert!(index.contains(&keys, &500));
581        assert_eq!(index.lower_bound(&keys, &500), 500);
582    }
583
584    #[test]
585    fn test_very_small_epsilon() {
586        let keys: Vec<u64> = (0..100).collect();
587        let index = Static::new(&keys, 1, 1).unwrap();
588
589        for &key in &[0u64, 50, 99] {
590            assert!(index.contains(&keys, &key));
591            assert_eq!(index.lower_bound(&keys, &key), key as usize);
592        }
593    }
594
595    #[test]
596    fn test_very_large_epsilon() {
597        let keys: Vec<u64> = (0..100).collect();
598        let index = Static::new(&keys, 1000, 1000).unwrap();
599
600        assert_eq!(index.segments_count(), 1);
601        for &key in &[0u64, 50, 99] {
602            assert!(index.contains(&keys, &key));
603        }
604    }
605
606    #[test]
607    fn test_upper_bound() {
608        let keys: Vec<u64> = vec![1, 1, 2, 2, 2, 3, 3, 4, 5, 5, 5, 5];
609        let index = Static::new(&keys, 4, 2).unwrap();
610
611        assert_eq!(index.upper_bound(&keys, &1), 2);
612        assert_eq!(index.upper_bound(&keys, &2), 5);
613        assert_eq!(index.upper_bound(&keys, &5), 12);
614        assert_eq!(index.upper_bound(&keys, &0), 0);
615        assert_eq!(index.upper_bound(&keys, &6), 12);
616    }
617
618    #[test]
619    fn test_range_all_variants() {
620        let keys: Vec<u64> = (0..100).collect();
621        let index = Static::new(&keys, 16, 4).unwrap();
622
623        let range_full: Vec<_> = index.range(&keys, ..).copied().collect();
624        assert_eq!(range_full.len(), 100);
625
626        let range_from: Vec<_> = index.range(&keys, 90..).copied().collect();
627        assert_eq!(range_from, (90..100).collect::<Vec<_>>());
628
629        let range_to: Vec<_> = index.range(&keys, ..10).copied().collect();
630        assert_eq!(range_to, (0..10).collect::<Vec<_>>());
631
632        let range_to_inclusive: Vec<_> = index.range(&keys, ..=10).copied().collect();
633        assert_eq!(range_to_inclusive, (0..=10).collect::<Vec<_>>());
634
635        let range_bounded: Vec<_> = index.range(&keys, 10..20).copied().collect();
636        assert_eq!(range_bounded, (10..20).collect::<Vec<_>>());
637
638        let range_bounded_inclusive: Vec<_> = index.range(&keys, 10..=20).copied().collect();
639        assert_eq!(range_bounded_inclusive, (10..=20).collect::<Vec<_>>());
640    }
641
642    #[test]
643    fn test_range_empty() {
644        let keys: Vec<u64> = (0..100).collect();
645        let index = Static::new(&keys, 16, 4).unwrap();
646
647        let empty: Vec<_> = index.range(&keys, 200..300).copied().collect();
648        assert!(empty.is_empty());
649    }
650
651    #[test]
652    fn test_size_in_bytes() {
653        let keys: Vec<u64> = (0..1000).collect();
654        let index = Static::new(&keys, 64, 4).unwrap();
655
656        assert!(index.size_in_bytes() > 0);
657    }
658}