use std::sync::Arc;
use crate::api::errors::{Error, Result};
use crate::layout::{BlobGuid, BlobNode, NodeType, BLOB_MAX_INLINE, PREFIX_MAX_INLINE};
use crate::store::{BlobFrameRef, BufferManager, CachedBlob};
use super::cast;
use super::readers::{
ntype_of, read_leaf_kv, read_node16, read_node256, read_node4, read_node48, read_prefix,
};
/// One item produced by iterating a range scan.
///
/// The scan yields either individual key/value pairs or, when a delimiter
/// is configured, a rolled-up prefix standing in for a group of keys.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum RangeEntry {
/// A single key/value pair read from a leaf.
Key {
/// The key bytes; if the stored key ended in a `0` byte, that one
/// trailing byte has been removed.
key: Vec<u8>,
/// The value bytes read from the same leaf as `key`.
value: Vec<u8>,
},
/// Produced only when a delimiter is set: the key bytes up to and
/// including the first delimiter byte found after the scan prefix.
/// Consecutive keys that share the same value are reported once.
CommonPrefix(Vec<u8>),
}
/// Lazy description of a range scan.
///
/// Nothing is read until the builder is converted with `into_iter()`
/// (directly or via a `for` loop), which produces a [`RangeIter`].
#[must_use = "RangeBuilder is lazy — call `.into_iter()` or use it in a `for` loop"]
pub struct RangeBuilder {
/// Buffer manager used to pin blobs while scanning.
bm: Arc<BufferManager>,
/// GUID of the blob that is pinned first; its header's `root_slot`
/// identifies the root node of the scan.
root_guid: BlobGuid,
/// Only keys that start with these bytes are yielded; empty matches every key.
prefix: Vec<u8>,
/// When set, keys less than or equal to this value are skipped.
start_after: Option<Vec<u8>>,
/// When set, keys containing this byte after `prefix` are reported as a
/// [`RangeEntry::CommonPrefix`] instead of individually.
delimiter: Option<u8>,
}
impl RangeBuilder {
    /// Creates a builder for an unrestricted scan rooted at `root_guid`:
    /// empty prefix, no `start_after` bound and no delimiter.
    pub fn new(bm: Arc<BufferManager>, root_guid: BlobGuid) -> Self {
        let (prefix, start_after, delimiter) = (Vec::new(), None, None);
        Self {
            bm,
            root_guid,
            prefix,
            start_after,
            delimiter,
        }
    }

    /// Restricts the scan to keys that start with `prefix` (the bytes are copied).
    pub fn prefix(self, prefix: &[u8]) -> Self {
        Self {
            prefix: prefix.to_vec(),
            ..self
        }
    }

    /// Skips every key that is less than or equal to `key` (the bytes are copied).
    pub fn start_after(self, key: &[u8]) -> Self {
        Self {
            start_after: Some(key.to_vec()),
            ..self
        }
    }

    /// Groups keys by the first occurrence of `byte` after the prefix.
    pub fn delimiter(self, byte: u8) -> Self {
        Self {
            delimiter: Some(byte),
            ..self
        }
    }
}
/// Turns the configured builder into a not-yet-started [`RangeIter`].
///
/// No blob is touched here; the iterator performs its initial descent on
/// the first call to `next`.
impl IntoIterator for RangeBuilder {
    type Item = Result<RangeEntry>;
    type IntoIter = RangeIter;

    fn into_iter(self) -> Self::IntoIter {
        // Move the configuration across unchanged.
        let Self {
            bm,
            root_guid,
            prefix,
            start_after,
            delimiter,
        } = self;

        RangeIter {
            bm,
            root_guid,
            prefix,
            start_after,
            delimiter,
            // Traversal state starts out empty / unset.
            stack: Vec::new(),
            curr_key: Vec::new(),
            anchor_depth: 0,
            last_common_prefix: None,
            initialized: false,
            terminated: false,
        }
    }
}
/// Iterator state for a range scan; created from a [`RangeBuilder`].
///
/// The tree is walked depth-first with an explicit stack of [`Frame`]s.
pub struct RangeIter {
/// Buffer manager used to pin the root blob and any child blobs.
bm: Arc<BufferManager>,
/// GUID of the blob pinned at the start of the initial descent.
root_guid: BlobGuid,
/// Path of frames from the root to the node currently being visited.
stack: Vec<Frame>,
/// Key bytes accumulated along the frames in `stack`.
curr_key: Vec<u8>,
/// Stack height recorded at the end of the initial descent; the scan
/// ends once the stack becomes shorter than this.
anchor_depth: usize,
/// Only keys starting with these bytes are yielded.
prefix: Vec<u8>,
/// When set, keys less than or equal to this value are skipped.
start_after: Option<Vec<u8>>,
/// When set, keys are grouped at the first occurrence of this byte after `prefix`.
delimiter: Option<u8>,
/// The common prefix most recently yielded, used to suppress repeats;
/// reset to `None` when a key without the delimiter is yielded.
last_common_prefix: Option<Vec<u8>>,
/// Set once the initial descent has completed successfully.
initialized: bool,
/// Set after exhaustion or the first error; `next` then always returns `None`.
terminated: bool,
}
/// One entry of the traversal stack: a node plus the iteration cursor into it.
struct Frame {
/// Pinned blob that contains this node.
pin: Arc<CachedBlob>,
/// GUID of the blob behind `pin`.
blob_guid: BlobGuid,
/// Slot of this node inside its blob.
slot: u16,
/// Node type read from the slot when the frame was pushed.
ntype: NodeType,
/// Cursor. For leaf, prefix and blob nodes `0` means "not visited yet" and
/// `1` means "already visited". For Node4/Node16 it is the index of the
/// next child entry; for Node48/Node256 it is the next key byte to examine.
next: u16,
/// Number of bytes appended to the iterator's `curr_key` when this frame
/// was pushed; the same number is removed again when the frame is popped.
pushed_bytes: u16,
}
/// Drives the scan: performs the initial descent on first use, then pulls
/// leaves one at a time and applies the prefix, `start_after` and delimiter
/// filters. After exhaustion or the first error the iterator stays finished.
impl Iterator for RangeIter {
    type Item = Result<RangeEntry>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.terminated {
            return None;
        }

        if !self.initialized {
            match self.init_descent() {
                Ok(()) => self.initialized = true,
                Err(err) => {
                    self.terminated = true;
                    return Some(Err(err));
                }
            }
            // An empty stack after the descent means nothing can match.
            if self.stack.is_empty() {
                self.terminated = true;
                return None;
            }
        }

        loop {
            let (mut key, value) = match self.advance_to_next_leaf() {
                Ok(Some(leaf)) => leaf,
                Ok(None) => {
                    self.terminated = true;
                    return None;
                }
                Err(err) => {
                    self.terminated = true;
                    return Some(Err(err));
                }
            };

            // Stored keys may carry one trailing 0 byte; callers never see it.
            if key.last() == Some(&0) {
                key.pop();
            }

            if !key.starts_with(&self.prefix) {
                continue;
            }

            let past_cursor = self
                .start_after
                .as_deref()
                .map_or(true, |after| key.as_slice() > after);
            if !past_cursor {
                continue;
            }

            if let Some(delim) = self.delimiter {
                let base = self.prefix.len();
                match key[base..].iter().position(|&b| b == delim) {
                    Some(offset) => {
                        // Everything up to and including the delimiter forms the group.
                        let group = &key[..=base + offset];
                        if self.last_common_prefix.as_deref() == Some(group) {
                            continue;
                        }
                        let group = group.to_vec();
                        self.last_common_prefix = Some(group.clone());
                        return Some(Ok(RangeEntry::CommonPrefix(group)));
                    }
                    None => self.last_common_prefix = None,
                }
            }

            return Some(Ok(RangeEntry::Key { key, value }));
        }
    }
}
impl RangeIter {
fn init_descent(&mut self) -> Result<()> {
let root_pin = self.bm.pin(self.root_guid)?;
let (root_slot, root_ntype) = {
let guard = root_pin.read();
let frame = BlobFrameRef::wrap(guard.as_slice());
let slot = frame.header().root_slot;
(slot, ntype_of(frame, slot)?)
};
self.stack.push(Frame {
pin: root_pin,
blob_guid: self.root_guid,
slot: root_slot,
ntype: root_ntype,
next: 0,
pushed_bytes: 0,
});
while self.curr_key.len() < self.prefix.len() {
let top_ntype = self.stack.last().expect("stack non-empty").ntype;
match top_ntype {
NodeType::Prefix => {
if !self.descend_prefix_for_anchor()? {
self.stack.clear();
return Ok(());
}
}
NodeType::Blob => {
if !self.descend_blob_for_anchor()? {
self.stack.clear();
return Ok(());
}
}
NodeType::Node4 | NodeType::Node16 | NodeType::Node48 | NodeType::Node256 => {
let need = self.prefix[self.curr_key.len()];
if !self.descend_inner_for_anchor(need)? {
self.stack.clear();
return Ok(());
}
}
NodeType::Leaf => {
let (stored_key, _v) = {
let top = self.stack.last().unwrap();
let guard = top.pin.read();
let frame = BlobFrameRef::wrap(guard.as_slice());
read_leaf_kv(frame, top.slot)?
};
let user_key: &[u8] = if stored_key.last() == Some(&0) {
&stored_key[..stored_key.len() - 1]
} else {
&stored_key[..]
};
if !user_key.starts_with(&self.prefix) {
self.stack.clear();
return Ok(());
}
break;
}
NodeType::EmptyRoot | NodeType::Invalid => {
self.stack.clear();
return Ok(());
}
}
}
self.anchor_depth = self.stack.len();
Ok(())
}
fn descend_prefix_for_anchor(&mut self) -> Result<bool> {
let (child_slot, p_bytes) = {
let top = self.stack.last().expect("stack non-empty");
let guard = top.pin.read();
let frame = BlobFrameRef::wrap(guard.as_slice());
let p = read_prefix(frame, top.slot)?;
let plen = (p.prefix_len as usize).min(PREFIX_MAX_INLINE);
(p.child as u16, p.bytes[..plen].to_vec())
};
let remaining = &self.prefix[self.curr_key.len()..];
let cmp_len = p_bytes.len().min(remaining.len());
if p_bytes[..cmp_len] != remaining[..cmp_len] {
return Ok(false);
}
let (top_pin, top_blob_guid) = {
let top = self.stack.last().expect("stack non-empty");
(top.pin.clone(), top.blob_guid)
};
self.stack.last_mut().unwrap().next = 1;
self.curr_key.extend_from_slice(&p_bytes);
let child_ntype = {
let guard = top_pin.read();
ntype_of(BlobFrameRef::wrap(guard.as_slice()), child_slot)?
};
let pushed = p_bytes.len() as u16;
self.stack.push(Frame {
pin: top_pin,
blob_guid: top_blob_guid,
slot: child_slot,
ntype: child_ntype,
next: 0,
pushed_bytes: pushed,
});
Ok(true)
}
fn descend_blob_for_anchor(&mut self) -> Result<bool> {
let (child_guid, child_slot, p_bytes) = {
let top = self.stack.last().expect("stack non-empty");
let guard = top.pin.read();
let frame = BlobFrameRef::wrap(guard.as_slice());
let body = frame.body_of_slot(top.slot).ok_or(Error::NodeCorrupt {
context: "range::descend_blob_for_anchor: body resolution",
})?;
let b = cast::<BlobNode>(body);
let plen = (b.prefix_len as usize).min(BLOB_MAX_INLINE);
(
b.child_blob_guid,
b.child_entry_ptr as u16,
b.bytes[..plen].to_vec(),
)
};
let remaining = &self.prefix[self.curr_key.len()..];
let cmp_len = p_bytes.len().min(remaining.len());
if !p_bytes.is_empty() && p_bytes[..cmp_len] != remaining[..cmp_len] {
return Ok(false);
}
self.stack.last_mut().unwrap().next = 1;
self.curr_key.extend_from_slice(&p_bytes);
let child_pin = self.bm.pin(child_guid)?;
let child_ntype = {
let guard = child_pin.read();
ntype_of(BlobFrameRef::wrap(guard.as_slice()), child_slot)?
};
let pushed = p_bytes.len() as u16;
self.stack.push(Frame {
pin: child_pin,
blob_guid: child_guid,
slot: child_slot,
ntype: child_ntype,
next: 0,
pushed_bytes: pushed,
});
Ok(true)
}
fn descend_inner_for_anchor(&mut self, need: u8) -> Result<bool> {
let (top_pin, top_blob_guid, top_slot, top_ntype, child_slot, cursor_after) = {
let top = self.stack.last().expect("stack non-empty");
let guard = top.pin.read();
let frame = BlobFrameRef::wrap(guard.as_slice());
let Some((slot, cursor)) =
find_inner_child_and_cursor(frame, top.slot, top.ntype, need)?
else {
return Ok(false);
};
(
top.pin.clone(),
top.blob_guid,
top.slot,
top.ntype,
slot,
cursor,
)
};
let _ = (top_slot, top_ntype); self.stack.last_mut().unwrap().next = cursor_after;
self.curr_key.push(need);
let child_ntype = {
let guard = top_pin.read();
ntype_of(BlobFrameRef::wrap(guard.as_slice()), child_slot)?
};
self.stack.push(Frame {
pin: top_pin,
blob_guid: top_blob_guid,
slot: child_slot,
ntype: child_ntype,
next: 0,
pushed_bytes: 1,
});
Ok(true)
}
fn advance_to_next_leaf(&mut self) -> Result<Option<(Vec<u8>, Vec<u8>)>> {
loop {
if self.stack.len() < self.anchor_depth {
return Ok(None);
}
let Some(top) = self.stack.last_mut() else {
return Ok(None);
};
let top_ntype = top.ntype;
match top_ntype {
NodeType::Leaf => {
if top.next == 0 {
top.next = 1;
let (key, value) = {
let guard = top.pin.read();
let frame = BlobFrameRef::wrap(guard.as_slice());
read_leaf_kv(frame, top.slot)?
};
return Ok(Some((key, value)));
}
self.pop_frame();
}
NodeType::EmptyRoot | NodeType::Invalid => {
self.pop_frame();
}
NodeType::Prefix => {
if top.next == 0 {
top.next = 1;
let (top_pin, top_blob_guid) = (top.pin.clone(), top.blob_guid);
let (child_slot, p_bytes) = {
let guard = top_pin.read();
let frame = BlobFrameRef::wrap(guard.as_slice());
let p = read_prefix(frame, top.slot)?;
let plen = (p.prefix_len as usize).min(PREFIX_MAX_INLINE);
(p.child as u16, p.bytes[..plen].to_vec())
};
self.push_within_blob(top_pin, top_blob_guid, child_slot, &p_bytes)?;
} else {
self.pop_frame();
}
}
NodeType::Blob => {
if top.next == 0 {
top.next = 1;
let (child_guid, child_slot, p_bytes) = {
let guard = top.pin.read();
let frame = BlobFrameRef::wrap(guard.as_slice());
let body = frame.body_of_slot(top.slot).ok_or(Error::NodeCorrupt {
context: "range::advance: BlobNode body resolution",
})?;
let b = cast::<BlobNode>(body);
let plen = (b.prefix_len as usize).min(BLOB_MAX_INLINE);
(
b.child_blob_guid,
b.child_entry_ptr as u16,
b.bytes[..plen].to_vec(),
)
};
self.push_in_other_blob(child_guid, child_slot, &p_bytes)?;
} else {
self.pop_frame();
}
}
NodeType::Node4 | NodeType::Node16 | NodeType::Node48 | NodeType::Node256 => {
let (top_pin, top_blob_guid, top_slot, cursor) =
(top.pin.clone(), top.blob_guid, top.slot, top.next);
let result = {
let guard = top_pin.read();
let frame = BlobFrameRef::wrap(guard.as_slice());
next_inner_child_from(frame, top_slot, top_ntype, cursor)?
};
match result {
None => self.pop_frame(),
Some((byte, child_slot, next_cursor)) => {
self.stack.last_mut().unwrap().next = next_cursor;
let child_ntype = {
let guard = top_pin.read();
ntype_of(BlobFrameRef::wrap(guard.as_slice()), child_slot)?
};
self.curr_key.push(byte);
self.stack.push(Frame {
pin: top_pin,
blob_guid: top_blob_guid,
slot: child_slot,
ntype: child_ntype,
next: 0,
pushed_bytes: 1,
});
}
}
}
}
}
}
fn push_within_blob(
&mut self,
pin: Arc<CachedBlob>,
blob_guid: BlobGuid,
child_slot: u16,
prefix_bytes: &[u8],
) -> Result<()> {
let child_ntype = {
let guard = pin.read();
ntype_of(BlobFrameRef::wrap(guard.as_slice()), child_slot)?
};
self.curr_key.extend_from_slice(prefix_bytes);
self.stack.push(Frame {
pin,
blob_guid,
slot: child_slot,
ntype: child_ntype,
next: 0,
pushed_bytes: prefix_bytes.len() as u16,
});
Ok(())
}
fn push_in_other_blob(
&mut self,
child_guid: BlobGuid,
child_slot: u16,
prefix_bytes: &[u8],
) -> Result<()> {
let child_pin = self.bm.pin(child_guid)?;
let child_ntype = {
let guard = child_pin.read();
ntype_of(BlobFrameRef::wrap(guard.as_slice()), child_slot)?
};
self.curr_key.extend_from_slice(prefix_bytes);
self.stack.push(Frame {
pin: child_pin,
blob_guid: child_guid,
slot: child_slot,
ntype: child_ntype,
next: 0,
pushed_bytes: prefix_bytes.len() as u16,
});
Ok(())
}
fn pop_frame(&mut self) {
let Some(f) = self.stack.pop() else { return };
let new_len = self.curr_key.len().saturating_sub(f.pushed_bytes as usize);
self.curr_key.truncate(new_len);
}
}
/// Looks up the child stored under key byte `byte` in the inner node at `slot`.
///
/// On a hit, returns the child's slot together with the cursor value that
/// makes `next_inner_child_from` resume just after that child: the entry
/// index plus one for Node4/Node16, the key byte plus one for Node48/Node256.
/// Returns `Ok(None)` when the node has no child for `byte`.
///
/// The Node4/Node16 scan stops at the first key greater than `byte`, i.e. it
/// treats the key array as sorted in ascending order.
///
/// # Errors
/// `Error::NodeCorrupt` when `ntype` is not an inner node type or a Node48
/// index entry points past its 48 child cells; reader errors are propagated.
fn find_inner_child_and_cursor(
    frame: BlobFrameRef<'_>,
    slot: u16,
    ntype: NodeType,
    byte: u8,
) -> Result<Option<(u16, u16)>> {
    let found = match ntype {
        NodeType::Node4 => {
            let node = read_node4(frame, slot)?;
            let live = (node.count as usize).min(4);
            // Skip every key below the target, then test the first one that is not.
            let mut pos = 0;
            while pos < live && node.keys[pos] < byte {
                pos += 1;
            }
            if pos < live && node.keys[pos] == byte {
                Some((node.children[pos] as u16, (pos + 1) as u16))
            } else {
                None
            }
        }
        NodeType::Node16 => {
            let node = read_node16(frame, slot)?;
            let live = (node.count as usize).min(16);
            let mut pos = 0;
            while pos < live && node.keys[pos] < byte {
                pos += 1;
            }
            if pos < live && node.keys[pos] == byte {
                Some((node.children[pos] as u16, (pos + 1) as u16))
            } else {
                None
            }
        }
        NodeType::Node48 => {
            let node = read_node48(frame, slot)?;
            // Index entries are 1-based; 0 marks "no child".
            match node.index[usize::from(byte)] as usize {
                0 => None,
                cell if cell > 48 => {
                    return Err(Error::NodeCorrupt {
                        context: "range::find_inner_child: Node48 index out of range",
                    });
                }
                cell => Some((node.children[cell - 1] as u16, u16::from(byte) + 1)),
            }
        }
        NodeType::Node256 => {
            let node = read_node256(frame, slot)?;
            let child = node.children[usize::from(byte)];
            if child == 0 {
                None
            } else {
                Some((child as u16, u16::from(byte) + 1))
            }
        }
        _ => {
            return Err(Error::NodeCorrupt {
                context: "range::find_inner_child: not an inner node",
            });
        }
    };
    Ok(found)
}
/// Returns the next child of the inner node at `slot`, starting at cursor `from`.
///
/// The cursor is an entry index for Node4/Node16 and a key byte value for
/// Node48/Node256. On a hit the result is `(key byte, child slot, cursor for
/// the following call)`; `Ok(None)` means the node has no further children.
///
/// # Errors
/// `Error::NodeCorrupt` when `ntype` is not an inner node type or a Node48
/// index entry points past its 48 child cells; reader errors are propagated.
fn next_inner_child_from(
    frame: BlobFrameRef<'_>,
    slot: u16,
    ntype: NodeType,
    from: u16,
) -> Result<Option<(u8, u16, u16)>> {
    let cursor = usize::from(from);
    let step = match ntype {
        NodeType::Node4 => {
            let node = read_node4(frame, slot)?;
            let live = (node.count as usize).min(4);
            if cursor < live {
                Some((
                    node.keys[cursor],
                    node.children[cursor] as u16,
                    (cursor + 1) as u16,
                ))
            } else {
                None
            }
        }
        NodeType::Node16 => {
            let node = read_node16(frame, slot)?;
            let live = (node.count as usize).min(16);
            if cursor < live {
                Some((
                    node.keys[cursor],
                    node.children[cursor] as u16,
                    (cursor + 1) as u16,
                ))
            } else {
                None
            }
        }
        NodeType::Node48 => {
            let node = read_node48(frame, slot)?;
            let mut hit = None;
            let mut b = cursor.min(256);
            while b < 256 {
                // Index entries are 1-based; 0 marks "no child".
                let cell = node.index[b] as usize;
                if cell != 0 {
                    if cell > 48 {
                        return Err(Error::NodeCorrupt {
                            context: "range::next_inner_child: Node48 index out of range",
                        });
                    }
                    hit = Some((b as u8, node.children[cell - 1] as u16, (b + 1) as u16));
                    break;
                }
                b += 1;
            }
            hit
        }
        NodeType::Node256 => {
            let node = read_node256(frame, slot)?;
            let mut hit = None;
            let mut b = cursor.min(256);
            while b < 256 {
                let child = node.children[b];
                if child != 0 {
                    hit = Some((b as u8, child as u16, (b + 1) as u16));
                    break;
                }
                b += 1;
            }
            hit
        }
        _ => {
            return Err(Error::NodeCorrupt {
                context: "range::next_inner_child: not an inner node",
            });
        }
    };
    Ok(step)
}