1use std::{num::NonZeroU64, ops::Bound};
2
3use super::*;
4
5#[derive(Default, Debug, Clone, PartialEq)]
6pub struct Node {
7 pub(crate) prefix_len: u8,
8 pub(crate) next: Option<NonZeroU64>,
9 pub(crate) merging_child: Option<NonZeroU64>,
10 pub(crate) merging: bool,
11 pub(crate) lo: IVec,
12 pub(crate) hi: IVec,
13 pub(crate) data: Data,
14}
15
16impl Node {
17 pub(crate) fn new_hoisted_root(
18 left: PageId,
19 at: IVec,
20 right: PageId,
21 ) -> Node {
22 Node {
23 data: Data::Index(Index {
24 keys: vec![prefix::empty().into(), at],
25 pointers: vec![left, right],
26 }),
27 ..Node::default()
28 }
29 }
30
31 pub(crate) fn new_root(child_pid: PageId) -> Node {
32 Node {
33 data: Data::Index(Index {
34 keys: vec![prefix::empty().into()],
35 pointers: vec![child_pid],
36 }),
37 ..Node::default()
38 }
39 }
40
41 pub(crate) fn rss(&self) -> u64 {
42 std::mem::size_of::<Node>() as u64
43 + self.lo.len() as u64
44 + self.hi.len() as u64
45 + self.data.rss()
46 }
47
48 fn prefix_decode(&self, key: &[u8]) -> IVec {
49 prefix::decode(self.prefix(), key)
50 }
51
52 fn prefix_encode<'a>(&self, key: &'a [u8]) -> &'a [u8] {
53 assert!(*self.lo <= *key);
54 if !self.hi.is_empty() {
55 assert!(*self.hi > *key);
56 }
57
58 &key[self.prefix_len as usize..]
59 }
60
61 fn prefix(&self) -> &[u8] {
62 &self.lo[..self.prefix_len as usize]
63 }
64
65 pub(crate) fn apply(&mut self, link: &Link) {
66 use self::Link::*;
67
68 assert!(
69 !self.merging,
70 "somehow a link was applied to a node after it was merged"
71 );
72
73 match *link {
74 Set(ref k, ref v) => {
75 self.set_leaf(k.clone(), v.clone());
76 }
77 Del(ref k) => {
78 self.del_leaf(k);
79 }
80 ParentMergeIntention(pid) => {
81 assert!(
82 self.merging_child.is_none(),
83 "trying to merge {:?} into node {:?} which \
84 is already merging another child",
85 link,
86 self
87 );
88 self.merging_child = Some(NonZeroU64::new(pid).unwrap());
89 }
90 ParentMergeConfirm => {
91 assert!(self.merging_child.is_some());
92 let merged_child = self
93 .merging_child
94 .take()
95 .expect(
96 "we should have a specific \
97 child that was merged if this \
98 link appears here",
99 )
100 .get();
101 self.data.parent_merge_confirm(merged_child);
102 }
103 ChildMergeCap => {
104 self.merging = true;
105 }
106 }
107 }
108
109 pub(crate) fn set_leaf(&mut self, key: IVec, val: IVec) {
110 if !self.hi.is_empty() {
111 assert!(*key < self.hi[self.prefix_len as usize..]);
112 }
113 if let Data::Leaf(ref mut leaf) = self.data {
114 let search = leaf.keys.binary_search_by(|k| fastcmp(k, &key));
115 match search {
116 Ok(idx) => leaf.values[idx] = val,
117 Err(idx) => {
118 leaf.keys.insert(idx, key);
119 leaf.values.insert(idx, val);
120 }
121 }
122 testing_assert!(is_sorted(&leaf.keys));
123 } else {
124 panic!("tried to Set a value to an index");
125 }
126 }
127
128 pub(crate) fn del_leaf(&mut self, key: &IVec) {
129 if let Data::Leaf(ref mut leaf) = self.data {
130 let search = leaf.keys.binary_search_by(|k| fastcmp(k, key));
131 if let Ok(idx) = search {
132 leaf.keys.remove(idx);
133 leaf.values.remove(idx);
134 }
135 testing_assert!(is_sorted(&leaf.keys));
136 } else {
137 panic!("tried to attach a Del to an Index chain");
138 }
139 }
140
141 pub(crate) fn parent_split(&mut self, at: &[u8], to: PageId) -> bool {
142 if let Data::Index(ref mut index) = self.data {
143 let encoded_sep = &at[self.prefix_len as usize..];
144 match index.keys.binary_search_by(|k| fastcmp(k, encoded_sep)) {
145 Ok(_) => {
146 debug!(
147 "parent_split skipped because \
148 parent already contains child \
149 at split point due to deep race"
150 );
151 return false;
152 }
153 Err(idx) => {
154 index.keys.insert(idx, IVec::from(encoded_sep));
155 index.pointers.insert(idx, to)
156 }
157 }
158 testing_assert!(is_sorted(&index.keys));
159 } else {
160 panic!("tried to attach a ParentSplit to a Leaf chain");
161 }
162
163 true
164 }
165
166 pub(crate) fn split(mut self) -> (Node, Node) {
167 fn split_inner<T>(
168 keys: &mut Vec<IVec>,
169 values: &mut Vec<T>,
170 old_prefix: &[u8],
171 old_hi: &[u8],
172 suffix_truncation: bool,
173 ) -> (IVec, u8, Vec<IVec>, Vec<T>)
174 where
175 T: Clone + Ord,
176 {
177 let split_point = keys.len() / 2 + 1;
178 let right_keys = keys.split_off(split_point);
179 let right_values = values.split_off(split_point);
180 let right_min = &right_keys[0];
181 let left_max = &keys.last().unwrap();
182
183 let splitpoint_length = if suffix_truncation {
184 right_min
193 .iter()
194 .zip(left_max.iter())
195 .take_while(|(a, b)| a == b)
196 .count()
197 + 1
198 } else {
199 right_min.len()
200 };
201
202 let split_point: IVec =
203 prefix::decode(old_prefix, &right_min[..splitpoint_length]);
204
205 assert!(!split_point.is_empty());
206
207 let new_prefix_len = old_hi
208 .iter()
209 .zip(split_point.iter())
210 .take_while(|(a, b)| a == b)
211 .take(u8::max_value() as usize)
212 .count();
213
214 assert!(
215 new_prefix_len >= old_prefix.len(),
216 "new prefix length {} should be greater than \
217 or equal to the old prefix length {}",
218 new_prefix_len,
219 old_prefix.len()
220 );
221
222 let mut right_keys_data = Vec::with_capacity(right_keys.len());
223
224 for k in right_keys {
225 let k: IVec = if new_prefix_len == old_prefix.len() {
226 k.clone()
227 } else {
228 prefix::reencode(old_prefix, &k, new_prefix_len)
230 };
231 right_keys_data.push(k);
232 }
233
234 testing_assert!(is_sorted(&right_keys_data));
235
236 (
237 split_point,
238 u8::try_from(new_prefix_len).unwrap(),
239 right_keys_data,
240 right_values,
241 )
242 }
243
244 let prefixed_lo = &self.lo[..self.prefix_len as usize];
245 let prefixed_hi = &self.hi;
246 let (split, right_prefix_len, right_data) = match self.data {
247 Data::Index(ref mut index) => {
248 let (split, right_prefix_len, right_keys, right_values) =
249 split_inner(
250 &mut index.keys,
251 &mut index.pointers,
252 prefixed_lo,
253 prefixed_hi,
254 false,
255 );
256
257 (
258 split,
259 right_prefix_len,
260 Data::Index(Index {
261 keys: right_keys,
262 pointers: right_values,
263 }),
264 )
265 }
266 Data::Leaf(ref mut leaf) => {
267 let (split, right_prefix_len, right_keys, right_values) =
268 split_inner(
269 &mut leaf.keys,
270 &mut leaf.values,
271 prefixed_lo,
272 prefixed_hi,
273 true,
274 );
275
276 (
277 split,
278 right_prefix_len,
279 Data::Leaf(Leaf { keys: right_keys, values: right_values }),
280 )
281 }
282 };
283
284 let right = Node {
285 data: right_data,
286 next: self.next,
287 lo: split.clone(),
288 hi: self.hi.clone(),
289 merging_child: None,
290 merging: false,
291 prefix_len: right_prefix_len,
292 };
293
294 self.hi = split;
295
296 let new_prefix_len = self
297 .hi
298 .iter()
299 .zip(self.lo.iter())
300 .take_while(|(a, b)| a == b)
301 .take(u8::max_value() as usize)
302 .count();
303
304 if new_prefix_len != self.prefix_len as usize {
305 match self.data {
306 Data::Index(ref mut index) => {
307 for k in &mut index.keys {
308 *k = prefix::reencode(prefixed_lo, k, new_prefix_len);
309 }
310 }
311 Data::Leaf(ref mut leaf) => {
312 for k in &mut leaf.keys {
313 *k = prefix::reencode(prefixed_lo, k, new_prefix_len);
314 }
315 }
316 }
317 }
318
319 self.prefix_len = u8::try_from(new_prefix_len).unwrap();
320
321 self.next = None;
325
326 if self.hi.is_empty() {
327 assert_eq!(self.prefix_len, 0);
328 }
329
330 assert!(!(self.lo.is_empty() && self.hi.is_empty()));
331 assert!(!(self.lo.is_empty() && (self.prefix_len > 0)));
332 assert!(self.lo.len() >= self.prefix_len as usize);
333 assert!(self.hi.len() >= self.prefix_len as usize);
334 assert!(!(right.lo.is_empty() && right.hi.is_empty()));
335 assert!(!(right.lo.is_empty() && (right.prefix_len > 0)));
336 assert!(right.lo.len() >= right.prefix_len as usize);
337 assert!(right.hi.len() >= right.prefix_len as usize);
338
339 if !self.lo.is_empty() && !self.hi.is_empty() {
340 assert!(self.lo < self.hi, "{:?}", self);
341 }
342
343 if !right.lo.is_empty() && !right.hi.is_empty() {
344 assert!(right.lo < right.hi, "{:?}", right);
345 }
346
347 (self, right)
348 }
349
350 pub(crate) fn receive_merge(&self, right: &Node) -> Node {
351 fn receive_merge_inner<T>(
352 old_prefix: &[u8],
353 new_prefix_len: usize,
354 left_keys: &mut Vec<IVec>,
355 left_values: &mut Vec<T>,
356 right_keys: &[IVec],
357 right_values: &[T],
358 ) where
359 T: Debug + Clone + PartialOrd,
360 {
361 for (k, v) in right_keys.iter().zip(right_values.iter()) {
366 let k = if new_prefix_len == old_prefix.len() {
367 k.clone()
368 } else {
369 prefix::reencode(old_prefix, k, new_prefix_len)
370 };
371 left_keys.push(k);
372 left_values.push(v.clone());
373 }
374 testing_assert!(
375 is_sorted(left_keys),
376 "should have been sorted: {:?}",
377 left_keys
378 );
379 }
380
381 let mut merged = self.clone();
382
383 let new_prefix_len = right
384 .hi
385 .iter()
386 .zip(self.lo.iter())
387 .take_while(|(a, b)| a == b)
388 .take(u8::max_value() as usize)
389 .count();
390
391 if new_prefix_len != merged.prefix_len as usize {
392 match merged.data {
393 Data::Index(ref mut index) => {
394 for k in &mut index.keys {
395 *k = prefix::reencode(self.prefix(), k, new_prefix_len);
396 }
397 }
398 Data::Leaf(ref mut leaf) => {
399 for k in &mut leaf.keys {
400 *k = prefix::reencode(self.prefix(), k, new_prefix_len);
401 }
402 }
403 }
404 }
405
406 merged.prefix_len = u8::try_from(new_prefix_len).unwrap();
407
408 match (&mut merged.data, &right.data) {
409 (Data::Index(ref mut left_index), Data::Index(ref right_index)) => {
410 receive_merge_inner(
411 right.prefix(),
412 new_prefix_len,
413 &mut left_index.keys,
414 &mut left_index.pointers,
415 right_index.keys.as_ref(),
416 right_index.pointers.as_ref(),
417 );
418 }
419 (Data::Leaf(ref mut left_leaf), Data::Leaf(ref right_leaf)) => {
420 receive_merge_inner(
421 right.prefix(),
422 new_prefix_len,
423 &mut left_leaf.keys,
424 &mut left_leaf.values,
425 right_leaf.keys.as_ref(),
426 right_leaf.values.as_ref(),
427 );
428 }
429 _ => panic!("Can't merge incompatible Data!"),
430 }
431
432 merged.hi = right.hi.clone();
433 merged.next = right.next;
434 merged
435 }
436
437 pub(crate) fn contains_upper_bound(&self, bound: &Bound<IVec>) -> bool {
438 match bound {
439 Bound::Excluded(bound) if self.hi >= *bound => true,
440 Bound::Included(bound) if self.hi > *bound => true,
441 _ => self.hi.is_empty(),
442 }
443 }
444
445 pub(crate) fn contains_lower_bound(
446 &self,
447 bound: &Bound<IVec>,
448 is_forward: bool,
449 ) -> bool {
450 match bound {
451 Bound::Excluded(bound)
452 if self.lo < *bound || (is_forward && *bound == self.lo) =>
453 {
454 true
455 }
456 Bound::Included(bound) if self.lo <= *bound => true,
457 Bound::Unbounded if !is_forward => self.hi.is_empty(),
458 _ => self.lo.is_empty(),
459 }
460 }
461
462 pub(crate) fn successor(
463 &self,
464 bound: &Bound<IVec>,
465 ) -> Option<(IVec, IVec)> {
466 assert!(!self.data.is_index());
467
468 let predecessor_key = match bound {
471 Bound::Unbounded => self.prefix_encode(&self.lo),
472 Bound::Included(b) | Bound::Excluded(b) => {
473 let max = std::cmp::max(b, &self.lo);
474 self.prefix_encode(max)
475 }
476 };
477
478 let leaf = self.data.leaf_ref().unwrap();
479 let search =
480 leaf.keys.binary_search_by(|k| fastcmp(k, predecessor_key));
481
482 let start = match search {
483 Ok(start) => start,
484 Err(start) if start < leaf.keys.len() => start,
485 _ => return None,
486 };
487
488 for (idx, k) in leaf.keys[start..].iter().enumerate() {
489 match bound {
490 Bound::Excluded(b) if b[self.prefix_len as usize..] == **k => {
491 continue;
494 }
495 _ => {}
496 }
497 let decoded_key = self.prefix_decode(k);
498 return Some((decoded_key, leaf.values[start + idx].clone()));
499 }
500
501 None
502 }
503
504 pub(crate) fn predecessor(
505 &self,
506 bound: &Bound<IVec>,
507 ) -> Option<(IVec, IVec)> {
508 assert!(!self.data.is_index());
509
510 let successor_key = match bound {
514 Bound::Unbounded => {
515 if self.hi.is_empty() {
516 None
517 } else {
518 Some(IVec::from(self.prefix_encode(&self.hi)))
519 }
520 }
521 Bound::Included(b) => Some(IVec::from(self.prefix_encode(b))),
522 Bound::Excluded(b) => {
523 let encoded = &b[self.prefix_len as usize..];
528 Some(IVec::from(encoded))
529 }
530 };
531
532 let leaf = self.data.leaf_ref().unwrap();
533 let search = if let Some(successor_key) = successor_key {
534 leaf.keys.binary_search_by(|k| fastcmp(k, &successor_key))
535 } else if leaf.keys.is_empty() {
536 Err(0)
537 } else {
538 Ok(leaf.keys.len() - 1)
539 };
540
541 let end = match search {
542 Ok(end) => end,
543 Err(end) if end > 0 => end - 1,
544 _ => return None,
545 };
546
547 for (idx, k) in leaf.keys[0..=end].iter().enumerate().rev() {
548 match bound {
549 Bound::Excluded(b)
550 if b.len() >= self.prefix_len as usize
551 && b[self.prefix_len as usize..] == **k =>
552 {
553 continue;
556 }
557 _ => {}
558 }
559 let decoded_key = self.prefix_decode(k);
560
561 return Some((decoded_key, leaf.values[idx].clone()));
562 }
563 None
564 }
565
566 pub(crate) fn leaf_pair_for_key(
568 &self,
569 key: &[u8],
570 ) -> Option<(&IVec, &IVec)> {
571 let leaf = self
572 .data
573 .leaf_ref()
574 .expect("leaf_pair_for_key called on index node");
575
576 let suffix = &key[self.prefix_len as usize..];
577
578 let search = leaf.keys.binary_search_by(|k| fastcmp(k, suffix)).ok();
579
580 search.map(|idx| (&leaf.keys[idx], &leaf.values[idx]))
581 }
582
583 pub fn node_kv_pair(&self, key: &[u8]) -> (IVec, Option<IVec>) {
586 assert!(key >= self.lo.as_ref());
587 if !self.hi.is_empty() {
588 assert!(key < self.hi.as_ref());
589 }
590 if let Some((k, v)) = self.leaf_pair_for_key(key.as_ref()) {
591 (k.clone(), Some(v.clone()))
592 } else {
593 let encoded_key = IVec::from(&key[self.prefix_len as usize..]);
594 let encoded_val = None;
595 (encoded_key, encoded_val)
596 }
597 }
598
599 pub(crate) fn should_split(&self) -> bool {
600 let threshold = if cfg!(any(test, feature = "lock_free_delays")) {
601 2
602 } else if self.data.is_index() {
603 256
604 } else {
605 16
606 };
607
608 let size_checks = self.data.len() > threshold;
609 let safety_checks = self.merging_child.is_none() && !self.merging;
610
611 size_checks && safety_checks
612 }
613
614 pub(crate) fn should_merge(&self) -> bool {
615 let threshold = if cfg!(any(test, feature = "lock_free_delays")) {
616 1
617 } else if self.data.is_index() {
618 64
619 } else {
620 4
621 };
622
623 let size_checks = self.data.len() < threshold;
624 let safety_checks = self.merging_child.is_none() && !self.merging;
625
626 size_checks && safety_checks
627 }
628
629 pub(crate) fn can_merge_child(&self) -> bool {
630 self.merging_child.is_none() && !self.merging
631 }
632
633 pub(crate) fn index_next_node(&self, key: &[u8]) -> (usize, PageId) {
634 let index =
635 self.data.index_ref().expect("index_next_node called on leaf");
636
637 let suffix = &key[self.prefix_len as usize..];
638
639 let search = binary_search_lub(suffix, &index.keys);
640
641 let pos = search.expect("failed to traverse index");
642
643 (pos, index.pointers[pos])
644 }
645}
646
647#[derive(Clone, Debug, PartialEq)]
648pub(crate) enum Data {
649 Index(Index),
650 Leaf(Leaf),
651}
652
653impl Default for Data {
654 fn default() -> Data {
655 Data::Leaf(Leaf::default())
656 }
657}
658
659impl Data {
660 pub(crate) fn rss(&self) -> u64 {
661 match self {
662 Data::Index(ref index) => {
663 index.keys.iter().map(|k| k.len() + 8).sum::<usize>() as u64
664 }
665 Data::Leaf(ref leaf) => leaf
666 .keys
667 .iter()
668 .zip(leaf.values.iter())
669 .map(|(k, v)| k.len() + v.len())
670 .sum::<usize>() as u64,
671 }
672 }
673
674 pub(crate) fn len(&self) -> usize {
675 match *self {
676 Data::Index(ref index) => index.keys.len(),
677 Data::Leaf(ref leaf) => leaf.keys.len(),
678 }
679 }
680
681 pub(crate) fn parent_merge_confirm(&mut self, merged_child_pid: PageId) {
682 match self {
683 Data::Index(ref mut index) => {
684 let idx = index
685 .pointers
686 .iter()
687 .position(|c| *c == merged_child_pid)
688 .unwrap();
689 index.keys.remove(idx);
690 index.pointers.remove(idx);
691 }
692 _ => panic!("parent_merge_confirm called on leaf data"),
693 }
694 }
695
696 pub(crate) fn leaf_ref(&self) -> Option<&Leaf> {
697 match *self {
698 Data::Index(_) => None,
699 Data::Leaf(ref leaf) => Some(leaf),
700 }
701 }
702
703 pub(crate) fn index_ref(&self) -> Option<&Index> {
704 match *self {
705 Data::Index(ref index) => Some(index),
706 Data::Leaf(_) => None,
707 }
708 }
709
710 pub(crate) fn is_index(&self) -> bool {
711 if let Data::Index(..) = self { true } else { false }
712 }
713}
714
715#[derive(Clone, Debug, PartialEq, Default)]
716pub(crate) struct Leaf {
717 pub(crate) keys: Vec<IVec>,
718 pub(crate) values: Vec<IVec>,
719}
720
721#[derive(Clone, Debug, PartialEq, Default)]
722pub(crate) struct Index {
723 pub(crate) keys: Vec<IVec>,
724 pub(crate) pointers: Vec<PageId>,
725}
726
727#[cfg(feature = "lock_free_delays")]
728fn is_sorted<T: PartialOrd>(xs: &[T]) -> bool {
729 xs.windows(2).all(|pair| pair[0] <= pair[1])
730}
731
732#[test]
733fn merge_uneven_nodes() {
734 let left = Node {
735 data: Data::Leaf(Leaf {
736 keys: vec![vec![230, 126, 1, 0].into()],
737 values: vec![vec![].into()],
738 }),
739 next: NonZeroU64::new(1),
740 lo: vec![230, 125, 1, 0].into(),
741 hi: vec![230, 134, 0, 0].into(),
742 merging_child: None,
743 merging: false,
744 prefix_len: 0,
745 };
746
747 let right = Node {
748 data: Data::Leaf(Leaf {
749 keys: vec![vec![134, 0, 0].into()],
750 values: vec![vec![].into()],
751 }),
752 next: None,
753 lo: vec![230, 134, 0, 0].into(),
754 hi: vec![230, 147, 0, 0].into(),
755 merging_child: None,
756 merging: false,
757 prefix_len: 1,
758 };
759
760 left.receive_merge(&right);
761}