pub mod types;
pub mod nodes;
pub mod serialization_char;
pub mod arena;
pub mod arena_manager;
pub mod compact_encoding;
pub mod traversal_context;
pub mod dirty_tracker;
pub mod dedup;
pub mod relative_encoding;
pub mod recovery;
pub mod per_node_log_char;
pub mod prefix_term;
pub mod recovery_stats;
pub mod file_header;
pub mod transactions;
pub mod mvcc;
pub mod dict_impl_char;
#[cfg(feature = "io-uring-backend")]
pub mod io_uring_ctor;
pub mod mmap_ctor;
pub mod disk_io;
pub mod lockfree_cas;
pub mod query_api;
pub mod overlay_read;
pub mod prefix_api;
pub(crate) mod overlay_fault;
pub mod merge_api;
pub mod document_tx;
pub mod batch_insert;
#[cfg(feature = "parallel-merge")]
pub mod parallel_merge;
pub mod observability;
pub mod epoch_checkpointing;
pub mod atomic_ops;
pub mod persist;
pub mod prefetch_api;
pub mod wal_helpers;
pub(crate) mod committed_watermark;
pub(crate) mod overlay_write_mode;
pub(crate) mod lockfree_value_route;
pub mod mutation_api;
pub(crate) mod f5_loader;
#[cfg(test)]
mod eviction_registry_tests;
#[cfg(test)]
mod overlay_eviction_driver_correspondence;
#[cfg(test)]
mod overlay_dictionary_node_faulting_tests;
pub use types::{
CharTrieFileHeader, CharTrieNodeInner, EnhancedRecoveryMode, EnhancedRecoveryStats,
CHAR_FILE_HEADER_SIZE, CHAR_HEADER_VERSION_V1, CHAR_HEADER_VERSION_V2, CHAR_TRIE_MAGIC,
DEFAULT_CHAR_BUFFER_POOL_SIZE,
};
pub use types::{PrefixTermWithArena, PrefixTermWithValueAndArena};
pub use dict_impl_char::{
CharDocumentTransaction,
DurabilityPolicy,
TransactionState,
};
pub use nodes::{
AddChildError, CharArtNode, CharBucket, CharCompressedPrefix, CharNode, CharNode16, CharNode4,
CharNode48, CharNodeHeader, CHAR_MAX_PREFIX_LEN,
};
pub use serialization_char::{
char_from_bytes, char_serialized_size, char_to_bytes, deserialize_char_node,
serialize_char_node, SerializedCharNodeHeader, CHAR_FORMAT_VERSION, CHAR_NODE_MAGIC,
CHAR_SERIALIZED_HEADER_SIZE,
};
pub use serialization_char::{
char_compact_serialized_size, char_from_bytes_compact, char_to_bytes_compact,
};
pub use compact_encoding::{
determine_key_width, determine_ptr_width, CompactHeader, DecodedCompactNode,
COMPACT_NODE_TYPE_BUCKET, COMPACT_NODE_TYPE_N16, COMPACT_NODE_TYPE_N4, COMPACT_NODE_TYPE_N48,
};
pub use arena::{
ArenaHeader, CharNodeArena, CharNodeArenaV2, SlotEntry, VarintSlotEntry, ARENA_MAGIC,
ARENA_MAGIC_V2, ARENA_VERSION, ARENA_VERSION_V2, FLAG_VARINT_DIRECTORY, HEADER_SIZE,
MIN_FREE_SPACE, SLOT_SIZE,
};
pub use arena_manager::{
ArenaManager, ArenaSlot, ArenaStats, FlushConfig, FlushStats, ReservedSlots,
};
pub use per_node_log_char::{
CharInlineLog,
CharInlineLogIter,
CharLogIterExt,
CharLogWriter,
CharNodeLogEntry,
DirtyNodeTracker,
NodeId,
PageId,
PerNodeLogConfig,
PerNodeLogStats,
PerNodeLogStatsAtomic,
};
pub use traversal_context::{LightweightTraversalContext, TraversalContext, TraversalStats};
pub use dirty_tracker::{BatchDirtyTracker, DirtyTracker, DirtyTrackerStats};
pub use dedup::{
BatchDeduplicator, DeduplicatingArenaManager, DeduplicatorStats, NodeDeduplicator,
};
pub use relative_encoding::{
decode_child_pointer, decode_children, decode_sequential_siblings, encode_child_pointer,
encode_children, encode_sequential_siblings, encoded_size, is_same_arena,
try_decode_child_pointer, try_decode_children, try_decode_full, try_decode_relative,
try_decode_sequential_siblings, try_encode_child_pointer, try_encode_children,
try_encode_sequential_siblings, try_encoded_size, RelativeEncodingError,
RelativeEncodingResult, CROSS_ARENA_SIZE, FLAG_CROSS_ARENA, FLAG_RELATIVE_OFFSETS,
FLAG_SEQUENTIAL_SIBLINGS,
};
pub use recovery::{
detect_corruption,
find_wal_archive_segments,
rebuild_from_wal_segments,
CorruptionInfo,
CorruptionType,
IncrementalRecovery,
RecoveredOperation,
RecoveredState,
RecoveryError,
RecoveryManager,
RecoveryMode,
RecoveryPolicy,
RecoveryReport,
RecoveryStats,
};
pub use crate::persistent_artrie::eviction::{
AccessTracker, DiskLocationRegistry, EvictionConfig, EvictionCoordinator, EvictionStats,
EvictionUrgency, LruRegistry,
};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering as AtomicOrdering};
use std::sync::Arc;
use parking_lot::RwLock;
use crate::persistent_artrie::wal::AsyncWalWriter;
use crate::persistent_artrie::wal_managed::WalManaged;
use crate::persistent_artrie_core::key_encoding::CharKey;
use crate::value::DictionaryValue;
use crate::zipper::{DictZipper, ValuedDictZipper};
use crate::{
Dictionary, DictionaryNode, MappedDictionary, MappedDictionaryNode, MutableMappedDictionary,
};
/// Shared handle to a [`PersistentARTrieChar`]: an `Arc` wrapped around the trie.
///
/// `V` is the stored value type. `S` selects the block-storage backend and defaults
/// to `MmapDiskManager`.
pub type SharedCharARTrie<V, S = crate::persistent_artrie::disk_manager::MmapDiskManager> =
Arc<PersistentARTrieChar<V, S>>;
/// Deprecated name for [`SharedCharARTrie`] with the default storage backend.
#[deprecated(since = "0.9.0", note = "Use SharedCharARTrie instead")]
pub type SharedCharTrie<V> = SharedCharARTrie<V>;
#[doc(inline)]
pub use crate::persistent_artrie_core::shared_access::SharedTrieAccess;
/// Lets an `Arc<PersistentARTrieChar>` be used through `SharedTrieAccess`.
///
/// Both accessors return a guard built with `TrieAccessGuard::from_ref` over the
/// trie behind the `Arc`; this impl itself acquires no lock.
impl<V: DictionaryValue, S: crate::persistent_artrie::block_storage::BlockStorage>
    crate::persistent_artrie_core::shared_access::SharedTrieAccess
    for Arc<PersistentARTrieChar<V, S>>
{
    type Target = PersistentARTrieChar<V, S>;

    /// Guard for read-style access to the shared trie.
    #[inline]
    fn read(
        &self,
    ) -> crate::persistent_artrie_core::shared_access::TrieAccessGuard<'_, PersistentARTrieChar<V, S>>
    {
        use crate::persistent_artrie_core::shared_access::TrieAccessGuard;

        let trie: &PersistentARTrieChar<V, S> = &**self;
        TrieAccessGuard::from_ref(trie)
    }

    /// Guard for write-style access; built exactly like the read guard.
    #[inline]
    fn write(
        &self,
    ) -> crate::persistent_artrie_core::shared_access::TrieAccessGuard<'_, PersistentARTrieChar<V, S>>
    {
        use crate::persistent_artrie_core::shared_access::TrieAccessGuard;

        let trie: &PersistentARTrieChar<V, S> = &**self;
        TrieAccessGuard::from_ref(trie)
    }
}
/// Character-keyed persistent ART trie.
///
/// `V` is the value type stored with each term (defaults to `()`), and `S` is the
/// block-storage backend (defaults to `MmapDiskManager`). Dropping the trie calls
/// `close()`.
pub struct PersistentARTrieChar<V: DictionaryValue = (), S: crate::persistent_artrie::block_storage::BlockStorage = crate::persistent_artrie::disk_manager::MmapDiskManager> {
/// Atomic counter shown by the `Debug` impl.
/// NOTE(review): `len()` reports `overlay_len()` rather than loading this field
/// directly — confirm how the two relate.
pub(crate) len: AtomicUsize,
/// Flag returned by `is_dirty()` (loaded with `Acquire` ordering).
pub(crate) dirty: AtomicBool,
/// Optional shared buffer manager over storage `S`.
pub(crate) buffer_manager: Option<Arc<RwLock<crate::persistent_artrie::buffer_manager::BufferManager<S>>>>,
/// Optional WAL writer; exposed through `WalManaged::wal_writer` and stopped by `close()`.
pub(crate) wal_writer: Option<Arc<crate::persistent_artrie::wal::AsyncWalWriter>>,
/// WAL configuration.
pub(crate) wal_config: crate::persistent_artrie::wal::WalConfig,
/// LSN counter. NOTE(review): presumably the next log sequence number to hand out — confirm.
pub(crate) next_lsn: std::sync::atomic::AtomicU64,
/// Committed-watermark state (see the `committed_watermark` module).
pub(crate) committed_watermark: committed_watermark::CommittedWatermark,
/// Locked for the duration of `ARTrie::checkpoint` on the shared handle.
pub(crate) checkpoint_lock: std::sync::Arc<parking_lot::Mutex<()>>,
/// Locked by `union_with` on the shared handle while merged entries are applied.
pub(crate) merge_lock: std::sync::Arc<parking_lot::Mutex<()>>,
/// Optional filesystem path. NOTE(review): presumably the backing file — confirm.
pub(crate) file_path: Option<std::path::PathBuf>,
/// Optional arena manager; slot tracking and `flush_sequential` go through it.
pub(crate) arena_manager: Option<Arc<RwLock<ArenaManager<S>>>>,
/// Optimistic version value from the concurrency module.
pub(crate) version: crate::persistent_artrie::concurrency::OptimisticVersion,
/// Epoch manager; cloned into the eviction coordinator when eviction is enabled and
/// returned by `OverlayEvictable::overlay_epoch_manager`.
pub(crate) epoch_manager: Arc<crate::persistent_artrie::concurrency::EpochManager>,
/// Counter incremented by `invalidate_eviction_registry`.
pub(crate) structural_generation: std::sync::atomic::AtomicU64,
/// Retry statistics from the concurrency module.
pub(crate) retry_stats: crate::persistent_artrie::concurrency::RetryStats,
/// Optional group-commit coordinator (only with the `group-commit` feature).
#[cfg(feature = "group-commit")]
pub(crate) group_commit:
std::sync::Mutex<Option<Arc<crate::persistent_artrie::group_commit::GroupCommitCoordinator>>>,
/// Optional memory-pressure monitor; taken out and shut down by `close()`.
pub(crate) memory_monitor:
std::sync::Mutex<Option<Arc<crate::persistent_artrie::memory_monitor::MemoryPressureMonitor>>>,
/// Cache statistics from the adaptive pool module.
pub(crate) cache_stats: crate::persistent_artrie::adaptive_pool::CacheStats,
/// Optional checkpoint manager.
pub(crate) checkpoint_manager:
std::sync::Mutex<Option<Arc<crate::persistent_artrie::epoch::CheckpointManager>>>,
/// Current durability policy, stored in an atomic enum cell.
pub(crate) durability_policy:
crate::persistent_artrie_core::shared_access::AtomicEnumCell<DurabilityPolicy>,
/// Optional eviction coordinator; installed by `enable_eviction`, taken out and shut
/// down by `close()` and `disable_eviction`.
pub(crate) eviction_coordinator:
std::sync::Mutex<Option<Arc<crate::persistent_artrie::eviction::EvictionCoordinator>>>,
/// Prefetcher instance.
pub(crate) prefetcher: crate::persistent_artrie::prefetch::Prefetcher,
/// Marker tying the struct to the value type `V`.
pub(crate) _phantom: std::marker::PhantomData<V>,
/// Optional root pointer; returned by `OverlayEvictable::overlay_root_slot`.
pub(crate) lockfree_root: Option<nodes::AtomicNodePtr<V>>,
/// Optional concurrent map from `String` to `bool`.
/// NOTE(review): purpose is not visible in this file — confirm.
pub(crate) lockfree_cache: Option<dashmap::DashMap<String, bool>>,
/// Counter incremented (relaxed) by `note_faultin_cas`.
pub(crate) cas_retries: std::sync::atomic::AtomicU64,
/// Commit sequence counter.
pub(crate) commit_seq: std::sync::atomic::AtomicU64,
/// Mutex-guarded ordered map of `u64` to `u64`.
/// NOTE(review): the name suggests data LSN -> commit sequence — confirm.
pub(crate) commit_seq_by_data_lsn: std::sync::Mutex<std::collections::BTreeMap<u64, u64>>,
}
/// Compact `Debug` output: only `len` and `dirty` are printed, the remaining
/// fields are elided via `finish_non_exhaustive`.
impl<V: DictionaryValue, S: crate::persistent_artrie::block_storage::BlockStorage> std::fmt::Debug
    for PersistentARTrieChar<V, S>
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("PersistentARTrieChar");
        out.field("len", &self.len);
        out.field("dirty", &self.dirty);
        out.finish_non_exhaustive()
    }
}
impl<V: DictionaryValue> Default for PersistentARTrieChar<V> {
#[allow(deprecated)]
fn default() -> Self {
Self::new()
}
}
/// Dropping the trie runs `close()`.
impl<V: DictionaryValue, S: crate::persistent_artrie::block_storage::BlockStorage> Drop
    for PersistentARTrieChar<V, S>
{
    fn drop(&mut self) {
        // `close` only needs shared access, so reborrow immutably.
        let this: &Self = self;
        this.close();
    }
}
/// Exposes the trie's optional WAL writer through `WalManaged`.
impl<V: DictionaryValue, S: crate::persistent_artrie::block_storage::BlockStorage> WalManaged
    for PersistentARTrieChar<V, S>
{
    /// Borrow of the WAL writer, or `None` when the trie has none.
    fn wal_writer(&self) -> Option<&Arc<AsyncWalWriter>> {
        match &self.wal_writer {
            Some(writer) => Some(writer),
            None => None,
        }
    }
}
impl<V: DictionaryValue, S: crate::persistent_artrie::block_storage::BlockStorage>
    PersistentARTrieChar<V, S>
{
    /// Stops the background helpers attached to this trie.
    ///
    /// Takes the eviction coordinator and the memory monitor out of their slots (so a
    /// repeated call finds them empty) and shuts each one down, then calls `stop_sync`
    /// on the WAL writer if there is one.
    ///
    /// This method is invoked from `Drop`, so it must not panic: a poisoned slot mutex
    /// is recovered with `PoisonError::into_inner` instead of unwinding. A panic raised
    /// from a destructor while another panic is already unwinding aborts the process.
    pub fn close(&self) {
        let coordinator = self
            .eviction_coordinator
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .take();
        if let Some(coordinator) = coordinator {
            coordinator.shutdown();
        }

        let monitor = self
            .memory_monitor
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .take();
        if let Some(monitor) = monitor {
            monitor.shutdown();
        }

        if let Some(wal) = &self.wal_writer {
            wal.stop_sync();
        }
    }

    /// Returns the `dirty` flag (loaded with `Acquire` ordering).
    #[inline]
    pub fn is_dirty(&self) -> bool {
        self.dirty.load(AtomicOrdering::Acquire)
    }

    /// Number of terms, as reported by `overlay_len()`.
    #[inline]
    pub fn len(&self) -> usize {
        self.overlay_len()
    }

    /// Alias for [`Self::len`].
    #[inline]
    pub fn term_count(&self) -> usize {
        self.len()
    }

    /// Whether the trie holds no terms, as reported by `overlay_is_empty()`.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.overlay_is_empty()
    }

    /// Root dictionary node for traversal.
    ///
    /// Uses the current overlay root when there is one and a freshly created empty
    /// overlay node otherwise. The node is built without a faulter (`None`).
    pub fn root(&self) -> PersistentARTrieCharNode<V> {
        use crate::persistent_artrie_core::overlay::flip::LockFreeOverlay;
        let root = <Self as LockFreeOverlay<CharKey, V, S>>::overlay_root_node(self)
            .unwrap_or_else(|| {
                Arc::new(crate::persistent_artrie_core::overlay::OverlayNode::<
                    CharKey,
                    V,
                >::new())
            });
        PersistentARTrieCharNode::from_overlay_root(root, None)
    }

    /// Iterates over every term (prefix `""`).
    ///
    /// Best effort: if the underlying prefix scan fails or returns nothing, the
    /// iterator is simply empty.
    pub fn iter(&self) -> impl Iterator<Item = String> + '_ {
        self.iter_prefix("")
            .ok()
            .flatten()
            .unwrap_or_default()
            .into_iter()
    }

    /// Iterates over every `(term, value)` pair (prefix `""`).
    ///
    /// Best effort: a failed or empty scan yields an empty iterator.
    pub fn iter_with_values(&self) -> impl Iterator<Item = (String, V)> + '_
    where
        V: Clone,
    {
        self.iter_prefix_with_values("")
            .ok()
            .flatten()
            .unwrap_or_default()
            .into_iter()
    }

    /// Terms under `prefix` as an iterator, or `None` when the scan fails or
    /// reports no result.
    pub fn iter_prefix_vec(&self, prefix: &str) -> Option<impl Iterator<Item = String> + '_> {
        self.iter_prefix(prefix)
            .ok()
            .flatten()
            .map(|v| v.into_iter())
    }

    /// `(term, value)` pairs under `prefix` as an iterator, or `None` when the scan
    /// fails or reports no result.
    pub fn iter_prefix_with_values_vec(
        &self,
        prefix: &str,
    ) -> Option<impl Iterator<Item = (String, V)> + '_>
    where
        V: Clone,
    {
        self.iter_prefix_with_values(prefix)
            .ok()
            .flatten()
            .map(|v| v.into_iter())
    }
}
/// Builds a trie by inserting every owned term yielded by the iterator.
impl<V: DictionaryValue + Default> FromIterator<String> for PersistentARTrieChar<V> {
    #[allow(deprecated)]
    fn from_iter<I: IntoIterator<Item = String>>(iter: I) -> Self {
        let trie = Self::new();
        iter.into_iter().for_each(|term| {
            // `FromIterator` has no way to report a failure, so the outcome of each
            // insert is deliberately discarded.
            let _ = trie.insert(&term);
        });
        trie
    }
}
/// Builds a trie by inserting every borrowed term yielded by the iterator.
impl<'a, V: DictionaryValue + Default> FromIterator<&'a str> for PersistentARTrieChar<V> {
    #[allow(deprecated)]
    fn from_iter<I: IntoIterator<Item = &'a str>>(iter: I) -> Self {
        let trie = Self::new();
        iter.into_iter().for_each(|term| {
            // `FromIterator` has no way to report a failure, so the outcome of each
            // insert is deliberately discarded.
            let _ = trie.insert(term);
        });
        trie
    }
}
/// Dictionary node type used to traverse a [`PersistentARTrieChar`]: an
/// `OverlayDictionaryNode` keyed by `CharKey`, carrying values of type `V`.
pub type PersistentARTrieCharNode<V = ()> =
crate::persistent_artrie_core::overlay::OverlayDictionaryNode<CharKey, V>;
/// `Dictionary` implemented by forwarding to the inherent methods of the trie.
///
/// The inherent methods are named through the concrete type so each call reads as a
/// forward to the inherent item rather than to the trait method of the same name.
impl<V: DictionaryValue, S: crate::persistent_artrie::block_storage::BlockStorage> Dictionary
    for PersistentARTrieChar<V, S>
{
    type Node = PersistentARTrieCharNode<V>;

    fn root(&self) -> Self::Node {
        PersistentARTrieChar::<V, S>::root(self)
    }

    fn contains(&self, term: &str) -> bool {
        PersistentARTrieChar::<V, S>::contains(self, term)
    }

    #[inline]
    fn len(&self) -> Option<usize> {
        let count = PersistentARTrieChar::<V, S>::len(self);
        Some(count)
    }
}
impl<V: DictionaryValue + Clone, S: crate::persistent_artrie::block_storage::BlockStorage>
MappedDictionary for PersistentARTrieChar<V, S>
{
type Value = V;
fn get_value(&self, term: &str) -> Option<V> {
self.get_value(term)
}
}
/// `Dictionary` for the shared (`Arc`) handle.
impl<V: DictionaryValue> Dictionary for SharedCharARTrie<V> {
    type Node = PersistentARTrieCharNode<V>;

    /// Root node for traversal.
    ///
    /// Unlike the root produced by the plain trie, this node is given a
    /// `SharedOverlayFaulter` that holds its own clone of the `Arc`.
    fn root(&self) -> Self::Node {
        use crate::persistent_artrie_core::overlay::flip::LockFreeOverlay;
        use crate::persistent_artrie_core::overlay::{OverlayFaulter, OverlayNode};

        let guard = self.read();
        let current =
            <PersistentARTrieChar<V> as LockFreeOverlay<CharKey, V, _>>::overlay_root_node(&guard);
        let root = match current {
            Some(existing) => existing,
            // No overlay root yet: fall back to a fresh empty node.
            None => Arc::new(OverlayNode::<CharKey, V>::new()),
        };
        // Release the access guard before the faulter is constructed.
        drop(guard);

        let faulter: Arc<dyn OverlayFaulter<CharKey, V>> =
            Arc::new(overlay_fault::SharedOverlayFaulter::new(Arc::clone(self)));
        PersistentARTrieCharNode::from_overlay_root(root, Some(faulter))
    }

    fn contains(&self, term: &str) -> bool {
        self.read().contains(term)
    }

    #[inline]
    fn len(&self) -> Option<usize> {
        let count = self.read().len();
        Some(count)
    }
}
/// `MappedDictionary` for the shared (`Arc`) handle.
impl<V: DictionaryValue + Clone> MappedDictionary for SharedCharARTrie<V> {
    type Value = V;

    /// Value stored for `term`, looked up through a read guard.
    fn get_value(&self, term: &str) -> Option<V> {
        self.read().get_value(term)
    }
}
/// Mutating dictionary operations for the shared (`Arc`) handle.
///
/// Errors from the underlying trie are not surfaced by this trait: they are folded
/// into `false` / `0` as noted on each method.
impl<V: DictionaryValue + Clone> MutableMappedDictionary for SharedCharARTrie<V> {
    /// Inserts `term` with `value`; an error from the trie is reported as `false`.
    fn insert_with_value(&self, term: &str, value: V) -> bool {
        self.write().insert_with_value(term, value).unwrap_or(false)
    }

    /// Merges every entry of `other` into `self` using `merge_fn`.
    ///
    /// The entries of `other` are copied out first; the merge itself runs while
    /// `merge_lock` is held. Returns the result of `merge_entries`, or `0` on error.
    fn union_with<F>(&self, other: &Self, merge_fn: F) -> usize
    where
        F: Fn(&Self::Value, &Self::Value) -> Self::Value,
        Self::Value: Clone,
    {
        let entries: Vec<(String, V)> = {
            let source = other.read();
            let scanned = source.iter_prefix_with_values_and_arena("");
            // A failed scan and an empty result both contribute no entries.
            let collected = if let Ok(Some(terms)) = scanned {
                terms.into_iter().map(|i| (i.term, i.value)).collect()
            } else {
                Vec::new()
            };
            collected
        };

        let merge_lock = self.merge_lock.clone();
        let _merge_guard = merge_lock.lock();
        self.merge_entries(entries, merge_fn).unwrap_or(0)
    }

    /// Updates the value of `term` in place via `update_fn`, or inserts
    /// `default_value` when the term is absent.
    ///
    /// Returns `true` when the insert branch was taken and `false` when the update
    /// branch was taken; the outcome of the underlying write is not reflected.
    fn update_or_insert<F>(&self, term: &str, default_value: V, update_fn: F) -> bool
    where
        F: FnOnce(&mut V),
    {
        let trie = self.write();
        match trie.get_value(term) {
            Some(mut current) => {
                update_fn(&mut current);
                trie.upsert(term, current).unwrap_or(false);
                false
            }
            None => {
                trie.insert_with_value(term, default_value)
                    .unwrap_or(false);
                true
            }
        }
    }
}
/// Zipper over a [`PersistentARTrieChar`]: a dictionary node together with the
/// characters followed to reach it.
#[derive(Debug, Clone)]
pub struct PersistentARTrieCharZipper<V: DictionaryValue = ()> {
/// Node the zipper currently points at.
node: PersistentARTrieCharNode<V>,
/// Labels followed from the root to `node`, in order.
path_vec: Vec<char>,
}
impl<V: DictionaryValue> PersistentARTrieCharZipper<V> {
    /// Creates a zipper positioned at the root of `dict`, with an empty path.
    pub fn new(dict: &PersistentARTrieChar<V>) -> Self {
        let node = dict.root();
        let path_vec = Vec::new();
        Self { node, path_vec }
    }

    /// The path from the root to the current node, rendered as a `String`.
    pub fn path_string(&self) -> String {
        self.path_vec.iter().copied().collect::<String>()
    }
}
/// Navigation over the trie one `char` at a time.
impl<V: DictionaryValue> DictZipper for PersistentARTrieCharZipper<V> {
    type Unit = char;

    /// Whether the current node is final.
    fn is_final(&self) -> bool {
        self.node.is_final()
    }

    /// Follows the edge labelled `label`, returning the child zipper with the label
    /// appended to its path, or `None` when there is no such edge.
    fn descend(&self, label: char) -> Option<Self> {
        let child = self.node.transition(label)?;
        let mut path_vec = self.path_vec.clone();
        path_vec.push(label);
        Some(Self {
            node: child,
            path_vec,
        })
    }

    /// All outgoing edges as `(label, child zipper)` pairs.
    ///
    /// The children are materialised into a `Vec` before being returned, so the
    /// iterator does not borrow from the current node.
    fn children(&self) -> impl Iterator<Item = (char, Self)> {
        let mut out = Vec::new();
        for (label, child) in self.node.edges() {
            let mut child_path = self.path_vec.clone();
            child_path.push(label);
            out.push((
                label,
                Self {
                    node: child,
                    path_vec: child_path,
                },
            ));
        }
        out.into_iter()
    }

    /// A copy of the labels followed from the root to the current node.
    fn path(&self) -> Vec<char> {
        self.path_vec.to_vec()
    }
}
/// Value access for the zipper.
impl<V: DictionaryValue + Clone> ValuedDictZipper for PersistentARTrieCharZipper<V> {
    type Value = V;

    /// Value of the current node; always `None` when the node is not final.
    fn value(&self) -> Option<V> {
        if self.node.is_final() {
            self.node.value()
        } else {
            None
        }
    }
}
/// `ARTrie` facade over the shared (`Arc`) handle.
///
/// Constructors wrap the corresponding `PersistentARTrieChar` constructor in an
/// `Arc`. Most other methods forward to the trie through a read or write guard.
/// The `bool`/`usize`-returning mutators fold errors into `false` / `0`.
impl<V: DictionaryValue> crate::artrie_trait::ARTrie for SharedCharARTrie<V> {
    type Unit = char;
    type Value = V;

    fn create<P: AsRef<std::path::Path>>(path: P) -> crate::persistent_artrie::error::Result<Self> {
        Ok(Arc::new(PersistentARTrieChar::create(path)?))
    }

    fn create_with_slot_tracking<P: AsRef<std::path::Path>>(
        path: P,
    ) -> crate::persistent_artrie::error::Result<Self> {
        Ok(Arc::new(PersistentARTrieChar::create_with_slot_tracking(
            path,
        )?))
    }

    fn open<P: AsRef<std::path::Path>>(path: P) -> crate::persistent_artrie::error::Result<Self> {
        Ok(Arc::new(PersistentARTrieChar::open(path)?))
    }

    fn open_with_slot_tracking<P: AsRef<std::path::Path>>(
        path: P,
    ) -> crate::persistent_artrie::error::Result<Self> {
        Ok(Arc::new(PersistentARTrieChar::open_with_slot_tracking(
            path,
        )?))
    }

    fn open_with_recovery<P: AsRef<std::path::Path>>(
        path: P,
    ) -> crate::persistent_artrie::error::Result<(
        Self,
        crate::persistent_artrie::recovery::RecoveryReport,
    )> {
        let (trie, report) = PersistentARTrieChar::open_with_recovery(path)?;
        Ok((Arc::new(trie), report))
    }

    /// Opens with recovery, then turns on slot tracking on the arena manager (when
    /// the trie has one) before wrapping the trie in an `Arc`.
    fn open_with_recovery_and_slot_tracking<P: AsRef<std::path::Path>>(
        path: P,
    ) -> crate::persistent_artrie::error::Result<(
        Self,
        crate::persistent_artrie::recovery::RecoveryReport,
    )> {
        let (trie, report) = PersistentARTrieChar::open_with_recovery(path)?;
        if let Some(am) = trie.arena_manager.as_ref() {
            am.write().enable_slot_tracking();
        }
        Ok((Arc::new(trie), report))
    }

    /// Turns on slot tracking; a no-op when there is no arena manager.
    fn enable_slot_tracking(&self) {
        let trie = self.read();
        if let Some(am) = trie.arena_manager.as_ref() {
            am.write().enable_slot_tracking();
        }
    }

    /// Runs `flush_sequential` on the arena manager; `Ok(())` when there is none.
    fn flush_sequential(&self) -> crate::persistent_artrie::error::Result<()> {
        let trie = self.read();
        let Some(am) = trie.arena_manager.as_ref() else {
            return Ok(());
        };
        am.write().flush_sequential()?;
        Ok(())
    }

    fn insert(&self, term: &str) -> bool
    where
        Self::Value: Default,
    {
        self.write().insert(term).unwrap_or(false)
    }

    fn insert_with_value(&self, term: &str, value: Self::Value) -> bool {
        self.write().insert_with_value(term, value).unwrap_or(false)
    }

    fn contains(&self, term: &str) -> bool {
        self.read().contains(term)
    }

    fn get_value(&self, term: &str) -> Option<Self::Value>
    where
        V: Clone,
    {
        self.read().get_value(term)
    }

    fn remove(&self, term: &str) -> bool {
        self.write().remove(term).unwrap_or(false)
    }

    #[inline]
    fn len(&self) -> usize {
        self.read().len()
    }

    /// Captures an immutable snapshot and publishes it while holding
    /// `checkpoint_lock`.
    ///
    /// The eviction-aware publish path is used when an eviction coordinator is
    /// installed. The coordinator slot is only locked long enough to check that.
    fn checkpoint(&self) -> crate::persistent_artrie::error::Result<()> {
        let ckpt_lock = self.checkpoint_lock.clone();
        let _ckpt_guard = ckpt_lock.lock();

        let snapshot = self.capture_snapshot_immutable()?;
        let eviction_active = self
            .eviction_coordinator
            .lock()
            .expect("eviction_coordinator mutex poisoned")
            .is_some();

        if eviction_active {
            self.publish_immutable_snapshot_retaining_wal_with_eviction(snapshot)
        } else {
            self.publish_immutable_snapshot_retaining_wal(&snapshot)
        }
    }

    fn is_dirty(&self) -> bool {
        self.read().is_dirty()
    }

    fn remove_prefix(&self, prefix: &str) -> usize {
        self.write().remove_prefix(prefix).unwrap_or(0)
    }

    /// Terms under `prefix` as a boxed iterator; `None` when the scan fails or
    /// reports no result.
    fn iter_prefix(&self, prefix: &str) -> Option<Box<dyn Iterator<Item = String> + '_>> {
        let guard = self.read();
        let terms = guard.iter_prefix(prefix).ok().flatten()?;
        let boxed: Box<dyn Iterator<Item = String> + '_> = Box::new(terms.into_iter());
        Some(boxed)
    }

    fn sync(&self) -> crate::persistent_artrie::error::Result<()> {
        self.write().sync()
    }

    fn current_lsn(&self) -> u64 {
        self.read().current_lsn()
    }

    fn synced_lsn(&self) -> Option<u64> {
        self.read().synced_lsn()
    }

    fn durability_policy(&self) -> crate::persistent_artrie::dict_impl::DurabilityPolicy {
        self.read().durability_policy()
    }

    fn upsert(
        &self,
        term: &str,
        value: Self::Value,
    ) -> crate::persistent_artrie::error::Result<bool> {
        self.write().upsert(term, value)
    }
}
pub(crate) use crate::persistent_artrie_core::overlay::evict::OverlayEvictOutcome;
impl<V: DictionaryValue, S: crate::persistent_artrie::block_storage::BlockStorage>
crate::persistent_artrie_core::overlay::evict::OverlayEvictable<
crate::persistent_artrie_core::key_encoding::CharKey,
V,
S,
> for PersistentARTrieChar<V, S>
{
#[inline]
fn overlay_root_slot(
&self,
) -> Option<
&crate::persistent_artrie_core::overlay::AtomicNodePtr<
crate::persistent_artrie_core::key_encoding::CharKey,
V,
>,
> {
self.lockfree_root.as_ref()
}
#[inline]
fn overlay_epoch_manager(&self) -> &crate::persistent_artrie_core::concurrency::EpochManager {
&self.epoch_manager
}
#[inline]
fn overlay_eviction_coordinator(
&self,
) -> Option<Arc<crate::persistent_artrie::eviction::EvictionCoordinator>> {
self.eviction_coordinator
.lock()
.expect("eviction_coordinator mutex poisoned")
.as_ref()
.map(Arc::clone)
}
#[inline]
fn note_faultin_cas(&self) {
self.cas_retries
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
}
/// Tries to evict each candidate overlay node, longest path first.
///
/// `nodes` holds `(path hash, path, disk pointer)` triples. For every candidate the
/// eviction is attempted; when the attempt reports `RootCasLost` it is retried, up to
/// `max_rebase_retries` additional times. A successful eviction also removes the
/// path's hash from the coordinator's LRU registry when a coordinator is installed.
///
/// Returns `(evicted, bytes_freed)`, where `bytes_freed` is credited at a flat 256
/// bytes per evicted node.
pub(crate) fn evict_overlay_nodes<V, S>(
    trie: &PersistentARTrieChar<V, S>,
    mut nodes: Vec<(
        u64,
        Vec<char>,
        crate::persistent_artrie::swizzled_ptr::SwizzledPtr,
    )>,
    max_rebase_retries: usize,
) -> (usize, usize)
where
    V: DictionaryValue,
    S: crate::persistent_artrie::block_storage::BlockStorage,
{
    use crate::persistent_artrie_core::overlay::evict::OverlayEvictable;

    // Stable sort, descending by path length, so longer paths are handled first.
    nodes.sort_by_key(|candidate| std::cmp::Reverse(candidate.1.len()));

    let mut evicted_count = 0usize;
    let mut freed_total = 0usize;

    for (_path_hash, path, disk_ptr) in nodes {
        let code_points: Vec<u32> = path.iter().map(|&c| u32::from(c)).collect();
        let mut lost_races = 0;

        loop {
            let outcome = trie.evict_overlay_node_at_path(&code_points, disk_ptr.clone());
            match outcome {
                OverlayEvictOutcome::NotEvictable => break,
                OverlayEvictOutcome::RootCasLost => {
                    lost_races += 1;
                    if lost_races > max_rebase_retries {
                        break;
                    }
                }
                OverlayEvictOutcome::Evicted => {
                    evicted_count += 1;
                    freed_total += 256;
                    if let Some(coordinator) = trie.overlay_eviction_coordinator() {
                        use crate::persistent_artrie::eviction::lru_tracker::hash_char_path;
                        coordinator
                            .lru_registry()
                            .remove_hash(hash_char_path(&path));
                    }
                    break;
                }
            }
        }
    }

    (evicted_count, freed_total)
}
/// Eviction control for the shared (`Arc`) handle.
impl<V: DictionaryValue> crate::artrie_trait::EvictableARTrie for SharedCharARTrie<V> {
    /// Validates `config`, starts an eviction coordinator and installs it.
    ///
    /// The eviction callback only holds a `Weak` reference to the trie and evicts
    /// through `evict_overlay_nodes` with up to 4 rebase retries.
    ///
    /// # Errors
    ///
    /// Returns an internal error when the config is invalid, when eviction is already
    /// enabled (checked both before starting and again when installing, since the
    /// slot is not locked in between), or when starting the coordinator or its memory
    /// monitor fails. A coordinator that was started but cannot be installed is shut
    /// down before the error is returned.
    fn enable_eviction(
        &self,
        config: crate::persistent_artrie::eviction::EvictionConfig,
    ) -> crate::persistent_artrie::error::Result<()> {
        use crate::persistent_artrie::error::PersistentARTrieError;
        config
            .validate()
            .map_err(|e| PersistentARTrieError::internal(&e))?;
        if self
            .eviction_coordinator
            .lock()
            .expect("eviction_coordinator mutex poisoned")
            .is_some()
        {
            return Err(PersistentARTrieError::internal("Eviction already enabled"));
        }
        let epoch_manager = Arc::clone(&self.epoch_manager);
        let coordinator = crate::persistent_artrie::eviction::EvictionCoordinator::new(
            config.clone(),
            epoch_manager,
        );
        let self_weak = Arc::downgrade(self);
        coordinator
            .start_char(move |nodes_to_evict| {
                let Some(trie) = self_weak.upgrade() else {
                    return (0, 0);
                };
                evict_overlay_nodes(&trie, nodes_to_evict, 4)
            })
            .map_err(|e| PersistentARTrieError::internal(&e))?;
        if let Err(e) = coordinator.start_memory_monitor() {
            // The eviction side is already running at this point; stop it explicitly
            // rather than discarding a half-started coordinator, mirroring the
            // lost-race path below.
            coordinator.shutdown();
            return Err(PersistentARTrieError::internal(&e));
        }
        let mut slot = self
            .eviction_coordinator
            .lock()
            .expect("eviction_coordinator mutex poisoned");
        if slot.is_some() {
            // Another caller installed a coordinator while ours was starting.
            drop(slot);
            coordinator.shutdown();
            return Err(PersistentARTrieError::internal("Eviction already enabled"));
        }
        *slot = Some(coordinator);
        Ok(())
    }

    /// Removes the installed coordinator (if any) and shuts it down. Always `Ok`.
    fn disable_eviction(&self) -> crate::persistent_artrie::error::Result<()> {
        let coordinator = self
            .eviction_coordinator
            .lock()
            .expect("eviction_coordinator mutex poisoned")
            .take();
        if let Some(coordinator) = coordinator {
            coordinator.shutdown();
        }
        Ok(())
    }

    /// Whether a coordinator is currently installed.
    fn eviction_enabled(&self) -> bool {
        self.eviction_coordinator
            .lock()
            .expect("eviction_coordinator mutex poisoned")
            .is_some()
    }

    /// The coordinator's statistics, or default statistics when eviction is disabled.
    fn eviction_stats(&self) -> crate::persistent_artrie::eviction::EvictionStats {
        self.eviction_coordinator
            .lock()
            .expect("eviction_coordinator mutex poisoned")
            .as_ref()
            .map(|c| c.stats())
            .unwrap_or_default()
    }

    /// Runs one eviction pass aimed at `target_bytes`.
    ///
    /// Returns the pair produced by the coordinator, or `(0, 0)` when eviction is
    /// disabled. The coordinator handle is cloned out of the slot first, so the slot
    /// mutex is not held while eviction runs.
    fn force_eviction(
        &self,
        target_bytes: usize,
    ) -> crate::persistent_artrie::error::Result<(usize, usize)> {
        let coordinator = {
            match self
                .eviction_coordinator
                .lock()
                .expect("eviction_coordinator mutex poisoned")
                .as_ref()
            {
                Some(c) => Arc::clone(c),
                None => return Ok((0, 0)),
            }
        };
        let trie = Arc::clone(self);
        Ok(coordinator.force_eviction_char(target_bytes, move |nodes| {
            evict_overlay_nodes(&trie, nodes, 4)
        }))
    }

    /// Records an access to the node at `path` in the LRU registry; a no-op when
    /// eviction is disabled.
    fn touch_node(&self, path: &[Self::Unit]) {
        if let Some(coordinator) = self
            .eviction_coordinator
            .lock()
            .expect("eviction_coordinator mutex poisoned")
            .as_ref()
        {
            use crate::persistent_artrie::eviction::lru_tracker::hash_char_path;
            coordinator.lru_registry().touch_hash(hash_char_path(path));
        }
    }
}
impl<V: DictionaryValue, S: crate::persistent_artrie::block_storage::BlockStorage>
    PersistentARTrieChar<V, S>
{
    /// Bumps `structural_generation` and, when a coordinator is installed, asks it
    /// to invalidate its registry.
    pub(crate) fn invalidate_eviction_registry(&self) {
        self.structural_generation
            .fetch_add(1, std::sync::atomic::Ordering::Release);
        if let Some(coordinator) = self
            .eviction_coordinator
            .lock()
            .expect("eviction_coordinator mutex poisoned")
            .as_ref()
        {
            coordinator.invalidate_registry();
        }
    }

    /// The coordinator's `disk_registry_char_len()`, or `None` when eviction is
    /// disabled.
    pub fn evictable_node_count(&self) -> Option<usize> {
        self.eviction_coordinator
            .lock()
            .expect("eviction_coordinator mutex poisoned")
            .as_ref()
            .map(|c| c.disk_registry_char_len())
    }

    /// Test/bench helper: installs a coordinator whose eviction callback does
    /// nothing (it always reports `(0, 0)`).
    ///
    /// # Errors
    ///
    /// Returns an internal error when the config is invalid, when eviction is already
    /// enabled, or when starting the coordinator or its memory monitor fails. If the
    /// memory monitor fails to start, the already-started coordinator is shut down
    /// before the error is returned.
    #[cfg(any(test, feature = "bench-internals"))]
    pub fn bench_enable_eviction(
        &mut self,
        config: crate::persistent_artrie::eviction::EvictionConfig,
    ) -> crate::persistent_artrie::error::Result<()> {
        use crate::persistent_artrie::error::PersistentARTrieError;
        config
            .validate()
            .map_err(|e| PersistentARTrieError::internal(&e))?;
        if self
            .eviction_coordinator
            .lock()
            .expect("eviction_coordinator mutex poisoned")
            .is_some()
        {
            return Err(PersistentARTrieError::internal("Eviction already enabled"));
        }
        let epoch_manager = Arc::clone(&self.epoch_manager);
        let coordinator = crate::persistent_artrie::eviction::EvictionCoordinator::new(
            config.clone(),
            epoch_manager,
        );
        coordinator
            .start_char(|_nodes_to_evict| (0usize, 0usize))
            .map_err(|e| PersistentARTrieError::internal(&e))?;
        if let Err(e) = coordinator.start_memory_monitor() {
            // Do not discard a half-started coordinator without stopping it.
            coordinator.shutdown();
            return Err(PersistentARTrieError::internal(&e));
        }
        *self
            .eviction_coordinator
            .lock()
            .expect("eviction_coordinator mutex poisoned") = Some(coordinator);
        Ok(())
    }

    /// Bench helper: forces one eviction pass restricted to candidates whose path
    /// satisfies `cold_filter`.
    ///
    /// Returns the first element of the pair produced by the coordinator, or `0`
    /// when eviction is disabled.
    #[cfg(feature = "bench-internals")]
    pub fn bench_evict_overlay_cold_nodes<F>(&self, budget_bytes: usize, cold_filter: F) -> usize
    where
        F: Fn(&[char]) -> bool,
    {
        let coordinator = match self
            .eviction_coordinator
            .lock()
            .expect("eviction_coordinator mutex poisoned")
            .as_ref()
        {
            Some(c) => Arc::clone(c),
            None => return 0,
        };
        coordinator
            .force_eviction_char(budget_bytes, |cands| {
                let filtered: Vec<_> = cands
                    .into_iter()
                    .filter(|(_, p, _)| cold_filter(p))
                    .collect();
                evict_overlay_nodes(self, filtered, 4)
            })
            .0
    }
}
// Unit tests for the char trie: node traversal after reopen and after forced
// eviction, plus basic in-memory insert / lookup / iteration behaviour.
#[cfg(test)]
mod tests {
use super::*;
// After checkpointing, dropping and reopening the file, the root must still expose
// edges and a full descent along "receive" must reach a final node with its value.
#[test]
fn dictionary_node_traversal_descends_after_reopen() {
use crate::{DictionaryNode, MappedDictionaryNode};
let dir = tempfile::tempdir().expect("tempdir");
let path = dir.path().join("reopen_traversal.artc");
{
let trie = PersistentARTrieChar::<i32>::create(&path).expect("create");
trie.insert_with_value("receive", 1).expect("insert");
trie.insert_with_value("recipe", 2).expect("insert");
trie.insert_with_value("decide", 3).expect("insert");
trie.checkpoint().expect("checkpoint");
}
let trie = PersistentARTrieChar::<i32>::open(&path).expect("open");
assert_eq!(trie.len(), 3);
assert!(
trie.root().edges().count() > 0,
"root edges empty after reopen — swizzle-fault regression"
);
let mut node = trie.root();
for ch in "receive".chars() {
node = node
.transition(ch)
.unwrap_or_else(|| panic!("transition '{ch}' lost after reopen"));
}
assert!(node.is_final(), "terminal node not final after reopen");
assert_eq!(node.value(), Some(1), "value lost after reopen");
assert!(
trie.root().transition('x').is_none(),
"spurious transition for absent edge"
);
}
// Through the shared handle: enable eviction, checkpoint, force eviction of
// everything, disable eviction, then check that traversal along "reception" still
// works. The results of enable/force/disable are intentionally ignored.
#[test]
fn dictionary_node_traversal_descends_after_forced_eviction() {
use crate::{Dictionary, DictionaryNode, EvictableARTrie};
use std::sync::Arc;
let dir = tempfile::tempdir().expect("tempdir");
let path = dir.path().join("evict_traversal.artc");
{
let trie = PersistentARTrieChar::<i32>::create(&path).expect("create");
for (t, v) in [
("receive", 1),
("recipe", 2),
("recital", 3),
("reception", 4),
("decide", 5),
] {
trie.insert_with_value(t, v).expect("insert");
}
trie.checkpoint().expect("checkpoint");
}
let shared: SharedCharARTrie<i32> =
Arc::new(PersistentARTrieChar::<i32>::open(&path).expect("open"));
let _ = shared.enable_eviction(EvictionConfig::default());
shared.write().checkpoint().expect("checkpoint");
let _ = shared.force_eviction(usize::MAX);
let _ = shared.disable_eviction();
assert!(
shared.root().edges().count() > 0,
"root edges empty after eviction — re-fault regression"
);
let mut node = shared.root();
for ch in "reception".chars() {
node = node
.transition(ch)
.unwrap_or_else(|| panic!("transition '{ch}' lost after eviction"));
}
assert!(node.is_final(), "terminal node not final after eviction");
}
// A freshly constructed trie is empty.
#[test]
#[allow(deprecated)]
fn test_new_empty() {
let trie: PersistentARTrieChar<()> = PersistentARTrieChar::new();
assert!(trie.is_empty());
assert_eq!(trie.len(), 0);
}
// Inserting a new term returns true; re-inserting an existing term returns false
// and does not change the length.
#[test]
#[allow(deprecated)]
fn test_insert_ascii() {
let trie: PersistentARTrieChar<()> = PersistentARTrieChar::new();
assert!(trie.insert("hello").expect("insert failed"));
assert!(trie.insert("world").expect("insert failed"));
assert!(!trie.insert("hello").expect("insert failed")); assert_eq!(trie.len(), 2);
}
// Non-ASCII terms (accented Latin, CJK, emoji) are accepted and counted.
#[test]
#[allow(deprecated)]
fn test_insert_unicode() {
let trie: PersistentARTrieChar<()> = PersistentARTrieChar::new();
assert!(trie.insert("héllo").expect("insert failed")); assert!(trie.insert("日本語").expect("insert failed")); assert!(trie.insert("emoji😀").expect("insert failed")); assert_eq!(trie.len(), 3);
}
// `contains` matches exact terms only: a term with a missing character or a
// trailing space is not found.
#[test]
#[allow(deprecated)]
fn test_contains() {
let trie: PersistentARTrieChar<()> = PersistentARTrieChar::new();
let _ = trie.insert("hello");
let _ = trie.insert("héllo");
assert!(trie.contains("hello"));
assert!(trie.contains("héllo"));
assert!(!trie.contains("helo"));
assert!(!trie.contains("hello ")); }
// Values stored with terms can be read back; absent terms yield `None`.
#[test]
#[allow(deprecated)]
fn test_value_storage() {
let trie: PersistentARTrieChar<i32> = PersistentARTrieChar::new();
let _ = trie.insert_with_value("one", 1);
let _ = trie.insert_with_value("two", 2);
let _ = trie.insert_with_value("three", 3);
assert_eq!(trie.get("one"), Some(1));
assert_eq!(trie.get("two"), Some(2));
assert_eq!(trie.get("three"), Some(3));
assert_eq!(trie.get("four"), None);
}
// A single non-ASCII character is stored as one edge labelled with that `char`.
#[test]
#[allow(deprecated)]
fn test_unicode_correctness() {
let trie: PersistentARTrieChar<()> = PersistentARTrieChar::new();
let _ = trie.insert("¡");
let root = trie.root();
let edges: Vec<_> = root.edges().collect();
assert_eq!(edges.len(), 1);
assert_eq!(edges[0].0, '¡');
}
// Collecting an iterator of `&str` builds a trie containing every term.
#[test]
#[allow(deprecated)]
fn test_from_iter() {
let terms = vec!["alpha", "beta", "gamma"];
let trie: PersistentARTrieChar<()> = terms.into_iter().collect();
assert_eq!(trie.len(), 3);
assert!(trie.contains("alpha"));
assert!(trie.contains("beta"));
assert!(trie.contains("gamma"));
}
// After descending "h", "e", "l" the zipper sees two children (the branches of
// "hello" and "help").
#[test]
#[allow(deprecated)]
fn test_zipper() {
let trie: PersistentARTrieChar<()> = PersistentARTrieChar::new();
let _ = trie.insert("hello");
let _ = trie.insert("help");
let zipper = PersistentARTrieCharZipper::new(&trie);
let zipper = zipper.descend('h').expect("should have 'h'");
let zipper = zipper.descend('e').expect("should have 'e'");
let zipper = zipper.descend('l').expect("should have 'l'");
let edges: Vec<_> = zipper.children().map(|(c, _)| c).collect();
assert_eq!(edges.len(), 2); }
// `iter` yields every inserted term; order is not asserted.
#[test]
#[allow(deprecated)]
fn test_iter() {
let trie: PersistentARTrieChar<()> = PersistentARTrieChar::new();
let _ = trie.insert("apple");
let _ = trie.insert("banana");
let _ = trie.insert("cherry");
let terms: Vec<_> = trie.iter().collect();
assert_eq!(terms.len(), 3);
assert!(terms.contains(&"apple".to_string()));
assert!(terms.contains(&"banana".to_string()));
assert!(terms.contains(&"cherry".to_string()));
}
}