1use std::{borrow::Cow, fmt::Debug, future::Future, hash::Hash, ops::Deref, sync::Arc};
16
17use ahash::RandomState;
18use equivalent::Equivalent;
19use foyer_common::{
20 code::{HashBuilder, Key, Value},
21 event::EventListener,
22 future::Diversion,
23 metrics::Metrics,
24 properties::{Hint, Location, Properties, Source},
25 runtime::SingletonHandle,
26};
27use mixtrics::{metrics::BoxedRegistry, registry::noop::NoopMetricsRegistry};
28use pin_project::pin_project;
29use serde::{Deserialize, Serialize};
30use tokio::sync::oneshot;
31
32use crate::{
33 eviction::{
34 fifo::{Fifo, FifoConfig},
35 lfu::{Lfu, LfuConfig},
36 lru::{Lru, LruConfig},
37 s3fifo::{S3Fifo, S3FifoConfig},
38 },
39 raw::{FetchContext, FetchState, RawCache, RawCacheConfig, RawCacheEntry, RawFetch, Weighter},
40 Piece, Pipe, Result,
41};
42
/// Per-entry properties used by the in-memory cache.
///
/// Carries an `ephemeral` flag and a [`Hint`]. The derived `Default` yields
/// `ephemeral == false` together with `Hint::default()`.
#[derive(Debug, Clone, Default)]
pub struct CacheProperties {
    /// Whether the entry is flagged as ephemeral.
    ephemeral: bool,
    /// Hint attached to the entry.
    hint: Hint,
}
49
50impl CacheProperties {
51 pub fn with_ephemeral(mut self, ephemeral: bool) -> Self {
56 self.ephemeral = ephemeral;
57 self
58 }
59
60 pub fn ephemeral(&self) -> bool {
62 self.ephemeral
63 }
64
65 pub fn with_hint(mut self, hint: Hint) -> Self {
67 self.hint = hint;
68 self
69 }
70
71 pub fn hint(&self) -> Hint {
73 self.hint
74 }
75}
76
77impl Properties for CacheProperties {
78 fn with_ephemeral(self, ephemeral: bool) -> Self {
79 self.with_ephemeral(ephemeral)
80 }
81
82 fn ephemeral(&self) -> Option<bool> {
83 Some(self.ephemeral())
84 }
85
86 fn with_hint(self, hint: Hint) -> Self {
87 self.with_hint(hint)
88 }
89
90 fn hint(&self) -> Option<Hint> {
91 Some(self.hint())
92 }
93
94 fn with_location(self, _: Location) -> Self {
95 self
96 }
97
98 fn location(&self) -> Option<Location> {
99 None
100 }
101
102 fn with_source(self, _: Source) -> Self {
103 self
104 }
105
106 fn source(&self) -> Option<Source> {
107 None
108 }
109}
110
/// [`RawCache`] specialised with the [`Fifo`] eviction type.
pub type FifoCache<K, V, S = RandomState, P = CacheProperties> = RawCache<Fifo<K, V, P>, S>;
/// [`RawCacheEntry`] specialised with the [`Fifo`] eviction type.
pub type FifoCacheEntry<K, V, S = RandomState, P = CacheProperties> = RawCacheEntry<Fifo<K, V, P>, S>;
/// [`RawFetch`] specialised with the [`Fifo`] eviction type and error type `ER`.
pub type FifoFetch<K, V, ER, S = RandomState, P = CacheProperties> = RawFetch<Fifo<K, V, P>, ER, S>;

/// [`RawCache`] specialised with the [`S3Fifo`] eviction type.
pub type S3FifoCache<K, V, S = RandomState, P = CacheProperties> = RawCache<S3Fifo<K, V, P>, S>;
/// [`RawCacheEntry`] specialised with the [`S3Fifo`] eviction type.
pub type S3FifoCacheEntry<K, V, S = RandomState, P = CacheProperties> = RawCacheEntry<S3Fifo<K, V, P>, S>;
/// [`RawFetch`] specialised with the [`S3Fifo`] eviction type and error type `ER`.
pub type S3FifoFetch<K, V, ER, S = RandomState, P = CacheProperties> = RawFetch<S3Fifo<K, V, P>, ER, S>;

/// [`RawCache`] specialised with the [`Lru`] eviction type.
pub type LruCache<K, V, S = RandomState, P = CacheProperties> = RawCache<Lru<K, V, P>, S>;
/// [`RawCacheEntry`] specialised with the [`Lru`] eviction type.
pub type LruCacheEntry<K, V, S = RandomState, P = CacheProperties> = RawCacheEntry<Lru<K, V, P>, S>;
/// [`RawFetch`] specialised with the [`Lru`] eviction type and error type `ER`.
pub type LruFetch<K, V, ER, S = RandomState, P = CacheProperties> = RawFetch<Lru<K, V, P>, ER, S>;

/// [`RawCache`] specialised with the [`Lfu`] eviction type.
pub type LfuCache<K, V, S = RandomState, P = CacheProperties> = RawCache<Lfu<K, V, P>, S>;
/// [`RawCacheEntry`] specialised with the [`Lfu`] eviction type.
pub type LfuCacheEntry<K, V, S = RandomState, P = CacheProperties> = RawCacheEntry<Lfu<K, V, P>, S>;
/// [`RawFetch`] specialised with the [`Lfu`] eviction type and error type `ER`.
pub type LfuFetch<K, V, ER, S = RandomState, P = CacheProperties> = RawFetch<Lfu<K, V, P>, ER, S>;
126
/// A cache entry handle that is agnostic of the eviction algorithm in use.
///
/// Each variant wraps the algorithm-specific entry type; the methods on this enum
/// forward to whichever variant is held.
#[derive(Debug)]
pub enum CacheEntry<K, V, S = RandomState, P = CacheProperties>
where
    K: Key,
    V: Value,
    S: HashBuilder,
    P: Properties,
{
    /// Entry of a [`FifoCache`].
    Fifo(FifoCacheEntry<K, V, S, P>),
    /// Entry of an [`S3FifoCache`].
    S3Fifo(S3FifoCacheEntry<K, V, S, P>),
    /// Entry of an [`LruCache`].
    Lru(LruCacheEntry<K, V, S, P>),
    /// Entry of an [`LfuCache`].
    Lfu(LfuCacheEntry<K, V, S, P>),
}
145
146impl<K, V, S, P> Clone for CacheEntry<K, V, S, P>
147where
148 K: Key,
149 V: Value,
150 S: HashBuilder,
151 P: Properties,
152{
153 fn clone(&self) -> Self {
154 match self {
155 Self::Fifo(entry) => Self::Fifo(entry.clone()),
156 Self::Lru(entry) => Self::Lru(entry.clone()),
157 Self::Lfu(entry) => Self::Lfu(entry.clone()),
158 Self::S3Fifo(entry) => Self::S3Fifo(entry.clone()),
159 }
160 }
161}
162
163impl<K, V, S, P> Deref for CacheEntry<K, V, S, P>
164where
165 K: Key,
166 V: Value,
167 S: HashBuilder,
168 P: Properties,
169{
170 type Target = V;
171
172 fn deref(&self) -> &Self::Target {
173 match self {
174 CacheEntry::Fifo(entry) => entry.deref(),
175 CacheEntry::Lru(entry) => entry.deref(),
176 CacheEntry::Lfu(entry) => entry.deref(),
177 CacheEntry::S3Fifo(entry) => entry.deref(),
178 }
179 }
180}
181
182impl<K, V, S, P> From<FifoCacheEntry<K, V, S, P>> for CacheEntry<K, V, S, P>
183where
184 K: Key,
185 V: Value,
186 S: HashBuilder,
187 P: Properties,
188{
189 fn from(entry: FifoCacheEntry<K, V, S, P>) -> Self {
190 Self::Fifo(entry)
191 }
192}
193
194impl<K, V, S, P> From<LruCacheEntry<K, V, S, P>> for CacheEntry<K, V, S, P>
195where
196 K: Key,
197 V: Value,
198 S: HashBuilder,
199 P: Properties,
200{
201 fn from(entry: LruCacheEntry<K, V, S, P>) -> Self {
202 Self::Lru(entry)
203 }
204}
205
206impl<K, V, S, P> From<LfuCacheEntry<K, V, S, P>> for CacheEntry<K, V, S, P>
207where
208 K: Key,
209 V: Value,
210 S: HashBuilder,
211 P: Properties,
212{
213 fn from(entry: LfuCacheEntry<K, V, S, P>) -> Self {
214 Self::Lfu(entry)
215 }
216}
217
218impl<K, V, S, P> From<S3FifoCacheEntry<K, V, S, P>> for CacheEntry<K, V, S, P>
219where
220 K: Key,
221 V: Value,
222 S: HashBuilder,
223 P: Properties,
224{
225 fn from(entry: S3FifoCacheEntry<K, V, S, P>) -> Self {
226 Self::S3Fifo(entry)
227 }
228}
229
230impl<K, V, S, P> CacheEntry<K, V, S, P>
231where
232 K: Key,
233 V: Value,
234 S: HashBuilder,
235 P: Properties,
236{
237 pub fn hash(&self) -> u64 {
239 match self {
240 CacheEntry::Fifo(entry) => entry.hash(),
241 CacheEntry::Lru(entry) => entry.hash(),
242 CacheEntry::Lfu(entry) => entry.hash(),
243 CacheEntry::S3Fifo(entry) => entry.hash(),
244 }
245 }
246
247 pub fn key(&self) -> &K {
249 match self {
250 CacheEntry::Fifo(entry) => entry.key(),
251 CacheEntry::Lru(entry) => entry.key(),
252 CacheEntry::Lfu(entry) => entry.key(),
253 CacheEntry::S3Fifo(entry) => entry.key(),
254 }
255 }
256
257 pub fn value(&self) -> &V {
259 match self {
260 CacheEntry::Fifo(entry) => entry.value(),
261 CacheEntry::Lru(entry) => entry.value(),
262 CacheEntry::Lfu(entry) => entry.value(),
263 CacheEntry::S3Fifo(entry) => entry.value(),
264 }
265 }
266
267 pub fn properties(&self) -> &P {
269 match self {
270 CacheEntry::Fifo(entry) => entry.properties(),
271 CacheEntry::Lru(entry) => entry.properties(),
272 CacheEntry::Lfu(entry) => entry.properties(),
273 CacheEntry::S3Fifo(entry) => entry.properties(),
274 }
275 }
276
277 pub fn weight(&self) -> usize {
279 match self {
280 CacheEntry::Fifo(entry) => entry.weight(),
281 CacheEntry::Lru(entry) => entry.weight(),
282 CacheEntry::Lfu(entry) => entry.weight(),
283 CacheEntry::S3Fifo(entry) => entry.weight(),
284 }
285 }
286
287 pub fn refs(&self) -> usize {
289 match self {
290 CacheEntry::Fifo(entry) => entry.refs(),
291 CacheEntry::Lru(entry) => entry.refs(),
292 CacheEntry::Lfu(entry) => entry.refs(),
293 CacheEntry::S3Fifo(entry) => entry.refs(),
294 }
295 }
296
297 pub fn is_outdated(&self) -> bool {
299 match self {
300 CacheEntry::Fifo(entry) => entry.is_outdated(),
301 CacheEntry::Lru(entry) => entry.is_outdated(),
302 CacheEntry::Lfu(entry) => entry.is_outdated(),
303 CacheEntry::S3Fifo(entry) => entry.is_outdated(),
304 }
305 }
306
307 pub fn piece(&self) -> Piece<K, V, P> {
309 match self {
310 CacheEntry::Fifo(entry) => entry.piece(),
311 CacheEntry::Lru(entry) => entry.piece(),
312 CacheEntry::Lfu(entry) => entry.piece(),
313 CacheEntry::S3Fifo(entry) => entry.piece(),
314 }
315 }
316}
317
/// Eviction algorithm selection together with that algorithm's configuration.
///
/// [`CacheBuilder::build`] matches on this value to decide which [`Cache`] variant to create.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum EvictionConfig {
    /// Use FIFO eviction with the given configuration.
    Fifo(FifoConfig),
    /// Use S3-FIFO eviction with the given configuration.
    S3Fifo(S3FifoConfig),
    /// Use LRU eviction with the given configuration.
    Lru(LruConfig),
    /// Use LFU eviction with the given configuration.
    Lfu(LfuConfig),
}
330
331impl From<FifoConfig> for EvictionConfig {
332 fn from(value: FifoConfig) -> EvictionConfig {
333 EvictionConfig::Fifo(value)
334 }
335}
336
337impl From<S3FifoConfig> for EvictionConfig {
338 fn from(value: S3FifoConfig) -> EvictionConfig {
339 EvictionConfig::S3Fifo(value)
340 }
341}
342
343impl From<LruConfig> for EvictionConfig {
344 fn from(value: LruConfig) -> EvictionConfig {
345 EvictionConfig::Lru(value)
346 }
347}
348
349impl From<LfuConfig> for EvictionConfig {
350 fn from(value: LfuConfig) -> EvictionConfig {
351 EvictionConfig::Lfu(value)
352 }
353}
354
/// Builder that collects the settings for a [`Cache`] and creates it via [`CacheBuilder::build`].
pub struct CacheBuilder<K, V, S>
where
    K: Key,
    V: Value,
    S: HashBuilder,
{
    /// Name handed to `Metrics::new` when no pre-built metrics were supplied.
    name: Cow<'static, str>,

    /// Capacity forwarded to the underlying raw cache.
    capacity: usize,
    /// Shard count forwarded to the underlying raw cache.
    shards: usize,
    /// Selects the eviction algorithm and carries its configuration.
    eviction_config: EvictionConfig,

    /// Hash builder forwarded to the underlying raw cache.
    hash_builder: S,
    /// Weighter forwarded to the underlying raw cache.
    weighter: Arc<dyn Weighter<K, V>>,

    /// Optional event listener forwarded to the underlying raw cache.
    event_listener: Option<Arc<dyn EventListener<Key = K, Value = V>>>,

    /// Registry used to create metrics when `metrics` is `None`.
    registry: BoxedRegistry,
    /// Pre-built metrics; when set, `name` and `registry` are not used to create new ones.
    metrics: Option<Arc<Metrics>>,
}
376
377impl<K, V> CacheBuilder<K, V, RandomState>
378where
379 K: Key,
380 V: Value,
381{
382 pub fn new(capacity: usize) -> Self {
384 Self {
385 name: "foyer".into(),
386
387 capacity,
388 shards: 8,
389 eviction_config: LruConfig::default().into(),
390
391 hash_builder: RandomState::default(),
392 weighter: Arc::new(|_, _| 1),
393 event_listener: None,
394
395 registry: Box::new(NoopMetricsRegistry),
396 metrics: None,
397 }
398 }
399}
400
401impl<K, V, S> CacheBuilder<K, V, S>
402where
403 K: Key,
404 V: Value,
405 S: HashBuilder,
406{
407 pub fn with_name(mut self, name: impl Into<Cow<'static, str>>) -> Self {
413 self.name = name.into();
414 self
415 }
416
417 pub fn with_shards(mut self, shards: usize) -> Self {
420 self.shards = shards;
421 self
422 }
423
424 pub fn with_eviction_config(mut self, eviction_config: impl Into<EvictionConfig>) -> Self {
428 self.eviction_config = eviction_config.into();
429 self
430 }
431
432 pub fn with_hash_builder<OS>(self, hash_builder: OS) -> CacheBuilder<K, V, OS>
434 where
435 OS: HashBuilder,
436 {
437 CacheBuilder {
438 name: self.name,
439 capacity: self.capacity,
440 shards: self.shards,
441 eviction_config: self.eviction_config,
442 hash_builder,
443 weighter: self.weighter,
444 event_listener: self.event_listener,
445 registry: self.registry,
446 metrics: self.metrics,
447 }
448 }
449
450 pub fn with_weighter(mut self, weighter: impl Weighter<K, V>) -> Self {
452 self.weighter = Arc::new(weighter);
453 self
454 }
455
456 pub fn with_event_listener(mut self, event_listener: Arc<dyn EventListener<Key = K, Value = V>>) -> Self {
458 self.event_listener = Some(event_listener);
459 self
460 }
461
462 pub fn with_metrics_registry(mut self, registry: BoxedRegistry) -> CacheBuilder<K, V, S> {
466 self.registry = registry;
467 self
468 }
469
470 #[doc(hidden)]
474 pub fn with_metrics(mut self, metrics: Arc<Metrics>) -> Self {
475 self.metrics = Some(metrics);
476 self
477 }
478
479 pub fn build<P>(self) -> Cache<K, V, S, P>
481 where
482 P: Properties,
483 {
484 if self.capacity < self.shards {
485 tracing::warn!(
486 "The in-memory cache capacity({}) < shards({}).",
487 self.capacity,
488 self.shards
489 );
490 }
491
492 let metrics = self
493 .metrics
494 .unwrap_or_else(|| Arc::new(Metrics::new(self.name, &self.registry)));
495
496 match self.eviction_config {
497 EvictionConfig::Fifo(eviction_config) => Cache::Fifo(Arc::new(RawCache::new(RawCacheConfig {
498 capacity: self.capacity,
499 shards: self.shards,
500 eviction_config,
501 hash_builder: self.hash_builder,
502 weighter: self.weighter,
503 event_listener: self.event_listener,
504 metrics,
505 }))),
506 EvictionConfig::S3Fifo(eviction_config) => Cache::S3Fifo(Arc::new(RawCache::new(RawCacheConfig {
507 capacity: self.capacity,
508 shards: self.shards,
509 eviction_config,
510 hash_builder: self.hash_builder,
511 weighter: self.weighter,
512 event_listener: self.event_listener,
513 metrics,
514 }))),
515 EvictionConfig::Lru(eviction_config) => Cache::Lru(Arc::new(RawCache::new(RawCacheConfig {
516 capacity: self.capacity,
517 shards: self.shards,
518 eviction_config,
519 hash_builder: self.hash_builder,
520 weighter: self.weighter,
521 event_listener: self.event_listener,
522 metrics,
523 }))),
524 EvictionConfig::Lfu(eviction_config) => Cache::Lfu(Arc::new(RawCache::new(RawCacheConfig {
525 capacity: self.capacity,
526 shards: self.shards,
527 eviction_config,
528 hash_builder: self.hash_builder,
529 weighter: self.weighter,
530 event_listener: self.event_listener,
531 metrics,
532 }))),
533 }
534 }
535}
536
/// In-memory cache facade that hides which eviction algorithm is in use.
///
/// Each variant holds an `Arc` to the algorithm-specific cache, so cloning a `Cache`
/// yields another handle to the same underlying cache.
pub enum Cache<K, V, S = RandomState, P = CacheProperties>
where
    K: Key,
    V: Value,
    S: HashBuilder,
    P: Properties,
{
    /// Cache backed by [`FifoCache`].
    Fifo(Arc<FifoCache<K, V, S, P>>),
    /// Cache backed by [`LruCache`].
    Lru(Arc<LruCache<K, V, S, P>>),
    /// Cache backed by [`LfuCache`].
    Lfu(Arc<LfuCache<K, V, S, P>>),
    /// Cache backed by [`S3FifoCache`].
    S3Fifo(Arc<S3FifoCache<K, V, S, P>>),
}
554
555impl<K, V, S, P> Debug for Cache<K, V, S, P>
556where
557 K: Key,
558 V: Value,
559 S: HashBuilder,
560 P: Properties,
561{
562 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
563 match self {
564 Self::Fifo(_) => f.debug_tuple("Cache::FifoCache").finish(),
565 Self::S3Fifo(_) => f.debug_tuple("Cache::S3FifoCache").finish(),
566 Self::Lru(_) => f.debug_tuple("Cache::LruCache").finish(),
567 Self::Lfu(_) => f.debug_tuple("Cache::LfuCache").finish(),
568 }
569 }
570}
571
572impl<K, V, S, P> Clone for Cache<K, V, S, P>
573where
574 K: Key,
575 V: Value,
576 S: HashBuilder,
577 P: Properties,
578{
579 fn clone(&self) -> Self {
580 match self {
581 Self::Fifo(cache) => Self::Fifo(cache.clone()),
582 Self::S3Fifo(cache) => Self::S3Fifo(cache.clone()),
583 Self::Lru(cache) => Self::Lru(cache.clone()),
584 Self::Lfu(cache) => Self::Lfu(cache.clone()),
585 }
586 }
587}
588
589impl<K, V> Cache<K, V, RandomState, CacheProperties>
590where
591 K: Key,
592 V: Value,
593{
594 pub fn builder(capacity: usize) -> CacheBuilder<K, V, RandomState> {
596 CacheBuilder::new(capacity)
597 }
598}
599
600impl<K, V, S, P> Cache<K, V, S, P>
601where
602 K: Key,
603 V: Value,
604 S: HashBuilder,
605 P: Properties,
606{
607 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::cache::resize"))]
609 pub fn resize(&self, capacity: usize) -> Result<()> {
610 match self {
611 Cache::Fifo(cache) => cache.resize(capacity),
612 Cache::S3Fifo(cache) => cache.resize(capacity),
613 Cache::Lru(cache) => cache.resize(capacity),
614 Cache::Lfu(cache) => cache.resize(capacity),
615 }
616 }
617
618 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::cache::insert"))]
620 pub fn insert(&self, key: K, value: V) -> CacheEntry<K, V, S, P> {
621 match self {
622 Cache::Fifo(cache) => cache.insert(key, value).into(),
623 Cache::S3Fifo(cache) => cache.insert(key, value).into(),
624 Cache::Lru(cache) => cache.insert(key, value).into(),
625 Cache::Lfu(cache) => cache.insert(key, value).into(),
626 }
627 }
628
629 #[cfg_attr(
631 feature = "tracing",
632 fastrace::trace(name = "foyer::memory::cache::insert_with_properties")
633 )]
634 pub fn insert_with_properties(&self, key: K, value: V, properties: P) -> CacheEntry<K, V, S, P> {
635 match self {
636 Cache::Fifo(cache) => cache.insert_with_properties(key, value, properties).into(),
637 Cache::S3Fifo(cache) => cache.insert_with_properties(key, value, properties).into(),
638 Cache::Lru(cache) => cache.insert_with_properties(key, value, properties).into(),
639 Cache::Lfu(cache) => cache.insert_with_properties(key, value, properties).into(),
640 }
641 }
642
643 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::cache::remove"))]
645 pub fn remove<Q>(&self, key: &Q) -> Option<CacheEntry<K, V, S, P>>
646 where
647 Q: Hash + Equivalent<K> + ?Sized,
648 {
649 match self {
650 Cache::Fifo(cache) => cache.remove(key).map(CacheEntry::from),
651 Cache::S3Fifo(cache) => cache.remove(key).map(CacheEntry::from),
652 Cache::Lru(cache) => cache.remove(key).map(CacheEntry::from),
653 Cache::Lfu(cache) => cache.remove(key).map(CacheEntry::from),
654 }
655 }
656
657 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::cache::get"))]
659 pub fn get<Q>(&self, key: &Q) -> Option<CacheEntry<K, V, S, P>>
660 where
661 Q: Hash + Equivalent<K> + ?Sized,
662 {
663 match self {
664 Cache::Fifo(cache) => cache.get(key).map(CacheEntry::from),
665 Cache::S3Fifo(cache) => cache.get(key).map(CacheEntry::from),
666 Cache::Lru(cache) => cache.get(key).map(CacheEntry::from),
667 Cache::Lfu(cache) => cache.get(key).map(CacheEntry::from),
668 }
669 }
670
671 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::cache::contains"))]
673 pub fn contains<Q>(&self, key: &Q) -> bool
674 where
675 Q: Hash + Equivalent<K> + ?Sized,
676 {
677 match self {
678 Cache::Fifo(cache) => cache.contains(key),
679 Cache::S3Fifo(cache) => cache.contains(key),
680 Cache::Lru(cache) => cache.contains(key),
681 Cache::Lfu(cache) => cache.contains(key),
682 }
683 }
684
685 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::cache::touch"))]
689 pub fn touch<Q>(&self, key: &Q) -> bool
690 where
691 Q: Hash + Equivalent<K> + ?Sized,
692 {
693 match self {
694 Cache::Fifo(cache) => cache.touch(key),
695 Cache::S3Fifo(cache) => cache.touch(key),
696 Cache::Lru(cache) => cache.touch(key),
697 Cache::Lfu(cache) => cache.touch(key),
698 }
699 }
700
701 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::cache::clear"))]
703 pub fn clear(&self) {
704 match self {
705 Cache::Fifo(cache) => cache.clear(),
706 Cache::S3Fifo(cache) => cache.clear(),
707 Cache::Lru(cache) => cache.clear(),
708 Cache::Lfu(cache) => cache.clear(),
709 }
710 }
711
712 pub fn capacity(&self) -> usize {
714 match self {
715 Cache::Fifo(cache) => cache.capacity(),
716 Cache::S3Fifo(cache) => cache.capacity(),
717 Cache::Lru(cache) => cache.capacity(),
718 Cache::Lfu(cache) => cache.capacity(),
719 }
720 }
721
722 pub fn usage(&self) -> usize {
724 match self {
725 Cache::Fifo(cache) => cache.usage(),
726 Cache::S3Fifo(cache) => cache.usage(),
727 Cache::Lru(cache) => cache.usage(),
728 Cache::Lfu(cache) => cache.usage(),
729 }
730 }
731
732 pub fn hash<Q>(&self, key: &Q) -> u64
734 where
735 Q: Hash + ?Sized,
736 {
737 self.hash_builder().hash_one(key)
738 }
739
740 pub fn hash_builder(&self) -> &Arc<S> {
742 match self {
743 Cache::Fifo(cache) => cache.hash_builder(),
744 Cache::S3Fifo(cache) => cache.hash_builder(),
745 Cache::Lru(cache) => cache.hash_builder(),
746 Cache::Lfu(cache) => cache.hash_builder(),
747 }
748 }
749
750 pub fn shards(&self) -> usize {
752 match self {
753 Cache::Fifo(cache) => cache.shards(),
754 Cache::S3Fifo(cache) => cache.shards(),
755 Cache::Lru(cache) => cache.shards(),
756 Cache::Lfu(cache) => cache.shards(),
757 }
758 }
759
760 #[doc(hidden)]
762 pub fn set_pipe(&self, pipe: Box<dyn Pipe<Key = K, Value = V, Properties = P>>) {
763 match self {
764 Cache::Fifo(cache) => cache.set_pipe(pipe),
765 Cache::S3Fifo(cache) => cache.set_pipe(pipe),
766 Cache::Lru(cache) => cache.set_pipe(pipe),
767 Cache::Lfu(cache) => cache.set_pipe(pipe),
768 }
769 }
770
771 pub fn evict_all(&self) {
776 match self {
777 Cache::Fifo(cache) => cache.evict_all(),
778 Cache::S3Fifo(cache) => cache.evict_all(),
779 Cache::Lru(cache) => cache.evict_all(),
780 Cache::Lfu(cache) => cache.evict_all(),
781 }
782 }
783
784 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::offload"))]
789 pub async fn flush(&self) {
790 match self {
791 Cache::Fifo(cache) => cache.flush().await,
792 Cache::S3Fifo(cache) => cache.flush().await,
793 Cache::Lru(cache) => cache.flush().await,
794 Cache::Lfu(cache) => cache.flush().await,
795 }
796 }
797}
798
/// Future returned by the `fetch` family of methods on [`Cache`].
///
/// Each variant wraps the algorithm-specific fetch future. The wrapped future is
/// structurally pinned (`#[pin]`), and `FetchProj` is the projection type generated for
/// polling it.
#[pin_project(project = FetchProj)]
pub enum Fetch<K, V, ER, S = RandomState, P = CacheProperties>
where
    K: Key,
    V: Value,
    S: HashBuilder,
    P: Properties,
{
    /// Fetch future of a [`FifoCache`].
    Fifo(#[pin] FifoFetch<K, V, ER, S, P>),
    /// Fetch future of an [`S3FifoCache`].
    S3Fifo(#[pin] S3FifoFetch<K, V, ER, S, P>),
    /// Fetch future of an [`LruCache`].
    Lru(#[pin] LruFetch<K, V, ER, S, P>),
    /// Fetch future of an [`LfuCache`].
    Lfu(#[pin] LfuFetch<K, V, ER, S, P>),
}
817
818impl<K, V, ER, S, P> From<FifoFetch<K, V, ER, S, P>> for Fetch<K, V, ER, S, P>
819where
820 K: Key,
821 V: Value,
822 S: HashBuilder,
823 P: Properties,
824{
825 fn from(entry: FifoFetch<K, V, ER, S, P>) -> Self {
826 Self::Fifo(entry)
827 }
828}
829
830impl<K, V, ER, S, P> From<S3FifoFetch<K, V, ER, S, P>> for Fetch<K, V, ER, S, P>
831where
832 K: Key,
833 V: Value,
834 S: HashBuilder,
835 P: Properties,
836{
837 fn from(entry: S3FifoFetch<K, V, ER, S, P>) -> Self {
838 Self::S3Fifo(entry)
839 }
840}
841
842impl<K, V, ER, S, P> From<LruFetch<K, V, ER, S, P>> for Fetch<K, V, ER, S, P>
843where
844 K: Key,
845 V: Value,
846 S: HashBuilder,
847 P: Properties,
848{
849 fn from(entry: LruFetch<K, V, ER, S, P>) -> Self {
850 Self::Lru(entry)
851 }
852}
853
854impl<K, V, ER, S, P> From<LfuFetch<K, V, ER, S, P>> for Fetch<K, V, ER, S, P>
855where
856 K: Key,
857 V: Value,
858 S: HashBuilder,
859 P: Properties,
860{
861 fn from(entry: LfuFetch<K, V, ER, S, P>) -> Self {
862 Self::Lfu(entry)
863 }
864}
865
866impl<K, V, ER, S, P> Future for Fetch<K, V, ER, S, P>
867where
868 K: Key,
869 V: Value,
870 ER: From<oneshot::error::RecvError>,
871 S: HashBuilder,
872 P: Properties,
873{
874 type Output = std::result::Result<CacheEntry<K, V, S, P>, ER>;
875
876 fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
877 match self.project() {
878 FetchProj::Fifo(entry) => entry.poll(cx).map(|res| res.map(CacheEntry::from)),
879 FetchProj::S3Fifo(entry) => entry.poll(cx).map(|res| res.map(CacheEntry::from)),
880 FetchProj::Lru(entry) => entry.poll(cx).map(|res| res.map(CacheEntry::from)),
881 FetchProj::Lfu(entry) => entry.poll(cx).map(|res| res.map(CacheEntry::from)),
882 }
883 }
884}
885
886impl<K, V, ER, S, P> Fetch<K, V, ER, S, P>
887where
888 K: Key,
889 V: Value,
890 S: HashBuilder,
891 P: Properties,
892{
893 pub fn state(&self) -> FetchState {
895 match self {
896 Fetch::Fifo(fetch) => fetch.state(),
897 Fetch::S3Fifo(fetch) => fetch.state(),
898 Fetch::Lru(fetch) => fetch.state(),
899 Fetch::Lfu(fetch) => fetch.state(),
900 }
901 }
902
903 #[doc(hidden)]
905 pub fn store(&self) -> &Option<FetchContext> {
906 match self {
907 Fetch::Fifo(fetch) => fetch.store(),
908 Fetch::S3Fifo(fetch) => fetch.store(),
909 Fetch::Lru(fetch) => fetch.store(),
910 Fetch::Lfu(fetch) => fetch.store(),
911 }
912 }
913}
914
915impl<K, V, S, P> Cache<K, V, S, P>
916where
917 K: Key + Clone,
918 V: Value,
919 S: HashBuilder,
920 P: Properties,
921{
922 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::cache::fetch"))]
928 pub fn fetch<F, FU, ER>(&self, key: K, fetch: F) -> Fetch<K, V, ER, S, P>
929 where
930 F: FnOnce() -> FU,
931 FU: Future<Output = std::result::Result<V, ER>> + Send + 'static,
932 ER: Send + 'static + Debug,
933 {
934 match self {
935 Cache::Fifo(cache) => Fetch::from(cache.fetch(key, fetch)),
936 Cache::S3Fifo(cache) => Fetch::from(cache.fetch(key, fetch)),
937 Cache::Lru(cache) => Fetch::from(cache.fetch(key, fetch)),
938 Cache::Lfu(cache) => Fetch::from(cache.fetch(key, fetch)),
939 }
940 }
941
942 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::cache::fetch"))]
948 pub fn fetch_with_properties<F, FU, ER>(&self, key: K, properties: P, fetch: F) -> Fetch<K, V, ER, S, P>
949 where
950 F: FnOnce() -> FU,
951 FU: Future<Output = std::result::Result<V, ER>> + Send + 'static,
952 ER: Send + 'static + Debug,
953 {
954 match self {
955 Cache::Fifo(cache) => Fetch::from(cache.fetch_with_properties(key, properties, fetch)),
956 Cache::S3Fifo(cache) => Fetch::from(cache.fetch_with_properties(key, properties, fetch)),
957 Cache::Lru(cache) => Fetch::from(cache.fetch_with_properties(key, properties, fetch)),
958 Cache::Lfu(cache) => Fetch::from(cache.fetch_with_properties(key, properties, fetch)),
959 }
960 }
961
962 #[doc(hidden)]
970 pub fn fetch_inner<F, FU, ER, ID>(
971 &self,
972 key: K,
973 properties: P,
974 fetch: F,
975 runtime: &SingletonHandle,
976 ) -> Fetch<K, V, ER, S, P>
977 where
978 F: FnOnce() -> FU,
979 FU: Future<Output = ID> + Send + 'static,
980 ER: Send + 'static + Debug,
981 ID: Into<Diversion<std::result::Result<V, ER>, FetchContext>>,
982 {
983 match self {
984 Cache::Fifo(cache) => Fetch::from(cache.fetch_inner(key, properties, fetch, runtime)),
985 Cache::Lru(cache) => Fetch::from(cache.fetch_inner(key, properties, fetch, runtime)),
986 Cache::Lfu(cache) => Fetch::from(cache.fetch_inner(key, properties, fetch, runtime)),
987 Cache::S3Fifo(cache) => Fetch::from(cache.fetch_inner(key, properties, fetch, runtime)),
988 }
989 }
990}
991
#[cfg(test)]
mod tests {
    use std::{ops::Range, time::Duration};

    use futures_util::future::join_all;
    use itertools::Itertools;
    use rand::{rngs::StdRng, seq::SliceRandom, Rng, SeedableRng};

    use super::*;
    use crate::eviction::{fifo::FifoConfig, lfu::LfuConfig, lru::LruConfig, s3fifo::S3FifoConfig};

    /// Capacity of every cache under test.
    const CAPACITY: usize = 100;
    /// Shard count of every cache under test.
    const SHARDS: usize = 4;
    /// Key space; ten times larger than `CAPACITY`.
    const RANGE: Range<u64> = 0..1000;
    /// Number of operations each worker task performs.
    const OPS: usize = 10000;
    /// Number of concurrently spawned worker tasks.
    const CONCURRENCY: usize = 8;

    /// Builds a FIFO cache for the fuzz case.
    fn fifo() -> Cache<u64, u64> {
        CacheBuilder::new(CAPACITY)
            .with_shards(SHARDS)
            .with_eviction_config(FifoConfig {})
            .build()
    }

    /// Builds an LRU cache for the fuzz case.
    fn lru() -> Cache<u64, u64> {
        CacheBuilder::new(CAPACITY)
            .with_shards(SHARDS)
            .with_eviction_config(LruConfig {
                high_priority_pool_ratio: 0.1,
            })
            .build()
    }

    /// Builds an LFU cache for the fuzz case.
    fn lfu() -> Cache<u64, u64> {
        CacheBuilder::new(CAPACITY)
            .with_shards(SHARDS)
            .with_eviction_config(LfuConfig {
                window_capacity_ratio: 0.1,
                protected_capacity_ratio: 0.8,
                cmsketch_eps: 0.001,
                cmsketch_confidence: 0.9,
            })
            .build()
    }

    /// Builds an S3-FIFO cache for the fuzz case.
    fn s3fifo() -> Cache<u64, u64> {
        CacheBuilder::new(CAPACITY)
            .with_shards(SHARDS)
            .with_eviction_config(S3FifoConfig {
                small_queue_capacity_ratio: 0.1,
                ghost_queue_capacity_ratio: 10.0,
                small_to_main_freq_threshold: 2,
            })
            .build()
    }

    /// Inserts every key of `RANGE` (value == key) in shuffled order.
    fn init_cache(cache: &Cache<u64, u64>, rng: &mut StdRng) {
        let mut v = RANGE.collect_vec();
        v.shuffle(rng);
        for i in v {
            cache.insert(i, i);
        }
    }

    /// Performs one random operation (insert / get / remove / fetch) on a random key and
    /// checks that any returned entry maps the key to itself.
    async fn operate(cache: &Cache<u64, u64>, rng: &mut StdRng) {
        let i = rng.random_range(RANGE);
        match rng.random_range(0..=3) {
            0 => {
                let entry = cache.insert(i, i);
                assert_eq!(*entry.key(), i);
                assert_eq!(entry.key(), entry.value());
            }
            1 => {
                if let Some(entry) = cache.get(&i) {
                    assert_eq!(*entry.key(), i);
                    assert_eq!(entry.key(), entry.value());
                }
            }
            2 => {
                cache.remove(&i);
            }
            3 => {
                let entry = cache
                    .fetch(i, || async move {
                        tokio::time::sleep(Duration::from_micros(10)).await;
                        Ok::<_, tokio::sync::oneshot::error::RecvError>(i)
                    })
                    .await
                    .unwrap();
                assert_eq!(*entry.key(), i);
                assert_eq!(entry.key(), entry.value());
            }
            _ => unreachable!(),
        }
    }

    /// Runs `CONCURRENCY` worker tasks, each performing `OPS` random operations on `cache`.
    ///
    /// Every worker starts from a clone of the same seeded RNG, so all of them replay the
    /// same operation sequence.
    async fn case(cache: Cache<u64, u64>) {
        let mut rng = StdRng::seed_from_u64(42);

        init_cache(&cache, &mut rng);

        let handles = (0..CONCURRENCY)
            .map(|_| {
                let cache = cache.clone();
                let mut rng = rng.clone();
                tokio::spawn(async move {
                    for _ in 0..OPS {
                        operate(&cache, &mut rng).await;
                    }
                })
            })
            .collect_vec();

        // A panic inside a spawned task (e.g. a failed assertion in `operate`) is reported as
        // a `JoinError` when the handle is awaited. It must be surfaced here, otherwise the
        // test would pass no matter what the workers observed.
        for joined in join_all(handles).await {
            joined.expect("cache worker task failed");
        }
    }

    #[tokio::test]
    async fn test_fifo_cache() {
        case(fifo()).await
    }

    #[tokio::test]
    async fn test_lru_cache() {
        case(lru()).await
    }

    #[tokio::test]
    async fn test_lfu_cache() {
        case(lfu()).await
    }

    #[tokio::test]
    async fn test_s3fifo_cache() {
        case(s3fifo()).await
    }
}