use crate::api::errors::{Error, Result};
use crate::layout::{
leaf_body_size, size_of_node, BlobGuid, BlobNode, Node16, Node256, Node4, Node48, NodeType,
Prefix, DATA_AREA_START, PAGE_SIZE,
};
use crate::store::{decode_child_off, encode_child_off, BlobFrame, BufferManager};
use super::super::simd;
use super::cast;
use super::migrate::make_blob_from_node_in;
use super::readers::{
child_offset, ntype_of, read_leaf_key_ref, read_node16, read_node256, read_node4, read_node48,
read_prefix,
};
use super::types::{Victim, VictimEdgeKind};
use super::writers::{inner_update_child, set_prefix_child, write_struct_at};
pub(super) use super::migrate::compact_blob;
/// Percentage of the spillover data capacity used as the upper ("target") bound
/// on a migrated subtree's byte footprint.
const SPILLOVER_TARGET_CHILD_FILL_PCT: u32 = 70;
/// Percentage of the spillover data capacity used as the lower bound on a
/// migrated subtree's byte footprint; together with the target it forms the
/// preferred size band used when ranking candidates.
const SPILLOVER_MIN_CHILD_FILL_PCT: u32 = 35;
/// Aggregate size of a subtree: how many nodes it holds and how many bytes
/// those nodes account for.
#[derive(Debug, Clone, Copy)]
struct SubtreeFootprint {
/// Number of nodes in the subtree; every visited node contributes 1.
nodes: u32,
/// Sum of the per-node byte sizes (leaf sizes come from `leaf_body_size`,
/// all other node types from `size_of_node`).
bytes: u32,
}
/// One parent-to-child edge considered for migration, together with the data
/// the ranking functions need to compare it against other candidates.
#[derive(Debug, Clone, Copy)]
struct VictimCandidate {
/// The edge itself: parent offset, edge kind/byte and the child offset.
victim: Victim,
/// Node count and byte size of the subtree below the edge.
footprint: SubtreeFootprint,
/// Whether the edge sits on a `/` boundary or an arbitrary byte.
boundary: BoundaryQuality,
/// Depth recorded for the edge by the collector: parent depth plus 1 for a
/// fan-out node, or plus the prefix length for a prefix node.
boundary_depth: usize,
}
/// Classifies the byte at which a candidate edge cuts the key space.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum BoundaryQuality {
/// The edge byte (or last prefix byte) is anything other than `/`, or the
/// prefix is empty.
Arbitrary,
/// The edge byte (or last prefix byte) is `/`.
PathComponent,
}
/// Bytes of a page that lie at or after `DATA_AREA_START`, i.e. `PAGE_SIZE`
/// minus the data-area start offset.
fn spillover_data_capacity() -> u32 {
    let page_bytes = PAGE_SIZE;
    let data_start = DATA_AREA_START;
    page_bytes - data_start
}
/// Upper bound of the preferred victim size band:
/// `SPILLOVER_TARGET_CHILD_FILL_PCT` percent of the data capacity.
fn spillover_target_child_bytes() -> u32 {
    // Multiply before dividing so integer division does not lose precision.
    let scaled = spillover_data_capacity() * SPILLOVER_TARGET_CHILD_FILL_PCT;
    scaled / 100
}
/// Lower bound of the preferred victim size band:
/// `SPILLOVER_MIN_CHILD_FILL_PCT` percent of the data capacity.
fn spillover_min_child_bytes() -> u32 {
    // Multiply before dividing so integer division does not lose precision.
    let scaled = spillover_data_capacity() * SPILLOVER_MIN_CHILD_FILL_PCT;
    scaled / 100
}
pub(super) fn spillover_blob(
bm: &BufferManager,
frame: &mut BlobFrame<'_>,
seq: u64,
) -> Result<u32> {
let root_off = decode_child_off(frame.header().root_slot);
let victim = pick_victim_subtree(frame, root_off)?;
let new_guid = fresh_blob_guid();
let outcome = make_blob_from_node_in(bm, frame, victim.victim_off, new_guid)?;
bm.install_new_blob(new_guid, outcome.buf, seq);
abandon_subtree(frame, victim.victim_off)?;
let bn_alloc = frame.alloc_node(NodeType::Blob)?;
let bn_off = frame
.offset_of_slot(bn_alloc.slot)
.ok_or(Error::node_corrupt("spillover: BlobNode offset"))?;
let bn = BlobNode::new(&[], new_guid);
write_struct_at(frame, bn_off, &bn)?;
if victim.parent_off == root_off && victim.via_header_root {
frame.header_mut().root_slot = encode_child_off(bn_off);
} else {
match victim.kind {
VictimEdgeKind::Prefix => {
set_prefix_child(frame, victim.parent_off, bn_off)?;
}
VictimEdgeKind::Inner(parent_ntype) => {
inner_update_child(frame, victim.parent_off, parent_ntype, victim.byte, bn_off)?;
}
}
}
#[cfg(feature = "tracing")]
tracing::debug!(
target: "holt::engine::spillover",
new_child_guid = ?&new_guid[..4],
victim_off = victim.victim_off,
bn_off = bn_off,
"spillover: migrated subtree to fresh child blob",
);
Ok(bn_off)
}
/// Cache of already-computed subtree footprints, keyed by the `u32` node
/// offset the footprint was computed for.
type FootprintMemo = std::collections::HashMap<u32, SubtreeFootprint>;
/// Creates an empty footprint cache whose initial capacity is the frame
/// header's `num_slots` plus one.
fn footprint_memo(frame: &BlobFrame<'_>) -> FootprintMemo {
    let slot_count = frame.header().num_slots as usize;
    FootprintMemo::with_capacity(slot_count + 1)
}
/// Test-only wrapper around [`subtree_footprint_memo`] that supplies a
/// throwaway cache.
#[cfg(test)]
fn subtree_footprint(frame: &BlobFrame<'_>, root: u32) -> Result<SubtreeFootprint> {
    subtree_footprint_memo(frame, root, &mut footprint_memo(frame))
}
fn subtree_footprint_memo(
frame: &BlobFrame<'_>,
root: u32,
memo: &mut FootprintMemo,
) -> Result<SubtreeFootprint> {
if let Some(cached) = memo.get(&root).copied() {
return Ok(cached);
}
let ntype = ntype_of(frame.as_ref(), root)?;
if ntype == NodeType::Invalid {
return Err(Error::node_corrupt("subtree_footprint: Invalid"));
}
let body = frame.body_at_offset(root).ok_or(Error::node_corrupt(
"subtree_footprint: body resolution failed",
))?;
let mut out = SubtreeFootprint {
nodes: 1,
bytes: size_of_node(ntype),
};
match ntype {
NodeType::Invalid => unreachable!("handled before size_of_node"),
NodeType::Leaf => {
let (key, leaf) = read_leaf_key_ref(frame.as_ref(), root)?;
out.bytes = leaf_body_size(key.len() as u32, u32::from(leaf.value_len));
}
NodeType::EmptyRoot | NodeType::Blob => {}
NodeType::Prefix => {
let p = cast::<Prefix>(body);
out = out.saturating_add(subtree_footprint_memo(
frame,
child_offset(p.child as u16),
memo,
)?);
}
NodeType::Node4 => {
let n = cast::<Node4>(body);
for i in 0..(n.count as usize).min(4) {
out = out.saturating_add(subtree_footprint_memo(
frame,
child_offset(n.children[i]),
memo,
)?);
}
}
NodeType::Node16 => {
let n = cast::<Node16>(body);
for i in 0..(n.count as usize).min(16) {
out = out.saturating_add(subtree_footprint_memo(
frame,
child_offset(n.children[i]),
memo,
)?);
}
}
NodeType::Node48 => {
let n = cast::<Node48>(body);
let mut i = 0usize;
while let Some(next_i) = simd::find_next_nonzero_u16(&n.children, i) {
i = next_i + 1;
out = out.saturating_add(subtree_footprint_memo(
frame,
child_offset(n.children[next_i]),
memo,
)?);
}
}
NodeType::Node256 => {
let n = cast::<Node256>(body);
let mut i = 0usize;
while let Some(next_i) = simd::find_next_nonzero_u16(&n.children, i) {
i = next_i + 1;
out = out.saturating_add(subtree_footprint_memo(
frame,
child_offset(n.children[next_i]),
memo,
)?);
}
}
}
memo.insert(root, out);
Ok(out)
}
impl SubtreeFootprint {
    /// Field-wise sum of two footprints; each field clamps at `u32::MAX`
    /// instead of overflowing.
    fn saturating_add(self, rhs: Self) -> Self {
        Self {
            nodes: self.nodes.saturating_add(rhs.nodes),
            bytes: self.bytes.saturating_add(rhs.bytes),
        }
    }
}
#[allow(clippy::too_many_lines)] fn pick_victim_subtree(frame: &BlobFrame<'_>, start_off: u32) -> Result<Victim> {
let mut best: Option<VictimCandidate> = None;
let mut memo = footprint_memo(frame);
collect_victim_candidates(frame, start_off, 0, &mut best, &mut memo)?;
best.map(|candidate| candidate.victim)
.ok_or(Error::NotYetImplemented(
"spillover: no non-Blob subtree to migrate",
))
}
#[allow(clippy::too_many_lines)] fn collect_victim_candidates(
frame: &BlobFrame<'_>,
current: u32,
depth: usize,
best: &mut Option<VictimCandidate>,
memo: &mut FootprintMemo,
) -> Result<()> {
let ntype = ntype_of(frame.as_ref(), current)?;
match ntype {
NodeType::Node4 => {
let n = read_node4(frame.as_ref(), current)?;
for i in 0..(n.count as usize).min(4) {
let child_depth = depth + 1;
visit_child_edge(
frame,
Victim {
parent_off: current,
kind: VictimEdgeKind::Inner(NodeType::Node4),
byte: n.keys[i],
victim_off: child_offset(n.children[i]),
via_header_root: false,
},
boundary_quality_for_byte(n.keys[i]),
child_depth,
best,
memo,
)?;
}
}
NodeType::Node16 => {
let n = read_node16(frame.as_ref(), current)?;
for i in 0..(n.count as usize).min(16) {
let child_depth = depth + 1;
visit_child_edge(
frame,
Victim {
parent_off: current,
kind: VictimEdgeKind::Inner(NodeType::Node16),
byte: n.keys[i],
victim_off: child_offset(n.children[i]),
via_header_root: false,
},
boundary_quality_for_byte(n.keys[i]),
child_depth,
best,
memo,
)?;
}
}
NodeType::Node48 => {
let n = read_node48(frame.as_ref(), current)?;
let mut b = 0usize;
while let Some(next_b) = simd::find_next_nonzero_byte(&n.index, b) {
b = next_b + 1;
let idx = n.index[next_b];
let ci = idx as usize - 1;
if ci >= 48 {
return Err(Error::node_corrupt(
"collect_victim_candidates: Node48 index out of range",
));
}
let child_depth = depth + 1;
visit_child_edge(
frame,
Victim {
parent_off: current,
kind: VictimEdgeKind::Inner(NodeType::Node48),
byte: next_b as u8,
victim_off: child_offset(n.children[ci]),
via_header_root: false,
},
boundary_quality_for_byte(next_b as u8),
child_depth,
best,
memo,
)?;
}
}
NodeType::Node256 => {
let n = read_node256(frame.as_ref(), current)?;
let mut b = 0usize;
while let Some(next_b) = simd::find_next_nonzero_u16(&n.children, b) {
b = next_b + 1;
let child = n.children[next_b];
let child_depth = depth + 1;
visit_child_edge(
frame,
Victim {
parent_off: current,
kind: VictimEdgeKind::Inner(NodeType::Node256),
byte: next_b as u8,
victim_off: child_offset(child),
via_header_root: false,
},
boundary_quality_for_byte(next_b as u8),
child_depth,
best,
memo,
)?;
}
}
NodeType::Prefix => {
let p = read_prefix(frame.as_ref(), current)?;
let plen = p.prefix_len as usize;
let prefix = &p.bytes[..plen.min(p.bytes.len())];
let child_depth = depth + plen;
visit_child_edge(
frame,
Victim {
parent_off: current,
kind: VictimEdgeKind::Prefix,
byte: 0,
victim_off: child_offset(p.child as u16),
via_header_root: false,
},
boundary_quality_for_prefix(prefix),
child_depth,
best,
memo,
)?;
}
NodeType::Leaf | NodeType::EmptyRoot | NodeType::Blob => {}
NodeType::Invalid => {
return Err(Error::node_corrupt("collect_victim_candidates: Invalid"));
}
}
Ok(())
}
/// Evaluates one parent-to-child edge as a migration candidate.
///
/// Edges that lead to a `Blob` node are ignored. Otherwise the child's
/// footprint is measured, the edge replaces `best` when it ranks higher, and
/// the search continues below the child only while its footprint exceeds the
/// target size.
///
/// # Errors
///
/// Returns a corrupt-node error when the child is `Invalid`, and propagates
/// errors from measuring or descending into the child.
fn visit_child_edge(
    frame: &BlobFrame<'_>,
    victim: Victim,
    boundary: BoundaryQuality,
    boundary_depth: usize,
    best: &mut Option<VictimCandidate>,
    memo: &mut FootprintMemo,
) -> Result<()> {
    match ntype_of(frame.as_ref(), victim.victim_off)? {
        NodeType::Invalid => {
            return Err(Error::node_corrupt("visit_child_edge: Invalid child"));
        }
        NodeType::Blob => return Ok(()),
        _ => {}
    }

    let footprint = subtree_footprint_memo(frame, victim.victim_off, memo)?;
    let candidate = VictimCandidate {
        victim,
        footprint,
        boundary,
        boundary_depth,
    };

    let takes_lead = match *best {
        None => true,
        Some(incumbent) => candidate_is_better(candidate, incumbent),
    };
    if takes_lead {
        *best = Some(candidate);
    }

    // A child that already fits under the target cannot hide a better-sized
    // overfull branch, so stop here.
    if footprint.bytes <= spillover_target_child_bytes() {
        return Ok(());
    }
    collect_victim_candidates(frame, victim.victim_off, boundary_depth, best, memo)
}
/// Classifies an edge byte: `/` marks a path-component boundary, every other
/// byte is an arbitrary split point.
fn boundary_quality_for_byte(byte: u8) -> BoundaryQuality {
    match byte {
        b'/' => BoundaryQuality::PathComponent,
        _ => BoundaryQuality::Arbitrary,
    }
}
fn boundary_quality_for_prefix(prefix: &[u8]) -> BoundaryQuality {
prefix
.last()
.copied()
.map_or(BoundaryQuality::Arbitrary, boundary_quality_for_byte)
}
/// Returns `true` when `candidate` should replace `current` as the preferred
/// victim.
///
/// Ranking, in order:
/// 1. a footprint inside the `[min, target]` band beats one outside it;
/// 2. two in-band candidates are compared by boundary quality first, then by
///    distance to the target;
/// 3. two candidates below the band prefer the larger footprint;
/// 4. two candidates above the band prefer the smaller footprint;
/// 5. one below and one above are compared by distance to the target.
///
/// Remaining ties are resolved by [`candidate_tie`].
fn candidate_is_better(candidate: VictimCandidate, current: VictimCandidate) -> bool {
    let target = spillover_target_child_bytes();
    let floor = spillover_min_child_bytes();
    let cand_bytes = candidate.footprint.bytes;
    let curr_bytes = current.footprint.bytes;
    let in_band = |bytes: u32| (floor..=target).contains(&bytes);

    match (in_band(cand_bytes), in_band(curr_bytes)) {
        (true, false) => return true,
        (false, true) => return false,
        (true, true) => {
            return candidate_tie_in_band(
                candidate,
                current,
                cand_bytes.abs_diff(target),
                curr_bytes.abs_diff(target),
            );
        }
        (false, false) => {}
    }

    if cand_bytes < floor && curr_bytes < floor {
        // Lower score wins, so swap the sizes to make the larger one better.
        return candidate_tie(candidate, current, curr_bytes, cand_bytes);
    }
    if cand_bytes > target && curr_bytes > target {
        return candidate_tie(candidate, current, cand_bytes, curr_bytes);
    }
    candidate_tie(
        candidate,
        current,
        cand_bytes.abs_diff(target),
        curr_bytes.abs_diff(target),
    )
}
/// Comparison used when both candidates are inside the size band: a
/// path-component boundary wins outright, and only equal boundaries fall
/// through to the score-based [`candidate_tie`].
fn candidate_tie_in_band(
    candidate: VictimCandidate,
    current: VictimCandidate,
    candidate_score: u32,
    current_score: u32,
) -> bool {
    if candidate.boundary == current.boundary {
        return candidate_tie(candidate, current, candidate_score, current_score);
    }
    matches!(candidate.boundary, BoundaryQuality::PathComponent)
}
/// Generic comparison chain; returns `true` when `candidate` wins.
///
/// Order of precedence: lower score, then a path-component boundary, then the
/// smaller `boundary_depth`, then the larger node count. A complete tie
/// returns `false`, so the incumbent is kept.
fn candidate_tie(
    candidate: VictimCandidate,
    current: VictimCandidate,
    candidate_score: u32,
    current_score: u32,
) -> bool {
    use std::cmp::Ordering;

    match candidate_score.cmp(&current_score) {
        Ordering::Less => return true,
        Ordering::Greater => return false,
        Ordering::Equal => {}
    }
    if candidate.boundary != current.boundary {
        return matches!(candidate.boundary, BoundaryQuality::PathComponent);
    }
    match candidate.boundary_depth.cmp(&current.boundary_depth) {
        Ordering::Less => true,
        Ordering::Greater => false,
        Ordering::Equal => candidate.footprint.nodes > current.footprint.nodes,
    }
}
pub(super) fn abandon_subtree(frame: &mut BlobFrame<'_>, root: u32) -> Result<()> {
let ntype = ntype_of(frame.as_ref(), root)?;
let body_copy = frame
.body_at_offset(root)
.ok_or(Error::node_corrupt(
"abandon_subtree: body resolution failed",
))?
.to_vec();
match ntype {
NodeType::Invalid => {
return Err(Error::node_corrupt("abandon_subtree: Invalid in source"));
}
NodeType::Leaf | NodeType::EmptyRoot | NodeType::Blob => {}
NodeType::Prefix => {
let p = cast::<Prefix>(&body_copy);
abandon_subtree(frame, child_offset(p.child as u16))?;
}
NodeType::Node4 => {
let n = cast::<Node4>(&body_copy);
for i in 0..(n.count as usize).min(4) {
abandon_subtree(frame, child_offset(n.children[i]))?;
}
}
NodeType::Node16 => {
let n = cast::<Node16>(&body_copy);
for i in 0..(n.count as usize).min(16) {
abandon_subtree(frame, child_offset(n.children[i]))?;
}
}
NodeType::Node48 => {
let n = cast::<Node48>(&body_copy);
let mut i = 0usize;
while let Some(next_i) = simd::find_next_nonzero_u16(&n.children, i) {
i = next_i + 1;
abandon_subtree(frame, child_offset(n.children[next_i]))?;
}
}
NodeType::Node256 => {
let n = cast::<Node256>(&body_copy);
let mut i = 0usize;
while let Some(next_i) = simd::find_next_nonzero_u16(&n.children, i) {
i = next_i + 1;
abandon_subtree(frame, child_offset(n.children[next_i]))?;
}
}
}
frame.note_abandoned(root);
Ok(())
}
/// Builds a new 16-byte blob GUID.
///
/// Layout (big-endian where multi-byte):
/// - bytes 0..8: nanoseconds since the Unix epoch, truncated to `u64`
///   (0 if the clock reads earlier than the epoch);
/// - bytes 8..12: low 32 bits of a process-wide counter that starts at 1;
/// - bytes 12..15: three bytes of OS entropy, or, if that is unavailable,
///   three bytes mixed from the timestamp and the counter;
/// - byte 15: the constant `0xD4`.
pub(crate) fn fresh_blob_guid() -> BlobGuid {
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    static COUNTER: AtomicU64 = AtomicU64::new(1);

    let nanos = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_nanos() as u64,
        Err(_) => 0,
    };
    let seq = COUNTER.fetch_add(1, Ordering::Relaxed) as u32;

    let mut entropy = [0u8; 3];
    if !fill_os_entropy(&mut entropy) {
        // No OS entropy: derive the tail from a multiplicative mix of the
        // timestamp xor'ed with the counter.
        let mixed = nanos.wrapping_mul(0x9E37_79B9_7F4A_7C15) ^ u64::from(seq);
        entropy = [(mixed >> 16) as u8, (mixed >> 24) as u8, (mixed >> 32) as u8];
    }

    let mut guid = [0u8; 16];
    guid[..8].copy_from_slice(&nanos.to_be_bytes());
    guid[8..12].copy_from_slice(&seq.to_be_bytes());
    guid[12..15].copy_from_slice(&entropy);
    guid[15] = 0xD4;
    guid
}
/// Tries to fill `buf` with bytes from the operating system's entropy source.
///
/// Returns `true` only when the platform call reported success for the whole
/// buffer. On targets without a backend below, `buf` is not written and the
/// function returns `false`, so callers must be prepared to fall back.
///
/// Exactly one of the three `cfg`-gated blocks is compiled for any target and
/// becomes the function's tail expression.
fn fill_os_entropy(buf: &mut [u8]) -> bool {
// Linux: `getrandom` with flags = 0; success means the full length was read.
// SAFETY: `buf` is a live, exclusively borrowed slice, so the pointer/length
// pair handed to the call describes writable memory of exactly `buf.len()` bytes.
#[cfg(target_os = "linux")]
unsafe {
let r = libc::getrandom(buf.as_mut_ptr().cast(), buf.len(), 0);
r >= 0 && (r as usize) == buf.len()
}
// Apple and BSD targets: `getentropy`, which returns 0 on success.
// SAFETY: same pointer/length argument as above.
// NOTE(review): `getentropy` is documented to reject large requests; the
// caller visible in this file passes 3 bytes — confirm for any new caller.
#[cfg(any(
target_os = "macos",
target_os = "ios",
target_os = "freebsd",
target_os = "openbsd",
target_os = "netbsd"
))]
unsafe {
libc::getentropy(buf.as_mut_ptr().cast(), buf.len()) == 0
}
// Every other target: no entropy backend, report failure.
#[cfg(not(any(
target_os = "linux",
target_os = "macos",
target_os = "ios",
target_os = "freebsd",
target_os = "openbsd",
target_os = "netbsd"
)))]
{
// Silence the unused-parameter warning on these targets.
let _ = buf;
false
}
}
// Unit tests for the victim-ranking functions plus one end-to-end check of
// victim selection on a populated frame.
#[cfg(test)]
mod tests {
use super::super::insert::insert;
use super::*;
use crate::layout::PAGE_SIZE;
use crate::store::BlobFrame;
// Candidate on an arbitrary boundary at a fixed depth of 16.
fn candidate(bytes: u32, nodes: u32) -> VictimCandidate {
candidate_with_boundary(bytes, nodes, BoundaryQuality::Arbitrary, 16)
}
// Candidate on a `/` boundary at a fixed depth of 16.
fn path_candidate(bytes: u32, nodes: u32) -> VictimCandidate {
candidate_with_boundary(bytes, nodes, BoundaryQuality::PathComponent, 16)
}
// Builds a candidate with a placeholder edge; only the footprint, boundary
// and depth matter to the ranking functions under test.
fn candidate_with_boundary(
bytes: u32,
nodes: u32,
boundary: BoundaryQuality,
boundary_depth: usize,
) -> VictimCandidate {
VictimCandidate {
victim: Victim {
parent_off: 0,
kind: VictimEdgeKind::Prefix,
byte: 0,
victim_off: 0,
via_header_root: false,
},
footprint: SubtreeFootprint { nodes, bytes },
boundary,
boundary_depth,
}
}
// A candidate just under the target must beat a much larger one above the
// target, regardless of node count, and the reverse comparison must lose.
#[test]
fn spillover_scoring_prefers_target_band_over_largest() {
let target = spillover_target_child_bytes();
assert!(candidate_is_better(
candidate(target - 1024, 10),
candidate(target + 80_000, 100),
));
assert!(!candidate_is_better(
candidate(target + 80_000, 100),
candidate(target - 1024, 10),
));
}
// Outside the band: below the minimum the larger footprint wins, above the
// target the smaller footprint wins.
#[test]
fn spillover_scoring_avoids_tiny_and_overfull_children() {
let min = spillover_min_child_bytes();
let target = spillover_target_child_bytes();
assert!(candidate_is_better(
candidate(min - 1024, 20),
candidate(min / 2, 100),
));
assert!(candidate_is_better(
candidate(target + 1024, 20),
candidate(target + 90_000, 100),
));
}
// With identical byte sizes, boundaries and depths, more nodes wins.
#[test]
fn spillover_scoring_uses_node_count_only_as_tie_breaker() {
let target = spillover_target_child_bytes();
assert!(candidate_is_better(
candidate(target - 4096, 20),
candidate(target - 4096, 10),
));
}
// Inside the band a `/` boundary outranks being closer to the target.
#[test]
fn spillover_scoring_prefers_path_boundary_within_target_band() {
let target = spillover_target_child_bytes();
assert!(candidate_is_better(
path_candidate(target - 32_000, 10),
candidate(target - 1024, 100),
));
}
// Band membership is checked before boundary quality: an in-band arbitrary
// boundary beats a below-band `/` boundary.
#[test]
fn spillover_scoring_keeps_fill_band_before_path_boundary() {
let min = spillover_min_child_bytes();
let target = spillover_target_child_bytes();
assert!(candidate_is_better(
candidate(target - 1024, 10),
path_candidate(min - 1024, 100),
));
}
// Inserts one key/value pair through the frame's current root slot.
fn put(frame: &mut BlobFrame<'_>, key: &[u8], value: &[u8], seq: u64) {
let root = frame.header().root_slot;
insert(frame, root, key, value, seq).unwrap();
}
// Populates `a/x/` with 240 and `a/y/` with 120 one-KiB values plus a tiny
// `b/` entry, then expects the search to pick the edge labelled `x`, whose
// subtree must fall inside the preferred size band.
#[test]
fn victim_search_descends_into_overfull_path_branch() {
let mut buf = vec![0u8; PAGE_SIZE as usize];
BlobFrame::init(&mut buf, [0x31; 16]).unwrap();
let mut frame = BlobFrame::wrap(&mut buf);
let value = vec![0x5A; 1024];
let mut seq = 1u64;
for i in 0..240u32 {
let key = format!("a/x/file-{i:06}").into_bytes();
put(&mut frame, &key, &value, seq);
seq += 1;
}
for i in 0..120u32 {
let key = format!("a/y/file-{i:06}").into_bytes();
put(&mut frame, &key, &value, seq);
seq += 1;
}
put(&mut frame, b"b/tiny", b"v", seq);
let victim =
pick_victim_subtree(&frame, decode_child_off(frame.header().root_slot)).unwrap();
let footprint = subtree_footprint(&frame, victim.victim_off).unwrap();
assert!(
footprint.bytes >= spillover_min_child_bytes(),
"victim too small: {footprint:?}",
);
assert!(
footprint.bytes <= spillover_target_child_bytes(),
"victim should be a nested in-band branch, not the overfull direct branch: {footprint:?}",
);
assert_eq!(victim.byte, b'x');
}
}