use crate::api::errors::{Error, Result};
use crate::engine::simd;
use crate::layout::{
BlobNode, Leaf, Node16, Node256, Node4, Node48, NodeType, Prefix, BLOB_MAX_INLINE,
};
use std::sync::Arc;
use crate::store::{BlobFrameRef, BufferManager, CachedBlob};
use super::cast;
use super::readers::{leaf_extent, resolve_typed};
use super::types::{BlobNodeCrossing, LookupHit, LookupResult};
use super::SearchKey;
/// Test-only convenience: looks up the exact byte string `key` inside a
/// single frame, starting the walk at `start_slot` with key depth 0.
///
/// Wraps `key` with [`SearchKey::exact`] and hands off to `descend`; the
/// result (including any `Crossing` into another blob) is returned untouched.
#[cfg(test)]
pub(super) fn lookup<'a>(
    frame: BlobFrameRef<'a>,
    start_slot: u16,
    key: &[u8],
) -> Result<LookupResult<'a>> {
    let exact_key = SearchKey::exact(key);
    descend(frame, start_slot, exact_key, 0)
}
/// Walks a single frame for `key`, beginning at `start_slot`.
///
/// `depth` is the number of key bytes already matched before entering this
/// frame; it is forwarded unchanged to `descend`, whose result is returned
/// as-is.
pub(super) fn lookup_at<'a>(
    frame: BlobFrameRef<'a>,
    start_slot: u16,
    key: SearchKey<'_>,
    depth: usize,
) -> Result<LookupResult<'a>> {
    descend(frame, start_slot, key, depth)
}
/// Looks up `key` starting in the root blob and following blob crossings,
/// passing a hit to `consume`.
///
/// Each blob is read through `read_optimistic()`. The outcome of the walk is
/// only inspected once `validate()` has succeeded; when validation fails the
/// attempt is thrown away (errors included), `bm.note_optimistic_restart()`
/// is called, and the whole lookup starts over from the root blob.
///
/// # Returns
/// * `Ok(Some(r))` — the key was found and `consume` produced `r`.
/// * `Ok(None)` — the walk ended in `LookupResult::NotFound`.
///
/// # Errors
/// Propagates an error from a validated walk, or from `bm.pin` when a child
/// blob cannot be pinned.
///
/// NOTE(review): `consume` runs after `validate()` and there is no second
/// validation afterwards — confirm the guard/pin keeps the hit's bytes stable
/// while `consume` reads them.
pub fn lookup_multi_with<R, F>(
    bm: &BufferManager,
    root_pin: &Arc<CachedBlob>,
    key: SearchKey<'_>,
    mut consume: F,
) -> Result<Option<R>>
where
    F: FnMut(LookupHit<'_>) -> R,
{
    'restart: loop {
        // Step 1: walk the root blob. The guard is scoped to this block so it
        // is released before any child blob is pinned.
        let (mut next_guid, mut next_depth) = {
            let root_guard = root_pin.read_optimistic();
            let root_frame = BlobFrameRef::wrap(root_guard.as_slice());
            let entry_slot = root_frame.header().root_slot;
            let attempt = lookup_at(root_frame, entry_slot, key, 0);

            // Validate before looking at `attempt`: a failed validation
            // means whatever the walk produced must not be trusted.
            if !root_guard.validate() {
                bm.note_optimistic_restart();
                continue 'restart;
            }

            let outcome = attempt?;
            match outcome {
                LookupResult::Found(hit) => return Ok(Some(consume(hit))),
                LookupResult::NotFound => return Ok(None),
                LookupResult::Crossing(hop) => (hop.child_guid, hop.child_depth),
            }
        };

        // Step 2: keep hopping into child blobs until the walk terminates.
        loop {
            let pin = bm.pin(next_guid)?;
            let guard = pin.read_optimistic();
            let frame = BlobFrameRef::wrap(guard.as_slice());
            let entry_slot = frame.header().root_slot;
            let attempt = lookup_at(frame, entry_slot, key, next_depth);

            // Same rule as for the root: a failed validation restarts the
            // entire lookup from the root blob, not just this hop.
            if !guard.validate() {
                bm.note_optimistic_restart();
                continue 'restart;
            }

            let outcome = attempt?;
            match outcome {
                LookupResult::Found(hit) => return Ok(Some(consume(hit))),
                LookupResult::NotFound => return Ok(None),
                LookupResult::Crossing(hop) => {
                    next_guid = hop.child_guid;
                    next_depth = hop.child_depth;
                }
            }
        }
    }
}
/// Resolves `slot` inside `frame` and dispatches on the node type found there.
///
/// `depth` is the number of key bytes matched so far. Inner-node handlers
/// call back into this function for the selected child, so the walk is
/// recursive within one frame.
///
/// # Errors
/// Propagates failures from `resolve_typed` and from the per-type handlers,
/// and reports `node_corrupt` when the slot resolves to `NodeType::Invalid`.
///
/// NOTE(review): nothing here bounds the recursion depth — confirm that the
/// on-disk invariants rule out cycles (e.g. through zero-length prefixes).
fn descend<'a>(
    frame: BlobFrameRef<'a>,
    slot: u16,
    key: SearchKey<'_>,
    depth: usize,
) -> Result<LookupResult<'a>> {
    let (kind, payload) = resolve_typed(frame, slot)?;
    match kind {
        // Terminal cases that need no further inspection of the payload.
        NodeType::EmptyRoot => Ok(LookupResult::NotFound),
        NodeType::Invalid => Err(Error::node_corrupt(
            "walker::descend: hit NodeType::Invalid",
        )),

        // Nodes that end the walk inside this frame.
        NodeType::Leaf => leaf_check(frame, payload, key, depth),
        NodeType::Blob => blob_descend(payload, key, depth),

        // Nodes that continue the walk with a child slot.
        NodeType::Prefix => prefix_descend(frame, payload, key, depth),
        NodeType::Node4 => node4_descend(frame, payload, key, depth),
        NodeType::Node16 => node16_descend(frame, payload, key, depth),
        NodeType::Node48 => node48_descend(frame, payload, key, depth),
        NodeType::Node256 => node256_descend(frame, payload, key, depth),
    }
}
/// Handles a `BlobNode`: checks its inline prefix against `key` and, on a
/// match, reports a crossing into the child blob.
///
/// # Returns
/// * `Crossing { child_guid, child_depth }` with `child_depth` equal to
///   `depth` plus the prefix length, when the prefix matches at `depth`.
/// * `NotFound` when the prefix does not match.
///
/// # Errors
/// `node_corrupt` when `prefix_len` is larger than `BLOB_MAX_INLINE`.
fn blob_descend<'a>(body: &[u8], key: SearchKey<'_>, depth: usize) -> Result<LookupResult<'a>> {
    let blob = cast::<BlobNode>(body);
    let inline_len = blob.prefix_len as usize;

    // Reject an oversized length before it is used to slice `bytes`.
    if inline_len > BLOB_MAX_INLINE {
        return Err(Error::node_corrupt(
            "walker::blob_descend: prefix_len exceeds inline buffer",
        ));
    }

    let outcome = if key.range_eq(depth, &blob.bytes[..inline_len]) {
        LookupResult::Crossing(BlobNodeCrossing {
            child_guid: blob.child_blob_guid,
            child_depth: depth + inline_len,
        })
    } else {
        LookupResult::NotFound
    };
    Ok(outcome)
}
/// Handles a `Leaf`: reports a hit only when the leaf is not tombstoned and
/// its stored key equals the search key.
///
/// The key comparison uses the key returned by `leaf_extent` in full
/// (`eq_slice`), which is why `_depth` is unused here.
///
/// # Errors
/// Propagates any failure from `leaf_extent`.
fn leaf_check<'a>(
    frame: BlobFrameRef<'a>,
    body: &'a [u8],
    key: SearchKey<'_>,
    _depth: usize,
) -> Result<LookupResult<'a>> {
    let record = cast::<Leaf>(body);

    // A non-zero tombstone hides the leaf from lookups; skip it before
    // touching its key/value extent.
    if record.tombstone != 0 {
        return Ok(LookupResult::NotFound);
    }

    let (stored_key, value) = leaf_extent(frame, record)?;
    Ok(if key.eq_slice(stored_key) {
        LookupResult::Found(LookupHit {
            value,
            seq: record.seq,
        })
    } else {
        LookupResult::NotFound
    })
}
/// Handles a `Prefix` node: the key must match the node's inline bytes at
/// `depth`; if it does, the walk continues at the node's child with `depth`
/// advanced by the prefix length.
///
/// # Returns
/// `NotFound` when the key does not match the prefix, otherwise whatever the
/// walk below the child yields.
///
/// # Errors
/// `node_corrupt` when `prefix_len` is larger than the inline byte buffer.
fn prefix_descend<'a>(
    frame: BlobFrameRef<'a>,
    body: &'a [u8],
    key: SearchKey<'_>,
    depth: usize,
) -> Result<LookupResult<'a>> {
    let node = cast::<Prefix>(body);
    let span = node.prefix_len as usize;

    // `get` yields `None` exactly when `span` runs past the inline buffer.
    let Some(expected) = node.bytes.get(..span) else {
        return Err(Error::node_corrupt(
            "walker::prefix_descend: prefix_len exceeds inline buffer",
        ));
    };

    if key.range_eq(depth, expected) {
        descend(frame, node.child as u16, key, depth + span)
    } else {
        Ok(LookupResult::NotFound)
    }
}
/// Handles a `Node4`: scans the populated key bytes for the key byte at
/// `depth` and continues at the matching child with `depth + 1`.
///
/// # Returns
/// `NotFound` when the key has no byte at `depth` or no entry matches,
/// otherwise whatever the walk below the chosen child yields.
fn node4_descend<'a>(
    frame: BlobFrameRef<'a>,
    body: &'a [u8],
    key: SearchKey<'_>,
    depth: usize,
) -> Result<LookupResult<'a>> {
    let node = cast::<Node4>(body);
    let Some(wanted) = key.byte_at(depth) else {
        return Ok(LookupResult::NotFound);
    };

    // The stored count is clamped to 4 before it bounds the scan.
    let live = (node.count as usize).min(4);

    // Stop at the first entry that is >= the wanted byte: either it is the
    // match, or the scan has passed the place where a match would be.
    // (Presumably the keys are kept in ascending order — verify against the
    // insert path.)
    match node.keys[..live].iter().position(|&k| k >= wanted) {
        Some(i) if node.keys[i] == wanted => {
            descend(frame, node.children[i] as u16, key, depth + 1)
        }
        _ => Ok(LookupResult::NotFound),
    }
}
/// Handles a `Node16`: locates the key byte at `depth` with
/// `simd::node16_find_byte` and continues at the matching child with
/// `depth + 1`.
///
/// # Returns
/// `NotFound` when the key has no byte at `depth` or the probe finds no
/// entry, otherwise whatever the walk below the chosen child yields.
fn node16_descend<'a>(
    frame: BlobFrameRef<'a>,
    body: &'a [u8],
    key: SearchKey<'_>,
    depth: usize,
) -> Result<LookupResult<'a>> {
    let node = cast::<Node16>(body);
    let Some(wanted) = key.byte_at(depth) else {
        return Ok(LookupResult::NotFound);
    };

    match simd::node16_find_byte(&node.keys, node.count, wanted) {
        Some(pos) => descend(frame, node.children[pos as usize] as u16, key, depth + 1),
        None => Ok(LookupResult::NotFound),
    }
}
fn node48_descend<'a>(
frame: BlobFrameRef<'a>,
body: &'a [u8],
key: SearchKey<'_>,
depth: usize,
) -> Result<LookupResult<'a>> {
let n = cast::<Node48>(body);
let Some(byte) = key.byte_at(depth) else {
return Ok(LookupResult::NotFound);
};
let idx = n.index[byte as usize];
if idx == 0 {
return Ok(LookupResult::NotFound);
}
let ci = idx as usize - 1;
if ci >= 48 {
return Err(Error::node_corrupt(
"walker::node48_descend: child index out of range",
));
}
descend(frame, n.children[ci] as u16, key, depth + 1)
}
/// Handles a `Node256`: uses the key byte at `depth` as a direct index into
/// the child table and continues at that child with `depth + 1`.
///
/// # Returns
/// `NotFound` when the key has no byte at `depth` or the table entry is 0
/// (treated as "no child"), otherwise whatever the walk below the child
/// yields.
fn node256_descend<'a>(
    frame: BlobFrameRef<'a>,
    body: &'a [u8],
    key: SearchKey<'_>,
    depth: usize,
) -> Result<LookupResult<'a>> {
    let node = cast::<Node256>(body);
    let Some(wanted) = key.byte_at(depth) else {
        return Ok(LookupResult::NotFound);
    };

    let child = node.children[wanted as usize];
    if child != 0 {
        descend(frame, child as u16, key, depth + 1)
    } else {
        Ok(LookupResult::NotFound)
    }
}