use parking_lot::{Mutex, MutexGuard, RwLock};
use std::{
cmp::Ordering::{self, Equal, Greater, Less},
ops::{Bound, Deref, DerefMut},
sync::Arc,
};
use crate::{
cc::context::{Context, TxOutcome},
types::{
base::{BaseIter, BaseRevIter},
data::{
Index, IntlKey, IntlSeg, IterItem, Key, LeafSeg, Record, Val, Ver,
cmp_raw_with_prefixed_tail,
},
header::{BoxHeader, NodeType},
refbox::{BaseView, BoxView, DeltaView, RemoteView},
traits::{IAsBoxRef, IBoxHeader, IDecode, IFrameAlloc, IHeader, IKey, ILoader, IVal},
},
utils::{
Handle, INIT_ORACLE, NULL_ADDR, NULL_CMD, NULL_ORACLE, NULL_PID, OpCode,
data::Position,
imtree::{ImTree, Iter, RangeIter},
},
};
use super::{header::TagKind, refbox::BoxRef};
/// Addresses (including `RemoteView::tag`-ged remote addresses) of boxes that were
/// superseded while building a new node, handed back to the caller — presumably for
/// later reclamation.
pub(crate) type Junk = Vec<u64>;
/// Step of the node-merge protocol applied by `Node::process_merge`.
pub(crate) enum MergeOp {
/// Parent side: the merging child has been absorbed; remove its index entry and
/// clear the parent's `merging_child` marker.
Merged,
/// Child (leaf) side: mark the node as `merging`.
MarkChild,
/// Parent side: record `pid` as the child currently being merged.
MarkParent(u64),
}
/// Mutable per-node bookkeeping guarded by `Node::state`.
pub(crate) struct NodeState {
/// Address of the most recently attached box (the base address right after
/// construction, then the address of the newest delta).
addr: u64,
/// Sum of `total_size` over the attached boxes (plus remote record sizes for
/// in-place inserts).
total_size: usize,
/// Largest transaction id seen among the attached boxes; read by `has_garbage`.
max_txid: u64,
/// Group of the most recently attached box.
group: u8,
/// Log position of the most recently attached box.
latest_lsn: Position,
/// Delta entries layered on top of the base, ordered by `intl_cmp` or `leaf_cmp`
/// depending on the node type.
delta: ImTree<DeltaView>,
}
/// A tree node: an immutable base page (`inner`) plus a chain of delta boxes kept
/// in `state.delta`.
pub(crate) struct Node<L: ILoader> {
/// Loader used to resolve and pin boxes referenced by this node.
pub(crate) loader: L,
/// Writer mutex shared (via `Arc::clone`) by every `Node` derived from the same
/// node through `reference` / `insert`; taken by `lock` / `try_lock`.
mtx: Arc<Mutex<()>>,
/// Delta tree and size/txid/lsn bookkeeping.
pub(crate) state: RwLock<NodeState>,
/// View of the base page; `Node` derefs to it.
inner: BaseView,
}
/// Orders two delta entries of an internal (index) node by raw key bytes only.
fn intl_cmp(x: &DeltaView, y: &DeltaView) -> Ordering {
    let (lhs, rhs) = (IntlKey::decode_from(x.key()), IntlKey::decode_from(y.key()));
    lhs.raw.cmp(rhs.raw)
}
/// Orders two delta entries of a leaf node by their full `Key` (raw bytes and version).
fn leaf_cmp(x: &DeltaView, y: &DeltaView) -> Ordering {
    let lhs = Key::decode_from(x.key());
    lhs.cmp(&Key::decode_from(y.key()))
}
/// Placeholder comparator installed by `Node::load` before the node type is known.
/// `load_inner` swaps in the real comparator (via `set_comparator`) before the
/// first delta is inserted. Panics if it is ever actually invoked.
fn null_cmp(_x: &DeltaView, _y: &DeltaView) -> Ordering {
unimplemented!()
}
/// Intentionally empty: dropping a `Node` performs no extra cleanup here.
/// NOTE(review): an explicit `Drop` impl also forbids moving fields out of a `Node`
/// by destructuring; whether that is the reason it exists is not visible here.
impl<L: ILoader> Drop for Node<L> {
fn drop(&mut self) {}
}
impl<L> Node<L>
where
L: ILoader,
{
pub(crate) fn new(loader: L, b: BoxRef, group: u8, latest_lsn: Position) -> Self {
let h = b.header();
let (addr, total_size, max_txid) = (h.addr, h.total_size as usize, h.txid);
let base = b.view().as_base();
loader.pin(b);
let this = Self {
loader,
mtx: Arc::new(Mutex::new(())),
state: RwLock::new(NodeState {
addr,
total_size,
max_txid,
group,
latest_lsn,
delta: ImTree::new(if base.header().is_index {
intl_cmp
} else {
leaf_cmp
}),
}),
inner: base,
};
this.pin_leaf_pages_unchecked();
this
}
/// Loads a node by walking the chain of boxes that starts at `addr`.
///
/// The head box and every box reachable through its `link` chain are folded in
/// by `load_inner`. Delta boxes go into the delta tree, and exactly one base box
/// becomes `inner`. For leaf nodes, the hinted sibling pages are loaded afterwards.
///
/// # Errors
/// Returns the loader's `OpCode` if any box in the chain, or any hinted sibling
/// page, cannot be loaded.
pub(crate) fn load(addr: u64, loader: L) -> Result<Self, OpCode> {
    let d = loader.load(addr)?;
    let mut l = Self {
        loader,
        mtx: Arc::new(Mutex::new(())),
        state: RwLock::new(NodeState {
            addr: d.addr,
            // `load_inner` accumulates the size (and max txid) of *every* box in
            // the chain, including the head box `d`. Seeding these with the head's
            // values as well counted the head box twice in `total_size`.
            total_size: 0,
            max_txid: 0,
            // Overwritten from the head box by `load_inner`.
            group: 0,
            latest_lsn: Position::MIN,
            // Placeholder comparator; `load_inner` installs the real one before
            // inserting any delta.
            delta: ImTree::new(null_cmp),
        }),
        inner: BaseView::null(),
    };
    Self::load_inner(&mut l, d)?;
    l.try_pin_leaf_pages()?;
    Ok(l)
}
/// Pins the delta box `b` and then, if present, its out-of-line record box `r`.
fn save(&self, b: BoxRef, r: Option<BoxRef>) {
    std::iter::once(b)
        .chain(r)
        .for_each(|boxed| self.loader.pin(boxed));
}
/// Creates another `Node` handle for the same logical node.
///
/// The new handle shares the base view and the writer mutex (`Arc` clone), copies
/// the bookkeeping fields and clones the delta tree, and uses a loader obtained
/// from `copy_with_pin`. The read lock on `self.state` stays held until the end of
/// the function, including while the sibling leaf pages are pinned.
pub(crate) fn reference(&self) -> Self {
let state = self.state.read();
let this = Self {
loader: self.loader.copy_with_pin(),
mtx: self.mtx.clone(),
state: RwLock::new(NodeState {
addr: state.addr,
total_size: state.total_size,
max_txid: state.max_txid,
group: state.group,
latest_lsn: state.latest_lsn,
delta: state.delta.clone(),
}),
inner: self.inner,
};
this.pin_leaf_pages_unchecked();
this
}
/// Page id stored in the base box header.
pub(crate) fn pid(&self) -> u64 {
    let hdr = self.inner.box_header();
    hdr.pid
}
/// Overwrites the page id in the base box header.
pub(crate) fn set_pid(&mut self, pid: u64) {
    let hdr = self.inner.box_header_mut();
    hdr.pid = pid;
}
/// Allocates an empty leaf base page and wraps it in a new `Node`.
///
/// The base is built with an empty `lo`, no `hi`, `NULL_PID` as right sibling and
/// `INIT_ORACLE` as txid. The node's group/lsn are read back from the header of the
/// box that was built.
pub(crate) fn new_leaf<A: IFrameAlloc>(
a: &mut A,
loader: L,
group: u8,
lsn: Position,
) -> Node<L> {
// An empty, correctly-typed iterator of (LeafSeg, Val) pairs.
let empty: &[(LeafSeg, Val)] = &[];
let mut iter = empty.iter().map(|x| (x.0, x.1));
let b = BaseView::new_leaf(
a,
&loader,
[].as_slice(),
None,
NULL_PID,
&mut iter,
INIT_ORACLE,
group,
lsn,
);
let (group, lsn) = {
let h = b.header();
(h.group, h.lsn)
};
Self::new(loader, b, group, lsn)
}
/// Allocates an internal (index) base page holding `item` and wraps it in a new `Node`.
///
/// Keys are stored with an empty prefix. The page has an empty `lo`, no `hi`,
/// `NULL_PID` as right sibling and `INIT_ORACLE` as txid.
pub(crate) fn new_root<A: IFrameAlloc>(
a: &mut A,
loader: L,
item: &[(IntlKey, Index)],
group: u8,
lsn: Position,
) -> Node<L> {
let b = BaseView::new_intl(
a,
[].as_slice(),
None,
NULL_PID,
|| item.iter().map(|&(x, y)| (IntlSeg::new(&[], x.raw), y)),
INIT_ORACLE,
group,
lsn,
);
let (group, lsn) = {
let h = b.header();
(h.group, h.lsn)
};
Self::new(loader, b, group, lsn)
}
/// Accumulated size of the boxes attached to this node.
pub(crate) fn size(&self) -> usize {
    let state = self.state.read();
    state.total_size
}
/// Address of the base box.
pub(crate) fn base_addr(&self) -> u64 {
    let hdr = self.inner.box_header();
    hdr.addr
}
/// Returns the base page as a `BoxRef` (via `BaseView::as_box`).
pub(crate) fn base_box(&self) -> BoxRef {
self.inner.as_box()
}
/// Reports the address of every box owned by this node: each delta, then the base.
///
/// `f` is called once per address. The delta tree is snapshotted first, and the
/// state read lock is released before `f` runs, so `f` may safely touch this node.
pub(crate) fn collect_junk<F>(&self, mut f: F)
where
    F: FnMut(u64),
{
    // Bind the clone in its own statement. In the previous single chained statement,
    // the `read()` guard was a temporary that lived until the end of the statement.
    // That kept the lock held across every `f` call and made the snapshot pointless.
    let delta = self.state.read().delta.clone();
    delta.iter().for_each(|x| f(x.box_header().addr));
    f(self.base_addr());
}
fn set_comparator(&mut self, nt: NodeType) {
let mut state = self.state.write();
if nt == NodeType::Intl {
state.delta.set_comparator(intl_cmp);
} else {
state.delta.set_comparator(leaf_cmp);
}
}
pub(crate) fn get_group_lsn(&self) -> (u8, Position) {
let state = self.state.read();
(state.group, state.latest_lsn)
}
/// Returns true when the base holds at least `split_elem` elements (and at least 2)
/// and the node is not part of an in-flight merge.
pub(crate) fn should_split(&self, split_elem: u16) -> bool {
    let h = self.header();
    if h.elems < split_elem || h.elems < 2 {
        return false;
    }
    !h.merging && h.merging_child == NULL_PID
}
/// Returns true when the node has shrunk to at most a quarter of its element count
/// at split time (`split_elems`) and is not part of an in-flight merge.
pub(crate) fn should_merge(&self) -> bool {
    let h = self.header();
    // `elems`/`split_elems` are u16. Widen before multiplying so that `elems * 4`
    // cannot overflow (panic in debug, wrap in release) once elems >= 16384.
    let size_limited = u32::from(h.split_elems) >= u32::from(h.elems) * 4;
    let no_conflict = !h.merging && h.merging_child == NULL_PID;
    size_limited && no_conflict
}
/// Decides whether this internal node may start merging the child `child_pid`
/// whose low key is `child_lo`.
///
/// All of the following must hold:
/// - no merge is in flight on this node;
/// - the node is non-empty;
/// - `child_lo` lies within `[lo, hi)`;
/// - the slot it routes to is not the left-most one and holds `child_pid`.
pub(crate) fn can_merge_child(&self, child_lo: &[u8], child_pid: u64) -> bool {
    debug_assert_eq!(self.box_header().node_type, NodeType::Intl);
    let h = self.header();
    let busy = h.merging_child != NULL_PID || h.merging;
    if busy || h.elems == 0 || child_lo < self.lo() {
        return false;
    }
    if self.hi().is_some_and(|hi| child_lo >= hi) {
        return false;
    }
    let (is_left_most, pid) = self.child_index(child_lo);
    !is_left_most && pid == child_pid
}
/// Merges `self` (left) with `other` (right) into a single new node.
///
/// Both inputs are first compacted into fresh bases, and the junk from both goes
/// into the returned `Junk`. The merged base uses `other`'s group/lsn and the
/// result inherits `self`'s `split_elems`.
pub(crate) fn merge_node<A: IFrameAlloc>(
&self,
a: &mut A,
other: &Node<L>,
safe_txid: u64,
) -> (Node<L>, Junk) {
let mut junks = Vec::new();
let lb = self.merge_to_base(a, &mut junks, safe_txid);
let rb = other.merge_to_base(a, &mut junks, safe_txid);
let (lhs, rhs) = (lb.view().as_base(), rb.view().as_base());
#[cfg(feature = "extra_check")]
assert_ne!(self.base_addr(), other.base_addr());
let (group, lsn) = other.get_group_lsn();
let merged = lhs.merge(a, &self.loader, rhs, safe_txid, group, lsn);
let (group, lsn) = {
let h = merged.header();
(h.group, h.lsn)
};
let mut node = Self::new(self.loader.copy_without_pin(), merged, group, lsn);
node.header_mut().split_elems = self.header().split_elems;
(node, junks)
}
/// Routes key `k` to a child of this internal node.
///
/// Searches the base SST by raw key. On an exact hit, that slot is used; otherwise
/// the preceding slot is used (clamped to slot 0). Returns
/// `(slot_is_left_most, child_pid)`. Only the base is consulted, not the delta tree.
pub(crate) fn child_index(&self, k: &[u8]) -> (bool, u64) {
#[cfg(feature = "extra_check")]
{
assert!(self.header().is_index);
assert!(k >= self.lo());
if let Some(hi) = self.hi() {
assert!(hi > k);
}
}
let sst = self.sst::<IntlKey>();
let pos = match sst.search_by(&IntlKey::new(k), |x, y| x.raw.cmp(y.raw)) {
Ok(pos) => pos,
// Insertion point `pos`: the covering child is the one just before it.
Err(pos) => pos.max(1) - 1,
};
let (_, v) = sst.kv_at::<Index>(pos);
(pos == 0, v.pid)
}
/// Adds the routing entry `key -> pid` and immediately compacts the result.
///
/// Always returns `Some((compacted_node, junk))`. With the `extra_check` feature,
/// this panics if `key` lies outside `[lo, hi)`.
pub(crate) fn insert_index<A: IFrameAlloc>(
&self,
a: &mut A,
key: &[u8],
pid: u64,
safe_txid: u64,
) -> Option<(Node<L>, Junk)> {
#[cfg(feature = "extra_check")]
if key < self.lo()
|| if let Some(hi) = self.hi() {
hi <= key
} else {
false
}
{
panic!("somehow it happens");
}
let (group, lsn) = self.get_group_lsn();
let b =
DeltaView::from_key_index(a, IntlKey::new(key), Index::new(pid), safe_txid, group, lsn);
Some(self.insert(b).compact(a, safe_txid)) }
/// Rebuilds the full separator key at SST slot `pos`, used when splitting.
///
/// The stored key only holds the suffix after the node prefix, so the first
/// `prefix_len` bytes of `lo` are prepended. Returns `(sep, (lp, rp))`, where `lp`
/// is the common-prefix length of `[lo, sep)` and `rp` that of `[sep, hi)`, both as
/// computed by `BaseView::calc_prefix`.
/// (The name's spelling is kept as-is because `split` calls it by this name.)
fn decode_pefix<K>(
&self,
pos: usize,
prefix_len: usize,
lo: &[u8],
hi: &Option<&[u8]>,
) -> (Vec<u8>, (usize, usize))
where
K: IKey,
{
let k = self.sst::<K>().key_at(pos);
let mut sep = Vec::with_capacity(prefix_len + k.raw().len());
sep.extend_from_slice(&lo[..prefix_len]);
sep.extend_from_slice(k.raw());
let new_prefix_len = (
BaseView::calc_prefix(lo, &Some(sep.as_slice())),
BaseView::calc_prefix(sep.as_slice(), hi),
);
(sep, new_prefix_len)
}
/// Splits the base page at its middle element into two new nodes.
///
/// Only the base SST is read (`range_iter` over `inner`); delta entries are not
/// consulted here. The left half covers slots `[0, sep)` with bounds
/// `[lo, sep_key)`, and the right half covers `[sep, elems)` with bounds
/// `[sep_key, hi)`. Each half gets its own, possibly longer, common prefix, and the
/// extra `ld`/`rd` bytes are stripped from the stored suffixes. Both halves are
/// built with the current `right_sibling`.
/// NOTE(review): the left half's right sibling is presumably fixed up by the
/// caller; that is not visible here.
pub(crate) fn split<A: IFrameAlloc>(&self, a: &mut A) -> (Node<L>, Node<L>) {
let h = self.inner.header();
let prefix_len = h.prefix_len as usize;
let elems = h.elems as usize;
let sep = elems / 2;
let lo = self.lo();
let hi = self.hi();
let sibling = self.header().right_sibling;
let txid = self.box_header().txid;
let (group, lsn) = self.get_group_lsn();
let (l, r) = if h.is_index {
let (sep_key, (llen, rlen)) = self.decode_pefix::<IntlKey>(sep, prefix_len, lo, &hi);
let lhs_prefix = &lo[..llen];
let rhs_prefix = &sep_key[..rlen];
// Number of leading suffix bytes that move into each half's new prefix.
let (ld, rd) = (lhs_prefix.len() - prefix_len, rhs_prefix.len() - prefix_len);
(
BaseView::new_intl(
a,
lo,
Some(sep_key.as_slice()),
sibling,
|| {
self.inner
.range_iter::<L, IntlKey>(&self.loader, 0, sep)
.map(|(k, v)| (IntlSeg::new(lhs_prefix, &k.raw[ld..]), v))
},
txid,
group,
lsn,
),
BaseView::new_intl(
a,
sep_key.as_slice(),
hi,
sibling,
|| {
self.inner
.range_iter::<L, IntlKey>(&self.loader, sep, elems)
.map(|(k, v)| (IntlSeg::new(rhs_prefix, &k.raw[rd..]), v))
},
txid,
group,
lsn,
),
)
} else {
let (sep_key, (llen, rlen)) = self.decode_pefix::<Key>(sep, prefix_len, lo, &hi);
let lhs_prefix = &lo[..llen];
// Number of leading suffix bytes that move into each half's new prefix.
let (ld, rd) = (lhs_prefix.len() - prefix_len, rlen - prefix_len);
let mut liter = self
.inner
.range_iter::<L, Key>(&self.loader, 0, sep)
.map(|(k, v)| (LeafSeg::new(lhs_prefix, &k.raw[ld..], k.ver), v));
let mut riter = self
.inner
.range_iter::<L, Key>(&self.loader, sep, elems)
.map(|(k, v)| (LeafSeg::new(&sep_key[..rlen], &k.raw[rd..], k.ver), v));
(
BaseView::new_leaf(
a,
&self.loader,
lo,
Some(sep_key.as_slice()),
sibling,
&mut liter,
txid,
group,
lsn,
),
BaseView::new_leaf(
a,
&self.loader,
sep_key.as_slice(),
hi,
sibling,
&mut riter,
txid,
group,
lsn,
),
)
};
let (lgroup, llsn) = {
let h = l.header();
(h.group, h.lsn)
};
let (rgroup, rlsn) = {
let h = r.header();
(h.group, h.lsn)
};
let (mut lhs, mut rhs) = (
Self::new(self.loader.copy_without_pin(), l, lgroup, llsn),
Self::new(self.loader.copy_without_pin(), r, rgroup, rlsn),
);
// Remember each half's element count at split time; `should_merge` compares
// against it.
lhs.header_mut().split_elems = sep as u16;
rhs.header_mut().split_elems = (elems - sep) as u16;
(lhs, rhs)
}
/// Iterates an internal node's routing entries: the base SST merged with the
/// delta tree, in key order, with delta tombstones removing matching SST entries.
// `skip(0)` is needed only so the delta iterator has the `Skip<_>` type that
// `IterAdaptor::Iter` expects; hence the clippy allow.
#[allow(clippy::iter_skip_zero)]
pub(crate) fn intl_iter(&'_ self) -> IntlIter<'_, L> {
debug_assert_eq!(self.box_header().node_type, NodeType::Intl);
let len = self.header().prefix_len as usize;
let lo = self.lo();
IntlIter {
prefix: &lo[..len],
next_l: None,
next_r: None,
sst_iter: self
.inner
.range_iter(&self.loader, 0, self.inner.header().elems as usize),
delta_iter: IterAdaptor::Iter(self.state.read().delta.iter().skip(0)),
}
}
/// Iterates a leaf's entries (base SST merged with the delta tree) through a
/// `LeafFilter` that discards versions made obsolete by `safe_txid`. Addresses of
/// discarded remote records and of sibling pages the SST iterator passes over are
/// pushed into `j`.
// `skip(0)` is needed only so the delta iterator has the `Skip<_>` type that
// `IterAdaptor::Iter` expects.
#[allow(clippy::iter_skip_zero)]
fn leaf_iter<'b>(&'b self, j: &'b mut Junk, safe_txid: u64) -> LeafIter<'b, L> {
debug_assert_eq!(self.box_header().node_type, NodeType::Leaf);
let len = self.header().prefix_len as usize;
let lo = self.lo();
LeafIter {
prefix: &lo[..len],
next_l: None,
next_r: None,
sst_iter: self
.inner
.range_iter(&self.loader, 0, self.inner.header().elems as usize),
delta_iter: IterAdaptor::Iter(self.state.read().delta.iter().skip(0)),
filter: LeafFilter::new(safe_txid, j),
}
}
/// Like `leaf_iter`, but the filter also drops every entry whose txid `ctx`
/// reports as `TxOutcome::Aborted`. Whether anything was dropped for that reason
/// is recorded in `filter.removed`.
#[allow(clippy::iter_skip_zero)]
fn leaf_iter_drop_aborted<'b>(
&'b self,
j: &'b mut Junk,
safe_txid: u64,
ctx: Handle<Context>,
) -> LeafIter<'b, L> {
debug_assert_eq!(self.box_header().node_type, NodeType::Leaf);
let len = self.header().prefix_len as usize;
let lo = self.lo();
LeafIter {
prefix: &lo[..len],
next_l: None,
next_r: None,
sst_iter: self
.inner
.range_iter(&self.loader, 0, self.inner.header().elems as usize),
delta_iter: IterAdaptor::Iter(self.state.read().delta.iter().skip(0)),
filter: LeafFilter::with_drop_aborted(safe_txid, j, ctx),
}
}
/// Folds the delta tree into a fresh base page and returns it as a new node along
/// with the junk produced.
///
/// The `merging` / `merging_child` markers are carried over from the old header.
pub(crate) fn compact<A: IFrameAlloc>(&self, a: &mut A, safe_txid: u64) -> (Node<L>, Junk) {
let mut junks = Junk::new();
let b = self.merge_to_base(a, &mut junks, safe_txid);
let mut base = b.view().as_base();
let h = base.header_mut();
let old = self.header();
h.merging = old.merging;
h.merging_child = old.merging_child;
let (group, lsn) = {
let h = b.header();
(h.group, h.lsn)
};
(
Self::new(self.loader.copy_without_pin(), b, group, lsn),
junks,
)
}
/// Like `compact`, but also drops entries written by transactions that `ctx`
/// reports as aborted.
///
/// Returns `(new_node, junk, removed_any_aborted_entry)`. Intended for leaf nodes
/// only (debug-asserted in `merge_to_base_drop_aborted`).
pub(crate) fn remove_aborted<A: IFrameAlloc>(
&self,
a: &mut A,
safe_txid: u64,
ctx: Handle<Context>,
) -> (Node<L>, Junk, bool) {
let mut junks = Junk::new();
let (b, removed) = self.merge_to_base_drop_aborted(a, &mut junks, safe_txid, ctx);
let mut base = b.view().as_base();
let h = base.header_mut();
let old = self.header();
h.merging = old.merging;
h.merging_child = old.merging_child;
let (group, lsn) = {
let h = b.header();
(h.group, h.lsn)
};
(
Self::new(self.loader.copy_without_pin(), b, group, lsn),
junks,
removed,
)
}
/// Materialises base+deltas into a newly allocated base box.
///
/// Internal nodes use `intl_iter`; leaves use `leaf_iter`, which records discarded
/// addresses in `j`. The new base keeps the current bounds and right sibling, and
/// is stamped with `safe_txid` and the node's latest group/lsn.
fn merge_to_base<A: IFrameAlloc>(&self, a: &mut A, j: &mut Junk, safe_txid: u64) -> BoxRef {
let h = self.header();
let lo = self.lo();
let hi = self.hi();
if h.is_index {
let (group, lsn) = self.get_group_lsn();
BaseView::new_intl(
a,
lo,
hi,
h.right_sibling,
|| self.intl_iter(),
safe_txid,
group,
lsn,
)
} else {
let mut iter = self.leaf_iter(j, safe_txid);
let (group, lsn) = self.get_group_lsn();
BaseView::new_leaf(
a,
&self.loader,
lo,
hi,
h.right_sibling,
&mut iter,
safe_txid,
group,
lsn,
)
}
}
/// Leaf-only variant of `merge_to_base` that also drops aborted-transaction
/// entries. Returns the new base box and whether any entry was dropped for being
/// aborted.
fn merge_to_base_drop_aborted<A: IFrameAlloc>(
&self,
a: &mut A,
j: &mut Junk,
safe_txid: u64,
ctx: Handle<Context>,
) -> (BoxRef, bool) {
let h = self.header();
let lo = self.lo();
let hi = self.hi();
debug_assert!(!h.is_index);
let mut iter = self.leaf_iter_drop_aborted(j, safe_txid, ctx);
let (group, lsn) = self.get_group_lsn();
let base = BaseView::new_leaf(
a,
&self.loader,
lo,
hi,
h.right_sibling,
&mut iter,
safe_txid,
group,
lsn,
);
(base, iter.filter.removed)
}
/// Applies one merge-protocol step and returns the compacted result.
///
/// - `Merged` (internal nodes): finds the routing key whose child is
///   `merging_child`, inserts a null-index tombstone for it, compacts, clears
///   `merging_child` and bumps `split_elems` by one. Panics if no entry points at
///   `merging_child`.
/// - `MarkParent(pid)` (internal nodes): compacts and sets `merging_child = pid`.
/// - `MarkChild` (leaves): compacts and sets `merging = true`.
pub(crate) fn process_merge<A: IFrameAlloc>(
&self,
a: &mut A,
op: MergeOp,
safe_txid: u64,
) -> (Node<L>, Junk) {
match op {
MergeOp::Merged => {
assert_eq!(self.box_header().node_type, NodeType::Intl);
let mut key = None;
let h = self.header();
let merging_child = h.merging_child;
for (k, v) in self.intl_iter() {
if v.pid == merging_child {
key = Some(k);
break;
}
}
let (group, lsn) = self.get_group_lsn();
// `Index::null()` acts as a tombstone for the absorbed child's entry.
let b = DeltaView::from_key_index(
a,
key.unwrap(),
Index::null(),
safe_txid,
group,
lsn,
);
let tmp = self.insert(b);
let (mut p, j) = tmp.compact(a, safe_txid);
p.header_mut().merging_child = NULL_PID;
p.header_mut().split_elems = h.split_elems + 1;
(p, j)
}
MergeOp::MarkParent(pid) => {
assert_eq!(self.box_header().node_type, NodeType::Intl);
let (mut p, j) = self.compact(a, safe_txid);
p.header_mut().merging_child = pid;
(p, j)
}
MergeOp::MarkChild => {
assert_eq!(self.box_header().node_type, NodeType::Leaf);
let (mut p, j) = self.compact(a, safe_txid);
p.header_mut().merging = true;
(p, j)
}
}
}
/// Returns a new `Node` with delta box `b` layered on top of `self`; `self` is not
/// modified.
///
/// The delta's header is linked to the current head address and stamped with this
/// node's type and pid. `b` is then pinned. The new node shares the base, the writer
/// mutex, and (via `ImTree::update`) the delta tree with the new entry added.
pub(crate) fn insert(&self, b: BoxRef) -> Node<L> {
    let mut k = b.view().as_delta();
    let h = k.box_header_mut();
    let th = self.box_header();
    let state = self.state.read();
    h.link = state.addr;
    h.node_type = th.node_type;
    h.pid = th.pid;
    let addr = h.addr;
    self.loader.pin(b);
    Node {
        loader: self.loader.copy_with_pin(),
        mtx: self.mtx.clone(),
        state: RwLock::new(NodeState {
            addr,
            total_size: state.total_size + h.total_size as usize,
            // Keep a true maximum (as `load_inner` does). Delta txids need not
            // arrive in increasing order, and overwriting would let `max_txid`
            // regress and mislead `has_garbage`.
            max_txid: state.max_txid.max(h.txid),
            group: h.group,
            latest_lsn: h.lsn,
            delta: state.delta.update(k),
        }),
        inner: self.inner,
    }
}
/// Prepends delta `k` to this node in place (under the state write lock) and
/// returns the delta's address, which becomes the node's new head address.
///
/// `remote_size` is the size of an out-of-line record box, if any, and is added to
/// `total_size` together with the delta's own size.
fn insert_inplace(&self, mut k: DeltaView, remote_size: usize) -> u64 {
    let h = k.box_header_mut();
    let th = self.box_header();
    let addr = h.addr;
    let mut state = self.state.write();
    h.link = state.addr;
    h.node_type = th.node_type;
    h.pid = th.pid;
    state.addr = addr;
    state.total_size += h.total_size as usize + remote_size;
    // Keep a true maximum, consistent with `load_inner`; writers may attach
    // deltas whose txid is lower than one already present.
    state.max_txid = state.max_txid.max(h.txid);
    state.group = h.group;
    state.latest_lsn = h.lsn;
    state.delta.put(k);
    addr
}
/// Blocks until the node's writer mutex is acquired and returns a guard that
/// allows in-place inserts.
pub(crate) fn lock(&'_ self) -> NodeGuard<'_, L> {
    let guard = self.mtx.lock();
    NodeGuard {
        _guard: guard,
        node: self,
    }
}
/// Non-blocking variant of `lock`: returns `None` if the writer mutex is held.
pub(crate) fn try_lock(&'_ self) -> Option<NodeGuard<'_, L>> {
    self.mtx.try_lock().map(|guard| NodeGuard {
        _guard: guard,
        node: self,
    })
}
/// Address of the most recently attached box (head of the chain).
pub(crate) fn latest_addr(&self) -> u64 {
    let state = self.state.read();
    state.addr
}
/// Returns true for a leaf whose every attached txid is `<= safe_txid` and whose
/// base is flagged `has_multiple_versions`. Always false for internal nodes.
pub(crate) fn has_garbage(&self, safe_txid: u64) -> bool {
    if self.header().is_index {
        return false;
    }
    let all_settled = self.state.read().max_txid <= safe_txid;
    all_settled && self.inner.header().has_multiple_versions
}
/// Looks up `key` in a leaf: the delta tree first, then the base SST.
///
/// In the deltas, the first entry visited from `key` onward whose raw bytes equal
/// `key.raw()` wins. Otherwise the SST is searched for an exact raw-key match.
/// Returns the version, the decoded record, and the box backing that record (the
/// remote box returned by `get_record`, or the base box when none).
pub(crate) fn find_latest(&self, key: &Key) -> Option<(Ver, Record, BoxRef)> {
debug_assert!(!self.inner.header().is_index);
let mut result = None;
self.visit_versions(
*key,
|x, y| Key::decode_from(x.key()).cmp(y),
|x| {
let k = Key::decode_from(x.key());
if k.raw() == key.raw() {
let v = x.val();
let (v, r) = v.get_record(&self.loader, true);
result = Some((k.ver, v, r.unwrap_or_else(|| self.base_box())));
// Returning true presumably stops the visit. NOTE(review): confirm against
// ImTree::visit_from.
return true;
}
false
},
);
if let Some(res) = result {
return Some(res);
}
self.search_sst(key).map(|(k, v)| {
let (v, r) = v.get_record(&self.loader, true);
(k.ver, v, r.unwrap_or_else(|| self.base_box()))
})
}
/// Searches a leaf's base SST for an entry whose raw key equals `key`'s raw key.
/// Returns `None` when the base is empty or no raw-key match exists.
pub(crate) fn search_sst<'a>(&self, key: &Key) -> Option<(Key<'a>, Val<'a>)> {
    debug_assert_eq!(self.box_header().node_type, NodeType::Leaf);
    if self.header().elems == 0 {
        return None;
    }
    let sst = self.inner.sst::<Key>();
    let found = sst.search_by(key, |x, y| x.raw().cmp(y.raw()));
    found.ok().map(|pos| sst.kv_at(pos))
}
/// Forwards to `ImTree::visit_from` on the delta tree, starting from `key` as
/// located with `cmp` and invoking `f` per visited entry. Holds the state read lock
/// for the duration.
pub(crate) fn visit_versions<K, F>(&self, key: K, cmp: fn(&DeltaView, &K) -> Ordering, mut f: F)
where
    K: IKey,
    F: FnMut(DeltaView) -> bool,
{
    let state = self.state.read();
    state.delta.visit_from(&key, cmp, &mut f);
}
/// Debug helper: logs (at `debug` level) every delta entry, then dumps the base
/// SST via `show_intl` / `show_leaf`.
#[allow(unused)]
pub(crate) fn show(&self) {
let h = self.box_header();
let state = self.state.read();
log::debug!(
"---------- show delta {} {:?} elems {} ----------",
h.pid,
h.addr,
state.delta.len()
);
if self.header().is_index {
let it = state.delta.iter();
for x in it {
let k = IntlKey::decode_from(x.key());
let v = Index::decode_from(x.index());
log::debug!("{} => {}", k.to_string(), v);
}
let sst = self.sst::<IntlKey>();
sst.show_intl(h.pid, h.addr);
} else {
let it = state.delta.iter();
for x in it {
let k = Key::decode_from(x.key());
let val = x.val();
let (r, _) = val.get_record(&self.loader, true);
log::debug!("{} => {}", k.to_string(), r);
}
let sst = self.sst::<Key>();
sst.show_leaf(&self.loader, h.pid, h.addr);
}
}
/// Header of the base box (not of the newest delta).
pub(crate) fn box_header(&self) -> &BoxHeader {
self.inner.box_header()
}
/// For leaf nodes, loads every box along the `link` chain of each sibling head
/// hinted by the base (`load_sibling_heads_hint`). Presumably this keeps them
/// resident via the loader. No-op for internal nodes or when there is no hint.
///
/// # Errors
/// Propagates the first loader error.
fn try_pin_leaf_pages(&self) -> Result<(), OpCode> {
    if self.box_header().node_type != NodeType::Leaf {
        return Ok(());
    }
    let mut heads = Vec::new();
    if !self.inner.load_sibling_heads_hint(&mut heads) {
        return Ok(());
    }
    for head in heads {
        let mut cursor = head;
        while cursor != NULL_ADDR {
            let page = self.loader.load(cursor)?;
            cursor = page.link;
        }
    }
    Ok(())
}
/// `try_pin_leaf_pages`, panicking ("must exist") if any page fails to load.
fn pin_leaf_pages_unchecked(&self) {
self.try_pin_leaf_pages().expect("must exist")
}
/// Number of entries in the delta tree.
pub(crate) fn delta_len(&self) -> usize {
    let state = self.state.read();
    state.delta.len()
}
/// Walks the box chain starting at `d` (following `link` until `NULL_ADDR`) and
/// folds each box into `l`.
///
/// - The first box supplies `group` / `latest_lsn` and fixes the node type, which
///   selects the delta comparator.
/// - Every box adds its `total_size` and contributes to `max_txid`.
/// - Delta boxes are put into the delta tree; exactly one base box becomes
///   `l.inner`.
///
/// Panics on mixed node types, more than one base, an unexpected box kind, or a
/// chain without a base.
fn load_inner(l: &mut Node<L>, mut d: BoxView) -> Result<(), OpCode> {
let mut one_base = true;
let mut last_type = None;
loop {
let h = d.header();
let state = l.state.get_mut();
if last_type.is_none() {
state.group = h.group;
state.latest_lsn = h.lsn;
}
state.total_size += d.total_size as usize;
state.max_txid = state.max_txid.max(h.txid);
if let Some(t) = last_type {
assert_eq!(t, h.node_type);
} else {
// Must happen before the first `put` below so deltas are ordered correctly.
l.set_comparator(h.node_type);
}
last_type = Some(h.node_type);
match h.kind {
TagKind::Delta => {
let delta = d.as_delta();
l.state.get_mut().delta.put(delta);
}
TagKind::Base => {
assert!(one_base);
one_base = false;
l.inner = d.as_base();
}
_ => unreachable!("bad kind {:?}", h.kind),
}
if d.link == NULL_ADDR {
break;
}
d = l.loader.load(d.link)?;
}
assert!(!l.inner.is_null());
Ok(())
}
/// Builds a forward iterator over a leaf, starting at bound `b`.
///
/// The delta side is positioned with `range_from` (or a full iterator for
/// `Unbounded`). The SST side starts at the `lower_bound` of `(b, NULL_ORACLE,
/// NULL_CMD)`, or at 0 when `b < lo`. For `Excluded(b)`, one delta entry and one SST
/// slot with raw key `b` are skipped if present at the start position.
/// NOTE(review): only one entry per side is skipped; if several versions of `b`
/// exist, later ones are still yielded. This presumably relies on the caller
/// de-duplicating by raw key — confirm.
#[allow(clippy::iter_skip_zero)]
pub(crate) fn successor<'a>(
&'a self,
b: &'a Bound<Vec<u8>>,
cached_key: Handle<Vec<u8>>,
) -> RawLeafIter<'a, L> {
fn cmp_fn(x: &DeltaView, y: &&[u8]) -> Ordering {
Key::decode_from(x.key()).raw.cmp(y)
}
fn equal_fn(_x: &DeltaView, _y: &&[u8]) -> bool {
true
}
let state = self.state.read();
let (delta, pos) = match b {
Bound::Unbounded => (IterAdaptor::Iter(state.delta.iter().skip(0)), 0),
Bound::Included(b) => {
let r = state
.delta
.range_from(b.as_slice(), cmp_fn, equal_fn)
.skip(0);
let lo = self.lo();
let pos = if b.as_slice() < lo {
Err(0)
} else {
let key = Key::new(b, Ver::new(NULL_ORACLE, NULL_CMD));
self.sst::<Key>().lower_bound(&key)
};
(IterAdaptor::Range(r), pos.unwrap_or_else(|x| x))
}
Bound::Excluded(b) => {
let iter = state.delta.range_from(b.as_slice(), cmp_fn, equal_fn);
let delta = if let Some(cur) = iter.peek()
&& Key::decode_from(cur.key()).raw == b.as_slice()
{
iter.skip(1)
} else {
iter.skip(0)
};
let lo = self.lo();
let inner_pos = if b.as_slice() < lo {
Err(0)
} else {
let key = Key::new(b, Ver::new(NULL_ORACLE, NULL_CMD));
self.sst::<Key>().lower_bound(&key)
};
let pos = match inner_pos {
Ok(x) => {
let (k, _) = self.sst::<Key>().kv_at::<Val>(x);
if k.raw == b.as_slice() { x + 1 } else { x }
}
Err(x) => x,
};
(IterAdaptor::Range(delta), pos)
}
};
let lo = self.lo();
let len = self.header().prefix_len as usize;
RawLeafIter {
cached_key,
prefix: &lo[..len],
next_l: None,
next_r: None,
delta_iter: delta,
sst_iter: self
.inner
.range_iter(&self.loader, pos, self.header().elems as usize),
}
}
/// Builds a reverse iterator over a leaf restricted to the range `(lo, hi)`.
///
/// The delta side walks backwards from the end of the tree (or of
/// `range_from(lo)`), skipping the raw key `lo` when it is `Excluded`.
/// NOTE(review): `hi` does not limit the delta side here; presumably the caller
/// enforces it.
///
/// The SST side is driven by `BaseRevIter` from slot `cur` (last slot below `hi`,
/// `-1` if none) down towards `end` (first slot at or above `lo`).
pub(crate) fn predecessor<'a>(
&'a self,
lo: &'a Bound<Vec<u8>>,
hi: &'a Bound<Vec<u8>>,
cached_key: Handle<Vec<u8>>,
) -> RawLeafRevIter<'a, L> {
fn cmp_fn(x: &DeltaView, y: &&[u8]) -> Ordering {
Key::decode_from(x.key()).raw.cmp(y)
}
let state = self.state.read();
let delta = match lo {
Bound::Unbounded => IterAdaptorRev::IterRev {
iter: state.delta.iter(),
excluded: None,
pending: None,
group: Vec::new(),
},
Bound::Included(b) => IterAdaptorRev::RangeRev {
iter: state.delta.range_from(b.as_slice(), cmp_fn, |_x, _y| true),
excluded: None,
pending: None,
group: Vec::new(),
},
Bound::Excluded(b) => IterAdaptorRev::RangeRev {
iter: state.delta.range_from(b.as_slice(), cmp_fn, |_x, _y| true),
excluded: Some(b.as_slice()),
pending: None,
group: Vec::new(),
},
};
let sst = self.sst::<Key>();
let elems = self.header().elems as usize;
// One past the last SST slot to visit.
let beg = match hi {
Bound::Unbounded => elems,
Bound::Included(h) => {
let key = Key::new(h, Ver::new(NULL_ORACLE, NULL_CMD));
match sst.lower_bound(&key) {
Ok(i) => i + 1,
Err(i) => i,
}
}
Bound::Excluded(h) => {
let key = Key::new(h, Ver::new(u64::MAX, u32::MAX));
match sst.lower_bound(&key) {
Ok(i) | Err(i) => i,
}
}
};
// Last SST slot to visit; -1 means "nothing".
let cur = if beg == 0 { -1 } else { beg as isize - 1 };
// Lowest SST slot to visit.
let end = match lo {
Bound::Unbounded => 0,
Bound::Included(b) => {
let pos = if b.as_slice() < self.lo() {
0
} else {
sst.lower_bound(&Key::new(b, Ver::new(u64::MAX, u32::MAX)))
.unwrap_or_else(|x| x)
};
pos as isize
}
Bound::Excluded(b) => {
let pos = if b.as_slice() < self.lo() {
0
} else {
let key = Key::new(b, Ver::new(NULL_ORACLE, NULL_CMD));
match sst.lower_bound(&key) {
Ok(i) => {
let (k, _) = sst.kv_at::<Val>(i);
if k.raw == b.as_slice() { i + 1 } else { i }
}
Err(i) => i,
}
};
pos as isize
}
};
let prefix = &self.lo()[..self.header().prefix_len as usize];
RawLeafRevIter {
cached_key,
prefix,
next_l: None,
next_r: None,
sst_iter: BaseRevIter::new(&self.loader, sst, cur, end),
delta_iter: delta,
}
}
}
/// Holds a node's writer mutex; only through this guard can deltas be inserted in
/// place.
pub(crate) struct NodeGuard<'a, L: ILoader> {
/// Released on drop.
_guard: MutexGuard<'a, ()>,
node: &'a Node<L>,
}
impl<L> NodeGuard<'_, L>
where
    L: ILoader,
{
    /// Attaches delta box `k` (plus its optional remote record box `v`) to the
    /// locked node in place. Both boxes are then pinned, and the delta's address
    /// is returned.
    pub(crate) fn insert(&self, k: BoxRef, v: Option<BoxRef>) -> u64 {
        let remote_size = v
            .as_ref()
            .map_or(0, |remote| remote.header().total_size as usize);
        let addr = self.node.insert_inplace(k.view().as_delta(), remote_size);
        self.node.save(k, v);
        addr
    }
}
/// Exposes the base page's API (`header`, `lo`, `hi`, `sst`, ...) directly on `Node`.
impl<L> Deref for Node<L>
where
L: ILoader,
{
type Target = BaseView;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
/// Mutable access to the base page (used for `header_mut()` adjustments).
impl<L> DerefMut for Node<L>
where
L: ILoader,
{
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}
/// Forward delta iterator: either a whole-tree iterator or a range iterator, both
/// wrapped in `Skip` so a leading entry can be dropped.
enum IterAdaptor<'a, T> {
Iter(std::iter::Skip<Iter<'a, DeltaView>>),
Range(std::iter::Skip<RangeIter<'a, DeltaView, T>>),
}
impl<'a, T> Iterator for IterAdaptor<'a, T> {
type Item = DeltaView;
fn next(&mut self) -> Option<Self::Item> {
match self {
IterAdaptor::Iter(i) => i.next(),
IterAdaptor::Range(r) => r.next(),
}
}
}
/// Reverse delta iterator that hands out entries grouped by raw key.
///
/// Fields common to both variants:
/// - `excluded`: raw key whose entries must be skipped;
/// - `pending`: one look-ahead entry that ended the previous group;
/// - `group`: the rest of the current group, stored so that `pop` yields it in
///   forward tree order.
enum IterAdaptorRev<'a> {
IterRev {
iter: Iter<'a, DeltaView>,
excluded: Option<&'a [u8]>,
pending: Option<DeltaView>,
group: Vec<DeltaView>,
},
RangeRev {
iter: RangeIter<'a, DeltaView, &'a [u8]>,
excluded: Option<&'a [u8]>,
pending: Option<DeltaView>,
group: Vec<DeltaView>,
},
}
impl IterAdaptorRev<'_> {
fn next(&mut self) -> Option<DeltaView> {
match self {
IterAdaptorRev::IterRev {
iter,
excluded,
pending,
group,
} => {
if let Some(x) = group.pop() {
return Some(x);
}
let first = loop {
let x = pending.take().or_else(|| iter.next_back())?;
if excluded.is_none_or(|b| Key::decode_from(x.key()).raw != b) {
break x;
}
};
let raw = Key::decode_from(first.key()).raw;
group.push(first);
while let Some(x) = iter.next_back() {
let k = Key::decode_from(x.key());
if excluded.is_some_and(|b| k.raw == b) {
continue;
}
if k.raw == raw {
group.push(x);
continue;
}
*pending = Some(x);
break;
}
group.pop()
}
IterAdaptorRev::RangeRev {
iter,
excluded,
pending,
group,
} => {
if let Some(x) = group.pop() {
return Some(x);
}
let first = loop {
let x = pending.take().or_else(|| iter.next_back())?;
if excluded.is_none_or(|b| Key::decode_from(x.key()).raw != b) {
break x;
}
};
let raw = Key::decode_from(first.key()).raw;
group.push(first);
while let Some(x) = iter.next_back() {
let k = Key::decode_from(x.key());
if excluded.is_some_and(|b| k.raw == b) {
continue;
}
if k.raw == raw {
group.push(x);
continue;
}
*pending = Some(x);
break;
}
group.pop()
}
}
}
}
/// Merged iterator over an internal node: base SST entries plus delta entries,
/// with delta tombstones hiding matching SST entries.
pub(crate) struct IntlIter<'a, L>
where
L: ILoader,
{
/// Node key prefix shared by every entry (`lo[..prefix_len]`).
prefix: &'a [u8],
/// Buffered head of the delta side.
next_l: Option<(IntlSeg<'a>, Index)>,
/// Buffered head of the SST side.
next_r: Option<(IntlSeg<'a>, Index)>,
sst_iter: BaseIter<'a, L, IntlKey<'a>>,
delta_iter: IterAdaptor<'a, &'a [u8]>,
}
/// Merged iterator over a leaf (base SST + deltas), filtered by `LeafFilter`;
/// used when rebuilding a base.
pub(crate) struct LeafIter<'a, L>
where
L: ILoader,
{
/// Node key prefix shared by every entry (`lo[..prefix_len]`).
prefix: &'a [u8],
/// Buffered head of the delta side.
next_l: Option<(LeafSeg<'a>, Val<'a>)>,
/// Buffered head of the SST side.
next_r: Option<(LeafSeg<'a>, Val<'a>)>,
sst_iter: BaseIter<'a, L, Key<'a>>,
delta_iter: IterAdaptor<'a, &'a [u8]>,
/// Version/garbage filter; also collects junk addresses.
filter: LeafFilter<'a>,
}
/// Unfiltered forward iterator over a leaf (all versions), produced by
/// `Node::successor`.
pub(crate) struct RawLeafIter<'a, L>
where
L: ILoader,
{
/// Buffer handle passed through to each `IterItem`.
cached_key: Handle<Vec<u8>>,
/// Prefix to re-attach to SST suffixes; delta keys are already full.
prefix: &'a [u8],
next_l: Option<IterItem<'a, L>>,
next_r: Option<IterItem<'a, L>>,
sst_iter: BaseIter<'a, L, Key<'a>>,
delta_iter: IterAdaptor<'a, &'a [u8]>,
}
/// Unfiltered reverse iterator over a leaf, produced by `Node::predecessor`.
pub(crate) struct RawLeafRevIter<'a, L>
where
L: ILoader,
{
/// Buffer handle passed through to each `IterItem`.
cached_key: Handle<Vec<u8>>,
/// Prefix to re-attach to SST suffixes; delta keys are already full.
prefix: &'a [u8],
next_l: Option<IterItem<'a, L>>,
next_r: Option<IterItem<'a, L>>,
sst_iter: BaseRevIter<'a, L, Key<'a>>,
delta_iter: IterAdaptorRev<'a>,
}
/// Two-way merge of delta and SST routing entries by raw key.
///
/// When both sides hold the same raw key, the delta entry wins, and a delta
/// tombstone removes the entry altogether.
impl<'a, L> Iterator for IntlIter<'a, L>
where
L: ILoader,
{
type Item = (IntlSeg<'a>, Index);
fn next(&mut self) -> Option<Self::Item> {
loop {
if self.next_l.is_none()
&& let Some(x) = self.delta_iter.next()
{
let k = IntlKey::decode_from(x.key());
// Delta keys are stored in full; strip the node prefix to form a segment.
self.next_l = Some((
IntlSeg::new(self.prefix, &k.raw[self.prefix.len()..]),
Index::decode_from(x.index()),
));
}
if self.next_r.is_none()
&& let Some((k, v)) = self.sst_iter.next()
{
self.next_r = Some((IntlSeg::new(self.prefix, k.raw), v));
}
match (self.next_l, self.next_r) {
(None, None) => return None,
(None, Some(x)) => {
self.next_r = None;
return Some(x);
}
(Some(x), None) => {
self.next_l = None;
// Invariant: a tombstone always shadows an SST entry.
debug_assert!(!x.1.is_tombstone());
return Some(x);
}
(Some(l), Some(r)) => match l.0.raw_cmp(&r.0) {
Equal => {
self.next_l = None;
self.next_r = None;
if l.1.is_tombstone() {
continue;
}
return Some(l);
}
Greater => {
self.next_r = None;
return Some(r);
}
Less => {
self.next_l = None;
return Some(l);
}
},
}
}
}
}
/// Two-way merge of delta and SST leaf entries by full `LeafSeg` order, where every
/// candidate passes through `LeafFilter::check`.
impl<'a, L> Iterator for LeafIter<'a, L>
where
L: ILoader,
{
type Item = (LeafSeg<'a>, Val<'a>);
fn next(&mut self) -> Option<Self::Item> {
loop {
if self.next_r.is_none() {
// Split borrow so the sibling callback can push into the filter's junk list.
let (sst_iter, filter) = (&mut self.sst_iter, &mut self.filter);
if let Some((k, v)) = sst_iter.next_with_sibling(|addr| filter.junks.push(addr)) {
self.next_r = Some((LeafSeg::new(self.prefix, k.raw, k.ver), v));
}
}
if self.next_l.is_none()
&& let Some(x) = self.delta_iter.next()
{
let k = Key::decode_from(x.key());
let v = x.val();
// Delta keys are stored in full; strip the node prefix to form a segment.
self.next_l = Some((
LeafSeg::new(self.prefix, &k.raw[self.prefix.len()..], k.ver),
v,
));
}
match (self.next_l, self.next_r) {
(None, None) => return None,
(None, Some(r)) => {
self.next_r = None;
if self.filter.check(&(r.0, r.1)) {
return Some(r);
}
}
(Some(l), None) => {
self.next_l = None;
if self.filter.check(&(l.0, l.1)) {
return Some(l);
}
}
(Some(l), Some(r)) => match l.0.cmp(&r.0) {
Less => {
self.next_l = None;
if self.filter.check(&(l.0, l.1)) {
return Some(l);
}
}
Greater => {
self.next_r = None;
if self.filter.check(&(r.0, r.1)) {
return Some(r);
}
}
// A (key, version) pair is never both in a delta and in the base.
Equal => unreachable!("never happen"),
},
}
}
}
}
/// Infallible wrapper over `try_next`; panics ("must exist") on loader errors.
impl<'a, L> Iterator for RawLeafIter<'a, L>
where
L: ILoader,
{
type Item = IterItem<'a, L>;
fn next(&mut self) -> Option<Self::Item> {
self.try_next().expect("must exist")
}
}
impl<'a, L> RawLeafIter<'a, L>
where
L: ILoader,
{
/// Returns the next smallest item from the delta and SST sides, in ascending order.
///
/// Delta items are built with an empty prefix because their keys are stored in
/// full; SST items carry `self.prefix`. With a non-empty prefix, items are compared
/// by raw key only (`cmp_raw_with_prefixed_tail`). On a tie, the delta item is
/// emitted first and the SST item is kept for the next call.
///
/// # Errors
/// Propagates loader errors from the SST side.
pub(crate) fn try_next(&mut self) -> Result<Option<IterItem<'a, L>>, OpCode> {
if self.next_l.is_none()
&& let Some(x) = self.delta_iter.next()
{
let k = Key::decode_from(x.key());
self.next_l = Some(IterItem::new(
self.cached_key,
&[],
k,
x.val(),
self.sst_iter.loader,
));
}
if self.next_r.is_none()
&& let Some((k, val)) = self.sst_iter.try_next_with_sibling(|_| {})?
{
self.next_r = Some(IterItem::new(
self.cached_key,
self.prefix,
k,
val,
self.sst_iter.loader,
));
}
Ok(match (self.next_l.take(), self.next_r.take()) {
(None, None) => None,
(None, Some(r)) => Some(r),
(Some(l), None) => Some(l),
(Some(l), Some(r)) => {
let ord = if self.prefix.is_empty() {
l.cmp(&r)
} else {
cmp_raw_with_prefixed_tail(l.base.raw, self.prefix, r.base.raw)
};
match ord {
Less => {
self.next_r = Some(r);
Some(l)
}
Greater => {
self.next_l = Some(l);
Some(r)
}
Equal => {
self.next_r = Some(r);
Some(l)
}
}
}
})
}
}
impl<'a, L> RawLeafRevIter<'a, L>
where
L: ILoader,
{
/// Returns the next largest item from the delta and SST sides, in descending order.
///
/// The comparison is the same as in `RawLeafIter::try_next`, with the larger item
/// emitted first. On a tie, the delta item is emitted first and the SST item is
/// kept.
///
/// # Errors
/// Propagates loader errors from the SST side.
pub(crate) fn try_next_back(&mut self) -> Result<Option<IterItem<'a, L>>, OpCode> {
if self.next_l.is_none()
&& let Some(x) = self.delta_iter.next()
{
let k = Key::decode_from(x.key());
self.next_l = Some(IterItem::new(
self.cached_key,
&[],
k,
x.val(),
self.sst_iter.loader,
));
}
if self.next_r.is_none()
&& let Some((k, val)) = self.sst_iter.try_next_back_with_sibling(|_| {})?
{
self.next_r = Some(IterItem::new(
self.cached_key,
self.prefix,
k,
val,
self.sst_iter.loader,
));
}
Ok(match (self.next_l.take(), self.next_r.take()) {
(None, None) => None,
(None, Some(r)) => Some(r),
(Some(l), None) => Some(l),
(Some(l), Some(r)) => {
let ord = if self.prefix.is_empty() {
l.cmp(&r)
} else {
cmp_raw_with_prefixed_tail(l.base.raw, self.prefix, r.base.raw)
};
match ord {
Less => {
self.next_l = Some(l);
Some(r)
}
Greater => {
self.next_r = Some(r);
Some(l)
}
Equal => {
self.next_r = Some(r);
Some(l)
}
}
}
})
}
}
/// Decides which leaf versions survive a rebuild, given a safe txid.
struct LeafFilter<'a> {
/// Safe txid: versions at or below it are eligible for collapsing.
txid: u64,
/// Raw key of the previously examined entry.
last: Option<&'a [u8]>,
/// Output list of junk addresses (dropped remote records, passed-over siblings).
junks: &'a mut Vec<u64>,
/// When set, remaining versions of `last` are dropped.
skip_dup: bool,
/// If set, entries whose txid this context reports as aborted are dropped.
drop_aborted_ctx: Option<Handle<Context>>,
/// Whether any entry was dropped because its transaction aborted.
removed: bool,
}
impl<'a> LeafFilter<'a> {
/// Filter that only collapses old versions.
fn new(txid: u64, junks: &'a mut Vec<u64>) -> Self {
Self {
txid,
last: None,
junks,
skip_dup: false,
drop_aborted_ctx: None,
removed: false,
}
}
/// Filter that also drops entries of transactions `ctx` reports as aborted.
fn with_drop_aborted(txid: u64, junks: &'a mut Vec<u64>, ctx: Handle<Context>) -> Self {
Self {
txid,
last: None,
junks,
skip_dup: false,
drop_aborted_ctx: Some(ctx),
removed: false,
}
}
/// Returns whether entry `(k, v)` should be kept.
///
/// For each raw key, every version with txid > `self.txid` is kept, plus the first
/// version with txid <= `self.txid`, unless that one is a tombstone. All later
/// versions are dropped. This assumes the versions of one key arrive consecutively
/// and newest-first (TODO confirm against `LeafSeg` ordering). Entries of aborted
/// transactions are dropped before any of this and do not update `last`.
fn check_impl(&mut self, k: &LeafSeg<'a>, v: &Val) -> bool {
if let Some(ctx) = self.drop_aborted_ctx
&& ctx.get_aborted(k.txid()) == Some(TxOutcome::Aborted)
{
self.removed = true;
return false;
}
if let Some(last) = self.last
&& last == k.raw()
{
if self.skip_dup {
return false;
}
if k.txid() > self.txid {
return true;
}
// First version of this key at or below the safe txid: keep it (unless it is a
// tombstone) and drop everything older.
self.skip_dup = true;
return !v.is_tombstone();
}
self.last = Some(k.raw());
self.skip_dup = k.txid() <= self.txid;
if v.is_tombstone() && self.skip_dup {
return false;
}
true
}
/// `check_impl` plus junk collection: a rejected entry's remote record address
/// (if any) is pushed into `junks`.
#[inline(always)]
fn check(&mut self, x: &(LeafSeg<'a>, Val)) -> bool {
let (k, v) = x;
if !self.check_impl(k, v) {
self.collect(v);
false
} else {
true
}
}
/// Records the tagged remote address of `v`, if it has one.
fn collect(&mut self, v: &Val) {
let remote = v.get_remote();
if remote != NULL_ADDR {
self.junks.push(RemoteView::tag(remote));
}
}
}
/// Unit tests for leaf iteration and compaction, using an in-memory allocator/loader.
#[cfg(test)]
mod test {
use parking_lot::Mutex;
use std::{
collections::HashMap,
rc::Rc,
sync::atomic::{AtomicU64, Ordering::Relaxed},
};
use crate::{
Options,
types::{
data::{Key, LeafSeg, Record, Ver},
node::{Junk, Node},
refbox::{BoxRef, BoxView, DeltaView},
traits::{IFrameAlloc, IHeader, ILoader},
},
utils::{OpCode, data::Position},
};
/// Shared storage: allocated boxes keyed by address, plus a bump-allocation offset.
struct AInner {
map: Mutex<HashMap<u64, BoxRef>>,
off: AtomicU64,
}
/// Test allocator and loader; clones share the same `AInner`.
#[derive(Clone)]
struct A {
inner: Rc<AInner>,
}
impl A {
fn new() -> Self {
Self {
inner: Rc::new(AInner {
map: Mutex::new(HashMap::new()),
off: AtomicU64::new(0),
}),
}
}
/// Returns the box stored at `addr`; panics if it was never allocated or pinned.
fn load(&self, addr: u64) -> BoxRef {
let lk = self.inner.map.lock();
lk.get(&addr).unwrap().clone()
}
}
impl IFrameAlloc for A {
// Bump-allocates a new address and remembers the box in the map.
fn alloc(&mut self, size: u32) -> BoxRef {
let addr = self
.inner
.off
.fetch_add(BoxRef::real_size(size) as u64, Relaxed);
let p = BoxRef::alloc(size, addr);
let mut lk = self.inner.map.lock();
lk.insert(addr, p.clone());
p
}
fn inline_size(&self) -> usize {
Options::MIN_INLINE_SIZE
}
}
impl ILoader for A {
fn load(&self, addr: u64) -> Result<BoxView, OpCode> {
Ok(self.load(addr).view())
}
// "Pinning" just (re)inserts the box under its header address.
fn pin(&self, data: BoxRef) {
let mut lk = self.inner.map.lock();
lk.insert(data.header().addr, data);
}
fn copy_with_pin(&self) -> Self {
self.clone()
}
fn copy_without_pin(&self) -> Self {
self.clone()
}
fn load_remote(&self, addr: u64) -> Result<BoxRef, OpCode> {
Ok(self.load(addr))
}
}
/// Checks how `leaf_iter` collapses versions and tombstones, and that its
/// output stays strictly ordered across many inserts, removes and compactions.
#[test]
fn leaf_iter() {
let mut a = A::new();
let txid = AtomicU64::new(1);
const CONSOLIDATE_THRESHOLD: usize = 64;
let lsn = Position::default();
let mut j = Junk::new();
{
let l = a.clone();
let mut node = Node::new_leaf(&mut a, l, 0, Position::MIN);
// Three versions of "foo": d1 at txid 1, then d2 and a tombstone d3 at txid 2.
let (d1, r1) = DeltaView::from_key_val(
&mut a,
&Key::new("foo".as_bytes(), Ver::new(txid.fetch_add(1, Relaxed), 1)),
&Record::normal(1, "1".as_bytes()),
0,
lsn,
);
let (d2, r2) = DeltaView::from_key_val(
&mut a,
&Key::new("foo".as_bytes(), Ver::new(txid.load(Relaxed), 2)),
&Record::normal(1, "2".as_bytes()),
0,
lsn,
);
let (d3, r3) = DeltaView::from_key_val(
&mut a,
&Key::new("foo".as_bytes(), Ver::new(txid.load(Relaxed), 3)),
&Record::remove(1),
0,
lsn,
);
node.insert_inplace(
d1.view().as_delta(),
r1.as_ref()
.map(|x| x.header().total_size as usize)
.unwrap_or(0),
);
node.save(d1, r1);
node.insert_inplace(
d2.view().as_delta(),
r2.as_ref()
.map(|x| x.header().total_size as usize)
.unwrap_or(0),
);
node.save(d2, r2);
// With safe txid 1, both live versions are expected to survive.
(node, _) = node.compact(&mut a, 1);
let iter = node.leaf_iter(&mut j, 1);
assert_eq!(iter.count(), 2);
node.insert_inplace(
d3.view().as_delta(),
r3.as_ref()
.map(|x| x.header().total_size as usize)
.unwrap_or(0),
);
node.save(d3, r3);
// With safe txid 3, the tombstone is expected to hide every version.
let iter = node.leaf_iter(&mut j, 3);
assert_eq!(iter.count(), 0);
}
let l = a.clone();
let mut node = Node::new_leaf(&mut a, l, 0, Position::MIN);
// Insert key_0..key_29, compacting whenever the delta tree reaches the threshold.
for i in 0..30 {
let raw = format!("key_{i}");
let k = Key::new(raw.as_bytes(), Ver::new(txid.fetch_add(1, Relaxed), 1));
let v = Record::normal(1, raw.as_bytes());
let (delta, r) = DeltaView::from_key_val(&mut a, &k, &v, 0, lsn);
node.insert_inplace(
delta.view().as_delta(),
r.as_ref()
.map(|x| x.header().total_size as usize)
.unwrap_or(0),
);
node.save(delta, r);
if node.delta_len() >= CONSOLIDATE_THRESHOLD {
(node, _) = node.compact(&mut a, 3);
}
}
// Remove key_0..key_19.
for i in 0..20 {
let raw = format!("key_{i}");
let k = Key::new(raw.as_bytes(), Ver::new(txid.fetch_add(1, Relaxed), 0));
let v = Record::remove(1);
let (delta, r) = DeltaView::from_key_val(&mut a, &k, &v, 0, lsn);
node.insert_inplace(
delta.view().as_delta(),
r.as_ref()
.map(|x| x.header().total_size as usize)
.unwrap_or(0),
);
node.save(delta, r);
if node.delta_len() >= CONSOLIDATE_THRESHOLD {
(node, _) = node.compact(&mut a, 3);
}
}
// Insert one more key (key_30).
for i in 30..31 {
let raw = format!("key_{i}");
let k = Key::new(raw.as_bytes(), Ver::new(txid.fetch_add(1, Relaxed), 0));
let v = Record::normal(1, raw.as_bytes());
let (delta, r) = DeltaView::from_key_val(&mut a, &k, &v, 0, lsn);
node.insert_inplace(
delta.view().as_delta(),
r.as_ref()
.map(|x| x.header().total_size as usize)
.unwrap_or(0),
);
node.save(delta, r);
if node.delta_len() >= CONSOLIDATE_THRESHOLD {
(node, _) = node.compact(&mut a, 3);
}
}
// The merged, filtered output must be strictly increasing.
let mut last: Option<LeafSeg<'_>> = None;
let iter = node.leaf_iter(&mut j, 3);
for (k, _) in iter {
if let Some(old) = last {
assert!(old.cmp(&k).is_lt());
}
last = Some(k);
}
}
}