foyer_memory/
raw.rs

1// Copyright 2025 foyer Project Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    collections::hash_map::{Entry as HashMapEntry, HashMap},
17    fmt::Debug,
18    future::Future,
19    hash::Hash,
20    ops::Deref,
21    pin::Pin,
22    sync::Arc,
23    task::{Context, Poll},
24};
25
26use arc_swap::ArcSwap;
27use equivalent::Equivalent;
28#[cfg(feature = "tracing")]
29use fastrace::{
30    future::{FutureExt, InSpan},
31    Span,
32};
33use foyer_common::{
34    code::HashBuilder,
35    event::{Event, EventListener},
36    future::{Diversion, DiversionFuture},
37    metrics::Metrics,
38    properties::{Location, Properties, Source},
39    runtime::SingletonHandle,
40    strict_assert,
41    utils::scope::Scope,
42};
43use itertools::Itertools;
44use parking_lot::{Mutex, RwLock};
45use pin_project::pin_project;
46use tokio::{sync::oneshot, task::JoinHandle};
47
48use crate::{
49    error::{Error, Result},
50    eviction::{Eviction, Op},
51    indexer::{hash_table::HashTableIndexer, sentry::Sentry, Indexer},
52    pipe::NoopPipe,
53    record::{Data, Record},
54    Piece, Pipe,
55};
56
/// Weight function of the in-memory cache.
///
/// Given a key and a value, a weighter returns the weight the entry is accounted with.
///
/// Every `Fn(&K, &V) -> usize` that is `Send + Sync + 'static` is a weighter through the
/// blanket implementation below.
pub trait Weighter<K, V>: Fn(&K, &V) -> usize + Send + Sync + 'static {}

impl<K, V, F: Fn(&K, &V) -> usize + Send + Sync + 'static> Weighter<K, V> for F {}
62
/// Admission filter of the in-memory cache.
///
/// Given a key and a value, a filter decides whether the entry is admitted (`true`)
/// or rejected (`false`).
///
/// A rejected key value is not inserted into the in-memory cache. To keep the API
/// consistent, a cache entry is still returned to the caller, but it does not count
/// towards the in-memory cache usage and it is reclaimed as soon as the entry is dropped.
///
/// Every `Fn(&K, &V) -> bool` that is `Send + Sync + 'static` is a filter through the
/// blanket implementation below.
pub trait Filter<K, V>: Fn(&K, &V) -> bool + Send + Sync + 'static {}

impl<K, V, F: Fn(&K, &V) -> bool + Send + Sync + 'static> Filter<K, V> for F {}
75
/// Configuration consumed by [`RawCache::new`].
pub struct RawCacheConfig<E, S>
where
    E: Eviction,
    S: HashBuilder,
{
    /// Total capacity of the cache. `RawCache::new` divides it evenly among the shards.
    pub capacity: usize,
    /// Number of shards the cache is split into.
    pub shards: usize,
    /// Configuration handed to `E::new` when the eviction container of each shard is built.
    pub eviction_config: E::Config,
    /// Hash builder used to hash keys.
    pub hash_builder: S,
    /// Computes the weight of every inserted entry.
    pub weighter: Arc<dyn Weighter<E::Key, E::Value>>,
    /// Admission filter; entries it rejects are inserted as phantom entries.
    pub filter: Arc<dyn Filter<E::Key, E::Value>>,
    /// Optional listener that is notified (`on_leave`) when entries leave the cache.
    pub event_listener: Option<Arc<dyn EventListener<Key = E::Key, Value = E::Value>>>,
    /// Metrics shared by the cache and all of its shards.
    pub metrics: Arc<Metrics>,
}
90
/// One shard of the cache. Each shard is guarded by its own `RwLock` in `RawCacheInner`.
struct RawCacheShard<E, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    /// Eviction container that decides which record is popped next.
    eviction: E,
    /// Index from key to record.
    indexer: Sentry<I>,

    /// Sum of the weights of the records that are currently indexed by this shard.
    usage: usize,
    /// Capacity of this shard; inserts evict down towards it.
    capacity: usize,

    /// Per-key senders of callers that wait for an in-flight fetch of that key.
    #[expect(clippy::type_complexity)]
    waiters: Mutex<HashMap<E::Key, Vec<oneshot::Sender<RawCacheEntry<E, S, I>>>>>,

    /// Metrics handle (a clone of the cache-wide one).
    metrics: Arc<Metrics>,
    /// Kept alongside the shard but not read by the shard code visible in this section.
    _event_listener: Option<Arc<dyn EventListener<Key = E::Key, Value = E::Value>>>,
}
109
110impl<E, S, I> RawCacheShard<E, S, I>
111where
112    E: Eviction,
113    S: HashBuilder,
114    I: Indexer<Eviction = E>,
115{
116    /// Evict entries to fit the target usage.
117    fn evict(&mut self, target: usize, garbages: &mut Vec<(Event, Arc<Record<E>>)>) {
118        // Evict overflow records.
119        while self.usage > target {
120            let evicted = match self.eviction.pop() {
121                Some(evicted) => evicted,
122                None => break,
123            };
124            self.metrics.memory_evict.increase(1);
125
126            let e = self.indexer.remove(evicted.hash(), evicted.key()).unwrap();
127            assert_eq!(Arc::as_ptr(&evicted), Arc::as_ptr(&e));
128
129            strict_assert!(!evicted.as_ref().is_in_indexer());
130            strict_assert!(!evicted.as_ref().is_in_eviction());
131
132            self.usage -= evicted.weight();
133
134            garbages.push((Event::Evict, evicted));
135        }
136    }
137
138    fn emplace(
139        &mut self,
140        record: Arc<Record<E>>,
141        garbages: &mut Vec<(Event, Arc<Record<E>>)>,
142        waiters: &mut Vec<oneshot::Sender<RawCacheEntry<E, S, I>>>,
143    ) {
144        *waiters = self.waiters.lock().remove(record.key()).unwrap_or_default();
145
146        if record.properties().phantom().unwrap_or_default() {
147            if let Some(old) = self.indexer.remove(record.hash(), record.key()) {
148                strict_assert!(!old.is_in_indexer());
149
150                if old.is_in_eviction() {
151                    self.eviction.remove(&old);
152                }
153                strict_assert!(!old.is_in_eviction());
154
155                self.usage -= old.weight();
156
157                garbages.push((Event::Replace, old));
158            }
159            record.inc_refs(waiters.len() + 1);
160            garbages.push((Event::Remove, record));
161            self.metrics.memory_insert.increase(1);
162            return;
163        }
164
165        let weight = record.weight();
166        let old_usage = self.usage;
167
168        // Evict overflow records.
169        self.evict(self.capacity.saturating_sub(weight), garbages);
170
171        // Insert new record
172        if let Some(old) = self.indexer.insert(record.clone()) {
173            self.metrics.memory_replace.increase(1);
174
175            strict_assert!(!old.is_in_indexer());
176
177            if old.is_in_eviction() {
178                self.eviction.remove(&old);
179            }
180            strict_assert!(!old.is_in_eviction());
181
182            self.usage -= old.weight();
183
184            garbages.push((Event::Replace, old));
185        } else {
186            self.metrics.memory_insert.increase(1);
187        }
188        strict_assert!(record.is_in_indexer());
189
190        strict_assert!(!record.is_in_eviction());
191        self.eviction.push(record.clone());
192        strict_assert!(record.is_in_eviction());
193
194        self.usage += weight;
195        // Increase the reference count within the lock section.
196        // The reference count of the new record must be at the moment.
197        record.inc_refs(waiters.len() + 1);
198
199        match self.usage.cmp(&old_usage) {
200            std::cmp::Ordering::Greater => self.metrics.memory_usage.increase((self.usage - old_usage) as _),
201            std::cmp::Ordering::Less => self.metrics.memory_usage.decrease((old_usage - self.usage) as _),
202            std::cmp::Ordering::Equal => {}
203        }
204    }
205
206    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::shard::remove"))]
207    fn remove<Q>(&mut self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
208    where
209        Q: Hash + Equivalent<E::Key> + ?Sized,
210    {
211        let record = self.indexer.remove(hash, key)?;
212
213        if record.is_in_eviction() {
214            self.eviction.remove(&record);
215        }
216        strict_assert!(!record.is_in_indexer());
217        strict_assert!(!record.is_in_eviction());
218
219        self.usage -= record.weight();
220
221        self.metrics.memory_remove.increase(1);
222        self.metrics.memory_usage.decrease(record.weight() as _);
223
224        record.inc_refs(1);
225
226        Some(record)
227    }
228
    /// Look up `key` without notifying the eviction container.
    ///
    /// This is a plain forward to `get_inner`: on a hit the returned record has had its
    /// reference count increased by one.
    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::shard::get_noop"))]
    fn get_noop<Q>(&self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
    where
        Q: Hash + Equivalent<E::Key> + ?Sized,
    {
        self.get_inner(hash, key)
    }
236
237    #[cfg_attr(
238        feature = "tracing",
239        fastrace::trace(name = "foyer::memory::raw::shard::get_immutable")
240    )]
241    fn get_immutable<Q>(&self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
242    where
243        Q: Hash + Equivalent<E::Key> + ?Sized,
244    {
245        self.get_inner(hash, key)
246            .inspect(|record| self.acquire_immutable(record))
247    }
248
249    #[cfg_attr(
250        feature = "tracing",
251        fastrace::trace(name = "foyer::memory::raw::shard::get_mutable")
252    )]
253    fn get_mutable<Q>(&mut self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
254    where
255        Q: Hash + Equivalent<E::Key> + ?Sized,
256    {
257        self.get_inner(hash, key).inspect(|record| self.acquire_mutable(record))
258    }
259
260    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::shard::get_inner"))]
261    fn get_inner<Q>(&self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
262    where
263        Q: Hash + Equivalent<E::Key> + ?Sized,
264    {
265        let record = match self.indexer.get(hash, key).cloned() {
266            Some(record) => {
267                self.metrics.memory_hit.increase(1);
268                record
269            }
270            None => {
271                self.metrics.memory_miss.increase(1);
272                return None;
273            }
274        };
275
276        strict_assert!(record.is_in_indexer());
277
278        record.inc_refs(1);
279
280        Some(record)
281    }
282
283    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::shard::clear"))]
284    fn clear(&mut self, garbages: &mut Vec<Arc<Record<E>>>) {
285        let records = self.indexer.drain().collect_vec();
286        self.eviction.clear();
287
288        let mut count = 0;
289
290        for record in records {
291            count += 1;
292            strict_assert!(!record.is_in_indexer());
293            strict_assert!(!record.is_in_eviction());
294
295            garbages.push(record);
296        }
297
298        self.metrics.memory_remove.increase(count);
299    }
300
301    #[cfg_attr(
302        feature = "tracing",
303        fastrace::trace(name = "foyer::memory::raw::shard::acquire_immutable")
304    )]
305    fn acquire_immutable(&self, record: &Arc<Record<E>>) {
306        match E::acquire() {
307            Op::Immutable(f) => f(&self.eviction, record),
308            _ => unreachable!(),
309        }
310    }
311
312    #[cfg_attr(
313        feature = "tracing",
314        fastrace::trace(name = "foyer::memory::raw::shard::acquire_mutable")
315    )]
316    fn acquire_mutable(&mut self, record: &Arc<Record<E>>) {
317        match E::acquire() {
318            Op::Mutable(mut f) => f(&mut self.eviction, record),
319            _ => unreachable!(),
320        }
321    }
322
323    #[cfg_attr(
324        feature = "tracing",
325        fastrace::trace(name = "foyer::memory::raw::shard::release_immutable")
326    )]
327    fn release_immutable(&self, record: &Arc<Record<E>>) {
328        match E::release() {
329            Op::Immutable(f) => f(&self.eviction, record),
330            _ => unreachable!(),
331        }
332    }
333
334    #[cfg_attr(
335        feature = "tracing",
336        fastrace::trace(name = "foyer::memory::raw::shard::release_mutable")
337    )]
338    fn release_mutable(&mut self, record: &Arc<Record<E>>) {
339        match E::release() {
340            Op::Mutable(mut f) => f(&mut self.eviction, record),
341            _ => unreachable!(),
342        }
343    }
344
345    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::shard::fetch_noop"))]
346    fn fetch_noop(&self, hash: u64, key: &E::Key) -> RawShardFetch<E, S, I>
347    where
348        E::Key: Clone,
349    {
350        if let Some(record) = self.get_noop(hash, key) {
351            return RawShardFetch::Hit(record);
352        }
353
354        self.fetch_queue(key.clone())
355    }
356
357    #[cfg_attr(
358        feature = "tracing",
359        fastrace::trace(name = "foyer::memory::raw::shard::fetch_immutable")
360    )]
361    fn fetch_immutable(&self, hash: u64, key: &E::Key) -> RawShardFetch<E, S, I>
362    where
363        E::Key: Clone,
364    {
365        if let Some(record) = self.get_immutable(hash, key) {
366            return RawShardFetch::Hit(record);
367        }
368
369        self.fetch_queue(key.clone())
370    }
371
372    #[cfg_attr(
373        feature = "tracing",
374        fastrace::trace(name = "foyer::memory::raw::shard::fetch_mutable")
375    )]
376    fn fetch_mutable(&mut self, hash: u64, key: &E::Key) -> RawShardFetch<E, S, I>
377    where
378        E::Key: Clone,
379    {
380        if let Some(record) = self.get_mutable(hash, key) {
381            return RawShardFetch::Hit(record);
382        }
383
384        self.fetch_queue(key.clone())
385    }
386
    /// Register a missed fetch for `key` in the waiter table.
    ///
    /// * If the key already has a waiter list, a fetch for it is in flight: a new oneshot
    ///   sender is queued and [`RawShardFetch::Wait`] is returned with the matching receiver.
    /// * Otherwise an empty waiter list is created for the key and [`RawShardFetch::Miss`] is
    ///   returned, so later callers for the same key queue up behind this one.
    ///
    /// The waiter list is taken out again by `emplace` when a record for the key is inserted.
    #[cfg_attr(
        feature = "tracing",
        fastrace::trace(name = "foyer::memory::raw::shard::fetch_queue")
    )]
    fn fetch_queue(&self, key: E::Key) -> RawShardFetch<E, S, I> {
        // The waiter lock is held for the whole match, so check-and-register is atomic.
        match self.waiters.lock().entry(key) {
            HashMapEntry::Occupied(mut o) => {
                let (tx, rx) = oneshot::channel();
                o.get_mut().push(tx);
                self.metrics.memory_queue.increase(1);
                // With tracing enabled the receiver is wrapped in a span.
                #[cfg(feature = "tracing")]
                let wait = rx.in_span(Span::enter_with_local_parent(
                    "foyer::memory::raw::fetch_with_runtime::wait",
                ));
                #[cfg(not(feature = "tracing"))]
                let wait = rx;
                RawShardFetch::Wait(wait)
            }
            HashMapEntry::Vacant(v) => {
                v.insert(vec![]);
                self.metrics.memory_fetch.increase(1);
                RawShardFetch::Miss
            }
        }
    }
412}
413
/// Shared state behind every `RawCache` handle and every `RawCacheEntry`.
#[expect(clippy::type_complexity)]
struct RawCacheInner<E, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    /// The shards, each behind its own lock. A key's shard is `hash % shards.len()`.
    shards: Vec<RwLock<RawCacheShard<E, S, I>>>,

    /// Capacity the cache was configured with at construction.
    capacity: usize,

    /// Hash builder used to hash keys.
    hash_builder: Arc<S>,
    /// Computes the weight of inserted entries.
    weighter: Arc<dyn Weighter<E::Key, E::Value>>,
    /// Admission filter applied on insert.
    filter: Arc<dyn Filter<E::Key, E::Value>>,

    /// Cache-wide metrics.
    metrics: Arc<Metrics>,
    /// Optional listener notified when entries leave the cache.
    event_listener: Option<Arc<dyn EventListener<Key = E::Key, Value = E::Value>>>,
    /// Pipe that evicted records are sent to when it is enabled; swappable via `set_pipe`.
    pipe: ArcSwap<Box<dyn Pipe<Key = E::Key, Value = E::Value, Properties = E::Properties>>>,
}
433
434impl<E, S, I> RawCacheInner<E, S, I>
435where
436    E: Eviction,
437    S: HashBuilder,
438    I: Indexer<Eviction = E>,
439{
440    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::inner::clear"))]
441    fn clear(&self) {
442        let mut garbages = vec![];
443
444        self.shards
445            .iter()
446            .map(|shard| shard.write())
447            .for_each(|mut shard| shard.clear(&mut garbages));
448
449        // Do not deallocate data within the lock section.
450        if let Some(listener) = self.event_listener.as_ref() {
451            for record in garbages {
452                listener.on_leave(Event::Clear, record.key(), record.value());
453            }
454        }
455    }
456}
457
/// Sharded in-memory cache, generic over the eviction algorithm `E`, the hash builder `S`
/// and the indexer `I`.
///
/// The handle only wraps an `Arc` of the shared state, so cloning it yields another handle
/// to the same cache.
pub struct RawCache<E, S, I = HashTableIndexer<E>>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    inner: Arc<RawCacheInner<E, S, I>>,
}
466
/// Clears the cache when the shared state is dropped, so the event listener (if any) is
/// notified about every record that is still indexed.
impl<E, S, I> Drop for RawCacheInner<E, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    fn drop(&mut self) {
        self.clear();
    }
}
477
478impl<E, S, I> Clone for RawCache<E, S, I>
479where
480    E: Eviction,
481    S: HashBuilder,
482    I: Indexer<Eviction = E>,
483{
484    fn clone(&self) -> Self {
485        Self {
486            inner: self.inner.clone(),
487        }
488    }
489}
490
491impl<E, S, I> RawCache<E, S, I>
492where
493    E: Eviction,
494    S: HashBuilder,
495    I: Indexer<Eviction = E>,
496{
497    pub fn new(config: RawCacheConfig<E, S>) -> Self {
498        let shard_capacity = config.capacity / config.shards;
499
500        let shards = (0..config.shards)
501            .map(|_| RawCacheShard {
502                eviction: E::new(shard_capacity, &config.eviction_config),
503                indexer: Sentry::default(),
504                usage: 0,
505                capacity: shard_capacity,
506                waiters: Mutex::default(),
507                metrics: config.metrics.clone(),
508                _event_listener: config.event_listener.clone(),
509            })
510            .map(RwLock::new)
511            .collect_vec();
512
513        let pipe: Box<dyn Pipe<Key = E::Key, Value = E::Value, Properties = E::Properties>> =
514            Box::new(NoopPipe::default());
515
516        let inner = RawCacheInner {
517            shards,
518            capacity: config.capacity,
519            hash_builder: Arc::new(config.hash_builder),
520            weighter: config.weighter,
521            filter: config.filter,
522            metrics: config.metrics,
523            event_listener: config.event_listener,
524            pipe: ArcSwap::new(Arc::new(pipe)),
525        };
526
527        Self { inner: Arc::new(inner) }
528    }
529
530    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::resize"))]
531    pub fn resize(&self, capacity: usize) -> Result<()> {
532        let shards = self.inner.shards.len();
533        let shard_capacity = capacity / shards;
534
535        let handles = (0..shards)
536            .map(|i| {
537                let inner = self.inner.clone();
538                std::thread::spawn(move || {
539                    let mut garbages = vec![];
540                    let res = inner.shards[i].write().with(|mut shard| {
541                        shard.eviction.update(shard_capacity, None).inspect(|_| {
542                            shard.capacity = shard_capacity;
543                            shard.evict(shard_capacity, &mut garbages)
544                        })
545                    });
546                    // Deallocate data out of the lock critical section.
547                    let pipe = inner.pipe.load();
548                    let piped = pipe.is_enabled();
549                    if inner.event_listener.is_some() || piped {
550                        for (event, record) in garbages {
551                            if let Some(listener) = inner.event_listener.as_ref() {
552                                listener.on_leave(event, record.key(), record.value())
553                            }
554                            if piped && event == Event::Evict {
555                                pipe.send(Piece::new(record));
556                            }
557                        }
558                    }
559                    res
560                })
561            })
562            .collect_vec();
563
564        let errs = handles
565            .into_iter()
566            .map(|handle| handle.join().unwrap())
567            .filter(|res| res.is_err())
568            .map(|res| res.unwrap_err())
569            .collect_vec();
570        if !errs.is_empty() {
571            return Err(Error::multiple(errs));
572        }
573
574        Ok(())
575    }
576
    /// Insert `key` / `value` with default properties and return an entry for the new record.
    ///
    /// See `insert_with_properties` for the details of how the record is built.
    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::insert"))]
    pub fn insert(&self, key: E::Key, value: E::Value) -> RawCacheEntry<E, S, I> {
        self.insert_with_properties(key, value, Default::default())
    }
581
582    #[cfg_attr(
583        feature = "tracing",
584        fastrace::trace(name = "foyer::memory::raw::insert_with_properties")
585    )]
586    pub fn insert_with_properties(
587        &self,
588        key: E::Key,
589        value: E::Value,
590        mut properties: E::Properties,
591    ) -> RawCacheEntry<E, S, I> {
592        let hash = self.inner.hash_builder.hash_one(&key);
593        let weight = (self.inner.weighter)(&key, &value);
594        if !(self.inner.filter)(&key, &value) {
595            properties = properties.with_phantom(true);
596        }
597        if let Some(location) = properties.location() {
598            if location == Location::OnDisk {
599                properties = properties.with_phantom(true);
600            }
601        }
602        let record = Arc::new(Record::new(Data {
603            key,
604            value,
605            properties,
606            hash,
607            weight,
608        }));
609        self.insert_inner(record)
610    }
611
    /// Insert the record carried by `piece` as is; hash, weight and properties are taken
    /// from the record instead of being recomputed.
    #[doc(hidden)]
    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::insert_piece"))]
    pub fn insert_piece(&self, piece: Piece<E::Key, E::Value, E::Properties>) -> RawCacheEntry<E, S, I> {
        self.insert_inner(piece.into_record())
    }
617
618    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::insert_inner"))]
619    fn insert_inner(&self, record: Arc<Record<E>>) -> RawCacheEntry<E, S, I> {
620        let mut garbages = vec![];
621        let mut waiters = vec![];
622
623        self.inner.shards[self.shard(record.hash())]
624            .write()
625            .with(|mut shard| shard.emplace(record.clone(), &mut garbages, &mut waiters));
626
627        // Notify waiters out of the lock critical section.
628        for waiter in waiters {
629            let _ = waiter.send(RawCacheEntry {
630                record: record.clone(),
631                inner: self.inner.clone(),
632            });
633        }
634
635        // Deallocate data out of the lock critical section.
636        let pipe = self.inner.pipe.load();
637        let piped = pipe.is_enabled();
638        if self.inner.event_listener.is_some() || piped {
639            for (event, record) in garbages {
640                if let Some(listener) = self.inner.event_listener.as_ref() {
641                    listener.on_leave(event, record.key(), record.value())
642                }
643                if piped && event == Event::Evict {
644                    pipe.send(Piece::new(record));
645                }
646            }
647        }
648
649        RawCacheEntry {
650            record,
651            inner: self.inner.clone(),
652        }
653    }
654
655    /// Evict all entries in the cache and offload them into the disk cache via the pipe if needed.
656    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::evict_all"))]
657    pub fn evict_all(&self) {
658        let mut garbages = vec![];
659        for shard in self.inner.shards.iter() {
660            shard.write().evict(0, &mut garbages);
661        }
662
663        // Deallocate data out of the lock critical section.
664        let pipe = self.inner.pipe.load();
665        let piped = pipe.is_enabled();
666        if self.inner.event_listener.is_some() || piped {
667            for (event, record) in garbages {
668                if let Some(listener) = self.inner.event_listener.as_ref() {
669                    listener.on_leave(event, record.key(), record.value())
670                }
671                if piped && event == Event::Evict {
672                    pipe.send(Piece::new(record));
673                }
674            }
675        }
676    }
677
678    /// Evict all entries in the cache and offload them into the disk cache via the pipe if needed.
679    ///
680    /// This function obeys the io throttler of the disk cache and make sure all entries will be offloaded.
681    /// Therefore, this function is asynchronous.
682    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::flush"))]
683    pub async fn flush(&self) {
684        let mut garbages = vec![];
685        for shard in self.inner.shards.iter() {
686            shard.write().evict(0, &mut garbages);
687        }
688
689        // Deallocate data out of the lock critical section.
690        let pipe = self.inner.pipe.load();
691        let piped = pipe.is_enabled();
692
693        if let Some(listener) = self.inner.event_listener.as_ref() {
694            for (event, record) in garbages.iter() {
695                listener.on_leave(*event, record.key(), record.value());
696            }
697        }
698        if piped {
699            let pieces = garbages.into_iter().map(|(_, record)| Piece::new(record)).collect_vec();
700            pipe.flush(pieces).await;
701        }
702    }
703
704    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::remove"))]
705    pub fn remove<Q>(&self, key: &Q) -> Option<RawCacheEntry<E, S, I>>
706    where
707        Q: Hash + Equivalent<E::Key> + ?Sized,
708    {
709        let hash = self.inner.hash_builder.hash_one(key);
710
711        self.inner.shards[self.shard(hash)]
712            .write()
713            .with(|mut shard| {
714                shard.remove(hash, key).map(|record| RawCacheEntry {
715                    inner: self.inner.clone(),
716                    record,
717                })
718            })
719            .inspect(|record| {
720                // Deallocate data out of the lock critical section.
721                if let Some(listener) = self.inner.event_listener.as_ref() {
722                    listener.on_leave(Event::Remove, record.key(), record.value());
723                }
724            })
725    }
726
727    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::get"))]
728    pub fn get<Q>(&self, key: &Q) -> Option<RawCacheEntry<E, S, I>>
729    where
730        Q: Hash + Equivalent<E::Key> + ?Sized,
731    {
732        let hash = self.inner.hash_builder.hash_one(key);
733
734        let record = match E::acquire() {
735            Op::Noop => self.inner.shards[self.shard(hash)].read().get_noop(hash, key),
736            Op::Immutable(_) => self.inner.shards[self.shard(hash)]
737                .read()
738                .with(|shard| shard.get_immutable(hash, key)),
739            Op::Mutable(_) => self.inner.shards[self.shard(hash)]
740                .write()
741                .with(|mut shard| shard.get_mutable(hash, key)),
742        }?;
743
744        Some(RawCacheEntry {
745            inner: self.inner.clone(),
746            record,
747        })
748    }
749
750    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::contains"))]
751    pub fn contains<Q>(&self, key: &Q) -> bool
752    where
753        Q: Hash + Equivalent<E::Key> + ?Sized,
754    {
755        let hash = self.inner.hash_builder.hash_one(key);
756
757        self.inner.shards[self.shard(hash)]
758            .read()
759            .with(|shard| shard.indexer.get(hash, key).is_some())
760    }
761
762    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::touch"))]
763    pub fn touch<Q>(&self, key: &Q) -> bool
764    where
765        Q: Hash + Equivalent<E::Key> + ?Sized,
766    {
767        let hash = self.inner.hash_builder.hash_one(key);
768
769        match E::acquire() {
770            Op::Noop => self.inner.shards[self.shard(hash)].read().get_noop(hash, key),
771            Op::Immutable(_) => self.inner.shards[self.shard(hash)]
772                .read()
773                .with(|shard| shard.get_immutable(hash, key)),
774            Op::Mutable(_) => self.inner.shards[self.shard(hash)]
775                .write()
776                .with(|mut shard| shard.get_mutable(hash, key)),
777        }
778        .is_some()
779    }
780
    /// Remove every entry from the cache.
    ///
    /// Removed records are reported to the event listener (if any) with `Event::Clear`; they
    /// are not sent to the pipe.
    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::clear"))]
    pub fn clear(&self) {
        self.inner.clear();
    }
785
    /// Capacity the cache was constructed with.
    ///
    /// NOTE(review): `resize` does not update this value, so it may differ from the capacity
    /// the shards currently enforce.
    pub fn capacity(&self) -> usize {
        self.inner.capacity
    }

    /// Current usage, summed over all shards. Each shard is read-locked in turn, so the sum
    /// is not an atomic snapshot of the whole cache.
    pub fn usage(&self) -> usize {
        self.inner.shards.iter().map(|shard| shard.read().usage).sum()
    }

    /// Metrics of this cache.
    pub fn metrics(&self) -> &Metrics {
        &self.inner.metrics
    }

    /// Hash builder used to hash keys.
    pub fn hash_builder(&self) -> &Arc<S> {
        &self.inner.hash_builder
    }

    /// Number of shards.
    pub fn shards(&self) -> usize {
        self.inner.shards.len()
    }

    /// Replace the pipe that evicted records are sent to.
    pub fn set_pipe(&self, pipe: Box<dyn Pipe<Key = E::Key, Value = E::Value, Properties = E::Properties>>) {
        self.inner.pipe.store(Arc::new(pipe));
    }

    /// Index of the shard responsible for `hash`.
    fn shard(&self, hash: u64) -> usize {
        hash as usize % self.inner.shards.len()
    }
813}
814
/// Handle to a record of a [`RawCache`].
///
/// Each handle accounts for one reference on the record: cloning the handle increases the
/// record's reference count and dropping it decreases the count again.
pub struct RawCacheEntry<E, S, I = HashTableIndexer<E>>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    /// Shared cache state; needed on drop to reach the shard, the listener and the pipe.
    inner: Arc<RawCacheInner<E, S, I>>,
    /// The record this entry refers to.
    record: Arc<Record<E>>,
}
824
825impl<E, S, I> Debug for RawCacheEntry<E, S, I>
826where
827    E: Eviction,
828    S: HashBuilder,
829    I: Indexer<Eviction = E>,
830{
831    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
832        f.debug_struct("RawCacheEntry").field("record", &self.record).finish()
833    }
834}
835
836impl<E, S, I> Drop for RawCacheEntry<E, S, I>
837where
838    E: Eviction,
839    S: HashBuilder,
840    I: Indexer<Eviction = E>,
841{
842    fn drop(&mut self) {
843        let hash = self.record.hash();
844        let shard = &self.inner.shards[hash as usize % self.inner.shards.len()];
845
846        if self.record.dec_refs(1) == 0 {
847            if self.record.properties().phantom().unwrap_or_default() {
848                if let Some(listener) = self.inner.event_listener.as_ref() {
849                    listener.on_leave(Event::Evict, self.record.key(), self.record.value());
850                }
851                let pipe = self.inner.pipe.load();
852                if pipe.is_enabled() {
853                    pipe.send(Piece::new(self.record.clone()));
854                }
855                return;
856            }
857
858            match E::release() {
859                Op::Noop => {}
860                Op::Immutable(_) => shard.read().with(|shard| shard.release_immutable(&self.record)),
861                Op::Mutable(_) => shard.write().with(|mut shard| shard.release_mutable(&self.record)),
862            }
863        }
864    }
865}
866
867impl<E, S, I> Clone for RawCacheEntry<E, S, I>
868where
869    E: Eviction,
870    S: HashBuilder,
871    I: Indexer<Eviction = E>,
872{
873    fn clone(&self) -> Self {
874        self.record.inc_refs(1);
875        Self {
876            inner: self.inner.clone(),
877            record: self.record.clone(),
878        }
879    }
880}
881
882impl<E, S, I> Deref for RawCacheEntry<E, S, I>
883where
884    E: Eviction,
885    S: HashBuilder,
886    I: Indexer<Eviction = E>,
887{
888    type Target = E::Value;
889
890    fn deref(&self) -> &Self::Target {
891        self.value()
892    }
893}
894
// NOTE(review): this asserts that a `RawCacheEntry` may be moved to another thread without
// requiring `E`, `S` or `I` to be `Send`. The invariant that makes this sound is not visible
// in this part of the file — TODO confirm it and record it here as a `// SAFETY:` comment.
unsafe impl<E, S, I> Send for RawCacheEntry<E, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
}
902
// NOTE(review): this asserts that a `&RawCacheEntry` may be shared between threads without
// requiring `E`, `S` or `I` to be `Sync`. The invariant that makes this sound is not visible
// in this part of the file — TODO confirm it and record it here as a `// SAFETY:` comment.
unsafe impl<E, S, I> Sync for RawCacheEntry<E, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
}
910
/// Read-only accessors that forward to the underlying record.
impl<E, S, I> RawCacheEntry<E, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    /// Hash of the key, as stored in the record.
    pub fn hash(&self) -> u64 {
        self.record.hash()
    }

    /// Key of the entry.
    pub fn key(&self) -> &E::Key {
        self.record.key()
    }

    /// Value of the entry.
    pub fn value(&self) -> &E::Value {
        self.record.value()
    }

    /// Properties of the entry.
    pub fn properties(&self) -> &E::Properties {
        self.record.properties()
    }

    /// Weight of the entry, as stored in the record.
    pub fn weight(&self) -> usize {
        self.record.weight()
    }

    /// Reference count currently reported by the record.
    pub fn refs(&self) -> usize {
        self.record.refs()
    }

    /// Whether the record is no longer in the indexer, i.e. a lookup of its key would not
    /// return this record anymore.
    pub fn is_outdated(&self) -> bool {
        !self.record.is_in_indexer()
    }

    /// Create a [`Piece`] that shares this entry's record.
    pub fn piece(&self) -> Piece<E::Key, E::Value, E::Properties> {
        Piece::new(self.record.clone())
    }
}
949
/// The state of `fetch`.
///
/// Returned by `RawFetchInner::state`, with one variant per kind of fetch future.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FetchState {
    /// Cache hit.
    Hit,
    /// Cache miss, but the caller waits in the queue for a result.
    Wait,
    /// Cache miss, and there are no other waiters at the moment.
    Miss,
}
960
/// Context for fetch calls.
///
/// `RawCache::fetch_inner` reads it from the fetched result to adjust the properties of the entry
/// it inserts.
#[derive(Debug)]
pub struct FetchContext {
    /// Whether this fetch was caused by the disk cache being throttled.
    ///
    /// When set, `fetch_inner` marks the inserted entry with `Location::InMem`.
    pub throttled: bool,
    /// Source of the fetched entry; applied to the inserted entry's properties.
    pub source: Source,
}
969
/// Result of the per-shard lookup (`fetch_noop` / `fetch_immutable` / `fetch_mutable`) that
/// `RawCache::fetch_inner` performs before deciding whether to run the user's fetch.
enum RawShardFetch<E, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    /// The record was found in the shard.
    Hit(Arc<Record<E>>),
    /// A future to await instead of running the fetch; it yields the entry once available.
    Wait(RawFetchWait<E, S, I>),
    /// Neither of the above: the caller has to run the fetch itself.
    Miss,
}
980
/// Future returned by the `fetch*` methods of `RawCache`.
///
/// It is a `DiversionFuture` around [`RawFetchInner`] whose target is the fetched entry (or the
/// caller's error type `ER`) and whose diverted store is a [`FetchContext`].
pub type RawFetch<E, ER, S, I = HashTableIndexer<E>> =
    DiversionFuture<RawFetchInner<E, ER, S, I>, std::result::Result<RawCacheEntry<E, S, I>, ER>, FetchContext>;

// Payload of `RawFetchInner::Hit`. It is an `Option` so that `poll` can `take()` the entry out.
type RawFetchHit<E, S, I> = Option<RawCacheEntry<E, S, I>>;
// Payload of `RawFetchInner::Wait`: a oneshot receiver for the entry, wrapped in a span when the
// `tracing` feature is enabled.
#[cfg(feature = "tracing")]
type RawFetchWait<E, S, I> = InSpan<oneshot::Receiver<RawCacheEntry<E, S, I>>>;
#[cfg(not(feature = "tracing"))]
type RawFetchWait<E, S, I> = oneshot::Receiver<RawCacheEntry<E, S, I>>;
// Payload of `RawFetchInner::Miss`: the join handle of the spawned fetch task.
// Note the parameter order here is `<E, I, S, ..>`, unlike the other aliases.
type RawFetchMiss<E, I, S, ER, DFS> = JoinHandle<Diversion<std::result::Result<RawCacheEntry<E, S, I>, ER>, DFS>>;
990
/// The target of a fetch operation.
///
/// `RawCache::fetch_inner` inserts a `Value` together with the caller's key and properties, while
/// a `Piece` is converted back into a record and inserted as is.
pub enum FetchTarget<K, V, P> {
    /// Fetched value.
    Value(V),
    /// Fetched piece from disk cache write queue.
    Piece(Piece<K, V, P>),
}
998
999impl<K, V, P> Debug for FetchTarget<K, V, P> {
1000    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1001        f.debug_struct("FetchTarget").finish()
1002    }
1003}
1004
1005impl<K, V, P> From<V> for FetchTarget<K, V, P> {
1006    fn from(value: V) -> Self {
1007        Self::Value(value)
1008    }
1009}
1010
1011impl<K, V, P> From<Piece<K, V, P>> for FetchTarget<K, V, P> {
1012    fn from(piece: Piece<K, V, P>) -> Self {
1013        Self::Piece(piece)
1014    }
1015}
1016
/// The future behind [`RawFetch`], with one variant per outcome of the shard lookup done in
/// `RawCache::fetch_inner`.
#[pin_project(project = RawFetchInnerProj)]
pub enum RawFetchInner<E, ER, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    // The entry was already cached; `poll` takes it out of the `Option` and returns it.
    Hit(RawFetchHit<E, S, I>),
    // Waits on a receiver for the entry instead of running a fetch of its own.
    Wait(#[pin] RawFetchWait<E, S, I>),
    // Join handle of the task spawned to run the fetch and insert its result.
    Miss(#[pin] RawFetchMiss<E, I, S, ER, FetchContext>),
}
1028
1029impl<E, ER, S, I> RawFetchInner<E, ER, S, I>
1030where
1031    E: Eviction,
1032    S: HashBuilder,
1033    I: Indexer<Eviction = E>,
1034{
1035    pub fn state(&self) -> FetchState {
1036        match self {
1037            RawFetchInner::Hit(_) => FetchState::Hit,
1038            RawFetchInner::Wait(_) => FetchState::Wait,
1039            RawFetchInner::Miss(_) => FetchState::Miss,
1040        }
1041    }
1042}
1043
1044impl<E, ER, S, I> Future for RawFetchInner<E, ER, S, I>
1045where
1046    E: Eviction,
1047    ER: From<Error>,
1048    S: HashBuilder,
1049    I: Indexer<Eviction = E>,
1050{
1051    type Output = Diversion<std::result::Result<RawCacheEntry<E, S, I>, ER>, FetchContext>;
1052
1053    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1054        match self.project() {
1055            RawFetchInnerProj::Hit(opt) => Poll::Ready(Ok(opt.take().unwrap()).into()),
1056            RawFetchInnerProj::Wait(waiter) => waiter.poll(cx).map_err(|e| Error::wait(e).into()).map(Diversion::from),
1057            RawFetchInnerProj::Miss(handle) => handle.poll(cx).map(|join| join.unwrap()),
1058        }
1059    }
1060}
1061
1062impl<E, S, I> RawCache<E, S, I>
1063where
1064    E: Eviction,
1065    S: HashBuilder,
1066    I: Indexer<Eviction = E>,
1067    E::Key: Clone,
1068{
1069    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::fetch"))]
1070    pub fn fetch<F, FU, ER>(&self, key: E::Key, fetch: F) -> RawFetch<E, ER, S, I>
1071    where
1072        F: FnOnce() -> FU,
1073        FU: Future<Output = std::result::Result<E::Value, ER>> + Send + 'static,
1074        ER: Send + 'static + Debug,
1075    {
1076        self.fetch_inner(
1077            key,
1078            Default::default(),
1079            fetch,
1080            &tokio::runtime::Handle::current().into(),
1081        )
1082    }
1083
1084    #[cfg_attr(
1085        feature = "tracing",
1086        fastrace::trace(name = "foyer::memory::raw::fetch_with_properties")
1087    )]
1088    pub fn fetch_with_properties<F, FU, ER, ID>(
1089        &self,
1090        key: E::Key,
1091        properties: E::Properties,
1092        fetch: F,
1093    ) -> RawFetch<E, ER, S, I>
1094    where
1095        F: FnOnce() -> FU,
1096        FU: Future<Output = ID> + Send + 'static,
1097        ER: Send + 'static + Debug,
1098        ID: Into<Diversion<std::result::Result<E::Value, ER>, FetchContext>>,
1099    {
1100        self.fetch_inner(key, properties, fetch, &tokio::runtime::Handle::current().into())
1101    }
1102
1103    /// Advanced fetch with specified runtime.
1104    ///
1105    /// This function is for internal usage and the doc is hidden.
1106    #[doc(hidden)]
1107    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::fetch_inner"))]
1108    pub fn fetch_inner<F, FU, ER, ID, IT>(
1109        &self,
1110        key: E::Key,
1111        mut properties: E::Properties,
1112        fetch: F,
1113        runtime: &SingletonHandle,
1114    ) -> RawFetch<E, ER, S, I>
1115    where
1116        F: FnOnce() -> FU,
1117        FU: Future<Output = ID> + Send + 'static,
1118        ER: Send + 'static + Debug,
1119        ID: Into<Diversion<std::result::Result<IT, ER>, FetchContext>>,
1120        IT: Into<FetchTarget<E::Key, E::Value, E::Properties>>,
1121    {
1122        let hash = self.inner.hash_builder.hash_one(&key);
1123
1124        let raw = match E::acquire() {
1125            Op::Noop => self.inner.shards[self.shard(hash)].read().fetch_noop(hash, &key),
1126            Op::Immutable(_) => self.inner.shards[self.shard(hash)].read().fetch_immutable(hash, &key),
1127            Op::Mutable(_) => self.inner.shards[self.shard(hash)].write().fetch_mutable(hash, &key),
1128        };
1129
1130        match raw {
1131            RawShardFetch::Hit(record) => {
1132                tracing::trace!(hash, "fetch => Hit");
1133                return RawFetch::new(RawFetchInner::Hit(Some(RawCacheEntry {
1134                    record,
1135                    inner: self.inner.clone(),
1136                })));
1137            }
1138            RawShardFetch::Wait(future) => {
1139                tracing::trace!(hash, "fetch => Wait");
1140                return RawFetch::new(RawFetchInner::Wait(future));
1141            }
1142            RawShardFetch::Miss => {
1143                tracing::trace!(hash, "fetch => Miss");
1144            }
1145        }
1146
1147        let cache = self.clone();
1148        let future = fetch();
1149        let join = runtime.spawn({
1150            tracing::trace!(hash, "fetch => join !!!");
1151            let task = async move {
1152                #[cfg(feature = "tracing")]
1153                let Diversion { target, store } = future
1154                    .in_span(Span::enter_with_local_parent("foyer::memory::raw::fetch_inner::fn"))
1155                    .await
1156                    .into();
1157                #[cfg(not(feature = "tracing"))]
1158                let Diversion { target, store } = future.await.into();
1159
1160                let target = match target {
1161                    Ok(value) => value,
1162                    Err(e) => {
1163                        cache.inner.shards[cache.shard(hash)].read().waiters.lock().remove(&key);
1164                        tracing::debug!("[fetch]: error raise while fetching, all waiter are dropped, err: {e:?}");
1165                        return Diversion { target: Err(e), store };
1166                    }
1167                };
1168                if let Some(ctx) = store.as_ref() {
1169                    if ctx.throttled {
1170                        // TODO(MrCroxx): Make sure foyer doesn't issue tombstone to the disk cache.
1171                        // TODO(MrCroxx): Also make sure the disk cache will write tombstone instead of this case.
1172                        properties = properties.with_location(Location::InMem)
1173                    }
1174                    properties = properties.with_source(ctx.source)
1175                };
1176                tracing::trace!(hash, "fetch => insert !!!");
1177                let entry = match target.into() {
1178                    FetchTarget::Value(value) => cache.insert_with_properties(key, value, properties),
1179                    FetchTarget::Piece(p) => cache.insert_inner(p.into_record::<E>()),
1180                };
1181                Diversion {
1182                    target: Ok(entry),
1183                    store,
1184                }
1185            };
1186            #[cfg(feature = "tracing")]
1187            let task = task.in_span(Span::enter_with_local_parent(
1188                "foyer::memory::generic::fetch_with_runtime::spawn",
1189            ));
1190            task
1191        });
1192
1193        RawFetch::new(RawFetchInner::Miss(join))
1194    }
1195}
1196
1197#[cfg(test)]
1198mod tests {
1199    use foyer_common::hasher::ModHasher;
1200    use rand::{rngs::SmallRng, seq::IndexedRandom, RngCore, SeedableRng};
1201
1202    use super::*;
1203    use crate::{
1204        eviction::{
1205            fifo::{Fifo, FifoConfig},
1206            lfu::{Lfu, LfuConfig},
1207            lru::{Lru, LruConfig},
1208            s3fifo::{S3Fifo, S3FifoConfig},
1209            sieve::{Sieve, SieveConfig},
1210            test_utils::TestProperties,
1211        },
1212        test_utils::PiecePipe,
1213    };
1214
1215    fn is_send_sync_static<T: Send + Sync + 'static>() {}
1216
1217    #[test]
1218    fn test_send_sync_static() {
1219        is_send_sync_static::<RawCache<Fifo<(), (), TestProperties>, ModHasher>>();
1220        is_send_sync_static::<RawCache<S3Fifo<(), (), TestProperties>, ModHasher>>();
1221        is_send_sync_static::<RawCache<Lfu<(), (), TestProperties>, ModHasher>>();
1222        is_send_sync_static::<RawCache<Lru<(), (), TestProperties>, ModHasher>>();
1223        is_send_sync_static::<RawCache<Sieve<(), (), TestProperties>, ModHasher>>();
1224    }
1225
1226    #[expect(clippy::type_complexity)]
1227    fn fifo_cache_for_test(
1228    ) -> RawCache<Fifo<u64, u64, TestProperties>, ModHasher, HashTableIndexer<Fifo<u64, u64, TestProperties>>> {
1229        RawCache::new(RawCacheConfig {
1230            capacity: 256,
1231            shards: 4,
1232            eviction_config: FifoConfig::default(),
1233            hash_builder: Default::default(),
1234            weighter: Arc::new(|_, _| 1),
1235            filter: Arc::new(|_, _| true),
1236            event_listener: None,
1237            metrics: Arc::new(Metrics::noop()),
1238        })
1239    }
1240
1241    #[expect(clippy::type_complexity)]
1242    fn s3fifo_cache_for_test(
1243    ) -> RawCache<S3Fifo<u64, u64, TestProperties>, ModHasher, HashTableIndexer<S3Fifo<u64, u64, TestProperties>>> {
1244        RawCache::new(RawCacheConfig {
1245            capacity: 256,
1246            shards: 4,
1247            eviction_config: S3FifoConfig::default(),
1248            hash_builder: Default::default(),
1249            weighter: Arc::new(|_, _| 1),
1250            filter: Arc::new(|_, _| true),
1251            event_listener: None,
1252            metrics: Arc::new(Metrics::noop()),
1253        })
1254    }
1255
1256    #[expect(clippy::type_complexity)]
1257    fn lru_cache_for_test(
1258    ) -> RawCache<Lru<u64, u64, TestProperties>, ModHasher, HashTableIndexer<Lru<u64, u64, TestProperties>>> {
1259        RawCache::new(RawCacheConfig {
1260            capacity: 256,
1261            shards: 4,
1262            eviction_config: LruConfig::default(),
1263            hash_builder: Default::default(),
1264            weighter: Arc::new(|_, _| 1),
1265            filter: Arc::new(|_, _| true),
1266            event_listener: None,
1267            metrics: Arc::new(Metrics::noop()),
1268        })
1269    }
1270
1271    #[expect(clippy::type_complexity)]
1272    fn lfu_cache_for_test(
1273    ) -> RawCache<Lfu<u64, u64, TestProperties>, ModHasher, HashTableIndexer<Lfu<u64, u64, TestProperties>>> {
1274        RawCache::new(RawCacheConfig {
1275            capacity: 256,
1276            shards: 4,
1277            eviction_config: LfuConfig::default(),
1278            hash_builder: Default::default(),
1279            weighter: Arc::new(|_, _| 1),
1280            filter: Arc::new(|_, _| true),
1281            event_listener: None,
1282            metrics: Arc::new(Metrics::noop()),
1283        })
1284    }
1285
1286    #[expect(clippy::type_complexity)]
1287    fn sieve_cache_for_test(
1288    ) -> RawCache<Sieve<u64, u64, TestProperties>, ModHasher, HashTableIndexer<Sieve<u64, u64, TestProperties>>> {
1289        RawCache::new(RawCacheConfig {
1290            capacity: 256,
1291            shards: 4,
1292            eviction_config: SieveConfig {},
1293            hash_builder: Default::default(),
1294            weighter: Arc::new(|_, _| 1),
1295            filter: Arc::new(|_, _| true),
1296            event_listener: None,
1297            metrics: Arc::new(Metrics::noop()),
1298        })
1299    }
1300
    /// Checks that entries inserted with the phantom property never count towards usage, cannot be
    /// looked up, and displace an existing entry stored under the same key.
    #[test_log::test]
    fn test_insert_phantom() {
        let fifo = fifo_cache_for_test();

        // A phantom insert leaves usage at zero, both while the handle is alive and after the drop.
        let e1 = fifo.insert_with_properties(1, 1, TestProperties::default().with_phantom(true));
        assert_eq!(fifo.usage(), 0);
        drop(e1);
        assert_eq!(fifo.usage(), 0);

        // A phantom entry is not visible to `get`, even while its handle is still held.
        let e2a = fifo.insert_with_properties(2, 2, TestProperties::default().with_phantom(true));
        assert_eq!(fifo.usage(), 0);
        assert!(fifo.get(&2).is_none());
        assert_eq!(fifo.usage(), 0);
        drop(e2a);
        assert_eq!(fifo.usage(), 0);

        // Start over: a phantom insert over an existing key removes the regular entry.
        let fifo = fifo_cache_for_test();
        fifo.insert(1, 1);
        assert_eq!(fifo.usage(), 1);
        assert_eq!(fifo.get(&1).unwrap().value(), &1);
        let e2 = fifo.insert_with_properties(1, 100, TestProperties::default().with_phantom(true));
        assert_eq!(fifo.usage(), 0);
        drop(e2);
        assert_eq!(fifo.usage(), 0);
        assert!(fifo.get(&1).is_none());
    }
1327
1328    #[expect(clippy::type_complexity)]
1329    #[test_log::test]
1330    fn test_insert_filter() {
1331        let fifo: RawCache<
1332            Fifo<u64, u64, TestProperties>,
1333            ModHasher,
1334            HashTableIndexer<Fifo<u64, u64, TestProperties>>,
1335        > = RawCache::new(RawCacheConfig {
1336            capacity: 256,
1337            shards: 4,
1338            eviction_config: FifoConfig::default(),
1339            hash_builder: Default::default(),
1340            weighter: Arc::new(|_, _| 1),
1341            filter: Arc::new(|k, _| !matches!(*k, 42)),
1342            event_listener: None,
1343            metrics: Arc::new(Metrics::noop()),
1344        });
1345
1346        fifo.insert(1, 1);
1347        fifo.insert(2, 2);
1348        fifo.insert(42, 42);
1349        assert_eq!(fifo.usage(), 2);
1350        assert_eq!(fifo.get(&1).unwrap().value(), &1);
1351        assert_eq!(fifo.get(&2).unwrap().value(), &2);
1352        assert!(fifo.get(&42).is_none());
1353    }
1354
1355    #[test]
1356    fn test_evict_all() {
1357        let pipe = Box::new(PiecePipe::default());
1358
1359        let fifo = fifo_cache_for_test();
1360        fifo.set_pipe(pipe.clone());
1361        for i in 0..fifo.capacity() as _ {
1362            fifo.insert(i, i);
1363        }
1364        assert_eq!(fifo.usage(), fifo.capacity());
1365
1366        fifo.evict_all();
1367        let mut pieces = pipe
1368            .pieces()
1369            .iter()
1370            .map(|p| (p.hash(), *p.key(), *p.value()))
1371            .collect_vec();
1372        pieces.sort_by_key(|t| t.0);
1373        let expected = (0..fifo.capacity() as u64).map(|i| (i, i, i)).collect_vec();
1374        assert_eq!(pieces, expected);
1375    }
1376
1377    #[test]
1378    fn test_insert_size_over_capacity() {
1379        let cache: RawCache<Fifo<Vec<u8>, Vec<u8>, TestProperties>, ModHasher> = RawCache::new(RawCacheConfig {
1380            capacity: 4 * 1024, // 4KB
1381            shards: 1,
1382            eviction_config: FifoConfig::default(),
1383            hash_builder: Default::default(),
1384            weighter: Arc::new(|k, v| k.len() + v.len()),
1385            filter: Arc::new(|_, _| true),
1386            event_listener: None,
1387            metrics: Arc::new(Metrics::noop()),
1388        });
1389
1390        let key = vec![b'k'; 1024]; // 1KB
1391        let value = vec![b'v'; 5 * 1024]; // 5KB
1392
1393        cache.insert(key.clone(), value.clone());
1394        assert_eq!(cache.usage(), 6 * 1024);
1395        assert_eq!(cache.get(&key).unwrap().value(), &value);
1396    }
1397
1398    fn test_resize<E>(cache: &RawCache<E, ModHasher, HashTableIndexer<E>>)
1399    where
1400        E: Eviction<Key = u64, Value = u64>,
1401    {
1402        let capacity = cache.capacity();
1403        for i in 0..capacity as u64 * 2 {
1404            cache.insert(i, i);
1405        }
1406        assert_eq!(cache.usage(), capacity);
1407        cache.resize(capacity / 2).unwrap();
1408        assert_eq!(cache.usage(), capacity / 2);
1409        for i in 0..capacity as u64 * 2 {
1410            cache.insert(i, i);
1411        }
1412        assert_eq!(cache.usage(), capacity / 2);
1413    }
1414
1415    #[test]
1416    fn test_fifo_cache_resize() {
1417        let cache = fifo_cache_for_test();
1418        test_resize(&cache);
1419    }
1420
1421    #[test]
1422    fn test_s3fifo_cache_resize() {
1423        let cache = s3fifo_cache_for_test();
1424        test_resize(&cache);
1425    }
1426
1427    #[test]
1428    fn test_lru_cache_resize() {
1429        let cache = lru_cache_for_test();
1430        test_resize(&cache);
1431    }
1432
1433    #[test]
1434    fn test_lfu_cache_resize() {
1435        let cache = lfu_cache_for_test();
1436        test_resize(&cache);
1437    }
1438
1439    #[test]
1440    fn test_sieve_cache_resize() {
1441        let cache = sieve_cache_for_test();
1442        test_resize(&cache);
1443    }
1444
1445    mod fuzzy {
1446        use foyer_common::properties::Hint;
1447
1448        use super::*;
1449
1450        fn fuzzy<E, S>(cache: RawCache<E, S>, hints: Vec<Hint>)
1451        where
1452            E: Eviction<Key = u64, Value = u64, Properties = TestProperties>,
1453            S: HashBuilder,
1454        {
1455            let handles = (0..8)
1456                .map(|i| {
1457                    let c = cache.clone();
1458                    let hints = hints.clone();
1459                    std::thread::spawn(move || {
1460                        let mut rng = SmallRng::seed_from_u64(i);
1461                        for _ in 0..100000 {
1462                            let key = rng.next_u64();
1463                            if let Some(entry) = c.get(&key) {
1464                                assert_eq!(key, *entry);
1465                                drop(entry);
1466                                continue;
1467                            }
1468                            let hint = hints.choose(&mut rng).cloned().unwrap();
1469                            c.insert_with_properties(key, key, TestProperties::default().with_hint(hint));
1470                        }
1471                    })
1472                })
1473                .collect_vec();
1474
1475            handles.into_iter().for_each(|handle| handle.join().unwrap());
1476
1477            assert_eq!(cache.usage(), cache.capacity());
1478        }
1479
1480        #[test_log::test]
1481        fn test_fifo_cache_fuzzy() {
1482            let cache: RawCache<Fifo<u64, u64, TestProperties>, ModHasher> = RawCache::new(RawCacheConfig {
1483                capacity: 256,
1484                shards: 4,
1485                eviction_config: FifoConfig::default(),
1486                hash_builder: Default::default(),
1487                weighter: Arc::new(|_, _| 1),
1488                filter: Arc::new(|_, _| true),
1489                event_listener: None,
1490                metrics: Arc::new(Metrics::noop()),
1491            });
1492            let hints = vec![Hint::Normal];
1493            fuzzy(cache, hints);
1494        }
1495
1496        #[test_log::test]
1497        fn test_s3fifo_cache_fuzzy() {
1498            let cache: RawCache<S3Fifo<u64, u64, TestProperties>, ModHasher> = RawCache::new(RawCacheConfig {
1499                capacity: 256,
1500                shards: 4,
1501                eviction_config: S3FifoConfig::default(),
1502                hash_builder: Default::default(),
1503                weighter: Arc::new(|_, _| 1),
1504                filter: Arc::new(|_, _| true),
1505                event_listener: None,
1506                metrics: Arc::new(Metrics::noop()),
1507            });
1508            let hints = vec![Hint::Normal];
1509            fuzzy(cache, hints);
1510        }
1511
1512        #[test_log::test]
1513        fn test_lru_cache_fuzzy() {
1514            let cache: RawCache<Lru<u64, u64, TestProperties>, ModHasher> = RawCache::new(RawCacheConfig {
1515                capacity: 256,
1516                shards: 4,
1517                eviction_config: LruConfig::default(),
1518                hash_builder: Default::default(),
1519                weighter: Arc::new(|_, _| 1),
1520                filter: Arc::new(|_, _| true),
1521                event_listener: None,
1522                metrics: Arc::new(Metrics::noop()),
1523            });
1524            let hints = vec![Hint::Normal, Hint::Low];
1525            fuzzy(cache, hints);
1526        }
1527
1528        #[test_log::test]
1529        fn test_lfu_cache_fuzzy() {
1530            let cache: RawCache<Lfu<u64, u64, TestProperties>, ModHasher> = RawCache::new(RawCacheConfig {
1531                capacity: 256,
1532                shards: 4,
1533                eviction_config: LfuConfig::default(),
1534                hash_builder: Default::default(),
1535                weighter: Arc::new(|_, _| 1),
1536                filter: Arc::new(|_, _| true),
1537                event_listener: None,
1538                metrics: Arc::new(Metrics::noop()),
1539            });
1540            let hints = vec![Hint::Normal];
1541            fuzzy(cache, hints);
1542        }
1543
1544        #[test_log::test]
1545        fn test_sieve_cache_fuzzy() {
1546            let cache: RawCache<Sieve<u64, u64, TestProperties>, ModHasher> = RawCache::new(RawCacheConfig {
1547                capacity: 256,
1548                shards: 4,
1549                eviction_config: SieveConfig {},
1550                hash_builder: Default::default(),
1551                weighter: Arc::new(|_, _| 1),
1552                filter: Arc::new(|_, _| true),
1553                event_listener: None,
1554                metrics: Arc::new(Metrics::noop()),
1555            });
1556            let hints = vec![Hint::Normal];
1557            fuzzy(cache, hints);
1558        }
1559    }
1560}