1use std::{
16 any::Any,
17 fmt::Debug,
18 future::Future,
19 hash::Hash,
20 ops::Deref,
21 pin::Pin,
22 sync::{
23 atomic::{AtomicBool, Ordering},
24 Arc,
25 },
26 task::{Context, Poll},
27};
28
29use arc_swap::ArcSwap;
30use equivalent::Equivalent;
31use foyer_common::{
32 code::HashBuilder,
33 error::{Error, ErrorKind, Result},
34 event::{Event, EventListener},
35 metrics::Metrics,
36 properties::{Location, Properties, Source},
37 runtime::SingletonHandle,
38 strict_assert,
39 utils::scope::Scope,
40};
41use futures_util::FutureExt as _;
42use itertools::Itertools;
43use parking_lot::{Mutex, RwLock};
44use pin_project::{pin_project, pinned_drop};
45
46use crate::{
47 eviction::{Eviction, Op},
48 indexer::{sentry::Sentry, Indexer},
49 inflight::{
50 Enqueue, FetchOrTake, FetchTarget, InflightManager, Notifier, OptionalFetch, OptionalFetchBuilder,
51 RequiredFetch, RequiredFetchBuilder, Waiter,
52 },
53 pipe::NoopPipe,
54 record::{Data, Record},
55 Piece, Pipe,
56};
57
/// Computes the weight of a cache entry from its key and value.
///
/// The weight is the amount charged against the cache capacity for the entry.
/// This is a marker trait: it is implemented automatically for every function or closure of the
/// shape `Fn(&K, &V) -> usize` that is also `Send + Sync + 'static`.
pub trait Weighter<K, V>: Fn(&K, &V) -> usize + Send + Sync + 'static {}

impl<K, V, F: Fn(&K, &V) -> usize + Send + Sync + 'static> Weighter<K, V> for F {}
63
/// Decides, from the key and value, whether an entry is admitted into the in-memory cache.
///
/// This is a marker trait: it is implemented automatically for every function or closure of the
/// shape `Fn(&K, &V) -> bool` that is also `Send + Sync + 'static`. When the filter returns
/// `false` for an inserted entry, the entry is marked as phantom.
pub trait Filter<K, V>: Fn(&K, &V) -> bool + Send + Sync + 'static {}

impl<K, V, F: Fn(&K, &V) -> bool + Send + Sync + 'static> Filter<K, V> for F {}
76
/// Configuration consumed by [`RawCache::new`].
pub struct RawCacheConfig<E, S>
where
    E: Eviction,
    S: HashBuilder,
{
    /// Total capacity of the cache; it is divided among the shards at construction.
    pub capacity: usize,
    /// Number of shards. Must be greater than zero, otherwise [`RawCache::new`] panics.
    pub shards: usize,
    /// Configuration handed to the eviction container of every shard.
    pub eviction_config: E::Config,
    /// Hash builder used to hash keys.
    pub hash_builder: S,
    /// Computes the weight of an entry from its key and value.
    pub weighter: Arc<dyn Weighter<E::Key, E::Value>>,
    /// Entries for which the filter returns `false` are inserted as phantom entries.
    pub filter: Arc<dyn Filter<E::Key, E::Value>>,
    /// Optional listener that is notified when entries leave the cache.
    pub event_listener: Option<Arc<dyn EventListener<Key = E::Key, Value = E::Value>>>,
    /// Metrics handle shared by the cache and all of its shards.
    pub metrics: Arc<Metrics>,
}
91
/// One shard of the cache: an eviction container plus an indexer, guarded together by the
/// shard's lock in [`RawCacheInner`].
struct RawCacheShard<E, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    /// Eviction container deciding which record leaves first.
    eviction: E,
    /// Key -> record index.
    indexer: Sentry<I>,

    /// Sum of the weights of the records currently held in the indexer.
    usage: usize,
    /// Weight budget of this shard.
    capacity: usize,

    /// In-flight fetches for keys that hash to this shard.
    inflights: Arc<Mutex<InflightManager<E, S, I>>>,

    /// Shared metrics handle.
    metrics: Arc<Metrics>,
    /// Kept alongside the shard; not read by the shard methods visible in this file.
    _event_listener: Option<Arc<dyn EventListener<Key = E::Key, Value = E::Value>>>,
}
109
110impl<E, S, I> RawCacheShard<E, S, I>
111where
112 E: Eviction,
113 S: HashBuilder,
114 I: Indexer<Eviction = E>,
115{
116 fn evict(&mut self, target: usize, garbages: &mut Vec<(Event, Arc<Record<E>>)>) {
118 while self.usage > target {
120 let evicted = match self.eviction.pop() {
121 Some(evicted) => evicted,
122 None => break,
123 };
124 self.metrics.memory_evict.increase(1);
125
126 let e = self.indexer.remove(evicted.hash(), evicted.key()).unwrap();
127 assert_eq!(Arc::as_ptr(&evicted), Arc::as_ptr(&e));
128
129 strict_assert!(!evicted.as_ref().is_in_indexer());
130 strict_assert!(!evicted.as_ref().is_in_eviction());
131
132 self.usage -= evicted.weight();
133
134 garbages.push((Event::Evict, evicted));
135 }
136 }
137
138 #[expect(clippy::type_complexity)]
139 fn emplace(
140 &mut self,
141 record: Arc<Record<E>>,
142 garbages: &mut Vec<(Event, Arc<Record<E>>)>,
143 notifiers: &mut Vec<Notifier<Option<RawCacheEntry<E, S, I>>>>,
144 ) {
145 *notifiers = self
146 .inflights
147 .lock()
148 .take(record.hash(), record.key(), None)
149 .unwrap_or_default();
150
151 if record.properties().phantom().unwrap_or_default() {
152 if let Some(old) = self.indexer.remove(record.hash(), record.key()) {
153 strict_assert!(!old.is_in_indexer());
154
155 if old.is_in_eviction() {
156 self.eviction.remove(&old);
157 }
158 strict_assert!(!old.is_in_eviction());
159
160 self.usage -= old.weight();
161
162 garbages.push((Event::Replace, old));
163 }
164 record.inc_refs(notifiers.len() + 1);
165 garbages.push((Event::Remove, record));
166 self.metrics.memory_insert.increase(1);
167 return;
168 }
169
170 let weight = record.weight();
171 let old_usage = self.usage;
172
173 self.evict(self.capacity.saturating_sub(weight), garbages);
175
176 if let Some(old) = self.indexer.insert(record.clone()) {
178 self.metrics.memory_replace.increase(1);
179
180 strict_assert!(!old.is_in_indexer());
181
182 if old.is_in_eviction() {
183 self.eviction.remove(&old);
184 }
185 strict_assert!(!old.is_in_eviction());
186
187 self.usage -= old.weight();
188
189 garbages.push((Event::Replace, old));
190 } else {
191 self.metrics.memory_insert.increase(1);
192 }
193 strict_assert!(record.is_in_indexer());
194
195 strict_assert!(!record.is_in_eviction());
196 self.eviction.push(record.clone());
197 strict_assert!(record.is_in_eviction());
198
199 self.usage += weight;
200 record.inc_refs(notifiers.len() + 1);
203
204 match self.usage.cmp(&old_usage) {
205 std::cmp::Ordering::Greater => self.metrics.memory_usage.increase((self.usage - old_usage) as _),
206 std::cmp::Ordering::Less => self.metrics.memory_usage.decrease((old_usage - self.usage) as _),
207 std::cmp::Ordering::Equal => {}
208 }
209 }
210
211 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::shard::remove"))]
212 fn remove<Q>(&mut self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
213 where
214 Q: Hash + Equivalent<E::Key> + ?Sized,
215 {
216 let record = self.indexer.remove(hash, key)?;
217
218 if record.is_in_eviction() {
219 self.eviction.remove(&record);
220 }
221 strict_assert!(!record.is_in_indexer());
222 strict_assert!(!record.is_in_eviction());
223
224 self.usage -= record.weight();
225
226 self.metrics.memory_remove.increase(1);
227 self.metrics.memory_usage.decrease(record.weight() as _);
228
229 record.inc_refs(1);
230
231 Some(record)
232 }
233
/// Looks up `key` without running any eviction `acquire` hook.
///
/// Thin wrapper around `get_inner`; the hit/miss metrics and the reference-count increment
/// happen there.
#[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::shard::get_noop"))]
fn get_noop<Q>(&self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
where
    Q: Hash + Equivalent<E::Key> + ?Sized,
{
    self.get_inner(hash, key)
}
241
242 #[cfg_attr(
243 feature = "tracing",
244 fastrace::trace(name = "foyer::memory::raw::shard::get_immutable")
245 )]
246 fn get_immutable<Q>(&self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
247 where
248 Q: Hash + Equivalent<E::Key> + ?Sized,
249 {
250 self.get_inner(hash, key)
251 .inspect(|record| self.acquire_immutable(record))
252 }
253
254 #[cfg_attr(
255 feature = "tracing",
256 fastrace::trace(name = "foyer::memory::raw::shard::get_mutable")
257 )]
258 fn get_mutable<Q>(&mut self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
259 where
260 Q: Hash + Equivalent<E::Key> + ?Sized,
261 {
262 self.get_inner(hash, key).inspect(|record| self.acquire_mutable(record))
263 }
264
265 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::shard::get_inner"))]
266 fn get_inner<Q>(&self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
267 where
268 Q: Hash + Equivalent<E::Key> + ?Sized,
269 {
270 let record = match self.indexer.get(hash, key).cloned() {
271 Some(record) => {
272 self.metrics.memory_hit.increase(1);
273 record
274 }
275 None => {
276 self.metrics.memory_miss.increase(1);
277 return None;
278 }
279 };
280
281 strict_assert!(record.is_in_indexer());
282
283 record.inc_refs(1);
284
285 Some(record)
286 }
287
288 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::shard::clear"))]
289 fn clear(&mut self, garbages: &mut Vec<Arc<Record<E>>>) {
290 let records = self.indexer.drain().collect_vec();
291 self.eviction.clear();
292
293 let mut count = 0;
294
295 for record in records {
296 count += 1;
297 strict_assert!(!record.is_in_indexer());
298 strict_assert!(!record.is_in_eviction());
299
300 garbages.push(record);
301 }
302
303 self.metrics.memory_remove.increase(count);
304 }
305
306 #[cfg_attr(
307 feature = "tracing",
308 fastrace::trace(name = "foyer::memory::raw::shard::acquire_immutable")
309 )]
310 fn acquire_immutable(&self, record: &Arc<Record<E>>) {
311 match E::acquire() {
312 Op::Immutable(f) => f(&self.eviction, record),
313 _ => unreachable!(),
314 }
315 }
316
317 #[cfg_attr(
318 feature = "tracing",
319 fastrace::trace(name = "foyer::memory::raw::shard::acquire_mutable")
320 )]
321 fn acquire_mutable(&mut self, record: &Arc<Record<E>>) {
322 match E::acquire() {
323 Op::Mutable(mut f) => f(&mut self.eviction, record),
324 _ => unreachable!(),
325 }
326 }
327
328 #[cfg_attr(
329 feature = "tracing",
330 fastrace::trace(name = "foyer::memory::raw::shard::release_immutable")
331 )]
332 fn release_immutable(&self, record: &Arc<Record<E>>) {
333 match E::release() {
334 Op::Immutable(f) => f(&self.eviction, record),
335 _ => unreachable!(),
336 }
337 }
338
339 #[cfg_attr(
340 feature = "tracing",
341 fastrace::trace(name = "foyer::memory::raw::shard::release_mutable")
342 )]
343 fn release_mutable(&mut self, record: &Arc<Record<E>>) {
344 match E::release() {
345 Op::Mutable(mut f) => f(&mut self.eviction, record),
346 _ => unreachable!(),
347 }
348 }
349}
350
/// Shared state behind every [`RawCache`] handle and every [`RawCacheEntry`].
#[expect(clippy::type_complexity)]
struct RawCacheInner<E, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    /// The shards, each behind its own read-write lock. A key maps to `hash % shards.len()`.
    shards: Vec<RwLock<RawCacheShard<E, S, I>>>,

    /// Total capacity as configured at construction.
    capacity: usize,

    /// Hash builder used to hash keys.
    hash_builder: Arc<S>,
    /// Computes entry weights.
    weighter: Arc<dyn Weighter<E::Key, E::Value>>,
    /// Entries rejected by the filter are inserted as phantom entries.
    filter: Arc<dyn Filter<E::Key, E::Value>>,

    /// Shared metrics handle.
    metrics: Arc<Metrics>,
    /// Optional listener notified when entries leave the cache.
    event_listener: Option<Arc<dyn EventListener<Key = E::Key, Value = E::Value>>>,
    /// Pipe that receives evicted entries while it reports itself as enabled; swappable at
    /// runtime.
    pipe: ArcSwap<Box<dyn Pipe<Key = E::Key, Value = E::Value, Properties = E::Properties>>>,
}
370
371impl<E, S, I> RawCacheInner<E, S, I>
372where
373 E: Eviction,
374 S: HashBuilder,
375 I: Indexer<Eviction = E>,
376{
377 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::inner::clear"))]
378 fn clear(&self) {
379 let mut garbages = vec![];
380
381 self.shards
382 .iter()
383 .map(|shard| shard.write())
384 .for_each(|mut shard| shard.clear(&mut garbages));
385
386 if let Some(listener) = self.event_listener.as_ref() {
388 for record in garbages {
389 listener.on_leave(Event::Clear, record.key(), record.value());
390 }
391 }
392 }
393}
394
/// Sharded in-memory cache, generic over the eviction container `E`, the hash builder `S` and
/// the indexer `I`.
///
/// The handle is cheap to clone: all clones share the same inner state.
pub struct RawCache<E, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    /// Shared cache state.
    inner: Arc<RawCacheInner<E, S, I>>,
}
403
impl<E, S, I> Drop for RawCacheInner<E, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    /// Clears all shards when the shared state is dropped, so the event listener (if any) still
    /// observes every remaining record leaving with `Event::Clear`.
    fn drop(&mut self) {
        self.clear();
    }
}
414
415impl<E, S, I> Clone for RawCache<E, S, I>
416where
417 E: Eviction,
418 S: HashBuilder,
419 I: Indexer<Eviction = E>,
420{
421 fn clone(&self) -> Self {
422 Self {
423 inner: self.inner.clone(),
424 }
425 }
426}
427
428impl<E, S, I> RawCache<E, S, I>
429where
430 E: Eviction,
431 S: HashBuilder,
432 I: Indexer<Eviction = E>,
433{
434 pub fn new(config: RawCacheConfig<E, S>) -> Self {
435 assert!(config.shards > 0, "shards must be greater than zero.");
436
437 let shard_capacities = (0..config.shards)
438 .map(|index| Self::shard_capacity_for(config.capacity, config.shards, index))
439 .collect_vec();
440
441 let shards = shard_capacities
442 .into_iter()
443 .map(|shard_capacity| RawCacheShard {
444 eviction: E::new(shard_capacity, &config.eviction_config),
445 indexer: Sentry::default(),
446 usage: 0,
447 capacity: shard_capacity,
448 inflights: Arc::new(Mutex::new(InflightManager::new())),
449 metrics: config.metrics.clone(),
450 _event_listener: config.event_listener.clone(),
451 })
452 .map(RwLock::new)
453 .collect_vec();
454
455 let pipe: Box<dyn Pipe<Key = E::Key, Value = E::Value, Properties = E::Properties>> =
456 Box::new(NoopPipe::default());
457
458 let inner = RawCacheInner {
459 shards,
460 capacity: config.capacity,
461 hash_builder: Arc::new(config.hash_builder),
462 weighter: config.weighter,
463 filter: config.filter,
464 metrics: config.metrics,
465 event_listener: config.event_listener,
466 pipe: ArcSwap::new(Arc::new(pipe)),
467 };
468
469 Self { inner: Arc::new(inner) }
470 }
471
472 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::resize"))]
473 pub fn resize(&self, capacity: usize) -> Result<()> {
474 let shards = self.inner.shards.len();
475 assert!(shards > 0, "shards must be greater than zero.");
476
477 let shard_capacities = (0..shards)
478 .map(|index| Self::shard_capacity_for(capacity, shards, index))
479 .collect_vec();
480
481 let handles = shard_capacities
482 .into_iter()
483 .enumerate()
484 .map(|(i, shard_capacity)| {
485 let inner = self.inner.clone();
486 std::thread::spawn(move || {
487 let mut garbages = vec![];
488 let res = inner.shards[i].write().with(|mut shard| {
489 shard.eviction.update(shard_capacity, None).inspect(|_| {
490 shard.capacity = shard_capacity;
491 shard.evict(shard_capacity, &mut garbages)
492 })
493 });
494 let pipe = inner.pipe.load();
496 let piped = pipe.is_enabled();
497 if inner.event_listener.is_some() || piped {
498 for (event, record) in garbages {
499 if let Some(listener) = inner.event_listener.as_ref() {
500 listener.on_leave(event, record.key(), record.value())
501 }
502 if piped && event == Event::Evict {
503 pipe.send(Piece::new(record));
504 }
505 }
506 }
507 res
508 })
509 })
510 .collect_vec();
511
512 let errs = handles
513 .into_iter()
514 .map(|handle| handle.join().unwrap())
515 .filter(|res| res.is_err())
516 .map(|res| res.unwrap_err())
517 .collect_vec();
518 if !errs.is_empty() {
519 let mut e = Error::new(ErrorKind::Config, "resize raw cache failed");
520 for err in errs {
521 e = e.with_context("reason", format!("{err}"));
522 }
523 return Err(e);
524 }
525
526 Ok(())
527 }
528
/// Inserts `key`/`value` with default properties and returns an entry handle for the new record.
#[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::insert"))]
pub fn insert(&self, key: E::Key, value: E::Value) -> RawCacheEntry<E, S, I> {
    self.insert_with_properties(key, value, Default::default())
}
533
/// Inserts `key`/`value` with the given `properties` and returns an entry handle for the new
/// record. The returned entry reports `Source::Outer` as its source.
#[cfg_attr(
    feature = "tracing",
    fastrace::trace(name = "foyer::memory::raw::insert_with_properties")
)]
pub fn insert_with_properties(
    &self,
    key: E::Key,
    value: E::Value,
    properties: E::Properties,
) -> RawCacheEntry<E, S, I> {
    self.insert_with_properties_inner(key, value, properties, Source::Outer)
}
546
547 fn insert_with_properties_inner(
548 &self,
549 key: E::Key,
550 value: E::Value,
551 mut properties: E::Properties,
552 source: Source,
553 ) -> RawCacheEntry<E, S, I> {
554 let hash = self.inner.hash_builder.hash_one(&key);
555 let weight = (self.inner.weighter)(&key, &value);
556 if !(self.inner.filter)(&key, &value) {
557 properties = properties.with_phantom(true);
558 }
559 if let Some(location) = properties.location() {
560 if location == Location::OnDisk {
561 properties = properties.with_phantom(true);
562 }
563 }
564 let record = Arc::new(Record::new(Data {
565 key,
566 value,
567 properties,
568 hash,
569 weight,
570 }));
571 self.insert_inner(record, source)
572 }
573
/// Inserts the record carried by `piece`. The returned entry reports `Source::Memory`.
#[doc(hidden)]
#[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::insert_piece"))]
pub fn insert_piece(&self, piece: Piece<E::Key, E::Value, E::Properties>) -> RawCacheEntry<E, S, I> {
    self.insert_inner(piece.into_record(), Source::Memory)
}
579
580 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::insert_inner"))]
581 fn insert_inner(&self, record: Arc<Record<E>>, source: Source) -> RawCacheEntry<E, S, I> {
582 let mut garbages = vec![];
583 let mut notifiers = vec![];
584
585 self.inner.shards[self.shard(record.hash())]
586 .write()
587 .with(|mut shard| shard.emplace(record.clone(), &mut garbages, &mut notifiers));
588
589 for notifier in notifiers {
591 let _ = notifier.send(Ok(Some(RawCacheEntry {
592 record: record.clone(),
593 inner: self.inner.clone(),
594 source,
595 })));
596 }
597
598 let pipe = self.inner.pipe.load();
600 let piped = pipe.is_enabled();
601 if self.inner.event_listener.is_some() || piped {
602 for (event, record) in garbages {
603 if let Some(listener) = self.inner.event_listener.as_ref() {
604 listener.on_leave(event, record.key(), record.value())
605 }
606 if piped && event == Event::Evict {
607 pipe.send(Piece::new(record));
608 }
609 }
610 }
611
612 RawCacheEntry {
613 record,
614 inner: self.inner.clone(),
615 source,
616 }
617 }
618
619 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::evict_all"))]
621 pub fn evict_all(&self) {
622 let mut garbages = vec![];
623 for shard in self.inner.shards.iter() {
624 shard.write().evict(0, &mut garbages);
625 }
626
627 let pipe = self.inner.pipe.load();
629 let piped = pipe.is_enabled();
630 if self.inner.event_listener.is_some() || piped {
631 for (event, record) in garbages {
632 if let Some(listener) = self.inner.event_listener.as_ref() {
633 listener.on_leave(event, record.key(), record.value())
634 }
635 if piped && event == Event::Evict {
636 pipe.send(Piece::new(record));
637 }
638 }
639 }
640 }
641
642 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::flush"))]
647 pub async fn flush(&self) {
648 let mut garbages = vec![];
649 for shard in self.inner.shards.iter() {
650 shard.write().evict(0, &mut garbages);
651 }
652
653 let pipe = self.inner.pipe.load();
655 let piped = pipe.is_enabled();
656
657 if let Some(listener) = self.inner.event_listener.as_ref() {
658 for (event, record) in garbages.iter() {
659 listener.on_leave(*event, record.key(), record.value());
660 }
661 }
662 if piped {
663 let pieces = garbages.into_iter().map(|(_, record)| Piece::new(record)).collect_vec();
664 pipe.flush(pieces).await;
665 }
666 }
667
668 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::remove"))]
669 pub fn remove<Q>(&self, key: &Q) -> Option<RawCacheEntry<E, S, I>>
670 where
671 Q: Hash + Equivalent<E::Key> + ?Sized,
672 {
673 let hash = self.inner.hash_builder.hash_one(key);
674
675 self.inner.shards[self.shard(hash)]
676 .write()
677 .with(|mut shard| {
678 shard.remove(hash, key).map(|record| RawCacheEntry {
679 inner: self.inner.clone(),
680 record,
681 source: Source::Memory,
682 })
683 })
684 .inspect(|record| {
685 if let Some(listener) = self.inner.event_listener.as_ref() {
687 listener.on_leave(Event::Remove, record.key(), record.value());
688 }
689 })
690 }
691
692 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::get"))]
693 pub fn get<Q>(&self, key: &Q) -> Option<RawCacheEntry<E, S, I>>
694 where
695 Q: Hash + Equivalent<E::Key> + ?Sized,
696 {
697 let hash = self.inner.hash_builder.hash_one(key);
698
699 let record = match E::acquire() {
700 Op::Noop => self.inner.shards[self.shard(hash)].read().get_noop(hash, key),
701 Op::Immutable(_) => self.inner.shards[self.shard(hash)]
702 .read()
703 .with(|shard| shard.get_immutable(hash, key)),
704 Op::Mutable(_) => self.inner.shards[self.shard(hash)]
705 .write()
706 .with(|mut shard| shard.get_mutable(hash, key)),
707 }?;
708
709 Some(RawCacheEntry {
710 inner: self.inner.clone(),
711 record,
712 source: Source::Memory,
713 })
714 }
715
716 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::contains"))]
717 pub fn contains<Q>(&self, key: &Q) -> bool
718 where
719 Q: Hash + Equivalent<E::Key> + ?Sized,
720 {
721 let hash = self.inner.hash_builder.hash_one(key);
722
723 self.inner.shards[self.shard(hash)]
724 .read()
725 .with(|shard| shard.indexer.get(hash, key).is_some())
726 }
727
728 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::touch"))]
729 pub fn touch<Q>(&self, key: &Q) -> bool
730 where
731 Q: Hash + Equivalent<E::Key> + ?Sized,
732 {
733 let hash = self.inner.hash_builder.hash_one(key);
734
735 match E::acquire() {
736 Op::Noop => self.inner.shards[self.shard(hash)].read().get_noop(hash, key),
737 Op::Immutable(_) => self.inner.shards[self.shard(hash)]
738 .read()
739 .with(|shard| shard.get_immutable(hash, key)),
740 Op::Mutable(_) => self.inner.shards[self.shard(hash)]
741 .write()
742 .with(|mut shard| shard.get_mutable(hash, key)),
743 }
744 .is_some()
745 }
746
/// Removes every entry from the cache; the event listener (if any) sees each one leave with
/// `Event::Clear`.
#[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::clear"))]
pub fn clear(&self) {
    self.inner.clear();
}
751
/// Returns the total capacity the cache was constructed with.
pub fn capacity(&self) -> usize {
    self.inner.capacity
}
755
756 pub fn usage(&self) -> usize {
757 self.inner.shards.iter().map(|shard| shard.read().usage).sum()
758 }
759
/// Returns the metrics handle of this cache.
pub fn metrics(&self) -> &Metrics {
    &self.inner.metrics
}
763
/// Returns the hash builder used to hash keys.
pub fn hash_builder(&self) -> &Arc<S> {
    &self.inner.hash_builder
}
767
/// Returns the number of shards.
pub fn shards(&self) -> usize {
    self.inner.shards.len()
}
771
/// Replaces the pipe that receives evicted entries.
///
/// The swap is atomic; operations that already loaded the previous pipe keep using it.
pub fn set_pipe(&self, pipe: Box<dyn Pipe<Key = E::Key, Value = E::Value, Properties = E::Properties>>) {
    self.inner.pipe.store(Arc::new(pipe));
}
775
776 fn shard(&self, hash: u64) -> usize {
777 hash as usize % self.inner.shards.len()
778 }
779
/// Returns the capacity of shard `index` when `total` is split across `shards` shards.
///
/// Every shard gets `total / shards`; the first `total % shards` shards get one extra unit, so
/// the shard capacities add up to exactly `total`.
///
/// # Panics
///
/// Panics if `shards` is zero.
fn shard_capacity_for(total: usize, shards: usize, index: usize) -> usize {
    let (base, remainder) = (total / shards, total % shards);
    if index < remainder {
        base + 1
    } else {
        base
    }
}
785}
786
/// A handle to a cached record.
///
/// Every live handle accounts for one external reference on the record; cloning adds one and
/// dropping removes one.
pub struct RawCacheEntry<E, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    /// The cache this entry belongs to; used on drop to reach the shard, listener and pipe.
    inner: Arc<RawCacheInner<E, S, I>>,
    /// The record this entry refers to.
    record: Arc<Record<E>>,
    /// Where this entry was obtained from.
    source: Source,
}
797
798impl<E, S, I> Debug for RawCacheEntry<E, S, I>
799where
800 E: Eviction,
801 S: HashBuilder,
802 I: Indexer<Eviction = E>,
803{
804 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
805 f.debug_struct("RawCacheEntry").field("record", &self.record).finish()
806 }
807}
808
809impl<E, S, I> Drop for RawCacheEntry<E, S, I>
810where
811 E: Eviction,
812 S: HashBuilder,
813 I: Indexer<Eviction = E>,
814{
815 fn drop(&mut self) {
816 let hash = self.record.hash();
817 let shard = &self.inner.shards[hash as usize % self.inner.shards.len()];
818
819 if self.record.dec_refs(1) == 0 {
820 if self.record.properties().phantom().unwrap_or_default() {
821 if let Some(listener) = self.inner.event_listener.as_ref() {
822 listener.on_leave(Event::Evict, self.record.key(), self.record.value());
823 }
824 let pipe = self.inner.pipe.load();
825 if pipe.is_enabled() {
826 pipe.send(Piece::new(self.record.clone()));
827 }
828 return;
829 }
830
831 match E::release() {
832 Op::Noop => {}
833 Op::Immutable(_) => shard.read().with(|shard| shard.release_immutable(&self.record)),
834 Op::Mutable(_) => shard.write().with(|mut shard| shard.release_mutable(&self.record)),
835 }
836 }
837 }
838}
839
840impl<E, S, I> Clone for RawCacheEntry<E, S, I>
841where
842 E: Eviction,
843 S: HashBuilder,
844 I: Indexer<Eviction = E>,
845{
846 fn clone(&self) -> Self {
847 self.record.inc_refs(1);
848 Self {
849 inner: self.inner.clone(),
850 record: self.record.clone(),
851 source: self.source,
852 }
853 }
854}
855
856impl<E, S, I> Deref for RawCacheEntry<E, S, I>
857where
858 E: Eviction,
859 S: HashBuilder,
860 I: Indexer<Eviction = E>,
861{
862 type Target = E::Value;
863
864 fn deref(&self) -> &Self::Target {
865 self.value()
866 }
867}
868
// SAFETY: NOTE(review): no justification is recorded in this file. Presumably soundness rests
// on `Record<E>` and the shared cache state being safe to move across threads — confirm.
unsafe impl<E, S, I> Send for RawCacheEntry<E, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
}
876
// SAFETY: NOTE(review): no justification is recorded in this file. Presumably soundness rests
// on `Record<E>` and the shared cache state being safe to access from several threads at once
// — confirm.
unsafe impl<E, S, I> Sync for RawCacheEntry<E, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
}
884
impl<E, S, I> RawCacheEntry<E, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    /// Returns the hash of the entry's key.
    pub fn hash(&self) -> u64 {
        self.record.hash()
    }

    /// Returns the key of the entry.
    pub fn key(&self) -> &E::Key {
        self.record.key()
    }

    /// Returns the value of the entry.
    pub fn value(&self) -> &E::Value {
        self.record.value()
    }

    /// Returns the properties of the entry.
    pub fn properties(&self) -> &E::Properties {
        self.record.properties()
    }

    /// Returns the weight of the entry.
    pub fn weight(&self) -> usize {
        self.record.weight()
    }

    /// Returns the record's current external reference count.
    pub fn refs(&self) -> usize {
        self.record.refs()
    }

    /// Returns `true` if the record is no longer held in the indexer, e.g. because it has been
    /// evicted, removed or replaced since this entry was obtained.
    pub fn is_outdated(&self) -> bool {
        !self.record.is_in_indexer()
    }

    /// Returns a [`Piece`] that shares this entry's record.
    pub fn piece(&self) -> Piece<E::Key, E::Value, E::Properties> {
        Piece::new(self.record.clone())
    }

    /// Returns where this entry was obtained from.
    pub fn source(&self) -> Source {
        self.source
    }
}
927
928impl<E, S, I> RawCache<E, S, I>
929where
930 E: Eviction,
931 S: HashBuilder,
932 I: Indexer<Eviction = E>,
933{
/// Returns the cached entry for `key`, or fetches it with the future produced by `fetch`.
///
/// `fetch` is called eagerly, before the cache lookup, to build the future; the future itself
/// is only driven if the lookup misses and this call becomes the leader for the key. A fetch
/// error is wrapped into an `ErrorKind::External` error with the message `"fetch failed"`.
///
/// # Panics
///
/// Panics if called outside of a Tokio runtime, because the fetch task is spawned on
/// `tokio::runtime::Handle::current()`.
#[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::get_or_fetch"))]
pub fn get_or_fetch<Q, F, FU, IT, ER>(&self, key: &Q, fetch: F) -> RawGetOrFetch<E, S, I>
where
    Q: Hash + Equivalent<E::Key> + ?Sized + ToOwned<Owned = E::Key>,
    F: FnOnce() -> FU,
    FU: Future<Output = std::result::Result<IT, ER>> + Send + 'static,
    IT: Into<FetchTarget<E::Key, E::Value, E::Properties>>,
    ER: Into<anyhow::Error>,
{
    // Built up front: `F` is not required to be `Send + 'static`, only the future is.
    let fut = fetch();
    self.get_or_fetch_inner(
        key,
        // No optional fetch stage.
        || None,
        || {
            Some(Box::new(|_| {
                async {
                    match fut.await {
                        Ok(it) => Ok(it.into()),
                        Err(e) => Err(Error::new(ErrorKind::External, "fetch failed").with_source(e)),
                    }
                }
                .boxed()
            }))
        },
        (),
        &tokio::runtime::Handle::current().into(),
    )
}
962
/// Looks up `key` and, on a miss, joins or starts an in-flight fetch for it.
///
/// * `fo` builds the optional fetch stage; it is only invoked when this call becomes the leader.
/// * `fr` builds the required fetch stage; it is invoked on every miss and handed to the
///   in-flight manager.
/// * `ctx` is stored in the spawned fetch task.
/// * `runtime` is where the leader's [`RawFetch`] task is spawned.
///
/// Returns [`RawGetOrFetch::Hit`] when the key is cached, otherwise [`RawGetOrFetch::Miss`]
/// holding a waiter for the fetch result. The lookup and the enqueue both happen while the
/// shard lock is held.
#[doc(hidden)]
#[cfg_attr(
    feature = "tracing",
    fastrace::trace(name = "foyer::memory::raw::get_or_fetch_inner")
)]
pub fn get_or_fetch_inner<Q, C, FO, FR>(
    &self,
    key: &Q,
    fo: FO,
    fr: FR,
    ctx: C,
    runtime: &SingletonHandle,
) -> RawGetOrFetch<E, S, I>
where
    Q: Hash + Equivalent<E::Key> + ?Sized + ToOwned<Owned = E::Key>,
    C: Any + Send + Sync + 'static,
    FO: FnOnce() -> Option<OptionalFetchBuilder<E::Key, E::Value, E::Properties, C>>,
    FR: FnOnce() -> Option<RequiredFetchBuilder<E::Key, E::Value, E::Properties, C>>,
{
    let hash = self.inner.hash_builder.hash_one(key);

    // Turns a lookup result into the return value. It consumes `fo`, `fr` and `ctx`, so it can
    // run at most once; exactly one of the match arms below calls it.
    let extract = |key: &Q, opt: Option<Arc<Record<E>>>, inflights: &Arc<Mutex<InflightManager<E, S, I>>>| {
        opt.map(|record| {
            RawGetOrFetch::Hit(Some(RawCacheEntry {
                inner: self.inner.clone(),
                record,
                source: Source::Memory,
            }))
        })
        .unwrap_or_else(|| match inflights.lock().enqueue(hash, key, fr()) {
            // First request for this key: spawn the task that performs the fetch.
            Enqueue::Lead {
                id,
                close,
                waiter,
                required_fetch_builder,
            } => {
                let fetch = RawFetch {
                    state: RawFetchState::Init {
                        optional_fetch_builder: fo(),
                        required_fetch_builder,
                    },
                    id,
                    hash,
                    key: Some(key.to_owned()),
                    ctx,
                    cache: self.clone(),
                    inflights: inflights.clone(),
                    close,
                };
                runtime.spawn(fetch);
                RawGetOrFetch::Miss(RawWait { waiter })
            }
            // A fetch for this key is already in flight: just wait for its result.
            Enqueue::Wait(waiter) => RawGetOrFetch::Miss(RawWait { waiter }),
        })
    };

    let res = match E::acquire() {
        Op::Noop => self.inner.shards[self.shard(hash)]
            .read()
            .with(|shard| extract(key, shard.get_noop(hash, key), &shard.inflights)),
        Op::Immutable(_) => self.inner.shards[self.shard(hash)]
            .read()
            .with(|shard| extract(key, shard.get_immutable(hash, key), &shard.inflights)),
        Op::Mutable(_) => self.inner.shards[self.shard(hash)]
            .write()
            .with(|mut shard| extract(key, shard.get_mutable(hash, key), &shard.inflights)),
    };

    res
}
1037}
1038
/// The result of a get-or-fetch call: either an immediate hit or a future to await.
///
/// The type is itself a future; polling a `Hit` yields the entry immediately.
#[must_use]
#[pin_project(project = RawGetOrFetchProj)]
pub enum RawGetOrFetch<E, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    /// The entry was found in memory. The `Option` is `Some` until the entry is taken by
    /// polling.
    Hit(Option<RawCacheEntry<E, S, I>>),
    /// The entry was not found in memory; the inner future resolves when the in-flight fetch
    /// finishes.
    Miss(#[pin] RawWait<E, S, I>),
}
1050
1051impl<E, S, I> Debug for RawGetOrFetch<E, S, I>
1052where
1053 E: Eviction,
1054 S: HashBuilder,
1055 I: Indexer<Eviction = E>,
1056{
1057 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1058 match self {
1059 Self::Hit(e) => f.debug_tuple("Hit").field(e).finish(),
1060 Self::Miss(fut) => f.debug_tuple("Miss").field(fut).finish(),
1061 }
1062 }
1063}
1064
1065impl<E, S, I> Future for RawGetOrFetch<E, S, I>
1066where
1067 E: Eviction,
1068 S: HashBuilder,
1069 I: Indexer<Eviction = E>,
1070{
1071 type Output = Result<Option<RawCacheEntry<E, S, I>>>;
1072
1073 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1074 let this = self.project();
1075 match this {
1076 RawGetOrFetchProj::Hit(opt) => {
1077 assert!(opt.is_some(), "entry is already taken");
1078 Poll::Ready(Ok(opt.take()))
1079 }
1080 RawGetOrFetchProj::Miss(fut) => fut.poll(cx),
1081 }
1082 }
1083}
1084
1085impl<E, S, I> RawGetOrFetch<E, S, I>
1086where
1087 E: Eviction,
1088 S: HashBuilder,
1089 I: Indexer<Eviction = E>,
1090{
1091 pub fn need_await(&self) -> bool {
1092 matches!(self, Self::Miss(_))
1093 }
1094
1095 #[expect(clippy::allow_attributes)]
1096 #[allow(clippy::result_large_err)]
1097 pub fn try_unwrap(self) -> std::result::Result<Option<RawCacheEntry<E, S, I>>, Self> {
1098 match self {
1099 Self::Hit(opt) => {
1100 assert!(opt.is_some(), "entry is already taken");
1101 Ok(opt)
1102 }
1103 Self::Miss(_) => Err(self),
1104 }
1105 }
1106}
1107
/// Alias for an `Option` slot. NOTE(review): presumably marks a value that is taken out at most
/// once — confirm against the code that consumes `RawFetch::key`.
type Once<T> = Option<T>;
1109
/// Outcome of one step of the fetch state machine, interpreted by the `handle_try!` macro.
#[must_use]
enum Try<E, S, I, C>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
    C: Any + Send + 'static,
{
    /// Nothing to do; execution falls through to the statement after `handle_try!`.
    Noop,
    /// Replace the current state with the given one and restart the polling loop.
    SetStateAndContinue(RawFetchState<E, S, I, C>),
    /// The task is finished: the state becomes `RawFetchState::Ready` and `poll` returns
    /// `Poll::Ready(())`.
    Ready,
}
1122
// Applies a `Try` outcome inside the polling loop of a fetch future.
//
// Two forms are accepted:
// * `handle_try! { state, method(args...) }` calls `Self::method(args...)` and handles the result;
// * `handle_try! { state, expr }` handles the `Try` value produced by `expr`.
//
// The macro must be expanded inside a `loop` within a function returning `Poll<()>`, because it
// expands to `continue` and `return Poll::Ready(())`.
macro_rules! handle_try {
    ($state:expr, $method:ident($($args:expr),* $(,)?)) => {
        handle_try! { $state, Self::$method($($args),*) }
    };

    ($state:expr, $try:expr) => {
        match $try {
            // Fall through to the next statement.
            Try::Noop => {}
            // Switch state and re-enter the loop.
            Try::SetStateAndContinue(state) => {
                $state = state;
                continue;
            },
            // Mark the state machine as finished and complete the future.
            Try::Ready => {
                $state = RawFetchState::Ready;
                return Poll::Ready(())
            },
        }
    };
}
1142
/// States of the fetch task spawned for the leader of an in-flight request.
#[expect(clippy::type_complexity)]
pub enum RawFetchState<E, S, I, C>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    /// Initial state: holds the builders for the optional and the required fetch stage, either
    /// of which may be absent.
    Init {
        optional_fetch_builder: Option<OptionalFetchBuilder<E::Key, E::Value, E::Properties, C>>,
        required_fetch_builder: Option<RequiredFetchBuilder<E::Key, E::Value, E::Properties, C>>,
    },
    /// The optional fetch is being polled; the required fetch builder is kept for later use.
    FetchOptional {
        optional_fetch: OptionalFetch<FetchTarget<E::Key, E::Value, E::Properties>>,
        required_fetch_builder: Option<RequiredFetchBuilder<E::Key, E::Value, E::Properties, C>>,
    },
    /// Holds the required fetch future.
    FetchRequired {
        required_fetch: RequiredFetch<FetchTarget<E::Key, E::Value, E::Properties>>,
    },
    /// Holds a result together with the notifiers of the waiters it is destined for.
    Notify {
        res: Option<Result<Option<RawCacheEntry<E, S, I>>>>,
        notifiers: Vec<Notifier<Option<RawCacheEntry<E, S, I>>>>,
    },
    /// Terminal state, set when the task completes.
    Ready,
}
1167
1168impl<E, S, I, C> Debug for RawFetchState<E, S, I, C>
1169where
1170 E: Eviction,
1171 S: HashBuilder,
1172 I: Indexer<Eviction = E>,
1173{
1174 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1175 match self {
1176 Self::Init { .. } => f.debug_struct("Init").finish(),
1177 Self::FetchOptional { .. } => f.debug_struct("Optional").finish(),
1178 Self::FetchRequired { .. } => f.debug_struct("Required").finish(),
1179 Self::Notify { res, .. } => f.debug_struct("Notify").field("res", res).finish(),
1180 Self::Ready => f.debug_struct("Ready").finish(),
1181 }
1182 }
1183}
1184
/// Future that waits for the result of an in-flight fetch.
#[pin_project]
pub struct RawWait<E, S, I>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    /// Receiving side of the notification sent when the fetch finishes.
    waiter: Waiter<Option<RawCacheEntry<E, S, I>>>,
}
1194
1195impl<E, S, I> Debug for RawWait<E, S, I>
1196where
1197 E: Eviction,
1198 S: HashBuilder,
1199 I: Indexer<Eviction = E>,
1200{
1201 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1202 f.debug_struct("RawWait").field("waiter", &self.waiter).finish()
1203 }
1204}
1205
1206impl<E, S, I> Future for RawWait<E, S, I>
1207where
1208 E: Eviction,
1209 S: HashBuilder,
1210 I: Indexer<Eviction = E>,
1211{
1212 type Output = Result<Option<RawCacheEntry<E, S, I>>>;
1213
1214 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1215 let this = self.project();
1216 this.waiter.poll_unpin(cx).map(|r| match r {
1219 Ok(r) => r,
1220 Err(e) => Err(Error::new(ErrorKind::ChannelClosed, "waiter channel closed").with_source(e)),
1221 })
1222 }
1223}
1224
/// The task spawned for the leader of an in-flight request; it drives the fetch state machine.
#[pin_project(PinnedDrop)]
pub struct RawFetch<E, S, I, C>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
{
    /// Current state of the state machine.
    state: RawFetchState<E, S, I, C>,
    /// Identifier handed out by the in-flight manager when this request was enqueued as leader.
    id: usize,
    /// Hash of `key`.
    hash: u64,
    /// The key being fetched; initialized to `Some` when the task is created.
    key: Once<E::Key>,
    /// Caller-supplied context stored with the task.
    ctx: C,
    /// Handle to the cache this fetch belongs to.
    cache: RawCache<E, S, I>,
    /// In-flight manager of the shard the key belongs to.
    inflights: Arc<Mutex<InflightManager<E, S, I>>>,
    /// Flag handed out by the in-flight manager; when it reads `true` while the optional fetch
    /// is being polled, the task finishes early.
    close: Arc<AtomicBool>,
}
1241
1242impl<E, S, I, C> Debug for RawFetch<E, S, I, C>
1243where
1244 E: Eviction,
1245 S: HashBuilder,
1246 I: Indexer<Eviction = E>,
1247{
1248 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1249 f.debug_struct("RawFetch")
1250 .field("state", &self.state)
1251 .field("id", &self.id)
1252 .field("hash", &self.hash)
1253 .finish()
1254 }
1255}
1256
/// Drives the fetch state machine until it has to wait on an inner fetch or is finished.
impl<E, S, I, C> Future for RawFetch<E, S, I, C>
where
    E: Eviction,
    S: HashBuilder,
    I: Indexer<Eviction = E>,
    C: Any + Send + 'static,
{
    type Output = ();

    /// Advances the state machine in a loop.
    ///
    /// Each `handle_try!` invocation names a helper on `Self` that returns a `Try`
    /// (`Noop`, `SetStateAndContinue(..)` or `Ready`).
    /// NOTE(review): `handle_try!` is defined outside this section; presumably it falls through on
    /// `Noop`, stores the new state and `continue`s on `SetStateAndContinue`, and finishes the future
    /// on `Ready` — confirm against the macro definition.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.as_mut().project();
        loop {
            match this.state {
                RawFetchState::Init {
                    optional_fetch_builder,
                    required_fetch_builder,
                } => {
                    // Try the optional fetch first; if there is no optional builder (`Try::Noop`),
                    // go on to the required fetch.
                    handle_try! { *this.state, try_set_optional(optional_fetch_builder, required_fetch_builder, this.ctx) }
                    handle_try! { *this.state, try_set_required(required_fetch_builder, this.ctx, *this.id, *this.hash, this.key.as_ref().unwrap(), this.inflights, Ok(None)) }
                }
                RawFetchState::FetchOptional {
                    optional_fetch,
                    required_fetch_builder,
                } => {
                    // Once the close flag is set, stop without polling the inner fetch.
                    if this.close.load(Ordering::Relaxed) {
                        return Poll::Ready(());
                    }
                    match optional_fetch.poll_unpin(cx) {
                        Poll::Pending => return Poll::Pending,
                        Poll::Ready(Ok(Some(target))) => {
                            // The optional fetch produced a target; it is inserted with `Source::Disk`.
                            handle_try! {*this.state, handle_target(target, this.key, this.cache, Source::Disk) }
                        }
                        Poll::Ready(Ok(None)) => {
                            // Nothing found: fall back to the required fetch; `Ok(None)` is the result
                            // reported to notifiers if no required fetch is available.
                            handle_try! { *this.state, try_set_required(required_fetch_builder, this.ctx, *this.id, *this.hash, this.key.as_ref().unwrap(), &this.inflights, Ok(None)) }
                        }
                        Poll::Ready(Err(e)) => {
                            // Optional fetch failed: still fall back to the required fetch; the error is
                            // the result reported to notifiers if no required fetch is available.
                            handle_try! { *this.state, try_set_required(required_fetch_builder, this.ctx, *this.id, *this.hash, this.key.as_ref().unwrap(), &this.inflights, Err(e)) }
                        }
                    }
                }
                RawFetchState::FetchRequired { required_fetch } => {
                    // Once the close flag is set, stop without polling the inner fetch.
                    if this.close.load(Ordering::Relaxed) {
                        return Poll::Ready(());
                    }
                    match required_fetch.poll_unpin(cx) {
                        Poll::Pending => return Poll::Pending,
                        Poll::Ready(Ok(target)) => {
                            // The required fetch produced a target; it is inserted with `Source::Outer`.
                            handle_try! { *this.state, handle_target(target, this.key, this.cache, Source::Outer) }
                        }
                        Poll::Ready(Err(e)) => {
                            // Required fetch failed: collect the notifiers so the error can be forwarded.
                            handle_try! { *this.state, handle_error(e, *this.id, *this.hash, this.key.as_ref().unwrap(), this.inflights) }
                        }
                    }
                }
                RawFetchState::Notify { res, notifiers } => {
                    // `res` is stored as `Some(..)` when this state is created, so it is taken exactly once.
                    handle_try! { *this.state, handle_notify(res.take().unwrap(), notifiers) }
                }
                RawFetchState::Ready => return Poll::Ready(()),
            }
        }
    }
}
1319
1320impl<E, S, I, C> RawFetch<E, S, I, C>
1321where
1322 E: Eviction,
1323 S: HashBuilder,
1324 I: Indexer<Eviction = E>,
1325 C: Any + Send + 'static,
1326{
1327 #[expect(clippy::type_complexity)]
1328 fn try_set_optional(
1329 optional_fetch_builder: &mut Option<OptionalFetchBuilder<E::Key, E::Value, E::Properties, C>>,
1330 required_fetch_builder: &mut Option<RequiredFetchBuilder<E::Key, E::Value, E::Properties, C>>,
1331 ctx: &mut C,
1332 ) -> Try<E, S, I, C> {
1333 match optional_fetch_builder.take() {
1334 None => Try::Noop,
1335 Some(optional_fetch_builder) => {
1336 let optional_fetch = optional_fetch_builder(ctx);
1337 Try::SetStateAndContinue(RawFetchState::FetchOptional {
1338 optional_fetch,
1339 required_fetch_builder: required_fetch_builder.take(),
1340 })
1341 }
1342 }
1343 }
1344
1345 #[expect(clippy::type_complexity)]
1346 fn try_set_required(
1347 required_fetch_builder: &mut Option<RequiredFetchBuilder<E::Key, E::Value, E::Properties, C>>,
1348 ctx: &mut C,
1349 id: usize,
1350 hash: u64,
1351 key: &E::Key,
1352 inflights: &Arc<Mutex<InflightManager<E, S, I>>>,
1353 res_no_fetch: Result<Option<RawCacheEntry<E, S, I>>>,
1354 ) -> Try<E, S, I, C> {
1355 match required_fetch_builder.take() {
1357 None => {}
1358 Some(required_fetch_builder) => {
1359 let required_fetch = required_fetch_builder(ctx);
1360 return Try::SetStateAndContinue(RawFetchState::FetchRequired { required_fetch });
1361 }
1362 }
1363 let fetch_or_take = match inflights.lock().fetch_or_take(hash, key, id) {
1365 Some(fetch_or_take) => fetch_or_take,
1366 None => return Try::Ready,
1367 };
1368 match fetch_or_take {
1369 FetchOrTake::Fetch(required_fetch_builder) => {
1370 let required_fetch = required_fetch_builder(ctx);
1371 Try::SetStateAndContinue(RawFetchState::FetchRequired { required_fetch })
1372 }
1373 FetchOrTake::Notifiers(notifiers) => Try::SetStateAndContinue(RawFetchState::Notify {
1374 res: Some(res_no_fetch),
1375 notifiers,
1376 }),
1377 }
1378 }
1379
1380 fn handle_target(
1381 target: FetchTarget<E::Key, E::Value, E::Properties>,
1382 key: &mut Once<E::Key>,
1383 cache: &RawCache<E, S, I>,
1384 source: Source,
1385 ) -> Try<E, S, I, C> {
1386 match target {
1387 FetchTarget::Entry { value, properties } => {
1388 let key = key.take().unwrap();
1389 cache.insert_with_properties_inner(key, value, properties, source);
1390 }
1391 FetchTarget::Piece(piece) => {
1392 cache.insert_piece(piece);
1393 }
1394 }
1395 Try::Ready
1396 }
1397
1398 fn handle_error(
1399 e: Error,
1400 id: usize,
1401 hash: u64,
1402 key: &E::Key,
1403 inflights: &Arc<Mutex<InflightManager<E, S, I>>>,
1404 ) -> Try<E, S, I, C> {
1405 let notifiers = match inflights.lock().take(hash, key, Some(id)) {
1406 Some(notifiers) => notifiers,
1407 None => {
1408 return Try::Ready;
1409 }
1410 };
1411 Try::SetStateAndContinue(RawFetchState::Notify {
1412 res: Some(Err(e)),
1413 notifiers,
1414 })
1415 }
1416
1417 #[expect(clippy::type_complexity)]
1418 fn handle_notify(
1419 res: Result<Option<RawCacheEntry<E, S, I>>>,
1420 notifiers: &mut Vec<Notifier<Option<RawCacheEntry<E, S, I>>>>,
1421 ) -> Try<E, S, I, C> {
1422 match res {
1423 Ok(e) => {
1424 for notifier in notifiers.drain(..) {
1425 let _ = notifier.send(Ok(e.clone()));
1426 }
1427 }
1428 Err(e) => {
1429 for notifier in notifiers.drain(..) {
1430 let _ = notifier.send(Err(e.clone()));
1431 }
1432 }
1433 }
1434 Try::Ready
1435 }
1436}
1437
1438#[pinned_drop]
1439impl<E, S, I, C> PinnedDrop for RawFetch<E, S, I, C>
1440where
1441 E: Eviction,
1442 S: HashBuilder,
1443 I: Indexer<Eviction = E>,
1444{
1445 fn drop(self: Pin<&mut Self>) {
1446 let this = self.project();
1447 match this.state {
1448 RawFetchState::Notify { .. } | RawFetchState::Ready => return,
1449 RawFetchState::Init { .. } | RawFetchState::FetchOptional { .. } | RawFetchState::FetchRequired { .. } => {}
1450 }
1451 if let Some(notifiers) = this
1452 .inflights
1453 .lock()
1454 .take(*this.hash, this.key.as_ref().unwrap(), Some(*this.id))
1455 {
1456 for notifier in notifiers {
1457 let _ =
1458 notifier
1459 .send(Err(Error::new(ErrorKind::TaskCancelled, "fetch task cancelled")
1460 .with_context("hash", *this.hash)));
1461 }
1462 }
1463 }
1464}
1465
1466#[cfg(test)]
1467mod tests {
1468 use foyer_common::hasher::ModHasher;
1469 use rand::{rngs::SmallRng, seq::IndexedRandom, RngCore, SeedableRng};
1470
1471 use super::*;
1472 use crate::{
1473 eviction::{
1474 fifo::{Fifo, FifoConfig},
1475 lfu::{Lfu, LfuConfig},
1476 lru::{Lru, LruConfig},
1477 s3fifo::{S3Fifo, S3FifoConfig},
1478 sieve::{Sieve, SieveConfig},
1479 test_utils::TestProperties,
1480 },
1481 indexer::hash_table::HashTableIndexer,
1482 test_utils::PiecePipe,
1483 };
1484
/// Compile-time probe: instantiating it with `T` only type-checks if `T: Send + Sync + 'static`.
fn is_send_sync_static<T>()
where
    T: Send + Sync + 'static,
{
}
1486
/// Checks at compile time that `RawCache` is `Send + Sync + 'static` for every eviction algorithm.
#[test]
fn test_send_sync_static() {
    // Expands to one `is_send_sync_static` instantiation per listed eviction type.
    macro_rules! assert_cache_bounds {
        ($($eviction:ident),+ $(,)?) => {
            $(
                is_send_sync_static::<RawCache<$eviction<(), (), TestProperties>, ModHasher, HashTableIndexer<_>>>();
            )+
        };
    }

    assert_cache_bounds!(Fifo, S3Fifo, Lfu, Lru, Sieve);
}
1495
1496 #[expect(clippy::type_complexity)]
1497 fn fifo_cache_for_test(
1498 ) -> RawCache<Fifo<u64, u64, TestProperties>, ModHasher, HashTableIndexer<Fifo<u64, u64, TestProperties>>> {
1499 RawCache::new(RawCacheConfig {
1500 capacity: 256,
1501 shards: 4,
1502 eviction_config: FifoConfig::default(),
1503 hash_builder: Default::default(),
1504 weighter: Arc::new(|_, _| 1),
1505 filter: Arc::new(|_, _| true),
1506 event_listener: None,
1507 metrics: Arc::new(Metrics::noop()),
1508 })
1509 }
1510
1511 #[expect(clippy::type_complexity)]
1512 fn s3fifo_cache_for_test(
1513 ) -> RawCache<S3Fifo<u64, u64, TestProperties>, ModHasher, HashTableIndexer<S3Fifo<u64, u64, TestProperties>>> {
1514 RawCache::new(RawCacheConfig {
1515 capacity: 256,
1516 shards: 4,
1517 eviction_config: S3FifoConfig::default(),
1518 hash_builder: Default::default(),
1519 weighter: Arc::new(|_, _| 1),
1520 filter: Arc::new(|_, _| true),
1521 event_listener: None,
1522 metrics: Arc::new(Metrics::noop()),
1523 })
1524 }
1525
1526 #[expect(clippy::type_complexity)]
1527 fn lru_cache_for_test(
1528 ) -> RawCache<Lru<u64, u64, TestProperties>, ModHasher, HashTableIndexer<Lru<u64, u64, TestProperties>>> {
1529 RawCache::new(RawCacheConfig {
1530 capacity: 256,
1531 shards: 4,
1532 eviction_config: LruConfig::default(),
1533 hash_builder: Default::default(),
1534 weighter: Arc::new(|_, _| 1),
1535 filter: Arc::new(|_, _| true),
1536 event_listener: None,
1537 metrics: Arc::new(Metrics::noop()),
1538 })
1539 }
1540
1541 #[expect(clippy::type_complexity)]
1542 fn lfu_cache_for_test(
1543 ) -> RawCache<Lfu<u64, u64, TestProperties>, ModHasher, HashTableIndexer<Lfu<u64, u64, TestProperties>>> {
1544 RawCache::new(RawCacheConfig {
1545 capacity: 256,
1546 shards: 4,
1547 eviction_config: LfuConfig::default(),
1548 hash_builder: Default::default(),
1549 weighter: Arc::new(|_, _| 1),
1550 filter: Arc::new(|_, _| true),
1551 event_listener: None,
1552 metrics: Arc::new(Metrics::noop()),
1553 })
1554 }
1555
1556 #[expect(clippy::type_complexity)]
1557 fn sieve_cache_for_test(
1558 ) -> RawCache<Sieve<u64, u64, TestProperties>, ModHasher, HashTableIndexer<Sieve<u64, u64, TestProperties>>> {
1559 RawCache::new(RawCacheConfig {
1560 capacity: 256,
1561 shards: 4,
1562 eviction_config: SieveConfig {},
1563 hash_builder: Default::default(),
1564 weighter: Arc::new(|_, _| 1),
1565 filter: Arc::new(|_, _| true),
1566 event_listener: None,
1567 metrics: Arc::new(Metrics::noop()),
1568 })
1569 }
1570
1571 #[test_log::test]
1572 fn test_insert_phantom() {
1573 let fifo = fifo_cache_for_test();
1574
1575 let e1 = fifo.insert_with_properties(1, 1, TestProperties::default().with_phantom(true));
1576 assert_eq!(fifo.usage(), 0);
1577 drop(e1);
1578 assert_eq!(fifo.usage(), 0);
1579
1580 let e2a = fifo.insert_with_properties(2, 2, TestProperties::default().with_phantom(true));
1581 assert_eq!(fifo.usage(), 0);
1582 assert!(fifo.get(&2).is_none());
1583 assert_eq!(fifo.usage(), 0);
1584 drop(e2a);
1585 assert_eq!(fifo.usage(), 0);
1586
1587 let fifo = fifo_cache_for_test();
1588 fifo.insert(1, 1);
1589 assert_eq!(fifo.usage(), 1);
1590 assert_eq!(fifo.get(&1).unwrap().value(), &1);
1591 let e2 = fifo.insert_with_properties(1, 100, TestProperties::default().with_phantom(true));
1592 assert_eq!(fifo.usage(), 0);
1593 drop(e2);
1594 assert_eq!(fifo.usage(), 0);
1595 assert!(fifo.get(&1).is_none());
1596 }
1597
1598 #[expect(clippy::type_complexity)]
1599 #[test_log::test]
1600 fn test_insert_filter() {
1601 let fifo: RawCache<
1602 Fifo<u64, u64, TestProperties>,
1603 ModHasher,
1604 HashTableIndexer<Fifo<u64, u64, TestProperties>>,
1605 > = RawCache::new(RawCacheConfig {
1606 capacity: 256,
1607 shards: 4,
1608 eviction_config: FifoConfig::default(),
1609 hash_builder: Default::default(),
1610 weighter: Arc::new(|_, _| 1),
1611 filter: Arc::new(|k, _| !matches!(*k, 42)),
1612 event_listener: None,
1613 metrics: Arc::new(Metrics::noop()),
1614 });
1615
1616 fifo.insert(1, 1);
1617 fifo.insert(2, 2);
1618 fifo.insert(42, 42);
1619 assert_eq!(fifo.usage(), 2);
1620 assert_eq!(fifo.get(&1).unwrap().value(), &1);
1621 assert_eq!(fifo.get(&2).unwrap().value(), &2);
1622 assert!(fifo.get(&42).is_none());
1623 }
1624
/// After filling the cache to capacity, `evict_all` must hand every entry to the configured pipe.
///
/// The expected pieces are `(i, i, i)` for each inserted `i`, i.e. the test expects each piece's
/// hash to equal its key.
#[test]
fn test_evict_all() {
    let pipe = Box::new(PiecePipe::default());

    let fifo = fifo_cache_for_test();
    fifo.set_pipe(pipe.clone());

    let capacity = fifo.capacity();
    for kv in 0..capacity as u64 {
        fifo.insert(kv, kv);
    }
    assert_eq!(fifo.usage(), capacity);

    fifo.evict_all();

    let mut evicted: Vec<_> = pipe
        .pieces()
        .iter()
        .map(|piece| (piece.hash(), *piece.key(), *piece.value()))
        .collect();
    // Sort by hash so the comparison is independent of the order in which pieces arrived.
    evicted.sort_by_key(|&(hash, _, _)| hash);

    let expected: Vec<_> = (0..capacity as u64).map(|kv| (kv, kv, kv)).collect();
    assert_eq!(evicted, expected);
}
1646
/// A single entry weighing more than the whole cache (6 KiB into 4 KiB) is still stored and
/// retrievable, and its full weight is reported as usage.
#[test]
fn test_insert_size_over_capacity() {
    let cache = RawCache::<Fifo<Vec<u8>, Vec<u8>, TestProperties>, ModHasher, HashTableIndexer<_>>::new(
        RawCacheConfig {
            capacity: 4 * 1024,
            shards: 1,
            eviction_config: FifoConfig::default(),
            hash_builder: Default::default(),
            // Weight is the combined byte length of key and value.
            weighter: Arc::new(|k, v| k.len() + v.len()),
            filter: Arc::new(|_, _| true),
            event_listener: None,
            metrics: Arc::new(Metrics::noop()),
        },
    );

    // 1 KiB key + 5 KiB value.
    let key = vec![b'k'; 1024];
    let value = vec![b'v'; 5 * 1024];

    cache.insert(key.clone(), value.clone());
    assert_eq!(cache.usage(), 6 * 1024);
    assert_eq!(cache.get(&key).unwrap().value(), &value);
}
1668
/// With capacity 3 spread over 2 shards, three unit-weight entries must all fit and stay readable.
#[test]
fn test_capacity_distribution_without_loss() {
    let cache = RawCache::<Fifo<u64, u64, TestProperties>, ModHasher, HashTableIndexer<_>>::new(RawCacheConfig {
        capacity: 3,
        shards: 2,
        eviction_config: FifoConfig::default(),
        hash_builder: Default::default(),
        weighter: Arc::new(|_, _| 1),
        filter: Arc::new(|_, _| true),
        event_listener: None,
        metrics: Arc::new(Metrics::noop()),
    });

    for key in 0..3 {
        // Release the returned entry right away.
        drop(cache.insert(key, key));
    }

    assert_eq!(cache.usage(), 3);

    for key in 0..3 {
        let hit = cache.get(&key).expect("entry should exist");
        assert_eq!(*hit, key);
    }
}
1696
/// With capacity 2 spread over 4 shards, two unit-weight entries must still fit and stay readable,
/// and a key that was never inserted must miss.
#[test]
fn test_capacity_distribution_with_more_shards_than_capacity() {
    let cache = RawCache::<Fifo<u64, u64, TestProperties>, ModHasher, HashTableIndexer<_>>::new(RawCacheConfig {
        capacity: 2,
        shards: 4,
        eviction_config: FifoConfig::default(),
        hash_builder: Default::default(),
        weighter: Arc::new(|_, _| 1),
        filter: Arc::new(|_, _| true),
        event_listener: None,
        metrics: Arc::new(Metrics::noop()),
    });

    for key in 0..2 {
        // Release the returned entry right away.
        drop(cache.insert(key, key));
    }

    assert_eq!(cache.usage(), 2);

    for key in 0..2 {
        let hit = cache.get(&key).expect("entry should exist");
        assert_eq!(*hit, key);
    }

    assert!(cache.get(&2).is_none());
}
1726
1727 fn test_resize<E>(cache: &RawCache<E, ModHasher, HashTableIndexer<E>>)
1728 where
1729 E: Eviction<Key = u64, Value = u64>,
1730 {
1731 let capacity = cache.capacity();
1732 for i in 0..capacity as u64 * 2 {
1733 cache.insert(i, i);
1734 }
1735 assert_eq!(cache.usage(), capacity);
1736 cache.resize(capacity / 2).unwrap();
1737 assert_eq!(cache.usage(), capacity / 2);
1738 for i in 0..capacity as u64 * 2 {
1739 cache.insert(i, i);
1740 }
1741 assert_eq!(cache.usage(), capacity / 2);
1742 }
1743
/// Runs the shared resize scenario against the FIFO test cache.
#[test]
fn test_fifo_cache_resize() {
    test_resize(&fifo_cache_for_test());
}
1749
/// Runs the shared resize scenario against the S3-FIFO test cache.
#[test]
fn test_s3fifo_cache_resize() {
    test_resize(&s3fifo_cache_for_test());
}
1755
/// Runs the shared resize scenario against the LRU test cache.
#[test]
fn test_lru_cache_resize() {
    test_resize(&lru_cache_for_test());
}
1761
/// Runs the shared resize scenario against the LFU test cache.
#[test]
fn test_lfu_cache_resize() {
    test_resize(&lfu_cache_for_test());
}
1767
/// Runs the shared resize scenario against the SIEVE test cache.
#[test]
fn test_sieve_cache_resize() {
    test_resize(&sieve_cache_for_test());
}
1773
1774 mod fuzzy {
1775 use foyer_common::properties::Hint;
1776
1777 use super::*;
1778
1779 fn fuzzy<E, S>(cache: RawCache<E, S, HashTableIndexer<E>>, hints: Vec<Hint>)
1780 where
1781 E: Eviction<Key = u64, Value = u64, Properties = TestProperties>,
1782 S: HashBuilder,
1783 {
1784 let handles = (0..8)
1785 .map(|i| {
1786 let c = cache.clone();
1787 let hints = hints.clone();
1788 std::thread::spawn(move || {
1789 let mut rng = SmallRng::seed_from_u64(i);
1790 for _ in 0..100000 {
1791 let key = rng.next_u64();
1792 if let Some(entry) = c.get(&key) {
1793 assert_eq!(key, *entry);
1794 drop(entry);
1795 continue;
1796 }
1797 let hint = hints.choose(&mut rng).cloned().unwrap();
1798 c.insert_with_properties(key, key, TestProperties::default().with_hint(hint));
1799 }
1800 })
1801 })
1802 .collect_vec();
1803
1804 handles.into_iter().for_each(|handle| handle.join().unwrap());
1805
1806 assert_eq!(cache.usage(), cache.capacity());
1807 }
1808
1809 #[test_log::test]
1810 fn test_fifo_cache_fuzzy() {
1811 let cache: RawCache<Fifo<u64, u64, TestProperties>, ModHasher, HashTableIndexer<_>> =
1812 RawCache::new(RawCacheConfig {
1813 capacity: 256,
1814 shards: 4,
1815 eviction_config: FifoConfig::default(),
1816 hash_builder: Default::default(),
1817 weighter: Arc::new(|_, _| 1),
1818 filter: Arc::new(|_, _| true),
1819 event_listener: None,
1820 metrics: Arc::new(Metrics::noop()),
1821 });
1822 let hints = vec![Hint::Normal];
1823 fuzzy(cache, hints);
1824 }
1825
1826 #[test_log::test]
1827 fn test_s3fifo_cache_fuzzy() {
1828 let cache: RawCache<S3Fifo<u64, u64, TestProperties>, ModHasher, HashTableIndexer<_>> =
1829 RawCache::new(RawCacheConfig {
1830 capacity: 256,
1831 shards: 4,
1832 eviction_config: S3FifoConfig::default(),
1833 hash_builder: Default::default(),
1834 weighter: Arc::new(|_, _| 1),
1835 filter: Arc::new(|_, _| true),
1836 event_listener: None,
1837 metrics: Arc::new(Metrics::noop()),
1838 });
1839 let hints = vec![Hint::Normal];
1840 fuzzy(cache, hints);
1841 }
1842
1843 #[test_log::test]
1844 fn test_lru_cache_fuzzy() {
1845 let cache: RawCache<Lru<u64, u64, TestProperties>, ModHasher, HashTableIndexer<_>> =
1846 RawCache::new(RawCacheConfig {
1847 capacity: 256,
1848 shards: 4,
1849 eviction_config: LruConfig::default(),
1850 hash_builder: Default::default(),
1851 weighter: Arc::new(|_, _| 1),
1852 filter: Arc::new(|_, _| true),
1853 event_listener: None,
1854 metrics: Arc::new(Metrics::noop()),
1855 });
1856 let hints = vec![Hint::Normal, Hint::Low];
1857 fuzzy(cache, hints);
1858 }
1859
1860 #[test_log::test]
1861 fn test_lfu_cache_fuzzy() {
1862 let cache: RawCache<Lfu<u64, u64, TestProperties>, ModHasher, HashTableIndexer<_>> =
1863 RawCache::new(RawCacheConfig {
1864 capacity: 256,
1865 shards: 4,
1866 eviction_config: LfuConfig::default(),
1867 hash_builder: Default::default(),
1868 weighter: Arc::new(|_, _| 1),
1869 filter: Arc::new(|_, _| true),
1870 event_listener: None,
1871 metrics: Arc::new(Metrics::noop()),
1872 });
1873 let hints = vec![Hint::Normal];
1874 fuzzy(cache, hints);
1875 }
1876
1877 #[test_log::test]
1878 fn test_sieve_cache_fuzzy() {
1879 let cache: RawCache<Sieve<u64, u64, TestProperties>, ModHasher, HashTableIndexer<_>> =
1880 RawCache::new(RawCacheConfig {
1881 capacity: 256,
1882 shards: 4,
1883 eviction_config: SieveConfig {},
1884 hash_builder: Default::default(),
1885 weighter: Arc::new(|_, _| 1),
1886 filter: Arc::new(|_, _| true),
1887 event_listener: None,
1888 metrics: Arc::new(Metrics::noop()),
1889 });
1890 let hints = vec![Hint::Normal];
1891 fuzzy(cache, hints);
1892 }
1893 }
1894}