Skip to main content

mace/index/
tree.rs

1use crate::Options;
2use crate::cc::context::{Context, TxOutcome};
3use crate::map::buffer::BucketContext;
4use crate::map::publish::AllocGuard;
5use crate::map::{Loader, Node, Page};
6use crate::types::data::{HistRef, IterItem, Record, Val};
7use crate::types::node::{Junk, MergeOp, RawLeafIter, RawLeafRevIter};
8use crate::types::refbox::DeltaView;
9use crate::types::traits::{IAsBoxRef, IBoxHeader, IDecode, IHeader, ILoader};
10use crate::utils::data::Position;
11use crate::utils::observe::{
12    CounterMetric, HistogramMetric, LATENCY_SAMPLE_SHIFT, observe_elapsed, sampled_instant,
13};
14use crate::utils::{Handle, MutRef, NULL_ADDR, OpCode};
15use crate::{
16    Store,
17    types::{
18        data::{Index, IntlKey, Key, Ver},
19        refbox::BoxRef,
20        traits::{ICodec, IKey},
21    },
22    utils::{NULL_CMD, NULL_PID},
23};
24use crossbeam_epoch::Guard;
25use std::cmp::Ordering::Equal;
26use std::ops::{Bound, RangeBounds};
27use std::sync::Arc;
28use std::sync::atomic::Ordering::Acquire;
29
/// A reference to a value in the storage engine.
///
/// Pairs the `Record` carrying the value bytes with the `BoxRef` passed to `ValRef::new`
/// as its owner.
#[derive(Clone)]
pub struct ValRef {
    // record whose `data()` bytes are exposed through `slice` and `to_vec`
    raw: Record,
    // never read in this file; presumably held only so the memory behind `raw` stays
    // alive for as long as this reference exists (TODO confirm against `Record`/`BoxRef`)
    _owner: BoxRef,
}
36
/// Metadata of the latest version found for a key.
///
/// `Tree::try_update` fills it from the result of `find_latest_meta` and hands it to the
/// caller supplied `visible` callback.
#[derive(Clone, Copy)]
pub(crate) struct LatestValMeta {
    // version copied from the latest entry
    pub(crate) ver: Ver,
    // group id copied from the latest entry
    pub(crate) group_id: u8,
    // `is_del` flag copied from the latest entry; presumably marks a delete (TODO confirm)
    pub(crate) is_del: bool,
}
43
44impl ValRef {
45    pub(crate) fn new(raw: Record, owner: BoxRef) -> Self {
46        Self { raw, _owner: owner }
47    }
48
49    /// Returns the data as a byte slice.
50    pub fn slice(&self) -> &[u8] {
51        self.raw.data()
52    }
53
54    /// Converts the reference into a owned Vec<u8>.
55    pub fn to_vec(self) -> Vec<u8> {
56        self.raw.data().to_vec()
57    }
58}
59
/// Handle to one tree stored inside a bucket.
///
/// Cloning the handle clones its three fields; the bucket context is shared through the
/// `Arc`.
#[derive(Clone)]
pub struct Tree {
    // store handle; this file reads `context` and `opt.observer` from it
    pub(crate) store: MutRef<Store>,
    // index wrapping the root pid; every descent starts at `root_index.pid`
    pub(crate) root_index: Index,
    // bucket context providing the page table, page loading and the build options
    pub(crate) bucket: Arc<BucketContext>,
}
66
67impl Tree {
68    pub fn new(store: MutRef<Store>, root_pid: u64, bucket: Arc<BucketContext>) -> Self {
69        let this = Self {
70            store,
71            root_index: Index::new(root_pid),
72            bucket,
73        };
74
75        let addr = this.bucket.table.index(root_pid).load(Acquire);
76        if addr == NULL_ADDR {
77            this.init(root_pid);
78        }
79        this
80    }
81
82    fn init(&self, root_pid: u64) {
83        let g = crossbeam_epoch::pin();
84        let mut build = self.begin_build();
85        let lsn = self.store.context.group(0).logging.lock().current_pos();
86        let node = Node::new_leaf(&mut build, self.bucket.loader(self.store.context), 0, lsn);
87        let mut page = Page::new(node);
88        let mut publish = build.into_publish(&g);
89        publish.map_to(&mut page, root_pid);
90        publish.cache_after_commit(page);
91        publish.commit();
92    }
93
94    fn txid(&self) -> u64 {
95        self.store.context.compact_safe_txid()
96    }
97
98    pub(crate) fn bucket_id(&self) -> u64 {
99        self.bucket.bucket_id
100    }
101
102    pub(crate) fn begin_build(&self) -> AllocGuard<'_> {
103        AllocGuard::new(&self.bucket)
104    }
105
106    pub(crate) fn load_node(&self, g: &Guard, pid: u64) -> Result<Option<Page>, OpCode> {
107        loop {
108            if let Some(p) = self.bucket.load(pid)? {
109                let child_pid = p.header().merging_child;
110                if child_pid != NULL_PID {
111                    self.merge_node(p, child_pid, g)?;
112                    continue;
113                }
114                return Ok(Some(p));
115            } else {
116                return Ok(None);
117            }
118        }
119    }
120
    /// Merges the child `child_pid` of `parent_ptr` into its left sibling and removes the
    /// child's index from the parent.
    ///
    /// Returns `Ok(())` without doing anything when the parent lock is contended or when
    /// `parent_ptr` is no longer the version mapped in the page table. Errors coming from
    /// `set_node_merging` and `load_node` are propagated.
    ///
    /// Panics if `child_pid` is `NULL_PID`, if the parent is not an internal node, if the
    /// parent holds no index for `child_pid`, or if the child is the parent's first entry.
    // 1. mark child node as `merging`
    // 2. find its left sibling, to merge child into it
    // 3. replace old left sibling with merged node
    // 4. unmap child pid from page table
    // 5. remove index to child from its parent
    fn merge_node(&self, parent_ptr: Page, child_pid: u64, g: &Guard) -> Result<(), OpCode> {
        // NOTE: a big lock is necessary because the merge process must be exclusive
        let Some(_lk) = parent_ptr.try_lock() else {
            // return ok so cooperative callers avoid retry storms
            return Ok(());
        };

        // the parent snapshot was replaced while we were getting the lock
        if self.bucket.table.get(parent_ptr.pid()) != parent_ptr.swip() {
            return Ok(());
        }

        assert_ne!(child_pid, NULL_PID);
        assert!(parent_ptr.is_intl());
        // position of the child's entry inside the parent
        let child_index = parent_ptr
            .intl_iter()
            .position(|(_, idx)| idx.pid == child_pid)
            .unwrap();
        // the "can_merge_child" check is somewhat failed
        assert_ne!(child_index, 0, "we can't handle merge the leftmost node");

        let safe_txid = self.txid();
        // 1.
        let child_ptr = if let Some(x) = self.set_node_merging(child_pid, g, safe_txid)? {
            x
        } else {
            // child_pid was unmapped (and crashed) but not removed from parent yet
            self.remove_node_index(parent_ptr, child_pid, g, safe_txid);
            return Ok(());
        };

        // 2.
        // start from the entry directly left of the child
        let mut merge_index = child_index - 1;
        let mut cursor_pid = parent_ptr
            .intl_iter()
            .nth(merge_index)
            .map(|(_, x)| x.pid)
            .unwrap();
        // set once the child was unmapped together with the merged node publication
        let mut child_unmapped = false;

        loop {
            let cursor_ptr = if let Some(x) = self.load_node(g, cursor_pid)? {
                x
            } else {
                // the cursor pid may already be merged away
                if merge_index == 0 {
                    return Ok(());
                }

                // step one more entry to the left in the parent and try again
                merge_index -= 1;
                cursor_pid = parent_ptr
                    .intl_iter()
                    .nth(merge_index)
                    .map(|(_, x)| x.pid)
                    .unwrap();
                continue;
            };

            // 3. necessary, because cursor node itself maybe split concurrently
            let Some(_cursor_lk) = cursor_ptr.try_lock() else {
                // lock is contended: reload the cursor and retry
                continue;
            };
            // the cursor snapshot is stale: reload it and retry
            if self.bucket.table.get(cursor_ptr.pid()) != cursor_ptr.swip() {
                continue;
            }

            let next_pid = cursor_ptr.header().right_sibling;
            let mut build = self.begin_build();
            // verify this candidate still points to child as its right sibling
            if next_pid == child_pid {
                let (new_node, mut junks) =
                    cursor_ptr.merge_node(&mut build, &child_ptr, safe_txid);
                // the child's own junk and base address are retired along with the merge
                child_ptr.collect_junk(|x| junks.push(x));
                build.collect_retired(child_ptr.base_addr(), &mut junks);
                let mut publish = build.into_publish(g);
                // NOTE: keep replace and mark_unmap in one publish to avoid checkpoint cutting across
                // two write epochs and making one base addr appear in both dirty roots and junk pages
                publish.replace(cursor_ptr, new_node, junks);
                publish.mark_unmap(child_pid, child_ptr.swip());
                publish.commit();
                child_unmapped = true;
                break;
            }
            let hi = cursor_ptr.hi();
            let lo = child_ptr.lo();
            if hi >= Some(lo) {
                // another thread installed merged content after we loaded cursor
                break;
            } else {
                // another thread split cursor after we loaded it
                if next_pid != NULL_PID {
                    cursor_pid = next_pid
                } else {
                    // child may already be unmapped by another completed merge
                    break;
                }
            }
        }

        // 4. hide child from the in-memory page table before reclaiming it (or else scavenge process
        // may use it cause memory bug).
        // checkpoint will later make this NULL mapping durable and only then recycle child_pid into
        // the free list. otherwise readers/gc can still load child_pid and observe a reclaimed page.
        debug_assert_eq!(child_ptr.box_header().pid, child_pid);
        if !child_unmapped {
            self.begin_build().mark_unmap(child_pid, child_ptr.swip()); // child's junks were already collected
        }
        self.bucket.evict_cache(child_pid);
        // reclamation is deferred through the epoch guard instead of running right here
        g.defer(move || child_ptr.reclaim());

        // 5.
        self.remove_node_index(parent_ptr, child_pid, g, safe_txid);

        self.store
            .opt
            .observer
            .counter(CounterMetric::TreeNodeMerge, 1);

        Ok(())
    }
245
246    // NOTE: caller must hold parent lock
247    fn remove_node_index(&self, parent_ptr: Page, child_pid: u64, g: &Guard, safe_txid: u64) {
248        debug_assert_eq!(parent_ptr.header().merging_child, child_pid);
249
250        let mut build = self.begin_build();
251        let (new_ptr, junks) = parent_ptr.process_merge(&mut build, MergeOp::Merged, safe_txid);
252        let mut publish = build.into_publish(g);
253        publish.replace(parent_ptr, new_ptr, junks);
254        publish.commit();
255    }
256
257    // 1. load child node and check if it's merging
258    // 2. return if it's merging
259    // 3. or else create a new node with merging set to true
260    // 4. replace old child node with the new node
261    // NOTE: it must be protected by lock
262    fn set_node_merging(
263        &self,
264        child_pid: u64,
265        g: &Guard,
266        safe_txid: u64,
267    ) -> Result<Option<Page>, OpCode> {
268        let page = if let Some(x) = self.load_node(g, child_pid)? {
269            x
270        } else {
271            return Ok(None);
272        };
273        if page.header().merging {
274            return Ok(Some(page));
275        }
276        let _lk = page.lock();
277        if self.bucket.table.get(page.pid()) != page.swip() {
278            return Err(OpCode::Again);
279        }
280        let mut build = self.begin_build();
281        let (new_node, junks) = page.process_merge(&mut build, MergeOp::MarkChild, safe_txid);
282        let mut publish = build.into_publish(g);
283        let new_page = publish.replace(page, new_node, junks);
284        publish.commit();
285        Ok(Some(new_page))
286    }
287
    /// Splits `node` into a left half kept at the old pid and a right half mapped at a new
    /// pid, then makes the right half reachable from above.
    ///
    /// split flow:
    /// 1. build lhs/rhs from `split_overlay`
    ///    - no delta: split base directly
    ///    - has delta: compact first then split
    /// 2. map rhs and wire `lhs.right_sibling = rhs.pid`
    /// 3. publish lhs at old pid so readers can follow sibling chain
    /// 4. if parent exists, install rhs separator into parent
    /// 5. if current node is root, build and publish a new root
    ///
    /// Returns `Err(OpCode::Again)` when the node lock is contended or `node` is no longer
    /// the mapped version, and forwards the result of `split_root` when there is no parent.
    /// A skipped parent update (stale parent, or `insert_index` yielding `None`) still
    /// returns `Ok(())`.
    fn split_node(&self, node: Page, parent_opt: Option<Page>, g: &Guard) -> Result<(), OpCode> {
        let Some(node_lock) = node.try_lock() else {
            return Err(OpCode::Again);
        };
        if self.bucket.table.get(node.pid()) != node.swip() {
            return Err(OpCode::Again);
        }
        let safe_txid = self.txid();
        let mut build = self.begin_build();
        // 1.
        let (mut lnode, rnode) = node.split(&mut build);
        let mut rpage = Page::new(rnode);

        // 2.
        let mut publish = build.into_publish(g);
        let rpid = publish.map(&mut rpage);
        lnode.header_mut().right_sibling = rpid;

        // 3.
        let junk = Junk::new();
        let lpage = publish.replace(node, lnode, junk);
        publish.cache_after_commit(rpage);
        // drop lock early so cooperative threads can make progress
        drop(node_lock);
        // publish rpage to page table
        publish.commit();

        // the low key of the right half is the separator installed above it
        let lo = rpage.lo();
        if let Some(parent) = parent_opt {
            // cooperative threads may race to install the same separator
            let _lk = parent.lock();
            if self.bucket.table.get(parent.pid()) != parent.swip() {
                // another thread already finished this parent update
                return Ok(());
            }
            // 4.
            let mut build = self.begin_build();
            let Some((new_node, junk)) = parent.insert_index(&mut build, lo, rpid, safe_txid)
            else {
                // parent update raced with other tree change
                return Ok(());
            };
            let mut publish = build.into_publish(g);
            publish.replace(parent, new_node, junk);
            // publish new parent to page table
            publish.commit();
            self.store
                .opt
                .observer
                .counter(CounterMetric::TreeNodeSplit, 1);
        } else {
            // 5.
            self.split_root(g, lpage, rpid, lo, safe_txid)?;
        }

        Ok(())
    }
353
354    fn split_root(
355        &self,
356        g: &Guard,
357        root: Page,
358        rpid: u64,
359        lo: &[u8],
360        safe_txid: u64,
361    ) -> Result<(), OpCode> {
362        let _lk = root.lock();
363        if self.bucket.table.get(root.pid()) != root.swip() {
364            return Err(OpCode::Again);
365        };
366        let mut build = self.begin_build();
367        let lpid = build.reserve_pid(); // no early return, no leak is possible
368
369        // compact root before building new root because step-3 publication can race with new writes
370        let (mut lnode, junk) = root.compact(&mut build, safe_txid);
371        lnode.header_mut().right_sibling = rpid;
372        let (group, lsn) = lnode.get_group_lsn();
373        let mut lpage = Page::new(lnode);
374
375        let new_root_node = Node::new_root(
376            &mut build,
377            self.bucket.loader(self.store.context),
378            &[
379                (IntlKey::new([].as_slice()), Index::new(lpid)),
380                (IntlKey::new(lo), Index::new(rpid)),
381            ],
382            group,
383            lsn,
384        );
385        let mut publish = build.into_publish(g);
386        publish.map_to(&mut lpage, lpid);
387        let n = publish.replace(root, new_root_node, junk);
388        assert_eq!(n.box_header().pid, self.root_index.pid);
389        publish.cache_after_commit(lpage);
390        // publish new root to global
391        publish.commit();
392        self.store
393            .opt
394            .observer
395            .counter(CounterMetric::TreeNodeSplit, 1);
396        Ok(())
397    }
398
399    fn find_leaf(&self, g: &Guard, k: &[u8]) -> Result<Page, OpCode> {
400        loop {
401            match self.try_find_leaf(g, k) {
402                Err(OpCode::Again) => {
403                    g.flush();
404                    continue;
405                }
406                Err(e) => unreachable!("invalid opcode {:?}", e),
407                o => return o,
408            }
409        }
410    }
411
    /// Performs one descent from the root towards the leaf covering `key`.
    ///
    /// Along the way it cooperatively performs structure changes: splitting nodes that
    /// report `should_split`, finishing a half-installed root or parent separator, merging
    /// nodes that report `should_merge`, and compacting a leaf whose delta count reached
    /// `consolidate_threshold`. After most of these the descent gives up with
    /// `Err(OpCode::Again)` so the caller restarts from the root.
    ///
    /// Returns the leaf page on success. `Err(OpCode::Again)` is also returned when a page
    /// cannot be loaded, is marked `merging`, or no longer covers `key`.
    fn try_find_leaf(&self, g: &Guard, key: &[u8]) -> Result<Page, OpCode> {
        let mut cursor = self.root_index.pid;
        // internal node the current cursor was reached from, `None` while at the root
        let mut parent_opt: Option<Page> = None;
        // parent that still lacks the separator for a sibling we had to follow
        let mut unsplit_parent_opt: Option<Page> = None;
        // whether the current node was reached through its parent's leftmost entry
        let mut leftmost = false;

        loop {
            let node_ptr = if let Some(x) = self.load_node(g, cursor)? {
                x
            } else {
                return Err(OpCode::Again);
            };

            if node_ptr.header().merging {
                return Err(OpCode::Again);
            }

            // node may already be replaced by smo, ensure key is still in [lo, hi)
            let lo = node_ptr.lo();
            if key < lo {
                return Err(OpCode::Again);
            }

            if node_ptr.should_split(self.bucket.opt.split_elems) {
                self.split_node(node_ptr, parent_opt, g)?;
                // restart even after a successful split
                return Err(OpCode::Again);
            }

            // another thread may already split this node, detect by key >= hi and follow sibling
            let hi = node_ptr.hi();
            let is_splitting = if let Some(hi) = hi { key >= hi } else { false };

            if is_splitting {
                // search from right sibling
                let rpid = node_ptr.header().right_sibling;
                assert_ne!(rpid, NULL_PID);

                if unsplit_parent_opt.is_none() && parent_opt.is_some() {
                    // remember the parent so the missing separator can be installed below
                    unsplit_parent_opt = parent_opt;
                } else if parent_opt.is_none() && lo.is_empty() {
                    // root may be in partial split state:
                    // current page is lhs and rhs is already mapped but new root is not installed yet
                    // complete root installation cooperatively
                    assert_eq!(cursor, self.root_index.pid);
                    let safe_txid = self.txid();
                    // the outcome is ignored: the descent restarts either way
                    let _ = self.split_root(g, node_ptr, rpid, hi.unwrap(), safe_txid);
                    return Err(OpCode::Again);
                }
                cursor = rpid;

                continue;
            }

            // complete pending parent separator installation cooperatively
            if let Some(unsplit) = unsplit_parent_opt.take() {
                let mut build = self.begin_build();
                let _lk = unsplit.lock();
                if self.bucket.table.get(unsplit.pid()) != unsplit.swip() {
                    // another thread already finished this parent update
                    return Err(OpCode::Again);
                }

                // create a new index in intl node
                let Some((split_node, junk)) =
                    unsplit.insert_index(&mut build, lo, cursor, self.txid())
                else {
                    return Err(OpCode::Again);
                };
                let mut publish = build.into_publish(g);
                publish.replace(unsplit, split_node, junk);
                publish.commit();
                self.store
                    .opt
                    .observer
                    .counter(CounterMetric::TreeNodeSplit, 1);
            }

            // a node reached through the leftmost entry is never offered for merging here
            if !leftmost
                && let Some(parent) = parent_opt
                && node_ptr.should_merge()
            {
                self.try_merge(g, parent, node_ptr)?;
                return Err(OpCode::Again);
            }

            if node_ptr.is_intl() {
                assert_eq!(node_ptr.delta_len(), 0);
                // go one level down and remember where we came from
                let (is_leftmost, pid) = node_ptr.child_index(key);
                leftmost = is_leftmost;
                parent_opt = Some(node_ptr);
                cursor = pid;
            } else {
                if node_ptr.delta_len() >= self.bucket.opt.consolidate_threshold as usize {
                    self.try_compact(g, node_ptr);
                    // it may need split
                    continue;
                }
                return Ok(node_ptr);
            }
        }
    }
513
    /// Finds the leaf that precedes the leaf covering `key`.
    ///
    /// Phase one descends from the root to the leaf covering `key`, recording for every
    /// internal node its pid, its swip and the child position that was taken. Phase two
    /// walks that path bottom-up to the first level where a position greater than zero was
    /// taken, moves to the entry one position to the left, and descends along rightmost
    /// children (following right siblings while `key` is above a node's `hi`) to a leaf.
    ///
    /// Returns `Ok(None)` when every recorded level took position 0, i.e. there is no
    /// entry to the left anywhere on the path. Returns `Err(OpCode::Again)` whenever a
    /// node is missing, marked `merging`, changed since it was recorded, or does not bound
    /// `key` as expected.
    fn find_prev_leaf(&self, g: &Guard, key: &[u8]) -> Result<Option<Page>, OpCode> {
        let mut cursor = self.root_index.pid;
        // (pid, swip, taken child position) for each internal node on the way down
        let mut path: Vec<(u64, u64, usize)> = Vec::new();

        loop {
            let Some(node) = self.load_node(g, cursor)? else {
                return Err(OpCode::Again);
            };
            if node.header().merging {
                return Err(OpCode::Again);
            }
            // the node must cover `key`, i.e. lo <= key < hi
            if key < node.lo() {
                return Err(OpCode::Again);
            }
            if let Some(hi) = node.hi()
                && key >= hi
            {
                return Err(OpCode::Again);
            }

            if !node.is_intl() {
                break;
            }

            let sst = node.sst::<IntlKey>();
            // exact hit uses that entry, otherwise the entry before the insertion point
            // (clamped to the first entry)
            let pos = match sst.search_by(&IntlKey::new(key), |x, y| x.raw.cmp(y.raw)) {
                Ok(pos) => pos,
                Err(pos) => pos.max(1) - 1,
            };
            let (_, idx) = sst.kv_at::<Index>(pos);
            path.push((node.pid(), node.swip(), pos));
            cursor = idx.pid;
        }

        while let Some((parent_pid, parent_swip, child_pos)) = path.pop() {
            // nothing to the left at this level, look one level higher
            if child_pos == 0 {
                continue;
            }

            let Some(parent) = self.load_node(g, parent_pid)? else {
                return Err(OpCode::Again);
            };
            // the parent must still be the exact version recorded on the way down
            if parent.swip() != parent_swip {
                return Err(OpCode::Again);
            }
            if !parent.is_intl() {
                return Err(OpCode::Again);
            }

            let sst = parent.sst::<IntlKey>();
            if child_pos >= parent.header().elems as usize {
                return Err(OpCode::Again);
            }
            // entry directly left of the one taken on the way down
            let (_, idx) = sst.kv_at::<Index>(child_pos - 1);
            let mut pid = idx.pid;

            loop {
                let Some(node) = self.load_node(g, pid)? else {
                    return Err(OpCode::Again);
                };
                if node.header().merging {
                    return Err(OpCode::Again);
                }
                // the node ends before `key`: keep moving right along the sibling chain
                if let Some(hi) = node.hi()
                    && key > hi
                {
                    let rpid = node.header().right_sibling;
                    if rpid == NULL_PID {
                        return Err(OpCode::Again);
                    }
                    pid = rpid;
                    continue;
                }
                // a predecessor must start strictly below `key`
                if key <= node.lo() {
                    return Err(OpCode::Again);
                }
                if !node.is_intl() {
                    return Ok(Some(node));
                }

                let elems = node.header().elems as usize;
                if elems == 0 {
                    return Err(OpCode::Again);
                }
                // descend through the last entry of the internal node
                let (_, rightmost) = node.sst::<IntlKey>().kv_at::<Index>(elems - 1);
                pid = rightmost.pid;
            }
        }

        Ok(None)
    }
605
606    fn try_compact(&self, g: &Guard, page: Page) {
607        let _lk = page.lock();
608        if self.bucket.table.get(page.pid()) != page.swip() {
609            return;
610        };
611
612        // consolidation never retry
613        let mut build = self.begin_build();
614        let (new_node, junk) = page.compact(&mut build, self.txid());
615        let mut publish = build.into_publish(g);
616        publish.replace(page, new_node, junk);
617        publish.commit();
618        self.store
619            .opt
620            .observer
621            .counter(CounterMetric::TreeNodeConsolidate, 1);
622    }
623
624    pub(crate) fn try_scavenge(&self, pid: u64, g: &Guard) -> Result<bool, OpCode> {
625        let page = if let Some(p) = self.load_node(g, pid)? {
626            p
627        } else {
628            return Ok(false);
629        };
630
631        let h = page.header();
632        if h.merging || h.merging_child != NULL_ADDR {
633            return Ok(false);
634        }
635
636        let safe_txid = self.txid();
637        let delta_len = page.delta_len();
638        let threshold = self.bucket.opt.consolidate_threshold as usize;
639
640        if delta_len >= threshold {
641            self.try_compact(g, page);
642            return Ok(true);
643        }
644
645        if page.ref_node().has_garbage(safe_txid) {
646            self.try_compact(g, page);
647            return Ok(true);
648        }
649
650        Ok(false)
651    }
652
653    fn try_merge(&self, g: &Guard, parent: Page, cur: Page) -> Result<(), OpCode> {
654        let Some(lk) = parent.try_lock() else {
655            return Err(OpCode::Again);
656        };
657        if self.bucket.table.get(parent.pid()) != parent.swip() {
658            return Err(OpCode::Again);
659        }
660        if parent.header().merging_child != NULL_ADDR {
661            return Err(OpCode::Again);
662        }
663        let pid = cur.pid();
664
665        if parent.can_merge_child(cur.lo(), pid) {
666            let mut build = self.begin_build();
667            let (new_parent, j) =
668                parent.process_merge(&mut build, MergeOp::MarkParent(pid), self.txid());
669            let mut publish = build.into_publish(g);
670            let new_page = publish.replace(parent, new_parent, j);
671            publish.commit();
672            drop(lk);
673            self.merge_node(new_page, pid, g)?;
674        }
675        Ok(())
676    }
677
678    fn link<F>(
679        &self,
680        _g: &Guard,
681        page: Page,
682        k: &Key,
683        v: &Record,
684        mut check: F,
685    ) -> Result<(), OpCode>
686    where
687        F: FnMut(Page, &Key) -> Result<(u8, Position), OpCode>,
688    {
689        loop {
690            let Some(node) = page.try_lock() else {
691                continue;
692            };
693            let lock_started = sampled_instant(k.txid(), LATENCY_SAMPLE_SHIFT);
694            let pid = page.pid();
695            // consolidate happened, we must retry from root
696            if self.bucket.table.get(pid) != page.swip() {
697                observe_elapsed(
698                    self.store.opt.observer.as_ref(),
699                    HistogramMetric::TreeLinkHoldMicros,
700                    lock_started,
701                );
702                return Err(OpCode::Again);
703            };
704
705            let (group, pos) = check(page, k)?;
706            let mut build = self.begin_build();
707            let (k, v) = DeltaView::from_key_val(&mut build, k, v, group, pos);
708
709            let addr = node.insert(k, v);
710            build.mark_dirty(pid, addr);
711            observe_elapsed(
712                self.store.opt.observer.as_ref(),
713                HistogramMetric::TreeLinkHoldMicros,
714                lock_started,
715            );
716            drop(node);
717            return Ok(());
718        }
719    }
720
721    fn try_put(&self, g: &Guard, key: &Key, val: &Record) -> Result<(), OpCode> {
722        let page = self.find_leaf(g, key.raw())?;
723
724        // it never write log, so use default value is always OK
725        self.link(g, page, key, val, |_, _| Ok((0, Position::default())))?;
726        Ok(())
727    }
728
729    /// for non-txn use, such as registry and recovery
730    pub fn put(&self, g: &Guard, key: Key, val: Record) -> Result<(), OpCode> {
731        loop {
732            match self.try_put(g, &key, &val) {
733                Ok(_) => return Ok(()),
734                Err(OpCode::Again) => {
735                    self.store
736                        .opt
737                        .observer
738                        .counter(CounterMetric::TreeRetryAgain, 1);
739                    g.flush();
740                    continue;
741                }
742                Err(e) => return Err(e),
743            }
744        }
745    }
746
747    fn try_update<F>(
748        &self,
749        g: &Guard,
750        key: &Key,
751        val: &Record,
752        visible: &mut F,
753    ) -> Result<Option<LatestValMeta>, OpCode>
754    where
755        F: FnMut(&Option<LatestValMeta>) -> Result<(u8, Position), OpCode>,
756    {
757        let page = self.find_leaf(g, key.raw)?;
758        let mut r = None;
759
760        self.link(g, page, key, val, |pg, k| {
761            let tmp = pg.find_latest_meta(k);
762            r = tmp.map(|meta| LatestValMeta {
763                ver: meta.ver,
764                group_id: meta.group_id,
765                is_del: meta.is_del,
766            });
767            visible(&r)
768        })?;
769
770        Ok(r)
771    }
772
773    // NOTE: the `visible` function may be called multiple times
774    pub fn update<F>(
775        &self,
776        g: &Guard,
777        key: Key,
778        val: Record,
779        mut visible: F,
780    ) -> Result<Option<LatestValMeta>, OpCode>
781    where
782        F: FnMut(&Option<LatestValMeta>) -> Result<(u8, Position), OpCode>,
783    {
784        let ksz = key.packed_size();
785        if ksz > Options::MAX_KEY_SIZE || ksz + val.packed_size() > Options::MAX_KV_SIZE {
786            return Err(OpCode::TooLarge);
787        }
788        loop {
789            match self.try_update(g, &key, &val, &mut visible) {
790                Ok(x) => return Ok(x),
791                Err(OpCode::Again) => {
792                    self.store
793                        .opt
794                        .observer
795                        .counter(CounterMetric::TreeRetryAgain, 1);
796                    self.store
797                        .opt
798                        .observer
799                        .counter(CounterMetric::TxnRetryAgain, 1);
800                    g.flush();
801                    continue;
802                }
803                Err(e) => return Err(e),
804            }
805        }
806    }
807
808    // background abort-clean uses page-wide compaction so non-head aborted versions are also purged
809    // this keeps cleanup crash-safe even when newer committed versions already cover the same key
810    pub(crate) fn remove_aborted(&self, g: &Guard, raw: &[u8]) -> Result<bool, OpCode> {
811        let page = self.find_leaf(g, raw)?;
812        let Some(_lk) = page.try_lock() else {
813            return Err(OpCode::Again);
814        };
815        if self.bucket.table.get(page.pid()) != page.swip() {
816            return Err(OpCode::Again);
817        }
818        self.rewrite_node(g, page)
819    }
820
821    // foreground retry uses head-gated cleanup to avoid no-op page compaction under concurrent updates
822    // if the aborted version is no longer the key head, gc path will handle full cleanup later
823    pub(crate) fn remove_aborted_head(
824        &self,
825        g: &Guard,
826        raw: &[u8],
827        aborted_txid: u64,
828    ) -> Result<bool, OpCode> {
829        let page = self.find_leaf(g, raw)?;
830        let Some(_lk) = page.try_lock() else {
831            return Err(OpCode::Again);
832        };
833        if self.bucket.table.get(page.pid()) != page.swip() {
834            return Err(OpCode::Again);
835        }
836
837        let Some((head_ver, _, _)) = page.find_latest(&Key::new(raw, Ver::new(u64::MAX, u32::MAX)))
838        else {
839            return Ok(false);
840        };
841        if head_ver.txid != aborted_txid {
842            return Ok(false);
843        }
844        if self.store.context.get_aborted(aborted_txid) != Some(TxOutcome::Aborted) {
845            return Ok(false);
846        }
847
848        self.rewrite_node(g, page)
849    }
850
851    #[inline]
852    fn rewrite_node(&self, g: &Guard, page: Page) -> Result<bool, OpCode> {
853        let mut build = self.begin_build();
854        let (new_node, junk, removed) =
855            page.remove_aborted(&mut build, self.txid(), self.store.context);
856        if !removed {
857            return Ok(false);
858        }
859        let mut publish = build.into_publish(g);
860        publish.replace(page, new_node, junk);
861        publish.commit();
862        Ok(true)
863    }
864
865    /// return the latest key-val pair, by using Ikey::raw(), thanks to MVCC, the first match one is
866    /// the latest one
867    pub fn get<'b>(&'b self, g: &Guard, key: Key<'b>) -> Result<(Key<'b>, ValRef), OpCode> {
868        let page = self.find_leaf(g, key.raw())?;
869
870        let Some((ver, v, b)) = page.find_latest(&key) else {
871            return Err(OpCode::NotFound);
872        };
873
874        Ok((Key::new(key.raw, ver), ValRef::new(v, b)))
875    }
876
877    pub fn range<'a, K, R, F>(&'a self, range: R, visible: F) -> Iter<'a>
878    where
879        K: AsRef<[u8]>,
880        R: RangeBounds<K>,
881        F: FnMut(&Context, u64, u8) -> bool + 'a,
882    {
883        let cached_key = Handle::new(Vec::new());
884        let lo = match range.start_bound() {
885            Bound::Included(b) => Bound::Included(b.as_ref().to_vec()),
886            Bound::Excluded(b) => Bound::Excluded(b.as_ref().to_vec()),
887            Bound::Unbounded => Bound::Included(vec![]),
888        };
889        let hi = match range.end_bound() {
890            Bound::Included(e) => Bound::Included(e.as_ref().to_vec()),
891            Bound::Excluded(e) => Bound::Excluded(e.as_ref().to_vec()),
892            Bound::Unbounded => Bound::Unbounded,
893        };
894
895        Iter {
896            tree: self,
897            cached_key,
898            lo,
899            hi,
900            iter: None,
901            rev_iter: None,
902            cache: None,
903            iter_bound: None,
904            checker: Box::new(visible),
905            filter: Filter { has_last: false },
906            guard: crossbeam_epoch::pin(),
907        }
908    }
909
910    fn traverse_hist<L, F>(
911        &self,
912        l: &L,
913        start_ts: u64,
914        hist: HistRef,
915        visible: &mut F,
916    ) -> Result<ValRef, OpCode>
917    where
918        L: ILoader,
919        F: FnMut(u64, u8) -> bool,
920    {
921        let mut addr = hist.page_addr;
922        let mut pos = hist.slot as usize;
923        let mut remaining = hist.count as usize;
924        let mut first_segment = true;
925        let target = Ver::new(start_ts, NULL_CMD);
926
927        while addr != NULL_PID && remaining > 0 {
928            let page = l.load_sibling(addr)?;
929            let ptr = page.view().as_base();
930            let sst = ptr.sst::<Ver>();
931            let elems = sst.header().elems as usize;
932            if pos >= elems {
933                addr = ptr.box_header().link;
934                pos = 0;
935                continue;
936            }
937
938            // history is key-local contiguous region, so only binary-search the active subrange
939            // on the first page and then continue linearly across the bounded region
940            let mut page_end = elems.min(pos.saturating_add(remaining));
941            if first_segment {
942                first_segment = false;
943                let begin = Self::lower_bound_hist_subrange(&sst, pos, page_end, &target);
944                let skipped = begin - pos;
945                pos = begin;
946                remaining = remaining.saturating_sub(skipped);
947                page_end = elems.min(pos.saturating_add(remaining));
948            }
949
950            while pos < page_end && remaining > 0 {
951                let (k, v) = sst.kv_at::<Val>(pos);
952                if visible(k.txid, v.group_id()) {
953                    if v.is_tombstone() {
954                        return Err(OpCode::NotFound);
955                    }
956                    let (v, r) = v.get_record(l);
957                    return Ok(ValRef::new(v, r.unwrap_or(page)));
958                }
959                pos += 1;
960                remaining -= 1;
961            }
962
963            if remaining == 0 {
964                break;
965            }
966            addr = ptr.box_header().link;
967            pos = 0;
968        }
969        Err(OpCode::NotFound)
970    }
971
972    fn lower_bound_hist_subrange(
973        sst: &crate::types::sst::Sst<Ver>,
974        mut lo: usize,
975        mut hi: usize,
976        target: &Ver,
977    ) -> usize {
978        while lo < hi {
979            let mid = lo + ((hi - lo) >> 1);
980            let key = sst.key_at(mid);
981            if key.cmp(target).is_lt() {
982                lo = mid + 1;
983            } else {
984                hi = mid;
985            }
986        }
987        lo
988    }
989
990    pub fn traverse<F>(&self, g: &Guard, key: Key, mut visible: F) -> Result<ValRef, OpCode>
991    where
992        F: FnMut(u64, u8) -> bool,
993    {
994        let page = self.find_leaf(g, key.raw)?;
995
996        let mut result = None;
997        let search_key = Key::new(key.raw, Ver::new(u64::MAX, u32::MAX));
998        page.visit_versions(
999            search_key,
1000            |x, y| {
1001                let k = Key::decode_from(x.key());
1002                match k.raw.cmp(y.raw) {
1003                    Equal => y.txid.cmp(&k.txid), // compare txid is enough
1004                    o => o,
1005                }
1006            },
1007            |x| {
1008                let k = Key::decode_from(x.key());
1009                if k.raw.cmp(key.raw).is_ne() {
1010                    return true;
1011                }
1012                let val = x.val();
1013                if visible(k.txid, val.group_id()) {
1014                    if val.is_tombstone() {
1015                        result = Some(Err(OpCode::NotFound));
1016                        return true;
1017                    }
1018                    let (r, v) = val.get_record(&page.loader);
1019                    result = Some(Ok(ValRef::new(r, v.unwrap_or_else(|| x.as_box()))));
1020                    return true;
1021                }
1022                false
1023            },
1024        );
1025
1026        if let Some(res) = result {
1027            return res;
1028        }
1029
1030        // Key::raw is unique in sst
1031        let (k, val) = page.search_sst(&key).ok_or(OpCode::NotFound)?;
1032        if visible(k.txid, val.group_id()) {
1033            if val.is_tombstone() {
1034                return Err(OpCode::NotFound);
1035            }
1036            let (record, r) = val.get_record(&page.loader);
1037            return Ok(ValRef::new(record, r.unwrap_or_else(|| page.base_box())));
1038        }
1039        if let Some(hist) = val.get_hist() {
1040            return self.traverse_hist(&page.loader, key.txid, hist, &mut visible);
1041        }
1042        Err(OpCode::NotFound)
1043    }
1044}
1045
/// An iterator over key-value pairs in a bucket.
///
/// Items can be pulled from the front (`Iterator`) and from the back
/// (`DoubleEndedIterator`); `lo` and `hi` are tightened to an exclusive bound at the last
/// yielded key on the respective side.
pub struct Iter<'a> {
    /// tree whose leaves are scanned
    tree: &'a Tree,
    /// shared key scratch handed to the raw leaf iterators; reclaimed on drop
    cached_key: Handle<Vec<u8>>,
    /// current lower bound of the remaining range
    lo: Bound<Vec<u8>>,
    /// current upper bound of the remaining range
    hi: Bound<Vec<u8>>,
    /// active forward cursor over the node in `cache`, if any
    iter: Option<RawLeafIter<'a, Loader>>,
    /// active backward cursor over the node in `cache`, if any
    rev_iter: Option<RawLeafRevIter<'a, Loader>>,
    /// boxed node the active cursor was created from
    cache: Option<Box<Node>>,
    /// boxed copy of `lo` taken when the forward cursor was created
    iter_bound: Option<Box<Bound<Vec<u8>>>>,
    /// visibility predicate called with the store context, transaction id and group id
    checker: Box<dyn FnMut(&Context, u64, u8) -> bool + 'a>,
    /// suppresses repeated keys and tombstones
    filter: Filter,
    /// epoch guard pinned when the iterator was created
    guard: Guard,
}
1060
1061impl Drop for Iter<'_> {
1062    fn drop(&mut self) {
1063        // release iterator state before reclaiming shared key scratch
1064        self.iter.take();
1065        self.rev_iter.take();
1066        self.cache.take();
1067        self.iter_bound.take();
1068        self.cached_key.reclaim();
1069    }
1070}
1071
1072impl Iter<'_> {
1073    fn low_key(&self) -> &[u8] {
1074        match self.lo {
1075            Bound::Unbounded => &[],
1076            Bound::Excluded(ref x) | Bound::Included(ref x) => x,
1077        }
1078    }
1079
1080    fn high_key(&self) -> Option<&[u8]> {
1081        match self.hi {
1082            Bound::Unbounded => None,
1083            Bound::Excluded(ref x) | Bound::Included(ref x) => Some(x),
1084        }
1085    }
1086
1087    fn collapsed(&self) -> bool {
1088        match (&self.lo, &self.hi) {
1089            (Bound::Included(b), Bound::Included(e))
1090            | (Bound::Excluded(b), Bound::Excluded(e))
1091            | (Bound::Included(b), Bound::Excluded(e))
1092            | (Bound::Excluded(b), Bound::Included(e)) => b > e,
1093            _ => false,
1094        }
1095    }
1096
1097    fn find_leaf_for_next_back(&self) -> Result<Page, OpCode> {
1098        if let Some(k) = self.high_key() {
1099            let node = self.tree.find_leaf(&self.guard, k)?;
1100            if matches!(self.hi, Bound::Excluded(_)) && node.lo() >= k {
1101                return self
1102                    .tree
1103                    .find_prev_leaf(&self.guard, k)?
1104                    .ok_or(OpCode::NotFound);
1105            }
1106            return Ok(node);
1107        }
1108
1109        let mut node = self.tree.find_leaf(&self.guard, self.low_key())?;
1110        loop {
1111            let rpid = node.header().right_sibling;
1112            if rpid == NULL_PID {
1113                return Ok(node);
1114            }
1115            let Some(next) = self.tree.load_node(&self.guard, rpid)? else {
1116                return Err(OpCode::Again);
1117            };
1118            node = next;
1119        }
1120    }
1121
/// Advances the forward cursor and returns the next item that lies above the lower bound,
/// passes the visibility checker and the duplicate filter, and does not exceed the upper
/// bound. Returns `None` when the range is exhausted.
///
/// # Panics
///
/// Panics when `find_leaf` or the leaf iterator fails with an error other than
/// `OpCode::Again` or `OpCode::NotFound`.
fn get_next(&mut self) -> Option<<Self as Iterator>::Item> {
    // a backward cursor was created from `cache`; drop it before `cache` may be overwritten
    self.rev_iter.take();

    'retry: while !self.collapsed() {
        if self.iter.is_none() {
            // (re)position on the leaf that covers the current lower bound
            let node = match self.tree.find_leaf(&self.guard, self.low_key()) {
                Ok(node) => node,
                Err(OpCode::Again) => {
                    self.guard.flush();
                    continue;
                }
                Err(OpCode::NotFound) => return None,
                Err(e) => panic!("iter find_leaf failed: {e:?}"),
            };
            let next_node = node.ref_node();
            let next_bound = self.lo.clone();

            // overwrite the boxed node in place when a box already exists
            if let Some(cache) = self.cache.as_mut() {
                **cache = next_node;
            } else {
                self.cache = Some(Box::new(next_node));
            }

            // same for the boxed snapshot of the lower bound given to the cursor
            if let Some(bound) = self.iter_bound.as_mut() {
                **bound = next_bound;
            } else {
                self.iter_bound = Some(Box::new(next_bound));
            }

            let cache = self.cache.as_ref().expect("must valid");
            let bound = self.iter_bound.as_ref().expect("must valid");
            // NOTE(review): the transmute changes only the lifetime parameter of the cursor.
            // It appears to rely on `cache` and `iter_bound` being boxed and being overwritten
            // only while no cursor exists (`iter` is `None` here and `rev_iter` was taken
            // above) — confirm.
            self.iter = Some(unsafe {
                std::mem::transmute::<RawLeafIter<'_, Loader>, RawLeafIter<'_, Loader>>(
                    cache.successor(bound.as_ref(), self.cached_key),
                )
            });
        }

        // pull from the leaf cursor until an item is accepted or the leaf is exhausted
        let r = loop {
            let next = {
                let iter = self.iter.as_mut().expect("must valid");
                iter.try_next()
            };
            match next {
                Ok(Some(item)) => {
                    // the live lower bound may have advanced since the cursor was created
                    let ok = match &self.lo {
                        Bound::Unbounded => true,
                        Bound::Included(b) => item.cmp_key(b.as_slice()).is_ge(),
                        Bound::Excluded(b) => item.cmp_key(b.as_slice()).is_gt(),
                    };
                    if ok
                        && (self.checker)(
                            &self.tree.store.context,
                            item.txid(),
                            item.group_id(),
                        )
                        && self.filter.check(&item)
                    {
                        break Some(item);
                    }
                }
                Ok(None) => break None,
                // drop the cursor and re-seek from the current lower bound
                Err(OpCode::Again | OpCode::NotFound) => {
                    self.iter.take();
                    continue 'retry;
                }
                Err(e) => panic!("iter load failed: {e:?}"),
            }
        };

        if let Some(item) = r {
            // reuse existing lower-bound buffer to avoid realloc per item
            let key = item.key();
            match &mut self.lo {
                Bound::Included(v) | Bound::Excluded(v) => {
                    v.clear();
                    v.extend_from_slice(key);
                    // keep the variant as Excluded for next search step
                    self.lo = Bound::Excluded(std::mem::take(v));
                }
                Bound::Unbounded => {
                    self.lo = Bound::Excluded(key.to_vec());
                }
            }

            // the lower bound has already moved past this item even if it is rejected here
            match self.hi {
                Bound::Unbounded => return Some(item),
                Bound::Included(ref h) if item.cmp_key(h.as_slice()).is_le() => {
                    return Some(item);
                }
                Bound::Excluded(ref h) if item.cmp_key(h.as_slice()).is_lt() => {
                    return Some(item);
                }
                _ => return None,
            }
        } else {
            // leaf exhausted: continue from its high key, or stop when it has none
            self.iter.take();
            let node = self.cache.as_ref().expect("must valid");
            if let Some(hi) = node.hi() {
                self.lo = Bound::Included(hi.to_vec());
                continue;
            }
            break;
        }
    }

    None
}
1230}
1231
1232impl<'a> Iterator for Iter<'a> {
1233    type Item = IterItem<'a, Loader>;
1234
1235    fn next(&mut self) -> Option<Self::Item> {
1236        self.get_next()
1237    }
1238}
1239
impl<'a> DoubleEndedIterator for Iter<'a> {
    /// Yields the next item from the back of the remaining range.
    ///
    /// An item is returned when it lies inside both bounds, passes the visibility checker
    /// and the duplicate filter; the upper bound then becomes exclusive at that item's key.
    /// Returns `None` when the range is exhausted.
    ///
    /// # Panics
    ///
    /// Panics when leaf lookup or the leaf iterator fails with an error other than
    /// `OpCode::Again` or `OpCode::NotFound`.
    fn next_back(&mut self) -> Option<Self::Item> {
        // a forward cursor was created from `cache`; drop it before `cache` may be overwritten
        self.iter.take();

        'retry: while !self.collapsed() {
            if self.rev_iter.is_none() {
                // (re)position on the leaf that should contain the current upper end
                let node = match self.find_leaf_for_next_back() {
                    Ok(node) => node,
                    Err(OpCode::Again) => {
                        self.guard.flush();
                        continue;
                    }
                    Err(OpCode::NotFound) => return None,
                    Err(e) => panic!("iter find_leaf failed: {e:?}"),
                };
                let next_node = node.ref_node();
                // overwrite the boxed node in place when a box already exists
                if let Some(cache) = self.cache.as_mut() {
                    **cache = next_node;
                } else {
                    self.cache = Some(Box::new(next_node));
                }
                // NOTE(review): the transmute changes only the lifetime parameter of the
                // cursor. Unlike the forward path, the bounds are borrowed directly from
                // `self.lo` / `self.hi` rather than from a boxed copy; if the cursor keeps
                // these references, the later writes to `self.hi` or a move of the `Iter`
                // would invalidate them — confirm `predecessor` only reads them up front.
                self.rev_iter = Some(unsafe {
                    std::mem::transmute::<RawLeafRevIter<'_, Loader>, RawLeafRevIter<'_, Loader>>(
                        self.cache.as_ref().expect("must valid").predecessor(
                            &self.lo,
                            &self.hi,
                            self.cached_key,
                        ),
                    )
                });
            }

            // pull from the leaf cursor until an item is accepted or the leaf is exhausted
            let res = loop {
                let next = {
                    let iter = self.rev_iter.as_mut().expect("must valid");
                    iter.try_next_back()
                };
                match next {
                    Ok(Some(item)) => {
                        let lo_ok = match &self.lo {
                            Bound::Unbounded => true,
                            Bound::Included(b) => item.cmp_key(b.as_slice()).is_ge(),
                            Bound::Excluded(b) => item.cmp_key(b.as_slice()).is_gt(),
                        };
                        let hi_ok = match &self.hi {
                            Bound::Unbounded => true,
                            Bound::Included(h) => item.cmp_key(h.as_slice()).is_le(),
                            Bound::Excluded(h) => item.cmp_key(h.as_slice()).is_lt(),
                        };
                        if lo_ok
                            && hi_ok
                            && (self.checker)(
                                &self.tree.store.context,
                                item.txid(),
                                item.group_id(),
                            )
                            && self.filter.check(&item)
                        {
                            break Some(item);
                        }
                    }
                    Ok(None) => break None,
                    // drop the cursor and re-seek from the current upper bound
                    Err(OpCode::Again | OpCode::NotFound) => {
                        self.rev_iter.take();
                        continue 'retry;
                    }
                    Err(e) => panic!("iter load failed: {e:?}"),
                }
            };

            if let Some(item) = res {
                // reuse the existing upper-bound buffer and make the bound exclusive at this key
                let key = item.key();
                match &mut self.hi {
                    Bound::Included(v) | Bound::Excluded(v) => {
                        v.clear();
                        v.extend_from_slice(key);
                        self.hi = Bound::Excluded(std::mem::take(v));
                    }
                    Bound::Unbounded => {
                        self.hi = Bound::Excluded(key.to_vec());
                    }
                }
                return Some(item);
            }

            // leaf exhausted: continue below its low key; an empty low key ends the scan
            self.rev_iter.take();
            let lo = self.cache.as_ref().expect("must valid").lo();
            if lo.is_empty() {
                return None;
            }
            self.hi = Bound::Excluded(lo.to_vec());
        }

        None
    }
}
1336
/// Per-iterator state used to skip repeated keys and tombstones.
struct Filter {
    /// set once `check` has processed at least one item past the duplicate test
    has_last: bool,
}
1340
1341impl Filter {
1342    fn check<L: ILoader>(&mut self, item: &IterItem<L>) -> bool {
1343        // key() returns cached assembled key from previous accepted item
1344        if self.has_last && item.cmp_key(item.key()).is_eq() {
1345            return false;
1346        }
1347        let _ = item.assembled_key();
1348        self.has_last = true;
1349        !item.is_tombstone()
1350    }
1351}
1352
#[cfg(test)]
mod test {
    use crate::{BucketOptions, Mace, Options, RandomPath};
    use std::thread;

    /// Runs four scanning readers against one writer that commits `num_iterations` distinct
    /// keys into a bucket with a small split threshold.
    ///
    /// Besides exercising the iterator's retry paths, each scan is checked against the number
    /// of keys the writer can ever insert.
    #[test]
    fn concurrent_page_hit() {
        let path = RandomPath::tmp();
        let mut opt = Options::new(&*path);
        opt.tmp_store = true;
        let mace = Mace::new(opt.validate().unwrap()).unwrap();
        let db = mace
            .new_bucket(
                "default",
                BucketOptions {
                    split_elems: 256,
                    ..BucketOptions::default()
                },
            )
            .unwrap();

        let num_readers = 4;
        let num_iterations = 1000;

        thread::scope(|s| {
            for _ in 0..num_readers {
                let db = db.clone();
                s.spawn(move || {
                    for _ in 0..num_iterations {
                        let view = db.view().unwrap();
                        let mut count = 0;
                        for _ in view.seek("key") {
                            count += 1;
                        }
                        // the writer inserts at most `num_iterations` distinct keys, so a
                        // larger count means the scan returned some entry more than once
                        assert!(
                            count <= num_iterations,
                            "scan yielded {count} entries, more than the {num_iterations} keys written"
                        );
                    }
                });
            }

            s.spawn(|| {
                for i in 0..num_iterations {
                    let kv = db.begin().unwrap();
                    let key = format!("key_{:05}", i);
                    kv.put(&key, &key).unwrap();
                    kv.commit().unwrap();
                }
            });
        });
    }
}