extern crate alloc;
use alloc::collections::BTreeMap;
use alloc::sync::Arc;
use alloc::vec::Vec;
use core::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
use crate::wire_types::SequenceNumber;
#[cfg(feature = "inspect")]
use alloc::borrow::ToOwned;
/// Hands one sample to the inspection tap as an RTPS frame.
///
/// Only compiled with the `inspect` feature. The frame carries `label`, a
/// wall-clock timestamp in nanoseconds since the Unix epoch, the sequence
/// number as correlation id, and the payload bytes.
#[cfg(feature = "inspect")]
fn dispatch_rtps_tap(label: &str, sn: SequenceNumber, payload: Vec<u8>) {
    // A clock that reads before the epoch yields 0; a nanosecond count that
    // does not fit into u64 saturates at u64::MAX.
    let now = std::time::SystemTime::now();
    let ts_ns = match now.duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => u64::try_from(elapsed.as_nanos()).unwrap_or(u64::MAX),
        Err(_) => 0,
    };
    // The correlation id is the raw sequence number reinterpreted as unsigned;
    // a negative value would wrap, hence the explicit lint allowance.
    #[allow(clippy::cast_sign_loss)]
    let corr = sn.0 as u64;
    let owned_label = label.to_owned();
    let frame = zerodds_inspect_endpoint::Frame::rtps(owned_label, ts_ns, corr, payload);
    zerodds_inspect_endpoint::tap::dispatch(&frame);
}
/// Classification of a [`CacheChange`].
///
/// NOTE(review): the variant names look like the RTPS `ChangeKind_t` values;
/// confirm against the specification before relying on more than the two
/// predicates below.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChangeKind {
    Alive,
    AliveFiltered,
    NotAliveDisposed,
    NotAliveUnregistered,
    NotAliveDisposedUnregistered,
}

impl ChangeKind {
    /// Returns `false` only for [`ChangeKind::AliveFiltered`]; every other
    /// kind counts as relevant.
    #[must_use]
    pub fn is_relevant(self) -> bool {
        match self {
            Self::AliveFiltered => false,
            Self::Alive
            | Self::NotAliveDisposed
            | Self::NotAliveUnregistered
            | Self::NotAliveDisposedUnregistered => true,
        }
    }

    /// Returns `true` for the two alive kinds ([`ChangeKind::Alive`] and
    /// [`ChangeKind::AliveFiltered`]) and `false` for all `NotAlive*` kinds.
    #[must_use]
    pub fn is_alive_kind(self) -> bool {
        match self {
            Self::Alive | Self::AliveFiltered => true,
            Self::NotAliveDisposed
            | Self::NotAliveUnregistered
            | Self::NotAliveDisposedUnregistered => false,
        }
    }
}
/// One entry of a history cache.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CacheChange {
    /// Sequence number of the change; the caches in this file use it as map key.
    pub sequence_number: SequenceNumber,
    /// Payload bytes behind an `Arc`, so cloning a change shares the buffer
    /// instead of copying it.
    pub payload: Arc<[u8]>,
    /// Kind of this change.
    pub kind: ChangeKind,
    /// Optional 16-byte key hash; the constructors in this file leave it `None`.
    pub key_hash: Option<[u8; 16]>,
}
impl CacheChange {
    /// Builds a [`ChangeKind::Alive`] change from an owned payload buffer.
    ///
    /// The bytes are moved into a shared `Arc<[u8]>`; `key_hash` is `None`.
    #[must_use]
    pub fn alive(sn: SequenceNumber, payload: Vec<u8>) -> Self {
        let shared: Arc<[u8]> = payload.into();
        Self::alive_arc(sn, shared)
    }

    /// Builds a [`ChangeKind::Alive`] change around an already shared payload,
    /// without copying the bytes. `key_hash` is `None`.
    #[must_use]
    pub(crate) fn alive_arc(sn: SequenceNumber, payload: Arc<[u8]>) -> Self {
        Self {
            kind: ChangeKind::Alive,
            key_hash: None,
            sequence_number: sn,
            payload,
        }
    }

    /// Builds a change of an arbitrary `kind` from an owned payload buffer.
    ///
    /// `key_hash` is `None`.
    #[must_use]
    pub fn lifecycle(sn: SequenceNumber, payload: Vec<u8>, kind: ChangeKind) -> Self {
        // Identical to `alive` apart from the kind, so start there and override.
        let mut change = Self::alive(sn, payload);
        change.kind = kind;
        change
    }
}
/// History policy of a cache: what happens when an insert finds it full.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HistoryKind {
    /// Never evict: an insert into a full cache fails with
    /// [`CacheError::CapacityExceeded`].
    KeepAll,
    /// Evict the entry with the smallest sequence number to make room.
    KeepLast {
        /// Number of entries to retain. The effective limit is the smaller of
        /// `depth` and the cache's `max_samples`; a depth of 0 makes inserts
        /// fail with [`CacheError::ZeroDepth`].
        depth: usize,
    },
}
/// Reasons an insert into a history cache is rejected.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum CacheError {
    /// The cache is full and its policy is [`HistoryKind::KeepAll`].
    CapacityExceeded,
    /// An entry with the same sequence number is already stored.
    DuplicateSequenceNumber,
    /// The policy is [`HistoryKind::KeepLast`] with `depth == 0`.
    ZeroDepth,
}
/// Raw value stored in the `min_sn` / `max_sn` atomics of [`HistoryCacheStats`]
/// while there is no sequence number to report (empty cache).
///
/// Consequence: a genuine sequence number equal to `i64::MIN` cannot be told
/// apart from "none" once it has been encoded.
const STATS_SENTINEL_NO_SN: i64 = i64::MIN;
/// Atomic mirror of a cache's summary values.
///
/// The caches in this file hand it out behind an `Arc` and overwrite all four
/// fields after every mutation, so a holder of the `Arc` can read the values
/// without touching the cache itself. The fields are updated one by one, not
/// as a unit.
#[derive(Debug)]
pub struct HistoryCacheStats {
    /// Number of entries currently stored.
    pub len: AtomicUsize,
    /// Number of entries evicted so far.
    pub evicted: AtomicU64,
    /// Raw value of the largest stored sequence number, or
    /// `STATS_SENTINEL_NO_SN` when the cache is empty.
    pub max_sn: AtomicI64,
    /// Raw value of the smallest stored sequence number, or
    /// `STATS_SENTINEL_NO_SN` when the cache is empty.
    pub min_sn: AtomicI64,
}
impl Default for HistoryCacheStats {
    /// Statistics of an empty cache: zero counters and no sequence numbers.
    fn default() -> Self {
        // Both bounds start out as "no sequence number".
        let no_sn = || AtomicI64::new(STATS_SENTINEL_NO_SN);
        Self {
            len: AtomicUsize::new(0),
            evicted: AtomicU64::new(0),
            max_sn: no_sn(),
            min_sn: no_sn(),
        }
    }
}
impl HistoryCacheStats {
    /// Reads all four values into a plain [`HistoryCacheSnapshot`].
    ///
    /// Each field is loaded separately with `Acquire` ordering, so a snapshot
    /// taken while a writer is updating the statistics may combine values from
    /// before and after that update.
    #[must_use]
    pub fn snapshot(&self) -> HistoryCacheSnapshot {
        let len = self.len.load(Ordering::Acquire);
        let evicted = self.evicted.load(Ordering::Acquire);
        let max_sn = decode_sn_atom(self.max_sn.load(Ordering::Acquire));
        let min_sn = decode_sn_atom(self.min_sn.load(Ordering::Acquire));
        HistoryCacheSnapshot {
            len,
            evicted,
            max_sn,
            min_sn,
        }
    }
}
impl Clone for HistoryCacheStats {
    /// Copies the current values into fresh, independent atomics.
    ///
    /// The fields are read one after another, so the copy is not a consistent
    /// cut if a writer updates the source concurrently.
    fn clone(&self) -> Self {
        let len = self.len.load(Ordering::Acquire);
        let evicted = self.evicted.load(Ordering::Acquire);
        let raw_max = self.max_sn.load(Ordering::Acquire);
        let raw_min = self.min_sn.load(Ordering::Acquire);
        Self {
            len: AtomicUsize::new(len),
            evicted: AtomicU64::new(evicted),
            max_sn: AtomicI64::new(raw_max),
            min_sn: AtomicI64::new(raw_min),
        }
    }
}
/// Turns a raw atomic value back into an optional sequence number:
/// the sentinel maps to `None`, anything else is wrapped as-is.
fn decode_sn_atom(v: i64) -> Option<SequenceNumber> {
    match v {
        STATS_SENTINEL_NO_SN => None,
        raw => Some(SequenceNumber(raw)),
    }
}
/// Flattens an optional sequence number into the raw value stored in the
/// statistics atomics; `None` becomes the sentinel.
fn encode_sn_atom(sn: Option<SequenceNumber>) -> i64 {
    match sn {
        Some(present) => present.0,
        None => STATS_SENTINEL_NO_SN,
    }
}
/// Plain-value copy of [`HistoryCacheStats`], as produced by
/// [`HistoryCacheStats::snapshot`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct HistoryCacheSnapshot {
    /// Number of stored entries at the time of the read.
    pub len: usize,
    /// Number of evicted entries at the time of the read.
    pub evicted: u64,
    /// Largest stored sequence number, `None` if the sentinel was read.
    pub max_sn: Option<SequenceNumber>,
    /// Smallest stored sequence number, `None` if the sentinel was read.
    pub min_sn: Option<SequenceNumber>,
}
/// Bounded cache of [`CacheChange`]s ordered by sequence number.
///
/// Mutation requires `&mut self`; the summary values are additionally mirrored
/// into a shared [`HistoryCacheStats`].
#[derive(Debug)]
pub struct HistoryCache {
    /// Stored changes, ordered by sequence number.
    changes: BTreeMap<SequenceNumber, CacheChange>,
    /// History policy applied when an insert finds the cache full.
    kind: HistoryKind,
    /// Upper bound on the number of stored changes.
    max_samples: usize,
    /// Number of changes evicted under `KeepLast` so far (saturating).
    evicted_count: u64,
    /// Shared statistics mirror, refreshed after every mutation.
    stats: Arc<HistoryCacheStats>,
    /// Label for the inspection tap; while `None`, inserts are not tapped.
    #[cfg(feature = "inspect")]
    inspect_label: Option<alloc::string::String>,
}
impl Clone for HistoryCache {
fn clone(&self) -> Self {
Self {
changes: self.changes.clone(),
kind: self.kind,
max_samples: self.max_samples,
evicted_count: self.evicted_count,
stats: Arc::new((*self.stats).clone()),
#[cfg(feature = "inspect")]
inspect_label: self.inspect_label.clone(),
}
}
}
impl HistoryCache {
    /// Creates an empty cache with the given history policy and sample limit.
    ///
    /// `max_samples` bounds the cache under both policies; under
    /// [`HistoryKind::KeepLast`] the effective bound is
    /// `min(depth, max_samples)`. A `KeepLast` depth of 0 is not rejected
    /// here; it surfaces as [`CacheError::ZeroDepth`] on `insert`.
    #[must_use]
    pub fn new_with_kind(kind: HistoryKind, max_samples: usize) -> Self {
        Self {
            changes: BTreeMap::new(),
            kind,
            max_samples,
            evicted_count: 0,
            stats: Arc::new(HistoryCacheStats::default()),
            #[cfg(feature = "inspect")]
            inspect_label: None,
        }
    }

    /// Sets the label under which subsequent inserts are reported to the
    /// inspection tap. Without a label, inserts are not tapped.
    #[cfg(feature = "inspect")]
    pub fn set_inspect_label(&mut self, label: alloc::string::String) {
        self.inspect_label = Some(label);
    }

    /// Returns a handle to the shared statistics of this cache.
    ///
    /// Every call returns a clone of the same `Arc`; the values behind it are
    /// refreshed after each `insert` and `remove_up_to`.
    #[must_use]
    pub fn stats(&self) -> Arc<HistoryCacheStats> {
        Arc::clone(&self.stats)
    }

    /// Publishes length, eviction counter and min/max sequence number to the
    /// shared statistics. The four stores are individual `Release` writes.
    fn refresh_stats(&self) {
        self.stats.len.store(self.changes.len(), Ordering::Release);
        self.stats
            .evicted
            .store(self.evicted_count, Ordering::Release);
        let max = self.changes.keys().next_back().copied();
        let min = self.changes.keys().next().copied();
        self.stats
            .max_sn
            .store(encode_sn_atom(max), Ordering::Release);
        self.stats
            .min_sn
            .store(encode_sn_atom(min), Ordering::Release);
    }

    /// Creates an empty [`HistoryKind::KeepAll`] cache holding at most
    /// `max_samples` changes.
    #[must_use]
    pub fn new(max_samples: usize) -> Self {
        Self::new_with_kind(HistoryKind::KeepAll, max_samples)
    }

    /// Returns the history policy of this cache.
    #[must_use]
    pub fn kind(&self) -> HistoryKind {
        self.kind
    }

    /// Returns how many changes have been evicted under `KeepLast` so far.
    #[must_use]
    pub fn evicted_count(&self) -> u64 {
        self.evicted_count
    }

    /// Stores `change` under its sequence number.
    ///
    /// When the cache is at its effective limit, a `KeepAll` cache rejects the
    /// insert, while a `KeepLast` cache first evicts the entry with the
    /// smallest sequence number and counts the eviction. With the `inspect`
    /// feature and a label set, the stored sample is also sent to the tap.
    ///
    /// # Errors
    ///
    /// * [`CacheError::DuplicateSequenceNumber`] if the sequence number is
    ///   already present (checked first).
    /// * [`CacheError::ZeroDepth`] for a `KeepLast` policy with depth 0.
    /// * [`CacheError::CapacityExceeded`] if a `KeepAll` cache is full.
    pub fn insert(&mut self, change: CacheChange) -> Result<(), CacheError> {
        if self.changes.contains_key(&change.sequence_number) {
            return Err(CacheError::DuplicateSequenceNumber);
        }
        let cap = self.effective_max_samples()?;
        if self.changes.len() >= cap {
            match self.kind {
                HistoryKind::KeepAll => return Err(CacheError::CapacityExceeded),
                HistoryKind::KeepLast { .. } => {
                    // NOTE(review): with an effective limit of 0 there is
                    // nothing to evict, so the cache still ends up holding one
                    // entry; behaviour kept as-is.
                    if let Some(oldest) = self.changes.keys().next().copied() {
                        self.changes.remove(&oldest);
                        self.evicted_count = self.evicted_count.saturating_add(1);
                    }
                }
            }
        }
        // Capture what the tap needs before `change` is moved into the map.
        #[cfg(feature = "inspect")]
        let tap_view = self.inspect_label.as_ref().map(|label| {
            (
                label.clone(),
                change.sequence_number,
                change.payload.to_vec(),
            )
        });
        self.changes.insert(change.sequence_number, change);
        self.refresh_stats();
        #[cfg(feature = "inspect")]
        if let Some((label, sn, payload)) = tap_view {
            dispatch_rtps_tap(&label, sn, payload);
        }
        Ok(())
    }

    /// Computes the number of entries the cache may hold under its policy.
    ///
    /// # Errors
    ///
    /// [`CacheError::ZeroDepth`] for `KeepLast { depth: 0 }`.
    fn effective_max_samples(&self) -> Result<usize, CacheError> {
        match self.kind {
            HistoryKind::KeepAll => Ok(self.max_samples),
            HistoryKind::KeepLast { depth: 0 } => Err(CacheError::ZeroDepth),
            HistoryKind::KeepLast { depth } => Ok(core::cmp::min(depth, self.max_samples)),
        }
    }

    /// Looks up the change stored under `sn`.
    #[must_use]
    pub fn get(&self, sn: SequenceNumber) -> Option<&CacheChange> {
        self.changes.get(&sn)
    }

    /// Removes every change whose sequence number is `<= sn` and returns how
    /// many were removed.
    pub fn remove_up_to(&mut self, sn: SequenceNumber) -> usize {
        let removed = match sn.0.checked_add(1) {
            Some(first_kept) => {
                // `split_off` returns the keys >= `first_kept`; what stays
                // behind in `self.changes` is exactly the part to drop.
                let keep = self.changes.split_off(&SequenceNumber(first_kept));
                core::mem::replace(&mut self.changes, keep).len()
            }
            None => {
                // `sn` is the largest representable value, so every entry is
                // covered. `sn.0 + 1` would overflow here (panic in debug
                // builds, wrap-around that removes nothing in release builds).
                let all = self.changes.len();
                self.changes.clear();
                all
            }
        };
        self.refresh_stats();
        removed
    }

    /// Iterates over the changes with sequence numbers in `lo..=hi`, in
    /// ascending order.
    ///
    /// NOTE(review): `BTreeMap::range` panics when `lo > hi`; callers are
    /// presumably expected to pass an ordered pair.
    pub fn iter_range(
        &self,
        lo: SequenceNumber,
        hi: SequenceNumber,
    ) -> impl Iterator<Item = &CacheChange> + '_ {
        self.changes.range(lo..=hi).map(|(_, v)| v)
    }

    /// Returns the smallest stored sequence number, or `None` when empty.
    #[must_use]
    pub fn min_sn(&self) -> Option<SequenceNumber> {
        self.changes.keys().next().copied()
    }

    /// Returns the largest stored sequence number, or `None` when empty.
    #[must_use]
    pub fn max_sn(&self) -> Option<SequenceNumber> {
        self.changes.keys().next_back().copied()
    }

    /// Returns the number of stored changes.
    #[must_use]
    pub fn len(&self) -> usize {
        self.changes.len()
    }

    /// Returns `true` when no change is stored.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.changes.is_empty()
    }

    /// Returns the configured `max_samples` (not the effective `KeepLast`
    /// limit).
    #[must_use]
    pub fn capacity(&self) -> usize {
        self.max_samples
    }
}
/// History cache whose whole state lives in an `RcuCell`, so that all
/// operations take `&self` and readers work on `Arc` snapshots of the state.
///
/// NOTE(review): the exact guarantees (writer serialisation, closure retries)
/// come from `zerodds_foundation::rcu::RcuCell`, which is not visible here —
/// confirm there.
#[cfg(feature = "std")]
#[derive(Debug)]
pub struct LockFreeReadHistoryCache {
    /// Current cache state; read as a snapshot, changed through `modify`.
    inner: zerodds_foundation::rcu::RcuCell<LockFreeInner>,
    /// Atomic mirror of length, eviction count and min/max sequence number;
    /// it backs `len`, `evicted_count`, `min_sn` and `max_sn`.
    stats: Arc<HistoryCacheStats>,
}
/// State of a [`LockFreeReadHistoryCache`]; also the type handed out by its
/// `snapshot` method.
#[cfg(feature = "std")]
#[derive(Debug, Clone)]
pub struct LockFreeInner {
    /// Stored changes, ordered by sequence number.
    pub changes: BTreeMap<SequenceNumber, CacheChange>,
    /// History policy applied when an insert finds the cache full.
    pub kind: HistoryKind,
    /// Upper bound on the number of stored changes.
    pub max_samples: usize,
    /// Number of changes evicted under `KeepLast` so far (saturating).
    pub evicted_count: u64,
}
#[cfg(feature = "std")]
impl LockFreeInner {
    /// Computes the number of entries this state may hold under its policy:
    /// `max_samples` for `KeepAll`, `min(depth, max_samples)` for `KeepLast`.
    ///
    /// Fails with [`CacheError::ZeroDepth`] for `KeepLast { depth: 0 }`.
    fn effective_max_samples(&self) -> Result<usize, CacheError> {
        let limit = match self.kind {
            HistoryKind::KeepAll => self.max_samples,
            HistoryKind::KeepLast { depth: 0 } => return Err(CacheError::ZeroDepth),
            HistoryKind::KeepLast { depth } => depth.min(self.max_samples),
        };
        Ok(limit)
    }
}
#[cfg(feature = "std")]
impl LockFreeReadHistoryCache {
    /// Creates an empty cache with the given history policy and sample limit.
    ///
    /// A `KeepLast` depth of 0 is not rejected here; it surfaces as
    /// [`CacheError::ZeroDepth`] on `insert`.
    #[must_use]
    pub fn new_with_kind(kind: HistoryKind, max_samples: usize) -> Self {
        Self {
            inner: zerodds_foundation::rcu::RcuCell::new(LockFreeInner {
                changes: BTreeMap::new(),
                kind,
                max_samples,
                evicted_count: 0,
            }),
            stats: Arc::new(HistoryCacheStats::default()),
        }
    }

    /// Creates an empty [`HistoryKind::KeepAll`] cache holding at most
    /// `max_samples` changes.
    #[must_use]
    pub fn new(max_samples: usize) -> Self {
        Self::new_with_kind(HistoryKind::KeepAll, max_samples)
    }

    /// Returns a handle to the shared statistics of this cache.
    #[must_use]
    pub fn stats(&self) -> Arc<HistoryCacheStats> {
        Arc::clone(&self.stats)
    }

    /// Returns the current state as an `Arc`; later writes to the cache do not
    /// change a snapshot that has already been handed out.
    #[must_use]
    pub fn snapshot(&self) -> Arc<LockFreeInner> {
        self.inner.read()
    }

    /// Returns the history policy of this cache.
    #[must_use]
    pub fn kind(&self) -> HistoryKind {
        self.inner.read().kind
    }

    /// Returns the eviction counter as last published to the statistics.
    #[must_use]
    pub fn evicted_count(&self) -> u64 {
        self.stats.evicted.load(Ordering::Acquire)
    }

    /// Returns the entry count as last published to the statistics.
    #[must_use]
    pub fn len(&self) -> usize {
        self.stats.len.load(Ordering::Acquire)
    }

    /// Returns `true` when the published entry count is 0.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns the smallest sequence number as last published to the
    /// statistics, or `None` when the cache was empty at that time.
    #[must_use]
    pub fn min_sn(&self) -> Option<SequenceNumber> {
        decode_sn_atom(self.stats.min_sn.load(Ordering::Acquire))
    }

    /// Returns the largest sequence number as last published to the
    /// statistics, or `None` when the cache was empty at that time.
    #[must_use]
    pub fn max_sn(&self) -> Option<SequenceNumber> {
        decode_sn_atom(self.stats.max_sn.load(Ordering::Acquire))
    }

    /// Returns the configured `max_samples` (not the effective `KeepLast`
    /// limit).
    #[must_use]
    pub fn capacity(&self) -> usize {
        self.inner.read().max_samples
    }

    /// Returns a clone of the change stored under `sn`, if any. The payload is
    /// shared through its `Arc`, not copied.
    #[must_use]
    pub fn get(&self, sn: SequenceNumber) -> Option<CacheChange> {
        self.inner.read().changes.get(&sn).cloned()
    }

    /// Collects clones of the changes with sequence numbers in `lo..=hi`, in
    /// ascending order, from a single snapshot.
    ///
    /// NOTE(review): `BTreeMap::range` panics when `lo > hi`; callers are
    /// presumably expected to pass an ordered pair.
    #[must_use]
    pub fn iter_range_snapshot(&self, lo: SequenceNumber, hi: SequenceNumber) -> Vec<CacheChange> {
        let snap = self.inner.read();
        snap.changes
            .range(lo..=hi)
            .map(|(_, v)| v.clone())
            .collect()
    }

    /// Decides whether a change with sequence number `sn` may enter `state`.
    ///
    /// Returns `Ok(true)` when the oldest entry has to be evicted first
    /// (`KeepLast` at its limit) and `Ok(false)` when there is room. The
    /// checks run in the order duplicate, zero depth, capacity.
    fn admission(state: &LockFreeInner, sn: SequenceNumber) -> Result<bool, CacheError> {
        if state.changes.contains_key(&sn) {
            return Err(CacheError::DuplicateSequenceNumber);
        }
        let cap = state.effective_max_samples()?;
        if state.changes.len() < cap {
            return Ok(false);
        }
        match state.kind {
            HistoryKind::KeepAll => Err(CacheError::CapacityExceeded),
            HistoryKind::KeepLast { .. } => Ok(true),
        }
    }

    /// Stores `change` under its sequence number.
    ///
    /// When the cache is at its effective limit, a `KeepAll` cache rejects the
    /// insert, while a `KeepLast` cache first evicts the entry with the
    /// smallest sequence number and counts the eviction.
    ///
    /// # Errors
    ///
    /// * [`CacheError::DuplicateSequenceNumber`] if the sequence number is
    ///   already present.
    /// * [`CacheError::ZeroDepth`] for a `KeepLast` policy with depth 0.
    /// * [`CacheError::CapacityExceeded`] if a `KeepAll` cache is full.
    pub fn insert(&self, change: CacheChange) -> Result<(), CacheError> {
        let sn = change.sequence_number;
        // Early rejection on the current snapshot, before entering `modify`.
        Self::admission(&self.inner.read(), sn)?;

        // The snapshot above may be stale by the time `modify` runs (another
        // writer can get in between), so the decision is taken again on the
        // state that is actually modified. Otherwise a racing duplicate would
        // silently overwrite an entry and `KeepAll` could exceed its limit.
        let mut outcome: Result<(), CacheError> = Ok(());
        self.inner.modify(|inner| {
            outcome = match Self::admission(inner, sn) {
                Ok(must_evict) => {
                    if must_evict {
                        if let Some(oldest) = inner.changes.keys().next().copied() {
                            inner.changes.remove(&oldest);
                            inner.evicted_count = inner.evicted_count.saturating_add(1);
                        }
                    }
                    // Cloned rather than moved so the closure stays callable
                    // more than once; the payload clone is an `Arc` bump.
                    inner.changes.insert(sn, change.clone());
                    Ok(())
                }
                Err(err) => Err(err),
            };
        });
        outcome?;
        self.refresh_stats();
        Ok(())
    }

    /// Removes every change whose sequence number is `<= sn` and returns how
    /// many were removed.
    pub fn remove_up_to(&self, sn: SequenceNumber) -> usize {
        let mut removed = 0;
        self.inner.modify(|inner| {
            removed = match sn.0.checked_add(1) {
                Some(first_kept) => {
                    // `split_off` returns the keys >= `first_kept`; what stays
                    // behind in `inner.changes` is exactly the part to drop.
                    let keep = inner.changes.split_off(&SequenceNumber(first_kept));
                    core::mem::replace(&mut inner.changes, keep).len()
                }
                None => {
                    // `sn` is the largest representable value, so every entry
                    // is covered. `sn.0 + 1` would overflow here (panic in
                    // debug builds, wrap-around that removes nothing in
                    // release builds).
                    let all = inner.changes.len();
                    inner.changes.clear();
                    all
                }
            };
        });
        self.refresh_stats();
        removed
    }

    /// Publishes length, eviction counter and min/max sequence number of the
    /// current state to the shared statistics.
    ///
    /// This runs after the modification, on a fresh snapshot, as four
    /// individual `Release` stores; it is not atomic with the modification.
    fn refresh_stats(&self) {
        let snap = self.inner.read();
        self.stats.len.store(snap.changes.len(), Ordering::Release);
        self.stats
            .evicted
            .store(snap.evicted_count, Ordering::Release);
        let max = snap.changes.keys().next_back().copied();
        let min = snap.changes.keys().next().copied();
        self.stats
            .max_sn
            .store(encode_sn_atom(max), Ordering::Release);
        self.stats
            .min_sn
            .store(encode_sn_atom(min), Ordering::Release);
    }
}
// Unit tests for `ChangeKind`, `HistoryCache`, its statistics mirror and,
// behind the `std` feature, `LockFreeReadHistoryCache`.
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
    use super::*;
    // Shorthand for building a sequence number.
    fn sn(n: i64) -> SequenceNumber {
        SequenceNumber(n)
    }
    // Alive change whose one-byte payload is `n` truncated to u8.
    fn alive(n: i64) -> CacheChange {
        CacheChange::alive(sn(n), alloc::vec![n as u8])
    }
    // A fresh cache reports no entries and no sequence numbers.
    #[test]
    fn new_cache_is_empty() {
        let c = HistoryCache::new(10);
        assert_eq!(c.len(), 0);
        assert!(c.is_empty());
        assert_eq!(c.min_sn(), None);
        assert_eq!(c.max_sn(), None);
    }
    // Inserted changes can be fetched by sequence number; unknown ones give None.
    #[test]
    fn insert_and_get() {
        let mut c = HistoryCache::new(10);
        c.insert(alive(1)).expect("insert");
        c.insert(alive(2)).expect("insert");
        assert_eq!(
            c.get(sn(1)).map(|ch| ch.payload.as_ref().to_vec()),
            Some(alloc::vec![1])
        );
        assert_eq!(c.get(sn(3)), None);
        assert_eq!(c.len(), 2);
    }
    // A second insert with the same sequence number is rejected.
    #[test]
    fn insert_duplicate_is_err() {
        let mut c = HistoryCache::new(10);
        c.insert(alive(1)).expect("insert");
        assert_eq!(c.insert(alive(1)), Err(CacheError::DuplicateSequenceNumber));
    }
    // A full KeepAll cache rejects further inserts.
    #[test]
    fn insert_at_capacity_is_err() {
        let mut c = HistoryCache::new(2);
        c.insert(alive(1)).expect("insert");
        c.insert(alive(2)).expect("insert");
        assert_eq!(c.insert(alive(3)), Err(CacheError::CapacityExceeded));
    }
    // min/max follow the stored keys regardless of insertion order.
    #[test]
    fn min_max_sn_reflect_content() {
        let mut c = HistoryCache::new(10);
        c.insert(alive(5)).unwrap();
        c.insert(alive(3)).unwrap();
        c.insert(alive(7)).unwrap();
        assert_eq!(c.min_sn(), Some(sn(3)));
        assert_eq!(c.max_sn(), Some(sn(7)));
    }
    // The bound passed to `remove_up_to` is itself removed.
    #[test]
    fn remove_up_to_inclusive() {
        let mut c = HistoryCache::new(10);
        for i in 1..=5 {
            c.insert(alive(i)).unwrap();
        }
        let removed = c.remove_up_to(sn(3));
        assert_eq!(removed, 3);
        assert_eq!(c.len(), 2);
        assert_eq!(c.min_sn(), Some(sn(4)));
    }
    // A bound below every stored key removes nothing.
    #[test]
    fn remove_up_to_with_no_matches_is_noop() {
        let mut c = HistoryCache::new(10);
        c.insert(alive(10)).unwrap();
        assert_eq!(c.remove_up_to(sn(5)), 0);
        assert_eq!(c.len(), 1);
    }
    // Range iteration yields ascending sequence numbers within the bounds.
    #[test]
    fn iter_range_is_ordered() {
        let mut c = HistoryCache::new(10);
        for i in [5, 1, 3, 8, 2] {
            c.insert(alive(i)).unwrap();
        }
        let collected: alloc::vec::Vec<i64> = c
            .iter_range(sn(2), sn(5))
            .map(|ch| ch.sequence_number.0)
            .collect();
        assert_eq!(collected, alloc::vec![2, 3, 5]);
    }
    // A range that misses every stored key yields nothing.
    #[test]
    fn iter_range_empty_when_no_overlap() {
        let mut c = HistoryCache::new(10);
        c.insert(alive(1)).unwrap();
        c.insert(alive(2)).unwrap();
        assert_eq!(c.iter_range(sn(10), sn(20)).count(), 0);
    }
    // `capacity` echoes the configured max_samples.
    #[test]
    fn capacity_accessor() {
        let c = HistoryCache::new(42);
        assert_eq!(c.capacity(), 42);
    }
    // `CacheChange::alive` sets kind, sequence number and payload.
    #[test]
    fn cache_change_alive_constructor() {
        let ch = CacheChange::alive(sn(1), alloc::vec![1, 2, 3]);
        assert_eq!(ch.kind, ChangeKind::Alive);
        assert_eq!(ch.sequence_number, sn(1));
        assert_eq!(ch.payload.as_ref(), &[1, 2, 3][..]);
    }
    #[test]
    fn change_kind_alive_is_relevant_and_alive() {
        assert!(ChangeKind::Alive.is_relevant());
        assert!(ChangeKind::Alive.is_alive_kind());
    }
    #[test]
    fn change_kind_alive_filtered_is_alive_but_not_relevant() {
        assert!(ChangeKind::AliveFiltered.is_alive_kind());
        assert!(!ChangeKind::AliveFiltered.is_relevant());
    }
    #[test]
    fn change_kind_not_alive_kinds_are_not_alive() {
        for k in [
            ChangeKind::NotAliveDisposed,
            ChangeKind::NotAliveUnregistered,
            ChangeKind::NotAliveDisposedUnregistered,
        ] {
            assert!(!k.is_alive_kind(), "{k:?}");
            assert!(k.is_relevant(), "{k:?}");
        }
    }
    // Every variant equals itself and differs from all others.
    #[test]
    fn change_kind_distinct_variants() {
        let v = [
            ChangeKind::Alive,
            ChangeKind::AliveFiltered,
            ChangeKind::NotAliveDisposed,
            ChangeKind::NotAliveUnregistered,
            ChangeKind::NotAliveDisposedUnregistered,
        ];
        for (i, a) in v.iter().enumerate() {
            for (j, b) in v.iter().enumerate() {
                if i == j {
                    assert_eq!(a, b);
                } else {
                    assert_ne!(a, b);
                }
            }
        }
    }
    // Statistics of a fresh cache: zero counters, no sequence numbers.
    #[test]
    fn stats_default_is_empty_with_no_sn() {
        let c = HistoryCache::new(10);
        let snap = c.stats().snapshot();
        assert_eq!(snap.len, 0);
        assert_eq!(snap.evicted, 0);
        assert_eq!(snap.max_sn, None);
        assert_eq!(snap.min_sn, None);
    }
    // Statistics follow inserts and removals.
    #[test]
    fn stats_track_insert_and_remove() {
        let mut c = HistoryCache::new(10);
        c.insert(alive(3)).unwrap();
        c.insert(alive(5)).unwrap();
        c.insert(alive(7)).unwrap();
        let snap = c.stats().snapshot();
        assert_eq!(snap.len, 3);
        assert_eq!(snap.min_sn, Some(sn(3)));
        assert_eq!(snap.max_sn, Some(sn(7)));
        assert_eq!(snap.evicted, 0);
        c.remove_up_to(sn(5));
        let snap = c.stats().snapshot();
        assert_eq!(snap.len, 1);
        assert_eq!(snap.min_sn, Some(sn(7)));
        assert_eq!(snap.max_sn, Some(sn(7)));
    }
    // The third insert into a depth-2 KeepLast cache evicts the oldest entry.
    #[test]
    fn stats_track_keeplast_eviction() {
        let mut c = HistoryCache::new_with_kind(HistoryKind::KeepLast { depth: 2 }, 100);
        c.insert(alive(1)).unwrap();
        c.insert(alive(2)).unwrap();
        c.insert(alive(3)).unwrap(); let snap = c.stats().snapshot();
        assert_eq!(snap.len, 2);
        assert_eq!(snap.evicted, 1);
        assert_eq!(snap.min_sn, Some(sn(2)));
        assert_eq!(snap.max_sn, Some(sn(3)));
    }
    // Repeated `stats()` calls return the same Arc, and both see later inserts.
    #[test]
    fn stats_arc_is_shared_across_clones_of_handle() {
        let mut c = HistoryCache::new(10);
        let s1 = c.stats();
        let s2 = c.stats();
        assert!(Arc::ptr_eq(&s1, &s2));
        c.insert(alive(1)).unwrap();
        assert_eq!(s1.snapshot().len, 1);
        assert_eq!(s2.snapshot().len, 1);
    }
    // A reader polls the statistics without taking the mutex that guards the
    // cache while a writer inserts; intermediate values must stay in bounds.
    #[test]
    fn stats_reader_thread_sees_inserts_concurrently() {
        use std::sync::Arc as StdArc;
        use std::sync::Mutex as StdMutex;
        use std::thread;
        use std::time::Duration;
        let cache = StdArc::new(StdMutex::new(HistoryCache::new(2_000)));
        let stats = cache.lock().expect("init lock").stats();
        let writer_cache = StdArc::clone(&cache);
        let writer = thread::spawn(move || {
            for i in 1..=1_000 {
                let mut c = writer_cache.lock().expect("write lock");
                c.insert(alive(i)).expect("insert");
            }
        });
        let reader_stats = StdArc::clone(&stats);
        let reader = thread::spawn(move || {
            for _ in 0..100 {
                let snap = reader_stats.snapshot();
                assert!(snap.len <= 1_000);
                if let Some(max) = snap.max_sn {
                    assert!(max.0 >= 1 && max.0 <= 1_000);
                }
                thread::sleep(Duration::from_micros(50));
            }
        });
        writer.join().expect("writer joined");
        reader.join().expect("reader joined");
        let final_snap = stats.snapshot();
        assert_eq!(final_snap.len, 1_000);
        assert_eq!(final_snap.max_sn, Some(sn(1_000)));
        assert_eq!(final_snap.min_sn, Some(sn(1)));
    }
    // Cloning a cache detaches its statistics from the original.
    #[test]
    fn clone_creates_independent_stats_handles() {
        let mut a = HistoryCache::new(10);
        a.insert(alive(1)).unwrap();
        let b = a.clone();
        assert!(!Arc::ptr_eq(&a.stats(), &b.stats()));
        assert_eq!(a.stats().snapshot().len, 1);
        assert_eq!(b.stats().snapshot().len, 1);
        let mut a_mut = a;
        a_mut.insert(alive(2)).unwrap();
        assert_eq!(a_mut.stats().snapshot().len, 2);
        assert_eq!(b.stats().snapshot().len, 1, "clone unaffected");
    }
    // Same scenarios against the `&self`-based cache.
    #[cfg(feature = "std")]
    mod lock_free_tests {
        use super::*;
        #[test]
        fn lock_free_new_is_empty() {
            let c = LockFreeReadHistoryCache::new(10);
            assert_eq!(c.len(), 0);
            assert!(c.is_empty());
            assert_eq!(c.min_sn(), None);
            assert_eq!(c.max_sn(), None);
        }
        #[test]
        fn lock_free_insert_and_get() {
            let c = LockFreeReadHistoryCache::new(10);
            c.insert(alive(1)).unwrap();
            c.insert(alive(2)).unwrap();
            assert_eq!(
                c.get(sn(1)).map(|ch| ch.payload.as_ref().to_vec()),
                Some(alloc::vec![1])
            );
            assert_eq!(c.get(sn(3)), None);
            assert_eq!(c.len(), 2);
        }
        // min/max are served from the statistics atomics.
        #[test]
        fn lock_free_min_max_lock_free_loads() {
            let c = LockFreeReadHistoryCache::new(10);
            c.insert(alive(5)).unwrap();
            c.insert(alive(3)).unwrap();
            c.insert(alive(7)).unwrap();
            assert_eq!(c.min_sn(), Some(sn(3)));
            assert_eq!(c.max_sn(), Some(sn(7)));
        }
        #[test]
        fn lock_free_keeplast_evicts_oldest() {
            let c =
                LockFreeReadHistoryCache::new_with_kind(HistoryKind::KeepLast { depth: 2 }, 100);
            c.insert(alive(1)).unwrap();
            c.insert(alive(2)).unwrap();
            c.insert(alive(3)).unwrap(); assert_eq!(c.len(), 2);
            assert_eq!(c.min_sn(), Some(sn(2)));
            assert_eq!(c.max_sn(), Some(sn(3)));
            assert_eq!(c.evicted_count(), 1);
        }
        #[test]
        fn lock_free_keepall_full_rejects() {
            let c = LockFreeReadHistoryCache::new(2);
            c.insert(alive(1)).unwrap();
            c.insert(alive(2)).unwrap();
            assert_eq!(c.insert(alive(3)), Err(CacheError::CapacityExceeded));
        }
        #[test]
        fn lock_free_duplicate_sn_rejected() {
            let c = LockFreeReadHistoryCache::new(10);
            c.insert(alive(1)).unwrap();
            assert_eq!(c.insert(alive(1)), Err(CacheError::DuplicateSequenceNumber));
        }
        #[test]
        fn lock_free_remove_up_to() {
            let c = LockFreeReadHistoryCache::new(10);
            for i in 1..=5 {
                c.insert(alive(i)).unwrap();
            }
            let removed = c.remove_up_to(sn(3));
            assert_eq!(removed, 3);
            assert_eq!(c.len(), 2);
            assert_eq!(c.min_sn(), Some(sn(4)));
        }
        #[test]
        fn lock_free_iter_range_snapshot() {
            let c = LockFreeReadHistoryCache::new(10);
            for i in 1..=5 {
                c.insert(alive(i)).unwrap();
            }
            let mid: alloc::vec::Vec<_> = c
                .iter_range_snapshot(sn(2), sn(4))
                .iter()
                .map(|ch| ch.sequence_number)
                .collect();
            assert_eq!(mid, alloc::vec![sn(2), sn(3), sn(4)]);
        }
        // A snapshot taken before further inserts keeps its old content.
        #[test]
        fn lock_free_snapshot_outlives_writes() {
            let c = LockFreeReadHistoryCache::new(10);
            c.insert(alive(1)).unwrap();
            let snap = c.snapshot();
            assert_eq!(snap.changes.len(), 1);
            c.insert(alive(2)).unwrap();
            c.insert(alive(3)).unwrap();
            assert_eq!(snap.changes.len(), 1);
            assert!(snap.changes.contains_key(&sn(1)));
            assert_eq!(c.len(), 3);
        }
        // One writer and one reader run in parallel; each snapshot must be
        // internally consistent (no more entries than its key span allows).
        #[test]
        fn lock_free_concurrent_readers_writers_smoke() {
            use std::sync::Arc as StdArc;
            use std::thread;
            let cache: StdArc<LockFreeReadHistoryCache> =
                StdArc::new(LockFreeReadHistoryCache::new(2_000));
            let cache_w = StdArc::clone(&cache);
            let writer = thread::spawn(move || {
                for i in 1..=500 {
                    cache_w.insert(alive(i)).expect("insert");
                }
            });
            let cache_r = StdArc::clone(&cache);
            let reader = thread::spawn(move || {
                for _ in 0..200 {
                    let snap = cache_r.snapshot();
                    if let (Some(min), Some(max)) = (
                        snap.changes.keys().next().copied(),
                        snap.changes.keys().next_back().copied(),
                    ) {
                        let inferred_count = (max.0 - min.0 + 1) as usize;
                        assert!(
                            snap.changes.len() <= inferred_count,
                            "snapshot inkonsistent"
                        );
                    }
                }
            });
            writer.join().expect("writer joined");
            reader.join().expect("reader joined");
            assert_eq!(cache.len(), 500);
            assert_eq!(cache.max_sn(), Some(sn(500)));
        }
    }
}