use crate::api::errors::{Error, Result};
use crate::layout::{
size_of_node, BlobGuid, BlobNode, Leaf, Node16, Node256, Node4, Node48, NodeType, Prefix,
BLOB_MAX_INLINE, DATA_AREA_START, MAX_SLOTS, PAGE_SIZE, PREFIX_MAX_INLINE,
};
use crate::store::blob_store::AlignedBlobBuf;
use crate::store::{
bloom_byte_len, decode_child_off, encode_child_off, page_align_up, BlobFrame, BlobFrameRef,
BloomBuilder, BufferManager, BLOOM_BITS_PER_KEY, PAGE_4K, SPILLOVER_RESERVATION,
};
use super::cast;
use super::cow::child_is_snapshot_shared;
use super::readers::child_offset;
use super::types::MakeBlobOutcome;
use super::writers::{write_prefix_chain, write_struct_at};
/// Headroom, in bytes, that `is_mergeable` subtracts from the parent's remaining space
/// before deciding whether a child blob's data would fit.
const MERGE_RESERVE: u32 = 0x1000;
/// Test-only entry point: clones the subtree at `src_off` of `src_frame` into a blob
/// built on a freshly zeroed [`AlignedBlobBuf`], tagged with `new_guid`.
///
/// # Errors
/// Propagates any failure from `make_blob_from_node_with_buf`.
#[cfg(test)]
pub fn make_blob_from_node(
    src_frame: &BlobFrame<'_>,
    src_off: u32,
    new_guid: BlobGuid,
) -> Result<MakeBlobOutcome> {
    let scratch = AlignedBlobBuf::zeroed();
    make_blob_from_node_with_buf(scratch, src_frame, src_off, new_guid)
}
/// Clones the subtree at `src_off` of `src_frame` into a new blob tagged `new_guid`,
/// using a zeroed buffer obtained from `bm.alloc_blob_buf_zeroed()`.
///
/// # Errors
/// Propagates any failure from `make_blob_from_node_with_buf`.
pub fn make_blob_from_node_in(
    bm: &BufferManager,
    src_frame: &BlobFrame<'_>,
    src_off: u32,
    new_guid: BlobGuid,
) -> Result<MakeBlobOutcome> {
    let scratch = bm.alloc_blob_buf_zeroed();
    make_blob_from_node_with_buf(scratch, src_frame, src_off, new_guid)
}
/// Shared body of the `make_blob_from_node*` entry points.
///
/// Initialises `buf` as a blob frame carrying `new_guid`, clones the subtree rooted at
/// `src_off` in `src_frame` into it (tombstones are kept, leaves are not routed), stores
/// the cloned root in the new header's `root_slot`, and hands the buffer back inside a
/// [`MakeBlobOutcome`].
///
/// # Errors
/// Propagates failures from `BlobFrame::init` and `clone_subtree`.
///
/// # Panics
/// Panics if `clone_subtree` yields `None`, which the non-filtering mode is expected
/// never to do.
fn make_blob_from_node_with_buf(
    mut buf: AlignedBlobBuf,
    src_frame: &BlobFrame<'_>,
    src_off: u32,
    new_guid: BlobGuid,
) -> Result<MakeBlobOutcome> {
    {
        // The frame borrows `buf` mutably; keep it scoped so `buf` can be moved out below.
        let mut dst_frame = BlobFrame::init(buf.as_mut_slice(), new_guid)?;
        // Unused in non-routed mode, but `clone_subtree` requires a cursor.
        let mut cursor = 0u32;
        let root_off = clone_subtree(src_frame, &mut dst_frame, src_off, false, false, &mut cursor)?
            .expect("preserve mode never returns None");
        dst_frame.header_mut().root_slot = encode_child_off(root_off);
    }
    Ok(MakeBlobOutcome { buf })
}
/// `dead_bytes` level at or above which `blob_needs_compaction` reports `true`:
/// one sixteenth of the span between `DATA_AREA_START` and `PAGE_SIZE`.
const DEAD_BYTES_COMPACT_THRESHOLD: u32 = (PAGE_SIZE - DATA_AREA_START) / 16;
/// Reports whether the blob's header indicates it is worth compacting: either it records
/// at least one tombstoned leaf, or its `dead_bytes` has reached
/// `DEAD_BYTES_COMPACT_THRESHOLD`.
#[must_use]
pub fn blob_needs_compaction(frame: BlobFrameRef<'_>) -> bool {
    let header = frame.header();
    if header.tombstone_leaf_cnt != 0 {
        return true;
    }
    header.dead_bytes >= DEAD_BYTES_COMPACT_THRESHOLD
}
/// Reports whether compacting this blob would produce a routed layout.
///
/// Returns `false` when the header already has a non-zero `routing_len`, when
/// `routing_unfit` is set, when `routing_budget` fails, or when `routing_geometry`
/// declines the computed budget.
#[must_use]
pub fn blob_would_route(frame: BlobFrameRef<'_>) -> bool {
    let header = frame.header();
    if header.routing_len != 0 || header.routing_unfit != 0 {
        return false;
    }
    let root_off = decode_child_off(header.root_slot);
    // A budgeting error is treated the same as "would not route".
    routing_budget(frame, root_off).is_ok_and(|budget| routing_geometry(budget).is_some())
}
/// Rebuilds the blob stored in `buf` into a scratch buffer with tombstoned leaves
/// filtered out, then copies the rebuilt image back over `buf`.
///
/// When `routing_geometry` accepts the blob's budget, the rebuild uses the routed layout:
/// leaves are placed from the computed leaf-region start, the routing fields of the header
/// are filled in, and a bloom filter is written if `build_routing_bloom` succeeds. If the
/// non-leaf nodes then turn out to extend past the leaf-region start, the rebuild is
/// redone without routing and the header's `routing_unfit` flag is set.
///
/// The rebuilt header keeps the original `blob_guid`, has `tombstone_leaf_cnt` and
/// `dead_bytes` reset to zero, and `compact_times` incremented (saturating).
///
/// # Errors
/// Propagates failures from `routing_budget`, `BlobFrame::init` and `clone_subtree`.
///
/// # Panics
/// In builds with debug assertions enabled, panics if the routed attempt overruns the
/// leaf region (the budget under-counted); release builds fall back silently.
pub fn compact_blob(buf: &mut AlignedBlobBuf) -> Result<()> {
// Capture the header fields that must carry over to the rebuilt blob.
let (blob_guid, old_root_off, old_compact_times) = {
let old_frame = BlobFrame::wrap(buf.as_mut_slice());
let h = old_frame.header();
(h.blob_guid, decode_child_off(h.root_slot), h.compact_times)
};
// Decide up front whether the routed layout applies: `routed_lrs` is the leaf-region
// start chosen by `routing_geometry`, or `None` for the non-routed layout.
let (routed_lrs, bloom_leaf_count) = {
let old_frame = BlobFrame::wrap(buf.as_mut_slice());
let budget = routing_budget(old_frame.as_ref(), old_root_off)?;
let leaf_count = budget.as_ref().map_or(0, |b| b.leaf_count);
(routing_geometry(budget), leaf_count)
};
let mut new_buf = buf.zeroed_like();
// `lrs_opt` is the layout used by the current attempt; it is cleared on fallback while
// `routed_lrs` keeps the original decision.
let mut lrs_opt = routed_lrs;
loop {
let overran = {
let mut new_frame = BlobFrame::init(new_buf.as_mut_slice(), blob_guid)?;
let old_frame = BlobFrame::wrap(buf.as_mut_slice());
// In routed mode leaves are placed starting at the leaf-region start.
let mut leaf_cursor = lrs_opt.unwrap_or(0);
let cloned = clone_subtree(
&old_frame,
&mut new_frame,
old_root_off,
true,
lrs_opt.is_some(),
&mut leaf_cursor,
)?;
let entry_off = match cloned {
Some(off) => off,
None => {
// Nothing survived the tombstone filter.
// NOTE(review): presumably slot 1 is the EmptyRoot sentinel that
// `BlobFrame::init` creates — confirm.
decode_child_off(1)
}
};
// Routed attempt failed if the non-leaf allocations ran past the leaf-region start.
let overran = lrs_opt.is_some_and(|lrs| new_frame.header().space_used > lrs);
if !overran {
// Read before the routed branch overwrites `space_used` with the leaf cursor.
let internal_end = new_frame.header().space_used;
{
let h = new_frame.header_mut();
h.root_slot = encode_child_off(entry_off);
h.tombstone_leaf_cnt = 0;
h.dead_bytes = 0;
h.compact_times = old_compact_times.saturating_add(1);
}
if let Some(lrs) = lrs_opt {
{
let h = new_frame.header_mut();
h.routing_off = DATA_AREA_START;
h.routing_len = internal_end - DATA_AREA_START;
h.leaf_region_start = lrs;
h.space_used = leaf_cursor;
}
// The bloom is optional: the bloom header fields are only set when one was built.
if let Some((boff, blen)) = build_routing_bloom(
&mut new_frame,
internal_end,
lrs,
leaf_cursor,
bloom_leaf_count,
) {
let h = new_frame.header_mut();
h.bloom_off = boff;
h.bloom_len = blen;
h.bloom_bits_per_key = u32::from(BLOOM_BITS_PER_KEY);
}
} else if routed_lrs.is_some() {
// Reached only on the fallback pass: routing was planned but did not fit.
new_frame.header_mut().routing_unfit = 1;
}
}
overran
};
if !overran {
break;
}
// The budget is expected to be exact, so an overrun is loud in debug builds;
// otherwise start again from a clean buffer without routing.
debug_assert!(
false,
"compact_blob: routing budget under-counted the internal arena; \
falling back to legacy layout",
);
new_buf = buf.zeroed_like();
lrs_opt = None;
}
// Publish the rebuilt image over the caller's buffer.
buf.as_mut_slice().copy_from_slice(new_buf.as_slice());
#[cfg(feature = "tracing")]
tracing::debug!(
target: "holt::engine::compact",
blob_guid = ?&blob_guid[..4],
compact_times = old_compact_times.saturating_add(1),
"compact_blob: in-place rebuild complete",
);
Ok(())
}
/// Decides whether the child blob referenced by the `BlobNode` at `parent_bn_off` can be
/// folded into `parent_frame`.
///
/// Returns `Ok(false)` immediately when `child_is_snapshot_shared` reports the child as
/// shared. Otherwise all of the following must hold:
/// - the child's used bytes past `DATA_AREA_START` fit in the parent's remaining space
///   after subtracting `MERGE_RESERVE`;
/// - the parent's and child's slot counts together do not exceed `MAX_SLOTS`;
/// - the child's header records no external blobs (`num_ext_blobs == 0`);
/// - the child's header records no tombstoned leaves.
///
/// # Errors
/// Propagates failures from `read_blob_node` and `bm.pin`.
pub fn is_mergeable(
    bm: &BufferManager,
    parent_frame: &BlobFrame<'_>,
    parent_bn_off: u32,
) -> Result<bool> {
    let blob_node = read_blob_node(parent_frame, parent_bn_off)?;
    let child_pin = bm.pin(blob_node.child_blob_guid)?;
    if child_is_snapshot_shared(bm, child_pin.as_ref()) {
        return Ok(false);
    }

    let guard = child_pin.read();
    let child_frame = BlobFrameRef::wrap(guard.as_slice());
    let parent_h = parent_frame.header();
    let child_h = child_frame.header();

    let headroom = PAGE_SIZE
        .saturating_sub(parent_h.space_used)
        .saturating_sub(MERGE_RESERVE);
    let payload = child_h.space_used.saturating_sub(DATA_AREA_START);
    let slot_total = u32::from(parent_h.num_slots) + u32::from(child_h.num_slots);

    Ok(payload <= headroom
        && slot_total <= MAX_SLOTS
        && child_h.num_ext_blobs == 0
        && child_h.tombstone_leaf_cnt == 0)
}
/// Folds the child blob referenced by the `BlobNode` at `parent_bn_off` into
/// `parent_frame` and returns the offset of the inlined subtree's root.
///
/// Steps, in order:
/// 1. read the `BlobNode` and copy its inline prefix bytes;
/// 2. pin the child for writing and clone its whole tree into the parent (no tombstone
///    filtering, no routing);
/// 3. if the `BlobNode` carried a prefix, wrap the cloned root in a prefix chain;
/// 4. mark the old `BlobNode` as abandoned, decrement `num_ext_blobs` (saturating) and
///    zero `routing_len` in the parent header;
/// 5. queue the child blob for deletion at `seq`.
///
/// # Errors
/// Propagates failures from `read_blob_node`, `bm.pin`, `clone_subtree` and
/// `write_prefix_chain`.
///
/// # Panics
/// Panics if `clone_subtree` yields `None`, which the non-filtering mode is expected
/// never to do.
pub fn merge_blob(
    bm: &BufferManager,
    parent_frame: &mut BlobFrame<'_>,
    parent_bn_off: u32,
    seq: u64,
) -> Result<u32> {
    let blob_node = read_blob_node(parent_frame, parent_bn_off)?;
    let child_guid = blob_node.child_blob_guid;
    let inline_len = (blob_node.prefix_len as usize).min(BLOB_MAX_INLINE);
    let prefix_bytes: Vec<u8> = blob_node.bytes[..inline_len].to_vec();

    // The child stays pinned only for the duration of the clone.
    let new_subtree_root = {
        let child_pin = bm.pin(child_guid)?;
        let mut child_guard = child_pin.write();
        let child_frame = child_guard.frame();
        let child_root_off = decode_child_off(child_frame.header().root_slot);
        let mut cursor = 0u32;
        let cloned = clone_subtree(
            &child_frame,
            parent_frame,
            child_root_off,
            false,
            false,
            &mut cursor,
        )?;
        cloned.expect("preserve mode never returns None")
    };

    let inlined_root = match prefix_bytes.is_empty() {
        true => new_subtree_root,
        false => write_prefix_chain(parent_frame, &prefix_bytes, new_subtree_root)?,
    };

    parent_frame.note_abandoned(parent_bn_off);
    {
        let header = parent_frame.header_mut();
        header.num_ext_blobs = header.num_ext_blobs.saturating_sub(1);
        header.routing_len = 0;
    }
    bm.mark_for_delete(child_guid, seq);

    #[cfg(feature = "tracing")]
    tracing::debug!(
        target: "holt::engine::merge",
        child_guid = ?&child_guid[..4],
        parent_bn_off = parent_bn_off,
        inlined_root = inlined_root,
        "merge_blob: folded child into parent + queued delete",
    );
    Ok(inlined_root)
}
fn read_blob_node(frame: &BlobFrame<'_>, off: u32) -> Result<BlobNode> {
let body = frame.body_at_offset(off).ok_or(Error::node_corrupt(
"read_blob_node: body resolution failed",
))?;
Ok(*cast::<BlobNode>(body))
}
/// Size in bytes of the node that `pack_inner_node` emits for `survivors` children, or
/// `None` when there are no survivors.
///
/// The tiers mirror `pack_inner_node`: 1 → `Prefix`, 2–4 → `Node4`, 5–16 → `Node16`,
/// 17–48 → `Node48`, anything larger → `Node256`.
fn packed_inner_size(survivors: usize) -> Option<u32> {
    let tier = match survivors {
        0 => return None,
        1 => NodeType::Prefix,
        2..=4 => NodeType::Node4,
        5..=16 => NodeType::Node16,
        17..=48 => NodeType::Node48,
        _ => NodeType::Node256,
    };
    Some(size_of_node(tier))
}
/// Byte and leaf totals that `routing_budget` accumulates for a subtree.
struct BudgetNode {
/// Bytes attributed to non-leaf nodes (inner nodes, prefixes and blob nodes).
routing_bytes: u32,
/// Sum of the body lengths of the non-tombstoned leaves.
leaf_bytes: u32,
/// Number of non-tombstoned leaves.
leaf_count: u32,
}
fn routing_budget(src: BlobFrameRef<'_>, src_off: u32) -> Result<Option<BudgetNode>> {
let ntype = src
.ntype_at(src_off)
.ok_or(Error::node_corrupt("routing_budget: undecodable src ntype"))?;
let body = src.body_at_offset(src_off).ok_or(Error::node_corrupt(
"routing_budget: src body resolution failed",
))?;
match ntype {
NodeType::Invalid => Err(Error::node_corrupt(
"routing_budget: NodeType::Invalid in source",
)),
NodeType::EmptyRoot => Ok(Some(BudgetNode {
routing_bytes: 0,
leaf_bytes: 0,
leaf_count: 0,
})),
NodeType::Leaf => {
let leaf = *cast::<Leaf>(&body[..std::mem::size_of::<Leaf>()]);
if leaf.tombstone != 0 {
return Ok(None);
}
Ok(Some(BudgetNode {
routing_bytes: 0,
leaf_bytes: body.len() as u32,
leaf_count: 1,
}))
}
NodeType::Prefix => {
let p = *cast::<Prefix>(body);
Ok(
routing_budget(src, child_offset(p.child as u16))?.map(|c| BudgetNode {
routing_bytes: size_of_node(NodeType::Prefix) + c.routing_bytes,
leaf_bytes: c.leaf_bytes,
leaf_count: c.leaf_count,
}),
)
}
NodeType::Blob => Ok(Some(BudgetNode {
routing_bytes: size_of_node(NodeType::Blob),
leaf_bytes: 0,
leaf_count: 0,
})),
NodeType::Node4 => {
let n = *cast::<Node4>(body);
let count = (n.count as usize).min(4);
budget_inner(src, (0..count).map(|i| child_offset(n.children[i])))
}
NodeType::Node16 => {
let n = *cast::<Node16>(body);
let count = (n.count as usize).min(16);
budget_inner(src, (0..count).map(|i| child_offset(n.children[i])))
}
NodeType::Node48 => {
let n = *cast::<Node48>(body);
budget_inner(
src,
(0..256usize)
.map(|b| n.index[b])
.filter(|&idx| idx != 0)
.map(|idx| child_offset(n.children[idx as usize - 1])),
)
}
NodeType::Node256 => {
let n = *cast::<Node256>(body);
budget_inner(
src,
n.children
.iter()
.filter(|&&c| c != 0)
.map(|&c| child_offset(c)),
)
}
}
}
/// Budgets an inner node given the offsets of its children.
///
/// Sums the budgets of the children that survive (`routing_budget` returned `Some`),
/// then adds the size `packed_inner_size` reports for that many survivors. Returns
/// `Ok(None)` when no child survives.
///
/// # Errors
/// Propagates the first error from `routing_budget` on a child.
fn budget_inner(
    src: BlobFrameRef<'_>,
    child_offs: impl Iterator<Item = u32>,
) -> Result<Option<BudgetNode>> {
    let mut survivors = 0usize;
    let mut total = BudgetNode {
        routing_bytes: 0,
        leaf_bytes: 0,
        leaf_count: 0,
    };
    for child_off in child_offs {
        let Some(child) = routing_budget(src, child_off)? else {
            continue;
        };
        survivors += 1;
        total.routing_bytes += child.routing_bytes;
        total.leaf_bytes += child.leaf_bytes;
        total.leaf_count += child.leaf_count;
    }
    let Some(self_size) = packed_inner_size(survivors) else {
        return Ok(None);
    };
    total.routing_bytes += self_size;
    Ok(Some(total))
}
/// Minimum live leaf bytes (twice `PAGE_4K`) a blob must hold before
/// `routing_geometry` will choose the routed layout.
const ROUTE_MIN_LEAF_BYTES: u32 = 2 * PAGE_4K;
fn routing_geometry(budget: Option<BudgetNode>) -> Option<u32> {
let b = budget?;
if b.routing_bytes == 0 {
return None; }
if b.leaf_bytes < ROUTE_MIN_LEAF_BYTES {
return None; }
let bloom_len = bloom_reserve_bytes(b.leaf_count);
let routing_end =
DATA_AREA_START + size_of_node(NodeType::EmptyRoot) + b.routing_bytes + bloom_len;
let lrs = page_align_up(routing_end);
let fits = u64::from(lrs) + u64::from(b.leaf_bytes) + u64::from(SPILLOVER_RESERVATION)
<= u64::from(PAGE_SIZE);
fits.then_some(lrs)
}
/// Bytes to reserve for a bloom filter over `leaf_count` keys at `BLOOM_BITS_PER_KEY`;
/// zero when there are no leaves.
#[inline]
fn bloom_reserve_bytes(leaf_count: u32) -> u32 {
    match leaf_count {
        0 => 0,
        n => bloom_byte_len(n as usize, BLOOM_BITS_PER_KEY) as u32,
    }
}
/// Builds a bloom filter over the keys of the leaves laid out in
/// `[leaf_region_start, leaf_cursor)` and writes it at `internal_end`.
///
/// Returns `Some((offset, length))` of the written bloom, or `None` when no bloom is
/// written: the reservation for `leaf_count` is zero, the bloom would not fit before
/// `leaf_region_start`, a leaf body cannot be resolved, a leaf's key length exceeds its
/// body, the walk offset overflows, or the destination bytes cannot be obtained.
fn build_routing_bloom(
    frame: &mut BlobFrame<'_>,
    internal_end: u32,
    leaf_region_start: u32,
    leaf_cursor: u32,
    leaf_count: u32,
) -> Option<(u32, u32)> {
    let bloom_len = match bloom_reserve_bytes(leaf_count) {
        0 => return None,
        len => len,
    };
    let bloom_off = internal_end;
    let bloom_end = bloom_off.checked_add(bloom_len)?;
    if bloom_end > leaf_region_start {
        return None;
    }

    let bytes = {
        let view = frame.as_ref();
        let mut builder = BloomBuilder::new(leaf_count as usize, BLOOM_BITS_PER_KEY);
        let leaf_hdr = std::mem::size_of::<Leaf>();
        // Leaves are walked back-to-back: each body's length gives the next offset.
        let mut walk = leaf_region_start;
        while walk < leaf_cursor {
            let body = view.body_at_offset(walk)?;
            let leaf = *cast::<Leaf>(&body[..leaf_hdr]);
            let key_end = leaf_hdr + leaf.key_len as usize;
            if body.len() < key_end {
                return None;
            }
            builder.add(&body[leaf_hdr..key_end]);
            walk = walk.checked_add(body.len() as u32)?;
        }
        builder.into_bytes()
    };
    debug_assert_eq!(bytes.len() as u32, bloom_len);

    frame
        .bytes_at_mut(bloom_off, bloom_len)?
        .copy_from_slice(&bytes);
    Some((bloom_off, bloom_len))
}
fn clone_subtree(
src: &BlobFrame<'_>,
dst: &mut BlobFrame<'_>,
src_off: u32,
filter_tombstones: bool,
routed: bool,
leaf_cursor: &mut u32,
) -> Result<Option<u32>> {
let ntype = src
.ntype_at(src_off)
.ok_or(Error::node_corrupt("clone_subtree: undecodable src ntype"))?;
let body = src.body_at_offset(src_off).ok_or(Error::node_corrupt(
"clone_subtree: src body resolution failed",
))?;
match ntype {
NodeType::Invalid => Err(Error::node_corrupt(
"clone_subtree: NodeType::Invalid in source",
)),
NodeType::EmptyRoot => {
let out = dst.alloc_node(NodeType::EmptyRoot)?;
let off = dst
.offset_of_slot(out.slot)
.ok_or(Error::node_corrupt("clone_subtree: EmptyRoot offset"))?;
if let Some(b) = dst.bytes_at_mut(off, 8) {
b[1] = NodeType::EmptyRoot.as_u8();
}
Ok(Some(off))
}
NodeType::Leaf => clone_leaf(body, dst, filter_tombstones, routed, leaf_cursor),
NodeType::Prefix => clone_prefix(src, body, dst, filter_tombstones, routed, leaf_cursor),
NodeType::Node4 => clone_node4(src, body, dst, filter_tombstones, routed, leaf_cursor),
NodeType::Node16 => clone_node16(src, body, dst, filter_tombstones, routed, leaf_cursor),
NodeType::Node48 => clone_node48(src, body, dst, filter_tombstones, routed, leaf_cursor),
NodeType::Node256 => clone_node256(src, body, dst, filter_tombstones, routed, leaf_cursor),
NodeType::Blob => clone_blob_node(body, dst),
}
}
fn clone_leaf(
src_body: &[u8],
dst: &mut BlobFrame<'_>,
filter_tombstones: bool,
routed: bool,
leaf_cursor: &mut u32,
) -> Result<Option<u32>> {
let src_leaf = *cast::<Leaf>(&src_body[..std::mem::size_of::<Leaf>()]);
if filter_tombstones && src_leaf.tombstone != 0 {
return Ok(None);
}
let total = src_body.len() as u32;
debug_assert_eq!(
total,
crate::layout::leaf_body_size(u32::from(src_leaf.key_len), u32::from(src_leaf.value_len),)
);
let dst_off = if routed {
let off = *leaf_cursor;
dst.alloc_leaf_at(off, total)?;
*leaf_cursor += total;
off
} else {
let out = dst.alloc_leaf(total)?;
dst.offset_of_slot(out.slot)
.ok_or(Error::node_corrupt("clone_leaf: dst slot offset"))?
};
{
let dst_body = dst
.bytes_at_mut(dst_off, total)
.ok_or(Error::node_corrupt("clone_leaf: dst body out of range"))?;
debug_assert_eq!(dst_body.len(), src_body.len());
dst_body.copy_from_slice(src_body);
}
Ok(Some(dst_off))
}
/// Copies a `Prefix` node: clones its child first, then writes a new `Prefix` in `dst`
/// that carries the same inline bytes and points at the cloned child.
///
/// Returns `Ok(None)` when the child does not survive the clone.
///
/// # Errors
/// Propagates failures from `clone_subtree`, `dst.alloc_node` and `write_struct_at`;
/// returns a `node_corrupt` error when the new node's offset cannot be resolved.
fn clone_prefix(
    src: &BlobFrame<'_>,
    src_body: &[u8],
    dst: &mut BlobFrame<'_>,
    filter_tombstones: bool,
    routed: bool,
    leaf_cursor: &mut u32,
) -> Result<Option<u32>> {
    let src_prefix = *cast::<Prefix>(src_body);
    let inline_len = (src_prefix.prefix_len as usize).min(PREFIX_MAX_INLINE);

    let child_src_off = child_offset(src_prefix.child as u16);
    let cloned_child = clone_subtree(
        src,
        dst,
        child_src_off,
        filter_tombstones,
        routed,
        leaf_cursor,
    )?;
    let new_child_off = match cloned_child {
        Some(off) => off,
        None => return Ok(None),
    };

    let slot = dst.alloc_node(NodeType::Prefix)?.slot;
    let off = dst
        .offset_of_slot(slot)
        .ok_or(Error::node_corrupt("clone_prefix: dst offset"))?;
    let rebuilt = Prefix::new(
        &src_prefix.bytes[..inline_len],
        u32::from(encode_child_off(new_child_off)),
    );
    write_struct_at(dst, off, &rebuilt)?;
    Ok(Some(off))
}
/// Copies a `Node4` and its children into `dst`.
///
/// Without filtering, the node is reproduced as a `Node4` with the source's `count` and
/// `keys` and re-encoded child offsets. With filtering, only surviving children are kept
/// and the result is repacked by `pack_inner_node` (possibly `None`).
///
/// # Errors
/// Propagates failures from `clone_subtree`, `dst.alloc_node`, `write_struct_at` and
/// `pack_inner_node`; returns a `node_corrupt` error when the new offset cannot be
/// resolved.
///
/// # Panics
/// In non-filtering mode, panics if a child clone yields `None`.
fn clone_node4(
    src: &BlobFrame<'_>,
    src_body: &[u8],
    dst: &mut BlobFrame<'_>,
    filter_tombstones: bool,
    routed: bool,
    leaf_cursor: &mut u32,
) -> Result<Option<u32>> {
    let src_n = *cast::<Node4>(src_body);
    // Clamp so a bogus count cannot index past the fixed arrays.
    let live = (src_n.count as usize).min(4);

    if !filter_tombstones {
        let mut remapped = [0u16; 4];
        for i in 0..live {
            let child_src_off = child_offset(src_n.children[i]);
            let cloned = clone_subtree(src, dst, child_src_off, false, routed, leaf_cursor)?
                .expect("preserve mode never returns None");
            remapped[i] = encode_child_off(cloned);
        }
        let slot = dst.alloc_node(NodeType::Node4)?.slot;
        let off = dst
            .offset_of_slot(slot)
            .ok_or(Error::node_corrupt("clone_node4: dst offset"))?;
        let mut copy = Node4::empty();
        copy.count = src_n.count;
        copy.keys = src_n.keys;
        copy.children = remapped;
        write_struct_at(dst, off, &copy)?;
        return Ok(Some(off));
    }

    let mut survivors: Vec<(u8, u32)> = Vec::with_capacity(live);
    for i in 0..live {
        let child_src_off = child_offset(src_n.children[i]);
        let cloned = clone_subtree(src, dst, child_src_off, true, routed, leaf_cursor)?;
        if let Some(new_child) = cloned {
            survivors.push((src_n.keys[i], new_child));
        }
    }
    pack_inner_node(dst, &survivors)
}
/// Copies a `Node16` and its children into `dst`.
///
/// Without filtering, the node is reproduced as a `Node16` with the source's `count` and
/// `keys` and re-encoded child offsets. With filtering, only surviving children are kept
/// and the result is repacked by `pack_inner_node` (possibly `None`).
///
/// # Errors
/// Propagates failures from `clone_subtree`, `dst.alloc_node`, `write_struct_at` and
/// `pack_inner_node`; returns a `node_corrupt` error when the new offset cannot be
/// resolved.
///
/// # Panics
/// In non-filtering mode, panics if a child clone yields `None`.
fn clone_node16(
    src: &BlobFrame<'_>,
    src_body: &[u8],
    dst: &mut BlobFrame<'_>,
    filter_tombstones: bool,
    routed: bool,
    leaf_cursor: &mut u32,
) -> Result<Option<u32>> {
    let src_n = *cast::<Node16>(src_body);
    // Clamp so a bogus count cannot index past the fixed arrays.
    let live = (src_n.count as usize).min(16);

    if !filter_tombstones {
        let mut remapped = [0u16; 16];
        for i in 0..live {
            let child_src_off = child_offset(src_n.children[i]);
            let cloned = clone_subtree(src, dst, child_src_off, false, routed, leaf_cursor)?
                .expect("preserve mode never returns None");
            remapped[i] = encode_child_off(cloned);
        }
        let slot = dst.alloc_node(NodeType::Node16)?.slot;
        let off = dst
            .offset_of_slot(slot)
            .ok_or(Error::node_corrupt("clone_node16: dst offset"))?;
        let mut copy = Node16::empty();
        copy.count = src_n.count;
        copy.keys = src_n.keys;
        copy.children = remapped;
        write_struct_at(dst, off, &copy)?;
        return Ok(Some(off));
    }

    let mut survivors: Vec<(u8, u32)> = Vec::with_capacity(live);
    for i in 0..live {
        let child_src_off = child_offset(src_n.children[i]);
        let cloned = clone_subtree(src, dst, child_src_off, true, routed, leaf_cursor)?;
        if let Some(new_child) = cloned {
            survivors.push((src_n.keys[i], new_child));
        }
    }
    pack_inner_node(dst, &survivors)
}
/// Copies a `Node48` and its children into `dst`.
///
/// Without filtering, every non-zero child slot is cloned in place and the source's
/// `count` and `index` table are reused unchanged. With filtering, the index table is
/// walked byte by byte, surviving children are collected under their key byte, and the
/// result is repacked by `pack_inner_node` (possibly `None`).
///
/// # Errors
/// In filtering mode, returns a `node_corrupt` error when an index entry points past the
/// 48 child slots. Propagates failures from `clone_subtree`, `dst.alloc_node`,
/// `write_struct_at` and `pack_inner_node`; returns a `node_corrupt` error when the new
/// offset cannot be resolved.
///
/// # Panics
/// In non-filtering mode, panics if a child clone yields `None`.
fn clone_node48(
    src: &BlobFrame<'_>,
    src_body: &[u8],
    dst: &mut BlobFrame<'_>,
    filter_tombstones: bool,
    routed: bool,
    leaf_cursor: &mut u32,
) -> Result<Option<u32>> {
    let src_n = *cast::<Node48>(src_body);

    if !filter_tombstones {
        let mut remapped = [0u16; 48];
        for slot_idx in 0..48usize {
            let child_enc = src_n.children[slot_idx];
            if child_enc == 0 {
                continue;
            }
            let cloned = clone_subtree(
                src,
                dst,
                child_offset(child_enc),
                false,
                routed,
                leaf_cursor,
            )?
            .expect("preserve mode never returns None");
            remapped[slot_idx] = encode_child_off(cloned);
        }
        let slot = dst.alloc_node(NodeType::Node48)?.slot;
        let off = dst
            .offset_of_slot(slot)
            .ok_or(Error::node_corrupt("clone_node48: dst offset"))?;
        let mut copy = Node48::empty();
        copy.count = src_n.count;
        copy.index = src_n.index;
        copy.children = remapped;
        write_struct_at(dst, off, &copy)?;
        return Ok(Some(off));
    }

    let mut survivors: Vec<(u8, u32)> = Vec::with_capacity(48);
    for byte in 0..256usize {
        // Index entries are 1-based; zero means "no child for this byte".
        let Some(child_idx) = (src_n.index[byte] as usize).checked_sub(1) else {
            continue;
        };
        if child_idx >= 48 {
            return Err(Error::node_corrupt("clone_node48: index out of range"));
        }
        let child_src_off = child_offset(src_n.children[child_idx]);
        let cloned = clone_subtree(src, dst, child_src_off, true, routed, leaf_cursor)?;
        if let Some(new_child) = cloned {
            survivors.push((byte as u8, new_child));
        }
    }
    pack_inner_node(dst, &survivors)
}
/// Copies a `Node256` and its children into `dst`.
///
/// Without filtering, every non-zero child slot is cloned in place and the source's
/// `count` is reused. With filtering, surviving children are collected under their key
/// byte and the result is repacked by `pack_inner_node` (possibly `None`).
///
/// # Errors
/// Propagates failures from `clone_subtree`, `dst.alloc_node`, `write_struct_at` and
/// `pack_inner_node`; returns a `node_corrupt` error when the new offset cannot be
/// resolved.
///
/// # Panics
/// In non-filtering mode, panics if a child clone yields `None`.
fn clone_node256(
    src: &BlobFrame<'_>,
    src_body: &[u8],
    dst: &mut BlobFrame<'_>,
    filter_tombstones: bool,
    routed: bool,
    leaf_cursor: &mut u32,
) -> Result<Option<u32>> {
    let src_n = *cast::<Node256>(src_body);

    if !filter_tombstones {
        let mut remapped = [0u16; 256];
        for (byte, &child_enc) in src_n.children.iter().enumerate() {
            if child_enc == 0 {
                continue;
            }
            let cloned = clone_subtree(
                src,
                dst,
                child_offset(child_enc),
                false,
                routed,
                leaf_cursor,
            )?
            .expect("preserve mode never returns None");
            remapped[byte] = encode_child_off(cloned);
        }
        let slot = dst.alloc_node(NodeType::Node256)?.slot;
        let off = dst
            .offset_of_slot(slot)
            .ok_or(Error::node_corrupt("clone_node256: dst offset"))?;
        let mut copy = Node256::empty();
        copy.count = src_n.count;
        copy.children = remapped;
        write_struct_at(dst, off, &copy)?;
        return Ok(Some(off));
    }

    let mut survivors: Vec<(u8, u32)> = Vec::with_capacity(64);
    for byte in 0..256usize {
        let child_enc = src_n.children[byte];
        if child_enc == 0 {
            continue;
        }
        let cloned = clone_subtree(src, dst, child_offset(child_enc), true, routed, leaf_cursor)?;
        if let Some(new_child) = cloned {
            survivors.push((byte as u8, new_child));
        }
    }
    pack_inner_node(dst, &survivors)
}
fn clone_blob_node(src_body: &[u8], dst: &mut BlobFrame<'_>) -> Result<Option<u32>> {
let src_b = *cast::<BlobNode>(src_body);
let plen = (src_b.prefix_len as usize).min(BLOB_MAX_INLINE);
let new_b = BlobNode::new(&src_b.bytes[..plen], src_b.child_blob_guid);
let out = dst.alloc_node(NodeType::Blob)?;
let off = dst
.offset_of_slot(out.slot)
.ok_or(Error::node_corrupt("clone_blob_node: dst offset"))?;
write_struct_at(dst, off, &new_b)?;
Ok(Some(off))
}
fn pack_inner_node(dst: &mut BlobFrame<'_>, survivors: &[(u8, u32)]) -> Result<Option<u32>> {
debug_assert!(
survivors.windows(2).all(|w| w[0].0 < w[1].0),
"pack_inner_node: survivors must be byte-sorted ascending"
);
let alloc = |dst: &mut BlobFrame<'_>, nt: NodeType| -> Result<(u16, u32)> {
let out = dst.alloc_node(nt)?;
let off = dst
.offset_of_slot(out.slot)
.ok_or(Error::node_corrupt("pack_inner_node: dst offset"))?;
Ok((out.slot, off))
};
match survivors.len() {
0 => Ok(None),
1 => {
let (byte, child_off) = survivors[0];
let off = write_prefix_chain(dst, &[byte], child_off)?;
Ok(Some(off))
}
2..=4 => {
let (_, off) = alloc(dst, NodeType::Node4)?;
let mut n = Node4::empty();
n.count = survivors.len() as u8;
for (i, &(b, c)) in survivors.iter().enumerate() {
n.keys[i] = b;
n.children[i] = encode_child_off(c);
}
write_struct_at(dst, off, &n)?;
Ok(Some(off))
}
5..=16 => {
let (_, off) = alloc(dst, NodeType::Node16)?;
let mut n = Node16::empty();
n.count = survivors.len() as u8;
for (i, &(b, c)) in survivors.iter().enumerate() {
n.keys[i] = b;
n.children[i] = encode_child_off(c);
}
write_struct_at(dst, off, &n)?;
Ok(Some(off))
}
17..=48 => {
let (_, off) = alloc(dst, NodeType::Node48)?;
let mut n = Node48::empty();
n.count = survivors.len() as u8;
for (ci, &(b, c)) in survivors.iter().enumerate() {
n.children[ci] = encode_child_off(c);
n.index[b as usize] = (ci as u8) + 1;
}
write_struct_at(dst, off, &n)?;
Ok(Some(off))
}
_ => {
let (_, off) = alloc(dst, NodeType::Node256)?;
let mut n = Node256::empty();
n.count = survivors.len() as u8;
for &(b, c) in survivors {
n.children[b as usize] = encode_child_off(c);
}
write_struct_at(dst, off, &n)?;
Ok(Some(off))
}
}
}
// Unit tests that pin the agreement between the routing budget helpers and the code
// that actually lays nodes out.
#[cfg(test)]
mod budget_tests {
use super::{pack_inner_node, packed_inner_size, routing_geometry, BudgetNode};
use crate::layout::{size_of_node, NodeType, DATA_AREA_START};
use crate::store::blob_store::AlignedBlobBuf;
use crate::store::{BlobFrame, PAGE_4K};
/// When `routing_bytes` is an exact multiple of a page, the leaf-region start returned
/// by `routing_geometry` must still leave room for one `EmptyRoot` node ahead of the
/// routing bytes, and must be page-aligned.
#[test]
fn routing_geometry_reserves_the_init_sentinel_on_a_page_boundary() {
let sentinel = size_of_node(NodeType::EmptyRoot);
for routing_bytes in [PAGE_4K, 2 * PAGE_4K, 3 * PAGE_4K] {
// `leaf_count: 0` keeps the bloom reservation at zero for this check.
let lrs = routing_geometry(Some(BudgetNode {
routing_bytes,
leaf_bytes: 16 * PAGE_4K,
leaf_count: 0,
}))
.expect("substantial routed blob");
let space_used_after_clone = DATA_AREA_START + sentinel + routing_bytes;
assert!(
space_used_after_clone <= lrs,
"routing_bytes={routing_bytes}: space_used {space_used_after_clone:#x} \
overruns leaf_region_start {lrs:#x}",
);
assert_eq!(lrs % PAGE_4K, 0, "leaf_region_start must be page-aligned");
}
}
/// For survivor counts on both sides of every tier boundary, the node type that
/// `pack_inner_node` emits must have exactly the size `packed_inner_size` predicts.
#[test]
fn packed_inner_size_matches_pack_inner_node() {
let mut ab = AlignedBlobBuf::zeroed();
for &n in &[1usize, 2, 3, 4, 5, 16, 17, 48, 49, 100, 256] {
// Re-initialise the frame on the same buffer for each survivor count.
let mut frame = BlobFrame::init(ab.as_mut_slice(), [0u8; 16]).unwrap();
let mut survivors: Vec<(u8, u32)> = Vec::with_capacity(n);
for i in 0..n {
let out = frame.alloc_leaf(16).unwrap();
let off = frame.offset_of_slot(out.slot).unwrap();
// Ascending key bytes satisfy `pack_inner_node`'s sort precondition.
survivors.push((i as u8, off));
}
let off = pack_inner_node(&mut frame, &survivors)
.unwrap()
.expect("non-empty survivors pack to Some");
let nt = frame.ntype_at(off).unwrap();
assert_eq!(
Some(size_of_node(nt)),
packed_inner_size(n),
"tier mismatch at n={n}: pack emitted {nt:?}",
);
}
assert_eq!(packed_inner_size(0), None);
}
}