use crate::persistent_artrie_core::durability::DurabilityPolicy;
use crate::persistent_artrie_core::error::Result;
use crate::persistent_artrie_core::key_encoding::KeyEncoding;
use crate::persistent_artrie_core::overlay::flip::LockFreeOverlay;
use crate::persistent_artrie_core::wal::{Lsn, WalRecord};
use crate::value::DictionaryValue;
/// Selects how `DurableOverlayWrite::value_publish_inner` should apply a value.
///
/// The precise semantics of each mode are defined by the implementor of
/// `value_publish_inner`; the notes below only describe which durable entry
/// point in this file passes each variant.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum ValueWriteMode {
    /// Passed by `insert_cas_with_value_durable_default`, which first returns
    /// `false` without writing when the key is already present.
    InsertOnce,
    /// Passed by `upsert_cas_durable_default`, which treats a refused publish
    /// as an internal error.
    Upsert,
    /// Passed by `compare_and_swap_cas_durable_default`.
    CompareAndSwap {
        /// Serialized form of the value the caller expects to find, or `None`
        /// when the caller supplied no expected value.
        expected_bytes: Option<Vec<u8>>,
    },
}
/// Result of `DurableOverlayWrite::value_publish_inner`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ValuePublishOutcome {
    /// The value was published. The payload is the generation that the durable
    /// write paths forward to `commit_rank_and_mark`.
    Published(u64),
    /// The publish was refused. The durable write paths then mark the already
    /// appended WAL record's LSN through `mark_committed_burned`.
    NotApplied,
}
pub(crate) trait DurableOverlayWrite<K: KeyEncoding, V: DictionaryValue, S>:
LockFreeOverlay<K, V, S>
{
fn durability_policy(&self) -> DurabilityPolicy;
fn append_durable_wal(&self, record: WalRecord) -> Result<Lsn>;
fn append_commit_rank(&self, data_lsn: Lsn, term: &[u8], generation: u64) -> Result<Lsn>;
fn mark_committed(&self, lsn: Lsn);
fn bound_increment_delta(&self, key: &str, delta: Self::CounterValue) -> Result<i64>;
fn build_increment_record(&self, key_bytes: &[u8], bounded: i64) -> WalRecord;
fn increment_publish_inner(
&self,
key: &str,
delta: Self::CounterValue,
) -> Result<(Self::CounterValue, u64)>;
fn durable_policy_gate(&self, method: &str, noun: &str) -> Result<()> {
match self.durability_policy() {
DurabilityPolicy::Immediate | DurabilityPolicy::GroupCommit => Ok(()),
DurabilityPolicy::Periodic | DurabilityPolicy::None => Err(
crate::persistent_artrie_core::error::PersistentARTrieError::InvalidOperation(
format!(
"{method} requires Immediate or GroupCommit durability so an \
acknowledged {noun} is guaranteed durable before it becomes visible"
),
),
),
}
}
fn commit_rank_and_mark(&self, data_lsn: Lsn, key_bytes: &[u8], generation: u64) -> Result<()> {
let rank_lsn = self.append_commit_rank(data_lsn, key_bytes, generation)?;
self.mark_committed(data_lsn);
self.mark_committed(rank_lsn);
Ok(())
}
fn mark_committed_burned(&self, lsn: Lsn) {
self.mark_committed(lsn);
}
fn try_increment_cas_durable_default(
&self,
key: &str,
key_bytes: &[u8],
delta: Self::CounterValue,
) -> Result<Self::CounterValue> {
self.durable_policy_gate("try_increment_cas_durable", "increment")?;
if self.lockfree_root().is_none() {
return Err(
crate::persistent_artrie_core::error::PersistentARTrieError::InvalidOperation(
"Lock-free mode not enabled. Call install_overlay() first.".to_string(),
),
);
}
let bounded = self.bound_increment_delta(key, delta)?;
let lsn = self.append_durable_wal(self.build_increment_record(key_bytes, bounded))?;
let (new_val, generation) = self.increment_publish_inner(key, delta)?;
self.commit_rank_and_mark(lsn, key_bytes, generation)?;
Ok(new_val)
}
fn value_present_faulting(&self, key_bytes: &[u8]) -> Result<bool>;
fn value_read_faulting(&self, key_bytes: &[u8]) -> Result<Option<V>>;
fn value_publish_inner(
&self,
key_bytes: &[u8],
value: V,
mode: ValueWriteMode,
) -> Result<ValuePublishOutcome>;
fn insert_cas_with_value_durable_default(&self, key_bytes: &[u8], value: V) -> Result<bool> {
self.durable_policy_gate("insert_cas_with_value_durable", "write")?;
if self.lockfree_root().is_none() {
return Err(
crate::persistent_artrie_core::error::PersistentARTrieError::InvalidOperation(
"Lock-free mode not enabled. Call install_overlay() first.".to_string(),
),
);
}
if self.value_present_faulting(key_bytes)? {
return Ok(false);
}
let value_bytes = crate::serialization::bincode_compat::serialize(&value).map_err(|e| {
crate::persistent_artrie_core::error::PersistentARTrieError::internal(format!(
"Failed to serialize value: {}",
e
))
})?;
let lsn = self.append_durable_wal(WalRecord::Insert {
term: key_bytes.to_vec(),
value: Some(value_bytes),
})?;
match self.value_publish_inner(key_bytes, value, ValueWriteMode::InsertOnce)? {
ValuePublishOutcome::Published(generation) => {
self.commit_rank_and_mark(lsn, key_bytes, generation)?;
Ok(true)
}
ValuePublishOutcome::NotApplied => {
self.mark_committed_burned(lsn);
Ok(false)
}
}
}
fn upsert_cas_durable_default(&self, key_bytes: &[u8], value: V) -> Result<bool> {
self.durable_policy_gate("upsert_cas_durable", "write")?;
if self.lockfree_root().is_none() {
return Err(
crate::persistent_artrie_core::error::PersistentARTrieError::InvalidOperation(
"Lock-free mode not enabled. Call install_overlay() first.".to_string(),
),
);
}
let existed = self.value_present_faulting(key_bytes)?;
let value_bytes = crate::serialization::bincode_compat::serialize(&value).map_err(|e| {
crate::persistent_artrie_core::error::PersistentARTrieError::internal(format!(
"Failed to serialize value: {}",
e
))
})?;
let lsn = self.append_durable_wal(WalRecord::Upsert {
term: key_bytes.to_vec(),
value: value_bytes,
})?;
match self.value_publish_inner(key_bytes, value, ValueWriteMode::Upsert)? {
ValuePublishOutcome::Published(generation) => {
self.commit_rank_and_mark(lsn, key_bytes, generation)?;
Ok(!existed)
}
ValuePublishOutcome::NotApplied => {
self.mark_committed_burned(lsn);
Err(
crate::persistent_artrie_core::error::PersistentARTrieError::internal(
"upsert_cas_durable: value publish unexpectedly refused (NotApplied); the \
Upsert record is durable and replays on reopen",
),
)
}
}
}
fn compare_and_swap_cas_durable_default(
&self,
key_bytes: &[u8],
expected: Option<V>,
new: V,
) -> Result<bool> {
self.durable_policy_gate("compare_and_swap", "write")?;
if self.lockfree_root().is_none() {
return Err(
crate::persistent_artrie_core::error::PersistentARTrieError::InvalidOperation(
"Lock-free mode not enabled. Call install_overlay() first.".to_string(),
),
);
}
let current = self.value_read_faulting(key_bytes)?;
let expected_bytes = match &expected {
Some(e) => Some(
crate::serialization::bincode_compat::serialize(e).map_err(|err| {
crate::persistent_artrie_core::error::PersistentARTrieError::internal(format!(
"Failed to serialize value: {}",
err
))
})?,
),
None => None,
};
let current_bytes = match ¤t {
Some(c) => Some(
crate::serialization::bincode_compat::serialize(c).map_err(|err| {
crate::persistent_artrie_core::error::PersistentARTrieError::internal(format!(
"Failed to serialize value: {}",
err
))
})?,
),
None => None,
};
if current_bytes != expected_bytes {
return Ok(false);
}
let new_bytes = crate::serialization::bincode_compat::serialize(&new).map_err(|e| {
crate::persistent_artrie_core::error::PersistentARTrieError::internal(format!(
"Failed to serialize value: {}",
e
))
})?;
let lsn = self.append_durable_wal(WalRecord::Upsert {
term: key_bytes.to_vec(),
value: new_bytes,
})?;
match self.value_publish_inner(
key_bytes,
new,
ValueWriteMode::CompareAndSwap { expected_bytes },
)? {
ValuePublishOutcome::Published(generation) => {
self.commit_rank_and_mark(lsn, key_bytes, generation)?;
Ok(true)
}
ValuePublishOutcome::NotApplied => {
self.mark_committed_burned(lsn);
Ok(false)
}
}
}
fn get_or_insert_durable_default(&self, key_bytes: &[u8], default: V) -> Result<V> {
if let Some(v) = self.value_read_faulting(key_bytes)? {
return Ok(v);
}
if self.insert_cas_with_value_durable_default(key_bytes, default.clone())? {
Ok(default)
} else {
match self.value_read_faulting(key_bytes)? {
Some(v) => Ok(v),
None => Ok(default),
}
}
}
}