use std::sync::atomic::Ordering;
use std::sync::Arc;
use crate::persistent_artrie::block_storage::BlockStorage;
use crate::persistent_artrie::error::Result;
use crate::persistent_artrie_core::durability::DurabilityPolicy;
use crate::persistent_artrie_core::key_encoding::{CharKey, KeyEncoding};
use crate::persistent_artrie_core::overlay::checkpoint::OverlayCheckpoint;
use crate::persistent_artrie_core::overlay::durable_write::{
DurableOverlayWrite, ValuePublishOutcome, ValueWriteMode,
};
use crate::persistent_artrie_core::overlay::evict::OverlayEvictable;
use crate::persistent_artrie_core::overlay::flip::LockFreeOverlay;
use crate::persistent_artrie_core::wal::{Lsn, RankRegime, WalRecord};
use crate::value::DictionaryValue;
use super::persist::CheckpointSnapshot;
impl<V: DictionaryValue, S: BlockStorage> LockFreeOverlay<CharKey, V, S>
for super::PersistentARTrieChar<V, S>
{
type CounterValue = u64;
#[inline]
fn lockfree_root(
&self,
) -> Option<&crate::persistent_artrie_core::overlay::AtomicNodePtr<CharKey, V>> {
self.lockfree_root.as_ref()
}
#[inline]
fn install_overlay(&mut self) {
super::PersistentARTrieChar::install_overlay(self)
}
#[inline]
fn wal_current_lsn(&self) -> Option<u64> {
self.wal_writer.as_ref().map(|w| w.current_lsn())
}
#[inline]
fn wal_is_overlay_regime(&self) -> bool {
self.wal_writer
.as_ref()
.map(|w| w.rank_regime() == RankRegime::Overlay)
.unwrap_or(false)
}
fn wal_stamp_overlay_regime(&self) {
if let Some(ref writer) = self.wal_writer {
if let Err(e) = writer.set_overlay_regime() {
log::warn!("install_overlay: could not stamp Overlay regime: {:?}", e);
}
}
}
fn overlay_publish_membership(&self, units: &[u32]) {
let term = CharKey::units_to_term(units);
self.insert_cas(&term);
}
fn overlay_counter_get(&self, units: &[u32]) -> Option<u64> {
use std::any::Any;
let term = CharKey::units_to_term(units);
(self as &dyn Any)
.downcast_ref::<super::PersistentARTrieChar<u64, S>>()
.and_then(|trie_u64| trie_u64.get_lockfree(&term))
}
fn overlay_contains(&self, units: &[u32]) -> bool {
let term = CharKey::units_to_term(units);
self.contains_lockfree(&term)
}
fn overlay_publish_value(&self, units: &[u32], value: V) {
use super::nodes::persistent_node::PersistentCharNode;
let lockfree_root = match self.lockfree_root.as_ref() {
Some(r) => r,
None => return,
};
let _epoch = self.epoch_manager.enter_read();
loop {
let root = match lockfree_root.load() {
Some(r) => r,
None => {
let _ = lockfree_root.try_init(Arc::new(PersistentCharNode::<V>::new()));
continue;
}
};
match self.build_value_path_recursive(&root, units, 0, value.clone()) {
Some(new_root) => match lockfree_root.compare_exchange(&root, new_root) {
Ok(_) => {
if let Some(ref cache) = self.lockfree_cache {
cache.insert(CharKey::units_to_term(units), true);
}
return;
}
Err(_) => continue,
},
None => return,
}
}
}
fn claim_commit_seq(&self) -> u64 {
use std::sync::atomic::Ordering;
self.commit_seq.fetch_add(1, Ordering::AcqRel) + 1
}
fn note_cas_retry(&self) {
use std::sync::atomic::Ordering;
self.cas_retries.fetch_add(1, Ordering::Relaxed);
}
fn install_prebuilt_overlay_root_seam(
&mut self,
root: Arc<crate::persistent_artrie_core::overlay::node::OverlayNode<CharKey, V>>,
) {
self.install_prebuilt_overlay_root_inherent(root)
}
fn overlay_try_remove_path(&self, units: &[u32]) {
self.overlay_remove_no_wal(units)
}
fn load_root_immutable_seam(&mut self, root_ptr: u64) -> Result<bool> {
let buffer_manager = self.buffer_manager.clone().ok_or_else(|| {
crate::persistent_artrie_core::error::PersistentARTrieError::internal(
"F7 load_root_immutable_seam: no buffer manager",
)
})?;
let (_term_count, image_loaded) = self.load_root_immutable(&buffer_manager, root_ptr)?;
Ok(image_loaded)
}
}
impl<V: DictionaryValue, S: BlockStorage> DurableOverlayWrite<CharKey, V, S>
for super::PersistentARTrieChar<V, S>
{
#[inline]
fn durability_policy(&self) -> DurabilityPolicy {
super::PersistentARTrieChar::durability_policy(self)
}
#[inline]
fn append_durable_wal(&self, record: WalRecord) -> Result<Lsn> {
self.append_to_wal_returning_lsn(record)
}
#[inline]
fn append_commit_rank(&self, data_lsn: Lsn, term: &[u8], generation: u64) -> Result<Lsn> {
super::PersistentARTrieChar::append_commit_rank(self, data_lsn, term, generation)
}
#[inline]
fn mark_committed(&self, lsn: Lsn) {
self.committed_watermark.mark_committed(lsn);
}
fn bound_increment_delta(&self, key: &str, delta: u64) -> Result<i64> {
i64::try_from(delta).map_err(|_| {
crate::persistent_artrie_core::error::PersistentARTrieError::InvalidOperation(format!(
"try_increment_cas_durable delta for term {:?} exceeds the i64 per-call WAL delta \
domain: {}",
key, delta
))
})
}
#[inline]
fn build_increment_record(&self, key_bytes: &[u8], bounded: i64) -> WalRecord {
WalRecord::BatchIncrement {
entries: vec![(key_bytes.to_vec(), bounded)],
}
}
fn increment_publish_inner(&self, key: &str, delta: u64) -> Result<(u64, u64)> {
use std::any::Any;
match (self as &dyn Any).downcast_ref::<super::PersistentARTrieChar<u64, S>>() {
Some(trie_u64) => trie_u64.try_increment_cas_inner(key, delta),
None => Ok((0, 0)),
}
}
fn value_present_faulting(&self, key_bytes: &[u8]) -> Result<bool> {
let term = std::str::from_utf8(key_bytes).map_err(|e| {
crate::persistent_artrie_core::error::PersistentARTrieError::internal(format!(
"char key not valid UTF-8: {}",
e
))
})?;
let chars: Vec<u32> = term.chars().map(|c| c as u32).collect();
let lockfree_root = self.lockfree_root.as_ref().ok_or_else(|| {
crate::persistent_artrie_core::error::PersistentARTrieError::InvalidOperation(
"Lock-free mode not enabled. Call install_overlay() first.".to_string(),
)
})?;
let _epoch = self.epoch_manager.enter_read();
Ok(
match self.find_leaf_faulting(
lockfree_root,
&chars,
super::lockfree_cas::DEFAULT_MAX_FAULTIN_RETRIES,
) {
Ok(found) => found.is_some(),
Err(_) => self.find_leaf_lockfree(lockfree_root, &chars).is_some(),
},
)
}
fn value_read_faulting(&self, key_bytes: &[u8]) -> Result<Option<V>> {
let term = std::str::from_utf8(key_bytes).map_err(|e| {
crate::persistent_artrie_core::error::PersistentARTrieError::internal(format!(
"char key not valid UTF-8: {}",
e
))
})?;
let chars: Vec<u32> = term.chars().map(|c| c as u32).collect();
let lockfree_root = self.lockfree_root.as_ref().ok_or_else(|| {
crate::persistent_artrie_core::error::PersistentARTrieError::InvalidOperation(
"Lock-free mode not enabled. Call install_overlay() first.".to_string(),
)
})?;
let _epoch = self.epoch_manager.enter_read();
Ok(
match self.find_leaf_faulting(
lockfree_root,
&chars,
super::lockfree_cas::DEFAULT_MAX_FAULTIN_RETRIES,
) {
Ok(found) => found.and_then(|leaf| leaf.get_value()),
Err(_) => self
.find_leaf_lockfree(lockfree_root, &chars)
.and_then(|leaf| leaf.get_value()),
},
)
}
fn value_publish_inner(
&self,
key_bytes: &[u8],
value: V,
mode: ValueWriteMode,
) -> Result<ValuePublishOutcome> {
use super::nodes::persistent_node::PersistentCharNode;
let term = std::str::from_utf8(key_bytes).map_err(|e| {
crate::persistent_artrie_core::error::PersistentARTrieError::internal(format!(
"char key not valid UTF-8: {}",
e
))
})?;
let chars: Vec<u32> = term.chars().map(|c| c as u32).collect();
let lockfree_root = self.lockfree_root.as_ref().ok_or_else(|| {
crate::persistent_artrie_core::error::PersistentARTrieError::InvalidOperation(
"Lock-free mode not enabled. Call install_overlay() first.".to_string(),
)
})?;
let _epoch = self.epoch_manager.enter_read();
loop {
let commit_seq = self.commit_seq.fetch_add(1, Ordering::AcqRel) + 1;
let root = match lockfree_root.load() {
Some(r) => r,
None => {
let new_root = Arc::new(PersistentCharNode::<V>::new());
let _ = lockfree_root.try_init(new_root);
continue;
}
};
match &mode {
ValueWriteMode::InsertOnce => {
if self.find_leaf_recursive(&root, &chars, 0).is_some() {
return Ok(ValuePublishOutcome::NotApplied);
}
}
ValueWriteMode::Upsert => {}
ValueWriteMode::CompareAndSwap { expected_bytes } => {
let cur = self
.find_leaf_recursive(&root, &chars, 0)
.and_then(|leaf| leaf.get_value());
let cur_bytes = match &cur {
Some(v) => {
Some(crate::serialization::bincode_compat::serialize(v).map_err(|e| {
crate::persistent_artrie_core::error::PersistentARTrieError::internal(
format!("Failed to serialize value: {}", e),
)
})?)
}
None => None,
};
if &cur_bytes != expected_bytes {
return Ok(ValuePublishOutcome::NotApplied);
}
}
}
let new_root = match self.build_value_path_recursive(&root, &chars, 0, value.clone()) {
Some(r) => r,
None => {
self.cas_retries.fetch_add(1, Ordering::Relaxed);
return Err(crate::persistent_artrie_core::error::PersistentARTrieError::internal(
"value_publish_inner: could not fault an evicted prefix in to publish the \
valued leaf; the record is durable and replays on reopen",
));
}
};
match lockfree_root.compare_exchange(&root, new_root) {
Ok(_) => {
if let Some(ref cache) = self.lockfree_cache {
cache.insert(term.to_string(), true);
}
return Ok(ValuePublishOutcome::Published(commit_seq));
}
Err(_actual) => {
self.cas_retries.fetch_add(1, Ordering::Relaxed);
continue;
}
}
}
}
}
impl<V: DictionaryValue, S: BlockStorage> OverlayCheckpoint<CharKey, V, S>
for super::PersistentARTrieChar<V, S>
{
type CheckpointSnapshot = CheckpointSnapshot;
#[inline]
fn has_eviction_coordinator(&self) -> bool {
self.eviction_coordinator
.lock()
.expect("eviction_coordinator mutex poisoned")
.is_some()
}
#[inline]
fn capture_overlay_snapshot(&self) -> Result<CheckpointSnapshot> {
self.capture_snapshot_immutable()
}
#[inline]
fn publish_overlay_snapshot_retaining(&self, snapshot: &CheckpointSnapshot) -> Result<()> {
self.publish_immutable_snapshot_retaining_wal(snapshot)
}
#[inline]
fn publish_overlay_snapshot_retaining_with_eviction(
&self,
snapshot: CheckpointSnapshot,
) -> Result<()> {
self.publish_immutable_snapshot_retaining_wal_with_eviction(snapshot)
}
}
impl<V: DictionaryValue, S: BlockStorage> super::PersistentARTrieChar<V, S> {
#[inline]
pub fn route_overlay(&self) -> bool {
<Self as LockFreeOverlay<CharKey, V, S>>::route_overlay(self)
}
}
#[cfg(test)]
mod tests {
#[test]
fn v1_arbitrary_v_create_flips_to_overlay() {
use crate::persistent_artrie_char::PersistentARTrieChar;
std::fs::create_dir_all("target/test-tmp").ok();
let dir = tempfile::Builder::new()
.prefix("v1-gate")
.tempdir_in("target/test-tmp")
.expect("scratch tempdir under target/test-tmp");
let path = dir.path().join("t.artc");
let trie = PersistentARTrieChar::<String>::create(&path).expect("create");
assert!(
trie.route_overlay(),
"a String trie create-flips to the overlay (default)"
);
}
}