1use std::fmt::{self, Debug};
3use std::sync::Mutex;
4use std::sync::atomic::AtomicUsize;
5use std::sync::atomic::Ordering::SeqCst;
6
7use super::*;
8
9mod bound;
10mod data;
11mod frag;
12mod node;
13
14pub use self::bound::*;
15pub use self::frag::*;
16pub use self::data::*;
17pub use self::node::*;
18
/// An ordered key/value tree whose nodes live as pages inside a `PageCache`.
///
/// All reads and writes go through `pages`; `root` records which page is
/// currently the root and is replaced atomically when the root is hoisted.
pub struct Tree {
    /// Page storage; page fragments are `Frag`s combined by `BLinkMaterializer`.
    pages: PageCache<BLinkMaterializer, Frag, PageID>,
    /// `PageID` of the current root page.
    root: AtomicUsize,
}

// NOTE(review): these impls assert that `Tree` may be moved to and shared
// between threads. The justification is not visible in this file; presumably
// `PageCache` synchronizes its own state internally. TODO confirm.
unsafe impl Send for Tree {}
unsafe impl Sync for Tree {}
27
28impl Tree {
29 pub fn new(config: Config) -> Tree {
31 let mut pages = PageCache::new(
32 BLinkMaterializer {
33 roots: Mutex::new(vec![]),
34 },
35 config,
36 );
37
38 let root_opt = pages.recover();
39
40 let root_id = if let Some(root_id) = root_opt {
41 root_id
42 } else {
43 let (root_id, root_cas_key) = pages.allocate();
44 let (leaf_id, leaf_cas_key) = pages.allocate();
45
46 let leaf = Frag::Base(
47 Node {
48 id: leaf_id,
49 data: Data::Leaf(vec![]),
50 next: None,
51 lo: Bound::Inc(vec![]),
52 hi: Bound::Inf,
53 },
54 false,
55 );
56
57 let mut root_index_vec = vec![];
58 root_index_vec.push((vec![], leaf_id));
59
60 let root = Frag::Base(
61 Node {
62 id: root_id,
63 data: Data::Index(root_index_vec),
64 next: None,
65 lo: Bound::Inc(vec![]),
66 hi: Bound::Inf,
67 },
68 true,
69 );
70
71 pages.set(root_id, root_cas_key, root).unwrap();
72 pages.set(leaf_id, leaf_cas_key, leaf).unwrap();
73 root_id
74 };
75
76 Tree {
77 pages: pages,
78 root: AtomicUsize::new(root_id),
79 }
80 }
81
/// Returns the `Config` held by the underlying page cache.
pub fn config(&self) -> &Config {
    self.pages.config()
}
86
87 pub fn get(&self, key: &[u8]) -> Option<Value> {
89 let start = clock();
90 let (_, ret) = self.get_internal(key);
92 M.tree_get.measure(clock() - start);
94 ret
95 }
96
97 pub fn cas(
121 &self,
122 key: Key,
123 old: Option<Value>,
124 new: Option<Value>,
125 ) -> Result<(), Option<Value>> {
126 let start = clock();
127 let frag = new.map(|n| Frag::Set(key.clone(), n)).unwrap_or_else(|| {
130 Frag::Del(key.clone())
131 });
132 loop {
133 let (mut path, cur) = self.get_internal(&*key);
134 if old != cur {
135 M.tree_cas.measure(clock() - start);
136 return Err(cur);
137 }
138
139 let &mut (ref node, ref cas_key) = path.last_mut().unwrap();
140 if self.pages
141 .merge(node.id, cas_key.clone(), frag.clone())
142 .is_ok()
143 {
144 M.tree_cas.measure(clock() - start);
145 return Ok(());
146 }
147 M.tree_looped();
148 }
149 }
150
151 pub fn set(&self, key: Key, value: Value) {
153 let start = clock();
154 let frag = Frag::Set(key.clone(), value);
156 loop {
157 let mut path = self.path_for_key(&*key);
158 let (mut last_node, last_cas_key) = path.pop().unwrap();
159 if let Ok(new_cas_key) = self.pages.merge(last_node.id, last_cas_key, frag.clone()) {
161 last_node.apply(&frag);
162 let should_split = last_node.should_split(self.fanout());
164 path.push((last_node.clone(), new_cas_key));
165 if should_split {
167 self.recursive_split(&path);
169 }
170 M.tree_set.measure(clock() - start);
171 return;
172 }
173 M.tree_looped();
174 }
175 }
177
178 pub fn del(&self, key: &[u8]) -> Option<Value> {
190 let start = clock();
191 let mut ret: Option<Value>;
192 loop {
193 let mut path = self.path_for_key(&*key);
194 let (leaf_node, leaf_cas_key) = path.pop().unwrap();
195 match leaf_node.data {
196 Data::Leaf(ref items) => {
197 let search = items.binary_search_by(|&(ref k, ref _v)| (**k).cmp(key));
198 if let Ok(idx) = search {
199 ret = Some(items[idx].1.clone());
200 } else {
201 ret = None;
202 break;
203 }
204 }
205 _ => panic!("last node in path is not leaf"),
206 }
207
208 let frag = Frag::Del(key.to_vec());
209 if self.pages.merge(leaf_node.id, leaf_cas_key, frag).is_ok() {
210 break;
212 } else {
213 }
215 M.tree_looped();
216 }
217 M.tree_del.measure(clock() - start);
218 ret
219 }
220
221 pub fn scan(&self, key: &[u8]) -> TreeIter {
237 let (path, _) = self.get_internal(key);
238 let &(ref last_node, ref _last_cas_key) = path.last().unwrap();
239 TreeIter {
240 id: last_node.id,
241 inner: &self.pages,
242 last_key: Bound::Non(key.to_vec()),
243 }
244 }
245
246 pub fn iter(&self) -> TreeIter {
263 let (path, _) = self.get_internal(b"");
264 let &(ref last_node, ref _last_cas_key) = path.last().unwrap();
265 TreeIter {
266 id: last_node.id,
267 inner: &self.pages,
268 last_key: Bound::Non(vec![]),
269 }
270 }
271
272 fn recursive_split(&self, path: &[(Node, CasKey<Frag>)]) {
273 let mut all_page_views = path.to_vec();
295 let mut root_and_key = all_page_views.remove(0);
296
297 while let Some((node, cas_key)) = all_page_views.pop() {
300 if node.should_split(self.fanout()) {
302 if let Ok(parent_split) = self.child_split(&node, cas_key) {
304 let &mut (ref mut parent_node, ref mut parent_cas_key) =
306 all_page_views.last_mut().unwrap_or(&mut root_and_key);
307
308 let res = self.parent_split(
309 parent_node.clone(),
310 parent_cas_key.clone(),
311 parent_split.clone(),
312 );
313
314 if let Ok(res) = res {
315 parent_node.apply(&Frag::ParentSplit(parent_split));
316 *parent_cas_key = res;
317 } else {
318 continue;
319 }
320 } else {
321 continue;
322 }
323 }
324 }
325
326 let (root_node, root_cas_key) = root_and_key;
327
328 if root_node.should_split(self.fanout()) {
329 if let Ok(parent_split) = self.child_split(&root_node, root_cas_key) {
331 self.root_hoist(root_node.id, parent_split.to, parent_split.at.inner().unwrap());
332 }
333 }
334 }
336
337 fn child_split(&self, node: &Node, node_cas_key: CasKey<Frag>) -> Result<ParentSplit, ()> {
338 let (new_pid, new_cas_key) = self.pages.allocate();
339
340 let rhs = node.split(new_pid);
342
343 let child_split = Frag::ChildSplit(ChildSplit {
344 at: rhs.lo.clone(),
345 to: new_pid,
346 });
347
348 let parent_split = ParentSplit {
349 at: rhs.lo.clone(),
350 to: new_pid,
351 };
352
353 self.pages
355 .set(new_pid, new_cas_key, Frag::Base(rhs, false))
356 .expect("failed to initialize child split");
357
358 if self.pages
360 .merge(node.id, node_cas_key, child_split)
361 .is_err()
362 {
363 self.pages.free(new_pid);
366 return Err(());
367 }
368
369 Ok(parent_split)
370 }
371
372 fn parent_split(
373 &self,
374 parent_node: Node,
375 parent_cas_key: CasKey<Frag>,
376 parent_split: ParentSplit,
377 ) -> Result<CasKey<Frag>, Option<CasKey<Frag>>> {
378 let res = self.pages.merge(
380 parent_node.id,
381 parent_cas_key,
382 Frag::ParentSplit(parent_split.clone()),
383 );
384
385 if res.is_err() {
386 }
388
389 res
390 }
391
392 fn root_hoist(&self, from: PageID, to: PageID, at: Key) {
393 let (new_root_pid, new_root_cas_key) = self.pages.allocate();
395 let mut new_root_vec = vec![];
396 new_root_vec.push((vec![], from));
397 new_root_vec.push((at, to));
398 let new_root = Frag::Base(
399 Node {
400 id: new_root_pid,
401 data: Data::Index(new_root_vec),
402 next: None,
403 lo: Bound::Inc(vec![]),
404 hi: Bound::Inf,
405 },
406 true,
407 );
408 self.pages
409 .set(new_root_pid, new_root_cas_key, new_root)
410 .unwrap();
411 let cas = self.root.compare_and_swap(from, new_root_pid, SeqCst);
415 if cas == from {
416 } else {
418 self.pages.free(new_root_pid);
419 }
421 }
422
423 fn get_internal(&self, key: &[u8]) -> (Vec<(Node, CasKey<Frag>)>, Option<Value>) {
424 let path = self.path_for_key(&*key);
425
426 let ret = path.last().and_then(|&(ref last_node, ref _last_cas_key)| {
427 let data = &last_node.data;
428 let items = data.leaf_ref().unwrap();
429 let search = items.binary_search_by(|&(ref k, ref _v)| (**k).cmp(key));
430 if let Ok(idx) = search {
431 Some(items[idx].1.clone())
433 } else {
434 None
436 }
437 });
438
439 (path, ret)
440 }
441
/// Returns the fanout setting from the config (`get_blink_fanout`); callers
/// in this impl pass it to `Node::should_split`.
fn fanout(&self) -> usize {
    self.config().get_blink_fanout()
}
445
446 #[doc(hidden)]
447 pub fn key_debug_str(&self, key: &[u8]) -> String {
448 let path = self.path_for_key(key);
449 let mut ret = String::new();
450 for &(ref node, _) in &path {
451 ret.push_str(&*format!("\n{:?}", node));
452 }
453 ret
454 }
455
456 fn path_for_key(&self, key: &[u8]) -> Vec<(Node, CasKey<Frag>)> {
459 let key_bound = Bound::Inc(key.into());
460 let mut cursor = self.root.load(SeqCst);
461 let mut path: Vec<(Node, CasKey<Frag>)> = vec![];
462
463 let mut unsplit_parent: Option<usize> = None;
466
467 loop {
468 let get_cursor = self.pages.get(cursor);
469 if get_cursor.is_none() {
470 cursor = self.root.load(SeqCst);
472 continue;
473 }
474 let (frag, cas_key) = get_cursor.unwrap();
475 let (node, _is_root) = frag.into_base().unwrap();
476
477 assert!(node.lo <= key_bound, "overshot key somehow");
479
480 if node.hi <= key_bound {
482 cursor = node.next.unwrap();
486 if unsplit_parent.is_none() {
487 if !path.is_empty() {
488 unsplit_parent = Some(path.len() - 1);
489 }
490 }
491 continue;
492 } else if let Some(idx) = unsplit_parent.take() {
493 let &(ref parent_node, ref parent_cas_key): &(Node, CasKey<Frag>) = &path[idx];
497
498 let ps = Frag::ParentSplit(ParentSplit {
499 at: node.lo.clone(),
500 to: node.id,
501 });
502
503 let _res = self.pages.merge(parent_node.id, parent_cas_key.clone(), ps);
504 }
507
508 path.push((node, cas_key));
509
510 match path.last().unwrap().0.data {
511 Data::Index(ref ptrs) => {
512 let old_cursor = cursor;
513 for &(ref sep_k, ref ptr) in ptrs {
514 if &**sep_k <= key {
515 cursor = *ptr;
516 } else {
517 break; }
519 }
520 if cursor == old_cursor {
521 panic!("stuck in page traversal loop");
522 }
523 }
524 Data::Leaf(_) => {
525 break;
526 }
527 }
528 }
529
530 path
531 }
532}
533
534impl Debug for Tree {
535 fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
536 let mut pid = self.root.load(SeqCst);
537 let mut left_most = pid;
538 let mut level = 0;
539
540 f.write_str("Tree: \n\t").unwrap();
541 self.pages.fmt(f).unwrap();
542 f.write_str("\tlevel 0:\n").unwrap();
543
544 loop {
545 let (frag, _cas_key) = self.pages.get(pid).unwrap();
546 let (node, _is_root) = frag.base().unwrap();
547
548 f.write_str("\t\t").unwrap();
549 node.fmt(f).unwrap();
550 f.write_str("\n").unwrap();
551
552 if let Some(next_pid) = node.next {
553 pid = next_pid;
554 } else {
555 let (left_frag, _left_cas_key) = self.pages.get(left_most).unwrap();
557 let (left_node, _is_root) = left_frag.base().unwrap();
558
559 match left_node.data {
560 Data::Index(ptrs) => {
561 if let Some(&(ref _sep, ref next_pid)) = ptrs.first() {
562 pid = *next_pid;
563 left_most = *next_pid;
564 level += 1;
565 f.write_str(&*format!("\n\tlevel {}:\n", level)).unwrap();
566 } else {
567 panic!("trying to debug print empty index node");
568 }
569 }
570 Data::Leaf(_items) => {
571 break;
574 }
575 }
576 }
577 }
578
579
580 Ok(())
581 }
582}
583
/// Iterator over the key/value pairs of a `Tree`, created by `Tree::scan`
/// and `Tree::iter`.
pub struct TreeIter<'a> {
    /// ID of the page the iterator reads from next.
    id: PageID,
    /// The page cache of the tree being iterated.
    inner: &'a PageCache<BLinkMaterializer, Frag, PageID>,
    /// Bound of the most recently yielded key (or the starting bound); only
    /// keys comparing greater than this are yielded.
    last_key: Bound,
}
591
592impl<'a> Iterator for TreeIter<'a> {
593 type Item = (Vec<u8>, Vec<u8>);
594
595 fn next(&mut self) -> Option<Self::Item> {
596 let start = clock();
597 loop {
598 let (frag, _cas_key) = self.inner.get(self.id).unwrap();
599 let (node, _is_root) = frag.base().unwrap();
600 for (ref k, ref v) in node.data.leaf().unwrap() {
603 if Bound::Inc(k.clone()) > self.last_key {
604 self.last_key = Bound::Inc(k.to_vec());
605 let ret = Some((k.clone(), v.clone()));
606 M.tree_scan.measure(clock() - start);
607 return ret;
608 }
609 }
610 if node.next.is_none() {
611 M.tree_scan.measure(clock() - start);
612 return None;
613 }
614 self.id = node.next.unwrap();
615 }
616 }
617}
618
/// Lets `&Tree` be used directly in a `for` loop; equivalent to calling
/// `Tree::iter`.
impl<'a> IntoIterator for &'a Tree {
    type Item = (Vec<u8>, Vec<u8>);
    type IntoIter = TreeIter<'a>;

    fn into_iter(self) -> TreeIter<'a> {
        self.iter()
    }
}
627
/// `Materializer` used by `Tree`'s page cache to combine `Frag`s into a
/// base node and to report root pages during recovery.
pub struct BLinkMaterializer {
    /// IDs of the root pages already reported by `recover`.
    roots: Mutex<Vec<PageID>>,
}
631
632impl Materializer for BLinkMaterializer {
633 type PageFrag = Frag;
634 type Recovery = PageID;
635
636 fn merge(&self, frags: &[&Frag]) -> Frag {
637 let mut base_node_opt: Option<Node> = None;
638 let mut root = false;
639
640 for &frag in frags {
641 if let Some(ref mut base_node) = base_node_opt {
642 base_node.apply(frag);
643 } else {
644 let (base_node, is_root) = frag.base().unwrap();
645 base_node_opt = Some(base_node);
646 root = is_root;
647 }
648 }
649
650 Frag::Base(base_node_opt.unwrap(), root)
651 }
652
653 fn recover(&self, frag: &Frag) -> Option<PageID> {
654 match *frag {
655 Frag::Base(ref node, root) => {
656 if root {
657 let mut roots = self.roots.lock().unwrap();
658 if roots.contains(&node.id) {
659 None
660 } else {
661 roots.push(node.id);
662 Some(node.id)
663 }
664 } else {
665 None
666 }
667 }
668 _ => None,
669 }
670 }
671}