use crate::api::errors::{Error, Result};
use crate::layout::{
leaf_extent_size, BlobGuid, BlobNode, Leaf, Node16, Node256, Node4, Node48, NodeType, Prefix,
BLOB_MAX_INLINE, DATA_AREA_START, MAX_SLOTS, PAGE_SIZE, PREFIX_MAX_INLINE,
};
use crate::store::backend::AlignedBlobBuf;
use crate::store::{BlobFrame, BlobFrameRef, BufferManager};
use super::cast;
use super::types::MakeBlobOutcome;
use super::writers::{write_prefix_chain, write_struct_to_slot};
const MERGE_RESERVE: u32 = 0x1000;
#[cfg(test)]
pub fn make_blob_from_node(
src_frame: &BlobFrame<'_>,
src_slot: u16,
new_guid: BlobGuid,
) -> Result<MakeBlobOutcome> {
make_blob_from_node_with_buf(AlignedBlobBuf::zeroed(), src_frame, src_slot, new_guid)
}
pub fn make_blob_from_node_in(
bm: &BufferManager,
src_frame: &BlobFrame<'_>,
src_slot: u16,
new_guid: BlobGuid,
) -> Result<MakeBlobOutcome> {
make_blob_from_node_with_buf(bm.alloc_blob_buf_zeroed(), src_frame, src_slot, new_guid)
}
fn make_blob_from_node_with_buf(
mut buf: AlignedBlobBuf,
src_frame: &BlobFrame<'_>,
src_slot: u16,
new_guid: BlobGuid,
) -> Result<MakeBlobOutcome> {
let cloned_root_slot;
{
let mut new_frame = BlobFrame::init(buf.as_mut_slice(), new_guid)?;
cloned_root_slot = clone_subtree(src_frame, &mut new_frame, src_slot, false)?
.expect("preserve mode never returns None");
if new_frame.header().root_slot == 1 && cloned_root_slot != 1 {
new_frame.free_node(1)?;
}
new_frame.header_mut().root_slot = cloned_root_slot;
}
Ok(MakeBlobOutcome { buf })
}
#[must_use]
pub fn blob_needs_compaction(frame: BlobFrameRef<'_>) -> bool {
let h = frame.header();
let leaf_free_list = h.free_list_head[NodeType::Leaf as usize - 1];
h.tombstone_leaf_cnt != 0 || leaf_free_list != 0
}
pub fn compact_blob(buf: &mut AlignedBlobBuf) -> Result<()> {
let (blob_guid, old_root, old_compact_times) = {
let old_frame = BlobFrame::wrap(buf.as_mut_slice());
let h = old_frame.header();
(h.blob_guid, h.root_slot, h.compact_times)
};
let mut new_buf = buf.zeroed_like();
{
let mut new_frame = BlobFrame::init(new_buf.as_mut_slice(), blob_guid)?;
let old_frame = BlobFrame::wrap(buf.as_mut_slice());
let cloned = clone_subtree(&old_frame, &mut new_frame, old_root, true)?;
let entry = match cloned {
Some(slot) => slot,
None => {
new_frame.alloc_node(NodeType::EmptyRoot)?.slot
}
};
if new_frame.header().root_slot == 1 && entry != 1 {
new_frame.free_node(1)?;
}
let h = new_frame.header_mut();
h.root_slot = entry;
h.tombstone_leaf_cnt = 0;
h.compact_times = old_compact_times.saturating_add(1);
}
buf.as_mut_slice().copy_from_slice(new_buf.as_slice());
#[cfg(feature = "tracing")]
tracing::debug!(
target: "holt::engine::compact",
blob_guid = ?&blob_guid[..4],
compact_times = old_compact_times.saturating_add(1),
"compact_blob: in-place rebuild complete",
);
Ok(())
}
pub fn is_mergeable(
bm: &BufferManager,
parent_frame: &BlobFrame<'_>,
parent_bn_slot: u16,
) -> Result<bool> {
let bn = read_blob_node(parent_frame, parent_bn_slot)?;
let child_pin = bm.pin(bn.child_blob_guid)?;
let guard = child_pin.read();
let child_frame = BlobFrameRef::wrap(guard.as_slice());
let parent_h = parent_frame.header();
let child_h = child_frame.header();
let parent_remaining = PAGE_SIZE
.saturating_sub(parent_h.space_used)
.saturating_sub(MERGE_RESERVE);
let child_data_bytes = child_h.space_used.saturating_sub(DATA_AREA_START);
let space_ok = child_data_bytes <= parent_remaining;
let combined_slots = u32::from(parent_h.num_slots) + u32::from(child_h.num_slots);
let slots_ok = combined_slots <= MAX_SLOTS;
let no_grandchild = child_h.num_ext_blobs == 0;
let no_tombstones = child_h.tombstone_leaf_cnt == 0;
Ok(space_ok && slots_ok && no_grandchild && no_tombstones)
}
pub fn merge_blob(
bm: &BufferManager,
parent_frame: &mut BlobFrame<'_>,
parent_bn_slot: u16,
seq: u64,
) -> Result<u16> {
let bn = read_blob_node(parent_frame, parent_bn_slot)?;
let child_guid = bn.child_blob_guid;
let plen = (bn.prefix_len as usize).min(BLOB_MAX_INLINE);
let prefix_bytes: Vec<u8> = bn.bytes[..plen].to_vec();
let new_subtree_root = {
let child_pin = bm.pin(child_guid)?;
let mut child_guard = child_pin.write();
let child_frame = child_guard.frame();
let child_root = child_frame.header().root_slot;
clone_subtree(&child_frame, parent_frame, child_root, false)?
.expect("preserve mode never returns None")
};
let inlined_root = if prefix_bytes.is_empty() {
new_subtree_root
} else {
write_prefix_chain(parent_frame, &prefix_bytes, new_subtree_root)?
};
parent_frame.free_node(parent_bn_slot)?;
bm.mark_for_delete(child_guid, seq);
#[cfg(feature = "tracing")]
tracing::debug!(
target: "holt::engine::merge",
child_guid = ?&child_guid[..4],
parent_bn_slot = parent_bn_slot,
inlined_root = inlined_root,
"merge_blob: folded child into parent + queued delete",
);
Ok(inlined_root)
}
fn read_blob_node(frame: &BlobFrame<'_>, slot: u16) -> Result<BlobNode> {
let body = frame.body_of_slot(slot).ok_or(Error::node_corrupt(
"read_blob_node: body resolution failed",
))?;
Ok(*cast::<BlobNode>(body))
}
fn clone_subtree(
src: &BlobFrame<'_>,
dst: &mut BlobFrame<'_>,
src_slot: u16,
filter_tombstones: bool,
) -> Result<Option<u16>> {
let entry = src
.slot_entry(src_slot)
.ok_or(Error::node_corrupt("clone_subtree: invalid src slot"))?;
let ntype = entry
.node_type()
.ok_or(Error::node_corrupt("clone_subtree: undecodable src ntype"))?;
let body = src.body_of_slot(src_slot).ok_or(Error::node_corrupt(
"clone_subtree: src body resolution failed",
))?;
match ntype {
NodeType::Invalid => Err(Error::node_corrupt(
"clone_subtree: NodeType::Invalid in source",
)),
NodeType::EmptyRoot => {
let out = dst.alloc_node(NodeType::EmptyRoot)?;
Ok(Some(out.slot))
}
NodeType::Leaf => clone_leaf(src, body, dst, filter_tombstones),
NodeType::Prefix => clone_prefix(src, body, dst, filter_tombstones),
NodeType::Node4 => clone_node4(src, body, dst, filter_tombstones),
NodeType::Node16 => clone_node16(src, body, dst, filter_tombstones),
NodeType::Node48 => clone_node48(src, body, dst, filter_tombstones),
NodeType::Node256 => clone_node256(src, body, dst, filter_tombstones),
NodeType::Blob => clone_blob_node(body, dst),
}
}
fn clone_leaf(
src: &BlobFrame<'_>,
src_body: &[u8],
dst: &mut BlobFrame<'_>,
filter_tombstones: bool,
) -> Result<Option<u16>> {
let src_leaf = *cast::<Leaf>(src_body);
if filter_tombstones && src_leaf.tombstone != 0 {
return Ok(None);
}
let hdr = src
.bytes_at(src_leaf.key_offset, 2)
.ok_or(Error::node_corrupt(
"clone_leaf: extent header out of range",
))?;
let key_len = u32::from(u16::from_le_bytes([hdr[0], hdr[1]]));
let ext_total = leaf_extent_size(key_len, u32::from(src_leaf.value_size));
let src_ext = src
.bytes_at(src_leaf.key_offset, ext_total)
.ok_or(Error::node_corrupt("clone_leaf: extent body out of range"))?
.to_vec();
let dst_ext = dst.alloc_extent(ext_total)?;
dst.bytes_at_mut(dst_ext.byte_offset, ext_total)
.ok_or(Error::node_corrupt("clone_leaf: dst extent out of range"))?
.copy_from_slice(&src_ext);
let leaf_out = dst.alloc_node(NodeType::Leaf)?;
let new_leaf = if filter_tombstones {
Leaf::live(dst_ext.byte_offset, src_leaf.value_size, src_leaf.seq)
} else {
let mut copy = src_leaf;
copy.key_offset = dst_ext.byte_offset;
copy
};
write_struct_to_slot(dst, leaf_out.slot, &new_leaf)?;
Ok(Some(leaf_out.slot))
}
fn clone_prefix(
src: &BlobFrame<'_>,
src_body: &[u8],
dst: &mut BlobFrame<'_>,
filter_tombstones: bool,
) -> Result<Option<u16>> {
let p = *cast::<Prefix>(src_body);
let plen = (p.prefix_len as usize).min(PREFIX_MAX_INLINE);
let Some(new_child) = clone_subtree(src, dst, p.child as u16, filter_tombstones)? else {
return Ok(None);
};
let out = dst.alloc_node(NodeType::Prefix)?;
let new_p = Prefix::new(&p.bytes[..plen], u32::from(new_child));
write_struct_to_slot(dst, out.slot, &new_p)?;
Ok(Some(out.slot))
}
fn clone_node4(
src: &BlobFrame<'_>,
src_body: &[u8],
dst: &mut BlobFrame<'_>,
filter_tombstones: bool,
) -> Result<Option<u16>> {
let src_n = *cast::<Node4>(src_body);
let count = (src_n.count as usize).min(4);
if filter_tombstones {
let mut survivors: Vec<(u8, u32)> = Vec::with_capacity(count);
for i in 0..count {
if let Some(new_child) = clone_subtree(src, dst, src_n.children[i] as u16, true)? {
survivors.push((src_n.keys[i], u32::from(new_child)));
}
}
pack_inner_node(dst, &survivors)
} else {
let mut new_children = [0u32; 4];
for (i, slot) in new_children.iter_mut().enumerate().take(count) {
let cloned = clone_subtree(src, dst, src_n.children[i] as u16, false)?
.expect("preserve mode never returns None");
*slot = u32::from(cloned);
}
let out = dst.alloc_node(NodeType::Node4)?;
let mut new_n = Node4::empty();
new_n.count = src_n.count;
new_n.keys = src_n.keys;
new_n.children = new_children;
write_struct_to_slot(dst, out.slot, &new_n)?;
Ok(Some(out.slot))
}
}
fn clone_node16(
src: &BlobFrame<'_>,
src_body: &[u8],
dst: &mut BlobFrame<'_>,
filter_tombstones: bool,
) -> Result<Option<u16>> {
let src_n = *cast::<Node16>(src_body);
let count = (src_n.count as usize).min(16);
if filter_tombstones {
let mut survivors: Vec<(u8, u32)> = Vec::with_capacity(count);
for i in 0..count {
if let Some(new_child) = clone_subtree(src, dst, src_n.children[i] as u16, true)? {
survivors.push((src_n.keys[i], u32::from(new_child)));
}
}
pack_inner_node(dst, &survivors)
} else {
let mut new_children = [0u32; 16];
for (i, slot) in new_children.iter_mut().enumerate().take(count) {
let cloned = clone_subtree(src, dst, src_n.children[i] as u16, false)?
.expect("preserve mode never returns None");
*slot = u32::from(cloned);
}
let out = dst.alloc_node(NodeType::Node16)?;
let mut new_n = Node16::empty();
new_n.count = src_n.count;
new_n.keys = src_n.keys;
new_n.children = new_children;
write_struct_to_slot(dst, out.slot, &new_n)?;
Ok(Some(out.slot))
}
}
fn clone_node48(
src: &BlobFrame<'_>,
src_body: &[u8],
dst: &mut BlobFrame<'_>,
filter_tombstones: bool,
) -> Result<Option<u16>> {
let src_n = *cast::<Node48>(src_body);
if filter_tombstones {
let mut survivors: Vec<(u8, u32)> = Vec::with_capacity(48);
for b in 0..256usize {
let idx = src_n.index[b];
if idx == 0 {
continue;
}
let ci = idx as usize - 1;
if ci >= 48 {
return Err(Error::node_corrupt("clone_node48: index out of range"));
}
if let Some(new_child) = clone_subtree(src, dst, src_n.children[ci] as u16, true)? {
survivors.push((b as u8, u32::from(new_child)));
}
}
pack_inner_node(dst, &survivors)
} else {
let mut new_children = [0u32; 48];
for (i, slot) in new_children.iter_mut().enumerate() {
if src_n.children[i] != 0 {
let cloned = clone_subtree(src, dst, src_n.children[i] as u16, false)?
.expect("preserve mode never returns None");
*slot = u32::from(cloned);
}
}
let out = dst.alloc_node(NodeType::Node48)?;
let mut new_n = Node48::empty();
new_n.count = src_n.count;
new_n.index = src_n.index;
new_n.children = new_children;
write_struct_to_slot(dst, out.slot, &new_n)?;
Ok(Some(out.slot))
}
}
fn clone_node256(
src: &BlobFrame<'_>,
src_body: &[u8],
dst: &mut BlobFrame<'_>,
filter_tombstones: bool,
) -> Result<Option<u16>> {
let src_n = *cast::<Node256>(src_body);
if filter_tombstones {
let mut survivors: Vec<(u8, u32)> = Vec::with_capacity(64);
for (b, &child_slot) in src_n.children.iter().enumerate() {
if child_slot == 0 {
continue;
}
if let Some(new_child) = clone_subtree(src, dst, child_slot as u16, true)? {
survivors.push((b as u8, u32::from(new_child)));
}
}
pack_inner_node(dst, &survivors)
} else {
let mut new_children = [0u32; 256];
for (i, slot) in new_children.iter_mut().enumerate() {
if src_n.children[i] != 0 {
let cloned = clone_subtree(src, dst, src_n.children[i] as u16, false)?
.expect("preserve mode never returns None");
*slot = u32::from(cloned);
}
}
let out = dst.alloc_node(NodeType::Node256)?;
let mut new_n = Node256::empty();
new_n.count = src_n.count;
new_n.children = new_children;
write_struct_to_slot(dst, out.slot, &new_n)?;
Ok(Some(out.slot))
}
}
fn clone_blob_node(src_body: &[u8], dst: &mut BlobFrame<'_>) -> Result<Option<u16>> {
let src_b = *cast::<BlobNode>(src_body);
let plen = (src_b.prefix_len as usize).min(BLOB_MAX_INLINE);
let new_b = BlobNode::new(&src_b.bytes[..plen], src_b.child_blob_guid);
let out = dst.alloc_node(NodeType::Blob)?;
write_struct_to_slot(dst, out.slot, &new_b)?;
Ok(Some(out.slot))
}
fn pack_inner_node(dst: &mut BlobFrame<'_>, survivors: &[(u8, u32)]) -> Result<Option<u16>> {
debug_assert!(
survivors.windows(2).all(|w| w[0].0 < w[1].0),
"pack_inner_node: survivors must be byte-sorted ascending"
);
match survivors.len() {
0 => Ok(None),
1 => {
let (byte, child) = survivors[0];
let slot = write_prefix_chain(dst, &[byte], child as u16)?;
Ok(Some(slot))
}
2..=4 => {
let out = dst.alloc_node(NodeType::Node4)?;
let mut n = Node4::empty();
n.count = survivors.len() as u8;
for (i, &(b, c)) in survivors.iter().enumerate() {
n.keys[i] = b;
n.children[i] = c;
}
write_struct_to_slot(dst, out.slot, &n)?;
Ok(Some(out.slot))
}
5..=16 => {
let out = dst.alloc_node(NodeType::Node16)?;
let mut n = Node16::empty();
n.count = survivors.len() as u8;
for (i, &(b, c)) in survivors.iter().enumerate() {
n.keys[i] = b;
n.children[i] = c;
}
write_struct_to_slot(dst, out.slot, &n)?;
Ok(Some(out.slot))
}
17..=48 => {
let out = dst.alloc_node(NodeType::Node48)?;
let mut n = Node48::empty();
n.count = survivors.len() as u8;
for (ci, &(b, c)) in survivors.iter().enumerate() {
n.children[ci] = c;
n.index[b as usize] = (ci as u8) + 1;
}
write_struct_to_slot(dst, out.slot, &n)?;
Ok(Some(out.slot))
}
_ => {
let out = dst.alloc_node(NodeType::Node256)?;
let mut n = Node256::empty();
n.count = survivors.len() as u8;
for &(b, c) in survivors {
n.children[b as usize] = c;
}
write_struct_to_slot(dst, out.slot, &n)?;
Ok(Some(out.slot))
}
}
}