// moka/future/base_cache.rs

1use super::{
2    housekeeper::Housekeeper,
3    invalidator::{Invalidator, KeyDateLite, PredicateFun},
4    key_lock::{KeyLock, KeyLockMap},
5    notifier::RemovalNotifier,
6    InterruptedOp, PredicateId,
7};
8
9use crate::{
10    common::{
11        self,
12        concurrent::{
13            arc::MiniArc,
14            constants::{
15                READ_LOG_CH_SIZE, READ_LOG_FLUSH_POINT, WRITE_LOG_CH_SIZE, WRITE_LOG_FLUSH_POINT,
16            },
17            deques::Deques,
18            entry_info::EntryInfo,
19            AccessTime, KeyHash, KeyHashDate, KvEntry, OldEntryInfo, ReadOp, ValueEntry, Weigher,
20            WriteOp,
21        },
22        deque::{DeqNode, Deque},
23        frequency_sketch::FrequencySketch,
24        iter::ScanningGet,
25        time::{AtomicInstant, Clock, Instant},
26        timer_wheel::{ReschedulingResult, TimerWheel},
27        CacheRegion, HousekeeperConfig,
28    },
29    future::CancelGuard,
30    notification::{AsyncEvictionListener, RemovalCause},
31    policy::{EvictionPolicy, EvictionPolicyConfig, ExpirationPolicy},
32    Entry, Expiry, Policy, PredicateError,
33};
34
35#[cfg(feature = "unstable-debug-counters")]
36use common::concurrent::debug_counters::CacheDebugStats;
37
38use async_lock::{Mutex, MutexGuard, RwLock};
39use crossbeam_channel::{Receiver, Sender, TrySendError};
40use crossbeam_utils::atomic::AtomicCell;
41use equivalent::Equivalent;
42use futures_util::future::BoxFuture;
43use smallvec::SmallVec;
44use std::{
45    borrow::Borrow,
46    collections::hash_map::RandomState,
47    hash::{BuildHasher, Hash, Hasher},
48    sync::{
49        atomic::{AtomicBool, AtomicU8, Ordering},
50        Arc,
51    },
52    time::{Duration, Instant as StdInstant},
53};
54
/// Shared, reference-counted handle to the `Housekeeper`, which runs the
/// cache's pending maintenance tasks (applying buffered reads/writes, etc.).
pub(crate) type HouseKeeperArc = Arc<Housekeeper>;
56
pub(crate) struct BaseCache<K, V, S = RandomState> {
    /// The shared internal data structures of the cache.
    pub(crate) inner: Arc<Inner<K, V, S>>,
    /// Sender side of the bounded channel recording read operations.
    read_op_ch: Sender<ReadOp<K, V>>,
    /// Sender side of the bounded channel recording write operations.
    pub(crate) write_op_ch: Sender<WriteOp<K, V>>,
    /// Sender side of the unbounded channel holding operations that were
    /// interrupted by async cancellation, so they can be resumed/retried.
    pub(crate) interrupted_op_ch_snd: Sender<InterruptedOp<K, V>>,
    /// Receiver side of the interrupted-operation channel.
    pub(crate) interrupted_op_ch_rcv: Receiver<InterruptedOp<K, V>>,
    /// The housekeeper. `None` only after `Drop` has taken it out; it must be
    /// dropped before `inner` (see the `Drop` impl below).
    pub(crate) housekeeper: Option<HouseKeeperArc>,
}
65
66impl<K, V, S> Clone for BaseCache<K, V, S> {
67    /// Makes a clone of this shared cache.
68    ///
69    /// This operation is cheap as it only creates thread-safe reference counted
70    /// pointers to the shared internal data structures.
71    fn clone(&self) -> Self {
72        Self {
73            inner: Arc::clone(&self.inner),
74            read_op_ch: self.read_op_ch.clone(),
75            write_op_ch: self.write_op_ch.clone(),
76            interrupted_op_ch_snd: self.interrupted_op_ch_snd.clone(),
77            interrupted_op_ch_rcv: self.interrupted_op_ch_rcv.clone(),
78            housekeeper: self.housekeeper.clone(),
79        }
80    }
81}
82
83impl<K, V, S> Drop for BaseCache<K, V, S> {
84    fn drop(&mut self) {
85        // The housekeeper needs to be dropped before the inner is dropped.
86        std::mem::drop(self.housekeeper.take());
87    }
88}
89
impl<K, V, S> BaseCache<K, V, S> {
    /// Returns the name of this cache, if one was set.
    pub(crate) fn name(&self) -> Option<&str> {
        self.inner.name()
    }

    /// Returns the `Policy` of this cache.
    pub(crate) fn policy(&self) -> Policy {
        self.inner.policy()
    }

    /// Returns the number of entries, as reported by the inner cache.
    pub(crate) fn entry_count(&self) -> u64 {
        self.inner.entry_count()
    }

    /// Returns the total weighted size, as reported by the inner cache.
    pub(crate) fn weighted_size(&self) -> u64 {
        self.inner.weighted_size()
    }

    /// Returns `true` when the cache was created with a max capacity of zero,
    /// in which case the internal map is effectively disabled.
    pub(crate) fn is_map_disabled(&self) -> bool {
        self.inner.max_capacity == Some(0)
    }

    /// Returns `true` when a removal (eviction) notifier is enabled.
    #[inline]
    pub(crate) fn is_removal_notifier_enabled(&self) -> bool {
        self.inner.is_removal_notifier_enabled()
    }

    /// Returns the current time as seen by the cache's clock.
    #[inline]
    pub(crate) fn current_time(&self) -> Instant {
        self.inner.current_time()
    }

    /// Returns the event used to signal that the write op channel may have
    /// space again (used by `schedule_write_op` to wait for capacity).
    #[inline]
    pub(crate) fn write_op_ch_ready_event(&self) -> &event_listener::Event<()> {
        &self.inner.write_op_ch_ready_event
    }

    /// Returns a future that notifies the eviction listener about the
    /// invalidation of the given entry.
    pub(crate) fn notify_invalidate(
        &self,
        key: &Arc<K>,
        entry: &MiniArc<ValueEntry<K, V>>,
    ) -> BoxFuture<'static, ()>
    where
        K: Send + Sync + 'static,
        V: Clone + Send + Sync + 'static,
    {
        self.inner.notify_invalidate(key, entry)
    }

    /// Returns debug counters (only with the `unstable-debug-counters`
    /// feature).
    #[cfg(feature = "unstable-debug-counters")]
    pub async fn debug_stats(&self) -> CacheDebugStats {
        self.inner.debug_stats().await
    }
}
143
impl<K, V, S> BaseCache<K, V, S>
where
    K: Hash + Eq,
    S: BuildHasher,
{
    /// Returns a per-key lock for the given key, or `None` when key-level
    /// locking is not enabled. (Used to lock a key for update when blocking
    /// removal notification is enabled; see `do_insert_with_hash`.)
    pub(crate) fn maybe_key_lock(&self, key: &Arc<K>) -> Option<KeyLock<'_, K, S>> {
        self.inner.maybe_key_lock(key)
    }
}
153
154impl<K, V, S> BaseCache<K, V, S>
155where
156    K: Hash + Eq + Send + Sync + 'static,
157    V: Clone + Send + Sync + 'static,
158    S: BuildHasher + Clone + Send + Sync + 'static,
159{
160    // https://rust-lang.github.io/rust-clippy/master/index.html#too_many_arguments
161    #[allow(clippy::too_many_arguments)]
162    pub(crate) fn new(
163        name: Option<String>,
164        max_capacity: Option<u64>,
165        initial_capacity: Option<usize>,
166        build_hasher: S,
167        weigher: Option<Weigher<K, V>>,
168        eviction_policy: EvictionPolicy,
169        eviction_listener: Option<AsyncEvictionListener<K, V>>,
170        expiration_policy: ExpirationPolicy<K, V>,
171        housekeeper_config: HousekeeperConfig,
172        invalidator_enabled: bool,
173        clock: Clock,
174    ) -> Self {
175        let (r_size, w_size) = if max_capacity == Some(0) {
176            (0, 0)
177        } else {
178            (READ_LOG_CH_SIZE, WRITE_LOG_CH_SIZE)
179        };
180        let is_eviction_listener_enabled = eviction_listener.is_some();
181        let fast_now = clock.fast_now();
182
183        let (r_snd, r_rcv) = crossbeam_channel::bounded(r_size);
184        let (w_snd, w_rcv) = crossbeam_channel::bounded(w_size);
185        let (i_snd, i_rcv) = crossbeam_channel::unbounded();
186
187        let inner = Arc::new(Inner::new(
188            name,
189            max_capacity,
190            initial_capacity,
191            build_hasher,
192            weigher,
193            eviction_policy,
194            eviction_listener,
195            r_rcv,
196            w_rcv,
197            expiration_policy,
198            invalidator_enabled,
199            clock,
200        ));
201
202        Self {
203            inner,
204            read_op_ch: r_snd,
205            write_op_ch: w_snd,
206            interrupted_op_ch_snd: i_snd,
207            interrupted_op_ch_rcv: i_rcv,
208            housekeeper: Some(Arc::new(Housekeeper::new(
209                is_eviction_listener_enabled,
210                housekeeper_config,
211                fast_now,
212            ))),
213        }
214    }
215
    /// Computes the hash of the given key using the cache's hasher.
    #[inline]
    pub(crate) fn hash<Q>(&self, key: &Q) -> u64
    where
        Q: Equivalent<K> + Hash + ?Sized,
    {
        self.inner.hash(key)
    }
223
224    pub(crate) fn contains_key_with_hash<Q>(&self, key: &Q, hash: u64) -> bool
225    where
226        Q: Equivalent<K> + Hash + ?Sized,
227    {
228        // TODO: Maybe we can just call ScanningGet::scanning_get.
229        self.inner
230            .get_key_value_and(key, hash, |k, entry| {
231                let i = &self.inner;
232                let (ttl, tti, va) = (&i.time_to_live(), &i.time_to_idle(), &i.valid_after());
233                let now = self.current_time();
234
235                !is_expired_by_per_entry_ttl(entry.entry_info(), now)
236                    && !is_expired_entry_wo(ttl, va, entry, now)
237                    && !is_expired_entry_ao(tti, va, entry, now)
238                    && !i.is_invalidated_entry(k, entry)
239            })
240            .unwrap_or_default() // `false` is the default for `bool` type.
241    }
242
    /// Looks up an entry by the key and its hash, returning it only when it is
    /// still valid (not expired by any expiration policy and not invalidated).
    ///
    /// - `ignore_if`: when provided and it returns `true` for the stored
    ///   value, the entry is treated as missing.
    /// - `need_key`: when `true`, the returned `Entry` carries a clone of the
    ///   stored key.
    /// - `record_read`: when `true`, interrupted ops are retried first, and a
    ///   `ReadOp` (hit or miss) is recorded to the read op channel.
    pub(crate) async fn get_with_hash<Q, I>(
        &self,
        key: &Q,
        hash: u64,
        mut ignore_if: Option<&mut I>,
        need_key: bool,
        record_read: bool,
    ) -> Option<Entry<K, V>>
    where
        Q: Equivalent<K> + Hash + ?Sized,
        I: FnMut(&V) -> bool,
    {
        if self.is_map_disabled() {
            return None;
        }

        if record_read {
            self.retry_interrupted_ops().await;
        }

        // NOTE: `Instant` is `Copy`, so the `move` closure below captures its
        // own copy of `now`. Any update made inside the closure is passed back
        // out through the returned tuple; the miss path below uses this outer
        // (unmodified) copy.
        let mut now = self.current_time();

        let maybe_kv_and_op = self
            .inner
            .get_key_value_and_then(key, hash, move |k, entry| {
                if let Some(ignore_if) = &mut ignore_if {
                    if ignore_if(&entry.value) {
                        // Ignore the entry.
                        return None;
                    }
                }

                let i = &self.inner;
                let (ttl, tti, va) = (&i.time_to_live(), &i.time_to_idle(), &i.valid_after());

                if is_expired_by_per_entry_ttl(entry.entry_info(), now)
                    || is_expired_entry_wo(ttl, va, entry, now)
                    || is_expired_entry_ao(tti, va, entry, now)
                    || i.is_invalidated_entry(k, entry)
                {
                    // Expired or invalidated entry.
                    None
                } else {
                    // Valid entry.
                    let mut is_expiry_modified = false;

                    // Call the user supplied `expire_after_read` method if any.
                    if let Some(expiry) = &self.inner.expiration_policy.expiry() {
                        let lm = entry.last_modified().expect("Last modified is not set");
                        // Check if the `last_modified` of entry is earlier than or equals to
                        // `now`. If not, update the `now` to `last_modified`. This is needed
                        // because there is a small chance that other threads have inserted
                        // the entry _after_ we obtained `now`.
                        now = now.max(lm);

                        // Convert `last_modified` from `moka::common::time::Instant` to
                        // `std::time::Instant`.
                        let lm = self.inner.clock().to_std_instant(lm);

                        // Call the user supplied `expire_after_read` method.
                        //
                        // We will put the return value (`is_expiry_modified: bool`) to a
                        // `ReadOp` so that `apply_reads` method can determine whether or not
                        // to reschedule the timer for the entry.
                        //
                        // NOTE: It is not guaranteed that the `ReadOp` is passed to
                        // `apply_reads`. Here are the corner cases that the `ReadOp` will
                        // not be passed to `apply_reads`:
                        //
                        // - If the bounded `read_op_ch` channel is full, the `ReadOp` will
                        //   be discarded.
                        // - If we were called by `get_with_hash_without_recording` method,
                        //   the `ReadOp` will not be recorded at all.
                        //
                        // These cases are okay because when the timer wheel tries to expire
                        // the entry, it will check if the entry is actually expired. If not,
                        // the timer wheel will reschedule the expiration timer for the
                        // entry.
                        is_expiry_modified = Self::expire_after_read_or_update(
                            |k, v, t, d| expiry.expire_after_read(k, v, t, d, lm),
                            &entry.entry_info().key_hash().key,
                            entry,
                            self.inner.expiration_policy.time_to_live(),
                            self.inner.expiration_policy.time_to_idle(),
                            now,
                            self.inner.clock(),
                        );
                    }

                    entry.set_last_accessed(now);

                    let maybe_key = if need_key { Some(Arc::clone(k)) } else { None };
                    let ent = Entry::new(maybe_key, entry.value.clone(), false, false);
                    let maybe_op = if record_read {
                        Some(ReadOp::Hit {
                            value_entry: MiniArc::clone(entry),
                            is_expiry_modified,
                        })
                    } else {
                        None
                    };

                    Some((ent, maybe_op, now))
                }
            });

        if let Some((ent, maybe_op, now)) = maybe_kv_and_op {
            if let Some(op) = maybe_op {
                self.record_read_op(op, now)
                    .await
                    .expect("Failed to record a get op");
            }
            Some(ent)
        } else {
            if record_read {
                self.record_read_op(ReadOp::Miss(hash), now)
                    .await
                    .expect("Failed to record a get op");
            }
            None
        }
    }
365
    /// Returns a clone of the stored key `Arc` for the given key and hash, if
    /// present. NOTE: unlike `get_with_hash`, this does not check expiration
    /// or invalidation.
    pub(crate) fn get_key_with_hash<Q>(&self, key: &Q, hash: u64) -> Option<Arc<K>>
    where
        Q: Equivalent<K> + Hash + ?Sized,
    {
        self.inner
            .get_key_value_and(key, hash, |k, _entry| Arc::clone(k))
    }
373
    /// Removes the entry for the given key and hash from the internal map and
    /// returns it, if it was present.
    #[inline]
    pub(crate) fn remove_entry<Q>(&self, key: &Q, hash: u64) -> Option<KvEntry<K, V>>
    where
        Q: Equivalent<K> + Hash + ?Sized,
    {
        self.inner.remove_entry(key, hash)
    }
381
382    #[inline]
383    pub(crate) async fn apply_reads_writes_if_needed(
384        inner: &Arc<Inner<K, V, S>>,
385        ch: &Sender<WriteOp<K, V>>,
386        now: Instant,
387        housekeeper: Option<&HouseKeeperArc>,
388    ) {
389        let w_len = ch.len();
390
391        if let Some(hk) = housekeeper {
392            if Self::should_apply_writes(hk, w_len, now) {
393                hk.try_run_pending_tasks(inner).await;
394            }
395        }
396    }
397
398    pub(crate) fn invalidate_all(&self) {
399        let now = self.current_time();
400        self.inner.set_valid_after(now);
401    }
402
    /// Registers an invalidation predicate with the inner cache and returns
    /// the id of the registered predicate (or a `PredicateError`).
    pub(crate) fn invalidate_entries_if(
        &self,
        predicate: PredicateFun<K, V>,
    ) -> Result<PredicateId, PredicateError> {
        let now = self.current_time();
        self.inner.register_invalidation_predicate(predicate, now)
    }
410}
411
412//
413// Iterator support
414//
415impl<K, V, S> ScanningGet<K, V> for BaseCache<K, V, S>
416where
417    K: Hash + Eq + Send + Sync + 'static,
418    V: Clone + Send + Sync + 'static,
419    S: BuildHasher + Clone + Send + Sync + 'static,
420{
421    fn num_cht_segments(&self) -> usize {
422        self.inner.num_cht_segments()
423    }
424
425    fn scanning_get(&self, key: &Arc<K>) -> Option<V> {
426        let hash = self.hash(&**key);
427        self.inner.get_key_value_and_then(&**key, hash, |k, entry| {
428            let i = &self.inner;
429            let (ttl, tti, va) = (&i.time_to_live(), &i.time_to_idle(), &i.valid_after());
430            let now = self.current_time();
431
432            if is_expired_by_per_entry_ttl(entry.entry_info(), now)
433                || is_expired_entry_wo(ttl, va, entry, now)
434                || is_expired_entry_ao(tti, va, entry, now)
435                || i.is_invalidated_entry(k, entry)
436            {
437                // Expired or invalidated entry.
438                None
439            } else {
440                // Valid entry.
441                Some(entry.value.clone())
442            }
443        })
444    }
445
446    fn keys(&self, cht_segment: usize) -> Option<Vec<Arc<K>>> {
447        self.inner.keys(cht_segment)
448    }
449}
450
451//
452// private methods
453//
454impl<K, V, S> BaseCache<K, V, S>
455where
456    K: Hash + Eq + Send + Sync + 'static,
457    V: Clone + Send + Sync + 'static,
458    S: BuildHasher + Clone + Send + Sync + 'static,
459{
    /// Records a `ReadOp` to the read op channel, after running pending tasks
    /// if the housekeeper says they are due. A full channel silently drops the
    /// op (reads are best-effort); a disconnected channel is an error.
    #[inline]
    async fn record_read_op(
        &self,
        op: ReadOp<K, V>,
        now: Instant,
    ) -> Result<(), TrySendError<ReadOp<K, V>>> {
        self.apply_reads_if_needed(&self.inner, now).await;
        let ch = &self.read_op_ch;
        match ch.try_send(op) {
            // Discard the ReadOp when the channel is full.
            Ok(()) | Err(TrySendError::Full(_)) => Ok(()),
            Err(e @ TrySendError::Disconnected(_)) => Err(e),
        }
    }
474
    /// Inserts the value for the key, or updates the existing entry, and
    /// returns the resulting `WriteOp` together with the operation timestamp.
    /// The caller is responsible for scheduling the returned op (see
    /// `schedule_write_op`).
    #[inline]
    pub(crate) async fn do_insert_with_hash(
        &self,
        key: Arc<K>,
        hash: u64,
        value: V,
    ) -> (WriteOp<K, V>, Instant) {
        self.retry_interrupted_ops().await;

        let weight = self.inner.weigh(&key, &value);
        let op_cnt1 = Arc::new(AtomicU8::new(0));
        let op_cnt2 = Arc::clone(&op_cnt1);
        let mut op1 = None;
        let mut op2 = None;

        // Lock the key for update if blocking removal notification is enabled.
        let kl = self.maybe_key_lock(&key);
        let _klg = if let Some(lock) = &kl {
            Some(lock.lock().await)
        } else {
            None
        };

        let ts = self.current_time();

        // TODO: Instead using Arc<AtomicU8> to check if the actual operation was
        // insert or update, check the return value of insert_with_or_modify. If it
        // is_some, the value was updated, otherwise the value was inserted.

        // Since the cache (cht::SegmentedHashMap) employs optimistic locking
        // strategy, insert_with_or_modify() may get an insert/modify operation
        // conflicted with other concurrent hash table operations. In that case, it
        // has to retry the insertion or modification, so on_insert and/or on_modify
        // closures can be executed more than once. In order to identify the last
        // call of these closures, we use a shared counter (op_cnt{1,2}) here to
        // record a serial number on a WriteOp, and consider the WriteOp with the
        // largest serial number is the one made by the last call of the closures.
        self.inner.cache.insert_with_or_modify(
            Arc::clone(&key),
            hash,
            // on_insert
            || {
                let (entry, gen) = self.new_value_entry(&key, hash, value.clone(), ts, weight);
                let ins_op = WriteOp::new_upsert(&key, hash, &entry, gen, 0, weight);
                let cnt = op_cnt1.fetch_add(1, Ordering::Relaxed);
                op1 = Some((cnt, ins_op));
                entry
            },
            // on_modify
            |_k, old_entry| {
                let old_weight = old_entry.policy_weight();

                // Create this OldEntryInfo _before_ creating a new ValueEntry, so
                // that the OldEntryInfo can preserve the old EntryInfo's
                // last_accessed and last_modified timestamps.
                let old_info = OldEntryInfo::new(old_entry);
                let (entry, gen) = self.new_value_entry_from(value.clone(), ts, weight, old_entry);
                let upd_op = WriteOp::new_upsert(&key, hash, &entry, gen, old_weight, weight);
                let cnt = op_cnt2.fetch_add(1, Ordering::Relaxed);
                op2 = Some((cnt, old_info, upd_op));
                entry
            },
        );

        // Decide whether the final outcome was an insert or an update by
        // comparing the serial numbers recorded by the closures above.
        match (op1, op2) {
            (Some((_cnt, ins_op)), None) => self.do_post_insert_steps(ts, &key, ins_op),
            (Some((cnt1, ins_op)), Some((cnt2, ..))) if cnt1 > cnt2 => {
                self.do_post_insert_steps(ts, &key, ins_op)
            }
            (_, Some((_cnt, old_entry, upd_op))) => {
                self.do_post_update_steps(ts, key, old_entry, upd_op, &self.interrupted_op_ch_snd)
                    .await
            }
            (None, None) => unreachable!(),
        }
    }
551
552    fn do_post_insert_steps(
553        &self,
554        ts: Instant,
555        key: &Arc<K>,
556        ins_op: WriteOp<K, V>,
557    ) -> (WriteOp<K, V>, Instant) {
558        if let (Some(expiry), WriteOp::Upsert { value_entry, .. }) =
559            (&self.inner.expiration_policy.expiry(), &ins_op)
560        {
561            Self::expire_after_create(expiry, key, value_entry, ts, self.inner.clock());
562        }
563        (ins_op, ts)
564    }
565
    /// Finishes an update: runs the user supplied `expire_after_update`
    /// method (if any), and, when a removal notifier is enabled, notifies the
    /// eviction listener about the replaced value in a cancellation-safe way.
    async fn do_post_update_steps(
        &self,
        ts: Instant,
        key: Arc<K>,
        old_info: OldEntryInfo<K, V>,
        upd_op: WriteOp<K, V>,
        interrupted_op_ch: &Sender<InterruptedOp<K, V>>,
    ) -> (WriteOp<K, V>, Instant) {
        use futures_util::FutureExt;

        if let (Some(expiry), WriteOp::Upsert { value_entry, .. }) =
            (&self.inner.expiration_policy.expiry(), &upd_op)
        {
            Self::expire_after_read_or_update(
                |k, v, t, d| expiry.expire_after_update(k, v, t, d),
                &key,
                value_entry,
                self.inner.expiration_policy.time_to_live(),
                self.inner.expiration_policy.time_to_idle(),
                ts,
                self.inner.clock(),
            );
        }

        if self.is_removal_notifier_enabled() {
            let future = self
                .inner
                .notify_upsert(
                    key,
                    &old_info.entry,
                    old_info.last_accessed,
                    old_info.last_modified,
                )
                .shared();
            // Async Cancellation Safety: To ensure the above future should be
            // executed even if our caller async task is cancelled, we create a
            // cancel guard for the future (and the upd_op). If our caller is
            // cancelled while we are awaiting for the future, the cancel guard will
            // save the future and the upd_op to the interrupted_op_ch channel, so
            // that we can resume/retry later.
            let mut cancel_guard = CancelGuard::new(interrupted_op_ch, ts);
            cancel_guard.set_future_and_op(future.clone(), upd_op.clone());

            // Notify the eviction listener.
            future.await;
            cancel_guard.clear();
        }

        crossbeam_epoch::pin().flush();
        (upd_op, ts)
    }
617
    /// Sends the `WriteOp` to the write op channel, retrying until it
    /// succeeds. While the channel is full, this alternates between short
    /// spin-waits and listening on the channel-ready event, and keeps running
    /// the pending tasks so the channel can drain.
    #[inline]
    pub(crate) async fn schedule_write_op(
        inner: &Arc<Inner<K, V, S>>,
        ch: &Sender<WriteOp<K, V>>,
        ch_ready_event: &event_listener::Event<()>,
        op: WriteOp<K, V>,
        ts: Instant,
        housekeeper: Option<&HouseKeeperArc>,
        // Used only for testing.
        _should_block: bool,
    ) -> Result<(), TrySendError<WriteOp<K, V>>> {
        // Testing stuff.
        #[cfg(test)]
        if _should_block {
            // We are going to do a dead-lock here to simulate a full channel.
            let mutex = Mutex::new(());
            let _guard = mutex.lock().await;
            // This should dead-lock.
            mutex.lock().await;
        }

        let mut op = op;
        let mut spin_loop_attempts = 0u8;
        loop {
            // Run the `Inner::do_run_pending_tasks` method if needed.
            BaseCache::<K, V, S>::apply_reads_writes_if_needed(inner, ch, ts, housekeeper).await;

            // Try to send our op to the write op channel.
            match ch.try_send(op) {
                Ok(()) => return Ok(()),
                // `try_send` returns the op on failure; keep it for the retry.
                Err(TrySendError::Full(op1)) => {
                    op = op1;
                }
                Err(e @ TrySendError::Disconnected(_)) => return Err(e),
            }

            // We have got a `TrySendError::Full` above. Wait a moment and try again.

            if spin_loop_attempts < 4 {
                spin_loop_attempts += 1;
                // Wastes some CPU time with a hint to indicate to the CPU that we
                // are spinning. Adjust the SPIN_COUNT because the `PAUSE`
                // instruction of recent x86_64 CPUs may have longer latency than the
                // alternatives in other CPU architectures.
                const SPIN_COUNT: usize = if cfg!(target_arch = "x86_64") { 8 } else { 32 };
                for _ in 0..SPIN_COUNT {
                    std::hint::spin_loop();
                }
            } else {
                spin_loop_attempts = 0;

                // Yield the async runtime scheduler to other async tasks and wait
                // for a channel ready event. This event will be sent when one of the
                // following conditions is met:
                //
                // - The `Inner::do_run_pending_tasks` method has removed some ops
                //   from the write op channel.
                // - The `Housekeeper`'s `run_pending_tasks` or `
                //   try_run_pending_tasks` methods has freed the lock on the
                //   `current_task`.
                //
                ch_ready_event.listen().await;

                // We are going to retry. Now the channel may have some space and/or
                // one of us is allowed to run `do_run_pending_tasks` method.
            }
        }
    }
686
687    pub(crate) async fn retry_interrupted_ops(&self) {
688        while let Ok(op) = self.interrupted_op_ch_rcv.try_recv() {
689            // Async Cancellation Safety: Remember that we are in an async task here.
690            // If our caller is cancelled while we are awaiting for the future, we
691            // will be cancelled too at the await point. In that case, the cancel
692            // guard below will save the future and the op to the interrupted_op_ch
693            // channel, so that we can resume/retry later.
694            let mut cancel_guard;
695
696            // Resume an interrupted future if there is one.
697            match op {
698                InterruptedOp::CallEvictionListener { ts, future, op } => {
699                    cancel_guard = CancelGuard::new(&self.interrupted_op_ch_snd, ts);
700                    cancel_guard.set_future_and_op(future.clone(), op);
701                    // Resume the interrupted future (which will notify an eviction
702                    // to the eviction listener).
703                    future.await;
704                    // If we are here, it means the above future has been completed.
705                    cancel_guard.unset_future();
706                }
707                InterruptedOp::SendWriteOp { ts, op } => {
708                    cancel_guard = CancelGuard::new(&self.interrupted_op_ch_snd, ts);
709                    cancel_guard.set_op(op);
710                }
711            }
712
713            // Retry to schedule the write op.
714            let ts = cancel_guard.ts;
715            let event = self.write_op_ch_ready_event();
716            let op = cancel_guard.op.as_ref().cloned().unwrap();
717            let hk = self.housekeeper.as_ref();
718            Self::schedule_write_op(&self.inner, &self.write_op_ch, event, op, ts, hk, false)
719                .await
720                .expect("Failed to reschedule a write op");
721
722            // If we are here, it means the above write op has been scheduled.
723            // We are all good now. Clear the cancel guard.
724            cancel_guard.clear();
725        }
726    }
727
728    #[inline]
729    async fn apply_reads_if_needed(&self, inner: &Arc<Inner<K, V, S>>, now: Instant) {
730        let len = self.read_op_ch.len();
731
732        if let Some(hk) = &self.housekeeper {
733            if Self::should_apply_reads(hk, len, now) {
734                hk.try_run_pending_tasks(inner).await;
735            }
736        }
737    }
738
    /// Delegates to the housekeeper to decide whether the buffered reads
    /// should be applied now, given the read op channel length.
    #[inline]
    fn should_apply_reads(hk: &HouseKeeperArc, ch_len: usize, now: Instant) -> bool {
        hk.should_apply_reads(ch_len, now)
    }

    /// Delegates to the housekeeper to decide whether the buffered writes
    /// should be applied now, given the write op channel length.
    #[inline]
    fn should_apply_writes(hk: &HouseKeeperArc, ch_len: usize, now: Instant) -> bool {
        hk.should_apply_writes(ch_len, now)
    }
748}
749
750impl<K, V, S> BaseCache<K, V, S> {
751    #[inline]
752    fn new_value_entry(
753        &self,
754        key: &Arc<K>,
755        hash: u64,
756        value: V,
757        timestamp: Instant,
758        policy_weight: u32,
759    ) -> (MiniArc<ValueEntry<K, V>>, u16) {
760        let key_hash = KeyHash::new(Arc::clone(key), hash);
761        let info = MiniArc::new(EntryInfo::new(key_hash, timestamp, policy_weight));
762        let gen: u16 = info.entry_gen();
763        (MiniArc::new(ValueEntry::new(value, info)), gen)
764    }
765
    /// Creates a new `ValueEntry` for an update, reusing (and refreshing) the
    /// `EntryInfo` of the existing entry `other`. Returns the entry together
    /// with its (incremented) entry generation.
    #[inline]
    fn new_value_entry_from(
        &self,
        value: V,
        timestamp: Instant,
        policy_weight: u32,
        other: &ValueEntry<K, V>,
    ) -> (MiniArc<ValueEntry<K, V>>, u16) {
        let info = MiniArc::clone(other.entry_info());
        // To prevent this updated ValueEntry from being evicted by an expiration
        // policy, increment the entry generation.
        let gen = info.incr_entry_gen();
        info.set_last_accessed(timestamp);
        info.set_last_modified(timestamp);
        info.set_policy_weight(policy_weight);
        (MiniArc::new(ValueEntry::new_from(value, info, other)), gen)
    }
783
784    fn expire_after_create(
785        expiry: &Arc<dyn Expiry<K, V> + Send + Sync + 'static>,
786        key: &K,
787        value_entry: &ValueEntry<K, V>,
788        ts: Instant,
789        clock: &Clock,
790    ) {
791        let duration =
792            expiry.expire_after_create(key, &value_entry.value, clock.to_std_instant(ts));
793        let expiration_time = duration.map(|duration| ts.saturating_add(duration));
794        value_entry
795            .entry_info()
796            .set_expiration_time(expiration_time);
797    }
798
    /// Invokes the given expiry closure for `value_entry` and stores the
    /// resulting expiration time into the entry.
    ///
    /// Returns `true` when the per-entry expiration time was modified, so the
    /// caller knows it must (re)schedule the entry in the timer wheel.
    fn expire_after_read_or_update(
        expiry: impl FnOnce(&K, &V, StdInstant, Option<Duration>) -> Option<Duration>,
        key: &K,
        value_entry: &ValueEntry<K, V>,
        ttl: Option<Duration>,
        tti: Option<Duration>,
        ts: Instant,
        clock: &Clock,
    ) -> bool {
        let current_time = clock.to_std_instant(ts);
        let ei = &value_entry.entry_info();

        // The effective expiration is the earliest of: the per-entry expiration
        // time, the TTL deadline (relative to `last_modified`), and the TTI
        // deadline (relative to `last_accessed`). Absent components are skipped
        // by `flatten`.
        let exp_time = IntoIterator::into_iter([
            ei.expiration_time(),
            ttl.and_then(|dur| ei.last_modified().map(|ts| ts.saturating_add(dur))),
            tti.and_then(|dur| ei.last_accessed().map(|ts| ts.saturating_add(dur))),
        ])
        .flatten()
        .min();

        // Convert the deadline into the remaining duration passed to the user
        // closure. `checked_duration_since` yields `None` when the deadline has
        // already passed.
        let current_duration = exp_time.and_then(|time| {
            let std_time = clock.to_std_instant(time);
            std_time.checked_duration_since(current_time)
        });

        let duration = expiry(key, &value_entry.value, current_time, current_duration);

        if duration != current_duration {
            let expiration_time = duration.map(|duration| ts.saturating_add(duration));
            value_entry
                .entry_info()
                .set_expiration_time(expiration_time);
            // The closure returned a different duration, so the expiration time
            // was updated (this includes `None` <-> `Some` transitions as well
            // as `Some` -> different `Some`). Report the change to the caller.
            true
        } else {
            false
        }
    }
837}
838
839//
840// for testing
841//
#[cfg(test)]
impl<K, V, S> BaseCache<K, V, S>
where
    K: Hash + Eq + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
    S: BuildHasher + Clone + Send + Sync + 'static,
{
    /// Returns the number of predicates currently registered by the
    /// `invalidate_entries_if` API. (Delegates to `Inner`.)
    pub(crate) fn invalidation_predicate_count(&self) -> usize {
        self.inner.invalidation_predicate_count()
    }

    /// Puts the cache into a deterministic mode for unit tests: the frequency
    /// sketch is enabled immediately (instead of lazily) and the housekeeper
    /// no longer runs pending tasks automatically.
    pub(crate) async fn reconfigure_for_testing(&mut self) {
        // Enable the frequency sketch.
        self.inner.enable_frequency_sketch_for_testing().await;
        // Disable auto clean up of pending tasks.
        if let Some(hk) = &self.housekeeper {
            hk.disable_auto_run();
        }
    }

    /// Returns `true` when the per-key lock map holds no entries. (Delegates
    /// to `Inner`; presumably used by tests to check that key locks are
    /// released — confirm against callers.)
    pub(crate) fn key_locks_map_is_empty(&self) -> bool {
        self.inner.key_locks_map_is_empty()
    }
}
866
/// Working state for a single `do_run_pending_tasks` pass: the running size
/// counters, the (optional) removal notifier, and a flag telling the caller
/// whether another eviction pass is needed.
struct EvictionState<'a, K, V> {
    // Entry/weight/eviction tallies updated while evicting.
    counters: EvictionCounters,
    // `Some` only when an eviction listener is configured.
    notifier: Option<&'a Arc<RemovalNotifier<K, V>>>,
    // Set by the `evict_*`/`invalidate_*` methods when they could not finish
    // within the batch size; returned from `do_run_pending_tasks`.
    more_entries_to_evict: bool,
}
872
873impl<'a, K, V> EvictionState<'a, K, V> {
874    fn new(
875        entry_count: u64,
876        weighted_size: u64,
877        notifier: Option<&'a Arc<RemovalNotifier<K, V>>>,
878    ) -> Self {
879        Self {
880            counters: EvictionCounters::new(entry_count, weighted_size),
881            notifier,
882            more_entries_to_evict: false,
883        }
884    }
885
886    fn is_notifier_enabled(&self) -> bool {
887        self.notifier.is_some()
888    }
889
890    async fn notify_entry_removal(
891        &mut self,
892        key: Arc<K>,
893        entry: &MiniArc<ValueEntry<K, V>>,
894        cause: RemovalCause,
895    ) where
896        K: Send + Sync + 'static,
897        V: Clone + Send + Sync + 'static,
898    {
899        if let Some(notifier) = self.notifier {
900            notifier.notify(key, entry.value.clone(), cause).await;
901        } else {
902            panic!("notify_entry_removal is called when the notification is disabled");
903        }
904    }
905}
906
/// Mutable tallies of the cache size maintained during a
/// `do_run_pending_tasks` pass and written back to the `Inner` atomics when
/// the pass finishes.
struct EvictionCounters {
    entry_count: u64,
    weighted_size: u64,
    eviction_count: u64,
}

impl EvictionCounters {
    /// Seeds the tallies with the cache's current entry count and weighted
    /// size; the eviction count always starts at zero.
    #[inline]
    fn new(entry_count: u64, weighted_size: u64) -> Self {
        Self {
            entry_count,
            weighted_size,
            eviction_count: 0,
        }
    }

    /// Adds `entry_count` entries and `weight` to the tallies. The weighted
    /// size saturates at `u64::MAX` instead of wrapping.
    #[inline]
    fn saturating_add(&mut self, entry_count: u64, weight: u32) {
        self.entry_count += entry_count;
        self.weighted_size = self.weighted_size.saturating_add(u64::from(weight));
    }

    /// Subtracts `entry_count` entries and `weight` from the tallies.
    ///
    /// NOTE: Only the weighted size saturates at zero; the entry count uses
    /// plain subtraction and is expected never to underflow.
    #[inline]
    fn saturating_sub(&mut self, entry_count: u64, weight: u32) {
        self.entry_count -= entry_count;
        self.weighted_size = self.weighted_size.saturating_sub(u64::from(weight));
    }

    /// Bumps the number of evicted entries, saturating at `u64::MAX`.
    #[inline]
    fn incr_eviction_count(&mut self) {
        self.eviction_count = self.eviction_count.saturating_add(1);
    }
}
943
944#[derive(Default)]
945struct EntrySizeAndFrequency {
946    policy_weight: u64,
947    freq: u32,
948}
949
950impl EntrySizeAndFrequency {
951    fn new(policy_weight: u32) -> Self {
952        Self {
953            policy_weight: policy_weight as u64,
954            ..Default::default()
955        }
956    }
957
958    fn add_policy_weight(&mut self, weight: u32) {
959        self.policy_weight += weight as u64;
960    }
961
962    fn add_frequency(&mut self, freq: &FrequencySketch, hash: u64) {
963        self.freq += freq.frequency(hash) as u32;
964    }
965}
966
// NOTE: Clippy found that the `Admitted` variant contains at least a few hundred
// bytes of data and the `Rejected` variant contains no data at all. It suggested to
// box the `SmallVec`.
//
// We ignore the suggestion because (1) the `SmallVec` is used to avoid heap
// allocation as it will be used in a performance hot spot, and (2) this enum has a
// very short lifetime and there will be only one instance at a time.
#[allow(clippy::large_enum_variant)]
/// The decision made by the admission policy for a write candidate.
enum AdmissionResult<K> {
    /// The candidate should be inserted; the listed victims are removed to
    /// make room for it.
    Admitted {
        /// A vec of pairs of `KeyHash` and `last_accessed`.
        victim_keys: SmallVec<[(KeyHash<K>, Option<Instant>); 8]>,
    },
    /// The candidate was not admitted by the policy.
    Rejected,
}
982
/// The concurrent hash table (cht) that stores all cached entries.
type CacheStore<K, V, S> = crate::cht::SegmentedHashMap<Arc<K>, MiniArc<ValueEntry<K, V>>, S>;
984
/// The internal state shared by the cache handles and the housekeeper.
pub(crate) struct Inner<K, V, S> {
    name: Option<String>,
    // `None` means the cache is unbounded.
    max_capacity: Option<u64>,
    // Entry count and total policy weight; written back by
    // `do_run_pending_tasks` from its `EvictionCounters`.
    entry_count: AtomicCell<u64>,
    weighted_size: AtomicCell<u64>,
    pub(crate) cache: CacheStore<K, V, S>,
    build_hasher: S,
    // Access/write order queues (async mutex).
    deques: Mutex<Deques<K>>,
    // Hierarchical timer wheel for per-entry expiration (async mutex).
    timer_wheel: Mutex<TimerWheel<K>>,
    // TinyLFU frequency sketch; enabled lazily (see
    // `should_enable_frequency_sketch`).
    frequency_sketch: RwLock<FrequencySketch>,
    frequency_sketch_enabled: AtomicBool,
    // Receiving ends of the read/write op logs recorded by the cache handles.
    read_op_ch: Receiver<ReadOp<K, V>>,
    write_op_ch: Receiver<WriteOp<K, V>>,
    // Used to wake tasks waiting for room in the write op channel.
    pub(crate) write_op_ch_ready_event: event_listener::Event,
    eviction_policy: EvictionPolicyConfig,
    expiration_policy: ExpirationPolicy<K, V>,
    // Entries written before this instant are treated as invalidated
    // (set by `invalidate_all`-style operations).
    valid_after: AtomicInstant,
    weigher: Option<Weigher<K, V>>,
    // `Some` only when an eviction listener is configured.
    removal_notifier: Option<Arc<RemovalNotifier<K, V>>>,
    // Per-key locks; present only when the removal notifier is present.
    key_locks: Option<KeyLockMap<K, S>>,
    // `Some` only when `invalidate_entries_if` support is enabled.
    invalidator: Option<Invalidator<K, V, S>>,
    clock: Clock,
}
1008
impl<K, V, S> Drop for Inner<K, V, S> {
    /// Flushes crossbeam-epoch's thread-local garbage so that values cached in
    /// deferred destructors are actually dropped.
    fn drop(&mut self) {
        // Ensure crossbeam-epoch to collect garbages (`deferred_fn`s) in the
        // global bag so that previously cached values will be dropped.
        // (The repeat count of 128 is a heuristic — presumably enough
        // pin/flush cycles to advance the epoch far enough; confirm against
        // crossbeam-epoch's collection behavior.)
        for _ in 0..128 {
            crossbeam_epoch::pin().flush();
        }

        // NOTE: The `CacheStore` (`cht`) will be dropped after returning from this
        // `drop` method. It uses crossbeam-epoch internally, but we do not have to
        // call `flush` for it because its `drop` methods do not create
        // `deferred_fn`s, and drop its values in place.
    }
}
1023
1024//
1025// functions/methods used by BaseCache
1026//
1027
1028impl<K, V, S> Inner<K, V, S> {
1029    fn name(&self) -> Option<&str> {
1030        self.name.as_deref()
1031    }
1032
1033    fn policy(&self) -> Policy {
1034        let exp = &self.expiration_policy;
1035        Policy::new(self.max_capacity, 1, exp.time_to_live(), exp.time_to_idle())
1036    }
1037
1038    #[inline]
1039    fn entry_count(&self) -> u64 {
1040        self.entry_count.load()
1041    }
1042
1043    #[inline]
1044    fn weighted_size(&self) -> u64 {
1045        self.weighted_size.load()
1046    }
1047
1048    #[inline]
1049    pub(crate) fn is_removal_notifier_enabled(&self) -> bool {
1050        self.removal_notifier.is_some()
1051    }
1052
1053    #[cfg(feature = "unstable-debug-counters")]
1054    pub async fn debug_stats(&self) -> CacheDebugStats {
1055        let ec = self.entry_count.load();
1056        let ws = self.weighted_size.load();
1057
1058        CacheDebugStats::new(
1059            ec,
1060            ws,
1061            (self.cache.capacity() * 2) as u64,
1062            self.frequency_sketch.read().await.table_size(),
1063        )
1064    }
1065
1066    pub(crate) fn maybe_key_lock(&self, key: &Arc<K>) -> Option<KeyLock<'_, K, S>>
1067    where
1068        K: Hash + Eq,
1069        S: BuildHasher,
1070    {
1071        self.key_locks.as_ref().map(|kls| kls.key_lock(key))
1072    }
1073
1074    #[inline]
1075    pub(crate) fn current_time(&self) -> Instant {
1076        self.clock.now()
1077    }
1078
1079    fn clock(&self) -> &Clock {
1080        &self.clock
1081    }
1082
1083    fn num_cht_segments(&self) -> usize {
1084        self.cache.actual_num_segments()
1085    }
1086
1087    #[inline]
1088    fn time_to_live(&self) -> Option<Duration> {
1089        self.expiration_policy.time_to_live()
1090    }
1091
1092    #[inline]
1093    fn time_to_idle(&self) -> Option<Duration> {
1094        self.expiration_policy.time_to_idle()
1095    }
1096
1097    #[inline]
1098    fn has_expiry(&self) -> bool {
1099        let exp = &self.expiration_policy;
1100        exp.time_to_live().is_some() || exp.time_to_idle().is_some()
1101    }
1102
1103    #[inline]
1104    fn is_write_order_queue_enabled(&self) -> bool {
1105        self.expiration_policy.time_to_live().is_some() || self.invalidator.is_some()
1106    }
1107
1108    #[inline]
1109    fn valid_after(&self) -> Option<Instant> {
1110        self.valid_after.instant()
1111    }
1112
1113    #[inline]
1114    fn set_valid_after(&self, timestamp: Instant) {
1115        self.valid_after.set_instant(timestamp);
1116    }
1117
1118    #[inline]
1119    fn has_valid_after(&self) -> bool {
1120        self.valid_after.is_set()
1121    }
1122}
1123
impl<K, V, S> Inner<K, V, S>
where
    K: Hash + Eq + Send + Sync + 'static,
    V: Send + Sync + 'static,
    S: BuildHasher + Send + Sync + Clone + 'static,
{
    // Disable a Clippy warning for having more than seven arguments.
    // https://rust-lang.github.io/rust-clippy/master/index.html#too_many_arguments
    #[allow(clippy::too_many_arguments)]
    /// Creates the shared cache internals: the segmented concurrent hash table
    /// (cht), the timer wheel, and the optional removal notifier / key-lock
    /// map / invalidator.
    fn new(
        name: Option<String>,
        max_capacity: Option<u64>,
        initial_capacity: Option<usize>,
        build_hasher: S,
        weigher: Option<Weigher<K, V>>,
        eviction_policy: EvictionPolicy,
        eviction_listener: Option<AsyncEvictionListener<K, V>>,
        read_op_ch: Receiver<ReadOp<K, V>>,
        write_op_ch: Receiver<WriteOp<K, V>>,
        expiration_policy: ExpirationPolicy<K, V>,
        invalidator_enabled: bool,
        clock: Clock,
    ) -> Self {
        // TODO: Calculate the number of segments based on the max capacity and
        // the number of CPUs.
        //
        // A zero-capacity cache gets a minimal single-segment table. Otherwise
        // the requested initial capacity is padded by WRITE_LOG_CH_SIZE —
        // presumably headroom for entries whose write logs have not been
        // processed yet; confirm against the write-op channel sizing.
        let (num_segments, initial_capacity) = if max_capacity == Some(0) {
            (1, 0)
        } else {
            let ic = initial_capacity
                .map(|cap| cap + WRITE_LOG_CH_SIZE)
                .unwrap_or_default();
            (64, ic)
        };
        let cache = crate::cht::SegmentedHashMap::with_num_segments_capacity_and_hasher(
            num_segments,
            initial_capacity,
            build_hasher.clone(),
        );

        let now = clock.now();
        let timer_wheel = Mutex::new(TimerWheel::new(now));

        // The key-lock map exists if and only if an eviction listener is set;
        // it is used to serialize removal notifications per key.
        let (removal_notifier, key_locks) = if let Some(listener) = eviction_listener {
            let rn = Arc::new(RemovalNotifier::new(listener, name.clone()));
            let kl = KeyLockMap::with_hasher(build_hasher.clone());
            (Some(rn), Some(kl))
        } else {
            (None, None)
        };
        let invalidator = if invalidator_enabled {
            Some(Invalidator::new(build_hasher.clone()))
        } else {
            None
        };

        Self {
            name,
            max_capacity,
            entry_count: AtomicCell::default(),
            weighted_size: AtomicCell::default(),
            cache,
            build_hasher,
            deques: Mutex::default(),
            timer_wheel,
            frequency_sketch: RwLock::new(FrequencySketch::default()),
            frequency_sketch_enabled: AtomicBool::default(),
            read_op_ch,
            write_op_ch,
            write_op_ch_ready_event: event_listener::Event::default(),
            eviction_policy: eviction_policy.config,
            expiration_policy,
            valid_after: AtomicInstant::default(),
            weigher,
            removal_notifier,
            key_locks,
            invalidator,
            clock,
        }
    }

    /// Hashes `key` with this cache's `BuildHasher`.
    #[inline]
    fn hash<Q>(&self, key: &Q) -> u64
    where
        Q: Equivalent<K> + Hash + ?Sized,
    {
        let mut hasher = self.build_hasher.build_hasher();
        key.hash(&mut hasher);
        hasher.finish()
    }

    /// Looks up `key` in the cht and applies `with_entry` to the stored
    /// key/entry pair while holding the internal bucket guard.
    #[inline]
    fn get_key_value_and<Q, F, T>(&self, key: &Q, hash: u64, with_entry: F) -> Option<T>
    where
        Q: Equivalent<K> + Hash + ?Sized,
        F: FnOnce(&Arc<K>, &MiniArc<ValueEntry<K, V>>) -> T,
    {
        self.cache
            .get_key_value_and(hash, |k| key.equivalent(k as &K), with_entry)
    }

    /// Same as `get_key_value_and`, but `with_entry` may itself return `None`
    /// (e.g. to reject expired or invalidated entries).
    #[inline]
    fn get_key_value_and_then<Q, F, T>(&self, key: &Q, hash: u64, with_entry: F) -> Option<T>
    where
        Q: Equivalent<K> + Hash + ?Sized,
        F: FnOnce(&Arc<K>, &MiniArc<ValueEntry<K, V>>) -> Option<T>,
    {
        self.cache
            .get_key_value_and_then(hash, |k| key.equivalent(k as &K), with_entry)
    }

    /// Removes `key` from the cht, returning the removed key/entry pair.
    #[inline]
    fn remove_entry<Q>(&self, key: &Q, hash: u64) -> Option<KvEntry<K, V>>
    where
        Q: Equivalent<K> + Hash + ?Sized,
    {
        self.cache
            .remove_entry(hash, |k| key.equivalent(k as &K))
            .map(|(key, entry)| KvEntry::new(key, entry))
    }

    /// Returns the keys stored in the given cht segment.
    fn keys(&self, cht_segment: usize) -> Option<Vec<Arc<K>>> {
        // Do `Arc::clone` instead of `Arc::downgrade`. Updating existing entry
        // in the cht with a new value replaces the key in the cht even though the
        // old and new keys are equal. If we return `Weak<K>`, it will not be
        // upgraded later to `Arc<K>` as the key may have been replaced with a new
        // key that equals to the old key.
        self.cache.keys(cht_segment, Arc::clone)
    }

    /// Registers a predicate for `invalidate_entries_if`. Fails when that
    /// support was not enabled at construction time.
    #[inline]
    fn register_invalidation_predicate(
        &self,
        predicate: PredicateFun<K, V>,
        registered_at: Instant,
    ) -> Result<PredicateId, PredicateError> {
        if let Some(inv) = &self.invalidator {
            inv.register_predicate(predicate, registered_at)
        } else {
            Err(PredicateError::InvalidationClosuresDisabled)
        }
    }

    /// Returns `true` if the entry is invalidated by `invalidate_entries_if` method.
    #[inline]
    fn is_invalidated_entry(&self, key: &Arc<K>, entry: &MiniArc<ValueEntry<K, V>>) -> bool
    where
        V: Clone,
    {
        if let Some(inv) = &self.invalidator {
            return inv.apply_predicates(key, entry);
        }
        false
    }

    /// Returns the policy weight of a key/value pair. Without a weigher every
    /// entry weighs 1 (so the weighted size equals the entry count).
    #[inline]
    fn weigh(&self, key: &K, value: &V) -> u32 {
        self.weigher.as_ref().map_or(1, |w| w(key, value))
    }
}
1283
impl<K, V, S> Inner<K, V, S>
where
    K: Hash + Eq + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
    S: BuildHasher + Clone + Send + Sync + 'static,
{
    /// Runs the pending tasks. Returns `true` if there are more entries to evict.
    ///
    /// Drains the read/write op logs (at most `max_log_sync_repeats` + 1
    /// times), then runs expiration, invalidation and size-based eviction, each
    /// bounded by `eviction_batch_size`. When `timeout` is given, the loop also
    /// stops once that much time has elapsed.
    pub(crate) async fn do_run_pending_tasks(
        &self,
        timeout: Option<Duration>,
        max_log_sync_repeats: u32,
        eviction_batch_size: u32,
    ) -> bool {
        // A zero-capacity cache stores nothing; there is nothing to do.
        if self.max_capacity == Some(0) {
            return false;
        }

        // Acquire some locks.
        let mut deqs = self.deques.lock().await;
        let mut timer_wheel = self.timer_wheel.lock().await;

        // Only measure the start time when a timeout was requested.
        let started_at = if timeout.is_some() {
            Some(self.current_time())
        } else {
            None
        };
        let mut should_process_logs = true;
        let mut calls = 0u32;
        let current_ec = self.entry_count.load();
        let current_ws = self.weighted_size.load();
        let mut eviction_state =
            EvictionState::new(current_ec, current_ws, self.removal_notifier.as_ref());

        loop {
            if should_process_logs {
                let r_len = self.read_op_ch.len();
                if r_len > 0 {
                    self.apply_reads(&mut deqs, &mut timer_wheel, r_len).await;
                }

                let w_len = self.write_op_ch.len();
                if w_len > 0 {
                    self.apply_writes(&mut deqs, &mut timer_wheel, w_len, &mut eviction_state)
                        .await;
                }

                // Lazily enable the TinyLFU frequency sketch once the cache has
                // grown enough (see `should_enable_frequency_sketch`).
                if self.eviction_policy == EvictionPolicyConfig::TinyLfu
                    && self.should_enable_frequency_sketch(&eviction_state.counters)
                {
                    self.enable_frequency_sketch(&eviction_state.counters).await;
                }

                // If there are any async tasks waiting in `BaseCache::schedule_write_op`
                // method for the write op channel to have enough room, notify them.
                let listeners = self.write_op_ch_ready_event.total_listeners();
                if listeners > 0 {
                    let n = listeners.min(WRITE_LOG_CH_SIZE - self.write_op_ch.len());
                    // Notify the `n` listeners. The `notify` method accepts 0, so no
                    // need to check if `n` is greater than 0.
                    self.write_op_ch_ready_event.notify(n);
                }

                calls += 1;
            }

            // Set this flag to `false`. The `evict_*` and `invalidate_*` methods
            // below may set it to `true` if there are more entries to evict in next
            // loop.
            eviction_state.more_entries_to_evict = false;
            // Remember the tally so we can tell whether this iteration evicted
            // anything at all.
            let last_eviction_count = eviction_state.counters.eviction_count;

            // Evict entries if there are any expired entries in the hierarchical
            // timer wheels.
            if timer_wheel.is_enabled() {
                self.evict_expired_entries_using_timers(
                    &mut timer_wheel,
                    &mut deqs,
                    &mut eviction_state,
                )
                .await;
            }

            // Evict entries if there are any expired entries in the write order or
            // access order deques.
            if self.has_expiry() || self.has_valid_after() {
                self.evict_expired_entries_using_deqs(
                    &mut deqs,
                    &mut timer_wheel,
                    eviction_batch_size,
                    &mut eviction_state,
                )
                .await;
            }

            // Evict entries if there are any invalidation predicates set by the
            // `invalidate_entries_if` method.
            if let Some(invalidator) = &self.invalidator {
                if !invalidator.is_empty() {
                    self.invalidate_entries(
                        invalidator,
                        &mut deqs,
                        &mut timer_wheel,
                        eviction_batch_size,
                        &mut eviction_state,
                    )
                    .await;
                }
            }

            // Evict if this cache has more entries than its capacity.
            let weights_to_evict = self.weights_to_evict(&eviction_state.counters);
            if weights_to_evict > 0 {
                self.evict_lru_entries(
                    &mut deqs,
                    &mut timer_wheel,
                    eviction_batch_size,
                    weights_to_evict,
                    &mut eviction_state,
                )
                .await;
            }

            // Check whether to continue this loop or not.

            // Keep draining the logs while they remain above the flush points,
            // but never more than `max_log_sync_repeats` additional rounds.
            should_process_logs = calls <= max_log_sync_repeats
                && (self.read_op_ch.len() >= READ_LOG_FLUSH_POINT
                    || self.write_op_ch.len() >= WRITE_LOG_FLUSH_POINT);

            let should_evict_more_entries = eviction_state.more_entries_to_evict
                // Check if there were any entries evicted in this loop.
                && (eviction_state.counters.eviction_count - last_eviction_count) > 0;

            // Break the loop if there will be nothing to do in next loop.
            if !should_process_logs && !should_evict_more_entries {
                break;
            }

            // Break the loop if the eviction listener is set and timeout has been
            // reached.
            if let (Some(to), Some(started)) = (timeout, started_at) {
                let elapsed = self.current_time().saturating_duration_since(started);
                if elapsed >= to {
                    break;
                }
            }
        }

        // These asserts check that the atomics were not modified while this
        // method ran — presumably guaranteed by the housekeeper serializing
        // calls to this method; confirm against `Housekeeper`.
        debug_assert_eq!(self.entry_count.load(), current_ec);
        debug_assert_eq!(self.weighted_size.load(), current_ws);
        self.entry_count.store(eviction_state.counters.entry_count);
        self.weighted_size
            .store(eviction_state.counters.weighted_size);

        // Flush this thread's crossbeam-epoch garbage so removed values get
        // reclaimed promptly.
        crossbeam_epoch::pin().flush();

        // Ensure this lock is held until here.
        drop(deqs);

        eviction_state.more_entries_to_evict
    }
}
1445
1446//
1447// private methods
1448//
1449impl<K, V, S> Inner<K, V, S>
1450where
1451    K: Hash + Eq + Send + Sync + 'static,
1452    V: Send + Sync + 'static,
1453    S: BuildHasher + Clone + Send + Sync + 'static,
1454{
1455    fn has_enough_capacity(&self, candidate_weight: u32, counters: &EvictionCounters) -> bool {
1456        self.max_capacity.map_or(true, |limit| {
1457            counters.weighted_size + candidate_weight as u64 <= limit
1458        })
1459    }
1460
1461    fn weights_to_evict(&self, counters: &EvictionCounters) -> u64 {
1462        self.max_capacity
1463            .map(|limit| counters.weighted_size.saturating_sub(limit))
1464            .unwrap_or_default()
1465    }
1466
1467    #[inline]
1468    fn should_enable_frequency_sketch(&self, counters: &EvictionCounters) -> bool {
1469        match self.max_capacity {
1470            None | Some(0) => false,
1471            Some(max_cap) => {
1472                if self.frequency_sketch_enabled.load(Ordering::Acquire) {
1473                    false // The frequency sketch is already enabled.
1474                } else {
1475                    counters.weighted_size >= max_cap / 2
1476                }
1477            }
1478        }
1479    }
1480
1481    #[inline]
1482    async fn enable_frequency_sketch(&self, counters: &EvictionCounters) {
1483        if let Some(max_cap) = self.max_capacity {
1484            let c = counters;
1485            let cap = if self.weigher.is_none() {
1486                max_cap
1487            } else {
1488                (c.entry_count as f64 * (c.weighted_size as f64 / max_cap as f64)) as u64
1489            };
1490            self.do_enable_frequency_sketch(cap).await;
1491        }
1492    }
1493
1494    #[cfg(test)]
1495    async fn enable_frequency_sketch_for_testing(&self) {
1496        if let Some(max_cap) = self.max_capacity {
1497            self.do_enable_frequency_sketch(max_cap).await;
1498        }
1499    }
1500
1501    #[inline]
1502    async fn do_enable_frequency_sketch(&self, cache_capacity: u64) {
1503        let skt_capacity = common::sketch_capacity(cache_capacity);
1504        self.frequency_sketch
1505            .write()
1506            .await
1507            .ensure_capacity(skt_capacity);
1508        self.frequency_sketch_enabled.store(true, Ordering::Release);
1509    }
1510
1511    async fn apply_reads(
1512        &self,
1513        deqs: &mut Deques<K>,
1514        timer_wheel: &mut TimerWheel<K>,
1515        count: usize,
1516    ) {
1517        use ReadOp::{Hit, Miss};
1518        let mut freq = self.frequency_sketch.write().await;
1519        let ch = &self.read_op_ch;
1520        for _ in 0..count {
1521            match ch.try_recv() {
1522                Ok(Hit {
1523                    value_entry,
1524                    is_expiry_modified,
1525                }) => {
1526                    let kh = value_entry.entry_info().key_hash();
1527                    freq.increment(kh.hash);
1528                    if is_expiry_modified {
1529                        self.update_timer_wheel(&value_entry, timer_wheel);
1530                    }
1531                    deqs.move_to_back_ao(&value_entry);
1532                }
1533                Ok(Miss(hash)) => freq.increment(hash),
1534                Err(_) => break,
1535            }
1536        }
1537    }
1538
1539    async fn apply_writes(
1540        &self,
1541        deqs: &mut Deques<K>,
1542        timer_wheel: &mut TimerWheel<K>,
1543        count: usize,
1544        eviction_state: &mut EvictionState<'_, K, V>,
1545    ) where
1546        V: Clone,
1547    {
1548        use WriteOp::{Remove, Upsert};
1549        let freq = self.frequency_sketch.read().await;
1550        let ch = &self.write_op_ch;
1551
1552        for _ in 0..count {
1553            match ch.try_recv() {
1554                Ok(Upsert {
1555                    key_hash: kh,
1556                    value_entry: entry,
1557                    entry_gen: gen,
1558                    old_weight,
1559                    new_weight,
1560                }) => {
1561                    self.handle_upsert(
1562                        kh,
1563                        entry,
1564                        gen,
1565                        old_weight,
1566                        new_weight,
1567                        deqs,
1568                        timer_wheel,
1569                        &freq,
1570                        eviction_state,
1571                    )
1572                    .await;
1573                }
1574                Ok(Remove {
1575                    kv_entry: KvEntry { key: _key, entry },
1576                    entry_gen: gen,
1577                }) => {
1578                    Self::handle_remove(
1579                        deqs,
1580                        timer_wheel,
1581                        entry,
1582                        Some(gen),
1583                        &mut eviction_state.counters,
1584                    );
1585                }
1586                Err(_) => break,
1587            };
1588        }
1589    }
1590
1591    #[allow(clippy::too_many_arguments)]
1592    async fn handle_upsert(
1593        &self,
1594        kh: KeyHash<K>,
1595        entry: MiniArc<ValueEntry<K, V>>,
1596        gen: u16,
1597        old_weight: u32,
1598        new_weight: u32,
1599        deqs: &mut Deques<K>,
1600        timer_wheel: &mut TimerWheel<K>,
1601        freq: &FrequencySketch,
1602        eviction_state: &mut EvictionState<'_, K, V>,
1603    ) where
1604        V: Clone,
1605    {
1606        {
1607            let counters = &mut eviction_state.counters;
1608
1609            if entry.is_admitted() {
1610                // The entry has been already admitted, so treat this as an update.
1611                counters.saturating_sub(0, old_weight);
1612                counters.saturating_add(0, new_weight);
1613                self.update_timer_wheel(&entry, timer_wheel);
1614                deqs.move_to_back_ao(&entry);
1615                deqs.move_to_back_wo(&entry);
1616                entry.entry_info().set_policy_gen(gen);
1617                return;
1618            }
1619
1620            if self.has_enough_capacity(new_weight, counters) {
1621                // There are enough room in the cache (or the cache is unbounded).
1622                // Add the candidate to the deques.
1623                self.handle_admit(&entry, new_weight, deqs, timer_wheel, counters);
1624                entry.entry_info().set_policy_gen(gen);
1625                return;
1626            }
1627        }
1628
1629        if let Some(max) = self.max_capacity {
1630            if new_weight as u64 > max {
1631                // The candidate is too big to fit in the cache. Reject it.
1632
1633                // Lock the key for removal if blocking removal notification is enabled.
1634                let kl = self.maybe_key_lock(&kh.key);
1635                let _klg = if let Some(lock) = &kl {
1636                    Some(lock.lock().await)
1637                } else {
1638                    None
1639                };
1640
1641                let removed = self.cache.remove_if(
1642                    kh.hash,
1643                    |k| k == &kh.key,
1644                    |_, current_entry| {
1645                        MiniArc::ptr_eq(entry.entry_info(), current_entry.entry_info())
1646                            && current_entry.entry_info().entry_gen() == gen
1647                    },
1648                );
1649                if let Some(entry) = removed {
1650                    if eviction_state.is_notifier_enabled() {
1651                        let key = Arc::clone(&kh.key);
1652                        eviction_state
1653                            .notify_entry_removal(key, &entry, RemovalCause::Size)
1654                            .await;
1655                    }
1656                    eviction_state.counters.incr_eviction_count();
1657                }
1658                entry.entry_info().set_policy_gen(gen);
1659                return;
1660            }
1661        }
1662
1663        // TODO: Refactoring the policy implementations.
1664        // https://github.com/moka-rs/moka/issues/389
1665
1666        // Try to admit the candidate.
1667        let admission_result = match &self.eviction_policy {
1668            EvictionPolicyConfig::TinyLfu => {
1669                let mut candidate = EntrySizeAndFrequency::new(new_weight);
1670                candidate.add_frequency(freq, kh.hash);
1671                Self::admit(&candidate, &self.cache, deqs, freq)
1672            }
1673            EvictionPolicyConfig::Lru => AdmissionResult::Admitted {
1674                victim_keys: SmallVec::default(),
1675            },
1676        };
1677
1678        match admission_result {
1679            AdmissionResult::Admitted { victim_keys } => {
1680                // Try to remove the victims from the hash map.
1681                for (vic_kh, vic_la) in victim_keys {
1682                    let vic_key = vic_kh.key;
1683                    let vic_hash = vic_kh.hash;
1684
1685                    // Lock the key for removal if blocking removal notification is enabled.
1686                    let kl = self.maybe_key_lock(&vic_key);
1687                    let _klg = if let Some(lock) = &kl {
1688                        Some(lock.lock().await)
1689                    } else {
1690                        None
1691                    };
1692
1693                    if let Some((vic_key, vic_entry)) = self.cache.remove_entry_if_and(
1694                        vic_hash,
1695                        |k| k == &vic_key,
1696                        |_, entry| entry.entry_info().last_accessed() == vic_la,
1697                        |k, v| (k.clone(), v.clone()),
1698                    ) {
1699                        if eviction_state.is_notifier_enabled() {
1700                            eviction_state
1701                                .notify_entry_removal(vic_key, &vic_entry, RemovalCause::Size)
1702                                .await;
1703                        }
1704                        eviction_state.counters.incr_eviction_count();
1705
1706                        // And then remove the victim from the deques.
1707                        Self::handle_remove(
1708                            deqs,
1709                            timer_wheel,
1710                            vic_entry,
1711                            None,
1712                            &mut eviction_state.counters,
1713                        );
1714                    } else {
1715                        // Could not remove the victim from the cache. Skip it as its
1716                        // ValueEntry might have been invalidated.
1717                        if let Some(node) = deqs.probation.peek_front() {
1718                            if node.element.key() == &vic_key && node.element.hash() == vic_hash {
1719                                deqs.probation.move_front_to_back();
1720                            }
1721                        }
1722                    }
1723                }
1724                // Add the candidate to the deques.
1725                self.handle_admit(
1726                    &entry,
1727                    new_weight,
1728                    deqs,
1729                    timer_wheel,
1730                    &mut eviction_state.counters,
1731                );
1732                entry.entry_info().set_policy_gen(gen);
1733            }
1734            AdmissionResult::Rejected => {
1735                // Lock the key for removal if blocking removal notification is enabled.
1736                let kl = self.maybe_key_lock(&kh.key);
1737                let _klg = if let Some(lock) = &kl {
1738                    Some(lock.lock().await)
1739                } else {
1740                    None
1741                };
1742
1743                // Remove the candidate from the cache (hash map) if the entry
1744                // generation matches.
1745                let key = Arc::clone(&kh.key);
1746                let removed = self.cache.remove_if(
1747                    kh.hash,
1748                    |k| k == &key,
1749                    |_, current_entry| {
1750                        MiniArc::ptr_eq(entry.entry_info(), current_entry.entry_info())
1751                            && current_entry.entry_info().entry_gen() == gen
1752                    },
1753                );
1754
1755                if let Some(entry) = removed {
1756                    entry.entry_info().set_policy_gen(gen);
1757                    if eviction_state.is_notifier_enabled() {
1758                        eviction_state
1759                            .notify_entry_removal(key, &entry, RemovalCause::Size)
1760                            .await;
1761                    }
1762                    eviction_state.counters.incr_eviction_count();
1763                }
1764            }
1765        }
1766    }
1767
    /// Performs size-aware admission explained in the paper:
    /// [Lightweight Robust Size Aware Cache Management][size-aware-cache-paper]
    /// by Gil Einziger, Ohad Eytan, Roy Friedman, Ben Manes.
    ///
    /// [size-aware-cache-paper]: https://arxiv.org/abs/2105.08770
    ///
    /// Aggregates potential victims from the LRU end of the probation queue and
    /// returns `AdmissionResult::Admitted` with the victims' keys when the
    /// candidate should replace them, or `AdmissionResult::Rejected` otherwise.
    ///
    /// There are some modifications in this implementation:
    /// - To admit to the main space, candidate's frequency must be higher than
    ///   the aggregated frequencies of the potential victims. (In the paper,
    ///   `>=` operator is used rather than `>`)  The `>` operator will do a better
    ///   job to prevent the main space from polluting.
    /// - When a candidate is rejected, the potential victims will stay at the LRU
    ///   position of the probation access-order queue. (In the paper, they will be
    ///   promoted (to the MRU position?) to force the eviction policy to select a
    ///   different set of victims for the next candidate). We may implement the
    ///   paper's behavior later?
    ///
    #[inline]
    fn admit(
        candidate: &EntrySizeAndFrequency,
        cache: &CacheStore<K, V, S>,
        deqs: &mut Deques<K>,
        freq: &FrequencySketch,
    ) -> AdmissionResult<K> {
        // Stop aggregating after this many consecutive dirty/missing victim
        // nodes; `retries` is reset to 0 whenever a usable victim is found.
        const MAX_CONSECUTIVE_RETRIES: usize = 5;
        let mut retries = 0;

        let mut victims = EntrySizeAndFrequency::default();
        let mut victim_keys = SmallVec::default();

        let deq = &mut deqs.probation;

        // Get first potential victim at the LRU position.
        let mut next_victim = deq.peek_front_ptr();

        // Aggregate potential victims until they weigh at least as much as the
        // candidate, or their combined frequency exceeds the candidate's.
        while victims.policy_weight < candidate.policy_weight
            && victims.freq <= candidate.freq
            && retries <= MAX_CONSECUTIVE_RETRIES
        {
            let Some(victim) = next_victim.take() else {
                // No more potential victims.
                break;
            };
            next_victim = DeqNode::next_node_ptr(victim);

            let vic_elem = &unsafe { victim.as_ref() }.element;
            if vic_elem.is_dirty() {
                // Skip this node as its ValueEntry has been updated or invalidated.
                unsafe { deq.move_to_back(victim) };
                retries += 1;
                continue;
            }

            let key = vic_elem.key();
            let hash = vic_elem.hash();
            let last_accessed = vic_elem.entry_info().last_accessed();

            if let Some(vic_entry) = cache.get(hash, |k| k == key) {
                victims.add_policy_weight(vic_entry.policy_weight());
                victims.add_frequency(freq, hash);
                victim_keys.push((KeyHash::new(Arc::clone(key), hash), last_accessed));
                retries = 0;
            } else {
                // Could not get the victim from the cache (hash map). Skip this node
                // as its ValueEntry might have been invalidated (after we checked
                // `is_dirty` above).
                unsafe { deq.move_to_back(victim) };
                retries += 1;
            }
        }

        // Admit or reject the candidate.

        // TODO: Implement some randomness to mitigate hash DoS attack.
        // See Caffeine's implementation.

        // Admit only when evicting the victims frees enough policy weight AND
        // the candidate is strictly more frequent than all victims combined.
        if victims.policy_weight >= candidate.policy_weight && candidate.freq > victims.freq {
            AdmissionResult::Admitted { victim_keys }
        } else {
            AdmissionResult::Rejected
        }
    }
1851
1852    fn handle_admit(
1853        &self,
1854        entry: &MiniArc<ValueEntry<K, V>>,
1855        policy_weight: u32,
1856        deqs: &mut Deques<K>,
1857        timer_wheel: &mut TimerWheel<K>,
1858        counters: &mut EvictionCounters,
1859    ) {
1860        counters.saturating_add(1, policy_weight);
1861
1862        self.update_timer_wheel(entry, timer_wheel);
1863
1864        // Update the deques.
1865        deqs.push_back_ao(
1866            CacheRegion::MainProbation,
1867            KeyHashDate::new(entry.entry_info()),
1868            entry,
1869        );
1870        if self.is_write_order_queue_enabled() {
1871            deqs.push_back_wo(KeyHashDate::new(entry.entry_info()), entry);
1872        }
1873        entry.set_admitted(true);
1874    }
1875
1876    /// NOTE: This method may enable the timer wheel.
1877    fn update_timer_wheel(
1878        &self,
1879        entry: &MiniArc<ValueEntry<K, V>>,
1880        timer_wheel: &mut TimerWheel<K>,
1881    ) {
1882        // Enable the timer wheel if needed.
1883        if entry.entry_info().expiration_time().is_some() && !timer_wheel.is_enabled() {
1884            timer_wheel.enable();
1885        }
1886
1887        // Update the timer wheel.
1888        match (
1889            entry.entry_info().expiration_time().is_some(),
1890            entry.timer_node(),
1891        ) {
1892            // Do nothing; the cache entry has no expiration time and not registered
1893            // to the timer wheel.
1894            (false, None) => (),
1895            // Register the cache entry to the timer wheel; the cache entry has an
1896            // expiration time and not registered to the timer wheel.
1897            (true, None) => {
1898                let timer = timer_wheel.schedule(
1899                    MiniArc::clone(entry.entry_info()),
1900                    MiniArc::clone(entry.deq_nodes()),
1901                );
1902                entry.set_timer_node(timer);
1903            }
1904            // Reschedule the cache entry in the timer wheel; the cache entry has an
1905            // expiration time and already registered to the timer wheel.
1906            (true, Some(tn)) => {
1907                let result = timer_wheel.reschedule(tn);
1908                if let ReschedulingResult::Removed(removed_tn) = result {
1909                    // The timer node was removed from the timer wheel because the
1910                    // expiration time has been unset by other thread after we
1911                    // checked.
1912                    entry.set_timer_node(None);
1913                    drop(removed_tn);
1914                }
1915            }
1916            // Unregister the cache entry from the timer wheel; the cache entry has
1917            // no expiration time but registered to the timer wheel.
1918            (false, Some(tn)) => {
1919                entry.set_timer_node(None);
1920                timer_wheel.deschedule(tn);
1921            }
1922        }
1923    }
1924
1925    fn handle_remove(
1926        deqs: &mut Deques<K>,
1927        timer_wheel: &mut TimerWheel<K>,
1928        entry: MiniArc<ValueEntry<K, V>>,
1929        gen: Option<u16>,
1930        counters: &mut EvictionCounters,
1931    ) {
1932        if let Some(timer_node) = entry.take_timer_node() {
1933            timer_wheel.deschedule(timer_node);
1934        }
1935        Self::handle_remove_without_timer_wheel(deqs, entry, gen, counters);
1936    }
1937
1938    fn handle_remove_without_timer_wheel(
1939        deqs: &mut Deques<K>,
1940        entry: MiniArc<ValueEntry<K, V>>,
1941        gen: Option<u16>,
1942        counters: &mut EvictionCounters,
1943    ) {
1944        if entry.is_admitted() {
1945            entry.set_admitted(false);
1946            counters.saturating_sub(1, entry.policy_weight());
1947            // The following two unlink_* functions will unset the deq nodes.
1948            deqs.unlink_ao(&entry);
1949            Deques::unlink_wo(&mut deqs.write_order, &entry);
1950        } else {
1951            entry.unset_q_nodes();
1952        }
1953        if let Some(g) = gen {
1954            entry.entry_info().set_policy_gen(g);
1955        }
1956    }
1957
1958    fn handle_remove_with_deques(
1959        ao_deq_name: &str,
1960        ao_deq: &mut Deque<KeyHashDate<K>>,
1961        wo_deq: &mut Deque<KeyHashDate<K>>,
1962        timer_wheel: &mut TimerWheel<K>,
1963        entry: MiniArc<ValueEntry<K, V>>,
1964        counters: &mut EvictionCounters,
1965    ) {
1966        if let Some(timer) = entry.take_timer_node() {
1967            timer_wheel.deschedule(timer);
1968        }
1969        if entry.is_admitted() {
1970            entry.set_admitted(false);
1971            counters.saturating_sub(1, entry.policy_weight());
1972            // The following two unlink_* functions will unset the deq nodes.
1973            Deques::unlink_ao_from_deque(ao_deq_name, ao_deq, &entry);
1974            Deques::unlink_wo(wo_deq, &entry);
1975        } else {
1976            entry.unset_q_nodes();
1977        }
1978    }
1979
    /// Advances the hierarchical timer wheel to `now` and evicts the entries
    /// whose per-entry expiration time has passed, notifying the eviction
    /// listener (with `RemovalCause::Expired`) when one is configured.
    async fn evict_expired_entries_using_timers(
        &self,
        timer_wheel: &mut TimerWheel<K>,
        deqs: &mut Deques<K>,
        eviction_state: &mut EvictionState<'_, K, V>,
    ) where
        V: Clone,
    {
        use crate::common::timer_wheel::TimerEvent;

        let now = self.current_time();

        // NOTE: When necessary, the iterator returned from advance() will unset the
        // timer node pointer in the `ValueEntry`, so we do not have to do it here.
        //
        // Collect the expired keys first (with each entry's dirty flag at the
        // time of the scan), then process them after the wheel walk completes.
        let expired_keys = timer_wheel
            .advance(now)
            .filter_map(|event| {
                // We do not have to do anything if event is `TimerEvent::Descheduled(_)`
                // or `TimerEvent::Rescheduled(_)`.
                if let TimerEvent::Expired(node) = event {
                    let entry_info = node.element.entry_info();
                    let kh = entry_info.key_hash();
                    Some((Arc::clone(&kh.key), kh.hash, entry_info.is_dirty()))
                } else {
                    None
                }
            })
            .collect::<Vec<_>>();

        // Process each expired key.
        //
        // If it is dirty or `cache.remove_if` returns `None`, we will skip it as it
        // may have been read, updated or invalidated by other thread.
        //
        // - The timer node should have been unset in the current `ValueEntry` as
        //   described above.
        // - When necessary, a new timer node will be recreated for the current or
        //   new `ValueEntry` when its `WriteOp` or `ReadOp` is processed.
        for (key, hash, is_dirty) in expired_keys {
            if is_dirty {
                // Skip this entry as it has been updated or invalidated by other
                // thread.
                continue;
            }

            // Lock the key for removal if blocking removal notification is enabled.
            let kl = self.maybe_key_lock(&key);
            let _klg = if let Some(lock) = &kl {
                Some(lock.lock().await)
            } else {
                None
            };

            // Remove the key from the map only when the entry is really expired.
            let maybe_entry = self.cache.remove_if(
                hash,
                |k| k == &key,
                |_, v| is_expired_by_per_entry_ttl(v.entry_info(), now),
            );

            if let Some(entry) = maybe_entry {
                if eviction_state.is_notifier_enabled() {
                    eviction_state
                        .notify_entry_removal(key, &entry, RemovalCause::Expired)
                        .await;
                }
                eviction_state.counters.incr_eviction_count();
                // The timer node was already unset by `advance()` above, so the
                // timer wheel does not need to be touched here.
                Self::handle_remove_without_timer_wheel(
                    deqs,
                    entry,
                    None,
                    &mut eviction_state.counters,
                );
            } else {
                // Skip this entry as the key may have been read, updated or
                // invalidated by other thread.
            }
        }
    }
2059
2060    async fn evict_expired_entries_using_deqs(
2061        &self,
2062        deqs: &mut MutexGuard<'_, Deques<K>>,
2063        timer_wheel: &mut TimerWheel<K>,
2064        batch_size: u32,
2065        state: &mut EvictionState<'_, K, V>,
2066    ) where
2067        V: Clone,
2068    {
2069        use CacheRegion::{MainProbation as Probation, MainProtected as Protected, Window};
2070
2071        let now = self.current_time();
2072
2073        if self.is_write_order_queue_enabled() {
2074            self.remove_expired_wo(deqs, timer_wheel, batch_size, now, state)
2075                .await;
2076        }
2077
2078        if self.expiration_policy.time_to_idle().is_some() || self.has_valid_after() {
2079            self.remove_expired_ao(Window, deqs, timer_wheel, batch_size, now, state)
2080                .await;
2081            self.remove_expired_ao(Probation, deqs, timer_wheel, batch_size, now, state)
2082                .await;
2083            self.remove_expired_ao(Protected, deqs, timer_wheel, batch_size, now, state)
2084                .await;
2085        }
2086    }
2087
    /// Evicts up to `batch_size` entries from the front (LRU side) of the given
    /// region's access-order queue when they are past the time-to-idle or were
    /// last accessed before the `valid_after` timestamp.
    #[allow(clippy::too_many_arguments)]
    #[inline]
    async fn remove_expired_ao(
        &self,
        cache_region: CacheRegion,
        deqs: &mut Deques<K>,
        timer_wheel: &mut TimerWheel<K>,
        batch_size: u32,
        now: Instant,
        eviction_state: &mut EvictionState<'_, K, V>,
    ) where
        V: Clone,
    {
        let tti = &self.expiration_policy.time_to_idle();
        let va = &self.valid_after();
        let deq_name = cache_region.name();
        let mut more_to_evict = true;

        for _ in 0..batch_size {
            // Peek at the LRU node and copy out its key, hash, dirty flag and
            // last-accessed timestamp without unlinking it yet.
            let maybe_key_hash_ts = deqs.select_mut(cache_region).0.peek_front().map(|node| {
                let elem = &node.element;
                (
                    Arc::clone(elem.key()),
                    elem.hash(),
                    elem.is_dirty(),
                    elem.last_accessed(),
                )
            });

            let (key, hash, cause) = match maybe_key_hash_ts {
                Some((key, hash, false, Some(ts))) => {
                    let cause = match is_entry_expired_ao_or_invalid(tti, va, ts, now) {
                        (true, _) => RemovalCause::Expired,
                        (false, true) => RemovalCause::Explicit,
                        (false, false) => {
                            // The LRU entry is not expired, so no entry behind
                            // it in this queue can be expired either.
                            more_to_evict = false;
                            break;
                        }
                    };
                    (key, hash, cause)
                }
                // TODO: Remove the second pattern `Some((_key, false, None))` once
                // we change `last_modified` and `last_accessed` in `EntryInfo` from
                // `Option<Instant>` to `Instant`.
                Some((key, hash, true, _) | (key, hash, false, None)) => {
                    // `is_dirty` is true or `last_modified` is None. Skip this entry
                    // as it may have been updated by this or other async task but
                    // its `WriteOp` is not processed yet.
                    let (ao_deq, wo_deq) = deqs.select_mut(cache_region);
                    self.skip_updated_entry_ao(&key, hash, deq_name, ao_deq, wo_deq);
                    // Set `more_to_evict` to `false` to make `run_pending_tasks` to
                    // return early. This will help that `schedule_write_op` to send
                    // the `WriteOp` to the write op channel.
                    more_to_evict = false;
                    continue;
                }
                None => {
                    // The queue is empty.
                    more_to_evict = false;
                    break;
                }
            };

            // Lock the key for removal if blocking removal notification is enabled.
            let kl = self.maybe_key_lock(&key);
            let _klg = if let Some(lock) = &kl {
                Some(lock.lock().await)
            } else {
                None
            };

            // Remove the key from the map only when the entry is really
            // expired. This check is needed because it is possible that the entry in
            // the map has been updated or deleted but its deque node we checked
            // above has not been updated yet.
            let maybe_entry = self.cache.remove_if(
                hash,
                |k| k == &key,
                |_, v| is_expired_entry_ao(tti, va, v, now),
            );

            if let Some(entry) = maybe_entry {
                if eviction_state.is_notifier_enabled() {
                    eviction_state
                        .notify_entry_removal(key, &entry, cause)
                        .await;
                }
                eviction_state.counters.incr_eviction_count();
                let (ao_deq, wo_deq) = deqs.select_mut(cache_region);
                Self::handle_remove_with_deques(
                    deq_name,
                    ao_deq,
                    wo_deq,
                    timer_wheel,
                    entry,
                    &mut eviction_state.counters,
                );
            } else {
                // The entry in the map was not expired (it was likely updated
                // after the deque node was read); demote the node instead.
                let (ao_deq, wo_deq) = deqs.select_mut(cache_region);
                self.skip_updated_entry_ao(&key, hash, deq_name, ao_deq, wo_deq);
                more_to_evict = false;
            }
        }

        if more_to_evict {
            eviction_state.more_entries_to_evict = true;
        }
    }
2195
2196    #[inline]
2197    fn skip_updated_entry_ao(
2198        &self,
2199        key: &K,
2200        hash: u64,
2201        deq_name: &str,
2202        deq: &mut Deque<KeyHashDate<K>>,
2203        write_order_deq: &mut Deque<KeyHashDate<K>>,
2204    ) {
2205        if let Some(entry) = self.cache.get(hash, |k| (k.borrow() as &K) == key) {
2206            // The key exists and the entry may have been read or updated by other
2207            // thread.
2208            Deques::move_to_back_ao_in_deque(deq_name, deq, &entry);
2209            if entry.is_dirty() {
2210                Deques::move_to_back_wo_in_deque(write_order_deq, &entry);
2211            }
2212        } else {
2213            // Skip this entry as the key may have been invalidated by other thread.
2214            // Since the invalidated ValueEntry (which should be still in the write
2215            // op queue) has a pointer to this node, move the node to the back of the
2216            // deque instead of popping (dropping) it.
2217            deq.move_front_to_back();
2218        }
2219    }
2220
2221    #[inline]
2222    fn skip_updated_entry_wo(&self, key: &K, hash: u64, deqs: &mut Deques<K>) {
2223        if let Some(entry) = self.cache.get(hash, |k| (k.borrow() as &K) == key) {
2224            // The key exists and the entry may have been read or updated by other
2225            // thread.
2226            deqs.move_to_back_ao(&entry);
2227            deqs.move_to_back_wo(&entry);
2228        } else {
2229            // Skip this entry as the key may have been invalidated by other thread.
2230            // Since the invalidated `ValueEntry` (which should be still in the write
2231            // op queue) has a pointer to this node, move the node to the back of the
2232            // deque instead of popping (dropping) it.
2233            deqs.write_order.move_front_to_back();
2234        }
2235    }
2236
    /// Evicts up to `batch_size` entries from the front of the write-order
    /// queue when they are past the time-to-live or were written before the
    /// `valid_after` timestamp.
    #[inline]
    async fn remove_expired_wo(
        &self,
        deqs: &mut Deques<K>,
        timer_wheel: &mut TimerWheel<K>,
        batch_size: u32,
        now: Instant,
        eviction_state: &mut EvictionState<'_, K, V>,
    ) where
        V: Clone,
    {
        let ttl = &self.expiration_policy.time_to_live();
        let va = &self.valid_after();
        let mut more_to_evict = true;

        for _ in 0..batch_size {
            // Peek at the oldest written node and copy out its key, hash, dirty
            // flag and last-modified timestamp without unlinking it yet.
            let maybe_key_hash_ts = deqs.write_order.peek_front().map(|node| {
                let elem = &node.element;
                (
                    Arc::clone(elem.key()),
                    elem.hash(),
                    elem.is_dirty(),
                    elem.last_modified(),
                )
            });

            let (key, hash, cause) = match maybe_key_hash_ts {
                Some((key, hash, false, Some(ts))) => {
                    let cause = match is_entry_expired_wo_or_invalid(ttl, va, ts, now) {
                        (true, _) => RemovalCause::Expired,
                        (false, true) => RemovalCause::Explicit,
                        (false, false) => {
                            // The oldest entry is not expired, so no entry
                            // behind it in this queue can be expired either.
                            more_to_evict = false;
                            break;
                        }
                    };
                    (key, hash, cause)
                }
                // TODO: Remove the second pattern `Some((_key, false, None))` once
                // we change `last_modified` and `last_accessed` in `EntryInfo` from
                // `Option<Instant>` to `Instant`.
                Some((key, hash, true, _) | (key, hash, false, None)) => {
                    // `is_dirty` is true or `last_modified` is None. Skip this entry
                    // as it may have been updated by this or other async task but
                    // its `WriteOp` is not processed yet.
                    self.skip_updated_entry_wo(&key, hash, deqs);
                    // Set `more_to_evict` to `false` to make `run_pending_tasks` to
                    // return early. This will help that `schedule_write_op` to send
                    // the `WriteOp` to the write op channel.
                    more_to_evict = false;
                    continue;
                }
                None => {
                    // The queue is empty.
                    more_to_evict = false;
                    break;
                }
            };

            // Lock the key for removal if blocking removal notification is enabled.
            let kl = self.maybe_key_lock(&key);
            let _klg = if let Some(lock) = &kl {
                Some(lock.lock().await)
            } else {
                None
            };

            // Remove the key from the map only when the entry is really expired;
            // the map entry may be newer than the deque node we checked above.
            let maybe_entry = self.cache.remove_if(
                hash,
                |k| k == &key,
                |_, v| is_expired_entry_wo(ttl, va, v, now),
            );

            if let Some(entry) = maybe_entry {
                if eviction_state.is_notifier_enabled() {
                    eviction_state
                        .notify_entry_removal(key, &entry, cause)
                        .await;
                }
                eviction_state.counters.incr_eviction_count();
                Self::handle_remove(deqs, timer_wheel, entry, None, &mut eviction_state.counters);
            } else {
                // The entry in the map was not expired (it was likely updated
                // after the deque node was read); demote the node instead.
                self.skip_updated_entry_wo(&key, hash, deqs);
                more_to_evict = false;
            }
        }

        if more_to_evict {
            eviction_state.more_entries_to_evict = true;
        }
    }
2327
2328    async fn invalidate_entries(
2329        &self,
2330        invalidator: &Invalidator<K, V, S>,
2331        deqs: &mut Deques<K>,
2332        timer_wheel: &mut TimerWheel<K>,
2333        batch_size: u32,
2334        eviction_state: &mut EvictionState<'_, K, V>,
2335    ) where
2336        V: Clone,
2337    {
2338        let now = self.current_time();
2339
2340        // If the write order queue is empty, we are done and can remove the predicates
2341        // that have been registered by now.
2342        if deqs.write_order.len() == 0 {
2343            invalidator.remove_predicates_registered_before(now);
2344            return;
2345        }
2346
2347        let mut candidates = Vec::default();
2348        let mut len = 0;
2349        let has_next;
2350        {
2351            let iter = &mut deqs.write_order.peekable();
2352
2353            while len < batch_size {
2354                if let Some(kd) = iter.next() {
2355                    if !kd.is_dirty() {
2356                        if let Some(ts) = kd.last_modified() {
2357                            let key = kd.key();
2358                            let hash = self.hash(&**key);
2359                            candidates.push(KeyDateLite::new(key, hash, ts));
2360                            len += 1;
2361                        }
2362                    }
2363                } else {
2364                    break;
2365                }
2366            }
2367
2368            has_next = iter.peek().is_some();
2369        }
2370
2371        if len == 0 {
2372            return;
2373        }
2374
2375        let is_truncated = len == batch_size && has_next;
2376        let (invalidated, is_done) = invalidator
2377            .scan_and_invalidate(self, candidates, is_truncated)
2378            .await;
2379
2380        for KvEntry { key: _key, entry } in invalidated {
2381            Self::handle_remove(deqs, timer_wheel, entry, None, &mut eviction_state.counters);
2382        }
2383        if is_done {
2384            deqs.write_order.reset_cursor();
2385        }
2386        if !invalidator.is_empty() {
2387            eviction_state.more_entries_to_evict = true;
2388        }
2389    }
2390
2391    async fn evict_lru_entries(
2392        &self,
2393        deqs: &mut Deques<K>,
2394        timer_wheel: &mut TimerWheel<K>,
2395        batch_size: u32,
2396        weights_to_evict: u64,
2397        eviction_state: &mut EvictionState<'_, K, V>,
2398    ) where
2399        V: Clone,
2400    {
2401        const CACHE_REGION: CacheRegion = CacheRegion::MainProbation;
2402        let deq_name = CACHE_REGION.name();
2403        let mut evicted = 0u64;
2404        let mut more_to_evict = true;
2405
2406        for _ in 0..batch_size {
2407            if evicted >= weights_to_evict {
2408                more_to_evict = false;
2409                break;
2410            }
2411
2412            let maybe_key_hash_ts = deqs.select_mut(CACHE_REGION).0.peek_front().map(|node| {
2413                let entry_info = node.element.entry_info();
2414                (
2415                    Arc::clone(node.element.key()),
2416                    node.element.hash(),
2417                    entry_info.is_dirty(),
2418                    entry_info.last_accessed(),
2419                )
2420            });
2421
2422            let (key, hash, ts) = match maybe_key_hash_ts {
2423                Some((key, hash, false, Some(ts))) => (key, hash, ts),
2424                // TODO: Remove the second pattern `Some((_key, false, None))` once
2425                // we change `last_modified` and `last_accessed` in `EntryInfo` from
2426                // `Option<Instant>` to `Instant`.
2427                Some((key, hash, true, _) | (key, hash, false, None)) => {
2428                    // `is_dirty` is true or `last_modified` is None. Skip this entry
2429                    // as it may have been updated by this or other async task but
2430                    // its `WriteOp` is not processed yet.
2431                    let (ao_deq, wo_deq) = deqs.select_mut(CACHE_REGION);
2432                    self.skip_updated_entry_ao(&key, hash, deq_name, ao_deq, wo_deq);
2433                    // Set `more_to_evict` to `false` to make `run_pending_tasks` to
2434                    // return early. This will help that `schedule_write_op` to send
2435                    // the `WriteOp` to the write op channel.
2436                    more_to_evict = false;
2437                    continue;
2438                }
2439                None => {
2440                    more_to_evict = false;
2441                    break;
2442                }
2443            };
2444
2445            // Lock the key for removal if blocking removal notification is enabled.
2446            let kl = self.maybe_key_lock(&key);
2447            let _klg = if let Some(lock) = &kl {
2448                Some(lock.lock().await)
2449            } else {
2450                None
2451            };
2452
2453            let maybe_entry = self.cache.remove_if(
2454                hash,
2455                |k| k == &key,
2456                |_, v| {
2457                    if let Some(la) = v.last_accessed() {
2458                        la == ts
2459                    } else {
2460                        false
2461                    }
2462                },
2463            );
2464
2465            if let Some(entry) = maybe_entry {
2466                if eviction_state.is_notifier_enabled() {
2467                    eviction_state
2468                        .notify_entry_removal(key, &entry, RemovalCause::Size)
2469                        .await;
2470                }
2471                eviction_state.counters.incr_eviction_count();
2472                let weight = entry.policy_weight();
2473                let (deq, write_order_deq) = deqs.select_mut(CacheRegion::MainProbation);
2474                Self::handle_remove_with_deques(
2475                    deq_name,
2476                    deq,
2477                    write_order_deq,
2478                    timer_wheel,
2479                    entry,
2480                    &mut eviction_state.counters,
2481                );
2482                evicted = evicted.saturating_add(weight as u64);
2483            } else {
2484                let (ao_deq, wo_deq) = deqs.select_mut(CacheRegion::MainProbation);
2485                self.skip_updated_entry_ao(&key, hash, deq_name, ao_deq, wo_deq);
2486                more_to_evict = false;
2487            }
2488        }
2489
2490        if more_to_evict {
2491            eviction_state.more_entries_to_evict = true;
2492        }
2493    }
2494}
2495
2496impl<K, V, S> Inner<K, V, S>
2497where
2498    K: Send + Sync + 'static,
2499    V: Clone + Send + Sync + 'static,
2500{
2501    pub(crate) async fn notify_single_removal(
2502        &self,
2503        key: Arc<K>,
2504        entry: &MiniArc<ValueEntry<K, V>>,
2505        cause: RemovalCause,
2506    ) {
2507        if let Some(notifier) = &self.removal_notifier {
2508            notifier.notify(key, entry.value.clone(), cause).await;
2509        }
2510    }
2511
2512    #[inline]
2513    fn notify_upsert(
2514        &self,
2515        key: Arc<K>,
2516        entry: &MiniArc<ValueEntry<K, V>>,
2517        last_accessed: Option<Instant>,
2518        last_modified: Option<Instant>,
2519    ) -> BoxFuture<'static, ()> {
2520        use futures_util::future::FutureExt;
2521
2522        let now = self.current_time();
2523        let exp = &self.expiration_policy;
2524
2525        let mut cause = RemovalCause::Replaced;
2526
2527        if let Some(last_accessed) = last_accessed {
2528            if is_expired_by_tti(&exp.time_to_idle(), last_accessed, now) {
2529                cause = RemovalCause::Expired;
2530            }
2531        }
2532
2533        if let Some(last_modified) = last_modified {
2534            if is_expired_by_ttl(&exp.time_to_live(), last_modified, now) {
2535                cause = RemovalCause::Expired;
2536            } else if is_invalid_entry(&self.valid_after(), last_modified) {
2537                cause = RemovalCause::Explicit;
2538            }
2539        }
2540
2541        if let Some(notifier) = &self.removal_notifier {
2542            let notifier = Arc::clone(notifier);
2543            let value = entry.value.clone();
2544            async move {
2545                notifier.notify(key, value, cause).await;
2546            }
2547            .boxed()
2548        } else {
2549            std::future::ready(()).boxed()
2550        }
2551    }
2552
2553    #[inline]
2554    fn notify_invalidate(
2555        &self,
2556        key: &Arc<K>,
2557        entry: &MiniArc<ValueEntry<K, V>>,
2558    ) -> BoxFuture<'static, ()> {
2559        use futures_util::future::FutureExt;
2560
2561        let now = self.current_time();
2562        let exp = &self.expiration_policy;
2563
2564        let mut cause = RemovalCause::Explicit;
2565
2566        if let Some(last_accessed) = entry.last_accessed() {
2567            if is_expired_by_tti(&exp.time_to_idle(), last_accessed, now) {
2568                cause = RemovalCause::Expired;
2569            }
2570        }
2571
2572        if let Some(last_modified) = entry.last_modified() {
2573            if is_expired_by_ttl(&exp.time_to_live(), last_modified, now) {
2574                cause = RemovalCause::Expired;
2575            }
2576        }
2577
2578        if let Some(notifier) = &self.removal_notifier {
2579            let notifier = Arc::clone(notifier);
2580            let key = Arc::clone(key);
2581            let value = entry.value.clone();
2582            async move { notifier.notify(key, value, cause).await }.boxed()
2583        } else {
2584            std::future::ready(()).boxed()
2585        }
2586    }
2587}
2588
2589//
2590// for testing
2591//
#[cfg(test)]
impl<K, V, S> Inner<K, V, S>
where
    K: Hash + Eq,
    S: BuildHasher + Clone,
{
    /// Returns the number of registered invalidation predicates, or `0` when
    /// no invalidator is configured.
    fn invalidation_predicate_count(&self) -> usize {
        self.invalidator
            .as_ref()
            .map(|inv| inv.predicate_count())
            .unwrap_or(0)
    }

    /// Returns `true` if the key-lock map holds no entries. A cache without a
    /// key-lock map is treated as having an empty one.
    fn key_locks_map_is_empty(&self) -> bool {
        match &self.key_locks {
            Some(map) => map.is_empty(),
            None => true,
        }
    }
}
2614
2615//
2616// private free-standing functions
2617//
2618
2619/// Returns `true` if this entry is expired by its per-entry TTL.
2620#[inline]
2621fn is_expired_by_per_entry_ttl<K>(entry_info: &MiniArc<EntryInfo<K>>, now: Instant) -> bool {
2622    if let Some(ts) = entry_info.expiration_time() {
2623        ts <= now
2624    } else {
2625        false
2626    }
2627}
2628
2629/// Returns `true` when one of the followings conditions is met:
2630///
2631/// - This entry is expired by the time-to-idle config of this cache instance.
2632/// - Or, it is invalidated by the `invalidate_all` method.
2633#[inline]
2634fn is_expired_entry_ao(
2635    time_to_idle: &Option<Duration>,
2636    valid_after: &Option<Instant>,
2637    entry: &impl AccessTime,
2638    now: Instant,
2639) -> bool {
2640    if let Some(ts) = entry.last_accessed() {
2641        is_invalid_entry(valid_after, ts) || is_expired_by_tti(time_to_idle, ts, now)
2642    } else {
2643        false
2644    }
2645}
2646
2647/// Returns `true` when one of the following conditions is met:
2648///
2649/// - This entry is expired by the time-to-live (TTL) config of this cache instance.
2650/// - Or, it is invalidated by the `invalidate_all` method.
2651#[inline]
2652fn is_expired_entry_wo(
2653    time_to_live: &Option<Duration>,
2654    valid_after: &Option<Instant>,
2655    entry: &impl AccessTime,
2656    now: Instant,
2657) -> bool {
2658    if let Some(ts) = entry.last_modified() {
2659        is_invalid_entry(valid_after, ts) || is_expired_by_ttl(time_to_live, ts, now)
2660    } else {
2661        false
2662    }
2663}
2664
2665#[inline]
2666fn is_entry_expired_ao_or_invalid(
2667    time_to_idle: &Option<Duration>,
2668    valid_after: &Option<Instant>,
2669    entry_last_accessed: Instant,
2670    now: Instant,
2671) -> (bool, bool) {
2672    let ts = entry_last_accessed;
2673    let expired = is_expired_by_tti(time_to_idle, ts, now);
2674    let invalid = is_invalid_entry(valid_after, ts);
2675    (expired, invalid)
2676}
2677
2678#[inline]
2679fn is_entry_expired_wo_or_invalid(
2680    time_to_live: &Option<Duration>,
2681    valid_after: &Option<Instant>,
2682    entry_last_modified: Instant,
2683    now: Instant,
2684) -> (bool, bool) {
2685    let ts = entry_last_modified;
2686    let expired = is_expired_by_ttl(time_to_live, ts, now);
2687    let invalid = is_invalid_entry(valid_after, ts);
2688    (expired, invalid)
2689}
2690
2691#[inline]
2692fn is_invalid_entry(valid_after: &Option<Instant>, entry_ts: Instant) -> bool {
2693    if let Some(va) = valid_after {
2694        entry_ts < *va
2695    } else {
2696        false
2697    }
2698}
2699
2700#[inline]
2701fn is_expired_by_tti(
2702    time_to_idle: &Option<Duration>,
2703    entry_last_accessed: Instant,
2704    now: Instant,
2705) -> bool {
2706    if let Some(tti) = time_to_idle {
2707        let expiration = entry_last_accessed.saturating_add(*tti);
2708        expiration <= now
2709    } else {
2710        false
2711    }
2712}
2713
2714#[inline]
2715fn is_expired_by_ttl(
2716    time_to_live: &Option<Duration>,
2717    entry_last_modified: Instant,
2718    now: Instant,
2719) -> bool {
2720    if let Some(ttl) = time_to_live {
2721        let expiration = entry_last_modified.saturating_add(*ttl);
2722        expiration <= now
2723    } else {
2724        false
2725    }
2726}
2727
2728#[cfg(test)]
2729mod tests {
2730    use crate::{
2731        common::{time::Clock, HousekeeperConfig},
2732        policy::{EvictionPolicy, ExpirationPolicy},
2733    };
2734
2735    use super::BaseCache;
2736
2737    #[cfg_attr(target_pointer_width = "16", ignore)]
2738    #[tokio::test]
2739    async fn test_skt_capacity_will_not_overflow() {
2740        use std::collections::hash_map::RandomState;
2741
2742        // power of two
2743        let pot = |exp| 2u64.pow(exp);
2744
2745        async fn ensure_sketch_len(max_capacity: u64, len: u64, name: &str) {
2746            let cache = BaseCache::<u8, u8>::new(
2747                None,
2748                Some(max_capacity),
2749                None,
2750                RandomState::default(),
2751                None,
2752                EvictionPolicy::default(),
2753                None,
2754                ExpirationPolicy::default(),
2755                HousekeeperConfig::default(),
2756                false,
2757                Clock::default(),
2758            );
2759            cache.inner.enable_frequency_sketch_for_testing().await;
2760            assert_eq!(
2761                cache.inner.frequency_sketch.read().await.table_len(),
2762                len as usize,
2763                "{name}"
2764            );
2765        }
2766
2767        if cfg!(target_pointer_width = "32") {
2768            let pot24 = pot(24);
2769            let pot16 = pot(16);
2770            ensure_sketch_len(0, 128, "0").await;
2771            ensure_sketch_len(128, 128, "128").await;
2772            ensure_sketch_len(pot16, pot16, "pot16").await;
2773            // due to ceiling to next_power_of_two
2774            ensure_sketch_len(pot16 + 1, pot(17), "pot16 + 1").await;
2775            // due to ceiling to next_power_of_two
2776            ensure_sketch_len(pot24 - 1, pot24, "pot24 - 1").await;
2777            ensure_sketch_len(pot24, pot24, "pot24").await;
2778            ensure_sketch_len(pot(27), pot24, "pot(27)").await;
2779            ensure_sketch_len(u32::MAX as u64, pot24, "u32::MAX").await;
2780        } else {
2781            // target_pointer_width: 64 or larger.
2782            let pot30 = pot(30);
2783            let pot16 = pot(16);
2784            ensure_sketch_len(0, 128, "0").await;
2785            ensure_sketch_len(128, 128, "128").await;
2786            ensure_sketch_len(pot16, pot16, "pot16").await;
2787            // due to ceiling to next_power_of_two
2788            ensure_sketch_len(pot16 + 1, pot(17), "pot16 + 1").await;
2789
2790            // The following tests will allocate large memory (~8GiB).
2791            if !cfg!(skip_large_mem_tests) {
2792                // due to ceiling to next_power_of_two
2793                ensure_sketch_len(pot30 - 1, pot30, "pot30- 1").await;
2794                ensure_sketch_len(pot30, pot30, "pot30").await;
2795                ensure_sketch_len(u64::MAX, pot30, "u64::MAX").await;
2796            }
2797        };
2798    }
2799
2800    #[tokio::test]
2801    async fn test_per_entry_expiration() {
2802        use crate::{common::time::Clock, Entry, Expiry};
2803
2804        use std::{
2805            collections::hash_map::RandomState,
2806            sync::{Arc, Mutex},
2807            time::{Duration, Instant as StdInstant},
2808        };
2809
2810        type Key = u32;
2811        type Value = char;
2812
2813        fn current_time(cache: &BaseCache<Key, Value>) -> StdInstant {
2814            cache.inner.clock().to_std_instant(cache.current_time())
2815        }
2816
2817        async fn insert(cache: &BaseCache<Key, Value>, key: Key, hash: u64, value: Value) {
2818            let (op, _now) = cache.do_insert_with_hash(Arc::new(key), hash, value).await;
2819            cache.write_op_ch.send(op).expect("Failed to send");
2820        }
2821
2822        fn never_ignore<'a, V>() -> Option<&'a mut fn(&V) -> bool> {
2823            None
2824        }
2825
2826        macro_rules! assert_params_eq {
2827            ($left:expr, $right:expr, $param_name:expr, $line:expr) => {
2828                assert_eq!(
2829                    $left, $right,
2830                    "Mismatched `{}`s. line: {}",
2831                    $param_name, $line
2832                );
2833            };
2834        }
2835
2836        macro_rules! assert_expiry {
2837            ($cache:ident, $key:ident, $hash:ident, $mock:ident, $duration_secs:expr) => {
2838                // Increment the time.
2839                $mock.increment(Duration::from_millis($duration_secs * 1000 - 1));
2840                $cache.inner.do_run_pending_tasks(None, 1, 10).await;
2841                assert!($cache.contains_key_with_hash(&$key, $hash));
2842                assert_eq!($cache.entry_count(), 1);
2843
2844                // Increment the time by 1ms (3). The entry should be expired.
2845                $mock.increment(Duration::from_millis(1));
2846                $cache.inner.do_run_pending_tasks(None, 1, 10).await;
2847                assert!(!$cache.contains_key_with_hash(&$key, $hash));
2848
2849                // Increment the time again to ensure the entry has been evicted from the
2850                // cache.
2851                $mock.increment(Duration::from_secs(1));
2852                $cache.inner.do_run_pending_tasks(None, 1, 10).await;
2853                assert_eq!($cache.entry_count(), 0);
2854            };
2855        }
2856
2857        /// Contains expected call parameters and also a return value.
2858        #[derive(Debug)]
2859        enum ExpiryExpectation {
2860            NoCall,
2861            AfterCreate {
2862                caller_line: u32,
2863                key: Key,
2864                value: Value,
2865                current_time: StdInstant,
2866                new_duration_secs: Option<u64>,
2867            },
2868            AfterRead {
2869                caller_line: u32,
2870                key: Key,
2871                value: Value,
2872                current_time: StdInstant,
2873                current_duration_secs: Option<u64>,
2874                last_modified_at: StdInstant,
2875                new_duration_secs: Option<u64>,
2876            },
2877            AfterUpdate {
2878                caller_line: u32,
2879                key: Key,
2880                value: Value,
2881                current_time: StdInstant,
2882                current_duration_secs: Option<u64>,
2883                new_duration_secs: Option<u64>,
2884            },
2885        }
2886
2887        impl ExpiryExpectation {
2888            fn after_create(
2889                caller_line: u32,
2890                key: Key,
2891                value: Value,
2892                current_time: StdInstant,
2893                new_duration_secs: Option<u64>,
2894            ) -> Self {
2895                Self::AfterCreate {
2896                    caller_line,
2897                    key,
2898                    value,
2899                    current_time,
2900                    new_duration_secs,
2901                }
2902            }
2903
2904            fn after_read(
2905                caller_line: u32,
2906                key: Key,
2907                value: Value,
2908                current_time: StdInstant,
2909                current_duration_secs: Option<u64>,
2910                last_modified_at: StdInstant,
2911                new_duration_secs: Option<u64>,
2912            ) -> Self {
2913                Self::AfterRead {
2914                    caller_line,
2915                    key,
2916                    value,
2917                    current_time,
2918                    current_duration_secs,
2919                    last_modified_at,
2920                    new_duration_secs,
2921                }
2922            }
2923
2924            fn after_update(
2925                caller_line: u32,
2926                key: Key,
2927                value: Value,
2928                current_time: StdInstant,
2929                current_duration_secs: Option<u64>,
2930                new_duration_secs: Option<u64>,
2931            ) -> Self {
2932                Self::AfterUpdate {
2933                    caller_line,
2934                    key,
2935                    value,
2936                    current_time,
2937                    current_duration_secs,
2938                    new_duration_secs,
2939                }
2940            }
2941        }
2942
2943        let expectation = Arc::new(Mutex::new(ExpiryExpectation::NoCall));
2944
2945        struct MyExpiry {
2946            expectation: Arc<Mutex<ExpiryExpectation>>,
2947        }
2948
2949        impl Expiry<u32, char> for MyExpiry {
2950            fn expire_after_create(
2951                &self,
2952                actual_key: &u32,
2953                actual_value: &char,
2954                actual_current_time: StdInstant,
2955            ) -> Option<Duration> {
2956                use ExpiryExpectation::*;
2957
2958                let lock = &mut *self.expectation.lock().unwrap();
2959                let expected = std::mem::replace(lock, NoCall);
2960                match expected {
2961                    AfterCreate {
2962                        caller_line,
2963                        key,
2964                        value,
2965                        current_time,
2966                        new_duration_secs: new_duration,
2967                    } => {
2968                        assert_params_eq!(*actual_key, key, "key", caller_line);
2969                        assert_params_eq!(*actual_value, value, "value", caller_line);
2970                        assert_params_eq!(
2971                            actual_current_time,
2972                            current_time,
2973                            "current_time",
2974                            caller_line
2975                        );
2976                        new_duration.map(Duration::from_secs)
2977                    }
2978                    expected => {
2979                        panic!(
2980                            "Unexpected call to expire_after_create: caller_line {}, expected: {expected:?}",
2981                            line!()
2982                        );
2983                    }
2984                }
2985            }
2986
2987            fn expire_after_read(
2988                &self,
2989                actual_key: &u32,
2990                actual_value: &char,
2991                actual_current_time: StdInstant,
2992                actual_current_duration: Option<Duration>,
2993                actual_last_modified_at: StdInstant,
2994            ) -> Option<Duration> {
2995                use ExpiryExpectation::*;
2996
2997                let lock = &mut *self.expectation.lock().unwrap();
2998                let expected = std::mem::replace(lock, NoCall);
2999                match expected {
3000                    AfterRead {
3001                        caller_line,
3002                        key,
3003                        value,
3004                        current_time,
3005                        current_duration_secs,
3006                        last_modified_at,
3007                        new_duration_secs,
3008                    } => {
3009                        assert_params_eq!(*actual_key, key, "key", caller_line);
3010                        assert_params_eq!(*actual_value, value, "value", caller_line);
3011                        assert_params_eq!(
3012                            actual_current_time,
3013                            current_time,
3014                            "current_time",
3015                            caller_line
3016                        );
3017                        assert_params_eq!(
3018                            actual_current_duration,
3019                            current_duration_secs.map(Duration::from_secs),
3020                            "current_duration",
3021                            caller_line
3022                        );
3023                        assert_params_eq!(
3024                            actual_last_modified_at,
3025                            last_modified_at,
3026                            "last_modified_at",
3027                            caller_line
3028                        );
3029                        new_duration_secs.map(Duration::from_secs)
3030                    }
3031                    expected => {
3032                        panic!(
3033                            "Unexpected call to expire_after_read: caller_line {}, expected: {expected:?}",
3034                            line!()
3035                        );
3036                    }
3037                }
3038            }
3039
3040            fn expire_after_update(
3041                &self,
3042                actual_key: &u32,
3043                actual_value: &char,
3044                actual_current_time: StdInstant,
3045                actual_current_duration: Option<Duration>,
3046            ) -> Option<Duration> {
3047                use ExpiryExpectation::*;
3048
3049                let lock = &mut *self.expectation.lock().unwrap();
3050                let expected = std::mem::replace(lock, NoCall);
3051                match expected {
3052                    AfterUpdate {
3053                        caller_line,
3054                        key,
3055                        value,
3056                        current_time,
3057                        current_duration_secs,
3058                        new_duration_secs,
3059                    } => {
3060                        assert_params_eq!(*actual_key, key, "key", caller_line);
3061                        assert_params_eq!(*actual_value, value, "value", caller_line);
3062                        assert_params_eq!(
3063                            actual_current_time,
3064                            current_time,
3065                            "current_time",
3066                            caller_line
3067                        );
3068                        assert_params_eq!(
3069                            actual_current_duration,
3070                            current_duration_secs.map(Duration::from_secs),
3071                            "current_duration",
3072                            caller_line
3073                        );
3074                        new_duration_secs.map(Duration::from_secs)
3075                    }
3076                    expected => {
3077                        panic!(
3078                            "Unexpected call to expire_after_update: caller_line {}, expected: {expected:?}",
3079                            line!()
3080                        );
3081                    }
3082                }
3083            }
3084        }
3085
3086        const TTL: u64 = 16;
3087        const TTI: u64 = 7;
3088        let expiry: Option<Arc<dyn Expiry<_, _> + Send + Sync + 'static>> =
3089            Some(Arc::new(MyExpiry {
3090                expectation: Arc::clone(&expectation),
3091            }));
3092        let (clock, mock) = Clock::mock();
3093
3094        let mut cache = BaseCache::<Key, Value>::new(
3095            None,
3096            None,
3097            None,
3098            RandomState::default(),
3099            None,
3100            EvictionPolicy::default(),
3101            None,
3102            ExpirationPolicy::new(
3103                Some(Duration::from_secs(TTL)),
3104                Some(Duration::from_secs(TTI)),
3105                expiry,
3106            ),
3107            HousekeeperConfig::default(),
3108            false,
3109            clock,
3110        );
3111        cache.reconfigure_for_testing().await;
3112
3113        // Make the cache exterior immutable.
3114        let cache = cache;
3115
3116        mock.increment(Duration::from_millis(10));
3117
3118        // ----------------------------------------------------
3119        // Case 1
3120        //
3121        // 1.  0s: Insert with per-entry TTL 1s.
3122        // 2. +1s: Expires.
3123        // ----------------------------------------------------
3124
3125        // Insert an entry (1). It will have a per-entry TTL of 1 second.
3126        let key = 1;
3127        let hash = cache.hash(&key);
3128        let value = 'a';
3129
3130        *expectation.lock().unwrap() =
3131            ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), Some(1));
3132
3133        insert(&cache, key, hash, value).await;
3134        // Run a sync to register the entry to the internal data structures including
3135        // the timer wheel.
3136        cache.inner.do_run_pending_tasks(None, 1, 10).await;
3137        assert_eq!(cache.entry_count(), 1);
3138
3139        assert_expiry!(cache, key, hash, mock, 1);
3140
3141        // ----------------------------------------------------
3142        // Case 2
3143        //
3144        // 1.  0s: Insert with no per-entry TTL.
3145        // 2. +1s: Get with per-entry TTL 3s.
3146        // 3. +3s: Expires.
3147        // ----------------------------------------------------
3148
3149        // Insert an entry (1).
3150        let key = 2;
3151        let hash = cache.hash(&key);
3152        let value = 'b';
3153
3154        *expectation.lock().unwrap() =
3155            ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), None);
3156        let inserted_at = current_time(&cache);
3157        insert(&cache, key, hash, value).await;
3158        cache.inner.do_run_pending_tasks(None, 1, 10).await;
3159        assert_eq!(cache.entry_count(), 1);
3160
3161        // Increment the time.
3162        mock.increment(Duration::from_secs(1));
3163        cache.inner.do_run_pending_tasks(None, 1, 10).await;
3164        assert!(cache.contains_key_with_hash(&key, hash));
3165
3166        // Read the entry (2).
3167        *expectation.lock().unwrap() = ExpiryExpectation::after_read(
3168            line!(),
3169            key,
3170            value,
3171            current_time(&cache),
3172            Some(TTI - 1),
3173            inserted_at,
3174            Some(3),
3175        );
3176        assert_eq!(
3177            cache
3178                .get_with_hash(&key, hash, never_ignore(), false, true)
3179                .await
3180                .map(Entry::into_value),
3181            Some(value)
3182        );
3183        cache.inner.do_run_pending_tasks(None, 1, 10).await;
3184
3185        assert_expiry!(cache, key, hash, mock, 3);
3186
3187        // ----------------------------------------------------
3188        // Case 3
3189        //
3190        // 1.  0s: Insert with no per-entry TTL.
3191        // 2. +1s: Get with no per-entry TTL.
3192        // 3. +2s: Update with per-entry TTL 3s.
3193        // 4. +3s: Expires.
3194        // ----------------------------------------------------
3195
3196        // Insert an entry (1).
3197        let key = 3;
3198        let hash = cache.hash(&key);
3199        let value = 'c';
3200
3201        *expectation.lock().unwrap() =
3202            ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), None);
3203        let inserted_at = current_time(&cache);
3204        insert(&cache, key, hash, value).await;
3205        cache.inner.do_run_pending_tasks(None, 1, 10).await;
3206        assert_eq!(cache.entry_count(), 1);
3207
3208        // Increment the time.
3209        mock.increment(Duration::from_secs(1));
3210        cache.inner.do_run_pending_tasks(None, 1, 10).await;
3211        assert!(cache.contains_key_with_hash(&key, hash));
3212
3213        // Read the entry (2).
3214        *expectation.lock().unwrap() = ExpiryExpectation::after_read(
3215            line!(),
3216            key,
3217            value,
3218            current_time(&cache),
3219            Some(TTI - 1),
3220            inserted_at,
3221            None,
3222        );
3223        assert_eq!(
3224            cache
3225                .get_with_hash(&key, hash, never_ignore(), false, true)
3226                .await
3227                .map(Entry::into_value),
3228            Some(value)
3229        );
3230        cache.inner.do_run_pending_tasks(None, 1, 10).await;
3231
3232        // Increment the time.
3233        mock.increment(Duration::from_secs(2));
3234        cache.inner.do_run_pending_tasks(None, 1, 10).await;
3235        assert!(cache.contains_key_with_hash(&key, hash));
3236        assert_eq!(cache.entry_count(), 1);
3237
3238        // Update the entry (3).
3239        *expectation.lock().unwrap() = ExpiryExpectation::after_update(
3240            line!(),
3241            key,
3242            value,
3243            current_time(&cache),
3244            // TTI should be reset by this update.
3245            Some(TTI),
3246            Some(3),
3247        );
3248        insert(&cache, key, hash, value).await;
3249        cache.inner.do_run_pending_tasks(None, 1, 10).await;
3250        assert_eq!(cache.entry_count(), 1);
3251
3252        assert_expiry!(cache, key, hash, mock, 3);
3253
3254        // ----------------------------------------------------
3255        // Case 4
3256        //
3257        // 1.  0s: Insert with no per-entry TTL.
3258        // 2. +1s: Get with no per-entry TTL.
3259        // 3. +2s: Update with no per-entry TTL.
3260        // 4. +7s: Expires by TTI (7s from step 3).
3261        // ----------------------------------------------------
3262
3263        // Insert an entry (1).
3264        let key = 4;
3265        let hash = cache.hash(&key);
3266        let value = 'd';
3267
3268        *expectation.lock().unwrap() =
3269            ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), None);
3270        let inserted_at = current_time(&cache);
3271        insert(&cache, key, hash, value).await;
3272        cache.inner.do_run_pending_tasks(None, 1, 10).await;
3273        assert_eq!(cache.entry_count(), 1);
3274
3275        // Increment the time.
3276        mock.increment(Duration::from_secs(1));
3277        cache.inner.do_run_pending_tasks(None, 1, 10).await;
3278        assert!(cache.contains_key_with_hash(&key, hash));
3279        assert_eq!(cache.entry_count(), 1);
3280
3281        // Read the entry (2).
3282        *expectation.lock().unwrap() = ExpiryExpectation::after_read(
3283            line!(),
3284            key,
3285            value,
3286            current_time(&cache),
3287            Some(TTI - 1),
3288            inserted_at,
3289            None,
3290        );
3291        assert_eq!(
3292            cache
3293                .get_with_hash(&key, hash, never_ignore(), false, true)
3294                .await
3295                .map(Entry::into_value),
3296            Some(value)
3297        );
3298        cache.inner.do_run_pending_tasks(None, 1, 10).await;
3299
3300        // Increment the time.
3301        mock.increment(Duration::from_secs(2));
3302        cache.inner.do_run_pending_tasks(None, 1, 10).await;
3303        assert!(cache.contains_key_with_hash(&key, hash));
3304        assert_eq!(cache.entry_count(), 1);
3305
3306        // Update the entry (3).
3307        *expectation.lock().unwrap() = ExpiryExpectation::after_update(
3308            line!(),
3309            key,
3310            value,
3311            current_time(&cache),
3312            // TTI should be reset by this update.
3313            Some(TTI),
3314            None,
3315        );
3316        insert(&cache, key, hash, value).await;
3317        cache.inner.do_run_pending_tasks(None, 1, 10).await;
3318        assert_eq!(cache.entry_count(), 1);
3319
3320        assert_expiry!(cache, key, hash, mock, 7);
3321
3322        // ----------------------------------------------------
3323        // Case 5
3324        //
3325        // 1.  0s: Insert with per-entry TTL 8s.
3326        // 2. +5s: Get with per-entry TTL 8s.
3327        // 3. +7s: Expires by TTI (7s).
3328        // ----------------------------------------------------
3329
3330        // Insert an entry.
3331        let key = 5;
3332        let hash = cache.hash(&key);
3333        let value = 'e';
3334
3335        *expectation.lock().unwrap() =
3336            ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), Some(8));
3337        let inserted_at = current_time(&cache);
3338        insert(&cache, key, hash, value).await;
3339        cache.inner.do_run_pending_tasks(None, 1, 10).await;
3340        assert_eq!(cache.entry_count(), 1);
3341
3342        // Increment the time.
3343        mock.increment(Duration::from_secs(5));
3344        cache.inner.do_run_pending_tasks(None, 1, 10).await;
3345        assert!(cache.contains_key_with_hash(&key, hash));
3346        assert_eq!(cache.entry_count(), 1);
3347
3348        // Read the entry.
3349        *expectation.lock().unwrap() = ExpiryExpectation::after_read(
3350            line!(),
3351            key,
3352            value,
3353            current_time(&cache),
3354            Some(TTI - 5),
3355            inserted_at,
3356            Some(8),
3357        );
3358        assert_eq!(
3359            cache
3360                .get_with_hash(&key, hash, never_ignore(), false, true)
3361                .await
3362                .map(Entry::into_value),
3363            Some(value)
3364        );
3365        cache.inner.do_run_pending_tasks(None, 1, 10).await;
3366
3367        assert_expiry!(cache, key, hash, mock, 7);
3368
3369        // ----------------------------------------------------
3370        // Case 6
3371        //
3372        // 1.  0s: Insert with per-entry TTL 8s.
3373        // 2. +5s: Get with per-entry TTL 9s.
3374        // 3. +6s: Get with per-entry TTL 10s.
3375        // 4. +5s: Expires by TTL (16s).
3376        // ----------------------------------------------------
3377
3378        // Insert an entry.
3379        let key = 6;
3380        let hash = cache.hash(&key);
3381        let value = 'f';
3382
3383        *expectation.lock().unwrap() =
3384            ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), Some(8));
3385        let inserted_at = current_time(&cache);
3386        insert(&cache, key, hash, value).await;
3387        cache.inner.do_run_pending_tasks(None, 1, 10).await;
3388        assert_eq!(cache.entry_count(), 1);
3389
3390        // Increment the time.
3391        mock.increment(Duration::from_secs(5));
3392        cache.inner.do_run_pending_tasks(None, 1, 10).await;
3393        assert!(cache.contains_key_with_hash(&key, hash));
3394        assert_eq!(cache.entry_count(), 1);
3395
3396        // Read the entry.
3397        *expectation.lock().unwrap() = ExpiryExpectation::after_read(
3398            line!(),
3399            key,
3400            value,
3401            current_time(&cache),
3402            Some(TTI - 5),
3403            inserted_at,
3404            Some(9),
3405        );
3406        assert_eq!(
3407            cache
3408                .get_with_hash(&key, hash, never_ignore(), false, true)
3409                .await
3410                .map(Entry::into_value),
3411            Some(value)
3412        );
3413        cache.inner.do_run_pending_tasks(None, 1, 10).await;
3414
3415        // Increment the time.
3416        mock.increment(Duration::from_secs(6));
3417        cache.inner.do_run_pending_tasks(None, 1, 10).await;
3418        assert!(cache.contains_key_with_hash(&key, hash));
3419        assert_eq!(cache.entry_count(), 1);
3420
3421        // Read the entry.
3422        *expectation.lock().unwrap() = ExpiryExpectation::after_read(
3423            line!(),
3424            key,
3425            value,
3426            current_time(&cache),
3427            Some(TTI - 6),
3428            inserted_at,
3429            Some(10),
3430        );
3431        assert_eq!(
3432            cache
3433                .get_with_hash(&key, hash, never_ignore(), false, true)
3434                .await
3435                .map(Entry::into_value),
3436            Some(value)
3437        );
3438        cache.inner.do_run_pending_tasks(None, 1, 10).await;
3439
3440        assert_expiry!(cache, key, hash, mock, 5);
3441
3442        // ----------------------------------------------------
3443        // Case 7
3444        //
3445        // 1.   0s: Insert with per-entry TTL 9s.
3446        // 2.  +6s: Update with per-entry TTL 8s.
3447        // 3.  +6s: Get with per-entry TTL 9s.
3448        // 4.  +6s: Get with per-entry TTL 5s.
3449        // 5.  +4s: Expires by TTL (16s from step 2).
3450        // ----------------------------------------------------
3451        // Insert an entry.
3452        let key = 7;
3453        let hash = cache.hash(&key);
3454        let value = 'g';
3455
3456        *expectation.lock().unwrap() =
3457            ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), Some(9));
3458        insert(&cache, key, hash, value).await;
3459        cache.inner.do_run_pending_tasks(None, 1, 10).await;
3460        assert_eq!(cache.entry_count(), 1);
3461
3462        // Increment the time.
3463        mock.increment(Duration::from_secs(6));
3464        cache.inner.do_run_pending_tasks(None, 1, 10).await;
3465        assert!(cache.contains_key_with_hash(&key, hash));
3466        assert_eq!(cache.entry_count(), 1);
3467
3468        // Update the entry (2).
3469        *expectation.lock().unwrap() = ExpiryExpectation::after_update(
3470            line!(),
3471            key,
3472            value,
3473            current_time(&cache),
3474            // From the per-entry TTL.
3475            Some(9 - 6),
3476            Some(8),
3477        );
3478        let updated_at = current_time(&cache);
3479        insert(&cache, key, hash, value).await;
3480        cache.inner.do_run_pending_tasks(None, 1, 10).await;
3481        assert_eq!(cache.entry_count(), 1);
3482
3483        // Increment the time.
3484        mock.increment(Duration::from_secs(6));
3485        cache.inner.do_run_pending_tasks(None, 1, 10).await;
3486        assert!(cache.contains_key_with_hash(&key, hash));
3487        assert_eq!(cache.entry_count(), 1);
3488
3489        // Read the entry.
3490        *expectation.lock().unwrap() = ExpiryExpectation::after_read(
3491            line!(),
3492            key,
3493            value,
3494            current_time(&cache),
3495            Some(TTI - 6),
3496            updated_at,
3497            Some(9),
3498        );
3499        assert_eq!(
3500            cache
3501                .get_with_hash(&key, hash, never_ignore(), false, true)
3502                .await
3503                .map(Entry::into_value),
3504            Some(value)
3505        );
3506        cache.inner.do_run_pending_tasks(None, 1, 10).await;
3507
3508        // Increment the time.
3509        mock.increment(Duration::from_secs(6));
3510        cache.inner.do_run_pending_tasks(None, 1, 10).await;
3511        assert!(cache.contains_key_with_hash(&key, hash));
3512        assert_eq!(cache.entry_count(), 1);
3513
3514        // Read the entry.
3515        *expectation.lock().unwrap() = ExpiryExpectation::after_read(
3516            line!(),
3517            key,
3518            value,
3519            current_time(&cache),
3520            Some(TTI - 6),
3521            updated_at,
3522            Some(5),
3523        );
3524        assert_eq!(
3525            cache
3526                .get_with_hash(&key, hash, never_ignore(), false, true)
3527                .await
3528                .map(Entry::into_value),
3529            Some(value)
3530        );
3531        cache.inner.do_run_pending_tasks(None, 1, 10).await;
3532
3533        assert_expiry!(cache, key, hash, mock, 4);
3534    }
3535}