use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
/// Version counter for optimistic concurrency control.
///
/// The counter starts at `0`. [`begin_write`](Self::begin_write) moves it from
/// an even value to the next odd value and [`end_write`](Self::end_write) adds
/// one more, so an even value means "no write in progress".
#[derive(Debug)]
pub struct OptimisticVersion {
    version: AtomicU64,
}

impl OptimisticVersion {
    /// Creates a counter at version `0`.
    pub fn new() -> Self {
        Self {
            version: AtomicU64::new(0),
        }
    }

    /// Returns the current version (acquire load).
    pub fn get(&self) -> u64 {
        self.version.load(Ordering::Acquire)
    }

    /// Returns `true` when the current version is even.
    pub fn is_stable(&self) -> bool {
        let current = self.version.load(Ordering::Acquire);
        current % 2 == 0
    }

    /// Spins until the version is even and this caller wins the increment to
    /// the next odd value.
    ///
    /// Returns the even version that was observed just before the increment.
    pub fn begin_write(&self) -> u64 {
        loop {
            let seen = self.version.load(Ordering::Acquire);
            // `&&` short-circuits, so the compare-exchange is only attempted
            // when the observed version is even.
            let claimed = seen % 2 == 0
                && self
                    .version
                    .compare_exchange(seen, seen + 1, Ordering::AcqRel, Ordering::Acquire)
                    .is_ok();
            if claimed {
                return seen;
            }
            std::hint::spin_loop();
        }
    }

    /// Adds one to the version with release ordering.
    ///
    /// Paired with [`begin_write`](Self::begin_write) this brings the counter
    /// back to an even value.
    pub fn end_write(&self) {
        let _previous = self.version.fetch_add(1, Ordering::Release);
    }

    /// Returns `true` when the current version still equals `observed`.
    pub fn validate(&self, observed: u64) -> bool {
        observed == self.version.load(Ordering::Acquire)
    }
}

impl Default for OptimisticVersion {
    /// Same as [`OptimisticVersion::new`].
    fn default() -> Self {
        OptimisticVersion::new()
    }
}
/// Read-side handle that records an even version of an [`OptimisticVersion`]
/// so it can later be checked for changes.
pub struct OptimisticReadGuard<'a> {
    version: &'a OptimisticVersion,
    observed: u64,
}

impl<'a> OptimisticReadGuard<'a> {
    /// Spins until `version` reports an even value and remembers that value.
    pub fn new(version: &'a OptimisticVersion) -> Self {
        let observed = loop {
            let candidate = version.get();
            if candidate % 2 == 0 {
                break candidate;
            }
            // Odd value: a write is in progress, try again.
            std::hint::spin_loop();
        };
        Self { version, observed }
    }

    /// Returns the version recorded when the guard was created.
    pub fn observed(&self) -> u64 {
        self.observed
    }

    /// Returns `true` when the underlying version still equals the recorded one.
    pub fn validate(&self) -> bool {
        let recorded = self.observed;
        self.version.validate(recorded)
    }
}
pub struct WriteGuard<'a> {
version: &'a OptimisticVersion,
}
impl<'a> WriteGuard<'a> {
pub fn new(version: &'a OptimisticVersion) -> Self {
version.begin_write();
WriteGuard { version }
}
}
impl<'a> Drop for WriteGuard<'a> {
fn drop(&mut self) {
self.version.end_write();
}
}
/// Bounded depth counter.
///
/// Tracks a current depth between `0` and `max_depth` inclusive. All updates
/// are single atomic read-modify-write operations, so the bounds hold even
/// when several threads call [`try_descend`](Self::try_descend) and
/// [`ascend`](Self::ascend) at the same time.
pub struct LockCoupling {
    max_depth: usize,
    current_depth: AtomicUsize,
}

impl LockCoupling {
    /// Creates a counter at depth `0` that allows at most `max_depth` levels.
    pub fn new(max_depth: usize) -> Self {
        LockCoupling {
            max_depth,
            current_depth: AtomicUsize::new(0),
        }
    }

    /// Tries to go one level deeper.
    ///
    /// Returns `true` and increments the depth when the current depth is below
    /// `max_depth`; returns `false` and leaves the depth untouched otherwise.
    pub fn try_descend(&self) -> bool {
        let limit = self.max_depth;
        // The bound check and the increment happen in one atomic update; a
        // separate load followed by `fetch_add` would let concurrent callers
        // push the depth past `max_depth`.
        self.current_depth
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |depth| {
                if depth < limit {
                    Some(depth + 1)
                } else {
                    None
                }
            })
            .is_ok()
    }

    /// Goes one level up; does nothing when the depth is already `0`.
    pub fn ascend(&self) {
        // `checked_sub` yields `None` at zero, which aborts the update, so the
        // counter can never wrap around even with concurrent callers.
        let _ = self
            .current_depth
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |depth| {
                depth.checked_sub(1)
            });
    }

    /// Returns the current depth.
    pub fn depth(&self) -> usize {
        self.current_depth.load(Ordering::Relaxed)
    }

    /// Sets the depth back to `0`.
    pub fn reset(&self) {
        self.current_depth.store(0, Ordering::Relaxed);
    }
}

impl Default for LockCoupling {
    /// Creates a counter with a maximum depth of `64`.
    fn default() -> Self {
        Self::new(64)
    }
}
/// Global epoch counter combined with a count of active readers.
///
/// Readers bracket their work with [`enter_read`](Self::enter_read) and
/// [`exit_read`](Self::exit_read). A caller that needs all readers gone can
/// poll with [`try_quiescence`](Self::try_quiescence) or block with
/// [`wait_for_quiescence`](Self::wait_for_quiescence).
#[derive(Debug)]
pub struct EpochManager {
    global_epoch: AtomicU64,
    active_readers: AtomicUsize,
    quiescence_condvar: std::sync::Condvar,
    quiescence_mutex: std::sync::Mutex<()>,
}

impl EpochManager {
    /// Creates a manager at epoch `0` with no active readers.
    pub fn new() -> Self {
        Self {
            global_epoch: AtomicU64::new(0),
            active_readers: AtomicUsize::new(0),
            quiescence_condvar: std::sync::Condvar::new(),
            quiescence_mutex: std::sync::Mutex::new(()),
        }
    }

    /// Registers one more active reader and returns the epoch read right after.
    pub fn enter_read(&self) -> u64 {
        let _readers_before = self.active_readers.fetch_add(1, Ordering::SeqCst);
        self.global_epoch.load(Ordering::Acquire)
    }

    /// Unregisters one reader.
    ///
    /// When the count goes from `1` to `0`, the quiescence mutex is taken and
    /// every thread waiting on the condition variable is notified.
    ///
    /// # Panics
    ///
    /// Panics if the quiescence mutex is poisoned.
    pub fn exit_read(&self) {
        let readers_before = self.active_readers.fetch_sub(1, Ordering::SeqCst);
        if readers_before != 1 {
            return;
        }
        // Notify while holding the mutex so a waiter that has just checked the
        // reader count cannot miss the wake-up.
        let _held = self
            .quiescence_mutex
            .lock()
            .expect("quiescence_mutex poisoned");
        self.quiescence_condvar.notify_all();
    }

    /// Increments the global epoch and returns the value it had before.
    pub fn advance(&self) -> u64 {
        self.global_epoch.fetch_add(1, Ordering::AcqRel)
    }

    /// Returns the current global epoch.
    pub fn current_epoch(&self) -> u64 {
        self.global_epoch.load(Ordering::Acquire)
    }

    /// Returns `true` when at least one reader is registered.
    pub fn has_active_readers(&self) -> bool {
        self.active_readers.load(Ordering::SeqCst) != 0
    }

    /// Returns the number of registered readers.
    pub fn active_reader_count(&self) -> usize {
        self.active_readers.load(Ordering::SeqCst)
    }

    /// Advances the epoch, then blocks until no reader is registered or
    /// `timeout` has elapsed.
    ///
    /// `_poll_interval` is accepted but not used; waiting is done on the
    /// condition variable that [`exit_read`](Self::exit_read) notifies.
    ///
    /// # Errors
    ///
    /// Returns `Err` with the time spent in this call when readers are still
    /// registered after `timeout`.
    ///
    /// # Panics
    ///
    /// Panics if the quiescence mutex is poisoned.
    pub fn wait_for_quiescence(
        &self,
        timeout: std::time::Duration,
        _poll_interval: std::time::Duration,
    ) -> Result<u64, std::time::Duration> {
        let started = std::time::Instant::now();
        let previous_epoch = self.advance();

        // Fast path: nothing to wait for, so the mutex is never touched.
        if !self.has_active_readers() {
            return Ok(previous_epoch);
        }

        let mut held = self
            .quiescence_mutex
            .lock()
            .expect("quiescence_mutex poisoned");

        loop {
            if !self.has_active_readers() {
                break Ok(previous_epoch);
            }

            let budget = timeout.saturating_sub(started.elapsed());
            if budget.is_zero() {
                break Err(started.elapsed());
            }

            let (reacquired, outcome) = self
                .quiescence_condvar
                .wait_timeout(held, budget)
                .expect("quiescence_mutex poisoned");
            held = reacquired;

            // Re-check the count before looking at the timeout flag: readers
            // may have left even though the wait itself timed out.
            if !self.has_active_readers() {
                break Ok(previous_epoch);
            }
            if outcome.timed_out() {
                break Err(started.elapsed());
            }
        }
    }

    /// Advances the epoch and reports whether no reader is registered.
    ///
    /// Returns `Some(previous_epoch)` when there are no readers and `None`
    /// otherwise. The epoch is advanced in both cases.
    pub fn try_quiescence(&self) -> Option<u64> {
        let previous_epoch = self.advance();
        match self.has_active_readers() {
            true => None,
            false => Some(previous_epoch),
        }
    }
}

impl Default for EpochManager {
    /// Same as [`EpochManager::new`].
    fn default() -> Self {
        EpochManager::new()
    }
}
pub struct EpochGuard<'a> {
manager: &'a EpochManager,
#[allow(dead_code)]
epoch: u64,
}
impl<'a> EpochGuard<'a> {
pub fn new(manager: &'a EpochManager) -> Self {
let epoch = manager.enter_read();
EpochGuard { manager, epoch }
}
}
impl<'a> Drop for EpochGuard<'a> {
fn drop(&mut self) {
self.manager.exit_read();
}
}
pub struct MvccReadContext<'a> {
epoch_manager: &'a EpochManager,
snapshot_epoch: u64,
node_versions: Vec<(usize, u64)>,
}
impl<'a> MvccReadContext<'a> {
pub fn new(epoch_manager: &'a EpochManager) -> Self {
let snapshot_epoch = epoch_manager.enter_read();
MvccReadContext {
epoch_manager,
snapshot_epoch,
node_versions: Vec::with_capacity(16), }
}
#[inline]
pub fn observe_node(&mut self, node_id: usize, version: u64) {
self.node_versions.push((node_id, version));
}
pub fn epoch(&self) -> u64 {
self.snapshot_epoch
}
pub fn observed_count(&self) -> usize {
self.node_versions.len()
}
pub fn validate<F>(&self, get_version: F) -> bool
where
F: Fn(usize) -> Option<u64>,
{
for &(node_id, expected_version) in &self.node_versions {
match get_version(node_id) {
Some(current_version) if current_version == expected_version => continue,
_ => return false, }
}
true
}
pub fn validate_mut<F>(&self, mut get_version: F) -> bool
where
F: FnMut(usize) -> Option<u64>,
{
for &(node_id, expected_version) in &self.node_versions {
match get_version(node_id) {
Some(current_version) if current_version == expected_version => continue,
_ => return false,
}
}
true
}
pub fn reset(&mut self) {
self.node_versions.clear();
}
}
impl<'a> Drop for MvccReadContext<'a> {
fn drop(&mut self) {
self.epoch_manager.exit_read();
}
}
/// Atomic counters describing how often operations had to be retried.
#[derive(Debug, Default)]
pub struct RetryStats {
    /// Number of operations recorded via [`RetryStats::record_success`].
    pub successful: AtomicU64,
    /// Sum of the retry counts of all recorded operations.
    pub retries: AtomicU64,
    /// Largest retry count recorded for a single operation.
    pub max_retries: AtomicU64,
}

impl RetryStats {
    /// Creates statistics with all counters at zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Records one successful operation that needed `retry_count` retries.
    ///
    /// Always increments `successful`. When `retry_count` is non-zero it is
    /// added to `retries` and `max_retries` is raised to it if it is larger.
    pub fn record_success(&self, retry_count: u64) {
        self.successful.fetch_add(1, Ordering::Relaxed);
        if retry_count == 0 {
            return;
        }
        self.retries.fetch_add(retry_count, Ordering::Relaxed);
        // `fetch_max` is the atomic "store if greater" primitive; it replaces
        // a hand-written compare-exchange loop.
        self.max_retries.fetch_max(retry_count, Ordering::Relaxed);
    }

    /// Returns `retries / (successful + retries)`, or `0.0` when both are zero.
    ///
    /// The sum is formed in `f64` so that very large counter values cannot
    /// overflow `u64`.
    pub fn retry_rate(&self) -> f64 {
        let successful = self.successful.load(Ordering::Relaxed) as f64;
        let retries = self.retries.load(Ordering::Relaxed) as f64;
        let attempts = successful + retries;
        if attempts == 0.0 {
            0.0
        } else {
            retries / attempts
        }
    }

    /// Copies the current counter values into a plain struct.
    ///
    /// Each counter is loaded separately, so the three values are not read as
    /// one atomic unit.
    pub fn snapshot(&self) -> RetryStatsSnapshot {
        RetryStatsSnapshot {
            successful: self.successful.load(Ordering::Relaxed),
            retries: self.retries.load(Ordering::Relaxed),
            max_retries: self.max_retries.load(Ordering::Relaxed),
        }
    }
}

/// Plain-value copy of the counters in [`RetryStats`].
#[derive(Debug, Clone, Copy)]
pub struct RetryStatsSnapshot {
    /// Number of recorded operations.
    pub successful: u64,
    /// Sum of recorded retry counts.
    pub retries: u64,
    /// Largest single retry count recorded.
    pub max_retries: u64,
}
/// Returns `part / (part + rest)` as `f64`, or `0.0` when both are zero.
///
/// The sum is formed in `f64` so that it cannot overflow `u64`.
fn share_of_total(part: u64, rest: u64) -> f64 {
    let total = part as f64 + rest as f64;
    if total == 0.0 {
        0.0
    } else {
        part as f64 / total
    }
}

/// Atomic operation counters; every update and read uses relaxed ordering.
#[derive(Debug, Default)]
pub struct TrieStats {
    /// Incremented by [`TrieStats::record_read`].
    pub reads: AtomicU64,
    /// Incremented by [`TrieStats::record_write`].
    pub writes: AtomicU64,
    /// Incremented by [`TrieStats::record_read_retry`].
    pub read_retries: AtomicU64,
    /// Incremented by [`TrieStats::record_cache_hit`].
    pub cache_hits: AtomicU64,
    /// Incremented by [`TrieStats::record_cache_miss`].
    pub cache_misses: AtomicU64,
    /// Incremented by [`TrieStats::record_wal_write`].
    pub wal_writes: AtomicU64,
    /// Incremented by [`TrieStats::record_wal_sync`].
    pub wal_syncs: AtomicU64,
}

impl TrieStats {
    /// Creates statistics with all counters at zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Adds one to `reads`.
    #[inline]
    pub fn record_read(&self) {
        self.reads.fetch_add(1, Ordering::Relaxed);
    }

    /// Adds one to `writes`.
    #[inline]
    pub fn record_write(&self) {
        self.writes.fetch_add(1, Ordering::Relaxed);
    }

    /// Adds one to `read_retries`.
    #[inline]
    pub fn record_read_retry(&self) {
        self.read_retries.fetch_add(1, Ordering::Relaxed);
    }

    /// Adds one to `cache_hits`.
    #[inline]
    pub fn record_cache_hit(&self) {
        self.cache_hits.fetch_add(1, Ordering::Relaxed);
    }

    /// Adds one to `cache_misses`.
    #[inline]
    pub fn record_cache_miss(&self) {
        self.cache_misses.fetch_add(1, Ordering::Relaxed);
    }

    /// Adds one to `wal_writes`.
    #[inline]
    pub fn record_wal_write(&self) {
        self.wal_writes.fetch_add(1, Ordering::Relaxed);
    }

    /// Adds one to `wal_syncs`.
    #[inline]
    pub fn record_wal_sync(&self) {
        self.wal_syncs.fetch_add(1, Ordering::Relaxed);
    }

    /// Returns `cache_hits / (cache_hits + cache_misses)`, or `0.0` when no
    /// hit or miss has been recorded.
    pub fn cache_hit_ratio(&self) -> f64 {
        let hits = self.cache_hits.load(Ordering::Relaxed);
        let misses = self.cache_misses.load(Ordering::Relaxed);
        share_of_total(hits, misses)
    }

    /// Returns `read_retries / reads`, or `0.0` when `reads` is zero.
    pub fn read_retry_ratio(&self) -> f64 {
        let reads = self.reads.load(Ordering::Relaxed);
        let retries = self.read_retries.load(Ordering::Relaxed);
        if reads == 0 {
            0.0
        } else {
            retries as f64 / reads as f64
        }
    }

    /// Copies the current counter values into a plain struct.
    ///
    /// Each counter is loaded separately, so the values are not read as one
    /// atomic unit.
    pub fn snapshot(&self) -> TrieStatsSnapshot {
        TrieStatsSnapshot {
            reads: self.reads.load(Ordering::Relaxed),
            writes: self.writes.load(Ordering::Relaxed),
            read_retries: self.read_retries.load(Ordering::Relaxed),
            cache_hits: self.cache_hits.load(Ordering::Relaxed),
            cache_misses: self.cache_misses.load(Ordering::Relaxed),
            wal_writes: self.wal_writes.load(Ordering::Relaxed),
            wal_syncs: self.wal_syncs.load(Ordering::Relaxed),
        }
    }

    /// Stores zero into every counter, one counter at a time.
    pub fn reset(&self) {
        self.reads.store(0, Ordering::Relaxed);
        self.writes.store(0, Ordering::Relaxed);
        self.read_retries.store(0, Ordering::Relaxed);
        self.cache_hits.store(0, Ordering::Relaxed);
        self.cache_misses.store(0, Ordering::Relaxed);
        self.wal_writes.store(0, Ordering::Relaxed);
        self.wal_syncs.store(0, Ordering::Relaxed);
    }
}

/// Plain-value copy of the counters in [`TrieStats`].
#[derive(Debug, Clone, Copy)]
pub struct TrieStatsSnapshot {
    pub reads: u64,
    pub writes: u64,
    pub read_retries: u64,
    pub cache_hits: u64,
    pub cache_misses: u64,
    pub wal_writes: u64,
    pub wal_syncs: u64,
}

impl TrieStatsSnapshot {
    /// Returns `cache_hits / (cache_hits + cache_misses)`, or `0.0` when both
    /// are zero.
    pub fn cache_hit_ratio(&self) -> f64 {
        share_of_total(self.cache_hits, self.cache_misses)
    }

    /// Returns `read_retries / reads`, or `0.0` when `reads` is zero.
    pub fn read_retry_ratio(&self) -> f64 {
        if self.reads == 0 {
            0.0
        } else {
            self.read_retries as f64 / self.reads as f64
        }
    }
}
/// A value protected by an `RwLock` and paired with an [`OptimisticVersion`]
/// that lets readers detect writes which overlapped their read.
pub struct OptimisticCell<T> {
    version: OptimisticVersion,
    data: RwLock<T>,
}

// No manual `Send`/`Sync` impls are required. Both fields are ordinary safe
// types, so the compiler derives the auto traits from them: `RwLock<T>` is
// `Send` when `T: Send` and `Sync` when `T: Send + Sync`, and the version
// counter only holds an atomic integer. The derived bounds are therefore the
// same ones the former `unsafe impl`s spelled out, without any `unsafe`.

impl<T> OptimisticCell<T> {
    /// Wraps `data` in a new cell.
    pub fn new(data: T) -> Self {
        OptimisticCell {
            version: OptimisticVersion::new(),
            data: RwLock::new(data),
        }
    }

    /// Runs `f` on the value under the read lock and validates the version
    /// afterwards.
    ///
    /// Returns `Some(result)` when the version is unchanged since before the
    /// read, and `None` when it changed in the meantime.
    ///
    /// # Panics
    ///
    /// Panics if the read lock is poisoned.
    pub fn try_read<R, F>(&self, f: F) -> Option<R>
    where
        F: FnOnce(&T) -> R,
    {
        let guard = OptimisticReadGuard::new(&self.version);
        let data = self.data.read().expect("OptimisticCell read lock poisoned");
        let result = f(&data);
        // Release the lock before validating so the version check is not done
        // while holding it.
        drop(data);
        if guard.validate() {
            Some(result)
        } else {
            None
        }
    }

    /// Calls [`try_read`](Self::try_read) up to `max_retries` times and returns
    /// the first validated result.
    ///
    /// Returns `None` when every attempt failed validation, which is always
    /// the case for `max_retries == 0` because no attempt is made.
    pub fn read_with_retry<R, F>(&self, f: F, max_retries: usize) -> Option<R>
    where
        F: Fn(&T) -> R,
    {
        (0..max_retries).find_map(|_| {
            let attempt = self.try_read(&f);
            if attempt.is_none() {
                std::hint::spin_loop();
            }
            attempt
        })
    }

    /// Runs `f` with mutable access to the value inside a write section.
    ///
    /// The version is made odd before the write lock is taken and made even
    /// again after the lock has been released.
    ///
    /// # Panics
    ///
    /// Panics if the write lock is poisoned.
    pub fn write<R, F>(&self, f: F) -> R
    where
        F: FnOnce(&mut T) -> R,
    {
        // Declared first so it is dropped last: locals drop in reverse order,
        // which releases the lock before the version becomes even again.
        let _guard = WriteGuard::new(&self.version);
        let mut data = self
            .data
            .write()
            .expect("OptimisticCell write lock poisoned");
        f(&mut data)
    }

    /// Returns the current version of the cell.
    pub fn version(&self) -> u64 {
        self.version.get()
    }

    /// Returns `true` while the version is odd, i.e. a write section is open.
    pub fn is_locked(&self) -> bool {
        !self.version.is_stable()
    }
}
/// An [`OptimisticCell`] behind an `Arc`, for handing the same cell to several owners.
pub type SharedOptimisticCell<T> = Arc<OptimisticCell<T>>;
// Unit tests for the version counter, guards, depth counter, epoch manager,
// statistics types and the optimistic cell defined in this file.
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
use std::thread;
// A fresh version is 0/stable; begin_write makes it 1/unstable; end_write makes it 2/stable.
#[test]
fn test_optimistic_version() {
let version = OptimisticVersion::new();
assert_eq!(version.get(), 0);
assert!(version.is_stable());
version.begin_write();
assert!(!version.is_stable());
assert_eq!(version.get(), 1);
version.end_write();
assert!(version.is_stable());
assert_eq!(version.get(), 2);
}
// A read guard validates until a write begins, and stays invalid after the write ends
// because the version (2) no longer equals the observed one (0).
#[test]
fn test_optimistic_read_guard() {
let version = OptimisticVersion::new();
let guard = OptimisticReadGuard::new(&version);
assert_eq!(guard.observed(), 0);
assert!(guard.validate());
version.begin_write();
assert!(!guard.validate());
version.end_write();
assert!(!guard.validate()); }
// Depth rises up to the limit of 3, the fourth descend is refused, ascend and reset lower it.
#[test]
fn test_lock_coupling() {
let lc = LockCoupling::new(3);
assert_eq!(lc.depth(), 0);
assert!(lc.try_descend());
assert_eq!(lc.depth(), 1);
assert!(lc.try_descend());
assert_eq!(lc.depth(), 2);
assert!(lc.try_descend());
assert_eq!(lc.depth(), 3);
assert!(!lc.try_descend());
assert_eq!(lc.depth(), 3);
lc.ascend();
assert_eq!(lc.depth(), 2);
lc.reset();
assert_eq!(lc.depth(), 0);
}
// enter_read/exit_read change the reader count; advance returns the old epoch.
#[test]
fn test_epoch_manager() {
let epoch = EpochManager::new();
assert_eq!(epoch.current_epoch(), 0);
assert!(!epoch.has_active_readers());
let e1 = epoch.enter_read();
assert_eq!(e1, 0);
assert!(epoch.has_active_readers());
assert_eq!(epoch.active_reader_count(), 1);
let e2 = epoch.advance();
assert_eq!(e2, 0);
assert_eq!(epoch.current_epoch(), 1);
epoch.exit_read();
assert!(!epoch.has_active_readers());
}
// An EpochGuard counts as an active reader only while it is in scope.
#[test]
fn test_epoch_guard() {
let epoch = EpochManager::new();
{
let _guard = EpochGuard::new(&epoch);
assert!(epoch.has_active_readers());
}
assert!(!epoch.has_active_readers());
}
// Three successes with 0, 2 and 5 retries give retries == 7 and max_retries == 5.
#[test]
fn test_retry_stats() {
let stats = RetryStats::new();
stats.record_success(0);
stats.record_success(2);
stats.record_success(5);
let snapshot = stats.snapshot();
assert_eq!(snapshot.successful, 3);
assert_eq!(snapshot.retries, 7);
assert_eq!(snapshot.max_retries, 5);
}
// An uncontended try_read returns the stored value.
#[test]
fn test_optimistic_cell_read() {
let cell = OptimisticCell::new(42);
let result = cell.try_read(|v| *v).expect("read should succeed");
assert_eq!(result, 42);
}
// A value written through write() is visible to a following try_read.
#[test]
fn test_optimistic_cell_write() {
let cell = OptimisticCell::new(42);
cell.write(|v| *v = 100);
let result = cell.try_read(|v| *v).expect("read should succeed");
assert_eq!(result, 100);
}
// Runs one writer and one reader thread concurrently; checks that neither panics.
// The final assertion only requires max_seen >= 0.
#[test]
fn test_optimistic_cell_concurrent() {
let cell = Arc::new(OptimisticCell::new(0));
let cell_clone = cell.clone();
let writer = thread::spawn(move || {
for i in 1..=100 {
cell_clone.write(|v| *v = i);
}
});
let reader = thread::spawn(move || {
let mut max_seen = 0;
for _ in 0..1000 {
if let Some(v) = cell.read_with_retry(|v| *v, 10) {
if v > max_seen {
max_seen = v;
}
}
}
max_seen
});
writer.join().expect("writer panicked");
let max_seen = reader.join().expect("reader panicked");
assert!(max_seen >= 0);
}
// The cell is not reported as locked before or after a completed write.
#[test]
fn test_optimistic_cell_is_locked() {
let cell = OptimisticCell::new(42);
assert!(!cell.is_locked());
cell.write(|_| {
});
assert!(!cell.is_locked());
}
// A read guard created while a write is open only returns once the write has ended,
// so it must observe the even version 2.
#[test]
fn test_optimistic_read_guard_waits_for_stable_version() {
use std::time::Duration;
let version = Arc::new(OptimisticVersion::new());
let version_clone = Arc::clone(&version);
version.begin_write();
assert!(
!version.is_stable(),
"Version should be unstable (odd) during write"
);
let reader_handle = thread::spawn(move || {
let guard = OptimisticReadGuard::new(&version_clone);
guard.observed()
});
thread::sleep(Duration::from_millis(10));
version.end_write();
let observed = reader_handle.join().expect("reader thread should complete");
assert_eq!(observed % 2, 0, "Observed version should be even (stable)");
assert_eq!(
observed, 2,
"Version should be 2 after begin_write + end_write"
);
}
// With no write in progress the guard is created immediately at version 0.
#[test]
fn test_optimistic_read_guard_immediate_stable() {
let version = OptimisticVersion::new();
assert!(version.is_stable());
let guard = OptimisticReadGuard::new(&version);
assert_eq!(guard.observed(), 0);
assert!(guard.validate());
}
// ascend at depth 0 leaves the depth at 0.
#[test]
fn test_lock_coupling_ascend_at_zero() {
let lc = LockCoupling::new(5);
assert_eq!(lc.depth(), 0);
lc.ascend();
assert_eq!(lc.depth(), 0, "Depth should remain 0 after ascend from 0");
lc.ascend();
assert_eq!(lc.depth(), 0);
}
// Without readers wait_for_quiescence returns the old epoch and the epoch has advanced.
#[test]
fn test_epoch_wait_for_quiescence_immediate() {
use std::time::Duration;
let epoch = EpochManager::new();
assert!(!epoch.has_active_readers());
let result =
epoch.wait_for_quiescence(Duration::from_millis(100), Duration::from_micros(100));
assert!(result.is_ok(), "Should achieve quiescence immediately");
let old_epoch = result.unwrap();
assert_eq!(old_epoch, 0, "Old epoch should be 0");
assert_eq!(
epoch.current_epoch(),
1,
"Current epoch should be 1 after advance"
);
}
// A reader that stays registered for 500ms makes a 50ms wait time out.
#[test]
fn test_epoch_wait_for_quiescence_timeout() {
use std::sync::atomic::AtomicBool;
use std::time::Duration;
let epoch = Arc::new(EpochManager::new());
let epoch_clone = Arc::clone(&epoch);
let reader_entered = Arc::new(AtomicBool::new(false));
let reader_entered_clone = Arc::clone(&reader_entered);
let reader_handle = thread::spawn(move || {
let _e = epoch_clone.enter_read();
reader_entered_clone.store(true, std::sync::atomic::Ordering::Release);
thread::sleep(Duration::from_millis(500));
epoch_clone.exit_read();
});
// Wait until the reader thread has registered before starting the timed wait.
while !reader_entered.load(std::sync::atomic::Ordering::Acquire) {
thread::yield_now();
}
let result =
epoch.wait_for_quiescence(Duration::from_millis(50), Duration::from_micros(100));
assert!(result.is_err(), "Should timeout with active reader");
let elapsed = result.unwrap_err();
assert!(
elapsed >= Duration::from_millis(50),
"Should have waited at least 50ms"
);
reader_handle.join().expect("reader should complete");
}
// try_quiescence succeeds with the old epoch when there are no readers.
#[test]
fn test_epoch_try_quiescence_success() {
let epoch = EpochManager::new();
let result = epoch.try_quiescence();
assert!(result.is_some(), "Should succeed with no readers");
assert_eq!(result.unwrap(), 0, "Old epoch should be 0");
}
// try_quiescence fails while a reader is registered and succeeds after it exits.
#[test]
fn test_epoch_try_quiescence_with_readers() {
let epoch = EpochManager::new();
let _e = epoch.enter_read();
assert!(epoch.has_active_readers());
let result = epoch.try_quiescence();
assert!(result.is_none(), "Should fail with active reader");
epoch.exit_read();
let result = epoch.try_quiescence();
assert!(result.is_some(), "Should succeed after reader exits");
}
// Successes without retries leave retries, max_retries and the retry rate at zero.
#[test]
fn test_retry_stats_no_retries() {
let stats = RetryStats::new();
stats.record_success(0);
stats.record_success(0);
stats.record_success(0);
let snapshot = stats.snapshot();
assert_eq!(snapshot.successful, 3);
assert_eq!(snapshot.retries, 0);
assert_eq!(snapshot.max_retries, 0);
assert!((stats.retry_rate() - 0.0).abs() < 0.001);
}
// The retry rate of untouched statistics is 0.
#[test]
fn test_retry_stats_empty() {
let stats = RetryStats::new();
assert!(
(stats.retry_rate() - 0.0).abs() < 0.001,
"Retry rate should be 0 with no ops"
);
}
// The cache hit ratio is 0 when no hit or miss was recorded, live and in a snapshot.
#[test]
fn test_trie_stats_cache_hit_ratio_zero() {
let stats = TrieStats::new();
assert!((stats.cache_hit_ratio() - 0.0).abs() < 0.001);
let snapshot = stats.snapshot();
assert!((snapshot.cache_hit_ratio() - 0.0).abs() < 0.001);
}
// The read retry ratio is 0 when no read was recorded, live and in a snapshot.
#[test]
fn test_trie_stats_read_retry_ratio_zero() {
let stats = TrieStats::new();
assert!((stats.read_retry_ratio() - 0.0).abs() < 0.001);
let snapshot = stats.snapshot();
assert!((snapshot.read_retry_ratio() - 0.0).abs() < 0.001);
}
// reset() returns every counter to zero after each one was incremented once.
#[test]
fn test_trie_stats_reset() {
let stats = TrieStats::new();
stats.record_read();
stats.record_write();
stats.record_cache_hit();
stats.record_cache_miss();
stats.record_read_retry();
stats.record_wal_write();
stats.record_wal_sync();
let snapshot = stats.snapshot();
assert_eq!(snapshot.reads, 1);
assert_eq!(snapshot.writes, 1);
assert_eq!(snapshot.cache_hits, 1);
stats.reset();
let snapshot = stats.snapshot();
assert_eq!(snapshot.reads, 0);
assert_eq!(snapshot.writes, 0);
assert_eq!(snapshot.cache_hits, 0);
assert_eq!(snapshot.cache_misses, 0);
assert_eq!(snapshot.read_retries, 0);
assert_eq!(snapshot.wal_writes, 0);
assert_eq!(snapshot.wal_syncs, 0);
}
// validate() passes when all versions match and fails on a changed or missing node.
#[test]
fn test_mvcc_read_context_validation_changed() {
let epoch = EpochManager::new();
let mut ctx = MvccReadContext::new(&epoch);
ctx.observe_node(1, 10);
ctx.observe_node(2, 20);
ctx.observe_node(3, 30);
assert_eq!(ctx.observed_count(), 3);
let valid = ctx.validate(|id| match id {
1 => Some(10),
2 => Some(20),
3 => Some(30),
_ => None,
});
assert!(valid, "Should be valid when versions match");
// Node 2 reports version 21 instead of the observed 20.
let invalid = ctx.validate(|id| match id {
1 => Some(10),
2 => Some(21), 3 => Some(30),
_ => None,
});
assert!(!invalid, "Should be invalid when a version changed");
// Node 2 is no longer found.
let missing = ctx.validate(|id| match id {
1 => Some(10),
2 => None, 3 => Some(30),
_ => None,
});
assert!(!missing, "Should be invalid when a node is missing");
}
// reset() clears the observations but keeps the snapshot epoch.
#[test]
fn test_mvcc_read_context_reset() {
let epoch = EpochManager::new();
let mut ctx = MvccReadContext::new(&epoch);
ctx.observe_node(1, 10);
ctx.observe_node(2, 20);
assert_eq!(ctx.observed_count(), 2);
ctx.reset();
assert_eq!(ctx.observed_count(), 0);
assert_eq!(ctx.epoch(), 0);
}
// validate_mut() accepts a closure with mutable state and calls it once per observation.
#[test]
fn test_mvcc_read_context_validate_mut() {
let epoch = EpochManager::new();
let mut ctx = MvccReadContext::new(&epoch);
ctx.observe_node(1, 10);
ctx.observe_node(2, 20);
let mut call_count = 0;
let valid = ctx.validate_mut(|id| {
call_count += 1;
match id {
1 => Some(10),
2 => Some(20),
_ => None,
}
});
assert!(valid);
assert_eq!(call_count, 2, "Should have called get_version twice");
}
// Reads with a single attempt race against a busy writer. The failure count is
// tallied but not asserted on; the test only checks that both sides finish.
#[test]
fn test_optimistic_cell_read_with_retry_fails() {
let cell = Arc::new(OptimisticCell::new(42));
let cell_clone = Arc::clone(&cell);
let writer_running = Arc::new(std::sync::atomic::AtomicBool::new(true));
let writer_flag = Arc::clone(&writer_running);
let writer = thread::spawn(move || {
while writer_flag.load(std::sync::atomic::Ordering::Relaxed) {
cell_clone.write(|v| *v += 1);
std::hint::spin_loop();
}
});
let mut failures = 0;
for _ in 0..100 {
if cell.read_with_retry(|v| *v, 1).is_none() {
failures += 1;
}
}
writer_running.store(false, std::sync::atomic::Ordering::Relaxed);
writer.join().expect("writer should complete");
}
// A WriteGuard makes the version odd while alive and even again when dropped.
#[test]
fn test_write_guard_drop_increments_version() {
let version = OptimisticVersion::new();
assert_eq!(version.get(), 0);
{
let _guard = WriteGuard::new(&version);
assert_eq!(version.get(), 1); assert!(!version.is_stable());
}
assert_eq!(version.get(), 2);
assert!(version.is_stable());
}
// validate() rejects a version observed before a complete write and accepts the current one.
#[test]
fn test_optimistic_version_validate_after_change() {
let version = OptimisticVersion::new();
let observed = version.get();
assert!(version.validate(observed));
version.begin_write();
version.end_write();
assert!(!version.validate(observed));
assert!(version.validate(version.get()));
}
}