foyer_memory/
cache.rs

1// Copyright 2025 foyer Project Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{borrow::Cow, fmt::Debug, future::Future, hash::Hash, ops::Deref, sync::Arc};
16
17use ahash::RandomState;
18use equivalent::Equivalent;
19use foyer_common::{
20    code::{HashBuilder, Key, Value},
21    event::EventListener,
22    future::Diversion,
23    metrics::Metrics,
24    properties::{Hint, Location, Properties, Source},
25    runtime::SingletonHandle,
26};
27use mixtrics::{metrics::BoxedRegistry, registry::noop::NoopMetricsRegistry};
28use pin_project::pin_project;
29use serde::{Deserialize, Serialize};
30use tokio::sync::oneshot;
31
32use crate::{
33    eviction::{
34        fifo::{Fifo, FifoConfig},
35        lfu::{Lfu, LfuConfig},
36        lru::{Lru, LruConfig},
37        s3fifo::{S3Fifo, S3FifoConfig},
38    },
39    raw::{FetchContext, FetchState, RawCache, RawCacheConfig, RawCacheEntry, RawFetch, Weighter},
40    Piece, Pipe, Result,
41};
42
43/// Entry properties for in-memory only cache.
44#[derive(Debug, Clone, Default)]
45pub struct CacheProperties {
46    ephemeral: bool,
47    hint: Hint,
48}
49
50impl CacheProperties {
51    /// Set the entry to be ephemeral.
52    ///
53    /// An ephemeral entry will be evicted immediately after all its holders drop it,
54    /// no matter if the capacity is reached.
55    pub fn with_ephemeral(mut self, ephemeral: bool) -> Self {
56        self.ephemeral = ephemeral;
57        self
58    }
59
60    /// Get if the entry is ephemeral.
61    pub fn ephemeral(&self) -> bool {
62        self.ephemeral
63    }
64
65    /// Set entry hint.
66    pub fn with_hint(mut self, hint: Hint) -> Self {
67        self.hint = hint;
68        self
69    }
70
71    /// Get entry hint.
72    pub fn hint(&self) -> Hint {
73        self.hint
74    }
75}
76
77impl Properties for CacheProperties {
78    fn with_ephemeral(self, ephemeral: bool) -> Self {
79        self.with_ephemeral(ephemeral)
80    }
81
82    fn ephemeral(&self) -> Option<bool> {
83        Some(self.ephemeral())
84    }
85
86    fn with_hint(self, hint: Hint) -> Self {
87        self.with_hint(hint)
88    }
89
90    fn hint(&self) -> Option<Hint> {
91        Some(self.hint())
92    }
93
94    fn with_location(self, _: Location) -> Self {
95        self
96    }
97
98    fn location(&self) -> Option<Location> {
99        None
100    }
101
102    fn with_source(self, _: Source) -> Self {
103        self
104    }
105
106    fn source(&self) -> Option<Source> {
107        None
108    }
109}
110
111pub type FifoCache<K, V, S = RandomState, P = CacheProperties> = RawCache<Fifo<K, V, P>, S>;
112pub type FifoCacheEntry<K, V, S = RandomState, P = CacheProperties> = RawCacheEntry<Fifo<K, V, P>, S>;
113pub type FifoFetch<K, V, ER, S = RandomState, P = CacheProperties> = RawFetch<Fifo<K, V, P>, ER, S>;
114
115pub type S3FifoCache<K, V, S = RandomState, P = CacheProperties> = RawCache<S3Fifo<K, V, P>, S>;
116pub type S3FifoCacheEntry<K, V, S = RandomState, P = CacheProperties> = RawCacheEntry<S3Fifo<K, V, P>, S>;
117pub type S3FifoFetch<K, V, ER, S = RandomState, P = CacheProperties> = RawFetch<S3Fifo<K, V, P>, ER, S>;
118
119pub type LruCache<K, V, S = RandomState, P = CacheProperties> = RawCache<Lru<K, V, P>, S>;
120pub type LruCacheEntry<K, V, S = RandomState, P = CacheProperties> = RawCacheEntry<Lru<K, V, P>, S>;
121pub type LruFetch<K, V, ER, S = RandomState, P = CacheProperties> = RawFetch<Lru<K, V, P>, ER, S>;
122
123pub type LfuCache<K, V, S = RandomState, P = CacheProperties> = RawCache<Lfu<K, V, P>, S>;
124pub type LfuCacheEntry<K, V, S = RandomState, P = CacheProperties> = RawCacheEntry<Lfu<K, V, P>, S>;
125pub type LfuFetch<K, V, ER, S = RandomState, P = CacheProperties> = RawFetch<Lfu<K, V, P>, ER, S>;
126
127/// A cached entry holder of the in-memory cache.
128#[derive(Debug)]
129pub enum CacheEntry<K, V, S = RandomState, P = CacheProperties>
130where
131    K: Key,
132    V: Value,
133    S: HashBuilder,
134    P: Properties,
135{
136    /// A cached entry holder of the in-memory FIFO cache.
137    Fifo(FifoCacheEntry<K, V, S, P>),
138    /// A cached entry holder of the in-memory S3FIFO cache.
139    S3Fifo(S3FifoCacheEntry<K, V, S, P>),
140    /// A cached entry holder of the in-memory LRU cache.
141    Lru(LruCacheEntry<K, V, S, P>),
142    /// A cached entry holder of the in-memory LFU cache.
143    Lfu(LfuCacheEntry<K, V, S, P>),
144}
145
146impl<K, V, S, P> Clone for CacheEntry<K, V, S, P>
147where
148    K: Key,
149    V: Value,
150    S: HashBuilder,
151    P: Properties,
152{
153    fn clone(&self) -> Self {
154        match self {
155            Self::Fifo(entry) => Self::Fifo(entry.clone()),
156            Self::Lru(entry) => Self::Lru(entry.clone()),
157            Self::Lfu(entry) => Self::Lfu(entry.clone()),
158            Self::S3Fifo(entry) => Self::S3Fifo(entry.clone()),
159        }
160    }
161}
162
163impl<K, V, S, P> Deref for CacheEntry<K, V, S, P>
164where
165    K: Key,
166    V: Value,
167    S: HashBuilder,
168    P: Properties,
169{
170    type Target = V;
171
172    fn deref(&self) -> &Self::Target {
173        match self {
174            CacheEntry::Fifo(entry) => entry.deref(),
175            CacheEntry::Lru(entry) => entry.deref(),
176            CacheEntry::Lfu(entry) => entry.deref(),
177            CacheEntry::S3Fifo(entry) => entry.deref(),
178        }
179    }
180}
181
182impl<K, V, S, P> From<FifoCacheEntry<K, V, S, P>> for CacheEntry<K, V, S, P>
183where
184    K: Key,
185    V: Value,
186    S: HashBuilder,
187    P: Properties,
188{
189    fn from(entry: FifoCacheEntry<K, V, S, P>) -> Self {
190        Self::Fifo(entry)
191    }
192}
193
194impl<K, V, S, P> From<LruCacheEntry<K, V, S, P>> for CacheEntry<K, V, S, P>
195where
196    K: Key,
197    V: Value,
198    S: HashBuilder,
199    P: Properties,
200{
201    fn from(entry: LruCacheEntry<K, V, S, P>) -> Self {
202        Self::Lru(entry)
203    }
204}
205
206impl<K, V, S, P> From<LfuCacheEntry<K, V, S, P>> for CacheEntry<K, V, S, P>
207where
208    K: Key,
209    V: Value,
210    S: HashBuilder,
211    P: Properties,
212{
213    fn from(entry: LfuCacheEntry<K, V, S, P>) -> Self {
214        Self::Lfu(entry)
215    }
216}
217
218impl<K, V, S, P> From<S3FifoCacheEntry<K, V, S, P>> for CacheEntry<K, V, S, P>
219where
220    K: Key,
221    V: Value,
222    S: HashBuilder,
223    P: Properties,
224{
225    fn from(entry: S3FifoCacheEntry<K, V, S, P>) -> Self {
226        Self::S3Fifo(entry)
227    }
228}
229
230impl<K, V, S, P> CacheEntry<K, V, S, P>
231where
232    K: Key,
233    V: Value,
234    S: HashBuilder,
235    P: Properties,
236{
237    /// Key hash of the cached entry.
238    pub fn hash(&self) -> u64 {
239        match self {
240            CacheEntry::Fifo(entry) => entry.hash(),
241            CacheEntry::Lru(entry) => entry.hash(),
242            CacheEntry::Lfu(entry) => entry.hash(),
243            CacheEntry::S3Fifo(entry) => entry.hash(),
244        }
245    }
246
247    /// Key of the cached entry.
248    pub fn key(&self) -> &K {
249        match self {
250            CacheEntry::Fifo(entry) => entry.key(),
251            CacheEntry::Lru(entry) => entry.key(),
252            CacheEntry::Lfu(entry) => entry.key(),
253            CacheEntry::S3Fifo(entry) => entry.key(),
254        }
255    }
256
257    /// Value of the cached entry.
258    pub fn value(&self) -> &V {
259        match self {
260            CacheEntry::Fifo(entry) => entry.value(),
261            CacheEntry::Lru(entry) => entry.value(),
262            CacheEntry::Lfu(entry) => entry.value(),
263            CacheEntry::S3Fifo(entry) => entry.value(),
264        }
265    }
266
267    /// Properties of the cached entry.
268    pub fn properties(&self) -> &P {
269        match self {
270            CacheEntry::Fifo(entry) => entry.properties(),
271            CacheEntry::Lru(entry) => entry.properties(),
272            CacheEntry::Lfu(entry) => entry.properties(),
273            CacheEntry::S3Fifo(entry) => entry.properties(),
274        }
275    }
276
277    /// Weight of the cached entry.
278    pub fn weight(&self) -> usize {
279        match self {
280            CacheEntry::Fifo(entry) => entry.weight(),
281            CacheEntry::Lru(entry) => entry.weight(),
282            CacheEntry::Lfu(entry) => entry.weight(),
283            CacheEntry::S3Fifo(entry) => entry.weight(),
284        }
285    }
286
287    /// External reference count of the cached entry.
288    pub fn refs(&self) -> usize {
289        match self {
290            CacheEntry::Fifo(entry) => entry.refs(),
291            CacheEntry::Lru(entry) => entry.refs(),
292            CacheEntry::Lfu(entry) => entry.refs(),
293            CacheEntry::S3Fifo(entry) => entry.refs(),
294        }
295    }
296
297    /// If the cached entry is updated and outdated.
298    pub fn is_outdated(&self) -> bool {
299        match self {
300            CacheEntry::Fifo(entry) => entry.is_outdated(),
301            CacheEntry::Lru(entry) => entry.is_outdated(),
302            CacheEntry::Lfu(entry) => entry.is_outdated(),
303            CacheEntry::S3Fifo(entry) => entry.is_outdated(),
304        }
305    }
306
307    /// Get the piece of the entry record.
308    pub fn piece(&self) -> Piece<K, V, P> {
309        match self {
310            CacheEntry::Fifo(entry) => entry.piece(),
311            CacheEntry::Lru(entry) => entry.piece(),
312            CacheEntry::Lfu(entry) => entry.piece(),
313            CacheEntry::S3Fifo(entry) => entry.piece(),
314        }
315    }
316}
317
318/// Eviction algorithm config.
319#[derive(Debug, Clone, Serialize, Deserialize)]
320pub enum EvictionConfig {
321    /// FIFO eviction algorithm config.
322    Fifo(FifoConfig),
323    /// S3FIFO eviction algorithm config.
324    S3Fifo(S3FifoConfig),
325    /// LRU eviction algorithm config.
326    Lru(LruConfig),
327    /// LFU eviction algorithm config.
328    Lfu(LfuConfig),
329}
330
331impl From<FifoConfig> for EvictionConfig {
332    fn from(value: FifoConfig) -> EvictionConfig {
333        EvictionConfig::Fifo(value)
334    }
335}
336
337impl From<S3FifoConfig> for EvictionConfig {
338    fn from(value: S3FifoConfig) -> EvictionConfig {
339        EvictionConfig::S3Fifo(value)
340    }
341}
342
343impl From<LruConfig> for EvictionConfig {
344    fn from(value: LruConfig) -> EvictionConfig {
345        EvictionConfig::Lru(value)
346    }
347}
348
349impl From<LfuConfig> for EvictionConfig {
350    fn from(value: LfuConfig) -> EvictionConfig {
351        EvictionConfig::Lfu(value)
352    }
353}
354
355/// In-memory cache builder.
356pub struct CacheBuilder<K, V, S>
357where
358    K: Key,
359    V: Value,
360    S: HashBuilder,
361{
362    name: Cow<'static, str>,
363
364    capacity: usize,
365    shards: usize,
366    eviction_config: EvictionConfig,
367
368    hash_builder: S,
369    weighter: Arc<dyn Weighter<K, V>>,
370
371    event_listener: Option<Arc<dyn EventListener<Key = K, Value = V>>>,
372
373    registry: BoxedRegistry,
374    metrics: Option<Arc<Metrics>>,
375}
376
377impl<K, V> CacheBuilder<K, V, RandomState>
378where
379    K: Key,
380    V: Value,
381{
382    /// Create a new in-memory cache builder.
383    pub fn new(capacity: usize) -> Self {
384        Self {
385            name: "foyer".into(),
386
387            capacity,
388            shards: 8,
389            eviction_config: LruConfig::default().into(),
390
391            hash_builder: RandomState::default(),
392            weighter: Arc::new(|_, _| 1),
393            event_listener: None,
394
395            registry: Box::new(NoopMetricsRegistry),
396            metrics: None,
397        }
398    }
399}
400
401impl<K, V, S> CacheBuilder<K, V, S>
402where
403    K: Key,
404    V: Value,
405    S: HashBuilder,
406{
407    /// Set the name of the foyer in-memory cache instance.
408    ///
409    /// foyer will use the name as the prefix of the metric names.
410    ///
411    /// Default: `foyer`.
412    pub fn with_name(mut self, name: impl Into<Cow<'static, str>>) -> Self {
413        self.name = name.into();
414        self
415    }
416
417    /// Set in-memory cache sharding count. Entries will be distributed to different shards based on their hash.
418    /// Operations on different shard can be parallelized.
419    pub fn with_shards(mut self, shards: usize) -> Self {
420        self.shards = shards;
421        self
422    }
423
424    /// Set in-memory cache eviction algorithm.
425    ///
426    /// The default value is a general-used w-TinyLFU algorithm.
427    pub fn with_eviction_config(mut self, eviction_config: impl Into<EvictionConfig>) -> Self {
428        self.eviction_config = eviction_config.into();
429        self
430    }
431
432    /// Set in-memory cache hash builder.
433    pub fn with_hash_builder<OS>(self, hash_builder: OS) -> CacheBuilder<K, V, OS>
434    where
435        OS: HashBuilder,
436    {
437        CacheBuilder {
438            name: self.name,
439            capacity: self.capacity,
440            shards: self.shards,
441            eviction_config: self.eviction_config,
442            hash_builder,
443            weighter: self.weighter,
444            event_listener: self.event_listener,
445            registry: self.registry,
446            metrics: self.metrics,
447        }
448    }
449
450    /// Set in-memory cache weighter.
451    pub fn with_weighter(mut self, weighter: impl Weighter<K, V>) -> Self {
452        self.weighter = Arc::new(weighter);
453        self
454    }
455
456    /// Set event listener.
457    pub fn with_event_listener(mut self, event_listener: Arc<dyn EventListener<Key = K, Value = V>>) -> Self {
458        self.event_listener = Some(event_listener);
459        self
460    }
461
462    /// Set metrics registry.
463    ///
464    /// Default: [`NoopMetricsRegistry`].
465    pub fn with_metrics_registry(mut self, registry: BoxedRegistry) -> CacheBuilder<K, V, S> {
466        self.registry = registry;
467        self
468    }
469
470    /// Set metrics.
471    ///
472    /// Note: `with_metrics` is only supposed to be called by other foyer components.
473    #[doc(hidden)]
474    pub fn with_metrics(mut self, metrics: Arc<Metrics>) -> Self {
475        self.metrics = Some(metrics);
476        self
477    }
478
479    /// Build in-memory cache with the given configuration.
480    pub fn build<P>(self) -> Cache<K, V, S, P>
481    where
482        P: Properties,
483    {
484        if self.capacity < self.shards {
485            tracing::warn!(
486                "The in-memory cache capacity({}) < shards({}).",
487                self.capacity,
488                self.shards
489            );
490        }
491
492        let metrics = self
493            .metrics
494            .unwrap_or_else(|| Arc::new(Metrics::new(self.name, &self.registry)));
495
496        match self.eviction_config {
497            EvictionConfig::Fifo(eviction_config) => Cache::Fifo(Arc::new(RawCache::new(RawCacheConfig {
498                capacity: self.capacity,
499                shards: self.shards,
500                eviction_config,
501                hash_builder: self.hash_builder,
502                weighter: self.weighter,
503                event_listener: self.event_listener,
504                metrics,
505            }))),
506            EvictionConfig::S3Fifo(eviction_config) => Cache::S3Fifo(Arc::new(RawCache::new(RawCacheConfig {
507                capacity: self.capacity,
508                shards: self.shards,
509                eviction_config,
510                hash_builder: self.hash_builder,
511                weighter: self.weighter,
512                event_listener: self.event_listener,
513                metrics,
514            }))),
515            EvictionConfig::Lru(eviction_config) => Cache::Lru(Arc::new(RawCache::new(RawCacheConfig {
516                capacity: self.capacity,
517                shards: self.shards,
518                eviction_config,
519                hash_builder: self.hash_builder,
520                weighter: self.weighter,
521                event_listener: self.event_listener,
522                metrics,
523            }))),
524            EvictionConfig::Lfu(eviction_config) => Cache::Lfu(Arc::new(RawCache::new(RawCacheConfig {
525                capacity: self.capacity,
526                shards: self.shards,
527                eviction_config,
528                hash_builder: self.hash_builder,
529                weighter: self.weighter,
530                event_listener: self.event_listener,
531                metrics,
532            }))),
533        }
534    }
535}
536
537/// In-memory cache with plug-and-play algorithms.
538pub enum Cache<K, V, S = RandomState, P = CacheProperties>
539where
540    K: Key,
541    V: Value,
542    S: HashBuilder,
543    P: Properties,
544{
545    /// In-memory FIFO cache.
546    Fifo(Arc<FifoCache<K, V, S, P>>),
547    /// In-memory LRU cache.
548    Lru(Arc<LruCache<K, V, S, P>>),
549    /// In-memory LFU cache.
550    Lfu(Arc<LfuCache<K, V, S, P>>),
551    /// In-memory S3FIFO cache.
552    S3Fifo(Arc<S3FifoCache<K, V, S, P>>),
553}
554
555impl<K, V, S, P> Debug for Cache<K, V, S, P>
556where
557    K: Key,
558    V: Value,
559    S: HashBuilder,
560    P: Properties,
561{
562    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
563        match self {
564            Self::Fifo(_) => f.debug_tuple("Cache::FifoCache").finish(),
565            Self::S3Fifo(_) => f.debug_tuple("Cache::S3FifoCache").finish(),
566            Self::Lru(_) => f.debug_tuple("Cache::LruCache").finish(),
567            Self::Lfu(_) => f.debug_tuple("Cache::LfuCache").finish(),
568        }
569    }
570}
571
572impl<K, V, S, P> Clone for Cache<K, V, S, P>
573where
574    K: Key,
575    V: Value,
576    S: HashBuilder,
577    P: Properties,
578{
579    fn clone(&self) -> Self {
580        match self {
581            Self::Fifo(cache) => Self::Fifo(cache.clone()),
582            Self::S3Fifo(cache) => Self::S3Fifo(cache.clone()),
583            Self::Lru(cache) => Self::Lru(cache.clone()),
584            Self::Lfu(cache) => Self::Lfu(cache.clone()),
585        }
586    }
587}
588
589impl<K, V> Cache<K, V, RandomState, CacheProperties>
590where
591    K: Key,
592    V: Value,
593{
594    /// Create a new in-memory cache builder with capacity.
595    pub fn builder(capacity: usize) -> CacheBuilder<K, V, RandomState> {
596        CacheBuilder::new(capacity)
597    }
598}
599
600impl<K, V, S, P> Cache<K, V, S, P>
601where
602    K: Key,
603    V: Value,
604    S: HashBuilder,
605    P: Properties,
606{
607    /// Update capacity and evict overflowed entries.
608    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::cache::resize"))]
609    pub fn resize(&self, capacity: usize) -> Result<()> {
610        match self {
611            Cache::Fifo(cache) => cache.resize(capacity),
612            Cache::S3Fifo(cache) => cache.resize(capacity),
613            Cache::Lru(cache) => cache.resize(capacity),
614            Cache::Lfu(cache) => cache.resize(capacity),
615        }
616    }
617
618    /// Insert cache entry to the in-memory cache.
619    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::cache::insert"))]
620    pub fn insert(&self, key: K, value: V) -> CacheEntry<K, V, S, P> {
621        match self {
622            Cache::Fifo(cache) => cache.insert(key, value).into(),
623            Cache::S3Fifo(cache) => cache.insert(key, value).into(),
624            Cache::Lru(cache) => cache.insert(key, value).into(),
625            Cache::Lfu(cache) => cache.insert(key, value).into(),
626        }
627    }
628
629    /// Insert cache entry to the in-memory cache with properties.
630    #[cfg_attr(
631        feature = "tracing",
632        fastrace::trace(name = "foyer::memory::cache::insert_with_properties")
633    )]
634    pub fn insert_with_properties(&self, key: K, value: V, properties: P) -> CacheEntry<K, V, S, P> {
635        match self {
636            Cache::Fifo(cache) => cache.insert_with_properties(key, value, properties).into(),
637            Cache::S3Fifo(cache) => cache.insert_with_properties(key, value, properties).into(),
638            Cache::Lru(cache) => cache.insert_with_properties(key, value, properties).into(),
639            Cache::Lfu(cache) => cache.insert_with_properties(key, value, properties).into(),
640        }
641    }
642
643    /// Remove a cached entry with the given key from the in-memory cache.
644    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::cache::remove"))]
645    pub fn remove<Q>(&self, key: &Q) -> Option<CacheEntry<K, V, S, P>>
646    where
647        Q: Hash + Equivalent<K> + ?Sized,
648    {
649        match self {
650            Cache::Fifo(cache) => cache.remove(key).map(CacheEntry::from),
651            Cache::S3Fifo(cache) => cache.remove(key).map(CacheEntry::from),
652            Cache::Lru(cache) => cache.remove(key).map(CacheEntry::from),
653            Cache::Lfu(cache) => cache.remove(key).map(CacheEntry::from),
654        }
655    }
656
657    /// Get cached entry with the given key from the in-memory cache.
658    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::cache::get"))]
659    pub fn get<Q>(&self, key: &Q) -> Option<CacheEntry<K, V, S, P>>
660    where
661        Q: Hash + Equivalent<K> + ?Sized,
662    {
663        match self {
664            Cache::Fifo(cache) => cache.get(key).map(CacheEntry::from),
665            Cache::S3Fifo(cache) => cache.get(key).map(CacheEntry::from),
666            Cache::Lru(cache) => cache.get(key).map(CacheEntry::from),
667            Cache::Lfu(cache) => cache.get(key).map(CacheEntry::from),
668        }
669    }
670
671    /// Check if the in-memory cache contains a cached entry with the given key.
672    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::cache::contains"))]
673    pub fn contains<Q>(&self, key: &Q) -> bool
674    where
675        Q: Hash + Equivalent<K> + ?Sized,
676    {
677        match self {
678            Cache::Fifo(cache) => cache.contains(key),
679            Cache::S3Fifo(cache) => cache.contains(key),
680            Cache::Lru(cache) => cache.contains(key),
681            Cache::Lfu(cache) => cache.contains(key),
682        }
683    }
684
685    /// Access the cached entry with the given key but don't return.
686    ///
687    /// Note: This method can be used to update the cache eviction information and order based on the algorithm.
688    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::cache::touch"))]
689    pub fn touch<Q>(&self, key: &Q) -> bool
690    where
691        Q: Hash + Equivalent<K> + ?Sized,
692    {
693        match self {
694            Cache::Fifo(cache) => cache.touch(key),
695            Cache::S3Fifo(cache) => cache.touch(key),
696            Cache::Lru(cache) => cache.touch(key),
697            Cache::Lfu(cache) => cache.touch(key),
698        }
699    }
700
701    /// Clear the in-memory cache.
702    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::cache::clear"))]
703    pub fn clear(&self) {
704        match self {
705            Cache::Fifo(cache) => cache.clear(),
706            Cache::S3Fifo(cache) => cache.clear(),
707            Cache::Lru(cache) => cache.clear(),
708            Cache::Lfu(cache) => cache.clear(),
709        }
710    }
711
712    /// Get the capacity of the in-memory cache.
713    pub fn capacity(&self) -> usize {
714        match self {
715            Cache::Fifo(cache) => cache.capacity(),
716            Cache::S3Fifo(cache) => cache.capacity(),
717            Cache::Lru(cache) => cache.capacity(),
718            Cache::Lfu(cache) => cache.capacity(),
719        }
720    }
721
722    /// Get the usage of the in-memory cache.
723    pub fn usage(&self) -> usize {
724        match self {
725            Cache::Fifo(cache) => cache.usage(),
726            Cache::S3Fifo(cache) => cache.usage(),
727            Cache::Lru(cache) => cache.usage(),
728            Cache::Lfu(cache) => cache.usage(),
729        }
730    }
731
732    /// Hash the given key with the hash builder of the cache.
733    pub fn hash<Q>(&self, key: &Q) -> u64
734    where
735        Q: Hash + ?Sized,
736    {
737        self.hash_builder().hash_one(key)
738    }
739
740    /// Get the hash builder of the in-memory cache.
741    pub fn hash_builder(&self) -> &Arc<S> {
742        match self {
743            Cache::Fifo(cache) => cache.hash_builder(),
744            Cache::S3Fifo(cache) => cache.hash_builder(),
745            Cache::Lru(cache) => cache.hash_builder(),
746            Cache::Lfu(cache) => cache.hash_builder(),
747        }
748    }
749
750    /// Get the shards of the in-memory cache.
751    pub fn shards(&self) -> usize {
752        match self {
753            Cache::Fifo(cache) => cache.shards(),
754            Cache::S3Fifo(cache) => cache.shards(),
755            Cache::Lru(cache) => cache.shards(),
756            Cache::Lfu(cache) => cache.shards(),
757        }
758    }
759
760    /// Set the pipe for the hybrid cache.
761    #[doc(hidden)]
762    pub fn set_pipe(&self, pipe: Box<dyn Pipe<Key = K, Value = V, Properties = P>>) {
763        match self {
764            Cache::Fifo(cache) => cache.set_pipe(pipe),
765            Cache::S3Fifo(cache) => cache.set_pipe(pipe),
766            Cache::Lru(cache) => cache.set_pipe(pipe),
767            Cache::Lfu(cache) => cache.set_pipe(pipe),
768        }
769    }
770
771    /// Evict all entries from the in-memory cache.
772    ///
773    /// Instead of [`Cache::clear`], [`Cache::evict_all`] will send the evicted pipe to the pipe.
774    /// It is useful when the cache is used as a hybrid cache.
775    pub fn evict_all(&self) {
776        match self {
777            Cache::Fifo(cache) => cache.evict_all(),
778            Cache::S3Fifo(cache) => cache.evict_all(),
779            Cache::Lru(cache) => cache.evict_all(),
780            Cache::Lfu(cache) => cache.evict_all(),
781        }
782    }
783
784    /// Evict all entries in the cache and offload them into the disk cache via the pipe if needed.
785    ///
786    /// This function obeys the io throttler of the disk cache and make sure all entries will be offloaded.
787    /// Therefore, this function is asynchronous.
788    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::offload"))]
789    pub async fn flush(&self) {
790        match self {
791            Cache::Fifo(cache) => cache.flush().await,
792            Cache::S3Fifo(cache) => cache.flush().await,
793            Cache::Lru(cache) => cache.flush().await,
794            Cache::Lfu(cache) => cache.flush().await,
795        }
796    }
797}
798
799/// A future that is used to get entry value from the remote storage for the in-memory cache.
800#[pin_project(project = FetchProj)]
801pub enum Fetch<K, V, ER, S = RandomState, P = CacheProperties>
802where
803    K: Key,
804    V: Value,
805    S: HashBuilder,
806    P: Properties,
807{
808    /// A future that is used to get entry value from the remote storage for the in-memory FIFO cache.
809    Fifo(#[pin] FifoFetch<K, V, ER, S, P>),
810    /// A future that is used to get entry value from the remote storage for the in-memory S3FIFO cache.
811    S3Fifo(#[pin] S3FifoFetch<K, V, ER, S, P>),
812    /// A future that is used to get entry value from the remote storage for the in-memory LRU cache.
813    Lru(#[pin] LruFetch<K, V, ER, S, P>),
814    /// A future that is used to get entry value from the remote storage for the in-memory LFU cache.
815    Lfu(#[pin] LfuFetch<K, V, ER, S, P>),
816}
817
818impl<K, V, ER, S, P> From<FifoFetch<K, V, ER, S, P>> for Fetch<K, V, ER, S, P>
819where
820    K: Key,
821    V: Value,
822    S: HashBuilder,
823    P: Properties,
824{
825    fn from(entry: FifoFetch<K, V, ER, S, P>) -> Self {
826        Self::Fifo(entry)
827    }
828}
829
830impl<K, V, ER, S, P> From<S3FifoFetch<K, V, ER, S, P>> for Fetch<K, V, ER, S, P>
831where
832    K: Key,
833    V: Value,
834    S: HashBuilder,
835    P: Properties,
836{
837    fn from(entry: S3FifoFetch<K, V, ER, S, P>) -> Self {
838        Self::S3Fifo(entry)
839    }
840}
841
842impl<K, V, ER, S, P> From<LruFetch<K, V, ER, S, P>> for Fetch<K, V, ER, S, P>
843where
844    K: Key,
845    V: Value,
846    S: HashBuilder,
847    P: Properties,
848{
849    fn from(entry: LruFetch<K, V, ER, S, P>) -> Self {
850        Self::Lru(entry)
851    }
852}
853
854impl<K, V, ER, S, P> From<LfuFetch<K, V, ER, S, P>> for Fetch<K, V, ER, S, P>
855where
856    K: Key,
857    V: Value,
858    S: HashBuilder,
859    P: Properties,
860{
861    fn from(entry: LfuFetch<K, V, ER, S, P>) -> Self {
862        Self::Lfu(entry)
863    }
864}
865
866impl<K, V, ER, S, P> Future for Fetch<K, V, ER, S, P>
867where
868    K: Key,
869    V: Value,
870    ER: From<oneshot::error::RecvError>,
871    S: HashBuilder,
872    P: Properties,
873{
874    type Output = std::result::Result<CacheEntry<K, V, S, P>, ER>;
875
876    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
877        match self.project() {
878            FetchProj::Fifo(entry) => entry.poll(cx).map(|res| res.map(CacheEntry::from)),
879            FetchProj::S3Fifo(entry) => entry.poll(cx).map(|res| res.map(CacheEntry::from)),
880            FetchProj::Lru(entry) => entry.poll(cx).map(|res| res.map(CacheEntry::from)),
881            FetchProj::Lfu(entry) => entry.poll(cx).map(|res| res.map(CacheEntry::from)),
882        }
883    }
884}
885
886impl<K, V, ER, S, P> Fetch<K, V, ER, S, P>
887where
888    K: Key,
889    V: Value,
890    S: HashBuilder,
891    P: Properties,
892{
893    /// Get the fetch state.
894    pub fn state(&self) -> FetchState {
895        match self {
896            Fetch::Fifo(fetch) => fetch.state(),
897            Fetch::S3Fifo(fetch) => fetch.state(),
898            Fetch::Lru(fetch) => fetch.state(),
899            Fetch::Lfu(fetch) => fetch.state(),
900        }
901    }
902
903    /// Get the ext of the fetch.
904    #[doc(hidden)]
905    pub fn store(&self) -> &Option<FetchContext> {
906        match self {
907            Fetch::Fifo(fetch) => fetch.store(),
908            Fetch::S3Fifo(fetch) => fetch.store(),
909            Fetch::Lru(fetch) => fetch.store(),
910            Fetch::Lfu(fetch) => fetch.store(),
911        }
912    }
913}
914
915impl<K, V, S, P> Cache<K, V, S, P>
916where
917    K: Key + Clone,
918    V: Value,
919    S: HashBuilder,
920    P: Properties,
921{
922    /// Get the cached entry with the given key from the in-memory cache.
923    ///
924    /// Use `fetch` to fetch the cache value from the remote storage on cache miss.
925    ///
926    /// The concurrent fetch requests will be deduplicated.
927    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::cache::fetch"))]
928    pub fn fetch<F, FU, ER>(&self, key: K, fetch: F) -> Fetch<K, V, ER, S, P>
929    where
930        F: FnOnce() -> FU,
931        FU: Future<Output = std::result::Result<V, ER>> + Send + 'static,
932        ER: Send + 'static + Debug,
933    {
934        match self {
935            Cache::Fifo(cache) => Fetch::from(cache.fetch(key, fetch)),
936            Cache::S3Fifo(cache) => Fetch::from(cache.fetch(key, fetch)),
937            Cache::Lru(cache) => Fetch::from(cache.fetch(key, fetch)),
938            Cache::Lfu(cache) => Fetch::from(cache.fetch(key, fetch)),
939        }
940    }
941
942    /// Get the cached entry with the given key and properties from the in-memory cache.
943    ///
944    /// Use `fetch` to fetch the cache value from the remote storage on cache miss.
945    ///
946    /// The concurrent fetch requests will be deduplicated.
947    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::cache::fetch"))]
948    pub fn fetch_with_properties<F, FU, ER>(&self, key: K, properties: P, fetch: F) -> Fetch<K, V, ER, S, P>
949    where
950        F: FnOnce() -> FU,
951        FU: Future<Output = std::result::Result<V, ER>> + Send + 'static,
952        ER: Send + 'static + Debug,
953    {
954        match self {
955            Cache::Fifo(cache) => Fetch::from(cache.fetch_with_properties(key, properties, fetch)),
956            Cache::S3Fifo(cache) => Fetch::from(cache.fetch_with_properties(key, properties, fetch)),
957            Cache::Lru(cache) => Fetch::from(cache.fetch_with_properties(key, properties, fetch)),
958            Cache::Lfu(cache) => Fetch::from(cache.fetch_with_properties(key, properties, fetch)),
959        }
960    }
961
962    /// Get the cached entry with the given key from the in-memory cache.
963    ///
964    /// The fetch task will be spawned in the give `runtime`.
965    ///
966    /// Use `fetch` to fetch the cache value from the remote storage on cache miss.
967    ///
968    /// The concurrent fetch requests will be deduplicated.
969    #[doc(hidden)]
970    pub fn fetch_inner<F, FU, ER, ID>(
971        &self,
972        key: K,
973        properties: P,
974        fetch: F,
975        runtime: &SingletonHandle,
976    ) -> Fetch<K, V, ER, S, P>
977    where
978        F: FnOnce() -> FU,
979        FU: Future<Output = ID> + Send + 'static,
980        ER: Send + 'static + Debug,
981        ID: Into<Diversion<std::result::Result<V, ER>, FetchContext>>,
982    {
983        match self {
984            Cache::Fifo(cache) => Fetch::from(cache.fetch_inner(key, properties, fetch, runtime)),
985            Cache::Lru(cache) => Fetch::from(cache.fetch_inner(key, properties, fetch, runtime)),
986            Cache::Lfu(cache) => Fetch::from(cache.fetch_inner(key, properties, fetch, runtime)),
987            Cache::S3Fifo(cache) => Fetch::from(cache.fetch_inner(key, properties, fetch, runtime)),
988        }
989    }
990}
991
992#[cfg(test)]
993mod tests {
994    use std::{ops::Range, time::Duration};
995
996    use futures_util::future::join_all;
997    use itertools::Itertools;
998    use rand::{rngs::StdRng, seq::SliceRandom, Rng, SeedableRng};
999
1000    use super::*;
1001    use crate::eviction::{fifo::FifoConfig, lfu::LfuConfig, lru::LruConfig, s3fifo::S3FifoConfig};
1002
1003    const CAPACITY: usize = 100;
1004    const SHARDS: usize = 4;
1005    const RANGE: Range<u64> = 0..1000;
1006    const OPS: usize = 10000;
1007    const CONCURRENCY: usize = 8;
1008
1009    fn fifo() -> Cache<u64, u64> {
1010        CacheBuilder::new(CAPACITY)
1011            .with_shards(SHARDS)
1012            .with_eviction_config(FifoConfig {})
1013            .build()
1014    }
1015
1016    fn lru() -> Cache<u64, u64> {
1017        CacheBuilder::new(CAPACITY)
1018            .with_shards(SHARDS)
1019            .with_eviction_config(LruConfig {
1020                high_priority_pool_ratio: 0.1,
1021            })
1022            .build()
1023    }
1024
1025    fn lfu() -> Cache<u64, u64> {
1026        CacheBuilder::new(CAPACITY)
1027            .with_shards(SHARDS)
1028            .with_eviction_config(LfuConfig {
1029                window_capacity_ratio: 0.1,
1030                protected_capacity_ratio: 0.8,
1031                cmsketch_eps: 0.001,
1032                cmsketch_confidence: 0.9,
1033            })
1034            .build()
1035    }
1036
1037    fn s3fifo() -> Cache<u64, u64> {
1038        CacheBuilder::new(CAPACITY)
1039            .with_shards(SHARDS)
1040            .with_eviction_config(S3FifoConfig {
1041                small_queue_capacity_ratio: 0.1,
1042                ghost_queue_capacity_ratio: 10.0,
1043                small_to_main_freq_threshold: 2,
1044            })
1045            .build()
1046    }
1047
1048    fn init_cache(cache: &Cache<u64, u64>, rng: &mut StdRng) {
1049        let mut v = RANGE.collect_vec();
1050        v.shuffle(rng);
1051        for i in v {
1052            cache.insert(i, i);
1053        }
1054    }
1055
1056    async fn operate(cache: &Cache<u64, u64>, rng: &mut StdRng) {
1057        let i = rng.random_range(RANGE);
1058        match rng.random_range(0..=3) {
1059            0 => {
1060                let entry = cache.insert(i, i);
1061                assert_eq!(*entry.key(), i);
1062                assert_eq!(entry.key(), entry.value());
1063            }
1064            1 => {
1065                if let Some(entry) = cache.get(&i) {
1066                    assert_eq!(*entry.key(), i);
1067                    assert_eq!(entry.key(), entry.value());
1068                }
1069            }
1070            2 => {
1071                cache.remove(&i);
1072            }
1073            3 => {
1074                let entry = cache
1075                    .fetch(i, || async move {
1076                        tokio::time::sleep(Duration::from_micros(10)).await;
1077                        Ok::<_, tokio::sync::oneshot::error::RecvError>(i)
1078                    })
1079                    .await
1080                    .unwrap();
1081                assert_eq!(*entry.key(), i);
1082                assert_eq!(entry.key(), entry.value());
1083            }
1084            _ => unreachable!(),
1085        }
1086    }
1087
1088    async fn case(cache: Cache<u64, u64>) {
1089        let mut rng = StdRng::seed_from_u64(42);
1090
1091        init_cache(&cache, &mut rng);
1092
1093        let handles = (0..CONCURRENCY)
1094            .map(|_| {
1095                let cache = cache.clone();
1096                let mut rng = rng.clone();
1097                tokio::spawn(async move {
1098                    for _ in 0..OPS {
1099                        operate(&cache, &mut rng).await;
1100                    }
1101                })
1102            })
1103            .collect_vec();
1104
1105        join_all(handles).await;
1106    }
1107
1108    #[tokio::test]
1109    async fn test_fifo_cache() {
1110        case(fifo()).await
1111    }
1112
1113    #[tokio::test]
1114    async fn test_lru_cache() {
1115        case(lru()).await
1116    }
1117
1118    #[tokio::test]
1119    async fn test_lfu_cache() {
1120        case(lfu()).await
1121    }
1122
1123    #[tokio::test]
1124    async fn test_s3fifo_cache() {
1125        case(s3fifo()).await
1126    }
1127}