foyer_memory/
raw.rs

1// Copyright 2026 foyer Project Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    any::Any,
17    fmt::Debug,
18    future::Future,
19    hash::Hash,
20    ops::Deref,
21    pin::Pin,
22    sync::{
23        atomic::{AtomicBool, Ordering},
24        Arc,
25    },
26    task::{Context, Poll},
27};
28
29use equivalent::Equivalent;
30use foyer_common::{
31    code::HashBuilder,
32    error::{Error, ErrorKind, Result},
33    event::{Event, EventListener},
34    metrics::Metrics,
35    properties::{Location, Properties, Source},
36    spawn::Spawner,
37    strict_assert,
38    utils::scope::Scope,
39};
40use futures_util::FutureExt as _;
41use itertools::Itertools;
42use parking_lot::{Mutex, RwLock};
43use pin_project::{pin_project, pinned_drop};
44
45use crate::{
46    eviction::{Eviction, Op},
47    indexer::{sentry::Sentry, Indexer},
48    inflight::{
49        Enqueue, FetchOrTake, FetchTarget, InflightManager, Notifier, OptionalFetch, OptionalFetchBuilder,
50        RequiredFetch, RequiredFetchBuilder, Waiter,
51    },
52    pipe::{ArcPipe, NoopPipe},
53    record::{Data, Record},
54    Piece,
55};
56
57/// The weighter for the in-memory cache.
58///
59/// The weighter is used to calculate the weight of the cache entry.
60pub trait Weighter<K, V>: Fn(&K, &V) -> usize + Send + Sync + 'static {}
61impl<K, V, T> Weighter<K, V> for T where T: Fn(&K, &V) -> usize + Send + Sync + 'static {}
62
63/// The filter for the in-memory cache.
64///
65/// The filter is used to decide whether to admit or reject an entry based on its key and value.
66///
67/// If the filter returns true, the key value can be inserted into the in-memory cache;
68/// otherwise, the key value cannot be inserted.
69///
70/// To ensure API consistency, the in-memory cache will still return a cache entry,
71/// but it will not count towards the in-memory cache usage,
72/// and it will be immediately reclaimed when the cache entry is dropped.
73pub trait Filter<K, V>: Fn(&K, &V) -> bool + Send + Sync + 'static {}
74impl<K, V, T> Filter<K, V> for T where T: Fn(&K, &V) -> bool + Send + Sync + 'static {}
75
76pub struct RawCacheConfig<E, S>
77where
78    E: Eviction,
79    S: HashBuilder,
80{
81    pub capacity: usize,
82    pub shards: usize,
83    pub eviction_config: E::Config,
84    pub hash_builder: S,
85    pub weighter: Arc<dyn Weighter<E::Key, E::Value>>,
86    pub filter: Arc<dyn Filter<E::Key, E::Value>>,
87    pub event_listener: Option<Arc<dyn EventListener<Key = E::Key, Value = E::Value>>>,
88    pub metrics: Arc<Metrics>,
89}
90
91struct RawCacheShard<E, S, I>
92where
93    E: Eviction,
94    S: HashBuilder,
95    I: Indexer<Eviction = E>,
96{
97    eviction: E,
98    indexer: Sentry<I>,
99
100    usage: usize,
101    entries: usize,
102    capacity: usize,
103
104    inflights: Arc<Mutex<InflightManager<E, S, I>>>,
105
106    metrics: Arc<Metrics>,
107    _event_listener: Option<Arc<dyn EventListener<Key = E::Key, Value = E::Value>>>,
108}
109
110impl<E, S, I> RawCacheShard<E, S, I>
111where
112    E: Eviction,
113    S: HashBuilder,
114    I: Indexer<Eviction = E>,
115{
116    /// Evict entries to fit the target usage.
117    fn evict(&mut self, target: usize, garbages: &mut Vec<(Event, Arc<Record<E>>)>) {
118        // Evict overflow records.
119        while self.usage > target {
120            let evicted = match self.eviction.pop() {
121                Some(evicted) => evicted,
122                None => break,
123            };
124            self.metrics.memory_evict.increase(1);
125
126            let e = self.indexer.remove(evicted.hash(), evicted.key()).unwrap();
127            assert_eq!(Arc::as_ptr(&evicted), Arc::as_ptr(&e));
128
129            strict_assert!(!evicted.as_ref().is_in_indexer());
130            strict_assert!(!evicted.as_ref().is_in_eviction());
131
132            self.usage -= evicted.weight();
133            self.entries -= 1;
134            self.metrics.memory_entries.decrease(1);
135
136            garbages.push((Event::Evict, evicted));
137        }
138    }
139
140    #[expect(clippy::type_complexity)]
141    fn emplace(
142        &mut self,
143        record: Arc<Record<E>>,
144        garbages: &mut Vec<(Event, Arc<Record<E>>)>,
145        notifiers: &mut Vec<Notifier<Option<RawCacheEntry<E, S, I>>>>,
146    ) {
147        *notifiers = self
148            .inflights
149            .lock()
150            .take(record.hash(), record.key(), None)
151            .unwrap_or_default();
152
153        if record.properties().phantom().unwrap_or_default() {
154            if let Some(old) = self.indexer.remove(record.hash(), record.key()) {
155                strict_assert!(!old.is_in_indexer());
156
157                if old.is_in_eviction() {
158                    self.eviction.remove(&old);
159                }
160                strict_assert!(!old.is_in_eviction());
161
162                self.usage -= old.weight();
163                self.entries -= 1;
164                self.metrics.memory_entries.decrease(1);
165
166                garbages.push((Event::Replace, old));
167            }
168            record.inc_refs(notifiers.len() + 1);
169            garbages.push((Event::Remove, record));
170            self.metrics.memory_insert.increase(1);
171            return;
172        }
173
174        let weight = record.weight();
175        let old_usage = self.usage;
176
177        // Evict overflow records.
178        self.evict(self.capacity.saturating_sub(weight), garbages);
179
180        // Insert new record
181        if let Some(old) = self.indexer.insert(record.clone()) {
182            self.metrics.memory_replace.increase(1);
183
184            strict_assert!(!old.is_in_indexer());
185
186            if old.is_in_eviction() {
187                self.eviction.remove(&old);
188            }
189            strict_assert!(!old.is_in_eviction());
190
191            self.usage -= old.weight();
192
193            garbages.push((Event::Replace, old));
194        } else {
195            self.metrics.memory_insert.increase(1);
196            self.entries += 1;
197            self.metrics.memory_entries.increase(1);
198        }
199        strict_assert!(record.is_in_indexer());
200
201        strict_assert!(!record.is_in_eviction());
202        self.eviction.push(record.clone());
203        strict_assert!(record.is_in_eviction());
204
205        self.usage += weight;
206        // Increase the reference count within the lock section.
207        // The reference count of the new record must be at the moment.
208        record.inc_refs(notifiers.len() + 1);
209
210        match self.usage.cmp(&old_usage) {
211            std::cmp::Ordering::Greater => self.metrics.memory_usage.increase((self.usage - old_usage) as _),
212            std::cmp::Ordering::Less => self.metrics.memory_usage.decrease((old_usage - self.usage) as _),
213            std::cmp::Ordering::Equal => {}
214        }
215    }
216
217    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::shard::remove"))]
218    fn remove<Q>(&mut self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
219    where
220        Q: Hash + Equivalent<E::Key> + ?Sized,
221    {
222        let record = self.indexer.remove(hash, key)?;
223
224        if record.is_in_eviction() {
225            self.eviction.remove(&record);
226        }
227        strict_assert!(!record.is_in_indexer());
228        strict_assert!(!record.is_in_eviction());
229
230        self.usage -= record.weight();
231        self.entries -= 1;
232
233        self.metrics.memory_remove.increase(1);
234        self.metrics.memory_usage.decrease(record.weight() as _);
235        self.metrics.memory_entries.decrease(1);
236
237        record.inc_refs(1);
238
239        Some(record)
240    }
241
242    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::shard::get_noop"))]
243    fn get_noop<Q>(&self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
244    where
245        Q: Hash + Equivalent<E::Key> + ?Sized,
246    {
247        self.get_inner(hash, key)
248    }
249
250    #[cfg_attr(
251        feature = "tracing",
252        fastrace::trace(name = "foyer::memory::raw::shard::get_immutable")
253    )]
254    fn get_immutable<Q>(&self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
255    where
256        Q: Hash + Equivalent<E::Key> + ?Sized,
257    {
258        self.get_inner(hash, key)
259            .inspect(|record| self.acquire_immutable(record))
260    }
261
262    #[cfg_attr(
263        feature = "tracing",
264        fastrace::trace(name = "foyer::memory::raw::shard::get_mutable")
265    )]
266    fn get_mutable<Q>(&mut self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
267    where
268        Q: Hash + Equivalent<E::Key> + ?Sized,
269    {
270        self.get_inner(hash, key).inspect(|record| self.acquire_mutable(record))
271    }
272
273    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::shard::get_inner"))]
274    fn get_inner<Q>(&self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
275    where
276        Q: Hash + Equivalent<E::Key> + ?Sized,
277    {
278        let record = match self.indexer.get(hash, key).cloned() {
279            Some(record) => {
280                self.metrics.memory_hit.increase(1);
281                record
282            }
283            None => {
284                self.metrics.memory_miss.increase(1);
285                return None;
286            }
287        };
288
289        strict_assert!(record.is_in_indexer());
290
291        record.inc_refs(1);
292
293        Some(record)
294    }
295
296    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::shard::clear"))]
297    fn clear(&mut self, garbages: &mut Vec<Arc<Record<E>>>) {
298        let records = self.indexer.drain().collect_vec();
299        self.eviction.clear();
300
301        let mut count = 0;
302
303        for record in records {
304            count += 1;
305            strict_assert!(!record.is_in_indexer());
306            strict_assert!(!record.is_in_eviction());
307
308            garbages.push(record);
309        }
310
311        self.entries = 0;
312        if count > 0 {
313            self.metrics.memory_entries.decrease(count);
314            self.metrics.memory_remove.increase(count);
315        }
316    }
317
318    #[cfg_attr(
319        feature = "tracing",
320        fastrace::trace(name = "foyer::memory::raw::shard::acquire_immutable")
321    )]
322    fn acquire_immutable(&self, record: &Arc<Record<E>>) {
323        match E::acquire() {
324            Op::Immutable(f) => f(&self.eviction, record),
325            _ => unreachable!(),
326        }
327    }
328
329    #[cfg_attr(
330        feature = "tracing",
331        fastrace::trace(name = "foyer::memory::raw::shard::acquire_mutable")
332    )]
333    fn acquire_mutable(&mut self, record: &Arc<Record<E>>) {
334        match E::acquire() {
335            Op::Mutable(mut f) => f(&mut self.eviction, record),
336            _ => unreachable!(),
337        }
338    }
339
340    #[cfg_attr(
341        feature = "tracing",
342        fastrace::trace(name = "foyer::memory::raw::shard::release_immutable")
343    )]
344    fn release_immutable(&self, record: &Arc<Record<E>>) {
345        match E::release() {
346            Op::Immutable(f) => f(&self.eviction, record),
347            _ => unreachable!(),
348        }
349    }
350
351    #[cfg_attr(
352        feature = "tracing",
353        fastrace::trace(name = "foyer::memory::raw::shard::release_mutable")
354    )]
355    fn release_mutable(&mut self, record: &Arc<Record<E>>) {
356        match E::release() {
357            Op::Mutable(mut f) => f(&mut self.eviction, record),
358            _ => unreachable!(),
359        }
360    }
361}
362
363struct RawCacheInner<E, S, I>
364where
365    E: Eviction,
366    S: HashBuilder,
367    I: Indexer<Eviction = E>,
368{
369    shards: Vec<RwLock<RawCacheShard<E, S, I>>>,
370
371    capacity: usize,
372
373    hash_builder: Arc<S>,
374    weighter: Arc<dyn Weighter<E::Key, E::Value>>,
375    filter: Arc<dyn Filter<E::Key, E::Value>>,
376
377    metrics: Arc<Metrics>,
378    event_listener: Option<Arc<dyn EventListener<Key = E::Key, Value = E::Value>>>,
379}
380
381impl<E, S, I> RawCacheInner<E, S, I>
382where
383    E: Eviction,
384    S: HashBuilder,
385    I: Indexer<Eviction = E>,
386{
387    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::inner::clear"))]
388    fn clear(&self) {
389        let mut garbages = vec![];
390
391        self.shards
392            .iter()
393            .map(|shard| shard.write())
394            .for_each(|mut shard| shard.clear(&mut garbages));
395
396        // Do not deallocate data within the lock section.
397        if let Some(listener) = self.event_listener.as_ref() {
398            for record in garbages {
399                listener.on_leave(Event::Clear, record.key(), record.value());
400            }
401        }
402    }
403}
404
405pub struct RawCache<E, S, I>
406where
407    E: Eviction,
408    S: HashBuilder,
409    I: Indexer<Eviction = E>,
410{
411    pipe: ArcPipe<E::Key, E::Value, E::Properties>,
412    inner: Arc<RawCacheInner<E, S, I>>,
413}
414
415impl<E, S, I> Clone for RawCache<E, S, I>
416where
417    E: Eviction,
418    S: HashBuilder,
419    I: Indexer<Eviction = E>,
420{
421    fn clone(&self) -> Self {
422        Self {
423            pipe: self.pipe.clone(),
424            inner: self.inner.clone(),
425        }
426    }
427}
428
429impl<E, S, I> Drop for RawCacheInner<E, S, I>
430where
431    E: Eviction,
432    S: HashBuilder,
433    I: Indexer<Eviction = E>,
434{
435    fn drop(&mut self) {
436        self.clear();
437    }
438}
439
440impl<E, S, I> RawCache<E, S, I>
441where
442    E: Eviction,
443    S: HashBuilder,
444    I: Indexer<Eviction = E>,
445{
446    pub fn new(config: RawCacheConfig<E, S>) -> Self {
447        assert!(config.shards > 0, "shards must be greater than zero.");
448
449        let shard_capacities = (0..config.shards)
450            .map(|index| Self::shard_capacity_for(config.capacity, config.shards, index))
451            .collect_vec();
452
453        let shards = shard_capacities
454            .into_iter()
455            .map(|shard_capacity| RawCacheShard {
456                eviction: E::new(shard_capacity, &config.eviction_config),
457                indexer: Sentry::default(),
458                usage: 0,
459                entries: 0,
460                capacity: shard_capacity,
461                inflights: Arc::new(Mutex::new(InflightManager::new())),
462                metrics: config.metrics.clone(),
463                _event_listener: config.event_listener.clone(),
464            })
465            .map(RwLock::new)
466            .collect_vec();
467
468        Self {
469            pipe: Arc::new(NoopPipe::default()),
470            inner: Arc::new(RawCacheInner {
471                shards,
472                capacity: config.capacity,
473                hash_builder: Arc::new(config.hash_builder),
474                weighter: config.weighter,
475                filter: config.filter,
476                metrics: config.metrics,
477                event_listener: config.event_listener,
478            }),
479        }
480    }
481
482    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::resize"))]
483    pub fn resize(&self, capacity: usize) -> Result<()> {
484        let shards = self.inner.shards.len();
485        assert!(shards > 0, "shards must be greater than zero.");
486
487        let shard_capacities = (0..shards)
488            .map(|index| Self::shard_capacity_for(capacity, shards, index))
489            .collect_vec();
490
491        let handles = shard_capacities
492            .into_iter()
493            .enumerate()
494            .map(|(i, shard_capacity)| {
495                let pipe = self.pipe.clone();
496                let inner = self.inner.clone();
497                std::thread::spawn(move || {
498                    let mut garbages = vec![];
499                    let res = inner.shards[i].write().with(|mut shard| {
500                        shard.eviction.update(shard_capacity, None).inspect(|_| {
501                            shard.capacity = shard_capacity;
502                            shard.evict(shard_capacity, &mut garbages)
503                        })
504                    });
505                    // Deallocate data out of the lock critical section.
506                    let piped = pipe.is_enabled();
507                    if inner.event_listener.is_some() || piped {
508                        for (event, record) in garbages {
509                            if let Some(listener) = inner.event_listener.as_ref() {
510                                listener.on_leave(event, record.key(), record.value())
511                            }
512                            if piped && event == Event::Evict {
513                                pipe.send(Piece::new(record));
514                            }
515                        }
516                    }
517                    res
518                })
519            })
520            .collect_vec();
521
522        let errs = handles
523            .into_iter()
524            .map(|handle| handle.join().unwrap())
525            .filter(|res| res.is_err())
526            .map(|res| res.unwrap_err())
527            .collect_vec();
528        if !errs.is_empty() {
529            let mut e = Error::new(ErrorKind::Config, "resize raw cache failed");
530            for err in errs {
531                e = e.with_context("reason", format!("{err}"));
532            }
533            return Err(e);
534        }
535
536        Ok(())
537    }
538
539    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::insert"))]
540    pub fn insert(&self, key: E::Key, value: E::Value) -> RawCacheEntry<E, S, I> {
541        self.insert_with_properties(key, value, Default::default())
542    }
543
544    #[cfg_attr(
545        feature = "tracing",
546        fastrace::trace(name = "foyer::memory::raw::insert_with_properties")
547    )]
548    pub fn insert_with_properties(
549        &self,
550        key: E::Key,
551        value: E::Value,
552        properties: E::Properties,
553    ) -> RawCacheEntry<E, S, I> {
554        self.insert_with_properties_inner(key, value, properties, Source::Outer)
555    }
556
557    fn insert_with_properties_inner(
558        &self,
559        key: E::Key,
560        value: E::Value,
561        mut properties: E::Properties,
562        source: Source,
563    ) -> RawCacheEntry<E, S, I> {
564        let hash = self.inner.hash_builder.hash_one(&key);
565        let weight = (self.inner.weighter)(&key, &value);
566        if !(self.inner.filter)(&key, &value) {
567            properties = properties.with_phantom(true);
568        }
569        if let Some(location) = properties.location() {
570            if location == Location::OnDisk {
571                properties = properties.with_phantom(true);
572            }
573        }
574        let record = Arc::new(Record::new(Data {
575            key,
576            value,
577            properties,
578            hash,
579            weight,
580        }));
581        self.insert_inner(record, source)
582    }
583
584    #[doc(hidden)]
585    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::insert_piece"))]
586    pub fn insert_piece(&self, piece: Piece<E::Key, E::Value, E::Properties>) -> RawCacheEntry<E, S, I> {
587        self.insert_inner(piece.into_record(), Source::Memory)
588    }
589
590    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::insert_inner"))]
591    fn insert_inner(&self, record: Arc<Record<E>>, source: Source) -> RawCacheEntry<E, S, I> {
592        let mut garbages = vec![];
593        let mut notifiers = vec![];
594
595        self.inner.shards[self.shard(record.hash())]
596            .write()
597            .with(|mut shard| shard.emplace(record.clone(), &mut garbages, &mut notifiers));
598
599        // Notify waiters out of the lock critical section.
600        for notifier in notifiers {
601            let _ = notifier.send(Ok(Some(RawCacheEntry {
602                pipe: self.pipe.clone(),
603                record: record.clone(),
604                inner: self.inner.clone(),
605                source,
606            })));
607        }
608
609        // Deallocate data out of the lock critical section.
610        let piped = self.pipe.is_enabled();
611        if self.inner.event_listener.is_some() || piped {
612            for (event, record) in garbages {
613                if let Some(listener) = self.inner.event_listener.as_ref() {
614                    listener.on_leave(event, record.key(), record.value())
615                }
616                if piped && event == Event::Evict {
617                    self.pipe.send(Piece::new(record));
618                }
619            }
620        }
621
622        RawCacheEntry {
623            record,
624            pipe: self.pipe.clone(),
625            inner: self.inner.clone(),
626            source,
627        }
628    }
629
630    /// Evict all entries in the cache and offload them into the disk cache via the pipe if needed.
631    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::evict_all"))]
632    pub fn evict_all(&self) {
633        let mut garbages = vec![];
634        for shard in self.inner.shards.iter() {
635            shard.write().evict(0, &mut garbages);
636        }
637
638        // Deallocate data out of the lock critical section.
639        let piped = self.pipe.is_enabled();
640        if self.inner.event_listener.is_some() || piped {
641            for (event, record) in garbages {
642                if let Some(listener) = self.inner.event_listener.as_ref() {
643                    listener.on_leave(event, record.key(), record.value())
644                }
645                if piped && event == Event::Evict {
646                    self.pipe.send(Piece::new(record));
647                }
648            }
649        }
650    }
651
652    /// Evict all entries in the cache and offload them into the disk cache via the pipe if needed.
653    ///
654    /// This function obeys the io throttler of the disk cache and make sure all entries will be offloaded.
655    /// Therefore, this function is asynchronous.
656    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::flush"))]
657    pub async fn flush(&self) {
658        let mut garbages = vec![];
659        for shard in self.inner.shards.iter() {
660            shard.write().evict(0, &mut garbages);
661        }
662
663        // Deallocate data out of the lock critical section.
664        let piped = self.pipe.is_enabled();
665
666        if let Some(listener) = self.inner.event_listener.as_ref() {
667            for (event, record) in garbages.iter() {
668                listener.on_leave(*event, record.key(), record.value());
669            }
670        }
671        if piped {
672            let pieces = garbages.into_iter().map(|(_, record)| Piece::new(record)).collect_vec();
673            self.pipe.flush(pieces).await;
674        }
675    }
676
677    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::remove"))]
678    pub fn remove<Q>(&self, key: &Q) -> Option<RawCacheEntry<E, S, I>>
679    where
680        Q: Hash + Equivalent<E::Key> + ?Sized,
681    {
682        let hash = self.inner.hash_builder.hash_one(key);
683
684        self.inner.shards[self.shard(hash)]
685            .write()
686            .with(|mut shard| {
687                shard.remove(hash, key).map(|record| RawCacheEntry {
688                    pipe: self.pipe.clone(),
689                    inner: self.inner.clone(),
690                    record,
691                    source: Source::Memory,
692                })
693            })
694            .inspect(|record| {
695                // Deallocate data out of the lock critical section.
696                if let Some(listener) = self.inner.event_listener.as_ref() {
697                    listener.on_leave(Event::Remove, record.key(), record.value());
698                }
699            })
700    }
701
702    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::get"))]
703    pub fn get<Q>(&self, key: &Q) -> Option<RawCacheEntry<E, S, I>>
704    where
705        Q: Hash + Equivalent<E::Key> + ?Sized,
706    {
707        let hash = self.inner.hash_builder.hash_one(key);
708
709        let record = match E::acquire() {
710            Op::Noop => self.inner.shards[self.shard(hash)].read().get_noop(hash, key),
711            Op::Immutable(_) => self.inner.shards[self.shard(hash)]
712                .read()
713                .with(|shard| shard.get_immutable(hash, key)),
714            Op::Mutable(_) => self.inner.shards[self.shard(hash)]
715                .write()
716                .with(|mut shard| shard.get_mutable(hash, key)),
717        }?;
718
719        Some(RawCacheEntry {
720            pipe: self.pipe.clone(),
721            inner: self.inner.clone(),
722            record,
723            source: Source::Memory,
724        })
725    }
726
727    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::contains"))]
728    pub fn contains<Q>(&self, key: &Q) -> bool
729    where
730        Q: Hash + Equivalent<E::Key> + ?Sized,
731    {
732        let hash = self.inner.hash_builder.hash_one(key);
733
734        self.inner.shards[self.shard(hash)]
735            .read()
736            .with(|shard| shard.indexer.get(hash, key).is_some())
737    }
738
739    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::touch"))]
740    pub fn touch<Q>(&self, key: &Q) -> bool
741    where
742        Q: Hash + Equivalent<E::Key> + ?Sized,
743    {
744        let hash = self.inner.hash_builder.hash_one(key);
745
746        match E::acquire() {
747            Op::Noop => self.inner.shards[self.shard(hash)].read().get_noop(hash, key),
748            Op::Immutable(_) => self.inner.shards[self.shard(hash)]
749                .read()
750                .with(|shard| shard.get_immutable(hash, key)),
751            Op::Mutable(_) => self.inner.shards[self.shard(hash)]
752                .write()
753                .with(|mut shard| shard.get_mutable(hash, key)),
754        }
755        .is_some()
756    }
757
758    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::clear"))]
759    pub fn clear(&self) {
760        self.inner.clear();
761    }
762
763    pub fn capacity(&self) -> usize {
764        self.inner.capacity
765    }
766
767    pub fn usage(&self) -> usize {
768        self.inner.shards.iter().map(|shard| shard.read().usage).sum()
769    }
770
771    pub fn entries(&self) -> usize {
772        self.inner.shards.iter().map(|shard| shard.read().entries).sum()
773    }
774
775    pub fn metrics(&self) -> &Metrics {
776        &self.inner.metrics
777    }
778
779    pub fn hash_builder(&self) -> &Arc<S> {
780        &self.inner.hash_builder
781    }
782
783    pub fn shards(&self) -> usize {
784        self.inner.shards.len()
785    }
786
787    pub(crate) fn with_pipe(mut self, pipe: ArcPipe<E::Key, E::Value, E::Properties>) -> Self {
788        self.pipe = pipe;
789        self
790    }
791
792    fn shard(&self, hash: u64) -> usize {
793        hash as usize % self.inner.shards.len()
794    }
795
796    fn shard_capacity_for(total: usize, shards: usize, index: usize) -> usize {
797        let base = total / shards;
798        let remainder = total % shards;
799        base + usize::from(index < remainder)
800    }
801}
802
803pub struct RawCacheEntry<E, S, I>
804where
805    E: Eviction,
806    S: HashBuilder,
807    I: Indexer<Eviction = E>,
808{
809    pipe: ArcPipe<E::Key, E::Value, E::Properties>,
810    inner: Arc<RawCacheInner<E, S, I>>,
811    record: Arc<Record<E>>,
812    source: Source,
813}
814
815impl<E, S, I> Debug for RawCacheEntry<E, S, I>
816where
817    E: Eviction,
818    S: HashBuilder,
819    I: Indexer<Eviction = E>,
820{
821    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
822        f.debug_struct("RawCacheEntry").field("record", &self.record).finish()
823    }
824}
825
826impl<E, S, I> Drop for RawCacheEntry<E, S, I>
827where
828    E: Eviction,
829    S: HashBuilder,
830    I: Indexer<Eviction = E>,
831{
832    fn drop(&mut self) {
833        let hash = self.record.hash();
834        let shard = &self.inner.shards[hash as usize % self.inner.shards.len()];
835
836        if self.record.dec_refs(1) == 0 {
837            if self.record.properties().phantom().unwrap_or_default() {
838                if let Some(listener) = self.inner.event_listener.as_ref() {
839                    listener.on_leave(Event::Evict, self.record.key(), self.record.value());
840                }
841                if self.pipe.is_enabled() {
842                    self.pipe.send(Piece::new(self.record.clone()));
843                }
844                return;
845            }
846
847            match E::release() {
848                Op::Noop => {}
849                Op::Immutable(_) => shard.read().with(|shard| shard.release_immutable(&self.record)),
850                Op::Mutable(_) => shard.write().with(|mut shard| shard.release_mutable(&self.record)),
851            }
852        }
853    }
854}
855
856impl<E, S, I> Clone for RawCacheEntry<E, S, I>
857where
858    E: Eviction,
859    S: HashBuilder,
860    I: Indexer<Eviction = E>,
861{
862    fn clone(&self) -> Self {
863        self.record.inc_refs(1);
864        Self {
865            pipe: self.pipe.clone(),
866            inner: self.inner.clone(),
867            record: self.record.clone(),
868            source: self.source,
869        }
870    }
871}
872
873impl<E, S, I> Deref for RawCacheEntry<E, S, I>
874where
875    E: Eviction,
876    S: HashBuilder,
877    I: Indexer<Eviction = E>,
878{
879    type Target = E::Value;
880
881    fn deref(&self) -> &Self::Target {
882        self.value()
883    }
884}
885
886unsafe impl<E, S, I> Send for RawCacheEntry<E, S, I>
887where
888    E: Eviction,
889    S: HashBuilder,
890    I: Indexer<Eviction = E>,
891{
892}
893
894unsafe impl<E, S, I> Sync for RawCacheEntry<E, S, I>
895where
896    E: Eviction,
897    S: HashBuilder,
898    I: Indexer<Eviction = E>,
899{
900}
901
902impl<E, S, I> RawCacheEntry<E, S, I>
903where
904    E: Eviction,
905    S: HashBuilder,
906    I: Indexer<Eviction = E>,
907{
908    pub fn hash(&self) -> u64 {
909        self.record.hash()
910    }
911
912    pub fn key(&self) -> &E::Key {
913        self.record.key()
914    }
915
916    pub fn value(&self) -> &E::Value {
917        self.record.value()
918    }
919
920    pub fn properties(&self) -> &E::Properties {
921        self.record.properties()
922    }
923
924    pub fn weight(&self) -> usize {
925        self.record.weight()
926    }
927
928    pub fn refs(&self) -> usize {
929        self.record.refs()
930    }
931
932    pub fn is_outdated(&self) -> bool {
933        !self.record.is_in_indexer()
934    }
935
936    pub fn piece(&self) -> Piece<E::Key, E::Value, E::Properties> {
937        Piece::new(self.record.clone())
938    }
939
940    pub fn source(&self) -> Source {
941        self.source
942    }
943}
944
945impl<E, S, I> RawCache<E, S, I>
946where
947    E: Eviction,
948    S: HashBuilder,
949    I: Indexer<Eviction = E>,
950{
951    #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::get_or_fetch"))]
952    pub fn get_or_fetch<Q, F, FU, IT, ER>(&self, key: &Q, fetch: F) -> RawGetOrFetch<E, S, I>
953    where
954        Q: Hash + Equivalent<E::Key> + ?Sized + ToOwned<Owned = E::Key>,
955        F: FnOnce() -> FU,
956        FU: Future<Output = std::result::Result<IT, ER>> + Send + 'static,
957        IT: Into<FetchTarget<E::Key, E::Value, E::Properties>>,
958        ER: Into<anyhow::Error>,
959    {
960        let fut = fetch();
961        self.get_or_fetch_inner(
962            key,
963            || None,
964            || {
965                Some(Box::new(|_| {
966                    async {
967                        match fut.await {
968                            Ok(it) => Ok(it.into()),
969                            Err(e) => Err(Error::new(ErrorKind::External, "fetch failed").with_source(e)),
970                        }
971                    }
972                    .boxed()
973                }))
974            },
975            (),
976            &Spawner::current(),
977        )
978    }
979
980    /// Advanced fetch with specified runtime.
981    ///
982    /// This function is for internal usage and the doc is hidden.
983    #[doc(hidden)]
984    #[cfg_attr(
985        feature = "tracing",
986        fastrace::trace(name = "foyer::memory::raw::get_or_fetch_inner")
987    )]
988    pub fn get_or_fetch_inner<Q, C, FO, FR>(
989        &self,
990        key: &Q,
991        fo: FO,
992        fr: FR,
993        ctx: C,
994        spawner: &Spawner,
995    ) -> RawGetOrFetch<E, S, I>
996    where
997        Q: Hash + Equivalent<E::Key> + ?Sized + ToOwned<Owned = E::Key>,
998        C: Any + Send + Sync + 'static,
999        FO: FnOnce() -> Option<OptionalFetchBuilder<E::Key, E::Value, E::Properties, C>>,
1000        FR: FnOnce() -> Option<RequiredFetchBuilder<E::Key, E::Value, E::Properties, C>>,
1001    {
1002        let hash = self.inner.hash_builder.hash_one(key);
1003
1004        // Make sure cache query and inflight query are in the same lock critical section.
1005        let extract = |key: &Q, opt: Option<Arc<Record<E>>>, inflights: &Arc<Mutex<InflightManager<E, S, I>>>| {
1006            opt.map(|record| {
1007                RawGetOrFetch::Hit(Some(RawCacheEntry {
1008                    pipe: self.pipe.clone(),
1009                    inner: self.inner.clone(),
1010                    record,
1011                    source: Source::Memory,
1012                }))
1013            })
1014            .unwrap_or_else(|| match inflights.lock().enqueue(hash, key, fr()) {
1015                Enqueue::Lead {
1016                    id,
1017                    close,
1018                    waiter,
1019                    required_fetch_builder,
1020                } => {
1021                    let fetch = RawFetch {
1022                        state: RawFetchState::Init {
1023                            optional_fetch_builder: fo(),
1024                            required_fetch_builder,
1025                        },
1026                        id,
1027                        hash,
1028                        key: Some(key.to_owned()),
1029                        ctx,
1030                        cache: self.clone(),
1031                        inflights: inflights.clone(),
1032                        close,
1033                    };
1034                    spawner.spawn(fetch);
1035                    RawGetOrFetch::Miss(RawWait { waiter })
1036                }
1037                Enqueue::Wait(waiter) => RawGetOrFetch::Miss(RawWait { waiter }),
1038            })
1039        };
1040
1041        let res = match E::acquire() {
1042            Op::Noop => self.inner.shards[self.shard(hash)]
1043                .read()
1044                .with(|shard| extract(key, shard.get_noop(hash, key), &shard.inflights)),
1045            Op::Immutable(_) => self.inner.shards[self.shard(hash)]
1046                .read()
1047                .with(|shard| extract(key, shard.get_immutable(hash, key), &shard.inflights)),
1048            Op::Mutable(_) => self.inner.shards[self.shard(hash)]
1049                .write()
1050                .with(|mut shard| extract(key, shard.get_mutable(hash, key), &shard.inflights)),
1051        };
1052
1053        res
1054    }
1055}
1056
1057#[must_use]
1058#[pin_project(project = RawGetOrFetchProj)]
1059pub enum RawGetOrFetch<E, S, I>
1060where
1061    E: Eviction,
1062    S: HashBuilder,
1063    I: Indexer<Eviction = E>,
1064{
1065    Hit(Option<RawCacheEntry<E, S, I>>),
1066    Miss(#[pin] RawWait<E, S, I>),
1067}
1068
1069impl<E, S, I> Debug for RawGetOrFetch<E, S, I>
1070where
1071    E: Eviction,
1072    S: HashBuilder,
1073    I: Indexer<Eviction = E>,
1074{
1075    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1076        match self {
1077            Self::Hit(e) => f.debug_tuple("Hit").field(e).finish(),
1078            Self::Miss(fut) => f.debug_tuple("Miss").field(fut).finish(),
1079        }
1080    }
1081}
1082
1083impl<E, S, I> Future for RawGetOrFetch<E, S, I>
1084where
1085    E: Eviction,
1086    S: HashBuilder,
1087    I: Indexer<Eviction = E>,
1088{
1089    type Output = Result<Option<RawCacheEntry<E, S, I>>>;
1090
1091    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1092        let this = self.project();
1093        match this {
1094            RawGetOrFetchProj::Hit(opt) => {
1095                assert!(opt.is_some(), "entry is already taken");
1096                Poll::Ready(Ok(opt.take()))
1097            }
1098            RawGetOrFetchProj::Miss(fut) => fut.poll(cx),
1099        }
1100    }
1101}
1102
1103impl<E, S, I> RawGetOrFetch<E, S, I>
1104where
1105    E: Eviction,
1106    S: HashBuilder,
1107    I: Indexer<Eviction = E>,
1108{
1109    pub fn need_await(&self) -> bool {
1110        matches!(self, Self::Miss(_))
1111    }
1112
1113    #[expect(clippy::allow_attributes)]
1114    #[allow(clippy::result_large_err)]
1115    pub fn try_unwrap(self) -> std::result::Result<Option<RawCacheEntry<E, S, I>>, Self> {
1116        match self {
1117            Self::Hit(opt) => {
1118                assert!(opt.is_some(), "entry is already taken");
1119                Ok(opt)
1120            }
1121            Self::Miss(_) => Err(self),
1122        }
1123    }
1124}
1125
1126type Once<T> = Option<T>;
1127
1128#[must_use]
1129enum Try<E, S, I, C>
1130where
1131    E: Eviction,
1132    S: HashBuilder,
1133    I: Indexer<Eviction = E>,
1134    C: Any + Send + 'static,
1135{
1136    Noop,
1137    SetStateAndContinue(RawFetchState<E, S, I, C>),
1138    Ready,
1139}
1140
1141macro_rules! handle_try {
1142    ($state:expr, $method:ident($($args:expr),* $(,)?)) => {
1143        handle_try! { $state, Self::$method($($args),*) }
1144    };
1145
1146    ($state:expr, $try:expr) => {
1147        match $try {
1148            Try::Noop => {}
1149            Try::SetStateAndContinue(state) => {
1150                $state = state;
1151                continue;
1152            },
1153            Try::Ready => {
1154                $state = RawFetchState::Ready;
1155                return Poll::Ready(())
1156            },
1157        }
1158    };
1159}
1160
1161#[expect(clippy::type_complexity)]
1162pub enum RawFetchState<E, S, I, C>
1163where
1164    E: Eviction,
1165    S: HashBuilder,
1166    I: Indexer<Eviction = E>,
1167{
1168    Init {
1169        optional_fetch_builder: Option<OptionalFetchBuilder<E::Key, E::Value, E::Properties, C>>,
1170        required_fetch_builder: Option<RequiredFetchBuilder<E::Key, E::Value, E::Properties, C>>,
1171    },
1172    FetchOptional {
1173        optional_fetch: OptionalFetch<FetchTarget<E::Key, E::Value, E::Properties>>,
1174        required_fetch_builder: Option<RequiredFetchBuilder<E::Key, E::Value, E::Properties, C>>,
1175    },
1176    FetchRequired {
1177        required_fetch: RequiredFetch<FetchTarget<E::Key, E::Value, E::Properties>>,
1178    },
1179    Notify {
1180        res: Option<Result<Option<RawCacheEntry<E, S, I>>>>,
1181        notifiers: Vec<Notifier<Option<RawCacheEntry<E, S, I>>>>,
1182    },
1183    Ready,
1184}
1185
1186impl<E, S, I, C> Debug for RawFetchState<E, S, I, C>
1187where
1188    E: Eviction,
1189    S: HashBuilder,
1190    I: Indexer<Eviction = E>,
1191{
1192    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1193        match self {
1194            Self::Init { .. } => f.debug_struct("Init").finish(),
1195            Self::FetchOptional { .. } => f.debug_struct("Optional").finish(),
1196            Self::FetchRequired { .. } => f.debug_struct("Required").finish(),
1197            Self::Notify { res, .. } => f.debug_struct("Notify").field("res", res).finish(),
1198            Self::Ready => f.debug_struct("Ready").finish(),
1199        }
1200    }
1201}
1202
1203#[pin_project]
1204pub struct RawWait<E, S, I>
1205where
1206    E: Eviction,
1207    S: HashBuilder,
1208    I: Indexer<Eviction = E>,
1209{
1210    waiter: Waiter<Option<RawCacheEntry<E, S, I>>>,
1211}
1212
1213impl<E, S, I> Debug for RawWait<E, S, I>
1214where
1215    E: Eviction,
1216    S: HashBuilder,
1217    I: Indexer<Eviction = E>,
1218{
1219    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1220        f.debug_struct("RawWait").field("waiter", &self.waiter).finish()
1221    }
1222}
1223
1224impl<E, S, I> Future for RawWait<E, S, I>
1225where
1226    E: Eviction,
1227    S: HashBuilder,
1228    I: Indexer<Eviction = E>,
1229{
1230    type Output = Result<Option<RawCacheEntry<E, S, I>>>;
1231
1232    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1233        let this = self.project();
1234        // TODO(MrCroxx): Switch to `Result::flatten` after MSRV is 1.89+
1235        // return waiter.poll_unpin(cx).map(|r| r.map_err(|e| e.into()).flatten());
1236        this.waiter.poll_unpin(cx).map(|r| match r {
1237            Ok(r) => r,
1238            Err(e) => Err(Error::new(ErrorKind::ChannelClosed, "waiter channel closed").with_source(e)),
1239        })
1240    }
1241}
1242
1243#[pin_project(PinnedDrop)]
1244pub struct RawFetch<E, S, I, C>
1245where
1246    E: Eviction,
1247    S: HashBuilder,
1248    I: Indexer<Eviction = E>,
1249{
1250    state: RawFetchState<E, S, I, C>,
1251    id: usize,
1252    hash: u64,
1253    key: Once<E::Key>,
1254    ctx: C,
1255    cache: RawCache<E, S, I>,
1256    inflights: Arc<Mutex<InflightManager<E, S, I>>>,
1257    close: Arc<AtomicBool>,
1258}
1259
1260impl<E, S, I, C> Debug for RawFetch<E, S, I, C>
1261where
1262    E: Eviction,
1263    S: HashBuilder,
1264    I: Indexer<Eviction = E>,
1265{
1266    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1267        f.debug_struct("RawFetch")
1268            .field("state", &self.state)
1269            .field("id", &self.id)
1270            .field("hash", &self.hash)
1271            .finish()
1272    }
1273}
1274
1275impl<E, S, I, C> Future for RawFetch<E, S, I, C>
1276where
1277    E: Eviction,
1278    S: HashBuilder,
1279    I: Indexer<Eviction = E>,
1280    C: Any + Send + 'static,
1281{
1282    type Output = ();
1283
1284    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1285        let this = self.as_mut().project();
1286        loop {
1287            match this.state {
1288                RawFetchState::Init {
1289                    optional_fetch_builder,
1290                    required_fetch_builder,
1291                } => {
1292                    handle_try! { *this.state, try_set_optional(optional_fetch_builder, required_fetch_builder, this.ctx) }
1293                    handle_try! { *this.state, try_set_required(required_fetch_builder, this.ctx, *this.id, *this.hash, this.key.as_ref().unwrap(), this.inflights, Ok(None)) }
1294                }
1295                RawFetchState::FetchOptional {
1296                    optional_fetch,
1297                    required_fetch_builder,
1298                } => {
1299                    if this.close.load(Ordering::Relaxed) {
1300                        return Poll::Ready(());
1301                    }
1302                    match optional_fetch.poll_unpin(cx) {
1303                        Poll::Pending => return Poll::Pending,
1304                        Poll::Ready(Ok(Some(target))) => {
1305                            handle_try! {*this.state, handle_target(target, this.key, this.cache, Source::Disk) }
1306                        }
1307                        Poll::Ready(Ok(None)) => {
1308                            handle_try! { *this.state, try_set_required(required_fetch_builder, this.ctx, *this.id, *this.hash, this.key.as_ref().unwrap(), &this.inflights, Ok(None)) }
1309                        }
1310                        Poll::Ready(Err(e)) => {
1311                            handle_try! { *this.state, try_set_required(required_fetch_builder, this.ctx, *this.id, *this.hash, this.key.as_ref().unwrap(), &this.inflights, Err(e)) }
1312                        }
1313                    }
1314                }
1315                RawFetchState::FetchRequired { required_fetch } => {
1316                    if this.close.load(Ordering::Relaxed) {
1317                        return Poll::Ready(());
1318                    }
1319                    match required_fetch.poll_unpin(cx) {
1320                        Poll::Pending => return Poll::Pending,
1321                        Poll::Ready(Ok(target)) => {
1322                            handle_try! { *this.state, handle_target(target, this.key, this.cache, Source::Outer) }
1323                        }
1324                        Poll::Ready(Err(e)) => {
1325                            handle_try! { *this.state, handle_error(e, *this.id, *this.hash, this.key.as_ref().unwrap(), this.inflights) }
1326                        }
1327                    }
1328                }
1329                RawFetchState::Notify { res, notifiers } => {
1330                    handle_try! { *this.state, handle_notify(res.take().unwrap(), notifiers) }
1331                }
1332                RawFetchState::Ready => return Poll::Ready(()),
1333            }
1334        }
1335    }
1336}
1337
1338impl<E, S, I, C> RawFetch<E, S, I, C>
1339where
1340    E: Eviction,
1341    S: HashBuilder,
1342    I: Indexer<Eviction = E>,
1343    C: Any + Send + 'static,
1344{
1345    #[expect(clippy::type_complexity)]
1346    fn try_set_optional(
1347        optional_fetch_builder: &mut Option<OptionalFetchBuilder<E::Key, E::Value, E::Properties, C>>,
1348        required_fetch_builder: &mut Option<RequiredFetchBuilder<E::Key, E::Value, E::Properties, C>>,
1349        ctx: &mut C,
1350    ) -> Try<E, S, I, C> {
1351        match optional_fetch_builder.take() {
1352            None => Try::Noop,
1353            Some(optional_fetch_builder) => {
1354                let optional_fetch = optional_fetch_builder(ctx);
1355                Try::SetStateAndContinue(RawFetchState::FetchOptional {
1356                    optional_fetch,
1357                    required_fetch_builder: required_fetch_builder.take(),
1358                })
1359            }
1360        }
1361    }
1362
1363    #[expect(clippy::type_complexity)]
1364    fn try_set_required(
1365        required_fetch_builder: &mut Option<RequiredFetchBuilder<E::Key, E::Value, E::Properties, C>>,
1366        ctx: &mut C,
1367        id: usize,
1368        hash: u64,
1369        key: &E::Key,
1370        inflights: &Arc<Mutex<InflightManager<E, S, I>>>,
1371        res_no_fetch: Result<Option<RawCacheEntry<E, S, I>>>,
1372    ) -> Try<E, S, I, C> {
1373        // Fast path if the required fetch builder is provided.
1374        match required_fetch_builder.take() {
1375            None => {}
1376            Some(required_fetch_builder) => {
1377                let required_fetch = required_fetch_builder(ctx);
1378                return Try::SetStateAndContinue(RawFetchState::FetchRequired { required_fetch });
1379            }
1380        }
1381        // Slow path if the leader has no optional fetch.
1382        let fetch_or_take = match inflights.lock().fetch_or_take(hash, key, id) {
1383            Some(fetch_or_take) => fetch_or_take,
1384            None => return Try::Ready,
1385        };
1386        match fetch_or_take {
1387            FetchOrTake::Fetch(required_fetch_builder) => {
1388                let required_fetch = required_fetch_builder(ctx);
1389                Try::SetStateAndContinue(RawFetchState::FetchRequired { required_fetch })
1390            }
1391            FetchOrTake::Notifiers(notifiers) => Try::SetStateAndContinue(RawFetchState::Notify {
1392                res: Some(res_no_fetch),
1393                notifiers,
1394            }),
1395        }
1396    }
1397
1398    fn handle_target(
1399        target: FetchTarget<E::Key, E::Value, E::Properties>,
1400        key: &mut Once<E::Key>,
1401        cache: &RawCache<E, S, I>,
1402        source: Source,
1403    ) -> Try<E, S, I, C> {
1404        match target {
1405            FetchTarget::Entry { value, properties } => {
1406                let key = key.take().unwrap();
1407                cache.insert_with_properties_inner(key, value, properties, source);
1408            }
1409            FetchTarget::Piece(piece) => {
1410                cache.insert_piece(piece);
1411            }
1412        }
1413        Try::Ready
1414    }
1415
1416    fn handle_error(
1417        e: Error,
1418        id: usize,
1419        hash: u64,
1420        key: &E::Key,
1421        inflights: &Arc<Mutex<InflightManager<E, S, I>>>,
1422    ) -> Try<E, S, I, C> {
1423        let notifiers = match inflights.lock().take(hash, key, Some(id)) {
1424            Some(notifiers) => notifiers,
1425            None => {
1426                return Try::Ready;
1427            }
1428        };
1429        Try::SetStateAndContinue(RawFetchState::Notify {
1430            res: Some(Err(e)),
1431            notifiers,
1432        })
1433    }
1434
1435    #[expect(clippy::type_complexity)]
1436    fn handle_notify(
1437        res: Result<Option<RawCacheEntry<E, S, I>>>,
1438        notifiers: &mut Vec<Notifier<Option<RawCacheEntry<E, S, I>>>>,
1439    ) -> Try<E, S, I, C> {
1440        match res {
1441            Ok(e) => {
1442                for notifier in notifiers.drain(..) {
1443                    let _ = notifier.send(Ok(e.clone()));
1444                }
1445            }
1446            Err(e) => {
1447                for notifier in notifiers.drain(..) {
1448                    let _ = notifier.send(Err(e.clone()));
1449                }
1450            }
1451        }
1452        Try::Ready
1453    }
1454}
1455
1456#[pinned_drop]
1457impl<E, S, I, C> PinnedDrop for RawFetch<E, S, I, C>
1458where
1459    E: Eviction,
1460    S: HashBuilder,
1461    I: Indexer<Eviction = E>,
1462{
1463    fn drop(self: Pin<&mut Self>) {
1464        let this = self.project();
1465        match this.state {
1466            RawFetchState::Notify { .. } | RawFetchState::Ready => return,
1467            RawFetchState::Init { .. } | RawFetchState::FetchOptional { .. } | RawFetchState::FetchRequired { .. } => {}
1468        }
1469        if let Some(notifiers) = this
1470            .inflights
1471            .lock()
1472            .take(*this.hash, this.key.as_ref().unwrap(), Some(*this.id))
1473        {
1474            for notifier in notifiers {
1475                let _ =
1476                    notifier
1477                        .send(Err(Error::new(ErrorKind::TaskCancelled, "fetch task cancelled")
1478                            .with_context("hash", *this.hash)));
1479            }
1480        }
1481    }
1482}
1483
1484#[cfg(test)]
1485mod tests {
1486    use foyer_common::hasher::ModHasher;
1487    use rand::{rngs::SmallRng, seq::IndexedRandom, RngCore, SeedableRng};
1488
1489    use super::*;
1490    use crate::{
1491        eviction::{
1492            fifo::{Fifo, FifoConfig},
1493            lfu::{Lfu, LfuConfig},
1494            lru::{Lru, LruConfig},
1495            s3fifo::{S3Fifo, S3FifoConfig},
1496            sieve::{Sieve, SieveConfig},
1497            test_utils::TestProperties,
1498        },
1499        indexer::hash_table::HashTableIndexer,
1500        test_utils::PiecePipe,
1501    };
1502
1503    fn is_send_sync_static<T: Send + Sync + 'static>() {}
1504
1505    #[test]
1506    fn test_send_sync_static() {
1507        is_send_sync_static::<RawCache<Fifo<(), (), TestProperties>, ModHasher, HashTableIndexer<_>>>();
1508        is_send_sync_static::<RawCache<S3Fifo<(), (), TestProperties>, ModHasher, HashTableIndexer<_>>>();
1509        is_send_sync_static::<RawCache<Lfu<(), (), TestProperties>, ModHasher, HashTableIndexer<_>>>();
1510        is_send_sync_static::<RawCache<Lru<(), (), TestProperties>, ModHasher, HashTableIndexer<_>>>();
1511        is_send_sync_static::<RawCache<Sieve<(), (), TestProperties>, ModHasher, HashTableIndexer<_>>>();
1512    }
1513
1514    #[expect(clippy::type_complexity)]
1515    fn fifo_cache_for_test(
1516    ) -> RawCache<Fifo<u64, u64, TestProperties>, ModHasher, HashTableIndexer<Fifo<u64, u64, TestProperties>>> {
1517        RawCache::new(RawCacheConfig {
1518            capacity: 256,
1519            shards: 4,
1520            eviction_config: FifoConfig::default(),
1521            hash_builder: Default::default(),
1522            weighter: Arc::new(|_, _| 1),
1523            filter: Arc::new(|_, _| true),
1524            event_listener: None,
1525            metrics: Arc::new(Metrics::noop()),
1526        })
1527    }
1528
1529    #[expect(clippy::type_complexity)]
1530    fn s3fifo_cache_for_test(
1531    ) -> RawCache<S3Fifo<u64, u64, TestProperties>, ModHasher, HashTableIndexer<S3Fifo<u64, u64, TestProperties>>> {
1532        RawCache::new(RawCacheConfig {
1533            capacity: 256,
1534            shards: 4,
1535            eviction_config: S3FifoConfig::default(),
1536            hash_builder: Default::default(),
1537            weighter: Arc::new(|_, _| 1),
1538            filter: Arc::new(|_, _| true),
1539            event_listener: None,
1540            metrics: Arc::new(Metrics::noop()),
1541        })
1542    }
1543
1544    #[expect(clippy::type_complexity)]
1545    fn lru_cache_for_test(
1546    ) -> RawCache<Lru<u64, u64, TestProperties>, ModHasher, HashTableIndexer<Lru<u64, u64, TestProperties>>> {
1547        RawCache::new(RawCacheConfig {
1548            capacity: 256,
1549            shards: 4,
1550            eviction_config: LruConfig::default(),
1551            hash_builder: Default::default(),
1552            weighter: Arc::new(|_, _| 1),
1553            filter: Arc::new(|_, _| true),
1554            event_listener: None,
1555            metrics: Arc::new(Metrics::noop()),
1556        })
1557    }
1558
1559    #[expect(clippy::type_complexity)]
1560    fn lfu_cache_for_test(
1561    ) -> RawCache<Lfu<u64, u64, TestProperties>, ModHasher, HashTableIndexer<Lfu<u64, u64, TestProperties>>> {
1562        RawCache::new(RawCacheConfig {
1563            capacity: 256,
1564            shards: 4,
1565            eviction_config: LfuConfig::default(),
1566            hash_builder: Default::default(),
1567            weighter: Arc::new(|_, _| 1),
1568            filter: Arc::new(|_, _| true),
1569            event_listener: None,
1570            metrics: Arc::new(Metrics::noop()),
1571        })
1572    }
1573
1574    #[expect(clippy::type_complexity)]
1575    fn sieve_cache_for_test(
1576    ) -> RawCache<Sieve<u64, u64, TestProperties>, ModHasher, HashTableIndexer<Sieve<u64, u64, TestProperties>>> {
1577        RawCache::new(RawCacheConfig {
1578            capacity: 256,
1579            shards: 4,
1580            eviction_config: SieveConfig {},
1581            hash_builder: Default::default(),
1582            weighter: Arc::new(|_, _| 1),
1583            filter: Arc::new(|_, _| true),
1584            event_listener: None,
1585            metrics: Arc::new(Metrics::noop()),
1586        })
1587    }
1588
1589    #[test_log::test]
1590    fn test_insert_phantom() {
1591        let fifo = fifo_cache_for_test();
1592
1593        let e1 = fifo.insert_with_properties(1, 1, TestProperties::default().with_phantom(true));
1594        assert_eq!(fifo.usage(), 0);
1595        drop(e1);
1596        assert_eq!(fifo.usage(), 0);
1597
1598        let e2a = fifo.insert_with_properties(2, 2, TestProperties::default().with_phantom(true));
1599        assert_eq!(fifo.usage(), 0);
1600        assert!(fifo.get(&2).is_none());
1601        assert_eq!(fifo.usage(), 0);
1602        drop(e2a);
1603        assert_eq!(fifo.usage(), 0);
1604
1605        let fifo = fifo_cache_for_test();
1606        fifo.insert(1, 1);
1607        assert_eq!(fifo.usage(), 1);
1608        assert_eq!(fifo.get(&1).unwrap().value(), &1);
1609        let e2 = fifo.insert_with_properties(1, 100, TestProperties::default().with_phantom(true));
1610        assert_eq!(fifo.usage(), 0);
1611        drop(e2);
1612        assert_eq!(fifo.usage(), 0);
1613        assert!(fifo.get(&1).is_none());
1614    }
1615
1616    #[expect(clippy::type_complexity)]
1617    #[test_log::test]
1618    fn test_insert_filter() {
1619        let fifo: RawCache<
1620            Fifo<u64, u64, TestProperties>,
1621            ModHasher,
1622            HashTableIndexer<Fifo<u64, u64, TestProperties>>,
1623        > = RawCache::new(RawCacheConfig {
1624            capacity: 256,
1625            shards: 4,
1626            eviction_config: FifoConfig::default(),
1627            hash_builder: Default::default(),
1628            weighter: Arc::new(|_, _| 1),
1629            filter: Arc::new(|k, _| !matches!(*k, 42)),
1630            event_listener: None,
1631            metrics: Arc::new(Metrics::noop()),
1632        });
1633
1634        fifo.insert(1, 1);
1635        fifo.insert(2, 2);
1636        fifo.insert(42, 42);
1637        assert_eq!(fifo.usage(), 2);
1638        assert_eq!(fifo.get(&1).unwrap().value(), &1);
1639        assert_eq!(fifo.get(&2).unwrap().value(), &2);
1640        assert!(fifo.get(&42).is_none());
1641    }
1642
1643    #[test]
1644    fn test_evict_all() {
1645        let pipe = Arc::new(PiecePipe::default());
1646
1647        let fifo = fifo_cache_for_test().with_pipe(pipe.clone());
1648        for i in 0..fifo.capacity() as _ {
1649            fifo.insert(i, i);
1650        }
1651        assert_eq!(fifo.usage(), fifo.capacity());
1652
1653        fifo.evict_all();
1654        let mut pieces = pipe
1655            .pieces()
1656            .iter()
1657            .map(|p| (p.hash(), *p.key(), *p.value()))
1658            .collect_vec();
1659        pieces.sort_by_key(|t| t.0);
1660        let expected = (0..fifo.capacity() as u64).map(|i| (i, i, i)).collect_vec();
1661        assert_eq!(pieces, expected);
1662    }
1663
1664    #[test]
1665    fn test_insert_size_over_capacity() {
1666        let cache: RawCache<Fifo<Vec<u8>, Vec<u8>, TestProperties>, ModHasher, HashTableIndexer<_>> =
1667            RawCache::new(RawCacheConfig {
1668                capacity: 4 * 1024, // 4KB
1669                shards: 1,
1670                eviction_config: FifoConfig::default(),
1671                hash_builder: Default::default(),
1672                weighter: Arc::new(|k, v| k.len() + v.len()),
1673                filter: Arc::new(|_, _| true),
1674                event_listener: None,
1675                metrics: Arc::new(Metrics::noop()),
1676            });
1677
1678        let key = vec![b'k'; 1024]; // 1KB
1679        let value = vec![b'v'; 5 * 1024]; // 5KB
1680
1681        cache.insert(key.clone(), value.clone());
1682        assert_eq!(cache.usage(), 6 * 1024);
1683        assert_eq!(cache.get(&key).unwrap().value(), &value);
1684    }
1685
1686    #[test]
1687    fn test_capacity_distribution_without_loss() {
1688        let cache: RawCache<Fifo<u64, u64, TestProperties>, ModHasher, HashTableIndexer<_>> =
1689            RawCache::new(RawCacheConfig {
1690                capacity: 3,
1691                shards: 2,
1692                eviction_config: FifoConfig::default(),
1693                hash_builder: Default::default(),
1694                weighter: Arc::new(|_, _| 1),
1695                filter: Arc::new(|_, _| true),
1696                event_listener: None,
1697                metrics: Arc::new(Metrics::noop()),
1698            });
1699
1700        for key in 0..3 {
1701            let entry = cache.insert(key, key);
1702            drop(entry);
1703        }
1704
1705        assert_eq!(cache.usage(), 3);
1706
1707        for key in 0..3 {
1708            let entry = cache.get(&key).expect("entry should exist");
1709            assert_eq!(*entry, key);
1710            drop(entry);
1711        }
1712    }
1713
1714    #[test]
1715    fn test_capacity_distribution_with_more_shards_than_capacity() {
1716        let cache: RawCache<Fifo<u64, u64, TestProperties>, ModHasher, HashTableIndexer<_>> =
1717            RawCache::new(RawCacheConfig {
1718                capacity: 2,
1719                shards: 4,
1720                eviction_config: FifoConfig::default(),
1721                hash_builder: Default::default(),
1722                weighter: Arc::new(|_, _| 1),
1723                filter: Arc::new(|_, _| true),
1724                event_listener: None,
1725                metrics: Arc::new(Metrics::noop()),
1726            });
1727
1728        for key in 0..2 {
1729            let entry = cache.insert(key, key);
1730            drop(entry);
1731        }
1732
1733        assert_eq!(cache.usage(), 2);
1734
1735        for key in 0..2 {
1736            let entry = cache.get(&key).expect("entry should exist");
1737            assert_eq!(*entry, key);
1738            drop(entry);
1739        }
1740
1741        assert!(cache.get(&2).is_none());
1742    }
1743
1744    fn test_resize<E>(cache: &RawCache<E, ModHasher, HashTableIndexer<E>>)
1745    where
1746        E: Eviction<Key = u64, Value = u64>,
1747    {
1748        let capacity = cache.capacity();
1749        for i in 0..capacity as u64 * 2 {
1750            cache.insert(i, i);
1751        }
1752        assert_eq!(cache.usage(), capacity);
1753        cache.resize(capacity / 2).unwrap();
1754        assert_eq!(cache.usage(), capacity / 2);
1755        for i in 0..capacity as u64 * 2 {
1756            cache.insert(i, i);
1757        }
1758        assert_eq!(cache.usage(), capacity / 2);
1759    }
1760
1761    #[test]
1762    fn test_fifo_cache_resize() {
1763        let cache = fifo_cache_for_test();
1764        test_resize(&cache);
1765    }
1766
1767    #[test]
1768    fn test_s3fifo_cache_resize() {
1769        let cache = s3fifo_cache_for_test();
1770        test_resize(&cache);
1771    }
1772
1773    #[test]
1774    fn test_lru_cache_resize() {
1775        let cache = lru_cache_for_test();
1776        test_resize(&cache);
1777    }
1778
1779    #[test]
1780    fn test_lfu_cache_resize() {
1781        let cache = lfu_cache_for_test();
1782        test_resize(&cache);
1783    }
1784
1785    #[test]
1786    fn test_sieve_cache_resize() {
1787        let cache = sieve_cache_for_test();
1788        test_resize(&cache);
1789    }
1790
1791    mod fuzzy {
1792        use foyer_common::properties::Hint;
1793
1794        use super::*;
1795
1796        fn fuzzy<E, S>(cache: RawCache<E, S, HashTableIndexer<E>>, hints: Vec<Hint>)
1797        where
1798            E: Eviction<Key = u64, Value = u64, Properties = TestProperties>,
1799            S: HashBuilder,
1800        {
1801            let handles = (0..8)
1802                .map(|i| {
1803                    let c = cache.clone();
1804                    let hints = hints.clone();
1805                    std::thread::spawn(move || {
1806                        let mut rng = SmallRng::seed_from_u64(i);
1807                        for _ in 0..100000 {
1808                            let key = rng.next_u64();
1809                            if let Some(entry) = c.get(&key) {
1810                                assert_eq!(key, *entry);
1811                                drop(entry);
1812                                continue;
1813                            }
1814                            let hint = hints.choose(&mut rng).cloned().unwrap();
1815                            c.insert_with_properties(key, key, TestProperties::default().with_hint(hint));
1816                        }
1817                    })
1818                })
1819                .collect_vec();
1820
1821            handles.into_iter().for_each(|handle| handle.join().unwrap());
1822
1823            assert_eq!(cache.usage(), cache.capacity());
1824        }
1825
1826        #[test_log::test]
1827        fn test_fifo_cache_fuzzy() {
1828            let cache: RawCache<Fifo<u64, u64, TestProperties>, ModHasher, HashTableIndexer<_>> =
1829                RawCache::new(RawCacheConfig {
1830                    capacity: 256,
1831                    shards: 4,
1832                    eviction_config: FifoConfig::default(),
1833                    hash_builder: Default::default(),
1834                    weighter: Arc::new(|_, _| 1),
1835                    filter: Arc::new(|_, _| true),
1836                    event_listener: None,
1837                    metrics: Arc::new(Metrics::noop()),
1838                });
1839            let hints = vec![Hint::Normal];
1840            fuzzy(cache, hints);
1841        }
1842
1843        #[test_log::test]
1844        fn test_s3fifo_cache_fuzzy() {
1845            let cache: RawCache<S3Fifo<u64, u64, TestProperties>, ModHasher, HashTableIndexer<_>> =
1846                RawCache::new(RawCacheConfig {
1847                    capacity: 256,
1848                    shards: 4,
1849                    eviction_config: S3FifoConfig::default(),
1850                    hash_builder: Default::default(),
1851                    weighter: Arc::new(|_, _| 1),
1852                    filter: Arc::new(|_, _| true),
1853                    event_listener: None,
1854                    metrics: Arc::new(Metrics::noop()),
1855                });
1856            let hints = vec![Hint::Normal];
1857            fuzzy(cache, hints);
1858        }
1859
1860        #[test_log::test]
1861        fn test_lru_cache_fuzzy() {
1862            let cache: RawCache<Lru<u64, u64, TestProperties>, ModHasher, HashTableIndexer<_>> =
1863                RawCache::new(RawCacheConfig {
1864                    capacity: 256,
1865                    shards: 4,
1866                    eviction_config: LruConfig::default(),
1867                    hash_builder: Default::default(),
1868                    weighter: Arc::new(|_, _| 1),
1869                    filter: Arc::new(|_, _| true),
1870                    event_listener: None,
1871                    metrics: Arc::new(Metrics::noop()),
1872                });
1873            let hints = vec![Hint::Normal, Hint::Low];
1874            fuzzy(cache, hints);
1875        }
1876
1877        #[test_log::test]
1878        fn test_lfu_cache_fuzzy() {
1879            let cache: RawCache<Lfu<u64, u64, TestProperties>, ModHasher, HashTableIndexer<_>> =
1880                RawCache::new(RawCacheConfig {
1881                    capacity: 256,
1882                    shards: 4,
1883                    eviction_config: LfuConfig::default(),
1884                    hash_builder: Default::default(),
1885                    weighter: Arc::new(|_, _| 1),
1886                    filter: Arc::new(|_, _| true),
1887                    event_listener: None,
1888                    metrics: Arc::new(Metrics::noop()),
1889                });
1890            let hints = vec![Hint::Normal];
1891            fuzzy(cache, hints);
1892        }
1893
1894        #[test_log::test]
1895        fn test_sieve_cache_fuzzy() {
1896            let cache: RawCache<Sieve<u64, u64, TestProperties>, ModHasher, HashTableIndexer<_>> =
1897                RawCache::new(RawCacheConfig {
1898                    capacity: 256,
1899                    shards: 4,
1900                    eviction_config: SieveConfig {},
1901                    hash_builder: Default::default(),
1902                    weighter: Arc::new(|_, _| 1),
1903                    filter: Arc::new(|_, _| true),
1904                    event_listener: None,
1905                    metrics: Arc::new(Metrics::noop()),
1906                });
1907            let hints = vec![Hint::Normal];
1908            fuzzy(cache, hints);
1909        }
1910    }
1911}