pub mod types;
pub mod header;
pub mod sync_handle;
#[cfg(feature = "io-uring-backend")]
pub mod io_uring_ctor;
pub mod mmap_ctor;
pub mod lockfree_cas;
pub(crate) mod overlay_write_mode;
pub(crate) mod overlay_serialize;
pub mod persistence_api;
pub mod query_api;
pub mod mutation_api;
pub mod path_query;
pub mod dict_impl;
pub use types::{
NodeRef, VocabTrieFileHeader,
DEFAULT_VOCAB_BUFFER_POOL_SIZE,
VOCAB_FILE_HEADER_SIZE,
VOCAB_HEADER_VERSION_V2,
VOCAB_TRIE_MAGIC,
};
pub use dict_impl::{PersistentVocabARTrie, SharedVocabARTrie, VocabSyncHandle};
pub use crate::persistent_artrie::dict_impl::DurabilityPolicy;
pub use crate::persistent_artrie::eviction::{
AccessTracker, DiskLocationRegistry, EvictionConfig, EvictionCoordinator, EvictionStats,
EvictionUrgency, LruRegistry,
};
use crate::bijective::BijectiveDictionary;
use crate::persistent_artrie::error::Result;
use crate::persistent_artrie::recovery::RecoveryReport;
use crate::{Dictionary, DictionaryNode, MappedDictionary, MutableMappedDictionary};
use std::path::Path;
/// Exposes the persistent vocabulary trie through the generic [`Dictionary`] trait.
impl Dictionary for PersistentVocabARTrie {
    type Node = VocabTrieNodeRef;

    /// Returns a non-final root node with an empty path whose child list is
    /// whatever `get_root_children` reports for this trie.
    fn root(&self) -> Self::Node {
        VocabTrieNodeRef::new(false, self.get_root_children(), Vec::new())
    }

    /// Membership test; called through a type-qualified path because the
    /// trait method and the method it forwards to share the same name.
    fn contains(&self, term: &str) -> bool {
        Self::contains(self, term)
    }

    /// Number of terms, always reported as `Some`.
    fn len(&self) -> Option<usize> {
        let term_count = Self::len(self);
        Some(term_count)
    }
}
/// Detached snapshot of a single trie node used for dictionary traversal.
///
/// The node owns copies of its data; it holds no reference back to the trie
/// it was created from.
#[derive(Clone)]
pub struct VocabTrieNodeRef {
    /// Whether a term ends at this node.
    is_final: bool,
    /// Outgoing edges as `(label, child_is_final)` pairs.
    children: Vec<(char, bool)>,
    /// Labels followed from the root to reach this node.
    path: Vec<char>,
}

impl VocabTrieNodeRef {
    /// Assembles a node snapshot from its finality flag, outgoing edges and
    /// the path leading to it.
    fn new(is_final: bool, children: Vec<(char, bool)>, path: Vec<char>) -> Self {
        VocabTrieNodeRef { is_final, children, path }
    }
}
/// Traversal over the snapshot held by a [`VocabTrieNodeRef`].
///
/// Nodes produced by `transition` and `edges` are created with an empty
/// child list, so they expose no further outgoing edges themselves.
impl DictionaryNode for VocabTrieNodeRef {
    type Unit = char;

    /// Whether a term ends at this node.
    fn is_final(&self) -> bool {
        self.is_final
    }

    /// Follows the first edge labelled `label`, if any.
    ///
    /// The returned node carries this node's path extended by `label` and an
    /// empty child list.
    fn transition(&self, label: char) -> Option<Self> {
        let &(_, child_is_final) = self
            .children
            .iter()
            .find(|&&(child_label, _)| child_label == label)?;

        let mut child_path = Vec::with_capacity(self.path.len() + 1);
        child_path.extend_from_slice(&self.path);
        child_path.push(label);

        Some(VocabTrieNodeRef::new(child_is_final, Vec::new(), child_path))
    }

    /// Yields `(label, child)` for every outgoing edge, in stored order.
    ///
    /// The children are materialised eagerly into a vector before iteration.
    fn edges(&self) -> Box<dyn Iterator<Item = (char, Self)> + '_> {
        let mut successors = Vec::with_capacity(self.children.len());
        for &(label, is_final) in &self.children {
            let mut child_path = self.path.clone();
            child_path.push(label);
            successors.push((
                label,
                VocabTrieNodeRef::new(is_final, Vec::new(), child_path),
            ));
        }
        Box::new(successors.into_iter())
    }
}
/// Maps each term to its `u64` vocabulary index.
impl MappedDictionary for PersistentVocabARTrie {
    type Value = u64;

    /// Looks up the index stored for `term`, if the term is present.
    fn get_value(&self, term: &str) -> Option<Self::Value> {
        let index = self.get_index(term);
        index
    }
}
/// Mutation API for the vocabulary trie.
///
/// An index already assigned to a term is never changed here: attempts to
/// remap an existing term are logged as warnings and otherwise ignored.
impl MutableMappedDictionary for PersistentVocabARTrie {
    /// Inserts `term` at index `value`.
    ///
    /// Returns the boolean reported by `insert_with_index`; an error is
    /// logged and reported as `false`.
    fn insert_with_value(&self, term: &str, value: Self::Value) -> bool {
        self.insert_with_index(term, value).unwrap_or_else(|error| {
            log::warn!(
                "PersistentVocabARTrie::insert_with_value({term:?}, {value}) failed: {error}"
            );
            false
        })
    }

    /// Copies every term of `other` that is missing from `self`, keeping the
    /// index it has in `other`. Returns the number of terms inserted.
    ///
    /// For terms present in both, `merge_fn` is evaluated only to detect a
    /// requested remap, which is logged and not applied.
    fn union_with<F>(&self, other: &Self, merge_fn: F) -> usize
    where
        F: Fn(&Self::Value, &Self::Value) -> Self::Value,
        Self::Value: Clone,
    {
        // Snapshot the other side completely before touching `self`.
        let mut other_terms: Vec<(String, u64)> = Vec::new();
        for term in other.iter_terms() {
            if let Some(index) = other.get_index(&term) {
                other_terms.push((term, index));
            }
        }

        let mut inserted = 0;
        for (term, other_index) in other_terms {
            match self.get_index(&term) {
                Some(existing_index) => {
                    let merged_index = merge_fn(&existing_index, &other_index);
                    if merged_index != existing_index {
                        log::warn!(
                            "PersistentVocabARTrie::union_with cannot remap existing term \
                             {term:?} from index {existing_index} to {merged_index}; \
                             vocabulary indices are immutable"
                        );
                    }
                }
                None => match self.insert_with_index(&term, other_index) {
                    Ok(true) => inserted += 1,
                    Ok(false) => {}
                    Err(error) => {
                        log::warn!(
                            "PersistentVocabARTrie::union_with failed for {term:?} at \
                             index {other_index}: {error}"
                        );
                    }
                },
            }
        }
        inserted
    }

    /// Inserts `term` at `default_value` when it is absent.
    ///
    /// When the term already exists, `update_fn` is run on a copy of its
    /// index solely to detect a requested change (which is logged, not
    /// applied) and the method returns `false`.
    fn update_or_insert<F>(&self, term: &str, default_value: Self::Value, update_fn: F) -> bool
    where
        F: Fn(&mut Self::Value),
    {
        let Some(existing_index) = self.get_index(term) else {
            return self
                .insert_with_index(term, default_value)
                .unwrap_or_else(|error| {
                    log::warn!(
                        "PersistentVocabARTrie::update_or_insert({term:?}, {default_value}, _) \
                         failed: {error}"
                    );
                    false
                });
        };

        let mut proposed_index = existing_index;
        update_fn(&mut proposed_index);
        if proposed_index != existing_index {
            log::warn!(
                "PersistentVocabARTrie::update_or_insert({term:?}) cannot remap \
                 existing index {existing_index} to {proposed_index}; vocabulary \
                 indices are immutable"
            );
        }
        false
    }
}
impl BijectiveDictionary for PersistentVocabARTrie {
fn get_term(&self, value: &Self::Value) -> Option<std::borrow::Cow<'_, str>> {
Self::get_term(self, *value).map(std::borrow::Cow::Owned)
}
fn contains_value(&self, value: &Self::Value) -> bool {
self.contains_index(*value)
}
fn bijection_len(&self) -> usize {
self.len()
}
}
use parking_lot::RwLock;
use std::sync::Arc;
/// [`Dictionary`] view of the shared trie; every call takes the read lock
/// for the duration of the lookup.
impl Dictionary for SharedVocabARTrie {
    type Node = VocabTrieNodeRef;

    /// Returns a non-final root node with an empty path and the trie's
    /// current root children.
    fn root(&self) -> Self::Node {
        let root_children = self.read().get_root_children();
        VocabTrieNodeRef::new(false, root_children, Vec::new())
    }

    /// Membership test under the read lock.
    fn contains(&self, term: &str) -> bool {
        self.read().contains(term)
    }

    /// Number of terms under the read lock, always reported as `Some`.
    fn len(&self) -> Option<usize> {
        let term_count = self.read().len();
        Some(term_count)
    }
}
/// Maps each term to its `u64` vocabulary index, reading under the read lock.
impl MappedDictionary for SharedVocabARTrie {
    type Value = u64;

    /// Looks up the index stored for `term`, if the term is present.
    fn get_value(&self, term: &str) -> Option<Self::Value> {
        self.read().get_index(term)
    }
}
/// Mutation API for the shared trie.
///
/// Mutations run under the write lock. User callbacks (`merge_fn`,
/// `update_fn`) are invoked only after the lock has been released. An index
/// already assigned to a term is never changed; remap attempts are logged.
impl MutableMappedDictionary for SharedVocabARTrie {
    /// Inserts `term` at index `value` under the write lock.
    ///
    /// Returns the boolean reported by `insert_with_index`; an error is
    /// logged and reported as `false`.
    fn insert_with_value(&self, term: &str, value: Self::Value) -> bool {
        let guard = self.write();
        guard.insert_with_index(term, value).unwrap_or_else(|error| {
            log::warn!(
                "SharedVocabARTrie::insert_with_value({term:?}, {value}) failed: {error}"
            );
            false
        })
    }

    /// Copies every term of `other` that is missing from `self`, keeping the
    /// index it has in `other`. Returns the number of terms inserted.
    ///
    /// `other` is snapshotted under its read lock, which is released before
    /// `self`'s write lock is taken. Terms present on both sides are
    /// collected and checked with `merge_fn` after the write lock is gone;
    /// a differing merge result is logged and not applied.
    fn union_with<F>(&self, other: &Self, merge_fn: F) -> usize
    where
        F: Fn(&Self::Value, &Self::Value) -> Self::Value,
        Self::Value: Clone,
    {
        let other_terms: Vec<(String, u64)> = {
            let other_guard = other.read();
            let mut snapshot = Vec::new();
            for term in other_guard.iter_terms() {
                if let Some(index) = other_guard.get_index(&term) {
                    snapshot.push((term, index));
                }
            }
            snapshot
        };

        let mut conflicts = Vec::new();
        let mut inserted = 0;
        {
            let self_guard = self.write();
            for (term, other_index) in other_terms {
                match self_guard.get_index(&term) {
                    Some(existing_index) => {
                        conflicts.push((term, existing_index, other_index));
                    }
                    None => match self_guard.insert_with_index(&term, other_index) {
                        Ok(true) => inserted += 1,
                        Ok(false) => {}
                        Err(error) => {
                            log::warn!(
                                "SharedVocabARTrie::union_with failed for {term:?} at \
                                 index {other_index}: {error}"
                            );
                        }
                    },
                }
            }
        }

        // The write lock is released here; only now is the user callback run.
        for (term, existing_index, other_index) in conflicts {
            let merged_index = merge_fn(&existing_index, &other_index);
            if merged_index != existing_index {
                log::warn!(
                    "SharedVocabARTrie::union_with cannot remap existing term \
                     {term:?} from index {existing_index} to {merged_index}; \
                     vocabulary indices are immutable"
                );
            }
        }
        inserted
    }

    /// Inserts `term` at `default_value` when it is absent.
    ///
    /// When the term already exists, the write lock is dropped, `update_fn`
    /// is run on a copy of the index solely to detect a requested change
    /// (logged, not applied), and the method returns `false`.
    fn update_or_insert<F>(&self, term: &str, default_value: Self::Value, update_fn: F) -> bool
    where
        F: Fn(&mut Self::Value),
    {
        let guard = self.write();
        let Some(existing_index) = guard.get_index(term) else {
            return guard
                .insert_with_index(term, default_value)
                .unwrap_or_else(|error| {
                    log::warn!(
                        "SharedVocabARTrie::update_or_insert({term:?}, {default_value}, _) \
                         failed: {error}"
                    );
                    false
                });
        };
        drop(guard);

        let mut proposed_index = existing_index;
        update_fn(&mut proposed_index);
        if proposed_index != existing_index {
            log::warn!(
                "SharedVocabARTrie::update_or_insert({term:?}) cannot remap \
                 existing index {existing_index} to {proposed_index}; vocabulary \
                 indices are immutable"
            );
        }
        false
    }
}
/// Reverse (index → term) lookups for the shared trie, each under the read lock.
impl BijectiveDictionary for SharedVocabARTrie {
    /// Returns the term stored at index `*value` as an owned `Cow`.
    fn get_term(&self, value: &Self::Value) -> Option<std::borrow::Cow<'_, str>> {
        use std::borrow::Cow;

        let index = *value;
        self.read().get_term(index).map(Cow::Owned)
    }

    /// Whether some term is stored at index `*value`.
    fn contains_value(&self, value: &Self::Value) -> bool {
        let index = *value;
        self.read().contains_index(index)
    }

    /// Size of the bijection, taken from the trie's `len`.
    fn bijection_len(&self) -> usize {
        self.read().len()
    }
}
impl crate::artrie_trait::ARTrie for SharedVocabARTrie {
type Unit = char;
type Value = u64;
fn create<P: AsRef<Path>>(path: P) -> Result<Self> {
PersistentVocabARTrie::create(path).map(|t| Arc::new(RwLock::new(t)))
}
fn create_with_slot_tracking<P: AsRef<Path>>(path: P) -> Result<Self> {
PersistentVocabARTrie::create(path).map(|t| Arc::new(RwLock::new(t)))
}
fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
PersistentVocabARTrie::open(path).map(|t| Arc::new(RwLock::new(t)))
}
fn open_with_slot_tracking<P: AsRef<Path>>(path: P) -> Result<Self> {
PersistentVocabARTrie::open(path).map(|t| Arc::new(RwLock::new(t)))
}
fn open_with_recovery<P: AsRef<Path>>(path: P) -> Result<(Self, RecoveryReport)> {
PersistentVocabARTrie::open_with_recovery(path).map(|(t, r)| (Arc::new(RwLock::new(t)), r))
}
fn open_with_recovery_and_slot_tracking<P: AsRef<Path>>(
path: P,
) -> Result<(Self, RecoveryReport)> {
let (trie, report) = PersistentVocabARTrie::open_with_recovery(path)?;
trie.enable_slot_tracking();
Ok((Arc::new(RwLock::new(trie)), report))
}
fn enable_slot_tracking(&self) {
self.write().enable_slot_tracking();
}
fn flush_sequential(&self) -> Result<()> {
self.write().flush_sequential()
}
fn insert(&self, term: &str) -> bool
where
Self::Value: Default,
{
let mut guard = self.write();
let old_count = guard.len();
if let Err(error) = PersistentVocabARTrie::insert(&mut *guard, term) {
log::warn!("SharedVocabARTrie::insert failed: {error}");
return false;
}
guard.len() > old_count
}
fn insert_with_value(&self, term: &str, value: Self::Value) -> bool {
let guard = self.write();
match guard.insert_with_index(term, value) {
Ok(inserted) => inserted,
Err(error) => {
log::warn!(
"SharedVocabARTrie::insert_with_value({term:?}, {value}) failed: {error}"
);
false
}
}
}
fn contains(&self, term: &str) -> bool {
let guard = self.read();
guard.contains(term)
}
fn get_value(&self, term: &str) -> Option<Self::Value> {
let guard = self.read();
guard.get_index(term)
}
fn remove(&self, term: &str) -> bool {
log::warn!(
"SharedVocabARTrie::remove({term:?}) is unsupported — vocab tries \
are append-only to preserve the term ↔ index bijection. Returns \
false unconditionally."
);
false
}
fn len(&self) -> usize {
let guard = self.read();
guard.len()
}
fn checkpoint(&self) -> Result<()> {
let guard = self.write();
guard.checkpoint()
}
fn is_dirty(&self) -> bool {
let guard = self.read();
guard.is_dirty()
}
fn remove_prefix(&self, prefix: &str) -> usize {
log::warn!(
"SharedVocabARTrie::remove_prefix({prefix:?}) is unsupported — \
vocab tries are append-only. Returns 0 unconditionally."
);
0
}
fn iter_prefix(&self, prefix: &str) -> Option<Box<dyn Iterator<Item = String> + '_>> {
let guard = self.read();
let prefix_chars: Vec<char> = prefix.chars().collect();
let prefix_exists = if prefix.is_empty() {
true
} else {
guard.get_index(prefix).is_some()
|| !guard.get_children_at_path(&prefix_chars).is_empty()
};
if prefix_exists {
let terms: Vec<String> = guard.iter_terms_with_prefix(prefix).collect();
Some(Box::new(terms.into_iter()))
} else {
None
}
}
fn sync(&self) -> Result<()> {
let guard = self.write();
guard.sync()
}
fn current_lsn(&self) -> u64 {
let guard = self.read();
guard.current_lsn()
}
fn synced_lsn(&self) -> Option<u64> {
let guard = self.read();
guard.synced_lsn()
}
fn durability_policy(&self) -> DurabilityPolicy {
let guard = self.read();
guard.durability_policy()
}
fn upsert(&self, term: &str, value: Self::Value) -> Result<bool> {
let guard = self.write();
guard.insert_with_index(term, value)
}
}
/// Eviction control for the shared vocabulary trie.
///
/// The coordinator is stored in the wrapped trie's `eviction_coordinator`
/// field; its presence is what "eviction enabled" means here.
impl crate::artrie_trait::EvictableARTrie for SharedVocabARTrie {
    /// Validates `config`, then creates, starts and installs an eviction
    /// coordinator.
    ///
    /// # Errors
    ///
    /// Returns an internal error when `config` fails validation, when
    /// eviction is already enabled, or when the coordinator's worker or
    /// memory monitor fails to start.
    fn enable_eviction(
        &self,
        config: crate::persistent_artrie::eviction::EvictionConfig,
    ) -> crate::persistent_artrie::error::Result<()> {
        use crate::persistent_artrie::error::PersistentARTrieError;

        config
            .validate()
            .map_err(|e| PersistentARTrieError::internal(&e))?;

        // The write lock is held from the "already enabled" check until the
        // coordinator is stored.
        let mut guard = self.write();
        if guard.eviction_coordinator.is_some() {
            return Err(PersistentARTrieError::internal("Eviction already enabled"));
        }

        let epoch_manager = Arc::new(crate::persistent_artrie::concurrency::EpochManager::new());
        // `config` is not used again, so it is moved rather than cloned.
        let coordinator = crate::persistent_artrie::eviction::EvictionCoordinator::new(
            config,
            epoch_manager,
        );

        // The eviction callback ignores its input and always reports `(0, 0)`.
        coordinator
            .start_char(move |_nodes_to_evict| (0, 0))
            .map_err(|e| PersistentARTrieError::internal(&e))?;
        // NOTE(review): if the monitor fails to start, the coordinator is
        // dropped without an explicit `shutdown()` — confirm that dropping it
        // stops the worker started above.
        coordinator
            .start_memory_monitor()
            .map_err(|e| PersistentARTrieError::internal(&e))?;

        guard.eviction_coordinator = Some(coordinator);
        Ok(())
    }

    /// Removes the coordinator, if any, and shuts it down.
    ///
    /// The coordinator is taken out under the write lock and shut down only
    /// after the lock has been released. Always returns `Ok(())`.
    fn disable_eviction(&self) -> crate::persistent_artrie::error::Result<()> {
        let coordinator = {
            let mut guard = self.write();
            guard.eviction_coordinator.take()
        };
        if let Some(coordinator) = coordinator {
            coordinator.shutdown();
        }
        Ok(())
    }

    /// Whether a coordinator is currently installed.
    fn eviction_enabled(&self) -> bool {
        let guard = self.read();
        guard.eviction_coordinator.is_some()
    }

    /// The coordinator's statistics, or default statistics when eviction is
    /// not enabled.
    fn eviction_stats(&self) -> crate::persistent_artrie::eviction::EvictionStats {
        let guard = self.read();
        guard
            .eviction_coordinator
            .as_ref()
            .map(|c| c.stats())
            .unwrap_or_default()
    }

    /// Asks the coordinator to evict towards `target_bytes` and returns its
    /// result; returns `Ok((0, 0))` when eviction is not enabled.
    fn force_eviction(
        &self,
        target_bytes: usize,
    ) -> crate::persistent_artrie::error::Result<(usize, usize)> {
        let guard = self.read();
        let Some(coordinator) = &guard.eviction_coordinator else {
            return Ok((0, 0));
        };
        Ok(coordinator.force_eviction(target_bytes))
    }

    /// Records an access to the node at `path` in the coordinator's LRU
    /// registry, keyed by the hash of the path. No-op when eviction is not
    /// enabled.
    fn touch_node(&self, path: &[Self::Unit]) {
        let guard = self.read();
        if let Some(coordinator) = &guard.eviction_coordinator {
            use crate::persistent_artrie::eviction::lru_tracker::hash_char_path;
            coordinator.lru_registry().touch_hash(hash_char_path(path));
        }
    }
}
/// Alias for [`PersistentVocabARTrie`].
pub type IndexedVocabularyPersistent = PersistentVocabARTrie;
/// Deprecated alias for [`SharedVocabARTrie`].
#[deprecated(since = "0.9.0", note = "Use SharedVocabARTrie instead")]
pub type SharedVocabTrie = SharedVocabARTrie;
/// Deprecated alias for [`PersistentVocabARTrie`].
#[deprecated(since = "0.9.0", note = "Use PersistentVocabARTrie directly instead")]
pub type DiskBackedVocabTrieInner = PersistentVocabARTrie;
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Sequential inserts get indices 0, 1, 2 and both lookup directions agree.
    #[test]
    fn test_vocab_trie_basic() {
        let workspace = tempdir().unwrap();
        let file = workspace.path().join("test.vocab");
        let vocab = PersistentVocabARTrie::create(&file).unwrap();

        let apple = vocab.insert("apple").expect("insert apple");
        let banana = vocab.insert("banana").expect("insert banana");
        let cherry = vocab.insert("cherry").expect("insert cherry");
        assert_eq!(apple, 0);
        assert_eq!(banana, 1);
        assert_eq!(cherry, 2);
        assert_eq!(vocab.len(), 3);

        // Forward lookups: term -> index.
        assert_eq!(vocab.get_index("apple"), Some(0));
        assert_eq!(vocab.get_index("banana"), Some(1));
        assert_eq!(vocab.get_index("cherry"), Some(2));
        assert_eq!(vocab.get_index("durian"), None);

        // Reverse lookups: index -> term.
        assert_eq!(vocab.get_term(0).as_deref(), Some("apple"));
        assert_eq!(vocab.get_term(1).as_deref(), Some("banana"));
        assert_eq!(vocab.get_term(2).as_deref(), Some("cherry"));
        assert_eq!(vocab.get_term(999), None);
    }

    /// Non-ASCII terms round-trip through insert and both lookups.
    #[test]
    fn test_vocab_trie_unicode() {
        let workspace = tempdir().unwrap();
        let file = workspace.path().join("test.vocab");
        let vocab = PersistentVocabARTrie::create(&file).unwrap();

        let terms = ["日本語", "中文", "한글", "العربية", "emoji😀"];
        for term in terms.iter().copied() {
            vocab.insert(term).expect("insert term failed");
        }
        for (expected, term) in (0u64..).zip(terms.iter().copied()) {
            assert_eq!(vocab.get_index(term), Some(expected));
        }

        assert_eq!(vocab.get_term(0).as_deref(), Some("日本語"));
        assert_eq!(vocab.get_term(4).as_deref(), Some("emoji😀"));
    }

    /// A custom start index offsets the assigned indices.
    #[test]
    fn test_vocab_trie_custom_start() {
        let workspace = tempdir().unwrap();
        let file = workspace.path().join("test.vocab");
        let vocab = PersistentVocabARTrie::create_with_start_index(&file, 10).unwrap();

        let first = vocab.insert("first").expect("insert first");
        let second = vocab.insert("second").expect("insert second");

        assert_eq!(first, 10);
        assert_eq!(second, 11);
        assert_eq!(vocab.start_index(), 10);
    }

    /// Re-inserting a term returns its existing index and does not grow the trie.
    #[test]
    fn test_vocab_trie_idempotent_insert() {
        let workspace = tempdir().unwrap();
        let file = workspace.path().join("test.vocab");
        let vocab = PersistentVocabARTrie::create(&file).unwrap();

        let first = vocab.insert("duplicate").expect("insert duplicate");
        let second = vocab.insert("duplicate").expect("insert duplicate again");
        let third = vocab
            .insert("duplicate")
            .expect("insert duplicate third time");

        assert_eq!(first, 0);
        assert_eq!(second, 0);
        assert_eq!(third, 0);
        assert_eq!(vocab.len(), 1);
    }

    /// The `Dictionary` and `MappedDictionary` trait methods see inserted terms.
    #[test]
    fn test_vocab_trie_traits() {
        use crate::Dictionary;
        use crate::MappedDictionary;

        let workspace = tempdir().unwrap();
        let file = workspace.path().join("test.vocab");
        let vocab = PersistentVocabARTrie::create(&file).unwrap();
        vocab.insert("test").expect("insert term failed");

        assert!(Dictionary::contains(&vocab, "test"));
        assert!(!Dictionary::contains(&vocab, "missing"));
        assert_eq!(Dictionary::len(&vocab), Some(1));

        assert_eq!(MappedDictionary::get_value(&vocab, "test"), Some(0));
        assert_eq!(MappedDictionary::get_value(&vocab, "missing"), None);
    }

    /// `ARTrie::insert` reports `true` only for the first insertion of a term.
    #[test]
    fn test_vocab_trie_artrie_trait() {
        use crate::artrie_trait::ARTrie;

        let workspace = tempdir().unwrap();
        let file = workspace.path().join("test.vocab");
        let vocab: SharedVocabARTrie = ARTrie::create(&file).unwrap();

        assert!(ARTrie::insert(&vocab, "hello"));
        assert!(!ARTrie::insert(&vocab, "hello"));
        assert!(ARTrie::contains(&vocab, "hello"));
        assert_eq!(ARTrie::get_value(&vocab, "hello"), Some(0));
        assert_eq!(ARTrie::len(&vocab), 1);
    }

    /// The current LSN advances on insert and a synced LSN appears after `sync`.
    #[test]
    fn test_vocab_trie_lsn_tracking() {
        let workspace = tempdir().unwrap();
        let file = workspace.path().join("test.vocab");
        let vocab = PersistentVocabARTrie::create(&file).unwrap();

        let lsn_before = vocab.current_lsn();
        assert!(lsn_before > 0);
        assert!(vocab.synced_lsn().is_none());

        vocab.insert("test").expect("insert term failed");
        assert!(vocab.current_lsn() > lsn_before);

        vocab.sync().unwrap();
        assert!(vocab.synced_lsn().is_some());
    }

    /// The durability policy starts as `Immediate` and can be changed.
    #[test]
    fn test_vocab_trie_durability_policy() {
        let workspace = tempdir().unwrap();
        let file = workspace.path().join("test.vocab");
        let mut vocab = PersistentVocabARTrie::create(&file).unwrap();

        assert_eq!(vocab.durability_policy(), DurabilityPolicy::Immediate);
        vocab.set_durability_policy(DurabilityPolicy::Periodic);
        assert_eq!(vocab.durability_policy(), DurabilityPolicy::Periodic);
    }

    /// The shared trie accepts inserts through `ARTrie` and can checkpoint.
    #[test]
    fn test_shared_vocab_artrie() {
        use crate::artrie_trait::ARTrie;

        let workspace = tempdir().unwrap();
        let file = workspace.path().join("test.vocab");
        let vocab: SharedVocabARTrie = ARTrie::create(&file).unwrap();

        for term in ["hello", "world"].iter().copied() {
            assert!(ARTrie::insert(&vocab, term));
        }
        for term in ["hello", "world"].iter().copied() {
            assert!(ARTrie::contains(&vocab, term));
        }
        assert_eq!(ARTrie::len(&vocab), 2);

        ARTrie::checkpoint(&vocab).unwrap();
    }
}