1use super::{
2 housekeeper::Housekeeper,
3 invalidator::{Invalidator, KeyDateLite, PredicateFun},
4 key_lock::{KeyLock, KeyLockMap},
5 notifier::RemovalNotifier,
6 InterruptedOp, PredicateId,
7};
8
9use crate::{
10 common::{
11 self,
12 concurrent::{
13 arc::MiniArc,
14 constants::{
15 READ_LOG_CH_SIZE, READ_LOG_FLUSH_POINT, WRITE_LOG_CH_SIZE, WRITE_LOG_FLUSH_POINT,
16 },
17 deques::Deques,
18 entry_info::EntryInfo,
19 AccessTime, KeyHash, KeyHashDate, KvEntry, OldEntryInfo, ReadOp, ValueEntry, Weigher,
20 WriteOp,
21 },
22 deque::{DeqNode, Deque},
23 frequency_sketch::FrequencySketch,
24 iter::ScanningGet,
25 time::{AtomicInstant, Clock, Instant},
26 timer_wheel::{ReschedulingResult, TimerWheel},
27 CacheRegion, HousekeeperConfig,
28 },
29 future::CancelGuard,
30 notification::{AsyncEvictionListener, RemovalCause},
31 policy::{EvictionPolicy, EvictionPolicyConfig, ExpirationPolicy},
32 Entry, Expiry, Policy, PredicateError,
33};
34
35#[cfg(feature = "unstable-debug-counters")]
36use common::concurrent::debug_counters::CacheDebugStats;
37
38use async_lock::{Mutex, MutexGuard, RwLock};
39use crossbeam_channel::{Receiver, Sender, TrySendError};
40use crossbeam_utils::atomic::AtomicCell;
41use equivalent::Equivalent;
42use futures_util::future::BoxFuture;
43use smallvec::SmallVec;
44use std::{
45 borrow::Borrow,
46 collections::hash_map::RandomState,
47 hash::{BuildHasher, Hash, Hasher},
48 sync::{
49 atomic::{AtomicBool, AtomicU8, Ordering},
50 Arc,
51 },
52 time::{Duration, Instant as StdInstant},
53};
54
/// Reference-counted handle to the cache's [`Housekeeper`].
pub(crate) type HouseKeeperArc = Arc<Housekeeper>;
56
/// Cloneable handle onto a cache's shared state.
///
/// Every clone shares the same [`Inner`] through an `Arc` and carries its own
/// clones of the channel endpoints listed below.
pub(crate) struct BaseCache<K, V, S = RandomState> {
    /// State shared by all clones of this handle.
    pub(crate) inner: Arc<Inner<K, V, S>>,
    /// Sending side of the read-op channel; the receiving side is handed to
    /// `Inner` in `BaseCache::new`.
    read_op_ch: Sender<ReadOp<K, V>>,
    /// Sending side of the write-op channel; the receiving side is handed to
    /// `Inner` in `BaseCache::new`.
    pub(crate) write_op_ch: Sender<WriteOp<K, V>>,
    /// Sender given to `CancelGuard`s created by this handle.
    /// NOTE(review): presumably the guard pushes an unfinished op here when
    /// it is dropped early — confirm in `CancelGuard`.
    pub(crate) interrupted_op_ch_snd: Sender<InterruptedOp<K, V>>,
    /// Receiver drained by `retry_interrupted_ops`.
    pub(crate) interrupted_op_ch_rcv: Receiver<InterruptedOp<K, V>>,
    /// Housekeeper handle. Kept in an `Option` so that `Drop` can take it out.
    pub(crate) housekeeper: Option<HouseKeeperArc>,
}
65
66impl<K, V, S> Clone for BaseCache<K, V, S> {
67 fn clone(&self) -> Self {
72 Self {
73 inner: Arc::clone(&self.inner),
74 read_op_ch: self.read_op_ch.clone(),
75 write_op_ch: self.write_op_ch.clone(),
76 interrupted_op_ch_snd: self.interrupted_op_ch_snd.clone(),
77 interrupted_op_ch_rcv: self.interrupted_op_ch_rcv.clone(),
78 housekeeper: self.housekeeper.clone(),
79 }
80 }
81}
82
impl<K, V, S> Drop for BaseCache<K, V, S> {
    fn drop(&mut self) {
        // Release this handle's housekeeper reference explicitly, so that it
        // goes away before the remaining fields (including `inner`) are
        // dropped after this method returns.
        std::mem::drop(self.housekeeper.take());
    }
}
89
// Thin accessors that forward to the shared `Inner`. None of them needs any
// trait bound on `K`, `V` or `S` beyond what each method states itself.
impl<K, V, S> BaseCache<K, V, S> {
    /// Returns the cache name reported by `Inner`, if any.
    pub(crate) fn name(&self) -> Option<&str> {
        self.inner.name()
    }

    /// Returns the `Policy` built by `Inner`.
    pub(crate) fn policy(&self) -> Policy {
        self.inner.policy()
    }

    /// Returns the value of `Inner`'s entry counter.
    pub(crate) fn entry_count(&self) -> u64 {
        self.inner.entry_count()
    }

    /// Returns the value of `Inner`'s weighted-size counter.
    pub(crate) fn weighted_size(&self) -> u64 {
        self.inner.weighted_size()
    }

    /// Returns `true` when the cache was configured with a max capacity of
    /// exactly zero.
    pub(crate) fn is_map_disabled(&self) -> bool {
        self.inner.max_capacity == Some(0)
    }

    /// Returns `true` when `Inner` holds a removal notifier.
    #[inline]
    pub(crate) fn is_removal_notifier_enabled(&self) -> bool {
        self.inner.is_removal_notifier_enabled()
    }

    /// Returns the current time according to `Inner`'s clock.
    #[inline]
    pub(crate) fn current_time(&self) -> Instant {
        self.inner.current_time()
    }

    /// Returns the event that `schedule_write_op` waits on when the write-op
    /// channel is full.
    #[inline]
    pub(crate) fn write_op_ch_ready_event(&self) -> &event_listener::Event<()> {
        &self.inner.write_op_ch_ready_event
    }

    /// Forwards to `Inner::notify_invalidate` and returns the future it
    /// produces. The future is `'static`, so it does not borrow `self`, `key`
    /// or `entry`.
    pub(crate) fn notify_invalidate(
        &self,
        key: &Arc<K>,
        entry: &MiniArc<ValueEntry<K, V>>,
    ) -> BoxFuture<'static, ()>
    where
        K: Send + Sync + 'static,
        V: Clone + Send + Sync + 'static,
    {
        self.inner.notify_invalidate(key, entry)
    }

    /// Returns debug statistics collected by `Inner`. Only compiled with the
    /// `unstable-debug-counters` feature.
    #[cfg(feature = "unstable-debug-counters")]
    pub async fn debug_stats(&self) -> CacheDebugStats {
        self.inner.debug_stats().await
    }
}
143
impl<K, V, S> BaseCache<K, V, S>
where
    K: Hash + Eq,
    S: BuildHasher,
{
    /// Returns a lock object for `key` when `Inner` has a key-lock map, and
    /// `None` otherwise. The returned `KeyLock` still has to be locked by the
    /// caller.
    pub(crate) fn maybe_key_lock(&self, key: &Arc<K>) -> Option<KeyLock<'_, K, S>> {
        self.inner.maybe_key_lock(key)
    }
}
153
154impl<K, V, S> BaseCache<K, V, S>
155where
156 K: Hash + Eq + Send + Sync + 'static,
157 V: Clone + Send + Sync + 'static,
158 S: BuildHasher + Clone + Send + Sync + 'static,
159{
160 #[allow(clippy::too_many_arguments)]
162 pub(crate) fn new(
163 name: Option<String>,
164 max_capacity: Option<u64>,
165 initial_capacity: Option<usize>,
166 build_hasher: S,
167 weigher: Option<Weigher<K, V>>,
168 eviction_policy: EvictionPolicy,
169 eviction_listener: Option<AsyncEvictionListener<K, V>>,
170 expiration_policy: ExpirationPolicy<K, V>,
171 housekeeper_config: HousekeeperConfig,
172 invalidator_enabled: bool,
173 clock: Clock,
174 ) -> Self {
175 let (r_size, w_size) = if max_capacity == Some(0) {
176 (0, 0)
177 } else {
178 (READ_LOG_CH_SIZE, WRITE_LOG_CH_SIZE)
179 };
180 let is_eviction_listener_enabled = eviction_listener.is_some();
181 let fast_now = clock.fast_now();
182
183 let (r_snd, r_rcv) = crossbeam_channel::bounded(r_size);
184 let (w_snd, w_rcv) = crossbeam_channel::bounded(w_size);
185 let (i_snd, i_rcv) = crossbeam_channel::unbounded();
186
187 let inner = Arc::new(Inner::new(
188 name,
189 max_capacity,
190 initial_capacity,
191 build_hasher,
192 weigher,
193 eviction_policy,
194 eviction_listener,
195 r_rcv,
196 w_rcv,
197 expiration_policy,
198 invalidator_enabled,
199 clock,
200 ));
201
202 Self {
203 inner,
204 read_op_ch: r_snd,
205 write_op_ch: w_snd,
206 interrupted_op_ch_snd: i_snd,
207 interrupted_op_ch_rcv: i_rcv,
208 housekeeper: Some(Arc::new(Housekeeper::new(
209 is_eviction_listener_enabled,
210 housekeeper_config,
211 fast_now,
212 ))),
213 }
214 }
215
    /// Hashes `key` with the cache's hasher by forwarding to `Inner::hash`.
    #[inline]
    pub(crate) fn hash<Q>(&self, key: &Q) -> u64
    where
        Q: Equivalent<K> + Hash + ?Sized,
    {
        self.inner.hash(key)
    }
223
224 pub(crate) fn contains_key_with_hash<Q>(&self, key: &Q, hash: u64) -> bool
225 where
226 Q: Equivalent<K> + Hash + ?Sized,
227 {
228 self.inner
230 .get_key_value_and(key, hash, |k, entry| {
231 let i = &self.inner;
232 let (ttl, tti, va) = (&i.time_to_live(), &i.time_to_idle(), &i.valid_after());
233 let now = self.current_time();
234
235 !is_expired_by_per_entry_ttl(entry.entry_info(), now)
236 && !is_expired_entry_wo(ttl, va, entry, now)
237 && !is_expired_entry_ao(tti, va, entry, now)
238 && !i.is_invalidated_entry(k, entry)
239 })
240 .unwrap_or_default() }
242
    /// Looks up `key` and returns its entry unless the entry is expired,
    /// invalidated, or rejected by `ignore_if`.
    ///
    /// * `ignore_if` - optional predicate over the stored value; when it
    ///   returns `true` the lookup is treated as a miss.
    /// * `need_key` - when `true`, the returned `Entry` carries a clone of the
    ///   key `Arc`.
    /// * `record_read` - when `true`, interrupted ops are retried first and
    ///   the hit or miss is sent to the read-op channel.
    ///
    /// Always returns `None` when the cache is disabled (`is_map_disabled`).
    ///
    /// # Panics
    ///
    /// Panics if sending the read op fails because the channel is
    /// disconnected, or if a per-entry `Expiry` is configured and the entry
    /// has no last-modified time.
    pub(crate) async fn get_with_hash<Q, I>(
        &self,
        key: &Q,
        hash: u64,
        mut ignore_if: Option<&mut I>,
        need_key: bool,
        record_read: bool,
    ) -> Option<Entry<K, V>>
    where
        Q: Equivalent<K> + Hash + ?Sized,
        I: FnMut(&V) -> bool,
    {
        if self.is_map_disabled() {
            return None;
        }

        if record_read {
            self.retry_interrupted_ops().await;
        }

        // The `move` closure below works on its own copy of `now`. It may
        // advance that copy and hands it back in its result tuple; the outer
        // `now` keeps the value sampled here and is used on the miss path.
        let mut now = self.current_time();

        let maybe_kv_and_op = self
            .inner
            .get_key_value_and_then(key, hash, move |k, entry| {
                if let Some(ignore_if) = &mut ignore_if {
                    if ignore_if(&entry.value) {
                        // The caller asked to ignore this value.
                        return None;
                    }
                }

                let i = &self.inner;
                let (ttl, tti, va) = (&i.time_to_live(), &i.time_to_idle(), &i.valid_after());

                if is_expired_by_per_entry_ttl(entry.entry_info(), now)
                    || is_expired_entry_wo(ttl, va, entry, now)
                    || is_expired_entry_ao(tti, va, entry, now)
                    || i.is_invalidated_entry(k, entry)
                {
                    // Expired or invalidated: report a miss.
                    None
                } else {
                    let mut is_expiry_modified = false;

                    if let Some(expiry) = &self.inner.expiration_policy.expiry() {
                        let lm = entry.last_modified().expect("Last modified is not set");
                        // Never use a `now` earlier than the entry's
                        // last-modified time.
                        // NOTE(review): presumably guards against the entry
                        // having been written after `now` was sampled.
                        now = now.max(lm);

                        // From here on `lm` is the `std::time::Instant` form.
                        let lm = self.inner.clock().to_std_instant(lm);

                        // Ask the user-supplied `expire_after_read` whether the
                        // expiration should change.
                        is_expiry_modified = Self::expire_after_read_or_update(
                            |k, v, t, d| expiry.expire_after_read(k, v, t, d, lm),
                            &entry.entry_info().key_hash().key,
                            entry,
                            self.inner.expiration_policy.time_to_live(),
                            self.inner.expiration_policy.time_to_idle(),
                            now,
                            self.inner.clock(),
                        );
                    }

                    entry.set_last_accessed(now);

                    let maybe_key = if need_key { Some(Arc::clone(k)) } else { None };
                    let ent = Entry::new(maybe_key, entry.value.clone(), false, false);
                    let maybe_op = if record_read {
                        Some(ReadOp::Hit {
                            value_entry: MiniArc::clone(entry),
                            is_expiry_modified,
                        })
                    } else {
                        None
                    };

                    Some((ent, maybe_op, now))
                }
            });

        // `now` in the first branch is the (possibly advanced) value returned
        // by the closure; in the `else` branch it is the outer sample.
        if let Some((ent, maybe_op, now)) = maybe_kv_and_op {
            if let Some(op) = maybe_op {
                self.record_read_op(op, now)
                    .await
                    .expect("Failed to record a get op");
            }
            Some(ent)
        } else {
            if record_read {
                self.record_read_op(ReadOp::Miss(hash), now)
                    .await
                    .expect("Failed to record a get op");
            }
            None
        }
    }
365
    /// Returns a clone of the stored key `Arc` for `key`, if the hash table
    /// has an entry for it. No expiration or invalidation check is made here.
    pub(crate) fn get_key_with_hash<Q>(&self, key: &Q, hash: u64) -> Option<Arc<K>>
    where
        Q: Equivalent<K> + Hash + ?Sized,
    {
        self.inner
            .get_key_value_and(key, hash, |k, _entry| Arc::clone(k))
    }
373
    /// Removes the entry for `key` from the hash table by forwarding to
    /// `Inner::remove_entry`, returning the removed key/entry pair if any.
    #[inline]
    pub(crate) fn remove_entry<Q>(&self, key: &Q, hash: u64) -> Option<KvEntry<K, V>>
    where
        Q: Equivalent<K> + Hash + ?Sized,
    {
        self.inner.remove_entry(key, hash)
    }
381
382 #[inline]
383 pub(crate) async fn apply_reads_writes_if_needed(
384 inner: &Arc<Inner<K, V, S>>,
385 ch: &Sender<WriteOp<K, V>>,
386 now: Instant,
387 housekeeper: Option<&HouseKeeperArc>,
388 ) {
389 let w_len = ch.len();
390
391 if let Some(hk) = housekeeper {
392 if Self::should_apply_writes(hk, w_len, now) {
393 hk.try_run_pending_tasks(inner).await;
394 }
395 }
396 }
397
    /// Sets the cache's `valid_after` timestamp to the current time.
    ///
    /// The lookup paths in this file pass `valid_after` to the expiration
    /// checks, so entries are not removed here.
    /// NOTE(review): presumably entries written before this timestamp are then
    /// treated as expired — confirm in `is_expired_entry_wo`/`_ao`.
    pub(crate) fn invalidate_all(&self) {
        let now = self.current_time();
        self.inner.set_valid_after(now);
    }
402
    /// Registers `predicate` with the invalidator, stamped with the current
    /// time, and returns the id assigned to it.
    ///
    /// # Errors
    ///
    /// Returns `PredicateError::InvalidationClosuresDisabled` when the cache
    /// was built without an invalidator; other errors come from the
    /// invalidator itself.
    pub(crate) fn invalidate_entries_if(
        &self,
        predicate: PredicateFun<K, V>,
    ) -> Result<PredicateId, PredicateError> {
        let now = self.current_time();
        self.inner.register_invalidation_predicate(predicate, now)
    }
410}
411
412impl<K, V, S> ScanningGet<K, V> for BaseCache<K, V, S>
416where
417 K: Hash + Eq + Send + Sync + 'static,
418 V: Clone + Send + Sync + 'static,
419 S: BuildHasher + Clone + Send + Sync + 'static,
420{
421 fn num_cht_segments(&self) -> usize {
422 self.inner.num_cht_segments()
423 }
424
425 fn scanning_get(&self, key: &Arc<K>) -> Option<V> {
426 let hash = self.hash(&**key);
427 self.inner.get_key_value_and_then(&**key, hash, |k, entry| {
428 let i = &self.inner;
429 let (ttl, tti, va) = (&i.time_to_live(), &i.time_to_idle(), &i.valid_after());
430 let now = self.current_time();
431
432 if is_expired_by_per_entry_ttl(entry.entry_info(), now)
433 || is_expired_entry_wo(ttl, va, entry, now)
434 || is_expired_entry_ao(tti, va, entry, now)
435 || i.is_invalidated_entry(k, entry)
436 {
437 None
439 } else {
440 Some(entry.value.clone())
442 }
443 })
444 }
445
446 fn keys(&self, cht_segment: usize) -> Option<Vec<Arc<K>>> {
447 self.inner.keys(cht_segment)
448 }
449}
450
451impl<K, V, S> BaseCache<K, V, S>
455where
456 K: Hash + Eq + Send + Sync + 'static,
457 V: Clone + Send + Sync + 'static,
458 S: BuildHasher + Clone + Send + Sync + 'static,
459{
460 #[inline]
461 async fn record_read_op(
462 &self,
463 op: ReadOp<K, V>,
464 now: Instant,
465 ) -> Result<(), TrySendError<ReadOp<K, V>>> {
466 self.apply_reads_if_needed(&self.inner, now).await;
467 let ch = &self.read_op_ch;
468 match ch.try_send(op) {
469 Ok(()) | Err(TrySendError::Full(_)) => Ok(()),
471 Err(e @ TrySendError::Disconnected(_)) => Err(e),
472 }
473 }
474
    /// Inserts or updates the entry for `key` in the hash table and returns
    /// the resulting `WriteOp` together with the timestamp used for it.
    ///
    /// The returned op is not sent to the write-op channel here; that is left
    /// to the caller. On the update path the removal notifier (if enabled) is
    /// awaited before returning.
    #[inline]
    pub(crate) async fn do_insert_with_hash(
        &self,
        key: Arc<K>,
        hash: u64,
        value: V,
    ) -> (WriteOp<K, V>, Instant) {
        self.retry_interrupted_ops().await;

        let weight = self.inner.weigh(&key, &value);
        // Both closures below draw serial numbers from this one shared
        // counter (`op_cnt2` is a second handle to the same atomic).
        let op_cnt1 = Arc::new(AtomicU8::new(0));
        let op_cnt2 = Arc::clone(&op_cnt1);
        let mut op1 = None;
        let mut op2 = None;

        // Take the per-key lock when the cache has a key-lock map; the guard
        // is held until this function returns.
        let kl = self.maybe_key_lock(&key);
        let _klg = if let Some(lock) = &kl {
            Some(lock.lock().await)
        } else {
            None
        };

        let ts = self.current_time();

        // Each closure records the op it built together with its serial
        // number, overwriting any op it recorded earlier.
        // NOTE(review): presumably `insert_with_or_modify` can invoke the
        // closures more than once (hence `value.clone()` and the serial
        // numbers) — confirm in the `cht` module.
        self.inner.cache.insert_with_or_modify(
            Arc::clone(&key),
            hash,
            // Called when no entry exists for the key.
            || {
                let (entry, gen) = self.new_value_entry(&key, hash, value.clone(), ts, weight);
                let ins_op = WriteOp::new_upsert(&key, hash, &entry, gen, 0, weight);
                let cnt = op_cnt1.fetch_add(1, Ordering::Relaxed);
                op1 = Some((cnt, ins_op));
                entry
            },
            // Called when an entry already exists for the key.
            |_k, old_entry| {
                let old_weight = old_entry.policy_weight();

                // Capture the old entry's info before `new_value_entry_from`
                // overwrites the timestamps on the shared `EntryInfo`.
                let old_info = OldEntryInfo::new(old_entry);
                let (entry, gen) = self.new_value_entry_from(value.clone(), ts, weight, old_entry);
                let upd_op = WriteOp::new_upsert(&key, hash, &entry, gen, old_weight, weight);
                let cnt = op_cnt2.fetch_add(1, Ordering::Relaxed);
                op2 = Some((cnt, old_info, upd_op));
                entry
            },
        );

        // When both closures ran, the op with the larger serial number (the
        // one recorded last) wins.
        match (op1, op2) {
            (Some((_cnt, ins_op)), None) => self.do_post_insert_steps(ts, &key, ins_op),
            (Some((cnt1, ins_op)), Some((cnt2, ..))) if cnt1 > cnt2 => {
                self.do_post_insert_steps(ts, &key, ins_op)
            }
            (_, Some((_cnt, old_entry, upd_op))) => {
                self.do_post_update_steps(ts, key, old_entry, upd_op, &self.interrupted_op_ch_snd)
                    .await
            }
            // One of the two closures is expected to have run.
            (None, None) => unreachable!(),
        }
    }
551
552 fn do_post_insert_steps(
553 &self,
554 ts: Instant,
555 key: &Arc<K>,
556 ins_op: WriteOp<K, V>,
557 ) -> (WriteOp<K, V>, Instant) {
558 if let (Some(expiry), WriteOp::Upsert { value_entry, .. }) =
559 (&self.inner.expiration_policy.expiry(), &ins_op)
560 {
561 Self::expire_after_create(expiry, key, value_entry, ts, self.inner.clock());
562 }
563 (ins_op, ts)
564 }
565
    /// Finishes an update of an existing entry.
    ///
    /// 1. When a per-entry `Expiry` is configured and `upd_op` is an upsert,
    ///    lets `expire_after_update` adjust the entry's expiration time.
    /// 2. When the removal notifier is enabled, notifies it about the replaced
    ///    value (described by `old_info`) and waits for that to finish.
    ///
    /// Returns `upd_op` and `ts` unchanged.
    async fn do_post_update_steps(
        &self,
        ts: Instant,
        key: Arc<K>,
        old_info: OldEntryInfo<K, V>,
        upd_op: WriteOp<K, V>,
        interrupted_op_ch: &Sender<InterruptedOp<K, V>>,
    ) -> (WriteOp<K, V>, Instant) {
        // Brings `.shared()` into scope for the notification future below.
        use futures_util::FutureExt;

        if let (Some(expiry), WriteOp::Upsert { value_entry, .. }) =
            (&self.inner.expiration_policy.expiry(), &upd_op)
        {
            Self::expire_after_read_or_update(
                |k, v, t, d| expiry.expire_after_update(k, v, t, d),
                &key,
                value_entry,
                self.inner.expiration_policy.time_to_live(),
                self.inner.expiration_policy.time_to_idle(),
                ts,
                self.inner.clock(),
            );
        }

        if self.is_removal_notifier_enabled() {
            // The future is made shareable so that a clone can be handed to
            // the cancel guard while this task awaits the original.
            let future = self
                .inner
                .notify_upsert(
                    key,
                    &old_info.entry,
                    old_info.last_accessed,
                    old_info.last_modified,
                )
                .shared();
            // The guard holds the future and the op while the notification is
            // in flight.
            // NOTE(review): presumably, if this task is cancelled at the
            // `await` below, the guard's drop sends them to
            // `interrupted_op_ch` for `retry_interrupted_ops` — confirm in
            // `CancelGuard`.
            let mut cancel_guard = CancelGuard::new(interrupted_op_ch, ts);
            cancel_guard.set_future_and_op(future.clone(), upd_op.clone());

            future.await;
            // The notification completed, so the guard is disarmed.
            cancel_guard.clear();
        }

        // NOTE(review): presumably nudges crossbeam-epoch to reclaim the
        // replaced entry sooner — confirm.
        crossbeam_epoch::pin().flush();
        (upd_op, ts)
    }
617
    /// Sends `op` to the write-op channel `ch`, retrying until it is accepted.
    ///
    /// Each attempt first lets the housekeeper run pending tasks if it wants
    /// to. When the channel is full, the call spins briefly for the first
    /// four attempts in a row and on the fifth waits for `ch_ready_event`
    /// before trying again.
    ///
    /// `_should_block` is only honoured in test builds, where `true` makes
    /// the call hang forever.
    ///
    /// # Errors
    ///
    /// Returns the `Disconnected` error when the receiving side is gone.
    #[inline]
    pub(crate) async fn schedule_write_op(
        inner: &Arc<Inner<K, V, S>>,
        ch: &Sender<WriteOp<K, V>>,
        ch_ready_event: &event_listener::Event<()>,
        op: WriteOp<K, V>,
        ts: Instant,
        housekeeper: Option<&HouseKeeperArc>,
        _should_block: bool,
    ) -> Result<(), TrySendError<WriteOp<K, V>>> {
        #[cfg(test)]
        if _should_block {
            // Test-only: lock the same mutex twice. The second lock cannot be
            // acquired while `_guard` is alive, so this never completes.
            let mutex = Mutex::new(());
            let _guard = mutex.lock().await;
            mutex.lock().await;
        }

        let mut op = op;
        let mut spin_loop_attempts = 0u8;
        loop {
            // Give the housekeeper a chance to drain the channels first.
            BaseCache::<K, V, S>::apply_reads_writes_if_needed(inner, ch, ts, housekeeper).await;

            match ch.try_send(op) {
                Ok(()) => return Ok(()),
                // The channel hands the op back when it is full; keep it for
                // the next attempt.
                Err(TrySendError::Full(op1)) => {
                    op = op1;
                }
                Err(e @ TrySendError::Disconnected(_)) => return Err(e),
            }

            // The channel was full: back off before retrying.
            if spin_loop_attempts < 4 {
                spin_loop_attempts += 1;
                // Busy-wait briefly; fewer iterations are used on x86_64.
                // NOTE(review): presumably because the spin hint is costlier
                // on x86_64 — confirm.
                const SPIN_COUNT: usize = if cfg!(target_arch = "x86_64") { 8 } else { 32 };
                for _ in 0..SPIN_COUNT {
                    std::hint::spin_loop();
                }
            } else {
                spin_loop_attempts = 0;

                // Suspend this task until the ready event is notified.
                ch_ready_event.listen().await;

            }
        }
    }
686
    /// Drains the interrupted-op channel and completes every operation found
    /// there: an unfinished eviction-listener future is awaited first, and the
    /// associated write op is then scheduled on the write-op channel.
    ///
    /// A fresh `CancelGuard` wraps each resumed operation.
    /// NOTE(review): presumably so the operation is re-queued if this task is
    /// cancelled as well — confirm in `CancelGuard`.
    ///
    /// # Panics
    ///
    /// Panics if the write op cannot be scheduled (channel disconnected).
    pub(crate) async fn retry_interrupted_ops(&self) {
        while let Ok(op) = self.interrupted_op_ch_rcv.try_recv() {
            // Assigned in every arm of the `match` below.
            let mut cancel_guard;

            match op {
                InterruptedOp::CallEvictionListener { ts, future, op } => {
                    cancel_guard = CancelGuard::new(&self.interrupted_op_ch_snd, ts);
                    cancel_guard.set_future_and_op(future.clone(), op);
                    // Resume the interrupted listener notification.
                    future.await;
                    // The future is done; only the write op remains guarded.
                    cancel_guard.unset_future();
                }
                InterruptedOp::SendWriteOp { ts, op } => {
                    cancel_guard = CancelGuard::new(&self.interrupted_op_ch_snd, ts);
                    cancel_guard.set_op(op);
                }
            }

            // Schedule a clone of the op; the guard keeps its own copy until
            // scheduling has succeeded.
            let ts = cancel_guard.ts;
            let event = self.write_op_ch_ready_event();
            let op = cancel_guard.op.as_ref().cloned().unwrap();
            let hk = self.housekeeper.as_ref();
            Self::schedule_write_op(&self.inner, &self.write_op_ch, event, op, ts, hk, false)
                .await
                .expect("Failed to reschedule a write op");

            // The op has been queued, so the guard is disarmed.
            cancel_guard.clear();
        }
    }
727
728 #[inline]
729 async fn apply_reads_if_needed(&self, inner: &Arc<Inner<K, V, S>>, now: Instant) {
730 let len = self.read_op_ch.len();
731
732 if let Some(hk) = &self.housekeeper {
733 if Self::should_apply_reads(hk, len, now) {
734 hk.try_run_pending_tasks(inner).await;
735 }
736 }
737 }
738
    /// Forwards to `Housekeeper::should_apply_reads` with the read-op channel
    /// length `ch_len` and the current time `now`.
    #[inline]
    fn should_apply_reads(hk: &HouseKeeperArc, ch_len: usize, now: Instant) -> bool {
        hk.should_apply_reads(ch_len, now)
    }
743
    /// Forwards to `Housekeeper::should_apply_writes` with the write-op
    /// channel length `ch_len` and the current time `now`.
    #[inline]
    fn should_apply_writes(hk: &HouseKeeperArc, ch_len: usize, now: Instant) -> bool {
        hk.should_apply_writes(ch_len, now)
    }
748}
749
750impl<K, V, S> BaseCache<K, V, S> {
751 #[inline]
752 fn new_value_entry(
753 &self,
754 key: &Arc<K>,
755 hash: u64,
756 value: V,
757 timestamp: Instant,
758 policy_weight: u32,
759 ) -> (MiniArc<ValueEntry<K, V>>, u16) {
760 let key_hash = KeyHash::new(Arc::clone(key), hash);
761 let info = MiniArc::new(EntryInfo::new(key_hash, timestamp, policy_weight));
762 let gen: u16 = info.entry_gen();
763 (MiniArc::new(ValueEntry::new(value, info)), gen)
764 }
765
    /// Builds a replacement `ValueEntry` holding `value`, reusing the
    /// `EntryInfo` of the existing entry `other`.
    ///
    /// The reused `EntryInfo` is modified in place: its generation is
    /// incremented and its last-accessed time, last-modified time and policy
    /// weight are overwritten. Callers that need the old timestamps must read
    /// them before calling this.
    ///
    /// Returns the new entry together with the value returned by
    /// `incr_entry_gen`.
    #[inline]
    fn new_value_entry_from(
        &self,
        value: V,
        timestamp: Instant,
        policy_weight: u32,
        other: &ValueEntry<K, V>,
    ) -> (MiniArc<ValueEntry<K, V>>, u16) {
        // NOTE(review): presumably a shared-pointer clone, so `other` keeps
        // pointing at the same `EntryInfo` — confirm in `MiniArc`.
        let info = MiniArc::clone(other.entry_info());
        // NOTE(review): presumably the generation bump lets maintenance code
        // recognise ops that refer to the previous version — confirm.
        let gen = info.incr_entry_gen();
        info.set_last_accessed(timestamp);
        info.set_last_modified(timestamp);
        info.set_policy_weight(policy_weight);
        (MiniArc::new(ValueEntry::new_from(value, info, other)), gen)
    }
783
784 fn expire_after_create(
785 expiry: &Arc<dyn Expiry<K, V> + Send + Sync + 'static>,
786 key: &K,
787 value_entry: &ValueEntry<K, V>,
788 ts: Instant,
789 clock: &Clock,
790 ) {
791 let duration =
792 expiry.expire_after_create(key, &value_entry.value, clock.to_std_instant(ts));
793 let expiration_time = duration.map(|duration| ts.saturating_add(duration));
794 value_entry
795 .entry_info()
796 .set_expiration_time(expiration_time);
797 }
798
799 fn expire_after_read_or_update(
800 expiry: impl FnOnce(&K, &V, StdInstant, Option<Duration>) -> Option<Duration>,
801 key: &K,
802 value_entry: &ValueEntry<K, V>,
803 ttl: Option<Duration>,
804 tti: Option<Duration>,
805 ts: Instant,
806 clock: &Clock,
807 ) -> bool {
808 let current_time = clock.to_std_instant(ts);
809 let ei = &value_entry.entry_info();
810
811 let exp_time = IntoIterator::into_iter([
812 ei.expiration_time(),
813 ttl.and_then(|dur| ei.last_modified().map(|ts| ts.saturating_add(dur))),
814 tti.and_then(|dur| ei.last_accessed().map(|ts| ts.saturating_add(dur))),
815 ])
816 .flatten()
817 .min();
818
819 let current_duration = exp_time.and_then(|time| {
820 let std_time = clock.to_std_instant(time);
821 std_time.checked_duration_since(current_time)
822 });
823
824 let duration = expiry(key, &value_entry.value, current_time, current_duration);
825
826 if duration != current_duration {
827 let expiration_time = duration.map(|duration| ts.saturating_add(duration));
828 value_entry
829 .entry_info()
830 .set_expiration_time(expiration_time);
831 true
833 } else {
834 false
835 }
836 }
837}
838
// Helpers that are only compiled into test builds.
#[cfg(test)]
impl<K, V, S> BaseCache<K, V, S>
where
    K: Hash + Eq + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
    S: BuildHasher + Clone + Send + Sync + 'static,
{
    /// Returns the invalidation-predicate count reported by `Inner`.
    pub(crate) fn invalidation_predicate_count(&self) -> usize {
        self.inner.invalidation_predicate_count()
    }

    /// Prepares the cache for tests: calls
    /// `Inner::enable_frequency_sketch_for_testing` and, when a housekeeper is
    /// present, its `disable_auto_run`.
    pub(crate) async fn reconfigure_for_testing(&mut self) {
        self.inner.enable_frequency_sketch_for_testing().await;
        if let Some(hk) = &self.housekeeper {
            hk.disable_auto_run();
        }
    }

    /// Returns whether `Inner` reports its key-lock map as empty.
    pub(crate) fn key_locks_map_is_empty(&self) -> bool {
        self.inner.key_locks_map_is_empty()
    }
}
866
/// Mutable state threaded through one run of `Inner::do_run_pending_tasks`.
struct EvictionState<'a, K, V> {
    /// Running entry-count, weighted-size and eviction totals.
    counters: EvictionCounters,
    /// The cache's removal notifier, if one is configured.
    notifier: Option<&'a Arc<RemovalNotifier<K, V>>>,
    /// Reset to `false` on each pass of the maintenance loop, which later
    /// reads it when deciding whether to run another pass.
    more_entries_to_evict: bool,
}
872
873impl<'a, K, V> EvictionState<'a, K, V> {
874 fn new(
875 entry_count: u64,
876 weighted_size: u64,
877 notifier: Option<&'a Arc<RemovalNotifier<K, V>>>,
878 ) -> Self {
879 Self {
880 counters: EvictionCounters::new(entry_count, weighted_size),
881 notifier,
882 more_entries_to_evict: false,
883 }
884 }
885
886 fn is_notifier_enabled(&self) -> bool {
887 self.notifier.is_some()
888 }
889
890 async fn notify_entry_removal(
891 &mut self,
892 key: Arc<K>,
893 entry: &MiniArc<ValueEntry<K, V>>,
894 cause: RemovalCause,
895 ) where
896 K: Send + Sync + 'static,
897 V: Clone + Send + Sync + 'static,
898 {
899 if let Some(notifier) = self.notifier {
900 notifier.notify(key, entry.value.clone(), cause).await;
901 } else {
902 panic!("notify_entry_removal is called when the notification is disabled");
903 }
904 }
905}
906
/// Running totals kept while entries are added to and evicted from the cache.
///
/// All arithmetic saturates: the counters clamp at `0` and `u64::MAX` instead
/// of panicking (debug builds) or wrapping (release builds).
struct EvictionCounters {
    /// Number of entries.
    entry_count: u64,
    /// Sum of the policy weights of the entries.
    weighted_size: u64,
    /// Number of evictions recorded via `incr_eviction_count`.
    eviction_count: u64,
}

impl EvictionCounters {
    /// Creates counters seeded with the given totals and zero evictions.
    #[inline]
    fn new(entry_count: u64, weighted_size: u64) -> Self {
        Self {
            entry_count,
            weighted_size,
            eviction_count: 0,
        }
    }

    /// Adds `entry_count` entries and `weight` to the totals, clamping each
    /// at `u64::MAX`.
    #[inline]
    fn saturating_add(&mut self, entry_count: u64, weight: u32) {
        // `entry_count` used to be updated with a plain `+=`, which did not
        // saturate as the method name promises.
        self.entry_count = self.entry_count.saturating_add(entry_count);
        self.weighted_size = self.weighted_size.saturating_add(u64::from(weight));
    }

    /// Subtracts `entry_count` entries and `weight` from the totals, clamping
    /// each at zero.
    #[inline]
    fn saturating_sub(&mut self, entry_count: u64, weight: u32) {
        // `entry_count` used to be updated with a plain `-=`, which
        // underflowed instead of clamping at zero.
        self.entry_count = self.entry_count.saturating_sub(entry_count);
        self.weighted_size = self.weighted_size.saturating_sub(u64::from(weight));
    }

    /// Records one more eviction, clamping at `u64::MAX`.
    #[inline]
    fn incr_eviction_count(&mut self) {
        self.eviction_count = self.eviction_count.saturating_add(1);
    }
}
943
944#[derive(Default)]
945struct EntrySizeAndFrequency {
946 policy_weight: u64,
947 freq: u32,
948}
949
950impl EntrySizeAndFrequency {
951 fn new(policy_weight: u32) -> Self {
952 Self {
953 policy_weight: policy_weight as u64,
954 ..Default::default()
955 }
956 }
957
958 fn add_policy_weight(&mut self, weight: u32) {
959 self.policy_weight += weight as u64;
960 }
961
962 fn add_frequency(&mut self, freq: &FrequencySketch, hash: u64) {
963 self.freq += freq.frequency(hash) as u32;
964 }
965}
966
/// Outcome of an admission decision.
// The size difference between the two variants is accepted on purpose: the
// inline `SmallVec` makes `Admitted` much larger than `Rejected`.
#[allow(clippy::large_enum_variant)]
enum AdmissionResult<K> {
    /// Admission succeeded.
    Admitted {
        /// Key/hash pairs of the victims, each with an optional timestamp.
        /// NOTE(review): presumably the victim's last-modified or
        /// last-accessed time — confirm at the construction site.
        victim_keys: SmallVec<[(KeyHash<K>, Option<Instant>); 8]>,
    },
    /// Admission was refused.
    Rejected,
}
982
/// The concurrent hash table backing the cache: key `Arc`s mapped to
/// reference-counted value entries.
type CacheStore<K, V, S> = crate::cht::SegmentedHashMap<Arc<K>, MiniArc<ValueEntry<K, V>>, S>;
984
/// State shared by every `BaseCache` handle of one cache.
pub(crate) struct Inner<K, V, S> {
    /// Optional cache name; also passed to the removal notifier.
    name: Option<String>,
    /// Configured max capacity. `Some(0)` disables the cache.
    max_capacity: Option<u64>,
    /// Entry count, readable without taking a lock.
    entry_count: AtomicCell<u64>,
    /// Weighted size, readable without taking a lock.
    weighted_size: AtomicCell<u64>,
    /// The concurrent hash table holding the entries.
    pub(crate) cache: CacheStore<K, V, S>,
    /// Hasher factory used by `hash`.
    build_hasher: S,
    /// Entry deques, locked by `do_run_pending_tasks`.
    deques: Mutex<Deques<K>>,
    /// Timer wheel, locked by `do_run_pending_tasks`.
    timer_wheel: Mutex<TimerWheel<K>>,
    /// Frequency sketch; starts out as `FrequencySketch::default()`.
    frequency_sketch: RwLock<FrequencySketch>,
    /// Flag for the frequency sketch; starts out `false`.
    frequency_sketch_enabled: AtomicBool,
    /// Receiving side of the read-op channel.
    read_op_ch: Receiver<ReadOp<K, V>>,
    /// Receiving side of the write-op channel.
    write_op_ch: Receiver<WriteOp<K, V>>,
    /// Event that `schedule_write_op` waits on when the write-op channel is
    /// full, and that `do_run_pending_tasks` notifies.
    pub(crate) write_op_ch_ready_event: event_listener::Event,
    /// Eviction policy configuration.
    eviction_policy: EvictionPolicyConfig,
    /// Cache-wide time-to-live / time-to-idle and the optional per-entry
    /// `Expiry`.
    expiration_policy: ExpirationPolicy<K, V>,
    /// Timestamp set by `set_valid_after` (see `BaseCache::invalidate_all`).
    valid_after: AtomicInstant,
    /// Optional weigher; entries weigh `1` without it.
    weigher: Option<Weigher<K, V>>,
    /// Present only when an eviction listener was supplied.
    removal_notifier: Option<Arc<RemovalNotifier<K, V>>>,
    /// Per-key locks; present only when an eviction listener was supplied.
    key_locks: Option<KeyLockMap<K, S>>,
    /// Present only when invalidation predicates were enabled.
    invalidator: Option<Invalidator<K, V, S>>,
    /// Time source for `current_time`.
    clock: Clock,
}
1008
impl<K, V, S> Drop for Inner<K, V, S> {
    fn drop(&mut self) {
        // Pin and flush crossbeam-epoch repeatedly.
        // NOTE(review): presumably this pushes pending deferred destructors
        // through the collector so previously cached values are freed before
        // the cache goes away; the reason for 128 iterations is not visible
        // here — confirm.
        for _ in 0..128 {
            crossbeam_epoch::pin().flush();
        }

    }
}
1023
// Accessors that need no trait bounds on `K`, `V` or `S` (apart from
// `maybe_key_lock`, which states its own).
impl<K, V, S> Inner<K, V, S> {
    /// Returns the cache name, if one was configured.
    fn name(&self) -> Option<&str> {
        self.name.as_deref()
    }

    /// Builds a `Policy` from the max capacity and the cache-wide
    /// time-to-live / time-to-idle.
    fn policy(&self) -> Policy {
        let exp = &self.expiration_policy;
        // NOTE(review): the literal `1` is presumably the segment count —
        // confirm against `Policy::new`.
        Policy::new(self.max_capacity, 1, exp.time_to_live(), exp.time_to_idle())
    }

    /// Returns the current value of the entry counter.
    #[inline]
    fn entry_count(&self) -> u64 {
        self.entry_count.load()
    }

    /// Returns the current value of the weighted-size counter.
    #[inline]
    fn weighted_size(&self) -> u64 {
        self.weighted_size.load()
    }

    /// Returns `true` when a removal notifier is present.
    #[inline]
    pub(crate) fn is_removal_notifier_enabled(&self) -> bool {
        self.removal_notifier.is_some()
    }

    /// Collects debug statistics: the two counters, twice the hash table's
    /// reported capacity, and the frequency sketch's table size. Only
    /// compiled with the `unstable-debug-counters` feature.
    #[cfg(feature = "unstable-debug-counters")]
    pub async fn debug_stats(&self) -> CacheDebugStats {
        let ec = self.entry_count.load();
        let ws = self.weighted_size.load();

        CacheDebugStats::new(
            ec,
            ws,
            // NOTE(review): reason for the factor of two is not visible
            // here — confirm.
            (self.cache.capacity() * 2) as u64,
            self.frequency_sketch.read().await.table_size(),
        )
    }

    /// Returns a lock object for `key` when the key-lock map exists, and
    /// `None` otherwise.
    pub(crate) fn maybe_key_lock(&self, key: &Arc<K>) -> Option<KeyLock<'_, K, S>>
    where
        K: Hash + Eq,
        S: BuildHasher,
    {
        self.key_locks.as_ref().map(|kls| kls.key_lock(key))
    }

    /// Returns the current time according to the cache's clock.
    #[inline]
    pub(crate) fn current_time(&self) -> Instant {
        self.clock.now()
    }

    /// Returns the cache's clock.
    fn clock(&self) -> &Clock {
        &self.clock
    }

    /// Returns the hash table's actual number of segments.
    fn num_cht_segments(&self) -> usize {
        self.cache.actual_num_segments()
    }

    /// Returns the cache-wide time-to-live, if configured.
    #[inline]
    fn time_to_live(&self) -> Option<Duration> {
        self.expiration_policy.time_to_live()
    }

    /// Returns the cache-wide time-to-idle, if configured.
    #[inline]
    fn time_to_idle(&self) -> Option<Duration> {
        self.expiration_policy.time_to_idle()
    }

    /// Returns `true` when a cache-wide time-to-live or time-to-idle is
    /// configured. The per-entry `Expiry` is not considered here.
    #[inline]
    fn has_expiry(&self) -> bool {
        let exp = &self.expiration_policy;
        exp.time_to_live().is_some() || exp.time_to_idle().is_some()
    }

    /// Returns `true` when a time-to-live is configured or an invalidator is
    /// present.
    #[inline]
    fn is_write_order_queue_enabled(&self) -> bool {
        self.expiration_policy.time_to_live().is_some() || self.invalidator.is_some()
    }

    /// Returns the `valid_after` timestamp, if one has been set.
    #[inline]
    fn valid_after(&self) -> Option<Instant> {
        self.valid_after.instant()
    }

    /// Stores `timestamp` as the `valid_after` timestamp.
    #[inline]
    fn set_valid_after(&self, timestamp: Instant) {
        self.valid_after.set_instant(timestamp);
    }

    /// Returns `true` when a `valid_after` timestamp has been set.
    #[inline]
    fn has_valid_after(&self) -> bool {
        self.valid_after.is_set()
    }
}
1123
1124impl<K, V, S> Inner<K, V, S>
1125where
1126 K: Hash + Eq + Send + Sync + 'static,
1127 V: Send + Sync + 'static,
1128 S: BuildHasher + Send + Sync + Clone + 'static,
1129{
1130 #[allow(clippy::too_many_arguments)]
1133 fn new(
1134 name: Option<String>,
1135 max_capacity: Option<u64>,
1136 initial_capacity: Option<usize>,
1137 build_hasher: S,
1138 weigher: Option<Weigher<K, V>>,
1139 eviction_policy: EvictionPolicy,
1140 eviction_listener: Option<AsyncEvictionListener<K, V>>,
1141 read_op_ch: Receiver<ReadOp<K, V>>,
1142 write_op_ch: Receiver<WriteOp<K, V>>,
1143 expiration_policy: ExpirationPolicy<K, V>,
1144 invalidator_enabled: bool,
1145 clock: Clock,
1146 ) -> Self {
1147 let (num_segments, initial_capacity) = if max_capacity == Some(0) {
1150 (1, 0)
1151 } else {
1152 let ic = initial_capacity
1153 .map(|cap| cap + WRITE_LOG_CH_SIZE)
1154 .unwrap_or_default();
1155 (64, ic)
1156 };
1157 let cache = crate::cht::SegmentedHashMap::with_num_segments_capacity_and_hasher(
1158 num_segments,
1159 initial_capacity,
1160 build_hasher.clone(),
1161 );
1162
1163 let now = clock.now();
1164 let timer_wheel = Mutex::new(TimerWheel::new(now));
1165
1166 let (removal_notifier, key_locks) = if let Some(listener) = eviction_listener {
1167 let rn = Arc::new(RemovalNotifier::new(listener, name.clone()));
1168 let kl = KeyLockMap::with_hasher(build_hasher.clone());
1169 (Some(rn), Some(kl))
1170 } else {
1171 (None, None)
1172 };
1173 let invalidator = if invalidator_enabled {
1174 Some(Invalidator::new(build_hasher.clone()))
1175 } else {
1176 None
1177 };
1178
1179 Self {
1180 name,
1181 max_capacity,
1182 entry_count: AtomicCell::default(),
1183 weighted_size: AtomicCell::default(),
1184 cache,
1185 build_hasher,
1186 deques: Mutex::default(),
1187 timer_wheel,
1188 frequency_sketch: RwLock::new(FrequencySketch::default()),
1189 frequency_sketch_enabled: AtomicBool::default(),
1190 read_op_ch,
1191 write_op_ch,
1192 write_op_ch_ready_event: event_listener::Event::default(),
1193 eviction_policy: eviction_policy.config,
1194 expiration_policy,
1195 valid_after: AtomicInstant::default(),
1196 weigher,
1197 removal_notifier,
1198 key_locks,
1199 invalidator,
1200 clock,
1201 }
1202 }
1203
    /// Hashes `key` with a fresh hasher from the cache's `BuildHasher` and
    /// returns the resulting 64-bit hash.
    #[inline]
    fn hash<Q>(&self, key: &Q) -> u64
    where
        Q: Equivalent<K> + Hash + ?Sized,
    {
        let mut hasher = self.build_hasher.build_hasher();
        key.hash(&mut hasher);
        hasher.finish()
    }
1213
    /// Looks up an entry by `hash`, matching stored keys against `key` with
    /// `Equivalent::equivalent`, and returns the result of applying
    /// `with_entry` to the stored key and entry.
    ///
    /// NOTE(review): presumably `None` when no stored key matches — confirm in
    /// the `cht` module.
    #[inline]
    fn get_key_value_and<Q, F, T>(&self, key: &Q, hash: u64, with_entry: F) -> Option<T>
    where
        Q: Equivalent<K> + Hash + ?Sized,
        F: FnOnce(&Arc<K>, &MiniArc<ValueEntry<K, V>>) -> T,
    {
        self.cache
            .get_key_value_and(hash, |k| key.equivalent(k as &K), with_entry)
    }
1223
    /// Like `get_key_value_and`, but `with_entry` itself returns an `Option`,
    /// which lets the callback turn a found entry into a miss.
    #[inline]
    fn get_key_value_and_then<Q, F, T>(&self, key: &Q, hash: u64, with_entry: F) -> Option<T>
    where
        Q: Equivalent<K> + Hash + ?Sized,
        F: FnOnce(&Arc<K>, &MiniArc<ValueEntry<K, V>>) -> Option<T>,
    {
        self.cache
            .get_key_value_and_then(hash, |k| key.equivalent(k as &K), with_entry)
    }
1233
    /// Removes the entry whose stored key is equivalent to `key` from the hash
    /// table and returns it wrapped in a `KvEntry`, if there was one.
    #[inline]
    fn remove_entry<Q>(&self, key: &Q, hash: u64) -> Option<KvEntry<K, V>>
    where
        Q: Equivalent<K> + Hash + ?Sized,
    {
        self.cache
            .remove_entry(hash, |k| key.equivalent(k as &K))
            .map(|(key, entry)| KvEntry::new(key, entry))
    }
1243
    /// Returns clones of the key `Arc`s stored in hash-table segment
    /// `cht_segment`.
    ///
    /// NOTE(review): presumably `None` when the segment index is out of
    /// range — confirm in the `cht` module.
    fn keys(&self, cht_segment: usize) -> Option<Vec<Arc<K>>> {
        self.cache.keys(cht_segment, Arc::clone)
    }
1252
1253 #[inline]
1254 fn register_invalidation_predicate(
1255 &self,
1256 predicate: PredicateFun<K, V>,
1257 registered_at: Instant,
1258 ) -> Result<PredicateId, PredicateError> {
1259 if let Some(inv) = &self.invalidator {
1260 inv.register_predicate(predicate, registered_at)
1261 } else {
1262 Err(PredicateError::InvalidationClosuresDisabled)
1263 }
1264 }
1265
1266 #[inline]
1268 fn is_invalidated_entry(&self, key: &Arc<K>, entry: &MiniArc<ValueEntry<K, V>>) -> bool
1269 where
1270 V: Clone,
1271 {
1272 if let Some(inv) = &self.invalidator {
1273 return inv.apply_predicates(key, entry);
1274 }
1275 false
1276 }
1277
1278 #[inline]
1279 fn weigh(&self, key: &K, value: &V) -> u32 {
1280 self.weigher.as_ref().map_or(1, |w| w(key, value))
1281 }
1282}
1283
1284impl<K, V, S> Inner<K, V, S>
1285where
1286 K: Hash + Eq + Send + Sync + 'static,
1287 V: Clone + Send + Sync + 'static,
1288 S: BuildHasher + Clone + Send + Sync + 'static,
1289{
    /// Applies the pending read/write operation logs and evicts entries,
    /// looping until there is nothing more to do or the optional `timeout`
    /// has elapsed.
    ///
    /// - `timeout`: when `Some`, the loop is left once the elapsed time since
    ///   the start reaches it (checked at the end of each iteration).
    /// - `max_log_sync_repeats`: the logs are re-processed only while the
    ///   number of log-processing passes is `<= max_log_sync_repeats`.
    /// - `eviction_batch_size`: forwarded to the deque-based expiration,
    ///   invalidation and LRU eviction helpers.
    ///
    /// Returns `false` immediately when `max_capacity` is `Some(0)`; otherwise
    /// returns the final `more_entries_to_evict` flag of the eviction state.
    pub(crate) async fn do_run_pending_tasks(
        &self,
        timeout: Option<Duration>,
        max_log_sync_repeats: u32,
        eviction_batch_size: u32,
    ) -> bool {
        if self.max_capacity == Some(0) {
            return false;
        }

        // Acquire the locks: the deques first, then the timer wheel.
        let mut deqs = self.deques.lock().await;
        let mut timer_wheel = self.timer_wheel.lock().await;

        // The start time is only needed when a timeout was given.
        let started_at = if timeout.is_some() {
            Some(self.current_time())
        } else {
            None
        };
        let mut should_process_logs = true;
        let mut calls = 0u32;
        // Snapshot the shared counters; the eviction state works on its own copy
        // and the results are written back after the loop.
        let current_ec = self.entry_count.load();
        let current_ws = self.weighted_size.load();
        let mut eviction_state =
            EvictionState::new(current_ec, current_ws, self.removal_notifier.as_ref());

        loop {
            if should_process_logs {
                // Apply as many ops as were queued at the time of the check.
                let r_len = self.read_op_ch.len();
                if r_len > 0 {
                    self.apply_reads(&mut deqs, &mut timer_wheel, r_len).await;
                }

                let w_len = self.write_op_ch.len();
                if w_len > 0 {
                    self.apply_writes(&mut deqs, &mut timer_wheel, w_len, &mut eviction_state)
                        .await;
                }

                if self.eviction_policy == EvictionPolicyConfig::TinyLfu
                    && self.should_enable_frequency_sketch(&eviction_state.counters)
                {
                    self.enable_frequency_sketch(&eviction_state.counters).await;
                }

                // Wake listeners of the "write op channel ready" event, but no
                // more than the number of free slots in the write op channel.
                let listeners = self.write_op_ch_ready_event.total_listeners();
                if listeners > 0 {
                    let n = listeners.min(WRITE_LOG_CH_SIZE - self.write_op_ch.len());
                    self.write_op_ch_ready_event.notify(n);
                }

                calls += 1;
            }

            // Reset the flag; the eviction/invalidation helpers below may set it
            // back to `true` when they have more work for the next iteration.
            eviction_state.more_entries_to_evict = false;
            let last_eviction_count = eviction_state.counters.eviction_count;

            // Expire entries tracked by the timer wheel.
            if timer_wheel.is_enabled() {
                self.evict_expired_entries_using_timers(
                    &mut timer_wheel,
                    &mut deqs,
                    &mut eviction_state,
                )
                .await;
            }

            // Expire entries using the write-order / access-order deques.
            if self.has_expiry() || self.has_valid_after() {
                self.evict_expired_entries_using_deqs(
                    &mut deqs,
                    &mut timer_wheel,
                    eviction_batch_size,
                    &mut eviction_state,
                )
                .await;
            }

            // Apply registered invalidation predicates, if any.
            if let Some(invalidator) = &self.invalidator {
                if !invalidator.is_empty() {
                    self.invalidate_entries(
                        invalidator,
                        &mut deqs,
                        &mut timer_wheel,
                        eviction_batch_size,
                        &mut eviction_state,
                    )
                    .await;
                }
            }

            // Evict when the weighted size exceeds the max capacity.
            let weights_to_evict = self.weights_to_evict(&eviction_state.counters);
            if weights_to_evict > 0 {
                self.evict_lru_entries(
                    &mut deqs,
                    &mut timer_wheel,
                    eviction_batch_size,
                    weights_to_evict,
                    &mut eviction_state,
                )
                .await;
            }

            // Decide whether another iteration is needed.
            should_process_logs = calls <= max_log_sync_repeats
                && (self.read_op_ch.len() >= READ_LOG_FLUSH_POINT
                    || self.write_op_ch.len() >= WRITE_LOG_FLUSH_POINT);

            // Only keep evicting when this iteration actually evicted something.
            let should_evict_more_entries = eviction_state.more_entries_to_evict
                && (eviction_state.counters.eviction_count - last_eviction_count) > 0;

            if !should_process_logs && !should_evict_more_entries {
                break;
            }

            // Leave the loop when the timeout has been reached.
            if let (Some(to), Some(started)) = (timeout, started_at) {
                let elapsed = self.current_time().saturating_duration_since(started);
                if elapsed >= to {
                    break;
                }
            }
        }

        // The shared counters must not have changed while this method ran;
        // publish the updated values.
        debug_assert_eq!(self.entry_count.load(), current_ec);
        debug_assert_eq!(self.weighted_size.load(), current_ws);
        self.entry_count.store(eviction_state.counters.entry_count);
        self.weighted_size
            .store(eviction_state.counters.weighted_size);

        crossbeam_epoch::pin().flush();

        // Keep the deques lock held until this point.
        drop(deqs);

        eviction_state.more_entries_to_evict
    }
1444}
1445
1446impl<K, V, S> Inner<K, V, S>
1450where
1451 K: Hash + Eq + Send + Sync + 'static,
1452 V: Send + Sync + 'static,
1453 S: BuildHasher + Clone + Send + Sync + 'static,
1454{
1455 fn has_enough_capacity(&self, candidate_weight: u32, counters: &EvictionCounters) -> bool {
1456 self.max_capacity.map_or(true, |limit| {
1457 counters.weighted_size + candidate_weight as u64 <= limit
1458 })
1459 }
1460
1461 fn weights_to_evict(&self, counters: &EvictionCounters) -> u64 {
1462 self.max_capacity
1463 .map(|limit| counters.weighted_size.saturating_sub(limit))
1464 .unwrap_or_default()
1465 }
1466
1467 #[inline]
1468 fn should_enable_frequency_sketch(&self, counters: &EvictionCounters) -> bool {
1469 match self.max_capacity {
1470 None | Some(0) => false,
1471 Some(max_cap) => {
1472 if self.frequency_sketch_enabled.load(Ordering::Acquire) {
1473 false } else {
1475 counters.weighted_size >= max_cap / 2
1476 }
1477 }
1478 }
1479 }
1480
1481 #[inline]
1482 async fn enable_frequency_sketch(&self, counters: &EvictionCounters) {
1483 if let Some(max_cap) = self.max_capacity {
1484 let c = counters;
1485 let cap = if self.weigher.is_none() {
1486 max_cap
1487 } else {
1488 (c.entry_count as f64 * (c.weighted_size as f64 / max_cap as f64)) as u64
1489 };
1490 self.do_enable_frequency_sketch(cap).await;
1491 }
1492 }
1493
1494 #[cfg(test)]
1495 async fn enable_frequency_sketch_for_testing(&self) {
1496 if let Some(max_cap) = self.max_capacity {
1497 self.do_enable_frequency_sketch(max_cap).await;
1498 }
1499 }
1500
1501 #[inline]
1502 async fn do_enable_frequency_sketch(&self, cache_capacity: u64) {
1503 let skt_capacity = common::sketch_capacity(cache_capacity);
1504 self.frequency_sketch
1505 .write()
1506 .await
1507 .ensure_capacity(skt_capacity);
1508 self.frequency_sketch_enabled.store(true, Ordering::Release);
1509 }
1510
1511 async fn apply_reads(
1512 &self,
1513 deqs: &mut Deques<K>,
1514 timer_wheel: &mut TimerWheel<K>,
1515 count: usize,
1516 ) {
1517 use ReadOp::{Hit, Miss};
1518 let mut freq = self.frequency_sketch.write().await;
1519 let ch = &self.read_op_ch;
1520 for _ in 0..count {
1521 match ch.try_recv() {
1522 Ok(Hit {
1523 value_entry,
1524 is_expiry_modified,
1525 }) => {
1526 let kh = value_entry.entry_info().key_hash();
1527 freq.increment(kh.hash);
1528 if is_expiry_modified {
1529 self.update_timer_wheel(&value_entry, timer_wheel);
1530 }
1531 deqs.move_to_back_ao(&value_entry);
1532 }
1533 Ok(Miss(hash)) => freq.increment(hash),
1534 Err(_) => break,
1535 }
1536 }
1537 }
1538
1539 async fn apply_writes(
1540 &self,
1541 deqs: &mut Deques<K>,
1542 timer_wheel: &mut TimerWheel<K>,
1543 count: usize,
1544 eviction_state: &mut EvictionState<'_, K, V>,
1545 ) where
1546 V: Clone,
1547 {
1548 use WriteOp::{Remove, Upsert};
1549 let freq = self.frequency_sketch.read().await;
1550 let ch = &self.write_op_ch;
1551
1552 for _ in 0..count {
1553 match ch.try_recv() {
1554 Ok(Upsert {
1555 key_hash: kh,
1556 value_entry: entry,
1557 entry_gen: gen,
1558 old_weight,
1559 new_weight,
1560 }) => {
1561 self.handle_upsert(
1562 kh,
1563 entry,
1564 gen,
1565 old_weight,
1566 new_weight,
1567 deqs,
1568 timer_wheel,
1569 &freq,
1570 eviction_state,
1571 )
1572 .await;
1573 }
1574 Ok(Remove {
1575 kv_entry: KvEntry { key: _key, entry },
1576 entry_gen: gen,
1577 }) => {
1578 Self::handle_remove(
1579 deqs,
1580 timer_wheel,
1581 entry,
1582 Some(gen),
1583 &mut eviction_state.counters,
1584 );
1585 }
1586 Err(_) => break,
1587 };
1588 }
1589 }
1590
    /// Applies one upsert write op to the policy data structures.
    ///
    /// The op is handled by the first matching case:
    ///
    /// 1. The entry is already admitted: treated as an update. The weight
    ///    counters are adjusted and the entry is moved to the back of the
    ///    access-order and write-order deques.
    /// 2. There is enough capacity: the entry is admitted.
    /// 3. `new_weight` alone exceeds the max capacity: the entry is removed
    ///    from the cache map (only if it is still the same entry with the same
    ///    generation) with `RemovalCause::Size`.
    /// 4. Otherwise the eviction policy decides. On admission the selected
    ///    victims are removed and the entry is admitted; on rejection the
    ///    entry itself is removed from the cache map.
    ///
    /// Every path that still holds the entry finishes by recording `gen` via
    /// `set_policy_gen` (in the rejected path only when the entry was removed).
    #[allow(clippy::too_many_arguments)]
    async fn handle_upsert(
        &self,
        kh: KeyHash<K>,
        entry: MiniArc<ValueEntry<K, V>>,
        gen: u16,
        old_weight: u32,
        new_weight: u32,
        deqs: &mut Deques<K>,
        timer_wheel: &mut TimerWheel<K>,
        freq: &FrequencySketch,
        eviction_state: &mut EvictionState<'_, K, V>,
    ) where
        V: Clone,
    {
        {
            let counters = &mut eviction_state.counters;

            if entry.is_admitted() {
                // Already admitted: this is an update. Only the weight changes,
                // not the entry count.
                counters.saturating_sub(0, old_weight);
                counters.saturating_add(0, new_weight);
                self.update_timer_wheel(&entry, timer_wheel);
                deqs.move_to_back_ao(&entry);
                deqs.move_to_back_wo(&entry);
                entry.entry_info().set_policy_gen(gen);
                return;
            }

            if self.has_enough_capacity(new_weight, counters) {
                // Enough room (or the cache is unbounded): admit the candidate.
                self.handle_admit(&entry, new_weight, deqs, timer_wheel, counters);
                entry.entry_info().set_policy_gen(gen);
                return;
            }
        }

        if let Some(max) = self.max_capacity {
            if new_weight as u64 > max {
                // The candidate alone is heavier than the max capacity: reject it.

                // Hold the key lock (when there is one) during the removal.
                let kl = self.maybe_key_lock(&kh.key);
                let _klg = if let Some(lock) = &kl {
                    Some(lock.lock().await)
                } else {
                    None
                };

                // Remove only if the map still holds this very entry with the
                // same entry generation.
                let removed = self.cache.remove_if(
                    kh.hash,
                    |k| k == &kh.key,
                    |_, current_entry| {
                        MiniArc::ptr_eq(entry.entry_info(), current_entry.entry_info())
                            && current_entry.entry_info().entry_gen() == gen
                    },
                );
                if let Some(entry) = removed {
                    if eviction_state.is_notifier_enabled() {
                        let key = Arc::clone(&kh.key);
                        eviction_state
                            .notify_entry_removal(key, &entry, RemovalCause::Size)
                            .await;
                    }
                    eviction_state.counters.incr_eviction_count();
                }
                entry.entry_info().set_policy_gen(gen);
                return;
            }
        }

        // Ask the eviction policy whether to admit the candidate. The LRU
        // policy always admits, with no victims selected here.
        let admission_result = match &self.eviction_policy {
            EvictionPolicyConfig::TinyLfu => {
                let mut candidate = EntrySizeAndFrequency::new(new_weight);
                candidate.add_frequency(freq, kh.hash);
                Self::admit(&candidate, &self.cache, deqs, freq)
            }
            EvictionPolicyConfig::Lru => AdmissionResult::Admitted {
                victim_keys: SmallVec::default(),
            },
        };

        match admission_result {
            AdmissionResult::Admitted { victim_keys } => {
                // Try to remove each victim from the cache map.
                for (vic_kh, vic_la) in victim_keys {
                    let vic_key = vic_kh.key;
                    let vic_hash = vic_kh.hash;

                    // Hold the victim's key lock (when there is one) during the
                    // removal.
                    let kl = self.maybe_key_lock(&vic_key);
                    let _klg = if let Some(lock) = &kl {
                        Some(lock.lock().await)
                    } else {
                        None
                    };

                    // Remove the victim only if its last-accessed time is still
                    // the one observed when it was selected.
                    if let Some((vic_key, vic_entry)) = self.cache.remove_entry_if_and(
                        vic_hash,
                        |k| k == &vic_key,
                        |_, entry| entry.entry_info().last_accessed() == vic_la,
                        |k, v| (k.clone(), v.clone()),
                    ) {
                        if eviction_state.is_notifier_enabled() {
                            eviction_state
                                .notify_entry_removal(vic_key, &vic_entry, RemovalCause::Size)
                                .await;
                        }
                        eviction_state.counters.incr_eviction_count();

                        // Then remove the victim from the deques and timer wheel.
                        Self::handle_remove(
                            deqs,
                            timer_wheel,
                            vic_entry,
                            None,
                            &mut eviction_state.counters,
                        );
                    } else {
                        // The victim could not be removed. If it is still at the
                        // front of the probation deque, move it to the back.
                        if let Some(node) = deqs.probation.peek_front() {
                            if node.element.key() == &vic_key && node.element.hash() == vic_hash {
                                deqs.probation.move_front_to_back();
                            }
                        }
                    }
                }
                // Admit the candidate.
                self.handle_admit(
                    &entry,
                    new_weight,
                    deqs,
                    timer_wheel,
                    &mut eviction_state.counters,
                );
                entry.entry_info().set_policy_gen(gen);
            }
            AdmissionResult::Rejected => {
                // Hold the key lock (when there is one) during the removal.
                let kl = self.maybe_key_lock(&kh.key);
                let _klg = if let Some(lock) = &kl {
                    Some(lock.lock().await)
                } else {
                    None
                };

                // Remove the candidate from the cache map, but only if the map
                // still holds this very entry with the same entry generation.
                let key = Arc::clone(&kh.key);
                let removed = self.cache.remove_if(
                    kh.hash,
                    |k| k == &key,
                    |_, current_entry| {
                        MiniArc::ptr_eq(entry.entry_info(), current_entry.entry_info())
                            && current_entry.entry_info().entry_gen() == gen
                    },
                );

                if let Some(entry) = removed {
                    entry.entry_info().set_policy_gen(gen);
                    if eviction_state.is_notifier_enabled() {
                        eviction_state
                            .notify_entry_removal(key, &entry, RemovalCause::Size)
                            .await;
                    }
                    eviction_state.counters.incr_eviction_count();
                }
            }
        }
    }
1767
    /// Decides whether `candidate` should be admitted by comparing it with
    /// victims taken from the front of the probation deque.
    ///
    /// Victims are accumulated while their aggregated policy weight is below
    /// the candidate's weight, their aggregated frequency does not exceed the
    /// candidate's frequency, and the number of consecutive skipped nodes does
    /// not exceed `MAX_CONSECUTIVE_RETRIES`. Dirty nodes and nodes whose key is
    /// no longer in `cache` are moved to the back of the deque and skipped.
    ///
    /// Returns `Admitted` with the victims' key hashes and last-accessed times
    /// when the victims cover the candidate's weight and the candidate's
    /// frequency is strictly higher; otherwise returns `Rejected`.
    #[inline]
    fn admit(
        candidate: &EntrySizeAndFrequency,
        cache: &CacheStore<K, V, S>,
        deqs: &mut Deques<K>,
        freq: &FrequencySketch,
    ) -> AdmissionResult<K> {
        const MAX_CONSECUTIVE_RETRIES: usize = 5;
        let mut retries = 0;

        let mut victims = EntrySizeAndFrequency::default();
        let mut victim_keys = SmallVec::default();

        let deq = &mut deqs.probation;

        // Start from the front of the probation deque.
        let mut next_victim = deq.peek_front_ptr();

        // Aggregate potential victims.
        while victims.policy_weight < candidate.policy_weight
            && victims.freq <= candidate.freq
            && retries <= MAX_CONSECUTIVE_RETRIES
        {
            let Some(victim) = next_victim.take() else {
                // No more potential victims.
                break;
            };
            // Fetch the successor before this node is possibly moved to the back.
            next_victim = DeqNode::next_node_ptr(victim);

            // NOTE(review): assumes `victim` points to a live node owned by
            // `deq` while the `&mut Deques` borrow is held — confirm.
            let vic_elem = &unsafe { victim.as_ref() }.element;
            if vic_elem.is_dirty() {
                // Skip dirty nodes; move them to the back of the deque.
                unsafe { deq.move_to_back(victim) };
                retries += 1;
                continue;
            }

            let key = vic_elem.key();
            let hash = vic_elem.hash();
            let last_accessed = vic_elem.entry_info().last_accessed();

            if let Some(vic_entry) = cache.get(hash, |k| k == key) {
                victims.add_policy_weight(vic_entry.policy_weight());
                victims.add_frequency(freq, hash);
                victim_keys.push((KeyHash::new(Arc::clone(key), hash), last_accessed));
                retries = 0;
            } else {
                // The key is no longer in the cache map; skip this node and move
                // it to the back of the deque.
                unsafe { deq.move_to_back(victim) };
                retries += 1;
            }
        }

        // Admit only when the victims free enough weight and the candidate is
        // strictly more frequent than the victims combined.
        if victims.policy_weight >= candidate.policy_weight && candidate.freq > victims.freq {
            AdmissionResult::Admitted { victim_keys }
        } else {
            AdmissionResult::Rejected
        }
    }
1851
1852 fn handle_admit(
1853 &self,
1854 entry: &MiniArc<ValueEntry<K, V>>,
1855 policy_weight: u32,
1856 deqs: &mut Deques<K>,
1857 timer_wheel: &mut TimerWheel<K>,
1858 counters: &mut EvictionCounters,
1859 ) {
1860 counters.saturating_add(1, policy_weight);
1861
1862 self.update_timer_wheel(entry, timer_wheel);
1863
1864 deqs.push_back_ao(
1866 CacheRegion::MainProbation,
1867 KeyHashDate::new(entry.entry_info()),
1868 entry,
1869 );
1870 if self.is_write_order_queue_enabled() {
1871 deqs.push_back_wo(KeyHashDate::new(entry.entry_info()), entry);
1872 }
1873 entry.set_admitted(true);
1874 }
1875
1876 fn update_timer_wheel(
1878 &self,
1879 entry: &MiniArc<ValueEntry<K, V>>,
1880 timer_wheel: &mut TimerWheel<K>,
1881 ) {
1882 if entry.entry_info().expiration_time().is_some() && !timer_wheel.is_enabled() {
1884 timer_wheel.enable();
1885 }
1886
1887 match (
1889 entry.entry_info().expiration_time().is_some(),
1890 entry.timer_node(),
1891 ) {
1892 (false, None) => (),
1895 (true, None) => {
1898 let timer = timer_wheel.schedule(
1899 MiniArc::clone(entry.entry_info()),
1900 MiniArc::clone(entry.deq_nodes()),
1901 );
1902 entry.set_timer_node(timer);
1903 }
1904 (true, Some(tn)) => {
1907 let result = timer_wheel.reschedule(tn);
1908 if let ReschedulingResult::Removed(removed_tn) = result {
1909 entry.set_timer_node(None);
1913 drop(removed_tn);
1914 }
1915 }
1916 (false, Some(tn)) => {
1919 entry.set_timer_node(None);
1920 timer_wheel.deschedule(tn);
1921 }
1922 }
1923 }
1924
1925 fn handle_remove(
1926 deqs: &mut Deques<K>,
1927 timer_wheel: &mut TimerWheel<K>,
1928 entry: MiniArc<ValueEntry<K, V>>,
1929 gen: Option<u16>,
1930 counters: &mut EvictionCounters,
1931 ) {
1932 if let Some(timer_node) = entry.take_timer_node() {
1933 timer_wheel.deschedule(timer_node);
1934 }
1935 Self::handle_remove_without_timer_wheel(deqs, entry, gen, counters);
1936 }
1937
1938 fn handle_remove_without_timer_wheel(
1939 deqs: &mut Deques<K>,
1940 entry: MiniArc<ValueEntry<K, V>>,
1941 gen: Option<u16>,
1942 counters: &mut EvictionCounters,
1943 ) {
1944 if entry.is_admitted() {
1945 entry.set_admitted(false);
1946 counters.saturating_sub(1, entry.policy_weight());
1947 deqs.unlink_ao(&entry);
1949 Deques::unlink_wo(&mut deqs.write_order, &entry);
1950 } else {
1951 entry.unset_q_nodes();
1952 }
1953 if let Some(g) = gen {
1954 entry.entry_info().set_policy_gen(g);
1955 }
1956 }
1957
1958 fn handle_remove_with_deques(
1959 ao_deq_name: &str,
1960 ao_deq: &mut Deque<KeyHashDate<K>>,
1961 wo_deq: &mut Deque<KeyHashDate<K>>,
1962 timer_wheel: &mut TimerWheel<K>,
1963 entry: MiniArc<ValueEntry<K, V>>,
1964 counters: &mut EvictionCounters,
1965 ) {
1966 if let Some(timer) = entry.take_timer_node() {
1967 timer_wheel.deschedule(timer);
1968 }
1969 if entry.is_admitted() {
1970 entry.set_admitted(false);
1971 counters.saturating_sub(1, entry.policy_weight());
1972 Deques::unlink_ao_from_deque(ao_deq_name, ao_deq, &entry);
1974 Deques::unlink_wo(wo_deq, &entry);
1975 } else {
1976 entry.unset_q_nodes();
1977 }
1978 }
1979
1980 async fn evict_expired_entries_using_timers(
1981 &self,
1982 timer_wheel: &mut TimerWheel<K>,
1983 deqs: &mut Deques<K>,
1984 eviction_state: &mut EvictionState<'_, K, V>,
1985 ) where
1986 V: Clone,
1987 {
1988 use crate::common::timer_wheel::TimerEvent;
1989
1990 let now = self.current_time();
1991
1992 let expired_keys = timer_wheel
1995 .advance(now)
1996 .filter_map(|event| {
1997 if let TimerEvent::Expired(node) = event {
2000 let entry_info = node.element.entry_info();
2001 let kh = entry_info.key_hash();
2002 Some((Arc::clone(&kh.key), kh.hash, entry_info.is_dirty()))
2003 } else {
2004 None
2005 }
2006 })
2007 .collect::<Vec<_>>();
2008
2009 for (key, hash, is_dirty) in expired_keys {
2019 if is_dirty {
2020 continue;
2023 }
2024
2025 let kl = self.maybe_key_lock(&key);
2027 let _klg = if let Some(lock) = &kl {
2028 Some(lock.lock().await)
2029 } else {
2030 None
2031 };
2032
2033 let maybe_entry = self.cache.remove_if(
2035 hash,
2036 |k| k == &key,
2037 |_, v| is_expired_by_per_entry_ttl(v.entry_info(), now),
2038 );
2039
2040 if let Some(entry) = maybe_entry {
2041 if eviction_state.is_notifier_enabled() {
2042 eviction_state
2043 .notify_entry_removal(key, &entry, RemovalCause::Expired)
2044 .await;
2045 }
2046 eviction_state.counters.incr_eviction_count();
2047 Self::handle_remove_without_timer_wheel(
2048 deqs,
2049 entry,
2050 None,
2051 &mut eviction_state.counters,
2052 );
2053 } else {
2054 }
2057 }
2058 }
2059
2060 async fn evict_expired_entries_using_deqs(
2061 &self,
2062 deqs: &mut MutexGuard<'_, Deques<K>>,
2063 timer_wheel: &mut TimerWheel<K>,
2064 batch_size: u32,
2065 state: &mut EvictionState<'_, K, V>,
2066 ) where
2067 V: Clone,
2068 {
2069 use CacheRegion::{MainProbation as Probation, MainProtected as Protected, Window};
2070
2071 let now = self.current_time();
2072
2073 if self.is_write_order_queue_enabled() {
2074 self.remove_expired_wo(deqs, timer_wheel, batch_size, now, state)
2075 .await;
2076 }
2077
2078 if self.expiration_policy.time_to_idle().is_some() || self.has_valid_after() {
2079 self.remove_expired_ao(Window, deqs, timer_wheel, batch_size, now, state)
2080 .await;
2081 self.remove_expired_ao(Probation, deqs, timer_wheel, batch_size, now, state)
2082 .await;
2083 self.remove_expired_ao(Protected, deqs, timer_wheel, batch_size, now, state)
2084 .await;
2085 }
2086 }
2087
2088 #[allow(clippy::too_many_arguments)]
2089 #[inline]
2090 async fn remove_expired_ao(
2091 &self,
2092 cache_region: CacheRegion,
2093 deqs: &mut Deques<K>,
2094 timer_wheel: &mut TimerWheel<K>,
2095 batch_size: u32,
2096 now: Instant,
2097 eviction_state: &mut EvictionState<'_, K, V>,
2098 ) where
2099 V: Clone,
2100 {
2101 let tti = &self.expiration_policy.time_to_idle();
2102 let va = &self.valid_after();
2103 let deq_name = cache_region.name();
2104 let mut more_to_evict = true;
2105
2106 for _ in 0..batch_size {
2107 let maybe_key_hash_ts = deqs.select_mut(cache_region).0.peek_front().map(|node| {
2108 let elem = &node.element;
2109 (
2110 Arc::clone(elem.key()),
2111 elem.hash(),
2112 elem.is_dirty(),
2113 elem.last_accessed(),
2114 )
2115 });
2116
2117 let (key, hash, cause) = match maybe_key_hash_ts {
2118 Some((key, hash, false, Some(ts))) => {
2119 let cause = match is_entry_expired_ao_or_invalid(tti, va, ts, now) {
2120 (true, _) => RemovalCause::Expired,
2121 (false, true) => RemovalCause::Explicit,
2122 (false, false) => {
2123 more_to_evict = false;
2124 break;
2125 }
2126 };
2127 (key, hash, cause)
2128 }
2129 Some((key, hash, true, _) | (key, hash, false, None)) => {
2133 let (ao_deq, wo_deq) = deqs.select_mut(cache_region);
2137 self.skip_updated_entry_ao(&key, hash, deq_name, ao_deq, wo_deq);
2138 more_to_evict = false;
2142 continue;
2143 }
2144 None => {
2145 more_to_evict = false;
2146 break;
2147 }
2148 };
2149
2150 let kl = self.maybe_key_lock(&key);
2152 let _klg = if let Some(lock) = &kl {
2153 Some(lock.lock().await)
2154 } else {
2155 None
2156 };
2157
2158 let maybe_entry = self.cache.remove_if(
2163 hash,
2164 |k| k == &key,
2165 |_, v| is_expired_entry_ao(tti, va, v, now),
2166 );
2167
2168 if let Some(entry) = maybe_entry {
2169 if eviction_state.is_notifier_enabled() {
2170 eviction_state
2171 .notify_entry_removal(key, &entry, cause)
2172 .await;
2173 }
2174 eviction_state.counters.incr_eviction_count();
2175 let (ao_deq, wo_deq) = deqs.select_mut(cache_region);
2176 Self::handle_remove_with_deques(
2177 deq_name,
2178 ao_deq,
2179 wo_deq,
2180 timer_wheel,
2181 entry,
2182 &mut eviction_state.counters,
2183 );
2184 } else {
2185 let (ao_deq, wo_deq) = deqs.select_mut(cache_region);
2186 self.skip_updated_entry_ao(&key, hash, deq_name, ao_deq, wo_deq);
2187 more_to_evict = false;
2188 }
2189 }
2190
2191 if more_to_evict {
2192 eviction_state.more_entries_to_evict = true;
2193 }
2194 }
2195
2196 #[inline]
2197 fn skip_updated_entry_ao(
2198 &self,
2199 key: &K,
2200 hash: u64,
2201 deq_name: &str,
2202 deq: &mut Deque<KeyHashDate<K>>,
2203 write_order_deq: &mut Deque<KeyHashDate<K>>,
2204 ) {
2205 if let Some(entry) = self.cache.get(hash, |k| (k.borrow() as &K) == key) {
2206 Deques::move_to_back_ao_in_deque(deq_name, deq, &entry);
2209 if entry.is_dirty() {
2210 Deques::move_to_back_wo_in_deque(write_order_deq, &entry);
2211 }
2212 } else {
2213 deq.move_front_to_back();
2218 }
2219 }
2220
2221 #[inline]
2222 fn skip_updated_entry_wo(&self, key: &K, hash: u64, deqs: &mut Deques<K>) {
2223 if let Some(entry) = self.cache.get(hash, |k| (k.borrow() as &K) == key) {
2224 deqs.move_to_back_ao(&entry);
2227 deqs.move_to_back_wo(&entry);
2228 } else {
2229 deqs.write_order.move_front_to_back();
2234 }
2235 }
2236
2237 #[inline]
2238 async fn remove_expired_wo(
2239 &self,
2240 deqs: &mut Deques<K>,
2241 timer_wheel: &mut TimerWheel<K>,
2242 batch_size: u32,
2243 now: Instant,
2244 eviction_state: &mut EvictionState<'_, K, V>,
2245 ) where
2246 V: Clone,
2247 {
2248 let ttl = &self.expiration_policy.time_to_live();
2249 let va = &self.valid_after();
2250 let mut more_to_evict = true;
2251
2252 for _ in 0..batch_size {
2253 let maybe_key_hash_ts = deqs.write_order.peek_front().map(|node| {
2254 let elem = &node.element;
2255 (
2256 Arc::clone(elem.key()),
2257 elem.hash(),
2258 elem.is_dirty(),
2259 elem.last_modified(),
2260 )
2261 });
2262
2263 let (key, hash, cause) = match maybe_key_hash_ts {
2264 Some((key, hash, false, Some(ts))) => {
2265 let cause = match is_entry_expired_wo_or_invalid(ttl, va, ts, now) {
2266 (true, _) => RemovalCause::Expired,
2267 (false, true) => RemovalCause::Explicit,
2268 (false, false) => {
2269 more_to_evict = false;
2270 break;
2271 }
2272 };
2273 (key, hash, cause)
2274 }
2275 Some((key, hash, true, _) | (key, hash, false, None)) => {
2279 self.skip_updated_entry_wo(&key, hash, deqs);
2283 more_to_evict = false;
2287 continue;
2288 }
2289 None => {
2290 more_to_evict = false;
2291 break;
2292 }
2293 };
2294
2295 let kl = self.maybe_key_lock(&key);
2297 let _klg = if let Some(lock) = &kl {
2298 Some(lock.lock().await)
2299 } else {
2300 None
2301 };
2302
2303 let maybe_entry = self.cache.remove_if(
2304 hash,
2305 |k| k == &key,
2306 |_, v| is_expired_entry_wo(ttl, va, v, now),
2307 );
2308
2309 if let Some(entry) = maybe_entry {
2310 if eviction_state.is_notifier_enabled() {
2311 eviction_state
2312 .notify_entry_removal(key, &entry, cause)
2313 .await;
2314 }
2315 eviction_state.counters.incr_eviction_count();
2316 Self::handle_remove(deqs, timer_wheel, entry, None, &mut eviction_state.counters);
2317 } else {
2318 self.skip_updated_entry_wo(&key, hash, deqs);
2319 more_to_evict = false;
2320 }
2321 }
2322
2323 if more_to_evict {
2324 eviction_state.more_entries_to_evict = true;
2325 }
2326 }
2327
2328 async fn invalidate_entries(
2329 &self,
2330 invalidator: &Invalidator<K, V, S>,
2331 deqs: &mut Deques<K>,
2332 timer_wheel: &mut TimerWheel<K>,
2333 batch_size: u32,
2334 eviction_state: &mut EvictionState<'_, K, V>,
2335 ) where
2336 V: Clone,
2337 {
2338 let now = self.current_time();
2339
2340 if deqs.write_order.len() == 0 {
2343 invalidator.remove_predicates_registered_before(now);
2344 return;
2345 }
2346
2347 let mut candidates = Vec::default();
2348 let mut len = 0;
2349 let has_next;
2350 {
2351 let iter = &mut deqs.write_order.peekable();
2352
2353 while len < batch_size {
2354 if let Some(kd) = iter.next() {
2355 if !kd.is_dirty() {
2356 if let Some(ts) = kd.last_modified() {
2357 let key = kd.key();
2358 let hash = self.hash(&**key);
2359 candidates.push(KeyDateLite::new(key, hash, ts));
2360 len += 1;
2361 }
2362 }
2363 } else {
2364 break;
2365 }
2366 }
2367
2368 has_next = iter.peek().is_some();
2369 }
2370
2371 if len == 0 {
2372 return;
2373 }
2374
2375 let is_truncated = len == batch_size && has_next;
2376 let (invalidated, is_done) = invalidator
2377 .scan_and_invalidate(self, candidates, is_truncated)
2378 .await;
2379
2380 for KvEntry { key: _key, entry } in invalidated {
2381 Self::handle_remove(deqs, timer_wheel, entry, None, &mut eviction_state.counters);
2382 }
2383 if is_done {
2384 deqs.write_order.reset_cursor();
2385 }
2386 if !invalidator.is_empty() {
2387 eviction_state.more_entries_to_evict = true;
2388 }
2389 }
2390
2391 async fn evict_lru_entries(
2392 &self,
2393 deqs: &mut Deques<K>,
2394 timer_wheel: &mut TimerWheel<K>,
2395 batch_size: u32,
2396 weights_to_evict: u64,
2397 eviction_state: &mut EvictionState<'_, K, V>,
2398 ) where
2399 V: Clone,
2400 {
2401 const CACHE_REGION: CacheRegion = CacheRegion::MainProbation;
2402 let deq_name = CACHE_REGION.name();
2403 let mut evicted = 0u64;
2404 let mut more_to_evict = true;
2405
2406 for _ in 0..batch_size {
2407 if evicted >= weights_to_evict {
2408 more_to_evict = false;
2409 break;
2410 }
2411
2412 let maybe_key_hash_ts = deqs.select_mut(CACHE_REGION).0.peek_front().map(|node| {
2413 let entry_info = node.element.entry_info();
2414 (
2415 Arc::clone(node.element.key()),
2416 node.element.hash(),
2417 entry_info.is_dirty(),
2418 entry_info.last_accessed(),
2419 )
2420 });
2421
2422 let (key, hash, ts) = match maybe_key_hash_ts {
2423 Some((key, hash, false, Some(ts))) => (key, hash, ts),
2424 Some((key, hash, true, _) | (key, hash, false, None)) => {
2428 let (ao_deq, wo_deq) = deqs.select_mut(CACHE_REGION);
2432 self.skip_updated_entry_ao(&key, hash, deq_name, ao_deq, wo_deq);
2433 more_to_evict = false;
2437 continue;
2438 }
2439 None => {
2440 more_to_evict = false;
2441 break;
2442 }
2443 };
2444
2445 let kl = self.maybe_key_lock(&key);
2447 let _klg = if let Some(lock) = &kl {
2448 Some(lock.lock().await)
2449 } else {
2450 None
2451 };
2452
2453 let maybe_entry = self.cache.remove_if(
2454 hash,
2455 |k| k == &key,
2456 |_, v| {
2457 if let Some(la) = v.last_accessed() {
2458 la == ts
2459 } else {
2460 false
2461 }
2462 },
2463 );
2464
2465 if let Some(entry) = maybe_entry {
2466 if eviction_state.is_notifier_enabled() {
2467 eviction_state
2468 .notify_entry_removal(key, &entry, RemovalCause::Size)
2469 .await;
2470 }
2471 eviction_state.counters.incr_eviction_count();
2472 let weight = entry.policy_weight();
2473 let (deq, write_order_deq) = deqs.select_mut(CacheRegion::MainProbation);
2474 Self::handle_remove_with_deques(
2475 deq_name,
2476 deq,
2477 write_order_deq,
2478 timer_wheel,
2479 entry,
2480 &mut eviction_state.counters,
2481 );
2482 evicted = evicted.saturating_add(weight as u64);
2483 } else {
2484 let (ao_deq, wo_deq) = deqs.select_mut(CacheRegion::MainProbation);
2485 self.skip_updated_entry_ao(&key, hash, deq_name, ao_deq, wo_deq);
2486 more_to_evict = false;
2487 }
2488 }
2489
2490 if more_to_evict {
2491 eviction_state.more_entries_to_evict = true;
2492 }
2493 }
2494}
2495
2496impl<K, V, S> Inner<K, V, S>
2497where
2498 K: Send + Sync + 'static,
2499 V: Clone + Send + Sync + 'static,
2500{
2501 pub(crate) async fn notify_single_removal(
2502 &self,
2503 key: Arc<K>,
2504 entry: &MiniArc<ValueEntry<K, V>>,
2505 cause: RemovalCause,
2506 ) {
2507 if let Some(notifier) = &self.removal_notifier {
2508 notifier.notify(key, entry.value.clone(), cause).await;
2509 }
2510 }
2511
2512 #[inline]
2513 fn notify_upsert(
2514 &self,
2515 key: Arc<K>,
2516 entry: &MiniArc<ValueEntry<K, V>>,
2517 last_accessed: Option<Instant>,
2518 last_modified: Option<Instant>,
2519 ) -> BoxFuture<'static, ()> {
2520 use futures_util::future::FutureExt;
2521
2522 let now = self.current_time();
2523 let exp = &self.expiration_policy;
2524
2525 let mut cause = RemovalCause::Replaced;
2526
2527 if let Some(last_accessed) = last_accessed {
2528 if is_expired_by_tti(&exp.time_to_idle(), last_accessed, now) {
2529 cause = RemovalCause::Expired;
2530 }
2531 }
2532
2533 if let Some(last_modified) = last_modified {
2534 if is_expired_by_ttl(&exp.time_to_live(), last_modified, now) {
2535 cause = RemovalCause::Expired;
2536 } else if is_invalid_entry(&self.valid_after(), last_modified) {
2537 cause = RemovalCause::Explicit;
2538 }
2539 }
2540
2541 if let Some(notifier) = &self.removal_notifier {
2542 let notifier = Arc::clone(notifier);
2543 let value = entry.value.clone();
2544 async move {
2545 notifier.notify(key, value, cause).await;
2546 }
2547 .boxed()
2548 } else {
2549 std::future::ready(()).boxed()
2550 }
2551 }
2552
2553 #[inline]
2554 fn notify_invalidate(
2555 &self,
2556 key: &Arc<K>,
2557 entry: &MiniArc<ValueEntry<K, V>>,
2558 ) -> BoxFuture<'static, ()> {
2559 use futures_util::future::FutureExt;
2560
2561 let now = self.current_time();
2562 let exp = &self.expiration_policy;
2563
2564 let mut cause = RemovalCause::Explicit;
2565
2566 if let Some(last_accessed) = entry.last_accessed() {
2567 if is_expired_by_tti(&exp.time_to_idle(), last_accessed, now) {
2568 cause = RemovalCause::Expired;
2569 }
2570 }
2571
2572 if let Some(last_modified) = entry.last_modified() {
2573 if is_expired_by_ttl(&exp.time_to_live(), last_modified, now) {
2574 cause = RemovalCause::Expired;
2575 }
2576 }
2577
2578 if let Some(notifier) = &self.removal_notifier {
2579 let notifier = Arc::clone(notifier);
2580 let key = Arc::clone(key);
2581 let value = entry.value.clone();
2582 async move { notifier.notify(key, value, cause).await }.boxed()
2583 } else {
2584 std::future::ready(()).boxed()
2585 }
2586 }
2587}
2588
2589#[cfg(test)]
2593impl<K, V, S> Inner<K, V, S>
2594where
2595 K: Hash + Eq,
2596 S: BuildHasher + Clone,
2597{
2598 fn invalidation_predicate_count(&self) -> usize {
2599 if let Some(inv) = &self.invalidator {
2600 inv.predicate_count()
2601 } else {
2602 0
2603 }
2604 }
2605
2606 fn key_locks_map_is_empty(&self) -> bool {
2607 self.key_locks
2608 .as_ref()
2609 .map(|m| m.is_empty())
2610 .unwrap_or(true)
2612 }
2613}
2614
2615#[inline]
2621fn is_expired_by_per_entry_ttl<K>(entry_info: &MiniArc<EntryInfo<K>>, now: Instant) -> bool {
2622 if let Some(ts) = entry_info.expiration_time() {
2623 ts <= now
2624 } else {
2625 false
2626 }
2627}
2628
2629#[inline]
2634fn is_expired_entry_ao(
2635 time_to_idle: &Option<Duration>,
2636 valid_after: &Option<Instant>,
2637 entry: &impl AccessTime,
2638 now: Instant,
2639) -> bool {
2640 if let Some(ts) = entry.last_accessed() {
2641 is_invalid_entry(valid_after, ts) || is_expired_by_tti(time_to_idle, ts, now)
2642 } else {
2643 false
2644 }
2645}
2646
2647#[inline]
2652fn is_expired_entry_wo(
2653 time_to_live: &Option<Duration>,
2654 valid_after: &Option<Instant>,
2655 entry: &impl AccessTime,
2656 now: Instant,
2657) -> bool {
2658 if let Some(ts) = entry.last_modified() {
2659 is_invalid_entry(valid_after, ts) || is_expired_by_ttl(time_to_live, ts, now)
2660 } else {
2661 false
2662 }
2663}
2664
2665#[inline]
2666fn is_entry_expired_ao_or_invalid(
2667 time_to_idle: &Option<Duration>,
2668 valid_after: &Option<Instant>,
2669 entry_last_accessed: Instant,
2670 now: Instant,
2671) -> (bool, bool) {
2672 let ts = entry_last_accessed;
2673 let expired = is_expired_by_tti(time_to_idle, ts, now);
2674 let invalid = is_invalid_entry(valid_after, ts);
2675 (expired, invalid)
2676}
2677
2678#[inline]
2679fn is_entry_expired_wo_or_invalid(
2680 time_to_live: &Option<Duration>,
2681 valid_after: &Option<Instant>,
2682 entry_last_modified: Instant,
2683 now: Instant,
2684) -> (bool, bool) {
2685 let ts = entry_last_modified;
2686 let expired = is_expired_by_ttl(time_to_live, ts, now);
2687 let invalid = is_invalid_entry(valid_after, ts);
2688 (expired, invalid)
2689}
2690
/// Returns `true` when `valid_after` is set and `entry_ts` is strictly
/// earlier than it.
#[inline]
fn is_invalid_entry(valid_after: &Option<Instant>, entry_ts: Instant) -> bool {
    match valid_after {
        Some(threshold) => entry_ts < *threshold,
        None => false,
    }
}
2699
2700#[inline]
2701fn is_expired_by_tti(
2702 time_to_idle: &Option<Duration>,
2703 entry_last_accessed: Instant,
2704 now: Instant,
2705) -> bool {
2706 if let Some(tti) = time_to_idle {
2707 let expiration = entry_last_accessed.saturating_add(*tti);
2708 expiration <= now
2709 } else {
2710 false
2711 }
2712}
2713
2714#[inline]
2715fn is_expired_by_ttl(
2716 time_to_live: &Option<Duration>,
2717 entry_last_modified: Instant,
2718 now: Instant,
2719) -> bool {
2720 if let Some(ttl) = time_to_live {
2721 let expiration = entry_last_modified.saturating_add(*ttl);
2722 expiration <= now
2723 } else {
2724 false
2725 }
2726}
2727
2728#[cfg(test)]
2729mod tests {
2730 use crate::{
2731 common::{time::Clock, HousekeeperConfig},
2732 policy::{EvictionPolicy, ExpirationPolicy},
2733 };
2734
2735 use super::BaseCache;
2736
2737 #[cfg_attr(target_pointer_width = "16", ignore)]
2738 #[tokio::test]
2739 async fn test_skt_capacity_will_not_overflow() {
2740 use std::collections::hash_map::RandomState;
2741
2742 let pot = |exp| 2u64.pow(exp);
2744
2745 async fn ensure_sketch_len(max_capacity: u64, len: u64, name: &str) {
2746 let cache = BaseCache::<u8, u8>::new(
2747 None,
2748 Some(max_capacity),
2749 None,
2750 RandomState::default(),
2751 None,
2752 EvictionPolicy::default(),
2753 None,
2754 ExpirationPolicy::default(),
2755 HousekeeperConfig::default(),
2756 false,
2757 Clock::default(),
2758 );
2759 cache.inner.enable_frequency_sketch_for_testing().await;
2760 assert_eq!(
2761 cache.inner.frequency_sketch.read().await.table_len(),
2762 len as usize,
2763 "{name}"
2764 );
2765 }
2766
2767 if cfg!(target_pointer_width = "32") {
2768 let pot24 = pot(24);
2769 let pot16 = pot(16);
2770 ensure_sketch_len(0, 128, "0").await;
2771 ensure_sketch_len(128, 128, "128").await;
2772 ensure_sketch_len(pot16, pot16, "pot16").await;
2773 ensure_sketch_len(pot16 + 1, pot(17), "pot16 + 1").await;
2775 ensure_sketch_len(pot24 - 1, pot24, "pot24 - 1").await;
2777 ensure_sketch_len(pot24, pot24, "pot24").await;
2778 ensure_sketch_len(pot(27), pot24, "pot(27)").await;
2779 ensure_sketch_len(u32::MAX as u64, pot24, "u32::MAX").await;
2780 } else {
2781 let pot30 = pot(30);
2783 let pot16 = pot(16);
2784 ensure_sketch_len(0, 128, "0").await;
2785 ensure_sketch_len(128, 128, "128").await;
2786 ensure_sketch_len(pot16, pot16, "pot16").await;
2787 ensure_sketch_len(pot16 + 1, pot(17), "pot16 + 1").await;
2789
2790 if !cfg!(skip_large_mem_tests) {
2792 ensure_sketch_len(pot30 - 1, pot30, "pot30- 1").await;
2794 ensure_sketch_len(pot30, pot30, "pot30").await;
2795 ensure_sketch_len(u64::MAX, pot30, "u64::MAX").await;
2796 }
2797 };
2798 }
2799
    /// Exercises per-entry expiration through a custom `Expiry` implementation
    /// on a cache that also has a cache-wide TTL (16s) and TTI (7s), driven by
    /// a mock clock.
    ///
    /// Before each cache operation the test stores the `Expiry` callback it
    /// expects next (with the exact arguments and the duration to return) in a
    /// shared `ExpiryExpectation`. The `MyExpiry` callbacks take that
    /// expectation, assert the actual arguments against it, and return the
    /// configured duration. Each scenario then advances the mock clock and
    /// checks when the entry disappears.
    #[tokio::test]
    async fn test_per_entry_expiration() {
        use crate::{common::time::Clock, Entry, Expiry};

        use std::{
            collections::hash_map::RandomState,
            sync::{Arc, Mutex},
            time::{Duration, Instant as StdInstant},
        };

        type Key = u32;
        type Value = char;

        // Returns the cache's current time converted to a `std::time::Instant`,
        // so it can be compared with the instants passed to the `Expiry`
        // callbacks.
        fn current_time(cache: &BaseCache<Key, Value>) -> StdInstant {
            cache.inner.clock().to_std_instant(cache.current_time())
        }

        // Calls `do_insert_with_hash` and forwards the resulting write op to
        // the cache's write-op channel.
        async fn insert(cache: &BaseCache<Key, Value>, key: Key, hash: u64, value: Value) {
            let (op, _now) = cache.do_insert_with_hash(Arc::new(key), hash, value).await;
            cache.write_op_ch.send(op).expect("Failed to send");
        }

        // Typed `None` passed as the third argument of `get_with_hash`
        // (presumably meaning "no ignore predicate" — confirm against
        // `get_with_hash`).
        fn never_ignore<'a, V>() -> Option<&'a mut fn(&V) -> bool> {
            None
        }

        // `assert_eq!` wrapper that names the mismatched parameter and the line
        // on which the expectation was registered.
        macro_rules! assert_params_eq {
            ($left:expr, $right:expr, $param_name:expr, $line:expr) => {
                assert_eq!(
                    $left, $right,
                    "Mismatched `{}`s. line: {}",
                    $param_name, $line
                );
            };
        }

        // Asserts that the entry for `$key` is still present 1ms before
        // `$duration_secs` seconds have elapsed, is no longer visible once the
        // full duration has elapsed, and is gone from the entry count after one
        // more second. In total this advances the mock clock by
        // `$duration_secs + 1` seconds.
        macro_rules! assert_expiry {
            ($cache:ident, $key:ident, $hash:ident, $mock:ident, $duration_secs:expr) => {
                // Advance to 1ms before the expected expiration: still present.
                $mock.increment(Duration::from_millis($duration_secs * 1000 - 1));
                $cache.inner.do_run_pending_tasks(None, 1, 10).await;
                assert!($cache.contains_key_with_hash(&$key, $hash));
                assert_eq!($cache.entry_count(), 1);

                // Advance the final 1ms: the entry must no longer be visible.
                $mock.increment(Duration::from_millis(1));
                $cache.inner.do_run_pending_tasks(None, 1, 10).await;
                assert!(!$cache.contains_key_with_hash(&$key, $hash));

                // Advance one more second and run pending tasks again: the
                // entry count must have dropped to zero.
                $mock.increment(Duration::from_secs(1));
                $cache.inner.do_run_pending_tasks(None, 1, 10).await;
                assert_eq!($cache.entry_count(), 0);
            };
        }

        // The next expected `Expiry` callback: its expected arguments plus the
        // duration (in seconds) the callback should return. `NoCall` means no
        // callback is expected.
        #[derive(Debug)]
        enum ExpiryExpectation {
            NoCall,
            AfterCreate {
                caller_line: u32,
                key: Key,
                value: Value,
                current_time: StdInstant,
                new_duration_secs: Option<u64>,
            },
            AfterRead {
                caller_line: u32,
                key: Key,
                value: Value,
                current_time: StdInstant,
                current_duration_secs: Option<u64>,
                last_modified_at: StdInstant,
                new_duration_secs: Option<u64>,
            },
            AfterUpdate {
                caller_line: u32,
                key: Key,
                value: Value,
                current_time: StdInstant,
                current_duration_secs: Option<u64>,
                new_duration_secs: Option<u64>,
            },
        }

        // Plain constructors, one per variant that carries data.
        impl ExpiryExpectation {
            fn after_create(
                caller_line: u32,
                key: Key,
                value: Value,
                current_time: StdInstant,
                new_duration_secs: Option<u64>,
            ) -> Self {
                Self::AfterCreate {
                    caller_line,
                    key,
                    value,
                    current_time,
                    new_duration_secs,
                }
            }

            fn after_read(
                caller_line: u32,
                key: Key,
                value: Value,
                current_time: StdInstant,
                current_duration_secs: Option<u64>,
                last_modified_at: StdInstant,
                new_duration_secs: Option<u64>,
            ) -> Self {
                Self::AfterRead {
                    caller_line,
                    key,
                    value,
                    current_time,
                    current_duration_secs,
                    last_modified_at,
                    new_duration_secs,
                }
            }

            fn after_update(
                caller_line: u32,
                key: Key,
                value: Value,
                current_time: StdInstant,
                current_duration_secs: Option<u64>,
                new_duration_secs: Option<u64>,
            ) -> Self {
                Self::AfterUpdate {
                    caller_line,
                    key,
                    value,
                    current_time,
                    current_duration_secs,
                    new_duration_secs,
                }
            }
        }

        // Shared between the test body (which sets it) and `MyExpiry` (which
        // consumes it).
        let expectation = Arc::new(Mutex::new(ExpiryExpectation::NoCall));

        // `Expiry` implementation that validates every callback against the
        // shared expectation.
        struct MyExpiry {
            expectation: Arc<Mutex<ExpiryExpectation>>,
        }

        impl Expiry<u32, char> for MyExpiry {
            fn expire_after_create(
                &self,
                actual_key: &u32,
                actual_value: &char,
                actual_current_time: StdInstant,
            ) -> Option<Duration> {
                use ExpiryExpectation::*;

                // Take the pending expectation and reset it to `NoCall`, so a
                // second, unannounced callback would panic below.
                let lock = &mut *self.expectation.lock().unwrap();
                let expected = std::mem::replace(lock, NoCall);
                match expected {
                    AfterCreate {
                        caller_line,
                        key,
                        value,
                        current_time,
                        new_duration_secs: new_duration,
                    } => {
                        assert_params_eq!(*actual_key, key, "key", caller_line);
                        assert_params_eq!(*actual_value, value, "value", caller_line);
                        assert_params_eq!(
                            actual_current_time,
                            current_time,
                            "current_time",
                            caller_line
                        );
                        new_duration.map(Duration::from_secs)
                    }
                    // NOTE(review): `line!()` here is the line of this `panic!`,
                    // not the line that registered the expectation, although
                    // the message labels it `caller_line`.
                    expected => {
                        panic!(
                            "Unexpected call to expire_after_create: caller_line {}, expected: {expected:?}",
                            line!()
                        );
                    }
                }
            }

            fn expire_after_read(
                &self,
                actual_key: &u32,
                actual_value: &char,
                actual_current_time: StdInstant,
                actual_current_duration: Option<Duration>,
                actual_last_modified_at: StdInstant,
            ) -> Option<Duration> {
                use ExpiryExpectation::*;

                // Take the pending expectation and reset it to `NoCall`.
                let lock = &mut *self.expectation.lock().unwrap();
                let expected = std::mem::replace(lock, NoCall);
                match expected {
                    AfterRead {
                        caller_line,
                        key,
                        value,
                        current_time,
                        current_duration_secs,
                        last_modified_at,
                        new_duration_secs,
                    } => {
                        assert_params_eq!(*actual_key, key, "key", caller_line);
                        assert_params_eq!(*actual_value, value, "value", caller_line);
                        assert_params_eq!(
                            actual_current_time,
                            current_time,
                            "current_time",
                            caller_line
                        );
                        assert_params_eq!(
                            actual_current_duration,
                            current_duration_secs.map(Duration::from_secs),
                            "current_duration",
                            caller_line
                        );
                        assert_params_eq!(
                            actual_last_modified_at,
                            last_modified_at,
                            "last_modified_at",
                            caller_line
                        );
                        new_duration_secs.map(Duration::from_secs)
                    }
                    // NOTE(review): `line!()` is the line of this `panic!`, not
                    // the caller's line.
                    expected => {
                        panic!(
                            "Unexpected call to expire_after_read: caller_line {}, expected: {expected:?}",
                            line!()
                        );
                    }
                }
            }

            fn expire_after_update(
                &self,
                actual_key: &u32,
                actual_value: &char,
                actual_current_time: StdInstant,
                actual_current_duration: Option<Duration>,
            ) -> Option<Duration> {
                use ExpiryExpectation::*;

                // Take the pending expectation and reset it to `NoCall`.
                let lock = &mut *self.expectation.lock().unwrap();
                let expected = std::mem::replace(lock, NoCall);
                match expected {
                    AfterUpdate {
                        caller_line,
                        key,
                        value,
                        current_time,
                        current_duration_secs,
                        new_duration_secs,
                    } => {
                        assert_params_eq!(*actual_key, key, "key", caller_line);
                        assert_params_eq!(*actual_value, value, "value", caller_line);
                        assert_params_eq!(
                            actual_current_time,
                            current_time,
                            "current_time",
                            caller_line
                        );
                        assert_params_eq!(
                            actual_current_duration,
                            current_duration_secs.map(Duration::from_secs),
                            "current_duration",
                            caller_line
                        );
                        new_duration_secs.map(Duration::from_secs)
                    }
                    // NOTE(review): `line!()` is the line of this `panic!`, not
                    // the caller's line.
                    expected => {
                        panic!(
                            "Unexpected call to expire_after_update: caller_line {}, expected: {expected:?}",
                            line!()
                        );
                    }
                }
            }
        }

        // Cache-wide time-to-live and time-to-idle, in seconds.
        const TTL: u64 = 16;
        const TTI: u64 = 7;
        let expiry: Option<Arc<dyn Expiry<_, _> + Send + Sync + 'static>> =
            Some(Arc::new(MyExpiry {
                expectation: Arc::clone(&expectation),
            }));
        let (clock, mock) = Clock::mock();

        let mut cache = BaseCache::<Key, Value>::new(
            None,
            None,
            None,
            RandomState::default(),
            None,
            EvictionPolicy::default(),
            None,
            ExpirationPolicy::new(
                Some(Duration::from_secs(TTL)),
                Some(Duration::from_secs(TTI)),
                expiry,
            ),
            HousekeeperConfig::default(),
            false,
            clock,
        );
        cache.reconfigure_for_testing().await;

        // Rebind immutably: nothing below needs `&mut` access.
        let cache = cache;

        mock.increment(Duration::from_millis(10));

        // ------------------------------------------------------------------
        // Scenario 1
        //   0s: insert; `expire_after_create` returns 1s.
        //  +1s: the entry expires.
        // ------------------------------------------------------------------
        let key = 1;
        let hash = cache.hash(&key);
        let value = 'a';

        *expectation.lock().unwrap() =
            ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), Some(1));

        insert(&cache, key, hash, value).await;
        // Run pending tasks so the queued write op is processed before the
        // entry count is checked.
        cache.inner.do_run_pending_tasks(None, 1, 10).await;
        assert_eq!(cache.entry_count(), 1);

        assert_expiry!(cache, key, hash, mock, 1);

        // ------------------------------------------------------------------
        // Scenario 2
        //   0s: insert; `expire_after_create` returns `None`.
        //  +1s: read; `expire_after_read` returns 3s.
        //  +3s: the entry expires.
        // ------------------------------------------------------------------
        let key = 2;
        let hash = cache.hash(&key);
        let value = 'b';

        *expectation.lock().unwrap() =
            ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), None);
        let inserted_at = current_time(&cache);
        insert(&cache, key, hash, value).await;
        cache.inner.do_run_pending_tasks(None, 1, 10).await;
        assert_eq!(cache.entry_count(), 1);

        mock.increment(Duration::from_secs(1));
        cache.inner.do_run_pending_tasks(None, 1, 10).await;
        assert!(cache.contains_key_with_hash(&key, hash));

        *expectation.lock().unwrap() = ExpiryExpectation::after_read(
            line!(),
            key,
            value,
            current_time(&cache),
            // Expected remaining duration: TTI minus the 1s elapsed since insert.
            Some(TTI - 1),
            inserted_at,
            Some(3),
        );
        assert_eq!(
            cache
                .get_with_hash(&key, hash, never_ignore(), false, true)
                .await
                .map(Entry::into_value),
            Some(value)
        );
        cache.inner.do_run_pending_tasks(None, 1, 10).await;

        assert_expiry!(cache, key, hash, mock, 3);

        // ------------------------------------------------------------------
        // Scenario 3
        //   0s: insert; `expire_after_create` returns `None`.
        //  +1s: read; `expire_after_read` returns `None`.
        //  +2s: update; `expire_after_update` returns 3s.
        //  +3s: the entry expires.
        // ------------------------------------------------------------------
        let key = 3;
        let hash = cache.hash(&key);
        let value = 'c';

        *expectation.lock().unwrap() =
            ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), None);
        let inserted_at = current_time(&cache);
        insert(&cache, key, hash, value).await;
        cache.inner.do_run_pending_tasks(None, 1, 10).await;
        assert_eq!(cache.entry_count(), 1);

        mock.increment(Duration::from_secs(1));
        cache.inner.do_run_pending_tasks(None, 1, 10).await;
        assert!(cache.contains_key_with_hash(&key, hash));

        *expectation.lock().unwrap() = ExpiryExpectation::after_read(
            line!(),
            key,
            value,
            current_time(&cache),
            Some(TTI - 1),
            inserted_at,
            None,
        );
        assert_eq!(
            cache
                .get_with_hash(&key, hash, never_ignore(), false, true)
                .await
                .map(Entry::into_value),
            Some(value)
        );
        cache.inner.do_run_pending_tasks(None, 1, 10).await;

        mock.increment(Duration::from_secs(2));
        cache.inner.do_run_pending_tasks(None, 1, 10).await;
        assert!(cache.contains_key_with_hash(&key, hash));
        assert_eq!(cache.entry_count(), 1);

        *expectation.lock().unwrap() = ExpiryExpectation::after_update(
            line!(),
            key,
            value,
            current_time(&cache),
            // The full TTI is expected here, presumably because the update
            // itself resets the idle timer — confirm against the insert path.
            Some(TTI),
            Some(3),
        );
        insert(&cache, key, hash, value).await;
        cache.inner.do_run_pending_tasks(None, 1, 10).await;
        assert_eq!(cache.entry_count(), 1);

        assert_expiry!(cache, key, hash, mock, 3);

        // ------------------------------------------------------------------
        // Scenario 4
        //   0s: insert; `expire_after_create` returns `None`.
        //  +1s: read; `expire_after_read` returns `None`.
        //  +2s: update; `expire_after_update` returns `None`.
        //  +7s: the entry expires (7s equals the cache-wide TTI).
        // ------------------------------------------------------------------
        let key = 4;
        let hash = cache.hash(&key);
        let value = 'd';

        *expectation.lock().unwrap() =
            ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), None);
        let inserted_at = current_time(&cache);
        insert(&cache, key, hash, value).await;
        cache.inner.do_run_pending_tasks(None, 1, 10).await;
        assert_eq!(cache.entry_count(), 1);

        mock.increment(Duration::from_secs(1));
        cache.inner.do_run_pending_tasks(None, 1, 10).await;
        assert!(cache.contains_key_with_hash(&key, hash));
        assert_eq!(cache.entry_count(), 1);

        *expectation.lock().unwrap() = ExpiryExpectation::after_read(
            line!(),
            key,
            value,
            current_time(&cache),
            Some(TTI - 1),
            inserted_at,
            None,
        );
        assert_eq!(
            cache
                .get_with_hash(&key, hash, never_ignore(), false, true)
                .await
                .map(Entry::into_value),
            Some(value)
        );
        cache.inner.do_run_pending_tasks(None, 1, 10).await;

        mock.increment(Duration::from_secs(2));
        cache.inner.do_run_pending_tasks(None, 1, 10).await;
        assert!(cache.contains_key_with_hash(&key, hash));
        assert_eq!(cache.entry_count(), 1);

        *expectation.lock().unwrap() = ExpiryExpectation::after_update(
            line!(),
            key,
            value,
            current_time(&cache),
            // As in scenario 3, the full TTI is expected at update time.
            Some(TTI),
            None,
        );
        insert(&cache, key, hash, value).await;
        cache.inner.do_run_pending_tasks(None, 1, 10).await;
        assert_eq!(cache.entry_count(), 1);

        assert_expiry!(cache, key, hash, mock, 7);

        // ------------------------------------------------------------------
        // Scenario 5
        //   0s: insert; `expire_after_create` returns 8s.
        //  +5s: read; `expire_after_read` returns 8s.
        //  +7s: the entry expires (7s equals the cache-wide TTI, which is
        //       shorter than the 8s returned by the read).
        // ------------------------------------------------------------------
        let key = 5;
        let hash = cache.hash(&key);
        let value = 'e';

        *expectation.lock().unwrap() =
            ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), Some(8));
        let inserted_at = current_time(&cache);
        insert(&cache, key, hash, value).await;
        cache.inner.do_run_pending_tasks(None, 1, 10).await;
        assert_eq!(cache.entry_count(), 1);

        mock.increment(Duration::from_secs(5));
        cache.inner.do_run_pending_tasks(None, 1, 10).await;
        assert!(cache.contains_key_with_hash(&key, hash));
        assert_eq!(cache.entry_count(), 1);

        *expectation.lock().unwrap() = ExpiryExpectation::after_read(
            line!(),
            key,
            value,
            current_time(&cache),
            Some(TTI - 5),
            inserted_at,
            Some(8),
        );
        assert_eq!(
            cache
                .get_with_hash(&key, hash, never_ignore(), false, true)
                .await
                .map(Entry::into_value),
            Some(value)
        );
        cache.inner.do_run_pending_tasks(None, 1, 10).await;

        assert_expiry!(cache, key, hash, mock, 7);

        // ------------------------------------------------------------------
        // Scenario 6
        //   0s: insert; `expire_after_create` returns 8s.
        //  +5s: read; `expire_after_read` returns 9s.
        //  +6s: read; `expire_after_read` returns 10s.
        //  +5s: the entry expires. 11s have passed since the insert, so 5s is
        //       what remains of the 16s cache-wide TTL.
        // ------------------------------------------------------------------
        let key = 6;
        let hash = cache.hash(&key);
        let value = 'f';

        *expectation.lock().unwrap() =
            ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), Some(8));
        let inserted_at = current_time(&cache);
        insert(&cache, key, hash, value).await;
        cache.inner.do_run_pending_tasks(None, 1, 10).await;
        assert_eq!(cache.entry_count(), 1);

        mock.increment(Duration::from_secs(5));
        cache.inner.do_run_pending_tasks(None, 1, 10).await;
        assert!(cache.contains_key_with_hash(&key, hash));
        assert_eq!(cache.entry_count(), 1);

        *expectation.lock().unwrap() = ExpiryExpectation::after_read(
            line!(),
            key,
            value,
            current_time(&cache),
            Some(TTI - 5),
            inserted_at,
            Some(9),
        );
        assert_eq!(
            cache
                .get_with_hash(&key, hash, never_ignore(), false, true)
                .await
                .map(Entry::into_value),
            Some(value)
        );
        cache.inner.do_run_pending_tasks(None, 1, 10).await;

        mock.increment(Duration::from_secs(6));
        cache.inner.do_run_pending_tasks(None, 1, 10).await;
        assert!(cache.contains_key_with_hash(&key, hash));
        assert_eq!(cache.entry_count(), 1);

        *expectation.lock().unwrap() = ExpiryExpectation::after_read(
            line!(),
            key,
            value,
            current_time(&cache),
            // 6s have elapsed since the previous read.
            Some(TTI - 6),
            inserted_at,
            Some(10),
        );
        assert_eq!(
            cache
                .get_with_hash(&key, hash, never_ignore(), false, true)
                .await
                .map(Entry::into_value),
            Some(value)
        );
        cache.inner.do_run_pending_tasks(None, 1, 10).await;

        assert_expiry!(cache, key, hash, mock, 5);

        // ------------------------------------------------------------------
        // Scenario 7
        //   0s: insert; `expire_after_create` returns 9s.
        //  +6s: update; `expire_after_update` returns 8s.
        //  +6s: read; `expire_after_read` returns 9s.
        //  +6s: read; `expire_after_read` returns 5s.
        //  +4s: the entry expires. 12s have passed since the update, so 4s is
        //       what remains of the 16s cache-wide TTL counted from the update.
        // ------------------------------------------------------------------
        let key = 7;
        let hash = cache.hash(&key);
        let value = 'g';

        *expectation.lock().unwrap() =
            ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), Some(9));
        insert(&cache, key, hash, value).await;
        cache.inner.do_run_pending_tasks(None, 1, 10).await;
        assert_eq!(cache.entry_count(), 1);

        mock.increment(Duration::from_secs(6));
        cache.inner.do_run_pending_tasks(None, 1, 10).await;
        assert!(cache.contains_key_with_hash(&key, hash));
        assert_eq!(cache.entry_count(), 1);

        *expectation.lock().unwrap() = ExpiryExpectation::after_update(
            line!(),
            key,
            value,
            current_time(&cache),
            // What is left of the 9s returned at creation after 6s.
            Some(9 - 6),
            Some(8),
        );
        // The reads below expect this instant as `last_modified_at`.
        let updated_at = current_time(&cache);
        insert(&cache, key, hash, value).await;
        cache.inner.do_run_pending_tasks(None, 1, 10).await;
        assert_eq!(cache.entry_count(), 1);

        mock.increment(Duration::from_secs(6));
        cache.inner.do_run_pending_tasks(None, 1, 10).await;
        assert!(cache.contains_key_with_hash(&key, hash));
        assert_eq!(cache.entry_count(), 1);

        *expectation.lock().unwrap() = ExpiryExpectation::after_read(
            line!(),
            key,
            value,
            current_time(&cache),
            Some(TTI - 6),
            updated_at,
            Some(9),
        );
        assert_eq!(
            cache
                .get_with_hash(&key, hash, never_ignore(), false, true)
                .await
                .map(Entry::into_value),
            Some(value)
        );
        cache.inner.do_run_pending_tasks(None, 1, 10).await;

        mock.increment(Duration::from_secs(6));
        cache.inner.do_run_pending_tasks(None, 1, 10).await;
        assert!(cache.contains_key_with_hash(&key, hash));
        assert_eq!(cache.entry_count(), 1);

        *expectation.lock().unwrap() = ExpiryExpectation::after_read(
            line!(),
            key,
            value,
            current_time(&cache),
            Some(TTI - 6),
            updated_at,
            Some(5),
        );
        assert_eq!(
            cache
                .get_with_hash(&key, hash, never_ignore(), false, true)
                .await
                .map(Entry::into_value),
            Some(value)
        );
        cache.inner.do_run_pending_tasks(None, 1, 10).await;

        assert_expiry!(cache, key, hash, mock, 4);
    }
3535}