use crate::api::errors::{Error, Result};
use crate::layout::{BlobGuid, BlobNode, Node16, Node256, Node4, Node48, NodeType, Prefix};
use crate::store::backend::Backend;
use crate::store::{BlobFrame, BufferManager};
use super::cast;
use super::migrate::make_blob_from_node;
use super::readers::{ntype_of, read_node16, read_node256, read_node4, read_node48, read_prefix};
use super::types::{Victim, VictimEdgeKind};
use super::writers::{inner_update_child, set_prefix_child, write_struct_to_slot};
pub(super) use super::migrate::compact_blob;
pub(super) fn spillover_blob(bm: &BufferManager, frame: &mut BlobFrame<'_>) -> Result<u16> {
let root_slot = frame.header().root_slot;
let victim = pick_victim_subtree(frame, root_slot)?;
let new_guid = fresh_blob_guid();
let outcome = make_blob_from_node(frame, victim.victim_slot, new_guid)?;
bm.write_blob(new_guid, &outcome.buf)?;
bm.flush()?;
free_subtree(frame, victim.victim_slot)?;
let bn_alloc = frame.alloc_node(NodeType::Blob)?;
let bn = BlobNode::new(&[], new_guid, u32::from(outcome.entry_slot));
write_struct_to_slot(frame, bn_alloc.slot, &bn)?;
if victim.parent_slot == root_slot && victim.via_header_root {
frame.header_mut().root_slot = bn_alloc.slot;
} else {
match victim.kind {
VictimEdgeKind::Prefix => {
set_prefix_child(frame, victim.parent_slot, u32::from(bn_alloc.slot))?;
}
VictimEdgeKind::Inner(parent_ntype) => {
inner_update_child(
frame,
victim.parent_slot,
parent_ntype,
victim.byte,
u32::from(bn_alloc.slot),
)?;
}
}
}
Ok(bn_alloc.slot)
}
pub(super) fn count_subtree_nodes(frame: &BlobFrame<'_>, root: u16) -> Result<u32> {
let ntype = ntype_of(frame.as_ref(), root)?;
let body = frame.body_of_slot(root).ok_or(Error::NodeCorrupt {
context: "count_subtree_nodes: body resolution failed",
})?;
let mut count: u32 = 1;
match ntype {
NodeType::Invalid => {
return Err(Error::NodeCorrupt {
context: "count_subtree_nodes: Invalid",
});
}
NodeType::Leaf | NodeType::EmptyRoot | NodeType::Blob => {}
NodeType::Prefix => {
let p = cast::<Prefix>(body);
count = count.saturating_add(count_subtree_nodes(frame, p.child as u16)?);
}
NodeType::Node4 => {
let n = cast::<Node4>(body);
for i in 0..(n.count as usize).min(4) {
count = count.saturating_add(count_subtree_nodes(frame, n.children[i] as u16)?);
}
}
NodeType::Node16 => {
let n = cast::<Node16>(body);
for i in 0..(n.count as usize).min(16) {
count = count.saturating_add(count_subtree_nodes(frame, n.children[i] as u16)?);
}
}
NodeType::Node48 => {
let n = cast::<Node48>(body);
for c in &n.children {
if *c != 0 {
count = count.saturating_add(count_subtree_nodes(frame, *c as u16)?);
}
}
}
NodeType::Node256 => {
let n = cast::<Node256>(body);
for c in &n.children {
if *c != 0 {
count = count.saturating_add(count_subtree_nodes(frame, *c as u16)?);
}
}
}
}
Ok(count)
}
#[allow(clippy::too_many_lines)] fn pick_victim_subtree(frame: &BlobFrame<'_>, start_slot: u16) -> Result<Victim> {
let mut current = start_slot;
loop {
let ntype = ntype_of(frame.as_ref(), current)?;
match ntype {
NodeType::Node4 => {
let n = read_node4(frame.as_ref(), current)?;
return pick_largest_non_blob(
frame,
current,
NodeType::Node4,
(n.count as usize).min(4),
&n.keys[..],
&n.children[..],
false,
);
}
NodeType::Node16 => {
let n = read_node16(frame.as_ref(), current)?;
return pick_largest_non_blob(
frame,
current,
NodeType::Node16,
(n.count as usize).min(16),
&n.keys[..],
&n.children[..],
false,
);
}
NodeType::Node48 => {
let n = read_node48(frame.as_ref(), current)?;
let mut best: Option<Victim> = None;
let mut best_size: u32 = 0;
for b in 0..256usize {
let idx = n.index[b];
if idx == 0 {
continue;
}
let child_slot = n.children[idx as usize - 1] as u16;
if ntype_of(frame.as_ref(), child_slot)? == NodeType::Blob {
continue;
}
let size = count_subtree_nodes(frame, child_slot)?;
if size > best_size {
best_size = size;
best = Some(Victim {
parent_slot: current,
kind: VictimEdgeKind::Inner(NodeType::Node48),
byte: b as u8,
victim_slot: child_slot,
via_header_root: false,
});
}
}
return best.ok_or(Error::NotYetImplemented(
"spillover: no non-Blob children to migrate (Node48)",
));
}
NodeType::Node256 => {
let n = read_node256(frame.as_ref(), current)?;
let mut best: Option<Victim> = None;
let mut best_size: u32 = 0;
for (i, c) in n.children.iter().enumerate() {
if *c == 0 {
continue;
}
let child_slot = *c as u16;
if ntype_of(frame.as_ref(), child_slot)? == NodeType::Blob {
continue;
}
let size = count_subtree_nodes(frame, child_slot)?;
if size > best_size {
best_size = size;
best = Some(Victim {
parent_slot: current,
kind: VictimEdgeKind::Inner(NodeType::Node256),
byte: i as u8,
victim_slot: child_slot,
via_header_root: false,
});
}
}
return best.ok_or(Error::NotYetImplemented(
"spillover: no non-Blob children to migrate (Node256)",
));
}
NodeType::Prefix => {
let p = read_prefix(frame.as_ref(), current)?;
let child_slot = p.child as u16;
let child_ntype = ntype_of(frame.as_ref(), child_slot)?;
match child_ntype {
NodeType::Node4
| NodeType::Node16
| NodeType::Node48
| NodeType::Node256
| NodeType::Prefix => {
current = child_slot;
}
NodeType::Leaf | NodeType::EmptyRoot | NodeType::Blob => {
return Ok(Victim {
parent_slot: current,
kind: VictimEdgeKind::Prefix,
byte: 0,
victim_slot: child_slot,
via_header_root: false,
});
}
NodeType::Invalid => {
return Err(Error::NodeCorrupt {
context: "pick_victim_subtree: Prefix child Invalid",
});
}
}
}
NodeType::Leaf | NodeType::EmptyRoot | NodeType::Blob => {
return Err(Error::NotYetImplemented(
"spillover: tree too degenerate to migrate (root is Leaf/Empty/Blob)",
));
}
NodeType::Invalid => {
return Err(Error::NodeCorrupt {
context: "pick_victim_subtree: Invalid",
});
}
}
}
}
fn pick_largest_non_blob(
frame: &BlobFrame<'_>,
parent_slot: u16,
parent_ntype: NodeType,
count: usize,
keys: &[u8],
children: &[u32],
via_header_root: bool,
) -> Result<Victim> {
let mut best: Option<Victim> = None;
let mut best_size: u32 = 0;
for i in 0..count {
let child_slot = children[i] as u16;
if ntype_of(frame.as_ref(), child_slot)? == NodeType::Blob {
continue;
}
let size = count_subtree_nodes(frame, child_slot)?;
if size > best_size {
best_size = size;
best = Some(Victim {
parent_slot,
kind: VictimEdgeKind::Inner(parent_ntype),
byte: keys[i],
victim_slot: child_slot,
via_header_root,
});
}
}
best.ok_or(Error::NotYetImplemented(
"spillover: no non-Blob children to migrate",
))
}
pub(super) fn free_subtree(frame: &mut BlobFrame<'_>, root: u16) -> Result<()> {
let ntype = ntype_of(frame.as_ref(), root)?;
let body_copy = frame
.body_of_slot(root)
.ok_or(Error::NodeCorrupt {
context: "free_subtree: body resolution failed",
})?
.to_vec();
match ntype {
NodeType::Invalid => {
return Err(Error::NodeCorrupt {
context: "free_subtree: Invalid in source",
});
}
NodeType::Leaf | NodeType::EmptyRoot | NodeType::Blob => {}
NodeType::Prefix => {
let p = cast::<Prefix>(&body_copy);
free_subtree(frame, p.child as u16)?;
}
NodeType::Node4 => {
let n = cast::<Node4>(&body_copy);
for i in 0..(n.count as usize).min(4) {
free_subtree(frame, n.children[i] as u16)?;
}
}
NodeType::Node16 => {
let n = cast::<Node16>(&body_copy);
for i in 0..(n.count as usize).min(16) {
free_subtree(frame, n.children[i] as u16)?;
}
}
NodeType::Node48 => {
let n = cast::<Node48>(&body_copy);
for c in &n.children {
if *c != 0 {
free_subtree(frame, *c as u16)?;
}
}
}
NodeType::Node256 => {
let n = cast::<Node256>(&body_copy);
for c in &n.children {
if *c != 0 {
free_subtree(frame, *c as u16)?;
}
}
}
}
frame.free_node(root)?;
Ok(())
}
pub(super) fn fresh_blob_guid() -> BlobGuid {
use std::sync::atomic::{AtomicU64, Ordering};
static COUNTER: AtomicU64 = AtomicU64::new(1);
let c = COUNTER.fetch_add(1, Ordering::SeqCst);
let pid = std::process::id() as u64;
let mut g = [0u8; 16];
g[0..8].copy_from_slice(&c.to_le_bytes());
g[8..12].copy_from_slice(&(pid as u32).to_le_bytes());
g[12] = 0xA1;
g[13] = 0xB2;
g[14] = 0xC3;
g[15] = 0xD4;
g
}