use super::{
deques::Deques,
housekeeper::{Housekeeper, InnerSync, SyncPace},
invalidator::{GetOrRemoveEntry, InvalidationResult, Invalidator, KeyDateLite, PredicateFun},
KeyDate, KeyHash, KeyHashDate, PredicateId, ReadOp, ValueEntry, WriteOp,
};
use crate::{
common::{
atomic_time::AtomicInstant,
deque::{CacheRegion, DeqNode, Deque},
frequency_sketch::FrequencySketch,
time::{CheckedTimeOps, Clock, Instant},
AccessTime,
},
PredicateError,
};
use crossbeam_channel::{Receiver, Sender, TrySendError};
use parking_lot::{Mutex, RwLock};
use std::{
borrow::Borrow,
collections::hash_map::RandomState,
convert::TryInto,
hash::{BuildHasher, Hash, Hasher},
ptr::NonNull,
rc::Rc,
sync::{
atomic::{AtomicBool, AtomicU8, Ordering},
Arc,
},
time::Duration,
};
/// Maximum number of read/write-log drain rounds a single `sync` call performs
/// before re-checking whether another round is warranted (see `InnerSync::sync`).
pub(crate) const MAX_SYNC_REPEATS: usize = 4;
/// Once the read-op channel holds this many entries, a housekeeper sync is scheduled.
const READ_LOG_FLUSH_POINT: usize = 512;
/// Capacity of the bounded read-op channel; the extra headroom lets readers keep
/// recording ops while a full sync cycle (`MAX_SYNC_REPEATS` rounds) is draining.
const READ_LOG_SIZE: usize = READ_LOG_FLUSH_POINT * (MAX_SYNC_REPEATS + 2);
/// Once the write-op channel holds this many entries, a housekeeper sync is scheduled.
const WRITE_LOG_FLUSH_POINT: usize = 512;
/// Below this write-log length, `sync` reports the normal (slower) pace to the
/// housekeeper instead of requesting another run.
const WRITE_LOG_LOW_WATER_MARK: usize = WRITE_LOG_FLUSH_POINT / 2;
/// Capacity of the bounded write-op channel.
const WRITE_LOG_SIZE: usize = WRITE_LOG_FLUSH_POINT * (MAX_SYNC_REPEATS + 2);
/// Back-off between retries when the write-op channel is full.
/// NOTE(review): not referenced in this file — presumably used by the cache
/// front-ends when scheduling writes; confirm against callers.
pub(crate) const WRITE_RETRY_INTERVAL_MICROS: u64 = 50;
/// Delay before the periodical sync job runs for the first time.
pub(crate) const PERIODICAL_SYNC_INITIAL_DELAY_MILLIS: u64 = 500;
/// Interval between periodical syncs at the normal pace.
pub(crate) const PERIODICAL_SYNC_NORMAL_PACE_MILLIS: u64 = 300;
/// Interval between periodical syncs at the fast pace.
pub(crate) const PERIODICAL_SYNC_FAST_PACE_NANOS: u64 = 500;
/// Shared handle to the housekeeper that drives `Inner`'s maintenance tasks.
pub(crate) type HouseKeeperArc<K, V, S> = Arc<Housekeeper<Inner<K, V, S>>>;
/// Common internals shared by the cache front-ends: a handle to the shared
/// `Inner` state plus the sending sides of the read/write operation logs that
/// the housekeeper drains asynchronously.
pub(crate) struct BaseCache<K, V, S = RandomState> {
    pub(crate) inner: Arc<Inner<K, V, S>>,
    // Sending side of the read-op log drained by `Inner::apply_reads`.
    read_op_ch: Sender<ReadOp<K, V>>,
    // Sending side of the write-op log drained by `Inner::apply_writes`.
    pub(crate) write_op_ch: Sender<WriteOp<K, V>>,
    // `None` once `drop` has detached this handle from the housekeeper.
    pub(crate) housekeeper: Option<HouseKeeperArc<K, V, S>>,
}
impl<K, V, S> Clone for BaseCache<K, V, S> {
fn clone(&self) -> Self {
Self {
inner: Arc::clone(&self.inner),
read_op_ch: self.read_op_ch.clone(),
write_op_ch: self.write_op_ch.clone(),
housekeeper: self.housekeeper.as_ref().map(Arc::clone),
}
}
}
impl<K, V, S> Drop for BaseCache<K, V, S> {
    /// Detaches this handle from the housekeeper when the cache handle goes
    /// away.
    fn drop(&mut self) {
        // Overwriting with `None` drops our `Arc` reference to the housekeeper.
        self.housekeeper = None;
    }
}
impl<K, V, S> BaseCache<K, V, S>
where
K: Hash + Eq + Send + Sync + 'static,
V: Clone + Send + Sync + 'static,
S: BuildHasher + Clone + Send + Sync + 'static,
{
    /// Creates a `BaseCache` with bounded read/write op channels and a
    /// housekeeper that periodically drains them into `Inner`.
    pub(crate) fn new(
        max_capacity: usize,
        initial_capacity: Option<usize>,
        build_hasher: S,
        time_to_live: Option<Duration>,
        time_to_idle: Option<Duration>,
        invalidator_enabled: bool,
    ) -> Self {
        let (r_snd, r_rcv) = crossbeam_channel::bounded(READ_LOG_SIZE);
        let (w_snd, w_rcv) = crossbeam_channel::bounded(WRITE_LOG_SIZE);
        let inner = Arc::new(Inner::new(
            max_capacity,
            initial_capacity,
            build_hasher,
            r_rcv,
            w_rcv,
            time_to_live,
            time_to_idle,
            invalidator_enabled,
        ));
        if invalidator_enabled {
            // The invalidator keeps a weak back-reference to `inner`, so it
            // can only be wired up after the `Arc` exists.
            inner.set_invalidator(&inner);
        }
        // The housekeeper also holds only a weak reference so it does not keep
        // `Inner` alive after all cache handles are dropped.
        let housekeeper = Housekeeper::new(Arc::downgrade(&inner));
        Self {
            inner,
            read_op_ch: r_snd,
            write_op_ch: w_snd,
            housekeeper: Some(Arc::new(housekeeper)),
        }
    }
    /// Hashes `key` with this cache's hasher (the same hash feeds the internal
    /// table and the frequency sketch).
    #[inline]
    pub(crate) fn hash<Q>(&self, key: &Q) -> u64
    where
        Arc<K>: Borrow<Q>,
        Q: Hash + Eq + ?Sized,
    {
        self.inner.hash(key)
    }

    /// Looks up `key`, records a `ReadOp` (hit or miss) on the read log, and
    /// returns a clone of the value on a hit.
    ///
    /// An entry that is present but expired (TTL or TTI), older than the
    /// `valid_after` watermark, or matched by an invalidation predicate is
    /// reported as a miss; its physical removal is left to the housekeeper.
    /// `hash` must be the value produced by [`Self::hash`] for `key`.
    pub(crate) fn get_with_hash<Q>(&self, key: &Q, hash: u64) -> Option<V>
    where
        Arc<K>: Borrow<Q>,
        Q: Hash + Eq + ?Sized,
    {
        let record = |op| {
            // Recording only fails (and panics) if the receiving side of the
            // read log has been disconnected; a full channel is tolerated
            // inside `record_read_op`.
            self.record_read_op(op).expect("Failed to record a get op");
        };
        match self.inner.get_key_value(key) {
            None => {
                record(ReadOp::Miss(hash));
                None
            }
            Some((arc_key, entry)) => {
                let i = &self.inner;
                let (ttl, tti, va) = (&i.time_to_live(), &i.time_to_idle(), &i.valid_after());
                let now = i.current_time_from_expiration_clock();
                if is_expired_entry_wo(ttl, va, &entry, now)
                    || is_expired_entry_ao(tti, va, &entry, now)
                    || self.inner.is_invalidated_entry(&arc_key, &entry)
                {
                    record(ReadOp::Miss(hash));
                    None
                } else {
                    // Clone the value before handing the entry to the read log.
                    let v = entry.value.clone();
                    record(ReadOp::Hit(hash, entry, now));
                    Some(v)
                }
            }
        }
    }

    /// Removes the entry for `key` from the hash table and returns it, if any.
    #[inline]
    pub(crate) fn remove<Q>(&self, key: &Q) -> Option<Arc<ValueEntry<K, V>>>
    where
        Arc<K>: Borrow<Q>,
        Q: Hash + Eq + ?Sized,
    {
        self.inner.remove(key)
    }
#[inline]
pub(crate) fn apply_reads_writes_if_needed(
ch: &Sender<WriteOp<K, V>>,
housekeeper: Option<&HouseKeeperArc<K, V, S>>,
) {
let w_len = ch.len();
if Self::should_apply_writes(w_len) {
if let Some(h) = housekeeper {
h.try_schedule_sync();
}
}
}
    /// Invalidates every entry inserted before now by advancing the
    /// `valid_after` watermark; entries are removed lazily afterwards.
    pub(crate) fn invalidate_all(&self) {
        let now = self.inner.current_time_from_expiration_clock();
        self.inner.set_valid_after(now);
    }

    /// Registers a closure-based invalidation predicate, stamped with the
    /// current time so it only applies to entries written before registration.
    ///
    /// # Errors
    ///
    /// Fails if invalidation closures were not enabled at construction time
    /// (or registration otherwise fails inside the invalidator).
    pub(crate) fn invalidate_entries_if(
        &self,
        predicate: PredicateFun<K, V>,
    ) -> Result<PredicateId, PredicateError> {
        let now = self.inner.current_time_from_expiration_clock();
        self.inner.register_invalidation_predicate(predicate, now)
    }

    /// Maximum number of entries the cache will hold.
    pub(crate) fn max_capacity(&self) -> usize {
        self.inner.max_capacity()
    }

    /// The configured time-to-live, if any.
    pub(crate) fn time_to_live(&self) -> Option<Duration> {
        self.inner.time_to_live()
    }

    /// The configured time-to-idle, if any.
    pub(crate) fn time_to_idle(&self) -> Option<Duration> {
        self.inner.time_to_idle()
    }
}
impl<K, V, S> BaseCache<K, V, S>
where
K: Hash + Eq + Send + Sync + 'static,
V: Clone + Send + Sync + 'static,
S: BuildHasher + Clone + Send + Sync + 'static,
{
    /// Pushes a read operation onto the read log, first triggering a sync if
    /// the log is already past its flush point.
    ///
    /// A full channel is not an error: the op is silently dropped (read ops
    /// are only hints for the recency/frequency bookkeeping). Disconnection is
    /// propagated to the caller.
    #[inline]
    fn record_read_op(&self, op: ReadOp<K, V>) -> Result<(), TrySendError<ReadOp<K, V>>> {
        self.apply_reads_if_needed();
        let ch = &self.read_op_ch;
        match ch.try_send(op) {
            // Dropping the op on a full channel is acceptable; see above.
            Ok(()) | Err(TrySendError::Full(_)) => Ok(()),
            Err(e @ TrySendError::Disconnected(_)) => Err(e),
        }
    }

    /// Inserts or updates `key` in the hash table and returns the `WriteOp`
    /// the caller must push onto the write log.
    ///
    /// `insert_with_or_modify` may run the insert closure, the modify closure,
    /// or (presumably under concurrent modification — hence the counter) both.
    /// The shared `AtomicU8` records the order in which the closures ran, so
    /// the match below can return the op from whichever closure ran last.
    #[inline]
    pub(crate) fn do_insert_with_hash(&self, key: Arc<K>, hash: u64, value: V) -> WriteOp<K, V> {
        // `Rc` (not `Arc`) suffices: both closures run on this thread.
        let op_cnt1 = Rc::new(AtomicU8::new(0));
        let op_cnt2 = Rc::clone(&op_cnt1);
        let mut op1 = None;
        let mut op2 = None;
        self.inner.cache.insert_with_or_modify(
            Arc::clone(&key),
            // Insert path: a brand-new entry.
            || {
                let entry = Arc::new(ValueEntry::new(value.clone()));
                let cnt = op_cnt1.fetch_add(1, Ordering::Relaxed);
                op1 = Some((
                    cnt,
                    WriteOp::Upsert(KeyHash::new(Arc::clone(&key), hash), Arc::clone(&entry)),
                ));
                entry
            },
            // Modify path: a new entry built from the old one via `new_with`.
            |_k, old_entry| {
                let entry = Arc::new(ValueEntry::new_with(value.clone(), old_entry));
                let cnt = op_cnt2.fetch_add(1, Ordering::Relaxed);
                op2 = Some((
                    cnt,
                    Arc::clone(old_entry),
                    WriteOp::Upsert(KeyHash::new(Arc::clone(&key), hash), Arc::clone(&entry)),
                ));
                entry
            },
        );
        match (op1, op2) {
            (Some((_cnt, ins_op)), None) => ins_op,
            (None, Some((_cnt, old_entry, upd_op))) => {
                old_entry.unset_q_nodes();
                upd_op
            }
            // Both closures ran; pick the op from the one that ran last
            // (larger counter value).
            (Some((cnt1, ins_op)), Some((cnt2, old_entry, upd_op))) => {
                if cnt1 > cnt2 {
                    ins_op
                } else {
                    old_entry.unset_q_nodes();
                    upd_op
                }
            }
            (None, None) => unreachable!(),
        }
    }
#[inline]
fn apply_reads_if_needed(&self) {
let len = self.read_op_ch.len();
if Self::should_apply_reads(len) {
if let Some(h) = &self.housekeeper {
h.try_schedule_sync();
}
}
}
    /// True when the read log is long enough to warrant a sync.
    #[inline]
    fn should_apply_reads(ch_len: usize) -> bool {
        ch_len >= READ_LOG_FLUSH_POINT
    }

    /// True when the write log is long enough to warrant a sync.
    #[inline]
    fn should_apply_writes(ch_len: usize) -> bool {
        ch_len >= WRITE_LOG_FLUSH_POINT
    }
}
// Test-only accessors.
#[cfg(test)]
impl<K, V, S> BaseCache<K, V, S>
where
    K: Hash + Eq + Send + Sync + 'static,
    V: Send + Sync + 'static,
    S: BuildHasher + Clone + Send + Sync + 'static,
{
    /// Number of entries currently in the hash table.
    pub(crate) fn table_size(&self) -> usize {
        self.inner.len()
    }

    /// Number of registered invalidation predicates.
    pub(crate) fn invalidation_predicate_count(&self) -> usize {
        self.inner.invalidation_predicate_count()
    }

    /// Cancels the housekeeper's periodical sync job so that tests can drive
    /// syncs manually.
    pub(crate) fn reconfigure_for_testing(&mut self) {
        if let Some(housekeeper) = &self.housekeeper {
            let mut job = housekeeper.periodical_sync_job().lock();
            if let Some(job) = job.take() {
                job.cancel();
            }
        }
    }

    /// Installs (or clears) a mock clock used by the expiration checks.
    pub(crate) fn set_expiration_clock(&self, clock: Option<Clock>) {
        self.inner.set_expiration_clock(clock);
    }
}
/// The concurrent hash table that stores the entries.
type CacheStore<K, V, S> = moka_cht::SegmentedHashMap<Arc<K>, Arc<ValueEntry<K, V>>, S>;
/// A `(key, entry)` pair as returned by table lookups.
type CacheEntry<K, V> = (Arc<K>, Arc<ValueEntry<K, V>>);
/// The shared state behind `BaseCache`: the hash table, the access/write-order
/// deques, the frequency sketch used by the admission policy, and the
/// receiving ends of the read/write op logs.
pub(crate) struct Inner<K, V, S> {
    max_capacity: usize,
    cache: CacheStore<K, V, S>,
    build_hasher: S,
    // Access-order and write-order queues, guarded by a single mutex that the
    // sync path holds for the duration of a maintenance pass.
    deques: Mutex<Deques<K>>,
    // Frequency sketch consulted by `admit` for admission decisions.
    frequency_sketch: RwLock<FrequencySketch>,
    read_op_ch: Receiver<ReadOp<K, V>>,
    write_op_ch: Receiver<WriteOp<K, V>>,
    time_to_live: Option<Duration>,
    time_to_idle: Option<Duration>,
    // Entries last accessed/modified before this instant count as invalidated
    // (set by `invalidate_all`).
    valid_after: AtomicInstant,
    invalidator_enabled: bool,
    invalidator: RwLock<Option<Invalidator<K, V, S>>>,
    // `true` when `expiration_clock` holds a mock clock (test support).
    has_expiration_clock: AtomicBool,
    expiration_clock: RwLock<Option<Clock>>,
}
impl<K, V, S> Inner<K, V, S>
where
K: Hash + Eq,
S: BuildHasher + Clone,
{
#[allow(clippy::too_many_arguments)]
fn new(
max_capacity: usize,
initial_capacity: Option<usize>,
build_hasher: S,
read_op_ch: Receiver<ReadOp<K, V>>,
write_op_ch: Receiver<WriteOp<K, V>>,
time_to_live: Option<Duration>,
time_to_idle: Option<Duration>,
invalidator_enabled: bool,
) -> Self {
let initial_capacity = initial_capacity
.map(|cap| cap + WRITE_LOG_SIZE * 4)
.unwrap_or_default();
let num_segments = 64;
let cache = moka_cht::SegmentedHashMap::with_num_segments_capacity_and_hasher(
num_segments,
initial_capacity,
build_hasher.clone(),
);
let skt_capacity = max_capacity
.try_into() .unwrap_or(u32::MAX)
.max(128);
let frequency_sketch = FrequencySketch::with_capacity(skt_capacity);
Self {
max_capacity,
cache,
build_hasher,
deques: Mutex::new(Deques::default()),
frequency_sketch: RwLock::new(frequency_sketch),
read_op_ch,
write_op_ch,
time_to_live,
time_to_idle,
valid_after: AtomicInstant::default(),
invalidator_enabled,
invalidator: RwLock::new(None),
has_expiration_clock: AtomicBool::new(false),
expiration_clock: RwLock::new(None),
}
}
fn set_invalidator(&self, self_ref: &Arc<Self>) {
*self.invalidator.write() = Some(Invalidator::new(Arc::downgrade(&Arc::clone(self_ref))));
}
    /// Hashes `key` using this cache's `BuildHasher`.
    #[inline]
    fn hash<Q>(&self, key: &Q) -> u64
    where
        Arc<K>: Borrow<Q>,
        Q: Hash + Eq + ?Sized,
    {
        let mut hasher = self.build_hasher.build_hasher();
        key.hash(&mut hasher);
        hasher.finish()
    }
    /// Returns the `(key, entry)` pair for `key` without any expiration check.
    #[inline]
    fn get_key_value<Q>(&self, key: &Q) -> Option<CacheEntry<K, V>>
    where
        Arc<K>: Borrow<Q>,
        Q: Hash + Eq + ?Sized,
    {
        self.cache.get_key_value(key)
    }

    /// Removes and returns the entry for `key` from the hash table.
    #[inline]
    fn remove<Q>(&self, key: &Q) -> Option<Arc<ValueEntry<K, V>>>
    where
        Arc<K>: Borrow<Q>,
        Q: Hash + Eq + ?Sized,
    {
        self.cache.remove(key)
    }

    fn max_capacity(&self) -> usize {
        self.max_capacity
    }

    #[inline]
    fn time_to_live(&self) -> Option<Duration> {
        self.time_to_live
    }

    #[inline]
    fn time_to_idle(&self) -> Option<Duration> {
        self.time_to_idle
    }

    /// True when any time-based expiration (TTL or TTI) is configured.
    #[inline]
    fn has_expiry(&self) -> bool {
        self.time_to_live.is_some() || self.time_to_idle.is_some()
    }

    /// The write-order queue is only maintained when something needs it:
    /// TTL expiration or predicate-based invalidation.
    #[inline]
    fn is_write_order_queue_enabled(&self) -> bool {
        self.time_to_live.is_some() || self.invalidator_enabled
    }

    /// The `invalidate_all` watermark, if one has been set.
    #[inline]
    fn valid_after(&self) -> Option<Instant> {
        self.valid_after.instant()
    }

    /// Marks every entry written before `timestamp` as invalid.
    #[inline]
    fn set_valid_after(&self, timestamp: Instant) {
        self.valid_after.set_instant(timestamp);
    }

    #[inline]
    fn has_valid_after(&self) -> bool {
        self.valid_after.is_set()
    }
    /// Registers an invalidation predicate, or fails if the invalidator was
    /// not enabled at construction time.
    #[inline]
    fn register_invalidation_predicate(
        &self,
        predicate: PredicateFun<K, V>,
        registered_at: Instant,
    ) -> Result<PredicateId, PredicateError> {
        if let Some(inv) = &*self.invalidator.read() {
            inv.register_predicate(predicate, registered_at)
        } else {
            Err(PredicateError::InvalidationClosuresDisabled)
        }
    }

    /// Returns `true` if `entry` is matched by a registered invalidation
    /// predicate. Always `false` when the invalidator is disabled.
    #[inline]
    fn is_invalidated_entry(&self, key: &Arc<K>, entry: &Arc<ValueEntry<K, V>>) -> bool {
        if self.invalidator_enabled {
            if let Some(inv) = &*self.invalidator.read() {
                return inv.apply_predicates(key, entry);
            }
        }
        false
    }

    /// Current time as seen by the expiration logic: the mock clock when one
    /// is installed (tests), otherwise the real clock.
    #[inline]
    fn current_time_from_expiration_clock(&self) -> Instant {
        if self.has_expiration_clock.load(Ordering::Relaxed) {
            Instant::new(
                self.expiration_clock
                    .read()
                    .as_ref()
                    .expect("Cannot get the expiration clock")
                    .now(),
            )
        } else {
            Instant::now()
        }
    }
}
/// Gives the invalidator direct get / conditional-remove access to the hash
/// table (the invalidator works against an `Arc<Inner>`).
impl<K, V, S> GetOrRemoveEntry<K, V> for Arc<Inner<K, V, S>>
where
    K: Hash + Eq,
    S: BuildHasher,
{
    fn get_value_entry(&self, key: &Arc<K>) -> Option<Arc<ValueEntry<K, V>>> {
        self.cache.get(key)
    }

    /// Removes `key` only if `condition` approves the current entry.
    fn remove_key_value_if<F>(&self, key: &Arc<K>, condition: F) -> Option<Arc<ValueEntry<K, V>>>
    where
        F: FnMut(&Arc<K>, &Arc<ValueEntry<K, V>>) -> bool,
    {
        self.cache.remove_if(key, condition)
    }
}
impl<K, V, S> InnerSync for Inner<K, V, S>
where
    K: Hash + Eq + Send + Sync + 'static,
    V: Send + Sync + 'static,
    S: BuildHasher + Clone + Send + Sync + 'static,
{
    /// One housekeeper maintenance pass: drains the read/write op logs (up to
    /// `max_repeats + 1` rounds), evicts expired entries, and drives predicate
    /// invalidation.
    ///
    /// The return value tells the housekeeper how soon to run again: `Fast`
    /// when a log was still at/above its flush point when the round budget ran
    /// out, `Normal` when the write log is comfortably low, `None` otherwise.
    fn sync(&self, max_repeats: usize) -> Option<SyncPace> {
        const EVICTION_BATCH_SIZE: usize = 500;
        const INVALIDATION_BATCH_SIZE: usize = 500;
        let mut deqs = self.deques.lock();
        let mut calls = 0;
        let mut should_sync = true;
        // Drain both op channels until they fall below their flush points or
        // the round budget is exhausted.
        while should_sync && calls <= max_repeats {
            let r_len = self.read_op_ch.len();
            if r_len > 0 {
                self.apply_reads(&mut deqs, r_len);
            }
            let w_len = self.write_op_ch.len();
            if w_len > 0 {
                self.apply_writes(&mut deqs, w_len);
            }
            calls += 1;
            should_sync = self.read_op_ch.len() >= READ_LOG_FLUSH_POINT
                || self.write_op_ch.len() >= WRITE_LOG_FLUSH_POINT;
        }
        if self.has_expiry() || self.has_valid_after() {
            self.evict(&mut deqs, EVICTION_BATCH_SIZE);
        }
        if self.invalidator_enabled {
            if let Some(invalidator) = &*self.invalidator.read() {
                // Only submit new scan work when there are predicates and the
                // previous scan task has finished.
                if !invalidator.is_empty() && !invalidator.is_task_running() {
                    self.invalidate_entries(invalidator, &mut deqs, INVALIDATION_BATCH_SIZE);
                }
            }
        }
        if should_sync {
            Some(SyncPace::Fast)
        } else if self.write_op_ch.len() <= WRITE_LOG_LOW_WATER_MARK {
            Some(SyncPace::Normal)
        } else {
            None
        }
    }
}
impl<K, V, S> Inner<K, V, S>
where
K: Hash + Eq + Send + Sync + 'static,
V: Send + Sync + 'static,
S: BuildHasher + Clone + Send + Sync + 'static,
{
    /// Drains up to `count` ops from the read log, bumping the frequency
    /// sketch for every key and refreshing access order for hits.
    fn apply_reads(&self, deqs: &mut Deques<K>, count: usize) {
        use ReadOp::*;
        let mut freq = self.frequency_sketch.write();
        let ch = &self.read_op_ch;
        for _ in 0..count {
            match ch.try_recv() {
                Ok(Hit(hash, mut entry, timestamp)) => {
                    freq.increment(hash);
                    entry.set_last_accessed(timestamp);
                    deqs.move_to_back_ao(&entry)
                }
                Ok(Miss(hash)) => freq.increment(hash),
                // Channel empty (or disconnected): nothing more to drain.
                Err(_) => break,
            }
        }
    }

    /// Drains up to `count` ops from the write log, applying upserts (with
    /// admission control) and removals to the table/deques.
    fn apply_writes(&self, deqs: &mut Deques<K>, count: usize) {
        use WriteOp::*;
        let freq = self.frequency_sketch.read();
        let ch = &self.write_op_ch;
        // One timestamp for the whole batch.
        let ts = self.current_time_from_expiration_clock();
        for _ in 0..count {
            match ch.try_recv() {
                Ok(Upsert(kh, entry)) => self.handle_upsert(kh, entry, ts, deqs, &freq),
                Ok(Remove(entry)) => Self::handle_remove(deqs, entry),
                Err(_) => break,
            };
        }
    }
fn handle_upsert(
&self,
kh: KeyHash<K>,
mut entry: Arc<ValueEntry<K, V>>,
timestamp: Instant,
deqs: &mut Deques<K>,
freq: &FrequencySketch,
) {
const MAX_RETRY: usize = 5;
let mut tries = 0;
let mut done = false;
entry.set_last_accessed(timestamp);
entry.set_last_modified(timestamp);
let last_accessed = entry.raw_last_accessed();
let last_modified = entry.raw_last_modified();
while tries < MAX_RETRY {
tries += 1;
if entry.is_admitted() {
deqs.move_to_back_ao(&entry);
deqs.move_to_back_wo(&entry);
} else if self.cache.len() <= self.max_capacity {
self.handle_admit(kh.clone(), &entry, last_accessed, last_modified, deqs);
} else {
let victim = match Self::find_cache_victim(deqs, freq) {
Some(node) => node,
None => {
self.handle_admit(kh.clone(), &entry, last_accessed, last_modified, deqs);
done = true;
break;
}
};
if Self::admit(kh.hash, victim, freq) {
if let Some(vic_entry) = self.cache.remove(&victim.element.key) {
Self::handle_remove(deqs, vic_entry);
} else {
let victim = NonNull::from(victim);
unsafe { deqs.probation.move_to_back(victim) };
continue; }
self.handle_admit(
kh.clone(),
&entry,
Arc::clone(&last_accessed),
Arc::clone(&last_modified),
deqs,
);
} else {
self.cache.remove(&Arc::clone(&kh.key));
}
}
done = true;
break;
}
if !done {
self.cache.remove(&Arc::clone(&kh.key));
}
}
    /// Picks the entry at the front of the probation deque as the eviction
    /// victim (the frequency sketch is not consulted here — only in `admit`).
    #[inline]
    fn find_cache_victim<'a>(
        deqs: &'a Deques<K>,
        _freq: &FrequencySketch,
    ) -> Option<&'a DeqNode<KeyHashDate<K>>> {
        deqs.probation.peek_front()
    }

    /// Admission test: the candidate wins only if its estimated frequency is
    /// strictly greater than the victim's.
    #[inline]
    fn admit(
        candidate_hash: u64,
        victim: &DeqNode<KeyHashDate<K>>,
        freq: &FrequencySketch,
    ) -> bool {
        freq.frequency(candidate_hash) > freq.frequency(victim.element.hash)
    }

    /// Links a freshly admitted entry into the access-order deque (and the
    /// write-order deque when that queue is enabled), then marks it admitted.
    fn handle_admit(
        &self,
        kh: KeyHash<K>,
        entry: &Arc<ValueEntry<K, V>>,
        raw_last_accessed: Arc<AtomicInstant>,
        raw_last_modified: Arc<AtomicInstant>,
        deqs: &mut Deques<K>,
    ) {
        let key = Arc::clone(&kh.key);
        deqs.push_back_ao(
            CacheRegion::MainProbation,
            KeyHashDate::new(kh, raw_last_accessed),
            entry,
        );
        if self.is_write_order_queue_enabled() {
            deqs.push_back_wo(KeyDate::new(key, raw_last_modified), entry);
        }
        entry.set_is_admitted(true);
    }

    /// Unlinks a removed entry from both deques (if it was admitted) and
    /// clears its deque-node pointers.
    fn handle_remove(deqs: &mut Deques<K>, entry: Arc<ValueEntry<K, V>>) {
        if entry.is_admitted() {
            entry.set_is_admitted(false);
            deqs.unlink_ao(&entry);
            Deques::unlink_wo(&mut deqs.write_order, &entry);
        }
        entry.unset_q_nodes();
    }

    /// Same as `handle_remove`, but for callers that hold split borrows of the
    /// individual deques rather than the whole `Deques` struct.
    fn handle_remove_with_deques(
        ao_deq_name: &str,
        ao_deq: &mut Deque<KeyHashDate<K>>,
        wo_deq: &mut Deque<KeyDate<K>>,
        entry: Arc<ValueEntry<K, V>>,
    ) {
        if entry.is_admitted() {
            entry.set_is_admitted(false);
            Deques::unlink_ao_from_deque(ao_deq_name, ao_deq, &entry);
            Deques::unlink_wo(wo_deq, &entry);
        }
        entry.unset_q_nodes();
    }
    /// Evicts up to `batch_size` expired entries per deque: TTL/`valid_after`
    /// expiry via the write-order deque, then TTI/`valid_after` expiry via the
    /// three access-order deques.
    fn evict(&self, deqs: &mut Deques<K>, batch_size: usize) {
        let now = self.current_time_from_expiration_clock();
        if self.is_write_order_queue_enabled() {
            self.remove_expired_wo(deqs, batch_size, now);
        }
        if self.time_to_idle.is_some() || self.has_valid_after() {
            // Split-borrow the individual deques so the closure can pass the
            // write-order deque alongside each access-order deque.
            let (window, probation, protected, wo) = (
                &mut deqs.window,
                &mut deqs.probation,
                &mut deqs.protected,
                &mut deqs.write_order,
            );
            let mut rm_expired_ao =
                |name, deq| self.remove_expired_ao(name, deq, wo, batch_size, now);
            rm_expired_ao("window", window);
            rm_expired_ao("probation", probation);
            rm_expired_ao("protected", protected);
        }
    }
    /// Pops entries off the front of one access-order deque while they are
    /// expired (TTI or `valid_after`), removing up to `batch_size` of them.
    ///
    /// Races with concurrent writers are handled per entry:
    /// * `remove_if` re-checks expiry against the live table entry;
    /// * an entry whose access timestamp is unset is rotated to the back
    ///   (presumably a concurrent (re)write is in flight — revisit later);
    /// * a deque node with no matching table entry is stale and is rotated
    ///   aside so the scan can continue.
    #[inline]
    fn remove_expired_ao(
        &self,
        deq_name: &str,
        deq: &mut Deque<KeyHashDate<K>>,
        write_order_deq: &mut Deque<KeyDate<K>>,
        batch_size: usize,
        now: Instant,
    ) {
        let tti = &self.time_to_idle;
        let va = &self.valid_after();
        for _ in 0..batch_size {
            // Peek the front node and take its key only if it is expired.
            let (key, _ts) = deq
                .peek_front()
                .and_then(|node| {
                    if is_expired_entry_ao(tti, va, &*node, now) {
                        Some((
                            Some(Arc::clone(&node.element.key)),
                            Some(&node.element.timestamp),
                        ))
                    } else {
                        None
                    }
                })
                .unwrap_or_default();
            if key.is_none() {
                // Stop at the first non-expired front entry.
                break;
            }
            let key = key.as_ref().unwrap();
            let maybe_entry = self
                .cache
                .remove_if(key, |_, v| is_expired_entry_ao(tti, va, v, now));
            if let Some(entry) = maybe_entry {
                Self::handle_remove_with_deques(deq_name, deq, write_order_deq, entry);
            } else if let Some(entry) = self.cache.get(key) {
                let ts = entry.last_accessed();
                if ts.is_none() {
                    // Timestamp unset: defer this entry to the back.
                    Deques::move_to_back_ao_in_deque(deq_name, deq, &entry);
                    Deques::move_to_back_wo_in_deque(write_order_deq, &entry);
                } else {
                    break;
                }
            } else {
                // Stale deque node (no table entry): rotate it out of the way.
                if let Some(node) = deq.peek_front() {
                    let node = NonNull::from(node);
                    unsafe { deq.move_to_back(node) };
                }
            }
        }
    }
    /// Pops entries off the front of the write-order deque while they are
    /// expired (TTL or `valid_after`), removing up to `batch_size` of them.
    /// Race handling mirrors `remove_expired_ao`.
    #[inline]
    fn remove_expired_wo(&self, deqs: &mut Deques<K>, batch_size: usize, now: Instant) {
        let ttl = &self.time_to_live;
        let va = &self.valid_after();
        for _ in 0..batch_size {
            // Peek the front node and take its key only if it is expired.
            let (key, _ts) = deqs
                .write_order
                .peek_front()
                .and_then(|node| {
                    if is_expired_entry_wo(ttl, va, &*node, now) {
                        Some((
                            Some(Arc::clone(&node.element.key)),
                            Some(&node.element.timestamp),
                        ))
                    } else {
                        None
                    }
                })
                .unwrap_or_default();
            if key.is_none() {
                // Stop at the first non-expired front entry.
                break;
            }
            let key = key.as_ref().unwrap();
            // Re-check expiry against the live entry to avoid racing a writer.
            let maybe_entry = self
                .cache
                .remove_if(key, |_, v| is_expired_entry_wo(ttl, va, v, now));
            if let Some(entry) = maybe_entry {
                Self::handle_remove(deqs, entry);
            } else if let Some(entry) = self.cache.get(key) {
                let ts = entry.last_modified();
                if ts.is_none() {
                    // Timestamp unset — presumably a concurrent (re)write is in
                    // flight; rotate the entry to the back and revisit later.
                    deqs.move_to_back_ao(&entry);
                    deqs.move_to_back_wo(&entry);
                } else {
                    break;
                }
            } else {
                // Stale deque node with no table entry: rotate it aside.
                if let Some(node) = deqs.write_order.peek_front() {
                    let node = NonNull::from(node);
                    unsafe { deqs.write_order.move_to_back(node) };
                }
            }
        }
    }
    /// One step of predicate invalidation: harvest the previous scan task's
    /// result, then submit the next batch of candidates.
    fn invalidate_entries(
        &self,
        invalidator: &Invalidator<K, V, S>,
        deqs: &mut Deques<K>,
        batch_size: usize,
    ) {
        self.process_invalidation_result(invalidator, deqs);
        self.submit_invalidation_task(invalidator, &mut deqs.write_order, batch_size);
    }

    /// Unlinks the entries the invalidator removed; when the scan reached the
    /// end of the write-order deque, resets the deque cursor for the next sweep.
    fn process_invalidation_result(
        &self,
        invalidator: &Invalidator<K, V, S>,
        deqs: &mut Deques<K>,
    ) {
        if let Some(InvalidationResult {
            invalidated,
            is_done,
        }) = invalidator.task_result()
        {
            for entry in invalidated {
                Self::handle_remove(deqs, entry);
            }
            if is_done {
                deqs.write_order.reset_cursor();
            }
        }
    }

    /// Collects up to `batch_size` (key, timestamp) candidates from the
    /// write-order deque and hands them to the invalidator as a scan task.
    ///
    /// When the deque is empty there is nothing left to scan, so predicates
    /// registered before `now` are retired instead.
    fn submit_invalidation_task(
        &self,
        invalidator: &Invalidator<K, V, S>,
        write_order: &mut Deque<KeyDate<K>>,
        batch_size: usize,
    ) {
        let now = self.current_time_from_expiration_clock();
        if write_order.len() == 0 {
            invalidator.remove_predicates_registered_before(now);
            return;
        }
        let mut candidates = Vec::with_capacity(batch_size);
        let mut iter = write_order.peekable();
        let mut len = 0;
        while len < batch_size {
            if let Some(kd) = iter.next() {
                // Entries without a timestamp are skipped.
                if let Some(ts) = kd.timestamp() {
                    candidates.push(KeyDateLite::new(&kd.key, ts));
                    len += 1;
                }
            } else {
                break;
            }
        }
        if len > 0 {
            // `is_truncated` tells the invalidator whether more entries remain
            // beyond this batch.
            let is_truncated = len == batch_size && iter.peek().is_some();
            invalidator.submit_task(candidates, is_truncated);
        }
    }
}
// Test-only accessors.
#[cfg(test)]
impl<K, V, S> Inner<K, V, S>
where
    K: Hash + Eq,
    S: BuildHasher + Clone,
{
    /// Number of entries currently in the hash table.
    fn len(&self) -> usize {
        self.cache.len()
    }

    /// Number of registered invalidation predicates (0 when disabled).
    fn invalidation_predicate_count(&self) -> usize {
        self.invalidator
            .read()
            .as_ref()
            .map(|inv| inv.predicate_count())
            .unwrap_or(0)
    }

    /// Installs a mock clock, or removes it when `clock` is `None`.
    fn set_expiration_clock(&self, clock: Option<Clock>) {
        let mut exp_clock = self.expiration_clock.write();
        if let Some(clock) = clock {
            // Store the clock before raising the flag (and, below, clear the
            // flag before removing the clock).
            *exp_clock = Some(clock);
            self.has_expiration_clock.store(true, Ordering::SeqCst);
        } else {
            self.has_expiration_clock.store(false, Ordering::SeqCst);
            *exp_clock = None;
        }
    }
}
/// Returns `true` if `entry` is expired under the access-order rules: either
/// it was last accessed before the `valid_after` watermark, or its
/// time-to-idle has elapsed at `now`. An entry without a last-accessed
/// timestamp is never considered expired here.
///
/// # Panics
///
/// Panics if `last_accessed + tti` overflows the time representation.
#[inline]
fn is_expired_entry_ao(
    time_to_idle: &Option<Duration>,
    valid_after: &Option<Instant>,
    entry: &impl AccessTime,
    now: Instant,
) -> bool {
    if let Some(ts) = entry.last_accessed() {
        if let Some(va) = valid_after {
            if ts < *va {
                return true;
            }
        }
        if let Some(tti) = time_to_idle {
            // The previous message said "ttl overflow", but this is the
            // time-to-idle path; `expect` also replaces the manual
            // is_none()/panic!/unwrap() sequence.
            return ts.checked_add(*tti).expect("tti overflow") <= now;
        }
    }
    false
}
/// Returns `true` if `entry` is expired under the write-order rules: either
/// it was last modified before the `valid_after` watermark, or its
/// time-to-live has elapsed at `now`. An entry without a last-modified
/// timestamp is never considered expired here.
#[inline]
fn is_expired_entry_wo(
    time_to_live: &Option<Duration>,
    valid_after: &Option<Instant>,
    entry: &impl AccessTime,
    now: Instant,
) -> bool {
    let ts = match entry.last_modified() {
        Some(ts) => ts,
        None => return false,
    };
    if matches!(valid_after, Some(va) if ts < *va) {
        return true;
    }
    match time_to_live {
        // Panics with the same "ttl overflow" message as before on overflow.
        Some(ttl) => ts.checked_add(*ttl).expect("ttl overflow") <= now,
        None => false,
    }
}
#[cfg(test)]
mod tests {
    use super::BaseCache;

    // The sketch capacity is derived from a usize -> u32 conversion; on a
    // 16-bit target the large capacities below cannot be expressed.
    #[cfg_attr(target_pointer_width = "16", ignore)]
    #[test]
    fn test_skt_capacity_will_not_overflow() {
        use std::collections::hash_map::RandomState;

        // Power of two.
        let pot = |exp| 2_usize.pow(exp);
        // Builds a cache with the given max capacity and asserts the frequency
        // sketch's resulting table length.
        let ensure_sketch_len = |max_capacity, len, name| {
            let cache = BaseCache::<u8, u8>::new(
                max_capacity,
                None,
                RandomState::default(),
                None,
                None,
                false,
            );
            assert_eq!(
                cache.inner.frequency_sketch.read().table_len(),
                len,
                "{}",
                name
            );
        };

        if cfg!(target_pointer_width = "32") {
            // The test expects the sketch table to cap at 2^24 on 32-bit targets.
            let pot24 = pot(24);
            let pot16 = pot(16);
            ensure_sketch_len(0, 128, "0");
            ensure_sketch_len(128, 128, "128");
            ensure_sketch_len(pot16, pot16, "pot16");
            ensure_sketch_len(pot16 + 1, pot(17), "pot16 + 1");
            ensure_sketch_len(pot24 - 1, pot24, "pot24 - 1");
            ensure_sketch_len(pot24, pot24, "pot24");
            ensure_sketch_len(pot(27), pot24, "pot(27)");
            ensure_sketch_len(usize::MAX, pot24, "usize::MAX");
        } else {
            // The test expects the cap to be 2^30 on 64-bit targets.
            let pot30 = pot(30);
            let pot16 = pot(16);
            ensure_sketch_len(0, 128, "0");
            ensure_sketch_len(128, 128, "128");
            ensure_sketch_len(pot16, pot16, "pot16");
            ensure_sketch_len(pot16 + 1, pot(17), "pot16 + 1");
            ensure_sketch_len(pot30 - 1, pot30, "pot30- 1");
            ensure_sketch_len(pot30, pot30, "pot30");
            ensure_sketch_len(usize::MAX, pot30, "usize::MAX");
        };
    }
}