mod ll;
use self::ll::{LLNode, LL};
use crate::cowcell::{CowCell, CowCellReadTxn};
use crate::hashmap::*;
use crossbeam::channel::{unbounded, Receiver, Sender};
use parking_lot::{Mutex, RwLock};
use std::collections::HashMap as Map;
use std::borrow::Borrow;
use std::cell::UnsafeCell;
use std::convert::TryFrom;
use std::fmt::Debug;
use std::hash::Hash;
use std::mem;
use std::ops::Deref;
use std::ops::DerefMut;
use std::time::Instant;
/// Weighting of reader thread-local caches relative to the main cache:
/// used in the sizing formula in `ARCache::new`, and `read_max = max / ratio`.
const READ_THREAD_RATIO: usize = 16;
/// Counters describing cache behaviour, published at every write commit and
/// readable via [`ARCache::view_stats`].
#[derive(Clone, Debug, PartialEq)]
pub struct CacheStats {
    /// Hit events drained from reader transactions.
    pub reader_hits: usize,
    /// Include (reader-side insert) events drained from readers.
    pub reader_includes: usize,
    /// Hits recorded by write transactions.
    pub write_hits: usize,
    /// Thread-local inserts/modifications applied at commit.
    pub write_inc_or_mod: usize,
    /// The configured maximum number of resident entries.
    pub shared_max: usize,
    /// Length of the frequent list at the last commit.
    pub freq: usize,
    /// Length of the recent list at the last commit.
    pub recent: usize,
    /// Cumulative evictions from the frequent list.
    pub freq_evicts: usize,
    /// Cumulative evictions from the recent list.
    pub recent_evicts: usize,
    /// The adaptive balance point `p` at the last eviction pass.
    pub p_weight: usize,
    /// Total number of keys tracked (resident + ghost + haunted).
    pub all_seen_keys: usize,
}
/// A write transaction's pending action for one key. The `bool` is the
/// "clean" flag: `false` means dirty, i.e. still to be visited by
/// `iter_mut_mark_clean` before commit asserts cleanliness.
enum ThreadCacheItem<V> {
    /// Key will be inserted/updated with this value at commit.
    Present(V, bool),
    /// Key will be removed at commit.
    Removed(bool),
}
/// Events streamed from read transactions to the writer's commit path.
enum CacheEvent<K, V> {
    /// A reader hit: (reader timestamp, precomputed key hash).
    Hit(Instant, u64),
    /// A reader-side insert: (timestamp, key, value, reader's snapshot txid).
    Include(Instant, K, V, u64),
}
/// Payload stored in every ARC list node: the key plus the txid of the last
/// change, used to order stale reader events against commits and clears.
#[derive(Hash, Ord, PartialOrd, Eq, PartialEq, Clone, Debug)]
struct CacheItemInner<K>
where
    K: Hash + Eq + Ord + Clone + Debug,
{
    k: K,
    txid: u64,
}
/// The ARC state of a key. Resident variants (`Freq`, `Rec`) carry the
/// value; ghost and haunted variants keep only history. Each variant holds
/// a raw pointer to this key's node in the corresponding `ArcInner` list;
/// the node is moved between lists as the state changes.
#[derive(Clone, Debug)]
enum CacheItem<K, V>
where
    K: Hash + Eq + Ord + Clone + Debug,
{
    /// Resident, seen more than once.
    Freq(*mut LLNode<CacheItemInner<K>>, V),
    /// Resident, seen once.
    Rec(*mut LLNode<CacheItemInner<K>>, V),
    /// Evicted from `freq`; value dropped, history retained.
    GhostFreq(*mut LLNode<CacheItemInner<K>>),
    /// Evicted from `rec`; value dropped, history retained.
    GhostRec(*mut LLNode<CacheItemInner<K>>),
    /// Removed/expired: excluded from ARC weighting but txid-tracked.
    Haunted(*mut LLNode<CacheItemInner<K>>),
}
/// Test-only mirror of `CacheItem`'s variant, without payloads, so tests
/// can assert which list a key is on.
#[cfg(test)]
#[derive(Clone, Debug, PartialEq)]
pub(crate) enum CacheState {
    Freq,
    Rec,
    GhostFreq,
    GhostRec,
    Haunted,
    /// Key not present in the cache at all.
    None,
}
/// Test-only snapshot of the cache's internal list sizes and balance point.
#[cfg(test)]
#[derive(Debug, PartialEq)]
pub(crate) struct CStat {
    max: usize,
    cache: usize,
    tlocal: usize,
    freq: usize,
    rec: usize,
    ghost_freq: usize,
    ghost_rec: usize,
    haunted: usize,
    p: usize,
}
/// Mutable ARC bookkeeping, guarded by `ARCache::inner` (a Mutex): only the
/// committing writer manipulates these lists.
struct ArcInner<K, V>
where
    K: Hash + Eq + Ord + Clone + Debug,
    V: Clone + Debug,
{
    /// Adaptive target size for the recent list (0..=max).
    p: usize,
    /// Entries seen more than once.
    freq: LL<CacheItemInner<K>>,
    /// Entries seen once.
    rec: LL<CacheItemInner<K>>,
    /// Keys recently evicted from `freq` (no value retained).
    ghost_freq: LL<CacheItemInner<K>>,
    /// Keys recently evicted from `rec` (no value retained).
    ghost_rec: LL<CacheItemInner<K>>,
    /// Removed keys kept for txid history only.
    haunted: LL<CacheItemInner<K>>,
    /// Receives hit/include events from reader transactions.
    rx: Receiver<CacheEvent<K, V>>,
    /// Reader events carrying a txid older than this are ignored
    /// (raised to the commit txid by `clear`).
    min_txid: u64,
}
/// Read-mostly configuration, guarded by the `shared` RwLock.
struct ArcShared<K, V>
where
    K: Hash + Eq + Ord + Clone + Debug,
    V: Clone + Debug,
{
    /// Maximum number of resident entries (freq + rec).
    max: usize,
    /// Capacity of each reader's thread-local cache; 0 disables it.
    read_max: usize,
    /// Sender cloned into every read transaction for hit/include events.
    tx: Sender<CacheEvent<K, V>>,
}
/// A concurrently-readable Adaptive Replacement Cache. Readers operate on a
/// point-in-time snapshot (plus an optional thread-local cache) and stream
/// events to the writer; a single writer applies all ARC bookkeeping at
/// commit time.
pub struct ARCache<K, V>
where
    K: Hash + Eq + Ord + Clone + Debug,
    V: Clone + Debug,
{
    /// Every key ever seen, mapped to its current ARC state.
    cache: HashMap<K, CacheItem<K, V>>,
    /// Sizing configuration and the reader event channel sender.
    shared: RwLock<ArcShared<K, V>>,
    /// ARC list state; held locked for the duration of a commit.
    inner: Mutex<ArcInner<K, V>>,
    /// Statistics published at each commit.
    stats: CowCell<CacheStats>,
}
// SAFETY: the raw `LLNode` pointers inside `cache`/`inner` are only ever
// dereferenced while holding the `inner` mutex (commit path) or through a
// transaction that owns the relevant view. NOTE(review): these impls do not
// require `K: Send`/`V: Send` — confirm against the `ll` and `hashmap`
// modules that this cannot move non-Send data across threads.
unsafe impl<K: Hash + Eq + Ord + Clone + Debug, V: Clone + Debug> Send for ARCache<K, V> {}
unsafe impl<K: Hash + Eq + Ord + Clone + Debug, V: Clone + Debug> Sync for ARCache<K, V> {}
/// A reader's private LRU: a lookup map from key to the node that owns the
/// `(K, V)` pair inside `tlru`.
struct ReadCache<K, V>
where
    K: Hash + Eq + Ord + Clone + Debug,
    V: Clone + Debug,
{
    /// Key -> node lookup; pointers index into `tlru`.
    set: Map<K, *mut LLNode<(K, V)>>,
    /// Capacity: once reached, the oldest node is recycled in place.
    read_size: usize,
    /// LRU-ordered list owning the cached pairs.
    tlru: LL<(K, V)>,
}
/// A read transaction over a snapshot of the cache. Hits and inserts are
/// streamed over `tx` so the next write commit can fold them into the ARC
/// state; dropping the txn opportunistically quiesces the cache.
pub struct ARCacheReadTxn<'a, K, V>
where
    K: Hash + Eq + Ord + Clone + Debug,
    V: Clone + Debug,
{
    caller: &'a ARCache<K, V>,
    /// Point-in-time snapshot of the key state map.
    cache: HashMapReadTxn<'a, K, CacheItem<K, V>>,
    /// Thread-local LRU, present only when `read_max > 0`.
    tlocal: Option<ReadCache<K, V>>,
    /// Event channel to the writer.
    tx: Sender<CacheEvent<K, V>>,
    /// Creation time; lets the committer stop draining events newer than
    /// its own start.
    ts: Instant,
}
/// The (single) write transaction. Changes buffer in `tlocal` and are
/// applied to the ARC state on `commit`. `hit`/`clear` use `UnsafeCell`
/// so `get(&self)` can record hits through a shared reference.
pub struct ARCacheWriteTxn<'a, K, V>
where
    K: Hash + Eq + Ord + Clone + Debug,
    V: Clone + Debug,
{
    caller: &'a ARCache<K, V>,
    /// Write view of the key state map.
    cache: HashMapWriteTxn<'a, K, CacheItem<K, V>>,
    /// Pending inserts/removals keyed by K.
    tlocal: Map<K, ThreadCacheItem<V>>,
    /// Hashes of keys hit during this txn (promoted at commit).
    hit: UnsafeCell<Vec<u64>>,
    /// When true, the cache is treated as emptied for this txn.
    clear: UnsafeCell<bool>,
}
impl<K: Hash + Eq + Ord + Clone + Debug, V: Clone + Debug> CacheItem<K, V> {
    /// Borrow the stored value when this item is resident (`Freq`/`Rec`).
    /// Ghost and haunted entries hold no value and yield `None`.
    fn to_vref(&self) -> Option<&V> {
        match self {
            CacheItem::Freq(_, v) | CacheItem::Rec(_, v) => Some(v),
            _ => None,
        }
    }

    /// Test-only: report which ARC list this item currently belongs to.
    #[cfg(test)]
    fn to_state(&self) -> CacheState {
        match self {
            CacheItem::Freq(_, _) => CacheState::Freq,
            CacheItem::Rec(_, _) => CacheState::Rec,
            CacheItem::GhostFreq(_) => CacheState::GhostFreq,
            CacheItem::GhostRec(_) => CacheState::GhostRec,
            CacheItem::Haunted(_) => CacheState::Haunted,
        }
    }
}
/// Move every node of the live list `$ll` into the matching ghost list
/// (`$gf` for `Freq` items, `$gr` for `Rec` items), stamping each node with
/// `$txid` and rewriting its cache entry to the ghost state. Used by the
/// `clear` commit path to empty the resident lists.
macro_rules! drain_ll_to_ghost {
    (
        $cache:expr,
        $ll:expr,
        $gf:expr,
        $gr:expr,
        $txid:expr
    ) => {{
        while $ll.len() > 0 {
            let n = $ll.pop();
            debug_assert!(!n.is_null());
            // Stamp the node so stale reader events cannot resurrect it.
            unsafe {
                (*n).as_mut().txid = $txid;
            }
            let mut r = $cache.get_mut(unsafe { &(*n).as_mut().k });
            match r {
                Some(ref mut ci) => {
                    // Choose the ghost list by the item's current state.
                    let mut next_state = match &ci {
                        CacheItem::Freq(n, _) => {
                            $gf.append_n(*n);
                            CacheItem::GhostFreq(*n)
                        }
                        CacheItem::Rec(n, _) => {
                            $gr.append_n(*n);
                            CacheItem::GhostRec(*n)
                        }
                        _ => {
                            // Only resident items can be on a live list.
                            unreachable!();
                        }
                    };
                    mem::swap(*ci, &mut next_state);
                }
                None => {
                    // Every list node must have a cache entry.
                    unreachable!();
                }
            }
        }
    }};
}
/// Pop nodes from the head of `$ll` until it is `$size` long, appending
/// each to `$to_ll` and rewriting its cache entry to the matching ghost
/// state. `$to_ll` must be the ghost list corresponding to `$ll`
/// (rec -> ghost_rec, freq -> ghost_freq).
macro_rules! evict_to_len {
    (
        $cache:expr,
        $ll:expr,
        $to_ll:expr,
        $size:expr,
        $txid:expr
    ) => {{
        debug_assert!($ll.len() >= $size);
        while $ll.len() > $size {
            let n = $ll.pop();
            debug_assert!(!n.is_null());
            let mut r = $cache.get_mut(unsafe { &(*n).as_mut().k });
            // Stamp the eviction txid before rewriting the state.
            unsafe {
                (*n).as_mut().txid = $txid;
            }
            match r {
                Some(ref mut ci) => {
                    let mut next_state = match &ci {
                        CacheItem::Freq(llp, _v) => {
                            // The entry's node must be the one we popped.
                            debug_assert!(*llp == n);
                            $to_ll.append_n(*llp);
                            CacheItem::GhostFreq(*llp)
                        }
                        CacheItem::Rec(llp, _v) => {
                            debug_assert!(*llp == n);
                            $to_ll.append_n(*llp);
                            CacheItem::GhostRec(*llp)
                        }
                        _ => {
                            // Only resident items live on these lists.
                            unreachable!();
                        }
                    };
                    mem::swap(*ci, &mut next_state);
                }
                None => {
                    unreachable!();
                }
            }
        }
    }};
}
/// Pop nodes from `$ll` (a ghost list) until it is `$size` long, moving
/// each to the haunted list `$to_ll` and rewriting its cache entry to
/// `Haunted` with txid `$txid`.
macro_rules! evict_to_haunted_len {
    (
        $cache:expr,
        $ll:expr,
        $to_ll:expr,
        $size:expr,
        $txid:expr
    ) => {{
        debug_assert!($ll.len() >= $size);
        while $ll.len() > $size {
            let n = $ll.pop();
            debug_assert!(!n.is_null());
            $to_ll.append_n(n);
            let mut r = $cache.get_mut(unsafe { &(*n).as_mut().k });
            unsafe {
                (*n).as_mut().txid = $txid;
            }
            match r {
                Some(ref mut ci) => {
                    // Unconditionally haunt the entry, whatever it was.
                    let mut next_state = CacheItem::Haunted(n);
                    mem::swap(*ci, &mut next_state);
                }
                None => {
                    unreachable!();
                }
            };
        }
    }};
}
impl<K: Hash + Eq + Ord + Clone + Debug, V: Clone + Debug> ARCache<K, V> {
/// Construct a cache from a `total` memory/slot budget shared between the
/// main cache and per-thread read caches.
///
/// * `threads` — expected number of reader threads.
/// * `ex_ro_miss` — expected extra cost of a reader miss.
/// * `ex_rw_miss` — expected extra cost of a writer miss.
/// * `read_cache` — when true, readers get a thread-local cache sized
///   `max / READ_THREAD_RATIO`.
///
/// # Panics
/// Panics if an input exceeds `isize::MAX`, or if the derived `max` is
/// not positive (the `usize` conversion / `new_size` assertion fails).
pub fn new(
    total: usize,
    threads: usize,
    ex_ro_miss: usize,
    ex_rw_miss: usize,
    read_cache: bool,
) -> Self {
    // Work in signed space: intermediate terms of the sizing formula can
    // legitimately be negative before the final negation.
    let total = isize::try_from(total).unwrap();
    let threads = isize::try_from(threads).unwrap();
    let ro_miss = isize::try_from(ex_ro_miss).unwrap();
    let wr_miss = isize::try_from(ex_rw_miss).unwrap();
    let ratio = isize::try_from(READ_THREAD_RATIO).unwrap();
    // Balance the budget between the main cache and reader caches.
    // NOTE(review): formula taken as given — confirm against the sizing
    // model this was derived from.
    let max = -((ratio * ((ro_miss * threads) + wr_miss - total)) / (ratio + threads));
    let read_max = if read_cache { max / ratio } else { 0 };
    let max = usize::try_from(max).unwrap();
    let read_max = usize::try_from(read_max).unwrap();
    Self::new_size(max, read_max)
}
/// Create a cache with an explicit main capacity `max` (must be > 0) and
/// per-reader thread-local capacity `read_max` (0 disables reader caches).
///
/// # Panics
/// Panics if `max == 0`.
pub fn new_size(max: usize, read_max: usize) -> Self {
    assert!(max > 0);
    // Unbounded channel: readers must never block sending events.
    let (tx, rx) = unbounded();
    let shared = RwLock::new(ArcShared { max, read_max, tx });
    let inner = Mutex::new(ArcInner {
        p: 0,
        freq: LL::new(),
        rec: LL::new(),
        ghost_freq: LL::new(),
        ghost_rec: LL::new(),
        haunted: LL::new(),
        rx,
        min_txid: 0,
    });
    let stats = CowCell::new(CacheStats {
        reader_hits: 0,
        reader_includes: 0,
        write_hits: 0,
        write_inc_or_mod: 0,
        shared_max: 0,
        freq: 0,
        recent: 0,
        freq_evicts: 0,
        recent_evicts: 0,
        p_weight: 0,
        all_seen_keys: 0,
    });
    ARCache {
        cache: HashMap::new(),
        shared,
        inner,
        stats,
    }
}
/// Begin a read transaction over a snapshot of the cache, with a
/// thread-local cache when `read_max > 0`.
pub fn read(&self) -> ARCacheReadTxn<K, V> {
    let shared_guard = self.shared.read();
    let tlocal = match shared_guard.read_max {
        0 => None,
        sz => Some(ReadCache {
            set: Map::new(),
            read_size: sz,
            tlru: LL::new(),
        }),
    };
    ARCacheReadTxn {
        caller: self,
        cache: self.cache.read(),
        tlocal,
        tx: shared_guard.tx.clone(),
        ts: Instant::now(),
    }
}
/// Begin the (exclusive) write transaction; blocks until it is available.
pub fn write(&self) -> ARCacheWriteTxn<K, V> {
    let cache = self.cache.write();
    ARCacheWriteTxn {
        caller: self,
        cache,
        tlocal: Map::new(),
        hit: UnsafeCell::new(Vec::new()),
        clear: UnsafeCell::new(false),
    }
}
/// Take a read snapshot of the statistics published by the last commit.
pub fn view_stats(&self) -> CowCellReadTxn<CacheStats> {
    self.stats.read()
}
/// Begin a write transaction only if no other writer is active.
fn try_write(&self) -> Option<ARCacheWriteTxn<K, V>> {
    let cache = self.cache.try_write()?;
    Some(ARCacheWriteTxn {
        caller: self,
        cache,
        tlocal: Map::new(),
        hit: UnsafeCell::new(Vec::new()),
        clear: UnsafeCell::new(false),
    })
}
/// Drain queued reader events by committing an empty write transaction,
/// but only if the writer is currently free.
fn try_quiesce(&self) {
    match self.try_write() {
        Some(wr_txn) => wr_txn.commit(),
        // A writer is active; it will drain the events on its own commit.
        None => {}
    }
}
/// Shift the ARC balance point `p` toward the frequent side after a hit on
/// the ghost-frequent list: decrease `p` (the recent list's target share),
/// weighted by how much larger the recent ghost set is.
///
/// The divisor is guarded with `max(1)`: a ghost-freq hit implies the list
/// was non-empty, but this keeps the function total (the previous code
/// divided by zero when `ghost_freq_len == 0 < ghost_rec_len`).
fn calc_p_freq(ghost_rec_len: usize, ghost_freq_len: usize, p: &mut usize) {
    let delta = if ghost_rec_len > ghost_freq_len {
        ghost_rec_len / ghost_freq_len.max(1)
    } else {
        1
    };
    // Clamp at zero; equivalent to `if delta < *p { *p -= delta } else { 0 }`.
    *p = p.saturating_sub(delta);
}
/// Shift the ARC balance point `p` toward the recent side after a hit on
/// the ghost-recent list: increase `p` (capped at `cap`, the cache max),
/// weighted by how much larger the frequent ghost set is.
///
/// The divisor is guarded with `max(1)`: a ghost-rec hit implies the list
/// was non-empty, but this keeps the function total (the previous code
/// divided by zero when `ghost_rec_len == 0 < ghost_freq_len`).
/// Callers maintain `*p <= cap` (asserted in `evict`).
fn calc_p_rec(cap: usize, ghost_rec_len: usize, ghost_freq_len: usize, p: &mut usize) {
    let delta = if ghost_freq_len > ghost_rec_len {
        ghost_freq_len / ghost_rec_len.max(1)
    } else {
        1
    };
    if delta <= cap - *p {
        *p += delta
    } else {
        // Never let the recent target exceed the whole cache.
        *p = cap
    }
}
/// Fold a write transaction's buffered inserts/removals into the cache,
/// moving each key to the ARC list its new state demands and stamping it
/// with `commit_txid`. Ghost-list hits adjust the balance point `p` before
/// the key is revived.
fn drain_tlocal_inc<'a>(
    &'a self,
    cache: &mut HashMapWriteTxn<'a, K, CacheItem<K, V>>,
    inner: &mut ArcInner<K, V>,
    shared: &ArcShared<K, V>,
    tlocal: Map<K, ThreadCacheItem<V>>,
    commit_txid: u64,
) {
    tlocal.into_iter().for_each(|(k, tcio)| {
        let r = cache.get_mut(&k);
        match (r, tcio) {
            // Fresh insert: never-seen keys always start on `rec`.
            (None, ThreadCacheItem::Present(tci, clean)) => {
                assert!(clean);
                let llp = inner.rec.append_k(CacheItemInner {
                    k: k.clone(),
                    txid: commit_txid,
                });
                cache.insert(k, CacheItem::Rec(llp, tci));
            }
            // Removal of a never-seen key: record it as haunted so the
            // removal out-orders any in-flight reader include.
            (None, ThreadCacheItem::Removed(clean)) => {
                assert!(clean);
                let llp = inner.haunted.append_k(CacheItemInner {
                    k: k.clone(),
                    txid: commit_txid,
                });
                cache.insert(k, CacheItem::Haunted(llp));
            }
            // Removal of a known key: detach its node from whichever list
            // holds it and park it on haunted, keeping the history.
            (Some(ref mut ci), ThreadCacheItem::Removed(clean)) => {
                assert!(clean);
                let mut next_state = match ci {
                    CacheItem::Freq(llp, _v) => {
                        unsafe { (**llp).as_mut().txid = commit_txid };
                        inner.freq.extract(*llp);
                        inner.haunted.append_n(*llp);
                        CacheItem::Haunted(*llp)
                    }
                    CacheItem::Rec(llp, _v) => {
                        unsafe { (**llp).as_mut().txid = commit_txid };
                        inner.rec.extract(*llp);
                        inner.haunted.append_n(*llp);
                        CacheItem::Haunted(*llp)
                    }
                    CacheItem::GhostFreq(llp) => {
                        unsafe { (**llp).as_mut().txid = commit_txid };
                        inner.ghost_freq.extract(*llp);
                        inner.haunted.append_n(*llp);
                        CacheItem::Haunted(*llp)
                    }
                    CacheItem::GhostRec(llp) => {
                        unsafe { (**llp).as_mut().txid = commit_txid };
                        inner.ghost_rec.extract(*llp);
                        inner.haunted.append_n(*llp);
                        CacheItem::Haunted(*llp)
                    }
                    CacheItem::Haunted(llp) => {
                        // Already haunted: just refresh the txid.
                        unsafe { (**llp).as_mut().txid = commit_txid };
                        CacheItem::Haunted(*llp)
                    }
                };
                mem::swap(*ci, &mut next_state);
            }
            // Insert/update of a known key: promote per ARC rules.
            (Some(ref mut ci), ThreadCacheItem::Present(ref tci, clean)) => {
                assert!(clean);
                let mut next_state = match ci {
                    // Already frequent: refresh position and value.
                    CacheItem::Freq(llp, _v) => {
                        unsafe { (**llp).as_mut().txid = commit_txid };
                        inner.freq.touch(*llp);
                        CacheItem::Freq(*llp, (*tci).clone())
                    }
                    // Second touch: promote rec -> freq.
                    CacheItem::Rec(llp, _v) => {
                        unsafe { (**llp).as_mut().txid = commit_txid };
                        inner.rec.extract(*llp);
                        inner.freq.append_n(*llp);
                        CacheItem::Freq(*llp, (*tci).clone())
                    }
                    // Ghost-freq hit: tilt `p` toward freq, then revive.
                    CacheItem::GhostFreq(llp) => {
                        Self::calc_p_freq(
                            inner.ghost_rec.len(),
                            inner.ghost_freq.len(),
                            &mut inner.p,
                        );
                        unsafe { (**llp).as_mut().txid = commit_txid };
                        inner.ghost_freq.extract(*llp);
                        inner.freq.append_n(*llp);
                        CacheItem::Freq(*llp, (*tci).clone())
                    }
                    // Ghost-rec hit: tilt `p` toward rec, then revive.
                    CacheItem::GhostRec(llp) => {
                        Self::calc_p_rec(
                            shared.max,
                            inner.ghost_rec.len(),
                            inner.ghost_freq.len(),
                            &mut inner.p,
                        );
                        unsafe { (**llp).as_mut().txid = commit_txid };
                        inner.ghost_rec.extract(*llp);
                        inner.rec.append_n(*llp);
                        CacheItem::Rec(*llp, (*tci).clone())
                    }
                    // Haunted key returning: treat as a fresh recent item.
                    CacheItem::Haunted(llp) => {
                        unsafe { (**llp).as_mut().txid = commit_txid };
                        inner.haunted.extract(*llp);
                        inner.rec.append_n(*llp);
                        CacheItem::Rec(*llp, (*tci).clone())
                    }
                };
                mem::swap(*ci, &mut next_state);
            }
        }
    });
}
/// Drain queued reader events into the cache, stopping once an event is
/// newer than this commit's start time (`commit_ts`) so a commit only
/// consumes history it logically follows. Includes are rejected when they
/// are older than the entry's txid or the cache's `min_txid` floor.
fn drain_rx<'a>(
    &'a self,
    cache: &mut HashMapWriteTxn<'a, K, CacheItem<K, V>>,
    inner: &mut ArcInner<K, V>,
    shared: &ArcShared<K, V>,
    stats: &mut CacheStats,
    commit_ts: Instant,
) {
    while let Ok(ce) = inner.rx.try_recv() {
        let t = match ce {
            // A reader saw this hash: refresh/promote matching entries.
            CacheEvent::Hit(t, k_hash) => {
                stats.reader_hits += 1;
                // NOTE(review): this touches every entry sharing the hash
                // slot, not just the hit key — assumed acceptable since
                // collisions are rare; confirm against `hashmap`.
                if let Some(ref mut ci_slots) = unsafe { cache.get_slot_mut(k_hash) } {
                    for ref mut ci in ci_slots.iter_mut() {
                        let mut next_state = match &ci.v {
                            CacheItem::Freq(llp, v) => {
                                inner.freq.touch(*llp);
                                CacheItem::Freq(*llp, v.clone())
                            }
                            // Second touch: promote rec -> freq.
                            CacheItem::Rec(llp, v) => {
                                inner.rec.extract(*llp);
                                inner.freq.append_n(*llp);
                                CacheItem::Freq(*llp, v.clone())
                            }
                            // Ghost hits only refresh list position here;
                            // revival needs an Include with a value.
                            CacheItem::GhostFreq(llp) => {
                                inner.ghost_freq.touch(*llp);
                                CacheItem::GhostFreq(*llp)
                            }
                            CacheItem::GhostRec(llp) => {
                                inner.ghost_rec.touch(*llp);
                                CacheItem::GhostRec(*llp)
                            }
                            CacheItem::Haunted(llp) => {
                                CacheItem::Haunted(*llp)
                            }
                        };
                        mem::swap(&mut (*ci).v, &mut next_state);
                    }
                }
                t
            }
            // A reader inserted (k, iv) at snapshot `txid`.
            CacheEvent::Include(t, k, iv, txid) => {
                stats.reader_includes += 1;
                let mut r = cache.get_mut(&k);
                match r {
                    Some(ref mut ci) => {
                        // `None` means: keep current state unchanged.
                        let mut next_state = match &ci {
                            CacheItem::Freq(llp, _v) => {
                                inner.freq.touch(*llp);
                                if unsafe { (**llp).as_ref().txid >= txid }
                                    || inner.min_txid > txid
                                {
                                    // The cache value is newer: drop the
                                    // stale reader value.
                                    None
                                } else {
                                    unsafe { (**llp).as_mut().txid = txid };
                                    Some(CacheItem::Freq(*llp, iv))
                                }
                            }
                            CacheItem::Rec(llp, v) => {
                                // Promote regardless; pick the newer value.
                                inner.rec.extract(*llp);
                                inner.freq.append_n(*llp);
                                if unsafe { (**llp).as_ref().txid >= txid }
                                    || inner.min_txid > txid
                                {
                                    Some(CacheItem::Freq(*llp, v.clone()))
                                } else {
                                    unsafe { (**llp).as_mut().txid = txid };
                                    Some(CacheItem::Freq(*llp, iv))
                                }
                            }
                            CacheItem::GhostFreq(llp) => {
                                // Ghost-freq hit: adjust p, revive only if
                                // the include is not stale.
                                Self::calc_p_freq(
                                    inner.ghost_rec.len(),
                                    inner.ghost_freq.len(),
                                    &mut inner.p,
                                );
                                inner.ghost_freq.extract(*llp);
                                if unsafe { (**llp).as_ref().txid > txid }
                                    || inner.min_txid > txid
                                {
                                    inner.ghost_freq.append_n(*llp);
                                    None
                                } else {
                                    inner.freq.append_n(*llp);
                                    unsafe { (**llp).as_mut().txid = txid };
                                    Some(CacheItem::Freq(*llp, iv))
                                }
                            }
                            CacheItem::GhostRec(llp) => {
                                // Ghost-rec hit: adjust p, revive to rec
                                // only if the include is not stale.
                                Self::calc_p_rec(
                                    shared.max,
                                    inner.ghost_rec.len(),
                                    inner.ghost_freq.len(),
                                    &mut inner.p,
                                );
                                if unsafe { (**llp).as_ref().txid > txid }
                                    || inner.min_txid > txid
                                {
                                    inner.ghost_rec.touch(*llp);
                                    None
                                } else {
                                    inner.ghost_rec.extract(*llp);
                                    inner.rec.append_n(*llp);
                                    unsafe { (**llp).as_mut().txid = txid };
                                    Some(CacheItem::Rec(*llp, iv))
                                }
                            }
                            CacheItem::Haunted(llp) => {
                                // A removal wins over older includes.
                                if unsafe { (**llp).as_ref().txid > txid }
                                    || inner.min_txid > txid
                                {
                                    None
                                } else {
                                    inner.haunted.extract(*llp);
                                    inner.rec.append_n(*llp);
                                    unsafe { (**llp).as_mut().txid = txid };
                                    Some(CacheItem::Rec(*llp, iv))
                                }
                            }
                        };
                        if let Some(ref mut next_state) = next_state {
                            mem::swap(*ci, next_state);
                        }
                    }
                    None => {
                        // Unknown key: admit to rec unless below the floor.
                        if txid >= inner.min_txid {
                            let llp = inner.rec.append_k(CacheItemInner { k: k.clone(), txid });
                            cache.insert(k, CacheItem::Rec(llp, iv));
                        }
                    }
                };
                t
            }
        };
        // Stop at the first event from after this commit began.
        if t >= commit_ts {
            break;
        }
    }
}
/// Apply the write transaction's recorded hits (key hashes): touch/promote
/// the corresponding list nodes. Entries already stamped with this commit's
/// txid were moved by `drain_tlocal_inc` and are skipped.
fn drain_tlocal_hits<'a>(
    &'a self,
    cache: &mut HashMapWriteTxn<'a, K, CacheItem<K, V>>,
    inner: &mut ArcInner<K, V>,
    commit_txid: u64,
    hit: Vec<u64>,
) {
    hit.into_iter().for_each(|k_hash| {
        // NOTE(review): hits operate on the whole hash slot, so colliding
        // keys are also touched — confirm against `hashmap`.
        let mut r = unsafe { cache.get_slot_mut(k_hash) };
        match r {
            Some(ref mut ci_slots) => {
                for ref mut ci in ci_slots.iter_mut() {
                    let mut next_state = match &ci.v {
                        CacheItem::Freq(llp, v) => {
                            if unsafe { (**llp).as_ref().txid != commit_txid } {
                                inner.freq.touch(*llp);
                                Some(CacheItem::Freq(*llp, v.clone()))
                            } else {
                                // Already handled this commit.
                                None
                            }
                        }
                        CacheItem::Rec(llp, v) => {
                            if unsafe { (**llp).as_ref().txid != commit_txid } {
                                // Second touch: promote rec -> freq.
                                inner.rec.extract(*llp);
                                inner.freq.append_n(*llp);
                                Some(CacheItem::Freq(*llp, v.clone()))
                            } else {
                                None
                            }
                        }
                        _ => {
                            // Ghost/haunted entries can't be "hit" with a
                            // value; leave them alone.
                            None
                        }
                    };
                    if let Some(ref mut next_state) = next_state {
                        mem::swap(&mut (*ci).v, next_state);
                    }
                }
            }
            None => {
                // A hit was recorded for this txn, so the slot must exist.
                unreachable!();
            }
        }
    });
}
/// Rebalance after a commit: shrink the resident lists back to
/// `shared.max` entries, splitting the eviction between `rec` and `freq`
/// according to the adaptive target `p`, then bound the ghost lists to
/// their ARC shares, demoting overflow to `haunted`.
#[allow(clippy::cognitive_complexity)]
fn evict<'a>(
    &'a self,
    cache: &mut HashMapWriteTxn<'a, K, CacheItem<K, V>>,
    inner: &mut ArcInner<K, V>,
    shared: &ArcShared<K, V>,
    stats: &mut CacheStats,
    commit_txid: u64,
) {
    debug_assert!(inner.p <= shared.max);
    let p = inner.p;
    stats.p_weight = p;
    if inner.rec.len() + inner.freq.len() > shared.max {
        // Total number of entries that must leave the resident lists.
        let delta = (inner.rec.len() + inner.freq.len()) - shared.max;
        // Target length for `rec`: take as much of the overflow from the
        // recent list as `p` permits; `freq` absorbs the remainder.
        let rec_to_len = if inner.p == 0 {
            // p == 0: favour freq entirely, all evictions come from rec.
            debug_assert!(delta <= inner.rec.len());
            inner.rec.len() - delta
        } else if inner.rec.len() > inner.p {
            let rec_delta = inner.rec.len() - inner.p;
            if rec_delta > delta {
                inner.rec.len() - delta
            } else {
                // Shrink rec exactly to p; freq takes the rest.
                inner.rec.len() - rec_delta
            }
        } else {
            // rec is already within its share; evict only from freq.
            inner.rec.len()
        };
        debug_assert!(shared.max >= rec_to_len);
        let freq_to_len = shared.max - rec_to_len;
        debug_assert!(freq_to_len + rec_to_len <= shared.max);
        stats.freq_evicts += inner.freq.len() - freq_to_len;
        stats.recent_evicts += inner.rec.len() - rec_to_len;
        evict_to_len!(
            cache,
            inner.rec,
            &mut inner.ghost_rec,
            rec_to_len,
            commit_txid
        );
        evict_to_len!(
            cache,
            inner.freq,
            &mut inner.ghost_freq,
            freq_to_len,
            commit_txid
        );
        // Bound the ghost lists to their ARC shares: ghost_rec may hold at
        // most `max - p`, ghost_freq at most `p`. The targets must match
        // the guard conditions — the previous code trimmed to
        // `freq_to_len` / `rec_to_len`, which disagrees with the guards
        // and could over-evict ghost history (or trip the macro's
        // `debug_assert!(ll.len() >= size)`).
        if inner.ghost_rec.len() > (shared.max - p) {
            evict_to_haunted_len!(
                cache,
                inner.ghost_rec,
                &mut inner.haunted,
                shared.max - p,
                commit_txid
            );
        }
        if inner.ghost_freq.len() > p {
            evict_to_haunted_len!(
                cache,
                inner.ghost_freq,
                &mut inner.haunted,
                p,
                commit_txid
            );
        }
    }
}
/// Apply a write transaction: fold in buffered writes, drain reader
/// events, apply recorded hits, evict back to capacity, then publish
/// stats and the new cache generation. Serialised by the `inner` mutex.
#[allow(clippy::unnecessary_mut_passed)]
fn commit<'a>(
    &'a self,
    mut cache: HashMapWriteTxn<'a, K, CacheItem<K, V>>,
    tlocal: Map<K, ThreadCacheItem<V>>,
    hit: Vec<u64>,
    clear: bool,
) {
    let commit_ts = Instant::now();
    let commit_txid = cache.get_txid();
    // Lock order: inner mutex, then shared config, then stats.
    let mut inner = self.inner.lock();
    let shared = self.shared.read();
    let mut stat_guard = self.stats.write();
    let stats = stat_guard.get_mut();
    if clear {
        // Raise the txid floor so pre-clear reader events are discarded,
        // then push every resident entry out to its ghost list.
        inner.min_txid = commit_txid;
        stats.freq_evicts += inner.freq.len();
        stats.recent_evicts += inner.rec.len();
        drain_ll_to_ghost!(
            &mut cache,
            inner.freq,
            inner.ghost_freq,
            inner.ghost_rec,
            commit_txid
        );
        drain_ll_to_ghost!(
            &mut cache,
            inner.rec,
            inner.ghost_freq,
            inner.ghost_rec,
            commit_txid
        );
    }
    stats.write_inc_or_mod += tlocal.len();
    self.drain_tlocal_inc(
        &mut cache,
        inner.deref_mut(),
        shared.deref(),
        tlocal,
        commit_txid,
    );
    self.drain_rx(
        &mut cache,
        inner.deref_mut(),
        shared.deref(),
        stats,
        commit_ts,
    );
    stats.write_hits += hit.len();
    self.drain_tlocal_hits(&mut cache, inner.deref_mut(), commit_txid, hit);
    self.evict(
        &mut cache,
        inner.deref_mut(),
        shared.deref(),
        stats,
        commit_txid,
    );
    // Publish the post-commit snapshot of list sizes.
    stats.shared_max = shared.max;
    stats.freq = inner.freq.len();
    stats.recent = inner.rec.len();
    stats.all_seen_keys = cache.len();
    stat_guard.commit();
    cache.commit();
}
}
impl<'a, K: Hash + Eq + Ord + Clone + Debug, V: Clone + Debug> ARCacheWriteTxn<'a, K, V> {
/// Consume this transaction and apply its buffered changes to the cache.
pub fn commit(self) {
    // Unwrap the interior-mutable state before handing off to the cache.
    let hit = self.hit.into_inner();
    let clear = self.clear.into_inner();
    self.caller.commit(self.cache, self.tlocal, hit, clear)
}
/// Mark the cache as emptied for this transaction: pending writes and
/// recorded hits are discarded, and `get` stops consulting the snapshot.
/// The real drain happens at commit.
pub fn clear(&mut self) {
    // SAFETY: we hold &mut self, so no other access to these cells is
    // possible while we mutate them.
    unsafe {
        *self.clear.get() = true;
        (*self.hit.get()).clear();
    }
    self.tlocal.clear();
}
/// Fetch `k`: the transaction's pending writes win over the snapshot, and
/// the snapshot is ignored once `clear` has been called. A successful
/// lookup records the key hash so commit can promote the entry.
pub fn get<'b, Q: ?Sized>(&'a self, k: &'b Q) -> Option<&'a V>
where
    K: Borrow<Q>,
    Q: Hash + Eq + Ord,
{
    let k_hash: u64 = self.cache.prehash(k);
    let r: Option<&V> = if let Some(tci) = self.tlocal.get(k) {
        match tci {
            ThreadCacheItem::Present(v, _clean) => {
                // Launder the borrow through a raw pointer to extend it to
                // 'a. NOTE(review): sound only while tlocal entries are
                // never removed/moved during the borrow — confirm.
                let v = v as *const _;
                unsafe { Some(&(*v)) }
            }
            // A pending removal hides any snapshot value.
            ThreadCacheItem::Removed(_clean) => {
                return None;
            }
        }
    } else {
        // After clear(), the snapshot must be treated as empty.
        let is_cleared = unsafe {
            let clear_ptr = self.clear.get();
            *clear_ptr
        };
        if !is_cleared {
            if let Some(v) = self.cache.get_prehashed(k, k_hash) {
                (*v).to_vref()
            } else {
                None
            }
        } else {
            None
        }
    };
    if r.is_some() {
        // UnsafeCell push: `self` is shared here, but write txns are
        // single-threaded by construction.
        unsafe {
            let hit_ptr = self.hit.get();
            (*hit_ptr).push(k_hash);
        }
    }
    r
}
/// True if `get(k)` would succeed. Note: delegates to `get`, so it also
/// records a hit for the key.
pub fn contains_key<'b, Q: ?Sized>(&'a self, k: &'b Q) -> bool
where
    K: Borrow<Q>,
    Q: Hash + Eq + Ord,
{
    self.get(k).is_some()
}
/// Queue `k -> v` for insertion at commit time, marked clean.
pub fn insert(&mut self, k: K, v: V) {
    self.tlocal.insert(k, ThreadCacheItem::Present(v, true));
}
/// Queue removal of `k` at commit time, marked clean.
pub fn remove(&mut self, k: K) {
    self.tlocal.insert(k, ThreadCacheItem::Removed(true));
}
/// As `insert`, but marked dirty so `iter_mut_mark_clean` will visit it
/// (commit asserts all items are clean).
pub fn insert_dirty(&mut self, k: K, v: V) {
    self.tlocal.insert(k, ThreadCacheItem::Present(v, false));
}
/// As `remove`, but marked dirty so `iter_mut_mark_clean` will visit it.
pub fn remove_dirty(&mut self, k: K) {
    self.tlocal.insert(k, ThreadCacheItem::Removed(false));
}
/// Iterate the dirty entries of the pending write set, flipping each to
/// clean as it is yielded. Items are `(key, Some(&mut value))` for pending
/// inserts and `(key, None)` for pending removals.
pub fn iter_mut_mark_clean(&mut self) -> impl Iterator<Item = (&K, Option<&mut V>)> {
    self.tlocal
        .iter_mut()
        .filter(|(_k, v)| match v {
            ThreadCacheItem::Present(_v, c) => !c,
            ThreadCacheItem::Removed(c) => !c,
        })
        .map(|(k, v)| {
            // Mark clean and extract the value view in a single match.
            let data = match v {
                ThreadCacheItem::Present(v, c) => {
                    *c = true;
                    Some(v)
                }
                ThreadCacheItem::Removed(c) => {
                    *c = true;
                    None
                }
            };
            (k, data)
        })
}
/// Test-only: iterate the keys currently on the recent list.
#[cfg(test)]
pub(crate) fn iter_rec(&self) -> impl Iterator<Item = &K> {
    self.cache.values().filter_map(|ci| match &ci {
        CacheItem::Rec(lln, _) => unsafe {
            // Reach through the node to borrow the stored key.
            let cii = &*((**lln).k.as_ptr());
            Some(&cii.k)
        },
        _ => None,
    })
}
/// Test-only: iterate the keys currently on the ghost-recent list.
#[cfg(test)]
pub(crate) fn iter_ghost_rec(&self) -> impl Iterator<Item = &K> {
    self.cache.values().filter_map(|ci| match &ci {
        CacheItem::GhostRec(lln) => unsafe {
            let cii = &*((**lln).k.as_ptr());
            Some(&cii.k)
        },
        _ => None,
    })
}
/// Test-only: iterate the keys currently on the ghost-frequent list.
#[cfg(test)]
pub(crate) fn iter_ghost_freq(&self) -> impl Iterator<Item = &K> {
    self.cache.values().filter_map(|ci| match &ci {
        CacheItem::GhostFreq(lln) => unsafe {
            let cii = &*((**lln).k.as_ptr());
            Some(&cii.k)
        },
        _ => None,
    })
}
/// Test-only: view the key hashes hit so far in this transaction.
#[cfg(test)]
pub(crate) fn peek_hit(&self) -> &[u64] {
    let hit_ptr = self.hit.get();
    // NOTE(review): aliases with `get`'s push path through the same
    // UnsafeCell — relies on single-threaded use of the write txn.
    unsafe { &(*hit_ptr) }
}
/// Test-only: report which ARC list `k` is currently on, or
/// `CacheState::None` when the key has never been seen.
#[cfg(test)]
pub(crate) fn peek_cache<'b, Q: ?Sized>(&'a self, k: &'b Q) -> CacheState
where
    K: Borrow<Q>,
    Q: Hash + Eq + Ord,
{
    match self.cache.get(k) {
        Some(ci) => ci.to_state(),
        None => CacheState::None,
    }
}
/// Test-only: snapshot the internal list lengths and balance point for
/// assertions. Takes the inner mutex and shared read lock briefly.
#[cfg(test)]
pub(crate) fn peek_stat(&self) -> CStat {
    let inner = self.caller.inner.lock();
    let shared = self.caller.shared.read();
    CStat {
        max: shared.max,
        cache: self.cache.len(),
        tlocal: self.tlocal.len(),
        freq: inner.freq.len(),
        rec: inner.rec.len(),
        ghost_freq: inner.ghost_freq.len(),
        ghost_rec: inner.ghost_rec.len(),
        haunted: inner.haunted.len(),
        p: inner.p,
    }
}
}
impl<'a, K: Hash + Eq + Ord + Clone + Debug, V: Clone + Debug> ARCacheReadTxn<'a, K, V> {
/// Fetch `k`: the thread-local read cache wins, then the snapshot. A hit
/// is reported to the writer over the event channel for later promotion.
pub fn get<'b, Q: ?Sized>(&'b self, k: &'b Q) -> Option<&'b V>
where
    K: Borrow<Q>,
    Q: Hash + Eq + Ord,
{
    let k_hash: u64 = self.cache.prehash(k);
    let r: Option<&V> = self
        .tlocal
        .as_ref()
        .and_then(|cache| {
            cache.set.get(k).and_then(|v| unsafe {
                // Borrow the value out of the tlru node. NOTE(review):
                // laundered through a raw pointer; sound only while the
                // node is not recycled during the borrow — confirm.
                let v = &(**v).as_ref().1 as *const _;
                Some(&(*v))
            })
        })
        .or_else(|| {
            self.cache.get_prehashed(k, k_hash).and_then(|v| {
                // Only resident (Freq/Rec) entries yield a value.
                (*v).to_vref().map(|vin| unsafe {
                    let vin = vin as *const _;
                    &(*vin)
                })
            })
        });
    if r.is_some() {
        self.tx
            .send(CacheEvent::Hit(self.ts, k_hash))
            .expect("Invalid tx state");
    }
    r
}
/// True if `get(k)` would succeed. Note: delegates to `get`, so it also
/// emits a hit event to the writer.
pub fn contains_key<'b, Q: ?Sized>(&mut self, k: &'b Q) -> bool
where
    K: Borrow<Q>,
    Q: Hash + Eq + Ord,
{
    self.get(k).is_some()
}
/// Insert `k -> v` via this reader: always queue an Include event for the
/// writer, and additionally place the pair in the thread-local LRU when
/// one is enabled, recycling the oldest node once at capacity.
pub fn insert(&mut self, k: K, mut v: V) {
    self.tx
        .send(CacheEvent::Include(
            self.ts,
            k.clone(),
            v.clone(),
            self.cache.get_txid(),
        ))
        .expect("Invalid tx state!");
    if let Some(ref mut cache) = self.tlocal {
        let n = if cache.tlru.len() >= cache.read_size {
            // At capacity: reuse the LRU node rather than allocating.
            let n = cache.tlru.pop();
            let mut k_clone = k.clone();
            unsafe {
                // Swap the new pair into the node; k_clone now holds the
                // evicted key.
                mem::swap(&mut k_clone, &mut (*n).as_mut().0);
                mem::swap(&mut v, &mut (*n).as_mut().1);
            }
            cache.set.remove(&k_clone);
            n
        } else {
            cache.tlru.append_k((k.clone(), v))
        };
        let r = cache.set.insert(k, n);
        // The key must not already be present in the lookup map.
        assert!(r.is_none());
    }
}
}
/// When a reader finishes, opportunistically drain its queued events into
/// the cache if no writer is currently active.
impl<'a, K: Hash + Eq + Ord + Clone + Debug, V: Clone + Debug> Drop for ARCacheReadTxn<'a, K, V> {
    fn drop(&mut self) {
        self.caller.try_quiesce();
    }
}
#[cfg(test)]
mod tests {
    use crate::arcache::ARCache as Arc;
    use crate::arcache::CStat;
    use crate::arcache::CacheState;

    // Basic single-writer flow: miss, insert, hit, then promotion from
    // Rec to Freq across two commits.
    #[test]
    fn test_cache_arc_basic() {
        let arc: Arc<usize, usize> = Arc::new_size(4, 4);
        let mut wr_txn = arc.write();
        assert!(wr_txn.get(&1) == None);
        assert!(wr_txn.peek_hit().len() == 0);
        wr_txn.insert(1, 1);
        assert!(wr_txn.get(&1) == Some(&1));
        assert!(wr_txn.peek_hit().len() == 1);
        wr_txn.commit();
        let wr_txn = arc.write();
        assert!(wr_txn.peek_cache(&1) == CacheState::Rec);
        assert!(wr_txn.get(&1) == Some(&1));
        assert!(wr_txn.peek_hit().len() == 1);
        wr_txn.commit();
        let wr_txn = arc.write();
        assert!(wr_txn.peek_cache(&1) == CacheState::Freq);
        println!("{:?}", wr_txn.peek_stat());
    }

    // Exercise the full eviction cycle: rec -> ghost_rec, promotion to
    // freq, ghost revivals, and the movement of the balance point p.
    #[test]
    fn test_cache_evict() {
        println!("== 1");
        let arc: Arc<usize, usize> = Arc::new_size(4, 4);
        let mut wr_txn = arc.write();
        assert!(
            CStat {
                max: 4,
                cache: 0,
                tlocal: 0,
                freq: 0,
                rec: 0,
                ghost_freq: 0,
                ghost_rec: 0,
                haunted: 0,
                p: 0
            } == wr_txn.peek_stat()
        );
        wr_txn.insert(1, 1);
        wr_txn.insert(2, 2);
        wr_txn.insert(3, 3);
        wr_txn.insert(4, 4);
        // Inserts buffer thread-locally until commit.
        assert!(
            CStat {
                max: 4,
                cache: 0,
                tlocal: 4,
                freq: 0,
                rec: 0,
                ghost_freq: 0,
                ghost_rec: 0,
                haunted: 0,
                p: 0
            } == wr_txn.peek_stat()
        );
        wr_txn.commit();
        println!("== 2");
        let wr_txn = arc.write();
        // After commit, all four land on the recent list.
        assert!(
            CStat {
                max: 4,
                cache: 4,
                tlocal: 0,
                freq: 0,
                rec: 4,
                ghost_freq: 0,
                ghost_rec: 0,
                haunted: 0,
                p: 0
            } == wr_txn.peek_stat()
        );
        assert!(wr_txn.get(&1) == Some(&1));
        assert!(wr_txn.get(&1) == Some(&1));
        assert!(wr_txn.get(&2) == Some(&2));
        wr_txn.commit();
        println!("== 3");
        let mut wr_txn = arc.write();
        // Hits promoted 1 and 2 to the frequent list.
        assert!(
            CStat {
                max: 4,
                cache: 4,
                tlocal: 0,
                freq: 2,
                rec: 2,
                ghost_freq: 0,
                ghost_rec: 0,
                haunted: 0,
                p: 0
            } == wr_txn.peek_stat()
        );
        wr_txn.insert(5, 5);
        wr_txn.commit();
        println!("== 4");
        let wr_txn = arc.write();
        println!("stat -> {:?}", wr_txn.peek_stat());
        // Over capacity: one recent entry falls to ghost_rec.
        assert!(
            CStat {
                max: 4,
                cache: 5,
                tlocal: 0,
                freq: 2,
                rec: 2,
                ghost_freq: 0,
                ghost_rec: 1,
                haunted: 0,
                p: 0
            } == wr_txn.peek_stat()
        );
        let rec_set: Vec<usize> = wr_txn.iter_rec().take(2).copied().collect();
        assert!(wr_txn.get(&rec_set[0]) == Some(&rec_set[0]));
        assert!(wr_txn.get(&rec_set[1]) == Some(&rec_set[1]));
        wr_txn.commit();
        println!("== 5");
        let mut wr_txn = arc.write();
        println!("stat -> {:?}", wr_txn.peek_stat());
        // Both remaining recent items were promoted to freq.
        assert!(
            CStat {
                max: 4,
                cache: 5,
                tlocal: 0,
                freq: 4,
                rec: 0,
                ghost_freq: 0,
                ghost_rec: 1,
                haunted: 0,
                p: 0
            } == wr_txn.peek_stat()
        );
        // Revive a ghost-recent key: p moves toward the recent side.
        let grec: usize = wr_txn.iter_ghost_rec().take(1).copied().next().unwrap();
        wr_txn.insert(grec, grec);
        assert!(wr_txn.get(&grec) == Some(&grec));
        wr_txn.commit();
        println!("== 6");
        let mut wr_txn = arc.write();
        println!("stat -> {:?}", wr_txn.peek_stat());
        assert!(
            CStat {
                max: 4,
                cache: 5,
                tlocal: 0,
                freq: 3,
                rec: 1,
                ghost_freq: 1,
                ghost_rec: 0,
                haunted: 0,
                p: 1
            } == wr_txn.peek_stat()
        );
        assert!(wr_txn.peek_cache(&grec) == CacheState::Rec);
        wr_txn.insert(10, 10);
        wr_txn.insert(11, 11);
        wr_txn.insert(12, 12);
        wr_txn.commit();
        println!("== 7");
        let mut wr_txn = arc.write();
        println!("stat -> {:?}", wr_txn.peek_stat());
        assert!(
            CStat {
                max: 4,
                cache: 8,
                tlocal: 0,
                freq: 3,
                rec: 1,
                ghost_freq: 1,
                ghost_rec: 3,
                haunted: 0,
                p: 1
            } == wr_txn.peek_stat()
        );
        // Reviving every ghost-recent key pushes p all the way to max.
        let grec_set: Vec<usize> = wr_txn.iter_ghost_rec().take(3).copied().collect();
        grec_set.iter().for_each(|i| wr_txn.insert(*i, *i));
        wr_txn.commit();
        println!("== 8");
        let mut wr_txn = arc.write();
        println!("stat -> {:?}", wr_txn.peek_stat());
        assert!(
            CStat {
                max: 4,
                cache: 8,
                tlocal: 0,
                freq: 0,
                rec: 4,
                ghost_freq: 4,
                ghost_rec: 0,
                haunted: 0,
                p: 4
            } == wr_txn.peek_stat()
        );
        grec_set
            .iter()
            .for_each(|i| assert!(wr_txn.peek_cache(i) == CacheState::Rec));
        // Reviving every ghost-frequent key drags p back down to zero.
        let gfreq_set: Vec<usize> = wr_txn.iter_ghost_freq().take(4).copied().collect();
        gfreq_set.iter().for_each(|i| wr_txn.insert(*i, *i));
        wr_txn.commit();
        println!("== 9");
        let wr_txn = arc.write();
        println!("stat -> {:?}", wr_txn.peek_stat());
        assert!(
            CStat {
                max: 4,
                cache: 8,
                tlocal: 0,
                freq: 4,
                rec: 0,
                ghost_freq: 0,
                ghost_rec: 4,
                haunted: 0,
                p: 0
            } == wr_txn.peek_stat()
        );
        gfreq_set
            .iter()
            .for_each(|i| assert!(wr_txn.peek_cache(i) == CacheState::Freq));
        wr_txn.commit();
        let stats = arc.view_stats();
        println!("{:?}", *stats);
    }

    // Reader includes and hits flow through the channel and are applied
    // at the next commit / quiesce.
    #[test]
    fn test_cache_concurrent_basic() {
        let arc: Arc<usize, usize> = Arc::new_size(4, 4);
        {
            let mut rd_txn = arc.read();
            rd_txn.insert(1, 1);
            rd_txn.insert(2, 2);
            rd_txn.insert(3, 3);
            rd_txn.insert(4, 4);
            // Drop of the read txn triggers try_quiesce.
        }
        arc.try_quiesce();
        println!("== 2");
        let wr_txn = arc.write();
        println!("{:?}", wr_txn.peek_stat());
        assert!(
            CStat {
                max: 4,
                cache: 4,
                tlocal: 0,
                freq: 0,
                rec: 4,
                ghost_freq: 0,
                ghost_rec: 0,
                haunted: 0,
                p: 0
            } == wr_txn.peek_stat()
        );
        assert!(wr_txn.peek_cache(&1) == CacheState::Rec);
        assert!(wr_txn.peek_cache(&2) == CacheState::Rec);
        assert!(wr_txn.peek_cache(&3) == CacheState::Rec);
        assert!(wr_txn.peek_cache(&4) == CacheState::Rec);
        {
            let mut rd_txn = arc.read();
            assert!(rd_txn.get(&3) == Some(&3));
            assert!(rd_txn.get(&4) == Some(&4));
            rd_txn.insert(5, 5);
            rd_txn.insert(6, 6);
        }
        wr_txn.commit();
        println!("== 3");
        let wr_txn = arc.write();
        // Hits promoted 3/4; includes 5/6 displaced 1/2 to ghost_rec.
        assert!(
            CStat {
                max: 4,
                cache: 6,
                tlocal: 0,
                freq: 2,
                rec: 2,
                ghost_freq: 0,
                ghost_rec: 2,
                haunted: 0,
                p: 0
            } == wr_txn.peek_stat()
        );
        assert!(wr_txn.peek_cache(&1) == CacheState::GhostRec);
        assert!(wr_txn.peek_cache(&2) == CacheState::GhostRec);
        assert!(wr_txn.peek_cache(&3) == CacheState::Freq);
        assert!(wr_txn.peek_cache(&4) == CacheState::Freq);
        assert!(wr_txn.peek_cache(&5) == CacheState::Rec);
        assert!(wr_txn.peek_cache(&6) == CacheState::Rec);
        {
            let mut rd_txn = arc.read();
            rd_txn.insert(1, 1);
            rd_txn.insert(2, 2);
        }
        wr_txn.commit();
        println!("== 4");
        let wr_txn = arc.write();
        // Ghost-rec revivals of 1/2 moved p and displaced 5/6.
        assert!(
            CStat {
                max: 4,
                cache: 6,
                tlocal: 0,
                freq: 2,
                rec: 2,
                ghost_freq: 0,
                ghost_rec: 2,
                haunted: 0,
                p: 2
            } == wr_txn.peek_stat()
        );
        assert!(wr_txn.peek_cache(&1) == CacheState::Rec);
        assert!(wr_txn.peek_cache(&2) == CacheState::Rec);
        assert!(wr_txn.peek_cache(&3) == CacheState::Freq);
        assert!(wr_txn.peek_cache(&4) == CacheState::Freq);
        assert!(wr_txn.peek_cache(&5) == CacheState::GhostRec);
        assert!(wr_txn.peek_cache(&6) == CacheState::GhostRec);
        let stats = arc.view_stats();
        println!("{:?}", *stats);
    }

    // A long-lived reader never sees writer commits made after its
    // snapshot, and its stale include cannot resurrect a haunted key —
    // though its thread-local cache can still serve the value.
    #[test]
    fn test_cache_concurrent_cursed_1() {
        let arc: Arc<usize, usize> = Arc::new_size(4, 4);
        let mut wr_txn = arc.write();
        let mut rd_txn = arc.read();
        wr_txn.insert(1, 1);
        assert!(rd_txn.get(&1) == None);
        wr_txn.commit();
        assert!(rd_txn.get(&1) == None);
        let mut wr_txn = arc.write();
        wr_txn.insert(10, 1);
        wr_txn.insert(11, 1);
        wr_txn.insert(12, 1);
        wr_txn.insert(13, 1);
        wr_txn.insert(14, 1);
        wr_txn.insert(15, 1);
        wr_txn.insert(16, 1);
        wr_txn.insert(17, 1);
        wr_txn.commit();
        let wr_txn = arc.write();
        assert!(wr_txn.peek_cache(&1) == CacheState::Haunted);
        assert!(rd_txn.get(&1) == None);
        rd_txn.insert(1, 100);
        wr_txn.commit();
        let wr_txn = arc.write();
        // The stale include was rejected by the txid ordering.
        assert!(wr_txn.peek_cache(&1) == CacheState::Haunted);
        // But the reader's thread-local cache still serves it.
        assert!(rd_txn.get(&1) == Some(&100));
    }

    // clear() drains all resident entries to the ghost lists at commit.
    #[test]
    fn test_cache_clear() {
        let arc: Arc<usize, usize> = Arc::new_size(4, 4);
        let mut wr_txn = arc.write();
        wr_txn.insert(10, 10);
        wr_txn.insert(11, 11);
        wr_txn.insert(12, 12);
        wr_txn.insert(13, 13);
        wr_txn.insert(14, 14);
        wr_txn.insert(15, 15);
        wr_txn.insert(16, 16);
        wr_txn.insert(17, 17);
        wr_txn.commit();
        let wr_txn = arc.write();
        let rec_set: Vec<usize> = wr_txn.iter_rec().take(2).copied().collect();
        println!("{:?}", rec_set);
        assert!(wr_txn.get(&rec_set[0]) == Some(&rec_set[0]));
        assert!(wr_txn.get(&rec_set[1]) == Some(&rec_set[1]));
        wr_txn.commit();
        let mut wr_txn = arc.write();
        println!("stat -> {:?}", wr_txn.peek_stat());
        assert!(
            CStat {
                max: 4,
                cache: 8,
                tlocal: 0,
                freq: 2,
                rec: 2,
                ghost_freq: 0,
                ghost_rec: 4,
                haunted: 0,
                p: 0
            } == wr_txn.peek_stat()
        );
        wr_txn.clear();
        wr_txn.commit();
        let wr_txn = arc.write();
        println!("stat -> {:?}", wr_txn.peek_stat());
        // freq drained to ghost_freq, rec drained to ghost_rec.
        assert!(
            CStat {
                max: 4,
                cache: 8,
                tlocal: 0,
                freq: 0,
                rec: 0,
                ghost_freq: 2,
                ghost_rec: 6,
                haunted: 0,
                p: 0
            } == wr_txn.peek_stat()
        );
        let stats = arc.view_stats();
        println!("{:?}", *stats);
    }

    // Dropping a write txn without commit discards its clear().
    #[test]
    fn test_cache_clear_rollback() {
        let arc: Arc<usize, usize> = Arc::new_size(4, 4);
        let mut wr_txn = arc.write();
        wr_txn.insert(10, 10);
        wr_txn.insert(11, 11);
        wr_txn.insert(12, 12);
        wr_txn.insert(13, 13);
        wr_txn.insert(14, 14);
        wr_txn.insert(15, 15);
        wr_txn.insert(16, 16);
        wr_txn.insert(17, 17);
        wr_txn.commit();
        let wr_txn = arc.write();
        let rec_set: Vec<usize> = wr_txn.iter_rec().take(2).copied().collect();
        println!("{:?}", rec_set);
        let r = wr_txn.get(&rec_set[0]);
        println!("{:?}", r);
        assert!(r == Some(&rec_set[0]));
        assert!(wr_txn.get(&rec_set[1]) == Some(&rec_set[1]));
        wr_txn.commit();
        let mut wr_txn = arc.write();
        println!("stat -> {:?}", wr_txn.peek_stat());
        assert!(
            CStat {
                max: 4,
                cache: 8,
                tlocal: 0,
                freq: 2,
                rec: 2,
                ghost_freq: 0,
                ghost_rec: 4,
                haunted: 0,
                p: 0
            } == wr_txn.peek_stat()
        );
        wr_txn.clear();
        // Roll back instead of committing the clear.
        drop(wr_txn);
        let wr_txn = arc.write();
        println!("stat -> {:?}", wr_txn.peek_stat());
        assert!(
            CStat {
                max: 4,
                cache: 8,
                tlocal: 0,
                freq: 2,
                rec: 2,
                ghost_freq: 0,
                ghost_rec: 4,
                haunted: 0,
                p: 0
            } == wr_txn.peek_stat()
        );
    }

    // A reader spanning a clear keeps serving its snapshot, but its
    // events are discarded by the raised min_txid floor.
    #[test]
    fn test_cache_clear_cursed() {
        let arc: Arc<usize, usize> = Arc::new_size(4, 4);
        let mut wr_txn = arc.write();
        wr_txn.insert(10, 1);
        wr_txn.commit();
        let wr_txn = arc.write();
        assert!(wr_txn.peek_cache(&10) == CacheState::Rec);
        wr_txn.commit();
        let mut rd_txn = arc.read();
        let mut wr_txn = arc.write();
        wr_txn.clear();
        wr_txn.commit();
        // The reader's snapshot predates the clear.
        assert!(rd_txn.get(&10) == Some(&1));
        rd_txn.insert(11, 1);
        std::mem::drop(rd_txn);
        arc.try_quiesce();
        let wr_txn = arc.write();
        assert!(wr_txn.peek_cache(&10) == CacheState::GhostRec);
        println!("--> {:?}", wr_txn.peek_cache(&11));
        // The pre-clear include of 11 was dropped by the txid floor.
        assert!(wr_txn.peek_cache(&11) == CacheState::None);
    }

    // Dirty items must be marked clean before commit's assertions run.
    #[test]
    fn test_cache_dirty_write() {
        let arc: Arc<usize, usize> = Arc::new_size(4, 4);
        let mut wr_txn = arc.write();
        wr_txn.insert_dirty(10, 1);
        wr_txn.iter_mut_mark_clean().for_each(|(_k, _v)| {});
        wr_txn.commit();
    }

    // With read_max == 0, readers have no thread-local cache: their own
    // inserts are invisible until a commit applies the include events.
    #[test]
    fn test_cache_read_no_tlocal() {
        let arc: Arc<usize, usize> = Arc::new_size(4, 0);
        {
            let mut rd_txn = arc.read();
            rd_txn.insert(1, 1);
            rd_txn.insert(2, 2);
            rd_txn.insert(3, 3);
            rd_txn.insert(4, 4);
            assert!(rd_txn.get(&1).is_none());
            assert!(rd_txn.get(&2).is_none());
            assert!(rd_txn.get(&3).is_none());
            assert!(rd_txn.get(&4).is_none());
        }
        arc.try_quiesce();
        println!("== 2");
        let wr_txn = arc.write();
        assert!(
            CStat {
                max: 4,
                cache: 4,
                tlocal: 0,
                freq: 0,
                rec: 4,
                ghost_freq: 0,
                ghost_rec: 0,
                haunted: 0,
                p: 0
            } == wr_txn.peek_stat()
        );
        assert!(wr_txn.peek_cache(&1) == CacheState::Rec);
        assert!(wr_txn.peek_cache(&2) == CacheState::Rec);
        assert!(wr_txn.peek_cache(&3) == CacheState::Rec);
        assert!(wr_txn.peek_cache(&4) == CacheState::Rec);
    }
}