// foyer_memory/raw.rs

// Copyright 2025 foyer Project Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{
    collections::hash_map::{Entry as HashMapEntry, HashMap},
    fmt::Debug,
    future::Future,
    hash::Hash,
    ops::Deref,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
};

use arc_swap::ArcSwap;
use equivalent::Equivalent;
use fastrace::{
    future::{FutureExt, InSpan},
    Span,
};
use foyer_common::{
    code::HashBuilder,
    event::{Event, EventListener},
    future::{Diversion, DiversionFuture},
    metrics::Metrics,
    runtime::SingletonHandle,
    scope::Scope,
    strict_assert,
};
use itertools::Itertools;
use parking_lot::{Mutex, RwLock};
use pin_project::pin_project;
use tokio::{sync::oneshot, task::JoinHandle};

use crate::{
    error::{Error, Result},
    eviction::{Eviction, Op},
    indexer::{hash_table::HashTableIndexer, sentry::Sentry, Indexer},
    pipe::NoopPipe,
    record::{Data, Record},
    Piece, Pipe,
};

/// The weighter for the in-memory cache.
///
/// The weighter is used to calculate the weight of the cache entry.
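///
/// A weighter is just a closure over `(&K, &V)`. A minimal sketch (the key and value types here
/// are only for illustration) that charges one unit per entry plus the value's length:
///
/// ```ignore
/// let weighter = |_key: &u64, value: &Vec<u8>| 1 + value.len();
/// ```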
pub trait Weighter<K, V>: Fn(&K, &V) -> usize + Send + Sync + 'static {}
impl<K, V, T> Weighter<K, V> for T where T: Fn(&K, &V) -> usize + Send + Sync + 'static {}

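/// Configuration for [`RawCache`].
///
/// `capacity` is the total capacity of the cache; it is split evenly across `shards`.
///
/// A hedged construction sketch, mirroring the test fixtures at the bottom of this file
/// (field values are illustrative only):
///
/// ```ignore
/// let cache: RawCache<Fifo<u64, u64>> = RawCache::new(RawCacheConfig {
///     capacity: 256,
///     shards: 4,
///     eviction_config: FifoConfig::default(),
///     hash_builder: ahash::RandomState::default(),
///     weighter: Arc::new(|_key, _value| 1),
///     event_listener: None,
///     metrics: Arc::new(Metrics::noop()),
/// });
/// ```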
pub struct RawCacheConfig<E, S>
where
    E: Eviction,
    S: HashBuilder,
{
    pub capacity: usize,
    pub shards: usize,
    pub eviction_config: E::Config,
    pub hash_builder: S,
    pub weighter: Arc<dyn Weighter<E::Key, E::Value>>,
    pub event_listener: Option<Arc<dyn EventListener<Key = E::Key, Value = E::Value>>>,
    pub metrics: Arc<Metrics>,
}

struct RawCacheShard<E, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    eviction: E,
    indexer: Sentry<I>,

    usage: usize,
    capacity: usize,

    #[expect(clippy::type_complexity)]
    waiters: Mutex<HashMap<E::Key, Vec<oneshot::Sender<RawCacheEntry<E, S, I>>>>>,

    metrics: Arc<Metrics>,
    _event_listener: Option<Arc<dyn EventListener<Key = E::Key, Value = E::Value>>>,
}

impl<E, S, I> RawCacheShard<E, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    /// Evict entries to fit the target usage.
    fn evict(&mut self, target: usize, garbages: &mut Vec<(Event, Arc<Record<E>>)>) {
        // Evict overflow records.
        while self.usage > target {
            let evicted = match self.eviction.pop() {
                Some(evicted) => evicted,
                None => break,
            };
            self.metrics.memory_evict.increase(1);

            let e = self.indexer.remove(evicted.hash(), evicted.key()).unwrap();
            assert_eq!(Arc::as_ptr(&evicted), Arc::as_ptr(&e));

            strict_assert!(!evicted.as_ref().is_in_indexer());
            strict_assert!(!evicted.as_ref().is_in_eviction());

            self.usage -= evicted.weight();

            garbages.push((Event::Evict, evicted));
        }
    }

    fn emplace(
        &mut self,
        data: Data<E>,
        ephemeral: bool,
        garbages: &mut Vec<(Event, Arc<Record<E>>)>,
        waiters: &mut Vec<oneshot::Sender<RawCacheEntry<E, S, I>>>,
    ) -> Arc<Record<E>> {
        std::mem::swap(waiters, &mut self.waiters.lock().remove(&data.key).unwrap_or_default());

        let weight = data.weight;
        let old_usage = self.usage;

        let record = Arc::new(Record::new(data));

        // Evict overflow records.
        self.evict(self.capacity.saturating_sub(weight), garbages);

        // Insert new record
        if let Some(old) = self.indexer.insert(record.clone()) {
            self.metrics.memory_replace.increase(1);

            strict_assert!(!old.is_in_indexer());

            if old.is_in_eviction() {
                self.eviction.remove(&old);
            }
            strict_assert!(!old.is_in_eviction());

            self.usage -= old.weight();

            garbages.push((Event::Replace, old));
        } else {
            self.metrics.memory_insert.increase(1);
        }
        strict_assert!(record.is_in_indexer());

        record.set_ephemeral(ephemeral);
        if !ephemeral {
            self.eviction.push(record.clone());
            strict_assert!(record.is_in_eviction());
        }

        self.usage += weight;
        // Increase the reference count within the lock section.
        // At this moment, the new record must be referenced exactly once per waiter plus once for the returned entry.
        let refs = waiters.len() + 1;
        let inc = record.inc_refs(refs);
        assert_eq!(refs, inc);

        match self.usage.cmp(&old_usage) {
            std::cmp::Ordering::Greater => self.metrics.memory_usage.increase((self.usage - old_usage) as _),
            std::cmp::Ordering::Less => self.metrics.memory_usage.decrease((old_usage - self.usage) as _),
            std::cmp::Ordering::Equal => {}
        }

        record
    }

    #[fastrace::trace(name = "foyer::memory::raw::shard::remove")]
    fn remove<Q>(&mut self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
    where
        Q: Hash + Equivalent<E::Key> + ?Sized,
    {
        let record = self.indexer.remove(hash, key)?;

        if record.is_in_eviction() {
            self.eviction.remove(&record);
        }
        strict_assert!(!record.is_in_indexer());
        strict_assert!(!record.is_in_eviction());

        self.usage -= record.weight();

        self.metrics.memory_remove.increase(1);
        self.metrics.memory_usage.decrease(record.weight() as _);

        record.inc_refs(1);

        Some(record)
    }

    #[fastrace::trace(name = "foyer::memory::raw::shard::get_noop")]
    fn get_noop<Q>(&self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
    where
        Q: Hash + Equivalent<E::Key> + ?Sized,
    {
        self.get_inner(hash, key)
    }

    #[fastrace::trace(name = "foyer::memory::raw::shard::get_immutable")]
    fn get_immutable<Q>(&self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
    where
        Q: Hash + Equivalent<E::Key> + ?Sized,
    {
        self.get_inner(hash, key)
            .inspect(|record| self.acquire_immutable(record))
    }

    #[fastrace::trace(name = "foyer::memory::raw::shard::get_mutable")]
    fn get_mutable<Q>(&mut self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
    where
        Q: Hash + Equivalent<E::Key> + ?Sized,
    {
        self.get_inner(hash, key).inspect(|record| self.acquire_mutable(record))
    }

    #[fastrace::trace(name = "foyer::memory::raw::shard::get_inner")]
    fn get_inner<Q>(&self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
    where
        Q: Hash + Equivalent<E::Key> + ?Sized,
    {
        let record = match self.indexer.get(hash, key).cloned() {
            Some(record) => {
                self.metrics.memory_hit.increase(1);
                record
            }
            None => {
                self.metrics.memory_miss.increase(1);
                return None;
            }
        };

        strict_assert!(record.is_in_indexer());

        record.set_ephemeral(false);

        record.inc_refs(1);

        Some(record)
    }

    #[fastrace::trace(name = "foyer::memory::raw::shard::clear")]
    fn clear(&mut self, garbages: &mut Vec<Arc<Record<E>>>) {
        let records = self.indexer.drain().collect_vec();
        self.eviction.clear();

        let mut count = 0;

        for record in records {
            count += 1;
            strict_assert!(!record.is_in_indexer());
            strict_assert!(!record.is_in_eviction());

            garbages.push(record);
        }

        self.metrics.memory_remove.increase(count);
    }

    #[fastrace::trace(name = "foyer::memory::raw::shard::acquire_immutable")]
    fn acquire_immutable(&self, record: &Arc<Record<E>>) {
        match E::acquire() {
            Op::Immutable(f) => f(&self.eviction, record),
            _ => unreachable!(),
        }
    }

    #[fastrace::trace(name = "foyer::memory::raw::shard::acquire_mutable")]
    fn acquire_mutable(&mut self, record: &Arc<Record<E>>) {
        match E::acquire() {
            Op::Mutable(mut f) => f(&mut self.eviction, record),
            _ => unreachable!(),
        }
    }

    #[fastrace::trace(name = "foyer::memory::raw::shard::release_immutable")]
    fn release_immutable(&self, record: &Arc<Record<E>>) {
        match E::release() {
            Op::Immutable(f) => f(&self.eviction, record),
            _ => unreachable!(),
        }
    }

    #[fastrace::trace(name = "foyer::memory::raw::shard::release_mutable")]
    fn release_mutable(&mut self, record: &Arc<Record<E>>) {
        match E::release() {
            Op::Mutable(mut f) => f(&mut self.eviction, record),
            _ => unreachable!(),
        }
    }

    #[fastrace::trace(name = "foyer::memory::raw::shard::fetch_noop")]
    fn fetch_noop(&self, hash: u64, key: &E::Key) -> RawShardFetch<E, S, I>
    where
        E::Key: Clone,
    {
        if let Some(record) = self.get_noop(hash, key) {
            return RawShardFetch::Hit(record);
        }

        self.fetch_queue(key.clone())
    }

    #[fastrace::trace(name = "foyer::memory::raw::shard::fetch_immutable")]
    fn fetch_immutable(&self, hash: u64, key: &E::Key) -> RawShardFetch<E, S, I>
    where
        E::Key: Clone,
    {
        if let Some(record) = self.get_immutable(hash, key) {
            return RawShardFetch::Hit(record);
        }

        self.fetch_queue(key.clone())
    }

    #[fastrace::trace(name = "foyer::memory::raw::shard::fetch_mutable")]
    fn fetch_mutable(&mut self, hash: u64, key: &E::Key) -> RawShardFetch<E, S, I>
    where
        E::Key: Clone,
    {
        if let Some(record) = self.get_mutable(hash, key) {
            return RawShardFetch::Hit(record);
        }

        self.fetch_queue(key.clone())
    }

    #[fastrace::trace(name = "foyer::memory::raw::shard::fetch_queue")]
    fn fetch_queue(&self, key: E::Key) -> RawShardFetch<E, S, I> {
        match self.waiters.lock().entry(key) {
            HashMapEntry::Occupied(mut o) => {
                let (tx, rx) = oneshot::channel();
                o.get_mut().push(tx);
                self.metrics.memory_queue.increase(1);
                RawShardFetch::Wait(rx.in_span(Span::enter_with_local_parent(
                    "foyer::memory::raw::fetch_with_runtime::wait",
                )))
            }
            HashMapEntry::Vacant(v) => {
                v.insert(vec![]);
                self.metrics.memory_fetch.increase(1);
                RawShardFetch::Miss
            }
        }
    }
}

struct RawCacheInner<E, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    shards: Vec<RwLock<RawCacheShard<E, S, I>>>,

    capacity: usize,

    hash_builder: Arc<S>,
    weighter: Arc<dyn Weighter<E::Key, E::Value>>,

    metrics: Arc<Metrics>,
    event_listener: Option<Arc<dyn EventListener<Key = E::Key, Value = E::Value>>>,
    pipe: ArcSwap<Box<dyn Pipe<Key = E::Key, Value = E::Value>>>,
}

impl<E, S, I> RawCacheInner<E, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    #[fastrace::trace(name = "foyer::memory::raw::inner::clear")]
    fn clear(&self) {
        let mut garbages = vec![];

        self.shards
            .iter()
            .map(|shard| shard.write())
            .for_each(|mut shard| shard.clear(&mut garbages));

        // Do not deallocate data within the lock section.
        if let Some(listener) = self.event_listener.as_ref() {
            for record in garbages {
                listener.on_leave(Event::Clear, record.key(), record.value());
            }
        }
    }
}

pub struct RawCache<E, S = ahash::RandomState, I = HashTableIndexer<E>>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    inner: Arc<RawCacheInner<E, S, I>>,
}

impl<E, S, I> Drop for RawCacheInner<E, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    fn drop(&mut self) {
        self.clear();
    }
}

impl<E, S, I> Clone for RawCache<E, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    fn clone(&self) -> Self {
        Self {
            inner: self.inner.clone(),
        }
    }
}

impl<E, S, I> RawCache<E, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    pub fn new(config: RawCacheConfig<E, S>) -> Self {
        let shard_capacity = config.capacity / config.shards;

        let shards = (0..config.shards)
            .map(|_| RawCacheShard {
                eviction: E::new(shard_capacity, &config.eviction_config),
                indexer: Sentry::default(),
                usage: 0,
                capacity: shard_capacity,
                waiters: Mutex::default(),
                metrics: config.metrics.clone(),
                _event_listener: config.event_listener.clone(),
            })
            .map(RwLock::new)
            .collect_vec();

        let pipe: Box<dyn Pipe<Key = E::Key, Value = E::Value>> = Box::new(NoopPipe::default());

        let inner = RawCacheInner {
            shards,
            capacity: config.capacity,
            hash_builder: Arc::new(config.hash_builder),
            weighter: config.weighter,
            metrics: config.metrics,
            event_listener: config.event_listener,
            pipe: ArcSwap::new(Arc::new(pipe)),
        };

        Self { inner: Arc::new(inner) }
    }

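    /// Resize the cache to the new capacity, evicting entries as needed to fit it.
    ///
    /// Errors from individual shards are collected and returned together.
    ///
    /// A hedged usage sketch (the target capacity is illustrative):
    ///
    /// ```ignore
    /// // Shrink the cache to half of its current capacity.
    /// cache.resize(cache.capacity() / 2)?;
    /// ```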
    #[fastrace::trace(name = "foyer::memory::raw::resize")]
    pub fn resize(&self, capacity: usize) -> Result<()> {
        let shards = self.inner.shards.len();
        let shard_capacity = capacity / shards;

        let handles = (0..shards)
            .map(|i| {
                let inner = self.inner.clone();
                std::thread::spawn(move || {
                    let mut garbages = vec![];
                    let res = inner.shards[i].write().with(|mut shard| {
                        shard.eviction.update(shard_capacity, None).inspect(|_| {
                            shard.capacity = shard_capacity;
                            shard.evict(shard_capacity, &mut garbages)
                        })
                    });
                    // Deallocate data out of the lock critical section.
                    let pipe = inner.pipe.load();
                    let piped = pipe.is_enabled();
                    if inner.event_listener.is_some() || piped {
                        for (event, record) in garbages {
                            if let Some(listener) = inner.event_listener.as_ref() {
                                listener.on_leave(event, record.key(), record.value())
                            }
                            if piped && event == Event::Evict {
                                pipe.send(Piece::new(record));
                            }
                        }
                    }
                    res
                })
            })
            .collect_vec();

        let errs = handles
            .into_iter()
            .map(|handle| handle.join().unwrap())
            .filter(|res| res.is_err())
            .map(|res| res.unwrap_err())
            .collect_vec();
        if !errs.is_empty() {
            return Err(Error::multiple(errs));
        }

        Ok(())
    }

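    /// Insert a key-value pair into the cache and return a handle to the inserted entry.
    ///
    /// A hedged usage sketch, mirroring the `u64 => u64` caches used in the tests below
    /// (the handle derefs to the value):
    ///
    /// ```ignore
    /// let entry = cache.insert(42, 42);
    /// assert_eq!(*entry, 42);
    /// ```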
    #[fastrace::trace(name = "foyer::memory::raw::insert")]
    pub fn insert(&self, key: E::Key, value: E::Value) -> RawCacheEntry<E, S, I> {
        self.insert_with_hint(key, value, Default::default())
    }

    #[fastrace::trace(name = "foyer::memory::raw::insert_with_hint")]
    pub fn insert_with_hint(&self, key: E::Key, value: E::Value, hint: E::Hint) -> RawCacheEntry<E, S, I> {
        self.emplace(key, value, hint, false)
    }

    #[fastrace::trace(name = "foyer::memory::raw::insert_ephemeral")]
    pub fn insert_ephemeral(&self, key: E::Key, value: E::Value) -> RawCacheEntry<E, S, I> {
        self.insert_ephemeral_with_hint(key, value, Default::default())
    }

    #[fastrace::trace(name = "foyer::memory::raw::insert_ephemeral_with_hint")]
    pub fn insert_ephemeral_with_hint(&self, key: E::Key, value: E::Value, hint: E::Hint) -> RawCacheEntry<E, S, I> {
        self.emplace(key, value, hint, true)
    }

    #[fastrace::trace(name = "foyer::memory::raw::emplace")]
    fn emplace(&self, key: E::Key, value: E::Value, hint: E::Hint, ephemeral: bool) -> RawCacheEntry<E, S, I> {
        let hash = self.inner.hash_builder.hash_one(&key);
        let weight = (self.inner.weighter)(&key, &value);

        let mut garbages = vec![];
        let mut waiters = vec![];

        let record = self.inner.shards[self.shard(hash)].write().with(|mut shard| {
            shard.emplace(
                Data {
                    key,
                    value,
                    hint,
                    hash,
                    weight,
                },
                ephemeral,
                &mut garbages,
                &mut waiters,
            )
        });

        // Notify waiters out of the lock critical section.
        for waiter in waiters {
            let _ = waiter.send(RawCacheEntry {
                record: record.clone(),
                inner: self.inner.clone(),
            });
        }

        // Deallocate data out of the lock critical section.
        let pipe = self.inner.pipe.load();
        let piped = pipe.is_enabled();
        if self.inner.event_listener.is_some() || piped {
            for (event, record) in garbages {
                if let Some(listener) = self.inner.event_listener.as_ref() {
                    listener.on_leave(event, record.key(), record.value())
                }
                if piped && event == Event::Evict {
                    pipe.send(Piece::new(record));
                }
            }
        }

        RawCacheEntry {
            record,
            inner: self.inner.clone(),
        }
    }

    #[fastrace::trace(name = "foyer::memory::raw::remove")]
    pub fn remove<Q>(&self, key: &Q) -> Option<RawCacheEntry<E, S, I>>
    where
        Q: Hash + Equivalent<E::Key> + ?Sized,
    {
        let hash = self.inner.hash_builder.hash_one(key);

        self.inner.shards[self.shard(hash)]
            .write()
            .with(|mut shard| {
                shard.remove(hash, key).map(|record| RawCacheEntry {
                    inner: self.inner.clone(),
                    record,
                })
            })
            .inspect(|record| {
                // Deallocate data out of the lock critical section.
                if let Some(listener) = self.inner.event_listener.as_ref() {
                    listener.on_leave(Event::Remove, record.key(), record.value());
                }
            })
    }

    #[fastrace::trace(name = "foyer::memory::raw::get")]
    pub fn get<Q>(&self, key: &Q) -> Option<RawCacheEntry<E, S, I>>
    where
        Q: Hash + Equivalent<E::Key> + ?Sized,
    {
        let hash = self.inner.hash_builder.hash_one(key);

        let record = match E::acquire() {
            Op::Noop => self.inner.shards[self.shard(hash)].read().get_noop(hash, key),
            Op::Immutable(_) => self.inner.shards[self.shard(hash)]
                .read()
                .with(|shard| shard.get_immutable(hash, key)),
            Op::Mutable(_) => self.inner.shards[self.shard(hash)]
                .write()
                .with(|mut shard| shard.get_mutable(hash, key)),
        }?;

        Some(RawCacheEntry {
            inner: self.inner.clone(),
            record,
        })
    }

    #[fastrace::trace(name = "foyer::memory::raw::contains")]
    pub fn contains<Q>(&self, key: &Q) -> bool
    where
        Q: Hash + Equivalent<E::Key> + ?Sized,
    {
        let hash = self.inner.hash_builder.hash_one(key);

        self.inner.shards[self.shard(hash)]
            .read()
            .with(|shard| shard.indexer.get(hash, key).is_some())
    }

    #[fastrace::trace(name = "foyer::memory::raw::touch")]
    pub fn touch<Q>(&self, key: &Q) -> bool
    where
        Q: Hash + Equivalent<E::Key> + ?Sized,
    {
        let hash = self.inner.hash_builder.hash_one(key);

        match E::acquire() {
            Op::Noop => self.inner.shards[self.shard(hash)].read().get_noop(hash, key),
            Op::Immutable(_) => self.inner.shards[self.shard(hash)]
                .read()
                .with(|shard| shard.get_immutable(hash, key)),
            Op::Mutable(_) => self.inner.shards[self.shard(hash)]
                .write()
                .with(|mut shard| shard.get_mutable(hash, key)),
        }
        .is_some()
    }

    #[fastrace::trace(name = "foyer::memory::raw::clear")]
    pub fn clear(&self) {
        self.inner.clear();
    }

    pub fn capacity(&self) -> usize {
        self.inner.capacity
    }

    pub fn usage(&self) -> usize {
        self.inner.shards.iter().map(|shard| shard.read().usage).sum()
    }

    pub fn metrics(&self) -> &Metrics {
        &self.inner.metrics
    }

    pub fn hash_builder(&self) -> &Arc<S> {
        &self.inner.hash_builder
    }

    pub fn shards(&self) -> usize {
        self.inner.shards.len()
    }

    pub fn set_pipe(&self, pipe: Box<dyn Pipe<Key = E::Key, Value = E::Value>>) {
        self.inner.pipe.store(Arc::new(pipe));
    }

    fn shard(&self, hash: u64) -> usize {
        hash as usize % self.inner.shards.len()
    }
}

pub struct RawCacheEntry<E, S = ahash::RandomState, I = HashTableIndexer<E>>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    inner: Arc<RawCacheInner<E, S, I>>,
    record: Arc<Record<E>>,
}

impl<E, S, I> Debug for RawCacheEntry<E, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RawCacheEntry").field("record", &self.record).finish()
    }
}

impl<E, S, I> Drop for RawCacheEntry<E, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    fn drop(&mut self) {
        let hash = self.record.hash();
        let shard = &self.inner.shards[hash as usize % self.inner.shards.len()];

        if self.record.dec_refs(1) == 0 {
            match E::release() {
                Op::Noop => {}
                Op::Immutable(_) => shard.read().with(|shard| shard.release_immutable(&self.record)),
                Op::Mutable(_) => shard.write().with(|mut shard| shard.release_mutable(&self.record)),
            }

            if self.record.is_ephemeral() {
                shard
                    .write()
                    .with(|mut shard| shard.remove(hash, self.key()))
                    .inspect(|record| {
                        // Deallocate data out of the lock critical section.
                        if let Some(listener) = self.inner.event_listener.as_ref() {
                            listener.on_leave(Event::Remove, record.key(), record.value());
                        }
                    });
            }
        }
    }
}

impl<E, S, I> Clone for RawCacheEntry<E, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    fn clone(&self) -> Self {
        self.record.inc_refs(1);
        Self {
            inner: self.inner.clone(),
            record: self.record.clone(),
        }
    }
}

impl<E, S, I> Deref for RawCacheEntry<E, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    type Target = E::Value;

    fn deref(&self) -> &Self::Target {
        self.value()
    }
}

unsafe impl<E, S, I> Send for RawCacheEntry<E, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
}

unsafe impl<E, S, I> Sync for RawCacheEntry<E, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
}

impl<E, S, I> RawCacheEntry<E, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    pub fn hash(&self) -> u64 {
        self.record.hash()
    }

    pub fn key(&self) -> &E::Key {
        self.record.key()
    }

    pub fn value(&self) -> &E::Value {
        self.record.value()
    }

    pub fn hint(&self) -> &E::Hint {
        self.record.hint()
    }

    pub fn weight(&self) -> usize {
        self.record.weight()
    }

    pub fn refs(&self) -> usize {
        self.record.refs()
    }

    pub fn is_outdated(&self) -> bool {
        !self.record.is_in_indexer()
    }

    pub fn piece(&self) -> Piece<E::Key, E::Value> {
        Piece::new(self.record.clone())
    }
}

/// The state of `fetch`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FetchState {
    /// Cache hit.
    Hit,
    /// Cache miss, but waiting in the queue for an in-flight fetch.
    Wait,
    /// Cache miss, and there are no other waiters at the moment.
    Miss,
}

/// A mark for fetch calls.
pub struct FetchMark;

enum RawShardFetch<E, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    Hit(Arc<Record<E>>),
    Wait(InSpan<oneshot::Receiver<RawCacheEntry<E, S, I>>>),
    Miss,
}

pub type RawFetch<E, ER, S = ahash::RandomState, I = HashTableIndexer<E>> =
    DiversionFuture<RawFetchInner<E, ER, S, I>, std::result::Result<RawCacheEntry<E, S, I>, ER>, FetchMark>;

type RawFetchHit<E, S, I> = Option<RawCacheEntry<E, S, I>>;
type RawFetchWait<E, S, I> = InSpan<oneshot::Receiver<RawCacheEntry<E, S, I>>>;
type RawFetchMiss<E, I, S, ER, DFS> = JoinHandle<Diversion<std::result::Result<RawCacheEntry<E, S, I>, ER>, DFS>>;

#[pin_project(project = RawFetchInnerProj)]
pub enum RawFetchInner<E, ER, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    Hit(RawFetchHit<E, S, I>),
    Wait(#[pin] RawFetchWait<E, S, I>),
    Miss(#[pin] RawFetchMiss<E, I, S, ER, FetchMark>),
}

impl<E, ER, S, I> RawFetchInner<E, ER, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    pub fn state(&self) -> FetchState {
        match self {
            RawFetchInner::Hit(_) => FetchState::Hit,
            RawFetchInner::Wait(_) => FetchState::Wait,
            RawFetchInner::Miss(_) => FetchState::Miss,
        }
    }
}

impl<E, ER, S, I> Future for RawFetchInner<E, ER, S, I>
where
    E: Eviction,
    ER: From<oneshot::error::RecvError>,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    type Output = Diversion<std::result::Result<RawCacheEntry<E, S, I>, ER>, FetchMark>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.project() {
            RawFetchInnerProj::Hit(opt) => Poll::Ready(Ok(opt.take().unwrap()).into()),
            RawFetchInnerProj::Wait(waiter) => waiter.poll(cx).map_err(|err| err.into()).map(Diversion::from),
            RawFetchInnerProj::Miss(handle) => handle.poll(cx).map(|join| join.unwrap()),
        }
    }
}

impl<E, S, I> RawCache<E, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
    E::Key: Clone,
{
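    /// Get the entry for the given key, or load it with the given async closure on a miss.
    ///
    /// Concurrent `fetch` calls for the same key are deduplicated: only the first caller runs the
    /// closure, and later callers wait in the queue for its result.
    ///
    /// A hedged sketch (`MyError` is a hypothetical error type; to await the returned future it
    /// must also satisfy `From<oneshot::error::RecvError>`):
    ///
    /// ```ignore
    /// let entry = cache.fetch(42, || async move { Ok::<u64, MyError>(42) }).await?;
    /// assert_eq!(*entry, 42);
    /// ```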
    #[fastrace::trace(name = "foyer::memory::raw::fetch")]
    pub fn fetch<F, FU, ER>(&self, key: E::Key, fetch: F) -> RawFetch<E, ER, S, I>
    where
        F: FnOnce() -> FU,
        FU: Future<Output = std::result::Result<E::Value, ER>> + Send + 'static,
        ER: Send + 'static + Debug,
    {
        self.fetch_inner(
            key,
            Default::default(),
            fetch,
            &tokio::runtime::Handle::current().into(),
        )
    }

    #[fastrace::trace(name = "foyer::memory::raw::fetch_with_hint")]
    pub fn fetch_with_hint<F, FU, ER>(&self, key: E::Key, hint: E::Hint, fetch: F) -> RawFetch<E, ER, S, I>
    where
        F: FnOnce() -> FU,
        FU: Future<Output = std::result::Result<E::Value, ER>> + Send + 'static,
        ER: Send + 'static + Debug,
    {
        self.fetch_inner(key, hint, fetch, &tokio::runtime::Handle::current().into())
    }

    /// Internal fetch function, intended for use by other foyer crates only, so the doc is hidden.
    #[doc(hidden)]
    #[fastrace::trace(name = "foyer::memory::raw::fetch_inner")]
    pub fn fetch_inner<F, FU, ER, ID>(
        &self,
        key: E::Key,
        hint: E::Hint,
        fetch: F,
        runtime: &SingletonHandle,
    ) -> RawFetch<E, ER, S, I>
    where
        F: FnOnce() -> FU,
        FU: Future<Output = ID> + Send + 'static,
        ER: Send + 'static + Debug,
        ID: Into<Diversion<std::result::Result<E::Value, ER>, FetchMark>>,
    {
        let hash = self.inner.hash_builder.hash_one(&key);

        let raw = match E::acquire() {
            Op::Noop => self.inner.shards[self.shard(hash)].read().fetch_noop(hash, &key),
            Op::Immutable(_) => self.inner.shards[self.shard(hash)].read().fetch_immutable(hash, &key),
            Op::Mutable(_) => self.inner.shards[self.shard(hash)].write().fetch_mutable(hash, &key),
        };

        match raw {
            RawShardFetch::Hit(record) => {
                return RawFetch::new(RawFetchInner::Hit(Some(RawCacheEntry {
                    record,
                    inner: self.inner.clone(),
                })))
            }
            RawShardFetch::Wait(future) => return RawFetch::new(RawFetchInner::Wait(future)),
            RawShardFetch::Miss => {}
        }

        let cache = self.clone();
        let future = fetch();
        let join = runtime.spawn(
            async move {
                let Diversion { target, store } = future
                    .in_span(Span::enter_with_local_parent("foyer::memory::raw::fetch_inner::fn"))
                    .await
                    .into();
                let value = match target {
                    Ok(value) => value,
                    Err(e) => {
                        cache.inner.shards[cache.shard(hash)].read().waiters.lock().remove(&key);
                        tracing::debug!("[fetch]: error raised while fetching, all waiters are dropped, err: {e:?}");
                        return Diversion { target: Err(e), store };
                    }
                };
                let entry = cache.insert_with_hint(key, value, hint);
                Diversion {
                    target: Ok(entry),
                    store,
                }
            }
            .in_span(Span::enter_with_local_parent(
                "foyer::memory::generic::fetch_with_runtime::spawn",
            )),
        );

        RawFetch::new(RawFetchInner::Miss(join))
    }
}

#[cfg(test)]
mod tests {
    use foyer_common::hasher::ModRandomState;
    use rand::{rngs::SmallRng, seq::IndexedRandom, RngCore, SeedableRng};

    use super::*;
    use crate::eviction::{
        fifo::{Fifo, FifoConfig, FifoHint},
        lfu::{Lfu, LfuConfig, LfuHint},
        lru::{Lru, LruConfig, LruHint},
        s3fifo::{S3Fifo, S3FifoConfig, S3FifoHint},
    };

    fn is_send_sync_static<T: Send + Sync + 'static>() {}

    #[test]
    fn test_send_sync_static() {
        is_send_sync_static::<RawCache<Fifo<(), ()>>>();
        is_send_sync_static::<RawCache<S3Fifo<(), ()>>>();
        is_send_sync_static::<RawCache<Lfu<(), ()>>>();
        is_send_sync_static::<RawCache<Lru<(), ()>>>();
    }

    fn fifo_cache_for_test() -> RawCache<Fifo<u64, u64>, ModRandomState, HashTableIndexer<Fifo<u64, u64>>> {
        RawCache::new(RawCacheConfig {
            capacity: 256,
            shards: 4,
            eviction_config: FifoConfig::default(),
            hash_builder: Default::default(),
            weighter: Arc::new(|_, _| 1),
            event_listener: None,
            metrics: Arc::new(Metrics::noop()),
        })
    }

    fn s3fifo_cache_for_test() -> RawCache<S3Fifo<u64, u64>, ModRandomState, HashTableIndexer<S3Fifo<u64, u64>>> {
        RawCache::new(RawCacheConfig {
            capacity: 256,
            shards: 4,
            eviction_config: S3FifoConfig::default(),
            hash_builder: Default::default(),
            weighter: Arc::new(|_, _| 1),
            event_listener: None,
            metrics: Arc::new(Metrics::noop()),
        })
    }

    fn lru_cache_for_test() -> RawCache<Lru<u64, u64>, ModRandomState, HashTableIndexer<Lru<u64, u64>>> {
        RawCache::new(RawCacheConfig {
            capacity: 256,
            shards: 4,
            eviction_config: LruConfig::default(),
            hash_builder: Default::default(),
            weighter: Arc::new(|_, _| 1),
            event_listener: None,
            metrics: Arc::new(Metrics::noop()),
        })
    }

    fn lfu_cache_for_test() -> RawCache<Lfu<u64, u64>, ModRandomState, HashTableIndexer<Lfu<u64, u64>>> {
        RawCache::new(RawCacheConfig {
            capacity: 256,
            shards: 4,
            eviction_config: LfuConfig::default(),
            hash_builder: Default::default(),
            weighter: Arc::new(|_, _| 1),
            event_listener: None,
            metrics: Arc::new(Metrics::noop()),
        })
    }

    #[test]
    fn test_insert_ephemeral() {
        let fifo = fifo_cache_for_test();

        let e1 = fifo.insert_ephemeral(1, 1);
        assert_eq!(fifo.usage(), 1);
        drop(e1);
        assert_eq!(fifo.usage(), 0);

        let e2a = fifo.insert_ephemeral(2, 2);
        assert_eq!(fifo.usage(), 1);
        let e2b = fifo.get(&2).expect("entry 2 should exist");
        drop(e2a);
        assert_eq!(fifo.usage(), 1);
        drop(e2b);
        assert_eq!(fifo.usage(), 1);
    }

    fn test_resize<E>(cache: &RawCache<E, ModRandomState, HashTableIndexer<E>>)
    where
        E: Eviction<Key = u64, Value = u64>,
    {
        let capacity = cache.capacity();
        for i in 0..capacity as u64 * 2 {
            cache.insert(i, i);
        }
        assert_eq!(cache.usage(), capacity);
        cache.resize(capacity / 2).unwrap();
        assert_eq!(cache.usage(), capacity / 2);
        for i in 0..capacity as u64 * 2 {
            cache.insert(i, i);
        }
        assert_eq!(cache.usage(), capacity / 2);
    }

    #[test]
    fn test_fifo_cache_resize() {
        let cache = fifo_cache_for_test();
        test_resize(&cache);
    }

    #[test]
    fn test_s3fifo_cache_resize() {
        let cache = s3fifo_cache_for_test();
        test_resize(&cache);
    }

    #[test]
    fn test_lru_cache_resize() {
        let cache = lru_cache_for_test();
        test_resize(&cache);
    }

    #[test]
    fn test_lfu_cache_resize() {
        let cache = lfu_cache_for_test();
        test_resize(&cache);
    }

    mod fuzzy {
        use super::*;

        fn fuzzy<E>(cache: RawCache<E>, hints: Vec<E::Hint>)
        where
            E: Eviction<Key = u64, Value = u64>,
        {
            let handles = (0..8)
                .map(|i| {
                    let c = cache.clone();
                    let hints = hints.clone();
                    std::thread::spawn(move || {
                        let mut rng = SmallRng::seed_from_u64(i);
                        for _ in 0..100000 {
                            let key = rng.next_u64();
                            if let Some(entry) = c.get(&key) {
                                assert_eq!(key, *entry);
                                drop(entry);
                                continue;
                            }
                            let hint = hints.choose(&mut rng).cloned().unwrap();
                            c.insert_with_hint(key, key, hint);
                        }
                    })
                })
                .collect_vec();

            handles.into_iter().for_each(|handle| handle.join().unwrap());

            assert_eq!(cache.usage(), cache.capacity());
        }

        #[test_log::test]
        fn test_fifo_cache_fuzzy() {
            let cache: RawCache<Fifo<u64, u64>> = RawCache::new(RawCacheConfig {
                capacity: 256,
                shards: 4,
                eviction_config: FifoConfig::default(),
                hash_builder: Default::default(),
                weighter: Arc::new(|_, _| 1),
                event_listener: None,
                metrics: Arc::new(Metrics::noop()),
            });
            let hints = vec![FifoHint];
            fuzzy(cache, hints);
        }

        #[test_log::test]
        fn test_s3fifo_cache_fuzzy() {
            let cache: RawCache<S3Fifo<u64, u64>> = RawCache::new(RawCacheConfig {
                capacity: 256,
                shards: 4,
                eviction_config: S3FifoConfig::default(),
                hash_builder: Default::default(),
                weighter: Arc::new(|_, _| 1),
                event_listener: None,
                metrics: Arc::new(Metrics::noop()),
            });
            let hints = vec![S3FifoHint];
            fuzzy(cache, hints);
        }

        #[test_log::test]
        fn test_lru_cache_fuzzy() {
            let cache: RawCache<Lru<u64, u64>> = RawCache::new(RawCacheConfig {
                capacity: 256,
                shards: 4,
                eviction_config: LruConfig::default(),
                hash_builder: Default::default(),
                weighter: Arc::new(|_, _| 1),
                event_listener: None,
                metrics: Arc::new(Metrics::noop()),
            });
            let hints = vec![LruHint::HighPriority, LruHint::LowPriority];
            fuzzy(cache, hints);
        }

        #[test_log::test]
        fn test_lfu_cache_fuzzy() {
            let cache: RawCache<Lfu<u64, u64>> = RawCache::new(RawCacheConfig {
                capacity: 256,
                shards: 4,
                eviction_config: LfuConfig::default(),
                hash_builder: Default::default(),
                weighter: Arc::new(|_, _| 1),
                event_listener: None,
                metrics: Arc::new(Metrics::noop()),
            });
            let hints = vec![LfuHint];
            fuzzy(cache, hints);
        }
    }
}