use std::sync::atomic::Ordering;
use std::sync::Arc;
use crate::persistent_artrie::block_storage::BlockStorage;
use crate::persistent_artrie::error::{PersistentARTrieError, Result};
use crate::persistent_artrie::eviction::EvictionCoordinator;
use crate::persistent_artrie_core::concurrency::EpochManager;
use crate::persistent_artrie_core::durability::DurabilityPolicy;
use crate::persistent_artrie_core::key_encoding::{CharKey, KeyEncoding};
use crate::persistent_artrie_core::overlay::durable_write::{
DurableOverlayWrite, ValuePublishOutcome, ValueWriteMode,
};
use crate::persistent_artrie_core::overlay::evict::OverlayEvictable;
use crate::persistent_artrie_core::overlay::f5_build::build_overlay_root_from_terms;
use crate::persistent_artrie_core::overlay::faulter::OverlayFaulter;
use crate::persistent_artrie_core::overlay::flip::LockFreeOverlay;
use crate::persistent_artrie_core::overlay::node::OverlayNode;
use crate::persistent_artrie_core::overlay::AtomicNodePtr;
use crate::persistent_artrie_core::swizzled_ptr::SwizzledPtr;
use crate::persistent_artrie_core::wal::{Lsn, RankRegime, WalRecord};
use dashmap::DashMap;
use super::dict_impl::PersistentVocabARTrie;
/// Overlay node type used by the vocabulary trie: an `OverlayNode` keyed by
/// `CharKey` whose value type is `u64`.
type VocabOverlayNode = OverlayNode<CharKey, u64>;
/// Exposes the pieces of [`PersistentVocabARTrie`] that the overlay eviction
/// machinery reads: the root slot, the epoch manager, the optional eviction
/// coordinator, and a hook that counts fault-in CAS events.
impl<S: BlockStorage> OverlayEvictable<CharKey, u64, S> for PersistentVocabARTrie<S> {
    /// Borrows the overlay root slot; `None` when the `lockfree_root` field is unset.
    #[inline]
    fn overlay_root_slot(&self) -> Option<&AtomicNodePtr<CharKey, u64>> {
        self.lockfree_root.as_ref()
    }

    /// Borrows the trie's epoch manager.
    #[inline]
    fn overlay_epoch_manager(&self) -> &EpochManager {
        &self.epoch_manager
    }

    /// Returns a new `Arc` handle to the eviction coordinator, if one is configured.
    #[inline]
    fn overlay_eviction_coordinator(&self) -> Option<Arc<EvictionCoordinator>> {
        self.eviction_coordinator.as_ref().cloned()
    }

    /// Bumps the `cas_retries` counter by one (relaxed ordering).
    #[inline]
    fn note_faultin_cas(&self) {
        self.cas_retries.fetch_add(1, Ordering::Relaxed);
    }
}
/// `OverlayFaulter` implementation for the vocabulary trie.
impl<S: BlockStorage> OverlayFaulter<CharKey, u64> for PersistentVocabARTrie<S> {
/// Always returns `None`; the slot argument is ignored.
// NOTE(review): presumably the vocab overlay never needs to fault nodes in
// from storage — confirm against the `OverlayFaulter` contract.
#[inline]
fn fault_overlay_slot(&self, _slot: &SwizzledPtr) -> Option<Arc<VocabOverlayNode>> {
None
}
}
/// Lock-free overlay seam for the vocabulary trie.
///
/// Most methods are thin adapters over inherent helpers of
/// [`PersistentVocabARTrie`]; `overlay_publish_value` carries the only
/// non-trivial logic (a load / rebuild / compare-exchange loop).
impl<S: BlockStorage> LockFreeOverlay<CharKey, u64, S> for PersistentVocabARTrie<S> {
    type CounterValue = u64;

    /// Borrows the overlay root slot; `None` when the `lockfree_root` field is unset.
    #[inline]
    fn lockfree_root(&self) -> Option<&AtomicNodePtr<CharKey, u64>> {
        self.lockfree_root.as_ref()
    }

    /// Forwards to `PersistentVocabARTrie::install_overlay`.
    #[inline]
    fn install_overlay(&mut self) {
        PersistentVocabARTrie::install_overlay(self);
    }

    /// Current LSN reported by the WAL writer, or `None` without a writer.
    #[inline]
    fn wal_current_lsn(&self) -> Option<u64> {
        let writer = self.wal_writer.as_ref()?;
        Some(writer.current_lsn())
    }

    /// `true` only when a WAL writer exists and reports `RankRegime::Overlay`.
    #[inline]
    fn wal_is_overlay_regime(&self) -> bool {
        match self.wal_writer.as_ref() {
            Some(writer) => writer.rank_regime() == RankRegime::Overlay,
            None => false,
        }
    }

    /// Asks the WAL writer (if any) to switch to the overlay regime.
    ///
    /// Best effort: a failure is logged at `warn` level and otherwise ignored.
    fn wal_stamp_overlay_regime(&self) {
        let failure = self
            .wal_writer
            .as_ref()
            .and_then(|writer| writer.set_overlay_regime().err());
        if let Some(err) = failure {
            log::warn!(
                "vocab install_overlay: could not stamp Overlay regime: {:?}",
                err
            );
        }
    }

    /// Membership-only publishes are not expected for the vocab trie.
    ///
    /// Trips a `debug_assert!` in debug builds and does nothing in release builds.
    fn overlay_publish_membership(&self, units: &[u32]) {
        let _ = units;
        debug_assert!(
            false,
            "vocab overlay_publish_membership: vocab inserts always carry a value=id"
        );
    }

    /// Looks up the value stored for `units` via `get_index_lockfree`.
    fn overlay_counter_get(&self, units: &[u32]) -> Option<u64> {
        self.get_index_lockfree(&CharKey::units_to_term(units))
    }

    /// Returns whether `get_index_lockfree` finds a value for `units`.
    fn overlay_contains(&self, units: &[u32]) -> bool {
        self.get_index_lockfree(&CharKey::units_to_term(units))
            .is_some()
    }

    /// Publishes `value` for `units` into the overlay with a compare-exchange loop.
    ///
    /// Returns silently when no overlay root slot exists or when
    /// `try_insert_lockfree_path` reports an error. On a successful root swap
    /// the lock-free cache (if present) is updated as well.
    fn overlay_publish_value(&self, units: &[u32], value: u64) {
        let slot = match &self.lockfree_root {
            None => return,
            Some(slot) => slot,
        };
        // Hold the read epoch for the whole publish attempt.
        let _guard = self.epoch_manager.enter_read();
        loop {
            let current = match slot.load() {
                Some(node) => node,
                None => {
                    // Empty slot: try to seed a fresh root. The outcome is
                    // ignored because the next iteration reloads the slot.
                    let _ = slot.try_init(Arc::new(
                        crate::persistent_artrie_char::nodes::PersistentCharNode::<u64>::new(),
                    ));
                    continue;
                }
            };
            let candidate = match self.try_insert_lockfree_path(&current, units, value) {
                Ok(next) => next,
                Err(_) => return,
            };
            // NOTE(review): a lost CAS is retried here without bumping
            // `cas_retries`, unlike `value_publish_inner` — confirm intended.
            if slot.compare_exchange(&current, candidate).is_ok() {
                if let Some(cache) = self.lockfree_cache.as_ref() {
                    cache.insert(CharKey::units_to_term(units), value);
                }
                return;
            }
        }
    }

    /// Atomically increments `commit_seq` and returns the post-increment value.
    fn claim_commit_seq(&self) -> u64 {
        let previous = self.commit_seq.fetch_add(1, Ordering::AcqRel);
        previous + 1
    }

    /// Bumps the `cas_retries` counter by one (relaxed ordering).
    fn note_cas_retry(&self) {
        self.cas_retries.fetch_add(1, Ordering::Relaxed);
    }

    /// Forwards to `install_prebuilt_overlay_root_inherent`.
    fn install_prebuilt_overlay_root_seam(&mut self, root: Arc<VocabOverlayNode>) {
        self.install_prebuilt_overlay_root_inherent(root);
    }

    /// Forwards to `overlay_remove_no_wal`.
    fn overlay_try_remove_path(&self, units: &[u32]) {
        self.overlay_remove_no_wal(units);
    }

    /// Runs `load_root_immutable` and keeps only its "image loaded" flag,
    /// discarding the term count.
    fn load_root_immutable_seam(&mut self, root_ptr: u64) -> Result<bool> {
        self.load_root_immutable(root_ptr)
            .map(|(_term_count, image_loaded)| image_loaded)
    }
}
/// Durable-write seam for the vocabulary trie.
///
/// Vocab values are write-once ids: counter increments, `Upsert` and
/// `CompareAndSwap` are all rejected with `InvalidOperation`.
impl<S: BlockStorage> DurableOverlayWrite<CharKey, u64, S> for PersistentVocabARTrie<S> {
    /// Returns the configured durability policy.
    #[inline]
    fn durability_policy(&self) -> DurabilityPolicy {
        self.durability_policy
    }

    /// Forwards to `append_to_wal_returning_lsn`.
    #[inline]
    fn append_durable_wal(&self, record: WalRecord) -> Result<Lsn> {
        self.append_to_wal_returning_lsn(record)
    }

    /// Forwards to `append_vocab_commit_rank`.
    #[inline]
    fn append_commit_rank(&self, data_lsn: Lsn, term: &[u8], generation: u64) -> Result<Lsn> {
        self.append_vocab_commit_rank(data_lsn, term, generation)
    }

    /// Records `lsn` as committed on the committed watermark.
    #[inline]
    fn mark_committed(&self, lsn: Lsn) {
        self.committed_watermark.mark_committed(lsn);
    }

    /// Always fails with `InvalidOperation`: the vocab trie has no counter increments.
    fn bound_increment_delta(&self, key: &str, _delta: u64) -> Result<i64> {
        let message = format!(
            "vocab does not support counter increment (term {key:?}); ids are write-once"
        );
        Err(PersistentARTrieError::InvalidOperation(message))
    }

    /// Builds a single-entry `WalRecord::BatchIncrement` for `key_bytes` / `bounded`.
    fn build_increment_record(&self, key_bytes: &[u8], bounded: i64) -> WalRecord {
        let entry = (key_bytes.to_vec(), bounded);
        WalRecord::BatchIncrement {
            entries: vec![entry],
        }
    }

    /// Always fails with `InvalidOperation`: the vocab trie has no counter increments.
    fn increment_publish_inner(&self, key: &str, _delta: u64) -> Result<(u64, u64)> {
        let message = format!(
            "vocab does not support counter increment (term {key:?}); ids are write-once"
        );
        Err(PersistentARTrieError::InvalidOperation(message))
    }

    /// Reports whether the overlay currently holds a value for `key_bytes`.
    ///
    /// # Errors
    /// Fails when `key_bytes` is not valid UTF-8 or no overlay root slot exists.
    fn value_present_faulting(&self, key_bytes: &[u8]) -> Result<bool> {
        let units = vocab_chars(key_bytes)?;
        let slot = self.require_lockfree_root()?;
        let _guard = self.epoch_manager.enter_read();
        let found = slot
            .load()
            .and_then(|root| self.find_in_lockfree_trie(&root, &units));
        Ok(found.is_some())
    }

    /// Reads the overlay value for `key_bytes`; `Ok(None)` when the root is
    /// empty or the key is absent.
    ///
    /// # Errors
    /// Fails when `key_bytes` is not valid UTF-8 or no overlay root slot exists.
    fn value_read_faulting(&self, key_bytes: &[u8]) -> Result<Option<u64>> {
        let units = vocab_chars(key_bytes)?;
        let slot = self.require_lockfree_root()?;
        let _guard = self.epoch_manager.enter_read();
        Ok(slot
            .load()
            .and_then(|root| self.find_in_lockfree_trie(&root, &units)))
    }

    /// Publishes `value` for `key_bytes` with insert-once semantics.
    ///
    /// Returns `Published(commit_seq)` when this call swapped in a new root,
    /// and `NotApplied` when the key already exists or
    /// `try_insert_lockfree_path` reports an error.
    ///
    /// # Errors
    /// `InvalidOperation` for `Upsert` / `CompareAndSwap` or when no overlay
    /// root slot exists; an internal error when `key_bytes` is not valid UTF-8.
    fn value_publish_inner(
        &self,
        key_bytes: &[u8],
        value: u64,
        mode: ValueWriteMode,
    ) -> Result<ValuePublishOutcome> {
        match mode {
            ValueWriteMode::Upsert | ValueWriteMode::CompareAndSwap { .. } => {
                return Err(PersistentARTrieError::InvalidOperation(
                    "vocab values are write-once ids; Upsert / CompareAndSwap are not supported"
                        .to_string(),
                ));
            }
            ValueWriteMode::InsertOnce => {}
        }

        // Decode once; both the trie path and the cache key derive from `term`.
        let term = std::str::from_utf8(key_bytes).map_err(|e| {
            PersistentARTrieError::internal(format!("vocab key not valid UTF-8: {e}"))
        })?;
        let chars: Vec<u32> = term.chars().map(|c| c as u32).collect();

        let slot = self.require_lockfree_root()?;
        let _guard = self.epoch_manager.enter_read();
        loop {
            // NOTE(review): a sequence number is claimed at the top of every
            // iteration, so retries and `NotApplied` outcomes also consume
            // one — confirm that gaps in `commit_seq` are acceptable.
            let commit_seq = self.commit_seq.fetch_add(1, Ordering::AcqRel) + 1;

            let current = match slot.load() {
                Some(node) => node,
                None => {
                    // Empty slot: try to seed a fresh root, then reload.
                    let _ = slot.try_init(Arc::new(
                        crate::persistent_artrie_char::nodes::PersistentCharNode::<u64>::new(),
                    ));
                    continue;
                }
            };

            if self.find_in_lockfree_trie(&current, &chars).is_some() {
                return Ok(ValuePublishOutcome::NotApplied);
            }

            let candidate = match self.try_insert_lockfree_path(&current, &chars, value) {
                Ok(next) => next,
                Err(_existing) => return Ok(ValuePublishOutcome::NotApplied),
            };

            if slot.compare_exchange(&current, candidate).is_err() {
                // The root changed since it was loaded: count the retry and try again.
                self.cas_retries.fetch_add(1, Ordering::Relaxed);
                continue;
            }

            if let Some(cache) = self.lockfree_cache.as_ref() {
                cache.insert(term.to_string(), value);
            }
            return Ok(ValuePublishOutcome::Published(commit_seq));
        }
    }
}
/// Inherent helpers backing the overlay / durable-write trait seams.
impl<S: BlockStorage> PersistentVocabARTrie<S> {
    /// Inherent shim that forwards to `LockFreeOverlay::route_overlay`.
    #[inline]
    pub fn route_overlay(&self) -> bool {
        <Self as LockFreeOverlay<CharKey, u64, S>>::route_overlay(self)
    }

    /// Borrows the overlay root slot.
    ///
    /// # Errors
    /// Returns `InvalidOperation` when the `lockfree_root` field is unset.
    // A duplicated `#[inline]` attribute used to sit here; rustc reports the
    // repeat under the `unused_attributes` lint, so only one is kept.
    #[inline]
    pub(super) fn require_lockfree_root(&self) -> Result<&AtomicNodePtr<CharKey, u64>> {
        self.lockfree_root.as_ref().ok_or_else(|| {
            PersistentARTrieError::InvalidOperation(
                "Lock-free mode not enabled. Call install_overlay() first.".to_string(),
            )
        })
    }

    /// Looks up the id stored for `term`.
    ///
    /// The lock-free cache (when present) is consulted first; on a miss the
    /// overlay trie is searched under a read epoch. Returns `None` when there
    /// is no overlay root slot, the slot is empty, or the term is absent.
    pub(super) fn get_index_lockfree(&self, term: &str) -> Option<u64> {
        if let Some(ref cache) = self.lockfree_cache {
            if let Some(entry) = cache.get(term) {
                return Some(*entry);
            }
        }
        let lockfree_root = self.lockfree_root.as_ref()?;
        let _epoch = self.epoch_manager.enter_read();
        let root = lockfree_root.load()?;
        let chars: Vec<u32> = term.chars().map(|c| c as u32).collect();
        self.find_in_lockfree_trie(&root, &chars)
    }

    /// Installs `root` as the overlay root, replacing any previous slot, and
    /// creates the lock-free cache if it does not exist yet.
    pub(super) fn install_prebuilt_overlay_root_inherent(&mut self, root: Arc<VocabOverlayNode>) {
        self.lockfree_root = Some(AtomicNodePtr::new(root));
        if self.lockfree_cache.is_none() {
            self.lockfree_cache = Some(DashMap::new());
        }
    }

    /// Removal is not expected for the vocab trie.
    ///
    /// Trips a `debug_assert!` in debug builds and does nothing in release builds.
    pub(super) fn overlay_remove_no_wal(&self, _units: &[u32]) {
        debug_assert!(
            false,
            "vocab overlay_remove_no_wal: vocab is insert-only (terms are never deleted)"
        );
    }

    /// Appends `record` to the WAL and returns the LSN assigned to it.
    ///
    /// After the append, `next_lsn` is raised to at least `lsn + 1`. Under the
    /// `Immediate` and `GroupCommit` policies the WAL is synced and
    /// `synced_lsn` is raised to the value returned by the sync; `Periodic`
    /// and `None` skip the sync.
    ///
    /// # Errors
    /// Returns a `Wal` error when no writer is configured, or when the append
    /// or the sync fails.
    // NOTE(review): `GroupCommit` syncs on every append exactly like
    // `Immediate` — confirm that batching happens elsewhere, if intended.
    pub(super) fn append_to_wal_returning_lsn(&self, record: WalRecord) -> Result<Lsn> {
        let wal = self.wal_writer.as_ref().ok_or_else(|| {
            PersistentARTrieError::Wal(
                "vocab WAL writer unavailable for durable append".to_string(),
            )
        })?;
        let lsn = wal
            .append(record)
            .map_err(|e| PersistentARTrieError::Wal(format!("vocab WAL append failed: {e}")))?;
        self.next_lsn.fetch_max(lsn + 1, Ordering::AcqRel);
        match self.durability_policy {
            DurabilityPolicy::Immediate | DurabilityPolicy::GroupCommit => {
                let synced = wal.sync().map_err(|e| {
                    PersistentARTrieError::Wal(format!("vocab WAL sync failed: {e}"))
                })?;
                self.synced_lsn.fetch_max(synced, Ordering::AcqRel);
            }
            DurabilityPolicy::Periodic | DurabilityPolicy::None => {}
        }
        Ok(lsn)
    }

    /// Appends a `WalRecord::CommitRank` built from the arguments and returns
    /// its LSN. Errors are those of `append_to_wal_returning_lsn`.
    pub(super) fn append_vocab_commit_rank(
        &self,
        data_lsn: Lsn,
        term: &[u8],
        generation: u64,
    ) -> Result<Lsn> {
        self.append_to_wal_returning_lsn(WalRecord::CommitRank {
            data_lsn,
            term: term.to_vec(),
            generation,
        })
    }

    /// Builds the overlay root and installs it.
    ///
    /// Returns `(term_count, image_loaded)`:
    /// * `root_ptr == 0` installs an empty overlay and returns `(0, false)`.
    /// * Otherwise every term from `iter_terms` that has an id in `get_index`
    ///   is copied into a fresh overlay root, and `(count, true)` is returned.
    ///   Terms for which `get_index` returns `None` are skipped and not counted.
    ///
    /// `root_ptr` is only compared against zero here.
    pub(super) fn load_root_immutable(&mut self, root_ptr: u64) -> Result<(usize, bool)> {
        if root_ptr == 0 {
            let empty = build_overlay_root_from_terms::<CharKey, u64, _>(
                Vec::<(Vec<u32>, Option<u64>)>::new(),
                None,
            );
            self.install_prebuilt_overlay_root_inherent(empty);
            return Ok((0, false));
        }
        let all_terms: Vec<String> = self.iter_terms().collect();
        let mut terms: Vec<(Vec<u32>, Option<u64>)> = Vec::with_capacity(all_terms.len());
        // The empty term has no code units, so it is handed to the builder
        // through its own argument rather than the term list.
        let mut empty_term: Option<Option<u64>> = None;
        for term in &all_terms {
            if let Some(id) = self.get_index(term) {
                if term.is_empty() {
                    empty_term = Some(Some(id));
                } else {
                    terms.push((term.chars().map(|c| c as u32).collect(), Some(id)));
                }
            }
        }
        let count = terms.len() + usize::from(empty_term.is_some());
        let overlay_root = build_overlay_root_from_terms::<CharKey, u64, _>(terms, empty_term);
        self.install_prebuilt_overlay_root_inherent(overlay_root);
        Ok((count, true))
    }
}
/// Decodes `key_bytes` as UTF-8 and returns one `u32` per `char`.
///
/// # Errors
/// Returns an internal error when `key_bytes` is not valid UTF-8.
#[inline]
fn vocab_chars(key_bytes: &[u8]) -> Result<Vec<u32>> {
    let decoded = std::str::from_utf8(key_bytes)
        .map_err(|e| PersistentARTrieError::internal(format!("vocab key not valid UTF-8: {e}")));
    let units: Vec<u32> = decoded?.chars().map(u32::from).collect();
    Ok(units)
}