mod adapters;
mod batch_forward;
mod batch_reverse;
pub(super) mod iter_flags;
mod range_bound;
mod scan_entry;
#[cfg(test)]
mod unit_tests;
pub use adapters::{KeysIter, ValuesIter};
pub use range_bound::RangeBound;
pub use scan_entry::ScanEntry;
use std::fmt::{self as StdFmt, Debug, Formatter};
use std::iter::FusedIterator;
use std::marker::PhantomData;
use seize::LocalGuard;
use crate::alloc_trait::TreeAllocator;
use crate::key::IKEY_SIZE;
use crate::policy::LeafPolicy;
use crate::tree::MassTreeGeneric;
#[cfg(debug_assertions)]
use super::cursor_key::CursorDebugState;
use super::cursor_key::CursorKey;
use super::forward_ctx::ForwardScanCtx;
use super::reverse_ctx::ReverseScanCtx;
use super::scan_state::{LayerContext, ScanSnapshot, ScanState, ScanStateBack, StepResult};
/// Double-ended range iterator over a [`MassTreeGeneric`].
///
/// Maintains independent forward and reverse scan contexts; the two
/// cursors "fuse" — when the forward key reaches or passes the reverse
/// key, both directions are marked exhausted (see `next`/`next_back`).
pub struct RangeIter<'a, 'g, P, A>
where
    P: LeafPolicy,
    A: TreeAllocator<P>,
{
    /// Seize guard held for the iterator's lifetime — presumably pins
    /// reclamation so raw node pointers on the scan stacks stay valid.
    pub(super) guard: &'g LocalGuard<'a>,
    /// Forward-direction scan state: cursor key, node stack, flags.
    pub(super) fwd: ForwardScanCtx<P>,
    /// Reverse-direction scan state; `None` for forward-only iterators
    /// until `initialize_back` creates it lazily.
    pub(super) rev: Option<ReverseScanCtx<P>>,
    /// Root pointer captured at construction; reused when the reverse
    /// context is built lazily.
    pub(super) tree_root: *const u8,
    /// Lower bound of the requested range.
    pub(super) start_bound: RangeBound<'a>,
    /// Upper bound of the requested range.
    pub(super) end_bound: RangeBound<'a>,
    /// Ties the allocator type parameter to the iterator without storing one.
    _marker: PhantomData<&'a A>,
}
impl<P, A> Debug for RangeIter<'_, '_, P, A>
where
    P: LeafPolicy,
    A: TreeAllocator<P>,
{
    /// Renders a diagnostic summary: forward flags and state always,
    /// plus the reverse side when a reverse context exists.
    fn fmt(&self, f: &mut Formatter<'_>) -> StdFmt::Result {
        let mut builder = f.debug_struct("RangeIter");
        builder
            .field("exhausted", &self.fwd.flags.exhausted())
            .field("initialized", &self.fwd.flags.initialized())
            .field("state", &self.fwd.state);
        match &self.rev {
            Some(back) => {
                builder
                    .field("back_exhausted", &back.flags.exhausted())
                    .field("back_initialized", &back.flags.initialized())
                    .field("back_state", &back.state);
            }
            None => {
                builder.field("reverse", &"forward-only");
            }
        }
        builder.finish_non_exhaustive()
    }
}
impl<'a, 'g, P, A> RangeIter<'a, 'g, P, A>
where
P: LeafPolicy,
A: TreeAllocator<P>,
{
/// Builds a forward-only iterator over `tree` between `start` and `end`.
///
/// The reverse context is not created here; `initialize_back` builds it
/// on demand if the caller ever iterates from the back.
pub(crate) fn new_forward_only(
    tree: &'a MassTreeGeneric<P, A>,
    start: RangeBound<'a>,
    end: RangeBound<'a>,
    guard: &'g LocalGuard<'a>,
) -> Self {
    let (start_key, emit_equal) = start.to_start_params();
    let cursor_key = CursorKey::from_slice(start_key);
    let root = tree.load_root_ptr_generic(guard);
    // The single-layer fast path is only valid when both bounds fit
    // within one ikey slice.
    let end_fits = match &end {
        RangeBound::Unbounded => true,
        RangeBound::Included(k) | RangeBound::Excluded(k) => k.len() <= IKEY_SIZE,
    };
    let single_layer_mode = start_key.len() <= IKEY_SIZE && end_fits;
    Self {
        guard,
        fwd: ForwardScanCtx::new(root, cursor_key, emit_equal, single_layer_mode),
        rev: None,
        tree_root: root,
        start_bound: start,
        end_bound: end,
        _marker: PhantomData,
    }
}
/// Builds a forward-only iterator rooted at `layer_root` instead of the
/// tree root — presumably used when a scan restarts inside a sub-layer
/// (the cursor key is supplied pre-positioned by the caller).
///
/// `emit_equal` is hard-wired to `true` (a key equal to the cursor may
/// be emitted) and single-layer mode to `false`.
pub(crate) fn new_forward_only_from_root(
    layer_root: *const u8,
    cursor_key: CursorKey,
    start_bound: RangeBound<'a>,
    end_bound: RangeBound<'a>,
    guard: &'g LocalGuard<'a>,
) -> Self {
    Self {
        guard,
        // `true` = emit_equal on, `false` = single-layer mode off.
        fwd: ForwardScanCtx::new(layer_root, cursor_key, true, false),
        rev: None,
        // The layer root doubles as the "tree root" for any lazily
        // created reverse context.
        tree_root: layer_root,
        start_bound,
        end_bound,
        _marker: PhantomData,
    }
}
/// Builds a double-ended iterator over `tree` between `start` and `end`.
///
/// Unlike `new_forward_only`, the reverse context is constructed
/// eagerly, positioned at the end bound.
pub(crate) fn new(
    tree: &'a MassTreeGeneric<P, A>,
    start: RangeBound<'a>,
    end: RangeBound<'a>,
    guard: &'g LocalGuard<'a>,
) -> Self {
    let (start_key, emit_equal) = start.to_start_params();
    let cursor_key = CursorKey::from_slice(start_key);
    let root = tree.load_root_ptr_generic(guard);
    // The single-layer fast path is only valid when both bounds fit
    // within one ikey slice.
    let end_fits = match &end {
        RangeBound::Unbounded => true,
        RangeBound::Included(k) | RangeBound::Excluded(k) => k.len() <= IKEY_SIZE,
    };
    let single_layer_mode = start_key.len() <= IKEY_SIZE && end_fits;
    // An excluded end bound must not emit a key equal to the bound;
    // inclusive and unbounded ends may.
    let back_emit_equal = !matches!(&end, RangeBound::Excluded(_));
    Self {
        guard,
        fwd: ForwardScanCtx::new(root, cursor_key, emit_equal, single_layer_mode),
        rev: Some(ReverseScanCtx::new_with_bound(
            root,
            CursorKey::for_reverse_scan(&end),
            back_emit_equal,
        )),
        tree_root: root,
        start_bound: start,
        end_bound: end,
        _marker: PhantomData,
    }
}
/// `true` once the reverse cursor has been marked exhausted.
/// An absent reverse context counts as not exhausted.
#[inline(always)]
const fn back_exhausted(&self) -> bool {
    if let Some(rev) = &self.rev {
        rev.flags.exhausted()
    } else {
        false
    }
}
/// `true` once the reverse cursor has been initialized.
/// An absent reverse context counts as uninitialized.
#[inline(always)]
const fn back_initialized(&self) -> bool {
    if let Some(rev) = &self.rev {
        rev.flags.initialized()
    } else {
        false
    }
}
/// Marks the reverse cursor exhausted; a no-op when no reverse
/// context exists.
#[inline(always)]
const fn mark_back_exhausted(&mut self) {
    match &mut self.rev {
        Some(rev) => rev.flags.mark_exhausted(),
        None => {}
    }
}
/// Turns off the forward scan's single-layer fast path; called whenever
/// a multi-layer descent is observed (see the `Down` handling in
/// `initialize`).
#[inline(always)]
const fn disable_single_layer_mode(&mut self) {
    self.fwd.flags.disable_single_layer_mode();
}
/// Positions the forward cursor at the first candidate entry.
///
/// Idempotent: returns immediately once `initialized` is set. A null
/// root marks the scan exhausted. Otherwise loops on `find_initial`,
/// descending into sub-layers on `Down` and re-running on `Retry`
/// (presumably after a concurrent structural change), until a stable
/// state is reached.
pub(super) fn initialize(&mut self) {
    if self.fwd.flags.initialized() {
        return;
    }
    self.fwd.flags.mark_initialized();
    if self.fwd.stack.root().is_null() {
        self.fwd.flags.mark_exhausted();
        return;
    }
    loop {
        // Capture the root before find_initial so the layer context
        // records where this descent started.
        let parent_root: *const u8 = self.fwd.stack.root();
        let (state, snapshot) = self.fwd.find_initial(
            self.fwd.stack.root(),
            self.fwd.flags.emit_equal(),
            self.guard,
        );
        match state {
            ScanState::Down => {
                // Descending into a sub-layer: the fast path no longer
                // applies, and we must remember the parent position.
                self.disable_single_layer_mode();
                self.fwd
                    .layer_stack
                    .push(LayerContext::new(parent_root, self.fwd.stack.leaf_ptr()));
                // Advance the cursor to the next 8-byte key slice;
                // clear it when no suffix remains.
                if self.fwd.cursor_key.has_suffix() {
                    self.fwd.cursor_key.shift();
                } else {
                    self.fwd.cursor_key.shift_clear();
                }
            }
            ScanState::Retry => {
                // Re-run find_initial from the (possibly updated) root.
            }
            _ => {
                self.fwd.state = state;
                self.fwd.snapshot = snapshot;
                break;
            }
        }
    }
}
/// Drives the forward state machine until an entry is emitted or the
/// scan exhausts.
///
/// States:
/// - `Emit`: the cursor sits on a candidate; bound-check it, check for
///   crossing the reverse cursor, then hand out the saved snapshot.
/// - `FindNext`: locate the following entry. When the result is
///   immediately `Emit`, the bound/fusion/debug checks are repeated
///   inline (fused fast path) instead of looping back to `Emit`.
/// - `Down`/`Up`/`Retry`: structural transitions, delegated to
///   `step_transitions`.
#[inline]
#[expect(
    clippy::too_many_lines,
    reason = "State machine with fused optimization and debug instrumentation"
)]
fn advance(&mut self) -> Option<ScanEntry<P::Output>> {
    loop {
        match self.fwd.state {
            ScanState::Emit => {
                // SAFETY: NOTE(review) assumed `Emit` implies the cursor
                // holds a complete key — confirm against `full_key_unchecked`.
                let key = unsafe { self.fwd.cursor_key.full_key_unchecked() };
                // Passed the end bound: the forward scan is done.
                if !self.end_bound.contains(key) {
                    self.fwd.flags.mark_exhausted();
                    return None;
                }
                // Double-ended fusion: stop both directions once the
                // forward key reaches or passes the reverse key.
                if self.back_initialized() && !self.back_exhausted() {
                    // SAFETY: back_initialized() returning true implies
                    // `rev` is `Some`.
                    let back_key = unsafe {
                        self.rev
                            .as_ref()
                            .unwrap_unchecked()
                            .cursor_key
                            .full_key_unchecked()
                    };
                    if key >= back_key {
                        self.fwd.flags.mark_exhausted();
                        self.mark_back_exhausted();
                        return None;
                    }
                }
                // Debug builds verify strictly increasing emission order.
                #[cfg(debug_assertions)]
                #[allow(
                    clippy::panic,
                    reason = "Intentional panic for debug-only ordering violation detection"
                )]
                {
                    if let Some(ref last_key) = self.fwd.debug_last_emitted_key
                        && key <= last_key.as_slice()
                    {
                        let current_state = self.fwd.cursor_key.debug_state();
                        let last_state = self.fwd.debug_last_cursor_state.as_ref();
                        eprintln!("\n=== ORDERING VIOLATION DETECTED ===");
                        eprintln!("Current key: {:?}", String::from_utf8_lossy(key));
                        eprintln!("Last key: {:?}", String::from_utf8_lossy(last_key));
                        eprintln!("Current key bytes: {key:?}");
                        eprintln!("Last key bytes: {last_key:?}");
                        eprintln!("Current cursor: {current_state}");
                        if let Some(last) = last_state {
                            eprintln!("Last cursor: {last}");
                        }
                        eprintln!("=== END ORDERING VIOLATION ===\n");
                        panic!(
                            "Scan ordering violation: emitted key {:?} is not > last emitted key {:?}",
                            String::from_utf8_lossy(key),
                            String::from_utf8_lossy(last_key)
                        );
                    }
                    self.fwd.debug_last_emitted_key = Some(key.to_vec());
                    self.fwd.debug_last_cursor_state = Some(self.fwd.cursor_key.debug_state());
                }
                debug_assert!(
                    self.fwd.snapshot.is_some(),
                    "Emit state entered without snapshot - state machine bug"
                );
                // `?` is defensive: release builds bail out quietly if
                // the snapshot is unexpectedly absent.
                let snapshot = self.fwd.snapshot.take()?;
                let entry = ScanEntry::new(key.to_vec(), snapshot.value);
                self.fwd.state = ScanState::FindNext;
                return Some(entry);
            }
            ScanState::FindNext => {
                // A pending duplicate check (set after Retry/Up/Down)
                // is consumed exactly once.
                let (new_state, snapshot) = if self.fwd.flags.needs_duplicate_check() {
                    self.fwd.flags.clear_duplicate_check();
                    self.fwd.find_next_with_dup_check(self.guard)
                } else {
                    self.fwd.find_next(self.guard)
                };
                if new_state == ScanState::Emit {
                    // Fused fast path: emit directly without another
                    // loop turn. Mirrors the checks in the Emit arm.
                    let key: &[u8] = unsafe { self.fwd.cursor_key.full_key_unchecked() };
                    if !self.end_bound.contains(key) {
                        self.fwd.flags.mark_exhausted();
                        return None;
                    }
                    if self.back_initialized() && !self.back_exhausted() {
                        // SAFETY: back_initialized() returning true
                        // implies `rev` is `Some`.
                        let back_key: &[u8] = unsafe {
                            self.rev
                                .as_ref()
                                .unwrap_unchecked()
                                .cursor_key
                                .full_key_unchecked()
                        };
                        if key >= back_key {
                            self.fwd.flags.mark_exhausted();
                            self.mark_back_exhausted();
                            return None;
                        }
                    }
                    // Debug builds verify strictly increasing order on
                    // the fused path too.
                    #[cfg(debug_assertions)]
                    #[allow(
                        clippy::panic,
                        reason = "Intentional panic for debug-only ordering violation detection"
                    )]
                    {
                        if let Some(ref last_key) = self.fwd.debug_last_emitted_key
                            && key <= last_key.as_slice()
                        {
                            let current_state: CursorDebugState =
                                self.fwd.cursor_key.debug_state();
                            let last_state: Option<&CursorDebugState> =
                                self.fwd.debug_last_cursor_state.as_ref();
                            eprintln!("\n=== ORDERING VIOLATION DETECTED (FUSED) ===");
                            eprintln!("Current key: {:?}", String::from_utf8_lossy(key));
                            eprintln!("Last key: {:?}", String::from_utf8_lossy(last_key));
                            eprintln!("Current key bytes: {key:?}");
                            eprintln!("Last key bytes: {last_key:?}");
                            eprintln!("Current cursor: {current_state}");
                            if let Some(last) = last_state {
                                eprintln!("Last cursor: {last}");
                            }
                            eprintln!("=== END ORDERING VIOLATION ===\n");
                            panic!(
                                "Scan ordering violation: emitted key {:?} is not > last emitted key {:?}",
                                String::from_utf8_lossy(key),
                                String::from_utf8_lossy(last_key)
                            );
                        }
                        self.fwd.debug_last_emitted_key = Some(key.to_vec());
                        self.fwd.debug_last_cursor_state =
                            Some(self.fwd.cursor_key.debug_state());
                    }
                    let snapshot: ScanSnapshot<P> = snapshot?;
                    return Some(ScanEntry::new(key.to_vec(), snapshot.value));
                }
                self.fwd.state = new_state;
                self.fwd.snapshot = snapshot;
            }
            ScanState::Down | ScanState::Up | ScanState::Retry => {
                if self.fwd.step_transitions(self.guard) == StepResult::Exhausted {
                    return None;
                }
            }
        }
    }
}
/// Positions the reverse cursor at the last candidate entry.
///
/// Idempotent. For iterators built with `new_forward_only`, the reverse
/// context is created here on first use. A null tree root marks the
/// reverse scan exhausted immediately.
pub(super) fn initialize_back(&mut self) {
    if self.back_initialized() {
        return;
    }
    if self.rev.is_none() {
        // Lazy construction path: mirror the back_emit_equal logic
        // used by `new` (inclusive/unbounded ends may emit the bound).
        let back_emit_equal = matches!(
            self.end_bound,
            RangeBound::Unbounded | RangeBound::Included(_)
        );
        self.rev = Some(ReverseScanCtx::new_with_bound(
            self.tree_root,
            CursorKey::for_reverse_scan(&self.end_bound),
            back_emit_equal,
        ));
    }
    // SAFETY: `rev` was populated just above if it was `None`.
    let rev = unsafe { self.rev.as_mut().unwrap_unchecked() };
    let back_emit_equal = rev.flags.emit_equal();
    rev.flags.mark_initialized();
    if self.tree_root.is_null() {
        rev.flags.mark_exhausted();
        return;
    }
    if self.end_bound.is_unbounded() {
        // No concrete end key to seek to — flag the upper-bound start
        // so the reverse seek begins at the layer's upper extreme.
        rev.flags.set_upper_bound();
    }
    loop {
        let (state, snapshot) =
            rev.find_initial_reverse(rev.stack.get_root(), back_emit_equal, self.guard);
        match state {
            ScanStateBack::Down => {
                // Any sub-layer descent disqualifies the single-layer
                // fast path for the whole iterator. (Inlined rather
                // than via disable_single_layer_mode() because `rev`
                // holds a mutable borrow of self.)
                self.fwd.flags.disable_single_layer_mode();
                rev.handle_down_back();
            }
            ScanStateBack::Retry => {
                // Re-run find_initial_reverse from the layer root.
            }
            _ => {
                if !rev.layer_stack.is_empty() {
                    self.fwd.flags.disable_single_layer_mode();
                }
                rev.state = state;
                rev.snapshot = snapshot;
                if rev.state.is_emit() {
                    // A concrete position was found; the upper-bound
                    // flag has served its purpose.
                    rev.flags.clear_upper_bound();
                }
                break;
            }
        }
    }
}
/// Drives the reverse state machine until an entry is emitted or the
/// reverse scan exhausts. Mirror image of `advance`, plus a dedicated
/// single-layer fast path.
#[inline]
#[expect(
    clippy::too_many_lines,
    reason = "State machine benefits from unified logic"
)]
fn advance_back(&mut self) -> Option<ScanEntry<P::Output>> {
    // SAFETY: `next_back` runs `initialize_back` before calling this,
    // which always populates `rev`.
    let rev = unsafe { self.rev.as_mut().unwrap_unchecked() };
    loop {
        match rev.state {
            ScanStateBack::Emit => {
                // SAFETY: NOTE(review) assumed `Emit` implies the cursor
                // holds a complete key — confirm.
                let key: &[u8] = unsafe { rev.cursor_key.full_key_unchecked() };
                // Fell below the start bound: the reverse scan is done.
                if !self.start_bound.contains_reverse(key) {
                    rev.flags.mark_exhausted();
                    return None;
                }
                // Double-ended fusion: stop both directions once the
                // reverse key reaches or passes the forward key.
                if self.fwd.flags.initialized() && !self.fwd.flags.exhausted() {
                    let front_key: &[u8] = unsafe { self.fwd.cursor_key.full_key_unchecked() };
                    if key <= front_key {
                        rev.flags.mark_exhausted();
                        self.fwd.flags.mark_exhausted();
                        return None;
                    }
                }
                let snapshot: ScanSnapshot<P> = rev.snapshot.take()?;
                let entry = ScanEntry::new(key.to_vec(), snapshot.value);
                rev.flags.clear_upper_bound();
                rev.state = ScanStateBack::FindPrev;
                return Some(entry);
            }
            ScanStateBack::FindPrev => {
                if self.fwd.flags.single_layer_mode() {
                    // Fast path: both bounds fit in one ikey, so no
                    // layer bookkeeping is needed.
                    let needs_dup_check = rev.flags.needs_duplicate_check();
                    if needs_dup_check {
                        rev.flags.clear_duplicate_check();
                    }
                    let (new_state, snapshot) =
                        rev.find_prev_single_layer(self.guard, needs_dup_check);
                    rev.state = new_state;
                    rev.snapshot = snapshot;
                    match new_state {
                        ScanStateBack::FindPrev => {
                            // A null leaf means we walked off the tree.
                            if rev.stack.get_leaf_ptr().is_null() {
                                rev.flags.mark_exhausted();
                                return None;
                            }
                        }
                        ScanStateBack::Down => {
                            // The fast path ran into a sub-layer after
                            // all — demote to the general path and
                            // retry the find.
                            self.fwd.flags.disable_single_layer_mode();
                            rev.state = ScanStateBack::FindPrev;
                        }
                        _ => {}
                    }
                    continue;
                }
                // General path; a pending duplicate check (set after
                // Down/Up/Retry) is consumed exactly once.
                let (new_state, snapshot) = if rev.flags.needs_duplicate_check() {
                    rev.flags.clear_duplicate_check();
                    rev.find_prev_with_dup_check(self.guard)
                } else {
                    rev.find_prev(self.guard)
                };
                rev.state = new_state;
                rev.snapshot = snapshot;
            }
            ScanStateBack::Down => {
                self.fwd.flags.disable_single_layer_mode();
                rev.handle_down_back();
                // Re-seek inside the sub-layer; `false` = emit_equal
                // off (do not emit a key equal to the seek position).
                let (state, snapshot) =
                    rev.find_initial_reverse(rev.stack.get_root(), false, self.guard);
                rev.state = state;
                rev.snapshot = snapshot;
                rev.flags.require_duplicate_check();
            }
            ScanStateBack::Up => {
                if !rev.handle_up_back(self.guard) {
                    // No parent layer to return to: scan complete.
                    rev.flags.mark_exhausted();
                    return None;
                }
                rev.state = ScanStateBack::FindPrev;
                rev.flags.require_duplicate_check();
            }
            ScanStateBack::Retry => {
                // Reposition after a concurrent change and guard
                // against re-emitting the same key.
                let (new_state, _) = rev.reposition_back(self.guard);
                rev.state = new_state;
                rev.flags.require_duplicate_check();
            }
        }
    }
}
/// Converts this iterator into one that yields only keys.
pub const fn keys(self) -> KeysIter<'a, 'g, P, A> {
    KeysIter { inner: self }
}

/// Converts this iterator into one that yields only values.
pub const fn values(self) -> ValuesIter<'a, 'g, P, A> {
    ValuesIter { inner: self }
}
}
impl<P, A> Iterator for RangeIter<'_, '_, P, A>
where
    P: LeafPolicy,
    A: TreeAllocator<P>,
{
    type Item = ScanEntry<P::Output>;

    /// Lazily initializes the forward cursor on first call, then checks
    /// whether the two cursors have already met before advancing.
    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        if self.fwd.flags.exhausted() {
            return None;
        }
        if !self.fwd.flags.initialized() {
            self.initialize();
            // Initialization can discover an empty tree or range.
            if self.fwd.flags.exhausted() {
                return None;
            }
        }
        // Double-ended fusion pre-check: if front has reached or passed
        // back, both directions are finished.
        if self.back_initialized() && !self.back_exhausted() {
            // SAFETY: NOTE(review) assumed the cursor key is complete
            // once initialized and not exhausted — confirm.
            let front_key: &[u8] = unsafe { self.fwd.cursor_key.full_key_unchecked() };
            // SAFETY: back_initialized() returning true implies `rev`
            // is `Some`.
            let back_key: &[u8] = unsafe {
                self.rev
                    .as_ref()
                    .unwrap_unchecked()
                    .cursor_key
                    .full_key_unchecked()
            };
            if front_key >= back_key {
                self.fwd.flags.mark_exhausted();
                self.mark_back_exhausted();
                return None;
            }
        }
        self.advance()
    }

    /// Remaining count is unknown until exhaustion, so the lower bound
    /// is always 0 and the upper bound only becomes `Some(0)` at the end.
    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        if self.fwd.flags.exhausted() {
            (0, Some(0))
        } else {
            (0, None)
        }
    }
}
impl<P, A> DoubleEndedIterator for RangeIter<'_, '_, P, A>
where
    P: LeafPolicy,
    A: TreeAllocator<P>,
{
    /// Mirror of `next`: lazily initializes the reverse cursor (creating
    /// it if the iterator was built forward-only), then checks whether
    /// the two cursors have already met before stepping backward.
    #[inline]
    fn next_back(&mut self) -> Option<Self::Item> {
        if self.back_exhausted() {
            return None;
        }
        if !self.back_initialized() {
            self.initialize_back();
            // Initialization can discover an empty tree or range.
            if self.back_exhausted() {
                return None;
            }
        }
        // Double-ended fusion pre-check.
        if self.fwd.flags.initialized() && !self.fwd.flags.exhausted() {
            // SAFETY: NOTE(review) assumed the cursor key is complete
            // once initialized and not exhausted — confirm.
            let front_key: &[u8] = unsafe { self.fwd.cursor_key.full_key_unchecked() };
            // SAFETY: initialize_back() above guarantees `rev` is `Some`.
            let back_key: &[u8] = unsafe {
                self.rev
                    .as_ref()
                    .unwrap_unchecked()
                    .cursor_key
                    .full_key_unchecked()
            };
            if back_key <= front_key {
                self.mark_back_exhausted();
                self.fwd.flags.mark_exhausted();
                return None;
            }
        }
        self.advance_back()
    }
}
// Correct to mark fused: `next`/`next_back` set the exhausted flags
// before returning `None`, and every subsequent call short-circuits on
// those flags, so `None` is permanent.
impl<P, A> FusedIterator for RangeIter<'_, '_, P, A>
where
    P: LeafPolicy,
    A: TreeAllocator<P>,
{
}