use std::cmp::Ordering;
use std::ptr as StdPtr;
use seize::LocalGuard;
use crate::key::IKEY_SIZE;
use crate::leaf_trait::TreeLeafNode;
use crate::leaf15::LeafNode15;
use crate::leaf15::{KSUF_KEYLENX, LAYER_KEYLENX};
use crate::nodeversion::NodeVersion;
use crate::policy::LeafPolicy;
use crate::prefetch::prefetch_read;
use super::batch_common::{
Backward, BatchCtx, CloneEmitter, CopySlotVisitor, PtrEmitter, RefSlotVisitor, ScanEmitter,
process_batch_keyed, process_batch_values,
};
use super::cursor_key::CursorKey;
use super::find_rev::LeafBatchResultBack;
use super::helper::ReverseScanHelper;
use super::iterator::RangeBound;
use super::iterator::iter_flags::ReverseFlags;
use super::scan_state::{
BackStackElement, LayerContext, LayerStack, ScanSnapshot, ScanSnapshotPtr, ScanStateBack,
};
use super::traversal::reach_leaf_for_scan;
enum InitialPositionResult<P: LeafPolicy> {
Emit(ScanSnapshot<P>),
LayerDescent(*const u8),
FindPrev,
#[expect(dead_code, reason = "Reserved for future use")]
Up,
Retry,
}
enum EmitResult<P: LeafPolicy> {
Emit(ScanSnapshot<P>),
NoMatch,
VersionChanged,
}
pub struct ReverseScanCtx<P: LeafPolicy> {
pub(crate) stack: BackStackElement<P>,
pub(crate) layer_stack: LayerStack<P>,
pub(crate) cursor_key: CursorKey,
pub(crate) state: ScanStateBack,
pub(crate) snapshot: Option<ScanSnapshot<P>>,
pub(crate) flags: ReverseFlags,
#[cfg(debug_assertions)]
#[allow(dead_code)]
pub(crate) debug_last_emitted_key_back: Option<Vec<u8>>,
}
impl<P: LeafPolicy> ReverseScanCtx<P> {
#[inline]
pub fn new_with_bound(root: *const u8, cursor_key: CursorKey, emit_equal: bool) -> Self {
Self {
stack: BackStackElement::new(root),
layer_stack: LayerStack::new(),
cursor_key,
state: ScanStateBack::FindPrev,
snapshot: None,
flags: ReverseFlags::with_values(emit_equal),
#[cfg(debug_assertions)]
debug_last_emitted_key_back: None,
}
}
}
impl<P: LeafPolicy> ReverseScanCtx<P> {
pub fn find_initial_reverse(
&mut self,
root: *const u8,
emit_equal: bool,
guard: &LocalGuard<'_>,
) -> (ScanStateBack, Option<ScanSnapshot<P>>)
where
P::Output: Clone,
{
let mut current_root: *const u8 = root;
loop {
self.stack.set_root(current_root);
let mut leaf_ptr: *mut LeafNode15<P> =
reach_leaf_for_scan::<P>(current_root, &self.cursor_key, guard);
if leaf_ptr.is_null() {
return (ScanStateBack::Up, None);
}
let version: u32 =
ReverseScanHelper::stable_reverse(&mut leaf_ptr, &self.cursor_key, guard);
self.stack.set_leaf(leaf_ptr);
if NodeVersion::is_deleted_version(version) {
return (ScanStateBack::Retry, None);
}
let leaf: &LeafNode15<P> = unsafe { &*leaf_ptr };
let perm: <LeafNode15<P> as TreeLeafNode<P>>::Perm = leaf.permutation();
let size: usize = perm.size();
if size == 0 {
return (ScanStateBack::Up, None);
}
match self.try_initial_position_reverse(
leaf,
perm,
size,
version,
current_root,
leaf_ptr,
emit_equal,
) {
InitialPositionResult::Emit(snapshot) => {
return (ScanStateBack::Emit, Some(snapshot));
}
InitialPositionResult::LayerDescent(layer_ptr) => {
current_root = layer_ptr;
}
InitialPositionResult::FindPrev => {
if leaf.version().has_changed(version) {
return (ScanStateBack::Retry, None);
}
return (ScanStateBack::FindPrev, None);
}
InitialPositionResult::Up => {
return (ScanStateBack::Up, None);
}
InitialPositionResult::Retry => {
return (ScanStateBack::Retry, None);
}
}
}
}
#[expect(clippy::too_many_arguments, reason = "Internal helper")]
fn try_initial_position_reverse(
&mut self,
leaf: &LeafNode15<P>,
perm: <LeafNode15<P> as TreeLeafNode<P>>::Perm,
size: usize,
version: u32,
current_root: *const u8,
leaf_ptr: *mut LeafNode15<P>,
emit_equal: bool,
) -> InitialPositionResult<P>
where
P::Output: Clone,
{
let ki: isize = ReverseScanHelper::lower_reverse(
&self.cursor_key,
leaf,
&perm,
self.flags.upper_bound(),
);
if !(ki >= 0 && ki.cast_unsigned() < size) {
self.stack.update_state(version, perm, ki);
return InitialPositionResult::FindPrev;
}
let slot: usize = perm.get(ki.cast_unsigned());
let keylenx: u8 = leaf.keylenx_relaxed(slot);
let slot_ikey: u64 = leaf.ikey_relaxed(slot);
if keylenx >= LAYER_KEYLENX {
return self.handle_layer_descent_reverse(
current_root,
leaf_ptr,
slot,
slot_ikey,
version,
perm,
ki,
leaf,
);
}
match Self::try_emit_slot_reverse(
leaf,
slot,
slot_ikey,
keylenx,
version,
&perm,
ki,
&mut self.cursor_key,
&mut self.stack,
emit_equal,
self.flags.upper_bound(),
) {
EmitResult::Emit(snapshot) => InitialPositionResult::Emit(snapshot),
EmitResult::NoMatch => {
self.stack.update_state(version, perm, ki - 1);
InitialPositionResult::FindPrev
}
EmitResult::VersionChanged => {
InitialPositionResult::Retry
}
}
}
#[expect(clippy::too_many_arguments, reason = "Internal helper")]
fn handle_layer_descent_reverse(
&mut self,
current_root: *const u8,
leaf_ptr: *mut LeafNode15<P>,
slot: usize,
slot_ikey: u64,
version: u32,
perm: <LeafNode15<P> as TreeLeafNode<P>>::Perm,
ki: isize,
leaf: &LeafNode15<P>,
) -> InitialPositionResult<P>
where
P::Output: Clone,
{
self.layer_stack
.push(LayerContext::new(current_root, leaf_ptr));
self.cursor_key.assign_store_ikey(slot_ikey);
let layer_ptr: *mut u8 = leaf.load_layer_raw(slot);
prefetch_read(layer_ptr);
self.stack.update_state(version, perm, ki - 1);
if self.cursor_key.has_suffix() {
self.cursor_key.shift();
} else {
self.cursor_key.shift_clear_reverse();
self.flags.set_upper_bound();
}
InitialPositionResult::LayerDescent(layer_ptr.cast_const())
}
#[inline]
#[expect(clippy::too_many_arguments, reason = "Internal helper")]
fn try_emit_slot_reverse(
leaf: &LeafNode15<P>,
slot: usize,
slot_ikey: u64,
keylenx: u8,
version: u32,
perm: &<LeafNode15<P> as TreeLeafNode<P>>::Perm,
ki: isize,
cursor_key: &mut CursorKey,
stack: &mut BackStackElement<P>,
emit_equal: bool,
upper_bound: bool,
) -> EmitResult<P>
where
P::Output: Clone,
{
leaf.prefetch_value(slot);
if keylenx == KSUF_KEYLENX {
return Self::try_emit_suffix_slot_reverse(
leaf,
slot,
slot_ikey,
version,
perm,
ki,
cursor_key,
stack,
emit_equal,
upper_bound,
);
}
if !emit_equal && !upper_bound {
return EmitResult::NoMatch;
}
if leaf.is_value_empty_relaxed(slot) {
return EmitResult::NoMatch;
}
if leaf.version().has_changed(version) {
return EmitResult::VersionChanged;
}
let Some(output) = leaf.load_value(slot) else {
return EmitResult::NoMatch;
};
#[expect(clippy::cast_possible_truncation, reason = "Known const")]
let key_len: usize = std::cmp::min(keylenx, IKEY_SIZE as u8) as usize;
cursor_key.assign_store_ikey(slot_ikey);
cursor_key.assign_store_length(key_len);
stack.update_state(version, *perm, ki - 1);
EmitResult::Emit(ScanSnapshot {
value: output,
key_len,
})
}
#[inline]
#[expect(clippy::too_many_arguments, reason = "Internal helper")]
fn try_emit_suffix_slot_reverse(
leaf: &LeafNode15<P>,
slot: usize,
slot_ikey: u64,
version: u32,
perm: &<LeafNode15<P> as TreeLeafNode<P>>::Perm,
ki: isize,
cursor_key: &mut CursorKey,
stack: &mut BackStackElement<P>,
emit_equal: bool,
upper_bound: bool,
) -> EmitResult<P>
where
P::Output: Clone,
{
let Some(stored_suffix) = leaf.ksuf(slot) else {
return EmitResult::NoMatch;
};
if !upper_bound {
let cmp: Ordering = stored_suffix.cmp(cursor_key.suffix());
if !ReverseScanHelper::initial_ksuf_match_reverse(cmp, emit_equal) {
return EmitResult::NoMatch;
}
}
if leaf.is_value_empty_relaxed(slot) {
return EmitResult::NoMatch;
}
if leaf.version().has_changed(version) {
return EmitResult::VersionChanged;
}
let Some(output) = leaf.load_value(slot) else {
return EmitResult::NoMatch;
};
let key_len: usize = IKEY_SIZE + stored_suffix.len();
cursor_key.assign_store_ikey(slot_ikey);
cursor_key.assign_store_suffix(stored_suffix);
cursor_key.assign_store_length(key_len);
stack.update_state(version, *perm, ki - 1);
EmitResult::Emit(ScanSnapshot {
value: output,
key_len,
})
}
}
impl<P: LeafPolicy> ReverseScanCtx<P> {
#[inline]
pub(crate) fn find_prev(
&mut self,
guard: &LocalGuard<'_>,
) -> (ScanStateBack, Option<ScanSnapshot<P>>)
where
P::Output: Clone,
{
self.find_prev_generic::<CloneEmitter>(guard, false)
}
#[inline]
pub(crate) fn find_prev_with_dup_check(
&mut self,
guard: &LocalGuard<'_>,
) -> (ScanStateBack, Option<ScanSnapshot<P>>)
where
P::Output: Clone,
{
self.find_prev_generic::<CloneEmitter>(guard, true)
}
#[inline]
#[allow(dead_code, reason = "Zero-copy reverse scan API")]
pub(crate) fn find_prev_ptr(
&mut self,
guard: &LocalGuard<'_>,
) -> (ScanStateBack, Option<ScanSnapshotPtr<P::Value>>)
where
P: LeafPolicy,
{
self.find_prev_generic::<PtrEmitter>(guard, false)
}
#[inline]
#[allow(dead_code, reason = "Zero-copy reverse scan API")]
pub(crate) fn find_prev_with_dup_check_ptr(
&mut self,
guard: &LocalGuard<'_>,
) -> (ScanStateBack, Option<ScanSnapshotPtr<P::Value>>)
where
P: LeafPolicy,
{
self.find_prev_generic::<PtrEmitter>(guard, true)
}
#[inline]
fn find_prev_generic<E: ScanEmitter<P>>(
&mut self,
guard: &LocalGuard<'_>,
needs_duplicate_check: bool,
) -> (ScanStateBack, Option<E::Snapshot>) {
let leaf_ptr: *mut LeafNode15<P> = self.stack.get_leaf_ptr();
if leaf_ptr.is_null() {
return (ScanStateBack::Up, None);
}
let ki: isize = self.stack.get_ki();
if ki < 0 {
return self.advance_to_prev_leaf_generic::<E>(guard);
}
let leaf: &LeafNode15<P> = unsafe { &*leaf_ptr };
let version: u32 = self.stack.get_version();
if leaf.version().has_changed(version) {
return self.reposition_back_generic::<E>(guard);
}
let perm: <LeafNode15<P> as TreeLeafNode<P>>::Perm = *self.stack.get_perm_ref();
let size: usize = perm.size();
if ki.unsigned_abs() >= size {
return self.advance_to_prev_leaf_generic::<E>(guard);
}
self.process_slot_generic::<E>(leaf, leaf_ptr, perm, version, ki, needs_duplicate_check)
}
#[inline]
#[expect(clippy::too_many_arguments, reason = "Internal helper")]
fn process_slot_generic<E: ScanEmitter<P>>(
&mut self,
leaf: &LeafNode15<P>,
leaf_ptr: *mut LeafNode15<P>,
perm: <LeafNode15<P> as TreeLeafNode<P>>::Perm,
version: u32,
ki: isize,
needs_duplicate_check: bool,
) -> (ScanStateBack, Option<E::Snapshot>) {
let slot: usize = perm.get(ki.unsigned_abs());
let slot_ikey: u64 = leaf.ikey_relaxed(slot);
let keylenx: u8 = leaf.keylenx_relaxed(slot);
if slot_ikey > self.stack.last_ikey() {
return (ScanStateBack::Retry, None);
}
if ki > 0 {
let next_slot: usize = perm.get((ki - 1).unsigned_abs());
leaf.prefetch_value(next_slot);
}
if needs_duplicate_check
&& ReverseScanHelper::is_duplicate_reverse(
&self.cursor_key,
slot_ikey,
keylenx,
self.flags.upper_bound(),
)
{
self.stack.set_ki(ReverseScanHelper::prev(ki));
return (ScanStateBack::FindPrev, None);
}
if keylenx >= LAYER_KEYLENX {
return self.handle_layer_pointer_generic::<E>(
leaf, leaf_ptr, slot, slot_ikey, version, perm, ki,
);
}
self.try_emit_generic::<E>(leaf, slot, slot_ikey, keylenx, version, &perm, ki)
}
#[inline]
#[expect(clippy::too_many_arguments, reason = "Internal helper")]
fn handle_layer_pointer_generic<E: ScanEmitter<P>>(
&mut self,
leaf: &LeafNode15<P>,
leaf_ptr: *mut LeafNode15<P>,
slot: usize,
slot_ikey: u64,
version: u32,
perm: <LeafNode15<P> as TreeLeafNode<P>>::Perm,
ki: isize,
) -> (ScanStateBack, Option<E::Snapshot>) {
self.layer_stack
.push(LayerContext::new(self.stack.get_root(), leaf_ptr));
self.cursor_key.assign_store_ikey(slot_ikey);
let layer_ptr: *mut u8 = leaf.load_layer_raw(slot);
prefetch_read(layer_ptr);
self.stack.set_root(layer_ptr.cast_const());
self.stack
.update_state(version, perm, ReverseScanHelper::prev(ki));
(ScanStateBack::Down, None)
}
#[inline]
#[expect(clippy::too_many_arguments, reason = "Internal helper")]
fn try_emit_generic<E: ScanEmitter<P>>(
&mut self,
leaf: &LeafNode15<P>,
slot: usize,
slot_ikey: u64,
keylenx: u8,
version: u32,
perm: &<LeafNode15<P> as TreeLeafNode<P>>::Perm,
ki: isize,
) -> (ScanStateBack, Option<E::Snapshot>) {
leaf.prefetch_value(slot);
if leaf.is_value_empty_relaxed(slot) {
self.stack.set_ki(ReverseScanHelper::prev(ki));
return (ScanStateBack::FindPrev, None);
}
if leaf.version().has_changed(version) {
return (ScanStateBack::Retry, None);
}
self.cursor_key.assign_store_ikey(slot_ikey);
self.flags.clear_upper_bound();
let key_len: usize = if keylenx == KSUF_KEYLENX {
leaf.ksuf(slot).map_or(IKEY_SIZE, |suffix| {
self.cursor_key.assign_store_suffix(suffix);
IKEY_SIZE + suffix.len()
})
} else {
#[expect(clippy::cast_possible_truncation, reason = "Known const")]
let len: usize = std::cmp::min(keylenx, IKEY_SIZE as u8) as usize;
len
};
self.cursor_key.assign_store_length(key_len);
let Some(snapshot) = E::emit_value(leaf, slot, key_len) else {
self.stack.set_ki(ReverseScanHelper::prev(ki));
return (ScanStateBack::FindPrev, None);
};
self.stack.set_last_ikey(slot_ikey);
self.stack
.update_state(version, *perm, ReverseScanHelper::prev(ki));
(ScanStateBack::Emit, Some(snapshot))
}
}
impl<P: LeafPolicy> ReverseScanCtx<P> {
#[inline]
fn advance_to_prev_leaf_generic<E: ScanEmitter<P>>(
&mut self,
guard: &LocalGuard<'_>,
) -> (ScanStateBack, Option<E::Snapshot>) {
let current_ptr: *mut LeafNode15<P> = self.stack.get_leaf_ptr();
let current_leaf: &LeafNode15<P> = unsafe { &*current_ptr };
let ikey_bound: u64 = current_leaf.ikey_bound();
self.cursor_key.assign_store_ikey(ikey_bound);
self.cursor_key.assign_store_length(0);
let prev_ptr: *mut LeafNode15<P> = ReverseScanHelper::retreat(current_leaf);
if prev_ptr.is_null() {
self.stack.set_leaf(StdPtr::null_mut());
return (ScanStateBack::Up, None);
}
prefetch_read(prev_ptr.cast::<u8>());
self.stack.set_leaf(prev_ptr);
let mut leaf_ptr: *mut LeafNode15<P> = prev_ptr;
let version: u32 =
ReverseScanHelper::stable_reverse(&mut leaf_ptr, &self.cursor_key, guard);
if leaf_ptr != prev_ptr {
self.stack.set_leaf(leaf_ptr);
}
let leaf: &LeafNode15<P> = unsafe { &*leaf_ptr };
let prev_prev: *mut LeafNode15<P> = leaf.prev(guard);
if !prev_prev.is_null() {
prefetch_read(prev_prev.cast::<u8>());
}
let perm: <LeafNode15<P> as TreeLeafNode<P>>::Perm = leaf.permutation();
let size: usize = perm.size();
let ki: isize = if size > 0 {
(size - 1).cast_signed()
} else {
-1 };
self.stack.set_last_ikey(u64::MAX);
self.stack.update_state(version, perm, ki);
(ScanStateBack::FindPrev, None)
}
}
impl<P: LeafPolicy> ReverseScanCtx<P> {
pub(crate) fn reposition_back(
&mut self,
guard: &LocalGuard<'_>,
) -> (ScanStateBack, Option<ScanSnapshot<P>>)
where
P::Output: Clone,
{
self.reposition_back_generic::<CloneEmitter>(guard)
}
fn reposition_back_generic<E: ScanEmitter<P>>(
&mut self,
guard: &LocalGuard<'_>,
) -> (ScanStateBack, Option<E::Snapshot>) {
const MAX_REPOSITION_RETRIES: u32 = 16;
let root: *const u8 = self.stack.get_root();
if root.is_null() {
return (ScanStateBack::Up, None);
}
for _retry in 0..MAX_REPOSITION_RETRIES {
let mut leaf_ptr: *mut LeafNode15<P> =
reach_leaf_for_scan::<P>(root, &self.cursor_key, guard);
if leaf_ptr.is_null() {
self.stack.set_leaf(StdPtr::null_mut());
return (ScanStateBack::Up, None);
}
let version: u32 =
ReverseScanHelper::stable_reverse(&mut leaf_ptr, &self.cursor_key, guard);
if NodeVersion::is_deleted_version(version) {
continue;
}
let leaf: &LeafNode15<P> = unsafe { &*leaf_ptr };
let perm: <LeafNode15<P> as TreeLeafNode<P>>::Perm = leaf.permutation();
let ki: isize = ReverseScanHelper::lower_reverse(
&self.cursor_key,
leaf,
&perm,
self.flags.upper_bound(),
);
self.stack.set_leaf(leaf_ptr);
self.stack.set_last_ikey(u64::MAX);
self.stack.update_state(version, perm, ki);
return (ScanStateBack::FindPrev, None);
}
(ScanStateBack::Retry, None)
}
}
impl<P: LeafPolicy> ReverseScanCtx<P> {
#[inline(always)]
pub(crate) fn handle_down_back(&mut self) {
self.cursor_key.shift_clear_reverse();
self.flags.set_upper_bound();
self.stack.set_last_ikey(u64::MAX);
}
pub(crate) fn handle_up_back(&mut self, guard: &LocalGuard<'_>) -> bool {
let completed_layer_root: *const u8 = self.stack.get_root();
let Some(parent_ctx) = self.layer_stack.pop() else {
return false;
};
self.stack.set_root(parent_ctx.root);
self.stack.set_leaf(parent_ctx.leaf.as_ptr());
self.cursor_key.unshift();
if self.cursor_key.is_at_empty_root() {
return self.handle_up_back(guard);
}
let mut leaf_ptr: *mut LeafNode15<P> = parent_ctx.leaf.as_ptr();
let version: u32 =
ReverseScanHelper::stable_reverse(&mut leaf_ptr, &self.cursor_key, guard);
if leaf_ptr != parent_ctx.leaf.as_ptr() {
self.stack.set_leaf(leaf_ptr);
}
let leaf: &LeafNode15<P> = unsafe { &*leaf_ptr };
let perm: <LeafNode15<P> as TreeLeafNode<P>>::Perm = leaf.permutation();
self.flags.clear_upper_bound();
let mut anchored_ki: Option<isize> = None;
for pos in 0..perm.size() {
let slot: usize = perm.get(pos);
if leaf.keylenx_relaxed(slot) >= LAYER_KEYLENX {
let layer_ptr: *const u8 = leaf.load_layer_raw(slot).cast_const();
if layer_ptr == completed_layer_root {
anchored_ki = Some(pos.cast_signed() - 1);
break;
}
}
}
let ki: isize = anchored_ki.unwrap_or_else(|| {
ReverseScanHelper::lower_reverse(
&self.cursor_key,
leaf,
&perm,
self.flags.upper_bound(),
)
});
self.stack.set_last_ikey(u64::MAX);
self.stack.update_state(version, perm, ki);
true
}
}
impl<P: LeafPolicy> ReverseScanCtx<P> {
#[inline]
#[expect(clippy::cast_possible_truncation)]
pub(crate) fn find_prev_single_layer(
&mut self,
guard: &LocalGuard<'_>,
needs_duplicate_check: bool,
) -> (ScanStateBack, Option<ScanSnapshot<P>>)
where
P::Output: Clone,
{
let leaf_ptr: *mut LeafNode15<P> = self.stack.get_leaf_ptr();
if leaf_ptr.is_null() {
return (ScanStateBack::FindPrev, None);
}
let ki: isize = self.stack.get_ki();
if ki < 0 {
return self.advance_prev_leaf_single_layer_snapshot(guard);
}
let leaf: &LeafNode15<P> = unsafe { &*leaf_ptr };
let version: u32 = self.stack.get_version();
if leaf.version().has_changed(version) {
return (ScanStateBack::Retry, None);
}
let perm: <LeafNode15<P> as TreeLeafNode<P>>::Perm = *self.stack.get_perm_ref();
let perm_size: usize = perm.size();
if ki.unsigned_abs() >= perm_size {
return self.advance_prev_leaf_single_layer_snapshot(guard);
}
let slot: usize = perm.get(ki.unsigned_abs());
let slot_ikey: u64 = leaf.ikey_relaxed(slot);
let slot_keylenx: u8 = leaf.keylenx_relaxed(slot);
if slot_ikey > self.stack.last_ikey() {
return (ScanStateBack::Retry, None);
}
if needs_duplicate_check
&& ReverseScanHelper::is_duplicate_reverse(
&self.cursor_key,
slot_ikey,
slot_keylenx,
self.flags.upper_bound(),
)
{
self.stack.set_ki(ki - 1);
return (ScanStateBack::FindPrev, None);
}
if slot_keylenx > IKEY_SIZE as u8 {
return (ScanStateBack::Down, None);
}
leaf.prefetch_value(slot);
if leaf.is_value_empty_relaxed(slot) {
self.stack.set_ki(ki - 1);
return (ScanStateBack::FindPrev, None);
}
let key_len: usize = slot_keylenx as usize;
self.cursor_key.assign_store_ikey(slot_ikey);
self.cursor_key.assign_store_length(key_len);
self.cursor_key.mark_key_complete();
self.flags.clear_upper_bound();
self.stack.set_last_ikey(slot_ikey);
self.stack.set_ki(ki - 1);
let Some(output) = leaf.load_value(slot) else {
return (ScanStateBack::FindPrev, None);
};
(
ScanStateBack::Emit,
Some(ScanSnapshot {
value: output,
key_len,
}),
)
}
#[inline(always)]
fn advance_prev_leaf_single_layer_snapshot(
&mut self,
guard: &LocalGuard<'_>,
) -> (ScanStateBack, Option<ScanSnapshot<P>>) {
let leaf_ptr: *mut LeafNode15<P> = self.stack.get_leaf_ptr();
if leaf_ptr.is_null() {
return (ScanStateBack::FindPrev, None);
}
let leaf: &LeafNode15<P> = unsafe { &*leaf_ptr };
let version: u32 = self.stack.get_version();
if leaf.version().has_changed(version) {
return (ScanStateBack::Retry, None);
}
let ikey_bound: u64 = leaf.ikey_bound();
self.cursor_key.assign_store_ikey(ikey_bound);
self.cursor_key.assign_store_length(0);
let prev: *mut LeafNode15<P> = leaf.prev(guard);
if prev.is_null() {
self.stack.set_leaf(StdPtr::null_mut());
return (ScanStateBack::FindPrev, None);
}
self.stack.set_leaf(prev);
let prev_leaf: &LeafNode15<P> = unsafe { &*prev };
prev_leaf.prefetch();
let prev_prev: *mut LeafNode15<P> = prev_leaf.prev(guard);
if !prev_prev.is_null() {
let prev_prev_leaf: &LeafNode15<P> = unsafe { &*prev_prev };
prev_prev_leaf.prefetch();
}
let prev_version: u32 = prev_leaf.version().stable();
if NodeVersion::is_deleted_version(prev_version) {
return (ScanStateBack::Retry, None);
}
let perm: <LeafNode15<P> as TreeLeafNode<P>>::Perm = prev_leaf.permutation();
let size: usize = perm.size();
let ki: isize = if size > 0 {
(size - 1).cast_signed()
} else {
-1
};
self.stack.set_last_ikey(u64::MAX);
self.stack.update_state(prev_version, perm, ki);
(ScanStateBack::FindPrev, None)
}
}
#[inline(always)]
fn build_reverse_batch_ctx<'a, P: LeafPolicy>(
stack: &'a BackStackElement<P>,
cursor_key: &'a mut CursorKey,
layer_stack: &'a mut LayerStack<P>,
) -> BatchCtx<'a, P> {
let leaf_ptr: *mut LeafNode15<P> = stack.get_leaf_ptr();
let leaf: &LeafNode15<P> = unsafe { &*leaf_ptr };
let perm = *stack.get_perm_ref();
let perm_size = perm.size();
BatchCtx {
leaf,
perm,
perm_size,
cached_version: stack.get_version(),
ki: stack.get_ki(),
root: stack.get_root(),
leaf_ptr,
cursor_key,
layer_stack,
}
}
impl<P: LeafPolicy> ReverseScanCtx<P> {
#[inline]
pub(crate) fn process_prev_leaf_batch_ptr<F>(
&mut self,
start_bound: &RangeBound<'_>,
start_bound_ikey: Option<u64>,
visitor: &mut F,
count: &mut usize,
) -> LeafBatchResultBack
where
F: FnMut(&[u8], &P::Value) -> bool,
{
let (result, ki, root) = {
let mut ctx =
build_reverse_batch_ctx(&self.stack, &mut self.cursor_key, &mut self.layer_stack);
let r = process_batch_keyed::<Backward, _, P>(
&mut ctx,
start_bound,
start_bound_ikey,
&mut RefSlotVisitor(visitor),
count,
&mut self.flags,
);
(r, ctx.ki, ctx.root)
};
self.stack.set_ki(ki);
self.stack.set_root(root);
result
}
#[inline]
pub(crate) fn process_prev_leaf_batch<F>(
&mut self,
start_bound: &RangeBound<'_>,
start_bound_ikey: Option<u64>,
visitor: &mut F,
count: &mut usize,
) -> LeafBatchResultBack
where
F: FnMut(&[u8], P::Output) -> bool,
{
let (result, ki, root) = {
let mut ctx =
build_reverse_batch_ctx(&self.stack, &mut self.cursor_key, &mut self.layer_stack);
let r = process_batch_keyed::<Backward, _, P>(
&mut ctx,
start_bound,
start_bound_ikey,
&mut CopySlotVisitor(visitor),
count,
&mut self.flags,
);
(r, ctx.ki, ctx.root)
};
self.stack.set_ki(ki);
self.stack.set_root(root);
result
}
#[inline]
pub(crate) fn process_prev_leaf_batch_values<F>(
&mut self,
start_bound_ikey: Option<u64>,
visitor: &mut F,
count: &mut usize,
) -> LeafBatchResultBack
where
F: FnMut(P::Output) -> bool,
{
let (result, ki, root) = {
let mut ctx =
build_reverse_batch_ctx(&self.stack, &mut self.cursor_key, &mut self.layer_stack);
let r = process_batch_values::<Backward, P>(
&mut ctx,
start_bound_ikey,
visitor,
count,
&mut self.flags,
);
(r, ctx.ki, ctx.root)
};
self.stack.set_ki(ki);
self.stack.set_root(root);
result
}
#[inline]
pub(crate) fn advance_prev_leaf(&mut self, guard: &LocalGuard<'_>) -> bool {
let current_ptr: *mut LeafNode15<P> = self.stack.get_leaf_ptr();
if current_ptr.is_null() {
return false;
}
let current_leaf: &LeafNode15<P> = unsafe { &*current_ptr };
let ikey_bound: u64 = current_leaf.ikey_bound();
self.cursor_key.assign_store_ikey(ikey_bound);
self.cursor_key.assign_store_length(0);
let prev_ptr: *mut LeafNode15<P> = ReverseScanHelper::retreat(current_leaf);
if prev_ptr.is_null() {
self.stack.set_leaf(StdPtr::null_mut());
return false;
}
prefetch_read(prev_ptr.cast::<u8>());
self.stack.set_leaf(prev_ptr);
let mut leaf_ptr: *mut LeafNode15<P> = prev_ptr;
let version: u32 =
ReverseScanHelper::stable_reverse(&mut leaf_ptr, &self.cursor_key, guard);
if leaf_ptr != prev_ptr {
self.stack.set_leaf(leaf_ptr);
}
let leaf: &LeafNode15<P> = unsafe { &*leaf_ptr };
let prev_prev: *mut LeafNode15<P> = leaf.prev(guard);
if !prev_prev.is_null() {
prefetch_read(prev_prev.cast::<u8>());
}
let perm: <LeafNode15<P> as TreeLeafNode<P>>::Perm = leaf.permutation();
let size: usize = perm.size();
let ki: isize = if size > 0 {
(size - 1).cast_signed()
} else {
-1
};
self.stack.update_state(version, perm, ki);
true
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReverseBatchAction {
Continue,
Stopped,
Exhausted,
}
pub trait ReverseBatchStrategy<P: LeafPolicy> {
fn emit_initial(
&mut self,
ctx: &mut ReverseScanCtx<P>,
start_bound: &RangeBound<'_>,
count: &mut usize,
) -> ReverseBatchAction;
fn process_leaf(
&mut self,
ctx: &mut ReverseScanCtx<P>,
start_bound: &RangeBound<'_>,
start_bound_ikey: Option<u64>,
count: &mut usize,
) -> LeafBatchResultBack;
}
impl<P: LeafPolicy> ReverseScanCtx<P> {
#[inline]
pub(crate) fn run_batch_reverse<S: ReverseBatchStrategy<P>>(
&mut self,
strategy: &mut S,
start_bound: &RangeBound<'_>,
start_bound_ikey: Option<u64>,
guard: &LocalGuard<'_>,
) -> usize {
let mut count: usize = 0;
if self.state == ScanStateBack::Emit {
match strategy.emit_initial(self, start_bound, &mut count) {
ReverseBatchAction::Continue => {
self.state = ScanStateBack::FindPrev;
}
ReverseBatchAction::Stopped => return count,
ReverseBatchAction::Exhausted => {
self.flags.mark_exhausted();
return count;
}
}
}
loop {
match self.state {
ScanStateBack::Down => {
self.handle_down_back();
self.state = ScanStateBack::Retry;
self.flags.require_duplicate_check();
continue;
}
ScanStateBack::Up => {
if !self.handle_up_back(guard) {
self.flags.mark_exhausted();
return count;
}
self.state = ScanStateBack::FindPrev;
self.flags.require_duplicate_check();
continue;
}
ScanStateBack::Retry => {
let (new_state, _) = self.reposition_back(guard);
self.state = new_state;
self.flags.require_duplicate_check();
continue;
}
ScanStateBack::Emit | ScanStateBack::FindPrev => {}
}
if self.stack.get_leaf_ptr().is_null() {
if self.layer_stack.is_empty() {
self.flags.mark_exhausted();
return count;
}
self.state = ScanStateBack::Up;
continue;
}
let leaf: &LeafNode15<P> = unsafe { &*self.stack.get_leaf_ptr() };
if leaf.version().is_deleted() {
self.state = ScanStateBack::Retry;
continue;
}
let result = strategy.process_leaf(self, start_bound, start_bound_ikey, &mut count);
match result {
LeafBatchResultBack::LeafExhausted => {
if !self.advance_prev_leaf(guard) {
if self.layer_stack.is_empty() {
self.flags.mark_exhausted();
return count;
}
self.state = ScanStateBack::Up;
}
}
LeafBatchResultBack::LayerEncountered => {
self.state = ScanStateBack::Down;
}
LeafBatchResultBack::VersionChanged => {
self.state = ScanStateBack::Retry;
}
LeafBatchResultBack::Stopped => return count,
LeafBatchResultBack::StartBoundExceeded => {
self.flags.mark_exhausted();
return count;
}
}
}
}
}
pub struct RevIntraLeafRefStrategy<'a, F> {
visitor: &'a mut F,
}
impl<'a, F> RevIntraLeafRefStrategy<'a, F> {
#[inline]
pub const fn new(visitor: &'a mut F) -> Self {
Self { visitor }
}
}
impl<P, F> ReverseBatchStrategy<P> for RevIntraLeafRefStrategy<'_, F>
where
P: crate::policy::RefPolicy,
F: FnMut(&[u8], &P::Value) -> bool,
{
#[inline(always)]
fn emit_initial(
&mut self,
ctx: &mut ReverseScanCtx<P>,
start_bound: &RangeBound<'_>,
count: &mut usize,
) -> ReverseBatchAction {
let Some(snapshot) = ctx.snapshot.take() else {
return ReverseBatchAction::Continue;
};
let key: &[u8] = unsafe { ctx.cursor_key.full_key_unchecked() };
if !start_bound.contains_reverse(key) {
return ReverseBatchAction::Exhausted;
}
*count += 1;
let mut scratch = std::mem::MaybeUninit::uninit();
let value_ref: &P::Value = unsafe { P::output_as_ref_sound(&snapshot.value, &mut scratch) };
let should_continue = (self.visitor)(key, value_ref);
if should_continue {
ReverseBatchAction::Continue
} else {
ReverseBatchAction::Stopped
}
}
#[inline(always)]
fn process_leaf(
&mut self,
ctx: &mut ReverseScanCtx<P>,
start_bound: &RangeBound<'_>,
start_bound_ikey: Option<u64>,
count: &mut usize,
) -> LeafBatchResultBack {
ctx.process_prev_leaf_batch_ptr(start_bound, start_bound_ikey, self.visitor, count)
}
}
pub struct RevIntraLeafCopyStrategy<'a, F> {
visitor: &'a mut F,
}
impl<'a, F> RevIntraLeafCopyStrategy<'a, F> {
#[inline]
pub const fn new(visitor: &'a mut F) -> Self {
Self { visitor }
}
}
impl<P, F> ReverseBatchStrategy<P> for RevIntraLeafCopyStrategy<'_, F>
where
P: LeafPolicy,
F: FnMut(&[u8], P::Output) -> bool,
{
#[inline(always)]
fn emit_initial(
&mut self,
ctx: &mut ReverseScanCtx<P>,
start_bound: &RangeBound<'_>,
count: &mut usize,
) -> ReverseBatchAction {
let Some(snapshot) = ctx.snapshot.take() else {
return ReverseBatchAction::Continue;
};
let key: &[u8] = unsafe { ctx.cursor_key.full_key_unchecked() };
if !start_bound.contains_reverse(key) {
return ReverseBatchAction::Exhausted;
}
*count += 1;
if (self.visitor)(key, snapshot.value) {
ReverseBatchAction::Continue
} else {
ReverseBatchAction::Stopped
}
}
#[inline(always)]
fn process_leaf(
&mut self,
ctx: &mut ReverseScanCtx<P>,
start_bound: &RangeBound<'_>,
start_bound_ikey: Option<u64>,
count: &mut usize,
) -> LeafBatchResultBack {
ctx.process_prev_leaf_batch(start_bound, start_bound_ikey, self.visitor, count)
}
}
pub struct RevValuesOnlyStrategy<'a, F> {
visitor: &'a mut F,
}
impl<'a, F> RevValuesOnlyStrategy<'a, F> {
#[inline]
pub const fn new(visitor: &'a mut F) -> Self {
Self { visitor }
}
}
impl<P, F> ReverseBatchStrategy<P> for RevValuesOnlyStrategy<'_, F>
where
P: LeafPolicy,
F: FnMut(P::Output) -> bool,
{
#[inline(always)]
fn emit_initial(
&mut self,
ctx: &mut ReverseScanCtx<P>,
_start_bound: &RangeBound<'_>,
count: &mut usize,
) -> ReverseBatchAction {
let Some(snapshot) = ctx.snapshot.take() else {
return ReverseBatchAction::Continue;
};
*count += 1;
if (self.visitor)(snapshot.value) {
ReverseBatchAction::Continue
} else {
ReverseBatchAction::Stopped
}
}
#[inline(always)]
fn process_leaf(
&mut self,
ctx: &mut ReverseScanCtx<P>,
start_bound: &RangeBound<'_>,
_start_bound_ikey: Option<u64>,
count: &mut usize,
) -> LeafBatchResultBack {
let start_bound_ikey: Option<u64> = start_bound.extract_ikey_at(ctx.cursor_key.offset());
ctx.process_prev_leaf_batch_values(start_bound_ikey, self.visitor, count)
}
}