use crate::api::errors::{Error, Result};
use crate::engine::simd;
use crate::layout::{
BlobGuid, BlobNode, Leaf, Node16, Node256, Node4, Node48, NodeType, Prefix, BLOB_MAX_INLINE,
};
use crate::store::{BlobFrameRef, BufferManager};
use super::cast;
use super::readers::{leaf_extent, resolve_typed};
use super::types::{BlobNodeCrossing, LookupResult};
pub fn lookup<'a>(
frame: BlobFrameRef<'a>,
start_slot: u16,
key: &[u8],
) -> Result<LookupResult<'a>> {
descend(frame, start_slot, key, 0)
}
pub fn lookup_at<'a>(
frame: BlobFrameRef<'a>,
start_slot: u16,
key: &[u8],
depth: usize,
) -> Result<LookupResult<'a>> {
descend(frame, start_slot, key, depth)
}
pub fn lookup_multi(
bm: &BufferManager,
root_guid: BlobGuid,
key: &[u8],
) -> Result<Option<Vec<u8>>> {
'restart: loop {
let root_pin = bm.pin(root_guid)?;
let crossing = {
let guard = root_pin.read_optimistic();
let frame = BlobFrameRef::wrap(guard.as_slice());
let root_slot = frame.header().root_slot;
let result = lookup_at(frame, root_slot, key, 0);
if !guard.validate() {
continue 'restart;
}
match result {
Err(e) => return Err(e),
Ok(LookupResult::Found(v)) => return Ok(Some(v.to_vec())),
Ok(LookupResult::NotFound) => return Ok(None),
Ok(LookupResult::Crossing(c)) => c,
}
};
drop(root_pin);
let mut current_guid = crossing.child_guid;
let mut start_slot = crossing.child_slot;
let mut depth = crossing.child_depth;
loop {
let pin = bm.pin(current_guid)?;
let guard = pin.read_optimistic();
let frame = BlobFrameRef::wrap(guard.as_slice());
let result = lookup_at(frame, start_slot, key, depth);
if !guard.validate() {
continue 'restart;
}
match result {
Err(e) => return Err(e),
Ok(LookupResult::Found(v)) => return Ok(Some(v.to_vec())),
Ok(LookupResult::NotFound) => return Ok(None),
Ok(LookupResult::Crossing(c)) => {
current_guid = c.child_guid;
start_slot = c.child_slot;
depth = c.child_depth;
}
}
}
}
}
fn descend<'a>(
frame: BlobFrameRef<'a>,
slot: u16,
key: &[u8],
depth: usize,
) -> Result<LookupResult<'a>> {
let (ntype, body) = resolve_typed(frame, slot)?;
match ntype {
NodeType::Invalid => Err(Error::NodeCorrupt {
context: "walker::descend: hit NodeType::Invalid",
}),
NodeType::EmptyRoot => Ok(LookupResult::NotFound),
NodeType::Leaf => leaf_check(frame, body, key, depth),
NodeType::Prefix => prefix_descend(frame, body, key, depth),
NodeType::Node4 => node4_descend(frame, body, key, depth),
NodeType::Node16 => node16_descend(frame, body, key, depth),
NodeType::Node48 => node48_descend(frame, body, key, depth),
NodeType::Node256 => node256_descend(frame, body, key, depth),
NodeType::Blob => blob_descend(body, key, depth),
}
}
fn blob_descend<'a>(body: &[u8], key: &[u8], depth: usize) -> Result<LookupResult<'a>> {
let b = cast::<BlobNode>(body);
let plen = b.prefix_len as usize;
if plen > BLOB_MAX_INLINE {
return Err(Error::NodeCorrupt {
context: "walker::blob_descend: prefix_len exceeds inline buffer",
});
}
if depth + plen > key.len() {
return Ok(LookupResult::NotFound);
}
if key[depth..depth + plen] != b.bytes[..plen] {
return Ok(LookupResult::NotFound);
}
Ok(LookupResult::Crossing(BlobNodeCrossing {
child_guid: b.child_blob_guid,
child_slot: b.child_entry_ptr as u16,
child_depth: depth + plen,
}))
}
fn leaf_check<'a>(
frame: BlobFrameRef<'a>,
body: &'a [u8],
key: &[u8],
_depth: usize,
) -> Result<LookupResult<'a>> {
let leaf = cast::<Leaf>(body);
if leaf.tombstone != 0 {
return Ok(LookupResult::NotFound);
}
let (leaf_key, value) = leaf_extent(frame, leaf)?;
if leaf_key != key {
return Ok(LookupResult::NotFound);
}
Ok(LookupResult::Found(value))
}
fn prefix_descend<'a>(
frame: BlobFrameRef<'a>,
body: &'a [u8],
key: &[u8],
depth: usize,
) -> Result<LookupResult<'a>> {
let p = cast::<Prefix>(body);
let plen = p.prefix_len as usize;
if plen > p.bytes.len() {
return Err(Error::NodeCorrupt {
context: "walker::prefix_descend: prefix_len exceeds inline buffer",
});
}
if depth + plen > key.len() {
return Ok(LookupResult::NotFound);
}
if key[depth..depth + plen] != p.bytes[..plen] {
return Ok(LookupResult::NotFound);
}
descend(frame, p.child as u16, key, depth + plen)
}
fn node4_descend<'a>(
frame: BlobFrameRef<'a>,
body: &'a [u8],
key: &[u8],
depth: usize,
) -> Result<LookupResult<'a>> {
let n = cast::<Node4>(body);
if depth >= key.len() {
return Ok(LookupResult::NotFound);
}
let byte = key[depth];
let count = (n.count as usize).min(4);
for i in 0..count {
if n.keys[i] == byte {
return descend(frame, n.children[i] as u16, key, depth + 1);
}
if n.keys[i] > byte {
break;
}
}
Ok(LookupResult::NotFound)
}
fn node16_descend<'a>(
frame: BlobFrameRef<'a>,
body: &'a [u8],
key: &[u8],
depth: usize,
) -> Result<LookupResult<'a>> {
let n = cast::<Node16>(body);
if depth >= key.len() {
return Ok(LookupResult::NotFound);
}
let byte = key[depth];
if let Some(i) = simd::node16_find_byte(&n.keys, n.count, byte) {
return descend(frame, n.children[i as usize] as u16, key, depth + 1);
}
Ok(LookupResult::NotFound)
}
fn node48_descend<'a>(
frame: BlobFrameRef<'a>,
body: &'a [u8],
key: &[u8],
depth: usize,
) -> Result<LookupResult<'a>> {
let n = cast::<Node48>(body);
if depth >= key.len() {
return Ok(LookupResult::NotFound);
}
let idx = n.index[key[depth] as usize];
if idx == 0 {
return Ok(LookupResult::NotFound);
}
let ci = idx as usize - 1;
if ci >= 48 {
return Err(Error::NodeCorrupt {
context: "walker::node48_descend: child index out of range",
});
}
descend(frame, n.children[ci] as u16, key, depth + 1)
}
fn node256_descend<'a>(
frame: BlobFrameRef<'a>,
body: &'a [u8],
key: &[u8],
depth: usize,
) -> Result<LookupResult<'a>> {
let n = cast::<Node256>(body);
if depth >= key.len() {
return Ok(LookupResult::NotFound);
}
let slot = n.children[key[depth] as usize];
if slot == 0 {
return Ok(LookupResult::NotFound);
}
descend(frame, slot as u16, key, depth + 1)
}