foyer_memory/raw.rs

1// Copyright 2025 foyer Project Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    collections::hash_map::{Entry as HashMapEntry, HashMap},
17    fmt::Debug,
18    future::Future,
19    hash::Hash,
20    ops::Deref,
21    pin::Pin,
22    sync::Arc,
23    task::{Context, Poll},
24};
25
26use arc_swap::ArcSwap;
27use equivalent::Equivalent;
28#[cfg(feature = "tracing")]
29use fastrace::{
30    future::{FutureExt, InSpan},
31    Span,
32};
33use foyer_common::{
34    code::HashBuilder,
35    event::{Event, EventListener},
36    future::{Diversion, DiversionFuture},
37    metrics::Metrics,
38    properties::{Location, Properties, Source},
39    runtime::SingletonHandle,
40    strict_assert,
41    utils::scope::Scope,
42};
43use itertools::Itertools;
44use parking_lot::{Mutex, RwLock};
45use pin_project::pin_project;
46use tokio::{sync::oneshot, task::JoinHandle};
47
48use crate::{
49    error::{Error, Result},
50    eviction::{Eviction, Op},
51    indexer::{hash_table::HashTableIndexer, sentry::Sentry, Indexer},
52    pipe::NoopPipe,
53    record::{Data, Record},
54    Piece, Pipe,
55};
56
57/// The weighter for the in-memory cache.
58///
59/// The weighter is used to calculate the weight of the cache entry.
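///
/// A minimal sketch of a weighter that charges each entry by its value length; any
/// closure matching `Fn(&K, &V) -> usize` works:
///
/// ```ignore
/// // Each entry weighs as many units as its value has bytes.
/// let weighter = |_key: &u64, value: &Vec<u8>| value.len();
/// ```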
60pub trait Weighter<K, V>: Fn(&K, &V) -> usize + Send + Sync + 'static {}
61impl<K, V, T> Weighter<K, V> for T where T: Fn(&K, &V) -> usize + Send + Sync + 'static {}
62
63/// The filter for the in-memory cache.
64///
65/// The filter is used to decide whether to admit or reject an entry based on its key and value.
66///
67/// If the filter returns true, the key-value pair can be inserted into the in-memory cache;
68/// otherwise, it cannot be inserted.
69///
70/// To keep the API consistent, the in-memory cache still returns a cache entry for a rejected
71/// key-value pair, but the entry does not count towards the in-memory cache usage
72/// and is reclaimed immediately once it is dropped.
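///
/// A minimal sketch of a filter that rejects oversized values (the 4 KiB limit is an
/// arbitrary example, not a foyer default); any closure matching `Fn(&K, &V) -> bool` works:
///
/// ```ignore
/// // Admit everything except values larger than 4 KiB.
/// let filter = |_key: &u64, value: &Vec<u8>| value.len() <= 4096;
/// ```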
73pub trait Filter<K, V>: Fn(&K, &V) -> bool + Send + Sync + 'static {}
74impl<K, V, T> Filter<K, V> for T where T: Fn(&K, &V) -> bool + Send + Sync + 'static {}
75
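/// Configuration for constructing a [`RawCache`].
///
/// A construction sketch, mirroring the unit tests at the bottom of this file
/// (`Fifo`, `ModHasher`, and `TestProperties` are test-only type choices):
///
/// ```ignore
/// let cache: RawCache<Fifo<u64, u64, TestProperties>, ModHasher> = RawCache::new(RawCacheConfig {
///     capacity: 256,
///     shards: 4,
///     eviction_config: FifoConfig::default(),
///     hash_builder: Default::default(),
///     weighter: Arc::new(|_, _| 1),
///     filter: Arc::new(|_, _| true),
///     event_listener: None,
///     metrics: Arc::new(Metrics::noop()),
/// });
/// ```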
76pub struct RawCacheConfig<E, S>
77where
78    E: Eviction,
79    S: HashBuilder,
80{
81    pub capacity: usize,
82    pub shards: usize,
83    pub eviction_config: E::Config,
84    pub hash_builder: S,
85    pub weighter: Arc<dyn Weighter<E::Key, E::Value>>,
86    pub filter: Arc<dyn Filter<E::Key, E::Value>>,
87    pub event_listener: Option<Arc<dyn EventListener<Key = E::Key, Value = E::Value>>>,
88    pub metrics: Arc<Metrics>,
89}
90
91struct RawCacheShard<E, S, I>
92where
93    E: Eviction,
94    S: HashBuilder,
95    I: Indexer<Eviction = E>,
96{
97    eviction: E,
98    indexer: Sentry<I>,
99
100    usage: usize,
101    capacity: usize,
102
103    #[expect(clippy::type_complexity)]
104    waiters: Mutex<HashMap<E::Key, Vec<oneshot::Sender<RawCacheEntry<E, S, I>>>>>,
105
106    metrics: Arc<Metrics>,
107    _event_listener: Option<Arc<dyn EventListener<Key = E::Key, Value = E::Value>>>,
108}
109
110impl<E, S, I> RawCacheShard<E, S, I>
111where
112    E: Eviction,
113    S: HashBuilder,
114    I: Indexer<Eviction = E>,
115{
116    /// Evict entries to fit the target usage.
117    fn evict(&mut self, target: usize, garbages: &mut Vec<(Event, Arc<Record<E>>)>) {
118        // Evict overflow records.
119        while self.usage > target {
120            let evicted = match self.eviction.pop() {
121                Some(evicted) => evicted,
122                None => break,
123            };
124            self.metrics.memory_evict.increase(1);
125
126            let e = self.indexer.remove(evicted.hash(), evicted.key()).unwrap();
127            assert_eq!(Arc::as_ptr(&evicted), Arc::as_ptr(&e));
128
129            strict_assert!(!evicted.as_ref().is_in_indexer());
130            strict_assert!(!evicted.as_ref().is_in_eviction());
131
132            self.usage -= evicted.weight();
133
134            garbages.push((Event::Evict, evicted));
135        }
136    }
137
138    fn emplace(
139        &mut self,
140        record: Arc<Record<E>>,
141        garbages: &mut Vec<(Event, Arc<Record<E>>)>,
142        waiters: &mut Vec<oneshot::Sender<RawCacheEntry<E, S, I>>>,
143    ) -> Arc<Record<E>> {
144        *waiters = self.waiters.lock().remove(record.key()).unwrap_or_default();
145
146        let weight = record.weight();
147        let old_usage = self.usage;
148
149        // Evict overflow records.
150        self.evict(self.capacity.saturating_sub(weight), garbages);
151
152        // Insert new record
153        if let Some(old) = self.indexer.insert(record.clone()) {
154            self.metrics.memory_replace.increase(1);
155
156            strict_assert!(!old.is_in_indexer());
157
158            if old.is_in_eviction() {
159                self.eviction.remove(&old);
160            }
161            strict_assert!(!old.is_in_eviction());
162
163            self.usage -= old.weight();
164
165            garbages.push((Event::Replace, old));
166        } else {
167            self.metrics.memory_insert.increase(1);
168        }
169        strict_assert!(record.is_in_indexer());
170
171        let ephemeral = record.properties().ephemeral().unwrap_or_default();
172        record.set_ephemeral(ephemeral);
173        if !ephemeral {
174            self.eviction.push(record.clone());
175            strict_assert!(record.is_in_eviction());
176        }
177
178        self.usage += weight;
179        // Increase the reference count within the lock section.
180        // The reference count of the new record must cover the caller and all waiters at this moment.
181        record.inc_refs(waiters.len() + 1);
182
183        match self.usage.cmp(&old_usage) {
184            std::cmp::Ordering::Greater => self.metrics.memory_usage.increase((self.usage - old_usage) as _),
185            std::cmp::Ordering::Less => self.metrics.memory_usage.decrease((old_usage - self.usage) as _),
186            std::cmp::Ordering::Equal => {}
187        }
188
189        record
190    }
191
192    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::shard::remove"))]
193    fn remove<Q>(&mut self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
194    where
195        Q: Hash + Equivalent<E::Key> + ?Sized,
196    {
197        let record = self.indexer.remove(hash, key)?;
198
199        if record.is_in_eviction() {
200            self.eviction.remove(&record);
201        }
202        strict_assert!(!record.is_in_indexer());
203        strict_assert!(!record.is_in_eviction());
204
205        self.usage -= record.weight();
206
207        self.metrics.memory_remove.increase(1);
208        self.metrics.memory_usage.decrease(record.weight() as _);
209
210        record.inc_refs(1);
211
212        Some(record)
213    }
214
215    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::shard::get_noop"))]
216    fn get_noop<Q>(&self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
217    where
218        Q: Hash + Equivalent<E::Key> + ?Sized,
219    {
220        self.get_inner(hash, key)
221    }
222
223    #[cfg_attr(
224        feature = "tracing",
225        fastrace::trace(name = "foyer::memory::raw::shard::get_immutable")
226    )]
227    fn get_immutable<Q>(&self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
228    where
229        Q: Hash + Equivalent<E::Key> + ?Sized,
230    {
231        self.get_inner(hash, key)
232            .inspect(|record| self.acquire_immutable(record))
233    }
234
235    #[cfg_attr(
236        feature = "tracing",
237        fastrace::trace(name = "foyer::memory::raw::shard::get_mutable")
238    )]
239    fn get_mutable<Q>(&mut self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
240    where
241        Q: Hash + Equivalent<E::Key> + ?Sized,
242    {
243        self.get_inner(hash, key).inspect(|record| self.acquire_mutable(record))
244    }
245
246    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::shard::get_inner"))]
247    fn get_inner<Q>(&self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
248    where
249        Q: Hash + Equivalent<E::Key> + ?Sized,
250    {
251        let record = match self.indexer.get(hash, key).cloned() {
252            Some(record) => {
253                self.metrics.memory_hit.increase(1);
254                record
255            }
256            None => {
257                self.metrics.memory_miss.increase(1);
258                return None;
259            }
260        };
261
262        strict_assert!(record.is_in_indexer());
263
264        record.set_ephemeral(false);
265
266        record.inc_refs(1);
267
268        Some(record)
269    }
270
271    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::shard::clear"))]
272    fn clear(&mut self, garbages: &mut Vec<Arc<Record<E>>>) {
273        let records = self.indexer.drain().collect_vec();
274        self.eviction.clear();
275
276        let mut count = 0;
277
278        for record in records {
279            count += 1;
280            strict_assert!(!record.is_in_indexer());
281            strict_assert!(!record.is_in_eviction());
282
283            garbages.push(record);
284        }
285
286        self.metrics.memory_remove.increase(count);
287    }
288
289    #[cfg_attr(
290        feature = "tracing",
291        fastrace::trace(name = "foyer::memory::raw::shard::acquire_immutable")
292    )]
293    fn acquire_immutable(&self, record: &Arc<Record<E>>) {
294        match E::acquire() {
295            Op::Immutable(f) => f(&self.eviction, record),
296            _ => unreachable!(),
297        }
298    }
299
300    #[cfg_attr(
301        feature = "tracing",
302        fastrace::trace(name = "foyer::memory::raw::shard::acquire_mutable")
303    )]
304    fn acquire_mutable(&mut self, record: &Arc<Record<E>>) {
305        match E::acquire() {
306            Op::Mutable(mut f) => f(&mut self.eviction, record),
307            _ => unreachable!(),
308        }
309    }
310
311    #[cfg_attr(
312        feature = "tracing",
313        fastrace::trace(name = "foyer::memory::raw::shard::release_immutable")
314    )]
315    fn release_immutable(&self, record: &Arc<Record<E>>) {
316        match E::release() {
317            Op::Immutable(f) => f(&self.eviction, record),
318            _ => unreachable!(),
319        }
320    }
321
322    #[cfg_attr(
323        feature = "tracing",
324        fastrace::trace(name = "foyer::memory::raw::shard::release_mutable")
325    )]
326    fn release_mutable(&mut self, record: &Arc<Record<E>>) {
327        match E::release() {
328            Op::Mutable(mut f) => f(&mut self.eviction, record),
329            _ => unreachable!(),
330        }
331    }
332
333    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::shard::fetch_noop"))]
334    fn fetch_noop(&self, hash: u64, key: &E::Key) -> RawShardFetch<E, S, I>
335    where
336        E::Key: Clone,
337    {
338        if let Some(record) = self.get_noop(hash, key) {
339            return RawShardFetch::Hit(record);
340        }
341
342        self.fetch_queue(key.clone())
343    }
344
345    #[cfg_attr(
346        feature = "tracing",
347        fastrace::trace(name = "foyer::memory::raw::shard::fetch_immutable")
348    )]
349    fn fetch_immutable(&self, hash: u64, key: &E::Key) -> RawShardFetch<E, S, I>
350    where
351        E::Key: Clone,
352    {
353        if let Some(record) = self.get_immutable(hash, key) {
354            return RawShardFetch::Hit(record);
355        }
356
357        self.fetch_queue(key.clone())
358    }
359
360    #[cfg_attr(
361        feature = "tracing",
362        fastrace::trace(name = "foyer::memory::raw::shard::fetch_mutable")
363    )]
364    fn fetch_mutable(&mut self, hash: u64, key: &E::Key) -> RawShardFetch<E, S, I>
365    where
366        E::Key: Clone,
367    {
368        if let Some(record) = self.get_mutable(hash, key) {
369            return RawShardFetch::Hit(record);
370        }
371
372        self.fetch_queue(key.clone())
373    }
374
375    #[cfg_attr(
376        feature = "tracing",
377        fastrace::trace(name = "foyer::memory::raw::shard::fetch_queue")
378    )]
379    fn fetch_queue(&self, key: E::Key) -> RawShardFetch<E, S, I> {
380        match self.waiters.lock().entry(key) {
381            HashMapEntry::Occupied(mut o) => {
382                let (tx, rx) = oneshot::channel();
383                o.get_mut().push(tx);
384                self.metrics.memory_queue.increase(1);
385                #[cfg(feature = "tracing")]
386                let wait = rx.in_span(Span::enter_with_local_parent(
387                    "foyer::memory::raw::fetch_with_runtime::wait",
388                ));
389                #[cfg(not(feature = "tracing"))]
390                let wait = rx;
391                RawShardFetch::Wait(wait)
392            }
393            HashMapEntry::Vacant(v) => {
394                v.insert(vec![]);
395                self.metrics.memory_fetch.increase(1);
396                RawShardFetch::Miss
397            }
398        }
399    }
400}
401
402#[expect(clippy::type_complexity)]
403struct RawCacheInner<E, S, I>
404where
405    E: Eviction,
406    S: HashBuilder,
407    I: Indexer<Eviction = E>,
408{
409    shards: Vec<RwLock<RawCacheShard<E, S, I>>>,
410
411    capacity: usize,
412
413    hash_builder: Arc<S>,
414    weighter: Arc<dyn Weighter<E::Key, E::Value>>,
415    filter: Arc<dyn Filter<E::Key, E::Value>>,
416
417    metrics: Arc<Metrics>,
418    event_listener: Option<Arc<dyn EventListener<Key = E::Key, Value = E::Value>>>,
419    pipe: ArcSwap<Box<dyn Pipe<Key = E::Key, Value = E::Value, Properties = E::Properties>>>,
420}
421
422impl<E, S, I> RawCacheInner<E, S, I>
423where
424    E: Eviction,
425    S: HashBuilder,
426    I: Indexer<Eviction = E>,
427{
428    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::inner::clear"))]
429    fn clear(&self) {
430        let mut garbages = vec![];
431
432        self.shards
433            .iter()
434            .map(|shard| shard.write())
435            .for_each(|mut shard| shard.clear(&mut garbages));
436
437        // Do not deallocate data within the lock section.
438        if let Some(listener) = self.event_listener.as_ref() {
439            for record in garbages {
440                listener.on_leave(Event::Clear, record.key(), record.value());
441            }
442        }
443    }
444}
445
446pub struct RawCache<E, S, I = HashTableIndexer<E>>
447where
448    E: Eviction,
449    S: HashBuilder,
450    I: Indexer<Eviction = E>,
451{
452    inner: Arc<RawCacheInner<E, S, I>>,
453}
454
455impl<E, S, I> Drop for RawCacheInner<E, S, I>
456where
457    E: Eviction,
458    S: HashBuilder,
459    I: Indexer<Eviction = E>,
460{
461    fn drop(&mut self) {
462        self.clear();
463    }
464}
465
466impl<E, S, I> Clone for RawCache<E, S, I>
467where
468    E: Eviction,
469    S: HashBuilder,
470    I: Indexer<Eviction = E>,
471{
472    fn clone(&self) -> Self {
473        Self {
474            inner: self.inner.clone(),
475        }
476    }
477}
478
479impl<E, S, I> RawCache<E, S, I>
480where
481    E: Eviction,
482    S: HashBuilder,
483    I: Indexer<Eviction = E>,
484{
485    pub fn new(config: RawCacheConfig<E, S>) -> Self {
486        let shard_capacity = config.capacity / config.shards;
487
488        let shards = (0..config.shards)
489            .map(|_| RawCacheShard {
490                eviction: E::new(shard_capacity, &config.eviction_config),
491                indexer: Sentry::default(),
492                usage: 0,
493                capacity: shard_capacity,
494                waiters: Mutex::default(),
495                metrics: config.metrics.clone(),
496                _event_listener: config.event_listener.clone(),
497            })
498            .map(RwLock::new)
499            .collect_vec();
500
501        let pipe: Box<dyn Pipe<Key = E::Key, Value = E::Value, Properties = E::Properties>> =
502            Box::new(NoopPipe::default());
503
504        let inner = RawCacheInner {
505            shards,
506            capacity: config.capacity,
507            hash_builder: Arc::new(config.hash_builder),
508            weighter: config.weighter,
509            filter: config.filter,
510            metrics: config.metrics,
511            event_listener: config.event_listener,
512            pipe: ArcSwap::new(Arc::new(pipe)),
513        };
514
515        Self { inner: Arc::new(inner) }
516    }
517
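    /// Resize the cache to `capacity`, splitting it evenly across shards and evicting
    /// entries that no longer fit.
    ///
    /// A usage sketch, assuming the cache is currently full (mirrors `test_resize` below):
    ///
    /// ```ignore
    /// let capacity = cache.capacity();
    /// cache.resize(capacity / 2).unwrap();
    /// assert_eq!(cache.usage(), capacity / 2);
    /// ```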
518    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::resize"))]
519    pub fn resize(&self, capacity: usize) -> Result<()> {
520        let shards = self.inner.shards.len();
521        let shard_capacity = capacity / shards;
522
523        let handles = (0..shards)
524            .map(|i| {
525                let inner = self.inner.clone();
526                std::thread::spawn(move || {
527                    let mut garbages = vec![];
528                    let res = inner.shards[i].write().with(|mut shard| {
529                        shard.eviction.update(shard_capacity, None).inspect(|_| {
530                            shard.capacity = shard_capacity;
531                            shard.evict(shard_capacity, &mut garbages)
532                        })
533                    });
534                    // Deallocate data out of the lock critical section.
535                    let pipe = inner.pipe.load();
536                    let piped = pipe.is_enabled();
537                    if inner.event_listener.is_some() || piped {
538                        for (event, record) in garbages {
539                            if let Some(listener) = inner.event_listener.as_ref() {
540                                listener.on_leave(event, record.key(), record.value())
541                            }
542                            if piped && event == Event::Evict {
543                                pipe.send(Piece::new(record));
544                            }
545                        }
546                    }
547                    res
548                })
549            })
550            .collect_vec();
551
552        let errs = handles
553            .into_iter()
554            .map(|handle| handle.join().unwrap())
555            .filter(|res| res.is_err())
556            .map(|res| res.unwrap_err())
557            .collect_vec();
558        if !errs.is_empty() {
559            return Err(Error::multiple(errs));
560        }
561
562        Ok(())
563    }
564
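    /// Insert a key-value pair into the cache and return a handle to the new entry.
    ///
    /// A usage sketch with `u64` keys and values, as in the tests below:
    ///
    /// ```ignore
    /// let entry = cache.insert(1u64, 1u64);
    /// assert_eq!(entry.value(), &1);
    /// ```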
565    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::insert"))]
566    pub fn insert(&self, key: E::Key, value: E::Value) -> RawCacheEntry<E, S, I> {
567        self.insert_with_properties(key, value, Default::default())
568    }
569
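    /// Insert a key-value pair together with explicit properties.
    ///
    /// A sketch using the test-only `TestProperties` to insert an ephemeral entry; an
    /// ephemeral entry is removed as soon as its last handle is dropped (see `test_insert_ephemeral`):
    ///
    /// ```ignore
    /// let entry = cache.insert_with_properties(1u64, 1u64, TestProperties::default().with_ephemeral(true));
    /// drop(entry); // the entry is reclaimed immediately
    /// ```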
570    #[cfg_attr(
571        feature = "tracing",
572        fastrace::trace(name = "foyer::memory::raw::insert_with_properties")
573    )]
574    pub fn insert_with_properties(
575        &self,
576        key: E::Key,
577        value: E::Value,
578        mut properties: E::Properties,
579    ) -> RawCacheEntry<E, S, I> {
580        let hash = self.inner.hash_builder.hash_one(&key);
581        let weight = (self.inner.weighter)(&key, &value);
582        if !(self.inner.filter)(&key, &value) {
583            properties = properties.with_disposable(true);
584        }
585        let record = Arc::new(Record::new(Data {
586            key,
587            value,
588            properties,
589            hash,
590            weight,
591        }));
592        self.insert_inner(record)
593    }
594
595    #[doc(hidden)]
596    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::insert_piece"))]
597    pub fn insert_piece(&self, piece: Piece<E::Key, E::Value, E::Properties>) -> RawCacheEntry<E, S, I> {
598        self.insert_inner(piece.into_record())
599    }
600
601    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::insert_inner"))]
602    fn insert_inner(&self, record: Arc<Record<E>>) -> RawCacheEntry<E, S, I> {
603        if record.properties().disposable().unwrap_or_default() {
604            // Remove the stale record if it exists.
605            self.inner.shards[self.shard(record.hash())]
606                .write()
607                .remove(record.hash(), record.key())
608                .inspect(|r| {
609                    // Deallocate data out of the lock critical section.
610                    if let Some(listener) = self.inner.event_listener.as_ref() {
611                        listener.on_leave(Event::Replace, r.key(), r.value());
612                    }
613                });
614
615            // If the record is disposable, we do not insert it into the cache.
616            // Instead, we just return it and let it be dropped immediately after the last reference drops.
617            record.inc_refs(1);
618            return RawCacheEntry {
619                record,
620                inner: self.inner.clone(),
621            };
622        }
623
624        let mut garbages = vec![];
625        let mut waiters = vec![];
626
627        let record = self.inner.shards[self.shard(record.hash())]
628            .write()
629            .with(|mut shard| shard.emplace(record, &mut garbages, &mut waiters));
630
631        // Notify waiters out of the lock critical section.
632        for waiter in waiters {
633            let _ = waiter.send(RawCacheEntry {
634                record: record.clone(),
635                inner: self.inner.clone(),
636            });
637        }
638
639        // Deallocate data out of the lock critical section.
640        let pipe = self.inner.pipe.load();
641        let piped = pipe.is_enabled();
642        if self.inner.event_listener.is_some() || piped {
643            for (event, record) in garbages {
644                if let Some(listener) = self.inner.event_listener.as_ref() {
645                    listener.on_leave(event, record.key(), record.value())
646                }
647                if piped && event == Event::Evict {
648                    pipe.send(Piece::new(record));
649                }
650            }
651        }
652
653        RawCacheEntry {
654            record,
655            inner: self.inner.clone(),
656        }
657    }
658
659    /// Evict all entries in the cache and offload them into the disk cache via the pipe if needed.
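    ///
    /// A usage sketch with the test-only `PiecePipe` (mirrors `test_evict_all` below):
    ///
    /// ```ignore
    /// cache.set_pipe(Box::new(PiecePipe::default()));
    /// cache.evict_all(); // evicted entries are sent through the pipe as `Piece`s
    /// ```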
660    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::evict_all"))]
661    pub fn evict_all(&self) {
662        let mut garbages = vec![];
663        for shard in self.inner.shards.iter() {
664            shard.write().evict(0, &mut garbages);
665        }
666
667        // Deallocate data out of the lock critical section.
668        let pipe = self.inner.pipe.load();
669        let piped = pipe.is_enabled();
670        if self.inner.event_listener.is_some() || piped {
671            for (event, record) in garbages {
672                if let Some(listener) = self.inner.event_listener.as_ref() {
673                    listener.on_leave(event, record.key(), record.value())
674                }
675                if piped && event == Event::Evict {
676                    pipe.send(Piece::new(record));
677                }
678            }
679        }
680    }
681
682    /// Evict all entries in the cache and offload them into the disk cache via the pipe if needed.
683    ///
684    /// This function obeys the I/O throttler of the disk cache and makes sure all entries are offloaded.
685    /// Therefore, this function is asynchronous.
686    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::flush"))]
687    pub async fn flush(&self) {
688        let mut garbages = vec![];
689        for shard in self.inner.shards.iter() {
690            shard.write().evict(0, &mut garbages);
691        }
692
693        // Deallocate data out of the lock critical section.
694        let pipe = self.inner.pipe.load();
695        let piped = pipe.is_enabled();
696
697        if let Some(listener) = self.inner.event_listener.as_ref() {
698            for (event, record) in garbages.iter() {
699                listener.on_leave(*event, record.key(), record.value());
700            }
701        }
702        if piped {
703            let pieces = garbages.into_iter().map(|(_, record)| Piece::new(record)).collect_vec();
704            pipe.flush(pieces).await;
705        }
706    }
707
708    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::remove"))]
709    pub fn remove<Q>(&self, key: &Q) -> Option<RawCacheEntry<E, S, I>>
710    where
711        Q: Hash + Equivalent<E::Key> + ?Sized,
712    {
713        let hash = self.inner.hash_builder.hash_one(key);
714
715        self.inner.shards[self.shard(hash)]
716            .write()
717            .with(|mut shard| {
718                shard.remove(hash, key).map(|record| RawCacheEntry {
719                    inner: self.inner.clone(),
720                    record,
721                })
722            })
723            .inspect(|record| {
724                // Deallocate data out of the lock critical section.
725                if let Some(listener) = self.inner.event_listener.as_ref() {
726                    listener.on_leave(Event::Remove, record.key(), record.value());
727                }
728            })
729    }
730
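    /// Get the cached entry for `key`, if any, updating the eviction state according to the policy.
    ///
    /// A lookup sketch with `u64` keys and values, as in the tests below:
    ///
    /// ```ignore
    /// if let Some(entry) = cache.get(&1u64) {
    ///     assert_eq!(entry.value(), &1);
    /// }
    /// ```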
731    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::get"))]
732    pub fn get<Q>(&self, key: &Q) -> Option<RawCacheEntry<E, S, I>>
733    where
734        Q: Hash + Equivalent<E::Key> + ?Sized,
735    {
736        let hash = self.inner.hash_builder.hash_one(key);
737
738        let record = match E::acquire() {
739            Op::Noop => self.inner.shards[self.shard(hash)].read().get_noop(hash, key),
740            Op::Immutable(_) => self.inner.shards[self.shard(hash)]
741                .read()
742                .with(|shard| shard.get_immutable(hash, key)),
743            Op::Mutable(_) => self.inner.shards[self.shard(hash)]
744                .write()
745                .with(|mut shard| shard.get_mutable(hash, key)),
746        }?;
747
748        Some(RawCacheEntry {
749            inner: self.inner.clone(),
750            record,
751        })
752    }
753
754    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::contains"))]
755    pub fn contains<Q>(&self, key: &Q) -> bool
756    where
757        Q: Hash + Equivalent<E::Key> + ?Sized,
758    {
759        let hash = self.inner.hash_builder.hash_one(key);
760
761        self.inner.shards[self.shard(hash)]
762            .read()
763            .with(|shard| shard.indexer.get(hash, key).is_some())
764    }
765
766    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::touch"))]
767    pub fn touch<Q>(&self, key: &Q) -> bool
768    where
769        Q: Hash + Equivalent<E::Key> + ?Sized,
770    {
771        let hash = self.inner.hash_builder.hash_one(key);
772
773        match E::acquire() {
774            Op::Noop => self.inner.shards[self.shard(hash)].read().get_noop(hash, key),
775            Op::Immutable(_) => self.inner.shards[self.shard(hash)]
776                .read()
777                .with(|shard| shard.get_immutable(hash, key)),
778            Op::Mutable(_) => self.inner.shards[self.shard(hash)]
779                .write()
780                .with(|mut shard| shard.get_mutable(hash, key)),
781        }
782        .is_some()
783    }
784
785    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::clear"))]
786    pub fn clear(&self) {
787        self.inner.clear();
788    }
789
790    pub fn capacity(&self) -> usize {
791        self.inner.capacity
792    }
793
794    pub fn usage(&self) -> usize {
795        self.inner.shards.iter().map(|shard| shard.read().usage).sum()
796    }
797
798    pub fn metrics(&self) -> &Metrics {
799        &self.inner.metrics
800    }
801
802    pub fn hash_builder(&self) -> &Arc<S> {
803        &self.inner.hash_builder
804    }
805
806    pub fn shards(&self) -> usize {
807        self.inner.shards.len()
808    }
809
810    pub fn set_pipe(&self, pipe: Box<dyn Pipe<Key = E::Key, Value = E::Value, Properties = E::Properties>>) {
811        self.inner.pipe.store(Arc::new(pipe));
812    }
813
814    fn shard(&self, hash: u64) -> usize {
815        hash as usize % self.inner.shards.len()
816    }
817}
818
819pub struct RawCacheEntry<E, S, I = HashTableIndexer<E>>
820where
821    E: Eviction,
822    S: HashBuilder,
823    I: Indexer<Eviction = E>,
824{
825    inner: Arc<RawCacheInner<E, S, I>>,
826    record: Arc<Record<E>>,
827}
828
829impl<E, S, I> Debug for RawCacheEntry<E, S, I>
830where
831    E: Eviction,
832    S: HashBuilder,
833    I: Indexer<Eviction = E>,
834{
835    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
836        f.debug_struct("RawCacheEntry").field("record", &self.record).finish()
837    }
838}
839
840impl<E, S, I> Drop for RawCacheEntry<E, S, I>
841where
842    E: Eviction,
843    S: HashBuilder,
844    I: Indexer<Eviction = E>,
845{
846    fn drop(&mut self) {
847        let hash = self.record.hash();
848        let shard = &self.inner.shards[hash as usize % self.inner.shards.len()];
849
850        if self.record.dec_refs(1) == 0 {
851            if self.record.properties().disposable().unwrap_or_default() {
852                // TODO(MrCroxx): Send it to disk cache write queue with pipe?
853                return;
854            }
855
856            match E::release() {
857                Op::Noop => {}
858                Op::Immutable(_) => shard.read().with(|shard| shard.release_immutable(&self.record)),
859                Op::Mutable(_) => shard.write().with(|mut shard| shard.release_mutable(&self.record)),
860            }
861
862            if self.record.is_ephemeral() {
863                shard
864                    .write()
865                    .with(|mut shard| shard.remove(hash, self.key()))
866                    .inspect(|record| {
867                        // Deallocate data out of the lock critical section.
868                        let pipe = self.inner.pipe.load();
869                        let piped = pipe.is_enabled();
870                        let event = Event::Evict;
871                        if self.inner.event_listener.is_some() || piped {
872                            if let Some(listener) = self.inner.event_listener.as_ref() {
873                                listener.on_leave(Event::Evict, record.key(), record.value());
874                            }
875                        }
876                        if piped && event == Event::Evict {
877                            pipe.send(self.piece());
878                        }
879                    });
880            }
881        }
882    }
883}
884
885impl<E, S, I> Clone for RawCacheEntry<E, S, I>
886where
887    E: Eviction,
888    S: HashBuilder,
889    I: Indexer<Eviction = E>,
890{
891    fn clone(&self) -> Self {
892        self.record.inc_refs(1);
893        Self {
894            inner: self.inner.clone(),
895            record: self.record.clone(),
896        }
897    }
898}
899
900impl<E, S, I> Deref for RawCacheEntry<E, S, I>
901where
902    E: Eviction,
903    S: HashBuilder,
904    I: Indexer<Eviction = E>,
905{
906    type Target = E::Value;
907
908    fn deref(&self) -> &Self::Target {
909        self.value()
910    }
911}
912
913unsafe impl<E, S, I> Send for RawCacheEntry<E, S, I>
914where
915    E: Eviction,
916    S: HashBuilder,
917    I: Indexer<Eviction = E>,
918{
919}
920
921unsafe impl<E, S, I> Sync for RawCacheEntry<E, S, I>
922where
923    E: Eviction,
924    S: HashBuilder,
925    I: Indexer<Eviction = E>,
926{
927}
928
929impl<E, S, I> RawCacheEntry<E, S, I>
930where
931    E: Eviction,
932    S: HashBuilder,
933    I: Indexer<Eviction = E>,
934{
935    pub fn hash(&self) -> u64 {
936        self.record.hash()
937    }
938
939    pub fn key(&self) -> &E::Key {
940        self.record.key()
941    }
942
943    pub fn value(&self) -> &E::Value {
944        self.record.value()
945    }
946
947    pub fn properties(&self) -> &E::Properties {
948        self.record.properties()
949    }
950
951    pub fn weight(&self) -> usize {
952        self.record.weight()
953    }
954
955    pub fn refs(&self) -> usize {
956        self.record.refs()
957    }
958
959    pub fn is_outdated(&self) -> bool {
960        !self.record.is_in_indexer()
961    }
962
963    pub fn piece(&self) -> Piece<E::Key, E::Value, E::Properties> {
964        Piece::new(self.record.clone())
965    }
966}
967
968/// The state of `fetch`.
969#[derive(Debug, Clone, Copy, PartialEq, Eq)]
970pub enum FetchState {
971    /// Cache hit.
972    Hit,
973    /// Cache miss, but another fetch for the same key is already in flight; wait in queue for its result.
974    Wait,
975    /// Cache miss, and there are no other waiters at the moment.
976    Miss,
977}
978
979/// Context for fetch calls.
980#[derive(Debug)]
981pub struct FetchContext {
982    /// Whether this fetch was triggered because the disk cache was throttled.
983    pub throttled: bool,
984    /// The source of the fetched entry.
985    pub source: Source,
986}
987
988enum RawShardFetch<E, S, I>
989where
990    E: Eviction,
991    S: HashBuilder,
992    I: Indexer<Eviction = E>,
993{
994    Hit(Arc<Record<E>>),
995    Wait(RawFetchWait<E, S, I>),
996    Miss,
997}
998
999pub type RawFetch<E, ER, S, I = HashTableIndexer<E>> =
1000    DiversionFuture<RawFetchInner<E, ER, S, I>, std::result::Result<RawCacheEntry<E, S, I>, ER>, FetchContext>;
1001
1002type RawFetchHit<E, S, I> = Option<RawCacheEntry<E, S, I>>;
1003#[cfg(feature = "tracing")]
1004type RawFetchWait<E, S, I> = InSpan<oneshot::Receiver<RawCacheEntry<E, S, I>>>;
1005#[cfg(not(feature = "tracing"))]
1006type RawFetchWait<E, S, I> = oneshot::Receiver<RawCacheEntry<E, S, I>>;
1007type RawFetchMiss<E, I, S, ER, DFS> = JoinHandle<Diversion<std::result::Result<RawCacheEntry<E, S, I>, ER>, DFS>>;
1008
1009/// The target of a fetch operation.
1010pub enum FetchTarget<K, V, P> {
1011    /// Fetched value.
1012    Value(V),
1013    /// Fetched piece from the disk cache write queue.
1014    Piece(Piece<K, V, P>),
1015}
1016
1017impl<K, V, P> Debug for FetchTarget<K, V, P> {
1018    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1019        f.debug_struct("FetchTarget").finish()
1020    }
1021}
1022
1023impl<K, V, P> From<V> for FetchTarget<K, V, P> {
1024    fn from(value: V) -> Self {
1025        Self::Value(value)
1026    }
1027}
1028
1029impl<K, V, P> From<Piece<K, V, P>> for FetchTarget<K, V, P> {
1030    fn from(piece: Piece<K, V, P>) -> Self {
1031        Self::Piece(piece)
1032    }
1033}
1034
1035#[pin_project(project = RawFetchInnerProj)]
1036pub enum RawFetchInner<E, ER, S, I>
1037where
1038    E: Eviction,
1039    S: HashBuilder,
1040    I: Indexer<Eviction = E>,
1041{
1042    Hit(RawFetchHit<E, S, I>),
1043    Wait(#[pin] RawFetchWait<E, S, I>),
1044    Miss(#[pin] RawFetchMiss<E, I, S, ER, FetchContext>),
1045}
1046
1047impl<E, ER, S, I> RawFetchInner<E, ER, S, I>
1048where
1049    E: Eviction,
1050    S: HashBuilder,
1051    I: Indexer<Eviction = E>,
1052{
1053    pub fn state(&self) -> FetchState {
1054        match self {
1055            RawFetchInner::Hit(_) => FetchState::Hit,
1056            RawFetchInner::Wait(_) => FetchState::Wait,
1057            RawFetchInner::Miss(_) => FetchState::Miss,
1058        }
1059    }
1060}
1061
1062impl<E, ER, S, I> Future for RawFetchInner<E, ER, S, I>
1063where
1064    E: Eviction,
1065    ER: From<Error>,
1066    S: HashBuilder,
1067    I: Indexer<Eviction = E>,
1068{
1069    type Output = Diversion<std::result::Result<RawCacheEntry<E, S, I>, ER>, FetchContext>;
1070
1071    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1072        match self.project() {
1073            RawFetchInnerProj::Hit(opt) => Poll::Ready(Ok(opt.take().unwrap()).into()),
1074            RawFetchInnerProj::Wait(waiter) => waiter.poll(cx).map_err(|e| Error::wait(e).into()).map(Diversion::from),
1075            RawFetchInnerProj::Miss(handle) => handle.poll(cx).map(|join| join.unwrap()),
1076        }
1077    }
1078}
1079
1080impl<E, S, I> RawCache<E, S, I>
1081where
1082    E: Eviction,
1083    S: HashBuilder,
1084    I: Indexer<Eviction = E>,
1085    E::Key: Clone,
1086{
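    /// Fetch the entry for `key`, running `fetch` to fill the cache on a miss.
    ///
    /// Concurrent fetches for the same key are deduplicated: only one `fetch` future runs,
    /// and the other callers wait in queue for its result (see [`FetchState`]).
    ///
    /// A usage sketch with `u64` keys and values; the error type is an illustrative assumption:
    ///
    /// ```ignore
    /// let entry = cache.fetch(42u64, || async move { Ok::<_, Error>(42u64) }).await?;
    /// assert_eq!(*entry, 42);
    /// ```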
1087    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::fetch"))]
1088    pub fn fetch<F, FU, ER>(&self, key: E::Key, fetch: F) -> RawFetch<E, ER, S, I>
1089    where
1090        F: FnOnce() -> FU,
1091        FU: Future<Output = std::result::Result<E::Value, ER>> + Send + 'static,
1092        ER: Send + 'static + Debug,
1093    {
1094        self.fetch_inner(
1095            key,
1096            Default::default(),
1097            fetch,
1098            &tokio::runtime::Handle::current().into(),
1099        )
1100    }
1101
1102    #[cfg_attr(
1103        feature = "tracing",
1104        fastrace::trace(name = "foyer::memory::raw::fetch_with_properties")
1105    )]
1106    pub fn fetch_with_properties<F, FU, ER, ID>(
1107        &self,
1108        key: E::Key,
1109        properties: E::Properties,
1110        fetch: F,
1111    ) -> RawFetch<E, ER, S, I>
1112    where
1113        F: FnOnce() -> FU,
1114        FU: Future<Output = ID> + Send + 'static,
1115        ER: Send + 'static + Debug,
1116        ID: Into<Diversion<std::result::Result<E::Value, ER>, FetchContext>>,
1117    {
1118        self.fetch_inner(key, properties, fetch, &tokio::runtime::Handle::current().into())
1119    }
1120
1121    /// Advanced fetch with a specified runtime.
1122    ///
1123    /// This function is for internal use and its documentation is hidden.
1124    #[doc(hidden)]
1125    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::fetch_inner"))]
1126    pub fn fetch_inner<F, FU, ER, ID, IT>(
1127        &self,
1128        key: E::Key,
1129        mut properties: E::Properties,
1130        fetch: F,
1131        runtime: &SingletonHandle,
1132    ) -> RawFetch<E, ER, S, I>
1133    where
1134        F: FnOnce() -> FU,
1135        FU: Future<Output = ID> + Send + 'static,
1136        ER: Send + 'static + Debug,
1137        ID: Into<Diversion<std::result::Result<IT, ER>, FetchContext>>,
1138        IT: Into<FetchTarget<E::Key, E::Value, E::Properties>>,
1139    {
1140        let hash = self.inner.hash_builder.hash_one(&key);
1141
1142        let raw = match E::acquire() {
1143            Op::Noop => self.inner.shards[self.shard(hash)].read().fetch_noop(hash, &key),
1144            Op::Immutable(_) => self.inner.shards[self.shard(hash)].read().fetch_immutable(hash, &key),
1145            Op::Mutable(_) => self.inner.shards[self.shard(hash)].write().fetch_mutable(hash, &key),
1146        };
1147
1148        match raw {
1149            RawShardFetch::Hit(record) => {
1150                return RawFetch::new(RawFetchInner::Hit(Some(RawCacheEntry {
1151                    record,
1152                    inner: self.inner.clone(),
1153                })))
1154            }
1155            RawShardFetch::Wait(future) => return RawFetch::new(RawFetchInner::Wait(future)),
1156            RawShardFetch::Miss => {}
1157        }
1158
1159        let cache = self.clone();
1160        let future = fetch();
1161        let join = runtime.spawn({
1162            let task = async move {
1163                #[cfg(feature = "tracing")]
1164                let Diversion { target, store } = future
1165                    .in_span(Span::enter_with_local_parent("foyer::memory::raw::fetch_inner::fn"))
1166                    .await
1167                    .into();
1168                #[cfg(not(feature = "tracing"))]
1169                let Diversion { target, store } = future.await.into();
1170
1171                let target = match target {
1172                    Ok(value) => value,
1173                    Err(e) => {
1174                        cache.inner.shards[cache.shard(hash)].read().waiters.lock().remove(&key);
1175                        tracing::debug!("[fetch]: error raised while fetching, all waiters are dropped, err: {e:?}");
1176                        return Diversion { target: Err(e), store };
1177                    }
1178                };
1179                if let Some(ctx) = store.as_ref() {
1180                    if ctx.throttled {
1181                        properties = properties.with_location(Location::InMem)
1182                    }
1183                    properties = properties.with_source(ctx.source)
1184                };
1185                let location = properties.location().unwrap_or_default();
1186                properties = properties.with_ephemeral(matches! {location, Location::OnDisk});
1187                let entry = match target.into() {
1188                    FetchTarget::Value(value) => cache.insert_with_properties(key, value, properties),
1189                    FetchTarget::Piece(p) => cache.insert_inner(p.into_record::<E>()),
1190                };
1191                Diversion {
1192                    target: Ok(entry),
1193                    store,
1194                }
1195            };
1196            #[cfg(feature = "tracing")]
1197            let task = task.in_span(Span::enter_with_local_parent(
1198                "foyer::memory::generic::fetch_with_runtime::spawn",
1199            ));
1200            task
1201        });
1202
1203        RawFetch::new(RawFetchInner::Miss(join))
1204    }
1205}
1206
1207#[cfg(test)]
1208mod tests {
1209    use foyer_common::hasher::ModHasher;
1210    use rand::{rngs::SmallRng, seq::IndexedRandom, RngCore, SeedableRng};
1211
1212    use super::*;
1213    use crate::{
1214        eviction::{
1215            fifo::{Fifo, FifoConfig},
1216            lfu::{Lfu, LfuConfig},
1217            lru::{Lru, LruConfig},
1218            s3fifo::{S3Fifo, S3FifoConfig},
1219            sieve::{Sieve, SieveConfig},
1220            test_utils::TestProperties,
1221        },
1222        test_utils::PiecePipe,
1223    };
1224
1225    fn is_send_sync_static<T: Send + Sync + 'static>() {}
1226
1227    #[test]
1228    fn test_send_sync_static() {
1229        is_send_sync_static::<RawCache<Fifo<(), (), TestProperties>, ModHasher>>();
1230        is_send_sync_static::<RawCache<S3Fifo<(), (), TestProperties>, ModHasher>>();
1231        is_send_sync_static::<RawCache<Lfu<(), (), TestProperties>, ModHasher>>();
1232        is_send_sync_static::<RawCache<Lru<(), (), TestProperties>, ModHasher>>();
1233        is_send_sync_static::<RawCache<Sieve<(), (), TestProperties>, ModHasher>>();
1234    }
1235
1236    #[expect(clippy::type_complexity)]
1237    fn fifo_cache_for_test(
1238    ) -> RawCache<Fifo<u64, u64, TestProperties>, ModHasher, HashTableIndexer<Fifo<u64, u64, TestProperties>>> {
1239        RawCache::new(RawCacheConfig {
1240            capacity: 256,
1241            shards: 4,
1242            eviction_config: FifoConfig::default(),
1243            hash_builder: Default::default(),
1244            weighter: Arc::new(|_, _| 1),
1245            filter: Arc::new(|_, _| true),
1246            event_listener: None,
1247            metrics: Arc::new(Metrics::noop()),
1248        })
1249    }
1250
1251    #[expect(clippy::type_complexity)]
1252    fn s3fifo_cache_for_test(
1253    ) -> RawCache<S3Fifo<u64, u64, TestProperties>, ModHasher, HashTableIndexer<S3Fifo<u64, u64, TestProperties>>> {
1254        RawCache::new(RawCacheConfig {
1255            capacity: 256,
1256            shards: 4,
1257            eviction_config: S3FifoConfig::default(),
1258            hash_builder: Default::default(),
1259            weighter: Arc::new(|_, _| 1),
1260            filter: Arc::new(|_, _| true),
1261            event_listener: None,
1262            metrics: Arc::new(Metrics::noop()),
1263        })
1264    }
1265
1266    #[expect(clippy::type_complexity)]
1267    fn lru_cache_for_test(
1268    ) -> RawCache<Lru<u64, u64, TestProperties>, ModHasher, HashTableIndexer<Lru<u64, u64, TestProperties>>> {
1269        RawCache::new(RawCacheConfig {
1270            capacity: 256,
1271            shards: 4,
1272            eviction_config: LruConfig::default(),
1273            hash_builder: Default::default(),
1274            weighter: Arc::new(|_, _| 1),
1275            filter: Arc::new(|_, _| true),
1276            event_listener: None,
1277            metrics: Arc::new(Metrics::noop()),
1278        })
1279    }
1280
1281    #[expect(clippy::type_complexity)]
1282    fn lfu_cache_for_test(
1283    ) -> RawCache<Lfu<u64, u64, TestProperties>, ModHasher, HashTableIndexer<Lfu<u64, u64, TestProperties>>> {
1284        RawCache::new(RawCacheConfig {
1285            capacity: 256,
1286            shards: 4,
1287            eviction_config: LfuConfig::default(),
1288            hash_builder: Default::default(),
1289            weighter: Arc::new(|_, _| 1),
1290            filter: Arc::new(|_, _| true),
1291            event_listener: None,
1292            metrics: Arc::new(Metrics::noop()),
1293        })
1294    }
1295
1296    #[expect(clippy::type_complexity)]
1297    fn sieve_cache_for_test(
1298    ) -> RawCache<Sieve<u64, u64, TestProperties>, ModHasher, HashTableIndexer<Sieve<u64, u64, TestProperties>>> {
1299        RawCache::new(RawCacheConfig {
1300            capacity: 256,
1301            shards: 4,
1302            eviction_config: SieveConfig {},
1303            hash_builder: Default::default(),
1304            weighter: Arc::new(|_, _| 1),
1305            filter: Arc::new(|_, _| true),
1306            event_listener: None,
1307            metrics: Arc::new(Metrics::noop()),
1308        })
1309    }
1310
1311    #[test_log::test]
1312    fn test_insert_ephemeral() {
1313        let fifo = fifo_cache_for_test();
1314
1315        let e1 = fifo.insert_with_properties(1, 1, TestProperties::default().with_ephemeral(true));
1316        assert_eq!(fifo.usage(), 1);
1317        drop(e1);
1318        assert_eq!(fifo.usage(), 0);
1319
1320        let e2a = fifo.insert_with_properties(2, 2, TestProperties::default().with_ephemeral(true));
1321        assert_eq!(fifo.usage(), 1);
1322        let e2b = fifo.get(&2).expect("entry 2 should exist");
1323        drop(e2a);
1324        assert_eq!(fifo.usage(), 1);
1325        drop(e2b);
1326        assert_eq!(fifo.usage(), 1);
1327    }
1328
1329    #[test_log::test]
1330    fn test_insert_disposable() {
1331        let fifo = fifo_cache_for_test();
1332
1333        let e1 = fifo.insert_with_properties(1, 1, TestProperties::default().with_disposable(true));
1334        assert_eq!(fifo.usage(), 0);
1335        drop(e1);
1336        assert_eq!(fifo.usage(), 0);
1337
1338        let e2a = fifo.insert_with_properties(2, 2, TestProperties::default().with_disposable(true));
1339        assert_eq!(fifo.usage(), 0);
1340        assert!(fifo.get(&2).is_none());
1341        assert_eq!(fifo.usage(), 0);
1342        drop(e2a);
1343        assert_eq!(fifo.usage(), 0);
1344
1345        let fifo = fifo_cache_for_test();
1346        fifo.insert(1, 1);
1347        assert_eq!(fifo.usage(), 1);
1348        assert_eq!(fifo.get(&1).unwrap().value(), &1);
1349        let e2 = fifo.insert_with_properties(1, 100, TestProperties::default().with_disposable(true));
1350        assert_eq!(fifo.usage(), 0);
1351        drop(e2);
1352        assert_eq!(fifo.usage(), 0);
1353        assert!(fifo.get(&1).is_none());
1354    }
1355
1356    #[expect(clippy::type_complexity)]
1357    #[test_log::test]
1358    fn test_insert_filter() {
1359        let fifo: RawCache<
1360            Fifo<u64, u64, TestProperties>,
1361            ModHasher,
1362            HashTableIndexer<Fifo<u64, u64, TestProperties>>,
1363        > = RawCache::new(RawCacheConfig {
1364            capacity: 256,
1365            shards: 4,
1366            eviction_config: FifoConfig::default(),
1367            hash_builder: Default::default(),
1368            weighter: Arc::new(|_, _| 1),
1369            filter: Arc::new(|k, _| !matches!(*k, 42)),
1370            event_listener: None,
1371            metrics: Arc::new(Metrics::noop()),
1372        });
1373
1374        fifo.insert(1, 1);
1375        fifo.insert(2, 2);
1376        fifo.insert(42, 42);
1377        assert_eq!(fifo.usage(), 2);
1378        assert_eq!(fifo.get(&1).unwrap().value(), &1);
1379        assert_eq!(fifo.get(&2).unwrap().value(), &2);
1380        assert!(fifo.get(&42).is_none());
1381    }
1382
1383    #[test]
1384    fn test_evict_all() {
1385        let pipe = Box::new(PiecePipe::default());
1386
1387        let fifo = fifo_cache_for_test();
1388        fifo.set_pipe(pipe.clone());
1389        for i in 0..fifo.capacity() as _ {
1390            fifo.insert(i, i);
1391        }
1392        assert_eq!(fifo.usage(), fifo.capacity());
1393
1394        fifo.evict_all();
1395        let mut pieces = pipe
1396            .pieces()
1397            .iter()
1398            .map(|p| (p.hash(), *p.key(), *p.value()))
1399            .collect_vec();
1400        pieces.sort_by_key(|t| t.0);
1401        let expected = (0..fifo.capacity() as u64).map(|i| (i, i, i)).collect_vec();
1402        assert_eq!(pieces, expected);
1403    }
1404
1405    #[test]
1406    fn test_insert_size_over_capacity() {
1407        let cache: RawCache<Fifo<Vec<u8>, Vec<u8>, TestProperties>, ModHasher> = RawCache::new(RawCacheConfig {
1408            capacity: 4 * 1024, // 4KB
1409            shards: 1,
1410            eviction_config: FifoConfig::default(),
1411            hash_builder: Default::default(),
1412            weighter: Arc::new(|k, v| k.len() + v.len()),
1413            filter: Arc::new(|_, _| true),
1414            event_listener: None,
1415            metrics: Arc::new(Metrics::noop()),
1416        });
1417
1418        let key = vec![b'k'; 1024]; // 1KB
1419        let value = vec![b'v'; 5 * 1024]; // 5KB
1420
1421        cache.insert(key.clone(), value.clone());
1422        assert_eq!(cache.usage(), 6 * 1024);
1423        assert_eq!(cache.get(&key).unwrap().value(), &value);
1424    }
1425
1426    fn test_resize<E>(cache: &RawCache<E, ModHasher, HashTableIndexer<E>>)
1427    where
1428        E: Eviction<Key = u64, Value = u64>,
1429    {
1430        let capacity = cache.capacity();
1431        for i in 0..capacity as u64 * 2 {
1432            cache.insert(i, i);
1433        }
1434        assert_eq!(cache.usage(), capacity);
1435        cache.resize(capacity / 2).unwrap();
1436        assert_eq!(cache.usage(), capacity / 2);
1437        for i in 0..capacity as u64 * 2 {
1438            cache.insert(i, i);
1439        }
1440        assert_eq!(cache.usage(), capacity / 2);
1441    }
1442
1443    #[test]
1444    fn test_fifo_cache_resize() {
1445        let cache = fifo_cache_for_test();
1446        test_resize(&cache);
1447    }
1448
1449    #[test]
1450    fn test_s3fifo_cache_resize() {
1451        let cache = s3fifo_cache_for_test();
1452        test_resize(&cache);
1453    }
1454
1455    #[test]
1456    fn test_lru_cache_resize() {
1457        let cache = lru_cache_for_test();
1458        test_resize(&cache);
1459    }
1460
1461    #[test]
1462    fn test_lfu_cache_resize() {
1463        let cache = lfu_cache_for_test();
1464        test_resize(&cache);
1465    }
1466
1467    #[test]
1468    fn test_sieve_cache_resize() {
1469        let cache = sieve_cache_for_test();
1470        test_resize(&cache);
1471    }
1472
1473    mod fuzzy {
1474        use foyer_common::properties::Hint;
1475
1476        use super::*;
1477
1478        fn fuzzy<E, S>(cache: RawCache<E, S>, hints: Vec<Hint>)
1479        where
1480            E: Eviction<Key = u64, Value = u64, Properties = TestProperties>,
1481            S: HashBuilder,
1482        {
1483            let handles = (0..8)
1484                .map(|i| {
1485                    let c = cache.clone();
1486                    let hints = hints.clone();
1487                    std::thread::spawn(move || {
1488                        let mut rng = SmallRng::seed_from_u64(i);
1489                        for _ in 0..100000 {
1490                            let key = rng.next_u64();
1491                            if let Some(entry) = c.get(&key) {
1492                                assert_eq!(key, *entry);
1493                                drop(entry);
1494                                continue;
1495                            }
1496                            let hint = hints.choose(&mut rng).cloned().unwrap();
1497                            c.insert_with_properties(key, key, TestProperties::default().with_hint(hint));
1498                        }
1499                    })
1500                })
1501                .collect_vec();
1502
1503            handles.into_iter().for_each(|handle| handle.join().unwrap());
1504
1505            assert_eq!(cache.usage(), cache.capacity());
1506        }
1507
1508        #[test_log::test]
1509        fn test_fifo_cache_fuzzy() {
1510            let cache: RawCache<Fifo<u64, u64, TestProperties>, ModHasher> = RawCache::new(RawCacheConfig {
1511                capacity: 256,
1512                shards: 4,
1513                eviction_config: FifoConfig::default(),
1514                hash_builder: Default::default(),
1515                weighter: Arc::new(|_, _| 1),
1516                filter: Arc::new(|_, _| true),
1517                event_listener: None,
1518                metrics: Arc::new(Metrics::noop()),
1519            });
1520            let hints = vec![Hint::Normal];
1521            fuzzy(cache, hints);
1522        }
1523
1524        #[test_log::test]
1525        fn test_s3fifo_cache_fuzzy() {
1526            let cache: RawCache<S3Fifo<u64, u64, TestProperties>, ModHasher> = RawCache::new(RawCacheConfig {
1527                capacity: 256,
1528                shards: 4,
1529                eviction_config: S3FifoConfig::default(),
1530                hash_builder: Default::default(),
1531                weighter: Arc::new(|_, _| 1),
1532                filter: Arc::new(|_, _| true),
1533                event_listener: None,
1534                metrics: Arc::new(Metrics::noop()),
1535            });
1536            let hints = vec![Hint::Normal];
1537            fuzzy(cache, hints);
1538        }
1539
1540        #[test_log::test]
1541        fn test_lru_cache_fuzzy() {
1542            let cache: RawCache<Lru<u64, u64, TestProperties>, ModHasher> = RawCache::new(RawCacheConfig {
1543                capacity: 256,
1544                shards: 4,
1545                eviction_config: LruConfig::default(),
1546                hash_builder: Default::default(),
1547                weighter: Arc::new(|_, _| 1),
1548                filter: Arc::new(|_, _| true),
1549                event_listener: None,
1550                metrics: Arc::new(Metrics::noop()),
1551            });
1552            let hints = vec![Hint::Normal, Hint::Low];
1553            fuzzy(cache, hints);
1554        }
1555
1556        #[test_log::test]
1557        fn test_lfu_cache_fuzzy() {
1558            let cache: RawCache<Lfu<u64, u64, TestProperties>, ModHasher> = RawCache::new(RawCacheConfig {
1559                capacity: 256,
1560                shards: 4,
1561                eviction_config: LfuConfig::default(),
1562                hash_builder: Default::default(),
1563                weighter: Arc::new(|_, _| 1),
1564                filter: Arc::new(|_, _| true),
1565                event_listener: None,
1566                metrics: Arc::new(Metrics::noop()),
1567            });
1568            let hints = vec![Hint::Normal];
1569            fuzzy(cache, hints);
1570        }
1571
1572        #[test_log::test]
1573        fn test_sieve_cache_fuzzy() {
1574            let cache: RawCache<Sieve<u64, u64, TestProperties>, ModHasher> = RawCache::new(RawCacheConfig {
1575                capacity: 256,
1576                shards: 4,
1577                eviction_config: SieveConfig {},
1578                hash_builder: Default::default(),
1579                weighter: Arc::new(|_, _| 1),
1580                filter: Arc::new(|_, _| true),
1581                event_listener: None,
1582                metrics: Arc::new(Metrics::noop()),
1583            });
1584            let hints = vec![Hint::Normal];
1585            fuzzy(cache, hints);
1586        }
1587    }
1588}