use seize::LocalGuard;
use crate::Permuter;
use crate::alloc_trait::TreeAllocator;
use crate::key::{IKEY_SIZE, MAX_KEY_LENGTH};
use crate::leaf15::{LAYER_KEYLENX, LeafNode15};
use crate::nodeversion::NodeVersion;
use crate::policy::LeafPolicy;
use crate::tree::MassTreeGeneric;
use super::cursor_key::CursorKey;
use super::helper::{KeyIndexedPosition, lower_with_position};
use super::iterator::{KeysIter, RangeBound, RangeIter, ScanEntry, ValuesIter};
use super::traversal::reach_leaf_for_scan;
impl<P, A> MassTreeGeneric<P, A>
where
P: LeafPolicy,
A: TreeAllocator<P>,
{
/// Creates a [`RangeIter`] over this tree limited by the `start` and `end` bounds.
///
/// The guard is handed to `verify_guard` before the iterator is built with
/// `RangeIter::new`. This is the constructor path used by `iter` and by the
/// reverse scans in this file (`scan_rev_batch`, `scan_values_rev`).
///
/// * `start` / `end` - bounds forwarded unchanged to `RangeIter::new`.
/// * `guard` - guard the returned iterator borrows for `'g`.
// NOTE(review): `verify_guard` presumably rejects guards from another collector — confirm.
pub fn range<'a, 'g>(
&'a self,
start: RangeBound<'a>,
end: RangeBound<'a>,
guard: &'g LocalGuard<'a>,
) -> RangeIter<'a, 'g, P, A> {
self.verify_guard(guard);
RangeIter::new(self, start, end, guard)
}
/// Crate-internal variant of `range` that builds the iterator with
/// `RangeIter::new_forward_only` instead of `RangeIter::new`.
///
/// The guard is passed to `verify_guard` first; `start`, `end` and `guard`
/// are then forwarded unchanged. All forward `scan*` helpers in this file go
/// through this method.
pub(crate) fn range_forward<'a, 'g>(
&'a self,
start: RangeBound<'a>,
end: RangeBound<'a>,
guard: &'g LocalGuard<'a>,
) -> RangeIter<'a, 'g, P, A> {
self.verify_guard(guard);
RangeIter::new_forward_only(self, start, end, guard)
}
/// Builds a forward-only [`RangeIter`] that starts from an explicit root
/// pointer rather than from `self`.
///
/// `self` is only used to call `verify_guard`; the iterator itself is
/// constructed by `RangeIter::new_forward_only_from_root` from the arguments.
///
/// * `layer_root` - raw node pointer used as the starting root.
/// * `cursor_key` - cursor handed to the iterator together with `layer_root`.
/// * `start` / `end` - bounds forwarded unchanged.
/// * `guard` - guard the returned iterator borrows for `'g`.
// NOTE(review): callers in this file pass a pointer obtained from
// `descend_prefix_layers` and a cursor shifted once per descended chunk;
// presumably the two must describe the same layer — confirm.
pub(crate) fn range_forward_from_root<'a, 'g>(
&'a self,
layer_root: *const u8,
cursor_key: CursorKey,
start: RangeBound<'a>,
end: RangeBound<'a>,
guard: &'g LocalGuard<'a>,
) -> RangeIter<'a, 'g, P, A> {
self.verify_guard(guard);
RangeIter::new_forward_only_from_root(layer_root, cursor_key, start, end, guard)
}
/// Returns a [`RangeIter`] over the whole tree.
///
/// Equivalent to calling `range` with both bounds set to
/// `RangeBound::Unbounded`.
pub fn iter<'a, 'g>(&'a self, guard: &'g LocalGuard<'a>) -> RangeIter<'a, 'g, P, A> {
self.range(RangeBound::Unbounded, RangeBound::Unbounded, guard)
}
/// Returns a [`KeysIter`] obtained by calling `.keys()` on the unbounded
/// iterator from `iter(guard)`.
pub fn keys<'a, 'g>(&'a self, guard: &'g LocalGuard<'a>) -> KeysIter<'a, 'g, P, A> {
self.iter(guard).keys()
}
/// Returns a [`ValuesIter`] obtained by calling `.values()` on the unbounded
/// iterator from `iter(guard)`.
pub fn values<'a, 'g>(&'a self, guard: &'g LocalGuard<'a>) -> ValuesIter<'a, 'g, P, A> {
self.iter(guard).values()
}
/// Returns the first entry yielded by a full-tree iteration, with its value
/// cloned into an owned `P::Value`.
///
/// A guard is created internally with `self.guard()` and held for the whole
/// call; the entry is looked up through `first_with_guard` and its value is
/// converted with `P::clone_value_from_output`.
///
/// Returns `None` when the iteration yields nothing.
#[must_use]
#[inline]
pub fn first(&self) -> Option<ScanEntry<P::Value>>
where
    P::Value: Clone,
{
    let guard = self.guard();
    let entry = self.first_with_guard(&guard)?;
    // Clone the value while the guard is still alive, then hand over the key.
    let owned_value = P::clone_value_from_output(&entry.value);
    Some(ScanEntry::new(entry.key, owned_value))
}
/// Returns the first item produced by `iter(guard)`, or `None` if that
/// iterator yields nothing.
///
/// The value is returned as `P::Output` without the extra clone performed by
/// `first`.
#[must_use]
#[inline]
pub fn first_with_guard<'a>(&'a self, guard: &LocalGuard<'a>) -> Option<ScanEntry<P::Output>> {
self.iter(guard).next()
}
/// Returns the last entry of a full-tree iteration (the first one yielded from
/// the back), with its value cloned into an owned `P::Value`.
///
/// A guard is created internally with `self.guard()` and held for the whole
/// call; the entry is looked up through `last_with_guard` and its value is
/// converted with `P::clone_value_from_output`.
///
/// Returns `None` when the iteration yields nothing.
#[must_use]
#[inline]
pub fn last(&self) -> Option<ScanEntry<P::Value>>
where
    P::Value: Clone,
{
    let guard = self.guard();
    let entry = self.last_with_guard(&guard)?;
    // Clone the value while the guard is still alive, then hand over the key.
    let owned_value = P::clone_value_from_output(&entry.value);
    Some(ScanEntry::new(entry.key, owned_value))
}
/// Returns the item produced by calling `next_back()` on `iter(guard)`, or
/// `None` if that iterator yields nothing.
///
/// The value is returned as `P::Output` without the extra clone performed by
/// `last`.
#[must_use]
#[inline]
pub fn last_with_guard<'a>(&'a self, guard: &LocalGuard<'a>) -> Option<ScanEntry<P::Output>> {
self.iter(guard).next_back()
}
/// Runs `visitor` over the forward-only range `[start, end]` via
/// `RangeIter::for_each` and returns the `usize` that call reports.
///
/// * `visitor` - called with a byte slice and a `P::Output` per entry.
/// * `guard` - forwarded to `range_forward`.
// NOTE(review): presumably the slice is the entry key, a `false` return from
// `visitor` stops the scan, and the result is the number of entries visited
// (that is how `scan_prefix` treats its visitor) — confirm against
// `RangeIter::for_each`.
pub fn scan<F>(
&self,
start: RangeBound<'_>,
end: RangeBound<'_>,
visitor: F,
guard: &LocalGuard<'_>,
) -> usize
where
F: FnMut(&[u8], P::Output) -> bool,
{
self.range_forward(start, end, guard).for_each(visitor)
}
/// Same inputs as `scan`, but drives the forward-only range with
/// `RangeIter::for_each_intra_leaf_batch` and returns the `usize` it reports.
///
/// * `visitor` - called with a byte slice and a `P::Output` per entry.
/// * `guard` - forwarded to `range_forward`.
// NOTE(review): the name suggests entries are processed leaf-by-leaf in
// batches; the batching itself lives in `RangeIter` and is not visible here.
pub fn scan_intra_leaf_batch<F>(
&self,
start: RangeBound<'_>,
end: RangeBound<'_>,
visitor: F,
guard: &LocalGuard<'_>,
) -> usize
where
F: FnMut(&[u8], P::Output) -> bool,
{
self.range_forward(start, end, guard)
.for_each_intra_leaf_batch(visitor)
}
/// Runs `visitor` over the range via
/// `RangeIter::rev_for_each_intra_leaf_batch` and returns the `usize` it
/// reports.
///
/// Unlike the forward scans, the iterator is built with `range` (not
/// `range_forward`), i.e. through `RangeIter::new`.
///
/// * `visitor` - called with a byte slice and a `P::Output` per entry.
// NOTE(review): presumably entries are visited in descending key order and a
// `false` return stops the scan — confirm against `RangeIter`.
pub fn scan_rev_batch<F>(
&self,
start: RangeBound<'_>,
end: RangeBound<'_>,
visitor: F,
guard: &LocalGuard<'_>,
) -> usize
where
F: FnMut(&[u8], P::Output) -> bool,
{
self.range(start, end, guard)
.rev_for_each_intra_leaf_batch(visitor)
}
/// Values-only forward scan: runs `visitor` over the forward-only range via
/// `RangeIter::for_each_values_batch` and returns the `usize` it reports.
///
/// * `visitor` - called with a `P::Output` per entry; it receives no key.
/// * `guard` - forwarded to `range_forward`.
// NOTE(review): presumably a `false` return stops the scan and the result is
// the number of values visited — confirm against `RangeIter`.
pub fn scan_values<F>(
&self,
start: RangeBound<'_>,
end: RangeBound<'_>,
visitor: F,
guard: &LocalGuard<'_>,
) -> usize
where
F: FnMut(P::Output) -> bool,
{
self.range_forward(start, end, guard)
.for_each_values_batch(visitor)
}
/// Values-only reverse scan: runs `visitor` over the range via
/// `RangeIter::rev_for_each_values_batch` and returns the `usize` it reports.
///
/// The iterator is built with `range` (not `range_forward`), i.e. through
/// `RangeIter::new`.
///
/// * `visitor` - called with a `P::Output` per entry; it receives no key.
// NOTE(review): presumably values are visited in descending key order and a
// `false` return stops the scan — confirm against `RangeIter`.
pub fn scan_values_rev<F>(
&self,
start: RangeBound<'_>,
end: RangeBound<'_>,
visitor: F,
guard: &LocalGuard<'_>,
) -> usize
where
F: FnMut(P::Output) -> bool,
{
self.range(start, end, guard)
.rev_for_each_values_batch(visitor)
}
/// Visits the entries selected by `scan_prefix_inner` for `prefix`, passing a
/// key slice and the value to `visitor`.
///
/// If `scan_prefix_inner` supplies an exact-match value, it is reported first
/// with `prefix` itself as the key; a `false` return from `visitor` at that
/// point ends the scan. The remaining entries are driven through
/// `RangeIter::for_each_intra_leaf_batch`.
///
/// Returns the exact-match visit (0 or 1) plus whatever the batch call
/// reports.
///
/// # Panics
///
/// Panics (inside `scan_prefix_inner`) if `prefix` is longer than
/// `MAX_KEY_LENGTH`.
pub fn scan_prefix<F>(&self, prefix: &[u8], mut visitor: F, guard: &LocalGuard<'_>) -> usize
where
    F: FnMut(&[u8], P::Output) -> bool,
{
    self.verify_guard(guard);
    self.scan_prefix_inner(prefix, guard, |exact_value, iter| {
        // No exact match: the result is just what the range scan reports.
        let Some(value) = exact_value else {
            return iter.for_each_intra_leaf_batch(visitor);
        };
        // The exact match always counts as one visit, even when it stops the scan.
        if visitor(prefix, value) {
            1 + iter.for_each_intra_leaf_batch(visitor)
        } else {
            1
        }
    })
}
/// Values-only counterpart of `scan_prefix`: visits the entries selected by
/// `scan_prefix_inner` for `prefix`, passing only the value to `visitor`.
///
/// If `scan_prefix_inner` supplies an exact-match value, it is reported
/// first; a `false` return from `visitor` at that point ends the scan. The
/// remaining entries are driven through `RangeIter::for_each_values_batch`.
///
/// Returns the exact-match visit (0 or 1) plus whatever the batch call
/// reports.
///
/// # Panics
///
/// Panics (inside `scan_prefix_inner`) if `prefix` is longer than
/// `MAX_KEY_LENGTH`.
pub fn scan_prefix_values<F>(
    &self,
    prefix: &[u8],
    mut visitor: F,
    guard: &LocalGuard<'_>,
) -> usize
where
    F: FnMut(P::Output) -> bool,
{
    self.verify_guard(guard);
    self.scan_prefix_inner(prefix, guard, |exact_value, iter| {
        // No exact match: the result is just what the range scan reports.
        let Some(value) = exact_value else {
            return iter.for_each_values_batch(visitor);
        };
        // The exact match always counts as one visit, even when it stops the scan.
        if visitor(value) {
            1 + iter.for_each_values_batch(visitor)
        } else {
            1
        }
    })
}
/// Shared driver for the prefix scans: picks the iterator (and optional
/// exact-match value) for `prefix` and hands both to `scan_fn`.
///
/// Three cases, decided by how many full `IKEY_SIZE` chunks of `prefix`
/// `descend_prefix_layers` managed to follow:
///
/// 1. Nothing descended: scan `[prefix, upper)` from the tree root with
///    `range_forward`; no exact-match value.
/// 2. Descended and `prefix` ends exactly on the descended chunk boundary:
///    look `prefix` itself up with `get_with_guard` and scan the reached
///    layer root with both bounds unbounded.
/// 3. Descended but `prefix` extends past the descended chunks: scan
///    `[prefix, upper)` starting from the reached layer root; no exact-match
///    value.
///
/// `upper` is the bound produced by `compute_prefix_upper_bound_into`; when
/// that returns `None` the end bound is `RangeBound::Unbounded`.
///
/// Returns whatever `scan_fn` returns.
///
/// # Panics
///
/// Panics if `prefix.len()` exceeds `MAX_KEY_LENGTH`.
#[inline]
fn scan_prefix_inner(
    &self,
    prefix: &[u8],
    guard: &LocalGuard<'_>,
    scan_fn: impl FnOnce(Option<P::Output>, RangeIter<'_, '_, P, A>) -> usize,
) -> usize {
    assert!(
        prefix.len() <= MAX_KEY_LENGTH,
        "key length {} exceeds maximum {}",
        prefix.len(),
        MAX_KEY_LENGTH
    );

    // Exclusive upper bound for "starts with `prefix`", kept in a stack buffer.
    let mut upper_buf = [0u8; MAX_KEY_LENGTH];
    let end: RangeBound<'_> = match compute_prefix_upper_bound_into(prefix, &mut upper_buf) {
        Some(len) => RangeBound::Excluded(&upper_buf[..len]),
        None => RangeBound::Unbounded,
    };

    // A descent that followed zero chunks is treated the same as no descent.
    let descent = self
        .descend_prefix_layers(prefix, guard)
        .filter(|&(_, chunks)| chunks > 0);
    let Some((layer_root, descended_chunks)) = descent else {
        let iter = self.range_forward(RangeBound::Included(prefix), end, guard);
        return scan_fn(None, iter);
    };

    // Advance the cursor once per descended chunk so it matches `layer_root`.
    let mut cursor = CursorKey::from_slice(prefix);
    for _ in 0..descended_chunks {
        if cursor.has_suffix() {
            cursor.shift();
        } else {
            cursor.shift_clear();
        }
    }

    let (exact_value, start, end) = if prefix.len() == descended_chunks * IKEY_SIZE {
        // The prefix is fully consumed by the descent: report the key equal to
        // the prefix separately and scan the reached layer without bounds.
        (
            self.get_with_guard(prefix, guard),
            RangeBound::Unbounded,
            RangeBound::Unbounded,
        )
    } else {
        (None, RangeBound::Included(prefix), end)
    };
    let iter = self.range_forward_from_root(layer_root, cursor, start, end, guard);
    scan_fn(exact_value, iter)
}
/// Collects every item yielded by `iter(guard)` into a `Vec` of
/// [`ScanEntry`] values, in iteration order.
pub fn collect_entries(&self, guard: &LocalGuard<'_>) -> Vec<ScanEntry<P::Output>> {
self.iter(guard).collect()
}
/// Collects every key yielded by `keys(guard)` into a `Vec` of owned byte
/// vectors, in iteration order.
pub fn collect_keys(&self, guard: &LocalGuard<'_>) -> Vec<Vec<u8>> {
self.keys(guard).collect()
}
/// Collects every value yielded by `values(guard)` into a `Vec<P::Output>`,
/// in iteration order.
pub fn collect_values(&self, guard: &LocalGuard<'_>) -> Vec<P::Output> {
self.values(guard).collect()
}
}
impl<P, A> MassTreeGeneric<P, A>
where
    P: LeafPolicy,
    A: TreeAllocator<P>,
{
    /// Follows layer pointers for as many full `IKEY_SIZE` chunks of `prefix`
    /// as possible.
    ///
    /// Starting from the root returned by `load_root_ptr_generic`, each full
    /// chunk is turned into an ikey with `read_full_chunk_ikey` and looked up
    /// with `find_layer_child_root`. The walk stops at the first chunk for
    /// which no child root is returned.
    ///
    /// Returns `None` when `prefix` is shorter than one full chunk. Otherwise
    /// returns the last root reached together with the number of chunks
    /// descended (which may be `0`).
    fn descend_prefix_layers(
        &self,
        prefix: &[u8],
        guard: &LocalGuard<'_>,
    ) -> Option<(*const u8, usize)> {
        let chunk_count: usize = prefix.len() / IKEY_SIZE;
        if chunk_count == 0 {
            return None;
        }

        let mut layer_root = self.load_root_ptr_generic(guard);
        for depth in 0..chunk_count {
            let ikey = read_full_chunk_ikey(prefix, depth);
            match find_layer_child_root::<P>(layer_root, ikey, guard) {
                Some(child_root) => layer_root = child_root,
                // Could not go deeper: report how far we got.
                None => return Some((layer_root, depth)),
            }
        }
        Some((layer_root, chunk_count))
    }
}
/// Looks up, below `root`, the slot whose ikey equals `chunk_ikey` and which
/// holds a layer pointer, and returns that pointer.
///
/// Returns `None` when any of the following holds:
/// * `root` is null, or `reach_leaf_for_scan` returns a null leaf;
/// * the leaf's stable version is a deleted version;
/// * `lower_with_position` reports no position (`kx.p` is `None`);
/// * no slot with a matching ikey has `keylenx >= LAYER_KEYLENX` and a
///   non-empty value;
/// * the leaf version changed between the initial snapshot and the read of
///   the layer pointer;
/// * the loaded layer pointer is null.
///
/// Callers in this file treat `None` as "stop descending", not as an error.
fn find_layer_child_root<P>(
root: *const u8,
chunk_ikey: u64,
guard: &LocalGuard<'_>,
) -> Option<*const u8>
where
P: LeafPolicy,
{
if root.is_null() {
return None;
}
// Search key for this layer: the 8 big-endian bytes of the chunk ikey.
let cursor = CursorKey::from_slice(&chunk_ikey.to_be_bytes());
let leaf_ptr: *mut LeafNode15<P> = reach_leaf_for_scan::<P>(root, &cursor, guard);
if leaf_ptr.is_null() {
return None;
}
// SAFETY: `leaf_ptr` was checked to be non-null above.
// NOTE(review): validity of the pointee presumably relies on `guard` keeping
// the node alive for the duration of this call — confirm against
// `reach_leaf_for_scan`.
let leaf: &LeafNode15<P> = unsafe { &*leaf_ptr };
// Version snapshot; every later read is validated against it.
let version: u32 = leaf.version().stable();
if NodeVersion::is_deleted_version(version) {
return None;
}
let perm: Permuter = leaf.permutation();
let kx: KeyIndexedPosition = lower_with_position(&cursor, leaf, &perm);
// Only the presence of a position matters here; its value is discarded.
let _ = kx.p?;
// Walk the slots with the same ikey, starting at the reported index.
let mut i: usize = kx.i;
while i < perm.size() {
let slot: usize = perm.get(i);
let slot_ikey: u64 = leaf.ikey_relaxed(slot);
if slot_ikey != chunk_ikey {
break;
}
// A slot qualifies when its keylenx marks it as a layer and it holds a value.
if leaf.keylenx_relaxed(slot) >= LAYER_KEYLENX && !leaf.is_value_empty_relaxed(slot) {
let layer_ptr: *const u8 = leaf.load_layer_raw(slot).cast_const();
// Discard the pointer if the leaf changed since the snapshot.
if leaf.version().has_changed(version) {
return None;
}
if !layer_ptr.is_null() {
return Some(layer_ptr);
}
return None;
}
i += 1;
}
// NOTE(review): both outcomes below return `None`, so this check does not
// affect the result; presumably kept for symmetry with the check above.
if leaf.version().has_changed(version) {
return None;
}
None
}
/// Reads chunk number `chunk_idx` of `prefix` (each chunk is `IKEY_SIZE`
/// bytes) as a big-endian `u64`.
///
/// # Panics
///
/// Panics if `prefix` does not contain the whole chunk, i.e. if
/// `(chunk_idx + 1) * IKEY_SIZE > prefix.len()`.
#[expect(clippy::indexing_slicing, reason = "chunk bounds are caller-checked")]
fn read_full_chunk_ikey(prefix: &[u8], chunk_idx: usize) -> u64 {
    let offset: usize = chunk_idx * IKEY_SIZE;
    let chunk: &[u8] = &prefix[offset..offset + IKEY_SIZE];
    // `chunk` is exactly `IKEY_SIZE` bytes long, so the copy cannot fail.
    let mut raw = [0u8; IKEY_SIZE];
    raw.copy_from_slice(chunk);
    u64::from_be_bytes(raw)
}
/// Writes into `buf` the smallest byte string that is greater than every key
/// starting with `prefix`, and returns its length.
///
/// The whole prefix is copied into `buf`; the last byte below `0xFF` is then
/// incremented and the bound ends right after it.
///
/// Returns `None` when no such bound exists: `prefix` is empty or consists
/// only of `0xFF` bytes.
///
/// # Panics
///
/// Panics if `prefix.len()` exceeds `MAX_KEY_LENGTH`.
#[expect(clippy::indexing_slicing, reason = "Checked")]
fn compute_prefix_upper_bound_into(prefix: &[u8], buf: &mut [u8; MAX_KEY_LENGTH]) -> Option<usize> {
    let len = prefix.len();
    assert!(
        len <= MAX_KEY_LENGTH,
        "key length {} exceeds maximum {}",
        len,
        MAX_KEY_LENGTH
    );

    let staged = &mut buf[..len];
    staged.copy_from_slice(prefix);
    // Rightmost byte that can still be incremented; `None` for empty or all-0xFF input.
    let bump_at = staged.iter().rposition(|&byte| byte < 0xFF)?;
    staged[bump_at] += 1;
    Some(bump_at + 1)
}
#[cfg(test)]
mod unit_tests;