use parking_lot::{Mutex, MutexGuard, RwLock};
use std::{
cmp::Ordering::{self, Equal, Greater, Less},
ops::{Bound, Deref, DerefMut},
sync::{
Arc,
atomic::{
AtomicU8, AtomicU64,
Ordering::{AcqRel, Acquire, Relaxed},
},
},
};
use crate::{
types::{
base::BaseIter,
data::{
Index, IntlKey, IntlSeg, IterItem, Key, LeafSeg, Record, Val, Ver,
cmp_raw_with_prefixed_tail,
},
header::{BoxHeader, NodeType},
refbox::{BaseView, BoxView, DeltaView, RemoteView},
traits::{
IAsBoxRef, IBoxHeader, IDecode, IFrameAlloc, IHeader, IKey, ILoader, IRetireSink, IVal,
},
},
utils::{
Handle, NULL_ADDR, NULL_CMD, NULL_ORACLE, NULL_PID, OpCode,
imtree::{ImTree, Iter, RangeIter},
},
};
use super::{header::TagKind, refbox::BoxRef};
/// Mutable overlay state of a node: the delta chain stacked on top of the
/// immutable base page (`Node::inner`). Guarded by `Node::state: RwLock`.
pub(crate) struct NodeState {
    // Address of the head of the chain: the newest delta, or the base page
    // when no delta is attached (see `insert_inplace`, which links the new
    // delta to the previous head and then advances `addr`).
    addr: u64,
    // Combined byte size of the base page plus all attached deltas;
    // incremented on every insert, used by `size()` for split heuristics.
    total_size: usize,
    // txid of the most recently attached delta; `load_inner` keeps the
    // maximum across the whole chain while rebuilding this state from disk.
    max_txid: u64,
    // In-memory sorted index over the delta records, ordered by user key
    // (comparator chosen per node type: `intl_cmp` or `leaf_cmp`).
    delta: ImTree<DeltaView>,
}
/// Runtime flags for structure-modifying operations (SMO) on a node,
/// shared between all `reference()` copies via `Arc`.
pub(crate) struct NodeSmoState {
    // MERGE_IDLE / MERGE_ACTIVE — whether this node itself is being merged.
    merge_state: AtomicU8,
    // pid of the child currently being merged into a sibling, or NULL_PID.
    merging_child: AtomicU64,
}
impl NodeSmoState {
    /// No merge in progress on this node.
    const MERGE_IDLE: u8 = 0;
    /// This node is currently being merged away.
    const MERGE_ACTIVE: u8 = 1;

    /// Fresh SMO state: idle, with no child marked for merging.
    fn new() -> Self {
        let merge_state = AtomicU8::new(Self::MERGE_IDLE);
        let merging_child = AtomicU64::new(NULL_PID);
        Self {
            merge_state,
            merging_child,
        }
    }
}
/// A logical tree node = immutable base page + mutable delta overlay.
/// Cheap to copy via `reference()`; `mtx`/`smo` are shared across copies,
/// while `state` is snapshotted per copy.
pub(crate) struct Node<L: ILoader> {
    // Loads/pins backing boxes for this node's chain and remote values.
    pub(crate) loader: L,
    // Serializes writers through `lock()`/`try_lock()` (guards delta appends).
    mtx: Arc<Mutex<()>>,
    // Shared SMO flags (merge markers) — see `NodeSmoState`.
    smo: Arc<NodeSmoState>,
    // Per-copy chain state (head addr, sizes, delta index).
    pub(crate) state: RwLock<NodeState>,
    // The consolidated base page this overlay sits on.
    inner: BaseView,
}
/// Delta comparator for internal nodes: orders by the decoded raw key bytes.
fn intl_cmp(x: &DeltaView, y: &DeltaView) -> Ordering {
    IntlKey::decode_from(x.key())
        .raw
        .cmp(IntlKey::decode_from(y.key()).raw)
}
/// Delta comparator for leaf nodes: full `Key` ordering (raw bytes + version).
fn leaf_cmp(x: &DeltaView, y: &DeltaView) -> Ordering {
    let rhs = Key::decode_from(y.key());
    Key::decode_from(x.key()).cmp(&rhs)
}
/// Placeholder comparator used only while a node is being loaded, before its
/// type is known (`load` installs it, then `set_comparator` replaces it).
/// Must never actually be called — deltas are only inserted after the real
/// comparator is in place.
fn null_cmp(_x: &DeltaView, _y: &DeltaView) -> Ordering {
    unimplemented!()
}
/// Addresses of boxes made obsolete by a compaction/split/merge, to be
/// handed to an `IRetireSink` for garbage collection.
pub type Junks = Vec<u64>;
// NOTE(review): intentionally empty Drop — presumably a hook kept for
// debugging/lifetime tracing, or to forbid structural partial moves out of
// `Node`. Confirm before removing; deleting it changes drop-check behavior.
impl<L: ILoader> Drop for Node<L> {
    fn drop(&mut self) {}
}
impl<L> Node<L>
where
L: ILoader,
{
/// Builds a node over base box `b` with a fresh writer mutex.
pub(crate) fn new(loader: L, b: BoxRef) -> Self {
    let mtx = Arc::new(Mutex::new(()));
    Self::new_with_mtx(loader, b, mtx)
}
/// Builds a node over base box `b`, sharing the given writer mutex.
/// Pins `b` in the loader so the base page stays resident, and picks the
/// delta comparator from the base header's `is_index` flag.
fn new_with_mtx(loader: L, b: BoxRef, mtx: Arc<Mutex<()>>) -> Self {
    let h = b.header();
    let (addr, total_size, max_txid) = (h.addr, h.total_size as usize, h.txid);
    // Take the view before `b` is moved into `pin` below.
    let base = b.view().as_base();
    loader.pin(b);
    Self {
        loader,
        mtx,
        smo: Arc::new(NodeSmoState::new()),
        state: RwLock::new(NodeState {
            addr,
            total_size,
            max_txid,
            // Comparator depends on node kind: prefix-stripped raw bytes for
            // internal pages, full versioned keys for leaves.
            delta: ImTree::new(if base.header().is_index {
                intl_cmp
            } else {
                leaf_cmp
            }),
        }),
        inner: base,
    }
}
/// Pins a delta box and, when present, its remote (out-of-line) payload box
/// so both stay resident in the loader cache.
fn save(&self, b: BoxRef, r: Option<BoxRef>) {
    self.loader.pin(b);
    for remote in r {
        self.loader.pin(remote);
    }
}
/// Cheap copy of this node: the mutex, SMO flags and base view are shared,
/// while the chain state is snapshotted (the `ImTree` clone is persistent /
/// structurally shared).
pub(crate) fn reference(&self) -> Self {
    let state = self.state.read();
    Self {
        loader: self.loader.shallow_copy(),
        mtx: self.mtx.clone(),
        smo: self.smo.clone(),
        state: RwLock::new(NodeState {
            addr: state.addr,
            total_size: state.total_size,
            max_txid: state.max_txid,
            delta: state.delta.clone(),
        }),
        inner: self.inner,
    }
}
/// Logical page id of this node (stored in the base box header).
pub(crate) fn pid(&self) -> u64 {
    let header = self.inner.box_header();
    header.pid
}
/// Overwrites the logical page id in the base box header.
pub(crate) fn set_pid(&mut self, pid: u64) {
    let header = self.inner.box_header_mut();
    header.pid = pid;
}
/// Infallible wrapper around [`Self::try_new_leaf`]; panics on alloc failure.
pub(crate) fn new_leaf<A: IFrameAlloc>(a: &mut A, loader: L) -> Node<L> {
    let node = Self::try_new_leaf(a, loader);
    node.expect("node leaf alloc fail")
}
/// Allocates an empty leaf node: unbounded key range (`lo = []`, `hi = None`),
/// no right sibling, no entries, txid 0.
pub(crate) fn try_new_leaf<A: IFrameAlloc>(a: &mut A, loader: L) -> Result<Node<L>, OpCode> {
    // An empty typed iterator — BaseView::try_new_leaf drives it to build
    // an sst with zero elements.
    let empty: &[(LeafSeg, Val)] = &[];
    let mut iter = empty.iter().map(|x| (x.0, x.1));
    let b = BaseView::try_new_leaf(a, &loader, [].as_slice(), None, NULL_PID, &mut iter, 0)?;
    Ok(Self::new(loader, b))
}
/// Allocates a new internal (root) node covering the unbounded key range,
/// populated with the given child index entries. Entries keep their raw
/// key bytes with an empty shared prefix.
pub(crate) fn try_new_root<A: IFrameAlloc>(
    a: &mut A,
    loader: L,
    item: &[(IntlKey, Index)],
) -> Result<Node<L>, OpCode> {
    let b = BaseView::try_new_intl(
        a,
        [].as_slice(),
        None,
        NULL_PID,
        // Closure so the builder can re-run the iterator if it needs two passes.
        || item.iter().map(|&(x, y)| (IntlSeg::new(&[], x.raw), y)),
        0,
    )?;
    Ok(Self::new(loader, b))
}
/// Total bytes of the base page plus all attached deltas.
pub(crate) fn size(&self) -> usize {
    let guard = self.state.read();
    guard.total_size
}
/// Address of the consolidated base page (not the chain head).
pub(crate) fn base_addr(&self) -> u64 {
    let header = self.inner.box_header();
    header.addr
}
/// Ref-counted handle to the base page's backing box.
pub(crate) fn base_box(&self) -> BoxRef {
    let base = &self.inner;
    base.as_box()
}
/// Retires this node's whole chain: every delta box, the caller-supplied
/// junk addresses (e.g. replaced remote values), and finally the base page.
pub(crate) fn garbage_collect<A: IRetireSink>(&self, a: &mut A, junks: &[u64]) {
    let state = self.state.read();
    for d in state.delta.iter() {
        a.collect(&[d.box_header().addr]);
    }
    // Release the read lock before the remaining collects, mirroring the
    // guard scope of the original expression.
    drop(state);
    a.collect(junks);
    a.collect(&[self.base_addr()]);
}
/// Reconstructs a node from storage starting at `addr` (the chain head).
/// Starts with a placeholder comparator and a null base; `load_inner`
/// walks the link chain, installs the real comparator, fills the delta
/// tree and locates the base page.
pub(crate) fn load(addr: u64, loader: L) -> Result<Self, OpCode> {
    let d = loader.load(addr)?;
    let mut l = Self {
        loader,
        mtx: Arc::new(Mutex::new(())),
        smo: Arc::new(NodeSmoState::new()),
        state: RwLock::new(NodeState {
            addr: d.addr,
            total_size: d.total_size as usize,
            max_txid: d.header().txid,
            // Replaced by `set_comparator` on the first chain element.
            delta: ImTree::new(null_cmp),
        }),
        inner: BaseView::null(),
    };
    Self::load_inner(&mut l, d)?;
    Ok(l)
}
/// Installs the delta comparator matching the node type (internal vs leaf).
fn set_comparator(&mut self, nt: NodeType) {
    let cmp = match nt {
        NodeType::Intl => intl_cmp,
        _ => leaf_cmp,
    };
    self.state.write().delta.set_comparator(cmp);
}
/// True when the base page holds at least `split_elem` entries (and at
/// least two, so a split is meaningful) and no SMO is currently running.
pub(crate) fn should_split(&self, split_elem: u16) -> bool {
    let h = self.header();
    if self.is_smo_busy() {
        return false;
    }
    h.elems >= split_elem && h.elems >= 2
}
/// True when the node has shrunk to a quarter (or less) of its size at the
/// last split and no SMO is currently running.
pub(crate) fn should_merge(&self) -> bool {
    let h = self.header();
    // Widen to u32 before multiplying: `elems`/`split_elems` are u16, so
    // `h.elems * 4` would overflow (panic in debug, wrap — and give the
    // wrong answer — in release) once `elems` exceeds u16::MAX / 4.
    let size_limited = u32::from(h.split_elems) >= u32::from(h.elems) * 4;
    size_limited && !self.is_smo_busy()
}
/// Checks whether this internal node may currently merge the child whose
/// low key is `child_lo` and pid is `child_pid`: no concurrent merge marks,
/// no pending deltas, the key lies inside this node's [lo, hi) range, and
/// the routing entry for `child_lo` still points at `child_pid`.
pub(crate) fn can_merge_child_runtime(&self, child_lo: &[u8], child_pid: u64) -> bool {
    debug_assert_eq!(self.box_header().node_type, NodeType::Intl);
    // Another merge already claimed this parent or one of its children.
    if self.runtime_merging_child() != NULL_PID || self.runtime_merging() {
        return false;
    }
    // Pending deltas could shadow the sst routing entry checked below.
    if self.delta_len() > 0 {
        return false;
    }
    let h = self.header();
    if h.elems == 0 || child_lo < self.lo() {
        return false;
    }
    // Child must fall strictly below this node's high fence (if bounded).
    if let Some(hi) = self.hi()
        && child_lo >= hi
    {
        return false;
    }
    // Routing entry must still resolve to the same child.
    let (_, pid) = self.child_index(child_lo);
    pid == child_pid
}
/// True while this node itself is marked as being merged.
pub(crate) fn runtime_merging(&self) -> bool {
    let s = self.smo.merge_state.load(Acquire);
    s == NodeSmoState::MERGE_ACTIVE
}
/// pid of the child currently marked for merging, or `NULL_PID` if none.
pub(crate) fn runtime_merging_child(&self) -> u64 {
    let child = &self.smo.merging_child;
    child.load(Acquire)
}
/// Attempts to mark this node as merging. Returns `true` both when the mark
/// is freshly acquired and when it was already set (idempotent claim).
pub(crate) fn try_mark_runtime_merging(&self) -> bool {
    // Cheap pre-read avoids the CAS when the flag is already active.
    let cur = self.smo.merge_state.load(Acquire);
    if cur == NodeSmoState::MERGE_ACTIVE {
        return true;
    }
    // Defensive: only two states exist today, but bail out on anything
    // unexpected rather than CAS-ing from an unknown value.
    if cur != NodeSmoState::MERGE_IDLE {
        return false;
    }
    self.smo
        .merge_state
        .compare_exchange(
            NodeSmoState::MERGE_IDLE,
            NodeSmoState::MERGE_ACTIVE,
            AcqRel,
            Acquire,
        )
        .is_ok()
}
/// Resets the merge flag back to idle.
///
/// NOTE(review): this store is `Relaxed` while readers load with `Acquire`;
/// presumably ordering is provided by surrounding operations — confirm this
/// was not meant to be `Release`.
pub(crate) fn clear_runtime_merging(&self) {
    self.smo
        .merge_state
        .store(NodeSmoState::MERGE_IDLE, Relaxed);
}
/// Attempts to record `pid` as the child being merged under this node.
/// Idempotent: returns `true` if `pid` is already recorded; fails if a
/// different child already holds the slot.
pub(crate) fn try_mark_runtime_merging_child(&self, pid: u64) -> bool {
    let cur = self.smo.merging_child.load(Acquire);
    if cur == pid {
        return true;
    }
    // Slot taken by another child's merge.
    if cur != NULL_PID {
        return false;
    }
    self.smo
        .merging_child
        .compare_exchange(NULL_PID, pid, AcqRel, Acquire)
        .is_ok()
}
/// Releases the merging-child slot, but only if it still records `pid`;
/// returns whether the swap back to `NULL_PID` happened.
pub(crate) fn clear_runtime_merging_child(&self, pid: u64) -> bool {
    let swapped = self
        .smo
        .merging_child
        .compare_exchange(pid, NULL_PID, AcqRel, Acquire);
    swapped.is_ok()
}
/// True while any structure-modifying operation touches this node
/// (it is being merged, or one of its children is).
pub(crate) fn is_smo_busy(&self) -> bool {
    if self.runtime_merging() {
        return true;
    }
    self.runtime_merging_child() != NULL_PID
}
/// Merges `self` with its right neighbor `other` into a single fresh node.
/// Both sides are first consolidated to a base page (folding their delta
/// chains), then merged; returns the new node plus the junk addresses
/// produced by each side's consolidation.
pub(crate) fn try_merge_node<A: IFrameAlloc>(
    &self,
    a: &mut A,
    other: &Node<L>,
    safe_txid: u64,
) -> Result<(Node<L>, Junks, Junks), OpCode> {
    let (lb, lj) = self.try_merge_to_base(a, safe_txid)?;
    let (rb, rj) = other.try_merge_to_base(a, safe_txid)?;
    let (lhs, rhs) = (lb.view().as_base(), rb.view().as_base());
    #[cfg(feature = "extra_check")]
    assert_ne!(self.base_addr(), other.base_addr());
    let merged = lhs.try_merge(a, &self.loader, rhs, safe_txid)?;
    let mut node = Self::new(self.loader.deep_copy(), merged);
    // Keep the split-time element count so `should_merge` heuristics carry over.
    node.header_mut().split_elems = self.header().split_elems;
    Ok((node, lj, rj))
}
/// Routes key `k` through this internal node's sst: returns the pid of the
/// child responsible for `k`, plus a flag saying whether that was the
/// leftmost entry.
pub(crate) fn child_index(&self, k: &[u8]) -> (bool, u64) {
    #[cfg(feature = "extra_check")]
    {
        assert!(self.header().is_index);
        assert!(k >= self.lo());
        if let Some(hi) = self.hi() {
            assert!(hi > k);
        }
    }
    let sst = self.sst::<IntlKey>();
    let pos = match sst.search_by(&IntlKey::new(k), |x, y| x.raw.cmp(y.raw)) {
        Ok(pos) => pos,
        // Not found: take the entry just below the insertion point; the
        // `max(1)` keeps position 0 as the floor for keys before the first
        // separator.
        Err(pos) => pos.max(1) - 1,
    };
    let (_, v) = sst.kv_at::<Index>(pos);
    (pos == 0, v.pid)
}
/// Adds a routing entry `key -> pid` to this internal node and immediately
/// compacts the resulting overlay. Always returns `Ok(Some(..))` on success;
/// errors bubble up from allocation/compaction.
pub(crate) fn try_insert_index<A: IFrameAlloc>(
    &self,
    a: &mut A,
    key: &[u8],
    pid: u64,
    safe_txid: u64,
) -> Result<Option<(Node<L>, Junks)>, OpCode> {
    #[cfg(feature = "extra_check")]
    {
        // The key must land inside this node's [lo, hi) fence range.
        let below_lo = key < self.lo();
        let above_hi = self.hi().is_some_and(|hi| hi <= key);
        if below_lo || above_hi {
            panic!("somehow it happens");
        }
    }
    let delta = DeltaView::try_from_key_index(a, IntlKey::new(key), Index::new(pid), safe_txid)?;
    let overlay = self.insert(delta.view().as_delta());
    let compacted = overlay.try_compact(a, safe_txid)?;
    Ok(Some(compacted))
}
/// Materializes the full separator key at sst position `pos` (restoring the
/// node's shared prefix) and computes the prefix lengths the two halves of a
/// split will share: `(lo ↔ sep)` for the left page, `(sep ↔ hi)` for the
/// right page.
///
/// NOTE(review): "pefix" is a typo for "prefix"; left unrenamed here because
/// the call sites in `try_split` would need the same change.
fn decode_pefix<K>(
    &self,
    pos: usize,
    prefix_len: usize,
    lo: &[u8],
    hi: &Option<&[u8]>,
) -> (Vec<u8>, (usize, usize))
where
    K: IKey,
{
    let k = self.sst::<K>().key_at(pos);
    // Full separator = shared prefix bytes + the entry's suffix.
    let mut sep = Vec::with_capacity(prefix_len + k.raw().len());
    sep.extend_from_slice(&lo[..prefix_len]);
    sep.extend_from_slice(k.raw());
    let new_prefix_len = (
        BaseView::calc_prefix(lo, &Some(sep.as_slice())),
        BaseView::calc_prefix(sep.as_slice(), hi),
    );
    (sep, new_prefix_len)
}
/// Splits the (already consolidated — no deltas are consulted here) base
/// page at the median entry into two new nodes. Keys are re-encoded so each
/// half carries its own, possibly longer, shared prefix.
pub(crate) fn try_split<A: IFrameAlloc>(
    &self,
    a: &mut A,
) -> Result<(Node<L>, Node<L>), OpCode> {
    let h = self.inner.header();
    let prefix_len = h.prefix_len as usize;
    let elems = h.elems as usize;
    // Median entry becomes the separator / right page's low fence.
    let sep = elems / 2;
    let lo = self.lo();
    let hi = self.hi();
    let sibling = self.header().right_sibling;
    let txid = self.box_header().txid;
    let (l, r) = if h.is_index {
        let (sep_key, (llen, rlen)) = self.decode_pefix::<IntlKey>(sep, prefix_len, lo, &hi);
        let lhs_prefix = &lo[..llen];
        let rhs_prefix = &sep_key[..rlen];
        // How many extra bytes each half's new prefix absorbs from the old
        // suffixes (new prefix is always >= the old one).
        let (ld, rd) = (lhs_prefix.len() - prefix_len, rhs_prefix.len() - prefix_len);
        (
            // Left half: [lo, sep_key), entries [0, sep).
            BaseView::try_new_intl(
                a,
                lo,
                Some(sep_key.as_slice()),
                sibling,
                || {
                    self.inner
                        .range_iter::<L, IntlKey>(&self.loader, 0, sep)
                        .map(|(k, v)| (IntlSeg::new(lhs_prefix, &k.raw[ld..]), v))
                },
                txid,
            )?,
            // Right half: [sep_key, hi), entries [sep, elems).
            BaseView::try_new_intl(
                a,
                sep_key.as_slice(),
                hi,
                sibling,
                || {
                    self.inner
                        .range_iter::<L, IntlKey>(&self.loader, sep, elems)
                        .map(|(k, v)| (IntlSeg::new(rhs_prefix, &k.raw[rd..]), v))
                },
                txid,
            )?,
        )
    } else {
        let (sep_key, (llen, rlen)) = self.decode_pefix::<Key>(sep, prefix_len, lo, &hi);
        let lhs_prefix = &lo[..llen];
        let (ld, rd) = (lhs_prefix.len() - prefix_len, rlen - prefix_len);
        let mut liter = self
            .inner
            .range_iter::<L, Key>(&self.loader, 0, sep)
            .map(|(k, v)| (LeafSeg::new(lhs_prefix, &k.raw[ld..], k.ver), v));
        let mut riter = self
            .inner
            .range_iter::<L, Key>(&self.loader, sep, elems)
            .map(|(k, v)| (LeafSeg::new(&sep_key[..rlen], &k.raw[rd..], k.ver), v));
        (
            BaseView::try_new_leaf(
                a,
                &self.loader,
                lo,
                Some(sep_key.as_slice()),
                sibling,
                &mut liter,
                txid,
            )?,
            BaseView::try_new_leaf(
                a,
                &self.loader,
                sep_key.as_slice(),
                hi,
                sibling,
                &mut riter,
                txid,
            )?,
        )
    };
    let (mut lhs, mut rhs) = (
        Self::new(self.loader.deep_copy(), l),
        Self::new(self.loader.deep_copy(), r),
    );
    // Record each half's element count at split time for merge heuristics.
    lhs.header_mut().split_elems = sep as u16;
    rhs.header_mut().split_elems = (elems - sep) as u16;
    Ok((lhs, rhs))
}
/// Splits this node, folding any pending delta chain into the base page
/// first so the split sees every change. Returns both halves plus the junk
/// addresses produced by the consolidation (empty when no deltas existed).
pub(crate) fn try_split_overlay<A: IFrameAlloc>(
    &self,
    a: &mut A,
    safe_txid: u64,
) -> Result<(Node<L>, Node<L>, Junks), OpCode> {
    // Fast path: nothing attached, split the base directly.
    if self.delta_len() == 0 {
        let (lhs, rhs) = self.try_split(a)?;
        Ok((lhs, rhs, Junks::new()))
    } else {
        let (node, junks) = self.try_compact(a, safe_txid)?;
        let (lhs, rhs) = node.try_split(a)?;
        Ok((lhs, rhs, junks))
    }
}
#[allow(clippy::iter_skip_zero)]
pub(crate) fn intl_iter(&'_ self) -> IntlIter<'_, L> {
debug_assert_eq!(self.box_header().node_type, NodeType::Intl);
let len = self.header().prefix_len as usize;
let lo = self.lo();
IntlIter {
prefix: &lo[..len],
next_l: None,
next_r: None,
sst_iter: self
.inner
.range_iter(&self.loader, 0, self.inner.header().elems as usize),
delta_iter: IterAdaptor::Iter(self.state.read().delta.iter().skip(0)),
}
}
#[allow(clippy::iter_skip_zero)]
fn leaf_iter(&'_ self, safe_txid: u64) -> LeafIter<'_, L> {
debug_assert_eq!(self.box_header().node_type, NodeType::Leaf);
let len = self.header().prefix_len as usize;
let lo = self.lo();
LeafIter {
prefix: &lo[..len],
next_l: None,
next_r: None,
sst_iter: self
.inner
.range_iter(&self.loader, 0, self.inner.header().elems as usize),
delta_iter: IterAdaptor::Iter(self.state.read().delta.iter().skip(0)),
filter: LeafFilter {
txid: safe_txid,
last: None,
junks: Vec::new(),
skip_dup: false,
},
}
}
/// Consolidates the delta chain into a fresh base page and wraps it in a
/// new node, carrying over the current SMO flags so in-flight merge marks
/// survive the swap. Returns the node plus retired addresses.
pub(crate) fn try_compact<A: IFrameAlloc>(
    &self,
    a: &mut A,
    safe_txid: u64,
) -> Result<(Node<L>, Junks), OpCode> {
    let (b, j) = self.try_merge_to_base(a, safe_txid)?;
    let node = Self::new(self.loader.deep_copy(), b);
    // `Self::new` builds fresh (idle) SMO state; copy the live flags across.
    node.smo
        .merge_state
        .store(self.smo.merge_state.load(Acquire), Relaxed);
    node.smo
        .merging_child
        .store(self.smo.merging_child.load(Acquire), Relaxed);
    Ok((node, j))
}
/// Folds sst + deltas into a single new base box with the same fences and
/// sibling. Internal nodes produce no junk here; leaves report the remote /
/// sibling addresses their filter dropped while iterating.
fn try_merge_to_base<A: IFrameAlloc>(
    &self,
    a: &mut A,
    safe_txid: u64,
) -> Result<(BoxRef, Junks), OpCode> {
    let h = self.header();
    let lo = self.lo();
    let hi = self.hi();
    if h.is_index {
        Ok((
            BaseView::try_new_intl(a, lo, hi, h.right_sibling, || self.intl_iter(), safe_txid)?,
            Vec::new(),
        ))
    } else {
        let mut iter = self.leaf_iter(safe_txid);
        let b = BaseView::try_new_leaf(
            a,
            &self.loader,
            lo,
            hi,
            h.right_sibling,
            &mut iter,
            safe_txid,
        )?;
        // The filter accumulated junk addresses while the builder drained it.
        Ok((b, iter.filter.junks))
    }
}
/// Removes the routing entry pointing at `child_pid` from this internal
/// node (by inserting a null-index tombstone delta, then compacting).
/// Returns `None` when no entry references that pid.
pub(crate) fn try_remove_index<A: IFrameAlloc>(
    &self,
    a: &mut A,
    child_pid: u64,
    safe_txid: u64,
) -> Result<Option<(Node<L>, Junks)>, OpCode> {
    assert_eq!(self.box_header().node_type, NodeType::Intl);
    // Linear scan: find the separator key currently routing to child_pid.
    let key = self
        .intl_iter()
        .find(|(_, idx)| idx.pid == child_pid)
        .map(|(k, _)| k);
    let Some(key) = key else {
        return Ok(None);
    };
    // Index::null() acts as the tombstone (see IntlIter's is_tombstone checks).
    let b = DeltaView::try_from_key_index(a, key, Index::null(), safe_txid)?;
    let tmp = self.insert(b.view().as_delta());
    let (mut p, j) = tmp.try_compact(a, safe_txid)?;
    p.header_mut().split_elems = self.header().split_elems + 1;
    Ok(Some((p, j)))
}
/// Non-destructive delta append: links `k` onto the current chain head and
/// returns a NEW node whose state reflects the longer chain. `self` is left
/// untouched (the ImTree `update` is persistent); mutex/SMO state is shared.
pub(crate) fn insert(&self, mut k: DeltaView) -> Node<L> {
    let h = k.box_header_mut();
    let th = self.box_header();
    let state = self.state.read();
    // Chain the new delta to the previous head and stamp it with this
    // node's identity.
    h.link = state.addr;
    h.node_type = th.node_type;
    h.pid = th.pid;
    Node {
        loader: self.loader.shallow_copy(),
        mtx: self.mtx.clone(),
        smo: self.smo.clone(),
        state: RwLock::new(NodeState {
            // New head is the delta itself.
            addr: h.addr,
            total_size: state.total_size + h.total_size as usize,
            max_txid: h.txid,
            delta: state.delta.update(k),
        }),
        inner: self.inner,
    }
}
/// In-place variant of `insert`: links `k` onto the chain and mutates this
/// node's own state under the write lock. Callers serialize through
/// `NodeGuard::insert` (the node mutex).
fn insert_inplace(&self, mut k: DeltaView) {
    let h = k.box_header_mut();
    let th = self.box_header();
    let mut state = self.state.write();
    h.link = state.addr;
    h.node_type = th.node_type;
    h.pid = th.pid;
    state.addr = h.addr;
    state.total_size += h.total_size as usize;
    // Assumes the incoming delta carries the newest txid.
    state.max_txid = h.txid;
    state.delta.put(k);
}
/// Blocks until this node's writer mutex is held; the returned guard
/// serializes delta appends.
pub(crate) fn lock(&'_ self) -> NodeGuard<'_, L> {
    let guard = self.mtx.lock();
    NodeGuard {
        _guard: guard,
        node: self,
    }
}
/// Non-blocking variant of [`Self::lock`]; `None` when the mutex is held.
pub(crate) fn try_lock(&'_ self) -> Option<NodeGuard<'_, L>> {
    self.mtx.try_lock().map(|g| NodeGuard {
        _guard: g,
        node: self,
    })
}
/// Address of the current chain head (newest delta, or the base page).
pub(crate) fn latest_addr(&self) -> u64 {
    let guard = self.state.read();
    guard.addr
}
/// True when this leaf holds obsolete versions a compaction could reclaim:
/// every change is at or below `safe_txid` and the base page recorded
/// multiple versions per key. Index nodes never report garbage.
pub(crate) fn has_garbage(&self, safe_txid: u64) -> bool {
    if self.header().is_index {
        return false;
    }
    let frozen = self.state.read().max_txid <= safe_txid;
    frozen && self.inner.header().has_multiple_versions
}
/// Point lookup on a leaf: returns the newest visible version of `key`,
/// preferring the delta chain (newest first) and falling back to the sst.
/// The returned `BoxRef` keeps the record's backing storage alive (remote
/// box when the value lives out-of-line, else the base box).
pub(crate) fn find_latest<K>(&self, key: &K) -> Option<(K, Record, BoxRef)>
where
    K: IKey,
{
    debug_assert!(!self.inner.header().is_index);
    let mut result = None;
    // Walk delta versions ordered from `key`; stop at the first matching raw
    // key (the visitor returns `true` to halt).
    self.visit_versions(
        *key,
        |x, y| K::decode_from(x.key()).cmp(y),
        |x| {
            let k = K::decode_from(x.key());
            if k.raw() == key.raw() {
                let v = x.val();
                let (v, r) = v.get_record(&self.loader, true);
                result = Some((k, v, r.unwrap_or_else(|| self.base_box())));
                return true;
            }
            false
        },
    );
    if let Some(res) = result {
        return Some(res);
    }
    // Not in the overlay — consult the consolidated base page.
    self.search_sst(key).map(|(k, v)| {
        let (v, r) = v.get_record(&self.loader, true);
        (k, v, r.unwrap_or_else(|| self.base_box()))
    })
}
/// Exact-match lookup in the leaf's base sst (delta chain is not consulted);
/// `None` on miss or when the page is empty.
pub(crate) fn search_sst<'a, K: IKey>(&self, key: &K) -> Option<(K, Val<'a>)> {
    debug_assert_eq!(self.box_header().node_type, NodeType::Leaf);
    if self.header().elems == 0 {
        return None;
    }
    let sst = self.inner.sst::<K>();
    let pos = sst.search_by(key, |x, y| x.raw().cmp(y.raw())).ok()?;
    Some(sst.kv_at(pos))
}
/// Walks delta entries starting from `key` (per `cmp`), calling `f` for each
/// until it returns `true` to stop. Holds the state read lock for the walk.
pub(crate) fn visit_versions<K, F>(&self, key: K, cmp: fn(&DeltaView, &K) -> Ordering, mut f: F)
where
    K: IKey,
    F: FnMut(DeltaView) -> bool,
{
    let guard = self.state.read();
    guard.delta.visit_from(&key, cmp, &mut f);
}
/// Debug helper: logs every delta entry and the base sst contents of this
/// node (routing entries for internal nodes, records for leaves).
#[allow(unused)]
pub(crate) fn show(&self) {
    let h = self.box_header();
    let state = self.state.read();
    log::debug!(
        "---------- show delta {} {:?} elems {} ----------",
        h.pid,
        h.addr,
        state.delta.len()
    );
    if self.header().is_index {
        let it = state.delta.iter();
        for x in it {
            let k = IntlKey::decode_from(x.key());
            let v = Index::decode_from(x.index());
            log::debug!("{} => {}", k.to_string(), v);
        }
        let sst = self.sst::<IntlKey>();
        sst.show_intl(h.pid, h.addr);
    } else {
        let it = state.delta.iter();
        for x in it {
            let k = Key::decode_from(x.key());
            let val = x.val();
            // Resolve remote values so the log shows the actual record.
            let (r, _) = val.get_record(&self.loader, true);
            log::debug!("{} => {}", k.to_string(), r);
        }
        let sst = self.sst::<Key>();
        sst.show_leaf(&self.loader, h.pid, h.addr);
    }
}
/// Header of the base page's backing box.
pub(crate) fn box_header(&self) -> &BoxHeader {
    let base = &self.inner;
    base.box_header()
}
/// Number of deltas currently stacked on the base page.
pub(crate) fn delta_len(&self) -> usize {
    let guard = self.state.read();
    guard.delta.len()
}
/// Walks the on-disk chain starting at `d`, accumulating sizes/txids,
/// pushing deltas into the tree and locating the single base page.
/// Panics (asserts) if the chain mixes node types or has zero/multiple bases.
fn load_inner(l: &mut Node<L>, mut d: BoxView) -> Result<(), OpCode> {
    let mut one_base = true;
    let mut last_type = None;
    loop {
        let h = d.header();
        let state = l.state.get_mut();
        state.total_size += d.total_size as usize;
        state.max_txid = state.max_txid.max(h.txid);
        if let Some(t) = last_type {
            // Every element of one chain must agree on the node type.
            assert_eq!(t, h.node_type);
        } else {
            // First element seen: replace the placeholder comparator.
            l.set_comparator(h.node_type);
        }
        last_type = Some(h.node_type);
        match h.kind {
            TagKind::Delta => {
                let delta = d.as_delta();
                l.state.get_mut().delta.put(delta);
            }
            TagKind::Base => {
                // Exactly one base page is allowed per chain.
                assert!(one_base);
                one_base = false;
                l.inner = d.as_base();
            }
            _ => unreachable!("bad kind {:?}", h.kind),
        }
        if d.link == NULL_ADDR {
            break;
        }
        d = l.loader.load(d.link)?;
    }
    assert!(!l.inner.is_null());
    Ok(())
}
#[allow(clippy::iter_skip_zero)]
pub(crate) fn successor<'a>(
&'a self,
b: &'a Bound<Vec<u8>>,
cached_key: Handle<Vec<u8>>,
) -> RawLeafIter<'a, L> {
fn cmp_fn(x: &DeltaView, y: &&[u8]) -> Ordering {
Key::decode_from(x.key()).raw.cmp(y)
}
fn equal_fn(_x: &DeltaView, _y: &&[u8]) -> bool {
true
}
let state = self.state.read();
let (delta, pos) = match b {
Bound::Unbounded => (IterAdaptor::Iter(state.delta.iter().skip(0)), 0),
Bound::Included(b) => {
let r = state
.delta
.range_from(b.as_slice(), cmp_fn, equal_fn)
.skip(0);
let lo = self.lo();
let pos = if b.as_slice() < lo {
Err(0)
} else {
let key = Key::new(b, Ver::new(NULL_ORACLE, NULL_CMD));
self.sst::<Key>().lower_bound(&key)
};
(IterAdaptor::Range(r), pos.unwrap_or_else(|x| x))
}
Bound::Excluded(b) => {
let iter = state.delta.range_from(b.as_slice(), cmp_fn, equal_fn);
let delta = if let Some(cur) = iter.peek()
&& Key::decode_from(cur.key()).raw == b.as_slice()
{
iter.skip(1)
} else {
iter.skip(0)
};
let lo = self.lo();
let inner_pos = if b.as_slice() < lo {
Err(0)
} else {
let key = Key::new(b, Ver::new(NULL_ORACLE, NULL_CMD));
self.sst::<Key>().lower_bound(&key)
};
let pos = match inner_pos {
Ok(x) => x + 1, Err(x) => x,
};
(IterAdaptor::Range(delta), pos)
}
};
let lo = self.lo();
let len = self.header().prefix_len as usize;
RawLeafIter {
cached_key,
prefix: &lo[..len],
next_l: None,
next_r: None,
delta_iter: delta,
sst_iter: self
.inner
.range_iter(&self.loader, pos, self.header().elems as usize),
}
}
}
/// RAII handle proving the node's writer mutex is held; the only path that
/// may append deltas in place (`NodeGuard::insert`).
pub(crate) struct NodeGuard<'a, L: ILoader> {
    // Held for its Drop side effect only (releases the node mutex).
    _guard: MutexGuard<'a, ()>,
    node: &'a Node<L>,
}
impl<L> NodeGuard<'_, L>
where
    L: ILoader,
{
    /// Appends delta `k` to the locked node, then pins it (and its optional
    /// remote payload `v`) in the loader cache so the boxes stay resident.
    pub(crate) fn insert(&self, k: BoxRef, v: Option<BoxRef>) {
        // Link into the chain first; pin afterwards.
        self.node.insert_inplace(k.view().as_delta());
        self.node.save(k, v);
    }
}
// NOTE(review): Deref-to-BaseView lets `self.header()`, `self.lo()`, etc.
// resolve on Node — convenient, but it is Deref polymorphism; callers
// depend on it throughout this file, so it must stay.
impl<L> Deref for Node<L>
where
    L: ILoader,
{
    type Target = BaseView;
    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}
// Mutable counterpart of the Deref impl above: exposes the base page's
// mutable accessors (e.g. `header_mut`) directly on Node.
impl<L> DerefMut for Node<L>
where
    L: ILoader,
{
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.inner
    }
}
/// Unifies the two delta-tree iterator shapes (full scan vs. range-from)
/// behind one type so the merge iterators can hold either. The `.skip(0)`
/// at construction sites exists purely to equalize the wrapped types.
enum IterAdaptor<'a, T> {
    Iter(std::iter::Skip<Iter<'a, DeltaView>>),
    Range(std::iter::Skip<RangeIter<'a, DeltaView, T>>),
}
impl<'a, T> Iterator for IterAdaptor<'a, T> {
type Item = DeltaView;
fn next(&mut self) -> Option<Self::Item> {
match self {
IterAdaptor::Iter(i) => i.next(),
IterAdaptor::Range(r) => r.next(),
}
}
}
/// Two-way merge iterator over an internal node: delta entries (`next_l`
/// lookahead) merged against sst entries (`next_r` lookahead) in key order.
pub(crate) struct IntlIter<'a, L>
where
    L: ILoader,
{
    // Shared key prefix of this node, re-attached to emitted segments.
    prefix: &'a [u8],
    // One-slot lookahead from the delta side.
    next_l: Option<(IntlSeg<'a>, Index)>,
    // One-slot lookahead from the sst side.
    next_r: Option<(IntlSeg<'a>, Index)>,
    sst_iter: BaseIter<'a, L, IntlKey<'a>>,
    delta_iter: IterAdaptor<'a, &'a [u8]>,
}
/// Two-way merge iterator over a leaf's versions, with an MVCC filter that
/// suppresses invisible/duplicate versions and collects junk addresses.
pub(crate) struct LeafIter<'a, L>
where
    L: ILoader,
{
    // Shared key prefix of this node, re-attached to emitted segments.
    prefix: &'a [u8],
    // Delta-side lookahead slot.
    next_l: Option<(LeafSeg<'a>, Val<'a>)>,
    // sst-side lookahead slot.
    next_r: Option<(LeafSeg<'a>, Val<'a>)>,
    sst_iter: BaseIter<'a, L, Key<'a>>,
    delta_iter: IterAdaptor<'a, &'a [u8]>,
    // Visibility filter; also owns the junk list drained by try_merge_to_base.
    filter: LeafFilter<'a>,
}
/// Unfiltered merge iterator over a leaf (every version is emitted), used by
/// `Node::successor` for range scans; items materialize keys into a shared
/// cached buffer.
pub(crate) struct RawLeafIter<'a, L>
where
    L: ILoader,
{
    // Scratch buffer handle reused by emitted IterItems for key assembly.
    cached_key: Handle<Vec<u8>>,
    prefix: &'a [u8],
    // Delta-side lookahead slot.
    next_l: Option<IterItem<'a, L>>,
    // sst-side lookahead slot.
    next_r: Option<IterItem<'a, L>>,
    sst_iter: BaseIter<'a, L, Key<'a>>,
    delta_iter: IterAdaptor<'a, &'a [u8]>,
}
impl<'a, L> Iterator for IntlIter<'a, L>
where
    L: ILoader,
{
    type Item = (IntlSeg<'a>, Index);
    /// Standard two-way merge: refill both lookahead slots, emit the smaller
    /// key; on equal keys the delta wins (shadows the sst entry) and a
    /// null-index tombstone drops the entry entirely.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            if self.next_l.is_none()
                && let Some(x) = self.delta_iter.next()
            {
                let k = IntlKey::decode_from(x.key());
                // Delta keys carry the full raw key; strip this node's prefix.
                self.next_l = Some((
                    IntlSeg::new(self.prefix, &k.raw[self.prefix.len()..]),
                    Index::decode_from(x.index()),
                ));
            }
            if self.next_r.is_none()
                && let Some((k, v)) = self.sst_iter.next()
            {
                // sst keys are already prefix-stripped.
                self.next_r = Some((IntlSeg::new(self.prefix, k.raw), v));
            }
            match (self.next_l, self.next_r) {
                (None, None) => return None,
                (None, Some(x)) => {
                    self.next_r = None;
                    return Some(x);
                }
                (Some(x), None) => {
                    self.next_l = None;
                    // A tombstone delta must always shadow an sst entry, so a
                    // lone tombstone would mean the entry never existed.
                    debug_assert!(!x.1.is_tombstone());
                    return Some(x);
                }
                (Some(l), Some(r)) => match l.0.raw_cmp(&r.0) {
                    Equal => {
                        // Delta shadows sst; consume both sides.
                        self.next_l = None;
                        self.next_r = None;
                        if l.1.is_tombstone() {
                            continue;
                        }
                        return Some(l);
                    }
                    Greater => {
                        self.next_r = None;
                        return Some(r);
                    }
                    Less => {
                        self.next_l = None;
                        return Some(l);
                    }
                },
            }
        }
    }
}
impl<'a, L> Iterator for LeafIter<'a, L>
where
    L: ILoader,
{
    type Item = (LeafSeg<'a>, Val<'a>);
    /// Two-way merge of delta and sst versions in full (key, version) order;
    /// every candidate passes through `LeafFilter::check`, which drops
    /// invisible versions and records their remote payloads as junk.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            if self.next_r.is_none() {
                let (sst_iter, filter) = (&mut self.sst_iter, &mut self.filter);
                // Sibling chain addresses encountered while advancing the sst
                // are junk once this consolidation completes.
                if let Some((k, v)) = sst_iter.next_with_sibling(|addr| filter.junks.push(addr)) {
                    self.next_r = Some((LeafSeg::new(self.prefix, k.raw, k.ver), v));
                }
            }
            if self.next_l.is_none()
                && let Some(x) = self.delta_iter.next()
            {
                let k = Key::decode_from(x.key());
                let v = x.val();
                // Delta keys carry the full raw key; strip this node's prefix.
                self.next_l = Some((
                    LeafSeg::new(self.prefix, &k.raw[self.prefix.len()..], k.ver),
                    v,
                ));
            }
            match (self.next_l, self.next_r) {
                (None, None) => return None,
                (None, Some(r)) => {
                    self.next_r = None;
                    if self.filter.check(&(r.0, r.1)) {
                        return Some(r);
                    }
                }
                (Some(l), None) => {
                    self.next_l = None;
                    if self.filter.check(&(l.0, l.1)) {
                        return Some(l);
                    }
                }
                (Some(l), Some(r)) => match l.0.cmp(&r.0) {
                    Less => {
                        self.next_l = None;
                        if self.filter.check(&(l.0, l.1)) {
                            return Some(l);
                        }
                    }
                    Greater => {
                        self.next_r = None;
                        if self.filter.check(&(r.0, r.1)) {
                            return Some(r);
                        }
                    }
                    // (key, version) pairs are unique across delta and sst.
                    Equal => unreachable!("never happen"),
                },
            }
        }
    }
}
impl<'a, L> Iterator for RawLeafIter<'a, L>
where
    L: ILoader,
{
    type Item = IterItem<'a, L>;
    /// Merge of delta and sst items without any visibility filtering — all
    /// versions are surfaced to the caller (scan-level MVCC decides).
    fn next(&mut self) -> Option<Self::Item> {
        if self.next_l.is_none()
            && let Some(x) = self.delta_iter.next()
        {
            let k = Key::decode_from(x.key());
            // Delta keys are stored un-prefixed, hence the empty prefix here.
            self.next_l = Some(IterItem::new(
                self.cached_key,
                &[],
                k,
                x.val(),
                self.sst_iter.loader,
            ));
        }
        if self.next_r.is_none()
            && let Some((k, val)) = self.sst_iter.next()
        {
            self.next_r = Some(IterItem::new(
                self.cached_key,
                self.prefix,
                k,
                val,
                self.sst_iter.loader,
            ));
        }
        return match (self.next_l.take(), self.next_r.take()) {
            (None, None) => None,
            (None, Some(r)) => Some(r),
            (Some(l), None) => Some(l),
            (Some(l), Some(r)) => {
                // sst keys are prefix-stripped, delta keys are not; compare
                // accordingly when a prefix exists.
                let ord = if self.prefix.is_empty() {
                    l.cmp(&r)
                } else {
                    cmp_raw_with_prefixed_tail(l.base.raw, self.prefix, r.base.raw)
                };
                match ord {
                    Less => {
                        self.next_r = Some(r);
                        Some(l)
                    }
                    Greater => {
                        self.next_l = Some(l);
                        Some(r)
                    }
                    Equal => {
                        // Delta version is emitted first; the sst entry is kept
                        // for a later call — presumably so every version of the
                        // key is surfaced (unlike LeafIter, nothing is dropped
                        // here). NOTE(review): confirm callers dedup by version.
                        self.next_r = Some(r);
                        Some(l)
                    }
                }
            }
        };
    }
}
/// Streaming MVCC filter applied while consolidating a leaf: keeps versions
/// newer than `txid` plus the newest visible one per key, drops the rest and
/// records their remote payload addresses as junk.
struct LeafFilter<'a> {
    // Safe txid: versions at or below this are candidates for dropping.
    txid: u64,
    // Raw key of the previous entry (detects same-key version runs).
    last: Option<&'a [u8]>,
    // Addresses of dropped out-of-line values, reclaimed by the caller.
    junks: Junks,
    // True once the newest visible version of `last` has been kept.
    skip_dup: bool,
}
impl<'a> LeafFilter<'a> {
    /// Visibility decision for one (key, value) version; entries arrive in
    /// key order, newest version first within a key.
    fn check_impl(&mut self, k: &LeafSeg<'a>, v: &Val) -> bool {
        if let Some(last) = self.last
            && last == k.raw()
        {
            // Same key as the previous entry: once a visible version was
            // kept, every older one is garbage.
            if self.skip_dup {
                return false;
            }
            // Still above the safe txid — must stay for concurrent readers.
            if k.txid() > self.txid {
                return true;
            }
            // First version at/below the safe horizon: keep it (unless it is
            // a tombstone) and drop everything older.
            self.skip_dup = true;
            return !v.is_tombstone();
        }
        // New key: remember it and decide whether its newest version already
        // satisfies the horizon.
        self.last = Some(k.raw());
        self.skip_dup = k.txid() <= self.txid;
        // A tombstone that is already below the horizon erases the key.
        if v.is_tombstone() && self.skip_dup {
            return false;
        }
        true
    }
    #[inline(always)]
    fn check(&mut self, x: &(LeafSeg<'a>, Val)) -> bool {
        let (k, v) = x;
        if !self.check_impl(k, v) {
            // Dropped version: reclaim its out-of-line payload, if any.
            self.collect(v);
            false
        } else {
            true
        }
    }
    /// Queues the remote payload address of a dropped value for GC.
    fn collect(&mut self, v: &Val) {
        let remote = v.get_remote();
        if remote != NULL_ADDR {
            self.junks.push(RemoteView::tag(remote));
        }
    }
}
#[cfg(test)]
mod test {
use parking_lot::Mutex;
use std::{
collections::HashMap,
rc::Rc,
sync::atomic::{AtomicU64, Ordering::Relaxed},
};
use crate::{
Options,
types::{
data::{Key, LeafSeg, Record, Val, Ver},
node::Node,
refbox::{BaseView, BoxRef, BoxView, DeltaView},
traits::{ICodec, IFrameAlloc, IHeader, ILoader, IRetireSink},
},
utils::{NULL_ADDR, NULL_PID, OpCode},
};
/// Shared state of the test allocator/loader `A`: an in-memory "disk".
struct AInner {
    // addr -> box storage (stands in for the page file).
    map: Mutex<HashMap<u64, BoxRef>>,
    // Bump pointer handing out monotonically increasing addresses.
    off: AtomicU64,
    // Addresses retired via IRetireSink, inspected by tests.
    collected: Mutex<Vec<u64>>,
    // Reported arena size; small values force out-of-line (sibling) values.
    arena_size: usize,
}
/// Test double implementing allocator, loader and retire sink in one type;
/// clones share the same in-memory store.
#[derive(Clone)]
struct A {
    inner: Rc<AInner>,
}
impl A {
    /// Default instance with an arena large enough that values stay inline.
    fn new() -> Self {
        Self::new_with_arena_size(64 << 20)
    }
    /// Instance with a custom arena size (small sizes force remote values).
    fn new_with_arena_size(arena_size: usize) -> Self {
        Self {
            inner: Rc::new(AInner {
                map: Mutex::new(HashMap::new()),
                off: AtomicU64::new(0),
                collected: Mutex::new(Vec::new()),
                arena_size,
            }),
        }
    }
    // Panics if `addr` was never allocated/pinned — fine for tests.
    fn load(&self, addr: u64) -> BoxRef {
        let lk = self.inner.map.lock();
        lk.get(&addr).unwrap().clone()
    }
    /// Drains and returns everything retired so far.
    fn take_collected(&self) -> Vec<u64> {
        let mut lk = self.inner.collected.lock();
        std::mem::take(&mut *lk)
    }
}
impl IFrameAlloc for A {
    /// Bump-allocates a box and registers it in the in-memory store.
    fn try_alloc(&mut self, size: u32) -> Result<BoxRef, OpCode> {
        let addr = self
            .inner
            .off
            .fetch_add(BoxRef::real_size(size) as u64, Relaxed);
        let p = BoxRef::alloc(size, addr);
        let mut lk = self.inner.map.lock();
        lk.insert(addr, p.clone());
        Ok(p)
    }
    fn try_alloc_pair(&mut self, size1: u32, size2: u32) -> Result<(BoxRef, BoxRef), OpCode> {
        Ok((self.try_alloc(size1)?, self.try_alloc(size2)?))
    }
    // Small arenas make BaseView spill values into sibling chains.
    fn arena_size(&mut self) -> usize {
        self.inner.arena_size
    }
    fn inline_size(&self) -> usize {
        Options::MIN_INLINE_SIZE
    }
}
impl IRetireSink for A {
    /// Records retired addresses so tests can assert on them later.
    fn collect(&mut self, addr: &[u64]) {
        self.inner.collected.lock().extend_from_slice(addr);
    }
}
impl ILoader for A {
    fn load(&self, addr: u64) -> Result<BoxView, OpCode> {
        Ok(self.load(addr).view())
    }
    // `pin` doubles as the store insert in this in-memory setup.
    fn pin(&self, data: BoxRef) {
        let mut lk = self.inner.map.lock();
        lk.insert(data.header().addr, data);
    }
    // Shallow and deep copies are indistinguishable for the Rc-backed store.
    fn shallow_copy(&self) -> Self {
        self.clone()
    }
    fn deep_copy(&self) -> Self {
        self.clone()
    }
    fn load_remote(&self, addr: u64) -> Result<BoxRef, OpCode> {
        Ok(self.load(addr))
    }
}
/// A plain `try_split` only reads the base page, so a delta inserted after
/// the last compaction ("zz") is lost; `try_split_overlay` must compact
/// first and keep it in one of the halves.
#[test]
fn split_overlay_keeps_delta_changes() {
    let mut a = A::new();
    let txid = AtomicU64::new(1);
    let l = a.clone();
    let mut node = Node::new_leaf(&mut a, l);
    for raw in ["a", "b", "c", "d"] {
        let k = Key::new(raw.as_bytes(), Ver::new(txid.fetch_add(1, Relaxed), 1));
        let v = Record::normal(1, raw.as_bytes());
        let (d, r) = DeltaView::try_from_key_val(&mut a, &k, &v).unwrap();
        node.insert_inplace(d.view().as_delta());
        node.save(d, r);
    }
    // Fold the four inserts into the base page.
    (node, _) = node.try_compact(&mut a, u64::MAX).unwrap();
    // This key exists only as a pending delta.
    let k = Key::new("zz".as_bytes(), Ver::new(txid.fetch_add(1, Relaxed), 1));
    let v = Record::normal(1, "zz".as_bytes());
    let (d, r) = DeltaView::try_from_key_val(&mut a, &k, &v).unwrap();
    node.insert_inplace(d.view().as_delta());
    node.save(d, r);
    let probe = Key::new("zz".as_bytes(), Ver::new(u64::MAX, 0));
    let (l_new, r_new, _) = node.try_split_overlay(&mut a, u64::MAX).unwrap();
    let (l_old, r_old) = node.try_split(&mut a).unwrap();
    // Base-only split loses the delta; overlay split must retain it.
    assert!(l_old.find_latest(&probe).is_none());
    assert!(r_old.find_latest(&probe).is_none());
    assert!(l_new.find_latest(&probe).is_some() || r_new.find_latest(&probe).is_some());
}
/// `try_merge_node` must consolidate each side's pending deltas before
/// merging, so keys inserted into the halves after the split survive.
#[test]
fn merge_overlay_keeps_delta_changes() {
    let mut a = A::new();
    let txid = AtomicU64::new(1);
    let l = a.clone();
    let mut node = Node::new_leaf(&mut a, l);
    for raw in ["a", "b", "c", "d"] {
        let k = Key::new(raw.as_bytes(), Ver::new(txid.fetch_add(1, Relaxed), 1));
        let v = Record::normal(1, raw.as_bytes());
        let (d, r) = DeltaView::try_from_key_val(&mut a, &k, &v).unwrap();
        node.insert_inplace(d.view().as_delta());
        node.save(d, r);
    }
    (node, _) = node.try_compact(&mut a, u64::MAX).unwrap();
    let (left, right) = node.try_split(&mut a).unwrap();
    // One pending delta on each half, not yet compacted.
    let kl = Key::new("aa".as_bytes(), Ver::new(txid.fetch_add(1, Relaxed), 1));
    let vl = Record::normal(1, "aa".as_bytes());
    let (dl, rl) = DeltaView::try_from_key_val(&mut a, &kl, &vl).unwrap();
    left.insert_inplace(dl.view().as_delta());
    left.save(dl, rl);
    let kr = Key::new("zz".as_bytes(), Ver::new(txid.fetch_add(1, Relaxed), 1));
    let vr = Record::normal(1, "zz".as_bytes());
    let (dr, rr) = DeltaView::try_from_key_val(&mut a, &kr, &vr).unwrap();
    right.insert_inplace(dr.view().as_delta());
    right.save(dr, rr);
    let (merged, _, _) = left.try_merge_node(&mut a, &right, u64::MAX).unwrap();
    let probe_l = Key::new("aa".as_bytes(), Ver::new(u64::MAX, 0));
    let probe_r = Key::new("zz".as_bytes(), Ver::new(u64::MAX, 0));
    assert!(merged.find_latest(&probe_l).is_some());
    assert!(merged.find_latest(&probe_r).is_some());
}
/// Exercises `leaf_iter`'s MVCC filtering (version visibility, tombstones)
/// and checks that a mixed compact/insert/delete workload still iterates
/// strictly in key order.
#[test]
fn leaf_iter() {
    let mut a = A::new();
    let txid = AtomicU64::new(1);
    const CONSOLIDATE_THRESHOLD: usize = 64;
    {
        let l = a.clone();
        let mut node = Node::new_leaf(&mut a, l);
        // Two versions of "foo"...
        let (d1, r1) = DeltaView::try_from_key_val(
            &mut a,
            &Key::new("foo".as_bytes(), Ver::new(txid.fetch_add(1, Relaxed), 1)),
            &Record::normal(1, "1".as_bytes()),
        )
        .unwrap();
        let (d2, r2) = DeltaView::try_from_key_val(
            &mut a,
            &Key::new("foo".as_bytes(), Ver::new(txid.load(Relaxed), 2)),
            &Record::normal(1, "2".as_bytes()),
        )
        .unwrap();
        // ...and a tombstone for it, applied later.
        let (d3, r3) = DeltaView::try_from_key_val(
            &mut a,
            &Key::new("foo".as_bytes(), Ver::new(txid.load(Relaxed), 3)),
            &Record::remove(1),
        )
        .unwrap();
        node.insert_inplace(d1.view().as_delta());
        node.save(d1, r1);
        node.insert_inplace(d2.view().as_delta());
        node.save(d2, r2);
        (node, _) = node.try_compact(&mut a, 1).unwrap();
        // safe_txid=1: both versions still visible.
        let iter = node.leaf_iter(1);
        assert_eq!(iter.count(), 2);
        node.insert_inplace(d3.view().as_delta());
        node.save(d3, r3);
        // safe_txid=3: tombstone below the horizon erases the key entirely.
        let iter = node.leaf_iter(3);
        assert_eq!(iter.count(), 0);
    }
    let l = a.clone();
    let mut node = Node::new_leaf(&mut a, l);
    // Insert 30 keys, compacting whenever the chain grows too long.
    for i in 0..30 {
        let raw = format!("key_{i}");
        let k = Key::new(raw.as_bytes(), Ver::new(txid.fetch_add(1, Relaxed), 1));
        let v = Record::normal(1, raw.as_bytes());
        let (delta, r) = DeltaView::try_from_key_val(&mut a, &k, &v).unwrap();
        node.insert_inplace(delta.view().as_delta());
        node.save(delta, r);
        if node.delta_len() >= CONSOLIDATE_THRESHOLD {
            (node, _) = node.try_compact(&mut a, 3).unwrap();
        }
    }
    // Delete the first 20 of them.
    for i in 0..20 {
        let raw = format!("key_{i}");
        let k = Key::new(raw.as_bytes(), Ver::new(txid.fetch_add(1, Relaxed), 0));
        let v = Record::remove(1);
        let (delta, r) = DeltaView::try_from_key_val(&mut a, &k, &v).unwrap();
        node.insert_inplace(delta.view().as_delta());
        node.save(delta, r);
        if node.delta_len() >= CONSOLIDATE_THRESHOLD {
            (node, _) = node.try_compact(&mut a, 3).unwrap();
        }
    }
    // One extra fresh key.
    for i in 30..31 {
        let raw = format!("key_{i}");
        let k = Key::new(raw.as_bytes(), Ver::new(txid.fetch_add(1, Relaxed), 0));
        let v = Record::normal(1, raw.as_bytes());
        let (delta, r) = DeltaView::try_from_key_val(&mut a, &k, &v).unwrap();
        node.insert_inplace(delta.view().as_delta());
        node.save(delta, r);
        if node.delta_len() >= CONSOLIDATE_THRESHOLD {
            (node, _) = node.try_compact(&mut a, 3).unwrap();
        }
    }
    // The merged iterator must yield strictly increasing keys.
    let mut last: Option<crate::types::data::LeafSeg<'_>> = None;
    let iter = node.leaf_iter(3);
    for (k, _) in iter {
        if let Some(old) = last {
            assert!(old.cmp(&k).is_lt());
        }
        last = Some(k);
    }
}
/// With a tiny arena, many versions of one hot key spill into a sibling
/// chain of value boxes; compaction must report every link of that chain as
/// junk, and `garbage_collect` must retire them all.
#[test]
fn garbage_collect_collects_whole_sibling_chain() {
    // 96-byte arena forces out-of-line (sibling) value storage.
    let mut a = A::new_with_arena_size(96);
    let l = a.clone();
    fn gen_val(a: &mut A, r: Record) -> Val<'static> {
        let sz = Val::calc_size(false, a.inline_size(), r.packed_size());
        let mut b = a.try_alloc(sz as u32).unwrap();
        Val::encode_inline(b.data_slice_mut::<u8>(), NULL_ADDR, &r);
        // NOTE(review): the transmute extends the slice lifetime to 'static;
        // sound only because the box stays registered in `a`'s map for the
        // whole test. Test-only shortcut — do not copy into production code.
        Val::from_raw(unsafe { std::mem::transmute::<&[u8], &[u8]>(b.data_slice::<u8>()) })
    }
    // 16 versions of the same key, newest txid first.
    let mut items = Vec::new();
    for i in (0..16u64).rev() {
        items.push((
            LeafSeg::new(&[], "hot_key".as_bytes(), Ver::new(i + 1, 1)),
            gen_val(&mut a, Record::normal(1, format!("v{i}").as_bytes())),
        ));
    }
    let mut iter = items.iter().map(|x| (x.0, x.1));
    let base = BaseView::try_new_leaf(&mut a, &l, &[], None, NULL_PID, &mut iter, 1).unwrap();
    let node = Node::new(l, base);
    // Follow the sibling chain hanging off the first entry's value.
    let mut sst_iter = node.range_iter::<A, Key>(&node.loader, 0, node.header().elems as usize);
    let (_, val) = sst_iter.next().expect("must exist");
    let head = val.get_sibling().expect("must have sibling");
    let mut sibling_chain = Vec::new();
    let mut cur = head;
    while cur != NULL_ADDR {
        sibling_chain.push(cur);
        let view = node.loader.load_unchecked(cur);
        cur = view.header().link;
    }
    assert!(sibling_chain.len() > 1);
    // Compacting with safe_txid=MAX drops all but the newest version, so the
    // whole chain must be reported as junk...
    let (_next, junks) = node.try_compact(&mut a, u64::MAX).unwrap();
    for addr in &sibling_chain {
        assert!(junks.contains(addr));
    }
    // ...and garbage_collect must hand every link to the retire sink.
    node.garbage_collect(&mut a, &junks);
    let collected = a.take_collected();
    for addr in sibling_chain {
        assert!(collected.contains(&addr));
    }
}
}