use std::sync::Arc;
use crate::persistent_artrie::error::Result;
use crate::persistent_artrie::swizzled_ptr::SwizzledPtr;
use crate::persistent_artrie_core::eviction::DiskLocationRegistry;
use crate::persistent_artrie_core::key_encoding::KeyEncoding;
use crate::persistent_artrie_core::overlay::codec::chain_chunks;
use crate::persistent_artrie_core::overlay::node::OverlayNode;
use crate::value::DictionaryValue;
pub(crate) fn peel_chain_generic<K: KeyEncoding, V: DictionaryValue>(
start: Arc<OverlayNode<K, V>>,
) -> (
Vec<K::Unit>,
Vec<Arc<OverlayNode<K, V>>>,
Arc<OverlayNode<K, V>>,
) {
let mut units: Vec<K::Unit> = Vec::new();
let mut live: Vec<Arc<OverlayNode<K, V>>> = Vec::new();
let mut cur = start;
loop {
if cur.num_children() != 1 || cur.is_final() || cur.has_value() {
return (units, live, cur);
}
let sole = {
let mut it = cur.iter_children();
let (&edge, child) = it.next().expect("num_children() == 1 => exactly one child");
child.as_in_mem().map(|arc| (edge, Arc::clone(arc)))
};
match sole {
Some((edge, child_arc)) => {
live.push(Arc::clone(&cur));
units.push(edge);
cur = child_arc;
}
None => return (units, live, cur),
}
}
}
struct PendingChild<K: KeyEncoding> {
key: K::Unit,
ptr: Option<SwizzledPtr>,
}
struct Frame<K: KeyEncoding, V: DictionaryValue> {
node: Arc<OverlayNode<K, V>>,
parent_key: Option<K::Unit>,
chain_prefix: Vec<K::Unit>,
live_spine: Vec<Arc<OverlayNode<K, V>>>,
base_depth: usize,
pushed_units: usize,
pending_in_mem: Vec<(K::Unit, Arc<OverlayNode<K, V>>)>,
slots: Vec<PendingChild<K>>,
}
fn make_frame<K: KeyEncoding, V: DictionaryValue>(
node: Arc<OverlayNode<K, V>>,
parent_key: Option<K::Unit>,
chain_prefix: Vec<K::Unit>,
live_spine: Vec<Arc<OverlayNode<K, V>>>,
base_depth: usize,
pushed_units: usize,
) -> Frame<K, V> {
let n = node.num_children();
let mut slots: Vec<PendingChild<K>> = Vec::with_capacity(n);
let mut pending_in_mem: Vec<(K::Unit, Arc<OverlayNode<K, V>>)> = Vec::with_capacity(n);
for (&key, child) in node.iter_children() {
if let Some(child_arc) = child.as_in_mem() {
slots.push(PendingChild { key, ptr: None });
pending_in_mem.push((key, Arc::clone(child_arc)));
} else if let Some(on_disk) = child.as_on_disk() {
if !on_disk.is_null() {
slots.push(PendingChild {
key,
ptr: Some(on_disk.clone()),
});
}
}
}
pending_in_mem.reverse();
Frame {
node,
parent_key,
chain_prefix,
live_spine,
base_depth,
pushed_units,
pending_in_mem,
slots,
}
}
pub(crate) trait OverlayCompressedSerialize<K: KeyEncoding, V: DictionaryValue> {
type Projected;
fn project_node(
node: &OverlayNode<K, V>,
child_disk_ptrs: &[(K::Unit, SwizzledPtr)],
) -> Result<Self::Projected>;
fn project_chunk(
synth: &OverlayNode<K, V>,
child_disk_ptrs: &[(K::Unit, SwizzledPtr)],
prefix: &[K::Unit],
) -> Result<Self::Projected>;
fn serialize_projected_node(
&self,
projected: &Self::Projected,
child_disk_ptrs: &[(K::Unit, SwizzledPtr)],
path: &[K::Unit],
registry: Option<&mut DiskLocationRegistry>,
) -> Result<SwizzledPtr>;
fn new_synth_node() -> OverlayNode<K, V>;
fn stamp_durable(live: &OverlayNode<K, V>, raw: u64);
fn serialize_compressed_loop(
&self,
root: &Arc<OverlayNode<K, V>>,
mut registry: Option<&mut DiskLocationRegistry>,
) -> Result<SwizzledPtr> {
let mut path: Vec<K::Unit> = Vec::new();
let mut stack: Vec<Frame<K, V>> = vec![make_frame(
Arc::clone(root),
None,
Vec::new(),
Vec::new(),
0,
0,
)];
let mut completed: Option<(K::Unit, SwizzledPtr)> = None;
loop {
let frame = stack
.last_mut()
.expect("serialize_compressed: non-empty work-stack");
if let Some((key, ptr)) = completed.take() {
let slot = frame
.slots
.iter_mut()
.find(|s| s.key == key && s.ptr.is_none())
.expect("completed child key has a matching unfilled slot");
slot.ptr = Some(ptr);
}
if let Some((edge, child_arc)) = frame.pending_in_mem.pop() {
let (chain_prefix, live_spine, terminus) = peel_chain_generic::<K, V>(child_arc);
let base_depth = path.len();
let mut pushed = 0usize;
for &u in std::iter::once(&edge).chain(chain_prefix.iter()) {
path.push(u);
pushed += 1;
}
stack.push(make_frame(
terminus,
Some(edge),
chain_prefix,
live_spine,
base_depth,
pushed,
));
continue;
}
let frame = stack
.pop()
.expect("serialize_compressed: frame to finalize");
let child_disk_ptrs: Vec<(K::Unit, SwizzledPtr)> = frame
.slots
.into_iter()
.map(|s| {
(
s.key,
s.ptr
.expect("post-order: every in-mem child slot filled before its parent"),
)
})
.collect();
let projected = Self::project_node(frame.node.as_ref(), &child_disk_ptrs)?;
let terminus_ptr = self.serialize_projected_node(
&projected,
&child_disk_ptrs,
&path,
registry.as_deref_mut(),
)?;
if registry.is_some() {
Self::stamp_durable(frame.node.as_ref(), terminus_ptr.to_raw());
}
let top_ptr = if frame.chain_prefix.is_empty() {
terminus_ptr
} else {
let chunks = chain_chunks(&frame.chain_prefix, K::MAX_PREFIX_LEN);
let chain_head = frame.base_depth + 1;
let mut ends: Vec<usize> = Vec::with_capacity(chunks.len());
let mut acc = chain_head;
for ch in &chunks {
ends.push(acc);
acc += ch.prefix.len() + 1;
}
debug_assert_eq!(
acc,
frame.base_depth + 1 + frame.chain_prefix.len(),
"CX #6: Σ chunk widths must equal the chain length (no-truncation witness)"
);
let synth = Self::new_synth_node();
let mut child_ptr = terminus_ptr;
for (c, chunk) in chunks.iter().enumerate().rev() {
let child_slots = [(chunk.edge, child_ptr.clone())];
let chunk_proj = Self::project_chunk(&synth, &child_slots, chunk.prefix)?;
let chunk_path = &path[..ends[c]];
let next_ptr = self.serialize_projected_node(
&chunk_proj,
&child_slots,
chunk_path,
registry.as_deref_mut(),
)?;
if registry.is_some() {
let idx = ends[c] - frame.base_depth - 1;
if let Some(top_live) = frame.live_spine.get(idx) {
Self::stamp_durable(top_live.as_ref(), next_ptr.to_raw());
}
}
child_ptr = next_ptr;
}
child_ptr
};
for _ in 0..frame.pushed_units {
path.pop();
}
match frame.parent_key {
Some(key) => completed = Some((key, top_ptr)),
None => return Ok(top_ptr),
}
}
}
}