use crate::Options;
use crate::cc::context::{Context, TxOutcome};
use crate::map::buffer::BucketContext;
use crate::map::publish::AllocGuard;
use crate::map::{Loader, Node, Page};
use crate::types::data::{HistRef, IterItem, Record, Val};
use crate::types::node::{Junk, MergeOp, RawLeafIter, RawLeafRevIter};
use crate::types::refbox::DeltaView;
use crate::types::traits::{IAsBoxRef, IBoxHeader, IDecode, IHeader, ILoader};
use crate::utils::data::Position;
use crate::utils::observe::{
CounterMetric, HistogramMetric, LATENCY_SAMPLE_SHIFT, observe_elapsed, sampled_instant,
};
use crate::utils::{Handle, MutRef, NULL_ADDR, OpCode};
use crate::{
Store,
types::{
data::{Index, IntlKey, Key, Ver},
refbox::BoxRef,
traits::{ICodec, IKey, IVal},
},
utils::{NULL_CMD, NULL_PID},
};
use crossbeam_epoch::Guard;
use std::cmp::Ordering::Equal;
use std::ops::{Bound, RangeBounds};
use std::sync::Arc;
use std::sync::atomic::Ordering::Acquire;
/// A value returned by tree lookups: the decoded `Record` together with the
/// `BoxRef` it was read from. The box is stored only to be held (it is never
/// read in this file); presumably this keeps the memory backing `raw` alive —
/// TODO confirm.
#[derive(Clone)]
pub struct ValRef {
// Decoded record; its bytes are exposed through `slice` / `to_vec`.
raw: Record,
// Owner of the memory `raw` was decoded from; held, never read.
_owner: BoxRef,
}
impl ValRef {
pub(crate) fn new(raw: Record, owner: BoxRef) -> Self {
Self { raw, _owner: owner }
}
pub fn slice(&self) -> &[u8] {
self.raw.data()
}
pub fn to_vec(self) -> Vec<u8> {
self.raw.data().to_vec()
}
pub(crate) fn group_id(&self) -> u8 {
self.raw.group_id()
}
pub(crate) fn is_put(&self) -> bool {
!self.is_del()
}
pub(crate) fn is_del(&self) -> bool {
self.raw.is_tombstone()
}
}
// NOTE(review): this `Drop` impl has an empty body, so dropping a `ValRef` does
// nothing beyond dropping its fields. Its only visible effect is at the language
// level: a type that implements `Drop` cannot have its fields moved out by
// destructuring (E0509). Confirm whether that restriction is intended before
// removing it.
impl Drop for ValRef {
fn drop(&mut self) {
}
}
/// Handle to one tree (bucket) of the store. Cloning copies the handle fields;
/// the bucket context itself is shared through `Arc`.
#[derive(Clone)]
pub struct Tree {
// Owning store; options, the metrics observer and the transaction context are read through it.
pub(crate) store: MutRef<Store>,
// Index of the root page; every descent starts at its pid.
pub(crate) root_index: Index,
// Per-bucket state: page table, page loader and page cache.
pub(crate) bucket: Arc<BucketContext>,
}
impl Tree {
pub fn new(store: MutRef<Store>, root_pid: u64, bucket: Arc<BucketContext>) -> Self {
let this = Self {
store,
root_index: Index::new(root_pid),
bucket,
};
let addr = this.bucket.table.index(root_pid).load(Acquire);
if addr == NULL_ADDR {
this.init(root_pid);
}
this
}
/// Publishes an empty leaf under `root_pid`.
///
/// The leaf is created in group 0, stamped with group 0's current logging
/// position, and handed to the cache once the publish commits.
fn init(&self, root_pid: u64) {
    let guard = crossbeam_epoch::pin();
    let mut alloc = self.begin_build();
    let current_lsn = self.store.context.group(0).logging.lock().current_pos();
    let leaf = Node::new_leaf(
        &mut alloc,
        self.bucket.loader(self.store.context),
        0,
        current_lsn,
    );
    let mut root_page = Page::new(leaf);
    let mut publish = alloc.into_publish(&guard);
    publish.map_to(&mut root_page, root_pid);
    publish.cache_after_commit(root_page);
    publish.commit();
}
/// Transaction id reported by the store context's `compact_safe_txid`; passed
/// to compaction/merge routines as their `safe_txid`.
fn txid(&self) -> u64 {
    let ctx = &self.store.context;
    ctx.compact_safe_txid()
}

/// Id of the bucket this tree belongs to.
pub(crate) fn bucket_id(&self) -> u64 {
    let bucket = &self.bucket;
    bucket.bucket_id
}

/// Starts a new allocation guard for building replacement pages in this bucket.
pub(crate) fn begin_build(&self) -> AllocGuard<'_> {
    let opt = &self.store.opt;
    AllocGuard::new(opt, &self.bucket)
}
/// Loads the page mapped for `pid`.
///
/// If the page has a pending child merge (`merging_child != NULL_PID`), that
/// merge is driven first via `merge_node` and the page is loaded again.
/// Returns `Ok(None)` when the bucket has no page for `pid`.
pub(crate) fn load_node(&self, g: &Guard, pid: u64) -> Result<Option<Page>, OpCode> {
    loop {
        let Some(page) = self.bucket.load(pid)? else {
            return Ok(None);
        };
        let pending_child = page.header().merging_child;
        if pending_child == NULL_PID {
            return Ok(Some(page));
        }
        // Help finish the merge, then re-read whatever is now mapped.
        self.merge_node(page, pending_child, g)?;
    }
}
/// Drives a pending merge of child `child_pid` of the internal page `parent_ptr`.
///
/// Returns `Ok(())` without doing anything if the parent lock is contended or the
/// parent is no longer the page mapped for its pid. Otherwise it:
/// 1. marks the child as merging (`set_node_merging`); if the child can no longer
///    be loaded, only the parent's index entry is removed;
/// 2. starting from the parent entry just left of the child, looks for the page
///    whose right sibling is the child and publishes that page merged with the
///    child, unmapping the child in the same publish;
/// 3. unmaps the child (if step 2 did not), evicts it from the cache, defers its
///    reclamation on the epoch guard, and removes its entry from the parent.
fn merge_node(&self, parent_ptr: Page, child_pid: u64, g: &Guard) -> Result<(), OpCode> {
let Some(_lk) = parent_ptr.try_lock() else {
return Ok(());
};
// Stale snapshot: the parent was replaced after it was loaded.
if self.bucket.table.get(parent_ptr.pid()) != parent_ptr.swip() {
return Ok(());
}
assert_ne!(child_pid, NULL_PID);
assert!(parent_ptr.is_intl());
// Slot of the child's entry inside the parent.
let child_index = parent_ptr
.intl_iter()
.position(|(_, idx)| idx.pid == child_pid)
.unwrap();
assert_ne!(child_index, 0, "we can't handle merge the leftmost node");
let safe_txid = self.txid();
// Flag the child as merging; if it is already gone, just drop its index entry.
let child_ptr = if let Some(x) = self.set_node_merging(child_pid, g, safe_txid)? {
x
} else {
self.remove_node_index(parent_ptr, child_pid, g, safe_txid);
return Ok(());
};
// Begin the search for the child's left neighbour at the parent entry just before it.
let mut merge_index = child_index - 1;
let mut cursor_pid = parent_ptr
.intl_iter()
.nth(merge_index)
.map(|(_, x)| x.pid)
.unwrap();
let mut child_unmapped = false;
loop {
// If the candidate page no longer exists, step one entry further left in the parent.
// NOTE(review): when no entry is left, this returns with the child still flagged
// as merging and the parent still pointing at it; confirm a later pass finishes it.
let cursor_ptr = if let Some(x) = self.load_node(g, cursor_pid)? {
x
} else {
if merge_index == 0 {
return Ok(());
}
merge_index -= 1;
cursor_pid = parent_ptr
.intl_iter()
.nth(merge_index)
.map(|(_, x)| x.pid)
.unwrap();
continue;
};
// Contended or stale cursor: reload the same pid and try again (busy retry).
let Some(_cursor_lk) = cursor_ptr.try_lock() else {
continue;
};
if self.bucket.table.get(cursor_ptr.pid()) != cursor_ptr.swip() {
continue;
}
let next_pid = cursor_ptr.header().right_sibling;
let mut build = self.begin_build();
// Found the left neighbour: fold the child into it and unmap the child in one publish.
if next_pid == child_pid {
let (new_node, mut junks) =
cursor_ptr.merge_node(&mut build, &child_ptr, safe_txid);
child_ptr.collect_junk(|x| junks.push(x));
build.collect_retired(child_ptr.base_addr(), &mut junks);
let mut publish = build.into_publish(g);
publish.replace(cursor_ptr, new_node, junks);
publish.mark_unmap(child_pid, child_ptr.swip());
publish.commit();
child_unmapped = true;
break;
}
// NOTE(review): stops once the cursor's high key reaches the child's low key
// without folding the child into anything; the child is still unmapped below.
// Presumably its range is already covered at this point — confirm.
let hi = cursor_ptr.hi();
let lo = child_ptr.lo();
if hi >= Some(lo) {
break;
} else {
if next_pid != NULL_PID {
cursor_pid = next_pid
} else {
break;
}
}
}
debug_assert_eq!(child_ptr.box_header().pid, child_pid);
// NOTE(review): this AllocGuard is dropped without `into_publish`/`commit`;
// confirm that `mark_unmap` on a bare AllocGuard takes effect on its own.
if !child_unmapped {
self.begin_build().mark_unmap(child_pid, child_ptr.swip()); }
self.bucket.evict_cache(child_pid);
// Reclamation is deferred to the epoch guard so concurrent readers can finish.
g.defer(move || child_ptr.reclaim());
self.remove_node_index(parent_ptr, child_pid, g, safe_txid);
self.store
.opt
.observer
.counter(CounterMetric::TreeNodeMerge, 1);
Ok(())
}
/// Publishes a replacement of `parent_ptr` built with `MergeOp::Merged`.
///
/// Per the function name, this presumably drops the index entry of the merged
/// child `child_pid`. The parent must currently record `child_pid` as its
/// `merging_child` (checked in debug builds).
fn remove_node_index(&self, parent_ptr: Page, child_pid: u64, g: &Guard, safe_txid: u64) {
    debug_assert_eq!(parent_ptr.header().merging_child, child_pid);
    let mut alloc = self.begin_build();
    let (shrunk_parent, garbage) =
        parent_ptr.process_merge(&mut alloc, MergeOp::Merged, safe_txid);
    let mut publish = alloc.into_publish(g);
    publish.replace(parent_ptr, shrunk_parent, garbage);
    publish.commit();
}
/// Ensures the page `child_pid` carries the `merging` flag and returns it.
///
/// Returns `Ok(None)` if the page cannot be loaded, the page as-is if it is already
/// flagged, or the freshly published replacement built with `MergeOp::MarkChild`.
///
/// # Errors
/// `OpCode::Again` if the page was replaced between loading and locking it.
fn set_node_merging(
    &self,
    child_pid: u64,
    g: &Guard,
    safe_txid: u64,
) -> Result<Option<Page>, OpCode> {
    let Some(page) = self.load_node(g, child_pid)? else {
        return Ok(None);
    };
    // An earlier attempt already flagged it; nothing to publish.
    if page.header().merging {
        return Ok(Some(page));
    }
    let _guard = page.lock();
    let still_mapped = self.bucket.table.get(page.pid()) == page.swip();
    if !still_mapped {
        return Err(OpCode::Again);
    }
    let mut alloc = self.begin_build();
    let (flagged, garbage) = page.process_merge(&mut alloc, MergeOp::MarkChild, safe_txid);
    let mut publish = alloc.into_publish(g);
    let flagged_page = publish.replace(page, flagged, garbage);
    publish.commit();
    Ok(Some(flagged_page))
}
/// Splits `node` in two and links the new right half into the tree.
///
/// The left half replaces `node` under its pid and gets the newly mapped right
/// page as its right sibling. The right page's low key is then inserted into
/// `parent_opt` when there is a parent. If the parent changed, or the insert is
/// refused, that step is skipped and left to a later descent. With no parent
/// (`node` is the root), `split_root` builds a new root instead.
///
/// # Errors
/// `OpCode::Again` if the node lock is contended or `node` is no longer the
/// mapped page; any error returned by `split_root`.
fn split_node(&self, node: Page, parent_opt: Option<Page>, g: &Guard) -> Result<(), OpCode> {
    let Some(node_lock) = node.try_lock() else {
        return Err(OpCode::Again);
    };
    if self.bucket.table.get(node.pid()) != node.swip() {
        return Err(OpCode::Again);
    }
    let safe_txid = self.txid();
    let mut build = self.begin_build();
    let (mut lnode, rnode) = node.split(&mut build);
    let mut rpage = Page::new(rnode);
    let mut publish = build.into_publish(g);
    let rpid = publish.map(&mut rpage);
    lnode.header_mut().right_sibling = rpid;
    let junk = Junk::new();
    let lpage = publish.replace(node, lnode, junk);
    publish.cache_after_commit(rpage);
    // Keep `node` locked until the replacement is committed, as every other publish
    // path in this file does. `link` appends deltas to a page in place after
    // checking that the page table still maps it. Releasing the lock before
    // `commit` (assuming the table switch happens at commit) would let a writer
    // append to `node` after its contents were copied into the two halves, and that
    // delta would be lost once the halves become live.
    publish.commit();
    drop(node_lock);
    let lo = rpage.lo();
    if let Some(parent) = parent_opt {
        let _lk = parent.lock();
        if self.bucket.table.get(parent.pid()) != parent.swip() {
            return Ok(());
        }
        let mut build = self.begin_build();
        let Some((new_node, junk)) = parent.insert_index(&mut build, lo, rpid, safe_txid)
        else {
            return Ok(());
        };
        let mut publish = build.into_publish(g);
        publish.replace(parent, new_node, junk);
        publish.commit();
        self.store
            .opt
            .observer
            .counter(CounterMetric::TreeNodeSplit, 1);
    } else {
        self.split_root(g, lpage, rpid, lo, safe_txid)?;
    }
    Ok(())
}
/// Grows the tree by one level under the existing root pid.
///
/// The current root contents are compacted into a fresh left page, whose pid is
/// reserved from the allocator and whose right sibling is `rpid`. The root pid is
/// then re-published as an internal node with two entries: the empty key → left
/// page, and `lo` → `rpid`.
///
/// # Errors
/// `OpCode::Again` if `root` is no longer the page mapped for its pid.
fn split_root(
    &self,
    g: &Guard,
    root: Page,
    rpid: u64,
    lo: &[u8],
    safe_txid: u64,
) -> Result<(), OpCode> {
    let _root_guard = root.lock();
    if self.bucket.table.get(root.pid()) != root.swip() {
        return Err(OpCode::Again);
    }
    let mut alloc = self.begin_build();
    let left_pid = alloc.reserve_pid();
    let (mut left_node, garbage) = root.compact(&mut alloc, safe_txid);
    left_node.header_mut().right_sibling = rpid;
    let (group, lsn) = left_node.get_group_lsn();
    let mut left_page = Page::new(left_node);
    let root_node = Node::new_root(
        &mut alloc,
        self.bucket.loader(self.store.context),
        &[
            (IntlKey::new([].as_slice()), Index::new(left_pid)),
            (IntlKey::new(lo), Index::new(rpid)),
        ],
        group,
        lsn,
    );
    let mut publish = alloc.into_publish(g);
    publish.map_to(&mut left_page, left_pid);
    let installed = publish.replace(root, root_node, garbage);
    assert_eq!(installed.box_header().pid, self.root_index.pid);
    publish.cache_after_commit(left_page);
    publish.commit();
    self.store.opt.observer.counter(CounterMetric::TreeNodeSplit, 1);
    Ok(())
}
/// Descends from the root to the leaf whose key range covers `k`.
///
/// Whenever a descent reports `OpCode::Again` (concurrent split/merge/compaction),
/// the epoch guard's deferred work is flushed and the descent is restarted.
///
/// # Errors
/// Any other error from the descent (for example a failed page load inside
/// `load_node`) is returned to the caller. Previously it aborted the process via
/// `unreachable!`, even though every caller already handles an `Err`.
fn find_leaf(&self, g: &Guard, k: &[u8]) -> Result<Page, OpCode> {
    loop {
        match self.try_find_leaf(g, k) {
            Err(OpCode::Again) => g.flush(),
            outcome => return outcome,
        }
    }
}
/// One optimistic descent from the root to the leaf covering `key`.
///
/// It returns `OpCode::Again` when a node is missing, is being merged, or has a
/// low key above `key`. Along the way it performs structural maintenance, and
/// restarts (`OpCode::Again`) after most of it:
/// - splits an oversized node;
/// - inserts a missing parent index entry for a node reached via a right-sibling
///   hop (the descent continues afterwards);
/// - starts a merge of an undersized, non-leftmost child;
/// - compacts a leaf with too many deltas, then re-reads the same pid instead of
///   restarting.
fn try_find_leaf(&self, g: &Guard, key: &[u8]) -> Result<Page, OpCode> {
let mut cursor = self.root_index.pid;
// Last internal node descended through; `None` while still at the root.
let mut parent_opt: Option<Page> = None;
// Parent that lacks an index entry for a node we reached through a right-sibling link.
let mut unsplit_parent_opt: Option<Page> = None;
// Whether `cursor` was the leftmost child of `parent_opt`; such children are not merged here.
let mut leftmost = false;
loop {
let node_ptr = if let Some(x) = self.load_node(g, cursor)? {
x
} else {
return Err(OpCode::Again);
};
if node_ptr.header().merging {
return Err(OpCode::Again);
}
let lo = node_ptr.lo();
if key < lo {
return Err(OpCode::Again);
}
if node_ptr.should_split(self.store.opt.split_elems) {
self.split_node(node_ptr, parent_opt, g)?;
return Err(OpCode::Again);
}
let hi = node_ptr.hi();
// `key` lies at or beyond this node's high key: it belongs to the right sibling.
let is_splitting = if let Some(hi) = hi { key >= hi } else { false };
if is_splitting {
let rpid = node_ptr.header().right_sibling;
assert_ne!(rpid, NULL_PID);
// Remember the parent so its missing separator can be installed once the
// correct node is found.
if unsplit_parent_opt.is_none() && parent_opt.is_some() {
unsplit_parent_opt = parent_opt;
} else if parent_opt.is_none() && lo.is_empty() {
// At the root, which has a right sibling: presumably a root split whose
// new root was never built. Build it now, then restart.
assert_eq!(cursor, self.root_index.pid);
let safe_txid = self.txid();
let _ = self.split_root(g, node_ptr, rpid, hi.unwrap(), safe_txid);
return Err(OpCode::Again);
}
cursor = rpid;
continue;
}
// Install the separator `lo -> cursor` into the parent that missed it.
if let Some(unsplit) = unsplit_parent_opt.take() {
let mut build = self.begin_build();
let _lk = unsplit.lock();
if self.bucket.table.get(unsplit.pid()) != unsplit.swip() {
return Err(OpCode::Again);
}
let Some((split_node, junk)) =
unsplit.insert_index(&mut build, lo, cursor, self.txid())
else {
return Err(OpCode::Again);
};
let mut publish = build.into_publish(g);
publish.replace(unsplit, split_node, junk);
publish.commit();
self.store
.opt
.observer
.counter(CounterMetric::TreeNodeSplit, 1);
}
if !leftmost
&& let Some(parent) = parent_opt
&& node_ptr.should_merge()
{
self.try_merge(g, parent, node_ptr)?;
return Err(OpCode::Again);
}
if node_ptr.is_intl() {
// Internal nodes are expected to carry no deltas.
assert_eq!(node_ptr.delta_len(), 0);
let (is_leftmost, pid) = node_ptr.child_index(key);
leftmost = is_leftmost;
parent_opt = Some(node_ptr);
cursor = pid;
} else {
// Too many deltas on the leaf: compact it, then re-read the same pid.
if node_ptr.delta_len() >= self.store.opt.consolidate_threshold as usize {
self.try_compact(g, node_ptr);
continue;
}
return Ok(node_ptr);
}
}
}
/// Finds the leaf immediately to the left of the leaf whose range covers `key`.
///
/// Phase 1 descends to `key`, recording for each internal node its pid, swip and
/// the slot taken. Phase 2 scans that path from the deepest level upward for the
/// first ancestor whose slot is not 0. That ancestor is re-validated (same swip,
/// still internal, slot in range) and its slot-1 child is followed: first right
/// while `key` is above the node's high key, then down the rightmost entries
/// until a leaf is reached.
///
/// Returns `Ok(None)` if every recorded slot is 0 (there is nothing to the left).
/// Any inconsistency seen on the way yields `OpCode::Again`.
fn find_prev_leaf(&self, g: &Guard, key: &[u8]) -> Result<Option<Page>, OpCode> {
let mut cursor = self.root_index.pid;
// (pid, swip, slot taken) for every internal node on the way down.
let mut path: Vec<(u64, u64, usize)> = Vec::new();
loop {
let Some(node) = self.load_node(g, cursor)? else {
return Err(OpCode::Again);
};
if node.header().merging {
return Err(OpCode::Again);
}
if key < node.lo() {
return Err(OpCode::Again);
}
// No right-sibling chasing here; an in-flight split just restarts the search.
if let Some(hi) = node.hi()
&& key >= hi
{
return Err(OpCode::Again);
}
if !node.is_intl() {
break;
}
let sst = node.sst::<IntlKey>();
// Inexact match: take the entry before the insertion point (slot 0 if key sorts first).
let pos = match sst.search_by(&IntlKey::new(key), |x, y| x.raw.cmp(y.raw)) {
Ok(pos) => pos,
Err(pos) => pos.max(1) - 1,
};
let (_, idx) = sst.kv_at::<Index>(pos);
path.push((node.pid(), node.swip(), pos));
cursor = idx.pid;
}
while let Some((parent_pid, parent_swip, child_pos)) = path.pop() {
// Leftmost at this level: the left neighbour, if any, hangs off a higher ancestor.
if child_pos == 0 {
continue;
}
let Some(parent) = self.load_node(g, parent_pid)? else {
return Err(OpCode::Again);
};
// The ancestor changed since phase 1; the recorded slot may be meaningless now.
if parent.swip() != parent_swip {
return Err(OpCode::Again);
}
if !parent.is_intl() {
return Err(OpCode::Again);
}
let sst = parent.sst::<IntlKey>();
if child_pos >= parent.header().elems as usize {
return Err(OpCode::Again);
}
let (_, idx) = sst.kv_at::<Index>(child_pos - 1);
let mut pid = idx.pid;
loop {
let Some(node) = self.load_node(g, pid)? else {
return Err(OpCode::Again);
};
if node.header().merging {
return Err(OpCode::Again);
}
// The node ends below `key`: a right sibling may be closer to it.
if let Some(hi) = node.hi()
&& key > hi
{
let rpid = node.header().right_sibling;
if rpid == NULL_PID {
return Err(OpCode::Again);
}
pid = rpid;
continue;
}
// The candidate must lie strictly below `key`.
if key <= node.lo() {
return Err(OpCode::Again);
}
if !node.is_intl() {
return Ok(Some(node));
}
let elems = node.header().elems as usize;
if elems == 0 {
return Err(OpCode::Again);
}
// Descend through the rightmost entry to reach the greatest leaf below `key`.
let (_, rightmost) = node.sst::<IntlKey>().kv_at::<Index>(elems - 1);
pid = rightmost.pid;
}
}
Ok(None)
}
/// Replaces `page` with the consolidated node produced by `Page::compact`,
/// using the current `txid()` as the safe txid.
///
/// Blocks on the page lock. Does nothing if `page` is no longer the page mapped
/// for its pid.
fn try_compact(&self, g: &Guard, page: Page) {
    let _guard = page.lock();
    if self.bucket.table.get(page.pid()) == page.swip() {
        let mut alloc = self.begin_build();
        let (compacted, garbage) = page.compact(&mut alloc, self.txid());
        let mut publish = alloc.into_publish(g);
        publish.replace(page, compacted, garbage);
        publish.commit();
        self.store
            .opt
            .observer
            .counter(CounterMetric::TreeNodeConsolidate, 1);
    }
}
pub(crate) fn try_scavenge(&self, pid: u64, g: &Guard) -> Result<bool, OpCode> {
let page = if let Some(p) = self.load_node(g, pid)? {
p
} else {
return Ok(false);
};
let h = page.header();
if h.merging || h.merging_child != NULL_ADDR {
return Ok(false);
}
let safe_txid = self.txid();
let delta_len = page.delta_len();
let threshold = self.store.opt.consolidate_threshold as usize;
if delta_len >= threshold {
self.try_compact(g, page);
return Ok(true);
}
if page.ref_node().has_garbage(safe_txid) {
self.try_compact(g, page);
return Ok(true);
}
Ok(false)
}
fn try_merge(&self, g: &Guard, parent: Page, cur: Page) -> Result<(), OpCode> {
let Some(lk) = parent.try_lock() else {
return Err(OpCode::Again);
};
if self.bucket.table.get(parent.pid()) != parent.swip() {
return Err(OpCode::Again);
}
if parent.header().merging_child != NULL_ADDR {
return Err(OpCode::Again);
}
let pid = cur.pid();
if parent.can_merge_child(cur.lo(), pid) {
let mut build = self.begin_build();
let (new_parent, j) =
parent.process_merge(&mut build, MergeOp::MarkParent(pid), self.txid());
let mut publish = build.into_publish(g);
let new_page = publish.replace(parent, new_parent, j);
publish.commit();
drop(lk);
self.merge_node(new_page, pid, g)?;
}
Ok(())
}
/// Appends the delta `(k, v)` to `page` while holding the page lock.
///
/// `check` runs under the lock. It picks the delta's group id and `Position`, and
/// may reject the write with an error. The lock hold time is sampled into
/// `TreeLinkHoldMicros` on every exit taken with the lock held, including a
/// rejection by `check`.
///
/// # Errors
/// `OpCode::Again` if `page` is no longer mapped for its pid; any error from `check`.
fn link<K, V, F>(
    &self,
    _g: &Guard,
    page: Page,
    k: &K,
    v: &V,
    mut check: F,
) -> Result<(), OpCode>
where
    K: IKey,
    V: IVal,
    F: FnMut(Page, &K) -> Result<(u8, Position), OpCode>,
{
    loop {
        // Busy-wait for the page lock; the hint tells the CPU we are spinning.
        let Some(node) = page.try_lock() else {
            std::hint::spin_loop();
            continue;
        };
        let lock_started = sampled_instant(k.txid(), LATENCY_SAMPLE_SHIFT);
        let pid = page.pid();
        if self.bucket.table.get(pid) != page.swip() {
            observe_elapsed(
                self.store.opt.observer.as_ref(),
                HistogramMetric::TreeLinkHoldMicros,
                lock_started,
            );
            return Err(OpCode::Again);
        }
        let (group, pos) = match check(page, k) {
            Ok(placement) => placement,
            Err(e) => {
                observe_elapsed(
                    self.store.opt.observer.as_ref(),
                    HistogramMetric::TreeLinkHoldMicros,
                    lock_started,
                );
                return Err(e);
            }
        };
        let mut build = self.begin_build();
        let (k, v) = DeltaView::from_key_val(&mut build, k, v, group, pos);
        let addr = node.insert(k, v);
        build.mark_dirty(pid, addr);
        observe_elapsed(
            self.store.opt.observer.as_ref(),
            HistogramMetric::TreeLinkHoldMicros,
            lock_started,
        );
        drop(node);
        return Ok(());
    }
}
/// Single attempt at a blind write: locates the leaf for `key` and links the
/// delta with group 0 and a default `Position`, with no visibility check.
fn try_put<K, V>(&self, g: &Guard, key: &K, val: &V) -> Result<(), OpCode>
where
    K: IKey,
    V: IVal,
{
    let leaf = self.find_leaf(g, key.raw())?;
    self.link(g, leaf, key, val, |_, _| Ok((0, Position::default())))
}
/// Writes `val` under `key`.
///
/// Retries internally while the attempt reports `OpCode::Again`, counting each
/// retry in `TreeRetryAgain` and flushing the epoch guard in between.
///
/// # Errors
/// Any error other than `OpCode::Again`.
pub fn put<K, V>(&self, g: &Guard, key: K, val: V) -> Result<(), OpCode>
where
    K: IKey,
    V: IVal,
{
    loop {
        match self.try_put::<K, V>(g, &key, &val) {
            Err(OpCode::Again) => {}
            finished => return finished,
        }
        let observer = &self.store.opt.observer;
        observer.counter(CounterMetric::TreeRetryAgain, 1);
        g.flush();
    }
}
/// Single attempt at a conditional write.
///
/// Under the page lock, the newest existing version of `key` (if any) is looked
/// up and shown to `visible`, which decides the placement or rejects the write.
/// Returns that previous value when the write went through.
fn try_update<V, F>(
    &self,
    g: &Guard,
    key: &Key,
    val: &V,
    visible: &mut F,
) -> Result<Option<ValRef>, OpCode>
where
    V: IVal,
    F: FnMut(&Option<(Key, ValRef)>) -> Result<(u8, Position), OpCode>,
{
    let leaf = self.find_leaf(g, key.raw)?;
    let mut latest = None;
    self.link(g, leaf, key, val, |pg, k| {
        latest = pg
            .find_latest(k)
            .map(|(ver, rec, owner)| (Key::new(k.raw, ver), ValRef::new(rec, owner)));
        visible(&latest)
    })?;
    Ok(latest.map(|(_, prev)| prev))
}
/// Conditionally writes `val` under `key`, returning the previous value seen by
/// `visible`.
///
/// Retries internally on `OpCode::Again`, counting each retry in both
/// `TreeRetryAgain` and `TxnRetryAgain` and flushing the epoch guard in between.
///
/// # Errors
/// `OpCode::TooLarge` if the key exceeds `MAX_KEY_SIZE` or key+value exceed
/// `MAX_KV_SIZE`; any other non-`Again` error from the attempt.
pub fn update<V, F>(
    &self,
    g: &Guard,
    key: Key,
    val: V,
    mut visible: F,
) -> Result<Option<ValRef>, OpCode>
where
    V: IVal,
    F: FnMut(&Option<(Key, ValRef)>) -> Result<(u8, Position), OpCode>,
{
    let key_size = key.packed_size();
    let too_large = key_size > Options::MAX_KEY_SIZE
        || key_size + val.packed_size() > Options::MAX_KV_SIZE;
    if too_large {
        return Err(OpCode::TooLarge);
    }
    loop {
        match self.try_update(g, &key, &val, &mut visible) {
            Err(OpCode::Again) => {
                let observer = &self.store.opt.observer;
                observer.counter(CounterMetric::TreeRetryAgain, 1);
                observer.counter(CounterMetric::TxnRetryAgain, 1);
                g.flush();
            }
            finished => return finished,
        }
    }
}
/// Rewrites the leaf holding `raw` without its aborted versions (see `rewrite_node`).
///
/// # Errors
/// `OpCode::Again` if the leaf lock is contended or the leaf was replaced.
pub(crate) fn remove_aborted(&self, g: &Guard, raw: &[u8]) -> Result<bool, OpCode> {
    let leaf = self.find_leaf(g, raw)?;
    let Some(_guard) = leaf.try_lock() else {
        return Err(OpCode::Again);
    };
    let stale = self.bucket.table.get(leaf.pid()) != leaf.swip();
    if stale {
        return Err(OpCode::Again);
    }
    self.rewrite_node(g, leaf)
}
/// Like `remove_aborted`, but only rewrites the leaf when the newest version of
/// `raw` was written by `aborted_txid` and the store context reports that
/// transaction as `TxOutcome::Aborted`. Otherwise returns `Ok(false)`.
///
/// # Errors
/// `OpCode::Again` if the leaf lock is contended or the leaf was replaced.
pub(crate) fn remove_aborted_head(
    &self,
    g: &Guard,
    raw: &[u8],
    aborted_txid: u64,
) -> Result<bool, OpCode> {
    let leaf = self.find_leaf(g, raw)?;
    let Some(_guard) = leaf.try_lock() else {
        return Err(OpCode::Again);
    };
    if self.bucket.table.get(leaf.pid()) != leaf.swip() {
        return Err(OpCode::Again);
    }
    let newest = Key::new(raw, Ver::new(u64::MAX, u32::MAX));
    let Some((head_ver, _, _)) = leaf.find_latest(&newest) else {
        return Ok(false);
    };
    let head_is_aborted = head_ver.txid == aborted_txid
        && self.store.context.get_aborted(aborted_txid) == Some(TxOutcome::Aborted);
    if !head_is_aborted {
        return Ok(false);
    }
    self.rewrite_node(g, leaf)
}
/// Builds `page` without its aborted entries (`Page::remove_aborted`) and
/// publishes the result if anything was removed. Returns whether it did.
///
/// Callers hold the page lock and have verified the page is still mapped.
#[inline]
fn rewrite_node(&self, g: &Guard, page: Page) -> Result<bool, OpCode> {
    let mut alloc = self.begin_build();
    let (rewritten, garbage, removed) =
        page.remove_aborted(&mut alloc, self.txid(), self.store.context);
    if removed {
        let mut publish = alloc.into_publish(g);
        publish.replace(page, rewritten, garbage);
        publish.commit();
    }
    Ok(removed)
}
/// Returns the latest version of `key` stored in its leaf, together with the
/// full key carrying that version. Tombstones are returned too; use
/// `ValRef::is_del` to detect them.
///
/// # Errors
/// `OpCode::NotFound` if the leaf has no version of the key.
pub fn get<'b>(&'b self, g: &Guard, key: Key<'b>) -> Result<(Key<'b>, ValRef), OpCode> {
    let leaf = self.find_leaf(g, key.raw())?;
    leaf.find_latest(&key)
        .map(|(ver, rec, owner)| (Key::new(key.raw, ver), ValRef::new(rec, owner)))
        .ok_or(OpCode::NotFound)
}
/// Creates a double-ended iterator over the keys in `range`.
///
/// An unbounded start is normalised to `Included(b"")`; the end bound is kept
/// as given. `visible` receives the store context, an item's txid and its group
/// id, and decides whether that version may be yielded. The iterator pins an
/// epoch guard for as long as it lives.
pub fn range<'a, K, R, F>(&'a self, range: R, visible: F) -> Iter<'a>
where
    K: AsRef<[u8]>,
    R: RangeBounds<K>,
    F: FnMut(&Context, u64, u8) -> bool + 'a,
{
    let lo = match range.start_bound().map(|b| b.as_ref().to_vec()) {
        Bound::Unbounded => Bound::Included(Vec::new()),
        bounded => bounded,
    };
    let hi = range.end_bound().map(|e| e.as_ref().to_vec());
    Iter {
        tree: self,
        cached_key: Handle::new(Vec::new()),
        lo,
        hi,
        iter: None,
        rev_iter: None,
        cache: None,
        iter_bound: None,
        checker: Box::new(visible),
        filter: Filter { has_last: false },
        guard: crossbeam_epoch::pin(),
    }
}
/// Walks a chain of history pages and returns the first version accepted by `visible`.
///
/// At most `hist.count` entries are examined. The walk starts at slot `hist.slot`
/// of page `hist.page_addr` and continues through each page's `link`. On the first
/// page, entries ordered before `Ver::new(start_ts, NULL_CMD)` are skipped with a
/// binary search; they still count against `hist.count`. Both a visible tombstone
/// and an exhausted chain yield `OpCode::NotFound`.
fn traverse_hist<L, F>(
&self,
l: &L,
start_ts: u64,
hist: HistRef,
visible: &mut F,
) -> Result<ValRef, OpCode>
where
L: ILoader,
F: FnMut(u64, u8) -> bool,
{
let mut addr = hist.page_addr;
let mut pos = hist.slot as usize;
let mut remaining = hist.count as usize;
let mut first_segment = true;
let target = Ver::new(start_ts, NULL_CMD);
// NOTE(review): page addresses are compared with NULL_PID here, while `Tree::new`
// compares a table address with NULL_ADDR; confirm the two sentinels coincide.
while addr != NULL_PID && remaining > 0 {
let ptr = l.load(addr)?.as_base();
let sst = ptr.sst::<Ver>();
let elems = sst.header().elems as usize;
// Start slot lies past this page: continue on the next linked page.
if pos >= elems {
addr = ptr.box_header().link;
pos = 0;
continue;
}
let mut page_end = elems.min(pos.saturating_add(remaining));
// Only the first page is binary-searched; later pages are scanned from slot 0.
if first_segment {
first_segment = false;
let begin = Self::lower_bound_hist_subrange(&sst, pos, page_end, &target);
let skipped = begin - pos;
pos = begin;
remaining = remaining.saturating_sub(skipped);
page_end = elems.min(pos.saturating_add(remaining));
}
while pos < page_end && remaining > 0 {
let (k, v) = sst.kv_at::<Val>(pos);
if visible(k.txid, v.group_id()) {
if v.is_tombstone() {
return Err(OpCode::NotFound);
}
// The record may live in another box; otherwise this history page owns it.
let (v, r) = v.get_record(l, true);
return Ok(ValRef::new(v, r.map_or(ptr.as_box(), |x| x)));
}
pos += 1;
remaining -= 1;
}
if remaining == 0 {
break;
}
addr = ptr.box_header().link;
pos = 0;
}
Err(OpCode::NotFound)
}
/// Binary search over slots `[lo, hi)` of `sst`.
///
/// Returns the first slot whose key does not compare less than `target`, or `hi`
/// if every key in the range is less. Returns `lo` for an empty range (including
/// `lo >= hi`).
fn lower_bound_hist_subrange(
    sst: &crate::types::sst::Sst<Ver>,
    lo: usize,
    hi: usize,
    target: &Ver,
) -> usize {
    let mut first = lo;
    let mut count = hi.saturating_sub(lo);
    while count > 0 {
        let step = count / 2;
        let probe = first + step;
        if sst.key_at(probe).cmp(target).is_lt() {
            first = probe + 1;
            count -= step + 1;
        } else {
            count = step;
        }
    }
    first
}
/// Returns the first version of `key.raw` accepted by `visible`.
///
/// Candidates are searched in this order:
/// 1. the versions reachable through `visit_versions` on the leaf (presumably the
///    page's delta entries — TODO confirm);
/// 2. the base entry found by `search_sst`;
/// 3. that entry's history chain (`traverse_hist`, bounded by `key.txid`).
///
/// # Errors
/// `OpCode::NotFound` if the first accepted version is a tombstone or no version
/// is accepted.
pub fn traverse<F>(&self, g: &Guard, key: Key, mut visible: F) -> Result<ValRef, OpCode>
where
F: FnMut(u64, u8) -> bool,
{
let page = self.find_leaf(g, key.raw)?;
let mut result = None;
// Start the scan at the highest possible version of this key.
let search_key = Key::new(key.raw, Ver::new(u64::MAX, u32::MAX));
page.visit_versions(
search_key,
// Order by raw key ascending, then txid descending (newer first).
|x, y| {
let k = Key::decode_from(x.key());
match k.raw.cmp(y.raw) {
Equal => y.txid.cmp(&k.txid), o => o,
}
},
// Returning `true` stops the visit: a different key, or a decided result.
|x| {
let k = Key::decode_from(x.key());
if k.raw.cmp(key.raw).is_ne() {
return true;
}
let val = x.val();
if visible(k.txid, val.group_id()) {
if val.is_tombstone() {
result = Some(Err(OpCode::NotFound));
return true;
}
let (r, v) = val.get_record(&page.loader, true);
result = Some(Ok(ValRef::new(r, v.unwrap_or_else(|| x.as_box()))));
return true;
}
false
},
);
if let Some(res) = result {
return res;
}
// Nothing decided in the visited versions: fall back to the base entry.
let (k, val) = page.search_sst(&key).ok_or(OpCode::NotFound)?;
if visible(k.txid, val.group_id()) {
if val.is_tombstone() {
return Err(OpCode::NotFound);
}
let (record, r) = val.get_record(&page.loader, true);
return Ok(ValRef::new(record, r.unwrap_or_else(|| page.base_box())));
}
// Older versions of the base entry live in its history chain.
if let Some(hist) = val.get_hist() {
return self.traverse_hist(&page.loader, key.txid, hist, &mut visible);
}
Err(OpCode::NotFound)
}
}
/// Double-ended range iterator over a `Tree`, created by `Tree::range`.
///
/// `lo` and `hi` shrink as items are yielded from the front (`next`) or the back
/// (`next_back`). `iter` and `rev_iter` borrow from the boxed `cache` node (and
/// `iter` also from the boxed `iter_bound`) through lifetime-extending
/// transmutes. They must therefore be dropped before those boxes are overwritten
/// or freed.
pub struct Iter<'a> {
tree: &'a Tree,
// Scratch key buffer handed to the leaf cursors; reclaimed explicitly in `Drop`.
cached_key: Handle<Vec<u8>>,
// Remaining lower bound; moved past every item yielded by `next`.
lo: Bound<Vec<u8>>,
// Remaining upper bound; lowered to every item yielded by `next_back`.
hi: Bound<Vec<u8>>,
// Forward cursor over the current leaf (borrows `cache` and `iter_bound`).
iter: Option<RawLeafIter<'a, Loader>>,
// Backward cursor over the current leaf (borrows `cache`).
rev_iter: Option<RawLeafRevIter<'a, Loader>>,
// Current leaf node, boxed so its heap address stays fixed while cursors borrow it.
cache: Option<Box<Node>>,
// Lower bound the forward cursor was created with, boxed for the same reason.
iter_bound: Option<Box<Bound<Vec<u8>>>>,
// Visibility predicate: (store context, item txid, item group id).
checker: Box<dyn FnMut(&Context, u64, u8) -> bool + 'a>,
// Post-visibility filter (see `Filter::check`).
filter: Filter,
// Epoch guard pinned when the iterator was created and held for its lifetime.
guard: Guard,
}
// Tears the iterator down in dependency order. The leaf cursors hold transmuted
// borrows of `cache` / `iter_bound`, so they are dropped before those boxes. The
// scratch key `Handle` is released explicitly last; presumably it is not freed by
// its own drop — TODO confirm against `Handle`.
impl Drop for Iter<'_> {
fn drop(&mut self) {
self.iter.take();
self.rev_iter.take();
self.cache.take();
self.iter_bound.take();
self.cached_key.reclaim();
}
}
impl Iter<'_> {
/// Key of the remaining lower bound (empty when unbounded).
fn low_key(&self) -> &[u8] {
    if let Bound::Included(key) | Bound::Excluded(key) = &self.lo {
        key.as_slice()
    } else {
        &[]
    }
}

/// Key of the remaining upper bound, or `None` when unbounded.
fn high_key(&self) -> Option<&[u8]> {
    match &self.hi {
        Bound::Unbounded => None,
        Bound::Included(key) | Bound::Excluded(key) => Some(key.as_slice()),
    }
}
/// Returns `true` when the remaining range `(lo, hi)` provably contains no key,
/// so iteration can stop without descending the tree.
///
/// A closed range `[b, e]` is empty iff `b > e`. A range with at least one
/// exclusive end (`[b, e)`, `(b, e]`, `(b, e)`) is already empty when `b == e`.
/// The previous `b > e` test missed that case, so e.g. `range(k..k)`, or the two
/// ends meeting on the same key, caused extra tree descents and pushed `lo` past
/// `hi`. Unbounded ends never collapse.
fn collapsed(&self) -> bool {
    match (&self.lo, &self.hi) {
        (Bound::Included(b), Bound::Included(e)) => b > e,
        (Bound::Included(b), Bound::Excluded(e))
        | (Bound::Excluded(b), Bound::Included(e))
        | (Bound::Excluded(b), Bound::Excluded(e)) => b >= e,
        _ => false,
    }
}
/// Locates the leaf where backward iteration should resume.
///
/// With a bounded `hi`, this is the leaf covering the high key. If `hi` is
/// exclusive and that leaf starts at or above the key, the leaf before it is used
/// instead (`OpCode::NotFound` if none). With an unbounded `hi`, right siblings
/// are followed from the leaf of the low key to the last leaf.
///
/// # Errors
/// `OpCode::Again` if a sibling disappears mid-walk; errors from the tree lookups.
fn find_leaf_for_next_back(&self) -> Result<Page, OpCode> {
    let Some(hi_key) = self.high_key() else {
        let mut leaf = self.tree.find_leaf(&self.guard, self.low_key())?;
        loop {
            let next_pid = leaf.header().right_sibling;
            if next_pid == NULL_PID {
                return Ok(leaf);
            }
            leaf = self
                .tree
                .load_node(&self.guard, next_pid)?
                .ok_or(OpCode::Again)?;
        }
    };
    let leaf = self.tree.find_leaf(&self.guard, hi_key)?;
    let hi_exclusive = matches!(self.hi, Bound::Excluded(_));
    if hi_exclusive && leaf.lo() >= hi_key {
        return self
            .tree
            .find_prev_leaf(&self.guard, hi_key)?
            .ok_or(OpCode::NotFound);
    }
    Ok(leaf)
}
/// Forward step of the iterator.
///
/// While the range is not collapsed:
/// - if there is no forward cursor, one is created on the leaf covering `lo`;
/// - the cursor is scanned for the first item above `lo` that passes `checker`
///   and `filter`;
/// - a found item moves `lo` to `Excluded(item key)` and is returned if it is
///   still within `hi`; otherwise iteration ends;
/// - an exhausted leaf moves `lo` to `Included(leaf high key)` and iteration
///   continues on the next leaf, while the last leaf ends iteration;
/// - `Again`/`NotFound` from the cursor discards it and retries.
///
/// NOTE(review): items are typed `IterItem<'a, _>`, but they are produced by a
/// cursor over `cache` and `cached_key`, which later calls overwrite. Presumably
/// an item must not be held across another `next`/`next_back` — confirm against
/// `IterItem`.
fn get_next(&mut self) -> Option<<Self as Iterator>::Item> {
// A forward step invalidates any backward cursor.
self.rev_iter.take();
'retry: while !self.collapsed() {
if self.iter.is_none() {
let node = match self.tree.find_leaf(&self.guard, self.low_key()) {
Ok(node) => node,
Err(OpCode::Again) => {
self.guard.flush();
continue;
}
Err(OpCode::NotFound) => return None,
Err(e) => panic!("iter find_leaf failed: {e:?}"),
};
let next_node = node.ref_node();
let next_bound = self.lo.clone();
// Overwrite the boxed node/bound in place when they exist; no cursor borrows them here.
if let Some(cache) = self.cache.as_mut() {
**cache = next_node;
} else {
self.cache = Some(Box::new(next_node));
}
if let Some(bound) = self.iter_bound.as_mut() {
**bound = next_bound;
} else {
self.iter_bound = Some(Box::new(next_bound));
}
let cache = self.cache.as_ref().expect("must valid");
let bound = self.iter_bound.as_ref().expect("must valid");
// SAFETY (review): the transmute extends the cursor's borrow of `*cache` and
// `*bound` to `'a`. This is sound only while those boxed values are neither
// overwritten nor freed during the cursor's life. In this file they are only
// overwritten while `self.iter` and `self.rev_iter` are both `None`, and `Drop`
// drops the cursors before the boxes.
self.iter = Some(unsafe {
std::mem::transmute::<RawLeafIter<'_, Loader>, RawLeafIter<'_, Loader>>(
cache.successor(bound.as_ref(), self.cached_key),
)
});
}
let r = loop {
let next = {
let iter = self.iter.as_mut().expect("must valid");
iter.try_next()
};
match next {
Ok(Some(item)) => {
let ok = match &self.lo {
Bound::Unbounded => true,
Bound::Included(b) => item.cmp_key(b.as_slice()).is_ge(),
Bound::Excluded(b) => item.cmp_key(b.as_slice()).is_gt(),
};
if ok
&& (self.checker)(
&self.tree.store.context,
item.txid(),
item.group_id(),
)
&& self.filter.check(&item)
{
break Some(item);
}
}
Ok(None) => break None,
// The leaf changed under the cursor: rebuild it from the current `lo`.
Err(OpCode::Again | OpCode::NotFound) => {
self.iter.take();
continue 'retry;
}
Err(e) => panic!("iter load failed: {e:?}"),
}
};
if let Some(item) = r {
// Move the lower bound past the item, reusing the existing allocation.
let key = item.key();
match &mut self.lo {
Bound::Included(v) | Bound::Excluded(v) => {
v.clear();
v.extend_from_slice(key);
self.lo = Bound::Excluded(std::mem::take(v));
}
Bound::Unbounded => {
self.lo = Bound::Excluded(key.to_vec());
}
}
// An item beyond `hi` ends iteration (`lo` has already moved past it).
match self.hi {
Bound::Unbounded => return Some(item),
Bound::Included(ref h) if item.cmp_key(h.as_slice()).is_le() => {
return Some(item);
}
Bound::Excluded(ref h) if item.cmp_key(h.as_slice()).is_lt() => {
return Some(item);
}
_ => return None,
}
} else {
// Leaf exhausted: continue from its high key, or stop at the last leaf.
self.iter.take();
let node = self.cache.as_ref().expect("must valid");
if let Some(hi) = node.hi() {
self.lo = Bound::Included(hi.to_vec());
continue;
}
break;
}
}
None
}
}
impl<'a> Iterator for Iter<'a> {
type Item = IterItem<'a, Loader>;
fn next(&mut self) -> Option<Self::Item> {
self.get_next()
}
}
impl<'a> DoubleEndedIterator for Iter<'a> {
/// Backward step of the iterator.
///
/// Mirrors `get_next`:
/// - if there is no backward cursor, one is created on the leaf chosen by
///   `find_leaf_for_next_back`;
/// - the cursor is scanned for the first item inside both `lo` and `hi` that
///   passes `checker` and `filter`;
/// - on success, `hi` moves to `Excluded(item key)`;
/// - an exhausted leaf moves `hi` to `Excluded(leaf low key)`, and a leaf with an
///   empty low key ends iteration;
/// - `Again`/`NotFound` from the cursor discards it and retries.
fn next_back(&mut self) -> Option<Self::Item> {
// A backward step invalidates any forward cursor.
self.iter.take();
'retry: while !self.collapsed() {
if self.rev_iter.is_none() {
let node = match self.find_leaf_for_next_back() {
Ok(node) => node,
Err(OpCode::Again) => {
self.guard.flush();
continue;
}
Err(OpCode::NotFound) => return None,
Err(e) => panic!("iter find_leaf failed: {e:?}"),
};
let next_node = node.ref_node();
if let Some(cache) = self.cache.as_mut() {
**cache = next_node;
} else {
self.cache = Some(Box::new(next_node));
}
// SAFETY (review): the transmute extends the cursor's borrow of `*cache` to `'a`.
// This is sound only while the boxed node is neither overwritten nor freed during
// the cursor's life. In this file it is only overwritten while both cursors are
// `None`, and `Drop` drops the cursors before the box.
self.rev_iter = Some(unsafe {
std::mem::transmute::<RawLeafRevIter<'_, Loader>, RawLeafRevIter<'_, Loader>>(
self.cache.as_ref().expect("must valid").predecessor(
&self.lo,
&self.hi,
self.cached_key,
),
)
});
}
let res = loop {
let next = {
let iter = self.rev_iter.as_mut().expect("must valid");
iter.try_next_back()
};
match next {
Ok(Some(item)) => {
let lo_ok = match &self.lo {
Bound::Unbounded => true,
Bound::Included(b) => item.cmp_key(b.as_slice()).is_ge(),
Bound::Excluded(b) => item.cmp_key(b.as_slice()).is_gt(),
};
let hi_ok = match &self.hi {
Bound::Unbounded => true,
Bound::Included(h) => item.cmp_key(h.as_slice()).is_le(),
Bound::Excluded(h) => item.cmp_key(h.as_slice()).is_lt(),
};
if lo_ok
&& hi_ok
&& (self.checker)(
&self.tree.store.context,
item.txid(),
item.group_id(),
)
&& self.filter.check(&item)
{
break Some(item);
}
}
Ok(None) => break None,
// The leaf changed under the cursor: rebuild it from the current `hi`.
Err(OpCode::Again | OpCode::NotFound) => {
self.rev_iter.take();
continue 'retry;
}
Err(e) => panic!("iter load failed: {e:?}"),
}
};
if let Some(item) = res {
// Move the upper bound below the item, reusing the existing allocation.
let key = item.key();
match &mut self.hi {
Bound::Included(v) | Bound::Excluded(v) => {
v.clear();
v.extend_from_slice(key);
self.hi = Bound::Excluded(std::mem::take(v));
}
Bound::Unbounded => {
self.hi = Bound::Excluded(key.to_vec());
}
}
return Some(item);
}
// Leaf exhausted: continue below its low key, or stop at the leftmost leaf.
self.rev_iter.take();
let lo = self.cache.as_ref().expect("must valid").lo();
if lo.is_empty() {
return None;
}
self.hi = Bound::Excluded(lo.to_vec());
}
None
}
}
/// Filter applied to iterator items after the visibility check.
struct Filter {
// Set by the first `check` call that gets past the duplicate test; never reset in this file.
has_last: bool,
}
impl Filter {
/// Returns `true` if `item` should be yielded.
///
/// Once `has_last` is set, an item is rejected when `item.cmp_key(item.key())`
/// is equal. Otherwise `item.assembled_key()` is called, `has_last` is set, and
/// tombstones are rejected.
///
/// NOTE(review): `item.cmp_key(item.key())` compares an item with its own `key()`.
/// This only filters selectively if `key()` returns a previously assembled key
/// (e.g. a shared buffer updated by `assembled_key`). Presumably the intent is to
/// skip further versions of the key just examined — confirm against `IterItem`.
fn check<L: ILoader>(&mut self, item: &IterItem<L>) -> bool {
if self.has_last && item.cmp_key(item.key()).is_eq() {
return false;
}
// Called for its effect; the returned key is not used here.
let _ = item.assembled_key();
self.has_last = true;
!item.is_tombstone()
}
}
#[cfg(test)]
mod test {
    use crate::{Mace, Options, RandomPath};
    use std::thread;

    /// Readers repeatedly scan the `key` prefix while a writer commits
    /// `num_iterations` distinct keys.
    ///
    /// No concurrent scan may report more keys than are ever written, and a scan
    /// taken after every commit has finished must see all of them. The previous
    /// `assert!(count >= 0)` could never fail.
    #[test]
    fn concurrent_page_hit() {
        let path = RandomPath::tmp();
        let mut opt = Options::new(&*path);
        opt.split_elems = 256;
        opt.tmp_store = true;
        let mace = Mace::new(opt.validate().unwrap()).unwrap();
        let db = mace.new_bucket("default").unwrap();
        let num_readers = 4;
        let num_iterations = 1000;
        thread::scope(|s| {
            for _ in 0..num_readers {
                let db = db.clone();
                s.spawn(move || {
                    for _ in 0..num_iterations {
                        let view = db.view().unwrap();
                        let mut count = 0;
                        for _ in view.seek("key") {
                            count += 1;
                        }
                        // Only `num_iterations` distinct keys are ever written.
                        assert!(count <= num_iterations);
                    }
                });
            }
            s.spawn(|| {
                for i in 0..num_iterations {
                    let kv = db.begin().unwrap();
                    let key = format!("key_{:05}", i);
                    kv.put(&key, &key).unwrap();
                    kv.commit().unwrap();
                }
            });
        });
        // Every writer transaction has committed once the scope has joined.
        let view = db.view().unwrap();
        let mut total = 0;
        for _ in view.seek("key") {
            total += 1;
        }
        assert_eq!(total, num_iterations);
    }
}