1use std::{
16 collections::hash_map::{Entry as HashMapEntry, HashMap},
17 fmt::Debug,
18 future::Future,
19 hash::Hash,
20 ops::Deref,
21 pin::Pin,
22 sync::Arc,
23 task::{Context, Poll},
24};
25
26use arc_swap::ArcSwap;
27use equivalent::Equivalent;
28#[cfg(feature = "tracing")]
29use fastrace::{
30 future::{FutureExt, InSpan},
31 Span,
32};
33use foyer_common::{
34 code::HashBuilder,
35 event::{Event, EventListener},
36 future::{Diversion, DiversionFuture},
37 metrics::Metrics,
38 properties::{Location, Properties, Source},
39 runtime::SingletonHandle,
40 strict_assert,
41 utils::scope::Scope,
42};
43use itertools::Itertools;
44use parking_lot::{Mutex, RwLock};
45use pin_project::pin_project;
46use tokio::{sync::oneshot, task::JoinHandle};
47
48use crate::{
49 error::{Error, Result},
50 eviction::{Eviction, Op},
51 indexer::{hash_table::HashTableIndexer, sentry::Sentry, Indexer},
52 pipe::NoopPipe,
53 record::{Data, Record},
54 Piece, Pipe,
55};
56
57pub trait Weighter<K, V>: Fn(&K, &V) -> usize + Send + Sync + 'static {}
61impl<K, V, T> Weighter<K, V> for T where T: Fn(&K, &V) -> usize + Send + Sync + 'static {}
62
63pub trait Filter<K, V>: Fn(&K, &V) -> bool + Send + Sync + 'static {}
74impl<K, V, T> Filter<K, V> for T where T: Fn(&K, &V) -> bool + Send + Sync + 'static {}
75
76pub struct RawCacheConfig<E, S>
77where
78 E: Eviction,
79 S: HashBuilder,
80{
81 pub capacity: usize,
82 pub shards: usize,
83 pub eviction_config: E::Config,
84 pub hash_builder: S,
85 pub weighter: Arc<dyn Weighter<E::Key, E::Value>>,
86 pub filter: Arc<dyn Filter<E::Key, E::Value>>,
87 pub event_listener: Option<Arc<dyn EventListener<Key = E::Key, Value = E::Value>>>,
88 pub metrics: Arc<Metrics>,
89}
90
91struct RawCacheShard<E, S, I>
92where
93 E: Eviction,
94 S: HashBuilder,
95 I: Indexer<Eviction = E>,
96{
97 eviction: E,
98 indexer: Sentry<I>,
99
100 usage: usize,
101 capacity: usize,
102
103 #[expect(clippy::type_complexity)]
104 waiters: Mutex<HashMap<E::Key, Vec<oneshot::Sender<RawCacheEntry<E, S, I>>>>>,
105
106 metrics: Arc<Metrics>,
107 _event_listener: Option<Arc<dyn EventListener<Key = E::Key, Value = E::Value>>>,
108}
109
110impl<E, S, I> RawCacheShard<E, S, I>
111where
112 E: Eviction,
113 S: HashBuilder,
114 I: Indexer<Eviction = E>,
115{
116 fn evict(&mut self, target: usize, garbages: &mut Vec<(Event, Arc<Record<E>>)>) {
118 while self.usage > target {
120 let evicted = match self.eviction.pop() {
121 Some(evicted) => evicted,
122 None => break,
123 };
124 self.metrics.memory_evict.increase(1);
125
126 let e = self.indexer.remove(evicted.hash(), evicted.key()).unwrap();
127 assert_eq!(Arc::as_ptr(&evicted), Arc::as_ptr(&e));
128
129 strict_assert!(!evicted.as_ref().is_in_indexer());
130 strict_assert!(!evicted.as_ref().is_in_eviction());
131
132 self.usage -= evicted.weight();
133
134 garbages.push((Event::Evict, evicted));
135 }
136 }
137
138 fn emplace(
139 &mut self,
140 record: Arc<Record<E>>,
141 garbages: &mut Vec<(Event, Arc<Record<E>>)>,
142 waiters: &mut Vec<oneshot::Sender<RawCacheEntry<E, S, I>>>,
143 ) {
144 *waiters = self.waiters.lock().remove(record.key()).unwrap_or_default();
145
146 if record.properties().phantom().unwrap_or_default() {
147 if let Some(old) = self.indexer.remove(record.hash(), record.key()) {
148 strict_assert!(!old.is_in_indexer());
149
150 if old.is_in_eviction() {
151 self.eviction.remove(&old);
152 }
153 strict_assert!(!old.is_in_eviction());
154
155 self.usage -= old.weight();
156
157 garbages.push((Event::Replace, old));
158 }
159 record.inc_refs(waiters.len() + 1);
160 garbages.push((Event::Remove, record));
161 self.metrics.memory_insert.increase(1);
162 return;
163 }
164
165 let weight = record.weight();
166 let old_usage = self.usage;
167
168 self.evict(self.capacity.saturating_sub(weight), garbages);
170
171 if let Some(old) = self.indexer.insert(record.clone()) {
173 self.metrics.memory_replace.increase(1);
174
175 strict_assert!(!old.is_in_indexer());
176
177 if old.is_in_eviction() {
178 self.eviction.remove(&old);
179 }
180 strict_assert!(!old.is_in_eviction());
181
182 self.usage -= old.weight();
183
184 garbages.push((Event::Replace, old));
185 } else {
186 self.metrics.memory_insert.increase(1);
187 }
188 strict_assert!(record.is_in_indexer());
189
190 strict_assert!(!record.is_in_eviction());
191 self.eviction.push(record.clone());
192 strict_assert!(record.is_in_eviction());
193
194 self.usage += weight;
195 record.inc_refs(waiters.len() + 1);
198
199 match self.usage.cmp(&old_usage) {
200 std::cmp::Ordering::Greater => self.metrics.memory_usage.increase((self.usage - old_usage) as _),
201 std::cmp::Ordering::Less => self.metrics.memory_usage.decrease((old_usage - self.usage) as _),
202 std::cmp::Ordering::Equal => {}
203 }
204 }
205
206 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::shard::remove"))]
207 fn remove<Q>(&mut self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
208 where
209 Q: Hash + Equivalent<E::Key> + ?Sized,
210 {
211 let record = self.indexer.remove(hash, key)?;
212
213 if record.is_in_eviction() {
214 self.eviction.remove(&record);
215 }
216 strict_assert!(!record.is_in_indexer());
217 strict_assert!(!record.is_in_eviction());
218
219 self.usage -= record.weight();
220
221 self.metrics.memory_remove.increase(1);
222 self.metrics.memory_usage.decrease(record.weight() as _);
223
224 record.inc_refs(1);
225
226 Some(record)
227 }
228
229 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::shard::get_noop"))]
230 fn get_noop<Q>(&self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
231 where
232 Q: Hash + Equivalent<E::Key> + ?Sized,
233 {
234 self.get_inner(hash, key)
235 }
236
237 #[cfg_attr(
238 feature = "tracing",
239 fastrace::trace(name = "foyer::memory::raw::shard::get_immutable")
240 )]
241 fn get_immutable<Q>(&self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
242 where
243 Q: Hash + Equivalent<E::Key> + ?Sized,
244 {
245 self.get_inner(hash, key)
246 .inspect(|record| self.acquire_immutable(record))
247 }
248
249 #[cfg_attr(
250 feature = "tracing",
251 fastrace::trace(name = "foyer::memory::raw::shard::get_mutable")
252 )]
253 fn get_mutable<Q>(&mut self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
254 where
255 Q: Hash + Equivalent<E::Key> + ?Sized,
256 {
257 self.get_inner(hash, key).inspect(|record| self.acquire_mutable(record))
258 }
259
260 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::shard::get_inner"))]
261 fn get_inner<Q>(&self, hash: u64, key: &Q) -> Option<Arc<Record<E>>>
262 where
263 Q: Hash + Equivalent<E::Key> + ?Sized,
264 {
265 let record = match self.indexer.get(hash, key).cloned() {
266 Some(record) => {
267 self.metrics.memory_hit.increase(1);
268 record
269 }
270 None => {
271 self.metrics.memory_miss.increase(1);
272 return None;
273 }
274 };
275
276 strict_assert!(record.is_in_indexer());
277
278 record.inc_refs(1);
279
280 Some(record)
281 }
282
283 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::shard::clear"))]
284 fn clear(&mut self, garbages: &mut Vec<Arc<Record<E>>>) {
285 let records = self.indexer.drain().collect_vec();
286 self.eviction.clear();
287
288 let mut count = 0;
289
290 for record in records {
291 count += 1;
292 strict_assert!(!record.is_in_indexer());
293 strict_assert!(!record.is_in_eviction());
294
295 garbages.push(record);
296 }
297
298 self.metrics.memory_remove.increase(count);
299 }
300
301 #[cfg_attr(
302 feature = "tracing",
303 fastrace::trace(name = "foyer::memory::raw::shard::acquire_immutable")
304 )]
305 fn acquire_immutable(&self, record: &Arc<Record<E>>) {
306 match E::acquire() {
307 Op::Immutable(f) => f(&self.eviction, record),
308 _ => unreachable!(),
309 }
310 }
311
312 #[cfg_attr(
313 feature = "tracing",
314 fastrace::trace(name = "foyer::memory::raw::shard::acquire_mutable")
315 )]
316 fn acquire_mutable(&mut self, record: &Arc<Record<E>>) {
317 match E::acquire() {
318 Op::Mutable(mut f) => f(&mut self.eviction, record),
319 _ => unreachable!(),
320 }
321 }
322
323 #[cfg_attr(
324 feature = "tracing",
325 fastrace::trace(name = "foyer::memory::raw::shard::release_immutable")
326 )]
327 fn release_immutable(&self, record: &Arc<Record<E>>) {
328 match E::release() {
329 Op::Immutable(f) => f(&self.eviction, record),
330 _ => unreachable!(),
331 }
332 }
333
334 #[cfg_attr(
335 feature = "tracing",
336 fastrace::trace(name = "foyer::memory::raw::shard::release_mutable")
337 )]
338 fn release_mutable(&mut self, record: &Arc<Record<E>>) {
339 match E::release() {
340 Op::Mutable(mut f) => f(&mut self.eviction, record),
341 _ => unreachable!(),
342 }
343 }
344
345 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::shard::fetch_noop"))]
346 fn fetch_noop(&self, hash: u64, key: &E::Key) -> RawShardFetch<E, S, I>
347 where
348 E::Key: Clone,
349 {
350 if let Some(record) = self.get_noop(hash, key) {
351 return RawShardFetch::Hit(record);
352 }
353
354 self.fetch_queue(key.clone())
355 }
356
357 #[cfg_attr(
358 feature = "tracing",
359 fastrace::trace(name = "foyer::memory::raw::shard::fetch_immutable")
360 )]
361 fn fetch_immutable(&self, hash: u64, key: &E::Key) -> RawShardFetch<E, S, I>
362 where
363 E::Key: Clone,
364 {
365 if let Some(record) = self.get_immutable(hash, key) {
366 return RawShardFetch::Hit(record);
367 }
368
369 self.fetch_queue(key.clone())
370 }
371
372 #[cfg_attr(
373 feature = "tracing",
374 fastrace::trace(name = "foyer::memory::raw::shard::fetch_mutable")
375 )]
376 fn fetch_mutable(&mut self, hash: u64, key: &E::Key) -> RawShardFetch<E, S, I>
377 where
378 E::Key: Clone,
379 {
380 if let Some(record) = self.get_mutable(hash, key) {
381 return RawShardFetch::Hit(record);
382 }
383
384 self.fetch_queue(key.clone())
385 }
386
387 #[cfg_attr(
388 feature = "tracing",
389 fastrace::trace(name = "foyer::memory::raw::shard::fetch_queue")
390 )]
391 fn fetch_queue(&self, key: E::Key) -> RawShardFetch<E, S, I> {
392 match self.waiters.lock().entry(key) {
393 HashMapEntry::Occupied(mut o) => {
394 let (tx, rx) = oneshot::channel();
395 o.get_mut().push(tx);
396 self.metrics.memory_queue.increase(1);
397 #[cfg(feature = "tracing")]
398 let wait = rx.in_span(Span::enter_with_local_parent(
399 "foyer::memory::raw::fetch_with_runtime::wait",
400 ));
401 #[cfg(not(feature = "tracing"))]
402 let wait = rx;
403 RawShardFetch::Wait(wait)
404 }
405 HashMapEntry::Vacant(v) => {
406 v.insert(vec![]);
407 self.metrics.memory_fetch.increase(1);
408 RawShardFetch::Miss
409 }
410 }
411 }
412}
413
414#[expect(clippy::type_complexity)]
415struct RawCacheInner<E, S, I>
416where
417 E: Eviction,
418 S: HashBuilder,
419 I: Indexer<Eviction = E>,
420{
421 shards: Vec<RwLock<RawCacheShard<E, S, I>>>,
422
423 capacity: usize,
424
425 hash_builder: Arc<S>,
426 weighter: Arc<dyn Weighter<E::Key, E::Value>>,
427 filter: Arc<dyn Filter<E::Key, E::Value>>,
428
429 metrics: Arc<Metrics>,
430 event_listener: Option<Arc<dyn EventListener<Key = E::Key, Value = E::Value>>>,
431 pipe: ArcSwap<Box<dyn Pipe<Key = E::Key, Value = E::Value, Properties = E::Properties>>>,
432}
433
434impl<E, S, I> RawCacheInner<E, S, I>
435where
436 E: Eviction,
437 S: HashBuilder,
438 I: Indexer<Eviction = E>,
439{
440 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::inner::clear"))]
441 fn clear(&self) {
442 let mut garbages = vec![];
443
444 self.shards
445 .iter()
446 .map(|shard| shard.write())
447 .for_each(|mut shard| shard.clear(&mut garbages));
448
449 if let Some(listener) = self.event_listener.as_ref() {
451 for record in garbages {
452 listener.on_leave(Event::Clear, record.key(), record.value());
453 }
454 }
455 }
456}
457
458pub struct RawCache<E, S, I = HashTableIndexer<E>>
459where
460 E: Eviction,
461 S: HashBuilder,
462 I: Indexer<Eviction = E>,
463{
464 inner: Arc<RawCacheInner<E, S, I>>,
465}
466
467impl<E, S, I> Drop for RawCacheInner<E, S, I>
468where
469 E: Eviction,
470 S: HashBuilder,
471 I: Indexer<Eviction = E>,
472{
473 fn drop(&mut self) {
474 self.clear();
475 }
476}
477
478impl<E, S, I> Clone for RawCache<E, S, I>
479where
480 E: Eviction,
481 S: HashBuilder,
482 I: Indexer<Eviction = E>,
483{
484 fn clone(&self) -> Self {
485 Self {
486 inner: self.inner.clone(),
487 }
488 }
489}
490
491impl<E, S, I> RawCache<E, S, I>
492where
493 E: Eviction,
494 S: HashBuilder,
495 I: Indexer<Eviction = E>,
496{
497 pub fn new(config: RawCacheConfig<E, S>) -> Self {
498 let shard_capacity = config.capacity / config.shards;
499
500 let shards = (0..config.shards)
501 .map(|_| RawCacheShard {
502 eviction: E::new(shard_capacity, &config.eviction_config),
503 indexer: Sentry::default(),
504 usage: 0,
505 capacity: shard_capacity,
506 waiters: Mutex::default(),
507 metrics: config.metrics.clone(),
508 _event_listener: config.event_listener.clone(),
509 })
510 .map(RwLock::new)
511 .collect_vec();
512
513 let pipe: Box<dyn Pipe<Key = E::Key, Value = E::Value, Properties = E::Properties>> =
514 Box::new(NoopPipe::default());
515
516 let inner = RawCacheInner {
517 shards,
518 capacity: config.capacity,
519 hash_builder: Arc::new(config.hash_builder),
520 weighter: config.weighter,
521 filter: config.filter,
522 metrics: config.metrics,
523 event_listener: config.event_listener,
524 pipe: ArcSwap::new(Arc::new(pipe)),
525 };
526
527 Self { inner: Arc::new(inner) }
528 }
529
530 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::resize"))]
531 pub fn resize(&self, capacity: usize) -> Result<()> {
532 let shards = self.inner.shards.len();
533 let shard_capacity = capacity / shards;
534
535 let handles = (0..shards)
536 .map(|i| {
537 let inner = self.inner.clone();
538 std::thread::spawn(move || {
539 let mut garbages = vec![];
540 let res = inner.shards[i].write().with(|mut shard| {
541 shard.eviction.update(shard_capacity, None).inspect(|_| {
542 shard.capacity = shard_capacity;
543 shard.evict(shard_capacity, &mut garbages)
544 })
545 });
546 let pipe = inner.pipe.load();
548 let piped = pipe.is_enabled();
549 if inner.event_listener.is_some() || piped {
550 for (event, record) in garbages {
551 if let Some(listener) = inner.event_listener.as_ref() {
552 listener.on_leave(event, record.key(), record.value())
553 }
554 if piped && event == Event::Evict {
555 pipe.send(Piece::new(record));
556 }
557 }
558 }
559 res
560 })
561 })
562 .collect_vec();
563
564 let errs = handles
565 .into_iter()
566 .map(|handle| handle.join().unwrap())
567 .filter(|res| res.is_err())
568 .map(|res| res.unwrap_err())
569 .collect_vec();
570 if !errs.is_empty() {
571 return Err(Error::multiple(errs));
572 }
573
574 Ok(())
575 }
576
577 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::insert"))]
578 pub fn insert(&self, key: E::Key, value: E::Value) -> RawCacheEntry<E, S, I> {
579 self.insert_with_properties(key, value, Default::default())
580 }
581
582 #[cfg_attr(
583 feature = "tracing",
584 fastrace::trace(name = "foyer::memory::raw::insert_with_properties")
585 )]
586 pub fn insert_with_properties(
587 &self,
588 key: E::Key,
589 value: E::Value,
590 mut properties: E::Properties,
591 ) -> RawCacheEntry<E, S, I> {
592 let hash = self.inner.hash_builder.hash_one(&key);
593 let weight = (self.inner.weighter)(&key, &value);
594 if !(self.inner.filter)(&key, &value) {
595 properties = properties.with_phantom(true);
596 }
597 if let Some(location) = properties.location() {
598 if location == Location::OnDisk {
599 properties = properties.with_phantom(true);
600 }
601 }
602 let record = Arc::new(Record::new(Data {
603 key,
604 value,
605 properties,
606 hash,
607 weight,
608 }));
609 self.insert_inner(record)
610 }
611
612 #[doc(hidden)]
613 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::insert_piece"))]
614 pub fn insert_piece(&self, piece: Piece<E::Key, E::Value, E::Properties>) -> RawCacheEntry<E, S, I> {
615 self.insert_inner(piece.into_record())
616 }
617
618 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::insert_inner"))]
619 fn insert_inner(&self, record: Arc<Record<E>>) -> RawCacheEntry<E, S, I> {
620 let mut garbages = vec![];
621 let mut waiters = vec![];
622
623 self.inner.shards[self.shard(record.hash())]
624 .write()
625 .with(|mut shard| shard.emplace(record.clone(), &mut garbages, &mut waiters));
626
627 for waiter in waiters {
629 let _ = waiter.send(RawCacheEntry {
630 record: record.clone(),
631 inner: self.inner.clone(),
632 });
633 }
634
635 let pipe = self.inner.pipe.load();
637 let piped = pipe.is_enabled();
638 if self.inner.event_listener.is_some() || piped {
639 for (event, record) in garbages {
640 if let Some(listener) = self.inner.event_listener.as_ref() {
641 listener.on_leave(event, record.key(), record.value())
642 }
643 if piped && event == Event::Evict {
644 pipe.send(Piece::new(record));
645 }
646 }
647 }
648
649 RawCacheEntry {
650 record,
651 inner: self.inner.clone(),
652 }
653 }
654
655 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::evict_all"))]
657 pub fn evict_all(&self) {
658 let mut garbages = vec![];
659 for shard in self.inner.shards.iter() {
660 shard.write().evict(0, &mut garbages);
661 }
662
663 let pipe = self.inner.pipe.load();
665 let piped = pipe.is_enabled();
666 if self.inner.event_listener.is_some() || piped {
667 for (event, record) in garbages {
668 if let Some(listener) = self.inner.event_listener.as_ref() {
669 listener.on_leave(event, record.key(), record.value())
670 }
671 if piped && event == Event::Evict {
672 pipe.send(Piece::new(record));
673 }
674 }
675 }
676 }
677
678 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::flush"))]
683 pub async fn flush(&self) {
684 let mut garbages = vec![];
685 for shard in self.inner.shards.iter() {
686 shard.write().evict(0, &mut garbages);
687 }
688
689 let pipe = self.inner.pipe.load();
691 let piped = pipe.is_enabled();
692
693 if let Some(listener) = self.inner.event_listener.as_ref() {
694 for (event, record) in garbages.iter() {
695 listener.on_leave(*event, record.key(), record.value());
696 }
697 }
698 if piped {
699 let pieces = garbages.into_iter().map(|(_, record)| Piece::new(record)).collect_vec();
700 pipe.flush(pieces).await;
701 }
702 }
703
704 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::remove"))]
705 pub fn remove<Q>(&self, key: &Q) -> Option<RawCacheEntry<E, S, I>>
706 where
707 Q: Hash + Equivalent<E::Key> + ?Sized,
708 {
709 let hash = self.inner.hash_builder.hash_one(key);
710
711 self.inner.shards[self.shard(hash)]
712 .write()
713 .with(|mut shard| {
714 shard.remove(hash, key).map(|record| RawCacheEntry {
715 inner: self.inner.clone(),
716 record,
717 })
718 })
719 .inspect(|record| {
720 if let Some(listener) = self.inner.event_listener.as_ref() {
722 listener.on_leave(Event::Remove, record.key(), record.value());
723 }
724 })
725 }
726
727 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::get"))]
728 pub fn get<Q>(&self, key: &Q) -> Option<RawCacheEntry<E, S, I>>
729 where
730 Q: Hash + Equivalent<E::Key> + ?Sized,
731 {
732 let hash = self.inner.hash_builder.hash_one(key);
733
734 let record = match E::acquire() {
735 Op::Noop => self.inner.shards[self.shard(hash)].read().get_noop(hash, key),
736 Op::Immutable(_) => self.inner.shards[self.shard(hash)]
737 .read()
738 .with(|shard| shard.get_immutable(hash, key)),
739 Op::Mutable(_) => self.inner.shards[self.shard(hash)]
740 .write()
741 .with(|mut shard| shard.get_mutable(hash, key)),
742 }?;
743
744 Some(RawCacheEntry {
745 inner: self.inner.clone(),
746 record,
747 })
748 }
749
750 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::contains"))]
751 pub fn contains<Q>(&self, key: &Q) -> bool
752 where
753 Q: Hash + Equivalent<E::Key> + ?Sized,
754 {
755 let hash = self.inner.hash_builder.hash_one(key);
756
757 self.inner.shards[self.shard(hash)]
758 .read()
759 .with(|shard| shard.indexer.get(hash, key).is_some())
760 }
761
762 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::touch"))]
763 pub fn touch<Q>(&self, key: &Q) -> bool
764 where
765 Q: Hash + Equivalent<E::Key> + ?Sized,
766 {
767 let hash = self.inner.hash_builder.hash_one(key);
768
769 match E::acquire() {
770 Op::Noop => self.inner.shards[self.shard(hash)].read().get_noop(hash, key),
771 Op::Immutable(_) => self.inner.shards[self.shard(hash)]
772 .read()
773 .with(|shard| shard.get_immutable(hash, key)),
774 Op::Mutable(_) => self.inner.shards[self.shard(hash)]
775 .write()
776 .with(|mut shard| shard.get_mutable(hash, key)),
777 }
778 .is_some()
779 }
780
781 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::clear"))]
782 pub fn clear(&self) {
783 self.inner.clear();
784 }
785
786 pub fn capacity(&self) -> usize {
787 self.inner.capacity
788 }
789
790 pub fn usage(&self) -> usize {
791 self.inner.shards.iter().map(|shard| shard.read().usage).sum()
792 }
793
794 pub fn metrics(&self) -> &Metrics {
795 &self.inner.metrics
796 }
797
798 pub fn hash_builder(&self) -> &Arc<S> {
799 &self.inner.hash_builder
800 }
801
802 pub fn shards(&self) -> usize {
803 self.inner.shards.len()
804 }
805
806 pub fn set_pipe(&self, pipe: Box<dyn Pipe<Key = E::Key, Value = E::Value, Properties = E::Properties>>) {
807 self.inner.pipe.store(Arc::new(pipe));
808 }
809
810 fn shard(&self, hash: u64) -> usize {
811 hash as usize % self.inner.shards.len()
812 }
813}
814
815pub struct RawCacheEntry<E, S, I = HashTableIndexer<E>>
816where
817 E: Eviction,
818 S: HashBuilder,
819 I: Indexer<Eviction = E>,
820{
821 inner: Arc<RawCacheInner<E, S, I>>,
822 record: Arc<Record<E>>,
823}
824
825impl<E, S, I> Debug for RawCacheEntry<E, S, I>
826where
827 E: Eviction,
828 S: HashBuilder,
829 I: Indexer<Eviction = E>,
830{
831 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
832 f.debug_struct("RawCacheEntry").field("record", &self.record).finish()
833 }
834}
835
836impl<E, S, I> Drop for RawCacheEntry<E, S, I>
837where
838 E: Eviction,
839 S: HashBuilder,
840 I: Indexer<Eviction = E>,
841{
842 fn drop(&mut self) {
843 let hash = self.record.hash();
844 let shard = &self.inner.shards[hash as usize % self.inner.shards.len()];
845
846 if self.record.dec_refs(1) == 0 {
847 if self.record.properties().phantom().unwrap_or_default() {
848 if let Some(listener) = self.inner.event_listener.as_ref() {
849 listener.on_leave(Event::Evict, self.record.key(), self.record.value());
850 }
851 let pipe = self.inner.pipe.load();
852 if pipe.is_enabled() {
853 pipe.send(Piece::new(self.record.clone()));
854 }
855 return;
856 }
857
858 match E::release() {
859 Op::Noop => {}
860 Op::Immutable(_) => shard.read().with(|shard| shard.release_immutable(&self.record)),
861 Op::Mutable(_) => shard.write().with(|mut shard| shard.release_mutable(&self.record)),
862 }
863 }
864 }
865}
866
867impl<E, S, I> Clone for RawCacheEntry<E, S, I>
868where
869 E: Eviction,
870 S: HashBuilder,
871 I: Indexer<Eviction = E>,
872{
873 fn clone(&self) -> Self {
874 self.record.inc_refs(1);
875 Self {
876 inner: self.inner.clone(),
877 record: self.record.clone(),
878 }
879 }
880}
881
882impl<E, S, I> Deref for RawCacheEntry<E, S, I>
883where
884 E: Eviction,
885 S: HashBuilder,
886 I: Indexer<Eviction = E>,
887{
888 type Target = E::Value;
889
890 fn deref(&self) -> &Self::Target {
891 self.value()
892 }
893}
894
895unsafe impl<E, S, I> Send for RawCacheEntry<E, S, I>
896where
897 E: Eviction,
898 S: HashBuilder,
899 I: Indexer<Eviction = E>,
900{
901}
902
903unsafe impl<E, S, I> Sync for RawCacheEntry<E, S, I>
904where
905 E: Eviction,
906 S: HashBuilder,
907 I: Indexer<Eviction = E>,
908{
909}
910
911impl<E, S, I> RawCacheEntry<E, S, I>
912where
913 E: Eviction,
914 S: HashBuilder,
915 I: Indexer<Eviction = E>,
916{
917 pub fn hash(&self) -> u64 {
918 self.record.hash()
919 }
920
921 pub fn key(&self) -> &E::Key {
922 self.record.key()
923 }
924
925 pub fn value(&self) -> &E::Value {
926 self.record.value()
927 }
928
929 pub fn properties(&self) -> &E::Properties {
930 self.record.properties()
931 }
932
933 pub fn weight(&self) -> usize {
934 self.record.weight()
935 }
936
937 pub fn refs(&self) -> usize {
938 self.record.refs()
939 }
940
941 pub fn is_outdated(&self) -> bool {
942 !self.record.is_in_indexer()
943 }
944
945 pub fn piece(&self) -> Piece<E::Key, E::Value, E::Properties> {
946 Piece::new(self.record.clone())
947 }
948}
949
950#[derive(Debug, Clone, Copy, PartialEq, Eq)]
952pub enum FetchState {
953 Hit,
955 Wait,
957 Miss,
959}
960
961#[derive(Debug)]
963pub struct FetchContext {
964 pub throttled: bool,
966 pub source: Source,
968}
969
970enum RawShardFetch<E, S, I>
971where
972 E: Eviction,
973 S: HashBuilder,
974 I: Indexer<Eviction = E>,
975{
976 Hit(Arc<Record<E>>),
977 Wait(RawFetchWait<E, S, I>),
978 Miss,
979}
980
981pub type RawFetch<E, ER, S, I = HashTableIndexer<E>> =
982 DiversionFuture<RawFetchInner<E, ER, S, I>, std::result::Result<RawCacheEntry<E, S, I>, ER>, FetchContext>;
983
984type RawFetchHit<E, S, I> = Option<RawCacheEntry<E, S, I>>;
985#[cfg(feature = "tracing")]
986type RawFetchWait<E, S, I> = InSpan<oneshot::Receiver<RawCacheEntry<E, S, I>>>;
987#[cfg(not(feature = "tracing"))]
988type RawFetchWait<E, S, I> = oneshot::Receiver<RawCacheEntry<E, S, I>>;
989type RawFetchMiss<E, I, S, ER, DFS> = JoinHandle<Diversion<std::result::Result<RawCacheEntry<E, S, I>, ER>, DFS>>;
990
991pub enum FetchTarget<K, V, P> {
993 Value(V),
995 Piece(Piece<K, V, P>),
997}
998
999impl<K, V, P> Debug for FetchTarget<K, V, P> {
1000 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1001 f.debug_struct("FetchTarget").finish()
1002 }
1003}
1004
1005impl<K, V, P> From<V> for FetchTarget<K, V, P> {
1006 fn from(value: V) -> Self {
1007 Self::Value(value)
1008 }
1009}
1010
1011impl<K, V, P> From<Piece<K, V, P>> for FetchTarget<K, V, P> {
1012 fn from(piece: Piece<K, V, P>) -> Self {
1013 Self::Piece(piece)
1014 }
1015}
1016
1017#[pin_project(project = RawFetchInnerProj)]
1018pub enum RawFetchInner<E, ER, S, I>
1019where
1020 E: Eviction,
1021 S: HashBuilder,
1022 I: Indexer<Eviction = E>,
1023{
1024 Hit(RawFetchHit<E, S, I>),
1025 Wait(#[pin] RawFetchWait<E, S, I>),
1026 Miss(#[pin] RawFetchMiss<E, I, S, ER, FetchContext>),
1027}
1028
1029impl<E, ER, S, I> RawFetchInner<E, ER, S, I>
1030where
1031 E: Eviction,
1032 S: HashBuilder,
1033 I: Indexer<Eviction = E>,
1034{
1035 pub fn state(&self) -> FetchState {
1036 match self {
1037 RawFetchInner::Hit(_) => FetchState::Hit,
1038 RawFetchInner::Wait(_) => FetchState::Wait,
1039 RawFetchInner::Miss(_) => FetchState::Miss,
1040 }
1041 }
1042}
1043
1044impl<E, ER, S, I> Future for RawFetchInner<E, ER, S, I>
1045where
1046 E: Eviction,
1047 ER: From<Error>,
1048 S: HashBuilder,
1049 I: Indexer<Eviction = E>,
1050{
1051 type Output = Diversion<std::result::Result<RawCacheEntry<E, S, I>, ER>, FetchContext>;
1052
1053 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1054 match self.project() {
1055 RawFetchInnerProj::Hit(opt) => Poll::Ready(Ok(opt.take().unwrap()).into()),
1056 RawFetchInnerProj::Wait(waiter) => waiter.poll(cx).map_err(|e| Error::wait(e).into()).map(Diversion::from),
1057 RawFetchInnerProj::Miss(handle) => handle.poll(cx).map(|join| join.unwrap()),
1058 }
1059 }
1060}
1061
1062impl<E, S, I> RawCache<E, S, I>
1063where
1064 E: Eviction,
1065 S: HashBuilder,
1066 I: Indexer<Eviction = E>,
1067 E::Key: Clone,
1068{
1069 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::fetch"))]
1070 pub fn fetch<F, FU, ER>(&self, key: E::Key, fetch: F) -> RawFetch<E, ER, S, I>
1071 where
1072 F: FnOnce() -> FU,
1073 FU: Future<Output = std::result::Result<E::Value, ER>> + Send + 'static,
1074 ER: Send + 'static + Debug,
1075 {
1076 self.fetch_inner(
1077 key,
1078 Default::default(),
1079 fetch,
1080 &tokio::runtime::Handle::current().into(),
1081 )
1082 }
1083
1084 #[cfg_attr(
1085 feature = "tracing",
1086 fastrace::trace(name = "foyer::memory::raw::fetch_with_properties")
1087 )]
1088 pub fn fetch_with_properties<F, FU, ER, ID>(
1089 &self,
1090 key: E::Key,
1091 properties: E::Properties,
1092 fetch: F,
1093 ) -> RawFetch<E, ER, S, I>
1094 where
1095 F: FnOnce() -> FU,
1096 FU: Future<Output = ID> + Send + 'static,
1097 ER: Send + 'static + Debug,
1098 ID: Into<Diversion<std::result::Result<E::Value, ER>, FetchContext>>,
1099 {
1100 self.fetch_inner(key, properties, fetch, &tokio::runtime::Handle::current().into())
1101 }
1102
1103 #[doc(hidden)]
1107 #[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::memory::raw::fetch_inner"))]
1108 pub fn fetch_inner<F, FU, ER, ID, IT>(
1109 &self,
1110 key: E::Key,
1111 mut properties: E::Properties,
1112 fetch: F,
1113 runtime: &SingletonHandle,
1114 ) -> RawFetch<E, ER, S, I>
1115 where
1116 F: FnOnce() -> FU,
1117 FU: Future<Output = ID> + Send + 'static,
1118 ER: Send + 'static + Debug,
1119 ID: Into<Diversion<std::result::Result<IT, ER>, FetchContext>>,
1120 IT: Into<FetchTarget<E::Key, E::Value, E::Properties>>,
1121 {
1122 let hash = self.inner.hash_builder.hash_one(&key);
1123
1124 let raw = match E::acquire() {
1125 Op::Noop => self.inner.shards[self.shard(hash)].read().fetch_noop(hash, &key),
1126 Op::Immutable(_) => self.inner.shards[self.shard(hash)].read().fetch_immutable(hash, &key),
1127 Op::Mutable(_) => self.inner.shards[self.shard(hash)].write().fetch_mutable(hash, &key),
1128 };
1129
1130 match raw {
1131 RawShardFetch::Hit(record) => {
1132 tracing::trace!(hash, "fetch => Hit");
1133 return RawFetch::new(RawFetchInner::Hit(Some(RawCacheEntry {
1134 record,
1135 inner: self.inner.clone(),
1136 })));
1137 }
1138 RawShardFetch::Wait(future) => {
1139 tracing::trace!(hash, "fetch => Wait");
1140 return RawFetch::new(RawFetchInner::Wait(future));
1141 }
1142 RawShardFetch::Miss => {
1143 tracing::trace!(hash, "fetch => Miss");
1144 }
1145 }
1146
1147 let cache = self.clone();
1148 let future = fetch();
1149 let join = runtime.spawn({
1150 tracing::trace!(hash, "fetch => join !!!");
1151 let task = async move {
1152 #[cfg(feature = "tracing")]
1153 let Diversion { target, store } = future
1154 .in_span(Span::enter_with_local_parent("foyer::memory::raw::fetch_inner::fn"))
1155 .await
1156 .into();
1157 #[cfg(not(feature = "tracing"))]
1158 let Diversion { target, store } = future.await.into();
1159
1160 let target = match target {
1161 Ok(value) => value,
1162 Err(e) => {
1163 cache.inner.shards[cache.shard(hash)].read().waiters.lock().remove(&key);
1164 tracing::debug!("[fetch]: error raise while fetching, all waiter are dropped, err: {e:?}");
1165 return Diversion { target: Err(e), store };
1166 }
1167 };
1168 if let Some(ctx) = store.as_ref() {
1169 if ctx.throttled {
1170 properties = properties.with_location(Location::InMem)
1173 }
1174 properties = properties.with_source(ctx.source)
1175 };
1176 tracing::trace!(hash, "fetch => insert !!!");
1177 let entry = match target.into() {
1178 FetchTarget::Value(value) => cache.insert_with_properties(key, value, properties),
1179 FetchTarget::Piece(p) => cache.insert_inner(p.into_record::<E>()),
1180 };
1181 Diversion {
1182 target: Ok(entry),
1183 store,
1184 }
1185 };
1186 #[cfg(feature = "tracing")]
1187 let task = task.in_span(Span::enter_with_local_parent(
1188 "foyer::memory::generic::fetch_with_runtime::spawn",
1189 ));
1190 task
1191 });
1192
1193 RawFetch::new(RawFetchInner::Miss(join))
1194 }
1195}
1196
1197#[cfg(test)]
1198mod tests {
1199 use foyer_common::hasher::ModHasher;
1200 use rand::{rngs::SmallRng, seq::IndexedRandom, RngCore, SeedableRng};
1201
1202 use super::*;
1203 use crate::{
1204 eviction::{
1205 fifo::{Fifo, FifoConfig},
1206 lfu::{Lfu, LfuConfig},
1207 lru::{Lru, LruConfig},
1208 s3fifo::{S3Fifo, S3FifoConfig},
1209 sieve::{Sieve, SieveConfig},
1210 test_utils::TestProperties,
1211 },
1212 test_utils::PiecePipe,
1213 };
1214
1215 fn is_send_sync_static<T: Send + Sync + 'static>() {}
1216
1217 #[test]
1218 fn test_send_sync_static() {
1219 is_send_sync_static::<RawCache<Fifo<(), (), TestProperties>, ModHasher>>();
1220 is_send_sync_static::<RawCache<S3Fifo<(), (), TestProperties>, ModHasher>>();
1221 is_send_sync_static::<RawCache<Lfu<(), (), TestProperties>, ModHasher>>();
1222 is_send_sync_static::<RawCache<Lru<(), (), TestProperties>, ModHasher>>();
1223 is_send_sync_static::<RawCache<Sieve<(), (), TestProperties>, ModHasher>>();
1224 }
1225
1226 #[expect(clippy::type_complexity)]
1227 fn fifo_cache_for_test(
1228 ) -> RawCache<Fifo<u64, u64, TestProperties>, ModHasher, HashTableIndexer<Fifo<u64, u64, TestProperties>>> {
1229 RawCache::new(RawCacheConfig {
1230 capacity: 256,
1231 shards: 4,
1232 eviction_config: FifoConfig::default(),
1233 hash_builder: Default::default(),
1234 weighter: Arc::new(|_, _| 1),
1235 filter: Arc::new(|_, _| true),
1236 event_listener: None,
1237 metrics: Arc::new(Metrics::noop()),
1238 })
1239 }
1240
1241 #[expect(clippy::type_complexity)]
1242 fn s3fifo_cache_for_test(
1243 ) -> RawCache<S3Fifo<u64, u64, TestProperties>, ModHasher, HashTableIndexer<S3Fifo<u64, u64, TestProperties>>> {
1244 RawCache::new(RawCacheConfig {
1245 capacity: 256,
1246 shards: 4,
1247 eviction_config: S3FifoConfig::default(),
1248 hash_builder: Default::default(),
1249 weighter: Arc::new(|_, _| 1),
1250 filter: Arc::new(|_, _| true),
1251 event_listener: None,
1252 metrics: Arc::new(Metrics::noop()),
1253 })
1254 }
1255
1256 #[expect(clippy::type_complexity)]
1257 fn lru_cache_for_test(
1258 ) -> RawCache<Lru<u64, u64, TestProperties>, ModHasher, HashTableIndexer<Lru<u64, u64, TestProperties>>> {
1259 RawCache::new(RawCacheConfig {
1260 capacity: 256,
1261 shards: 4,
1262 eviction_config: LruConfig::default(),
1263 hash_builder: Default::default(),
1264 weighter: Arc::new(|_, _| 1),
1265 filter: Arc::new(|_, _| true),
1266 event_listener: None,
1267 metrics: Arc::new(Metrics::noop()),
1268 })
1269 }
1270
1271 #[expect(clippy::type_complexity)]
1272 fn lfu_cache_for_test(
1273 ) -> RawCache<Lfu<u64, u64, TestProperties>, ModHasher, HashTableIndexer<Lfu<u64, u64, TestProperties>>> {
1274 RawCache::new(RawCacheConfig {
1275 capacity: 256,
1276 shards: 4,
1277 eviction_config: LfuConfig::default(),
1278 hash_builder: Default::default(),
1279 weighter: Arc::new(|_, _| 1),
1280 filter: Arc::new(|_, _| true),
1281 event_listener: None,
1282 metrics: Arc::new(Metrics::noop()),
1283 })
1284 }
1285
1286 #[expect(clippy::type_complexity)]
1287 fn sieve_cache_for_test(
1288 ) -> RawCache<Sieve<u64, u64, TestProperties>, ModHasher, HashTableIndexer<Sieve<u64, u64, TestProperties>>> {
1289 RawCache::new(RawCacheConfig {
1290 capacity: 256,
1291 shards: 4,
1292 eviction_config: SieveConfig {},
1293 hash_builder: Default::default(),
1294 weighter: Arc::new(|_, _| 1),
1295 filter: Arc::new(|_, _| true),
1296 event_listener: None,
1297 metrics: Arc::new(Metrics::noop()),
1298 })
1299 }
1300
1301 #[test_log::test]
1302 fn test_insert_phantom() {
1303 let fifo = fifo_cache_for_test();
1304
1305 let e1 = fifo.insert_with_properties(1, 1, TestProperties::default().with_phantom(true));
1306 assert_eq!(fifo.usage(), 0);
1307 drop(e1);
1308 assert_eq!(fifo.usage(), 0);
1309
1310 let e2a = fifo.insert_with_properties(2, 2, TestProperties::default().with_phantom(true));
1311 assert_eq!(fifo.usage(), 0);
1312 assert!(fifo.get(&2).is_none());
1313 assert_eq!(fifo.usage(), 0);
1314 drop(e2a);
1315 assert_eq!(fifo.usage(), 0);
1316
1317 let fifo = fifo_cache_for_test();
1318 fifo.insert(1, 1);
1319 assert_eq!(fifo.usage(), 1);
1320 assert_eq!(fifo.get(&1).unwrap().value(), &1);
1321 let e2 = fifo.insert_with_properties(1, 100, TestProperties::default().with_phantom(true));
1322 assert_eq!(fifo.usage(), 0);
1323 drop(e2);
1324 assert_eq!(fifo.usage(), 0);
1325 assert!(fifo.get(&1).is_none());
1326 }
1327
1328 #[expect(clippy::type_complexity)]
1329 #[test_log::test]
1330 fn test_insert_filter() {
1331 let fifo: RawCache<
1332 Fifo<u64, u64, TestProperties>,
1333 ModHasher,
1334 HashTableIndexer<Fifo<u64, u64, TestProperties>>,
1335 > = RawCache::new(RawCacheConfig {
1336 capacity: 256,
1337 shards: 4,
1338 eviction_config: FifoConfig::default(),
1339 hash_builder: Default::default(),
1340 weighter: Arc::new(|_, _| 1),
1341 filter: Arc::new(|k, _| !matches!(*k, 42)),
1342 event_listener: None,
1343 metrics: Arc::new(Metrics::noop()),
1344 });
1345
1346 fifo.insert(1, 1);
1347 fifo.insert(2, 2);
1348 fifo.insert(42, 42);
1349 assert_eq!(fifo.usage(), 2);
1350 assert_eq!(fifo.get(&1).unwrap().value(), &1);
1351 assert_eq!(fifo.get(&2).unwrap().value(), &2);
1352 assert!(fifo.get(&42).is_none());
1353 }
1354
1355 #[test]
1356 fn test_evict_all() {
1357 let pipe = Box::new(PiecePipe::default());
1358
1359 let fifo = fifo_cache_for_test();
1360 fifo.set_pipe(pipe.clone());
1361 for i in 0..fifo.capacity() as _ {
1362 fifo.insert(i, i);
1363 }
1364 assert_eq!(fifo.usage(), fifo.capacity());
1365
1366 fifo.evict_all();
1367 let mut pieces = pipe
1368 .pieces()
1369 .iter()
1370 .map(|p| (p.hash(), *p.key(), *p.value()))
1371 .collect_vec();
1372 pieces.sort_by_key(|t| t.0);
1373 let expected = (0..fifo.capacity() as u64).map(|i| (i, i, i)).collect_vec();
1374 assert_eq!(pieces, expected);
1375 }
1376
1377 #[test]
1378 fn test_insert_size_over_capacity() {
1379 let cache: RawCache<Fifo<Vec<u8>, Vec<u8>, TestProperties>, ModHasher> = RawCache::new(RawCacheConfig {
1380 capacity: 4 * 1024, shards: 1,
1382 eviction_config: FifoConfig::default(),
1383 hash_builder: Default::default(),
1384 weighter: Arc::new(|k, v| k.len() + v.len()),
1385 filter: Arc::new(|_, _| true),
1386 event_listener: None,
1387 metrics: Arc::new(Metrics::noop()),
1388 });
1389
1390 let key = vec![b'k'; 1024]; let value = vec![b'v'; 5 * 1024]; cache.insert(key.clone(), value.clone());
1394 assert_eq!(cache.usage(), 6 * 1024);
1395 assert_eq!(cache.get(&key).unwrap().value(), &value);
1396 }
1397
1398 fn test_resize<E>(cache: &RawCache<E, ModHasher, HashTableIndexer<E>>)
1399 where
1400 E: Eviction<Key = u64, Value = u64>,
1401 {
1402 let capacity = cache.capacity();
1403 for i in 0..capacity as u64 * 2 {
1404 cache.insert(i, i);
1405 }
1406 assert_eq!(cache.usage(), capacity);
1407 cache.resize(capacity / 2).unwrap();
1408 assert_eq!(cache.usage(), capacity / 2);
1409 for i in 0..capacity as u64 * 2 {
1410 cache.insert(i, i);
1411 }
1412 assert_eq!(cache.usage(), capacity / 2);
1413 }
1414
1415 #[test]
1416 fn test_fifo_cache_resize() {
1417 let cache = fifo_cache_for_test();
1418 test_resize(&cache);
1419 }
1420
1421 #[test]
1422 fn test_s3fifo_cache_resize() {
1423 let cache = s3fifo_cache_for_test();
1424 test_resize(&cache);
1425 }
1426
1427 #[test]
1428 fn test_lru_cache_resize() {
1429 let cache = lru_cache_for_test();
1430 test_resize(&cache);
1431 }
1432
1433 #[test]
1434 fn test_lfu_cache_resize() {
1435 let cache = lfu_cache_for_test();
1436 test_resize(&cache);
1437 }
1438
1439 #[test]
1440 fn test_sieve_cache_resize() {
1441 let cache = sieve_cache_for_test();
1442 test_resize(&cache);
1443 }
1444
1445 mod fuzzy {
1446 use foyer_common::properties::Hint;
1447
1448 use super::*;
1449
1450 fn fuzzy<E, S>(cache: RawCache<E, S>, hints: Vec<Hint>)
1451 where
1452 E: Eviction<Key = u64, Value = u64, Properties = TestProperties>,
1453 S: HashBuilder,
1454 {
1455 let handles = (0..8)
1456 .map(|i| {
1457 let c = cache.clone();
1458 let hints = hints.clone();
1459 std::thread::spawn(move || {
1460 let mut rng = SmallRng::seed_from_u64(i);
1461 for _ in 0..100000 {
1462 let key = rng.next_u64();
1463 if let Some(entry) = c.get(&key) {
1464 assert_eq!(key, *entry);
1465 drop(entry);
1466 continue;
1467 }
1468 let hint = hints.choose(&mut rng).cloned().unwrap();
1469 c.insert_with_properties(key, key, TestProperties::default().with_hint(hint));
1470 }
1471 })
1472 })
1473 .collect_vec();
1474
1475 handles.into_iter().for_each(|handle| handle.join().unwrap());
1476
1477 assert_eq!(cache.usage(), cache.capacity());
1478 }
1479
1480 #[test_log::test]
1481 fn test_fifo_cache_fuzzy() {
1482 let cache: RawCache<Fifo<u64, u64, TestProperties>, ModHasher> = RawCache::new(RawCacheConfig {
1483 capacity: 256,
1484 shards: 4,
1485 eviction_config: FifoConfig::default(),
1486 hash_builder: Default::default(),
1487 weighter: Arc::new(|_, _| 1),
1488 filter: Arc::new(|_, _| true),
1489 event_listener: None,
1490 metrics: Arc::new(Metrics::noop()),
1491 });
1492 let hints = vec![Hint::Normal];
1493 fuzzy(cache, hints);
1494 }
1495
1496 #[test_log::test]
1497 fn test_s3fifo_cache_fuzzy() {
1498 let cache: RawCache<S3Fifo<u64, u64, TestProperties>, ModHasher> = RawCache::new(RawCacheConfig {
1499 capacity: 256,
1500 shards: 4,
1501 eviction_config: S3FifoConfig::default(),
1502 hash_builder: Default::default(),
1503 weighter: Arc::new(|_, _| 1),
1504 filter: Arc::new(|_, _| true),
1505 event_listener: None,
1506 metrics: Arc::new(Metrics::noop()),
1507 });
1508 let hints = vec![Hint::Normal];
1509 fuzzy(cache, hints);
1510 }
1511
1512 #[test_log::test]
1513 fn test_lru_cache_fuzzy() {
1514 let cache: RawCache<Lru<u64, u64, TestProperties>, ModHasher> = RawCache::new(RawCacheConfig {
1515 capacity: 256,
1516 shards: 4,
1517 eviction_config: LruConfig::default(),
1518 hash_builder: Default::default(),
1519 weighter: Arc::new(|_, _| 1),
1520 filter: Arc::new(|_, _| true),
1521 event_listener: None,
1522 metrics: Arc::new(Metrics::noop()),
1523 });
1524 let hints = vec![Hint::Normal, Hint::Low];
1525 fuzzy(cache, hints);
1526 }
1527
1528 #[test_log::test]
1529 fn test_lfu_cache_fuzzy() {
1530 let cache: RawCache<Lfu<u64, u64, TestProperties>, ModHasher> = RawCache::new(RawCacheConfig {
1531 capacity: 256,
1532 shards: 4,
1533 eviction_config: LfuConfig::default(),
1534 hash_builder: Default::default(),
1535 weighter: Arc::new(|_, _| 1),
1536 filter: Arc::new(|_, _| true),
1537 event_listener: None,
1538 metrics: Arc::new(Metrics::noop()),
1539 });
1540 let hints = vec![Hint::Normal];
1541 fuzzy(cache, hints);
1542 }
1543
1544 #[test_log::test]
1545 fn test_sieve_cache_fuzzy() {
1546 let cache: RawCache<Sieve<u64, u64, TestProperties>, ModHasher> = RawCache::new(RawCacheConfig {
1547 capacity: 256,
1548 shards: 4,
1549 eviction_config: SieveConfig {},
1550 hash_builder: Default::default(),
1551 weighter: Arc::new(|_, _| 1),
1552 filter: Arc::new(|_, _| true),
1553 event_listener: None,
1554 metrics: Arc::new(Metrics::noop()),
1555 });
1556 let hints = vec![Hint::Normal];
1557 fuzzy(cache, hints);
1558 }
1559 }
1560}