foyer_memory/
raw.rs

1// Copyright 2025 foyer Project Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    any::Any,
17    fmt::Debug,
18    future::Future,
19    hash::Hash,
20    ops::Deref,
21    pin::Pin,
22    sync::{
23        atomic::{AtomicBool, Ordering},
24        Arc,
25    },
26    task::{Context, Poll},
27};
28
29use arc_swap::ArcSwap;
30use equivalent::Equivalent;
31use foyer_common::{
32    code::HashBuilder,
33    error::{Error, ErrorKind, Result},
34    event::{Event, EventListener},
35    metrics::Metrics,
36    properties::{Location, Properties, Source},
37    runtime::SingletonHandle,
38    strict_assert,
39    utils::scope::Scope,
40};
41use futures_util::FutureExt as _;
42use itertools::Itertools;
43use parking_lot::{Mutex, RwLock};
44use pin_project::{pin_project, pinned_drop};
45
46use crate::{
47    eviction::{Eviction, Op},
48    indexer::{sentry::Sentry, Indexer},
49    inflight::{
50        Enqueue, FetchOrTake, FetchTarget, InflightManager, Notifier, OptionalFetch, OptionalFetchBuilder,
51        RequiredFetch, RequiredFetchBuilder, Waiter,
52    },
53    pipe::NoopPipe,
54    record::{Data, Record},
55    Piece, Pipe,
56};
57
/// The weighter for the in-memory cache.
///
/// A weighter computes the weight of a cache entry from its key and value.
/// The weight is the amount the entry is charged against the cache capacity.
///
/// Any `Fn(&K, &V) -> usize + Send + Sync + 'static` implements this trait through the
/// blanket implementation below, so plain closures and functions can be used directly.
pub trait Weighter<K, V>: Fn(&K, &V) -> usize + Send + Sync + 'static {}
impl<K, V, T> Weighter<K, V> for T where T: Fn(&K, &V) -> usize + Send + Sync + 'static {}
63
/// The filter for the in-memory cache.
///
/// The filter decides whether to admit or reject an entry based on its key and value.
///
/// If the filter returns `true`, the key-value pair may be inserted into the in-memory cache;
/// otherwise it is rejected.
///
/// To keep the API consistent, inserting a rejected pair still returns a cache entry.
/// Such an entry is marked as phantom: it does not count towards the in-memory cache usage
/// and it is reclaimed as soon as the cache entry is dropped.
///
/// Any `Fn(&K, &V) -> bool + Send + Sync + 'static` implements this trait through the
/// blanket implementation below.
pub trait Filter<K, V>: Fn(&K, &V) -> bool + Send + Sync + 'static {}
impl<K, V, T> Filter<K, V> for T where T: Fn(&K, &V) -> bool + Send + Sync + 'static {}
76
/// Configuration used to build a [`RawCache`].
pub struct RawCacheConfig<E, S>
where
    E: Eviction,
    S: HashBuilder,
{
    /// Total capacity of the cache in weight units; it is divided among the shards.
    pub capacity: usize,
    /// Number of shards. Must be greater than zero, otherwise [`RawCache::new`] panics.
    pub shards: usize,
    /// Configuration passed to the eviction algorithm instance of every shard.
    pub eviction_config: E::Config,
    /// Hash builder used to hash keys; the hash also selects the shard of an entry.
    pub hash_builder: S,
    /// Computes the weight of an entry from its key and value.
    pub weighter: Arc<dyn Weighter<E::Key, E::Value>>,
    /// Admission filter; entries it rejects are inserted as phantom entries.
    pub filter: Arc<dyn Filter<E::Key, E::Value>>,
    /// Optional listener that is notified when entries leave the cache.
    pub event_listener: Option<Arc<dyn EventListener<Key = E::Key, Value = E::Value>>>,
    /// Metrics handle shared by the cache and all of its shards.
    pub metrics: Arc<Metrics>,
}
91
/// One shard of the cache: an indexer plus an eviction container guarded by a single lock.
struct RawCacheShard<E, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    /// Eviction container that decides which record is popped next.
    eviction: E,
    /// Key lookup structure for the records held by this shard.
    indexer: Sentry<I>,

    /// Sum of the weights of the records currently accounted to this shard.
    usage: usize,
    /// Weight budget of this shard.
    capacity: usize,

    /// Pending fetches for keys of this shard; `emplace` takes the notifiers registered for a key.
    inflights: Arc<Mutex<InflightManager<E, S, I>>>,

    /// Metrics handle shared with the owning cache.
    metrics: Arc<Metrics>,
    /// Kept alongside the shard but not read by the shard methods visible in this file section.
    _event_listener: Option<Arc<dyn EventListener<Key = E::Key, Value = E::Value>>>,
}
109
110impl<E, S, I> RawCacheShard<E, S, I>
111where
112    E: Eviction,
113    S: HashBuilder,
114    I: Indexer<Eviction = E>,
115{
116    /// Evict entries to fit the target usage.
117    fn evict(&mut self, target: usize, garbages: &mut Vec<(Event, Arc<Record<E>>)>) {
118        // Evict overflow records.
119        while self.usage > target {
120            let evicted = match self.eviction.pop() {
121                Some(evicted) => evicted,
122                None => break,
123            };
124            self.metrics.memory_evict.increase(1);
125
126            let e = self.indexer.remove(evicted.hash(), evicted.key()).unwrap();
127            assert_eq!(Arc::as_ptr(&evicted), Arc::as_ptr(&e));
128
129            strict_assert!(!evicted.as_ref().is_in_indexer());
130            strict_assert!(!evicted.as_ref().is_in_eviction());
131
132            self.usage -= evicted.weight();
133
134            garbages.push((Event::Evict, evicted));
135        }
136    }
137
138    #[expect(clippy::type_complexity)]
139    fn emplace(
140        &mut self,
141        record: Arc<Record<E>>,
142        garbages: &mut Vec<(Event, Arc<Record<E>>)>,
143        notifiers: &mut Vec<Notifier<Option<RawCacheEntry<E, S, I>>>>,
144    ) {
145        *notifiers = self
146            .inflights
147            .lock()
148            .take(record.hash(), record.key(), None)
149            .unwrap_or_default();
150
151        if record.properties().phantom().unwrap_or_default() {
152            if let Some(old) = self.indexer.remove(record.hash(), record.key()) {
153                strict_assert!(!old.is_in_indexer());
154
155                if old.is_in_eviction() {
156                    self.eviction.remove(&old);
157                }
158                strict_assert!(!old.is_in_eviction());
159
160                self.usage -= old.weight();
161
162                garbages.push((Event::Replace, old));
163            }
164            record.inc_refs(notifiers.len() + 1);
165            garbages.push((Event::Remove, record));
166            self.metrics.memory_insert.increase(1);
167            return;
168        }
169
170        let weight = record.weight();
171        let old_usage = self.usage;
172
173        // Evict overflow records.
174        self.evict(self.capacity.saturating_sub(weight), garbages);
175
176        // Insert new record
177        if let Some(old) = self.indexer.insert(record.clone()) {
178            self.metrics.memory_replace.increase(1);
179
180            strict_assert!(!old.is_in_indexer());
181
182            if old.is_in_eviction() {
183                self.eviction.remove(&old);
184            }
185            strict_assert!(!old.is_in_eviction());
186
187            self.usage -= old.weight();
188
189            garbages.push((Event::Replace, old));
190        } else {
191            self.metrics.memory_insert.increase(1);
192        }
193        strict_assert!(record.is_in_indexer());
194
195        strict_assert!(!record.is_in_eviction());
196        self.eviction.push(record.clone());
197        strict_assert!(record.is_in_eviction());
198
199        self.usage += weight;
200        // Increase the reference count within the lock section.
201        // The reference count of the new record must be at the moment.
202        record.inc_refs(notifiers.len() + 1);
203
204        match self.usage.cmp(&old_usage) {
205            std::cmp::Ordering::Greater => self.metrics.memory_usage.increase((self.usage - old_usage) as _),
206            std::cmp::Ordering::Less => self.metrics.memory_usage.decrease((old_usage - self.usage) as _),
207            std::cmp::Ordering::Equal => {}
208        }
209    }
210
211    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::shard::remove"))]
212    fn remove<Q>(&mut self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
213    where
214        Q: Hash + Equivalent<E::Key> + ?Sized,
215    {
216        let record = self.indexer.remove(hash, key)?;
217
218        if record.is_in_eviction() {
219            self.eviction.remove(&record);
220        }
221        strict_assert!(!record.is_in_indexer());
222        strict_assert!(!record.is_in_eviction());
223
224        self.usage -= record.weight();
225
226        self.metrics.memory_remove.increase(1);
227        self.metrics.memory_usage.decrease(record.weight() as _);
228
229        record.inc_refs(1);
230
231        Some(record)
232    }
233
    /// Look up `key` without running any acquire hook of the eviction algorithm.
    ///
    /// This is a plain delegation to [`Self::get_inner`], which records the hit/miss metric and
    /// raises the reference count of a found record by one.
    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::shard::get_noop"))]
    fn get_noop<Q>(&self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
    where
        Q: Hash + Equivalent<E::Key> + ?Sized,
    {
        self.get_inner(hash, key)
    }
241
242    #[cfg_attr(
243        feature = "tracing",
244        fastrace::trace(name = "foyer::memory::raw::shard::get_immutable")
245    )]
246    fn get_immutable<Q>(&self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
247    where
248        Q: Hash + Equivalent<E::Key> + ?Sized,
249    {
250        self.get_inner(hash, key)
251            .inspect(|record| self.acquire_immutable(record))
252    }
253
254    #[cfg_attr(
255        feature = "tracing",
256        fastrace::trace(name = "foyer::memory::raw::shard::get_mutable")
257    )]
258    fn get_mutable<Q>(&mut self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
259    where
260        Q: Hash + Equivalent<E::Key> + ?Sized,
261    {
262        self.get_inner(hash, key).inspect(|record| self.acquire_mutable(record))
263    }
264
265    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::shard::get_inner"))]
266    fn get_inner<Q>(&self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
267    where
268        Q: Hash + Equivalent<E::Key> + ?Sized,
269    {
270        let record = match self.indexer.get(hash, key).cloned() {
271            Some(record) => {
272                self.metrics.memory_hit.increase(1);
273                record
274            }
275            None => {
276                self.metrics.memory_miss.increase(1);
277                return None;
278            }
279        };
280
281        strict_assert!(record.is_in_indexer());
282
283        record.inc_refs(1);
284
285        Some(record)
286    }
287
288    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::shard::clear"))]
289    fn clear(&mut self, garbages: &mut Vec<Arc<Record<E>>>) {
290        let records = self.indexer.drain().collect_vec();
291        self.eviction.clear();
292
293        let mut count = 0;
294
295        for record in records {
296            count += 1;
297            strict_assert!(!record.is_in_indexer());
298            strict_assert!(!record.is_in_eviction());
299
300            garbages.push(record);
301        }
302
303        self.metrics.memory_remove.increase(count);
304    }
305
306    #[cfg_attr(
307        feature = "tracing",
308        fastrace::trace(name = "foyer::memory::raw::shard::acquire_immutable")
309    )]
310    fn acquire_immutable(&self, record: &Arc<Record<E>>) {
311        match E::acquire() {
312            Op::Immutable(f) => f(&self.eviction, record),
313            _ => unreachable!(),
314        }
315    }
316
317    #[cfg_attr(
318        feature = "tracing",
319        fastrace::trace(name = "foyer::memory::raw::shard::acquire_mutable")
320    )]
321    fn acquire_mutable(&mut self, record: &Arc<Record<E>>) {
322        match E::acquire() {
323            Op::Mutable(mut f) => f(&mut self.eviction, record),
324            _ => unreachable!(),
325        }
326    }
327
328    #[cfg_attr(
329        feature = "tracing",
330        fastrace::trace(name = "foyer::memory::raw::shard::release_immutable")
331    )]
332    fn release_immutable(&self, record: &Arc<Record<E>>) {
333        match E::release() {
334            Op::Immutable(f) => f(&self.eviction, record),
335            _ => unreachable!(),
336        }
337    }
338
339    #[cfg_attr(
340        feature = "tracing",
341        fastrace::trace(name = "foyer::memory::raw::shard::release_mutable")
342    )]
343    fn release_mutable(&mut self, record: &Arc<Record<E>>) {
344        match E::release() {
345            Op::Mutable(mut f) => f(&mut self.eviction, record),
346            _ => unreachable!(),
347        }
348    }
349}
350
/// Shared state behind every [`RawCache`] handle and every [`RawCacheEntry`].
#[expect(clippy::type_complexity)]
struct RawCacheInner<E, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    /// The shards, each behind its own read-write lock.
    shards: Vec<RwLock<RawCacheShard<E, S, I>>>,

    /// Total capacity given at construction time.
    capacity: usize,

    /// Hash builder used to hash keys.
    hash_builder: Arc<S>,
    /// Computes the weight of an entry.
    weighter: Arc<dyn Weighter<E::Key, E::Value>>,
    /// Admission filter; rejected entries become phantom entries.
    filter: Arc<dyn Filter<E::Key, E::Value>>,

    /// Metrics handle.
    metrics: Arc<Metrics>,
    /// Optional listener notified when entries leave the cache.
    event_listener: Option<Arc<dyn EventListener<Key = E::Key, Value = E::Value>>>,
    /// Pipe that receives evicted entries when enabled; swappable at runtime via `set_pipe`.
    pipe: ArcSwap<Box<dyn Pipe<Key = E::Key, Value = E::Value, Properties = E::Properties>>>,
}
370
371impl<E, S, I> RawCacheInner<E, S, I>
372where
373    E: Eviction,
374    S: HashBuilder,
375    I: Indexer<Eviction = E>,
376{
377    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::inner::clear"))]
378    fn clear(&self) {
379        let mut garbages = vec![];
380
381        self.shards
382            .iter()
383            .map(|shard| shard.write())
384            .for_each(|mut shard| shard.clear(&mut garbages));
385
386        // Do not deallocate data within the lock section.
387        if let Some(listener) = self.event_listener.as_ref() {
388            for record in garbages {
389                listener.on_leave(Event::Clear, record.key(), record.value());
390            }
391        }
392    }
393}
394
/// Sharded in-memory cache, generic over the eviction algorithm `E`, the hash builder `S`
/// and the indexer `I`.
///
/// The handle only holds an `Arc` to the shared state, so cloning it yields another handle to
/// the same cache.
pub struct RawCache<E, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    /// Shared state of the cache.
    inner: Arc<RawCacheInner<E, S, I>>,
}
403
/// Clears the cache when the shared state is dropped, so that the event listener (if any) is
/// notified with `Event::Clear` for every record that is still held.
impl<E, S, I> Drop for RawCacheInner<E, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    fn drop(&mut self) {
        self.clear();
    }
}
414
415impl<E, S, I> Clone for RawCache<E, S, I>
416where
417    E: Eviction,
418    S: HashBuilder,
419    I: Indexer<Eviction = E>,
420{
421    fn clone(&self) -> Self {
422        Self {
423            inner: self.inner.clone(),
424        }
425    }
426}
427
428impl<E, S, I> RawCache<E, S, I>
429where
430    E: Eviction,
431    S: HashBuilder,
432    I: Indexer<Eviction = E>,
433{
434    pub fn new(config: RawCacheConfig<E, S>) -> Self {
435        assert!(config.shards > 0, "shards must be greater than zero.");
436
437        let shard_capacities = (0..config.shards)
438            .map(|index| Self::shard_capacity_for(config.capacity, config.shards, index))
439            .collect_vec();
440
441        let shards = shard_capacities
442            .into_iter()
443            .map(|shard_capacity| RawCacheShard {
444                eviction: E::new(shard_capacity, &config.eviction_config),
445                indexer: Sentry::default(),
446                usage: 0,
447                capacity: shard_capacity,
448                inflights: Arc::new(Mutex::new(InflightManager::new())),
449                metrics: config.metrics.clone(),
450                _event_listener: config.event_listener.clone(),
451            })
452            .map(RwLock::new)
453            .collect_vec();
454
455        let pipe: Box<dyn Pipe<Key = E::Key, Value = E::Value, Properties = E::Properties>> =
456            Box::new(NoopPipe::default());
457
458        let inner = RawCacheInner {
459            shards,
460            capacity: config.capacity,
461            hash_builder: Arc::new(config.hash_builder),
462            weighter: config.weighter,
463            filter: config.filter,
464            metrics: config.metrics,
465            event_listener: config.event_listener,
466            pipe: ArcSwap::new(Arc::new(pipe)),
467        };
468
469        Self { inner: Arc::new(inner) }
470    }
471
472    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::resize"))]
473    pub fn resize(&self, capacity: usize) -> Result<()> {
474        let shards = self.inner.shards.len();
475        assert!(shards > 0, "shards must be greater than zero.");
476
477        let shard_capacities = (0..shards)
478            .map(|index| Self::shard_capacity_for(capacity, shards, index))
479            .collect_vec();
480
481        let handles = shard_capacities
482            .into_iter()
483            .enumerate()
484            .map(|(i, shard_capacity)| {
485                let inner = self.inner.clone();
486                std::thread::spawn(move || {
487                    let mut garbages = vec![];
488                    let res = inner.shards[i].write().with(|mut shard| {
489                        shard.eviction.update(shard_capacity, None).inspect(|_| {
490                            shard.capacity = shard_capacity;
491                            shard.evict(shard_capacity, &mut garbages)
492                        })
493                    });
494                    // Deallocate data out of the lock critical section.
495                    let pipe = inner.pipe.load();
496                    let piped = pipe.is_enabled();
497                    if inner.event_listener.is_some() || piped {
498                        for (event, record) in garbages {
499                            if let Some(listener) = inner.event_listener.as_ref() {
500                                listener.on_leave(event, record.key(), record.value())
501                            }
502                            if piped && event == Event::Evict {
503                                pipe.send(Piece::new(record));
504                            }
505                        }
506                    }
507                    res
508                })
509            })
510            .collect_vec();
511
512        let errs = handles
513            .into_iter()
514            .map(|handle| handle.join().unwrap())
515            .filter(|res| res.is_err())
516            .map(|res| res.unwrap_err())
517            .collect_vec();
518        if !errs.is_empty() {
519            let mut e = Error::new(ErrorKind::Config, "resize raw cache failed");
520            for err in errs {
521                e = e.with_context("reason", format!("{err}"));
522            }
523            return Err(e);
524        }
525
526        Ok(())
527    }
528
    /// Insert a key-value pair with default properties and return an entry handle for it.
    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::insert"))]
    pub fn insert(&self, key: E::Key, value: E::Value) -> RawCacheEntry<E, S, I> {
        self.insert_with_properties(key, value, Default::default())
    }
533
    /// Insert a key-value pair with the given properties and return an entry handle for it.
    ///
    /// The returned entry reports `Source::Outer` as its source.
    #[cfg_attr(
        feature = "tracing",
        fastrace::trace(name = "foyer::memory::raw::insert_with_properties")
    )]
    pub fn insert_with_properties(
        &self,
        key: E::Key,
        value: E::Value,
        properties: E::Properties,
    ) -> RawCacheEntry<E, S, I> {
        self.insert_with_properties_inner(key, value, properties, Source::Outer)
    }
546
547    fn insert_with_properties_inner(
548        &self,
549        key: E::Key,
550        value: E::Value,
551        mut properties: E::Properties,
552        source: Source,
553    ) -> RawCacheEntry<E, S, I> {
554        let hash = self.inner.hash_builder.hash_one(&key);
555        let weight = (self.inner.weighter)(&key, &value);
556        if !(self.inner.filter)(&key, &value) {
557            properties = properties.with_phantom(true);
558        }
559        if let Some(location) = properties.location() {
560            if location == Location::OnDisk {
561                properties = properties.with_phantom(true);
562            }
563        }
564        let record = Arc::new(Record::new(Data {
565            key,
566            value,
567            properties,
568            hash,
569            weight,
570        }));
571        self.insert_inner(record, source)
572    }
573
    /// Insert the record carried by `piece` and return an entry handle for it.
    ///
    /// The record is reused as-is (no re-hashing, weighing or filtering happens here) and the
    /// returned entry reports `Source::Memory` as its source.
    #[doc(hidden)]
    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::insert_piece"))]
    pub fn insert_piece(&self, piece: Piece<E::Key, E::Value, E::Properties>) -> RawCacheEntry<E, S, I> {
        self.insert_inner(piece.into_record(), Source::Memory)
    }
579
    /// Place `record` into its shard and perform all follow-up work outside the shard lock.
    ///
    /// Steps, in order:
    /// 1. Under the shard write lock, `emplace` the record; this collects the waiters registered
    ///    for the key and the records that left the shard.
    /// 2. Send every waiter its own entry handle for the new record.
    /// 3. Report the records that left to the event listener and forward evicted ones to the
    ///    pipe, if enabled.
    /// 4. Return the entry handle for the inserter.
    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::insert_inner"))]
    fn insert_inner(&self, record: Arc<Record<E>>, source: Source) -> RawCacheEntry<E, S, I> {
        let mut garbages = vec![];
        let mut notifiers = vec![];

        self.inner.shards[self.shard(record.hash())]
            .write()
            .with(|mut shard| shard.emplace(record.clone(), &mut garbages, &mut notifiers));

        // Notify waiters out of the lock critical section.
        // A failed send is ignored on purpose (the result is discarded).
        for notifier in notifiers {
            let _ = notifier.send(Ok(Some(RawCacheEntry {
                record: record.clone(),
                inner: self.inner.clone(),
                source,
            })));
        }

        // Deallocate data out of the lock critical section.
        let pipe = self.inner.pipe.load();
        let piped = pipe.is_enabled();
        if self.inner.event_listener.is_some() || piped {
            for (event, record) in garbages {
                if let Some(listener) = self.inner.event_listener.as_ref() {
                    listener.on_leave(event, record.key(), record.value())
                }
                // Only evicted records are forwarded to the pipe; replaced/removed ones are not.
                if piped && event == Event::Evict {
                    pipe.send(Piece::new(record));
                }
            }
        }

        RawCacheEntry {
            record,
            inner: self.inner.clone(),
            source,
        }
    }
618
619    /// Evict all entries in the cache and offload them into the disk cache via the pipe if needed.
620    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::evict_all"))]
621    pub fn evict_all(&self) {
622        let mut garbages = vec![];
623        for shard in self.inner.shards.iter() {
624            shard.write().evict(0, &mut garbages);
625        }
626
627        // Deallocate data out of the lock critical section.
628        let pipe = self.inner.pipe.load();
629        let piped = pipe.is_enabled();
630        if self.inner.event_listener.is_some() || piped {
631            for (event, record) in garbages {
632                if let Some(listener) = self.inner.event_listener.as_ref() {
633                    listener.on_leave(event, record.key(), record.value())
634                }
635                if piped && event == Event::Evict {
636                    pipe.send(Piece::new(record));
637                }
638            }
639        }
640    }
641
642    /// Evict all entries in the cache and offload them into the disk cache via the pipe if needed.
643    ///
644    /// This function obeys the io throttler of the disk cache and make sure all entries will be offloaded.
645    /// Therefore, this function is asynchronous.
646    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::flush"))]
647    pub async fn flush(&self) {
648        let mut garbages = vec![];
649        for shard in self.inner.shards.iter() {
650            shard.write().evict(0, &mut garbages);
651        }
652
653        // Deallocate data out of the lock critical section.
654        let pipe = self.inner.pipe.load();
655        let piped = pipe.is_enabled();
656
657        if let Some(listener) = self.inner.event_listener.as_ref() {
658            for (event, record) in garbages.iter() {
659                listener.on_leave(*event, record.key(), record.value());
660            }
661        }
662        if piped {
663            let pieces = garbages.into_iter().map(|(_, record)| Piece::new(record)).collect_vec();
664            pipe.flush(pieces).await;
665        }
666    }
667
668    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::remove"))]
669    pub fn remove<Q>(&self, key: &Q) -> Option<RawCacheEntry<E, S, I>>
670    where
671        Q: Hash + Equivalent<E::Key> + ?Sized,
672    {
673        let hash = self.inner.hash_builder.hash_one(key);
674
675        self.inner.shards[self.shard(hash)]
676            .write()
677            .with(|mut shard| {
678                shard.remove(hash, key).map(|record| RawCacheEntry {
679                    inner: self.inner.clone(),
680                    record,
681                    source: Source::Memory,
682                })
683            })
684            .inspect(|record| {
685                // Deallocate data out of the lock critical section.
686                if let Some(listener) = self.inner.event_listener.as_ref() {
687                    listener.on_leave(Event::Remove, record.key(), record.value());
688                }
689            })
690    }
691
692    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::get"))]
693    pub fn get<Q>(&self, key: &Q) -> Option<RawCacheEntry<E, S, I>>
694    where
695        Q: Hash + Equivalent<E::Key> + ?Sized,
696    {
697        let hash = self.inner.hash_builder.hash_one(key);
698
699        let record = match E::acquire() {
700            Op::Noop => self.inner.shards[self.shard(hash)].read().get_noop(hash, key),
701            Op::Immutable(_) => self.inner.shards[self.shard(hash)]
702                .read()
703                .with(|shard| shard.get_immutable(hash, key)),
704            Op::Mutable(_) => self.inner.shards[self.shard(hash)]
705                .write()
706                .with(|mut shard| shard.get_mutable(hash, key)),
707        }?;
708
709        Some(RawCacheEntry {
710            inner: self.inner.clone(),
711            record,
712            source: Source::Memory,
713        })
714    }
715
716    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::contains"))]
717    pub fn contains<Q>(&self, key: &Q) -> bool
718    where
719        Q: Hash + Equivalent<E::Key> + ?Sized,
720    {
721        let hash = self.inner.hash_builder.hash_one(key);
722
723        self.inner.shards[self.shard(hash)]
724            .read()
725            .with(|shard| shard.indexer.get(hash, key).is_some())
726    }
727
728    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::touch"))]
729    pub fn touch<Q>(&self, key: &Q) -> bool
730    where
731        Q: Hash + Equivalent<E::Key> + ?Sized,
732    {
733        let hash = self.inner.hash_builder.hash_one(key);
734
735        match E::acquire() {
736            Op::Noop => self.inner.shards[self.shard(hash)].read().get_noop(hash, key),
737            Op::Immutable(_) => self.inner.shards[self.shard(hash)]
738                .read()
739                .with(|shard| shard.get_immutable(hash, key)),
740            Op::Mutable(_) => self.inner.shards[self.shard(hash)]
741                .write()
742                .with(|mut shard| shard.get_mutable(hash, key)),
743        }
744        .is_some()
745    }
746
    /// Remove every entry from the cache.
    ///
    /// The event listener, if any, is notified with `Event::Clear` for each removed record.
    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::clear"))]
    pub fn clear(&self) {
        self.inner.clear();
    }
751
    /// Total capacity stored when the cache was constructed.
    ///
    /// NOTE(review): `resize` changes the per-shard capacities but does not update this stored
    /// value — confirm whether callers expect the resized capacity here.
    pub fn capacity(&self) -> usize {
        self.inner.capacity
    }

    /// Sum of the current usage of all shards.
    ///
    /// Each shard is read-locked in turn, so the total is not taken under one common lock.
    pub fn usage(&self) -> usize {
        self.inner.shards.iter().map(|shard| shard.read().usage).sum()
    }

    /// Metrics handle of the cache.
    pub fn metrics(&self) -> &Metrics {
        &self.inner.metrics
    }

    /// Hash builder used to hash keys.
    pub fn hash_builder(&self) -> &Arc<S> {
        &self.inner.hash_builder
    }

    /// Number of shards.
    pub fn shards(&self) -> usize {
        self.inner.shards.len()
    }
771
    /// Replace the pipe that receives evicted entries.
    ///
    /// The swap is performed through the `ArcSwap`; callers that already loaded the previous
    /// pipe keep using it for the operation in progress.
    pub fn set_pipe(&self, pipe: Box<dyn Pipe<Key = E::Key, Value = E::Value, Properties = E::Properties>>) {
        self.inner.pipe.store(Arc::new(pipe));
    }
775
    /// Index of the shard responsible for `hash`: the hash modulo the number of shards.
    ///
    /// The `as usize` cast keeps only the low bits of the hash on targets where `usize` is
    /// narrower than 64 bits.
    fn shard(&self, hash: u64) -> usize {
        hash as usize % self.inner.shards.len()
    }
779
780    fn shard_capacity_for(total: usize, shards: usize, index: usize) -> usize {
781        let base = total / shards;
782        let remainder = total % shards;
783        base + usize::from(index < remainder)
784    }
785}
786
/// Handle to a record of a [`RawCache`].
///
/// Every handle accounts for one reference on the record: cloning a handle raises the reference
/// count and dropping a handle lowers it.
pub struct RawCacheEntry<E, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    /// Shared state of the cache the record belongs to.
    inner: Arc<RawCacheInner<E, S, I>>,
    /// The record this handle refers to.
    record: Arc<Record<E>>,
    /// Where the entry came from, as reported by `source()`.
    source: Source,
}
797
/// Debug output shows only the record; the shared cache state and the source are omitted.
impl<E, S, I> Debug for RawCacheEntry<E, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RawCacheEntry").field("record", &self.record).finish()
    }
}
808
809impl<E, S, I> Drop for RawCacheEntry<E, S, I>
810where
811    E: Eviction,
812    S: HashBuilder,
813    I: Indexer<Eviction = E>,
814{
815    fn drop(&mut self) {
816        let hash = self.record.hash();
817        let shard = &self.inner.shards[hash as usize % self.inner.shards.len()];
818
819        if self.record.dec_refs(1) == 0 {
820            if self.record.properties().phantom().unwrap_or_default() {
821                if let Some(listener) = self.inner.event_listener.as_ref() {
822                    listener.on_leave(Event::Evict, self.record.key(), self.record.value());
823                }
824                let pipe = self.inner.pipe.load();
825                if pipe.is_enabled() {
826                    pipe.send(Piece::new(self.record.clone()));
827                }
828                return;
829            }
830
831            match E::release() {
832                Op::Noop => {}
833                Op::Immutable(_) => shard.read().with(|shard| shard.release_immutable(&self.record)),
834                Op::Mutable(_) => shard.write().with(|mut shard| shard.release_mutable(&self.record)),
835            }
836        }
837    }
838}
839
/// Cloning a handle takes one more reference on the record; the matching release happens when
/// the clone is dropped.
impl<E, S, I> Clone for RawCacheEntry<E, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    fn clone(&self) -> Self {
        // Account for the new handle before it exists.
        self.record.inc_refs(1);
        Self {
            inner: self.inner.clone(),
            record: self.record.clone(),
            source: self.source,
        }
    }
}
855
/// An entry handle dereferences to the value of its record.
impl<E, S, I> Deref for RawCacheEntry<E, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    type Target = E::Value;

    fn deref(&self) -> &Self::Target {
        self.value()
    }
}
868
// SAFETY: NOTE(review): no justification is recorded for this impl. It presumably relies on the
// record and the shared cache state being safe to move across threads (the handle only holds
// `Arc`s plus a `Source`) — confirm against `Record` and the `Eviction`/`Indexer` bounds.
unsafe impl<E, S, I> Send for RawCacheEntry<E, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
}
876
// SAFETY: NOTE(review): no justification is recorded for this impl. It presumably relies on all
// shared access to the record going through `&self` methods that are safe to call concurrently
// — confirm against `Record` and the `Eviction`/`Indexer` bounds.
unsafe impl<E, S, I> Sync for RawCacheEntry<E, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
}
884
/// Read-only accessors of an entry handle; all of them delegate to the underlying record
/// except `source`.
impl<E, S, I> RawCacheEntry<E, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    /// Hash of the key, as stored in the record.
    pub fn hash(&self) -> u64 {
        self.record.hash()
    }

    /// Key of the entry.
    pub fn key(&self) -> &E::Key {
        self.record.key()
    }

    /// Value of the entry.
    pub fn value(&self) -> &E::Value {
        self.record.value()
    }

    /// Properties of the entry.
    pub fn properties(&self) -> &E::Properties {
        self.record.properties()
    }

    /// Weight of the entry, as stored in the record.
    pub fn weight(&self) -> usize {
        self.record.weight()
    }

    /// Current reference count of the record.
    pub fn refs(&self) -> usize {
        self.record.refs()
    }

    /// Whether the record is no longer in the indexer, e.g. because it was evicted, replaced
    /// or removed after this handle was obtained.
    pub fn is_outdated(&self) -> bool {
        !self.record.is_in_indexer()
    }

    /// Build a [`Piece`] that shares the record of this entry.
    pub fn piece(&self) -> Piece<E::Key, E::Value, E::Properties> {
        Piece::new(self.record.clone())
    }

    /// Where the entry came from.
    pub fn source(&self) -> Source {
        self.source
    }
}
927
928impl<E, S, I> RawCache<E, S, I>
929where
930    E: Eviction,
931    S: HashBuilder,
932    I: Indexer<Eviction = E>,
933{
    /// Get the entry for `key`, or obtain it through the future produced by `fetch`.
    ///
    /// `fetch` is invoked immediately, before the cache is consulted, but only to build the
    /// future; the future is handed to `get_or_fetch_inner` as the required fetch and no optional
    /// fetch is provided. An error returned by the future is wrapped into an
    /// `ErrorKind::External` error with the message "fetch failed" and the original error as
    /// its source.
    ///
    /// NOTE(review): `tokio::runtime::Handle::current()` is called unconditionally, so this
    /// presumably has to be called from within a Tokio runtime context — confirm.
    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::get_or_fetch"))]
    pub fn get_or_fetch<Q, F, FU, IT, ER>(&self, key: &Q, fetch: F) -> RawGetOrFetch<E, S, I>
    where
        Q: Hash + Equivalent<E::Key> + ?Sized + ToOwned<Owned = E::Key>,
        F: FnOnce() -> FU,
        FU: Future<Output = std::result::Result<IT, ER>> + Send + 'static,
        IT: Into<FetchTarget<E::Key, E::Value, E::Properties>>,
        ER: Into<anyhow::Error>,
    {
        let fut = fetch();
        self.get_or_fetch_inner(
            key,
            // No optional fetch.
            || None,
            // Required fetch: await the caller's future and convert its result.
            || {
                Some(Box::new(|_| {
                    async {
                        match fut.await {
                            Ok(it) => Ok(it.into()),
                            Err(e) => Err(Error::new(ErrorKind::External, "fetch failed").with_source(e)),
                        }
                    }
                    .boxed()
                }))
            },
            (),
            &tokio::runtime::Handle::current().into(),
        )
    }
962
963    /// Advanced fetch with specified runtime.
964    ///
965    /// This function is for internal usage and the doc is hidden.
966    #[doc(hidden)]
967    #[cfg_attr(
968        feature = "tracing",
969        fastrace::trace(name = "foyer::memory::raw::get_or_fetch_inner")
970    )]
971    pub fn get_or_fetch_inner<Q, C, FO, FR>(
972        &self,
973        key: &Q,
974        fo: FO,
975        fr: FR,
976        ctx: C,
977        runtime: &SingletonHandle,
978    ) -> RawGetOrFetch<E, S, I>
979    where
980        Q: Hash + Equivalent<E::Key> + ?Sized + ToOwned<Owned = E::Key>,
981        C: Any + Send + Sync + 'static,
982        FO: FnOnce() -> Option<OptionalFetchBuilder<E::Key, E::Value, E::Properties, C>>,
983        FR: FnOnce() -> Option<RequiredFetchBuilder<E::Key, E::Value, E::Properties, C>>,
984    {
985        let hash = self.inner.hash_builder.hash_one(key);
986
987        // Make sure cache query and inflight query are in the same lock critical section.
988        let extract = |key: &Q, opt: Option<Arc<Record<E>>>, inflights: &Arc<Mutex<InflightManager<E, S, I>>>| {
989            opt.map(|record| {
990                RawGetOrFetch::Hit(Some(RawCacheEntry {
991                    inner: self.inner.clone(),
992                    record,
993                    source: Source::Memory,
994                }))
995            })
996            .unwrap_or_else(|| match inflights.lock().enqueue(hash, key, fr()) {
997                Enqueue::Lead {
998                    id,
999                    close,
1000                    waiter,
1001                    required_fetch_builder,
1002                } => {
1003                    let fetch = RawFetch {
1004                        state: RawFetchState::Init {
1005                            optional_fetch_builder: fo(),
1006                            required_fetch_builder,
1007                        },
1008                        id,
1009                        hash,
1010                        key: Some(key.to_owned()),
1011                        ctx,
1012                        cache: self.clone(),
1013                        inflights: inflights.clone(),
1014                        close,
1015                    };
1016                    runtime.spawn(fetch);
1017                    RawGetOrFetch::Miss(RawWait { waiter })
1018                }
1019                Enqueue::Wait(waiter) => RawGetOrFetch::Miss(RawWait { waiter }),
1020            })
1021        };
1022
1023        let res = match E::acquire() {
1024            Op::Noop => self.inner.shards[self.shard(hash)]
1025                .read()
1026                .with(|shard| extract(key, shard.get_noop(hash, key), &shard.inflights)),
1027            Op::Immutable(_) => self.inner.shards[self.shard(hash)]
1028                .read()
1029                .with(|shard| extract(key, shard.get_immutable(hash, key), &shard.inflights)),
1030            Op::Mutable(_) => self.inner.shards[self.shard(hash)]
1031                .write()
1032                .with(|mut shard| extract(key, shard.get_mutable(hash, key), &shard.inflights)),
1033        };
1034
1035        res
1036    }
1037}
1038
/// Future returned by `RawCache::get_or_fetch` and `RawCache::get_or_fetch_inner`.
#[must_use]
#[pin_project(project = RawGetOrFetchProj)]
pub enum RawGetOrFetch<E, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    /// The entry was found in memory. The `Option` is emptied when the entry is handed out
    /// by `poll`; a `None` here makes `poll` and `try_unwrap` panic.
    Hit(Option<RawCacheEntry<E, S, I>>),
    /// The entry was not found; the result arrives through the wrapped waiter.
    Miss(#[pin] RawWait<E, S, I>),
}
1050
1051impl<E, S, I> Debug for RawGetOrFetch<E, S, I>
1052where
1053    E: Eviction,
1054    S: HashBuilder,
1055    I: Indexer<Eviction = E>,
1056{
1057    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1058        match self {
1059            Self::Hit(e) => f.debug_tuple("Hit").field(e).finish(),
1060            Self::Miss(fut) => f.debug_tuple("Miss").field(fut).finish(),
1061        }
1062    }
1063}
1064
1065impl<E, S, I> Future for RawGetOrFetch<E, S, I>
1066where
1067    E: Eviction,
1068    S: HashBuilder,
1069    I: Indexer<Eviction = E>,
1070{
1071    type Output = Result<Option<RawCacheEntry<E, S, I>>>;
1072
1073    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1074        let this = self.project();
1075        match this {
1076            RawGetOrFetchProj::Hit(opt) => {
1077                assert!(opt.is_some(), "entry is already taken");
1078                Poll::Ready(Ok(opt.take()))
1079            }
1080            RawGetOrFetchProj::Miss(fut) => fut.poll(cx),
1081        }
1082    }
1083}
1084
1085impl<E, S, I> RawGetOrFetch<E, S, I>
1086where
1087    E: Eviction,
1088    S: HashBuilder,
1089    I: Indexer<Eviction = E>,
1090{
1091    pub fn need_await(&self) -> bool {
1092        matches!(self, Self::Miss(_))
1093    }
1094
1095    #[expect(clippy::allow_attributes)]
1096    #[allow(clippy::result_large_err)]
1097    pub fn try_unwrap(self) -> std::result::Result<Option<RawCacheEntry<E, S, I>>, Self> {
1098        match self {
1099            Self::Hit(opt) => {
1100                assert!(opt.is_some(), "entry is already taken");
1101                Ok(opt)
1102            }
1103            Self::Miss(_) => Err(self),
1104        }
1105    }
1106}
1107
/// An `Option` used as a take-once slot: it holds a value until a single consumer
/// `take()`s it (see `RawFetch::key`).
type Once<T> = Option<T>;
1109
/// Outcome of one step of the `RawFetch` state machine, interpreted by `handle_try!`.
#[must_use]
enum Try<E, S, I, C>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
    C: Any + Send + 'static,
{
    /// Nothing happened; execution falls through to the next statement.
    Noop,
    /// Replace the current state with the given one and restart the poll loop.
    SetStateAndContinue(RawFetchState<E, S, I, C>),
    /// The task is finished: the state becomes `RawFetchState::Ready` and `poll` returns.
    Ready,
}
1122
/// Applies a `Try` outcome to the state place `$state`.
///
/// Must be expanded inside a `loop` within a function returning `Poll<()>`, because the
/// expansion uses `continue` and `return Poll::Ready(())`.
macro_rules! handle_try {
    // Shorthand form: `method(args…)` is rewritten to `Self::method(args…)` and handed to
    // the general arm below.
    ($state:expr, $method:ident($($args:expr),* $(,)?)) => {
        handle_try! { $state, Self::$method($($args),*) }
    };

    // General form: evaluate `$try` and act on the resulting `Try` value.
    ($state:expr, $try:expr) => {
        match $try {
            // Fall through to whatever follows the macro invocation.
            Try::Noop => {}
            // Install the new state and go around the enclosing loop again.
            Try::SetStateAndContinue(state) => {
                $state = state;
                continue;
            },
            // Mark the state machine as finished and complete the future.
            Try::Ready => {
                $state = RawFetchState::Ready;
                return Poll::Ready(())
            },
        }
    };
}
1142
/// States of the `RawFetch` task.
#[expect(clippy::type_complexity)]
pub enum RawFetchState<E, S, I, C>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    /// Nothing has been started yet; holds the builders supplied by the caller, if any.
    Init {
        optional_fetch_builder: Option<OptionalFetchBuilder<E::Key, E::Value, E::Properties, C>>,
        required_fetch_builder: Option<RequiredFetchBuilder<E::Key, E::Value, E::Properties, C>>,
    },
    /// The optional fetch is being polled; the required builder is kept for the case that
    /// the optional fetch yields nothing or fails.
    FetchOptional {
        optional_fetch: OptionalFetch<FetchTarget<E::Key, E::Value, E::Properties>>,
        required_fetch_builder: Option<RequiredFetchBuilder<E::Key, E::Value, E::Properties, C>>,
    },
    /// The required fetch is being polled.
    FetchRequired {
        required_fetch: RequiredFetch<FetchTarget<E::Key, E::Value, E::Properties>>,
    },
    /// A result is ready to be sent to the collected notifiers. `res` is wrapped in an
    /// `Option` so that it can be taken out through a mutable reference.
    Notify {
        res: Option<Result<Option<RawCacheEntry<E, S, I>>>>,
        notifiers: Vec<Notifier<Option<RawCacheEntry<E, S, I>>>>,
    },
    /// Terminal state; polling returns `Poll::Ready(())` immediately.
    Ready,
}
1167
1168impl<E, S, I, C> Debug for RawFetchState<E, S, I, C>
1169where
1170    E: Eviction,
1171    S: HashBuilder,
1172    I: Indexer<Eviction = E>,
1173{
1174    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1175        match self {
1176            Self::Init { .. } => f.debug_struct("Init").finish(),
1177            Self::FetchOptional { .. } => f.debug_struct("Optional").finish(),
1178            Self::FetchRequired { .. } => f.debug_struct("Required").finish(),
1179            Self::Notify { res, .. } => f.debug_struct("Notify").field("res", res).finish(),
1180            Self::Ready => f.debug_struct("Ready").finish(),
1181        }
1182    }
1183}
1184
/// Future that resolves with the result delivered through an inflight `Waiter`.
#[pin_project]
pub struct RawWait<E, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    /// Receiving side on which the fetch result (or error) is delivered.
    waiter: Waiter<Option<RawCacheEntry<E, S, I>>>,
}
1194
1195impl<E, S, I> Debug for RawWait<E, S, I>
1196where
1197    E: Eviction,
1198    S: HashBuilder,
1199    I: Indexer<Eviction = E>,
1200{
1201    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1202        f.debug_struct("RawWait").field("waiter", &self.waiter).finish()
1203    }
1204}
1205
1206impl<E, S, I> Future for RawWait<E, S, I>
1207where
1208    E: Eviction,
1209    S: HashBuilder,
1210    I: Indexer<Eviction = E>,
1211{
1212    type Output = Result<Option<RawCacheEntry<E, S, I>>>;
1213
1214    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1215        let this = self.project();
1216        // TODO(MrCroxx): Switch to `Result::flatten` after MSRV is 1.89+
1217        // return waiter.poll_unpin(cx).map(|r| r.map_err(|e| e.into()).flatten());
1218        this.waiter.poll_unpin(cx).map(|r| match r {
1219            Ok(r) => r,
1220            Err(e) => Err(Error::new(ErrorKind::ChannelClosed, "waiter channel closed").with_source(e)),
1221        })
1222    }
1223}
1224
/// Task spawned by the leader of an inflight request; it drives the optional and required
/// fetches and feeds the outcome back into the cache or to the waiters.
#[pin_project(PinnedDrop)]
pub struct RawFetch<E, S, I, C>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    /// Current position in the fetch state machine.
    state: RawFetchState<E, S, I, C>,
    /// Identifier returned by the inflight manager when this task became the leader; passed
    /// back on `fetch_or_take` / `take`.
    id: usize,
    /// Hash of `key`.
    hash: u64,
    /// The key being fetched; taken out when a fetched entry is inserted into the cache.
    key: Once<E::Key>,
    /// Caller-supplied context handed (as `&mut`) to the fetch builders.
    ctx: C,
    /// Cache that receives the fetched entry or piece.
    cache: RawCache<E, S, I>,
    /// Inflight manager this task was enqueued on.
    inflights: Arc<Mutex<InflightManager<E, S, I>>>,
    /// When set, `poll` completes without polling the pending fetch any further.
    close: Arc<AtomicBool>,
}
1241
1242impl<E, S, I, C> Debug for RawFetch<E, S, I, C>
1243where
1244    E: Eviction,
1245    S: HashBuilder,
1246    I: Indexer<Eviction = E>,
1247{
1248    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1249        f.debug_struct("RawFetch")
1250            .field("state", &self.state)
1251            .field("id", &self.id)
1252            .field("hash", &self.hash)
1253            .finish()
1254    }
1255}
1256
/// Drives the fetch state machine to completion.
///
/// Each loop iteration inspects the current state and performs one step through
/// `handle_try!`, which either falls through, installs a new state and loops again, or marks
/// the task `Ready` and returns.
impl<E, S, I, C> Future for RawFetch<E, S, I, C>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
    C: Any + Send + 'static,
{
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.as_mut().project();
        loop {
            match this.state {
                RawFetchState::Init {
                    optional_fetch_builder,
                    required_fetch_builder,
                } => {
                    // Prefer the optional fetch; if there is none (`Try::Noop`) fall through
                    // and start the required fetch, with `Ok(None)` as the result to report
                    // when no required fetch exists either.
                    handle_try! { *this.state, try_set_optional(optional_fetch_builder, required_fetch_builder, this.ctx) }
                    handle_try! { *this.state, try_set_required(required_fetch_builder, this.ctx, *this.id, *this.hash, this.key.as_ref().unwrap(), this.inflights, Ok(None)) }
                }
                RawFetchState::FetchOptional {
                    optional_fetch,
                    required_fetch_builder,
                } => {
                    // The close flag ends the task without touching the state.
                    if this.close.load(Ordering::Relaxed) {
                        return Poll::Ready(());
                    }
                    match optional_fetch.poll_unpin(cx) {
                        Poll::Pending => return Poll::Pending,
                        // The optional fetch produced a target: insert it, tagged as
                        // `Source::Disk`.
                        Poll::Ready(Ok(Some(target))) => {
                            handle_try! {*this.state, handle_target(target, this.key, this.cache, Source::Disk) }
                        }
                        // Nothing found: continue with the required fetch.
                        Poll::Ready(Ok(None)) => {
                            handle_try! { *this.state, try_set_required(required_fetch_builder, this.ctx, *this.id, *this.hash, this.key.as_ref().unwrap(), &this.inflights, Ok(None)) }
                        }
                        // The optional fetch failed: still try the required fetch; the error
                        // is what gets reported if no required fetch is available.
                        Poll::Ready(Err(e)) => {
                            handle_try! { *this.state, try_set_required(required_fetch_builder, this.ctx, *this.id, *this.hash, this.key.as_ref().unwrap(), &this.inflights, Err(e)) }
                        }
                    }
                }
                RawFetchState::FetchRequired { required_fetch } => {
                    // The close flag ends the task without touching the state.
                    if this.close.load(Ordering::Relaxed) {
                        return Poll::Ready(());
                    }
                    match required_fetch.poll_unpin(cx) {
                        Poll::Pending => return Poll::Pending,
                        // Insert the fetched target, tagged as `Source::Outer`.
                        Poll::Ready(Ok(target)) => {
                            handle_try! { *this.state, handle_target(target, this.key, this.cache, Source::Outer) }
                        }
                        // Collect the notifiers so the error can be sent to them.
                        Poll::Ready(Err(e)) => {
                            handle_try! { *this.state, handle_error(e, *this.id, *this.hash, this.key.as_ref().unwrap(), this.inflights) }
                        }
                    }
                }
                RawFetchState::Notify { res, notifiers } => {
                    // `res` is always `Some` when this state is entered; it is consumed here.
                    handle_try! { *this.state, handle_notify(res.take().unwrap(), notifiers) }
                }
                RawFetchState::Ready => return Poll::Ready(()),
            }
        }
    }
}
1319
1320impl<E, S, I, C> RawFetch<E, S, I, C>
1321where
1322    E: Eviction,
1323    S: HashBuilder,
1324    I: Indexer<Eviction = E>,
1325    C: Any + Send + 'static,
1326{
1327    #[expect(clippy::type_complexity)]
1328    fn try_set_optional(
1329        optional_fetch_builder: &mut Option<OptionalFetchBuilder<E::Key, E::Value, E::Properties, C>>,
1330        required_fetch_builder: &mut Option<RequiredFetchBuilder<E::Key, E::Value, E::Properties, C>>,
1331        ctx: &mut C,
1332    ) -> Try<E, S, I, C> {
1333        match optional_fetch_builder.take() {
1334            None => Try::Noop,
1335            Some(optional_fetch_builder) => {
1336                let optional_fetch = optional_fetch_builder(ctx);
1337                Try::SetStateAndContinue(RawFetchState::FetchOptional {
1338                    optional_fetch,
1339                    required_fetch_builder: required_fetch_builder.take(),
1340                })
1341            }
1342        }
1343    }
1344
1345    #[expect(clippy::type_complexity)]
1346    fn try_set_required(
1347        required_fetch_builder: &mut Option<RequiredFetchBuilder<E::Key, E::Value, E::Properties, C>>,
1348        ctx: &mut C,
1349        id: usize,
1350        hash: u64,
1351        key: &E::Key,
1352        inflights: &Arc<Mutex<InflightManager<E, S, I>>>,
1353        res_no_fetch: Result<Option<RawCacheEntry<E, S, I>>>,
1354    ) -> Try<E, S, I, C> {
1355        // Fast path if the required fetch builder is provided.
1356        match required_fetch_builder.take() {
1357            None => {}
1358            Some(required_fetch_builder) => {
1359                let required_fetch = required_fetch_builder(ctx);
1360                return Try::SetStateAndContinue(RawFetchState::FetchRequired { required_fetch });
1361            }
1362        }
1363        // Slow path if the leader has no optional fetch.
1364        let fetch_or_take = match inflights.lock().fetch_or_take(hash, key, id) {
1365            Some(fetch_or_take) => fetch_or_take,
1366            None => return Try::Ready,
1367        };
1368        match fetch_or_take {
1369            FetchOrTake::Fetch(required_fetch_builder) => {
1370                let required_fetch = required_fetch_builder(ctx);
1371                Try::SetStateAndContinue(RawFetchState::FetchRequired { required_fetch })
1372            }
1373            FetchOrTake::Notifiers(notifiers) => Try::SetStateAndContinue(RawFetchState::Notify {
1374                res: Some(res_no_fetch),
1375                notifiers,
1376            }),
1377        }
1378    }
1379
1380    fn handle_target(
1381        target: FetchTarget<E::Key, E::Value, E::Properties>,
1382        key: &mut Once<E::Key>,
1383        cache: &RawCache<E, S, I>,
1384        source: Source,
1385    ) -> Try<E, S, I, C> {
1386        match target {
1387            FetchTarget::Entry { value, properties } => {
1388                let key = key.take().unwrap();
1389                cache.insert_with_properties_inner(key, value, properties, source);
1390            }
1391            FetchTarget::Piece(piece) => {
1392                cache.insert_piece(piece);
1393            }
1394        }
1395        Try::Ready
1396    }
1397
1398    fn handle_error(
1399        e: Error,
1400        id: usize,
1401        hash: u64,
1402        key: &E::Key,
1403        inflights: &Arc<Mutex<InflightManager<E, S, I>>>,
1404    ) -> Try<E, S, I, C> {
1405        let notifiers = match inflights.lock().take(hash, key, Some(id)) {
1406            Some(notifiers) => notifiers,
1407            None => {
1408                return Try::Ready;
1409            }
1410        };
1411        Try::SetStateAndContinue(RawFetchState::Notify {
1412            res: Some(Err(e)),
1413            notifiers,
1414        })
1415    }
1416
1417    #[expect(clippy::type_complexity)]
1418    fn handle_notify(
1419        res: Result<Option<RawCacheEntry<E, S, I>>>,
1420        notifiers: &mut Vec<Notifier<Option<RawCacheEntry<E, S, I>>>>,
1421    ) -> Try<E, S, I, C> {
1422        match res {
1423            Ok(e) => {
1424                for notifier in notifiers.drain(..) {
1425                    let _ = notifier.send(Ok(e.clone()));
1426                }
1427            }
1428            Err(e) => {
1429                for notifier in notifiers.drain(..) {
1430                    let _ = notifier.send(Err(e.clone()));
1431                }
1432            }
1433        }
1434        Try::Ready
1435    }
1436}
1437
1438#[pinned_drop]
1439impl<E, S, I, C> PinnedDrop for RawFetch<E, S, I, C>
1440where
1441    E: Eviction,
1442    S: HashBuilder,
1443    I: Indexer<Eviction = E>,
1444{
1445    fn drop(self: Pin<&mut Self>) {
1446        let this = self.project();
1447        match this.state {
1448            RawFetchState::Notify { .. } | RawFetchState::Ready => return,
1449            RawFetchState::Init { .. } | RawFetchState::FetchOptional { .. } | RawFetchState::FetchRequired { .. } => {}
1450        }
1451        if let Some(notifiers) = this
1452            .inflights
1453            .lock()
1454            .take(*this.hash, this.key.as_ref().unwrap(), Some(*this.id))
1455        {
1456            for notifier in notifiers {
1457                let _ =
1458                    notifier
1459                        .send(Err(Error::new(ErrorKind::TaskCancelled, "fetch task cancelled")
1460                            .with_context("hash", *this.hash)));
1461            }
1462        }
1463    }
1464}
1465
1466#[cfg(test)]
1467mod tests {
1468    use foyer_common::hasher::ModHasher;
1469    use rand::{rngs::SmallRng, seq::IndexedRandom, RngCore, SeedableRng};
1470
1471    use super::*;
1472    use crate::{
1473        eviction::{
1474            fifo::{Fifo, FifoConfig},
1475            lfu::{Lfu, LfuConfig},
1476            lru::{Lru, LruConfig},
1477            s3fifo::{S3Fifo, S3FifoConfig},
1478            sieve::{Sieve, SieveConfig},
1479            test_utils::TestProperties,
1480        },
1481        indexer::hash_table::HashTableIndexer,
1482        test_utils::PiecePipe,
1483    };
1484
1485    fn is_send_sync_static<T: Send + Sync + 'static>() {}
1486
1487    #[test]
1488    fn test_send_sync_static() {
1489        is_send_sync_static::<RawCache<Fifo<(), (), TestProperties>, ModHasher, HashTableIndexer<_>>>();
1490        is_send_sync_static::<RawCache<S3Fifo<(), (), TestProperties>, ModHasher, HashTableIndexer<_>>>();
1491        is_send_sync_static::<RawCache<Lfu<(), (), TestProperties>, ModHasher, HashTableIndexer<_>>>();
1492        is_send_sync_static::<RawCache<Lru<(), (), TestProperties>, ModHasher, HashTableIndexer<_>>>();
1493        is_send_sync_static::<RawCache<Sieve<(), (), TestProperties>, ModHasher, HashTableIndexer<_>>>();
1494    }
1495
    /// Builds a FIFO cache for tests: capacity 256 over 4 shards, every entry weighing 1,
    /// a filter closure that always returns `true`, no event listener and noop metrics.
    #[expect(clippy::type_complexity)]
    fn fifo_cache_for_test(
    ) -> RawCache<Fifo<u64, u64, TestProperties>, ModHasher, HashTableIndexer<Fifo<u64, u64, TestProperties>>> {
        RawCache::new(RawCacheConfig {
            capacity: 256,
            shards: 4,
            eviction_config: FifoConfig::default(),
            hash_builder: Default::default(),
            weighter: Arc::new(|_, _| 1),
            filter: Arc::new(|_, _| true),
            event_listener: None,
            metrics: Arc::new(Metrics::noop()),
        })
    }
1510
    /// Builds an S3-FIFO cache for tests: capacity 256 over 4 shards, every entry weighing 1,
    /// a filter closure that always returns `true`, no event listener and noop metrics.
    #[expect(clippy::type_complexity)]
    fn s3fifo_cache_for_test(
    ) -> RawCache<S3Fifo<u64, u64, TestProperties>, ModHasher, HashTableIndexer<S3Fifo<u64, u64, TestProperties>>> {
        RawCache::new(RawCacheConfig {
            capacity: 256,
            shards: 4,
            eviction_config: S3FifoConfig::default(),
            hash_builder: Default::default(),
            weighter: Arc::new(|_, _| 1),
            filter: Arc::new(|_, _| true),
            event_listener: None,
            metrics: Arc::new(Metrics::noop()),
        })
    }
1525
    /// Builds an LRU cache for tests: capacity 256 over 4 shards, every entry weighing 1,
    /// a filter closure that always returns `true`, no event listener and noop metrics.
    #[expect(clippy::type_complexity)]
    fn lru_cache_for_test(
    ) -> RawCache<Lru<u64, u64, TestProperties>, ModHasher, HashTableIndexer<Lru<u64, u64, TestProperties>>> {
        RawCache::new(RawCacheConfig {
            capacity: 256,
            shards: 4,
            eviction_config: LruConfig::default(),
            hash_builder: Default::default(),
            weighter: Arc::new(|_, _| 1),
            filter: Arc::new(|_, _| true),
            event_listener: None,
            metrics: Arc::new(Metrics::noop()),
        })
    }
1540
    /// Builds an LFU cache for tests: capacity 256 over 4 shards, every entry weighing 1,
    /// a filter closure that always returns `true`, no event listener and noop metrics.
    #[expect(clippy::type_complexity)]
    fn lfu_cache_for_test(
    ) -> RawCache<Lfu<u64, u64, TestProperties>, ModHasher, HashTableIndexer<Lfu<u64, u64, TestProperties>>> {
        RawCache::new(RawCacheConfig {
            capacity: 256,
            shards: 4,
            eviction_config: LfuConfig::default(),
            hash_builder: Default::default(),
            weighter: Arc::new(|_, _| 1),
            filter: Arc::new(|_, _| true),
            event_listener: None,
            metrics: Arc::new(Metrics::noop()),
        })
    }
1555
    /// Builds a Sieve cache for tests: capacity 256 over 4 shards, every entry weighing 1,
    /// a filter closure that always returns `true`, no event listener and noop metrics.
    #[expect(clippy::type_complexity)]
    fn sieve_cache_for_test(
    ) -> RawCache<Sieve<u64, u64, TestProperties>, ModHasher, HashTableIndexer<Sieve<u64, u64, TestProperties>>> {
        RawCache::new(RawCacheConfig {
            capacity: 256,
            shards: 4,
            eviction_config: SieveConfig {},
            hash_builder: Default::default(),
            weighter: Arc::new(|_, _| 1),
            filter: Arc::new(|_, _| true),
            event_listener: None,
            metrics: Arc::new(Metrics::noop()),
        })
    }
1570
1571    #[test_log::test]
1572    fn test_insert_phantom() {
1573        let fifo = fifo_cache_for_test();
1574
1575        let e1 = fifo.insert_with_properties(1, 1, TestProperties::default().with_phantom(true));
1576        assert_eq!(fifo.usage(), 0);
1577        drop(e1);
1578        assert_eq!(fifo.usage(), 0);
1579
1580        let e2a = fifo.insert_with_properties(2, 2, TestProperties::default().with_phantom(true));
1581        assert_eq!(fifo.usage(), 0);
1582        assert!(fifo.get(&2).is_none());
1583        assert_eq!(fifo.usage(), 0);
1584        drop(e2a);
1585        assert_eq!(fifo.usage(), 0);
1586
1587        let fifo = fifo_cache_for_test();
1588        fifo.insert(1, 1);
1589        assert_eq!(fifo.usage(), 1);
1590        assert_eq!(fifo.get(&1).unwrap().value(), &1);
1591        let e2 = fifo.insert_with_properties(1, 100, TestProperties::default().with_phantom(true));
1592        assert_eq!(fifo.usage(), 0);
1593        drop(e2);
1594        assert_eq!(fifo.usage(), 0);
1595        assert!(fifo.get(&1).is_none());
1596    }
1597
1598    #[expect(clippy::type_complexity)]
1599    #[test_log::test]
1600    fn test_insert_filter() {
1601        let fifo: RawCache<
1602            Fifo<u64, u64, TestProperties>,
1603            ModHasher,
1604            HashTableIndexer<Fifo<u64, u64, TestProperties>>,
1605        > = RawCache::new(RawCacheConfig {
1606            capacity: 256,
1607            shards: 4,
1608            eviction_config: FifoConfig::default(),
1609            hash_builder: Default::default(),
1610            weighter: Arc::new(|_, _| 1),
1611            filter: Arc::new(|k, _| !matches!(*k, 42)),
1612            event_listener: None,
1613            metrics: Arc::new(Metrics::noop()),
1614        });
1615
1616        fifo.insert(1, 1);
1617        fifo.insert(2, 2);
1618        fifo.insert(42, 42);
1619        assert_eq!(fifo.usage(), 2);
1620        assert_eq!(fifo.get(&1).unwrap().value(), &1);
1621        assert_eq!(fifo.get(&2).unwrap().value(), &2);
1622        assert!(fifo.get(&42).is_none());
1623    }
1624
1625    #[test]
1626    fn test_evict_all() {
1627        let pipe = Box::new(PiecePipe::default());
1628
1629        let fifo = fifo_cache_for_test();
1630        fifo.set_pipe(pipe.clone());
1631        for i in 0..fifo.capacity() as _ {
1632            fifo.insert(i, i);
1633        }
1634        assert_eq!(fifo.usage(), fifo.capacity());
1635
1636        fifo.evict_all();
1637        let mut pieces = pipe
1638            .pieces()
1639            .iter()
1640            .map(|p| (p.hash(), *p.key(), *p.value()))
1641            .collect_vec();
1642        pieces.sort_by_key(|t| t.0);
1643        let expected = (0..fifo.capacity() as u64).map(|i| (i, i, i)).collect_vec();
1644        assert_eq!(pieces, expected);
1645    }
1646
1647    #[test]
1648    fn test_insert_size_over_capacity() {
1649        let cache: RawCache<Fifo<Vec<u8>, Vec<u8>, TestProperties>, ModHasher, HashTableIndexer<_>> =
1650            RawCache::new(RawCacheConfig {
1651                capacity: 4 * 1024, // 4KB
1652                shards: 1,
1653                eviction_config: FifoConfig::default(),
1654                hash_builder: Default::default(),
1655                weighter: Arc::new(|k, v| k.len() + v.len()),
1656                filter: Arc::new(|_, _| true),
1657                event_listener: None,
1658                metrics: Arc::new(Metrics::noop()),
1659            });
1660
1661        let key = vec![b'k'; 1024]; // 1KB
1662        let value = vec![b'v'; 5 * 1024]; // 5KB
1663
1664        cache.insert(key.clone(), value.clone());
1665        assert_eq!(cache.usage(), 6 * 1024);
1666        assert_eq!(cache.get(&key).unwrap().value(), &value);
1667    }
1668
1669    #[test]
1670    fn test_capacity_distribution_without_loss() {
1671        let cache: RawCache<Fifo<u64, u64, TestProperties>, ModHasher, HashTableIndexer<_>> =
1672            RawCache::new(RawCacheConfig {
1673                capacity: 3,
1674                shards: 2,
1675                eviction_config: FifoConfig::default(),
1676                hash_builder: Default::default(),
1677                weighter: Arc::new(|_, _| 1),
1678                filter: Arc::new(|_, _| true),
1679                event_listener: None,
1680                metrics: Arc::new(Metrics::noop()),
1681            });
1682
1683        for key in 0..3 {
1684            let entry = cache.insert(key, key);
1685            drop(entry);
1686        }
1687
1688        assert_eq!(cache.usage(), 3);
1689
1690        for key in 0..3 {
1691            let entry = cache.get(&key).expect("entry should exist");
1692            assert_eq!(*entry, key);
1693            drop(entry);
1694        }
1695    }
1696
1697    #[test]
1698    fn test_capacity_distribution_with_more_shards_than_capacity() {
1699        let cache: RawCache<Fifo<u64, u64, TestProperties>, ModHasher, HashTableIndexer<_>> =
1700            RawCache::new(RawCacheConfig {
1701                capacity: 2,
1702                shards: 4,
1703                eviction_config: FifoConfig::default(),
1704                hash_builder: Default::default(),
1705                weighter: Arc::new(|_, _| 1),
1706                filter: Arc::new(|_, _| true),
1707                event_listener: None,
1708                metrics: Arc::new(Metrics::noop()),
1709            });
1710
1711        for key in 0..2 {
1712            let entry = cache.insert(key, key);
1713            drop(entry);
1714        }
1715
1716        assert_eq!(cache.usage(), 2);
1717
1718        for key in 0..2 {
1719            let entry = cache.get(&key).expect("entry should exist");
1720            assert_eq!(*entry, key);
1721            drop(entry);
1722        }
1723
1724        assert!(cache.get(&2).is_none());
1725    }
1726
1727    fn test_resize<E>(cache: &RawCache<E, ModHasher, HashTableIndexer<E>>)
1728    where
1729        E: Eviction<Key = u64, Value = u64>,
1730    {
1731        let capacity = cache.capacity();
1732        for i in 0..capacity as u64 * 2 {
1733            cache.insert(i, i);
1734        }
1735        assert_eq!(cache.usage(), capacity);
1736        cache.resize(capacity / 2).unwrap();
1737        assert_eq!(cache.usage(), capacity / 2);
1738        for i in 0..capacity as u64 * 2 {
1739            cache.insert(i, i);
1740        }
1741        assert_eq!(cache.usage(), capacity / 2);
1742    }
1743
1744    #[test]
1745    fn test_fifo_cache_resize() {
1746        let cache = fifo_cache_for_test();
1747        test_resize(&cache);
1748    }
1749
1750    #[test]
1751    fn test_s3fifo_cache_resize() {
1752        let cache = s3fifo_cache_for_test();
1753        test_resize(&cache);
1754    }
1755
1756    #[test]
1757    fn test_lru_cache_resize() {
1758        let cache = lru_cache_for_test();
1759        test_resize(&cache);
1760    }
1761
1762    #[test]
1763    fn test_lfu_cache_resize() {
1764        let cache = lfu_cache_for_test();
1765        test_resize(&cache);
1766    }
1767
1768    #[test]
1769    fn test_sieve_cache_resize() {
1770        let cache = sieve_cache_for_test();
1771        test_resize(&cache);
1772    }
1773
1774    mod fuzzy {
1775        use foyer_common::properties::Hint;
1776
1777        use super::*;
1778
1779        fn fuzzy<E, S>(cache: RawCache<E, S, HashTableIndexer<E>>, hints: Vec<Hint>)
1780        where
1781            E: Eviction<Key = u64, Value = u64, Properties = TestProperties>,
1782            S: HashBuilder,
1783        {
1784            let handles = (0..8)
1785                .map(|i| {
1786                    let c = cache.clone();
1787                    let hints = hints.clone();
1788                    std::thread::spawn(move || {
1789                        let mut rng = SmallRng::seed_from_u64(i);
1790                        for _ in 0..100000 {
1791                            let key = rng.next_u64();
1792                            if let Some(entry) = c.get(&key) {
1793                                assert_eq!(key, *entry);
1794                                drop(entry);
1795                                continue;
1796                            }
1797                            let hint = hints.choose(&mut rng).cloned().unwrap();
1798                            c.insert_with_properties(key, key, TestProperties::default().with_hint(hint));
1799                        }
1800                    })
1801                })
1802                .collect_vec();
1803
1804            handles.into_iter().for_each(|handle| handle.join().unwrap());
1805
1806            assert_eq!(cache.usage(), cache.capacity());
1807        }
1808
1809        #[test_log::test]
1810        fn test_fifo_cache_fuzzy() {
1811            let cache: RawCache<Fifo<u64, u64, TestProperties>, ModHasher, HashTableIndexer<_>> =
1812                RawCache::new(RawCacheConfig {
1813                    capacity: 256,
1814                    shards: 4,
1815                    eviction_config: FifoConfig::default(),
1816                    hash_builder: Default::default(),
1817                    weighter: Arc::new(|_, _| 1),
1818                    filter: Arc::new(|_, _| true),
1819                    event_listener: None,
1820                    metrics: Arc::new(Metrics::noop()),
1821                });
1822            let hints = vec![Hint::Normal];
1823            fuzzy(cache, hints);
1824        }
1825
1826        #[test_log::test]
1827        fn test_s3fifo_cache_fuzzy() {
1828            let cache: RawCache<S3Fifo<u64, u64, TestProperties>, ModHasher, HashTableIndexer<_>> =
1829                RawCache::new(RawCacheConfig {
1830                    capacity: 256,
1831                    shards: 4,
1832                    eviction_config: S3FifoConfig::default(),
1833                    hash_builder: Default::default(),
1834                    weighter: Arc::new(|_, _| 1),
1835                    filter: Arc::new(|_, _| true),
1836                    event_listener: None,
1837                    metrics: Arc::new(Metrics::noop()),
1838                });
1839            let hints = vec![Hint::Normal];
1840            fuzzy(cache, hints);
1841        }
1842
1843        #[test_log::test]
1844        fn test_lru_cache_fuzzy() {
1845            let cache: RawCache<Lru<u64, u64, TestProperties>, ModHasher, HashTableIndexer<_>> =
1846                RawCache::new(RawCacheConfig {
1847                    capacity: 256,
1848                    shards: 4,
1849                    eviction_config: LruConfig::default(),
1850                    hash_builder: Default::default(),
1851                    weighter: Arc::new(|_, _| 1),
1852                    filter: Arc::new(|_, _| true),
1853                    event_listener: None,
1854                    metrics: Arc::new(Metrics::noop()),
1855                });
1856            let hints = vec![Hint::Normal, Hint::Low];
1857            fuzzy(cache, hints);
1858        }
1859
1860        #[test_log::test]
1861        fn test_lfu_cache_fuzzy() {
1862            let cache: RawCache<Lfu<u64, u64, TestProperties>, ModHasher, HashTableIndexer<_>> =
1863                RawCache::new(RawCacheConfig {
1864                    capacity: 256,
1865                    shards: 4,
1866                    eviction_config: LfuConfig::default(),
1867                    hash_builder: Default::default(),
1868                    weighter: Arc::new(|_, _| 1),
1869                    filter: Arc::new(|_, _| true),
1870                    event_listener: None,
1871                    metrics: Arc::new(Metrics::noop()),
1872                });
1873            let hints = vec![Hint::Normal];
1874            fuzzy(cache, hints);
1875        }
1876
1877        #[test_log::test]
1878        fn test_sieve_cache_fuzzy() {
1879            let cache: RawCache<Sieve<u64, u64, TestProperties>, ModHasher, HashTableIndexer<_>> =
1880                RawCache::new(RawCacheConfig {
1881                    capacity: 256,
1882                    shards: 4,
1883                    eviction_config: SieveConfig {},
1884                    hash_builder: Default::default(),
1885                    weighter: Arc::new(|_, _| 1),
1886                    filter: Arc::new(|_, _| true),
1887                    event_listener: None,
1888                    metrics: Arc::new(Metrics::noop()),
1889                });
1890            let hints = vec![Hint::Normal];
1891            fuzzy(cache, hints);
1892        }
1893    }
1894}