1use crate::Options;
2use crate::cc::context::{Context, TxOutcome};
3use crate::map::buffer::BucketContext;
4use crate::map::publish::AllocGuard;
5use crate::map::{Loader, Node, Page};
6use crate::types::data::{HistRef, IterItem, Record, Val};
7use crate::types::node::{Junk, MergeOp, RawLeafIter, RawLeafRevIter};
8use crate::types::refbox::DeltaView;
9use crate::types::traits::{IAsBoxRef, IBoxHeader, IDecode, IHeader, ILoader};
10use crate::utils::data::Position;
11use crate::utils::observe::{
12 CounterMetric, HistogramMetric, LATENCY_SAMPLE_SHIFT, observe_elapsed, sampled_instant,
13};
14use crate::utils::{Handle, MutRef, NULL_ADDR, OpCode};
15use crate::{
16 Store,
17 types::{
18 data::{Index, IntlKey, Key, Ver},
19 refbox::BoxRef,
20 traits::{ICodec, IKey},
21 },
22 utils::{NULL_CMD, NULL_PID},
23};
24use crossbeam_epoch::Guard;
25use std::cmp::Ordering::Equal;
26use std::ops::{Bound, RangeBounds};
27use std::sync::Arc;
28use std::sync::atomic::Ordering::Acquire;
29
/// A value handle returned by tree lookups.
///
/// Pairs a [`Record`] with the [`BoxRef`] supplied at construction time.
/// `_owner` is never read in this file; it is presumably held only so the
/// storage backing `raw` stays alive (TODO confirm against `BoxRef`).
#[derive(Clone)]
pub struct ValRef {
    /// Record whose bytes are exposed by `slice` and `to_vec`.
    raw: Record,
    /// Box stored alongside the record; not accessed after construction.
    _owner: BoxRef,
}
36
37#[derive(Clone, Copy)]
38pub(crate) struct LatestValMeta {
39 pub(crate) ver: Ver,
40 pub(crate) group_id: u8,
41 pub(crate) is_del: bool,
42}
43
44impl ValRef {
45 pub(crate) fn new(raw: Record, owner: BoxRef) -> Self {
46 Self { raw, _owner: owner }
47 }
48
49 pub fn slice(&self) -> &[u8] {
51 self.raw.data()
52 }
53
54 pub fn to_vec(self) -> Vec<u8> {
56 self.raw.data().to_vec()
57 }
58}
59
/// Handle to one tree: the store it belongs to, the index of its root page
/// and the bucket context used for page loading, the page table and builds.
#[derive(Clone)]
pub struct Tree {
    /// Shared store; this file reads `context` and `opt.observer` from it.
    pub(crate) store: MutRef<Store>,
    /// Index of the root page; every descent starts at `root_index.pid`.
    pub(crate) root_index: Index,
    /// Bucket-level context providing the page table, page loading/eviction
    /// and the split/consolidate options used below.
    pub(crate) bucket: Arc<BucketContext>,
}
66
67impl Tree {
68 pub fn new(store: MutRef<Store>, root_pid: u64, bucket: Arc<BucketContext>) -> Self {
69 let this = Self {
70 store,
71 root_index: Index::new(root_pid),
72 bucket,
73 };
74
75 let addr = this.bucket.table.index(root_pid).load(Acquire);
76 if addr == NULL_ADDR {
77 this.init(root_pid);
78 }
79 this
80 }
81
    /// Creates an empty leaf and publishes it as the page mapped at `root_pid`.
    ///
    /// Called from [`Tree::new`] when the page table slot for the root still
    /// holds `NULL_ADDR`.
    fn init(&self, root_pid: u64) {
        // Pin the epoch for the duration of the publish.
        let g = crossbeam_epoch::pin();
        let mut build = self.begin_build();
        // Current position of group 0's logging state, passed to the new leaf as its LSN.
        let lsn = self.store.context.group(0).logging.lock().current_pos();
        let node = Node::new_leaf(&mut build, self.bucket.loader(self.store.context), 0, lsn);
        let mut page = Page::new(node);
        let mut publish = build.into_publish(&g);
        // Bind the page to the caller-chosen pid rather than allocating a new one.
        publish.map_to(&mut page, root_pid);
        publish.cache_after_commit(page);
        publish.commit();
    }
93
    /// Returns the store context's `compact_safe_txid()`.
    ///
    /// The value is passed as `safe_txid` to the compaction, merge and
    /// index-insert routines in this file.
    fn txid(&self) -> u64 {
        self.store.context.compact_safe_txid()
    }
97
    /// Returns the `bucket_id` of the bucket context this tree is attached to.
    pub(crate) fn bucket_id(&self) -> u64 {
        self.bucket.bucket_id
    }
101
    /// Starts a new build by creating an [`AllocGuard`] over this tree's bucket.
    ///
    /// Callers in this file allocate new nodes through the guard and then turn
    /// it into a publish step with `into_publish`.
    pub(crate) fn begin_build(&self) -> AllocGuard<'_> {
        AllocGuard::new(&self.bucket)
    }
105
106 pub(crate) fn load_node(&self, g: &Guard, pid: u64) -> Result<Option<Page>, OpCode> {
107 loop {
108 if let Some(p) = self.bucket.load(pid)? {
109 let child_pid = p.header().merging_child;
110 if child_pid != NULL_PID {
111 self.merge_node(p, child_pid, g)?;
112 continue;
113 }
114 return Ok(Some(p));
115 } else {
116 return Ok(None);
117 }
118 }
119 }
120
    /// Carries out the merge recorded on `parent_ptr` for the child `child_pid`.
    ///
    /// Steps, as implemented below:
    /// 1. Try-lock the parent and verify it is still the mapped version;
    ///    otherwise return `Ok(())` without doing anything.
    /// 2. Mark the child as merging (`set_node_merging`). If the child cannot
    ///    be loaded, only the parent's index entry is removed.
    /// 3. Starting from the parent's entry just left of the child, follow
    ///    `right_sibling` links until a node whose right sibling is the child
    ///    is found; merge the child into it and unmap the child.
    /// 4. Evict the child from the cache, defer its reclamation to the epoch
    ///    guard, remove the child's entry from the parent and bump the
    ///    `TreeNodeMerge` counter.
    ///
    /// # Panics
    /// Panics if `child_pid` is `NULL_PID`, if the parent is not an internal
    /// node, if the child is not listed in the parent, or if it is the
    /// parent's first entry.
    fn merge_node(&self, parent_ptr: Page, child_pid: u64, g: &Guard) -> Result<(), OpCode> {
        // Someone else holds the parent lock: leave the merge to them.
        let Some(_lk) = parent_ptr.try_lock() else {
            return Ok(());
        };

        // The parent was replaced after we loaded it; the caller reloads.
        if self.bucket.table.get(parent_ptr.pid()) != parent_ptr.swip() {
            return Ok(());
        }

        assert_ne!(child_pid, NULL_PID);
        assert!(parent_ptr.is_intl());
        let child_index = parent_ptr
            .intl_iter()
            .position(|(_, idx)| idx.pid == child_pid)
            .unwrap();
        assert_ne!(child_index, 0, "we can't handle merge the leftmost node");

        let safe_txid = self.txid();
        let child_ptr = if let Some(x) = self.set_node_merging(child_pid, g, safe_txid)? {
            x
        } else {
            // The child page is gone: only the parent's index entry is left to clean up.
            self.remove_node_index(parent_ptr, child_pid, g, safe_txid);
            return Ok(());
        };

        // Begin the search for the child's left neighbour at the previous parent entry.
        let mut merge_index = child_index - 1;
        let mut cursor_pid = parent_ptr
            .intl_iter()
            .nth(merge_index)
            .map(|(_, x)| x.pid)
            .unwrap();
        let mut child_unmapped = false;

        loop {
            let cursor_ptr = if let Some(x) = self.load_node(g, cursor_pid)? {
                x
            } else {
                // This entry has no page; fall back to the entry further left, if any.
                if merge_index == 0 {
                    return Ok(());
                }

                merge_index -= 1;
                cursor_pid = parent_ptr
                    .intl_iter()
                    .nth(merge_index)
                    .map(|(_, x)| x.pid)
                    .unwrap();
                continue;
            };

            // Retry (reloading the cursor) until it is locked and still current.
            let Some(_cursor_lk) = cursor_ptr.try_lock() else {
                continue;
            };
            if self.bucket.table.get(cursor_ptr.pid()) != cursor_ptr.swip() {
                continue;
            }

            let next_pid = cursor_ptr.header().right_sibling;
            let mut build = self.begin_build();
            if next_pid == child_pid {
                // The cursor is the child's direct left sibling: fold the child into it.
                let (new_node, mut junks) =
                    cursor_ptr.merge_node(&mut build, &child_ptr, safe_txid);
                child_ptr.collect_junk(|x| junks.push(x));
                build.collect_retired(child_ptr.base_addr(), &mut junks);
                let mut publish = build.into_publish(g);
                publish.replace(cursor_ptr, new_node, junks);
                publish.mark_unmap(child_pid, child_ptr.swip());
                publish.commit();
                child_unmapped = true;
                break;
            }
            let hi = cursor_ptr.hi();
            let lo = child_ptr.lo();
            // `None < Some(_)` for `Option`, so a cursor without a high key takes the else branch.
            if hi >= Some(lo) {
                // The cursor already reaches the child's low key, yet does not link to the child.
                break;
            } else {
                if next_pid != NULL_PID {
                    cursor_pid = next_pid
                } else {
                    break;
                }
            }
        }

        debug_assert_eq!(child_ptr.box_header().pid, child_pid);
        // No left sibling absorbed the child; it still has to be unmapped.
        if !child_unmapped {
            self.begin_build().mark_unmap(child_pid, child_ptr.swip());
        }
        self.bucket.evict_cache(child_pid);
        // Reclaim the child only once the epoch guard allows deferred work to run.
        g.defer(move || child_ptr.reclaim());

        self.remove_node_index(parent_ptr, child_pid, g, safe_txid);

        self.store
            .opt
            .observer
            .counter(CounterMetric::TreeNodeMerge, 1);

        Ok(())
    }
245
    /// Publishes a new version of `parent_ptr` produced by
    /// `process_merge(MergeOp::Merged)`, completing the merge of `child_pid`.
    ///
    /// In debug builds, asserts that the parent's `merging_child` is `child_pid`.
    /// The caller (`merge_node`) holds the parent lock while this runs.
    fn remove_node_index(&self, parent_ptr: Page, child_pid: u64, g: &Guard, safe_txid: u64) {
        debug_assert_eq!(parent_ptr.header().merging_child, child_pid);

        let mut build = self.begin_build();
        let (new_ptr, junks) = parent_ptr.process_merge(&mut build, MergeOp::Merged, safe_txid);
        let mut publish = build.into_publish(g);
        publish.replace(parent_ptr, new_ptr, junks);
        publish.commit();
    }
256
257 fn set_node_merging(
263 &self,
264 child_pid: u64,
265 g: &Guard,
266 safe_txid: u64,
267 ) -> Result<Option<Page>, OpCode> {
268 let page = if let Some(x) = self.load_node(g, child_pid)? {
269 x
270 } else {
271 return Ok(None);
272 };
273 if page.header().merging {
274 return Ok(Some(page));
275 }
276 let _lk = page.lock();
277 if self.bucket.table.get(page.pid()) != page.swip() {
278 return Err(OpCode::Again);
279 }
280 let mut build = self.begin_build();
281 let (new_node, junks) = page.process_merge(&mut build, MergeOp::MarkChild, safe_txid);
282 let mut publish = build.into_publish(g);
283 let new_page = publish.replace(page, new_node, junks);
284 publish.commit();
285 Ok(Some(new_page))
286 }
287
    /// Splits `node` into a left and a right page and links the right page
    /// into the tree.
    ///
    /// The right half is mapped to a fresh pid, the left half replaces `node`
    /// with its `right_sibling` pointing at the new page. Afterwards the
    /// separator (the right page's low key) is inserted into `parent_opt`; if
    /// there is no parent, the root is split via `split_root`.
    ///
    /// # Errors
    /// `OpCode::Again` if `node` cannot be locked or is no longer the mapped
    /// version; errors from `split_root` are propagated. If the parent is
    /// stale or `insert_index` yields `None`, `Ok(())` is returned with the
    /// parent left unchanged.
    fn split_node(&self, node: Page, parent_opt: Option<Page>, g: &Guard) -> Result<(), OpCode> {
        let Some(node_lock) = node.try_lock() else {
            return Err(OpCode::Again);
        };
        if self.bucket.table.get(node.pid()) != node.swip() {
            return Err(OpCode::Again);
        }
        let safe_txid = self.txid();
        let mut build = self.begin_build();
        let (mut lnode, rnode) = node.split(&mut build);
        let mut rpage = Page::new(rnode);

        let mut publish = build.into_publish(g);
        // Map the right half first so the left half can point at its pid.
        let rpid = publish.map(&mut rpage);
        lnode.header_mut().right_sibling = rpid;

        let junk = Junk::new();
        let lpage = publish.replace(node, lnode, junk);
        publish.cache_after_commit(rpage);
        // NOTE(review): the node lock is released before the commit — confirm this ordering is intended.
        drop(node_lock);
        publish.commit();

        // The right page's low key is the separator to insert one level up.
        let lo = rpage.lo();
        if let Some(parent) = parent_opt {
            let _lk = parent.lock();
            // Stale parent: skip the index insert (`try_find_leaf` has a path that adds it later).
            if self.bucket.table.get(parent.pid()) != parent.swip() {
                return Ok(());
            }
            let mut build = self.begin_build();
            let Some((new_node, junk)) = parent.insert_index(&mut build, lo, rpid, safe_txid)
            else {
                return Ok(());
            };
            let mut publish = build.into_publish(g);
            publish.replace(parent, new_node, junk);
            publish.commit();
            self.store
                .opt
                .observer
                .counter(CounterMetric::TreeNodeSplit, 1);
        } else {
            // No parent means `node` was the root.
            self.split_root(g, lpage, rpid, lo, safe_txid)?;
        }

        Ok(())
    }
353
    /// Replaces the root with a new two-entry root after the old root has
    /// been split.
    ///
    /// The current root content is compacted into a new page mapped at a
    /// freshly reserved pid (`lpid`) whose right sibling is `rpid`. The page
    /// at the root pid is then replaced by a node built with `Node::new_root`
    /// holding the entries `("" -> lpid)` and `(lo -> rpid)`.
    ///
    /// # Errors
    /// `OpCode::Again` if `root` is no longer the mapped version.
    ///
    /// # Panics
    /// Panics if the replaced page's pid differs from `self.root_index.pid`.
    fn split_root(
        &self,
        g: &Guard,
        root: Page,
        rpid: u64,
        lo: &[u8],
        safe_txid: u64,
    ) -> Result<(), OpCode> {
        let _lk = root.lock();
        if self.bucket.table.get(root.pid()) != root.swip() {
            return Err(OpCode::Again);
        };
        let mut build = self.begin_build();
        // Pid for the page that takes over the old root's content.
        let lpid = build.reserve_pid();
        let (mut lnode, junk) = root.compact(&mut build, safe_txid);
        lnode.header_mut().right_sibling = rpid;
        // The new root reuses the group and LSN reported by the left node.
        let (group, lsn) = lnode.get_group_lsn();
        let mut lpage = Page::new(lnode);

        let new_root_node = Node::new_root(
            &mut build,
            self.bucket.loader(self.store.context),
            &[
                (IntlKey::new([].as_slice()), Index::new(lpid)),
                (IntlKey::new(lo), Index::new(rpid)),
            ],
            group,
            lsn,
        );
        let mut publish = build.into_publish(g);
        publish.map_to(&mut lpage, lpid);
        let n = publish.replace(root, new_root_node, junk);
        assert_eq!(n.box_header().pid, self.root_index.pid);
        publish.cache_after_commit(lpage);
        publish.commit();
        self.store
            .opt
            .observer
            .counter(CounterMetric::TreeNodeSplit, 1);
        Ok(())
    }
398
399 fn find_leaf(&self, g: &Guard, k: &[u8]) -> Result<Page, OpCode> {
400 loop {
401 match self.try_find_leaf(g, k) {
402 Err(OpCode::Again) => {
403 g.flush();
404 continue;
405 }
406 Err(e) => unreachable!("invalid opcode {:?}", e),
407 o => return o,
408 }
409 }
410 }
411
    /// One attempt at descending from the root to the leaf covering `key`,
    /// performing structural maintenance on the way.
    ///
    /// While descending, this may split an over-full node, finish a split by
    /// inserting a missing index entry into a parent, start a merge of an
    /// under-full node, or compact a leaf with too many deltas. Most of these
    /// actions end the attempt with `OpCode::Again` so the caller restarts
    /// from the root.
    ///
    /// # Errors
    /// `OpCode::Again` when the descent has to be restarted; errors from
    /// `load_node`, `split_node` and `try_merge` are propagated.
    ///
    /// # Panics
    /// Panics if a node whose high key is `<= key` has no right sibling, or if
    /// an internal node has pending deltas.
    fn try_find_leaf(&self, g: &Guard, key: &[u8]) -> Result<Page, OpCode> {
        let mut cursor = self.root_index.pid;
        // Last internal node we descended through.
        let mut parent_opt: Option<Page> = None;
        // Parent remembered when we had to move right, i.e. it may lack an index entry.
        let mut unsplit_parent_opt: Option<Page> = None;
        // Whether the current node was reported as its parent's leftmost child.
        let mut leftmost = false;

        loop {
            let node_ptr = if let Some(x) = self.load_node(g, cursor)? {
                x
            } else {
                return Err(OpCode::Again);
            };

            if node_ptr.header().merging {
                return Err(OpCode::Again);
            }

            let lo = node_ptr.lo();
            // The key lies left of this node's range: the path is stale.
            if key < lo {
                return Err(OpCode::Again);
            }

            if node_ptr.should_split(self.bucket.opt.split_elems) {
                self.split_node(node_ptr, parent_opt, g)?;
                return Err(OpCode::Again);
            }

            let hi = node_ptr.hi();
            // The key is at or beyond the high key: the target is further right.
            let is_splitting = if let Some(hi) = hi { key >= hi } else { false };

            if is_splitting {
                let rpid = node_ptr.header().right_sibling;
                assert_ne!(rpid, NULL_PID);

                if unsplit_parent_opt.is_none() && parent_opt.is_some() {
                    unsplit_parent_opt = parent_opt;
                } else if parent_opt.is_none() && lo.is_empty() {
                    // The root itself has a right sibling: finish the root split first.
                    assert_eq!(cursor, self.root_index.pid);
                    let safe_txid = self.txid();
                    // The result is ignored on purpose; the descent restarts either way.
                    let _ = self.split_root(g, node_ptr, rpid, hi.unwrap(), safe_txid);
                    return Err(OpCode::Again);
                }
                cursor = rpid;

                continue;
            }

            // We arrived here by moving right: add the missing entry to the remembered parent.
            if let Some(unsplit) = unsplit_parent_opt.take() {
                let mut build = self.begin_build();
                let _lk = unsplit.lock();
                if self.bucket.table.get(unsplit.pid()) != unsplit.swip() {
                    return Err(OpCode::Again);
                }

                let Some((split_node, junk)) =
                    unsplit.insert_index(&mut build, lo, cursor, self.txid())
                else {
                    return Err(OpCode::Again);
                };
                let mut publish = build.into_publish(g);
                publish.replace(unsplit, split_node, junk);
                publish.commit();
                self.store
                    .opt
                    .observer
                    .counter(CounterMetric::TreeNodeSplit, 1);
            }

            // Leftmost children are never merged (`merge_node` cannot handle them).
            if !leftmost
                && let Some(parent) = parent_opt
                && node_ptr.should_merge()
            {
                self.try_merge(g, parent, node_ptr)?;
                return Err(OpCode::Again);
            }

            if node_ptr.is_intl() {
                assert_eq!(node_ptr.delta_len(), 0);
                let (is_leftmost, pid) = node_ptr.child_index(key);
                leftmost = is_leftmost;
                parent_opt = Some(node_ptr);
                cursor = pid;
            } else {
                if node_ptr.delta_len() >= self.bucket.opt.consolidate_threshold as usize {
                    // Compact, then reload the same pid on the next iteration.
                    self.try_compact(g, node_ptr);
                    continue;
                }
                return Ok(node_ptr);
            }
        }
    }
513
    /// Finds the leaf that precedes the leaf covering `key`.
    ///
    /// Phase 1 descends from the root towards `key`, recording for every
    /// internal node its pid, swip and the child slot that was followed.
    /// Phase 2 walks that path bottom-up to the first ancestor where the
    /// followed slot is not slot 0, takes the child one slot to the left and
    /// descends along rightmost children (moving to right siblings while
    /// `key` is greater than a node's high key) until a leaf is reached.
    ///
    /// Returns `Ok(None)` when every recorded slot is 0, i.e. no ancestor has
    /// a child to the left of the path.
    ///
    /// # Errors
    /// `OpCode::Again` whenever a node is missing, merging, replaced since it
    /// was recorded, or its key range is inconsistent with `key`; load errors
    /// are propagated.
    fn find_prev_leaf(&self, g: &Guard, key: &[u8]) -> Result<Option<Page>, OpCode> {
        let mut cursor = self.root_index.pid;
        // (pid, swip, followed child slot) for each internal node on the way down.
        let mut path: Vec<(u64, u64, usize)> = Vec::new();

        loop {
            let Some(node) = self.load_node(g, cursor)? else {
                return Err(OpCode::Again);
            };
            if node.header().merging {
                return Err(OpCode::Again);
            }
            if key < node.lo() {
                return Err(OpCode::Again);
            }
            // Unlike `try_find_leaf`, this descent does not follow right siblings.
            if let Some(hi) = node.hi()
                && key >= hi
            {
                return Err(OpCode::Again);
            }

            if !node.is_intl() {
                break;
            }

            let sst = node.sst::<IntlKey>();
            let pos = match sst.search_by(&IntlKey::new(key), |x, y| x.raw.cmp(y.raw)) {
                Ok(pos) => pos,
                // Miss: use the slot before the insertion point, clamped at 0.
                Err(pos) => pos.max(1) - 1,
            };
            let (_, idx) = sst.kv_at::<Index>(pos);
            path.push((node.pid(), node.swip(), pos));
            cursor = idx.pid;
        }

        while let Some((parent_pid, parent_swip, child_pos)) = path.pop() {
            // Nothing to the left under this ancestor; go one level up.
            if child_pos == 0 {
                continue;
            }

            let Some(parent) = self.load_node(g, parent_pid)? else {
                return Err(OpCode::Again);
            };
            // The ancestor changed since the descent; recorded slots may be wrong.
            if parent.swip() != parent_swip {
                return Err(OpCode::Again);
            }
            if !parent.is_intl() {
                return Err(OpCode::Again);
            }

            let sst = parent.sst::<IntlKey>();
            if child_pos >= parent.header().elems as usize {
                return Err(OpCode::Again);
            }
            let (_, idx) = sst.kv_at::<Index>(child_pos - 1);
            let mut pid = idx.pid;

            loop {
                let Some(node) = self.load_node(g, pid)? else {
                    return Err(OpCode::Again);
                };
                if node.header().merging {
                    return Err(OpCode::Again);
                }
                // The node ends before `key`: a sibling to its right lies closer to `key`.
                if let Some(hi) = node.hi()
                    && key > hi
                {
                    let rpid = node.header().right_sibling;
                    if rpid == NULL_PID {
                        return Err(OpCode::Again);
                    }
                    pid = rpid;
                    continue;
                }
                // A predecessor must start strictly below `key`.
                if key <= node.lo() {
                    return Err(OpCode::Again);
                }
                if !node.is_intl() {
                    return Ok(Some(node));
                }

                let elems = node.header().elems as usize;
                if elems == 0 {
                    return Err(OpCode::Again);
                }
                // Keep to the rightmost child on the way down.
                let (_, rightmost) = node.sst::<IntlKey>().kv_at::<Index>(elems - 1);
                pid = rightmost.pid;
            }
        }

        Ok(None)
    }
605
    /// Replaces `page` with its compacted form and bumps the
    /// `TreeNodeConsolidate` counter.
    ///
    /// Blocks on the page lock. Does nothing if `page` is no longer the
    /// version mapped in the page table.
    fn try_compact(&self, g: &Guard, page: Page) {
        let _lk = page.lock();
        if self.bucket.table.get(page.pid()) != page.swip() {
            return;
        };

        let mut build = self.begin_build();
        let (new_node, junk) = page.compact(&mut build, self.txid());
        let mut publish = build.into_publish(g);
        publish.replace(page, new_node, junk);
        publish.commit();
        self.store
            .opt
            .observer
            .counter(CounterMetric::TreeNodeConsolidate, 1);
    }
623
624 pub(crate) fn try_scavenge(&self, pid: u64, g: &Guard) -> Result<bool, OpCode> {
625 let page = if let Some(p) = self.load_node(g, pid)? {
626 p
627 } else {
628 return Ok(false);
629 };
630
631 let h = page.header();
632 if h.merging || h.merging_child != NULL_ADDR {
633 return Ok(false);
634 }
635
636 let safe_txid = self.txid();
637 let delta_len = page.delta_len();
638 let threshold = self.bucket.opt.consolidate_threshold as usize;
639
640 if delta_len >= threshold {
641 self.try_compact(g, page);
642 return Ok(true);
643 }
644
645 if page.ref_node().has_garbage(safe_txid) {
646 self.try_compact(g, page);
647 return Ok(true);
648 }
649
650 Ok(false)
651 }
652
653 fn try_merge(&self, g: &Guard, parent: Page, cur: Page) -> Result<(), OpCode> {
654 let Some(lk) = parent.try_lock() else {
655 return Err(OpCode::Again);
656 };
657 if self.bucket.table.get(parent.pid()) != parent.swip() {
658 return Err(OpCode::Again);
659 }
660 if parent.header().merging_child != NULL_ADDR {
661 return Err(OpCode::Again);
662 }
663 let pid = cur.pid();
664
665 if parent.can_merge_child(cur.lo(), pid) {
666 let mut build = self.begin_build();
667 let (new_parent, j) =
668 parent.process_merge(&mut build, MergeOp::MarkParent(pid), self.txid());
669 let mut publish = build.into_publish(g);
670 let new_page = publish.replace(parent, new_parent, j);
671 publish.commit();
672 drop(lk);
673 self.merge_node(new_page, pid, g)?;
674 }
675 Ok(())
676 }
677
678 fn link<F>(
679 &self,
680 _g: &Guard,
681 page: Page,
682 k: &Key,
683 v: &Record,
684 mut check: F,
685 ) -> Result<(), OpCode>
686 where
687 F: FnMut(Page, &Key) -> Result<(u8, Position), OpCode>,
688 {
689 loop {
690 let Some(node) = page.try_lock() else {
691 continue;
692 };
693 let lock_started = sampled_instant(k.txid(), LATENCY_SAMPLE_SHIFT);
694 let pid = page.pid();
695 if self.bucket.table.get(pid) != page.swip() {
697 observe_elapsed(
698 self.store.opt.observer.as_ref(),
699 HistogramMetric::TreeLinkHoldMicros,
700 lock_started,
701 );
702 return Err(OpCode::Again);
703 };
704
705 let (group, pos) = check(page, k)?;
706 let mut build = self.begin_build();
707 let (k, v) = DeltaView::from_key_val(&mut build, k, v, group, pos);
708
709 let addr = node.insert(k, v);
710 build.mark_dirty(pid, addr);
711 observe_elapsed(
712 self.store.opt.observer.as_ref(),
713 HistogramMetric::TreeLinkHoldMicros,
714 lock_started,
715 );
716 drop(node);
717 return Ok(());
718 }
719 }
720
721 fn try_put(&self, g: &Guard, key: &Key, val: &Record) -> Result<(), OpCode> {
722 let page = self.find_leaf(g, key.raw())?;
723
724 self.link(g, page, key, val, |_, _| Ok((0, Position::default())))?;
726 Ok(())
727 }
728
729 pub fn put(&self, g: &Guard, key: Key, val: Record) -> Result<(), OpCode> {
731 loop {
732 match self.try_put(g, &key, &val) {
733 Ok(_) => return Ok(()),
734 Err(OpCode::Again) => {
735 self.store
736 .opt
737 .observer
738 .counter(CounterMetric::TreeRetryAgain, 1);
739 g.flush();
740 continue;
741 }
742 Err(e) => return Err(e),
743 }
744 }
745 }
746
747 fn try_update<F>(
748 &self,
749 g: &Guard,
750 key: &Key,
751 val: &Record,
752 visible: &mut F,
753 ) -> Result<Option<LatestValMeta>, OpCode>
754 where
755 F: FnMut(&Option<LatestValMeta>) -> Result<(u8, Position), OpCode>,
756 {
757 let page = self.find_leaf(g, key.raw)?;
758 let mut r = None;
759
760 self.link(g, page, key, val, |pg, k| {
761 let tmp = pg.find_latest_meta(k);
762 r = tmp.map(|meta| LatestValMeta {
763 ver: meta.ver,
764 group_id: meta.group_id,
765 is_del: meta.is_del,
766 });
767 visible(&r)
768 })?;
769
770 Ok(r)
771 }
772
773 pub fn update<F>(
775 &self,
776 g: &Guard,
777 key: Key,
778 val: Record,
779 mut visible: F,
780 ) -> Result<Option<LatestValMeta>, OpCode>
781 where
782 F: FnMut(&Option<LatestValMeta>) -> Result<(u8, Position), OpCode>,
783 {
784 let ksz = key.packed_size();
785 if ksz > Options::MAX_KEY_SIZE || ksz + val.packed_size() > Options::MAX_KV_SIZE {
786 return Err(OpCode::TooLarge);
787 }
788 loop {
789 match self.try_update(g, &key, &val, &mut visible) {
790 Ok(x) => return Ok(x),
791 Err(OpCode::Again) => {
792 self.store
793 .opt
794 .observer
795 .counter(CounterMetric::TreeRetryAgain, 1);
796 self.store
797 .opt
798 .observer
799 .counter(CounterMetric::TxnRetryAgain, 1);
800 g.flush();
801 continue;
802 }
803 Err(e) => return Err(e),
804 }
805 }
806 }
807
    /// Rewrites the leaf covering `raw` with aborted entries removed.
    ///
    /// Returns whether `rewrite_node` actually removed anything.
    ///
    /// # Errors
    /// `OpCode::Again` if the leaf cannot be locked immediately or was
    /// replaced after it was found.
    pub(crate) fn remove_aborted(&self, g: &Guard, raw: &[u8]) -> Result<bool, OpCode> {
        let page = self.find_leaf(g, raw)?;
        let Some(_lk) = page.try_lock() else {
            return Err(OpCode::Again);
        };
        if self.bucket.table.get(page.pid()) != page.swip() {
            return Err(OpCode::Again);
        }
        self.rewrite_node(g, page)
    }
820
821 pub(crate) fn remove_aborted_head(
824 &self,
825 g: &Guard,
826 raw: &[u8],
827 aborted_txid: u64,
828 ) -> Result<bool, OpCode> {
829 let page = self.find_leaf(g, raw)?;
830 let Some(_lk) = page.try_lock() else {
831 return Err(OpCode::Again);
832 };
833 if self.bucket.table.get(page.pid()) != page.swip() {
834 return Err(OpCode::Again);
835 }
836
837 let Some((head_ver, _, _)) = page.find_latest(&Key::new(raw, Ver::new(u64::MAX, u32::MAX)))
838 else {
839 return Ok(false);
840 };
841 if head_ver.txid != aborted_txid {
842 return Ok(false);
843 }
844 if self.store.context.get_aborted(aborted_txid) != Some(TxOutcome::Aborted) {
845 return Ok(false);
846 }
847
848 self.rewrite_node(g, page)
849 }
850
    /// Builds a copy of `page` with aborted entries removed and publishes it
    /// if anything was removed.
    ///
    /// Returns `Ok(true)` when a new page was published and `Ok(false)` when
    /// `Page::remove_aborted` reported that nothing was removed. Both callers
    /// in this file hold the page lock while calling this.
    #[inline]
    fn rewrite_node(&self, g: &Guard, page: Page) -> Result<bool, OpCode> {
        let mut build = self.begin_build();
        let (new_node, junk, removed) =
            page.remove_aborted(&mut build, self.txid(), self.store.context);
        // Nothing changed: return without publishing the rebuilt node.
        if !removed {
            return Ok(false);
        }
        let mut publish = build.into_publish(g);
        publish.replace(page, new_node, junk);
        publish.commit();
        Ok(true)
    }
864
865 pub fn get<'b>(&'b self, g: &Guard, key: Key<'b>) -> Result<(Key<'b>, ValRef), OpCode> {
868 let page = self.find_leaf(g, key.raw())?;
869
870 let Some((ver, v, b)) = page.find_latest(&key) else {
871 return Err(OpCode::NotFound);
872 };
873
874 Ok((Key::new(key.raw, ver), ValRef::new(v, b)))
875 }
876
877 pub fn range<'a, K, R, F>(&'a self, range: R, visible: F) -> Iter<'a>
878 where
879 K: AsRef<[u8]>,
880 R: RangeBounds<K>,
881 F: FnMut(&Context, u64, u8) -> bool + 'a,
882 {
883 let cached_key = Handle::new(Vec::new());
884 let lo = match range.start_bound() {
885 Bound::Included(b) => Bound::Included(b.as_ref().to_vec()),
886 Bound::Excluded(b) => Bound::Excluded(b.as_ref().to_vec()),
887 Bound::Unbounded => Bound::Included(vec![]),
888 };
889 let hi = match range.end_bound() {
890 Bound::Included(e) => Bound::Included(e.as_ref().to_vec()),
891 Bound::Excluded(e) => Bound::Excluded(e.as_ref().to_vec()),
892 Bound::Unbounded => Bound::Unbounded,
893 };
894
895 Iter {
896 tree: self,
897 cached_key,
898 lo,
899 hi,
900 iter: None,
901 rev_iter: None,
902 cache: None,
903 iter_bound: None,
904 checker: Box::new(visible),
905 filter: Filter { has_last: false },
906 guard: crossbeam_epoch::pin(),
907 }
908 }
909
    /// Scans the history chain described by `hist` for the first version that
    /// `visible` accepts.
    ///
    /// The chain starts at slot `hist.slot` of the page at `hist.page_addr`,
    /// spans at most `hist.count` entries and continues on the page named by
    /// each page's `box_header().link`. In the first page that contributes
    /// entries, the scan starts at the lower bound of
    /// `Ver::new(start_ts, NULL_CMD)` instead of at the first slot.
    ///
    /// # Errors
    /// `OpCode::NotFound` if the first visible version is a tombstone or no
    /// version is visible; errors from `load_sibling` are propagated.
    fn traverse_hist<L, F>(
        &self,
        l: &L,
        start_ts: u64,
        hist: HistRef,
        visible: &mut F,
    ) -> Result<ValRef, OpCode>
    where
        L: ILoader,
        F: FnMut(u64, u8) -> bool,
    {
        let mut addr = hist.page_addr;
        let mut pos = hist.slot as usize;
        // Number of chain entries still to be examined.
        let mut remaining = hist.count as usize;
        let mut first_segment = true;
        let target = Ver::new(start_ts, NULL_CMD);

        // NOTE(review): `addr` is a page address but is compared with `NULL_PID`
        // rather than `NULL_ADDR` — confirm the two sentinels have the same value.
        while addr != NULL_PID && remaining > 0 {
            let page = l.load_sibling(addr)?;
            let ptr = page.view().as_base();
            let sst = ptr.sst::<Ver>();
            let elems = sst.header().elems as usize;
            // The start slot lies beyond this page: move to the linked page.
            if pos >= elems {
                addr = ptr.box_header().link;
                pos = 0;
                continue;
            }

            let mut page_end = elems.min(pos.saturating_add(remaining));
            if first_segment {
                first_segment = false;
                // Skip entries ordered before `target`; skipped entries still count against `remaining`.
                let begin = Self::lower_bound_hist_subrange(&sst, pos, page_end, &target);
                let skipped = begin - pos;
                pos = begin;
                remaining = remaining.saturating_sub(skipped);
                page_end = elems.min(pos.saturating_add(remaining));
            }

            while pos < page_end && remaining > 0 {
                let (k, v) = sst.kv_at::<Val>(pos);
                if visible(k.txid, v.group_id()) {
                    if v.is_tombstone() {
                        return Err(OpCode::NotFound);
                    }
                    let (v, r) = v.get_record(l);
                    // Use the box returned with the record if there is one, else the history page.
                    return Ok(ValRef::new(v, r.unwrap_or(page)));
                }
                pos += 1;
                remaining -= 1;
            }

            if remaining == 0 {
                break;
            }
            addr = ptr.box_header().link;
            pos = 0;
        }
        Err(OpCode::NotFound)
    }
971
972 fn lower_bound_hist_subrange(
973 sst: &crate::types::sst::Sst<Ver>,
974 mut lo: usize,
975 mut hi: usize,
976 target: &Ver,
977 ) -> usize {
978 while lo < hi {
979 let mid = lo + ((hi - lo) >> 1);
980 let key = sst.key_at(mid);
981 if key.cmp(target).is_lt() {
982 lo = mid + 1;
983 } else {
984 hi = mid;
985 }
986 }
987 lo
988 }
989
    /// Returns the first version of `key.raw` that `visible` accepts.
    ///
    /// Versions are examined in three stages:
    /// 1. versions visited by `Page::visit_versions`, using the comparator and
    ///    visitor closures below;
    /// 2. the entry returned by `Page::search_sst` for `key`;
    /// 3. that entry's history chain, via `traverse_hist`.
    ///
    /// `visible` receives the version's txid and group id.
    ///
    /// # Errors
    /// `OpCode::NotFound` if the first visible version is a tombstone or no
    /// version is visible.
    pub fn traverse<F>(&self, g: &Guard, key: Key, mut visible: F) -> Result<ValRef, OpCode>
    where
        F: FnMut(u64, u8) -> bool,
    {
        let page = self.find_leaf(g, key.raw)?;

        let mut result = None;
        // Search with the maximum version so the visit is not limited by `key`'s own version.
        let search_key = Key::new(key.raw, Ver::new(u64::MAX, u32::MAX));
        page.visit_versions(
            search_key,
            // Comparator: order by raw key, then by txid with the operands swapped.
            |x, y| {
                let k = Key::decode_from(x.key());
                match k.raw.cmp(y.raw) {
                    Equal => y.txid.cmp(&k.txid),
                    o => o,
                }
            },
            // Visitor: returning `true` ends the visit — TODO confirm against `visit_versions`.
            |x| {
                let k = Key::decode_from(x.key());
                if k.raw.cmp(key.raw).is_ne() {
                    return true;
                }
                let val = x.val();
                if visible(k.txid, val.group_id()) {
                    if val.is_tombstone() {
                        result = Some(Err(OpCode::NotFound));
                        return true;
                    }
                    let (r, v) = val.get_record(&page.loader);
                    result = Some(Ok(ValRef::new(r, v.unwrap_or_else(|| x.as_box()))));
                    return true;
                }
                false
            },
        );

        if let Some(res) = result {
            return res;
        }

        // No decision yet: fall back to the entry found by `search_sst`.
        let (k, val) = page.search_sst(&key).ok_or(OpCode::NotFound)?;
        if visible(k.txid, val.group_id()) {
            if val.is_tombstone() {
                return Err(OpCode::NotFound);
            }
            let (record, r) = val.get_record(&page.loader);
            return Ok(ValRef::new(record, r.unwrap_or_else(|| page.base_box())));
        }
        // That entry is not visible: look through its history chain, if it has one.
        if let Some(hist) = val.get_hist() {
            return self.traverse_hist(&page.loader, key.txid, hist, &mut visible);
        }
        Err(OpCode::NotFound)
    }
1044}
1045
/// Double-ended range iterator over a [`Tree`], created by [`Tree::range`].
///
/// The iterator narrows `lo` after every forward step and `hi` after every
/// backward step, so the two bounds always describe what is left to yield.
pub struct Iter<'a> {
    /// Tree being iterated.
    tree: &'a Tree,
    /// Key buffer handed to the leaf iterators; reclaimed explicitly in `Drop`.
    cached_key: Handle<Vec<u8>>,
    /// Remaining lower bound.
    lo: Bound<Vec<u8>>,
    /// Remaining upper bound.
    hi: Bound<Vec<u8>>,
    /// Forward iterator over the cached leaf, if one is active.
    iter: Option<RawLeafIter<'a, Loader>>,
    /// Backward iterator over the cached leaf, if one is active.
    rev_iter: Option<RawLeafRevIter<'a, Loader>>,
    /// Leaf snapshot the active iterator borrows from (boxed so its address is stable).
    cache: Option<Box<Node>>,
    /// Copy of the lower bound the active forward iterator borrows from.
    iter_bound: Option<Box<Bound<Vec<u8>>>>,
    /// Visibility predicate: `(context, txid, group_id) -> bool`.
    checker: Box<dyn FnMut(&Context, u64, u8) -> bool + 'a>,
    /// Per-item filter applied after the visibility check.
    filter: Filter,
    /// Epoch guard pinned for the lifetime of the iterator.
    guard: Guard,
}
1060
/// Tears the iterator down in dependency order.
impl Drop for Iter<'_> {
    fn drop(&mut self) {
        // The leaf iterators borrow from `cache` / `iter_bound` through a
        // lifetime-extending transmute, so they are dropped before those boxes.
        self.iter.take();
        self.rev_iter.take();
        self.cache.take();
        self.iter_bound.take();
        // `Handle` is released explicitly via `reclaim()`.
        self.cached_key.reclaim();
    }
}
1071
1072impl Iter<'_> {
1073 fn low_key(&self) -> &[u8] {
1074 match self.lo {
1075 Bound::Unbounded => &[],
1076 Bound::Excluded(ref x) | Bound::Included(ref x) => x,
1077 }
1078 }
1079
1080 fn high_key(&self) -> Option<&[u8]> {
1081 match self.hi {
1082 Bound::Unbounded => None,
1083 Bound::Excluded(ref x) | Bound::Included(ref x) => Some(x),
1084 }
1085 }
1086
1087 fn collapsed(&self) -> bool {
1088 match (&self.lo, &self.hi) {
1089 (Bound::Included(b), Bound::Included(e))
1090 | (Bound::Excluded(b), Bound::Excluded(e))
1091 | (Bound::Included(b), Bound::Excluded(e))
1092 | (Bound::Excluded(b), Bound::Included(e)) => b > e,
1093 _ => false,
1094 }
1095 }
1096
1097 fn find_leaf_for_next_back(&self) -> Result<Page, OpCode> {
1098 if let Some(k) = self.high_key() {
1099 let node = self.tree.find_leaf(&self.guard, k)?;
1100 if matches!(self.hi, Bound::Excluded(_)) && node.lo() >= k {
1101 return self
1102 .tree
1103 .find_prev_leaf(&self.guard, k)?
1104 .ok_or(OpCode::NotFound);
1105 }
1106 return Ok(node);
1107 }
1108
1109 let mut node = self.tree.find_leaf(&self.guard, self.low_key())?;
1110 loop {
1111 let rpid = node.header().right_sibling;
1112 if rpid == NULL_PID {
1113 return Ok(node);
1114 }
1115 let Some(next) = self.tree.load_node(&self.guard, rpid)? else {
1116 return Err(OpCode::Again);
1117 };
1118 node = next;
1119 }
1120 }
1121
    /// Produces the next item in ascending key order, or `None` when the
    /// remaining range is exhausted.
    ///
    /// Each round of the outer loop makes sure a forward leaf iterator exists
    /// (loading the leaf covering the current lower bound if needed), pulls
    /// items until one passes the bound, visibility and filter checks, then
    /// advances `lo` past the yielded key. When a leaf is exhausted, `lo`
    /// moves to that leaf's high key and the next leaf is loaded.
    ///
    /// # Panics
    /// Panics if the leaf lookup or the leaf iterator fails with a code other
    /// than `Again` / `NotFound`.
    fn get_next(&mut self) -> Option<<Self as Iterator>::Item> {
        // Switching direction: drop any backward iterator (it borrows `cache`).
        self.rev_iter.take();

        'retry: while !self.collapsed() {
            if self.iter.is_none() {
                let node = match self.tree.find_leaf(&self.guard, self.low_key()) {
                    Ok(node) => node,
                    Err(OpCode::Again) => {
                        self.guard.flush();
                        continue;
                    }
                    Err(OpCode::NotFound) => return None,
                    Err(e) => panic!("iter find_leaf failed: {e:?}"),
                };
                let next_node = node.ref_node();
                let next_bound = self.lo.clone();

                // Reuse the existing boxes so only their contents are replaced.
                if let Some(cache) = self.cache.as_mut() {
                    **cache = next_node;
                } else {
                    self.cache = Some(Box::new(next_node));
                }

                if let Some(bound) = self.iter_bound.as_mut() {
                    **bound = next_bound;
                } else {
                    self.iter_bound = Some(Box::new(next_bound));
                }

                let cache = self.cache.as_ref().expect("must valid");
                let bound = self.iter_bound.as_ref().expect("must valid");
                // NOTE(review): the transmute only changes the iterator's lifetime
                // parameter. The iterator borrows the boxed `cache` and `iter_bound`;
                // this appears to rely on `self.iter` always being cleared before
                // those boxes are overwritten or dropped — confirm.
                self.iter = Some(unsafe {
                    std::mem::transmute::<RawLeafIter<'_, Loader>, RawLeafIter<'_, Loader>>(
                        cache.successor(bound.as_ref(), self.cached_key),
                    )
                });
            }

            let r = loop {
                let next = {
                    let iter = self.iter.as_mut().expect("must valid");
                    iter.try_next()
                };
                match next {
                    Ok(Some(item)) => {
                        // Re-check the lower bound: `lo` may have moved since the iterator was built.
                        let ok = match &self.lo {
                            Bound::Unbounded => true,
                            Bound::Included(b) => item.cmp_key(b.as_slice()).is_ge(),
                            Bound::Excluded(b) => item.cmp_key(b.as_slice()).is_gt(),
                        };
                        if ok
                            && (self.checker)(
                                &self.tree.store.context,
                                item.txid(),
                                item.group_id(),
                            )
                            && self.filter.check(&item)
                        {
                            break Some(item);
                        }
                    }
                    Ok(None) => break None,
                    Err(OpCode::Again | OpCode::NotFound) => {
                        // Drop the iterator and reload the leaf from the current `lo`.
                        self.iter.take();
                        continue 'retry;
                    }
                    Err(e) => panic!("iter load failed: {e:?}"),
                }
            };

            if let Some(item) = r {
                let key = item.key();
                // Continue strictly after the key that is about to be yielded.
                match &mut self.lo {
                    Bound::Included(v) | Bound::Excluded(v) => {
                        v.clear();
                        v.extend_from_slice(key);
                        self.lo = Bound::Excluded(std::mem::take(v));
                    }
                    Bound::Unbounded => {
                        self.lo = Bound::Excluded(key.to_vec());
                    }
                }

                // Yield the item only if it is still within the upper bound.
                match self.hi {
                    Bound::Unbounded => return Some(item),
                    Bound::Included(ref h) if item.cmp_key(h.as_slice()).is_le() => {
                        return Some(item);
                    }
                    Bound::Excluded(ref h) if item.cmp_key(h.as_slice()).is_lt() => {
                        return Some(item);
                    }
                    _ => return None,
                }
            } else {
                // Leaf exhausted: continue from its high key, or stop at the last leaf.
                self.iter.take();
                let node = self.cache.as_ref().expect("must valid");
                if let Some(hi) = node.hi() {
                    self.lo = Bound::Included(hi.to_vec());
                    continue;
                }
                break;
            }
        }

        None
    }
1230}
1231
/// Forward iteration in ascending key order; delegates to `Iter::get_next`.
impl<'a> Iterator for Iter<'a> {
    type Item = IterItem<'a, Loader>;

    /// Returns the next item from the front of the remaining range.
    fn next(&mut self) -> Option<Self::Item> {
        self.get_next()
    }
}
1239
/// Backward iteration in descending key order.
impl<'a> DoubleEndedIterator for Iter<'a> {
    /// Produces the next item from the back of the remaining range, or `None`
    /// when it is exhausted.
    ///
    /// Mirrors `get_next`: a backward leaf iterator is created over the leaf
    /// chosen by `find_leaf_for_next_back`, items are pulled until one passes
    /// both bound checks, the visibility check and the filter, and `hi` is
    /// then moved to exclude the yielded key. When a leaf is exhausted, `hi`
    /// becomes that leaf's low key (exclusive), or iteration ends if the low
    /// key is empty.
    ///
    /// # Panics
    /// Panics if the leaf lookup or the leaf iterator fails with a code other
    /// than `Again` / `NotFound`.
    fn next_back(&mut self) -> Option<Self::Item> {
        // Switching direction: drop any forward iterator (it borrows `cache`).
        self.iter.take();

        'retry: while !self.collapsed() {
            if self.rev_iter.is_none() {
                let node = match self.find_leaf_for_next_back() {
                    Ok(node) => node,
                    Err(OpCode::Again) => {
                        self.guard.flush();
                        continue;
                    }
                    Err(OpCode::NotFound) => return None,
                    Err(e) => panic!("iter find_leaf failed: {e:?}"),
                };
                let next_node = node.ref_node();
                if let Some(cache) = self.cache.as_mut() {
                    **cache = next_node;
                } else {
                    self.cache = Some(Box::new(next_node));
                }
                // NOTE(review): the transmute only changes the iterator's lifetime
                // parameter; the iterator borrows the boxed `cache` and the bounds.
                // This appears to rely on `self.rev_iter` being cleared before any
                // of them is overwritten — confirm.
                self.rev_iter = Some(unsafe {
                    std::mem::transmute::<RawLeafRevIter<'_, Loader>, RawLeafRevIter<'_, Loader>>(
                        self.cache.as_ref().expect("must valid").predecessor(
                            &self.lo,
                            &self.hi,
                            self.cached_key,
                        ),
                    )
                });
            }

            let res = loop {
                let next = {
                    let iter = self.rev_iter.as_mut().expect("must valid");
                    iter.try_next_back()
                };
                match next {
                    Ok(Some(item)) => {
                        let lo_ok = match &self.lo {
                            Bound::Unbounded => true,
                            Bound::Included(b) => item.cmp_key(b.as_slice()).is_ge(),
                            Bound::Excluded(b) => item.cmp_key(b.as_slice()).is_gt(),
                        };
                        let hi_ok = match &self.hi {
                            Bound::Unbounded => true,
                            Bound::Included(h) => item.cmp_key(h.as_slice()).is_le(),
                            Bound::Excluded(h) => item.cmp_key(h.as_slice()).is_lt(),
                        };
                        if lo_ok
                            && hi_ok
                            && (self.checker)(
                                &self.tree.store.context,
                                item.txid(),
                                item.group_id(),
                            )
                            && self.filter.check(&item)
                        {
                            break Some(item);
                        }
                    }
                    Ok(None) => break None,
                    Err(OpCode::Again | OpCode::NotFound) => {
                        // Drop the iterator and pick the leaf again from the current bounds.
                        self.rev_iter.take();
                        continue 'retry;
                    }
                    Err(e) => panic!("iter load failed: {e:?}"),
                }
            };

            if let Some(item) = res {
                let key = item.key();
                // Continue strictly before the key that is about to be yielded.
                match &mut self.hi {
                    Bound::Included(v) | Bound::Excluded(v) => {
                        v.clear();
                        v.extend_from_slice(key);
                        self.hi = Bound::Excluded(std::mem::take(v));
                    }
                    Bound::Unbounded => {
                        self.hi = Bound::Excluded(key.to_vec());
                    }
                }
                return Some(item);
            }

            // Leaf exhausted: continue below its low key, unless that key is empty.
            self.rev_iter.take();
            let lo = self.cache.as_ref().expect("must valid").lo();
            if lo.is_empty() {
                return None;
            }
            self.hi = Bound::Excluded(lo.to_vec());
        }

        None
    }
}
1336
/// Per-iterator state for the item filter applied after the visibility check.
struct Filter {
    /// Set once `check` has got past its early return for at least one item.
    has_last: bool,
}
1340
impl Filter {
    /// Decides whether `item` should be yielded by the iterator.
    ///
    /// Returns `false` when a previous item has been recorded and `item`
    /// compares equal to `item.key()`, and also when the item is a tombstone.
    ///
    /// NOTE(review): `item.cmp_key(item.key())` compares the item with its own
    /// `key()`. This is only meaningful if `key()` still returns the key
    /// assembled for the previously accepted item until `assembled_key()` is
    /// called below — confirm against `IterItem`. The statement order here
    /// must be kept in that case.
    fn check<L: ILoader>(&mut self, item: &IterItem<L>) -> bool {
        if self.has_last && item.cmp_key(item.key()).is_eq() {
            return false;
        }
        // Called for its side effect only; the returned key is discarded.
        let _ = item.assembled_key();
        self.has_last = true;
        !item.is_tombstone()
    }
}
1352
#[cfg(test)]
mod test {
    use crate::{BucketOptions, Mace, Options, RandomPath};
    use std::thread;

    /// Smoke test: four reader threads repeatedly scan with `seek("key")`
    /// while one writer inserts 1000 keys, each in its own transaction.
    /// `split_elems` is set to 256, presumably so the inserts trigger node
    /// splits during the scans. The test passes if nothing panics.
    #[test]
    fn concurrent_page_hit() {
        let path = RandomPath::tmp();
        let mut opt = Options::new(&*path);
        opt.tmp_store = true;
        let mace = Mace::new(opt.validate().unwrap()).unwrap();
        let db = mace
            .new_bucket(
                "default",
                BucketOptions {
                    split_elems: 256,
                    ..BucketOptions::default()
                },
            )
            .unwrap();

        let num_readers = 4;
        let num_iterations = 1000;

        thread::scope(|s| {
            for _ in 0..num_readers {
                let db = db.clone();
                s.spawn(move || {
                    for _ in 0..num_iterations {
                        let view = db.view().unwrap();
                        let mut count = 0;
                        for _ in view.seek("key") {
                            count += 1;
                        }
                        // Trivially true; only the scan completing without a panic matters.
                        assert!(count >= 0);
                    }
                });
            }

            // Single writer: one committed transaction per key.
            s.spawn(|| {
                for i in 0..num_iterations {
                    let kv = db.begin().unwrap();
                    let key = format!("key_{:05}", i);
                    kv.put(&key, &key).unwrap();
                    kv.commit().unwrap();
                }
            });
        });
    }
}