use std::sync::Arc;
use crate::persistent_artrie_core::key_encoding::ByteKey;
use crate::persistent_artrie_core::overlay::evict::OverlayEvictable;
use crate::persistent_artrie_core::overlay::{AtomicNodePtr, Child, OverlayFaulter, OverlayNode};
use crate::value::DictionaryValue;
use super::arena_manager::ArenaSlot;
use super::block_storage::BlockStorage;
use super::dict_impl::PersistentARTrie;
use super::error::{PersistentARTrieError, Result};
use super::serialization;
use super::serialization::v2::DeserializationContext;
use super::swizzled_ptr::SwizzledPtr;
impl<V: DictionaryValue, S: BlockStorage> PersistentARTrie<V, S> {
    /// Reads the serialized ART node that `disk_ptr` refers to and rebuilds it as an
    /// in-memory overlay node.
    ///
    /// The on-disk node's finality flag, optional value and non-null children are
    /// copied onto a fresh [`OverlayNode`]; the children are attached as
    /// [`Child::OnDisk`], so nothing below this node is loaded here. If the on-disk
    /// node carries a prefix of length `n`, that prefix is expanded into a chain of
    /// `n` single-child, non-final intermediates sitting above the real node, and the
    /// head of the chain receives `disk_ptr`'s raw value as its durable stamp. With
    /// an empty prefix the real node is returned directly and no stamp is set.
    ///
    /// # Errors
    ///
    /// * `internal` — there is no arena manager, `disk_ptr` has no disk location
    ///   (swizzled or null), or its `block_id` is 0.
    /// * `corrupted` — the node bytes or the stored value fail to deserialize, or the
    ///   header's `prefix_len` is larger than the prefix bytes actually stored.
    /// * Any error returned by the arena read is propagated unchanged.
    pub(crate) fn load_overlay_node_from_disk(
        &self,
        disk_ptr: &SwizzledPtr,
    ) -> Result<Arc<OverlayNode<ByteKey, V>>> {
        let arena_manager = self.arena_manager.as_ref().ok_or_else(|| {
            PersistentARTrieError::internal("No arena manager for overlay fault-in load")
        })?;
        let disk_loc = disk_ptr
            .disk_location()
            .ok_or_else(|| PersistentARTrieError::internal("Node pointer is swizzled or null"))?;
        // The arena id is the block id minus one; a block id of 0 is rejected.
        let arena_id = disk_loc
            .block_id
            .checked_sub(1)
            .ok_or_else(|| PersistentARTrieError::internal("Invalid block_id 0 for arena node"))?;
        let slot = ArenaSlot::new(arena_id, disk_loc.offset);

        let am = arena_manager.read();
        let node_data = am.read(slot)?;
        let ctx = DeserializationContext::new(slot);
        let node = serialization::v2::deserialize_node_v2(node_data, &ctx).map_err(|e| {
            PersistentARTrieError::corrupted(format!(
                "Failed to deserialize overlay ART node: {:?}",
                e
            ))
        })?;

        // Copy everything needed out of the arena-backed node so that `am` can be
        // released before the value is decoded and the overlay node is assembled.
        let is_final = node.header().is_final();
        let value_bytes = serialization::v2::read_node_value(node_data);
        let child_ptrs: Vec<(u8, SwizzledPtr)> = node
            .iter_children()
            .filter(|(_, ptr)| !ptr.is_null())
            .map(|(key, ptr)| (key, ptr.clone()))
            .collect();
        let prefix_len = node.header().prefix_len as usize;
        let prefix_bytes: Vec<u8> = if prefix_len > 0 {
            // `prefix_len` comes from on-disk data: use a checked slice so that a
            // malformed header is reported as corruption instead of panicking.
            node.prefix()
                .bytes
                .get(..prefix_len)
                .ok_or_else(|| {
                    PersistentARTrieError::corrupted(format!(
                        "Overlay ART node prefix_len {prefix_len} exceeds stored prefix bytes"
                    ))
                })?
                .to_vec()
        } else {
            Vec::new()
        };
        drop(am);

        let value: Option<V> = match value_bytes {
            Some(vb) => Some(
                crate::serialization::bincode_compat::deserialize(&vb).map_err(|e| {
                    PersistentARTrieError::corrupted(format!("deserialize overlay value: {e}"))
                })?,
            ),
            None => None,
        };

        // Assemble the overlay counterpart of the on-disk node itself.
        let mut real = OverlayNode::<ByteKey, V>::new();
        if is_final {
            real = real.as_final();
        }
        if let Some(v) = value {
            real = real.with_value(v);
        }
        for (edge, ptr) in child_ptrs {
            real = real.with_child(edge, Child::OnDisk(ptr));
        }

        // Expand the prefix bottom-up: the last prefix byte becomes the edge into the
        // real node, the first prefix byte ends up on the node that is returned.
        let mut cur = real;
        for &edge in prefix_bytes.iter().rev() {
            cur = OverlayNode::<ByteKey, V>::new().with_child(edge, Child::InMem(Arc::new(cur)));
            debug_assert!(
                cur.prefix_len() == 0 && !cur.is_final() && cur.num_children() == 1,
                "CX #43 (4A): an expanded prefix intermediate must be prefix_len=0, non-final, single-child"
            );
        }
        // Only the head of an expanded prefix chain is stamped with the source pointer.
        if prefix_len > 0 {
            cur.set_durable_stamp(disk_ptr.to_raw());
        }
        Ok(Arc::new(cur))
    }
}
impl<V: DictionaryValue, S: BlockStorage> OverlayFaulter<ByteKey, V> for PersistentARTrie<V, S> {
    /// Faults in the overlay node behind `slot` by delegating to
    /// `load_overlay_node_from_disk`.
    ///
    /// Returns `Some(node)` on success. Any load error is discarded and reported as
    /// `None`, since this signature has no way to carry the error.
    #[inline]
    fn fault_overlay_slot(&self, slot: &SwizzledPtr) -> Option<Arc<OverlayNode<ByteKey, V>>> {
        let loaded = self.load_overlay_node_from_disk(slot);
        loaded.ok()
    }
}
impl<V: DictionaryValue, S: BlockStorage> OverlayEvictable<ByteKey, V, S>
    for PersistentARTrie<V, S>
{
    /// Borrows the lock-free overlay root slot, or `None` when the trie has none.
    #[inline]
    fn overlay_root_slot(&self) -> Option<&AtomicNodePtr<ByteKey, V>> {
        let root = &self.lockfree_root;
        root.as_ref()
    }

    /// Borrows the epoch manager owned by this trie.
    #[inline]
    fn overlay_epoch_manager(&self) -> &crate::persistent_artrie_core::concurrency::EpochManager {
        let manager: &crate::persistent_artrie_core::concurrency::EpochManager =
            &self.epoch_manager;
        manager
    }

    /// Returns a new handle to the eviction coordinator, or `None` if none is
    /// installed.
    ///
    /// # Panics
    ///
    /// Panics if the `eviction_coordinator` mutex is poisoned.
    #[inline]
    fn overlay_eviction_coordinator(
        &self,
    ) -> Option<Arc<crate::persistent_artrie::eviction::EvictionCoordinator>> {
        let guard = self
            .eviction_coordinator
            .lock()
            .expect("eviction_coordinator mutex poisoned");
        // Clone the `Arc` handle so the caller does not hold the mutex.
        guard.as_ref().cloned()
    }
}
/// Attempts to evict each listed overlay node from `trie`.
///
/// `nodes` holds `(path_hash, path, disk_ptr)` entries. They are processed in order
/// of descending path length (stable for equal lengths). The supplied `path_hash`
/// is not used; the hash passed to the LRU registry is recomputed from `path`.
///
/// For every entry `evict_overlay_node_at_path` is called; a `RootCasLost` outcome
/// is retried up to `max_rebase_retries` additional times, after which the entry is
/// skipped. `NotEvictable` skips the entry immediately. On `Evicted`, the path's
/// hash is removed from the eviction coordinator's LRU registry when a coordinator
/// is installed.
///
/// Returns `(evicted, bytes_freed)`, where `bytes_freed` is a fixed 256 bytes per
/// evicted node rather than a measured size.
pub(crate) fn evict_overlay_nodes<V: DictionaryValue, S: BlockStorage>(
    trie: &PersistentARTrie<V, S>,
    mut nodes: Vec<(u64, Vec<u8>, SwizzledPtr)>,
    max_rebase_retries: usize,
) -> (usize, usize) {
    use crate::persistent_artrie::eviction::lru_tracker::LruRegistry;
    use crate::persistent_artrie_core::overlay::evict::{OverlayEvictOutcome, OverlayEvictable};

    // Flat per-node figure credited to `bytes_freed` for each successful eviction.
    const BYTES_PER_EVICTED_NODE: usize = 256;

    // Longest paths first.
    nodes.sort_by_key(|(_, path, _)| std::cmp::Reverse(path.len()));

    let mut evicted = 0usize;
    for (_path_hash, path, disk_ptr) in nodes {
        let mut retries_left = max_rebase_retries;
        let was_evicted = loop {
            match trie.evict_overlay_node_at_path(&path, disk_ptr.clone()) {
                OverlayEvictOutcome::Evicted => break true,
                OverlayEvictOutcome::NotEvictable => break false,
                OverlayEvictOutcome::RootCasLost => {
                    // Lost the root CAS: try again while the retry budget lasts.
                    if retries_left == 0 {
                        break false;
                    }
                    retries_left -= 1;
                }
            }
        };

        if was_evicted {
            evicted += 1;
            if let Some(coordinator) = trie.overlay_eviction_coordinator() {
                coordinator
                    .lru_registry()
                    .remove_hash(LruRegistry::path_hash(&path));
            }
        }
    }

    (evicted, evicted * BYTES_PER_EVICTED_NODE)
}
#[cfg(any(test, feature = "bench-internals"))]
impl<V: DictionaryValue, S: BlockStorage> PersistentARTrie<V, S> {
    /// Installs and starts an eviction coordinator for tests and benchmarks.
    ///
    /// The coordinator is started with an eviction callback that evicts nothing
    /// (it always reports `(0, 0)`), and its memory monitor is started as well.
    ///
    /// The `eviction_coordinator` mutex is held from the "already enabled" check
    /// until the new coordinator is stored, so two concurrent callers cannot both
    /// pass the check and each start a coordinator.
    ///
    /// # Errors
    ///
    /// Returns an `internal` error if `config` fails validation, if a coordinator
    /// is already installed, or if starting the coordinator or its memory monitor
    /// fails. Nothing is stored in any of these cases.
    ///
    /// # Panics
    ///
    /// Panics if the `eviction_coordinator` mutex is poisoned.
    pub(crate) fn bench_enable_eviction(
        &self,
        config: crate::persistent_artrie::eviction::EvictionConfig,
    ) -> Result<()> {
        config
            .validate()
            .map_err(|e| PersistentARTrieError::internal(&e))?;

        // One guard for both the check and the store; see the doc comment above.
        let mut installed = self
            .eviction_coordinator
            .lock()
            .expect("eviction_coordinator mutex poisoned");
        if installed.is_some() {
            return Err(PersistentARTrieError::internal("Eviction already enabled"));
        }

        // `config` is owned and not used again, so it is moved rather than cloned.
        let coordinator = crate::persistent_artrie::eviction::EvictionCoordinator::new(
            config,
            Arc::clone(&self.epoch_manager),
        );
        coordinator
            .start(|_nodes_to_evict| (0usize, 0usize))
            .map_err(|e| PersistentARTrieError::internal(&e))?;
        coordinator
            .start_memory_monitor()
            .map_err(|e| PersistentARTrieError::internal(&e))?;

        *installed = Some(coordinator);
        Ok(())
    }

    /// Captures an overlay snapshot and publishes it through
    /// `publish_overlay_snapshot_retaining_with_eviction`.
    ///
    /// # Errors
    ///
    /// Propagates any error from capturing or publishing the snapshot.
    pub(crate) fn bench_immutable_checkpoint_with_eviction(&self) -> Result<()> {
        let snapshot = self.capture_overlay_snapshot()?;
        self.publish_overlay_snapshot_retaining_with_eviction(snapshot)
    }

    /// Returns the coordinator's `disk_registry_len()`, or `None` when no eviction
    /// coordinator is installed.
    ///
    /// # Panics
    ///
    /// Panics if the `eviction_coordinator` mutex is poisoned.
    pub(crate) fn evictable_node_count(&self) -> Option<usize> {
        self.eviction_coordinator
            .lock()
            .expect("eviction_coordinator mutex poisoned")
            .as_ref()
            .map(|c| c.disk_registry_len())
    }
}