pgm_extra/index/owned/
dynamic.rs

1//! Dynamic PGM-Index supporting insertions and deletions.
2//!
3//! This index owns its data and supports modifications with automatic rebuilding.
4
5use alloc::vec::Vec;
6use core::ops::RangeBounds;
7use std::collections::BTreeSet;
8
9use crate::error::Error;
10use crate::index::external::Static;
11use crate::index::key::Indexable;
12
/// A dynamic PGM-Index that supports insertions and deletions.
///
/// This index owns its data and maintains insert/delete buffers that are
/// periodically merged into the main index. When the number of buffered
/// operations reaches a threshold, the index is automatically rebuilt.
///
/// # Example
///
/// ```
/// use pgm_extra::index::owned::Dynamic;
///
/// let mut index: Dynamic<u64> = Dynamic::new(16, 4);
///
/// index.insert(5);
/// index.insert(3);
/// index.insert(7);
///
/// assert!(index.contains(&3));
/// assert!(index.contains(&5));
/// assert!(!index.contains(&4));
/// ```
#[cfg_attr(
    feature = "rkyv",
    derive(rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)
)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(
    feature = "serde",
    serde(
        bound = "T: serde::Serialize + serde::de::DeserializeOwned, T::Key: serde::Serialize + serde::de::DeserializeOwned"
    )
)]
pub struct Dynamic<T: Indexable + Ord>
where
    T::Key: Ord,
{
    // Learned index built over `base_data`. `None` while `base_data` is empty;
    // a rebuild also leaves it `None` if `Static::new` returns an error.
    base_index: Option<Static<T>>,
    // Sorted values as of the last (re)build. May still contain values that
    // are listed in `delete_buffer`; those are filtered out by lookups.
    base_data: Vec<T>,
    // Values removed from `base_data` logically but still physically present.
    // Purged from `base_data` and cleared on the next rebuild.
    delete_buffer: BTreeSet<T>,
    // First epsilon argument forwarded to `Static::new`; the constructors
    // clamp it to at least 1.
    epsilon: usize,
    // Second epsilon argument forwarded to `Static::new`; the constructors
    // clamp it to at least 1.
    epsilon_recursive: usize,
    // Values inserted since the last rebuild; merged into `base_data` and
    // cleared on the next rebuild.
    insert_buffer: BTreeSet<T>,
    // Once `insert_buffer.len() + delete_buffer.len()` reaches this count, the
    // mutating call that got it there triggers a full rebuild.
    rebuild_threshold: usize,
}
57
58impl<T: Indexable + Ord + std::fmt::Debug> std::fmt::Debug for Dynamic<T>
59where
60    T::Key: Ord,
61{
62    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63        f.debug_struct("Dynamic")
64            .field("base_data_len", &self.base_data.len())
65            .field("insert_buffer_len", &self.insert_buffer.len())
66            .field("delete_buffer_len", &self.delete_buffer.len())
67            .field("epsilon", &self.epsilon)
68            .field("epsilon_recursive", &self.epsilon_recursive)
69            .finish()
70    }
71}
72
73impl<T: Indexable + Ord + Copy> Dynamic<T>
74where
75    T::Key: Ord,
76{
77    /// Create a new empty dynamic index.
78    pub fn new(epsilon: usize, epsilon_recursive: usize) -> Self {
79        Self {
80            base_index: None,
81            base_data: Vec::new(),
82            delete_buffer: BTreeSet::new(),
83            epsilon: epsilon.max(1),
84            epsilon_recursive: epsilon_recursive.max(1),
85            insert_buffer: BTreeSet::new(),
86            rebuild_threshold: 1024,
87        }
88    }
89
90    /// Create a dynamic index from pre-sorted data.
91    pub fn from_sorted(
92        data: Vec<T>,
93        epsilon: usize,
94        epsilon_recursive: usize,
95    ) -> Result<Self, Error> {
96        let epsilon = epsilon.max(1);
97        let epsilon_recursive = epsilon_recursive.max(1);
98
99        if data.is_empty() {
100            return Ok(Self::new(epsilon, epsilon_recursive));
101        }
102
103        let base_index = Static::new(&data, epsilon, epsilon_recursive)?;
104        let rebuild_threshold = (data.len() / 10).max(1024);
105
106        Ok(Self {
107            base_index: Some(base_index),
108            base_data: data,
109            delete_buffer: BTreeSet::new(),
110            epsilon,
111            epsilon_recursive,
112            insert_buffer: BTreeSet::new(),
113            rebuild_threshold,
114        })
115    }
116
117    /// Set the threshold for automatic rebuilding.
118    pub fn with_rebuild_threshold(mut self, threshold: usize) -> Self {
119        self.rebuild_threshold = threshold.max(1);
120        self
121    }
122
123    /// Insert a value into the index.
124    pub fn insert(&mut self, value: T) {
125        self.insert_buffer.insert(value);
126        self.maybe_rebuild();
127    }
128
129    /// Remove a value from the index.
130    pub fn remove(&mut self, value: &T) -> bool {
131        if self.insert_buffer.remove(value) {
132            return true;
133        }
134
135        if self
136            .base_index
137            .as_ref()
138            .is_some_and(|idx| idx.contains(&self.base_data, value))
139        {
140            self.delete_buffer.insert(*value);
141            self.maybe_rebuild();
142            return true;
143        }
144
145        false
146    }
147
148    /// Check if a value exists in the index.
149    pub fn contains(&self, value: &T) -> bool {
150        if self.insert_buffer.contains(value) {
151            return true;
152        }
153
154        if self.delete_buffer.contains(value) {
155            return false;
156        }
157
158        (self.base_index.as_ref()).is_some_and(|idx| idx.contains(&self.base_data, value))
159    }
160
161    /// Find the smallest value >= the given value.
162    pub fn lower_bound(&self, value: &T) -> Option<T> {
163        let base_result = self.base_index.as_ref().and_then(|idx| {
164            let pos = idx.lower_bound(&self.base_data, value);
165            if pos < self.base_data.len() {
166                let v = self.base_data[pos];
167                if !self.delete_buffer.contains(&v) {
168                    return Some(v);
169                }
170                for i in (pos + 1)..self.base_data.len() {
171                    let v = self.base_data[i];
172                    if !self.delete_buffer.contains(&v) {
173                        return Some(v);
174                    }
175                }
176            }
177            None
178        });
179
180        let buffer_result = self.insert_buffer.range(value..).next().copied();
181
182        match (base_result, buffer_result) {
183            (Some(a), Some(b)) => Some(a.min(b)),
184            (Some(a), None) => Some(a),
185            (None, Some(b)) => Some(b),
186            (None, None) => None,
187        }
188    }
189
190    /// Get the number of elements in the index.
191    pub fn len(&self) -> usize {
192        let base_len = self.base_data.len();
193        let inserts = self.insert_buffer.len();
194        let deletes = self.delete_buffer.len();
195        base_len + inserts - deletes
196    }
197
198    pub fn is_empty(&self) -> bool {
199        self.len() == 0
200    }
201
202    /// Get the number of pending operations in buffers.
203    pub fn pending_operations(&self) -> usize {
204        self.insert_buffer.len() + self.delete_buffer.len()
205    }
206
207    fn maybe_rebuild(&mut self) {
208        if self.pending_operations() < self.rebuild_threshold {
209            return;
210        }
211
212        self.force_rebuild();
213    }
214
215    /// Force an immediate rebuild of the index.
216    pub fn force_rebuild(&mut self) {
217        let mut all_data: Vec<T> = self
218            .base_data
219            .iter()
220            .copied()
221            .filter(|v| !self.delete_buffer.contains(v))
222            .collect();
223
224        all_data.extend(self.insert_buffer.iter().copied());
225        all_data.sort();
226
227        if all_data.is_empty() {
228            self.base_data = Vec::new();
229            self.base_index = None;
230        } else {
231            match Static::new(&all_data, self.epsilon, self.epsilon_recursive) {
232                Ok(new_index) => {
233                    self.base_data = all_data;
234                    self.base_index = Some(new_index);
235                }
236                Err(_) => {
237                    self.base_data = all_data;
238                    self.base_index = None;
239                }
240            }
241        }
242
243        self.insert_buffer.clear();
244        self.delete_buffer.clear();
245    }
246
247    /// Iterate over all values in sorted order.
248    pub fn iter(&self) -> impl Iterator<Item = T> + '_ {
249        let base_iter = self
250            .base_data
251            .iter()
252            .copied()
253            .filter(|v| !self.delete_buffer.contains(v));
254
255        let buffer_iter = self.insert_buffer.iter().copied();
256
257        MergedIterator::new(base_iter, buffer_iter)
258    }
259
260    /// Iterate over values in the given range.
261    pub fn range<'a, R>(&'a self, range: R) -> impl Iterator<Item = T> + 'a
262    where
263        R: RangeBounds<T> + Clone + 'a,
264    {
265        let range_clone = range.clone();
266        let base_iter = self
267            .base_data
268            .iter()
269            .copied()
270            .filter(|v| !self.delete_buffer.contains(v))
271            .filter(move |v| range_clone.contains(v));
272
273        let buffer_iter = self
274            .insert_buffer
275            .range((range.start_bound().cloned(), range.end_bound().cloned()))
276            .copied();
277
278        MergedIterator::new(base_iter, buffer_iter)
279    }
280
281    /// Approximate memory usage in bytes.
282    pub fn size_in_bytes(&self) -> usize {
283        let base_size =
284            core::mem::size_of::<Self>() + self.base_data.capacity() * core::mem::size_of::<T>();
285
286        let index_size = self
287            .base_index
288            .as_ref()
289            .map_or(0, |idx| idx.size_in_bytes());
290
291        let buffer_size =
292            (self.insert_buffer.len() + self.delete_buffer.len()) * core::mem::size_of::<T>() * 3;
293
294        base_size + index_size + buffer_size
295    }
296}
297
/// Merges two ascending iterators into a single ascending stream.
///
/// When both inputs currently offer equal values, the value from `iter1` is
/// yielded and the matching value from `iter2` is consumed and discarded.
/// A value present in both inputs is therefore emitted once.
struct MergedIterator<I1, I2, T>
where
    I1: Iterator<Item = T>,
    I2: Iterator<Item = T>,
    T: Ord,
{
    /// Primary input; its value wins ties.
    iter1: core::iter::Peekable<I1>,
    /// Secondary input; its copy of a tied value is dropped.
    iter2: core::iter::Peekable<I2>,
}

impl<I1, I2, T> MergedIterator<I1, I2, T>
where
    I1: Iterator<Item = T>,
    I2: Iterator<Item = T>,
    T: Ord,
{
    /// Wrap both inputs so their heads can be compared before consuming them.
    fn new(iter1: I1, iter2: I2) -> Self {
        let (iter1, iter2) = (iter1.peekable(), iter2.peekable());
        Self { iter1, iter2 }
    }
}

impl<I1, I2, T> Iterator for MergedIterator<I1, I2, T>
where
    I1: Iterator<Item = T>,
    I2: Iterator<Item = T>,
    T: Ord + Copy,
{
    type Item = T;

    fn next(&mut self) -> Option<Self::Item> {
        use core::cmp::Ordering;

        // Decide which side to pull from; an exhausted side always loses.
        let order = match (self.iter1.peek(), self.iter2.peek()) {
            (None, None) => return None,
            (Some(_), None) => Ordering::Less,
            (None, Some(_)) => Ordering::Greater,
            (Some(left), Some(right)) => left.cmp(right),
        };

        match order {
            Ordering::Less => self.iter1.next(),
            Ordering::Greater => self.iter2.next(),
            Ordering::Equal => {
                // Skip the secondary copy so the shared value appears once.
                let _ = self.iter2.next();
                self.iter1.next()
            }
        }
    }
}
348
349impl<T: Indexable + Ord + Copy> core::iter::Extend<T> for Dynamic<T>
350where
351    T::Key: Ord,
352{
353    fn extend<I: IntoIterator<Item = T>>(&mut self, iter: I) {
354        self.insert_buffer.extend(iter);
355        self.maybe_rebuild();
356    }
357}
358
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly constructed index holds nothing.
    #[test]
    fn test_dynamic_empty() {
        let fresh: Dynamic<u64> = Dynamic::new(16, 4);
        assert_eq!(fresh.len(), 0);
        assert!(fresh.is_empty());
    }

    /// Buffered insertions are visible to `contains`.
    #[test]
    fn test_dynamic_insert() {
        let mut idx: Dynamic<u64> = Dynamic::new(16, 4);
        for &key in &[5u64, 3, 7] {
            idx.insert(key);
        }

        for key in &[3u64, 5, 7] {
            assert!(idx.contains(key));
        }
        assert!(!idx.contains(&4));
    }

    /// `Extend` inserts every value of a half-open range.
    #[test]
    fn test_dynamic_extend() {
        let mut idx: Dynamic<u64> = Dynamic::new(16, 4);
        idx.extend(5..7);

        assert!(idx.contains(&5) && idx.contains(&6));
        assert!(!idx.contains(&7));
    }

    /// Removing a base value hides it from lookups.
    #[test]
    fn test_dynamic_remove() {
        let mut idx = Dynamic::from_sorted((0..100).collect::<Vec<u64>>(), 16, 4).unwrap();
        let target = 50;

        assert!(idx.contains(&target));
        assert!(idx.remove(&target));
        assert!(!idx.contains(&target));
    }

    /// Crossing the rebuild threshold flushes the buffers without losing data.
    #[test]
    fn test_dynamic_rebuild() {
        let mut idx: Dynamic<u64> = Dynamic::new(16, 4).with_rebuild_threshold(10);
        (0..20).for_each(|key| idx.insert(key));

        assert_eq!(idx.pending_operations(), 0);

        for key in 0..20u64 {
            assert!(idx.contains(&key), "Missing value {}", key);
        }
    }

    /// Iteration yields values in ascending order regardless of insert order.
    #[test]
    fn test_dynamic_iter() {
        let mut idx: Dynamic<u64> = Dynamic::new(16, 4);
        for &key in &[3u64, 1, 2] {
            idx.insert(key);
        }

        assert_eq!(idx.iter().collect::<Vec<u64>>(), vec![1, 2, 3]);
    }
}