bplustree/
lib.rs

1//! Implementation of a fast in-memory concurrent B+ Tree featuring optimistic lock coupling. The implementation
2//! is based on [LeanStore](https://dbis1.github.io/leanstore.html) with some adaptations from
3//! [Umbra](https://umbra-db.com/#publications).
4//!
5//! The current API is very basic and more features are supposed to be added in the following
6//! versions, it tries to loosely follow the [`std::collections::BTreeMap`] API.
7//!
8//! Currently it is not heavily optimized but is already faster than some concurrent lock-free
9//! implementations. Single threaded performance is generally slower (~ 1.4x) but still comparable to [`std::collections::BTreeMap`]
10//! with sligthly faster scans due to the B+ Tree topology.
11//! ```
12//! use bplustree::BPlusTree;
13//!
14//! let tree = BPlusTree::new();
15//!
16//! tree.insert("some", "data");
17//! ```
18use crossbeam_epoch::{self as epoch, Atomic, Owned};
19use smallvec::{smallvec, SmallVec};
20
21use std::borrow::Borrow;
22use std::sync::atomic::{AtomicUsize, Ordering};
23use std::fmt;
24
25// use parking_lot_core::SpinWait;
26
27pub mod error;
28pub mod latch;
29pub mod iter;
30#[cfg(test)]
31pub mod util;
32#[cfg(test)]
33pub mod bench;
34
35use latch::{HybridLatch, OptimisticGuard, SharedGuard, ExclusiveGuard, HybridGuard};
36
37/// Type alias for the `GenericBPlusTree` with preset node sizes
38pub type BPlusTree<K, V> = GenericBPlusTree<K, V, 128, 256>;
39
40/// Concurrent, optimistically locked B+ Tree
41///
42/// `InnerNode` and `LeafNode` capacities can be configured through the const generic parameters `IC`
43/// and `LC` respectively.
44pub struct GenericBPlusTree<K, V, const IC: usize, const LC: usize> {
45    root: HybridLatch<Atomic<HybridLatch<Node<K, V, IC, LC>>>>,
46    height: AtomicUsize
47}
48
49pub(crate) enum ParentHandler<'r, 'p, K, V, const IC: usize, const LC: usize> {
50    Root { tree_guard: OptimisticGuard<'r, Atomic<HybridLatch<Node<K, V, IC, LC>>>> },
51    Parent {
52        parent_guard: OptimisticGuard<'p, Node<K, V, IC, LC>>,
53        pos: u16
54    }
55}
56
57#[derive(Debug, PartialEq, Copy, Clone)]
58pub(crate) enum Direction {
59    Forward,
60    Reverse
61}
62
63macro_rules! tp {
64    ($x:expr) => {
65        println!("[{:?}] {}", std::thread::current().id(), format!($x));
66    };
67    ($x:expr, $($y:expr),+) => {
68        println!("[{:?}] {}", std::thread::current().id(), format!($x, $($y),+));
69    };
70}
71
72impl<K: Clone + Ord, V, const IC: usize, const LC: usize> GenericBPlusTree<K, V, IC, LC> {
73    /// Makes a new, empty `GenericBPlusTree`
74    ///
75    /// Allocates the root node on creation
76    ///
77    /// # Examples
78    ///
79    /// Basic usage:
80    ///
81    /// ```
82    /// use bplustree::BPlusTree;
83    ///
84    /// let tree = BPlusTree::new();
85    ///
86    /// // entries can now be inserted into the empty tree
87    /// tree.insert(1, "a");
88    /// ```
89    pub fn new() -> Self {
90        GenericBPlusTree {
91            root: HybridLatch::new(Atomic::new(HybridLatch::new(Node::Leaf(
92                LeafNode {
93                    len: 0,
94                    keys: smallvec![],
95                    values: smallvec![],
96                    lower_fence: None,
97                    upper_fence: None,
98                    sample_key: None
99                }
100            )))),
101            height: AtomicUsize::new(1)
102        }
103    }
104
105    /// Returns the height of the tree
106    pub fn height(&self) -> usize {
107        self.height.load(Ordering::Relaxed)
108    }
109
110    pub(crate) fn find_parent<'t, 'g, 'e>(&'t self, needle: &impl HybridGuard<Node<K, V, IC, LC>>, eg: &'e epoch::Guard) -> error::Result<ParentHandler<'t, 'e, K, V, IC, LC>>
111    where
112        K: Ord
113    {
114        let tree_guard = self.root.optimistic_or_spin();
115        let root_latch = unsafe { tree_guard.load(Ordering::Acquire, eg).deref() };
116        let root_latch_ptr = root_latch as *const _;
117        let root_guard = root_latch.optimistic_or_spin();
118
119        if needle.latch() as *const _ == root_latch_ptr {
120            tree_guard.recheck()?;
121            return Ok(ParentHandler::Root { tree_guard })
122        }
123
124
125
126        let search_key = match needle.inner().sample_key().cloned() {
127            Some(key) => key,
128            None => {
129                needle.recheck()?;
130                // This is not the root node, and does not have a sample key. Assume that it is being
131                // reclaimed
132                return Err(error::Error::Reclaimed);
133            }
134        };
135
136        let mut t_guard = Some(tree_guard);
137        let mut p_guard: Option<OptimisticGuard<'e, Node<K, V, IC, LC>>> = None;
138        let mut target_guard = root_guard;
139
140        let (c_swip, pos) = loop {
141            let (c_swip, pos) = match *target_guard {
142                Node::Internal(ref internal) => {
143                    let (pos, _) = internal.lower_bound(&search_key);
144                    let swip = internal.edge_at(pos)?;
145
146                    (swip, pos)
147                }
148                Node::Leaf(ref _leaf) => {
149                    if let Some(p) = p_guard.as_ref() {
150                        p.recheck()?;
151                    }
152
153                    target_guard.recheck()?;
154
155                    needle.recheck()?; // This is needed to ensure this node was not merged during the search
156
157                    if let Some(tree_guard) = t_guard {
158                        tree_guard.recheck()?;
159                    }
160
161                    panic!("reaching leaves, merges or splits are wrong");
162                    // return Err(error::Error::Unwind);
163                }
164            };
165
166            if c_swip.load(Ordering::Acquire, eg).as_raw() == needle.latch() as *const _ {
167                break (c_swip, pos);
168            }
169
170            let guard = GenericBPlusTree::lock_coupling(&target_guard, c_swip, eg)?;
171            p_guard = Some(target_guard);
172            target_guard = guard;
173
174            if let Some(tree_guard) = t_guard.take() {
175                tree_guard.recheck()?;
176            }
177        };
178
179
180        if c_swip.load(Ordering::Acquire, eg).as_raw() == needle.latch() as *const _ {
181            target_guard.recheck()?;
182            return Ok(ParentHandler::Parent { parent_guard: target_guard, pos });
183        } else {
184            return Err(error::Error::Unwind)
185        }
186    }
187
188    pub(crate) fn find_nearest_leaf<'t, 'g, 'e>(&'t self, needle: &OptimisticGuard<'g, Node<K, V, IC, LC>>, direction: Direction, eg: &'e epoch::Guard) -> error::Result<Option<(OptimisticGuard<'e, Node<K, V, IC, LC>>, (OptimisticGuard<'e, Node<K, V, IC, LC>>, u16))>>
189    where
190        K: Ord
191    {
192        let tree_guard = self.root.optimistic_or_spin();
193        let root_latch = unsafe { tree_guard.load(Ordering::Acquire, eg).deref() };
194        let root_latch_ptr = root_latch as *const _;
195        let root_guard = root_latch.optimistic_or_spin();
196
197        if needle.latch() as *const _ == root_latch_ptr {
198            root_guard.recheck()?;
199            tree_guard.recheck()?;
200            return error::Result::Ok(None);
201        }
202
203
204        let (parent_guard, pos) = match self.find_parent(needle, eg)? {
205            ParentHandler::Root { tree_guard: _ } => {
206                return error::Result::Ok(None);
207            }
208            ParentHandler::Parent { parent_guard, pos } => (parent_guard, pos)
209        };
210
211        let within_bounds = match direction {
212            Direction::Forward => {
213                pos + 1 <= parent_guard.as_internal().len
214            }
215            Direction::Reverse => {
216                pos > 0
217            }
218        };
219
220        if within_bounds {
221            let lookup_pos = match direction {
222                Direction::Forward => pos + 1,
223                Direction::Reverse => pos - 1
224            };
225            let swip = parent_guard.as_internal().edge_at(lookup_pos)?;
226
227            let guard = GenericBPlusTree::lock_coupling(&parent_guard, swip, eg)?;
228
229            if guard.is_leaf() {
230                guard.recheck()?;
231                return error::Result::Ok(Some((guard, (parent_guard, lookup_pos))));
232            } else {
233                let (leaf, parent_opt) = self.find_leaf_and_parent_from_node(guard, direction, eg)?;
234                return error::Result::Ok(Some((leaf, parent_opt.expect("must have parent here"))));
235            }
236        } else {
237            let mut target_guard = parent_guard;
238
239            loop {
240                let (parent_guard, pos) = match self.find_parent(&target_guard, eg)? {
241                    ParentHandler::Root { tree_guard: _ } => {
242                        return error::Result::Ok(None);
243                    }
244                    ParentHandler::Parent { parent_guard, pos } => (parent_guard, pos)
245                };
246
247                let within_bounds = match direction {
248                    Direction::Forward => {
249                        pos + 1 <= parent_guard.as_internal().len
250                    }
251                    Direction::Reverse => {
252                        pos > 0
253                    }
254                };
255
256                if within_bounds {
257                    let lookup_pos = match direction {
258                        Direction::Forward => pos + 1,
259                        Direction::Reverse => pos - 1
260                    };
261                    let swip = parent_guard.as_internal().edge_at(lookup_pos)?;
262
263                    let guard = GenericBPlusTree::lock_coupling(&parent_guard, swip, eg)?;
264
265                    if guard.is_leaf() {
266                        guard.recheck()?;
267                        return error::Result::Ok(Some((guard, (parent_guard, lookup_pos))));
268                    } else {
269                        let (leaf, parent_opt) = self.find_leaf_and_parent_from_node(guard, direction, eg)?;
270                        return error::Result::Ok(Some((leaf, parent_opt.expect("must have parent here"))));
271                    }
272                } else {
273                    target_guard = parent_guard;
274                    continue;
275                }
276            }
277        }
278    }
279
280    fn lock_coupling<'e>(p_guard: &OptimisticGuard<'e, Node<K, V, IC, LC>>, swip: &Atomic<HybridLatch<Node<K, V, IC, LC>>>, eg: &'e epoch::Guard) -> error::Result<OptimisticGuard<'e, Node<K, V, IC, LC>>> {
281        let latch = unsafe { swip.load(Ordering::Acquire, eg).deref() };
282        let guard = latch.optimistic_or_spin();
283
284        p_guard.recheck()?;
285
286        Ok(guard)
287    }
288
289    fn lock_coupling_shared<'e>(p_guard: &OptimisticGuard<'e, Node<K, V, IC, LC>>, swip: &Atomic<HybridLatch<Node<K, V, IC, LC>>>, eg: &'e epoch::Guard) -> error::Result<SharedGuard<'e, Node<K, V, IC, LC>>> {
290        let latch = unsafe { swip.load(Ordering::Acquire, eg).deref() };
291        let guard = latch.shared();
292
293        p_guard.recheck()?;
294
295        Ok(guard)
296    }
297
298    fn lock_coupling_exclusive<'e>(p_guard: &OptimisticGuard<'e, Node<K, V, IC, LC>>, swip: &Atomic<HybridLatch<Node<K, V, IC, LC>>>, eg: &'e epoch::Guard) -> error::Result<ExclusiveGuard<'e, Node<K, V, IC, LC>>> {
299        let latch = unsafe { swip.load(Ordering::Acquire, eg).deref() };
300        let guard = latch.exclusive();
301
302        p_guard.recheck()?;
303
304        Ok(guard)
305    }
306
307    fn find_leaf_and_parent_from_node<'t, 'g ,'e>(&'t self, needle: OptimisticGuard<'e, Node<K, V, IC, LC>>, direction: Direction, eg: &'e epoch::Guard) -> error::Result<(OptimisticGuard<'e, Node<K, V, IC, LC>>, Option<(OptimisticGuard<'e, Node<K, V, IC, LC>>, u16)>)> {
308        let mut p_guard = None;
309        let mut target_guard = needle;
310
311        let leaf_guard = loop {
312            let (c_swip, pos) = match *target_guard {
313                Node::Internal(ref internal) => {
314                    let pos = match direction {
315                        Direction::Forward => 0,
316                        Direction::Reverse => internal.len
317                    };
318                    let swip = internal.edge_at(pos)?;
319                    (swip, pos)
320                }
321                Node::Leaf(ref _leaf) => {
322                    break target_guard;
323                }
324            };
325
326            let guard = GenericBPlusTree::lock_coupling(&target_guard, c_swip, eg)?;
327            p_guard = Some((target_guard, pos));
328            target_guard = guard;
329        };
330
331        leaf_guard.recheck()?;
332
333        Ok((leaf_guard, p_guard))
334    }
335
336    fn find_first_leaf_and_parent<'t, 'e>(&'t self, eg: &'e epoch::Guard) -> error::Result<(OptimisticGuard<'e, Node<K, V, IC, LC>>, Option<(OptimisticGuard<'e, Node<K, V, IC, LC>>, u16)>)> {
337        let tree_guard = self.root.optimistic_or_spin();
338        let root_latch = unsafe { tree_guard.load(Ordering::Acquire, eg).deref() };
339        let root_guard = root_latch.optimistic_or_spin();
340        tree_guard.recheck()?;
341
342        self.find_leaf_and_parent_from_node(root_guard, Direction::Forward, eg)
343    }
344
345    fn find_last_leaf_and_parent<'t, 'e>(&'t self, eg: &'e epoch::Guard) -> error::Result<(OptimisticGuard<'e, Node<K, V, IC, LC>>, Option<(OptimisticGuard<'e, Node<K, V, IC, LC>>, u16)>)> {
346        let tree_guard = self.root.optimistic_or_spin();
347        let root_latch = unsafe { tree_guard.load(Ordering::Acquire, eg).deref() };
348        let root_guard = root_latch.optimistic_or_spin();
349        tree_guard.recheck()?;
350
351        self.find_leaf_and_parent_from_node(root_guard, Direction::Reverse, eg)
352    }
353
354    fn find_leaf_and_parent<'t, 'k ,'e, Q>(&'t self, key: &'k Q, eg: &'e epoch::Guard) -> error::Result<(OptimisticGuard<'e, Node<K, V, IC, LC>>, Option<(OptimisticGuard<'e, Node<K, V, IC, LC>>, u16)>)>
355    where
356        K: Borrow<Q> + Ord,
357        Q: ?Sized + Ord
358    {
359        let tree_guard = self.root.optimistic_or_spin();
360        let root_latch = unsafe { tree_guard.load(Ordering::Acquire, eg).deref() };
361        let root_guard = root_latch.optimistic_or_spin();
362        tree_guard.recheck()?;
363
364        let mut t_guard = Some(tree_guard);
365        let mut p_guard = None;
366        let mut target_guard = root_guard;
367
368        // let mut level = 0u16;
369
370        let leaf_guard = loop {
371            let (c_swip, pos) = match *target_guard {
372                Node::Internal(ref internal) => {
373                    let (pos, _) = internal.lower_bound(key);
374                    let swip = internal.edge_at(pos)?;
375                    (swip, pos)
376                }
377                Node::Leaf(ref _leaf) => {
378                    break target_guard;
379                }
380            };
381
382            // TODO if level == height - 1 use shared mode
383
384            let guard = GenericBPlusTree::lock_coupling(&target_guard, c_swip, eg)?;
385            p_guard = Some((target_guard, pos));
386            target_guard = guard;
387
388            if let Some(tree_guard) = t_guard.take() {
389                tree_guard.recheck()?;
390            }
391
392            // level += 1;
393        };
394
395        Ok((leaf_guard, p_guard))
396    }
397
398    fn find_leaf<'t, 'k ,'e, Q>(&'t self, key: &'k Q, eg: &'e epoch::Guard) -> error::Result<OptimisticGuard<'e, Node<K, V, IC, LC>>>
399    where
400        K: Borrow<Q> + Ord,
401        Q: ?Sized + Ord
402    {
403        let (leaf, _) = self.find_leaf_and_parent(key, eg)?;
404        Ok(leaf)
405    }
406
407    pub(crate) fn find_shared_leaf_and_optimistic_parent<'t, 'k ,'e, Q>(&'t self, key: &'k Q, eg: &'e epoch::Guard) -> (SharedGuard<'e, Node<K, V, IC, LC>>, Option<(OptimisticGuard<'e, Node<K, V, IC, LC>>, u16)>)
408    where
409        K: Borrow<Q> + Ord,
410        Q: ?Sized + Ord
411    {
412        loop {
413            let perform = || {
414                let tree_guard = self.root.optimistic_or_spin();
415                let root_latch = unsafe { tree_guard.load(Ordering::Acquire, eg).deref() };
416                let root_guard = root_latch.optimistic_or_spin();
417                tree_guard.recheck()?;
418
419                let mut t_guard = Some(tree_guard);
420                let mut p_guard = None;
421                let mut target_guard = root_guard;
422
423                let mut level = 1u16;
424
425                let leaf_guard = loop {
426                    let (c_swip, pos) = match *target_guard {
427                        Node::Internal(ref internal) => {
428                            let (pos, _) = internal.lower_bound(key);
429                            let swip = internal.edge_at(pos)?;
430                            (swip, pos)
431                        }
432                        Node::Leaf(ref _leaf) => {
433                            if let Some(tree_guard) = t_guard.take() {
434                                tree_guard.recheck()?;
435                            }
436
437                            if p_guard.is_none() {
438                                tp!("got root");
439                                break target_guard.to_shared()?;
440                            } else {
441                                panic!("got a leaf on the wrong level");
442                                // return Err(error::Error::Unwind);
443                            }
444                        }
445                    };
446
447                    // TODO if level == height - 1 use shared mode
448                    if (level + 1) as usize == self.height.load(Ordering::Acquire) {
449                        if let Some(tree_guard) = t_guard.take() {
450                            tree_guard.recheck()?;
451                        }
452
453                        let guard = GenericBPlusTree::lock_coupling_shared(&target_guard, c_swip, eg)?;
454                        p_guard = Some((target_guard, pos));
455
456                        break guard;
457                    } else {
458                        let guard = GenericBPlusTree::lock_coupling(&target_guard, c_swip, eg)?;
459                        p_guard = Some((target_guard, pos));
460                        target_guard = guard;
461
462                        if let Some(tree_guard) = t_guard.take() {
463                            tree_guard.recheck()?;
464                        }
465
466                        level += 1;
467                    }
468                };
469
470                error::Result::Ok((leaf_guard, p_guard))
471            };
472
473            match perform() {
474                Ok(tup) => {
475                    return tup;
476                }
477                Err(_) => {
478                    // TODO backoff
479                    continue;
480                }
481            }
482        }
483    }
484
485    pub(crate) fn find_first_shared_leaf_and_optimistic_parent<'t, 'e>(&'t self, eg: &'e epoch::Guard) -> (SharedGuard<'e, Node<K, V, IC, LC>>, Option<(OptimisticGuard<'e, Node<K, V, IC, LC>>, u16)>) {
486        loop {
487            let perform = || {
488                let (leaf, parent_opt) = self.find_first_leaf_and_parent(eg)?;
489                let shared_leaf = leaf.to_shared()?;
490                error::Result::Ok((shared_leaf, parent_opt))
491            };
492
493            match perform() {
494                Ok(tup) => {
495                    return tup;
496                }
497                Err(_) => {
498                    // TODO backoff
499                    continue;
500                }
501            }
502        }
503    }
504
505    pub(crate) fn find_last_shared_leaf_and_optimistic_parent<'t, 'e>(&'t self, eg: &'e epoch::Guard) -> (SharedGuard<'e, Node<K, V, IC, LC>>, Option<(OptimisticGuard<'e, Node<K, V, IC, LC>>, u16)>) {
506        loop {
507            let perform = || {
508                let (leaf, parent_opt) = self.find_last_leaf_and_parent(eg)?;
509                let shared_leaf = leaf.to_shared()?;
510                error::Result::Ok((shared_leaf, parent_opt))
511            };
512
513            match perform() {
514                Ok(tup) => {
515                    return tup;
516                }
517                Err(_) => {
518                    // TODO backoff
519                    continue;
520                }
521            }
522        }
523    }
524
525    pub(crate) fn find_exclusive_leaf_and_optimistic_parent<'t, 'k ,'e, Q>(&'t self, key: &'k Q, eg: &'e epoch::Guard) -> (ExclusiveGuard<'e, Node<K, V, IC, LC>>, Option<(OptimisticGuard<'e, Node<K, V, IC, LC>>, u16)>)
526    where
527        K: Borrow<Q> + Ord,
528        Q: ?Sized + Ord
529    {
530        loop {
531            let perform = || {
532                let tree_guard = self.root.optimistic_or_spin();
533                let root_latch = unsafe { tree_guard.load(Ordering::Acquire, eg).deref() };
534                let root_guard = root_latch.optimistic_or_spin();
535                tree_guard.recheck()?;
536
537                let mut t_guard = Some(tree_guard);
538                let mut p_guard = None;
539                let mut target_guard = root_guard;
540
541                let mut level = 1u16;
542
543                let leaf_guard = loop {
544                    let (c_swip, pos) = match *target_guard {
545                        Node::Internal(ref internal) => {
546                            let (pos, _) = internal.lower_bound(key);
547                            let swip = internal.edge_at(pos)?;
548                            (swip, pos)
549                        }
550                        Node::Leaf(ref _leaf) => {
551                            if let Some(tree_guard) = t_guard.take() {
552                                tree_guard.recheck()?;
553                            }
554
555                            if p_guard.is_none() {
556                                break target_guard.to_exclusive()?;
557                            } else {
558                                panic!("got a leaf on the wrong level");
559                                // return Err(error::Error::Unwind);
560                            }
561                        }
562                    };
563
564                    // TODO if level == height - 1 use shared mode
565                    if (level + 1) as usize == self.height.load(Ordering::Acquire) {
566                        if let Some(tree_guard) = t_guard.take() {
567                            tree_guard.recheck()?;
568                        }
569
570                        let guard = GenericBPlusTree::lock_coupling_exclusive(&target_guard, c_swip, eg)?;
571                        p_guard = Some((target_guard, pos));
572
573                        break guard;
574                    } else {
575                        let guard = GenericBPlusTree::lock_coupling(&target_guard, c_swip, eg)?;
576                        p_guard = Some((target_guard, pos));
577                        target_guard = guard;
578
579                        if let Some(tree_guard) = t_guard.take() {
580                            tree_guard.recheck()?;
581                        }
582
583                        level += 1;
584                    }
585                };
586
587                error::Result::Ok((leaf_guard, p_guard))
588            };
589
590            match perform() {
591                Ok(tup) => {
592                    return tup;
593                }
594                Err(_) => {
595                    // TODO backoff
596                    continue;
597                }
598            }
599        }
600    }
601
602    pub(crate) fn find_exact_exclusive_leaf_and_optimistic_parent<'t, 'k ,'e, Q>(&'t self, key: &'k Q, eg: &'e epoch::Guard) -> Option<((ExclusiveGuard<'e, Node<K, V, IC, LC>>, u16), Option<(OptimisticGuard<'e, Node<K, V, IC, LC>>, u16)>)>
603    where
604        K: Borrow<Q> + Ord,
605        Q: ?Sized + Ord
606    {
607        loop {
608            let perform = || {
609                let (leaf, parent_opt) = self.find_leaf_and_parent(key, eg)?;
610                let (pos, exact) = leaf.as_leaf().lower_bound(key);
611                if exact {
612                    let exclusive_leaf = leaf.to_exclusive()?;
613                    error::Result::Ok(Some(((exclusive_leaf, pos), parent_opt)))
614                } else {
615                    leaf.recheck()?;
616                    error::Result::Ok(None)
617                }
618            };
619
620            match perform() {
621                Ok(opt) => {
622                    return opt;
623                }
624                Err(_) => {
625                    // TODO backoff
626                    continue;
627                }
628            }
629        }
630    }
631
632    pub(crate) fn find_first_exclusive_leaf_and_optimistic_parent<'t, 'e>(&'t self, eg: &'e epoch::Guard) -> (ExclusiveGuard<'e, Node<K, V, IC, LC>>, Option<(OptimisticGuard<'e, Node<K, V, IC, LC>>, u16)>) {
633        loop {
634            let perform = || {
635                let (leaf, parent_opt) = self.find_first_leaf_and_parent(eg)?;
636                let exclusive_leaf = leaf.to_exclusive()?;
637                error::Result::Ok((exclusive_leaf, parent_opt))
638            };
639
640            match perform() {
641                Ok(tup) => {
642                    return tup;
643                }
644                Err(_) => {
645                    // TODO backoff
646                    continue;
647                }
648            }
649        }
650    }
651
652    pub(crate) fn find_last_exclusive_leaf_and_optimistic_parent<'t, 'e>(&'t self, eg: &'e epoch::Guard) -> (ExclusiveGuard<'e, Node<K, V, IC, LC>>, Option<(OptimisticGuard<'e, Node<K, V, IC, LC>>, u16)>) {
653        loop {
654            let perform = || {
655                let (leaf, parent_opt) = self.find_last_leaf_and_parent(eg)?;
656                let exclusive_leaf = leaf.to_exclusive()?;
657                error::Result::Ok((exclusive_leaf, parent_opt))
658            };
659
660            match perform() {
661                Ok(tup) => {
662                    return tup;
663                }
664                Err(_) => {
665                    // TODO backoff
666                    continue;
667                }
668            }
669        }
670    }
671
672    /// Zero copy optimistic getter for a value in the tree
673    ///
674    /// Accepts a function that may be executed multiple times until a valid access is performed,
675    /// calling the function should not have any side effects because it may be executed with
676    /// invalid data. When `lookup` returns it is guaranteed to have executed the function with
677    /// valid data at least once from which the result is returned.
678    ///
679    /// If no value exists for the specified key the received function may not be executed and `None`
680    /// is returned.
681    ///
682    /// # Examples
683    ///
684    /// Basic usage:
685    ///
686    /// ```
687    /// use bplustree::BPlusTree;
688    ///
689    /// let tree = BPlusTree::new();
690    ///
691    /// tree.insert("a", 1);
692    ///
693    /// assert_eq!(tree.lookup("a", |value| *value), Some(1));
694    /// ```
695    pub fn lookup<Q, R, F>(&self, key: &Q, f: F) -> Option<R>
696    where
697        K: Borrow<Q> + Ord,
698        Q: ?Sized + Ord,
699        F: Fn(&V) -> R
700    {
701        let eg = &epoch::pin();
702        loop {
703            let perform = || {
704                let guard = self.find_leaf(key, eg)?;
705                if let Node::Leaf(ref leaf) = *guard {
706                    let (pos, exact) = leaf.lower_bound(key);
707                    if exact {
708                        let result = f(leaf.value_at(pos)?);
709                        guard.recheck()?;
710                        error::Result::Ok(Some(result))
711                    } else {
712                        guard.recheck()?;
713                        error::Result::Ok(None)
714                    }
715                } else {
716                    unreachable!("must be a leaf node");
717                }
718            };
719
720            match perform() {
721                Ok(opt) => {
722                    return opt;
723                }
724                Err(_) => {
725                    // TODO backoff
726                    continue;
727                }
728            }
729        }
730    }
731
732    /// Removes a key from the tree, returning the value at the key if the key
733    /// was previously in the tree.
734    ///
735    /// The key may be any borrowed form of the tree's key type, but the ordering
736    /// on the borrowed form *must* match the ordering on the key type.
737    ///
738    /// # Examples
739    ///
740    /// Basic usage:
741    ///
742    /// ```
743    /// use bplustree::BPlusTree;
744    ///
745    /// let tree = BPlusTree::new();
746    /// tree.insert(1, "a");
747    /// assert_eq!(tree.remove(&1), Some("a"));
748    /// assert_eq!(tree.remove(&1), None);
749    /// ```
750    pub fn remove<Q>(&self, key: &Q) -> Option<V>
751    where
752        K: Borrow<Q> + Ord,
753        Q: ?Sized + Ord
754    {
755        self.remove_entry(key).map(|(_, v)| v)
756    }
757
758    /// Removes a key from the tree, returning the stored key and value if the key
759    /// was previously in the tree.
760    ///
761    /// The key may be any borrowed form of the tree's key type, but the ordering
762    /// on the borrowed form *must* match the ordering on the key type.
763    ///
764    /// # Examples
765    ///
766    /// Basic usage:
767    ///
768    /// ```
769    /// use bplustree::BPlusTree;
770    ///
771    /// let tree = BPlusTree::new();
772    /// tree.insert(1, "a");
773    /// assert_eq!(tree.remove_entry(&1), Some((1, "a")));
774    /// assert_eq!(tree.remove_entry(&1), None);
775    /// ```
776    pub fn remove_entry<Q>(&self, key: &Q) -> Option<(K, V)>
777    where
778        K: Borrow<Q> + Ord,
779        Q: ?Sized + Ord
780    {
781        let eg = &epoch::pin();
782
783        let opt = if let Some(((mut guard, pos), _parent_opt)) = self.find_exact_exclusive_leaf_and_optimistic_parent(key, eg) {
784            let kv = guard.as_leaf_mut().remove_at(pos);
785
786            if guard.is_underfull() {
787                let guard = guard.unlock();
788                loop {
789                    let perform_merge = || {
790                        let _ = self.try_merge(&guard, eg)?;
791                        error::Result::Ok(())
792                    };
793
794                    match perform_merge() {
795                        Ok(_) => {
796                            break;
797                        },
798                        Err(error::Error::Reclaimed) => {
799                            break;
800                        }
801                        Err(_) => {
802                            break; // TODO not ensuring merges happen timely
803                            // guard = guard.latch().optimistic_or_spin();
804                            // continue
805                        }
806                    }
807                }
808            }
809
810            Some(kv)
811        } else {
812            None
813        };
814
815        return opt;
816    }
817
818    /// Inserts a key-value pair into the tree.
819    ///
820    /// If the tree did not have this key present, `None` is returned.
821    ///
822    /// If the tree did have this key present, the value is updated, and the old
823    /// value is returned. The key is not updated, though; this matters for
824    /// types that can be `==` without being identical.
825    ///
826    /// # Examples
827    ///
828    /// Basic usage:
829    ///
830    /// ```
831    /// use bplustree::BPlusTree;
832    ///
833    /// let tree = BPlusTree::new();
834    /// assert_eq!(tree.insert(37, "a"), None);
835    ///
836    /// tree.insert(37, "b");
837    /// assert_eq!(tree.insert(37, "c"), Some("b"));
838    /// assert_eq!(tree.lookup(&37, |v| *v), Some("c"));
839    /// ```
840    pub fn insert(&self, key: K, value: V) -> Option<V>
841    where
842        K: Ord
843    {
844        let mut iter = self.raw_iter_mut();
845        iter.insert(key, value)
846    }
847
848    pub(crate) fn try_split<'t, 'g, 'e>(&'t self, needle: &OptimisticGuard<'g, Node<K, V, IC, LC>>, eg: &'e epoch::Guard) -> error::Result<()>
849    where
850        K: Ord
851    {
852        let parent_handler = self.find_parent(needle, eg)?;
853
854        match parent_handler {
855            ParentHandler::Root { tree_guard } => {
856                let mut tree_guard_x = tree_guard.to_exclusive()?; // TODO tree root should not need this lock (use atomic store)
857
858                let root_latch = unsafe { tree_guard_x.load(Ordering::Acquire, eg).deref() };
859                let mut root_guard_x = root_latch.exclusive();
860
861                // TODO assert!(height == 1 || !root_guard_x.is_leaf());
862
863                let mut new_root_owned: Owned<HybridLatch<Node<K, V, IC, LC>>> = Owned::new(
864                    HybridLatch::new(Node::Internal(InternalNode::new()))
865                );
866
867                match root_guard_x.as_mut() {
868                    Node::Internal(root_internal_node) => {
869                        if root_internal_node.len <= 2 {
870                            return Ok(())
871                        }
872
873                        let split_pos = root_internal_node.len / 2; // TODO choose a better split position if bulk loading
874                        let split_key = root_internal_node.key_at(split_pos).expect("should exist").clone();
875
876                        let mut new_right_node_owned = Owned::new(
877                            HybridLatch::new(Node::Internal(InternalNode::new()))
878                        );
879
880                        {
881                            let new_right_node = new_right_node_owned.as_mut().as_mut().as_internal_mut();
882                            root_internal_node.split(new_right_node, split_pos);
883                        }
884
885                        let old_root_edge = Atomic::from(tree_guard_x.load(Ordering::Acquire, eg));
886                        let new_right_node_edge = Atomic::<HybridLatch<Node<K, V, IC, LC>>>::from(new_right_node_owned);
887
888                        {
889                            let new_root = new_root_owned.as_mut().as_mut().as_internal_mut();
890                            new_root.insert(split_key, old_root_edge).expect("must have space");
891                            new_root.upper_edge = Some(new_right_node_edge);
892                        }
893                    }
894                    Node::Leaf(root_leaf_node) => {
895                        if root_leaf_node.len <= 2 {
896                            return Ok(())
897                        }
898
899                        let split_pos = root_leaf_node.len / 2; // TODO choose a better split position if bulk loading
900                        let split_key = root_leaf_node.key_at(split_pos).expect("should exist").clone();
901
902                        let mut new_right_node_owned = Owned::new(
903                            HybridLatch::new(Node::Leaf(LeafNode::new()))
904                        );
905
906                        {
907                            let new_right_node = new_right_node_owned.as_mut().as_mut().as_leaf_mut();
908                            root_leaf_node.split(new_right_node, split_pos);
909                        }
910
911                        let old_root_edge = Atomic::from(tree_guard_x.load(Ordering::Acquire, eg));
912                        let new_right_node_edge = Atomic::<HybridLatch<Node<K, V, IC, LC>>>::from(new_right_node_owned);
913
914                        {
915                            let new_root = new_root_owned.as_mut().as_mut().as_internal_mut();
916                            new_root.insert(split_key, old_root_edge).expect("must have space");
917                            new_root.upper_edge = Some(new_right_node_edge);
918                        }
919                    }
920                }
921
922                let new_root_node_edge = Atomic::<HybridLatch<Node<K, V, IC, LC>>>::from(new_root_owned);
923                *tree_guard_x = new_root_node_edge;
924                self.height.fetch_add(1, Ordering::Relaxed);
925            },
926            ParentHandler::Parent { parent_guard, pos } => {
927                if parent_guard.as_internal().has_space() {
928                    let swip = parent_guard.as_internal().edge_at(pos)?;
929                    let target_guard = GenericBPlusTree::lock_coupling(&parent_guard, swip, eg)?;
930                    let mut parent_guard_x = parent_guard.to_exclusive()?;
931                    let mut target_guard_x = target_guard.to_exclusive()?;
932
933                    match target_guard_x.as_mut() {
934                        Node::Internal(left_internal) => {
935                            if left_internal.len <= 2 {
936                                return Ok(())
937                            }
938
939                            let split_pos = left_internal.len / 2; // TODO choose a better split position if bulk loading
940                            let split_key = left_internal.key_at(split_pos).expect("should exist").clone();
941
942                            let mut new_right_node_owned = Owned::new(
943                                HybridLatch::new(Node::Internal(InternalNode::new()))
944                            );
945
946                            {
947                                let new_right_node = new_right_node_owned.as_mut().as_mut().as_internal_mut();
948                                left_internal.split(new_right_node, split_pos);
949                            }
950
951                            let new_right_node_edge = Atomic::<HybridLatch<Node<K, V, IC, LC>>>::from(new_right_node_owned);
952
953                            let parent_internal = parent_guard_x.as_internal_mut();
954
955                            if pos == parent_internal.len {
956                                let left_edge = parent_internal.upper_edge
957                                    .replace(new_right_node_edge)
958                                    .expect("upper_edge must be populated");
959                                parent_internal.insert(split_key, left_edge);
960                            } else {
961                                let left_edge = std::mem::replace(&mut parent_internal.edges[pos as usize], new_right_node_edge);
962                                parent_internal.insert(split_key, left_edge);
963                            }
964                        }
965                        Node::Leaf(left_leaf) => {
966                            if left_leaf.len <= 2 {
967                                return Ok(())
968                            }
969
970                            let split_pos = left_leaf.len / 2; // TODO choose a better split position if bulk loading
971                            let split_key = left_leaf.key_at(split_pos).expect("should exist").clone();
972
973                            let mut new_right_node_owned = Owned::new(
974                                HybridLatch::new(Node::Leaf(LeafNode::new()))
975                            );
976
977                            {
978                                let new_right_node = new_right_node_owned.as_mut().as_mut().as_leaf_mut();
979                                left_leaf.split(new_right_node, split_pos);
980                            }
981
982                            let new_right_node_edge = Atomic::<HybridLatch<Node<K, V, IC, LC>>>::from(new_right_node_owned);
983
984                            let parent_internal = parent_guard_x.as_internal_mut();
985
986                            if pos == parent_internal.len {
987                                let left_edge = parent_internal.upper_edge
988                                    .replace(new_right_node_edge)
989                                    .expect("upper_edge must be populated");
990                                parent_internal.insert(split_key, left_edge);
991                            } else {
992                                let left_edge = std::mem::replace(&mut parent_internal.edges[pos as usize], new_right_node_edge);
993                                parent_internal.insert(split_key, left_edge);
994                            }
995                        }
996                    }
997                } else {
998                    self.try_split(&parent_guard, eg)?;
999                }
1000            }
1001        }
1002
1003        Ok(())
1004    }
1005
1006    pub(crate) fn try_merge<'t, 'g, 'e>(&'t self, needle: &OptimisticGuard<'g, Node<K, V, IC, LC>>, eg: &'e epoch::Guard) -> error::Result<bool>
1007    where
1008        K: Ord
1009    {
1010        let parent_handler = self.find_parent(needle, eg)?;
1011
1012        match parent_handler {
1013            ParentHandler::Root { tree_guard: _ } => {
1014                return Ok(false);
1015            },
1016            ParentHandler::Parent { mut parent_guard, pos } => {
1017                let parent_len = parent_guard.as_internal().len;
1018
1019                let swip = parent_guard.as_internal().edge_at(pos)?;
1020                let mut target_guard = GenericBPlusTree::lock_coupling(&parent_guard, swip, eg)?;
1021
1022                if !target_guard.is_underfull() {
1023                    target_guard.recheck()?;
1024                    return Ok(false);
1025                }
1026
1027                let merge_succeded = if parent_len > 1 && pos > 0 {
1028                    // Try merge left
1029                    let l_swip = parent_guard.as_internal().edge_at(pos - 1)?;
1030                    let left_guard = GenericBPlusTree::lock_coupling(&parent_guard, l_swip, eg)?;
1031
1032                    if !(left_guard.can_merge_with(&target_guard)) {
1033                        left_guard.recheck()?;
1034                        target_guard.recheck()?;
1035                        false
1036                    } else {
1037                        let mut parent_guard_x = parent_guard.to_exclusive()?;
1038                        let mut target_guard_x = target_guard.to_exclusive()?;
1039                        let mut left_guard_x = left_guard.to_exclusive()?;
1040
1041                        match target_guard_x.as_mut() {
1042                            Node::Leaf(ref mut target_leaf) => {
1043                                assert!(left_guard_x.is_leaf());
1044
1045                                if !left_guard_x.as_leaf_mut().merge(target_leaf) {
1046                                    parent_guard = parent_guard_x.unlock();
1047                                    target_guard = target_guard_x.unlock();
1048                                    false
1049                                } else {
1050                                    let parent_internal = parent_guard_x.as_internal_mut();
1051                                    if pos == parent_len {
1052                                        let (_, left_edge) = parent_internal.remove_at(pos - 1);
1053                                        let dropped_edge = parent_internal
1054                                            .upper_edge
1055                                            .replace(left_edge).expect("must exist");
1056
1057                                        let shared = dropped_edge.load(Ordering::Relaxed, eg);
1058                                        if !shared.is_null() {
1059                                            unsafe { eg.defer_destroy(shared) };
1060                                        }
1061                                    } else {
1062                                        let (_, left_edge) = parent_internal
1063                                            .remove_at(pos - 1);
1064                                        let dropped_edge = std::mem::replace(&mut parent_internal.edges[(pos - 1) as usize], left_edge);
1065
1066                                        let shared = dropped_edge.load(Ordering::Relaxed, eg);
1067                                        if !shared.is_null() {
1068                                            unsafe { eg.defer_destroy(shared) };
1069                                        }
1070                                    }
1071
1072                                    parent_guard = parent_guard_x.unlock();
1073                                    target_guard = target_guard_x.unlock();
1074                                    true
1075                                }
1076                            }
1077                            Node::Internal(target_internal) => {
1078                                assert!(!left_guard_x.is_leaf());
1079
1080                                if !left_guard_x.as_internal_mut().merge(target_internal) {
1081                                    parent_guard = parent_guard_x.unlock();
1082                                    target_guard = target_guard_x.unlock();
1083                                    false
1084                                } else {
1085                                    let parent_internal = parent_guard_x.as_internal_mut();
1086                                    if pos == parent_len {
1087                                        let (_, left_edge) = parent_internal.remove_at(pos - 1);
1088                                        let dropped_edge = parent_internal
1089                                            .upper_edge
1090                                            .replace(left_edge).expect("must exist");
1091
1092                                        let shared = dropped_edge.load(Ordering::Relaxed, eg);
1093                                        if !shared.is_null() {
1094                                            unsafe { eg.defer_destroy(shared) };
1095                                        }
1096                                    } else {
1097                                        let (_, left_edge) = parent_internal
1098                                            .remove_at(pos - 1);
1099                                        let dropped_edge = std::mem::replace(&mut parent_internal.edges[(pos - 1) as usize], left_edge);
1100
1101                                        let shared = dropped_edge.load(Ordering::Relaxed, eg);
1102                                        if !shared.is_null() {
1103                                            unsafe { eg.defer_destroy(shared) };
1104                                        }
1105                                    }
1106
1107                                    parent_guard = parent_guard_x.unlock();
1108                                    target_guard = target_guard_x.unlock();
1109                                    true
1110                                }
1111                            }
1112                        }
1113                    }
1114                } else {
1115                    false
1116                };
1117
1118                let merge_succeded = if !merge_succeded && parent_len > 0 && (pos + 1) <= parent_len {
1119                    // Try merge right
1120                    let r_swip = parent_guard.as_internal().edge_at(pos + 1)?;
1121                    let right_guard = GenericBPlusTree::lock_coupling(&parent_guard, r_swip, eg)?;
1122
1123                    if !(right_guard.can_merge_with(&target_guard)) {
1124                        right_guard.recheck()?;
1125                        target_guard.recheck()?;
1126                        false
1127                    } else {
1128                        let mut parent_guard_x = parent_guard.to_exclusive()?;
1129                        let mut target_guard_x = target_guard.to_exclusive()?;
1130                        let mut right_guard_x = right_guard.to_exclusive()?;
1131
1132                        match target_guard_x.as_mut() {
1133                            Node::Leaf(ref mut target_leaf) => {
1134                                assert!(right_guard_x.is_leaf());
1135
1136                                if !target_leaf.merge(right_guard_x.as_leaf_mut()) {
1137                                    parent_guard = parent_guard_x.unlock();
1138                                    target_guard_x.unlock();
1139                                    false
1140                                } else {
1141                                    let parent_internal = parent_guard_x.as_internal_mut();
1142                                    if pos + 1 == parent_len {
1143                                        let (_, left_edge) = parent_internal.remove_at(pos);
1144                                        let dropped_edge = parent_internal
1145                                            .upper_edge
1146                                            .replace(left_edge).expect("must exist");
1147
1148                                        let shared = dropped_edge.load(Ordering::Relaxed, eg);
1149                                        if !shared.is_null() {
1150                                            unsafe { eg.defer_destroy(shared) };
1151                                        }
1152                                    } else {
1153                                        let (_, left_edge) = parent_internal
1154                                            .remove_at(pos);
1155                                        let dropped_edge = std::mem::replace(&mut parent_internal.edges[pos as usize], left_edge);
1156
1157                                        let shared = dropped_edge.load(Ordering::Relaxed, eg);
1158                                        if !shared.is_null() {
1159                                            unsafe { eg.defer_destroy(shared) };
1160                                        }
1161                                    }
1162
1163                                    parent_guard = parent_guard_x.unlock();
1164                                    target_guard_x.unlock();
1165                                    true
1166                                }
1167                            }
1168                            Node::Internal(target_internal) => {
1169                                assert!(!right_guard_x.is_leaf());
1170
1171                                if !target_internal.merge(right_guard_x.as_internal_mut()) {
1172                                    parent_guard = parent_guard_x.unlock();
1173                                    target_guard_x.unlock();
1174                                    false
1175                                } else {
1176                                    let parent_internal = parent_guard_x.as_internal_mut();
1177                                    if pos + 1 == parent_len {
1178                                        let (_, left_edge) = parent_internal.remove_at(pos);
1179                                        let dropped_edge = parent_internal
1180                                            .upper_edge
1181                                            .replace(left_edge).expect("must exist");
1182
1183                                        let shared = dropped_edge.load(Ordering::Relaxed, eg);
1184                                        if !shared.is_null() {
1185                                            unsafe { eg.defer_destroy(shared) };
1186                                        }
1187                                    } else {
1188                                        let (_, left_edge) = parent_internal
1189                                            .remove_at(pos);
1190                                        let dropped_edge = std::mem::replace(&mut parent_internal.edges[pos as usize], left_edge);
1191
1192                                        let shared = dropped_edge.load(Ordering::Relaxed, eg);
1193                                        if !shared.is_null() {
1194                                            unsafe { eg.defer_destroy(shared) };
1195                                        }
1196                                    }
1197
1198                                    parent_guard = parent_guard_x.unlock();
1199                                    target_guard_x.unlock();
1200                                    true
1201                                }
1202                            }
1203                        }
1204                    }
1205                } else {
1206                    merge_succeded
1207                };
1208
1209                let parent_merge = || { // TODO test if we should ensure parent merges always happen
1210                    if parent_guard.is_underfull() {
1211                        parent_guard.recheck()?;
1212                        let _ = self.try_merge(&parent_guard, eg)?;
1213                    }
1214                    error::Result::Ok(())
1215                };
1216
1217                let _ = parent_merge();
1218
1219                return Ok(merge_succeded)
1220            }
1221        }
1222    }
1223
1224    /// Returns a raw iterator over the entries of the tree.
1225    ///
1226    /// Raw iterators do not implement the [`std::iter::Iterator`] trait,
1227    /// instead they provide an API similar to RocksDB iterators
1228    ///
1229    /// Uses of this iterator should be short lived as it maintains the current
1230    /// leaf locked in shared mode to provide fast scans. It is cheap to create a new iterator.
1231    ///
1232    /// # Examples
1233    ///
1234    /// Basic usage:
1235    ///
1236    /// ```
1237    /// use bplustree::BPlusTree;
1238    ///
1239    /// let tree = BPlusTree::new();
1240    ///
1241    /// tree.insert(1, "a");
1242    /// tree.insert(2, "b");
1243    /// tree.insert(3, "c");
1244    ///
1245    /// let mut iter = tree.raw_iter();
1246    ///
1247    /// iter.seek_to_first();
1248    /// assert_eq!(iter.next(), Some((&1, &"a")));
1249    /// assert_eq!(iter.next(), Some((&2, &"b")));
1250    /// assert_eq!(iter.next(), Some((&3, &"c")));
1251    /// assert_eq!(iter.next(), None);
1252    ///
1253    /// iter.seek_to_last();
1254    /// assert_eq!(iter.prev(), Some((&3, &"c")));
1255    /// assert_eq!(iter.prev(), Some((&2, &"b")));
1256    /// assert_eq!(iter.prev(), Some((&1, &"a")));
1257    ///
1258    /// iter.seek(&2);
1259    /// assert_eq!(iter.next(), Some((&2, &"b")));
1260    ///
1261    /// iter.seek_for_prev(&2);
1262    /// assert_eq!(iter.prev(), Some((&2, &"b")));
1263    ///
1264    /// // Drop this iterator as soon as possible to prevent blocking other threads
1265    /// drop(iter);
1266    /// ```
1267    pub fn raw_iter<'t>(&'t self) -> iter::RawSharedIter<'t, K, V, IC, LC>
1268    where
1269        K: Ord
1270    {
1271        iter::RawSharedIter::new(self)
1272    }
1273
1274    /// Returns a raw mutable iterator over the entries of the tree.
1275    ///
1276    /// Raw iterators do not implement the [`std::iter::Iterator`] trait,
1277    /// instead they provide an API similar to RocksDB iterators
1278    ///
1279    /// This iterator is capable of perfoming mutations to the data structure, and it is
1280    /// recommended for bulk insertion or removal of sorted data.
1281    ///
1282    /// Uses of this iterator should be short lived as it maintains the current
1283    /// leaf locked in exclusive mode to provide fast scans and changes.
1284    /// It is cheap to create a new iterator.
1285    ///
1286    /// # Examples
1287    ///
1288    /// Basic usage:
1289    ///
1290    /// ```
1291    /// use bplustree::BPlusTree;
1292    ///
1293    /// let tree = BPlusTree::new();
1294    ///
1295    /// let mut iter = tree.raw_iter_mut();
1296    ///
1297    /// iter.insert(1, "a");
1298    /// iter.insert(2, "b");
1299    /// iter.insert(3, "c");
1300    ///
1301    /// iter.seek_to_first();
1302    /// assert_eq!(iter.next(), Some((&1, &mut "a")));
1303    /// assert_eq!(iter.next(), Some((&2, &mut "b")));
1304    /// assert_eq!(iter.next(), Some((&3, &mut "c")));
1305    /// assert_eq!(iter.next(), None);
1306    ///
1307    /// iter.seek_to_first();
1308    /// let (_k, v) = iter.next().unwrap();
1309    /// *v = "A";
1310    ///
1311    /// iter.seek_to_last();
1312    /// assert_eq!(iter.prev(), Some((&3, &mut "c")));
1313    /// assert_eq!(iter.prev(), Some((&2, &mut "b")));
1314    /// assert_eq!(iter.prev(), Some((&1, &mut "A")));
1315    ///
1316    /// iter.seek(&2);
1317    /// assert_eq!(iter.next(), Some((&2, &mut "b")));
1318    ///
1319    /// iter.remove(&1);
1320    /// iter.remove(&2);
1321    /// iter.remove(&3);
1322    ///
1323    /// iter.seek_to_first();
1324    /// assert_eq!(iter.next(), None);
1325    ///
1326    /// // Drop this iterator as soon as possible to prevent blocking other threads
1327    /// drop(iter);
1328    /// ```
1329    pub fn raw_iter_mut<'t>(&'t self) -> iter::RawExclusiveIter<'t, K, V, IC, LC>
1330    where
1331        K: Ord
1332    {
1333        iter::RawExclusiveIter::new(self)
1334    }
1335
1336    /// Returns the number of elements in the tree.
1337    ///
1338    /// This is not executed on a snapshot of the data,
1339    /// so it may be inconsistent in the presence of writers
1340    ///
1341    /// # Examples
1342    ///
1343    /// Basic usage:
1344    ///
1345    /// ```
1346    /// use bplustree::BPlusTree;
1347    ///
1348    /// let tree = BPlusTree::new();
1349    /// assert_eq!(tree.len(), 0);
1350    /// tree.insert(1, "a");
1351    /// assert_eq!(tree.len(), 1);
1352    /// ```
1353    pub fn len(&self) -> usize {
1354        let mut count = 0usize;
1355        let mut iter = self.raw_iter();
1356        iter.seek_to_first();
1357        while let Some(_) = iter.next() {
1358            count += 1;
1359        }
1360        count
1361    }
1362}
1363
1364pub(crate) enum Node<K, V, const IC: usize, const LC: usize> {
1365    Internal(InternalNode<K, V, IC, LC>),
1366    Leaf(LeafNode<K, V, LC>)
1367}
1368
1369impl<K: fmt::Debug, V: fmt::Debug, const IC: usize, const LC: usize> fmt::Debug for Node<K, V, IC, LC> {
1370    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1371        match self {
1372            &Node::Internal(ref internal) => {
1373                f.debug_tuple("Internal")
1374                    .field(internal)
1375                    .finish()
1376            }
1377            &Node::Leaf(ref leaf) => {
1378                f.debug_tuple("Leaf")
1379                    .field(leaf)
1380                    .finish()
1381            }
1382        }
1383    }
1384}
1385
1386impl<K, V, const IC: usize, const LC: usize> Node<K, V, IC, LC> {
1387    #[inline]
1388    pub(crate) fn is_leaf(&self) -> bool {
1389        match self {
1390            Node::Leaf(_) => true,
1391            Node::Internal(_) => false
1392        }
1393    }
1394
1395    #[inline]
1396    pub(crate) fn as_leaf(&self) -> &LeafNode<K, V, LC> {
1397        match self {
1398            Node::Leaf(ref leaf) => leaf,
1399            Node::Internal(_) => {
1400                panic!("expected leaf node");
1401            }
1402        }
1403    }
1404
1405    #[inline]
1406    pub(crate) fn as_leaf_mut(&mut self) -> &mut LeafNode<K, V, LC> {
1407        match self {
1408            Node::Leaf(ref mut leaf) => leaf,
1409            Node::Internal(_) => {
1410                panic!("expected leaf node");
1411            }
1412        }
1413    }
1414
1415    #[inline]
1416    pub(crate) fn as_internal(&self) -> &InternalNode<K, V, IC, LC> {
1417        match self {
1418            Node::Internal(ref internal) => internal,
1419            Node::Leaf(_) => {
1420                panic!("expected internal node");
1421            }
1422        }
1423    }
1424
1425    #[inline]
1426    pub(crate) fn as_internal_mut(&mut self) -> &mut InternalNode<K, V, IC, LC> {
1427        match self {
1428            Node::Internal(ref mut internal) => internal,
1429            Node::Leaf(_) => {
1430                panic!("expected internal node");
1431            }
1432        }
1433    }
1434
1435    #[cfg(test)]
1436    #[inline]
1437    pub(crate) fn keys(&self) -> &[K] {
1438        match self {
1439            Node::Internal(ref internal) => &internal.keys,
1440            Node::Leaf(ref leaf) => &leaf.keys
1441        }
1442    }
1443
1444    #[inline]
1445    pub(crate) fn sample_key(&self) -> Option<&K> {
1446        match self {
1447            Node::Internal(ref internal) => internal.sample_key.as_ref(),
1448            Node::Leaf(ref leaf) => leaf.sample_key.as_ref()
1449        }
1450    }
1451
1452    #[inline]
1453    pub(crate) fn is_underfull(&self) -> bool {
1454        match self {
1455            Node::Internal(ref internal) => internal.is_underfull(),
1456            Node::Leaf(ref leaf) => leaf.is_underfull()
1457        }
1458    }
1459
1460    #[inline]
1461    pub(crate) fn can_merge_with(&self, other: &Self) -> bool {
1462        match self {
1463            Node::Internal(ref internal) => {
1464                match other {
1465                    Node::Internal(ref other) => {
1466                        ((internal.len + 1 + other.len) as usize) < IC /* INNER_CAPACITY */
1467                    }
1468                    _ => false
1469                }
1470            },
1471            Node::Leaf(ref leaf) => {
1472                match other {
1473                    Node::Leaf(ref other) => {
1474                        ((leaf.len + other.len) as usize) < LC /* LEAF_CAPACITY */
1475                    }
1476                    _ => false
1477                }
1478            }
1479        }
1480    }
1481}
1482
1483pub(crate) struct LeafNode<K, V, const LEAF_CAPACITY: usize> {
1484    len: u16,
1485    keys: SmallVec<[K; LEAF_CAPACITY]>,
1486    values: SmallVec<[V; LEAF_CAPACITY]>,
1487    lower_fence: Option<K>,
1488    upper_fence: Option<K>,
1489    sample_key: Option<K> // All but the root must keep a sample key
1490}
1491
1492impl<K: fmt::Debug, V: fmt::Debug, const LEAF_CAPACITY: usize> fmt::Debug for LeafNode<K, V, LEAF_CAPACITY> {
1493    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1494        f.debug_struct("LeafNode")
1495         .field("len", &self.len)
1496         .field("keys", &self.keys)
1497         .field("values", &self.values)
1498         .field("lower_fence", &self.lower_fence)
1499         .field("upper_fence", &self.upper_fence)
1500         .field("sample_key", &self.sample_key)
1501         .finish()
1502    }
1503}
1504
1505impl<K, V, const LEAF_CAPACITY: usize> LeafNode<K, V, LEAF_CAPACITY> {
1506    pub fn new() -> LeafNode<K, V, LEAF_CAPACITY> {
1507        LeafNode {
1508            len: 0,
1509            keys: smallvec![],
1510            values: smallvec![],
1511            lower_fence: None,
1512            upper_fence: None,
1513            sample_key: None
1514        }
1515    }
1516
1517    #[inline]
1518    pub(crate) fn lower_bound<Q>(&self, key: &Q) -> (u16, bool)
1519    where
1520        K: Borrow<Q> + Ord,
1521        Q: ?Sized + Ord
1522    {
1523        if self.lower_fence().map(|fk| key < fk.borrow()).unwrap_or(false) {
1524            return (0, false);
1525        }
1526
1527        match self.upper_fence() {
1528            Some(fk) => {
1529                if key > fk.borrow() {
1530                    return (self.len, false);
1531                }
1532            }
1533            None => {}
1534        }
1535
1536        let mut lower = 0;
1537        let mut upper = self.len;
1538
1539        while lower < upper {
1540            let mid = ((upper - lower) / 2) + lower;
1541
1542            if key < unsafe { self.keys.get_unchecked(mid as usize) }.borrow() {
1543                upper = mid;
1544            } else if key > unsafe { self.keys.get_unchecked(mid as usize) }.borrow() {
1545                lower = mid + 1;
1546            } else {
1547                return (mid, true);
1548            }
1549        }
1550
1551        (lower, false)
1552    }
1553
1554    #[inline]
1555    pub(crate) fn lower_fence(&self) -> Option<&K> {
1556        self.lower_fence.as_ref()
1557    }
1558
1559    #[inline]
1560    pub(crate) fn upper_fence(&self) -> Option<&K> {
1561        self.upper_fence.as_ref()
1562    }
1563
1564    #[inline]
1565    pub(crate) fn value_at(&self, pos: u16) -> error::Result<&V> {
1566        Ok(unsafe { self.values.get_unchecked(pos as usize) })
1567    }
1568
1569    #[inline]
1570    pub(crate) fn key_at(&self, pos: u16) -> error::Result<&K> {
1571        Ok(unsafe { self.keys.get_unchecked(pos as usize) })
1572    }
1573
1574    #[inline]
1575    pub(crate) fn kv_at(&self, pos: u16) -> error::Result<(&K, &V)> {
1576        Ok((self.key_at(pos)?, self.value_at(pos)?))
1577    }
1578
1579    #[inline]
1580    pub(crate) fn kv_at_mut(&mut self, pos: u16) -> error::Result<(&K, &mut V)> {
1581        Ok(unsafe { (self.keys.get_unchecked(pos as usize), self.values.get_unchecked_mut(pos as usize)) })
1582    }
1583
1584    #[inline]
1585    pub(crate) fn has_space(&self) -> bool {
1586        (self.len as usize) < LEAF_CAPACITY
1587    }
1588
1589    #[inline]
1590    pub(crate) fn is_underfull(&self) -> bool {
1591        (self.len as usize) < (LEAF_CAPACITY as f32 * 0.4) as usize
1592    }
1593
1594    // OBS: Does not trigger splits
1595    #[cfg(test)]
1596    pub(crate) fn insert(&mut self, key: K, value: V) -> Option<u16>
1597    where
1598        K: Ord
1599    {
1600        let (pos, exact) = self.lower_bound(&key);
1601        if exact {
1602            unimplemented!("upserts");
1603        } else {
1604            if !self.has_space() {
1605                return None;
1606            }
1607
1608            self.keys.insert(pos as usize, key);
1609            self.values.insert(pos as usize, value);
1610            self.len += 1;
1611        }
1612        Some(pos)
1613    }
1614
1615    // OBS: Does not trigger splits
1616    pub(crate) fn insert_at(&mut self, pos: u16, key: K, value: V) -> Option<u16> {
1617        if !self.has_space() {
1618            return None;
1619        }
1620
1621        self.keys.insert(pos as usize, key);
1622        self.values.insert(pos as usize, value);
1623        self.len += 1;
1624
1625        Some(pos)
1626    }
1627
1628    // OBS: Does not trigger merges
1629    #[cfg(test)]
1630    pub(crate) fn remove<Q>(&mut self, key: &Q) -> Option<V>
1631    where
1632        K: Borrow<Q> + Ord,
1633        Q: ?Sized + Ord
1634    {
1635        let (pos, exact) = self.lower_bound(key);
1636        if !exact {
1637            return None;
1638        } else {
1639            let _ = self.keys.remove(pos as usize);
1640            let value = self.values.remove(pos as usize); // TODO Can be improved by postponing compaction
1641            self.len -= 1;
1642
1643            return Some(value);
1644        }
1645    }
1646
1647    pub(crate) fn remove_at(&mut self, pos: u16) -> (K, V) {
1648        self.len -= 1;
1649        let key = self.keys.remove(pos as usize);
1650        let value = self.values.remove(pos as usize);
1651
1652        (key, value)
1653    }
1654
1655    #[inline]
1656    pub(crate) fn within_bounds<Q>(&self, key: &Q) -> bool
1657    where
1658        K: Borrow<Q> + Ord,
1659        Q: ?Sized + Ord
1660    {
1661        match (self.lower_fence().map(Borrow::borrow), self.upper_fence().map(Borrow::borrow)) {
1662            (Some(lf), Some(uf)) => key > lf && key <= uf,
1663            (Some(lf), None) => key > lf,
1664            (None, Some(uf)) => key <= uf,
1665            (None, None) => true
1666        }
1667    }
1668}
1669
1670impl<K: Clone, V, const LEAF_CAPACITY: usize> LeafNode<K, V, LEAF_CAPACITY> {
1671    pub(crate) fn split(&mut self, right: &mut LeafNode<K, V, LEAF_CAPACITY>, split_pos: u16) {
1672        let split_key = self.key_at(split_pos).expect("should exist").clone();
1673        right.lower_fence = Some(split_key.clone());
1674        right.upper_fence = self.upper_fence.clone();
1675
1676        self.upper_fence = Some(split_key);
1677
1678        assert!(right.keys.len() == 0);
1679        assert!(right.values.len() == 0);
1680        right.keys.extend(self.keys.drain((split_pos + 1) as usize..));
1681        right.values.extend(self.values.drain((split_pos + 1) as usize..));
1682
1683        self.sample_key = Some(self.keys[0].clone());
1684        right.sample_key = Some(right.keys[0].clone());
1685
1686        right.len = right.keys.len() as u16;
1687        self.len = self.keys.len() as u16; // TODO move this to before drain
1688    }
1689
1690    pub(crate) fn merge(&mut self, right: &mut LeafNode<K, V, LEAF_CAPACITY>) -> bool {
1691        if (self.len + right.len) as usize > LEAF_CAPACITY { // TODO better merge threshold
1692            return false;
1693        }
1694
1695        right.len = 0;
1696        self.upper_fence = right.upper_fence.take();
1697        self.keys.extend(right.keys.drain(..));
1698        self.values.extend(right.values.drain(..));
1699
1700        self.sample_key = right.sample_key.take();
1701
1702        self.len = self.keys.len() as u16;
1703        true
1704    }
1705}
1706
1707pub(crate) struct InternalNode<K, V, const INNER_CAPACITY: usize, const LC: usize> {
1708    len: u16,
1709    keys: SmallVec<[K; INNER_CAPACITY]>,
1710    edges: SmallVec<[Atomic<HybridLatch<Node<K, V, INNER_CAPACITY, LC>>>; INNER_CAPACITY]>,
1711    upper_edge: Option<Atomic<HybridLatch<Node<K, V, INNER_CAPACITY, LC>>>>,
1712    lower_fence: Option<K>,
1713    upper_fence: Option<K>,
1714    sample_key: Option<K> // All but the root must keep a sample key
1715}
1716
1717impl<K: fmt::Debug, V: fmt::Debug, const INNER_CAPACITY: usize, const LC: usize> fmt::Debug for InternalNode<K, V, INNER_CAPACITY, LC> {
1718    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1719        f.debug_struct("InternalNode")
1720         .field("len", &self.len)
1721         .field("keys", &self.keys)
1722         .field("edges", &self.edges)
1723         .field("upper_edge", &self.upper_edge)
1724         .field("lower_fence", &self.lower_fence)
1725         .field("upper_fence", &self.upper_fence)
1726         .field("sample_key", &self.sample_key)
1727         .finish()
1728    }
1729}
1730
1731impl<K, V, const INNER_CAPACITY: usize, const LC: usize> InternalNode<K, V, INNER_CAPACITY, LC> {
1732    pub(crate) fn new() -> InternalNode<K, V, INNER_CAPACITY, LC> {
1733        InternalNode {
1734            len: 0,
1735            keys: smallvec![],
1736            edges: smallvec![],
1737            upper_edge: None,
1738            lower_fence: None,
1739            upper_fence: None,
1740            sample_key: None
1741        }
1742    }
1743
1744    #[inline]
1745    pub(crate) fn lower_bound<Q>(&self, key: &Q) -> (u16, bool)
1746    where
1747        K: Borrow<Q> + Ord,
1748        Q: ?Sized + Ord
1749    {
1750        if self.lower_fence().map(|fk| key < fk.borrow()).unwrap_or(false) {
1751            return (0, false);
1752        }
1753
1754        match self.upper_fence() {
1755            Some(fk) => {
1756                if key > fk.borrow() {
1757                    return (self.len, false);
1758                }
1759            }
1760            None => {}
1761        }
1762
1763        let mut lower = 0;
1764        let mut upper = self.len;
1765
1766        while lower < upper {
1767            let mid = ((upper - lower) / 2) + lower;
1768
1769            if key < unsafe { self.keys.get_unchecked(mid as usize) }.borrow() {
1770                upper = mid;
1771            } else if key > unsafe { self.keys.get_unchecked(mid as usize) }.borrow() {
1772                lower = mid + 1;
1773            } else {
1774                return (mid, true);
1775            }
1776        }
1777
1778        (lower, false)
1779    }
1780
1781    #[inline]
1782    pub(crate) fn lower_fence(&self) -> Option<&K> {
1783        self.lower_fence.as_ref()
1784    }
1785
1786    #[inline]
1787    pub(crate) fn upper_fence(&self) -> Option<&K> {
1788        self.upper_fence.as_ref()
1789    }
1790
1791    #[inline]
1792    pub(crate) fn edge_at(&self, pos: u16) -> error::Result<&Atomic<HybridLatch<Node<K, V, INNER_CAPACITY, LC>>>> {
1793        if pos == self.len {
1794            if let Some(upper_edge) = self.upper_edge.as_ref() {
1795                Ok(upper_edge)
1796            } else {
1797                println!("unwind due to missing upper edge");
1798                Err(error::Error::Unwind)
1799            }
1800        } else {
1801            Ok(unsafe { self.edges.get_unchecked(pos as usize) })
1802        }
1803    }
1804
1805    #[inline]
1806    pub(crate) fn key_at(&self, pos: u16) -> error::Result<&K> {
1807        Ok(unsafe { self.keys.get_unchecked(pos as usize) })
1808    }
1809
1810    #[inline]
1811    pub(crate) fn has_space(&self) -> bool {
1812        (self.len as usize) < INNER_CAPACITY
1813    }
1814
1815    #[inline]
1816    pub(crate) fn is_underfull(&self) -> bool {
1817        (self.len as usize) < (INNER_CAPACITY as f32 * 0.4) as usize
1818    }
1819
1820    // OBS: Does not trigger splits nor updates upper_edge
1821    pub(crate) fn insert(&mut self, key: K, value: Atomic<HybridLatch<Node<K, V, INNER_CAPACITY, LC>>>) -> Option<u16>
1822    where
1823        K: Ord
1824    {
1825        let (pos, exact) = self.lower_bound(&key);
1826        if exact {
1827            unimplemented!("upserts");
1828        } else {
1829            if !self.has_space() {
1830                return None;
1831            }
1832
1833            self.keys.insert(pos as usize, key);
1834            self.edges.insert(pos as usize, value);
1835            self.len += 1;
1836        }
1837        Some(pos)
1838    }
1839
1840    // OBS: Does not trigger merges or reassings upper_edge
1841    #[allow(dead_code)]
1842    pub(crate) fn remove<Q>(&mut self, key: &Q) -> Option<Atomic<HybridLatch<Node<K, V, INNER_CAPACITY, LC>>>>
1843    where
1844        K: Borrow<Q> + Ord,
1845        Q: ?Sized + Ord
1846    {
1847        let (pos, exact) = self.lower_bound(key);
1848        if !exact {
1849            return None;
1850        } else {
1851            let _ = self.keys.remove(pos as usize);
1852            let edge = self.edges.remove(pos as usize); // TODO Can be improved by postponing compaction
1853            self.len -= 1;
1854
1855            return Some(edge);
1856        }
1857    }
1858
1859    pub(crate) fn remove_at(&mut self, pos: u16) -> (K, Atomic<HybridLatch<Node<K, V, INNER_CAPACITY, LC>>>) {
1860        let key = self.keys.remove(pos as usize);
1861        let edge = self.edges.remove(pos as usize);
1862        self.len -= 1;
1863
1864        (key, edge)
1865    }
1866}
1867
1868impl<K: Clone, V, const INNER_CAPACITY: usize, const LC: usize> InternalNode<K, V, INNER_CAPACITY, LC> {
1869    pub(crate) fn split(&mut self, right: &mut InternalNode<K, V, INNER_CAPACITY, LC>, split_pos: u16) {
1870        let split_key = self.key_at(split_pos).expect("should exist").clone();
1871        right.lower_fence = Some(split_key.clone());
1872        right.upper_fence = self.upper_fence.clone();
1873
1874        self.upper_fence = Some(split_key);
1875
1876        assert!(right.keys.len() == 0);
1877        assert!(right.edges.len() == 0);
1878        right.keys.extend(self.keys.drain((split_pos + 1) as usize..));
1879        right.edges.extend(self.edges.drain((split_pos + 1) as usize..));
1880        right.upper_edge = self.upper_edge.take();
1881
1882        self.upper_edge = Some(self.edges.pop().unwrap());
1883        self.keys.pop().unwrap();
1884
1885        self.sample_key = Some(self.keys[0].clone());
1886        right.sample_key = Some(right.keys[0].clone());
1887
1888        right.len = right.keys.len() as u16;
1889        self.len = self.keys.len() as u16;
1890    }
1891
1892    pub(crate) fn merge(&mut self, right: &mut InternalNode<K, V, INNER_CAPACITY, LC>) -> bool {
1893        if (self.len + right.len + 1 /* left upper edge */) as usize > INNER_CAPACITY {
1894            return false;
1895        }
1896
1897        let _left_upper_fence = std::mem::replace(&mut self.upper_fence, right.upper_fence.take());
1898        let left_upper_edge = std::mem::replace(&mut self.upper_edge, right.upper_edge.take());
1899
1900        self.keys.push(right.lower_fence.take().expect("right node must have lower fence"));
1901        self.edges.push(left_upper_edge.expect("left node must have upper edge"));
1902
1903        self.keys.extend(right.keys.drain(..));
1904        self.edges.extend(right.edges.drain(..));
1905
1906        self.sample_key = right.sample_key.take();
1907
1908        self.len = self.keys.len() as u16;
1909        right.len = 0;
1910
1911        true
1912    }
1913}
1914
1915#[cfg(test)]
1916mod tests {
1917    use super::{LeafNode, ParentHandler, Direction};
1918    use smallvec::smallvec;
1919    use crossbeam_epoch::{self as epoch};
1920
1921    use super::util::sample_tree;
1922
1923    #[test]
1924    fn sample_tree_lookup() {
1925        let bptree = sample_tree("fixtures/sample.json");
1926
1927        let found = bptree.lookup("0003", |value| *value);
1928        assert_eq!(found, Some(3));
1929
1930        let found = bptree.lookup("0005", |value| *value);
1931        assert_eq!(found, Some(5));
1932
1933        let found = bptree.lookup("0004", |value| *value);
1934        assert_eq!(found, None);
1935
1936        let found = bptree.lookup("0002", |value| *value);
1937        assert_eq!(found, Some(2));
1938
1939        let found = bptree.lookup("0001", |value| *value);
1940        assert_eq!(found, None);
1941    }
1942
1943    #[test]
1944    fn sample_tree_find_parent() {
1945        let bptree = sample_tree("fixtures/sample.json");
1946
1947        let eg = &epoch::pin();
1948
1949        let leaf = bptree.find_leaf("0002", eg).unwrap();
1950        assert_eq!(leaf.keys().first(), Some(&"0002".to_string()));
1951
1952        if let ParentHandler::Parent { parent_guard, pos: _ } = bptree.find_parent(&leaf, eg).unwrap() {
1953            assert_eq!(parent_guard.as_internal().keys.first(), Some(&"0002".to_string()));
1954
1955            if let ParentHandler::Parent { parent_guard, pos: _ } = bptree.find_parent(&parent_guard, eg).unwrap() {
1956                assert_eq!(parent_guard.as_internal().keys.first(), Some(&"0003".to_string()));
1957
1958                if let ParentHandler::Root { tree_guard } = bptree.find_parent(&parent_guard, eg).unwrap() {
1959                    assert_eq!(&bptree.root as *const _, tree_guard.latch() as *const _);
1960                } else {
1961                    panic!("missing root");
1962                }
1963            } else {
1964                panic!("missing parent");
1965            }
1966        } else {
1967            panic!("missing parent");
1968        }
1969
1970        let leaf = bptree.find_leaf("0005", eg).unwrap();
1971        assert_eq!(leaf.keys().first(), Some(&"0005".to_string()));
1972
1973        if let ParentHandler::Parent { parent_guard, pos: _ } = bptree.find_parent(&leaf, eg).unwrap() {
1974            assert_eq!(parent_guard.as_internal().keys.first(), Some(&"0005".to_string()));
1975        } else {
1976            panic!("missing parent");
1977        }
1978    }
1979
1980    #[test]
1981    fn sample_tree_find_nearest_leaf() {
1982        let bptree = sample_tree("fixtures/sample.json");
1983
1984        let eg = &epoch::pin();
1985
1986        let leaf = bptree.find_leaf("0002", eg).unwrap();
1987        assert_eq!(leaf.keys().first(), Some(&"0002".to_string()));
1988
1989        let (next_leaf, (parent, _pos)) = bptree.find_nearest_leaf(&leaf, Direction::Forward, eg).unwrap().unwrap();
1990        assert!(next_leaf.is_leaf());
1991        assert_eq!(next_leaf.keys().first(), Some(&"0003".to_string()));
1992        assert_eq!(parent.keys().first(), Some(&"0002".to_string()));
1993
1994        let (next_leaf, (parent, _pos)) = bptree.find_nearest_leaf(&next_leaf, Direction::Forward, eg).unwrap().unwrap();
1995        assert!(next_leaf.is_leaf());
1996        assert_eq!(next_leaf.keys().first(), Some(&"0005".to_string()));
1997        assert_eq!(parent.keys().first(), Some(&"0005".to_string()));
1998
1999        let (next_leaf, (parent, _pos)) = bptree.find_nearest_leaf(&next_leaf, Direction::Reverse, eg).unwrap().unwrap();
2000        assert!(next_leaf.is_leaf());
2001        assert_eq!(next_leaf.keys().first(), Some(&"0003".to_string()));
2002        assert_eq!(parent.keys().first(), Some(&"0002".to_string()));
2003
2004        let (next_leaf, (parent, _pos)) = bptree.find_nearest_leaf(&next_leaf, Direction::Reverse, eg).unwrap().unwrap();
2005        assert!(next_leaf.is_leaf());
2006        assert_eq!(next_leaf.keys().first(), Some(&"0002".to_string()));
2007        assert_eq!(parent.keys().first(), Some(&"0002".to_string()));
2008
2009        assert!(bptree.find_nearest_leaf(&next_leaf, Direction::Reverse, eg).unwrap().is_none());
2010
2011        let (next_leaf, (parent, _pos)) = bptree.find_nearest_leaf(&parent, Direction::Forward, eg).unwrap().unwrap();
2012        assert!(next_leaf.is_leaf());
2013        assert_eq!(next_leaf.keys().first(), Some(&"0005".to_string()));
2014        assert_eq!(parent.keys().first(), Some(&"0005".to_string()));
2015    }
2016
2017    #[test]
2018    fn leaf_node_lower_bound() {
2019        let node: LeafNode<_, _, 16> = LeafNode {
2020            len: 3,
2021            keys: smallvec!["0001".to_string(), "0002".to_string(), "0004".to_string()],
2022            values: smallvec![1u64, 2, 4],
2023            lower_fence: None,
2024            upper_fence: None,
2025            sample_key: None
2026        };
2027
2028        assert!(node.lower_bound("0001") == (0, true));
2029        assert!(node.lower_bound("0002") == (1, true));
2030        assert!(node.lower_bound("00002") == (0, false));
2031        assert!(node.lower_bound("0005") == (3, false));
2032        assert!(node.lower_bound("0003") == (2, false));
2033    }
2034
2035    #[test]
2036    fn leaf_node_insert() {
2037        let mut node: LeafNode<_, _, 16> = LeafNode {
2038            len: 0,
2039            keys: smallvec![],
2040            values: smallvec![],
2041            lower_fence: None,
2042            upper_fence: None,
2043            sample_key: None
2044        };
2045
2046        node.insert("0001", 1u64).unwrap();
2047        node.insert("0002", 2u64).unwrap();
2048        node.insert("0004", 4u64).unwrap();
2049
2050        assert!(node.lower_bound("0001") == (0, true));
2051        assert!(node.lower_bound("0002") == (1, true));
2052        assert!(node.lower_bound("00002") == (0, false));
2053        assert!(node.lower_bound("0005") == (3, false));
2054        assert!(node.lower_bound("0003") == (2, false));
2055
2056        node.remove("0001").unwrap();
2057        node.remove("0002").unwrap();
2058        node.remove("0004").unwrap();
2059
2060        assert!(node.remove("0005").is_none());
2061
2062        assert!(node.len == 0);
2063
2064        assert!(node.lower_bound("0001") == (0, false));
2065    }
2066}