use std::sync::atomic::Ordering;
use std::sync::Arc;
use super::block_storage::BlockStorage;
use super::dict_impl::PersistentARTrie;
use super::error::{PersistentARTrieError, Result};
use crate::persistent_artrie_core::durability::DurabilityPolicy;
use crate::persistent_artrie_core::key_encoding::{ByteKey, KeyEncoding};
use crate::persistent_artrie_core::overlay::durable_write::{
DurableOverlayWrite, ValuePublishOutcome, ValueWriteMode,
};
use crate::persistent_artrie_core::overlay::flip::LockFreeOverlay;
use crate::persistent_artrie_core::wal::{Lsn, RankRegime, WalRecord};
use crate::value::DictionaryValue;
impl<V: DictionaryValue, S: BlockStorage> LockFreeOverlay<ByteKey, V, S>
for PersistentARTrie<V, S>
{
type CounterValue = u64;
#[inline]
fn lockfree_root(
&self,
) -> Option<&crate::persistent_artrie_core::overlay::AtomicNodePtr<ByteKey, V>> {
self.lockfree_root.as_ref()
}
#[inline]
fn install_overlay(&mut self) {
PersistentARTrie::install_overlay(self)
}
#[inline]
fn wal_current_lsn(&self) -> Option<u64> {
self.wal_writer.as_ref().map(|w| w.current_lsn())
}
#[inline]
fn wal_is_overlay_regime(&self) -> bool {
self.wal_writer
.as_ref()
.map(|w| w.rank_regime() == RankRegime::Overlay)
.unwrap_or(false)
}
fn wal_stamp_overlay_regime(&self) {
if let Some(ref writer) = self.wal_writer {
if let Err(e) = writer.set_overlay_regime() {
log::warn!("install_overlay: could not stamp Overlay regime: {:?}", e);
}
}
}
fn overlay_publish_membership(&self, units: &[u8]) {
self.insert_cas(units);
}
fn overlay_counter_get(&self, units: &[u8]) -> Option<u64> {
use std::any::Any;
(self as &dyn Any)
.downcast_ref::<PersistentARTrie<u64, S>>()
.and_then(|trie_u64| trie_u64.get_lockfree(units))
}
fn overlay_contains(&self, units: &[u8]) -> bool {
self.contains_lockfree(units)
}
fn overlay_publish_value(&self, units: &[u8], value: V) {
use super::nodes::persistent_node::PersistentNode;
let lockfree_root = match self.lockfree_root.as_ref() {
Some(r) => r,
None => return,
};
let _epoch = self.epoch_manager.enter_read();
loop {
let root = match lockfree_root.load() {
Some(r) => r,
None => {
let _ = lockfree_root.try_init(Arc::new(PersistentNode::<V>::new()));
continue;
}
};
match self.build_value_path_recursive(&root, units, 0, value.clone()) {
Some(new_root) => match lockfree_root.compare_exchange(&root, new_root) {
Ok(_) => {
if let Some(ref cache) = self.lockfree_cache {
cache.insert(units.to_vec(), true);
}
return;
}
Err(_) => continue,
},
None => return,
}
}
}
fn claim_commit_seq(&self) -> u64 {
use std::sync::atomic::Ordering;
self.commit_seq.fetch_add(1, Ordering::AcqRel) + 1
}
fn note_cas_retry(&self) {
use std::sync::atomic::Ordering;
self.cas_retries.fetch_add(1, Ordering::Relaxed);
}
fn install_prebuilt_overlay_root_seam(
&mut self,
root: Arc<crate::persistent_artrie_core::overlay::node::OverlayNode<ByteKey, V>>,
) {
self.install_prebuilt_overlay_root_inherent(root)
}
fn overlay_try_remove_path(&self, units: &[u8]) {
self.overlay_remove_no_wal(units)
}
fn load_root_immutable_seam(&mut self, root_ptr: u64) -> Result<bool> {
let (_term_count, image_loaded) = self.load_root_immutable(root_ptr)?;
Ok(image_loaded)
}
}
impl<V: DictionaryValue, S: BlockStorage> DurableOverlayWrite<ByteKey, V, S>
for PersistentARTrie<V, S>
{
#[inline]
fn durability_policy(&self) -> DurabilityPolicy {
PersistentARTrie::durability_policy(self)
}
#[inline]
fn append_durable_wal(&self, record: WalRecord) -> Result<Lsn> {
self.append_to_wal_returning_lsn(record)
}
#[inline]
fn append_commit_rank(&self, data_lsn: Lsn, term: &[u8], generation: u64) -> Result<Lsn> {
PersistentARTrie::append_commit_rank(self, data_lsn, term, generation)
}
#[inline]
fn mark_committed(&self, lsn: Lsn) {
self.committed_watermark.mark_committed(lsn);
}
fn bound_increment_delta(&self, key: &str, delta: u64) -> Result<i64> {
i64::try_from(delta).map_err(|_| {
PersistentARTrieError::InvalidOperation(format!(
"try_increment_cas_durable delta for byte term {:?} exceeds the i64 per-call WAL \
delta domain: {}",
key, delta
))
})
}
#[inline]
fn build_increment_record(&self, key_bytes: &[u8], bounded: i64) -> WalRecord {
WalRecord::BatchIncrement {
entries: vec![(key_bytes.to_vec(), bounded)],
}
}
fn increment_publish_inner(&self, key: &str, delta: u64) -> Result<(u64, u64)> {
use std::any::Any;
match (self as &dyn Any).downcast_ref::<PersistentARTrie<u64, S>>() {
Some(trie_u64) => trie_u64.try_increment_cas_inner(key.as_bytes(), delta),
None => Ok((0, 0)),
}
}
fn value_present_faulting(&self, key_bytes: &[u8]) -> Result<bool> {
let lockfree_root = self.lockfree_root.as_ref().ok_or_else(|| {
PersistentARTrieError::InvalidOperation(
"Lock-free mode not enabled. Call install_overlay() first.".to_string(),
)
})?;
let _epoch = self.epoch_manager.enter_read();
Ok(self.find_leaf_lockfree(lockfree_root, key_bytes).is_some())
}
fn value_read_faulting(&self, key_bytes: &[u8]) -> Result<Option<V>> {
let lockfree_root = self.lockfree_root.as_ref().ok_or_else(|| {
PersistentARTrieError::InvalidOperation(
"Lock-free mode not enabled. Call install_overlay() first.".to_string(),
)
})?;
let _epoch = self.epoch_manager.enter_read();
Ok(self
.find_leaf_lockfree(lockfree_root, key_bytes)
.and_then(|leaf| leaf.get_value()))
}
fn value_publish_inner(
&self,
key_bytes: &[u8],
value: V,
mode: ValueWriteMode,
) -> Result<ValuePublishOutcome> {
use super::nodes::persistent_node::PersistentNode;
let lockfree_root = self.lockfree_root.as_ref().ok_or_else(|| {
PersistentARTrieError::InvalidOperation(
"Lock-free mode not enabled. Call install_overlay() first.".to_string(),
)
})?;
let _epoch = self.epoch_manager.enter_read();
loop {
let commit_seq = self.commit_seq.fetch_add(1, Ordering::AcqRel) + 1;
let root = match lockfree_root.load() {
Some(r) => r,
None => {
let new_root = Arc::new(PersistentNode::<V>::new());
let _ = lockfree_root.try_init(new_root);
continue;
}
};
match &mode {
ValueWriteMode::InsertOnce => {
if self.find_leaf_recursive(&root, key_bytes, 0).is_some() {
return Ok(ValuePublishOutcome::NotApplied);
}
}
ValueWriteMode::Upsert => {}
ValueWriteMode::CompareAndSwap { expected_bytes } => {
let cur = self
.find_leaf_recursive(&root, key_bytes, 0)
.and_then(|leaf| leaf.get_value());
let cur_bytes = match &cur {
Some(v) => Some(
crate::serialization::bincode_compat::serialize(v).map_err(|e| {
PersistentARTrieError::internal(format!(
"Failed to serialize value: {}",
e
))
})?,
),
None => None,
};
if &cur_bytes != expected_bytes {
return Ok(ValuePublishOutcome::NotApplied);
}
}
}
let new_root = match self.build_value_path_recursive(&root, key_bytes, 0, value.clone())
{
Some(r) => r,
None => {
self.cas_retries.fetch_add(1, Ordering::Relaxed);
return Err(PersistentARTrieError::internal(
"value_publish_inner: an on-disk overlay child blocked the path-copy; \
the record is durable and replays on reopen",
));
}
};
match lockfree_root.compare_exchange(&root, new_root) {
Ok(_) => {
if let Some(ref cache) = self.lockfree_cache {
cache.insert(key_bytes.to_vec(), true);
}
return Ok(ValuePublishOutcome::Published(commit_seq));
}
Err(_actual) => {
self.cas_retries.fetch_add(1, Ordering::Relaxed);
continue;
}
}
}
}
}
impl<V: DictionaryValue, S: BlockStorage> PersistentARTrie<V, S> {
#[inline]
pub fn route_overlay(&self) -> bool {
<Self as LockFreeOverlay<ByteKey, V, S>>::route_overlay(self)
}
#[cfg_attr(not(test), allow(dead_code))]
#[inline]
#[cfg_attr(not(test), allow(dead_code))]
#[inline]
#[cfg_attr(not(test), allow(dead_code))]
#[inline]
pub(crate) fn overlay_len(&self) -> usize {
<Self as LockFreeOverlay<ByteKey, V, S>>::overlay_len(self)
}
#[cfg_attr(not(test), allow(dead_code))]
#[inline]
pub(crate) fn overlay_is_empty(&self) -> bool {
<Self as LockFreeOverlay<ByteKey, V, S>>::overlay_is_empty(self)
}
#[cfg_attr(not(test), allow(dead_code))]
pub(crate) fn overlay_iter_prefix(&self, prefix: &[u8]) -> Option<Vec<Vec<u8>>> {
<Self as LockFreeOverlay<ByteKey, V, S>>::overlay_collect_units(self, prefix).map(|seqs| {
seqs.into_iter()
.map(|units| ByteKey::units_to_term(&units))
.collect()
})
}
#[cfg_attr(not(test), allow(dead_code))]
pub(crate) fn overlay_iter_prefix_with_values(
&self,
prefix: &[u8],
) -> Option<Vec<(Vec<u8>, V)>> {
<Self as LockFreeOverlay<ByteKey, V, S>>::overlay_collect_units_with_values(self, prefix)
.map(|seqs| {
seqs.into_iter()
.map(|(units, v)| (ByteKey::units_to_term(&units), v))
.collect()
})
}
#[cfg_attr(not(test), allow(dead_code))]
#[inline]
pub(crate) fn overlay_get_value(&self, term: &[u8]) -> Option<Option<V>> {
<Self as LockFreeOverlay<ByteKey, V, S>>::overlay_route_get_value(self, term)
}
}
#[cfg(test)]
mod tests {
#[test]
fn byte_create_flips_all_v_to_overlay() {
use crate::persistent_artrie::PersistentARTrie;
std::fs::create_dir_all("target/test-tmp").ok();
let dir = tempfile::Builder::new()
.prefix("byte-m4b-create-flip")
.tempdir_in("target/test-tmp")
.expect("scratch tempdir under target/test-tmp");
let path_unit = dir.path().join("unit.part");
let trie_unit = PersistentARTrie::<()>::create(&path_unit).expect("create<()>");
assert!(
trie_unit.route_overlay(),
"M4b: a fresh create::<()>() must flip to the overlay (route_overlay true)"
);
let path_i64 = dir.path().join("i64.part");
let trie_i64 = PersistentARTrie::<i64>::create(&path_i64).expect("create<i64>");
assert!(
trie_i64.route_overlay(),
"M4b: a fresh create::<i64>() must flip to the overlay (route_overlay true)"
);
let path_str = dir.path().join("str.part");
let trie_str = PersistentARTrie::<String>::create(&path_str).expect("create<String>");
assert!(
trie_str.route_overlay(),
"arbitrary V (String) create-flips to the overlay (default)"
);
}
#[test]
fn byte_eligible_v_gate() {
use crate::persistent_artrie::PersistentARTrie;
}
#[test]
fn byte_arbitrary_v_create_flips_to_overlay() {
use crate::persistent_artrie::PersistentARTrie;
std::fs::create_dir_all("target/test-tmp").ok();
let dir = tempfile::Builder::new()
.prefix("byte-arbitrary-v")
.tempdir_in("target/test-tmp")
.expect("scratch tempdir under target/test-tmp");
let path = dir.path().join("t.part");
let trie = PersistentARTrie::<String>::create(&path).expect("create");
assert!(
trie.route_overlay(),
"a String trie create-flips to the overlay (default)"
);
}
}