use std::cmp::Ordering;
use std::ptr as StdPtr;
use seize::LocalGuard;
use std::mem::MaybeUninit;
use crate::hints::likely;
use crate::key::IKEY_SIZE;
use crate::leaf_trait::TreeLeafNode;
use crate::leaf15::LeafNode15;
use crate::leaf15::{KSUF_KEYLENX, LAYER_KEYLENX};
use crate::link::Linker;
use crate::nodeversion::NodeVersion;
use crate::policy::LeafPolicy;
use crate::policy::RefPolicy as RefLeafPolicy;
use crate::prefetch::prefetch_read;
use super::batch_common::{
BatchCtx, CloneEmitter, CopySlotVisitor, Forward, PtrEmitter, RefSlotVisitor, ScanEmitter,
process_batch_keyed, process_batch_values,
};
#[cfg(debug_assertions)]
use super::cursor_key::CursorDebugState;
use super::cursor_key::CursorKey;
use super::find::LeafBatchResult;
use super::helper::{
KeyIndexedPosition, initial_ksuf_match, lower_with_position, lower_with_suffix,
};
use super::iterator::RangeBound;
use super::iterator::iter_flags::ForwardFlags;
use super::scan_state::{
LayerContext, LayerStack, ScanSnapshot, ScanSnapshotPtr, ScanStackElement, ScanState,
StepResult,
};
use super::traversal::reach_leaf_for_scan;
/// Mutable state machine for a forward (ascending-key) scan.
///
/// The scan walks leaves left to right inside one layer, descends into a
/// sub-layer when a slot holds a layer pointer, and pops back to the parent
/// layer (via `layer_stack`) once the current layer is exhausted.
pub struct ForwardScanCtx<P: LeafPolicy> {
    /// Position within the current layer: root pointer, current leaf, and
    /// cached version/permutation/slot-index state.
    pub(crate) stack: ScanStackElement<P>,
    /// Saved parent-layer positions, resumed when a sub-layer is exhausted.
    pub(crate) layer_stack: LayerStack<P>,
    /// Scan cursor: the key bytes the scan is currently positioned at.
    pub(crate) cursor_key: CursorKey,
    /// Current state of the scan state machine.
    pub(crate) state: ScanState,
    /// Snapshot captured by the last find, pending emission to the caller.
    pub(crate) snapshot: Option<ScanSnapshot<P>>,
    /// Owner of the most recently emitted output; kept alive so the
    /// by-reference path can hand out a `&P::Value` derived from it.
    pub(crate) last_output: Option<P::Output>,
    /// Scratch storage handed to `output_as_ref_sound` / `resolve_value_ref`
    /// when materializing a `&P::Value`.
    pub(crate) scratch_value: MaybeUninit<P::Value>,
    /// Packed flags: emit-equal, single-layer mode, pending duplicate check,
    /// and exhaustion.
    pub(crate) flags: ForwardFlags,
    // --- Debug-only instrumentation for ordering-violation diagnosis ---
    /// Last key accepted by `assert_ordering` (emissions must be strictly
    /// increasing).
    #[cfg(debug_assertions)]
    pub(crate) debug_last_emitted_key: Option<Vec<u8>>,
    /// Cursor state captured at the last emission.
    #[cfg(debug_assertions)]
    pub(crate) debug_last_cursor_state: Option<CursorDebugState>,
    /// Ring buffer holding the 32 most recent state-transition descriptions.
    #[cfg(debug_assertions)]
    pub(crate) debug_transition_history: [Option<String>; 32],
    /// Next write index into `debug_transition_history` (wraps at 32).
    #[cfg(debug_assertions)]
    pub(crate) debug_transition_idx: usize,
    /// Number of valid ring-buffer entries (saturates at 32).
    #[cfg(debug_assertions)]
    pub(crate) debug_transition_count: usize,
}
impl<P: LeafPolicy> ForwardScanCtx<P> {
    /// Creates a scan context rooted at `root` and positioned at `cursor_key`.
    ///
    /// `emit_equal` makes the lower bound inclusive (a key equal to the
    /// cursor is emitted); `single_layer_mode` enables the fast path that
    /// assumes no sub-layers and no long keys until proven otherwise.
    #[inline]
    pub fn new(
        root: *const u8,
        cursor_key: CursorKey,
        emit_equal: bool,
        single_layer_mode: bool,
    ) -> Self {
        Self {
            stack: ScanStackElement::new(root),
            layer_stack: LayerStack::new(),
            cursor_key,
            state: ScanState::FindNext,
            snapshot: None,
            last_output: None,
            scratch_value: MaybeUninit::uninit(),
            flags: ForwardFlags::with_values(emit_equal, single_layer_mode),
            #[cfg(debug_assertions)]
            debug_last_emitted_key: None,
            #[cfg(debug_assertions)]
            debug_last_cursor_state: None,
            #[cfg(debug_assertions)]
            debug_transition_history: [const { None }; 32],
            #[cfg(debug_assertions)]
            debug_transition_idx: 0,
            #[cfg(debug_assertions)]
            debug_transition_count: 0,
        }
    }
    /// Debug-only check that keys are emitted in strictly increasing order.
    ///
    /// On a violation, dumps both cursor states and the recent transition
    /// ring buffer to stderr, then panics. Otherwise records `key` plus the
    /// current cursor state and logs an `EMIT` transition.
    #[cfg(debug_assertions)]
    #[inline]
    #[allow(
        clippy::panic,
        reason = "Intentional panic for debug-only ordering violation detection"
    )]
    pub(crate) fn assert_ordering(&mut self, key: &[u8]) {
        if let Some(ref last_key) = self.debug_last_emitted_key
            && key <= last_key.as_slice()
        {
            let current_state: CursorDebugState = self.cursor_key.debug_state();
            let last_state: Option<&CursorDebugState> = self.debug_last_cursor_state.as_ref();
            eprintln!("\n=== ORDERING VIOLATION DETECTED (batch path) ===");
            eprintln!("Current key: {:?}", String::from_utf8_lossy(key));
            eprintln!("Last key: {:?}", String::from_utf8_lossy(last_key));
            eprintln!("Current key bytes: {key:?}");
            eprintln!("Last key bytes: {last_key:?}");
            eprintln!("Current cursor: {current_state}");
            if let Some(last) = last_state {
                eprintln!("Last cursor: {last}");
            }
            eprintln!("\n--- Recent state transitions ---");
            // Replay the ring buffer oldest-first: once it has wrapped
            // (count == 32), the oldest entry sits at the write index.
            let count = self.debug_transition_count;
            let start = if count < 32 {
                0
            } else {
                self.debug_transition_idx
            };
            for i in 0..count {
                let idx = (start + i) % 32;
                if let Some(ref transition) = self.debug_transition_history[idx] {
                    eprintln!("[{i}] {transition}");
                }
            }
            eprintln!("--- End transitions ---");
            eprintln!("=== END ORDERING VIOLATION ===\n");
            panic!(
                "Scan ordering violation: emitted key {:?} is not > last emitted key {:?}",
                String::from_utf8_lossy(key),
                String::from_utf8_lossy(last_key)
            );
        }
        self.debug_last_emitted_key = Some(key.to_vec());
        self.debug_last_cursor_state = Some(self.cursor_key.debug_state());
        self.record_transition(format!(
            "EMIT: {:?} cursor={}",
            String::from_utf8_lossy(key),
            self.cursor_key.debug_state()
        ));
    }
    /// Debug-only: appends `description` to the transition ring buffer,
    /// overwriting the oldest entry once 32 transitions have been recorded.
    #[cfg(debug_assertions)]
    #[inline]
    pub(crate) fn record_transition(&mut self, description: String) {
        self.debug_transition_history[self.debug_transition_idx] = Some(description);
        self.debug_transition_idx = (self.debug_transition_idx + 1) % 32;
        if self.debug_transition_count < 32 {
            self.debug_transition_count += 1;
        }
    }
}
impl<P: LeafPolicy> ForwardScanCtx<P> {
    /// Positions the scan at the first candidate slot for `cursor_key`.
    ///
    /// Descends from `root` to the leaf covering the cursor key, takes a
    /// stable version snapshot, and locates the cursor's position within
    /// the leaf. When the cursor matches a slot exactly,
    /// [`Self::handle_initial_match`] decides whether to descend into a
    /// sub-layer, emit the value, or keep searching.
    ///
    /// Returns the next state plus an optional snapshot to emit. `Retry`
    /// means the leaf was deleted or changed underneath us; `Up` means no
    /// leaf was reachable.
    pub fn find_initial(
        &mut self,
        root: *const u8,
        emit_equal: bool,
        guard: &LocalGuard<'_>,
    ) -> (ScanState, Option<ScanSnapshot<P>>)
    where
        P::Output: Clone,
    {
        self.stack.set_root(root);
        let leaf_ptr: *mut LeafNode15<P> = reach_leaf_for_scan::<P>(root, &self.cursor_key, guard);
        if leaf_ptr.is_null() {
            return (ScanState::Up, None);
        }
        self.stack.set_leaf(leaf_ptr);
        let leaf: &LeafNode15<P> = unsafe { &*leaf_ptr };
        let version: u32 = leaf.version().stable();
        if NodeVersion::is_deleted_version(version) {
            return (ScanState::Retry, None);
        }
        let perm: <LeafNode15<P> as TreeLeafNode<P>>::Perm = leaf.permutation();
        let kx: KeyIndexedPosition = lower_with_position(&self.cursor_key, leaf, &perm);
        let (next_state, snapshot) = match kx.p {
            // Cursor matched slot `slot`; `kx.i` is its scan position.
            Some(slot) => Self::handle_initial_match(
                leaf,
                slot,
                &mut self.cursor_key,
                &mut self.stack,
                emit_equal,
                version,
                &perm,
                kx.i,
            ),
            None => (ScanState::FindNext, None),
        };
        // Validate that everything read above is consistent with `version`.
        if leaf.version().has_changed(version) {
            return (ScanState::Retry, None);
        }
        // Resume after the matched slot, or at the insertion point otherwise.
        let final_pos = if kx.p.is_some() { kx.i + 1 } else { kx.i };
        self.stack.update_state(version, perm, final_pos);
        (next_state, snapshot)
    }
    /// Handles an exact slot match found by [`Self::find_initial`].
    ///
    /// Dispatches on the slot's `keylenx`:
    /// - layer pointer (`>= LAYER_KEYLENX`): prefetch and descend (`Down`);
    /// - suffixed key (`== KSUF_KEYLENX`): compare the stored suffix with the
    ///   cursor suffix and emit when `initial_ksuf_match` allows it;
    /// - inline key: emit only when `emit_equal` (inclusive lower bound).
    #[expect(clippy::too_many_arguments, reason = "Internals")]
    #[inline(always)]
    fn handle_initial_match(
        leaf: &LeafNode15<P>,
        slot: usize,
        cursor_key: &mut CursorKey,
        stack: &mut ScanStackElement<P>,
        emit_equal: bool,
        _version: u32,
        _perm: &<LeafNode15<P> as TreeLeafNode<P>>::Perm,
        _pos: usize,
    ) -> (ScanState, Option<ScanSnapshot<P>>)
    where
        P::Output: Clone,
    {
        let keylenx: u8 = leaf.keylenx_relaxed(slot);
        if keylenx >= LAYER_KEYLENX {
            let slot_ikey: u64 = leaf.ikey_relaxed(slot);
            let layer_ptr: *mut u8 = leaf.load_layer_raw(slot);
            cursor_key.assign_store_ikey(slot_ikey);
            prefetch_read(layer_ptr);
            stack.set_root(layer_ptr);
            return (ScanState::Down, None);
        }
        if keylenx == KSUF_KEYLENX
            && let Some(stored_suffix) = leaf.ksuf(slot)
        {
            let cursor_suffix: &[u8] = cursor_key.suffix();
            let cmp = stored_suffix.cmp(cursor_suffix);
            if initial_ksuf_match(cmp, emit_equal)
                && let Some(output) = leaf.load_value(slot)
            {
                // Record the full matched key (ikey + suffix) in the cursor.
                let key_len = IKEY_SIZE + stored_suffix.len();
                cursor_key.assign_store_ikey(leaf.ikey_relaxed(slot));
                let _ = cursor_key.assign_store_suffix(stored_suffix);
                cursor_key.assign_store_length(key_len);
                return (ScanState::Emit, Some(ScanSnapshot::new(output, key_len)));
            }
            return (ScanState::FindNext, None);
        }
        if emit_equal && let Some(output) = leaf.load_value(slot) {
            // Inline key: `keylenx` is the key length itself.
            let key_len = keylenx as usize;
            cursor_key.assign_store_ikey(leaf.ikey_relaxed(slot));
            cursor_key.assign_store_length(key_len);
            return (ScanState::Emit, Some(ScanSnapshot::new(output, key_len)));
        }
        (ScanState::FindNext, None)
    }
}
impl<P: LeafPolicy> ForwardScanCtx<P> {
    /// Advances one step, emitting cloned/owned outputs.
    #[inline]
    pub fn find_next(&mut self, guard: &LocalGuard<'_>) -> (ScanState, Option<ScanSnapshot<P>>)
    where
        P::Output: Clone,
    {
        self.find_next_generic::<CloneEmitter>(guard, false)
    }
    /// Like [`Self::find_next`], but first skips slots whose key is not
    /// greater than the cursor (used after a retry/descent may have
    /// repositioned the scan on an already-emitted key).
    #[inline]
    pub fn find_next_with_dup_check(
        &mut self,
        guard: &LocalGuard<'_>,
    ) -> (ScanState, Option<ScanSnapshot<P>>)
    where
        P::Output: Clone,
    {
        self.find_next_generic::<CloneEmitter>(guard, true)
    }
    /// Advances one step, emitting raw value pointers.
    #[inline]
    pub fn find_next_ptr(
        &mut self,
        guard: &LocalGuard<'_>,
    ) -> (ScanState, Option<ScanSnapshotPtr<P::Value>>) {
        self.find_next_generic::<PtrEmitter>(guard, false)
    }
    /// Pointer-emitting variant of [`Self::find_next_with_dup_check`].
    #[inline]
    pub fn find_next_with_dup_check_ptr(
        &mut self,
        guard: &LocalGuard<'_>,
    ) -> (ScanState, Option<ScanSnapshotPtr<P::Value>>) {
        self.find_next_generic::<PtrEmitter>(guard, true)
    }
    /// Core single-slot advance, generic over the emission mode `E`.
    ///
    /// Returns the next state: `Up` (no leaf), `Retry` (leaf deleted or
    /// slot ikeys moved backwards, i.e. concurrent reorganization),
    /// `Down` (slot holds a sub-layer), `FindNext` (slot skipped — caller
    /// loops), or `Emit` with a snapshot from `E::emit_value`.
    #[inline]
    fn find_next_generic<E: ScanEmitter<P>>(
        &mut self,
        guard: &LocalGuard<'_>,
        needs_duplicate_check: bool,
    ) -> (ScanState, Option<E::Snapshot>) {
        if self.stack.is_null() {
            return (ScanState::Up, None);
        }
        let leaf: &LeafNode15<P> = unsafe { self.stack.leaf_ref() };
        if leaf.version().is_deleted() {
            return (ScanState::Retry, None);
        }
        // No slot left at the current position: step to the next leaf.
        let Some(slot) = self.stack.kp() else {
            return self.advance_leaf_generic::<E>(guard);
        };
        let slot_ikey: u64 = leaf.ikey_relaxed(slot);
        let slot_keylenx: u8 = leaf.keylenx_relaxed(slot);
        leaf.prefetch_value(slot);
        // Ikeys must be non-decreasing along the scan; a smaller ikey means
        // the leaf was reorganized concurrently.
        if slot_ikey < self.stack.last_ikey() {
            return (ScanState::Retry, None);
        }
        if needs_duplicate_check {
            // Skip any slot whose key is <= the cursor (already emitted).
            let cmp: Ordering = self.cursor_key.compare(slot_ikey, slot_keylenx as usize);
            let is_dup: bool = if likely(cmp == Ordering::Less) {
                false
            } else if cmp == Ordering::Greater {
                true
            } else if slot_keylenx == KSUF_KEYLENX && self.cursor_key.has_suffix() {
                // Equal prefix on both sides: decide by suffix comparison.
                leaf.ksuf(slot).is_none_or(|stored_suffix| {
                    self.cursor_key.compare_suffix(stored_suffix) != Ordering::Less
                })
            } else {
                true
            };
            if is_dup {
                self.stack.next();
                return (ScanState::FindNext, None);
            }
        }
        if slot_keylenx >= LAYER_KEYLENX {
            // Sub-layer: remember where we were, then descend.
            let layer_ptr: *mut u8 = leaf.load_layer_raw(slot);
            self.layer_stack
                .push(LayerContext::new(self.stack.root(), self.stack.leaf_ptr()));
            self.cursor_key.assign_store_ikey(slot_ikey);
            prefetch_read(layer_ptr);
            self.stack.set_root(layer_ptr);
            return (ScanState::Down, None);
        }
        // Record the slot's full key into the cursor and compute its length.
        let key_len: usize = if slot_keylenx == KSUF_KEYLENX {
            if let Some(suffix) = leaf.ksuf(slot) {
                let suffix_len: usize = suffix.len();
                self.cursor_key.assign_store_ikey(slot_ikey);
                let _ = self.cursor_key.assign_store_suffix(suffix);
                self.cursor_key.assign_store_length(IKEY_SIZE + suffix_len);
                IKEY_SIZE + suffix_len
            } else {
                // No stored suffix available; fall back to the bare ikey.
                self.cursor_key.assign_store_ikey(slot_ikey);
                self.cursor_key.assign_store_length(IKEY_SIZE);
                IKEY_SIZE
            }
        } else {
            let len: usize = slot_keylenx as usize;
            self.cursor_key.assign_store_ikey(slot_ikey);
            self.cursor_key.assign_store_length(len);
            len
        };
        // Nothing to emit for this slot (emitter returned None): skip it.
        let Some(snapshot) = E::emit_value(leaf, slot, key_len) else {
            self.stack.next();
            return (ScanState::FindNext, None);
        };
        self.cursor_key.mark_key_complete();
        self.stack.set_last_ikey(slot_ikey);
        self.stack.next();
        (ScanState::Emit, Some(snapshot))
    }
}
impl<P: LeafPolicy> ForwardScanCtx<P> {
    /// Single-layer fast-path advance: assumes every key fits in the 8-byte
    /// ikey (no suffixes, no sub-layers), so no layer stack bookkeeping and
    /// ikey-only duplicate checks are needed.
    ///
    /// If the assumption is violated (`slot_keylenx > IKEY_SIZE`), the mode
    /// is disabled and `FindNext` is returned *without* advancing, so the
    /// caller re-processes the same slot on the general path. Unlike the
    /// general path, end-of-chain shows up as `FindNext` with a null leaf
    /// (the caller checks `stack.is_null()`), not `Up`.
    #[inline]
    #[expect(clippy::cast_possible_truncation)]
    pub fn find_next_single_layer_ptr(
        &mut self,
        guard: &LocalGuard<'_>,
        needs_duplicate_check: bool,
    ) -> (ScanState, Option<ScanSnapshotPtr<P::Value>>) {
        if self.stack.is_null() {
            return (ScanState::FindNext, None);
        }
        let leaf: &LeafNode15<P> = unsafe { self.stack.leaf_ref() };
        if leaf.version().is_deleted() {
            return (ScanState::Retry, None);
        }
        // Extra version re-validation against the cached version.
        let version: u32 = self.stack.version();
        if leaf.version().has_changed(version) {
            return (ScanState::Retry, None);
        }
        let Some(slot) = self.stack.kp() else {
            return self.advance_leaf_single_layer(guard);
        };
        let slot_ikey: u64 = leaf.ikey_relaxed(slot);
        let slot_keylenx: u8 = leaf.keylenx_relaxed(slot);
        leaf.prefetch_value(slot);
        // Monotonicity check: a smaller ikey means concurrent reorganization.
        if slot_ikey < self.stack.last_ikey() {
            return (ScanState::Retry, None);
        }
        if needs_duplicate_check {
            // Ikey-only comparison suffices: this mode assumes no suffixes.
            let cmp: Ordering = self.cursor_key.compare(slot_ikey, slot_keylenx as usize);
            let is_dup: bool = !likely(cmp == Ordering::Less);
            if is_dup {
                self.stack.next();
                return (ScanState::FindNext, None);
            }
        }
        if slot_keylenx >= LAYER_KEYLENX {
            // Sub-layer encountered; caller handles the descent bookkeeping.
            self.cursor_key.assign_store_ikey(slot_ikey);
            let layer_ptr: *mut u8 = leaf.load_layer_raw(slot);
            prefetch_read(layer_ptr);
            return (ScanState::Down, None);
        }
        if slot_keylenx > IKEY_SIZE as u8 {
            // Key longer than the ikey: the fast-path assumption is broken.
            // Leave position unchanged and let the general path take over.
            self.flags.disable_single_layer_mode();
            return (ScanState::FindNext, None);
        }
        let slot_ptr: *mut u8 = leaf.load_value_raw(slot);
        if slot_ptr.is_null() {
            // Empty value slot: skip.
            self.stack.next();
            return (ScanState::FindNext, None);
        }
        let key_len: usize = slot_keylenx as usize;
        self.cursor_key.assign_store_ikey(slot_ikey);
        self.cursor_key.assign_store_length(key_len);
        self.cursor_key.mark_key_complete();
        self.stack.set_last_ikey(slot_ikey);
        self.stack.next();
        (
            ScanState::Emit,
            Some(ScanSnapshotPtr::from_raw(slot_ptr, key_len)),
        )
    }
    /// Fast-path leaf advance: follows the next pointer, handling an
    /// in-progress split (marked pointer) by waiting and retrying. At the
    /// end of the chain it nulls the leaf and returns `FindNext` so the
    /// caller's null check terminates the scan.
    #[inline(always)]
    fn advance_leaf_single_layer(
        &mut self,
        guard: &LocalGuard<'_>,
    ) -> (ScanState, Option<ScanSnapshotPtr<P::Value>>) {
        let leaf: &LeafNode15<P> = unsafe { self.stack.leaf_ref() };
        let version: u32 = self.stack.version();
        let next_raw: *mut LeafNode15<P> = leaf.next_raw(guard);
        if Linker::is_marked(next_raw) {
            // Split in progress: wait for it to finish, then retry.
            leaf.wait_for_split();
            return (ScanState::Retry, None);
        }
        if leaf.version().has_changed(version) {
            return (ScanState::Retry, None);
        }
        // Strip the low-order mark bit to recover the real pointer.
        let next: *mut LeafNode15<P> = next_raw.map_addr(|addr| addr & !1);
        if next.is_null() {
            self.stack.set_leaf(StdPtr::null_mut());
            return (ScanState::FindNext, None);
        }
        self.stack.set_leaf(next);
        let next_leaf: &LeafNode15<P> = unsafe { &*next };
        next_leaf.prefetch();
        // Also prefetch one leaf ahead to hide memory latency.
        let next_next: *mut LeafNode15<P> = next_leaf.safe_next(guard);
        if !next_next.is_null() {
            let next_next_leaf: &LeafNode15<P> = unsafe { &*next_next };
            next_next_leaf.prefetch();
        }
        let next_version: u32 = next_leaf.version().stable();
        if NodeVersion::is_deleted_version(next_version) {
            return (ScanState::Retry, None);
        }
        let perm: <LeafNode15<P> as TreeLeafNode<P>>::Perm = next_leaf.permutation();
        let pos: usize = lower_with_suffix(&self.cursor_key, next_leaf, &perm);
        self.stack.update_state(next_version, perm, pos);
        (ScanState::FindNext, None)
    }
}
impl<P: LeafPolicy> ForwardScanCtx<P> {
    /// Public leaf advance used by the batch driver (pointer emission mode).
    #[inline]
    pub fn advance_leaf(
        &mut self,
        guard: &LocalGuard<'_>,
    ) -> (ScanState, Option<ScanSnapshotPtr<P::Value>>) {
        self.advance_leaf_generic::<PtrEmitter>(guard)
    }
    /// Follows the leaf-chain next pointer and repositions the scan state at
    /// the cursor key inside the new leaf.
    ///
    /// A marked next pointer signals an in-progress split: wait, then
    /// `Retry`. A null next pointer ends the current layer (`Up`). The next
    /// and next-next leaves are prefetched to hide memory latency.
    #[inline]
    fn advance_leaf_generic<E: ScanEmitter<P>>(
        &mut self,
        guard: &LocalGuard<'_>,
    ) -> (ScanState, Option<E::Snapshot>) {
        let leaf: &LeafNode15<P> = unsafe { self.stack.leaf_ref() };
        let version: u32 = self.stack.version();
        let next_raw: *mut LeafNode15<P> = leaf.next_raw(guard);
        if Linker::is_marked(next_raw) {
            leaf.wait_for_split();
            return (ScanState::Retry, None);
        }
        // Next pointer is only trustworthy if the leaf hasn't changed.
        if leaf.version().has_changed(version) {
            return (ScanState::Retry, None);
        }
        // Strip the low-order mark bit to recover the real pointer.
        let next: *mut LeafNode15<P> = next_raw.map_addr(|addr| addr & !1);
        if next.is_null() {
            return (ScanState::Up, None);
        }
        self.stack.set_leaf(next);
        let next_leaf: &LeafNode15<P> = unsafe { &*next };
        next_leaf.prefetch();
        let next_next: *mut LeafNode15<P> = next_leaf.safe_next(guard);
        if !next_next.is_null() {
            let next_next_leaf: &LeafNode15<P> = unsafe { &*next_next };
            next_next_leaf.prefetch();
        }
        let next_version: u32 = next_leaf.version().stable();
        if NodeVersion::is_deleted_version(next_version) {
            return (ScanState::Retry, None);
        }
        let perm: <LeafNode15<P> as TreeLeafNode<P>>::Perm = next_leaf.permutation();
        let pos: usize = lower_with_suffix(&self.cursor_key, next_leaf, &perm);
        self.stack.update_state(next_version, perm, pos);
        (ScanState::FindNext, None)
    }
    /// Recovers from a conflict: re-descends from the current layer root and
    /// repositions the scan at the cursor key.
    pub fn find_retry(&mut self, guard: &LocalGuard<'_>) -> ScanState {
        let leaf_ptr: *mut LeafNode15<P> =
            reach_leaf_for_scan::<P>(self.stack.root(), &self.cursor_key, guard);
        if leaf_ptr.is_null() {
            return ScanState::Up;
        }
        self.stack.set_leaf(leaf_ptr);
        let leaf: &LeafNode15<P> = unsafe { &*leaf_ptr };
        let version: u32 = leaf.version().stable();
        if NodeVersion::is_deleted_version(version) {
            return ScanState::Retry;
        }
        let perm: <LeafNode15<P> as TreeLeafNode<P>>::Perm = leaf.permutation();
        let pos: usize = lower_with_suffix(&self.cursor_key, leaf, &perm);
        self.stack.update_state(version, perm, pos);
        ScanState::FindNext
    }
    /// Enters a sub-layer: shifts the cursor into the new layer and resets
    /// the ikey monotonicity watermark.
    pub fn handle_down(&mut self) {
        self.cursor_key.shift_clear();
        self.stack.set_last_ikey(0);
    }
    /// Leaves the current sub-layer: pops the saved parent position,
    /// restores root/leaf, unshifts the cursor, and re-derives the parent
    /// scan position. Returns `false` when there is no parent layer left.
    pub fn handle_up(&mut self, _guard: &LocalGuard<'_>) -> bool {
        let Some(parent) = self.layer_stack.pop() else {
            return false;
        };
        self.stack.set_root(parent.root);
        self.stack.set_leaf(parent.leaf_ptr());
        self.cursor_key.unshift();
        let leaf: &LeafNode15<P> = unsafe { parent.leaf.as_ref() };
        let version: u32 = leaf.version().stable();
        let perm: <LeafNode15<P> as TreeLeafNode<P>>::Perm = leaf.permutation();
        let pos: usize = lower_with_suffix(&self.cursor_key, leaf, &perm);
        self.stack.update_state(version, perm, pos);
        self.stack.set_last_ikey(0);
        true
    }
    /// Runs the non-emitting transitions of the state machine.
    ///
    /// `Down`/`Up`/`Retry` are resolved here (each re-arming the duplicate
    /// check, since the position may land on an already-emitted key);
    /// `Emit`/`FindNext` are reported as `Ready` for the caller to handle.
    #[inline(always)]
    pub(crate) fn step_transitions(&mut self, guard: &LocalGuard<'_>) -> StepResult {
        match self.state {
            ScanState::Down => {
                // A sub-layer exists, so the single-layer assumption fails.
                self.flags.disable_single_layer_mode();
                self.handle_down();
                self.state = ScanState::Retry;
                self.flags.require_duplicate_check();
                StepResult::Continue
            }
            ScanState::Up => {
                if !self.handle_up(guard) {
                    self.flags.mark_exhausted();
                    return StepResult::Exhausted;
                }
                self.state = ScanState::FindNext;
                self.flags.require_duplicate_check();
                StepResult::Continue
            }
            ScanState::Retry => {
                self.state = self.find_retry(guard);
                self.flags.require_duplicate_check();
                StepResult::Continue
            }
            ScanState::Emit | ScanState::FindNext => StepResult::Ready,
        }
    }
}
impl<P: LeafPolicy> ForwardScanCtx<P> {
    /// Bulk-processes the current leaf, handing the visitor `(key, &value)`
    /// pairs until the leaf/bound is exhausted or the visitor stops.
    ///
    /// `count` is incremented per emitted entry. The `BatchCtx` mutably
    /// borrows `cursor_key`/`layer_stack`, so the updated `ki`/`root` are
    /// copied back after it drops.
    #[inline]
    pub fn process_leaf_batch_ptr<F>(
        &mut self,
        end_bound: &RangeBound<'_>,
        end_bound_ikey: Option<u64>,
        visitor: &mut F,
        count: &mut usize,
    ) -> LeafBatchResult
    where
        F: FnMut(&[u8], &P::Value) -> bool,
    {
        let (result, ki, root) = {
            let mut ctx = self.build_batch_ctx();
            let r = process_batch_keyed::<Forward, _, P>(
                &mut ctx,
                end_bound,
                end_bound_ikey,
                &mut RefSlotVisitor(visitor),
                count,
                &mut (),
            );
            (r, ctx.ki, ctx.root)
        };
        self.stack.set_ki(ki.cast_unsigned());
        self.stack.set_root(root);
        result
    }
    /// Same as [`Self::process_leaf_batch_ptr`] but delivers owned/copied
    /// `P::Output` values to the visitor.
    #[inline]
    pub fn process_leaf_batch<F>(
        &mut self,
        end_bound: &RangeBound<'_>,
        end_bound_ikey: Option<u64>,
        visitor: &mut F,
        count: &mut usize,
    ) -> LeafBatchResult
    where
        F: FnMut(&[u8], P::Output) -> bool,
    {
        let (result, ki, root) = {
            let mut ctx = self.build_batch_ctx();
            let r = process_batch_keyed::<Forward, _, P>(
                &mut ctx,
                end_bound,
                end_bound_ikey,
                &mut CopySlotVisitor(visitor),
                count,
                &mut (),
            );
            (r, ctx.ki, ctx.root)
        };
        self.stack.set_ki(ki.cast_unsigned());
        self.stack.set_root(root);
        result
    }
    /// Values-only bulk processing: no key materialization; the end bound is
    /// checked only via the optional pre-extracted ikey.
    #[inline]
    pub fn process_leaf_batch_values(
        &mut self,
        end_bound_ikey: Option<u64>,
        visitor: &mut impl FnMut(P::Output) -> bool,
        count: &mut usize,
    ) -> LeafBatchResult {
        let (result, ki, root) = {
            let mut ctx = self.build_batch_ctx();
            let r = process_batch_values::<Forward, P>(
                &mut ctx,
                end_bound_ikey,
                visitor,
                count,
                &mut (),
            );
            (r, ctx.ki, ctx.root)
        };
        self.stack.set_ki(ki.cast_unsigned());
        self.stack.set_root(root);
        result
    }
    /// Snapshots the current scan position into a `BatchCtx` for the shared
    /// batch-processing helpers. `ki` travels as a signed index inside the
    /// batch code and is converted back when copied out.
    #[inline(always)]
    fn build_batch_ctx(&mut self) -> BatchCtx<'_, P> {
        let leaf_ptr: *mut LeafNode15<P> = self.stack.leaf_ptr();
        let leaf: &LeafNode15<P> = unsafe { &*leaf_ptr };
        let perm = self.stack.perm();
        let perm_size = perm.size();
        BatchCtx {
            leaf,
            perm,
            perm_size,
            cached_version: self.stack.version(),
            ki: self.stack.ki().cast_signed(),
            root: self.stack.root(),
            leaf_ptr,
            cursor_key: &mut self.cursor_key,
            layer_stack: &mut self.layer_stack,
        }
    }
}
/// Outcome of a [`ForwardBatchStrategy`] callback.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BatchAction {
    /// Keep scanning.
    Continue,
    /// Visitor returned `false`: stop now, without marking the scan
    /// exhausted (it may be resumed).
    Stopped,
    /// The end bound was exceeded: the scan is marked exhausted.
    Exhausted,
}
/// Per-output-mode hooks driven by [`ForwardScanCtx::run_batch`].
///
/// Implementations differ in how entries reach the visitor (key + value
/// reference, key + copied output, or values only) and how the end bound
/// is applied.
pub trait ForwardBatchStrategy<P: LeafPolicy> {
    /// Emits the snapshot left pending by a prior `Emit` state, if any.
    fn emit_initial(
        &mut self,
        ctx: &mut ForwardScanCtx<P>,
        end_bound: &RangeBound<'_>,
        count: &mut usize,
    ) -> BatchAction;
    /// Performs one cautious advance with duplicate checking (used right
    /// after a retry/descent), emitting at most one entry.
    fn dup_check_entry(
        &mut self,
        ctx: &mut ForwardScanCtx<P>,
        end_bound: &RangeBound<'_>,
        guard: &LocalGuard<'_>,
        count: &mut usize,
    ) -> BatchAction;
    /// Bulk-processes the current leaf; the returned [`LeafBatchResult`]
    /// tells the driver how to proceed (next leaf, descend, retry, stop).
    fn process_leaf(
        &mut self,
        ctx: &mut ForwardScanCtx<P>,
        end_bound: &RangeBound<'_>,
        guard: &LocalGuard<'_>,
        count: &mut usize,
    ) -> LeafBatchResult;
}
impl<P: LeafPolicy> ForwardScanCtx<P> {
    /// Batch scan driver: pumps the state machine, delegating emission to
    /// `strategy`, and returns the number of entries emitted.
    ///
    /// Flow per iteration: resolve non-emitting transitions, bail on a null
    /// leaf (up to parent, or exhausted if none), retry on deleted leaves,
    /// run one careful duplicate-checked step when flagged, then hand the
    /// rest of the leaf to `strategy.process_leaf` and map its result back
    /// onto the state machine.
    #[inline]
    pub(crate) fn run_batch<S: ForwardBatchStrategy<P>>(
        &mut self,
        strategy: &mut S,
        end_bound: &RangeBound<'_>,
        guard: &LocalGuard<'_>,
    ) -> usize {
        let mut count: usize = 0;
        // Flush a snapshot left over from a previous call, if any.
        if self.state == ScanState::Emit {
            match strategy.emit_initial(self, end_bound, &mut count) {
                BatchAction::Continue => {
                    self.state = ScanState::FindNext;
                }
                BatchAction::Stopped => return count,
                BatchAction::Exhausted => {
                    self.flags.mark_exhausted();
                    return count;
                }
            }
        }
        loop {
            match self.step_transitions(guard) {
                StepResult::Exhausted => return count,
                StepResult::Continue => continue,
                StepResult::Ready => {}
            }
            if self.stack.is_null() {
                // No leaf: finish this layer, or the whole scan if at root.
                if self.layer_stack.is_empty() {
                    self.flags.mark_exhausted();
                    return count;
                }
                self.state = ScanState::Up;
                continue;
            }
            let leaf: &LeafNode15<P> = unsafe { self.stack.leaf_ref() };
            if leaf.version().is_deleted() {
                self.state = ScanState::Retry;
                continue;
            }
            if self.flags.needs_duplicate_check() {
                // One careful step so the bulk path can skip dup checks.
                self.flags.clear_duplicate_check();
                match strategy.dup_check_entry(self, end_bound, guard, &mut count) {
                    BatchAction::Continue => continue,
                    BatchAction::Stopped => return count,
                    BatchAction::Exhausted => {
                        self.flags.mark_exhausted();
                        return count;
                    }
                }
            }
            let result = strategy.process_leaf(self, end_bound, guard, &mut count);
            match result {
                LeafBatchResult::LeafExhausted => {
                    let (state, _) = self.advance_leaf(guard);
                    self.state = state;
                }
                LeafBatchResult::LayerEncountered => {
                    self.state = ScanState::Down;
                }
                LeafBatchResult::VersionChanged => {
                    self.state = ScanState::Retry;
                }
                LeafBatchResult::Stopped => return count,
                LeafBatchResult::EndBoundExceeded => {
                    self.flags.mark_exhausted();
                    return count;
                }
            }
        }
    }
}
/// Batch strategy delivering `(key, &value)` pairs to the visitor.
pub struct IntraLeafRefStrategy<'a, F> {
    /// Caller-supplied visitor; returning `false` stops the scan.
    visitor: &'a mut F,
    /// Pre-extracted ikey form of the end bound (if available), forwarded to
    /// the in-leaf batch helper.
    end_bound_ikey: Option<u64>,
}
impl<'a, F> IntraLeafRefStrategy<'a, F> {
#[inline]
pub const fn new(visitor: &'a mut F, end_bound_ikey: Option<u64>) -> Self {
Self {
visitor,
end_bound_ikey,
}
}
}
impl<P, F> ForwardBatchStrategy<P> for IntraLeafRefStrategy<'_, F>
where
    P: RefLeafPolicy,
    F: FnMut(&[u8], &P::Value) -> bool,
{
    /// Flushes a pending snapshot as `(key, &value)` after re-checking the
    /// end bound against the fully materialized key.
    #[inline(always)]
    fn emit_initial(
        &mut self,
        ctx: &mut ForwardScanCtx<P>,
        end_bound: &RangeBound<'_>,
        count: &mut usize,
    ) -> BatchAction {
        let Some(snapshot) = ctx.snapshot.take() else {
            return BatchAction::Continue;
        };
        // SAFETY relies on the cursor holding a complete key at Emit time.
        let key: &[u8] = unsafe { ctx.cursor_key.full_key_unchecked() };
        if !end_bound.contains(key) {
            return BatchAction::Exhausted;
        }
        let value_ref: &P::Value =
            unsafe { P::output_as_ref_sound(&snapshot.value, &mut ctx.scratch_value) };
        *count += 1;
        if (self.visitor)(key, value_ref) {
            BatchAction::Continue
        } else {
            BatchAction::Stopped
        }
    }
    /// Single duplicate-checked advance; emits at most one entry.
    #[inline(always)]
    fn dup_check_entry(
        &mut self,
        ctx: &mut ForwardScanCtx<P>,
        end_bound: &RangeBound<'_>,
        guard: &LocalGuard<'_>,
        count: &mut usize,
    ) -> BatchAction {
        let (new_state, snapshot_ptr) = ctx.find_next_with_dup_check_ptr(guard);
        ctx.state = new_state;
        if new_state == ScanState::Emit
            && let Some(snap) = snapshot_ptr
        {
            let key: &[u8] = unsafe { ctx.cursor_key.full_key_unchecked() };
            if !end_bound.contains(key) {
                return BatchAction::Exhausted;
            }
            let value_ref: &P::Value =
                unsafe { snap.resolve_value_ref::<P>(&mut ctx.scratch_value) };
            *count += 1;
            // Advance the state before invoking the visitor so that a
            // `false` return leaves the context resumable.
            ctx.state = ScanState::FindNext;
            if !(self.visitor)(key, value_ref) {
                return BatchAction::Stopped;
            }
        }
        BatchAction::Continue
    }
    /// Bulk-processes the leaf through the by-reference batch helper.
    #[inline(always)]
    fn process_leaf(
        &mut self,
        ctx: &mut ForwardScanCtx<P>,
        end_bound: &RangeBound<'_>,
        _guard: &LocalGuard<'_>,
        count: &mut usize,
    ) -> LeafBatchResult {
        ctx.process_leaf_batch_ptr(end_bound, self.end_bound_ikey, self.visitor, count)
    }
}
/// Batch strategy delivering `(key, P::Output)` (owned/copied) pairs.
pub struct IntraLeafCopyStrategy<'a, F> {
    /// Caller-supplied visitor; returning `false` stops the scan.
    visitor: &'a mut F,
    /// Pre-extracted ikey form of the end bound (if available), forwarded to
    /// the in-leaf batch helper.
    end_bound_ikey: Option<u64>,
}
impl<'a, F> IntraLeafCopyStrategy<'a, F> {
#[inline]
pub const fn new(visitor: &'a mut F, end_bound_ikey: Option<u64>) -> Self {
Self {
visitor,
end_bound_ikey,
}
}
}
impl<P, F> ForwardBatchStrategy<P> for IntraLeafCopyStrategy<'_, F>
where
    P: LeafPolicy,
    F: FnMut(&[u8], P::Output) -> bool,
{
    /// Flushes a pending snapshot as `(key, output)` after re-checking the
    /// end bound against the fully materialized key.
    #[inline(always)]
    fn emit_initial(
        &mut self,
        ctx: &mut ForwardScanCtx<P>,
        end_bound: &RangeBound<'_>,
        count: &mut usize,
    ) -> BatchAction {
        let Some(snapshot) = ctx.snapshot.take() else {
            return BatchAction::Continue;
        };
        // SAFETY relies on the cursor holding a complete key at Emit time.
        let key: &[u8] = unsafe { ctx.cursor_key.full_key_unchecked() };
        if !end_bound.contains(key) {
            return BatchAction::Exhausted;
        }
        *count += 1;
        if (self.visitor)(key, snapshot.value) {
            BatchAction::Continue
        } else {
            BatchAction::Stopped
        }
    }
    /// Single duplicate-checked advance; emits at most one entry.
    #[inline(always)]
    fn dup_check_entry(
        &mut self,
        ctx: &mut ForwardScanCtx<P>,
        end_bound: &RangeBound<'_>,
        guard: &LocalGuard<'_>,
        count: &mut usize,
    ) -> BatchAction {
        let (new_state, snapshot) = ctx.find_next_with_dup_check(guard);
        ctx.state = new_state;
        if new_state == ScanState::Emit
            && let Some(snap) = snapshot
        {
            let key: &[u8] = unsafe { ctx.cursor_key.full_key_unchecked() };
            if !end_bound.contains(key) {
                return BatchAction::Exhausted;
            }
            *count += 1;
            // Advance the state before invoking the visitor so that a
            // `false` return leaves the context resumable.
            ctx.state = ScanState::FindNext;
            if !(self.visitor)(key, snap.value) {
                return BatchAction::Stopped;
            }
        }
        BatchAction::Continue
    }
    /// Bulk-processes the leaf through the by-copy batch helper.
    #[inline(always)]
    fn process_leaf(
        &mut self,
        ctx: &mut ForwardScanCtx<P>,
        end_bound: &RangeBound<'_>,
        _guard: &LocalGuard<'_>,
        count: &mut usize,
    ) -> LeafBatchResult {
        ctx.process_leaf_batch(end_bound, self.end_bound_ikey, self.visitor, count)
    }
}
/// Batch strategy delivering only values (no key materialization).
pub struct ValuesOnlyStrategy<'a, F> {
    /// Caller-supplied visitor; returning `false` stops the scan.
    visitor: &'a mut F,
}
impl<'a, F> ValuesOnlyStrategy<'a, F> {
#[inline]
pub const fn new(visitor: &'a mut F) -> Self {
Self { visitor }
}
}
impl<P, F> ForwardBatchStrategy<P> for ValuesOnlyStrategy<'_, F>
where
    P: LeafPolicy,
    F: FnMut(P::Output) -> bool,
{
    /// Flushes a pending snapshot. No byte-wise end-bound check here: the
    /// values-only path relies on ikey-level bounding in `process_leaf`.
    #[inline(always)]
    fn emit_initial(
        &mut self,
        ctx: &mut ForwardScanCtx<P>,
        _end_bound: &RangeBound<'_>,
        count: &mut usize,
    ) -> BatchAction {
        let Some(snapshot) = ctx.snapshot.take() else {
            return BatchAction::Continue;
        };
        *count += 1;
        if (self.visitor)(snapshot.value) {
            BatchAction::Continue
        } else {
            BatchAction::Stopped
        }
    }
    /// Single duplicate-checked advance; emits at most one value.
    #[inline(always)]
    fn dup_check_entry(
        &mut self,
        ctx: &mut ForwardScanCtx<P>,
        _end_bound: &RangeBound<'_>,
        guard: &LocalGuard<'_>,
        count: &mut usize,
    ) -> BatchAction {
        let (new_state, snapshot) = ctx.find_next_with_dup_check(guard);
        ctx.state = new_state;
        if new_state == ScanState::Emit
            && let Some(snap) = snapshot
        {
            *count += 1;
            // Advance the state before invoking the visitor so that a
            // `false` return leaves the context resumable.
            ctx.state = ScanState::FindNext;
            if !(self.visitor)(snap.value) {
                return BatchAction::Stopped;
            }
        }
        BatchAction::Continue
    }
    /// Bulk-processes the leaf; the end bound is applied only as an ikey
    /// extracted at the cursor's current layer offset.
    #[inline(always)]
    fn process_leaf(
        &mut self,
        ctx: &mut ForwardScanCtx<P>,
        end_bound: &RangeBound<'_>,
        _guard: &LocalGuard<'_>,
        count: &mut usize,
    ) -> LeafBatchResult {
        let end_bound_ikey: Option<u64> = end_bound.extract_ikey_at(ctx.cursor_key.offset());
        ctx.process_leaf_batch_values(end_bound_ikey, self.visitor, count)
    }
}
impl<P: LeafPolicy> ForwardScanCtx<P> {
    /// Iterator-style step: returns the next `(key, output)` pair without
    /// heap-allocating the key (the slice borrows from `cursor_key`).
    ///
    /// Returns `None` when the scan is exhausted or the next key exceeds
    /// `end_bound` (in which case the exhausted flag is set).
    #[inline(always)]
    pub fn advance_no_alloc(
        &mut self,
        end_bound: &RangeBound<'_>,
        guard: &LocalGuard<'_>,
    ) -> Option<(&[u8], P::Output)>
    where
        P::Output: Clone,
    {
        // Fast path: a snapshot is pending from a previous call.
        if self.state == ScanState::Emit
            && let Some(snapshot) = self.snapshot.take()
        {
            #[cfg(debug_assertions)]
            {
                let key_copy = self.cursor_key.full_key().to_vec();
                self.assert_ordering(&key_copy);
            }
            let key: &[u8] = unsafe { self.cursor_key.full_key_unchecked() };
            if !end_bound.contains(key) {
                self.flags.mark_exhausted();
                return None;
            }
            self.state = ScanState::FindNext;
            return Some((key, snapshot.value));
        }
        loop {
            // Debug: capture the pre-transition state for the trace buffer.
            #[cfg(debug_assertions)]
            let (pre_state, pre_cursor) = (self.state, self.cursor_key.debug_state());
            match self.step_transitions(guard) {
                StepResult::Exhausted => {
                    #[cfg(debug_assertions)]
                    self.record_transition(format!("Up -> Exhausted: pre={pre_cursor}"));
                    return None;
                }
                StepResult::Continue => {
                    #[cfg(debug_assertions)]
                    self.record_transition(format!(
                        "{pre_state:?} -> {:?}: pre={pre_cursor}, post={}",
                        self.state,
                        self.cursor_key.debug_state()
                    ));
                    continue;
                }
                StepResult::Ready => {}
            }
            // One duplicate-checked step after any retry/descent, then the
            // plain advance thereafter.
            let (new_state, snapshot) = if self.flags.needs_duplicate_check() {
                self.flags.clear_duplicate_check();
                self.find_next_with_dup_check(guard)
            } else {
                self.find_next(guard)
            };
            self.state = new_state;
            if new_state == ScanState::Emit
                && let Some(snap) = snapshot
            {
                #[cfg(debug_assertions)]
                {
                    let key_copy = self.cursor_key.full_key().to_vec();
                    self.assert_ordering(&key_copy);
                }
                let key = unsafe { self.cursor_key.full_key_unchecked() };
                if !end_bound.contains(key) {
                    self.flags.mark_exhausted();
                    return None;
                }
                self.state = ScanState::FindNext;
                return Some((key, snap.value));
            }
            self.snapshot = snapshot;
        }
    }
    /// Fully by-reference step: returns `(key, &value)` with both borrows
    /// tied to `self`; nothing is cloned or allocated.
    ///
    /// Runs the single-layer fast loop while `single_layer_mode` holds
    /// (falling back to the general path once a sub-layer or long key is
    /// seen), otherwise mirrors [`Self::advance_no_alloc`] using the
    /// pointer-emitting finders.
    #[inline(always)]
    #[expect(clippy::too_many_lines, reason = "Complex allocation logic")]
    pub fn advance_no_alloc_ref<'s>(
        &'s mut self,
        end_bound: &RangeBound<'_>,
        guard: &LocalGuard<'_>,
    ) -> Option<(&'s [u8], &'s P::Value)>
    where
        P: RefLeafPolicy,
    {
        // Fast path: a snapshot is pending from a previous call. The value
        // is parked in `last_output` so the returned reference stays valid.
        if self.state == ScanState::Emit && self.snapshot.is_some() {
            let key = unsafe { self.cursor_key.full_key_unchecked() };
            if !end_bound.contains(key) {
                self.flags.mark_exhausted();
                return None;
            }
            let snapshot: ScanSnapshot<P> = self.snapshot.take()?;
            self.state = ScanState::FindNext;
            self.last_output = Some(snapshot.value);
            let value_ref: &P::Value = unsafe {
                P::output_as_ref_sound(self.last_output.as_ref().unwrap(), &mut self.scratch_value)
            };
            return Some((key, value_ref));
        }
        loop {
            if self.flags.single_layer_mode() {
                // Single-layer fast loop: no layer stack, ikey-only keys.
                if self.state == ScanState::Retry {
                    self.state = self.find_retry(guard);
                    self.flags.require_duplicate_check();
                    continue;
                }
                let (new_state, snapshot_ptr) =
                    self.find_next_single_layer_ptr(guard, self.flags.needs_duplicate_check());
                if self.flags.needs_duplicate_check() {
                    self.flags.clear_duplicate_check();
                }
                self.state = new_state;
                match new_state {
                    ScanState::Emit => {
                        if let Some(snap) = snapshot_ptr {
                            let key = unsafe { self.cursor_key.full_key_unchecked() };
                            if !end_bound.contains(key) {
                                self.flags.mark_exhausted();
                                return None;
                            }
                            self.state = ScanState::FindNext;
                            let value_ref: &P::Value =
                                unsafe { snap.resolve_value_ref::<P>(&mut self.scratch_value) };
                            return Some((key, value_ref));
                        }
                    }
                    ScanState::FindNext => {
                        // Null leaf on the fast path means end of chain.
                        if self.stack.is_null() {
                            self.flags.mark_exhausted();
                            return None;
                        }
                        continue;
                    }
                    ScanState::Retry => continue,
                    ScanState::Down => {
                        // The fast-path finder does not push the layer
                        // context or switch roots itself; do both here
                        // before dropping to the general path.
                        self.flags.disable_single_layer_mode();
                        self.layer_stack
                            .push(LayerContext::new(self.stack.root(), self.stack.leaf_ptr()));
                        let Some(slot) = self.stack.kp() else {
                            debug_assert!(
                                false,
                                "Down state entered without valid slot - state machine bug"
                            );
                            self.state = ScanState::Retry;
                            continue;
                        };
                        let leaf: &LeafNode15<P> = unsafe { self.stack.leaf_ref() };
                        let layer_ptr: *mut u8 = leaf.load_layer_raw(slot);
                        self.stack.set_root(layer_ptr);
                    }
                    ScanState::Up => {
                        self.flags.mark_exhausted();
                        return None;
                    }
                }
            }
            match self.step_transitions(guard) {
                StepResult::Exhausted => return None,
                StepResult::Continue => continue,
                StepResult::Ready => {}
            }
            // General path: pointer-emitting advance, duplicate-checked
            // once after any retry/descent.
            let (new_state, snapshot_ptr) = if self.flags.needs_duplicate_check() {
                self.flags.clear_duplicate_check();
                self.find_next_with_dup_check_ptr(guard)
            } else {
                self.find_next_ptr(guard)
            };
            self.state = new_state;
            if new_state == ScanState::Emit
                && let Some(snap) = snapshot_ptr
            {
                let key = unsafe { self.cursor_key.full_key_unchecked() };
                if !end_bound.contains(key) {
                    self.flags.mark_exhausted();
                    return None;
                }
                self.state = ScanState::FindNext;
                let value_ref: &P::Value =
                    unsafe { snap.resolve_value_ref::<P>(&mut self.scratch_value) };
                return Some((key, value_ref));
            }
        }
    }
}