use crate::api::errors::{Error, Result};
use crate::layout::{BlobGuid, BlobNode, Node16, Node256, Node4, Node48, NodeType, Prefix};
use crate::store::{BlobFrame, BufferManager};
use super::cast;
use super::migrate::make_blob_from_node;
use super::readers::{ntype_of, read_node16, read_node256, read_node4, read_node48, read_prefix};
use super::types::{Victim, VictimEdgeKind};
use super::writers::{inner_update_child, set_prefix_child, write_struct_to_slot};
pub(super) use super::migrate::compact_blob;
/// Moves one subtree of `frame` into a brand-new child blob and leaves a
/// `Blob` node in its place.
///
/// Steps, in order:
/// 1. pick a victim subtree, starting the search at the header's root slot;
/// 2. build a new blob from it via `make_blob_from_node` under a freshly
///    generated GUID and hand the resulting buffer to `bm` together with `seq`;
/// 3. free the victim's slots in `frame`;
/// 4. allocate a `Blob` node that records the new GUID and entry slot, then
///    re-point whatever referenced the victim (header root, prefix, or inner
///    node edge) at that node.
///
/// Returns the slot of the newly written `Blob` node.
///
/// # Errors
/// Any failing step returns immediately through `?`; nothing in this function
/// undoes the steps that already ran.
pub(super) fn spillover_blob(
    bm: &BufferManager,
    frame: &mut BlobFrame<'_>,
    seq: u64,
) -> Result<u16> {
    let header_root = frame.header().root_slot;
    let pick = pick_victim_subtree(frame, header_root)?;

    // Materialise the victim as its own blob and register it.
    let child_guid = fresh_blob_guid();
    let migrated = make_blob_from_node(frame, pick.victim_slot, child_guid)?;
    let child_entry = u32::from(migrated.entry_slot);
    bm.install_new_blob(child_guid, migrated.buf, seq);

    // The victim's slots are released before the replacement node is allocated.
    free_subtree(frame, pick.victim_slot)?;
    let stub_slot = frame.alloc_node(NodeType::Blob)?.slot;
    let stub = BlobNode::new(&[], child_guid, child_entry);
    write_struct_to_slot(frame, stub_slot, &stub)?;

    // Splice the new node in where the victim used to hang.
    let link = u32::from(stub_slot);
    let replaces_header_root = pick.via_header_root && pick.parent_slot == header_root;
    match pick.kind {
        _ if replaces_header_root => {
            frame.header_mut().root_slot = stub_slot;
        }
        VictimEdgeKind::Prefix => {
            set_prefix_child(frame, pick.parent_slot, link)?;
        }
        VictimEdgeKind::Inner(parent_kind) => {
            inner_update_child(frame, pick.parent_slot, parent_kind, pick.byte, link)?;
        }
    }

    #[cfg(feature = "tracing")]
    tracing::debug!(
        target: "holt::engine::spillover",
        new_child_guid = ?&child_guid[..4],
        victim_slot = pick.victim_slot,
        bn_slot = stub_slot,
        "spillover: migrated subtree to fresh child blob",
    );

    Ok(stub_slot)
}
pub(super) fn count_subtree_nodes(frame: &BlobFrame<'_>, root: u16) -> Result<u32> {
let ntype = ntype_of(frame.as_ref(), root)?;
let body = frame.body_of_slot(root).ok_or(Error::node_corrupt(
"count_subtree_nodes: body resolution failed",
))?;
let mut count: u32 = 1;
match ntype {
NodeType::Invalid => {
return Err(Error::node_corrupt("count_subtree_nodes: Invalid"));
}
NodeType::Leaf | NodeType::EmptyRoot | NodeType::Blob => {}
NodeType::Prefix => {
let p = cast::<Prefix>(body);
count = count.saturating_add(count_subtree_nodes(frame, p.child as u16)?);
}
NodeType::Node4 => {
let n = cast::<Node4>(body);
for i in 0..(n.count as usize).min(4) {
count = count.saturating_add(count_subtree_nodes(frame, n.children[i] as u16)?);
}
}
NodeType::Node16 => {
let n = cast::<Node16>(body);
for i in 0..(n.count as usize).min(16) {
count = count.saturating_add(count_subtree_nodes(frame, n.children[i] as u16)?);
}
}
NodeType::Node48 => {
let n = cast::<Node48>(body);
for c in &n.children {
if *c != 0 {
count = count.saturating_add(count_subtree_nodes(frame, *c as u16)?);
}
}
}
NodeType::Node256 => {
let n = cast::<Node256>(body);
for c in &n.children {
if *c != 0 {
count = count.saturating_add(count_subtree_nodes(frame, *c as u16)?);
}
}
}
}
Ok(count)
}
#[allow(clippy::too_many_lines)] fn pick_victim_subtree(frame: &BlobFrame<'_>, start_slot: u16) -> Result<Victim> {
let mut current = start_slot;
loop {
let ntype = ntype_of(frame.as_ref(), current)?;
match ntype {
NodeType::Node4 => {
let n = read_node4(frame.as_ref(), current)?;
return pick_largest_non_blob(
frame,
current,
NodeType::Node4,
(n.count as usize).min(4),
&n.keys[..],
&n.children[..],
false,
);
}
NodeType::Node16 => {
let n = read_node16(frame.as_ref(), current)?;
return pick_largest_non_blob(
frame,
current,
NodeType::Node16,
(n.count as usize).min(16),
&n.keys[..],
&n.children[..],
false,
);
}
NodeType::Node48 => {
let n = read_node48(frame.as_ref(), current)?;
let mut best: Option<Victim> = None;
let mut best_size: u32 = 0;
for b in 0..256usize {
let idx = n.index[b];
if idx == 0 {
continue;
}
let child_slot = n.children[idx as usize - 1] as u16;
if ntype_of(frame.as_ref(), child_slot)? == NodeType::Blob {
continue;
}
let size = count_subtree_nodes(frame, child_slot)?;
if size > best_size {
best_size = size;
best = Some(Victim {
parent_slot: current,
kind: VictimEdgeKind::Inner(NodeType::Node48),
byte: b as u8,
victim_slot: child_slot,
via_header_root: false,
});
}
}
return best.ok_or(Error::NotYetImplemented(
"spillover: no non-Blob children to migrate (Node48)",
));
}
NodeType::Node256 => {
let n = read_node256(frame.as_ref(), current)?;
let mut best: Option<Victim> = None;
let mut best_size: u32 = 0;
for (i, c) in n.children.iter().enumerate() {
if *c == 0 {
continue;
}
let child_slot = *c as u16;
if ntype_of(frame.as_ref(), child_slot)? == NodeType::Blob {
continue;
}
let size = count_subtree_nodes(frame, child_slot)?;
if size > best_size {
best_size = size;
best = Some(Victim {
parent_slot: current,
kind: VictimEdgeKind::Inner(NodeType::Node256),
byte: i as u8,
victim_slot: child_slot,
via_header_root: false,
});
}
}
return best.ok_or(Error::NotYetImplemented(
"spillover: no non-Blob children to migrate (Node256)",
));
}
NodeType::Prefix => {
let p = read_prefix(frame.as_ref(), current)?;
let child_slot = p.child as u16;
let child_ntype = ntype_of(frame.as_ref(), child_slot)?;
match child_ntype {
NodeType::Node4
| NodeType::Node16
| NodeType::Node48
| NodeType::Node256
| NodeType::Prefix => {
current = child_slot;
}
NodeType::Leaf | NodeType::EmptyRoot | NodeType::Blob => {
return Ok(Victim {
parent_slot: current,
kind: VictimEdgeKind::Prefix,
byte: 0,
victim_slot: child_slot,
via_header_root: false,
});
}
NodeType::Invalid => {
return Err(Error::node_corrupt(
"pick_victim_subtree: Prefix child Invalid",
));
}
}
}
NodeType::Leaf | NodeType::EmptyRoot | NodeType::Blob => {
return Err(Error::NotYetImplemented(
"spillover: tree too degenerate to migrate (root is Leaf/Empty/Blob)",
));
}
NodeType::Invalid => {
return Err(Error::node_corrupt("pick_victim_subtree: Invalid"));
}
}
}
}
fn pick_largest_non_blob(
frame: &BlobFrame<'_>,
parent_slot: u16,
parent_ntype: NodeType,
count: usize,
keys: &[u8],
children: &[u32],
via_header_root: bool,
) -> Result<Victim> {
let mut best: Option<Victim> = None;
let mut best_size: u32 = 0;
for i in 0..count {
let child_slot = children[i] as u16;
if ntype_of(frame.as_ref(), child_slot)? == NodeType::Blob {
continue;
}
let size = count_subtree_nodes(frame, child_slot)?;
if size > best_size {
best_size = size;
best = Some(Victim {
parent_slot,
kind: VictimEdgeKind::Inner(parent_ntype),
byte: keys[i],
victim_slot: child_slot,
via_header_root,
});
}
}
best.ok_or(Error::NotYetImplemented(
"spillover: no non-Blob children to migrate",
))
}
pub(super) fn free_subtree(frame: &mut BlobFrame<'_>, root: u16) -> Result<()> {
let ntype = ntype_of(frame.as_ref(), root)?;
let body_copy = frame
.body_of_slot(root)
.ok_or(Error::node_corrupt("free_subtree: body resolution failed"))?
.to_vec();
match ntype {
NodeType::Invalid => {
return Err(Error::node_corrupt("free_subtree: Invalid in source"));
}
NodeType::Leaf | NodeType::EmptyRoot | NodeType::Blob => {}
NodeType::Prefix => {
let p = cast::<Prefix>(&body_copy);
free_subtree(frame, p.child as u16)?;
}
NodeType::Node4 => {
let n = cast::<Node4>(&body_copy);
for i in 0..(n.count as usize).min(4) {
free_subtree(frame, n.children[i] as u16)?;
}
}
NodeType::Node16 => {
let n = cast::<Node16>(&body_copy);
for i in 0..(n.count as usize).min(16) {
free_subtree(frame, n.children[i] as u16)?;
}
}
NodeType::Node48 => {
let n = cast::<Node48>(&body_copy);
for c in &n.children {
if *c != 0 {
free_subtree(frame, *c as u16)?;
}
}
}
NodeType::Node256 => {
let n = cast::<Node256>(&body_copy);
for c in &n.children {
if *c != 0 {
free_subtree(frame, *c as u16)?;
}
}
}
}
frame.free_node(root)?;
Ok(())
}
/// Builds a new 16-byte blob GUID.
///
/// Layout of the returned bytes:
/// * `0..8`   — nanoseconds since the UNIX epoch, truncated to 64 bits,
///   big-endian (0 when the system clock reads earlier than the epoch);
/// * `8..12`  — low 32 bits of a process-wide counter that starts at 1,
///   big-endian;
/// * `12..15` — three bytes from `fill_os_entropy`, or, when that reports
///   failure, three bytes derived from the timestamp and counter;
/// * `15`     — the constant `0xD4`.
pub(super) fn fresh_blob_guid() -> BlobGuid {
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    static COUNTER: AtomicU64 = AtomicU64::new(1);

    let stamp: u64 = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_nanos() as u64,
        Err(_) => 0,
    };
    let ticket = COUNTER.fetch_add(1, Ordering::Relaxed) as u32;

    let mut salt = [0u8; 3];
    if !fill_os_entropy(&mut salt) {
        // No OS entropy: mix the timestamp with the counter and take three
        // bytes from the middle of the product.
        let mixed = stamp.wrapping_mul(0x9E37_79B9_7F4A_7C15) ^ u64::from(ticket);
        salt = [(mixed >> 16) as u8, (mixed >> 24) as u8, (mixed >> 32) as u8];
    }

    let mut guid = [0u8; 16];
    guid[..8].copy_from_slice(&stamp.to_be_bytes());
    guid[8..12].copy_from_slice(&ticket.to_be_bytes());
    guid[12..15].copy_from_slice(&salt);
    guid[15] = 0xD4;
    guid
}
/// Tries to fill `buf` with random bytes from the operating system.
///
/// Returns `true` only when the OS call reports that the whole buffer was
/// filled. On Linux this calls `getrandom` with flags `0`; on macOS, iOS and
/// the listed BSDs it calls `getentropy`; on every other target it does
/// nothing and returns `false`.
fn fill_os_entropy(buf: &mut [u8]) -> bool {
    #[cfg(target_os = "linux")]
    {
        let wanted = buf.len();
        // SAFETY: the pointer and length describe the caller's exclusively
        // borrowed slice, so the call writes at most `wanted` bytes into
        // memory that is valid for writes.
        let got = unsafe { libc::getrandom(buf.as_mut_ptr().cast(), wanted, 0) };
        // A negative result is an error; a short count is treated as failure too.
        return got >= 0 && (got as usize) == wanted;
    }
    #[cfg(any(
        target_os = "macos",
        target_os = "ios",
        target_os = "freebsd",
        target_os = "openbsd",
        target_os = "netbsd"
    ))]
    {
        let wanted = buf.len();
        // SAFETY: the pointer and length describe the caller's exclusively
        // borrowed slice, so the call writes at most `wanted` bytes into
        // memory that is valid for writes.
        let status = unsafe { libc::getentropy(buf.as_mut_ptr().cast(), wanted) };
        status == 0
    }
    #[cfg(not(any(
        target_os = "linux",
        target_os = "macos",
        target_os = "ios",
        target_os = "freebsd",
        target_os = "openbsd",
        target_os = "netbsd"
    )))]
    {
        // No entropy source is wired up for this target.
        let _ = buf;
        false
    }
}