use crate::api::errors::{Error, Result};
use crate::layout::{
BlobGuid, BlobNode, Leaf, Node16, Node256, Node4, Node48, NodeType, Prefix, BLOB_MAX_INLINE,
DATA_AREA_START, MAX_SLOTS, PAGE_SIZE, PREFIX_MAX_INLINE,
};
use crate::store::blob_store::AlignedBlobBuf;
use crate::store::{decode_child_off, encode_child_off, BlobFrame, BlobFrameRef, BufferManager};
use super::cast;
use super::readers::child_offset;
use super::types::MakeBlobOutcome;
use super::writers::{write_prefix_chain, write_struct_at};
/// Bytes withheld from the parent's free space by `is_mergeable` when it
/// decides whether a child blob's data would fit into the parent (4 KiB).
const MERGE_RESERVE: u32 = 0x1000;
/// Test-only helper: clones the subtree rooted at `src_off` in `src_frame`
/// into a new blob identified by `new_guid`.
///
/// The destination buffer comes from `AlignedBlobBuf::zeroed()`; all the work
/// is delegated to `make_blob_from_node_with_buf`.
#[cfg(test)]
pub fn make_blob_from_node(
    src_frame: &BlobFrame<'_>,
    src_off: u32,
    new_guid: BlobGuid,
) -> Result<MakeBlobOutcome> {
    let scratch = AlignedBlobBuf::zeroed();
    make_blob_from_node_with_buf(scratch, src_frame, src_off, new_guid)
}
/// Clones the subtree rooted at `src_off` in `src_frame` into a new blob
/// identified by `new_guid`.
///
/// The destination buffer is obtained from `bm.alloc_blob_buf_zeroed()`; all
/// the work is delegated to `make_blob_from_node_with_buf`.
pub fn make_blob_from_node_in(
    bm: &BufferManager,
    src_frame: &BlobFrame<'_>,
    src_off: u32,
    new_guid: BlobGuid,
) -> Result<MakeBlobOutcome> {
    let scratch = bm.alloc_blob_buf_zeroed();
    make_blob_from_node_with_buf(scratch, src_frame, src_off, new_guid)
}
/// Initialises `buf` as a blob with guid `new_guid`, copies the subtree rooted
/// at `src_off` of `src_frame` into it (tombstones preserved), and records the
/// copy as the new blob's root slot.
///
/// # Errors
/// Propagates failures from `BlobFrame::init` and from `clone_subtree`.
///
/// # Panics
/// Panics if the non-filtering clone yields `None`.
fn make_blob_from_node_with_buf(
    mut buf: AlignedBlobBuf,
    src_frame: &BlobFrame<'_>,
    src_off: u32,
    new_guid: BlobGuid,
) -> Result<MakeBlobOutcome> {
    {
        // The frame borrows `buf`; keep it scoped so `buf` can be moved out below.
        let mut fresh = BlobFrame::init(buf.as_mut_slice(), new_guid)?;
        let root_off = clone_subtree(src_frame, &mut fresh, src_off, false)?
            .expect("preserve mode never returns None");
        fresh.header_mut().root_slot = encode_child_off(root_off);
    }
    Ok(MakeBlobOutcome { buf })
}
/// `dead_bytes` level at which `blob_needs_compaction` reports `true`:
/// one sixteenth of the span between `DATA_AREA_START` and `PAGE_SIZE`.
const DEAD_BYTES_COMPACT_THRESHOLD: u32 = (PAGE_SIZE - DATA_AREA_START) / 16;
/// Reports whether the blob behind `frame` should be compacted.
///
/// True when the header records at least one tombstoned leaf, or when its
/// `dead_bytes` counter has reached `DEAD_BYTES_COMPACT_THRESHOLD`.
#[must_use]
pub fn blob_needs_compaction(frame: BlobFrameRef<'_>) -> bool {
    let header = frame.header();
    if header.tombstone_leaf_cnt != 0 {
        return true;
    }
    header.dead_bytes >= DEAD_BYTES_COMPACT_THRESHOLD
}
/// Rebuilds the blob stored in `buf`, dropping tombstoned leaves.
///
/// The tree is cloned in tombstone-filtering mode into a scratch buffer from
/// `buf.zeroed_like()`. The rebuilt header keeps the original guid, has
/// `tombstone_leaf_cnt` and `dead_bytes` reset to zero and `compact_times`
/// incremented (saturating). The scratch contents are copied back over `buf`
/// only after the clone has succeeded.
///
/// # Errors
/// Propagates failures from `BlobFrame::init` and from `clone_subtree`.
pub fn compact_blob(buf: &mut AlignedBlobBuf) -> Result<()> {
    let (blob_guid, old_root_off, old_compact_times) = {
        let current = BlobFrame::wrap(buf.as_mut_slice());
        let hdr = current.header();
        (hdr.blob_guid, decode_child_off(hdr.root_slot), hdr.compact_times)
    };
    let next_compact_times = old_compact_times.saturating_add(1);

    let mut scratch = buf.zeroed_like();
    {
        let mut rebuilt = BlobFrame::init(scratch.as_mut_slice(), blob_guid)?;
        let current = BlobFrame::wrap(buf.as_mut_slice());
        // `None` means every leaf was filtered out. The root then falls back to
        // encoded slot 1.
        // NOTE(review): presumably `BlobFrame::init` seeds an empty-root node
        // at slot 1 — confirm.
        let entry_off = clone_subtree(&current, &mut rebuilt, old_root_off, true)?
            .unwrap_or_else(|| decode_child_off(1));

        let hdr = rebuilt.header_mut();
        hdr.root_slot = encode_child_off(entry_off);
        hdr.tombstone_leaf_cnt = 0;
        hdr.dead_bytes = 0;
        hdr.compact_times = next_compact_times;
    }
    buf.as_mut_slice().copy_from_slice(scratch.as_slice());

    #[cfg(feature = "tracing")]
    tracing::debug!(
        target: "holt::engine::compact",
        blob_guid = ?&blob_guid[..4],
        compact_times = next_compact_times,
        "compact_blob: in-place rebuild complete",
    );
    Ok(())
}
/// Checks whether the child blob referenced by the `BlobNode` at
/// `parent_bn_off` can be folded into `parent_frame`.
///
/// All of the following must hold:
/// * the child's data bytes (`space_used - DATA_AREA_START`) fit into the
///   parent's free space after holding back `MERGE_RESERVE`;
/// * parent and child slot counts together do not exceed `MAX_SLOTS`;
/// * the child's header records no external blobs and no tombstoned leaves.
///
/// # Errors
/// Propagates failures from reading the blob node and from pinning the child.
pub fn is_mergeable(
    bm: &BufferManager,
    parent_frame: &BlobFrame<'_>,
    parent_bn_off: u32,
) -> Result<bool> {
    let link = read_blob_node(parent_frame, parent_bn_off)?;
    let pinned_child = bm.pin(link.child_blob_guid)?;
    let child_guard = pinned_child.read();
    let child_frame = BlobFrameRef::wrap(child_guard.as_slice());

    let parent_hdr = parent_frame.header();
    let child_hdr = child_frame.header();

    let free_after_reserve = PAGE_SIZE
        .saturating_sub(parent_hdr.space_used)
        .saturating_sub(MERGE_RESERVE);
    let child_payload = child_hdr.space_used.saturating_sub(DATA_AREA_START);
    let fits_space = child_payload <= free_after_reserve;

    let slot_total = u32::from(parent_hdr.num_slots) + u32::from(child_hdr.num_slots);
    let fits_slots = slot_total <= MAX_SLOTS;

    let child_is_clean = child_hdr.num_ext_blobs == 0 && child_hdr.tombstone_leaf_cnt == 0;

    Ok(fits_space && fits_slots && child_is_clean)
}
/// Folds the child blob referenced by the `BlobNode` at `parent_bn_off` into
/// `parent_frame` and returns the parent-local offset of the inlined subtree.
///
/// Steps, in order:
/// 1. pin the child, take its write guard and clone its whole tree into the
///    parent (tombstones preserved);
/// 2. if the blob node carried inline prefix bytes, put a prefix chain in
///    front of the cloned root;
/// 3. mark the old blob node as abandoned, decrement the parent's
///    `num_ext_blobs` (saturating) and queue the child for deletion at `seq`.
///
/// This function does not check mergeability itself.
///
/// # Errors
/// Propagates failures from reading the blob node, pinning the child, cloning
/// and writing the prefix chain.
///
/// # Panics
/// Panics if the non-filtering clone yields `None`.
pub fn merge_blob(
    bm: &BufferManager,
    parent_frame: &mut BlobFrame<'_>,
    parent_bn_off: u32,
    seq: u64,
) -> Result<u32> {
    let link = read_blob_node(parent_frame, parent_bn_off)?;
    let child_guid = link.child_blob_guid;
    let prefix_len = (link.prefix_len as usize).min(BLOB_MAX_INLINE);
    // `link` is a local copy, so its bytes stay valid while the parent is mutated.
    let link_prefix = &link.bytes[..prefix_len];

    let new_subtree_root = {
        let pinned_child = bm.pin(child_guid)?;
        let mut guard = pinned_child.write();
        let child_frame = guard.frame();
        let child_root = decode_child_off(child_frame.header().root_slot);
        clone_subtree(&child_frame, parent_frame, child_root, false)?
            .expect("preserve mode never returns None")
    };

    let inlined_root = match link_prefix {
        [] => new_subtree_root,
        bytes => write_prefix_chain(parent_frame, bytes, new_subtree_root)?,
    };

    parent_frame.note_abandoned(parent_bn_off);
    {
        let hdr = parent_frame.header_mut();
        hdr.num_ext_blobs = hdr.num_ext_blobs.saturating_sub(1);
    }
    bm.mark_for_delete(child_guid, seq);

    #[cfg(feature = "tracing")]
    tracing::debug!(
        target: "holt::engine::merge",
        child_guid = ?&child_guid[..4],
        parent_bn_off = parent_bn_off,
        inlined_root = inlined_root,
        "merge_blob: folded child into parent + queued delete",
    );
    Ok(inlined_root)
}
fn read_blob_node(frame: &BlobFrame<'_>, off: u32) -> Result<BlobNode> {
let body = frame.body_at_offset(off).ok_or(Error::node_corrupt(
"read_blob_node: body resolution failed",
))?;
Ok(*cast::<BlobNode>(body))
}
fn clone_subtree(
src: &BlobFrame<'_>,
dst: &mut BlobFrame<'_>,
src_off: u32,
filter_tombstones: bool,
) -> Result<Option<u32>> {
let ntype = src
.ntype_at(src_off)
.ok_or(Error::node_corrupt("clone_subtree: undecodable src ntype"))?;
let body = src.body_at_offset(src_off).ok_or(Error::node_corrupt(
"clone_subtree: src body resolution failed",
))?;
match ntype {
NodeType::Invalid => Err(Error::node_corrupt(
"clone_subtree: NodeType::Invalid in source",
)),
NodeType::EmptyRoot => {
let out = dst.alloc_node(NodeType::EmptyRoot)?;
let off = dst
.offset_of_slot(out.slot)
.ok_or(Error::node_corrupt("clone_subtree: EmptyRoot offset"))?;
if let Some(b) = dst.bytes_at_mut(off, 8) {
b[1] = NodeType::EmptyRoot.as_u8();
}
Ok(Some(off))
}
NodeType::Leaf => clone_leaf(body, dst, filter_tombstones),
NodeType::Prefix => clone_prefix(src, body, dst, filter_tombstones),
NodeType::Node4 => clone_node4(src, body, dst, filter_tombstones),
NodeType::Node16 => clone_node16(src, body, dst, filter_tombstones),
NodeType::Node48 => clone_node48(src, body, dst, filter_tombstones),
NodeType::Node256 => clone_node256(src, body, dst, filter_tombstones),
NodeType::Blob => clone_blob_node(body, dst),
}
}
fn clone_leaf(
src_body: &[u8],
dst: &mut BlobFrame<'_>,
filter_tombstones: bool,
) -> Result<Option<u32>> {
let src_leaf = *cast::<Leaf>(&src_body[..std::mem::size_of::<Leaf>()]);
if filter_tombstones && src_leaf.tombstone != 0 {
return Ok(None);
}
let total = src_body.len() as u32;
debug_assert_eq!(
total,
crate::layout::leaf_body_size(u32::from(src_leaf.key_len), u32::from(src_leaf.value_len),)
);
let out = dst.alloc_leaf(total)?;
let dst_off = dst
.offset_of_slot(out.slot)
.ok_or(Error::node_corrupt("clone_leaf: dst slot offset"))?;
{
let dst_body = dst
.bytes_at_mut(dst_off, total)
.ok_or(Error::node_corrupt("clone_leaf: dst body out of range"))?;
debug_assert_eq!(dst_body.len(), src_body.len());
dst_body.copy_from_slice(src_body);
}
Ok(Some(dst_off))
}
fn clone_prefix(
src: &BlobFrame<'_>,
src_body: &[u8],
dst: &mut BlobFrame<'_>,
filter_tombstones: bool,
) -> Result<Option<u32>> {
let p = *cast::<Prefix>(src_body);
let plen = (p.prefix_len as usize).min(PREFIX_MAX_INLINE);
let Some(new_child_off) =
clone_subtree(src, dst, child_offset(p.child as u16), filter_tombstones)?
else {
return Ok(None);
};
let out = dst.alloc_node(NodeType::Prefix)?;
let off = dst
.offset_of_slot(out.slot)
.ok_or(Error::node_corrupt("clone_prefix: dst offset"))?;
let new_p = Prefix::new(&p.bytes[..plen], u32::from(encode_child_off(new_child_off)));
write_struct_at(dst, off, &new_p)?;
Ok(Some(off))
}
fn clone_node4(
src: &BlobFrame<'_>,
src_body: &[u8],
dst: &mut BlobFrame<'_>,
filter_tombstones: bool,
) -> Result<Option<u32>> {
let src_n = *cast::<Node4>(src_body);
let count = (src_n.count as usize).min(4);
if filter_tombstones {
let mut survivors: Vec<(u8, u32)> = Vec::with_capacity(count);
for i in 0..count {
if let Some(new_child) = clone_subtree(src, dst, child_offset(src_n.children[i]), true)?
{
survivors.push((src_n.keys[i], new_child));
}
}
pack_inner_node(dst, &survivors)
} else {
let mut new_children = [0u16; 4];
for (i, child) in new_children.iter_mut().enumerate().take(count) {
let cloned = clone_subtree(src, dst, child_offset(src_n.children[i]), false)?
.expect("preserve mode never returns None");
*child = encode_child_off(cloned);
}
let out = dst.alloc_node(NodeType::Node4)?;
let off = dst
.offset_of_slot(out.slot)
.ok_or(Error::node_corrupt("clone_node4: dst offset"))?;
let mut new_n = Node4::empty();
new_n.count = src_n.count;
new_n.keys = src_n.keys;
new_n.children = new_children;
write_struct_at(dst, off, &new_n)?;
Ok(Some(off))
}
}
fn clone_node16(
src: &BlobFrame<'_>,
src_body: &[u8],
dst: &mut BlobFrame<'_>,
filter_tombstones: bool,
) -> Result<Option<u32>> {
let src_n = *cast::<Node16>(src_body);
let count = (src_n.count as usize).min(16);
if filter_tombstones {
let mut survivors: Vec<(u8, u32)> = Vec::with_capacity(count);
for i in 0..count {
if let Some(new_child) = clone_subtree(src, dst, child_offset(src_n.children[i]), true)?
{
survivors.push((src_n.keys[i], new_child));
}
}
pack_inner_node(dst, &survivors)
} else {
let mut new_children = [0u16; 16];
for (i, child) in new_children.iter_mut().enumerate().take(count) {
let cloned = clone_subtree(src, dst, child_offset(src_n.children[i]), false)?
.expect("preserve mode never returns None");
*child = encode_child_off(cloned);
}
let out = dst.alloc_node(NodeType::Node16)?;
let off = dst
.offset_of_slot(out.slot)
.ok_or(Error::node_corrupt("clone_node16: dst offset"))?;
let mut new_n = Node16::empty();
new_n.count = src_n.count;
new_n.keys = src_n.keys;
new_n.children = new_children;
write_struct_at(dst, off, &new_n)?;
Ok(Some(off))
}
}
fn clone_node48(
src: &BlobFrame<'_>,
src_body: &[u8],
dst: &mut BlobFrame<'_>,
filter_tombstones: bool,
) -> Result<Option<u32>> {
let src_n = *cast::<Node48>(src_body);
if filter_tombstones {
let mut survivors: Vec<(u8, u32)> = Vec::with_capacity(48);
for b in 0..256usize {
let idx = src_n.index[b];
if idx == 0 {
continue;
}
let ci = idx as usize - 1;
if ci >= 48 {
return Err(Error::node_corrupt("clone_node48: index out of range"));
}
if let Some(new_child) =
clone_subtree(src, dst, child_offset(src_n.children[ci]), true)?
{
survivors.push((b as u8, new_child));
}
}
pack_inner_node(dst, &survivors)
} else {
let mut new_children = [0u16; 48];
for (i, child) in new_children.iter_mut().enumerate() {
if src_n.children[i] != 0 {
let cloned = clone_subtree(src, dst, child_offset(src_n.children[i]), false)?
.expect("preserve mode never returns None");
*child = encode_child_off(cloned);
}
}
let out = dst.alloc_node(NodeType::Node48)?;
let off = dst
.offset_of_slot(out.slot)
.ok_or(Error::node_corrupt("clone_node48: dst offset"))?;
let mut new_n = Node48::empty();
new_n.count = src_n.count;
new_n.index = src_n.index;
new_n.children = new_children;
write_struct_at(dst, off, &new_n)?;
Ok(Some(off))
}
}
fn clone_node256(
src: &BlobFrame<'_>,
src_body: &[u8],
dst: &mut BlobFrame<'_>,
filter_tombstones: bool,
) -> Result<Option<u32>> {
let src_n = *cast::<Node256>(src_body);
if filter_tombstones {
let mut survivors: Vec<(u8, u32)> = Vec::with_capacity(64);
for (b, &child_enc) in src_n.children.iter().enumerate() {
if child_enc == 0 {
continue;
}
if let Some(new_child) = clone_subtree(src, dst, child_offset(child_enc), true)? {
survivors.push((b as u8, new_child));
}
}
pack_inner_node(dst, &survivors)
} else {
let mut new_children = [0u16; 256];
for (i, child) in new_children.iter_mut().enumerate() {
if src_n.children[i] != 0 {
let cloned = clone_subtree(src, dst, child_offset(src_n.children[i]), false)?
.expect("preserve mode never returns None");
*child = encode_child_off(cloned);
}
}
let out = dst.alloc_node(NodeType::Node256)?;
let off = dst
.offset_of_slot(out.slot)
.ok_or(Error::node_corrupt("clone_node256: dst offset"))?;
let mut new_n = Node256::empty();
new_n.count = src_n.count;
new_n.children = new_children;
write_struct_at(dst, off, &new_n)?;
Ok(Some(off))
}
}
fn clone_blob_node(src_body: &[u8], dst: &mut BlobFrame<'_>) -> Result<Option<u32>> {
let src_b = *cast::<BlobNode>(src_body);
let plen = (src_b.prefix_len as usize).min(BLOB_MAX_INLINE);
let new_b = BlobNode::new(&src_b.bytes[..plen], src_b.child_blob_guid);
let out = dst.alloc_node(NodeType::Blob)?;
let off = dst
.offset_of_slot(out.slot)
.ok_or(Error::node_corrupt("clone_blob_node: dst offset"))?;
write_struct_at(dst, off, &new_b)?;
Ok(Some(off))
}
fn pack_inner_node(dst: &mut BlobFrame<'_>, survivors: &[(u8, u32)]) -> Result<Option<u32>> {
debug_assert!(
survivors.windows(2).all(|w| w[0].0 < w[1].0),
"pack_inner_node: survivors must be byte-sorted ascending"
);
let alloc = |dst: &mut BlobFrame<'_>, nt: NodeType| -> Result<(u16, u32)> {
let out = dst.alloc_node(nt)?;
let off = dst
.offset_of_slot(out.slot)
.ok_or(Error::node_corrupt("pack_inner_node: dst offset"))?;
Ok((out.slot, off))
};
match survivors.len() {
0 => Ok(None),
1 => {
let (byte, child_off) = survivors[0];
let off = write_prefix_chain(dst, &[byte], child_off)?;
Ok(Some(off))
}
2..=4 => {
let (_, off) = alloc(dst, NodeType::Node4)?;
let mut n = Node4::empty();
n.count = survivors.len() as u8;
for (i, &(b, c)) in survivors.iter().enumerate() {
n.keys[i] = b;
n.children[i] = encode_child_off(c);
}
write_struct_at(dst, off, &n)?;
Ok(Some(off))
}
5..=16 => {
let (_, off) = alloc(dst, NodeType::Node16)?;
let mut n = Node16::empty();
n.count = survivors.len() as u8;
for (i, &(b, c)) in survivors.iter().enumerate() {
n.keys[i] = b;
n.children[i] = encode_child_off(c);
}
write_struct_at(dst, off, &n)?;
Ok(Some(off))
}
17..=48 => {
let (_, off) = alloc(dst, NodeType::Node48)?;
let mut n = Node48::empty();
n.count = survivors.len() as u8;
for (ci, &(b, c)) in survivors.iter().enumerate() {
n.children[ci] = encode_child_off(c);
n.index[b as usize] = (ci as u8) + 1;
}
write_struct_at(dst, off, &n)?;
Ok(Some(off))
}
_ => {
let (_, off) = alloc(dst, NodeType::Node256)?;
let mut n = Node256::empty();
n.count = survivors.len() as u8;
for &(b, c) in survivors {
n.children[b as usize] = encode_child_off(c);
}
write_struct_at(dst, off, &n)?;
Ok(Some(off))
}
}
}