sled/
node.rs

1use std::{num::NonZeroU64, ops::Bound};
2
3use super::*;
4
/// A single tree node: the key range it covers, its merge bookkeeping,
/// and its index or leaf contents.
#[derive(Default, Debug, Clone, PartialEq)]
pub struct Node {
    /// Number of leading bytes of `lo` that form this node's key prefix;
    /// `prefix_encode` drops this many bytes from a full key.
    pub(crate) prefix_len: u8,
    /// Presumably the page id of the following sibling (TODO confirm):
    /// `split` hands it to the right half and `receive_merge` adopts the
    /// right node's value.
    pub(crate) next: Option<NonZeroU64>,
    /// Child recorded by a `ParentMergeIntention` link and cleared again
    /// by `ParentMergeConfirm`.
    pub(crate) merging_child: Option<NonZeroU64>,
    /// Set by a `ChildMergeCap` link; `apply` refuses any further link
    /// once this is true.
    pub(crate) merging: bool,
    /// Inclusive lower bound of the keys covered by this node.
    pub(crate) lo: IVec,
    /// Exclusive upper bound of the keys covered by this node; empty
    /// means there is no upper bound.
    pub(crate) hi: IVec,
    /// The node's contents: index entries or leaf key/value pairs.
    pub(crate) data: Data,
}
15
16impl Node {
17    pub(crate) fn new_hoisted_root(
18        left: PageId,
19        at: IVec,
20        right: PageId,
21    ) -> Node {
22        Node {
23            data: Data::Index(Index {
24                keys: vec![prefix::empty().into(), at],
25                pointers: vec![left, right],
26            }),
27            ..Node::default()
28        }
29    }
30
31    pub(crate) fn new_root(child_pid: PageId) -> Node {
32        Node {
33            data: Data::Index(Index {
34                keys: vec![prefix::empty().into()],
35                pointers: vec![child_pid],
36            }),
37            ..Node::default()
38        }
39    }
40
41    pub(crate) fn rss(&self) -> u64 {
42        std::mem::size_of::<Node>() as u64
43            + self.lo.len() as u64
44            + self.hi.len() as u64
45            + self.data.rss()
46    }
47
48    fn prefix_decode(&self, key: &[u8]) -> IVec {
49        prefix::decode(self.prefix(), key)
50    }
51
52    fn prefix_encode<'a>(&self, key: &'a [u8]) -> &'a [u8] {
53        assert!(*self.lo <= *key);
54        if !self.hi.is_empty() {
55            assert!(*self.hi > *key);
56        }
57
58        &key[self.prefix_len as usize..]
59    }
60
61    fn prefix(&self) -> &[u8] {
62        &self.lo[..self.prefix_len as usize]
63    }
64
65    pub(crate) fn apply(&mut self, link: &Link) {
66        use self::Link::*;
67
68        assert!(
69            !self.merging,
70            "somehow a link was applied to a node after it was merged"
71        );
72
73        match *link {
74            Set(ref k, ref v) => {
75                self.set_leaf(k.clone(), v.clone());
76            }
77            Del(ref k) => {
78                self.del_leaf(k);
79            }
80            ParentMergeIntention(pid) => {
81                assert!(
82                    self.merging_child.is_none(),
83                    "trying to merge {:?} into node {:?} which \
84                     is already merging another child",
85                    link,
86                    self
87                );
88                self.merging_child = Some(NonZeroU64::new(pid).unwrap());
89            }
90            ParentMergeConfirm => {
91                assert!(self.merging_child.is_some());
92                let merged_child = self
93                    .merging_child
94                    .take()
95                    .expect(
96                        "we should have a specific \
97                     child that was merged if this \
98                     link appears here",
99                    )
100                    .get();
101                self.data.parent_merge_confirm(merged_child);
102            }
103            ChildMergeCap => {
104                self.merging = true;
105            }
106        }
107    }
108
109    pub(crate) fn set_leaf(&mut self, key: IVec, val: IVec) {
110        if !self.hi.is_empty() {
111            assert!(*key < self.hi[self.prefix_len as usize..]);
112        }
113        if let Data::Leaf(ref mut leaf) = self.data {
114            let search = leaf.keys.binary_search_by(|k| fastcmp(k, &key));
115            match search {
116                Ok(idx) => leaf.values[idx] = val,
117                Err(idx) => {
118                    leaf.keys.insert(idx, key);
119                    leaf.values.insert(idx, val);
120                }
121            }
122            testing_assert!(is_sorted(&leaf.keys));
123        } else {
124            panic!("tried to Set a value to an index");
125        }
126    }
127
128    pub(crate) fn del_leaf(&mut self, key: &IVec) {
129        if let Data::Leaf(ref mut leaf) = self.data {
130            let search = leaf.keys.binary_search_by(|k| fastcmp(k, key));
131            if let Ok(idx) = search {
132                leaf.keys.remove(idx);
133                leaf.values.remove(idx);
134            }
135            testing_assert!(is_sorted(&leaf.keys));
136        } else {
137            panic!("tried to attach a Del to an Index chain");
138        }
139    }
140
141    pub(crate) fn parent_split(&mut self, at: &[u8], to: PageId) -> bool {
142        if let Data::Index(ref mut index) = self.data {
143            let encoded_sep = &at[self.prefix_len as usize..];
144            match index.keys.binary_search_by(|k| fastcmp(k, encoded_sep)) {
145                Ok(_) => {
146                    debug!(
147                        "parent_split skipped because \
148                         parent already contains child \
149                         at split point due to deep race"
150                    );
151                    return false;
152                }
153                Err(idx) => {
154                    index.keys.insert(idx, IVec::from(encoded_sep));
155                    index.pointers.insert(idx, to)
156                }
157            }
158            testing_assert!(is_sorted(&index.keys));
159        } else {
160            panic!("tried to attach a ParentSplit to a Leaf chain");
161        }
162
163        true
164    }
165
166    pub(crate) fn split(mut self) -> (Node, Node) {
167        fn split_inner<T>(
168            keys: &mut Vec<IVec>,
169            values: &mut Vec<T>,
170            old_prefix: &[u8],
171            old_hi: &[u8],
172            suffix_truncation: bool,
173        ) -> (IVec, u8, Vec<IVec>, Vec<T>)
174        where
175            T: Clone + Ord,
176        {
177            let split_point = keys.len() / 2 + 1;
178            let right_keys = keys.split_off(split_point);
179            let right_values = values.split_off(split_point);
180            let right_min = &right_keys[0];
181            let left_max = &keys.last().unwrap();
182
183            let splitpoint_length = if suffix_truncation {
184                // we can only perform suffix truncation when
185                // choosing the split points for leaf nodes.
186                // split points bubble up into indexes, but
187                // an important invariant is that for indexes
188                // the first item always matches the lo key,
189                // otherwise ranges would be permanently
190                // inaccessible by falling into the gap
191                // during a split.
192                right_min
193                    .iter()
194                    .zip(left_max.iter())
195                    .take_while(|(a, b)| a == b)
196                    .count()
197                    + 1
198            } else {
199                right_min.len()
200            };
201
202            let split_point: IVec =
203                prefix::decode(old_prefix, &right_min[..splitpoint_length]);
204
205            assert!(!split_point.is_empty());
206
207            let new_prefix_len = old_hi
208                .iter()
209                .zip(split_point.iter())
210                .take_while(|(a, b)| a == b)
211                .take(u8::max_value() as usize)
212                .count();
213
214            assert!(
215                new_prefix_len >= old_prefix.len(),
216                "new prefix length {} should be greater than \
217                 or equal to the old prefix length {}",
218                new_prefix_len,
219                old_prefix.len()
220            );
221
222            let mut right_keys_data = Vec::with_capacity(right_keys.len());
223
224            for k in right_keys {
225                let k: IVec = if new_prefix_len == old_prefix.len() {
226                    k.clone()
227                } else {
228                    // shave off additional prefixed bytes
229                    prefix::reencode(old_prefix, &k, new_prefix_len)
230                };
231                right_keys_data.push(k);
232            }
233
234            testing_assert!(is_sorted(&right_keys_data));
235
236            (
237                split_point,
238                u8::try_from(new_prefix_len).unwrap(),
239                right_keys_data,
240                right_values,
241            )
242        }
243
244        let prefixed_lo = &self.lo[..self.prefix_len as usize];
245        let prefixed_hi = &self.hi;
246        let (split, right_prefix_len, right_data) = match self.data {
247            Data::Index(ref mut index) => {
248                let (split, right_prefix_len, right_keys, right_values) =
249                    split_inner(
250                        &mut index.keys,
251                        &mut index.pointers,
252                        prefixed_lo,
253                        prefixed_hi,
254                        false,
255                    );
256
257                (
258                    split,
259                    right_prefix_len,
260                    Data::Index(Index {
261                        keys: right_keys,
262                        pointers: right_values,
263                    }),
264                )
265            }
266            Data::Leaf(ref mut leaf) => {
267                let (split, right_prefix_len, right_keys, right_values) =
268                    split_inner(
269                        &mut leaf.keys,
270                        &mut leaf.values,
271                        prefixed_lo,
272                        prefixed_hi,
273                        true,
274                    );
275
276                (
277                    split,
278                    right_prefix_len,
279                    Data::Leaf(Leaf { keys: right_keys, values: right_values }),
280                )
281            }
282        };
283
284        let right = Node {
285            data: right_data,
286            next: self.next,
287            lo: split.clone(),
288            hi: self.hi.clone(),
289            merging_child: None,
290            merging: false,
291            prefix_len: right_prefix_len,
292        };
293
294        self.hi = split;
295
296        let new_prefix_len = self
297            .hi
298            .iter()
299            .zip(self.lo.iter())
300            .take_while(|(a, b)| a == b)
301            .take(u8::max_value() as usize)
302            .count();
303
304        if new_prefix_len != self.prefix_len as usize {
305            match self.data {
306                Data::Index(ref mut index) => {
307                    for k in &mut index.keys {
308                        *k = prefix::reencode(prefixed_lo, k, new_prefix_len);
309                    }
310                }
311                Data::Leaf(ref mut leaf) => {
312                    for k in &mut leaf.keys {
313                        *k = prefix::reencode(prefixed_lo, k, new_prefix_len);
314                    }
315                }
316            }
317        }
318
319        self.prefix_len = u8::try_from(new_prefix_len).unwrap();
320
321        // intentionally make this the end to make
322        // any issues pop out with setting it
323        // correctly after the split.
324        self.next = None;
325
326        if self.hi.is_empty() {
327            assert_eq!(self.prefix_len, 0);
328        }
329
330        assert!(!(self.lo.is_empty() && self.hi.is_empty()));
331        assert!(!(self.lo.is_empty() && (self.prefix_len > 0)));
332        assert!(self.lo.len() >= self.prefix_len as usize);
333        assert!(self.hi.len() >= self.prefix_len as usize);
334        assert!(!(right.lo.is_empty() && right.hi.is_empty()));
335        assert!(!(right.lo.is_empty() && (right.prefix_len > 0)));
336        assert!(right.lo.len() >= right.prefix_len as usize);
337        assert!(right.hi.len() >= right.prefix_len as usize);
338
339        if !self.lo.is_empty() && !self.hi.is_empty() {
340            assert!(self.lo < self.hi, "{:?}", self);
341        }
342
343        if !right.lo.is_empty() && !right.hi.is_empty() {
344            assert!(right.lo < right.hi, "{:?}", right);
345        }
346
347        (self, right)
348    }
349
350    pub(crate) fn receive_merge(&self, right: &Node) -> Node {
351        fn receive_merge_inner<T>(
352            old_prefix: &[u8],
353            new_prefix_len: usize,
354            left_keys: &mut Vec<IVec>,
355            left_values: &mut Vec<T>,
356            right_keys: &[IVec],
357            right_values: &[T],
358        ) where
359            T: Debug + Clone + PartialOrd,
360        {
361            // When merging, the prefix should only shrink or
362            // stay the same length. Here we figure out if
363            // we need to add previous prefixed bytes.
364
365            for (k, v) in right_keys.iter().zip(right_values.iter()) {
366                let k = if new_prefix_len == old_prefix.len() {
367                    k.clone()
368                } else {
369                    prefix::reencode(old_prefix, k, new_prefix_len)
370                };
371                left_keys.push(k);
372                left_values.push(v.clone());
373            }
374            testing_assert!(
375                is_sorted(left_keys),
376                "should have been sorted: {:?}",
377                left_keys
378            );
379        }
380
381        let mut merged = self.clone();
382
383        let new_prefix_len = right
384            .hi
385            .iter()
386            .zip(self.lo.iter())
387            .take_while(|(a, b)| a == b)
388            .take(u8::max_value() as usize)
389            .count();
390
391        if new_prefix_len != merged.prefix_len as usize {
392            match merged.data {
393                Data::Index(ref mut index) => {
394                    for k in &mut index.keys {
395                        *k = prefix::reencode(self.prefix(), k, new_prefix_len);
396                    }
397                }
398                Data::Leaf(ref mut leaf) => {
399                    for k in &mut leaf.keys {
400                        *k = prefix::reencode(self.prefix(), k, new_prefix_len);
401                    }
402                }
403            }
404        }
405
406        merged.prefix_len = u8::try_from(new_prefix_len).unwrap();
407
408        match (&mut merged.data, &right.data) {
409            (Data::Index(ref mut left_index), Data::Index(ref right_index)) => {
410                receive_merge_inner(
411                    right.prefix(),
412                    new_prefix_len,
413                    &mut left_index.keys,
414                    &mut left_index.pointers,
415                    right_index.keys.as_ref(),
416                    right_index.pointers.as_ref(),
417                );
418            }
419            (Data::Leaf(ref mut left_leaf), Data::Leaf(ref right_leaf)) => {
420                receive_merge_inner(
421                    right.prefix(),
422                    new_prefix_len,
423                    &mut left_leaf.keys,
424                    &mut left_leaf.values,
425                    right_leaf.keys.as_ref(),
426                    right_leaf.values.as_ref(),
427                );
428            }
429            _ => panic!("Can't merge incompatible Data!"),
430        }
431
432        merged.hi = right.hi.clone();
433        merged.next = right.next;
434        merged
435    }
436
437    pub(crate) fn contains_upper_bound(&self, bound: &Bound<IVec>) -> bool {
438        match bound {
439            Bound::Excluded(bound) if self.hi >= *bound => true,
440            Bound::Included(bound) if self.hi > *bound => true,
441            _ => self.hi.is_empty(),
442        }
443    }
444
445    pub(crate) fn contains_lower_bound(
446        &self,
447        bound: &Bound<IVec>,
448        is_forward: bool,
449    ) -> bool {
450        match bound {
451            Bound::Excluded(bound)
452                if self.lo < *bound || (is_forward && *bound == self.lo) =>
453            {
454                true
455            }
456            Bound::Included(bound) if self.lo <= *bound => true,
457            Bound::Unbounded if !is_forward => self.hi.is_empty(),
458            _ => self.lo.is_empty(),
459        }
460    }
461
462    pub(crate) fn successor(
463        &self,
464        bound: &Bound<IVec>,
465    ) -> Option<(IVec, IVec)> {
466        assert!(!self.data.is_index());
467
468        // This encoding happens this way because
469        // keys cannot be lower than the node's lo key.
470        let predecessor_key = match bound {
471            Bound::Unbounded => self.prefix_encode(&self.lo),
472            Bound::Included(b) | Bound::Excluded(b) => {
473                let max = std::cmp::max(b, &self.lo);
474                self.prefix_encode(max)
475            }
476        };
477
478        let leaf = self.data.leaf_ref().unwrap();
479        let search =
480            leaf.keys.binary_search_by(|k| fastcmp(k, predecessor_key));
481
482        let start = match search {
483            Ok(start) => start,
484            Err(start) if start < leaf.keys.len() => start,
485            _ => return None,
486        };
487
488        for (idx, k) in leaf.keys[start..].iter().enumerate() {
489            match bound {
490                Bound::Excluded(b) if b[self.prefix_len as usize..] == **k => {
491                    // keep going because we wanted to exclude
492                    // this key.
493                    continue;
494                }
495                _ => {}
496            }
497            let decoded_key = self.prefix_decode(k);
498            return Some((decoded_key, leaf.values[start + idx].clone()));
499        }
500
501        None
502    }
503
504    pub(crate) fn predecessor(
505        &self,
506        bound: &Bound<IVec>,
507    ) -> Option<(IVec, IVec)> {
508        assert!(!self.data.is_index());
509
510        // This encoding happens this way because
511        // the rightmost (unbounded) node has
512        // a hi key represented by the empty slice
513        let successor_key = match bound {
514            Bound::Unbounded => {
515                if self.hi.is_empty() {
516                    None
517                } else {
518                    Some(IVec::from(self.prefix_encode(&self.hi)))
519                }
520            }
521            Bound::Included(b) => Some(IVec::from(self.prefix_encode(b))),
522            Bound::Excluded(b) => {
523                // we use manual prefix encoding here because
524                // there is an assertion in `prefix_encode`
525                // that asserts the key is within the node,
526                // and maybe `b` is above the node.
527                let encoded = &b[self.prefix_len as usize..];
528                Some(IVec::from(encoded))
529            }
530        };
531
532        let leaf = self.data.leaf_ref().unwrap();
533        let search = if let Some(successor_key) = successor_key {
534            leaf.keys.binary_search_by(|k| fastcmp(k, &successor_key))
535        } else if leaf.keys.is_empty() {
536            Err(0)
537        } else {
538            Ok(leaf.keys.len() - 1)
539        };
540
541        let end = match search {
542            Ok(end) => end,
543            Err(end) if end > 0 => end - 1,
544            _ => return None,
545        };
546
547        for (idx, k) in leaf.keys[0..=end].iter().enumerate().rev() {
548            match bound {
549                Bound::Excluded(b)
550                    if b.len() >= self.prefix_len as usize
551                        && b[self.prefix_len as usize..] == **k =>
552                {
553                    // keep going because we wanted to exclude
554                    // this key.
555                    continue;
556                }
557                _ => {}
558            }
559            let decoded_key = self.prefix_decode(k);
560
561            return Some((decoded_key, leaf.values[idx].clone()));
562        }
563        None
564    }
565
566    /// `leaf_pair_for_key` finds an existing value pair for a given key.
567    pub(crate) fn leaf_pair_for_key(
568        &self,
569        key: &[u8],
570    ) -> Option<(&IVec, &IVec)> {
571        let leaf = self
572            .data
573            .leaf_ref()
574            .expect("leaf_pair_for_key called on index node");
575
576        let suffix = &key[self.prefix_len as usize..];
577
578        let search = leaf.keys.binary_search_by(|k| fastcmp(k, suffix)).ok();
579
580        search.map(|idx| (&leaf.keys[idx], &leaf.values[idx]))
581    }
582
583    /// `node_kv_pair` returns either existing (node/key, value) pair or
584    /// (node/key, none) where a node/key is node level encoded key.
585    pub fn node_kv_pair(&self, key: &[u8]) -> (IVec, Option<IVec>) {
586        assert!(key >= self.lo.as_ref());
587        if !self.hi.is_empty() {
588            assert!(key < self.hi.as_ref());
589        }
590        if let Some((k, v)) = self.leaf_pair_for_key(key.as_ref()) {
591            (k.clone(), Some(v.clone()))
592        } else {
593            let encoded_key = IVec::from(&key[self.prefix_len as usize..]);
594            let encoded_val = None;
595            (encoded_key, encoded_val)
596        }
597    }
598
599    pub(crate) fn should_split(&self) -> bool {
600        let threshold = if cfg!(any(test, feature = "lock_free_delays")) {
601            2
602        } else if self.data.is_index() {
603            256
604        } else {
605            16
606        };
607
608        let size_checks = self.data.len() > threshold;
609        let safety_checks = self.merging_child.is_none() && !self.merging;
610
611        size_checks && safety_checks
612    }
613
614    pub(crate) fn should_merge(&self) -> bool {
615        let threshold = if cfg!(any(test, feature = "lock_free_delays")) {
616            1
617        } else if self.data.is_index() {
618            64
619        } else {
620            4
621        };
622
623        let size_checks = self.data.len() < threshold;
624        let safety_checks = self.merging_child.is_none() && !self.merging;
625
626        size_checks && safety_checks
627    }
628
629    pub(crate) fn can_merge_child(&self) -> bool {
630        self.merging_child.is_none() && !self.merging
631    }
632
633    pub(crate) fn index_next_node(&self, key: &[u8]) -> (usize, PageId) {
634        let index =
635            self.data.index_ref().expect("index_next_node called on leaf");
636
637        let suffix = &key[self.prefix_len as usize..];
638
639        let search = binary_search_lub(suffix, &index.keys);
640
641        let pos = search.expect("failed to traverse index");
642
643        (pos, index.pointers[pos])
644    }
645}
646
/// The contents of a `Node`: either index entries or leaf entries.
#[derive(Clone, Debug, PartialEq)]
pub(crate) enum Data {
    /// Keys paired with child page ids.
    Index(Index),
    /// Keys paired with values.
    Leaf(Leaf),
}
652
653impl Default for Data {
654    fn default() -> Data {
655        Data::Leaf(Leaf::default())
656    }
657}
658
659impl Data {
660    pub(crate) fn rss(&self) -> u64 {
661        match self {
662            Data::Index(ref index) => {
663                index.keys.iter().map(|k| k.len() + 8).sum::<usize>() as u64
664            }
665            Data::Leaf(ref leaf) => leaf
666                .keys
667                .iter()
668                .zip(leaf.values.iter())
669                .map(|(k, v)| k.len() + v.len())
670                .sum::<usize>() as u64,
671        }
672    }
673
674    pub(crate) fn len(&self) -> usize {
675        match *self {
676            Data::Index(ref index) => index.keys.len(),
677            Data::Leaf(ref leaf) => leaf.keys.len(),
678        }
679    }
680
681    pub(crate) fn parent_merge_confirm(&mut self, merged_child_pid: PageId) {
682        match self {
683            Data::Index(ref mut index) => {
684                let idx = index
685                    .pointers
686                    .iter()
687                    .position(|c| *c == merged_child_pid)
688                    .unwrap();
689                index.keys.remove(idx);
690                index.pointers.remove(idx);
691            }
692            _ => panic!("parent_merge_confirm called on leaf data"),
693        }
694    }
695
696    pub(crate) fn leaf_ref(&self) -> Option<&Leaf> {
697        match *self {
698            Data::Index(_) => None,
699            Data::Leaf(ref leaf) => Some(leaf),
700        }
701    }
702
703    pub(crate) fn index_ref(&self) -> Option<&Index> {
704        match *self {
705            Data::Index(ref index) => Some(index),
706            Data::Leaf(_) => None,
707        }
708    }
709
710    pub(crate) fn is_index(&self) -> bool {
711        if let Data::Index(..) = self { true } else { false }
712    }
713}
714
/// Leaf contents stored as two parallel vectors: `values[i]` belongs to
/// `keys[i]`.
#[derive(Clone, Debug, PartialEq, Default)]
pub(crate) struct Leaf {
    /// Keys, kept in sorted order so they can be binary searched.
    pub(crate) keys: Vec<IVec>,
    /// Values, in the same order as `keys`.
    pub(crate) values: Vec<IVec>,
}
720
/// Index contents stored as two parallel vectors: `pointers[i]` is the
/// child page id filed under `keys[i]`.
#[derive(Clone, Debug, PartialEq, Default)]
pub(crate) struct Index {
    /// Separator keys, kept in sorted order so they can be binary
    /// searched.
    pub(crate) keys: Vec<IVec>,
    /// Child page ids, in the same order as `keys`.
    pub(crate) pointers: Vec<PageId>,
}
726
727#[cfg(feature = "lock_free_delays")]
/// Returns true when every element of `xs` is less than or equal to the
/// one that follows it. Empty and single-element slices count as sorted.
fn is_sorted<T: PartialOrd>(xs: &[T]) -> bool {
    xs.iter().zip(xs.iter().skip(1)).all(|(prev, next)| prev <= next)
}
731
/// Merges a right node with `prefix_len` 1 into a left node with
/// `prefix_len` 0; the call only has to complete without panicking.
#[test]
fn merge_uneven_nodes() {
    let left_leaf = Leaf {
        keys: vec![vec![230, 126, 1, 0].into()],
        values: vec![vec![].into()],
    };
    let left = Node {
        prefix_len: 0,
        next: NonZeroU64::new(1),
        lo: vec![230, 125, 1, 0].into(),
        hi: vec![230, 134, 0, 0].into(),
        data: Data::Leaf(left_leaf),
        ..Node::default()
    };

    let right_leaf = Leaf {
        keys: vec![vec![134, 0, 0].into()],
        values: vec![vec![].into()],
    };
    let right = Node {
        prefix_len: 1,
        next: None,
        lo: vec![230, 134, 0, 0].into(),
        hi: vec![230, 147, 0, 0].into(),
        data: Data::Leaf(right_leaf),
        ..Node::default()
    };

    let _merged = left.receive_merge(&right);
}