rsdb/tree/
mod.rs

//! A flash-sympathetic persistent lock-free B+ tree.
2use std::fmt::{self, Debug};
3use std::sync::Mutex;
4use std::sync::atomic::AtomicUsize;
5use std::sync::atomic::Ordering::SeqCst;
6
7use super::*;
8
9mod bound;
10mod data;
11mod frag;
12mod node;
13
14pub use self::bound::*;
15pub use self::frag::*;
16pub use self::data::*;
17pub use self::node::*;
18
/// A flash-sympathetic persistent lock-free B+ tree
pub struct Tree {
    // Page cache that stores the tree's nodes as `Frag`s, addressed by `PageID`.
    pages: PageCache<BLinkMaterializer, Frag, PageID>,
    // Page ID of the current root node; replaced atomically when a new
    // root is hoisted above a split root.
    root: AtomicUsize,
}
24
// NOTE(review): these impls assert that a `Tree` may be moved to and shared
// between threads. That presumably relies on `PageCache` synchronizing its
// own state internally -- confirm, since nothing in this file proves it.
unsafe impl Send for Tree {}
unsafe impl Sync for Tree {}
27
28impl Tree {
29    /// Load existing or create a new `Tree`.
30    pub fn new(config: Config) -> Tree {
31        let mut pages = PageCache::new(
32            BLinkMaterializer {
33                roots: Mutex::new(vec![]),
34            },
35            config,
36        );
37
38        let root_opt = pages.recover();
39
40        let root_id = if let Some(root_id) = root_opt {
41            root_id
42        } else {
43            let (root_id, root_cas_key) = pages.allocate();
44            let (leaf_id, leaf_cas_key) = pages.allocate();
45
46            let leaf = Frag::Base(
47                Node {
48                    id: leaf_id,
49                    data: Data::Leaf(vec![]),
50                    next: None,
51                    lo: Bound::Inc(vec![]),
52                    hi: Bound::Inf,
53                },
54                false,
55            );
56
57            let mut root_index_vec = vec![];
58            root_index_vec.push((vec![], leaf_id));
59
60            let root = Frag::Base(
61                Node {
62                    id: root_id,
63                    data: Data::Index(root_index_vec),
64                    next: None,
65                    lo: Bound::Inc(vec![]),
66                    hi: Bound::Inf,
67                },
68                true,
69            );
70
71            pages.set(root_id, root_cas_key, root).unwrap();
72            pages.set(leaf_id, leaf_cas_key, leaf).unwrap();
73            root_id
74        };
75
76        Tree {
77            pages: pages,
78            root: AtomicUsize::new(root_id),
79        }
80    }
81
82    /// Returns a ref to the current `Config` in use by the system.
83    pub fn config(&self) -> &Config {
84        self.pages.config()
85    }
86
87    /// Retrieve a value from the `Tree` if it exists.
88    pub fn get(&self, key: &[u8]) -> Option<Value> {
89        let start = clock();
90        // println!("starting get");
91        let (_, ret) = self.get_internal(key);
92        // println!("done get");
93        M.tree_get.measure(clock() - start);
94        ret
95    }
96
97    /// Compare and swap. Capable of unique creation, conditional modification,
98    /// or deletion. If old is None, this will only set the value if it doesn't
99    /// exist yet. If new is None, will delete the value if old is correct.
100    /// If both old and new are Some, will modify the value if old is correct.
101    ///
102    /// # Examples
103    ///
104    /// ```
105    /// use rsdb::Config;
106    /// let t = Config::default().tree();
107    ///
108    /// // unique creation
109    /// assert_eq!(t.cas(vec![1], None, Some(vec![1])), Ok(()));
110    /// assert_eq!(t.cas(vec![1], None, Some(vec![1])), Err(Some(vec![1])));
111    ///
112    /// // conditional modification
113    /// assert_eq!(t.cas(vec![1], Some(vec![1]), Some(vec![2])), Ok(()));
114    /// assert_eq!(t.cas(vec![1], Some(vec![1]), Some(vec![2])), Err(Some(vec![2])));
115    ///
116    /// // conditional deletion
117    /// assert_eq!(t.cas(vec![1], Some(vec![2]), None), Ok(()));
118    /// assert_eq!(t.get(&*vec![1]), None);
119    /// ```
120    pub fn cas(
121        &self,
122        key: Key,
123        old: Option<Value>,
124        new: Option<Value>,
125    ) -> Result<(), Option<Value>> {
126        let start = clock();
127        // we need to retry caps until old != cur, since just because
128        // cap fails it doesn't mean our value was changed.
129        let frag = new.map(|n| Frag::Set(key.clone(), n)).unwrap_or_else(|| {
130            Frag::Del(key.clone())
131        });
132        loop {
133            let (mut path, cur) = self.get_internal(&*key);
134            if old != cur {
135                M.tree_cas.measure(clock() - start);
136                return Err(cur);
137            }
138
139            let &mut (ref node, ref cas_key) = path.last_mut().unwrap();
140            if self.pages
141                .merge(node.id, cas_key.clone(), frag.clone())
142                .is_ok()
143            {
144                M.tree_cas.measure(clock() - start);
145                return Ok(());
146            }
147            M.tree_looped();
148        }
149    }
150
151    /// Set a key to a new value.
152    pub fn set(&self, key: Key, value: Value) {
153        let start = clock();
154        // println!("starting set of {:?} -> {:?}", key, value);
155        let frag = Frag::Set(key.clone(), value);
156        loop {
157            let mut path = self.path_for_key(&*key);
158            let (mut last_node, last_cas_key) = path.pop().unwrap();
159            // println!("last before: {:?}", last);
160            if let Ok(new_cas_key) = self.pages.merge(last_node.id, last_cas_key, frag.clone()) {
161                last_node.apply(&frag);
162                // println!("last after: {:?}", last);
163                let should_split = last_node.should_split(self.fanout());
164                path.push((last_node.clone(), new_cas_key));
165                // success
166                if should_split {
167                    // println!("need to split {:?}", last_node.id);
168                    self.recursive_split(&path);
169                }
170                M.tree_set.measure(clock() - start);
171                return;
172            }
173            M.tree_looped();
174        }
175        // println!("done set of {:?}", key);
176    }
177
178    /// Delete a value, returning the last result if it existed.
179    ///
180    /// # Examples
181    ///
182    /// ```
183    /// use rsdb::Config;
184    /// let t = Config::default().tree();
185    /// t.set(vec![1], vec![1]);
186    /// assert_eq!(t.del(&*vec![1]), Some(vec![1]));
187    /// assert_eq!(t.del(&*vec![1]), None);
188    /// ```
189    pub fn del(&self, key: &[u8]) -> Option<Value> {
190        let start = clock();
191        let mut ret: Option<Value>;
192        loop {
193            let mut path = self.path_for_key(&*key);
194            let (leaf_node, leaf_cas_key) = path.pop().unwrap();
195            match leaf_node.data {
196                Data::Leaf(ref items) => {
197                    let search = items.binary_search_by(|&(ref k, ref _v)| (**k).cmp(key));
198                    if let Ok(idx) = search {
199                        ret = Some(items[idx].1.clone());
200                    } else {
201                        ret = None;
202                        break;
203                    }
204                }
205                _ => panic!("last node in path is not leaf"),
206            }
207
208            let frag = Frag::Del(key.to_vec());
209            if self.pages.merge(leaf_node.id, leaf_cas_key, frag).is_ok() {
210                // success
211                break;
212            } else {
213                // failure, retry
214            }
215            M.tree_looped();
216        }
217        M.tree_del.measure(clock() - start);
218        ret
219    }
220
221    /// Iterate over tuples of keys and values, starting at the provided key.
222    ///
223    /// # Examples
224    ///
225    /// ```
226    /// use rsdb::Config;
227    /// let t = Config::default().tree();
228    /// t.set(vec![1], vec![10]);
229    /// t.set(vec![2], vec![20]);
230    /// t.set(vec![3], vec![30]);
231    /// let mut iter = t.scan(&*vec![2]);
232    /// assert_eq!(iter.next(), Some((vec![2], vec![20])));
233    /// assert_eq!(iter.next(), Some((vec![3], vec![30])));
234    /// assert_eq!(iter.next(), None);
235    /// ```
236    pub fn scan(&self, key: &[u8]) -> TreeIter {
237        let (path, _) = self.get_internal(key);
238        let &(ref last_node, ref _last_cas_key) = path.last().unwrap();
239        TreeIter {
240            id: last_node.id,
241            inner: &self.pages,
242            last_key: Bound::Non(key.to_vec()),
243        }
244    }
245
246    /// Iterate over the tuples of keys and values in this tree.
247    ///
248    /// # Examples
249    ///
250    /// ```
251    /// use rsdb::Config;
252    /// let t = Config::default().tree();
253    /// t.set(vec![1], vec![10]);
254    /// t.set(vec![2], vec![20]);
255    /// t.set(vec![3], vec![30]);
256    /// let mut iter = t.iter();
257    /// assert_eq!(iter.next(), Some((vec![1], vec![10])));
258    /// assert_eq!(iter.next(), Some((vec![2], vec![20])));
259    /// assert_eq!(iter.next(), Some((vec![3], vec![30])));
260    /// assert_eq!(iter.next(), None);
261    /// ```
262    pub fn iter(&self) -> TreeIter {
263        let (path, _) = self.get_internal(b"");
264        let &(ref last_node, ref _last_cas_key) = path.last().unwrap();
265        TreeIter {
266            id: last_node.id,
267            inner: &self.pages,
268            last_key: Bound::Non(vec![]),
269        }
270    }
271
272    fn recursive_split(&self, path: &[(Node, CasKey<Frag>)]) {
273        // to split, we pop the path, see if it's in need of split, recurse up
274        // two-phase: (in prep for lock-free, not necessary for single threaded)
275        //  1. half-split: install split on child, P
276        //      a. allocate new right sibling page, Q
277        //      b. locate split point
278        //      c. create new consolidated pages for both sides
279        //      d. add new node to pagetable
280        //      e. merge split delta to original page P with physical pointer to Q
281        //      f. if failed, free the new page
282        //  2. parent update: install new index term on parent
283        //      a. merge "index term delta record" to parent, containing:
284        //          i. new bounds for P & Q
285        //          ii. logical pointer to Q
286        //
287        //      (it's possible parent was merged in the mean-time, so if that's the
288        //      case, we need to go up the path to the grandparent then down again
289        //      or higher until it works)
290        //  3. any traversing nodes that witness #1 but not #2 try to complete it
291        //
292        //  root is special case, where we need to hoist a new root
293
294        let mut all_page_views = path.to_vec();
295        let mut root_and_key = all_page_views.remove(0);
296
297        // println!("before:\n{:?}\n", self);
298
299        while let Some((node, cas_key)) = all_page_views.pop() {
300            // println!("splitting node {:?}", node);
301            if node.should_split(self.fanout()) {
302                // try to child split
303                if let Ok(parent_split) = self.child_split(&node, cas_key) {
304                    // now try to parent split
305                    let &mut (ref mut parent_node, ref mut parent_cas_key) =
306                        all_page_views.last_mut().unwrap_or(&mut root_and_key);
307
308                    let res = self.parent_split(
309                        parent_node.clone(),
310                        parent_cas_key.clone(),
311                        parent_split.clone(),
312                    );
313
314                    if let Ok(res) = res {
315                        parent_node.apply(&Frag::ParentSplit(parent_split));
316                        *parent_cas_key = res;
317                    } else {
318                        continue;
319                    }
320                } else {
321                    continue;
322                }
323            }
324        }
325
326        let (root_node, root_cas_key) = root_and_key;
327
328        if root_node.should_split(self.fanout()) {
329            // println!("{}: hoisting root {}", name, root_frag.node.id);
330            if let Ok(parent_split) = self.child_split(&root_node, root_cas_key) {
331                self.root_hoist(root_node.id, parent_split.to, parent_split.at.inner().unwrap());
332            }
333        }
334        // println!("after:\n{:?}\n", self);
335    }
336
337    fn child_split(&self, node: &Node, node_cas_key: CasKey<Frag>) -> Result<ParentSplit, ()> {
338        let (new_pid, new_cas_key) = self.pages.allocate();
339
340        // split the node in half
341        let rhs = node.split(new_pid);
342
343        let child_split = Frag::ChildSplit(ChildSplit {
344            at: rhs.lo.clone(),
345            to: new_pid,
346        });
347
348        let parent_split = ParentSplit {
349            at: rhs.lo.clone(),
350            to: new_pid,
351        };
352
353        // install the new right side
354        self.pages
355            .set(new_pid, new_cas_key, Frag::Base(rhs, false))
356            .expect("failed to initialize child split");
357
358        // try to install a child split on the left side
359        if self.pages
360            .merge(node.id, node_cas_key, child_split)
361            .is_err()
362        {
363            // if we failed, don't follow through with the parent split
364            // println!("{}: {}|{} @ {:?} -", tn(), node.id, new_pid, parent_split.at);
365            self.pages.free(new_pid);
366            return Err(());
367        }
368
369        Ok(parent_split)
370    }
371
372    fn parent_split(
373        &self,
374        parent_node: Node,
375        parent_cas_key: CasKey<Frag>,
376        parent_split: ParentSplit,
377    ) -> Result<CasKey<Frag>, Option<CasKey<Frag>>> {
378        // install parent split
379        let res = self.pages.merge(
380            parent_node.id,
381            parent_cas_key,
382            Frag::ParentSplit(parent_split.clone()),
383        );
384
385        if res.is_err() {
386            // println!("{}: {} <- {:?}|{} -", tn(), parent_node.id, parent_split.at, parent_split.to);
387        }
388
389        res
390    }
391
392    fn root_hoist(&self, from: PageID, to: PageID, at: Key) {
393        // hoist new root, pointing to lhs & rhs
394        let (new_root_pid, new_root_cas_key) = self.pages.allocate();
395        let mut new_root_vec = vec![];
396        new_root_vec.push((vec![], from));
397        new_root_vec.push((at, to));
398        let new_root = Frag::Base(
399            Node {
400                id: new_root_pid,
401                data: Data::Index(new_root_vec),
402                next: None,
403                lo: Bound::Inc(vec![]),
404                hi: Bound::Inf,
405            },
406            true,
407        );
408        self.pages
409            .set(new_root_pid, new_root_cas_key, new_root)
410            .unwrap();
411        // println!("split is {:?}", parent_split);
412        // println!("trying to cas root at {:?} with real value {:?}", path.first().unwrap().pid, self.root.load(SeqCst));
413        // println!("root_id is {}", root_id);
414        let cas = self.root.compare_and_swap(from, new_root_pid, SeqCst);
415        if cas == from {
416            // println!("{}: root hoist of {} +", tn(), from);
417        } else {
418            self.pages.free(new_root_pid);
419            // println!("root hoist of {} -", from);
420        }
421    }
422
423    fn get_internal(&self, key: &[u8]) -> (Vec<(Node, CasKey<Frag>)>, Option<Value>) {
424        let path = self.path_for_key(&*key);
425
426        let ret = path.last().and_then(|&(ref last_node, ref _last_cas_key)| {
427            let data = &last_node.data;
428            let items = data.leaf_ref().unwrap();
429            let search = items.binary_search_by(|&(ref k, ref _v)| (**k).cmp(key));
430            if let Ok(idx) = search {
431                // cap a del frag below
432                Some(items[idx].1.clone())
433            } else {
434                // key does not exist
435                None
436            }
437        });
438
439        (path, ret)
440    }
441
442    fn fanout(&self) -> usize {
443        self.config().get_blink_fanout()
444    }
445
446    #[doc(hidden)]
447    pub fn key_debug_str(&self, key: &[u8]) -> String {
448        let path = self.path_for_key(key);
449        let mut ret = String::new();
450        for &(ref node, _) in &path {
451            ret.push_str(&*format!("\n{:?}", node));
452        }
453        ret
454    }
455
456    /// returns the traversal path, completing any observed
457    /// partially complete splits or merges along the way.
458    fn path_for_key(&self, key: &[u8]) -> Vec<(Node, CasKey<Frag>)> {
459        let key_bound = Bound::Inc(key.into());
460        let mut cursor = self.root.load(SeqCst);
461        let mut path: Vec<(Node, CasKey<Frag>)> = vec![];
462
463        // unsplit_parent is used for tracking need
464        // to complete partial splits.
465        let mut unsplit_parent: Option<usize> = None;
466
467        loop {
468            let get_cursor = self.pages.get(cursor);
469            if get_cursor.is_none() {
470                // restart search from the tree's root
471                cursor = self.root.load(SeqCst);
472                continue;
473            }
474            let (frag, cas_key) = get_cursor.unwrap();
475            let (node, _is_root) = frag.into_base().unwrap();
476
477            // TODO this may need to change when handling (half) merges
478            assert!(node.lo <= key_bound, "overshot key somehow");
479
480            // half-complete split detect & completion
481            if node.hi <= key_bound {
482                // println!("{:?} is hi, looking for {:?}", page_view.node.hi, key);
483                // we have encountered a child split, without
484                // having hit the parent split above.
485                cursor = node.next.unwrap();
486                if unsplit_parent.is_none() {
487                    if !path.is_empty() {
488                        unsplit_parent = Some(path.len() - 1);
489                    }
490                }
491                continue;
492            } else if let Some(idx) = unsplit_parent.take() {
493                // we have found the proper page for
494                // our split.
495                // println!("before: {:?}", self);
496                let &(ref parent_node, ref parent_cas_key): &(Node, CasKey<Frag>) = &path[idx];
497
498                let ps = Frag::ParentSplit(ParentSplit {
499                    at: node.lo.clone(),
500                    to: node.id,
501                });
502
503                let _res = self.pages.merge(parent_node.id, parent_cas_key.clone(), ps);
504                // println!("trying to fix incomplete parent split: {:?}", res);
505                // println!("after: {:?}", self);
506            }
507
508            path.push((node, cas_key));
509
510            match path.last().unwrap().0.data {
511                Data::Index(ref ptrs) => {
512                    let old_cursor = cursor;
513                    for &(ref sep_k, ref ptr) in ptrs {
514                        if &**sep_k <= key {
515                            cursor = *ptr;
516                        } else {
517                            break; // we've found our next cursor
518                        }
519                    }
520                    if cursor == old_cursor {
521                        panic!("stuck in page traversal loop");
522                    }
523                }
524                Data::Leaf(_) => {
525                    break;
526                }
527            }
528        }
529
530        path
531    }
532}
533
534impl Debug for Tree {
535    fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
536        let mut pid = self.root.load(SeqCst);
537        let mut left_most = pid;
538        let mut level = 0;
539
540        f.write_str("Tree: \n\t").unwrap();
541        self.pages.fmt(f).unwrap();
542        f.write_str("\tlevel 0:\n").unwrap();
543
544        loop {
545            let (frag, _cas_key) = self.pages.get(pid).unwrap();
546            let (node, _is_root) = frag.base().unwrap();
547
548            f.write_str("\t\t").unwrap();
549            node.fmt(f).unwrap();
550            f.write_str("\n").unwrap();
551
552            if let Some(next_pid) = node.next {
553                pid = next_pid;
554            } else {
555                // we've traversed our level, time to bump down
556                let (left_frag, _left_cas_key) = self.pages.get(left_most).unwrap();
557                let (left_node, _is_root) = left_frag.base().unwrap();
558
559                match left_node.data {
560                    Data::Index(ptrs) => {
561                        if let Some(&(ref _sep, ref next_pid)) = ptrs.first() {
562                            pid = *next_pid;
563                            left_most = *next_pid;
564                            level += 1;
565                            f.write_str(&*format!("\n\tlevel {}:\n", level)).unwrap();
566                        } else {
567                            panic!("trying to debug print empty index node");
568                        }
569                    }
570                    Data::Leaf(_items) => {
571                        // we've reached the end of our tree, all leafs are on
572                        // the lowest level.
573                        break;
574                    }
575                }
576            }
577        }
578
579
580        Ok(())
581    }
582}
583
/// An iterator over keys and values in a `Tree`.
pub struct TreeIter<'a> {
    // ID of the leaf page the iterator is currently reading from.
    id: PageID,
    // Page cache the leaf pages are fetched from.
    inner: &'a PageCache<BLinkMaterializer, Frag, PageID>,
    // Bound of the most recently yielded key (or the scan start); only
    // keys that compare greater than it are yielded next.
    last_key: Bound,
    // TODO we have to refactor this in light of pages being deleted
}
591
592impl<'a> Iterator for TreeIter<'a> {
593    type Item = (Vec<u8>, Vec<u8>);
594
595    fn next(&mut self) -> Option<Self::Item> {
596        let start = clock();
597        loop {
598            let (frag, _cas_key) = self.inner.get(self.id).unwrap();
599            let (node, _is_root) = frag.base().unwrap();
600            // TODO this could be None if the node was removed since the last
601            // iteration, and we need to just get the inner node again...
602            for (ref k, ref v) in node.data.leaf().unwrap() {
603                if Bound::Inc(k.clone()) > self.last_key {
604                    self.last_key = Bound::Inc(k.to_vec());
605                    let ret = Some((k.clone(), v.clone()));
606                    M.tree_scan.measure(clock() - start);
607                    return ret;
608                }
609            }
610            if node.next.is_none() {
611                M.tree_scan.measure(clock() - start);
612                return None;
613            }
614            self.id = node.next.unwrap();
615        }
616    }
617}
618
619impl<'a> IntoIterator for &'a Tree {
620    type Item = (Vec<u8>, Vec<u8>);
621    type IntoIter = TreeIter<'a>;
622
623    fn into_iter(self) -> TreeIter<'a> {
624        self.iter()
625    }
626}
627
/// The `Materializer` that `Tree` hands to its page cache: it collapses a
/// page's `Frag`s into a single base node and reports root pages seen
/// during recovery.
pub struct BLinkMaterializer {
    // IDs of the root pages already reported by `recover`, so that each
    // root ID is returned at most once.
    roots: Mutex<Vec<PageID>>,
}
631
632impl Materializer for BLinkMaterializer {
633    type PageFrag = Frag;
634    type Recovery = PageID;
635
636    fn merge(&self, frags: &[&Frag]) -> Frag {
637        let mut base_node_opt: Option<Node> = None;
638        let mut root = false;
639
640        for &frag in frags {
641            if let Some(ref mut base_node) = base_node_opt {
642                base_node.apply(frag);
643            } else {
644                let (base_node, is_root) = frag.base().unwrap();
645                base_node_opt = Some(base_node);
646                root = is_root;
647            }
648        }
649
650        Frag::Base(base_node_opt.unwrap(), root)
651    }
652
653    fn recover(&self, frag: &Frag) -> Option<PageID> {
654        match *frag {
655            Frag::Base(ref node, root) => {
656                if root {
657                    let mut roots = self.roots.lock().unwrap();
658                    if roots.contains(&node.id) {
659                        None
660                    } else {
661                        roots.push(node.id);
662                        Some(node.id)
663                    }
664                } else {
665                    None
666                }
667            }
668            _ => None,
669        }
670    }
671}