persy/index/tree/
nodes.rs

1use crate::{
2    error::{IndexChangeError, PIRes},
3    id::RecRef,
4    index::{
5        config::{IndexOrd, IndexTypeInternal, ValueMode},
6        tree::PosRef,
7    },
8};
9use std::{
10    cmp::Ordering,
11    fmt::Display,
12    iter::{Peekable, Rev},
13    ops::Bound,
14    vec::IntoIter,
15};
16
/// Returns the block size to use when `len` items are split under a `limit`.
///
/// # Panics
///
/// Panics with a division by zero when `limit` is `0`.
pub(crate) fn block_size_for_split(len: usize, limit: usize) -> usize {
    // Maximum number of blocks for this length: one per full `limit`, plus one
    // for the leftover.
    let blocks = len / limit + 1;
    // Derive the block size from that count; one is added to the truncated quotient.
    len / blocks + 1
}
22
23pub(crate) fn block_counts_for_split(len: usize, limit: usize) -> usize {
24    let size = block_size_for_split(len, limit);
25    let count = len / size;
26    if len % size != 0 {
27        count + 1
28    } else {
29        count
30    }
31}
32
/// Reference to a tree node; an alias of [`RecRef`].
pub type TreeNodeRef = RecRef;
/// An element of the index tree: either a [`Node`] or a [`Leaf`].
#[derive(Clone)]
pub enum TreeNode<K, V> {
    /// A node holding keys and pointers to other tree nodes.
    Node(Node<K>),
    /// A leaf holding key/value entries.
    Leaf(Leaf<K, V>),
}
39
40impl<K, V> TreeNode<K, V> {
41    pub fn get_prev(&self) -> &Option<K> {
42        match self {
43            TreeNode::Node(n) => n.get_prev(),
44            TreeNode::Leaf(l) => l.get_prev(),
45        }
46    }
47    pub fn get_next(&self) -> &Option<K> {
48        match self {
49            TreeNode::Node(n) => n.get_next(),
50            TreeNode::Leaf(l) => l.get_next(),
51        }
52    }
53    pub fn len(&self) -> usize {
54        match self {
55            TreeNode::Node(n) => n.len(),
56            TreeNode::Leaf(l) => l.len(),
57        }
58    }
59
60    pub fn as_mut_node(&mut self) -> &mut Node<K> {
61        match self {
62            TreeNode::Node(n) => n,
63            _ => panic!("not a node"),
64        }
65    }
66
67    pub fn as_mut_leaf(&mut self) -> &mut Leaf<K, V> {
68        match self {
69            TreeNode::Node(_) => panic!("not a leaf"),
70            TreeNode::Leaf(l) => l,
71        }
72    }
73}
74
75impl<K: IndexOrd + Clone, V> TreeNode<K, V> {
76    pub fn merge_right(&mut self, k: &K, node: &mut TreeNode<K, V>) {
77        match self {
78            TreeNode::Node(n) => match node {
79                TreeNode::Node(n1) => {
80                    n.merge_right(k, n1);
81                }
82                TreeNode::Leaf(_) => {
83                    panic!("impossible merge a leaf to node");
84                }
85            },
86            TreeNode::Leaf(l) => match node {
87                TreeNode::Node(_) => {
88                    panic!("impossible merge a node to leaf");
89                }
90                TreeNode::Leaf(l1) => {
91                    l.merge_right(k, l1);
92                }
93            },
94        }
95    }
96
97    pub fn split(&mut self, top_limit: usize) -> Vec<(K, TreeNode<K, V>)> {
98        match self {
99            TreeNode::Node(n) => n
100                .split(top_limit)
101                .into_iter()
102                .map(|x| (x.0, TreeNode::Node(x.1)))
103                .collect(),
104            TreeNode::Leaf(l) => l
105                .split(top_limit)
106                .into_iter()
107                .map(|x| (x.0, TreeNode::Leaf(x.1)))
108                .collect(),
109        }
110    }
111
112    pub fn check_range(&self, k: &K) -> bool {
113        match self {
114            TreeNode::Node(n) => n.check_range(k),
115            TreeNode::Leaf(l) => l.check_range(k),
116        }
117    }
118}
119
120impl<K: Display, V: Display> Display for TreeNode<K, V> {
121    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
122        match &self {
123            TreeNode::Node(n) => write!(f, "Node: {}", n)?,
124            TreeNode::Leaf(l) => write!(f, "Leaf: {}", l)?,
125        }
126        Ok(())
127    }
128}
129
/// Orders `first` against `second` by delegating to `cmp` on the
/// [`IndexOrd`]-bounded type.
pub(crate) fn compare<T: IndexOrd>(first: &T, second: &T) -> Ordering {
    first.cmp(second)
}
133
/// Iterator over the pointer positions of a [`Node`], yielding a [`PosRef`]
/// for each position from `pos` up to the node's length.
pub struct PosIter<'a, K> {
    /// Node whose positions are being walked.
    nodes: &'a Node<K>,
    /// Index of the next position to yield.
    pos: usize,
}
138impl<'a, K> PosIter<'a, K> {
139    fn new(nodes: &'a Node<K>, pos: usize) -> Self {
140        Self { nodes, pos }
141    }
142}
143impl<K> std::iter::Iterator for PosIter<'_, K>
144where
145    K: Clone,
146{
147    type Item = PosRef<K>;
148    fn next(&mut self) -> Option<Self::Item> {
149        if self.pos < self.nodes.len() {
150            let pointer = self.pos;
151            self.pos += 1;
152
153            let k = if pointer == 0 {
154                self.nodes.prev.clone().expect("should be called on middle nodes")
155            } else {
156                self.nodes.keys[pointer - 1].clone()
157            };
158            Some(PosRef::new(&k, pointer, self.nodes.pointers[pointer]))
159        } else {
160            None
161        }
162    }
163}
164
/// Tree node made of keys and references to other tree nodes.
///
/// `pointers[0]` is paired with `prev`, and `pointers[i]` (for `i > 0`) with
/// `keys[i - 1]`; this is how `find_key` and the `Display` impl read it.
#[derive(Clone)]
pub struct Node<K> {
    /// Keys stored in the node; looked up with binary search.
    pub keys: Vec<K>,
    /// References to other tree nodes.
    pub pointers: Vec<TreeNodeRef>,
    /// Key paired with the first pointer; `check_range` uses it as the lower bound.
    pub prev: Option<K>,
    /// Key that `check_range` uses as the upper bound.
    pub next: Option<K>,
}
172
173impl<K> Node<K> {
174    pub fn len(&self) -> usize {
175        self.pointers.len()
176    }
177    pub(crate) fn get_next(&self) -> &Option<K> {
178        &self.next
179    }
180    pub(crate) fn get_prev(&self) -> &Option<K> {
181        &self.prev
182    }
183}
184
185impl<K: IndexOrd + Clone> Node<K> {
186    pub fn new_from_split(left: TreeNodeRef, values: &[(K, TreeNodeRef)]) -> Node<K> {
187        let keys = values.iter().map(|z| z.0.clone()).collect();
188        let mut pointers: Vec<TreeNodeRef> = values.iter().map(|z| z.1).collect();
189        pointers.insert(0, left);
190        Node {
191            keys,
192            pointers,
193            prev: None,
194            next: None,
195        }
196    }
197
198    pub fn remove_key(&mut self, k: &K) -> Option<K> {
199        match self.keys.binary_search_by(|x| compare(x, k)) {
200            Ok(index) => {
201                self.keys.remove(index);
202                self.pointers.remove(index + 1);
203                None
204            }
205            Err(index) => {
206                if index == 0 {
207                    let prev = if !self.keys.is_empty() {
208                        Some(self.keys.remove(0))
209                    } else {
210                        self.next.clone()
211                    };
212                    self.prev = prev.clone();
213                    self.pointers.remove(0);
214                    prev
215                } else {
216                    unreachable!("when removing a key it should be missing only for index 0");
217                }
218            }
219        }
220    }
221
222    pub fn add(&mut self, pos: usize, k: &K, node_ref: TreeNodeRef) {
223        self.keys.insert(pos, k.clone());
224        self.pointers.insert(pos + 1, node_ref);
225    }
226
227    pub fn find(&self, k: &K) -> PosRef<K> {
228        match self.keys.binary_search_by(|x| compare(x, k)) {
229            Ok(index) => PosRef::new(&self.keys[index], index + 1, self.pointers[index + 1]),
230            Err(index) => {
231                if index == 0 {
232                    PosRef::new(self.prev.as_ref().unwrap_or(k), index, self.pointers[index])
233                } else {
234                    PosRef::new(&self.keys[index - 1], index, self.pointers[index])
235                }
236            }
237        }
238    }
239
240    pub fn iter_from<'a>(&'a self, pos: Option<&PosRef<K>>) -> Option<PosIter<'a, K>> {
241        if pos.is_none() && self.prev.is_none() {
242            return None;
243        }
244        let base = pos.map(|x| x.pos + 1).unwrap_or(0);
245        if base > self.len() {
246            return None;
247        }
248        Some(PosIter::new(self, base))
249    }
250
251    pub fn swap_next(&mut self, new: &K) -> bool {
252        if let Some(next) = &self.next {
253            if compare(next, new) != Ordering::Equal {
254                self.next = Some(new.clone());
255                true
256            } else {
257                false
258            }
259        } else {
260            false
261        }
262    }
263
264    pub fn find_pre_write(&self, k: &K) -> Option<PosRef<K>> {
265        let pos = match self.keys.binary_search_by(|x| compare(x, k)) {
266            Ok(index) => {
267                if index == 0 {
268                    PosRef::new(self.prev.as_ref().unwrap_or(k), index, self.pointers[index])
269                } else {
270                    PosRef::new(&self.keys[index - 1], index, self.pointers[index])
271                }
272            }
273            Err(index) => {
274                if index == 0 {
275                    PosRef::new(self.prev.as_ref().unwrap_or(k), index, self.pointers[index])
276                } else {
277                    PosRef::new(&self.keys[index - 1], index, self.pointers[index])
278                }
279            }
280        };
281
282        if pos.pos == 0 {
283            if let Some(pk) = &self.prev {
284                if compare(k, pk) == Ordering::Less {
285                    return None;
286                }
287            }
288        } else if pos.pos == self.pointers.len() {
289            if let Some(nk) = &self.next {
290                if compare(k, nk) != Ordering::Less {
291                    return None;
292                }
293            }
294        }
295
296        Some(pos)
297    }
298
299    pub fn find_write(&self, k: &K) -> Option<PosRef<K>> {
300        let pos = self.find(k);
301        if pos.pos == 0 {
302            if let Some(pk) = &self.prev {
303                if compare(k, pk) == Ordering::Less {
304                    return None;
305                }
306            }
307        } else if pos.pos == self.pointers.len() {
308            if let Some(nk) = &self.next {
309                if compare(k, nk) != Ordering::Less {
310                    return None;
311                }
312            }
313        }
314
315        Some(pos)
316    }
317
318    pub fn get(&self, pos: usize) -> TreeNodeRef {
319        self.pointers[pos]
320    }
321
322    pub fn insert_after(&mut self, pos: usize, values: &[(K, TreeNodeRef)]) {
323        for (key, node) in values.iter().rev() {
324            self.add(pos, key, *node);
325        }
326    }
327
328    pub fn insert_after_key(&mut self, values: &[(K, TreeNodeRef)]) {
329        if let Some((k, _)) = values.first() {
330            let pos = self.find(k);
331            self.insert_after(pos.pos, values);
332        }
333    }
334
335    pub fn split(&mut self, max: usize) -> Vec<(K, Node<K>)> {
336        let mut split_result: Vec<(K, Node<K>)> = Vec::new();
337        let size = self.keys.len();
338        let split_offset = block_size_for_split(size, max);
339        let mut others = self.keys.split_off(split_offset - 1);
340        let mut other_pointers = self.pointers.split_off(split_offset);
341
342        let pre_next = self.next.clone();
343        while others.len() > max {
344            let new = others.split_off(split_offset);
345            let new_pointers = other_pointers.split_off(split_offset);
346            let key = others.remove(0);
347            if let Some((_, ref mut x)) = split_result.last_mut() {
348                x.next = Some(key.clone());
349            } else {
350                self.next = Some(key.clone());
351            }
352            let leaf = Node {
353                keys: others,
354                pointers: other_pointers,
355                prev: Some(key.clone()),
356                next: None,
357            };
358            split_result.push((key, leaf));
359            others = new;
360            other_pointers = new_pointers;
361        }
362
363        let key = others.remove(0);
364        if let Some((_, ref mut x)) = split_result.last_mut() {
365            x.next = Some(key.clone());
366        } else {
367            self.next = Some(key.clone());
368        }
369        let leaf = Node {
370            keys: others,
371            pointers: other_pointers,
372            prev: Some(key.clone()),
373            next: pre_next,
374        };
375        split_result.push((key, leaf));
376        split_result
377    }
378
379    #[cfg(test)]
380    pub fn merge_left(&mut self, owner: K, nodes: &mut Node<K>) {
381        let mut keys = std::mem::take(&mut nodes.keys);
382        let mut pointers = std::mem::take(&mut nodes.pointers);
383        keys.push(owner);
384        keys.append(&mut self.keys);
385        pointers.append(&mut self.pointers);
386        self.keys = keys;
387        self.pointers = pointers;
388    }
389
390    pub fn merge_right(&mut self, k: &K, nodes: &mut Node<K>) {
391        self.keys.push(k.clone());
392        self.keys.append(&mut nodes.keys);
393        self.pointers.append(&mut nodes.pointers);
394        self.next = nodes.next.clone();
395    }
396
397    pub fn replace_key(&mut self, node: &TreeNodeRef, new_key: &K) -> (bool, bool) {
398        for (p, r) in self.pointers.iter().enumerate() {
399            if r == node {
400                return if p == 0 {
401                    if let Some(p) = &self.prev {
402                        if compare(p, new_key) != Ordering::Equal {
403                            self.prev = Some(new_key.clone());
404                            (true, true)
405                        } else {
406                            (false, false)
407                        }
408                    } else {
409                        (false, false)
410                    }
411                } else if compare(&self.keys[p - 1], new_key) != Ordering::Equal {
412                    self.keys[p - 1] = new_key.clone();
413                    (true, false)
414                } else {
415                    (false, false)
416                };
417            }
418        }
419        (false, false)
420    }
421
422    pub fn find_key(&self, node: &TreeNodeRef) -> Option<&K> {
423        for (p, r) in self.pointers.iter().enumerate() {
424            if r == node {
425                if p == 0 {
426                    return self.prev.as_ref();
427                } else {
428                    return Some(&self.keys[p - 1]);
429                }
430            }
431        }
432        None
433    }
434
435    fn check_range(&self, k: &K) -> bool {
436        if let Some(x) = &self.prev {
437            if compare(x, k) == Ordering::Greater {
438                return false;
439            }
440        }
441        if let Some(x) = &self.next {
442            if compare(x, k) == Ordering::Less {
443                return false;
444            }
445        }
446        true
447    }
448}
449
450impl<K: Display> Display for Node<K> {
451    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
452        write!(f, "[")?;
453        for x in 0..self.pointers.len() {
454            if x == 0 {
455                write!(
456                    f,
457                    "{:?}={}",
458                    self.prev.as_ref().map(|p| format!("{}", p)),
459                    self.pointers[x]
460                )?;
461            } else {
462                write!(f, ",{}={}", self.keys[x - 1], self.pointers[x])?;
463            }
464        }
465        write!(f, "]")?;
466        Ok(())
467    }
468}
469
/// The value associated with an index key.
#[derive(Clone, PartialEq, Debug, Eq)]
pub enum Value<V> {
    /// A cluster of values stored for the same key.
    Cluster(Vec<V>),
    /// A single value entry.
    Single(V),
}
478
479impl<V> IntoIterator for Value<V> {
480    type Item = V;
481    type IntoIter = IntoIter<V>;
482
483    fn into_iter(self) -> IntoIter<V> {
484        match self {
485            Value::Single(v) => vec![v].into_iter(),
486            Value::Cluster(v) => v.into_iter(),
487        }
488    }
489}
490
/// Wrapper around a peekable, owning iterator over leaf entries.
pub struct PageIter<K, V> {
    /// The wrapped iterator.
    pub iter: Peekable<IntoIter<LeafEntry<K, V>>>,
}
494
/// Wrapper around a peekable, owning iterator over leaf entries in reverse order.
pub struct PageIterBack<K, V> {
    /// The wrapped reversed iterator.
    pub iter: Peekable<Rev<IntoIter<LeafEntry<K, V>>>>,
}
498
/// Tree leaf holding key/value entries.
#[derive(Clone)]
pub struct Leaf<K, V> {
    /// Entries of the leaf; looked up by key with binary search.
    pub entries: Vec<LeafEntry<K, V>>,
    /// Key that `check_range` uses as the lower bound.
    pub prev: Option<K>,
    /// Key that `check_range` uses as the upper bound.
    pub next: Option<K>,
}
505
/// A key together with the value(s) stored for it in a [`Leaf`].
#[derive(Clone)]
pub struct LeafEntry<K, V> {
    /// The entry key.
    pub key: K,
    /// The single value or cluster of values stored for the key.
    pub value: Value<V>,
}
511
512impl<K, V> Leaf<K, V> {
513    pub fn new() -> Leaf<K, V> {
514        Leaf {
515            entries: Vec::new(),
516            prev: None,
517            next: None,
518        }
519    }
520
521    pub fn len(&self) -> usize {
522        self.entries.len()
523    }
524
525    #[cfg(test)]
526    pub fn merge_left(&mut self, leaf: &mut Leaf<K, V>) {
527        let mut entries = std::mem::take(&mut leaf.entries);
528        entries.append(&mut self.entries);
529        self.entries = entries;
530    }
531
532    pub(crate) fn get_next(&self) -> &Option<K> {
533        &self.next
534    }
535    pub(crate) fn get_prev(&self) -> &Option<K> {
536        &self.prev
537    }
538}
539
540impl<K: IndexOrd + Clone, V> Leaf<K, V> {
541    fn check_range(&self, k: &K) -> bool {
542        if let Some(x) = &self.prev {
543            if compare(x, k) == Ordering::Greater {
544                return false;
545            }
546        }
547        if let Some(x) = &self.next {
548            if compare(x, k) == Ordering::Less {
549                return false;
550            }
551        }
552        true
553    }
554
555    pub fn merge_right(&mut self, _k: &K, leaf: &mut Leaf<K, V>) {
556        self.entries.append(&mut leaf.entries);
557        self.next = leaf.next.clone();
558    }
559
560    pub fn split(&mut self, max: usize) -> Vec<(K, Leaf<K, V>)> {
561        let mut split_result: Vec<(K, Leaf<K, V>)> = Vec::new();
562        let size = self.entries.len();
563        let split_offset = block_size_for_split(size, max);
564        let mut others = self.entries.split_off(split_offset);
565        let pre_next = self.next.clone();
566        while others.len() > max {
567            let new = others.split_off(split_offset);
568            let key = others[0].key.clone();
569            if let Some((_, ref mut x)) = split_result.last_mut() {
570                x.next = Some(key.clone());
571            } else {
572                self.next = Some(key.clone());
573            }
574            let leaf = Leaf {
575                entries: others,
576                prev: Some(key.clone()),
577                next: None,
578            };
579            split_result.push((key, leaf));
580            others = new;
581        }
582
583        let key = others[0].key.clone();
584        if let Some((_, ref mut x)) = split_result.last_mut() {
585            x.next = Some(key.clone());
586        } else {
587            self.next = Some(key.clone());
588        }
589        let leaf = Leaf {
590            entries: others,
591            prev: Some(key.clone()),
592            next: pre_next,
593        };
594        split_result.push((key, leaf));
595        split_result
596    }
597}
598impl<K: IndexOrd + Clone, V: Clone> Leaf<K, V> {
599    pub fn find(&self, k: &K) -> Result<(K, Value<V>), usize> {
600        self.entries
601            .binary_search_by(|n| compare(&n.key, k))
602            .map(|index| (self.entries[index].key.clone(), self.entries[index].value.clone()))
603    }
604
605    pub fn add_at(&mut self, pos: usize, k: &K, v: &V, _value_mode: ValueMode) {
606        self.entries.insert(
607            pos,
608            LeafEntry {
609                key: k.clone(),
610                value: Value::Single(v.clone()),
611            },
612        );
613    }
614
615    pub fn iter_from(&self, bound: Bound<&K>) -> IntoIter<LeafEntry<K, V>> {
616        let index = match bound {
617            Bound::Included(k) => match self.entries.binary_search_by(|n| compare(&n.key, k)) {
618                Ok(index) => index,
619                Err(index) => index,
620            },
621            Bound::Excluded(k) => match self.entries.binary_search_by(|n| compare(&n.key, k)) {
622                Ok(index) => index + 1,
623                Err(index) => index,
624            },
625            Bound::Unbounded => 0,
626        };
627        self.entries[index..].to_vec().into_iter()
628    }
629
630    pub fn back_iter_from(&self, bound: Bound<&K>) -> Rev<IntoIter<LeafEntry<K, V>>> {
631        let index = match bound {
632            Bound::Included(k) => match self.entries.binary_search_by(|n| compare(&n.key, k)) {
633                Ok(index) => index + 1,
634                Err(index) => index,
635            },
636            Bound::Excluded(k) => match self.entries.binary_search_by(|n| compare(&n.key, k)) {
637                Ok(index) => index,
638                Err(index) => index,
639            },
640            Bound::Unbounded => self.len(),
641        };
642        self.entries[..index].to_vec().into_iter().rev()
643    }
644}
645
646impl<K: IndexTypeInternal, V: IndexOrd + Clone> Leaf<K, V> {
647    pub fn add(&mut self, k: &K, v: &V, value_mode: ValueMode, index_name: &str) -> PIRes<()> {
648        if self.len() > 0 {
649            self.insert_or_update(k, v, value_mode, index_name)
650        } else {
651            self.add_at(0, k, v, value_mode);
652            Ok(())
653        }
654    }
655
656    pub fn insert_or_update(&mut self, k: &K, v: &V, value_mode: ValueMode, index_name: &str) -> PIRes<()> {
657        match self.entries.binary_search_by(|n| compare(&n.key, k)) {
658            Ok(index) => {
659                let entry = &mut self.entries[index];
660                match value_mode {
661                    ValueMode::Replace => {
662                        entry.value = Value::Single(v.clone());
663                    }
664                    ValueMode::Exclusive => match entry.value {
665                        Value::Single(ref ev) => {
666                            if compare(ev, v) != Ordering::Equal {
667                                return Err(IndexChangeError::IndexDuplicateKey(
668                                    index_name.to_string(),
669                                    format!("{}", k),
670                                ));
671                            }
672                        }
673                        _ => unreachable!("Exclusive leafs never have cluster values"),
674                    },
675                    ValueMode::Cluster => {
676                        let mut new_value = None;
677                        match entry.value {
678                            Value::Single(ref ev) => match compare(ev, v) {
679                                Ordering::Equal => {}
680                                Ordering::Less => {
681                                    new_value = Some(Value::Cluster(vec![ev.clone(), v.clone()]));
682                                }
683                                Ordering::Greater => {
684                                    new_value = Some(Value::Cluster(vec![v.clone(), ev.clone()]));
685                                }
686                            },
687                            Value::Cluster(ref mut cl) => {
688                                if let Err(index) = cl.binary_search_by(|x| compare(x, v)) {
689                                    cl.insert(index, v.clone());
690                                }
691                            }
692                        }
693                        if let Some(v) = new_value {
694                            entry.value = v;
695                        }
696                    }
697                }
698            }
699            Err(index) => self.add_at(index, k, v, value_mode),
700        }
701        Ok(())
702    }
703
704    pub fn remove(&mut self, k: &K, v: &Option<V>) -> bool {
705        match self.entries.binary_search_by(|n| compare(&n.key, k)) {
706            Ok(index) => {
707                if let Some(rv) = v {
708                    let mut removed = false;
709                    let remove_entry = {
710                        let mut new_value = None;
711                        let entry = &mut self.entries[index];
712                        let remove_entry = match &mut entry.value {
713                            Value::Single(val) => {
714                                if compare(val, rv) == Ordering::Equal {
715                                    removed = true;
716                                    true
717                                } else {
718                                    false
719                                }
720                            }
721                            Value::Cluster(ref mut cl) => {
722                                if let Ok(index) = cl.binary_search_by(|x| compare(x, rv)) {
723                                    removed = true;
724                                    cl.remove(index);
725                                }
726                                if cl.len() == 1 {
727                                    new_value = Some(Value::Single(cl.pop().unwrap()));
728                                    false
729                                } else {
730                                    cl.is_empty()
731                                }
732                            }
733                        };
734                        if let Some(new) = new_value {
735                            entry.value = new;
736                        }
737                        remove_entry
738                    };
739                    if remove_entry {
740                        self.entries.remove(index);
741                    }
742                    removed
743                } else {
744                    self.entries.remove(index);
745                    true
746                }
747            }
748            Err(_) => false,
749        }
750    }
751}
752
753impl<K: Display, V: Display> Display for Leaf<K, V> {
754    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
755        write!(f, "[")?;
756        for x in 0..self.entries.len() {
757            write!(f, "{}=[", self.entries[x].key)?;
758            match &self.entries[x].value {
759                Value::Single(v) => write!(f, "{}", v)?,
760                Value::Cluster(c) => {
761                    for (c, ev) in c.iter().enumerate() {
762                        write!(f, "{}", ev)?;
763                        if c != 0 {
764                            write!(f, ",")?;
765                        }
766                    }
767                }
768            }
769            write!(f, "]")?;
770            if x != 0 {
771                write!(f, ",")?;
772            }
773        }
774        write!(f, "]")?;
775        Ok(())
776    }
777}