bztree/
lib.rs

1//! # BzTree
2//! Concurrent B-tree implementation based on paper
3//! [BzTree: A High-Performance Latch-free Range Index for Non-Volatile Memory](http://www.vldb.org/pvldb/vol11/p553-arulraj.pdf).
4//! Current implementation doesn't support non-volatile memory and supposed to be used only as
5//! in-memory(not persistent) data structure.
6//! BzTree uses [MwCAS](https://crates.io/crates/mwcas) crate to get access to multi-word CAS.
7//!
8//! # Examples
9//! ```
10//! use bztree::BzTree;
11//!
12//! let mut tree = BzTree::with_node_size(2);
13//! let guard = crossbeam_epoch::pin();
14//!
15//! let key1 = "key_1".to_string();
16//! assert!(tree.insert(key1.clone(), 1, &crossbeam_epoch::pin()));
17//! assert!(!tree.insert(key1.clone(), 5, &crossbeam_epoch::pin()));
18//! tree.upsert(key1.clone(), 10, &crossbeam_epoch::pin());
19//!
20//! assert!(matches!(tree.delete(&key1, &guard), Some(&10)));
21//!
22//! let key2 = "key_2".to_string();
23//! tree.insert(key2.clone(), 2, &crossbeam_epoch::pin());
24//! assert!(tree.compute(&key2, |(_, v)| Some(v + 1), &guard));
25//! assert!(matches!(tree.get(&key2, &guard), Some(&3)));
26//!
27//! assert!(tree.compute(&key2, |(_, v)| {
28//!     if *v == 3 {
29//!         None
30//!     } else {
31//!         Some(v + 1)
32//!     }
33//! }, &guard));
34//! assert!(matches!(tree.get(&key2, &guard), None));
35//! ```
36
37mod node;
38mod scanner;
39
40use crate::node::{DeleteError, InsertError, MergeMode, Node, NodeEdge, SplitMode};
41use crate::scanner::Scanner;
42use crate::NodePointer::{Interim, Leaf};
43use crossbeam_epoch::Guard;
44use mwcas::{HeapPointer, MwCas};
45use node::status_word::StatusWord;
46use std::borrow::Borrow;
47use std::fmt::Debug;
48use std::ops::{Deref, DerefMut, RangeBounds};
49use std::option::Option::Some;
50use std::ptr;
51
52/// BzTree data structure.
53///
54/// # Key-value trait bounds
55///
56/// Keys should implement [Ord] and [Clone], values should be [Clone], [Send] and [Sync].
57///
58/// Because of nature of concurrent trees, node split/merge operations are optimistic and can fail:
59/// implementation creates copy of nodes which is a part of split/merge process
60/// and try to replace existing node by new one. At one point in time, 2 nodes can
61/// points to the same key-value pair. To reduce count of unsafe code inside tree implementation,
62/// key and values should implement [Clone].
63///
64/// # Visibility of changes
65/// Changes made by `insert` and `delete` operations will be immediately visible through
66/// `get`/`scan`
67/// operations if `insert`/`delete` happens before `get`/`scan`.
68/// `Scan/range` operation may see tree updates which made concurrently with scanner progress, e.g.
69/// scanner doesn't provide snapshot view of tree.
70///
71/// # Memory reclamation
72/// BzTree uses [crossbeam_epoch::Guard] memory reclamation to free memory of removed/replaced
73/// key-values.
74/// Because of internal representation of tree nodes, drop of some removed keys can be delayed
75/// until node split/merge. This limitation caused by 'sorted' space inside tree node which uses
76/// binary search to locate keys inside. Current implementation of binary search should have an
77/// access to removed/replaced keys during search.
78///
79/// # Heap allocation
80/// BzTree allocates memory for nodes on heap. Key and values in leaf nodes(e.g., nodes which
81/// stores actual user data) stored directly in node memory without additional heap allocations.
82/// Links to other nodes, stored in interim nodes, allocated on heap.
83///
84/// # Thread safety
85/// BzTree can be safely shared between threads, e.g. it implements [Send] ans [Sync].
86pub struct BzTree<K: Ord, V> {
87    root: NodeLink<K, V>,
88    node_size: usize,
89}
90
91type NodeLink<K, V> = HeapPointer<NodePointer<K, V>>;
92
93unsafe impl<K: Ord, V> Send for BzTree<K, V> {}
94
95unsafe impl<K: Ord, V> Sync for BzTree<K, V> {}
96
97/// Leaf node of tree actually store KV pairs.
98type LeafNode<K, V> = Node<K, V>;
99/// Interim node store links to other nodes(leaf and interim).
100/// Each cell in interim node points to tree node which contain
101/// keys less or equal to cell key.
102/// Interim node always contain special guard cell which represent
103/// 'positive infinite' key. This cell used to store link to node
104/// which keys greater than any other key in current interim node.
105type InterimNode<K, V> = Node<Key<K>, NodeLink<K, V>>;
106
107impl<K, V> BzTree<K, V>
108where
109    K: Clone + Ord,
110    V: Clone + Send + Sync,
111{
112    /// Create new tree with default node size.
113    pub fn new() -> BzTree<K, V> {
114        Self::with_node_size(60)
115    }
116
117    /// Create new tree with passed node size.
118    pub fn with_node_size(node_size: u16) -> BzTree<K, V> {
119        assert!(node_size > 1, "Max node elements should be > 1");
120        let root = Node::with_capacity(node_size);
121        BzTree {
122            root: HeapPointer::new(NodePointer::new_leaf(root)),
123            node_size: node_size as usize,
124        }
125    }
126
127    /// Insert key-value pair to tree if no elements with same key already exists.
128    /// # Return
129    /// Returns true if key-value pair successfully inserted, otherwise false if key already in
130    /// tree.
131    /// # Examples
132    /// ```
133    /// use bztree::BzTree;
134    ///
135    /// let tree = BzTree::new();
136    /// let guard = crossbeam_epoch::pin();
137    ///
138    /// let key = "key".to_string();
139    /// assert!(tree.insert(key.clone(), 1, &guard));
140    /// assert!(!tree.insert(key.clone(), 5, &guard));
141    /// ```
142    pub fn insert(&self, key: K, value: V, guard: &Guard) -> bool {
143        let mut value: V = value;
144        let search_key = Key::new(key.clone());
145        loop {
146            let node = self.find_leaf_for_key(&search_key, true, guard).unwrap();
147            match node.insert(key.clone(), value, guard) {
148                Ok(_) => {
149                    return true;
150                }
151                Err(InsertError::Split(val)) => {
152                    value = val;
153                    // try to find path to overflowed node
154                    let leaf_ptr = node as *const LeafNode<K, V>;
155                    let path = self.find_path_to_key(&search_key, false, guard);
156                    if let Some(path) = path {
157                        if let Leaf(found_leaf) = path.node_pointer {
158                            // if overflowed node is not split/merged by other thread during insert
159                            if ptr::eq(leaf_ptr, found_leaf.deref()) {
160                                self.split_leaf(path, guard);
161                            }
162                        }
163                    }
164                }
165                Err(InsertError::DuplicateKey) => {
166                    return false;
167                }
168                Err(InsertError::Retry(val)) | Err(InsertError::NodeFrozen(val)) => {
169                    value = val;
170                }
171            }
172        }
173    }
174
175    /// Insert key-value pair if not already exists, otherwise replace value of existing element.
176    /// # Return
177    /// Returns value previously associated with same key or `None`.
178    ///
179    /// # Examples
180    /// ```
181    /// use bztree::BzTree;
182    ///
183    /// let tree = BzTree::new();
184    /// let guard = crossbeam_epoch::pin();
185    ///
186    /// let key = "key".to_string();
187    /// assert!(matches!(tree.upsert(key.clone(), 10, &guard), None));
188    /// assert!(matches!(tree.upsert(key.clone(), 11, &guard), Some(10)));
189    /// ```
190    pub fn upsert<'g>(&'g self, key: K, value: V, guard: &'g Guard) -> Option<&'g V> {
191        let mut value: V = value;
192        let search_key = Key::new(key.clone());
193        loop {
194            // use raw pointer to overcome borrowing rules in loop
195            // which borrows value even on return statement
196            let node = self.find_leaf_for_key(&search_key, true, guard).unwrap();
197            match node.upsert(key.clone(), value, guard) {
198                Ok(prev_val) => {
199                    return prev_val;
200                }
201                Err(InsertError::Split(val)) => {
202                    value = val;
203                    // try to find path to overflowed node
204                    let path = self.find_path_to_key(&search_key, false, guard);
205                    if let Some(path) = path {
206                        if let Leaf(found_leaf) = path.node_pointer {
207                            // if overflowed node is not split/merged by other thread
208                            if ptr::eq(node, found_leaf.deref()) {
209                                self.split_leaf(path, guard);
210                            }
211                        }
212                    }
213                }
214                Err(InsertError::DuplicateKey) => {
215                    panic!("Duplicate key error reported on upsert")
216                }
217                Err(InsertError::Retry(val)) | Err(InsertError::NodeFrozen(val)) => {
218                    value = val;
219                }
220            };
221        }
222    }
223
224    /// Delete value associated with key.
225    /// # Return
226    /// Returns removed value if key found in tree.
227    ///
228    /// # Examples
229    /// ```
230    /// use bztree::BzTree;
231    ///
232    /// let tree = BzTree::new();
233    /// let guard = crossbeam_epoch::pin();
234    ///
235    /// let key = "key".to_string();
236    /// assert!(tree.insert(key.clone(), 10, &guard));
237    /// assert!(matches!(tree.delete(&key, &guard), Some(&10)));
238    /// ```
239    pub fn delete<'g, Q>(&'g self, key: &Q, guard: &'g Guard) -> Option<&'g V>
240    where
241        K: Borrow<Q>,
242        Q: Ord + Clone,
243    {
244        let search_key = Key::new(key.clone());
245        loop {
246            // use raw pointer to overcome borrowing rules in loop
247            // which borrows value even on return statement
248            if let Some(node) = self.find_leaf_for_key(&search_key, false, guard) {
249                let len = node.estimated_len(guard);
250                match node.delete(key.borrow(), guard) {
251                    Ok(val) => {
252                        if self.should_merge(len - 1) {
253                            self.merge_recursive(&search_key, guard);
254                        }
255                        return Some(val);
256                    }
257                    Err(DeleteError::KeyNotFound) => return None,
258                    Err(DeleteError::Retry) => {}
259                }
260            } else {
261                return None;
262            }
263        }
264    }
265
266    /// Get value associated with key.
267    ///
268    /// # Examples
269    /// ```
270    /// use bztree::BzTree;
271    ///
272    /// let tree = BzTree::new();
273    /// let guard = crossbeam_epoch::pin();
274    ///
275    /// let key = "key".to_string();
276    /// assert!(tree.insert(key.clone(), 10, &guard));
277    /// assert!(matches!(tree.get(&key, &guard), Some(&10)));
278    /// ```
279    pub fn get<'g, Q>(&'g self, key: &Q, guard: &'g Guard) -> Option<&'g V>
280    where
281        K: Borrow<Q>,
282        Q: Clone + Ord,
283    {
284        let search_key = Key::new(key.clone());
285        self.find_leaf_for_key(&search_key, false, guard)
286            .and_then(|node| node.get(key, guard).map(|(_, val, _, _)| val))
287    }
288
289    /// Create tree range scanner which will return values whose keys is in passed range.
290    ///
291    /// Visibility of changes made in tree during scan, described in [BzTree] doc.
292    ///
293    /// # Examples
294    /// ```
295    /// use bztree::BzTree;
296    ///
297    /// let tree = BzTree::new();
298    /// let guard = crossbeam_epoch::pin();
299    ///
300    /// tree.insert("key1".to_string(), 1, &guard);
301    /// tree.insert("key2".to_string(), 2, &guard);
302    /// tree.insert("key3".to_string(), 3, &guard);
303    ///
304    /// let mut scanner = tree.range("key1".to_string()..="key2".to_string(), &guard);
305    /// assert!(matches!(scanner.next(), Some((_, &1))));
306    /// assert!(matches!(scanner.next(), Some((_, &2))));
307    /// assert!(matches!(scanner.next(), None));
308    /// ```
309    pub fn range<'g>(
310        &'g self,
311        // TODO: think how we can accept Q instead of K in range
312        key_range: impl RangeBounds<K> + 'g + Clone,
313        guard: &'g Guard,
314    ) -> impl DoubleEndedIterator<Item = (&'g K, &'g V)> + 'g {
315        return match self.root.read(guard) {
316            Leaf(root) => Scanner::from_leaf_root(root, key_range, guard),
317            Interim(root) => Scanner::from_non_leaf_root(root, key_range, guard),
318        };
319    }
320
321    /// Create iterator through all key-values of tree.
322    ///
323    /// Iterator based on tree range scanner and have same changes visibility guarantees.
324    ///
325    /// # Examples
326    /// ```
327    /// use bztree::BzTree;
328    ///
329    /// let tree = BzTree::new();
330    /// let guard = crossbeam_epoch::pin();
331    ///
332    /// tree.insert("key1".to_string(), 1, &guard);
333    /// tree.insert("key2".to_string(), 2, &guard);
334    /// tree.insert("key3".to_string(), 3, &guard);
335    ///
336    /// let mut scanner = tree.iter(&guard);
337    /// assert!(matches!(scanner.next(), Some((_, &1))));
338    /// assert!(matches!(scanner.next(), Some((_, &2))));
339    /// assert!(matches!(scanner.next(), Some((_, &3))));
340    /// assert!(matches!(scanner.next(), None));
341    /// ```
342    pub fn iter<'g>(&'g self, guard: &'g Guard) -> impl DoubleEndedIterator<Item = (&'g K, &'g V)> {
343        self.range(.., guard)
344    }
345
346    /// Return first element of tree according to key ordering.
347    ///
348    /// # Examples
349    /// ```
350    /// use bztree::BzTree;
351    ///
352    /// let tree = BzTree::new();
353    /// let guard = crossbeam_epoch::pin();
354    ///
355    /// tree.insert("key1".to_string(), 1, &guard);
356    /// tree.insert("key2".to_string(), 2, &guard);
357    /// tree.insert("key3".to_string(), 3, &guard);
358    ///
359    /// assert!(matches!(tree.first(&guard), Some((_, &1))));
360    /// ```
361    #[inline]
362    pub fn first<'g>(&'g self, guard: &'g Guard) -> Option<(&'g K, &'g V)> {
363        self.iter(guard).next()
364    }
365
366    /// Return last element of tree according to key ordering.
367    ///
368    /// # Examples
369    /// ```
370    /// use bztree::BzTree;
371    ///
372    /// let tree = BzTree::new();
373    /// let guard = crossbeam_epoch::pin();
374    ///
375    /// tree.insert("key1".to_string(), 1, &guard);
376    /// tree.insert("key2".to_string(), 2, &guard);
377    /// tree.insert("key3".to_string(), 3, &guard);
378    ///
379    /// assert!(matches!(tree.last(&guard), Some((_, &3))));
380    /// ```
381    #[inline]
382    pub fn last<'g>(&'g self, guard: &'g Guard) -> Option<(&'g K, &'g V)> {
383        self.iter(guard).rev().next()
384    }
385
386    /// Remove and return first element of tree according to key ordering.
387    ///
388    /// # Examples
389    /// ```
390    /// use bztree::BzTree;
391    ///
392    /// let tree = BzTree::new();
393    /// let guard = crossbeam_epoch::pin();
394    ///
395    /// tree.insert("key1".to_string(), 1, &guard);
396    /// tree.insert("key2".to_string(), 2, &guard);
397    /// tree.insert("key3".to_string(), 3, &guard);
398    ///
399    /// assert!(matches!(tree.pop_first(&guard), Some((_, &1))));
400    /// assert!(matches!(tree.pop_first(&guard), Some((_, &2))));
401    /// assert!(matches!(tree.pop_first(&guard), Some((_, &3))));
402    /// assert!(matches!(tree.pop_first(&guard), None));
403    /// ```
404    pub fn pop_first<'g>(&'g self, guard: &'g Guard) -> Option<(K, &'g V)> {
405        self.pop(NodeEdge::Left, guard)
406    }
407
408    /// Remove and return last element of tree according to key ordering.
409    ///
410    /// # Examples
411    /// ```
412    /// use bztree::BzTree;
413    ///
414    /// let tree = BzTree::new();
415    /// let guard = crossbeam_epoch::pin();
416    ///
417    /// tree.insert("key1".to_string(), 1, &guard);
418    /// tree.insert("key2".to_string(), 2, &guard);
419    /// tree.insert("key3".to_string(), 3, &guard);
420    ///
421    /// assert!(matches!(tree.pop_last(&guard), Some((_, &3))));
422    /// assert!(matches!(tree.pop_last(&guard), Some((_, &2))));
423    /// assert!(matches!(tree.pop_last(&guard), Some((_, &1))));
424    /// assert!(matches!(tree.pop_last(&guard), None));
425    /// ```
426    pub fn pop_last<'g>(&'g self, guard: &'g Guard) -> Option<(K, &'g V)> {
427        self.pop(NodeEdge::Right, guard)
428    }
429
430    fn pop<'g>(&'g self, pop_from: NodeEdge, guard: &'g Guard) -> Option<(K, &'g V)> {
431        loop {
432            let (edge_node, edge_key) = self.find_edge_node(pop_from, guard);
433            let node_status = edge_node.status_word().read(guard);
434            if node_status.is_frozen() {
435                // found node already removed from tree, restart
436                continue;
437            }
438
439            if let Some(edge_entry) = edge_node.conditional_edge_kv(pop_from, node_status, guard) {
440                // It's enough to only validate status word of leaf node to ensure that we pop
441                // current max element. While pop in progress, other threads can change tree
442                // shape by merge and split.
443                // If node which currently contains min/max element is merged/split, we will
444                // detect such change by checking node's status word.
445                // Merging of parent node also can proceed without additional validation
446                // because merge of parent do not change ordering of leaf elements.
447                // Split of parent node have 2 cases:
448                // 1. split caused by overflow of other leaf node(e.g., not current min/max node).
449                // In this case min or max node still contain min/max value and can be used for
450                // pop operation.
451                // 2. split caused by overflow of current min/max node.
452                // This case covered by status word validation.
453                let len = edge_node.estimated_len(guard);
454                match edge_node.conditional_delete(node_status, edge_entry.location, guard) {
455                    Ok(_) => {
456                        if self.should_merge(len - 1) {
457                            if let Some(k) = edge_key {
458                                self.merge_recursive(k, guard);
459                            }
460                        }
461                        return Some((edge_entry.key.clone(), edge_entry.value));
462                    }
463                    Err(DeleteError::Retry) | Err(DeleteError::KeyNotFound) => {}
464                }
465            } else {
466                if self.root.read(guard).points_to_leaf(edge_node) {
467                    // current max/min node is empty and it is the root of tree
468                    return None;
469                }
470                // node with max possible key is empty,
471                // we should help to merge such node and try again
472                if let Some(k) = edge_key {
473                    self.merge_recursive(k, guard);
474                }
475            }
476        }
477    }
478
479    /// Update or delete element with passed key using conditional logic.
480    ///
481    /// Function `F` accepts current value of key and based on it, produces new value.
482    /// If `F` returns `Some(V)` then current value will be updated. Otherwise(`None`) key will
483    /// be removed from tree.
484    ///
485    /// Function `F` can be called several times in case of concurrent modification of tree.
486    /// Because of this behaviour, function code should not modify global application state or
487    /// such code should be carefully designed(understanding consequences of repeated function
488    /// calls for same key).
489    ///
490    /// # Examples
491    /// ```
492    /// use bztree::BzTree;
493    ///
494    /// let tree = BzTree::new();
495    /// let guard = crossbeam_epoch::pin();
496    ///
497    /// let key = "key".to_string();
498    /// tree.insert(key.clone(), 2, &guard);
499    ///
500    /// assert!(tree.compute(&key, |(_, v)| Some(v + 1), &guard));
501    /// assert!(matches!(tree.get(&key, &guard), Some(&3)));
502    ///
503    /// assert!(tree.compute(&key, |(_, v)| {
504    ///  if *v == 3 {
505    ///      None
506    ///  } else {
507    ///      Some(v + 1)
508    ///  }
509    /// }, &guard));
510    /// assert!(matches!(tree.get(&key, &guard), None));
511    /// ```
512    pub fn compute<'g, Q, F>(&'g self, key: &Q, mut new_val: F, guard: &'g Guard) -> bool
513    where
514        K: Borrow<Q>,
515        Q: Ord + Clone,
516        F: FnMut((&K, &V)) -> Option<V>,
517    {
518        let search_key = Key::new(key.clone());
519        loop {
520            let node = self.find_leaf_for_key(&search_key, false, guard);
521            if node.is_none() {
522                return false;
523            }
524
525            let node = node.unwrap();
526            let status_word = node.status_word().read(guard);
527            if let Some((found_key, val, kv_index)) = node.conditional_get(key, status_word, guard)
528            {
529                // compute new value for key
530                if let Some(new_val) = new_val((found_key, val)) {
531                    match node.conditional_upsert(found_key.clone(), new_val, status_word, guard) {
532                        Ok(_) => {
533                            return true;
534                        }
535                        Err(InsertError::Split(_)) => {
536                            // try to find path to overflowed node
537                            if let Some(path) = self.find_path_to_key(&search_key, false, guard) {
538                                if let Leaf(found_leaf) = path.node_pointer {
539                                    // if overflowed node is not split/merged by other thread
540                                    if ptr::eq(node, found_leaf.deref()) {
541                                        self.split_leaf(path, guard);
542                                    }
543                                }
544                            }
545                        }
546                        Err(InsertError::DuplicateKey) => {
547                            panic!("Duplicate key error reported on upsert")
548                        }
549                        Err(InsertError::Retry(_)) | Err(InsertError::NodeFrozen(_)) => {}
550                    };
551                } else {
552                    // no new value exists for key, caller want to remove KV
553                    let len = node.estimated_len(guard);
554                    match node.conditional_delete(status_word, kv_index, guard) {
555                        Ok(_) => {
556                            if self.should_merge(len - 1) {
557                                self.merge_recursive(&search_key, guard);
558                            }
559                            return true;
560                        }
561                        Err(DeleteError::Retry) | Err(DeleteError::KeyNotFound) => {}
562                    }
563                }
564            } else {
565                // TODO: add possibility to insert KV if not exists
566                // Read status again and check that node didn't change.
567                // If it not changed, then we ensure that key doesn't find in node.
568                // If it changed, then we should restart and try to find key again.
569                // Other thread could start upsert operation for the same key concurrently while
570                // we are searching the key. This can cause situation when 'search/get' operation
571                // can miss the new value produced by upsert, but still observe removal of previous
572                // value(when upsert of new value completed).
573                // This race can lead to key doesn't exist error, but the tree actually contains
574                // such a key.
575                //
576                // Example:
577                // - node KVs: [(1,1),(2,2),(3,3),(4,4)]
578                // - get operation tries to obtain value for key '2'
579                // - concurrent upsert updates value for the same key '2'.
580                // Internally, node will looks like:
581                // [
582                //  sorted block: [(1,1)],
583                //  unsorted:[(2,2), (3,3), (4,4)]
584                // ]
585                // Let's imagine that get started to scan node(in order to find key '2') and
586                // already reached KV (3,3). At the same time, concurrent upsert reserves new
587                // entry for updated value and mark previous entry as deleted:
588                // [
589                //  sorted block: [(1,1)],
590                //  unsorted:[(2,2: deleted), (3,3), (4,4), (2, NEW_VAL_HERE)]
591                // ]
592                // After that thread which performs node scanning completes analyzing KV (3,3)
593                // and moves to the next KV. Here it sees that KV (2,2) marked as deleted.
594                let cur_status = node.status_word().read(guard);
595                if cur_status == status_word {
596                    return false;
597                }
598                // someone changed the node, we should retry
599            }
600        }
601    }
602
603    #[inline(always)]
604    fn should_merge(&self, node_size: usize) -> bool {
605        node_size <= self.node_size / 3
606    }
607
608    fn merge_recursive<Q>(&self, key: &Key<Q>, guard: &Guard)
609    where
610        K: Borrow<Q>,
611        Q: Ord + Clone,
612    {
613        let mut retries = 0;
614        loop {
615            if let Some(path) = self.find_path_to_key(key, false, guard) {
616                match self.merge_node(path, guard) {
617                    MergeResult::Completed => break,
618                    MergeResult::Retry => {
619                        retries += 1;
620                        if retries >= 3 {
621                            break;
622                        }
623                    }
624                    MergeResult::RecursiveMerge(path) => {
625                        // repeat until we try recursively merge parent nodes(until we find root
626                        // or some parent cannot be merged).
627                        retries = 0;
628                        let mut merge_path = path;
629                        while let MergeResult::RecursiveMerge(path) =
630                            self.merge_node(merge_path, guard)
631                        {
632                            merge_path = path;
633                        }
634                    }
635                }
636            }
637        }
638    }
639
640    fn merge_node<'g>(
641        &'g self,
642        mut path: TraversePath<'g, K, V>,
643        guard: &'g Guard,
644    ) -> MergeResult<'g, K, V> {
645        // obtain direct parent of merging node
646        let parent = path.parents.pop();
647        if parent.is_none() {
648            let result = self.merge_root(path.node_pointer, guard);
649            return result;
650        }
651
652        // freeze underflow node to ensure that no one can modify it
653        if !path.node_pointer.try_froze(guard) {
654            return MergeResult::Retry;
655        }
656
657        match path.node_pointer {
658            Leaf(node) => {
659                // merge is not required anymore, node received new KVs
660                // while we tried to merge it
661                if !self.should_merge(node.estimated_len(guard)) {
662                    path.node_pointer.try_unfroze(guard);
663                    return MergeResult::Completed;
664                }
665            }
666            Interim(_) => {
667                // this is explicit request to merge interim node,
668                // always proceed with merge
669            }
670        };
671
672        // merging of underflow node with sibling will return 'new parent' node.
673        // This 'new parent' doesn't contain underutilized node and should be
674        // installed in grandparent node(by replacing 'current parent' node pointer with pointer
675        // to 'new parent' node).
676        let parent = parent.unwrap();
677        let parent_status = parent.node().status_word().read(guard);
678        if parent_status.is_frozen() {
679            path.node_pointer.try_unfroze(guard);
680            return MergeResult::Retry;
681        }
682
683        // if no siblings in parent node
684        if parent.node().estimated_len(guard) <= 1 {
685            path.node_pointer.try_unfroze(guard);
686            return if path.node_pointer.len(guard) == 0 {
687                // underflow node is empty and parent doesn't contain any other nodes,
688                // we can remove empty parent node from the tree
689                self.remove_empty_parent(
690                    TraversePath {
691                        parents: path.parents,
692                        node_pointer: parent.node_pointer,
693                        cas_pointer: parent.cas_pointer,
694                    },
695                    NodeDrop::new(),
696                    guard,
697                )
698            } else {
699                // underflow node cannot be merged with siblings, try to merge parent with it
700                // sibling on their level. This give a chance later to merge current underflow node
701                // with sibling node from another parent.
702                MergeResult::RecursiveMerge(TraversePath {
703                    cas_pointer: parent.cas_pointer,
704                    node_pointer: parent.node_pointer,
705                    parents: path.parents,
706                })
707            };
708        }
709
710        let mut drop = NodeDrop::new();
711        let mut mwcas = MwCas::new();
712        // merge underflow node with one of its siblings
713        let merge_res = self.merge_with_sibling(
714            path.node_pointer,
715            &parent,
716            parent_status,
717            &mut mwcas,
718            &mut drop,
719            guard,
720        );
721        let new_parent = match merge_res {
722            Ok(new_parent) => new_parent,
723            Err(e) => {
724                path.node_pointer.try_unfroze(guard);
725                return e;
726            }
727        };
728
729        // check that parent node is also underflow and should be merged on it's own level
730        let new_parent_should_merge = self.should_merge(new_parent.estimated_len(guard));
731
732        mwcas.compare_exchange(
733            parent.node().status_word(),
734            parent_status,
735            parent_status.froze(),
736        );
737        mwcas.compare_exchange(
738            parent.cas_pointer,
739            parent.node_pointer,
740            NodePointer::new_interim(new_parent),
741        );
742        // if parent node has grandparent node, then check that grandparent not frozen.
743        // if parent node has no grandparent node, then parent is current root, we can simply CAS
744        // on root pointer.
745        if let Some(grand_parent) = path.parents.last() {
746            let status_word = grand_parent.node().status_word().read(guard);
747            if status_word.is_frozen() {
748                // grandparent merged/split in progress, rollback changes and retry
749                // when parent node will be moved to new grandparent
750                // or this grandparent will be unfrozen
751                path.node_pointer.try_unfroze(guard);
752                return MergeResult::Retry;
753            }
754            // check that grandparent is not frozen and increase it version
755            // (because we make in-place update of link to parent node)
756            mwcas.compare_exchange(
757                grand_parent.node().status_word(),
758                status_word,
759                status_word.inc_version(),
760            );
761        }
762
763        // merge prepared, try to install new nodes instead of underutilized
764        if mwcas.exec(guard) {
765            // we successfully merge node, now check is it parent become underutilized.
766            // we try to merge parent only after original node merged, this will provide
767            // bottom-up merge until we reach root.
768            drop.add(path.node_pointer.clone());
769            drop.add(parent.node_pointer.clone());
770            drop.exec(guard);
771            if new_parent_should_merge {
772                let new_parent_ptr = parent.cas_pointer.read(guard);
773                return MergeResult::RecursiveMerge(TraversePath {
774                    cas_pointer: parent.cas_pointer,
775                    node_pointer: new_parent_ptr,
776                    parents: path.parents,
777                });
778            }
779            MergeResult::Completed
780        } else {
781            path.node_pointer.try_unfroze(guard);
782            MergeResult::Retry
783        }
784    }
785
786    /// Method will remove parent node from grandparent.
787    /// Method suppose that underutilized parent contains only 1 link to child node and this
788    /// child is empty.
789    /// We create copy of grandparent node without link to such underutilized parent and install
790    /// new grandparent inside tree.
791    fn remove_empty_parent(
792        &self,
793        mut empty_node_path: TraversePath<K, V>,
794        mut drop: NodeDrop<K, V>,
795        guard: &Guard,
796    ) -> MergeResult<K, V> {
797        let mut mwcas = MwCas::new();
798        let node_status = empty_node_path.node_pointer.status_word().read(guard);
799        if node_status.is_frozen() {
800            return MergeResult::Retry;
801        }
802
803        drop.add(empty_node_path.node_pointer.clone());
804        mwcas.compare_exchange(
805            empty_node_path.node_pointer.status_word(),
806            node_status,
807            node_status.froze(),
808        );
809
810        if let Some(parent_handle) = empty_node_path.parents.pop() {
811            let parent_node = parent_handle.node_pointer.to_interim_node();
812            let parent_status = parent_node.status_word().read(guard);
813            if parent_status.is_frozen() {
814                return MergeResult::Retry;
815            }
816            // Parent node contains 1 node which should be removed because it empty.
817            // We should recursively process this parent node because logically it empty. Then we
818            // found some grandparent node which is not empty, we remove all empty nodes from it.
819            // This will release all empty nodes found below it.
820            if parent_node.estimated_len(guard) == 1 {
821                return self.remove_empty_parent(
822                    TraversePath {
823                        parents: empty_node_path.parents,
824                        cas_pointer: parent_handle.cas_pointer,
825                        node_pointer: parent_handle.node_pointer,
826                    },
827                    drop,
828                    guard,
829                );
830            }
831
832            drop.add(parent_handle.node_pointer.clone());
833            mwcas.compare_exchange(
834                parent_node.status_word(),
835                parent_status,
836                parent_status.froze(),
837            );
838
839            // if parent node has grandparent node, then check that grandparent not frozen.
840            if let Some(grand_parent) = empty_node_path.parents.last() {
841                let status_word = grand_parent.node().status_word().read(guard);
842                if status_word.is_frozen() {
843                    // grandparent merged/split in progress, rollback changes and retry
844                    // when parent node will be moved to new grandparent
845                    // or this grandparent will be unfrozen
846                    return MergeResult::Retry;
847                }
848
849                mwcas.compare_exchange(
850                    grand_parent.node().status_word(),
851                    status_word,
852                    status_word.inc_version(),
853                );
854            }
855
856            // create copy of parent node without link to empty child
857            let new_node = InterimNode::new_readonly(
858                parent_node.clone_with_filter(|(k, _)| *k != &parent_handle.child_key, guard),
859            );
860
861            mwcas.compare_exchange(
862                parent_handle.cas_pointer,
863                parent_handle.node_pointer,
864                NodePointer::new_interim(new_node),
865            );
866        } else {
867            // Parent node is root and it contains only 1 link to empty child node, replace root
868            // node by empty leaf node. Parent node already frozen at this moment, simply replace
869            // old parent by new root node.
870            mwcas.compare_exchange(
871                empty_node_path.cas_pointer,
872                empty_node_path.node_pointer,
873                NodePointer::new_leaf(LeafNode::with_capacity(self.node_size as u16)),
874            );
875        }
876
877        if mwcas.exec(guard) {
878            drop.exec(guard);
879            MergeResult::Completed
880        } else {
881            MergeResult::Retry
882        }
883    }
884
885    fn merge_root(&self, root_ptr: &NodePointer<K, V>, guard: &Guard) -> MergeResult<K, V> {
886        let mut cur_root = root_ptr;
887        loop {
888            if let Interim(root) = cur_root {
889                let root_status = root.status_word().read(guard);
890                if root_status.is_frozen() || root.estimated_len(guard) != 1 {
891                    break;
892                }
893                // if root contains only 1 node, move this node up to root level
894                let (_, child_ptr) = root.iter(guard).next().unwrap();
895                let child_node_pointer = child_ptr.read(guard);
896                let child_status = child_node_pointer.status_word().read(guard);
897                if child_status.is_frozen() {
898                    break;
899                }
900                let mut mwcas = MwCas::new();
901                mwcas.compare_exchange(root.status_word(), root_status, root_status.froze());
902                mwcas.compare_exchange(
903                    child_node_pointer.status_word(),
904                    child_status,
905                    child_status.clone(),
906                );
907                mwcas.compare_exchange(&self.root, cur_root, child_node_pointer.clone());
908                if mwcas.exec(guard) {
909                    let mut drop = NodeDrop::new();
910                    drop.add(cur_root.clone());
911                    drop.exec(guard);
912                    cur_root = self.root.read(guard);
913                    continue;
914                }
915            }
916            break;
917        }
918        MergeResult::Completed
919    }
920
921    /// Merge node with one of it's sibling if possible.
922    fn merge_with_sibling<'g>(
923        &self,
924        node: &'g NodePointer<K, V>,
925        parent: &'g Parent<'g, K, V>,
926        parent_status: &StatusWord,
927        mwcas: &mut MwCas<'g>,
928        drop: &mut NodeDrop<K, V>,
929        guard: &'g Guard,
930    ) -> Result<InterimNode<K, V>, MergeResult<K, V>> {
931        let mut siblings: Vec<(&Key<K>, &NodePointer<K, V>)> = Vec::with_capacity(2);
932        let node_key = &parent.child_key;
933        let sibling_array = parent.node().get_siblings(node_key, guard);
934        if parent.node().status_word().read(guard) != parent_status {
935            // parent changed, found siblings can be invalid, retry
936            return Err(MergeResult::Retry);
937        }
938
939        for (key, sibling) in sibling_array.iter().flatten() {
940            siblings.push((key, sibling.read(guard)));
941        }
942
943        let mut merged_sibling_key: Option<&Key<K>> = None;
944        let mut merged: Option<(&Key<K>, NodePointer<K, V>)> = None;
945        for (sibling_key, sibling) in &siblings {
946            let sibling_status = sibling.status_word().read(guard);
947            if sibling_status.is_frozen() {
948                // sibling is frozen by merge in other thread, retry merge again because parent
949                // already changed and current merge will fail.
950                return Err(MergeResult::Retry);
951            }
952
953            // try merge node with sibling
954            match node {
955                Leaf(node) => match sibling {
956                    Leaf(other) => match node.merge_with_leaf(other, self.node_size, guard) {
957                        MergeMode::NewNode(merged_node) => {
958                            let node_key = std::cmp::max(&parent.child_key, sibling_key);
959                            let node_ptr = NodePointer::new_leaf(merged_node);
960                            merged = Some((node_key, node_ptr));
961                            merged_sibling_key = Some(*sibling_key);
962                            mwcas.compare_exchange(
963                                sibling.status_word(),
964                                sibling_status,
965                                sibling_status.froze(),
966                            );
967                            drop.add((*sibling).clone());
968                            break;
969                        }
970                        MergeMode::MergeFailed => {}
971                    },
972                    Interim(_) => {
973                        panic!("Can't merge leaf with interim node")
974                    }
975                },
976                Interim(node) => match sibling {
977                    Interim(other) => {
978                        match node.merge_with_interim(
979                            other,
980                            node.estimated_len(guard) + other.estimated_len(guard),
981                            guard,
982                        ) {
983                            MergeMode::NewNode(merged_node) => {
984                                let node_key = std::cmp::max(&parent.child_key, sibling_key);
985                                let node_ptr = NodePointer::new_interim(merged_node);
986                                merged = Some((node_key, node_ptr));
987                                merged_sibling_key = Some(*sibling_key);
988                                mwcas.compare_exchange(
989                                    sibling.status_word(),
990                                    sibling_status,
991                                    sibling_status.froze(),
992                                );
993                                drop.add((*sibling).clone());
994                                break;
995                            }
996                            MergeMode::MergeFailed => {}
997                        }
998                    }
999                    Leaf(_) => panic!("Can't merge interim node with leaf node"),
1000                },
1001            }
1002        }
1003
1004        if merged.is_none() {
1005            // no siblings have enough space to be merged
1006            return Err(MergeResult::Completed);
1007        }
1008
1009        let (new_node_key, new_node_ptr) = merged.unwrap();
1010        let merged_sibling_key = merged_sibling_key.unwrap();
1011        // create new parent node with merged node and without empty/merged siblings.
1012        let mut buffer = Vec::with_capacity(parent.node().estimated_len(guard) - 1);
1013        let merged_node_key = &parent.child_key;
1014        for (key, val) in parent.node().clone_content(guard) {
1015            if &key == merged_node_key {
1016                // replace underutilized node by merged one
1017                buffer.push((
1018                    (*new_node_key).clone(),
1019                    HeapPointer::new(new_node_ptr.clone()),
1020                ));
1021            } else if &key != merged_sibling_key {
1022                // remove merged sibling from parent node
1023                buffer.push((key, val));
1024            }
1025        }
1026        Ok(InterimNode::new_readonly(buffer))
1027    }
1028
1029    fn split_leaf(&self, path_to_leaf: TraversePath<K, V>, guard: &Guard) {
1030        let mut path = path_to_leaf;
1031        // try_split can return parent which should also be split
1032        // because it will overflow when we split passed leaf node
1033        while let Some(split_node) = self.try_split(path, guard) {
1034            path = split_node;
1035        }
1036    }
1037
1038    /// Try split passed node. If node requires split of it's parents, method return new split
1039    /// context with parent node as split node.
1040    fn try_split<'g>(
1041        &self,
1042        mut path: TraversePath<'g, K, V>,
1043        guard: &'g Guard,
1044    ) -> Option<TraversePath<'g, K, V>> {
1045        let parent = path.parents.pop();
1046        if parent.is_none() {
1047            self.split_root(path.node_pointer, guard);
1048            return None;
1049        }
1050
1051        if !path.node_pointer.try_froze(guard) {
1052            // someone already try to split/merge node
1053            return None;
1054        }
1055
1056        // freeze node to ensure that no one can modify it during split
1057        let parent = parent.unwrap();
1058        let parent_status = parent.node().status_word().read(guard);
1059        if parent_status.is_frozen() {
1060            path.node_pointer.try_unfroze(guard);
1061            return None;
1062        }
1063
1064        // node split results in new parent with replaced overflowed node by two new nodes
1065        match self.try_split_node(path.node_pointer, &parent.child_key, parent.node(), guard) {
1066            SplitResult::Split(new_parent) => {
1067                // node was split: create new parent which links to 2 new children which replace
1068                // original overflow node
1069                let mut mwcas = MwCas::new();
1070                mwcas.compare_exchange(
1071                    parent.node().status_word(),
1072                    parent_status,
1073                    parent_status.froze(),
1074                );
1075                if let Some(grand_parent) = path.parents.last() {
1076                    let status = grand_parent.node().status_word().read(guard);
1077                    if !status.is_frozen() {
1078                        // check that grandparent is not frozen and increase it version
1079                        // (because we make in-place update of link to parent node)
1080                        mwcas.compare_exchange(
1081                            grand_parent.node().status_word(),
1082                            status,
1083                            status.inc_version(),
1084                        );
1085                    } else {
1086                        // retry split again
1087                        path.node_pointer.try_unfroze(guard);
1088                        return None;
1089                    }
1090                }
1091
1092                // update link in grandparent node to new parent node
1093                // or parent is a root node which should be replaced by new one
1094                mwcas.compare_exchange(parent.cas_pointer, parent.node_pointer, new_parent);
1095                if !mwcas.exec(guard) {
1096                    path.node_pointer.try_unfroze(guard);
1097                } else {
1098                    let mut drop = NodeDrop::new();
1099                    drop.add(path.node_pointer.clone());
1100                    drop.add(parent.node_pointer.clone());
1101                    drop.exec(guard);
1102                }
1103                None
1104            }
1105            SplitResult::Compacted(compacted_node) => {
1106                // node overflow caused by too many updates => replace overflow node
1107                // with new compacted node which can accept more elements
1108                let mut mwcas = MwCas::new();
1109                mwcas.compare_exchange(
1110                    parent.node().status_word(),
1111                    parent_status,
1112                    parent_status.inc_version(),
1113                );
1114                mwcas.compare_exchange(path.cas_pointer, path.node_pointer, compacted_node);
1115                if !mwcas.exec(guard) {
1116                    path.node_pointer.try_unfroze(guard);
1117                } else {
1118                    let mut drop = NodeDrop::new();
1119                    drop.add(path.node_pointer.clone());
1120                    drop.exec(guard);
1121                }
1122                None
1123            }
1124            SplitResult::ParentOverflow => {
1125                // parent node is full and should be split before we can insert new child
1126                path.node_pointer.try_unfroze(guard);
1127                Some(TraversePath {
1128                    cas_pointer: parent.cas_pointer,
1129                    node_pointer: parent.node_pointer,
1130                    parents: path.parents,
1131                })
1132            }
1133        }
1134    }
1135
1136    fn split_root(&self, root: &NodePointer<K, V>, guard: &Guard) {
1137        if !root.try_froze(guard) {
1138            return;
1139        }
1140
1141        let new_root = match root {
1142            Leaf(cur_root) => {
1143                match cur_root.split_leaf(guard) {
1144                    SplitMode::Split(left, right) => {
1145                        // greatest key of left node moved to parent as split point
1146                        let left_key = left
1147                            .last_kv(guard)
1148                            .expect("Left node must have at least 1 element after split")
1149                            .0
1150                            .clone();
1151                        debug_assert!(
1152                            right.exact_len(guard) > 0,
1153                            "Right node must have at least 1 element after split"
1154                        );
1155                        // keys between (..left_key] in left
1156                        // keys between (left_key..+Inf] in right
1157                        let sorted_elements = vec![
1158                            (
1159                                Key::new(left_key),
1160                                HeapPointer::new(NodePointer::new_leaf(left)),
1161                            ),
1162                            (
1163                                Key::pos_infinite(),
1164                                HeapPointer::new(NodePointer::new_leaf(right)),
1165                            ),
1166                        ];
1167                        NodePointer::new_interim(InterimNode::new_readonly(sorted_elements))
1168                    }
1169                    SplitMode::Compact(compacted_root) => NodePointer::new_leaf(compacted_root),
1170                }
1171            }
1172            Interim(cur_root) => {
1173                match cur_root.split_interim(guard) {
1174                    SplitMode::Split(left, right) => {
1175                        let left_key = left
1176                            .last_kv(guard)
1177                            .expect("Left node must have at least 1 element after split")
1178                            .0
1179                            .clone();
1180                        let right_key = right
1181                            .last_kv(guard)
1182                            .expect("Right node must have at least 1 element after split")
1183                            .0
1184                            .clone();
1185                        // keys between (..left_key] in left
1186                        // keys between (left_key..right_key] in right
1187                        let sorted_elements = vec![
1188                            (left_key, HeapPointer::new(NodePointer::new_interim(left))),
1189                            (right_key, HeapPointer::new(NodePointer::new_interim(right))),
1190                        ];
1191                        NodePointer::new_interim(InterimNode::new_readonly(sorted_elements))
1192                    }
1193                    SplitMode::Compact(compacted_root) => NodePointer::new_interim(compacted_root),
1194                }
1195            }
1196        };
1197
1198        let mut mwcas = MwCas::new();
1199        mwcas.compare_exchange(&self.root, root, new_root);
1200        if mwcas.exec(guard) {
1201            let mut drop = NodeDrop::new();
1202            drop.add(root.clone());
1203            drop.exec(guard);
1204        } else {
1205            debug_assert!(false);
1206        }
1207    }
1208
1209    fn try_split_node(
1210        &self,
1211        node: &NodePointer<K, V>,
1212        overflow_node_key: &Key<K>,
1213        parent_node: &InterimNode<K, V>,
1214        guard: &Guard,
1215    ) -> SplitResult<K, V> {
1216        /// Create new parent node with 2 nodes created by split of other node.
1217        #[inline(always)]
1218        fn new_parent<K, V>(
1219            parent_node: &InterimNode<K, V>,
1220            overflow_node_key: &Key<K>,
1221            left_key: Key<K>,
1222            left_child: NodePointer<K, V>,
1223            right_child: NodePointer<K, V>,
1224            max_node_size: usize,
1225            guard: &Guard,
1226        ) -> Option<InterimNode<K, V>>
1227        where
1228            K: Clone + Ord,
1229        {
1230            let node_len = parent_node.estimated_len(guard);
1231            if node_len == max_node_size {
1232                // parent node overflow, should be split
1233                return None;
1234            }
1235
1236            let mut sorted_elems = Vec::with_capacity(node_len + 1);
1237            for (key, val) in parent_node.clone_content(guard) {
1238                // overflowed node found inside parent, replace it by 2 new nodes
1239                if overflow_node_key == &key {
1240                    sorted_elems.push((left_key.clone(), HeapPointer::new(left_child.clone())));
1241                    sorted_elems.push((
1242                        overflow_node_key.clone(),
1243                        HeapPointer::new(right_child.clone()),
1244                    ));
1245                } else {
1246                    sorted_elems.push((key, val));
1247                }
1248            }
1249
1250            Some(InterimNode::new_readonly(sorted_elems))
1251        }
1252
1253        match node {
1254            Leaf(leaf) => match leaf.split_leaf(guard) {
1255                SplitMode::Split(left, right) => {
1256                    let left_key = left
1257                        .last_kv(guard)
1258                        .expect("Left node must have at least 1 element after split")
1259                        .0
1260                        .clone();
1261                    if let Some(new_parent) = new_parent(
1262                        parent_node,
1263                        overflow_node_key,
1264                        Key::new(left_key),
1265                        NodePointer::new_leaf(left),
1266                        NodePointer::new_leaf(right),
1267                        self.node_size,
1268                        guard,
1269                    ) {
1270                        SplitResult::Split(NodePointer::new_interim(new_parent))
1271                    } else {
1272                        SplitResult::ParentOverflow
1273                    }
1274                }
1275                SplitMode::Compact(compacted_node) => {
1276                    SplitResult::Compacted(NodePointer::new_leaf(compacted_node))
1277                }
1278            },
1279            Interim(interim) => match interim.split_interim(guard) {
1280                SplitMode::Split(left, right) => {
1281                    let left_key = left
1282                        .last_kv(guard)
1283                        .expect("Left node must have at least 1 element after split")
1284                        .0
1285                        .clone();
1286                    if let Some(new_parent) = new_parent(
1287                        parent_node,
1288                        overflow_node_key,
1289                        left_key,
1290                        NodePointer::new_interim(left),
1291                        NodePointer::new_interim(right),
1292                        self.node_size,
1293                        guard,
1294                    ) {
1295                        SplitResult::Split(NodePointer::new_interim(new_parent))
1296                    } else {
1297                        SplitResult::ParentOverflow
1298                    }
1299                }
1300                SplitMode::Compact(compacted_node) => {
1301                    SplitResult::Compacted(NodePointer::new_interim(compacted_node))
1302                }
1303            },
1304        }
1305    }
1306
1307    /// Find leaf node which can contain passed key
1308    fn find_leaf_for_key<'g, Q>(
1309        &'g self,
1310        search_key: &Key<Q>,
1311        create_node_for_key: bool,
1312        guard: &'g Guard,
1313    ) -> Option<&'g LeafNode<K, V>>
1314    where
1315        K: Borrow<Q>,
1316        Q: Ord + Clone,
1317    {
1318        let mut next_node: &NodeLink<K, V> = &self.root;
1319        loop {
1320            next_node = match next_node.read(guard) {
1321                Interim(node) => {
1322                    if let Some((_, child_node_ptr)) = node.closest(search_key, guard) {
1323                        child_node_ptr
1324                    } else if create_node_for_key {
1325                        // slow path
1326                        return self
1327                            .find_path_to_key(search_key, create_node_for_key, guard)
1328                            .map(|path| path.node_pointer.to_leaf_node());
1329                    } else {
1330                        return None;
1331                    }
1332                }
1333                Leaf(node) => {
1334                    return Some(node);
1335                }
1336            }
1337        }
1338    }
1339
1340    /// Find leaf node(with traversal path from root) which can contain passed key.
1341    /// If no node exists which can contain passed key, it can be created using
1342    /// `create_pos_inf_node_if_not_exists` flag.
1343    fn find_path_to_key<'g, Q>(
1344        &'g self,
1345        search_key: &Key<Q>,
1346        create_node_for_key: bool,
1347        guard: &'g Guard,
1348    ) -> Option<TraversePath<'g, K, V>>
1349    where
1350        K: Borrow<Q>,
1351        Q: Ord,
1352    {
1353        let mut parents = Vec::new();
1354        let mut next_node: &NodeLink<K, V> = &self.root;
1355        loop {
1356            next_node = match next_node.read(guard) {
1357                node_pointer @ Interim(node) => {
1358                    if let Some((child_node_key, child_node_ptr)) = node.closest(search_key, guard)
1359                    {
1360                        // found node which can contain the key
1361                        parents.push(Parent {
1362                            cas_pointer: next_node,
1363                            node_pointer,
1364                            child_key: child_node_key.clone(),
1365                        });
1366                        child_node_ptr
1367                    } else if create_node_for_key {
1368                        // No place found for the key, we need to grow the tree.
1369                        // Greatest element of interim node will serve as +Inf node to
1370                        // accommodate new KV.
1371                        self.insert_pos_inf_node(
1372                            TraversePath {
1373                                cas_pointer: next_node,
1374                                node_pointer,
1375                                parents,
1376                            },
1377                            guard,
1378                        );
1379                        // tree is changed, restart from root
1380                        parents = Vec::new();
1381                        &self.root
1382                    } else {
1383                        return None;
1384                    }
1385                }
1386                leaf_pointer @ Leaf(_) => {
1387                    return Some(TraversePath {
1388                        cas_pointer: next_node,
1389                        node_pointer: leaf_pointer,
1390                        parents,
1391                    });
1392                }
1393            }
1394        }
1395    }
1396
1397    /// When interim node doesn't contains +Inf node and client tries to insert key which is
1398    /// greater than any other key in the tree, we should update interim's node which
1399    /// contains link to leaf with greatest element. This update will replace key of greatest
1400    /// element in interim node by +Inf key. This updated interim can now accept new keys which
1401    /// are greater than other keys of the tree.
1402    fn insert_pos_inf_node<'g>(&'g self, mut path: TraversePath<K, V>, guard: &'g Guard) -> bool {
1403        let parent = Parent {
1404            child_key: Key::pos_infinite(), // key is ignored in this context
1405            node_pointer: path.node_pointer,
1406            cas_pointer: path.cas_pointer,
1407        };
1408        path.parents.push(parent);
1409
1410        let mut drop = NodeDrop::new();
1411        // update key of greatest node to +Inf on all levels of tree
1412        let mut mwcas = MwCas::new();
1413        for interim in path.parents {
1414            let node = interim.node_pointer.to_interim_node();
1415            let status_word = node.status_word().read(guard);
1416            if status_word.is_frozen() {
1417                return false;
1418            }
1419            mwcas.compare_exchange(
1420                interim.node_pointer.status_word(),
1421                status_word,
1422                status_word.inc_version(),
1423            );
1424
1425            let mut cloned_kvs = node.clone_content(guard);
1426            cloned_kvs.last_mut().unwrap().0 = Key::pos_infinite();
1427            let new_interim = InterimNode::new_readonly(cloned_kvs);
1428            // update link in parent node to new interim node
1429            drop.add(interim.node_pointer.clone());
1430            mwcas.compare_exchange(
1431                interim.cas_pointer,
1432                interim.node_pointer,
1433                NodePointer::new_interim(new_interim),
1434            );
1435        }
1436
1437        if mwcas.exec(guard) {
1438            drop.exec(guard);
1439            true
1440        } else {
1441            false
1442        }
1443    }
1444
1445    /// Return leaf node which contains max known key of the tree. Also return key in parent node
1446    /// which points to this leaf(or `None` if this is root node).
1447    fn find_edge_node<'g, Q>(
1448        &'g self,
1449        node_edge: NodeEdge,
1450        guard: &'g Guard,
1451    ) -> (&'g LeafNode<K, V>, Option<&'g Key<K>>)
1452    where
1453        K: Borrow<Q>,
1454        Q: Ord,
1455    {
1456        let mut next_node: (&NodeLink<K, V>, Option<&Key<K>>) = (&self.root, None);
1457        loop {
1458            next_node = match next_node.0.read(guard) {
1459                Interim(node) => {
1460                    let (key, child_node_ptr) = node
1461                        .edge_kv(node_edge, guard)
1462                        .map(|e| (e.key, e.value))
1463                        .expect("Tree never contains empty interim nodes");
1464
1465                    (child_node_ptr, Some(key))
1466                }
1467                Leaf(node) => {
1468                    return (node, next_node.1);
1469                }
1470            }
1471        }
1472    }
1473}
1474
1475impl<K, V> Default for BzTree<K, V>
1476where
1477    K: Clone + Ord,
1478    V: Clone + Send + Sync,
1479{
1480    fn default() -> Self {
1481        BzTree::new()
1482    }
1483}
1484
1485enum MergeResult<'g, K: Ord, V> {
1486    Completed,
1487    RecursiveMerge(TraversePath<'g, K, V>),
1488    Retry,
1489}
1490
1491enum SplitResult<K: Ord, V> {
1492    Split(NodePointer<K, V>),
1493    Compacted(NodePointer<K, V>),
1494    ParentOverflow,
1495}
1496
1497struct TraversePath<'g, K: Ord, V> {
1498    /// Node pointer of this node inside parent(used by MwCAS).
1499    cas_pointer: &'g NodeLink<K, V>,
1500    /// Pointer to found node inside tree(read from CAS pointer during traversal)
1501    node_pointer: &'g NodePointer<K, V>,
1502    /// Chain of parents including root node(starts from root, vector end is most closest parent)
1503    parents: Vec<Parent<'g, K, V>>,
1504}
1505
1506#[repr(transparent)]
1507struct LeafNodeRef<K: Ord, V> {
1508    node: *mut LeafNode<K, V>,
1509}
1510
1511impl<K: Ord, V> LeafNodeRef<K, V> {
1512    fn new(leaf: LeafNode<K, V>) -> Self {
1513        LeafNodeRef {
1514            node: Box::into_raw(Box::new(leaf)),
1515        }
1516    }
1517
1518    fn drop_manual(self, guard: &Guard) {
1519        unsafe {
1520            let node = Box::from_raw(self.node);
1521            guard.defer_unchecked(move || {
1522                drop(node);
1523            });
1524        }
1525    }
1526}
1527
1528impl<K: Ord, V> Deref for LeafNodeRef<K, V> {
1529    type Target = LeafNode<K, V>;
1530
1531    fn deref(&self) -> &Self::Target {
1532        unsafe { &*self.node }
1533    }
1534}
1535
1536impl<K: Ord, V> DerefMut for LeafNodeRef<K, V> {
1537    fn deref_mut(&mut self) -> &mut Self::Target {
1538        unsafe { &mut *self.node }
1539    }
1540}
1541
1542impl<K: Ord, V> Clone for LeafNodeRef<K, V> {
1543    fn clone(&self) -> Self {
1544        LeafNodeRef { node: self.node }
1545    }
1546}
1547
1548unsafe impl<K: Ord, V> Send for LeafNodeRef<K, V> {}
1549unsafe impl<K: Ord, V> Sync for LeafNodeRef<K, V> {}
1550
1551#[repr(transparent)]
1552struct InterimNodeRef<K: Ord, V> {
1553    node: *mut InterimNode<K, V>,
1554}
1555
1556impl<K: Ord, V> InterimNodeRef<K, V> {
1557    fn new(interim: InterimNode<K, V>) -> Self {
1558        InterimNodeRef {
1559            node: Box::into_raw(Box::new(interim)),
1560        }
1561    }
1562
1563    fn drop_manual(self, guard: &Guard) {
1564        unsafe {
1565            let node = Box::from_raw(self.node);
1566            guard.defer_unchecked(move || {
1567                drop(node);
1568            });
1569        }
1570    }
1571}
1572
1573impl<K: Ord, V> Deref for InterimNodeRef<K, V> {
1574    type Target = InterimNode<K, V>;
1575
1576    fn deref(&self) -> &Self::Target {
1577        unsafe { &*self.node }
1578    }
1579}
1580
1581impl<K: Ord, V> DerefMut for InterimNodeRef<K, V> {
1582    fn deref_mut(&mut self) -> &mut Self::Target {
1583        unsafe { &mut *self.node }
1584    }
1585}
1586
1587impl<K: Ord, V> Clone for InterimNodeRef<K, V> {
1588    fn clone(&self) -> Self {
1589        InterimNodeRef { node: self.node }
1590    }
1591}
1592
1593unsafe impl<K: Ord, V> Send for InterimNodeRef<K, V> {}
1594unsafe impl<K: Ord, V> Sync for InterimNodeRef<K, V> {}
1595
1596struct NodeDrop<K: Ord, V> {
1597    nodes: Vec<NodePointer<K, V>>,
1598}
1599
1600impl<K: Ord, V> NodeDrop<K, V> {
1601    fn new() -> Self {
1602        NodeDrop {
1603            nodes: Vec::with_capacity(2),
1604        }
1605    }
1606
1607    fn add(&mut self, node: NodePointer<K, V>) {
1608        self.nodes.push(node);
1609    }
1610
1611    fn exec(self, guard: &Guard) {
1612        for node in self.nodes {
1613            match node {
1614                NodePointer::Interim(node) => node.drop_manual(guard),
1615                NodePointer::Leaf(node) => node.drop_manual(guard),
1616            }
1617        }
1618    }
1619}
1620
1621enum NodePointer<K: Ord, V> {
1622    Leaf(LeafNodeRef<K, V>),
1623    Interim(InterimNodeRef<K, V>),
1624}
1625
1626impl<K: Ord, V> Clone for NodePointer<K, V> {
1627    fn clone(&self) -> Self {
1628        match self {
1629            Leaf(node) => Leaf(node.clone()),
1630            Interim(node) => Interim(node.clone()),
1631        }
1632    }
1633}
1634
1635unsafe impl<K: Ord, V> Send for NodePointer<K, V> {}
1636
1637unsafe impl<K: Ord, V> Sync for NodePointer<K, V> {}
1638
1639impl<K: Ord, V> From<InterimNodeRef<K, V>> for NodePointer<K, V> {
1640    fn from(node: InterimNodeRef<K, V>) -> Self {
1641        Interim(node)
1642    }
1643}
1644
1645impl<K: Ord, V> NodePointer<K, V> {
1646    #[inline]
1647    fn new_leaf(node: LeafNode<K, V>) -> NodePointer<K, V> {
1648        let leaf_node = LeafNodeRef::new(node);
1649        Leaf(leaf_node)
1650    }
1651
1652    #[inline]
1653    fn new_interim(node: InterimNode<K, V>) -> NodePointer<K, V> {
1654        let interim_node = InterimNodeRef::new(node);
1655        Interim(interim_node)
1656    }
1657
1658    #[inline]
1659    fn points_to_leaf(&self, node_ptr: &LeafNode<K, V>) -> bool {
1660        match self {
1661            Interim(_) => false,
1662            Leaf(node) => ptr::eq(node.node, node_ptr),
1663        }
1664    }
1665
1666    #[inline]
1667    fn to_interim_node(&self) -> &InterimNode<K, V> {
1668        match self {
1669            Interim(node) => node,
1670            Leaf(_) => panic!("Pointer points to leaf node"),
1671        }
1672    }
1673
1674    #[inline]
1675    fn to_leaf_node(&self) -> &LeafNode<K, V> {
1676        match self {
1677            Interim(_) => panic!("Pointer points to interim node"),
1678            Leaf(node) => node,
1679        }
1680    }
1681
1682    #[inline]
1683    fn len(&self, guard: &Guard) -> usize
1684    where
1685        K: Ord,
1686    {
1687        match self {
1688            Leaf(node) => node.exact_len(guard),
1689            Interim(node) => node.estimated_len(guard),
1690        }
1691    }
1692
1693    #[inline]
1694    fn try_froze(&self, guard: &Guard) -> bool {
1695        match self {
1696            Leaf(node) => node.try_froze(guard),
1697            Interim(node) => node.try_froze(guard),
1698        }
1699    }
1700
1701    #[inline]
1702    fn try_unfroze(&self, guard: &Guard) -> bool {
1703        match self {
1704            Leaf(node) => node.try_unfroze(guard),
1705            Interim(node) => node.try_unfroze(guard),
1706        }
1707    }
1708
1709    #[inline]
1710    fn status_word(&self) -> &HeapPointer<StatusWord> {
1711        match self {
1712            Leaf(node) => node.status_word(),
1713            Interim(node) => node.status_word(),
1714        }
1715    }
1716}
1717
1718/// Special wrapper for tree keys which can hold special 'empty key'.
1719/// Empty key means 'positive infinity' key, which always great than any other key.
1720/// Such key used as guard element in node and get access to keys greater than
1721/// any other keys in node.
1722#[derive(Clone, Debug)]
1723#[repr(transparent)]
1724struct Key<K> {
1725    key: Option<K>,
1726}
1727
1728impl<K> Key<K> {
1729    #[inline(always)]
1730    fn new(key: K) -> Self {
1731        Key { key: Some(key) }
1732    }
1733
1734    #[inline(always)]
1735    fn pos_infinite() -> Self {
1736        Key { key: None }
1737    }
1738}
1739
1740impl<K: Ord> Ord for Key<K> {
1741    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
1742        match &self.key {
1743            Some(key) => match &other.key {
1744                Some(other_key) => key.cmp(other_key),
1745                None => std::cmp::Ordering::Less,
1746            },
1747            None => match other.key {
1748                Some(_) => std::cmp::Ordering::Greater,
1749                None => std::cmp::Ordering::Equal,
1750            },
1751        }
1752    }
1753}
1754
1755/// Impl used by [`Node`] to find siblings or closest node to some other node.
1756impl<K: Borrow<Q>, Q: Ord> PartialOrd<Key<Q>> for Key<K> {
1757    fn partial_cmp(&self, other: &Key<Q>) -> Option<std::cmp::Ordering> {
1758        match &self.key {
1759            Some(key) => match &other.key {
1760                Some(other_key) => Some(key.borrow().cmp(other_key)),
1761                None => Some(std::cmp::Ordering::Less),
1762            },
1763            None => match other.key {
1764                Some(_) => Some(std::cmp::Ordering::Greater),
1765                None => Some(std::cmp::Ordering::Equal),
1766            },
1767        }
1768    }
1769}
1770
1771impl<K: Eq> Eq for Key<K> {}
1772
1773impl<K: Borrow<Q>, Q: Eq> PartialEq<Key<Q>> for Key<K> {
1774    fn eq(&self, other: &Key<Q>) -> bool {
1775        match &self.key {
1776            Some(key) => matches!(&other.key, Some(other_key) if key.borrow() == other_key),
1777            None => other.key.is_none(),
1778        }
1779    }
1780}
1781
1782struct Parent<'a, K: Ord, V> {
1783    /// Node pointer of this parent inside grandparent(used by MwCAS).
1784    cas_pointer: &'a NodeLink<K, V>,
1785    /// Parent node pointer inside grandparent at moment of tree traversal(actual value read from
1786    /// cas_pointer at some moment in time).
1787    node_pointer: &'a NodePointer<K, V>,
1788    /// Key in parent node which points to leaf/non-leaf child.
1789    child_key: Key<K>,
1790}
1791
1792impl<'a, K: Ord, V> Parent<'a, K, V> {
1793    fn node(&self) -> &InterimNode<K, V> {
1794        self.node_pointer.to_interim_node()
1795    }
1796}
1797
1798#[cfg(test)]
1799mod tests {
1800    use rand::{thread_rng, Rng};
1801    use std::borrow::Borrow;
1802    use std::fmt::{Debug, Display, Formatter};
1803
1804    use crate::BzTree;
1805
1806    #[test]
1807    fn insert_min_sized_node() {
1808        let tree = BzTree::with_node_size(2);
1809        tree.insert(Key::new("1"), "1", &crossbeam_epoch::pin());
1810        tree.insert(Key::new("2"), "2", &crossbeam_epoch::pin());
1811        tree.insert(Key::new("3"), "3", &crossbeam_epoch::pin());
1812
1813        let guard = crossbeam_epoch::pin();
1814        assert!(matches!(tree.get(&"1", &guard), Some(&"1")));
1815        assert!(matches!(tree.get(&"2", &guard), Some(&"2")));
1816        assert!(matches!(tree.get(&"3", &guard), Some(&"3")));
1817
1818        let tree = BzTree::with_node_size(2);
1819        tree.insert(Key::new("1"), "1", &crossbeam_epoch::pin());
1820        tree.insert(Key::new("2"), "2", &crossbeam_epoch::pin());
1821
1822        let guard = crossbeam_epoch::pin();
1823        assert!(matches!(tree.get(&"1", &guard), Some(&"1")));
1824        assert!(matches!(tree.get(&"2", &guard), Some(&"2")));
1825        assert!(matches!(tree.iter(&guard).count(), 2));
1826    }
1827
1828    #[test]
1829    fn insert_full_nodess() {
1830        let nodes = 5;
1831        for i in 1..=nodes {
1832            for size in [
1833                2,
1834                thread_rng().gen_range(3..500),
1835                thread_rng().gen_range(3..500),
1836                thread_rng().gen_range(3..500),
1837            ] {
1838                let tree = BzTree::with_node_size(size);
1839                for i in 0..size * i {
1840                    let expected_key = i.to_string();
1841                    let expected_val = i.to_string();
1842                    tree.insert(
1843                        Key::new(expected_key.clone()),
1844                        expected_val.clone(),
1845                        &crossbeam_epoch::pin(),
1846                    );
1847
1848                    assert!(matches!(
1849                        tree.get(&expected_key, &crossbeam_epoch::pin()),
1850                        Some(val) if val == &expected_val
1851                    ));
1852                }
1853
1854                for i in 0..size * i {
1855                    let expected_key = i.to_string();
1856                    let expected_val = i.to_string();
1857                    assert!(matches!(
1858                        tree.get(&expected_key, &crossbeam_epoch::pin()),
1859                        Some(val) if val == &expected_val
1860                    ));
1861                }
1862            }
1863        }
1864    }
1865
1866    #[test]
1867    fn upsert_min_sized_node() {
1868        let tree = BzTree::with_node_size(2);
1869        tree.upsert(Key::new("1"), "1", &crossbeam_epoch::pin());
1870        tree.upsert(Key::new("2"), "2", &crossbeam_epoch::pin());
1871        tree.upsert(Key::new("3"), "3", &crossbeam_epoch::pin());
1872
1873        let guard = crossbeam_epoch::pin();
1874        assert!(matches!(tree.get(&"1", &guard), Some(&"1")));
1875        assert!(matches!(tree.get(&"2", &guard), Some(&"2")));
1876        assert!(matches!(tree.get(&"3", &guard), Some(&"3")));
1877
1878        let tree = BzTree::with_node_size(2);
1879        tree.upsert(Key::new("1"), "1", &crossbeam_epoch::pin());
1880        tree.upsert(Key::new("2"), "2", &crossbeam_epoch::pin());
1881
1882        let guard = crossbeam_epoch::pin();
1883        assert!(matches!(tree.get(&"1", &guard), Some(&"1")));
1884        assert!(matches!(tree.get(&"2", &guard), Some(&"2")));
1885        assert!(matches!(tree.iter(&guard).count(), 2));
1886    }
1887
1888    #[test]
1889    fn upsert_full_nodess() {
1890        let nodes = 5;
1891        for i in 1..=nodes {
1892            for size in [
1893                2,
1894                thread_rng().gen_range(3..500),
1895                thread_rng().gen_range(3..500),
1896                thread_rng().gen_range(3..500),
1897            ] {
1898                let tree = BzTree::with_node_size(size);
1899                for i in 0..size * i {
1900                    let expected_key = i.to_string();
1901                    let expected_val = i.to_string();
1902                    tree.upsert(
1903                        Key::new(expected_key.clone()),
1904                        expected_val.clone(),
1905                        &crossbeam_epoch::pin(),
1906                    );
1907
1908                    assert!(matches!(
1909                        tree.get(&expected_key, &crossbeam_epoch::pin()),
1910                        Some(val) if val == &expected_val
1911                    ));
1912                }
1913
1914                for i in 0..size * i {
1915                    let expected_key = i.to_string();
1916                    let expected_val = i.to_string();
1917                    assert!(matches!(
1918                        tree.get(&expected_key, &crossbeam_epoch::pin()),
1919                        Some(val) if val == &expected_val
1920                    ));
1921                }
1922            }
1923        }
1924    }
1925
1926    #[test]
1927    fn forward_delete() {
1928        let nodes = 5;
1929        for i in 1..=nodes {
1930            for size in [
1931                2,
1932                thread_rng().gen_range(3..500),
1933                thread_rng().gen_range(3..500),
1934                thread_rng().gen_range(3..500),
1935            ] {
1936                let tree = BzTree::with_node_size(size);
1937                for i in 0..size * i {
1938                    let expected_key = i.to_string();
1939                    let expected_val = i.to_string();
1940                    tree.insert(
1941                        Key::new(expected_key.clone()),
1942                        expected_val.clone(),
1943                        &crossbeam_epoch::pin(),
1944                    );
1945                }
1946
1947                for i in 0..size * i {
1948                    let expected_key = i.to_string();
1949                    let expected_val = i.to_string();
1950                    assert!(matches!(
1951                        tree.delete(&expected_key, &crossbeam_epoch::pin()),
1952                        Some(val) if val == &expected_val
1953                    ));
1954                }
1955
1956                assert_eq!(tree.iter(&crossbeam_epoch::pin()).count(), 0);
1957
1958                let tree = BzTree::with_node_size(size);
1959                for i in 0..size * i {
1960                    let expected_key = i.to_string();
1961                    let expected_val = i.to_string();
1962                    tree.insert(
1963                        Key::new(expected_key.clone()),
1964                        expected_val.clone(),
1965                        &crossbeam_epoch::pin(),
1966                    );
1967                    assert!(matches!(
1968                        tree.delete(&expected_key, &crossbeam_epoch::pin()),
1969                        Some(val) if val == &expected_val
1970                    ));
1971                }
1972
1973                assert_eq!(tree.iter(&crossbeam_epoch::pin()).count(), 0);
1974            }
1975        }
1976    }
1977
1978    #[test]
1979    fn backward_delete() {
1980        let nodes = 5;
1981        for i in 1..=nodes {
1982            for size in [
1983                2,
1984                thread_rng().gen_range(3..500),
1985                thread_rng().gen_range(3..500),
1986                thread_rng().gen_range(3..500),
1987            ] {
1988                let tree = BzTree::with_node_size(size);
1989                for i in 0..size * i {
1990                    let expected_key = i.to_string();
1991                    let expected_val = i.to_string();
1992                    tree.insert(
1993                        Key::new(expected_key.clone()),
1994                        expected_val.clone(),
1995                        &crossbeam_epoch::pin(),
1996                    );
1997                }
1998
1999                for i in (0..size * i).rev() {
2000                    let expected_key = i.to_string();
2001                    let expected_val = i.to_string();
2002                    assert!(matches!(
2003                        tree.delete(&expected_key, &crossbeam_epoch::pin()),
2004                        Some(val) if val == &expected_val
2005                    ));
2006                }
2007
2008                assert_eq!(tree.iter(&crossbeam_epoch::pin()).count(), 0);
2009            }
2010        }
2011    }
2012
2013    #[test]
2014    fn delete_from_midpoint() {
2015        let nodes = 5;
2016        for i in 1..=nodes {
2017            for size in [
2018                2,
2019                thread_rng().gen_range(3..500),
2020                thread_rng().gen_range(3..500),
2021                thread_rng().gen_range(3..500),
2022            ] {
2023                let tree = BzTree::with_node_size(size);
2024                for i in 0..size * i {
2025                    let expected_key = i.to_string();
2026                    let expected_val = i.to_string();
2027                    tree.insert(
2028                        Key::new(expected_key.clone()),
2029                        expected_val.clone(),
2030                        &crossbeam_epoch::pin(),
2031                    );
2032                }
2033
2034                let vec = (0..(size * i) as usize).collect::<Vec<usize>>();
2035                let (left, right) = vec.split_at((size * i / 2) as usize);
2036
2037                let mut left_iter = left.iter().rev();
2038                let mut right_iter = right.iter().rev();
2039                loop {
2040                    let next = if thread_rng().gen_bool(0.5) {
2041                        left_iter.next().or_else(|| right_iter.next())
2042                    } else {
2043                        right_iter.next().or_else(|| left_iter.next())
2044                    };
2045                    if next.is_none() {
2046                        break;
2047                    }
2048
2049                    let i = next.unwrap();
2050                    let expected_key = i.to_string();
2051                    let expected_val = i.to_string();
2052                    assert!(matches!(
2053                        tree.delete(&expected_key, &crossbeam_epoch::pin()),
2054                        Some(val) if val == &expected_val
2055                    ));
2056                }
2057
2058                assert_eq!(tree.iter(&crossbeam_epoch::pin()).count(), 0);
2059            }
2060        }
2061    }
2062
2063    #[test]
2064    fn delete_to_midpoint() {
2065        let nodes = 5;
2066        for i in 1..=nodes {
2067            for size in [
2068                2,
2069                thread_rng().gen_range(3..500),
2070                thread_rng().gen_range(3..500),
2071                thread_rng().gen_range(3..500),
2072            ] {
2073                let tree = BzTree::with_node_size(size);
2074                for i in 0..size * i {
2075                    let expected_key = i.to_string();
2076                    let expected_val = i.to_string();
2077                    tree.insert(
2078                        Key::new(expected_key.clone()),
2079                        expected_val.clone(),
2080                        &crossbeam_epoch::pin(),
2081                    );
2082                }
2083
2084                let vec = (0..(size * i) as usize).collect::<Vec<usize>>();
2085                let (left, right) = vec.split_at((size * i / 2) as usize);
2086
2087                let mut left_iter = left.iter();
2088                let mut right_iter = right.iter();
2089                loop {
2090                    let next = if thread_rng().gen_bool(0.5) {
2091                        left_iter.next().or_else(|| right_iter.next())
2092                    } else {
2093                        right_iter.next().or_else(|| left_iter.next())
2094                    };
2095                    if next.is_none() {
2096                        break;
2097                    }
2098
2099                    let i = next.unwrap();
2100                    let expected_key = i.to_string();
2101                    let expected_val = i.to_string();
2102                    assert!(matches!(
2103                        tree.delete(&expected_key, &crossbeam_epoch::pin()),
2104                        Some(val) if val == &expected_val
2105                    ));
2106                }
2107
2108                assert_eq!(tree.iter(&crossbeam_epoch::pin()).count(), 0);
2109            }
2110        }
2111    }
2112
2113    #[test]
2114    fn forward_scan() {
2115        let tree = BzTree::with_node_size(2);
2116        tree.insert(Key::new("1"), String::from("1"), &crossbeam_epoch::pin());
2117        tree.insert(Key::new("2"), String::from("2"), &crossbeam_epoch::pin());
2118        tree.insert(Key::new("3"), String::from("3"), &crossbeam_epoch::pin());
2119        tree.insert(Key::new("4"), String::from("4"), &crossbeam_epoch::pin());
2120        tree.insert(Key::new("5"), String::from("5"), &crossbeam_epoch::pin());
2121
2122        let guard = crossbeam_epoch::pin();
2123        assert!(tree.range(Key::new("6").., &guard).next().is_none());
2124
2125        let mut iter = tree.range(Key::new("2").., &guard);
2126        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("2")));
2127        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("3")));
2128        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("4")));
2129        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("5")));
2130        assert!(iter.next().is_none());
2131
2132        let mut iter = tree.range(Key::new("3").., &guard);
2133        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("3")));
2134        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("4")));
2135        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("5")));
2136        assert!(iter.next().is_none());
2137
2138        let mut iter = tree.range(Key::new("1").., &guard);
2139        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("1")));
2140        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("2")));
2141        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("3")));
2142        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("4")));
2143        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("5")));
2144        assert!(iter.next().is_none());
2145
2146        let mut iter = tree.range(Key::new("3")..=Key::new("6"), &guard);
2147        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("3")));
2148        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("4")));
2149        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("5")));
2150        assert!(iter.next().is_none());
2151
2152        let mut iter = tree.range(Key::new("2")..Key::new("4"), &guard);
2153        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("2")));
2154        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("3")));
2155        assert!(iter.next().is_none());
2156
2157        let mut iter = tree.range(..=Key::new("5"), &guard);
2158        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("1")));
2159        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("2")));
2160        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("3")));
2161        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("4")));
2162        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("5")));
2163        assert!(iter.next().is_none());
2164
2165        let mut iter = tree.range(..Key::new("6"), &guard);
2166        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("1")));
2167        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("2")));
2168        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("3")));
2169        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("4")));
2170        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("5")));
2171        assert!(iter.next().is_none());
2172    }
2173
2174    #[test]
2175    fn reversed_scan() {
2176        let tree = BzTree::with_node_size(2);
2177        tree.insert(Key::new("1"), String::from("1"), &crossbeam_epoch::pin());
2178        tree.insert(Key::new("2"), String::from("2"), &crossbeam_epoch::pin());
2179        tree.insert(Key::new("3"), String::from("3"), &crossbeam_epoch::pin());
2180        tree.insert(Key::new("4"), String::from("4"), &crossbeam_epoch::pin());
2181        tree.insert(Key::new("5"), String::from("5"), &crossbeam_epoch::pin());
2182
2183        let guard = crossbeam_epoch::pin();
2184        let mut iter = tree.range(Key::new("1").., &guard).rev();
2185        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("5")));
2186        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("4")));
2187        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("3")));
2188        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("2")));
2189        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("1")));
2190        assert!(iter.next().is_none());
2191
2192        let mut iter = tree.range(Key::new("3").., &guard).rev();
2193        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("5")));
2194        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("4")));
2195        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("3")));
2196        assert!(iter.next().is_none());
2197
2198        let mut iter = tree.range(..Key::new("3"), &guard).rev();
2199        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("2")));
2200        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("1")));
2201        assert!(iter.next().is_none());
2202
2203        let mut iter = tree.range(..=Key::new("3"), &guard).rev();
2204        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("3")));
2205        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("2")));
2206        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("1")));
2207        assert!(iter.next().is_none());
2208
2209        let mut iter = tree.range(Key::new("2")..Key::new("4"), &guard).rev();
2210        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("3")));
2211        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("2")));
2212        assert!(iter.next().is_none());
2213
2214        let mut iter = tree.range(Key::new("2")..=Key::new("4"), &guard).rev();
2215        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("4")));
2216        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("3")));
2217        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("2")));
2218        assert!(iter.next().is_none());
2219    }
2220
2221    #[test]
2222    fn reversed_scan_with_deletes() {
2223        let tree = BzTree::with_node_size(2);
2224        tree.insert(Key::new("1"), String::from("1"), &crossbeam_epoch::pin());
2225        tree.insert(Key::new("2"), String::from("2"), &crossbeam_epoch::pin());
2226        tree.insert(Key::new("3"), String::from("3"), &crossbeam_epoch::pin());
2227        tree.insert(Key::new("4"), String::from("4"), &crossbeam_epoch::pin());
2228        tree.insert(Key::new("5"), String::from("5"), &crossbeam_epoch::pin());
2229        tree.insert(Key::new("6"), String::from("6"), &crossbeam_epoch::pin());
2230        tree.insert(Key::new("7"), String::from("7"), &crossbeam_epoch::pin());
2231
2232        let guard = crossbeam_epoch::pin();
2233        tree.delete(&"1", &guard).unwrap();
2234        tree.delete(&"2", &guard).unwrap();
2235        tree.delete(&"4", &guard).unwrap();
2236        tree.delete(&"6", &guard).unwrap();
2237        tree.delete(&"7", &guard).unwrap();
2238
2239        let mut iter = tree.range(Key::new("0").., &guard).rev();
2240        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("5")));
2241        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("3")));
2242        assert!(iter.next().is_none());
2243
2244        let mut iter = tree.range(Key::new("2").., &guard).rev();
2245        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("5")));
2246        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("3")));
2247        assert!(iter.next().is_none());
2248
2249        let mut iter = tree.range(Key::new("3").., &guard).rev();
2250        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("5")));
2251        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("3")));
2252        assert!(iter.next().is_none());
2253
2254        let mut iter = tree.range(..Key::new("3"), &guard).rev();
2255        assert!(iter.next().is_none());
2256
2257        let mut iter = tree.range(..=Key::new("3"), &guard).rev();
2258        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("3")));
2259        assert!(iter.next().is_none());
2260
2261        let mut iter = tree.range(..=Key::new("5"), &guard).rev();
2262        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("5")));
2263        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("3")));
2264        assert!(iter.next().is_none());
2265    }
2266
2267    #[test]
2268    fn mixed_scan() {
2269        let tree = BzTree::with_node_size(3);
2270        tree.insert(
2271            Key::new(String::from("1")),
2272            String::from("1"),
2273            &crossbeam_epoch::pin(),
2274        );
2275        tree.insert(
2276            Key::new(String::from("2")),
2277            String::from("2"),
2278            &crossbeam_epoch::pin(),
2279        );
2280        tree.insert(
2281            Key::new(String::from("3")),
2282            String::from("3"),
2283            &crossbeam_epoch::pin(),
2284        );
2285        tree.insert(
2286            Key::new(String::from("4")),
2287            String::from("4"),
2288            &crossbeam_epoch::pin(),
2289        );
2290        tree.insert(
2291            Key::new(String::from("5")),
2292            String::from("5"),
2293            &crossbeam_epoch::pin(),
2294        );
2295
2296        let guard = crossbeam_epoch::pin();
2297        let mut iter = tree.range((Key::new(String::from("1"))).., &guard);
2298        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("1")));
2299        assert!(matches!(iter.next_back(), Some((_, val)) if val == &String::from("5")));
2300        assert!(matches!(iter.next_back(), Some((_, val)) if val == &String::from("4")));
2301        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("2")));
2302        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("3")));
2303        assert!(iter.next().is_none());
2304    }
2305
2306    #[test]
2307    fn mixed_scan_on_root_node() {
2308        // size of tree node is greater than count of elements,
2309        // e.g. all elements placed in leaf root node
2310        let tree = BzTree::with_node_size(30);
2311        tree.insert(
2312            Key::new(String::from("1")),
2313            String::from("1"),
2314            &crossbeam_epoch::pin(),
2315        );
2316        tree.insert(
2317            Key::new(String::from("2")),
2318            String::from("2"),
2319            &crossbeam_epoch::pin(),
2320        );
2321        tree.insert(
2322            Key::new(String::from("3")),
2323            String::from("3"),
2324            &crossbeam_epoch::pin(),
2325        );
2326        tree.insert(
2327            Key::new(String::from("4")),
2328            String::from("4"),
2329            &crossbeam_epoch::pin(),
2330        );
2331        tree.insert(
2332            Key::new(String::from("5")),
2333            String::from("5"),
2334            &crossbeam_epoch::pin(),
2335        );
2336
2337        let guard = crossbeam_epoch::pin();
2338        let mut iter = tree.range((Key::new(String::from("1"))).., &guard);
2339        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("1")));
2340        assert!(matches!(iter.next_back(), Some((_, val)) if val == &String::from("5")));
2341        assert!(matches!(iter.next_back(), Some((_, val)) if val == &String::from("4")));
2342        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("2")));
2343        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("3")));
2344        assert!(iter.next().is_none());
2345    }
2346
2347    #[test]
2348    fn scan_after_delete() {
2349        let tree = BzTree::with_node_size(3);
2350        tree.insert(Key::new(1), String::from("1"), &crossbeam_epoch::pin());
2351        tree.insert(Key::new(2), String::from("2"), &crossbeam_epoch::pin());
2352        tree.insert(Key::new(3), String::from("3"), &crossbeam_epoch::pin());
2353        tree.insert(Key::new(4), String::from("4"), &crossbeam_epoch::pin());
2354        tree.insert(Key::new(5), String::from("5"), &crossbeam_epoch::pin());
2355        tree.insert(Key::new(6), String::from("6"), &crossbeam_epoch::pin());
2356        tree.insert(Key::new(7), String::from("7"), &crossbeam_epoch::pin());
2357        tree.insert(Key::new(8), String::from("8"), &crossbeam_epoch::pin());
2358
2359        let guard = crossbeam_epoch::pin();
2360        tree.delete(&1, &guard).unwrap();
2361        tree.delete(&2, &guard).unwrap();
2362        tree.delete(&5, &guard).unwrap();
2363        tree.delete(&7, &guard).unwrap();
2364        tree.delete(&8, &guard).unwrap();
2365        let mut iter = tree.range(Key::new(1).., &guard);
2366        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("3")));
2367        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("4")));
2368        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("6")));
2369        assert!(iter.next().is_none());
2370
2371        let mut iter = tree.range(Key::new(3)..=Key::new(6), &guard);
2372        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("3")));
2373        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("4")));
2374        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("6")));
2375        assert!(iter.next().is_none());
2376
2377        let mut iter = tree.range(Key::new(2)..Key::new(4), &guard);
2378        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("3")));
2379        assert!(iter.next().is_none());
2380
2381        let mut iter = tree.range(..=Key::new(6), &guard);
2382        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("3")));
2383        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("4")));
2384        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("6")));
2385        assert!(iter.next().is_none());
2386
2387        let mut iter = tree.range(..Key::new(7), &guard);
2388        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("3")));
2389        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("4")));
2390        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("6")));
2391        assert!(iter.next().is_none());
2392    }
2393
2394    #[test]
2395    fn scan_outside_of_existing_key_range() {
2396        let tree = BzTree::with_node_size(2);
2397        let guard = crossbeam_epoch::pin();
2398        tree.insert(Key::new(2), String::from("2"), &guard);
2399        tree.insert(Key::new(3), String::from("3"), &guard);
2400        tree.insert(Key::new(4), String::from("4"), &guard);
2401        tree.insert(Key::new(5), String::from("5"), &guard);
2402        tree.insert(Key::new(6), String::from("6"), &guard);
2403        tree.insert(Key::new(7), String::from("7"), &guard);
2404        tree.insert(Key::new(8), String::from("8"), &guard);
2405        tree.insert(Key::new(9), String::from("9"), &guard);
2406        tree.insert(Key::new(10), String::from("10"), &guard);
2407        tree.insert(Key::new(11), String::from("11"), &guard);
2408        tree.insert(Key::new(12), String::from("12"), &guard);
2409        tree.insert(Key::new(13), String::from("13"), &guard);
2410
2411        assert_eq!(tree.range(Key::new(0)..Key::new(1), &guard).count(), 0);
2412        assert_eq!(tree.range(Key::new(0)..=Key::new(1), &guard).count(), 0);
2413        assert_eq!(tree.range(Key::new(0)..Key::new(2), &guard).count(), 0);
2414        assert_eq!(tree.range(..Key::new(2), &guard).count(), 0);
2415        assert_eq!(tree.range(Key::new(14).., &guard).count(), 0);
2416        assert_eq!(tree.range(Key::new(14)..Key::new(16), &guard).count(), 0);
2417        assert_eq!(tree.range(Key::new(14)..=Key::new(16), &guard).count(), 0);
2418
2419        assert_eq!(tree.range(Key::new(5)..Key::new(5), &guard).count(), 0);
2420        assert_eq!(tree.range(Key::new(6)..Key::new(2), &guard).count(), 0);
2421        assert_eq!(tree.range(Key::new(14)..Key::new(10), &guard).count(), 0);
2422    }
2423
2424    #[test]
2425    fn test_iter() {
2426        let tree = BzTree::with_node_size(2);
2427        tree.insert(Key::new(1), String::from("1"), &crossbeam_epoch::pin());
2428        tree.insert(Key::new(2), String::from("2"), &crossbeam_epoch::pin());
2429        tree.insert(Key::new(3), String::from("3"), &crossbeam_epoch::pin());
2430
2431        let guard = crossbeam_epoch::pin();
2432        let mut iter = tree.iter(&guard);
2433        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("1")));
2434        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("2")));
2435        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("3")));
2436        assert!(iter.next().is_none());
2437    }
2438
2439    #[test]
2440    fn test_iter_on_root_node() {
2441        // size of tree node is greater than count of elements,
2442        // e.g. all elements placed in leaf root node
2443        let tree = BzTree::with_node_size(20);
2444        tree.insert(Key::new(1), String::from("1"), &crossbeam_epoch::pin());
2445        tree.insert(Key::new(2), String::from("2"), &crossbeam_epoch::pin());
2446        tree.insert(Key::new(3), String::from("3"), &crossbeam_epoch::pin());
2447
2448        let guard = crossbeam_epoch::pin();
2449        let mut iter = tree.iter(&guard);
2450        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("1")));
2451        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("2")));
2452        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("3")));
2453        assert!(iter.next().is_none());
2454    }
2455
2456    #[test]
2457    fn test_rev_iter_on_root_node() {
2458        // size of tree node is greater than count of elements,
2459        // e.g. all elements placed in leaf root node
2460        let tree = BzTree::with_node_size(20);
2461        tree.insert(Key::new(1), String::from("1"), &crossbeam_epoch::pin());
2462        tree.insert(Key::new(2), String::from("2"), &crossbeam_epoch::pin());
2463        tree.insert(Key::new(3), String::from("3"), &crossbeam_epoch::pin());
2464
2465        let guard = crossbeam_epoch::pin();
2466
2467        let mut iter = tree.iter(&guard).rev();
2468        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("3")));
2469        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("2")));
2470        assert!(matches!(iter.next(), Some((_, val)) if val == &String::from("1")));
2471        assert!(iter.next().is_none());
2472    }
2473
2474    #[test]
2475    fn first() {
2476        for size in [
2477            2,
2478            thread_rng().gen_range(3..500),
2479            thread_rng().gen_range(3..500),
2480            thread_rng().gen_range(3..500),
2481        ] {
2482            let tree = BzTree::with_node_size(size);
2483            let guard = crossbeam_epoch::pin();
2484            tree.insert(Key::new("1"), "1", &guard);
2485            tree.insert(Key::new("2"), "2", &guard);
2486            tree.insert(Key::new("3"), "3", &guard);
2487            tree.insert(Key::new("4"), "4", &guard);
2488            tree.insert(Key::new("5"), "5", &guard);
2489            tree.insert(Key::new("6"), "6", &guard);
2490            tree.insert(Key::new("7"), "7", &guard);
2491
2492            assert!(matches!(tree.first(&guard), Some((k, _)) if k == &Key::new("1")));
2493
2494            tree.delete(&"1", &guard);
2495            tree.delete(&"2", &guard);
2496            tree.delete(&"3", &guard);
2497
2498            assert!(matches!(tree.first(&guard), Some((k, _)) if k == &Key::new("4")));
2499        }
2500    }
2501
2502    #[test]
2503    fn last() {
2504        for size in [
2505            2,
2506            thread_rng().gen_range(3..500),
2507            thread_rng().gen_range(3..500),
2508            thread_rng().gen_range(3..500),
2509        ] {
2510            let tree = BzTree::with_node_size(size);
2511            let guard = crossbeam_epoch::pin();
2512            tree.insert(Key::new("1"), "1", &guard);
2513            tree.insert(Key::new("2"), "2", &guard);
2514            tree.insert(Key::new("3"), "3", &guard);
2515            tree.insert(Key::new("4"), "4", &guard);
2516            tree.insert(Key::new("5"), "5", &guard);
2517            tree.insert(Key::new("6"), "6", &guard);
2518            tree.insert(Key::new("7"), "7", &guard);
2519
2520            assert!(matches!(tree.last(&guard), Some((k, _)) if k == &Key::new("7")));
2521
2522            tree.delete(&"5", &guard);
2523            tree.delete(&"6", &guard);
2524            tree.delete(&"7", &guard);
2525
2526            assert!(matches!(tree.last(&guard), Some((k, _)) if k == &Key::new("4")));
2527        }
2528    }
2529
2530    #[test]
2531    fn pop_first() {
2532        for size in [
2533            2,
2534            thread_rng().gen_range(3..500),
2535            thread_rng().gen_range(3..500),
2536            thread_rng().gen_range(3..500),
2537        ] {
2538            let tree = BzTree::with_node_size(size);
2539            let guard = crossbeam_epoch::pin();
2540            let count = size * thread_rng().gen_range(1..5) + thread_rng().gen_range(0..size);
2541            for i in 0..count {
2542                tree.insert(Key::new(i), i, &guard);
2543            }
2544
2545            for i in 0..count {
2546                assert!(matches!(tree.pop_first(&guard), Some((k, _)) if k == Key::new(i)));
2547            }
2548
2549            assert!(tree.iter(&guard).next().is_none());
2550        }
2551    }
2552
2553    #[test]
2554    fn pop_last() {
2555        for size in [
2556            2,
2557            thread_rng().gen_range(3..500),
2558            thread_rng().gen_range(3..500),
2559            thread_rng().gen_range(3..500),
2560        ] {
2561            let tree = BzTree::with_node_size(size);
2562            let guard = crossbeam_epoch::pin();
2563            let count = size * thread_rng().gen_range(1..5) + thread_rng().gen_range(0..size);
2564            for i in 0..count {
2565                tree.insert(Key::new(i), i, &guard);
2566            }
2567
2568            for i in (0..count).rev() {
2569                assert!(matches!(tree.pop_last(&guard), Some((k, _)) if k == Key::new(i)));
2570            }
2571
2572            assert!(tree.iter(&guard).next().is_none());
2573        }
2574    }
2575
2576    #[test]
2577    fn conditional_insert() {
2578        for size in [
2579            2,
2580            thread_rng().gen_range(3..500),
2581            thread_rng().gen_range(3..500),
2582            thread_rng().gen_range(3..500),
2583        ] {
2584            let tree = BzTree::with_node_size(size);
2585            let guard = crossbeam_epoch::pin();
2586            let count = size * thread_rng().gen_range(1..5) + thread_rng().gen_range(0..size);
2587            for i in 0..count {
2588                tree.insert(Key::new(i), i, &guard);
2589            }
2590
2591            for i in 0..count {
2592                assert!(tree.compute(&i, |(_, v)| Some(v + 1), &guard));
2593            }
2594
2595            for i in 0..count {
2596                assert_eq!(*tree.get(&i, &guard).unwrap(), i + 1);
2597            }
2598
2599            assert!(!tree.compute(&(count + 1), |(_, v)| Some(v + 1), &guard));
2600        }
2601    }
2602
2603    #[test]
2604    fn conditional_remove() {
2605        for size in [
2606            2,
2607            thread_rng().gen_range(3..500),
2608            thread_rng().gen_range(3..500),
2609            thread_rng().gen_range(3..500),
2610        ] {
2611            let tree = BzTree::with_node_size(size);
2612            let guard = crossbeam_epoch::pin();
2613            let count = size * thread_rng().gen_range(1..5) + thread_rng().gen_range(0..size);
2614            for i in 0..count {
2615                tree.insert(Key::new(i), i, &guard);
2616            }
2617
2618            for i in 0..count {
2619                assert!(tree.compute(&i, |(_, _)| None, &guard));
2620            }
2621
2622            for i in 0..count {
2623                assert!(matches!(tree.get(&i, &guard), None));
2624            }
2625        }
2626    }
2627
2628    #[derive(PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Debug)]
2629    struct Key<K> {
2630        inner: Box<K>,
2631    }
2632
2633    impl<K> Borrow<K> for Key<K> {
2634        fn borrow(&self) -> &K {
2635            self.inner.borrow()
2636        }
2637    }
2638
2639    impl<K> Key<K> {
2640        fn new(val: K) -> Self {
2641            Key {
2642                inner: Box::new(val),
2643            }
2644        }
2645    }
2646
2647    impl<K: Debug> Display for Key<K> {
2648        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2649            writeln!(f, "{:?}", self.inner)
2650        }
2651    }
2652}