foyer_memory/
cache.rs

1// Copyright 2025 foyer Project Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{borrow::Cow, fmt::Debug, future::Future, hash::Hash, ops::Deref, sync::Arc};
16
17use ahash::RandomState;
18use equivalent::Equivalent;
19use foyer_common::{
20    code::{HashBuilder, Key, Value},
21    event::EventListener,
22    future::Diversion,
23    metrics::Metrics,
24    runtime::SingletonHandle,
25};
26use mixtrics::{metrics::BoxedRegistry, registry::noop::NoopMetricsRegistry};
27use pin_project::pin_project;
28use serde::{Deserialize, Serialize};
29use tokio::sync::oneshot;
30
31use crate::{
32    eviction::{
33        fifo::{Fifo, FifoConfig},
34        lfu::{Lfu, LfuConfig},
35        lru::{Lru, LruConfig},
36        s3fifo::{S3Fifo, S3FifoConfig},
37    },
38    raw::{FetchMark, FetchState, RawCache, RawCacheConfig, RawCacheEntry, RawFetch, Weighter},
39    record::CacheHint,
40    Piece, Pipe, Result,
41};
42
43pub type FifoCache<K, V, S = RandomState> = RawCache<Fifo<K, V>, S>;
44pub type FifoCacheEntry<K, V, S = RandomState> = RawCacheEntry<Fifo<K, V>, S>;
45pub type FifoFetch<K, V, ER, S = RandomState> = RawFetch<Fifo<K, V>, ER, S>;
46
47pub type S3FifoCache<K, V, S = RandomState> = RawCache<S3Fifo<K, V>, S>;
48pub type S3FifoCacheEntry<K, V, S = RandomState> = RawCacheEntry<S3Fifo<K, V>, S>;
49pub type S3FifoFetch<K, V, ER, S = RandomState> = RawFetch<S3Fifo<K, V>, ER, S>;
50
51pub type LruCache<K, V, S = RandomState> = RawCache<Lru<K, V>, S>;
52pub type LruCacheEntry<K, V, S = RandomState> = RawCacheEntry<Lru<K, V>, S>;
53pub type LruFetch<K, V, ER, S = RandomState> = RawFetch<Lru<K, V>, ER, S>;
54
55pub type LfuCache<K, V, S = RandomState> = RawCache<Lfu<K, V>, S>;
56pub type LfuCacheEntry<K, V, S = RandomState> = RawCacheEntry<Lfu<K, V>, S>;
57pub type LfuFetch<K, V, ER, S = RandomState> = RawFetch<Lfu<K, V>, ER, S>;
58
59/// A cached entry holder of the in-memory cache.
60#[derive(Debug)]
61pub enum CacheEntry<K, V, S = RandomState>
62where
63    K: Key,
64    V: Value,
65    S: HashBuilder,
66{
67    /// A cached entry holder of the in-memory FIFO cache.
68    Fifo(FifoCacheEntry<K, V, S>),
69    /// A cached entry holder of the in-memory S3FIFO cache.
70    S3Fifo(S3FifoCacheEntry<K, V, S>),
71    /// A cached entry holder of the in-memory LRU cache.
72    Lru(LruCacheEntry<K, V, S>),
73    /// A cached entry holder of the in-memory LFU cache.
74    Lfu(LfuCacheEntry<K, V, S>),
75}
76
77impl<K, V, S> Clone for CacheEntry<K, V, S>
78where
79    K: Key,
80    V: Value,
81    S: HashBuilder,
82{
83    fn clone(&self) -> Self {
84        match self {
85            Self::Fifo(entry) => Self::Fifo(entry.clone()),
86            Self::Lru(entry) => Self::Lru(entry.clone()),
87            Self::Lfu(entry) => Self::Lfu(entry.clone()),
88            Self::S3Fifo(entry) => Self::S3Fifo(entry.clone()),
89        }
90    }
91}
92
93impl<K, V, S> Deref for CacheEntry<K, V, S>
94where
95    K: Key,
96    V: Value,
97    S: HashBuilder,
98{
99    type Target = V;
100
101    fn deref(&self) -> &Self::Target {
102        match self {
103            CacheEntry::Fifo(entry) => entry.deref(),
104            CacheEntry::Lru(entry) => entry.deref(),
105            CacheEntry::Lfu(entry) => entry.deref(),
106            CacheEntry::S3Fifo(entry) => entry.deref(),
107        }
108    }
109}
110
111impl<K, V, S> From<FifoCacheEntry<K, V, S>> for CacheEntry<K, V, S>
112where
113    K: Key,
114    V: Value,
115    S: HashBuilder,
116{
117    fn from(entry: FifoCacheEntry<K, V, S>) -> Self {
118        Self::Fifo(entry)
119    }
120}
121
122impl<K, V, S> From<LruCacheEntry<K, V, S>> for CacheEntry<K, V, S>
123where
124    K: Key,
125    V: Value,
126    S: HashBuilder,
127{
128    fn from(entry: LruCacheEntry<K, V, S>) -> Self {
129        Self::Lru(entry)
130    }
131}
132
133impl<K, V, S> From<LfuCacheEntry<K, V, S>> for CacheEntry<K, V, S>
134where
135    K: Key,
136    V: Value,
137    S: HashBuilder,
138{
139    fn from(entry: LfuCacheEntry<K, V, S>) -> Self {
140        Self::Lfu(entry)
141    }
142}
143
144impl<K, V, S> From<S3FifoCacheEntry<K, V, S>> for CacheEntry<K, V, S>
145where
146    K: Key,
147    V: Value,
148    S: HashBuilder,
149{
150    fn from(entry: S3FifoCacheEntry<K, V, S>) -> Self {
151        Self::S3Fifo(entry)
152    }
153}
154
155impl<K, V, S> CacheEntry<K, V, S>
156where
157    K: Key,
158    V: Value,
159    S: HashBuilder,
160{
161    /// Key hash of the cached entry.
162    pub fn hash(&self) -> u64 {
163        match self {
164            CacheEntry::Fifo(entry) => entry.hash(),
165            CacheEntry::Lru(entry) => entry.hash(),
166            CacheEntry::Lfu(entry) => entry.hash(),
167            CacheEntry::S3Fifo(entry) => entry.hash(),
168        }
169    }
170
171    /// Key of the cached entry.
172    pub fn key(&self) -> &K {
173        match self {
174            CacheEntry::Fifo(entry) => entry.key(),
175            CacheEntry::Lru(entry) => entry.key(),
176            CacheEntry::Lfu(entry) => entry.key(),
177            CacheEntry::S3Fifo(entry) => entry.key(),
178        }
179    }
180
181    /// Value of the cached entry.
182    pub fn value(&self) -> &V {
183        match self {
184            CacheEntry::Fifo(entry) => entry.value(),
185            CacheEntry::Lru(entry) => entry.value(),
186            CacheEntry::Lfu(entry) => entry.value(),
187            CacheEntry::S3Fifo(entry) => entry.value(),
188        }
189    }
190
191    /// Hint of the cached entry.
192    pub fn hint(&self) -> CacheHint {
193        match self {
194            CacheEntry::Fifo(entry) => entry.hint().clone().into(),
195            CacheEntry::Lru(entry) => entry.hint().clone().into(),
196            CacheEntry::Lfu(entry) => entry.hint().clone().into(),
197            CacheEntry::S3Fifo(entry) => entry.hint().clone().into(),
198        }
199    }
200
201    /// Weight of the cached entry.
202    pub fn weight(&self) -> usize {
203        match self {
204            CacheEntry::Fifo(entry) => entry.weight(),
205            CacheEntry::Lru(entry) => entry.weight(),
206            CacheEntry::Lfu(entry) => entry.weight(),
207            CacheEntry::S3Fifo(entry) => entry.weight(),
208        }
209    }
210
211    /// External reference count of the cached entry.
212    pub fn refs(&self) -> usize {
213        match self {
214            CacheEntry::Fifo(entry) => entry.refs(),
215            CacheEntry::Lru(entry) => entry.refs(),
216            CacheEntry::Lfu(entry) => entry.refs(),
217            CacheEntry::S3Fifo(entry) => entry.refs(),
218        }
219    }
220
221    /// If the cached entry is updated and outdated.
222    pub fn is_outdated(&self) -> bool {
223        match self {
224            CacheEntry::Fifo(entry) => entry.is_outdated(),
225            CacheEntry::Lru(entry) => entry.is_outdated(),
226            CacheEntry::Lfu(entry) => entry.is_outdated(),
227            CacheEntry::S3Fifo(entry) => entry.is_outdated(),
228        }
229    }
230
231    /// Get the piece of the entry record.
232    pub fn piece(&self) -> Piece<K, V> {
233        match self {
234            CacheEntry::Fifo(entry) => entry.piece(),
235            CacheEntry::Lru(entry) => entry.piece(),
236            CacheEntry::Lfu(entry) => entry.piece(),
237            CacheEntry::S3Fifo(entry) => entry.piece(),
238        }
239    }
240}
241
242/// Eviction algorithm config.
243#[derive(Debug, Clone, Serialize, Deserialize)]
244pub enum EvictionConfig {
245    /// FIFO eviction algorithm config.
246    Fifo(FifoConfig),
247    /// S3FIFO eviction algorithm config.
248    S3Fifo(S3FifoConfig),
249    /// LRU eviction algorithm config.
250    Lru(LruConfig),
251    /// LFU eviction algorithm config.
252    Lfu(LfuConfig),
253}
254
255impl From<FifoConfig> for EvictionConfig {
256    fn from(value: FifoConfig) -> EvictionConfig {
257        EvictionConfig::Fifo(value)
258    }
259}
260
261impl From<S3FifoConfig> for EvictionConfig {
262    fn from(value: S3FifoConfig) -> EvictionConfig {
263        EvictionConfig::S3Fifo(value)
264    }
265}
266
267impl From<LruConfig> for EvictionConfig {
268    fn from(value: LruConfig) -> EvictionConfig {
269        EvictionConfig::Lru(value)
270    }
271}
272
273impl From<LfuConfig> for EvictionConfig {
274    fn from(value: LfuConfig) -> EvictionConfig {
275        EvictionConfig::Lfu(value)
276    }
277}
278
279/// In-memory cache builder.
280pub struct CacheBuilder<K, V, S>
281where
282    K: Key,
283    V: Value,
284    S: HashBuilder,
285{
286    name: Cow<'static, str>,
287
288    capacity: usize,
289    shards: usize,
290    eviction_config: EvictionConfig,
291
292    hash_builder: S,
293    weighter: Arc<dyn Weighter<K, V>>,
294
295    event_listener: Option<Arc<dyn EventListener<Key = K, Value = V>>>,
296
297    registry: BoxedRegistry,
298    metrics: Option<Arc<Metrics>>,
299}
300
301impl<K, V> CacheBuilder<K, V, RandomState>
302where
303    K: Key,
304    V: Value,
305{
306    /// Create a new in-memory cache builder.
307    pub fn new(capacity: usize) -> Self {
308        Self {
309            name: "foyer".into(),
310
311            capacity,
312            shards: 8,
313            eviction_config: LruConfig::default().into(),
314
315            hash_builder: RandomState::default(),
316            weighter: Arc::new(|_, _| 1),
317            event_listener: None,
318
319            registry: Box::new(NoopMetricsRegistry),
320            metrics: None,
321        }
322    }
323}
324
325impl<K, V, S> CacheBuilder<K, V, S>
326where
327    K: Key,
328    V: Value,
329    S: HashBuilder,
330{
331    /// Set the name of the foyer in-memory cache instance.
332    ///
333    /// foyer will use the name as the prefix of the metric names.
334    ///
335    /// Default: `foyer`.
336    pub fn with_name(mut self, name: impl Into<Cow<'static, str>>) -> Self {
337        self.name = name.into();
338        self
339    }
340
341    /// Set in-memory cache sharding count. Entries will be distributed to different shards based on their hash.
342    /// Operations on different shard can be parallelized.
343    pub fn with_shards(mut self, shards: usize) -> Self {
344        self.shards = shards;
345        self
346    }
347
348    /// Set in-memory cache eviction algorithm.
349    ///
350    /// The default value is a general-used w-TinyLFU algorithm.
351    pub fn with_eviction_config(mut self, eviction_config: impl Into<EvictionConfig>) -> Self {
352        self.eviction_config = eviction_config.into();
353        self
354    }
355
356    /// Set in-memory cache hash builder.
357    pub fn with_hash_builder<OS>(self, hash_builder: OS) -> CacheBuilder<K, V, OS>
358    where
359        OS: HashBuilder,
360    {
361        CacheBuilder {
362            name: self.name,
363            capacity: self.capacity,
364            shards: self.shards,
365            eviction_config: self.eviction_config,
366            hash_builder,
367            weighter: self.weighter,
368            event_listener: self.event_listener,
369            registry: self.registry,
370            metrics: self.metrics,
371        }
372    }
373
374    /// Set in-memory cache weighter.
375    pub fn with_weighter(mut self, weighter: impl Weighter<K, V>) -> Self {
376        self.weighter = Arc::new(weighter);
377        self
378    }
379
380    /// Set event listener.
381    pub fn with_event_listener(mut self, event_listener: Arc<dyn EventListener<Key = K, Value = V>>) -> Self {
382        self.event_listener = Some(event_listener);
383        self
384    }
385
386    /// Set metrics registry.
387    ///
388    /// Default: [`NoopMetricsRegistry`].
389    pub fn with_metrics_registry(mut self, registry: BoxedRegistry) -> CacheBuilder<K, V, S> {
390        self.registry = registry;
391        self
392    }
393
394    /// Set metrics.
395    ///
396    /// Note: `with_metrics` is only supposed to be called by other foyer components.
397    #[doc(hidden)]
398    pub fn with_metrics(mut self, metrics: Arc<Metrics>) -> Self {
399        self.metrics = Some(metrics);
400        self
401    }
402
403    /// Build in-memory cache with the given configuration.
404    pub fn build(self) -> Cache<K, V, S> {
405        if self.capacity < self.shards {
406            tracing::warn!(
407                "The in-memory cache capacity({}) < shards({}).",
408                self.capacity,
409                self.shards
410            );
411        }
412
413        let metrics = self
414            .metrics
415            .unwrap_or_else(|| Arc::new(Metrics::new(self.name, &self.registry)));
416
417        match self.eviction_config {
418            EvictionConfig::Fifo(eviction_config) => Cache::Fifo(Arc::new(RawCache::new(RawCacheConfig {
419                capacity: self.capacity,
420                shards: self.shards,
421                eviction_config,
422                hash_builder: self.hash_builder,
423                weighter: self.weighter,
424                event_listener: self.event_listener,
425                metrics,
426            }))),
427            EvictionConfig::S3Fifo(eviction_config) => Cache::S3Fifo(Arc::new(RawCache::new(RawCacheConfig {
428                capacity: self.capacity,
429                shards: self.shards,
430                eviction_config,
431                hash_builder: self.hash_builder,
432                weighter: self.weighter,
433                event_listener: self.event_listener,
434                metrics,
435            }))),
436            EvictionConfig::Lru(eviction_config) => Cache::Lru(Arc::new(RawCache::new(RawCacheConfig {
437                capacity: self.capacity,
438                shards: self.shards,
439                eviction_config,
440                hash_builder: self.hash_builder,
441                weighter: self.weighter,
442                event_listener: self.event_listener,
443                metrics,
444            }))),
445            EvictionConfig::Lfu(eviction_config) => Cache::Lfu(Arc::new(RawCache::new(RawCacheConfig {
446                capacity: self.capacity,
447                shards: self.shards,
448                eviction_config,
449                hash_builder: self.hash_builder,
450                weighter: self.weighter,
451                event_listener: self.event_listener,
452                metrics,
453            }))),
454        }
455    }
456}
457
458/// In-memory cache with plug-and-play algorithms.
459pub enum Cache<K, V, S = RandomState>
460where
461    K: Key,
462    V: Value,
463    S: HashBuilder,
464{
465    /// In-memory FIFO cache.
466    Fifo(Arc<FifoCache<K, V, S>>),
467    /// In-memory LRU cache.
468    Lru(Arc<LruCache<K, V, S>>),
469    /// In-memory LFU cache.
470    Lfu(Arc<LfuCache<K, V, S>>),
471    /// In-memory S3FIFO cache.
472    S3Fifo(Arc<S3FifoCache<K, V, S>>),
473}
474
475impl<K, V, S> Debug for Cache<K, V, S>
476where
477    K: Key,
478    V: Value,
479    S: HashBuilder,
480{
481    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
482        match self {
483            Self::Fifo(_) => f.debug_tuple("Cache::FifoCache").finish(),
484            Self::S3Fifo(_) => f.debug_tuple("Cache::S3FifoCache").finish(),
485            Self::Lru(_) => f.debug_tuple("Cache::LruCache").finish(),
486            Self::Lfu(_) => f.debug_tuple("Cache::LfuCache").finish(),
487        }
488    }
489}
490
491impl<K, V, S> Clone for Cache<K, V, S>
492where
493    K: Key,
494    V: Value,
495    S: HashBuilder,
496{
497    fn clone(&self) -> Self {
498        match self {
499            Self::Fifo(cache) => Self::Fifo(cache.clone()),
500            Self::S3Fifo(cache) => Self::S3Fifo(cache.clone()),
501            Self::Lru(cache) => Self::Lru(cache.clone()),
502            Self::Lfu(cache) => Self::Lfu(cache.clone()),
503        }
504    }
505}
506
507impl<K, V, S> Cache<K, V, S>
508where
509    K: Key,
510    V: Value,
511    S: HashBuilder,
512{
513    /// Update capacity and evict overflowed entries.
514    #[fastrace::trace(name = "foyer::memory::cache::resize")]
515    pub fn resize(&self, capacity: usize) -> Result<()> {
516        match self {
517            Cache::Fifo(cache) => cache.resize(capacity),
518            Cache::S3Fifo(cache) => cache.resize(capacity),
519            Cache::Lru(cache) => cache.resize(capacity),
520            Cache::Lfu(cache) => cache.resize(capacity),
521        }
522    }
523
524    /// Insert cache entry to the in-memory cache.
525    #[fastrace::trace(name = "foyer::memory::cache::insert")]
526    pub fn insert(&self, key: K, value: V) -> CacheEntry<K, V, S> {
527        match self {
528            Cache::Fifo(cache) => cache.insert(key, value).into(),
529            Cache::S3Fifo(cache) => cache.insert(key, value).into(),
530            Cache::Lru(cache) => cache.insert(key, value).into(),
531            Cache::Lfu(cache) => cache.insert(key, value).into(),
532        }
533    }
534
535    /// Insert cache entry with cache hint to the in-memory cache.
536    #[fastrace::trace(name = "foyer::memory::cache::insert_with_hint")]
537    pub fn insert_with_hint(&self, key: K, value: V, hint: CacheHint) -> CacheEntry<K, V, S> {
538        match self {
539            Cache::Fifo(cache) => cache.insert_with_hint(key, value, hint.into()).into(),
540            Cache::S3Fifo(cache) => cache.insert_with_hint(key, value, hint.into()).into(),
541            Cache::Lru(cache) => cache.insert_with_hint(key, value, hint.into()).into(),
542            Cache::Lfu(cache) => cache.insert_with_hint(key, value, hint.into()).into(),
543        }
544    }
545
546    /// Temporarily insert cache entry to the in-memory cache.
547    ///
548    /// The entry will be removed as soon as the returned entry is dropped.
549    ///
550    /// The entry will become a normal entry after it is accessed.
551    #[fastrace::trace(name = "foyer::memory::cache::insert_ephemeral")]
552    pub fn insert_ephemeral(&self, key: K, value: V) -> CacheEntry<K, V, S> {
553        match self {
554            Cache::Fifo(cache) => cache.insert_ephemeral(key, value).into(),
555            Cache::S3Fifo(cache) => cache.insert_ephemeral(key, value).into(),
556            Cache::Lru(cache) => cache.insert_ephemeral(key, value).into(),
557            Cache::Lfu(cache) => cache.insert_ephemeral(key, value).into(),
558        }
559    }
560
561    /// Temporarily insert cache entry with cache hint to the in-memory cache.
562    ///
563    /// The entry will be removed as soon as the returned entry is dropped.
564    ///
565    /// The entry will become a normal entry after it is accessed.
566    #[fastrace::trace(name = "foyer::memory::cache::insert_ephemeral_with_hint")]
567    pub fn insert_ephemeral_with_hint(&self, key: K, value: V, hint: CacheHint) -> CacheEntry<K, V, S> {
568        match self {
569            Cache::Fifo(cache) => cache.insert_ephemeral_with_hint(key, value, hint.into()).into(),
570            Cache::Lru(cache) => cache.insert_ephemeral_with_hint(key, value, hint.into()).into(),
571            Cache::Lfu(cache) => cache.insert_ephemeral_with_hint(key, value, hint.into()).into(),
572            Cache::S3Fifo(cache) => cache.insert_ephemeral_with_hint(key, value, hint.into()).into(),
573        }
574    }
575
576    /// Remove a cached entry with the given key from the in-memory cache.
577    #[fastrace::trace(name = "foyer::memory::cache::remove")]
578    pub fn remove<Q>(&self, key: &Q) -> Option<CacheEntry<K, V, S>>
579    where
580        Q: Hash + Equivalent<K> + ?Sized,
581    {
582        match self {
583            Cache::Fifo(cache) => cache.remove(key).map(CacheEntry::from),
584            Cache::S3Fifo(cache) => cache.remove(key).map(CacheEntry::from),
585            Cache::Lru(cache) => cache.remove(key).map(CacheEntry::from),
586            Cache::Lfu(cache) => cache.remove(key).map(CacheEntry::from),
587        }
588    }
589
590    /// Get cached entry with the given key from the in-memory cache.
591    #[fastrace::trace(name = "foyer::memory::cache::get")]
592    pub fn get<Q>(&self, key: &Q) -> Option<CacheEntry<K, V, S>>
593    where
594        Q: Hash + Equivalent<K> + ?Sized,
595    {
596        match self {
597            Cache::Fifo(cache) => cache.get(key).map(CacheEntry::from),
598            Cache::S3Fifo(cache) => cache.get(key).map(CacheEntry::from),
599            Cache::Lru(cache) => cache.get(key).map(CacheEntry::from),
600            Cache::Lfu(cache) => cache.get(key).map(CacheEntry::from),
601        }
602    }
603
604    /// Check if the in-memory cache contains a cached entry with the given key.
605    #[fastrace::trace(name = "foyer::memory::cache::contains")]
606    pub fn contains<Q>(&self, key: &Q) -> bool
607    where
608        Q: Hash + Equivalent<K> + ?Sized,
609    {
610        match self {
611            Cache::Fifo(cache) => cache.contains(key),
612            Cache::S3Fifo(cache) => cache.contains(key),
613            Cache::Lru(cache) => cache.contains(key),
614            Cache::Lfu(cache) => cache.contains(key),
615        }
616    }
617
618    /// Access the cached entry with the given key but don't return.
619    ///
620    /// Note: This method can be used to update the cache eviction information and order based on the algorithm.
621    #[fastrace::trace(name = "foyer::memory::cache::touch")]
622    pub fn touch<Q>(&self, key: &Q) -> bool
623    where
624        Q: Hash + Equivalent<K> + ?Sized,
625    {
626        match self {
627            Cache::Fifo(cache) => cache.touch(key),
628            Cache::S3Fifo(cache) => cache.touch(key),
629            Cache::Lru(cache) => cache.touch(key),
630            Cache::Lfu(cache) => cache.touch(key),
631        }
632    }
633
634    /// Clear the in-memory cache.
635    #[fastrace::trace(name = "foyer::memory::cache::clear")]
636    pub fn clear(&self) {
637        match self {
638            Cache::Fifo(cache) => cache.clear(),
639            Cache::S3Fifo(cache) => cache.clear(),
640            Cache::Lru(cache) => cache.clear(),
641            Cache::Lfu(cache) => cache.clear(),
642        }
643    }
644
645    /// Get the capacity of the in-memory cache.
646    pub fn capacity(&self) -> usize {
647        match self {
648            Cache::Fifo(cache) => cache.capacity(),
649            Cache::S3Fifo(cache) => cache.capacity(),
650            Cache::Lru(cache) => cache.capacity(),
651            Cache::Lfu(cache) => cache.capacity(),
652        }
653    }
654
655    /// Get the usage of the in-memory cache.
656    pub fn usage(&self) -> usize {
657        match self {
658            Cache::Fifo(cache) => cache.usage(),
659            Cache::S3Fifo(cache) => cache.usage(),
660            Cache::Lru(cache) => cache.usage(),
661            Cache::Lfu(cache) => cache.usage(),
662        }
663    }
664
665    /// Hash the given key with the hash builder of the cache.
666    pub fn hash<Q>(&self, key: &Q) -> u64
667    where
668        Q: Hash + ?Sized,
669    {
670        self.hash_builder().hash_one(key)
671    }
672
673    /// Get the hash builder of the in-memory cache.
674    pub fn hash_builder(&self) -> &Arc<S> {
675        match self {
676            Cache::Fifo(cache) => cache.hash_builder(),
677            Cache::S3Fifo(cache) => cache.hash_builder(),
678            Cache::Lru(cache) => cache.hash_builder(),
679            Cache::Lfu(cache) => cache.hash_builder(),
680        }
681    }
682
683    /// Get the shards of the in-memory cache.
684    pub fn shards(&self) -> usize {
685        match self {
686            Cache::Fifo(cache) => cache.shards(),
687            Cache::S3Fifo(cache) => cache.shards(),
688            Cache::Lru(cache) => cache.shards(),
689            Cache::Lfu(cache) => cache.shards(),
690        }
691    }
692
693    /// Set the pipe for the hybrid cache.
694    #[doc(hidden)]
695    pub fn set_pipe(&self, pipe: Box<dyn Pipe<Key = K, Value = V>>) {
696        match self {
697            Cache::Fifo(cache) => cache.set_pipe(pipe),
698            Cache::S3Fifo(cache) => cache.set_pipe(pipe),
699            Cache::Lru(cache) => cache.set_pipe(pipe),
700            Cache::Lfu(cache) => cache.set_pipe(pipe),
701        }
702    }
703}
704
705/// A future that is used to get entry value from the remote storage for the in-memory cache.
706#[pin_project(project = FetchProj)]
707pub enum Fetch<K, V, ER, S = RandomState>
708where
709    K: Key,
710    V: Value,
711    S: HashBuilder,
712{
713    /// A future that is used to get entry value from the remote storage for the in-memory FIFO cache.
714    Fifo(#[pin] FifoFetch<K, V, ER, S>),
715    /// A future that is used to get entry value from the remote storage for the in-memory S3FIFO cache.
716    S3Fifo(#[pin] S3FifoFetch<K, V, ER, S>),
717    /// A future that is used to get entry value from the remote storage for the in-memory LRU cache.
718    Lru(#[pin] LruFetch<K, V, ER, S>),
719    /// A future that is used to get entry value from the remote storage for the in-memory LFU cache.
720    Lfu(#[pin] LfuFetch<K, V, ER, S>),
721}
722
723impl<K, V, ER, S> From<FifoFetch<K, V, ER, S>> for Fetch<K, V, ER, S>
724where
725    K: Key,
726    V: Value,
727    S: HashBuilder,
728{
729    fn from(entry: FifoFetch<K, V, ER, S>) -> Self {
730        Self::Fifo(entry)
731    }
732}
733
734impl<K, V, ER, S> From<S3FifoFetch<K, V, ER, S>> for Fetch<K, V, ER, S>
735where
736    K: Key,
737    V: Value,
738    S: HashBuilder,
739{
740    fn from(entry: S3FifoFetch<K, V, ER, S>) -> Self {
741        Self::S3Fifo(entry)
742    }
743}
744
745impl<K, V, ER, S> From<LruFetch<K, V, ER, S>> for Fetch<K, V, ER, S>
746where
747    K: Key,
748    V: Value,
749    S: HashBuilder,
750{
751    fn from(entry: LruFetch<K, V, ER, S>) -> Self {
752        Self::Lru(entry)
753    }
754}
755
756impl<K, V, ER, S> From<LfuFetch<K, V, ER, S>> for Fetch<K, V, ER, S>
757where
758    K: Key,
759    V: Value,
760    S: HashBuilder,
761{
762    fn from(entry: LfuFetch<K, V, ER, S>) -> Self {
763        Self::Lfu(entry)
764    }
765}
766
767impl<K, V, ER, S> Future for Fetch<K, V, ER, S>
768where
769    K: Key,
770    V: Value,
771    ER: From<oneshot::error::RecvError>,
772    S: HashBuilder,
773{
774    type Output = std::result::Result<CacheEntry<K, V, S>, ER>;
775
776    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
777        match self.project() {
778            FetchProj::Fifo(entry) => entry.poll(cx).map(|res| res.map(CacheEntry::from)),
779            FetchProj::S3Fifo(entry) => entry.poll(cx).map(|res| res.map(CacheEntry::from)),
780            FetchProj::Lru(entry) => entry.poll(cx).map(|res| res.map(CacheEntry::from)),
781            FetchProj::Lfu(entry) => entry.poll(cx).map(|res| res.map(CacheEntry::from)),
782        }
783    }
784}
785
786impl<K, V, ER, S> Fetch<K, V, ER, S>
787where
788    K: Key,
789    V: Value,
790    S: HashBuilder,
791{
792    /// Get the fetch state.
793    pub fn state(&self) -> FetchState {
794        match self {
795            Fetch::Fifo(fetch) => fetch.state(),
796            Fetch::S3Fifo(fetch) => fetch.state(),
797            Fetch::Lru(fetch) => fetch.state(),
798            Fetch::Lfu(fetch) => fetch.state(),
799        }
800    }
801
802    /// Get the ext of the fetch.
803    #[doc(hidden)]
804    pub fn store(&self) -> &Option<FetchMark> {
805        match self {
806            Fetch::Fifo(fetch) => fetch.store(),
807            Fetch::S3Fifo(fetch) => fetch.store(),
808            Fetch::Lru(fetch) => fetch.store(),
809            Fetch::Lfu(fetch) => fetch.store(),
810        }
811    }
812}
813
814impl<K, V, S> Cache<K, V, S>
815where
816    K: Key + Clone,
817    V: Value,
818    S: HashBuilder,
819{
820    /// Get the cached entry with the given key from the in-memory cache.
821    ///
822    /// Use `fetch` to fetch the cache value from the remote storage on cache miss.
823    ///
824    /// The concurrent fetch requests will be deduplicated.
825    #[fastrace::trace(name = "foyer::memory::cache::fetch")]
826    pub fn fetch<F, FU, ER>(&self, key: K, fetch: F) -> Fetch<K, V, ER, S>
827    where
828        F: FnOnce() -> FU,
829        FU: Future<Output = std::result::Result<V, ER>> + Send + 'static,
830        ER: Send + 'static + Debug,
831    {
832        match self {
833            Cache::Fifo(cache) => Fetch::from(cache.fetch(key, fetch)),
834            Cache::S3Fifo(cache) => Fetch::from(cache.fetch(key, fetch)),
835            Cache::Lru(cache) => Fetch::from(cache.fetch(key, fetch)),
836            Cache::Lfu(cache) => Fetch::from(cache.fetch(key, fetch)),
837        }
838    }
839
840    /// Get the cached entry with the given key and hint from the in-memory cache.
841    ///
842    /// Use `fetch` to fetch the cache value from the remote storage on cache miss.
843    ///
844    /// The concurrent fetch requests will be deduplicated.
845    #[fastrace::trace(name = "foyer::memory::cache::fetch_with_hint")]
846    pub fn fetch_with_hint<F, FU, ER>(&self, key: K, hint: CacheHint, fetch: F) -> Fetch<K, V, ER, S>
847    where
848        F: FnOnce() -> FU,
849        FU: Future<Output = std::result::Result<V, ER>> + Send + 'static,
850        ER: Send + 'static + Debug,
851    {
852        match self {
853            Cache::Fifo(cache) => Fetch::from(cache.fetch_with_hint(key, hint.into(), fetch)),
854            Cache::S3Fifo(cache) => Fetch::from(cache.fetch_with_hint(key, hint.into(), fetch)),
855            Cache::Lru(cache) => Fetch::from(cache.fetch_with_hint(key, hint.into(), fetch)),
856            Cache::Lfu(cache) => Fetch::from(cache.fetch_with_hint(key, hint.into(), fetch)),
857        }
858    }
859
860    /// Get the cached entry with the given key from the in-memory cache.
861    ///
862    /// The fetch task will be spawned in the give `runtime`.
863    ///
864    /// Use `fetch` to fetch the cache value from the remote storage on cache miss.
865    ///
866    /// The concurrent fetch requests will be deduplicated.
867    #[doc(hidden)]
868    pub fn fetch_inner<F, FU, ER, ID>(
869        &self,
870        key: K,
871        hint: CacheHint,
872        fetch: F,
873        runtime: &SingletonHandle,
874    ) -> Fetch<K, V, ER, S>
875    where
876        F: FnOnce() -> FU,
877        FU: Future<Output = ID> + Send + 'static,
878        ER: Send + 'static + Debug,
879        ID: Into<Diversion<std::result::Result<V, ER>, FetchMark>>,
880    {
881        match self {
882            Cache::Fifo(cache) => Fetch::from(cache.fetch_inner(key, hint.into(), fetch, runtime)),
883            Cache::Lru(cache) => Fetch::from(cache.fetch_inner(key, hint.into(), fetch, runtime)),
884            Cache::Lfu(cache) => Fetch::from(cache.fetch_inner(key, hint.into(), fetch, runtime)),
885            Cache::S3Fifo(cache) => Fetch::from(cache.fetch_inner(key, hint.into(), fetch, runtime)),
886        }
887    }
888}
889
890#[cfg(test)]
891mod tests {
892    use std::{ops::Range, time::Duration};
893
894    use futures_util::future::join_all;
895    use itertools::Itertools;
896    use rand::{rngs::StdRng, seq::SliceRandom, Rng, SeedableRng};
897
898    use super::*;
899    use crate::eviction::{fifo::FifoConfig, lfu::LfuConfig, lru::LruConfig, s3fifo::S3FifoConfig};
900
901    const CAPACITY: usize = 100;
902    const SHARDS: usize = 4;
903    const RANGE: Range<u64> = 0..1000;
904    const OPS: usize = 10000;
905    const CONCURRENCY: usize = 8;
906
907    fn fifo() -> Cache<u64, u64> {
908        CacheBuilder::new(CAPACITY)
909            .with_shards(SHARDS)
910            .with_eviction_config(FifoConfig {})
911            .build()
912    }
913
914    fn lru() -> Cache<u64, u64> {
915        CacheBuilder::new(CAPACITY)
916            .with_shards(SHARDS)
917            .with_eviction_config(LruConfig {
918                high_priority_pool_ratio: 0.1,
919            })
920            .build()
921    }
922
923    fn lfu() -> Cache<u64, u64> {
924        CacheBuilder::new(CAPACITY)
925            .with_shards(SHARDS)
926            .with_eviction_config(LfuConfig {
927                window_capacity_ratio: 0.1,
928                protected_capacity_ratio: 0.8,
929                cmsketch_eps: 0.001,
930                cmsketch_confidence: 0.9,
931            })
932            .build()
933    }
934
935    fn s3fifo() -> Cache<u64, u64> {
936        CacheBuilder::new(CAPACITY)
937            .with_shards(SHARDS)
938            .with_eviction_config(S3FifoConfig {
939                small_queue_capacity_ratio: 0.1,
940                ghost_queue_capacity_ratio: 10.0,
941                small_to_main_freq_threshold: 2,
942            })
943            .build()
944    }
945
946    fn init_cache(cache: &Cache<u64, u64>, rng: &mut StdRng) {
947        let mut v = RANGE.collect_vec();
948        v.shuffle(rng);
949        for i in v {
950            cache.insert(i, i);
951        }
952    }
953
954    async fn operate(cache: &Cache<u64, u64>, rng: &mut StdRng) {
955        let i = rng.random_range(RANGE);
956        match rng.random_range(0..=3) {
957            0 => {
958                let entry = cache.insert(i, i);
959                assert_eq!(*entry.key(), i);
960                assert_eq!(entry.key(), entry.value());
961            }
962            1 => {
963                if let Some(entry) = cache.get(&i) {
964                    assert_eq!(*entry.key(), i);
965                    assert_eq!(entry.key(), entry.value());
966                }
967            }
968            2 => {
969                cache.remove(&i);
970            }
971            3 => {
972                let entry = cache
973                    .fetch(i, || async move {
974                        tokio::time::sleep(Duration::from_micros(10)).await;
975                        Ok::<_, tokio::sync::oneshot::error::RecvError>(i)
976                    })
977                    .await
978                    .unwrap();
979                assert_eq!(*entry.key(), i);
980                assert_eq!(entry.key(), entry.value());
981            }
982            _ => unreachable!(),
983        }
984    }
985
986    async fn case(cache: Cache<u64, u64>) {
987        let mut rng = StdRng::seed_from_u64(42);
988
989        init_cache(&cache, &mut rng);
990
991        let handles = (0..CONCURRENCY)
992            .map(|_| {
993                let cache = cache.clone();
994                let mut rng = rng.clone();
995                tokio::spawn(async move {
996                    for _ in 0..OPS {
997                        operate(&cache, &mut rng).await;
998                    }
999                })
1000            })
1001            .collect_vec();
1002
1003        join_all(handles).await;
1004    }
1005
1006    #[tokio::test]
1007    async fn test_fifo_cache() {
1008        case(fifo()).await
1009    }
1010
1011    #[tokio::test]
1012    async fn test_lru_cache() {
1013        case(lru()).await
1014    }
1015
1016    #[tokio::test]
1017    async fn test_lfu_cache() {
1018        case(lfu()).await
1019    }
1020
1021    #[tokio::test]
1022    async fn test_s3fifo_cache() {
1023        case(s3fifo()).await
1024    }
1025}